diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index 7fe80a92..5a79a427 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -59,20 +59,18 @@ class FutureGraphAction(object): return st.SUSPENDED if was_suspended else st.REVERTED def _run(self, running, schedule_node, complete_node, get_next_nodes): - not_done = [] - def schedule(nodes): + def schedule(nodes, not_done): for node in nodes: future = schedule_node(node) if future is not None: not_done.append(future) else: - schedule(get_next_nodes(node)) - - schedule(get_next_nodes()) + schedule(get_next_nodes(node), not_done) failures = [] - + not_done = [] + schedule(get_next_nodes(), not_done) was_suspended = False while not_done: # NOTE(imelnikov): if timeout occurs before any of futures @@ -93,7 +91,7 @@ class FutureGraphAction(object): if next_nodes: if running() and not failures: - schedule(next_nodes) + schedule(next_nodes, not_done) else: # NOTE(imelnikov): engine stopped while there were # still some tasks to do, so we either failed