From f4dce6c37d19060fd426f3ae834e295b286be986 Mon Sep 17 00:00:00 2001 From: Doug Szumski Date: Thu, 27 Jun 2019 14:46:28 +0100 Subject: [PATCH] Support batching metrics when writing to Kafka When a large post (> 10s of MB) is made to the Monasca API an attempt is made to write these metrics to the metrics topic in Kafka. However, due to the large size of the write, this can fail with a number of obscure errors which depend on exactly how much data is written. This change supports splitting the post into chunks so that they can be written to Kafka in sequence. A default has been chosen so that the maximum write to Kafka should be comfortably under 1MB. A future extension could support splitting the post by size, rather than the number of measurements. A better time to look at this may be after the Python Kafka library has been upgraded. Story: 2006059 Task: 34772 Change-Id: I588a9bc0a19cd02ebfb8c0c1742896f208941396 --- monasca_api/conf/kafka.py | 6 +++++- monasca_api/v2/reference/metrics.py | 5 ++++- ...upport-configuring-kafka-post-size-4baa10353e859b8a.yaml | 4 ++++ 3 files changed, 13 insertions(+), 2 deletions(-) create mode 100644 releasenotes/notes/support-configuring-kafka-post-size-4baa10353e859b8a.yaml diff --git a/monasca_api/conf/kafka.py b/monasca_api/conf/kafka.py index 95836d441..99e9e6433 100644 --- a/monasca_api/conf/kafka.py +++ b/monasca_api/conf/kafka.py @@ -64,7 +64,11 @@ kafka_opts = [ help='Enable legacy Kafka client. When set old version of ' 'kafka-python library is used. Message format version ' 'for the brokers should be set to 0.9.0.0 to avoid ' - 'performance issues until all consumers are upgraded.') + 'performance issues until all consumers are upgraded.'), + cfg.IntOpt('queue_buffering_max_messages', default=1000, + help='The maximum number of metrics per payload sent to ' + 'Kafka. Posts to the Monasca API which exceed this will ' + 'be chunked into batches not exceeding this number.') ] kafka_group = cfg.OptGroup(name='kafka', title='kafka') diff --git a/monasca_api/v2/reference/metrics.py b/monasca_api/v2/reference/metrics.py index 1d47fcdfd..6d94970f7 100644 --- a/monasca_api/v2/reference/metrics.py +++ b/monasca_api/v2/reference/metrics.py @@ -57,6 +57,7 @@ class Metrics(metrics_api_v2.MetricsV2API): 'metrics') self._metrics_repo = simport.load( cfg.CONF.repositories.metrics_driver)() + self._batch_size = cfg.CONF.kafka.queue_buffering_max_messages except Exception as ex: LOG.exception(ex) @@ -65,7 +66,9 @@ class Metrics(metrics_api_v2.MetricsV2API): def _send_metrics(self, metrics): try: - self._message_queue.send_message(metrics) + for i in range(0, len(metrics), self._batch_size): + batch = metrics[i:i + self._batch_size] + self._message_queue.send_message(batch) except message_queue_exceptions.MessageQueueException as ex: LOG.exception(ex) raise falcon.HTTPServiceUnavailable('Service unavailable', diff --git a/releasenotes/notes/support-configuring-kafka-post-size-4baa10353e859b8a.yaml b/releasenotes/notes/support-configuring-kafka-post-size-4baa10353e859b8a.yaml new file mode 100644 index 000000000..f941cda09 --- /dev/null +++ b/releasenotes/notes/support-configuring-kafka-post-size-4baa10353e859b8a.yaml @@ -0,0 +1,4 @@ +--- +features: + - A new config option, queue_buffering_max_messages, has been added to + support controlling the size of posts to Kafka from the Monasca API.