From 3bdfaeb67e7041a0ea541143bd3cfc697d8ab5d4 Mon Sep 17 00:00:00 2001 From: gordon chung Date: Wed, 30 Mar 2016 09:34:40 -0400 Subject: [PATCH] resample only data affected by new measures currently, we are passing in the full unaggregated timeserie when we update each of the aggregates for each granularity. this is incorrect as the unaggregated timeserie corresponds to the largest granularity of the metric. this means that for smaller granularities, we are updating and resampling points that are already aggregated and not affected by new incoming measures. Change-Id: I687c2a18b332494f5c5cb7fdfe6f2b3d1e8de804 Closes-Bug: #1562820 --- gnocchi/carbonara.py | 14 +++++++------- gnocchi/storage/_carbonara.py | 12 +++++++++++- gnocchi/tests/test_storage.py | 35 ++++++++++++++++++++++++++++------- 3 files changed, 46 insertions(+), 15 deletions(-) diff --git a/gnocchi/carbonara.py b/gnocchi/carbonara.py index 3bc51996..977443fe 100644 --- a/gnocchi/carbonara.py +++ b/gnocchi/carbonara.py @@ -162,7 +162,7 @@ class TimeSerie(SerializableMixin): return value.nanos / 10e8 @staticmethod - def _round_timestamp(ts, freq): + def round_timestamp(ts, freq): return pandas.Timestamp( (pandas.Timestamp(ts).value // freq) * freq) @@ -223,8 +223,8 @@ class BoundTimeSerie(TimeSerie): def set_values(self, values, before_truncate_callback=None, ignore_too_old_timestamps=False): + # NOTE: values must be sorted when passed in. if self.block_size is not None and not self.ts.empty: - values = sorted(values, key=operator.itemgetter(0)) first_block_timestamp = self._first_block_timestamp() if ignore_too_old_timestamps: for index, (timestamp, value) in enumerate(values): @@ -268,8 +268,8 @@ class BoundTimeSerie(TimeSerie): return basic def _first_block_timestamp(self): - rounded = self._round_timestamp(self.ts.index[-1], - self.block_size.delta.value) + rounded = self.round_timestamp(self.ts.index[-1], + self.block_size.delta.value) return rounded - (self.block_size * self.back_window) @@ -323,7 +323,7 @@ class AggregatedTimeSerie(TimeSerie): @classmethod def get_split_key_datetime(cls, timestamp, sampling): - return cls._round_timestamp( + return cls.round_timestamp( timestamp, freq=sampling * cls.POINTS_PER_SPLIT * 10e8) @staticmethod @@ -447,7 +447,7 @@ class AggregatedTimeSerie(TimeSerie): # Group by the sampling, and then apply the aggregation method on # the points after `after' groupedby = self.ts[after:].groupby( - functools.partial(self._round_timestamp, + functools.partial(self.round_timestamp, freq=self.sampling * 10e8)) agg_func = getattr(groupedby, self.aggregation_method_func_name) if self.aggregation_method_func_name == 'quantile': @@ -469,7 +469,7 @@ class AggregatedTimeSerie(TimeSerie): if from_timestamp is None: from_ = None else: - from_ = self._round_timestamp(from_timestamp, self.sampling * 10e8) + from_ = self.round_timestamp(from_timestamp, self.sampling * 10e8) points = self[from_:to_timestamp] try: # Do not include stop timestamp diff --git a/gnocchi/storage/_carbonara.py b/gnocchi/storage/_carbonara.py index e1b33851..72b97119 100644 --- a/gnocchi/storage/_carbonara.py +++ b/gnocchi/storage/_carbonara.py @@ -18,6 +18,7 @@ import collections import datetime import logging import multiprocessing +import operator import threading import time import uuid @@ -342,6 +343,7 @@ class CarbonaraBasedStorage(storage.StorageDriver): % metric) continue + measures = sorted(measures, key=operator.itemgetter(0)) try: with timeutils.StopWatch() as sw: raw_measures = ( @@ -380,9 +382,17 @@ class CarbonaraBasedStorage(storage.StorageDriver): back_window=metric.archive_policy.back_window) def _map_add_measures(bound_timeserie): + # NOTE (gordc): bound_timeserie is entire set of + # unaggregated measures matching largest + # granularity. the following takes only the points + # affected by new measures for specific granularity + tstamp = max(bound_timeserie.first, measures[0][0]) self._map_in_thread( self._add_measures, - ((aggregation, d, metric, bound_timeserie) + ((aggregation, d, metric, + carbonara.TimeSerie(bound_timeserie.ts[ + carbonara.TimeSerie.round_timestamp( + tstamp, d.granularity * 10e8):])) for aggregation in agg_methods for d in metric.archive_policy.definition)) diff --git a/gnocchi/tests/test_storage.py b/gnocchi/tests/test_storage.py index c7f80eca..ede191d7 100644 --- a/gnocchi/tests/test_storage.py +++ b/gnocchi/tests/test_storage.py @@ -21,6 +21,7 @@ from oslo_utils import timeutils from oslotest import base import six.moves +from gnocchi import carbonara from gnocchi import storage from gnocchi.storage import _carbonara from gnocchi.storage import null @@ -112,27 +113,47 @@ class TestStorageDriver(tests_base.TestCase): self.assertEqual(3661, len(self.storage.get_measures(m))) @mock.patch('gnocchi.carbonara.AggregatedTimeSerie.POINTS_PER_SPLIT', 48) - def test_add_measures_big_update_subset(self): + def test_add_measures_update_subset_split(self): m, m_sql = self._create_metric('medium') measures = [ - storage.Measure(datetime.datetime(2014, 1, i, j, 0, 0), 100) - for i in six.moves.range(1, 6) for j in six.moves.range(0, 24)] - measures.append( - storage.Measure(datetime.datetime(2014, 1, 6, 0, 0, 0), 100)) + storage.Measure(datetime.datetime(2014, 1, 6, i, j, 0), 100) + for i in six.moves.range(2) for j in six.moves.range(0, 60, 2)] self.storage.add_measures(m, measures) self.storage.process_background_tasks(self.index, sync=True) + # add measure to end, in same aggregate time as last point. self.storage.add_measures(m, [ - storage.Measure(datetime.datetime(2014, 1, 6, 1, 0, 0), 100)]) + storage.Measure(datetime.datetime(2014, 1, 6, 1, 58, 1), 100)]) with mock.patch.object(self.storage, '_store_metric_measures') as c: + # should only resample last aggregate self.storage.process_background_tasks(self.index, sync=True) count = 0 for call in c.mock_calls: - if mock.call(m_sql, mock.ANY, 'mean', 3600.0, mock.ANY) == call: + # policy is 60 points and split is 48. should only update 2nd half + if mock.call(m_sql, mock.ANY, 'mean', 60.0, mock.ANY) == call: count += 1 self.assertEqual(1, count) + def test_add_measures_update_subset(self): + m, m_sql = self._create_metric('medium') + measures = [ + storage.Measure(datetime.datetime(2014, 1, 6, i, j, 0), 100) + for i in six.moves.range(2) for j in six.moves.range(0, 60, 2)] + self.storage.add_measures(m, measures) + self.storage.process_background_tasks(self.index, sync=True) + + # add measure to end, in same aggregate time as last point. + new_point = datetime.datetime(2014, 1, 6, 1, 58, 1) + self.storage.add_measures(m, [storage.Measure(new_point, 100)]) + + with mock.patch.object(self.storage, '_add_measures') as c: + self.storage.process_background_tasks(self.index, sync=True) + for __, args, __ in c.mock_calls: + self.assertEqual( + args[3].first, carbonara.TimeSerie.round_timestamp( + new_point, args[1].granularity * 10e8)) + def test_delete_old_measures(self): self.storage.add_measures(self.metric, [ storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),