Be explicit about publish keyword arguments
Instead of allowing for arbitrary keyword arguments which makes the API hard to change in the future prefer to have explicit arguments (and keyword arguments) instead of allowing **kwargs to be passed. Change-Id: I374db6b19ef76c2f9ee04771f5d928c79b7cf049
This commit is contained in:
@@ -211,8 +211,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
|
||||
" correlation_id=%s)", request, topic, self._uuid,
|
||||
request.uuid)
|
||||
try:
|
||||
self._proxy.publish(msg=request,
|
||||
routing_key=topic,
|
||||
self._proxy.publish(request, topic,
|
||||
reply_to=self._uuid,
|
||||
correlation_id=request.uuid)
|
||||
except Exception:
|
||||
|
||||
@@ -99,16 +99,15 @@ class Proxy(object):
|
||||
"""Return whether the proxy is running."""
|
||||
return self._running.is_set()
|
||||
|
||||
def _make_queue(self, name, exchange, **kwargs):
|
||||
"""Make named queue for the given exchange."""
|
||||
return kombu.Queue(name="%s_%s" % (self._exchange_name, name),
|
||||
exchange=exchange,
|
||||
routing_key=name,
|
||||
durable=False,
|
||||
auto_delete=True,
|
||||
**kwargs)
|
||||
def _make_queue(self, routing_key, exchange, channel=None):
|
||||
"""Make a named queue for the given exchange."""
|
||||
queue_name = "%s_%s" % (self._exchange_name, routing_key)
|
||||
return kombu.Queue(name=queue_name,
|
||||
routing_key=routing_key, durable=False,
|
||||
exchange=exchange, auto_delete=True,
|
||||
channel=channel)
|
||||
|
||||
def publish(self, msg, routing_key, **kwargs):
|
||||
def publish(self, msg, routing_key, reply_to=None, correlation_id=None):
|
||||
"""Publish message to the named exchange with given routing key."""
|
||||
if isinstance(routing_key, six.string_types):
|
||||
routing_keys = [routing_key]
|
||||
@@ -123,7 +122,8 @@ class Proxy(object):
|
||||
exchange=self._exchange,
|
||||
declare=[queue],
|
||||
type=msg.TYPE,
|
||||
**kwargs)
|
||||
reply_to=reply_to,
|
||||
correlation_id=correlation_id)
|
||||
|
||||
def start(self):
|
||||
"""Start proxy."""
|
||||
|
||||
@@ -214,8 +214,8 @@ class TestWorkerTaskExecutor(test.MockTestCase):
|
||||
self.task_args, None, self.timeout),
|
||||
mock.call.request.transition_and_log_error(pr.PENDING,
|
||||
logger=mock.ANY),
|
||||
mock.call.proxy.publish(msg=self.request_inst_mock,
|
||||
routing_key=self.executor_topic,
|
||||
mock.call.proxy.publish(self.request_inst_mock,
|
||||
self.executor_topic,
|
||||
reply_to=self.executor_uuid,
|
||||
correlation_id=self.task_uuid)
|
||||
]
|
||||
@@ -236,8 +236,8 @@ class TestWorkerTaskExecutor(test.MockTestCase):
|
||||
result=self.task_result),
|
||||
mock.call.request.transition_and_log_error(pr.PENDING,
|
||||
logger=mock.ANY),
|
||||
mock.call.proxy.publish(msg=self.request_inst_mock,
|
||||
routing_key=self.executor_topic,
|
||||
mock.call.proxy.publish(self.request_inst_mock,
|
||||
self.executor_topic,
|
||||
reply_to=self.executor_uuid,
|
||||
correlation_id=self.task_uuid)
|
||||
]
|
||||
@@ -267,8 +267,8 @@ class TestWorkerTaskExecutor(test.MockTestCase):
|
||||
self.task_args, None, self.timeout),
|
||||
mock.call.request.transition_and_log_error(pr.PENDING,
|
||||
logger=mock.ANY),
|
||||
mock.call.proxy.publish(msg=self.request_inst_mock,
|
||||
routing_key=self.executor_topic,
|
||||
mock.call.proxy.publish(self.request_inst_mock,
|
||||
self.executor_topic,
|
||||
reply_to=self.executor_uuid,
|
||||
correlation_id=self.task_uuid),
|
||||
mock.call.request.transition_and_log_error(pr.FAILURE,
|
||||
|
||||
@@ -16,10 +16,9 @@
|
||||
|
||||
import socket
|
||||
|
||||
from six.moves import mock
|
||||
|
||||
from taskflow.engines.worker_based import proxy
|
||||
from taskflow import test
|
||||
from taskflow.test import mock
|
||||
from taskflow.utils import threading_utils
|
||||
|
||||
|
||||
@@ -133,24 +132,24 @@ class TestProxy(test.MockTestCase):
|
||||
msg_mock.to_dict.return_value = msg_data
|
||||
routing_key = 'routing-key'
|
||||
task_uuid = 'task-uuid'
|
||||
kwargs = dict(a='a', b='b')
|
||||
|
||||
self.proxy(reset_master_mock=True).publish(
|
||||
msg_mock, routing_key, correlation_id=task_uuid, **kwargs)
|
||||
msg_mock, routing_key, correlation_id=task_uuid)
|
||||
|
||||
master_mock_calls = [
|
||||
mock.call.Queue(name=self._queue_name(routing_key),
|
||||
exchange=self.exchange_inst_mock,
|
||||
routing_key=routing_key,
|
||||
durable=False,
|
||||
auto_delete=True),
|
||||
auto_delete=True,
|
||||
channel=None),
|
||||
mock.call.producer.publish(body=msg_data,
|
||||
routing_key=routing_key,
|
||||
exchange=self.exchange_inst_mock,
|
||||
correlation_id=task_uuid,
|
||||
declare=[self.queue_inst_mock],
|
||||
type=msg_mock.TYPE,
|
||||
**kwargs)
|
||||
reply_to=None)
|
||||
]
|
||||
self.master_mock.assert_has_calls(master_mock_calls)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user