From bdcf915e788bb368774e5462ccc15e6f5b7223d7 Mon Sep 17 00:00:00 2001 From: Nikita Kalyanov Date: Tue, 13 Jul 2021 00:03:31 +0300 Subject: [PATCH] limit maximum timeout in the poll loop We should properly limit the maximum timeout with a 'min' to avoid long delays before message processing. Such delays may happen if the connection to a RabbitMQ server is re-established at the same time when the message arrives (see attached bug for more info). Moreover, this change is in line with the original intent to actually have an upper limit on maximum possible timeout (see comments in code and in the original review). Closes-Bug: #1935864 Change-Id: Iebc8a96e868d938a5d250bf9d66d20746c63d3d5 --- oslo_messaging/_drivers/amqpdriver.py | 4 +-- .../tests/drivers/test_impl_rabbit.py | 27 +++++++++++++++++++ 2 files changed, 29 insertions(+), 2 deletions(-) diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index cdc21c5a5..589baf5e5 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -351,7 +351,7 @@ class AMQPListener(base.PollStyleListener): self.conn.consume(timeout=min(self._current_timeout, left)) except rpc_common.Timeout: LOG.debug("AMQPListener connection timeout") - self._current_timeout = max(self._current_timeout * 2, + self._current_timeout = min(self._current_timeout * 2, ACK_REQUEUE_EVERY_SECONDS_MAX) else: self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN @@ -490,7 +490,7 @@ class ReplyWaiter(object): # ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds self.conn.consume(timeout=current_timeout) except rpc_common.Timeout: - current_timeout = max(current_timeout * 2, + current_timeout = min(current_timeout * 2, ACK_REQUEUE_EVERY_SECONDS_MAX) except Exception: LOG.exception("Failed to process incoming message, retrying..") diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index e035150e4..f8882f27e 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -1080,3 +1080,30 @@ class ConnectionLockTestCase(test_utils.BaseTestCase): t2 = self._thread(lock, 1) self.assertAlmostEqual(1, t1(), places=0) self.assertAlmostEqual(2, t2(), places=0) + + +class TestPollTimeoutLimit(test_utils.BaseTestCase): + def test_poll_timeout_limit(self): + transport = oslo_messaging.get_transport(self.conf, + 'kombu+memory:////') + self.addCleanup(transport.cleanup) + driver = transport._driver + target = oslo_messaging.Target(topic='testtopic') + listener = driver.listen(target, None, None)._poll_style_listener + + thread = threading.Thread(target=listener.poll) + thread.daemon = True + thread.start() + time.sleep(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX * 2) + + try: + # timeout should not grow past the maximum + self.assertEqual(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX, + listener._current_timeout) + + finally: + # gracefully stop waiting + driver.send(target, + {}, + {'tx_id': 'test'}) + thread.join()