Merge "Fix for raising exception directly to kombu"
This commit is contained in:
commit
1b22267b0f
@ -27,6 +27,7 @@ LOG = logging.getLogger(__name__)
|
||||
IS_RECEIVED = 'kombu_rpc_is_received'
|
||||
RESULT = 'kombu_rpc_result'
|
||||
CORR_ID = 'kombu_rpc_correlation_id'
|
||||
TYPE = 'kombu_rpc_type'
|
||||
|
||||
|
||||
class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
||||
@ -108,10 +109,8 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
||||
message.properties['correlation_id']):
|
||||
utils.set_thread_local(IS_RECEIVED, True)
|
||||
|
||||
# TODO(ddeja): Decide if raising exception to kombu is best
|
||||
# behaviour.
|
||||
if message.properties.get('type') == 'error':
|
||||
raise response
|
||||
utils.set_thread_local(TYPE, 'error')
|
||||
utils.set_thread_local(RESULT, response)
|
||||
|
||||
def _wait_for_result(self):
|
||||
@ -170,9 +169,13 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
||||
|
||||
self._wait_for_result()
|
||||
result = utils.get_thread_local(RESULT)
|
||||
res_type = utils.get_thread_local(TYPE)
|
||||
|
||||
self._clear_thread_local()
|
||||
|
||||
if res_type == 'error':
|
||||
raise result
|
||||
|
||||
return result
|
||||
|
||||
@staticmethod
|
||||
@ -180,6 +183,7 @@ class KombuRPCClient(rpc_base.RPCClient, kombu_base.Base):
|
||||
utils.set_thread_local(RESULT, None)
|
||||
utils.set_thread_local(CORR_ID, None)
|
||||
utils.set_thread_local(IS_RECEIVED, None)
|
||||
utils.set_thread_local(TYPE, None)
|
||||
|
||||
def sync_call(self, ctx, method, target=None, **kwargs):
|
||||
return self._call(ctx, method, async=False, target=target, **kwargs)
|
||||
|
@ -20,7 +20,6 @@ from mistral import utils
|
||||
|
||||
import mock
|
||||
import socket
|
||||
import sys
|
||||
|
||||
with mock.patch.dict('sys.modules', kombu=fake_kombu):
|
||||
from mistral.engine.rpc_backend.kombu import kombu_client
|
||||
@ -74,13 +73,37 @@ class KombuClientTestCase(base.KombuTestCase):
|
||||
side_effect=socket.timeout
|
||||
)
|
||||
|
||||
self.client._timeout = sys.float_info.epsilon
|
||||
self.assertRaises(
|
||||
exc.MistralException,
|
||||
self.client.sync_call,
|
||||
self.ctx,
|
||||
'method_not_found'
|
||||
)
|
||||
# Check if consumer.consume was called once.
|
||||
self.assertEqual(self.client.consumer.consume.call_count, 1)
|
||||
|
||||
@mock.patch.object(utils, 'set_thread_local', mock.MagicMock())
|
||||
@mock.patch.object(utils, 'get_thread_local')
|
||||
def test_sync_call_result_type_error(self, get_thread_local):
|
||||
|
||||
def side_effect(var_name):
|
||||
if var_name == kombu_client.IS_RECEIVED:
|
||||
return True
|
||||
elif var_name == kombu_client.RESULT:
|
||||
return TestException()
|
||||
elif var_name == kombu_client.TYPE:
|
||||
return 'error'
|
||||
|
||||
get_thread_local.side_effect = side_effect
|
||||
|
||||
self.client.conn.drain_events = mock.MagicMock()
|
||||
|
||||
self.assertRaises(
|
||||
TestException,
|
||||
self.client.sync_call,
|
||||
self.ctx,
|
||||
'method'
|
||||
)
|
||||
# check if consumer.consume was called once
|
||||
self.assertEqual(self.client.consumer.consume.call_count, 1)
|
||||
|
||||
@ -143,15 +166,11 @@ class KombuClientTestCase(base.KombuTestCase):
|
||||
|
||||
kombu_client.LOG = mock.MagicMock()
|
||||
|
||||
self.assertRaises(
|
||||
TestException,
|
||||
self.client._on_response,
|
||||
response,
|
||||
message
|
||||
)
|
||||
self.client._on_response(response, message)
|
||||
|
||||
self.assertEqual(kombu_client.LOG.debug.call_count, 2)
|
||||
self.assertEqual(kombu_client.LOG.exception.call_count, 0)
|
||||
self.assertEqual(set_thread_local.call_count, 1)
|
||||
self.assertEqual(set_thread_local.call_count, 3)
|
||||
|
||||
@mock.patch.object(utils, 'set_thread_local')
|
||||
@mock.patch.object(utils, 'get_thread_local', mock.MagicMock(
|
||||
|
Loading…
Reference in New Issue
Block a user