# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP

""" Aggregation classes used by the collector and statsd to batch messages sent to the forwarder.
"""
import json
import logging
import re
from time import time

import monasca_agent.common.metrics as metrics_pkg


log = logging.getLogger(__name__)


# This is used to ensure that metrics with a timestamp older than
# RECENT_POINT_THRESHOLD_DEFAULT seconds (or the value passed in to
# the MetricsAggregator constructor) get discarded rather than being
# input into the incorrect bucket. Currently, the MetricsAggregator
# does not support submitting values for the past, and all values get
# submitted for the timestamp passed into the flush() function.
RECENT_POINT_THRESHOLD_DEFAULT = 3600
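# For example (hypothetical numbers): with the default 3600 second threshold, a
# point submitted via submit_metric() with timestamp=time() - 7200 is counted in
# num_discarded_old_points and dropped rather than sampled.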
VALUE_META_MAX_NUMBER = 16
VALUE_META_VALUE_MAX_LENGTH = 2048
VALUE_META_NAME_MAX_LENGTH = 255

invalid_chars = "<>={}(),\"\\\\;&"
restricted_dimension_chars = re.compile('[' + invalid_chars + ']')
restricted_name_chars = re.compile('[' + invalid_chars + ' ' + ']')
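# Illustrative examples of what these patterns reject: restricted_name_chars
# matches "cpu load" (space) and "disk{1}" (braces), so both are invalid metric
# names; restricted_dimension_chars permits spaces, so "us west" is an acceptable
# dimension value while "a=b" is not.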


class InvalidMetricName(Exception):
    pass


class InvalidDimensionKey(Exception):
    pass


class InvalidDimensionValue(Exception):
    pass


class InvalidValue(Exception):
    pass


class InvalidValueMeta(Exception):
    pass


class MetricsAggregator(object):
    """A metric aggregator class."""

    def __init__(self, hostname, recent_point_threshold=None):
        self.events = []
        self.total_count = 0
        self.count = 0
        self.event_count = 0
        self.hostname = hostname

        recent_point_threshold = recent_point_threshold or RECENT_POINT_THRESHOLD_DEFAULT
        self.recent_point_threshold = int(recent_point_threshold)
        self.num_discarded_old_points = 0

        self.metrics = {}

    def event(
            self,
            title,
            text,
            date_happened=None,
            alert_type=None,
            aggregation_key=None,
            source_type_name=None,
            priority=None,
            dimensions=None,
            hostname=None):
        event = {'msg_title': title,
                 'msg_text': text}
        if date_happened is not None:
            event['timestamp'] = date_happened
        else:
            event['timestamp'] = int(time())
        if alert_type is not None:
            event['alert_type'] = alert_type
        if aggregation_key is not None:
            event['aggregation_key'] = aggregation_key
        if source_type_name is not None:
            event['source_type_name'] = source_type_name
        if priority is not None:
            event['priority'] = priority
        if dimensions is not None:
            event['dimensions'] = dimensions
        if hostname is not None:
            event['host'] = hostname
        else:
            event['host'] = self.hostname

        self.events.append(event)

    def flush(self):
        timestamp = time()

        # Flush samples. The individual metrics reset their internal samples
        # when required.
        metrics = []
        for context, metric in self.metrics.items():
            try:
                metrics.extend(metric.flush(timestamp))
            except Exception:
                log.exception('Error flushing {0} metrics.'.format(metric.name))

        # Log a warning regarding metrics with old timestamps being submitted
        if self.num_discarded_old_points > 0:
            log.warn('{0} points were discarded as a result of having an old timestamp'.format(
                self.num_discarded_old_points))
            self.num_discarded_old_points = 0

        # Save some stats.
        log.debug("received {0} payloads since last flush".format(self.count))
        self.total_count += self.count
        self.count = 0
        return metrics

    def flush_events(self):
        events = self.events
        self.events = []

        self.total_count += self.event_count
        self.event_count = 0

        log.debug("Received {0} events since last flush".format(len(events)))

        return events

    @staticmethod
    def formatter(metric, value, timestamp, dimensions, hostname, delegated_tenant=None,
                  device_name=None, metric_type=None, value_meta=None):
        """Format a metric and its metadata into a Measurement:
            (metric, timestamp, value, {"dimensions": {"name1": "value1", "name2": "value2"}, ...})
            dimensions should be a dictionary.
        """
        if 'hostname' not in dimensions and hostname:
            dimensions.update({'hostname': hostname})
        if device_name:
            dimensions.update({'device': device_name})

        return metrics_pkg.Measurement(metric,
                                       int(timestamp),
                                       value,
                                       dimensions,
                                       delegated_tenant=delegated_tenant,
                                       value_meta=value_meta)
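    # Illustrative call (hypothetical values): formatter('cpu.idle_perc', 99.0,
    # 1457475567.2, {'service': 'monitoring'}, 'example-host') adds the
    # 'hostname' dimension from the hostname argument and returns a Measurement
    # for 'cpu.idle_perc' with the timestamp truncated to 1457475567.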

    def packets_per_second(self, interval):
        if interval == 0:
            return 0
        return round(float(self.count) / interval, 2)

    def _valid_value_meta(self, value_meta, name, dimensions):
        if len(value_meta) > VALUE_META_MAX_NUMBER:
            msg = "Too many valueMeta entries {0}, limit is {1}: {2} -> {3} valueMeta {4}"
            log.error(msg.format(len(value_meta), VALUE_META_MAX_NUMBER, name, dimensions, value_meta))
            return False
        for key, value in value_meta.iteritems():
            if not key:
                log.error("valueMeta name cannot be empty: {0} -> {1}".format(name, dimensions))
                return False
            if len(key) > VALUE_META_NAME_MAX_LENGTH:
                msg = "valueMeta name {0} must be {1} characters or less: {2} -> {3}"
                log.error(msg.format(key, VALUE_META_NAME_MAX_LENGTH, name, dimensions))
                return False

        try:
            value_meta_json = json.dumps(value_meta)
            if len(value_meta_json) > VALUE_META_VALUE_MAX_LENGTH:
                msg = "valueMeta name value combinations must be {0} characters or less: {1} -> {2} valueMeta {3}"
                log.error(msg.format(VALUE_META_VALUE_MAX_LENGTH, name, dimensions, value_meta))
                return False
        except Exception:
            log.error("Unable to serialize valueMeta into JSON: {0} -> {1}".format(name, dimensions))
            return False

        return True

    def submit_metric(self, name, value, metric_class, dimensions=None,
                      delegated_tenant=None, hostname=None, device_name=None,
                      value_meta=None, timestamp=None, sample_rate=1):
        if dimensions:
            for k, v in dimensions.iteritems():
                if not isinstance(k, (str, unicode)):
                    log.error("invalid dimension key {0} must be a string: {1} -> {2}".format(k, name, dimensions))
                    raise InvalidDimensionKey
                if len(k) > 255 or len(k) < 1:
                    log.error("invalid length for dimension key {0}: {1} -> {2}".format(k, name, dimensions))
                    raise InvalidDimensionKey
                if restricted_dimension_chars.search(k) or re.match('^_', k):
                    log.error("invalid characters in dimension key {0}: {1} -> {2}".format(k, name, dimensions))
                    raise InvalidDimensionKey

                if not isinstance(v, (str, unicode)):
                    log.error("invalid dimension value {0} for key {1} must be a string: {2} -> {3}".format(
                        v, k, name, dimensions))
                    raise InvalidDimensionValue
                if len(v) > 255 or len(v) < 1:
                    log.error("invalid length dimension value {0} for key {1}: {2} -> {3}".format(
                        v, k, name, dimensions))
                    raise InvalidDimensionValue
                if restricted_dimension_chars.search(v):
                    log.error("invalid characters in dimension value {0} for key {1}: {2} -> {3}".format(
                        v, k, name, dimensions))
                    raise InvalidDimensionValue

        if not isinstance(name, (str, unicode)):
            log.error("invalid metric name must be a string: {0} -> {1}".format(name, dimensions))
            raise InvalidMetricName
        if len(name) > 255 or len(name) < 1:
            log.error("invalid length for metric name: {0} -> {1}".format(name, dimensions))
            raise InvalidMetricName
        if restricted_name_chars.search(name):
            log.error("invalid characters in metric name: {0} -> {1}".format(name, dimensions))
            raise InvalidMetricName

        if not isinstance(value, (int, long, float)):
            log.error("invalid value {0} is not of number type for metric {1}".format(value, name))
            raise InvalidValue

        if value_meta:
            if not self._valid_value_meta(value_meta, name, dimensions):
                raise InvalidValueMeta
            meta = tuple(value_meta.items())
        else:
            meta = None

        context = (name, tuple(dimensions.items()), meta, delegated_tenant,
                   hostname, device_name)

        if context not in self.metrics:
            self.metrics[context] = metric_class(self.formatter,
                                                 name,
                                                 dimensions,
                                                 hostname or self.hostname,
                                                 device_name,
                                                 delegated_tenant,
                                                 value_meta)
        cur_time = time()
        if timestamp is not None:
            if cur_time - int(timestamp) > self.recent_point_threshold:
                log.debug("Discarding {0} - ts = {1}, current ts = {2}".format(name, timestamp, cur_time))
                self.num_discarded_old_points += 1
                return
        else:
            timestamp = cur_time
        self.metrics[context].sample(value, sample_rate, timestamp)