From 07163cb627c9691a406ff4332e244abe228502e5 Mon Sep 17 00:00:00 2001
From: Luka Peschke
Date: Thu, 11 Apr 2019 17:13:44 +0200
Subject: [PATCH] Retrieve metrics in eventlet greenthreads

This updates the orchestrator to fetch metrics concurrently with an
eventlet GreenPool instead of one after another.

Change-Id: If19924175c9a5532dd272696a52daf5d8a21153b
Story: 2005423
Task: 30446
---
 cloudkitty/orchestrator.py                    | 102 +++++++++---------
 cloudkitty/tests/test_orchestrator.py         |  46 ++++++++
 ...metrics-concurrently-dffffe346bd4900e.yaml |   7 ++
 3 files changed, 107 insertions(+), 48 deletions(-)
 create mode 100644 releasenotes/notes/fetch-metrics-concurrently-dffffe346bd4900e.yaml

diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py
index f111dea7..b9bfb335 100644
--- a/cloudkitty/orchestrator.py
+++ b/cloudkitty/orchestrator.py
@@ -22,6 +22,7 @@ import sys
 import time
 
 import cotyledon
+import eventlet
 from oslo_concurrency import lockutils
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -55,6 +56,8 @@ orchestrator_opts = [
        default=multiprocessing.cpu_count(),
        min=1,
        help='Max nb of workers to run. Defaults to the nb of available CPUs'),
+    cfg.IntOpt('max_greenthreads', default=100, min=1,
+               help='Maximum number of greenthreads to use per worker.'),
 ]
 
 CONF.register_opts(orchestrator_opts, group='orchestrator')
@@ -182,11 +185,48 @@ class Worker(BaseWorker):
             next_timestamp,
             self._tenant_id,
         )
+        if not raw_data:
+            raise collector.NoDataCollected
 
-        if raw_data:
-            return [{'period': {'begin': start_timestamp,
-                                'end': next_timestamp},
-                     'usage': raw_data}]
+        return {'period': {'begin': start_timestamp,
+                           'end': next_timestamp},
+                'usage': raw_data}
+
+    def _do_collection(self, metrics, timestamp):
+
+        def _get_result(metric):
+            try:
+                return self._collect(metric, timestamp)
+            except collector.NoDataCollected:
+                LOG.info(
+                    '[scope: {scope}, worker: {worker}] No data collected '
+                    'for metric {metric} at timestamp {ts}'.format(
+                        scope=self._tenant_id,
+                        worker=self._worker_id,
+                        metric=metric,
+                        ts=ck_utils.ts2dt(timestamp))
+                )
+                return None
+            except Exception as e:
+                LOG.warning(
+                    '[scope: {scope}, worker: {worker}] Error while collecting'
+                    ' metric {metric} at timestamp {ts}: {e}. Exiting.'.format(
+                        scope=self._tenant_id,
+                        worker=self._worker_id,
+                        metric=metric,
+                        ts=ck_utils.ts2dt(timestamp),
+                        e=e)
+                )
+                # FIXME(peschk_l): here we just exit, and the
+                # collection will be retried during the next collect
+                # cycle. In the future, we should implement a retrying
+                # system in workers
+                sys.exit(1)
+
+        return list(filter(
+            lambda x: x is not None,
+            eventlet.GreenPool(size=CONF.orchestrator.max_greenthreads).imap(
+                _get_result, metrics)))
 
     def check_state(self):
         timestamp = self._state.get_state(self._tenant_id)
@@ -202,50 +242,15 @@ class Worker(BaseWorker):
 
             metrics = list(self._conf['metrics'].keys())
 
-            storage_data = []
-            for metric in metrics:
-                try:
-                    try:
-                        data = self._collect(metric, timestamp)
-                    except collector.NoDataCollected:
-                        raise
-                    except Exception as e:
-                        LOG.warning(
-                            '[scope: {scope}, worker: {worker}] Error while'
-                            'collecting metric {metric}: {error}. Retrying on '
-                            'next collection cycle.'.format(
-                                scope=self._tenant_id,
-                                worker=self._worker_id,
-                                metric=metric,
-                                error=e,
-                            ),
-                        )
-                        # FIXME(peschk_l): here we just exit, and the
-                        # collection will be retried during the next collect
-                        # cycle. In the future, we should implement a retrying
-                        # system in workers
-                        sys.exit(0)
-                except collector.NoDataCollected:
-                    LOG.info(
-                        '[scope: {scope}, worker: {worker}] No data collected '
-                        'for metric {metric} at timestamp {ts}'.format(
-                            scope=self._tenant_id,
-                            worker=self._worker_id,
-                            metric=metric,
-                            ts=ck_utils.ts2dt(timestamp))
-                    )
-                else:
-                    # Rating
-                    for processor in self._processors:
-                        processor.obj.process(data)
-                    # Writing
-                    if isinstance(data, list):
-                        storage_data += data
-                    else:
-                        storage_data.append(data)
+            # Collection
+            data = self._do_collection(metrics, timestamp)
 
-            # We're getting a full period so we directly commit
-            self._storage.push(storage_data, self._tenant_id)
+            # Rating
+            for processor in self._processors:
+                processor.obj.process(data)
+
+            # Writing
+            self._storage.push(data, self._tenant_id)
 
             self._state.set_state(self._tenant_id, timestamp)
 
@@ -340,4 +345,5 @@ class OrchestratorServiceManager(cotyledon.ServiceManager):
 
     def __init__(self):
         super(OrchestratorServiceManager, self).__init__()
-        self.service_id = self.add(Orchestrator, workers=4)
+        self.service_id = self.add(Orchestrator,
+                                   workers=CONF.orchestrator.max_workers)
diff --git a/cloudkitty/tests/test_orchestrator.py b/cloudkitty/tests/test_orchestrator.py
index 6dad7afd..62fb3d0d 100644
--- a/cloudkitty/tests/test_orchestrator.py
+++ b/cloudkitty/tests/test_orchestrator.py
@@ -19,6 +19,7 @@ import mock
 from oslo_messaging import conffixture
 from stevedore import extension
 
+from cloudkitty import collector
 from cloudkitty import orchestrator
 from cloudkitty import tests
 
@@ -88,3 +89,48 @@ class OrchestratorTest(tests.TestCase):
         self.assertEqual(2, worker._processors[1].obj.priority)
         self.assertEqual('fake2', worker._processors[2].name)
         self.assertEqual(1, worker._processors[2].obj.priority)
+
+
+class WorkerTest(tests.TestCase):
+
+    def setUp(self):
+        super(WorkerTest, self).setUp()
+
+        class FakeWorker(orchestrator.Worker):
+            def __init__(self):
+                self._tenant_id = 'a'
+                self._worker_id = '0'
+
+        self.worker = FakeWorker()
+        self.worker._collect = mock.MagicMock()
+
+    def test_do_collection_all_valid(self):
+        side_effect = [
+            {'period': {'begin': 0,
+                        'end': 3600},
+             'usage': [i]}
+            for i in range(5)
+        ]
+        self.worker._collect.side_effect = side_effect
+        metrics = ['metric{}'.format(i) for i in range(5)]
+        output = sorted(self.worker._do_collection(metrics, 0),
+                        key=lambda x: x['usage'][0])
+        self.assertEqual(side_effect, output)
+
+    def test_do_collection_some_empty(self):
+        side_effect = [
+            {'period': {'begin': 0,
+                        'end': 3600},
+             'usage': [i]}
+            for i in range(5)
+        ]
+        side_effect.insert(2, collector.NoDataCollected('a', 'b'))
+        side_effect.insert(4, collector.NoDataCollected('a', 'b'))
+        self.worker._collect.side_effect = side_effect
+        metrics = ['metric{}'.format(i) for i in range(7)]
+        output = sorted(self.worker._do_collection(metrics, 0),
+                        key=lambda x: x['usage'][0])
+        self.assertEqual([
+            i for i in side_effect
+            if not isinstance(i, collector.NoDataCollected)
+        ], output)
diff --git a/releasenotes/notes/fetch-metrics-concurrently-dffffe346bd4900e.yaml b/releasenotes/notes/fetch-metrics-concurrently-dffffe346bd4900e.yaml
new file mode 100644
index 00000000..bc323fa4
--- /dev/null
+++ b/releasenotes/notes/fetch-metrics-concurrently-dffffe346bd4900e.yaml
@@ -0,0 +1,7 @@
+---
+upgrade:
+  - |
+    Metrics are now fetched concurrently with ``eventlet`` instead of one
+    after another by the orchestrator, leading to a significant performance
+    improvement. The maximum number of greenthreads to use can be specified
+    through the ``max_greenthreads`` option of the ``orchestrator`` section.
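For readers unfamiliar with the pattern, below is a minimal, self-contained sketch of the ``GreenPool.imap`` usage that ``_do_collection()`` builds on: one greenthread per metric, results yielded in input order, and metrics without data filtered out. ``fake_collect`` and the metric names are illustrative stand-ins for ``Worker._collect()`` and the configured metrics, and ``max_greenthreads`` here mirrors the new ``[orchestrator]/max_greenthreads`` option (default 100); the sketch assumes only that ``eventlet`` is installed.

    import eventlet


    def fake_collect(metric):
        # Stand-in for Worker._collect(): pretend one metric yields no data.
        if metric == 'image.size':
            return None
        return {'period': {'begin': 0, 'end': 3600}, 'usage': {metric: []}}


    def do_collection(metrics, max_greenthreads=100):
        # One greenthread per metric, bounded by the pool size. imap() yields
        # results in the same order as the input metrics; entries without data
        # are dropped, as in _do_collection().
        pool = eventlet.GreenPool(size=max_greenthreads)
        return [res for res in pool.imap(fake_collect, metrics)
                if res is not None]


    if __name__ == '__main__':
        print(do_collection(['cpu', 'image.size', 'network.incoming.bytes']))

Note that greenthreads only actually overlap while the underlying collector I/O cooperates with eventlet (green or monkey-patched libraries); otherwise the calls still run one after another within the pool.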