diff --git a/cloudkitty/common/config.py b/cloudkitty/common/config.py index 8fef26af..d7a71827 100644 --- a/cloudkitty/common/config.py +++ b/cloudkitty/common/config.py @@ -29,6 +29,7 @@ import cloudkitty.service import cloudkitty.storage import cloudkitty.storage.v1.hybrid.backends.gnocchi import cloudkitty.storage.v2.gnocchi +import cloudkitty.storage.v2.influx import cloudkitty.utils __all__ = ['list_opts'] @@ -61,6 +62,8 @@ _opts = [ cloudkitty.config.state_opts))), ('storage', list(itertools.chain( cloudkitty.storage.storage_opts))), + ('storage_influx', list(itertools.chain( + cloudkitty.storage.v2.influx.influx_storage_opts))), ('storage_gnocchi', list(itertools.chain( cloudkitty.storage.v1.hybrid.backends.gnocchi.gnocchi_storage_opts))), ('storage_gnocchi', list(itertools.chain( diff --git a/cloudkitty/storage/v2/__init__.py b/cloudkitty/storage/v2/__init__.py index 36a26bf8..83ef4ea0 100644 --- a/cloudkitty/storage/v2/__init__.py +++ b/cloudkitty/storage/v2/__init__.py @@ -166,7 +166,6 @@ class BaseStorage(object): { 'begin': XXX, 'end': XXX, - 'type': XXX, 'rate': XXX, 'groupby1': XXX, 'groupby2': XXX diff --git a/cloudkitty/storage/v2/influx.py b/cloudkitty/storage/v2/influx.py new file mode 100644 index 00000000..e8483fb6 --- /dev/null +++ b/cloudkitty/storage/v2/influx.py @@ -0,0 +1,369 @@ +# Copyright 2018 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Luka Peschke +# +import copy +import datetime +import decimal + +import influxdb +from oslo_config import cfg +from oslo_log import log +import six + +from cloudkitty.storage import v2 as v2_storage +from cloudkitty import utils + + +LOG = log.getLogger(__name__) + + +CONF = cfg.CONF +CONF.import_opt('period', 'cloudkitty.collector', 'collect') + +INFLUX_STORAGE_GROUP = 'storage_influxdb' + +influx_storage_opts = [ + cfg.StrOpt('username', help='InfluxDB username'), + cfg.StrOpt('password', help='InfluxDB password', secret=True), + cfg.StrOpt('database', help='InfluxDB database'), + cfg.StrOpt('retention_policy', default='autogen', + help='Retention policy to use'), + cfg.StrOpt('host', help='InfluxDB host', default='localhost'), + cfg.IntOpt('port', help='InfluxDB port', default=8086), + cfg.BoolOpt( + 'use_ssl', + help='Set to true to use ssl for influxDB connection. ' + 'Defaults to False', + default=False, + ), + cfg.BoolOpt( + 'insecure', + help='Set to true to authorize insecure HTTPS connections to ' + 'influxDB. Defaults to False', + default=False, + ), + cfg.StrOpt( + 'cacert', + 'Path of the CA certificate to trust for HTTPS connections', + default=None + ), +] + +CONF.register_opts(influx_storage_opts, INFLUX_STORAGE_GROUP) + + +class InfluxClient(object): + """Classe used to ease interaction with InfluxDB""" + + def __init__(self, chunk_size=500, autocommit=True): + """Creates an InfluxClient object. + + :param chunk_size: Size after which points should be pushed. + :param autocommit: Set to false to disable autocommit + """ + self._conn = self._get_influx_client() + self._chunk_size = chunk_size + self._autocommit = autocommit + self._retention_policy = CONF.storage_influxdb.retention_policy + self._points = [] + + @staticmethod + def _get_influx_client(): + verify = CONF.storage_influxdb.use_ssl and not \ + CONF.storage_influxdb.insecure + + if verify and CONF.storage_influxdb.cacert: + verify = CONF.storage_influxdb.cacert + + return influxdb.InfluxDBClient( + username=CONF.storage_influxdb.username, + password=CONF.storage_influxdb.password, + host=CONF.storage_influxdb.host, + port=CONF.storage_influxdb.port, + database=CONF.storage_influxdb.database, + ssl=CONF.storage_influxdb.use_ssl, + verify_ssl=verify, + ) + + def retention_policy_exists(self, database, policy): + policies = self._conn.get_list_retention_policies(database) + return policy in [pol['name'] for pol in policies] + + def commit(self): + total_points = len(self._points) + if len(self._points) < 1: + return + LOG.debug('Pushing {} points to InfluxDB'.format(total_points)) + self._conn.write_points(self._points, + retention_policy=self._retention_policy) + self._points = [] + + def append_point(self, + metric_type, + timestamp, + qty, price, unit, + fields, tags): + """Adds two points to commit to InfluxDB""" + + measurement_fields = copy.deepcopy(fields) + measurement_fields['qty'] = float(qty) + measurement_fields['price'] = float(price) + measurement_fields['unit'] = unit + # Unfortunately, this seems to be the fastest way: Having several + # measurements would imply a high client-side workload, and this allows + # us to filter out unrequired keys + measurement_fields['groupby'] = '|'.join(tags.keys()) + measurement_fields['metadata'] = '|'.join(fields.keys()) + + measurement_tags = copy.deepcopy(tags) + measurement_tags['type'] = metric_type + + self._points.append({ + 'measurement': 'dataframes', + 'tags': measurement_tags, + 'fields': measurement_fields, + 'time': utils.ts2dt(timestamp), + }) + if self._autocommit and len(self._points) >= self._chunk_size: + self.commit() + + @staticmethod + def _get_filter(key, value): + if isinstance(value, six.text_type): + format_string = "{}='{}'" + elif isinstance(value, (six.integer_types, float)): + format_string = "{}='{}'" + return format_string.format(key, value) + + @staticmethod + def _get_time_query(begin, end): + return " WHERE time >= '{}' AND time < '{}'".format( + utils.isotime(begin), utils.isotime(end)) + + def _get_filter_query(self, filters): + if not filters: + return '' + return ' AND ' + ' AND '.join( + self._get_filter(k, v) for k, v in filters.items()) + + @staticmethod + def _get_type_query(types): + if not types: + return '' + type_query = ' OR '.join("type='{}'".format(mtype) + for mtype in types) + return ' AND (' + type_query + ')' + + def get_total(self, types, begin, end, groupby=None, filters=None): + query = 'SELECT SUM(qty) AS qty, SUM(price) AS price FROM "dataframes"' + query += self._get_time_query(begin, end) + query += self._get_filter_query(filters) + query += self._get_type_query(types) + + if groupby: + groupby_query = ','.join(groupby) + query += ' GROUP BY ' + groupby_query + + query += ';' + + return self._conn.query(query) + + def retrieve(self, + types, + filters, + begin, end, + offset=0, limit=1000, paginate=True): + query = 'SELECT * FROM "dataframes"' + query += self._get_time_query(begin, end) + query += self._get_filter_query(filters) + query += self._get_type_query(types) + + if paginate: + query += ' OFFSET {} LIMIT {}'.format(offset, limit) + + query += ';' + + total_query = 'SELECT COUNT(groupby) FROM "dataframes"' + total_query += self._get_time_query(begin, end) + total_query += self._get_filter_query(filters) + total_query += self._get_type_query(types) + total_query += ';' + + total, result = self._conn.query(total_query + query) + total = sum(point['count'] for point in total.get_points()) + return total, result + + +class InfluxStorage(v2_storage.BaseStorage): + + def __init__(self, *args, **kwargs): + super(InfluxStorage, self).__init__(*args, **kwargs) + self._conn = InfluxClient() + self._period = kwargs.get('period', None) or CONF.collect.period + + def init(self): + policy = CONF.storage_influxdb.retention_policy + database = CONF.storage_influxdb.database + if not self._conn.retention_policy_exists(database, policy): + LOG.error( + 'Archive policy "{}" does not exist in database "{}"'.format( + policy, database) + ) + + def push(self, dataframes, scope_id=None): + + for dataframe in dataframes: + timestamp = dataframe['period']['begin'] + for metric_name, metrics in dataframe['usage'].items(): + for metric in metrics: + self._conn.append_point( + metric_name, + timestamp, + metric['vol']['qty'], + metric['rating']['price'], + metric['vol']['unit'], + metric['metadata'], + metric['groupby'], + ) + + self._conn.commit() + + @staticmethod + def _check_begin_end(begin, end): + if not begin: + begin = utils.get_month_start() + if not end: + end = utils.get_next_month() + if isinstance(begin, six.text_type): + begin = utils.iso2dt(begin) + if isinstance(begin, int): + begin = utils.ts2dt(begin) + if isinstance(end, six.text_type): + end = utils.iso2dt(end) + if isinstance(end, int): + end = utils.ts2dt(end) + + return begin, end + + @staticmethod + def _build_filters(filters, group_filters): + output = None + if filters and group_filters: + output = copy.deepcopy(filters) + output.update(group_filters) + elif group_filters: + output = group_filters + return output + + @staticmethod + def _point_to_dataframe_entry(point): + groupby = point.pop('groupby').split('|') + metadata = point.pop('metadata').split('|') + return { + 'vol': { + 'unit': point['unit'], + 'qty': decimal.Decimal(point['qty']), + }, + 'rating': { + 'price': point['price'], + }, + 'groupby': {key: point.get(key, '') for key in groupby}, + 'metadata': {key: point.get(key, '') for key in metadata}, + } + + def _build_dataframes(self, points): + dataframes = {} + for point in points: + point_type = point['type'] + if point['time'] not in dataframes.keys(): + dataframes[point['time']] = { + 'period': { + 'begin': point['time'], + 'end': utils.isotime( + utils.iso2dt(point['time']) + + datetime.timedelta(seconds=self._period)), + }, + 'usage': {}, + } + usage = dataframes[point['time']]['usage'] + if point_type not in usage.keys(): + usage[point_type] = [] + usage[point_type].append(self._point_to_dataframe_entry(point)) + + output = list(dataframes.values()) + output.sort(key=lambda x: x['period']['begin']) + return output + + def retrieve(self, begin=None, end=None, + filters=None, group_filters=None, + metric_types=None, + offset=0, limit=1000, paginate=True): + begin, end = self._check_begin_end(begin, end) + filters = self._build_filters(filters, group_filters) + total, resp = self._conn.retrieve( + metric_types, filters, begin, end, offset, limit, paginate) + + # Unfortunately, a ResultSet has no values() method, so we need to + # get them manually + points = [] + for _, item in resp.items(): + points += list(item) + + return { + 'total': total, + 'dataframes': self._build_dataframes(points) + } + + @staticmethod + def _get_total_elem(begin, end, groupby, series_groupby, point): + output = { + 'begin': begin, + 'end': end, + 'qty': point['qty'], + 'rate': point['price'], + } + if groupby: + for group in groupby: + output[group] = series_groupby.get(group, '') + return output + + def total(self, groupby=None, + begin=None, end=None, + metric_types=None, + filters=None, group_filters=None, + offset=0, limit=1000, paginate=True): + + begin, end = self._check_begin_end(begin, end) + filters = self._build_filters(filters, group_filters) + + total = self._conn.get_total( + metric_types, begin, end, groupby, filters) + + output = [] + for (series_name, series_groupby), points in total.items(): + for point in points: + output.append(self._get_total_elem( + begin, end, + groupby, + series_groupby, + point)) + + if groupby: + output.sort(key=lambda x: [x[group] for group in groupby]) + return { + 'total': len(output), + 'results': output[offset:limit] if paginate else output, + } diff --git a/cloudkitty/tests/gabbi/fixtures.py b/cloudkitty/tests/gabbi/fixtures.py index a5cf6c9d..f54ad9c9 100644 --- a/cloudkitty/tests/gabbi/fixtures.py +++ b/cloudkitty/tests/gabbi/fixtures.py @@ -44,7 +44,7 @@ from cloudkitty import rating from cloudkitty import storage from cloudkitty.storage.v1.sqlalchemy import models from cloudkitty import tests -from cloudkitty.tests import test_utils +from cloudkitty.tests import utils as test_utils from cloudkitty.tests.utils import is_functional_test from cloudkitty import utils as ck_utils diff --git a/cloudkitty/tests/samples.py b/cloudkitty/tests/samples.py index ca5822e3..b2098226 100644 --- a/cloudkitty/tests/samples.py +++ b/cloudkitty/tests/samples.py @@ -22,7 +22,10 @@ from oslo_utils import uuidutils from cloudkitty import utils as ck_utils +# These have a different format in order to check that both forms are supported TENANT = 'f266f30b11f246b589fd266f85eeec39' +OTHER_TENANT = '8d3ae500-89ea-4142-9c6e-1269db6a0b64' + INITIAL_TIMESTAMP = 1420070400 FIRST_PERIOD_BEGIN = INITIAL_TIMESTAMP FIRST_PERIOD_BEGIN_ISO = ck_utils.ts2iso(FIRST_PERIOD_BEGIN) diff --git a/cloudkitty/tests/storage/v1/test_hybrid_storage.py b/cloudkitty/tests/storage/v1/test_hybrid_storage.py index 47af3685..7609a0cd 100644 --- a/cloudkitty/tests/storage/v1/test_hybrid_storage.py +++ b/cloudkitty/tests/storage/v1/test_hybrid_storage.py @@ -22,8 +22,7 @@ from gnocchiclient import exceptions as gexc from cloudkitty import storage from cloudkitty import tests -from cloudkitty.tests import test_utils -from cloudkitty.tests.utils import is_functional_test +from cloudkitty.tests import utils as test_utils class BaseHybridStorageTest(tests.TestCase): @@ -56,7 +55,7 @@ class PermissiveDict(object): return self.value == other.get(self.key) -@testtools.skipIf(is_functional_test(), 'Not a functional test') +@testtools.skipIf(test_utils.is_functional_test(), 'Not a functional test') class HybridStorageTestGnocchi(BaseHybridStorageTest): def setUp(self): diff --git a/cloudkitty/tests/storage/v1/test_storage.py b/cloudkitty/tests/storage/v1/test_storage.py index ea383ddf..ba52d8a1 100644 --- a/cloudkitty/tests/storage/v1/test_storage.py +++ b/cloudkitty/tests/storage/v1/test_storage.py @@ -24,8 +24,7 @@ import testscenarios from cloudkitty import storage from cloudkitty import tests from cloudkitty.tests import samples -from cloudkitty.tests import test_utils -from cloudkitty.tests.utils import is_functional_test +from cloudkitty.tests import utils as test_utils from cloudkitty import utils as ck_utils @@ -66,7 +65,7 @@ class StorageTest(tests.TestCase): self.storage.push(working_data, self._other_tenant_id) -@testtools.skipIf(is_functional_test(), 'Not a functional test') +@testtools.skipIf(test_utils.is_functional_test(), 'Not a functional test') class StorageDataframeTest(StorageTest): storage_scenarios = [ @@ -130,7 +129,7 @@ class StorageDataframeTest(StorageTest): self.assertEqual(3, len(data)) -@testtools.skipIf(is_functional_test(), 'Not a functional test') +@testtools.skipIf(test_utils.is_functional_test(), 'Not a functional test') class StorageTotalTest(StorageTest): storage_scenarios = [ @@ -270,7 +269,7 @@ class StorageTotalTest(StorageTest): self.assertEqual(end, total[3]["end"]) -if not is_functional_test(): +if not test_utils.is_functional_test(): StorageTest.generate_scenarios() StorageTotalTest.generate_scenarios() StorageDataframeTest.generate_scenarios() diff --git a/cloudkitty/tests/storage/v2/base_functional.py b/cloudkitty/tests/storage/v2/base_functional.py index 7598c94d..0c47c15f 100644 --- a/cloudkitty/tests/storage/v2/base_functional.py +++ b/cloudkitty/tests/storage/v2/base_functional.py @@ -26,7 +26,7 @@ from oslo_config import fixture as config_fixture from oslo_utils import uuidutils from cloudkitty import storage -from cloudkitty.tests import samples +from cloudkitty.tests import utils as test_utils from cloudkitty import utils as ck_utils @@ -42,43 +42,6 @@ def _init_conf(): default_config_files=['/etc/cloudkitty/cloudkitty.conf']) -def get_storage_data(min_length=10, - nb_projects=2, - project_ids=None, - start=datetime(2018, 1, 1), - end=datetime(2018, 1, 1, 1)): - if isinstance(start, datetime): - start = ck_utils.dt2ts(start) - if isinstance(end, datetime): - end = ck_utils.dt2ts(end) - - if not project_ids: - project_ids = [uuidutils.generate_uuid() for i in range(nb_projects)] - elif not isinstance(project_ids, list): - project_ids = [project_ids] - - usage = {} - for metric_name, sample in samples.V2_STORAGE_SAMPLE.items(): - dataframes = [] - for project_id in project_ids: - data = [copy.deepcopy(sample) - # for i in range(min_length + random.randint(1, 10))] - for i in range(1)] - for elem in data: - elem['groupby']['id'] = uuidutils.generate_uuid() - elem['groupby']['project_id'] = project_id - dataframes += data - usage[metric_name] = dataframes - - return { - 'usage': usage, - 'period': { - 'begin': start, - 'end': end - } - } - - class BaseFunctionalStorageTest(testtools.TestCase): # Name of the storage backend to test @@ -138,8 +101,9 @@ class BaseFunctionalStorageTest(testtools.TestCase): @staticmethod def gen_data_separate_projects(nb_projects): project_ids = [uuidutils.generate_uuid() for i in range(nb_projects)] - data = [get_storage_data( - project_ids=project_ids[i], nb_projects=1) + data = [ + test_utils.generate_v2_storage_data( + project_ids=project_ids[i], nb_projects=1) for i in range(nb_projects)] return project_ids, data diff --git a/cloudkitty/tests/storage/v2/influx_utils.py b/cloudkitty/tests/storage/v2/influx_utils.py new file mode 100644 index 00000000..bbe7b929 --- /dev/null +++ b/cloudkitty/tests/storage/v2/influx_utils.py @@ -0,0 +1,141 @@ +# Copyright 2018 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Luka Peschke +# +import copy +import functools + +from influxdb import resultset + +from cloudkitty.storage.v2.influx import InfluxClient +from cloudkitty import utils + + +class FakeInfluxClient(InfluxClient): + + total_sample = { + "statement_id": 0, + "series": [] + } + + total_series_sample = { + "name": "dataframes", + "tags": {}, + "columns": ["time", "qty", "price"], + "values": [], + } + + def __init__(self, **kwargs): + super(FakeInfluxClient, self).__init__(autocommit=False) + + def commit(self): + pass + + @staticmethod + def __filter_func(types, filters, begin, end, elem): + if elem['time'] < begin or elem['time'] >= end: + return False + if types and elem['tags']['type'] not in types: + return False + if filters is None: + return True + for key in filters.keys(): + if key not in elem['tags'].keys(): + return False + if elem['tags'][key] != filters[key]: + return False + return True + + def __get_target_serie(self, point, series, groupby): + target_serie = None + for serie in series: + if not groupby: + target_serie = serie + break + valid = True + for tag in serie['tags'].keys(): + if tag not in point['tags'].keys() or \ + point['tags'][tag] != serie['tags'][tag]: + valid = False + break + if valid: + target_serie = serie + break + + if target_serie is None: + target_serie = copy.deepcopy(self.total_series_sample) + if groupby: + target_serie['tags'] = {k: point['tags'][k] for k in groupby} + else: + target_serie['tags'] = {} + target_serie['values'] = [['1970-01-01T00:00:00Z', 0, 0]] + series.append(target_serie) + return target_serie + + def get_total(self, types, begin, end, groupby=None, filters=None): + total = copy.deepcopy(self.total_sample) + series = [] + + filter_func = functools.partial( + self.__filter_func, types, filters, begin, end) + points = filter(filter_func, self._points) + + for point in points: + target_serie = self.__get_target_serie(point, series, groupby) + target_serie['values'][0][1] += point['fields']['qty'] + target_serie['values'][0][2] += point['fields']['price'] + total['series'] = series + + return resultset.ResultSet(total) + + def retrieve(self, + types, + filters, + begin, end, + offset=0, limit=1000, paginate=True): + output = copy.deepcopy(self.total_sample) + + filter_func = functools.partial( + self.__filter_func, types, filters, begin, end) + points = list(filter(filter_func, self._points)) + + columns = set() + for point in list(points): + columns.update(point['tags'].keys()) + columns.update(point['fields'].keys()) + columns.add('time') + + series = { + 'name': 'dataframes', + 'columns': list(columns), + } + values = [] + + def __get_tag_or_field(point, key): + if key == 'time': + return utils.isotime(point['time']) + return point['tags'].get(key) or point['fields'].get(key) + + for point in points: + values.append([__get_tag_or_field(point, key) + for key in series['columns']]) + + series['values'] = values + output['series'] = [series] + + return len(points), resultset.ResultSet(output) + + def retention_policy_exists(self, database, policy): + return True diff --git a/cloudkitty/tests/storage/v2/test_storage_unit.py b/cloudkitty/tests/storage/v2/test_storage_unit.py new file mode 100644 index 00000000..a5697dda --- /dev/null +++ b/cloudkitty/tests/storage/v2/test_storage_unit.py @@ -0,0 +1,327 @@ +# Copyright 2018 Objectif Libre +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# +# @author: Luka Peschke +# +import datetime + +import mock +import testscenarios + +from cloudkitty import storage +from cloudkitty.tests import samples +from cloudkitty.tests.storage.v2 import influx_utils +from cloudkitty.tests import TestCase +from cloudkitty.tests import utils as test_utils + + +class StorageUnitTest(TestCase): + + storage_scenarios = [ + ('influx', dict(storage_backend='influxdb'))] + + @classmethod + def generate_scenarios(cls): + cls.scenarios = testscenarios.multiply_scenarios( + cls.scenarios, + cls.storage_scenarios) + + @mock.patch('cloudkitty.storage.v2.influx.InfluxClient', + new=influx_utils.FakeInfluxClient) + @mock.patch('cloudkitty.utils.load_conf', new=test_utils.load_conf) + def setUp(self): + super(StorageUnitTest, self).setUp() + self._project_id = samples.TENANT + self._other_project_id = samples.OTHER_TENANT + self.conf.set_override('backend', self.storage_backend, 'storage') + self.conf.set_override('version', '2', 'storage') + self.storage = storage.get_storage(conf=test_utils.load_conf()) + self.storage.init() + self.data = [] + self.init_data() + + def init_data(self): + project_ids = [self._project_id, self._other_project_id] + for i in range(3): + start_delta = 3600 * i + end_delta = start_delta + 3600 + start = datetime.datetime(2018, 1, 1) \ + + datetime.timedelta(seconds=start_delta) + end = datetime.datetime(2018, 1, 1) \ + + datetime.timedelta(seconds=end_delta) + data = test_utils.generate_v2_storage_data( + project_ids=project_ids, + start=start, + end=end) + self.data.append(data) + self.storage.push([data]) + + @staticmethod + def _expected_total_qty_len(data, project_id=None, types=None): + total = 0 + qty = 0 + length = 0 + for data_part in data: + for mtype, usage_part in data_part['usage'].items(): + if types is not None and mtype not in types: + continue + for item in usage_part: + if project_id is None or \ + project_id == item['groupby']['project_id']: + total += item['rating']['price'] + qty += item['vol']['qty'] + length += 1 + + return round(float(total), 5), round(float(qty), 5), length + + def _compare_get_total_result_with_expected(self, + expected_qty, + expected_total, + expected_total_len, + total): + self.assertEqual(len(total['results']), expected_total_len) + self.assertEqual(total['total'], expected_total_len) + + returned_total = round(sum(r['rate'] for r in total['results']), 5) + self.assertLessEqual(abs(expected_total - returned_total), 0.00001) + + returned_qty = round(sum(r['qty'] for r in total['results']), 5) + self.assertLessEqual(abs(expected_qty - returned_qty), 0.00001) + + def test_get_total_all_scopes_all_periods(self): + expected_total, expected_qty, _ = self._expected_total_qty_len( + self.data) + + begin = datetime.datetime(2018, 1, 1) + end = datetime.datetime(2018, 1, 1, 4) + + self._compare_get_total_result_with_expected( + expected_qty, + expected_total, + 1, + self.storage.total(begin=begin, end=end)) + + def test_get_total_one_scope_all_periods(self): + expected_total, expected_qty, _ = self._expected_total_qty_len( + self.data, self._project_id) + + begin = datetime.datetime(2018, 1, 1) + end = datetime.datetime(2018, 1, 1, 4) + + group_filters = {'project_id': self._project_id} + self._compare_get_total_result_with_expected( + expected_qty, + expected_total, + 1, + self.storage.total(begin=begin, + end=end, + group_filters=group_filters), + ) + + def test_get_total_all_scopes_one_period(self): + expected_total, expected_qty, _ = self._expected_total_qty_len( + [self.data[0]]) + + begin = datetime.datetime(2018, 1, 1) + end = datetime.datetime(2018, 1, 1, 1) + + self._compare_get_total_result_with_expected( + expected_qty, + expected_total, + 1, + self.storage.total(begin=begin, end=end)) + + def test_get_total_one_scope_one_period(self): + expected_total, expected_qty, _ = self._expected_total_qty_len( + [self.data[0]], self._project_id) + expected_total, expected_qty, _ = self._expected_total_qty_len( + [self.data[0]], self._project_id) + + begin = datetime.datetime(2018, 1, 1) + end = datetime.datetime(2018, 1, 1, 1) + + group_filters = {'project_id': self._project_id} + self._compare_get_total_result_with_expected( + expected_qty, + expected_total, + 1, + self.storage.total(begin=begin, + end=end, + group_filters=group_filters), + ) + + def test_get_total_all_scopes_all_periods_groupby_project_id(self): + expected_total_first, expected_qty_first, _ = \ + self._expected_total_qty_len(self.data, self._project_id) + expected_total_second, expected_qty_second, _ = \ + self._expected_total_qty_len(self.data, self._other_project_id) + + begin = datetime.datetime(2018, 1, 1) + end = datetime.datetime(2018, 1, 1, 4) + total = self.storage.total(begin=begin, end=end, + groupby=['project_id']) + self.assertEqual(len(total['results']), 2) + self.assertEqual(total['total'], 2) + + for t in total['results']: + self.assertIn('project_id', t.keys()) + + total['results'].sort(key=lambda x: x['project_id'], reverse=True) + + self.assertLessEqual( + abs(round(total['results'][0]['rate'], 5) - expected_total_first), + 0.00001, + ) + self.assertLessEqual( + abs(round(total['results'][1]['rate'], 5) - expected_total_second), + 0.00001, + ) + self.assertLessEqual( + abs(round(total['results'][0]['qty'], 5) - expected_qty_first), + 0.00001, + ) + self.assertLessEqual( + abs(round(total['results'][1]['qty'], 5) - expected_qty_second), + 0.00001, + ) + + def test_get_total_all_scopes_one_period_groupby_project_id(self): + expected_total_first, expected_qty_first, _ = \ + self._expected_total_qty_len([self.data[0]], self._project_id) + expected_total_second, expected_qty_second, _ = \ + self._expected_total_qty_len([self.data[0]], + self._other_project_id) + + begin = datetime.datetime(2018, 1, 1) + end = datetime.datetime(2018, 1, 1, 1) + total = self.storage.total(begin=begin, end=end, + groupby=['project_id']) + self.assertEqual(len(total), 2) + + for t in total['results']: + self.assertIn('project_id', t.keys()) + + total['results'].sort(key=lambda x: x['project_id'], reverse=True) + + self.assertLessEqual( + abs(round(total['results'][0]['rate'], 5) - expected_total_first), + 0.00001, + ) + self.assertLessEqual( + abs(round(total['results'][1]['rate'], 5) - expected_total_second), + 0.00001, + ) + self.assertLessEqual( + abs(round(total['results'][0]['qty'], 5) - expected_qty_first), + 0.00001, + ) + self.assertLessEqual( + abs(round(total['results'][1]['qty'], 5) - expected_qty_second), + 0.00001, + ) + + def test_get_total_all_scopes_all_periods_groupby_type_paginate(self): + expected_total, expected_qty, _ = \ + self._expected_total_qty_len(self.data) + + begin = datetime.datetime(2018, 1, 1) + end = datetime.datetime(2018, 1, 1, 4) + + total = {'total': 0, 'results': []} + for offset in range(0, 7, 2): + chunk = self.storage.total( + begin=begin, + end=end, + offset=offset, + limit=offset + 2, + groupby=['type']) + # there are seven metric types + self.assertEqual(chunk['total'], 7) + # last chunk, shorter + if offset == 6: + self.assertEqual(len(chunk['results']), 1) + else: + self.assertEqual(len(chunk['results']), 2) + total['results'] += chunk['results'] + total['total'] += len(chunk['results']) + + unpaginated_total = self.storage.total( + begin=begin, end=end, groupby=['type']) + self.assertEqual(total, unpaginated_total) + + self._compare_get_total_result_with_expected( + expected_qty, + expected_total, + 7, + total) + + def test_retrieve_all_scopes_all_types(self): + expected_total, expected_qty, expected_length = \ + self._expected_total_qty_len(self.data) + + begin = datetime.datetime(2018, 1, 1) + end = datetime.datetime(2018, 1, 1, 4) + + frames = self.storage.retrieve(begin=begin, end=end) + self.assertEqual(frames['total'], expected_length) + + retrieved_length = 0 + for data_part in frames['dataframes']: + for usage_part in data_part['usage'].values(): + retrieved_length += len(usage_part) + + self.assertEqual(expected_length, retrieved_length) + + def test_retrieve_all_scopes_one_type(self): + expected_total, expected_qty, expected_length = \ + self._expected_total_qty_len(self.data, types=['image.size']) + + begin = datetime.datetime(2018, 1, 1) + end = datetime.datetime(2018, 1, 1, 4) + + frames = self.storage.retrieve(begin=begin, end=end, + metric_types=['image.size']) + self.assertEqual(frames['total'], expected_length) + + retrieved_length = 0 + for data_part in frames['dataframes']: + for usage_part in data_part['usage'].values(): + retrieved_length += len(usage_part) + + self.assertEqual(expected_length, retrieved_length) + + def test_retrieve_one_scope_two_types_one_period(self): + expected_total, expected_qty, expected_length = \ + self._expected_total_qty_len([self.data[0]], self._project_id, + types=['image.size', 'instance']) + + begin = datetime.datetime(2018, 1, 1) + end = datetime.datetime(2018, 1, 1, 1) + + group_filters = {'project_id': self._project_id} + frames = self.storage.retrieve(begin=begin, end=end, + group_filters=group_filters, + metric_types=['image.size', 'instance']) + self.assertEqual(frames['total'], expected_length) + + retrieved_length = 0 + for data_part in frames['dataframes']: + for usage_part in data_part['usage'].values(): + retrieved_length += len(usage_part) + + self.assertEqual(expected_length, retrieved_length) + + +if not test_utils.is_functional_test(): + StorageUnitTest.generate_scenarios() diff --git a/cloudkitty/tests/test_utils.py b/cloudkitty/tests/test_utils.py index 1629a394..17468462 100644 --- a/cloudkitty/tests/test_utils.py +++ b/cloudkitty/tests/test_utils.py @@ -25,7 +25,6 @@ import unittest import mock from oslo_utils import timeutils -from cloudkitty.tests.samples import DEFAULT_METRICS_CONF from cloudkitty.tests.utils import is_functional_test from cloudkitty import utils as ck_utils @@ -200,7 +199,3 @@ class ConvertUnitTest(unittest.TestCase): def test_convert_decimal(self): result = ck_utils.num2decimal(decimal.Decimal(2)) self.assertEqual(result, decimal.Decimal(2)) - - -def load_conf(*args): - return DEFAULT_METRICS_CONF diff --git a/cloudkitty/tests/utils.py b/cloudkitty/tests/utils.py index 07667ea8..e3a22372 100644 --- a/cloudkitty/tests/utils.py +++ b/cloudkitty/tests/utils.py @@ -15,8 +15,56 @@ # # @author: Luka Peschke # +import copy +from datetime import datetime from os import getenv +import random + +from oslo_utils import uuidutils + +from cloudkitty.tests import samples +from cloudkitty import utils as ck_utils def is_functional_test(): return getenv('TEST_FUNCTIONAL', False) + + +def generate_v2_storage_data(min_length=10, + nb_projects=2, + project_ids=None, + start=datetime(2018, 1, 1), + end=datetime(2018, 1, 1, 1)): + if isinstance(start, datetime): + start = ck_utils.dt2ts(start) + if isinstance(end, datetime): + end = ck_utils.dt2ts(end) + + if not project_ids: + project_ids = [uuidutils.generate_uuid() for i in range(nb_projects)] + elif not isinstance(project_ids, list): + project_ids = [project_ids] + + usage = {} + for metric_name, sample in samples.V2_STORAGE_SAMPLE.items(): + dataframes = [] + for project_id in project_ids: + data = [copy.deepcopy(sample) + for i in range(min_length + random.randint(1, 10))] + for elem in data: + elem['groupby']['id'] = uuidutils.generate_uuid() + elem['groupby']['project_id'] = project_id + dataframes += data + usage[metric_name] = dataframes + + return { + 'usage': usage, + 'period': { + 'begin': start, + 'end': end + } + } + + +def load_conf(*args): + return samples.DEFAULT_METRICS_CONF diff --git a/lower-constraints.txt b/lower-constraints.txt index 7eed03b2..45c5f24b 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -26,6 +26,7 @@ six==1.9.0 # MIT stevedore==1.5.0 # Apache-2.0 tooz==1.28.0 # Apache-2.0 voluptuous==0.11.1 # BSD-3 +influxdb==5.1.0 # MIT # test-requirements coverage==3.6 # Apache-2.0 diff --git a/releasenotes/notes/add-influx-storage-backend-3ace5b451e789e64.yaml b/releasenotes/notes/add-influx-storage-backend-3ace5b451e789e64.yaml new file mode 100644 index 00000000..c8167875 --- /dev/null +++ b/releasenotes/notes/add-influx-storage-backend-3ace5b451e789e64.yaml @@ -0,0 +1,9 @@ +--- +features: + - | + An InfluxDB v2 storage backend has been added. It will become the default + backend of the v2 storage interface. + + The v1 storage interface will be deprecated in a future release. At that + point, documentation about how to upgrade the storage backend will be made + available, along with some helpers. diff --git a/requirements.txt b/requirements.txt index 5fcd0e1d..b261bdd6 100644 --- a/requirements.txt +++ b/requirements.txt @@ -28,3 +28,4 @@ six>=1.9.0 # MIT stevedore>=1.5.0 # Apache-2.0 tooz>=1.28.0 # Apache-2.0 voluptuous>=0.11.1 # BSD License +influxdb>=5.1.0,!=5.2.0 # MIT diff --git a/setup.cfg b/setup.cfg index 5a6df84b..b6f11528 100644 --- a/setup.cfg +++ b/setup.cfg @@ -72,6 +72,7 @@ cloudkitty.storage.v1.backends = cloudkitty.storage.v2.backends = gnocchi = cloudkitty.storage.v2.gnocchi:GnocchiStorage + influxdb = cloudkitty.storage.v2.influx:InfluxStorage cloudkitty.storage.hybrid.backends = gnocchi = cloudkitty.storage.v1.hybrid.backends.gnocchi:GnocchiStorage