From 10425c9fa5e81d30f0d2ebb8f7ded4b9346060a6 Mon Sep 17 00:00:00 2001 From: Dawid Deja Date: Thu, 11 Aug 2016 15:29:55 +0200 Subject: [PATCH] Fix for not working 'run-action' on kombu driver Closes-bug:1612154 Change-Id: Ia2f2375ce1bb193ba0b7b78606ba2b33c1a9dd21 --- mistral/engine/rpc_backend/kombu/base.py | 16 +++++++++++++++ .../engine/rpc_backend/kombu/kombu_client.py | 3 +++ .../engine/rpc_backend/kombu/kombu_server.py | 17 +++++++++------- .../engine/rpc_backend/kombu/fake_kombu.py | 2 ++ .../rpc_backend/kombu/test_kombu_server.py | 4 ++-- mistral/utils/serializers.py | 20 +++++++++++++++++-- mistral/workflow/utils.py | 6 ++++-- 7 files changed, 55 insertions(+), 13 deletions(-) diff --git a/mistral/engine/rpc_backend/kombu/base.py b/mistral/engine/rpc_backend/kombu/base.py index 83327fb0..e3b917ec 100644 --- a/mistral/engine/rpc_backend/kombu/base.py +++ b/mistral/engine/rpc_backend/kombu/base.py @@ -13,6 +13,9 @@ # limitations under the License. import kombu +from kombu import serialization + +from mistral.utils import serializers class Base(object): @@ -97,3 +100,16 @@ class Base(object): auto_delete=auto_delete, **kwargs ) + + @staticmethod + def _register_mistral_serialization(): + """Adds mistral serializer to available serializers in kombu. + + :return: None + """ + serialization.register( + 'mistral_serialization', + encoder=serializers.KombuSerializer.serialize, + decoder=serializers.KombuSerializer.deserialize, + content_type='application/json' + ) diff --git a/mistral/engine/rpc_backend/kombu/kombu_client.py b/mistral/engine/rpc_backend/kombu/kombu_client.py index 1badce72..20b10db9 100644 --- a/mistral/engine/rpc_backend/kombu/kombu_client.py +++ b/mistral/engine/rpc_backend/kombu/kombu_client.py @@ -33,6 +33,8 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base): def __init__(self, conf): super(KombuRPCClient, self).__init__(conf) + self._register_mistral_serialization() + self.exchange = conf.get('exchange', '') self.user_id = conf.get('user_id', 'guest') self.password = conf.get('password', 'guest') @@ -158,6 +160,7 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base): routing_key=self.topic, reply_to=self.callback_queue.name, correlation_id=utils.get_thread_local(CORR_ID), + serializer='mistral_serialization', delivery_mode=2 ) diff --git a/mistral/engine/rpc_backend/kombu/kombu_server.py b/mistral/engine/rpc_backend/kombu/kombu_server.py index 3613b99b..481bf0bc 100644 --- a/mistral/engine/rpc_backend/kombu/kombu_server.py +++ b/mistral/engine/rpc_backend/kombu/kombu_server.py @@ -18,7 +18,7 @@ import threading import kombu from oslo_log import log as logging -from mistral import context as auth_context +from mistral import context as auth_ctx from mistral.engine.rpc_backend import base as rpc_base from mistral.engine.rpc_backend.kombu import base as kombu_base from mistral import exceptions as exc @@ -31,6 +31,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): def __init__(self, conf): super(KombuRPCServer, self).__init__(conf) + self._register_mistral_serialization() + self.exchange = conf.get('exchange', '') self.user_id = conf.get('user_id', 'guest') self.password = conf.get('password', 'guest') @@ -112,20 +114,21 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): if not isinstance(ctx, dict): return - context = auth_context.MistralContext(**ctx) - auth_context.set_ctx(context) + context = auth_ctx.MistralContext(**ctx) + auth_ctx.set_ctx(context) return context - def publish_message(self, body, reply_to, corr_id, type='response'): + def publish_message(self, body, reply_to, corr_id, res_type='response'): with kombu.producers[self.conn].acquire(block=True) as producer: producer.publish( body=body, exchange=self.exchange, routing_key=reply_to, correlation_id=corr_id, - type=type, - serializer='pickle' if type == 'error' else None + serializer='pickle' if res_type == 'error' + else 'mistral_serialization', + type=res_type ) def _on_message_safe(self, request, message): @@ -136,7 +139,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base): e, message.properties['reply_to'], message.properties['correlation_id'], - type='error' + res_type='error' ) finally: message.ack() diff --git a/mistral/tests/unit/engine/rpc_backend/kombu/fake_kombu.py b/mistral/tests/unit/engine/rpc_backend/kombu/fake_kombu.py index 72b1d14a..8511a07a 100644 --- a/mistral/tests/unit/engine/rpc_backend/kombu/fake_kombu.py +++ b/mistral/tests/unit/engine/rpc_backend/kombu/fake_kombu.py @@ -25,6 +25,8 @@ connection = mock.MagicMock() connections = mock.MagicMock() connections.__getitem__ = lambda *args, **kwargs: connection +serialization = mock.MagicMock() + def BrokerConnection(*args, **kwargs): return mock.MagicMock() diff --git a/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_server.py b/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_server.py index b95abb18..a39c55a3 100644 --- a/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_server.py +++ b/mistral/tests/unit/engine/rpc_backend/kombu/test_kombu_server.py @@ -69,7 +69,7 @@ class KombuServerTestCase(base.KombuTestCase): routing_key=reply_to, correlation_id=corr_id, type=type, - serializer=None + serializer='mistral_serialization' ) def test_run_launch_successfully(self): @@ -164,7 +164,7 @@ class KombuServerTestCase(base.KombuTestCase): test_exception, reply_to, correlation_id, - type='error' + res_type='error' ) @mock.patch.object( diff --git a/mistral/utils/serializers.py b/mistral/utils/serializers.py index c43a1d32..04294eb7 100644 --- a/mistral/utils/serializers.py +++ b/mistral/utils/serializers.py @@ -14,12 +14,28 @@ import abc +from oslo_serialization import jsonutils + class Serializer(object): + @staticmethod @abc.abstractmethod - def serialize(self, entity): + def serialize(entity): pass + @staticmethod @abc.abstractmethod - def deserialize(self, entity): + def deserialize(entity): pass + + +class KombuSerializer(Serializer): + @staticmethod + def deserialize(entity): + return jsonutils.loads(entity) + + @staticmethod + def serialize(entity): + return jsonutils.dumps( + jsonutils.to_primitive(entity, convert_instances=True) + ) diff --git a/mistral/workflow/utils.py b/mistral/workflow/utils.py index fa91dab9..06fe8720 100644 --- a/mistral/workflow/utils.py +++ b/mistral/workflow/utils.py @@ -53,14 +53,16 @@ class Result(object): class ResultSerializer(serializers.Serializer): - def serialize(self, entity): + @staticmethod + def serialize(entity): return { 'data': entity.data, 'error': entity.error, 'cancel': entity.cancel } - def deserialize(self, entity): + @staticmethod + def deserialize(entity): return Result( entity['data'], entity['error'],