diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py index e7047c60..3504f084 100644 --- a/taskflow/engines/action_engine/analyzer.py +++ b/taskflow/engines/action_engine/analyzer.py @@ -20,9 +20,12 @@ import weakref from taskflow.engines.action_engine import compiler as co from taskflow.engines.action_engine import deciders from taskflow.engines.action_engine import traversal +from taskflow import logging from taskflow import states as st from taskflow.utils import iter_utils +LOG = logging.getLogger(__name__) + class Analyzer(object): """Analyzes a compilation and aids in execution processes. @@ -114,7 +117,7 @@ class Analyzer(object): def _get_maybe_ready(self, atom, transition_to, allowed_intentions, connected_fetcher, ready_checker, - decider_fetcher): + decider_fetcher, for_what="?"): def iter_connected_states(): # Lazily iterate over connected states so that ready checkers # can stop early (vs having to consume and check all the @@ -146,9 +149,15 @@ class Analyzer(object): ok_to_transition = self._runtime.check_atom_transition(atom, state, transition_to) if not ok_to_transition: + LOG.trace("Atom '%s' is not ready to %s since it can not" + " transition to %s from its current state %s", + atom, for_what, transition_to, state) return (False, None) intention = self._storage.get_atom_intention(atom.name) if intention not in allowed_intentions: + LOG.trace("Atom '%s' is not ready to %s since its current" + " intention %s is not in allowed intentions %s", + atom, for_what, intention, allowed_intentions) return (False, None) ok_to_run = ready_checker(iter_connected_states()) if not ok_to_run: @@ -159,11 +168,16 @@ class Analyzer(object): def _get_maybe_ready_for_execute(self, atom): """Returns if an atom is *likely* ready to be executed.""" def ready_checker(pred_connected_it): - for _atom, (atom_state, atom_intention) in pred_connected_it: - if (atom_state in (st.SUCCESS, st.IGNORE) and - atom_intention in (st.EXECUTE, st.IGNORE)): + for pred in pred_connected_it: + pred_atom, (pred_atom_state, pred_atom_intention) = pred + if (pred_atom_state in (st.SUCCESS, st.IGNORE) and + pred_atom_intention in (st.EXECUTE, st.IGNORE)): continue + LOG.trace("Unable to begin to execute since predecessor" + " atom '%s' is in state %s with intention %s", + pred_atom, pred_atom_state, pred_atom_intention) return False + LOG.trace("Able to let '%s' execute", atom) return True decider_fetcher = lambda: \ deciders.IgnoreDecider( @@ -178,16 +192,22 @@ class Analyzer(object): # If this atoms current state is able to be transitioned to RUNNING # and its intention is to EXECUTE and all of its predecessors executed # successfully or were ignored then this atom is ready to execute. + LOG.trace("Checking if '%s' is ready to execute", atom) return self._get_maybe_ready(atom, st.RUNNING, [st.EXECUTE], connected_fetcher, ready_checker, - decider_fetcher) + decider_fetcher, for_what='execute') def _get_maybe_ready_for_revert(self, atom): """Returns if an atom is *likely* ready to be reverted.""" def ready_checker(succ_connected_it): - for _atom, (atom_state, _atom_intention) in succ_connected_it: - if atom_state not in (st.PENDING, st.REVERTED, st.IGNORE): + for succ in succ_connected_it: + succ_atom, (succ_atom_state, _succ_atom_intention) = succ + if succ_atom_state not in (st.PENDING, st.REVERTED, st.IGNORE): + LOG.trace("Unable to begin to revert since successor" + " atom '%s' is in state %s", succ_atom, + succ_atom_state) return False + LOG.trace("Able to let '%s' revert", atom) return True noop_decider = deciders.NoOpDecider() connected_fetcher = lambda: \ @@ -202,9 +222,10 @@ class Analyzer(object): # and its intention is either REVERT or RETRY and all of its # successors are either PENDING or REVERTED then this atom is ready # to revert. + LOG.trace("Checking if '%s' is ready to revert", atom) return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY], connected_fetcher, ready_checker, - decider_fetcher) + decider_fetcher, for_what='revert') def iterate_retries(self, state=None): """Iterates retry atoms that match the provided state. diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 02b63f76..2c964eec 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -140,6 +140,8 @@ class Completer(object): atom_state, _atom_intention = atom_states[atom.name] if atom_state in (st.RUNNING, st.REVERTING): unfinished_atoms.add(atom) + LOG.trace("Resuming atom '%s' since it was left in" + " state %s", atom, atom_state) return unfinished_atoms def complete(self, node, outcome, result):