Sync rpc/impl_qpid.py from oslo-incubator.

Fixes bug #1211338 (seen in Neutron, marked in launchpad as oslo
since the bug was in oslo code).

Change-Id: I11971c1213b095979bd4f2e878ea2bcad3ceb617
This commit is contained in:
David Ripton 2013-08-12 10:38:17 -04:00
parent 6eae300755
commit 3f4bb0443e

View File

@ -24,7 +24,8 @@ import eventlet
import greenlet import greenlet
from oslo.config import cfg from oslo.config import cfg
from neutron.openstack.common.gettextutils import _ from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _ # noqa
from neutron.openstack.common import importutils from neutron.openstack.common import importutils
from neutron.openstack.common import jsonutils from neutron.openstack.common import jsonutils
from neutron.openstack.common import log as logging from neutron.openstack.common import log as logging
@ -118,10 +119,17 @@ class ConsumerBase(object):
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
self.reconnect(session) self.connect(session)
def connect(self, session):
"""Declare the reciever on connect."""
self._declare_receiver(session)
def reconnect(self, session): def reconnect(self, session):
"""Re-declare the receiver after a qpid reconnect.""" """Re-declare the receiver after a qpid reconnect."""
self._declare_receiver(session)
def _declare_receiver(self, session):
self.session = session self.session = session
self.receiver = session.receiver(self.address) self.receiver = session.receiver(self.address)
self.receiver.capacity = 1 self.receiver.capacity = 1
@ -152,11 +160,15 @@ class ConsumerBase(object):
except Exception: except Exception:
LOG.exception(_("Failed to process message... skipping it.")) LOG.exception(_("Failed to process message... skipping it."))
finally: finally:
# TODO(sandy): Need support for optional ack_on_error.
self.session.acknowledge(message) self.session.acknowledge(message)
def get_receiver(self): def get_receiver(self):
return self.receiver return self.receiver
def get_node_name(self):
return self.address.split(';')[0]
class DirectConsumer(ConsumerBase): class DirectConsumer(ConsumerBase):
"""Queue/consumer class for 'direct'.""" """Queue/consumer class for 'direct'."""
@ -169,11 +181,16 @@ class DirectConsumer(ConsumerBase):
'callback' is the callback to call when messages are received 'callback' is the callback to call when messages are received
""" """
super(DirectConsumer, self).__init__(session, callback, super(DirectConsumer, self).__init__(
"%s/%s" % (msg_id, msg_id), session, callback,
{"type": "direct"}, "%s/%s" % (msg_id, msg_id),
msg_id, {"type": "direct"},
{"exclusive": True}) msg_id,
{
"auto-delete": conf.amqp_auto_delete,
"exclusive": True,
"durable": conf.amqp_durable_queues,
})
class TopicConsumer(ConsumerBase): class TopicConsumer(ConsumerBase):
@ -191,9 +208,14 @@ class TopicConsumer(ConsumerBase):
""" """
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback, super(TopicConsumer, self).__init__(
"%s/%s" % (exchange_name, topic), session, callback,
{}, name or topic, {}) "%s/%s" % (exchange_name, topic),
{}, name or topic,
{
"auto-delete": conf.amqp_auto_delete,
"durable": conf.amqp_durable_queues,
})
class FanoutConsumer(ConsumerBase): class FanoutConsumer(ConsumerBase):
@ -206,6 +228,7 @@ class FanoutConsumer(ConsumerBase):
'topic' is the topic to listen on 'topic' is the topic to listen on
'callback' is the callback to call when messages are received 'callback' is the callback to call when messages are received
""" """
self.conf = conf
super(FanoutConsumer, self).__init__( super(FanoutConsumer, self).__init__(
session, callback, session, callback,
@ -214,6 +237,18 @@ class FanoutConsumer(ConsumerBase):
"%s_fanout_%s" % (topic, uuid.uuid4().hex), "%s_fanout_%s" % (topic, uuid.uuid4().hex),
{"exclusive": True}) {"exclusive": True})
def reconnect(self, session):
topic = self.get_node_name().rpartition('_fanout')[0]
params = {
'session': session,
'topic': topic,
'callback': self.callback,
}
self.__init__(conf=self.conf, **params)
super(FanoutConsumer, self).reconnect(session)
class Publisher(object): class Publisher(object):
"""Base Publisher class.""" """Base Publisher class."""
@ -285,7 +320,7 @@ class DirectPublisher(Publisher):
def __init__(self, conf, session, msg_id): def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher.""" """Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id, super(DirectPublisher, self).__init__(session, msg_id,
{"type": "Direct"}) {"type": "direct"})
class TopicPublisher(Publisher): class TopicPublisher(Publisher):
@ -575,6 +610,7 @@ class Connection(object):
def consume_in_thread(self): def consume_in_thread(self):
"""Consumer from all queues/consumers in a greenthread.""" """Consumer from all queues/consumers in a greenthread."""
@excutils.forever_retry_uncaught_exceptions
def _consumer_thread(): def _consumer_thread():
try: try:
self.consume() self.consume()
@ -615,7 +651,7 @@ class Connection(object):
return consumer return consumer
def join_consumer_pool(self, callback, pool_name, topic, def join_consumer_pool(self, callback, pool_name, topic,
exchange_name=None): exchange_name=None, ack_on_error=True):
"""Register as a member of a group of consumers for a given topic from """Register as a member of a group of consumers for a given topic from
the specified exchange. the specified exchange.