diff --git a/taskflow/engines/worker_based/executor.py b/taskflow/engines/worker_based/executor.py index 033ffff7..65d6ccd6 100644 --- a/taskflow/engines/worker_based/executor.py +++ b/taskflow/engines/worker_based/executor.py @@ -169,8 +169,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase): """Cyclically publish notify message to each topic.""" LOG.debug("Notify thread started.") while not self._notify_event.is_set(): - for topic in self._topics: - self._proxy.publish(pr.Notify(), topic, reply_to=self._uuid) + self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid) self._notify_event.wait(pr.NOTIFY_PERIOD) def execute_task(self, task, task_uuid, arguments, diff --git a/taskflow/engines/worker_based/proxy.py b/taskflow/engines/worker_based/proxy.py index c2203f30..7fc3fb7d 100644 --- a/taskflow/engines/worker_based/proxy.py +++ b/taskflow/engines/worker_based/proxy.py @@ -19,6 +19,8 @@ import logging import socket import threading +import six + LOG = logging.getLogger(__name__) # NOTE(skudriashev): A timeout of 1 is often used in environments where @@ -67,14 +69,19 @@ class Proxy(object): def publish(self, msg, routing_key, **kwargs): """Publish message to the named exchange with routing key.""" + if isinstance(routing_key, six.string_types): + routing_keys = [routing_key] + else: + routing_keys = routing_key with kombu.producers[self._conn].acquire(block=True) as producer: - queue = self._make_queue(routing_key, self._exchange) - producer.publish(body=msg.to_dict(), - routing_key=routing_key, - exchange=self._exchange, - declare=[queue], - type=msg.TYPE, - **kwargs) + for routing_key in routing_keys: + queue = self._make_queue(routing_key, self._exchange) + producer.publish(body=msg.to_dict(), + routing_key=routing_key, + exchange=self._exchange, + declare=[queue], + type=msg.TYPE, + **kwargs) def start(self): """Start proxy."""