From e2732aff9a3ec4d3b9bceca1fd317f92b3bfa85d Mon Sep 17 00:00:00 2001 From: Mehdi Abaakouk Date: Thu, 15 Sep 2016 16:39:58 +0200 Subject: [PATCH] ceph: make computed measures read async With this change Gnocchi will fetch all measures associated with a metric in parallel using the Ceph aio API. Change-Id: Idcab0eb447962f0d3c049f301b543806b673c869 --- gnocchi/storage/ceph.py | 33 +++++++++++++++++-- .../ceph-read-async-ca2f7512c6842adb.yaml | 4 +++ 2 files changed, 34 insertions(+), 3 deletions(-) create mode 100644 releasenotes/notes/ceph-read-async-ca2f7512c6842adb.yaml diff --git a/gnocchi/storage/ceph.py b/gnocchi/storage/ceph.py index 1d9a7089..273786ac 100644 --- a/gnocchi/storage/ceph.py +++ b/gnocchi/storage/ceph.py @@ -17,6 +17,7 @@ from collections import defaultdict import contextlib import datetime import errno +import functools import itertools import uuid @@ -234,9 +235,35 @@ class CephStorage(_carbonara.CarbonaraBasedStorage): object_names = list(self._list_object_names_to_process(object_prefix)) measures = [] - for n in object_names: - data = self._get_object_content(n) - measures.extend(self._unserialize_measures(n, data)) + ops = [] + bufsize = 8192 # Same sa rados_read one + + tmp_measures = {} + + def add_to_measures(name, comp, data): + if name in tmp_measures: + tmp_measures[name] += data + else: + tmp_measures[name] = data + if len(data) < bufsize: + measures.extend(self._unserialize_measures(name, + tmp_measures[name])) + del tmp_measures[name] + else: + ops.append(self.ioctx.aio_read( + name, bufsize, len(tmp_measures[name]), + functools.partial(add_to_measures, name) + )) + + for name in object_names: + ops.append(self.ioctx.aio_read( + name, bufsize, 0, + functools.partial(add_to_measures, name) + )) + + while ops: + op = ops.pop() + op.wait_for_complete_and_cb() yield measures diff --git a/releasenotes/notes/ceph-read-async-ca2f7512c6842adb.yaml b/releasenotes/notes/ceph-read-async-ca2f7512c6842adb.yaml new file mode 100644 index 00000000..2dfe37de --- /dev/null +++ b/releasenotes/notes/ceph-read-async-ca2f7512c6842adb.yaml @@ -0,0 +1,4 @@ +--- +other: + - ceph driver now uses the rados async api to retrieve + measurements to process in parallel.