Synchronize notification queue setup between nova and glance

Fixes bug 901376

Glance used a SimpleQueue which would end up with a direct queue with an
exchange named the same as the queue. This was different than Nova, which
uses a topic queue with an exchange named 'nova'.

This change makes Glance use a topic queue with a configurable exchange
name to match Nova.

Change-Id: Ia014e4c00060abc2345289a54e45bbfdc6b7e8e5
This commit is contained in:
Johannes Erdfelt 2011-12-28 16:05:05 +00:00
parent 00ac2c7250
commit fa1b0b1f78
4 changed files with 36 additions and 5 deletions

View File

@ -670,6 +670,12 @@ Optional. Default: ``/``
Virtual host to use for connection when using ``rabbit`` strategy.
* ``rabbit_notification_exchange``
Optional. Default: ``glance``
Exchange name to use for connection when using ``rabbit`` strategy.
* ``rabbit_notification_topic``
Optional. Default: ``glance_notifications``

View File

@ -84,6 +84,7 @@ rabbit_use_ssl = false
rabbit_userid = guest
rabbit_password = guest
rabbit_virtual_host = /
rabbit_notification_exchange = glance
rabbit_notification_topic = glance_notifications
# ============ Filesystem Store Options ========================

View File

@ -16,11 +16,13 @@
# under the License.
import datetime
import json
import logging
import socket
import uuid
import kombu.connection
import kombu.entity
from glance.common import cfg
from glance.common import exception
@ -68,6 +70,7 @@ class RabbitStrategy(object):
cfg.StrOpt('rabbit_userid', default='guest'),
cfg.StrOpt('rabbit_password', default='guest'),
cfg.StrOpt('rabbit_virtual_host', default='/'),
cfg.StrOpt('rabbit_notification_exchange', default='glance'),
cfg.StrOpt('rabbit_notification_topic', default='glance_notifications')
]
@ -76,20 +79,40 @@ class RabbitStrategy(object):
self._conf = conf
self._conf.register_opts(self.opts)
self.topic = self._conf.rabbit_notification_topic
self.connect()
def connect(self):
self.connection = kombu.connection.BrokerConnection(
hostname=self._conf.rabbit_host,
userid=self._conf.rabbit_userid,
password=self._conf.rabbit_password,
virtual_host=self._conf.rabbit_virtual_host,
ssl=self._conf.rabbit_use_ssl)
self.channel = self.connection.channel()
self.topic = self._conf.rabbit_notification_topic
self.exchange = kombu.entity.Exchange(
channel=self.channel,
type="topic",
name=self._conf.rabbit_notification_exchange)
self.exchange.declare()
def _send_message(self, message, priority):
topic = "%s.%s" % (self.topic, priority)
queue = self.connection.SimpleQueue(topic)
queue.put(message, serializer="json")
queue.close()
routing_key = "%s.%s" % (self.topic, priority.lower())
# NOTE(jerdfelt): Normally the consumer would create the queue, but
# we do this to ensure that messages don't get dropped if the
# consumer is started after we do
queue = kombu.entity.Queue(
channel=self.channel,
exchange=self.exchange,
durable=True,
name=routing_key,
routing_key=routing_key)
queue.declare()
msg = self.exchange.Message(json.dumps(message))
self.exchange.publish(msg, routing_key=routing_key)
def warn(self, msg):
self._send_message(msg, "WARN")

View File

@ -86,6 +86,7 @@ class TestRabbitNotifier(unittest.TestCase):
def setUp(self):
notifier.RabbitStrategy._send_message = self._send_message
notifier.RabbitStrategy.connect = lambda s: None
self.called = False
conf = utils.TestConfigOpts({"notifier_strategy": "rabbit"})
self.notifier = notifier.Notifier(conf)