diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index 691d3b55..b649875f 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -215,19 +215,19 @@ class FutureGraphAction(object): next_nodes.add(node) return next_nodes - def reset_all(self): - self._retry_subflow(None) - - def _retry_subflow(self, retry): - if retry is not None: - self._storage.set_atom_intention(retry.name, st.EXECUTE) - nodes_iter = self._analyzer.iterate_subgraph(retry) - else: - nodes_iter = self._analyzer.iterate_all_nodes() - + def _reset_nodes(self, nodes_iter, intention=st.EXECUTE): for node in nodes_iter: if isinstance(node, task.BaseTask): self._task_action.change_state(node, st.PENDING, progress=0.0) - else: + elif isinstance(node, r.Retry): self._retry_action.change_state(node, st.PENDING) - self._storage.set_atom_intention(node.name, st.EXECUTE) + else: + raise TypeError("Unknown how to reset node %s" % node) + self._storage.set_atom_intention(node.name, intention) + + def reset_all(self): + self._reset_nodes(self._analyzer.iterate_all_nodes()) + + def _retry_subflow(self, retry): + self._storage.set_atom_intention(retry.name, st.EXECUTE) + self._reset_nodes(self._analyzer.iterate_subgraph(retry))