Add an InfluxDB storage backend

This adds an InfluxDB backend to the v2 storage. It is much more performant
than the Gnocchi backend, and adds support for Grafana.

In order to avoid making this patch too big, the documentation will be updated
in another patch.

Support for InfluxDB installation in the devstack plugin will also be added in
another patch.

Change-Id: Icaa23acb1a4791aac0dd8afb122d561065193eea
Story: 2001372
Task: 24536
Luka Peschke 2018-10-10 10:37:59 +02:00
parent 6350923970
commit c4758e78b4
16 changed files with 914 additions and 55 deletions
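
For reference, the new backend is enabled through the regular storage options. A minimal, illustrative cloudkitty.conf (group and option names match the code below; values are placeholders, and the database and retention policy must already exist on the InfluxDB side):

[storage]
version = 2
backend = influxdb

[storage_influxdb]
host = localhost
port = 8086
database = cloudkitty
username = cloudkitty
password = cloudkitty
retention_policy = autogen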

View File

@@ -29,6 +29,7 @@ import cloudkitty.service
import cloudkitty.storage
import cloudkitty.storage.v1.hybrid.backends.gnocchi
import cloudkitty.storage.v2.gnocchi
import cloudkitty.storage.v2.influx
import cloudkitty.utils
__all__ = ['list_opts']
@@ -61,6 +62,8 @@ _opts = [
cloudkitty.config.state_opts))),
('storage', list(itertools.chain(
cloudkitty.storage.storage_opts))),
('storage_influxdb', list(itertools.chain(
cloudkitty.storage.v2.influx.influx_storage_opts))),
('storage_gnocchi', list(itertools.chain(
cloudkitty.storage.v1.hybrid.backends.gnocchi.gnocchi_storage_opts))),
('storage_gnocchi', list(itertools.chain(

View File

@@ -166,7 +166,6 @@ class BaseStorage(object):
{
'begin': XXX,
'end': XXX,
'type': XXX,
'rate': XXX,
'groupby1': XXX,
'groupby2': XXX

View File

@@ -0,0 +1,369 @@
# Copyright 2018 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Luka Peschke
#
import copy
import datetime
import decimal
import influxdb
from oslo_config import cfg
from oslo_log import log
import six
from cloudkitty.storage import v2 as v2_storage
from cloudkitty import utils
LOG = log.getLogger(__name__)
CONF = cfg.CONF
CONF.import_opt('period', 'cloudkitty.collector', 'collect')
INFLUX_STORAGE_GROUP = 'storage_influxdb'
influx_storage_opts = [
cfg.StrOpt('username', help='InfluxDB username'),
cfg.StrOpt('password', help='InfluxDB password', secret=True),
cfg.StrOpt('database', help='InfluxDB database'),
cfg.StrOpt('retention_policy', default='autogen',
help='Retention policy to use'),
cfg.StrOpt('host', help='InfluxDB host', default='localhost'),
cfg.IntOpt('port', help='InfluxDB port', default=8086),
cfg.BoolOpt(
'use_ssl',
help='Set to true to use SSL for the InfluxDB connection. '
'Defaults to False',
default=False,
),
cfg.BoolOpt(
'insecure',
help='Set to true to authorize insecure HTTPS connections to '
'InfluxDB. Defaults to False',
default=False,
),
cfg.StrOpt(
'cacert',
help='Path of the CA certificate to trust for HTTPS connections',
default=None
),
]
CONF.register_opts(influx_storage_opts, INFLUX_STORAGE_GROUP)
class InfluxClient(object):
"""Classe used to ease interaction with InfluxDB"""
def __init__(self, chunk_size=500, autocommit=True):
"""Creates an InfluxClient object.
:param chunk_size: Size after which points should be pushed.
:param autocommit: Set to false to disable autocommit
"""
self._conn = self._get_influx_client()
self._chunk_size = chunk_size
self._autocommit = autocommit
self._retention_policy = CONF.storage_influxdb.retention_policy
self._points = []
@staticmethod
def _get_influx_client():
verify = CONF.storage_influxdb.use_ssl and not \
CONF.storage_influxdb.insecure
if verify and CONF.storage_influxdb.cacert:
verify = CONF.storage_influxdb.cacert
return influxdb.InfluxDBClient(
username=CONF.storage_influxdb.username,
password=CONF.storage_influxdb.password,
host=CONF.storage_influxdb.host,
port=CONF.storage_influxdb.port,
database=CONF.storage_influxdb.database,
ssl=CONF.storage_influxdb.use_ssl,
verify_ssl=verify,
)
def retention_policy_exists(self, database, policy):
policies = self._conn.get_list_retention_policies(database)
return policy in [pol['name'] for pol in policies]
def commit(self):
total_points = len(self._points)
if total_points < 1:
return
LOG.debug('Pushing {} points to InfluxDB'.format(total_points))
self._conn.write_points(self._points,
retention_policy=self._retention_policy)
self._points = []
def append_point(self,
metric_type,
timestamp,
qty, price, unit,
fields, tags):
"""Adds two points to commit to InfluxDB"""
measurement_fields = copy.deepcopy(fields)
measurement_fields['qty'] = float(qty)
measurement_fields['price'] = float(price)
measurement_fields['unit'] = unit
# Unfortunately, this seems to be the fastest way: having several
# measurements would imply a high client-side workload, and this allows
# us to filter out unneeded keys
measurement_fields['groupby'] = '|'.join(tags.keys())
measurement_fields['metadata'] = '|'.join(fields.keys())
measurement_tags = copy.deepcopy(tags)
measurement_tags['type'] = metric_type
self._points.append({
'measurement': 'dataframes',
'tags': measurement_tags,
'fields': measurement_fields,
'time': utils.ts2dt(timestamp),
})
if self._autocommit and len(self._points) >= self._chunk_size:
self.commit()
@staticmethod
def _get_filter(key, value):
if isinstance(value, (six.integer_types, float)):
format_string = "{}={}"
else:
format_string = "{}='{}'"
return format_string.format(key, value)
@staticmethod
def _get_time_query(begin, end):
return " WHERE time >= '{}' AND time < '{}'".format(
utils.isotime(begin), utils.isotime(end))
def _get_filter_query(self, filters):
if not filters:
return ''
return ' AND ' + ' AND '.join(
self._get_filter(k, v) for k, v in filters.items())
@staticmethod
def _get_type_query(types):
if not types:
return ''
type_query = ' OR '.join("type='{}'".format(mtype)
for mtype in types)
return ' AND (' + type_query + ')'
def get_total(self, types, begin, end, groupby=None, filters=None):
query = 'SELECT SUM(qty) AS qty, SUM(price) AS price FROM "dataframes"'
query += self._get_time_query(begin, end)
query += self._get_filter_query(filters)
query += self._get_type_query(types)
if groupby:
groupby_query = ','.join(groupby)
query += ' GROUP BY ' + groupby_query
query += ';'
return self._conn.query(query)
def retrieve(self,
types,
filters,
begin, end,
offset=0, limit=1000, paginate=True):
query = 'SELECT * FROM "dataframes"'
query += self._get_time_query(begin, end)
query += self._get_filter_query(filters)
query += self._get_type_query(types)
if paginate:
query += ' LIMIT {} OFFSET {}'.format(limit, offset)
query += ';'
total_query = 'SELECT COUNT(groupby) FROM "dataframes"'
total_query += self._get_time_query(begin, end)
total_query += self._get_filter_query(filters)
total_query += self._get_type_query(types)
total_query += ';'
total, result = self._conn.query(total_query + query)
total = sum(point['count'] for point in total.get_points())
return total, result
class InfluxStorage(v2_storage.BaseStorage):
def __init__(self, *args, **kwargs):
super(InfluxStorage, self).__init__(*args, **kwargs)
self._conn = InfluxClient()
self._period = kwargs.get('period', None) or CONF.collect.period
def init(self):
policy = CONF.storage_influxdb.retention_policy
database = CONF.storage_influxdb.database
if not self._conn.retention_policy_exists(database, policy):
LOG.error(
'Retention policy "{}" does not exist in database "{}"'.format(
policy, database)
)
def push(self, dataframes, scope_id=None):
for dataframe in dataframes:
timestamp = dataframe['period']['begin']
for metric_name, metrics in dataframe['usage'].items():
for metric in metrics:
self._conn.append_point(
metric_name,
timestamp,
metric['vol']['qty'],
metric['rating']['price'],
metric['vol']['unit'],
metric['metadata'],
metric['groupby'],
)
self._conn.commit()
@staticmethod
def _check_begin_end(begin, end):
if not begin:
begin = utils.get_month_start()
if not end:
end = utils.get_next_month()
if isinstance(begin, six.text_type):
begin = utils.iso2dt(begin)
if isinstance(begin, int):
begin = utils.ts2dt(begin)
if isinstance(end, six.text_type):
end = utils.iso2dt(end)
if isinstance(end, int):
end = utils.ts2dt(end)
return begin, end
@staticmethod
def _build_filters(filters, group_filters):
output = None
if filters and group_filters:
output = copy.deepcopy(filters)
output.update(group_filters)
elif group_filters:
output = group_filters
return output
@staticmethod
def _point_to_dataframe_entry(point):
groupby = point.pop('groupby').split('|')
metadata = point.pop('metadata').split('|')
return {
'vol': {
'unit': point['unit'],
'qty': decimal.Decimal(point['qty']),
},
'rating': {
'price': point['price'],
},
'groupby': {key: point.get(key, '') for key in groupby},
'metadata': {key: point.get(key, '') for key in metadata},
}
def _build_dataframes(self, points):
dataframes = {}
for point in points:
point_type = point['type']
if point['time'] not in dataframes.keys():
dataframes[point['time']] = {
'period': {
'begin': point['time'],
'end': utils.isotime(
utils.iso2dt(point['time'])
+ datetime.timedelta(seconds=self._period)),
},
'usage': {},
}
usage = dataframes[point['time']]['usage']
if point_type not in usage.keys():
usage[point_type] = []
usage[point_type].append(self._point_to_dataframe_entry(point))
output = list(dataframes.values())
output.sort(key=lambda x: x['period']['begin'])
return output
def retrieve(self, begin=None, end=None,
filters=None, group_filters=None,
metric_types=None,
offset=0, limit=1000, paginate=True):
begin, end = self._check_begin_end(begin, end)
filters = self._build_filters(filters, group_filters)
total, resp = self._conn.retrieve(
metric_types, filters, begin, end, offset, limit, paginate)
# Unfortunately, a ResultSet has no values() method, so we need to
# get them manually
points = []
for _, item in resp.items():
points += list(item)
return {
'total': total,
'dataframes': self._build_dataframes(points)
}
@staticmethod
def _get_total_elem(begin, end, groupby, series_groupby, point):
output = {
'begin': begin,
'end': end,
'qty': point['qty'],
'rate': point['price'],
}
if groupby:
for group in groupby:
output[group] = series_groupby.get(group, '')
return output
def total(self, groupby=None,
begin=None, end=None,
metric_types=None,
filters=None, group_filters=None,
offset=0, limit=1000, paginate=True):
begin, end = self._check_begin_end(begin, end)
filters = self._build_filters(filters, group_filters)
total = self._conn.get_total(
metric_types, begin, end, groupby, filters)
output = []
for (series_name, series_groupby), points in total.items():
for point in points:
output.append(self._get_total_elem(
begin, end,
groupby,
series_groupby,
point))
if groupby:
output.sort(key=lambda x: [x[group] for group in groupby])
return {
'total': len(output),
'results': output[offset:limit] if paginate else output,
}
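
To make the storage layout concrete, here is a minimal standalone sketch of the point that append_point() builds and of the InfluxQL statement that get_total() assembles from the query helpers. It only builds plain dicts and strings (no InfluxDB connection needed); all values are illustrative:

import datetime

tags = {'project_id': 'f266f30b11f246b589fd266f85eeec39'}
fields = {'flavor': 'm1.small'}

# One rated dataframe entry becomes a single point in the 'dataframes'
# measurement. The original groupby/metadata key sets are kept as
# pipe-joined strings so that retrieve() can split them apart again.
point = {
    'measurement': 'dataframes',
    'tags': dict(tags, type='instance'),
    'fields': dict(fields,
                   qty=1.0, price=0.5, unit='instance',
                   groupby='|'.join(tags.keys()),
                   metadata='|'.join(fields.keys())),
    'time': datetime.datetime(2018, 1, 1),
}

# get_total(['instance'], begin, end, groupby=['project_id'],
#           filters={'project_id': ...}) then issues something like:
query = ('SELECT SUM(qty) AS qty, SUM(price) AS price FROM "dataframes"'
         " WHERE time >= '2018-01-01T00:00:00Z'"
         " AND time < '2018-01-01T01:00:00Z'"
         " AND project_id='f266f30b11f246b589fd266f85eeec39'"
         " AND (type='instance')"
         ' GROUP BY project_id;')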

View File

@@ -44,7 +44,7 @@ from cloudkitty import rating
from cloudkitty import storage
from cloudkitty.storage.v1.sqlalchemy import models
from cloudkitty import tests
from cloudkitty.tests import test_utils
from cloudkitty.tests import utils as test_utils
from cloudkitty.tests.utils import is_functional_test
from cloudkitty import utils as ck_utils

View File

@@ -22,7 +22,10 @@ from oslo_utils import uuidutils
from cloudkitty import utils as ck_utils
# These have a different format in order to check that both forms are supported
TENANT = 'f266f30b11f246b589fd266f85eeec39'
OTHER_TENANT = '8d3ae500-89ea-4142-9c6e-1269db6a0b64'
INITIAL_TIMESTAMP = 1420070400
FIRST_PERIOD_BEGIN = INITIAL_TIMESTAMP
FIRST_PERIOD_BEGIN_ISO = ck_utils.ts2iso(FIRST_PERIOD_BEGIN)

View File

@@ -22,8 +22,7 @@ from gnocchiclient import exceptions as gexc
from cloudkitty import storage
from cloudkitty import tests
from cloudkitty.tests import test_utils
from cloudkitty.tests.utils import is_functional_test
from cloudkitty.tests import utils as test_utils
class BaseHybridStorageTest(tests.TestCase):
@@ -56,7 +55,7 @@ class PermissiveDict(object):
return self.value == other.get(self.key)
@testtools.skipIf(is_functional_test(), 'Not a functional test')
@testtools.skipIf(test_utils.is_functional_test(), 'Not a functional test')
class HybridStorageTestGnocchi(BaseHybridStorageTest):
def setUp(self):

View File

@@ -24,8 +24,7 @@ import testscenarios
from cloudkitty import storage
from cloudkitty import tests
from cloudkitty.tests import samples
from cloudkitty.tests import test_utils
from cloudkitty.tests.utils import is_functional_test
from cloudkitty.tests import utils as test_utils
from cloudkitty import utils as ck_utils
@@ -66,7 +65,7 @@ class StorageTest(tests.TestCase):
self.storage.push(working_data, self._other_tenant_id)
@testtools.skipIf(is_functional_test(), 'Not a functional test')
@testtools.skipIf(test_utils.is_functional_test(), 'Not a functional test')
class StorageDataframeTest(StorageTest):
storage_scenarios = [
@@ -130,7 +129,7 @@ class StorageDataframeTest(StorageTest):
self.assertEqual(3, len(data))
@testtools.skipIf(is_functional_test(), 'Not a functional test')
@testtools.skipIf(test_utils.is_functional_test(), 'Not a functional test')
class StorageTotalTest(StorageTest):
storage_scenarios = [
@@ -270,7 +269,7 @@ class StorageTotalTest(StorageTest):
self.assertEqual(end, total[3]["end"])
if not is_functional_test():
if not test_utils.is_functional_test():
StorageTest.generate_scenarios()
StorageTotalTest.generate_scenarios()
StorageDataframeTest.generate_scenarios()

View File

@@ -26,7 +26,7 @@ from oslo_config import fixture as config_fixture
from oslo_utils import uuidutils
from cloudkitty import storage
from cloudkitty.tests import samples
from cloudkitty.tests import utils as test_utils
from cloudkitty import utils as ck_utils
@@ -42,43 +42,6 @@ def _init_conf():
default_config_files=['/etc/cloudkitty/cloudkitty.conf'])
def get_storage_data(min_length=10,
nb_projects=2,
project_ids=None,
start=datetime(2018, 1, 1),
end=datetime(2018, 1, 1, 1)):
if isinstance(start, datetime):
start = ck_utils.dt2ts(start)
if isinstance(end, datetime):
end = ck_utils.dt2ts(end)
if not project_ids:
project_ids = [uuidutils.generate_uuid() for i in range(nb_projects)]
elif not isinstance(project_ids, list):
project_ids = [project_ids]
usage = {}
for metric_name, sample in samples.V2_STORAGE_SAMPLE.items():
dataframes = []
for project_id in project_ids:
data = [copy.deepcopy(sample)
# for i in range(min_length + random.randint(1, 10))]
for i in range(1)]
for elem in data:
elem['groupby']['id'] = uuidutils.generate_uuid()
elem['groupby']['project_id'] = project_id
dataframes += data
usage[metric_name] = dataframes
return {
'usage': usage,
'period': {
'begin': start,
'end': end
}
}
class BaseFunctionalStorageTest(testtools.TestCase):
# Name of the storage backend to test
@@ -138,8 +101,9 @@ class BaseFunctionalStorageTest(testtools.TestCase):
@staticmethod
def gen_data_separate_projects(nb_projects):
project_ids = [uuidutils.generate_uuid() for i in range(nb_projects)]
data = [get_storage_data(
project_ids=project_ids[i], nb_projects=1)
data = [
test_utils.generate_v2_storage_data(
project_ids=project_ids[i], nb_projects=1)
for i in range(nb_projects)]
return project_ids, data

View File

@@ -0,0 +1,141 @@
# Copyright 2018 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Luka Peschke
#
import copy
import functools
from influxdb import resultset
from cloudkitty.storage.v2.influx import InfluxClient
from cloudkitty import utils
class FakeInfluxClient(InfluxClient):
total_sample = {
"statement_id": 0,
"series": []
}
total_series_sample = {
"name": "dataframes",
"tags": {},
"columns": ["time", "qty", "price"],
"values": [],
}
def __init__(self, **kwargs):
super(FakeInfluxClient, self).__init__(autocommit=False)
def commit(self):
pass
@staticmethod
def __filter_func(types, filters, begin, end, elem):
if elem['time'] < begin or elem['time'] >= end:
return False
if types and elem['tags']['type'] not in types:
return False
if filters is None:
return True
for key in filters.keys():
if key not in elem['tags'].keys():
return False
if elem['tags'][key] != filters[key]:
return False
return True
def __get_target_serie(self, point, series, groupby):
target_serie = None
for serie in series:
if not groupby:
target_serie = serie
break
valid = True
for tag in serie['tags'].keys():
if tag not in point['tags'].keys() or \
point['tags'][tag] != serie['tags'][tag]:
valid = False
break
if valid:
target_serie = serie
break
if target_serie is None:
target_serie = copy.deepcopy(self.total_series_sample)
if groupby:
target_serie['tags'] = {k: point['tags'][k] for k in groupby}
else:
target_serie['tags'] = {}
target_serie['values'] = [['1970-01-01T00:00:00Z', 0, 0]]
series.append(target_serie)
return target_serie
def get_total(self, types, begin, end, groupby=None, filters=None):
total = copy.deepcopy(self.total_sample)
series = []
filter_func = functools.partial(
self.__filter_func, types, filters, begin, end)
points = filter(filter_func, self._points)
for point in points:
target_serie = self.__get_target_serie(point, series, groupby)
target_serie['values'][0][1] += point['fields']['qty']
target_serie['values'][0][2] += point['fields']['price']
total['series'] = series
return resultset.ResultSet(total)
def retrieve(self,
types,
filters,
begin, end,
offset=0, limit=1000, paginate=True):
output = copy.deepcopy(self.total_sample)
filter_func = functools.partial(
self.__filter_func, types, filters, begin, end)
points = list(filter(filter_func, self._points))
columns = set()
for point in list(points):
columns.update(point['tags'].keys())
columns.update(point['fields'].keys())
columns.add('time')
series = {
'name': 'dataframes',
'columns': list(columns),
}
values = []
def __get_tag_or_field(point, key):
if key == 'time':
return utils.isotime(point['time'])
return point['tags'].get(key) or point['fields'].get(key)
for point in points:
values.append([__get_tag_or_field(point, key)
for key in series['columns']])
series['values'] = values
output['series'] = [series]
return len(points), resultset.ResultSet(output)
def retention_policy_exists(self, database, policy):
return True
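
The fake reproduces the raw series structure that the influxdb library wraps in a ResultSet. A short sketch of that shape and of how it is consumed, matching the way InfluxStorage.total() iterates over get_total() results (assuming influxdb>=5.1.0; values are illustrative):

from influxdb import resultset

raw = {
    'statement_id': 0,
    'series': [{
        'name': 'dataframes',
        'tags': {'project_id': 'abc'},
        'columns': ['time', 'qty', 'price'],
        'values': [['1970-01-01T00:00:00Z', 2.0, 1.0]],
    }],
}

rs = resultset.ResultSet(raw)
# items() yields ((measurement, tags), points); each point is a dict
# mapping column names to values.
for (name, tags), points in rs.items():
    for p in points:
        print(name, tags, p['qty'], p['price'])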

View File

@@ -0,0 +1,327 @@
# Copyright 2018 Objectif Libre
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# @author: Luka Peschke
#
import datetime
import mock
import testscenarios
from cloudkitty import storage
from cloudkitty.tests import samples
from cloudkitty.tests.storage.v2 import influx_utils
from cloudkitty.tests import TestCase
from cloudkitty.tests import utils as test_utils
class StorageUnitTest(TestCase):
storage_scenarios = [
('influx', dict(storage_backend='influxdb'))]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(
cls.scenarios,
cls.storage_scenarios)
@mock.patch('cloudkitty.storage.v2.influx.InfluxClient',
new=influx_utils.FakeInfluxClient)
@mock.patch('cloudkitty.utils.load_conf', new=test_utils.load_conf)
def setUp(self):
super(StorageUnitTest, self).setUp()
self._project_id = samples.TENANT
self._other_project_id = samples.OTHER_TENANT
self.conf.set_override('backend', self.storage_backend, 'storage')
self.conf.set_override('version', '2', 'storage')
self.storage = storage.get_storage(conf=test_utils.load_conf())
self.storage.init()
self.data = []
self.init_data()
def init_data(self):
project_ids = [self._project_id, self._other_project_id]
for i in range(3):
start_delta = 3600 * i
end_delta = start_delta + 3600
start = datetime.datetime(2018, 1, 1) \
+ datetime.timedelta(seconds=start_delta)
end = datetime.datetime(2018, 1, 1) \
+ datetime.timedelta(seconds=end_delta)
data = test_utils.generate_v2_storage_data(
project_ids=project_ids,
start=start,
end=end)
self.data.append(data)
self.storage.push([data])
@staticmethod
def _expected_total_qty_len(data, project_id=None, types=None):
total = 0
qty = 0
length = 0
for data_part in data:
for mtype, usage_part in data_part['usage'].items():
if types is not None and mtype not in types:
continue
for item in usage_part:
if project_id is None or \
project_id == item['groupby']['project_id']:
total += item['rating']['price']
qty += item['vol']['qty']
length += 1
return round(float(total), 5), round(float(qty), 5), length
def _compare_get_total_result_with_expected(self,
expected_qty,
expected_total,
expected_total_len,
total):
self.assertEqual(len(total['results']), expected_total_len)
self.assertEqual(total['total'], expected_total_len)
returned_total = round(sum(r['rate'] for r in total['results']), 5)
self.assertLessEqual(abs(expected_total - returned_total), 0.00001)
returned_qty = round(sum(r['qty'] for r in total['results']), 5)
self.assertLessEqual(abs(expected_qty - returned_qty), 0.00001)
def test_get_total_all_scopes_all_periods(self):
expected_total, expected_qty, _ = self._expected_total_qty_len(
self.data)
begin = datetime.datetime(2018, 1, 1)
end = datetime.datetime(2018, 1, 1, 4)
self._compare_get_total_result_with_expected(
expected_qty,
expected_total,
1,
self.storage.total(begin=begin, end=end))
def test_get_total_one_scope_all_periods(self):
expected_total, expected_qty, _ = self._expected_total_qty_len(
self.data, self._project_id)
begin = datetime.datetime(2018, 1, 1)
end = datetime.datetime(2018, 1, 1, 4)
group_filters = {'project_id': self._project_id}
self._compare_get_total_result_with_expected(
expected_qty,
expected_total,
1,
self.storage.total(begin=begin,
end=end,
group_filters=group_filters),
)
def test_get_total_all_scopes_one_period(self):
expected_total, expected_qty, _ = self._expected_total_qty_len(
[self.data[0]])
begin = datetime.datetime(2018, 1, 1)
end = datetime.datetime(2018, 1, 1, 1)
self._compare_get_total_result_with_expected(
expected_qty,
expected_total,
1,
self.storage.total(begin=begin, end=end))
def test_get_total_one_scope_one_period(self):
expected_total, expected_qty, _ = self._expected_total_qty_len(
[self.data[0]], self._project_id)
begin = datetime.datetime(2018, 1, 1)
end = datetime.datetime(2018, 1, 1, 1)
group_filters = {'project_id': self._project_id}
self._compare_get_total_result_with_expected(
expected_qty,
expected_total,
1,
self.storage.total(begin=begin,
end=end,
group_filters=group_filters),
)
def test_get_total_all_scopes_all_periods_groupby_project_id(self):
expected_total_first, expected_qty_first, _ = \
self._expected_total_qty_len(self.data, self._project_id)
expected_total_second, expected_qty_second, _ = \
self._expected_total_qty_len(self.data, self._other_project_id)
begin = datetime.datetime(2018, 1, 1)
end = datetime.datetime(2018, 1, 1, 4)
total = self.storage.total(begin=begin, end=end,
groupby=['project_id'])
self.assertEqual(len(total['results']), 2)
self.assertEqual(total['total'], 2)
for t in total['results']:
self.assertIn('project_id', t.keys())
total['results'].sort(key=lambda x: x['project_id'], reverse=True)
self.assertLessEqual(
abs(round(total['results'][0]['rate'], 5) - expected_total_first),
0.00001,
)
self.assertLessEqual(
abs(round(total['results'][1]['rate'], 5) - expected_total_second),
0.00001,
)
self.assertLessEqual(
abs(round(total['results'][0]['qty'], 5) - expected_qty_first),
0.00001,
)
self.assertLessEqual(
abs(round(total['results'][1]['qty'], 5) - expected_qty_second),
0.00001,
)
def test_get_total_all_scopes_one_period_groupby_project_id(self):
expected_total_first, expected_qty_first, _ = \
self._expected_total_qty_len([self.data[0]], self._project_id)
expected_total_second, expected_qty_second, _ = \
self._expected_total_qty_len([self.data[0]],
self._other_project_id)
begin = datetime.datetime(2018, 1, 1)
end = datetime.datetime(2018, 1, 1, 1)
total = self.storage.total(begin=begin, end=end,
groupby=['project_id'])
self.assertEqual(len(total['results']), 2)
for t in total['results']:
self.assertIn('project_id', t.keys())
total['results'].sort(key=lambda x: x['project_id'], reverse=True)
self.assertLessEqual(
abs(round(total['results'][0]['rate'], 5) - expected_total_first),
0.00001,
)
self.assertLessEqual(
abs(round(total['results'][1]['rate'], 5) - expected_total_second),
0.00001,
)
self.assertLessEqual(
abs(round(total['results'][0]['qty'], 5) - expected_qty_first),
0.00001,
)
self.assertLessEqual(
abs(round(total['results'][1]['qty'], 5) - expected_qty_second),
0.00001,
)
def test_get_total_all_scopes_all_periods_groupby_type_paginate(self):
expected_total, expected_qty, _ = \
self._expected_total_qty_len(self.data)
begin = datetime.datetime(2018, 1, 1)
end = datetime.datetime(2018, 1, 1, 4)
total = {'total': 0, 'results': []}
for offset in range(0, 7, 2):
chunk = self.storage.total(
begin=begin,
end=end,
offset=offset,
limit=offset + 2,
groupby=['type'])
# there are seven metric types
self.assertEqual(chunk['total'], 7)
# last chunk, shorter
if offset == 6:
self.assertEqual(len(chunk['results']), 1)
else:
self.assertEqual(len(chunk['results']), 2)
total['results'] += chunk['results']
total['total'] += len(chunk['results'])
unpaginated_total = self.storage.total(
begin=begin, end=end, groupby=['type'])
self.assertEqual(total, unpaginated_total)
self._compare_get_total_result_with_expected(
expected_qty,
expected_total,
7,
total)
def test_retrieve_all_scopes_all_types(self):
expected_total, expected_qty, expected_length = \
self._expected_total_qty_len(self.data)
begin = datetime.datetime(2018, 1, 1)
end = datetime.datetime(2018, 1, 1, 4)
frames = self.storage.retrieve(begin=begin, end=end)
self.assertEqual(frames['total'], expected_length)
retrieved_length = 0
for data_part in frames['dataframes']:
for usage_part in data_part['usage'].values():
retrieved_length += len(usage_part)
self.assertEqual(expected_length, retrieved_length)
def test_retrieve_all_scopes_one_type(self):
expected_total, expected_qty, expected_length = \
self._expected_total_qty_len(self.data, types=['image.size'])
begin = datetime.datetime(2018, 1, 1)
end = datetime.datetime(2018, 1, 1, 4)
frames = self.storage.retrieve(begin=begin, end=end,
metric_types=['image.size'])
self.assertEqual(frames['total'], expected_length)
retrieved_length = 0
for data_part in frames['dataframes']:
for usage_part in data_part['usage'].values():
retrieved_length += len(usage_part)
self.assertEqual(expected_length, retrieved_length)
def test_retrieve_one_scope_two_types_one_period(self):
expected_total, expected_qty, expected_length = \
self._expected_total_qty_len([self.data[0]], self._project_id,
types=['image.size', 'instance'])
begin = datetime.datetime(2018, 1, 1)
end = datetime.datetime(2018, 1, 1, 1)
group_filters = {'project_id': self._project_id}
frames = self.storage.retrieve(begin=begin, end=end,
group_filters=group_filters,
metric_types=['image.size', 'instance'])
self.assertEqual(frames['total'], expected_length)
retrieved_length = 0
for data_part in frames['dataframes']:
for usage_part in data_part['usage'].values():
retrieved_length += len(usage_part)
self.assertEqual(expected_length, retrieved_length)
if not test_utils.is_functional_test():
StorageUnitTest.generate_scenarios()

View File

@@ -25,7 +25,6 @@ import unittest
import mock
from oslo_utils import timeutils
from cloudkitty.tests.samples import DEFAULT_METRICS_CONF
from cloudkitty.tests.utils import is_functional_test
from cloudkitty import utils as ck_utils
@@ -200,7 +199,3 @@ class ConvertUnitTest(unittest.TestCase):
def test_convert_decimal(self):
result = ck_utils.num2decimal(decimal.Decimal(2))
self.assertEqual(result, decimal.Decimal(2))
def load_conf(*args):
return DEFAULT_METRICS_CONF

View File

@@ -15,8 +15,56 @@
#
# @author: Luka Peschke
#
import copy
from datetime import datetime
from os import getenv
import random
from oslo_utils import uuidutils
from cloudkitty.tests import samples
from cloudkitty import utils as ck_utils
def is_functional_test():
return getenv('TEST_FUNCTIONAL', False)
def generate_v2_storage_data(min_length=10,
nb_projects=2,
project_ids=None,
start=datetime(2018, 1, 1),
end=datetime(2018, 1, 1, 1)):
if isinstance(start, datetime):
start = ck_utils.dt2ts(start)
if isinstance(end, datetime):
end = ck_utils.dt2ts(end)
if not project_ids:
project_ids = [uuidutils.generate_uuid() for i in range(nb_projects)]
elif not isinstance(project_ids, list):
project_ids = [project_ids]
usage = {}
for metric_name, sample in samples.V2_STORAGE_SAMPLE.items():
dataframes = []
for project_id in project_ids:
data = [copy.deepcopy(sample)
for i in range(min_length + random.randint(1, 10))]
for elem in data:
elem['groupby']['id'] = uuidutils.generate_uuid()
elem['groupby']['project_id'] = project_id
dataframes += data
usage[metric_name] = dataframes
return {
'usage': usage,
'period': {
'begin': start,
'end': end
}
}
def load_conf(*args):
return samples.DEFAULT_METRICS_CONF
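
A quick sketch of the payload this helper returns, which is also the dataframe format that InfluxStorage.push() consumes (shape only; the concrete metric names and values come from samples.V2_STORAGE_SAMPLE):

from cloudkitty.tests import utils as test_utils

data = test_utils.generate_v2_storage_data(nb_projects=1)
assert set(data) == {'usage', 'period'}
assert {'begin', 'end'} <= set(data['period'])
for metric_name, entries in data['usage'].items():
    for entry in entries:
        # each entry carries what push() reads: vol.qty, vol.unit,
        # rating.price, plus the groupby and metadata dicts
        assert {'vol', 'rating', 'groupby', 'metadata'} <= set(entry)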

View File

@@ -26,6 +26,7 @@ six==1.9.0 # MIT
stevedore==1.5.0 # Apache-2.0
tooz==1.28.0 # Apache-2.0
voluptuous==0.11.1 # BSD-3
influxdb==5.1.0 # MIT
# test-requirements
coverage==3.6 # Apache-2.0

View File

@@ -0,0 +1,9 @@
---
features:
- |
An InfluxDB v2 storage backend has been added. It will become the default
backend of the v2 storage interface.
The v1 storage interface will be deprecated in a future release. At that
point, documentation about how to upgrade the storage backend will be made
available, along with some helpers.

View File

@@ -28,3 +28,4 @@ six>=1.9.0 # MIT
stevedore>=1.5.0 # Apache-2.0
tooz>=1.28.0 # Apache-2.0
voluptuous>=0.11.1 # BSD License
influxdb>=5.1.0,!=5.2.0 # MIT

View File

@@ -72,6 +72,7 @@ cloudkitty.storage.v1.backends =
cloudkitty.storage.v2.backends =
gnocchi = cloudkitty.storage.v2.gnocchi:GnocchiStorage
influxdb = cloudkitty.storage.v2.influx:InfluxStorage
cloudkitty.storage.hybrid.backends =
gnocchi = cloudkitty.storage.v1.hybrid.backends.gnocchi:GnocchiStorage