Add tests for Kombu driver
Implements blueprint mistral-alternative-rpc Change-Id: I80c426e3fd3611ada0b969a1f097410f63554cee
This commit is contained in:
parent
320dd398ad
commit
4919a0a483
@ -107,6 +107,8 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
|||||||
message.properties['correlation_id']):
|
message.properties['correlation_id']):
|
||||||
utils.set_thread_local(IS_RECEIVED, True)
|
utils.set_thread_local(IS_RECEIVED, True)
|
||||||
|
|
||||||
|
# TODO(ddeja): Decide if raising exception to kombu is best
|
||||||
|
# behaviour.
|
||||||
if message.properties.get('type') == 'error':
|
if message.properties.get('type') == 'error':
|
||||||
raise response
|
raise response
|
||||||
utils.set_thread_local(RESULT, response)
|
utils.set_thread_local(RESULT, response)
|
||||||
|
0
mistral/tests/unit/engine/rpc/kombu/__init__.py
Normal file
0
mistral/tests/unit/engine/rpc/kombu/__init__.py
Normal file
20
mistral/tests/unit/engine/rpc/kombu/base.py
Normal file
20
mistral/tests/unit/engine/rpc/kombu/base.py
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
# Copyright (c) 2016 Intel Corporation
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from mistral.tests.unit import base
|
||||||
|
|
||||||
|
|
||||||
|
class KombuTestCase(base.BaseTest):
|
||||||
|
pass
|
42
mistral/tests/unit/engine/rpc/kombu/fake_kombu.py
Normal file
42
mistral/tests/unit/engine/rpc/kombu/fake_kombu.py
Normal file
@ -0,0 +1,42 @@
|
|||||||
|
# Copyright (c) 2016 Intel Corporation
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
import mock
|
||||||
|
|
||||||
|
|
||||||
|
producer = mock.MagicMock()
|
||||||
|
|
||||||
|
producers = mock.MagicMock()
|
||||||
|
producers.__getitem__ = lambda *args, **kwargs: producer
|
||||||
|
|
||||||
|
connection = mock.MagicMock()
|
||||||
|
|
||||||
|
connections = mock.MagicMock()
|
||||||
|
connections.__getitem__ = lambda *args, **kwargs: connection
|
||||||
|
|
||||||
|
|
||||||
|
def BrokerConnection(*args, **kwargs):
|
||||||
|
return mock.MagicMock()
|
||||||
|
|
||||||
|
|
||||||
|
def Exchange(*args, **kwargs):
|
||||||
|
return mock.MagicMock()
|
||||||
|
|
||||||
|
|
||||||
|
def Queue(*args, **kwargs):
|
||||||
|
return mock.MagicMock()
|
||||||
|
|
||||||
|
|
||||||
|
def Consumer(*args, **kwargs):
|
||||||
|
return mock.MagicMock()
|
173
mistral/tests/unit/engine/rpc/kombu/test_kombu_client.py
Normal file
173
mistral/tests/unit/engine/rpc/kombu/test_kombu_client.py
Normal file
@ -0,0 +1,173 @@
|
|||||||
|
# Copyright (c) 2016 Intel Corporation
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from mistral import exceptions as exc
|
||||||
|
from mistral.tests.unit.engine.rpc.kombu import base
|
||||||
|
from mistral.tests.unit.engine.rpc.kombu import fake_kombu
|
||||||
|
from mistral import utils
|
||||||
|
|
||||||
|
import mock
|
||||||
|
import socket
|
||||||
|
import sys
|
||||||
|
|
||||||
|
with mock.patch.dict('sys.modules', kombu=fake_kombu):
|
||||||
|
from mistral.engine.rpc.kombu import kombu_client
|
||||||
|
|
||||||
|
|
||||||
|
class TestException(exc.MistralException):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KombuClientTestCase(base.KombuTestCase):
|
||||||
|
|
||||||
|
_RESPONSE = "response"
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(KombuClientTestCase, self).setUp()
|
||||||
|
conf = mock.MagicMock()
|
||||||
|
|
||||||
|
self.client = kombu_client.KombuRPCClient(conf)
|
||||||
|
self.ctx = type('context', (object,), {'to_dict': lambda self: {}})()
|
||||||
|
|
||||||
|
@mock.patch.object(utils, 'set_thread_local', mock.MagicMock())
|
||||||
|
@mock.patch.object(utils, 'get_thread_local')
|
||||||
|
def test_sync_call_result_get(self, get_thread_local):
|
||||||
|
|
||||||
|
def side_effect(var_name):
|
||||||
|
if var_name == kombu_client.IS_RECEIVED:
|
||||||
|
return True
|
||||||
|
elif var_name == kombu_client.RESULT:
|
||||||
|
return self._RESPONSE
|
||||||
|
|
||||||
|
get_thread_local.side_effect = side_effect
|
||||||
|
|
||||||
|
response = self.client.sync_call(self.ctx, 'method')
|
||||||
|
self.assertEqual(response, self._RESPONSE)
|
||||||
|
# check if consumer.consume was called once
|
||||||
|
self.assertEqual(self.client.consumer.consume.call_count, 1)
|
||||||
|
|
||||||
|
@mock.patch.object(utils, 'set_thread_local', mock.MagicMock())
|
||||||
|
@mock.patch.object(utils, 'get_thread_local')
|
||||||
|
def test_sync_call_result_not_get(self, get_thread_local):
|
||||||
|
|
||||||
|
def side_effect(var_name):
|
||||||
|
if var_name == kombu_client.IS_RECEIVED:
|
||||||
|
return False
|
||||||
|
elif var_name == kombu_client.RESULT:
|
||||||
|
return self._RESPONSE
|
||||||
|
|
||||||
|
get_thread_local.side_effect = side_effect
|
||||||
|
|
||||||
|
self.client.conn.drain_events = mock.MagicMock(
|
||||||
|
side_effect=socket.timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
self.client._timeout = sys.float_info.epsilon
|
||||||
|
self.assertRaises(
|
||||||
|
exc.MistralException,
|
||||||
|
self.client.sync_call,
|
||||||
|
self.ctx,
|
||||||
|
'method_not_found'
|
||||||
|
)
|
||||||
|
# check if consumer.consume was called once
|
||||||
|
self.assertEqual(self.client.consumer.consume.call_count, 1)
|
||||||
|
|
||||||
|
@mock.patch.object(utils, 'set_thread_local', mock.MagicMock())
|
||||||
|
@mock.patch.object(utils, 'get_thread_local')
|
||||||
|
def test_async_call(self, get_thread_local):
|
||||||
|
|
||||||
|
def side_effect(var_name):
|
||||||
|
if var_name == kombu_client.IS_RECEIVED:
|
||||||
|
return True
|
||||||
|
elif var_name == kombu_client.RESULT:
|
||||||
|
return self._RESPONSE
|
||||||
|
|
||||||
|
get_thread_local.side_effect = side_effect
|
||||||
|
|
||||||
|
response = self.client.async_call(self.ctx, 'method')
|
||||||
|
self.assertEqual(response, None)
|
||||||
|
# check if consumer.consume was called once
|
||||||
|
self.assertEqual(self.client.consumer.consume.call_count, 1)
|
||||||
|
|
||||||
|
def test__on_response_message_ack_fail(self):
|
||||||
|
message = mock.MagicMock()
|
||||||
|
message.ack.side_effect = Exception('Test Exception')
|
||||||
|
response = 'response'
|
||||||
|
|
||||||
|
kombu_client.LOG = mock.MagicMock()
|
||||||
|
|
||||||
|
self.client._on_response(response, message)
|
||||||
|
self.assertEqual(kombu_client.LOG.debug.call_count, 1)
|
||||||
|
self.assertEqual(kombu_client.LOG.exception.call_count, 1)
|
||||||
|
|
||||||
|
@mock.patch.object(utils, 'get_thread_local', mock.MagicMock(
|
||||||
|
return_value=False
|
||||||
|
))
|
||||||
|
def test__on_response_message_ack_ok_corr_id_not_match(self):
|
||||||
|
message = mock.MagicMock()
|
||||||
|
message.properties = mock.MagicMock()
|
||||||
|
message.properties.__getitem__ = lambda *args, **kwargs: True
|
||||||
|
response = 'response'
|
||||||
|
|
||||||
|
kombu_client.LOG = mock.MagicMock()
|
||||||
|
|
||||||
|
self.client._on_response(response, message)
|
||||||
|
self.assertEqual(kombu_client.LOG.debug.call_count, 2)
|
||||||
|
self.assertEqual(kombu_client.LOG.exception.call_count, 0)
|
||||||
|
|
||||||
|
@mock.patch.object(utils, 'set_thread_local')
|
||||||
|
@mock.patch.object(utils, 'get_thread_local', mock.MagicMock(
|
||||||
|
return_value=True
|
||||||
|
))
|
||||||
|
def test__on_response_message_ack_ok_messsage_type_error(
|
||||||
|
self,
|
||||||
|
set_thread_local
|
||||||
|
):
|
||||||
|
message = mock.MagicMock()
|
||||||
|
message.properties = mock.MagicMock()
|
||||||
|
message.properties.__getitem__ = lambda *args, **kwargs: True
|
||||||
|
message.properties.get.return_value = 'error'
|
||||||
|
response = TestException('response')
|
||||||
|
|
||||||
|
kombu_client.LOG = mock.MagicMock()
|
||||||
|
|
||||||
|
self.assertRaises(
|
||||||
|
TestException,
|
||||||
|
self.client._on_response,
|
||||||
|
response,
|
||||||
|
message
|
||||||
|
)
|
||||||
|
self.assertEqual(kombu_client.LOG.debug.call_count, 2)
|
||||||
|
self.assertEqual(kombu_client.LOG.exception.call_count, 0)
|
||||||
|
self.assertEqual(set_thread_local.call_count, 1)
|
||||||
|
|
||||||
|
@mock.patch.object(utils, 'set_thread_local')
|
||||||
|
@mock.patch.object(utils, 'get_thread_local', mock.MagicMock(
|
||||||
|
return_value=True
|
||||||
|
))
|
||||||
|
def test__on_response_message_ack_ok(self, set_thread_local):
|
||||||
|
|
||||||
|
message = mock.MagicMock()
|
||||||
|
message.properties = mock.MagicMock()
|
||||||
|
message.properties.__getitem__ = lambda *args, **kwargs: True
|
||||||
|
response = 'response'
|
||||||
|
|
||||||
|
kombu_client.LOG = mock.MagicMock()
|
||||||
|
|
||||||
|
self.client._on_response(response, message)
|
||||||
|
|
||||||
|
self.assertEqual(kombu_client.LOG.debug.call_count, 2)
|
||||||
|
self.assertEqual(kombu_client.LOG.exception.call_count, 0)
|
||||||
|
self.assertEqual(set_thread_local.call_count, 2)
|
261
mistral/tests/unit/engine/rpc/kombu/test_kombu_server.py
Normal file
261
mistral/tests/unit/engine/rpc/kombu/test_kombu_server.py
Normal file
@ -0,0 +1,261 @@
|
|||||||
|
# Copyright (c) 2016 Intel Corporation
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from mistral import exceptions as exc
|
||||||
|
from mistral.tests.unit.engine.rpc.kombu import base
|
||||||
|
from mistral.tests.unit.engine.rpc.kombu import fake_kombu
|
||||||
|
|
||||||
|
import mock
|
||||||
|
import socket
|
||||||
|
|
||||||
|
with mock.patch.dict('sys.modules', kombu=fake_kombu):
|
||||||
|
from mistral.engine.rpc.kombu import kombu_server
|
||||||
|
|
||||||
|
|
||||||
|
class TestException(exc.MistralError):
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
class KombuServerTestCase(base.KombuTestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
super(KombuServerTestCase, self).setUp()
|
||||||
|
|
||||||
|
self.conf = {}
|
||||||
|
self.conf['exchange'] = 'test_exchange'
|
||||||
|
self.server = kombu_server.KombuRPCServer(self.conf)
|
||||||
|
self.ctx = type('context', (object,), {'to_dict': lambda self: {}})()
|
||||||
|
|
||||||
|
def test_is_running_is_running(self):
|
||||||
|
self.server._running.set()
|
||||||
|
self.assertTrue(self.server.is_running)
|
||||||
|
|
||||||
|
def test_is_running_is_not_running(self):
|
||||||
|
self.server._running.clear()
|
||||||
|
self.assertFalse(self.server.is_running)
|
||||||
|
|
||||||
|
def test_stop(self):
|
||||||
|
self.server.stop()
|
||||||
|
self.assertFalse(self.server.is_running)
|
||||||
|
|
||||||
|
def test_publish_message(self):
|
||||||
|
body = 'body'
|
||||||
|
reply_to = 'reply_to'
|
||||||
|
corr_id = 'corr_id'
|
||||||
|
type = 'type'
|
||||||
|
|
||||||
|
acquire_mock = mock.MagicMock()
|
||||||
|
fake_kombu.producer.acquire.return_value = acquire_mock
|
||||||
|
|
||||||
|
enter_mock = mock.MagicMock()
|
||||||
|
acquire_mock.__enter__.return_value = enter_mock
|
||||||
|
|
||||||
|
self.server.publish_message(body, reply_to, corr_id, type)
|
||||||
|
enter_mock.publish.assert_called_once_with(
|
||||||
|
body=body,
|
||||||
|
exchange=self.conf['exchange'],
|
||||||
|
routing_key=reply_to,
|
||||||
|
correlation_id=corr_id,
|
||||||
|
type=type,
|
||||||
|
serializer=None
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_run_launch_successfully(self):
|
||||||
|
acquire_mock = mock.MagicMock()
|
||||||
|
acquire_mock.drain_events.side_effect = TestException()
|
||||||
|
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||||
|
|
||||||
|
self.assertRaises(TestException, self.server.run)
|
||||||
|
self.assertTrue(self.server.is_running)
|
||||||
|
|
||||||
|
def test_run_launch_successfully_than_stop(self):
|
||||||
|
|
||||||
|
def side_effect(*args, **kwargs):
|
||||||
|
self.assertTrue(self.server.is_running)
|
||||||
|
self.server.stop()
|
||||||
|
|
||||||
|
acquire_mock = mock.MagicMock()
|
||||||
|
acquire_mock.drain_events.side_effect = side_effect
|
||||||
|
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||||
|
|
||||||
|
self.server.run()
|
||||||
|
self.assertFalse(self.server.is_running)
|
||||||
|
|
||||||
|
def test_run_raise_mistral_exception(self):
|
||||||
|
acquire_mock = mock.MagicMock()
|
||||||
|
acquire_mock.drain_events.side_effect = socket.error()
|
||||||
|
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||||
|
|
||||||
|
self.assertRaises(exc.MistralException, self.server.run)
|
||||||
|
|
||||||
|
def test_run_socket_timeout_still_running(self):
|
||||||
|
|
||||||
|
def side_effect(*args, **kwargs):
|
||||||
|
if acquire_mock.drain_events.call_count == 0:
|
||||||
|
raise socket.timeout()
|
||||||
|
raise TestException()
|
||||||
|
|
||||||
|
acquire_mock = mock.MagicMock()
|
||||||
|
acquire_mock.drain_events.side_effect = side_effect
|
||||||
|
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||||
|
|
||||||
|
self.assertRaises(
|
||||||
|
TestException,
|
||||||
|
self.server.run
|
||||||
|
)
|
||||||
|
self.assertTrue(self.server.is_running)
|
||||||
|
|
||||||
|
def test_run_keyboard_interrupt_not_running(self):
|
||||||
|
acquire_mock = mock.MagicMock()
|
||||||
|
acquire_mock.drain_events.side_effect = KeyboardInterrupt()
|
||||||
|
fake_kombu.connection.acquire.return_value = acquire_mock
|
||||||
|
|
||||||
|
self.assertEqual(self.server.run(), None)
|
||||||
|
self.assertFalse(self.server.is_running)
|
||||||
|
|
||||||
|
@mock.patch.object(
|
||||||
|
kombu_server.KombuRPCServer,
|
||||||
|
'_on_message',
|
||||||
|
mock.MagicMock()
|
||||||
|
)
|
||||||
|
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
|
||||||
|
def test__on_message_safe_message_processing_ok(self, publish_message):
|
||||||
|
message = mock.MagicMock()
|
||||||
|
|
||||||
|
self.server._on_message_safe(None, message)
|
||||||
|
|
||||||
|
self.assertEqual(message.ack.call_count, 1)
|
||||||
|
self.assertEqual(publish_message.call_count, 0)
|
||||||
|
|
||||||
|
@mock.patch.object(kombu_server.KombuRPCServer, '_on_message')
|
||||||
|
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
|
||||||
|
def test__on_message_safe_message_processing_raise(
|
||||||
|
self,
|
||||||
|
publish_message,
|
||||||
|
_on_message
|
||||||
|
):
|
||||||
|
reply_to = 'reply_to'
|
||||||
|
correlation_id = 'corr_id'
|
||||||
|
message = mock.MagicMock()
|
||||||
|
message.properties = {
|
||||||
|
'reply_to': reply_to,
|
||||||
|
'correlation_id': correlation_id
|
||||||
|
}
|
||||||
|
|
||||||
|
test_exception = TestException()
|
||||||
|
_on_message.side_effect = test_exception
|
||||||
|
|
||||||
|
self.server._on_message_safe(None, message)
|
||||||
|
|
||||||
|
self.assertEqual(message.ack.call_count, 1)
|
||||||
|
publish_message.assert_called_once_with(
|
||||||
|
test_exception,
|
||||||
|
reply_to,
|
||||||
|
correlation_id,
|
||||||
|
type='error'
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock.patch.object(
|
||||||
|
kombu_server.KombuRPCServer,
|
||||||
|
'_get_rpc_method',
|
||||||
|
mock.MagicMock(return_value=None)
|
||||||
|
)
|
||||||
|
def test__on_message_rpc_method_not_found(self):
|
||||||
|
request = {
|
||||||
|
'rpc_ctx': self.ctx,
|
||||||
|
'rpc_method': 'not_found_method',
|
||||||
|
'arguments': None
|
||||||
|
}
|
||||||
|
|
||||||
|
message = mock.MagicMock()
|
||||||
|
message.properties = {
|
||||||
|
'reply_to': None,
|
||||||
|
'correlation_id': None
|
||||||
|
}
|
||||||
|
|
||||||
|
self.assertRaises(
|
||||||
|
exc.MistralException,
|
||||||
|
self.server._on_message,
|
||||||
|
request,
|
||||||
|
message
|
||||||
|
)
|
||||||
|
|
||||||
|
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
|
||||||
|
@mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method')
|
||||||
|
def test__on_message_is_async(self, get_rpc_method, publish_message):
|
||||||
|
result = 'result'
|
||||||
|
request = {
|
||||||
|
'async': True,
|
||||||
|
'rpc_ctx': self.ctx,
|
||||||
|
'rpc_method': 'found_method',
|
||||||
|
'arguments': {
|
||||||
|
'a': 1,
|
||||||
|
'b': 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
message = mock.MagicMock()
|
||||||
|
message.properties = {
|
||||||
|
'reply_to': None,
|
||||||
|
'correlation_id': None
|
||||||
|
}
|
||||||
|
|
||||||
|
rpc_method = mock.MagicMock(return_value=result)
|
||||||
|
get_rpc_method.return_value = rpc_method
|
||||||
|
|
||||||
|
self.server._on_message(request, message)
|
||||||
|
rpc_method.assert_called_once_with(
|
||||||
|
rpc_ctx=self.ctx,
|
||||||
|
a=1,
|
||||||
|
b=2
|
||||||
|
)
|
||||||
|
self.assertEqual(publish_message.call_count, 0)
|
||||||
|
|
||||||
|
@mock.patch.object(kombu_server.KombuRPCServer, 'publish_message')
|
||||||
|
@mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method')
|
||||||
|
def test__on_message_is_sync(self, get_rpc_method, publish_message):
|
||||||
|
result = 'result'
|
||||||
|
request = {
|
||||||
|
'async': False,
|
||||||
|
'rpc_ctx': self.ctx,
|
||||||
|
'rpc_method': 'found_method',
|
||||||
|
'arguments': {
|
||||||
|
'a': 1,
|
||||||
|
'b': 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
reply_to = 'reply_to'
|
||||||
|
correlation_id = 'corr_id'
|
||||||
|
message = mock.MagicMock()
|
||||||
|
message.properties = {
|
||||||
|
'reply_to': reply_to,
|
||||||
|
'correlation_id': correlation_id
|
||||||
|
}
|
||||||
|
|
||||||
|
rpc_method = mock.MagicMock(return_value=result)
|
||||||
|
get_rpc_method.return_value = rpc_method
|
||||||
|
|
||||||
|
self.server._on_message(request, message)
|
||||||
|
rpc_method.assert_called_once_with(
|
||||||
|
rpc_ctx=self.ctx,
|
||||||
|
a=1,
|
||||||
|
b=2
|
||||||
|
)
|
||||||
|
publish_message.assert_called_once_with(
|
||||||
|
result,
|
||||||
|
reply_to,
|
||||||
|
correlation_id
|
||||||
|
)
|
Loading…
Reference in New Issue
Block a user