Create MistralContext from rpc context in kombu engine

Partially implements blueprint mistral-task-delivery-model

Change-Id: I6a4f6e362e6c8e94db8bebebbc35c2abcec88c89
This commit is contained in:
Dawid Deja 2016-07-15 16:09:11 +02:00
parent 80e925b523
commit 138b885170
2 changed files with 14 additions and 8 deletions

View File

@ -115,6 +115,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
context = auth_context.MistralContext(**ctx)
auth_context.set_ctx(context)
return context
def publish_message(self, body, reply_to, corr_id, type='response'):
with kombu.producers[self.conn].acquire(block=True) as producer:
producer.publish(
@ -144,13 +146,13 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
request)
is_async = request.get('async', False)
rpc_context = request.get('rpc_ctx')
rpc_ctx = request.get('rpc_ctx')
rpc_method_name = request.get('rpc_method')
arguments = request.get('arguments')
correlation_id = message.properties['correlation_id']
reply_to = message.properties['reply_to']
self._set_auth_ctx(rpc_context)
rpc_context = self._set_auth_ctx(rpc_ctx)
rpc_method = self._get_rpc_method(rpc_method_name)

View File

@ -194,11 +194,13 @@ class KombuServerTestCase(base.KombuTestCase):
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
@mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method')
def test__on_message_is_async(self, get_rpc_method, publish_message):
@mock.patch('mistral.context.MistralContext')
def test__on_message_is_async(self, mistral_context, get_rpc_method,
publish_message):
result = 'result'
request = {
'async': True,
'rpc_ctx': self.ctx,
'rpc_ctx': {},
'rpc_method': 'found_method',
'arguments': {
'a': 1,
@ -217,7 +219,7 @@ class KombuServerTestCase(base.KombuTestCase):
self.server._on_message(request, message)
rpc_method.assert_called_once_with(
rpc_ctx=self.ctx,
rpc_ctx=mistral_context(),
a=1,
b=2
)
@ -225,11 +227,13 @@ class KombuServerTestCase(base.KombuTestCase):
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
@mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method')
def test__on_message_is_sync(self, get_rpc_method, publish_message):
@mock.patch('mistral.context.MistralContext')
def test__on_message_is_sync(self, mistral_context, get_rpc_method,
publish_message):
result = 'result'
request = {
'async': False,
'rpc_ctx': self.ctx,
'rpc_ctx': {},
'rpc_method': 'found_method',
'arguments': {
'a': 1,
@ -250,7 +254,7 @@ class KombuServerTestCase(base.KombuTestCase):
self.server._on_message(request, message)
rpc_method.assert_called_once_with(
rpc_ctx=self.ctx,
rpc_ctx=mistral_context(),
a=1,
b=2
)