Merge "Create MistralContext from rpc context in kombu engine"
This commit is contained in:
commit
dc3ba20179
@ -115,6 +115,8 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
||||
context = auth_context.MistralContext(**ctx)
|
||||
auth_context.set_ctx(context)
|
||||
|
||||
return context
|
||||
|
||||
def publish_message(self, body, reply_to, corr_id, type='response'):
|
||||
with kombu.producers[self.conn].acquire(block=True) as producer:
|
||||
producer.publish(
|
||||
@ -144,13 +146,13 @@ class KombuRPCServer(rpc_base.RPCServer, kombu_base.Base):
|
||||
request)
|
||||
|
||||
is_async = request.get('async', False)
|
||||
rpc_context = request.get('rpc_ctx')
|
||||
rpc_ctx = request.get('rpc_ctx')
|
||||
rpc_method_name = request.get('rpc_method')
|
||||
arguments = request.get('arguments')
|
||||
correlation_id = message.properties['correlation_id']
|
||||
reply_to = message.properties['reply_to']
|
||||
|
||||
self._set_auth_ctx(rpc_context)
|
||||
rpc_context = self._set_auth_ctx(rpc_ctx)
|
||||
|
||||
rpc_method = self._get_rpc_method(rpc_method_name)
|
||||
|
||||
|
@ -194,11 +194,13 @@ class KombuServerTestCase(base.KombuTestCase):
|
||||
|
||||
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
|
||||
@mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method')
|
||||
def test__on_message_is_async(self, get_rpc_method, publish_message):
|
||||
@mock.patch('mistral.context.MistralContext')
|
||||
def test__on_message_is_async(self, mistral_context, get_rpc_method,
|
||||
publish_message):
|
||||
result = 'result'
|
||||
request = {
|
||||
'async': True,
|
||||
'rpc_ctx': self.ctx,
|
||||
'rpc_ctx': {},
|
||||
'rpc_method': 'found_method',
|
||||
'arguments': {
|
||||
'a': 1,
|
||||
@ -217,7 +219,7 @@ class KombuServerTestCase(base.KombuTestCase):
|
||||
|
||||
self.server._on_message(request, message)
|
||||
rpc_method.assert_called_once_with(
|
||||
rpc_ctx=self.ctx,
|
||||
rpc_ctx=mistral_context(),
|
||||
a=1,
|
||||
b=2
|
||||
)
|
||||
@ -225,11 +227,13 @@ class KombuServerTestCase(base.KombuTestCase):
|
||||
|
||||
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
|
||||
@mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method')
|
||||
def test__on_message_is_sync(self, get_rpc_method, publish_message):
|
||||
@mock.patch('mistral.context.MistralContext')
|
||||
def test__on_message_is_sync(self, mistral_context, get_rpc_method,
|
||||
publish_message):
|
||||
result = 'result'
|
||||
request = {
|
||||
'async': False,
|
||||
'rpc_ctx': self.ctx,
|
||||
'rpc_ctx': {},
|
||||
'rpc_method': 'found_method',
|
||||
'arguments': {
|
||||
'a': 1,
|
||||
@ -250,7 +254,7 @@ class KombuServerTestCase(base.KombuTestCase):
|
||||
|
||||
self.server._on_message(request, message)
|
||||
rpc_method.assert_called_once_with(
|
||||
rpc_ctx=self.ctx,
|
||||
rpc_ctx=mistral_context(),
|
||||
a=1,
|
||||
b=2
|
||||
)
|
||||
|
Loading…
Reference in New Issue
Block a user