From 7655ae02ce07f53b1611983d8988552bb4f71bd6 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Sun, 11 May 2014 09:12:55 -0700 Subject: [PATCH] Use futures wait() when possible Instead of always using our custom future wait functionality, only use that functionality if there are green futures and in other cases just use the future wait() function instead. Change-Id: I1eadcf53eb4b5f47b9543965610bfe04fec52e70 --- taskflow/engines/action_engine/executor.py | 2 +- .../engines/action_engine/graph_action.py | 8 +-- taskflow/tests/unit/test_utils_async_utils.py | 24 +++----- taskflow/utils/async_utils.py | 61 +++---------------- taskflow/utils/eventlet_utils.py | 51 +++++++++++++++- 5 files changed, 71 insertions(+), 75 deletions(-) diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 846cc568..816060f5 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -99,7 +99,7 @@ class SerialTaskExecutor(TaskExecutorBase): def wait_for_any(self, fs, timeout=None): # NOTE(imelnikov): this executor returns only done futures. - return fs, [] + return (fs, set()) class ParallelTaskExecutor(TaskExecutorBase): diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index 691d3b55..77930e93 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -58,10 +58,10 @@ class FutureGraphAction(object): def _schedule(self, nodes): """Schedule a group of nodes for execution.""" - futures = [] + futures = set() for node in nodes: try: - futures.append(self._schedule_node(node)) + futures.add(self._schedule_node(node)) except Exception: # Immediately stop scheduling future work so that we can # exit execution early (rather than later) if a single task @@ -83,7 +83,7 @@ class FutureGraphAction(object): if self.is_running(): not_done, failures = self._schedule(next_nodes) else: - not_done, failures = ([], []) + not_done, failures = (set(), []) # Run! # @@ -129,7 +129,7 @@ class FutureGraphAction(object): # Recheck incase someone suspended it. if self.is_running(): more_not_done, failures = self._schedule(next_nodes) - not_done.extend(more_not_done) + not_done.update(more_not_done) if failures: misc.Failure.reraise_if_any(failures) diff --git a/taskflow/tests/unit/test_utils_async_utils.py b/taskflow/tests/unit/test_utils_async_utils.py index 32944c22..8e9ab944 100644 --- a/taskflow/tests/unit/test_utils_async_utils.py +++ b/taskflow/tests/unit/test_utils_async_utils.py @@ -54,40 +54,34 @@ class WaitForAnyTestsMixin(object): self.assertIs(done.pop(), f2) -class WaiterTestsMixin(object): +@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') +class AsyncUtilsEventletTest(test.TestCase, + WaitForAnyTestsMixin): + executor_cls = eu.GreenExecutor + is_green = True def test_add_result(self): - waiter = au._Waiter(self.is_green) + waiter = eu._GreenWaiter() self.assertFalse(waiter.event.is_set()) waiter.add_result(futures.Future()) self.assertTrue(waiter.event.is_set()) def test_add_exception(self): - waiter = au._Waiter(self.is_green) + waiter = eu._GreenWaiter() self.assertFalse(waiter.event.is_set()) waiter.add_exception(futures.Future()) self.assertTrue(waiter.event.is_set()) def test_add_cancelled(self): - waiter = au._Waiter(self.is_green) + waiter = eu._GreenWaiter() self.assertFalse(waiter.event.is_set()) waiter.add_cancelled(futures.Future()) self.assertTrue(waiter.event.is_set()) -@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') -class AsyncUtilsEventletTest(test.TestCase, - WaitForAnyTestsMixin, - WaiterTestsMixin): - executor_cls = eu.GreenExecutor - is_green = True - - class AsyncUtilsThreadedTest(test.TestCase, - WaitForAnyTestsMixin, - WaiterTestsMixin): + WaitForAnyTestsMixin): executor_cls = futures.ThreadPoolExecutor - is_green = False class MakeCompletedFutureTest(test.TestCase): diff --git a/taskflow/utils/async_utils.py b/taskflow/utils/async_utils.py index 4805230e..0599870d 100644 --- a/taskflow/utils/async_utils.py +++ b/taskflow/utils/async_utils.py @@ -14,71 +14,24 @@ # License for the specific language governing permissions and limitations # under the License. -import threading - from concurrent import futures from taskflow.utils import eventlet_utils as eu -DONE_STATES = frozenset([ - futures._base.CANCELLED_AND_NOTIFIED, - futures._base.FINISHED, -]) - - -class _Waiter(object): - """Provides the event that wait_for_any() blocks on.""" - def __init__(self, is_green): - if is_green: - assert eu.EVENTLET_AVAILABLE, ('eventlet is needed to use this' - ' feature') - self.event = eu.green_threading.Event() - else: - self.event = threading.Event() - - def add_result(self, future): - self.event.set() - - def add_exception(self, future): - self.event.set() - - def add_cancelled(self, future): - self.event.set() - - -def _partition_futures(fs): - """Partitions the input futures into done and not done lists.""" - done = [] - not_done = [] - for f in fs: - if f._state in DONE_STATES: - done.append(f) - else: - not_done.append(f) - return (done, not_done) - def wait_for_any(fs, timeout=None): """Wait for one of the futures to complete. Works correctly with both green and non-green futures. + Returns pair (done, not_done). """ - with futures._base._AcquireFutures(fs): - (done, not_done) = _partition_futures(fs) - if done: - return (done, not_done) - is_green = any(isinstance(f, eu.GreenFuture) for f in fs) - waiter = _Waiter(is_green) - for f in fs: - f._waiters.append(waiter) - - waiter.event.wait(timeout) - for f in fs: - f._waiters.remove(waiter) - - with futures._base._AcquireFutures(fs): - return _partition_futures(fs) + any_green = any(isinstance(f, eu.GreenFuture) for f in fs) + if any_green: + return eu.wait_for_any(fs, timeout=timeout) + else: + return tuple(futures.wait(fs, timeout=timeout, + return_when=futures.FIRST_COMPLETED)) def make_completed_future(result): diff --git a/taskflow/utils/eventlet_utils.py b/taskflow/utils/eventlet_utils.py index 6cafa5b5..347fba31 100644 --- a/taskflow/utils/eventlet_utils.py +++ b/taskflow/utils/eventlet_utils.py @@ -37,6 +37,11 @@ LOG = logging.getLogger(__name__) # working and rest in peace. _TOMBSTONE = object() +_DONE_STATES = frozenset([ + futures._base.CANCELLED_AND_NOTIFIED, + futures._base.FINISHED, +]) + class _WorkItem(object): def __init__(self, future, fn, args, kwargs): @@ -82,6 +87,7 @@ class _Worker(object): class GreenFuture(futures.Future): def __init__(self): super(GreenFuture, self).__init__() + assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green future' # NOTE(harlowja): replace the built-in condition with a greenthread # compatible one so that when getting the result of this future the # functions will correctly yield to eventlet. If this is not done then @@ -95,7 +101,7 @@ class GreenExecutor(futures.Executor): """A greenthread backed executor.""" def __init__(self, max_workers=1000): - assert EVENTLET_AVAILABLE, 'eventlet is needed to use GreenExecutor' + assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green executor' assert int(max_workers) > 0, 'Max workers must be greater than zero' self._max_workers = int(max_workers) self._pool = greenpool.GreenPool(self._max_workers) @@ -128,3 +134,46 @@ class GreenExecutor(futures.Executor): self._work_queue.put(_TOMBSTONE) if wait: self._pool.waitall() + + +class _GreenWaiter(object): + """Provides the event that wait_for_any() blocks on.""" + def __init__(self): + self.event = green_threading.Event() + + def add_result(self, future): + self.event.set() + + def add_exception(self, future): + self.event.set() + + def add_cancelled(self, future): + self.event.set() + + +def _partition_futures(fs): + """Partitions the input futures into done and not done lists.""" + done = set() + not_done = set() + for f in fs: + if f._state in _DONE_STATES: + done.add(f) + else: + not_done.add(f) + return (done, not_done) + + +def wait_for_any(fs, timeout=None): + assert EVENTLET_AVAILABLE, ('eventlet is needed to wait on green futures') + with futures._base._AcquireFutures(fs): + (done, not_done) = _partition_futures(fs) + if done: + return (done, not_done) + waiter = _GreenWaiter() + for f in fs: + f._waiters.append(waiter) + waiter.event.wait(timeout) + for f in fs: + f._waiters.remove(waiter) + with futures._base._AcquireFutures(fs): + return _partition_futures(fs)