# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP

""" Aggregation classes used by the collector and statsd to batch messages sent to the forwarder.
"""

import json
import logging
import re
from time import time

import monasca_agent.common.metrics as metrics_pkg


log = logging.getLogger(__name__)


# This is used to ensure that metrics with a timestamp older than
# RECENT_POINT_THRESHOLD_DEFAULT seconds (or the value passed in to
# the MetricsAggregator constructor) get discarded rather than being
# input into the incorrect bucket. Currently, the MetricsAggregator
# does not support submitting values for the past, and all values get
# submitted for the timestamp passed into the flush() function.
RECENT_POINT_THRESHOLD_DEFAULT = 3600

VALUE_META_MAX_NUMBER = 16
VALUE_META_VALUE_MAX_LENGTH = 2048
VALUE_META_NAME_MAX_LENGTH = 255

invalid_chars = "<>={}(),\"\\\\;&"
restricted_dimension_chars = re.compile('[' + invalid_chars + ']')
restricted_name_chars = re.compile('[' + invalid_chars + ' ' + ']')
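
# For illustration: the patterns above reject any name or dimension
# containing one of < > = { } ( ) , " \ ; & (metric names additionally
# may not contain spaces). For example, restricted_name_chars matches
# 'cpu idle' on the space, so it is an invalid metric name, while
# 'cpu.idle_perc' passes.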


class InvalidMetricName(Exception):
    pass


class InvalidDimensionKey(Exception):
    pass


class InvalidDimensionValue(Exception):
    pass


class InvalidValue(Exception):
    pass


class InvalidValueMeta(Exception):
    pass


class MetricsAggregator(object):
    """A metric aggregator class."""

    def __init__(self, hostname, recent_point_threshold=None, tenant_id=None):
        self.total_count = 0
        self.count = 0
        self.hostname = hostname
        self.global_delegated_tenant = tenant_id

        recent_point_threshold = recent_point_threshold or RECENT_POINT_THRESHOLD_DEFAULT
        self.recent_point_threshold = int(recent_point_threshold)
        self.num_discarded_old_points = 0

        self.metrics = {}

    def flush(self):
        # Flush samples. The individual metrics reset their internal samples
        # when required.
        metrics = []
        for context, metric in self.metrics.items():
            try:
                metrics.extend(metric.flush())
            except Exception:
                log.exception('Error flushing {0} metrics.'.format(metric.name))

        # Log a warning regarding metrics with old timestamps being submitted
        if self.num_discarded_old_points > 0:
            log.warning('{0} points were discarded as a result of having an old timestamp'.format(
                self.num_discarded_old_points))
            self.num_discarded_old_points = 0

        # Save some stats.
        log.debug("received {0} payloads since last flush".format(self.count))
        self.total_count += self.count
        self.count = 0
        return metrics

    def get_hostname_to_post(self, hostname):
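        # 'SUPPRESS' deliberately omits the hostname from the posted
        # metric; None (or '') falls back to the aggregator's hostname.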
        if 'SUPPRESS' == hostname:
            return None
        return hostname or self.hostname

    def packets_per_second(self, interval):
        if interval == 0:
            return 0
        return round(float(self.count) / interval, 2)

    def _valid_value_meta(self, value_meta, name, dimensions):
        if len(value_meta) > VALUE_META_MAX_NUMBER:
            msg = "Too many valueMeta entries {0}, limit is {1}: {2} -> {3} valueMeta {4}"
            log.error(msg.format(len(value_meta), VALUE_META_MAX_NUMBER,
                                 name, dimensions, value_meta))
            return False
        for key, value in value_meta.items():
            if not key:
                log.error("valueMeta name cannot be empty: {0} -> {1}".format(name, dimensions))
                return False
            if len(key) > VALUE_META_NAME_MAX_LENGTH:
                msg = "valueMeta name {0} must be {1} characters or less: {2} -> {3}"
                log.error(msg.format(key, VALUE_META_NAME_MAX_LENGTH, name, dimensions))
                return False

        try:
            if get_value_meta_overage(value_meta):
                msg = "valueMeta name value combinations must be {0} characters or less: {1} -> {2} valueMeta {3}"
                log.error(msg.format(VALUE_META_VALUE_MAX_LENGTH, name, dimensions, value_meta))
                return False
        except Exception:
            log.error("Unable to serialize valueMeta into JSON: {0} -> {1}".format(name, dimensions))
            return False

        return True
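
    # For example, value_meta={'status': 'OK'} passes all three checks;
    # a dict with more than 16 entries, a key that is empty or longer
    # than 255 characters, or a payload whose JSON form exceeds 2048
    # characters would each be rejected.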

    def submit_metric(self, name, value, metric_class, dimensions=None,
                      delegated_tenant=None, hostname=None, device_name=None,
                      value_meta=None, timestamp=None, sample_rate=1):
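        # Validate dimensions against the API limits: keys and values
        # must be non-empty strings of at most 255 characters, must not
        # contain any restricted character, and keys must not start
        # with an underscore.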
        if dimensions:
            for k, v in dimensions.items():
                if not isinstance(k, (str, unicode)):
                    log.error("invalid dimension key {0} must be a string: {1} -> {2}".format(
                        k, name, dimensions))
                    raise InvalidDimensionKey
                if len(k) > 255 or len(k) < 1:
                    log.error("invalid length for dimension key {0}: {1} -> {2}".format(
                        k, name, dimensions))
                    raise InvalidDimensionKey
                if restricted_dimension_chars.search(k) or re.match('^_', k):
                    log.error("invalid characters in dimension key {0}: {1} -> {2}".format(
                        k, name, dimensions))
                    raise InvalidDimensionKey

                if not isinstance(v, (str, unicode)):
                    log.error("invalid dimension value {0} for key {1} must be a string: {2} -> {3}".format(
                        v, k, name, dimensions))
                    raise InvalidDimensionValue
                if len(v) > 255 or len(v) < 1:
                    log.error("invalid length dimension value {0} for key {1}: {2} -> {3}".format(
                        v, k, name, dimensions))
                    raise InvalidDimensionValue
                if restricted_dimension_chars.search(v):
                    log.error("invalid characters in dimension value {0} for key {1}: {2} -> {3}".format(
                        v, k, name, dimensions))
                    raise InvalidDimensionValue

        if not isinstance(name, (str, unicode)):
            log.error("invalid metric name must be a string: {0} -> {1}".format(name, dimensions))
            raise InvalidMetricName
        if len(name) > 255 or len(name) < 1:
            log.error("invalid length for metric name: {0} -> {1}".format(name, dimensions))
            raise InvalidMetricName
        if restricted_name_chars.search(name):
            log.error("invalid characters in metric name: {0} -> {1}".format(name, dimensions))
            raise InvalidMetricName

        if not isinstance(value, (int, long, float)):
            log.error("invalid value {0} is not of number type for metric {1}".format(value, name))
            raise InvalidValue

        if value_meta:
            if not self._valid_value_meta(value_meta, name, dimensions):
                raise InvalidValueMeta

        hostname_to_post = self.get_hostname_to_post(hostname)

        tenant_to_post = delegated_tenant or self.global_delegated_tenant

        # Guard against dimensions=None, which would otherwise raise an
        # AttributeError on .copy()
        dimensions_copy = dimensions.copy() if dimensions else {}

        if 'hostname' not in dimensions_copy and hostname_to_post:
            dimensions_copy.update({'hostname': hostname_to_post})

        # TODO(joe): Shouldn't device_name be added to dimensions in the check
        # plugin? Why is it special cased through so many layers?
        if device_name:
            dimensions_copy.update({'device': device_name})

        # TODO(joe): Decide if hostname_to_post and device_name are necessary
        # for the context tuple
        # Sort the dimension items so the dedup key does not depend on
        # dict iteration order
        context = (name, tuple(sorted(dimensions_copy.items())), tenant_to_post,
                   hostname_to_post, device_name)

        if context not in self.metrics:
            self.metrics[context] = metric_class(name,
                                                 dimensions_copy,
                                                 tenant=tenant_to_post)
        cur_time = time()
        if timestamp is not None:
            if cur_time - int(timestamp) > self.recent_point_threshold:
                log.debug("Discarding {0} - ts = {1}, current ts = {2} ".format(
                    name, timestamp, cur_time))
                self.num_discarded_old_points += 1
                return
        else:
            timestamp = cur_time
        self.metrics[context].value_meta = value_meta
        self.metrics[context].sample(value, sample_rate, timestamp)


def get_value_meta_overage(value_meta):
    # Serialize once; overage is how far the JSON form exceeds the limit
    serialized_len = len(json.dumps(value_meta))
    if serialized_len > VALUE_META_VALUE_MAX_LENGTH:
        return serialized_len - VALUE_META_VALUE_MAX_LENGTH
    return 0
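
# Illustrative note: the limit applies to the JSON-serialized form, e.g.
# json.dumps({'status': 'OK'}) is 16 characters, well under the
# 2048-character VALUE_META_VALUE_MAX_LENGTH, so its overage is 0.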