s3: check for consistency after write
Change-Id: I50fb952279f736c88543867729abe3c9c7302463
This commit is contained in:
parent
ae136a04fc
commit
f74aec793d
@ -1,6 +1,6 @@
|
||||
# -*- encoding: utf-8 -*-
|
||||
#
|
||||
# Copyright © 2016 Red Hat, Inc.
|
||||
# Copyright © 2016-2017 Red Hat, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
@ -16,6 +16,7 @@
|
||||
import os
|
||||
|
||||
from oslo_config import cfg
|
||||
import tenacity
|
||||
|
||||
from gnocchi import storage
|
||||
from gnocchi.storage import _carbonara
|
||||
@ -42,6 +43,12 @@ OPTS = [
|
||||
max_length=26,
|
||||
default='gnocchi',
|
||||
help='Prefix to namespace metric bucket.'),
|
||||
cfg.FloatOpt('s3_check_consistency_timeout',
|
||||
min=0,
|
||||
default=60,
|
||||
help="Maximum time to wait checking data consistency when "
|
||||
"writing to S3. Set to 0 to disable data consistency "
|
||||
"validation."),
|
||||
]
|
||||
|
||||
|
||||
@ -54,12 +61,19 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
|
||||
|
||||
WRITE_FULL = True
|
||||
|
||||
_consistency_wait = tenacity.wait_exponential(multiplier=0.1)
|
||||
|
||||
def __init__(self, conf, incoming):
|
||||
super(S3Storage, self).__init__(conf, incoming)
|
||||
self.s3, self._region_name, self._bucket_prefix = (
|
||||
s3.get_connection(conf)
|
||||
)
|
||||
self._bucket_name = '%s-aggregates' % self._bucket_prefix
|
||||
if conf.s3_check_consistency_timeout > 0:
|
||||
self._consistency_stop = tenacity.stop_after_delay(
|
||||
conf.s3_check_consistency_timeout)
|
||||
else:
|
||||
self._consistency_stop = None
|
||||
|
||||
def upgrade(self, index):
|
||||
super(S3Storage, self).upgrade(index)
|
||||
@ -81,9 +95,24 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
|
||||
def _create_metric(self, metric):
|
||||
pass
|
||||
|
||||
def _put_object_safe(self, Bucket, Key, Body):
|
||||
put = self.s3.put_object(Bucket=Bucket, Key=Key, Body=Body)
|
||||
|
||||
if self._consistency_stop:
|
||||
|
||||
def _head():
|
||||
return self.s3.head_object(Bucket=Bucket,
|
||||
Key=Key, IfMatch=put['ETag'])
|
||||
|
||||
tenacity.Retrying(
|
||||
retry=tenacity.retry_if_result(
|
||||
lambda r: r['ETag'] != put['ETag']),
|
||||
wait=self._consistency_wait,
|
||||
stop=self._consistency_stop)(_head)
|
||||
|
||||
def _store_metric_measures(self, metric, timestamp_key, aggregation,
|
||||
granularity, data, offset=0, version=3):
|
||||
self.s3.put_object(
|
||||
self._put_object_safe(
|
||||
Bucket=self._bucket_name,
|
||||
Key=self._prefix(metric) + self._object_name(
|
||||
timestamp_key, aggregation, granularity, version),
|
||||
@ -186,7 +215,7 @@ class S3Storage(_carbonara.CarbonaraBasedStorage):
|
||||
return response['Body'].read()
|
||||
|
||||
def _store_unaggregated_timeserie(self, metric, data, version=3):
|
||||
self.s3.put_object(
|
||||
self._put_object_safe(
|
||||
Bucket=self._bucket_name,
|
||||
Key=self._build_unaggregated_timeserie_path(metric, version),
|
||||
Body=data)
|
||||
|
@ -0,0 +1,9 @@
|
||||
---
|
||||
features:
|
||||
- |
|
||||
The S3 driver now checks for data consistency by default. S3 does not
|
||||
guarantee read-after-write consistency when overwriting data. Gnocchi now
|
||||
waits up to `s3_check_consistency_timeout` seconds before returning and
|
||||
unlocking a metric for new processing. This makes sure that the data that
|
||||
will be read by the next workers will be consistent and that no data will
|
||||
be lost. This feature can be disabled by setting the value to 0.
|
Loading…
Reference in New Issue
Block a user