Merge "Add an option to use rabbitmq stream for fanout queues"

This commit is contained in:
Zuul 2024-01-19 15:24:44 +00:00 committed by Gerrit Code Review
commit a417b425a0
2 changed files with 71 additions and 18 deletions

View File

@ -251,6 +251,14 @@ rabbit_opts = [
cfg.StrOpt('processname',
default=os.path.basename(sys.argv[0]),
help='Process name used by queue manager'),
cfg.BoolOpt('rabbit_stream_fanout',
default=False,
help='Use stream queues in RabbitMQ (x-queue-type: stream). '
'The stream queue is a modern queue type for RabbitMQ '
'implementing a durable, replicated FIFO queue based on the '
'Raft consensus algorithm. It is available as of '
'RabbitMQ 3.8.0. If set this option will replace all fanout '
'queues with only one stream queue.'),
]
LOG = logging.getLogger(__name__)
@ -258,7 +266,8 @@ LOG = logging.getLogger(__name__)
def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
rabbit_quorum_queue,
rabbit_quorum_queue_config):
rabbit_quorum_queue_config,
rabbit_stream_fanout):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we try to declare a mirrored queue
@ -295,11 +304,15 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
rabbit_quorum_queue_config:
Quorum queues provides three options to handle message poisoning
and limit the resources the qourum queue can use
and limit the resources the quorum queue can use
x-delivery-limit number of times the queue will try to deliver
a message before it decide to discard it
x-max-in-memory-length, x-max-in-memory-bytes control the size
of memory used by quorum queue
If the rabbit_stream_fanout option is set, fanout queues are going to use
stream instead of quorum queues. See here:
https://www.rabbitmq.com/streams.html
"""
args = {}
@ -326,6 +339,12 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl,
if rabbit_queue_ttl > 0:
args['x-expires'] = rabbit_queue_ttl * 1000
if rabbit_stream_fanout:
args = {'x-queue-type': 'stream'}
if rabbit_queue_ttl > 0:
# max-age is a string
args['x-max-age'] = f"{rabbit_queue_ttl}s"
return args
@ -352,7 +371,8 @@ class Consumer(object):
exchange_auto_delete, queue_auto_delete, callback,
nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0,
enable_cancel_on_failover=False, rabbit_quorum_queue=False,
rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)):
rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0),
rabbit_stream_fanout=False):
"""Init the Consumer class with the exchange_name, routing_key,
type, durable auto_delete
"""
@ -368,7 +388,7 @@ class Consumer(object):
rabbit_quorum_queue_config = rabbit_quorum_queue_config or {}
self.queue_arguments = _get_queue_arguments(
rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue,
rabbit_quorum_queue_config)
rabbit_quorum_queue_config, rabbit_stream_fanout)
self.queue = None
self._declared_on = None
self.exchange = kombu.entity.Exchange(
@ -657,6 +677,7 @@ class Connection(object):
driver_conf)
self.rabbit_transient_quorum_queue = \
driver_conf.rabbit_transient_quorum_queue
self.rabbit_stream_fanout = driver_conf.rabbit_stream_fanout
self.rabbit_transient_queues_ttl = \
driver_conf.rabbit_transient_queues_ttl
self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count
@ -676,6 +697,17 @@ class Connection(object):
self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover
self.use_queue_manager = driver_conf.use_queue_manager
if self.rabbit_stream_fanout and self.rabbit_qos_prefetch_count <= 0:
raise RuntimeError('Configuration Error: rabbit_stream_fanout '
'need rabbit_qos_prefetch_count to be set to '
'a value greater than 0.')
if (self.rabbit_stream_fanout and not
self.rabbit_transient_quorum_queue):
raise RuntimeError('Configuration Error: rabbit_stream_fanout '
'need rabbit_transient_quorum_queue to be set '
'to true.')
if self.heartbeat_in_pthread:
# NOTE(hberaud): Experimental: threading module is in use to run
# the rabbitmq health check heartbeat. in some situation like
@ -1121,11 +1153,20 @@ class Connection(object):
"""Close/release this connection."""
self._heartbeat_stop()
if self.connection:
for consumer in filter(lambda c: c.type == 'fanout',
self._consumers):
LOG.debug('[connection close] Deleting fanout '
'queue: %s ' % consumer.queue.name)
consumer.queue.delete()
# NOTE(jcosmao) Delete queue should be called only when queue name
# is randomized. When using streams, queue is shared between
# all consumers, thus deleting fanout queue will force all other
# consumers to disconnect/reconnect by throwing
# amqp.exceptions.ConsumerCancelled.
# When using QManager, queue name is consistent accross agent
# restart, so we don't need to delete it either. Deletion must be
# handled by expiration policy.
if not self.rabbit_stream_fanout and not self.use_queue_manager:
for consumer in filter(lambda c: c.type == 'fanout',
self._consumers):
LOG.debug('[connection close] Deleting fanout '
'queue: %s ' % consumer.queue.name)
consumer.queue.delete()
self._set_current_channel(None)
self.connection.release()
self.connection = None
@ -1371,7 +1412,7 @@ class Connection(object):
queue_name=topic,
routing_key='',
type='direct',
durable=False,
durable=self.rabbit_transient_quorum_queue,
exchange_auto_delete=False,
queue_auto_delete=False,
callback=callback,
@ -1405,20 +1446,26 @@ class Connection(object):
def declare_fanout_consumer(self, topic, callback):
"""Create a 'fanout' consumer."""
if self._q_manager:
unique = self._q_manager.get()
else:
unique = uuid.uuid4().hex
exchange_name = '%s_fanout' % topic
queue_name = '%s_fanout_%s' % (topic, unique)
if self.rabbit_stream_fanout:
queue_name = '%s_fanout' % topic
else:
if self._q_manager:
unique = self._q_manager.get()
else:
unique = uuid.uuid4().hex
queue_name = '%s_fanout_%s' % (topic, unique)
LOG.info('Creating fanout queue: %s', queue_name)
is_durable = (self.rabbit_transient_quorum_queue or
self.rabbit_stream_fanout)
consumer = Consumer(
exchange_name=exchange_name,
queue_name=queue_name,
routing_key=topic,
type='fanout',
durable=self.rabbit_transient_quorum_queue,
durable=is_durable,
exchange_auto_delete=True,
queue_auto_delete=False,
callback=callback,
@ -1426,7 +1473,8 @@ class Connection(object):
rabbit_queue_ttl=self.rabbit_transient_queues_ttl,
enable_cancel_on_failover=self.enable_cancel_on_failover,
rabbit_quorum_queue=self.rabbit_transient_quorum_queue,
rabbit_quorum_queue_config=self.rabbit_quorum_queue_config)
rabbit_quorum_queue_config=self.rabbit_quorum_queue_config,
rabbit_stream_fanout=self.rabbit_stream_fanout)
self.declare_consumer(consumer)
@ -1533,7 +1581,8 @@ class Connection(object):
self.rabbit_ha_queues,
0,
self.rabbit_quorum_queue,
self.rabbit_quorum_queue_config))
self.rabbit_quorum_queue_config,
False))
log_info = {'key': routing_key, 'exchange': exchange}
LOG.trace(
'Connection._publish_and_creates_default_queue: '

View File

@ -0,0 +1,4 @@
---
features:
- |
Add an option to use stream queues for rabbitmq driver instead of fanouts.