diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 8ac107808..a9d0ba248 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -489,6 +489,7 @@ class Connection(object): self._new_consumers = [] self._consume_loop_stopped = False self.channel = None + self.purpose = purpose # NOTE(sileht): if purpose is PURPOSE_LISTEN # we don't need the lock because we don't @@ -626,7 +627,6 @@ class Connection(object): # the kombu underlying connection works self._set_current_channel(None) self.ensure(method=lambda: self.connection.connection) - self._set_qos(self.channel) def ensure(self, method, retry=None, recoverable_error_callback=None, error_callback=None, @@ -695,7 +695,6 @@ class Connection(object): a new channel, we use it the reconfigure our consumers. """ self._set_current_channel(new_channel) - self._set_qos(new_channel) for consumer in self._consumers: consumer.declare(self) @@ -759,14 +758,22 @@ class Connection(object): NOTE(sileht): Must be called within the connection lock """ - if self.channel is not None and new_channel != self.channel: + if new_channel == self.channel: + return + + if self.channel is not None: self.PUBLISHER_DECLARED_QUEUES.pop(self.channel, None) self.connection.maybe_close_channel(self.channel) + self.channel = new_channel + if (new_channel is not None and + self.purpose == rpc_common.PURPOSE_LISTEN): + self._set_qos(new_channel) + def _set_qos(self, channel): """Set QoS prefetch count on the channel""" - if self.rabbit_qos_prefetch_count != 0: + if self.rabbit_qos_prefetch_count > 0: channel.basic_qos(0, self.rabbit_qos_prefetch_count, False) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index dbd1ecada..234273404 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -109,6 +109,31 @@ class TestHeartbeat(test_utils.BaseTestCase): 'trying to reconnect: %s') +class TestRabbitQos(test_utils.BaseTestCase): + + def connection_with(self, prefetch, purpose): + self.config(rabbit_qos_prefetch_count=prefetch, + group="oslo_messaging_rabbit") + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + transport._driver._get_connection(purpose) + + @mock.patch('kombu.transport.memory.Channel.basic_qos') + def test_qos_sent_on_listen_connection(self, fake_basic_qos): + self.connection_with(prefetch=1, purpose=driver_common.PURPOSE_LISTEN) + fake_basic_qos.assert_called_once_with(0, 1, False) + + @mock.patch('kombu.transport.memory.Channel.basic_qos') + def test_qos_not_sent_when_cfg_zero(self, fake_basic_qos): + self.connection_with(prefetch=0, purpose=driver_common.PURPOSE_LISTEN) + fake_basic_qos.assert_not_called() + + @mock.patch('kombu.transport.memory.Channel.basic_qos') + def test_qos_not_sent_on_send_connection(self, fake_basic_qos): + self.connection_with(prefetch=1, purpose=driver_common.PURPOSE_SEND) + fake_basic_qos.assert_not_called() + + class TestRabbitDriverLoad(test_utils.BaseTestCase): scenarios = [