NotabeneHandler no longer formats notifications.
It just passes it through verbatim. Only required key is 'event_type'. Change-Id: Idcd911c9f09d405d57ecaf5b14b72bfebe2a179a
This commit is contained in:
parent
e08924f1f8
commit
4cb16327c7
|
@ -1,7 +1,7 @@
|
||||||
[metadata]
|
[metadata]
|
||||||
description-file = README.md
|
description-file = README.md
|
||||||
name = winchester
|
name = winchester
|
||||||
version = 0.5
|
version = 0.51
|
||||||
author = Monsyne Dragon
|
author = Monsyne Dragon
|
||||||
author_email = mdragon@rackspace.com
|
author_email = mdragon@rackspace.com
|
||||||
summary = An OpenStack notification event processing library.
|
summary = An OpenStack notification event processing library.
|
||||||
|
|
|
@ -180,41 +180,17 @@ class TestNotabeneHandler(unittest.TestCase):
|
||||||
self.assertEquals(h.pending_notifications, ['cat', 'dog', 'fish'])
|
self.assertEquals(h.pending_notifications, ['cat', 'dog', 'fish'])
|
||||||
|
|
||||||
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
|
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
|
||||||
def test_format_notification(self, cm):
|
def test_commit_good(self, cm):
|
||||||
cm.return_value = (1, 2)
|
cm.return_value = (1, 2)
|
||||||
kw = {'queue_name': 'foo'}
|
kw = {'queue_name': 'foo'}
|
||||||
h = pipeline_handler.NotabeneHandler(**kw)
|
h = pipeline_handler.NotabeneHandler(**kw)
|
||||||
notification = {}
|
|
||||||
n = h._format_notification(notification)
|
|
||||||
self.assertEquals(n, {'event_type': None,
|
|
||||||
'message_id': None,
|
|
||||||
'publisher_id': 'stv3',
|
|
||||||
'timestamp': 'None',
|
|
||||||
'payload': {}})
|
|
||||||
|
|
||||||
now = datetime.datetime.utcnow()
|
h.pending_notifications = [{'event_type': 'event1'},
|
||||||
notification = {'event_type': 'name',
|
{'event_type': 'event2'}]
|
||||||
'message_id': '1234',
|
with mock.patch.object(pipeline_handler.driver,
|
||||||
'timestamp': now,
|
'send_notification') as sn:
|
||||||
'service': 'tests'}
|
h.commit()
|
||||||
n = h._format_notification(notification)
|
self.assertEquals(sn.call_count, 2)
|
||||||
self.assertEquals(n, {'event_type': 'name',
|
|
||||||
'message_id': '1234',
|
|
||||||
'timestamp': str(now),
|
|
||||||
'publisher_id': 'tests',
|
|
||||||
'payload': {}})
|
|
||||||
|
|
||||||
notification = {'event_type': 'name',
|
|
||||||
'message_id': '1234',
|
|
||||||
'timestamp': now,
|
|
||||||
'service': 'tests',
|
|
||||||
'extra1': 'e1', 'extra2': 'e2'}
|
|
||||||
n = h._format_notification(notification)
|
|
||||||
self.assertEquals(n, {'event_type': 'name',
|
|
||||||
'message_id': '1234',
|
|
||||||
'timestamp': str(now),
|
|
||||||
'publisher_id': 'tests',
|
|
||||||
'payload': {'extra1': 'e1', 'extra2': 'e2'}})
|
|
||||||
|
|
||||||
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
|
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
|
||||||
def test_commit(self, cm):
|
def test_commit(self, cm):
|
||||||
|
@ -222,28 +198,13 @@ class TestNotabeneHandler(unittest.TestCase):
|
||||||
kw = {'queue_name': 'foo'}
|
kw = {'queue_name': 'foo'}
|
||||||
h = pipeline_handler.NotabeneHandler(**kw)
|
h = pipeline_handler.NotabeneHandler(**kw)
|
||||||
|
|
||||||
h.pending_notifications = range(2)
|
h.pending_notifications = [{'event_type': 'event1'},
|
||||||
with mock.patch.object(h, '_format_notification') as fn:
|
{'event_type': 'event2'}]
|
||||||
fn.return_value = {'event_type': 'event1'}
|
with mock.patch.object(pipeline_handler.driver,
|
||||||
with mock.patch.object(pipeline_handler.driver,
|
'send_notification') as sn:
|
||||||
'send_notification') as sn:
|
sn.side_effect = TestException
|
||||||
|
with mock.patch.object(pipeline_handler.logger,
|
||||||
|
'exception') as ex:
|
||||||
h.commit()
|
h.commit()
|
||||||
self.assertEquals(sn.call_count, 2)
|
self.assertEquals(ex.call_count, 2)
|
||||||
|
self.assertEquals(sn.call_count, 2)
|
||||||
@mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
|
|
||||||
def test_commit(self, cm):
|
|
||||||
cm.return_value = (1, 2)
|
|
||||||
kw = {'queue_name': 'foo'}
|
|
||||||
h = pipeline_handler.NotabeneHandler(**kw)
|
|
||||||
|
|
||||||
h.pending_notifications = range(2)
|
|
||||||
with mock.patch.object(h, '_format_notification') as fn:
|
|
||||||
fn.return_value = {'event_type': 'event1'}
|
|
||||||
with mock.patch.object(pipeline_handler.driver,
|
|
||||||
'send_notification') as sn:
|
|
||||||
sn.side_effect = TestException
|
|
||||||
with mock.patch.object(pipeline_handler.logger,
|
|
||||||
'exception') as ex:
|
|
||||||
h.commit()
|
|
||||||
self.assertEquals(ex.call_count, 2)
|
|
||||||
self.assertEquals(sn.call_count, 2)
|
|
||||||
|
|
|
@ -186,32 +186,9 @@ class NotabeneHandler(PipelineHandlerBase):
|
||||||
self.pending_notifications.extend(env.get(key, []))
|
self.pending_notifications.extend(env.get(key, []))
|
||||||
return events
|
return events
|
||||||
|
|
||||||
def _format_notification(self, notification):
|
|
||||||
"""Core traits are in the root of the notification and extra
|
|
||||||
traits go in the payload."""
|
|
||||||
core_keys = ['event_type', 'message_id', 'timestamp', 'service']
|
|
||||||
core = dict((key, notification.get(key)) for key in core_keys)
|
|
||||||
|
|
||||||
payload = dict((key, notification[key])
|
|
||||||
for key in notification.keys()
|
|
||||||
if key not in core_keys)
|
|
||||||
|
|
||||||
core['payload'] = payload
|
|
||||||
|
|
||||||
# Notifications require "publisher_id", not "service" ...
|
|
||||||
publisher = core.get('service')
|
|
||||||
if not publisher:
|
|
||||||
publisher = "stv3"
|
|
||||||
core['publisher_id'] = publisher
|
|
||||||
del core['service']
|
|
||||||
|
|
||||||
core['timestamp'] = str(core['timestamp'])
|
|
||||||
return core
|
|
||||||
|
|
||||||
def commit(self):
|
def commit(self):
|
||||||
for notification in self.pending_notifications:
|
for notification in self.pending_notifications:
|
||||||
notification = self._format_notification(notification)
|
logger.info("Publishing '%s' to '%s' with routing_key '%s'" %
|
||||||
logger.debug("Publishing '%s' to '%s' with routing_key '%s'" %
|
|
||||||
(notification['event_type'], self.exchange,
|
(notification['event_type'], self.exchange,
|
||||||
self.queue_name))
|
self.queue_name))
|
||||||
try:
|
try:
|
||||||
|
@ -410,7 +387,8 @@ class UsageHandler(PipelineHandlerBase):
|
||||||
apb, ape, len(block)))
|
apb, ape, len(block)))
|
||||||
|
|
||||||
if len(block) > 1:
|
if len(block) > 1:
|
||||||
logger.warn("Events for Stream: %s" % self.stream_id)
|
logger.warn("%s - events (stream: %s)"
|
||||||
|
% (event_type, self.stream_id))
|
||||||
for event in block:
|
for event in block:
|
||||||
logger.warn("^Event: %s - %s" %
|
logger.warn("^Event: %s - %s" %
|
||||||
(event['timestamp'], event['event_type']))
|
(event['timestamp'], event['event_type']))
|
||||||
|
@ -420,6 +398,7 @@ class UsageHandler(PipelineHandlerBase):
|
||||||
if self.warnings:
|
if self.warnings:
|
||||||
instance_id = exists.get('instance_id', 'n/a')
|
instance_id = exists.get('instance_id', 'n/a')
|
||||||
warning_event = {'event_type': 'compute.instance.exists.warnings',
|
warning_event = {'event_type': 'compute.instance.exists.warnings',
|
||||||
|
'publisher_id': 'stv3',
|
||||||
'message_id': str(uuid.uuid4()),
|
'message_id': str(uuid.uuid4()),
|
||||||
'timestamp': exists.get('timestamp',
|
'timestamp': exists.get('timestamp',
|
||||||
datetime.datetime.utcnow()),
|
datetime.datetime.utcnow()),
|
||||||
|
@ -431,8 +410,9 @@ class UsageHandler(PipelineHandlerBase):
|
||||||
new_event = self._base_notification(exists)
|
new_event = self._base_notification(exists)
|
||||||
new_event.update({'event_type': event_type,
|
new_event.update({'event_type': event_type,
|
||||||
'message_id': str(uuid.uuid4()),
|
'message_id': str(uuid.uuid4()),
|
||||||
'timestamp': str(exists.get('timestamp',
|
'publisher_id': 'stv3',
|
||||||
datetime.datetime.utcnow())),
|
'timestamp': exists.get('timestamp',
|
||||||
|
datetime.datetime.utcnow()),
|
||||||
'stream_id': int(self.stream_id),
|
'stream_id': int(self.stream_id),
|
||||||
'error': str(error),
|
'error': str(error),
|
||||||
'error_code': error and error.code})
|
'error_code': error and error.code})
|
||||||
|
|
Loading…
Reference in New Issue