From 231a08296e5b5701e5115da6b2dcfa6d60c6734d Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Fri, 27 Nov 2015 14:26:46 +0100 Subject: [PATCH] notification: Use oslo.messaging batch listener This change uses the batch oslo.messaging listener for the notification-agent but configures it have the same behavior as the legacy one. Change-Id: I460ab9dc387eb56108b5bceb7415c3ee6538bc54 Depends-On: I16184da24b8661aff7f4fba6196ecf33165f1a77 --- ceilometer/agent/plugin_base.py | 40 ++++++------ ceilometer/event/endpoint.py | 62 ++++++++----------- ceilometer/messaging.py | 17 +++-- ceilometer/notification.py | 20 ++++-- ceilometer/pipeline.py | 13 ++-- .../tests/functional/test_notification.py | 27 +++++--- ceilometer/tests/unit/agent/test_plugin.py | 17 ++--- ceilometer/tests/unit/event/test_endpoint.py | 34 ++++++---- ceilometer/tests/unit/test_event_pipeline.py | 5 +- 9 files changed, 131 insertions(+), 104 deletions(-) diff --git a/ceilometer/agent/plugin_base.py b/ceilometer/agent/plugin_base.py index cc6a100635..7b54e92c8c 100644 --- a/ceilometer/agent/plugin_base.py +++ b/ceilometer/agent/plugin_base.py @@ -19,12 +19,16 @@ import abc import collections from oslo_context import context +from oslo_log import log import oslo_messaging import six from stevedore import extension +from ceilometer.i18n import _LE from ceilometer import messaging +LOG = log.getLogger(__name__) + ExchangeTopics = collections.namedtuple('ExchangeTopics', ['exchange', 'topics']) @@ -74,39 +78,35 @@ class NotificationBase(PluginBase): :param message: Message to process. """ - def info(self, ctxt, publisher_id, event_type, payload, metadata): + def info(self, notifications): """RPC endpoint for notification messages at info level When another service sends a notification over the message bus, this method receives it. - :param ctxt: oslo.messaging context - :param publisher_id: publisher of the notification - :param event_type: type of notification - :param payload: notification payload - :param metadata: metadata about the notification - + :param notifications: list of notifications """ - notification = messaging.convert_to_old_notification_format( - 'info', ctxt, publisher_id, event_type, payload, metadata) - self.to_samples_and_publish(context.get_admin_context(), notification) + self._process_notifications('info', notifications) - def sample(self, ctxt, publisher_id, event_type, payload, metadata): + def sample(self, notifications): """RPC endpoint for notification messages at sample level When another service sends a notification over the message bus at sample priority, this method receives it. - :param ctxt: oslo.messaging context - :param publisher_id: publisher of the notification - :param event_type: type of notification - :param payload: notification payload - :param metadata: metadata about the notification - + :param notifications: list of notifications """ - notification = messaging.convert_to_old_notification_format( - 'sample', ctxt, publisher_id, event_type, payload, metadata) - self.to_samples_and_publish(context.get_admin_context(), notification) + self._process_notifications('sample', notifications) + + def _process_notifications(self, priority, notifications): + for notification in notifications: + try: + notification = messaging.convert_to_old_notification_format( + priority, notification) + self.to_samples_and_publish(context.get_admin_context(), + notification) + except Exception: + LOG.error(_LE('Fail to process notification'), exc_info=True) def to_samples_and_publish(self, context, notification): """Return samples produced by *process_notification*. diff --git a/ceilometer/event/endpoint.py b/ceilometer/event/endpoint.py index cbe8cf913f..911706980a 100644 --- a/ceilometer/event/endpoint.py +++ b/ceilometer/event/endpoint.py @@ -20,6 +20,7 @@ import oslo_messaging from stevedore import extension from ceilometer.event import converter as event_converter +from ceilometer.i18n import _LE from ceilometer import messaging LOG = logging.getLogger(__name__) @@ -35,48 +36,35 @@ class EventsNotificationEndpoint(object): namespace='ceilometer.event.trait_plugin')) self.manager = manager - def info(self, ctxt, publisher_id, event_type, payload, metadata): + def info(self, notifications): """Convert message at info level to Ceilometer Event. - :param ctxt: oslo_messaging context - :param publisher_id: publisher of the notification - :param event_type: type of notification - :param payload: notification payload - :param metadata: metadata about the notification + :param notifications: list of notifications """ + return self.process_notification('info', notifications) - # NOTE: the rpc layer currently rips out the notification - # delivery_info, which is critical to determining the - # source of the notification. This will have to get added back later. - notification = messaging.convert_to_old_notification_format( - 'info', ctxt, publisher_id, event_type, payload, metadata) - return self.process_notification(notification) + def error(self, notifications): + """Convert message at error level to Ceilometer Event. - def error(self, ctxt, publisher_id, event_type, payload, metadata): - """Convert error message to Ceilometer Event. - - :param ctxt: oslo_messaging context - :param publisher_id: publisher of the notification - :param event_type: type of notification - :param payload: notification payload - :param metadata: metadata about the notification + :param notifications: list of notifications """ + return self.process_notification('error', notifications) - # NOTE: the rpc layer currently rips out the notification - # delivery_info, which is critical to determining the - # source of the notification. This will have to get added back later. - notification = messaging.convert_to_old_notification_format( - 'error', ctxt, publisher_id, event_type, payload, metadata) - return self.process_notification(notification) - - def process_notification(self, notification): - try: - event = self.event_converter.to_event(notification) - if event is not None: - with self.manager.publisher(self.ctxt) as p: - p(event) - except Exception: - if not cfg.CONF.notification.ack_on_event_error: - return oslo_messaging.NotificationResult.REQUEUE - raise + def process_notification(self, priority, notifications): + for notification in notifications: + # NOTE: the rpc layer currently rips out the notification + # delivery_info, which is critical to determining the + # source of the notification. This will have to get added back + # later. + notification = messaging.convert_to_old_notification_format( + priority, notification) + try: + event = self.event_converter.to_event(notification) + if event is not None: + with self.manager.publisher(self.ctxt) as p: + p(event) + except Exception: + if not cfg.CONF.notification.ack_on_event_error: + return oslo_messaging.NotificationResult.REQUEUE + LOG.error(_LE('Fail to process a notification'), exc_info=True) return oslo_messaging.NotificationResult.HANDLED diff --git a/ceilometer/messaging.py b/ceilometer/messaging.py index d7580db414..f1504fcf8c 100644 --- a/ceilometer/messaging.py +++ b/ceilometer/messaging.py @@ -105,16 +105,15 @@ def get_notifier(transport, publisher_id): return notifier.prepare(publisher_id=publisher_id) -def convert_to_old_notification_format(priority, ctxt, publisher_id, - event_type, payload, metadata): +def convert_to_old_notification_format(priority, notification): # FIXME(sileht): temporary convert notification to old format # to focus on oslo_messaging migration before refactoring the code to # use the new oslo_messaging facilities - notification = {'priority': priority, - 'payload': payload, - 'event_type': event_type, - 'publisher_id': publisher_id} - notification.update(metadata) - for k in ctxt: - notification['_context_' + k] = ctxt[k] + notification = notification.copy() + notification['priority'] = priority + notification.update(notification["metadata"]) + for k in notification['ctxt']: + notification['_context_' + k] = notification['ctxt'][k] + del notification['ctxt'] + del notification['metadata'] return notification diff --git a/ceilometer/notification.py b/ceilometer/notification.py index d3fd176468..5de404ff48 100644 --- a/ceilometer/notification.py +++ b/ceilometer/notification.py @@ -66,6 +66,14 @@ OPTS = [ "Example: transport://user:pass@host1:port" "[,hostN:portN]/virtual_host " "(DEFAULT/transport_url is used if empty)"), + cfg.IntOpt('batch_size', + default=1, + help='Number of notification messages to wait before ' + 'publishing them'), + cfg.IntOpt('batch_timeout', + default=None, + help='Number of seconds to wait before publishing samples' + 'when batch_size is not reached (None means indefinitely)'), ] cfg.CONF.register_opts(exchange_control.EXCHANGE_OPTS) @@ -228,8 +236,10 @@ class NotificationService(service_base.BaseService): urls = cfg.CONF.notification.messaging_urls or [None] for url in urls: transport = messaging.get_transport(url) - listener = messaging.get_notification_listener( - transport, targets, endpoints) + listener = messaging.get_batch_notification_listener( + transport, targets, endpoints, + batch_size=cfg.CONF.notification.batch_size, + batch_timeout=cfg.CONF.notification.batch_timeout) listener.start() self.listeners.append(listener) @@ -272,10 +282,12 @@ class NotificationService(service_base.BaseService): pipe_endpoint = (pipeline.EventPipelineEndpoint if isinstance(pipe, pipeline.EventPipeline) else pipeline.SamplePipelineEndpoint) - listener = messaging.get_notification_listener( + listener = messaging.get_batch_notification_listener( transport, [oslo_messaging.Target(topic=topic)], - [pipe_endpoint(self.ctxt, pipe)]) + [pipe_endpoint(self.ctxt, pipe)], + batch_size=cfg.CONF.notification.batch_size, + batch_timeout=cfg.CONF.notification.batch_timeout) listener.start() self.pipeline_listeners.append(listener) diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index cf69d986e5..af20f3ab20 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -19,6 +19,7 @@ import abc import hashlib +from itertools import chain import os from oslo_config import cfg @@ -83,12 +84,13 @@ class PipelineEndpoint(object): self.publish_context = PublishContext(context, [pipeline]) @abc.abstractmethod - def sample(self, ctxt, publisher_id, event_type, payload, metadata): + def sample(self, messages): pass class SamplePipelineEndpoint(PipelineEndpoint): - def sample(self, ctxt, publisher_id, event_type, payload, metadata): + def sample(self, messages): + samples = chain.from_iterable(m["payload"] for m in messages) samples = [ sample_util.Sample(name=s['counter_name'], type=s['counter_type'], @@ -100,7 +102,7 @@ class SamplePipelineEndpoint(PipelineEndpoint): timestamp=s['timestamp'], resource_metadata=s['resource_metadata'], source=s.get('source')) - for s in payload if publisher_utils.verify_signature( + for s in samples if publisher_utils.verify_signature( s, cfg.CONF.publisher.telemetry_secret) ] with self.publish_context as p: @@ -108,7 +110,8 @@ class SamplePipelineEndpoint(PipelineEndpoint): class EventPipelineEndpoint(PipelineEndpoint): - def sample(self, ctxt, publisher_id, event_type, payload, metadata): + def sample(self, messages): + events = chain.from_iterable(m["payload"] for m in messages) events = [ models.Event( message_id=ev['message_id'], @@ -119,7 +122,7 @@ class EventPipelineEndpoint(PipelineEndpoint): models.Trait.convert_value(dtype, value)) for name, dtype, value in ev['traits']], raw=ev.get('raw', {})) - for ev in payload if publisher_utils.verify_signature( + for ev in events if publisher_utils.verify_signature( ev, cfg.CONF.publisher.telemetry_secret) ] try: diff --git a/ceilometer/tests/functional/test_notification.py b/ceilometer/tests/functional/test_notification.py index e74c9b89f2..03382d9354 100644 --- a/ceilometer/tests/functional/test_notification.py +++ b/ceilometer/tests/functional/test_notification.py @@ -131,9 +131,11 @@ class TestNotification(tests_base.BaseTestCase): self._do_process_notification_manager_start() self.srv.pipeline_manager.pipelines[0] = mock.MagicMock() - self.plugin.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', - 'compute.instance.create.end', - TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA) + self.plugin.info([{'ctxt': TEST_NOTICE_CTXT, + 'publisher_id': 'compute.vagrant-precise', + 'event_type': 'compute.instance.create.end', + 'payload': TEST_NOTICE_PAYLOAD, + 'metadata': TEST_NOTICE_METADATA}]) self.assertEqual(1, len(self.srv.listeners[0].dispatcher.endpoints)) self.assertTrue(self.srv.pipeline_manager.publisher.called) @@ -415,9 +417,12 @@ class TestRealNotificationHA(BaseRealNotification): not endpoint.filter_rule.match(None, None, 'nonmatching.end', None, None)): continue - endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', - 'nonmatching.end', - TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA) + endpoint.info([{ + 'ctxt': TEST_NOTICE_CTXT, + 'publisher_id': 'compute.vagrant-precise', + 'event_type': 'nonmatching.end', + 'payload': TEST_NOTICE_PAYLOAD, + 'metadata': TEST_NOTICE_METADATA}]) self.assertFalse(mock_notifier.called) for endpoint in self.srv.listeners[0].dispatcher.endpoints: if (hasattr(endpoint, 'filter_rule') and @@ -425,9 +430,13 @@ class TestRealNotificationHA(BaseRealNotification): 'compute.instance.create.end', None, None)): continue - endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', - 'compute.instance.create.end', - TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA) + endpoint.info([{ + 'ctxt': TEST_NOTICE_CTXT, + 'publisher_id': 'compute.vagrant-precise', + 'event_type': 'compute.instance.create.end', + 'payload': TEST_NOTICE_PAYLOAD, + 'metadata': TEST_NOTICE_METADATA}]) + self.assertTrue(mock_notifier.called) self.assertEqual(3, mock_notifier.call_count) self.assertEqual('pipeline.event', diff --git a/ceilometer/tests/unit/agent/test_plugin.py b/ceilometer/tests/unit/agent/test_plugin.py index 3f16037103..59f1f8783e 100644 --- a/ceilometer/tests/unit/agent/test_plugin.py +++ b/ceilometer/tests/unit/agent/test_plugin.py @@ -37,13 +37,16 @@ class NotificationBaseTestCase(base.BaseTestCase): def test_plugin_info(self): plugin = self.FakePlugin(mock.Mock()) plugin.to_samples_and_publish = mock.Mock() - ctxt = {'user_id': 'fake_user_id', 'project_id': 'fake_project_id'} - publisher_id = 'fake.publisher_id' - event_type = 'fake.event' - payload = {'foo': 'bar'} - metadata = {'message_id': '3577a84f-29ec-4904-9566-12c52289c2e8', - 'timestamp': '2015-06-1909:19:35.786893'} - plugin.info(ctxt, publisher_id, event_type, payload, metadata) + message = { + 'ctxt': {'user_id': 'fake_user_id', + 'project_id': 'fake_project_id'}, + 'publisher_id': 'fake.publisher_id', + 'event_type': 'fake.event', + 'payload': {'foo': 'bar'}, + 'metadata': {'message_id': '3577a84f-29ec-4904-9566-12c52289c2e8', + 'timestamp': '2015-06-1909:19:35.786893'} + } + plugin.info([message]) notification = { 'priority': 'info', 'event_type': 'fake.event', diff --git a/ceilometer/tests/unit/event/test_endpoint.py b/ceilometer/tests/unit/event/test_endpoint.py index 613f8b8c39..c79d3b227a 100644 --- a/ceilometer/tests/unit/event/test_endpoint.py +++ b/ceilometer/tests/unit/event/test_endpoint.py @@ -142,18 +142,23 @@ class TestEventEndpoint(tests_base.BaseTestCase): def test_message_to_event(self): self._setup_endpoint(['test://']) - self.endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', - 'compute.instance.create.end', - TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA) + self.endpoint.info([{'ctxt': TEST_NOTICE_CTXT, + 'publisher_id': 'compute.vagrant-precise', + 'event_type': 'compute.instance.create.end', + 'payload': TEST_NOTICE_PAYLOAD, + 'metadata': TEST_NOTICE_METADATA}]) def test_bad_event_non_ack_and_requeue(self): self._setup_endpoint(['test://']) self.fake_publisher.publish_events.side_effect = Exception self.CONF.set_override("ack_on_event_error", False, group="notification") - ret = self.endpoint.info(TEST_NOTICE_CTXT, 'compute.vagrant-precise', - 'compute.instance.create.end', - TEST_NOTICE_PAYLOAD, TEST_NOTICE_METADATA) + ret = self.endpoint.info([{'ctxt': TEST_NOTICE_CTXT, + 'publisher_id': 'compute.vagrant-precise', + 'event_type': 'compute.instance.create.end', + 'payload': TEST_NOTICE_PAYLOAD, + 'metadata': TEST_NOTICE_METADATA}]) + self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret) def test_message_to_event_bad_event(self): @@ -162,9 +167,13 @@ class TestEventEndpoint(tests_base.BaseTestCase): self.CONF.set_override("ack_on_event_error", False, group="notification") - message = {'event_type': "foo", 'message_id': "abc"} + message = { + 'payload': {'event_type': "foo", 'message_id': "abc"}, + 'metadata': {}, + 'ctxt': {} + } with mock.patch("ceilometer.pipeline.LOG") as mock_logger: - ret = self.endpoint.process_notification(message) + ret = self.endpoint.process_notification('info', [message]) self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret) exception_mock = mock_logger.exception self.assertIn('Exit after error from publisher', @@ -178,10 +187,13 @@ class TestEventEndpoint(tests_base.BaseTestCase): self.CONF.set_override("ack_on_event_error", False, group="notification") - message = {'event_type': "foo", 'message_id': "abc"} - + message = { + 'payload': {'event_type': "foo", 'message_id': "abc"}, + 'metadata': {}, + 'ctxt': {} + } with mock.patch("ceilometer.pipeline.LOG") as mock_logger: - ret = self.endpoint.process_notification(message) + ret = self.endpoint.process_notification('info', [message]) self.assertEqual(oslo_messaging.NotificationResult.HANDLED, ret) exception_mock = mock_logger.exception self.assertIn('Continue after error from publisher', diff --git a/ceilometer/tests/unit/test_event_pipeline.py b/ceilometer/tests/unit/test_event_pipeline.py index fac9d51b86..66fedce0bc 100644 --- a/ceilometer/tests/unit/test_event_pipeline.py +++ b/ceilometer/tests/unit/test_event_pipeline.py @@ -404,6 +404,7 @@ class EventPipelineTestCase(base.BaseTestCase): mock.Mock(), pipeline_manager.pipelines[0]) fake_publisher.publish_events.side_effect = Exception - ret = event_pipeline_endpoint.sample(None, 'compute.vagrant-precise', - 'a', [test_data], None) + ret = event_pipeline_endpoint.sample([ + {'ctxt': {}, 'publisher_id': 'compute.vagrant-precise', + 'event_type': 'a', 'payload': [test_data], 'metadata': {}}]) self.assertEqual(oslo_messaging.NotificationResult.REQUEUE, ret)