Merge "Add support to InfluxDB v2 as storage backend"
This commit is contained in:
commit
85ad6904fd
19
.zuul.yaml
19
.zuul.yaml
@ -76,12 +76,25 @@
|
||||
name: cloudkitty-tempest-full-v2-storage-influxdb
|
||||
parent: base-cloudkitty-v2-api-tempest-job
|
||||
description: |
|
||||
Job testing cloudkitty installation on devstack with python 3 and the
|
||||
InfluxDB v2 storage driver and running tempest tests
|
||||
Job testing cloudkitty installation on devstack with python 3, InfluxDB
|
||||
v1 and the InfluxDB v2 storage driver and running tempest tests
|
||||
vars:
|
||||
devstack_localrc:
|
||||
CLOUDKITTY_STORAGE_BACKEND: influxdb
|
||||
CLOUDKITTY_STORAGE_VERSION: 2
|
||||
CLOUDKITTY_INFLUX_VERSION: 1
|
||||
|
||||
- job:
|
||||
name: cloudkitty-tempest-full-v2-storage-influxdb-v2
|
||||
parent: base-cloudkitty-v2-api-tempest-job
|
||||
description: |
|
||||
Job testing cloudkitty installation on devstack with python 3, InfluxDB
|
||||
v2 and the InfluxDB v2 storage driver and running tempest tests
|
||||
vars:
|
||||
devstack_localrc:
|
||||
CLOUDKITTY_STORAGE_BACKEND: influxdb
|
||||
CLOUDKITTY_STORAGE_VERSION: 2
|
||||
CLOUDKITTY_INFLUX_VERSION: 2
|
||||
|
||||
- job:
|
||||
name: cloudkitty-tempest-full-v2-storage-elasticsearch
|
||||
@ -139,6 +152,7 @@
|
||||
check:
|
||||
jobs:
|
||||
- cloudkitty-tempest-full-v2-storage-influxdb
|
||||
- cloudkitty-tempest-full-v2-storage-influxdb-v2
|
||||
- cloudkitty-tempest-full-v2-storage-elasticsearch:
|
||||
voting: false
|
||||
- cloudkitty-tempest-full-v2-storage-opensearch:
|
||||
@ -150,5 +164,6 @@
|
||||
gate:
|
||||
jobs:
|
||||
- cloudkitty-tempest-full-v2-storage-influxdb
|
||||
- cloudkitty-tempest-full-v2-storage-influxdb-v2
|
||||
- cloudkitty-tempest-full-v1-storage-sqlalchemy
|
||||
- cloudkitty-tempest-full-ipv6-only
|
||||
|
@ -12,11 +12,18 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
#
|
||||
import csv
|
||||
import datetime
|
||||
import influxdb
|
||||
import io
|
||||
import json
|
||||
import re
|
||||
|
||||
from influxdb_client.client.write_api import SYNCHRONOUS
|
||||
from influxdb_client import InfluxDBClient
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log
|
||||
import requests
|
||||
|
||||
from cloudkitty import dataframe
|
||||
from cloudkitty.storage import v2 as v2_storage
|
||||
@ -56,6 +63,29 @@ influx_storage_opts = [
|
||||
help='Path of the CA certificate to trust for HTTPS connections',
|
||||
default=None
|
||||
),
|
||||
cfg.IntOpt('version', help='InfluxDB version', default=1),
|
||||
cfg.IntOpt('query_timeout', help='Flux query timeout in milliseconds',
|
||||
default=3600000),
|
||||
cfg.StrOpt(
|
||||
'token',
|
||||
help='InfluxDB API token for version 2 authentication',
|
||||
default=None
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'org',
|
||||
help='InfluxDB 2 org',
|
||||
default="openstack"
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'bucket',
|
||||
help='InfluxDB 2 bucket',
|
||||
default="cloudkitty"
|
||||
),
|
||||
cfg.StrOpt(
|
||||
'url',
|
||||
help='InfluxDB 2 URL',
|
||||
default=None
|
||||
)
|
||||
]
|
||||
|
||||
CONF.register_opts(influx_storage_opts, INFLUX_STORAGE_GROUP)
|
||||
@ -192,7 +222,7 @@ class InfluxClient(object):
|
||||
return " AND " + InfluxClient._get_filter("type", types)
|
||||
|
||||
def get_total(self, types, begin, end, custom_fields,
|
||||
groupby=None, filters=None):
|
||||
groupby=None, filters=None, limit=None):
|
||||
|
||||
self.validate_custom_fields(custom_fields)
|
||||
|
||||
@ -232,11 +262,8 @@ class InfluxClient(object):
|
||||
" clauses are not allowed [%s].",
|
||||
field, forbidden_clauses)
|
||||
|
||||
def retrieve(self,
|
||||
types,
|
||||
filters,
|
||||
begin, end,
|
||||
offset=0, limit=1000, paginate=True):
|
||||
def retrieve(self, types, filters, begin, end, offset=0, limit=1000,
|
||||
paginate=True):
|
||||
query = 'SELECT * FROM "dataframes"'
|
||||
query += self._get_time_query(begin, end)
|
||||
query += self._get_filter_query(filters)
|
||||
@ -282,15 +309,448 @@ class InfluxClient(object):
|
||||
|
||||
self._conn.query(query)
|
||||
|
||||
def _get_total_elem(self, begin, end, groupby, series_groupby, point):
|
||||
if groupby and 'time' in groupby:
|
||||
begin = tzutils.dt_from_iso(point['time'])
|
||||
period = point.get(PERIOD_FIELD_NAME) or self._default_period
|
||||
end = tzutils.add_delta(begin, datetime.timedelta(seconds=period))
|
||||
output = {
|
||||
'begin': begin,
|
||||
'end': end,
|
||||
}
|
||||
|
||||
for key in point.keys():
|
||||
if "time" != key:
|
||||
output[key] = point[key]
|
||||
|
||||
if groupby:
|
||||
for group in _sanitized_groupby(groupby):
|
||||
output[group] = series_groupby.get(group, '')
|
||||
return output
|
||||
|
||||
def process_total(self, total, begin, end, groupby, *args):
|
||||
output = []
|
||||
for (series_name, series_groupby), points in total.items():
|
||||
for point in points:
|
||||
# NOTE(peschk_l): InfluxDB returns all timestamps for a given
|
||||
# period and interval, even those with no data. This filters
|
||||
# out periods with no data
|
||||
|
||||
# NOTE (rafaelweingartner): the summary get API is allowing
|
||||
# users to customize the report. Therefore, we only ignore
|
||||
# data points, if all of the entries have None values.
|
||||
# Otherwise, they are presented to the user.
|
||||
if [k for k in point.keys() if point[k]]:
|
||||
output.append(self._get_total_elem(
|
||||
tzutils.utc_to_local(begin),
|
||||
tzutils.utc_to_local(end),
|
||||
groupby,
|
||||
series_groupby,
|
||||
point))
|
||||
return output
|
||||
|
||||
|
||||
class InfluxClientV2(InfluxClient):
|
||||
"""Class used to facilitate interaction with InfluxDB v2
|
||||
|
||||
custom_fields_rgx: Regex to parse the input custom_fields and
|
||||
retrieve the field name, the desired alias
|
||||
and the aggregation function to use.
|
||||
It allows us to keep the same custom_fields
|
||||
representation for both InfluxQL and Flux
|
||||
queries.
|
||||
|
||||
"""
|
||||
|
||||
custom_fields_rgx = r'([\w_\\"]+)\(([\w_\\"]+)\) (AS|as) ' \
|
||||
r'\\?"?([\w_ \\]+)"?,? ?'
|
||||
|
||||
class FluxResponseHandler(object):
|
||||
"""Class used to process the response of Flux queries
|
||||
|
||||
As the Flux response splits its result set by the
|
||||
requested fields, we need to merge them into a single
|
||||
one based on their groups (tags).
|
||||
|
||||
Using this approach we keep the response data
|
||||
compatible with the InfluxQL result set, where we
|
||||
already have the multiple result set for each field
|
||||
merged into a single one.
|
||||
"""
|
||||
|
||||
def __init__(self, response, groupby, fields, begin, end,
|
||||
field_filters):
|
||||
self.data = response
|
||||
self.field_filters = field_filters
|
||||
self.response = {}
|
||||
self.begin = begin
|
||||
self.end = end
|
||||
self.groupby = groupby
|
||||
self.fields = fields
|
||||
self.process()
|
||||
|
||||
def process(self):
|
||||
"""This method merges all the Flux result sets into a single one.
|
||||
|
||||
To make sure the fields filtering comply with the user's
|
||||
request, we need to remove the merged entries that have None
|
||||
value for filtered fields, we need to do that because working
|
||||
with fields one by one in Flux queries is more performant
|
||||
than working with all the fields together, but it brings some
|
||||
problems when we want to filter some data. E.g:
|
||||
|
||||
We want the fields A and B, grouped by C and D, and the field
|
||||
A must be 2. Imagine this query for the following
|
||||
dataset:
|
||||
|
||||
A : C : D B : C : D
|
||||
1 : 1 : 1 5 : 1 : 1
|
||||
2 : 2 : 2 6 : 2 : 2
|
||||
2 : 3 : 3 7 : 3 : 3
|
||||
2 : 4 : 4
|
||||
|
||||
The result set is going to be like:
|
||||
|
||||
A : C : D B : C : D
|
||||
2 : 2 : 2 5 : 1 : 1
|
||||
2 : 3 : 3 6 : 2 : 2
|
||||
2 : 4 : 4 7 : 3 : 3
|
||||
|
||||
And the merged value is going to be like:
|
||||
|
||||
A : B : C : D
|
||||
None : 5 : 1 : 1
|
||||
2 : 6 : 2 : 2
|
||||
2 : 7 : 3 : 3
|
||||
2 : None : 4 : 4
|
||||
|
||||
So, we need to remove the first undesired entry to get the
|
||||
correct result:
|
||||
|
||||
A : B : C : D
|
||||
2 : 6 : 2 : 2
|
||||
2 : 7 : 3 : 3
|
||||
2 : None : 4 : 4
|
||||
"""
|
||||
|
||||
LOG.debug("Using fields %s to process InfluxDB V2 response.",
|
||||
self.fields)
|
||||
LOG.debug("Start processing data [%s] of InfluxDB V2 API.",
|
||||
self.data)
|
||||
if self.fields == ["*"] and not self.groupby:
|
||||
self.process_data_wildcard()
|
||||
else:
|
||||
self.process_data_with_fields()
|
||||
|
||||
LOG.debug("Data processed by the InfluxDB V2 backend with "
|
||||
"result [%s].", self.response)
|
||||
LOG.debug("Start sanitizing the response of Influx V2 API.")
|
||||
self.sanitize_filtered_entries()
|
||||
LOG.debug("Response sanitized [%s] for InfluxDB V2 API.",
|
||||
self.response)
|
||||
|
||||
def process_data_wildcard(self):
|
||||
LOG.debug("Processing wildcard response for InfluxDB V2 API.")
|
||||
found_fields = set()
|
||||
for r in self.data:
|
||||
if self.is_header_entry(r):
|
||||
LOG.debug("Skipping header entry: [%s].", r)
|
||||
continue
|
||||
r_key = ''.join(sorted(r.values()))
|
||||
found_fields.add(r['_field'])
|
||||
r_value = r
|
||||
r_value['begin'] = self.begin
|
||||
r_value['end'] = self.end
|
||||
self.response.setdefault(
|
||||
r_key, r_value)[r['result']] = float(r['_value'])
|
||||
|
||||
def process_data_with_fields(self):
|
||||
for r in self.data:
|
||||
if self.is_header_entry(r):
|
||||
LOG.debug("Skipping header entry: [%s].", r)
|
||||
continue
|
||||
r_key = ''
|
||||
r_value = {f: None for f in self.fields}
|
||||
r_value['begin'] = self.begin
|
||||
r_value['end'] = self.end
|
||||
for g in (self.groupby or []):
|
||||
val = r.get(g)
|
||||
r_key += val or ''
|
||||
r_value[g] = val
|
||||
|
||||
self.response.setdefault(
|
||||
r_key, r_value)[r['result']] = float(r['_value'])
|
||||
|
||||
@staticmethod
|
||||
def is_header_entry(entry):
|
||||
"""Check header entries.
|
||||
|
||||
As the response contains multiple resultsets,
|
||||
each entry in the response CSV has its own
|
||||
header, which is the same for all the result sets,
|
||||
but the CSV parser does not ignore it
|
||||
and processes all headers except the first as a
|
||||
dict entry, so for these cases, each dict's value
|
||||
is going to be the same as the dict's key, so we
|
||||
are picking one and if it is this case, we skip it.
|
||||
|
||||
"""
|
||||
|
||||
return entry.get('_start') == '_start'
|
||||
|
||||
def sanitize_filtered_entries(self):
|
||||
"""Removes entries where filtered fields have None as value."""
|
||||
|
||||
for d in self.field_filters or []:
|
||||
for k in list(self.response.keys()):
|
||||
if self.response[k][d] is None:
|
||||
self.response.pop(k, None)
|
||||
|
||||
def __init__(self, default_period=None):
|
||||
super().__init__(default_period=default_period)
|
||||
self.client = InfluxDBClient(
|
||||
url=CONF.storage_influxdb.url,
|
||||
timeout=CONF.storage_influxdb.query_timeout,
|
||||
token=CONF.storage_influxdb.token,
|
||||
org=CONF.storage_influxdb.org)
|
||||
self._conn = self.client
|
||||
|
||||
def retrieve(self, types, filters, begin, end, offset=0, limit=1000,
|
||||
paginate=True):
|
||||
|
||||
query = self.get_query(begin, end, '*', filters=filters)
|
||||
response = self.query(query)
|
||||
output = self.process_total(
|
||||
response, begin, end, None, '*', filters)
|
||||
LOG.debug("Retrieved output %s", output)
|
||||
results = {'results': output[
|
||||
offset:offset + limit] if paginate else output}
|
||||
return len(output), results
|
||||
|
||||
def delete(self, begin, end, filters):
|
||||
predicate = '_measurement="dataframes"'
|
||||
f = self.get_group_filters_query(
|
||||
filters, fmt=lambda x: '"' + str(x) + '"')
|
||||
if f:
|
||||
f = f.replace('==', '=').replace('and', 'AND')
|
||||
predicate += f'{f}'
|
||||
|
||||
LOG.debug("InfluxDB v2 deleting elements filtering by [%s] and "
|
||||
"with [begin=%s, end=%s].", predicate, begin, end)
|
||||
delete_api = self.client.delete_api()
|
||||
delete_api.delete(begin, end, bucket=CONF.storage_influxdb.bucket,
|
||||
predicate=predicate)
|
||||
|
||||
def process_total(self, total, begin, end, groupby, custom_fields,
|
||||
filters):
|
||||
cf = self.get_custom_fields(custom_fields)
|
||||
fields = list(map(lambda f: f[2], cf))
|
||||
c_fields = {f[1]: f[2] for f in cf}
|
||||
field_filters = [c_fields[f] for f in filters if f in c_fields]
|
||||
handler = self.FluxResponseHandler(total, groupby, fields, begin, end,
|
||||
field_filters)
|
||||
return list(handler.response.values())
|
||||
|
||||
def commit(self):
|
||||
total_points = len(self._points)
|
||||
if len(self._points) < 1:
|
||||
return
|
||||
LOG.debug('Pushing {} points to InfluxDB'.format(total_points))
|
||||
self.write_points(self._points,
|
||||
retention_policy=self._retention_policy)
|
||||
self._points = []
|
||||
|
||||
def write_points(self, points, retention_policy=None):
|
||||
write_api = self.client.write_api(write_options=SYNCHRONOUS)
|
||||
[write_api.write(
|
||||
bucket=CONF.storage_influxdb.bucket, record=p)
|
||||
for p in points]
|
||||
|
||||
def _get_filter_query(self, filters):
|
||||
if not filters:
|
||||
return ''
|
||||
return ' and ' + ' and '.join(
|
||||
self._get_filter(k, v) for k, v in filters.items())
|
||||
|
||||
def get_custom_fields(self, custom_fields):
|
||||
|
||||
if not custom_fields:
|
||||
return []
|
||||
|
||||
if custom_fields.strip() == '*':
|
||||
return [('*', '*', '*')]
|
||||
|
||||
groups = [list(i.groups()) for i in re.finditer(
|
||||
self.custom_fields_rgx, custom_fields)]
|
||||
|
||||
# Remove the "As|as" group that is useless.
|
||||
if groups:
|
||||
for g in groups:
|
||||
del g[2]
|
||||
|
||||
return groups
|
||||
|
||||
def get_group_filters_query(self, group_filters, fmt=lambda x: f'r.{x}'):
|
||||
if not group_filters:
|
||||
return ''
|
||||
|
||||
get_val = (lambda x: x if isinstance(v, (int, float)) or
|
||||
x.isnumeric() else f'"{x}"')
|
||||
|
||||
f = ''
|
||||
for k, v in group_filters.items():
|
||||
if isinstance(v, (list, tuple)):
|
||||
if len(v) == 1:
|
||||
f += f' and {fmt(k)}=={get_val(v[0])}'
|
||||
continue
|
||||
|
||||
f += ' and (%s)' % ' or '.join([f'{fmt(k)}=={get_val(val)}'
|
||||
for val in v])
|
||||
continue
|
||||
|
||||
f += f' and {fmt(k)}=={get_val(v)}'
|
||||
|
||||
return f
|
||||
|
||||
def get_field_filters_query(self, field_filters,
|
||||
fmt=lambda x: 'r["_value"]'):
|
||||
return self.get_group_filters_query(field_filters, fmt)
|
||||
|
||||
def get_custom_fields_query(self, custom_fields, query, field_filters,
|
||||
group_filters, limit=None, groupby=None):
|
||||
if not groupby:
|
||||
groupby = []
|
||||
if not custom_fields:
|
||||
custom_fields = 'sum(price) AS price,sum(qty) AS qty'
|
||||
columns_to_keep = ', '.join(map(lambda g: f'"{g}"', groupby))
|
||||
columns_to_keep += ', "_field", "_value", "_start", "_stop"'
|
||||
new_query = ''
|
||||
LOG.debug("Custom fields: %s", custom_fields)
|
||||
LOG.debug("Custom fields processed: %s",
|
||||
self.get_custom_fields(custom_fields))
|
||||
for operation, field, alias in self.get_custom_fields(custom_fields):
|
||||
LOG.debug("Generating query with operation [%s],"
|
||||
" field [%s] and alias [%s]", operation, field, alias)
|
||||
field_filter = {}
|
||||
if field_filters and field in field_filters:
|
||||
field_filter = {field: field_filters[field]}
|
||||
|
||||
if field == '*':
|
||||
group_filter = self.get_group_filters_query(
|
||||
group_filters).replace(" and ", "", 1)
|
||||
filter_to_replace = f'|> filter(fn: (r) => {group_filter})'
|
||||
new_query += query.replace(
|
||||
'<placeholder-filter>',
|
||||
filter_to_replace).replace(
|
||||
'<placeholder-operations>',
|
||||
f'''|> drop(columns: ["_time"])
|
||||
{'|> limit(n: ' + str(limit) + ')' if limit else ''}
|
||||
|> yield(name: "result")''')
|
||||
continue
|
||||
|
||||
new_query += query.replace(
|
||||
'<placeholder-filter>',
|
||||
f'|> filter(fn: (r) => r["_field"] == '
|
||||
f'"{field}" {self.get_group_filters_query(group_filters)} '
|
||||
f'{self.get_field_filters_query(field_filter)})'
|
||||
).replace(
|
||||
'<placeholder-operations>',
|
||||
f'''|> {operation.lower()}()
|
||||
|> keep(columns: [{columns_to_keep}])
|
||||
|> set(key: "_field", value: "{alias}")
|
||||
|> yield(name: "{alias}")''')
|
||||
return new_query
|
||||
|
||||
def get_groupby(self, groupby):
|
||||
if not groupby:
|
||||
return "|> group()"
|
||||
return f'''|> group(columns: [{','.join([f'"{g}"'
|
||||
for g in groupby])}])'''
|
||||
|
||||
def get_time_range(self, begin, end):
|
||||
if not begin or not end:
|
||||
return ''
|
||||
return f'|> range(start: {begin.isoformat()}, stop: {end.isoformat()})'
|
||||
|
||||
def get_query(self, begin, end, custom_fields, groupby=None, filters=None,
|
||||
limit=None):
|
||||
|
||||
custom_fields_processed = list(
|
||||
map(lambda x: x[1], self.get_custom_fields(custom_fields)))
|
||||
field_filters = dict(filter(
|
||||
lambda f: f[0] in custom_fields_processed, filters.items()))
|
||||
group_filters = dict(filter(
|
||||
lambda f: f[0] not in field_filters, filters.items()))
|
||||
|
||||
query = f'''
|
||||
from(bucket:"{CONF.storage_influxdb.bucket}")
|
||||
{self.get_time_range(begin, end)}
|
||||
|> filter(fn: (r) => r["_measurement"] == "dataframes")
|
||||
<placeholder-filter>
|
||||
{self.get_groupby(groupby)}
|
||||
<placeholder-operations>
|
||||
'''
|
||||
|
||||
LOG.debug("Field Filters: %s", field_filters)
|
||||
LOG.debug("Group Filters: %s", group_filters)
|
||||
query = self.get_custom_fields_query(custom_fields, query,
|
||||
field_filters, group_filters,
|
||||
limit, groupby)
|
||||
return query
|
||||
|
||||
def query(self, query):
|
||||
url_base = CONF.storage_influxdb.url
|
||||
org = CONF.storage_influxdb.org
|
||||
url = f'{url_base}/api/v2/query?org={org}'
|
||||
response = requests.post(
|
||||
url=url,
|
||||
headers={
|
||||
'Content-type': 'application/json',
|
||||
'Authorization': f'Token {CONF.storage_influxdb.token}'},
|
||||
data=json.dumps({
|
||||
'query': query}))
|
||||
response_text = response.text
|
||||
LOG.debug("Raw Response: [%s].", response_text)
|
||||
handled_response = []
|
||||
for csv_tables in response_text.split(',result,table,'):
|
||||
csv_tables = ',result,table,' + csv_tables
|
||||
LOG.debug("Processing CSV [%s].", csv_tables)
|
||||
processed = list(csv.DictReader(io.StringIO(csv_tables)))
|
||||
LOG.debug("Processed CSV in dict [%s]", processed)
|
||||
handled_response.extend(processed)
|
||||
return handled_response
|
||||
|
||||
def get_total(self, types, begin, end, custom_fields,
|
||||
groupby=None, filters=None, limit=None):
|
||||
|
||||
LOG.debug("Query types: %s", types)
|
||||
if types:
|
||||
if not filters:
|
||||
filters = {}
|
||||
filters['type'] = types
|
||||
|
||||
LOG.debug("Query filters: %s", filters)
|
||||
query = self.get_query(begin, end, custom_fields,
|
||||
groupby, filters, limit)
|
||||
|
||||
LOG.debug("Executing the Flux query [%s].", query)
|
||||
|
||||
return self.query(query)
|
||||
|
||||
|
||||
class InfluxStorage(v2_storage.BaseStorage):
|
||||
|
||||
def __init__(self, *args, **kwargs):
|
||||
super(InfluxStorage, self).__init__(*args, **kwargs)
|
||||
self._default_period = kwargs.get('period') or CONF.collect.period
|
||||
self._conn = InfluxClient(default_period=self._default_period)
|
||||
if CONF.storage_influxdb.version == 2:
|
||||
self._conn = InfluxClientV2(default_period=self._default_period)
|
||||
else:
|
||||
self._conn = InfluxClient(default_period=self._default_period)
|
||||
|
||||
def init(self):
|
||||
if CONF.storage_influxdb.version == 2:
|
||||
return
|
||||
policy = CONF.storage_influxdb.retention_policy
|
||||
database = CONF.storage_influxdb.database
|
||||
if not self._conn.retention_policy_exists(database, policy):
|
||||
@ -371,25 +831,6 @@ class InfluxStorage(v2_storage.BaseStorage):
|
||||
def delete(self, begin=None, end=None, filters=None):
|
||||
self._conn.delete(begin, end, filters)
|
||||
|
||||
def _get_total_elem(self, begin, end, groupby, series_groupby, point):
|
||||
if groupby and 'time' in groupby:
|
||||
begin = tzutils.dt_from_iso(point['time'])
|
||||
period = point.get(PERIOD_FIELD_NAME) or self._default_period
|
||||
end = tzutils.add_delta(begin, datetime.timedelta(seconds=period))
|
||||
output = {
|
||||
'begin': begin,
|
||||
'end': end,
|
||||
}
|
||||
|
||||
for key in point.keys():
|
||||
if "time" != key:
|
||||
output[key] = point[key]
|
||||
|
||||
if groupby:
|
||||
for group in _sanitized_groupby(groupby):
|
||||
output[group] = series_groupby.get(group, '')
|
||||
return output
|
||||
|
||||
def total(self, groupby=None, begin=None, end=None, metric_types=None,
|
||||
filters=None, offset=0, limit=1000, paginate=True,
|
||||
custom_fields="SUM(qty) AS qty, SUM(price) AS rate"):
|
||||
@ -398,30 +839,14 @@ class InfluxStorage(v2_storage.BaseStorage):
|
||||
groupby = self.parse_groupby_syntax_to_groupby_elements(groupby)
|
||||
|
||||
total = self._conn.get_total(metric_types, begin, end,
|
||||
custom_fields, groupby, filters)
|
||||
custom_fields, groupby, filters, limit)
|
||||
|
||||
output = []
|
||||
for (series_name, series_groupby), points in total.items():
|
||||
for point in points:
|
||||
# NOTE(peschk_l): InfluxDB returns all timestamps for a given
|
||||
# period and interval, even those with no data. This filters
|
||||
# out periods with no data
|
||||
|
||||
# NOTE (rafaelweingartner): the summary get API is allowing
|
||||
# users to customize the report. Therefore, we only ignore
|
||||
# data points, if all of the entries have None values.
|
||||
# Otherwise, they are presented to the user.
|
||||
if [k for k in point.keys() if point[k]]:
|
||||
output.append(self._get_total_elem(
|
||||
tzutils.utc_to_local(begin),
|
||||
tzutils.utc_to_local(end),
|
||||
groupby,
|
||||
series_groupby,
|
||||
point))
|
||||
output = self._conn.process_total(
|
||||
total, begin, end, groupby, custom_fields, filters)
|
||||
|
||||
groupby = _sanitized_groupby(groupby)
|
||||
if groupby:
|
||||
output.sort(key=lambda x: [x[group] for group in groupby])
|
||||
output.sort(key=lambda x: [x[group] or "" for group in groupby])
|
||||
|
||||
return {
|
||||
'total': len(output),
|
||||
|
@ -90,7 +90,7 @@ class FakeInfluxClient(InfluxClient):
|
||||
return target_serie
|
||||
|
||||
def get_total(self, types, begin, end, custom_fields, groupby=None,
|
||||
filters=None):
|
||||
filters=None, limit=None):
|
||||
total = copy.deepcopy(self.total_sample)
|
||||
series = []
|
||||
|
||||
|
@ -209,3 +209,233 @@ class TestInfluxClient(unittest.TestCase):
|
||||
self._storage.delete(end=datetime(2019, 1, 2))
|
||||
m.assert_called_once_with("""DELETE FROM "dataframes" WHERE """
|
||||
"""time < '2019-01-02T00:00:00';""")
|
||||
|
||||
def test_process_total(self):
|
||||
begin = datetime(2019, 1, 2, 10)
|
||||
end = datetime(2019, 1, 2, 11)
|
||||
groupby = ['valA', 'time']
|
||||
points_1 = [
|
||||
{
|
||||
'qty': 42,
|
||||
'price': 1.0,
|
||||
'time': begin.isoformat()
|
||||
}
|
||||
]
|
||||
series_groupby_1 = {
|
||||
'valA': '1'
|
||||
}
|
||||
points_2 = [
|
||||
{
|
||||
'qty': 12,
|
||||
'price': 2.0,
|
||||
'time': begin.isoformat()
|
||||
}
|
||||
]
|
||||
series_groupby_2 = {
|
||||
'valA': '2'
|
||||
}
|
||||
points_3 = [
|
||||
{
|
||||
'qty': None,
|
||||
'price': None,
|
||||
'time': None
|
||||
}
|
||||
]
|
||||
series_groupby_3 = {
|
||||
'valA': None
|
||||
}
|
||||
series_name = 'dataframes'
|
||||
items = [((series_name, series_groupby_1), points_1),
|
||||
((series_name, series_groupby_2), points_2),
|
||||
((series_name, series_groupby_3), points_3)]
|
||||
total = FakeResultSet(items=items)
|
||||
result = self.client.process_total(total=total, begin=begin, end=end,
|
||||
groupby=groupby)
|
||||
expected = [{'begin': tzutils.utc_to_local(begin),
|
||||
'end': tzutils.utc_to_local(end),
|
||||
'qty': 42,
|
||||
'price': 1.0,
|
||||
'valA': '1'},
|
||||
{'begin': tzutils.utc_to_local(begin),
|
||||
'end': tzutils.utc_to_local(end),
|
||||
'qty': 12,
|
||||
'price': 2.0,
|
||||
'valA': '2'}
|
||||
]
|
||||
self.assertEqual(expected, result)
|
||||
|
||||
|
||||
class TestInfluxClientV2(unittest.TestCase):
|
||||
|
||||
@mock.patch('cloudkitty.storage.v2.influx.InfluxDBClient')
|
||||
def setUp(self, client_mock):
|
||||
self.period_begin = tzutils.local_to_utc(
|
||||
tzutils.get_month_start())
|
||||
self.period_end = tzutils.local_to_utc(
|
||||
tzutils.get_next_month())
|
||||
self.client = influx.InfluxClientV2()
|
||||
|
||||
@mock.patch('cloudkitty.storage.v2.influx.requests')
|
||||
def test_query(self, mock_request):
|
||||
static_vals = ['', 'result', 'table', '_start', '_value']
|
||||
custom_fields = 'last(f1) AS f1, last(f2) AS f2, last(f3) AS f3'
|
||||
groups = ['g1', 'g2', 'g3']
|
||||
data = [
|
||||
static_vals + groups,
|
||||
['', 'f1', 0, 1, 1, 1, 2, 3],
|
||||
['', 'f2', 0, 1, 2, 1, 2, 3],
|
||||
['', 'f3', 0, 1, 3, 1, 2, 3],
|
||||
static_vals + groups,
|
||||
['', 'f1', 0, 1, 3, 3, 1, 2],
|
||||
['', 'f2', 0, 1, 1, 3, 1, 2],
|
||||
['', 'f3', 0, 1, 2, 3, 1, 2],
|
||||
static_vals + groups,
|
||||
['', 'f1', 0, 1, 2, 2, 3, 1],
|
||||
['', 'f2', 0, 1, 3, 2, 3, 1],
|
||||
['', 'f3', 0, 1, 1, 2, 3, 1]
|
||||
]
|
||||
|
||||
expected_value = [
|
||||
{'f1': 1.0, 'f2': 2.0, 'f3': 3.0, 'begin': self.period_begin,
|
||||
'end': self.period_end, 'g1': '1', 'g2': '2', 'g3': '3'},
|
||||
{'f1': 3.0, 'f2': 1.0, 'f3': 2.0, 'begin': self.period_begin,
|
||||
'end': self.period_end, 'g1': '3', 'g2': '1', 'g3': '2'},
|
||||
{'f1': 2.0, 'f2': 3.0, 'f3': 1.0, 'begin': self.period_begin,
|
||||
'end': self.period_end, 'g1': '2', 'g2': '3', 'g3': '1'}
|
||||
]
|
||||
|
||||
data_csv = '\n'.join([','.join(map(str, d)) for d in data])
|
||||
mock_request.post.return_value = mock.Mock(text=data_csv)
|
||||
response = self.client.get_total(
|
||||
None, self.period_begin, self.period_end, custom_fields,
|
||||
filters={}, groupby=groups)
|
||||
|
||||
result = self.client.process_total(
|
||||
response, self.period_begin, self.period_end,
|
||||
groups, custom_fields, {})
|
||||
|
||||
self.assertEqual(result, expected_value)
|
||||
|
||||
def test_query_build(self):
|
||||
custom_fields = 'last(field1) AS F1, sum(field2) AS F2'
|
||||
groupby = ['group1', 'group2', 'group3']
|
||||
filters = {
|
||||
'filter1': '10',
|
||||
'filter2': 'filter2_filter'
|
||||
}
|
||||
beg = self.period_begin.isoformat()
|
||||
end = self.period_end.isoformat()
|
||||
expected = ('\n'
|
||||
' from(bucket:"cloudkitty")\n'
|
||||
f' |> range(start: {beg}, stop: {end})\n'
|
||||
' |> filter(fn: (r) => r["_measurement"] == '
|
||||
'"dataframes")\n'
|
||||
' |> filter(fn: (r) => r["_field"] == "field1"'
|
||||
' and r.filter1==10 and r.filter2=="filter2_filter" )\n'
|
||||
' |> group(columns: ["group1","group2",'
|
||||
'"group3"])\n'
|
||||
' |> last()\n'
|
||||
' |> keep(columns: ["group1", "group2",'
|
||||
' "group3", "_field", "_value", "_start", "_stop"])\n'
|
||||
' |> set(key: "_field", value: "F1")\n'
|
||||
' |> yield(name: "F1")\n'
|
||||
' \n'
|
||||
' from(bucket:"cloudkitty")\n'
|
||||
f' |> range(start: {beg}, stop: {end})\n'
|
||||
' |> filter(fn: (r) => r["_measurement"] == '
|
||||
'"dataframes")\n'
|
||||
' |> filter(fn: (r) => r["_field"] == "field2"'
|
||||
' and r.filter1==10 and r.filter2=="filter2_filter" )\n'
|
||||
' |> group(columns: ["group1","group2",'
|
||||
'"group3"])\n'
|
||||
' |> sum()\n'
|
||||
' |> keep(columns: ["group1", "group2", '
|
||||
'"group3", "_field", "_value", "_start", "_stop"])\n'
|
||||
' |> set(key: "_field", value: "F2")\n'
|
||||
' |> yield(name: "F2")\n'
|
||||
' ')
|
||||
|
||||
query = self.client.get_query(begin=self.period_begin,
|
||||
end=self.period_end,
|
||||
custom_fields=custom_fields,
|
||||
filters=filters,
|
||||
groupby=groupby)
|
||||
|
||||
self.assertEqual(query, expected)
|
||||
|
||||
def test_query_build_no_custom_fields(self):
|
||||
custom_fields = None
|
||||
groupby = ['group1', 'group2', 'group3']
|
||||
filters = {
|
||||
'filter1': '10',
|
||||
'filter2': 'filter2_filter'
|
||||
}
|
||||
beg = self.period_begin.isoformat()
|
||||
end = self.period_end.isoformat()
|
||||
self.maxDiff = None
|
||||
expected = ('\n'
|
||||
' from(bucket:"cloudkitty")\n'
|
||||
f' |> range(start: {beg}, stop: {end})\n'
|
||||
' |> filter(fn: (r) => r["_measurement"] == '
|
||||
'"dataframes")\n'
|
||||
' |> filter(fn: (r) => r["_field"] == "price"'
|
||||
' and r.filter1==10 and r.filter2=="filter2_filter" )\n'
|
||||
' |> group(columns: ["group1","group2",'
|
||||
'"group3"])\n'
|
||||
' |> sum()\n'
|
||||
' |> keep(columns: ["group1", "group2",'
|
||||
' "group3", "_field", "_value", "_start", "_stop"])\n'
|
||||
' |> set(key: "_field", value: "price")\n'
|
||||
' |> yield(name: "price")\n'
|
||||
' \n'
|
||||
' from(bucket:"cloudkitty")\n'
|
||||
f' |> range(start: {beg}, stop: {end})\n'
|
||||
' |> filter(fn: (r) => r["_measurement"] == '
|
||||
'"dataframes")\n'
|
||||
' |> filter(fn: (r) => r["_field"] == "qty"'
|
||||
' and r.filter1==10 and r.filter2=="filter2_filter" )\n'
|
||||
' |> group(columns: ["group1","group2",'
|
||||
'"group3"])\n'
|
||||
' |> sum()\n'
|
||||
' |> keep(columns: ["group1", "group2", '
|
||||
'"group3", "_field", "_value", "_start", "_stop"])\n'
|
||||
' |> set(key: "_field", value: "qty")\n'
|
||||
' |> yield(name: "qty")\n'
|
||||
' ')
|
||||
|
||||
query = self.client.get_query(begin=self.period_begin,
|
||||
end=self.period_end,
|
||||
custom_fields=custom_fields,
|
||||
filters=filters,
|
||||
groupby=groupby)
|
||||
|
||||
self.assertEqual(query, expected)
|
||||
|
||||
def test_query_build_all_custom_fields(self):
|
||||
custom_fields = '*'
|
||||
groupby = ['group1', 'group2', 'group3']
|
||||
filters = {
|
||||
'filter1': '10',
|
||||
'filter2': 'filter2_filter'
|
||||
}
|
||||
beg = self.period_begin.isoformat()
|
||||
end = self.period_end.isoformat()
|
||||
expected = (f'''
|
||||
from(bucket:"cloudkitty")
|
||||
|> range(start: {beg}, stop: {end})
|
||||
|> filter(fn: (r) => r["_measurement"] == "dataframes")
|
||||
|> filter(fn: (r) => r.filter1==10 and r.filter2=="filter
|
||||
2_filter")
|
||||
|> group(columns: ["group1","group2","group3"])
|
||||
|> drop(columns: ["_time"])
|
||||
|> yield(name: "result")'''.replace(
|
||||
' ', '').replace('\n', '').replace('\t', ''))
|
||||
|
||||
query = self.client.get_query(begin=self.period_begin,
|
||||
end=self.period_end,
|
||||
custom_fields=custom_fields,
|
||||
filters=filters,
|
||||
groupby=groupby).replace(
|
||||
' ', '').replace('\n', '').replace('\t', '')
|
||||
|
||||
self.assertEqual(query, expected)
|
||||
|
@ -165,7 +165,7 @@ function configure_cloudkitty {
|
||||
iniset $CLOUDKITTY_CONF fetcher_keystone keystone_version 3
|
||||
fi
|
||||
|
||||
if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ]; then
|
||||
if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 1 ]; then
|
||||
iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} user ${CLOUDKITTY_INFLUXDB_USER}
|
||||
iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} password ${CLOUDKITTY_INFLUXDB_PASSWORD}
|
||||
iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} database ${CLOUDKITTY_INFLUXDB_DATABASE}
|
||||
@ -173,6 +173,14 @@ function configure_cloudkitty {
|
||||
iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} port ${CLOUDKITTY_INFLUXDB_PORT}
|
||||
fi
|
||||
|
||||
if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 2 ]; then
|
||||
iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} host ${CLOUDKITTY_INFLUXDB_HOST}
|
||||
iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} port ${CLOUDKITTY_INFLUXDB_PORT}
|
||||
iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} url "http://${CLOUDKITTY_INFLUXDB_HOST}:${CLOUDKITTY_INFLUXDB_PORT}"
|
||||
iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} token ${CLOUDKITTY_INFLUXDB_PASSWORD}
|
||||
iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} version 2
|
||||
fi
|
||||
|
||||
if [ "$CLOUDKITTY_STORAGE_BACKEND" == "elasticsearch" ]; then
|
||||
iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} host ${CLOUDKITTY_ELASTICSEARCH_HOST}
|
||||
iniset $CLOUDKITTY_CONF storage_${CLOUDKITTY_STORAGE_BACKEND} index_name ${CLOUDKITTY_ELASTICSEARCH_INDEX}
|
||||
@ -242,9 +250,13 @@ function create_cloudkitty_data_dir {
|
||||
}
|
||||
|
||||
function create_influxdb_database {
|
||||
if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ]; then
|
||||
if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 1 ]; then
|
||||
influx -execute "CREATE DATABASE ${CLOUDKITTY_INFLUXDB_DATABASE}"
|
||||
fi
|
||||
if [ "$CLOUDKITTY_STORAGE_BACKEND" == "influxdb" ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 2 ]; then
|
||||
influx setup --username ${CLOUDKITTY_INFLUXDB_USER} --password ${CLOUDKITTY_INFLUXDB_PASSWORD} --token ${CLOUDKITTY_INFLUXDB_PASSWORD} --org openstack --bucket cloudkitty --force
|
||||
fi
|
||||
|
||||
}
|
||||
|
||||
function create_elasticsearch_index {
|
||||
@ -296,11 +308,27 @@ function install_influx_ubuntu {
|
||||
sudo dpkg -i --skip-same-version ${influxdb_file}
|
||||
}
|
||||
|
||||
function install_influx_v2_ubuntu {
|
||||
local influxdb_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2_2.7.5-1_amd64.deb)
|
||||
sudo dpkg -i --skip-same-version ${influxdb_file}
|
||||
local influxcli_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2-client-2.7.3-linux-amd64.tar.gz)
|
||||
tar xvzf ${influxcli_file}
|
||||
sudo cp ./influx /usr/local/bin/
|
||||
}
|
||||
|
||||
function install_influx_fedora {
|
||||
local influxdb_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb-1.6.3.x86_64.rpm)
|
||||
sudo yum localinstall -y ${influxdb_file}
|
||||
}
|
||||
|
||||
function install_influx_v2_fedora {
|
||||
local influxdb_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2-2.7.5-1.x86_64.rpm)
|
||||
sudo yum localinstall -y ${influxdb_file}
|
||||
local influxcli_file=$(get_extra_file https://dl.influxdata.com/influxdb/releases/influxdb2-client-2.7.3-linux-amd64.tar.gz)
|
||||
tar xvzf ${influxcli_file}
|
||||
sudo cp ./influx /usr/local/bin/
|
||||
}
|
||||
|
||||
function install_influx {
|
||||
if is_ubuntu; then
|
||||
install_influx_ubuntu
|
||||
@ -313,6 +341,19 @@ function install_influx {
|
||||
sudo systemctl start influxdb || sudo systemctl restart influxdb
|
||||
}
|
||||
|
||||
|
||||
function install_influx_v2 {
|
||||
if is_ubuntu; then
|
||||
install_influx_v2_ubuntu
|
||||
elif is_fedora; then
|
||||
install_influx_v2_fedora
|
||||
else
|
||||
die $LINENO "Distribution must be Debian or Fedora-based"
|
||||
fi
|
||||
sudo cp -f "${CLOUDKITTY_DIR}"/devstack/files/influxdb.conf /etc/influxdb/influxdb.conf
|
||||
sudo systemctl start influxdb || sudo systemctl restart influxdb
|
||||
}
|
||||
|
||||
function install_elasticsearch_ubuntu {
|
||||
local opensearch_file=$(get_extra_file https://artifacts.opensearch.org/releases/bundle/opensearch/1.3.9/opensearch-1.3.9-linux-x64.deb)
|
||||
sudo dpkg -i --skip-same-version ${opensearch_file}
|
||||
@ -367,9 +408,10 @@ function install_opensearch {
|
||||
function install_cloudkitty {
|
||||
git_clone $CLOUDKITTY_REPO $CLOUDKITTY_DIR $CLOUDKITTY_BRANCH
|
||||
setup_develop $CLOUDKITTY_DIR
|
||||
|
||||
if [ $CLOUDKITTY_STORAGE_BACKEND == 'influxdb' ]; then
|
||||
if [ $CLOUDKITTY_STORAGE_BACKEND == 'influxdb' ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 1 ]; then
|
||||
install_influx
|
||||
elif [ $CLOUDKITTY_STORAGE_BACKEND == 'influxdb' ] && [ "$CLOUDKITTY_INFLUX_VERSION" == 2 ]; then
|
||||
install_influx_v2
|
||||
elif [ $CLOUDKITTY_STORAGE_BACKEND == 'elasticsearch' ]; then
|
||||
install_elasticsearch
|
||||
elif [ $CLOUDKITTY_STORAGE_BACKEND == 'opensearch' ]; then
|
||||
|
@ -50,6 +50,7 @@ CLOUDKITTY_METRICS_CONF=metrics.yml
|
||||
# Set CloudKitty storage info
|
||||
CLOUDKITTY_STORAGE_BACKEND=${CLOUDKITTY_STORAGE_BACKEND:-"influxdb"}
|
||||
CLOUDKITTY_STORAGE_VERSION=${CLOUDKITTY_STORAGE_VERSION:-"2"}
|
||||
CLOUDKITTY_INFLUX_VERSION=${CLOUDKITTY_INFLUX_VERSION:-1}
|
||||
|
||||
# Set CloudKitty output info
|
||||
CLOUDKITTY_OUTPUT_BACKEND=${CLOUDKITTY_OUTPUT_BACKEND:-"cloudkitty.backend.file.FileBackend"}
|
||||
|
@ -0,0 +1,4 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
Add support to Influx v2 database as storage backend.
|
@ -33,6 +33,7 @@ stevedore>=3.2.2 # Apache-2.0
|
||||
tooz>=2.7.1 # Apache-2.0
|
||||
voluptuous>=0.12.0 # BSD License
|
||||
influxdb>=5.3.1 # MIT
|
||||
influxdb-client>=1.36.0 # MIT
|
||||
Flask>=2.0.0 # BSD
|
||||
Flask-RESTful>=0.3.9 # BSD
|
||||
cotyledon>=1.7.3 # Apache-2.0
|
||||
|
Loading…
Reference in New Issue
Block a user