diff --git a/designate/__init__.py b/designate/__init__.py index 45dfced4b..8df655239 100644 --- a/designate/__init__.py +++ b/designate/__init__.py @@ -24,7 +24,6 @@ from oslo_concurrency import lockutils import oslo_messaging as messaging _EXTRA_DEFAULT_LOG_LEVELS = [ - 'eventlet.wsgi.server=WARN', 'kazoo.client=WARN', 'keystone=INFO', 'oslo_service.loopingcall=WARN', diff --git a/designate/agent/service.py b/designate/agent/service.py index 74acaa47d..1763712e0 100644 --- a/designate/agent/service.py +++ b/designate/agent/service.py @@ -37,22 +37,41 @@ from designate.utils import DEFAULT_AGENT_PORT CONF = cfg.CONF -class Service(service.DNSService, service.Service): +class Service(service.Service): _dns_default_port = DEFAULT_AGENT_PORT - def __init__(self, threads=None): - super(Service, self).__init__(threads=threads) + def __init__(self): + super(Service, self).__init__( + self.service_name, threads=cfg.CONF['service:agent'].threads + ) + + self.dns_service = service.DNSService( + self.dns_application, self.tg, + cfg.CONF['service:agent'].listen, + cfg.CONF['service:agent'].tcp_backlog, + cfg.CONF['service:agent'].tcp_recv_timeout, + ) backend_driver = cfg.CONF['service:agent'].backend_driver self.backend = agent_backend.get_backend(backend_driver, self) + def start(self): + super(Service, self).start() + self.dns_service.start() + self.backend.start() + + def stop(self, graceful=False): + self.dns_service.stop() + self.backend.stop() + super(Service, self).stop(graceful) + @property def service_name(self): return 'agent' @property @utils.cache_result - def _dns_application(self): + def dns_application(self): # Create an instance of the RequestHandler class application = handler.RequestHandler() if cfg.CONF['service:agent'].notify_delay > 0.0: @@ -60,12 +79,3 @@ class Service(service.DNSService, service.Service): application = dnsutils.SerializationMiddleware(application) return application - - def start(self): - super(Service, self).start() - self.backend.start() - - def stop(self): - super(Service, self).stop() - # TODO(kiall): Shouldn't we be stppping the backend here too? To fix - # in another review. diff --git a/designate/api/service.py b/designate/api/service.py index ca04b0888..224145eb2 100644 --- a/designate/api/service.py +++ b/designate/api/service.py @@ -18,41 +18,32 @@ from oslo_log import log as logging from paste import deploy from designate import exceptions -from designate import utils from designate import service -from designate import service_status - +from designate import utils LOG = logging.getLogger(__name__) -class Service(service.WSGIService, service.Service): - def __init__(self, threads=None): - super(Service, self).__init__(threads=threads) - - emitter_cls = service_status.HeartBeatEmitter.get_driver( - cfg.CONF.heartbeat_emitter.emitter_type - ) - self.heartbeat_emitter = emitter_cls( - self.service_name, self.tg, status_factory=self._get_status +class Service(service.WSGIService): + def __init__(self): + super(Service, self).__init__( + self.wsgi_application, + self.service_name, + cfg.CONF['service:api'].listen, ) def start(self): super(Service, self).start() - self.heartbeat_emitter.start() - def _get_status(self): - status = "UP" - stats = {} - capabilities = {} - return status, stats, capabilities + def stop(self, graceful=True): + super(Service, self).stop(graceful) @property def service_name(self): return 'api' @property - def _wsgi_application(self): + def wsgi_application(self): api_paste_config = cfg.CONF['service:api'].api_paste_config config_paths = utils.find_config(api_paste_config) diff --git a/designate/central/service.py b/designate/central/service.py index 169f7fbe5..3843de888 100644 --- a/designate/central/service.py +++ b/designate/central/service.py @@ -184,37 +184,40 @@ def notification(notification_type): return outer -class Service(service.RPCService, service.Service): +class Service(service.RPCService): RPC_API_VERSION = '6.2' target = messaging.Target(version=RPC_API_VERSION) - def __init__(self, threads=None): - super(Service, self).__init__(threads=threads) + def __init__(self): + self._scheduler = None + self._storage = None + self._quota = None + + super(Service, self).__init__( + self.service_name, cfg.CONF['service:central'].topic, + threads=cfg.CONF['service:central'].threads, + ) self.network_api = network_api.get_network_api(cfg.CONF.network_api) - # update_service_status needs is called by the emitter so we pass - # ourselves as the rpc_api. - self.heartbeat_emitter.rpc_api = self - @property def scheduler(self): - if not hasattr(self, '_scheduler'): + if not self._scheduler: # Get a scheduler instance self._scheduler = scheduler.get_scheduler(storage=self.storage) return self._scheduler @property def quota(self): - if not hasattr(self, '_quota'): + if not self._quota: # Get a quota manager instance self._quota = quota.get_quota() return self._quota @property def storage(self): - if not hasattr(self, '_storage'): + if not self._storage: # Get a storage connection storage_driver = cfg.CONF['service:central'].storage_driver self._storage = storage.get_storage(storage_driver) @@ -232,8 +235,8 @@ class Service(service.RPCService, service.Service): super(Service, self).start() - def stop(self): - super(Service, self).stop() + def stop(self, graceful=True): + super(Service, self).stop(graceful) @property def mdns_api(self): @@ -251,7 +254,7 @@ class Service(service.RPCService, service.Service): def zone_api(self): # TODO(timsim): Remove this when pool_manager_api is gone if cfg.CONF['service:worker'].enabled: - return self.worker_api + return self.worker_api return self.pool_manager_api def _is_valid_zone_name(self, context, zone_name): diff --git a/designate/cmd/agent.py b/designate/cmd/agent.py index ffe0c775e..e96a5d59b 100644 --- a/designate/cmd/agent.py +++ b/designate/cmd/agent.py @@ -28,7 +28,6 @@ from designate.agent import service as agent_service CONF = designate.conf.CONF CONF.import_opt('workers', 'designate.agent', group='service:agent') -CONF.import_opt('threads', 'designate.agent', group='service:agent') def main(): @@ -38,6 +37,8 @@ def main(): hookpoints.log_hook_setup() - server = agent_service.Service(threads=CONF['service:agent'].threads) + server = agent_service.Service() + heartbeat = service.Heartbeat(server.service_name, server.tg) service.serve(server, workers=CONF['service:agent'].workers) + heartbeat.start() service.wait() diff --git a/designate/cmd/api.py b/designate/cmd/api.py index a4e83251f..6ac6d5581 100644 --- a/designate/cmd/api.py +++ b/designate/cmd/api.py @@ -29,7 +29,6 @@ from designate.api import service as api_service CONF = designate.conf.CONF CONF.import_opt('workers', 'designate.api', group='service:api') -CONF.import_opt('threads', 'designate.api', group='service:api') cfg.CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token') @@ -40,7 +39,8 @@ def main(): hookpoints.log_hook_setup() - server = api_service.Service(threads=CONF['service:api'].threads) + server = api_service.Service() + heartbeat = service.Heartbeat(server.service_name, server.tg) service.serve(server, workers=CONF['service:api'].workers) - server.heartbeat_emitter.start() + heartbeat.start() service.wait() diff --git a/designate/cmd/central.py b/designate/cmd/central.py index 5364e3544..80ee8b930 100644 --- a/designate/cmd/central.py +++ b/designate/cmd/central.py @@ -23,12 +23,11 @@ from designate import hookpoints from designate import service from designate import utils from designate import version -from designate.central import service as central +from designate.central import service as central_service CONF = designate.conf.CONF CONF.import_opt('workers', 'designate.central', group='service:central') -CONF.import_opt('threads', 'designate.central', group='service:central') def main(): @@ -38,7 +37,9 @@ def main(): hookpoints.log_hook_setup() - server = central.Service(threads=CONF['service:central'].threads) + server = central_service.Service() + heartbeat = service.Heartbeat(server.service_name, server.tg, + rpc_api=server) service.serve(server, workers=CONF['service:central'].workers) - server.heartbeat_emitter.start() + heartbeat.start() service.wait() diff --git a/designate/cmd/mdns.py b/designate/cmd/mdns.py index 66dd98705..e79586dc8 100644 --- a/designate/cmd/mdns.py +++ b/designate/cmd/mdns.py @@ -28,7 +28,6 @@ from designate.mdns import service as mdns_service CONF = designate.conf.CONF CONF.import_opt('workers', 'designate.mdns', group='service:mdns') -CONF.import_opt('threads', 'designate.mdns', group='service:mdns') def main(): @@ -38,7 +37,8 @@ def main(): hookpoints.log_hook_setup() - server = mdns_service.Service(threads=CONF['service:mdns'].threads) + server = mdns_service.Service() + heartbeat = service.Heartbeat(server.service_name, server.tg) service.serve(server, workers=CONF['service:mdns'].workers) - server.heartbeat_emitter.start() + heartbeat.start() service.wait() diff --git a/designate/cmd/pool_manager.py b/designate/cmd/pool_manager.py index 686541ba9..013af497a 100644 --- a/designate/cmd/pool_manager.py +++ b/designate/cmd/pool_manager.py @@ -30,8 +30,6 @@ LOG = logging.getLogger(__name__) CONF = designate.conf.CONF CONF.import_opt('workers', 'designate.pool_manager', group='service:pool_manager') -CONF.import_opt('threads', 'designate.pool_manager', - group='service:pool_manager') def main(): @@ -53,12 +51,11 @@ def main(): 'designate-worker', version='newton', removal_version='rocky') - server = pool_manager_service.Service( - threads=CONF['service:pool_manager'].threads - ) + server = pool_manager_service.Service() + heartbeat = service.Heartbeat(server.service_name, server.tg) hookpoints.log_hook_setup() service.serve(server, workers=CONF['service:pool_manager'].workers) - server.heartbeat_emitter.start() + heartbeat.start() service.wait() diff --git a/designate/cmd/producer.py b/designate/cmd/producer.py index 9b16a735a..bd7bbe2bc 100644 --- a/designate/cmd/producer.py +++ b/designate/cmd/producer.py @@ -28,7 +28,6 @@ from designate.producer import service as producer_service LOG = logging.getLogger(__name__) CONF = designate.conf.CONF CONF.import_opt('workers', 'designate.producer', group='service:producer') -CONF.import_opt('threads', 'designate.producer', group='service:producer') def main(): @@ -46,7 +45,8 @@ def main(): hookpoints.log_hook_setup() - server = producer_service.Service(threads=CONF['service:producer'].threads) + server = producer_service.Service() + heartbeat = service.Heartbeat(server.service_name, server.tg) service.serve(server, workers=CONF['service:producer'].workers) - server.heartbeat_emitter.start() + heartbeat.start() service.wait() diff --git a/designate/cmd/sink.py b/designate/cmd/sink.py index 4081e1462..7242b198c 100644 --- a/designate/cmd/sink.py +++ b/designate/cmd/sink.py @@ -28,7 +28,6 @@ from designate.sink import service as sink_service CONF = designate.conf.CONF CONF.import_opt('workers', 'designate.sink', group='service:sink') -CONF.import_opt('threads', 'designate.sink', group='service:sink') def main(): @@ -38,6 +37,8 @@ def main(): hookpoints.log_hook_setup() - server = sink_service.Service(threads=CONF['service:sink'].threads) + server = sink_service.Service() + heartbeat = service.Heartbeat(server.service_name, server.tg) service.serve(server, workers=CONF['service:sink'].workers) + heartbeat.start() service.wait() diff --git a/designate/cmd/worker.py b/designate/cmd/worker.py index 48a239387..0560a9462 100644 --- a/designate/cmd/worker.py +++ b/designate/cmd/worker.py @@ -28,7 +28,6 @@ from designate.worker import service as worker_service LOG = logging.getLogger(__name__) CONF = designate.conf.CONF CONF.import_opt('workers', 'designate.worker', group='service:worker') -CONF.import_opt('threads', 'designate.worker', group='service:worker') def main(): @@ -46,7 +45,8 @@ def main(): hookpoints.log_hook_setup() - server = worker_service.Service(threads=CONF['service:worker'].threads) + server = worker_service.Service() + heartbeat = service.Heartbeat(server.service_name, server.tg) service.serve(server, workers=CONF['service:worker'].workers) - server.heartbeat_emitter.start() + heartbeat.start() service.wait() diff --git a/designate/cmd/zone_manager.py b/designate/cmd/zone_manager.py index 327cd6d90..d09433e4b 100644 --- a/designate/cmd/zone_manager.py +++ b/designate/cmd/zone_manager.py @@ -30,8 +30,6 @@ LOG = logging.getLogger(__name__) CONF = designate.conf.CONF CONF.import_opt('workers', 'designate.producer', group='service:zone_manager') -CONF.import_opt('threads', 'designate.producer', - group='service:zone_manager') def main(): @@ -56,8 +54,8 @@ def main(): LOG.warning('Starting designate-producer under the zone-manager name') - server = producer_service.Service( - threads=CONF['service:zone_manager'].threads) + server = producer_service.Service() + heartbeat = service.Heartbeat(server.service_name, server.tg) service.serve(server, workers=CONF['service:zone_manager'].workers) - server.heartbeat_emitter.start() + heartbeat.start() service.wait() diff --git a/designate/conf/agent.py b/designate/conf/agent.py index ffca416ed..672496fc7 100644 --- a/designate/conf/agent.py +++ b/designate/conf/agent.py @@ -27,14 +27,6 @@ AGENT_OPTS = [ help='Number of agent worker processes to spawn'), cfg.IntOpt('threads', default=1000, help='Number of agent greenthreads to spawn'), - cfg.IPOpt('host', - deprecated_for_removal=True, - deprecated_reason="Replaced by 'listen' option", - help='Agent Bind Host'), - cfg.PortOpt('port', - deprecated_for_removal=True, - deprecated_reason="Replaced by 'listen' option", - help='Agent Port Number'), cfg.ListOpt('listen', default=['0.0.0.0:%d' % DEFAULT_AGENT_PORT], help='Agent host:port pairs to listen on'), diff --git a/designate/conf/api.py b/designate/conf/api.py index cb8dd02c3..d9c40c268 100644 --- a/designate/conf/api.py +++ b/designate/conf/api.py @@ -33,14 +33,6 @@ API_OPTS = [ 'the hostname, port, and any paths that are added' 'to the base of Designate is URLs,' 'For example http://dns.openstack.example.com/dns'), - cfg.IPOpt('api_host', - deprecated_for_removal=True, - deprecated_reason="Replaced by 'listen' option", - help='API Bind Host'), - cfg.PortOpt('api_port', - deprecated_for_removal=True, - deprecated_reason="Replaced by 'listen' option", - help='API Port Number'), cfg.ListOpt('listen', default=['0.0.0.0:9001'], help='API host:port pairs to listen on'), diff --git a/designate/conf/mdns.py b/designate/conf/mdns.py index 0d481fd7a..c7d941b58 100644 --- a/designate/conf/mdns.py +++ b/designate/conf/mdns.py @@ -26,14 +26,6 @@ MDNS_OPTS = [ help='Number of mdns worker processes to spawn'), cfg.IntOpt('threads', default=1000, help='Number of mdns greenthreads to spawn'), - cfg.IPOpt('host', - deprecated_for_removal=True, - deprecated_reason="Replaced by 'listen' option", - help='mDNS Bind Host'), - cfg.PortOpt('port', - deprecated_for_removal=True, - deprecated_reason="Replaced by 'listen' option", - help='mDNS Port Number'), cfg.ListOpt('listen', default=['0.0.0.0:%d' % DEFAULT_MDNS_PORT], help='mDNS host:port pairs to listen on'), diff --git a/designate/coordination.py b/designate/coordination.py index f107d1da2..bbc1556fa 100644 --- a/designate/coordination.py +++ b/designate/coordination.py @@ -35,21 +35,31 @@ def _retry_if_tooz_error(exception): return isinstance(exception, tooz.coordination.ToozError) -class CoordinationMixin(object): - def __init__(self, *args, **kwargs): - super(CoordinationMixin, self).__init__(*args, **kwargs) - +class Coordination(object): + def __init__(self, name, tg): + self.name = name + self.tg = tg + self.coordination_id = None self._coordinator = None + self._started = False + + @property + def coordinator(self): + return self._coordinator + + @property + def started(self): + return self._started def start(self): - self._coordination_id = ":".join([CONF.host, generate_uuid()]) + self.coordination_id = ":".join([CONF.host, generate_uuid()]) if CONF.coordination.backend_url is not None: backend_url = CONF.coordination.backend_url self._coordinator = tooz.coordination.get_coordinator( - backend_url, self._coordination_id.encode()) - self._coordination_started = False + backend_url, self.coordination_id.encode()) + self._started = False self.tg.add_timer(CONF.coordination.heartbeat_interval, self._coordinator_heartbeat) @@ -61,25 +71,22 @@ class CoordinationMixin(object): "coordination functionality will be disabled. " "Please configure a coordination backend.") - super(CoordinationMixin, self).start() - if self._coordinator is not None: - while not self._coordination_started: + while not self._started: try: self._coordinator.start() try: create_group_req = self._coordinator.create_group( - self.service_name) + self.name) create_group_req.get() except tooz.coordination.GroupAlreadyExist: pass - join_group_req = self._coordinator.join_group( - self.service_name) + join_group_req = self._coordinator.join_group(self.name) join_group_req.get() - self._coordination_started = True + self._started = True except Exception: LOG.warning("Failed to start Coordinator:", exc_info=True) @@ -87,18 +94,16 @@ class CoordinationMixin(object): def stop(self): if self._coordinator is not None: - self._coordination_started = False + self._started = False - leave_group_req = self._coordinator.leave_group(self.service_name) + leave_group_req = self._coordinator.leave_group(self.name) leave_group_req.get() self._coordinator.stop() - super(CoordinationMixin, self).stop() - self._coordinator = None def _coordinator_heartbeat(self): - if not self._coordination_started: + if not self._started: return try: @@ -107,7 +112,7 @@ class CoordinationMixin(object): LOG.exception('Error sending a heartbeat to coordination backend.') def _coordinator_run_watchers(self): - if not self._coordination_started: + if not self._started: return self._coordinator.run_watchers() diff --git a/designate/mdns/service.py b/designate/mdns/service.py index 75c24682f..324d354a4 100644 --- a/designate/mdns/service.py +++ b/designate/mdns/service.py @@ -16,10 +16,10 @@ from oslo_config import cfg from oslo_log import log as logging -from designate import utils +from designate import dnsutils from designate import service from designate import storage -from designate import dnsutils +from designate import utils from designate.mdns import handler from designate.mdns import notify from designate.mdns import xfr @@ -29,13 +29,38 @@ LOG = logging.getLogger(__name__) CONF = cfg.CONF -class Service(service.DNSService, service.RPCService, service.Service): +class Service(service.RPCService): _dns_default_port = DEFAULT_MDNS_PORT + def __init__(self): + self._storage = None + + super(Service, self).__init__( + self.service_name, cfg.CONF['service:mdns'].topic, + threads=cfg.CONF['service:mdns'].threads, + ) + self.override_endpoints( + [notify.NotifyEndpoint(self.tg), xfr.XfrEndpoint(self.tg)] + ) + + self.dns_service = service.DNSService( + self.dns_application, self.tg, + cfg.CONF['service:mdns'].listen, + cfg.CONF['service:mdns'].tcp_backlog, + cfg.CONF['service:mdns'].tcp_recv_timeout, + ) + + def start(self): + super(Service, self).start() + self.dns_service.start() + + def stop(self, graceful=False): + self.dns_service.stop() + super(Service, self).stop(graceful) + @property def storage(self): - if not hasattr(self, '_storage'): - # Get a storage connection + if not self._storage: self._storage = storage.get_storage( CONF['service:mdns'].storage_driver ) @@ -47,12 +72,7 @@ class Service(service.DNSService, service.RPCService, service.Service): @property @utils.cache_result - def _rpc_endpoints(self): - return [notify.NotifyEndpoint(self.tg), xfr.XfrEndpoint(self.tg)] - - @property - @utils.cache_result - def _dns_application(self): + def dns_application(self): # Create an instance of the RequestHandler class and wrap with # necessary middleware. application = handler.RequestHandler(self.storage, self.tg) diff --git a/designate/pool_manager/service.py b/designate/pool_manager/service.py index fe854dddb..da32e9697 100644 --- a/designate/pool_manager/service.py +++ b/designate/pool_manager/service.py @@ -76,8 +76,7 @@ def _constant_retries(num_attempts, sleep_interval): yield True -class Service(service.RPCService, coordination.CoordinationMixin, - service.Service): +class Service(service.RPCService): """ Service side of the Pool Manager RPC API. @@ -91,8 +90,30 @@ class Service(service.RPCService, coordination.CoordinationMixin, target = messaging.Target(version=RPC_API_VERSION) - def __init__(self, threads=None): - super(Service, self).__init__(threads=threads) + def __init__(self): + self._scheduler = None + self._storage = None + self._quota = None + + self._pool_election = None + + self._central_api = None + self._mdns_api = None + self._pool_manager_api = None + + topic = '%s.%s' % ( + cfg.CONF['service:pool_manager'].topic, + CONF['service:pool_manager'].pool_id + ) + + super(Service, self).__init__( + self.service_name, topic, + threads=cfg.CONF['service:worker'].threads, + ) + + self.coordination = coordination.Coordination( + self.service_name, self.tg + ) # Get a pool manager cache connection. self.cache = cache.get_pool_manager_cache( @@ -110,8 +131,9 @@ class Service(service.RPCService, coordination.CoordinationMixin, CONF['service:pool_manager'].periodic_sync_retry_interval # Compute a time (seconds) by which things should have propagated - self.max_prop_time = utils.max_prop_time(self.timeout, - self.max_retries, self.retry_interval, self.delay) + self.max_prop_time = utils.max_prop_time( + self.timeout, self.max_retries, self.retry_interval, self.delay + ) def _setup_target_backends(self): self.target_backends = {} @@ -130,17 +152,6 @@ class Service(service.RPCService, coordination.CoordinationMixin, def service_name(self): return 'pool_manager' - @property - def _rpc_topic(self): - # Modify the default topic so it's pool manager instance specific. - topic = super(Service, self)._rpc_topic - - topic = '%s.%s' % (topic, CONF['service:pool_manager'].pool_id) - LOG.info('Using topic %(topic)s for this pool manager instance.', - {'topic': topic}) - - return topic - def start(self): # Build the Pool (and related) Object from Config context = DesignateContext.get_admin_context() @@ -182,11 +193,13 @@ class Service(service.RPCService, coordination.CoordinationMixin, self.target_backends[target.id].start() super(Service, self).start() + self.coordination.start() # Setup a Leader Election, use for ensuring certain tasks are executed # on exactly one pool-manager instance at a time] self._pool_election = coordination.LeaderElection( - self._coordinator, '%s:%s' % (self.service_name, self.pool.id)) + self.coordination.coordinator, + '%s:%s' % (self.service_name, self.pool.id)) self._pool_election.start() if CONF['service:pool_manager'].enable_recovery_timer: @@ -201,29 +214,30 @@ class Service(service.RPCService, coordination.CoordinationMixin, ' %(interval)s s', {'interval': interval}) self.tg.add_timer(interval, self.periodic_sync, interval) - def stop(self): + def stop(self, graceful=True): self._pool_election.stop() + # self.coordination.stop() - super(Service, self).stop() + super(Service, self).stop(graceful) for target in self.pool.targets: self.target_backends[target.id].stop() @property def central_api(self): - if not hasattr(self, '_central_api'): + if not self._central_api: self._central_api = central_api.CentralAPI.get_instance() return self._central_api @property def mdns_api(self): - if not hasattr(self, '_mdns_adpi'): + if not self._mdns_api: self._mdns_api = mdns_api.MdnsAPI.get_instance() return self._mdns_api @property def pool_manager_api(self): - if not hasattr(self, '_pool_manager_api'): + if not self._pool_manager_api: pool_mgr_api = pool_manager_rpcapi.PoolManagerAPI self._pool_manager_api = pool_mgr_api.get_instance() return self._pool_manager_api diff --git a/designate/producer/service.py b/designate/producer/service.py index b9c196ee6..51af68825 100644 --- a/designate/producer/service.py +++ b/designate/producer/service.py @@ -31,15 +31,29 @@ CONF = cfg.CONF NS = 'designate.periodic_tasks' -class Service(service.RPCService, coordination.CoordinationMixin, - service.Service): +class Service(service.RPCService): RPC_API_VERSION = '1.0' target = messaging.Target(version=RPC_API_VERSION) + def __init__(self): + self._partitioner = None + + self._storage = None + self._quota = None + + super(Service, self).__init__( + self.service_name, cfg.CONF['service:producer'].topic, + threads=cfg.CONF['service:producer'].threads, + ) + + self.coordination = coordination.Coordination( + self.service_name, self.tg + ) + @property def storage(self): - if not hasattr(self, '_storage'): + if not self._storage: # TODO(timsim): Remove this when zone_mgr goes away storage_driver = cfg.CONF['service:zone_manager'].storage_driver if cfg.CONF['service:producer'].storage_driver != storage_driver: @@ -49,7 +63,7 @@ class Service(service.RPCService, coordination.CoordinationMixin, @property def quota(self): - if not hasattr(self, '_quota'): + if not self._quota: # Get a quota manager instance self._quota = quota.get_quota() return self._quota @@ -64,10 +78,12 @@ class Service(service.RPCService, coordination.CoordinationMixin, def start(self): super(Service, self).start() + self.coordination.start() self._partitioner = coordination.Partitioner( - self._coordinator, self.service_name, - self._coordination_id.encode(), range(0, 4095)) + self.coordination.coordinator, self.service_name, + self.coordination.coordination_id.encode(), range(0, 4095) + ) self._partitioner.start() self._partitioner.watch_partition_change(self._rebalance) @@ -76,7 +92,7 @@ class Service(service.RPCService, coordination.CoordinationMixin, zmgr_enabled_tasks = CONF['service:zone_manager'].enabled_tasks producer_enabled_tasks = CONF['service:producer'].enabled_tasks enabled = zmgr_enabled_tasks - if producer_enabled_tasks != []: + if producer_enabled_tasks: enabled = producer_enabled_tasks for task in tasks.PeriodicTask.get_extensions(enabled): @@ -91,6 +107,10 @@ class Service(service.RPCService, coordination.CoordinationMixin, interval = CONF[task.get_canonical_name()].interval self.tg.add_timer(interval, task) + def stop(self, graceful=True): + super(Service, self).stop(graceful) + self.coordination.stop() + def _rebalance(self, my_partitions, members, event): LOG.info("Received rebalance event %s", event) self.partition_range = my_partitions diff --git a/designate/service.py b/designate/service.py index 0b499799a..e4e590b55 100644 --- a/designate/service.py +++ b/designate/service.py @@ -17,241 +17,161 @@ # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. -import abc +import errno import socket import struct -import errno -import six -import eventlet.wsgi import eventlet.debug -from oslo_config import cfg -import oslo_messaging as messaging from oslo_log import log as logging +import oslo_messaging as messaging from oslo_service import service from oslo_service import sslutils +from oslo_service import wsgi from oslo_utils import netutils -import designate.conf -from designate.i18n import _ -from designate.metrics import metrics from designate import policy from designate import rpc from designate import service_status -from designate import version from designate import utils - - -# TODO(kiall): These options have been cut+paste from the old WSGI code, and -# should be moved into service:api etc.. - +from designate import version +import designate.conf +from designate.i18n import _ +from designate.metrics import metrics CONF = designate.conf.CONF LOG = logging.getLogger(__name__) -@six.add_metaclass(abc.ABCMeta) class Service(service.Service): - """ - Service class to be shared among the diverse service inside of Designate. - """ - def __init__(self, threads=None): + def __init__(self, name, threads=None): threads = threads or 1000 - super(Service, self).__init__(threads) - - self._host = CONF.host - self._service_config = CONF['service:%s' % self.service_name] + self.name = name + self.host = CONF.host policy.init() - # NOTE(kiall): All services need RPC initialized, as this is used - # for clients AND servers. Hence, this is common to - # all Designate services. if not rpc.initialized(): rpc.init(CONF) - @abc.abstractproperty - def service_name(self): - pass - def start(self): - super(Service, self).start() - LOG.info('Starting %(name)s service (version: %(version)s)', { - 'name': self.service_name, + 'name': self.name, 'version': version.version_info.version_string() }) + super(Service, self).start() - def stop(self): - LOG.info('Stopping %(name)s service', {'name': self.service_name}) - - super(Service, self).stop() - - def _get_listen_on_addresses(self, default_port): - """ - Helper Method to handle migration from singular host/port to - multiple binds - """ - try: - # The API service uses "api_host", and "api_port", others use - # just host and port. - host = self._service_config.api_host - port = self._service_config.api_port - - except cfg.NoSuchOptError: - host = self._service_config.host - port = self._service_config.port - - if host or port is not None: - LOG.warning("host and port config options used, the 'listen' " - "option has been ignored") - - host = host or "0.0.0.0" - # "port" might be 0 to pick a free port, usually during testing - port = default_port if port is None else port - - return [(host, port)] - - else: - - return map( - netutils.parse_host_port, - set(self._service_config.listen) - ) + def stop(self, graceful=True): + LOG.info('Stopping %(name)s service', {'name': self.name}) + super(Service, self).stop(graceful) -class RPCService(object): - """ - RPC Service mixin used by all Designate RPC Services - """ - def __init__(self, *args, **kwargs): - super(RPCService, self).__init__(*args, **kwargs) +class Heartbeat(object): + def __init__(self, name, tg, rpc_api=None): + self.name = name + self.tg = tg - LOG.debug("Creating RPC Server on topic '%s'", self._rpc_topic) - self._rpc_server = rpc.get_server( - messaging.Target(topic=self._rpc_topic, server=self._host), - self._rpc_endpoints) + self._status = 'UP' + self._stats = {} + self._capabilities = {} emitter_cls = service_status.HeartBeatEmitter.get_driver( CONF.heartbeat_emitter.emitter_type ) self.heartbeat_emitter = emitter_cls( - self.service_name, self.tg, status_factory=self._get_status + self.name, self.tg, + status_factory=self.get_status, rpc_api=rpc_api ) - def _get_status(self): - status = "UP" - stats = {} - capabilities = {} - return status, stats, capabilities - - @property - def _rpc_endpoints(self): - return [self] - - @property - def _rpc_topic(self): - return CONF['service:%s' % self.service_name].topic + def get_status(self): + return self._status, self._stats, self._capabilities def start(self): - super(RPCService, self).start() - - LOG.debug("Starting RPC server on topic '%s'", self._rpc_topic) - self._rpc_server.start() - - # TODO(kiall): This probably belongs somewhere else, maybe the base - # Service class? - self.notifier = rpc.get_notifier(self.service_name) - - for e in self._rpc_endpoints: - if e != self and hasattr(e, 'start'): - e.start() - self.heartbeat_emitter.start() def stop(self): - LOG.debug("Stopping RPC server on topic '%s'", self._rpc_topic) self.heartbeat_emitter.stop() - for e in self._rpc_endpoints: - if e != self and hasattr(e, 'stop'): - e.stop() - # Try to shut the connection down, but if we get any sort of - # errors, go ahead and ignore them.. as we're shutting down anyway - try: - self._rpc_server.stop() - except Exception: - pass +class RPCService(Service): + def __init__(self, name, rpc_topic, threads=None): + super(RPCService, self).__init__(name, threads) + LOG.debug("Creating RPC Server on topic '%s' for %s", + rpc_topic, self.name) - super(RPCService, self).stop() + self.endpoints = [self] + self.notifier = None + self.rpc_server = None + self.rpc_topic = rpc_topic + + def override_endpoints(self, endpoints): + self.endpoints = endpoints + + def start(self): + super(RPCService, self).start() + target = messaging.Target(topic=self.rpc_topic, server=self.host) + self.rpc_server = rpc.get_server(target, self.endpoints) + self.rpc_server.start() + self.notifier = rpc.get_notifier(self.name) + + def stop(self, graceful=True): + if self.rpc_server: + self.rpc_server.stop() + super(RPCService, self).stop(graceful) def wait(self): - for e in self._rpc_endpoints: - if e != self and hasattr(e, 'wait'): - e.wait() - super(RPCService, self).wait() -@six.add_metaclass(abc.ABCMeta) -class WSGIService(object): - """ - WSGI Service mixin used by all Designate WSGI Services - """ - def __init__(self, *args, **kwargs): - super(WSGIService, self).__init__(*args, **kwargs) +class WSGIService(Service): + def __init__(self, app, name, listen, max_url_len=None): + super(WSGIService, self).__init__(name) + self.app = app + self.name = name - self._use_ssl = sslutils.is_enabled(CONF) - self._wsgi_socks = [] + self.listen = listen - @abc.abstractproperty - def _wsgi_application(self): - pass + self.servers = [] + + for address in self.listen: + host, port = netutils.parse_host_port(address) + server = wsgi.Server( + CONF, name, app, + host=host, + port=port, + pool_size=CONF['service:api'].threads, + use_ssl=sslutils.is_enabled(CONF), + max_url_len=max_url_len + ) + + self.servers.append(server) def start(self): + for server in self.servers: + server.start() super(WSGIService, self).start() - addresses = self._get_listen_on_addresses(9001) + def stop(self, graceful=True): + for server in self.servers: + server.stop() + super(WSGIService, self).stop(graceful) - for address in addresses: - self._start(address[0], address[1]) - - def _start(self, host, port): - wsgi_sock = utils.bind_tcp( - host, port, CONF.backlog, CONF.tcp_keepidle) - - if self._use_ssl: - wsgi_sock = sslutils.wrap(CONF, wsgi_sock) - - self._wsgi_socks.append(wsgi_sock) - - self.tg.add_thread(self._wsgi_handle, wsgi_sock) - - def _wsgi_handle(self, wsgi_sock): - logger = logging.getLogger('eventlet.wsgi') - # Adjust wsgi MAX_HEADER_LINE to accept large tokens. - eventlet.wsgi.MAX_HEADER_LINE = self._service_config.max_header_line - - eventlet.wsgi.server(wsgi_sock, - self._wsgi_application, - custom_pool=self.tg.pool, - log=logger) + def wait(self): + for server in self.servers: + server.wait() + super(WSGIService, self).wait() -@six.add_metaclass(abc.ABCMeta) class DNSService(object): - """ - DNS Service mixin used by all Designate DNS Services - """ - _TCP_RECV_MAX_SIZE = 65535 - def __init__(self, *args, **kwargs): - super(DNSService, self).__init__(*args, **kwargs) - + def __init__(self, app, tg, listen, tcp_backlog, tcp_recv_timeout): + self.app = app + self.tg = tg + self.tcp_backlog = tcp_backlog + self.tcp_recv_timeout = tcp_recv_timeout + self.listen = listen metrics.init() # Eventet will complain loudly about our use of multiple greentheads @@ -261,21 +181,18 @@ class DNSService(object): self._dns_socks_tcp = [] self._dns_socks_udp = [] - @abc.abstractproperty - def _dns_application(self): - pass - def start(self): - super(DNSService, self).start() - - addresses = self._get_listen_on_addresses(self._dns_default_port) + addresses = map( + netutils.parse_host_port, + set(self.listen) + ) for address in addresses: self._start(address[0], address[1]) def _start(self, host, port): sock_tcp = utils.bind_tcp( - host, port, self._service_config.tcp_backlog) + host, port, self.tcp_backlog) sock_udp = utils.bind_udp( host, port) @@ -286,14 +203,7 @@ class DNSService(object): self.tg.add_thread(self._dns_handle_tcp, sock_tcp) self.tg.add_thread(self._dns_handle_udp, sock_udp) - def wait(self): - super(DNSService, self).wait() - def stop(self): - # When the service is stopped, the threads for _handle_tcp and - # _handle_udp are stopped too. - super(DNSService, self).stop() - for sock_tcp in self._dns_socks_tcp: sock_tcp.close() @@ -301,7 +211,7 @@ class DNSService(object): sock_udp.close() def _dns_handle_tcp(self, sock_tcp): - LOG.info("_handle_tcp thread started") + LOG.info('_handle_tcp thread started') client = None while True: @@ -309,13 +219,13 @@ class DNSService(object): # handle a new TCP connection client, addr = sock_tcp.accept() - if self._service_config.tcp_recv_timeout: - client.settimeout(self._service_config.tcp_recv_timeout) + if self.tcp_recv_timeout: + client.settimeout(self.tcp_recv_timeout) - LOG.debug("Handling TCP Request from: %(host)s:%(port)d", + LOG.debug('Handling TCP Request from: %(host)s:%(port)d', {'host': addr[0], 'port': addr[1]}) if len(addr) == 4: - LOG.debug("Flow info: %(host)s scope: %(port)d", + LOG.debug('Flow info: %(host)s scope: %(port)d', {'host': addr[2], 'port': addr[3]}) # Dispatch a thread to handle the connection @@ -327,21 +237,21 @@ class DNSService(object): except socket.timeout: if client: client.close() - LOG.warning("TCP Timeout from: %(host)s:%(port)d", + LOG.warning('TCP Timeout from: %(host)s:%(port)d', {'host': addr[0], 'port': addr[1]}) except socket.error as e: if client: client.close() errname = errno.errorcode[e.args[0]] - LOG.warning("Socket error %(err)s from: %(host)s:%(port)d", + LOG.warning('Socket error %(err)s from: %(host)s:%(port)d', {'host': addr[0], 'port': addr[1], 'err': errname}) except Exception: if client: client.close() - LOG.exception("Unknown exception handling TCP request from: " - "%(host)s:%(port)d", + LOG.exception('Unknown exception handling TCP request from: ' + '%(host)s:%(port)d', {'host': addr[0], 'port': addr[1]}) def _dns_handle_tcp_conn(self, addr, client): @@ -369,7 +279,7 @@ class DNSService(object): expected_length_raw = client.recv(2) if len(expected_length_raw) == 0: break - (expected_length, ) = struct.unpack('!H', expected_length_raw) + (expected_length,) = struct.unpack('!H', expected_length_raw) # Keep receiving data until we've got all the data we expect # The buffer contains only one query at a time @@ -385,7 +295,7 @@ class DNSService(object): query = buf # Call into the DNS Application itself with payload and addr - for response in self._dns_application( + for response in self.app( {'payload': query, 'addr': addr}): # Send back a response only if present @@ -398,20 +308,20 @@ class DNSService(object): client.sendall(tcp_response) except socket.timeout: - LOG.info("TCP Timeout from: %(host)s:%(port)d", + LOG.info('TCP Timeout from: %(host)s:%(port)d', {'host': host, 'port': port}) except socket.error as e: errname = errno.errorcode[e.args[0]] - LOG.warning("Socket error %(err)s from: %(host)s:%(port)d", + LOG.warning('Socket error %(err)s from: %(host)s:%(port)d', {'host': host, 'port': port, 'err': errname}) except struct.error: - LOG.warning("Invalid packet from: %(host)s:%(port)d", + LOG.warning('Invalid packet from: %(host)s:%(port)d', {'host': host, 'port': port}) except Exception: - LOG.exception("Unknown exception handling TCP request from: " + LOG.exception('Unknown exception handling TCP request from: ' "%(host)s:%(port)d", {'host': host, 'port': port}) finally: if client: @@ -424,7 +334,7 @@ class DNSService(object): :type sock_udp: socket :raises: None """ - LOG.info("_handle_udp thread started") + LOG.info('_handle_udp thread started') while True: try: @@ -432,8 +342,8 @@ class DNSService(object): # UDP recvfrom. payload, addr = sock_udp.recvfrom(8192) - LOG.debug("Handling UDP Request from: %(host)s:%(port)d", - {'host': addr[0], 'port': addr[1]}) + LOG.debug('Handling UDP Request from: %(host)s:%(port)d', + {'host': addr[0], 'port': addr[1]}) # Dispatch a thread to handle the query self.tg.add_thread(self._dns_handle_udp_query, sock_udp, addr, @@ -441,12 +351,12 @@ class DNSService(object): except socket.error as e: errname = errno.errorcode[e.args[0]] - LOG.warning("Socket error %(err)s from: %(host)s:%(port)d", + LOG.warning('Socket error %(err)s from: %(host)s:%(port)d', {'host': addr[0], 'port': addr[1], 'err': errname}) except Exception: - LOG.exception("Unknown exception handling UDP request from: " - "%(host)s:%(port)d", + LOG.exception('Unknown exception handling UDP request from: ' + '%(host)s:%(port)d', {'host': addr[0], 'port': addr[1]}) def _dns_handle_udp_query(self, sock, addr, payload): @@ -463,7 +373,7 @@ class DNSService(object): """ try: # Call into the DNS Application itself with the payload and addr - for response in self._dns_application( + for response in self.app( {'payload': payload, 'addr': addr}): # Send back a response only if present @@ -471,7 +381,7 @@ class DNSService(object): sock.sendto(response, addr) except Exception: - LOG.exception("Unhandled exception while processing request from " + LOG.exception('Unhandled exception while processing request from ' "%(host)s:%(port)d", {'host': addr[0], 'port': addr[1]}) diff --git a/designate/service_status.py b/designate/service_status.py index bf4a2044b..b6f0ba241 100644 --- a/designate/service_status.py +++ b/designate/service_status.py @@ -31,14 +31,15 @@ class HeartBeatEmitter(plugin.DriverPlugin): __plugin_ns__ = 'designate.heartbeat_emitter' __plugin_type__ = 'heartbeat_emitter' - def __init__(self, service, threadgroup, status_factory=None): + def __init__(self, service, thread_group, status_factory=None, + *args, **kwargs): super(HeartBeatEmitter, self).__init__() self._service = service self._hostname = CONF.host self._running = False - self._tg = threadgroup + self._tg = thread_group self._tg.add_timer( CONF.heartbeat_emitter.heartbeat_interval, self._emit_heartbeat) diff --git a/designate/sink/service.py b/designate/sink/service.py index 11f3e1569..ea7ddfbca 100644 --- a/designate/sink/service.py +++ b/designate/sink/service.py @@ -27,8 +27,10 @@ LOG = logging.getLogger(__name__) class Service(service.Service): - def __init__(self, threads=None): - super(Service, self).__init__(threads=threads) + def __init__(self): + super(Service, self).__init__( + self.service_name, threads=cfg.CONF['service:sink'].threads + ) # Initialize extensions self.handlers = self._init_extensions() @@ -38,7 +40,8 @@ class Service(service.Service): def service_name(self): return 'sink' - def _init_extensions(self): + @staticmethod + def _init_extensions(): """Loads and prepares all enabled extensions""" enabled_notification_handlers = \ @@ -75,7 +78,7 @@ class Service(service.Service): if targets: self._server.start() - def stop(self): + def stop(self, graceful=True): # Try to shut the connection down, but if we get any sort of # errors, go ahead and ignore them.. as we're shutting down anyway try: @@ -83,7 +86,7 @@ class Service(service.Service): except Exception: pass - super(Service, self).stop() + super(Service, self).stop(graceful) def _get_targets(self): """ diff --git a/designate/tests/test_api/test_service.py b/designate/tests/test_api/test_service.py index 12eafc072..e8e054d4c 100644 --- a/designate/tests/test_api/test_service.py +++ b/designate/tests/test_api/test_service.py @@ -21,8 +21,7 @@ class ApiServiceTest(ApiTestCase): def setUp(self): super(ApiServiceTest, self).setUp() - # Use a random port for the API - self.config(api_port=0, group='service:api') + self.config(listen=['0.0.0.0:0'], group='service:api') self.service = service.Service() diff --git a/designate/tests/test_coordination.py b/designate/tests/test_coordination.py index 7a5125d62..67fbe1497 100644 --- a/designate/tests/test_coordination.py +++ b/designate/tests/test_coordination.py @@ -27,45 +27,57 @@ cfg.CONF.register_opts([ ], group="service:dummy") -class CoordinatedService(coordination.CoordinationMixin, service.Service): +class CoordinatedService(service.Service): + def __init__(self): + super(CoordinatedService, self).__init__() + self.coordination = coordination.Coordination( + self.service_name, self.tg + ) + + def start(self): + super(CoordinatedService, self).start() + self.coordination.start() + @property def service_name(self): return "dummy" -class TestCoordinationMixin(TestCase): +class TestCoordination(TestCase): def setUp(self): - super(TestCoordinationMixin, self).setUp() + super(TestCoordination, self).setUp() + self.name = 'coordination' + self.tg = mock.Mock() self.config(backend_url="zake://", group="coordination") def test_start(self): - service = CoordinatedService() + service = coordination.Coordination(self.name, self.tg) service.start() - self.assertTrue(service._coordination_started) - self.assertIn(service.service_name.encode('utf-8'), - service._coordinator.get_groups().get()) - self.assertIn(service._coordination_id.encode('utf-8'), - service._coordinator.get_members( - service.service_name).get()) + self.assertTrue(service.started) + self.assertIn(self.name.encode('utf-8'), + service.coordinator.get_groups().get()) + self.assertIn(service.coordination_id.encode('utf-8'), + service.coordinator.get_members( + self.name.encode('utf-8')).get()) service.stop() def test_stop(self): - service = CoordinatedService() + service = coordination.Coordination(self.name, self.tg) service.start() service.stop() - self.assertFalse(service._coordination_started) + self.assertFalse(service.started) def test_start_no_coordination(self): self.config(backend_url=None, group="coordination") - service = CoordinatedService() + service = coordination.Coordination(self.name, self.tg) service.start() - self.assertIsNone(service._coordinator) + self.assertIsNone(service.coordinator) def test_stop_no_coordination(self): self.config(backend_url=None, group="coordination") - service = CoordinatedService() - self.assertIsNone(service._coordinator) + service = coordination.Coordination(self.name, self.tg) + self.assertIsNone(service.coordinator) service.start() service.stop() diff --git a/designate/tests/test_mdns/test_service.py b/designate/tests/test_mdns/test_service.py index 6a7e763e9..f28045253 100644 --- a/designate/tests/test_mdns/test_service.py +++ b/designate/tests/test_mdns/test_service.py @@ -34,7 +34,6 @@ def hex_wire(response): class MdnsServiceTest(MdnsTestCase): - # DNS packet with IQUERY opcode query_payload = binascii.a2b_hex( "271209000001000000000000076578616d706c6503636f6d0000010001" @@ -42,6 +41,7 @@ class MdnsServiceTest(MdnsTestCase): expected_response = binascii.a2b_hex( b"271289050001000000000000076578616d706c6503636f6d0000010001" ) + # expected response is an error code REFUSED. The other fields are # id 10002 # opcode IQUERY @@ -58,10 +58,10 @@ class MdnsServiceTest(MdnsTestCase): def setUp(self): super(MdnsServiceTest, self).setUp() - # Use a random port for MDNS - self.config(port=0, group='service:mdns') + self.config(listen=['0.0.0.0:0'], group='service:mdns') self.service = self.start_service('mdns') + self.dns_service = self.service.dns_service self.addr = ['0.0.0.0', 5556] @staticmethod @@ -77,14 +77,14 @@ class MdnsServiceTest(MdnsTestCase): @mock.patch.object(dns.message, 'make_query') def test_handle_empty_payload(self, query_mock): mock_socket = mock.Mock() - self.service._dns_handle_udp_query(mock_socket, self.addr, - ' '.encode('utf-8')) + self.dns_service._dns_handle_udp_query(mock_socket, self.addr, + ' '.encode('utf-8')) query_mock.assert_called_once_with('unknown', dns.rdatatype.A) def test_handle_udp_payload(self): mock_socket = mock.Mock() - self.service._dns_handle_udp_query(mock_socket, self.addr, - self.query_payload) + self.dns_service._dns_handle_udp_query(mock_socket, self.addr, + self.query_payload) mock_socket.sendto.assert_called_once_with(self.expected_response, self.addr) @@ -93,7 +93,7 @@ class MdnsServiceTest(MdnsTestCase): mock_socket = mock.Mock() mock_socket.recv.side_effect = ['X', 'boo'] # X will fail unpack - self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket) + self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket) self.assertEqual(1, mock_socket.recv.call_count) self.assertEqual(1, mock_socket.close.call_count) @@ -103,14 +103,14 @@ class MdnsServiceTest(MdnsTestCase): pay_len = struct.pack("!H", len(payload)) mock_socket.recv.side_effect = [pay_len, payload, socket.timeout] - self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket) + self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket) self.assertEqual(3, mock_socket.recv.call_count) self.assertEqual(1, mock_socket.sendall.call_count) self.assertEqual(1, mock_socket.close.call_count) wire = mock_socket.sendall.call_args[0][0] expected_length_raw = wire[:2] - (expected_length, ) = struct.unpack('!H', expected_length_raw) + (expected_length,) = struct.unpack('!H', expected_length_raw) self.assertEqual(len(wire), expected_length + 2) self.assertEqual(self.expected_response, wire[2:]) @@ -130,7 +130,7 @@ class MdnsServiceTest(MdnsTestCase): pay_len, payload, pay_len, payload, ] - self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket) + self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket) self.assertEqual(11, mock_socket.recv.call_count) self.assertEqual(5, mock_socket.sendall.call_count) @@ -152,7 +152,7 @@ class MdnsServiceTest(MdnsTestCase): pay_len, payload, pay_len, payload, ] - self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket) + self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket) self.assertEqual(11, mock_socket.recv.call_count) self.assertEqual(5, mock_socket.sendall.call_count) @@ -171,7 +171,7 @@ class MdnsServiceTest(MdnsTestCase): pay_len, payload, pay_len, payload, ] - self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket) + self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket) self.assertEqual(11, mock_socket.recv.call_count) self.assertEqual(4, mock_socket.sendall.call_count) diff --git a/designate/tests/unit/agent/backends/test_bind9.py b/designate/tests/unit/agent/backends/test_bind9.py index 91a422d1d..a88ff0d89 100644 --- a/designate/tests/unit/agent/backends/test_bind9.py +++ b/designate/tests/unit/agent/backends/test_bind9.py @@ -27,7 +27,7 @@ class Bind9AgentBackendTestCase(designate.tests.TestCase): def setUp(self): super(Bind9AgentBackendTestCase, self).setUp() - self.CONF.set_override('port', 0, 'service:agent') + self.CONF.set_override('listen', ['0.0.0.0:0'], 'service:agent') self.backend = impl_bind9.Bind9Backend('foo') diff --git a/designate/tests/unit/agent/backends/test_denominator.py b/designate/tests/unit/agent/backends/test_denominator.py index 89c354503..36a728068 100644 --- a/designate/tests/unit/agent/backends/test_denominator.py +++ b/designate/tests/unit/agent/backends/test_denominator.py @@ -28,7 +28,7 @@ class DenominatorAgentBackendTestCase(designate.tests.TestCase): def setUp(self): super(DenominatorAgentBackendTestCase, self).setUp() - self.CONF.set_override('port', 0, 'service:agent') + self.CONF.set_override('listen', ['0.0.0.0:0'], 'service:agent') self.backend = impl_denominator.DenominatorBackend('foo') diff --git a/designate/tests/unit/agent/backends/test_fake.py b/designate/tests/unit/agent/backends/test_fake.py index e4b327789..00fd64147 100644 --- a/designate/tests/unit/agent/backends/test_fake.py +++ b/designate/tests/unit/agent/backends/test_fake.py @@ -22,7 +22,7 @@ class FakeAgentBackendTestCase(designate.tests.TestCase): def setUp(self): super(FakeAgentBackendTestCase, self).setUp() - self.CONF.set_override('port', 0, 'service:agent') + self.CONF.set_override('listen', ['0.0.0.0:0'], 'service:agent') self.backend = impl_fake.FakeBackend('foo') diff --git a/designate/tests/unit/agent/test_service.py b/designate/tests/unit/agent/test_service.py index 94cde354a..f4d47c9f6 100644 --- a/designate/tests/unit/agent/test_service.py +++ b/designate/tests/unit/agent/test_service.py @@ -21,30 +21,40 @@ from designate import utils from designate.agent import service from designate.backend import agent_backend from designate.backend.agent_backend import impl_fake +from designate.tests import fixtures class AgentServiceTest(designate.tests.TestCase): def setUp(self): super(AgentServiceTest, self).setUp() + self.stdlog = fixtures.StandardLogging() + self.useFixture(self.stdlog) - self.CONF.set_override('port', 0, 'service:agent') + self.CONF.set_override('listen', ['0.0.0.0:0'], 'service:agent') self.CONF.set_override('notify_delay', 0, 'service:agent') self.service = service.Service() - self.service._start = mock.Mock() - self.service._rpc_server = mock.Mock() + self.service.dns_service._start = mock.Mock() + + def test_service_start(self): + self.service.start() + + self.assertTrue(self.service.dns_service._start.called) + + def test_service_stop(self): + self.service.dns_service.stop = mock.Mock() + self.service.backend.stop = mock.Mock() + + self.service.stop() + + self.assertTrue(self.service.dns_service.stop.called) + self.assertTrue(self.service.backend.stop.called) + + self.assertIn('Stopping agent service', self.stdlog.logger.output) def test_service_name(self): self.assertEqual('agent', self.service.service_name) - def test_start(self): - self.service.start() - - self.assertTrue(self.service._start.called) - - def test_stop(self): - self.service.stop() - def test_get_backend(self): backend = agent_backend.get_backend('fake', agent_service=self.service) self.assertIsInstance(backend, impl_fake.FakeBackend) @@ -52,17 +62,15 @@ class AgentServiceTest(designate.tests.TestCase): @mock.patch.object(utils, 'cache_result') def test_get_dns_application(self, mock_cache_result): self.assertIsInstance( - self.service._dns_application, dnsutils.SerializationMiddleware + self.service.dns_application, dnsutils.SerializationMiddleware ) @mock.patch.object(utils, 'cache_result') def test_get_dns_application_with_notify_delay(self, mock_cache_result): self.service = service.Service() - self.service._start = mock.Mock() - self.service._rpc_server = mock.Mock() self.CONF.set_override('notify_delay', 1.0, 'service:agent') self.assertIsInstance( - self.service._dns_application, dnsutils.SerializationMiddleware + self.service.dns_application, dnsutils.SerializationMiddleware ) diff --git a/designate/tests/unit/mdns/test_service.py b/designate/tests/unit/mdns/test_service.py index e305a5b06..7c7e45def 100644 --- a/designate/tests/unit/mdns/test_service.py +++ b/designate/tests/unit/mdns/test_service.py @@ -21,27 +21,42 @@ from oslo_config import fixture as cfg_fixture import designate.dnsutils import designate.rpc import designate.service -import designate.storage.base +from designate import storage import designate.utils from designate.mdns import handler from designate.mdns import service +from designate.tests import fixtures CONF = cfg.CONF class MdnsServiceTest(oslotest.base.BaseTestCase): - @mock.patch.object(designate.rpc, 'get_server') - def setUp(self, mock_rpc_server): + @mock.patch.object(storage, 'get_storage', mock.Mock()) + def setUp(self): super(MdnsServiceTest, self).setUp() + self.stdlog = fixtures.StandardLogging() + self.useFixture(self.stdlog) + self.useFixture(cfg_fixture.Config(CONF)) self.service = service.Service() - @mock.patch.object(designate.service.DNSService, '_start') - def test_service_start(self, mock_service_start): + @mock.patch.object(designate.service.DNSService, 'start') + @mock.patch.object(designate.service.RPCService, 'start') + def test_service_start(self, mock_rpc_start, mock_dns_start): self.service.start() - self.assertTrue(mock_service_start.called) + self.assertTrue(mock_dns_start.called) + self.assertTrue(mock_rpc_start.called) + + def test_service_stop(self): + self.service.dns_service.stop = mock.Mock() + + self.service.stop() + + self.assertTrue(self.service.dns_service.stop.called) + + self.assertIn('Stopping mdns service', self.stdlog.logger.output) def test_service_name(self): self.assertEqual('mdns', self.service.service_name) @@ -51,17 +66,13 @@ class MdnsServiceTest(oslotest.base.BaseTestCase): self.service = service.Service() - self.assertEqual('test-topic', self.service._rpc_topic) + self.assertEqual('test-topic', self.service.rpc_topic) self.assertEqual('mdns', self.service.service_name) - def test_rpc_endpoints(self): - endpoints = self.service._rpc_endpoints - - self.assertIsInstance(endpoints[0], service.notify.NotifyEndpoint) - self.assertIsInstance(endpoints[1], service.xfr.XfrEndpoint) - - @mock.patch.object(designate.storage.base.Storage, 'get_driver') + @mock.patch.object(storage, 'get_storage') def test_storage_driver(self, mock_get_driver): + self.service._storage = None + mock_driver = mock.MagicMock() mock_driver.name = 'noop_driver' mock_get_driver.return_value = mock_driver @@ -70,16 +81,12 @@ class MdnsServiceTest(oslotest.base.BaseTestCase): self.assertTrue(mock_get_driver.called) - @mock.patch.object(handler, 'RequestHandler', name='reqh') - @mock.patch.object(designate.service.DNSService, '_start') + @mock.patch.object(handler, 'RequestHandler') + @mock.patch.object(designate.service.DNSService, 'start') @mock.patch.object(designate.utils, 'cache_result') - @mock.patch.object(designate.storage.base.Storage, 'get_driver') - def test_dns_application(self, mock_req_handler, mock_cache_result, - mock_service_start, mock_get_driver): - mock_driver = mock.MagicMock() - mock_driver.name = 'noop_driver' - mock_get_driver.return_value = mock_driver + def test_dns_application(self, mock_cache_result, mock_dns_start, + mock_request_handler): - app = self.service._dns_application + app = self.service.dns_application self.assertIsInstance(app, designate.dnsutils.DNSMiddleware) diff --git a/designate/tests/unit/pool_manager/test_service.py b/designate/tests/unit/pool_manager/test_service.py index e96324fff..9ab3f808a 100644 --- a/designate/tests/unit/pool_manager/test_service.py +++ b/designate/tests/unit/pool_manager/test_service.py @@ -102,7 +102,7 @@ class PoolManagerInitTest(tests.TestCase): self.service = service.Service() self.assertEqual('test-topic.794ccc2c-d751-44fe-b57f-8894c9f5c842', - self.service._rpc_topic) + self.service.rpc_topic) self.assertEqual('pool_manager', self.service.service_name) @mock.patch('designate.service.RPCService.start') diff --git a/designate/tests/unit/producer/test_service.py b/designate/tests/unit/producer/test_service.py index d54688714..1379a5ff1 100644 --- a/designate/tests/unit/producer/test_service.py +++ b/designate/tests/unit/producer/test_service.py @@ -19,18 +19,20 @@ Unit-test Producer service """ import mock +import oslotest.base from oslo_config import cfg from oslo_config import fixture as cfg_fixture -from oslotest import base as test from designate.producer import service +import designate.service +from designate.tests import fixtures from designate.tests.unit import RoObject CONF = cfg.CONF -@mock.patch.object(service.rpcapi.CentralAPI, 'get_instance') -class ProducerTest(test.BaseTestCase): +@mock.patch.object(service.rpcapi.CentralAPI, 'get_instance', mock.Mock()) +class ProducerTest(oslotest.base.BaseTestCase): def setUp(self): self.useFixture(cfg_fixture.Config(CONF)) @@ -46,28 +48,45 @@ class ProducerTest(test.BaseTestCase): 'producer_task:zone_purge': '', }) super(ProducerTest, self).setUp() - self.service = service.Service() - self.service._storage = mock.Mock() - self.service._rpc_server = mock.Mock() - self.service._quota = mock.Mock() - self.service.quota.limit_check = mock.Mock() + self.stdlog = fixtures.StandardLogging() + self.useFixture(self.stdlog) - def test_service_name(self, _): + self.service = service.Service() + self.service.rpc_server = mock.Mock() + self.service._storage = mock.Mock() + self.service._quota = mock.Mock() + self.service._quota.limit_check = mock.Mock() + + @mock.patch.object(service.tasks, 'PeriodicTask') + @mock.patch.object(service.coordination, 'Partitioner') + @mock.patch.object(designate.service.RPCService, 'start') + def test_service_start(self, mock_rpc_start, mock_partitioner, + mock_periodic_task): + self.service.coordination = mock.Mock() + + self.service.start() + + self.assertTrue(mock_rpc_start.called) + + def test_service_stop(self): + self.service.coordination.stop = mock.Mock() + + self.service.stop() + + self.assertTrue(self.service.coordination.stop.called) + + self.assertIn('Stopping producer service', self.stdlog.logger.output) + + def test_service_name(self): self.assertEqual('producer', self.service.service_name) - def test_producer_rpc_topic(self, _): + def test_producer_rpc_topic(self): CONF.set_override('topic', 'test-topic', 'service:producer') self.service = service.Service() - self.assertEqual('test-topic', self.service._rpc_topic) + self.assertEqual('test-topic', self.service.rpc_topic) self.assertEqual('producer', self.service.service_name) - def test_central_api(self, _): - capi = self.service.central_api - self.assertIsInstance(capi, mock.MagicMock) - - @mock.patch.object(service.tasks, 'PeriodicTask') - @mock.patch.object(service.coordination, 'Partitioner') - def test_stark(self, _, mock_partitioner, mock_PeriodicTask): - self.service.start() + def test_central_api(self): + self.assertIsInstance(self.service.central_api, mock.Mock) diff --git a/designate/tests/unit/sink/test_service.py b/designate/tests/unit/sink/test_service.py index 13ac41bc4..b792b1f2a 100644 --- a/designate/tests/unit/sink/test_service.py +++ b/designate/tests/unit/sink/test_service.py @@ -11,14 +11,13 @@ # under the License.mport threading import mock +import designate.tests import designate.rpc -from designate import tests from designate.sink import service from designate.tests import fixtures -class TestSinkService(tests.TestCase): - +class TestSinkService(designate.tests.TestCase): def setUp(self): super(TestSinkService, self).setUp() self.stdlog = fixtures.StandardLogging() @@ -35,8 +34,7 @@ class TestSinkService(tests.TestCase): self.assertTrue(mock_notification_listener.called) - @mock.patch.object(designate.rpc, 'get_notification_listener') - def test_service_stop(self, mock_notification_listener): + def test_service_stop(self): self.service.stop() self.assertIn('Stopping sink service', self.stdlog.logger.output) diff --git a/designate/tests/unit/test_heartbeat.py b/designate/tests/unit/test_heartbeat.py new file mode 100644 index 000000000..fbb01862f --- /dev/null +++ b/designate/tests/unit/test_heartbeat.py @@ -0,0 +1,53 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import mock +import oslotest.base +from oslo_config import cfg +from oslo_config import fixture as cfg_fixture + +from designate import service + +CONF = cfg.CONF + + +class HeartbeatTest(oslotest.base.BaseTestCase): + def setUp(self): + super(HeartbeatTest, self).setUp() + self.useFixture(cfg_fixture.Config(CONF)) + + CONF.set_override('emitter_type', 'noop', 'heartbeat_emitter') + + self.mock_tg = mock.Mock() + self.heartbeat = service.Heartbeat('test', self.mock_tg) + + def test_get_status(self): + self.assertEqual(('UP', {}, {},), self.heartbeat.get_status()) + + def test_get_heartbeat_emitter(self): + self.assertEqual( + 'noop', self.heartbeat.heartbeat_emitter.__plugin_name__ + ) + + def test_start_heartbeat(self): + self.assertFalse(self.heartbeat.heartbeat_emitter._running) + + self.heartbeat.start() + + self.assertTrue(self.heartbeat.heartbeat_emitter._running) + + def test_stop_heartbeat(self): + self.assertFalse(self.heartbeat.heartbeat_emitter._running) + + self.heartbeat.start() + self.heartbeat.stop() + + self.assertFalse(self.heartbeat.heartbeat_emitter._running) diff --git a/designate/tests/unit/workers/test_service.py b/designate/tests/unit/workers/test_service.py index aca00dd48..37072f110 100644 --- a/designate/tests/unit/workers/test_service.py +++ b/designate/tests/unit/workers/test_service.py @@ -24,6 +24,7 @@ import designate.tests from designate import backend from designate import exceptions from designate import objects +from designate.tests import fixtures from designate.worker import processing from designate.worker import service @@ -33,6 +34,8 @@ CONF = cfg.CONF class TestService(oslotest.base.BaseTestCase): def setUp(self): super(TestService, self).setUp() + self.stdlog = fixtures.StandardLogging() + self.useFixture(self.stdlog) self.useFixture(cfg_fixture.Config(CONF)) self.context = mock.Mock() @@ -40,10 +43,16 @@ class TestService(oslotest.base.BaseTestCase): self.service = service.Service() @mock.patch.object(designate.service.RPCService, 'start') - def test_service_start(self, mock_service_start): + def test_service_start(self, mock_rpc_start): self.service.start() - self.assertTrue(mock_service_start.called) + self.assertTrue(mock_rpc_start.called) + + @mock.patch.object(designate.rpc, 'get_notification_listener') + def test_service_stop(self, mock_notification_listener): + self.service.stop() + + self.assertIn('Stopping worker service', self.stdlog.logger.output) def test_service_name(self): self.assertEqual('worker', self.service.service_name) @@ -53,7 +62,7 @@ class TestService(oslotest.base.BaseTestCase): self.service = service.Service() - self.assertEqual('test-topic', self.service._rpc_topic) + self.assertEqual('test-topic', self.service.rpc_topic) self.assertEqual('worker', self.service.service_name) def test_central_api(self): diff --git a/designate/worker/service.py b/designate/worker/service.py index b08cf9239..af86cd5cd 100644 --- a/designate/worker/service.py +++ b/designate/worker/service.py @@ -41,25 +41,38 @@ class AlsoNotifyTask(object): pass -class Service(service.RPCService, service.Service): +class Service(service.RPCService): RPC_API_VERSION = '1.0' target = messaging.Target(version=RPC_API_VERSION) + def __init__(self): + self._central_api = None + self._storage = None + + self._executor = None + self._pools_map = None + + super(Service, self).__init__( + self.service_name, cfg.CONF['service:worker'].topic, + threads=cfg.CONF['service:worker'].threads, + ) + @property def central_api(self): - if not hasattr(self, '_central_api'): + if not self._central_api: self._central_api = central_api.CentralAPI.get_instance() return self._central_api - def _setup_target_backends(self, pool): + @staticmethod + def _setup_target_backends(pool): for target in pool.targets: # Fetch an instance of the Backend class target.backend = backend.get_backend(target) LOG.info('%d targets setup', len(pool.targets)) - if len(pool.targets) == 0: + if not pool.targets: raise exceptions.NoPoolTargetsConfigured() return pool @@ -97,21 +110,21 @@ class Service(service.RPCService, service.Service): @property def storage(self): - if not hasattr(self, '_storage'): + if not self._storage: storage_driver = cfg.CONF['service:worker'].storage_driver self._storage = storage.get_storage(storage_driver) return self._storage @property def executor(self): - if not hasattr(self, '_executor'): + if not self._executor: # TODO(elarson): Create this based on config self._executor = processing.Executor() return self._executor @property def pools_map(self): - if not hasattr(self, '_pools_map'): + if self._pools_map is None: self._pools_map = {} return self._pools_map @@ -125,6 +138,9 @@ class Service(service.RPCService, service.Service): super(Service, self).start() LOG.info('Started worker') + def stop(self, graceful=True): + super(Service, self).stop(graceful) + def _do_zone_action(self, context, zone): pool = self.get_pool(zone.pool_id) all_tasks = [] diff --git a/etc/designate/designate-config-generator.conf b/etc/designate/designate-config-generator.conf index d35bbceb8..4ac8762da 100644 --- a/etc/designate/designate-config-generator.conf +++ b/etc/designate/designate-config-generator.conf @@ -8,6 +8,7 @@ namespace = oslo.policy namespace = oslo.service.periodic_task namespace = oslo.service.service namespace = oslo.service.sslutils +namespace = oslo.service.wsgi namespace = oslo.db namespace = oslo.middleware namespace = oslo.concurrency diff --git a/releasenotes/notes/new-service-layer-8023c242de89075a.yaml b/releasenotes/notes/new-service-layer-8023c242de89075a.yaml new file mode 100644 index 000000000..06f87b53e --- /dev/null +++ b/releasenotes/notes/new-service-layer-8023c242de89075a.yaml @@ -0,0 +1,18 @@ +--- +upgrade: + - | + The previously deprecated options ``api_host``, ``api_port``, ``host`` and + ``port`` have been permanently removed and are replaced by ``listen``. + + e.g. + + .. code-block:: ini + + [service:api] + listen = 0.0.0.0:9001 + + .. + - | + The Designate ``sink`` service will now use the heartbeat reporting system to + report its status. This was already the case for all other Designate + services.