Adding support for rabbitmq quorum queues

https://www.rabbitmq.com/quorum-queues.html

The quorum queue is a modern queue type for RabbitMQ implementing a
durable, replicated FIFO queue based on the Raft consensus algorithm. It
is available as of RabbitMQ 3.8.0.

the quorum queues can not be set by policy so this should be done when
declaring the queue.

To declare a quorum queue set the x-queue-type queue argument to quorum
(the default is classic). This argument must be provided by a client at
queue declaration time; it cannot be set or changed using a policy. This
is because policy definition or applicable policy can be changed
dynamically but queue type cannot. It must be specified at the time of
declaration.

its good for the oslo messaging to add support for that type of queue
that have multiple advantaged over mirroring.

If quorum queues are sets mirrored queues will be ignored.

Closes-Bug: #1942933
Change-Id: Id573e04c287e034e50626daf6e18a34735d45251
This commit is contained in:
Hervé Beraud 2021-09-08 08:34:13 +02:00 committed by hamza alqtaishat
parent f9de265f39
commit 9e2ae43834
3 changed files with 64 additions and 13 deletions

View File

@ -32,7 +32,8 @@ from oslo_messaging._drivers import common as rpc_common
amqp_opts = [
cfg.BoolOpt('amqp_durable_queues',
default=False,
help='Use durable queues in AMQP.'),
help='Use durable queues in AMQP. this will enabled '
'by default if rabbit_quorum_queue is enabled'),
cfg.BoolOpt('amqp_auto_delete',
default=False,
deprecated_group='DEFAULT',

View File

@ -148,7 +148,18 @@ rabbit_opts = [
'those with auto-generated names) are mirrored across all '
'nodes, run: '
"""\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """
"""'{"ha-mode": "all"}' \""""),
"""'{"ha-mode": "all"}' \""""
'this will be disabled if rabbit_quorum_queue is enabled '),
cfg.BoolOpt('rabbit_quorum_queue',
default=False,
help='Try to use quorum queues in RabbitMQ '
'(x-queue-type: quorum). '
'The quorum queue is a modern queue type for RabbitMQ '
'implementing a durable, replicated FIFO queue based on the '
'Raft consensus algorithm. It is available as of '
'RabbitMQ 3.8.0. If set this option will take priority over '
'the HA queues (``rabbit_ha_queues``) aka mirrored queues, '
'in other words it will disable HA queues.'),
cfg.IntOpt('rabbit_transient_queues_ttl',
min=1,
default=1800,
@ -191,7 +202,8 @@ rabbit_opts = [
LOG = logging.getLogger(__name__)
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
rabbit_quorum_queue, durable):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we try to declare a mirrored queue
@ -214,12 +226,29 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl):
Setting a queue TTL causes the queue to be automatically deleted
if it is unused for the TTL duration. This is a helpful safeguard
to prevent queues with zero consumers from growing without bound.
If the rabbit_quorum_queue option is set, we try to declare a mirrored
queue as described here:
https://www.rabbitmq.com/quorum-queues.html
Setting x-queue-type to quorum means that replicated FIFO queue based on
the Raft consensus algorithm will be used. It is available as of
RabbitMQ 3.8.0. If set this option will take priority over
the HA queues (``rabbit_ha_queues``) aka mirrored queues,
in other words HA queues will be ignored.
durable parameter will be true if the queue is meant to survive RabbitMQ
restart like quorum queues which always durable
"""
args = {}
if rabbit_ha_queues:
if not rabbit_quorum_queue and rabbit_ha_queues:
args['x-ha-policy'] = 'all'
if rabbit_quorum_queue and durable:
args['x-queue-type'] = 'quorum'
if rabbit_queue_ttl > 0:
args['x-expires'] = rabbit_queue_ttl * 1000
@ -248,7 +277,7 @@ class Consumer(object):
def __init__(self, exchange_name, queue_name, routing_key, type, durable,
exchange_auto_delete, queue_auto_delete, callback,
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
enable_cancel_on_failover=False):
enable_cancel_on_failover=False, rabbit_quorum_queue=None):
"""Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete
"""
@ -262,7 +291,9 @@ class Consumer(object):
self.type = type
self.nowait = nowait
self.queue_arguments = _get_queue_arguments(rabbit_ha_queues,
rabbit_queue_ttl)
rabbit_queue_ttl,
rabbit_quorum_queue,
durable)
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
@ -475,6 +506,7 @@ class Connection(object):
self.login_method = driver_conf.rabbit_login_method
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue
self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
@ -483,6 +515,7 @@ class Connection(object):
self.heartbeat_rate = driver_conf.heartbeat_rate
self.kombu_reconnect_delay = driver_conf.kombu_reconnect_delay
self.amqp_durable_queues = driver_conf.amqp_durable_queues
self.durable = (self.amqp_durable_queues or self.rabbit_quorum_queue)
self.amqp_auto_delete = driver_conf.amqp_auto_delete
self.ssl = driver_conf.ssl
self.kombu_missing_consumer_retry_timeout = \
@ -1151,7 +1184,8 @@ class Connection(object):
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
enable_cancel_on_failover=self.enable_cancel_on_failover)
enable_cancel_on_failover=self.enable_cancel_on_failover,
rabbit_quorum_queue=False)
self.declare_consumer(consumer)
@ -1163,12 +1197,13 @@ class Connection(object):
queue_name=queue_name or topic,
routing_key=topic,
type='topic',
durable=self.amqp_durable_queues,
durable=self.durable,
exchange_auto_delete=self.amqp_auto_delete,
queue_auto_delete=self.amqp_auto_delete,
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
enable_cancel_on_failover=self.enable_cancel_on_failover)
enable_cancel_on_failover=self.enable_cancel_on_failover,
rabbit_quorum_queue=self.rabbit_quorum_queue)
self.declare_consumer(consumer)
@ -1190,7 +1225,8 @@ class Connection(object):
callback=callback,
rabbit_ha_queues=self.rabbit_ha_queues,
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
enable_cancel_on_failover=self.enable_cancel_on_failover)
enable_cancel_on_failover=self.enable_cancel_on_failover,
rabbit_quorum_queue=False)
self.declare_consumer(consumer)
@ -1280,7 +1316,11 @@ class Connection(object):
auto_delete=exchange.auto_delete,
name=routing_key,
routing_key=routing_key,
queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0))
queue_arguments=_get_queue_arguments(
self.rabbit_ha_queues,
0,
self.rabbit_quorum_queue,
exchange.durable))
log_info = {'key': routing_key, 'exchange': exchange}
LOG.trace(
'Connection._publish_and_creates_default_queue: '
@ -1336,7 +1376,7 @@ class Connection(object):
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
durable=self.amqp_durable_queues,
durable=self.durable,
auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish, exchange, msg,
@ -1358,7 +1398,7 @@ class Connection(object):
exchange = kombu.entity.Exchange(
name=exchange_name,
type='topic',
durable=self.amqp_durable_queues,
durable=self.durable,
auto_delete=self.amqp_auto_delete)
self._ensure_publishing(self._publish_and_creates_default_queue,

View File

@ -0,0 +1,10 @@
---
features:
- |
Adding support for quorum queues. Quorum queues are enabled if the
``rabbit_quorum_queue`` parameter is sets (``x-queue-type: quorum``).
Setting x-queue-type to quorum means that replicated FIFO queue based on
the Raft consensus algorithm will be used. It is available as of
RabbitMQ 3.8.0. If set this option will take priority over
the HA queues (``rabbit_ha_queues``) aka mirrored queues,
in other words HA queues will be ignored.