Cancel consumer if queue down

Previously, we switched to using default exchanges
to avoid excessive amounts of "exchange not found" messages.
However, that does not actually solve the problem, because the
reply_* queue is already gone and the agent will not receive callbacks.

After some debugging, I found that under some circumstances
the rabbitmq consumer does not seem to receive the basic cancel
signal when the queue is already gone. This might be due to
rabbitmq trying to restart the consumer when the queue is down
(for example, during a split brain). In such cases,
it might be better to fail early.

From reading the code, it seems that x-cancel-on-ha-failover
is not dedicated to mirrored queues only; see https://github.com/rabbitmq/rabbitmq-server/blob/master/src/rabbit_channel.erl#L1894 and
https://github.com/rabbitmq/rabbitmq-server/blob/master/src/rabbit_channel.erl#L1926.

By failing early, in my own test setup,
I could solve a certain case of the "exchange not found" problem.

Change-Id: I2ae53340783e4044dab58035bc0992dc08145b53
Related-bug: #1789177
Depends-On: https://review.opendev.org/#/c/747892/
(cherry picked from commit 196fa877a9)
(cherry picked from commit 0a432c7fb1)
(cherry picked from commit 5de11fa752)
This commit is contained in:
shenjiatong 2020-07-03 15:51:21 +08:00 committed by Stephen Finucane
parent 0c47ed4e1b
commit b2acc6663f
3 changed files with 65 additions and 31 deletions

View File

@ -151,6 +151,11 @@ rabbit_opts = [
default=2,
help='How often times during the heartbeat_timeout_threshold '
'we check the heartbeat.'),
# Opt-in flag (off by default): when set, consumers are declared with the
# "x-cancel-on-ha-failover" consumer argument.
cfg.BoolOpt('enable_cancel_on_failover',
            default=False,
            help="Enable x-cancel-on-ha-failover flag so that "
                 "rabbitmq server will cancel and notify consumers "
                 "when queue is down")
]
LOG = logging.getLogger(__name__)
@ -212,7 +217,8 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0):
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
enable_cancel_on_failover=False):
"""Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete
"""
@ -234,10 +240,16 @@ class Consumer(object):
type=type,
durable=self.durable,
auto_delete=self.exchange_auto_delete)
self.enable_cancel_on_failover = enable_cancel_on_failover
def declare(self, conn):
"""Re-declare the queue after a rabbit (re)connect."""
consumer_arguments = None
if self.enable_cancel_on_failover:
consumer_arguments = {
"x-cancel-on-ha-failover": True}
self.queue = kombu.entity.Queue(
name=self.queue_name,
channel=conn.channel,
@ -245,7 +257,9 @@ class Consumer(object):
durable=self.durable,
auto_delete=self.queue_auto_delete,
routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
queue_arguments=self.queue_arguments,
consumer_arguments=consumer_arguments
)
try:
LOG.debug('[%s] Queue.declare: %s',
@ -448,6 +462,7 @@ class Connection(object):
driver_conf.kombu_missing_consumer_retry_timeout
self.kombu_failover_strategy = driver_conf.kombu_failover_strategy
self.kombu_compression = driver_conf.kombu_compression
self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover
if self.ssl:
self.ssl_version = driver_conf.ssl_version
@ -1061,31 +1076,35 @@ class Connection(object):
"""
# TODO(obondarev): use default exchange since T release
consumer = Consumer(exchange_name=topic,
queue_name=topic,
routing_key=topic,
type='direct',
durable=False,
exchange_auto_delete=True,
queue_auto_delete=False,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
consumer = Consumer(
exchange_name=topic,
queue_name=topic,
routing_key=topic,
type='direct',
durable=False,
exchange_auto_delete=True,
queue_auto_delete=False,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
enable_cancel_on_failover=self.enable_cancel_on_failover)
self.declare_consumer(consumer)
def declare_topic_consumer(self, exchange_name, topic, callback=None,
queue_name=None):
"""Create a 'topic' consumer."""
consumer = Consumer(exchange_name=exchange_name,
queue_name=queue_name or topic,
routing_key=topic,
type='topic',
durable=self.amqp_durable_queues,
exchange_auto_delete=self.amqp_auto_delete,
queue_auto_delete=self.amqp_auto_delete,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues)
consumer = Consumer(
exchange_name=exchange_name,
queue_name=queue_name or topic,
routing_key=topic,
type='topic',
durable=self.amqp_durable_queues,
exchange_auto_delete=self.amqp_auto_delete,
queue_auto_delete=self.amqp_auto_delete,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
enable_cancel_on_failover=self.enable_cancel_on_failover)
self.declare_consumer(consumer)
@ -1096,16 +1115,18 @@ class Connection(object):
exchange_name = '%s_fanout' % topic
queue_name = '%s_fanout_%s' % (topic, unique)
consumer = Consumer(exchange_name=exchange_name,
queue_name=queue_name,
routing_key=topic,
type='fanout',
durable=False,
exchange_auto_delete=True,
queue_auto_delete=False,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl)
consumer = Consumer(
exchange_name=exchange_name,
queue_name=queue_name,
routing_key=topic,
type='fanout',
durable=False,
exchange_auto_delete=True,
queue_auto_delete=False,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
enable_cancel_on_failover=self.enable_cancel_on_failover)
self.declare_consumer(consumer)

View File

@ -39,6 +39,12 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
]
def test_failover_scenario(self):
self._test_failover_scenario()
def test_failover_scenario_enable_cancel_on_failover(self):
self._test_failover_scenario(enable_cancel_on_failover=True)
def _test_failover_scenario(self, enable_cancel_on_failover=False):
# NOTE(sileht): run this test only if functional suite run of a driver
# that use rabbitmq as backend
self.driver = os.environ.get('TRANSPORT_DRIVER')
@ -53,6 +59,7 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase):
kombu_reconnect_delay=0,
rabbit_retry_interval=0,
rabbit_retry_backoff=0,
enable_cancel_on_failover=enable_cancel_on_failover,
group='oslo_messaging_rabbit')
self.pifpaf = self.useFixture(rabbitmq.RabbitMQDriver(cluster=True,

View File

@ -0,0 +1,6 @@
---
fixes:
- |
Add a new option `enable_cancel_on_failover` for the rabbitmq driver
which, when enabled, will cancel consumers when the queue appears
to be down.