From c69884209a73792e85c2e8d2c5b1fab8685847cd Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Fri, 29 Aug 2014 14:48:22 -0700 Subject: [PATCH] Be explicit about publish keyword arguments Instead of allowing for arbitrary keyword arguments which makes the API hard to change in the future prefer to have explicit arguments (and keyword arguments) instead of allowing **kwargs to be passed. Change-Id: I374db6b19ef76c2f9ee04771f5d928c79b7cf049 --- taskflow/engines/worker_based/executor.py | 3 +-- taskflow/engines/worker_based/proxy.py | 20 +++++++++---------- .../tests/unit/worker_based/test_executor.py | 12 +++++------ .../tests/unit/worker_based/test_proxy.py | 11 +++++----- 4 files changed, 22 insertions(+), 24 deletions(-) diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index ae8e0e40..5982a463 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -211,8 +211,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): " correlation_id=%s)", request, topic, self._uuid, request.uuid) try: - self._proxy.publish(msg=request, - routing_key=topic, + self._proxy.publish(request, topic, reply_to=self._uuid, correlation_id=request.uuid) except Exception: diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index 6f608f42..3f279a77 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -99,16 +99,15 @@ class Proxy(object): """Return whether the proxy is running.""" return self._running.is_set() - def _make_queue(self, name, exchange, **kwargs): - """Make named queue for the given exchange.""" - return kombu.Queue(name="%s_%s" % (self._exchange_name, name), - exchange=exchange, - routing_key=name, - durable=False, - auto_delete=True, - **kwargs) + def _make_queue(self, routing_key, exchange, channel=None): + """Make a named queue for the given exchange.""" + queue_name = "%s_%s" % (self._exchange_name, routing_key) + return kombu.Queue(name=queue_name, + routing_key=routing_key, durable=False, + exchange=exchange, auto_delete=True, + channel=channel) - def publish(self, msg, routing_key, **kwargs): + def publish(self, msg, routing_key, reply_to=None, correlation_id=None): """Publish message to the named exchange with given routing key.""" if isinstance(routing_key, six.string_types): routing_keys = [routing_key] @@ -123,7 +122,8 @@ class Proxy(object): exchange=self._exchange, declare=[queue], type=msg.TYPE, - **kwargs) + reply_to=reply_to, + correlation_id=correlation_id) def start(self): """Start proxy.""" diff --git a/taskflow/tests/unit/worker_based/test_executor.py b/taskflow/tests/unit/worker_based/test_executor.py index d2b97bfe..ba73ad7e 100644 --- a/taskflow/tests/unit/worker_based/test_executor.py +++ b/taskflow/tests/unit/worker_based/test_executor.py @@ -214,8 +214,8 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.task_args, None, self.timeout), mock.call.request.transition_and_log_error(pr.PENDING, logger=mock.ANY), - mock.call.proxy.publish(msg=self.request_inst_mock, - routing_key=self.executor_topic, + mock.call.proxy.publish(self.request_inst_mock, + self.executor_topic, reply_to=self.executor_uuid, correlation_id=self.task_uuid) ] @@ -236,8 +236,8 @@ class TestWorkerTaskExecutor(test.MockTestCase): result=self.task_result), mock.call.request.transition_and_log_error(pr.PENDING, logger=mock.ANY), - mock.call.proxy.publish(msg=self.request_inst_mock, - routing_key=self.executor_topic, + mock.call.proxy.publish(self.request_inst_mock, + self.executor_topic, reply_to=self.executor_uuid, correlation_id=self.task_uuid) ] @@ -267,8 +267,8 @@ class TestWorkerTaskExecutor(test.MockTestCase): self.task_args, None, self.timeout), mock.call.request.transition_and_log_error(pr.PENDING, logger=mock.ANY), - mock.call.proxy.publish(msg=self.request_inst_mock, - routing_key=self.executor_topic, + mock.call.proxy.publish(self.request_inst_mock, + self.executor_topic, reply_to=self.executor_uuid, correlation_id=self.task_uuid), mock.call.request.transition_and_log_error(pr.FAILURE, diff --git a/taskflow/tests/unit/worker_based/test_proxy.py b/taskflow/tests/unit/worker_based/test_proxy.py index 4217a726..a3c7d13f 100644 --- a/taskflow/tests/unit/worker_based/test_proxy.py +++ b/taskflow/tests/unit/worker_based/test_proxy.py @@ -16,10 +16,9 @@ import socket -from six.moves import mock - from taskflow.engines.worker_based import proxy from taskflow import test +from taskflow.test import mock from taskflow.utils import threading_utils @@ -133,24 +132,24 @@ class TestProxy(test.MockTestCase): msg_mock.to_dict.return_value = msg_data routing_key = 'routing-key' task_uuid = 'task-uuid' - kwargs = dict(a='a', b='b') self.proxy(reset_master_mock=True).publish( - msg_mock, routing_key, correlation_id=task_uuid, **kwargs) + msg_mock, routing_key, correlation_id=task_uuid) master_mock_calls = [ mock.call.Queue(name=self._queue_name(routing_key), exchange=self.exchange_inst_mock, routing_key=routing_key, durable=False, - auto_delete=True), + auto_delete=True, + channel=None), mock.call.producer.publish(body=msg_data, routing_key=routing_key, exchange=self.exchange_inst_mock, correlation_id=task_uuid, declare=[self.queue_inst_mock], type=msg_mock.TYPE, - **kwargs) + reply_to=None) ] self.master_mock.assert_has_calls(master_mock_calls)