From 501143062b82da180b0f9c40e4a505d833892e92 Mon Sep 17 00:00:00 2001 From: Dan Travis Date: Thu, 28 Jan 2016 16:36:27 +0000 Subject: [PATCH] Adds timestamp option to Aggregation transformer Adds an argument to the Aggregation transformer constructor that allows a user to specify whether to include the timestamp from either the first or last sample received for a given aggregated sample. This addresses an issue with transformer chaining where incorrect values will sometimes be produced by the Rate of Change transformer when chaining the Aggregation transformer with the Rate of Change transformer. Change-Id: Ib163a80a7e6ddaf58d7cc555fb4f4d87d570b1a1 Closes-Bug: #1539163 --- ceilometer/tests/pipeline_base.py | 61 +++++++++++++++++++ .../unit/transformer/test_conversions.py | 26 ++++++++ ceilometer/transformer/conversions.py | 15 ++++- 3 files changed, 101 insertions(+), 1 deletion(-) diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index 60960add..867fff51 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -18,10 +18,12 @@ # under the License. import abc +import copy import datetime import traceback import mock +from oslo_context import context from oslo_utils import timeutils from oslotest import base from oslotest import mockpatch @@ -1660,6 +1662,65 @@ class BasePipelineTestCase(base.BaseTestCase): self.assertEqual("test_resource", getattr(publisher.samples[0], 'resource_id')) + def test_aggregator_to_rate_of_change_transformer_two_resources(self): + resource_id = ['1ca738a1-c49c-4401-8346-5c60ebdb03f4', + '5dd418a6-c6a9-49c9-9cef-b357d72c71dd'] + + aggregator = conversions.AggregatorTransformer(size="2", + timestamp="last") + + rate_of_change_transformer = conversions.RateOfChangeTransformer() + + counter_time = timeutils.parse_isotime('2016-01-01T12:00:00+00:00') + + for offset in range(2): + counter = copy.copy(self.test_counter) + counter.timestamp = timeutils.isotime(counter_time) + counter.resource_id = resource_id[0] + counter.volume = offset + counter.type = sample.TYPE_CUMULATIVE + counter.unit = 'ns' + aggregator.handle_sample(context.get_admin_context(), counter) + + if offset == 1: + test_time = counter_time + + counter_time = counter_time + datetime.timedelta(0, 1) + + aggregated_counters = aggregator.flush(context.get_admin_context()) + self.assertEqual(len(aggregated_counters), 1) + self.assertEqual(aggregated_counters[0].timestamp, + timeutils.isotime(test_time)) + + rate_of_change_transformer.handle_sample(context.get_admin_context(), + aggregated_counters[0]) + + for offset in range(2): + counter = copy.copy(self.test_counter) + counter.timestamp = timeutils.isotime(counter_time) + counter.resource_id = resource_id[offset] + counter.volume = 2 + counter.type = sample.TYPE_CUMULATIVE + counter.unit = 'ns' + aggregator.handle_sample(context.get_admin_context(), counter) + + if offset == 0: + test_time = counter_time + + counter_time = counter_time + datetime.timedelta(0, 1) + + aggregated_counters = aggregator.flush(context.get_admin_context()) + self.assertEqual(len(aggregated_counters), 2) + + for counter in aggregated_counters: + if counter.resource_id == resource_id[0]: + rateOfChange = rate_of_change_transformer.handle_sample( + context.get_admin_context(), counter) + self.assertEqual(counter.timestamp, + timeutils.isotime(test_time)) + + self.assertEqual(rateOfChange.volume, 1) + def _do_test_arithmetic_expr_parse(self, expr, expected): actual = arithmetic.ArithmeticTransformer.parse_expr(expr) self.assertEqual(expected, actual) diff --git a/ceilometer/tests/unit/transformer/test_conversions.py b/ceilometer/tests/unit/transformer/test_conversions.py index ddbef41b..30a6a104 100644 --- a/ceilometer/tests/unit/transformer/test_conversions.py +++ b/ceilometer/tests/unit/transformer/test_conversions.py @@ -63,6 +63,32 @@ class AggregatorTransformerTestCase(base.BaseTestCase): self.assertRaises(ValueError, conversions.AggregatorTransformer, "2", "abc", None, None, None) + def test_init_no_timestamp(self): + aggregator = conversions.AggregatorTransformer("1", "1", None, + None, None) + self.assertEqual("first", aggregator.timestamp) + + def test_init_timestamp_none(self): + aggregator = conversions.AggregatorTransformer("1", "1", None, + None, None, None) + self.assertEqual("first", aggregator.timestamp) + + def test_init_timestamp_first(self): + aggregator = conversions.AggregatorTransformer("1", "1", None, + None, None, "first") + self.assertEqual("first", aggregator.timestamp) + + def test_init_timestamp_last(self): + aggregator = conversions.AggregatorTransformer("1", "1", None, + None, None, "last") + self.assertEqual("last", aggregator.timestamp) + + def test_init_timestamp_invalid(self): + aggregator = conversions.AggregatorTransformer("1", "1", None, + None, None, + "invalid_option") + self.assertEqual("first", aggregator.timestamp) + def test_size_unbounded(self): aggregator = conversions.AggregatorTransformer(size="0", retention_time="300") diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index 62642108..d9d091d5 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -236,11 +236,17 @@ class AggregatorTransformer(ScalingTransformer): AggregatorTransformer(size=15, user_id='first', resource_metadata='drop') + + To keep the timestamp of the last received sample rather + than the first: + + AggregatorTransformer(timestamp="last") + """ def __init__(self, size=1, retention_time=None, project_id=None, user_id=None, resource_metadata="last", - **kwargs): + timestamp="first", **kwargs): super(AggregatorTransformer, self).__init__(**kwargs) self.samples = {} self.counts = collections.defaultdict(int) @@ -249,6 +255,11 @@ class AggregatorTransformer(ScalingTransformer): if not (self.size or self.retention_time): self.size = 1 + if timestamp in ["first", "last"]: + self.timestamp = timestamp + else: + self.timestamp = "first" + self.initial_timestamp = None self.aggregated_samples = 0 @@ -295,6 +306,8 @@ class AggregatorTransformer(ScalingTransformer): 'resource_metadata'] == 'drop': self.samples[key].resource_metadata = {} else: + if self.timestamp == "last": + self.samples[key].timestamp = sample_.timestamp if sample_.type == sample.TYPE_CUMULATIVE: self.samples[key].volume = self._scale(sample_) else: