diff --git a/doc/source/admin/rabbit.rst b/doc/source/admin/rabbit.rst index bf91e47c5..23db3525e 100644 --- a/doc/source/admin/rabbit.rst +++ b/doc/source/admin/rabbit.rst @@ -241,6 +241,9 @@ Consuming Options - :oslo.config:option:`oslo_messaging_rabbit.rabbit_ha_queues` - :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_queue` +- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quorum_delivery_limit` +- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quroum_max_memory_length` +- :oslo.config:option:`oslo_messaging_rabbit.rabbit_quroum_max_memory_bytes` - :oslo.config:option:`oslo_messaging_rabbit.rabbit_transient_queues_ttl` Connection Options diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index ae26f3d35..6f737c458 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -12,6 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. +import collections import contextlib import errno import functools @@ -45,6 +46,13 @@ from oslo_messaging._drivers import pool from oslo_messaging import _utils from oslo_messaging import exceptions + +# The QuorumMemConfig will hold the quorum queue memory configurations +QuorumMemConfig = collections.namedtuple('QuorumMemConfig', + 'delivery_limit' + ' max_memory_length' + ' max_memory_bytes') + # NOTE(sileht): don't exist in py2 socket module TCP_USER_TIMEOUT = 18 @@ -147,6 +155,30 @@ rabbit_opts = [ 'in other words the HA queues should be disabled, quorum ' 'queues durable by default so the amqp_durable_queues ' 'opion is ignored when this option enabled.'), + cfg.IntOpt('rabbit_quorum_delivery_limit', + default=0, + help='Each time a message is redelivered to a consumer, ' + 'a counter is incremented. Once the redelivery count ' + 'exceeds the delivery limit the message gets dropped ' + 'or dead-lettered (if a DLX exchange has been configured) ' + 'Used only when rabbit_quorum_queue is enabled, ' + 'Default 0 which means dont set a limit.'), + cfg.IntOpt('rabbit_quroum_max_memory_length', + default=0, + help='By default all messages are maintained in memory ' + 'if a quorum queue grows in length it can put memory ' + 'pressure on a cluster. This option can limit the number ' + 'of messages in the quorum queue. ' + 'Used only when rabbit_quorum_queue is enabled, ' + 'Default 0 which means dont set a limit.'), + cfg.IntOpt('rabbit_quroum_max_memory_bytes', + default=0, + help='By default all messages are maintained in memory ' + 'if a quorum queue grows in length it can put memory ' + 'pressure on a cluster. This option can limit the number ' + 'of memory bytes used by the quorum queue. ' + 'Used only when rabbit_quorum_queue is enabled, ' + 'Default 0 which means dont set a limit.'), cfg.IntOpt('rabbit_transient_queues_ttl', min=1, default=1800, @@ -190,7 +222,8 @@ LOG = logging.getLogger(__name__) def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, - rabbit_quorum_queue): + rabbit_quorum_queue, + rabbit_quorum_queue_config): """Construct the arguments for declaring a queue. If the rabbit_ha_queues option is set, we try to declare a mirrored queue @@ -224,6 +257,14 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, RabbitMQ 3.8.0. If set this option will conflict with the HA queues (``rabbit_ha_queues``) aka mirrored queues, in other words HA queues should be disabled. + + rabbit_quorum_queue_config: + Quorum queues provides three options to handle message poisoning + and limit the resources the qourum queue can use + x-delivery-limit number of times the queue will try to deliver + a message before it decide to discard it + x-max-in-memory-length, x-max-in-memory-bytes control the size + of memory used by quorum queue """ args = {} @@ -237,6 +278,15 @@ def _get_queue_arguments(rabbit_ha_queues, rabbit_queue_ttl, if rabbit_quorum_queue: args['x-queue-type'] = 'quorum' + if rabbit_quorum_queue_config.delivery_limit: + args['x-delivery-limit'] = \ + rabbit_quorum_queue_config.delivery_limit + if rabbit_quorum_queue_config.max_memory_length: + args['x-max-in-memory-length'] = \ + rabbit_quorum_queue_config.max_memory_length + if rabbit_quorum_queue_config.max_memory_bytes: + args['x-max-in-memory-bytes'] = \ + rabbit_quorum_queue_config.max_memory_bytes if rabbit_queue_ttl > 0: args['x-expires'] = rabbit_queue_ttl * 1000 @@ -266,7 +316,8 @@ class Consumer(object): def __init__(self, exchange_name, queue_name, routing_key, type, durable, exchange_auto_delete, queue_auto_delete, callback, nowait=False, rabbit_ha_queues=None, rabbit_queue_ttl=0, - enable_cancel_on_failover=False, rabbit_quorum_queue=False): + enable_cancel_on_failover=False, rabbit_quorum_queue=False, + rabbit_quorum_queue_config=QuorumMemConfig(0, 0, 0)): """Init the Consumer class with the exchange_name, routing_key, type, durable auto_delete """ @@ -279,9 +330,10 @@ class Consumer(object): self.callback = callback self.type = type self.nowait = nowait - self.queue_arguments = _get_queue_arguments(rabbit_ha_queues, - rabbit_queue_ttl, - rabbit_quorum_queue) + rabbit_quorum_queue_config = rabbit_quorum_queue_config or {} + self.queue_arguments = _get_queue_arguments( + rabbit_ha_queues, rabbit_queue_ttl, rabbit_quorum_queue, + rabbit_quorum_queue_config) self.queue = None self._declared_on = None self.exchange = kombu.entity.Exchange( @@ -496,6 +548,8 @@ class Connection(object): self.login_method = driver_conf.rabbit_login_method self.rabbit_ha_queues = driver_conf.rabbit_ha_queues self.rabbit_quorum_queue = driver_conf.rabbit_quorum_queue + self.rabbit_quorum_queue_config = self._get_quorum_configurations( + driver_conf) self.rabbit_transient_queues_ttl = \ driver_conf.rabbit_transient_queues_ttl self.rabbit_qos_prefetch_count = driver_conf.rabbit_qos_prefetch_count @@ -709,6 +763,14 @@ class Connection(object): except KeyError: raise RuntimeError("Invalid SSL version : %s" % version) + def _get_quorum_configurations(self, driver_conf): + """Get the quorum queue configurations""" + delivery_limit = driver_conf.rabbit_quorum_delivery_limit + max_memory_length = driver_conf.rabbit_quroum_max_memory_length + max_memory_bytes = driver_conf.rabbit_quroum_max_memory_bytes + return QuorumMemConfig(delivery_limit, max_memory_length, + max_memory_bytes) + # NOTE(moguimar): default_password in this function's context is just # a fallback option, not a hardcoded password. def _transform_transport_url(self, url, host, default_username='', # nosec @@ -1202,7 +1264,8 @@ class Connection(object): callback=callback, rabbit_ha_queues=self.rabbit_ha_queues, enable_cancel_on_failover=self.enable_cancel_on_failover, - rabbit_quorum_queue=self.rabbit_quorum_queue) + rabbit_quorum_queue=self.rabbit_quorum_queue, + rabbit_quorum_queue_config=self.rabbit_quorum_queue_config) self.declare_consumer(consumer) @@ -1317,7 +1380,8 @@ class Connection(object): queue_arguments=_get_queue_arguments( self.rabbit_ha_queues, 0, - self.rabbit_quorum_queue)) + self.rabbit_quorum_queue, + self.rabbit_quorum_queue_config)) log_info = {'key': routing_key, 'exchange': exchange} LOG.trace( 'Connection._publish_and_creates_default_queue: ' diff --git a/releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml b/releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml new file mode 100644 index 000000000..42fdfbf6d --- /dev/null +++ b/releasenotes/notes/add-quorum-control-configurations-beed79811ff97ba2.yaml @@ -0,0 +1,6 @@ +--- +features: + - | + Add quorum configuration x-max-in-memory-length, + x-max-in-memory-bytes, x-delivery-limit which control the quorum + queue memory usage and handle the message poisoning problem