Merge "Retry to declare a queue after internal error"

This commit is contained in:
Zuul 2019-04-22 22:23:07 +00:00 committed by Gerrit Code Review
commit 433d34ed41

View File

@ -227,7 +227,6 @@ class Consumer(object):
self.nowait = nowait
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
rabbit_queue_ttl)
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
@ -262,6 +261,28 @@ class Consumer(object):
self.queue.declare()
else:
raise
except kombu.exceptions.ConnectionError as exc:
# NOTE(gsantomaggio): This exception happens when the
# connection is established,but it fails to create the queue.
# Add some delay to avoid too many requests to the server.
# See: https://bugs.launchpad.net/oslo.messaging/+bug/1822778
# for details.
if exc.code == 541:
interval = 2
info = {'sleep_time': interval,
'queue': self.queue_name,
'err_str': exc
}
LOG.error(_LE('Internal amqp error (541) '
'during queue declare,'
'retrying in %(sleep_time)s seconds. '
'Queue: [%(queue)s], '
'error message: [%(err_str)s]'), info)
time.sleep(interval)
self.queue.declare()
else:
raise
self._declared_on = conn.channel
def consume(self, conn, tag):