diff --git a/taskflow/engines/action_engine/executor.py b/taskflow/engines/action_engine/executor.py index 846cc568..816060f5 100644 --- a/taskflow/engines/action_engine/executor.py +++ b/taskflow/engines/action_engine/executor.py @@ -99,7 +99,7 @@ class SerialTaskExecutor(TaskExecutorBase): def wait_for_any(self, fs, timeout=None): # NOTE(imelnikov): this executor returns only done futures. - return fs, [] + return (fs, set()) class ParallelTaskExecutor(TaskExecutorBase): diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index 691d3b55..77930e93 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -58,10 +58,10 @@ class FutureGraphAction(object): def _schedule(self, nodes): """Schedule a group of nodes for execution.""" - futures = [] + futures = set() for node in nodes: try: - futures.append(self._schedule_node(node)) + futures.add(self._schedule_node(node)) except Exception: # Immediately stop scheduling future work so that we can # exit execution early (rather than later) if a single task @@ -83,7 +83,7 @@ class FutureGraphAction(object): if self.is_running(): not_done, failures = self._schedule(next_nodes) else: - not_done, failures = ([], []) + not_done, failures = (set(), []) # Run! # @@ -129,7 +129,7 @@ class FutureGraphAction(object): # Recheck incase someone suspended it. if self.is_running(): more_not_done, failures = self._schedule(next_nodes) - not_done.extend(more_not_done) + not_done.update(more_not_done) if failures: misc.Failure.reraise_if_any(failures) diff --git a/taskflow/tests/unit/test_utils_async_utils.py b/taskflow/tests/unit/test_utils_async_utils.py index 32944c22..8e9ab944 100644 --- a/taskflow/tests/unit/test_utils_async_utils.py +++ b/taskflow/tests/unit/test_utils_async_utils.py @@ -54,40 +54,34 @@ class WaitForAnyTestsMixin(object): self.assertIs(done.pop(), f2) -class WaiterTestsMixin(object): +@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') +class AsyncUtilsEventletTest(test.TestCase, + WaitForAnyTestsMixin): + executor_cls = eu.GreenExecutor + is_green = True def test_add_result(self): - waiter = au._Waiter(self.is_green) + waiter = eu._GreenWaiter() self.assertFalse(waiter.event.is_set()) waiter.add_result(futures.Future()) self.assertTrue(waiter.event.is_set()) def test_add_exception(self): - waiter = au._Waiter(self.is_green) + waiter = eu._GreenWaiter() self.assertFalse(waiter.event.is_set()) waiter.add_exception(futures.Future()) self.assertTrue(waiter.event.is_set()) def test_add_cancelled(self): - waiter = au._Waiter(self.is_green) + waiter = eu._GreenWaiter() self.assertFalse(waiter.event.is_set()) waiter.add_cancelled(futures.Future()) self.assertTrue(waiter.event.is_set()) -@testtools.skipIf(not eu.EVENTLET_AVAILABLE, 'eventlet is not available') -class AsyncUtilsEventletTest(test.TestCase, - WaitForAnyTestsMixin, - WaiterTestsMixin): - executor_cls = eu.GreenExecutor - is_green = True - - class AsyncUtilsThreadedTest(test.TestCase, - WaitForAnyTestsMixin, - WaiterTestsMixin): + WaitForAnyTestsMixin): executor_cls = futures.ThreadPoolExecutor - is_green = False class MakeCompletedFutureTest(test.TestCase): diff --git a/taskflow/utils/async_utils.py b/taskflow/utils/async_utils.py index 4805230e..0599870d 100644 --- a/taskflow/utils/async_utils.py +++ b/taskflow/utils/async_utils.py @@ -14,71 +14,24 @@ # License for the specific language governing permissions and limitations # under the License. -import threading - from concurrent import futures from taskflow.utils import eventlet_utils as eu -DONE_STATES = frozenset([ - futures._base.CANCELLED_AND_NOTIFIED, - futures._base.FINISHED, -]) - - -class _Waiter(object): - """Provides the event that wait_for_any() blocks on.""" - def __init__(self, is_green): - if is_green: - assert eu.EVENTLET_AVAILABLE, ('eventlet is needed to use this' - ' feature') - self.event = eu.green_threading.Event() - else: - self.event = threading.Event() - - def add_result(self, future): - self.event.set() - - def add_exception(self, future): - self.event.set() - - def add_cancelled(self, future): - self.event.set() - - -def _partition_futures(fs): - """Partitions the input futures into done and not done lists.""" - done = [] - not_done = [] - for f in fs: - if f._state in DONE_STATES: - done.append(f) - else: - not_done.append(f) - return (done, not_done) - def wait_for_any(fs, timeout=None): """Wait for one of the futures to complete. Works correctly with both green and non-green futures. + Returns pair (done, not_done). """ - with futures._base._AcquireFutures(fs): - (done, not_done) = _partition_futures(fs) - if done: - return (done, not_done) - is_green = any(isinstance(f, eu.GreenFuture) for f in fs) - waiter = _Waiter(is_green) - for f in fs: - f._waiters.append(waiter) - - waiter.event.wait(timeout) - for f in fs: - f._waiters.remove(waiter) - - with futures._base._AcquireFutures(fs): - return _partition_futures(fs) + any_green = any(isinstance(f, eu.GreenFuture) for f in fs) + if any_green: + return eu.wait_for_any(fs, timeout=timeout) + else: + return tuple(futures.wait(fs, timeout=timeout, + return_when=futures.FIRST_COMPLETED)) def make_completed_future(result): diff --git a/taskflow/utils/eventlet_utils.py b/taskflow/utils/eventlet_utils.py index 6cafa5b5..347fba31 100644 --- a/taskflow/utils/eventlet_utils.py +++ b/taskflow/utils/eventlet_utils.py @@ -37,6 +37,11 @@ LOG = logging.getLogger(__name__) # working and rest in peace. _TOMBSTONE = object() +_DONE_STATES = frozenset([ + futures._base.CANCELLED_AND_NOTIFIED, + futures._base.FINISHED, +]) + class _WorkItem(object): def __init__(self, future, fn, args, kwargs): @@ -82,6 +87,7 @@ class _Worker(object): class GreenFuture(futures.Future): def __init__(self): super(GreenFuture, self).__init__() + assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green future' # NOTE(harlowja): replace the built-in condition with a greenthread # compatible one so that when getting the result of this future the # functions will correctly yield to eventlet. If this is not done then @@ -95,7 +101,7 @@ class GreenExecutor(futures.Executor): """A greenthread backed executor.""" def __init__(self, max_workers=1000): - assert EVENTLET_AVAILABLE, 'eventlet is needed to use GreenExecutor' + assert EVENTLET_AVAILABLE, 'eventlet is needed to use a green executor' assert int(max_workers) > 0, 'Max workers must be greater than zero' self._max_workers = int(max_workers) self._pool = greenpool.GreenPool(self._max_workers) @@ -128,3 +134,46 @@ class GreenExecutor(futures.Executor): self._work_queue.put(_TOMBSTONE) if wait: self._pool.waitall() + + +class _GreenWaiter(object): + """Provides the event that wait_for_any() blocks on.""" + def __init__(self): + self.event = green_threading.Event() + + def add_result(self, future): + self.event.set() + + def add_exception(self, future): + self.event.set() + + def add_cancelled(self, future): + self.event.set() + + +def _partition_futures(fs): + """Partitions the input futures into done and not done lists.""" + done = set() + not_done = set() + for f in fs: + if f._state in _DONE_STATES: + done.add(f) + else: + not_done.add(f) + return (done, not_done) + + +def wait_for_any(fs, timeout=None): + assert EVENTLET_AVAILABLE, ('eventlet is needed to wait on green futures') + with futures._base._AcquireFutures(fs): + (done, not_done) = _partition_futures(fs) + if done: + return (done, not_done) + waiter = _GreenWaiter() + for f in fs: + f._waiters.append(waiter) + waiter.event.wait(timeout) + for f in fs: + f._waiters.remove(waiter) + with futures._base._AcquireFutures(fs): + return _partition_futures(fs)