diff --git a/taskflow/tests/unit/test_futures.py b/taskflow/tests/unit/test_futures.py
index 576b5eee..437bf28a 100644
--- a/taskflow/tests/unit/test_futures.py
+++ b/taskflow/tests/unit/test_futures.py
@@ -67,6 +67,7 @@ class _SimpleFuturesTestMixin(object):
         with self._make_executor(2) as e:
             f = e.submit(_blowup)
             self.assertRaises(IOError, f.result)
+            self.assertEqual(1, e.statistics.failures)
 
     def test_accumulator(self):
         created = []
@@ -75,6 +76,7 @@ class _SimpleFuturesTestMixin(object):
                 created.append(e.submit(_return_one))
         results = [f.result() for f in created]
         self.assertEqual(10, sum(results))
+        self.assertEqual(10, e.statistics.executed)
 
     def test_map(self):
         count = [i for i in range(0, 100)]
@@ -119,6 +121,7 @@ class _FuturesTestMixin(_SimpleFuturesTestMixin):
 
         self.assertEqual(1, called[0])
         self.assertEqual(1, called[1])
+        self.assertEqual(2, e.statistics.executed)
 
     def test_result_callback(self):
         called = collections.defaultdict(int)
@@ -143,19 +146,22 @@ class _FuturesTestMixin(_SimpleFuturesTestMixin):
         for i in range(0, create_am):
             fs.append(e.submit(functools.partial(_return_given, i)))
         self.assertEqual(create_am, len(fs))
+        self.assertEqual(create_am, e.statistics.executed)
         for i in range(0, create_am):
             result = fs[i].result()
             self.assertEqual(i, result)
 
     def test_called_restricted_size(self):
+        create_am = 100
         called = collections.defaultdict(int)
         with self._make_executor(1) as e:
-            for f in self._make_funcs(called, 100):
+            for f in self._make_funcs(called, create_am):
                 e.submit(f)
 
         self.assertFalse(e.alive)
-        self.assertEqual(100, len(called))
+        self.assertEqual(create_am, len(called))
+        self.assertEqual(create_am, e.statistics.executed)
 
 
 class ThreadPoolExecutorTest(test.TestCase, _FuturesTestMixin):
@@ -217,6 +223,7 @@ class GreenThreadPoolExecutorTest(test.TestCase, _FuturesTestMixin):
 
         self.assertEqual(0, len(called))
         self.assertEqual(2, len(fs))
+        self.assertEqual(2, e.statistics.cancelled)
         for f in fs:
             self.assertTrue(f.cancelled())
             self.assertTrue(f.done())
diff --git a/taskflow/types/futures.py b/taskflow/types/futures.py
index 194730e5..8e8e67af 100644
--- a/taskflow/types/futures.py
+++ b/taskflow/types/futures.py
@@ -14,9 +14,13 @@
 # License for the specific language governing permissions and limitations
 # under the License.
 
+import functools
+import threading
+
 from concurrent import futures as _futures
 from concurrent.futures import process as _process
 from concurrent.futures import thread as _thread
+from oslo.utils import reflection
 
 try:
     from eventlet.green import threading as greenthreading
@@ -27,6 +31,7 @@
 except ImportError:
     EVENTLET_AVAILABLE = False
 
+from taskflow.types import timing
 from taskflow.utils import threading_utils as tu
 
 
@@ -34,9 +39,59 @@
 Future = _futures.Future
 
 
