Make notification driver configurable

This commit makes the notification driver configurable. Currently there
is only one notification backend available, rabbit/pika, but to support
emitting notifications to firehose.openstack.org in the future a MQTT
backend will be added. This commit lays to groundwork for doing this.
Note this only effects the notification publisher, the subscriber calls
will be updated in the future. (there is only one use of the subscriber
currently so that is updated manually to directly use the new path)

The external api for registering a pecan notification hook remains the
same, but the underlying publish call being made is set by the
configuration option. This enables a different publish() method to be
called depending on configuration.

To facilitate this change the current pika notification options are
deprecated in the 'notifications' group and are instead added to the
'pika-notifications' configuration group. This will be removed in the
future. A new 'driver' option is added to the ntofications group to
specify which driver to use which defaults to the current pika driver.

Change-Id: I5235ae4368f8bff13c767b87ced83173d52b5155
This commit is contained in:
Matthew Treinish 2018-01-28 00:44:35 -05:00
parent f45708e194
commit c7e0e458ff
No known key found for this signature in database
GPG Key ID: FD12A0F214C9E177
11 changed files with 282 additions and 210 deletions

View File

@ -26,7 +26,7 @@ from storyboard.db.api import base as api_base
from storyboard.db.api import stories as stories_api
from storyboard.db.api import tasks as tasks_api
from storyboard.db import models
from storyboard.notifications.publisher import publish
from storyboard.notifications import publisher
CONF = cfg.CONF
@ -112,15 +112,15 @@ def event_create(values):
event_dict = tojson(TimeLineEvent,
TimeLineEvent.from_db_model(new_event))
publish(author_id=request.current_user_id or None,
method="POST",
url=request.headers.get('Referer') or None,
path=request.path or None,
query_string=request.query_string or None,
status=response.status_code or None,
resource="timeline_event",
resource_id=new_event.id or None,
resource_after=event_dict or None)
publisher.publish(author_id=request.current_user_id or None,
method="POST",
url=request.headers.get('Referer') or None,
path=request.path or None,
query_string=request.query_string or None,
status=response.status_code or None,
resource="timeline_event",
resource_id=new_event.id or None,
resource_after=event_dict or None)
return new_event

View File

@ -1,4 +1,4 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
# Copyright (c) 2018 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -17,31 +17,7 @@ from oslo_config import cfg
CONF = cfg.CONF
NOTIFICATION_OPTS = [
cfg.StrOpt("rabbit_exchange_name", default="storyboard",
help="The name of the topic exchange which storyboard will "
"use to broadcast its events."),
cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events",
help="The name of the queue that will be created for "
"API events."),
cfg.StrOpt("rabbit_application_name", default="storyboard",
help="The rabbit application identifier for storyboard's "
"connection."),
cfg.StrOpt("rabbit_host", default="localhost",
help="Host of the rabbitmq server."),
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
help="The RabbitMQ login method."),
cfg.StrOpt("rabbit_userid", default="storyboard",
help="The RabbitMQ userid."),
cfg.StrOpt("rabbit_password", default="storyboard",
help="The RabbitMQ password."),
cfg.IntOpt("rabbit_port", default=5672,
help="The RabbitMQ broker port where a single node is used."),
cfg.StrOpt("rabbit_virtual_host", default="/",
help="The virtual host within which our queues and exchanges "
"live."),
cfg.IntOpt("rabbit_connection_attempts", default=6,
help="The number of connection attempts before giving-up"),
cfg.IntOpt("rabbit_retry_delay", default=10,
help="The interval between connection attempts (in seconds)")
OPTS = [
cfg.StrOpt('driver', choices=['pika'],
help='The notification driver to use', default='pika')
]

View File

