From 4b2221b5d2bad7642f62d527948f5a50e2067087 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Herv=C3=A9=20Beraud?= Date: Wed, 8 Sep 2021 08:34:13 +0200 Subject: [PATCH] Adding support for rabbitmq quorum queues https://www.rabbitmq.com/quorum-queues.html The quorum queue is a modern queue type for RabbitMQ implementing a durable, replicated FIFO queue based on the Raft consensus algorithm. It is available as of RabbitMQ 3.8.0. the quorum queues can not be set by policy so this should be done when declaring the queue. To declare a quorum queue set the x-queue-type queue argument to quorum (the default is classic). This argument must be provided by a client at queue declaration time; it cannot be set or changed using a policy. This is because policy definition or applicable policy can be changed dynamically but queue type cannot. It must be specified at the time of declaration. its good for the oslo messaging to add support for that type of queue that have multiple advantaged over mirroring. If quorum queues are sets mirrored queues will be ignored. Closes-Bug: #1942933 Change-Id: Id573e04c287e034e50626daf6e18a34735d45251 --- oslo_messaging/_drivers/impl_rabbit.py | 54 +++++++++++++++---- ...rt_for_quorum_queues-3101d055b492289e.yaml | 10 ++++ 2 files changed, 55 insertions(+), 9 deletions(-) create mode 100644 releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 9d99822d5..a7161ed17 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -149,6 +149,16 @@ rabbit_opts = [ 'nodes, run: ' """\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """ """'{"ha-mode": "all"}' \""""), + cfg.BoolOpt('rabbit_quorum_queue', + default=False, + help='Try to use quorum queues in RabbitMQ ' + '(x-queue-type: quorum). ' + 'The quorum queue is a modern queue type for RabbitMQ ' + 'implementing a durable, replicated FIFO queue based on the ' + 'Raft consensus algorithm. It is available as of ' + 'RabbitMQ 3.8.0. If set this option will take priority over ' + 'the HA queues (``rabbit_ha_queues``) aka mirrored queues, ' + 'in other words it will disable HA queues.'), cfg.IntOpt('rabbit_transient_queues_ttl', min=1, default=1800, @@ -191,7 +201,8 @@ rabbit_opts = [ LOG = logging.getLogger(__name__) -def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl): +def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, + rabbit_quorum_queue): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we try to declare a mirrored queue @@ -214,12 +225,26 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl): Setting a queue TTL causes the queue to be automatically deleted if it is unused for the TTL duration. This is a helpful safeguard to prevent queues with zero consumers from growing without bound. + + If the rabbit_quorum_queue option is set, we try to declare a mirrored + queue as described here: + + https://www.rabbitmq.com/quorum-queues.html + + Setting x-queue-type to quorum means that replicated FIFO queue based on + the Raft consensus algorithm will be used. It is available as of + RabbitMQ 3.8.0. If set this option will take priority over + the HA queues (``rabbit_ha_queues``) aka mirrored queues, + in other words HA queues will be ignored. """ args = {} - if rabbit_ha_queues: + if not rabbit_quorum_queue and rabbit_ha_queues: args['x-ha-policy'] = 'all' + if rabbit_quorum_queue: + args['x-queue-type'] = 'quorum' + if rabbit_queue_ttl > 0: args['x-expires'] = rabbit_queue_ttl * 1000 @@ -248,7 +273,7 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, - enable_cancel_on_failover=False): + rabbit_quorum_queue=None, enable_cancel_on_failover=False): """Init the Consumer class with the exchange_name, routing_key, type, durable auto_delete """ @@ -262,7 +287,8 @@ class Consumer(object): self.type = type self.nowait = nowait self.queue_arguments = _get_queue_arguments(rabbit_ha_queues, - rabbit_queue_ttl) + rabbit_queue_ttl, + rabbit_quorum_queue) self.queue = None self._declared_on = None self.exchange = kombu.entity.Exchange( @@ -475,6 +501,7 @@ class Connection(object): self.login_method = driver_conf.rabbit_login_method self.rabbit_ha_queues = driver_conf.rabbit_ha_queues + self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue self.rabbit_transient_queues_ttl = \ driver_conf.rabbit_transient_queues_ttl self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count @@ -682,6 +709,9 @@ class Connection(object): except KeyError: raise RuntimeError("Invalid SSL version : %s" % version) + def _durable(self): + return self.rabbit_quorum_queue or self.amqp_durable_queues + # NOTE(moguimar): default_password in this function's context is just # a fallback option, not a hardcoded password. def _transform_transport_url(self, url, host, default_username='', # nosec @@ -1145,12 +1175,13 @@ class Connection(object): queue_name=topic, routing_key='', type='direct', - durable=False, + durable=self._durable(), exchange_auto_delete=False, queue_auto_delete=False, callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, rabbit_queue_ttl=self.rabbit_transient_queues_ttl, + rabbit_quorum_queue=self.rabbit_quorum_queue, enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) @@ -1163,11 +1194,12 @@ class Connection(object): queue_name=queue_name or topic, routing_key=topic, type='topic', - durable=self.amqp_durable_queues, + durable=self._durable(), exchange_auto_delete=self.amqp_auto_delete, queue_auto_delete=self.amqp_auto_delete, callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, + rabbit_quorum_queue=self.rabbit_quorum_queue, enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) @@ -1190,6 +1222,7 @@ class Connection(object): callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, rabbit_queue_ttl=self.rabbit_transient_queues_ttl, + rabbit_quorum_queue=self.rabbit_quorum_queue, enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) @@ -1280,7 +1313,10 @@ class Connection(object): auto_delete=exchange.auto_delete, name=routing_key, routing_key=routing_key, - queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0)) + queue_arguments=_get_queue_arguments( + self.rabbit_ha_queues, + 0, + self.rabbit_quorum_queue)) log_info = {'key': routing_key, 'exchange': exchange} LOG.trace( 'Connection._publish_and_creates_default_queue: ' @@ -1336,7 +1372,7 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.amqp_durable_queues, + durable=self._durable(), auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish, exchange, msg, @@ -1358,7 +1394,7 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.amqp_durable_queues, + durable=self._durable(), auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish_and_creates_default_queue, diff --git a/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml b/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml new file mode 100644 index 000000000..1e92c7aca --- /dev/null +++ b/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml @@ -0,0 +1,10 @@ +--- +features: + - | + Adding support for quorum queues. Quorum queues are enabled if the + ``rabbit_quorum_queue`` parameter is sets (``x-queue-type: quorum``). + Setting x-queue-type to quorum means that replicated FIFO queue based on + the Raft consensus algorithm will be used. It is available as of + RabbitMQ 3.8.0. If set this option will take priority over + the HA queues (``rabbit_ha_queues``) aka mirrored queues, + in other words HA queues will be ignored.