Merge "Wider selection of aggregates for sqlalchemy"

This commit is contained in:
Jenkins
2014-03-04 18:42:55 +00:00
committed by Gerrit Code Review
2 changed files with 196 additions and 25 deletions

View File

@@ -26,6 +26,7 @@ import types
from sqlalchemy import and_ from sqlalchemy import and_
from sqlalchemy import asc from sqlalchemy import asc
from sqlalchemy import desc from sqlalchemy import desc
from sqlalchemy import distinct
from sqlalchemy import func from sqlalchemy import func
from sqlalchemy import not_ from sqlalchemy import not_
from sqlalchemy import or_ from sqlalchemy import or_
@@ -36,6 +37,7 @@ import ceilometer.openstack.common.db.sqlalchemy.session as sqlalchemy_session
from ceilometer.openstack.common.gettextutils import _ # noqa from ceilometer.openstack.common.gettextutils import _ # noqa
from ceilometer.openstack.common import log from ceilometer.openstack.common import log
from ceilometer.openstack.common import timeutils from ceilometer.openstack.common import timeutils
from ceilometer import storage
from ceilometer.storage import base from ceilometer.storage import base
from ceilometer.storage import models as api_models from ceilometer.storage import models as api_models
from ceilometer.storage.sqlalchemy import migration from ceilometer.storage.sqlalchemy import migration
@@ -116,6 +118,21 @@ STANDARD_AGGREGATES = dict(
count=func.count(models.Sample.volume).label('count') count=func.count(models.Sample.volume).label('count')
) )
UNPARAMETERIZED_AGGREGATES = dict(
stddev=func.stddev_pop(models.Sample.volume).label('stddev')
)
PARAMETERIZED_AGGREGATES = dict(
validate=dict(
cardinality=lambda p: p in ['resource_id', 'user_id', 'project_id']
),
compute=dict(
cardinality=lambda p: func.count(
distinct(getattr(models.Sample, p))
).label('cardinality')
)
)
def apply_metaquery_filter(session, query, metaquery): def apply_metaquery_filter(session, query, metaquery):
"""Apply provided metaquery filter to existing query. """Apply provided metaquery filter to existing query.
@@ -652,6 +669,15 @@ class Connection(base.Connection):
for a in aggregate: for a in aggregate:
if a.func in STANDARD_AGGREGATES: if a.func in STANDARD_AGGREGATES:
functions.append(STANDARD_AGGREGATES[a.func]) functions.append(STANDARD_AGGREGATES[a.func])
elif a.func in UNPARAMETERIZED_AGGREGATES:
functions.append(UNPARAMETERIZED_AGGREGATES[a.func])
elif a.func in PARAMETERIZED_AGGREGATES['compute']:
validate = PARAMETERIZED_AGGREGATES['validate'].get(a.func)
if not (validate and validate(a.param)):
raise storage.StorageBadAggregate('Bad aggregate: %s.%s'
% (a.func, a.param))
compute = PARAMETERIZED_AGGREGATES['compute'][a.func]
functions.append(compute(a.param))
else: else:
raise NotImplementedError(_('Selectable aggregate function %s' raise NotImplementedError(_('Selectable aggregate function %s'
' is not supported') % a.func) ' is not supported') % a.func)
@@ -1345,7 +1371,9 @@ class QueryTransformer(object):
'min': True, 'min': True,
'sum': True, 'sum': True,
'avg': True, 'avg': True,
'count': True}} 'count': True,
'stddev': True,
'cardinality': True}}
}, },
'alarms': {'query': {'simple': True, 'alarms': {'query': {'simple': True,
'complex': True}, 'complex': True},

View File

@@ -1368,6 +1368,10 @@ class TestSelectableAggregates(FunctionalTest,
'resource': 'resource-3', 'timestamp': (2013, 8, 1, 11, 22), 'resource': 'resource-3', 'timestamp': (2013, 8, 1, 11, 22),
'metadata_flavor': 'm1.large', 'metadata_event': 'event-2', 'metadata_flavor': 'm1.large', 'metadata_event': 'event-2',
'source': 'source'}, 'source': 'source'},
{'volume': 9, 'user': 'user-3', 'project': 'project-3',
'resource': 'resource-4', 'timestamp': (2013, 8, 1, 11, 59),
'metadata_flavor': 'm1.large', 'metadata_event': 'event-3',
'source': 'source'},
) )
for test_sample in test_sample_data: for test_sample in test_sample_data:
@@ -1400,22 +1404,15 @@ class TestSelectableAggregates(FunctionalTest,
groupby_vals_set = set(x for sub_dict in data groupby_vals_set = set(x for sub_dict in data
for x in sub_dict['groupby'].values()) for x in sub_dict['groupby'].values())
self.assertEqual(groupby_keys_set, set(['project_id'])) self.assertEqual(groupby_keys_set, set(['project_id']))
self.assertEqual(groupby_vals_set, set(['project-1', 'project-2'])) projects = ['project-1', 'project-2', 'project-3']
self.assertEqual(groupby_vals_set, set(projects))
standard_aggregates = set(['count', 'min', 'max', 'sum', 'avg']) standard_aggregates = set(['count', 'min', 'max', 'sum', 'avg'])
for r in data: for r in data:
grp = r['groupby'] grp = r['groupby']
if grp == {'project_id': 'project-1'}: for project in projects:
expected = expected_values[0] if grp == {'project_id': project}:
self.assertEqual(r['unit'], 'instance') expected = expected_values[projects.index(project)]
self.assertAlmostEqual(r[aggregate], expected)
self.assertIn('aggregate', r)
self.assertIn(aggregate, r['aggregate'])
self.assertAlmostEqual(r['aggregate'][aggregate], expected)
for a in standard_aggregates - set([aggregate]):
self.assertNotIn(a, r)
elif grp == {'project_id': 'project-2'}:
expected = expected_values[1]
self.assertEqual(r['unit'], 'instance') self.assertEqual(r['unit'], 'instance')
self.assertAlmostEqual(r[aggregate], expected) self.assertAlmostEqual(r[aggregate], expected)
self.assertIn('aggregate', r) self.assertIn('aggregate', r)
@@ -1426,20 +1423,166 @@ class TestSelectableAggregates(FunctionalTest,
def test_per_tenant_selectable_max(self): def test_per_tenant_selectable_max(self):
self._do_test_per_tenant_selectable_standard_aggregate('max', self._do_test_per_tenant_selectable_standard_aggregate('max',
[5, 4]) [5, 4, 9])
def test_per_tenant_selectable_min(self): def test_per_tenant_selectable_min(self):
self._do_test_per_tenant_selectable_standard_aggregate('min', self._do_test_per_tenant_selectable_standard_aggregate('min',
[2, 1]) [2, 1, 9])
def test_per_tenant_selectable_sum(self): def test_per_tenant_selectable_sum(self):
self._do_test_per_tenant_selectable_standard_aggregate('sum', self._do_test_per_tenant_selectable_standard_aggregate('sum',
[9, 9]) [9, 9, 9])
def test_per_tenant_selectable_avg(self): def test_per_tenant_selectable_avg(self):
self._do_test_per_tenant_selectable_standard_aggregate('avg', self._do_test_per_tenant_selectable_standard_aggregate('avg',
[3, 2.25]) [3, 2.25, 9])
def test_per_tenant_selectable_count(self): def test_per_tenant_selectable_count(self):
self._do_test_per_tenant_selectable_standard_aggregate('count', self._do_test_per_tenant_selectable_standard_aggregate('count',
[3, 4]) [3, 4, 1])
def test_per_tenant_selectable_parameterized_aggregate(self):
agg_args = {'aggregate.func': 'cardinality',
'aggregate.param': 'resource_id'}
data = self.get_json(self.PATH, groupby=['project_id'], **agg_args)
groupby_keys_set = set(x for sub_dict in data
for x in sub_dict['groupby'].keys())
groupby_vals_set = set(x for sub_dict in data
for x in sub_dict['groupby'].values())
self.assertEqual(groupby_keys_set, set(['project_id']))
projects = ['project-1', 'project-2', 'project-3']
self.assertEqual(groupby_vals_set, set(projects))
aggregate = 'cardinality/resource_id'
expected_values = [2.0, 3.0, 1.0]
standard_aggregates = set(['count', 'min', 'max', 'sum', 'avg'])
for r in data:
grp = r['groupby']
for project in projects:
if grp == {'project_id': project}:
expected = expected_values[projects.index(project)]
self.assertEqual(r['unit'], 'instance')
self.assertNotIn(aggregate, r)
self.assertIn('aggregate', r)
self.assertIn(aggregate, r['aggregate'])
self.assertEqual(r['aggregate'][aggregate], expected)
for a in standard_aggregates:
self.assertNotIn(a, r)
def test_bad_selectable_parameterized_aggregate(self):
agg_args = {'aggregate.func': 'cardinality',
'aggregate.param': 'injection_attack'}
resp = self.get_json(self.PATH, expect_errors=True,
groupby=['project_id'], **agg_args)
self.assertTrue(400, resp.status_code)
self.assertTrue('error_message' in resp.json)
self.assertEqual(resp.json['error_message'].get('faultstring'),
'Bad aggregate: cardinality.injection_attack')
class TestUnparameterizedAggregates(FunctionalTest,
tests_db.MixinTestsWithBackendScenarios):
# We put the stddev test case in a separate class so that we
# can easily exclude the sqlalchemy scenario, as sqlite doesn't
# support the stddev_pop function and fails ungracefully with
# OperationalError when it is used. However we still want to
# test the corresponding functionality in the mongo driver.
# For hbase & db2, the skip on NotImplementedError logic works
# in the usual way.
scenarios = [
('mongodb',
dict(database_connection=tests_db.MongoDBFakeConnectionUrl())),
('hbase', dict(database_connection=tests_db.HBaseFakeConnectionUrl())),
('db2', dict(database_connection=tests_db.DB2FakeConnectionUrl())),
]
PATH = '/meters/instance/statistics'
def setUp(self):
super(TestUnparameterizedAggregates, self).setUp()
test_sample_data = (
{'volume': 2, 'user': 'user-1', 'project': 'project-1',
'resource': 'resource-1', 'timestamp': (2013, 8, 1, 16, 10),
'metadata_flavor': 'm1.tiny', 'metadata_event': 'event-1',
'source': 'source'},
{'volume': 2, 'user': 'user-2', 'project': 'project-2',
'resource': 'resource-3', 'timestamp': (2013, 8, 1, 15, 37),
'metadata_flavor': 'm1.large', 'metadata_event': 'event-1',
'source': 'source'},
{'volume': 1, 'user': 'user-2', 'project': 'project-2',
'resource': 'resource-5', 'timestamp': (2013, 8, 1, 10, 11),
'metadata_flavor': 'm1.medium', 'metadata_event': 'event-2',
'source': 'source'},
{'volume': 2, 'user': 'user-1', 'project': 'project-1',
'resource': 'resource-2', 'timestamp': (2013, 8, 1, 10, 40),
'metadata_flavor': 'm1.large', 'metadata_event': 'event-2',
'source': 'source'},
{'volume': 2, 'user': 'user-2', 'project': 'project-2',
'resource': 'resource-4', 'timestamp': (2013, 8, 1, 14, 59),
'metadata_flavor': 'm1.large', 'metadata_event': 'event-2',
'source': 'source'},
{'volume': 5, 'user': 'user-1', 'project': 'project-1',
'resource': 'resource-2', 'timestamp': (2013, 8, 1, 17, 28),
'metadata_flavor': 'm1.large', 'metadata_event': 'event-2',
'source': 'source'},
{'volume': 4, 'user': 'user-2', 'project': 'project-2',
'resource': 'resource-3', 'timestamp': (2013, 8, 1, 11, 22),
'metadata_flavor': 'm1.large', 'metadata_event': 'event-2',
'source': 'source'},
{'volume': 9, 'user': 'user-3', 'project': 'project-3',
'resource': 'resource-4', 'timestamp': (2013, 8, 1, 11, 59),
'metadata_flavor': 'm1.large', 'metadata_event': 'event-3',
'source': 'source'},
)
for test_sample in test_sample_data:
c = sample.Sample(
'instance',
sample.TYPE_GAUGE,
unit='instance',
volume=test_sample['volume'],
user_id=test_sample['user'],
project_id=test_sample['project'],
resource_id=test_sample['resource'],
timestamp=datetime.datetime(*test_sample['timestamp']),
resource_metadata={'flavor': test_sample['metadata_flavor'],
'event': test_sample['metadata_event'], },
source=test_sample['source'],
)
msg = utils.meter_message_from_counter(
c,
self.CONF.publisher.metering_secret,
)
self.conn.record_metering_data(msg)
def test_per_tenant_selectable_unparameterized_aggregate(self):
agg_args = {'aggregate.func': 'stddev'}
data = self.get_json(self.PATH, groupby=['project_id'], **agg_args)
groupby_keys_set = set(x for sub_dict in data
for x in sub_dict['groupby'].keys())
groupby_vals_set = set(x for sub_dict in data
for x in sub_dict['groupby'].values())
self.assertEqual(groupby_keys_set, set(['project_id']))
projects = ['project-1', 'project-2', 'project-3']
self.assertEqual(groupby_vals_set, set(projects))
aggregate = 'stddev'
expected_values = [1.4142, 1.0897, 0.0]
standard_aggregates = set(['count', 'min', 'max', 'sum', 'avg'])
for r in data:
grp = r['groupby']
for project in projects:
if grp == {'project_id': project}:
expected = expected_values[projects.index(project)]
self.assertEqual(r['unit'], 'instance')
self.assertNotIn(aggregate, r)
self.assertIn('aggregate', r)
self.assertIn(aggregate, r['aggregate'])
self.assertAlmostEqual(r['aggregate'][aggregate],
expected,
places=4)
for a in standard_aggregates:
self.assertNotIn(a, r)