carbonara: compress non padded timeseries

This introduces two distinct formats when serializing
AggregatedTimeSerie: one is "padded" so it can be written at a given
offset (Ceph) and the other one is "compressed" so it can be written to
object stores (Swift, file). The compressed format is also used to
store timeseries splits that fall outside the back window.
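
For illustration only (none of this is the code in this change; the names
and sample values are made up, and it assumes the legacy python-lz4 API,
lz4.dumps, used by this tree), the two layouts boil down to roughly::

    import struct

    import lz4

    sampling = 60
    points = {0: 1.0, 120: 2.0, 180: 3.0}  # offset in seconds -> value

    # "Padded" layout: one (flag, value) pair per sampling interval, so a
    # point can be rewritten in place at offset = interval index * 9 bytes.
    nb_intervals = max(points) // sampling + 1
    padded = [False, 0.0] * nb_intervals
    for offset, value in points.items():
        index = offset // sampling
        padded[index * 2] = True
        padded[index * 2 + 1] = value
    padded_data = struct.pack('<' + '?d' * nb_intervals, *padded)

    # "Compressed" layout: delta-encoded interval counts followed by the
    # values, LZ4-compressed and prefixed with a 'c' marker byte.
    deltas, previous = [], 0
    for offset in sorted(points):
        index = offset // sampling
        deltas.append(index - previous)
        previous = index
    values = [points[offset] for offset in sorted(points)]
    compressed_data = b"c" + lz4.dumps(struct.pack(
        '<' + 'H' * len(deltas) + 'd' * len(values), *(deltas + values)))

    print(len(padded_data), len(compressed_data))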

Change-Id: I6f5431945ad9873d280eeb0f6b4907ade493406b
Julien Danjou 2016-08-19 20:41:47 +02:00
parent 1a425863e9
commit c74c5c9289
8 changed files with 202 additions and 83 deletions

View File

@ -59,37 +59,29 @@ How to plan for Gnocchi's storage
Gnocchi uses a custom file format based on its library *Carbonara*. In Gnocchi,
a time series is a collection of points, where a point is a given measure, or
sample, in the lifespan of a time series. The storage format is compressed
using various techniques, therefore the computing of a time series' size can
be estimated based on its worst case scenario with the following formula::
using various techniques; the size of a time series can therefore be
estimated based on its **worst**-case scenario with the following formula::
number of points × 9 bytes = size in bytes
number of points × 8 bytes = size in bytes
The number of points you want to keep is usually determined by the following
formula::
number of points = timespan ÷ granularity
For example, if you want to keep a year of data with a one minute resolution::
number of points = (365 days × 24 hours × 60 minutes) ÷ 1 minute
number of points = 525 600
Then::
size in bytes = 525 600 × 9 = 4 730 400 bytes = 4 620 KiB
size in bytes = 525 600 × 8 = 4 204 800 bytes = 4 106 KiB
This is just for a single aggregated time series. If your archive policy uses
the 8 default aggregation methods (mean, min, max, sum, std, median, count,
95pct) with the same "one year, one minute aggregations" resolution, the space
used will go up to a maximum of 8 × 4.5 MiB = 36 MiB.
.. note::
The Ceph driver does not utilize compression as the Swift and File drivers
do in favour of more efficient write support. Therefore, each point is
always 9B in Ceph whereas the Swift and File backends may have a smaller
storage footprint but higher I/O requirements. It also requires some
additional formatting which may add to disk size.
used will go up to a maximum of 8 × 4.1 MiB = 32.8 MiB.
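As a quick sanity check, the same arithmetic can be scripted; the numbers
below simply restate the example above::

    # Worst-case size estimate for one aggregated time series.
    BYTES_PER_POINT = 8

    granularity = 60                   # one minute, in seconds
    timespan = 365 * 24 * 60 * 60      # one year, in seconds
    aggregation_methods = 8            # mean, min, max, sum, std, median,
                                       # count, 95pct

    points = timespan // granularity           # 525600
    size_bytes = points * BYTES_PER_POINT      # 4204800 bytes (~4106 KiB)
    total_bytes = size_bytes * aggregation_methods

    print(points, size_bytes, total_bytes)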
How to set the archive policy and granularity

View File

@ -28,6 +28,7 @@ import struct
import time
import iso8601
import lz4
import msgpack
import pandas
import six
@ -343,7 +344,8 @@ class AggregatedTimeSerie(TimeSerie):
_AGG_METHOD_PCT_RE = re.compile(r"([1-9][0-9]?)pct")
SERIAL_LEN = struct.calcsize("<?d")
PADDED_SERIAL_LEN = struct.calcsize("<?d")
COMPRESSED_SERIAL_LEN = struct.calcsize("<Hd")
def __init__(self, sampling, aggregation_method,
ts=None, max_size=None):
@ -413,20 +415,43 @@ class AggregatedTimeSerie(TimeSerie):
self.aggregation_method,
)
@staticmethod
def is_compressed(serialized_data):
"""Check whatever the data was serialized with compression."""
return six.indexbytes(serialized_data, 0) == ord("c")
@classmethod
def unserialize(cls, data, start, agg_method, sampling):
x, y = [], []
start = float(start)
v_len = len(data) // cls.SERIAL_LEN
# NOTE(gordc): use '<' for standardized, little-endian byte order
deserial = struct.unpack('<' + '?d' * v_len, data)
# alternating split into 2 lists and drop items with False flag
for i, val in itertools.compress(six.moves.zip(six.moves.range(v_len),
deserial[1::2]),
deserial[::2]):
x.append(val)
y.append(start + (i * sampling))
y = pandas.to_datetime(y, unit='s')
if data:
if cls.is_compressed(data):
# Compressed format
uncompressed = lz4.loads(memoryview(data)[1:].tobytes())
nb_points = len(uncompressed) // cls.COMPRESSED_SERIAL_LEN
deserial = struct.unpack(
'<' + 'H' * nb_points + 'd' * nb_points,
uncompressed)
for delta in itertools.islice(deserial, nb_points):
ts = start + (delta * sampling)
y.append(ts)
start = ts
x = deserial[nb_points:]
else:
# Padded format
nb_points = len(data) // cls.PADDED_SERIAL_LEN
# NOTE(gordc): use '<' for standardized
# little-endian byte order
deserial = struct.unpack('<' + '?d' * nb_points, data)
# alternating split into 2 lists and drop items with False flag
for i, val in itertools.compress(
six.moves.zip(six.moves.range(nb_points),
deserial[1::2]),
deserial[::2]):
x.append(val)
y.append(start + (i * sampling))
y = pandas.to_datetime(y, unit='s')
return cls.from_data(sampling, agg_method, y, x)
def get_split_key(self, timestamp=None):
@ -441,43 +466,58 @@ class AggregatedTimeSerie(TimeSerie):
return SplitKey.from_timestamp_and_sampling(
timestamp, self.sampling)
def serialize(self, start=None, padded=True):
def serialize(self, start, compressed=True):
"""Serialize an aggregated timeserie.
:param start: timestamp to start serialization
:param padded: pad the beginning of the serialization with zeroes
The serialization starts with a byte that indicates the serialization
format: 'c' for the compressed format, '\x00' or '\x01' for the uncompressed
format. Both formats can be unserialized using the `unserialize` method.
The offset returned indicates at which offset the data should be
written. In the case of compressed data, this is always None.
:param start: Timestamp to start serialization at.
:param compressed: Serialize in a compressed format.
:return: a tuple of (offset, data)
"""
# NOTE(gordc): this binary serializes series based on the split time.
# the format is 1B True/False flag which denotes whether subsequent 8B
# is a real float or zero padding. every 9B represents one second from
# start time. this is intended to be run on data already split.
# ie. False,0,True,0 serialization means start datapoint is padding,
# and 1s after start time, the aggregate value is 0.
if not self.ts.index.is_monotonic:
self.ts = self.ts.sort_index()
offset_div = self.sampling * 10e8
if padded:
if start is None:
start = pandas.Timestamp(start).value
else:
start = self.get_split_key().value
else:
start = self.first.value
start = pandas.Timestamp(start).value
# calculate how many seconds from start the series runs until and
# initialize list to store alternating delimiter, float entries
e_offset = int((self.last.value - start) // (self.sampling * 10e8)) + 1
if compressed:
# NOTE(jd) Use a double delta encoding for timestamps
timestamps = []
for i in self.ts.index:
v = i.value
timestamps.append(int((v - start) // offset_div))
start = v
values = self.ts.values.tolist()
return None, b"c" + lz4.dumps(struct.pack(
'<' + 'H' * len(timestamps) + 'd' * len(values),
*(timestamps + values)))
# NOTE(gordc): this binary serializes series based on the split
# time. the format is 1B True/False flag which denotes whether
# subsequent 8B is a real float or zero padding. every 9B
# represents one second from start time. this is intended to be run
# on data already split. ie. False,0,True,0 serialization means
# start datapoint is padding, and 1s after start time, the
# aggregate value is 0. calculate how many seconds from start the
# series runs until and initialize list to store alternating
# delimiter, float entries
e_offset = int(
(self.last.value - self.first.value) // offset_div) + 1
serial = [False] * e_offset * 2
first = self.first.value # NOTE(jd) needed because faster
for i, v in self.ts.iteritems():
# overwrite zero padding with real points and set flag True
loc = int((i.value - start) // offset_div)
loc = int((i.value - first) // offset_div)
serial[loc * 2] = True
serial[loc * 2 + 1] = float(v)
return struct.pack('<' + '?d' * e_offset, *serial)
def offset_from_timestamp(self, timestamp):
return int((self.first.value - pandas.Timestamp(timestamp).value)
// (self.sampling * 10e8)
* self.SERIAL_LEN)
offset = int((first - start) // offset_div) * self.PADDED_SERIAL_LEN
return offset, struct.pack('<' + '?d' * e_offset, *serial)
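For orientation, a round trip through these two code paths, modelled on the
unit tests and the benchmark below (the sample series itself is made up),
looks roughly like this::

    import datetime

    import pandas

    from gnocchi import carbonara

    now = datetime.datetime(2015, 4, 3, 23, 11)
    pts = pandas.Series([1.0, 2.0, 3.0],
                        [now + datetime.timedelta(seconds=i * 5)
                         for i in range(3)])
    ts = carbonara.AggregatedTimeSerie(ts=pts, sampling=5,
                                       aggregation_method='mean')
    key = ts.get_split_key()

    # Compressed: no meaningful offset (None), payload starts with b'c'.
    offset, data = ts.serialize(key, compressed=True)
    assert carbonara.AggregatedTimeSerie.is_compressed(data)

    # Padded: the offset tells the driver where to write within the split.
    offset, data = ts.serialize(key, compressed=False)
    assert not carbonara.AggregatedTimeSerie.is_compressed(data)

    # Both payloads go back through the same unserialize() call.
    ts2 = carbonara.AggregatedTimeSerie.unserialize(data, key, 'mean',
                                                    ts.sampling)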
def _truncate(self, quick=False):
"""Truncate the timeserie."""
@ -523,6 +563,14 @@ class AggregatedTimeSerie(TimeSerie):
for timestamp, value
in six.iteritems(points)]
def merge(self, ts):
"""Merge a timeserie into this one.
This is equivalent to `update` but is faster as there is no
resampling. Be careful about what you merge.
"""
self.ts = self.ts.combine_first(ts.ts)
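A minimal pandas-level sketch of the `combine_first` semantics relied on
here (timestamps and values are made up; points already present in the
current series win over the merged-in ones)::

    import pandas

    a = pandas.Series([1.0, 2.0],
                      index=pandas.to_datetime(['2016-01-01 12:00',
                                                '2016-01-01 12:01']))
    b = pandas.Series([9.0, 3.0],
                      index=pandas.to_datetime(['2016-01-01 12:01',
                                                '2016-01-01 12:02']))

    merged = a.combine_first(b)
    # 12:00 -> 1.0, 12:01 -> 2.0 (a wins), 12:02 -> 3.0 (filled from b)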
def update(self, ts):
if ts.ts.empty:
return
@ -558,7 +606,7 @@ class AggregatedTimeSerie(TimeSerie):
"""Run a speed benchmark!"""
points = SplitKey.POINTS_PER_SPLIT
sampling = 5
compress_times = 50
serialize_times = 50
now = datetime.datetime(2015, 4, 3, 23, 11)
@ -581,20 +629,45 @@ class AggregatedTimeSerie(TimeSerie):
("random ", [random.random()
for x in six.moves.range(points)]),
]:
print(title)
pts = pandas.Series(values,
[now + datetime.timedelta(seconds=i*sampling)
for i in six.moves.range(points)])
ts = cls(ts=pts, sampling=sampling, aggregation_method='mean')
t0 = time.time()
key = ts.get_split_key()
for i in six.moves.range(compress_times):
s = ts.serialize(key)
for i in six.moves.range(serialize_times):
e, s = ts.serialize(key, compressed=False)
t1 = time.time()
print(title)
print(" Bytes per point: %.2f" % (len(s) / float(points)))
print(" Serialization speed: %.2f MB/s"
print(" Uncompressed serialization speed: %.2f MB/s"
% (((points * 2 * 8)
/ ((t1 - t0) / compress_times)) / (1024.0 * 1024.0)))
/ ((t1 - t0) / serialize_times)) / (1024.0 * 1024.0)))
print(" Bytes per point: %.2f" % (len(s) / float(points)))
t0 = time.time()
for i in six.moves.range(serialize_times):
cls.unserialize(s, key, 'mean', sampling)
t1 = time.time()
print(" Unserialization speed: %.2f MB/s"
% (((points * 2 * 8)
/ ((t1 - t0) / serialize_times)) / (1024.0 * 1024.0)))
t0 = time.time()
for i in six.moves.range(serialize_times):
o, s = ts.serialize(key, compressed=True)
t1 = time.time()
print(" Compressed serialization speed: %.2f MB/s"
% (((points * 2 * 8)
/ ((t1 - t0) / serialize_times)) / (1024.0 * 1024.0)))
print(" Bytes per point: %.2f" % (len(s) / float(points)))
t0 = time.time()
for i in six.moves.range(serialize_times):
cls.unserialize(s, key, 'mean', sampling)
t1 = time.time()
print(" Uncompression speed: %.2f MB/s"
% (((points * 2 * 8)
/ ((t1 - t0) / serialize_times)) / (1024.0 * 1024.0)))
@staticmethod
def aggregated(timeseries, aggregation, from_timestamp=None,

View File

@ -234,7 +234,6 @@ class CarbonaraBasedStorage(storage.StorageDriver):
write_full = self.WRITE_FULL or oldest_mutable_timestamp >= next(key)
key_as_str = str(key)
if write_full:
offset = None
try:
existing = self._get_measures_and_unserialize(
metric, key_as_str, aggregation,
@ -243,15 +242,16 @@ class CarbonaraBasedStorage(storage.StorageDriver):
pass
else:
if existing is not None:
if split is not None:
# FIXME(jd) not update but rather concat ts + split
existing.update(split)
split = existing
else:
offset = split.offset_from_timestamp(key)
if split is None:
split = existing
else:
split.merge(existing)
offset, data = split.serialize(key, compressed=write_full)
return self._store_metric_measures(
metric, key_as_str, aggregation, archive_policy_def.granularity,
split.serialize(key, write_full), offset=offset)
data, offset=offset)
def _add_measures(self, aggregation, archive_policy_def,
metric, timeserie,

View File

@ -22,7 +22,6 @@ import shutil
import tempfile
import uuid
import lz4
from oslo_config import cfg
import six
@ -240,7 +239,7 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
self._build_metric_path_for_split(metric, aggregation,
timestamp_key, granularity,
version),
lz4.dumps(data))
data)
def _delete_metric(self, metric):
path = self._build_metric_dir(metric)
@ -258,7 +257,7 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
metric, aggregation, timestamp_key, granularity, version)
try:
with open(path, 'rb') as aggregation_file:
return lz4.loads(aggregation_file.read())
return aggregation_file.read()
except IOError as e:
if e.errno == errno.ENOENT:
if os.path.exists(self._build_metric_dir(metric)):

View File

@ -18,7 +18,6 @@ import contextlib
import datetime
import uuid
import lz4
from oslo_config import cfg
from oslo_log import log
import six
@ -195,7 +194,7 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
self._container_name(metric),
self._object_name(timestamp_key, aggregation, granularity,
version),
lz4.dumps(data))
data)
def _delete_metric_measures(self, metric, timestamp_key, aggregation,
granularity, version=3):
@ -239,7 +238,7 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
raise
raise storage.AggregationDoesNotExist(metric, aggregation)
raise
return lz4.loads(contents)
return contents
def _list_split_keys_for_metric(self, metric, aggregation, granularity,
version=None):

View File

@ -16,6 +16,7 @@
import datetime
import math
import fixtures
from oslo_utils import timeutils
from oslotest import base
# TODO(jd) We shouldn't use pandas here
@ -27,6 +28,7 @@ from gnocchi import carbonara
class TestBoundTimeSerie(base.BaseTestCase):
def test_benchmark(self):
self.useFixture(fixtures.Timeout(120, gentle=True))
carbonara.AggregatedTimeSerie.benchmark()
@staticmethod
@ -173,8 +175,9 @@ class TestAggregatedTimeSerie(base.BaseTestCase):
# Serialize and unserialize
key = ts.get_split_key()
o, s = ts.serialize(key)
ts = carbonara.AggregatedTimeSerie.unserialize(
ts.serialize(key), key, '74pct', 60)
s, key, '74pct', ts.sampling)
ts.update(carbonara.TimeSerie.from_tuples(
[(datetime.datetime(2014, 1, 1, 12, 0, 0), 3),
@ -613,9 +616,10 @@ class TestAggregatedTimeSerie(base.BaseTestCase):
], before_truncate_callback=ts.update)
key = ts.get_split_key()
o, s = ts.serialize(key)
self.assertEqual(ts,
carbonara.AggregatedTimeSerie.unserialize(
ts.serialize(key), key,
s, key,
'mean', 0.5))
def test_no_truncation(self):

View File

@ -225,7 +225,15 @@ class TestStorageDriver(tests_base.TestCase):
self.metric, "mean", 300.0))
def test_rewrite_measures(self):
self.metric, metric_sql = self._create_metric("high")
# Create an archive policy that spans several splits. Each split
# being 3600 points, let's go for 36k points so we have 10 splits.
apname = str(uuid.uuid4())
ap = archive_policy.ArchivePolicy(apname, 0, [(36000, 60)])
self.index.create_archive_policy(ap)
self.metric = storage.Metric(uuid.uuid4(), ap)
self.index.create_metric(self.metric.id, str(uuid.uuid4()),
str(uuid.uuid4()),
apname)
# First store some points scattered across different splits
self.storage.add_measures(self.metric, [
@ -236,10 +244,27 @@ class TestStorageDriver(tests_base.TestCase):
])
self.trigger_processing()
self.assertEqual({'1451520000.0', '1451736000.0', '1451952000.0'},
splits = {'1451520000.0', '1451736000.0', '1451952000.0'}
self.assertEqual(splits,
self.storage._list_split_keys_for_metric(
self.metric, "mean", 60.0))
if self.storage.WRITE_FULL:
assertCompressedIfWriteFull = self.assertTrue
else:
assertCompressedIfWriteFull = self.assertFalse
data = self.storage._get_measures(
self.metric, '1451520000.0', "mean", 60.0)
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
data = self.storage._get_measures(
self.metric, '1451736000.0', "mean", 60.0)
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
data = self.storage._get_measures(
self.metric, '1451952000.0', "mean", 60.0)
assertCompressedIfWriteFull(
carbonara.AggregatedTimeSerie.is_compressed(data))
self.assertEqual([
(utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
(utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
@ -249,26 +274,40 @@ class TestStorageDriver(tests_base.TestCase):
# Now store brand new points that should force a rewrite of one of the
# split (keep in mind the back window size in one hour here). We move
# the BoundTimeSerie processing timeserie to be between
# "2016-01-07 16:12:45" and "2016-01-07 17:12:45".
# the BoundTimeSerie processing timeserie far away from its current
# range.
self.storage.add_measures(self.metric, [
storage.Measure(datetime.datetime(2016, 1, 7, 16, 18, 45), 45),
storage.Measure(datetime.datetime(2016, 1, 7, 17, 12, 45), 46),
storage.Measure(datetime.datetime(2016, 1, 10, 16, 18, 45), 45),
storage.Measure(datetime.datetime(2016, 1, 10, 17, 12, 45), 46),
])
self.trigger_processing()
self.assertEqual({'1452168000.0', '1451736000.0',
self.assertEqual({'1452384000.0', '1451736000.0',
'1451520000.0', '1451952000.0'},
self.storage._list_split_keys_for_metric(
self.metric, "mean", 60.0))
data = self.storage._get_measures(
self.metric, '1451520000.0', "mean", 60.0)
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
data = self.storage._get_measures(
self.metric, '1451736000.0', "mean", 60.0)
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
data = self.storage._get_measures(
self.metric, '1451952000.0', "mean", 60.0)
# Now this one is compressed because it has been rewritten!
self.assertTrue(carbonara.AggregatedTimeSerie.is_compressed(data))
data = self.storage._get_measures(
self.metric, '1452384000.0', "mean", 60.0)
assertCompressedIfWriteFull(
carbonara.AggregatedTimeSerie.is_compressed(data))
self.assertEqual([
(utils.datetime_utc(2016, 1, 1, 12), 60.0, 69),
(utils.datetime_utc(2016, 1, 2, 13, 7), 60.0, 42),
(utils.datetime_utc(2016, 1, 4, 14, 9), 60.0, 4),
(utils.datetime_utc(2016, 1, 6, 15, 12), 60.0, 44),
(utils.datetime_utc(2016, 1, 7, 16, 18), 60.0, 45),
(utils.datetime_utc(2016, 1, 7, 17, 12), 60.0, 46),
(utils.datetime_utc(2016, 1, 10, 16, 18), 60.0, 45),
(utils.datetime_utc(2016, 1, 10, 17, 12), 60.0, 46),
], self.storage.get_measures(self.metric, granularity=60.0))
def test_updated_measures(self):

View File

@ -0,0 +1,13 @@
---
features:
- The Carbonara-based storage engine has been updated and greatly improved.
It now features fast writes for Ceph (no change for the file and Swift based
drivers) by using an append method.
It also features on-the-fly data compression (using LZ4) of the aggregated
time series, reducing the data space usage by at least 50%.
upgrade:
- gnocchi-upgrade must be run before running the new version of
gnocchi-metricd and the HTTP REST API in order to upgrade from version 2 of
the Carbonara storage engine to version 3. It will read all metrics and
convert them to the new version 3 serialization format (compressing the
data), which might take some time.
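
Because serialized splits now begin with a byte identifying the format
('c' for compressed, '\x00' or '\x01' for padded), it is cheap to tell which
layout a stored payload uses; a hypothetical helper (raw_payload stands for
whatever bytes a driver's _get_measures returned) could be::

    from gnocchi import carbonara

    def describe_split(raw_payload):
        # 'c' marks the LZ4-compressed format; otherwise the first byte is
        # the '\x00'/'\x01' flag of the padded, offset-addressable layout.
        if carbonara.AggregatedTimeSerie.is_compressed(raw_payload):
            return "compressed"
        return "padded"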