diff --git a/taskflow/tests/unit/test_green_executor.py b/taskflow/tests/unit/test_green_executor.py index ab6fb25e..0287e8c5 100644 --- a/taskflow/tests/unit/test_green_executor.py +++ b/taskflow/tests/unit/test_green_executor.py @@ -18,9 +18,10 @@ import collections import functools - import testtools +from concurrent import futures + from taskflow import test from taskflow.utils import eventlet_utils as eu @@ -80,32 +81,64 @@ class GreenExecutorTest(test.TestCase): create_am = 50 with eu.GreenExecutor(2) as e: - futures = [] + fs = [] for i in range(0, create_am): - futures.append(e.submit(functools.partial(return_given, i))) + fs.append(e.submit(functools.partial(return_given, i))) - self.assertEqual(create_am, len(futures)) + self.assertEqual(create_am, len(fs)) for i in range(0, create_am): - result = futures[i].result() + result = fs[i].result() self.assertEqual(i, result) def test_func_cancellation(self): called = collections.defaultdict(int) - futures = [] + fs = [] with eu.GreenExecutor(2) as e: for func in self.make_funcs(called, 2): - futures.append(e.submit(func)) + fs.append(e.submit(func)) # Greenthreads don't start executing until we wait for them # to, since nothing here does IO, this will work out correctly. # # If something here did a blocking call, then eventlet could swap # one of the executors threads in, but nothing in this test does. - for f in futures: + for f in fs: self.assertFalse(f.running()) f.cancel() self.assertEqual(0, len(called)) - for f in futures: + for f in fs: self.assertTrue(f.cancelled()) self.assertTrue(f.done()) + + +class WaitForAnyTestCase(test.TestCase): + + @testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') + def test_green_waits_and_finishes(self): + def foo(): + pass + + e = eu.GreenExecutor() + + f1 = e.submit(foo) + f2 = e.submit(foo) + # this test assumes that our foo will end within 10 seconds + done, not_done = eu.wait_for_any([f1, f2], 10) + self.assertIn(len(done), (1, 2)) + self.assertTrue(any((f1 in done, f2 in done))) + + def test_threaded_waits_and_finishes(self): + def foo(): + pass + + e = futures.ThreadPoolExecutor(2) + try: + f1 = e.submit(foo) + f2 = e.submit(foo) + # this test assumes that our foo will end within 10 seconds + done, not_done = eu.wait_for_any([f1, f2], 10) + self.assertIn(len(done), (1, 2)) + self.assertTrue(any((f1 in done, f2 in done))) + finally: + e.shutdown() diff --git a/taskflow/utils/eventlet_utils.py b/taskflow/utils/eventlet_utils.py index 3d81d105..a633122d 100644 --- a/taskflow/utils/eventlet_utils.py +++ b/taskflow/utils/eventlet_utils.py @@ -19,6 +19,8 @@ import logging import threading +from concurrent import futures + try: from eventlet.green import threading as gthreading from eventlet import greenpool @@ -28,7 +30,6 @@ try: except ImportError: EVENTLET_AVAILABLE = False -from concurrent import futures from taskflow.utils import lock_utils @@ -129,3 +130,56 @@ class GreenExecutor(futures.Executor): self._work_queue.put(_TOMBSTONE) if wait: self._pool.waitall() + + +class _FirstCompletedWaiter(object): + """Provides the event that wait_for_any() block on.""" + def __init__(self, is_green): + if is_green: + assert EVENTLET_AVAILABLE, 'eventlet is needed to use this feature' + self.event = gthreading.Event() + else: + self.event = threading.Event() + self.finished_futures = [] + + def add_result(self, future): + self.finished_futures.append(future) + self.event.set() + + def add_exception(self, future): + self.finished_futures.append(future) + self.event.set() + + def add_cancelled(self, future): + self.finished_futures.append(future) + self.event.set() + + +def _done_futures(fs): + return set(f for f in fs + if f._state in [futures._base.CANCELLED_AND_NOTIFIED, + futures._base.FINISHED]) + + +def wait_for_any(fs, timeout=None): + """Wait for one of the futures to complete. + + Works correctly with both green and non-green futures. + Returns pair (done, not_done). + """ + with futures._base._AcquireFutures(fs): + done = _done_futures(fs) + if done: + return done, set(fs) - done + is_green = any(isinstance(f, _GreenFuture) for f in fs) + waiter = _FirstCompletedWaiter(is_green) + for f in fs: + f._waiters.append(waiter) + + waiter.event.wait(timeout) + for f in fs: + f._waiters.remove(waiter) + + with futures._base._AcquireFutures(fs): + done = _done_futures(fs) + return done, set(fs) - done