Merge "qpid: honor iterconsume timeout"
This commit is contained in:
commit
9cc46bafb0
@ -640,20 +640,33 @@ class Connection(object):
|
||||
def iterconsume(self, limit=None, timeout=None):
|
||||
"""Return an iterator that will consume from all queues/consumers."""
|
||||
|
||||
timer = rpc_common.DecayingTimer(duration=timeout).start()
|
||||
|
||||
def _raise_timeout(exc):
|
||||
LOG.debug('Timed out waiting for RPC response: %s', exc)
|
||||
raise rpc_common.Timeout()
|
||||
|
||||
def _error_callback(exc):
|
||||
if isinstance(exc, qpid_exceptions.Empty):
|
||||
LOG.debug('Timed out waiting for RPC response: %s', exc)
|
||||
raise rpc_common.Timeout()
|
||||
else:
|
||||
LOG.exception(_('Failed to consume message from queue: %s'),
|
||||
exc)
|
||||
timer.check_return(_raise_timeout, exc)
|
||||
LOG.exception(_('Failed to consume message from queue: %s'), exc)
|
||||
|
||||
def _consume():
|
||||
nxt_receiver = self.session.next_receiver(timeout=timeout)
|
||||
poll_timeout = 1 if timeout is None else min(timeout, 1)
|
||||
while True:
|
||||
try:
|
||||
nxt_receiver = self.session.next_receiver(
|
||||
timeout=poll_timeout)
|
||||
except qpid_exceptions.Empty as exc:
|
||||
poll_timeout = timer.check_return(_raise_timeout, exc,
|
||||
maximum=1)
|
||||
else:
|
||||
break
|
||||
|
||||
try:
|
||||
self._lookup_consumer(nxt_receiver).consume()
|
||||
except Exception:
|
||||
LOG.exception(_("Error processing message. Skipping it."))
|
||||
LOG.exception(_("Error processing message. "
|
||||
"Skipping it."))
|
||||
|
||||
for iteration in itertools.count(0):
|
||||
if limit and iteration >= limit:
|
||||
|
Loading…
Reference in New Issue
Block a user