diff --git a/heat/common/messaging.py b/heat/common/messaging.py index 588e433d1..efc4b3cd8 100644 --- a/heat/common/messaging.py +++ b/heat/common/messaging.py @@ -23,6 +23,7 @@ from heat.common import context TRANSPORT = None +NOTIFICATIONS_TRANSPORT = None NOTIFIER = None _ALIASES = { @@ -73,39 +74,56 @@ class JsonPayloadSerializer(oslo_messaging.NoOpSerializer): return jsonutils.to_primitive(entity, convert_instances=True) +def get_specific_transport(url, optional, exmods, is_for_notifications=False): + try: + if is_for_notifications: + return oslo_messaging.get_notification_transport( + cfg.CONF, url, allowed_remote_exmods=exmods, aliases=_ALIASES) + else: + return oslo_messaging.get_transport( + cfg.CONF, url, allowed_remote_exmods=exmods, aliases=_ALIASES) + except oslo_messaging.InvalidTransportURL as e: + if not optional or e.url: + # NOTE(sileht): oslo_messaging is configured but unloadable + # so reraise the exception + raise + else: + return None + + +def setup_transports(url, optional): + global TRANSPORT, NOTIFICATIONS_TRANSPORT + oslo_messaging.set_transport_defaults('heat') + exmods = ['heat.common.exception'] + TRANSPORT = get_specific_transport(url, optional, exmods) + NOTIFICATIONS_TRANSPORT = get_specific_transport(url, optional, exmods, + is_for_notifications=True) + + def setup(url=None, optional=False): """Initialise the oslo_messaging layer.""" - global TRANSPORT, NOTIFIER + global NOTIFIER if url and url.startswith("fake://"): # NOTE(sileht): oslo_messaging fake driver uses time.sleep # for task switch, so we need to monkey_patch it eventlet.monkey_patch(time=True) + if not TRANSPORT or not NOTIFICATIONS_TRANSPORT: + setup_transports(url, optional) - if not TRANSPORT: - oslo_messaging.set_transport_defaults('heat') - exmods = ['heat.common.exception'] - try: - TRANSPORT = oslo_messaging.get_transport( - cfg.CONF, url, allowed_remote_exmods=exmods, aliases=_ALIASES) - except oslo_messaging.InvalidTransportURL as e: - TRANSPORT = None - if not optional or e.url: - # NOTE(sileht): oslo_messaging is configured but unloadable - # so reraise the exception - raise - - if not NOTIFIER and TRANSPORT: + if not NOTIFIER and NOTIFICATIONS_TRANSPORT: serializer = RequestContextSerializer(JsonPayloadSerializer()) - NOTIFIER = oslo_messaging.Notifier(TRANSPORT, serializer=serializer) + NOTIFIER = oslo_messaging.Notifier(NOTIFICATIONS_TRANSPORT, + serializer=serializer) def cleanup(): """Cleanup the oslo_messaging layer.""" - global TRANSPORT, NOTIFIER + global TRANSPORT, NOTIFICATIONS_TRANSPORT, NOTIFIER if TRANSPORT: TRANSPORT.cleanup() - TRANSPORT = NOTIFIER = None + NOTIFICATIONS_TRANSPORT.cleanup() + TRANSPORT = NOTIFICATIONS_TRANSPORT = NOTIFIER = None def get_rpc_server(target, endpoint):