diff --git a/gnocchi/storage/ceph.py b/gnocchi/storage/ceph.py index 1d9a70892..273786ac1 100644 --- a/gnocchi/storage/ceph.py +++ b/gnocchi/storage/ceph.py @@ -17,6 +17,7 @@ from collections import defaultdict import contextlib import datetime import errno +import functools import itertools import uuid @@ -234,9 +235,35 @@ class CephStorage(_carbonara.CarbonaraBasedStorage): object_names = list(self._list_object_names_to_process(object_prefix)) measures = [] - for n in object_names: - data = self._get_object_content(n) - measures.extend(self._unserialize_measures(n, data)) + ops = [] + bufsize = 8192 # Same sa rados_read one + + tmp_measures = {} + + def add_to_measures(name, comp, data): + if name in tmp_measures: + tmp_measures[name] += data + else: + tmp_measures[name] = data + if len(data) < bufsize: + measures.extend(self._unserialize_measures(name, + tmp_measures[name])) + del tmp_measures[name] + else: + ops.append(self.ioctx.aio_read( + name, bufsize, len(tmp_measures[name]), + functools.partial(add_to_measures, name) + )) + + for name in object_names: + ops.append(self.ioctx.aio_read( + name, bufsize, 0, + functools.partial(add_to_measures, name) + )) + + while ops: + op = ops.pop() + op.wait_for_complete_and_cb() yield measures diff --git a/releasenotes/notes/ceph-read-async-ca2f7512c6842adb.yaml b/releasenotes/notes/ceph-read-async-ca2f7512c6842adb.yaml new file mode 100644 index 000000000..2dfe37dea --- /dev/null +++ b/releasenotes/notes/ceph-read-async-ca2f7512c6842adb.yaml @@ -0,0 +1,4 @@ +--- +other: + - ceph driver now uses the rados async api to retrieve + measurements to process in parallel.