From 2464ca0b200ef01403aa3ceab52fae2fae521183 Mon Sep 17 00:00:00 2001 From: Zhi Kun Liu Date: Thu, 7 Aug 2014 17:34:17 +0800 Subject: [PATCH] Add parameter to customize Qpid receiver capacity In Qpid rpc backend, receiver capacity is 1 and is hardcoding. User can not customize it. This patch adds a parameter to make user can specify receiver capacity in conf file. - Reference about Receiver Capacity(Prefetch) from Qpid Doc By default, receiver requests the next message from the server in response to each fetch call, resulting in messages being sent to the receiver one at a time. As in the case of sending, it is often desirable to avoid this roundtrip for each message. This can be achieved by allowing the receiver to prefetch messages in anticipation of fetch calls being made. The receiver needs to be able to store these prefetched messages, the number it can hold is controlled by the receivers capacity. DocImpact Change-Id: I966b512aba6bdd8e9c5cf65cae01bfd21f04f330 Closes-Bug: #1353914 --- oslo/messaging/_drivers/impl_qpid.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index f79fae6a7..623ede988 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -69,6 +69,9 @@ qpid_opts = [ cfg.BoolOpt('qpid_tcp_nodelay', default=True, help='Whether to disable the Nagle algorithm.'), + cfg.IntOpt('qpid_receiver_capacity', + default=1, + help='The number of prefetched messages held by receiver.'), # NOTE(russellb) If any additional versions are added (beyond 1 and 2), # this file could probably use some additional refactoring so that the # differences between each version are split into different classes. @@ -125,6 +128,7 @@ class ConsumerBase(object): """ self.callback = callback self.receiver = None + self.rcv_capacity = conf.qpid_receiver_capacity self.session = None if conf.qpid_topology_version == 1: @@ -178,7 +182,7 @@ class ConsumerBase(object): def _declare_receiver(self, session): self.session = session self.receiver = session.receiver(self.address) - self.receiver.capacity = 1 + self.receiver.capacity = self.rcv_capacity def _unpack_json_msg(self, msg): """Load the JSON data in msg if msg.content_type indicates that it