""" Aggregation classes used by the collector and statsd to batch messages sent to the forwarder.
"""
import logging
from time import time

import monasca_agent.common.metrics as metrics_pkg

log = logging.getLogger(__name__)

# This is used to ensure that metrics with a timestamp older than
# RECENT_POINT_THRESHOLD_DEFAULT seconds (or the value passed in to
# the MetricsAggregator constructor) get discarded rather than being
# input into the incorrect bucket. Currently, the MetricsAggregator
# does not support submitting values for the past, and all values get
# submitted for the timestamp passed into the flush() function.
RECENT_POINT_THRESHOLD_DEFAULT = 3600
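# Illustrative note (example values, not from the original source): with the
# default threshold of 3600 seconds, a sample submitted with timestamp T is kept
# only while time() - int(T) <= 3600; anything older is dropped by
# submit_metric() and counted in num_discarded_old_points.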


class MetricsAggregator(object):
    """Aggregates metric samples reported by the collector and statsd,
    keeping one metric object per measurement context, and flushes them
    as a batch of Measurements.
    """

    def __init__(self, hostname, recent_point_threshold=None):
        self.events = []
        self.total_count = 0
        self.count = 0
        self.event_count = 0
        self.hostname = hostname

        recent_point_threshold = recent_point_threshold or RECENT_POINT_THRESHOLD_DEFAULT
        self.recent_point_threshold = int(recent_point_threshold)
        self.num_discarded_old_points = 0

        self.metrics = {}
        self.metric_type_to_class = {
            'g': metrics_pkg.Gauge,
            'c': metrics_pkg.Counter,
            'h': metrics_pkg.Histogram,
            'ms': metrics_pkg.Histogram,
            's': metrics_pkg.Set,
            'r': metrics_pkg.Rate,
        }

    def decrement(self, name, value=-1, dimensions=None, delegated_tenant=None,
                  hostname=None, device_name=None):
        self.submit_metric(name, value, 'c', dimensions, delegated_tenant,
                           hostname, device_name)

    def event(self, title, text, date_happened=None, alert_type=None,
              aggregation_key=None, source_type_name=None, priority=None,
              dimensions=None, hostname=None):
        event = {
            'msg_title': title,
            'msg_text': text,
        }
        if date_happened is not None:
            event['timestamp'] = date_happened
        else:
            event['timestamp'] = int(time())
        if alert_type is not None:
            event['alert_type'] = alert_type
        if aggregation_key is not None:
            event['aggregation_key'] = aggregation_key
        if source_type_name is not None:
            event['source_type_name'] = source_type_name
        if priority is not None:
            event['priority'] = priority
        if dimensions is not None:
            event['dimensions'] = dimensions
        if hostname is not None:
            event['host'] = hostname
        else:
            event['host'] = self.hostname

        self.events.append(event)

    def flush(self):
        timestamp = time()

        # Flush samples. The individual metrics reset their internal samples
        # when required.
        metrics = []
        for context, metric in self.metrics.items():
            metrics.extend(metric.flush(timestamp))

        # Log a warning regarding metrics with old timestamps being submitted
        if self.num_discarded_old_points > 0:
            log.warning('{0} points were discarded as a result of having an old timestamp'.format(
                self.num_discarded_old_points))
            self.num_discarded_old_points = 0

        # Save some stats.
        log.debug("received {0} payloads since last flush".format(self.count))
        self.total_count += self.count
        self.count = 0
        return metrics

    def flush_events(self):
        events = self.events
        self.events = []

        self.total_count += self.event_count
        self.event_count = 0

        log.debug("Received {0} events since last flush".format(len(events)))
        return events

    @staticmethod
    def formatter(metric, value, timestamp, dimensions, hostname,
                  delegated_tenant=None, device_name=None, metric_type=None):
        """Formats a metric and puts it into a Measurement object:
        (metric, timestamp, value, {"dimensions": {"name1": "value1", "name2": "value2"}, ...})
        dimensions should be a dictionary.
        """
        if hostname:
            dimensions.update({'hostname': hostname})
        if device_name:
            dimensions.update({'device': device_name})

        return metrics_pkg.Measurement(metric,
                                       int(timestamp),
                                       value,
                                       dimensions,
                                       delegated_tenant)
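
        # Illustrative example (values made up, not from the original source):
        #     formatter('cpu.idle_perc', 72.5, 1421280000,
        #               {'service': 'compute'}, 'host-01')
        # builds Measurement('cpu.idle_perc', 1421280000, 72.5,
        #                    {'service': 'compute', 'hostname': 'host-01'}, None),
        # since hostname (and device_name, when given) are folded into dimensions.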

    def gauge(self, name, value, dimensions=None, delegated_tenant=None,
              hostname=None, device_name=None, timestamp=None):
        self.submit_metric(name, value, 'g', dimensions, delegated_tenant,
                           hostname, device_name, timestamp)

    def histogram(self, name, value, dimensions=None, delegated_tenant=None,
                  hostname=None, device_name=None):
        self.submit_metric(name, value, 'h', dimensions, delegated_tenant,
                           hostname, device_name)

    def increment(self, name, value=1, dimensions=None, delegated_tenant=None,
                  hostname=None, device_name=None):
        self.submit_metric(name, value, 'c', dimensions, delegated_tenant,
                           hostname, device_name)

    def packets_per_second(self, interval):
        if interval == 0:
            return 0
        return round(float(self.count) / interval, 2)

    def rate(self, name, value, dimensions=None, delegated_tenant=None,
             hostname=None, device_name=None):
        self.submit_metric(name, value, 'r', dimensions, delegated_tenant,
                           hostname, device_name)

    def set(self, name, value, dimensions=None, delegated_tenant=None,
            hostname=None, device_name=None):
        self.submit_metric(name, value, 's', dimensions, delegated_tenant,
                           hostname, device_name)

    def submit_metric(self, name, value, mtype, dimensions=None,
                      delegated_tenant=None, hostname=None, device_name=None,
                      timestamp=None, sample_rate=1):
        if dimensions is None:
            # Treat a missing dimensions argument as an empty dict so the
            # context key below stays hashable.
            dimensions = {}

        context = (name, tuple(dimensions.items()),
                   delegated_tenant, hostname, device_name)

        if context not in self.metrics:
            metric_class = self.metric_type_to_class[mtype]
            self.metrics[context] = metric_class(self.formatter,
                                                 name,
                                                 dimensions,
                                                 hostname or self.hostname,
                                                 device_name,
                                                 delegated_tenant)

        cur_time = time()
        if timestamp is not None:
            if cur_time - int(timestamp) > self.recent_point_threshold:
                log.debug("Discarding {0} - ts = {1}, current ts = {2}".format(name, timestamp, cur_time))
                self.num_discarded_old_points += 1
                return
        else:
            timestamp = cur_time
        self.metrics[context].sample(value, sample_rate, timestamp)
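

if __name__ == '__main__':
    # Minimal usage sketch, not part of the original module: it drives the
    # aggregator the way the collector/statsd daemon would. The metric names,
    # dimensions and hostname below are made-up example values.
    aggregator = MetricsAggregator('example-host')
    aggregator.gauge('cpu.idle_perc', 72.5, dimensions={'service': 'example'})
    aggregator.increment('requests.count', 3, dimensions={'service': 'example'})
    aggregator.increment('requests.count', 2, dimensions={'service': 'example'})

    # flush() turns the accumulated samples into Measurement objects for the
    # forwarder; both increments above share one context and one Counter.
    batch = aggregator.flush()
    print('flushed {0} measurements'.format(len(batch)))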