diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index 63a0a727b..43c7f57e3 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -640,20 +640,33 @@ class Connection(object): def iterconsume(self, limit=None, timeout=None): """Return an iterator that will consume from all queues/consumers.""" + timer = rpc_common.DecayingTimer(duration=timeout).start() + + def _raise_timeout(exc): + LOG.debug('Timed out waiting for RPC response: %s', exc) + raise rpc_common.Timeout() + def _error_callback(exc): - if isinstance(exc, qpid_exceptions.Empty): - LOG.debug('Timed out waiting for RPC response: %s', exc) - raise rpc_common.Timeout() - else: - LOG.exception(_('Failed to consume message from queue: %s'), - exc) + timer.check_return(_raise_timeout, exc) + LOG.exception(_('Failed to consume message from queue: %s'), exc) def _consume(): - nxt_receiver = self.session.next_receiver(timeout=timeout) + poll_timeout = 1 if timeout is None else min(timeout, 1) + while True: + try: + nxt_receiver = self.session.next_receiver( + timeout=poll_timeout) + except qpid_exceptions.Empty as exc: + poll_timeout = timer.check_return(_raise_timeout, exc, + maximum=1) + else: + break + try: self._lookup_consumer(nxt_receiver).consume() except Exception: - LOG.exception(_("Error processing message. Skipping it.")) + LOG.exception(_("Error processing message. " + "Skipping it.")) for iteration in itertools.count(0): if limit and iteration >= limit: