Merge "Custom fields in summary get API"

This commit is contained in:
Zuul 2020-12-22 17:18:51 +00:00 committed by Gerrit Code Review
commit 994edade52
12 changed files with 165 additions and 82 deletions

View File

@ -26,6 +26,7 @@ class Summary(base.BaseResource):
@api_utils.paginated @api_utils.paginated
@api_utils.add_input_schema('query', { @api_utils.add_input_schema('query', {
voluptuous.Optional('custom_fields'): api_utils.SingleQueryParam(str),
voluptuous.Optional('groupby'): api_utils.MultiQueryParam(str), voluptuous.Optional('groupby'): api_utils.MultiQueryParam(str),
voluptuous.Optional('filters'): voluptuous.Optional('filters'):
api_utils.SingleDictQueryParam(str, str), api_utils.SingleDictQueryParam(str, str),
@ -34,9 +35,8 @@ class Summary(base.BaseResource):
voluptuous.Optional('end'): api_utils.SingleQueryParam( voluptuous.Optional('end'): api_utils.SingleQueryParam(
tzutils.dt_from_iso), tzutils.dt_from_iso),
}) })
def get(self, groupby=None, filters={}, def get(self, custom_fields=None, groupby=None, filters={}, begin=None,
begin=None, end=None, end=None, offset=0, limit=100):
offset=0, limit=100):
policy.authorize( policy.authorize(
flask.request.context, flask.request.context,
'summary:get_summary', 'summary:get_summary',
@ -55,15 +55,20 @@ class Summary(base.BaseResource):
filters['project_id'] = flask.request.context.project_id filters['project_id'] = flask.request.context.project_id
metric_types = [filters.pop('type')] if 'type' in filters else None metric_types = [filters.pop('type')] if 'type' in filters else None
total = self._storage.total( arguments = {
begin=begin, end=end, 'begin': begin,
groupby=groupby, 'end': end,
filters=filters, 'groupby': groupby,
metric_types=metric_types, 'filters': filters,
offset=offset, 'metric_types': metric_types,
limit=limit, 'offset': offset,
paginate=True, 'limit': limit,
) 'paginate': True
}
if custom_fields:
arguments['custom_fields'] = custom_fields
total = self._storage.total(**arguments)
columns = [] columns = []
if len(total['results']) > 0: if len(total['results']) > 0:
columns = list(total['results'][0].keys()) columns = list(total['results'][0].keys())

View File

@ -127,28 +127,31 @@ class V1StorageAdapter(storage_v2.BaseStorage):
if end: if end:
elem['end'] = tzutils.utc_to_local(end) elem['end'] = tzutils.utc_to_local(end)
def total(self, groupby=None, def total(self, **arguments):
begin=None, end=None, filters = arguments.pop('filters', None)
metric_types=None, if filters:
filters=None, tenant_id = filters.get('project_id')
offset=0, limit=100, paginate=True):
tenant_id = filters.get('project_id') if filters else None
storage_gby = [] arguments['tenant_id'] = tenant_id
if groupby: else:
for elem in set(groupby): tenant_id = None
if elem == 'type':
storage_gby.append('res_type') groupby = arguments.get('groupby')
elif elem == 'project_id': storage_gby = self.get_storage_groupby(groupby)
storage_gby.append('tenant_id')
storage_gby = ','.join(storage_gby) if storage_gby else None metric_types = arguments.pop('metric_types', None)
metric_types = self._check_metric_types(metric_types) if metric_types:
total = self.storage.get_total( metric_types = self._check_metric_types(metric_types)
tzutils.local_to_utc(begin, naive=True), arguments['service'] = metric_types
tzutils.local_to_utc(end, naive=True),
tenant_id=tenant_id, arguments['begin'] = tzutils.local_to_utc(
service=metric_types, arguments['begin'], naive=True)
groupby=storage_gby) arguments['end'] = tzutils.local_to_utc(
arguments['end'], naive=True)
arguments['groupby'] = storage_gby
total = self.storage.get_total(**arguments)
for t in total: for t in total:
if t.get('tenant_id') is None: if t.get('tenant_id') is None:
@ -165,9 +168,19 @@ class V1StorageAdapter(storage_v2.BaseStorage):
'results': total, 'results': total,
} }
@staticmethod
def get_storage_groupby(groupby):
storage_gby = []
if groupby:
for elem in set(groupby):
if elem == 'type':
storage_gby.append('res_type')
elif elem == 'project_id':
storage_gby.append('tenant_id')
return ','.join(storage_gby) if storage_gby else None
def get_tenants(self, begin, end): def get_tenants(self, begin, end):
tenants = self.storage.get_tenants(begin, end) return self.storage.get_tenants(begin, end)
return tenants
def get_state(self, tenant_id=None): def get_state(self, tenant_id=None):
return self.storage.get_state(tenant_id) return self.storage.get_state(tenant_id)

View File

@ -82,8 +82,8 @@ class SQLAlchemyStorage(storage.BaseStorage):
if r: if r:
return r.begin return r.begin
def get_total(self, begin=None, end=None, tenant_id=None, def get_total(self, begin=None, end=None, tenant_id=None, service=None,
service=None, groupby=None): groupby=None):
session = db.get_session() session = db.get_session()
querymodels = [ querymodels = [
sqlalchemy.func.sum(self.frame_model.rate).label('rate') sqlalchemy.func.sum(self.frame_model.rate).label('rate')

View File

@ -100,11 +100,9 @@ class BaseStorage(object):
""" """
@abc.abstractmethod @abc.abstractmethod
def total(self, groupby=None, def total(self, groupby=None, begin=None, end=None, metric_types=None,
begin=None, end=None, filters=None, custom_fields=None, offset=0, limit=1000,
metric_types=None, paginate=True):
filters=None,
offset=0, limit=1000, paginate=True):
"""Returns a grouped total for given groupby. """Returns a grouped total for given groupby.
:param groupby: Attributes on which to group by. These attributes must :param groupby: Attributes on which to group by. These attributes must
@ -116,10 +114,14 @@ class BaseStorage(object):
:type begin: datetime :type begin: datetime
:param end: End date :param end: End date
:type end: datetime :type end: datetime
:param filters: Attributes to filter on. ex: {'flavor_id': '42'}
:type filters: dict
:param metric_types: Metric type to filter on. :param metric_types: Metric type to filter on.
:type metric_types: str or list :type metric_types: str or list
:param custom_fields: the custom fields that one desires to add in
the summary reporting. Each driver must handle
these values by themselves.
:type: custom_fields: list of strings
:param filters: Attributes to filter on. ex: {'flavor_id': '42'}
:type filters: dict
:param offset: Offset for pagination :param offset: Offset for pagination
:type offset: int :type offset: int
:param limit: Maximum amount of elements to return :param limit: Maximum amount of elements to return
@ -127,6 +129,7 @@ class BaseStorage(object):
:param paginate: Defaults to True. If False, all found results :param paginate: Defaults to True. If False, all found results
will be returned. will be returned.
:type paginate: bool :type paginate: bool
:rtype: dict :rtype: dict
Returns a dict with the following format:: Returns a dict with the following format::

View File

@ -25,7 +25,6 @@ from cloudkitty.utils import tz as tzutils
LOG = log.getLogger(__name__) LOG = log.getLogger(__name__)
CONF = cfg.CONF CONF = cfg.CONF
ELASTICSEARCH_STORAGE_GROUP = 'storage_elasticsearch' ELASTICSEARCH_STORAGE_GROUP = 'storage_elasticsearch'
@ -34,7 +33,7 @@ elasticsearch_storage_opts = [
cfg.StrOpt( cfg.StrOpt(
'host', 'host',
help='Elasticsearch host, along with port and protocol. ' help='Elasticsearch host, along with port and protocol. '
'Defaults to http://localhost:9200', 'Defaults to http://localhost:9200',
default='http://localhost:9200'), default='http://localhost:9200'),
cfg.StrOpt( cfg.StrOpt(
'index_name', 'index_name',
@ -42,22 +41,21 @@ elasticsearch_storage_opts = [
default='cloudkitty'), default='cloudkitty'),
cfg.BoolOpt('insecure', cfg.BoolOpt('insecure',
help='Set to true to allow insecure HTTPS ' help='Set to true to allow insecure HTTPS '
'connections to Elasticsearch', 'connections to Elasticsearch',
default=False), default=False),
cfg.StrOpt('cafile', cfg.StrOpt('cafile',
help='Path of the CA certificate to trust for ' help='Path of the CA certificate to trust for '
'HTTPS connections.', 'HTTPS connections.',
default=None), default=None),
cfg.IntOpt('scroll_duration', cfg.IntOpt('scroll_duration',
help="Duration (in seconds) for which the ES scroll contexts " help="Duration (in seconds) for which the ES scroll contexts "
"should be kept alive.", "should be kept alive.",
advanced=True, advanced=True,
default=30, min=0, max=300), default=30, min=0, max=300),
] ]
CONF.register_opts(elasticsearch_storage_opts, ELASTICSEARCH_STORAGE_GROUP) CONF.register_opts(elasticsearch_storage_opts, ELASTICSEARCH_STORAGE_GROUP)
CLOUDKITTY_INDEX_MAPPING = { CLOUDKITTY_INDEX_MAPPING = {
"dynamic_templates": [ "dynamic_templates": [
{ {
@ -190,16 +188,16 @@ class ElasticsearchStorage(v2_storage.BaseStorage):
output[key] = value output[key] = value
return output return output
def total(self, groupby=None, def total(self, groupby=None, begin=None, end=None, metric_types=None,
begin=None, end=None, filters=None, custom_fields=None, offset=0, limit=1000,
metric_types=None, paginate=True):
filters=None,
offset=0, limit=1000, paginate=True):
begin, end = self._local_to_utc(begin or tzutils.get_month_start(), begin, end = self._local_to_utc(begin or tzutils.get_month_start(),
end or tzutils.get_next_month()) end or tzutils.get_next_month())
total, docs = self._conn.total(
begin, end, metric_types, filters, groupby, total, docs = self._conn.total(begin, end, metric_types, filters,
offset=offset, limit=limit, paginate=paginate) groupby, custom_fields=custom_fields,
offset=offset, limit=limit,
paginate=paginate)
return { return {
'total': total, 'total': total,
'results': [self._doc_to_total_result(doc, begin, end) 'results': [self._doc_to_total_result(doc, begin, end)

View File

@ -349,7 +349,12 @@ class ElasticsearchClient(object):
return self._req(self._sess.post, url, data, None) return self._req(self._sess.post, url, data, None)
def total(self, begin, end, metric_types, filters, groupby, def total(self, begin, end, metric_types, filters, groupby,
offset=0, limit=1000, paginate=True): custom_fields=None, offset=0, limit=1000, paginate=True):
if custom_fields:
LOG.warning("'custom_fields' are not implemented yet for "
"ElasticSearch. Therefore, the custom fields [%s] "
"informed by the user will be ignored.", custom_fields)
if not paginate: if not paginate:
offset = 0 offset = 0

View File

@ -187,8 +187,12 @@ class InfluxClient(object):
for mtype in types) for mtype in types)
return ' AND (' + type_query + ')' return ' AND (' + type_query + ')'
def get_total(self, types, begin, end, groupby=None, filters=None): def get_total(self, types, begin, end, custom_fields,
query = 'SELECT SUM(qty) AS qty, SUM(price) AS price FROM "dataframes"' groupby=None, filters=None):
self.validate_custom_fields(custom_fields)
query = 'SELECT %s FROM "dataframes"' % custom_fields
query += self._get_time_query(begin, end) query += self._get_time_query(begin, end)
query += self._get_filter_query(filters) query += self._get_filter_query(filters)
query += self._get_type_query(types) query += self._get_type_query(types)
@ -204,7 +208,21 @@ class InfluxClient(object):
query += ' GROUP BY ' + groupby_query query += ' GROUP BY ' + groupby_query
query += ';' query += ';'
return self._conn.query(query) total = self._conn.query(query)
LOG.debug(
"Data [%s] received when executing query [%s].", total, query)
return total
@staticmethod
def validate_custom_fields(custom_fields):
forbidden_clauses = ["select", "from", "drop", "delete", "create",
"alter", "insert", "update"]
for field in custom_fields.split(","):
if field.lower() in forbidden_clauses:
raise RuntimeError("Clause [%s] is not allowed in custom"
" fields summary get report. The following"
" clauses are not allowed [%s].",
field, forbidden_clauses)
def retrieve(self, def retrieve(self,
types, types,
@ -349,24 +367,25 @@ class InfluxStorage(v2_storage.BaseStorage):
output = { output = {
'begin': begin, 'begin': begin,
'end': end, 'end': end,
'qty': point['qty'],
'rate': point['price'],
} }
for key in point.keys():
if "time" != key:
output[key] = point[key]
if groupby: if groupby:
for group in _sanitized_groupby(groupby): for group in _sanitized_groupby(groupby):
output[group] = series_groupby.get(group, '') output[group] = series_groupby.get(group, '')
return output return output
def total(self, groupby=None, def total(self, groupby=None, begin=None, end=None, metric_types=None,
begin=None, end=None, filters=None, offset=0, limit=1000, paginate=True,
metric_types=None, custom_fields="SUM(qty) AS qty, SUM(price) AS rate"):
filters=None,
offset=0, limit=1000, paginate=True):
begin, end = self._check_begin_end(begin, end) begin, end = self._check_begin_end(begin, end)
total = self._conn.get_total( total = self._conn.get_total(metric_types, begin, end,
metric_types, begin, end, groupby, filters) custom_fields, groupby, filters)
output = [] output = []
for (series_name, series_groupby), points in total.items(): for (series_name, series_groupby), points in total.items():
@ -374,7 +393,12 @@ class InfluxStorage(v2_storage.BaseStorage):
# NOTE(peschk_l): InfluxDB returns all timestamps for a given # NOTE(peschk_l): InfluxDB returns all timestamps for a given
# period and interval, even those with no data. This filters # period and interval, even those with no data. This filters
# out periods with no data # out periods with no data
if point['qty'] is not None and point['price'] is not None:
# NOTE (rafaelweingartner): the summary get API is allowing
# users to customize the report. Therefore, we only ignore
# data points, if all of the entries have None values.
# Otherwise, they are presented to the user.
if [k for k in point.keys() if point[k]]:
output.append(self._get_total_elem( output.append(self._get_total_elem(
tzutils.utc_to_local(begin), tzutils.utc_to_local(begin),
tzutils.utc_to_local(end), tzutils.utc_to_local(end),
@ -385,6 +409,7 @@ class InfluxStorage(v2_storage.BaseStorage):
groupby = _sanitized_groupby(groupby) groupby = _sanitized_groupby(groupby)
if groupby: if groupby:
output.sort(key=lambda x: [x[group] for group in groupby]) output.sort(key=lambda x: [x[group] for group in groupby])
return { return {
'total': len(output), 'total': len(output),
'results': output[offset:offset + limit] if paginate else output, 'results': output[offset:offset + limit] if paginate else output,

View File

@ -65,7 +65,7 @@ class FakeElasticsearchClient(client.ElasticsearchClient):
return len(output), output return len(output), output
def total(self, begin, end, metric_types, filters, groupby, def total(self, begin, end, metric_types, filters, groupby,
offset=0, limit=1000, paginate=True): custom_fields=None, offset=0, limit=1000, paginate=True):
filter_func = functools.partial( filter_func = functools.partial(
self.__filter_func, begin, end, filters, metric_types) self.__filter_func, begin, end, filters, metric_types)
docs = list(filter(filter_func, self._docs)) docs = list(filter(filter_func, self._docs))

View File

@ -89,7 +89,8 @@ class FakeInfluxClient(InfluxClient):
series.append(target_serie) series.append(target_serie)
return target_serie return target_serie
def get_total(self, types, begin, end, groupby=None, filters=None): def get_total(self, types, begin, end, custom_fields, groupby=None,
filters=None):
total = copy.deepcopy(self.total_sample) total = copy.deepcopy(self.total_sample)
series = [] series = []

View File

@ -101,7 +101,8 @@ class StorageUnitTest(TestCase):
self.assertEqual(len(total['results']), expected_total_len) self.assertEqual(len(total['results']), expected_total_len)
self.assertEqual(total['total'], expected_total_len) self.assertEqual(total['total'], expected_total_len)
returned_total = round(sum(r['rate'] for r in total['results']), 5) returned_total = round(
sum(r.get('rate', r.get('price')) for r in total['results']), 5)
self.assertLessEqual( self.assertLessEqual(
abs(expected_total - float(returned_total)), 0.00001) abs(expected_total - float(returned_total)), 0.00001)
@ -189,14 +190,18 @@ class StorageUnitTest(TestCase):
total['results'].sort(key=lambda x: x['project_id'], reverse=True) total['results'].sort(key=lambda x: x['project_id'], reverse=True)
first_element = total['results'][0]
self.assertLessEqual( self.assertLessEqual(
abs(round(float(total['results'][0]['rate']) abs(round(
- expected_total_first, 5)), float(first_element.get('rate', first_element.get('price')))
- expected_total_first, 5)),
0.00001, 0.00001,
) )
second_element = total['results'][1]
self.assertLessEqual( self.assertLessEqual(
abs(round(float(total['results'][1]['rate']) abs(round(
- expected_total_second, 5)), float(second_element.get('rate', second_element.get('price')))
- expected_total_second, 5)),
0.00001, 0.00001,
) )
self.assertLessEqual( self.assertLessEqual(
@ -228,14 +233,17 @@ class StorageUnitTest(TestCase):
total['results'].sort(key=lambda x: x['project_id'], reverse=True) total['results'].sort(key=lambda x: x['project_id'], reverse=True)
first_entry = total['results'][0]
second_entry = total['results'][1]
self.assertLessEqual( self.assertLessEqual(
abs(round(float(total['results'][0]['rate']) abs(round(float(first_entry.get('rate', first_entry.get('price')))
- expected_total_first, 5)), - expected_total_first, 5)),
0.00001, 0.00001,
) )
self.assertLessEqual( self.assertLessEqual(
abs(round(float(total['results'][1]['rate']) abs(round(
- expected_total_second, 5)), float(second_entry.get('rate', second_entry.get('price')))
- expected_total_second, 5)),
0.00001, 0.00001,
) )
self.assertLessEqual( self.assertLessEqual(

View File

@ -17,6 +17,7 @@ Get a rating summary for one or several tenants.
- end: end - end: end
- groupby: groupby - groupby: groupby
- filters: filters - filters: filters
- custom_fields: custom_fields
Status codes Status codes
------------ ------------

View File

@ -5,6 +5,30 @@ begin: &begin
type: iso8601 timestamp type: iso8601 timestamp
required: false required: false
custom_fields:
in: query
description: |
Optional attributes to customize the summary GET API response. When
using this parameter, users can create custom reports. The default
behavior is to list the sum of the quantity and the sum of the price,
which is projected as ``rate`` field. The default value for the
``custom_fields`` parameter is ``SUM(qty) AS qty, SUM(price) AS rate``.
One can customize this field as they wish with InfluxDB queries. The
following statements ``"select", "from", "drop", "delete", "create",
"alter", "insert", "update"`` are not allowed though. For instance, if
one wants to retrieve the quantity field as the last value of the
quantity, and not the sum (this is quite interesting when generating
reports for storage values), the user can send the parameter as ``last(qty)
AS qty, SUM(price) AS rate``. To discover all possible fields that one
can work with, the user can also use ``*`` as a parameter.
``Currently this feature only works for Influx storage backend.`` It
(the feature) depends on the storage backend driver to support it. If
the user tries to set this configuration while using other storage
backends, it will be ignored.
type: list of strings
required: false
end: &end end: &end
in: query in: query
description: | description: |