@ -23,7 +23,7 @@ from storyboard.api.v1 import wmodels
import storyboard.common.hook_priorities as priority
from storyboard.db.api import base as api_base
from storyboard.db import models
from storyboard.notifications.publisher import publish
from storyboard.notifications import publisher
class_mappings = {'task': [models.Task, wmodels.Task],
@ -110,18 +110,18 @@ class NotificationHook(hooks.PecanHook):
# Build the payload. Use of None is included to ensure that we don't
# accidentally blow up the API call, but we don't anticipate it
# happening.
publish(author_id=request.current_user_id,
method=request.method,
url=request.headers.get('Referer'),
path=request.path,
query_string=request.query_string,
status=response.status_code,
resource=resource,
resource_id=resource_id,
sub_resource=subresource,
sub_resource_id=subresource_id,
resource_before=old_resource,
resource_after=new_resource)
publisher.publish(author_id=request.current_user_id,
method=request.method,
url=request.headers.get('Referer'),
path=request.path,
query_string=request.query_string,
status=response.status_code,
resource=resource,
resource_id=resource_id,
sub_resource=subresource,
sub_resource_id=subresource_id,
resource_before=old_resource,
resource_after=new_resource)
def get_original_resource(self, resource, resource_id):
"""Given a resource name and ID, will load that resource and map it

View File

@ -0,0 +1,56 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
CONF = cfg.CONF
NOTIFICATION_OPTS = [
cfg.StrOpt("rabbit_exchange_name", default="storyboard",
help="The name of the topic exchange which storyboard will "
"use to broadcast its events.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_event_queue_name", default="storyboard_events",
help="The name of the queue that will be created for "
"API events.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_application_name", default="storyboard",
help="The rabbit application identifier for storyboard's "
"connection.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_host", default="localhost",
help="Host of the rabbitmq server.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_login_method", default="AMQPLAIN",
help="The RabbitMQ login method.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_userid", default="storyboard",
help="The RabbitMQ userid.",
deprecated_group='notifications'),
cfg.StrOpt("rabbit_password", default="storyboard",
help="The RabbitMQ password.",
deprecated_group='notifications'),
cfg.IntOpt("rabbit_port", default=5672,
help="The RabbitMQ broker port where a single node is used."),
cfg.StrOpt("rabbit_virtual_host", default="/",
help="The virtual host within which our queues and exchanges "
"live.", deprecated_group='notifications'),
cfg.IntOpt("rabbit_connection_attempts", default=6,
help="The number of connection attempts before giving-up",
deprecated_group='notifications'),
cfg.IntOpt("rabbit_retry_delay", default=10,
help="The interval between connection attempts (in seconds)",
deprecated_group='notifications')
]

View File

@ -0,0 +1,181 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from oslo_config import cfg
from oslo_log import log
from pika.exceptions import ConnectionClosed
from storyboard.notifications.pika.conf import NOTIFICATION_OPTS
from storyboard.notifications.pika.connection_service import ConnectionService
from storyboard._i18n import _, _LW, _LE
CONF = cfg.CONF
LOG = log.getLogger(__name__)
PUBLISHER = None
class Publisher(ConnectionService):
"""A generic message publisher that uses delivery confirmation to ensure
that messages are delivered, and will keep a running cache of unsent
messages while the publisher is attempting to reconnect.
"""
def __init__(self, conf):
"""Setup the publisher instance based on our configuration.
:param conf A configuration object.
"""
super(Publisher, self).__init__(conf)
self._pending = list()
self.add_open_hook(self._publish_pending)
def _publish_pending(self):
"""Publishes any pending messages that were broadcast while the
publisher was connecting.
"""
# Shallow copy, so we can iterate over it without having it be modified
# out of band.
pending = list(self._pending)
for payload in pending:
self._publish(payload)
def _publish(self, payload):
"""Publishes a payload to the passed exchange. If it encounters a
failure, will store the payload for later.
:param Payload payload: The payload to send.
"""
LOG.debug(_("Sending message to %(name)s [%(topic)s]") %
{'name': self._exchange_name, 'topic': payload.topic})
# First check, are we closing?
if self._closing:
LOG.warning(_LW("Cannot send message, publisher is closing."))
if payload not in self._pending:
self._pending.append(payload)
return
# Second check, are we open?
if not self._open:
LOG.debug(_("Cannot send message, publisher is connecting."))
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return
# Third check, are we in a sane state? This should never happen,
# but just in case...
if not self._connection or not self._channel:
LOG.error(_LE("Cannot send message, publisher is "
"an unexpected state."))
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return
# Try to send a message. If we fail, schedule a reconnect and store
# the message.
try:
self._channel.basic_publish(self._exchange_name,
payload.topic,
json.dumps(payload.payload,
ensure_ascii=False),
self._properties)
if payload in self._pending:
self._pending.remove(payload)
return True
except (ConnectionClosed, AttributeError) as cc:
LOG.warning(_LW("Attempted to send message on closed connection."))
LOG.debug(cc)
self._open = False
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return False
def publish_message(self, topic, payload):
"""Publishes a message to RabbitMQ.
"""
self._publish(Payload(topic, payload))
class Payload(object):
def __init__(self, topic, payload):
"""Setup the example publisher object, passing in the URL we will use
to connect to RabbitMQ.
:param topic string The exchange topic to broadcast on.
:param payload string The message payload to send.
"""
self.topic = topic
self.payload = payload
def publish(resource, author_id=None, method=None, url=None, path=None,
query_string=None, status=None, resource_id=None,
sub_resource=None, sub_resource_id=None, resource_before=None,
resource_after=None):
"""Send a message for an API event to the storyboard exchange.
The message will be automatically JSON encoded.
:param resource: The extrapolated resource type (project, story, etc).
:param author_id: The ID of the author who performed this action.
:param method: The HTTP Method used.
:param url: The Referer header from the request.
:param path: The HTTP Path used.
:param query_string: The HTTP query string used.
:param status: The HTTP Status code of the response.
:param resource_id: The ID of the resource.
:param sub_resource: The extracted subresource (user_token, etc)
:param sub_resource_id: THe ID of the subresource.
:param resource_before: The resource state before this event occurred.
:param resource_after: The resource state after this event occurred.
"""
global PUBLISHER
if not PUBLISHER:
CONF.register_opts(NOTIFICATION_OPTS, "pika-notifications")
PUBLISHER = Publisher(CONF.pika_notifications)
PUBLISHER.start()
payload = {
"author_id": author_id,
"method": method,
"url": url,
"path": path,
"query_string": query_string,
"status": status,
"resource": resource,
"resource_id": resource_id,
"sub_resource": sub_resource,
"sub_resource_id": sub_resource_id,
"resource_before": resource_before,
"resource_after": resource_after
}
if resource:
PUBLISHER.publish_message(resource, payload)
else:
LOG.warning("Attempted to send payload with no destination "
"resource.")

View File

@ -21,8 +21,8 @@ from oslo_log import log
from pika.exceptions import ConnectionClosed
from stevedore import enabled
from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.notifications.connection_service import ConnectionService
from storyboard.notifications.pika.conf import NOTIFICATION_OPTS
from storyboard.notifications.pika.connection_service import ConnectionService
from storyboard._i18n import _, _LW

View File

@ -1,4 +1,4 @@
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
# Copyright (c) 2018 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@ -13,167 +13,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import json
from oslo_config import cfg
from oslo_log import log
from pika.exceptions import ConnectionClosed
from storyboard.notifications.conf import NOTIFICATION_OPTS
from storyboard.notifications.connection_service import ConnectionService
from storyboard._i18n import _, _LW, _LE
from oslo_utils import importutils
from storyboard.notifications import conf
CONF = cfg.CONF
LOG = log.getLogger(__name__)
PUBLISHER = None
class Publisher(ConnectionService):
"""A generic message publisher that uses delivery confirmation to ensure
that messages are delivered, and will keep a running cache of unsent
messages while the publisher is attempting to reconnect.
"""
def __init__(self, conf):
"""Setup the publisher instance based on our configuration.
:param conf A configuration object.
"""
super(Publisher, self).__init__(conf)
self._pending = list()
self.add_open_hook(self._publish_pending)
def _publish_pending(self):
"""Publishes any pending messages that were broadcast while the
publisher was connecting.
"""
# Shallow copy, so we can iterate over it without having it be modified
# out of band.
pending = list(self._pending)
for payload in pending:
self._publish(payload)
def _publish(self, payload):
"""Publishes a payload to the passed exchange. If it encounters a
failure, will store the payload for later.
:param Payload payload: The payload to send.
"""
LOG.debug(_("Sending message to %(name)s [%(topic)s]") %
{'name': self._exchange_name, 'topic': payload.topic})
# First check, are we closing?
if self._closing:
LOG.warning(_LW("Cannot send message, publisher is closing."))
if payload not in self._pending:
self._pending.append(payload)
return
# Second check, are we open?
if not self._open:
LOG.debug(_("Cannot send message, publisher is connecting."))
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return
# Third check, are we in a sane state? This should never happen,
# but just in case...
if not self._connection or not self._channel:
LOG.error(_LE("Cannot send message, publisher is "
"an unexpected state."))
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return
# Try to send a message. If we fail, schedule a reconnect and store
# the message.
try:
self._channel.basic_publish(self._exchange_name,
payload.topic,
json.dumps(payload.payload,
ensure_ascii=False),
self._properties)
if payload in self._pending:
self._pending.remove(payload)
return True
except (ConnectionClosed, AttributeError) as cc:
LOG.warning(_LW("Attempted to send message on closed connection."))
LOG.debug(cc)
self._open = False
if payload not in self._pending:
self._pending.append(payload)
self._reconnect()
return False
def publish_message(self, topic, payload):
"""Publishes a message to RabbitMQ.
"""
self._publish(Payload(topic, payload))
class Payload(object):
def __init__(self, topic, payload):
"""Setup the example publisher object, passing in the URL we will use
to connect to RabbitMQ.
:param topic string The exchange topic to broadcast on.
:param payload string The message payload to send.
"""
self.topic = topic
self.payload = payload
CONF.register_opts(conf.OPTS, 'notifications')
def publish(resource, author_id=None, method=None, url=None, path=None,
query_string=None, status=None, resource_id=None,
sub_resource=None, sub_resource_id=None, resource_before=None,
resource_after=None):
"""Send a message for an API event to the storyboard exchange. The message
will be automatically JSON encoded.
:param resource: The extrapolated resource type (project, story, etc).
:param author_id: The ID of the author who performed this action.
:param method: The HTTP Method used.
:param url: The Referer header from the request.
:param path: The HTTP Path used.
:param query_string: The HTTP query string used.
:param status: The HTTP Status code of the response.
:param resource_id: The ID of the resource.
:param sub_resource: The extracted subresource (user_token, etc)
:param sub_resource_id: THe ID of the subresource.
:param resource_before: The resource state before this event occurred.
:param resource_after: The resource state after this event occurred.
"""
global PUBLISHER
if not PUBLISHER:
CONF.register_opts(NOTIFICATION_OPTS, "notifications")
PUBLISHER = Publisher(CONF.notifications)
PUBLISHER.start()
payload = {
"author_id": author_id,
"method": method,
"url": url,
"path": path,
"query_string": query_string,
"status": status,
"resource": resource,
"resource_id": resource_id,
"sub_resource": sub_resource,
"sub_resource_id": sub_resource_id,
"resource_before": resource_before,
"resource_after": resource_after
}
if resource:
PUBLISHER.publish_message(resource, payload)
else:
LOG.warning("Attempted to send payload with no destination resource.")
publisher_module = importutils.import_module(
'storyboard.notifications.' + CONF.notifications.driver + '.publisher')
publisher_module.publish(resource, author_id=author_id, method=method,
url=url, path=path, query_string=query_string,
status=status, resource_id=resource_id,
sub_resource=sub_resource,
sub_resource_id=sub_resource_id,
resource_before=resource_before,
resource_after=resource_after)

View File

@ -22,7 +22,7 @@ from oslo_log import log
import storyboard.db.api.base as db_api
from storyboard.notifications.notification_hook import class_mappings
from storyboard.notifications.subscriber import subscribe
from storyboard.notifications.pika.subscriber import subscribe
from storyboard._i18n import _LI, _LW
from storyboard.plugin.base import PluginBase

View File

@ -221,7 +221,7 @@ class TestNotificationHook(base.BaseDbTestCase):
self.assertEqual(mock_state.old_entity_values['priority'],
sample_task_wmodel.priority)
@patch('storyboard.notifications.notification_hook.publish')
@patch('storyboard.notifications.notification_hook.publisher')
@patch.object(NotificationHook, 'get_original_resource')
def test_after_publishes_payload(self, mock_get_original_resource,
mock_publish):
@ -261,7 +261,7 @@ class TestNotificationHook(base.BaseDbTestCase):
mock_get_original_resource.return_value = smt_json
n.after(mock_state)
mock_publish.assert_called_with(
mock_publish.publish.assert_called_with(
author_id=mock_state.request.current_user_id,
method=mock_state.request.method,
url=mock_state.request.headers['Referer'],