6ae192335b
Change-Id: If4e18f50126089a1971ab3ba945b34f6774423dd
499 lines
16 KiB
Python
499 lines
16 KiB
Python
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# Copyright 2011 Justin Santa Barbara
|
|
# Copyright 2011 OpenStack Foundation
|
|
# Copyright 2015 Hewlett-Packard Development Company, L.P.
|
|
#
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
import abc
|
|
import socket
|
|
import struct
|
|
import errno
|
|
|
|
import six
|
|
import eventlet.wsgi
|
|
import eventlet.debug
|
|
import oslo_messaging as messaging
|
|
from oslo_config import cfg
|
|
from oslo_log import log as logging
|
|
from oslo_service import service
|
|
from oslo_service import sslutils
|
|
|
|
from designate.i18n import _
|
|
from designate.i18n import _LE
|
|
from designate.i18n import _LI
|
|
from designate.i18n import _LW
|
|
from designate import policy
|
|
from designate import rpc
|
|
from designate import service_status
|
|
from designate import version
|
|
from designate import utils
|
|
|
|
|
|
# TODO(kiall): These options have been cut+paste from the old WSGI code, and
#              should be moved into service:api etc..
#
# Socket tuning options; they are registered on the global CONF object below
# (no option group is given to register_opts).
wsgi_socket_opts = [
    cfg.IntOpt(
        'backlog',
        default=4096,
        help="Number of backlog requests to configure the socket with",
    ),
    cfg.IntOpt(
        'tcp_keepidle',
        default=600,
        help=(
            "Sets the value of TCP_KEEPIDLE in seconds for each server "
            "socket. Not supported on OS X."
        ),
    ),
]

# Shared configuration object used throughout this module.
CONF = cfg.CONF
CONF.register_opts(wsgi_socket_opts)

# Module-level logger.
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
@six.add_metaclass(abc.ABCMeta)
class Service(service.Service):
    """
    Service class to be shared among the diverse service inside of Designate.

    On construction it reads the ``service:<service_name>`` configuration
    group, initialises policy and makes sure RPC has been initialised.
    Concrete services must provide the ``service_name`` property.
    """
    def __init__(self, threads=None):
        """
        :param threads: Value handed to the parent ``service.Service``
                        constructor; 1000 is used when it is not given (or
                        is otherwise falsy).
        """
        threads = threads or 1000

        super(Service, self).__init__(threads)

        self._host = CONF.host
        self._service_config = CONF['service:%s' % self.service_name]

        policy.init()

        # NOTE(kiall): All services need RPC initialized, as this is used
        #              for clients AND servers. Hence, this is common to
        #              all Designate services.
        if not rpc.initialized():
            rpc.init(CONF)

    @abc.abstractproperty
    def service_name(self):
        """Name of the service; also selects the ``service:<name>`` group."""
        pass

    def start(self):
        """Start the parent service, then log the service name and version."""
        super(Service, self).start()

        LOG.info(_('Starting %(name)s service (version: %(version)s)'),
                 {'name': self.service_name,
                  'version': version.version_info.version_string()})

    def stop(self):
        """Log that the service is stopping, then stop the parent service."""
        LOG.info(_('Stopping %(name)s service'), {'name': self.service_name})

        super(Service, self).stop()

    def _get_listen_on_addresses(self, default_port):
        """
        Helper Method to handle migration from singular host/port to
        multiple binds

        :param default_port: Port used on the legacy host/port path when no
                             port is configured.
        :returns: A list of ``(host, port)`` tuples to bind to.
        :raises ValueError: When an entry of the ``listen`` option is not a
                            valid ``host:port`` pair (the error is logged
                            before being re-raised).
        """
        try:
            # The API service uses "api_host", and "api_port", others use
            # just host and port.
            host = self._service_config.api_host
            port = self._service_config.api_port

        except cfg.NoSuchOptError:
            host = self._service_config.host
            port = self._service_config.port

        # Parsed as "host or (port is not None)": either legacy option being
        # set selects the legacy single-bind behaviour.
        if host or port is not None:
            LOG.warning(_LW("host and port config options used, the 'listen' "
                            "option has been ignored"))

            host = host or "0.0.0.0"
            # "port" might be 0 to pick a free port, usually during testing
            port = default_port if port is None else port

            return [(host, port)]

        else:
            def _split_host_port(pair):
                try:
                    host, port = pair.split(':', 1)
                    return host, int(port)
                except ValueError:
                    LOG.exception(_LE('Invalid ip:port pair: %s'), pair)
                    raise

            # Convert listen pair list to a set, to remove accidental
            # duplicates. Build a real list (not a lazy map object) so both
            # branches return the same type and invalid entries are reported
            # here rather than when the caller iterates.
            return [_split_host_port(pair)
                    for pair in set(self._service_config.listen)]
|
|
|
|
|
|
class RPCService(object):
    """
    RPC Service mixin used by all Designate RPC Services

    Relies on attributes supplied by the class it is mixed into:
    ``service_name``, ``tg`` and ``_host`` are all read here but defined
    elsewhere.
    """
    def __init__(self, *args, **kwargs):
        """Create the RPC server and the heartbeat emitter.

        All arguments are forwarded unchanged to the next class in the MRO.
        """
        super(RPCService, self).__init__(*args, **kwargs)

        LOG.debug("Creating RPC Server on topic '%s'", self._rpc_topic)
        self._rpc_server = rpc.get_server(
            messaging.Target(topic=self._rpc_topic, server=self._host),
            self._rpc_endpoints)

        # The emitter driver is selected by configuration.
        emitter_cls = service_status.HeartBeatEmitter.get_driver(
            cfg.CONF.heartbeat_emitter.emitter_type
        )
        self.heartbeat_emitter = emitter_cls(
            self.service_name, self.tg, status_factory=self._get_status
        )

    def _get_status(self):
        """Return the ``(status, stats, capabilities)`` heartbeat tuple.

        This default always reports "UP" with empty stats and capabilities.
        """
        status = "UP"
        stats = {}
        capabilities = {}
        return status, stats, capabilities

    @property
    def _rpc_endpoints(self):
        """Endpoint objects given to the RPC server; just ``self`` here."""
        return [self]

    @property
    def _rpc_topic(self):
        """RPC topic of the server; the service name here."""
        return self.service_name

    def start(self):
        """Start the RPC server, extra endpoints and the heartbeat emitter."""
        super(RPCService, self).start()

        LOG.debug("Starting RPC server on topic '%s'", self._rpc_topic)
        self._rpc_server.start()

        # TODO(kiall): This probably belongs somewhere else, maybe the base
        #              Service class?
        self.notifier = rpc.get_notifier(self.service_name)

        # Endpoints other than this object may have their own lifecycle.
        for e in self._rpc_endpoints:
            if e != self and hasattr(e, 'start'):
                e.start()

        self.heartbeat_emitter.start()

    def stop(self):
        """Stop the heartbeat emitter, extra endpoints and the RPC server."""
        LOG.debug("Stopping RPC server on topic '%s'", self._rpc_topic)
        self.heartbeat_emitter.stop()

        for e in self._rpc_endpoints:
            if e != self and hasattr(e, 'stop'):
                e.stop()

        # Try to shut the connection down, but if we get any sort of
        # errors, go ahead and ignore them.. as we're shutting down anyway
        try:
            self._rpc_server.stop()
        except Exception:
            # Still best-effort, but leave a trace for troubleshooting.
            LOG.debug("Ignoring error while stopping RPC server on topic "
                      "'%s'", self._rpc_topic, exc_info=True)

        super(RPCService, self).stop()

    def wait(self):
        """Wait on every extra endpoint that supports it, then on the parent."""
        for e in self._rpc_endpoints:
            if e != self and hasattr(e, 'wait'):
                e.wait()

        super(RPCService, self).wait()
