Merge "Correctly handle missing RabbitMQ queues"
This commit is contained in:
commit
11a49a0a3e
@ -66,7 +66,8 @@ flag is used`_.
|
||||
through the *Connection* class.
|
||||
|
||||
With mandatory flag RabbitMQ raises a callback if the message is not routed to
|
||||
any queue.
|
||||
any queue. This callback will be used to loop for a timeout and let's a chance
|
||||
to sender to recover.
|
||||
|
||||
.. _Exchange is a AMQP mechanism: https://www.rabbitmq.com/tutorials/amqp-concepts.html#exchanges
|
||||
.. _queues: https://www.rabbitmq.com/queues.html
|
||||
|
@ -145,39 +145,67 @@ class AMQPIncomingMessage(base.RpcIncomingMessage):
|
||||
while True:
|
||||
try:
|
||||
with self.listener.driver._get_connection(
|
||||
rpc_common.PURPOSE_SEND) as conn:
|
||||
rpc_common.PURPOSE_SEND,
|
||||
) as conn:
|
||||
self._send_reply(conn, reply, failure)
|
||||
|
||||
return
|
||||
except rpc_amqp.AMQPDestinationNotFound:
|
||||
if timer.check_return() > 0:
|
||||
LOG.debug(("The reply %(msg_id)s cannot be sent "
|
||||
"%(reply_q)s reply queue doesn't exist, "
|
||||
"retrying..."), {
|
||||
except oslo_messaging.MessageUndeliverable:
|
||||
# queue not found
|
||||
if timer.check_return() <= 0:
|
||||
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
|
||||
LOG.error(
|
||||
'The reply %(msg_id)s failed to send after '
|
||||
'%(duration)d seconds due to a missing queue '
|
||||
'(%(reply_q)s). Abandoning...', {
|
||||
'msg_id': self.msg_id,
|
||||
'duration': duration,
|
||||
'reply_q': self.reply_q})
|
||||
return
|
||||
|
||||
LOG.debug(
|
||||
'The reply %(msg_id)s could not be sent due to a missing '
|
||||
'queue (%(reply_q)s). Retrying...', {
|
||||
'msg_id': self.msg_id,
|
||||
'reply_q': self.reply_q})
|
||||
time.sleep(0.25)
|
||||
else:
|
||||
except rpc_amqp.AMQPDestinationNotFound as exc:
|
||||
# exchange not found/down
|
||||
if timer.check_return() <= 0:
|
||||
self._obsolete_reply_queues.add(self.reply_q, self.msg_id)
|
||||
infos = {
|
||||
LOG.error(
|
||||
'The reply %(msg_id)s failed to send after '
|
||||
'%(duration)d seconds due to a broker issue '
|
||||
'(%(exc)s). Abandoning...', {
|
||||
'msg_id': self.msg_id,
|
||||
'reply_q': self.reply_q,
|
||||
'duration': duration
|
||||
}
|
||||
LOG.info("The reply %(msg_id)s cannot be sent "
|
||||
"%(reply_q)s reply queue don't exist after "
|
||||
"%(duration)s sec abandoning...", infos)
|
||||
'duration': duration,
|
||||
'exc': exc})
|
||||
return
|
||||
|
||||
LOG.debug(
|
||||
'The reply %(msg_id)s could not be sent due to a broker '
|
||||
'issue (%(exc)s). Retrying...', {
|
||||
'msg_id': self.msg_id,
|
||||
'exc': exc})
|
||||
time.sleep(0.25)
|
||||
|
||||
def heartbeat(self):
|
||||
# generate a keep alive for RPC call monitoring
|
||||
with self.listener.driver._get_connection(
|
||||
rpc_common.PURPOSE_SEND) as conn:
|
||||
rpc_common.PURPOSE_SEND,
|
||||
) as conn:
|
||||
try:
|
||||
self._send_reply(conn, None, None, ending=False)
|
||||
except rpc_amqp.AMQPDestinationNotFound:
|
||||
# internal exception that indicates queue/exchange gone -
|
||||
except oslo_messaging.MessageUndeliverable:
|
||||
# internal exception that indicates queue gone -
|
||||
# broker unreachable.
|
||||
raise MessageDeliveryFailure("Heartbeat send failed")
|
||||
raise MessageDeliveryFailure(
|
||||
"Heartbeat send failed. Missing queue")
|
||||
except rpc_amqp.AMQPDestinationNotFound:
|
||||
# internal exception that indicates exchange gone -
|
||||
# broker unreachable.
|
||||
raise MessageDeliveryFailure(
|
||||
"Heartbeat send failed. Missing exchange")
|
||||
|
||||
# NOTE(sileht): Those have already be ack in RpcListener IO thread
|
||||
# We keep them as noop until all drivers do the same
|
||||
|
@ -177,6 +177,8 @@ rabbit_opts = [
|
||||
'flag for direct send. The direct send is used as reply, '
|
||||
'so the MessageUndeliverable exception is raised '
|
||||
'in case the client queue does not exist.'
|
||||
'MessageUndeliverable exception will be used to loop for a '
|
||||
'timeout to lets a chance to sender to recover.'
|
||||
'This flag is deprecated and it will not be possible to '
|
||||
'deactivate this functionality anymore'),
|
||||
cfg.BoolOpt('enable_cancel_on_failover',
|
||||
@ -516,6 +518,7 @@ class Connection(object):
|
||||
# if it was already monkey patched by eventlet/greenlet.
|
||||
global threading
|
||||
threading = stdlib_threading
|
||||
|
||||
self.direct_mandatory_flag = driver_conf.direct_mandatory_flag
|
||||
|
||||
if self.ssl:
|
||||
|
@ -0,0 +1,5 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Adding retry strategy based on the mandatory flag. Missing exchanges and
|
||||
queues are now identified separately for logging purposes.
|
Loading…
x
Reference in New Issue
Block a user