Browse Source

Fix reconnect race condition with RabbitMQ cluster

Retry Queue creation to workaround race condition
that may happen when both the client and broker race over
exchange creation and deletion respectively which happen only
when the Queue/Exchange were created with auto-delete flag.

Queues/Exchange declared with auto-delete instruct the Broker to
delete the Queue when the last Consumer disconnect from it, and
the Exchange when the last Queue is deleted from this Exchange.

Now in a RabbitMQ cluster setup, if the cluster node that we are
connected to go down, 2 things will happen:

 1. From RabbitMQ side, the Queues w/ auto-delete will be deleted
    from the other cluster nodes and then the Exchanges that the
    Queues are bind to if they were also created w/ auto-delete.
 2. From client side, client will reconnect to another cluster
    node and call queue.declare() which  create Exchanges then
    Queues then Binding in that order.

Now in a happy path the queues/exchanges will be deleted from the
broker before client start re-creating them again, but it also
possible that the client first start by creating queues/exchange
as part of the queue.declare() call, which are no-op operations
b/c they alreay existed, but before it could bind Queue to
Exchange, RabbitMQ nodes just received the 'signal' that the
queue doesn't have any consumer so it should be delete, and the
same with exchanges, which will lead to binding fail with
NotFound error.

Illustration of the time line from Client and RabbitMQ cluster
respectively when the race condition happen:

       e-declare(E)      q-declare(Q)       q-bind(Q, E)

Change-Id: Ideb73af6f246a8282780cdb204d675d5d4555bf0
Closes-Bug: #1318721
Jens Rosenboom 7 years ago
  1. 15


@ -161,7 +161,20 @@ class ConsumerBase(object): = channel
self.kwargs['channel'] = channel
self.queue = kombu.entity.Queue(**self.kwargs)
except Exception as e:
# NOTE: This exception may be triggered by a race condition.
# Simply retrying will solve the error most of the time and
# should work well enough as a workaround until the race condition
# itself can be fixed.
# TODO(jrosenboom): In order to be able to match the Execption
# more specifically, we have to refactor ConsumerBase to use
# 'channel_errors' of the kombu connection object that
# has created the channel.
# See for details.
LOG.exception(_("Declaring queue failed with (%s), retrying"), e)
def _callback_handler(self, message, callback):
"""Call callback with deserialized message.