|
|
|
|
|
|
@six.add_metaclass(abc.ABCMeta)
class WSGIService(object):
    """
    WSGI Service mixin used by all Designate WSGI Services

    Concrete services provide ``_wsgi_application``. The thread group
    (``tg``), ``_service_config`` and ``_get_listen_on_addresses`` are read
    here but supplied by the class this mixin is combined with.
    """
    def __init__(self, *args, **kwargs):
        """Forward all arguments up the MRO and set up socket bookkeeping."""
        super(WSGIService, self).__init__(*args, **kwargs)

        # Every listening socket created by _start() is recorded here.
        self._wsgi_socks = []

    @abc.abstractproperty
    def _wsgi_application(self):
        """The WSGI application handed to the eventlet WSGI server."""
        pass

    def start(self):
        """Start the parent service and one listener per configured address."""
        super(WSGIService, self).start()

        # 9001 is the port used when the configuration supplies none.
        for bind in self._get_listen_on_addresses(9001):
            self._start(bind[0], bind[1])

    def _start(self, host, port):
        """Bind ``host:port`` (optionally SSL-wrapped) and serve WSGI on it.

        :param host: Address to bind to
        :param port: Port to bind to
        """
        listener = utils.bind_tcp(
            host, port, CONF.backlog, CONF.tcp_keepidle)

        # Swap in the SSL-wrapped socket when SSL is enabled in CONF.
        if sslutils.is_enabled(CONF):
            listener = sslutils.wrap(CONF, listener)

        self._wsgi_socks.append(listener)
        self.tg.add_thread(self._wsgi_handle, listener)

    def _wsgi_handle(self, wsgi_sock):
        """Run the eventlet WSGI server on ``wsgi_sock``.

        :param wsgi_sock: Listening socket to serve requests from
        """
        wsgi_log = logging.getLogger('eventlet.wsgi')

        # Adjust wsgi MAX_HEADER_LINE to accept large tokens. Note that this
        # assigns a module-level attribute of eventlet.wsgi.
        eventlet.wsgi.MAX_HEADER_LINE = self._service_config.max_header_line

        eventlet.wsgi.server(
            wsgi_sock,
            self._wsgi_application,
            custom_pool=self.tg.pool,
            log=wsgi_log)
