Partial support for DogStatsd extensions
* Support tags * Support timer (provisional by mapping metric type 'ms' to 'gauge') * Support histograms (same) * Support service-checks (by mapping status to gauge) * Fix calculation of counters for sporadic metrics and sample-rates < 1 Story: 2000951 Task: 4079 Task: 3944 Change-Id: Ib4996b88a2845bceda9c803a386b76b2e0ae125a
This commit is contained in:
parent
50ab293b80
commit
170c3e45e8
@ -121,9 +121,9 @@ Example commands to add the Agent user/project to the monitoring-delegate role:
|
||||
Once the Agent's user and project are assigned to the `monitoring-delegate` group, the Agent can submit metrics for other tenants.
|
||||
|
||||
# Statsd
|
||||
The Monasca Agent ships with a Statsd daemon implementation called monasca-statsd. A statsd client can be used to send metrics to the Forwarder via the Statsd daemon.
|
||||
The Monasca Agent ships with a StatsD daemon implementation. A StatsD client can be used to send metrics to the Forwarder via the StatsD daemon.
|
||||
|
||||
monasca-statsd will accept metrics submitted by functions in either the standard statsd Python client library, or the monasca-agent's [monasca-statsd Python client library](https://github.com/openstack/monasca-statsd). The advantage of using the python-monasca-statsd library is that it is possible to specify dimensions on submitted metrics. Dimensions are not handled by the standard statsd client.
|
||||
monasca-statsd will accept counters, gauges and timing values following the standard StatsD protocol. Dimensions are supported in a way that is compatible with the [DogStatsD extension](http://docs.datadoghq.com/guides/dogstatsd/#metrics-1) for tags. Support for the [monasca-statsd Python client library](https://github.com/openstack/monasca-statsd) is deprecated and might be removed in the future.
|
||||
|
||||
Statsd metrics are not bundled along with the metrics gathered by the Collector, but are flushed to the agent Forwarder on a separate schedule (every 10 seconds by default, rather than 60 seconds for Collector metrics).
|
||||
|
||||
@ -137,6 +137,23 @@ statsd.timing('pipeline', 2468.34) # Pipeline took 2468.34 ms to execute
|
||||
statsd.gauge('gaugething', 3.14159265) # 'gauge' would be the preferred metric type for Monitoring
|
||||
```
|
||||
|
||||
## StatsD Protocol Compatibility
|
||||
|
||||
The monasca-statsd daemon supports the following parts of the StatsD protocol and its extensions:
|
||||
|
||||
StatsD 1.0
|
||||
* counters
|
||||
* gauges
|
||||
* timings (no histograms)
|
||||
|
||||
DogStatsD
|
||||
* dimensions/tags (`key:value`, tags without value will be mapped to `<tag>:True`)
|
||||
|
||||
Monasca
|
||||
* rates
|
||||
|
||||
## Examples
|
||||
|
||||
The [monasca-statsd](https://github.com/openstack/monasca-statsd) library provides a Python-based implementation
|
||||
of a statsd client but also adds the ability to add dimensions to the statsd metrics for the client.
|
||||
|
||||
|
@ -39,8 +39,13 @@ class Metric(object):
|
||||
raise NotImplementedError()
|
||||
|
||||
def flush(self):
|
||||
"""Flush current sample. """
|
||||
raise NotImplementedError()
|
||||
if self.timestamp is None:
|
||||
return []
|
||||
|
||||
envelope = self.measurement(self.value, self.timestamp)
|
||||
self.timestamp = None
|
||||
self.value = None
|
||||
return [envelope]
|
||||
|
||||
|
||||
class Gauge(Metric):
|
||||
@ -53,35 +58,32 @@ class Gauge(Metric):
|
||||
self.value = value
|
||||
self.timestamp = timestamp
|
||||
|
||||
def flush(self):
|
||||
# 0 is a valid value, so can't do: if not self.value:
|
||||
if self.value is None:
|
||||
return []
|
||||
|
||||
envelope = self.measurement(self.value, self.timestamp)
|
||||
self.value = None
|
||||
return [envelope]
|
||||
|
||||
|
||||
class Counter(Metric):
|
||||
"""A metric that tracks a counter value. """
|
||||
|
||||
def __init__(self, name, dimensions, tenant=None):
|
||||
super(Counter, self).__init__(name, dimensions, tenant)
|
||||
self.value = 0
|
||||
|
||||
def sample(self, value, sample_rate, timestamp):
|
||||
try:
|
||||
self.value += value * int(1 / sample_rate)
|
||||
inc = float(value) / sample_rate
|
||||
if self.timestamp is None:
|
||||
self.value = inc
|
||||
else:
|
||||
self.value += inc
|
||||
self.timestamp = timestamp
|
||||
except TypeError:
|
||||
log.error("metric {} value {} sample_rate {}".
|
||||
format(self.metric['name'], value, sample_rate))
|
||||
except (TypeError, ValueError):
|
||||
log.exception("illegal metric {} value {} sample_rate {}".
|
||||
format(self.metric['name'], value, sample_rate))
|
||||
|
||||
# redefine flush method to make counter an integer when sample rates <> 1.0 used
|
||||
def flush(self):
|
||||
envelope = self.measurement(self.value, self.timestamp)
|
||||
self.value = 0
|
||||
return [envelope]
|
||||
if self.timestamp:
|
||||
self.value = int(self.value)
|
||||
return super(Counter, self).flush()
|
||||
else:
|
||||
return []
|
||||
|
||||
|
||||
class Rate(Metric):
|
||||
@ -89,40 +91,43 @@ class Rate(Metric):
|
||||
|
||||
def __init__(self, name, dimensions, tenant=None):
|
||||
super(Rate, self).__init__(name, dimensions, tenant)
|
||||
self.samples = []
|
||||
self.start_value = None
|
||||
self.start_timestamp = None
|
||||
|
||||
def sample(self, value, sample_rate, timestamp):
|
||||
self.samples.append((int(timestamp), value))
|
||||
self.timestamp = timestamp
|
||||
|
||||
if len(self.samples) < 2:
|
||||
self.value = None
|
||||
# set first value if missing
|
||||
if self.start_timestamp is None:
|
||||
self.start_timestamp = timestamp
|
||||
self.start_value = value
|
||||
# set second value otherwise
|
||||
else:
|
||||
self.value = self._rate(self.samples[-2], self.samples[-1])
|
||||
self.samples = self.samples[-1:]
|
||||
|
||||
def _rate(self, sample1, sample2):
|
||||
delta_t = sample2[0] - sample1[0]
|
||||
delta_v = sample2[1] - sample1[1]
|
||||
rate = None
|
||||
if delta_v < 0:
|
||||
log.debug('Metric {0} has a rate < 0. New value = {1} and old '
|
||||
'value = {2}. Counter may have been Reset.'.
|
||||
format(self.metric['name'], sample2[1], sample1[1]))
|
||||
return rate
|
||||
try:
|
||||
rate = delta_v / float(delta_t)
|
||||
except ZeroDivisionError as e:
|
||||
log.exception('Error in sampling metric {0}, time difference '
|
||||
'between current time and last_update time is '
|
||||
'0, returned {1}'.
|
||||
format(self.metric['name'], e))
|
||||
return rate
|
||||
self.timestamp = timestamp
|
||||
self.value = value
|
||||
|
||||
# redefine flush method to calculate rate from metrics
|
||||
def flush(self):
|
||||
if self.value is None:
|
||||
# need at least two timestamps to determine rate
|
||||
# if the second one is missing then the first is kept as start value for the subsequent interval
|
||||
if self.start_timestamp is None or self.timestamp is None:
|
||||
return []
|
||||
|
||||
envelope = self.measurement(self.value, self.timestamp)
|
||||
delta_t = self.timestamp - self.start_timestamp
|
||||
delta_v = self.value - self.start_value
|
||||
try:
|
||||
rate = delta_v / float(delta_t)
|
||||
except ZeroDivisionError:
|
||||
log.warning('Conflicting values reported for metric %s with dimensions %s at time %d: (%f, %f)', self.metric['name'],
|
||||
self.metric['dimensions'], self.timestamp, self.start_value, self.value)
|
||||
|
||||
# skip this measurement, but keep value for next cycle
|
||||
self.start_value = self.value
|
||||
return []
|
||||
|
||||
# make it start value for next interval (even if it is None!)
|
||||
self.start_value = self.value
|
||||
self.start_timestamp = self.timestamp
|
||||
|
||||
envelope = self.measurement(rate, self.timestamp)
|
||||
self.timestamp = None
|
||||
self.value = None
|
||||
return [envelope]
|
||||
|
@ -16,6 +16,8 @@ metric_class = {
|
||||
'g': metrics_pkg.Gauge,
|
||||
'c': metrics_pkg.Counter,
|
||||
'r': metrics_pkg.Rate,
|
||||
'ms': metrics_pkg.Gauge,
|
||||
'h': metrics_pkg.Gauge
|
||||
}
|
||||
|
||||
|
||||
@ -48,6 +50,18 @@ class Server(object):
|
||||
except Exception:
|
||||
log.exception("Error while setting up connection to external statsd server")
|
||||
|
||||
@staticmethod
def _parse_service_check_packet(packet):
    """Break a service-check packet into name, status and dimensions.

    The packet is cut at every '|'. The second field is taken as the
    check name and the third as the status, which is converted to int.
    Of the remaining fields only those starting with '#' are used: each
    is passed to Server._parse_dogstatsd_tags, and when several are
    present the result of the last one is kept.

    Returns:
        A ``(name, status, dimensions)`` tuple; ``dimensions`` is an
        empty dict when no '#' field is present.

    Raises:
        IndexError: the packet has fewer than three '|'-separated fields.
        ValueError: the status field is not an integer literal.
    """
    fields = packet.split('|')
    check_name, raw_status = fields[1], fields[2]
    status = int(raw_status)

    dimensions = {}
    for field in fields[3:]:
        if not field.startswith('#'):
            # other metadata fields are not evaluated here
            continue
        dimensions = Server._parse_dogstatsd_tags(field)

    return check_name, status, dimensions
|
||||
|
||||
@staticmethod
|
||||
def _parse_metric_packet(packet):
|
||||
name_and_metadata = packet.split(':', 1)
|
||||
@ -87,12 +101,49 @@ class Server(object):
|
||||
if m[0] == '@':
|
||||
sample_rate = float(m[1:])
|
||||
assert 0 <= sample_rate <= 1
|
||||
# Parse dimensions
|
||||
elif m[0] == '#':
|
||||
dimensions = ast.literal_eval(m[1:])
|
||||
# Parse dimensions, supporting both Monasca and DogStatsd extensions
|
||||
elif m[0] == '#' and len(m) > 2:
|
||||
if m[1] == '{':
|
||||
dimensions = Server._parse_monasca_statsd_dims(m[1:])
|
||||
else:
|
||||
dimensions = Server._parse_dogstatsd_tags(m[1:])
|
||||
|
||||
return name, value, metric_type, dimensions, sample_rate
|
||||
|
||||
@staticmethod
def _parse_monasca_statsd_dims(dimensions):
    """Turn the textual dimensions literal into a Python object.

    The text is evaluated with ast.literal_eval, so only Python literal
    syntax is accepted; whatever object the literal denotes is returned
    unchanged. Any exception raised by ast.literal_eval for malformed
    input propagates to the caller.
    """
    return ast.literal_eval(dimensions)
|
||||
|
||||
@staticmethod
def _parse_dogstatsd_tags(statsd_msg):
    """Parse a comma-separated tag section into a dimensions dict.

    Each tag is either ``key:value`` or a bare ``tag``. Surrounding
    whitespace of keys and values is stripped.

    * ``key:value`` is stored as ``{key: value}``
    * ``key:`` (empty value) is stored as ``{key: '?'}``
    * a bare ``tag`` is stored as ``{tag: "True"}``
    * empty segments are ignored

    Args:
        statsd_msg: the tag section, with or without its leading '#'.

    Returns:
        dict mapping tag keys to string values.
    """
    # The section arrives both as '#a:b' and as 'a:b' depending on the
    # caller. Only strip the marker when it is really there; dropping one
    # character unconditionally would eat the first letter of the first key.
    if statsd_msg.startswith('#'):
        statsd_msg = statsd_msg[1:]

    dimensions = {}

    def _store(tag_key, text):
        # Record one finished tag; shared by the ',' branch and the final
        # segment so both follow exactly the same rules.
        text = text.strip()
        if tag_key:
            dimensions[tag_key] = text if text else '?'
        elif text:
            # handle tags w/o value
            dimensions[text] = "True"

    key = ''
    chars = []
    for c in statsd_msg:
        if c == ':':
            # everything collected so far was the key
            key = ''.join(chars).strip()
            chars = []
        elif c == ',':
            _store(key, ''.join(chars))
            key = ''
            chars = []
        else:
            chars.append(c)

    # the last tag is not followed by a comma
    _store(key, ''.join(chars))

    return dimensions
|
||||
|
||||
def submit_packets(self, packets):
|
||||
for packet in packets.split("\n"):
|
||||
|
||||
@ -101,11 +152,14 @@ class Server(object):
|
||||
|
||||
if packet.startswith('_e'):
|
||||
# Monasca api doesnt support events
|
||||
log.warn("events not supported.")
|
||||
continue
|
||||
|
||||
# todo it seems like this count should be done in the submit_metric method
|
||||
self.aggregator.count += 1
|
||||
name, value, mtype, dimensions, sample_rate = self._parse_metric_packet(packet)
|
||||
elif packet.startswith('_sc'):
|
||||
sample_rate = 1.0
|
||||
mtype = 'g'
|
||||
name, value, dimensions = self._parse_service_check_packet(packet)
|
||||
else:
|
||||
name, value, mtype, dimensions, sample_rate = self._parse_metric_packet(packet)
|
||||
|
||||
if mtype not in metric_class:
|
||||
log.warn("metric type {} not supported.".format(mtype))
|
||||
|
@ -57,6 +57,7 @@ class TestMetrics(unittest.TestCase):
|
||||
|
||||
counter = metrics.Counter(metric_name, dimensions, tenant_name)
|
||||
|
||||
# single counter value
|
||||
counter.sample(5, SAMPLE_RATE, 1)
|
||||
|
||||
envelope = counter.flush()[0]
|
||||
@ -67,36 +68,34 @@ class TestMetrics(unittest.TestCase):
|
||||
self.assertEqual(measurement['value'], 5)
|
||||
self.assertEqual(measurement['timestamp'], 1000)
|
||||
|
||||
# multiple counter value with different timestamps: add
|
||||
counter.sample(5, SAMPLE_RATE, 1)
|
||||
counter.sample(6, SAMPLE_RATE, 2)
|
||||
|
||||
envelope = counter.flush()[0]
|
||||
measurement = envelope['measurement']
|
||||
self.assertEqual(envelope['tenant_id'], tenant_name)
|
||||
self.assertEqual(measurement['name'], metric_name)
|
||||
self.assertEqual(measurement['dimensions'], dimensions)
|
||||
self.assertEqual(measurement['value'], 5)
|
||||
self.assertEqual(measurement['timestamp'], 1000)
|
||||
self.assertEqual(measurement['value'], 11)
|
||||
self.assertEqual(measurement['timestamp'], 2000)
|
||||
|
||||
counter.sample(5, SAMPLE_RATE, 1)
|
||||
counter.sample(5, SAMPLE_RATE, 1)
|
||||
# multiple counter values with same timestamp: add
|
||||
counter.sample(5, SAMPLE_RATE, 3)
|
||||
counter.sample(5, SAMPLE_RATE, 3)
|
||||
|
||||
envelope = counter.flush()[0]
|
||||
measurement = envelope['measurement']
|
||||
self.assertEqual(envelope['tenant_id'], tenant_name)
|
||||
self.assertEqual(measurement['name'], metric_name)
|
||||
self.assertEqual(measurement['dimensions'], dimensions)
|
||||
self.assertEqual(measurement['value'], 10)
|
||||
self.assertEqual(measurement['timestamp'], 1000)
|
||||
self.assertEqual(measurement['value'], 5+5)
|
||||
self.assertEqual(measurement['timestamp'], 3000)
|
||||
|
||||
# Errors in counter report 0 value with previous timestamp
|
||||
# Invalid metric values: ignore
|
||||
counter.sample("WEGONI", SAMPLE_RATE, 2)
|
||||
envelope = counter.flush()[0]
|
||||
measurement = envelope['measurement']
|
||||
self.assertEqual(envelope['tenant_id'], tenant_name)
|
||||
self.assertEqual(measurement['name'], metric_name)
|
||||
self.assertEqual(measurement['dimensions'], dimensions)
|
||||
self.assertEqual(measurement['value'], 0)
|
||||
self.assertEqual(measurement['timestamp'], 1000)
|
||||
results = counter.flush()
|
||||
self.assertEqual(results, [])
|
||||
|
||||
def test_Rate(self):
|
||||
tenant_name = "test_rate"
|
||||
@ -105,9 +104,11 @@ class TestMetrics(unittest.TestCase):
|
||||
|
||||
rate = metrics.Rate(metric_name, dimensions, tenant_name)
|
||||
|
||||
# single sample without predecessor: no rate can be calculated
|
||||
rate.sample(5, SAMPLE_RATE, 1)
|
||||
self.assertEqual(rate.flush(), [])
|
||||
|
||||
# zero difference between samples: rate 0
|
||||
rate.sample(5, SAMPLE_RATE, 2)
|
||||
|
||||
envelope = rate.flush()[0]
|
||||
@ -118,6 +119,7 @@ class TestMetrics(unittest.TestCase):
|
||||
self.assertEqual(measurement['value'], 0.0)
|
||||
self.assertEqual(measurement['timestamp'], 2000)
|
||||
|
||||
# samples (5,10) in 1 sec interval: rate 5/sec.
|
||||
rate.sample(10, SAMPLE_RATE, 3)
|
||||
|
||||
envelope = rate.flush()[0]
|
||||
@ -128,9 +130,11 @@ class TestMetrics(unittest.TestCase):
|
||||
self.assertEqual(measurement['value'], 5)
|
||||
self.assertEqual(measurement['timestamp'], 3000)
|
||||
|
||||
# conflicting values for same timestamp: no result, but keep last sample for next rate calc.
|
||||
rate.sample(12, SAMPLE_RATE, 3)
|
||||
self.assertEqual(rate.flush(), [])
|
||||
|
||||
# zero difference between samples, incomplete previous interval T: rate 0/sec.
|
||||
rate.sample(12, SAMPLE_RATE, 4)
|
||||
|
||||
envelope = rate.flush()[0]
|
||||
@ -141,15 +145,37 @@ class TestMetrics(unittest.TestCase):
|
||||
self.assertEqual(measurement['value'], 0.0)
|
||||
self.assertEqual(measurement['timestamp'], 4000)
|
||||
|
||||
rate.sample(14, SAMPLE_RATE, 5)
|
||||
# several samples (13, 14) in interval, take last values of T1 and T0 for rate calc: rate = (14-12)/(6-4)
|
||||
rate.sample(13, SAMPLE_RATE, 5)
|
||||
rate.sample(14, SAMPLE_RATE, 6)
|
||||
|
||||
envelope = rate.flush()[0]
|
||||
measurement = envelope['measurement']
|
||||
self.assertEqual(envelope['tenant_id'], tenant_name)
|
||||
self.assertEqual(measurement['name'], metric_name)
|
||||
self.assertEqual(measurement['dimensions'], dimensions)
|
||||
self.assertEqual(measurement['value'], 2)
|
||||
self.assertEqual(measurement['timestamp'], 5000)
|
||||
self.assertEqual(measurement['value'], 1)
|
||||
self.assertEqual(measurement['timestamp'], 6000)
|
||||
|
||||
rate.sample(1, SAMPLE_RATE, 6)
|
||||
self.assertEqual(rate.flush(), [])
|
||||
# negative rate: often result of a restart, but that should not be hidden
|
||||
rate.sample(1, SAMPLE_RATE, 7)
|
||||
|
||||
envelope = rate.flush()[0]
|
||||
measurement = envelope['measurement']
|
||||
self.assertEqual(envelope['tenant_id'], tenant_name)
|
||||
self.assertEqual(measurement['name'], metric_name)
|
||||
self.assertEqual(measurement['dimensions'], dimensions)
|
||||
self.assertEqual(measurement['value'], -13)
|
||||
self.assertEqual(measurement['timestamp'], 7000)
|
||||
|
||||
# recover from negative rate
|
||||
rate.sample(2, SAMPLE_RATE, 8)
|
||||
|
||||
envelope = rate.flush()[0]
|
||||
measurement = envelope['measurement']
|
||||
self.assertEqual(envelope['tenant_id'], tenant_name)
|
||||
self.assertEqual(measurement['name'], metric_name)
|
||||
self.assertEqual(measurement['dimensions'], dimensions)
|
||||
self.assertEqual(measurement['value'], 1)
|
||||
self.assertEqual(measurement['timestamp'], 8000)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user