diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index ae8e0e40..5982a463 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -211,8 +211,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): " correlation_id=%s)", request, topic, self._uuid, request.uuid) try: - self._proxy.publish(msg=request, - routing_key=topic, + self._proxy.publish(request, topic, reply_to=self._uuid, correlation_id=request.uuid) except Exception: diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index 6f608f42..3f279a77 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -99,16 +99,15 @@ class Proxy(object): """Return whether the proxy is running.""" return self._running.is_set() - def _make_queue(self, name, exchange, **kwargs): - """Make named queue for the given exchange.""" - return kombu.Queue(name="%s_%s" % (self._exchange_name, name), - exchange=exchange, - routing_key=name, - durable=False, - auto_delete=True, - **kwargs) + def _make_queue(self, routing_key, exchange, channel=None): + """Make a named queue for the given exchange.""" + queue_name = "%s_%s" % (self._exchange_name, routing_key) + return kombu.Queue(name=queue_name, + routing_key=routing_key, durable=False, + exchange=exchange, auto_delete=True, + channel=channel) - def publish(self, msg, routing_key, **kwargs): + def publish(self, msg, routing_key, reply_to=None, correlation_id=None): """Publish message to the named exchange with given routing key.""" if isinstance(routing_key, six.string_types): routing_keys = [routing_key] @@ -123,7 +122,8 @@ class Proxy(object): exchange=self._exchange, declare=[queue], type=msg.TYPE, - **kwargs) + reply_to=reply_to, + correlation_id=correlation_id) def start(self): """Start proxy.""" diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index d2b97bfe..ba73ad7e 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -214,8 +214,8 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.task_args, None, self.timeout), mock.call.request.transition_and_log_error(pr.PENDING, logger=mock.ANY), - mock.call.proxy.publish(msg=self.request_inst_mock, - routing_key=self.executor_topic, + mock.call.proxy.publish(self.request_inst_mock, + self.executor_topic, reply_to=self.executor_uuid, correlation_id=self.task_uuid) ] @@ -236,8 +236,8 @@ class TestWorkerTaskExecutor(test.MockTestCase): result=self.task_result), mock.call.request.transition_and_log_error(pr.PENDING, logger=mock.ANY), - mock.call.proxy.publish(msg=self.request_inst_mock, - routing_key=self.executor_topic, + mock.call.proxy.publish(self.request_inst_mock, + self.executor_topic, reply_to=self.executor_uuid, correlation_id=self.task_uuid) ] @@ -267,8 +267,8 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.task_args, None, self.timeout), mock.call.request.transition_and_log_error(pr.PENDING, logger=mock.ANY), - mock.call.proxy.publish(msg=self.request_inst_mock, - routing_key=self.executor_topic, + mock.call.proxy.publish(self.request_inst_mock, + self.executor_topic, reply_to=self.executor_uuid, correlation_id=self.task_uuid), mock.call.request.transition_and_log_error(pr.FAILURE, diff --git a/taskflow/tests/unit/worker_based/test_proxy.py b/taskflow/tests/unit/worker_based/test_proxy.py index 4217a726..a3c7d13f 100644 --- a/taskflow/tests/unit/worker_based/test_proxy.py +++ b/taskflow/tests/unit/worker_based/test_proxy.py @@ -16,10 +16,9 @@ import socket -from six.moves import mock - from taskflow.engines.worker_based import proxy from taskflow import test +from taskflow.test import mock from taskflow.utils import threading_utils @@ -133,24 +132,24 @@ class TestProxy(test.MockTestCase): msg_mock.to_dict.return_value = msg_data routing_key = 'routing-key' task_uuid = 'task-uuid' - kwargs = dict(a='a', b='b') self.proxy(reset_master_mock=True).publish( - msg_mock, routing_key, correlation_id=task_uuid, **kwargs) + msg_mock, routing_key, correlation_id=task_uuid) master_mock_calls = [ mock.call.Queue(name=self._queue_name(routing_key), exchange=self.exchange_inst_mock, routing_key=routing_key, durable=False, - auto_delete=True), + auto_delete=True, + channel=None), mock.call.producer.publish(body=msg_data, routing_key=routing_key, exchange=self.exchange_inst_mock, correlation_id=task_uuid, declare=[self.queue_inst_mock], type=msg_mock.TYPE, - **kwargs) + reply_to=None) ] self.master_mock.assert_has_calls(master_mock_calls)