diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 9d99822d5..a7161ed17 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -149,6 +149,16 @@ rabbit_opts = [ 'nodes, run: ' """\"rabbitmqctl set_policy HA '^(?!amq\\.).*' """ """'{"ha-mode": "all"}' \""""), + cfg.BoolOpt('rabbit_quorum_queue', + default=False, + help='Try to use quorum queues in RabbitMQ ' + '(x-queue-type: quorum). ' + 'The quorum queue is a modern queue type for RabbitMQ ' + 'implementing a durable, replicated FIFO queue based on the ' + 'Raft consensus algorithm. It is available as of ' + 'RabbitMQ 3.8.0. If set this option will take priority over ' + 'the HA queues (``rabbit_ha_queues``) aka mirrored queues, ' + 'in other words it will disable HA queues.'), cfg.IntOpt('rabbit_transient_queues_ttl', min=1, default=1800, @@ -191,7 +201,8 @@ rabbit_opts = [ LOG = logging.getLogger(__name__) -def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl): +def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, + rabbit_quorum_queue): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we try to declare a mirrored queue @@ -214,12 +225,26 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl): Setting a queue TTL causes the queue to be automatically deleted if it is unused for the TTL duration. This is a helpful safeguard to prevent queues with zero consumers from growing without bound. + + If the rabbit_quorum_queue option is set, we try to declare a mirrored + queue as described here: + + https://www.rabbitmq.com/quorum-queues.html + + Setting x-queue-type to quorum means that replicated FIFO queue based on + the Raft consensus algorithm will be used. It is available as of + RabbitMQ 3.8.0. If set this option will take priority over + the HA queues (``rabbit_ha_queues``) aka mirrored queues, + in other words HA queues will be ignored. """ args = {} - if rabbit_ha_queues: + if not rabbit_quorum_queue and rabbit_ha_queues: args['x-ha-policy'] = 'all' + if rabbit_quorum_queue: + args['x-queue-type'] = 'quorum' + if rabbit_queue_ttl > 0: args['x-expires'] = rabbit_queue_ttl * 1000 @@ -248,7 +273,7 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, - enable_cancel_on_failover=False): + rabbit_quorum_queue=None, enable_cancel_on_failover=False): """Init the Consumer class with the exchange_name, routing_key, type, durable auto_delete """ @@ -262,7 +287,8 @@ class Consumer(object): self.type = type self.nowait = nowait self.queue_arguments = _get_queue_arguments(rabbit_ha_queues, - rabbit_queue_ttl) + rabbit_queue_ttl, + rabbit_quorum_queue) self.queue = None self._declared_on = None self.exchange = kombu.entity.Exchange( @@ -475,6 +501,7 @@ class Connection(object): self.login_method = driver_conf.rabbit_login_method self.rabbit_ha_queues = driver_conf.rabbit_ha_queues + self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue self.rabbit_transient_queues_ttl = \ driver_conf.rabbit_transient_queues_ttl self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count @@ -682,6 +709,9 @@ class Connection(object): except KeyError: raise RuntimeError("Invalid SSL version : %s" % version) + def _durable(self): + return self.rabbit_quorum_queue or self.amqp_durable_queues + # NOTE(moguimar): default_password in this function's context is just # a fallback option, not a hardcoded password. def _transform_transport_url(self, url, host, default_username='', # nosec @@ -1145,12 +1175,13 @@ class Connection(object): queue_name=topic, routing_key='', type='direct', - durable=False, + durable=self._durable(), exchange_auto_delete=False, queue_auto_delete=False, callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, rabbit_queue_ttl=self.rabbit_transient_queues_ttl, + rabbit_quorum_queue=self.rabbit_quorum_queue, enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) @@ -1163,11 +1194,12 @@ class Connection(object): queue_name=queue_name or topic, routing_key=topic, type='topic', - durable=self.amqp_durable_queues, + durable=self._durable(), exchange_auto_delete=self.amqp_auto_delete, queue_auto_delete=self.amqp_auto_delete, callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, + rabbit_quorum_queue=self.rabbit_quorum_queue, enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) @@ -1190,6 +1222,7 @@ class Connection(object): callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, rabbit_queue_ttl=self.rabbit_transient_queues_ttl, + rabbit_quorum_queue=self.rabbit_quorum_queue, enable_cancel_on_failover=self.enable_cancel_on_failover) self.declare_consumer(consumer) @@ -1280,7 +1313,10 @@ class Connection(object): auto_delete=exchange.auto_delete, name=routing_key, routing_key=routing_key, - queue_arguments=_get_queue_arguments(self.rabbit_ha_queues, 0)) + queue_arguments=_get_queue_arguments( + self.rabbit_ha_queues, + 0, + self.rabbit_quorum_queue)) log_info = {'key': routing_key, 'exchange': exchange} LOG.trace( 'Connection._publish_and_creates_default_queue: ' @@ -1336,7 +1372,7 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.amqp_durable_queues, + durable=self._durable(), auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish, exchange, msg, @@ -1358,7 +1394,7 @@ class Connection(object): exchange = kombu.entity.Exchange( name=exchange_name, type='topic', - durable=self.amqp_durable_queues, + durable=self._durable(), auto_delete=self.amqp_auto_delete) self._ensure_publishing(self._publish_and_creates_default_queue, diff --git a/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml b/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml new file mode 100644 index 000000000..1e92c7aca --- /dev/null +++ b/releasenotes/notes/adding_support_for_quorum_queues-3101d055b492289e.yaml @@ -0,0 +1,10 @@ +--- +features: + - | + Adding support for quorum queues. Quorum queues are enabled if the + ``rabbit_quorum_queue`` parameter is sets (``x-queue-type: quorum``). + Setting x-queue-type to quorum means that replicated FIFO queue based on + the Raft consensus algorithm will be used. It is available as of + RabbitMQ 3.8.0. If set this option will take priority over + the HA queues (``rabbit_ha_queues``) aka mirrored queues, + in other words HA queues will be ignored.