From 101a410739d07fdaf82cf4f059495a427a319e61 Mon Sep 17 00:00:00 2001 From: Pedro Henrique Date: Thu, 11 May 2023 12:39:39 +0000 Subject: [PATCH] Add support to InfluxDB v2 as storage backend This patch allows CloudKitty to use InfluxDB v2 with Flux queries. This type of query uses less CPU and RAM to be processed in the InfluxDB backend. Change-Id: I8ee3c92776aa69afbede353981a5fcd65dd7d099 Depends-On: https://review.opendev.org/c/openstack/requirements/+/895629 Story: 2010863 Task: 48539 --- .zuul.yaml | 19 +- cloudkitty/storage/v2/influx.py | 517 ++++++++++++++++-- cloudkitty/tests/storage/v2/influx_utils.py | 2 +- cloudkitty/tests/storage/v2/test_influxdb.py | 230 ++++++++ devstack/plugin.sh | 50 +- devstack/settings | 1 + ...b-v2-storage-backend-f94df79f9e5276a8.yaml | 4 + requirements.txt | 1 + 8 files changed, 771 insertions(+), 53 deletions(-) create mode 100644 releasenotes/notes/add-support-to-influxdb-v2-storage-backend-f94df79f9e5276a8.yaml diff --git a/.zuul.yaml b/.zuul.yaml index b1ab83c2..af04f354 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -76,12 +76,25 @@ name: cloudkitty-tempest-full-v2-storage-influxdb parent: base-cloudkitty-v2-api-tempest-job description: | - Job testing cloudkitty installation on devstack with python 3 and the - InfluxDB v2 storage driver and running tempest tests + Job testing cloudkitty installation on devstack with python 3, InfluxDB + v1 and the InfluxDB v2 storage driver and running tempest tests vars: devstack_localrc: CLOUDKITTY_STORAGE_BACKEND: influxdb CLOUDKITTY_STORAGE_VERSION: 2 + CLOUDKITTY_INFLUX_VERSION: 1 + +- job: + name: cloudkitty-tempest-full-v2-storage-influxdb-v2 + parent: base-cloudkitty-v2-api-tempest-job + description: | + Job testing cloudkitty installation on devstack with python 3, InfluxDB + v2 and the InfluxDB v2 storage driver and running tempest tests + vars: + devstack_localrc: + CLOUDKITTY_STORAGE_BACKEND: influxdb + CLOUDKITTY_STORAGE_VERSION: 2 + CLOUDKITTY_INFLUX_VERSION: 2 - job: name: cloudkitty-tempest-full-v2-storage-elasticsearch @@ -139,6 +152,7 @@ check: jobs: - cloudkitty-tempest-full-v2-storage-influxdb + - cloudkitty-tempest-full-v2-storage-influxdb-v2 - cloudkitty-tempest-full-v2-storage-elasticsearch: voting: false - cloudkitty-tempest-full-v2-storage-opensearch: @@ -150,5 +164,6 @@ gate: jobs: - cloudkitty-tempest-full-v2-storage-influxdb + - cloudkitty-tempest-full-v2-storage-influxdb-v2 - cloudkitty-tempest-full-v1-storage-sqlalchemy - cloudkitty-tempest-full-ipv6-only diff --git a/cloudkitty/storage/v2/influx.py b/cloudkitty/storage/v2/influx.py index 2591bfc8..243daa94 100644 --- a/cloudkitty/storage/v2/influx.py +++ b/cloudkitty/storage/v2/influx.py @@ -12,11 +12,18 @@ # License for the specific language governing permissions and limitations # under the License. 
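+# A minimal [storage_influxdb] configuration sketch for the InfluxDB v2
+# backend; the option names match the opts registered below, the values
+# are only illustrative:
+#
+#   [storage_influxdb]
+#   version = 2
+#   url = http://<influxdb-host>:<port>
+#   token = <API token>
+#   org = openstack
+#   bucket = cloudkitty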
# +import csv import datetime import influxdb +import io +import json +import re +from influxdb_client.client.write_api import SYNCHRONOUS +from influxdb_client import InfluxDBClient from oslo_config import cfg from oslo_log import log +import requests from cloudkitty import dataframe from cloudkitty.storage import v2 as v2_storage @@ -56,6 +63,29 @@ influx_storage_opts = [ help='Path of the CA certificate to trust for HTTPS connections', default=None ), + cfg.IntOpt('version', help='InfluxDB version', default=1), + cfg.IntOpt('query_timeout', help='Flux query timeout in milliseconds', + default=3600000), + cfg.StrOpt( + 'token', + help='InfluxDB API token for version 2 authentication', + default=None + ), + cfg.StrOpt( + 'org', + help='InfluxDB 2 org', + default="openstack" + ), + cfg.StrOpt( + 'bucket', + help='InfluxDB 2 bucket', + default="cloudkitty" + ), + cfg.StrOpt( + 'url', + help='InfluxDB 2 URL', + default=None + ) ] CONF.register_opts(influx_storage_opts, INFLUX_STORAGE_GROUP) @@ -192,7 +222,7 @@ class InfluxClient(object): return " AND " + InfluxClient._get_filter("type", types) def get_total(self, types, begin, end, custom_fields, - groupby=None, filters=None): + groupby=None, filters=None, limit=None): self.validate_custom_fields(custom_fields) @@ -232,11 +262,8 @@ class InfluxClient(object): " clauses are not allowed [%s].", field, forbidden_clauses) - def retrieve(self, - types, - filters, - begin, end, - offset=0, limit=1000, paginate=True): + def retrieve(self, types, filters, begin, end, offset=0, limit=1000, + paginate=True): query = 'SELECT * FROM "dataframes"' query += self._get_time_query(begin, end) query += self._get_filter_query(filters) @@ -282,15 +309,448 @@ class InfluxClient(object): self._conn.query(query) + def _get_total_elem(self, begin, end, groupby, series_groupby, point): + if groupby and 'time' in groupby: + begin = tzutils.dt_from_iso(point['time']) + period = point.get(PERIOD_FIELD_NAME) or self._default_period + end = tzutils.add_delta(begin, datetime.timedelta(seconds=period)) + output = { + 'begin': begin, + 'end': end, + } + + for key in point.keys(): + if "time" != key: + output[key] = point[key] + + if groupby: + for group in _sanitized_groupby(groupby): + output[group] = series_groupby.get(group, '') + return output + + def process_total(self, total, begin, end, groupby, *args): + output = [] + for (series_name, series_groupby), points in total.items(): + for point in points: + # NOTE(peschk_l): InfluxDB returns all timestamps for a given + # period and interval, even those with no data. This filters + # out periods with no data + + # NOTE (rafaelweingartner): the summary get API is allowing + # users to customize the report. Therefore, we only ignore + # data points, if all of the entries have None values. + # Otherwise, they are presented to the user. + if [k for k in point.keys() if point[k]]: + output.append(self._get_total_elem( + tzutils.utc_to_local(begin), + tzutils.utc_to_local(end), + groupby, + series_groupby, + point)) + return output + + +class InfluxClientV2(InfluxClient): + """Class used to facilitate interaction with InfluxDB v2 + + custom_fields_rgx: Regex to parse the input custom_fields and + retrieve the field name, the desired alias + and the aggregation function to use. + It allows us to keep the same custom_fields + representation for both InfluxQL and Flux + queries. + + """ + + custom_fields_rgx = r'([\w_\\"]+)\(([\w_\\"]+)\) (AS|as) ' \ + r'\\?"?([\w_ \\]+)"?,? ?' 
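+    # For example, the default summary fields string
+    # 'SUM(qty) AS qty, SUM(price) AS rate' is matched as
+    # ('SUM', 'qty', 'AS', 'qty') and ('SUM', 'price', 'AS', 'rate');
+    # get_custom_fields() then drops the 'AS' group to build
+    # (operation, field, alias) triplets.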
+ + class FluxResponseHandler(object): + """Class used to process the response of Flux queries + + As the Flux response splits its result set by the + requested fields, we need to merge them into a single + one based on their groups (tags). + + Using this approach we keep the response data + compatible with the InfluxQL result set, where we + already have the multiple result set for each field + merged into a single one. + """ + + def __init__(self, response, groupby, fields, begin, end, + field_filters): + self.data = response + self.field_filters = field_filters + self.response = {} + self.begin = begin + self.end = end + self.groupby = groupby + self.fields = fields + self.process() + + def process(self): + """This method merges all the Flux result sets into a single one. + + To make sure the fields filtering comply with the user's + request, we need to remove the merged entries that have None + value for filtered fields, we need to do that because working + with fields one by one in Flux queries is more performant + than working with all the fields together, but it brings some + problems when we want to filter some data. E.g: + + We want the fields A and B, grouped by C and D, and the field + A must be 2. Imagine this query for the following + dataset: + + A : C : D B : C : D + 1 : 1 : 1 5 : 1 : 1 + 2 : 2 : 2 6 : 2 : 2 + 2 : 3 : 3 7 : 3 : 3 + 2 : 4 : 4 + + The result set is going to be like: + + A : C : D B : C : D + 2 : 2 : 2 5 : 1 : 1 + 2 : 3 : 3 6 : 2 : 2 + 2 : 4 : 4 7 : 3 : 3 + + And the merged value is going to be like: + + A : B : C : D + None : 5 : 1 : 1 + 2 : 6 : 2 : 2 + 2 : 7 : 3 : 3 + 2 : None : 4 : 4 + + So, we need to remove the first undesired entry to get the + correct result: + + A : B : C : D + 2 : 6 : 2 : 2 + 2 : 7 : 3 : 3 + 2 : None : 4 : 4 + """ + + LOG.debug("Using fields %s to process InfluxDB V2 response.", + self.fields) + LOG.debug("Start processing data [%s] of InfluxDB V2 API.", + self.data) + if self.fields == ["*"] and not self.groupby: + self.process_data_wildcard() + else: + self.process_data_with_fields() + + LOG.debug("Data processed by the InfluxDB V2 backend with " + "result [%s].", self.response) + LOG.debug("Start sanitizing the response of Influx V2 API.") + self.sanitize_filtered_entries() + LOG.debug("Response sanitized [%s] for InfluxDB V2 API.", + self.response) + + def process_data_wildcard(self): + LOG.debug("Processing wildcard response for InfluxDB V2 API.") + found_fields = set() + for r in self.data: + if self.is_header_entry(r): + LOG.debug("Skipping header entry: [%s].", r) + continue + r_key = ''.join(sorted(r.values())) + found_fields.add(r['_field']) + r_value = r + r_value['begin'] = self.begin + r_value['end'] = self.end + self.response.setdefault( + r_key, r_value)[r['result']] = float(r['_value']) + + def process_data_with_fields(self): + for r in self.data: + if self.is_header_entry(r): + LOG.debug("Skipping header entry: [%s].", r) + continue + r_key = '' + r_value = {f: None for f in self.fields} + r_value['begin'] = self.begin + r_value['end'] = self.end + for g in (self.groupby or []): + val = r.get(g) + r_key += val or '' + r_value[g] = val + + self.response.setdefault( + r_key, r_value)[r['result']] = float(r['_value']) + + @staticmethod + def is_header_entry(entry): + """Check header entries. 
+ + As the response contains multiple resultsets, + each entry in the response CSV has its own + header, which is the same for all the result sets, + but the CSV parser does not ignore it + and processes all headers except the first as a + dict entry, so for these cases, each dict's value + is going to be the same as the dict's key, so we + are picking one and if it is this case, we skip it. + + """ + + return entry.get('_start') == '_start' + + def sanitize_filtered_entries(self): + """Removes entries where filtered fields have None as value.""" + + for d in self.field_filters or []: + for k in list(self.response.keys()): + if self.response[k][d] is None: + self.response.pop(k, None) + + def __init__(self, default_period=None): + super().__init__(default_period=default_period) + self.client = InfluxDBClient( + url=CONF.storage_influxdb.url, + timeout=CONF.storage_influxdb.query_timeout, + token=CONF.storage_influxdb.token, + org=CONF.storage_influxdb.org) + self._conn = self.client + + def retrieve(self, types, filters, begin, end, offset=0, limit=1000, + paginate=True): + + query = self.get_query(begin, end, '*', filters=filters) + response = self.query(query) + output = self.process_total( + response, begin, end, None, '*', filters) + LOG.debug("Retrieved output %s", output) + results = {'results': output[ + offset:offset + limit] if paginate else output} + return len(output), results + + def delete(self, begin, end, filters): + predicate = '_measurement="dataframes"' + f = self.get_group_filters_query( + filters, fmt=lambda x: '"' + str(x) + '"') + if f: + f = f.replace('==', '=').replace('and', 'AND') + predicate += f'{f}' + + LOG.debug("InfluxDB v2 deleting elements filtering by [%s] and " + "with [begin=%s, end=%s].", predicate, begin, end) + delete_api = self.client.delete_api() + delete_api.delete(begin, end, bucket=CONF.storage_influxdb.bucket, + predicate=predicate) + + def process_total(self, total, begin, end, groupby, custom_fields, + filters): + cf = self.get_custom_fields(custom_fields) + fields = list(map(lambda f: f[2], cf)) + c_fields = {f[1]: f[2] for f in cf} + field_filters = [c_fields[f] for f in filters if f in c_fields] + handler = self.FluxResponseHandler(total, groupby, fields, begin, end, + field_filters) + return list(handler.response.values()) + + def commit(self): + total_points = len(self._points) + if len(self._points) < 1: + return + LOG.debug('Pushing {} points to InfluxDB'.format(total_points)) + self.write_points(self._points, + retention_policy=self._retention_policy) + self._points = [] + + def write_points(self, points, retention_policy=None): + write_api = self.client.write_api(write_options=SYNCHRONOUS) + [write_api.write( + bucket=CONF.storage_influxdb.bucket, record=p) + for p in points] + + def _get_filter_query(self, filters): + if not filters: + return '' + return ' and ' + ' and '.join( + self._get_filter(k, v) for k, v in filters.items()) + + def get_custom_fields(self, custom_fields): + + if not custom_fields: + return [] + + if custom_fields.strip() == '*': + return [('*', '*', '*')] + + groups = [list(i.groups()) for i in re.finditer( + self.custom_fields_rgx, custom_fields)] + + # Remove the "As|as" group that is useless. 
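+        # e.g. ('SUM', 'qty', 'AS', 'qty') becomes ['SUM', 'qty', 'qty'],
+        # i.e. the (operation, field, alias) triplet consumed by the Flux
+        # query builder.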
+ if groups: + for g in groups: + del g[2] + + return groups + + def get_group_filters_query(self, group_filters, fmt=lambda x: f'r.{x}'): + if not group_filters: + return '' + + get_val = (lambda x: x if isinstance(v, (int, float)) or + x.isnumeric() else f'"{x}"') + + f = '' + for k, v in group_filters.items(): + if isinstance(v, (list, tuple)): + if len(v) == 1: + f += f' and {fmt(k)}=={get_val(v[0])}' + continue + + f += ' and (%s)' % ' or '.join([f'{fmt(k)}=={get_val(val)}' + for val in v]) + continue + + f += f' and {fmt(k)}=={get_val(v)}' + + return f + + def get_field_filters_query(self, field_filters, + fmt=lambda x: 'r["_value"]'): + return self.get_group_filters_query(field_filters, fmt) + + def get_custom_fields_query(self, custom_fields, query, field_filters, + group_filters, limit=None, groupby=None): + if not groupby: + groupby = [] + if not custom_fields: + custom_fields = 'sum(price) AS price,sum(qty) AS qty' + columns_to_keep = ', '.join(map(lambda g: f'"{g}"', groupby)) + columns_to_keep += ', "_field", "_value", "_start", "_stop"' + new_query = '' + LOG.debug("Custom fields: %s", custom_fields) + LOG.debug("Custom fields processed: %s", + self.get_custom_fields(custom_fields)) + for operation, field, alias in self.get_custom_fields(custom_fields): + LOG.debug("Generating query with operation [%s]," + " field [%s] and alias [%s]", operation, field, alias) + field_filter = {} + if field_filters and field in field_filters: + field_filter = {field: field_filters[field]} + + if field == '*': + group_filter = self.get_group_filters_query( + group_filters).replace(" and ", "", 1) + filter_to_replace = f'|> filter(fn: (r) => {group_filter})' + new_query += query.replace( + '', + filter_to_replace).replace( + '', + f'''|> drop(columns: ["_time"]) + {'|> limit(n: ' + str(limit) + ')' if limit else ''} + |> yield(name: "result")''') + continue + + new_query += query.replace( + '', + f'|> filter(fn: (r) => r["_field"] == ' + f'"{field}" {self.get_group_filters_query(group_filters)} ' + f'{self.get_field_filters_query(field_filter)})' + ).replace( + '', + f'''|> {operation.lower()}() + |> keep(columns: [{columns_to_keep}]) + |> set(key: "_field", value: "{alias}") + |> yield(name: "{alias}")''') + return new_query + + def get_groupby(self, groupby): + if not groupby: + return "|> group()" + return f'''|> group(columns: [{','.join([f'"{g}"' + for g in groupby])}])''' + + def get_time_range(self, begin, end): + if not begin or not end: + return '' + return f'|> range(start: {begin.isoformat()}, stop: {end.isoformat()})' + + def get_query(self, begin, end, custom_fields, groupby=None, filters=None, + limit=None): + + custom_fields_processed = list( + map(lambda x: x[1], self.get_custom_fields(custom_fields))) + field_filters = dict(filter( + lambda f: f[0] in custom_fields_processed, filters.items())) + group_filters = dict(filter( + lambda f: f[0] not in field_filters, filters.items())) + + query = f''' + from(bucket:"{CONF.storage_influxdb.bucket}") + {self.get_time_range(begin, end)} + |> filter(fn: (r) => r["_measurement"] == "dataframes") + + {self.get_groupby(groupby)} + + ''' + + LOG.debug("Field Filters: %s", field_filters) + LOG.debug("Group Filters: %s", group_filters) + query = self.get_custom_fields_query(custom_fields, query, + field_filters, group_filters, + limit, groupby) + return query + + def query(self, query): + url_base = CONF.storage_influxdb.url + org = CONF.storage_influxdb.org + url = f'{url_base}/api/v2/query?org={org}' + response = requests.post( + url=url, + 
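+            # The Flux script is sent as a JSON body; InfluxDB answers with
+            # CSV, which is parsed into dicts below.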
headers={ + 'Content-type': 'application/json', + 'Authorization': f'Token {CONF.storage_influxdb.token}'}, + data=json.dumps({ + 'query': query})) + response_text = response.text + LOG.debug("Raw Response: [%s].", response_text) + handled_response = [] + for csv_tables in response_text.split(',result,table,'): + csv_tables = ',result,table,' + csv_tables + LOG.debug("Processing CSV [%s].", csv_tables) + processed = list(csv.DictReader(io.StringIO(csv_tables))) + LOG.debug("Processed CSV in dict [%s]", processed) + handled_response.extend(processed) + return handled_response + + def get_total(self, types, begin, end, custom_fields, + groupby=None, filters=None, limit=None): + + LOG.debug("Query types: %s", types) + if types: + if not filters: + filters = {} + filters['type'] = types + + LOG.debug("Query filters: %s", filters) + query = self.get_query(begin, end, custom_fields, + groupby, filters, limit) + + LOG.debug("Executing the Flux query [%s].", query) + + return self.query(query) + class InfluxStorage(v2_storage.BaseStorage): def __init__(self, *args, **kwargs): super(InfluxStorage, self).__init__(*args, **kwargs) self._default_period = kwargs.get('period') or CONF.collect.period - self._conn = InfluxClient(default_period=self._default_period) + if CONF.storage_influxdb.version == 2: + self._conn = InfluxClientV2(default_period=self._default_period) + else: + self._conn = InfluxClient(default_period=self._default_period) def init(self): + if CONF.storage_influxdb.version == 2: + return policy = CONF.storage_influxdb.retention_policy database = CONF.storage_influxdb.database if not self._conn.retention_policy_exists(database, policy): @@ -371,25 +831,6 @@ class InfluxStorage(v2_storage.BaseStorage): def delete(self, begin=None, end=None, filters=None): self._conn.delete(begin, end, filters) - def _get_total_elem(self, begin, end, groupby, series_groupby, point): - if groupby and 'time' in groupby: - begin = tzutils.dt_from_iso(point['time']) - period = point.get(PERIOD_FIELD_NAME) or self._default_period - end = tzutils.add_delta(begin, datetime.timedelta(seconds=period)) - output = { - 'begin': begin, - 'end': end, - } - - for key in point.keys(): - if "time" != key: - output[key] = point[key] - - if groupby: - for group in _sanitized_groupby(groupby): - output[group] = series_groupby.get(group, '') - return output - def total(self, groupby=None, begin=None, end=None, metric_types=None, filters=None, offset=0, limit=1000, paginate=True, custom_fields="SUM(qty) AS qty, SUM(price) AS rate"): @@ -398,30 +839,14 @@ class InfluxStorage(v2_storage.BaseStorage): groupby = self.parse_groupby_syntax_to_groupby_elements(groupby) total = self._conn.get_total(metric_types, begin, end, - custom_fields, groupby, filters) + custom_fields, groupby, filters, limit) - output = [] - for (series_name, series_groupby), points in total.items(): - for point in points: - # NOTE(peschk_l): InfluxDB returns all timestamps for a given - # period and interval, even those with no data. This filters - # out periods with no data - - # NOTE (rafaelweingartner): the summary get API is allowing - # users to customize the report. Therefore, we only ignore - # data points, if all of the entries have None values. - # Otherwise, they are presented to the user. 
- if [k for k in point.keys() if point[k]]: - output.append(self._get_total_elem( - tzutils.utc_to_local(begin), - tzutils.utc_to_local(end), - groupby, - series_groupby, - point)) + output = self._conn.process_total( + total, begin, end, groupby, custom_fields, filters) groupby = _sanitized_groupby(groupby) if groupby: - output.sort(key=lambda x: [x[group] for group in groupby]) + output.sort(key=lambda x: [x[group] or "" for group in groupby]) return { 'total': len(output), diff --git a/cloudkitty/tests/storage/v2/influx_utils.py b/cloudkitty/tests/storage/v2/influx_utils.py index 76fce87a..8474a8a9 100644 --- a/cloudkitty/tests/storage/v2/influx_utils.py +++ b/cloudkitty/tests/storage/v2/influx_utils.py @@ -90,7 +90,7 @@ class FakeInfluxClient(InfluxClient): return target_serie def get_total(self, types, begin, end, custom_fields, groupby=None, - filters=None): + filters=None, limit=None): total = copy.deepcopy(self.total_sample) series = [] diff --git a/cloudkitty/tests/storage/v2/test_influxdb.py b/cloudkitty/tests/storage/v2/test_influxdb.py index 0478f592..8cd802a6 100644 --- a/cloudkitty/tests/storage/v2/test_influxdb.py +++ b/cloudkitty/tests/storage/v2/test_influxdb.py @@ -209,3 +209,233 @@ class TestInfluxClient(unittest.TestCase): self._storage.delete(end=datetime(2019, 1, 2)) m.assert_called_once_with("""DELETE FROM "dataframes" WHERE """ """time < '2019-01-02T00:00:00';""") + + def test_process_total(self): + begin = datetime(2019, 1, 2, 10) + end = datetime(2019, 1, 2, 11) + groupby = ['valA', 'time'] + points_1 = [ + { + 'qty': 42, + 'price': 1.0, + 'time': begin.isoformat() + } + ] + series_groupby_1 = { + 'valA': '1' + } + points_2 = [ + { + 'qty': 12, + 'price': 2.0, + 'time': begin.isoformat() + } + ] + series_groupby_2 = { + 'valA': '2' + } + points_3 = [ + { + 'qty': None, + 'price': None, + 'time': None + } + ] + series_groupby_3 = { + 'valA': None + } + series_name = 'dataframes' + items = [((series_name, series_groupby_1), points_1), + ((series_name, series_groupby_2), points_2), + ((series_name, series_groupby_3), points_3)] + total = FakeResultSet(items=items) + result = self.client.process_total(total=total, begin=begin, end=end, + groupby=groupby) + expected = [{'begin': tzutils.utc_to_local(begin), + 'end': tzutils.utc_to_local(end), + 'qty': 42, + 'price': 1.0, + 'valA': '1'}, + {'begin': tzutils.utc_to_local(begin), + 'end': tzutils.utc_to_local(end), + 'qty': 12, + 'price': 2.0, + 'valA': '2'} + ] + self.assertEqual(expected, result) + + +class TestInfluxClientV2(unittest.TestCase): + + @mock.patch('cloudkitty.storage.v2.influx.InfluxDBClient') + def setUp(self, client_mock): + self.period_begin = tzutils.local_to_utc( + tzutils.get_month_start()) + self.period_end = tzutils.local_to_utc( + tzutils.get_next_month()) + self.client = influx.InfluxClientV2() + + @mock.patch('cloudkitty.storage.v2.influx.requests') + def test_query(self, mock_request): + static_vals = ['', 'result', 'table', '_start', '_value'] + custom_fields = 'last(f1) AS f1, last(f2) AS f2, last(f3) AS f3' + groups = ['g1', 'g2', 'g3'] + data = [ + static_vals + groups, + ['', 'f1', 0, 1, 1, 1, 2, 3], + ['', 'f2', 0, 1, 2, 1, 2, 3], + ['', 'f3', 0, 1, 3, 1, 2, 3], + static_vals + groups, + ['', 'f1', 0, 1, 3, 3, 1, 2], + ['', 'f2', 0, 1, 1, 3, 1, 2], + ['', 'f3', 0, 1, 2, 3, 1, 2], + static_vals + groups, + ['', 'f1', 0, 1, 2, 2, 3, 1], + ['', 'f2', 0, 1, 3, 2, 3, 1], + ['', 'f3', 0, 1, 1, 2, 3, 1] + ] + + expected_value = [ + {'f1': 1.0, 'f2': 2.0, 'f3': 3.0, 'begin': self.period_begin, + 
'end': self.period_end, 'g1': '1', 'g2': '2', 'g3': '3'}, + {'f1': 3.0, 'f2': 1.0, 'f3': 2.0, 'begin': self.period_begin, + 'end': self.period_end, 'g1': '3', 'g2': '1', 'g3': '2'}, + {'f1': 2.0, 'f2': 3.0, 'f3': 1.0, 'begin': self.period_begin, + 'end': self.period_end, 'g1': '2', 'g2': '3', 'g3': '1'} + ] + + data_csv = '\n'.join([','.join(map(str, d)) for d in data]) + mock_request.post.return_value = mock.Mock(text=data_csv) + response = self.client.get_total( + None, self.period_begin, self.period_end, custom_fields, + filters={}, groupby=groups) + + result = self.client.process_total( + response, self.period_begin, self.period_end, + groups, custom_fields, {}) + + self.assertEqual(result, expected_value) + + def test_query_build(self): + custom_fields = 'last(field1) AS F1, sum(field2) AS F2' + groupby = ['group1', 'group2', 'group3'] + filters = { + 'filter1': '10', + 'filter2': 'filter2_filter' + } + beg = self.period_begin.isoformat() + end = self.period_end.isoformat() + expected = ('\n' + ' from(bucket:"cloudkitty")\n' + f' |> range(start: {beg}, stop: {end})\n' + ' |> filter(fn: (r) => r["_measurement"] == ' + '"dataframes")\n' + ' |> filter(fn: (r) => r["_field"] == "field1"' + ' and r.filter1==10 and r.filter2=="filter2_filter" )\n' + ' |> group(columns: ["group1","group2",' + '"group3"])\n' + ' |> last()\n' + ' |> keep(columns: ["group1", "group2",' + ' "group3", "_field", "_value", "_start", "_stop"])\n' + ' |> set(key: "_field", value: "F1")\n' + ' |> yield(name: "F1")\n' + ' \n' + ' from(bucket:"cloudkitty")\n' + f' |> range(start: {beg}, stop: {end})\n' + ' |> filter(fn: (r) => r["_measurement"] == ' + '"dataframes")\n' + ' |> filter(fn: (r) => r["_field"] == "field2"' + ' and r.filter1==10 and r.filter2=="filter2_filter" )\n' + ' |> group(columns: ["group1","group2",' + '"group3"])\n' + ' |> sum()\n' + ' |> keep(columns: ["group1", "group2", ' + '"group3", "_field", "_value", "_start", "_stop"])\n' + ' |> set(key: "_field", value: "F2")\n' + ' |> yield(name: "F2")\n' + ' ') + + query = self.client.get_query(begin=self.period_begin, + end=self.period_end, + custom_fields=custom_fields, + filters=filters, + groupby=groupby) + + self.assertEqual(query, expected) + + def test_query_build_no_custom_fields(self): + custom_fields = None + groupby = ['group1', 'group2', 'group3'] + filters = { + 'filter1': '10', + 'filter2': 'filter2_filter' + } + beg = self.period_begin.isoformat() + end = self.period_end.isoformat() + self.maxDiff = None + expected = ('\n' + ' from(bucket:"cloudkitty")\n' + f' |> range(start: {beg}, stop: {end})\n' + ' |> filter(fn: (r) => r["_measurement"] == ' + '"dataframes")\n' + ' |> filter(fn: (r) => r["_field"] == "price"' + ' and r.filter1==10 and r.filter2=="filter2_filter" )\n' + ' |> group(columns: ["group1","group2",' + '"group3"])\n' + ' |> sum()\n' + ' |> keep(columns: ["group1", "group2",' + ' "group3", "_field", "_value", "_start", "_stop"])\n' + ' |> set(key: "_field", value: "price")\n' + ' |> yield(name: "price")\n' + ' \n' + ' from(bucket:"cloudkitty")\n' + f' |> range(start: {beg}, stop: {end})\n' + ' |> filter(fn: (r) => r["_measurement"] == ' + '"dataframes")\n' + ' |> filter(fn: (r) => r["_field"] == "qty"' + ' and r.filter1==10 and r.filter2=="filter2_filter" )\n' + ' |> group(columns: ["group1","group2",' + '"group3"])\n' + ' |> sum()\n' + ' |> keep(columns: ["group1", "group2", ' + '"group3", "_field", "_value", "_start", "_stop"])\n' + ' |> set(key: "_field", value: "qty")\n' + ' |> yield(name: "qty")\n' + ' ') + + query = 
self.client.get_query(begin=self.period_begin, + end=self.period_end, + custom_fields=custom_fields, + filters=filters, + groupby=groupby) + + self.assertEqual(query, expected) + + def test_query_build_all_custom_fields(self): + custom_fields = '*' + groupby = ['group1', 'group2', 'group3'] + filters = { + 'filter1': '10', + 'filter2': 'filter2_filter' + } + beg = self.period_begin.isoformat() + end = self.period_end.isoformat() + expected = (f''' + from(bucket:"cloudkitty") + |> range(start: {beg}, stop: {end}) + |> filter(fn: (r) => r["_measurement"] == "dataframes") + |> filter(fn: (r) => r.filter1==10 and r.filter2=="filter + 2_filter") + |> group(columns: ["group1","group2","group3"]) + |> drop(columns: ["_time"]) + |> yield(name: "result")'''.replace( + ' ', '').replace('\n', '').replace('\t', '')) + + query = self.client.get_query(begin=self.period_begin, + end=self.period_end, + custom_fields=custom_fields, + filters=filters, + groupby=groupby).replace( + ' ', '').replace('\n', '').replace('\t', '') + + self.assertEqual(query, expected) diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 5357a348..23d30c41 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -165,7 +165,7 @@ function configure_cloudkitty { iniset $CLOUDKITTY_CONF fetcher_keystone keystone_version 3 fi - if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ]; then + if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 1 ]; then iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} user ${CLOUDKITTY_INFLUXDB_USER} iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} password ${CLOUDKITTY_INFLUXDB_PASSWORD} iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} database ${CLOUDKITTY_INFLUXDB_DATABASE} @@ -173,6 +173,14 @@ function configure_cloudkitty { iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} port ${CLOUDKITTY_INFLUXDB_PORT} fi + if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 2 ]; then + iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} host ${CLOUDKITTY_INFLUXDB_HOST} + iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} port ${CLOUDKITTY_INFLUXDB_PORT} + iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} url "http://${CLOUDKITTY_INFLUXDB_HOST}:${CLOUDKITTY_INFLUXDB_PORT}" + iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} token ${CLOUDKITTY_INFLUXDB_PASSWORD} + iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} version 2 + fi + if [ "$CLOUDKITTY_STORAGE_BACKEND" == "elasticsearch" ]; then iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} host ${CLOUDKITTY_ELASTICSEARCH_HOST} iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} index_name ${CLOUDKITTY_ELASTICSEARCH_INDEX} @@ -242,9 +250,13 @@ function create_cloudkitty_data_dir { } function create_influxdb_database { - if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ]; then + if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 1 ]; then influx -execute "CREATE DATABASE ${CLOUDKITTY_INFLUXDB_DATABASE}" fi + if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 2 ]; then + influx setup --username ${CLOUDKITTY_INFLUXDB_USER} --password ${CLOUDKITTY_INFLUXDB_PASSWORD} --token ${CLOUDKITTY_INFLUXDB_PASSWORD} --org openstack --bucket cloudkitty --force + fi + } function create_elasticsearch_index { @@ -296,11 +308,27 @@ function install_influx_ubuntu { sudo dpkg -i --skip-same-version 
${influxdb_file} } +function install_influx_v2_ubuntu { + local influxdb_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2_2.7.5-1_amd64.deb) + sudo dpkg -i --skip-same-version ${influxdb_file} + local influxcli_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2-client-2.7.3-linux-amd64.tar.gz) + tar xvzf ${influxcli_file} + sudo cp ./influx /usr/local/bin/ +} + function install_influx_fedora { local influxdb_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb-1.6.3.x86_64.rpm) sudo yum localinstall -y ${influxdb_file} } +function install_influx_v2_fedora { + local influxdb_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2-2.7.5-1.x86_64.rpm) + sudo yum localinstall -y ${influxdb_file} + local influxcli_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2-client-2.7.3-linux-amd64.tar.gz) + tar xvzf ${influxcli_file} + sudo cp ./influx /usr/local/bin/ +} + function install_influx { if is_ubuntu; then install_influx_ubuntu @@ -313,6 +341,19 @@ function install_influx { sudo systemctl start influxdb || sudo systemctl restart influxdb } + +function install_influx_v2 { + if is_ubuntu; then + install_influx_v2_ubuntu + elif is_fedora; then + install_influx_v2_fedora + else + die $LINENO "Distribution must be Debian or Fedora-based" + fi + sudo cp -f "${CLOUDKITTY_DIR}"/devstack/files/influxdb.conf /etc/influxdb/influxdb.conf + sudo systemctl start influxdb || sudo systemctl restart influxdb +} + function install_elasticsearch_ubuntu { local opensearch_file=$(get_extra_file https://artifacts.opensearch.org/releases/bundle/opensearch/1.3.9/opensearch-1.3.9-linux-x64.deb) sudo dpkg -i --skip-same-version ${opensearch_file} @@ -367,9 +408,10 @@ function install_opensearch { function install_cloudkitty { git_clone $CLOUDKITTY_REPO $CLOUDKITTY_DIR $CLOUDKITTY_BRANCH setup_develop $CLOUDKITTY_DIR - - if [ $CLOUDKITTY_STORAGE_BACKEND == 'influxdb' ]; then + if [ $CLOUDKITTY_STORAGE_BACKEND == 'influxdb' ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 1 ]; then install_influx + elif [ $CLOUDKITTY_STORAGE_BACKEND == 'influxdb' ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 2 ]; then + install_influx_v2 elif [ $CLOUDKITTY_STORAGE_BACKEND == 'elasticsearch' ]; then install_elasticsearch elif [ $CLOUDKITTY_STORAGE_BACKEND == 'opensearch' ]; then diff --git a/devstack/settings b/devstack/settings index 106f81b4..758169ff 100644 --- a/devstack/settings +++ b/devstack/settings @@ -50,6 +50,7 @@ CLOUDKITTY_METRICS_CONF=metrics.yml # Set CloudKitty storage info CLOUDKITTY_STORAGE_BACKEND=${CLOUDKITTY_STORAGE_BACKEND:-"influxdb"} CLOUDKITTY_STORAGE_VERSION=${CLOUDKITTY_STORAGE_VERSION:-"2"} +CLOUDKITTY_INFLUX_VERSION=${CLOUDKITTY_INFLUX_VERSION:-1} # Set CloudKitty output info CLOUDKITTY_OUTPUT_BACKEND=${CLOUDKITTY_OUTPUT_BACKEND:-"cloudkitty.backend.file.FileBackend"} diff --git a/releasenotes/notes/add-support-to-influxdb-v2-storage-backend-f94df79f9e5276a8.yaml b/releasenotes/notes/add-support-to-influxdb-v2-storage-backend-f94df79f9e5276a8.yaml new file mode 100644 index 00000000..01b4943f --- /dev/null +++ b/releasenotes/notes/add-support-to-influxdb-v2-storage-backend-f94df79f9e5276a8.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Add support to Influx v2 database as storage backend. 
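The new CLOUDKITTY_INFLUX_VERSION switch selects which install and configure path the devstack plugin takes. A minimal local.conf sketch for the InfluxDB v2 path follows; the variable names are the ones added in devstack/settings, while the plugin URL and branch are illustrative:

    [[local|localrc]]
    enable_plugin cloudkitty https://opendev.org/openstack/cloudkitty master
    CLOUDKITTY_STORAGE_BACKEND=influxdb
    CLOUDKITTY_STORAGE_VERSION=2
    CLOUDKITTY_INFLUX_VERSION=2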
diff --git a/requirements.txt b/requirements.txt
index 2eabcfe0..18f81d0d 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -34,6 +34,7 @@ stevedore>=3.2.2 # Apache-2.0
 tooz>=2.7.1 # Apache-2.0
 voluptuous>=0.12.0 # BSD License
 influxdb>=5.3.1 # MIT
+influxdb-client>=1.36.0 # MIT
 Flask>=2.0.0 # BSD
 Flask-RESTful>=0.3.9 # BSD
 cotyledon>=1.7.3 # Apache-2.0
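For context on the new influxdb-client dependency, a minimal sketch of the calls the driver relies on; the URL, token and the sample record are placeholders:

    from influxdb_client import InfluxDBClient
    from influxdb_client.client.write_api import SYNCHRONOUS

    client = InfluxDBClient(url="http://localhost:8086", token="<token>",
                            org="openstack")
    # Synchronous writes, one record per call, as in
    # InfluxClientV2.write_points()
    write_api = client.write_api(write_options=SYNCHRONOUS)
    write_api.write(bucket="cloudkitty",
                    record={"measurement": "dataframes",
                            "tags": {"type": "instance"},
                            "fields": {"qty": 1.0, "price": 0.5}})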