
The repo is Python 3 now, so update hacking to version 3.0 which supports Python 3. Fix problems found. Update local hacking checks for new flake8. Change-Id: I6396403d0a62f5403fc5b7fb04b6ce790c332c84
201 lines
7.7 KiB
Python
201 lines
7.7 KiB
Python
# (C) Copyright 2015-2016,2018 Hewlett Packard Enterprise Development LP
|
|
# Copyright 2017 Fujitsu LIMITED
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import collections
|
|
import copy
|
|
import json
|
|
import logging
|
|
|
|
from osc_lib import exceptions
|
|
|
|
from monascaclient import client
|
|
|
|
from monasca_agent.common import keystone
|
|
|
|
# Module-level logger, named after this module via __name__.
log = logging.getLogger(__name__)
|
|
|
|
|
|
class MonascaAPI(object):
    """Sends measurements to MonascaAPI

    Any errors should raise an exception so the transaction calling
    this is not committed

    Messages that cannot be delivered are serialized to JSON and kept in
    ``message_queue``; they are replayed (newest first) after the next
    successful send, at most ``backlog_send_rate`` messages per send.
    """

    LOG_INTERVAL = 10  # messages

    def __init__(self, config):
        """Initialize Mon api client connection.

        :param config: mapping of forwarder options. The keys
            'max_buffer_size', 'max_batch_size',
            'max_measurement_buffer_size', 'backlog_send_rate' and
            'write_timeout' are required and converted with ``int()``;
            'amplifier' is optional.
        :raises KeyError: if a required key is missing.
        """
        self._config = config
        # The API client is created lazily on the first _post() call.
        self._mon_client = None
        self._api_version = '2_0'

        self._failure_reason = None
        self._log_interval_remaining = 1
        self._current_number_measurements = 0
        self._max_buffer_size = int(config['max_buffer_size'])
        self._max_batch_size = int(config['max_batch_size'])
        self._max_measurement_buffer_size = int(
            config['max_measurement_buffer_size'])

        if self._max_buffer_size > -1:
            log.debug("'max_buffer_size' is deprecated. Please use"
                      " 'max_measurement_buffer_size' instead")
            if self._max_measurement_buffer_size > -1:
                log.debug("Overriding 'max_buffer_size' option with new"
                          " 'max_measurement_buffer_size' option")
                self._max_buffer_size = -1

        self.backlog_send_rate = int(config['backlog_send_rate'])
        if self._max_buffer_size > -1:
            # Bounded by number of queued messages (deprecated option).
            self.message_queue = collections.deque(maxlen=self._max_buffer_size)
        else:
            # Unbounded deque; any limit is enforced per measurement in
            # _queue_message().
            self.message_queue = collections.deque()
        self.write_timeout = int(config['write_timeout'])
        # 'amplifier' is completely optional and may not exist in the config
        try:
            self.amplifier = int(config['amplifier'])
        except KeyError:
            self.amplifier = None

    @staticmethod
    def _count_measurements(msg):
        """Return the number of measurements carried by a queued message.

        Only the 'jsonbody' list holds measurements; other entries such as
        'tenant_id' are request arguments and must not be counted.
        """
        return len(msg.get('jsonbody') or ())

    def _post(self, measurements, tenant=None):
        """Does the actual http post

        measurements is a list of Measurement

        If the send fails (or no client can be obtained) the message is
        queued. If it succeeds, up to ``backlog_send_rate`` queued messages
        are replayed, stopping at the first failure.
        """
        kwargs = {'jsonbody': measurements}
        if tenant:
            kwargs['tenant_id'] = tenant

        if not self._mon_client:
            self._mon_client = self._get_mon_client()
            if not self._mon_client:
                # Keystone is down, queue the message
                self._queue_message(kwargs.copy(), "Keystone API is down or unreachable")
                return

        if not self._send_message(**kwargs):
            self._queue_message(kwargs.copy(), self._failure_reason)
            return

        if not self.message_queue:
            return

        messages_sent = 0
        for _ in range(min(len(self.message_queue), self.backlog_send_rate)):
            msg = json.loads(self.message_queue.pop())
            # The message has left the queue, so stop counting it now; a
            # failed resend re-adds it through _queue_message(), which
            # counts it again exactly once.
            self._current_number_measurements -= self._count_measurements(msg)
            if not self._send_message(**msg):
                self._queue_message(msg, self._failure_reason)
                break
            messages_sent += 1

        log.info("Sent %s messages from the backlog.", messages_sent)
        log.info("%s messages remaining in the queue.", len(self.message_queue))
        # Make the next queued message log the queue status immediately.
        self._log_interval_remaining = 0

    def post_metrics(self, measurements):
        """post_metrics

        given [Measurement, ...], format the request and post to
        the monitoring api

        Each element is an envelope with a 'measurement' dict and a
        'tenant_id'. Dimensions given as a list of pairs are converted to a
        dict in place on the caller's envelope.
        """
        # Add default dimensions
        for envelope in measurements:
            measurement = envelope['measurement']
            if isinstance(measurement['dimensions'], list):
                measurement['dimensions'] = {
                    d[0]: d[1] for d in measurement['dimensions']}

        # Split out separate POSTs for each delegated tenant (includes 'None')
        tenant_group = {}
        for envelope in measurements:
            measurement = envelope['measurement']
            tenant = envelope['tenant_id']
            batch = tenant_group.setdefault(tenant, [])
            batch.append(copy.deepcopy(measurement))
            # A falsy max_batch_size disables early flushing.
            if self._max_batch_size and len(batch) >= self._max_batch_size:
                self._post(batch, tenant)
                del tenant_group[tenant]

        # Flush whatever is left for each tenant.
        for tenant, batch in tenant_group.items():
            self._post(batch, tenant)

    def _get_mon_client(self):
        """Build a monascaclient Client from the Keystone configuration."""
        k = keystone.Keystone(self._config)
        endpoint = k.get_monasca_url()
        session = k.get_session()
        c = client.Client(
            api_version=self._api_version,
            endpoint=endpoint,
            session=session,
            timeout=self.write_timeout,
            **keystone.get_args(self._config)
        )
        return c

    def _send_message(self, **kwargs):
        """Send one message through the client.

        :returns: True on success; False on any exception, in which case
            ``_failure_reason`` is set and the error is logged.
        """
        try:
            self._mon_client.metrics.create(**kwargs)
            return True
        except exceptions.ClientException as ex:
            log.exception("ClientException: error sending "
                          "message to monasca-api.")
            self._failure_reason = ('Error sending message to '
                                    'the Monasca API: {0}').format(str(ex))
        except Exception:
            log.exception("Error sending message to Monasca API.")
            self._failure_reason = 'The Monasca API is DOWN or unreachable'

        return False

    def _queue_message(self, msg, reason):
        """Append msg to the backlog and enforce the configured limits.

        :param msg: dict of keyword arguments for ``metrics.create``.
        :param reason: text logged as the cause of the queuing.
        """
        # A limit of 0 means buffering is disabled entirely.
        if self._max_buffer_size == 0 or self._max_measurement_buffer_size == 0:
            return

        queue = self.message_queue
        if queue and len(queue) == queue.maxlen:
            # A full bounded deque silently drops its oldest entry on
            # append; remove it explicitly so the counter stays accurate.
            dropped = json.loads(queue.popleft())
            self._current_number_measurements -= self._count_measurements(dropped)

        queue.append(json.dumps(msg))
        self._current_number_measurements += self._count_measurements(msg)

        if self._max_measurement_buffer_size > -1:
            # Stop when the queue is empty so a drifted counter can never
            # trigger popleft() on an empty deque.
            while (queue and self._current_number_measurements >
                    self._max_measurement_buffer_size):
                self._remove_oldest_from_queue()

        if self._log_interval_remaining <= 1:
            log.warning("%s. Queuing the messages to send later...", reason)
            log.info("Current agent queue size: %s of %s.",
                     len(queue), self._max_buffer_size)
            log.info("Current measurements in queue: %s of %s",
                     self._current_number_measurements,
                     self._max_measurement_buffer_size)

            log.info("A message will be logged for every %s messages queued.",
                     MonascaAPI.LOG_INTERVAL)
            self._log_interval_remaining = MonascaAPI.LOG_INTERVAL
        else:
            self._log_interval_remaining -= 1

    def _remove_oldest_from_queue(self):
        """Discard the oldest queued message and update the counter."""
        removed_batch = json.loads(self.message_queue.popleft())
        num_discarded = self._count_measurements(removed_batch)
        self._current_number_measurements -= num_discarded
        log.warning("Queue too large, discarding oldest batch: "
                    "%s measurements discarded", num_discarded)