Ensure queue is declared durable so messages aren't dropped

Fixes bug lp901375

Ensure that a queue is declared durable so messages aren't dropped before
consumers are started

Change-Id: I9f8dfd6eaf3996be58fecff6ad91508bdcef23f3
This commit is contained in:
Johannes Erdfelt
2011-12-28 22:34:08 +00:00
parent 599760a1f8
commit 28ac41efda

View File

@@ -290,6 +290,27 @@ class FanoutPublisher(Publisher):
**options)
class NotifyPublisher(TopicPublisher):
"""Publisher class for 'notify'"""
def __init__(self, *args, **kwargs):
self.durable = kwargs.pop('durable', FLAGS.rabbit_durable_queues)
super(NotifyPublisher, self).__init__(*args, **kwargs)
def reconnect(self, channel):
super(NotifyPublisher, self).reconnect(channel)
# NOTE(jerdfelt): Normally the consumer would create the queue, but
# we do this to ensure that messages don't get dropped if the
# consumer is started after we do
queue = kombu.entity.Queue(channel=channel,
exchange=self.exchange,
durable=self.durable,
name=self.routing_key,
routing_key=self.routing_key)
queue.declare()
class Connection(object):
"""Connection object."""
@@ -461,14 +482,18 @@ class Connection(object):
"""Send a 'direct' message"""
self.publisher_send(DirectPublisher, msg_id, msg)
def topic_send(self, topic, msg, **kwargs):
def topic_send(self, topic, msg):
"""Send a 'topic' message"""
self.publisher_send(TopicPublisher, topic, msg, **kwargs)
self.publisher_send(TopicPublisher, topic, msg)
def fanout_send(self, topic, msg):
"""Send a 'fanout' message"""
self.publisher_send(FanoutPublisher, topic, msg)
def notify_send(self, topic, msg, **kwargs):
"""Send a notify message on a topic"""
self.publisher_send(NotifyPublisher, topic, msg, **kwargs)
def consume(self, limit=None):
"""Consume from all queues/consumers"""
it = self.iterconsume(limit=limit)
@@ -778,7 +803,7 @@ def notify(context, topic, msg):
LOG.debug(_('Sending notification on %s...'), topic)
_pack_context(msg, context)
with ConnectionContext() as conn:
conn.topic_send(topic, msg, durable=True)
conn.notify_send(topic, msg, durable=True)
def msg_reply(msg_id, reply=None, failure=None, ending=False):