+class _Gatherer(object):
+    def __init__(self, submit_func,
+                 lock_cls=threading.Lock, start_before_submit=False):
+        self._submit_func = submit_func
+        self._stats_lock = lock_cls()
+        self._stats = ExecutorStatistics()
+        self._start_before_submit = start_before_submit
+
+    @property
+    def statistics(self):
+        return self._stats
+
+    def _capture_stats(self, watch, fut):
+        watch.stop()
+        with self._stats_lock:
+            # Use a new collection and lock so that all mutations are seen as
+            # atomic and not overlapping and corrupting with other
+            # mutations (the clone ensures that others reading the current
+            # values will not see a mutated/corrupted one). Since futures may
+            # be completed by different threads we need to be extra careful to
+            # gather this data in a way that is thread-safe...
+            (failures, executed, runtime, cancelled) = (self._stats.failures,
+                                                        self._stats.executed,
+                                                        self._stats.runtime,
+                                                        self._stats.cancelled)
+            if fut.cancelled():
+                cancelled += 1
+            else:
+                executed += 1
+                if fut.exception() is not None:
+                    failures += 1
+                runtime += watch.elapsed()
+            self._stats = ExecutorStatistics(failures=failures,
+                                             executed=executed,
+                                             runtime=runtime,
+                                             cancelled=cancelled)
+
+    def submit(self, fn, *args, **kwargs):
+        watch = timing.StopWatch()
+        if self._start_before_submit:
+            watch.start()
+        fut = self._submit_func(fn, *args, **kwargs)
+        if not self._start_before_submit:
+            watch.start()
+        fut.add_done_callback(functools.partial(self._capture_stats, watch))
+        return fut
+
+
 class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
     """Executor that uses a thread pool to execute calls asynchronously.
 
+    It gathers statistics about the submissions executed for post-analysis...
+
     See: https://docs.python.org/dev/library/concurrent.futures.html
     """
 
     def __init__(self, max_workers=None):
@@ -45,15 +100,31 @@ class ThreadPoolExecutor(_thread.ThreadPoolExecutor):
         super(ThreadPoolExecutor, self).__init__(max_workers=max_workers)
         if self._max_workers <= 0:
             raise ValueError("Max workers must be greater than zero")
+        self._gatherer = _Gatherer(
+            # Since our submit will use this gatherer we have to reference
+            # the parent submit, bound to this instance (which is what we
+            # really want to use anyway).
+            super(ThreadPoolExecutor, self).submit)
+
+    @property
+    def statistics(self):
+        """:class:`.ExecutorStatistics` about the executors executions."""
+        return self._gatherer.statistics
 
     @property
     def alive(self):
         return not self._shutdown
 
+    def submit(self, fn, *args, **kwargs):
+        """Submit some work to be executed (and gather statistics)."""
+        return self._gatherer.submit(fn, *args, **kwargs)
+
 
 class ProcessPoolExecutor(_process.ProcessPoolExecutor):
     """Executor that uses a process pool to execute calls asynchronously.
 
+    It gathers statistics about the submissions executed for post-analysis...
+
     See: https://docs.python.org/dev/library/concurrent.futures.html
     """
 
     def __init__(self, max_workers=None):
@@ -62,11 +133,25 @@ class ProcessPoolExecutor(_process.ProcessPoolExecutor):
         super(ProcessPoolExecutor, self).__init__(max_workers=max_workers)
         if self._max_workers <= 0:
             raise ValueError("Max workers must be greater than zero")
+        self._gatherer = _Gatherer(
+            # Since our submit will use this gatherer we have to reference
+            # the parent submit, bound to this instance (which is what we
+            # really want to use anyway).
+            super(ProcessPoolExecutor, self).submit)
 
     @property
     def alive(self):
         return not self._shutdown_thread
 
+    @property
+    def statistics(self):
+        """:class:`.ExecutorStatistics` about the executors executions."""
+        return self._gatherer.statistics
+
+    def submit(self, fn, *args, **kwargs):
+        """Submit some work to be executed (and gather statistics)."""
+        return self._gatherer.submit(fn, *args, **kwargs)
+
 
 class _WorkItem(object):
     def __init__(self, future, fn, args, kwargs):
@@ -93,10 +178,14 @@ class SynchronousExecutor(_futures.Executor):
     will execute the calls inside the caller thread instead of executing it
     in a external process/thread for when this type of functionality is
     useful to provide...
