From 43a9dc1de58df6559be02dc9f9ae3f5eeb12cb7a Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Mon, 8 Dec 2014 10:28:12 +0100 Subject: [PATCH] qpid: honor iterconsume timeout The qpid driver must always honor the timeout passed the iterconsume method, this change fixes that. Related bug: #1400268 Related bug: #1399257 Change-Id: I8f267fc8b5a7abc852f0caf84d1e7c2c342ba951 --- oslo/messaging/_drivers/impl_qpid.py | 29 ++++++++++++++++++++-------- 1 file changed, 21 insertions(+), 8 deletions(-) diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index 63a0a727b..43c7f57e3 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -640,20 +640,33 @@ class Connection(object): def iterconsume(self, limit=None, timeout=None): """Return an iterator that will consume from all queues/consumers.""" + timer = rpc_common.DecayingTimer(duration=timeout).start() + + def _raise_timeout(exc): + LOG.debug('Timed out waiting for RPC response: %s', exc) + raise rpc_common.Timeout() + def _error_callback(exc): - if isinstance(exc, qpid_exceptions.Empty): - LOG.debug('Timed out waiting for RPC response: %s', exc) - raise rpc_common.Timeout() - else: - LOG.exception(_('Failed to consume message from queue: %s'), - exc) + timer.check_return(_raise_timeout, exc) + LOG.exception(_('Failed to consume message from queue: %s'), exc) def _consume(): - nxt_receiver = self.session.next_receiver(timeout=timeout) + poll_timeout = 1 if timeout is None else min(timeout, 1) + while True: + try: + nxt_receiver = self.session.next_receiver( + timeout=poll_timeout) + except qpid_exceptions.Empty as exc: + poll_timeout = timer.check_return(_raise_timeout, exc, + maximum=1) + else: + break + try: self._lookup_consumer(nxt_receiver).consume() except Exception: - LOG.exception(_("Error processing message. Skipping it.")) + LOG.exception(_("Error processing message. " + "Skipping it.")) for iteration in itertools.count(0): if limit and iteration >= limit: