Separate notification handling from the central service

Change-Id: Iee0be50dfa3c56b70279580b7af243842df96d74
This commit is contained in:
Kiall Mac Innes 2013-02-03 17:46:46 +00:00
parent 83d135a669
commit b4aa98bf32
15 changed files with 269 additions and 122 deletions

33
bin/moniker-sink Executable file
View File

@ -0,0 +1,33 @@
#!/usr/bin/env python
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import eventlet
from moniker.openstack.common import cfg
from moniker.openstack.common import log as logging
from moniker.openstack.common import service
from moniker import utils
from moniker.sink import service as sink_service
eventlet.monkey_patch()
utils.read_config('moniker', sys.argv)
logging.setup('moniker')
launcher = service.launch(sink_service.Service(),
cfg.CONF['service:sink'].workers)
launcher.wait()

View File

@ -36,10 +36,6 @@ default_log_levels = amqplib=WARN, sqlalchemy=WARN, boto=WARN, suds=INFO, keysto
# Driver used for backend communication (e.g. rpc, bind9, powerdns)
#backend_driver = rpc
# List of notification handlers to enable, configuration of these needs to
# correspond to a [handler:my_driver] section below or else in the config
#enabled_notification_handlers = nova_fixed
# List of blacklist domain name regexes
#domain_name_blacklist = arpa.$, ^com.$, ^net.$, ^org.$, novalocal.$
@ -63,6 +59,14 @@ default_log_levels = amqplib=WARN, sqlalchemy=WARN, boto=WARN, suds=INFO, keysto
# Driver used for backend communication (e.g. bind9, powerdns)
#backend_driver = bind9
#-----------------------
# Sink Service
#-----------------------
[service:sink]
# List of notification handlers to enable, configuration of these needs to
# correspond to a [handler:my_driver] section below or else in the config
#enabled_notification_handlers = nova_fixed
########################
## Storage Configuration
########################

View File

