Remove measurement transformation from forwarder

Previously the forwarder modified the timestamp, and possibly the tenant_id,
of every measurement sent to it.  This change moves the processing of those
attributes into the collector, which now sends the forwarder the final set of
data.

Value meta can't be part of the context used to determine whether we have a
new metric.  Value meta can change while the metric itself remains the same,
which means that we construct a new metric object each time the value meta
changes.  This has no effect on Gauge metrics, but Counter and Rate metrics
return invalid data whenever value meta changes.  To fix this, value meta is
now applied to each sample instead of being assigned during __init__.

Change-Id: I1d6a52266755b2e20149ef7ec6eb76fe2459b2a8
Closes-Bug: #1596686
This commit is contained in:
Joe Keen 2016-06-23 15:51:48 -06:00
parent e918a21950
commit c0393c46f9
9 changed files with 121 additions and 321 deletions

View File

@ -1,4 +1,4 @@
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
"""Base class for Checks.
If you are writing your own checks you should subclass the AgentCheck class.
@ -188,27 +188,27 @@ class AgentCheck(util.Dimensions):
metrics = self.aggregator.flush()
if prettyprint:
for metric in metrics:
print(" Timestamp: {0}".format(metric.timestamp))
print(" Name: {0}".format(metric.name))
print(" Value: {0}".format(metric.value))
if (metric.delegated_tenant):
print(" Delegate ID: {0}".format(metric.delegated_tenant))
measurement = metric['measurement']
print(" Timestamp: {0}".format(measurement['timestamp']))
print(" Name: {0}".format(measurement['name']))
print(" Value: {0}".format(measurement['value']))
print(" Dimensions: ", end='')
line = 0
for name in metric.dimensions:
dimensions = measurement['dimensions']
for name in dimensions:
if line != 0:
print(" " * 13, end='')
print("{0}={1}".format(name, metric.dimensions[name]))
print("{0}={1}".format(name, dimensions[name]))
line += 1
print(" Value Meta: ", end='')
if metric.value_meta:
value_meta = measurement['value_meta']
if value_meta:
line = 0
for name in metric.value_meta:
for name in value_meta:
if line != 0:
print(" " * 13, end='')
print("{0}={1}".format(name, metric.value_meta[name]))
print("{0}={1}".format(name, value_meta[name]))
line += 1
else:
print('None')

View File

@ -1,4 +1,4 @@
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
# Core modules
import logging
@ -111,11 +111,10 @@ class Collector(util.Dimensions):
dimensions = {'component': 'monasca-agent', 'service': 'monitoring'}
# Add in metrics on the collector run
for name, value in self.collection_metrics.iteritems():
collect_stats.append(metrics.Measurement(name,
time.time(),
value,
self._set_dimensions(dimensions),
None))
metric = metrics.Metric(name,
self._set_dimensions(dimensions),
tenant=None)
collect_stats.append(metric.measurement(value, time.time()))
self.collection_metrics.clear()
self._emit(collect_stats)

View File

@ -1,5 +1,5 @@
#!/bin/env python
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
"""Monitoring Agent plugin for HTTP/API checks.
"""
@ -19,8 +19,6 @@ class HTTPMetrics(http_check.HTTPCheck):
self.metric_method = {
'gauge': self.gauge,
'counter': self.increment,
'histogram': self.histogram,
'set': self.set,
'rate': self.rate}
def _valid_number(self, value, name):
@ -60,8 +58,8 @@ class HTTPMetrics(http_check.HTTPCheck):
value = current
# everything requires a number, except set
if metric_type in ['gauge', 'counter', 'histogram', 'rate']:
# everything requires a number
if metric_type in ['gauge', 'counter', 'rate']:
if not self._valid_number(value, metric_name):
self.log.warning("Invalid value '{0}' for metric '{1}'".format(value, metric_name))
continue

View File

@ -1,4 +1,4 @@
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
""" Aggregation classes used by the collector and statsd to batch messages sent to the forwarder.
"""
import json
@ -62,14 +62,12 @@ class MetricsAggregator(object):
self.metrics = {}
def flush(self):
timestamp = time()
# Flush samples. The individual metrics reset their internal samples
# when required
metrics = []
for context, metric in self.metrics.items():
try:
metrics.extend(metric.flush(timestamp))
metrics.extend(metric.flush())
except Exception:
log.exception('Error flushing {0} metrics.'.format(metric.name))
@ -90,25 +88,6 @@ class MetricsAggregator(object):
return None
return hostname or self.hostname
@staticmethod
def formatter(metric, value, timestamp, dimensions, hostname, delegated_tenant=None,
device_name=None, metric_type=None, value_meta=None):
"""Formats metrics, put them into a Measurement class
(metric, timestamp, value, {"dimensions": {"name1": "value1", "name2": "value2"}, ...})
dimensions should be a dictionary
"""
if 'hostname' not in dimensions and hostname:
dimensions.update({'hostname': hostname})
if device_name:
dimensions.update({'device': device_name})
return metrics_pkg.Measurement(metric,
int(timestamp),
value,
dimensions,
delegated_tenant=delegated_tenant,
value_meta=value_meta)
def packets_per_second(self, interval):
if interval == 0:
return 0
@ -185,22 +164,26 @@ class MetricsAggregator(object):
if value_meta:
if not self._valid_value_meta(value_meta, name, dimensions):
raise InvalidValueMeta
meta = tuple(value_meta.items())
else:
meta = None
hostname_to_post = self.get_hostname_to_post(hostname)
context = (name, tuple(dimensions.items()), meta, delegated_tenant,
if 'hostname' not in dimensions and hostname_to_post:
dimensions.update({'hostname': hostname_to_post})
# TODO(joe): Shouldn't device_name be added to dimensions in the check
# plugin? Why is it special cased through so many layers?
if device_name:
dimensions.update({'device': device_name})
# TODO(joe): Decide if hostname_to_post and device_name are necessary
# for the context tuple
context = (name, tuple(dimensions.items()), delegated_tenant,
hostname_to_post, device_name)
if context not in self.metrics:
self.metrics[context] = metric_class(self.formatter,
name,
self.metrics[context] = metric_class(name,
dimensions,
hostname_to_post,
device_name,
delegated_tenant,
value_meta)
tenant=delegated_tenant)
cur_time = time()
if timestamp is not None:
if cur_time - int(timestamp) > self.recent_point_threshold:
@ -209,4 +192,5 @@ class MetricsAggregator(object):
return
else:
timestamp = cur_time
self.metrics[context].value_meta = value_meta
self.metrics[context].sample(value, sample_rate, timestamp)

View File

@ -1,11 +1,9 @@
# (C) Copyright 2015 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
from hashlib import md5
import json
import urllib2
from monasca_agent.common.metrics import Measurement
def post_headers(payload):
return {
@ -25,11 +23,7 @@ def http_emitter(message, log, url):
# Post back the data
partial_payload = []
for measurement in message:
if not isinstance(measurement, Measurement):
log.error('Data was not in the form of a monasca_agent.common.metrics.Measurement')
continue
# Measurements need their __dict__ encoded to avoid being expressed as a tuple
partial_payload.append(measurement.__dict__)
partial_payload.append(measurement)
payload = json.dumps(partial_payload)
url = "%s/intake" % url

View File

@ -2,7 +2,6 @@
""" Metric data types
"""
import logging
from time import time
from monasca_agent.common.exceptions import UnknownValue
@ -10,254 +9,99 @@ from monasca_agent.common.exceptions import UnknownValue
log = logging.getLogger(__name__)
class Measurement(object):
def __init__(self, name, timestamp, value, dimensions, delegated_tenant=None, value_meta=None):
self.name = name
self.timestamp = timestamp
self.value = value
self.dimensions = dimensions.copy()
self.delegated_tenant = delegated_tenant
if value_meta:
self.value_meta = value_meta.copy()
else:
self.value_meta = None
class MetricTypes(object):
GAUGE = 'gauge'
COUNTER = 'counter'
RATE = 'rate'
class Metric(object):
"""A base metric class that accepts points, slices them into time intervals
and performs roll-ups within those intervals.
"""
"""A base metric class """
def sample(self, value, sample_rate, timestamp=None):
"""Add a point to the given metric. """
def __init__(self, name, dimensions, tenant):
self.metric = {'name': name,
'dimensions': dimensions.copy()}
self.value_meta = None
self.value = None
self.timestamp = None
self.tenant = tenant
def measurement(self, value, timestamp):
    """Build the envelope for one sample of this metric.

    Args:
        value: the sample value to report.
        timestamp: time of the sample; it is multiplied by 1000 before
            being stored in the measurement.

    Returns:
        A dict with two keys: 'measurement' (the metric's name and
        dimensions plus value_meta, value and timestamp) and 'tenant_id'
        (this metric's tenant).
    """
    measurement = self.metric.copy()
    # dict.copy() is shallow: without this, the returned measurement would
    # share its dimensions dict with the metric and with every other
    # measurement built from it.
    measurement['dimensions'] = self.metric['dimensions'].copy()
    # Copy value_meta as well so a later reassignment or mutation cannot
    # alter a measurement that has already been built.
    measurement['value_meta'] = (self.value_meta.copy()
                                 if self.value_meta else None)
    measurement['value'] = value
    measurement['timestamp'] = timestamp * 1000
    return {'measurement': measurement,
            'tenant_id': self.tenant}
def sample(self, value, sample_rate, timestamp):
"""Save a sample. """
raise NotImplementedError()
def flush(self, timestamp):
"""Flush all metrics up to the given timestamp. """
def flush(self):
"""Flush current sample. """
raise NotImplementedError()
class Gauge(Metric):
"""A metric that tracks a value at particular points in time. """
def __init__(self, formatter, name, dimensions,
hostname, device_name, delegated_tenant=None,
value_meta=None):
self.formatter = formatter
self.name = name
self.value = None
self.dimensions = dimensions.copy()
self.delegated_tenant = delegated_tenant
self.hostname = hostname
self.device_name = device_name
self.timestamp = time()
if value_meta:
self.value_meta = value_meta.copy()
else:
self.value_meta = None
def __init__(self, name, dimensions, tenant=None):
super(Gauge, self).__init__(name, dimensions, tenant)
def sample(self, value, sample_rate, timestamp=None):
def sample(self, value, sample_rate, timestamp):
self.value = value
self.timestamp = timestamp
def flush(self, timestamp):
if self.value is not None:
value = self.value
self.value = None
return [self.formatter(metric=self.name,
timestamp=self.timestamp or timestamp,
value=value,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
hostname=self.hostname,
device_name=self.device_name,
metric_type=MetricTypes.GAUGE,
value_meta=self.value_meta)]
else:
def flush(self):
    """Return the pending gauge sample as a one-element list and clear it.

    Returns:
        [] when no value is pending, otherwise a list holding the envelope
        built by self.measurement() for the stored value and timestamp.
    """
    # Compare against None instead of relying on truthiness: 0 is a
    # legitimate gauge reading and must still be reported.
    if self.value is None:
        return []

    envelope = self.measurement(self.value, self.timestamp)
    self.value = None
    return [envelope]
class Counter(Metric):
"""A metric that tracks a counter value. """
def __init__(self, formatter, name, dimensions,
hostname, device_name, delegated_tenant=None,
value_meta=None):
self.formatter = formatter
self.name = name
def __init__(self, name, dimensions, tenant=None):
super(Counter, self).__init__(name, dimensions, tenant)
self.value = 0
self.dimensions = dimensions.copy()
self.delegated_tenant = delegated_tenant
self.hostname = hostname
self.device_name = device_name
if value_meta:
self.value_meta = value_meta.copy()
else:
self.value_meta = None
def sample(self, value, sample_rate, timestamp=None):
def sample(self, value, sample_rate, timestamp):
    """Add one sample to the running counter total.

    Args:
        value: amount to add; it is scaled by int(1 / sample_rate).
        sample_rate: rate the value was sampled at.
        timestamp: time of the sample, remembered for the next flush.

    A TypeError raised while accumulating is logged rather than
    propagated; in that case neither the total nor the timestamp changes.
    """
    try:
        self.value += value * int(1 / sample_rate)
        self.timestamp = timestamp
    except TypeError:
        # The metric name is kept in self.metric; there is no `name`
        # attribute, so formatting self.name here would itself raise
        # AttributeError from inside the handler.
        log.error("metric {} value {} sample_rate {}".
                  format(self.metric['name'], value, sample_rate))
def flush(self, timestamp):
if self.value is not None:
value = self.value
self.value = 0
return [self.formatter(metric=self.name,
value=value,
timestamp=timestamp,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
hostname=self.hostname,
device_name=self.device_name,
metric_type=MetricTypes.RATE,
value_meta=self.value_meta)]
else:
return []
class Histogram(Metric):
"""A metric to track the distribution of a set of values. """
def __init__(self, formatter, name, dimensions,
hostname, device_name, delegated_tenant=None,
value_meta=None):
self.formatter = formatter
self.name = name
self.count = 0
self.samples = []
self.percentiles = [0.95]
self.dimensions = dimensions.copy()
self.delegated_tenant = delegated_tenant
self.hostname = hostname
self.device_name = device_name
if value_meta:
self.value_meta = value_meta.copy()
else:
self.value_meta = None
def sample(self, value, sample_rate, timestamp=None):
self.count += int(1 / sample_rate)
self.samples.append(value)
def flush(self, timestamp):
metrics = []
if not self.count:
return metrics
self.samples.sort()
length = len(self.samples)
max_ = self.samples[-1]
med = self.samples[int(round(length / 2 - 1))]
avg = sum(self.samples) / float(length)
metric_aggrs = [
('max', max_, MetricTypes.GAUGE),
('median', med, MetricTypes.GAUGE),
('avg', avg, MetricTypes.GAUGE),
('count', self.count, MetricTypes.RATE)
]
metrics.extend(self.formatter(hostname=self.hostname,
device_name=self.device_name,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
metric='%s.%s' % (self.name, suffix),
value=value,
timestamp=timestamp,
metric_type=metric_type,
value_meta=self.value_meta
) for suffix, value, metric_type in metric_aggrs)
for p in self.percentiles:
val = self.samples[int(round(p * length - 1))]
name = '%s.%spercentile' % (self.name, int(p * 100))
metrics.append(self.formatter(hostname=self.hostname,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
metric=name,
value=val,
timestamp=timestamp,
metric_type=MetricTypes.GAUGE,
value_meta=self.value_meta))
# Reset our state.
self.samples = []
self.count = 0
return metrics
class Set(Metric):
"""A metric to track the number of unique elements in a set. """
def __init__(self, formatter, name, dimensions,
hostname, device_name, delegated_tenant=None,
value_meta=None):
self.formatter = formatter
self.name = name
self.dimensions = dimensions.copy()
self.delegated_tenant = delegated_tenant
self.hostname = hostname
self.device_name = device_name
self.values = set()
if value_meta:
self.value_meta = value_meta.copy()
else:
self.value_meta = None
def sample(self, value, sample_rate, timestamp=None):
self.values.add(value)
def flush(self, timestamp):
if not self.values:
return []
else:
values = self.values.copy()
self.values = set()
return [self.formatter(hostname=self.hostname,
device_name=self.device_name,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
metric=self.name,
value=len(values),
timestamp=timestamp,
metric_type=MetricTypes.GAUGE,
value_meta=self.value_meta)]
def flush(self):
    """Return the accumulated count as a one-element list and reset it to 0.

    Returns:
        [] when no sample has ever recorded a timestamp, otherwise a list
        holding the envelope built by self.measurement().
    """
    # Without a timestamp there is nothing valid to report, and
    # measurement() would fail multiplying None by 1000.
    if self.timestamp is None:
        return []

    envelope = self.measurement(self.value, self.timestamp)
    self.value = 0
    return [envelope]
class Rate(Metric):
"""Track the rate of metrics over each flush interval """
def __init__(self, formatter, name, dimensions,
hostname, device_name, delegated_tenant=None,
value_meta=None):
self.formatter = formatter
self.name = name
self.dimensions = dimensions.copy()
self.delegated_tenant = delegated_tenant
self.hostname = hostname
self.device_name = device_name
def __init__(self, name, dimensions, tenant=None):
super(Rate, self).__init__(name, dimensions, tenant)
self.samples = []
if value_meta:
self.value_meta = value_meta.copy()
else:
self.value_meta = None
self.value = None
def sample(self, value, sample_rate, timestamp=None):
if not timestamp:
timestamp = time()
def sample(self, value, sample_rate, timestamp):
self.samples.append((int(timestamp), value))
self.timestamp = timestamp
if len(self.samples) < 2:
self.value = None
else:
self.value = self._rate(self.samples[-2], self.samples[-1])
self.samples = self.samples[-1:]
if len(self.samples) < 2:
self.value = None
@ -282,16 +126,10 @@ class Rate(Metric):
'0, returned {1}'.format(self.name, e))
return rate
def flush(self, timestamp):
def flush(self):
if not self.value:
return []
else:
return [self.formatter(hostname=self.hostname,
device_name=self.device_name,
dimensions=self.dimensions,
delegated_tenant=self.delegated_tenant,
metric=self.name,
value=self.value,
timestamp=timestamp,
metric_type=MetricTypes.GAUGE,
value_meta=self.value_meta)]
envelope = self.measurement(self.value, self.timestamp)
self.value = None
return [envelope]

View File

@ -1,4 +1,4 @@
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015-2016 Hewlett Packard Enterprise Development LP
import collections
import copy
@ -42,7 +42,7 @@ class MonascaAPI(object):
except KeyError:
self.amplifier = None
def _post(self, measurements, delegated_tenant=None):
def _post(self, measurements, tenant=None):
"""Does the actual http post
measurements is a list of Measurement
"""
@ -50,8 +50,9 @@ class MonascaAPI(object):
'jsonbody': measurements
}
if delegated_tenant is not None:
kwargs['tenant_id'] = delegated_tenant
if tenant:
kwargs['tenant_id'] = tenant
if not self.mon_client:
self.mon_client = self.get_monclient()
if not self.mon_client:
@ -84,32 +85,18 @@ class MonascaAPI(object):
the monitoring api
"""
# Add default dimensions
for measurement in measurements:
if isinstance(measurement.dimensions, list):
measurement.dimensions = dict([(d[0], d[1]) for d in measurement.dimensions])
# "Amplify" these measurements to produce extra load, if so configured
if self.amplifier is not None and self.amplifier > 0:
extra_measurements = []
for measurement in measurements:
for multiple in range(1, self.amplifier + 1):
# Create a copy of the measurement, but with the addition
# of an 'amplifier' dimension
measurement_copy = copy.deepcopy(measurement)
measurement_copy.dimensions.update({'amplifier': multiple})
extra_measurements.append(measurement_copy)
measurements.extend(extra_measurements)
for envelope in measurements:
measurement = envelope['measurement']
if isinstance(measurement['dimensions'], list):
measurement['dimensions'] = dict([(d[0], d[1]) for d in measurement['dimensions']])
# Split out separate POSTs for each delegated tenant (includes 'None')
tenant_group = {}
for measurement_element in measurements:
measurement = copy.deepcopy(measurement_element)
m_dict = measurement.__dict__
m_dict['timestamp'] *= 1000
delegated_tenant = m_dict.pop('delegated_tenant')
if delegated_tenant not in tenant_group:
tenant_group[delegated_tenant] = []
tenant_group[delegated_tenant].extend([m_dict.copy()])
for envelope in measurements:
measurement = envelope['measurement']
tenant = envelope['tenant_id']
tenant_group.setdefault(tenant, []).append(copy.deepcopy(measurement))
for tenant in tenant_group:
self._post(tenant_group[tenant], tenant)

View File

@ -1,5 +1,5 @@
#!/usr/bin/env python
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development Company LP
# (C) Copyright 2015,2016 Hewlett Packard Enterprise Development LP
"""
Licensed under Simplified BSD License (see LICENSE)
(C) Boxed Ice 2010 all rights reserved
@ -57,7 +57,7 @@ class AgentInputHandler(tornado.web.RequestHandler):
msg = tornado.escape.json_decode(self.request.body)
try:
message_batch.extend([metrics.Measurement(**m) for m in msg])
message_batch.extend(msg)
except Exception:
log.exception('Error parsing body of Agent Input')
raise tornado.web.HTTPError(500)

View File

@ -33,7 +33,7 @@ class TestSimpleProcess(unittest.TestCase):
metrics = self.check.get_metrics()
self.assertEqual(1, len(metrics))
self.assertEqual('process.pid_count', metrics[0].name)
self.assertEqual('process.pid_count', metrics[0]['measurement']['name'])
class TestDetailedProcess(unittest.TestCase):
@ -83,7 +83,7 @@ class TestDetailedProcess(unittest.TestCase):
self.check.run()
metrics = self.check.get_metrics()
measurement_names = [metric.name for metric in metrics]
measurement_names = [metric['measurement']['name'] for metric in metrics]
measurement_names.sort()
return measurement_names