From ce91d56d51c95037b2e92e0f858720865497da39 Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Mon, 2 Feb 2015 19:33:03 -0800 Subject: [PATCH] Notabene pipeline handler MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit For publishing new events into an exchange. PipelineHandlers can return new Events for subsequent processing. However, sometimes we need to publish Notifications. Remember that Notifications are less restrictive than Events. They can be larger. They can be nested. They can contain lists. We store these pending notifications in the handler env variable so downstream handlers can access them. The NotabeneHandler takes a configuration variable 'env_keys' which specifies the env keys to look for. Lists of notifications in these variables are published to the queue using the connection parameters supplied. Errors in transmission are logged and ignored since exceptions during the commit() phase do not flag the stream as in error. A sample pipeline definition might look like this: test_expire_pipeline: - logger - usage - name: notabene params: host: localhost user: guest password: guest port: 5672 vhost: / library: librabbitmq exchange: nova exchange_type: topic queue_name: monitor.info env_keys: - usage_notifications Change-Id: If1958135ad6fbed88e2c18b9fac7efde51ee3113 --- requirements.txt | 1 + tests/test_notabene.py | 249 +++++++++++++++++++++++++++++++++ tests/test_usage_handler.py | 12 +- winchester/pipeline_handler.py | 134 +++++++++++++++++- winchester/pipeline_manager.py | 6 +- 5 files changed, 394 insertions(+), 8 deletions(-) create mode 100644 tests/test_notabene.py diff --git a/requirements.txt b/requirements.txt index f59e5d1..25a62b1 100644 --- a/requirements.txt +++ b/requirements.txt @@ -8,3 +8,4 @@ enum34>=1.0 SQLAlchemy>=0.9.6 python-dateutil requests +notabene>=0.0.dev0 diff --git a/tests/test_notabene.py b/tests/test_notabene.py new file mode 100644 index 0000000..422f66f --- /dev/null +++ 
b/tests/test_notabene.py @@ -0,0 +1,249 @@ +import unittest2 as unittest + +import datetime +import mock + +from winchester import pipeline_handler + + +class TestConnectionManager(unittest.TestCase): + def setUp(self): + super(TestConnectionManager, self).setUp() + self.mgr = pipeline_handler.ConnectionManager() + + def test_extract_params(self): + with self.assertRaises(pipeline_handler.NotabeneException): + self.mgr._extract_params({}) + + cd, ct, ed, et = self.mgr._extract_params({'exchange': 'my_exchange'}) + + self.assertEquals(cd, {'host': 'localhost', + 'port': 5672, + 'user': 'guest', + 'password': 'guest', + 'library': 'librabbitmq', + 'vhost': '/'}) + + self.assertEquals(ct, (('host', 'localhost'), + ('library', 'librabbitmq'), + ('password', 'guest'), + ('port', 5672), + ('user', 'guest'), + ('vhost', '/'))) + + self.assertEquals(ed, {'exchange_name': 'my_exchange', + 'exchange_type': 'topic'}) + + self.assertEquals(et, (('exchange_name', 'my_exchange'), + ('exchange_type', 'topic'))) + + + kw = {'host': 'my_host', 'user': 'my_user', 'password': 'pwd', + 'port': 123, 'vhost': 'virtual', 'library': 'my_lib', + 'exchange': 'my_exchange', 'exchange_type': 'foo'} + + cd, ct, ed, et = self.mgr._extract_params(kw) + + self.assertEquals(cd, {'host': 'my_host', + 'port': 123, + 'user': 'my_user', + 'password': 'pwd', + 'library': 'my_lib', + 'vhost': 'virtual'}) + + self.assertEquals(ct, (('host', 'my_host'), + ('library', 'my_lib'), + ('password', 'pwd'), + ('port', 123), + ('user', 'my_user'), + ('vhost', 'virtual'))) + + self.assertEquals(ed, {'exchange_name': 'my_exchange', + 'exchange_type': 'foo'}) + + self.assertEquals(et, (('exchange_name', 'my_exchange'), + ('exchange_type', 'foo'))) + + + @mock.patch.object(pipeline_handler.ConnectionManager, '_extract_params') + @mock.patch.object(pipeline_handler.driver, 'create_connection') + @mock.patch.object(pipeline_handler.driver, 'create_exchange') + @mock.patch.object(pipeline_handler.driver, 'create_queue') 
+ def test_get_connection(self, cq, ce, cc, ep): + conn = {'host': 'my_host', 'user': 'my_user', 'password': 'pwd', + 'port': 123, 'vhost': 'virtual', 'library': 'my_lib'} + conn_set = tuple(sorted(conn.items())) + exchange = {'exchange_name': 'my_exchange', 'exchange_type': 'foo'} + exchange_set = tuple(sorted(exchange.items())) + + ep.return_value = (conn, conn_set, exchange, exchange_set) + connection = mock.MagicMock("the connection") + channel = mock.MagicMock("the channel") + connection.channel = channel + cc.return_value = connection + mexchange = mock.MagicMock("the exchange") + ce.return_value = mexchange + queue = mock.MagicMock("the queue") + queue.declare = mock.MagicMock() + cq.return_value = queue + + final_connection, final_exchange = self.mgr.get_connection({}, "foo") + + self.assertEquals(final_connection, connection) + self.assertEquals(final_exchange, mexchange) + self.assertEquals(1, queue.declare.call_count) + + # Calling again should give the same results ... + final_connection, final_exchange = self.mgr.get_connection({}, "foo") + + self.assertEquals(final_connection, connection) + self.assertEquals(final_exchange, mexchange) + self.assertTrue(queue.declare.called) + self.assertEquals(1, queue.declare.call_count) + + # Change the exchange, and we should have same connection, but new + # exchange object. + exchange2 = {'exchange_name': 'my_exchange2', 'exchange_type': 'foo2'} + exchange2_set = tuple(sorted(exchange2.items())) + + ep.return_value = (conn, conn_set, exchange2, exchange2_set) + mexchange2 = mock.MagicMock("the exchange 2") + ce.return_value = mexchange2 + + final_connection, final_exchange = self.mgr.get_connection({}, "foo") + + self.assertEquals(final_connection, connection) + self.assertEquals(final_exchange, mexchange2) + self.assertEquals(2, queue.declare.call_count) + + # Change the connection, and we should have a new connection and new + # exchange object. 
+ conn2 = {'host': 'my_host2', 'user': 'my_user2', 'password': 'pwd2', + 'port': 1234, 'vhost': 'virtual2', 'library': 'my_lib2'} + conn2_set = tuple(sorted(conn2.items())) + exchange3= {'exchange_name': 'my_exchange', 'exchange_type': 'foo'} + exchange3_set = tuple(sorted(exchange3.items())) + + ep.return_value = (conn2, conn2_set, exchange3, exchange3_set) + mexchange3 = mock.MagicMock("the exchange 3") + ce.return_value = mexchange3 + + connection2 = mock.MagicMock("the connection 2") + channel2 = mock.MagicMock("the channel 2") + connection2.channel = channel2 + cc.return_value = connection2 + + final_connection, final_exchange = self.mgr.get_connection({}, "foo") + + self.assertEquals(final_connection, connection2) + self.assertEquals(final_exchange, mexchange3) + self.assertEquals(3, queue.declare.call_count) + + +class TestException(Exception): + pass + + +class TestNotabeneHandler(unittest.TestCase): + + def test_constructor_no_queue(self): + with self.assertRaises(pipeline_handler.NotabeneException) as e: + pipeline_handler.NotabeneHandler() + + @mock.patch.object(pipeline_handler.connection_manager, 'get_connection') + def test_constructor_queue(self, cm): + cm.return_value = (1, 2) + kw = {'queue_name': 'foo'} + h = pipeline_handler.NotabeneHandler(**kw) + self.assertIsNotNone(h.connection) + self.assertIsNotNone(h.exchange) + self.assertEquals(h.env_keys, []) + + @mock.patch.object(pipeline_handler.connection_manager, 'get_connection') + def test_constructor_env_keys(self, cm): + cm.return_value = (1, 2) + kw = {'queue_name': 'foo', 'env_keys': ['x', 'y']} + h = pipeline_handler.NotabeneHandler(**kw) + self.assertIsNotNone(h.connection) + self.assertIsNotNone(h.exchange) + self.assertEquals(h.env_keys, ['x', 'y']) + + @mock.patch.object(pipeline_handler.connection_manager, 'get_connection') + def test_handle_events(self, cm): + cm.return_value = (1, 2) + kw = {'queue_name': 'foo', 'env_keys': ['x', 'y', 'z']} + h = pipeline_handler.NotabeneHandler(**kw) 
+ events = range(5) + env = {'x': ['cat', 'dog'], 'y': ['fish']} + ret = h.handle_events(events, env) + self.assertEquals(ret, events) + self.assertEquals(h.pending_notifications, ['cat', 'dog', 'fish']) + + @mock.patch.object(pipeline_handler.connection_manager, 'get_connection') + def test_format_notification(self, cm): + cm.return_value = (1, 2) + kw = {'queue_name': 'foo'} + h = pipeline_handler.NotabeneHandler(**kw) + notification = {} + n = h._format_notification(notification) + self.assertEquals(n, {'event_type': None, + 'message_id': None, + 'publisher_id': 'stv3', + 'timestamp': 'None', + 'payload': {}}) + + now = datetime.datetime.utcnow() + notification = {'event_type': 'name', + 'message_id': '1234', + 'timestamp': now, + 'service': 'tests'} + n = h._format_notification(notification) + self.assertEquals(n, {'event_type': 'name', + 'message_id': '1234', + 'timestamp': str(now), + 'publisher_id': 'tests', + 'payload': {}}) + + notification = {'event_type': 'name', + 'message_id': '1234', + 'timestamp': now, + 'service': 'tests', + 'extra1': 'e1', 'extra2': 'e2'} + n = h._format_notification(notification) + self.assertEquals(n, {'event_type': 'name', + 'message_id': '1234', + 'timestamp': str(now), + 'publisher_id': 'tests', + 'payload': {'extra1': 'e1', 'extra2': 'e2'}}) + + @mock.patch.object(pipeline_handler.connection_manager, 'get_connection') + def test_commit(self, cm): + cm.return_value = (1, 2) + kw = {'queue_name': 'foo'} + h = pipeline_handler.NotabeneHandler(**kw) + + h.pending_notifications = range(2) + with mock.patch.object(h, '_format_notification') as fn: + fn.return_value = {'event_type': 'event1'} + with mock.patch.object(pipeline_handler.driver, + 'send_notification') as sn: + h.commit() + self.assertEquals(sn.call_count, 2) + + @mock.patch.object(pipeline_handler.connection_manager, 'get_connection') + def test_commit(self, cm): + cm.return_value = (1, 2) + kw = {'queue_name': 'foo'} + h = pipeline_handler.NotabeneHandler(**kw) + + 
h.pending_notifications = range(2) + with mock.patch.object(h, '_format_notification') as fn: + fn.return_value = {'event_type': 'event1'} + with mock.patch.object(pipeline_handler.driver, + 'send_notification') as sn: + sn.side_effect = TestException + with mock.patch.object(pipeline_handler.logger, + 'exception') as ex: + h.commit() + self.assertEquals(ex.call_count, 2) + self.assertEquals(sn.call_count, 2) diff --git a/tests/test_usage_handler.py b/tests/test_usage_handler.py index b3fc02d..823d5c9 100644 --- a/tests/test_usage_handler.py +++ b/tests/test_usage_handler.py @@ -285,9 +285,11 @@ class TestUsageHandler(unittest.TestCase): env = {'stream_id': 123} raw = [{'event_type': 'foo'}] events = self.handler.handle_events(raw, env) - self.assertEquals(2, len(events)) + self.assertEquals(1, len(events)) + notifications = env['usage_notifications'] + self.assertEquals(1, len(notifications)) self.assertEquals("compute.instance.exists.failed", - events[1]['event_type']) + notifications[0]['event_type']) @mock.patch.object(pipeline_handler.UsageHandler, '_process_block') def test_handle_events_exists(self, pb): @@ -306,7 +308,9 @@ class TestUsageHandler(unittest.TestCase): {'event_type': 'foo'}, ] events = self.handler.handle_events(raw, env) - self.assertEquals(4, len(events)) + self.assertEquals(3, len(events)) + notifications = env['usage_notifications'] + self.assertEquals(1, len(notifications)) self.assertEquals("compute.instance.exists.failed", - events[3]['event_type']) + notifications[0]['event_type']) self.assertTrue(pb.called) diff --git a/winchester/pipeline_handler.py b/winchester/pipeline_handler.py index da63951..53cb6e9 100644 --- a/winchester/pipeline_handler.py +++ b/winchester/pipeline_handler.py @@ -4,6 +4,8 @@ import logging import six import uuid +from notabene import kombu_driver as driver + logger = logging.getLogger(__name__) @@ -92,6 +94,136 @@ class LoggingHandler(PipelineHandlerBase): pass +class NotabeneException(Exception): + pass + + 
+class ConnectionManager(object): + def __init__(self): + # {connection_properties: + # {exchange_properties: (connection, exchange)}} + self.pool = {} + + def _extract_params(self, kw): + host = kw.get('host', 'localhost') + user = kw.get('user', 'guest') + password = kw.get('password', 'guest') + port = kw.get('port', 5672) + vhost = kw.get('vhost', '/') + library = kw.get('library', 'librabbitmq') + exchange_name = kw.get('exchange') + exchange_type = kw.get('exchange_type', 'topic') + + if exchange_name is None: + raise NotabeneException("No 'exchange' name provided") + + connection_dict = {'host': host, 'port': port, + 'user': user, 'password': password, + 'library': library, 'vhost': vhost} + connection_tuple = tuple(sorted(connection_dict.items())) + + exchange_dict = {'exchange_name': exchange_name, + 'exchange_type': exchange_type} + exchange_tuple = tuple(sorted(exchange_dict.items())) + + return (connection_dict, connection_tuple, + exchange_dict, exchange_tuple) + + def get_connection(self, properties, queue_name): + connection_dict, connection_tuple, \ + exchange_dict, exchange_tuple = self._extract_params(properties) + connection_info = self.pool.get(connection_tuple) + if connection_info is None: + connection = driver.create_connection(connection_dict['host'], + connection_dict['port'], + connection_dict['user'], + connection_dict['password'], + connection_dict['library'], + connection_dict['vhost']) + connection_info = (connection, {}) + self.pool[connection_tuple] = connection_info + connection, exchange_pool = connection_info + exchange = exchange_pool.get(exchange_tuple) + if exchange is None: + exchange = driver.create_exchange(exchange_dict['exchange_name'], + exchange_dict['exchange_type']) + exchange_pool[exchange_tuple] = exchange + + # Make sure the queue exists so we don't lose events. 
+ queue = driver.create_queue(queue_name, exchange, queue_name, + channel=connection.channel()) + queue.declare() + + return (connection, exchange) + + +# Global ConnectionManager. Shared by all Handlers. +connection_manager = ConnectionManager() + + +class NotabeneHandler(PipelineHandlerBase): + # Handlers are created per stream, so we have to be smart about + # things like connections to databases and queues. + # We don't want to create too many connections, and we have to + # remember that stream processing has to occur quickly, so + # we want to avoid round-trips where possible. + def __init__(self, **kw): + super(NotabeneHandler, self).__init__(**kw) + global connection_manager + + self.queue_name = kw.get('queue_name') + if self.queue_name is None: + raise NotabeneException("No 'queue_name' provided") + self.connection, self.exchange = connection_manager.get_connection( + kw, self.queue_name) + + self.env_keys = kw.get('env_keys', []) + + def handle_events(self, events, env): + keys = [key for key in self.env_keys] + self.pending_notifications = [] + for key in keys: + self.pending_notifications.extend(env.get(key, [])) + return events + + def _format_notification(self, notification): + """Core traits are in the root of the notification and extra + traits go in the payload.""" + core_keys = ['event_type', 'message_id', 'timestamp', 'service'] + core = dict((key, notification.get(key)) for key in core_keys) + + payload = dict((key, notification[key]) + for key in notification.keys() + if key not in core_keys) + + core['payload'] = payload + + # Notifications require "publisher_id", not "service" ... 
+ publisher = core.get('service') + if not publisher: + publisher = "stv3" + core['publisher_id'] = publisher + del core['service'] + + core['timestamp'] = str(core['timestamp']) + return core + + def commit(self): + for notification in self.pending_notifications: + notification = self._format_notification(notification) + logger.debug("Publishing '%s' to '%s' with routing_key '%s'" % + (notification['event_type'], self.exchange, + self.queue_name)) + try: + driver.send_notification(notification, self.queue_name, + self.connection, self.exchange) + except Exception as e: + logger.exception(e) + + def rollback(self): + pass + + class UsageException(Exception): def __init__(self, code, message): super(UsageException, self).__init__(message) @@ -309,7 +441,7 @@ class UsageHandler(PipelineHandlerBase): } new_events.append(new_event) - events.extend(new_events) + env['usage_notifications'] = new_events return events def commit(self): diff --git a/winchester/pipeline_manager.py b/winchester/pipeline_manager.py index 160a41b..e606596 100644 --- a/winchester/pipeline_manager.py +++ b/winchester/pipeline_manager.py @@ -55,7 +55,7 @@ class Pipeline(object): handler = handler_class(**params) except Exception as e: logger.exception("Error initalizing handler %s for pipeline %s" % - handler_class, self.name) + (handler_class, self.name)) raise PipelineExecutionError("Error loading pipeline", e) self.handlers.append(handler) @@ -186,8 +186,8 @@ class PipelineManager(object): try: plugins[name] = simport.load(cls_string) except simport.ImportFailed as e: - log.error("Could not load plugin %s: Import failed. %s" % ( - name, e)) + logger.error("Could not load plugin %s: Import failed. %s" % ( + name, e)) except (simport.MissingMethodOrFunction, simport.MissingModule, simport.BadDirectory) as e: