diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 4c874ee0..ecf06de8 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -12,7 +12,6 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import contextlib import errno import functools @@ -505,10 +504,17 @@ class Connection(object): self._initial_pid = os.getpid() self._consumers = {} + self._producer = None self._new_tags = set() self._active_tags = {} self._tags = itertools.count(1) + # Set of exchanges and queues declared on the channel to avoid + # unnecessary redeclaration. This set is resetted each time + # the connection is resetted in Connection._set_current_channel + self._declared_exchanges = set() + self._declared_queues = set() + self._consume_loop_stopped = False self.channel = None self.purpose = purpose @@ -791,14 +797,16 @@ class Connection(object): return if self.channel is not None: - self.PUBLISHER_DECLARED_QUEUES.pop(self.channel, None) + self._declared_queues.clear() + self._declared_exchanges.clear() self.connection.maybe_close_channel(self.channel) self.channel = new_channel - if (new_channel is not None and - self.purpose == rpc_common.PURPOSE_LISTEN): - self._set_qos(new_channel) + if new_channel is not None: + if self.purpose == rpc_common.PURPOSE_LISTEN: + self._set_qos(new_channel) + self._producer = kombu.messaging.Producer(new_channel) def _set_qos(self, channel): """Set QoS prefetch count on the channel""" @@ -1117,10 +1125,10 @@ class Connection(object): def _publish(self, exchange, msg, routing_key=None, timeout=None): """Publish a message.""" - producer = kombu.messaging.Producer(exchange=exchange, - channel=self.channel, - auto_declare=not exchange.passive, - routing_key=routing_key) + + if not (exchange.passive or exchange.name in self._declared_exchanges): + exchange(self.channel).declare() + self._declared_exchanges.add(exchange.name) log_info = {'msg': msg, 'who': exchange or 'default', @@ -1131,13 +1139,11 @@ class Connection(object): # NOTE(sileht): no need to wait more, caller expects # a answer before timeout is reached with self._transport_socket_timeout(timeout): - producer.publish(msg, expiration=self._get_expiration(timeout), - compression=self.kombu_compression) - - # List of notification queue declared on the channel to avoid - # unnecessary redeclaration. This list is resetted each time - # the connection is resetted in Connection._set_current_channel - PUBLISHER_DECLARED_QUEUES = collections.defaultdict(set) + self._producer.publish(msg, + exchange=exchange, + routing_key=routing_key, + expiration=self._get_expiration(timeout), + compression=self.kombu_compression) def _publish_and_creates_default_queue(self, exchange, msg, routing_key=None, timeout=None): @@ -1157,8 +1163,7 @@ class Connection(object): # NOTE(sileht): We only do it once per reconnection # the Connection._set_current_channel() is responsible to clear # this cache - if (queue_indentifier not in - self.PUBLISHER_DECLARED_QUEUES[self.channel]): + if queue_indentifier not in self._declared_queues: queue = kombu.entity.Queue( channel=self.channel, exchange=exchange, @@ -1172,7 +1177,7 @@ class Connection(object): 'Connection._publish_and_creates_default_queue: ' 'declare queue %(key)s on %(exchange)s exchange', log_info) queue.declare() - self.PUBLISHER_DECLARED_QUEUES[self.channel].add(queue_indentifier) + self._declared_queues.add(queue_indentifier) self._publish(exchange, msg, routing_key=routing_key, timeout=timeout) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 62597d65..e4f86917 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -212,10 +212,11 @@ class TestRabbitPublisher(test_utils.BaseTestCase): def test_send_with_timeout(self, fake_publish): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') + exchange_mock = mock.Mock() with transport._driver._get_connection( driver_common.PURPOSE_SEND) as pool_conn: conn = pool_conn.connection - conn._publish(mock.Mock(), 'msg', routing_key='routing_key', + conn._publish(exchange_mock, 'msg', routing_key='routing_key', timeout=1) # NOTE(gcb) kombu accept TTL as seconds instead of millisecond since @@ -226,23 +227,30 @@ class TestRabbitPublisher(test_utils.BaseTestCase): if versionutils.is_compatible('3.0.25', kombu_version): fake_publish.assert_called_with( 'msg', expiration=1, - compression=self.conf.oslo_messaging_rabbit.kombu_compression) + exchange=exchange_mock, + compression=self.conf.oslo_messaging_rabbit.kombu_compression, + routing_key='routing_key') else: fake_publish.assert_called_with( 'msg', expiration=1000, - compression=self.conf.oslo_messaging_rabbit.kombu_compression) + exchange=exchange_mock, + compression=self.conf.oslo_messaging_rabbit.kombu_compression, + routing_key='routing_key') @mock.patch('kombu.messaging.Producer.publish') def test_send_no_timeout(self, fake_publish): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') + exchange_mock = mock.Mock() with transport._driver._get_connection( driver_common.PURPOSE_SEND) as pool_conn: conn = pool_conn.connection - conn._publish(mock.Mock(), 'msg', routing_key='routing_key') + conn._publish(exchange_mock, 'msg', routing_key='routing_key') fake_publish.assert_called_with( 'msg', expiration=None, - compression=self.conf.oslo_messaging_rabbit.kombu_compression) + compression=self.conf.oslo_messaging_rabbit.kombu_compression, + exchange=exchange_mock, + routing_key='routing_key') def test_declared_queue_publisher(self): transport = oslo_messaging.get_transport(self.conf, @@ -277,14 +285,17 @@ class TestRabbitPublisher(test_utils.BaseTestCase): # Ensure it creates it try_send(e_passive) - with mock.patch('kombu.messaging.Producer', side_effect=exc): - # Should reset the cache and ensures the exchange does - # not exists - self.assertRaises(exc, try_send, e_passive) - # Recreate it - try_send(e_active) - # Ensure it have been recreated - try_send(e_passive) + with mock.patch('kombu.messaging.Producer.publish', + side_effect=exc): + # Ensure the exchange is already in cache + self.assertIn('foobar', conn._declared_exchanges) + # Reset connection + self.assertRaises(exc, try_send, e_passive) + # Ensure the cache is empty + self.assertEqual(0, len(conn._declared_exchanges)) + + try_send(e_active) + self.assertIn('foobar', conn._declared_exchanges) class TestRabbitConsume(test_utils.BaseTestCase):