diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index 0a21a0b2..914b0d22 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -41,28 +41,31 @@ class FutureGraphAction(object): def is_running(self): return self._storage.get_flow_state() == st.RUNNING + def _schedule(self, nodes): + """Schedule nodes for execution. + + Returns list of futures. + """ + futures = [] + for node in nodes: + if isinstance(node, task.BaseTask): + future = self._schedule_task(node) + elif isinstance(node, r.Retry): + future = self._schedule_retry(node) + else: + raise TypeError("Unknown how to schedule node %s" % node) + if future is not None: + futures.append(future) + else: + next_nodes = self._analyzer.get_next_nodes(node) + futures.extend(self._schedule(next_nodes)) + return futures + def execute(self): - - def schedule(nodes, not_done): - for node in nodes: - # Returns schedule function for current atom and - # executes scheduling - if isinstance(node, task.BaseTask): - future = self._schedule_task(node) - elif isinstance(node, r.Retry): - future = self._schedule_retry(node) - else: - raise TypeError("Unknown how to schedule node %s" % node) - if future is not None: - not_done.append(future) - else: - schedule(self._analyzer.get_next_nodes(node), not_done) - - not_done = [] # Prepare flow to be resumed next_nodes = self._prepare_flow_for_resume() next_nodes.update(self._analyzer.get_next_nodes()) - schedule(next_nodes, not_done) + not_done = self._schedule(next_nodes) failures = [] while not_done: @@ -85,7 +88,7 @@ class FutureGraphAction(object): next_nodes.update(self._analyzer.get_next_nodes(node)) if next_nodes and not failures and self.is_running(): - schedule(next_nodes, not_done) + not_done.extend(self._schedule(next_nodes)) if failures: misc.Failure.reraise_if_any(failures)