Merge "Refactored service layer"

This commit is contained in:
Zuul 2019-09-25 18:11:31 +00:00 committed by Gerrit Code Review
commit 4d47719b26
39 changed files with 570 additions and 480 deletions

View File

@ -24,7 +24,6 @@ from oslo_concurrency import lockutils
import oslo_messaging as messaging
_EXTRA_DEFAULT_LOG_LEVELS = [
'eventlet.wsgi.server=WARN',
'kazoo.client=WARN',
'keystone=INFO',
'oslo_service.loopingcall=WARN',

View File

@ -37,22 +37,41 @@ from designate.utils import DEFAULT_AGENT_PORT
CONF = cfg.CONF
class Service(service.DNSService, service.Service):
class Service(service.Service):
_dns_default_port = DEFAULT_AGENT_PORT
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
def __init__(self):
super(Service, self).__init__(
self.service_name, threads=cfg.CONF['service:agent'].threads
)
self.dns_service = service.DNSService(
self.dns_application, self.tg,
cfg.CONF['service:agent'].listen,
cfg.CONF['service:agent'].tcp_backlog,
cfg.CONF['service:agent'].tcp_recv_timeout,
)
backend_driver = cfg.CONF['service:agent'].backend_driver
self.backend = agent_backend.get_backend(backend_driver, self)
def start(self):
super(Service, self).start()
self.dns_service.start()
self.backend.start()
def stop(self, graceful=False):
self.dns_service.stop()
self.backend.stop()
super(Service, self).stop(graceful)
@property
def service_name(self):
return 'agent'
@property
@utils.cache_result
def _dns_application(self):
def dns_application(self):
# Create an instance of the RequestHandler class
application = handler.RequestHandler()
if cfg.CONF['service:agent'].notify_delay > 0.0:
@ -60,12 +79,3 @@ class Service(service.DNSService, service.Service):
application = dnsutils.SerializationMiddleware(application)
return application
def start(self):
super(Service, self).start()
self.backend.start()
def stop(self):
super(Service, self).stop()
# TODO(kiall): Shouldn't we be stppping the backend here too? To fix
# in another review.

View File

@ -18,41 +18,32 @@ from oslo_log import log as logging
from paste import deploy
from designate import exceptions
from designate import utils
from designate import service
from designate import service_status
from designate import utils
LOG = logging.getLogger(__name__)
class Service(service.WSGIService, service.Service):
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
emitter_cls = service_status.HeartBeatEmitter.get_driver(
cfg.CONF.heartbeat_emitter.emitter_type
)
self.heartbeat_emitter = emitter_cls(
self.service_name, self.tg, status_factory=self._get_status
class Service(service.WSGIService):
def __init__(self):
super(Service, self).__init__(
self.wsgi_application,
self.service_name,
cfg.CONF['service:api'].listen,
)
def start(self):
super(Service, self).start()
self.heartbeat_emitter.start()
def _get_status(self):
status = "UP"
stats = {}
capabilities = {}
return status, stats, capabilities
def stop(self, graceful=True):
super(Service, self).stop(graceful)
@property
def service_name(self):
return 'api'
@property
def _wsgi_application(self):
def wsgi_application(self):
api_paste_config = cfg.CONF['service:api'].api_paste_config
config_paths = utils.find_config(api_paste_config)

View File

@ -184,37 +184,40 @@ def notification(notification_type):
return outer
class Service(service.RPCService, service.Service):
class Service(service.RPCService):
RPC_API_VERSION = '6.2'
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
def __init__(self):
self._scheduler = None
self._storage = None
self._quota = None
super(Service, self).__init__(
self.service_name, cfg.CONF['service:central'].topic,
threads=cfg.CONF['service:central'].threads,
)
self.network_api = network_api.get_network_api(cfg.CONF.network_api)
# update_service_status needs is called by the emitter so we pass
# ourselves as the rpc_api.
self.heartbeat_emitter.rpc_api = self
@property
def scheduler(self):
if not hasattr(self, '_scheduler'):
if not self._scheduler:
# Get a scheduler instance
self._scheduler = scheduler.get_scheduler(storage=self.storage)
return self._scheduler
@property
def quota(self):
if not hasattr(self, '_quota'):
if not self._quota:
# Get a quota manager instance
self._quota = quota.get_quota()
return self._quota
@property
def storage(self):
if not hasattr(self, '_storage'):
if not self._storage:
# Get a storage connection
storage_driver = cfg.CONF['service:central'].storage_driver
self._storage = storage.get_storage(storage_driver)
@ -232,8 +235,8 @@ class Service(service.RPCService, service.Service):
super(Service, self).start()
def stop(self):
super(Service, self).stop()
def stop(self, graceful=True):
super(Service, self).stop(graceful)
@property
def mdns_api(self):
@ -251,7 +254,7 @@ class Service(service.RPCService, service.Service):
def zone_api(self):
# TODO(timsim): Remove this when pool_manager_api is gone
if cfg.CONF['service:worker'].enabled:
return self.worker_api
return self.worker_api
return self.pool_manager_api
def _is_valid_zone_name(self, context, zone_name):

View File

@ -28,7 +28,6 @@ from designate.agent import service as agent_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.agent', group='service:agent')
CONF.import_opt('threads', 'designate.agent', group='service:agent')
def main():
@ -38,6 +37,8 @@ def main():
hookpoints.log_hook_setup()
server = agent_service.Service(threads=CONF['service:agent'].threads)
server = agent_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:agent'].workers)
heartbeat.start()
service.wait()

View File

@ -29,7 +29,6 @@ from designate.api import service as api_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.api', group='service:api')
CONF.import_opt('threads', 'designate.api', group='service:api')
cfg.CONF.import_group('keystone_authtoken', 'keystonemiddleware.auth_token')
@ -40,7 +39,8 @@ def main():
hookpoints.log_hook_setup()
server = api_service.Service(threads=CONF['service:api'].threads)
server = api_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:api'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

View File

@ -23,12 +23,11 @@ from designate import hookpoints
from designate import service
from designate import utils
from designate import version
from designate.central import service as central
from designate.central import service as central_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.central', group='service:central')
CONF.import_opt('threads', 'designate.central', group='service:central')
def main():
@ -38,7 +37,9 @@ def main():
hookpoints.log_hook_setup()
server = central.Service(threads=CONF['service:central'].threads)
server = central_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg,
rpc_api=server)
service.serve(server, workers=CONF['service:central'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

View File

@ -28,7 +28,6 @@ from designate.mdns import service as mdns_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.mdns', group='service:mdns')
CONF.import_opt('threads', 'designate.mdns', group='service:mdns')
def main():
@ -38,7 +37,8 @@ def main():
hookpoints.log_hook_setup()
server = mdns_service.Service(threads=CONF['service:mdns'].threads)
server = mdns_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:mdns'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

View File

@ -30,8 +30,6 @@ LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.pool_manager',
group='service:pool_manager')
CONF.import_opt('threads', 'designate.pool_manager',
group='service:pool_manager')
def main():
@ -53,12 +51,11 @@ def main():
'designate-worker', version='newton',
removal_version='rocky')
server = pool_manager_service.Service(
threads=CONF['service:pool_manager'].threads
)
server = pool_manager_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
hookpoints.log_hook_setup()
service.serve(server, workers=CONF['service:pool_manager'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

View File

@ -28,7 +28,6 @@ from designate.producer import service as producer_service
LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.producer', group='service:producer')
CONF.import_opt('threads', 'designate.producer', group='service:producer')
def main():
@ -46,7 +45,8 @@ def main():
hookpoints.log_hook_setup()
server = producer_service.Service(threads=CONF['service:producer'].threads)
server = producer_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:producer'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

View File

@ -28,7 +28,6 @@ from designate.sink import service as sink_service
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.sink', group='service:sink')
CONF.import_opt('threads', 'designate.sink', group='service:sink')
def main():
@ -38,6 +37,8 @@ def main():
hookpoints.log_hook_setup()
server = sink_service.Service(threads=CONF['service:sink'].threads)
server = sink_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:sink'].workers)
heartbeat.start()
service.wait()

View File

@ -28,7 +28,6 @@ from designate.worker import service as worker_service
LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.worker', group='service:worker')
CONF.import_opt('threads', 'designate.worker', group='service:worker')
def main():
@ -46,7 +45,8 @@ def main():
hookpoints.log_hook_setup()
server = worker_service.Service(threads=CONF['service:worker'].threads)
server = worker_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:worker'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

View File

@ -30,8 +30,6 @@ LOG = logging.getLogger(__name__)
CONF = designate.conf.CONF
CONF.import_opt('workers', 'designate.producer',
group='service:zone_manager')
CONF.import_opt('threads', 'designate.producer',
group='service:zone_manager')
def main():
@ -56,8 +54,8 @@ def main():
LOG.warning('Starting designate-producer under the zone-manager name')
server = producer_service.Service(
threads=CONF['service:zone_manager'].threads)
server = producer_service.Service()
heartbeat = service.Heartbeat(server.service_name, server.tg)
service.serve(server, workers=CONF['service:zone_manager'].workers)
server.heartbeat_emitter.start()
heartbeat.start()
service.wait()

View File

@ -27,14 +27,6 @@ AGENT_OPTS = [
help='Number of agent worker processes to spawn'),
cfg.IntOpt('threads', default=1000,
help='Number of agent greenthreads to spawn'),
cfg.IPOpt('host',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='Agent Bind Host'),
cfg.PortOpt('port',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='Agent Port Number'),
cfg.ListOpt('listen',
default=['0.0.0.0:%d' % DEFAULT_AGENT_PORT],
help='Agent host:port pairs to listen on'),

View File

@ -33,14 +33,6 @@ API_OPTS = [
'the hostname, port, and any paths that are added'
'to the base of Designate is URLs,'
'For example http://dns.openstack.example.com/dns'),
cfg.IPOpt('api_host',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='API Bind Host'),
cfg.PortOpt('api_port',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='API Port Number'),
cfg.ListOpt('listen',
default=['0.0.0.0:9001'],
help='API host:port pairs to listen on'),

View File

@ -26,14 +26,6 @@ MDNS_OPTS = [
help='Number of mdns worker processes to spawn'),
cfg.IntOpt('threads', default=1000,
help='Number of mdns greenthreads to spawn'),
cfg.IPOpt('host',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='mDNS Bind Host'),
cfg.PortOpt('port',
deprecated_for_removal=True,
deprecated_reason="Replaced by 'listen' option",
help='mDNS Port Number'),
cfg.ListOpt('listen',
default=['0.0.0.0:%d' % DEFAULT_MDNS_PORT],
help='mDNS host:port pairs to listen on'),

View File

@ -35,21 +35,31 @@ def _retry_if_tooz_error(exception):
return isinstance(exception, tooz.coordination.ToozError)
class CoordinationMixin(object):
def __init__(self, *args, **kwargs):
super(CoordinationMixin, self).__init__(*args, **kwargs)
class Coordination(object):
def __init__(self, name, tg):
self.name = name
self.tg = tg
self.coordination_id = None
self._coordinator = None
self._started = False
@property
def coordinator(self):
return self._coordinator
@property
def started(self):
return self._started
def start(self):
self._coordination_id = ":".join([CONF.host, generate_uuid()])
self.coordination_id = ":".join([CONF.host, generate_uuid()])
if CONF.coordination.backend_url is not None:
backend_url = CONF.coordination.backend_url
self._coordinator = tooz.coordination.get_coordinator(
backend_url, self._coordination_id.encode())
self._coordination_started = False
backend_url, self.coordination_id.encode())
self._started = False
self.tg.add_timer(CONF.coordination.heartbeat_interval,
self._coordinator_heartbeat)
@ -61,25 +71,22 @@ class CoordinationMixin(object):
"coordination functionality will be disabled. "
"Please configure a coordination backend.")
super(CoordinationMixin, self).start()
if self._coordinator is not None:
while not self._coordination_started:
while not self._started:
try:
self._coordinator.start()
try:
create_group_req = self._coordinator.create_group(
self.service_name)
self.name)
create_group_req.get()
except tooz.coordination.GroupAlreadyExist:
pass
join_group_req = self._coordinator.join_group(
self.service_name)
join_group_req = self._coordinator.join_group(self.name)
join_group_req.get()
self._coordination_started = True
self._started = True
except Exception:
LOG.warning("Failed to start Coordinator:", exc_info=True)
@ -87,18 +94,16 @@ class CoordinationMixin(object):
def stop(self):
if self._coordinator is not None:
self._coordination_started = False
self._started = False
leave_group_req = self._coordinator.leave_group(self.service_name)
leave_group_req = self._coordinator.leave_group(self.name)
leave_group_req.get()
self._coordinator.stop()
super(CoordinationMixin, self).stop()
self._coordinator = None
def _coordinator_heartbeat(self):
if not self._coordination_started:
if not self._started:
return
try:
@ -107,7 +112,7 @@ class CoordinationMixin(object):
LOG.exception('Error sending a heartbeat to coordination backend.')
def _coordinator_run_watchers(self):
if not self._coordination_started:
if not self._started:
return
self._coordinator.run_watchers()

