Add missing data into the notif. endpoint callback

Some information about the notification (ie: message_id and timestamp)
are currently not available for the application.

This change add them.

Partial implements blueprint notification-subscriber-server

Change-Id: I83e562725205fb270f6065fe1118c3c9865b2294
This commit is contained in:
Mehdi Abaakouk 2014-02-27 17:37:14 +01:00 committed by Mehdi Abaakouk
parent 71ac681a73
commit ed2a1545c0
4 changed files with 46 additions and 20 deletions

View File

@ -101,6 +101,10 @@ class NotificationDispatcher(object):
publisher_id = message.get('publisher_id')
event_type = message.get('event_type')
metadata = {
'message_id': message.get('message_id'),
'timestamp': message.get('timestamp')
}
priority = message.get('priority', '').lower()
if priority not in PRIORITIES:
LOG.warning('Unknown priority "%s"' % priority)
@ -112,7 +116,8 @@ class NotificationDispatcher(object):
for callback in self._callbacks_by_priority.get(priority, []):
localcontext.set_local_context(ctxt)
try:
ret = callback(ctxt, publisher_id, event_type, payload)
ret = callback(ctxt, publisher_id, event_type, payload,
metadata)
ret = NotificationResult.HANDLED if ret is None else ret
if ret != NotificationResult.HANDLED:
return ret

View File

@ -45,11 +45,11 @@ A simple example of a notification listener with multiple endpoints might be::
from oslo import messaging
class NotificationEndpoint(object):
def warn(self, ctxt, publisher_id, event_type, payload):
def warn(self, ctxt, publisher_id, event_type, payload, metadata):
do_something(payload)
class ErrorEndpoint(object):
def error(self, ctxt, publisher_id, event_type, payload):
def error(self, ctxt, publisher_id, event_type, payload, metadata):
do_something(payload)
transport = messaging.get_transport(cfg.CONF)

View File

@ -118,7 +118,10 @@ class TestDispatcher(test_utils.BaseTestCase):
method = getattr(endpoints[i], m)
expected = [mock.call({}, msg['publisher_id'],
msg['event_type'],
msg['payload'])]
msg['payload'], {
'timestamp': mock.ANY,
'message_id': mock.ANY
})]
self.assertEqual(method.call_args_list, expected)
else:
self.assertEqual(endpoints[i].call_count, 0)

View File

@ -131,7 +131,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
self._stop_listener(listener_thread)
endpoint.info.assert_called_once_with(
{}, 'testpublisher', 'an_event.start', 'test message')
{}, 'testpublisher', 'an_event.start', 'test message',
{'message_id': mock.ANY, 'timestamp': mock.ANY})
def test_two_topics(self):
transport = messaging.get_transport(self.conf, url='fake:')
@ -143,14 +144,19 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
listener_thread = self._setup_listener(transport, [endpoint], 2,
targets=targets)
notifier = self._setup_notifier(transport, topic='topic1')
notifier.info({}, 'an_event.start1', 'test')
notifier.info({'ctxt': '1'}, 'an_event.start1', 'test')
notifier = self._setup_notifier(transport, topic='topic2')
notifier.info({}, 'an_event.start2', 'test')
notifier.info({'ctxt': '2'}, 'an_event.start2', 'test')
self._stop_listener(listener_thread)
expected = [mock.call({}, 'testpublisher', 'an_event.start1', 'test'),
mock.call({}, 'testpublisher', 'an_event.start2', 'test')]
expected = [mock.call({'ctxt': '1'}, 'testpublisher',
'an_event.start1', 'test',
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
mock.call({'ctxt': '2'}, 'testpublisher',
'an_event.start2', 'test',
{'timestamp': mock.ANY, 'message_id': mock.ANY})]
self.assertEqual(sorted(endpoint.info.call_args_list), expected)
def test_two_exchanges(self):
@ -175,18 +181,23 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
transport._send_notification = mock.MagicMock(
side_effect=side_effect)
notifier.info({}, 'an_event.start', 'test message default exchange')
notifier.info({'ctxt': '0'},
'an_event.start', 'test message default exchange')
mock_notifier_exchange('exchange1')
notifier.info({}, 'an_event.start', 'test message exchange1')
notifier.info({'ctxt': '1'},
'an_event.start', 'test message exchange1')
mock_notifier_exchange('exchange2')
notifier.info({}, 'an_event.start', 'test message exchange2')
notifier.info({'ctxt': '2'},
'an_event.start', 'test message exchange2')
self._stop_listener(listener_thread)
expected = [mock.call({}, 'testpublisher', 'an_event.start',
'test message exchange1'),
mock.call({}, 'testpublisher', 'an_event.start',
'test message exchange2')]
expected = [mock.call({'ctxt': '1'}, 'testpublisher', 'an_event.start',
'test message exchange1',
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
mock.call({'ctxt': '2'}, 'testpublisher', 'an_event.start',
'test message exchange2',
{'timestamp': mock.ANY, 'message_id': mock.ANY})]
self.assertEqual(sorted(endpoint.info.call_args_list), expected)
def test_two_endpoints(self):
@ -204,9 +215,14 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
self._stop_listener(listener_thread)
endpoint1.info.assert_called_once_with(
{}, 'testpublisher', 'an_event.start', 'test')
{}, 'testpublisher', 'an_event.start', 'test', {
'timestamp': mock.ANY,
'message_id': mock.ANY})
endpoint2.info.assert_called_once_with(
{}, 'testpublisher', 'an_event.start', 'test')
{}, 'testpublisher', 'an_event.start', 'test', {
'timestamp': mock.ANY,
'message_id': mock.ANY})
def test_requeue(self):
transport = messaging.get_transport(self.conf, url='fake:')
@ -226,6 +242,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
self._stop_listener(listener_thread)
expected = [mock.call({}, 'testpublisher', 'an_event.start', 'test'),
mock.call({}, 'testpublisher', 'an_event.start', 'test')]
expected = [mock.call({}, 'testpublisher', 'an_event.start', 'test',
{'timestamp': mock.ANY, 'message_id': mock.ANY}),
mock.call({}, 'testpublisher', 'an_event.start', 'test',
{'timestamp': mock.ANY, 'message_id': mock.ANY})]
self.assertEqual(endpoint.info.call_args_list, expected)