Decouple transport for RPC and Notification

Add a new configuration option for setting up
an alternate notification_transport_url that
can be used for notifications. This allows
operators to separate the transport mechanisms
used for RPC and Notifications.

DocImpact

Closes-Bug: #1504622
Change-Id: Ief6f95ea906bfd95b3218a930c9db5d8a764beb9
This commit is contained in:
Davanum Srinivas 2015-10-09 15:44:35 -07:00
parent 468437454f
commit 6621b9010e
7 changed files with 49 additions and 20 deletions

View File

@ -15,6 +15,7 @@
__all__ = ['Notifier',
'LoggingNotificationHandler',
'get_notification_transport',
'get_notification_listener',
'NotificationResult',
'NotificationFilter',

View File

@ -19,12 +19,13 @@ contain a set of methods. Each method corresponds to a notification priority.
To create a notification listener, you supply a transport, list of targets and
a list of endpoints.
A transport can be obtained simply by calling the get_transport() method::
A transport can be obtained simply by calling the get_notification_transport()
method::
transport = messaging.get_transport(conf)
transport = messaging.get_notification_transport(conf)
which will load the appropriate transport driver according to the user's
messaging configuration. See get_transport() for more details.
messaging configuration. See get_notification_transport() for more details.
The target supplied when creating a notification listener expresses the topic
and - optionally - the exchange to listen on. See Target for more details
@ -56,7 +57,7 @@ A simple example of a notification listener with multiple endpoints might be::
def error(self, ctxt, publisher_id, event_type, payload, metadata):
do_something(payload)
transport = oslo_messaging.get_transport(cfg.CONF)
transport = oslo_messaging.get_notification_transport(cfg.CONF)
targets = [
oslo_messaging.Target(topic='notifications')
oslo_messaging.Target(topic='notifications_bis')

View File

@ -21,7 +21,7 @@ class LoggingErrorNotificationHandler(logging.Handler):
# at runtime.
import oslo_messaging
logging.Handler.__init__(self, *args, **kwargs)
self._transport = oslo_messaging.get_transport(cfg.CONF)
self._transport = oslo_messaging.get_notification_transport(cfg.CONF)
self._notifier = oslo_messaging.Notifier(
self._transport,
publisher_id='error.publisher')

View File

@ -19,7 +19,6 @@ import logging
from oslo_config import cfg
from oslo_messaging.notify import notifier
from oslo_messaging import transport
class LoggingNotificationHandler(logging.Handler):
@ -47,7 +46,7 @@ class LoggingNotificationHandler(logging.Handler):
def __init__(self, url, publisher_id=None, driver=None,
topic=None, serializer=None):
self.notifier = notifier.Notifier(
transport.get_transport(self.CONF, url),
notifier.get_notification_transport(self.CONF, url),
publisher_id, driver,
topic,
serializer() if serializer else None)

View File

@ -59,7 +59,8 @@ class RequestNotifier(base.Middleware):
def __init__(self, app, **conf):
self.notifier = notify.Notifier(
oslo_messaging.get_transport(cfg.CONF, conf.get('url')),
oslo_messaging.get_notification_transport(cfg.CONF,
conf.get('url')),
publisher_id=conf.get('publisher_id',
os.path.basename(sys.argv[0])))
self.service_name = conf.get('service_name')

View File

@ -25,6 +25,7 @@ import six
from stevedore import named
from oslo_messaging import serializer as msg_serializer
from oslo_messaging import transport as msg_transport
_notifier_opts = [
cfg.MultiStrOpt('notification_driver',
@ -32,6 +33,10 @@ _notifier_opts = [
help='The Drivers(s) to handle sending notifications. '
'Possible values are messaging, messagingv2, '
'routing, log, test, noop'),
cfg.StrOpt('notification_transport_url',
help='A URL representing the messaging driver to use for '
'notifications. If not set, we fall back to the same '
'configuration used for RPC.'),
cfg.ListOpt('notification_topics',
default=['notifications', ],
deprecated_name='topics',
@ -75,6 +80,15 @@ class Driver(object):
pass
def get_notification_transport(conf, url=None,
allowed_remote_exmods=None, aliases=None):
if url is None:
conf.register_opts(_notifier_opts)
url = conf.notification_transport_url
return msg_transport.get_transport(conf, url,
allowed_remote_exmods, aliases)
class Notifier(object):
"""Send notification messages.
@ -94,7 +108,8 @@ class Notifier(object):
A Notifier object can be instantiated with a transport object and a
publisher ID:
notifier = messaging.Notifier(get_transport(CONF), 'compute')
notifier = messaging.Notifier(get_notification_transport(CONF),
'compute')
and notifications are sent via drivers chosen with the notification_driver
config option and on the topics chosen with the notification_topics config
@ -103,7 +118,8 @@ class Notifier(object):
Alternatively, a Notifier object can be instantiated with a specific
driver or topic::
notifier = notifier.Notifier(RPC_TRANSPORT,
transport = notifier.get_notification_transport(CONF)
notifier = notifier.Notifier(transport,
'compute.host',
driver='messaging',
topic='notifications')

View File

@ -21,6 +21,7 @@ import testscenarios
import oslo_messaging
from oslo_messaging.notify import dispatcher
from oslo_messaging.notify import notifier as msg_notifier
from oslo_messaging.tests import utils as test_utils
from six.moves import mock
@ -126,7 +127,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
ListenerSetupMixin.setUp(self)
def test_constructor(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
target = oslo_messaging.Target(topic='foo')
endpoints = [object()]
@ -141,7 +143,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
self.assertEqual('blocking', listener.executor)
def test_no_target_topic(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
listener = oslo_messaging.get_notification_listener(
transport,
@ -155,7 +158,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
self.assertTrue(False)
def test_unknown_executor(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
try:
oslo_messaging.get_notification_listener(transport, [], [],
@ -167,7 +171,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
self.assertTrue(False)
def test_one_topic(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
@ -184,7 +189,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
{'message_id': mock.ANY, 'timestamp': mock.ANY})
def test_two_topics(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
@ -210,7 +216,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
any_order=True)
def test_two_exchanges(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info.return_value = None
@ -254,7 +261,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
any_order=True)
def test_two_endpoints(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint1 = mock.Mock()
endpoint1.info.return_value = None
@ -279,7 +287,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
'message_id': mock.ANY})
def test_requeue(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint = mock.Mock()
endpoint.info = mock.Mock()
@ -303,7 +312,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
{'timestamp': mock.ANY, 'message_id': mock.ANY})])
def test_two_pools(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint1 = mock.Mock()
endpoint1.info.return_value = None
@ -336,7 +346,8 @@ class TestNotifyListener(test_utils.BaseTestCase, ListenerSetupMixin):
mocked_endpoint_call(1)])
def test_two_pools_three_listener(self):
transport = oslo_messaging.get_transport(self.conf, url='fake:')
transport = msg_notifier.get_notification_transport(
self.conf, url='fake:')
endpoint1 = mock.Mock()
endpoint1.info.return_value = None