Merge "allow to specify notifications transport url"

This commit is contained in:
Jenkins 2016-07-09 09:17:25 +00:00 committed by Gerrit Code Review
commit 82e87ab0e6
4 changed files with 131 additions and 29 deletions

View File

@ -0,0 +1,5 @@
---
features:
- Separate transport url can be used for notifications purposes now,
to enable this feature 'transport_url' should be provided in
'oslo_messaging_notifications' section.

View File

@ -86,9 +86,7 @@ def setup_common(possible_topdir, service_name):
cinder.validate_config()
castellan.validate_config()
if (service_name != 'all-in-one' or
CONF.oslo_messaging_notifications.enable):
messaging.setup()
messaging.setup(service_name)
plugins_base.setup_plugins()

View File

@ -27,27 +27,95 @@ _ALIASES = {
class TestMessagingSetup(base.SaharaTestCase):
@mock.patch('oslo_messaging.set_transport_defaults')
@mock.patch('oslo_messaging.get_transport')
@mock.patch('oslo_messaging.Notifier')
def test_set_defaults(self, notifier_init,
get_transport, set_transport_def):
def setUp(self):
super(TestMessagingSetup, self).setUp()
self.patchers = []
notifier_init_patch = mock.patch('oslo_messaging.Notifier')
self.notifier_init = notifier_init_patch.start()
self.patchers.append(notifier_init_patch)
get_notif_transp_patch = mock.patch(
'oslo_messaging.get_notification_transport')
self.get_notify_transport = get_notif_transp_patch.start()
self.patchers.append(get_notif_transp_patch)
get_transport_patch = mock.patch('oslo_messaging.get_transport')
self.get_transport = get_transport_patch.start()
self.patchers.append(get_transport_patch)
set_def_patch = mock.patch('oslo_messaging.set_transport_defaults')
self.set_transport_def = set_def_patch.start()
self.patchers.append(set_def_patch)
def tearDown(self):
messaging.NOTIFICATION_TRANSPORT = None
messaging.MESSAGING_TRANSPORT = None
messaging.NOTIFIER = None
for patch in reversed(self.patchers):
patch.stop()
super(TestMessagingSetup, self).tearDown()
def test_set_defaults(self):
self.override_config('enable', True,
group='oslo_messaging_notifications')
messaging.setup()
self.assertIsNotNone(messaging.TRANSPORT)
messaging.setup('distributed')
self.assertIsNotNone(messaging.MESSAGING_TRANSPORT)
self.assertIsNotNone(messaging.NOTIFICATION_TRANSPORT)
self.assertIsNotNone(messaging.NOTIFIER)
expected = [
mock.call('sahara')
]
self.assertEqual(expected, set_transport_def.call_args_list)
self.assertEqual(expected, self.set_transport_def.call_args_list)
self.assertEqual(
[mock.call(main.CONF, aliases=_ALIASES)],
get_transport.call_args_list)
self.assertEqual(1, notifier_init.call_count)
self.get_transport.call_args_list)
self.assertEqual(
[mock.call(main.CONF, aliases=_ALIASES)],
self.get_notify_transport.call_args_list)
self.assertEqual(1, self.notifier_init.call_count)
if messaging.TRANSPORT:
messaging.TRANSPORT.cleanup()
messaging.TRANSPORT = messaging.NOTIFIER = None
def test_fallback(self):
self.override_config('enable', True,
group='oslo_messaging_notifications')
self.get_notify_transport.side_effect = ValueError()
messaging.setup('distributed')
self.assertIsNotNone(messaging.MESSAGING_TRANSPORT)
self.assertIsNotNone(messaging.NOTIFICATION_TRANSPORT)
self.assertEqual(
messaging.MESSAGING_TRANSPORT, messaging.NOTIFICATION_TRANSPORT)
self.assertIsNotNone(messaging.NOTIFIER)
expected = [
mock.call('sahara')
]
self.assertEqual(expected, self.set_transport_def.call_args_list)
self.assertEqual(
[mock.call(main.CONF, aliases=_ALIASES)],
self.get_transport.call_args_list)
self.assertEqual(
[mock.call(main.CONF, aliases=_ALIASES)],
self.get_notify_transport.call_args_list)
self.assertEqual(1, self.notifier_init.call_count)
def test_no_messaging(self):
messaging.setup('all-in-one')
self.assertEqual(0, self.get_notify_transport.call_count)
self.assertEqual(0, self.get_transport.call_count)
def test_only_notifications(self):
self.override_config('enable', True,
group='oslo_messaging_notifications')
messaging.setup('all-in-one')
self.assertEqual(0, self.get_transport.call_count)
self.assertEqual(1, self.get_notify_transport.call_count)
def test_only_service_messaging(self):
messaging.setup('distributed')
self.assertEqual(1, self.get_transport.call_count)
self.assertEqual(0, self.get_notify_transport.call_count)

