storage: make sure we delete old measures respecting archive policy

Change-Id: If8e35fd4e97c7838a88891b7e34b7a857f94be12
Julien Danjou 2016-02-11 14:19:13 +01:00
parent 3a905525f6
commit 7d67957e45
7 changed files with 106 additions and 14 deletions


@@ -94,13 +94,14 @@ How to set the archive policy and granularity
 
 In Gnocchi, the archive policy is expressed in number of points. If your
 archive policy defines a policy of 10 points with a granularity of 1 second,
-the time serie archive will keep up to 10 points, each representing an
+the time serie archive will keep up to 10 seconds, each representing an
 aggregation over 1 second. This means the time serie will at maximum retain 10
-seconds of data, **but** that does not mean it will be 10 consecutive seconds:
-there might be a gap if data is fed irregularly.
+seconds of data (sometimes a bit more) between the more recent point and the
+oldest point. That does not mean it will be 10 consecutive seconds: there might
+be a gap if data is fed irregularly.
 
-Consequently, there is no expiry of data relative to the current timestamp, and
-you cannot delete old data points (at least for now).
+There is no expiry of data relative to the current timestamp. Also, you cannot
+delete old data points (at least for now).
 
 Therefore, both the archive policy and the granularity entirely depends on your
 use case. Depending on the usage of your data, you can define several archiving
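
To make the retention rule concrete: an archive policy definition of N points at a given granularity covers a timespan of N × granularity seconds, and with this change anything older than that timespan behind the newest aggregated point becomes eligible for deletion. A minimal sketch of that arithmetic (plain Python; the variable names are illustrative, not Gnocchi's API):

    import datetime

    # Illustrative archive policy definition: 10 points at 1-second granularity.
    points = 10
    granularity = 1  # seconds
    timespan = points * granularity  # at most 10 seconds of data are retained

    # Newest aggregated point in the series (example value).
    last = datetime.datetime(2014, 1, 1, 12, 0, 42)
    oldest_point_to_keep = last - datetime.timedelta(seconds=timespan)
    print(oldest_point_to_keep)  # 2014-01-01 12:00:32 -- older points may be expired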


@@ -152,11 +152,17 @@ class TimeSerie(SerializableMixin):
 
     @property
     def first(self):
-        return self.ts.index[0]
+        try:
+            return self.ts.index[0]
+        except IndexError:
+            return
 
     @property
     def last(self):
-        return self.ts.index[-1]
+        try:
+            return self.ts.index[-1]
+        except IndexError:
+            return
 
 
 class BoundTimeSerie(TimeSerie):
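
The guarded first/last accessors above matter because a series that has not received any point yet has an empty index, and indexing it raises IndexError (the TimeSerie is backed by a pandas Series). A standalone sketch of the same idea, not Gnocchi's actual class:

    import pandas as pd

    class TinySerie(object):
        """Toy time serie exposing guarded first/last accessors."""

        def __init__(self, timestamps=(), values=()):
            self.ts = pd.Series(list(values),
                                index=pd.to_datetime(list(timestamps)))

        @property
        def first(self):
            try:
                return self.ts.index[0]
            except IndexError:
                return None  # empty serie: no first point yet

        @property
        def last(self):
            try:
                return self.ts.index[-1]
            except IndexError:
                return None

    print(TinySerie().last)                      # None instead of IndexError
    print(TinySerie(["2014-01-01"], [42]).last)  # Timestamp('2014-01-01 00:00:00')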


@@ -15,6 +15,7 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 import collections
+import datetime
 import logging
 import multiprocessing
 import threading
@@ -197,20 +198,36 @@ class CarbonaraBasedStorage(storage.StorageDriver):
                 sampling=granularity,
                 max_size=points)
 
-    def _add_measures(self, aggregation, granularity, metric, timeserie):
+    def _add_measures(self, aggregation, archive_policy_def,
+                      metric, timeserie):
         with timeutils.StopWatch() as sw:
-            ts = self._get_measures_timeserie(metric, aggregation, granularity,
+            ts = self._get_measures_timeserie(metric, aggregation,
+                                              archive_policy_def.granularity,
                                               timeserie.first, timeserie.last)
         LOG.debug("Retrieve measures"
                   "for %s/%s/%s in %.2fs"
-                  % (metric.id, aggregation, granularity, sw.elapsed()))
+                  % (metric.id, aggregation, archive_policy_def.
+                     granularity, sw.elapsed()))
         ts.update(timeserie)
         with timeutils.StopWatch() as sw:
             for key, split in ts.split():
                 self._store_metric_measures(metric, key, aggregation,
-                                            granularity, split.serialize())
+                                            archive_policy_def.granularity,
+                                            split.serialize())
         LOG.debug("Store measures for %s/%s/%s in %.2fs"
-                  % (metric.id, aggregation, granularity, sw.elapsed()))
+                  % (metric.id, aggregation,
+                     archive_policy_def.granularity, sw.elapsed()))
+
+        if ts.last and archive_policy_def.timespan:
+            with timeutils.StopWatch() as sw:
+                oldest_point_to_keep = ts.last - datetime.timedelta(
+                    seconds=archive_policy_def.timespan)
+                self._delete_metric_measures_before(
+                    metric, aggregation, archive_policy_def.granularity,
+                    oldest_point_to_keep)
+            LOG.debug("Expire measures for %s/%s/%s in %.2fs"
+                      % (metric.id, aggregation,
+                         archive_policy_def.granularity, sw.elapsed()))
 
     def add_measures(self, metric, measures):
         self._store_measures(metric, msgpackutils.dumps(
@@ -239,6 +256,26 @@ class CarbonaraBasedStorage(storage.StorageDriver):
         self._delete_metric_archives(metric)
         self._delete_metric(metric)
 
+    def _delete_metric_measures_before(self, metric, aggregation_method,
+                                       granularity, timestamp):
+        """Delete measures for a metric before a timestamp."""
+        ts = carbonara.AggregatedTimeSerie.get_split_key(
+            timestamp, granularity)
+        for key in self._list_split_keys_for_metric(
+                metric, aggregation_method, granularity):
+            # NOTE(jd) Only delete if the key is strictly inferior to
+            # the timestamp; we don't delete any timeserie split that
+            # contains our timestamp, so we prefer to keep a bit more
+            # than deleting too much
+            if key < ts:
+                self._delete_metric_measures(
+                    metric, key, aggregation_method, granularity)
+
+    @staticmethod
+    def _delete_metric_measures(metric, timestamp_key,
+                                aggregation, granularity):
+        raise NotImplementedError
+
     @staticmethod
     def _unserialize_measures(data):
         return msgpackutils.loads(data)
@@ -357,8 +394,7 @@ class CarbonaraBasedStorage(storage.StorageDriver):
 
         def _map_add_measures(bound_timeserie):
             self._map_in_thread(
                 self._add_measures,
-                ((aggregation, d.granularity,
-                  metric, bound_timeserie)
+                ((aggregation, d, metric, bound_timeserie)
                  for aggregation in agg_methods
                  for d in metric.archive_policy.definition))
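
The expiration added above never trims inside a split: `_delete_metric_measures_before` only drops whole splits whose key falls strictly before the split containing the cut-off point, so slightly more than the configured timespan can survive. A rough standalone sketch of that selection logic, with a simplified stand-in for Carbonara's real get_split_key:

    import datetime

    SPLIT_SECONDS = 3600  # assumed split width, for this sketch only

    def split_key(timestamp):
        # Simplified: align the timestamp on a fixed split boundary.
        epoch = datetime.datetime(1970, 1, 1)
        offset = int((timestamp - epoch).total_seconds()) // SPLIT_SECONDS
        return epoch + datetime.timedelta(seconds=offset * SPLIT_SECONDS)

    def keys_to_delete(existing_keys, last_point, timespan):
        """Return split keys strictly before the split holding the cut-off."""
        oldest_point_to_keep = last_point - datetime.timedelta(seconds=timespan)
        cutoff = split_key(oldest_point_to_keep)
        # The split containing the cut-off is kept: better to retain a bit
        # more data than to delete too much.
        return [key for key in existing_keys if key < cutoff]

    keys = [datetime.datetime(2014, 1, 1, h) for h in (10, 11, 12)]
    print(keys_to_delete(keys, datetime.datetime(2014, 1, 1, 12, 30),
                         timespan=3600))
    # -> [datetime.datetime(2014, 1, 1, 10, 0)]; the 11:00 split holds the cut-off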


@@ -195,6 +195,14 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
             ioctx.write_full(name, data)
             ioctx.set_xattr("gnocchi_%s_container" % metric.id, name, "")
 
+    def _delete_metric_measures(self, metric, timestamp_key, aggregation,
+                                granularity):
+        name = self._get_object_name(metric, timestamp_key,
+                                     aggregation, granularity)
+        with self._get_ioctx() as ioctx:
+            ioctx.rm_xattr("gnocchi_%s_container" % metric.id, name)
+            ioctx.remove_object(name)
+
     def _delete_metric(self, metric):
         with self._get_ioctx() as ioctx:
             try:


@@ -222,6 +222,11 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
                 keys.append(key)
         return keys
 
+    def _delete_metric_measures(self, metric, timestamp_key, aggregation,
+                                granularity):
+        os.unlink(self._build_metric_path_for_split(
+            metric, aggregation, timestamp_key, granularity))
+
     def _store_metric_measures(self, metric, timestamp_key, aggregation,
                                granularity, data):
         self._atomic_file_store(


@@ -175,6 +175,12 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
             self._object_name(timestamp_key, aggregation, granularity),
             data)
 
+    def _delete_metric_measures(self, metric, timestamp_key, aggregation,
+                                granularity):
+        self.swift.delete_object(
+            self._container_name(metric),
+            self._object_name(timestamp_key, aggregation, granularity))
+
     def _delete_metric(self, metric):
         self._delete_unaggregated_timeserie(metric)
         container = self._container_name(metric)
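
Each driver (Ceph, file, Swift) only has to implement `_delete_metric_measures` for a single split; the base class decides which (timestamp key, aggregation, granularity) tuples to drop. A minimal file-based sketch of that contract, using a made-up path layout rather than the drivers' real naming helpers:

    import errno
    import os

    def split_path(root, metric_id, timestamp_key, aggregation, granularity):
        # Hypothetical layout: <root>/<metric>/<aggregation>_<granularity>_<key>
        return os.path.join(root, str(metric_id),
                            "%s_%s_%s" % (aggregation, granularity,
                                          timestamp_key))

    def delete_metric_measures(root, metric_id, timestamp_key,
                               aggregation, granularity):
        """Remove one serialized split, tolerating an already-missing file."""
        try:
            os.unlink(split_path(root, metric_id, timestamp_key,
                                 aggregation, granularity))
        except OSError as e:
            if e.errno != errno.ENOENT:
                raise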


@@ -133,6 +133,36 @@ class TestStorageDriver(tests_base.TestCase):
             count += 1
         self.assertEqual(1, count)
 
+    def test_delete_old_measures(self):
+        self.storage.add_measures(self.metric, [
+            storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),
+            storage.Measure(datetime.datetime(2014, 1, 1, 12, 7, 31), 42),
+            storage.Measure(datetime.datetime(2014, 1, 1, 12, 9, 31), 4),
+            storage.Measure(datetime.datetime(2014, 1, 1, 12, 12, 45), 44),
+        ])
+        self.storage.process_background_tasks(self.index, sync=True)
+
+        self.assertEqual([
+            (utils.datetime_utc(2014, 1, 1), 86400.0, 39.75),
+            (utils.datetime_utc(2014, 1, 1, 12), 3600.0, 39.75),
+            (utils.datetime_utc(2014, 1, 1, 12), 300.0, 69.0),
+            (utils.datetime_utc(2014, 1, 1, 12, 5), 300.0, 23.0),
+            (utils.datetime_utc(2014, 1, 1, 12, 10), 300.0, 44.0),
+        ], self.storage.get_measures(self.metric))
+
+        # One year later…
+        self.storage.add_measures(self.metric, [
+            storage.Measure(datetime.datetime(2015, 1, 1, 12, 0, 1), 69),
+        ])
+        self.storage.process_background_tasks(self.index, sync=True)
+
+        self.assertEqual([
+            (utils.datetime_utc(2014, 1, 1), 86400.0, 39.75),
+            (utils.datetime_utc(2015, 1, 1), 86400.0, 69),
+            (utils.datetime_utc(2015, 1, 1, 12), 3600.0, 69),
+            (utils.datetime_utc(2015, 1, 1, 12), 300.0, 69),
+        ], self.storage.get_measures(self.metric))
+
     def test_add_and_get_measures(self):
         self.storage.add_measures(self.metric, [
             storage.Measure(datetime.datetime(2014, 1, 1, 12, 0, 1), 69),