From a76a7459785d62e8473b233ef104650f9e6a75cc Mon Sep 17 00:00:00 2001 From: Witek Bedyk Date: Mon, 18 Nov 2019 11:13:50 +0100 Subject: [PATCH] Set maximum buffer size for Kafka producer The change sets queue.buffering.max.messages configuration option for Kafka producer effectively limiting the number of messages in the buffer before sending them to Apache Kafka. Depends-On: https://review.opendev.org/694738 Change-Id: I6ebd4e21e9d55d1ac836e92dd8bf02a678170c68 Story: 2006059 Task: 37532 --- monasca_api/common/messaging/kafka_publisher.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/monasca_api/common/messaging/kafka_publisher.py b/monasca_api/common/messaging/kafka_publisher.py index 1cb28eba9..21eb9d250 100644 --- a/monasca_api/common/messaging/kafka_publisher.py +++ b/monasca_api/common/messaging/kafka_publisher.py @@ -20,7 +20,6 @@ from oslo_log import log from monasca_api.common.messaging import exceptions from monasca_api.common.messaging import publisher - LOG = log.getLogger(__name__) @@ -44,8 +43,10 @@ class KafkaPublisher(publisher.Publisher): self.partitions = cfg.CONF.kafka.partitions self.drop_data = cfg.CONF.kafka.drop_data + config = {'queue.buffering.max.messages': + cfg.CONF.kafka.queue_buffering_max_messages} self._producer = client_factory.get_kafka_producer( - self.uri, cfg.CONF.kafka.legacy_kafka_client_enabled) + self.uri, cfg.CONF.kafka.legacy_kafka_client_enabled, **config) def close(self): pass