Replace eventlet with futurist
This replaces the "eventlet" library with "futurist". Futurist does not require monkey-patching of the standard library, is an openstack project and is simply an overlay to the python STL in python3, rather than a partial replacement. Change-Id: Ib4f3a59377144e1d37028dfc3df77180d79e0c46
This commit is contained in:
parent
3a47e50f09
commit
b16da527a9
@ -23,7 +23,8 @@ import sys
|
||||
import time
|
||||
|
||||
import cotyledon
|
||||
import eventlet
|
||||
import futurist
|
||||
from futurist import waiters
|
||||
from oslo_concurrency import lockutils
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
@ -58,8 +59,14 @@ orchestrator_opts = [
|
||||
default=multiprocessing.cpu_count(),
|
||||
min=1,
|
||||
help='Max nb of workers to run. Defaults to the nb of available CPUs'),
|
||||
cfg.IntOpt('max_greenthreads', default=100, min=1,
|
||||
help='Maximal number of greenthreads to use per worker.'),
|
||||
cfg.IntOpt('max_threads',
|
||||
# NOTE(peschk_l): This is the futurist default
|
||||
default=multiprocessing.cpu_count() * 5,
|
||||
min=1,
|
||||
deprecated_name='max_greenthreads',
|
||||
advanced=True,
|
||||
help='Maximal number of threads to use per worker. Defaults to '
|
||||
'5 times the nb of available CPUs'),
|
||||
]
|
||||
|
||||
CONF.register_opts(orchestrator_opts, group='orchestrator')
|
||||
@ -238,6 +245,8 @@ class Worker(BaseWorker):
|
||||
self._wait_time = CONF.collect.wait_periods * self._period
|
||||
self._tenant_id = tenant_id
|
||||
self._worker_id = worker_id
|
||||
self._log_prefix = '[scope: {scope}, worker: {worker}] '.format(
|
||||
scope=self._tenant_id, worker=self._worker_id)
|
||||
self._conf = ck_utils.load_conf(CONF.collect.metrics_conf)
|
||||
self._state = state.StateManager()
|
||||
self._check_state = functools.partial(
|
||||
@ -267,34 +276,33 @@ class Worker(BaseWorker):
|
||||
return self._collect(metric, timestamp)
|
||||
except collector.NoDataCollected:
|
||||
LOG.info(
|
||||
'[scope: {scope}, worker: {worker}] No data collected '
|
||||
self._log_prefix + 'No data collected '
|
||||
'for metric {metric} at timestamp {ts}'.format(
|
||||
scope=self._tenant_id,
|
||||
worker=self._worker_id,
|
||||
metric=metric,
|
||||
ts=timestamp)
|
||||
)
|
||||
metric=metric, ts=timestamp))
|
||||
return metric, None
|
||||
except Exception as e:
|
||||
LOG.warning(
|
||||
'[scope: {scope}, worker: {worker}] Error while collecting'
|
||||
self._log_prefix + 'Error while collecting'
|
||||
' metric {metric} at timestamp {ts}: {e}. Exiting.'.format(
|
||||
scope=self._tenant_id,
|
||||
worker=self._worker_id,
|
||||
metric=metric,
|
||||
ts=timestamp,
|
||||
e=e)
|
||||
)
|
||||
metric=metric, ts=timestamp, e=e))
|
||||
# FIXME(peschk_l): here we just exit, and the
|
||||
# collection will be retried during the next collect
|
||||
# cycle. In the future, we should implement a retrying
|
||||
# system in workers
|
||||
sys.exit(1)
|
||||
|
||||
return dict(filter(
|
||||
lambda x: x[1] is not None,
|
||||
eventlet.GreenPool(size=CONF.orchestrator.max_greenthreads).imap(
|
||||
_get_result, metrics)))
|
||||
with futurist.ThreadPoolExecutor(
|
||||
max_workers=CONF.orchestrator.max_threads) as tpool:
|
||||
futs = [tpool.submit(_get_result, metric) for metric in metrics]
|
||||
LOG.debug(self._log_prefix +
|
||||
'Collecting {} metrics.'.format(len(metrics)))
|
||||
results = [r.result() for r in waiters.wait_for_all(futs).done]
|
||||
LOG.debug(self._log_prefix + 'Collecting {} metrics took {}s '
|
||||
'total, with {}s average'.format(
|
||||
tpool.statistics.executed,
|
||||
tpool.statistics.runtime,
|
||||
tpool.statistics.average_runtime))
|
||||
return dict(filter(lambda x: x[1] is not None, results))
|
||||
|
||||
def run(self):
|
||||
while True:
|
||||
|
@ -177,6 +177,7 @@ class WorkerTest(tests.TestCase):
|
||||
def __init__(self):
|
||||
self._tenant_id = 'a'
|
||||
self._worker_id = '0'
|
||||
self._log_prefix = '[IGNORE THIS MESSAGE]'
|
||||
|
||||
self.worker = FakeWorker()
|
||||
self.worker._collect = mock.MagicMock()
|
||||
|
@ -1,7 +1,6 @@
|
||||
# requirements
|
||||
pbr==2.0.0 # Apache-2.0
|
||||
alembic==0.8.0 # MIT
|
||||
eventlet==0.18.2 # MIT
|
||||
keystonemiddleware==4.0.0 # Apache-2.0
|
||||
gnocchiclient==2.5.0 # Apache-2.0
|
||||
python-monascaclient==1.9.0 # Apache-2.0
|
||||
@ -31,6 +30,7 @@ influxdb==5.1.0 # MIT
|
||||
Flask==1.0.2 # BSD
|
||||
Flask-RESTful==0.3.5 # BSD
|
||||
cotyledon==1.5.0 # Apache-2.0
|
||||
futurist==1.6.0 # Apache-2.0
|
||||
|
||||
# test-requirements
|
||||
coverage==3.6 # Apache-2.0
|
||||
|
@ -0,0 +1,10 @@
|
||||
---
|
||||
deprecations:
|
||||
- |
|
||||
Since ``eventlet`` has been replaced with ``futurist``, the
|
||||
``[orchestrator]/max_greenthreads`` option has been deprecated and
|
||||
replaced with ``[orchestrator]/max_threads``.
|
||||
|
||||
other:
|
||||
- |
|
||||
The ``eventlet`` library has been replaced with ``futurist``.
|
@ -3,7 +3,6 @@
|
||||
# process, which may cause wedges in the gate later.
|
||||
pbr>=2.0.0,!=2.1.0 # Apache-2.0
|
||||
alembic>=0.8.0 # MIT
|
||||
eventlet>=0.18.2,!=0.18.3,!=0.20.1,!=0.21.0,!=0.23.0 # MIT
|
||||
keystonemiddleware>=4.0.0 # Apache-2.0
|
||||
gnocchiclient>=2.5.0 # Apache-2.0
|
||||
python-monascaclient>=1.9.0 # Apache-2.0
|
||||
@ -34,3 +33,4 @@ influxdb>=5.1.0;python_version>='3.0' # MIT
|
||||
Flask>=1.0.2 # BSD
|
||||
Flask-RESTful>=0.3.5 # BSD
|
||||
cotyledon>=1.5.0 # Apache-2.0
|
||||
futurist>=1.6.0 # Apache-2.0
|
||||
|
Loading…
Reference in New Issue
Block a user