Implementation of statistics aggregators

This corresponds to API calls on:
/v2/meters/meter-name/statistics?aggregate.func=func-name

Usage:
aggregates = [{'func': 'cardinality', 'param': 'resource_id'}]
client.statistics.list(meter_name="instance", aggregates=aggregates)

CLI:
ceilometer statistics -m instance -a "cardinality<-resource_id"

Change-Id: I0096668585a5c7e7985973f07049eb91f44413fe
This commit is contained in:
Michał Jastrzębski
2014-03-13 15:48:03 +01:00
parent b99547b4d3
commit fcfffacd3b
4 changed files with 257 additions and 15 deletions

View File

@@ -27,6 +27,7 @@ from ceilometerclient.tests import utils
from ceilometerclient.v2 import alarms
from ceilometerclient.v2 import samples
from ceilometerclient.v2 import shell as ceilometer_shell
from ceilometerclient.v2 import statistics
class ShellAlarmStateCommandsTest(utils.BaseTestCase):
@@ -634,3 +635,156 @@ rule change | {"threshold": 42.0, "evaluation_periods": 4} | 2014-03-11T16:0\
------------+----------------------------------------------+----------------\
------------+
''', output.getvalue())
class ShellStatisticsTest(utils.BaseTestCase):
    """Tests for the column selection/labelling of `ceilometer statistics`."""

    def setUp(self):
        super(ShellStatisticsTest, self).setUp()
        self.cc = mock.Mock()
        # Maps API field name -> human-readable column header expected
        # in the print_list call made by do_statistics.
        self.displays = {
            'duration': 'Duration',
            'duration_end': 'Duration End',
            'duration_start': 'Duration Start',
            'period': 'Period',
            'period_end': 'Period End',
            'period_start': 'Period Start',
            'groupby': 'Group By',
            'avg': 'Avg',
            'count': 'Count',
            'max': 'Max',
            'min': 'Min',
            'sum': 'Sum',
            'stddev': 'Standard deviation',
            'cardinality': 'Cardinality',
        }
        self.args = mock.Mock()
        self.args.meter_name = 'instance'
        self.args.aggregate = []
        self.args.groupby = None
        self.args.query = None

    def test_statistics_list_simple(self):
        """Without options the default statistics columns are printed."""
        samples = [
            {u'count': 135,
             u'duration_start': u'2013-02-04T10:51:42',
             u'min': 1.0,
             u'max': 1.0,
             u'duration_end': u'2013-02-05T15:46:09',
             u'duration': 1734.0,
             u'avg': 1.0,
             u'sum': 135.0},
        ]
        fields = [
            'period',
            'period_start',
            'period_end',
            'max',
            'min',
            'avg',
            'sum',
            'count',
            'duration',
            'duration_start',
            'duration_end',
        ]
        statistics_ret = [
            statistics.Statistics(mock.Mock(), sample) for sample in samples
        ]
        self.cc.statistics.list.return_value = statistics_ret
        with mock.patch('ceilometerclient.v2.shell.utils.print_list') as pmock:
            ceilometer_shell.do_statistics(self.cc, self.args)
            pmock.assert_called_with(
                statistics_ret,
                fields,
                [self.displays[f] for f in fields],
            )

    def test_statistics_list_groupby(self):
        """--groupby adds a 'Group By' column after the period columns."""
        samples = [
            {u'count': 135,
             u'duration_start': u'2013-02-04T10:51:42',
             u'min': 1.0,
             u'max': 1.0,
             u'duration_end': u'2013-02-05T15:46:09',
             u'duration': 1734.0,
             u'avg': 1.0,
             u'sum': 135.0,
             u'groupby': {u'resource_id': u'foo'}},
            {u'count': 12,
             u'duration_start': u'2013-02-04T10:51:42',
             u'min': 1.0,
             u'max': 1.0,
             u'duration_end': u'2013-02-05T15:46:09',
             u'duration': 1734.0,
             u'avg': 1.0,
             u'sum': 12.0,
             u'groupby': {u'resource_id': u'bar'}},
        ]
        fields = [
            'period',
            'period_start',
            'period_end',
            'groupby',
            'max',
            'min',
            'avg',
            'sum',
            'count',
            'duration',
            'duration_start',
            'duration_end',
        ]
        self.args.groupby = 'resource_id'
        statistics_ret = [
            statistics.Statistics(mock.Mock(), sample) for sample in samples
        ]
        self.cc.statistics.list.return_value = statistics_ret
        with mock.patch('ceilometerclient.v2.shell.utils.print_list') as pmock:
            ceilometer_shell.do_statistics(self.cc, self.args)
            pmock.assert_called_with(
                statistics_ret,
                fields,
                [self.displays[f] for f in fields],
            )

    def test_statistics_list_aggregates(self):
        """--aggregate replaces the default stats with one column per func.

        A parametrized aggregate is labelled 'func/param' and, having no
        entry in the display map, is shown with its raw field name.
        """
        samples = [
            {u'aggregate': {u'cardinality/resource_id': 4.0, u'count': 2.0},
             u'count': 2,
             u'duration': 0.442451,
             u'duration_end': u'2014-03-12T14:00:21.774154',
             u'duration_start': u'2014-03-12T14:00:21.331703',
             u'groupby': None,
             u'period': 0,
             u'period_end': u'2014-03-12T14:00:21.774154',
             u'period_start': u'2014-03-12T14:00:21.331703',
             u'unit': u'instance'},
        ]
        fields = [
            'period',
            'period_start',
            'period_end',
            'count',
            'cardinality/resource_id',
            'duration',
            'duration_start',
            'duration_end',
        ]
        self.args.aggregate = ['count', 'cardinality<-resource_id']
        statistics_ret = [
            statistics.Statistics(mock.Mock(), sample) for sample in samples
        ]
        self.cc.statistics.list.return_value = statistics_ret
        with mock.patch('ceilometerclient.v2.shell.utils.print_list') as pmock:
            ceilometer_shell.do_statistics(self.cc, self.args)
            pmock.assert_called_with(
                statistics_ret,
                fields,
                [self.displays.get(f, f) for f in fields],
            )

View File

@@ -21,6 +21,8 @@ qry = ('q.field=resource_id&q.field=source&q.op=&q.op='
'&q.type=&q.type=&q.value=foo&q.value=bar')
period = '&period=60'
groupby = '&groupby=resource_id'
# Query string produced for aggregates
# [{'func': 'cardinality', 'param': 'resource_id'}, {'func': 'count'}].
aggregate_query = ("aggregate.func=cardinality&aggregate.param=resource_id"
                   "&aggregate.func=count")
samples = [
{u'count': 135,
u'duration_start': u'2013-02-04T10:51:42',
@@ -56,6 +58,19 @@ groupby_samples = [
u'groupby': {u'resource_id': u'bar'}
},
]
# Canned API response for a statistics request with selectable aggregates:
# the per-function results live under the 'aggregate' key, labelled
# 'func' or 'func/param'.
aggregate_samples = [
    {u'aggregate': {u'cardinality/resource_id': 4.0, u'count': 2.0},
     u'count': 2,
     u'duration': 0.442451,
     u'duration_end': u'2014-03-12T14:00:21.774154',
     u'duration_start': u'2014-03-12T14:00:21.331703',
     u'groupby': None,
     u'period': 0,
     u'period_end': u'2014-03-12T14:00:21.774154',
     u'period_start': u'2014-03-12T14:00:21.331703',
     u'unit': u'instance',
     },
]
fixtures = {
base_url:
{
@@ -85,6 +100,13 @@ fixtures = {
groupby_samples
),
},
'%s?%s' % (base_url, aggregate_query):
{
'GET': (
{},
aggregate_samples
),
}
}
@@ -156,3 +178,27 @@ class StatisticsManagerTest(utils.BaseTestCase):
self.assertEqual(stats[1].count, 12)
self.assertEqual(stats[0].groupby.get('resource_id'), 'foo')
self.assertEqual(stats[1].groupby.get('resource_id'), 'bar')
def test_list_by_meter_name_with_aggregates(self):
    """list() with aggregates must GET the aggregate query URL exactly once
    and expose the per-function results via the 'aggregate' attribute."""
    aggregates = [
        {
            'func': 'cardinality',
            'param': 'resource_id',
        },
        {
            'func': 'count',
        },
    ]
    stats = list(self.mgr.list(meter_name='instance',
                               aggregates=aggregates))
    expect = [
        ('GET',
         '%s?%s' % (base_url, aggregate_query), {}, None),
    ]
    self.assertEqual(expect, self.api.calls)
    self.assertEqual(1, len(stats))
    self.assertEqual(2, stats[0].count)
    self.assertEqual(2.0, stats[0].aggregate.get('count'))
    self.assertEqual(4.0, stats[0].aggregate.get('cardinality/resource_id'))

View File

@@ -33,6 +33,13 @@ ALARM_STATES = ['ok', 'alarm', 'insufficient_data']
ALARM_OPERATORS = ['lt', 'le', 'eq', 'ne', 'ge', 'gt']
ALARM_COMBINATION_OPERATORS = ['and', 'or']
STATISTICS = ['max', 'min', 'avg', 'sum', 'count']
# Aggregate function name -> display column header; also enumerates the
# functions advertised in the --aggregate CLI help text.
AGGREGATES = {'avg': 'Avg',
              'count': 'Count',
              'max': 'Max',
              'min': 'Min',
              'sum': 'Sum',
              'stddev': 'Standard deviation',
              'cardinality': 'Cardinality'}
# Comparison operator name -> its symbolic form.
OPERATORS_STRING = {'gt': '>', 'ge': '>=',
                    'lt': '<', 'le': '<=',
                    'eq': '==', 'ne': '!='}
@@ -49,28 +56,50 @@ SIMPLE_OPERATORS = ["=", "!=", "<", "<=", '>', '>=']
@utils.arg('-p', '--period', metavar='<PERIOD>',
help='Period in seconds over which to group samples.')
@utils.arg('-g', '--groupby', metavar='<FIELD>', action='append',
help='Field for group aggregation.')
help='Field for group by.')
@utils.arg('-a', '--aggregate', metavar='<FUNC>[<-<PARAM>]', action='append',
default=[], help=('Function for data aggregation. '
'Available aggregates are: '
'%s.' % ", ".join(AGGREGATES.keys())))
def do_statistics(cc, args):
    '''List the statistics for a meter.'''
    # Each CLI aggregate is "<func>" or "<func><-<param>"; split it into
    # the {'func': ..., 'param': ...} dict the API layer expects.
    aggregates = []
    for a in args.aggregate:
        aggregates.append(dict(zip(('func', 'param'), a.split("<-"))))
    api_args = {'meter_name': args.meter,
                'q': options.cli_to_array(args.query),
                'period': args.period,
                'groupby': args.groupby,
                'aggregates': aggregates}
    try:
        statistics = cc.statistics.list(**api_args)
    except exc.HTTPNotFound:
        raise exc.CommandError('Samples not found: %s' % args.meter)
    else:
        # Field name -> display column header; unknown fields (e.g.
        # "cardinality/resource_id") fall back to their raw name below.
        fields_display = {'duration': 'Duration',
                          'duration_end': 'Duration End',
                          'duration_start': 'Duration Start',
                          'period': 'Period',
                          'period_end': 'Period End',
                          'period_start': 'Period Start',
                          'groupby': 'Group By'}
        fields_display.update(AGGREGATES)
        fields = ['period', 'period_start', 'period_end']
        if args.groupby:
            fields.append('groupby')
        if args.aggregate:
            # Selective aggregation: one column per requested function,
            # labelled "func" or "func/param".
            for a in aggregates:
                if 'param' in a:
                    fields.append("%(func)s/%(param)s" % a)
                else:
                    fields.append(a['func'])
            # Flatten the per-function results onto the statistics objects
            # so print_list can read them as plain attributes.
            for stat in statistics:
                stat.__dict__.update(stat.aggregate)
        else:
            fields.extend(['max', 'min', 'avg', 'sum', 'count'])
        fields.extend(['duration', 'duration_start', 'duration_end'])
        cols = [fields_display.get(f, f) for f in fields]
        utils.print_list(statistics, fields, cols)
@utils.arg('-q', '--query', metavar='<QUERY>',

View File

@@ -23,9 +23,22 @@ class Statistics(base.Resource):
class StatisticsManager(base.Manager):
    """Manager for the /v2/meters/<meter>/statistics API resource."""

    resource_class = Statistics

    def _build_aggregates(self, aggregates):
        """Translate aggregate dicts into URL query fragments.

        Each input dict has a mandatory 'func' key and an optional
        'param' key; each yields "aggregate.func=..." and, when present,
        "aggregate.param=..." fragments, in order.
        """
        url_aggregates = []
        for aggregate in aggregates:
            url_aggregates.append(
                "aggregate.func=%(func)s" % aggregate
            )
            if 'param' in aggregate:
                url_aggregates.append(
                    "aggregate.param=%(param)s" % aggregate
                )
        return url_aggregates

    # NOTE: None defaults instead of mutable [] defaults; behavior is
    # unchanged for all callers (empty/None both mean "no grouping").
    def list(self, meter_name, q=None, period=None, groupby=None,
             aggregates=None):
        """Fetch statistics for *meter_name*, optionally filtered by *q*,
        bucketed by *period* seconds, grouped by *groupby* fields, and/or
        reduced with selectable *aggregates*."""
        p = ['period=%s' % period] if period else []
        p.extend(['groupby=%s' % g for g in groupby] if groupby else [])
        p.extend(self._build_aggregates(aggregates or []))
        return self._list(options.build_url(
            '/v2/meters/' + meter_name + '/statistics',
            q, p))