Notabene pipeline handler

For publishing notifications to an exchange.

PipelineHandlers can return new Events for subsequent processing, but
sometimes we need to publish full Notifications. Remember that
Notifications are less restrictive than Events: they can be larger,
they can be nested, and they can contain lists.
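
To make that concrete (the trait names below are purely illustrative
and not taken from this change): an Event is a flat dict of simple
traits, while a Notification may carry nested structures and lists:

    # Illustrative Event: a flat mapping of simple traits.
    event = {'event_type': 'compute.instance.exists',
             'timestamp': '2015-02-02T19:33:03',
             'instance_id': 'abc123'}

    # Illustrative Notification: nested payload containing a list.
    notification = {'event_type': 'compute.instance.exists',
                    'message_id': '1234',
                    'timestamp': '2015-02-02T19:33:03',
                    'payload': {'instance_id': 'abc123',
                                'bandwidth': [{'in': 10, 'out': 20},
                                              {'in': 5, 'out': 7}]}}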

We store these pending notifications in the pipeline's env dict so
downstream handlers can access them.
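
For example, a producing handler might stash its notifications like
this (a sketch only; the handler class and helper name are
hypothetical, but the 'usage_notifications' key matches what
UsageHandler now sets in this change):

    class MyUsageHandler(PipelineHandlerBase):
        def handle_events(self, events, env):
            # Hypothetical helper that builds notification dicts.
            notifications = self._build_notifications(events)
            # Stash them in env for the NotabeneHandler downstream.
            env['usage_notifications'] = notifications
            return events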

The NotabeneHandler takes a configuration option 'env_keys', which
specifies the env keys to look for. Lists of notifications found under
these keys are published to the queue using the connection parameters
supplied.

Errors in transmission are logged and ignored, since exceptions during
the commit() phase do not flag the stream as in error.

A sample pipeline definition might look like this:

test_expire_pipeline:
    - logger
    - usage
    - name: notabene
      params:
        host: localhost
        user: guest
        password: guest
        port: 5672
        vhost: /
        library: librabbitmq
        exchange: nova
        exchange_type: topic
        queue_name: monitor.info
        env_keys:
            - usage_notifications
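
Before publishing, each pending notification is reshaped so that core
traits stay at the root and everything else moves into the payload (see
_format_notification in the diff below). Roughly, with illustrative
trait values:

    pending = {'event_type': 'compute.instance.exists',
               'message_id': '1234',
               'timestamp': datetime.datetime.utcnow(),
               'service': 'tests',
               'instance_id': 'abc123'}

    # becomes (timestamp stringified, 'service' renamed to
    # 'publisher_id', which defaults to 'stv3' when missing):
    published = {'event_type': 'compute.instance.exists',
                 'message_id': '1234',
                 'timestamp': '2015-02-02 19:33:03.000000',
                 'publisher_id': 'tests',
                 'payload': {'instance_id': 'abc123'}}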

Change-Id: If1958135ad6fbed88e2c18b9fac7efde51ee3113
Author: Sandy Walsh  2015-02-02 19:33:03 -08:00
parent 6557de033c
commit ce91d56d51
5 changed files with 394 additions and 8 deletions


@@ -8,3 +8,4 @@ enum34>=1.0
 SQLAlchemy>=0.9.6
 python-dateutil
 requests
+notabene>=0.0.dev0

tests/test_notabene.py (new file, 249 lines)

@@ -0,0 +1,249 @@
import unittest2 as unittest

import datetime
import mock

from winchester import pipeline_handler


class TestConnectionManager(unittest.TestCase):
    def setUp(self):
        super(TestConnectionManager, self).setUp()
        self.mgr = pipeline_handler.ConnectionManager()

    def test_extract_params(self):
        with self.assertRaises(pipeline_handler.NotabeneException):
            self.mgr._extract_params({})

        cd, ct, ed, et = self.mgr._extract_params({'exchange': 'my_exchange'})
        self.assertEquals(cd, {'host': 'localhost',
                               'port': 5672,
                               'user': 'guest',
                               'password': 'guest',
                               'library': 'librabbitmq',
                               'vhost': '/'})
        self.assertEquals(ct, (('host', 'localhost'),
                               ('library', 'librabbitmq'),
                               ('password', 'guest'),
                               ('port', 5672),
                               ('user', 'guest'),
                               ('vhost', '/')))
        self.assertEquals(ed, {'exchange_name': 'my_exchange',
                               'exchange_type': 'topic'})
        self.assertEquals(et, (('exchange_name', 'my_exchange'),
                               ('exchange_type', 'topic')))

        kw = {'host': 'my_host', 'user': 'my_user', 'password': 'pwd',
              'port': 123, 'vhost': 'virtual', 'library': 'my_lib',
              'exchange': 'my_exchange', 'exchange_type': 'foo'}
        cd, ct, ed, et = self.mgr._extract_params(kw)
        self.assertEquals(cd, {'host': 'my_host',
                               'port': 123,
                               'user': 'my_user',
                               'password': 'pwd',
                               'library': 'my_lib',
                               'vhost': 'virtual'})
        self.assertEquals(ct, (('host', 'my_host'),
                               ('library', 'my_lib'),
                               ('password', 'pwd'),
                               ('port', 123),
                               ('user', 'my_user'),
                               ('vhost', 'virtual')))
        self.assertEquals(ed, {'exchange_name': 'my_exchange',
                               'exchange_type': 'foo'})
        self.assertEquals(et, (('exchange_name', 'my_exchange'),
                               ('exchange_type', 'foo')))

    @mock.patch.object(pipeline_handler.ConnectionManager, '_extract_params')
    @mock.patch.object(pipeline_handler.driver, 'create_connection')
    @mock.patch.object(pipeline_handler.driver, 'create_exchange')
    @mock.patch.object(pipeline_handler.driver, 'create_queue')
    def test_get_connection(self, cq, ce, cc, ep):
        conn = {'host': 'my_host', 'user': 'my_user', 'password': 'pwd',
                'port': 123, 'vhost': 'virtual', 'library': 'my_lib'}
        conn_set = tuple(sorted(conn.items()))
        exchange = {'exchange_name': 'my_exchange', 'exchange_type': 'foo'}
        exchange_set = tuple(sorted(exchange.items()))
        ep.return_value = (conn, conn_set, exchange, exchange_set)
        connection = mock.MagicMock("the connection")
        channel = mock.MagicMock("the channel")
        connection.channel = channel
        cc.return_value = connection
        mexchange = mock.MagicMock("the exchange")
        ce.return_value = mexchange
        queue = mock.MagicMock("the queue")
        queue.declare = mock.MagicMock()
        cq.return_value = queue

        final_connection, final_exchange = self.mgr.get_connection({}, "foo")
        self.assertEquals(final_connection, connection)
        self.assertEquals(final_exchange, mexchange)
        self.assertEquals(1, queue.declare.call_count)

        # Calling again should give the same results ...
        final_connection, final_exchange = self.mgr.get_connection({}, "foo")
        self.assertEquals(final_connection, connection)
        self.assertEquals(final_exchange, mexchange)
        self.assertTrue(queue.declare.called)
        self.assertEquals(1, queue.declare.call_count)

        # Change the exchange, and we should have same connection, but new
        # exchange object.
        exchange2 = {'exchange_name': 'my_exchange2', 'exchange_type': 'foo2'}
        exchange2_set = tuple(sorted(exchange2.items()))
        ep.return_value = (conn, conn_set, exchange2, exchange2_set)
        mexchange2 = mock.MagicMock("the exchange 2")
        ce.return_value = mexchange2

        final_connection, final_exchange = self.mgr.get_connection({}, "foo")
        self.assertEquals(final_connection, connection)
        self.assertEquals(final_exchange, mexchange2)
        self.assertEquals(2, queue.declare.call_count)

        # Change the connection, and we should have a new connection and new
        # exchange object.
        conn2 = {'host': 'my_host2', 'user': 'my_user2', 'password': 'pwd2',
                 'port': 1234, 'vhost': 'virtual2', 'library': 'my_lib2'}
        conn2_set = tuple(sorted(conn2.items()))
        exchange3 = {'exchange_name': 'my_exchange', 'exchange_type': 'foo'}
        exchange3_set = tuple(sorted(exchange3.items()))
        ep.return_value = (conn2, conn2_set, exchange3, exchange3_set)
        mexchange3 = mock.MagicMock("the exchange 3")
        ce.return_value = mexchange3
        connection2 = mock.MagicMock("the connection 2")
        channel2 = mock.MagicMock("the channel 2")
        connection2.channel = channel2
        cc.return_value = connection2

        final_connection, final_exchange = self.mgr.get_connection({}, "foo")
        self.assertEquals(final_connection, connection2)
        self.assertEquals(final_exchange, mexchange3)
        self.assertEquals(3, queue.declare.call_count)


class TestException(Exception):
    pass


class TestNotabeneHandler(unittest.TestCase):
    def test_constructor_no_queue(self):
        with self.assertRaises(pipeline_handler.NotabeneException) as e:
            pipeline_handler.NotabeneHandler()

    @mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
    def test_constructor_queue(self, cm):
        cm.return_value = (1, 2)
        kw = {'queue_name': 'foo'}
        h = pipeline_handler.NotabeneHandler(**kw)
        self.assertIsNotNone(h.connection)
        self.assertIsNotNone(h.exchange)
        self.assertEquals(h.env_keys, [])

    @mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
    def test_constructor_env_keys(self, cm):
        cm.return_value = (1, 2)
        kw = {'queue_name': 'foo', 'env_keys': ['x', 'y']}
        h = pipeline_handler.NotabeneHandler(**kw)
        self.assertIsNotNone(h.connection)
        self.assertIsNotNone(h.exchange)
        self.assertEquals(h.env_keys, ['x', 'y'])

    @mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
    def test_handle_events(self, cm):
        cm.return_value = (1, 2)
        kw = {'queue_name': 'foo', 'env_keys': ['x', 'y', 'z']}
        h = pipeline_handler.NotabeneHandler(**kw)

        events = range(5)
        env = {'x': ['cat', 'dog'], 'y': ['fish']}
        ret = h.handle_events(events, env)
        self.assertEquals(ret, events)
        self.assertEquals(h.pending_notifications, ['cat', 'dog', 'fish'])

    @mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
    def test_format_notification(self, cm):
        cm.return_value = (1, 2)
        kw = {'queue_name': 'foo'}
        h = pipeline_handler.NotabeneHandler(**kw)

        notification = {}
        n = h._format_notification(notification)
        self.assertEquals(n, {'event_type': None,
                              'message_id': None,
                              'publisher_id': 'stv3',
                              'timestamp': 'None',
                              'payload': {}})

        now = datetime.datetime.utcnow()
        notification = {'event_type': 'name',
                        'message_id': '1234',
                        'timestamp': now,
                        'service': 'tests'}
        n = h._format_notification(notification)
        self.assertEquals(n, {'event_type': 'name',
                              'message_id': '1234',
                              'timestamp': str(now),
                              'publisher_id': 'tests',
                              'payload': {}})

        notification = {'event_type': 'name',
                        'message_id': '1234',
                        'timestamp': now,
                        'service': 'tests',
                        'extra1': 'e1', 'extra2': 'e2'}
        n = h._format_notification(notification)
        self.assertEquals(n, {'event_type': 'name',
                              'message_id': '1234',
                              'timestamp': str(now),
                              'publisher_id': 'tests',
                              'payload': {'extra1': 'e1', 'extra2': 'e2'}})

    @mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
    def test_commit(self, cm):
        cm.return_value = (1, 2)
        kw = {'queue_name': 'foo'}
        h = pipeline_handler.NotabeneHandler(**kw)

        h.pending_notifications = range(2)
        with mock.patch.object(h, '_format_notification') as fn:
            fn.return_value = {'event_type': 'event1'}
            with mock.patch.object(pipeline_handler.driver,
                                   'send_notification') as sn:
                h.commit()
                self.assertEquals(sn.call_count, 2)

    @mock.patch.object(pipeline_handler.connection_manager, 'get_connection')
    def test_commit(self, cm):
        cm.return_value = (1, 2)
        kw = {'queue_name': 'foo'}
        h = pipeline_handler.NotabeneHandler(**kw)

        h.pending_notifications = range(2)
        with mock.patch.object(h, '_format_notification') as fn:
            fn.return_value = {'event_type': 'event1'}
            with mock.patch.object(pipeline_handler.driver,
                                   'send_notification') as sn:
                sn.side_effect = TestException
                with mock.patch.object(pipeline_handler.logger,
                                       'exception') as ex:
                    h.commit()
                    self.assertEquals(ex.call_count, 2)
                self.assertEquals(sn.call_count, 2)


@@ -285,9 +285,11 @@ class TestUsageHandler(unittest.TestCase):
         env = {'stream_id': 123}
         raw = [{'event_type': 'foo'}]
         events = self.handler.handle_events(raw, env)
-        self.assertEquals(2, len(events))
+        self.assertEquals(1, len(events))
+        notifications = env['usage_notifications']
+        self.assertEquals(1, len(notifications))
         self.assertEquals("compute.instance.exists.failed",
-                          events[1]['event_type'])
+                          notifications[0]['event_type'])
 
     @mock.patch.object(pipeline_handler.UsageHandler, '_process_block')
     def test_handle_events_exists(self, pb):

@@ -306,7 +308,9 @@ class TestUsageHandler(unittest.TestCase):
             {'event_type': 'foo'},
         ]
         events = self.handler.handle_events(raw, env)
-        self.assertEquals(4, len(events))
+        self.assertEquals(3, len(events))
+        notifications = env['usage_notifications']
+        self.assertEquals(1, len(notifications))
         self.assertEquals("compute.instance.exists.failed",
-                          events[3]['event_type'])
+                          notifications[0]['event_type'])
         self.assertTrue(pb.called)


@@ -4,6 +4,8 @@ import logging
 import six
 import uuid
 
+from notabene import kombu_driver as driver
+
 logger = logging.getLogger(__name__)

@@ -92,6 +94,136 @@ class LoggingHandler(PipelineHandlerBase):
         pass
 
 
+class NotabeneException(Exception):
+    pass
+
+
+class ConnectionManager(object):
+    def __init__(self):
+        # {connection_properties:
+        #     {exchange_properties: (connection, exchange)}}
+        self.pool = {}
+
+    def _extract_params(self, kw):
+        host = kw.get('host', 'localhost')
+        user = kw.get('user', 'guest')
+        password = kw.get('password', 'guest')
+        port = kw.get('port', 5672)
+        vhost = kw.get('vhost', '/')
+        library = kw.get('library', 'librabbitmq')
+        exchange_name = kw.get('exchange')
+        exchange_type = kw.get('exchange_type', 'topic')
+
+        if exchange_name is None:
+            raise NotabeneException("No 'exchange' name provided")
+
+        connection_dict = {'host': host, 'port': port,
+                           'user': user, 'password': password,
+                           'library': library, 'vhost': vhost}
+        connection_tuple = tuple(sorted(connection_dict.items()))
+
+        exchange_dict = {'exchange_name': exchange_name,
+                         'exchange_type': exchange_type}
+        exchange_tuple = tuple(sorted(exchange_dict.items()))
+
+        return (connection_dict, connection_tuple,
+                exchange_dict, exchange_tuple)
+
+    def get_connection(self, properties, queue_name):
+        connection_dict, connection_tuple, \
+            exchange_dict, exchange_tuple = self._extract_params(properties)
+
+        connection_info = self.pool.get(connection_tuple)
+        if connection_info is None:
+            connection = driver.create_connection(connection_dict['host'],
+                                                  connection_dict['port'],
+                                                  connection_dict['user'],
+                                                  connection_dict['password'],
+                                                  connection_dict['library'],
+                                                  connection_dict['vhost'])
+            connection_info = (connection, {})
+            self.pool[connection_tuple] = connection_info
+        connection, exchange_pool = connection_info
+
+        exchange = exchange_pool.get(exchange_tuple)
+        if exchange is None:
+            exchange = driver.create_exchange(exchange_dict['exchange_name'],
+                                              exchange_dict['exchange_type'])
+            exchange_pool[exchange_tuple] = exchange
+
+            # Make sure the queue exists so we don't lose events.
+            queue = driver.create_queue(queue_name, exchange, queue_name,
+                                        channel=connection.channel())
+            queue.declare()
+
+        return (connection, exchange)
+
+
+# Global ConnectionManager. Shared by all Handlers.
+connection_manager = ConnectionManager()
+
+
+class NotabeneHandler(PipelineHandlerBase):
+    # Handlers are created per stream, so we have to be smart about
+    # things like connections to databases and queues.
+    # We don't want to create too many connections, and we have to
+    # remember that stream processing has to occur quickly, so
+    # we want to avoid round-trips where possible.
+    def __init__(self, **kw):
+        super(NotabeneHandler, self).__init__(**kw)
+        global connection_manager
+
+        self.queue_name = kw.get('queue_name')
+        if self.queue_name is None:
+            raise NotabeneException("No 'queue_name' provided")
+
+        self.connection, self.exchange = connection_manager.get_connection(
+            kw, self.queue_name)
+
+        self.env_keys = kw.get('env_keys', [])
+
+    def handle_events(self, events, env):
+        keys = [key for key in self.env_keys]
+        self.pending_notifications = []
+        for key in keys:
+            self.pending_notifications.extend(env.get(key, []))
+        return events
+
+    def _format_notification(self, notification):
+        """Core traits are in the root of the notification and extra
+        traits go in the payload."""
+        core_keys = ['event_type', 'message_id', 'timestamp', 'service']
+        core = dict((key, notification.get(key)) for key in core_keys)
+        payload = dict((key, notification[key])
+                       for key in notification.keys()
+                       if key not in core_keys)
+        core['payload'] = payload
+
+        # Notifications require "publisher_id", not "service" ...
+        publisher = core.get('service')
+        if not publisher:
+            publisher = "stv3"
+        core['publisher_id'] = publisher
+        del core['service']
+
+        core['timestamp'] = str(core['timestamp'])
+        return core
+
+    def commit(self):
+        for notification in self.pending_notifications:
+            notification = self._format_notification(notification)
+            logger.debug("Publishing '%s' to '%s' with routing_key '%s'" %
+                         (notification['event_type'], self.exchange,
+                          self.queue_name))
+            try:
+                driver.send_notification(notification, self.queue_name,
+                                         self.connection, self.exchange)
+            except Exception as e:
+                logger.exception(e)
+
+    def rollback(self):
+        pass
+
+
 class UsageException(Exception):
     def __init__(self, code, message):
         super(UsageException, self).__init__(message)

@@ -309,7 +441,7 @@ class UsageHandler(PipelineHandlerBase):
                          }
             new_events.append(new_event)
 
-        events.extend(new_events)
+        env['usage_notifications'] = new_events
         return events
 
     def commit(self):


@@ -55,7 +55,7 @@ class Pipeline(object):
                 handler = handler_class(**params)
             except Exception as e:
                 logger.exception("Error initalizing handler %s for pipeline %s" %
-                                 handler_class, self.name)
+                                 (handler_class, self.name))
                 raise PipelineExecutionError("Error loading pipeline", e)
             self.handlers.append(handler)

@@ -186,8 +186,8 @@ class PipelineManager(object):
            try:
                plugins[name] = simport.load(cls_string)
            except simport.ImportFailed as e:
-                log.error("Could not load plugin %s: Import failed. %s" % (
-                          name, e))
+                logger.error("Could not load plugin %s: Import failed. %s" % (
+                             name, e))
            except (simport.MissingMethodOrFunction,
                    simport.MissingModule,
                    simport.BadDirectory) as e: