diff --git a/monasca_api/conf/kafka.py b/monasca_api/conf/kafka.py index 95836d441..99e9e6433 100644 --- a/monasca_api/conf/kafka.py +++ b/monasca_api/conf/kafka.py @@ -64,7 +64,11 @@ kafka_opts = [ help='Enable legacy Kafka client. When set old version of ' 'kafka-python library is used. Message format version ' 'for the brokers should be set to 0.9.0.0 to avoid ' - 'performance issues until all consumers are upgraded.') + 'performance issues until all consumers are upgraded.'), + cfg.IntOpt('queue_buffering_max_messages', default=1000, + help='The maximum number of metrics per payload sent to ' + 'Kafka. Posts to the Monasca API which exceed this will ' + 'be chunked into batches not exceeding this number.') ] kafka_group = cfg.OptGroup(name='kafka', title='kafka') diff --git a/monasca_api/v2/reference/metrics.py b/monasca_api/v2/reference/metrics.py index 1d47fcdfd..6d94970f7 100644 --- a/monasca_api/v2/reference/metrics.py +++ b/monasca_api/v2/reference/metrics.py @@ -57,6 +57,7 @@ class Metrics(metrics_api_v2.MetricsV2API): 'metrics') self._metrics_repo = simport.load( cfg.CONF.repositories.metrics_driver)() + self._batch_size = cfg.CONF.kafka.queue_buffering_max_messages except Exception as ex: LOG.exception(ex) @@ -65,7 +66,9 @@ class Metrics(metrics_api_v2.MetricsV2API): def _send_metrics(self, metrics): try: - self._message_queue.send_message(metrics) + for i in range(0, len(metrics), self._batch_size): + batch = metrics[i:i + self._batch_size] + self._message_queue.send_message(batch) except message_queue_exceptions.MessageQueueException as ex: LOG.exception(ex) raise falcon.HTTPServiceUnavailable('Service unavailable', diff --git a/releasenotes/notes/support-configuring-kafka-post-size-4baa10353e859b8a.yaml b/releasenotes/notes/support-configuring-kafka-post-size-4baa10353e859b8a.yaml new file mode 100644 index 000000000..f941cda09 --- /dev/null +++ b/releasenotes/notes/support-configuring-kafka-post-size-4baa10353e859b8a.yaml @@ -0,0 +1,4 @@ +--- +features: + - A new config option, queue_buffering_max_messages, has been added to + support controlling the size of posts to Kafka from the Monasca API.