From 07f1d43a0479c385bbc22484a2145c080abfbb2c Mon Sep 17 00:00:00 2001 From: Johannes Erdfelt Date: Wed, 28 Dec 2011 22:34:08 +0000 Subject: [PATCH] Ensure queue is declared durable so messages aren't dropped Fixes bug lp901375 Ensure that a queue is declared durable so messages aren't dropped before consumers are started Change-Id: I9f8dfd6eaf3996be58fecff6ad91508bdcef23f3 --- nova/rpc/impl_kombu.py | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 8da8ff1a..810a6629 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -290,6 +290,27 @@ class FanoutPublisher(Publisher): **options) +class NotifyPublisher(TopicPublisher): + """Publisher class for 'notify'""" + + def __init__(self, *args, **kwargs): + self.durable = kwargs.pop('durable', FLAGS.rabbit_durable_queues) + super(NotifyPublisher, self).__init__(*args, **kwargs) + + def reconnect(self, channel): + super(NotifyPublisher, self).reconnect(channel) + + # NOTE(jerdfelt): Normally the consumer would create the queue, but + # we do this to ensure that messages don't get dropped if the + # consumer is started after we do + queue = kombu.entity.Queue(channel=channel, + exchange=self.exchange, + durable=self.durable, + name=self.routing_key, + routing_key=self.routing_key) + queue.declare() + + class Connection(object): """Connection object.""" @@ -461,14 +482,18 @@ class Connection(object): """Send a 'direct' message""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, topic, msg, **kwargs): + def topic_send(self, topic, msg): """Send a 'topic' message""" - self.publisher_send(TopicPublisher, topic, msg, **kwargs) + self.publisher_send(TopicPublisher, topic, msg) def fanout_send(self, topic, msg): """Send a 'fanout' message""" self.publisher_send(FanoutPublisher, topic, msg) + def notify_send(self, topic, msg, **kwargs): + """Send a notify message on a topic""" + self.publisher_send(NotifyPublisher, topic, msg, **kwargs) + def consume(self, limit=None): """Consume from all queues/consumers""" it = self.iterconsume(limit=limit) @@ -778,7 +803,7 @@ def notify(context, topic, msg): LOG.debug(_('Sending notification on %s...'), topic) _pack_context(msg, context) with ConnectionContext() as conn: - conn.topic_send(topic, msg, durable=True) + conn.notify_send(topic, msg, durable=True) def msg_reply(msg_id, reply=None, failure=None, ending=False):