From b2acc6663f6c3f60e07cdeb1eae97fd1210a4d81 Mon Sep 17 00:00:00 2001 From: shenjiatong Date: Fri, 3 Jul 2020 15:51:21 +0800 Subject: [PATCH] Cancel consumer if queue down Previously, we switched to using default exchanges to avoid excessive amounts of exchange not found messages. But that does not actually solve the problem, because the reply_* queue is already gone and the agent will not receive callbacks. After some debugging, I found that under some circumstances the rabbitmq consumer does not seem to receive the basic cancel signal when the queue is already gone. This might be due to rabbitmq trying to restart the consumer when the queue is down (for example, during a split brain). In such cases, it might be better to fail early. From reading the code, it seems that x-cancel-on-ha-failover is not dedicated to mirrored queues only, https://github.com/rabbitmq/rabbitmq-server/blob/master/src/rabbit_channel.erl#L1894, https://github.com/rabbitmq/rabbitmq-server/blob/master/src/rabbit_channel.erl#L1926. By failing early, in my own test setup, I could solve a certain case of the exchange not found problem. 
Change-Id: I2ae53340783e4044dab58035bc0992dc08145b53 Related-bug: #1789177 Depends-On: https://review.opendev.org/#/c/747892/ (cherry picked from commit 196fa877a90d7eb0f82ec9e1c194eef3f98fc0b1) (cherry picked from commit 0a432c7fb107d04f7a41199fe9a8c4fbd344d009) (cherry picked from commit 5de11fa752ab8e37b95b1785f4c71210bf473f0c) --- oslo_messaging/_drivers/impl_rabbit.py | 83 ++++++++++++------- .../tests/functional/test_rabbitmq.py | 7 ++ ...e_cancel_on_failover-22ac472b93dd3a23.yaml | 6 ++ 3 files changed, 65 insertions(+), 31 deletions(-) create mode 100644 releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 54e4dc03e..b0e3ed44f 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -151,6 +151,11 @@ rabbit_opts = [ default=2, help='How often times during the heartbeat_timeout_threshold ' 'we check the heartbeat.'), + cfg.BoolOpt('enable_cancel_on_failover', + default=False, + help="Enable x-cancel-on-ha-failover flag so that " + "rabbitmq server will cancel and notify consumers" + "when queue is down") ] LOG = logging.getLogger(__name__) @@ -212,7 +217,8 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, - nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0): + nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, + enable_cancel_on_failover=False): """Init the Consumer class with the exchange_name, routing_key, type, durable auto_delete """ @@ -234,10 +240,16 @@ class Consumer(object): type=type, durable=self.durable, auto_delete=self.exchange_auto_delete) + self.enable_cancel_on_failover = enable_cancel_on_failover def declare(self, conn): """Re-declare the queue after a rabbit (re)connect.""" + consumer_arguments = None + if self.enable_cancel_on_failover: + consumer_arguments = { + 
"x-cancel-on-ha-failover": True} + self.queue = kombu.entity.Queue( name=self.queue_name, channel=conn.channel, @@ -245,7 +257,9 @@ class Consumer(object): durable=self.durable, auto_delete=self.queue_auto_delete, routing_key=self.routing_key, - queue_arguments=self.queue_arguments) + queue_arguments=self.queue_arguments, + consumer_arguments=consumer_arguments + ) try: LOG.debug('[%s] Queue.declare: %s', @@ -448,6 +462,7 @@ class Connection(object): driver_conf.kombu_missing_consumer_retry_timeout self.kombu_failover_strategy = driver_conf.kombu_failover_strategy self.kombu_compression = driver_conf.kombu_compression + self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover if self.ssl: self.ssl_version = driver_conf.ssl_version @@ -1061,31 +1076,35 @@ class Connection(object): """ # TODO(obondarev): use default exchange since T release - consumer = Consumer(exchange_name=topic, - queue_name=topic, - routing_key=topic, - type='direct', - durable=False, - exchange_auto_delete=True, - queue_auto_delete=False, - callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues, - rabbit_queue_ttl=self.rabbit_transient_queues_ttl) + consumer = Consumer( + exchange_name=topic, + queue_name=topic, + routing_key=topic, + type='direct', + durable=False, + exchange_auto_delete=True, + queue_auto_delete=False, + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues, + rabbit_queue_ttl=self.rabbit_transient_queues_ttl, + enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) def declare_topic_consumer(self, exchange_name, topic, callback=None, queue_name=None): """Create a 'topic' consumer.""" - consumer = Consumer(exchange_name=exchange_name, - queue_name=queue_name or topic, - routing_key=topic, - type='topic', - durable=self.amqp_durable_queues, - exchange_auto_delete=self.amqp_auto_delete, - queue_auto_delete=self.amqp_auto_delete, - callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues) + consumer = Consumer( + 
exchange_name=exchange_name, + queue_name=queue_name or topic, + routing_key=topic, + type='topic', + durable=self.amqp_durable_queues, + exchange_auto_delete=self.amqp_auto_delete, + queue_auto_delete=self.amqp_auto_delete, + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues, + enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) @@ -1096,16 +1115,18 @@ class Connection(object): exchange_name = '%s_fanout' % topic queue_name = '%s_fanout_%s' % (topic, unique) - consumer = Consumer(exchange_name=exchange_name, - queue_name=queue_name, - routing_key=topic, - type='fanout', - durable=False, - exchange_auto_delete=True, - queue_auto_delete=False, - callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues, - rabbit_queue_ttl=self.rabbit_transient_queues_ttl) + consumer = Consumer( + exchange_name=exchange_name, + queue_name=queue_name, + routing_key=topic, + type='fanout', + durable=False, + exchange_auto_delete=True, + queue_auto_delete=False, + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues, + rabbit_queue_ttl=self.rabbit_transient_queues_ttl, + enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) diff --git a/oslo_messaging/tests/functional/test_rabbitmq.py b/oslo_messaging/tests/functional/test_rabbitmq.py index db06d01ef..84f84e854 100644 --- a/oslo_messaging/tests/functional/test_rabbitmq.py +++ b/oslo_messaging/tests/functional/test_rabbitmq.py @@ -39,6 +39,12 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase): ] def test_failover_scenario(self): + self._test_failover_scenario() + + def test_failover_scenario_enable_cancel_on_failover(self): + self._test_failover_scenario(enable_cancel_on_failover=True) + + def _test_failover_scenario(self, enable_cancel_on_failover=False): # NOTE(sileht): run this test only if functional suite run of a driver # that use rabbitmq as backend self.driver = os.environ.get('TRANSPORT_DRIVER') @@ -53,6 +59,7 @@ class 
RabbitMQFailoverTests(test_utils.BaseTestCase): kombu_reconnect_delay=0, rabbit_retry_interval=0, rabbit_retry_backoff=0, + enable_cancel_on_failover=enable_cancel_on_failover, group='oslo_messaging_rabbit') self.pifpaf = self.useFixture(rabbitmq.RabbitMQDriver(cluster=True, diff --git a/releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml b/releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml new file mode 100644 index 000000000..affab65b2 --- /dev/null +++ b/releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml @@ -0,0 +1,6 @@ +--- +fixes: + - | + Add a new option `enable_cancel_on_failover` for rabbitmq driver + which when enabled, will cancel consumers when queue appears + to be down.