Initialize MQ transport only once
There are two separate features that use MQ transport: 1. Ceilometer notifications 2. api-engine communication Merged MQ initialization code for them. The question "Should they use the same MQ or not?" is out of scope of this CR. Change-Id: I6a38708bce4c031f9e759a29afbce14325960808 Closes-Bug: #1413005
This commit is contained in:
parent
c293092a88
commit
264b5bb4c5
@ -83,7 +83,8 @@ def setup_common(possible_topdir, service_name):
|
|||||||
# Validate other configurations (that may produce logs) here
|
# Validate other configurations (that may produce logs) here
|
||||||
cinder.validate_config()
|
cinder.validate_config()
|
||||||
|
|
||||||
messaging.setup()
|
if service_name != 'all-in-one' or cfg.CONF.enable_notifications:
|
||||||
|
messaging.setup()
|
||||||
|
|
||||||
if service_name != 'all-in-one':
|
if service_name != 'all-in-one':
|
||||||
LOG.warn(
|
LOG.warn(
|
||||||
|
@ -52,7 +52,7 @@ class NotificationTest(base.SaharaTestCase):
|
|||||||
@mock.patch('oslo_messaging.notify.notifier.Notifier.info')
|
@mock.patch('oslo_messaging.notify.notifier.Notifier.info')
|
||||||
def test_update_cluster(self, mock_notify):
|
def test_update_cluster(self, mock_notify):
|
||||||
self.override_config("enable_notifications", True)
|
self.override_config("enable_notifications", True)
|
||||||
messaging.setup("fake://", optional=True)
|
messaging.setup()
|
||||||
|
|
||||||
self._make_sample()
|
self._make_sample()
|
||||||
self.assertEqual([self.expected],
|
self.assertEqual([self.expected],
|
||||||
|
@ -33,7 +33,7 @@ class TestMessagingSetup(base.SaharaTestCase):
|
|||||||
self.override_config('enable_notifications', True)
|
self.override_config('enable_notifications', True)
|
||||||
|
|
||||||
def _install(self):
|
def _install(self):
|
||||||
messaging.setup('fake://', optional=True)
|
messaging.setup()
|
||||||
self.assertNotEqual(None, messaging.TRANSPORT)
|
self.assertNotEqual(None, messaging.TRANSPORT)
|
||||||
self.assertNotEqual(None, messaging.NOTIFIER)
|
self.assertNotEqual(None, messaging.NOTIFIER)
|
||||||
|
|
||||||
@ -58,7 +58,7 @@ class TestMessagingSetup(base.SaharaTestCase):
|
|||||||
self._install()
|
self._install()
|
||||||
|
|
||||||
expected = [
|
expected = [
|
||||||
mock.call(main.CONF, 'fake://', aliases=_ALIASES)
|
mock.call(main.CONF, aliases=_ALIASES)
|
||||||
]
|
]
|
||||||
self.assertEqual(expected, mock_transport.call_args_list)
|
self.assertEqual(expected, mock_transport.call_args_list)
|
||||||
|
|
||||||
@ -68,11 +68,6 @@ class TestMessagingSetup(base.SaharaTestCase):
|
|||||||
def test_notifier(self, mock_init):
|
def test_notifier(self, mock_init):
|
||||||
self._install()
|
self._install()
|
||||||
|
|
||||||
serializer = messaging.SERIALIZER
|
self.assertEqual(1, mock_init.call_count)
|
||||||
expected = [
|
|
||||||
mock.call(messaging.TRANSPORT, serializer=serializer)
|
|
||||||
]
|
|
||||||
|
|
||||||
self.assertEqual(expected, mock_init.call_args_list)
|
|
||||||
|
|
||||||
self._remove_install()
|
self._remove_install()
|
||||||
|
@ -25,7 +25,6 @@ from sahara.i18n import _LI
|
|||||||
|
|
||||||
TRANSPORT = None
|
TRANSPORT = None
|
||||||
NOTIFIER = None
|
NOTIFIER = None
|
||||||
SERIALIZER = None
|
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
@ -64,9 +63,11 @@ class JsonPayloadSerializer(messaging.NoOpSerializer):
|
|||||||
|
|
||||||
class RPCClient(object):
|
class RPCClient(object):
|
||||||
def __init__(self, target):
|
def __init__(self, target):
|
||||||
|
global TRANSPORT
|
||||||
|
|
||||||
self.__client = messaging.RPCClient(
|
self.__client = messaging.RPCClient(
|
||||||
target=target,
|
target=target,
|
||||||
transport=messaging.get_transport(cfg.CONF),
|
transport=TRANSPORT,
|
||||||
)
|
)
|
||||||
|
|
||||||
def cast(self, name, **kwargs):
|
def cast(self, name, **kwargs):
|
||||||
@ -80,9 +81,11 @@ class RPCClient(object):
|
|||||||
|
|
||||||
class RPCServer(object):
|
class RPCServer(object):
|
||||||
def __init__(self, target):
|
def __init__(self, target):
|
||||||
|
global TRANSPORT
|
||||||
|
|
||||||
self.__server = messaging.get_rpc_server(
|
self.__server = messaging.get_rpc_server(
|
||||||
target=target,
|
target=target,
|
||||||
transport=messaging.get_transport(cfg.CONF),
|
transport=TRANSPORT,
|
||||||
endpoints=[self],
|
endpoints=[self],
|
||||||
executor='eventlet')
|
executor='eventlet')
|
||||||
|
|
||||||
@ -91,29 +94,21 @@ class RPCServer(object):
|
|||||||
self.__server.wait()
|
self.__server.wait()
|
||||||
|
|
||||||
|
|
||||||
def setup(url=None, optional=False):
|
def setup():
|
||||||
"""Initialise the oslo_messaging layer."""
|
"""Initialise the oslo_messaging layer."""
|
||||||
global TRANSPORT, NOTIFIER, SERIALIZER
|
global TRANSPORT, NOTIFIER
|
||||||
|
|
||||||
|
messaging.set_transport_defaults('sahara')
|
||||||
|
|
||||||
|
TRANSPORT = messaging.get_transport(cfg.CONF, aliases=_ALIASES)
|
||||||
|
|
||||||
if not cfg.CONF.enable_notifications:
|
if not cfg.CONF.enable_notifications:
|
||||||
LOG.info(_LI("Notifications disabled"))
|
LOG.info(_LI("Notifications disabled"))
|
||||||
return
|
return
|
||||||
LOG.info(_LI("Notifications enabled"))
|
LOG.info(_LI("Notifications enabled"))
|
||||||
|
|
||||||
messaging.set_transport_defaults('sahara')
|
serializer = ContextSerializer(JsonPayloadSerializer())
|
||||||
|
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer)
|
||||||
SERIALIZER = ContextSerializer(JsonPayloadSerializer())
|
|
||||||
|
|
||||||
try:
|
|
||||||
TRANSPORT = messaging.get_transport(cfg.CONF, url,
|
|
||||||
aliases=_ALIASES)
|
|
||||||
except messaging.InvalidTransportURL as e:
|
|
||||||
TRANSPORT = None
|
|
||||||
if not optional or e.url:
|
|
||||||
raise
|
|
||||||
|
|
||||||
if TRANSPORT:
|
|
||||||
NOTIFIER = messaging.Notifier(TRANSPORT, serializer=SERIALIZER)
|
|
||||||
|
|
||||||
|
|
||||||
def get_notifier(publisher_id):
|
def get_notifier(publisher_id):
|
||||||
|
Loading…
Reference in New Issue
Block a user