From 042fef573e7baa1fec0a0772af00b1be2521f73e Mon Sep 17 00:00:00 2001
From: Kirill Bespalov
Date: Fri, 29 Apr 2016 05:55:55 +0300
Subject: [PATCH] Use a single producer to avoid an exchange redeclaration

The current implementation of the Connection class creates its own
channel-bound producer on each call of the _publish method, and if
auto_declare is set to True, an exchange declaration request will be
sent to the broker despite the fact that the exchange might have been
created by previous calls. This leads to a significant ~11-30%
overhead. I got the following results with the notification client &
server in the simulator:

- http://s32.postimg.org/jzgkbs1qt/notify_client.png

I suggest retaining the sets of already declared exchanges and queues
per connection instance and using a single channel-bound producer.
There is no need to create a producer on each call; it is just a
wrapper over a channel.

Change-Id: I89564dd1aa079476c68c50979b6b79f8889db5d9
---
 oslo_messaging/_drivers/impl_rabbit.py | 43 +++++++++++--------
 .../tests/drivers/test_impl_rabbit.py | 37 ++++++++++------
 2 files changed, 48 insertions(+), 32 deletions(-)

diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py
index 259e8d616..2db60d826 100644
--- a/oslo_messaging/_drivers/impl_rabbit.py
+++ b/oslo_messaging/_drivers/impl_rabbit.py
@@ -12,7 +12,6 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
-import collections
 import contextlib
 import errno
 import functools
@@ -504,10 +503,17 @@ class Connection(object):
         self._initial_pid = os.getpid()
 
         self._consumers = {}
+        self._producer = None
         self._new_tags = set()
         self._active_tags = {}
         self._tags = itertools.count(1)
 
+        # Set of exchanges and queues declared on the channel to avoid
+        # unnecessary redeclaration. 
This set is reset each time
+        # the connection is reset in Connection._set_current_channel
+        self._declared_exchanges = set()
+        self._declared_queues = set()
+
         self._consume_loop_stopped = False
         self.channel = None
         self.purpose = purpose
@@ -790,14 +796,16 @@ class Connection(object):
             return
 
         if self.channel is not None:
-            self.PUBLISHER_DECLARED_QUEUES.pop(self.channel, None)
+            self._declared_queues.clear()
+            self._declared_exchanges.clear()
             self.connection.maybe_close_channel(self.channel)
 
         self.channel = new_channel
 
-        if (new_channel is not None and
-                self.purpose == rpc_common.PURPOSE_LISTEN):
-            self._set_qos(new_channel)
+        if new_channel is not None:
+            if self.purpose == rpc_common.PURPOSE_LISTEN:
+                self._set_qos(new_channel)
+            self._producer = kombu.messaging.Producer(new_channel)
 
     def _set_qos(self, channel):
         """Set QoS prefetch count on the channel"""
@@ -1111,10 +1119,10 @@ class Connection(object):
 
     def _publish(self, exchange, msg, routing_key=None, timeout=None):
         """Publish a message."""
