Merge "Improve proxy publish method"

This commit is contained in:
Jenkins
2014-03-20 09:34:37 +00:00
committed by Gerrit Code Review
2 changed files with 15 additions and 9 deletions

View File

@@ -169,8 +169,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
"""Cyclically publish notify message to each topic."""
LOG.debug("Notify thread started.")
while not self._notify_event.is_set():
for topic in self._topics:
self._proxy.publish(pr.Notify(), topic, reply_to=self._uuid)
self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid)
self._notify_event.wait(pr.NOTIFY_PERIOD)
def execute_task(self, task, task_uuid, arguments,

View File

@@ -19,6 +19,8 @@ import logging
import socket
import threading
import six
LOG = logging.getLogger(__name__)
# NOTE(skudriashev): A timeout of 1 is often used in environments where
@@ -67,14 +69,19 @@ class Proxy(object):
def publish(self, msg, routing_key, **kwargs):
"""Publish message to the named exchange with routing key."""
if isinstance(routing_key, six.string_types):
routing_keys = [routing_key]
else:
routing_keys = routing_key
with kombu.producers[self._conn].acquire(block=True) as producer:
queue = self._make_queue(routing_key, self._exchange)
producer.publish(body=msg.to_dict(),
routing_key=routing_key,
exchange=self._exchange,
declare=[queue],
type=msg.TYPE,
**kwargs)
for routing_key in routing_keys:
queue = self._make_queue(routing_key, self._exchange)
producer.publish(body=msg.to_dict(),
routing_key=routing_key,
exchange=self._exchange,
declare=[queue],
type=msg.TYPE,
**kwargs)
def start(self):
"""Start proxy."""