Merge "Changes aggregator transformer to allow retention_time w/o size"
commit bdb352c550
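This merge lets the aggregator transformer flush on retention_time alone, without a size bound. A minimal sketch of the behaviour the commit enables, using only the constructor and methods exercised by the new tests below (the sample fields are copied from the new test fixture; this is an illustration, not code from the commit):

    from oslo_context import context
    from oslo_utils import timeutils
    from ceilometer import sample
    from ceilometer.transformer import conversions

    # size="0" removes the size bound; with retention_time="300" the
    # aggregator flushes only once its buffered samples are older than
    # 300 seconds.
    aggregator = conversions.AggregatorTransformer(size="0",
                                                   retention_time="300")

    cpu_sample = sample.Sample(
        name='cpu', type=sample.TYPE_CUMULATIVE, unit='ns', volume='1234567',
        user_id='56c5692032f34041900342503fecab30',
        project_id='ac9494df2d9d4e709bac378cceabaf23',
        resource_id='1ca738a1-c49c-4401-8346-5c60ebdb03f4',
        timestamp=timeutils.isotime(),
        resource_metadata={})
    aggregator.handle_sample(context.get_admin_context(), cpu_sample)

    # Nothing to publish until the retention window elapses
    # (the new test_size_unbounded below asserts exactly this).
    print(aggregator.flush(context.get_admin_context()))  # []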
@@ -1370,15 +1370,6 @@ class BasePipelineTestCase(base.BaseTestCase):
         actual = sorted(s.volume for s in publisher.samples)
         self.assertEqual([2.0, 3.0, 6.0], actual)
 
-    def test_aggregator_input_validation(self):
-        aggregator = conversions.AggregatorTransformer("1", "15", None,
-                                                       None, None)
-        self.assertEqual(1, aggregator.size)
-        self.assertEqual(15, aggregator.retention_time)
-
-        self.assertRaises(ValueError, conversions.AggregatorTransformer,
-                          "abc", "cde", None, None, None)
-
     def test_aggregator_metadata(self):
         for conf, expected_version in [('last', '2.0'), ('first', '1.0')]:
             samples = self._do_test_aggregator({
 ceilometer/tests/unit/transformer/__init__.py          |  0   (new file)
 ceilometer/tests/unit/transformer/test_conversions.py  | 89   (new file)
ceilometer/tests/unit/transformer/test_conversions.py (new file)
@@ -0,0 +1,89 @@
+#
+# Copyright 2016 IBM Corp.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+import copy
+
+from oslo_context import context
+from oslo_utils import timeutils
+from oslotest import base
+
+from ceilometer import sample
+from ceilometer.transformer import conversions
+
+
+class AggregatorTransformerTestCase(base.BaseTestCase):
+    SAMPLE = sample.Sample(
+        name='cpu',
+        type=sample.TYPE_CUMULATIVE,
+        unit='ns',
+        volume='1234567',
+        user_id='56c5692032f34041900342503fecab30',
+        project_id='ac9494df2d9d4e709bac378cceabaf23',
+        resource_id='1ca738a1-c49c-4401-8346-5c60ebdb03f4',
+        timestamp="2015-10-29 14:12:15.485877+00:00",
+        resource_metadata={}
+    )
+
+    def setUp(self):
+        super(AggregatorTransformerTestCase, self).setUp()
+        self._sample_offset = 0
+
+    def test_init_input_validation(self):
+        aggregator = conversions.AggregatorTransformer("2", "15", None,
+                                                       None, None)
+        self.assertEqual(2, aggregator.size)
+        self.assertEqual(15, aggregator.retention_time)
+
+    def test_init_no_size_or_rention_time(self):
+        aggregator = conversions.AggregatorTransformer()
+        self.assertEqual(1, aggregator.size)
+        self.assertEqual(None, aggregator.retention_time)
+
+    def test_init_size_zero(self):
+        aggregator = conversions.AggregatorTransformer(size="0")
+        self.assertEqual(1, aggregator.size)
+        self.assertEqual(None, aggregator.retention_time)
+
+    def test_init_input_validation_size_invalid(self):
+        self.assertRaises(ValueError, conversions.AggregatorTransformer,
+                          "abc", "15", None, None, None)
+
+    def test_init_input_validation_retention_time_invalid(self):
+        self.assertRaises(ValueError, conversions.AggregatorTransformer,
+                          "2", "abc", None, None, None)
+
+    def test_size_unbounded(self):
+        aggregator = conversions.AggregatorTransformer(size="0",
+                                                       retention_time="300")
+        self._insert_sample_data(aggregator)
+
+        samples = aggregator.flush(context.get_admin_context())
+
+        self.assertEqual([], samples)
+
+    def test_size_bounded(self):
+        aggregator = conversions.AggregatorTransformer(size="100")
+        self._insert_sample_data(aggregator)
+
+        samples = aggregator.flush(context.get_admin_context())
+
+        self.assertEqual(100, len(samples))
+
+    def _insert_sample_data(self, aggregator):
+        for _ in range(100):
+            sample = copy.copy(self.SAMPLE)
+            sample.resource_id = sample.resource_id + str(self._sample_offset)
+            sample.timestamp = timeutils.isotime()
+            aggregator.handle_sample(context.get_admin_context(), sample)
+            self._sample_offset += 1
@@ -246,6 +246,9 @@ class AggregatorTransformer(ScalingTransformer):
         self.counts = collections.defaultdict(int)
         self.size = int(size) if size else None
         self.retention_time = float(retention_time) if retention_time else None
+        if not (self.size or self.retention_time):
+            self.size = 1
+
         self.initial_timestamp = None
         self.aggregated_samples = 0
 
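Taken together with the existing parsing above, the constructor now resolves the two bounds as sketched below. resolve_bounds is a hypothetical standalone helper used only to restate that logic and its edge cases; it is not part of the ceilometer code:

    def resolve_bounds(size, retention_time):
        # Same parsing as AggregatorTransformer.__init__: a non-numeric
        # value such as "abc" raises ValueError (covered by the *_invalid tests).
        size = int(size) if size else None
        retention_time = float(retention_time) if retention_time else None
        if not (size or retention_time):
            # Neither bound supplied: keep the old default of flushing
            # after every sample.
            size = 1
        return size, retention_time

    resolve_bounds(None, None)    # (1, None)    default: flush each sample
    resolve_bounds("0", None)     # (1, None)    size 0 alone falls back to 1
    resolve_bounds("0", "300")    # (0, 300.0)   size 0 is never "full": age-based flush only
    resolve_bounds("100", None)   # (100, None)  flush once 100 samples are aggregated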
@@ -308,7 +311,7 @@ class AggregatorTransformer(ScalingTransformer):
         expired = (self.retention_time and
                    timeutils.is_older_than(self.initial_timestamp,
                                            self.retention_time))
-        full = self.aggregated_samples >= self.size
+        full = self.size and self.aggregated_samples >= self.size
         if full or expired:
             x = list(self.samples.values())
             # gauge aggregates need to be averages
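Because size may now legitimately be None or 0, the flush trigger has to treat a falsy size as "never full". A condensed, hypothetical restatement of the condition above (should_flush and its arguments are illustrative names, not ceilometer API):

    def should_flush(size, retention_time, aggregated_samples, age_seconds):
        # Age-based trigger: only meaningful when retention_time is set.
        expired = bool(retention_time) and age_seconds > retention_time
        # Size-based trigger: with size None/0 the buffer is never "full",
        # so only expiry can cause a flush.
        full = bool(size) and aggregated_samples >= size
        return full or expired

    should_flush(0, 300, aggregated_samples=100, age_seconds=10)    # False (test_size_unbounded)
    should_flush(0, 300, aggregated_samples=100, age_seconds=301)   # True
    should_flush(100, None, aggregated_samples=100, age_seconds=0)  # True  (test_size_bounded)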