diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 2150536a3..b31fb03f9 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -251,6 +251,14 @@ rabbit_opts = [ cfg.StrOpt('processname', default=os.path.basename(sys.argv[0]), help='Process name used by queue manager'), + cfg.BoolOpt('rabbit_stream_fanout', + default=False, + help='Use stream queues in RabbitMQ (x-queue-type: stream). ' + 'The stream queue is a modern queue type for RabbitMQ ' + 'implementing a durable, replicated FIFO queue based on the ' + 'Raft consensus algorithm. It is available as of ' + 'RabbitMQ 3.8.0. If set this option will replace all fanout ' + 'queues with only one stream queue.'), ] LOG = logging.getLogger(__name__) @@ -258,7 +266,8 @@ LOG = logging.getLogger(__name__) def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue, - rabbit_quorum_queue_config): + rabbit_quorum_queue_config, + rabbit_stream_fanout): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we try to declare a mirrored queue @@ -295,11 +304,15 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue_config: Quorum queues provides three options to handle message poisoning - and limit the resources the qourum queue can use + and limit the resources the quorum queue can use x-delivery-limit number of times the queue will try to deliver a message before it decide to discard it x-max-in-memory-length, x-max-in-memory-bytes control the size of memory used by quorum queue + + If the rabbit_stream_fanout option is set, fanout queues are going to use + stream instead of quorum queues. See here: + https://www.rabbitmq.com/streams.html """ args = {} @@ -326,6 +339,12 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, if rabbit_queue_ttl > 0: args['x-expires'] = rabbit_queue_ttl * 1000 + if rabbit_stream_fanout: + args = {'x-queue-type': 'stream'} + if rabbit_queue_ttl > 0: + # max-age is a string + args['x-max-age'] = f"{rabbit_queue_ttl}s" + return args @@ -352,7 +371,8 @@ class Consumer(object): exchange_auto_delete, queue_auto_delete, callback, nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, enable_cancel_on_failover=False, rabbit_quorum_queue=False, - rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)): + rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0), + rabbit_stream_fanout=False): """Init the Consumer class with the exchange_name, routing_key, type, durable auto_delete """ @@ -368,7 +388,7 @@ class Consumer(object): rabbit_quorum_queue_config = rabbit_quorum_queue_config or {} self.queue_arguments = _get_queue_arguments( rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue, - rabbit_quorum_queue_config) + rabbit_quorum_queue_config, rabbit_stream_fanout) self.queue = None self._declared_on = None self.exchange = kombu.entity.Exchange( @@ -657,6 +677,7 @@ class Connection(object): driver_conf) self.rabbit_transient_quorum_queue = \ driver_conf.rabbit_transient_quorum_queue + self.rabbit_stream_fanout = driver_conf.rabbit_stream_fanout self.rabbit_transient_queues_ttl = \ driver_conf.rabbit_transient_queues_ttl self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count @@ -676,6 +697,17 @@ class Connection(object): self.enable_cancel_on_failover = driver_conf.enable_cancel_on_failover self.use_queue_manager = driver_conf.use_queue_manager + if self.rabbit_stream_fanout and self.rabbit_qos_prefetch_count <= 0: + raise RuntimeError('Configuration Error: rabbit_stream_fanout ' + 'need rabbit_qos_prefetch_count to be set to ' + 'a value greater than 0.') + + if (self.rabbit_stream_fanout and not + self.rabbit_transient_quorum_queue): + raise RuntimeError('Configuration Error: rabbit_stream_fanout ' + 'need rabbit_transient_quorum_queue to be set ' + 'to true.') + if self.heartbeat_in_pthread: # NOTE(hberaud): Experimental: threading module is in use to run # the rabbitmq health check heartbeat. in some situation like @@ -1121,11 +1153,20 @@ class Connection(object): """Close/release this connection.""" self._heartbeat_stop() if self.connection: - for consumer in filter(lambda c: c.type == 'fanout', - self._consumers): - LOG.debug('[connection close] Deleting fanout ' - 'queue: %s ' % consumer.queue.name) - consumer.queue.delete() + # NOTE(jcosmao) Delete queue should be called only when queue name + # is randomized. When using streams, queue is shared between + # all consumers, thus deleting fanout queue will force all other + # consumers to disconnect/reconnect by throwing + # amqp.exceptions.ConsumerCancelled. + # When using QManager, queue name is consistent accross agent + # restart, so we don't need to delete it either. Deletion must be + # handled by expiration policy. + if not self.rabbit_stream_fanout and not self.use_queue_manager: + for consumer in filter(lambda c: c.type == 'fanout', + self._consumers): + LOG.debug('[connection close] Deleting fanout ' + 'queue: %s ' % consumer.queue.name) + consumer.queue.delete() self._set_current_channel(None) self.connection.release() self.connection = None @@ -1371,7 +1412,7 @@ class Connection(object): queue_name=topic, routing_key='', type='direct', - durable=False, + durable=self.rabbit_transient_quorum_queue, exchange_auto_delete=False, queue_auto_delete=False, callback=callback, @@ -1405,20 +1446,26 @@ class Connection(object): def declare_fanout_consumer(self, topic, callback): """Create a 'fanout' consumer.""" - if self._q_manager: - unique = self._q_manager.get() - else: - unique = uuid.uuid4().hex exchange_name = '%s_fanout' % topic - queue_name = '%s_fanout_%s' % (topic, unique) + if self.rabbit_stream_fanout: + queue_name = '%s_fanout' % topic + else: + if self._q_manager: + unique = self._q_manager.get() + else: + unique = uuid.uuid4().hex + queue_name = '%s_fanout_%s' % (topic, unique) LOG.info('Creating fanout queue: %s', queue_name) + is_durable = (self.rabbit_transient_quorum_queue or + self.rabbit_stream_fanout) + consumer = Consumer( exchange_name=exchange_name, queue_name=queue_name, routing_key=topic, type='fanout', - durable=self.rabbit_transient_quorum_queue, + durable=is_durable, exchange_auto_delete=True, queue_auto_delete=False, callback=callback, @@ -1426,7 +1473,8 @@ class Connection(object): rabbit_queue_ttl=self.rabbit_transient_queues_ttl, enable_cancel_on_failover=self.enable_cancel_on_failover, rabbit_quorum_queue=self.rabbit_transient_quorum_queue, - rabbit_quorum_queue_config=self.rabbit_quorum_queue_config) + rabbit_quorum_queue_config=self.rabbit_quorum_queue_config, + rabbit_stream_fanout=self.rabbit_stream_fanout) self.declare_consumer(consumer) @@ -1533,7 +1581,8 @@ class Connection(object): self.rabbit_ha_queues, 0, self.rabbit_quorum_queue, - self.rabbit_quorum_queue_config)) + self.rabbit_quorum_queue_config, + False)) log_info = {'key': routing_key, 'exchange': exchange} LOG.trace( 'Connection._publish_and_creates_default_queue: ' diff --git a/releasenotes/notes/stream-c3dd31ee98f6bbd7.yaml b/releasenotes/notes/stream-c3dd31ee98f6bbd7.yaml new file mode 100644 index 000000000..29804f19f --- /dev/null +++ b/releasenotes/notes/stream-c3dd31ee98f6bbd7.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Add an option to use stream queues for rabbitmq driver instead of fanouts.