From d8d564cd09ab5d4b07b6d9492b9014c024302433 Mon Sep 17 00:00:00 2001 From: kbespalov Date: Thu, 4 Aug 2016 15:18:25 +0300 Subject: [PATCH] Fix consuming from unbound reply queue Consumer declaration consist of the next steps: 1) declare an exchange 2) declare a queue 3) bind the queue to the exchange Due to reply exchanges are auto-delete, at the step 3 the exchange can be removed and consumer.declare() will raise `queue.bind 404 Exchange not found`. So, in this case the queue is exist and AMQPListener just call consumer.consume() on the queue and go to drain_events() despite on the fact that the queue is unbound. This change tries to redeclare queue/exchange proactively each times channel change and just before consuming messages. Co-Authored-By: Mehdi Abaakouk Closes-Bug: #1609766 Change-Id: Id8b48df3d26675d72955d417ce7622b1e8aa6195 (cherry picked from commit 3f4ce9470b3f9e0502f61345a6943adad2dadac9) --- oslo_messaging/_drivers/impl_rabbit.py | 64 ++++++++++++++------------ 1 file changed, 35 insertions(+), 29 deletions(-) diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index b51724040..badff50d5 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -277,6 +277,7 @@ class Consumer(object): rabbit_queue_ttl) self.queue = None + self._declared_on = None self.exchange = kombu.entity.Exchange( name=exchange_name, type=type, @@ -285,6 +286,7 @@ class Consumer(object): def declare(self, conn): """Re-declare the queue after a rabbit (re)connect.""" + self.queue = kombu.entity.Queue( name=self.queue_name, channel=conn.channel, @@ -308,17 +310,41 @@ class Consumer(object): self.queue.declare() else: raise + self._declared_on = conn.channel - def consume(self, tag): + def consume(self, conn, tag): """Actually declare the consumer on the amqp channel. This will start the flow of messages from the queue. Using the Connection.consume() will process the messages, calling the appropriate callback. """ - self.queue.consume(callback=self._callback, - consumer_tag=six.text_type(tag), - nowait=self.nowait) + # Ensure we are on the correct channel before consuming + if conn.channel != self._declared_on: + self.declare(conn) + try: + self.queue.consume(callback=self._callback, + consumer_tag=six.text_type(tag), + nowait=self.nowait) + except conn.connection.channel_errors as exc: + # We retries once because of some races that we can + # recover before informing the deployer + # bugs.launchpad.net/oslo.messaging/+bug/1581148 + # bugs.launchpad.net/oslo.messaging/+bug/1609766 + # bugs.launchpad.net/neutron/+bug/1318721 + + # At any channel error, the RabbitMQ closes + # the channel, but the amqp-lib quietly re-open + # it. So, we must reset all tags and declare + # all consumers again. + conn._new_tags = set(conn._consumers.values()) + if exc.code == 404: + self.declare(conn) + self.queue.consume(callback=self._callback, + consumer_tag=six.text_type(tag), + nowait=self.nowait) + else: + raise def cancel(self, tag): LOG.trace('ConsumerBase.cancel: canceling %s', tag) @@ -754,8 +780,6 @@ class Connection(object): self.set_transport_socket_timeout() self._set_current_channel(new_channel) - for consumer in self._consumers: - consumer.declare(self) LOG.info(_LI('[%(connection_id)s] Reconnected to AMQP server on ' '%(hostname)s:%(port)s via [%(transport)s] client' @@ -832,6 +856,8 @@ class Connection(object): if self.purpose == rpc_common.PURPOSE_LISTEN: self._set_qos(new_channel) self._producer = kombu.messaging.Producer(new_channel) + for consumer in self._consumers: + consumer.declare(self) def _set_qos(self, channel): """Set QoS prefetch count on the channel""" @@ -1042,31 +1068,11 @@ class Connection(object): if not self.connection.connected: raise self.connection.recoverable_connection_errors[0] - consume_max_retries = 2 - while self._new_tags and consume_max_retries: + while self._new_tags: for consumer, tag in self._consumers.items(): if tag in self._new_tags: - try: - consumer.consume(tag=tag) - self._new_tags.remove(tag) - except self.connection.channel_errors as exc: - # NOTE(kbespalov): during the interval between - # a queue declaration and consumer declaration - # the queue can disappear. In this case - # we must redeclare queue and try to re-consume. - # More details is here: - # bugs.launchpad.net/oslo.messaging/+bug/1581148 - if exc.code == 404 and consume_max_retries: - consumer.declare(self) - # NOTE(kbespalov): the broker closes a channel - # at any channel error. The py-amqp catches - # this situation and re-open a new channel. - # So, we must re-declare all consumers again. - self._new_tags = set(self._consumers.values()) - consume_max_retries -= 1 - break - else: - raise + consumer.consume(self, tag=tag) + self._new_tags.remove(tag) poll_timeout = (self._poll_timeout if timeout is None else min(timeout, self._poll_timeout))