ceph: make computed measures read async
With this change Gnocchi will fetch all measures associated with a metric in parallel using the Ceph aio API. Change-Id: Idcab0eb447962f0d3c049f301b543806b673c869
This commit is contained in:
parent
c5779e4dc2
commit
e2732aff9a
@ -17,6 +17,7 @@ from collections import defaultdict
|
||||
import contextlib
|
||||
import datetime
|
||||
import errno
|
||||
import functools
|
||||
import itertools
|
||||
import uuid
|
||||
|
||||
@ -234,9 +235,35 @@ class CephStorage(_carbonara.CarbonaraBasedStorage):
|
||||
object_names = list(self._list_object_names_to_process(object_prefix))
|
||||
|
||||
measures = []
|
||||
for n in object_names:
|
||||
data = self._get_object_content(n)
|
||||
measures.extend(self._unserialize_measures(n, data))
|
||||
ops = []
|
||||
bufsize = 8192 # Same sa rados_read one
|
||||
|
||||
tmp_measures = {}
|
||||
|
||||
def add_to_measures(name, comp, data):
|
||||
if name in tmp_measures:
|
||||
tmp_measures[name] += data
|
||||
else:
|
||||
tmp_measures[name] = data
|
||||
if len(data) < bufsize:
|
||||
measures.extend(self._unserialize_measures(name,
|
||||
tmp_measures[name]))
|
||||
del tmp_measures[name]
|
||||
else:
|
||||
ops.append(self.ioctx.aio_read(
|
||||
name, bufsize, len(tmp_measures[name]),
|
||||
functools.partial(add_to_measures, name)
|
||||
))
|
||||
|
||||
for name in object_names:
|
||||
ops.append(self.ioctx.aio_read(
|
||||
name, bufsize, 0,
|
||||
functools.partial(add_to_measures, name)
|
||||
))
|
||||
|
||||
while ops:
|
||||
op = ops.pop()
|
||||
op.wait_for_complete_and_cb()
|
||||
|
||||
yield measures
|
||||
|
||||
|
4
releasenotes/notes/ceph-read-async-ca2f7512c6842adb.yaml
Normal file
4
releasenotes/notes/ceph-read-async-ca2f7512c6842adb.yaml
Normal file
@ -0,0 +1,4 @@
|
||||
---
|
||||
other:
|
||||
- ceph driver now uses the rados async api to retrieve
|
||||
measurements to process in parallel.
|
Loading…
Reference in New Issue
Block a user