diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index 26ff1aee..4554e5a4 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -1010,7 +1010,7 @@ class BasePipelineTestCase(base.BaseTestCase): offset=0) def test_rate_of_change_no_predecessor(self): - s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))" + s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" transformer_cfg = [ { 'name': 'rate_of_change', @@ -1050,6 +1050,80 @@ class BasePipelineTestCase(base.BaseTestCase): pipe.flush(None) self.assertEqual(0, len(publisher.samples)) + @mock.patch('ceilometer.transformer.conversions.LOG') + def test_rate_of_change_out_of_order(self, the_log): + s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))" + transformer_cfg = [ + { + 'name': 'rate_of_change', + 'parameters': { + 'source': {}, + 'target': {'name': 'cpu_util', + 'unit': '%', + 'type': sample.TYPE_GAUGE, + 'scale': s} + } + }, + ] + self._set_pipeline_cfg('transformers', transformer_cfg) + self._set_pipeline_cfg('counters', ['cpu']) + pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, + self.transformer_manager) + pipe = pipeline_manager.pipelines[0] + + now = timeutils.utcnow() + earlier = now - datetime.timedelta(seconds=10) + later = now + datetime.timedelta(seconds=10) + + counters = [ + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=125000000000, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=now.isoformat(), + resource_metadata={'cpu_number': 4} + ), + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=120000000000, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=earlier.isoformat(), + resource_metadata={'cpu_number': 4} + ), + sample.Sample( + name='cpu', + type=sample.TYPE_CUMULATIVE, + volume=130000000000, + unit='ns', + user_id='test_user', + project_id='test_proj', + resource_id='test_resource', + timestamp=later.isoformat(), + resource_metadata={'cpu_number': 4} + ), + ] + + pipe.publish_data(None, counters) + publisher = pipe.publishers[0] + self.assertEqual(1, len(publisher.samples)) + pipe.flush(None) + self.assertEqual(1, len(publisher.samples)) + + cpu_util_sample = publisher.samples[0] + self.assertEqual(12.5, cpu_util_sample.volume) + the_log.warn.assert_called_with( + 'dropping out of time order sample: %s', + (counters[1],) + ) + def test_resources(self): resources = ['test1://', 'test2://'] self._set_pipeline_cfg('resources', resources) diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index cf29f6d4..4e40d37d 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -121,9 +121,16 @@ class RateOfChangeTransformer(ScalingTransformer): prev_volume = prev[0] prev_timestamp = prev[1] time_delta = timeutils.delta_seconds(prev_timestamp, timestamp) - # we only allow negative deltas for noncumulative samples, whereas - # for cumulative we assume that a reset has occurred in the interim - # so that the current volume gives a lower bound on growth + # disallow violations of the arrow of time + if time_delta < 0: + LOG.warn(_('dropping out of time order sample: %s'), (s,)) + # Reset the cache to the newer sample. + self.cache[key] = prev + return None + # we only allow negative volume deltas for noncumulative + # samples, whereas for cumulative we assume that a reset has + # occurred in the interim so that the current volume gives a + # lower bound on growth volume_delta = (s.volume - prev_volume if (prev_volume <= s.volume or s.type != sample.TYPE_CUMULATIVE)