Fix for not working 'run-action' on kombu driver

Closes-bug:1612154

Change-Id: Ia2f2375ce1bb193ba0b7b78606ba2b33c1a9dd21
This commit is contained in:
Dawid Deja 2016-08-11 15:29:55 +02:00
parent 8bdef0f84e
commit 10425c9fa5
7 changed files with 55 additions and 13 deletions

View File

@ -13,6 +13,9 @@
# limitations under the License.
import kombu
from kombu import serialization
from mistral.utils import serializers
class Base(object):
@ -97,3 +100,16 @@ class Base(object):
auto_delete=auto_delete,
**kwargs
)
@staticmethod
def _register_mistral_serialization():
"""Adds mistral serializer to available serializers in kombu.
:return: None
"""
serialization.register(
'mistral_serialization',
encoder=serializers.KombuSerializer.serialize,
decoder=serializers.KombuSerializer.deserialize,
content_type='application/json'
)

View File

@ -33,6 +33,8 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
def __init__(self, conf):
super(KombuRPCClient, self).__init__(conf)
self._register_mistral_serialization()
self.exchange = conf.get('exchange', '')
self.user_id = conf.get('user_id', 'guest')
self.password = conf.get('password', 'guest')
@ -158,6 +160,7 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
routing_key=self.topic,
reply_to=self.callback_queue.name,
correlation_id=utils.get_thread_local(CORR_ID),
serializer='mistral_serialization',
delivery_mode=2
)

View File

@ -18,7 +18,7 @@ import threading
import kombu
from oslo_log import log as logging
from mistral import context as auth_context
from mistral import context as auth_ctx
from mistral.engine.rpc_backend import base as rpc_base
from mistral.engine.rpc_backend.kombu import base as kombu_base
from mistral import exceptions as exc
@ -31,6 +31,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
def __init__(self, conf):
super(KombuRPCServer, self).__init__(conf)
self._register_mistral_serialization()
self.exchange = conf.get('exchange', '')
self.user_id = conf.get('user_id', 'guest')
self.password = conf.get('password', 'guest')
@ -112,20 +114,21 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
if not isinstance(ctx, dict):
return
context = auth_context.MistralContext(**ctx)
auth_context.set_ctx(context)
context = auth_ctx.MistralContext(**ctx)
auth_ctx.set_ctx(context)
return context
def publish_message(self, body, reply_to, corr_id, type='response'):
def publish_message(self, body, reply_to, corr_id, res_type='response'):
with kombu.producers[self.conn].acquire(block=True) as producer:
producer.publish(
body=body,
exchange=self.exchange,
routing_key=reply_to,
correlation_id=corr_id,
type=type,
serializer='pickle' if type == 'error' else None
serializer='pickle' if res_type == 'error'
else 'mistral_serialization',
type=res_type
)
def _on_message_safe(self, request, message):
@ -136,7 +139,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
e,
message.properties['reply_to'],
message.properties['correlation_id'],
type='error'
res_type='error'
)
finally:
message.ack()

View File

@ -25,6 +25,8 @@ connection = mock.MagicMock()
connections = mock.MagicMock()
connections.__getitem__ = lambda *args, **kwargs: connection
serialization = mock.MagicMock()
def BrokerConnection(*args, **kwargs):
return mock.MagicMock()

View File

@ -69,7 +69,7 @@ class KombuServerTestCase(base.KombuTestCase):
routing_key=reply_to,
correlation_id=corr_id,
type=type,
serializer=None
serializer='mistral_serialization'
)
def test_run_launch_successfully(self):
@ -164,7 +164,7 @@ class KombuServerTestCase(base.KombuTestCase):
test_exception,
reply_to,
correlation_id,
type='error'
res_type='error'
)
@mock.patch.object(

View File

@ -14,12 +14,28 @@
import abc
from oslo_serialization import jsonutils
class Serializer(object):
@staticmethod
@abc.abstractmethod
def serialize(self, entity):
def serialize(entity):
pass
@staticmethod
@abc.abstractmethod
def deserialize(self, entity):
def deserialize(entity):
pass
class KombuSerializer(Serializer):
@staticmethod
def deserialize(entity):
return jsonutils.loads(entity)
@staticmethod
def serialize(entity):
return jsonutils.dumps(
jsonutils.to_primitive(entity, convert_instances=True)
)

View File

@ -53,14 +53,16 @@ class Result(object):
class ResultSerializer(serializers.Serializer):
def serialize(self, entity):
@staticmethod
def serialize(entity):
return {
'data': entity.data,
'error': entity.error,
'cancel': entity.cancel
}
def deserialize(self, entity):
@staticmethod
def deserialize(entity):
return Result(
entity['data'],
entity['error'],