diff --git a/mistral/engine/rpc/kombu/kombu_client.py b/mistral/engine/rpc/kombu/kombu_client.py index dcaa63a0..4a3536cf 100644 --- a/mistral/engine/rpc/kombu/kombu_client.py +++ b/mistral/engine/rpc/kombu/kombu_client.py @@ -107,6 +107,8 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base): message.properties['correlation_id']): utils.set_thread_local(IS_RECEIVED, True) + # TODO(ddeja): Decide if raising exception to kombu is best + # behaviour. if message.properties.get('type') == 'error': raise response utils.set_thread_local(RESULT, response) diff --git a/mistral/tests/unit/engine/rpc/kombu/__init__.py b/mistral/tests/unit/engine/rpc/kombu/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/mistral/tests/unit/engine/rpc/kombu/base.py b/mistral/tests/unit/engine/rpc/kombu/base.py new file mode 100644 index 00000000..a76ae926 --- /dev/null +++ b/mistral/tests/unit/engine/rpc/kombu/base.py @@ -0,0 +1,20 @@ +# Copyright (c) 2016 Intel Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from mistral.tests.unit import base + + +class KombuTestCase(base.BaseTest): + pass diff --git a/mistral/tests/unit/engine/rpc/kombu/fake_kombu.py b/mistral/tests/unit/engine/rpc/kombu/fake_kombu.py new file mode 100644 index 00000000..72b1d14a --- /dev/null +++ b/mistral/tests/unit/engine/rpc/kombu/fake_kombu.py @@ -0,0 +1,42 @@ +# Copyright (c) 2016 Intel Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +import mock + + +producer = mock.MagicMock() + +producers = mock.MagicMock() +producers.__getitem__ = lambda *args, **kwargs: producer + +connection = mock.MagicMock() + +connections = mock.MagicMock() +connections.__getitem__ = lambda *args, **kwargs: connection + + +def BrokerConnection(*args, **kwargs): + return mock.MagicMock() + + +def Exchange(*args, **kwargs): + return mock.MagicMock() + + +def Queue(*args, **kwargs): + return mock.MagicMock() + + +def Consumer(*args, **kwargs): + return mock.MagicMock() diff --git a/mistral/tests/unit/engine/rpc/kombu/test_kombu_client.py b/mistral/tests/unit/engine/rpc/kombu/test_kombu_client.py new file mode 100644 index 00000000..12bfd1e3 --- /dev/null +++ b/mistral/tests/unit/engine/rpc/kombu/test_kombu_client.py @@ -0,0 +1,173 @@ +# Copyright (c) 2016 Intel Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from mistral import exceptions as exc +from mistral.tests.unit.engine.rpc.kombu import base +from mistral.tests.unit.engine.rpc.kombu import fake_kombu +from mistral import utils + +import mock +import socket +import sys + +with mock.patch.dict('sys.modules', kombu=fake_kombu): + from mistral.engine.rpc.kombu import kombu_client + + +class TestException(exc.MistralException): + pass + + +class KombuClientTestCase(base.KombuTestCase): + + _RESPONSE = "response" + + def setUp(self): + super(KombuClientTestCase, self).setUp() + conf = mock.MagicMock() + + self.client = kombu_client.KombuRPCClient(conf) + self.ctx = type('context', (object,), {'to_dict': lambda self: {}})() + + @mock.patch.object(utils, 'set_thread_local', mock.MagicMock()) + @mock.patch.object(utils, 'get_thread_local') + def test_sync_call_result_get(self, get_thread_local): + + def side_effect(var_name): + if var_name == kombu_client.IS_RECEIVED: + return True + elif var_name == kombu_client.RESULT: + return self._RESPONSE + + get_thread_local.side_effect = side_effect + + response = self.client.sync_call(self.ctx, 'method') + self.assertEqual(response, self._RESPONSE) + # check if consumer.consume was called once + self.assertEqual(self.client.consumer.consume.call_count, 1) + + @mock.patch.object(utils, 'set_thread_local', mock.MagicMock()) + @mock.patch.object(utils, 'get_thread_local') + def test_sync_call_result_not_get(self, get_thread_local): + + def side_effect(var_name): + if var_name == kombu_client.IS_RECEIVED: + return False + elif var_name == kombu_client.RESULT: + return self._RESPONSE + + get_thread_local.side_effect = side_effect + + self.client.conn.drain_events = mock.MagicMock( + side_effect=socket.timeout + ) + + self.client._timeout = sys.float_info.epsilon + self.assertRaises( + exc.MistralException, + self.client.sync_call, + self.ctx, + 'method_not_found' + ) + # check if consumer.consume was called once + self.assertEqual(self.client.consumer.consume.call_count, 1) + + @mock.patch.object(utils, 'set_thread_local', mock.MagicMock()) + @mock.patch.object(utils, 'get_thread_local') + def test_async_call(self, get_thread_local): + + def side_effect(var_name): + if var_name == kombu_client.IS_RECEIVED: + return True + elif var_name == kombu_client.RESULT: + return self._RESPONSE + + get_thread_local.side_effect = side_effect + + response = self.client.async_call(self.ctx, 'method') + self.assertEqual(response, None) + # check if consumer.consume was called once + self.assertEqual(self.client.consumer.consume.call_count, 1) + + def test__on_response_message_ack_fail(self): + message = mock.MagicMock() + message.ack.side_effect = Exception('Test Exception') + response = 'response' + + kombu_client.LOG = mock.MagicMock() + + self.client._on_response(response, message) + self.assertEqual(kombu_client.LOG.debug.call_count, 1) + self.assertEqual(kombu_client.LOG.exception.call_count, 1) + + @mock.patch.object(utils, 'get_thread_local', mock.MagicMock( + return_value=False + )) + def test__on_response_message_ack_ok_corr_id_not_match(self): + message = mock.MagicMock() + message.properties = mock.MagicMock() + message.properties.__getitem__ = lambda *args, **kwargs: True + response = 'response' + + kombu_client.LOG = mock.MagicMock() + + self.client._on_response(response, message) + self.assertEqual(kombu_client.LOG.debug.call_count, 2) + self.assertEqual(kombu_client.LOG.exception.call_count, 0) + + @mock.patch.object(utils, 'set_thread_local') + @mock.patch.object(utils, 'get_thread_local', mock.MagicMock( + return_value=True + )) + def test__on_response_message_ack_ok_messsage_type_error( + self, + set_thread_local + ): + message = mock.MagicMock() + message.properties = mock.MagicMock() + message.properties.__getitem__ = lambda *args, **kwargs: True + message.properties.get.return_value = 'error' + response = TestException('response') + + kombu_client.LOG = mock.MagicMock() + + self.assertRaises( + TestException, + self.client._on_response, + response, + message + ) + self.assertEqual(kombu_client.LOG.debug.call_count, 2) + self.assertEqual(kombu_client.LOG.exception.call_count, 0) + self.assertEqual(set_thread_local.call_count, 1) + + @mock.patch.object(utils, 'set_thread_local') + @mock.patch.object(utils, 'get_thread_local', mock.MagicMock( + return_value=True + )) + def test__on_response_message_ack_ok(self, set_thread_local): + + message = mock.MagicMock() + message.properties = mock.MagicMock() + message.properties.__getitem__ = lambda *args, **kwargs: True + response = 'response' + + kombu_client.LOG = mock.MagicMock() + + self.client._on_response(response, message) + + self.assertEqual(kombu_client.LOG.debug.call_count, 2) + self.assertEqual(kombu_client.LOG.exception.call_count, 0) + self.assertEqual(set_thread_local.call_count, 2) diff --git a/mistral/tests/unit/engine/rpc/kombu/test_kombu_server.py b/mistral/tests/unit/engine/rpc/kombu/test_kombu_server.py new file mode 100644 index 00000000..b4fa5f4c --- /dev/null +++ b/mistral/tests/unit/engine/rpc/kombu/test_kombu_server.py @@ -0,0 +1,261 @@ +# Copyright (c) 2016 Intel Corporation +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from mistral import exceptions as exc +from mistral.tests.unit.engine.rpc.kombu import base +from mistral.tests.unit.engine.rpc.kombu import fake_kombu + +import mock +import socket + +with mock.patch.dict('sys.modules', kombu=fake_kombu): + from mistral.engine.rpc.kombu import kombu_server + + +class TestException(exc.MistralError): + pass + + +class KombuServerTestCase(base.KombuTestCase): + + def setUp(self): + super(KombuServerTestCase, self).setUp() + + self.conf = {} + self.conf['exchange'] = 'test_exchange' + self.server = kombu_server.KombuRPCServer(self.conf) + self.ctx = type('context', (object,), {'to_dict': lambda self: {}})() + + def test_is_running_is_running(self): + self.server._running.set() + self.assertTrue(self.server.is_running) + + def test_is_running_is_not_running(self): + self.server._running.clear() + self.assertFalse(self.server.is_running) + + def test_stop(self): + self.server.stop() + self.assertFalse(self.server.is_running) + + def test_publish_message(self): + body = 'body' + reply_to = 'reply_to' + corr_id = 'corr_id' + type = 'type' + + acquire_mock = mock.MagicMock() + fake_kombu.producer.acquire.return_value = acquire_mock + + enter_mock = mock.MagicMock() + acquire_mock.__enter__.return_value = enter_mock + + self.server.publish_message(body, reply_to, corr_id, type) + enter_mock.publish.assert_called_once_with( + body=body, + exchange=self.conf['exchange'], + routing_key=reply_to, + correlation_id=corr_id, + type=type, + serializer=None + ) + + def test_run_launch_successfully(self): + acquire_mock = mock.MagicMock() + acquire_mock.drain_events.side_effect = TestException() + fake_kombu.connection.acquire.return_value = acquire_mock + + self.assertRaises(TestException, self.server.run) + self.assertTrue(self.server.is_running) + + def test_run_launch_successfully_than_stop(self): + + def side_effect(*args, **kwargs): + self.assertTrue(self.server.is_running) + self.server.stop() + + acquire_mock = mock.MagicMock() + acquire_mock.drain_events.side_effect = side_effect + fake_kombu.connection.acquire.return_value = acquire_mock + + self.server.run() + self.assertFalse(self.server.is_running) + + def test_run_raise_mistral_exception(self): + acquire_mock = mock.MagicMock() + acquire_mock.drain_events.side_effect = socket.error() + fake_kombu.connection.acquire.return_value = acquire_mock + + self.assertRaises(exc.MistralException, self.server.run) + + def test_run_socket_timeout_still_running(self): + + def side_effect(*args, **kwargs): + if acquire_mock.drain_events.call_count == 0: + raise socket.timeout() + raise TestException() + + acquire_mock = mock.MagicMock() + acquire_mock.drain_events.side_effect = side_effect + fake_kombu.connection.acquire.return_value = acquire_mock + + self.assertRaises( + TestException, + self.server.run + ) + self.assertTrue(self.server.is_running) + + def test_run_keyboard_interrupt_not_running(self): + acquire_mock = mock.MagicMock() + acquire_mock.drain_events.side_effect = KeyboardInterrupt() + fake_kombu.connection.acquire.return_value = acquire_mock + + self.assertEqual(self.server.run(), None) + self.assertFalse(self.server.is_running) + + @mock.patch.object( + kombu_server.KombuRPCServer, + '_on_message', + mock.MagicMock() + ) + @mock.patch.object(kombu_server.KombuRPCServer, 'publish_message') + def test__on_message_safe_message_processing_ok(self, publish_message): + message = mock.MagicMock() + + self.server._on_message_safe(None, message) + + self.assertEqual(message.ack.call_count, 1) + self.assertEqual(publish_message.call_count, 0) + + @mock.patch.object(kombu_server.KombuRPCServer, '_on_message') + @mock.patch.object(kombu_server.KombuRPCServer, 'publish_message') + def test__on_message_safe_message_processing_raise( + self, + publish_message, + _on_message + ): + reply_to = 'reply_to' + correlation_id = 'corr_id' + message = mock.MagicMock() + message.properties = { + 'reply_to': reply_to, + 'correlation_id': correlation_id + } + + test_exception = TestException() + _on_message.side_effect = test_exception + + self.server._on_message_safe(None, message) + + self.assertEqual(message.ack.call_count, 1) + publish_message.assert_called_once_with( + test_exception, + reply_to, + correlation_id, + type='error' + ) + + @mock.patch.object( + kombu_server.KombuRPCServer, + '_get_rpc_method', + mock.MagicMock(return_value=None) + ) + def test__on_message_rpc_method_not_found(self): + request = { + 'rpc_ctx': self.ctx, + 'rpc_method': 'not_found_method', + 'arguments': None + } + + message = mock.MagicMock() + message.properties = { + 'reply_to': None, + 'correlation_id': None + } + + self.assertRaises( + exc.MistralException, + self.server._on_message, + request, + message + ) + + @mock.patch.object(kombu_server.KombuRPCServer, 'publish_message') + @mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method') + def test__on_message_is_async(self, get_rpc_method, publish_message): + result = 'result' + request = { + 'async': True, + 'rpc_ctx': self.ctx, + 'rpc_method': 'found_method', + 'arguments': { + 'a': 1, + 'b': 2 + } + } + + message = mock.MagicMock() + message.properties = { + 'reply_to': None, + 'correlation_id': None + } + + rpc_method = mock.MagicMock(return_value=result) + get_rpc_method.return_value = rpc_method + + self.server._on_message(request, message) + rpc_method.assert_called_once_with( + rpc_ctx=self.ctx, + a=1, + b=2 + ) + self.assertEqual(publish_message.call_count, 0) + + @mock.patch.object(kombu_server.KombuRPCServer, 'publish_message') + @mock.patch.object(kombu_server.KombuRPCServer, '_get_rpc_method') + def test__on_message_is_sync(self, get_rpc_method, publish_message): + result = 'result' + request = { + 'async': False, + 'rpc_ctx': self.ctx, + 'rpc_method': 'found_method', + 'arguments': { + 'a': 1, + 'b': 2 + } + } + + reply_to = 'reply_to' + correlation_id = 'corr_id' + message = mock.MagicMock() + message.properties = { + 'reply_to': reply_to, + 'correlation_id': correlation_id + } + + rpc_method = mock.MagicMock(return_value=result) + get_rpc_method.return_value = rpc_method + + self.server._on_message(request, message) + rpc_method.assert_called_once_with( + rpc_ctx=self.ctx, + a=1, + b=2 + ) + publish_message.assert_called_once_with( + result, + reply_to, + correlation_id + )