diff --git a/nova/rpc/impl_kombu.py b/nova/rpc/impl_kombu.py index 8da8ff1a..810a6629 100644 --- a/nova/rpc/impl_kombu.py +++ b/nova/rpc/impl_kombu.py @@ -290,6 +290,27 @@ class FanoutPublisher(Publisher): **options) +class NotifyPublisher(TopicPublisher): + """Publisher class for 'notify'""" + + def __init__(self, *args, **kwargs): + self.durable = kwargs.pop('durable', FLAGS.rabbit_durable_queues) + super(NotifyPublisher, self).__init__(*args, **kwargs) + + def reconnect(self, channel): + super(NotifyPublisher, self).reconnect(channel) + + # NOTE(jerdfelt): Normally the consumer would create the queue, but + # we do this to ensure that messages don't get dropped if the + # consumer is started after we do + queue = kombu.entity.Queue(channel=channel, + exchange=self.exchange, + durable=self.durable, + name=self.routing_key, + routing_key=self.routing_key) + queue.declare() + + class Connection(object): """Connection object.""" @@ -461,14 +482,18 @@ class Connection(object): """Send a 'direct' message""" self.publisher_send(DirectPublisher, msg_id, msg) - def topic_send(self, topic, msg, **kwargs): + def topic_send(self, topic, msg): """Send a 'topic' message""" - self.publisher_send(TopicPublisher, topic, msg, **kwargs) + self.publisher_send(TopicPublisher, topic, msg) def fanout_send(self, topic, msg): """Send a 'fanout' message""" self.publisher_send(FanoutPublisher, topic, msg) + def notify_send(self, topic, msg, **kwargs): + """Send a notify message on a topic""" + self.publisher_send(NotifyPublisher, topic, msg, **kwargs) + def consume(self, limit=None): """Consume from all queues/consumers""" it = self.iterconsume(limit=limit) @@ -778,7 +803,7 @@ def notify(context, topic, msg): LOG.debug(_('Sending notification on %s...'), topic) _pack_context(msg, context) with ConnectionContext() as conn: - conn.topic_send(topic, msg, durable=True) + conn.notify_send(topic, msg, durable=True) def msg_reply(msg_id, reply=None, failure=None, ending=False):