diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index 60960add..867fff51 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -18,10 +18,12 @@ # under the License. import abc +import copy import datetime import traceback import mock +from oslo_context import context from oslo_utils import timeutils from oslotest import base from oslotest import mockpatch @@ -1660,6 +1662,65 @@ class BasePipelineTestCase(base.BaseTestCase): self.assertEqual("test_resource", getattr(publisher.samples[0], 'resource_id')) + def test_aggregator_to_rate_of_change_transformer_two_resources(self): + resource_id = ['1ca738a1-c49c-4401-8346-5c60ebdb03f4', + '5dd418a6-c6a9-49c9-9cef-b357d72c71dd'] + + aggregator = conversions.AggregatorTransformer(size="2", + timestamp="last") + + rate_of_change_transformer = conversions.RateOfChangeTransformer() + + counter_time = timeutils.parse_isotime('2016-01-01T12:00:00+00:00') + + for offset in range(2): + counter = copy.copy(self.test_counter) + counter.timestamp = timeutils.isotime(counter_time) + counter.resource_id = resource_id[0] + counter.volume = offset + counter.type = sample.TYPE_CUMULATIVE + counter.unit = 'ns' + aggregator.handle_sample(context.get_admin_context(), counter) + + if offset == 1: + test_time = counter_time + + counter_time = counter_time + datetime.timedelta(0, 1) + + aggregated_counters = aggregator.flush(context.get_admin_context()) + self.assertEqual(len(aggregated_counters), 1) + self.assertEqual(aggregated_counters[0].timestamp, + timeutils.isotime(test_time)) + + rate_of_change_transformer.handle_sample(context.get_admin_context(), + aggregated_counters[0]) + + for offset in range(2): + counter = copy.copy(self.test_counter) + counter.timestamp = timeutils.isotime(counter_time) + counter.resource_id = resource_id[offset] + counter.volume = 2 + counter.type = sample.TYPE_CUMULATIVE + counter.unit = 'ns' + aggregator.handle_sample(context.get_admin_context(), counter) + + if offset == 0: + test_time = counter_time + + counter_time = counter_time + datetime.timedelta(0, 1) + + aggregated_counters = aggregator.flush(context.get_admin_context()) + self.assertEqual(len(aggregated_counters), 2) + + for counter in aggregated_counters: + if counter.resource_id == resource_id[0]: + rateOfChange = rate_of_change_transformer.handle_sample( + context.get_admin_context(), counter) + self.assertEqual(counter.timestamp, + timeutils.isotime(test_time)) + + self.assertEqual(rateOfChange.volume, 1) + def _do_test_arithmetic_expr_parse(self, expr, expected): actual = arithmetic.ArithmeticTransformer.parse_expr(expr) self.assertEqual(expected, actual) diff --git a/ceilometer/tests/unit/transformer/test_conversions.py b/ceilometer/tests/unit/transformer/test_conversions.py index ddbef41b..30a6a104 100644 --- a/ceilometer/tests/unit/transformer/test_conversions.py +++ b/ceilometer/tests/unit/transformer/test_conversions.py @@ -63,6 +63,32 @@ class AggregatorTransformerTestCase(base.BaseTestCase): self.assertRaises(ValueError, conversions.AggregatorTransformer, "2", "abc", None, None, None) + def test_init_no_timestamp(self): + aggregator = conversions.AggregatorTransformer("1", "1", None, + None, None) + self.assertEqual("first", aggregator.timestamp) + + def test_init_timestamp_none(self): + aggregator = conversions.AggregatorTransformer("1", "1", None, + None, None, None) + self.assertEqual("first", aggregator.timestamp) + + def test_init_timestamp_first(self): + aggregator = conversions.AggregatorTransformer("1", "1", None, + None, None, "first") + self.assertEqual("first", aggregator.timestamp) + + def test_init_timestamp_last(self): + aggregator = conversions.AggregatorTransformer("1", "1", None, + None, None, "last") + self.assertEqual("last", aggregator.timestamp) + + def test_init_timestamp_invalid(self): + aggregator = conversions.AggregatorTransformer("1", "1", None, + None, None, + "invalid_option") + self.assertEqual("first", aggregator.timestamp) + def test_size_unbounded(self): aggregator = conversions.AggregatorTransformer(size="0", retention_time="300") diff --git a/ceilometer/transformer/conversions.py b/ceilometer/transformer/conversions.py index 62642108..d9d091d5 100644 --- a/ceilometer/transformer/conversions.py +++ b/ceilometer/transformer/conversions.py @@ -236,11 +236,17 @@ class AggregatorTransformer(ScalingTransformer): AggregatorTransformer(size=15, user_id='first', resource_metadata='drop') + + To keep the timestamp of the last received sample rather + than the first: + + AggregatorTransformer(timestamp="last") + """ def __init__(self, size=1, retention_time=None, project_id=None, user_id=None, resource_metadata="last", - **kwargs): + timestamp="first", **kwargs): super(AggregatorTransformer, self).__init__(**kwargs) self.samples = {} self.counts = collections.defaultdict(int) @@ -249,6 +255,11 @@ class AggregatorTransformer(ScalingTransformer): if not (self.size or self.retention_time): self.size = 1 + if timestamp in ["first", "last"]: + self.timestamp = timestamp + else: + self.timestamp = "first" + self.initial_timestamp = None self.aggregated_samples = 0 @@ -295,6 +306,8 @@ class AggregatorTransformer(ScalingTransformer): 'resource_metadata'] == 'drop': self.samples[key].resource_metadata = {} else: + if self.timestamp == "last": + self.samples[key].timestamp = sample_.timestamp if sample_.type == sample.TYPE_CUMULATIVE: self.samples[key].volume = self._scale(sample_) else: