From b4aa98bf3217258c9b2054b6ad42aed67e449567 Mon Sep 17 00:00:00 2001 From: Kiall Mac Innes Date: Sun, 3 Feb 2013 17:46:46 +0000 Subject: [PATCH] Separate notification handling from the central service Change-Id: Iee0be50dfa3c56b70279580b7af243842df96d74 --- bin/moniker-sink | 33 +++++ etc/moniker/moniker.conf.sample | 12 +- moniker/central/service.py | 96 +------------- moniker/notification_handler/base.py | 27 ++-- moniker/notification_handler/nova.py | 2 +- moniker/notification_handler/quantum.py | 2 +- moniker/sink/__init__.py | 27 ++++ moniker/sink/service.py | 121 ++++++++++++++++++ moniker/tests/__init__.py | 5 + .../test_notification_handler/test_nova.py | 2 +- .../test_notification_handler/test_quantum.py | 2 +- moniker/tests/test_sink/__init__.py | 20 +++ moniker/tests/test_sink/test_service.py | 33 +++++ setup.py | 7 +- tox.ini | 2 +- 15 files changed, 269 insertions(+), 122 deletions(-) create mode 100755 bin/moniker-sink create mode 100644 moniker/sink/__init__.py create mode 100644 moniker/sink/service.py create mode 100644 moniker/tests/test_sink/__init__.py create mode 100644 moniker/tests/test_sink/test_service.py diff --git a/bin/moniker-sink b/bin/moniker-sink new file mode 100755 index 000000000..1a87fc7db --- /dev/null +++ b/bin/moniker-sink @@ -0,0 +1,33 @@ +#!/usr/bin/env python +# Copyright 2012 Managed I.T. +# +# Author: Kiall Mac Innes +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import sys +import eventlet +from moniker.openstack.common import cfg +from moniker.openstack.common import log as logging +from moniker.openstack.common import service +from moniker import utils +from moniker.sink import service as sink_service + +eventlet.monkey_patch() + +utils.read_config('moniker', sys.argv) + +logging.setup('moniker') + +launcher = service.launch(sink_service.Service(), + cfg.CONF['service:sink'].workers) +launcher.wait() diff --git a/etc/moniker/moniker.conf.sample b/etc/moniker/moniker.conf.sample index bdba21dea..8374bf080 100644 --- a/etc/moniker/moniker.conf.sample +++ b/etc/moniker/moniker.conf.sample @@ -36,10 +36,6 @@ default_log_levels = amqplib=WARN, sqlalchemy=WARN, boto=WARN, suds=INFO, keysto # Driver used for backend communication (e.g. rpc, bind9, powerdns) #backend_driver = rpc -# List of notification handlers to enable, configuration of these needs to -# correspond to a [handler:my_driver] section below or else in the config -#enabled_notification_handlers = nova_fixed - # List of blacklist domain name regexes #domain_name_blacklist = arpa.$, ^com.$, ^net.$, ^org.$, novalocal.$ @@ -63,6 +59,14 @@ default_log_levels = amqplib=WARN, sqlalchemy=WARN, boto=WARN, suds=INFO, keysto # Driver used for backend communication (e.g. bind9, powerdns) #backend_driver = bind9 +#----------------------- +# Sink Service +#----------------------- +[service:sink] +# List of notification handlers to enable, configuration of these needs to +# correspond to a [handler:my_driver] section below or else in the config +#enabled_notification_handlers = nova_fixed + ######################## ## Storage Configuration ######################## diff --git a/moniker/central/service.py b/moniker/central/service.py index c38acaa0e..9c85fde3f 100644 --- a/moniker/central/service.py +++ b/moniker/central/service.py @@ -16,9 +16,7 @@ import re from moniker.openstack.common import cfg from moniker.openstack.common import log as logging -from moniker.openstack.common import rpc from moniker.openstack.common.rpc import service as rpc_service -from stevedore.named import NamedExtensionManager from moniker import exceptions from moniker import policy from moniker import storage @@ -27,12 +25,9 @@ from moniker import backend LOG = logging.getLogger(__name__) -HANDLER_NAMESPACE = 'moniker.notification.handler' - class Service(rpc_service.Service): def __init__(self, *args, **kwargs): - backend_driver = cfg.CONF['service:central'].backend_driver self.backend = backend.get_backend(backend_driver, central_service=self) @@ -49,103 +44,16 @@ class Service(rpc_service.Service): # Get a storage connection self.storage = storage.get_storage() - # Initialize extensions - self.handlers = self._init_extensions() - - if self.handlers: - # Get a rpc connection if needed - self.rpc_conn = rpc.create_connection() - - def _init_extensions(self): - """ Loads and prepares all enabled extensions """ - enabled_notification_handlers = \ - cfg.CONF['service:central'].enabled_notification_handlers - - self.extensions_manager = NamedExtensionManager( - HANDLER_NAMESPACE, names=enabled_notification_handlers) - - def _load_extension(ext): - handler_cls = ext.plugin - return handler_cls(central_service=self) - - try: - return self.extensions_manager.map(_load_extension) - except RuntimeError: - # No handlers enabled. No problem. - return [] - def start(self): self.backend.start() + super(Service, self).start() - if self.handlers: - # Setup notification subscriptions and start consuming - self._setup_subscriptions() - self.rpc_conn.consume_in_thread_group(self.tg) - def stop(self): - if self.handlers: - # Try to shut the connection down, but if we get any sort of - # errors, go ahead and ignore them.. as we're shutting down anyway - try: - self.rpc_conn.close() - except Exception: - pass - super(Service, self).stop() + self.backend.stop() - def _setup_subscriptions(self): - """ - Set's up subscriptions for the various exchange+topic combinations that - we have a handler for. - """ - for handler in self.handlers: - exchange, topics = handler.get_exchange_topics() - - for topic in topics: - queue_name = "moniker.notifications.%s.%s.%s" % ( - handler.get_canonical_name(), exchange, topic) - - self.rpc_conn.declare_topic_consumer( - queue_name=queue_name, - topic=topic, - exchange_name=exchange, - callback=self._process_notification) - - def _get_handler_event_types(self): - event_types = set() - for handler in self.handlers: - for et in handler.get_event_types(): - event_types.add(et) - return event_types - - def _process_notification(self, notification): - """ - Processes an incoming notification, offering each extension the - opportunity to handle it. - """ - event_type = notification.get('event_type') - - # NOTE(zykes): Only bother to actually do processing if there's any - # matching events, skips logging of things like compute.exists etc. - if event_type in self._get_handler_event_types(): - for handler in self.handlers: - self._process_notification_for_handler(handler, notification) - - def _process_notification_for_handler(self, handler, notification): - """ - Processes an incoming notification for a specific handler, checking - to see if the handler is interested in the notification before - handing it over. - """ - event_type = notification['event_type'] - payload = notification['payload'] - - if event_type in handler.get_event_types(): - LOG.debug('Found handler for: %s' % event_type) - handler.process_notification(event_type, payload) - def _is_blacklisted_domain_name(self, context, domain_name): """ Ensures the provided domain_name is not blacklisted. diff --git a/moniker/notification_handler/base.py b/moniker/notification_handler/base.py index a60549a5d..1ce6f5956 100644 --- a/moniker/notification_handler/base.py +++ b/moniker/notification_handler/base.py @@ -17,6 +17,7 @@ import abc from moniker.openstack.common import cfg from moniker.openstack.common import log as logging +from moniker.central import api as central_api from moniker.context import MonikerContext from moniker.plugin import Plugin @@ -46,11 +47,6 @@ class Handler(Plugin): __plugin_ns__ = 'moniker.notification.handler' __plugin_type__ = 'handler' - def __init__(self, central_service): - super(Handler, self).__init__() - LOG.debug('Loaded handler: %s' % __name__) - self.central_service = central_service - @abc.abstractmethod def get_exchange_topics(self): """ @@ -73,7 +69,7 @@ class Handler(Plugin): Return the domain for this context """ context = MonikerContext.get_admin_context() - return self.central_service.get_domain(context, domain_id) + return central_api.get_domain(context, domain_id) class BaseAddressHandler(Handler): @@ -94,7 +90,9 @@ class BaseAddressHandler(Handler): :param resource_type: The managed resource type :param resource_id: The managed resource ID """ + LOG.debug('Using DomainID: %s' % cfg.CONF[self.name].domain_id) domain = self.get_domain(cfg.CONF[self.name].domain_id) + LOG.debug('Domain: %r' % domain) data = extra.copy() data['domain'] = domain['name'] @@ -117,8 +115,7 @@ class BaseAddressHandler(Handler): 'managed_plugin_type': self.get_plugin_type(), 'managed_resource_type': resource_type, 'managed_resource_id': resource_id}) - self.central_service.create_record(context, domain['id'], - record_values) + central_api.create_record(context, domain['id'], record_values) def _delete(self, managed=True, resource_id=None, resource_type='instance', criterion={}): @@ -138,14 +135,12 @@ class BaseAddressHandler(Handler): 'managed_resource_type': resource_type }) - records = self.central_service.get_records( - context, - cfg.CONF[self.name].domain_id, - criterion) + records = central_api.get_records(context, + cfg.CONF[self.name].domain_id, + criterion) for record in records: LOG.debug('Deleting record %s' % record['id']) - self.central_service.delete_record( - context, - cfg.CONF[self.name].domain_id, - record['id']) + + central_api.delete_record(context, cfg.CONF[self.name].domain_id, + record['id']) diff --git a/moniker/notification_handler/nova.py b/moniker/notification_handler/nova.py index fc2ed00fb..7790d2038 100644 --- a/moniker/notification_handler/nova.py +++ b/moniker/notification_handler/nova.py @@ -27,7 +27,7 @@ cfg.CONF.register_group(cfg.OptGroup( cfg.CONF.register_opts([ cfg.ListOpt('notification-topics', default=['monitor']), cfg.StrOpt('control-exchange', default='nova'), - cfg.StrOpt('domain_id', default=None), + cfg.StrOpt('domain-id', default=None), cfg.StrOpt('format', default=None) ], group='handler:nova_fixed') diff --git a/moniker/notification_handler/quantum.py b/moniker/notification_handler/quantum.py index f2fe99f65..8d924fc32 100644 --- a/moniker/notification_handler/quantum.py +++ b/moniker/notification_handler/quantum.py @@ -27,7 +27,7 @@ cfg.CONF.register_group(cfg.OptGroup( cfg.CONF.register_opts([ cfg.ListOpt('notification-topics', default=['monitor']), cfg.StrOpt('control-exchange', default='quantum'), - cfg.StrOpt('domain_id', default=None), + cfg.StrOpt('domain-id', default=None), cfg.StrOpt('format', default=None) ], group='handler:quantum_floatingip') diff --git a/moniker/sink/__init__.py b/moniker/sink/__init__.py new file mode 100644 index 000000000..016d4ef48 --- /dev/null +++ b/moniker/sink/__init__.py @@ -0,0 +1,27 @@ +# Copyright 2012 Hewlett-Packard Development Company, L.P. All Rights Reserved. +# +# Author: Kiall Mac Innes +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from moniker.openstack.common import cfg + +cfg.CONF.register_group(cfg.OptGroup( + name='service:sink', title="Configuration for Sink Service" +)) + +cfg.CONF.register_opts([ + cfg.IntOpt('workers', default=None, + help='Number of worker processes to spawn'), + cfg.ListOpt('enabled-notification-handlers', default=[], + help='Enabled Notification Handlers'), +], group='service:sink') diff --git a/moniker/sink/service.py b/moniker/sink/service.py new file mode 100644 index 000000000..acca435b1 --- /dev/null +++ b/moniker/sink/service.py @@ -0,0 +1,121 @@ +# Copyright 2012 Managed I.T. +# +# Author: Kiall Mac Innes +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from moniker.openstack.common import cfg +from moniker.openstack.common import log as logging +from moniker.openstack.common import rpc +from moniker.openstack.common import service +from stevedore.named import NamedExtensionManager + +LOG = logging.getLogger(__name__) + +HANDLER_NAMESPACE = 'moniker.notification.handler' + + +class Service(service.Service): + def __init__(self, *args, **kwargs): + super(Service, self).__init__(*args, **kwargs) + + # Initialize extensions + self.handlers = self._init_extensions() + + # Get a rpc connection + self.rpc_conn = rpc.create_connection() + + def _init_extensions(self): + """ Loads and prepares all enabled extensions """ + enabled_notification_handlers = \ + cfg.CONF['service:sink'].enabled_notification_handlers + + self.extensions_manager = NamedExtensionManager( + HANDLER_NAMESPACE, names=enabled_notification_handlers) + + def _load_extension(ext): + handler_cls = ext.plugin + return handler_cls() + + try: + return self.extensions_manager.map(_load_extension) + except RuntimeError: + # No handlers enabled. No problem. + return [] + + def start(self): + super(Service, self).start() + + # Setup notification subscriptions and start consuming + self._setup_subscriptions() + self.rpc_conn.consume_in_thread_group(self.tg) + + def stop(self): + # Try to shut the connection down, but if we get any sort of + # errors, go ahead and ignore them.. as we're shutting down anyway + try: + self.rpc_conn.close() + except Exception: + pass + + super(Service, self).stop() + + def _setup_subscriptions(self): + """ + Set's up subscriptions for the various exchange+topic combinations that + we have a handler for. + """ + for handler in self.handlers: + exchange, topics = handler.get_exchange_topics() + + for topic in topics: + queue_name = "moniker.notifications.%s.%s.%s" % ( + handler.get_canonical_name(), exchange, topic) + + self.rpc_conn.declare_topic_consumer( + queue_name=queue_name, + topic=topic, + exchange_name=exchange, + callback=self._process_notification) + + def _get_handler_event_types(self): + event_types = set() + for handler in self.handlers: + for et in handler.get_event_types(): + event_types.add(et) + return event_types + + def _process_notification(self, notification): + """ + Processes an incoming notification, offering each extension the + opportunity to handle it. + """ + event_type = notification.get('event_type') + + # NOTE(zykes): Only bother to actually do processing if there's any + # matching events, skips logging of things like compute.exists etc. + if event_type in self._get_handler_event_types(): + for handler in self.handlers: + self._process_notification_for_handler(handler, notification) + + def _process_notification_for_handler(self, handler, notification): + """ + Processes an incoming notification for a specific handler, checking + to see if the handler is interested in the notification before + handing it over. + """ + event_type = notification['event_type'] + payload = notification['payload'] + + if event_type in handler.get_event_types(): + LOG.debug('Found handler for: %s' % event_type) + handler.process_notification(event_type, payload) diff --git a/moniker/tests/__init__.py b/moniker/tests/__init__.py index 830b176dc..21a061720 100644 --- a/moniker/tests/__init__.py +++ b/moniker/tests/__init__.py @@ -25,6 +25,7 @@ from moniker import exceptions from moniker.agent import service as agent_service from moniker.api import service as api_service from moniker.central import service as central_service +from moniker.sink import service as sink_service LOG = logging.getLogger(__name__) @@ -179,6 +180,9 @@ class TestCase(unittest2.TestCase, AssertMixin): def get_central_service(self): return central_service.Service() + def get_sink_service(self): + return sink_service.Service() + # Context Methods def get_context(self, **kwargs): return MonikerContext(**kwargs) @@ -228,6 +232,7 @@ class TestCase(unittest2.TestCase, AssertMixin): return self.central_service.create_tsigkey(context, values=values) def create_domain(self, **kwargs): + LOG.critical('*************CREATE DOMAIN CALLED*************') context = kwargs.pop('context', self.get_admin_context()) fixture = kwargs.pop('fixture', 0) diff --git a/moniker/tests/test_notification_handler/test_nova.py b/moniker/tests/test_notification_handler/test_nova.py index d60fee6ee..c1f19a1ea 100644 --- a/moniker/tests/test_notification_handler/test_nova.py +++ b/moniker/tests/test_notification_handler/test_nova.py @@ -30,7 +30,7 @@ class NovaFixedHandlerTest(NotificationHandlerTestCase): self.domain_id = domain['id'] self.config(domain_id=domain['id'], group='handler:nova_fixed') - self.plugin = NovaFixedHandler(self.central_service) + self.plugin = NovaFixedHandler() def test_instance_create_end(self): event_type = 'compute.instance.create.end' diff --git a/moniker/tests/test_notification_handler/test_quantum.py b/moniker/tests/test_notification_handler/test_quantum.py index 0c98673b1..7fe1ae957 100644 --- a/moniker/tests/test_notification_handler/test_quantum.py +++ b/moniker/tests/test_notification_handler/test_quantum.py @@ -30,7 +30,7 @@ class QuantumFloatingHandlerTest(NotificationHandlerTestCase): self.domain_id = domain['id'] self.config(domain_id=domain['id'], group='handler:quantum_floatingip') - self.plugin = QuantumFloatingHandler(self.central_service) + self.plugin = QuantumFloatingHandler() def test_floatingip_associate(self): event_type = 'floatingip.update.end' diff --git a/moniker/tests/test_sink/__init__.py b/moniker/tests/test_sink/__init__.py new file mode 100644 index 000000000..f305ea292 --- /dev/null +++ b/moniker/tests/test_sink/__init__.py @@ -0,0 +1,20 @@ +# Copyright 2012 Managed I.T. +# +# Author: Kiall Mac Innes +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from moniker.tests import TestCase + + +class SinkTestCase(TestCase): + __test__ = False diff --git a/moniker/tests/test_sink/test_service.py b/moniker/tests/test_sink/test_service.py new file mode 100644 index 000000000..439c58d0e --- /dev/null +++ b/moniker/tests/test_sink/test_service.py @@ -0,0 +1,33 @@ +# Copyright 2012 Managed I.T. +# +# Author: Kiall Mac Innes +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +from moniker.openstack.common import log as logging +from moniker.tests.test_sink import SinkTestCase + +LOG = logging.getLogger(__name__) + + +class SinkServiceTest(SinkTestCase): + __test__ = True + + def setUp(self): + super(SinkServiceTest, self).setUp() + + self.sink_service = self.get_sink_service() + + def test_start_and_stop(self): + # Ensures the start/stop actions don't raise + self.sink_service.start() + self.sink_service.stop() diff --git a/setup.py b/setup.py index ab1dd1e41..1423c62b2 100755 --- a/setup.py +++ b/setup.py @@ -48,11 +48,12 @@ setup( }, dependency_links=dependency_links, scripts=[ - 'bin/moniker-central', - 'bin/moniker-api', 'bin/moniker-agent', + 'bin/moniker-api', + 'bin/moniker-central', 'bin/moniker-manage', - 'bin/moniker-rootwrap' + 'bin/moniker-rootwrap', + 'bin/moniker-sink', ], cmdclass=common_setup.get_cmdclass(), entry_points=textwrap.dedent(""" diff --git a/tox.ini b/tox.ini index d4851c996..4de48102d 100644 --- a/tox.ini +++ b/tox.ini @@ -29,7 +29,7 @@ setenv = {[testenv]setenv} [testenv:pep8] deps = {[testenv]deps} pep8==1.3.3 -commands = pep8 --repeat --show-source --exclude=.venv,.tox,dist,doc,openstack moniker setup.py bin/moniker-api bin/moniker-central bin/moniker-agent bin/moniker-manage +commands = pep8 --repeat --show-source --exclude=.venv,.tox,dist,doc,openstack moniker setup.py bin/moniker-api bin/moniker-central bin/moniker-agent bin/moniker-sink bin/moniker-manage [testenv:pyflakes] deps = {[testenv]deps}