@ -16,9 +16,7 @@
import re
from moniker.openstack.common import cfg
from moniker.openstack.common import log as logging
from moniker.openstack.common import rpc
from moniker.openstack.common.rpc import service as rpc_service
from stevedore.named import NamedExtensionManager
from moniker import exceptions
from moniker import policy
from moniker import storage
@ -27,12 +25,9 @@ from moniker import backend
LOG = logging.getLogger(__name__)
HANDLER_NAMESPACE = 'moniker.notification.handler'
class Service(rpc_service.Service):
def __init__(self, *args, **kwargs):
backend_driver = cfg.CONF['service:central'].backend_driver
self.backend = backend.get_backend(backend_driver,
central_service=self)
@ -49,103 +44,16 @@ class Service(rpc_service.Service):
# Get a storage connection
self.storage = storage.get_storage()
# Initialize extensions
self.handlers = self._init_extensions()
if self.handlers:
# Get a rpc connection if needed
self.rpc_conn = rpc.create_connection()
def _init_extensions(self):
""" Loads and prepares all enabled extensions """
enabled_notification_handlers = \
cfg.CONF['service:central'].enabled_notification_handlers
self.extensions_manager = NamedExtensionManager(
HANDLER_NAMESPACE, names=enabled_notification_handlers)
def _load_extension(ext):
handler_cls = ext.plugin
return handler_cls(central_service=self)
try:
return self.extensions_manager.map(_load_extension)
except RuntimeError:
# No handlers enabled. No problem.
return []
def start(self):
self.backend.start()
super(Service, self).start()
if self.handlers:
# Setup notification subscriptions and start consuming
self._setup_subscriptions()
self.rpc_conn.consume_in_thread_group(self.tg)
def stop(self):
if self.handlers:
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.rpc_conn.close()
except Exception:
pass
super(Service, self).stop()
self.backend.stop()
def _setup_subscriptions(self):
"""
Set's up subscriptions for the various exchange+topic combinations that
we have a handler for.
"""
for handler in self.handlers:
exchange, topics = handler.get_exchange_topics()
for topic in topics:
queue_name = "moniker.notifications.%s.%s.%s" % (
handler.get_canonical_name(), exchange, topic)
self.rpc_conn.declare_topic_consumer(
queue_name=queue_name,
topic=topic,
exchange_name=exchange,
callback=self._process_notification)
def _get_handler_event_types(self):
event_types = set()
for handler in self.handlers:
for et in handler.get_event_types():
event_types.add(et)
return event_types
def _process_notification(self, notification):
"""
Processes an incoming notification, offering each extension the
opportunity to handle it.
"""
event_type = notification.get('event_type')
# NOTE(zykes): Only bother to actually do processing if there's any
# matching events, skips logging of things like compute.exists etc.
if event_type in self._get_handler_event_types():
for handler in self.handlers:
self._process_notification_for_handler(handler, notification)
def _process_notification_for_handler(self, handler, notification):
"""
Processes an incoming notification for a specific handler, checking
to see if the handler is interested in the notification before
handing it over.
"""
event_type = notification['event_type']
payload = notification['payload']
if event_type in handler.get_event_types():
LOG.debug('Found handler for: %s' % event_type)
handler.process_notification(event_type, payload)
def _is_blacklisted_domain_name(self, context, domain_name):
"""
Ensures the provided domain_name is not blacklisted.

View File

@ -17,6 +17,7 @@
import abc
from moniker.openstack.common import cfg
from moniker.openstack.common import log as logging
from moniker.central import api as central_api
from moniker.context import MonikerContext
from moniker.plugin import Plugin
@ -46,11 +47,6 @@ class Handler(Plugin):
__plugin_ns__ = 'moniker.notification.handler'
__plugin_type__ = 'handler'
def __init__(self, central_service):
super(Handler, self).__init__()
LOG.debug('Loaded handler: %s' % __name__)
self.central_service = central_service
@abc.abstractmethod
def get_exchange_topics(self):
"""
@ -73,7 +69,7 @@ class Handler(Plugin):
Return the domain for this context
"""
context = MonikerContext.get_admin_context()
return self.central_service.get_domain(context, domain_id)
return central_api.get_domain(context, domain_id)
class BaseAddressHandler(Handler):
@ -94,7 +90,9 @@ class BaseAddressHandler(Handler):
:param resource_type: The managed resource type
:param resource_id: The managed resource ID
"""
LOG.debug('Using DomainID: %s' % cfg.CONF[self.name].domain_id)
domain = self.get_domain(cfg.CONF[self.name].domain_id)
LOG.debug('Domain: %r' % domain)
data = extra.copy()
data['domain'] = domain['name']
@ -117,8 +115,7 @@ class BaseAddressHandler(Handler):
'managed_plugin_type': self.get_plugin_type(),
'managed_resource_type': resource_type,
'managed_resource_id': resource_id})
self.central_service.create_record(context, domain['id'],
record_values)
central_api.create_record(context, domain['id'], record_values)
def _delete(self, managed=True, resource_id=None, resource_type='instance',
criterion={}):
@ -138,14 +135,12 @@ class BaseAddressHandler(Handler):
'managed_resource_type': resource_type
})
records = self.central_service.get_records(
context,
records = central_api.get_records(context,
cfg.CONF[self.name].domain_id,
criterion)
for record in records:
LOG.debug('Deleting record %s' % record['id'])
self.central_service.delete_record(
context,
cfg.CONF[self.name].domain_id,
central_api.delete_record(context, cfg.CONF[self.name].domain_id,
record['id'])

View File

@ -27,7 +27,7 @@ cfg.CONF.register_group(cfg.OptGroup(
cfg.CONF.register_opts([
cfg.ListOpt('notification-topics', default=['monitor']),
cfg.StrOpt('control-exchange', default='nova'),
cfg.StrOpt('domain_id', default=None),
cfg.StrOpt('domain-id', default=None),
cfg.StrOpt('format', default=None)
], group='handler:nova_fixed')

View File

@ -27,7 +27,7 @@ cfg.CONF.register_group(cfg.OptGroup(
cfg.CONF.register_opts([
cfg.ListOpt('notification-topics', default=['monitor']),
cfg.StrOpt('control-exchange', default='quantum'),
cfg.StrOpt('domain_id', default=None),
cfg.StrOpt('domain-id', default=None),
cfg.StrOpt('format', default=None)
], group='handler:quantum_floatingip')

27
moniker/sink/__init__.py Normal file
View File

@ -0,0 +1,27 @@
# Copyright 2012 Hewlett-Packard Development Company, L.P. All Rights Reserved.
#
# Author: Kiall Mac Innes <kiall@hp.com>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from moniker.openstack.common import cfg
cfg.CONF.register_group(cfg.OptGroup(
name='service:sink', title="Configuration for Sink Service"
))
cfg.CONF.register_opts([
cfg.IntOpt('workers', default=None,
help='Number of worker processes to spawn'),
cfg.ListOpt('enabled-notification-handlers', default=[],
help='Enabled Notification Handlers'),
], group='service:sink')

121
moniker/sink/service.py Normal file
View File

@ -0,0 +1,121 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from moniker.openstack.common import cfg
from moniker.openstack.common import log as logging
from moniker.openstack.common import rpc
from moniker.openstack.common import service
from stevedore.named import NamedExtensionManager
LOG = logging.getLogger(__name__)
HANDLER_NAMESPACE = 'moniker.notification.handler'
class Service(service.Service):
def __init__(self, *args, **kwargs):
super(Service, self).__init__(*args, **kwargs)
# Initialize extensions
self.handlers = self._init_extensions()
# Get a rpc connection
self.rpc_conn = rpc.create_connection()
def _init_extensions(self):
""" Loads and prepares all enabled extensions """
enabled_notification_handlers = \
cfg.CONF['service:sink'].enabled_notification_handlers
self.extensions_manager = NamedExtensionManager(
HANDLER_NAMESPACE, names=enabled_notification_handlers)
def _load_extension(ext):
handler_cls = ext.plugin
return handler_cls()
try:
return self.extensions_manager.map(_load_extension)
except RuntimeError:
# No handlers enabled. No problem.
return []
def start(self):
super(Service, self).start()
# Setup notification subscriptions and start consuming
self._setup_subscriptions()
self.rpc_conn.consume_in_thread_group(self.tg)
def stop(self):
# Try to shut the connection down, but if we get any sort of
# errors, go ahead and ignore them.. as we're shutting down anyway
try:
self.rpc_conn.close()
except Exception:
pass
super(Service, self).stop()
def _setup_subscriptions(self):
"""
Set's up subscriptions for the various exchange+topic combinations that
we have a handler for.
"""
for handler in self.handlers:
exchange, topics = handler.get_exchange_topics()
for topic in topics:
queue_name = "moniker.notifications.%s.%s.%s" % (
handler.get_canonical_name(), exchange, topic)
self.rpc_conn.declare_topic_consumer(
queue_name=queue_name,
topic=topic,
exchange_name=exchange,
callback=self._process_notification)
def _get_handler_event_types(self):
event_types = set()
for handler in self.handlers:
for et in handler.get_event_types():
event_types.add(et)
return event_types
def _process_notification(self, notification):
"""
Processes an incoming notification, offering each extension the
opportunity to handle it.
"""
event_type = notification.get('event_type')
# NOTE(zykes): Only bother to actually do processing if there's any
# matching events, skips logging of things like compute.exists etc.
if event_type in self._get_handler_event_types():
for handler in self.handlers:
self._process_notification_for_handler(handler, notification)
def _process_notification_for_handler(self, handler, notification):
"""
Processes an incoming notification for a specific handler, checking
to see if the handler is interested in the notification before
handing it over.
"""
event_type = notification['event_type']
payload = notification['payload']
if event_type in handler.get_event_types():
LOG.debug('Found handler for: %s' % event_type)
handler.process_notification(event_type, payload)

View File

@ -25,6 +25,7 @@ from moniker import exceptions
from moniker.agent import service as agent_service
from moniker.api import service as api_service
from moniker.central import service as central_service
from moniker.sink import service as sink_service
LOG = logging.getLogger(__name__)
@ -179,6 +180,9 @@ class TestCase(unittest2.TestCase, AssertMixin):
def get_central_service(self):
return central_service.Service()
def get_sink_service(self):
return sink_service.Service()
# Context Methods
def get_context(self, **kwargs):
return MonikerContext(**kwargs)
@ -228,6 +232,7 @@ class TestCase(unittest2.TestCase, AssertMixin):
return self.central_service.create_tsigkey(context, values=values)
def create_domain(self, **kwargs):
LOG.critical('*************CREATE DOMAIN CALLED*************')
context = kwargs.pop('context', self.get_admin_context())
fixture = kwargs.pop('fixture', 0)

View File

@ -30,7 +30,7 @@ class NovaFixedHandlerTest(NotificationHandlerTestCase):
self.domain_id = domain['id']
self.config(domain_id=domain['id'], group='handler:nova_fixed')
self.plugin = NovaFixedHandler(self.central_service)
self.plugin = NovaFixedHandler()
def test_instance_create_end(self):
event_type = 'compute.instance.create.end'

View File

@ -30,7 +30,7 @@ class QuantumFloatingHandlerTest(NotificationHandlerTestCase):
self.domain_id = domain['id']
self.config(domain_id=domain['id'], group='handler:quantum_floatingip')
self.plugin = QuantumFloatingHandler(self.central_service)
self.plugin = QuantumFloatingHandler()
def test_floatingip_associate(self):
event_type = 'floatingip.update.end'

View File

@ -0,0 +1,20 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from moniker.tests import TestCase
class SinkTestCase(TestCase):
__test__ = False

View File

@ -0,0 +1,33 @@
# Copyright 2012 Managed I.T.
#
# Author: Kiall Mac Innes <kiall@managedit.ie>
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from moniker.openstack.common import log as logging
from moniker.tests.test_sink import SinkTestCase
LOG = logging.getLogger(__name__)
class SinkServiceTest(SinkTestCase):
__test__ = True
def setUp(self):
super(SinkServiceTest, self).setUp()
self.sink_service = self.get_sink_service()
def test_start_and_stop(self):
# Ensures the start/stop actions don't raise
self.sink_service.start()
self.sink_service.stop()

View File

@ -48,11 +48,12 @@ setup(
},
dependency_links=dependency_links,
scripts=[
'bin/moniker-central',
'bin/moniker-api',
'bin/moniker-agent',
'bin/moniker-api',
'bin/moniker-central',
'bin/moniker-manage',
'bin/moniker-rootwrap'
'bin/moniker-rootwrap',
'bin/moniker-sink',
],
cmdclass=common_setup.get_cmdclass(),
entry_points=textwrap.dedent("""

View File

@ -29,7 +29,7 @@ setenv = {[testenv]setenv}
[testenv:pep8]
deps = {[testenv]deps}
pep8==1.3.3
commands = pep8 --repeat --show-source --exclude=.venv,.tox,dist,doc,openstack moniker setup.py bin/moniker-api bin/moniker-central bin/moniker-agent bin/moniker-manage
commands = pep8 --repeat --show-source --exclude=.venv,.tox,dist,doc,openstack moniker setup.py bin/moniker-api bin/moniker-central bin/moniker-agent bin/moniker-sink bin/moniker-manage
[testenv:pyflakes]
deps = {[testenv]deps}