Store collect period in InfluxDB driver datapoints

This adds a field containing the cloudkitty period used when pushing data
into InfluxDB. Work items:

* Store the period along with the begin timestamp in the InfluxDB storage.

* Added a "diff_seconds()" function to "cloudkitty.tzutils". It calculates
  the difference in seconds between two "datetime.datetime" objects.

* Added unit tests for the "diff_seconds()" function.

Change-Id: I4af7adaf5044da2e0a44bbb30ba4a24742cd8a94
This commit is contained in:
Luka Peschke
2019-09-05 16:39:57 +02:00
parent 492ec063a7
commit 54ede80601
4 changed files with 98 additions and 22 deletions

View File

@@ -62,6 +62,9 @@ influx_storage_opts = [
CONF.register_opts(influx_storage_opts, INFLUX_STORAGE_GROUP)
PERIOD_FIELD_NAME = '__ck_collect_period'
class InfluxClient(object):
"""Class used to ease interaction with InfluxDB"""
@@ -110,14 +113,17 @@ class InfluxClient(object):
def append_point(self,
metric_type,
timestamp,
start,
period,
point):
"""Adds a point to commit to InfluxDB.
:param metric_type: Name of the metric type
:type metric_type: str
:param timestamp: Timestamp of the time
:type timestamp: datetime.datetime
:param start: Start of the period the point applies to
:type start: datetime.datetime
:param period: length of the period the point applies to (in seconds)
:type period: int
:param point: Point to push
:type point: dataframe.DataPoint
"""
@@ -131,6 +137,8 @@ class InfluxClient(object):
measurement_fields['groupby'] = '|'.join(point.groupby.keys())
measurement_fields['metadata'] = '|'.join(point.metadata.keys())
measurement_fields[PERIOD_FIELD_NAME] = period
measurement_tags = dict(point.groupby)
measurement_tags['type'] = metric_type
@@ -138,7 +146,7 @@ class InfluxClient(object):
'measurement': 'dataframes',
'tags': measurement_tags,
'fields': measurement_fields,
'time': timestamp,
'time': start,
})
if self._autocommit and len(self._points) >= self._chunk_size:
self.commit()
@@ -234,8 +242,8 @@ class InfluxStorage(v2_storage.BaseStorage):
def __init__(self, *args, **kwargs):
super(InfluxStorage, self).__init__(*args, **kwargs)
self._default_period = kwargs.get('period') or CONF.collect.period
self._conn = InfluxClient()
self._period = kwargs.get('period', None) or CONF.collect.period
def init(self):
policy = CONF.storage_influxdb.retention_policy
@@ -249,9 +257,9 @@ class InfluxStorage(v2_storage.BaseStorage):
def push(self, dataframes, scope_id=None):
for frame in dataframes:
timestamp = frame.start
period = tzutils.diff_seconds(frame.end, frame.start)
for type_, point in frame.iterpoints():
self._conn.append_point(type_, timestamp, point)
self._conn.append_point(type_, frame.start, period, point)
self._conn.commit()
@@ -265,10 +273,8 @@ class InfluxStorage(v2_storage.BaseStorage):
@staticmethod
def _point_to_dataframe_entry(point):
groupby = filter(lambda x: bool(x),
(point.pop('groupby', None) or '').split('|'))
metadata = filter(lambda x: bool(x),
(point.pop('metadata', None) or '').split('|'))
groupby = filter(bool, (point.pop('groupby', None) or '').split('|'))
metadata = filter(bool, (point.pop('metadata', None) or '').split('|'))
return dataframe.DataPoint(
point['unit'],
point['qty'],
@@ -282,18 +288,20 @@ class InfluxStorage(v2_storage.BaseStorage):
for point in points:
point_type = point['type']
time = tzutils.dt_from_iso(point['time'])
if time not in dataframes.keys():
dataframes[time] = dataframe.DataFrame(
start=time,
end=tzutils.add_delta(
time, datetime.timedelta(seconds=self._period)),
)
period = point.get(PERIOD_FIELD_NAME) or self._default_period
timekey = (
time,
tzutils.add_delta(time, datetime.timedelta(seconds=period)))
if timekey not in dataframes.keys():
dataframes[timekey] = dataframe.DataFrame(
start=timekey[0],
end=timekey[1])
dataframes[time].add_point(
dataframes[timekey].add_point(
self._point_to_dataframe_entry(point), point_type)
output = list(dataframes.values())
output.sort(key=lambda frame: frame.start)
output.sort(key=lambda frame: (frame.start, frame.end))
return output
def retrieve(self, begin=None, end=None,

View File

@@ -13,9 +13,12 @@
# under the License.
#
import collections
import copy
from datetime import datetime
from datetime import timedelta
import unittest
from dateutil import tz
import mock
from cloudkitty import dataframe
@@ -29,6 +32,7 @@ class TestInfluxDBStorage(TestCase):
def setUp(self):
super(TestInfluxDBStorage, self).setUp()
self.point = {
'type': 'amazing_type',
'unit': 'banana',
'qty': 42,
'price': 1.0,
@@ -38,6 +42,7 @@ class TestInfluxDBStorage(TestCase):
'two': '2',
'1': 'one',
'2': 'two',
'time': datetime(2019, 1, 1, tzinfo=tz.UTC).isoformat(),
}
def test_point_to_dataframe_entry_valid_point(self):
@@ -53,10 +58,11 @@ class TestInfluxDBStorage(TestCase):
)
def test_point_to_dataframe_entry_invalid_groupby_metadata(self):
self.point['groupby'] = 'a'
self.point['metadata'] = None
point = copy.deepcopy(self.point)
point['groupby'] = 'a'
point['metadata'] = None
self.assertEqual(
influx.InfluxStorage._point_to_dataframe_entry(self.point),
influx.InfluxStorage._point_to_dataframe_entry(point),
dataframe.DataPoint(
'banana',
42,
@@ -66,6 +72,25 @@ class TestInfluxDBStorage(TestCase):
),
)
def test_build_dataframes_differenciates_periods(self):
    """Points with distinct periods must end up in distinct dataframes."""
    input_points = []
    for i in range(3):
        p = copy.deepcopy(self.point)
        p[influx.PERIOD_FIELD_NAME] = 100 * (i + 1)
        input_points.append(p)

    frames = influx.InfluxStorage()._build_dataframes(input_points)
    self.assertEqual(len(frames), 3)

    expected_start = datetime(2019, 1, 1, tzinfo=tz.UTC)
    for i, frame in enumerate(frames):
        # All frames share the same start; the end varies with the period.
        self.assertEqual(frame.start, expected_start)
        self.assertEqual(
            frame.end,
            expected_start + timedelta(seconds=(i + 1) * 100))
        type_entries = list(frame.itertypes())
        self.assertEqual(len(type_entries), 1)
        frame_type, frame_points = type_entries[0]
        self.assertEqual(len(frame_points), 1)
        self.assertEqual(frame_type, 'amazing_type')
class FakeResultSet(object):
def __init__(self, points=[], items=[]):

View File

@@ -101,3 +101,30 @@ class TestTZUtils(unittest.TestCase):
month_start = tzutils.get_month_start(param, naive=True)
self.assertIsNone(month_start.tzinfo)
self.assertEqual(month_start, datetime.datetime(2019, 1, 1))
def test_diff_seconds_positive_arg_naive_objects(self):
    """diff_seconds with naive datetimes, later argument passed first."""
    later = datetime.datetime(2019, 1, 1, 1, 1, 30)
    earlier = datetime.datetime(2019, 1, 1, 1, 1)
    self.assertEqual(tzutils.diff_seconds(later, earlier), 30)
def test_diff_seconds_negative_arg_naive_objects(self):
    """diff_seconds with naive datetimes, earlier argument passed first.

    The result must still be positive (absolute difference).
    """
    later = datetime.datetime(2019, 1, 1, 1, 1, 30)
    earlier = datetime.datetime(2019, 1, 1, 1, 1)
    self.assertEqual(tzutils.diff_seconds(earlier, later), 30)
def test_diff_seconds_positive_arg_aware_objects(self):
    """diff_seconds with tz-aware datetimes, later argument passed first."""
    later = datetime.datetime(2019, 1, 1, 1, 1, 30, tzinfo=tz.UTC)
    earlier = datetime.datetime(2019, 1, 1, 1, 1, tzinfo=tz.UTC)
    self.assertEqual(tzutils.diff_seconds(later, earlier), 30)
def test_diff_seconds_negative_arg_aware_objects(self):
    """diff_seconds with tz-aware datetimes, earlier argument passed first.

    The result must still be positive (absolute difference).
    """
    later = datetime.datetime(2019, 1, 1, 1, 1, 30, tzinfo=tz.UTC)
    earlier = datetime.datetime(2019, 1, 1, 1, 1, tzinfo=tz.UTC)
    self.assertEqual(tzutils.diff_seconds(earlier, later), 30)
def test_diff_seconds_negative_arg_aware_objects_on_summer_change(self):
    """diff_seconds across a DST transition.

    In Europe/Paris on 2019-03-31 clocks jump from 02:00 to 03:00, so the
    wall-clock gap 01:00 -> 03:00 is only one real hour (3600 seconds).
    """
    paris = tz.gettz('Europe/Paris')
    before_change = datetime.datetime(2019, 3, 31, 1, tzinfo=paris)
    after_change = datetime.datetime(2019, 3, 31, 3, tzinfo=paris)
    self.assertEqual(tzutils.diff_seconds(after_change, before_change), 3600)

View File

@@ -143,3 +143,19 @@ def get_next_month(dt=None, naive=False):
start = get_month_start(dt, naive=naive)
month_days = calendar.monthrange(start.year, start.month)[1]
return add_delta(start, datetime.timedelta(days=month_days))
def diff_seconds(one, two):
    """Return the absolute difference in seconds between two datetimes.

    Both arguments are converted to naive UTC before subtracting, so naive
    and tz-aware objects (or objects in different timezones) are compared
    on a common timeline. The return value is always positive.

    :param one: First datetime object
    :type one: datetime.datetime
    :param two: datetime object to subtract from the first one
    :type two: datetime.datetime
    :rtype: int
    """
    delta = local_to_utc(one, naive=True) - local_to_utc(two, naive=True)
    return abs(int(delta.total_seconds()))