High precision rate of change timedelta

The current way of calculating the rate of change is imprecise and
depends on the local host clock, so there is a good chance the host
clock drifts a bit between pollings. Also, the timestamp records when
the polling cycle ran, not when the sample was actually polled.

This makes the rate-of-change transformer inaccurate, and possibly
wrong if the local clock has jumped too much or if a pollster takes too
long to get the stats (libvirt reconnection, ...).

A sample gets a new attribute, monotonic_time, in which we can store an
accurate polling time taken with monotonic.monotonic().

In the rate-of-change transformer, if the monotonic time is available,
we use it to calculate the time delta between samples.

For instance metrics, we set monotonic_time as soon as we poll the
stats from libvirt, avoiding almost all precision issues.

This makes the rate of change precise to the nanosecond for polled
samples, while keeping the timestamp identical for all samples polled
during one cycle.
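
As a rough illustration of the idea (a minimal sketch, not code from
this change; it only assumes the monotonic.monotonic() and
oslo_utils.timeutils APIs already used here):

    import monotonic
    from oslo_utils import timeutils

    # Wall-clock timestamps are taken once per polling cycle and can
    # jump (NTP adjustment, suspend/resume), so deltas computed from
    # them may not match the real elapsed time between two samples.
    prev_wall, prev_mono = timeutils.utcnow(), monotonic.monotonic()

    # ... one polling interval later ...
    cur_wall, cur_mono = timeutils.utcnow(), monotonic.monotonic()

    # Prefer the monotonic delta when both samples carry one, falling
    # back to the wall clock otherwise (what the transformer now does).
    if prev_mono is not None and cur_mono is not None:
        time_delta = cur_mono - prev_mono
    else:
        time_delta = timeutils.delta_seconds(prev_wall, cur_wall)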

Related-bug: #1527620
Change-Id: I40e14fb6aa595a86df9767be5758f52b7ceafc8f
Mehdi Abaakouk 2017-07-18 08:10:38 +02:00
parent 5a04926012
commit 75e10b2a48
12 changed files with 115 additions and 13 deletions

View File

@@ -14,6 +14,7 @@
 import collections
+import monotonic
 from oslo_log import log
 from oslo_utils import timeutils
@@ -93,13 +94,16 @@ class GenericComputePollster(plugin_base.PollsterBase):
         if instance.id not in cache[self.inspector_method]:
             result = getattr(self.inspector, self.inspector_method)(
                 instance, duration)
+            polled_time = monotonic.monotonic()
             # Ensure we don't cache an iterator
             if isinstance(result, collections.Iterable):
                 result = list(result)
-            cache[self.inspector_method][instance.id] = result
+            else:
+                result = [result]
+            cache[self.inspector_method][instance.id] = (polled_time, result)
         return cache[self.inspector_method][instance.id]

-    def _stats_to_sample(self, instance, stats):
+    def _stats_to_sample(self, instance, stats, polled_time):
         volume = getattr(stats, self.sample_stats_key)
         LOG.debug("%(instance_id)s/%(name)s volume: "
                   "%(volume)s" % {
@@ -121,21 +125,19 @@ class GenericComputePollster(plugin_base.PollsterBase):
             volume=volume,
             additional_metadata=self.get_additional_metadata(
                 instance, stats),
+            monotonic_time=polled_time,
         )

     def get_samples(self, manager, cache, resources):
         self._inspection_duration = self._record_poll_time()
         for instance in resources:
             try:
-                result = self._inspect_cached(cache, instance,
-                                              self._inspection_duration)
+                polled_time, result = self._inspect_cached(
+                    cache, instance, self._inspection_duration)
                 if not result:
                     continue
-                if not isinstance(result, collections.Iterable):
-                    result = [result]
                 for stats in self.aggregate_method(result):
-                    yield self._stats_to_sample(instance, stats)
+                    yield self._stats_to_sample(instance, stats, polled_time)
             except NoVolumeException:
                 # FIXME(sileht): This should be a removed... but I will
                 # not change the test logic for now

View File

@@ -76,7 +76,8 @@ def _get_metadata_from_object(conf, instance):
 def make_sample_from_instance(conf, instance, name, type, unit, volume,
-                              resource_id=None, additional_metadata=None):
+                              resource_id=None, additional_metadata=None,
+                              monotonic_time=None):
     additional_metadata = additional_metadata or {}
     resource_metadata = _get_metadata_from_object(conf, instance)
     resource_metadata.update(additional_metadata)
@@ -89,6 +90,7 @@ def make_sample_from_instance(conf, instance, name, type, unit, volume,
         project_id=instance.tenant_id,
         resource_id=resource_id or instance.id,
         resource_metadata=resource_metadata,
+        monotonic_time=monotonic_time,
     )

View File

@@ -97,7 +97,10 @@ class SamplePipelineEndpoint(PipelineEndpoint):
                 resource_id=s['resource_id'],
                 timestamp=s['timestamp'],
                 resource_metadata=s['resource_metadata'],
-                source=s.get('source'))
+                source=s.get('source'),
+                # NOTE(sileht): May come from an older node;
+                # put None in this case.
+                monotonic_time=s.get('monotonic_time'))
             for s in samples if publisher_utils.verify_signature(
                 s, self.conf.publisher.telemetry_secret)
         ]

View File

@@ -98,6 +98,7 @@ def meter_message_from_counter(sample, secret):
            'timestamp': sample.timestamp,
            'resource_metadata': sample.resource_metadata,
            'message_id': sample.id,
+           'monotonic_time': sample.monotonic_time,
            }
     msg['message_signature'] = compute_signature(msg, secret)
     return msg

View File

@@ -95,7 +95,7 @@ class Sample(object):
     def __init__(self, name, type, unit, volume, user_id, project_id,
                  resource_id, timestamp=None, resource_metadata=None,
-                 source=None, id=None):
+                 source=None, id=None, monotonic_time=None):
         self.name = name
         self.type = type
         self.unit = unit
@@ -107,6 +107,7 @@ class Sample(object):
         self.resource_metadata = resource_metadata or {}
         self.source = source or self.SOURCE_DEFAULT
         self.id = id or str(uuid.uuid1())
+        self.monotonic_time = monotonic_time

     def as_dict(self):
         return copy.copy(self.__dict__)
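
For example, the new field simply travels with the sample and defaults
to None (a hedged sketch using only the constructor shown above; the
field values are made up):

    import monotonic

    from ceilometer import sample

    s = sample.Sample(
        name='cpu', type=sample.TYPE_CUMULATIVE, unit='ns',
        volume=125000000000, user_id='u', project_id='p',
        resource_id='r', monotonic_time=monotonic.monotonic())

    # as_dict() copies __dict__, so the field rides along in messages;
    # messages from older nodes simply lack the key, hence the
    # s.get('monotonic_time') fallback in the pipeline endpoint above.
    assert s.as_dict()['monotonic_time'] == s.monotonic_time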

View File

@@ -143,6 +143,10 @@ class Connection(hbase_base.Connection, base.Connection):
         :param data: a dictionary such as returned by
                      ceilometer.publisher.utils.meter_message_from_counter
         """
+        # We must not record this; the field is only used in transit,
+        # never in storage.
+        data.pop("monotonic_time", None)
+
         with self.conn_pool.connection() as conn:
             resource_table = conn.table(self.RESOURCE_TABLE)
             meter_table = conn.table(self.METER_TABLE)

View File

@@ -259,6 +259,13 @@ class Connection(pymongo_base.Connection):
         # unconditionally insert sample timestamps and resource metadata
         # (in the update case, this must be conditional on the sample not
         # being out-of-order)
+
+        # We must not store this
+        samples = copy.deepcopy(samples)
+        for sample in samples:
+            sample.pop("monotonic_time", None)
+
         sorted_samples = sorted(
             copy.deepcopy(samples),
             key=lambda s: (s['resource_id'], s['timestamp']))

View File

@@ -80,6 +80,7 @@ class TestPostSamples(v2.FunctionalTest):
         s1[0]['source'] = '%s:openstack' % s1[0]['project_id']

         self.assertEqual(s1, data.json)
+        s1[0]["monotonic_time"] = None
         self.assertEqual(s1[0], self.published[0][0])

     def test_nested_metadata(self):
@@ -107,6 +108,7 @@ class TestPostSamples(v2.FunctionalTest):
         unwound['resource_metadata'] = {'nest': {'name1': 'value1',
                                                  'name2': 'value3'},
                                         'name2': 'value2'}
+        unwound["monotonic_time"] = None
         # only the published sample should be unwound, not the representation
         # in the API response
         self.assertEqual(s1[0], data.json[0])
@@ -218,6 +220,7 @@ class TestPostSamples(v2.FunctionalTest):
         msg['timestamp'] = timestamp.replace(tzinfo=None).isoformat()
         self.assertEqual(s, c)
+        s["monotonic_time"] = None
         self.assertEqual(s, self.published[0][x])

     def test_missing_mandatory_fields(self):
@@ -278,6 +281,7 @@ class TestPostSamples(v2.FunctionalTest):
         s['timestamp'] = data.json[x]['timestamp']
         s.setdefault('resource_metadata', dict())
         self.assertEqual(s, data.json[x])
+        s['monotonic_time'] = None
         self.assertEqual(s, self.published[0][x])

     def test_multiple_samples_multiple_sources(self):
@@ -328,6 +332,7 @@ class TestPostSamples(v2.FunctionalTest):
         s['timestamp'] = data.json[x]['timestamp']
         s.setdefault('resource_metadata', dict())
         self.assertEqual(s, data.json[x])
+        s['monotonic_time'] = None
         self.assertEqual(s, self.published[0][x])

     def test_missing_project_user_id(self):
@@ -364,4 +369,6 @@ class TestPostSamples(v2.FunctionalTest):
         s['project_id'] = project_id
         self.assertEqual(s, data.json[x])
+        s['monotonic_time'] = None
         self.assertEqual(s, self.published[0][x])

View File

@@ -430,6 +430,7 @@ class RawSampleTest(DBTestBase):
             d = meter.as_dict()
             self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at'])
             del d['recorded_at']
+            d['monotonic_time'] = None
             self.assertIn(d, self.msgs[:3])

     def test_get_samples_by_user_limit(self):
@@ -450,6 +451,7 @@ class RawSampleTest(DBTestBase):
             d = meter.as_dict()
             self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at'])
             del d['recorded_at']
+            d['monotonic_time'] = None
             self.assertIn(d, self.msgs[:4])

     def test_get_samples_by_resource(self):
@@ -459,6 +461,7 @@ class RawSampleTest(DBTestBase):
         d = results[1].as_dict()
         self.assertEqual(timeutils.utcnow(), d['recorded_at'])
         del d['recorded_at']
+        d['monotonic_time'] = None
         self.assertEqual(self.msgs[0], d)

     def test_get_samples_by_metaquery(self):
@@ -470,6 +473,7 @@ class RawSampleTest(DBTestBase):
             d = meter.as_dict()
             self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at'])
             del d['recorded_at']
+            d['monotonic_time'] = None
             self.assertIn(d, self.msgs)

     def test_get_samples_by_metaquery_key_with_dot_in_metadata(self):
@@ -721,6 +725,7 @@ class ComplexSampleQueryTest(DBTestBase):
         for sample_item in results:
             d = sample_item.as_dict()
             del d['recorded_at']
+            d['monotonic_time'] = None
             self.assertIn(d, self.msgs)

     def test_query_complex_filter_with_regexp(self):
@@ -2787,6 +2792,7 @@ class TestBatchRecordingMetering(tests_db.TestBase):
         for sample_item in results:
             d = sample_item.as_dict()
             del d['recorded_at']
+            d['monotonic_time'] = None
             self.assertIn(d, self.sample_dicts)

         resources = list(self.conn.get_resources())

View File

@@ -22,6 +22,7 @@ import unittest

 import fixtures
 import mock
+import monotonic
 from oslo_utils import timeutils
 import six
 from stevedore import extension
@@ -1097,6 +1098,67 @@ class BasePipelineTestCase(base.BaseTestCase):
         pipe.flush()
         self.assertEqual(0, len(publisher.samples))

+    def test_rate_of_change_precision(self):
+        s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
+        transformer_cfg = [
+            {
+                'name': 'rate_of_change',
+                'parameters': {
+                    'source': {},
+                    'target': {'name': 'cpu_util',
+                               'unit': '%',
+                               'type': sample.TYPE_GAUGE,
+                               'scale': s}
+                }
+            },
+        ]
+        self._set_pipeline_cfg('transformers', transformer_cfg)
+        self._set_pipeline_cfg('meters', ['cpu'])
+        pipeline_manager = pipeline.PipelineManager(
+            self.CONF,
+            self.cfg2file(self.pipeline_cfg), self.transformer_manager)
+        pipe = pipeline_manager.pipelines[0]
+
+        now = timeutils.utcnow()
+        now_time = monotonic.monotonic()
+        # Simulate a laggy poller
+        later = now + datetime.timedelta(seconds=12345)
+        later_time = now_time + 10
+
+        counters = [
+            sample.Sample(
+                name='cpu',
+                type=sample.TYPE_CUMULATIVE,
+                volume=125000000000,
+                unit='ns',
+                user_id='test_user',
+                project_id='test_proj',
+                resource_id='test_resource',
+                timestamp=now.isoformat(),
+                monotonic_time=now_time,
+                resource_metadata={'cpu_number': 4}
+            ),
+            sample.Sample(
+                name='cpu',
+                type=sample.TYPE_CUMULATIVE,
+                volume=165000000000,
+                unit='ns',
+                user_id='test_user',
+                project_id='test_proj',
+                resource_id='test_resource',
+                timestamp=later.isoformat(),
+                monotonic_time=later_time,
+                resource_metadata={'cpu_number': 4}
+            ),
+        ]
+
+        pipe.publish_data(counters)
+        publisher = pipe.publishers[0]
+        self.assertEqual(1, len(publisher.samples))
+        cpu_util_sample = publisher.samples[0]
+        self.assertEqual(100, cpu_util_sample.volume)
+
     def test_rate_of_change_max(self):
         s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
         transformer_cfg = [
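
The numbers in this test are worth spelling out; here is the arithmetic
it relies on (plain Python, reconstructed from the test above):

    volume_delta = 165000000000 - 125000000000  # 40e9 ns of CPU time
    scale = 100.0 / (10**9 * 4)                 # cpu_util formula, 4 CPUs

    # With the 10 s monotonic delta the transformer now prefers:
    print(volume_delta / 10 * scale)            # -> 100.0 (% cpu_util)

    # With the 12345 s wall-clock delta of the simulated laggy poller:
    print(volume_delta / 12345 * scale)         # -> ~0.081, wildly off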

View File

@@ -187,12 +187,18 @@ class RateOfChangeTransformer(ScalingTransformer):
         key = s.name + s.resource_id
         prev = self.cache.get(key)
         timestamp = timeutils.parse_isotime(s.timestamp)
-        self.cache[key] = (s.volume, timestamp)
+        self.cache[key] = (s.volume, timestamp, s.monotonic_time)

         if prev:
             prev_volume = prev[0]
             prev_timestamp = prev[1]
-            time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
+            prev_monotonic_time = prev[2]
+            if (prev_monotonic_time is not None and
+                    s.monotonic_time is not None):
+                # NOTE(sileht): Prefer high precision timer
+                time_delta = s.monotonic_time - prev_monotonic_time
+            else:
+                time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)

             # disallow violations of the arrow of time
             if time_delta < 0:
                 LOG.warning(_('dropping out of time order sample: %s'), (s,))

View File

@@ -12,6 +12,7 @@ jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT
 kafka-python>=1.3.2 # Apache-2.0
 keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0
 lxml>=2.3 # BSD
+monotonic
 msgpack-python>=0.4.0 # Apache-2.0
 oslo.concurrency>=3.5.0 # Apache-2.0
 oslo.config>=3.22.0 # Apache-2.0