De-dupe selectable aggregate list in statistics API

Fixes bug 1298514

Discard full duplicates from the selectable aggregate list passed
to the statistics API.

Partial duplicates (i.e. matching func but different param) are
now allowed and supported by the storage drivers.

Change-Id: I5e2780befa9e265d5f1deb71b2aa375d05ca2e64
This commit is contained in:
Eoghan Glynn 2014-03-27 17:02:42 +00:00
parent e8a7eed15c
commit ef2ccd3992
6 changed files with 105 additions and 28 deletions

View File

@ -899,6 +899,8 @@ class MeterController(rest.RestController):
kwargs['meter'] = self.meter_name
f = storage.SampleFilter(**kwargs)
g = _validate_groupby_fields(groupby)
aggregate = utils.uniq(aggregate, ['func', 'param'])
computed = pecan.request.storage_conn.get_meter_statistics(f,
period,
g,

View File

@ -217,16 +217,15 @@ class Connection(pymongo_base.Connection):
),
emit_initial=dict(
cardinality=(
'var aggregate = {};'
'aggregate["cardinality/%(aggregate_param)s"] = 1;'
'var distincts = {};'
'distincts[this["%(aggregate_param)s"]] = true;'
'var distinct_%(aggregate_param)s = {};'
'distinct_%(aggregate_param)s[this["%(aggregate_param)s"]]'
' = true;'
)
),
emit_body=dict(
cardinality=(
'aggregate : aggregate,'
'distincts : distincts ,'
'distinct_%(aggregate_param)s : distinct_%(aggregate_param)s,'
'%(aggregate_param)s : this["%(aggregate_param)s"],'
)
),
@ -236,14 +235,17 @@ class Connection(pymongo_base.Connection):
reduce_body=dict(
cardinality=(
'aggregate : values[0].aggregate,'
'distincts: values[0].distincts,'
'distinct_%(aggregate_param)s:'
' values[0].distinct_%(aggregate_param)s,'
'%(aggregate_param)s : values[0]["%(aggregate_param)s"],'
)
),
reduce_computation=dict(
cardinality=(
'if (!(values[i]["%(aggregate_param)s"] in res.distincts)) {'
' res.distincts[values[i]["%(aggregate_param)s"]] = true;'
'if (!(values[i]["%(aggregate_param)s"] in'
' res.distinct_%(aggregate_param)s)) {'
' res.distinct_%(aggregate_param)s[values[i]'
' ["%(aggregate_param)s"]] = true;'
' res.aggregate["cardinality/%(aggregate_param)s"] += 1;}'
)
),
@ -253,8 +255,10 @@ class Connection(pymongo_base.Connection):
)
EMIT_STATS_COMMON = """
var aggregate = {};
%(aggregate_initial_placeholder)s
emit(%(key_val)s, { unit: this.counter_unit,
aggregate : aggregate,
%(aggregate_body_placeholder)s
groupby : %(groupby_val)s,
duration_start : this.timestamp,
@ -347,6 +351,7 @@ class Connection(pymongo_base.Connection):
function (key, values) {
%(aggregate_initial_val)s
var res = { unit: values[0].unit,
aggregate: values[0].aggregate,
%(aggregate_body_val)s
groupby: values[0].groupby,
period_start: values[0].period_start,

View File

@ -129,7 +129,7 @@ PARAMETERIZED_AGGREGATES = dict(
compute=dict(
cardinality=lambda p: func.count(
distinct(getattr(models.Sample, p))
).label('cardinality')
).label('cardinality/%s' % p)
)
)
@ -732,10 +732,10 @@ class Connection(base.Connection):
if hasattr(result, attr):
stats_args[attr] = getattr(result, attr)
if aggregate:
stats_args['aggregate'] = dict(
('%s%s' % (a.func, '/%s' % a.param if a.param else ''),
getattr(result, a.func)) for a in aggregate
)
stats_args['aggregate'] = {}
for a in aggregate:
key = '%s%s' % (a.func, '/%s' % a.param if a.param else '')
stats_args['aggregate'][key] = getattr(result, key)
return stats_args
@staticmethod

View File

@ -179,7 +179,7 @@ class FunctionalTest(db_test_base.TestBase):
def get_json(self, path, expect_errors=False, headers=None,
extra_environ=None, q=[], groupby=[], status=None,
**params):
override_params=None, **params):
"""Sends simulated HTTP GET request to Pecan test app.
:param path: url path of target service
@ -192,23 +192,27 @@ class FunctionalTest(db_test_base.TestBase):
keys
:param groupby: list of fields to group by
:param status: Expected status code of response
:param override_params: literally encoded query param string
:param params: content for wsgi.input of request
"""
full_path = self.PATH_PREFIX + path
query_params = {'q.field': [],
'q.value': [],
'q.op': [],
'q.type': [],
}
for query in q:
for name in ['field', 'op', 'value', 'type']:
query_params['q.%s' % name].append(query.get(name, ''))
all_params = {}
all_params.update(params)
if q:
all_params.update(query_params)
if groupby:
all_params.update({'groupby': groupby})
if override_params:
all_params = override_params
else:
query_params = {'q.field': [],
'q.value': [],
'q.op': [],
'q.type': [],
}
for query in q:
for name in ['field', 'op', 'value', 'type']:
query_params['q.%s' % name].append(query.get(name, ''))
all_params = {}
all_params.update(params)
if q:
all_params.update(query_params)
if groupby:
all_params.update({'groupby': groupby})
response = self.app.get(full_path,
params=all_params,
headers=headers,

View File

@ -1506,6 +1506,60 @@ class TestSelectableAggregates(FunctionalTest,
for a in standard_aggregates:
self.assertNotIn(a, r)
def test_repeated_unparameterized_aggregate(self):
agg_params = 'aggregate.func=count&aggregate.func=count'
data = self.get_json(self.PATH, override_params=agg_params)
aggregate = 'count'
expected_value = 8.0
standard_aggregates = set(['min', 'max', 'sum', 'avg'])
r = data[0]
self.assertIn(aggregate, r)
self.assertEqual(expected_value, r[aggregate])
self.assertIn('aggregate', r)
self.assertIn(aggregate, r['aggregate'])
self.assertEqual(expected_value, r['aggregate'][aggregate])
for a in standard_aggregates:
self.assertNotIn(a, r)
def test_fully_repeated_parameterized_aggregate(self):
agg_params = ('aggregate.func=cardinality&'
'aggregate.param=resource_id&'
'aggregate.func=cardinality&'
'aggregate.param=resource_id&')
data = self.get_json(self.PATH, override_params=agg_params)
aggregate = 'cardinality/resource_id'
expected_value = 5.0
standard_aggregates = set(['count', 'min', 'max', 'sum', 'avg'])
r = data[0]
self.assertIn('aggregate', r)
self.assertNotIn(aggregate, r)
self.assertIn(aggregate, r['aggregate'])
self.assertEqual(expected_value, r['aggregate'][aggregate])
for a in standard_aggregates:
self.assertNotIn(a, r)
def test_partially_repeated_parameterized_aggregate(self):
agg_params = ('aggregate.func=cardinality&'
'aggregate.param=resource_id&'
'aggregate.func=cardinality&'
'aggregate.param=project_id&')
data = self.get_json(self.PATH, override_params=agg_params)
expected_values = {'cardinality/resource_id': 5.0,
'cardinality/project_id': 3.0}
standard_aggregates = set(['count', 'min', 'max', 'sum', 'avg'])
r = data[0]
self.assertIn('aggregate', r)
for aggregate in expected_values.keys():
self.assertNotIn(aggregate, r)
self.assertIn(aggregate, r['aggregate'])
self.assertEqual(expected_values[aggregate],
r['aggregate'][aggregate])
for a in standard_aggregates:
self.assertNotIn(a, r)
def test_bad_selectable_parameterized_aggregate(self):
agg_args = {'aggregate.func': 'cardinality',
'aggregate.param': 'injection_attack'}

View File

@ -155,3 +155,15 @@ def cpu_count():
return multiprocessing.cpu_count() or 1
except NotImplementedError:
return 1
def uniq(dupes, attrs):
"""Exclude elements of dupes with a duplicated set of attribute values."""
key = lambda d: '/'.join([getattr(d, a) or '' for a in attrs])
keys = []
deduped = []
for d in dupes:
if key(d) not in keys:
deduped.append(d)
keys.append(key(d))
return deduped