From 1a18d3acaeab6754055805571938faa546f2b214 Mon Sep 17 00:00:00 2001 From: gordon chung Date: Fri, 18 Sep 2015 09:29:40 -0400 Subject: [PATCH] remove batch processing requirement from arithmetic transformer arithmetic transformers relies on batch processing of samples. this is invalid assumption as samples are discrete datapoints and the transformer should not rely on polling logic to group datapoints especially now that we don't process data at polling agents. Change-Id: Id433bdef5ca3a59de49fc66e9805a37b4cd19781 Closes-Bug: #1496701 --- ceilometer/tests/pipeline_base.py | 9 ++++----- ceilometer/transformer/arithmetic.py | 9 ++++----- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/ceilometer/tests/pipeline_base.py b/ceilometer/tests/pipeline_base.py index 4ff335ab..adcc9695 100644 --- a/ceilometer/tests/pipeline_base.py +++ b/ceilometer/tests/pipeline_base.py @@ -1742,12 +1742,11 @@ class BasePipelineTestCase(base.BaseTestCase): pipeline_manager = pipeline.PipelineManager(self.pipeline_cfg, self.transformer_manager) pipe = pipeline_manager.pipelines[0] - - pipe.publish_data(None, counters) + for s in counters: + pipe.publish_data(None, s) + pipe.flush(None) publisher = pipeline_manager.pipelines[0].publishers[0] expected_len = len(test_resources) * len(expected) - self.assertEqual(0, len(publisher.samples)) - pipe.flush(None) self.assertEqual(expected_len, len(publisher.samples)) # bucket samples by resource first @@ -1817,7 +1816,7 @@ class BasePipelineTestCase(base.BaseTestCase): dict(name='memory.usage', volume=512.0), dict(name='memory', volume=1024.0), ] - expected = [50.0] + expected = [25.0] self._do_test_arithmetic(expression, scenario, expected) def test_arithmetic_transformer_missing(self): diff --git a/ceilometer/transformer/arithmetic.py b/ceilometer/transformer/arithmetic.py index 365c7848..63a844cb 100644 --- a/ceilometer/transformer/arithmetic.py +++ b/ceilometer/transformer/arithmetic.py @@ -101,15 +101,14 @@ class ArithmeticTransformer(transformer.TransformerBase): def flush(self, context): new_samples = [] + cache_clean_list = [] if not self.misconfigured: for resource_id in self.cache: if self._check_requirements(resource_id): new_samples.append(self._calculate(resource_id)) - else: - LOG.warn(_('Unable to perform calculation, not all of ' - '{%s} are present'), - ', '.join(self.required_meters)) - self.cache.clear() + cache_clean_list.append(resource_id) + for res_id in cache_clean_list: + self.cache.pop(res_id) return new_samples @classmethod