diff --git a/storyboard/db/api/timeline_events.py b/storyboard/db/api/timeline_events.py index 9ee4713e..ab177ec2 100644 --- a/storyboard/db/api/timeline_events.py +++ b/storyboard/db/api/timeline_events.py @@ -26,7 +26,7 @@ from storyboard.db.api import base as api_base from storyboard.db.api import stories as stories_api from storyboard.db.api import tasks as tasks_api from storyboard.db import models -from storyboard.notifications.publisher import publish +from storyboard.notifications import publisher CONF = cfg.CONF @@ -112,15 +112,15 @@ def event_create(values): event_dict = tojson(TimeLineEvent, TimeLineEvent.from_db_model(new_event)) - publish(author_id=request.current_user_id or None, - method="POST", - url=request.headers.get('Referer') or None, - path=request.path or None, - query_string=request.query_string or None, - status=response.status_code or None, - resource="timeline_event", - resource_id=new_event.id or None, - resource_after=event_dict or None) + publisher.publish(author_id=request.current_user_id or None, + method="POST", + url=request.headers.get('Referer') or None, + path=request.path or None, + query_string=request.query_string or None, + status=response.status_code or None, + resource="timeline_event", + resource_id=new_event.id or None, + resource_after=event_dict or None) return new_event diff --git a/storyboard/notifications/conf.py b/storyboard/notifications/conf.py index 325be8e1..e6d96591 100644 --- a/storyboard/notifications/conf.py +++ b/storyboard/notifications/conf.py @@ -1,4 +1,4 @@ -# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# Copyright (c) 2018 IBM Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,31 +17,7 @@ from oslo_config import cfg CONF = cfg.CONF -NOTIFICATION_OPTS = [ - cfg.StrOpt("rabbit_exchange_name", default="storyboard", - help="The name of the topic exchange which storyboard will " - "use to broadcast its events."), - cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events", - help="The name of the queue that will be created for " - "API events."), - cfg.StrOpt("rabbit_application_name", default="storyboard", - help="The rabbit application identifier for storyboard's " - "connection."), - cfg.StrOpt("rabbit_host", default="localhost", - help="Host of the rabbitmq server."), - cfg.StrOpt("rabbit_login_method", default="AMQPLAIN", - help="The RabbitMQ login method."), - cfg.StrOpt("rabbit_userid", default="storyboard", - help="The RabbitMQ userid."), - cfg.StrOpt("rabbit_password", default="storyboard", - help="The RabbitMQ password."), - cfg.IntOpt("rabbit_port", default=5672, - help="The RabbitMQ broker port where a single node is used."), - cfg.StrOpt("rabbit_virtual_host", default="/", - help="The virtual host within which our queues and exchanges " - "live."), - cfg.IntOpt("rabbit_connection_attempts", default=6, - help="The number of connection attempts before giving-up"), - cfg.IntOpt("rabbit_retry_delay", default=10, - help="The interval between connection attempts (in seconds)") +OPTS = [ + cfg.StrOpt('driver', choices=['pika'], + help='The notification driver to use', default='pika') ] diff --git a/storyboard/notifications/notification_hook.py b/storyboard/notifications/notification_hook.py index aab541c6..11018ad2 100644 --- a/storyboard/notifications/notification_hook.py +++ b/storyboard/notifications/notification_hook.py @@ -23,7 +23,7 @@ from storyboard.api.v1 import wmodels import storyboard.common.hook_priorities as priority from storyboard.db.api import base as api_base from storyboard.db import models -from storyboard.notifications.publisher import publish +from storyboard.notifications import publisher class_mappings = {'task': [models.Task, wmodels.Task], @@ -110,18 +110,18 @@ class NotificationHook(hooks.PecanHook): # Build the payload. Use of None is included to ensure that we don't # accidentally blow up the API call, but we don't anticipate it # happening. - publish(author_id=request.current_user_id, - method=request.method, - url=request.headers.get('Referer'), - path=request.path, - query_string=request.query_string, - status=response.status_code, - resource=resource, - resource_id=resource_id, - sub_resource=subresource, - sub_resource_id=subresource_id, - resource_before=old_resource, - resource_after=new_resource) + publisher.publish(author_id=request.current_user_id, + method=request.method, + url=request.headers.get('Referer'), + path=request.path, + query_string=request.query_string, + status=response.status_code, + resource=resource, + resource_id=resource_id, + sub_resource=subresource, + sub_resource_id=subresource_id, + resource_before=old_resource, + resource_after=new_resource) def get_original_resource(self, resource, resource_id): """Given a resource name and ID, will load that resource and map it diff --git a/storyboard/notifications/pika/__init__.py b/storyboard/notifications/pika/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/storyboard/notifications/pika/conf.py b/storyboard/notifications/pika/conf.py new file mode 100644 index 00000000..f5c323d5 --- /dev/null +++ b/storyboard/notifications/pika/conf.py @@ -0,0 +1,56 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg + +CONF = cfg.CONF + +NOTIFICATION_OPTS = [ + cfg.StrOpt("rabbit_exchange_name", default="storyboard", + help="The name of the topic exchange which storyboard will " + "use to broadcast its events.", + deprecated_group='notifications'), + cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events", + help="The name of the queue that will be created for " + "API events.", + deprecated_group='notifications'), + cfg.StrOpt("rabbit_application_name", default="storyboard", + help="The rabbit application identifier for storyboard's " + "connection.", + deprecated_group='notifications'), + cfg.StrOpt("rabbit_host", default="localhost", + help="Host of the rabbitmq server.", + deprecated_group='notifications'), + cfg.StrOpt("rabbit_login_method", default="AMQPLAIN", + help="The RabbitMQ login method.", + deprecated_group='notifications'), + cfg.StrOpt("rabbit_userid", default="storyboard", + help="The RabbitMQ userid.", + deprecated_group='notifications'), + cfg.StrOpt("rabbit_password", default="storyboard", + help="The RabbitMQ password.", + deprecated_group='notifications'), + cfg.IntOpt("rabbit_port", default=5672, + help="The RabbitMQ broker port where a single node is used."), + cfg.StrOpt("rabbit_virtual_host", default="/", + help="The virtual host within which our queues and exchanges " + "live.", deprecated_group='notifications'), + cfg.IntOpt("rabbit_connection_attempts", default=6, + help="The number of connection attempts before giving-up", + deprecated_group='notifications'), + cfg.IntOpt("rabbit_retry_delay", default=10, + help="The interval between connection attempts (in seconds)", + deprecated_group='notifications') +] diff --git a/storyboard/notifications/connection_service.py b/storyboard/notifications/pika/connection_service.py similarity index 100% rename from storyboard/notifications/connection_service.py rename to storyboard/notifications/pika/connection_service.py diff --git a/storyboard/notifications/pika/publisher.py b/storyboard/notifications/pika/publisher.py new file mode 100644 index 00000000..11b882fd --- /dev/null +++ b/storyboard/notifications/pika/publisher.py @@ -0,0 +1,181 @@ +# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +from oslo_config import cfg +from oslo_log import log +from pika.exceptions import ConnectionClosed + +from storyboard.notifications.pika.conf import NOTIFICATION_OPTS +from storyboard.notifications.pika.connection_service import ConnectionService +from storyboard._i18n import _, _LW, _LE + + +CONF = cfg.CONF +LOG = log.getLogger(__name__) +PUBLISHER = None + + +class Publisher(ConnectionService): + """A generic message publisher that uses delivery confirmation to ensure + that messages are delivered, and will keep a running cache of unsent + messages while the publisher is attempting to reconnect. + """ + + def __init__(self, conf): + """Setup the publisher instance based on our configuration. + + :param conf A configuration object. + """ + super(Publisher, self).__init__(conf) + + self._pending = list() + + self.add_open_hook(self._publish_pending) + + def _publish_pending(self): + """Publishes any pending messages that were broadcast while the + publisher was connecting. + """ + + # Shallow copy, so we can iterate over it without having it be modified + # out of band. + pending = list(self._pending) + + for payload in pending: + self._publish(payload) + + def _publish(self, payload): + """Publishes a payload to the passed exchange. If it encounters a + failure, will store the payload for later. + + :param Payload payload: The payload to send. + """ + LOG.debug(_("Sending message to %(name)s [%(topic)s]") % + {'name': self._exchange_name, 'topic': payload.topic}) + + # First check, are we closing? + if self._closing: + LOG.warning(_LW("Cannot send message, publisher is closing.")) + if payload not in self._pending: + self._pending.append(payload) + return + + # Second check, are we open? + if not self._open: + LOG.debug(_("Cannot send message, publisher is connecting.")) + if payload not in self._pending: + self._pending.append(payload) + self._reconnect() + return + + # Third check, are we in a sane state? This should never happen, + # but just in case... + if not self._connection or not self._channel: + LOG.error(_LE("Cannot send message, publisher is " + "an unexpected state.")) + if payload not in self._pending: + self._pending.append(payload) + self._reconnect() + return + + # Try to send a message. If we fail, schedule a reconnect and store + # the message. + try: + self._channel.basic_publish(self._exchange_name, + payload.topic, + json.dumps(payload.payload, + ensure_ascii=False), + self._properties) + if payload in self._pending: + self._pending.remove(payload) + return True + except (ConnectionClosed, AttributeError) as cc: + LOG.warning(_LW("Attempted to send message on closed connection.")) + LOG.debug(cc) + self._open = False + if payload not in self._pending: + self._pending.append(payload) + self._reconnect() + return False + + def publish_message(self, topic, payload): + """Publishes a message to RabbitMQ. + """ + self._publish(Payload(topic, payload)) + + +class Payload(object): + def __init__(self, topic, payload): + """Setup the example publisher object, passing in the URL we will use + to connect to RabbitMQ. + + :param topic string The exchange topic to broadcast on. + :param payload string The message payload to send. + """ + + self.topic = topic + self.payload = payload + + +def publish(resource, author_id=None, method=None, url=None, path=None, + query_string=None, status=None, resource_id=None, + sub_resource=None, sub_resource_id=None, resource_before=None, + resource_after=None): + """Send a message for an API event to the storyboard exchange. + + The message will be automatically JSON encoded. + + :param resource: The extrapolated resource type (project, story, etc). + :param author_id: The ID of the author who performed this action. + :param method: The HTTP Method used. + :param url: The Referer header from the request. + :param path: The HTTP Path used. + :param query_string: The HTTP query string used. + :param status: The HTTP Status code of the response. + :param resource_id: The ID of the resource. + :param sub_resource: The extracted subresource (user_token, etc) + :param sub_resource_id: THe ID of the subresource. + :param resource_before: The resource state before this event occurred. + :param resource_after: The resource state after this event occurred. + """ + global PUBLISHER + + if not PUBLISHER: + CONF.register_opts(NOTIFICATION_OPTS, "pika-notifications") + PUBLISHER = Publisher(CONF.pika_notifications) + PUBLISHER.start() + + payload = { + "author_id": author_id, + "method": method, + "url": url, + "path": path, + "query_string": query_string, + "status": status, + "resource": resource, + "resource_id": resource_id, + "sub_resource": sub_resource, + "sub_resource_id": sub_resource_id, + "resource_before": resource_before, + "resource_after": resource_after + } + + if resource: + PUBLISHER.publish_message(resource, payload) + else: + LOG.warning("Attempted to send payload with no destination " + "resource.") diff --git a/storyboard/notifications/subscriber.py b/storyboard/notifications/pika/subscriber.py similarity index 97% rename from storyboard/notifications/subscriber.py rename to storyboard/notifications/pika/subscriber.py index 236cf6d6..73b7dcd4 100644 --- a/storyboard/notifications/subscriber.py +++ b/storyboard/notifications/pika/subscriber.py @@ -21,8 +21,8 @@ from oslo_log import log from pika.exceptions import ConnectionClosed from stevedore import enabled -from storyboard.notifications.conf import NOTIFICATION_OPTS -from storyboard.notifications.connection_service import ConnectionService +from storyboard.notifications.pika.conf import NOTIFICATION_OPTS +from storyboard.notifications.pika.connection_service import ConnectionService from storyboard._i18n import _, _LW diff --git a/storyboard/notifications/publisher.py b/storyboard/notifications/publisher.py index 3425497c..23539dcc 100644 --- a/storyboard/notifications/publisher.py +++ b/storyboard/notifications/publisher.py @@ -1,4 +1,4 @@ -# Copyright (c) 2014 Hewlett-Packard Development Company, L.P. +# Copyright (c) 2018 IBM Corp. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -13,167 +13,26 @@ # See the License for the specific language governing permissions and # limitations under the License. -import json - from oslo_config import cfg -from oslo_log import log -from pika.exceptions import ConnectionClosed - -from storyboard.notifications.conf import NOTIFICATION_OPTS -from storyboard.notifications.connection_service import ConnectionService -from storyboard._i18n import _, _LW, _LE +from oslo_utils import importutils +from storyboard.notifications import conf CONF = cfg.CONF -LOG = log.getLogger(__name__) -PUBLISHER = None - - -class Publisher(ConnectionService): - """A generic message publisher that uses delivery confirmation to ensure - that messages are delivered, and will keep a running cache of unsent - messages while the publisher is attempting to reconnect. - """ - - def __init__(self, conf): - """Setup the publisher instance based on our configuration. - - :param conf A configuration object. - """ - super(Publisher, self).__init__(conf) - - self._pending = list() - - self.add_open_hook(self._publish_pending) - - def _publish_pending(self): - """Publishes any pending messages that were broadcast while the - publisher was connecting. - """ - - # Shallow copy, so we can iterate over it without having it be modified - # out of band. - pending = list(self._pending) - - for payload in pending: - self._publish(payload) - - def _publish(self, payload): - """Publishes a payload to the passed exchange. If it encounters a - failure, will store the payload for later. - - :param Payload payload: The payload to send. - """ - LOG.debug(_("Sending message to %(name)s [%(topic)s]") % - {'name': self._exchange_name, 'topic': payload.topic}) - - # First check, are we closing? - if self._closing: - LOG.warning(_LW("Cannot send message, publisher is closing.")) - if payload not in self._pending: - self._pending.append(payload) - return - - # Second check, are we open? - if not self._open: - LOG.debug(_("Cannot send message, publisher is connecting.")) - if payload not in self._pending: - self._pending.append(payload) - self._reconnect() - return - - # Third check, are we in a sane state? This should never happen, - # but just in case... - if not self._connection or not self._channel: - LOG.error(_LE("Cannot send message, publisher is " - "an unexpected state.")) - if payload not in self._pending: - self._pending.append(payload) - self._reconnect() - return - - # Try to send a message. If we fail, schedule a reconnect and store - # the message. - try: - self._channel.basic_publish(self._exchange_name, - payload.topic, - json.dumps(payload.payload, - ensure_ascii=False), - self._properties) - if payload in self._pending: - self._pending.remove(payload) - return True - except (ConnectionClosed, AttributeError) as cc: - LOG.warning(_LW("Attempted to send message on closed connection.")) - LOG.debug(cc) - self._open = False - if payload not in self._pending: - self._pending.append(payload) - self._reconnect() - return False - - def publish_message(self, topic, payload): - """Publishes a message to RabbitMQ. - """ - self._publish(Payload(topic, payload)) - - -class Payload(object): - def __init__(self, topic, payload): - """Setup the example publisher object, passing in the URL we will use - to connect to RabbitMQ. - - :param topic string The exchange topic to broadcast on. - :param payload string The message payload to send. - """ - - self.topic = topic - self.payload = payload +CONF.register_opts(conf.OPTS, 'notifications') def publish(resource, author_id=None, method=None, url=None, path=None, query_string=None, status=None, resource_id=None, sub_resource=None, sub_resource_id=None, resource_before=None, resource_after=None): - """Send a message for an API event to the storyboard exchange. The message - will be automatically JSON encoded. - :param resource: The extrapolated resource type (project, story, etc). - :param author_id: The ID of the author who performed this action. - :param method: The HTTP Method used. - :param url: The Referer header from the request. - :param path: The HTTP Path used. - :param query_string: The HTTP query string used. - :param status: The HTTP Status code of the response. - :param resource_id: The ID of the resource. - :param sub_resource: The extracted subresource (user_token, etc) - :param sub_resource_id: THe ID of the subresource. - :param resource_before: The resource state before this event occurred. - :param resource_after: The resource state after this event occurred. - """ - global PUBLISHER - - if not PUBLISHER: - CONF.register_opts(NOTIFICATION_OPTS, "notifications") - PUBLISHER = Publisher(CONF.notifications) - PUBLISHER.start() - - payload = { - "author_id": author_id, - "method": method, - "url": url, - "path": path, - "query_string": query_string, - "status": status, - "resource": resource, - "resource_id": resource_id, - "sub_resource": sub_resource, - "sub_resource_id": sub_resource_id, - "resource_before": resource_before, - "resource_after": resource_after - } - - if resource: - PUBLISHER.publish_message(resource, payload) - else: - LOG.warning("Attempted to send payload with no destination resource.") + publisher_module = importutils.import_module( + 'storyboard.notifications.' + CONF.notifications.driver + '.publisher') + publisher_module.publish(resource, author_id=author_id, method=method, + url=url, path=path, query_string=query_string, + status=status, resource_id=resource_id, + sub_resource=sub_resource, + sub_resource_id=sub_resource_id, + resource_before=resource_before, + resource_after=resource_after) diff --git a/storyboard/plugin/event_worker.py b/storyboard/plugin/event_worker.py index 93572d2e..f71e00e8 100644 --- a/storyboard/plugin/event_worker.py +++ b/storyboard/plugin/event_worker.py @@ -22,7 +22,7 @@ from oslo_log import log import storyboard.db.api.base as db_api from storyboard.notifications.notification_hook import class_mappings -from storyboard.notifications.subscriber import subscribe +from storyboard.notifications.pika.subscriber import subscribe from storyboard._i18n import _LI, _LW from storyboard.plugin.base import PluginBase diff --git a/storyboard/tests/notifications/test_notification_hook.py b/storyboard/tests/notifications/test_notification_hook.py index 29314b62..6148a90b 100644 --- a/storyboard/tests/notifications/test_notification_hook.py +++ b/storyboard/tests/notifications/test_notification_hook.py @@ -221,7 +221,7 @@ class TestNotificationHook(base.BaseDbTestCase): self.assertEqual(mock_state.old_entity_values['priority'], sample_task_wmodel.priority) - @patch('storyboard.notifications.notification_hook.publish') + @patch('storyboard.notifications.notification_hook.publisher') @patch.object(NotificationHook, 'get_original_resource') def test_after_publishes_payload(self, mock_get_original_resource, mock_publish): @@ -261,7 +261,7 @@ class TestNotificationHook(base.BaseDbTestCase): mock_get_original_resource.return_value = smt_json n.after(mock_state) - mock_publish.assert_called_with( + mock_publish.publish.assert_called_with( author_id=mock_state.request.current_user_id, method=mock_state.request.method, url=mock_state.request.headers['Referer'],