diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py index 78d4c29f..77f7df37 100644 --- a/taskflow/engines/action_engine/analyzer.py +++ b/taskflow/engines/action_engine/analyzer.py @@ -166,54 +166,53 @@ class Analyzer(object): ready_nodes.append((node, late_decider)) return ready_nodes + def _get_maybe_ready(self, atom, transition_to, allowed_intentions, + connected_fetcher, connected_checker, + decider_fetcher): + state = self.get_state(atom) + ok_to_transition = self._runtime.check_atom_transition(atom, state, + transition_to) + if not ok_to_transition: + return (False, None) + intention = self._storage.get_atom_intention(atom.name) + if intention not in allowed_intentions: + return (False, None) + connected_states = self._storage.get_atoms_states( + connected_atom.name for connected_atom in connected_fetcher(atom)) + ok_to_run = connected_checker(six.itervalues(connected_states)) + if not ok_to_run: + return (False, None) + else: + return (True, decider_fetcher(atom)) + def _get_maybe_ready_for_execute(self, atom): """Returns if an atom is *likely* ready to be executed.""" - state = self.get_state(atom) - intention = self._storage.get_atom_intention(atom.name) - transition = self._runtime.check_atom_transition(atom, state, - st.RUNNING) - if not transition or intention != st.EXECUTE: - return (False, None) - - predecessor_names = [] - for previous_atom in self._execution_graph.predecessors(atom): - predecessor_names.append(previous_atom.name) - - predecessor_states = self._storage.get_atoms_states(predecessor_names) - predecessor_states_iter = six.itervalues(predecessor_states) - ok_to_run = all(state == st.SUCCESS and intention == st.EXECUTE - for state, intention in predecessor_states_iter) - - if not ok_to_run: - return (False, None) - else: + def decider_fetcher(atom): edge_deciders = self._runtime.fetch_edge_deciders(atom) - return (True, IgnoreDecider(atom, edge_deciders)) + if edge_deciders: + return IgnoreDecider(atom, edge_deciders) + else: + return NoOpDecider() + + connected_checker = lambda connected_iter: \ + all(state == st.SUCCESS and intention == st.EXECUTE + for state, intention in connected_iter) + connected_fetcher = self._execution_graph.predecessors_iter + return self._get_maybe_ready(atom, st.RUNNING, [st.EXECUTE], + connected_fetcher, connected_checker, + decider_fetcher) def _get_maybe_ready_for_revert(self, atom): """Returns if an atom is *likely* ready to be reverted.""" - - state = self.get_state(atom) - intention = self._storage.get_atom_intention(atom.name) - transition = self._runtime.check_atom_transition(atom, state, - st.REVERTING) - if not transition or intention not in (st.REVERT, st.RETRY): - return (False, None) - - predecessor_names = [] - for previous_atom in self._execution_graph.successors(atom): - predecessor_names.append(previous_atom.name) - - predecessor_states = self._storage.get_atoms_states(predecessor_names) - predecessor_states_iter = six.itervalues(predecessor_states) - ok_to_run = all(state in (st.PENDING, st.REVERTED) - for state, intention in predecessor_states_iter) - - if not ok_to_run: - return (False, None) - else: - return (True, NoOpDecider()) + connected_checker = lambda connected_iter: \ + all(state in (st.PENDING, st.REVERTED) + for state, _intention in connected_iter) + decider_fetcher = lambda atom: NoOpDecider() + connected_fetcher = self._execution_graph.successors_iter + return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY], + connected_fetcher, connected_checker, + decider_fetcher) def iterate_subgraph(self, atom): """Iterates a subgraph connected to given atom.""" diff --git a/taskflow/storage.py b/taskflow/storage.py index cab68f6d..f6769369 100644 --- a/taskflow/storage.py +++ b/taskflow/storage.py @@ -385,10 +385,12 @@ class Storage(object): @fasteners.read_locked def get_atoms_states(self, atom_names): - """Gets all atoms states given a set of names.""" - return dict((name, (self.get_atom_state(name), - self.get_atom_intention(name))) - for name in atom_names) + """Gets a dict of atom name => (state, intention) given atom names.""" + details = {} + for name in set(atom_names): + source, _clone = self._atomdetail_by_name(name) + details[name] = (source.state, source.intention) + return details @fasteners.write_locked def _update_atom_metadata(self, atom_name, update_with,