diff --git a/doc/source/admin/rabbit.rst b/doc/source/admin/rabbit.rst index 72c438c9d..687bc42cf 100644 --- a/doc/source/admin/rabbit.rst +++ b/doc/source/admin/rabbit.rst @@ -66,7 +66,8 @@ flag is used`_. through the *Connection* class. With mandatory flag RabbitMQ raises a callback if the message is not routed to -any queue. +any queue. This callback will be used to loop for a timeout and let's a chance +to sender to recover. .. _Exchange is a AMQP mechanism: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges .. _queues: https://www.rabbitmq.com/queues.html diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index 18bcd2422..d4658cd0d 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -143,39 +143,67 @@ class AMQPIncomingMessage(base.RpcIncomingMessage): while True: try: with self.listener.driver._get_connection( - rpc_common.PURPOSE_SEND) as conn: + rpc_common.PURPOSE_SEND, + ) as conn: self._send_reply(conn, reply, failure) + return - except rpc_amqp.AMQPDestinationNotFound: - if timer.check_return() > 0: - LOG.debug(("The reply %(msg_id)s cannot be sent " - "%(reply_q)s reply queue doesn't exist, " - "retrying..."), { - 'msg_id': self.msg_id, - 'reply_q': self.reply_q}) - time.sleep(0.25) - else: + except oslo_messaging.MessageUndeliverable: + # queue not found + if timer.check_return() <= 0: self._obsolete_reply_queues.add(self.reply_q, self.msg_id) - infos = { - 'msg_id': self.msg_id, - 'reply_q': self.reply_q, - 'duration': duration - } - LOG.info("The reply %(msg_id)s cannot be sent " - "%(reply_q)s reply queue don't exist after " - "%(duration)s sec abandoning...", infos) + LOG.error( + 'The reply %(msg_id)s failed to send after ' + '%(duration)d seconds due to a missing queue ' + '(%(reply_q)s). Abandoning...', { + 'msg_id': self.msg_id, + 'duration': duration, + 'reply_q': self.reply_q}) return + LOG.debug( + 'The reply %(msg_id)s could not be sent due to a missing ' + 'queue (%(reply_q)s). Retrying...', { + 'msg_id': self.msg_id, + 'reply_q': self.reply_q}) + time.sleep(0.25) + except rpc_amqp.AMQPDestinationNotFound as exc: + # exchange not found/down + if timer.check_return() <= 0: + self._obsolete_reply_queues.add(self.reply_q, self.msg_id) + LOG.error( + 'The reply %(msg_id)s failed to send after ' + '%(duration)d seconds due to a broker issue ' + '(%(exc)s). Abandoning...', { + 'msg_id': self.msg_id, + 'duration': duration, + 'exc': exc}) + return + + LOG.debug( + 'The reply %(msg_id)s could not be sent due to a broker ' + 'issue (%(exc)s). Retrying...', { + 'msg_id': self.msg_id, + 'exc': exc}) + time.sleep(0.25) + def heartbeat(self): # generate a keep alive for RPC call monitoring with self.listener.driver._get_connection( - rpc_common.PURPOSE_SEND) as conn: + rpc_common.PURPOSE_SEND, + ) as conn: try: self._send_reply(conn, None, None, ending=False) - except rpc_amqp.AMQPDestinationNotFound: - # internal exception that indicates queue/exchange gone - + except oslo_messaging.MessageUndeliverable: + # internal exception that indicates queue gone - # broker unreachable. - raise MessageDeliveryFailure("Heartbeat send failed") + raise MessageDeliveryFailure( + "Heartbeat send failed. Missing queue") + except rpc_amqp.AMQPDestinationNotFound: + # internal exception that indicates exchange gone - + # broker unreachable. + raise MessageDeliveryFailure( + "Heartbeat send failed. Missing exchange") # NOTE(sileht): Those have already be ack in RpcListener IO thread # We keep them as noop until all drivers do the same diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 504fb716f..6b89f89de 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -176,6 +176,8 @@ rabbit_opts = [ 'flag for direct send. The direct send is used as reply, ' 'so the MessageUndeliverable exception is raised ' 'in case the client queue does not exist.' + 'MessageUndeliverable exception will be used to loop for a ' + 'timeout to lets a chance to sender to recover.' 'This flag is deprecated and it will not be possible to ' 'deactivate this functionality anymore'), cfg.BoolOpt('enable_cancel_on_failover', @@ -517,6 +519,7 @@ class Connection(object): # if it was already monkey patched by eventlet/greenlet. global threading threading = stdlib_threading + self.direct_mandatory_flag = driver_conf.direct_mandatory_flag if self.ssl: diff --git a/releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml b/releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml new file mode 100644 index 000000000..0407e6238 --- /dev/null +++ b/releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml @@ -0,0 +1,5 @@ +--- +features: + - | + Adding retry strategy based on the mandatory flag. Missing exchanges and + queues are now identified separately for logging purposes.