diff --git a/cloudkitty/storage/v2/influx.py b/cloudkitty/storage/v2/influx.py index bb43e58f..602dc1e8 100644 --- a/cloudkitty/storage/v2/influx.py +++ b/cloudkitty/storage/v2/influx.py @@ -62,6 +62,9 @@ influx_storage_opts = [ CONF.register_opts(influx_storage_opts, INFLUX_STORAGE_GROUP) +PERIOD_FIELD_NAME = '__ck_collect_period' + + class InfluxClient(object): """Classe used to ease interaction with InfluxDB""" @@ -110,14 +113,17 @@ class InfluxClient(object): def append_point(self, metric_type, - timestamp, + start, + period, point): """Adds a point to commit to InfluxDB. :param metric_type: Name of the metric type :type metric_type: str - :param timestamp: Timestamp of the time - :type timestamp: datetime.datetime + :param start: Start of the period the point applies to + :type start: datetime.datetime + :param period: length of the period the point applies to (in seconds) + :type period: int :param point: Point to push :type point: dataframe.DataPoint """ @@ -131,6 +137,8 @@ class InfluxClient(object): measurement_fields['groupby'] = '|'.join(point.groupby.keys()) measurement_fields['metadata'] = '|'.join(point.metadata.keys()) + measurement_fields[PERIOD_FIELD_NAME] = period + measurement_tags = dict(point.groupby) measurement_tags['type'] = metric_type @@ -138,7 +146,7 @@ class InfluxClient(object): 'measurement': 'dataframes', 'tags': measurement_tags, 'fields': measurement_fields, - 'time': timestamp, + 'time': start, }) if self._autocommit and len(self._points) >= self._chunk_size: self.commit() @@ -234,8 +242,8 @@ class InfluxStorage(v2_storage.BaseStorage): def __init__(self, *args, **kwargs): super(InfluxStorage, self).__init__(*args, **kwargs) + self._default_period = kwargs.get('period') or CONF.collect.period self._conn = InfluxClient() - self._period = kwargs.get('period', None) or CONF.collect.period def init(self): policy = CONF.storage_influxdb.retention_policy @@ -249,9 +257,9 @@ class InfluxStorage(v2_storage.BaseStorage): def push(self, dataframes, scope_id=None): for frame in dataframes: - timestamp = frame.start + period = tzutils.diff_seconds(frame.end, frame.start) for type_, point in frame.iterpoints(): - self._conn.append_point(type_, timestamp, point) + self._conn.append_point(type_, frame.start, period, point) self._conn.commit() @@ -265,10 +273,8 @@ class InfluxStorage(v2_storage.BaseStorage): @staticmethod def _point_to_dataframe_entry(point): - groupby = filter(lambda x: bool(x), - (point.pop('groupby', None) or '').split('|')) - metadata = filter(lambda x: bool(x), - (point.pop('metadata', None) or '').split('|')) + groupby = filter(bool, (point.pop('groupby', None) or '').split('|')) + metadata = filter(bool, (point.pop('metadata', None) or '').split('|')) return dataframe.DataPoint( point['unit'], point['qty'], @@ -282,18 +288,20 @@ class InfluxStorage(v2_storage.BaseStorage): for point in points: point_type = point['type'] time = tzutils.dt_from_iso(point['time']) - if time not in dataframes.keys(): - dataframes[time] = dataframe.DataFrame( - start=time, - end=tzutils.add_delta( - time, datetime.timedelta(seconds=self._period)), - ) + period = point.get(PERIOD_FIELD_NAME) or self._default_period + timekey = ( + time, + tzutils.add_delta(time, datetime.timedelta(seconds=period))) + if timekey not in dataframes.keys(): + dataframes[timekey] = dataframe.DataFrame( + start=timekey[0], + end=timekey[1]) - dataframes[time].add_point( + dataframes[timekey].add_point( self._point_to_dataframe_entry(point), point_type) output = list(dataframes.values()) - output.sort(key=lambda frame: frame.start) + output.sort(key=lambda frame: (frame.start, frame.end)) return output def retrieve(self, begin=None, end=None, diff --git a/cloudkitty/tests/storage/v2/test_influxdb.py b/cloudkitty/tests/storage/v2/test_influxdb.py index 2151e584..0992e2a0 100644 --- a/cloudkitty/tests/storage/v2/test_influxdb.py +++ b/cloudkitty/tests/storage/v2/test_influxdb.py @@ -13,9 +13,12 @@ # under the License. # import collections +import copy from datetime import datetime +from datetime import timedelta import unittest +from dateutil import tz import mock from cloudkitty import dataframe @@ -29,6 +32,7 @@ class TestInfluxDBStorage(TestCase): def setUp(self): super(TestInfluxDBStorage, self).setUp() self.point = { + 'type': 'amazing_type', 'unit': 'banana', 'qty': 42, 'price': 1.0, @@ -38,6 +42,7 @@ class TestInfluxDBStorage(TestCase): 'two': '2', '1': 'one', '2': 'two', + 'time': datetime(2019, 1, 1, tzinfo=tz.UTC).isoformat(), } def test_point_to_dataframe_entry_valid_point(self): @@ -53,10 +58,11 @@ class TestInfluxDBStorage(TestCase): ) def test_point_to_dataframe_entry_invalid_groupby_metadata(self): - self.point['groupby'] = 'a' - self.point['metadata'] = None + point = copy.deepcopy(self.point) + point['groupby'] = 'a' + point['metadata'] = None self.assertEqual( - influx.InfluxStorage._point_to_dataframe_entry(self.point), + influx.InfluxStorage._point_to_dataframe_entry(point), dataframe.DataPoint( 'banana', 42, @@ -66,6 +72,25 @@ class TestInfluxDBStorage(TestCase): ), ) + def test_build_dataframes_differenciates_periods(self): + points = [copy.deepcopy(self.point) for _ in range(3)] + for idx, point in enumerate(points): + point[influx.PERIOD_FIELD_NAME] = 100 * (idx + 1) + + dataframes = influx.InfluxStorage()._build_dataframes(points) + self.assertEqual(len(dataframes), 3) + + for idx, frame in enumerate(dataframes): + self.assertEqual(frame.start, datetime(2019, 1, 1, tzinfo=tz.UTC)) + delta = timedelta(seconds=(idx + 1) * 100) + self.assertEqual(frame.end, + datetime(2019, 1, 1, tzinfo=tz.UTC) + delta) + typelist = list(frame.itertypes()) + self.assertEqual(len(typelist), 1) + type_, points = typelist[0] + self.assertEqual(len(points), 1) + self.assertEqual(type_, 'amazing_type') + class FakeResultSet(object): def __init__(self, points=[], items=[]): diff --git a/cloudkitty/tests/test_tzutils.py b/cloudkitty/tests/test_tzutils.py index f5c78110..89bc6d56 100644 --- a/cloudkitty/tests/test_tzutils.py +++ b/cloudkitty/tests/test_tzutils.py @@ -101,3 +101,30 @@ class TestTZUtils(unittest.TestCase): month_start = tzutils.get_month_start(param, naive=True) self.assertIsNone(month_start.tzinfo) self.assertEqual(month_start, datetime.datetime(2019, 1, 1)) + + def test_diff_seconds_positive_arg_naive_objects(self): + one = datetime.datetime(2019, 1, 1, 1, 1, 30) + two = datetime.datetime(2019, 1, 1, 1, 1) + self.assertEqual(tzutils.diff_seconds(one, two), 30) + + def test_diff_seconds_negative_arg_naive_objects(self): + one = datetime.datetime(2019, 1, 1, 1, 1, 30) + two = datetime.datetime(2019, 1, 1, 1, 1) + self.assertEqual(tzutils.diff_seconds(two, one), 30) + + def test_diff_seconds_positive_arg_aware_objects(self): + one = datetime.datetime(2019, 1, 1, 1, 1, 30, tzinfo=tz.UTC) + two = datetime.datetime(2019, 1, 1, 1, 1, tzinfo=tz.UTC) + self.assertEqual(tzutils.diff_seconds(one, two), 30) + + def test_diff_seconds_negative_arg_aware_objects(self): + one = datetime.datetime(2019, 1, 1, 1, 1, 30, tzinfo=tz.UTC) + two = datetime.datetime(2019, 1, 1, 1, 1, tzinfo=tz.UTC) + self.assertEqual(tzutils.diff_seconds(two, one), 30) + + def test_diff_seconds_negative_arg_aware_objects_on_summer_change(self): + one = datetime.datetime(2019, 3, 31, 1, + tzinfo=tz.gettz('Europe/Paris')) + two = datetime.datetime(2019, 3, 31, 3, + tzinfo=tz.gettz('Europe/Paris')) + self.assertEqual(tzutils.diff_seconds(two, one), 3600) diff --git a/cloudkitty/tzutils.py b/cloudkitty/tzutils.py index 488d2460..bd55e2d6 100644 --- a/cloudkitty/tzutils.py +++ b/cloudkitty/tzutils.py @@ -143,3 +143,19 @@ def get_next_month(dt=None, naive=False): start = get_month_start(dt, naive=naive) month_days = calendar.monthrange(start.year, start.month)[1] return add_delta(start, datetime.timedelta(days=month_days)) + + +def diff_seconds(one, two): + """Returns the difference in seconds between two datetime objects. + + Objects will be converted to naive UTC objects before calculating the + difference. The return value is the absolute value of the difference. + + :param one: First datetime object + :type one: datetime.datetime + :param two: datetime object to substract from the first one + :type two: datetime.datetime + :rtype: int + """ + return abs(int((local_to_utc(one, naive=True) + - local_to_utc(two, naive=True)).total_seconds()))