Browse Source

Correctly handle missing RabbitMQ queues

Currently, setting the '[oslo_messaging] direct_mandatory_flag' config
option to 'True' (the default) will result in a 'MessageUndeliverable'
exception being raised when sending a reply if a RabbitMQ queue is
missing [1]. It was the responsibility of the application to handle
this exception, however, many applications are not doing so. This has
resulted in a number of bug reports.

Start handling this error condition, using a retry loop to attempt to
resend the message and work around any temporary glitches. Since
attempting to send a reply will will no longer raise an exception,
there is little benefit in retaining the '[oslo_messaging]
direct_mandatory_flag' config option: users setting this to False will
simply not benefit from the retry logic and improved logging added
here. This option is already deprecated though and will be fully
removed in a future release.

[1] https://www.rabbitmq.com/channels.html

Change-Id: Id5cddbefbe24ef100f1cc522f44430df77d217cb
Closes-Bug: #1905965
(cherry picked from commit 4937949dff)
changes/18/776418/1 12.5.2
Hervé Beraud 3 months ago
parent
commit
391ce7fc69
4 changed files with 58 additions and 21 deletions
  1. +2
    -1
      doc/source/admin/rabbit.rst
  2. +48
    -20
      oslo_messaging/_drivers/amqpdriver.py
  3. +3
    -0
      oslo_messaging/_drivers/impl_rabbit.py
  4. +5
    -0
      releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml

+ 2
- 1
doc/source/admin/rabbit.rst View File

@ -66,7 +66,8 @@ flag is used`_.
through the *Connection* class.
With mandatory flag RabbitMQ raises a callback if the message is not routed to
any queue.
any queue. This callback will be used to loop for a timeout and let's a chance
to sender to recover.
.. _Exchange is a AMQP mechanism: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges
.. _queues: https://www.rabbitmq.com/queues.html


+ 48
- 20
oslo_messaging/_drivers/amqpdriver.py View File

@ -145,39 +145,67 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
while True:
try:
with self.listener.driver._get_connection(
rpc_common.PURPOSE_SEND) as conn:
rpc_common.PURPOSE_SEND,
) as conn:
self._send_reply(conn, reply, failure)
return
except rpc_amqp.AMQPDestinationNotFound:
if timer.check_return() > 0:
LOG.debug(("The reply %(msg_id)s cannot be sent "
"%(reply_q)s reply queue doesn't exist, "
"retrying..."), {
'msg_id': self.msg_id,
'reply_q': self.reply_q})
time.sleep(0.25)
else:
except oslo_messaging.MessageUndeliverable:
# queue not found
if timer.check_return() <= 0:
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
infos = {
LOG.error(
'The reply %(msg_id)s failed to send after '
'%(duration)d seconds due to a missing queue '
'(%(reply_q)s). Abandoning...', {
'msg_id': self.msg_id,
'duration': duration,
'reply_q': self.reply_q})
return
LOG.debug(
'The reply %(msg_id)s could not be sent due to a missing '
'queue (%(reply_q)s). Retrying...', {
'msg_id': self.msg_id,
'reply_q': self.reply_q,
'duration': duration
}
LOG.info("The reply %(msg_id)s cannot be sent "
"%(reply_q)s reply queue don't exist after "
"%(duration)s sec abandoning...", infos)
'reply_q': self.reply_q})
time.sleep(0.25)
except rpc_amqp.AMQPDestinationNotFound as exc:
# exchange not found/down
if timer.check_return() <= 0:
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
LOG.error(
'The reply %(msg_id)s failed to send after '
'%(duration)d seconds due to a broker issue '
'(%(exc)s). Abandoning...', {
'msg_id': self.msg_id,
'duration': duration,
'exc': exc})
return
LOG.debug(
'The reply %(msg_id)s could not be sent due to a broker '
'issue (%(exc)s). Retrying...', {
'msg_id': self.msg_id,
'exc': exc})
time.sleep(0.25)
def heartbeat(self):
# generate a keep alive for RPC call monitoring
with self.listener.driver._get_connection(
rpc_common.PURPOSE_SEND) as conn:
rpc_common.PURPOSE_SEND,
) as conn:
try:
self._send_reply(conn, None, None, ending=False)
except oslo_messaging.MessageUndeliverable:
# internal exception that indicates queue gone -
# broker unreachable.
raise MessageDeliveryFailure(
"Heartbeat send failed. Missing queue")
except rpc_amqp.AMQPDestinationNotFound:
# internal exception that indicates queue/exchange gone -
# internal exception that indicates exchange gone -
# broker unreachable.
raise MessageDeliveryFailure("Heartbeat send failed")
raise MessageDeliveryFailure(
"Heartbeat send failed. Missing exchange")
# NOTE(sileht): Those have already be ack in RpcListener IO thread
# We keep them as noop until all drivers do the same


+ 3
- 0
oslo_messaging/_drivers/impl_rabbit.py View File

@ -175,6 +175,8 @@ rabbit_opts = [
'flag for direct send. The direct send is used as reply, '
'so the MessageUndeliverable exception is raised '
'in case the client queue does not exist.'
'MessageUndeliverable exception will be used to loop for a '
'timeout to lets a chance to sender to recover.'
'This flag is deprecated and it will not be possible to '
'deactivate this functionality anymore'),
cfg.BoolOpt('enable_cancel_on_failover',
@ -514,6 +516,7 @@ class Connection(object):
# if it was already monkey patched by eventlet/greenlet.
global threading
threading = stdlib_threading
self.direct_mandatory_flag = driver_conf.direct_mandatory_flag
if self.ssl:


+ 5
- 0
releasenotes/notes/handle-missing-queue-553a803f94976be7.yaml View File

@ -0,0 +1,5 @@
---
features:
- |
Adding retry strategy based on the mandatory flag. Missing exchanges and
queues are now identified separately for logging purposes.

Loading…
Cancel
Save