diff --git a/senlin/common/messaging.py b/senlin/common/messaging.py index eeb2959c5..2913f8ebe 100644 --- a/senlin/common/messaging.py +++ b/senlin/common/messaging.py @@ -11,18 +11,19 @@ # under the License. import eventlet - from oslo_config import cfg -import oslo_messaging -from oslo_serialization import jsonutils +import oslo_messaging as messaging from senlin.common import context +# An alias for the default serializer +JsonPayloadSerializer = messaging.JsonPayloadSerializer + TRANSPORT = None NOTIFIER = None -class RequestContextSerializer(oslo_messaging.Serializer): +class RequestContextSerializer(messaging.Serializer): def __init__(self, base): self._base = base @@ -45,37 +46,32 @@ class RequestContextSerializer(oslo_messaging.Serializer): return context.RequestContext.from_dict(ctxt) -class JsonPayloadSerializer(oslo_messaging.NoOpSerializer): - @classmethod - def serialize_entity(cls, context, entity): - return jsonutils.to_primitive(entity, convert_instances=True) - - def setup(url=None, optional=False): """Initialise the oslo_messaging layer.""" global TRANSPORT, NOTIFIER if url and url.startswith("fake://"): - # NOTE(sileht): oslo_messaging fake driver uses time.sleep + # NOTE: oslo_messaging fake driver uses time.sleep # for task switch, so we need to monkey_patch it eventlet.monkey_patch(time=True) if not TRANSPORT: - oslo_messaging.set_transport_defaults('senlin') + messaging.set_transport_defaults('senlin') exmods = ['senlin.common.exception'] try: - TRANSPORT = oslo_messaging.get_transport( + TRANSPORT = messaging.get_transport( cfg.CONF, url, allowed_remote_exmods=exmods) - except oslo_messaging.InvalidTransportURL as e: + except messaging.InvalidTransportURL as e: TRANSPORT = None if not optional or e.url: - # NOTE(sileht): oslo_messaging is configured but unloadable + # NOTE: oslo_messaging is configured but unloadable # so reraise the exception raise if not NOTIFIER and TRANSPORT: serializer = RequestContextSerializer(JsonPayloadSerializer()) - NOTIFIER = oslo_messaging.Notifier(TRANSPORT, serializer=serializer) + NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer, + topics=['versioned_notifications']) def cleanup(): @@ -83,25 +79,26 @@ def cleanup(): global TRANSPORT, NOTIFIER if TRANSPORT: TRANSPORT.cleanup() - TRANSPORT = NOTIFIER = None + TRANSPORT = None + NOTIFIER = None def get_rpc_server(target, endpoint): """Return a configured oslo_messaging rpc server.""" serializer = RequestContextSerializer(JsonPayloadSerializer()) - return oslo_messaging.get_rpc_server(TRANSPORT, target, [endpoint], - executor='eventlet', - serializer=serializer) + return messaging.get_rpc_server(TRANSPORT, target, [endpoint], + executor='eventlet', + serializer=serializer) def get_rpc_client(**kwargs): """Return a configured oslo_messaging RPCClient.""" - target = oslo_messaging.Target(**kwargs) + target = messaging.Target(**kwargs) serializer = RequestContextSerializer(JsonPayloadSerializer()) - return oslo_messaging.RPCClient(TRANSPORT, target, - serializer=serializer) + return messaging.RPCClient(TRANSPORT, target, serializer=serializer) def get_notifier(publisher_id): """Return a configured oslo_messaging notifier.""" + global NOTIFIER return NOTIFIER.prepare(publisher_id=publisher_id) diff --git a/senlin/objects/notification.py b/senlin/objects/notification.py index 712903032..f9cf919b8 100644 --- a/senlin/objects/notification.py +++ b/senlin/objects/notification.py @@ -10,9 +10,9 @@ # License for the specific language governing permissions and limitations # under the License. +from senlin.common import messaging from senlin.objects import base from senlin.objects import fields -from senlin.rpc import notifier as rpc @base.SenlinObjectRegistry.register @@ -100,7 +100,7 @@ class NotificationBase(base.SenlinObject): } def _emit(self, context, event_type, publisher_id, payload): - notifier = rpc.get_notifier(publisher_id) + notifier = messaging.get_notifier(publisher_id) notify = getattr(notifier, self.priority) notify(context, event_type=event_type, payload=payload) diff --git a/senlin/rpc/notifier.py b/senlin/rpc/notifier.py deleted file mode 100644 index c63a8310b..000000000 --- a/senlin/rpc/notifier.py +++ /dev/null @@ -1,78 +0,0 @@ -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -"""Senlin RPC Notifier.""" - -import oslo_messaging as messaging -from oslo_serialization import jsonutils - -from senlin.common import context as senlin_context -from senlin.common import exception as senlin_exc - -TRANSPORT = None -NOTIFIER = None - - -class JsonPayloadSerializer(messaging.NoOpSerializer): - - @staticmethod - def serialize_entity(context, entity): - return jsonutils.to_primitive(entity, convert_instances=True) - - -class RequestContextSerializer(messaging.Serializer): - - def __init__(self, base): - self._base = base - - def serialize_entity(self, context, entity): - if not self._base: - return entity - return self._base.serialize_entity(context, entity) - - def deserialize_entity(self, context, entity): - if not self._base: - return entity - return self._base.deserialize_entity(context, entity) - - def serialize_context(self, context): - return context.to_dict() - - def deserialize_context(self, context): - return senlin_context.RequestContext.from_dict(context) - - -def init(conf): - global TRANSPORT, NOTIFIER - - exmods = [senlin_exc.__name__] - TRANSPORT = messaging.get_notification_transport( - conf, allowed_remote_exmods=exmods) - serializer = RequestContextSerializer(JsonPayloadSerializer()) - NOTIFIER = messaging.Notifier(TRANSPORT, serializer=serializer, - topics=['versioned_notifications']) - - -def cleanup(): - global TRANSPORT, NOTIFIER - assert TRANSPORT is not None - assert NOTIFIER is not None - - TRANSPORT.cleanup() - TRANSPORT = None - NOTIFIER = None - - -def get_notifier(publisher_id): - assert NOTIFIER is not None - - return NOTIFIER.prepare(publisher_id=publisher_id) diff --git a/senlin/tests/unit/objects/test_notification.py b/senlin/tests/unit/objects/test_notification.py index dea584937..cd5ea10a4 100644 --- a/senlin/tests/unit/objects/test_notification.py +++ b/senlin/tests/unit/objects/test_notification.py @@ -139,7 +139,7 @@ class TestNotificationBase(test_base.SenlinTestCase): actual_payload = mock_notify.call_args[1]['payload'] self.assertJsonEqual(expected_payload, actual_payload) - @mock.patch('senlin.rpc.notifier.NOTIFIER') + @mock.patch('senlin.common.messaging.NOTIFIER') def test_emit_notification(self, mock_notifier): mock_context = mock.Mock() @@ -152,7 +152,7 @@ class TestNotificationBase(test_base.SenlinTestCase): expected_event_type='test_object.update.start', expected_payload=self.expected_payload) - @mock.patch('senlin.rpc.notifier.NOTIFIER') + @mock.patch('senlin.common.messaging.NOTIFIER') def test_emit_with_host_and_binary_as_publisher(self, mock_notifier): event_type = notification.EventType( object='test_object', @@ -175,7 +175,7 @@ class TestNotificationBase(test_base.SenlinTestCase): expected_event_type='test_object.update', expected_payload=self.expected_payload) - @mock.patch('senlin.rpc.notifier.NOTIFIER') + @mock.patch('senlin.common.messaging.NOTIFIER') def test_emit_event_type_without_phase(self, mock_notifier): noti = TestNotification( event_type=notification.EventType( @@ -196,7 +196,7 @@ class TestNotificationBase(test_base.SenlinTestCase): expected_event_type='test_object.update', expected_payload=self.expected_payload) - @mock.patch('senlin.rpc.notifier.NOTIFIER') + @mock.patch('senlin.common.messaging.NOTIFIER') def test_not_possible_to_emit_if_not_populated(self, mock_notifier): # create a non-populated payload payload = TestPayload(extra_field='test string') @@ -216,7 +216,7 @@ class TestNotificationBase(test_base.SenlinTestCase): self.assertRaises(AssertionError, noti.emit, mock_context) self.assertFalse(mock_notifier.called) - @mock.patch('senlin.rpc.notifier.NOTIFIER') + @mock.patch('senlin.common.messaging.NOTIFIER') def test_empty_schema(self, mock_notifier): # create a non-populated payload payload = TestPayloadEmptySchema(extra_field='test string')