From 9ff4d9982354c06495dbabbc8fa8455de02a49cc Mon Sep 17 00:00:00 2001 From: Ihar Hrachyshka Date: Fri, 3 Jan 2014 11:15:44 +0100 Subject: [PATCH] Fix duplicate topic messages for Qpid topology=2 Manually applied ef406a21782134aeefb944f74b3f1a47d6169318 from oslo-incubator to get the fix required for bug 1257293. Copying the original commit message from oslo-incubator below. """ When multiple RPC servers (consumers) are subscribed to the same RPC topic, a single RPC request to that topic should be received by only one of the consumers. A bug in the QPID driver caused every consumer to receive a copy of the RPC request. This bug affects only Topology version 2. This patch will cause a single queue to be created for each topic, and shared among all consumers of that topic. This results in each RPC request being received by only one consumer, in turn across all the competing consumers. """ Change-Id: I76bfa5b48bad4a70fbf06b74f4cc8234af6610c2 Closes-bug: #1257293. --- oslo/messaging/_drivers/impl_qpid.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index b3717b9a4..c8366dd93 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -131,14 +131,13 @@ class ConsumerBase(object): }, }, } - if link_name: - addr_opts["link"]["name"] = link_name addr_opts["node"]["x-declare"].update(node_opts) elif conf.qpid_topology_version == 2: addr_opts = { "link": { "x-declare": { "auto-delete": True, + "exclusive": False, }, }, } @@ -146,6 +145,8 @@ class ConsumerBase(object): raise_invalid_topology_version(conf) addr_opts["link"]["x-declare"].update(link_opts) + if link_name: + addr_opts["link"]["name"] = link_name self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) @@ -219,14 +220,16 @@ class DirectConsumer(ConsumerBase): if conf.qpid_topology_version == 1: node_name = "%s/%s" % (msg_id, msg_id) node_opts = {"type": "direct"} + link_name = msg_id elif conf.qpid_topology_version == 2: node_name = "amq.direct/%s" % msg_id node_opts = {} + link_name = None else: raise_invalid_topology_version(conf) super(DirectConsumer, self).__init__(conf, session, callback, - node_name, node_opts, msg_id, + node_name, node_opts, link_name, link_opts)