+
+    It gathers statistics about the submissions executed for post-analysis...
     """
 
     def __init__(self):
         self._shutoff = False
+        self._gatherer = _Gatherer(self._submit,
+                                   start_before_submit=True)
 
     @property
     def alive(self):
@@ -105,10 +194,19 @@ class SynchronousExecutor(_futures.Executor):
     def shutdown(self, wait=True):
         self._shutoff = True
 
+    @property
+    def statistics(self):
+        """:class:`.ExecutorStatistics` about the executors executions."""
+        return self._gatherer.statistics
+
     def submit(self, fn, *args, **kwargs):
+        """Submit some work to be executed (and gather statistics)."""
         if self._shutoff:
             raise RuntimeError('Can not schedule new futures'
                                ' after being shutdown')
+        return self._gatherer.submit(fn, *args, **kwargs)
+
+    def _submit(self, fn, *args, **kwargs):
         f = Future()
         runner = _WorkItem(f, fn, args, kwargs)
         runner.run()
@@ -160,6 +258,8 @@ class GreenThreadPoolExecutor(_futures.Executor):
     See: https://docs.python.org/dev/library/concurrent.futures.html
     and http://eventlet.net/doc/modules/greenpool.html for information on
     how this works.
+
+    It gathers statistics about the submissions executed for post-analysis...
     """
 
     def __init__(self, max_workers=1000):
@@ -171,21 +271,32 @@ class GreenThreadPoolExecutor(_futures.Executor):
         self._delayed_work = greenqueue.Queue()
         self._shutdown_lock = greenthreading.Lock()
         self._shutdown = False
+        self._gatherer = _Gatherer(self._submit,
+                                   lock_cls=greenthreading.Lock)
 
     @property
     def alive(self):
         return not self._shutdown
 
+    @property
+    def statistics(self):
+        """:class:`.ExecutorStatistics` about the executors executions."""
+        return self._gatherer.statistics
+
     def submit(self, fn, *args, **kwargs):
+        """Submit some work to be executed (and gather statistics)."""
         with self._shutdown_lock:
             if self._shutdown:
                 raise RuntimeError('Can not schedule new futures'
                                    ' after being shutdown')
-            f = GreenFuture()
-            work = _WorkItem(f, fn, args, kwargs)
-            if not self._spin_up(work):
-                self._delayed_work.put(work)
-            return f
+            return self._gatherer.submit(fn, *args, **kwargs)
+
+    def _submit(self, fn, *args, **kwargs):
+        f = GreenFuture()
+        work = _WorkItem(f, fn, args, kwargs)
+        if not self._spin_up(work):
+            self._delayed_work.put(work)
+        return f
 
     def _spin_up(self, work):
         alive = self._pool.running() + self._pool.waiting()
@@ -204,3 +315,58 @@ class GreenThreadPoolExecutor(_futures.Executor):
         if wait and shutoff:
             self._pool.waitall()
             self._delayed_work.join()
+
+
+class ExecutorStatistics(object):
+    """Holds *immutable* information about a executors executions."""
+
+    __slots__ = ['_failures', '_executed', '_runtime', '_cancelled']
+
+    __repr_format = ("failures=%(failures)s, executed=%(executed)s, "
+                     "runtime=%(runtime)s, cancelled=%(cancelled)s")
+
+    def __init__(self, failures=0, executed=0, runtime=0.0, cancelled=0):
+        self._failures = failures
+        self._executed = executed
+        self._runtime = runtime
+        self._cancelled = cancelled
+
+    @property
+    def failures(self):
+        """How many submissions ended up raising exceptions."""
+        return self._failures
+
+    @property
+    def executed(self):
+        """How many submissions were executed (failed or not)."""
+        return self._executed
+
+    @property
+    def runtime(self):
+        """Total runtime of all submissions executed."""
+        return self._runtime
+
+    @property
+    def cancelled(self):
+        """How many submissions were cancelled before executing."""
+        return self._cancelled
+
+    @property
+    def average_runtime(self):
+        """The average runtime of all submissions executed.
+
+        :raises: ZeroDivisionError when no executions have occurred.
+        """
+        return self._runtime / self._executed
+
+    def __repr__(self):
+        r = reflection.get_class_name(self, fully_qualified=False)
+        r += "("
+        r += self.__repr_format % ({
+            'failures': self._failures,
+            'executed': self._executed,
+            'runtime': self._runtime,
+            'cancelled': self._cancelled,
+        })
+        r += ")"
+        return r