Refactored service layer

The service layer has been simplified by removing unnecessary
abstraction and bringing the implementation more in line with
other OpenStack projects.

Moved the Heartbeat code out of the Service class and into the
console scripts; we only need one instance of the Heartbeat
emitter.
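
As an illustration, the designate-central entry point now ends up
looking roughly like the sketch below (condensed from the
designate/cmd/central.py hunk in this change; config parsing,
logging and hookpoint setup are elided, and the other console
scripts follow the same shape):

    import designate.conf
    from designate import service
    from designate.central import service as central_service

    CONF = designate.conf.CONF

    def main():
        server = central_service.Service()
        # One Heartbeat per process, owned by the script, not the service.
        heartbeat = service.Heartbeat(server.service_name, server.tg,
                                      rpc_api=server)
        service.serve(server, workers=CONF['service:central'].workers)
        heartbeat.start()
        service.wait()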

Cleaned up the WSGI code by reusing the oslo_service.wsgi
helpers.
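
The new wrapper is roughly the following (condensed from the
designate/service.py hunk in this change; the pool_size argument
and error handling are elided):

    import designate.conf
    from oslo_service import sslutils
    from oslo_service import wsgi
    from oslo_utils import netutils

    CONF = designate.conf.CONF

    class WSGIService(Service):  # Service is the new base class in designate/service.py
        def __init__(self, app, name, listen, max_url_len=None):
            super(WSGIService, self).__init__(name)
            self.app = app
            self.listen = listen
            self.servers = []
            for address in self.listen:
                host, port = netutils.parse_host_port(address)
                # One oslo_service.wsgi.Server per configured host:port pair.
                self.servers.append(wsgi.Server(
                    CONF, name, app, host=host, port=port,
                    use_ssl=sslutils.is_enabled(CONF),
                    max_url_len=max_url_len))

        def start(self):
            for server in self.servers:
                server.start()
            super(WSGIService, self).start()

        def stop(self, graceful=True):
            for server in self.servers:
                server.stop()
            super(WSGIService, self).stop(graceful)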

* Added a Heartbeat to designate-sink.
* Cleaned up and refactored the service layer.
* Fixed various bugs, e.g. errors on shutdown.
* Removed the deprecated host, port, api_host and api_port options
  (replaced by 'listen').
* Simplified the Heartbeat implementation (see the sketch below).
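
For reference, the simplified Heartbeat is now little more than a
thin holder around the status emitter (condensed from the
designate/service.py hunk in this change; the start/stop methods
that delegate to the emitter are elided):

    import designate.conf
    from designate import service_status

    CONF = designate.conf.CONF

    class Heartbeat(object):
        def __init__(self, name, tg, rpc_api=None):
            self.name = name
            self.tg = tg
            self._status = 'UP'
            self._stats = {}
            self._capabilities = {}
            # The emitter driver periodically reports the status returned
            # by get_status().
            emitter_cls = service_status.HeartBeatEmitter.get_driver(
                CONF.heartbeat_emitter.emitter_type)
            self.heartbeat_emitter = emitter_cls(
                self.name, self.tg,
                status_factory=self.get_status, rpc_api=rpc_api)

        def get_status(self):
            return self._status, self._stats, self._capabilities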

Closes-Bug: #1442141
Change-Id: I536b92407bf6ca5bddf4c048909cd13d4e094d26
Gerrit ref: changes/32/678432/40
Author: Erik Olof Gunnar Andersson
Parent: 23f6a79aef
Commit: a09064a5d1
39 changed files:

  designate/__init__.py (1)
  designate/agent/service.py (36)
  designate/api/service.py (29)
  designate/central/service.py (29)
  designate/cmd/agent.py (5)
  designate/cmd/api.py (6)
  designate/cmd/central.py (9)
  designate/cmd/mdns.py (6)
  designate/cmd/pool_manager.py (9)
  designate/cmd/producer.py (6)
  designate/cmd/sink.py (5)
  designate/cmd/worker.py (6)
  designate/cmd/zone_manager.py (8)
  designate/conf/agent.py (8)
  designate/conf/api.py (8)
  designate/conf/mdns.py (8)
  designate/coordination.py (45)
  designate/mdns/service.py (42)
  designate/pool_manager/service.py (60)
  designate/producer/service.py (34)
  designate/service.py (316)
  designate/service_status.py (5)
  designate/sink/service.py (13)
  designate/tests/test_api/test_service.py (3)
  designate/tests/test_coordination.py (44)
  designate/tests/test_mdns/test_service.py (26)
  designate/tests/unit/agent/backends/test_bind9.py (2)
  designate/tests/unit/agent/backends/test_denominator.py (2)
  designate/tests/unit/agent/backends/test_fake.py (2)
  designate/tests/unit/agent/test_service.py (34)
  designate/tests/unit/mdns/test_service.py (53)
  designate/tests/unit/pool_manager/test_service.py (2)
  designate/tests/unit/producer/test_service.py (51)
  designate/tests/unit/sink/test_service.py (8)
  designate/tests/unit/test_heartbeat.py (53)
  designate/tests/unit/workers/test_service.py (15)
  designate/worker/service.py (30)
  etc/designate/designate-config-generator.conf (1)
  releasenotes/notes/new-service-layer-8023c242de89075a.yaml (18)

@@ -24,7 +24,6 @@ from oslo_concurrency import lockutils
import oslo_messaging as messaging
_EXTRA_DEFAULT_LOG_LEVELS = [
'eventlet.wsgi.server=WARN',
'kazoo.client=WARN',
'keystone=INFO',
'oslo_service.loopingcall=WARN',

@@ -37,22 +37,41 @@ from designate.utils import DEFAULT_AGENT_PORT
CONF = cfg.CONF
class Service(service.DNSService, service.Service):
class Service(service.Service):
_dns_default_port = DEFAULT_AGENT_PORT
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
def __init__(self):
super(Service, self).__init__(
self.service_name, threads=cfg.CONF['service:agent'].threads
)
self.dns_service = service.DNSService(
self.dns_application, self.tg,
cfg.CONF['service:agent'].listen,
cfg.CONF['service:agent'].tcp_backlog,
cfg.CONF['service:agent'].tcp_recv_timeout,
)
backend_driver = cfg.CONF['service:agent'].backend_driver
self.backend = agent_backend.get_backend(backend_driver, self)
def start(self):
super(Service, self).start()
self.dns_service.start()
self.backend.start()
def stop(self, graceful=False):
self.dns_service.stop()
self.backend.stop()
super(Service, self).stop(graceful)
@property
def service_name(self):
return 'agent'
@property
@utils.cache_result
def _dns_application(self):
def dns_application(self):
# Create an instance of the RequestHandler class
application = handler.RequestHandler()
if cfg.CONF['service:agent'].notify_delay > 0.0:
@@ -60,12 +79,3 @@ class Service(service.DNSService, service.Service):
application = dnsutils.SerializationMiddleware(application)
return application
def start(self):
super(Service, self).start()
self.backend.start()
def stop(self):
super(Service, self).stop()
# TODO(kiall): Shouldn't we be stppping the backend here too? To fix
# in another review.

@@ -18,41 +18,32 @@ from oslo_log import log as logging
from paste import deploy
from designate import exceptions
from designate import utils
from designate import service
from designate import service_status
from designate import utils
LOG = logging.getLogger(__name__)
class Service(service.WSGIService, service.Service):
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
emitter_cls = service_status.HeartBeatEmitter.get_driver(
cfg.CONF.heartbeat_emitter.emitter_type
)
self.heartbeat_emitter = emitter_cls(
self.service_name, self.tg, status_factory=self._get_status
class Service(service.WSGIService):
def __init__(self):
super(Service, self).__init__(
self.wsgi_application,
self.service_name,
cfg.CONF['service:api'].listen,
)
def start(self):
super(Service, self).start()
self.heartbeat_emitter.start()
def _get_status(self):
status = "UP"
stats = {}
capabilities = {}
return status, stats, capabilities
def stop(self, graceful=True):
super(Service, self).stop(graceful)
@property
def service_name(self):
return 'api'
@property
def _wsgi_application(self):
def wsgi_application(self):
api_paste_config = cfg.CONF['service:api'].api_paste_config
config_paths = utils.find_config(api_paste_config)

@@ -184,37 +184,40 @@ def notification(notification_type):
return outer
class Service(service.RPCService, service.Service):
class Service(service.RPCService):
RPC_API_VERSION = '6.2'
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
def __init__(self):
self._scheduler = None
self._storage = None
self._quota = None
self.network_api = network_api.get_network_api(cfg.CONF.network_api)
super(Service, self).__init__(
self.service_name, cfg.CONF['service:central'].topic,
threads=cfg.CONF['service:central'].threads,
)
# update_service_status needs is called by the emitter so we pass
# ourselves as the rpc_api.
self.heartbeat_emitter.rpc_api = self
self.network_api = network_api.get_network_api(cfg.CONF.network_api)
@property
def scheduler(self):
if not hasattr(self, '_scheduler'):
if not self._scheduler:
# Get a scheduler instance
self._scheduler = scheduler.get_scheduler(storage=self.storage)
return self._scheduler
@property
def quota(self):
if not hasattr(self, '_quota'):
if not self._quota:
# Get a quota manager instance
self._quota = quota.get_quota()
return self._quota
@property
def storage(self):
if not hasattr(self, '_storage'):
if not self._storage:
# Get a storage connection
storage_driver = cfg.CONF['service:central'].storage_driver
self._storage = storage.get_storage(storage_driver)
@@ -232,8 +235,8 @@ class Service(service.RPCService, service.Service):
super(Service, self).start()
def stop(self):
super(Service, self).stop()
def stop(self, graceful=True):
super(Service, self).stop(graceful)
@property
def mdns_api(self):
@@ -251,7 +254,7 @@ class Service(service.RPCService, service.Service):
def zone_api(self):
# TODO(timsim): Remove this when pool_manager_api is gone
if cfg.CONF['service:worker'].enabled:
return self.worker_api
return self.worker_api
return self.pool_manager_api
def _is_valid_zone_name(self, context, zone_name):

@@ -28,7 +28,6 @@ from designate.agent import service as agent_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.agent', group='service:agent')
CONF.import_opt('threads', 'designate.agent', group='service:agent')
def main():
@@ -38,6 +37,8 @@ def main():
hookpoints.log_hook_setup()
server = agent_service.Service(threads=CONF['service:agent'].threads)
server = agent_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:agent'].workers)
heartbeat.start()
service.wait()

@@ -29,7 +29,6 @@ from designate.api import service as api_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.api', group='service:api')
CONF.import_opt('threads', 'designate.api', group='service:api')
cfg.CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token')
@@ -40,7 +39,8 @@ def main():
hookpoints.log_hook_setup()
server = api_service.Service(threads=CONF['service:api'].threads)
server = api_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:api'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

@@ -23,12 +23,11 @@ from designate import hookpoints
from designate import service
from designate import utils
from designate import version
from designate.central import service as central
from designate.central import service as central_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.central', group='service:central')
CONF.import_opt('threads', 'designate.central', group='service:central')
def main():
@@ -38,7 +37,9 @@ def main():
hookpoints.log_hook_setup()
server = central.Service(threads=CONF['service:central'].threads)
server = central_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg,
rpc_api=server)
service.serve(server, workers=CONF['service:central'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

@@ -28,7 +28,6 @@ from designate.mdns import service as mdns_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.mdns', group='service:mdns')
CONF.import_opt('threads', 'designate.mdns', group='service:mdns')
def main():
@@ -38,7 +37,8 @@ def main():
hookpoints.log_hook_setup()
server = mdns_service.Service(threads=CONF['service:mdns'].threads)
server = mdns_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:mdns'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

@@ -30,8 +30,6 @@ LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.pool_manager',
group='service:pool_manager')
CONF.import_opt('threads', 'designate.pool_manager',
group='service:pool_manager')
def main():
@@ -53,12 +51,11 @@ def main():
'designate-worker', version='newton',
removal_version='rocky')
server = pool_manager_service.Service(
threads=CONF['service:pool_manager'].threads
)
server = pool_manager_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
hookpoints.log_hook_setup()
service.serve(server, workers=CONF['service:pool_manager'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

@@ -28,7 +28,6 @@ from designate.producer import service as producer_service
LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.producer', group='service:producer')
CONF.import_opt('threads', 'designate.producer', group='service:producer')
def main():
@@ -46,7 +45,8 @@ def main():
hookpoints.log_hook_setup()
server = producer_service.Service(threads=CONF['service:producer'].threads)
server = producer_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:producer'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

@@ -28,7 +28,6 @@ from designate.sink import service as sink_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.sink', group='service:sink')
CONF.import_opt('threads', 'designate.sink', group='service:sink')
def main():
@@ -38,6 +37,8 @@ def main():
hookpoints.log_hook_setup()
server = sink_service.Service(threads=CONF['service:sink'].threads)
server = sink_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:sink'].workers)
heartbeat.start()
service.wait()

@@ -28,7 +28,6 @@ from designate.worker import service as worker_service
LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.worker', group='service:worker')
CONF.import_opt('threads', 'designate.worker', group='service:worker')
def main():
@@ -46,7 +45,8 @@ def main():
hookpoints.log_hook_setup()
server = worker_service.Service(threads=CONF['service:worker'].threads)
server = worker_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:worker'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

@@ -30,8 +30,6 @@ LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.producer',
group='service:zone_manager')
CONF.import_opt('threads', 'designate.producer',
group='service:zone_manager')
def main():
@@ -56,8 +54,8 @@ def main():
LOG.warning('Starting designate-producer under the zone-manager name')
server = producer_service.Service(
threads=CONF['service:zone_manager'].threads)
server = producer_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:zone_manager'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

@@ -27,14 +27,6 @@ AGENT_OPTS = [
help='Number of agent worker processes to spawn'),
cfg.IntOpt('threads', default=1000,
help='Number of agent greenthreads to spawn'),
cfg.IPOpt('host',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='Agent Bind Host'),
cfg.PortOpt('port',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='Agent Port Number'),
cfg.ListOpt('listen',
default=['0.0.0.0:%d' % DEFAULT_AGENT_PORT],
help='Agent host:port pairs to listen on'),

@@ -33,14 +33,6 @@ API_OPTS = [
'the hostname, port, and any paths that are added'
'to the base of Designate is URLs,'
'For example http://dns.openstack.example.com/dns'),
cfg.IPOpt('api_host',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='API Bind Host'),
cfg.PortOpt('api_port',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='API Port Number'),
cfg.ListOpt('listen',
default=['0.0.0.0:9001'],
help='API host:port pairs to listen on'),

@@ -26,14 +26,6 @@ MDNS_OPTS = [
help='Number of mdns worker processes to spawn'),
cfg.IntOpt('threads', default=1000,
help='Number of mdns greenthreads to spawn'),
cfg.IPOpt('host',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='mDNS Bind Host'),
cfg.PortOpt('port',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='mDNS Port Number'),
cfg.ListOpt('listen',
default=['0.0.0.0:%d' % DEFAULT_MDNS_PORT],
help='mDNS host:port pairs to listen on'),

@@ -35,21 +35,31 @@ def _retry_if_tooz_error(exception):
return isinstance(exception, tooz.coordination.ToozError)
class CoordinationMixin(object):
def __init__(self, *args, **kwargs):
super(CoordinationMixin, self).__init__(*args, **kwargs)
class Coordination(object):
def __init__(self, name, tg):
self.name = name
self.tg = tg
self.coordination_id = None
self._coordinator = None
self._started = False
@property
def coordinator(self):
return self._coordinator
@property
def started(self):
return self._started
def start(self):
self._coordination_id = ":".join([CONF.host, generate_uuid()])
self.coordination_id = ":".join([CONF.host, generate_uuid()])
if CONF.coordination.backend_url is not None:
backend_url = CONF.coordination.backend_url
self._coordinator = tooz.coordination.get_coordinator(
backend_url, self._coordination_id.encode())
self._coordination_started = False
backend_url, self.coordination_id.encode())
self._started = False
self.tg.add_timer(CONF.coordination.heartbeat_interval,
self._coordinator_heartbeat)
@@ -61,25 +71,22 @@ class CoordinationMixin(object):
"coordination functionality will be disabled. "
"Please configure a coordination backend.")
super(CoordinationMixin, self).start()
if self._coordinator is not None:
while not self._coordination_started:
while not self._started:
try:
self._coordinator.start()
try:
create_group_req = self._coordinator.create_group(
self.service_name)
self.name)
create_group_req.get()
except tooz.coordination.GroupAlreadyExist:
pass
join_group_req = self._coordinator.join_group(
self.service_name)
join_group_req = self._coordinator.join_group(self.name)
join_group_req.get()
self._coordination_started = True
self._started = True
except Exception:
LOG.warning("Failed to start Coordinator:", exc_info=True)
@@ -87,18 +94,16 @@ class CoordinationMixin(object):
def stop(self):
if self._coordinator is not None:
self._coordination_started = False
self._started = False
leave_group_req = self._coordinator.leave_group(self.service_name)
leave_group_req = self._coordinator.leave_group(self.name)
leave_group_req.get()
self._coordinator.stop()
super(CoordinationMixin, self).stop()
self._coordinator = None
def _coordinator_heartbeat(self):
if not self._coordination_started:
if not self._started:
return
try:
@@ -107,7 +112,7 @@ class CoordinationMixin(object):
LOG.exception('Error sending a heartbeat to coordination backend.')
def _coordinator_run_watchers(self):
if not self._coordination_started:
if not self._started:
return
self._coordinator.run_watchers()

@@ -16,10 +16,10 @@
from oslo_config import cfg
from oslo_log import log as logging
from designate import utils
from designate import dnsutils
from designate import service
from designate import storage
from designate import dnsutils
from designate import utils
from designate.mdns import handler
from designate.mdns import notify
from designate.mdns import xfr
@@ -29,13 +29,38 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class Service(service.DNSService, service.RPCService, service.Service):
class Service(service.RPCService):
_dns_default_port = DEFAULT_MDNS_PORT
def __init__(self):
self._storage = None
super(Service, self).__init__(
self.service_name, cfg.CONF['service:mdns'].topic,
threads=cfg.CONF['service:mdns'].threads,
)
self.override_endpoints(
[notify.NotifyEndpoint(self.tg), xfr.XfrEndpoint(self.tg)]
)
self.dns_service = service.DNSService(
self.dns_application, self.tg,
cfg.CONF['service:mdns'].listen,
cfg.CONF['service:mdns'].tcp_backlog,
cfg.CONF['service:mdns'].tcp_recv_timeout,
)
def start(self):
super(Service, self).start()
self.dns_service.start()
def stop(self, graceful=False):
self.dns_service.stop()
super(Service, self).stop(graceful)
@property
def storage(self):
if not hasattr(self, '_storage'):
# Get a storage connection
if not self._storage:
self._storage = storage.get_storage(
CONF['service:mdns'].storage_driver
)
@@ -47,12 +72,7 @@ class Service(service.DNSService, service.RPCService, service.Service):
@property
@utils.cache_result
def _rpc_endpoints(self):
return [notify.NotifyEndpoint(self.tg), xfr.XfrEndpoint(self.tg)]
@property
@utils.cache_result
def _dns_application(self):
def dns_application(self):
# Create an instance of the RequestHandler class and wrap with
# necessary middleware.
application = handler.RequestHandler(self.storage, self.tg)

@@ -76,8 +76,7 @@ def _constant_retries(num_attempts, sleep_interval):
yield True
class Service(service.RPCService, coordination.CoordinationMixin,
service.Service):
class Service(service.RPCService):
"""
Service side of the Pool Manager RPC API.
@@ -91,8 +90,30 @@ class Service(service.RPCService, coordination.CoordinationMixin,
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
def __init__(self):
self._scheduler = None
self._storage = None
self._quota = None
self._pool_election = None
self._central_api = None
self._mdns_api = None
self._pool_manager_api = None
topic = '%s.%s' % (
cfg.CONF['service:pool_manager'].topic,
CONF['service:pool_manager'].pool_id
)
super(Service, self).__init__(
self.service_name, topic,
threads=cfg.CONF['service:worker'].threads,
)
self.coordination = coordination.Coordination(
self.service_name, self.tg
)
# Get a pool manager cache connection.
self.cache = cache.get_pool_manager_cache(
@@ -110,8 +131,9 @@ class Service(service.RPCService, coordination.CoordinationMixin,
CONF['service:pool_manager'].periodic_sync_retry_interval
# Compute a time (seconds) by which things should have propagated
self.max_prop_time = utils.max_prop_time(self.timeout,
self.max_retries, self.retry_interval, self.delay)
self.max_prop_time = utils.max_prop_time(
self.timeout, self.max_retries, self.retry_interval, self.delay
)
def _setup_target_backends(self):
self.target_backends = {}
@@ -130,17 +152,6 @@ class Service(service.RPCService, coordination.CoordinationMixin,
def service_name(self):
return 'pool_manager'
@property
def _rpc_topic(self):
# Modify the default topic so it's pool manager instance specific.
topic = super(Service, self)._rpc_topic
topic = '%s.%s' % (topic, CONF['service:pool_manager'].pool_id)
LOG.info('Using topic %(topic)s for this pool manager instance.',
{'topic': topic})
return topic
def start(self):
# Build the Pool (and related) Object from Config
context = DesignateContext.get_admin_context()
@@ -182,11 +193,13 @@ class Service(service.RPCService, coordination.CoordinationMixin,
self.target_backends[target.id].start()
super(Service, self).start()
self.coordination.start()
# Setup a Leader Election, use for ensuring certain tasks are executed
# on exactly one pool-manager instance at a time]
self._pool_election = coordination.LeaderElection(
self._coordinator, '%s:%s' % (self.service_name, self.pool.id))
self.coordination.coordinator,
'%s:%s' % (self.service_name, self.pool.id))
self._pool_election.start()
if CONF['service:pool_manager'].enable_recovery_timer:
@@ -201,29 +214,30 @@ class Service(service.RPCService, coordination.CoordinationMixin,
' %(interval)s s', {'interval': interval})
self.tg.add_timer(interval, self.periodic_sync, interval)
def stop(self):
def stop(self, graceful=True):
self._pool_election.stop()
# self.coordination.stop()
super(Service, self).stop()
super(Service, self).stop(graceful)
for target in self.pool.targets:
self.target_backends[target.id].stop()
@property
def central_api(self):
if not hasattr(self, '_central_api'):
if not self._central_api:
self._central_api = central_api.CentralAPI.get_instance()
return self._central_api
@property
def mdns_api(self):
if not hasattr(self, '_mdns_adpi'):
if not self._mdns_api:
self._mdns_api = mdns_api.MdnsAPI.get_instance()
return self._mdns_api
@property
def pool_manager_api(self):
if not hasattr(self, '_pool_manager_api'):
if not self._pool_manager_api:
pool_mgr_api = pool_manager_rpcapi.PoolManagerAPI
self._pool_manager_api = pool_mgr_api.get_instance()
return self._pool_manager_api

@@ -31,15 +31,29 @@ CONF = cfg.CONF
NS = 'designate.periodic_tasks'
class Service(service.RPCService, coordination.CoordinationMixin,
service.Service):
class Service(service.RPCService):
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self):
self._partitioner = None
self._storage = None
self._quota = None
super(Service, self).__init__(
self.service_name, cfg.CONF['service:producer'].topic,
threads=cfg.CONF['service:producer'].threads,
)
self.coordination = coordination.Coordination(
self.service_name, self.tg
)
@property
def storage(self):
if not hasattr(self, '_storage'):
if not self._storage:
# TODO(timsim): Remove this when zone_mgr goes away
storage_driver = cfg.CONF['service:zone_manager'].storage_driver
if cfg.CONF['service:producer'].storage_driver != storage_driver:
@@ -49,7 +63,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
@property
def quota(self):
if not hasattr(self, '_quota'):
if not self._quota:
# Get a quota manager instance
self._quota = quota.get_quota()
return self._quota
@@ -64,10 +78,12 @@ class Service(service.RPCService, coordination.CoordinationMixin,
def start(self):
super(Service, self).start()
self.coordination.start()
self._partitioner = coordination.Partitioner(
self._coordinator, self.service_name,
self._coordination_id.encode(), range(0, 4095))
self.coordination.coordinator, self.service_name,
self.coordination.coordination_id.encode(), range(0, 4095)
)
self._partitioner.start()
self._partitioner.watch_partition_change(self._rebalance)
@@ -76,7 +92,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
zmgr_enabled_tasks = CONF['service:zone_manager'].enabled_tasks
producer_enabled_tasks = CONF['service:producer'].enabled_tasks
enabled = zmgr_enabled_tasks
if producer_enabled_tasks != []:
if producer_enabled_tasks:
enabled = producer_enabled_tasks
for task in tasks.PeriodicTask.get_extensions(enabled):
@@ -91,6 +107,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
interval = CONF[task.get_canonical_name()].interval
self.tg.add_timer(interval, task)
def stop(self, graceful=True):
super(Service, self).stop(graceful)
self.coordination.stop()
def _rebalance(self, my_partitions, members, event):
LOG.info("Received rebalance event %s", event)
self.partition_range = my_partitions

@@ -17,241 +17,161 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import errno
import socket
import struct
import errno
import six
import eventlet.wsgi
import eventlet.debug
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import service
from oslo_service import sslutils
from oslo_service import wsgi
from oslo_utils import netutils
import designate.conf
from designate.i18n import _
from designate.metrics import metrics
from designate import policy
from designate import rpc
from designate import service_status
from designate import version
from designate import utils
# TODO(kiall): These options have been cut+paste from the old WSGI code, and
# should be moved into service:api etc..
from designate import version
import designate.conf
from designate.i18n import _
from designate.metrics import metrics
CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Service(service.Service):
"""
Service class to be shared among the diverse service inside of Designate.
"""
def __init__(self, threads=None):
def __init__(self, name, threads=None):
threads = threads or 1000
super(Service, self).__init__(threads)
self._host = CONF.host
self._service_config = CONF['service:%s' % self.service_name]
self.name = name
self.host = CONF.host
policy.init()
# NOTE(kiall): All services need RPC initialized, as this is used
# for clients AND servers. Hence, this is common to
# all Designate services.
if not rpc.initialized():
rpc.init(CONF)
@abc.abstractproperty
def service_name(self):
pass
def start(self):
super(Service, self).start()
LOG.info('Starting %(name)s service (version: %(version)s)',
{
'name': self.service_name,
'name': self.name,
'version': version.version_info.version_string()
})
super(Service, self).start()
def stop(self):
LOG.info('Stopping %(name)s service', {'name': self.service_name})
super(Service, self).stop()
def _get_listen_on_addresses(self, default_port):
"""
Helper Method to handle migration from singular host/port to
multiple binds
"""
try:
# The API service uses "api_host", and "api_port", others use
# just host and port.
host = self._service_config.api_host
port = self._service_config.api_port
except cfg.NoSuchOptError:
host = self._service_config.host
port = self._service_config.port
if host or port is not None:
LOG.warning("host and port config options used, the 'listen' "
"option has been ignored")
host = host or "0.0.0.0"
# "port" might be 0 to pick a free port, usually during testing
port = default_port if port is None else port
return [(host, port)]
else:
return map(
netutils.parse_host_port,
set(self._service_config.listen)
)
def stop(self, graceful=True):
LOG.info('Stopping %(name)s service', {'name': self.name})
super(Service, self).stop(graceful)
class RPCService(object):
"""
RPC Service mixin used by all Designate RPC Services
"""
def __init__(self, *args, **kwargs):
super(RPCService, self).__init__(*args, **kwargs)
class Heartbeat(object):
def __init__(self, name, tg, rpc_api=None):
self.name = name
self.tg = tg
LOG.debug("Creating RPC Server on topic '%s'", self._rpc_topic)
self._rpc_server = rpc.get_server(
messaging.Target(topic=self._rpc_topic, server=self._host),
self._rpc_endpoints)
self._status = 'UP'
self._stats = {}
self._capabilities = {}
emitter_cls = service_status.HeartBeatEmitter.get_driver(
CONF.heartbeat_emitter.emitter_type
)
self.heartbeat_emitter = emitter_cls(
self.service_name, self.tg, status_factory=self._get_status
self.name, self.tg,
status_factory=self.get_status, rpc_api=rpc_api
)
def _get_status(self):
status = "UP"
stats = {}
capabilities = {}
return status, stats, capabilities
@property
def _rpc_endpoints(self):
return [self]
@property
def _rpc_topic(self):
return CONF['service:%s' % self.service_name].topic
def get_status(self):
return self._status, self._stats, self._capabilities
def start(self):
super(RPCService, self).start()
LOG.debug("Starting RPC server on topic '%s'", self._rpc_topic)
self._rpc_server.start()
# TODO(kiall): This probably belongs somewhere else, maybe the base
# Service class?
self.notifier = rpc.get_notifier(self.service_name)
for e in self._rpc_endpoints:
if e != self and hasattr(e, 'start'):
e.start()
self.heartbeat_emitter.start()
def stop(self):
LOG.debug("Stopping RPC server on topic '%s'", self._rpc_topic)
self.heartbeat_emitter.stop()
for e in self._rpc_endpoints:
if e != self and hasattr(e, 'stop'):
e.stop()
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self._rpc_server.stop()
except Exception:
pass
class RPCService(Service):
def __init__(self, name, rpc_topic, threads=None):
super(RPCService, self).__init__(name, threads)
LOG.debug("Creating RPC Server on topic '%s' for %s",
rpc_topic, self.name)
super(RPCService, self).stop()
self.endpoints = [self]
self.notifier = None
self.rpc_server = None
self.rpc_topic = rpc_topic
def wait(self):
for e in self._rpc_endpoints:
if e != self and hasattr(e, 'wait'):
e.wait()
def override_endpoints(self, endpoints):
self.endpoints = endpoints
super(RPCService, self).wait()
@six.add_metaclass(abc.ABCMeta)
class WSGIService(object):
"""
WSGI Service mixin used by all Designate WSGI Services
"""
def __init__(self, *args, **kwargs):
super(WSGIService, self).__init__(*args, **kwargs)
def start(self):
super(RPCService, self).start()
target = messaging.Target(topic=self.rpc_topic, server=self.host)
self.rpc_server = rpc.get_server(target, self.endpoints)
self.rpc_server.start()
self.notifier = rpc.get_notifier(self.name)
self._use_ssl = sslutils.is_enabled(CONF)
self._wsgi_socks = []
def stop(self, graceful=True):
if self.rpc_server:
self.rpc_server.stop()
super(RPCService, self).stop(graceful)
@abc.abstractproperty
def _wsgi_application(self):
pass
def wait(self):
super(RPCService, self).wait()
def start(self):
super(WSGIService, self).start()
addresses = self._get_listen_on_addresses(9001)
class WSGIService(Service):
def __init__(self, app, name, listen, max_url_len=None):
super(WSGIService, self).__init__(name)
self.app = app
self.name = name
for address in addresses:
self._start(address[0], address[1])
self.listen = listen
def _start(self, host, port):
wsgi_sock = utils.bind_tcp(
host, port, CONF.backlog, CONF.tcp_keepidle)
self.servers = []
if self._use_ssl:
wsgi_sock = sslutils.wrap(CONF, wsgi_sock)
for address in self.listen:
host, port = netutils.parse_host_port(address)
server = wsgi.Server(
CONF, name, app,
host=host,
port=port,
pool_size=CONF['service:api'].threads,
use_ssl=sslutils.is_enabled(CONF),
max_url_len=max_url_len
)
self._wsgi_socks.append(wsgi_sock)
self.servers.append(server)
self.tg.add_thread(self._wsgi_handle, wsgi_sock)
def start(self):
for server in self.servers:
server.start()
super(WSGIService, self).start()
def _wsgi_handle(self, wsgi_sock):
logger = logging.getLogger('eventlet.wsgi')
# Adjust wsgi MAX_HEADER_LINE to accept large tokens.
eventlet.wsgi.MAX_HEADER_LINE = self._service_config.max_header_line
def stop(self, graceful=True):
for server in self.servers:
server.stop()
super(WSGIService, self).stop(graceful)
eventlet.wsgi.server(wsgi_sock,
self._wsgi_application,
custom_pool=self.tg.pool,
log=logger)
def wait(self):
for server in self.servers:
server.wait()
super(WSGIService, self).wait()
@six.add_metaclass(abc.ABCMeta)
class DNSService(object):
"""
DNS Service mixin used by all Designate DNS Services
"""
_TCP_RECV_MAX_SIZE = 65535
def __init__(self, *args, **kwargs):
super(DNSService, self).__init__(*args, **kwargs)
def __init__(self, app, tg, listen, tcp_backlog, tcp_recv_timeout):
self.app = app
self.tg = tg
self.tcp_backlog = tcp_backlog
self.tcp_recv_timeout = tcp_recv_timeout
self.listen = listen
metrics.init()
# Eventet will complain loudly about our use of multiple greentheads
@@ -261,21 +181,18 @@ class DNSService(object):
self._dns_socks_tcp = []
self._dns_socks_udp = []
@abc.abstractproperty
def _dns_application(self):
pass
def start(self):
super(DNSService, self).start()
addresses = self._get_listen_on_addresses(self._dns_default_port)
addresses = map(
netutils.parse_host_port,
set(self.listen)
)
for address in addresses:
self._start(address[0], address[1])
def _start(self, host, port):
sock_tcp = utils.bind_tcp(
host, port, self._service_config.tcp_backlog)
host, port, self.tcp_backlog)
sock_udp = utils.bind_udp(
host, port)
@@ -286,14 +203,7 @@ class DNSService(object):
self.tg.add_thread(self._dns_handle_tcp, sock_tcp)
self.tg.add_thread(self._dns_handle_udp, sock_udp)
def wait(self):
super(DNSService, self).wait()
def stop(self):
# When the service is stopped, the threads for _handle_tcp and
# _handle_udp are stopped too.
super(DNSService, self).stop()
for sock_tcp in self._dns_socks_tcp:
sock_tcp.close()
@@ -301,7 +211,7 @@ class DNSService(object):
sock_udp.close()
def _dns_handle_tcp(self, sock_tcp):
LOG.info("_handle_tcp thread started")
LOG.info('_handle_tcp thread started')
client = None
while True:
@@ -309,13 +219,13 @@
# handle a new TCP connection
client, addr = sock_tcp.accept()
if self._service_config.tcp_recv_timeout:
client.settimeout(self._service_config.tcp_recv_timeout)
if self.tcp_recv_timeout:
client.settimeout(self.tcp_recv_timeout)
LOG.debug("Handling TCP Request from: %(host)s:%(port)d",
LOG.debug('Handling TCP Request from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
if len(addr) == 4:
LOG.debug("Flow info: %(host)s scope: %(port)d",
LOG.debug('Flow info: %(host)s scope: %(port)d',
{'host': addr[2], 'port': addr[3]})
# Dispatch a thread to handle the connection
@@ -327,21 +237,21 @@ class DNSService(object):
except socket.timeout:
if client:
client.close()
LOG.warning("TCP Timeout from: %(host)s:%(port)d",
LOG.warning('TCP Timeout from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
except socket.error as e:
if client:
client.close()
errname = errno.errorcode[e.args[0]]
LOG.warning("Socket error %(err)s from: %(host)s:%(port)d",
LOG.warning('Socket error %(err)s from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1], 'err': errname})
except Exception:
if client:
client.close()
LOG.exception("Unknown exception handling TCP request from: "
"%(host)s:%(port)d",
LOG.exception('Unknown exception handling TCP request from: '