From c74c5c9289956f0aa73eed4146ca5f3f2f6407f2 Mon Sep 17 00:00:00 2001
From: Julien Danjou
Date: Fri, 19 Aug 2016 20:41:47 +0200
Subject: [PATCH] carbonara: compress non padded timeseries

This creates 2 distinct formats available when serializing
AggregatedTimeSerie: one is "padded" so it can be written based on offset
(Ceph) and the other one is "compressed" so it can be written to object
stores (Swift, file).

This compressed format is also used to store timeseries splits that are out
of reach of the back window.

Change-Id: I6f5431945ad9873d280eeb0f6b4907ade493406b
---
 doc/source/architecture.rst                        |  24 +--
 gnocchi/carbonara.py                               | 157 +++++++++++++-----
 gnocchi/storage/_carbonara.py                      |  16 +-
 gnocchi/storage/file.py                            |   5 +-
 gnocchi/storage/swift.py                           |   5 +-
 gnocchi/tests/test_carbonara.py                    |   8 +-
 gnocchi/tests/test_storage.py                      |  57 ++++++-
 .../storage-engine-v3-b34bd0723abf292f.yaml        |  13 ++
 8 files changed, 202 insertions(+), 83 deletions(-)
 create mode 100644 releasenotes/notes/storage-engine-v3-b34bd0723abf292f.yaml

diff --git a/doc/source/architecture.rst b/doc/source/architecture.rst
index 9f340e05c..d7c682f6a 100755
--- a/doc/source/architecture.rst
+++ b/doc/source/architecture.rst
@@ -59,37 +59,29 @@ How to plan for Gnocchi’s storage
 Gnocchi uses a custom file format based on its library *Carbonara*. In Gnocchi,
 a time series is a collection of points, where a point is a given measure, or
 sample, in the lifespan of a time series. The storage format is compressed
-using various techniques, therefore the computing of a time series' size can
-be estimated based on its worst case scenario with the following formula::
+using various techniques, therefore the computing of a time series' size can be
+estimated based on its **worst** case scenario with the following formula::
 
-    number of points × 9 bytes = size in bytes
+    number of points × 8 bytes = size in bytes
 
 The number of points you want to keep is usually determined by the following
 formula::
 
-    number of points = timespan ÷ granularity
+    number of points = timespan ÷ granularity
 
 For example, if you want to keep a year of data with a one minute resolution::
 
-    number of points = (365 days × 24 hours × 60 minutes) ÷ 1 minute
-    number of points = 525 600
+    number of points = (365 days × 24 hours × 60 minutes) ÷ 1 minute
+    number of points = 525 600
 
 Then::
 
-    size in bytes = 525 600 × 9 = 4 730 400 bytes = 4 620 KiB
+    size in bytes = 525 600 × 8 = 4 204 800 bytes = 4 106 KiB
 
 This is just for a single aggregated time series. If your archive policy uses
 the 8 default aggregation methods (mean, min, max, sum, std, median, count,
 95pct) with the same "one year, one minute aggregations" resolution, the space
-used will go up to a maximum of 8 × 4.5 MiB = 36 MiB.
-
-.. note::
-
-  The Ceph driver does not utilize compression as the Swift and File drivers
-  do in favour of more efficient write support. Therefore, each point is
-  always 9B in Ceph where as the Swift and File backends may have a smaller
-  storage footprint but higher I/O requirements. It also requires some
-  additional formatting which may add to disk size.
+used will go up to a maximum of 8 × 4.1 MiB = 32.8 MiB.
 
 How to set the archive policy and granularity
diff --git a/gnocchi/carbonara.py b/gnocchi/carbonara.py
index 215d74bf8..25833a0a7 100644
--- a/gnocchi/carbonara.py
+++ b/gnocchi/carbonara.py
@@ -28,6 +28,7 @@ import struct
 import time
 
 import iso8601
+import lz4
 import msgpack
 import pandas
 import six
@@ -343,7 +344,8 @@ class AggregatedTimeSerie(TimeSerie):
 
     _AGG_METHOD_PCT_RE = re.compile(r"([1-9][0-9]?)pct")
 
-    SERIAL_LEN = struct.calcsize("= next(key)
         key_as_str = str(key)
         if write_full:
-            offset = None
             try:
                 existing = self._get_measures_and_unserialize(
                     metric, key_as_str, aggregation,
@@ -243,15 +242,16 @@ class CarbonaraBasedStorage(storage.StorageDriver):
                 pass
             else:
                 if existing is not None:
-                    if split is not None:
-                        # FIXME(jd) not update but rather concat ts + split
-                        existing.update(split)
-                        split = existing
-                    else:
-                        offset = split.offset_from_timestamp(key)
+                    if split is None:
+                        split = existing
+                    else:
+                        split.merge(existing)
+
+        offset, data = split.serialize(key, compressed=write_full)
+
         return self._store_metric_measures(
             metric, key_as_str, aggregation, archive_policy_def.granularity,
-            split.serialize(key, write_full), offset=offset)
+            data, offset=offset)
 
     def _add_measures(self, aggregation, archive_policy_def, metric,
                       timeserie,
diff --git a/gnocchi/storage/file.py b/gnocchi/storage/file.py
index 7b5bb53be..10ff4c686 100644
--- a/gnocchi/storage/file.py
+++ b/gnocchi/storage/file.py
@@ -22,7 +22,6 @@ import shutil
 import tempfile
 import uuid
 
-import lz4
 from oslo_config import cfg
 import six
@@ -240,7 +239,7 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
             self._build_metric_path_for_split(metric, aggregation,
                                               timestamp_key, granularity,
                                               version),
-            lz4.dumps(data))
+            data)
 
     def _delete_metric(self, metric):
         path = self._build_metric_dir(metric)
@@ -258,7 +257,7 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
             metric, aggregation, timestamp_key, granularity, version)
         try:
             with open(path, 'rb') as aggregation_file:
-                return lz4.loads(aggregation_file.read())
+                return aggregation_file.read()
         except IOError as e:
             if e.errno == errno.ENOENT:
                 if os.path.exists(self._build_metric_dir(metric)):
diff --git a/gnocchi/storage/swift.py b/gnocchi/storage/swift.py
index 3c4d1004c..ee2bf7a13 100644
--- a/gnocchi/storage/swift.py
+++ b/gnocchi/storage/swift.py
@@ -18,7 +18,6 @@ import contextlib
 import datetime
 import uuid
 
-import lz4
 from oslo_config import cfg
 from oslo_log import log
 import six
@@ -195,7 +194,7 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
             self._container_name(metric),
             self._object_name(timestamp_key, aggregation, granularity,
                               version),
-            lz4.dumps(data))
+            data)
 
     def _delete_metric_measures(self, metric, timestamp_key, aggregation,
                                 granularity, version=3):
@@ -239,7 +238,7 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
                     raise
                 raise storage.AggregationDoesNotExist(metric, aggregation)
             raise
-        return lz4.loads(contents)
+        return contents
 
     def _list_split_keys_for_metric(self, metric, aggregation, granularity,
                                     version=None):
diff --git a/gnocchi/tests/test_carbonara.py b/gnocchi/tests/test_carbonara.py
index 9ea38dae4..f674d0bd6 100644
--- a/gnocchi/tests/test_carbonara.py
+++ b/gnocchi/tests/test_carbonara.py
@@ -16,6 +16,7 @@
 import datetime
 import math
 
+import fixtures
 from oslo_utils import timeutils
 from oslotest import base
 # TODO(jd) We shouldn't use pandas here
@@ -27,6 +28,7 @@ from gnocchi import carbonara
 
 class TestBoundTimeSerie(base.BaseTestCase):
     def test_benchmark(self):
+        self.useFixture(fixtures.Timeout(120, gentle=True))
         carbonara.AggregatedTimeSerie.benchmark()
 
     @staticmethod
@@ -173,8 +175,9 @@ class TestAggregatedTimeSerie(base.BaseTestCase):
 
         # Serialize and unserialize
         key = ts.get_split_key()
+        o, s = ts.serialize(key)
         ts = carbonara.AggregatedTimeSerie.unserialize(
-            ts.serialize(key), key, '74pct', 60)
+            s, key, '74pct', ts.sampling)
 
         ts.update(carbonara.TimeSerie.from_tuples(
             [(datetime.datetime(2014, 1, 1, 12, 0, 0), 3),
@@ -613,9 +616,10 @@ class TestAggregatedTimeSerie(base.BaseTestCase):
             ], before_truncate_callback=ts.update)
 
         key = ts.get_split_key()
+        o, s = ts.serialize(key)
         self.assertEqual(ts,
                          carbonara.AggregatedTimeSerie.unserialize(
-                             ts.serialize(key), key,
+                             s, key,
                              'mean', 0.5))
 
     def test_no_truncation(self):
diff --git a/gnocchi/tests/test_storage.py b/gnocchi/tests/test_storage.py
index 6553df372..f56b412ba 100644
--- a/gnocchi/tests/test_storage.py
+++ b/gnocchi/tests/test_storage.py
@@ -225,7 +225,15 @@ class TestStorageDriver(tests_base.TestCase):
                              self.metric, "mean", 300.0))
 
     def test_rewrite_measures(self):
-        self.metric, metric_sql = self._create_metric("high")
+        # Create an archive policy that spans on several splits. Each split
+        # being 3600 points, let's go for 36k points so we have 10 splits.
+        apname = str(uuid.uuid4())
+        ap = archive_policy.ArchivePolicy(apname, 0, [(36000, 60)])
+        self.index.create_archive_policy(ap)
+        self.metric = storage.Metric(uuid.uuid4(), ap)
+        self.index.create_metric(self.metric.id, str(uuid.uuid4()),
+                                  str(uuid.uuid4()),
+                                  apname)
 
         # First store some points scattered across different splits
         self.storage.add_measures(self.metric, [
@@ -236,10 +244,27 @@ class TestStorageDriver(tests_base.TestCase):
         ])
         self.trigger_processing()
 
-        self.assertEqual({'1451520000.0', '1451736000.0', '1451952000.0'},
+        splits = {'1451520000.0', '1451736000.0', '1451952000.0'}
+        self.assertEqual(splits,
                          self.storage._list_split_keys_for_metric(
                              self.metric, "mean", 60.0))
 
+        if self.storage.WRITE_FULL:
+            assertCompressedIfWriteFull = self.assertTrue
+        else:
+            assertCompressedIfWriteFull = self.assertFalse
+
+        data = self.storage._get_measures(
+            self.metric, '1451520000.0', "mean", 60.0)
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1451736000.0', "mean", 60.0)
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1451952000.0', "mean", 60.0)
+        assertCompressedIfWriteFull(
+            carbonara.AggregatedTimeSerie.is_compressed(data))
+
         self.assertEqual([
             (utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
             (utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
@@ -249,26 +274,40 @@ class TestStorageDriver(tests_base.TestCase):
 
         # Now store brand new points that should force a rewrite of one of the
         # split (keep in mind the back window size in one hour here). We move
-        # the BoundTimeSerie processing timeserie to be between
-        # "2016-01-07 16:12:45" and "2016-01-07 17:12:45".
+        # the BoundTimeSerie processing timeserie far away from its current
+        # range.
         self.storage.add_measures(self.metric, [
-            storage.Measure(datetime.datetime(2016, 1, 7, 16, 18, 45), 45),
-            storage.Measure(datetime.datetime(2016, 1, 7, 17, 12, 45), 46),
+            storage.Measure(datetime.datetime(2016, 1, 10, 16, 18, 45), 45),
+            storage.Measure(datetime.datetime(2016, 1, 10, 17, 12, 45), 46),
         ])
         self.trigger_processing()
 
-        self.assertEqual({'1452168000.0', '1451736000.0',
+        self.assertEqual({'1452384000.0', '1451736000.0',
                           '1451520000.0', '1451952000.0'},
                          self.storage._list_split_keys_for_metric(
                              self.metric, "mean", 60.0))
 
+        data = self.storage._get_measures(
+            self.metric, '1451520000.0', "mean", 60.0)
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1451736000.0', "mean", 60.0)
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1451952000.0', "mean", 60.0)
+        # Now this one is compressed because it has been rewritten!
+        self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
+        data = self.storage._get_measures(
+            self.metric, '1452384000.0', "mean", 60.0)
+        assertCompressedIfWriteFull(
+            carbonara.AggregatedTimeSerie.is_compressed(data))
+
         self.assertEqual([
             (utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
             (utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
             (utils.datetime_utc(2016, 1, 4, 14, 9), 60.0, 4),
             (utils.datetime_utc(2016, 1, 6, 15, 12), 60.0, 44),
-            (utils.datetime_utc(2016, 1, 7, 16, 18), 60.0, 45),
-            (utils.datetime_utc(2016, 1, 7, 17, 12), 60.0, 46),
+            (utils.datetime_utc(2016, 1, 10, 16, 18), 60.0, 45),
+            (utils.datetime_utc(2016, 1, 10, 17, 12), 60.0, 46),
         ], self.storage.get_measures(self.metric, granularity=60.0))
 
     def test_updated_measures(self):
diff --git a/releasenotes/notes/storage-engine-v3-b34bd0723abf292f.yaml b/releasenotes/notes/storage-engine-v3-b34bd0723abf292f.yaml
new file mode 100644
index 000000000..83aa09df8
--- /dev/null
+++ b/releasenotes/notes/storage-engine-v3-b34bd0723abf292f.yaml
@@ -0,0 +1,13 @@
+---
+features:
+  - The Carbonara-based storage engine has been updated and greatly improved.
+    It now features fast writes for Ceph (no change for the file and Swift
+    based drivers) by using an append method.
+    It also features on-the-fly data compression (using LZ4) of the aggregated
+    time series, reducing the data space usage by at least 50%.
+upgrade:
+  - gnocchi-upgrade must be run before running the new version of
+    gnocchi-metricd and the HTTP REST API in order to upgrade from version 2
+    of the Carbonara storage engine to version 3. It will read all metrics and
+    convert them to the new version 3 serialization format (compressing the
+    data), which might take some time.
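
To make the difference between the two serialization paths concrete, here is a
short, self-contained Python sketch of the idea. It is not the actual Carbonara
code: the point layout, the helper names and the 8-bytes-per-point figure are
illustrative assumptions taken from the commit message and the updated
documentation above; only the lz4.dumps()/lz4.loads() calls mirror the API the
removed driver code relied on.

    # Illustrative sketch only -- not the real Carbonara on-disk format.
    # It contrasts the "padded" layout (fixed-size slots, rewritable in place
    # at an offset, which suits Ceph) with the "compressed" layout (LZ4 over
    # the packed points, which suits Swift and file backends that rewrite
    # whole objects). POINT_FMT and the helper names are assumptions.
    import struct

    import lz4  # python-lz4 module-level API (lz4.dumps/lz4.loads)

    POINT_FMT = "<d"  # assumption: one 8-byte float per aggregated point
    POINT_LEN = struct.calcsize(POINT_FMT)


    def serialize_padded(points, n_slots):
        """Fixed-size buffer: slot i lives at byte offset i * POINT_LEN."""
        buf = bytearray(POINT_LEN * n_slots)
        for index, value in points:  # points: iterable of (slot index, value)
            struct.pack_into(POINT_FMT, buf, index * POINT_LEN, value)
        return bytes(buf)


    def serialize_compressed(points):
        """Variable-size buffer: pack only the stored points, then LZ4 them."""
        raw = b"".join(struct.pack(POINT_FMT, value) for _, value in points)
        return lz4.dumps(raw)


    def unserialize_compressed(data):
        raw = lz4.loads(data)
        return [struct.unpack_from(POINT_FMT, raw, offset)[0]
                for offset in range(0, len(raw), POINT_LEN)]


    if __name__ == "__main__":
        pts = [(0, 1.0), (2, 3.5), (5, 42.0)]
        padded = serialize_padded(pts, n_slots=8)   # offset-writable, fixed size
        compressed = serialize_compressed(pts)      # smaller, written whole
        assert unserialize_compressed(compressed) == [1.0, 3.5, 42.0]
        print(len(padded), "bytes padded,", len(compressed), "bytes compressed")

The padded form lets a driver such as Ceph overwrite a single point at a fixed
offset without rewriting the whole object, while the compressed form trades
that ability for a smaller footprint on stores that always rewrite objects as a
whole; per the commit message, splits that fall out of reach of the back window
are also stored compressed, since they will no longer be rewritten in place.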