_carbonara: dedicated methods to store raw timeserie
This patches introduces _{get,store}_unaggregated_timeserie methods for
Carbonara-based drivers. It allows to really handle aggregated
timeseries and the unaggregated timeserie with different code path.
It's cleaner (and it'll come handy later when splitting TimeSerieArchive).
Change-Id: If779c19f14c7295b17a796fb29c30932f1590eb5
This commit is contained in:
@@ -75,6 +75,14 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
||||
def _get_measures(metric, aggregation):
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def _get_unaggregated_timeserie(metric):
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def _store_unaggregated_timeserie(metric, data):
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def _store_metric_measures(metric, aggregation, data):
|
||||
raise NotImplementedError
|
||||
@@ -175,8 +183,10 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
||||
with self._process_measure_for_metric(metric) as measures:
|
||||
try:
|
||||
with timeutils.StopWatch() as sw:
|
||||
raw_measures = self._get_measures(metric,
|
||||
'none')
|
||||
raw_measures = (
|
||||
self._get_unaggregated_timeserie(
|
||||
metric)
|
||||
)
|
||||
LOG.debug(
|
||||
"Retrieve unaggregated measures "
|
||||
"for %s in %.2fs"
|
||||
@@ -188,8 +198,6 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
||||
# Created in the mean time, do not worry
|
||||
pass
|
||||
ts = None
|
||||
except storage.AggregationDoesNotExist:
|
||||
ts = None
|
||||
else:
|
||||
try:
|
||||
ts = carbonara.BoundTimeSerie.unserialize(
|
||||
@@ -223,8 +231,8 @@ class CarbonaraBasedStorage(storage.StorageDriver):
|
||||
"in %.2f seconds"
|
||||
% (metric.id, len(measures), sw.elapsed()))
|
||||
|
||||
self._store_metric_measures(metric, 'none',
|
||||
ts.serialize())
|
||||
self._store_unaggregated_timeserie(metric,
|
||||
ts.serialize())
|
||||
except Exception:
|
||||
LOG.error("Error processing new measures", exc_info=True)
|
||||
finally:
|
||||
|
||||
@@ -189,6 +189,24 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
|
||||
else:
|
||||
raise storage.MetricDoesNotExist(metric)
|
||||
|
||||
def _get_unaggregated_timeserie(self, metric):
|
||||
try:
|
||||
with self._get_ioctx() as ioctx:
|
||||
name = self._get_object_name(metric, "none")
|
||||
content = self._get_object_content(ioctx, name)
|
||||
if len(content) == 0:
|
||||
# NOTE(sileht: the object have been created by
|
||||
# the lock code
|
||||
raise rados.ObjectNotFound
|
||||
return content
|
||||
except rados.ObjectNotFound:
|
||||
raise storage.MetricDoesNotExist(metric)
|
||||
|
||||
def _store_unaggregated_timeserie(self, metric, data):
|
||||
name = self._get_object_name(metric, "none")
|
||||
with self._get_ioctx() as ioctx:
|
||||
ioctx.write_full(name, data)
|
||||
|
||||
@staticmethod
|
||||
def _get_object_content(ioctx, name):
|
||||
offset = 0
|
||||
|
||||
@@ -70,6 +70,15 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
|
||||
def stop(self):
|
||||
self._lock.stop()
|
||||
|
||||
def _atomic_file_store(self, dest, data):
|
||||
tmpfile = self._get_tempfile()
|
||||
tmpfile.write(data)
|
||||
tmpfile.close()
|
||||
os.rename(tmpfile.name, dest)
|
||||
|
||||
def _build_unaggregated_timeserie_path(self, metric):
|
||||
return os.path.join(self.basepath, str(metric.id), "none")
|
||||
|
||||
def _build_metric_path(self, metric, aggregation=None):
|
||||
path = os.path.join(self.basepath, str(metric.id))
|
||||
if aggregation:
|
||||
@@ -165,11 +174,25 @@ class FileStorage(_carbonara.CarbonaraBasedStorage):
|
||||
|
||||
self._delete_measures_files_for_metric_id(metric.id, files)
|
||||
|
||||
def _store_unaggregated_timeserie(self, metric, data):
|
||||
self._atomic_file_store(
|
||||
self._build_unaggregated_timeserie_path(metric),
|
||||
data)
|
||||
|
||||
def _get_unaggregated_timeserie(self, metric):
|
||||
path = self._build_unaggregated_timeserie_path(metric)
|
||||
try:
|
||||
with open(path, 'rb') as f:
|
||||
return f.read()
|
||||
except IOError as e:
|
||||
if e.errno == errno.ENOENT:
|
||||
raise storage.MetricDoesNotExist(metric)
|
||||
raise
|
||||
|
||||
def _store_metric_measures(self, metric, aggregation, data):
|
||||
tmpfile = self._get_tempfile()
|
||||
tmpfile.write(data)
|
||||
tmpfile.close()
|
||||
os.rename(tmpfile.name, self._build_metric_path(metric, aggregation))
|
||||
self._atomic_file_store(
|
||||
self._build_metric_path(metric, aggregation),
|
||||
data)
|
||||
|
||||
def _delete_metric(self, metric):
|
||||
path = self._build_metric_path(metric)
|
||||
|
||||
@@ -169,3 +169,19 @@ class SwiftStorage(_carbonara.CarbonaraBasedStorage):
|
||||
raise storage.AggregationDoesNotExist(metric, aggregation)
|
||||
raise
|
||||
return contents
|
||||
|
||||
@retrying.retry(stop_max_attempt_number=4,
|
||||
wait_fixed=500,
|
||||
retry_on_result=retry_if_result_empty)
|
||||
def _get_unaggregated_timeserie(self, metric):
|
||||
try:
|
||||
headers, contents = self.swift.get_object(
|
||||
self._container_name(metric), "none")
|
||||
except swclient.ClientException as e:
|
||||
if e.http_status == 404:
|
||||
raise storage.MetricDoesNotExist(metric)
|
||||
raise
|
||||
return contents
|
||||
|
||||
def _store_unaggregated_timeserie(self, metric, data):
|
||||
self.swift.put_object(self._container_name(metric), "none", data)
|
||||
|
||||
Reference in New Issue
Block a user