diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py index 6f9aa669..1c0f4545 100644 --- a/taskflow/engines/action_engine/analyzer.py +++ b/taskflow/engines/action_engine/analyzer.py @@ -135,7 +135,7 @@ class Analyzer(object): if atom is None: return iter_utils.unique_seen(self.browse_atoms_for_execute(), self.browse_atoms_for_revert()) - state = self.get_state(atom) + state = self._storage.get_atom_state(atom.name) intention = self._storage.get_atom_intention(atom.name) if state == st.SUCCESS: if intention == st.REVERT: @@ -195,7 +195,7 @@ class Analyzer(object): def _get_maybe_ready(self, atom, transition_to, allowed_intentions, connected_fetcher, connected_checker, decider_fetcher): - state = self.get_state(atom) + state = self._storage.get_atom_state(atom.name) ok_to_transition = self._runtime.check_atom_transition(atom, state, transition_to) if not ok_to_transition: @@ -261,8 +261,15 @@ class Analyzer(object): If no state is provided it will yield back all retry atoms. """ - for atom in self.iterate_nodes((co.RETRY,)): - if not state or self.get_state(atom) == state: + if state: + atoms = list(self.iterate_nodes((co.RETRY,))) + atom_states = self._storage.get_atoms_states(atom.name + for atom in atoms) + for atom in atoms: + if atom_states[atom.name][0] == state: + yield atom + else: + for atom in self.iterate_nodes((co.RETRY,)): yield atom def iterate_nodes(self, allowed_kinds): @@ -277,14 +284,13 @@ class Analyzer(object): def is_success(self): """Checks if all atoms in the execution graph are in 'happy' state.""" - for atom in self.iterate_nodes(co.ATOMS): - atom_state = self.get_state(atom) + atoms = list(self.iterate_nodes(co.ATOMS)) + atom_states = self._storage.get_atoms_states(atom.name + for atom in atoms) + for atom in atoms: + atom_state = atom_states[atom.name][0] if atom_state == st.IGNORE: continue if atom_state != st.SUCCESS: return False return True - - def get_state(self, atom): - """Gets the state of a given atom (from the backend storage unit).""" - return self._storage.get_atom_state(atom.name) diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index ee988c4a..893279d6 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -139,14 +139,21 @@ class Completer(object): atoms that were previously not finished (due to a RUNNING or REVERTING attempt not previously finishing). """ - for atom in self._analyzer.iterate_nodes(co.ATOMS): - if self._analyzer.get_state(atom) == st.FAILURE: + atoms = list(self._analyzer.iterate_nodes(co.ATOMS)) + atom_states = self._storage.get_atoms_states(atom.name + for atom in atoms) + for atom in atoms: + atom_state = atom_states[atom.name][0] + if atom_state == st.FAILURE: self._process_atom_failure(atom, self._storage.get(atom.name)) for retry in self._analyzer.iterate_retries(st.RETRYING): - self._runtime.retry_subflow(retry) + for atom, state, intention in self._runtime.retry_subflow(retry): + if state: + atom_states[atom.name] = (state, intention) unfinished_atoms = set() - for atom in self._analyzer.iterate_nodes(co.ATOMS): - if self._analyzer.get_state(atom) in (st.RUNNING, st.REVERTING): + for atom in atoms: + atom_state = atom_states[atom.name][0] + if atom_state in (st.RUNNING, st.REVERTING): unfinished_atoms.add(atom) return unfinished_atoms diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 719f7563..36cc2a64 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -249,5 +249,6 @@ class Runtime(object): subgraph (its successors) to the ``PENDING`` state with an ``EXECUTE`` intention. """ - self.storage.set_atom_intention(retry.name, st.EXECUTE) - self.reset_subgraph(retry) + tweaked = self.reset_atoms([retry], state=None, intention=st.EXECUTE) + tweaked.extend(self.reset_subgraph(retry)) + return tweaked