|
|
|
|
|
|
@six.add_metaclass(abc.ABCMeta)
class DNSService(object):
    """
    DNS Service mixin used by all Designate DNS Services

    Concrete services provide ``_dns_application``. The thread group
    (``tg``), ``_service_config``, ``_dns_default_port`` and
    ``_get_listen_on_addresses`` are read here but are expected to come from
    the class this mixin is combined with.
    """

    # Upper bound on the size requested from a single recv() call while
    # reading the body of a TCP query.
    _TCP_RECV_MAX_SIZE = 65535

    def __init__(self, *args, **kwargs):
        """Forward all arguments up the MRO and set up socket bookkeeping."""
        super(DNSService, self).__init__(*args, **kwargs)

        # Eventlet will complain loudly about our use of multiple greenthreads
        # reading/writing to the UDP socket at once. Disable this warning.
        eventlet.debug.hub_prevent_multiple_readers(False)

        self._dns_socks_tcp = []
        self._dns_socks_udp = []

    @abc.abstractproperty
    def _dns_application(self):
        """Callable taking a ``{'payload', 'addr'}`` dict.

        It is iterated over by the handlers below; every item that is not
        None is sent back to the client as a response.
        """
        pass

    @staticmethod
    def _dns_peer(addr):
        """Build the ``host``/``port`` mapping used by the log messages.

        :param addr: Peer address tuple, or None when the failure happened
                     before a peer address was obtained.
        :returns: dict with ``host`` and ``port`` keys; a placeholder is
                  used when ``addr`` is not available so that logging can
                  never fail on a missing address.
        """
        if not addr:
            return {'host': '<unknown>', 'port': 0}
        # Index rather than unpack: address tuples may carry extra fields.
        return {'host': addr[0], 'port': addr[1]}

    @staticmethod
    def _dns_errname(exc):
        """Return the symbolic errno name of a socket error.

        Falls back to the exception text when the error carries no (known)
        errno, instead of raising from inside an exception handler.
        """
        code = exc.args[0] if exc.args else None
        return errno.errorcode.get(code, str(exc))

    def start(self):
        """Start the parent service and the listeners on every address."""
        super(DNSService, self).start()

        addresses = self._get_listen_on_addresses(self._dns_default_port)

        for address in addresses:
            self._start(address[0], address[1])

    def _start(self, host, port):
        """Bind TCP and UDP sockets on ``host:port`` and start a handler
        thread for each.

        :param host: Address to bind to
        :param port: Port to bind to
        """
        sock_tcp = utils.bind_tcp(
            host, port, self._service_config.tcp_backlog)

        sock_udp = utils.bind_udp(
            host, port)

        self._dns_socks_tcp.append(sock_tcp)
        self._dns_socks_udp.append(sock_udp)

        self.tg.add_thread(self._dns_handle_tcp, sock_tcp)
        self.tg.add_thread(self._dns_handle_udp, sock_udp)

    def wait(self):
        """Delegate to the parent ``wait``."""
        super(DNSService, self).wait()

    def stop(self):
        """Stop the parent service, then close every DNS socket."""
        # When the service is stopped, the threads for _handle_tcp and
        # _handle_udp are stopped too.
        super(DNSService, self).stop()

        for sock_tcp in self._dns_socks_tcp:
            sock_tcp.close()

        for sock_udp in self._dns_socks_udp:
            sock_udp.close()

    def _dns_handle_tcp(self, sock_tcp):
        """Accept TCP connections forever, one handler thread per client.

        :param sock_tcp: Listening TCP socket
        :type sock_tcp: socket
        :raises: None
        """
        LOG.info(_LI("_handle_tcp thread started"))

        while True:
            # Reset on every iteration: if accept() itself fails, the
            # handlers below must neither hit an unbound name nor close the
            # connection accepted by a previous iteration (which is still
            # being served by its own thread).
            client = None
            addr = None

            try:
                # handle a new TCP connection
                client, addr = sock_tcp.accept()

                if self._service_config.tcp_recv_timeout:
                    client.settimeout(self._service_config.tcp_recv_timeout)

                LOG.debug("Handling TCP Request from: %(host)s:%(port)d",
                          self._dns_peer(addr))

                # Dispatch a thread to handle the connection
                self.tg.add_thread(self._dns_handle_tcp_conn, addr, client)

            # NOTE: Any uncaught exceptions will result in the main loop
            # ending unexpectedly. Ensure proper ordering of blocks, and
            # ensure no exceptions are generated from within.
            except socket.timeout:
                if client is not None:
                    client.close()
                LOG.warning(_LW("TCP Timeout from: %(host)s:%(port)d"),
                            self._dns_peer(addr))

            except socket.error as e:
                if client is not None:
                    client.close()
                LOG.warning(
                    _LW("Socket error %(err)s from: %(host)s:%(port)d"),
                    dict(self._dns_peer(addr), err=self._dns_errname(e)))

            except Exception:
                if client is not None:
                    client.close()
                LOG.exception(_LE("Unknown exception handling TCP request "
                                  "from: %(host)s:%(port)d"),
                              self._dns_peer(addr))

    def _dns_handle_tcp_conn(self, addr, client):
        """
        Handle a DNS Query over TCP. Multiple queries can be pipelined
        though the same TCP connection but they will be processed
        sequentially.
        See https://tools.ietf.org/html/draft-ietf-dnsop-5966bis-03
        Raises no exception: it's to be run in an eventlet green thread

        :param addr: Tuple of the client's (IP addr, Port)
        :type addr: tuple
        :param client: Client socket
        :type client: socket
        :raises: None
        """
        peer = self._dns_peer(addr)
        try:
            # The whole loop lives in a try/except block. On exceptions, the
            # connection is closed: there would be little chance to save
            # the connection after a struct error, a socket error.
            while True:
                # Decode the first 2 bytes containing the query length
                expected_length_raw = client.recv(2)
                if not expected_length_raw:
                    # The peer closed the connection between queries; this
                    # is a normal end of conversation, not a bad packet.
                    break
                (expected_length, ) = struct.unpack('!H', expected_length_raw)

                # Keep receiving data until we've got all the data we expect
                # The buffer contains only one query at a time
                buf = b''
                while len(buf) < expected_length:
                    recv_size = min(expected_length - len(buf),
                                    self._TCP_RECV_MAX_SIZE)
                    data = client.recv(recv_size)
                    if not data:
                        # Peer stopped sending; whatever was received so
                        # far is handed to the application as-is.
                        break
                    buf += data

                query = buf

                # Call into the DNS Application itself with payload and addr
                for response in self._dns_application(
                        {'payload': query, 'addr': addr}):

                    # Send back a response only if present
                    if response is None:
                        continue

                    # Handle TCP Responses: 2-byte big-endian length prefix
                    # followed by the message itself.
                    msg_length = len(response)
                    tcp_response = struct.pack("!H", msg_length) + response
                    client.sendall(tcp_response)

        # socket.timeout must stay ahead of socket.error so that timeouts
        # keep their dedicated message.
        except socket.timeout:
            LOG.info(_LI("TCP Timeout from: %(host)s:%(port)d"), peer)

        except socket.error as e:
            LOG.warning(_LW("Socket error %(err)s from: %(host)s:%(port)d"),
                        dict(peer, err=self._dns_errname(e)))

        except struct.error:
            LOG.warning(_LW("Invalid packet from: %(host)s:%(port)d"), peer)

        except Exception:
            LOG.exception(_LE("Unknown exception handling TCP request "
                              "from: %(host)s:%(port)d"), peer)
        finally:
            client.close()

    def _dns_handle_udp(self, sock_udp):
        """Handle a DNS Query over UDP in a dedicated thread

        :param sock_udp: UDP socket
        :type sock_udp: socket
        :raises: None
        """
        LOG.info(_LI("_handle_udp thread started"))

        while True:
            # Reset on every iteration so that a failing recvfrom() cannot
            # hit an unbound name or report the previous sender's address.
            addr = None

            try:
                # TODO(kiall): Determine the appropriate default value for
                #              UDP recvfrom.
                payload, addr = sock_udp.recvfrom(8192)

                LOG.debug("Handling UDP Request from: %(host)s:%(port)d",
                          self._dns_peer(addr))

                # Dispatch a thread to handle the query
                self.tg.add_thread(self._dns_handle_udp_query, sock_udp, addr,
                                   payload)

            except socket.error as e:
                LOG.warning(
                    _LW("Socket error %(err)s from: %(host)s:%(port)d"),
                    dict(self._dns_peer(addr), err=self._dns_errname(e)))

            except Exception:
                LOG.exception(_LE("Unknown exception handling UDP request "
                                  "from: %(host)s:%(port)d"),
                              self._dns_peer(addr))

    def _dns_handle_udp_query(self, sock, addr, payload):
        """
        Handle a DNS Query over UDP

        :param sock: UDP socket
        :type sock: socket
        :param addr: Tuple of the client's (IP, Port)
        :type addr: tuple
        :param payload: Raw DNS query payload
        :type payload: string
        :raises: None
        """
        try:
            # Call into the DNS Application itself with the payload and addr
            for response in self._dns_application(
                    {'payload': payload, 'addr': addr}):

                # Send back a response only if present
                if response is not None:
                    sock.sendto(response, addr)

        except Exception:
            LOG.exception(_LE("Unhandled exception while processing request "
                              "from %(host)s:%(port)d"),
                          self._dns_peer(addr))
|
|
|
|
|
|
# Launcher returned by service.launch(); set once by serve().
_launcher = None


def serve(server, workers=None):
    """Launch ``server`` via ``service.launch`` and remember the launcher.

    :param server: Service object to launch
    :param workers: Passed through as the ``workers`` argument of
                    ``service.launch``
    :raises RuntimeError: When a launcher has already been created by an
                          earlier call.
    """
    global _launcher

    if not _launcher:
        _launcher = service.launch(CONF, server, workers=workers)
        return

    raise RuntimeError(_('serve() can only be called once'))
|
|
|
|
|
|
def wait():
    """Block on the launcher created by serve(), then clean up RPC.

    A KeyboardInterrupt raised while waiting is logged at debug level and
    otherwise ignored; RPC cleanup runs afterwards in either case.
    """
    launcher = _launcher

    try:
        launcher.wait()
    except KeyboardInterrupt:
        LOG.debug('Caught KeyboardInterrupt, shutting down now')

    rpc.cleanup()
|