Retry to declare a queue after internal error
Without this commit, the client can lose the messages, because the
client does not handler the 'AMQP internal error 541',
read here [2] for details.
The fix retries to create the queue after a delay.
When the virtual-host is ready the declare does not fail.
This is a rare condiction, please read the bug [1] for details.
Closes-Bug: #1822778
[1] https://bugs.launchpad.net/oslo.messaging/+bug/1822778
[2] https://www.rabbitmq.com/amqp-0-9-1-reference.html
Change-Id: I7ab1f9d21ebb807285bf1422bc14cc6e07dcd32a
(cherry picked from commit 4d2787227b
)
This commit is contained in:

committed by
Gabriele Santomaggio

parent
7c00cc2c2c
commit
d75eba02c7
@@ -270,7 +270,6 @@ class Consumer(object):
|
|||||||
self.nowait = nowait
|
self.nowait = nowait
|
||||||
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
|
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
|
||||||
rabbit_queue_ttl)
|
rabbit_queue_ttl)
|
||||||
|
|
||||||
self.queue = None
|
self.queue = None
|
||||||
self._declared_on = None
|
self._declared_on = None
|
||||||
self.exchange = kombu.entity.Exchange(
|
self.exchange = kombu.entity.Exchange(
|
||||||
@@ -305,6 +304,28 @@ class Consumer(object):
|
|||||||
self.queue.declare()
|
self.queue.declare()
|
||||||
else:
|
else:
|
||||||
raise
|
raise
|
||||||
|
except kombu.exceptions.ConnectionError as exc:
|
||||||
|
# NOTE(gsantomaggio): This exception happens when the
|
||||||
|
# connection is established,but it fails to create the queue.
|
||||||
|
# Add some delay to avoid too many requests to the server.
|
||||||
|
# See: https://bugs.launchpad.net/oslo.messaging/+bug/1822778
|
||||||
|
# for details.
|
||||||
|
if exc.code == 541:
|
||||||
|
interval = 2
|
||||||
|
info = {'sleep_time': interval,
|
||||||
|
'queue': self.queue_name,
|
||||||
|
'err_str': exc
|
||||||
|
}
|
||||||
|
LOG.error(_LE('Internal amqp error (541) '
|
||||||
|
'during queue declare,'
|
||||||
|
'retrying in %(sleep_time)s seconds. '
|
||||||
|
'Queue: [%(queue)s], '
|
||||||
|
'error message: [%(err_str)s]'), info)
|
||||||
|
time.sleep(interval)
|
||||||
|
self.queue.declare()
|
||||||
|
else:
|
||||||
|
raise
|
||||||
|
|
||||||
self._declared_on = conn.channel
|
self._declared_on = conn.channel
|
||||||
|
|
||||||
def consume(self, conn, tag):
|
def consume(self, conn, tag):
|
||||||
|
Reference in New Issue
Block a user