Files
monasca-agent/monasca_agent/forwarder/api/monasca_api.py
Craig Bryant 50824d1170 Add max batch size for writing to API
Add configurable maximum size of batches of measumrement to write
to Monasca API. Prevents killing Monasca API in memory limited
configurations. The default is no limit.

Change-Id: I2bf84501cc51c24843d7c3befd8f9dd42f010f0c
Story: 2001434
2018-01-03 22:53:53 -07:00

187 lines
7.1 KiB
Python

# (C) Copyright 2015-2016,2018 Hewlett Packard Enterprise Development LP
# Copyright 2017 Fujitsu LIMITED
import collections
import copy
import json
import logging
from osc_lib import exceptions
from monascaclient import client
from monasca_agent.common import keystone
log = logging.getLogger(__name__)
class MonascaAPI(object):
"""Sends measurements to MonascaAPI
Any errors should raise an exception so the transaction calling
this is not committed
"""
LOG_INTERVAL = 10 # messages
def __init__(self, config):
"""Initialize Mon api client connection."""
self._config = config
self._mon_client = None
self._api_version = '2_0'
self._failure_reason = None
self._log_interval_remaining = 1
self._current_number_measurements = 0
self._max_buffer_size = int(config['max_buffer_size'])
self._max_batch_size = int(config['max_batch_size'])
self._max_measurement_buffer_size = int(
config['max_measurement_buffer_size'])
if self._max_buffer_size > -1:
log.debug("'max_buffer_size' is deprecated. Please use"
" 'max_measurement_buffer_size' instead")
if self._max_measurement_buffer_size > -1:
log.debug("Overriding 'max_buffer_size' option with new"
" 'max_measurment_buffer_size' option")
self._max_buffer_size = -1
self.backlog_send_rate = int(config['backlog_send_rate'])
if self._max_buffer_size > -1:
self.message_queue = collections.deque(maxlen=self._max_buffer_size)
else:
self.message_queue = collections.deque()
self.write_timeout = int(config['write_timeout'])
# 'amplifier' is completely optional and may not exist in the config
try:
self.amplifier = int(config['amplifier'])
except KeyError:
self.amplifier = None
def _post(self, measurements, tenant=None):
"""Does the actual http post
measurements is a list of Measurement
"""
kwargs = {
'jsonbody': measurements
}
if tenant:
kwargs['tenant_id'] = tenant
if not self._mon_client:
self._mon_client = self._get_mon_client()
if not self._mon_client:
# Keystone is down, queue the message
self._queue_message(kwargs.copy(), "Keystone API is down or unreachable")
return
if self._send_message(**kwargs):
if len(self.message_queue) > 0:
messages_sent = 0
for index in range(0, len(self.message_queue)):
if index < self.backlog_send_rate:
msg = json.loads(self.message_queue.pop())
if self._send_message(**msg):
messages_sent += 1
for value in msg.values():
self._current_number_measurements -= len(value)
else:
self._queue_message(msg, self._failure_reason)
break
else:
break
log.info("Sent {0} messages from the backlog.".format(messages_sent))
log.info("{0} messages remaining in the queue.".format(len(self.message_queue)))
self._log_interval_remaining = 0
else:
self._queue_message(kwargs.copy(), self._failure_reason)
def post_metrics(self, measurements):
"""post_metrics
given [Measurement, ...], format the request and post to
the monitoring api
"""
# Add default dimensions
for envelope in measurements:
measurement = envelope['measurement']
if isinstance(measurement['dimensions'], list):
measurement['dimensions'] = dict([(d[0], d[1]) for d in measurement['dimensions']])
# Split out separate POSTs for each delegated tenant (includes 'None')
tenant_group = {}
for envelope in measurements:
measurement = envelope['measurement']
tenant = envelope['tenant_id']
tenant_group.setdefault(tenant, []).append(copy.deepcopy(measurement))
if self._max_batch_size and len(tenant_group[tenant]) >= self._max_batch_size:
self._post(tenant_group[tenant], tenant)
del tenant_group[tenant]
for tenant in tenant_group:
self._post(tenant_group[tenant], tenant)
def _get_mon_client(self):
k = keystone.Keystone(self._config)
endpoint = k.get_monasca_url()
session = k.get_session()
c = client.Client(
api_version=self._api_version,
endpoint=endpoint,
session=session,
timeout=self.write_timeout,
**keystone.get_args(self._config)
)
return c
def _send_message(self, **kwargs):
try:
self._mon_client.metrics.create(**kwargs)
return True
except exceptions.ClientException as ex:
log.exception("ClientException: error sending "
"message to monasca-api.")
self._failure_reason = ('Error sending message to '
'the Monasca API: {0}').format(str(ex))
except Exception:
log.exception("Error sending message to Monasca API.")
self._failure_reason = 'The Monasca API is DOWN or unreachable'
return False
def _queue_message(self, msg, reason):
if self._max_buffer_size == 0 or self._max_measurement_buffer_size == 0:
return
self.message_queue.append(json.dumps(msg))
for value in msg.values():
self._current_number_measurements += len(value)
if self._max_measurement_buffer_size > -1:
while self._current_number_measurements > self._max_measurement_buffer_size:
self._remove_oldest_from_queue()
if self._log_interval_remaining <= 1:
log.warn("{0}. Queuing the messages to send later...".format(reason))
log.info("Current agent queue size: {0} of {1}.".format(len(self.message_queue),
self._max_buffer_size))
log.info("Current measurements in queue: {0} of {1}".format(
self._current_number_measurements, self._max_measurement_buffer_size))
log.info("A message will be logged for every {0} messages queued.".format(MonascaAPI.LOG_INTERVAL))
self._log_interval_remaining = MonascaAPI.LOG_INTERVAL
else:
self._log_interval_remaining -= 1
def _remove_oldest_from_queue(self):
removed_batch = json.loads(self.message_queue.popleft())
num_discarded = 0
for value in removed_batch.values():
num_discarded += len(value)
self._current_number_measurements -= num_discarded
log.warn("Queue too large, discarding oldest batch: {0} measurements discarded".format(
num_discarded))