Merge "Fix for not working 'run-action' on kombu driver"
This commit is contained in:
commit
3c11fc4ed5
@ -13,6 +13,9 @@
|
||||
# limitations under the License.
|
||||
|
||||
import kombu
|
||||
from kombu import serialization
|
||||
|
||||
from mistral.utils import serializers
|
||||
|
||||
|
||||
class Base(object):
|
||||
@ -97,3 +100,16 @@ class Base(object):
|
||||
auto_delete=auto_delete,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _register_mistral_serialization():
|
||||
"""Adds mistral serializer to available serializers in kombu.
|
||||
|
||||
:return: None
|
||||
"""
|
||||
serialization.register(
|
||||
'mistral_serialization',
|
||||
encoder=serializers.KombuSerializer.serialize,
|
||||
decoder=serializers.KombuSerializer.deserialize,
|
||||
content_type='application/json'
|
||||
)
|
||||
|
@ -33,6 +33,8 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
||||
def __init__(self, conf):
|
||||
super(KombuRPCClient, self).__init__(conf)
|
||||
|
||||
self._register_mistral_serialization()
|
||||
|
||||
self.exchange = conf.get('exchange', '')
|
||||
self.user_id = conf.get('user_id', 'guest')
|
||||
self.password = conf.get('password', 'guest')
|
||||
@ -158,6 +160,7 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
||||
routing_key=self.topic,
|
||||
reply_to=self.callback_queue.name,
|
||||
correlation_id=utils.get_thread_local(CORR_ID),
|
||||
serializer='mistral_serialization',
|
||||
delivery_mode=2
|
||||
)
|
||||
|
||||
|
@ -18,7 +18,7 @@ import threading
|
||||
import kombu
|
||||
from oslo_log import log as logging
|
||||
|
||||
from mistral import context as auth_context
|
||||
from mistral import context as auth_ctx
|
||||
from mistral.engine.rpc_backend import base as rpc_base
|
||||
from mistral.engine.rpc_backend.kombu import base as kombu_base
|
||||
from mistral import exceptions as exc
|
||||
@ -31,6 +31,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
||||
def __init__(self, conf):
|
||||
super(KombuRPCServer, self).__init__(conf)
|
||||
|
||||
self._register_mistral_serialization()
|
||||
|
||||
self.exchange = conf.get('exchange', '')
|
||||
self.user_id = conf.get('user_id', 'guest')
|
||||
self.password = conf.get('password', 'guest')
|
||||
@ -112,20 +114,21 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
||||
if not isinstance(ctx, dict):
|
||||
return
|
||||
|
||||
context = auth_context.MistralContext(**ctx)
|
||||
auth_context.set_ctx(context)
|
||||
context = auth_ctx.MistralContext(**ctx)
|
||||
auth_ctx.set_ctx(context)
|
||||
|
||||
return context
|
||||
|
||||
def publish_message(self, body, reply_to, corr_id, type='response'):
|
||||
def publish_message(self, body, reply_to, corr_id, res_type='response'):
|
||||
with kombu.producers[self.conn].acquire(block=True) as producer:
|
||||
producer.publish(
|
||||
body=body,
|
||||
exchange=self.exchange,
|
||||
routing_key=reply_to,
|
||||
correlation_id=corr_id,
|
||||
type=type,
|
||||
serializer='pickle' if type == 'error' else None
|
||||
serializer='pickle' if res_type == 'error'
|
||||
else 'mistral_serialization',
|
||||
type=res_type
|
||||
)
|
||||
|
||||
def _on_message_safe(self, request, message):
|
||||
@ -136,7 +139,7 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
||||
e,
|
||||
message.properties['reply_to'],
|
||||
message.properties['correlation_id'],
|
||||
type='error'
|
||||
res_type='error'
|
||||
)
|
||||
finally:
|
||||
message.ack()
|
||||
|
@ -25,6 +25,8 @@ connection = mock.MagicMock()
|
||||
connections = mock.MagicMock()
|
||||
connections.__getitem__ = lambda *args, **kwargs: connection
|
||||
|
||||
serialization = mock.MagicMock()
|
||||
|
||||
|
||||
def BrokerConnection(*args, **kwargs):
|
||||
return mock.MagicMock()
|
||||
|
@ -69,7 +69,7 @@ class KombuServerTestCase(base.KombuTestCase):
|
||||
routing_key=reply_to,
|
||||
correlation_id=corr_id,
|
||||
type=type,
|
||||
serializer=None
|
||||
serializer='mistral_serialization'
|
||||
)
|
||||
|
||||
def test_run_launch_successfully(self):
|
||||
@ -164,7 +164,7 @@ class KombuServerTestCase(base.KombuTestCase):
|
||||
test_exception,
|
||||
reply_to,
|
||||
correlation_id,
|
||||
type='error'
|
||||
res_type='error'
|
||||
)
|
||||
|
||||
@mock.patch.object(
|
||||
|
@ -14,12 +14,28 @@
|
||||
|
||||
import abc
|
||||
|
||||
from oslo_serialization import jsonutils
|
||||
|
||||
|
||||
class Serializer(object):
|
||||
@staticmethod
|
||||
@abc.abstractmethod
|
||||
def serialize(self, entity):
|
||||
def serialize(entity):
|
||||
pass
|
||||
|
||||
@staticmethod
|
||||
@abc.abstractmethod
|
||||
def deserialize(self, entity):
|
||||
def deserialize(entity):
|
||||
pass
|
||||
|
||||
|
||||
class KombuSerializer(Serializer):
|
||||
@staticmethod
|
||||
def deserialize(entity):
|
||||
return jsonutils.loads(entity)
|
||||
|
||||
@staticmethod
|
||||
def serialize(entity):
|
||||
return jsonutils.dumps(
|
||||
jsonutils.to_primitive(entity, convert_instances=True)
|
||||
)
|
||||
|
@ -53,14 +53,16 @@ class Result(object):
|
||||
|
||||
|
||||
class ResultSerializer(serializers.Serializer):
|
||||
def serialize(self, entity):
|
||||
@staticmethod
|
||||
def serialize(entity):
|
||||
return {
|
||||
'data': entity.data,
|
||||
'error': entity.error,
|
||||
'cancel': entity.cancel
|
||||
}
|
||||
|
||||
def deserialize(self, entity):
|
||||
@staticmethod
|
||||
def deserialize(entity):
|
||||
return Result(
|
||||
entity['data'],
|
||||
entity['error'],
|
||||
|
Loading…
Reference in New Issue
Block a user