-        producer = kombu.messaging.Producer(exchange=exchange,
-                                            channel=self.channel,
-                                            auto_declare=not exchange.passive,
-                                            routing_key=routing_key)
+
+        if not (exchange.passive or exchange.name in self._declared_exchanges):
+            exchange(self.channel).declare()
+            self._declared_exchanges.add(exchange.name)
 
         log_info = {'msg': msg,
                     'who': exchange or 'default',
@@ -1125,13 +1133,11 @@ class Connection(object):
         # NOTE(sileht): no need to wait more, caller expects
         # a answer before timeout is reached
         with self._transport_socket_timeout(timeout):
-            producer.publish(msg, expiration=self._get_expiration(timeout),
-                             compression=self.kombu_compression)
-
-    # List of notification queue declared on the channel to avoid
-    # unnecessary redeclaration. 
This list is resetted each time - # the connection is resetted in Connection._set_current_channel - PUBLISHER_DECLARED_QUEUES = collections.defaultdict(set) + self._producer.publish(msg, + exchange=exchange, + routing_key=routing_key, + expiration=self._get_expiration(timeout), + compression=self.kombu_compression) def _publish_and_creates_default_queue(self, exchange, msg, routing_key=None, timeout=None): @@ -1151,8 +1157,7 @@ class Connection(object): # NOTE(sileht): We only do it once per reconnection # the Connection._set_current_channel() is responsible to clear # this cache - if (queue_indentifier not in - self.PUBLISHER_DECLARED_QUEUES[self.channel]): + if queue_indentifier not in self._declared_queues: queue = kombu.entity.Queue( channel=self.channel, exchange=exchange, @@ -1166,7 +1171,7 @@ class Connection(object): 'Connection._publish_and_creates_default_queue: ' 'declare queue %(key)s on %(exchange)s exchange', log_info) queue.declare() - self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier) + self._declared_queues.add(queue_indentifier) self._publish(exchange, msg, routing_key=routing_key, timeout=timeout) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 62597d65a..e4f86917f 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -212,10 +212,11 @@ class TestRabbitPublisher(test_utils.BaseTestCase): def test_send_with_timeout(self, fake_publish): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') + exchange_mock = mock.Mock() with transport._driver._get_connection( driver_common.PURPOSE_SEND) as pool_conn: conn = pool_conn.connection - conn._publish(mock.Mock(), 'msg', routing_key='routing_key', + conn._publish(exchange_mock, 'msg', routing_key='routing_key', timeout=1) # NOTE(gcb) kombu accept TTL as seconds instead of millisecond since @@ -226,23 +227,30 @@ class 
TestRabbitPublisher(test_utils.BaseTestCase): if versionutils.is_compatible('3.0.25', kombu_version): fake_publish.assert_called_with( 'msg', expiration=1, - compression=self.conf.oslo_messaging_rabbit.kombu_compression) + exchange=exchange_mock, + compression=self.conf.oslo_messaging_rabbit.kombu_compression, + routing_key='routing_key') else: fake_publish.assert_called_with( 'msg', expiration=1000, - compression=self.conf.oslo_messaging_rabbit.kombu_compression) + exchange=exchange_mock, + compression=self.conf.oslo_messaging_rabbit.kombu_compression, + routing_key='routing_key') @mock.patch('kombu.messaging.Producer.publish') def test_send_no_timeout(self, fake_publish): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') + exchange_mock = mock.Mock() with transport._driver._get_connection( driver_common.PURPOSE_SEND) as pool_conn: conn = pool_conn.connection - conn._publish(mock.Mock(), 'msg', routing_key='routing_key') + conn._publish(exchange_mock, 'msg', routing_key='routing_key') fake_publish.assert_called_with( 'msg', expiration=None, - compression=self.conf.oslo_messaging_rabbit.kombu_compression) + compression=self.conf.oslo_messaging_rabbit.kombu_compression, + exchange=exchange_mock, + routing_key='routing_key') def test_declared_queue_publisher(self): transport = oslo_messaging.get_transport(self.conf, @@ -277,14 +285,17 @@ class TestRabbitPublisher(test_utils.BaseTestCase): # Ensure it creates it try_send(e_passive) - with mock.patch('kombu.messaging.Producer', side_effect=exc): - # Should reset the cache and ensures the exchange does - # not exists - self.assertRaises(exc, try_send, e_passive) - # Recreate it - try_send(e_active) - # Ensure it have been recreated - try_send(e_passive) + with mock.patch('kombu.messaging.Producer.publish', + side_effect=exc): + # Ensure the exchange is already in cache + self.assertIn('foobar', conn._declared_exchanges) + # Reset connection + self.assertRaises(exc, try_send, e_passive) + # Ensure 
the cache is empty + self.assertEqual(0, len(conn._declared_exchanges)) + + try_send(e_active) + self.assertIn('foobar', conn._declared_exchanges) class TestRabbitConsume(test_utils.BaseTestCase):