diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 1db25e7fa..4a49f6ad5 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -496,12 +496,18 @@ class ReplyWaiters(object): self._wrn_threshold = 10 def get(self, msg_id, timeout): - try: - return self._queues[msg_id].get(block=True, timeout=timeout) - except queue.Empty: - raise oslo_messaging.MessagingTimeout( - 'Timed out waiting for a reply ' - 'to message ID %s' % msg_id) + watch = timeutils.StopWatch(duration=timeout) + watch.start() + while not watch.expired(): + try: + # NOTE(amorin) we can't use block=True + # See lp-2035113 + return self._queues[msg_id].get(block=False) + except queue.Empty: + time.sleep(0.5) + raise oslo_messaging.MessagingTimeout( + 'Timed out waiting for a reply ' + 'to message ID %s' % msg_id) def put(self, msg_id, message_data): LOG.info('Received RPC response for msg %s', msg_id)