Drop out-of-time-sequence rate of change samples

Now that samples always go over the messaging bus before being
potentially supplied to the transformers, there is a possibility
that samples can arrive out of time order. When this happens the
time delta can be negative, yielding a negative rate of change where
one makes no sense.

This change adjusts handling such that if a sample is older than the
sample in the cache it is dropped and the cache is reset to have the
most recent sample. A dropped sample is logged with a warning as we
assume that violations of the order of time are something we might
want to be aware of.

If the samples are the exact same age, we carry on.

Change-Id: I8c8fe0c0ab3c01ef5b2913ab18dfd5959aefaed3
This commit is contained in:
Chris Dent 2015-07-29 18:11:32 +00:00
parent 29d3cd405b
commit 5bc3281f4e
2 changed files with 85 additions and 4 deletions

View File

@ -1010,7 +1010,7 @@ class BasePipelineTestCase(base.BaseTestCase):
offset=0) offset=0)
def test_rate_of_change_no_predecessor(self): def test_rate_of_change_no_predecessor(self):
s = "100.0 / (10**9 * resource_metadata.get('cpu_number', 1))" s = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
transformer_cfg = [ transformer_cfg = [
{ {
'name': 'rate_of_change', 'name': 'rate_of_change',
@ -1050,6 +1050,80 @@ class BasePipelineTestCase(base.BaseTestCase):
pipe.flush(None) pipe.flush(None)
self.assertEqual(0, len(publisher.samples)) self.assertEqual(0, len(publisher.samples))
@mock.patch('ceilometer.transformer.conversions.LOG')
def test_rate_of_change_out_of_order(self, the_log):
    # Publishes three cumulative 'cpu' samples for one resource whose
    # timestamps arrive in the order: now, now - 10s, now + 10s.
    # The test expects exactly one derived sample (volume 12.5) to reach
    # the publisher and a warning to be logged for the second sample,
    # i.e. the one that is older than its predecessor.
    scale_expr = "100.0 / (10**9 * (resource_metadata.cpu_number or 1))"
    target = {
        'name': 'cpu_util',
        'unit': '%',
        'type': sample.TYPE_GAUGE,
        'scale': scale_expr,
    }
    rate_of_change = {
        'name': 'rate_of_change',
        'parameters': {'source': {}, 'target': target},
    }
    self._set_pipeline_cfg('transformers', [rate_of_change])
    self._set_pipeline_cfg('counters', ['cpu'])
    manager = pipeline.PipelineManager(self.pipeline_cfg,
                                       self.transformer_manager)
    pipe = manager.pipelines[0]

    now = timeutils.utcnow()
    ten_seconds = datetime.timedelta(seconds=10)
    # (volume, timestamp) pairs listed in arrival order; the middle entry
    # is deliberately stamped earlier than the one before it.
    readings = (
        (125000000000, now),
        (120000000000, now - ten_seconds),
        (130000000000, now + ten_seconds),
    )
    counters = [
        sample.Sample(
            name='cpu',
            type=sample.TYPE_CUMULATIVE,
            volume=volume,
            unit='ns',
            user_id='test_user',
            project_id='test_proj',
            resource_id='test_resource',
            timestamp=stamp.isoformat(),
            resource_metadata={'cpu_number': 4},
        )
        for volume, stamp in readings
    ]

    pipe.publish_data(None, counters)
    publisher = pipe.publishers[0]
    self.assertEqual(1, len(publisher.samples))

    # Flushing must not release any additional samples.
    pipe.flush(None)
    self.assertEqual(1, len(publisher.samples))

    # 12.5 is consistent with a rate computed between the first and third
    # samples: 5e9 ns over 10 s, scaled by 100 / (10**9 * 4).
    self.assertEqual(12.5, publisher.samples[0].volume)
    the_log.warn.assert_called_with(
        'dropping out of time order sample: %s',
        (counters[1],)
    )
def test_resources(self): def test_resources(self):
resources = ['test1://', 'test2://'] resources = ['test1://', 'test2://']
self._set_pipeline_cfg('resources', resources) self._set_pipeline_cfg('resources', resources)

View File

@ -121,9 +121,16 @@ class RateOfChangeTransformer(ScalingTransformer):
prev_volume = prev[0] prev_volume = prev[0]
prev_timestamp = prev[1] prev_timestamp = prev[1]
time_delta = timeutils.delta_seconds(prev_timestamp, timestamp) time_delta = timeutils.delta_seconds(prev_timestamp, timestamp)
# we only allow negative deltas for noncumulative samples, whereas # disallow violations of the arrow of time
# for cumulative we assume that a reset has occurred in the interim if time_delta < 0:
# so that the current volume gives a lower bound on growth LOG.warn(_('dropping out of time order sample: %s'), (s,))
# Reset the cache to the newer sample.
self.cache[key] = prev
return None
# we only allow negative volume deltas for noncumulative
# samples, whereas for cumulative we assume that a reset has
# occurred in the interim so that the current volume gives a
# lower bound on growth
volume_delta = (s.volume - prev_volume volume_delta = (s.volume - prev_volume
if (prev_volume <= s.volume or if (prev_volume <= s.volume or
s.type != sample.TYPE_CUMULATIVE) s.type != sample.TYPE_CUMULATIVE)