Fix consuming from unbound reply queue

Consumer declaration consists of the following steps:
  1) declare an exchange
  2) declare a queue
  3) bind the queue to the exchange

Because reply exchanges are auto-delete, the exchange may
already have been removed by step 3, in which case consumer.declare()
will raise `queue.bind 404 Exchange not found`.

In this case the queue exists, so AMQPListener
just calls consumer.consume() on the queue and goes on to
drain_events() despite the fact that the queue is
unbound.

This change tries to redeclare the queue/exchange proactively
each time the channel changes and just before consuming messages.

Co-Authored-By: Mehdi Abaakouk <sileht@redhat.com>
Closes-Bug: #1609766
Change-Id: Id8b48df3d26675d72955d417ce7622b1e8aa6195
(cherry picked from commit 3f4ce9470b)
This commit is contained in:
kbespalov 2016-08-04 15:18:25 +03:00 committed by Mehdi Abaakouk (sileht)
parent f3c4c671cc
commit d8d564cd09
1 changed files with 35 additions and 29 deletions

View File

@ -277,6 +277,7 @@ class Consumer(object):
rabbit_queue_ttl)
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
name=exchange_name,
type=type,
@ -285,6 +286,7 @@ class Consumer(object):
def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect."""
self.queue = kombu.entity.Queue(
name=self.queue_name,
channel=conn.channel,
@ -308,17 +310,41 @@ class Consumer(object):
self.queue.declare()
else:
raise
self._declared_on = conn.channel
def consume(self, tag):
def consume(self, conn, tag):
"""Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the
Connection.consume() will process the messages,
calling the appropriate callback.
"""
# Ensure we are on the correct channel before consuming
if conn.channel != self._declared_on:
self.declare(conn)
try:
self.queue.consume(callback=self._callback,
consumer_tag=six.text_type(tag),
nowait=self.nowait)
except conn.connection.channel_errors as exc:
# We retries once because of some races that we can
# recover before informing the deployer
# bugs.launchpad.net/oslo.messaging/+bug/1581148
# bugs.launchpad.net/oslo.messaging/+bug/1609766
# bugs.launchpad.net/neutron/+bug/1318721
# At any channel error, the RabbitMQ closes
# the channel, but the amqp-lib quietly re-open
# it. So, we must reset all tags and declare
# all consumers again.
conn._new_tags = set(conn._consumers.values())
if exc.code == 404:
self.declare(conn)
self.queue.consume(callback=self._callback,
consumer_tag=six.text_type(tag),
nowait=self.nowait)
else:
raise
def cancel(self, tag):
LOG.trace('ConsumerBase.cancel: canceling %s', tag)
@ -754,8 +780,6 @@ class Connection(object):
self.set_transport_socket_timeout()
self._set_current_channel(new_channel)
for consumer in self._consumers:
consumer.declare(self)
LOG.info(_LI('[%(connection_id)s] Reconnected to AMQP server on '
'%(hostname)s:%(port)s via [%(transport)s] client'
@ -832,6 +856,8 @@ class Connection(object):
if self.purpose == rpc_common.PURPOSE_LISTEN:
self._set_qos(new_channel)
self._producer = kombu.messaging.Producer(new_channel)
for consumer in self._consumers:
consumer.declare(self)
def _set_qos(self, channel):
"""Set QoS prefetch count on the channel"""
@ -1042,31 +1068,11 @@ class Connection(object):
if not self.connection.connected:
raise self.connection.recoverable_connection_errors[0]
consume_max_retries = 2
while self._new_tags and consume_max_retries:
while self._new_tags:
for consumer, tag in self._consumers.items():
if tag in self._new_tags:
try:
consumer.consume(tag=tag)
consumer.consume(self, tag=tag)
self._new_tags.remove(tag)
except self.connection.channel_errors as exc:
# NOTE(kbespalov): during the interval between
# a queue declaration and consumer declaration
# the queue can disappear. In this case
# we must redeclare queue and try to re-consume.
# More details is here:
# bugs.launchpad.net/oslo.messaging/+bug/1581148
if exc.code == 404 and consume_max_retries:
consumer.declare(self)
# NOTE(kbespalov): the broker closes a channel
# at any channel error. The py-amqp catches
# this situation and re-open a new channel.
# So, we must re-declare all consumers again.
self._new_tags = set(self._consumers.values())
consume_max_retries -= 1
break
else:
raise
poll_timeout = (self._poll_timeout if timeout is None
else min(timeout, self._poll_timeout))