diff --git a/oslo/messaging/notify/dispatcher.py b/oslo/messaging/notify/dispatcher.py index 87deee85..5c58832f 100644 --- a/oslo/messaging/notify/dispatcher.py +++ b/oslo/messaging/notify/dispatcher.py @@ -101,6 +101,10 @@ class NotificationDispatcher(object): publisher_id = message.get('publisher_id') event_type = message.get('event_type') + metadata = { + 'message_id': message.get('message_id'), + 'timestamp': message.get('timestamp') + } priority = message.get('priority', '').lower() if priority not in PRIORITIES: LOG.warning('Unknown priority "%s"' % priority) @@ -112,7 +116,8 @@ class NotificationDispatcher(object): for callback in self._callbacks_by_priority.get(priority, []): localcontext.set_local_context(ctxt) try: - ret = callback(ctxt, publisher_id, event_type, payload) + ret = callback(ctxt, publisher_id, event_type, payload, + metadata) ret = NotificationResult.HANDLED if ret is None else ret if ret != NotificationResult.HANDLED: return ret diff --git a/oslo/messaging/notify/listener.py b/oslo/messaging/notify/listener.py index 06a2e24c..017f797f 100644 --- a/oslo/messaging/notify/listener.py +++ b/oslo/messaging/notify/listener.py @@ -45,11 +45,11 @@ A simple example of a notification listener with multiple endpoints might be:: from oslo import messaging class NotificationEndpoint(object): - def warn(self, ctxt, publisher_id, event_type, payload): + def warn(self, ctxt, publisher_id, event_type, payload, metadata): do_something(payload) class ErrorEndpoint(object): - def error(self, ctxt, publisher_id, event_type, payload): + def error(self, ctxt, publisher_id, event_type, payload, metadata): do_something(payload) transport = messaging.get_transport(cfg.CONF) diff --git a/tests/test_notify_dispatcher.py b/tests/test_notify_dispatcher.py index 3e2c966a..afe0bd02 100644 --- a/tests/test_notify_dispatcher.py +++ b/tests/test_notify_dispatcher.py @@ -118,7 +118,10 @@ class TestDispatcher(test_utils.BaseTestCase): method = getattr(endpoints[i], m) expected = [mock.call({}, msg['publisher_id'], msg['event_type'], - msg['payload'])] + msg['payload'], { + 'timestamp': mock.ANY, + 'message_id': mock.ANY + })] self.assertEqual(method.call_args_list, expected) else: self.assertEqual(endpoints[i].call_count, 0) diff --git a/tests/test_notify_listener.py b/tests/test_notify_listener.py index 317de71f..ddcf584f 100644 --- a/tests/test_notify_listener.py +++ b/tests/test_notify_listener.py @@ -131,7 +131,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): self._stop_listener(listener_thread) endpoint.info.assert_called_once_with( - {}, 'testpublisher', 'an_event.start', 'test message') + {}, 'testpublisher', 'an_event.start', 'test message', + {'message_id': mock.ANY, 'timestamp': mock.ANY}) def test_two_topics(self): transport = messaging.get_transport(self.conf, url='fake:') @@ -143,14 +144,19 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): listener_thread = self._setup_listener(transport, [endpoint], 2, targets=targets) notifier = self._setup_notifier(transport, topic='topic1') - notifier.info({}, 'an_event.start1', 'test') + notifier.info({'ctxt': '1'}, 'an_event.start1', 'test') notifier = self._setup_notifier(transport, topic='topic2') - notifier.info({}, 'an_event.start2', 'test') + notifier.info({'ctxt': '2'}, 'an_event.start2', 'test') self._stop_listener(listener_thread) - expected = [mock.call({}, 'testpublisher', 'an_event.start1', 'test'), - mock.call({}, 'testpublisher', 'an_event.start2', 'test')] + expected = [mock.call({'ctxt': '1'}, 'testpublisher', + 'an_event.start1', 'test', + {'timestamp': mock.ANY, 'message_id': mock.ANY}), + mock.call({'ctxt': '2'}, 'testpublisher', + 'an_event.start2', 'test', + {'timestamp': mock.ANY, 'message_id': mock.ANY})] + self.assertEqual(sorted(endpoint.info.call_args_list), expected) def test_two_exchanges(self): @@ -175,18 +181,23 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): transport._send_notification = mock.MagicMock( side_effect=side_effect) - notifier.info({}, 'an_event.start', 'test message default exchange') + notifier.info({'ctxt': '0'}, + 'an_event.start', 'test message default exchange') mock_notifier_exchange('exchange1') - notifier.info({}, 'an_event.start', 'test message exchange1') + notifier.info({'ctxt': '1'}, + 'an_event.start', 'test message exchange1') mock_notifier_exchange('exchange2') - notifier.info({}, 'an_event.start', 'test message exchange2') + notifier.info({'ctxt': '2'}, + 'an_event.start', 'test message exchange2') self._stop_listener(listener_thread) - expected = [mock.call({}, 'testpublisher', 'an_event.start', - 'test message exchange1'), - mock.call({}, 'testpublisher', 'an_event.start', - 'test message exchange2')] + expected = [mock.call({'ctxt': '1'}, 'testpublisher', 'an_event.start', + 'test message exchange1', + {'timestamp': mock.ANY, 'message_id': mock.ANY}), + mock.call({'ctxt': '2'}, 'testpublisher', 'an_event.start', + 'test message exchange2', + {'timestamp': mock.ANY, 'message_id': mock.ANY})] self.assertEqual(sorted(endpoint.info.call_args_list), expected) def test_two_endpoints(self): @@ -204,9 +215,14 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): self._stop_listener(listener_thread) endpoint1.info.assert_called_once_with( - {}, 'testpublisher', 'an_event.start', 'test') + {}, 'testpublisher', 'an_event.start', 'test', { + 'timestamp': mock.ANY, + 'message_id': mock.ANY}) + endpoint2.info.assert_called_once_with( - {}, 'testpublisher', 'an_event.start', 'test') + {}, 'testpublisher', 'an_event.start', 'test', { + 'timestamp': mock.ANY, + 'message_id': mock.ANY}) def test_requeue(self): transport = messaging.get_transport(self.conf, url='fake:') @@ -226,6 +242,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin): self._stop_listener(listener_thread) - expected = [mock.call({}, 'testpublisher', 'an_event.start', 'test'), - mock.call({}, 'testpublisher', 'an_event.start', 'test')] + expected = [mock.call({}, 'testpublisher', 'an_event.start', 'test', + {'timestamp': mock.ANY, 'message_id': mock.ANY}), + mock.call({}, 'testpublisher', 'an_event.start', 'test', + {'timestamp': mock.ANY, 'message_id': mock.ANY})] self.assertEqual(endpoint.info.call_args_list, expected)