From ef2ccd39923e30af713fd5a63b5f3d2333d8bd13 Mon Sep 17 00:00:00 2001
From: Eoghan Glynn
Date: Thu, 27 Mar 2014 17:02:42 +0000
Subject: [PATCH] De-dupe selectable aggregate list in statistics API

Fixes bug 1298514

Discard full duplicates from the selectable aggregate list
passed to the statistics API.

Partial duplicates (i.e. matching func but different param)
are now allowed and supported by the storage drivers.

Change-Id: I5e2780befa9e265d5f1deb71b2aa375d05ca2e64
---
 ceilometer/api/controllers/v2.py                   |  2 +
 ceilometer/storage/impl_mongodb.py                 | 21 +++++---
 ceilometer/storage/impl_sqlalchemy.py              | 10 ++--
 ceilometer/tests/api/__init__.py                   | 34 ++++++------
 .../tests/api/v2/test_statistics_scenarios.py      | 54 +++++++++++++++++++
 ceilometer/utils.py                                | 12 +++++
 6 files changed, 105 insertions(+), 28 deletions(-)

diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py
index 5058c2454f..9dc00200dd 100644
--- a/ceilometer/api/controllers/v2.py
+++ b/ceilometer/api/controllers/v2.py
@@ -899,6 +899,8 @@ class MeterController(rest.RestController):
         kwargs['meter'] = self.meter_name
         f = storage.SampleFilter(**kwargs)
         g = _validate_groupby_fields(groupby)
+
+        aggregate = utils.uniq(aggregate, ['func', 'param'])
         computed = pecan.request.storage_conn.get_meter_statistics(f,
                                                                     period,
                                                                     g,
diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py
index f9491c10c5..f871af0178 100644
--- a/ceilometer/storage/impl_mongodb.py
+++ b/ceilometer/storage/impl_mongodb.py
@@ -217,16 +217,15 @@ class Connection(pymongo_base.Connection):
         ),
         emit_initial=dict(
             cardinality=(
-                'var aggregate = {};'
                 'aggregate["cardinality/%(aggregate_param)s"] = 1;'
-                'var distincts = {};'
-                'distincts[this["%(aggregate_param)s"]] = true;'
+                'var distinct_%(aggregate_param)s = {};'
+                'distinct_%(aggregate_param)s[this["%(aggregate_param)s"]]'
+                ' = true;'
             )
         ),
         emit_body=dict(
             cardinality=(
-                'aggregate : aggregate,'
-                'distincts : distincts ,'
+                'distinct_%(aggregate_param)s : distinct_%(aggregate_param)s,'
                 '%(aggregate_param)s : this["%(aggregate_param)s"],'
             )
         ),
@@ -236,14 +235,17 @@ class Connection(pymongo_base.Connection):
         reduce_body=dict(
             cardinality=(
                 'aggregate : values[0].aggregate,'
-                'distincts: values[0].distincts,'
+                'distinct_%(aggregate_param)s:'
+                ' values[0].distinct_%(aggregate_param)s,'
                 '%(aggregate_param)s : values[0]["%(aggregate_param)s"],'
             )
         ),
         reduce_computation=dict(
             cardinality=(
-                'if (!(values[i]["%(aggregate_param)s"] in res.distincts)) {'
-                ' res.distincts[values[i]["%(aggregate_param)s"]] = true;'
+                'if (!(values[i]["%(aggregate_param)s"] in'
+                ' res.distinct_%(aggregate_param)s)) {'
+                ' res.distinct_%(aggregate_param)s[values[i]'
+                ' ["%(aggregate_param)s"]] = true;'
                 ' res.aggregate["cardinality/%(aggregate_param)s"] += 1;}'
             )
         ),
@@ -253,8 +255,10 @@ class Connection(pymongo_base.Connection):
     )

     EMIT_STATS_COMMON = """
+        var aggregate = {};
         %(aggregate_initial_placeholder)s
         emit(%(key_val)s, { unit: this.counter_unit,
+                            aggregate : aggregate,
                             %(aggregate_body_placeholder)s
                             groupby : %(groupby_val)s,
                             duration_start : this.timestamp,
@@ -347,6 +351,7 @@ class Connection(pymongo_base.Connection):
     function (key, values) {
         %(aggregate_initial_val)s
         var res = { unit: values[0].unit,
+                    aggregate: values[0].aggregate,
                     %(aggregate_body_val)s
                     groupby: values[0].groupby,
                     period_start: values[0].period_start,
diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py
index 84e05b6a1c..96c98afcee 100644
--- a/ceilometer/storage/impl_sqlalchemy.py
+++ b/ceilometer/storage/impl_sqlalchemy.py
@@ -129,7 +129,7 @@ PARAMETERIZED_AGGREGATES = dict(
     compute=dict(
         cardinality=lambda p: func.count(
             distinct(getattr(models.Sample, p))
-        ).label('cardinality')
+        ).label('cardinality/%s' % p)
     )
 )

@@ -732,10 +732,10 @@ class Connection(base.Connection):
             if hasattr(result, attr):
                 stats_args[attr] = getattr(result, attr)
         if aggregate:
-            stats_args['aggregate'] = dict(
-                ('%s%s' % (a.func, '/%s' % a.param if a.param else ''),
-                 getattr(result, a.func)) for a in aggregate
-            )
+            stats_args['aggregate'] = {}
+            for a in aggregate:
+                key = '%s%s' % (a.func, '/%s' % a.param if a.param else '')
+                stats_args['aggregate'][key] = getattr(result, key)
         return stats_args

     @staticmethod
diff --git a/ceilometer/tests/api/__init__.py b/ceilometer/tests/api/__init__.py
index b6a6d79d42..eee701a16b 100644
--- a/ceilometer/tests/api/__init__.py
+++ b/ceilometer/tests/api/__init__.py
@@ -179,7 +179,7 @@ class FunctionalTest(db_test_base.TestBase):

     def get_json(self, path, expect_errors=False, headers=None,
                  extra_environ=None, q=[], groupby=[], status=None,
-                 **params):
+                 override_params=None, **params):
         """Sends simulated HTTP GET request to Pecan test app.

         :param path: url path of target service
@@ -192,23 +192,27 @@ class FunctionalTest(db_test_base.TestBase):
                   keys
         :param groupby: list of fields to group by
         :param status: Expected status code of response
+        :param override_params: literally encoded query param string
         :param params: content for wsgi.input of request
         """
         full_path = self.PATH_PREFIX + path
-        query_params = {'q.field': [],
-                        'q.value': [],
-                        'q.op': [],
-                        'q.type': [],
-                        }
-        for query in q:
-            for name in ['field', 'op', 'value', 'type']:
-                query_params['q.%s' % name].append(query.get(name, ''))
-        all_params = {}
-        all_params.update(params)
-        if q:
-            all_params.update(query_params)
-        if groupby:
-            all_params.update({'groupby': groupby})
+        if override_params:
+            all_params = override_params
+        else:
+            query_params = {'q.field': [],
+                            'q.value': [],
+                            'q.op': [],
+                            'q.type': [],
+                            }
+            for query in q:
+                for name in ['field', 'op', 'value', 'type']:
+                    query_params['q.%s' % name].append(query.get(name, ''))
+            all_params = {}
+            all_params.update(params)
+            if q:
+                all_params.update(query_params)
+            if groupby:
+                all_params.update({'groupby': groupby})
         response = self.app.get(full_path,
                                 params=all_params,
                                 headers=headers,
diff --git a/ceilometer/tests/api/v2/test_statistics_scenarios.py b/ceilometer/tests/api/v2/test_statistics_scenarios.py
index a363aa8a71..41aeecbf09 100644
--- a/ceilometer/tests/api/v2/test_statistics_scenarios.py
+++ b/ceilometer/tests/api/v2/test_statistics_scenarios.py
@@ -1506,6 +1506,60 @@ class TestSelectableAggregates(FunctionalTest,
         for a in standard_aggregates:
             self.assertNotIn(a, r)

+    def test_repeated_unparameterized_aggregate(self):
+        agg_params = 'aggregate.func=count&aggregate.func=count'
+        data = self.get_json(self.PATH, override_params=agg_params)
+
+        aggregate = 'count'
+        expected_value = 8.0
+        standard_aggregates = set(['min', 'max', 'sum', 'avg'])
+        r = data[0]
+        self.assertIn(aggregate, r)
+        self.assertEqual(expected_value, r[aggregate])
+        self.assertIn('aggregate', r)
+        self.assertIn(aggregate, r['aggregate'])
+        self.assertEqual(expected_value, r['aggregate'][aggregate])
+        for a in standard_aggregates:
+            self.assertNotIn(a, r)
+
+    def test_fully_repeated_parameterized_aggregate(self):
+        agg_params = ('aggregate.func=cardinality&'
+                      'aggregate.param=resource_id&'
+                      'aggregate.func=cardinality&'
+                      'aggregate.param=resource_id&')
+        data = self.get_json(self.PATH, override_params=agg_params)
+
+        aggregate = 'cardinality/resource_id'
+        expected_value = 5.0
+        standard_aggregates = set(['count', 'min', 'max', 'sum', 'avg'])
+        r = data[0]
+        self.assertIn('aggregate', r)
+        self.assertNotIn(aggregate, r)
+        self.assertIn(aggregate, r['aggregate'])
+        self.assertEqual(expected_value, r['aggregate'][aggregate])
+        for a in standard_aggregates:
+            self.assertNotIn(a, r)
+
+    def test_partially_repeated_parameterized_aggregate(self):
+        agg_params = ('aggregate.func=cardinality&'
+                      'aggregate.param=resource_id&'
+                      'aggregate.func=cardinality&'
+                      'aggregate.param=project_id&')
+        data = self.get_json(self.PATH, override_params=agg_params)
+
+        expected_values = {'cardinality/resource_id': 5.0,
+                           'cardinality/project_id': 3.0}
+        standard_aggregates = set(['count', 'min', 'max', 'sum', 'avg'])
+        r = data[0]
+        self.assertIn('aggregate', r)
+        for aggregate in expected_values.keys():
+            self.assertNotIn(aggregate, r)
+            self.assertIn(aggregate, r['aggregate'])
+            self.assertEqual(expected_values[aggregate],
+                             r['aggregate'][aggregate])
+        for a in standard_aggregates:
+            self.assertNotIn(a, r)
+
     def test_bad_selectable_parameterized_aggregate(self):
         agg_args = {'aggregate.func': 'cardinality',
                     'aggregate.param': 'injection_attack'}
diff --git a/ceilometer/utils.py b/ceilometer/utils.py
index 0326395dcb..615764278d 100644
--- a/ceilometer/utils.py
+++ b/ceilometer/utils.py
@@ -155,3 +155,15 @@ def cpu_count():
         return multiprocessing.cpu_count() or 1
     except NotImplementedError:
         return 1
+
+
+def uniq(dupes, attrs):
+    """Exclude elements of dupes with a duplicated set of attribute values."""
+    key = lambda d: '/'.join([getattr(d, a) or '' for a in attrs])
+    keys = []
+    deduped = []
+    for d in dupes:
+        if key(d) not in keys:
+            deduped.append(d)
+            keys.append(key(d))
+    return deduped
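
Below the patch, a minimal sketch of how the new utils.uniq() helper de-dupes the
selectable aggregate list, assuming the patch above is applied. The Aggregate
namedtuple is only a hypothetical stand-in for the API-layer aggregate objects,
which expose 'func' and 'param' attributes.

    import collections

    from ceilometer.utils import uniq  # added by this patch

    # Hypothetical stand-in for the API aggregate objects (func/param attrs).
    Aggregate = collections.namedtuple('Aggregate', ['func', 'param'])

    requested = [
        Aggregate(func='cardinality', param='resource_id'),
        Aggregate(func='cardinality', param='resource_id'),  # full duplicate
        Aggregate(func='cardinality', param='project_id'),   # partial duplicate
        Aggregate(func='count', param=None),
        Aggregate(func='count', param=None),                  # full duplicate
    ]

    # uniq() keys each element on the named attributes and keeps only the first
    # occurrence of each key, so full duplicates are dropped while the partial
    # duplicate (same func, different param) is preserved.
    deduped = uniq(requested, ['func', 'param'])
    assert [(a.func, a.param) for a in deduped] == [
        ('cardinality', 'resource_id'),
        ('cardinality', 'project_id'),
        ('count', None),
    ]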