diff --git a/.zuul.yaml b/.zuul.yaml index b1ab83c2..af04f354 100644 --- a/.zuul.yaml +++ b/.zuul.yaml @@ -76,12 +76,25 @@ name: cloudkitty-tempest-full-v2-storage-influxdb parent: base-cloudkitty-v2-api-tempest-job description: | - Job testing cloudkitty installation on devstack with python 3 and the - InfluxDB v2 storage driver and running tempest tests + Job testing cloudkitty installation on devstack with python 3, InfluxDB + v1 and the InfluxDB v2 storage driver and running tempest tests vars: devstack_localrc: CLOUDKITTY_STORAGE_BACKEND: influxdb CLOUDKITTY_STORAGE_VERSION: 2 + CLOUDKITTY_INFLUX_VERSION: 1 + +- job: + name: cloudkitty-tempest-full-v2-storage-influxdb-v2 + parent: base-cloudkitty-v2-api-tempest-job + description: | + Job testing cloudkitty installation on devstack with python 3, InfluxDB + v2 and the InfluxDB v2 storage driver and running tempest tests + vars: + devstack_localrc: + CLOUDKITTY_STORAGE_BACKEND: influxdb + CLOUDKITTY_STORAGE_VERSION: 2 + CLOUDKITTY_INFLUX_VERSION: 2 - job: name: cloudkitty-tempest-full-v2-storage-elasticsearch @@ -139,6 +152,7 @@ check: jobs: - cloudkitty-tempest-full-v2-storage-influxdb + - cloudkitty-tempest-full-v2-storage-influxdb-v2 - cloudkitty-tempest-full-v2-storage-elasticsearch: voting: false - cloudkitty-tempest-full-v2-storage-opensearch: @@ -150,5 +164,6 @@ gate: jobs: - cloudkitty-tempest-full-v2-storage-influxdb + - cloudkitty-tempest-full-v2-storage-influxdb-v2 - cloudkitty-tempest-full-v1-storage-sqlalchemy - cloudkitty-tempest-full-ipv6-only diff --git a/cloudkitty/storage/v2/influx.py b/cloudkitty/storage/v2/influx.py index 2591bfc8..243daa94 100644 --- a/cloudkitty/storage/v2/influx.py +++ b/cloudkitty/storage/v2/influx.py @@ -12,11 +12,18 @@ # License for the specific language governing permissions and limitations # under the License. 
# +import csv import datetime import influxdb +import io +import json +import re +from influxdb_client.client.write_api import SYNCHRONOUS +from influxdb_client import InfluxDBClient from oslo_config import cfg from oslo_log import log +import requests from cloudkitty import dataframe from cloudkitty.storage import v2 as v2_storage @@ -56,6 +63,29 @@ influx_storage_opts = [ help='Path of the CA certificate to trust for HTTPS connections', default=None ), + cfg.IntOpt('version', help='InfluxDB version', default=1), + cfg.IntOpt('query_timeout', help='Flux query timeout in milliseconds', + default=3600000), + cfg.StrOpt( + 'token', + help='InfluxDB API token for version 2 authentication', + default=None + ), + cfg.StrOpt( + 'org', + help='InfluxDB 2 org', + default="openstack" + ), + cfg.StrOpt( + 'bucket', + help='InfluxDB 2 bucket', + default="cloudkitty" + ), + cfg.StrOpt( + 'url', + help='InfluxDB 2 URL', + default=None + ) ] CONF.register_opts(influx_storage_opts, INFLUX_STORAGE_GROUP) @@ -192,7 +222,7 @@ class InfluxClient(object): return " AND " + InfluxClient._get_filter("type", types) def get_total(self, types, begin, end, custom_fields, - groupby=None, filters=None): + groupby=None, filters=None, limit=None): self.validate_custom_fields(custom_fields) @@ -232,11 +262,8 @@ class InfluxClient(object): " clauses are not allowed [%s].", field, forbidden_clauses) - def retrieve(self, - types, - filters, - begin, end, - offset=0, limit=1000, paginate=True): + def retrieve(self, types, filters, begin, end, offset=0, limit=1000, + paginate=True): query = 'SELECT * FROM "dataframes"' query += self._get_time_query(begin, end) query += self._get_filter_query(filters) @@ -282,15 +309,448 @@ class InfluxClient(object): self._conn.query(query) + def _get_total_elem(self, begin, end, groupby, series_groupby, point): + if groupby and 'time' in groupby: + begin = tzutils.dt_from_iso(point['time']) + period = point.get(PERIOD_FIELD_NAME) or self._default_period + end = tzutils.add_delta(begin, datetime.timedelta(seconds=period)) + output = { + 'begin': begin, + 'end': end, + } + + for key in point.keys(): + if "time" != key: + output[key] = point[key] + + if groupby: + for group in _sanitized_groupby(groupby): + output[group] = series_groupby.get(group, '') + return output + + def process_total(self, total, begin, end, groupby, *args): + output = [] + for (series_name, series_groupby), points in total.items(): + for point in points: + # NOTE(peschk_l): InfluxDB returns all timestamps for a given + # period and interval, even those with no data. This filters + # out periods with no data + + # NOTE (rafaelweingartner): the summary get API is allowing + # users to customize the report. Therefore, we only ignore + # data points, if all of the entries have None values. + # Otherwise, they are presented to the user. + if [k for k in point.keys() if point[k]]: + output.append(self._get_total_elem( + tzutils.utc_to_local(begin), + tzutils.utc_to_local(end), + groupby, + series_groupby, + point)) + return output + + +class InfluxClientV2(InfluxClient): + """Class used to facilitate interaction with InfluxDB v2 + + custom_fields_rgx: Regex to parse the input custom_fields and + retrieve the field name, the desired alias + and the aggregation function to use. + It allows us to keep the same custom_fields + representation for both InfluxQL and Flux + queries. + + """ + + custom_fields_rgx = r'([\w_\\"]+)\(([\w_\\"]+)\) (AS|as) ' \ + r'\\?"?([\w_ \\]+)"?,? ?' 
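A minimal standalone sketch (sample input made up, pattern copied verbatim from the class above) of what custom_fields_rgx extracts from a custom_fields string:

```python
import re

# Same pattern as InfluxClientV2.custom_fields_rgx, written as one raw string.
CUSTOM_FIELDS_RGX = r'([\w_\\"]+)\(([\w_\\"]+)\) (AS|as) \\?"?([\w_ \\]+)"?,? ?'

for match in re.finditer(CUSTOM_FIELDS_RGX, 'SUM(qty) AS qty, SUM(price) AS rate'):
    aggregation, field, _, alias = match.groups()
    print(aggregation, field, alias)
# Prints:
#   SUM qty qty
#   SUM price rate
```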
+ + class FluxResponseHandler(object): + """Class used to process the response of Flux queries + + As the Flux response splits its result set by the + requested fields, we need to merge them into a single + one based on their groups (tags). + + Using this approach we keep the response data + compatible with the InfluxQL result set, where we + already have the multiple result set for each field + merged into a single one. + """ + + def __init__(self, response, groupby, fields, begin, end, + field_filters): + self.data = response + self.field_filters = field_filters + self.response = {} + self.begin = begin + self.end = end + self.groupby = groupby + self.fields = fields + self.process() + + def process(self): + """This method merges all the Flux result sets into a single one. + + To make sure the fields filtering comply with the user's + request, we need to remove the merged entries that have None + value for filtered fields, we need to do that because working + with fields one by one in Flux queries is more performant + than working with all the fields together, but it brings some + problems when we want to filter some data. E.g: + + We want the fields A and B, grouped by C and D, and the field + A must be 2. Imagine this query for the following + dataset: + + A : C : D B : C : D + 1 : 1 : 1 5 : 1 : 1 + 2 : 2 : 2 6 : 2 : 2 + 2 : 3 : 3 7 : 3 : 3 + 2 : 4 : 4 + + The result set is going to be like: + + A : C : D B : C : D + 2 : 2 : 2 5 : 1 : 1 + 2 : 3 : 3 6 : 2 : 2 + 2 : 4 : 4 7 : 3 : 3 + + And the merged value is going to be like: + + A : B : C : D + None : 5 : 1 : 1 + 2 : 6 : 2 : 2 + 2 : 7 : 3 : 3 + 2 : None : 4 : 4 + + So, we need to remove the first undesired entry to get the + correct result: + + A : B : C : D + 2 : 6 : 2 : 2 + 2 : 7 : 3 : 3 + 2 : None : 4 : 4 + """ + + LOG.debug("Using fields %s to process InfluxDB V2 response.", + self.fields) + LOG.debug("Start processing data [%s] of InfluxDB V2 API.", + self.data) + if self.fields == ["*"] and not self.groupby: + self.process_data_wildcard() + else: + self.process_data_with_fields() + + LOG.debug("Data processed by the InfluxDB V2 backend with " + "result [%s].", self.response) + LOG.debug("Start sanitizing the response of Influx V2 API.") + self.sanitize_filtered_entries() + LOG.debug("Response sanitized [%s] for InfluxDB V2 API.", + self.response) + + def process_data_wildcard(self): + LOG.debug("Processing wildcard response for InfluxDB V2 API.") + found_fields = set() + for r in self.data: + if self.is_header_entry(r): + LOG.debug("Skipping header entry: [%s].", r) + continue + r_key = ''.join(sorted(r.values())) + found_fields.add(r['_field']) + r_value = r + r_value['begin'] = self.begin + r_value['end'] = self.end + self.response.setdefault( + r_key, r_value)[r['result']] = float(r['_value']) + + def process_data_with_fields(self): + for r in self.data: + if self.is_header_entry(r): + LOG.debug("Skipping header entry: [%s].", r) + continue + r_key = '' + r_value = {f: None for f in self.fields} + r_value['begin'] = self.begin + r_value['end'] = self.end + for g in (self.groupby or []): + val = r.get(g) + r_key += val or '' + r_value[g] = val + + self.response.setdefault( + r_key, r_value)[r['result']] = float(r['_value']) + + @staticmethod + def is_header_entry(entry): + """Check header entries. 
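To make the merge described above concrete, here is a small standalone sketch (plain dicts standing in for parsed CSV rows, values taken from the filtered dataset in the docstring example) of how per-field result sets are folded into one row per group key, mirroring process_data_with_fields():

```python
fields = ['A', 'B']
groupby = ['C', 'D']
rows = [
    {'result': 'A', '_value': '2', 'C': '2', 'D': '2'},
    {'result': 'A', '_value': '2', 'C': '3', 'D': '3'},
    {'result': 'A', '_value': '2', 'C': '4', 'D': '4'},
    {'result': 'B', '_value': '5', 'C': '1', 'D': '1'},
    {'result': 'B', '_value': '6', 'C': '2', 'D': '2'},
    {'result': 'B', '_value': '7', 'C': '3', 'D': '3'},
]

merged = {}
for r in rows:
    key = ''.join(r[g] for g in groupby)
    row = merged.setdefault(
        key, {**{f: None for f in fields}, **{g: r[g] for g in groupby}})
    row[r['result']] = float(r['_value'])

# merged == {
#     '22': {'A': 2.0, 'B': 6.0, 'C': '2', 'D': '2'},
#     '33': {'A': 2.0, 'B': 7.0, 'C': '3', 'D': '3'},
#     '44': {'A': 2.0, 'B': None, 'C': '4', 'D': '4'},
#     '11': {'A': None, 'B': 5.0, 'C': '1', 'D': '1'},
# }
# sanitize_filtered_entries() then drops the '11' entry, because the filtered
# field A is None there.
```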
+ + As the response contains multiple resultsets, + each entry in the response CSV has its own + header, which is the same for all the result sets, + but the CSV parser does not ignore it + and processes all headers except the first as a + dict entry, so for these cases, each dict's value + is going to be the same as the dict's key, so we + are picking one and if it is this case, we skip it. + + """ + + return entry.get('_start') == '_start' + + def sanitize_filtered_entries(self): + """Removes entries where filtered fields have None as value.""" + + for d in self.field_filters or []: + for k in list(self.response.keys()): + if self.response[k][d] is None: + self.response.pop(k, None) + + def __init__(self, default_period=None): + super().__init__(default_period=default_period) + self.client = InfluxDBClient( + url=CONF.storage_influxdb.url, + timeout=CONF.storage_influxdb.query_timeout, + token=CONF.storage_influxdb.token, + org=CONF.storage_influxdb.org) + self._conn = self.client + + def retrieve(self, types, filters, begin, end, offset=0, limit=1000, + paginate=True): + + query = self.get_query(begin, end, '*', filters=filters) + response = self.query(query) + output = self.process_total( + response, begin, end, None, '*', filters) + LOG.debug("Retrieved output %s", output) + results = {'results': output[ + offset:offset + limit] if paginate else output} + return len(output), results + + def delete(self, begin, end, filters): + predicate = '_measurement="dataframes"' + f = self.get_group_filters_query( + filters, fmt=lambda x: '"' + str(x) + '"') + if f: + f = f.replace('==', '=').replace('and', 'AND') + predicate += f'{f}' + + LOG.debug("InfluxDB v2 deleting elements filtering by [%s] and " + "with [begin=%s, end=%s].", predicate, begin, end) + delete_api = self.client.delete_api() + delete_api.delete(begin, end, bucket=CONF.storage_influxdb.bucket, + predicate=predicate) + + def process_total(self, total, begin, end, groupby, custom_fields, + filters): + cf = self.get_custom_fields(custom_fields) + fields = list(map(lambda f: f[2], cf)) + c_fields = {f[1]: f[2] for f in cf} + field_filters = [c_fields[f] for f in filters if f in c_fields] + handler = self.FluxResponseHandler(total, groupby, fields, begin, end, + field_filters) + return list(handler.response.values()) + + def commit(self): + total_points = len(self._points) + if len(self._points) < 1: + return + LOG.debug('Pushing {} points to InfluxDB'.format(total_points)) + self.write_points(self._points, + retention_policy=self._retention_policy) + self._points = [] + + def write_points(self, points, retention_policy=None): + write_api = self.client.write_api(write_options=SYNCHRONOUS) + [write_api.write( + bucket=CONF.storage_influxdb.bucket, record=p) + for p in points] + + def _get_filter_query(self, filters): + if not filters: + return '' + return ' and ' + ' and '.join( + self._get_filter(k, v) for k, v in filters.items()) + + def get_custom_fields(self, custom_fields): + + if not custom_fields: + return [] + + if custom_fields.strip() == '*': + return [('*', '*', '*')] + + groups = [list(i.groups()) for i in re.finditer( + self.custom_fields_rgx, custom_fields)] + + # Remove the "As|as" group that is useless. 
+ if groups: + for g in groups: + del g[2] + + return groups + + def get_group_filters_query(self, group_filters, fmt=lambda x: f'r.{x}'): + if not group_filters: + return '' + + get_val = (lambda x: x if isinstance(v, (int, float)) or + x.isnumeric() else f'"{x}"') + + f = '' + for k, v in group_filters.items(): + if isinstance(v, (list, tuple)): + if len(v) == 1: + f += f' and {fmt(k)}=={get_val(v[0])}' + continue + + f += ' and (%s)' % ' or '.join([f'{fmt(k)}=={get_val(val)}' + for val in v]) + continue + + f += f' and {fmt(k)}=={get_val(v)}' + + return f + + def get_field_filters_query(self, field_filters, + fmt=lambda x: 'r["_value"]'): + return self.get_group_filters_query(field_filters, fmt) + + def get_custom_fields_query(self, custom_fields, query, field_filters, + group_filters, limit=None, groupby=None): + if not groupby: + groupby = [] + if not custom_fields: + custom_fields = 'sum(price) AS price,sum(qty) AS qty' + columns_to_keep = ', '.join(map(lambda g: f'"{g}"', groupby)) + columns_to_keep += ', "_field", "_value", "_start", "_stop"' + new_query = '' + LOG.debug("Custom fields: %s", custom_fields) + LOG.debug("Custom fields processed: %s", + self.get_custom_fields(custom_fields)) + for operation, field, alias in self.get_custom_fields(custom_fields): + LOG.debug("Generating query with operation [%s]," + " field [%s] and alias [%s]", operation, field, alias) + field_filter = {} + if field_filters and field in field_filters: + field_filter = {field: field_filters[field]} + + if field == '*': + group_filter = self.get_group_filters_query( + group_filters).replace(" and ", "", 1) + filter_to_replace = f'|> filter(fn: (r) => {group_filter})' + new_query += query.replace( + '', + filter_to_replace).replace( + '', + f'''|> drop(columns: ["_time"]) + {'|> limit(n: ' + str(limit) + ')' if limit else ''} + |> yield(name: "result")''') + continue + + new_query += query.replace( + '', + f'|> filter(fn: (r) => r["_field"] == ' + f'"{field}" {self.get_group_filters_query(group_filters)} ' + f'{self.get_field_filters_query(field_filter)})' + ).replace( + '', + f'''|> {operation.lower()}() + |> keep(columns: [{columns_to_keep}]) + |> set(key: "_field", value: "{alias}") + |> yield(name: "{alias}")''') + return new_query + + def get_groupby(self, groupby): + if not groupby: + return "|> group()" + return f'''|> group(columns: [{','.join([f'"{g}"' + for g in groupby])}])''' + + def get_time_range(self, begin, end): + if not begin or not end: + return '' + return f'|> range(start: {begin.isoformat()}, stop: {end.isoformat()})' + + def get_query(self, begin, end, custom_fields, groupby=None, filters=None, + limit=None): + + custom_fields_processed = list( + map(lambda x: x[1], self.get_custom_fields(custom_fields))) + field_filters = dict(filter( + lambda f: f[0] in custom_fields_processed, filters.items())) + group_filters = dict(filter( + lambda f: f[0] not in field_filters, filters.items())) + + query = f''' + from(bucket:"{CONF.storage_influxdb.bucket}") + {self.get_time_range(begin, end)} + |> filter(fn: (r) => r["_measurement"] == "dataframes") + + {self.get_groupby(groupby)} + + ''' + + LOG.debug("Field Filters: %s", field_filters) + LOG.debug("Group Filters: %s", group_filters) + query = self.get_custom_fields_query(custom_fields, query, + field_filters, group_filters, + limit, groupby) + return query + + def query(self, query): + url_base = CONF.storage_influxdb.url + org = CONF.storage_influxdb.org + url = f'{url_base}/api/v2/query?org={org}' + response = requests.post( + url=url, + 
headers={ + 'Content-type': 'application/json', + 'Authorization': f'Token {CONF.storage_influxdb.token}'}, + data=json.dumps({ + 'query': query})) + response_text = response.text + LOG.debug("Raw Response: [%s].", response_text) + handled_response = [] + for csv_tables in response_text.split(',result,table,'): + csv_tables = ',result,table,' + csv_tables + LOG.debug("Processing CSV [%s].", csv_tables) + processed = list(csv.DictReader(io.StringIO(csv_tables))) + LOG.debug("Processed CSV in dict [%s]", processed) + handled_response.extend(processed) + return handled_response + + def get_total(self, types, begin, end, custom_fields, + groupby=None, filters=None, limit=None): + + LOG.debug("Query types: %s", types) + if types: + if not filters: + filters = {} + filters['type'] = types + + LOG.debug("Query filters: %s", filters) + query = self.get_query(begin, end, custom_fields, + groupby, filters, limit) + + LOG.debug("Executing the Flux query [%s].", query) + + return self.query(query) + class InfluxStorage(v2_storage.BaseStorage): def __init__(self, *args, **kwargs): super(InfluxStorage, self).__init__(*args, **kwargs) self._default_period = kwargs.get('period') or CONF.collect.period - self._conn = InfluxClient(default_period=self._default_period) + if CONF.storage_influxdb.version == 2: + self._conn = InfluxClientV2(default_period=self._default_period) + else: + self._conn = InfluxClient(default_period=self._default_period) def init(self): + if CONF.storage_influxdb.version == 2: + return policy = CONF.storage_influxdb.retention_policy database = CONF.storage_influxdb.database if not self._conn.retention_policy_exists(database, policy): @@ -371,25 +831,6 @@ class InfluxStorage(v2_storage.BaseStorage): def delete(self, begin=None, end=None, filters=None): self._conn.delete(begin, end, filters) - def _get_total_elem(self, begin, end, groupby, series_groupby, point): - if groupby and 'time' in groupby: - begin = tzutils.dt_from_iso(point['time']) - period = point.get(PERIOD_FIELD_NAME) or self._default_period - end = tzutils.add_delta(begin, datetime.timedelta(seconds=period)) - output = { - 'begin': begin, - 'end': end, - } - - for key in point.keys(): - if "time" != key: - output[key] = point[key] - - if groupby: - for group in _sanitized_groupby(groupby): - output[group] = series_groupby.get(group, '') - return output - def total(self, groupby=None, begin=None, end=None, metric_types=None, filters=None, offset=0, limit=1000, paginate=True, custom_fields="SUM(qty) AS qty, SUM(price) AS rate"): @@ -398,30 +839,14 @@ class InfluxStorage(v2_storage.BaseStorage): groupby = self.parse_groupby_syntax_to_groupby_elements(groupby) total = self._conn.get_total(metric_types, begin, end, - custom_fields, groupby, filters) + custom_fields, groupby, filters, limit) - output = [] - for (series_name, series_groupby), points in total.items(): - for point in points: - # NOTE(peschk_l): InfluxDB returns all timestamps for a given - # period and interval, even those with no data. This filters - # out periods with no data - - # NOTE (rafaelweingartner): the summary get API is allowing - # users to customize the report. Therefore, we only ignore - # data points, if all of the entries have None values. - # Otherwise, they are presented to the user. 
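The split-and-reparse logic in query() above is easier to see with data; a self-contained sketch using a made-up annotated-CSV payload shaped like a Flux /api/v2/query response with two yield() result sets:

```python
import csv
import io

response_text = (
    ',result,table,_start,_value,project_id\r\n'
    ',qty,0,2024-01-01T00:00:00Z,42,abc123\r\n'
    ',result,table,_start,_value,project_id\r\n'
    ',rate,0,2024-01-01T00:00:00Z,21.5,abc123\r\n'
)

rows = []
for chunk in response_text.split(',result,table,'):
    # Re-attach the shared header so csv.DictReader keys every data row by
    # column name; the first, empty chunk contributes no rows.
    rows.extend(csv.DictReader(io.StringIO(',result,table,' + chunk)))

# rows[0] -> {'result': 'qty', '_value': '42', ...}
# rows[1] -> {'result': 'rate', '_value': '21.5', ...}
```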
- if [k for k in point.keys() if point[k]]: - output.append(self._get_total_elem( - tzutils.utc_to_local(begin), - tzutils.utc_to_local(end), - groupby, - series_groupby, - point)) + output = self._conn.process_total( + total, begin, end, groupby, custom_fields, filters) groupby = _sanitized_groupby(groupby) if groupby: - output.sort(key=lambda x: [x[group] for group in groupby]) + output.sort(key=lambda x: [x[group] or "" for group in groupby]) return { 'total': len(output), diff --git a/cloudkitty/tests/storage/v2/influx_utils.py b/cloudkitty/tests/storage/v2/influx_utils.py index 76fce87a..8474a8a9 100644 --- a/cloudkitty/tests/storage/v2/influx_utils.py +++ b/cloudkitty/tests/storage/v2/influx_utils.py @@ -90,7 +90,7 @@ class FakeInfluxClient(InfluxClient): return target_serie def get_total(self, types, begin, end, custom_fields, groupby=None, - filters=None): + filters=None, limit=None): total = copy.deepcopy(self.total_sample) series = [] diff --git a/cloudkitty/tests/storage/v2/test_influxdb.py b/cloudkitty/tests/storage/v2/test_influxdb.py index 0478f592..8cd802a6 100644 --- a/cloudkitty/tests/storage/v2/test_influxdb.py +++ b/cloudkitty/tests/storage/v2/test_influxdb.py @@ -209,3 +209,233 @@ class TestInfluxClient(unittest.TestCase): self._storage.delete(end=datetime(2019, 1, 2)) m.assert_called_once_with("""DELETE FROM "dataframes" WHERE """ """time < '2019-01-02T00:00:00';""") + + def test_process_total(self): + begin = datetime(2019, 1, 2, 10) + end = datetime(2019, 1, 2, 11) + groupby = ['valA', 'time'] + points_1 = [ + { + 'qty': 42, + 'price': 1.0, + 'time': begin.isoformat() + } + ] + series_groupby_1 = { + 'valA': '1' + } + points_2 = [ + { + 'qty': 12, + 'price': 2.0, + 'time': begin.isoformat() + } + ] + series_groupby_2 = { + 'valA': '2' + } + points_3 = [ + { + 'qty': None, + 'price': None, + 'time': None + } + ] + series_groupby_3 = { + 'valA': None + } + series_name = 'dataframes' + items = [((series_name, series_groupby_1), points_1), + ((series_name, series_groupby_2), points_2), + ((series_name, series_groupby_3), points_3)] + total = FakeResultSet(items=items) + result = self.client.process_total(total=total, begin=begin, end=end, + groupby=groupby) + expected = [{'begin': tzutils.utc_to_local(begin), + 'end': tzutils.utc_to_local(end), + 'qty': 42, + 'price': 1.0, + 'valA': '1'}, + {'begin': tzutils.utc_to_local(begin), + 'end': tzutils.utc_to_local(end), + 'qty': 12, + 'price': 2.0, + 'valA': '2'} + ] + self.assertEqual(expected, result) + + +class TestInfluxClientV2(unittest.TestCase): + + @mock.patch('cloudkitty.storage.v2.influx.InfluxDBClient') + def setUp(self, client_mock): + self.period_begin = tzutils.local_to_utc( + tzutils.get_month_start()) + self.period_end = tzutils.local_to_utc( + tzutils.get_next_month()) + self.client = influx.InfluxClientV2() + + @mock.patch('cloudkitty.storage.v2.influx.requests') + def test_query(self, mock_request): + static_vals = ['', 'result', 'table', '_start', '_value'] + custom_fields = 'last(f1) AS f1, last(f2) AS f2, last(f3) AS f3' + groups = ['g1', 'g2', 'g3'] + data = [ + static_vals + groups, + ['', 'f1', 0, 1, 1, 1, 2, 3], + ['', 'f2', 0, 1, 2, 1, 2, 3], + ['', 'f3', 0, 1, 3, 1, 2, 3], + static_vals + groups, + ['', 'f1', 0, 1, 3, 3, 1, 2], + ['', 'f2', 0, 1, 1, 3, 1, 2], + ['', 'f3', 0, 1, 2, 3, 1, 2], + static_vals + groups, + ['', 'f1', 0, 1, 2, 2, 3, 1], + ['', 'f2', 0, 1, 3, 2, 3, 1], + ['', 'f3', 0, 1, 1, 2, 3, 1] + ] + + expected_value = [ + {'f1': 1.0, 'f2': 2.0, 'f3': 3.0, 'begin': self.period_begin, + 
'end': self.period_end, 'g1': '1', 'g2': '2', 'g3': '3'}, + {'f1': 3.0, 'f2': 1.0, 'f3': 2.0, 'begin': self.period_begin, + 'end': self.period_end, 'g1': '3', 'g2': '1', 'g3': '2'}, + {'f1': 2.0, 'f2': 3.0, 'f3': 1.0, 'begin': self.period_begin, + 'end': self.period_end, 'g1': '2', 'g2': '3', 'g3': '1'} + ] + + data_csv = '\n'.join([','.join(map(str, d)) for d in data]) + mock_request.post.return_value = mock.Mock(text=data_csv) + response = self.client.get_total( + None, self.period_begin, self.period_end, custom_fields, + filters={}, groupby=groups) + + result = self.client.process_total( + response, self.period_begin, self.period_end, + groups, custom_fields, {}) + + self.assertEqual(result, expected_value) + + def test_query_build(self): + custom_fields = 'last(field1) AS F1, sum(field2) AS F2' + groupby = ['group1', 'group2', 'group3'] + filters = { + 'filter1': '10', + 'filter2': 'filter2_filter' + } + beg = self.period_begin.isoformat() + end = self.period_end.isoformat() + expected = ('\n' + ' from(bucket:"cloudkitty")\n' + f' |> range(start: {beg}, stop: {end})\n' + ' |> filter(fn: (r) => r["_measurement"] == ' + '"dataframes")\n' + ' |> filter(fn: (r) => r["_field"] == "field1"' + ' and r.filter1==10 and r.filter2=="filter2_filter" )\n' + ' |> group(columns: ["group1","group2",' + '"group3"])\n' + ' |> last()\n' + ' |> keep(columns: ["group1", "group2",' + ' "group3", "_field", "_value", "_start", "_stop"])\n' + ' |> set(key: "_field", value: "F1")\n' + ' |> yield(name: "F1")\n' + ' \n' + ' from(bucket:"cloudkitty")\n' + f' |> range(start: {beg}, stop: {end})\n' + ' |> filter(fn: (r) => r["_measurement"] == ' + '"dataframes")\n' + ' |> filter(fn: (r) => r["_field"] == "field2"' + ' and r.filter1==10 and r.filter2=="filter2_filter" )\n' + ' |> group(columns: ["group1","group2",' + '"group3"])\n' + ' |> sum()\n' + ' |> keep(columns: ["group1", "group2", ' + '"group3", "_field", "_value", "_start", "_stop"])\n' + ' |> set(key: "_field", value: "F2")\n' + ' |> yield(name: "F2")\n' + ' ') + + query = self.client.get_query(begin=self.period_begin, + end=self.period_end, + custom_fields=custom_fields, + filters=filters, + groupby=groupby) + + self.assertEqual(query, expected) + + def test_query_build_no_custom_fields(self): + custom_fields = None + groupby = ['group1', 'group2', 'group3'] + filters = { + 'filter1': '10', + 'filter2': 'filter2_filter' + } + beg = self.period_begin.isoformat() + end = self.period_end.isoformat() + self.maxDiff = None + expected = ('\n' + ' from(bucket:"cloudkitty")\n' + f' |> range(start: {beg}, stop: {end})\n' + ' |> filter(fn: (r) => r["_measurement"] == ' + '"dataframes")\n' + ' |> filter(fn: (r) => r["_field"] == "price"' + ' and r.filter1==10 and r.filter2=="filter2_filter" )\n' + ' |> group(columns: ["group1","group2",' + '"group3"])\n' + ' |> sum()\n' + ' |> keep(columns: ["group1", "group2",' + ' "group3", "_field", "_value", "_start", "_stop"])\n' + ' |> set(key: "_field", value: "price")\n' + ' |> yield(name: "price")\n' + ' \n' + ' from(bucket:"cloudkitty")\n' + f' |> range(start: {beg}, stop: {end})\n' + ' |> filter(fn: (r) => r["_measurement"] == ' + '"dataframes")\n' + ' |> filter(fn: (r) => r["_field"] == "qty"' + ' and r.filter1==10 and r.filter2=="filter2_filter" )\n' + ' |> group(columns: ["group1","group2",' + '"group3"])\n' + ' |> sum()\n' + ' |> keep(columns: ["group1", "group2", ' + '"group3", "_field", "_value", "_start", "_stop"])\n' + ' |> set(key: "_field", value: "qty")\n' + ' |> yield(name: "qty")\n' + ' ') + + query = 
self.client.get_query(begin=self.period_begin, + end=self.period_end, + custom_fields=custom_fields, + filters=filters, + groupby=groupby) + + self.assertEqual(query, expected) + + def test_query_build_all_custom_fields(self): + custom_fields = '*' + groupby = ['group1', 'group2', 'group3'] + filters = { + 'filter1': '10', + 'filter2': 'filter2_filter' + } + beg = self.period_begin.isoformat() + end = self.period_end.isoformat() + expected = (f''' + from(bucket:"cloudkitty") + |> range(start: {beg}, stop: {end}) + |> filter(fn: (r) => r["_measurement"] == "dataframes") + |> filter(fn: (r) => r.filter1==10 and r.filter2=="filter + 2_filter") + |> group(columns: ["group1","group2","group3"]) + |> drop(columns: ["_time"]) + |> yield(name: "result")'''.replace( + ' ', '').replace('\n', '').replace('\t', '')) + + query = self.client.get_query(begin=self.period_begin, + end=self.period_end, + custom_fields=custom_fields, + filters=filters, + groupby=groupby).replace( + ' ', '').replace('\n', '').replace('\t', '') + + self.assertEqual(query, expected) diff --git a/devstack/plugin.sh b/devstack/plugin.sh index 993afeb1..12047841 100755 --- a/devstack/plugin.sh +++ b/devstack/plugin.sh @@ -165,7 +165,7 @@ function configure_cloudkitty { iniset $CLOUDKITTY_CONF fetcher_keystone keystone_version 3 fi - if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ]; then + if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 1 ]; then iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} user ${CLOUDKITTY_INFLUXDB_USER} iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} password ${CLOUDKITTY_INFLUXDB_PASSWORD} iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} database ${CLOUDKITTY_INFLUXDB_DATABASE} @@ -173,6 +173,14 @@ function configure_cloudkitty { iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} port ${CLOUDKITTY_INFLUXDB_PORT} fi + if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 2 ]; then + iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} host ${CLOUDKITTY_INFLUXDB_HOST} + iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} port ${CLOUDKITTY_INFLUXDB_PORT} + iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} url "http://${CLOUDKITTY_INFLUXDB_HOST}:${CLOUDKITTY_INFLUXDB_PORT}" + iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} token ${CLOUDKITTY_INFLUXDB_PASSWORD} + iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} version 2 + fi + if [ "$CLOUDKITTY_STORAGE_BACKEND" == "elasticsearch" ]; then iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} host ${CLOUDKITTY_ELASTICSEARCH_HOST} iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} index_name ${CLOUDKITTY_ELASTICSEARCH_INDEX} @@ -242,9 +250,13 @@ function create_cloudkitty_data_dir { } function create_influxdb_database { - if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ]; then + if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 1 ]; then influx -execute "CREATE DATABASE ${CLOUDKITTY_INFLUXDB_DATABASE}" fi + if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 2 ]; then + influx setup --username ${CLOUDKITTY_INFLUXDB_USER} --password ${CLOUDKITTY_INFLUXDB_PASSWORD} --token ${CLOUDKITTY_INFLUXDB_PASSWORD} --org openstack --bucket cloudkitty --force + fi + } function create_elasticsearch_index { @@ -296,11 +308,27 @@ function install_influx_ubuntu { sudo dpkg -i --skip-same-version 
${influxdb_file} } +function install_influx_v2_ubuntu { + local influxdb_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2_2.7.5-1_amd64.deb) + sudo dpkg -i --skip-same-version ${influxdb_file} + local influxcli_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2-client-2.7.3-linux-amd64.tar.gz) + tar xvzf ${influxcli_file} + sudo cp ./influx /usr/local/bin/ +} + function install_influx_fedora { local influxdb_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb-1.6.3.x86_64.rpm) sudo yum localinstall -y ${influxdb_file} } +function install_influx_v2_fedora { + local influxdb_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2-2.7.5-1.x86_64.rpm) + sudo yum localinstall -y ${influxdb_file} + local influxcli_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2-client-2.7.3-linux-amd64.tar.gz) + tar xvzf ${influxcli_file} + sudo cp ./influx /usr/local/bin/ +} + function install_influx { if is_ubuntu; then install_influx_ubuntu @@ -313,6 +341,19 @@ function install_influx { sudo systemctl start influxdb || sudo systemctl restart influxdb } + +function install_influx_v2 { + if is_ubuntu; then + install_influx_v2_ubuntu + elif is_fedora; then + install_influx_v2_fedora + else + die $LINENO "Distribution must be Debian or Fedora-based" + fi + sudo cp -f "${CLOUDKITTY_DIR}"/devstack/files/influxdb.conf /etc/influxdb/influxdb.conf + sudo systemctl start influxdb || sudo systemctl restart influxdb +} + function install_elasticsearch_ubuntu { local opensearch_file=$(get_extra_file https://artifacts.opensearch.org/releases/bundle/opensearch/1.3.9/opensearch-1.3.9-linux-x64.deb) sudo dpkg -i --skip-same-version ${opensearch_file} @@ -367,9 +408,10 @@ function install_opensearch { function install_cloudkitty { git_clone $CLOUDKITTY_REPO $CLOUDKITTY_DIR $CLOUDKITTY_BRANCH setup_develop $CLOUDKITTY_DIR - - if [ $CLOUDKITTY_STORAGE_BACKEND == 'influxdb' ]; then + if [ $CLOUDKITTY_STORAGE_BACKEND == 'influxdb' ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 1 ]; then install_influx + elif [ $CLOUDKITTY_STORAGE_BACKEND == 'influxdb' ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 2 ]; then + install_influx_v2 elif [ $CLOUDKITTY_STORAGE_BACKEND == 'elasticsearch' ]; then install_elasticsearch elif [ $CLOUDKITTY_STORAGE_BACKEND == 'opensearch' ]; then diff --git a/devstack/settings b/devstack/settings index 106f81b4..758169ff 100644 --- a/devstack/settings +++ b/devstack/settings @@ -50,6 +50,7 @@ CLOUDKITTY_METRICS_CONF=metrics.yml # Set CloudKitty storage info CLOUDKITTY_STORAGE_BACKEND=${CLOUDKITTY_STORAGE_BACKEND:-"influxdb"} CLOUDKITTY_STORAGE_VERSION=${CLOUDKITTY_STORAGE_VERSION:-"2"} +CLOUDKITTY_INFLUX_VERSION=${CLOUDKITTY_INFLUX_VERSION:-1} # Set CloudKitty output info CLOUDKITTY_OUTPUT_BACKEND=${CLOUDKITTY_OUTPUT_BACKEND:-"cloudkitty.backend.file.FileBackend"} diff --git a/releasenotes/notes/add-support-to-influxdb-v2-storage-backend-f94df79f9e5276a8.yaml b/releasenotes/notes/add-support-to-influxdb-v2-storage-backend-f94df79f9e5276a8.yaml new file mode 100644 index 00000000..01b4943f --- /dev/null +++ b/releasenotes/notes/add-support-to-influxdb-v2-storage-backend-f94df79f9e5276a8.yaml @@ -0,0 +1,4 @@ +--- +features: + - | + Add support to Influx v2 database as storage backend. 
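For completeness, a hedged sketch of exercising the new client directly once the options above are in place; the option names come from this patch, the URL and token are placeholders, and InfluxStorage performs this selection automatically when [storage_influxdb] version is set to 2:

```python
from oslo_config import cfg

from cloudkitty.storage.v2 import influx

# Placeholder connection settings; under devstack these are written by
# configure_cloudkitty from the CLOUDKITTY_INFLUXDB_* variables.
cfg.CONF.set_override('version', 2, group='storage_influxdb')
cfg.CONF.set_override('url', 'http://127.0.0.1:8086', group='storage_influxdb')
cfg.CONF.set_override('token', 'admin-token', group='storage_influxdb')
cfg.CONF.set_override('org', 'openstack', group='storage_influxdb')
cfg.CONF.set_override('bucket', 'cloudkitty', group='storage_influxdb')

client = influx.InfluxClientV2(default_period=3600)
# Queries are now built in Flux rather than InfluxQL, e.g.:
print(client.get_groupby(['type', 'project_id']))
# |> group(columns: ["type","project_id"])
```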
diff --git a/requirements.txt b/requirements.txt
index 87dac3be..25071ba2 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -33,6 +33,7 @@ stevedore>=3.2.2 # Apache-2.0
 tooz>=2.7.1 # Apache-2.0
 voluptuous>=0.12.0 # BSD License
 influxdb>=5.3.1 # MIT
+influxdb-client>=1.36.0 # MIT
 Flask>=2.0.0 # BSD
 Flask-RESTful>=0.3.9 # BSD
 cotyledon>=1.7.3 # Apache-2.0
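A quick, optional sanity check (not part of the patch) that both client libraries are available after installing requirements:

```python
from importlib.metadata import version

import influxdb          # v1 client, already required
import influxdb_client   # new requirement backing the v2 driver

print(version('influxdb'), version('influxdb-client'))
```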