View File

@ -16,10 +16,10 @@
from oslo_config import cfg
from oslo_log import log as logging
from designate import utils
from designate import dnsutils
from designate import service
from designate import storage
from designate import dnsutils
from designate import utils
from designate.mdns import handler
from designate.mdns import notify
from designate.mdns import xfr
@ -29,13 +29,38 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
class Service(service.DNSService, service.RPCService, service.Service):
class Service(service.RPCService):
_dns_default_port = DEFAULT_MDNS_PORT
def __init__(self):
self._storage = None
super(Service, self).__init__(
self.service_name, cfg.CONF['service:mdns'].topic,
threads=cfg.CONF['service:mdns'].threads,
)
self.override_endpoints(
[notify.NotifyEndpoint(self.tg), xfr.XfrEndpoint(self.tg)]
)
self.dns_service = service.DNSService(
self.dns_application, self.tg,
cfg.CONF['service:mdns'].listen,
cfg.CONF['service:mdns'].tcp_backlog,
cfg.CONF['service:mdns'].tcp_recv_timeout,
)
def start(self):
super(Service, self).start()
self.dns_service.start()
def stop(self, graceful=False):
self.dns_service.stop()
super(Service, self).stop(graceful)
@property
def storage(self):
if not hasattr(self, '_storage'):
# Get a storage connection
if not self._storage:
self._storage = storage.get_storage(
CONF['service:mdns'].storage_driver
)
@ -47,12 +72,7 @@ class Service(service.DNSService, service.RPCService, service.Service):
@property
@utils.cache_result
def _rpc_endpoints(self):
return [notify.NotifyEndpoint(self.tg), xfr.XfrEndpoint(self.tg)]
@property
@utils.cache_result
def _dns_application(self):
def dns_application(self):
# Create an instance of the RequestHandler class and wrap with
# necessary middleware.
application = handler.RequestHandler(self.storage, self.tg)

View File

@ -76,8 +76,7 @@ def _constant_retries(num_attempts, sleep_interval):
yield True
class Service(service.RPCService, coordination.CoordinationMixin,
service.Service):
class Service(service.RPCService):
"""
Service side of the Pool Manager RPC API.
@ -91,8 +90,30 @@ class Service(service.RPCService, coordination.CoordinationMixin,
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
def __init__(self):
self._scheduler = None
self._storage = None
self._quota = None
self._pool_election = None
self._central_api = None
self._mdns_api = None
self._pool_manager_api = None
topic = '%s.%s' % (
cfg.CONF['service:pool_manager'].topic,
CONF['service:pool_manager'].pool_id
)
super(Service, self).__init__(
self.service_name, topic,
threads=cfg.CONF['service:worker'].threads,
)
self.coordination = coordination.Coordination(
self.service_name, self.tg
)
# Get a pool manager cache connection.
self.cache = cache.get_pool_manager_cache(
@ -110,8 +131,9 @@ class Service(service.RPCService, coordination.CoordinationMixin,
CONF['service:pool_manager'].periodic_sync_retry_interval
# Compute a time (seconds) by which things should have propagated
self.max_prop_time = utils.max_prop_time(self.timeout,
self.max_retries, self.retry_interval, self.delay)
self.max_prop_time = utils.max_prop_time(
self.timeout, self.max_retries, self.retry_interval, self.delay
)
def _setup_target_backends(self):
self.target_backends = {}
@ -130,17 +152,6 @@ class Service(service.RPCService, coordination.CoordinationMixin,
def service_name(self):
return 'pool_manager'
@property
def _rpc_topic(self):
# Modify the default topic so it's pool manager instance specific.
topic = super(Service, self)._rpc_topic
topic = '%s.%s' % (topic, CONF['service:pool_manager'].pool_id)
LOG.info('Using topic %(topic)s for this pool manager instance.',
{'topic': topic})
return topic
def start(self):
# Build the Pool (and related) Object from Config
context = DesignateContext.get_admin_context()
@ -182,11 +193,13 @@ class Service(service.RPCService, coordination.CoordinationMixin,
self.target_backends[target.id].start()
super(Service, self).start()
self.coordination.start()
# Setup a Leader Election, use for ensuring certain tasks are executed
# on exactly one pool-manager instance at a time]
self._pool_election = coordination.LeaderElection(
self._coordinator, '%s:%s' % (self.service_name, self.pool.id))
self.coordination.coordinator,
'%s:%s' % (self.service_name, self.pool.id))
self._pool_election.start()
if CONF['service:pool_manager'].enable_recovery_timer:
@ -201,29 +214,30 @@ class Service(service.RPCService, coordination.CoordinationMixin,
' %(interval)s s', {'interval': interval})
self.tg.add_timer(interval, self.periodic_sync, interval)
def stop(self):
def stop(self, graceful=True):
self._pool_election.stop()
# self.coordination.stop()
super(Service, self).stop()
super(Service, self).stop(graceful)
for target in self.pool.targets:
self.target_backends[target.id].stop()
@property
def central_api(self):
if not hasattr(self, '_central_api'):
if not self._central_api:
self._central_api = central_api.CentralAPI.get_instance()
return self._central_api
@property
def mdns_api(self):
if not hasattr(self, '_mdns_adpi'):
if not self._mdns_api:
self._mdns_api = mdns_api.MdnsAPI.get_instance()
return self._mdns_api
@property
def pool_manager_api(self):
if not hasattr(self, '_pool_manager_api'):
if not self._pool_manager_api:
pool_mgr_api = pool_manager_rpcapi.PoolManagerAPI
self._pool_manager_api = pool_mgr_api.get_instance()
return self._pool_manager_api

View File

@ -31,15 +31,29 @@ CONF = cfg.CONF
NS = 'designate.periodic_tasks'
class Service(service.RPCService, coordination.CoordinationMixin,
service.Service):
class Service(service.RPCService):
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self):
self._partitioner = None
self._storage = None
self._quota = None
super(Service, self).__init__(
self.service_name, cfg.CONF['service:producer'].topic,
threads=cfg.CONF['service:producer'].threads,
)
self.coordination = coordination.Coordination(
self.service_name, self.tg
)
@property
def storage(self):
if not hasattr(self, '_storage'):
if not self._storage:
# TODO(timsim): Remove this when zone_mgr goes away
storage_driver = cfg.CONF['service:zone_manager'].storage_driver
if cfg.CONF['service:producer'].storage_driver != storage_driver:
@ -49,7 +63,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
@property
def quota(self):
if not hasattr(self, '_quota'):
if not self._quota:
# Get a quota manager instance
self._quota = quota.get_quota()
return self._quota
@ -64,10 +78,12 @@ class Service(service.RPCService, coordination.CoordinationMixin,
def start(self):
super(Service, self).start()
self.coordination.start()
self._partitioner = coordination.Partitioner(
self._coordinator, self.service_name,
self._coordination_id.encode(), range(0, 4095))
self.coordination.coordinator, self.service_name,
self.coordination.coordination_id.encode(), range(0, 4095)
)
self._partitioner.start()
self._partitioner.watch_partition_change(self._rebalance)
@ -76,7 +92,7 @@ class Service(service.RPCService, coordination.CoordinationMixin,
zmgr_enabled_tasks = CONF['service:zone_manager'].enabled_tasks
producer_enabled_tasks = CONF['service:producer'].enabled_tasks
enabled = zmgr_enabled_tasks
if producer_enabled_tasks != []:
if producer_enabled_tasks:
enabled = producer_enabled_tasks
for task in tasks.PeriodicTask.get_extensions(enabled):
@ -91,6 +107,10 @@ class Service(service.RPCService, coordination.CoordinationMixin,
interval = CONF[task.get_canonical_name()].interval
self.tg.add_timer(interval, task)
def stop(self, graceful=True):
super(Service, self).stop(graceful)
self.coordination.stop()
def _rebalance(self, my_partitions, members, event):
LOG.info("Received rebalance event %s", event)
self.partition_range = my_partitions

View File

@ -17,241 +17,161 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import errno
import socket
import struct
import errno
import six
import eventlet.wsgi
import eventlet.debug
from oslo_config import cfg
import oslo_messaging as messaging
from oslo_log import log as logging
import oslo_messaging as messaging
from oslo_service import service
from oslo_service import sslutils
from oslo_service import wsgi
from oslo_utils import netutils
import designate.conf
from designate.i18n import _
from designate.metrics import metrics
from designate import policy
from designate import rpc
from designate import service_status
from designate import version
from designate import utils
# TODO(kiall): These options have been cut+paste from the old WSGI code, and
# should be moved into service:api etc..
from designate import version
import designate.conf
from designate.i18n import _
from designate.metrics import metrics
CONF = designate.conf.CONF
LOG = logging.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Service(service.Service):
"""
Service class to be shared among the diverse service inside of Designate.
"""
def __init__(self, threads=None):
def __init__(self, name, threads=None):
threads = threads or 1000
super(Service, self).__init__(threads)
self._host = CONF.host
self._service_config = CONF['service:%s' % self.service_name]
self.name = name
self.host = CONF.host
policy.init()
# NOTE(kiall): All services need RPC initialized, as this is used
# for clients AND servers. Hence, this is common to
# all Designate services.
if not rpc.initialized():
rpc.init(CONF)
@abc.abstractproperty
def service_name(self):
pass
def start(self):
super(Service, self).start()
LOG.info('Starting %(name)s service (version: %(version)s)',
{
'name': self.service_name,
'name': self.name,
'version': version.version_info.version_string()
})
super(Service, self).start()
def stop(self):
LOG.info('Stopping %(name)s service', {'name': self.service_name})
super(Service, self).stop()
def _get_listen_on_addresses(self, default_port):
"""
Helper Method to handle migration from singular host/port to
multiple binds
"""
try:
# The API service uses "api_host", and "api_port", others use
# just host and port.
host = self._service_config.api_host
port = self._service_config.api_port
except cfg.NoSuchOptError:
host = self._service_config.host
port = self._service_config.port
if host or port is not None:
LOG.warning("host and port config options used, the 'listen' "
"option has been ignored")
host = host or "0.0.0.0"
# "port" might be 0 to pick a free port, usually during testing
port = default_port if port is None else port
return [(host, port)]
else:
return map(
netutils.parse_host_port,
set(self._service_config.listen)
)
def stop(self, graceful=True):
LOG.info('Stopping %(name)s service', {'name': self.name})
super(Service, self).stop(graceful)
class RPCService(object):
"""
RPC Service mixin used by all Designate RPC Services
"""
def __init__(self, *args, **kwargs):
super(RPCService, self).__init__(*args, **kwargs)
class Heartbeat(object):
def __init__(self, name, tg, rpc_api=None):
self.name = name
self.tg = tg
LOG.debug("Creating RPC Server on topic '%s'", self._rpc_topic)
self._rpc_server = rpc.get_server(
messaging.Target(topic=self._rpc_topic, server=self._host),
self._rpc_endpoints)
self._status = 'UP'
self._stats = {}
self._capabilities = {}
emitter_cls = service_status.HeartBeatEmitter.get_driver(
CONF.heartbeat_emitter.emitter_type
)
self.heartbeat_emitter = emitter_cls(
self.service_name, self.tg, status_factory=self._get_status
self.name, self.tg,
status_factory=self.get_status, rpc_api=rpc_api
)
def _get_status(self):
status = "UP"
stats = {}
capabilities = {}
return status, stats, capabilities
@property
def _rpc_endpoints(self):
return [self]
@property
def _rpc_topic(self):
return CONF['service:%s' % self.service_name].topic
def get_status(self):
return self._status, self._stats, self._capabilities
def start(self):
super(RPCService, self).start()
LOG.debug("Starting RPC server on topic '%s'", self._rpc_topic)
self._rpc_server.start()
# TODO(kiall): This probably belongs somewhere else, maybe the base
# Service class?
self.notifier = rpc.get_notifier(self.service_name)
for e in self._rpc_endpoints:
if e != self and hasattr(e, 'start'):
e.start()
self.heartbeat_emitter.start()
def stop(self):
LOG.debug("Stopping RPC server on topic '%s'", self._rpc_topic)
self.heartbeat_emitter.stop()
for e in self._rpc_endpoints:
if e != self and hasattr(e, 'stop'):
e.stop()
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self._rpc_server.stop()
except Exception:
pass
class RPCService(Service):
def __init__(self, name, rpc_topic, threads=None):
super(RPCService, self).__init__(name, threads)
LOG.debug("Creating RPC Server on topic '%s' for %s",
rpc_topic, self.name)
super(RPCService, self).stop()
self.endpoints = [self]
self.notifier = None
self.rpc_server = None
self.rpc_topic = rpc_topic
def override_endpoints(self, endpoints):
self.endpoints = endpoints
def start(self):
super(RPCService, self).start()
target = messaging.Target(topic=self.rpc_topic, server=self.host)
self.rpc_server = rpc.get_server(target, self.endpoints)
self.rpc_server.start()
self.notifier = rpc.get_notifier(self.name)
def stop(self, graceful=True):
if self.rpc_server:
self.rpc_server.stop()
super(RPCService, self).stop(graceful)
def wait(self):
for e in self._rpc_endpoints:
if e != self and hasattr(e, 'wait'):
e.wait()
super(RPCService, self).wait()
@six.add_metaclass(abc.ABCMeta)
class WSGIService(object):
"""
WSGI Service mixin used by all Designate WSGI Services
"""
def __init__(self, *args, **kwargs):
super(WSGIService, self).__init__(*args, **kwargs)
class WSGIService(Service):
def __init__(self, app, name, listen, max_url_len=None):
super(WSGIService, self).__init__(name)
self.app = app
self.name = name
self._use_ssl = sslutils.is_enabled(CONF)
self._wsgi_socks = []
self.listen = listen
@abc.abstractproperty
def _wsgi_application(self):
pass
self.servers = []
for address in self.listen:
host, port = netutils.parse_host_port(address)
server = wsgi.Server(
CONF, name, app,
host=host,
port=port,
pool_size=CONF['service:api'].threads,
use_ssl=sslutils.is_enabled(CONF),
max_url_len=max_url_len
)
self.servers.append(server)
def start(self):
for server in self.servers:
server.start()
super(WSGIService, self).start()
addresses = self._get_listen_on_addresses(9001)
def stop(self, graceful=True):
for server in self.servers:
server.stop()
super(WSGIService, self).stop(graceful)
for address in addresses:
self._start(address[0], address[1])
def _start(self, host, port):
wsgi_sock = utils.bind_tcp(
host, port, CONF.backlog, CONF.tcp_keepidle)
if self._use_ssl:
wsgi_sock = sslutils.wrap(CONF, wsgi_sock)
self._wsgi_socks.append(wsgi_sock)
self.tg.add_thread(self._wsgi_handle, wsgi_sock)
def _wsgi_handle(self, wsgi_sock):
logger = logging.getLogger('eventlet.wsgi')
# Adjust wsgi MAX_HEADER_LINE to accept large tokens.
eventlet.wsgi.MAX_HEADER_LINE = self._service_config.max_header_line
eventlet.wsgi.server(wsgi_sock,
self._wsgi_application,
custom_pool=self.tg.pool,
log=logger)
def wait(self):
for server in self.servers:
server.wait()
super(WSGIService, self).wait()
@six.add_metaclass(abc.ABCMeta)
class DNSService(object):
"""
DNS Service mixin used by all Designate DNS Services
"""
_TCP_RECV_MAX_SIZE = 65535
def __init__(self, *args, **kwargs):
super(DNSService, self).__init__(*args, **kwargs)
def __init__(self, app, tg, listen, tcp_backlog, tcp_recv_timeout):
self.app = app
self.tg = tg
self.tcp_backlog = tcp_backlog
self.tcp_recv_timeout = tcp_recv_timeout
self.listen = listen
metrics.init()
# Eventet will complain loudly about our use of multiple greentheads
@ -261,21 +181,18 @@ class DNSService(object):
self._dns_socks_tcp = []
self._dns_socks_udp = []
@abc.abstractproperty
def _dns_application(self):
pass
def start(self):
super(DNSService, self).start()
addresses = self._get_listen_on_addresses(self._dns_default_port)
addresses = map(
netutils.parse_host_port,
set(self.listen)
)
for address in addresses:
self._start(address[0], address[1])
def _start(self, host, port):
sock_tcp = utils.bind_tcp(
host, port, self._service_config.tcp_backlog)
host, port, self.tcp_backlog)
sock_udp = utils.bind_udp(
host, port)
@ -286,14 +203,7 @@ class DNSService(object):
self.tg.add_thread(self._dns_handle_tcp, sock_tcp)
self.tg.add_thread(self._dns_handle_udp, sock_udp)
def wait(self):
super(DNSService, self).wait()
def stop(self):
# When the service is stopped, the threads for _handle_tcp and
# _handle_udp are stopped too.
super(DNSService, self).stop()
for sock_tcp in self._dns_socks_tcp:
sock_tcp.close()
@ -301,7 +211,7 @@ class DNSService(object):
sock_udp.close()
def _dns_handle_tcp(self, sock_tcp):
LOG.info("_handle_tcp thread started")
LOG.info('_handle_tcp thread started')
client = None
while True:
@ -309,13 +219,13 @@ class DNSService(object):
# handle a new TCP connection
client, addr = sock_tcp.accept()
if self._service_config.tcp_recv_timeout:
client.settimeout(self._service_config.tcp_recv_timeout)
if self.tcp_recv_timeout:
client.settimeout(self.tcp_recv_timeout)
LOG.debug("Handling TCP Request from: %(host)s:%(port)d",
LOG.debug('Handling TCP Request from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
if len(addr) == 4:
LOG.debug("Flow info: %(host)s scope: %(port)d",
LOG.debug('Flow info: %(host)s scope: %(port)d',
{'host': addr[2], 'port': addr[3]})
# Dispatch a thread to handle the connection
@ -327,21 +237,21 @@ class DNSService(object):
except socket.timeout:
if client:
client.close()
LOG.warning("TCP Timeout from: %(host)s:%(port)d",
LOG.warning('TCP Timeout from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
except socket.error as e:
if client:
client.close()
errname = errno.errorcode[e.args[0]]
LOG.warning("Socket error %(err)s from: %(host)s:%(port)d",
LOG.warning('Socket error %(err)s from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1], 'err': errname})
except Exception:
if client:
client.close()
LOG.exception("Unknown exception handling TCP request from: "
"%(host)s:%(port)d",
LOG.exception('Unknown exception handling TCP request from: '
'%(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
def _dns_handle_tcp_conn(self, addr, client):
@ -369,7 +279,7 @@ class DNSService(object):
expected_length_raw = client.recv(2)
if len(expected_length_raw) == 0:
break
(expected_length, ) = struct.unpack('!H', expected_length_raw)
(expected_length,) = struct.unpack('!H', expected_length_raw)
# Keep receiving data until we've got all the data we expect
# The buffer contains only one query at a time
@ -385,7 +295,7 @@ class DNSService(object):
query = buf
# Call into the DNS Application itself with payload and addr
for response in self._dns_application(
for response in self.app(
{'payload': query, 'addr': addr}):
# Send back a response only if present
@ -398,20 +308,20 @@ class DNSService(object):
client.sendall(tcp_response)
except socket.timeout:
LOG.info("TCP Timeout from: %(host)s:%(port)d",
LOG.info('TCP Timeout from: %(host)s:%(port)d',
{'host': host, 'port': port})
except socket.error as e:
errname = errno.errorcode[e.args[0]]
LOG.warning("Socket error %(err)s from: %(host)s:%(port)d",
LOG.warning('Socket error %(err)s from: %(host)s:%(port)d',
{'host': host, 'port': port, 'err': errname})
except struct.error:
LOG.warning("Invalid packet from: %(host)s:%(port)d",
LOG.warning('Invalid packet from: %(host)s:%(port)d',
{'host': host, 'port': port})
except Exception:
LOG.exception("Unknown exception handling TCP request from: "
LOG.exception('Unknown exception handling TCP request from: '
"%(host)s:%(port)d", {'host': host, 'port': port})
finally:
if client:
@ -424,7 +334,7 @@ class DNSService(object):
:type sock_udp: socket
:raises: None
"""
LOG.info("_handle_udp thread started")
LOG.info('_handle_udp thread started')
while True:
try:
@ -432,8 +342,8 @@ class DNSService(object):
# UDP recvfrom.
payload, addr = sock_udp.recvfrom(8192)
LOG.debug("Handling UDP Request from: %(host)s:%(port)d",
{'host': addr[0], 'port': addr[1]})
LOG.debug('Handling UDP Request from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
# Dispatch a thread to handle the query
self.tg.add_thread(self._dns_handle_udp_query, sock_udp, addr,
@ -441,12 +351,12 @@ class DNSService(object):
except socket.error as e:
errname = errno.errorcode[e.args[0]]
LOG.warning("Socket error %(err)s from: %(host)s:%(port)d",
LOG.warning('Socket error %(err)s from: %(host)s:%(port)d',
{'host': addr[0], 'port': addr[1], 'err': errname})
except Exception:
LOG.exception("Unknown exception handling UDP request from: "
"%(host)s:%(port)d",
LOG.exception('Unknown exception handling UDP request from: '
'%(host)s:%(port)d',
{'host': addr[0], 'port': addr[1]})
def _dns_handle_udp_query(self, sock, addr, payload):
@ -463,7 +373,7 @@ class DNSService(object):
"""
try:
# Call into the DNS Application itself with the payload and addr
for response in self._dns_application(
for response in self.app(
{'payload': payload, 'addr': addr}):
# Send back a response only if present
@ -471,7 +381,7 @@ class DNSService(object):
sock.sendto(response, addr)
except Exception:
LOG.exception("Unhandled exception while processing request from "
LOG.exception('Unhandled exception while processing request from '
"%(host)s:%(port)d",
{'host': addr[0], 'port': addr[1]})

View File

@ -31,14 +31,15 @@ class HeartBeatEmitter(plugin.DriverPlugin):
__plugin_ns__ = 'designate.heartbeat_emitter'
__plugin_type__ = 'heartbeat_emitter'
def __init__(self, service, threadgroup, status_factory=None):
def __init__(self, service, thread_group, status_factory=None,
*args, **kwargs):
super(HeartBeatEmitter, self).__init__()
self._service = service
self._hostname = CONF.host
self._running = False
self._tg = threadgroup
self._tg = thread_group
self._tg.add_timer(
CONF.heartbeat_emitter.heartbeat_interval,
self._emit_heartbeat)

View File

@ -27,8 +27,10 @@ LOG = logging.getLogger(__name__)
class Service(service.Service):
def __init__(self, threads=None):
super(Service, self).__init__(threads=threads)
def __init__(self):
super(Service, self).__init__(
self.service_name, threads=cfg.CONF['service:sink'].threads
)
# Initialize extensions
self.handlers = self._init_extensions()
@ -38,7 +40,8 @@ class Service(service.Service):
def service_name(self):
return 'sink'
def _init_extensions(self):
@staticmethod
def _init_extensions():
"""Loads and prepares all enabled extensions"""
enabled_notification_handlers = \
@ -75,7 +78,7 @@ class Service(service.Service):
if targets:
self._server.start()
def stop(self):
def stop(self, graceful=True):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
@ -83,7 +86,7 @@ class Service(service.Service):
except Exception:
pass
super(Service, self).stop()
super(Service, self).stop(graceful)
def _get_targets(self):
"""

View File

@ -21,8 +21,7 @@ class ApiServiceTest(ApiTestCase):
def setUp(self):
super(ApiServiceTest, self).setUp()
# Use a random port for the API
self.config(api_port=0, group='service:api')
self.config(listen=['0.0.0.0:0'], group='service:api')
self.service = service.Service()

View File

@ -27,45 +27,57 @@ cfg.CONF.register_opts([
], group="service:dummy")
class CoordinatedService(coordination.CoordinationMixin, service.Service):
class CoordinatedService(service.Service):
def __init__(self):
super(CoordinatedService, self).__init__()
self.coordination = coordination.Coordination(
self.service_name, self.tg
)
def start(self):
super(CoordinatedService, self).start()
self.coordination.start()
@property
def service_name(self):
return "dummy"
class TestCoordinationMixin(TestCase):
class TestCoordination(TestCase):
def setUp(self):
super(TestCoordinationMixin, self).setUp()
super(TestCoordination, self).setUp()
self.name = 'coordination'
self.tg = mock.Mock()
self.config(backend_url="zake://", group="coordination")
def test_start(self):
service = CoordinatedService()
service = coordination.Coordination(self.name, self.tg)
service.start()
self.assertTrue(service._coordination_started)
self.assertIn(service.service_name.encode('utf-8'),
service._coordinator.get_groups().get())
self.assertIn(service._coordination_id.encode('utf-8'),
service._coordinator.get_members(
service.service_name).get())
self.assertTrue(service.started)
self.assertIn(self.name.encode('utf-8'),
service.coordinator.get_groups().get())
self.assertIn(service.coordination_id.encode('utf-8'),
service.coordinator.get_members(
self.name.encode('utf-8')).get())
service.stop()
def test_stop(self):
service = CoordinatedService()
service = coordination.Coordination(self.name, self.tg)
service.start()
service.stop()
self.assertFalse(service._coordination_started)
self.assertFalse(service.started)
def test_start_no_coordination(self):
self.config(backend_url=None, group="coordination")
service = CoordinatedService()
service = coordination.Coordination(self.name, self.tg)
service.start()
self.assertIsNone(service._coordinator)
self.assertIsNone(service.coordinator)
def test_stop_no_coordination(self):
self.config(backend_url=None, group="coordination")
service = CoordinatedService()
self.assertIsNone(service._coordinator)
service = coordination.Coordination(self.name, self.tg)
self.assertIsNone(service.coordinator)
service.start()
service.stop()

View File

@ -34,7 +34,6 @@ def hex_wire(response):
class MdnsServiceTest(MdnsTestCase):
# DNS packet with IQUERY opcode
query_payload = binascii.a2b_hex(
"271209000001000000000000076578616d706c6503636f6d0000010001"
@ -42,6 +41,7 @@ class MdnsServiceTest(MdnsTestCase):
expected_response = binascii.a2b_hex(
b"271289050001000000000000076578616d706c6503636f6d0000010001"
)
# expected response is an error code REFUSED. The other fields are
# id 10002
# opcode IQUERY
@ -58,10 +58,10 @@ class MdnsServiceTest(MdnsTestCase):
def setUp(self):
super(MdnsServiceTest, self).setUp()
# Use a random port for MDNS
self.config(port=0, group='service:mdns')
self.config(listen=['0.0.0.0:0'], group='service:mdns')
self.service = self.start_service('mdns')
self.dns_service = self.service.dns_service
self.addr = ['0.0.0.0', 5556]
@staticmethod
@ -77,14 +77,14 @@ class MdnsServiceTest(MdnsTestCase):
@mock.patch.object(dns.message, 'make_query')
def test_handle_empty_payload(self, query_mock):
mock_socket = mock.Mock()
self.service._dns_handle_udp_query(mock_socket, self.addr,
' '.encode('utf-8'))
self.dns_service._dns_handle_udp_query(mock_socket, self.addr,
' '.encode('utf-8'))
query_mock.assert_called_once_with('unknown', dns.rdatatype.A)
def test_handle_udp_payload(self):
mock_socket = mock.Mock()
self.service._dns_handle_udp_query(mock_socket, self.addr,
self.query_payload)
self.dns_service._dns_handle_udp_query(mock_socket, self.addr,
self.query_payload)
mock_socket.sendto.assert_called_once_with(self.expected_response,
self.addr)
@ -93,7 +93,7 @@ class MdnsServiceTest(MdnsTestCase):
mock_socket = mock.Mock()
mock_socket.recv.side_effect = ['X', 'boo'] # X will fail unpack
self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.assertEqual(1, mock_socket.recv.call_count)
self.assertEqual(1, mock_socket.close.call_count)
@ -103,14 +103,14 @@ class MdnsServiceTest(MdnsTestCase):
pay_len = struct.pack("!H", len(payload))
mock_socket.recv.side_effect = [pay_len, payload, socket.timeout]
self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.assertEqual(3, mock_socket.recv.call_count)
self.assertEqual(1, mock_socket.sendall.call_count)
self.assertEqual(1, mock_socket.close.call_count)
wire = mock_socket.sendall.call_args[0][0]
expected_length_raw = wire[:2]
(expected_length, ) = struct.unpack('!H', expected_length_raw)
(expected_length,) = struct.unpack('!H', expected_length_raw)
self.assertEqual(len(wire), expected_length + 2)
self.assertEqual(self.expected_response, wire[2:])
@ -130,7 +130,7 @@ class MdnsServiceTest(MdnsTestCase):
pay_len, payload,
pay_len, payload,
]
self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.assertEqual(11, mock_socket.recv.call_count)
self.assertEqual(5, mock_socket.sendall.call_count)
@ -152,7 +152,7 @@ class MdnsServiceTest(MdnsTestCase):
pay_len, payload,
pay_len, payload,
]
self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.assertEqual(11, mock_socket.recv.call_count)
self.assertEqual(5, mock_socket.sendall.call_count)
@ -171,7 +171,7 @@ class MdnsServiceTest(MdnsTestCase):
pay_len, payload,
pay_len, payload,
]
self.service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.dns_service._dns_handle_tcp_conn(('1.2.3.4', 42), mock_socket)
self.assertEqual(11, mock_socket.recv.call_count)
self.assertEqual(4, mock_socket.sendall.call_count)

View File

@ -27,7 +27,7 @@ class Bind9AgentBackendTestCase(designate.tests.TestCase):
def setUp(self):
super(Bind9AgentBackendTestCase, self).setUp()
self.CONF.set_override('port', 0, 'service:agent')
self.CONF.set_override('listen', ['0.0.0.0:0'], 'service:agent')
self.backend = impl_bind9.Bind9Backend('foo')

View File

@ -28,7 +28,7 @@ class DenominatorAgentBackendTestCase(designate.tests.TestCase):
def setUp(self):
super(DenominatorAgentBackendTestCase, self).setUp()
self.CONF.set_override('port', 0, 'service:agent')
self.CONF.set_override('listen', ['0.0.0.0:0'], 'service:agent')
self.backend = impl_denominator.DenominatorBackend('foo')

View File

@ -22,7 +22,7 @@ class FakeAgentBackendTestCase(designate.tests.TestCase):
def setUp(self):
super(FakeAgentBackendTestCase, self).setUp()
self.CONF.set_override('port', 0, 'service:agent')
self.CONF.set_override('listen', ['0.0.0.0:0'], 'service:agent')
self.backend = impl_fake.FakeBackend('foo')

View File

@ -21,30 +21,40 @@ from designate import utils
from designate.agent import service
from designate.backend import agent_backend
from designate.backend.agent_backend import impl_fake
from designate.tests import fixtures
class AgentServiceTest(designate.tests.TestCase):
def setUp(self):
super(AgentServiceTest, self).setUp()
self.stdlog = fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.CONF.set_override('port', 0, 'service:agent')
self.CONF.set_override('listen', ['0.0.0.0:0'], 'service:agent')
self.CONF.set_override('notify_delay', 0, 'service:agent')
self.service = service.Service()
self.service._start = mock.Mock()
self.service._rpc_server = mock.Mock()
self.service.dns_service._start = mock.Mock()
def test_service_start(self):
self.service.start()
self.assertTrue(self.service.dns_service._start.called)
def test_service_stop(self):
self.service.dns_service.stop = mock.Mock()
self.service.backend.stop = mock.Mock()
self.service.stop()
self.assertTrue(self.service.dns_service.stop.called)
self.assertTrue(self.service.backend.stop.called)
self.assertIn('Stopping agent service', self.stdlog.logger.output)
def test_service_name(self):
self.assertEqual('agent', self.service.service_name)
def test_start(self):
self.service.start()
self.assertTrue(self.service._start.called)
def test_stop(self):
self.service.stop()
def test_get_backend(self):
backend = agent_backend.get_backend('fake', agent_service=self.service)
self.assertIsInstance(backend, impl_fake.FakeBackend)
@ -52,17 +62,15 @@ class AgentServiceTest(designate.tests.TestCase):
@mock.patch.object(utils, 'cache_result')
def test_get_dns_application(self, mock_cache_result):
self.assertIsInstance(
self.service._dns_application, dnsutils.SerializationMiddleware
self.service.dns_application, dnsutils.SerializationMiddleware
)
@mock.patch.object(utils, 'cache_result')
def test_get_dns_application_with_notify_delay(self, mock_cache_result):
self.service = service.Service()
self.service._start = mock.Mock()
self.service._rpc_server = mock.Mock()
self.CONF.set_override('notify_delay', 1.0, 'service:agent')
self.assertIsInstance(
self.service._dns_application, dnsutils.SerializationMiddleware
self.service.dns_application, dnsutils.SerializationMiddleware
)

View File

@ -21,27 +21,42 @@ from oslo_config import fixture as cfg_fixture
import designate.dnsutils
import designate.rpc
import designate.service
import designate.storage.base
from designate import storage
import designate.utils
from designate.mdns import handler
from designate.mdns import service
from designate.tests import fixtures
CONF = cfg.CONF
class MdnsServiceTest(oslotest.base.BaseTestCase):
@mock.patch.object(designate.rpc, 'get_server')
def setUp(self, mock_rpc_server):
@mock.patch.object(storage, 'get_storage', mock.Mock())
def setUp(self):
super(MdnsServiceTest, self).setUp()
self.stdlog = fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.useFixture(cfg_fixture.Config(CONF))
self.service = service.Service()
@mock.patch.object(designate.service.DNSService, '_start')
def test_service_start(self, mock_service_start):
@mock.patch.object(designate.service.DNSService, 'start')
@mock.patch.object(designate.service.RPCService, 'start')
def test_service_start(self, mock_rpc_start, mock_dns_start):
self.service.start()
self.assertTrue(mock_service_start.called)
self.assertTrue(mock_dns_start.called)
self.assertTrue(mock_rpc_start.called)
def test_service_stop(self):
self.service.dns_service.stop = mock.Mock()
self.service.stop()
self.assertTrue(self.service.dns_service.stop.called)
self.assertIn('Stopping mdns service', self.stdlog.logger.output)
def test_service_name(self):
self.assertEqual('mdns', self.service.service_name)
@ -51,17 +66,13 @@ class MdnsServiceTest(oslotest.base.BaseTestCase):
self.service = service.Service()
self.assertEqual('test-topic', self.service._rpc_topic)
self.assertEqual('test-topic', self.service.rpc_topic)
self.assertEqual('mdns', self.service.service_name)
def test_rpc_endpoints(self):
endpoints = self.service._rpc_endpoints
self.assertIsInstance(endpoints[0], service.notify.NotifyEndpoint)
self.assertIsInstance(endpoints[1], service.xfr.XfrEndpoint)
@mock.patch.object(designate.storage.base.Storage, 'get_driver')
@mock.patch.object(storage, 'get_storage')
def test_storage_driver(self, mock_get_driver):
self.service._storage = None
mock_driver = mock.MagicMock()
mock_driver.name = 'noop_driver'
mock_get_driver.return_value = mock_driver
@ -70,16 +81,12 @@ class MdnsServiceTest(oslotest.base.BaseTestCase):
self.assertTrue(mock_get_driver.called)
@mock.patch.object(handler, 'RequestHandler', name='reqh')
@mock.patch.object(designate.service.DNSService, '_start')
@mock.patch.object(handler, 'RequestHandler')
@mock.patch.object(designate.service.DNSService, 'start')
@mock.patch.object(designate.utils, 'cache_result')
@mock.patch.object(designate.storage.base.Storage, 'get_driver')
def test_dns_application(self, mock_req_handler, mock_cache_result,
mock_service_start, mock_get_driver):
mock_driver = mock.MagicMock()
mock_driver.name = 'noop_driver'
mock_get_driver.return_value = mock_driver
def test_dns_application(self, mock_cache_result, mock_dns_start,
mock_request_handler):
app = self.service._dns_application
app = self.service.dns_application
self.assertIsInstance(app, designate.dnsutils.DNSMiddleware)

View File

@ -102,7 +102,7 @@ class PoolManagerInitTest(tests.TestCase):
self.service = service.Service()
self.assertEqual('test-topic.794ccc2c-d751-44fe-b57f-8894c9f5c842',
self.service._rpc_topic)
self.service.rpc_topic)
self.assertEqual('pool_manager', self.service.service_name)
@mock.patch('designate.service.RPCService.start')

View File

@ -19,18 +19,20 @@ Unit-test Producer service
"""
import mock
import oslotest.base
from oslo_config import cfg
from oslo_config import fixture as cfg_fixture
from oslotest import base as test
from designate.producer import service
import designate.service
from designate.tests import fixtures
from designate.tests.unit import RoObject
CONF = cfg.CONF
@mock.patch.object(service.rpcapi.CentralAPI, 'get_instance')
class ProducerTest(test.BaseTestCase):
@mock.patch.object(service.rpcapi.CentralAPI, 'get_instance', mock.Mock())
class ProducerTest(oslotest.base.BaseTestCase):
def setUp(self):
self.useFixture(cfg_fixture.Config(CONF))
@ -46,28 +48,45 @@ class ProducerTest(test.BaseTestCase):
'producer_task:zone_purge': '',
})
super(ProducerTest, self).setUp()
self.service = service.Service()
self.service._storage = mock.Mock()
self.service._rpc_server = mock.Mock()
self.service._quota = mock.Mock()
self.service.quota.limit_check = mock.Mock()
self.stdlog = fixtures.StandardLogging()
self.useFixture(self.stdlog)
def test_service_name(self, _):
self.service = service.Service()
self.service.rpc_server = mock.Mock()
self.service._storage = mock.Mock()
self.service._quota = mock.Mock()
self.service._quota.limit_check = mock.Mock()
@mock.patch.object(service.tasks, 'PeriodicTask')
@mock.patch.object(service.coordination, 'Partitioner')
@mock.patch.object(designate.service.RPCService, 'start')
def test_service_start(self, mock_rpc_start, mock_partitioner,
mock_periodic_task):
self.service.coordination = mock.Mock()
self.service.start()
self.assertTrue(mock_rpc_start.called)
def test_service_stop(self):
self.service.coordination.stop = mock.Mock()
self.service.stop()
self.assertTrue(self.service.coordination.stop.called)
self.assertIn('Stopping producer service', self.stdlog.logger.output)
def test_service_name(self):
self.assertEqual('producer', self.service.service_name)
def test_producer_rpc_topic(self, _):
def test_producer_rpc_topic(self):
CONF.set_override('topic', 'test-topic', 'service:producer')
self.service = service.Service()
self.assertEqual('test-topic', self.service._rpc_topic)
self.assertEqual('test-topic', self.service.rpc_topic)
self.assertEqual('producer', self.service.service_name)
def test_central_api(self, _):
capi = self.service.central_api
self.assertIsInstance(capi, mock.MagicMock)
@mock.patch.object(service.tasks, 'PeriodicTask')
@mock.patch.object(service.coordination, 'Partitioner')
def test_stark(self, _, mock_partitioner, mock_PeriodicTask):
self.service.start()
def test_central_api(self):
self.assertIsInstance(self.service.central_api, mock.Mock)

View File

@ -11,14 +11,13 @@
# under the License.mport threading
import mock
import designate.tests
import designate.rpc
from designate import tests
from designate.sink import service
from designate.tests import fixtures
class TestSinkService(tests.TestCase):
class TestSinkService(designate.tests.TestCase):
def setUp(self):
super(TestSinkService, self).setUp()
self.stdlog = fixtures.StandardLogging()
@ -35,8 +34,7 @@ class TestSinkService(tests.TestCase):
self.assertTrue(mock_notification_listener.called)
@mock.patch.object(designate.rpc, 'get_notification_listener')
def test_service_stop(self, mock_notification_listener):
def test_service_stop(self):
self.service.stop()
self.assertIn('Stopping sink service', self.stdlog.logger.output)

View File

@ -0,0 +1,53 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import oslotest.base
from oslo_config import cfg
from oslo_config import fixture as cfg_fixture
from designate import service
CONF = cfg.CONF
class HeartbeatTest(oslotest.base.BaseTestCase):
def setUp(self):
super(HeartbeatTest, self).setUp()
self.useFixture(cfg_fixture.Config(CONF))
CONF.set_override('emitter_type', 'noop', 'heartbeat_emitter')
self.mock_tg = mock.Mock()
self.heartbeat = service.Heartbeat('test', self.mock_tg)
def test_get_status(self):
self.assertEqual(('UP', {}, {},), self.heartbeat.get_status())
def test_get_heartbeat_emitter(self):
self.assertEqual(
'noop', self.heartbeat.heartbeat_emitter.__plugin_name__
)
def test_start_heartbeat(self):
self.assertFalse(self.heartbeat.heartbeat_emitter._running)
self.heartbeat.start()
self.assertTrue(self.heartbeat.heartbeat_emitter._running)
def test_stop_heartbeat(self):
self.assertFalse(self.heartbeat.heartbeat_emitter._running)
self.heartbeat.start()
self.heartbeat.stop()
self.assertFalse(self.heartbeat.heartbeat_emitter._running)

View File

@ -24,6 +24,7 @@ import designate.tests
from designate import backend
from designate import exceptions
from designate import objects
from designate.tests import fixtures
from designate.worker import processing
from designate.worker import service
@ -33,6 +34,8 @@ CONF = cfg.CONF
class TestService(oslotest.base.BaseTestCase):
def setUp(self):
super(TestService, self).setUp()
self.stdlog = fixtures.StandardLogging()
self.useFixture(self.stdlog)
self.useFixture(cfg_fixture.Config(CONF))
self.context = mock.Mock()
@ -40,10 +43,16 @@ class TestService(oslotest.base.BaseTestCase):
self.service = service.Service()
@mock.patch.object(designate.service.RPCService, 'start')
def test_service_start(self, mock_service_start):
def test_service_start(self, mock_rpc_start):
self.service.start()
self.assertTrue(mock_service_start.called)
self.assertTrue(mock_rpc_start.called)
@mock.patch.object(designate.rpc, 'get_notification_listener')
def test_service_stop(self, mock_notification_listener):
self.service.stop()
self.assertIn('Stopping worker service', self.stdlog.logger.output)
def test_service_name(self):
self.assertEqual('worker', self.service.service_name)
@ -53,7 +62,7 @@ class TestService(oslotest.base.BaseTestCase):
self.service = service.Service()
self.assertEqual('test-topic', self.service._rpc_topic)
self.assertEqual('test-topic', self.service.rpc_topic)
self.assertEqual('worker', self.service.service_name)
def test_central_api(self):

View File

@ -41,25 +41,38 @@ class AlsoNotifyTask(object):
pass
class Service(service.RPCService, service.Service):
class Service(service.RPCService):
RPC_API_VERSION = '1.0'
target = messaging.Target(version=RPC_API_VERSION)
def __init__(self):
self._central_api = None
self._storage = None
self._executor = None
self._pools_map = None
super(Service, self).__init__(
self.service_name, cfg.CONF['service:worker'].topic,
threads=cfg.CONF['service:worker'].threads,
)
@property
def central_api(self):
if not hasattr(self, '_central_api'):
if not self._central_api:
self._central_api = central_api.CentralAPI.get_instance()
return self._central_api
def _setup_target_backends(self, pool):
@staticmethod
def _setup_target_backends(pool):
for target in pool.targets:
# Fetch an instance of the Backend class
target.backend = backend.get_backend(target)
LOG.info('%d targets setup', len(pool.targets))
if len(pool.targets) == 0:
if not pool.targets:
raise exceptions.NoPoolTargetsConfigured()
return pool
@ -97,21 +110,21 @@ class Service(service.RPCService, service.Service):
@property
def storage(self):
if not hasattr(self, '_storage'):
if not self._storage:
storage_driver = cfg.CONF['service:worker'].storage_driver
self._storage = storage.get_storage(storage_driver)
return self._storage
@property
def executor(self):
if not hasattr(self, '_executor'):
if not self._executor:
# TODO(elarson): Create this based on config
self._executor = processing.Executor()
return self._executor
@property
def pools_map(self):
if not hasattr(self, '_pools_map'):
if self._pools_map is None:
self._pools_map = {}
return self._pools_map
@ -125,6 +138,9 @@ class Service(service.RPCService, service.Service):
super(Service, self).start()
LOG.info('Started worker')
def stop(self, graceful=True):
super(Service, self).stop(graceful)
def _do_zone_action(self, context, zone):
pool = self.get_pool(zone.pool_id)
all_tasks = []

View File

@ -8,6 +8,7 @@ namespace = oslo.policy
namespace = oslo.service.periodic_task
namespace = oslo.service.service
namespace = oslo.service.sslutils
namespace = oslo.service.wsgi
namespace = oslo.db
namespace = oslo.middleware
namespace = oslo.concurrency

View File

@ -0,0 +1,18 @@
---
upgrade:
- |
The previously deprecated options ``api_host``, ``api_port``, ``host`` and
``port`` have been permanently removed and are replaced by ``listen``.
e.g.
.. code-block:: ini
[service:api]
listen = 0.0.0.0:9001
..
- |
The Designate ``sink`` service will now use the heartbeat reporting system to
report its status. This was already the case for all other Designate
services.