From e95f334459d4dfd3778ec9e84d716f69f2c08ad5 Mon Sep 17 00:00:00 2001 From: "julien.cosmao" Date: Tue, 25 Jul 2023 15:42:22 +0200 Subject: [PATCH] Add an option to use rabbitmq stream for fanout queues This is introducing the "stream" queues for fanout so all components relying on fanout can use the same stream, lowering the number of queues needed and leveraging the new "stream" type of queues from rabbitmq. Closes-Bug: #2031497 Change-Id: I5056a19aada9143bcd80aaf064ced8cad441e6eb Signed-off-by: Arnaud Morin --- oslo_messaging/_drivers/impl_rabbit.py | 85 +++++++++++++++---- .../notes/stream-c3dd31ee98f6bbd7.yaml | 4 + 2 files changed, 71 insertions(+), 18 deletions(-) create mode 100644 releasenotes/notes/stream-c3dd31ee98f6bbd7.yaml diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 2150536a3..b31fb03f9 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -251,6 +251,14 @@ rabbit_opts = [ cfg.StrOpt('processname', default=os.path.basename(sys.argv[0]), help='Process name used by queue manager'), + cfg.BoolOpt('rabbit_stream_fanout', + default=False, + help='Use stream queues in RabbitMQ (x-queue-type: stream). ' + 'The stream queue is a modern queue type for RabbitMQ ' + 'implementing a durable, replicated FIFO queue based on the ' + 'Raft consensus algorithm. It is available as of ' + 'RabbitMQ 3.8.0. If set this option will replace all fanout ' + 'queues with only one stream queue.'), ] LOG = logging.getLogger(__name__) @@ -258,7 +266,8 @@ LOG = logging.getLogger(__name__) def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue, - rabbit_quorum_queue_config): + rabbit_quorum_queue_config, + rabbit_stream_fanout): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we try to declare a mirrored queue @@ -295,11 +304,15 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue_config: Quorum queues provides three options to handle message poisoning - and limit the resources the qourum queue can use + and limit the resources the quorum queue can use x-delivery-limit number of times the queue will try to deliver a message before it decide to discard it x-max-in-memory-length, x-max-in-memory-bytes control the size of memory used by quorum queue + + If the rabbit_stream_fanout option is set, fanout queues are going to use + stream instead of quorum queues. See here: + https://www.rabbitmq.com/streams.html """ args = {} @@ -326,6 +339,12 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, if rabbit_queue_ttl > 0: args['x-expires'] = rabbit_queue_ttl * 1000 + if rabbit_stream_fanout: + args = {'x-queue-type': 'stream'} + if rabbit_queue_ttl > 0: + # max-age is a string + args['x-max-age'] = f"{rabbit_queue_ttl}s" + return args @@ -352,7 +371,8 @@ class Consumer(object): exchange_auto_delete, queue_auto_delete, callback, nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, enable_cancel_on_failover=False, rabbit_quorum_queue=False, - rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)): + rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0), + rabbit_stream_fanout=False): """Init the Consumer class with the exchange_name, routing_key, type, durable auto_delete """ @@ -368,7 +388,7 @@ class Consumer(object): rabbit_quorum_queue_config = rabbit_quorum_queue_config or {} self.queue_arguments = _get_queue_arguments( rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue, - rabbit_quorum_queue_config) + rabbit_quorum_queue_config, rabbit_stream_fanout) self.queue = None self._declared_on = None self.exchange = kombu.entity.Exchange( @@ -657,6 +677,7 @@ class Connection(object): driver_conf) self.rabbit_transient_quorum_queue = \ driver_conf.rabbit_transient_quorum_queue + self.rabbit_stream_fanout = driver_conf.rabbit_stream_fanout self.rabbit_transient_queues_ttl = \ driver_conf.rabbit_transient_queues_ttl self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count @@ -676,6 +697,17 @@ class Connection(object): self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover self.use_queue_manager = driver_conf.use_queue_manager + if self.rabbit_stream_fanout and self.rabbit_qos_prefetch_count <= 0: + raise RuntimeError('Configuration Error: rabbit_stream_fanout ' + 'need rabbit_qos_prefetch_count to be set to ' + 'a value greater than 0.') + + if (self.rabbit_stream_fanout and not + self.rabbit_transient_quorum_queue): + raise RuntimeError('Configuration Error: rabbit_stream_fanout ' + 'need rabbit_transient_quorum_queue to be set ' + 'to true.') + if self.heartbeat_in_pthread: # NOTE(hberaud): Experimental: threading module is in use to run # the rabbitmq health check heartbeat. in some situation like @@ -1121,11 +1153,20 @@ class Connection(object): """Close/release this connection.""" self._heartbeat_stop() if self.connection: - for consumer in filter(lambda c: c.type == 'fanout', - self._consumers): - LOG.debug('[connection close] Deleting fanout ' - 'queue: %s ' % consumer.queue.name) - consumer.queue.delete() + # NOTE(jcosmao) Delete queue should be called only when queue name + # is randomized. When using streams, queue is shared between + # all consumers, thus deleting fanout queue will force all other + # consumers to disconnect/reconnect by throwing + # amqp.exceptions.ConsumerCancelled. + # When using QManager, queue name is consistent accross agent + # restart, so we don't need to delete it either. Deletion must be + # handled by expiration policy. + if not self.rabbit_stream_fanout and not self.use_queue_manager: + for consumer in filter(lambda c: c.type == 'fanout', + self._consumers): + LOG.debug('[connection close] Deleting fanout ' + 'queue: %s ' % consumer.queue.name) + consumer.queue.delete() self._set_current_channel(None) self.connection.release() self.connection = None @@ -1371,7 +1412,7 @@ class Connection(object): queue_name=topic, routing_key='', type='direct', - durable=False, + durable=self.rabbit_transient_quorum_queue, exchange_auto_delete=False, queue_auto_delete=False, callback=callback, @@ -1405,20 +1446,26 @@ class Connection(object): def declare_fanout_consumer(self, topic, callback): """Create a 'fanout' consumer.""" - if self._q_manager: - unique = self._q_manager.get() - else: - unique = uuid.uuid4().hex exchange_name = '%s_fanout' % topic - queue_name = '%s_fanout_%s' % (topic, unique) + if self.rabbit_stream_fanout: + queue_name = '%s_fanout' % topic + else: + if self._q_manager: + unique = self._q_manager.get() + else: + unique = uuid.uuid4().hex + queue_name = '%s_fanout_%s' % (topic, unique) LOG.info('Creating fanout queue: %s', queue_name) + is_durable = (self.rabbit_transient_quorum_queue or + self.rabbit_stream_fanout) + consumer = Consumer( exchange_name=exchange_name, queue_name=queue_name, routing_key=topic, type='fanout', - durable=self.rabbit_transient_quorum_queue, + durable=is_durable, exchange_auto_delete=True, queue_auto_delete=False, callback=callback, @@ -1426,7 +1473,8 @@ class Connection(object): rabbit_queue_ttl=self.rabbit_transient_queues_ttl, enable_cancel_on_failover=self.enable_cancel_on_failover, rabbit_quorum_queue=self.rabbit_transient_quorum_queue, - rabbit_quorum_queue_config=self.rabbit_quorum_queue_config) + rabbit_quorum_queue_config=self.rabbit_quorum_queue_config, + rabbit_stream_fanout=self.rabbit_stream_fanout) self.declare_consumer(consumer) @@ -1533,7 +1581,8 @@ class Connection(object): self.rabbit_ha_queues, 0, self.rabbit_quorum_queue, - self.rabbit_quorum_queue_config)) + self.rabbit_quorum_queue_config, + False)) log_info = {'key': routing_key, 'exchange': exchange} LOG.trace( 'Connection._publish_and_creates_default_queue: ' diff --git a/releasenotes/notes/stream-c3dd31ee98f6bbd7.yaml b/releasenotes/notes/stream-c3dd31ee98f6bbd7.yaml new file mode 100644 index 000000000..29804f19f --- /dev/null +++ b/releasenotes/notes/stream-c3dd31ee98f6bbd7.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Add an option to use stream queues for rabbitmq driver instead of fanouts.