Sync rpc fix from oslo-incubator

Sync the following fix from oslo-incubator:

76972e2 Support a new qpid topology

This includes one other commit, so that the above fix could be brought
over cleanly:

5ff534d Add config for amqp durable/auto_delete queues

Closes-bug: #1178375
Change-Id: I99d6a1771bc3223f86db0132525bf22c271fe862
This commit is contained in:
Russell Bryant 2013-09-03 02:51:14 -04:00
parent 957533f685
commit 34a208d1f3
3 changed files with 179 additions and 65 deletions

View File

@ -34,6 +34,7 @@ from eventlet import greenpool
from eventlet import pools from eventlet import pools
from eventlet import queue from eventlet import queue
from eventlet import semaphore from eventlet import semaphore
from oslo.config import cfg
from neutron.openstack.common import excutils from neutron.openstack.common import excutils
from neutron.openstack.common.gettextutils import _ from neutron.openstack.common.gettextutils import _
@ -42,6 +43,19 @@ from neutron.openstack.common import log as logging
from neutron.openstack.common.rpc import common as rpc_common from neutron.openstack.common.rpc import common as rpc_common
amqp_opts = [
cfg.BoolOpt('amqp_durable_queues',
default=False,
deprecated_name='rabbit_durable_queues',
deprecated_group='DEFAULT',
help='Use durable queues in amqp.'),
cfg.BoolOpt('amqp_auto_delete',
default=False,
help='Auto-delete queues in amqp.'),
]
cfg.CONF.register_opts(amqp_opts)
UNIQUE_ID = '_unique_id' UNIQUE_ID = '_unique_id'
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)

View File