View File

@ -20,10 +20,12 @@ import oslo_messaging as messaging
from oslo_serialization import jsonutils
from sahara import context
from sahara.i18n import _LE
from sahara.i18n import _LI
TRANSPORT = None
MESSAGING_TRANSPORT = None
NOTIFICATION_TRANSPORT = None
NOTIFIER = None
CONF = cfg.CONF
@ -63,11 +65,11 @@ class JsonPayloadSerializer(messaging.NoOpSerializer):
class RPCClient(object):
def __init__(self, target):
global TRANSPORT
global MESSAGING_TRANSPORT
self.__client = messaging.RPCClient(
target=target,
transport=TRANSPORT,
transport=MESSAGING_TRANSPORT,
)
def cast(self, name, **kwargs):
@ -81,11 +83,11 @@ class RPCClient(object):
class RPCServer(object):
def __init__(self, target):
global TRANSPORT
global MESSAGING_TRANSPORT
self.__server = messaging.get_rpc_server(
target=target,
transport=TRANSPORT,
transport=MESSAGING_TRANSPORT,
endpoints=[self],
executor='eventlet')
@ -93,21 +95,50 @@ class RPCServer(object):
return self.__server
def setup():
"""Initialise the oslo_messaging layer."""
global TRANSPORT, NOTIFIER
def setup_service_messaging():
global MESSAGING_TRANSPORT
if MESSAGING_TRANSPORT:
# Already is up
return
MESSAGING_TRANSPORT = messaging.get_transport(cfg.CONF, aliases=_ALIASES)
messaging.set_transport_defaults('sahara')
TRANSPORT = messaging.get_transport(cfg.CONF, aliases=_ALIASES)
def setup_notifications():
global NOTIFICATION_TRANSPORT, NOTIFIER, MESSAGING_TRANSPORT
if not cfg.CONF.oslo_messaging_notifications.enable:
LOG.info(_LI("Notifications disabled"))
return
LOG.info(_LI("Notifications enabled"))
try:
NOTIFICATION_TRANSPORT = messaging.get_notification_transport(
cfg.CONF, aliases=_ALIASES)
except Exception:
LOG.error(_LE("Unable to setup notification transport. Reusing "
"service transport for that."))
setup_service_messaging()
NOTIFICATION_TRANSPORT = MESSAGING_TRANSPORT
serializer = ContextSerializer(JsonPayloadSerializer())
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
NOTIFIER = messaging.Notifier(
NOTIFICATION_TRANSPORT, serializer=serializer)
LOG.info(_LI("Notifications enabled"))
def setup(service_name):
"""Initialise the oslo_messaging layer."""
global MESSAGING_TRANSPORT, NOTIFICATION_TRANSPORT, NOTIFIER
if (service_name == 'all-in-one' and
not cfg.CONF.oslo_messaging_notifications.enable):
LOG.info(_LI("Notifications disabled"))
return
messaging.set_transport_defaults('sahara')
setup_notifications()
if service_name != 'all-in-one':
setup_service_messaging()
def get_notifier(publisher_id):