From 251a06d5c6a636050f92984b8f969f56868c41ec Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Tue, 18 Jul 2017 08:10:38 +0200 Subject: [PATCH] High precision rate of change timedelta The current way to calculate rate of change is not precise at all and depends on the local host clock. So, we have good chance that the host clock derive a bit between each polling. Also the timestamp is polling cycle run and not the exact polled sample. This makes the rate of change transformer not accurate, and maybe wrong if the local clock have jumped to much or if a pollster make to much time to get the stats (libvirt reconnection, ...). A sample gets a new attribute monotonic_time, where we can store an accurate polling time using monotonic.monotonic(). In rate of change transformer, if the monotonic time is available we use to calculate the time delta between samples. For instance metrics, we set monotonic_time as soon as we poll it from libvirt, avoiding almost all precision issue. That makes the rate of change precise to the nanoseconds for polled samples, while keeping the timestamp identical for all samples polled during one cycle. Related-bug: #1527620 Change-Id: I40e14fb6aa595a86df9767be5758f52b7ceafc8f (cherry picked from commit fd6a76601a382cdf47527893f9255b48bc235d05) --- ceilometer/compute/pollsters/__init__.py | 3 +- ceilometer/compute/pollsters/cpu.py | 3 + ceilometer/compute/pollsters/disk.py | 9 ++- ceilometer/compute/pollsters/net.py | 48 +++++++++----- ceilometer/compute/pollsters/util.py | 4 +- ceilometer/pipeline.py | 5 +- ceilometer/publisher/utils.py | 1 + ceilometer/sample.py | 3 +- ceilometer/storage/impl_hbase.py | 4 ++ ceilometer/storage/impl_mongodb.py | 7 +++ .../api/v2/test_post_samples_scenarios.py | 7 +++ .../storage/test_storage_scenarios.py | 6 ++ .../tests/unit/compute/pollsters/test_net.py | 5 +- ceilometer/tests/unit/pipeline_base.py | 62 +++++++++++++++++++ ceilometer/transformer/conversions.py | 10 ++- requirements.txt | 1 + 16 files changed, 151 insertions(+), 27 deletions(-) diff --git a/ceilometer/compute/pollsters/__init__.py b/ceilometer/compute/pollsters/__init__.py index f3360270..c3ef015b 100644 --- a/ceilometer/compute/pollsters/__init__.py +++ b/ceilometer/compute/pollsters/__init__.py @@ -73,7 +73,7 @@ class BaseComputePollster(plugin_base.PollsterBase): return duration def _get_samples_per_devices(self, attribute, instance, _name, _type, - _unit): + _unit, monotonic_time=None): samples = [] for disk, value in six.iteritems(attribute): samples.append(util.make_sample_from_instance( @@ -85,5 +85,6 @@ class BaseComputePollster(plugin_base.PollsterBase): volume=value, resource_id="%s-%s" % (instance.id, disk), additional_metadata={'disk_name': disk}, + monotonic_time=monotonic_time, )) return samples diff --git a/ceilometer/compute/pollsters/cpu.py b/ceilometer/compute/pollsters/cpu.py index b8287d3b..167e194d 100644 --- a/ceilometer/compute/pollsters/cpu.py +++ b/ceilometer/compute/pollsters/cpu.py @@ -14,6 +14,8 @@ # License for the specific language governing permissions and limitations # under the License. + +import monotonic from oslo_log import log import ceilometer @@ -46,6 +48,7 @@ class CPUPollster(pollsters.BaseComputePollster): unit='ns', volume=cpu_info.time, additional_metadata=cpu_num, + monotonic_time=monotonic.monotonic() ) except virt_inspector.InstanceNotFoundException as err: # Instance was deleted while getting samples. Ignore it. diff --git a/ceilometer/compute/pollsters/disk.py b/ceilometer/compute/pollsters/disk.py index 9b35fdc1..82ecf40a 100644 --- a/ceilometer/compute/pollsters/disk.py +++ b/ceilometer/compute/pollsters/disk.py @@ -17,6 +17,7 @@ import abc import collections +import monotonic from oslo_log import log import six @@ -33,7 +34,7 @@ LOG = log.getLogger(__name__) DiskIOData = collections.namedtuple( 'DiskIOData', - 'r_bytes r_requests w_bytes w_requests per_disk_requests', + 'r_bytes r_requests w_bytes w_requests per_disk_requests polled_time', ) DiskRateData = collections.namedtuple('DiskRateData', @@ -103,12 +104,14 @@ class _Base(pollsters.BaseComputePollster): 'write_bytes': per_device_write_bytes, 'write_requests': per_device_write_requests, } + polled_time = monotonic.monotonic() i_cache[instance.id] = DiskIOData( r_bytes=r_bytes, r_requests=r_requests, w_bytes=w_bytes, w_requests=w_requests, per_disk_requests=per_device_requests, + polled_time=polled_time ) return i_cache[instance.id] @@ -128,13 +131,15 @@ class _Base(pollsters.BaseComputePollster): volume=getattr(c_data, _volume), additional_metadata={ 'device': c_data.per_disk_requests[_metadata].keys()}, + monotonic_time=c_data.polled_time, )] def _get_samples_per_device(self, c_data, _attr, instance, _name, _unit): """Return one or more Samples for meter 'disk.device.*'""" return self._get_samples_per_devices(c_data.per_disk_requests[_attr], instance, _name, - sample.TYPE_CUMULATIVE, _unit) + sample.TYPE_CUMULATIVE, _unit, + c_data.polled_time) def get_samples(self, manager, cache, resources): for instance in resources: diff --git a/ceilometer/compute/pollsters/net.py b/ceilometer/compute/pollsters/net.py index 933d6577..54ae7f7a 100644 --- a/ceilometer/compute/pollsters/net.py +++ b/ceilometer/compute/pollsters/net.py @@ -16,6 +16,7 @@ import copy +import monotonic from oslo_log import log import ceilometer @@ -34,7 +35,8 @@ class _Base(pollsters.BaseComputePollster): NET_USAGE_MESSAGE = ' '.join(["NETWORK USAGE:", "%s %s:", "read-bytes=%d", "write-bytes=%d"]) - def make_vnic_sample(self, instance, name, type, unit, volume, vnic_data): + def make_vnic_sample(self, instance, name, type, unit, volume, vnic_data, + monotonic_time): metadata = copy.copy(vnic_data) additional_metadata = dict(zip(metadata._fields, metadata)) if vnic_data.fref is not None: @@ -53,7 +55,8 @@ class _Base(pollsters.BaseComputePollster): unit=unit, volume=volume, resource_id=rid, - additional_metadata=additional_metadata + additional_metadata=additional_metadata, + monotonic_time=monotonic_time, ) CACHE_KEY_VNIC = 'vnics' @@ -72,9 +75,10 @@ class _Base(pollsters.BaseComputePollster): def _get_vnics_for_instance(self, cache, inspector, instance): i_cache = cache.setdefault(self.CACHE_KEY_VNIC, {}) if instance.id not in i_cache: - i_cache[instance.id] = list( - self._get_vnic_info(inspector, instance) - ) + data = list(self._get_vnic_info(inspector, instance)) + polled_time = monotonic.monotonic() + i_cache[instance.id] = [(vnic, info, polled_time) + for vnic, info in data] return i_cache[instance.id] def get_samples(self, manager, cache, resources): @@ -88,11 +92,11 @@ class _Base(pollsters.BaseComputePollster): self.inspector, instance, ) - for vnic, info in vnics: + for vnic, info, polled_time in vnics: LOG.debug(self.NET_USAGE_MESSAGE, instance_name, vnic.name, self._get_rx_info(info), self._get_tx_info(info)) - yield self._get_sample(instance, vnic, info) + yield self._get_sample(instance, vnic, info, polled_time) except virt_inspector.InstanceNotFoundException as err: # Instance was deleted while getting samples. Ignore it. LOG.debug('Exception while getting samples %s', err) @@ -179,7 +183,7 @@ class _ErrorsBase(_Base): class IncomingBytesPollster(_Base): - def _get_sample(self, instance, vnic, info): + def _get_sample(self, instance, vnic, info, polled_time): return self.make_vnic_sample( instance, name='network.incoming.bytes', @@ -187,12 +191,13 @@ class IncomingBytesPollster(_Base): unit='B', volume=info.rx_bytes, vnic_data=vnic, + monotonic_time=polled_time, ) class IncomingPacketsPollster(_PacketsBase): - def _get_sample(self, instance, vnic, info): + def _get_sample(self, instance, vnic, info, polled_time): return self.make_vnic_sample( instance, name='network.incoming.packets', @@ -200,12 +205,13 @@ class IncomingPacketsPollster(_PacketsBase): unit='packet', volume=info.rx_packets, vnic_data=vnic, + monotonic_time=polled_time, ) class OutgoingBytesPollster(_Base): - def _get_sample(self, instance, vnic, info): + def _get_sample(self, instance, vnic, info, polled_time): return self.make_vnic_sample( instance, name='network.outgoing.bytes', @@ -213,12 +219,13 @@ class OutgoingBytesPollster(_Base): unit='B', volume=info.tx_bytes, vnic_data=vnic, + monotonic_time=polled_time, ) class OutgoingPacketsPollster(_PacketsBase): - def _get_sample(self, instance, vnic, info): + def _get_sample(self, instance, vnic, info, polled_time): return self.make_vnic_sample( instance, name='network.outgoing.packets', @@ -226,12 +233,13 @@ class OutgoingPacketsPollster(_PacketsBase): unit='packet', volume=info.tx_packets, vnic_data=vnic, + monotonic_time=polled_time, ) class IncomingBytesRatePollster(_RateBase): - def _get_sample(self, instance, vnic, info): + def _get_sample(self, instance, vnic, info, polled_time): return self.make_vnic_sample( instance, name='network.incoming.bytes.rate', @@ -239,12 +247,13 @@ class IncomingBytesRatePollster(_RateBase): unit='B/s', volume=info.rx_bytes_rate, vnic_data=vnic, + monotonic_time=polled_time, ) class OutgoingBytesRatePollster(_RateBase): - def _get_sample(self, instance, vnic, info): + def _get_sample(self, instance, vnic, info, polled_time): return self.make_vnic_sample( instance, name='network.outgoing.bytes.rate', @@ -252,12 +261,13 @@ class OutgoingBytesRatePollster(_RateBase): unit='B/s', volume=info.tx_bytes_rate, vnic_data=vnic, + monotonic_time=polled_time, ) class IncomingDropPollster(_DropBase): - def _get_sample(self, instance, vnic, info): + def _get_sample(self, instance, vnic, info, polled_time): return self.make_vnic_sample( instance, name='network.incoming.packets.drop', @@ -265,12 +275,13 @@ class IncomingDropPollster(_DropBase): unit='packet', volume=info.rx_drop, vnic_data=vnic, + monotonic_time=polled_time, ) class OutgoingDropPollster(_DropBase): - def _get_sample(self, instance, vnic, info): + def _get_sample(self, instance, vnic, info, polled_time): return self.make_vnic_sample( instance, name='network.outgoing.packets.drop', @@ -278,12 +289,13 @@ class OutgoingDropPollster(_DropBase): unit='packet', volume=info.tx_drop, vnic_data=vnic, + monotonic_time=polled_time, ) class IncomingErrorsPollster(_ErrorsBase): - def _get_sample(self, instance, vnic, info): + def _get_sample(self, instance, vnic, info, polled_time): return self.make_vnic_sample( instance, name='network.incoming.packets.error', @@ -291,12 +303,13 @@ class IncomingErrorsPollster(_ErrorsBase): unit='packet', volume=info.rx_errors, vnic_data=vnic, + monotonic_time=polled_time, ) class OutgoingErrorsPollster(_ErrorsBase): - def _get_sample(self, instance, vnic, info): + def _get_sample(self, instance, vnic, info, polled_time): return self.make_vnic_sample( instance, name='network.outgoing.packets.error', @@ -304,4 +317,5 @@ class OutgoingErrorsPollster(_ErrorsBase): unit='packet', volume=info.tx_errors, vnic_data=vnic, + monotonic_time=polled_time, ) diff --git a/ceilometer/compute/pollsters/util.py b/ceilometer/compute/pollsters/util.py index 11f39ffe..a0ac83b7 100644 --- a/ceilometer/compute/pollsters/util.py +++ b/ceilometer/compute/pollsters/util.py @@ -76,7 +76,8 @@ def _get_metadata_from_object(conf, instance): def make_sample_from_instance(conf, instance, name, type, unit, volume, - resource_id=None, additional_metadata=None): + resource_id=None, additional_metadata=None, + monotonic_time=None): additional_metadata = additional_metadata or {} resource_metadata = _get_metadata_from_object(conf, instance) resource_metadata.update(additional_metadata) @@ -89,6 +90,7 @@ def make_sample_from_instance(conf, instance, name, type, unit, volume, project_id=instance.tenant_id, resource_id=resource_id or instance.id, resource_metadata=resource_metadata, + monotonic_time=monotonic_time, ) diff --git a/ceilometer/pipeline.py b/ceilometer/pipeline.py index d39afc82..8db7bc09 100644 --- a/ceilometer/pipeline.py +++ b/ceilometer/pipeline.py @@ -110,7 +110,10 @@ class SamplePipelineEndpoint(PipelineEndpoint): resource_id=s['resource_id'], timestamp=s['timestamp'], resource_metadata=s['resource_metadata'], - source=s.get('source')) + source=s.get('source'), + # NOTE(sileht): May come from an older node, + # Put None in this case. + monotonic_time=s.get('monotonic_time')) for s in samples if publisher_utils.verify_signature( s, self.conf.publisher.telemetry_secret) ] diff --git a/ceilometer/publisher/utils.py b/ceilometer/publisher/utils.py index 6db55a4f..b49f0220 100644 --- a/ceilometer/publisher/utils.py +++ b/ceilometer/publisher/utils.py @@ -98,6 +98,7 @@ def meter_message_from_counter(sample, secret): 'timestamp': sample.timestamp, 'resource_metadata': sample.resource_metadata, 'message_id': sample.id, + 'monotonic_time': sample.monotonic_time, } msg['message_signature'] = compute_signature(msg, secret) return msg diff --git a/ceilometer/sample.py b/ceilometer/sample.py index d5318822..ed780127 100644 --- a/ceilometer/sample.py +++ b/ceilometer/sample.py @@ -95,7 +95,7 @@ class Sample(object): def __init__(self, name, type, unit, volume, user_id, project_id, resource_id, timestamp=None, resource_metadata=None, - source=None, id=None): + source=None, id=None, monotonic_time=None): self.name = name self.type = type self.unit = unit @@ -107,6 +107,7 @@ class Sample(object): self.resource_metadata = resource_metadata or {} self.source = source or self.SOURCE_DEFAULT self.id = id or str(uuid.uuid1()) + self.monotonic_time = monotonic_time def as_dict(self): return copy.copy(self.__dict__) diff --git a/ceilometer/storage/impl_hbase.py b/ceilometer/storage/impl_hbase.py index 494db884..aafaff6c 100644 --- a/ceilometer/storage/impl_hbase.py +++ b/ceilometer/storage/impl_hbase.py @@ -143,6 +143,10 @@ class Connection(hbase_base.Connection, base.Connection): :param data: a dictionary such as returned by ceilometer.publisher.utils.meter_message_from_counter """ + + # We must not record thing. + data.pop("monotonic_time", None) + with self.conn_pool.connection() as conn: resource_table = conn.table(self.RESOURCE_TABLE) meter_table = conn.table(self.METER_TABLE) diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index 82ed679c..afe2e527 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -259,6 +259,13 @@ class Connection(pymongo_base.Connection): # unconditionally insert sample timestamps and resource metadata # (in the update case, this must be conditional on the sample not # being out-of-order) + + # We must not store this + samples = copy.deepcopy(samples) + + for sample in samples: + sample.pop("monotonic_time", None) + sorted_samples = sorted( copy.deepcopy(samples), key=lambda s: (s['resource_id'], s['timestamp'])) diff --git a/ceilometer/tests/functional/api/v2/test_post_samples_scenarios.py b/ceilometer/tests/functional/api/v2/test_post_samples_scenarios.py index 033f2925..ca787ded 100644 --- a/ceilometer/tests/functional/api/v2/test_post_samples_scenarios.py +++ b/ceilometer/tests/functional/api/v2/test_post_samples_scenarios.py @@ -80,6 +80,7 @@ class TestPostSamples(v2.FunctionalTest): s1[0]['source'] = '%s:openstack' % s1[0]['project_id'] self.assertEqual(s1, data.json) + s1[0]["monotonic_time"] = None self.assertEqual(s1[0], self.published[0][0]) def test_nested_metadata(self): @@ -107,6 +108,7 @@ class TestPostSamples(v2.FunctionalTest): unwound['resource_metadata'] = {'nest': {'name1': 'value1', 'name2': 'value3'}, 'name2': 'value2'} + unwound["monotonic_time"] = None # only the published sample should be unwound, not the representation # in the API response self.assertEqual(s1[0], data.json[0]) @@ -218,6 +220,7 @@ class TestPostSamples(v2.FunctionalTest): msg['timestamp'] = timestamp.replace(tzinfo=None).isoformat() self.assertEqual(s, c) + s["monotonic_time"] = None self.assertEqual(s, self.published[0][x]) def test_missing_mandatory_fields(self): @@ -278,6 +281,7 @@ class TestPostSamples(v2.FunctionalTest): s['timestamp'] = data.json[x]['timestamp'] s.setdefault('resource_metadata', dict()) self.assertEqual(s, data.json[x]) + s['monotonic_time'] = None self.assertEqual(s, self.published[0][x]) def test_multiple_samples_multiple_sources(self): @@ -328,6 +332,7 @@ class TestPostSamples(v2.FunctionalTest): s['timestamp'] = data.json[x]['timestamp'] s.setdefault('resource_metadata', dict()) self.assertEqual(s, data.json[x]) + s['monotonic_time'] = None self.assertEqual(s, self.published[0][x]) def test_missing_project_user_id(self): @@ -364,4 +369,6 @@ class TestPostSamples(v2.FunctionalTest): s['project_id'] = project_id self.assertEqual(s, data.json[x]) + + s['monotonic_time'] = None self.assertEqual(s, self.published[0][x]) diff --git a/ceilometer/tests/functional/storage/test_storage_scenarios.py b/ceilometer/tests/functional/storage/test_storage_scenarios.py index 1273cc9c..a8e0492d 100644 --- a/ceilometer/tests/functional/storage/test_storage_scenarios.py +++ b/ceilometer/tests/functional/storage/test_storage_scenarios.py @@ -430,6 +430,7 @@ class RawSampleTest(DBTestBase): d = meter.as_dict() self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at']) del d['recorded_at'] + d['monotonic_time'] = None self.assertIn(d, self.msgs[:3]) def test_get_samples_by_user_limit(self): @@ -450,6 +451,7 @@ class RawSampleTest(DBTestBase): d = meter.as_dict() self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at']) del d['recorded_at'] + d['monotonic_time'] = None self.assertIn(d, self.msgs[:4]) def test_get_samples_by_resource(self): @@ -459,6 +461,7 @@ class RawSampleTest(DBTestBase): d = results[1].as_dict() self.assertEqual(timeutils.utcnow(), d['recorded_at']) del d['recorded_at'] + d['monotonic_time'] = None self.assertEqual(self.msgs[0], d) def test_get_samples_by_metaquery(self): @@ -470,6 +473,7 @@ class RawSampleTest(DBTestBase): d = meter.as_dict() self.assertTimestampEqual(timeutils.utcnow(), d['recorded_at']) del d['recorded_at'] + d['monotonic_time'] = None self.assertIn(d, self.msgs) def test_get_samples_by_metaquery_key_with_dot_in_metadata(self): @@ -721,6 +725,7 @@ class ComplexSampleQueryTest(DBTestBase): for sample_item in results: d = sample_item.as_dict() del d['recorded_at'] + d['monotonic_time'] = None self.assertIn(d, self.msgs) def test_query_complex_filter_with_regexp(self): @@ -2785,6 +2790,7 @@ class TestBatchRecordingMetering(tests_db.TestBase): for sample_item in results: d = sample_item.as_dict() del d['recorded_at'] + d['monotonic_time'] = None self.assertIn(d, self.sample_dicts) resources = list(self.conn.get_resources()) diff --git a/ceilometer/tests/unit/compute/pollsters/test_net.py b/ceilometer/tests/unit/compute/pollsters/test_net.py index 047307f2..cdf5fed1 100644 --- a/ceilometer/tests/unit/compute/pollsters/test_net.py +++ b/ceilometer/tests/unit/compute/pollsters/test_net.py @@ -234,7 +234,8 @@ class TestNetPollster(base.TestPollsterBase): type=sample.TYPE_CUMULATIVE, unit='B', volume=100, - vnic_data=self.vnic0) + vnic_data=self.vnic0, + monotonic_time=123.123) user_metadata = sm.resource_metadata['user_metadata'] expected = self.INSTANCE_PROPERTIES[ @@ -259,7 +260,7 @@ class TestNetPollsterCache(base.TestPollsterBase): rx_drop=20, rx_errors=21, tx_bytes=3, tx_packets=4, tx_drop=22, tx_errors=23) - vnics = [(vnic0, stats0)] + vnics = [(vnic0, stats0, 123.123)] mgr = manager.AgentManager(0, self.CONF) pollster = factory(self.CONF) diff --git a/ceilometer/tests/unit/pipeline_base.py b/ceilometer/tests/unit/pipeline_base.py index 67a6f032..979e594d 100644 --- a/ceilometer/tests/unit/pipeline_base.py +++ b/ceilometer/tests/unit/pipeline_base.py @@ -21,6 +21,7 @@ import traceback import unittest import mock +import monotonic from oslo_config import fixture as fixture_config from oslo_utils import timeutils from oslotest import mockpatch @@ -1097,6 +1098,67 @@ class BasePipelineTestCase(base.BaseTestCase): pipe.flush() self.assertEqual(0, len(publisher.samples)) + def test_rate_of_change_precision(self): + s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" + transformer_cfg = [ + { + 'name': 'rate_of_change', + 'parameters': { + 'source': {}, + 'target': {'name': 'cpu_util', + 'unit': '%', + 'type': sample.TYPE_GAUGE, + 'scale': s} + } + }, + ] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('meters', ['cpu']) + pipeline_manager = pipeline.PipelineManager( + self.CONF, + self.cfg2file(self.pipeline_cfg), self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + now = timeutils.utcnow() + now_time = monotonic.monotonic() + # Simulate a laggy poller + later = now + datetime.timedelta(seconds=12345) + later_time = now_time + 10 + + counters = [ + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=125000000000, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=now.isoformat(), + monotonic_time=now_time, + resource_metadata={'cpu_number': 4} + ), + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=165000000000, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=later.isoformat(), + monotonic_time=later_time, + resource_metadata={'cpu_number': 4} + ), + ] + + pipe.publish_data(counters) + publisher = pipe.publishers[0] + self.assertEqual(1, len(publisher.samples)) + + cpu_util_sample = publisher.samples[0] + self.assertEqual(100, cpu_util_sample.volume) + def test_rate_of_change_max(self): s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" transformer_cfg = [ diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index 3690bacc..44ac8607 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -187,12 +187,18 @@ class RateOfChangeTransformer(ScalingTransformer): key = s.name + s.resource_id prev = self.cache.get(key) timestamp = timeutils.parse_isotime(s.timestamp) - self.cache[key] = (s.volume, timestamp) + self.cache[key] = (s.volume, timestamp, s.monotonic_time) if prev: prev_volume = prev[0] prev_timestamp = prev[1] - time_delta = timeutils.delta_seconds(prev_timestamp, timestamp) + prev_monotonic_time = prev[2] + if (prev_monotonic_time is not None and + s.monotonic_time is not None): + # NOTE(sileht): Prefer high precision timer + time_delta = s.monotonic_time - prev_monotonic_time + else: + time_delta = timeutils.delta_seconds(prev_timestamp, timestamp) # disallow violations of the arrow of time if time_delta < 0: LOG.warning(_('dropping out of time order sample: %s'), (s,)) diff --git a/requirements.txt b/requirements.txt index 21c8c636..fad0c0ee 100644 --- a/requirements.txt +++ b/requirements.txt @@ -13,6 +13,7 @@ jsonschema!=2.5.0,<3.0.0,>=2.0.0 # MIT kafka-python>=1.3.1 # Apache-2.0 keystonemiddleware!=4.1.0,>=4.0.0 # Apache-2.0 lxml>=2.3 # BSD +monotonic msgpack-python>=0.4.0 # Apache-2.0 oslo.concurrency>=3.5.0 # Apache-2.0 oslo.config>=3.9.0 # Apache-2.0