Merge "limit maximum timeout in the poll loop"

This commit is contained in:
Zuul 2021-09-13 16:39:10 +00:00 committed by Gerrit Code Review
commit ef0b31f112
2 changed files with 29 additions and 2 deletions
oslo_messaging

@ -351,7 +351,7 @@ class AMQPListener(base.PollStyleListener):
self.conn.consume(timeout=min(self._current_timeout, left))
except rpc_common.Timeout:
LOG.debug("AMQPListener connection timeout")
self._current_timeout = max(self._current_timeout * 2,
self._current_timeout = min(self._current_timeout * 2,
ACK_REQUEUE_EVERY_SECONDS_MAX)
else:
self._current_timeout = ACK_REQUEUE_EVERY_SECONDS_MIN
@ -490,7 +490,7 @@ class ReplyWaiter(object):
# ack every ACK_REQUEUE_EVERY_SECONDS_MAX seconds
self.conn.consume(timeout=current_timeout)
except rpc_common.Timeout:
current_timeout = max(current_timeout * 2,
current_timeout = min(current_timeout * 2,
ACK_REQUEUE_EVERY_SECONDS_MAX)
except Exception:
LOG.exception("Failed to process incoming message, retrying..")

@ -1080,3 +1080,30 @@ class ConnectionLockTestCase(test_utils.BaseTestCase):
t2 = self._thread(lock, 1)
self.assertAlmostEqual(1, t1(), places=0)
self.assertAlmostEqual(2, t2(), places=0)
class TestPollTimeoutLimit(test_utils.BaseTestCase):
def test_poll_timeout_limit(self):
transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////')
self.addCleanup(transport.cleanup)
driver = transport._driver
target = oslo_messaging.Target(topic='testtopic')
listener = driver.listen(target, None, None)._poll_style_listener
thread = threading.Thread(target=listener.poll)
thread.daemon = True
thread.start()
time.sleep(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX * 2)
try:
# timeout should not grow past the maximum
self.assertEqual(amqpdriver.ACK_REQUEUE_EVERY_SECONDS_MAX,
listener._current_timeout)
finally:
# gracefully stop waiting
driver.send(target,
{},
{'tx_id': 'test'})
thread.join()