Merge "Be explicit about publish keyword arguments"

This commit is contained in:
Jenkins
2014-12-05 08:17:08 +00:00
committed by Gerrit Code Review
4 changed files with 22 additions and 24 deletions

View File

@@ -211,8 +211,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
" correlation_id=%s)", request, topic, self._uuid,
request.uuid)
try:
self._proxy.publish(msg=request,
routing_key=topic,
self._proxy.publish(request, topic,
reply_to=self._uuid,
correlation_id=request.uuid)
except Exception:

View File

@@ -99,16 +99,15 @@ class Proxy(object):
"""Return whether the proxy is running."""
return self._running.is_set()
def _make_queue(self, name, exchange, **kwargs):
"""Make named queue for the given exchange."""
return kombu.Queue(name="%s_%s" % (self._exchange_name, name),
exchange=exchange,
routing_key=name,
durable=False,
auto_delete=True,
**kwargs)
def _make_queue(self, routing_key, exchange, channel=None):
"""Make a named queue for the given exchange."""
queue_name = "%s_%s" % (self._exchange_name, routing_key)
return kombu.Queue(name=queue_name,
routing_key=routing_key, durable=False,
exchange=exchange, auto_delete=True,
channel=channel)
def publish(self, msg, routing_key, **kwargs):
def publish(self, msg, routing_key, reply_to=None, correlation_id=None):
"""Publish message to the named exchange with given routing key."""
if isinstance(routing_key, six.string_types):
routing_keys = [routing_key]
@@ -123,7 +122,8 @@ class Proxy(object):
exchange=self._exchange,
declare=[queue],
type=msg.TYPE,
**kwargs)
reply_to=reply_to,
correlation_id=correlation_id)
def start(self):
"""Start proxy."""

View File

@@ -214,8 +214,8 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.task_args, None, self.timeout),
mock.call.request.transition_and_log_error(pr.PENDING,
logger=mock.ANY),
mock.call.proxy.publish(msg=self.request_inst_mock,
routing_key=self.executor_topic,
mock.call.proxy.publish(self.request_inst_mock,
self.executor_topic,
reply_to=self.executor_uuid,
correlation_id=self.task_uuid)
]
@@ -236,8 +236,8 @@ class TestWorkerTaskExecutor(test.MockTestCase):
result=self.task_result),
mock.call.request.transition_and_log_error(pr.PENDING,
logger=mock.ANY),
mock.call.proxy.publish(msg=self.request_inst_mock,
routing_key=self.executor_topic,
mock.call.proxy.publish(self.request_inst_mock,
self.executor_topic,
reply_to=self.executor_uuid,
correlation_id=self.task_uuid)
]
@@ -267,8 +267,8 @@ class TestWorkerTaskExecutor(test.MockTestCase):
self.task_args, None, self.timeout),
mock.call.request.transition_and_log_error(pr.PENDING,
logger=mock.ANY),
mock.call.proxy.publish(msg=self.request_inst_mock,
routing_key=self.executor_topic,
mock.call.proxy.publish(self.request_inst_mock,
self.executor_topic,
reply_to=self.executor_uuid,
correlation_id=self.task_uuid),
mock.call.request.transition_and_log_error(pr.FAILURE,

View File

@@ -16,10 +16,9 @@
import socket
from six.moves import mock
from taskflow.engines.worker_based import proxy
from taskflow import test
from taskflow.test import mock
from taskflow.utils import threading_utils
@@ -133,24 +132,24 @@ class TestProxy(test.MockTestCase):
msg_mock.to_dict.return_value = msg_data
routing_key = 'routing-key'
task_uuid = 'task-uuid'
kwargs = dict(a='a', b='b')
self.proxy(reset_master_mock=True).publish(
msg_mock, routing_key, correlation_id=task_uuid, **kwargs)
msg_mock, routing_key, correlation_id=task_uuid)
master_mock_calls = [
mock.call.Queue(name=self._queue_name(routing_key),
exchange=self.exchange_inst_mock,
routing_key=routing_key,
durable=False,
auto_delete=True),
auto_delete=True,
channel=None),
mock.call.producer.publish(body=msg_data,
routing_key=routing_key,
exchange=self.exchange_inst_mock,
correlation_id=task_uuid,
declare=[self.queue_inst_mock],
type=msg_mock.TYPE,
**kwargs)
reply_to=None)
]
self.master_mock.assert_has_calls(master_mock_calls)