Merge "Remove dependency on oslo.utils (replace with small util code)"
This commit is contained in:
commit
807a6a83d3
@ -21,15 +21,17 @@ import threading
|
||||
from concurrent import futures as _futures
|
||||
from concurrent.futures import process as _process
|
||||
from concurrent.futures import thread as _thread
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import reflection
|
||||
from oslo_utils import timeutils
|
||||
import six
|
||||
|
||||
greenpatcher = importutils.try_import('eventlet.patcher')
|
||||
greenpool = importutils.try_import('eventlet.greenpool')
|
||||
greenqueue = importutils.try_import('eventlet.queue')
|
||||
greenthreading = importutils.try_import('eventlet.green.threading')
|
||||
try:
|
||||
from eventlet import greenpool
|
||||
from eventlet import patcher as greenpatcher
|
||||
from eventlet import queue as greenqueue
|
||||
|
||||
from eventlet.green import threading as greenthreading
|
||||
except ImportError:
|
||||
greenpatcher, greenpool, greenqueue, greenthreading = (None, None,
|
||||
None, None)
|
||||
|
||||
from futurist import _utils
|
||||
|
||||
@ -54,13 +56,16 @@ class _Gatherer(object):
|
||||
with self._stats_lock:
|
||||
self._stats = ExecutorStatistics()
|
||||
|
||||
def _capture_stats(self, watch, fut):
|
||||
def _capture_stats(self, started_at, fut):
|
||||
"""Capture statistics
|
||||
|
||||
:param watch: stopwatch object
|
||||
:param started_at: when the activity the future has performed
|
||||
was started at
|
||||
:param fut: future object
|
||||
"""
|
||||
watch.stop()
|
||||
# If time somehow goes backwards, make sure we cap it at 0.0 instead
|
||||
# of having negative elapsed time...
|
||||
elapsed = max(0.0, _utils.now() - started_at)
|
||||
with self._stats_lock:
|
||||
# Use a new collection and lock so that all mutations are seen as
|
||||
# atomic and not overlapping and corrupting with other
|
||||
@ -78,7 +83,7 @@ class _Gatherer(object):
|
||||
executed += 1
|
||||
if fut.exception() is not None:
|
||||
failures += 1
|
||||
runtime += watch.elapsed()
|
||||
runtime += elapsed
|
||||
self._stats = ExecutorStatistics(failures=failures,
|
||||
executed=executed,
|
||||
runtime=runtime,
|
||||
@ -86,13 +91,13 @@ class _Gatherer(object):
|
||||
|
||||
def submit(self, fn, *args, **kwargs):
|
||||
"""Submit work to be executed and capture statistics."""
|
||||
watch = timeutils.StopWatch()
|
||||
if self._start_before_submit:
|
||||
watch.start()
|
||||
started_at = _utils.now()
|
||||
fut = self._submit_func(fn, *args, **kwargs)
|
||||
if not self._start_before_submit:
|
||||
watch.start()
|
||||
fut.add_done_callback(functools.partial(self._capture_stats, watch))
|
||||
started_at = _utils.now()
|
||||
fut.add_done_callback(functools.partial(self._capture_stats,
|
||||
started_at))
|
||||
return fut
|
||||
|
||||
|
||||
@ -421,7 +426,7 @@ class ExecutorStatistics(object):
|
||||
return self._runtime / self._executed
|
||||
|
||||
def __repr__(self):
|
||||
r = reflection.get_class_name(self, fully_qualified=False)
|
||||
r = self.__class__.__name__
|
||||
r += "("
|
||||
r += self.__repr_format % ({
|
||||
'failures': self._failures,
|
||||
|
@ -16,12 +16,17 @@
|
||||
|
||||
import multiprocessing
|
||||
|
||||
from oslo_utils import importutils
|
||||
|
||||
_eventlet = importutils.try_import('eventlet')
|
||||
try:
|
||||
from time import monotonic as now # noqa
|
||||
except ImportError:
|
||||
from time import time as now # noqa
|
||||
|
||||
|
||||
EVENTLET_AVAILABLE = bool(_eventlet)
|
||||
try:
|
||||
import eventlet as _eventlet # noqa
|
||||
EVENTLET_AVAILABLE = True
|
||||
except ImportError:
|
||||
EVENTLET_AVAILABLE = False
|
||||
|
||||
|
||||
def get_optimal_thread_count(default=2):
|
||||
|
@ -3,7 +3,5 @@
|
||||
# process, which may cause wedges in the gate later.
|
||||
|
||||
pbr>=0.11,<2.0
|
||||
Babel>=1.3
|
||||
oslo.utils>=1.4.0 # Apache-2.0
|
||||
six>=1.9.0
|
||||
futures>=3.0
|
||||
|
Loading…
Reference in New Issue
Block a user