From c929bf36fd805b3581727211370134767516e1bd Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Tue, 29 Apr 2014 10:54:43 +0200 Subject: [PATCH] Initialize dispatcher manager in event endpoint Dispatcher Manager is not a service, so moved it into the correct namespace, this allows the event handler to initialize it correctly. Closes-bug: #1314080 Change-Id: I02792eef2d49f1c6b15d144d97c8a4a280ca150f --- ceilometer/collector.py | 6 +++-- ceilometer/dispatcher/__init__.py | 34 +++++++++++++++++++++++++ ceilometer/event/endpoint.py | 16 +++++++----- ceilometer/service.py | 24 ----------------- ceilometer/tests/event/test_endpoint.py | 9 +++---- ceilometer/tests/test_notification.py | 15 +++++++++++ 6 files changed, 66 insertions(+), 38 deletions(-) diff --git a/ceilometer/collector.py b/ceilometer/collector.py index 0949182bf..be56fdf6c 100644 --- a/ceilometer/collector.py +++ b/ceilometer/collector.py @@ -21,12 +21,12 @@ import socket import msgpack from oslo.config import cfg +from ceilometer import dispatcher from ceilometer import messaging from ceilometer.openstack.common.gettextutils import _ from ceilometer.openstack.common import log from ceilometer.openstack.common import service as os_service from ceilometer.openstack.common import units -from ceilometer import service OPTS = [ cfg.StrOpt('udp_address', @@ -46,7 +46,7 @@ cfg.CONF.import_opt('metering_topic', 'ceilometer.publisher.rpc', LOG = log.getLogger(__name__) -class CollectorService(service.DispatchedService, os_service.Service): +class CollectorService(os_service.Service): """Listener for the collector service.""" @staticmethod @@ -56,6 +56,8 @@ class CollectorService(service.DispatchedService, os_service.Service): def start(self): """Bind the UDP socket and handle incoming data.""" + # ensure dispatcher is configured before starting other services + self.dispatcher_manager = dispatcher.load_dispatcher_manager() super(CollectorService, self).start() if cfg.CONF.collector.udp_address: self.tg.add_thread(self.start_udp) diff --git a/ceilometer/dispatcher/__init__.py b/ceilometer/dispatcher/__init__.py index 13b9ca88e..ad0eb272b 100644 --- a/ceilometer/dispatcher/__init__.py +++ b/ceilometer/dispatcher/__init__.py @@ -19,6 +19,40 @@ import abc import six +from oslo.config import cfg +from stevedore import named + +from ceilometer.openstack.common.gettextutils import _ +from ceilometer.openstack.common import log + + +LOG = log.getLogger(__name__) + +OPTS = [ + cfg.MultiStrOpt('dispatcher', + deprecated_group="collector", + default=['database'], + help='Dispatcher to process data.'), +] +cfg.CONF.register_opts(OPTS) + + +DISPATCHER_NAMESPACE = 'ceilometer.dispatcher' + + +def load_dispatcher_manager(): + LOG.debug(_('loading dispatchers from %s'), + DISPATCHER_NAMESPACE) + dispatcher_manager = named.NamedExtensionManager( + namespace=DISPATCHER_NAMESPACE, + names=cfg.CONF.dispatcher, + invoke_on_load=True, + invoke_args=[cfg.CONF]) + if not list(dispatcher_manager): + LOG.warning(_('Failed to load any dispatchers for %s'), + DISPATCHER_NAMESPACE) + return dispatcher_manager + @six.add_metaclass(abc.ABCMeta) class Base(object): diff --git a/ceilometer/event/endpoint.py b/ceilometer/event/endpoint.py index 58c21db58..fa2187dc2 100644 --- a/ceilometer/event/endpoint.py +++ b/ceilometer/event/endpoint.py @@ -22,17 +22,19 @@ from oslo.config import cfg import oslo.messaging from stevedore import extension +from ceilometer import dispatcher from ceilometer.event import converter as event_converter from ceilometer import messaging from ceilometer.openstack.common.gettextutils import _ -from ceilometer import service from ceilometer.storage import models LOG = logging.getLogger(__name__) -class EventsNotificationEndpoint(service.DispatchedService): +class EventsNotificationEndpoint(object): def __init__(self): + super(EventsNotificationEndpoint, self).__init__() + self.dispatcher_manager = dispatcher.load_dispatcher_manager() LOG.debug(_('Loading event definitions')) self.event_converter = event_converter.setup_events( extension.ExtensionManager( @@ -48,9 +50,9 @@ class EventsNotificationEndpoint(service.DispatchedService): :param metadata: metadata about the notification """ - #NOTE: the rpc layer currently rips out the notification - #delivery_info, which is critical to determining the - #source of the notification. This will have to get added back later. + # NOTE: the rpc layer currently rips out the notification + # delivery_info, which is critical to determining the + # source of the notification. This will have to get added back later. notification = messaging.convert_to_old_notification_format( 'info', ctxt, publisher_id, event_type, payload, metadata) self.process_notification(notification) @@ -61,8 +63,8 @@ class EventsNotificationEndpoint(service.DispatchedService): if event is not None: LOG.debug(_('Saving event "%s"'), event.event_type) problem_events = [] - for dispatcher in self.dispatcher_manager: - problem_events.extend(dispatcher.obj.record_events(event)) + for dispatcher_ext in self.dispatcher_manager: + problem_events.extend(dispatcher_ext.obj.record_events(event)) if models.Event.UNKNOWN_PROBLEM in [x[0] for x in problem_events]: if not cfg.CONF.notification.ack_on_event_error: return oslo.messaging.NotificationResult.REQUEUE diff --git a/ceilometer/service.py b/ceilometer/service.py index 5eed5e89a..66df31b40 100644 --- a/ceilometer/service.py +++ b/ceilometer/service.py @@ -22,7 +22,6 @@ import socket import sys from oslo.config import cfg -from stevedore import named from ceilometer import messaging from ceilometer.openstack.common import gettextutils @@ -37,10 +36,6 @@ OPTS = [ help='Name of this node, which must be valid in an AMQP ' 'key. Can be an opaque identifier. For ZeroMQ only, must ' 'be a valid host name, FQDN, or IP address.'), - cfg.MultiStrOpt('dispatcher', - deprecated_group="collector", - default=['database'], - help='Dispatcher to process data.'), cfg.IntOpt('collector_workers', default=1, help='Number of workers for collector service. A single ' @@ -102,25 +97,6 @@ class WorkerException(Exception): """ -class DispatchedService(object): - - DISPATCHER_NAMESPACE = 'ceilometer.dispatcher' - - def start(self): - LOG.debug(_('loading dispatchers from %s'), - self.DISPATCHER_NAMESPACE) - self.dispatcher_manager = named.NamedExtensionManager( - namespace=self.DISPATCHER_NAMESPACE, - names=cfg.CONF.dispatcher, - invoke_on_load=True, - invoke_args=[cfg.CONF]) - if not list(self.dispatcher_manager): - LOG.warning(_('Failed to load any dispatchers for %s'), - self.DISPATCHER_NAMESPACE) - # ensure dispatcher is configured before starting other services - super(DispatchedService, self).start() - - def get_workers(name): workers = (cfg.CONF.get('%s_workers' % name) or utils.cpu_count()) diff --git a/ceilometer/tests/event/test_endpoint.py b/ceilometer/tests/event/test_endpoint.py index 0d9980d52..8ef9d5e44 100644 --- a/ceilometer/tests/event/test_endpoint.py +++ b/ceilometer/tests/event/test_endpoint.py @@ -91,16 +91,15 @@ class TestEventEndpoint(tests_base.BaseTestCase): self.CONF.set_override("connection", "log://", group='database') self.CONF.set_override("store_events", True, group="notification") - self.endpoint = event_endpoint.EventsNotificationEndpoint() - self.mock_dispatcher = mock.MagicMock() - self.endpoint.event_converter = mock.MagicMock() - self.endpoint.event_converter.to_event.return_value = mock.MagicMock( - event_type='test.test') + self.endpoint = event_endpoint.EventsNotificationEndpoint() self.endpoint.dispatcher_manager = \ extension.ExtensionManager.make_test_instance([ extension.Extension('test', None, None, self.mock_dispatcher) ]) + self.endpoint.event_converter = mock.MagicMock() + self.endpoint.event_converter.to_event.return_value = mock.MagicMock( + event_type='test.test') def test_message_to_event(self): self.endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', diff --git a/ceilometer/tests/test_notification.py b/ceilometer/tests/test_notification.py index 6db40bf82..b62437149 100644 --- a/ceilometer/tests/test_notification.py +++ b/ceilometer/tests/test_notification.py @@ -143,3 +143,18 @@ class TestNotification(tests_base.BaseTestCase): self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints)) self.assertEqual(self.fake_event_endpoint, self.srv.listeners[0].dispatcher.endpoints[0]) + + @mock.patch('ceilometer.event.converter.get_config_file', + mock.MagicMock(return_value=None)) + @mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock()) + @mock.patch.object(oslo.messaging.MessageHandlingServer, 'start', + mock.MagicMock()) + def test_event_dispatcher_loaded(self): + self.CONF.set_override("store_events", True, group="notification") + with mock.patch.object(self.srv, '_get_notifications_manager') \ + as get_nm: + get_nm.side_effect = self.fake_get_notifications_manager + self.srv.start() + self.assertEqual(2, len(self.srv.listeners[0].dispatcher.endpoints)) + event_endpoint = self.srv.listeners[0].dispatcher.endpoints[0] + self.assertEqual(1, len(list(event_endpoint.dispatcher_manager)))