limit maximum timeout in the poll loop

We should properly limit the maximum timeout with a 'min' to avoid
long delays before message processing. Such delays may happen if
the connection to a RabbitMQ server is re-established at the same
time when the message arrives (see attached bug for more info).

Moreover, this change is in line with the original intent to
actually have an upper limit on maximum possible timeout (see
comments in code and in the original review).

Closes-Bug: #1935864
Change-Id: Iebc8a96e868d938a5d250bf9d66d20746c63d3d5
This commit is contained in:
Nikita Kalyanov 2021-07-13 00:03:31 +03:00
parent 9ab3f4f30d
commit bdcf915e78
2 changed files with 29 additions and 2 deletions

View File

@ -351,7 +351,7 @@ class AMQPListener(base.PollStyleListener):
self.conn.consume(timeout=min(self._current_timeout, left))
except rpc_common.Timeout:
LOG.debug("AMQPListener connection timeout")
self._current_timeout = max(self._current_timeout * 2,
self._current_timeout = min(self._current_timeout * 2,
ACK_REQUEUE_EVERY_SECONDS_MAX)
else:
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
@ -490,7 +490,7 @@ class ReplyWaiter(object):
# ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds
self.conn.consume(timeout=current_timeout)
except rpc_common.Timeout:
current_timeout = max(current_timeout * 2,
current_timeout = min(current_timeout * 2,
ACK_REQUEUE_EVERY_SECONDS_MAX)
except Exception:
LOG.exception("Failed to process incoming message, retrying..")

View File

@ -1080,3 +1080,30 @@ class ConnectionLockTestCase(test_utils.BaseTestCase):
t2 = self._thread(lock, 1)
self.assertAlmostEqual(1, t1(), places=0)
self.assertAlmostEqual(2, t2(), places=0)
class TestPollTimeoutLimit(test_utils.BaseTestCase):
def test_poll_timeout_limit(self):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
self.addCleanup(transport.cleanup)
driver = transport._driver
target = oslo_messaging.Target(topic='testtopic')
listener = driver.listen(target, None, None)._poll_style_listener
thread = threading.Thread(target=listener.poll)
thread.daemon = True
thread.start()
time.sleep(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX * 2)
try:
# timeout should not grow past the maximum
self.assertEqual(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX,
listener._current_timeout)
finally:
# gracefully stop waiting
driver.send(target,
{},
{'tx_id': 'test'})
thread.join()