diff --git a/ceilometer/api/controllers/v2.py b/ceilometer/api/controllers/v2.py index 5058c2454f..9dc00200dd 100644 --- a/ceilometer/api/controllers/v2.py +++ b/ceilometer/api/controllers/v2.py @@ -899,6 +899,8 @@ class MeterController(rest.RestController): kwargs['meter'] = self.meter_name f = storage.SampleFilter(**kwargs) g = _validate_groupby_fields(groupby) + + aggregate = utils.uniq(aggregate, ['func', 'param']) computed = pecan.request.storage_conn.get_meter_statistics(f, period, g, diff --git a/ceilometer/storage/impl_mongodb.py b/ceilometer/storage/impl_mongodb.py index f9491c10c5..f871af0178 100644 --- a/ceilometer/storage/impl_mongodb.py +++ b/ceilometer/storage/impl_mongodb.py @@ -217,16 +217,15 @@ class Connection(pymongo_base.Connection): ), emit_initial=dict( cardinality=( - 'var aggregate = {};' 'aggregate["cardinality/%(aggregate_param)s"] = 1;' - 'var distincts = {};' - 'distincts[this["%(aggregate_param)s"]] = true;' + 'var distinct_%(aggregate_param)s = {};' + 'distinct_%(aggregate_param)s[this["%(aggregate_param)s"]]' + ' = true;' ) ), emit_body=dict( cardinality=( - 'aggregate : aggregate,' - 'distincts : distincts ,' + 'distinct_%(aggregate_param)s : distinct_%(aggregate_param)s,' '%(aggregate_param)s : this["%(aggregate_param)s"],' ) ), @@ -236,14 +235,17 @@ class Connection(pymongo_base.Connection): reduce_body=dict( cardinality=( 'aggregate : values[0].aggregate,' - 'distincts: values[0].distincts,' + 'distinct_%(aggregate_param)s:' + ' values[0].distinct_%(aggregate_param)s,' '%(aggregate_param)s : values[0]["%(aggregate_param)s"],' ) ), reduce_computation=dict( cardinality=( - 'if (!(values[i]["%(aggregate_param)s"] in res.distincts)) {' - ' res.distincts[values[i]["%(aggregate_param)s"]] = true;' + 'if (!(values[i]["%(aggregate_param)s"] in' + ' res.distinct_%(aggregate_param)s)) {' + ' res.distinct_%(aggregate_param)s[values[i]' + ' ["%(aggregate_param)s"]] = true;' ' res.aggregate["cardinality/%(aggregate_param)s"] += 1;}' ) ), @@ -253,8 +255,10 @@ class Connection(pymongo_base.Connection): ) EMIT_STATS_COMMON = """ + var aggregate = {}; %(aggregate_initial_placeholder)s emit(%(key_val)s, { unit: this.counter_unit, + aggregate : aggregate, %(aggregate_body_placeholder)s groupby : %(groupby_val)s, duration_start : this.timestamp, @@ -347,6 +351,7 @@ class Connection(pymongo_base.Connection): function (key, values) { %(aggregate_initial_val)s var res = { unit: values[0].unit, + aggregate: values[0].aggregate, %(aggregate_body_val)s groupby: values[0].groupby, period_start: values[0].period_start, diff --git a/ceilometer/storage/impl_sqlalchemy.py b/ceilometer/storage/impl_sqlalchemy.py index 84e05b6a1c..96c98afcee 100644 --- a/ceilometer/storage/impl_sqlalchemy.py +++ b/ceilometer/storage/impl_sqlalchemy.py @@ -129,7 +129,7 @@ PARAMETERIZED_AGGREGATES = dict( compute=dict( cardinality=lambda p: func.count( distinct(getattr(models.Sample, p)) - ).label('cardinality') + ).label('cardinality/%s' % p) ) ) @@ -732,10 +732,10 @@ class Connection(base.Connection): if hasattr(result, attr): stats_args[attr] = getattr(result, attr) if aggregate: - stats_args['aggregate'] = dict( - ('%s%s' % (a.func, '/%s' % a.param if a.param else ''), - getattr(result, a.func)) for a in aggregate - ) + stats_args['aggregate'] = {} + for a in aggregate: + key = '%s%s' % (a.func, '/%s' % a.param if a.param else '') + stats_args['aggregate'][key] = getattr(result, key) return stats_args @staticmethod diff --git a/ceilometer/tests/api/__init__.py b/ceilometer/tests/api/__init__.py index b6a6d79d42..eee701a16b 100644 --- a/ceilometer/tests/api/__init__.py +++ b/ceilometer/tests/api/__init__.py @@ -179,7 +179,7 @@ class FunctionalTest(db_test_base.TestBase): def get_json(self, path, expect_errors=False, headers=None, extra_environ=None, q=[], groupby=[], status=None, - **params): + override_params=None, **params): """Sends simulated HTTP GET request to Pecan test app. :param path: url path of target service @@ -192,23 +192,27 @@ class FunctionalTest(db_test_base.TestBase): keys :param groupby: list of fields to group by :param status: Expected status code of response + :param override_params: literally encoded query param string :param params: content for wsgi.input of request """ full_path = self.PATH_PREFIX + path - query_params = {'q.field': [], - 'q.value': [], - 'q.op': [], - 'q.type': [], - } - for query in q: - for name in ['field', 'op', 'value', 'type']: - query_params['q.%s' % name].append(query.get(name, '')) - all_params = {} - all_params.update(params) - if q: - all_params.update(query_params) - if groupby: - all_params.update({'groupby': groupby}) + if override_params: + all_params = override_params + else: + query_params = {'q.field': [], + 'q.value': [], + 'q.op': [], + 'q.type': [], + } + for query in q: + for name in ['field', 'op', 'value', 'type']: + query_params['q.%s' % name].append(query.get(name, '')) + all_params = {} + all_params.update(params) + if q: + all_params.update(query_params) + if groupby: + all_params.update({'groupby': groupby}) response = self.app.get(full_path, params=all_params, headers=headers, diff --git a/ceilometer/tests/api/v2/test_statistics_scenarios.py b/ceilometer/tests/api/v2/test_statistics_scenarios.py index a363aa8a71..41aeecbf09 100644 --- a/ceilometer/tests/api/v2/test_statistics_scenarios.py +++ b/ceilometer/tests/api/v2/test_statistics_scenarios.py @@ -1506,6 +1506,60 @@ class TestSelectableAggregates(FunctionalTest, for a in standard_aggregates: self.assertNotIn(a, r) + def test_repeated_unparameterized_aggregate(self): + agg_params = 'aggregate.func=count&aggregate.func=count' + data = self.get_json(self.PATH, override_params=agg_params) + + aggregate = 'count' + expected_value = 8.0 + standard_aggregates = set(['min', 'max', 'sum', 'avg']) + r = data[0] + self.assertIn(aggregate, r) + self.assertEqual(expected_value, r[aggregate]) + self.assertIn('aggregate', r) + self.assertIn(aggregate, r['aggregate']) + self.assertEqual(expected_value, r['aggregate'][aggregate]) + for a in standard_aggregates: + self.assertNotIn(a, r) + + def test_fully_repeated_parameterized_aggregate(self): + agg_params = ('aggregate.func=cardinality&' + 'aggregate.param=resource_id&' + 'aggregate.func=cardinality&' + 'aggregate.param=resource_id&') + data = self.get_json(self.PATH, override_params=agg_params) + + aggregate = 'cardinality/resource_id' + expected_value = 5.0 + standard_aggregates = set(['count', 'min', 'max', 'sum', 'avg']) + r = data[0] + self.assertIn('aggregate', r) + self.assertNotIn(aggregate, r) + self.assertIn(aggregate, r['aggregate']) + self.assertEqual(expected_value, r['aggregate'][aggregate]) + for a in standard_aggregates: + self.assertNotIn(a, r) + + def test_partially_repeated_parameterized_aggregate(self): + agg_params = ('aggregate.func=cardinality&' + 'aggregate.param=resource_id&' + 'aggregate.func=cardinality&' + 'aggregate.param=project_id&') + data = self.get_json(self.PATH, override_params=agg_params) + + expected_values = {'cardinality/resource_id': 5.0, + 'cardinality/project_id': 3.0} + standard_aggregates = set(['count', 'min', 'max', 'sum', 'avg']) + r = data[0] + self.assertIn('aggregate', r) + for aggregate in expected_values.keys(): + self.assertNotIn(aggregate, r) + self.assertIn(aggregate, r['aggregate']) + self.assertEqual(expected_values[aggregate], + r['aggregate'][aggregate]) + for a in standard_aggregates: + self.assertNotIn(a, r) + def test_bad_selectable_parameterized_aggregate(self): agg_args = {'aggregate.func': 'cardinality', 'aggregate.param': 'injection_attack'} diff --git a/ceilometer/utils.py b/ceilometer/utils.py index 0326395dcb..615764278d 100644 --- a/ceilometer/utils.py +++ b/ceilometer/utils.py @@ -155,3 +155,15 @@ def cpu_count(): return multiprocessing.cpu_count() or 1 except NotImplementedError: return 1 + + +def uniq(dupes, attrs): + """Exclude elements of dupes with a duplicated set of attribute values.""" + key = lambda d: '/'.join([getattr(d, a) or '' for a in attrs]) + keys = [] + deduped = [] + for d in dupes: + if key(d) not in keys: + deduped.append(d) + keys.append(key(d)) + return deduped