diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 122b16607..8a3640a9a 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -154,6 +154,11 @@ rabbit_opts = [ default=2, help='How often times during the heartbeat_timeout_threshold ' 'we check the heartbeat.'), + cfg.BoolOpt('enable_cancel_on_failover', + default=False, + help="Enable x-cancel-on-ha-failover flag so that " + "rabbitmq server will cancel and notify consumers" + "when queue is down") ] LOG = logging.getLogger(__name__) @@ -215,7 +220,8 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, - nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0): + nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, + enable_cancel_on_failover=False): """Init the Consumer class with the exchange_name, routing_key, type, durable auto_delete """ @@ -237,10 +243,16 @@ class Consumer(object): type=type, durable=self.durable, auto_delete=self.exchange_auto_delete) + self.enable_cancel_on_failover = enable_cancel_on_failover def declare(self, conn): """Re-declare the queue after a rabbit (re)connect.""" + consumer_arguments = None + if self.enable_cancel_on_failover: + consumer_arguments = { + "x-cancel-on-ha-failover": True} + self.queue = kombu.entity.Queue( name=self.queue_name, channel=conn.channel, @@ -248,7 +260,9 @@ class Consumer(object): durable=self.durable, auto_delete=self.queue_auto_delete, routing_key=self.routing_key, - queue_arguments=self.queue_arguments) + queue_arguments=self.queue_arguments, + consumer_arguments=consumer_arguments + ) try: LOG.debug('[%s] Queue.declare: %s', @@ -451,6 +465,7 @@ class Connection(object): driver_conf.kombu_missing_consumer_retry_timeout self.kombu_failover_strategy = driver_conf.kombu_failover_strategy self.kombu_compression = driver_conf.kombu_compression + self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover if self.ssl: self.ssl_version = driver_conf.ssl_version @@ -1065,31 +1080,35 @@ class Connection(object): """ # TODO(obondarev): use default exchange since T release - consumer = Consumer(exchange_name=topic, - queue_name=topic, - routing_key=topic, - type='direct', - durable=False, - exchange_auto_delete=True, - queue_auto_delete=False, - callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues, - rabbit_queue_ttl=self.rabbit_transient_queues_ttl) + consumer = Consumer( + exchange_name=topic, + queue_name=topic, + routing_key=topic, + type='direct', + durable=False, + exchange_auto_delete=True, + queue_auto_delete=False, + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues, + rabbit_queue_ttl=self.rabbit_transient_queues_ttl, + enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) def declare_topic_consumer(self, exchange_name, topic, callback=None, queue_name=None): """Create a 'topic' consumer.""" - consumer = Consumer(exchange_name=exchange_name, - queue_name=queue_name or topic, - routing_key=topic, - type='topic', - durable=self.amqp_durable_queues, - exchange_auto_delete=self.amqp_auto_delete, - queue_auto_delete=self.amqp_auto_delete, - callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues) + consumer = Consumer( + exchange_name=exchange_name, + queue_name=queue_name or topic, + routing_key=topic, + type='topic', + durable=self.amqp_durable_queues, + exchange_auto_delete=self.amqp_auto_delete, + queue_auto_delete=self.amqp_auto_delete, + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues, + enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) @@ -1100,16 +1119,18 @@ class Connection(object): exchange_name = '%s_fanout' % topic queue_name = '%s_fanout_%s' % (topic, unique) - consumer = Consumer(exchange_name=exchange_name, - queue_name=queue_name, - routing_key=topic, - type='fanout', - durable=False, - exchange_auto_delete=True, - queue_auto_delete=False, - callback=callback, - rabbit_ha_queues=self.rabbit_ha_queues, - rabbit_queue_ttl=self.rabbit_transient_queues_ttl) + consumer = Consumer( + exchange_name=exchange_name, + queue_name=queue_name, + routing_key=topic, + type='fanout', + durable=False, + exchange_auto_delete=True, + queue_auto_delete=False, + callback=callback, + rabbit_ha_queues=self.rabbit_ha_queues, + rabbit_queue_ttl=self.rabbit_transient_queues_ttl, + enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) diff --git a/oslo_messaging/tests/functional/test_rabbitmq.py b/oslo_messaging/tests/functional/test_rabbitmq.py index db06d01ef..84f84e854 100644 --- a/oslo_messaging/tests/functional/test_rabbitmq.py +++ b/oslo_messaging/tests/functional/test_rabbitmq.py @@ -39,6 +39,12 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase): ] def test_failover_scenario(self): + self._test_failover_scenario() + + def test_failover_scenario_enable_cancel_on_failover(self): + self._test_failover_scenario(enable_cancel_on_failover=True) + + def _test_failover_scenario(self, enable_cancel_on_failover=False): # NOTE(sileht): run this test only if functional suite run of a driver # that use rabbitmq as backend self.driver = os.environ.get('TRANSPORT_DRIVER') @@ -53,6 +59,7 @@ class RabbitMQFailoverTests(test_utils.BaseTestCase): kombu_reconnect_delay=0, rabbit_retry_interval=0, rabbit_retry_backoff=0, + enable_cancel_on_failover=enable_cancel_on_failover, group='oslo_messaging_rabbit') self.pifpaf = self.useFixture(rabbitmq.RabbitMQDriver(cluster=True, diff --git a/releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml b/releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml new file mode 100644 index 000000000..affab65b2 --- /dev/null +++ b/releasenotes/notes/add-enable_cancel_on_failover-22ac472b93dd3a23.yaml @@ -0,0 +1,6 @@ +--- +fixes: + - | + Add a new option `enable_cancel_on_failover` for rabbitmq driver + which when enabled, will cancel consumers when queue appears + to be down.