Improve proxy publish method

Added possibility to send a message to multiple routing keys
without having to create new connection for each send.

Change-Id: Ie23cfd130996b693c091e164043d80f35f04d694
This commit is contained in:
Stanislav Kudriashev
2014-03-19 14:01:15 +02:00
parent 09490cd904
commit e394d03a3a
2 changed files with 15 additions and 9 deletions

View File

@@ -169,8 +169,7 @@ class WorkerTaskExecutor(executor.TaskExecutorBase):
"""Cyclically publish notify message to each topic."""
LOG.debug("Notify thread started.")
while not self._notify_event.is_set():
for topic in self._topics:
self._proxy.publish(pr.Notify(), topic, reply_to=self._uuid)
self._proxy.publish(pr.Notify(), self._topics, reply_to=self._uuid)
self._notify_event.wait(pr.NOTIFY_PERIOD)
def execute_task(self, task, task_uuid, arguments,

View File

@@ -19,6 +19,8 @@ import logging
import socket
import threading
import six
LOG = logging.getLogger(__name__)
# NOTE(skudriashev): A timeout of 1 is often used in environments where
@@ -67,14 +69,19 @@ class Proxy(object):
def publish(self, msg, routing_key, **kwargs):
"""Publish message to the named exchange with routing key."""
if isinstance(routing_key, six.string_types):
routing_keys = [routing_key]
else:
routing_keys = routing_key
with kombu.producers[self._conn].acquire(block=True) as producer:
queue = self._make_queue(routing_key, self._exchange)
producer.publish(body=msg.to_dict(),
routing_key=routing_key,
exchange=self._exchange,
declare=[queue],
type=msg.TYPE,
**kwargs)
for routing_key in routing_keys:
queue = self._make_queue(routing_key, self._exchange)
producer.publish(body=msg.to_dict(),
routing_key=routing_key,
exchange=self._exchange,
declare=[queue],
type=msg.TYPE,
**kwargs)
def start(self):
"""Start proxy."""