From b16da527a99d2f695d7223d982cfc5a4c5660895 Mon Sep 17 00:00:00 2001 From: Luka Peschke Date: Thu, 5 Sep 2019 10:02:53 +0200 Subject: [PATCH] Replace eventlet with futurist This replaces the "eventlet" library with "futurist". Futurist does not require monkey-patching of the standard library, is an openstack project and is simply an overlay to the python STL in python3, rather than a partial replacement. Change-Id: Ib4f3a59377144e1d37028dfc3df77180d79e0c46 --- cloudkitty/orchestrator.py | 48 +++++++++++-------- cloudkitty/tests/test_orchestrator.py | 1 + lower-constraints.txt | 2 +- ...entlet-with-futurist-60f1fe6474a5efcf.yaml | 10 ++++ requirements.txt | 2 +- 5 files changed, 41 insertions(+), 22 deletions(-) create mode 100644 releasenotes/notes/replace-eventlet-with-futurist-60f1fe6474a5efcf.yaml diff --git a/cloudkitty/orchestrator.py b/cloudkitty/orchestrator.py index f0abf2c3..ad7546f3 100644 --- a/cloudkitty/orchestrator.py +++ b/cloudkitty/orchestrator.py @@ -23,7 +23,8 @@ import sys import time import cotyledon -import eventlet +import futurist +from futurist import waiters from oslo_concurrency import lockutils from oslo_config import cfg from oslo_log import log as logging @@ -58,8 +59,14 @@ orchestrator_opts = [ default=multiprocessing.cpu_count(), min=1, help='Max nb of workers to run. Defaults to the nb of available CPUs'), - cfg.IntOpt('max_greenthreads', default=100, min=1, - help='Maximal number of greenthreads to use per worker.'), + cfg.IntOpt('max_threads', + # NOTE(peschk_l): This is the futurist default + default=multiprocessing.cpu_count() * 5, + min=1, + deprecated_name='max_greenthreads', + advanced=True, + help='Maximal number of threads to use per worker. Defaults to ' + '5 times the nb of available CPUs'), ] CONF.register_opts(orchestrator_opts, group='orchestrator') @@ -238,6 +245,8 @@ class Worker(BaseWorker): self._wait_time = CONF.collect.wait_periods * self._period self._tenant_id = tenant_id self._worker_id = worker_id + self._log_prefix = '[scope: {scope}, worker: {worker}] '.format( + scope=self._tenant_id, worker=self._worker_id) self._conf = ck_utils.load_conf(CONF.collect.metrics_conf) self._state = state.StateManager() self._check_state = functools.partial( @@ -267,34 +276,33 @@ class Worker(BaseWorker): return self._collect(metric, timestamp) except collector.NoDataCollected: LOG.info( - '[scope: {scope}, worker: {worker}] No data collected ' + self._log_prefix + 'No data collected ' 'for metric {metric} at timestamp {ts}'.format( - scope=self._tenant_id, - worker=self._worker_id, - metric=metric, - ts=timestamp) - ) + metric=metric, ts=timestamp)) return metric, None except Exception as e: LOG.warning( - '[scope: {scope}, worker: {worker}] Error while collecting' + self._log_prefix + 'Error while collecting' ' metric {metric} at timestamp {ts}: {e}. Exiting.'.format( - scope=self._tenant_id, - worker=self._worker_id, - metric=metric, - ts=timestamp, - e=e) - ) + metric=metric, ts=timestamp, e=e)) # FIXME(peschk_l): here we just exit, and the # collection will be retried during the next collect # cycle. In the future, we should implement a retrying # system in workers sys.exit(1) - return dict(filter( - lambda x: x[1] is not None, - eventlet.GreenPool(size=CONF.orchestrator.max_greenthreads).imap( - _get_result, metrics))) + with futurist.ThreadPoolExecutor( + max_workers=CONF.orchestrator.max_threads) as tpool: + futs = [tpool.submit(_get_result, metric) for metric in metrics] + LOG.debug(self._log_prefix + + 'Collecting {} metrics.'.format(len(metrics))) + results = [r.result() for r in waiters.wait_for_all(futs).done] + LOG.debug(self._log_prefix + 'Collecting {} metrics took {}s ' + 'total, with {}s average'.format( + tpool.statistics.executed, + tpool.statistics.runtime, + tpool.statistics.average_runtime)) + return dict(filter(lambda x: x[1] is not None, results)) def run(self): while True: diff --git a/cloudkitty/tests/test_orchestrator.py b/cloudkitty/tests/test_orchestrator.py index 25664169..9bccc030 100644 --- a/cloudkitty/tests/test_orchestrator.py +++ b/cloudkitty/tests/test_orchestrator.py @@ -177,6 +177,7 @@ class WorkerTest(tests.TestCase): def __init__(self): self._tenant_id = 'a' self._worker_id = '0' + self._log_prefix = '[IGNORE THIS MESSAGE]' self.worker = FakeWorker() self.worker._collect = mock.MagicMock() diff --git a/lower-constraints.txt b/lower-constraints.txt index 41e65abc..0827bff6 100644 --- a/lower-constraints.txt +++ b/lower-constraints.txt @@ -1,7 +1,6 @@ # requirements pbr==2.0.0 # Apache-2.0 alembic==0.8.0 # MIT -eventlet==0.18.2 # MIT keystonemiddleware==4.0.0 # Apache-2.0 gnocchiclient==2.5.0 # Apache-2.0 python-monascaclient==1.9.0 # Apache-2.0 @@ -31,6 +30,7 @@ influxdb==5.1.0 # MIT Flask==1.0.2 # BSD Flask-RESTful==0.3.5 # BSD cotyledon==1.5.0 # Apache-2.0 +futurist==1.6.0 # Apache-2.0 # test-requirements coverage==3.6 # Apache-2.0 diff --git a/releasenotes/notes/replace-eventlet-with-futurist-60f1fe6474a5efcf.yaml b/releasenotes/notes/replace-eventlet-with-futurist-60f1fe6474a5efcf.yaml new file mode 100644 index 00000000..eeceb994 --- /dev/null +++ b/releasenotes/notes/replace-eventlet-with-futurist-60f1fe6474a5efcf.yaml @@ -0,0 +1,10 @@ +--- +deprecations: + - | + Since ``eventlet`` has been replaced with ``futurist``, the + ``[orchestrator]/max_greenthreads`` option has been deprecated and + replaced with ``[orchestrator]/max_threads``. + +other: + - | + The ``eventlet`` library has been replaced with ``futurist``. diff --git a/requirements.txt b/requirements.txt index 7b10a7f5..24b324cd 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,7 +3,6 @@ # process, which may cause wedges in the gate later. pbr>=2.0.0,!=2.1.0 # Apache-2.0 alembic>=0.8.0 # MIT -eventlet>=0.18.2,!=0.18.3,!=0.20.1,!=0.21.0,!=0.23.0 # MIT keystonemiddleware>=4.0.0 # Apache-2.0 gnocchiclient>=2.5.0 # Apache-2.0 python-monascaclient>=1.9.0 # Apache-2.0 @@ -34,3 +33,4 @@ influxdb>=5.1.0;python_version>='3.0' # MIT Flask>=1.0.2 # BSD Flask-RESTful>=0.3.5 # BSD cotyledon>=1.5.0 # Apache-2.0 +futurist>=1.6.0 # Apache-2.0