rabbit: improvements to QoS
- Don't call _set_qos from both ensure_connection and on_reconnection, instead consolidate and only call from _set_current_channel - Only set QoS on PURPOSE_LISTEN connections. It's a waste of a roundtrip to call it on PURPOSE_SEND connections and slows things down unnecessarily - Guard against rabbit_qos_prefetch_count being set to a negative value - Tests, because we love them Change-Id: I365414c541d895dcd49ebcd32c3a456a92c392d6
This commit is contained in:
parent
eb4df4ecd0
commit
5954d2ad64
@ -489,6 +489,7 @@ class Connection(object):
|
|||||||
self._new_consumers = []
|
self._new_consumers = []
|
||||||
self._consume_loop_stopped = False
|
self._consume_loop_stopped = False
|
||||||
self.channel = None
|
self.channel = None
|
||||||
|
self.purpose = purpose
|
||||||
|
|
||||||
# NOTE(sileht): if purpose is PURPOSE_LISTEN
|
# NOTE(sileht): if purpose is PURPOSE_LISTEN
|
||||||
# we don't need the lock because we don't
|
# we don't need the lock because we don't
|
||||||
@ -626,7 +627,6 @@ class Connection(object):
|
|||||||
# the kombu underlying connection works
|
# the kombu underlying connection works
|
||||||
self._set_current_channel(None)
|
self._set_current_channel(None)
|
||||||
self.ensure(method=lambda: self.connection.connection)
|
self.ensure(method=lambda: self.connection.connection)
|
||||||
self._set_qos(self.channel)
|
|
||||||
|
|
||||||
def ensure(self, method, retry=None,
|
def ensure(self, method, retry=None,
|
||||||
recoverable_error_callback=None, error_callback=None,
|
recoverable_error_callback=None, error_callback=None,
|
||||||
@ -695,7 +695,6 @@ class Connection(object):
|
|||||||
a new channel, we use it the reconfigure our consumers.
|
a new channel, we use it the reconfigure our consumers.
|
||||||
"""
|
"""
|
||||||
self._set_current_channel(new_channel)
|
self._set_current_channel(new_channel)
|
||||||
self._set_qos(new_channel)
|
|
||||||
for consumer in self._consumers:
|
for consumer in self._consumers:
|
||||||
consumer.declare(self)
|
consumer.declare(self)
|
||||||
|
|
||||||
@ -759,14 +758,22 @@ class Connection(object):
|
|||||||
|
|
||||||
NOTE(sileht): Must be called within the connection lock
|
NOTE(sileht): Must be called within the connection lock
|
||||||
"""
|
"""
|
||||||
if self.channel is not None and new_channel != self.channel:
|
if new_channel == self.channel:
|
||||||
|
return
|
||||||
|
|
||||||
|
if self.channel is not None:
|
||||||
self.PUBLISHER_DECLARED_QUEUES.pop(self.channel, None)
|
self.PUBLISHER_DECLARED_QUEUES.pop(self.channel, None)
|
||||||
self.connection.maybe_close_channel(self.channel)
|
self.connection.maybe_close_channel(self.channel)
|
||||||
|
|
||||||
self.channel = new_channel
|
self.channel = new_channel
|
||||||
|
|
||||||
|
if (new_channel is not None and
|
||||||
|
self.purpose == rpc_common.PURPOSE_LISTEN):
|
||||||
|
self._set_qos(new_channel)
|
||||||
|
|
||||||
def _set_qos(self, channel):
|
def _set_qos(self, channel):
|
||||||
"""Set QoS prefetch count on the channel"""
|
"""Set QoS prefetch count on the channel"""
|
||||||
if self.rabbit_qos_prefetch_count != 0:
|
if self.rabbit_qos_prefetch_count > 0:
|
||||||
channel.basic_qos(0,
|
channel.basic_qos(0,
|
||||||
self.rabbit_qos_prefetch_count,
|
self.rabbit_qos_prefetch_count,
|
||||||
False)
|
False)
|
||||||
|
@ -109,6 +109,31 @@ class TestHeartbeat(test_utils.BaseTestCase):
|
|||||||
'trying to reconnect: %s')
|
'trying to reconnect: %s')
|
||||||
|
|
||||||
|
|
||||||
|
class TestRabbitQos(test_utils.BaseTestCase):
|
||||||
|
|
||||||
|
def connection_with(self, prefetch, purpose):
|
||||||
|
self.config(rabbit_qos_prefetch_count=prefetch,
|
||||||
|
group="oslo_messaging_rabbit")
|
||||||
|
transport = oslo_messaging.get_transport(self.conf,
|
||||||
|
'kombu+memory:////')
|
||||||
|
transport._driver._get_connection(purpose)
|
||||||
|
|
||||||
|
@mock.patch('kombu.transport.memory.Channel.basic_qos')
|
||||||
|
def test_qos_sent_on_listen_connection(self, fake_basic_qos):
|
||||||
|
self.connection_with(prefetch=1, purpose=driver_common.PURPOSE_LISTEN)
|
||||||
|
fake_basic_qos.assert_called_once_with(0, 1, False)
|
||||||
|
|
||||||
|
@mock.patch('kombu.transport.memory.Channel.basic_qos')
|
||||||
|
def test_qos_not_sent_when_cfg_zero(self, fake_basic_qos):
|
||||||
|
self.connection_with(prefetch=0, purpose=driver_common.PURPOSE_LISTEN)
|
||||||
|
fake_basic_qos.assert_not_called()
|
||||||
|
|
||||||
|
@mock.patch('kombu.transport.memory.Channel.basic_qos')
|
||||||
|
def test_qos_not_sent_on_send_connection(self, fake_basic_qos):
|
||||||
|
self.connection_with(prefetch=1, purpose=driver_common.PURPOSE_SEND)
|
||||||
|
fake_basic_qos.assert_not_called()
|
||||||
|
|
||||||
|
|
||||||
class TestRabbitDriverLoad(test_utils.BaseTestCase):
|
class TestRabbitDriverLoad(test_utils.BaseTestCase):
|
||||||
|
|
||||||
scenarios = [
|
scenarios = [
|
||||||
|
Loading…
Reference in New Issue
Block a user