@ -82,9 +82,6 @@ kombu_opts = [
default=0, default=0,
help='maximum retries with trying to connect to RabbitMQ ' help='maximum retries with trying to connect to RabbitMQ '
'(the default of 0 implies an infinite retry count)'), '(the default of 0 implies an infinite retry count)'),
cfg.BoolOpt('rabbit_durable_queues',
default=False,
help='use durable queues in RabbitMQ'),
cfg.BoolOpt('rabbit_ha_queues', cfg.BoolOpt('rabbit_ha_queues',
default=False, default=False,
help='use H/A queues in RabbitMQ (x-ha-policy: all).' help='use H/A queues in RabbitMQ (x-ha-policy: all).'
@ -233,9 +230,9 @@ class TopicConsumer(ConsumerBase):
Other kombu options may be passed as keyword arguments Other kombu options may be passed as keyword arguments
""" """
# Default options # Default options
options = {'durable': conf.rabbit_durable_queues, options = {'durable': conf.amqp_durable_queues,
'queue_arguments': _get_queue_arguments(conf), 'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False, 'auto_delete': conf.amqp_auto_delete,
'exclusive': False} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
@ -339,8 +336,8 @@ class TopicPublisher(Publisher):
Kombu options may be passed as keyword args to override defaults Kombu options may be passed as keyword args to override defaults
""" """
options = {'durable': conf.rabbit_durable_queues, options = {'durable': conf.amqp_durable_queues,
'auto_delete': False, 'auto_delete': conf.amqp_auto_delete,
'exclusive': False} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange_name = rpc_amqp.get_control_exchange(conf) exchange_name = rpc_amqp.get_control_exchange(conf)
@ -370,7 +367,7 @@ class NotifyPublisher(TopicPublisher):
"""Publisher class for 'notify'.""" """Publisher class for 'notify'."""
def __init__(self, conf, channel, topic, **kwargs): def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) self.durable = kwargs.pop('durable', conf.amqp_durable_queues)
self.queue_arguments = _get_queue_arguments(conf) self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)

View File

@ -66,6 +66,17 @@ qpid_opts = [
cfg.BoolOpt('qpid_tcp_nodelay', cfg.BoolOpt('qpid_tcp_nodelay',
default=True, default=True,
help='Disable Nagle algorithm'), help='Disable Nagle algorithm'),
# NOTE(russellb) If any additional versions are added (beyond 1 and 2),
# this file could probably use some additional refactoring so that the
# differences between each version are split into different classes.
cfg.IntOpt('qpid_topology_version',
default=1,
help="The qpid topology version to use. Version 1 is what "
"was originally used by impl_qpid. Version 2 includes "
"some backwards-incompatible changes that allow broker "
"federation to work. Users should update to version 2 "
"when they are able to take everything down, as it "
"requires a clean break."),
] ]
cfg.CONF.register_opts(qpid_opts) cfg.CONF.register_opts(qpid_opts)
@ -73,10 +84,17 @@ cfg.CONF.register_opts(qpid_opts)
JSON_CONTENT_TYPE = 'application/json; charset=utf8' JSON_CONTENT_TYPE = 'application/json; charset=utf8'
def raise_invalid_topology_version(conf):
msg = (_("Invalid value for qpid_topology_version: %d") %
conf.qpid_topology_version)
LOG.error(msg)
raise Exception(msg)
class ConsumerBase(object): class ConsumerBase(object):
"""Consumer base class.""" """Consumer base class."""
def __init__(self, session, callback, node_name, node_opts, def __init__(self, conf, session, callback, node_name, node_opts,
link_name, link_opts): link_name, link_opts):
"""Declare a queue on an amqp session. """Declare a queue on an amqp session.
@ -94,26 +112,38 @@ class ConsumerBase(object):
self.receiver = None self.receiver = None
self.session = None self.session = None
addr_opts = { if conf.qpid_topology_version == 1:
"create": "always", addr_opts = {
"node": { "create": "always",
"type": "topic", "node": {
"x-declare": { "type": "topic",
"x-declare": {
"durable": True,
"auto-delete": True,
},
},
"link": {
"name": link_name,
"durable": True, "durable": True,
"auto-delete": True, "x-declare": {
"durable": False,
"auto-delete": True,
"exclusive": False,
},
}, },
}, }
"link": { addr_opts["node"]["x-declare"].update(node_opts)
"name": link_name, elif conf.qpid_topology_version == 2:
"durable": True, addr_opts = {
"x-declare": { "link": {
"durable": False, "x-declare": {
"auto-delete": True, "auto-delete": True,
"exclusive": False, },
}, },
}, }
} else:
addr_opts["node"]["x-declare"].update(node_opts) raise_invalid_topology_version()
addr_opts["link"]["x-declare"].update(link_opts) addr_opts["link"]["x-declare"].update(link_opts)
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
@ -169,11 +199,24 @@ class DirectConsumer(ConsumerBase):
'callback' is the callback to call when messages are received 'callback' is the callback to call when messages are received
""" """
super(DirectConsumer, self).__init__(session, callback, link_opts = {
"%s/%s" % (msg_id, msg_id), "auto-delete": conf.amqp_auto_delete,
{"type": "direct"}, "exclusive": True,
msg_id, "durable": conf.amqp_durable_queues,
{"exclusive": True}) }
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (msg_id, msg_id)
node_opts = {"type": "direct"}
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
else:
raise_invalid_topology_version()
super(DirectConsumer, self).__init__(conf, session, callback,
node_name, node_opts, msg_id,
link_opts)
class TopicConsumer(ConsumerBase): class TopicConsumer(ConsumerBase):
@ -191,9 +234,20 @@ class TopicConsumer(ConsumerBase):
""" """
exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf) exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
super(TopicConsumer, self).__init__(session, callback, link_opts = {
"%s/%s" % (exchange_name, topic), "auto-delete": conf.amqp_auto_delete,
{}, name or topic, {}) "durable": conf.amqp_durable_queues,
}
if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version()
super(TopicConsumer, self).__init__(conf, session, callback, node_name,
{}, name or topic, link_opts)
class FanoutConsumer(ConsumerBase): class FanoutConsumer(ConsumerBase):
@ -207,40 +261,55 @@ class FanoutConsumer(ConsumerBase):
'callback' is the callback to call when messages are received 'callback' is the callback to call when messages are received
""" """
super(FanoutConsumer, self).__init__( link_opts = {"exclusive": True}
session, callback,
"%s_fanout" % topic, if conf.qpid_topology_version == 1:
{"durable": False, "type": "fanout"}, node_name = "%s_fanout" % topic
"%s_fanout_%s" % (topic, uuid.uuid4().hex), node_opts = {"durable": False, "type": "fanout"}
{"exclusive": True}) link_name = "%s_fanout_%s" % (topic, uuid.uuid4().hex)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
link_name = ""
else:
raise_invalid_topology_version()
super(FanoutConsumer, self).__init__(conf, session, callback,
node_name, node_opts, link_name,
link_opts)
class Publisher(object): class Publisher(object):
"""Base Publisher class.""" """Base Publisher class."""
def __init__(self, session, node_name, node_opts=None): def __init__(self, conf, session, node_name, node_opts=None):
"""Init the Publisher class with the exchange_name, routing_key, """Init the Publisher class with the exchange_name, routing_key,
and other options and other options
""" """
self.sender = None self.sender = None
self.session = session self.session = session
addr_opts = { if conf.qpid_topology_version == 1:
"create": "always", addr_opts = {
"node": { "create": "always",
"type": "topic", "node": {
"x-declare": { "type": "topic",
"durable": False, "x-declare": {
# auto-delete isn't implemented for exchanges in qpid, "durable": False,
# but put in here anyway # auto-delete isn't implemented for exchanges in qpid,
"auto-delete": True, # but put in here anyway
"auto-delete": True,
},
}, },
}, }
} if node_opts:
if node_opts: addr_opts["node"]["x-declare"].update(node_opts)
addr_opts["node"]["x-declare"].update(node_opts)
self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts))
elif conf.qpid_topology_version == 2:
self.address = node_name
else:
raise_invalid_topology_version()
self.reconnect(session) self.reconnect(session)
@ -284,8 +353,18 @@ class DirectPublisher(Publisher):
"""Publisher class for 'direct'.""" """Publisher class for 'direct'."""
def __init__(self, conf, session, msg_id): def __init__(self, conf, session, msg_id):
"""Init a 'direct' publisher.""" """Init a 'direct' publisher."""
super(DirectPublisher, self).__init__(session, msg_id,
{"type": "direct"}) if conf.qpid_topology_version == 1:
node_name = msg_id
node_opts = {"type": "direct"}
elif conf.qpid_topology_version == 2:
node_name = "amq.direct/%s" % msg_id
node_opts = {}
else:
raise_invalid_topology_version()
super(DirectPublisher, self).__init__(conf, session, node_name,
node_opts)
class TopicPublisher(Publisher): class TopicPublisher(Publisher):
@ -294,8 +373,15 @@ class TopicPublisher(Publisher):
"""init a 'topic' publisher. """init a 'topic' publisher.
""" """
exchange_name = rpc_amqp.get_control_exchange(conf) exchange_name = rpc_amqp.get_control_exchange(conf)
super(TopicPublisher, self).__init__(session,
"%s/%s" % (exchange_name, topic)) if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version()
super(TopicPublisher, self).__init__(conf, session, node_name)
class FanoutPublisher(Publisher): class FanoutPublisher(Publisher):
@ -303,9 +389,18 @@ class FanoutPublisher(Publisher):
def __init__(self, conf, session, topic): def __init__(self, conf, session, topic):
"""init a 'fanout' publisher. """init a 'fanout' publisher.
""" """
super(FanoutPublisher, self).__init__(
session, if conf.qpid_topology_version == 1:
"%s_fanout" % topic, {"type": "fanout"}) node_name = "%s_fanout" % topic
node_opts = {"type": "fanout"}
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/fanout/%s" % topic
node_opts = {}
else:
raise_invalid_topology_version()
super(FanoutPublisher, self).__init__(conf, session, node_name,
node_opts)
class NotifyPublisher(Publisher): class NotifyPublisher(Publisher):
@ -314,9 +409,17 @@ class NotifyPublisher(Publisher):
"""init a 'topic' publisher. """init a 'topic' publisher.
""" """
exchange_name = rpc_amqp.get_control_exchange(conf) exchange_name = rpc_amqp.get_control_exchange(conf)
super(NotifyPublisher, self).__init__(session, node_opts = {"durable": True}
"%s/%s" % (exchange_name, topic),
{"durable": True}) if conf.qpid_topology_version == 1:
node_name = "%s/%s" % (exchange_name, topic)
elif conf.qpid_topology_version == 2:
node_name = "amq.topic/topic/%s/%s" % (exchange_name, topic)
else:
raise_invalid_topology_version()
super(NotifyPublisher, self).__init__(conf, session, node_name,
node_opts)
class Connection(object): class Connection(object):