diff --git a/doc/source/engines.rst b/doc/source/engines.rst index bfabe0687..aef0c0dae 100644 --- a/doc/source/engines.rst +++ b/doc/source/engines.rst @@ -284,7 +284,7 @@ analyzing the current state of the task; which is determined by looking at the state in the task detail object for that task and analyzing edges of the graph for things like retry atom which can influence what a tasks intention should be (this is aided by the usage of the -:py:class:`~taskflow.engines.action_engine.analyzer.Analyzer` helper +:py:class:`~taskflow.engines.action_engine.selector.Selector` helper object which was designed to provide helper methods for this analysis). Once these intentions are determined and associated with each task (the intention is also stored in the :py:class:`~taskflow.persistence.models.AtomDetail` object) @@ -299,7 +299,7 @@ This stage selects which atoms are eligible to run by using a :py:class:`~taskflow.engines.action_engine.scheduler.Scheduler` implementation (the default implementation looks at their intention, checking if predecessor atoms have ran and so-on, using a -:py:class:`~taskflow.engines.action_engine.analyzer.Analyzer` helper +:py:class:`~taskflow.engines.action_engine.selector.Selector` helper object as needed) and submits those atoms to a previously provided compatible `executor`_ for asynchronous execution. This :py:class:`~taskflow.engines.action_engine.scheduler.Scheduler` will return a @@ -444,7 +444,6 @@ Components other locations **without** notice (and without the typical deprecation cycle). -.. automodule:: taskflow.engines.action_engine.analyzer .. automodule:: taskflow.engines.action_engine.builder .. automodule:: taskflow.engines.action_engine.compiler .. automodule:: taskflow.engines.action_engine.completer @@ -453,6 +452,7 @@ Components .. automodule:: taskflow.engines.action_engine.process_executor .. automodule:: taskflow.engines.action_engine.runtime .. automodule:: taskflow.engines.action_engine.scheduler +.. automodule:: taskflow.engines.action_engine.selector .. autoclass:: taskflow.engines.action_engine.scopes.ScopeWalker :special-members: __iter__ .. automodule:: taskflow.engines.action_engine.traversal diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py index a5746af28..066630719 100644 --- a/taskflow/engines/action_engine/builder.py +++ b/taskflow/engines/action_engine/builder.py @@ -112,7 +112,7 @@ class MachineBuilder(object): def __init__(self, runtime, waiter): self._runtime = weakref.proxy(runtime) - self._analyzer = runtime.analyzer + self._selector = runtime.selector self._completer = runtime.completer self._scheduler = runtime.scheduler self._storage = runtime.storage @@ -150,7 +150,7 @@ class MachineBuilder(object): def iter_next_atoms(atom=None, apply_deciders=True): # Yields and filters and tweaks the next atoms to run... - maybe_atoms_it = self._analyzer.iter_next_atoms(atom=atom) + maybe_atoms_it = self._selector.iter_next_atoms(atom=atom) for atom, late_decider in maybe_atoms_it: if apply_deciders: proceed = late_decider.check_and_affect(self._runtime) @@ -188,7 +188,7 @@ class MachineBuilder(object): " since (at least) %s atoms have been left in an" " unfinished state", leftover_atoms) return SUSPENDED - elif self._analyzer.is_success(): + elif self._runtime.is_success(): return SUCCESS else: return REVERTED diff --git a/taskflow/engines/action_engine/completer.py b/taskflow/engines/action_engine/completer.py index 3d376640b..59a2dbf32 100644 --- a/taskflow/engines/action_engine/completer.py +++ b/taskflow/engines/action_engine/completer.py @@ -76,11 +76,10 @@ class RevertAll(Strategy): def __init__(self, runtime): super(RevertAll, self).__init__(runtime) - self._analyzer = runtime.analyzer def apply(self): return self._runtime.reset_atoms( - self._analyzer.iterate_nodes(co.ATOMS), + self._runtime.iterate_nodes(co.ATOMS), state=None, intention=st.REVERT) @@ -106,7 +105,6 @@ class Completer(object): def __init__(self, runtime): self._runtime = weakref.proxy(runtime) - self._analyzer = runtime.analyzer self._storage = runtime.storage self._undefined_resolver = RevertAll(self._runtime) self._defer_reverts = strutils.bool_from_string( @@ -125,7 +123,7 @@ class Completer(object): atoms that were previously not finished (due to a RUNNING or REVERTING attempt not previously finishing). """ - atoms = list(self._analyzer.iterate_nodes(co.ATOMS)) + atoms = list(self._runtime.iterate_nodes(co.ATOMS)) atom_states = self._storage.get_atoms_states(atom.name for atom in atoms) if self._resolve: @@ -134,7 +132,7 @@ class Completer(object): if atom_state == st.FAILURE: self._process_atom_failure( atom, self._storage.get(atom.name)) - for retry in self._analyzer.iterate_retries(st.RETRYING): + for retry in self._runtime.iterate_retries(st.RETRYING): retry_affected_atoms_it = self._runtime.retry_subflow(retry) for atom, state, intention in retry_affected_atoms_it: if state: @@ -173,7 +171,7 @@ class Completer(object): def _determine_resolution(self, atom, failure): """Determines which resolution strategy to activate/apply.""" - retry = self._analyzer.find_retry(atom) + retry = self._runtime.find_retry(atom) if retry is not None: # Ask retry controller what to do in case of failure. handler = self._runtime.fetch_action(retry) diff --git a/taskflow/engines/action_engine/engine.py b/taskflow/engines/action_engine/engine.py index 8b0b99fdb..a1d1fc0d6 100644 --- a/taskflow/engines/action_engine/engine.py +++ b/taskflow/engines/action_engine/engine.py @@ -375,8 +375,8 @@ class ActionEngine(base.Engine): def _ensure_storage(self): """Ensure all contained atoms exist in the storage unit.""" self.storage.ensure_atoms( - self._runtime.analyzer.iterate_nodes(compiler.ATOMS)) - for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS): + self._runtime.iterate_nodes(compiler.ATOMS)) + for atom in self._runtime.iterate_nodes(compiler.ATOMS): if atom.inject: self.storage.inject_atom_args(atom.name, atom.inject, transient=self._inject_transient) @@ -402,7 +402,7 @@ class ActionEngine(base.Engine): last_cause = None last_node = None missing_nodes = 0 - for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS): + for atom in self._runtime.iterate_nodes(compiler.ATOMS): exec_missing = self.storage.fetch_unsatisfied_args( atom.name, atom.rebind, optional_args=atom.optional) revert_missing = self.storage.fetch_unsatisfied_args( diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 3d5a20721..32ce052a3 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -22,12 +22,12 @@ from futurist import waiters from taskflow import deciders as de from taskflow.engines.action_engine.actions import retry as ra from taskflow.engines.action_engine.actions import task as ta -from taskflow.engines.action_engine import analyzer as an from taskflow.engines.action_engine import builder as bu from taskflow.engines.action_engine import compiler as com from taskflow.engines.action_engine import completer as co from taskflow.engines.action_engine import scheduler as sched from taskflow.engines.action_engine import scopes as sc +from taskflow.engines.action_engine import selector as se from taskflow.engines.action_engine import traversal as tr from taskflow import exceptions as exc from taskflow import logging @@ -163,8 +163,8 @@ class Runtime(object): return self._options @misc.cachedproperty - def analyzer(self): - return an.Analyzer(self) + def selector(self): + return se.Selector(self) @misc.cachedproperty def builder(self): @@ -245,6 +245,48 @@ class Runtime(object): # Various helper methods used by the runtime components; not for public # consumption... + def iterate_retries(self, state=None): + """Iterates retry atoms that match the provided state. + + If no state is provided it will yield back all retry atoms. + """ + if state: + atoms = list(self.iterate_nodes((com.RETRY,))) + atom_states = self._storage.get_atoms_states(atom.name + for atom in atoms) + for atom in atoms: + atom_state, _atom_intention = atom_states[atom.name] + if atom_state == state: + yield atom + else: + for atom in self.iterate_nodes((com.RETRY,)): + yield atom + + def iterate_nodes(self, allowed_kinds): + """Yields back all nodes of specified kinds in the execution graph.""" + graph = self._compilation.execution_graph + for node, node_data in graph.nodes_iter(data=True): + if node_data['kind'] in allowed_kinds: + yield node + + def is_success(self): + """Checks if all atoms in the execution graph are in 'happy' state.""" + atoms = list(self.iterate_nodes(com.ATOMS)) + atom_states = self._storage.get_atoms_states(atom.name + for atom in atoms) + for atom in atoms: + atom_state, _atom_intention = atom_states[atom.name] + if atom_state == st.IGNORE: + continue + if atom_state != st.SUCCESS: + return False + return True + + def find_retry(self, node): + """Returns the retry atom associated to the given node (or none).""" + graph = self._compilation.execution_graph + return graph.node[node].get(com.RETRY) + def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE): """Resets all the provided atoms to the given state and intention.""" tweaked = [] @@ -261,7 +303,7 @@ class Runtime(object): def reset_all(self, state=st.PENDING, intention=st.EXECUTE): """Resets all atoms to the given state and intention.""" - return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS), + return self.reset_atoms(self.iterate_nodes(com.ATOMS), state=state, intention=intention) def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE): diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/selector.py similarity index 82% rename from taskflow/engines/action_engine/analyzer.py rename to taskflow/engines/action_engine/selector.py index 3504f0847..162e36810 100644 --- a/taskflow/engines/action_engine/analyzer.py +++ b/taskflow/engines/action_engine/selector.py @@ -27,8 +27,8 @@ from taskflow.utils import iter_utils LOG = logging.getLogger(__name__) -class Analyzer(object): - """Analyzes a compilation and aids in execution processes. +class Selector(object): + """Selector that uses a compilation and aids in execution processes. Its primary purpose is to get the next atoms for execution or reversion by utilizing the compilations underlying structures (graphs, nodes and @@ -45,8 +45,8 @@ class Analyzer(object): def iter_next_atoms(self, atom=None): """Iterate next atoms to run (originating from atom or all atoms).""" if atom is None: - return iter_utils.unique_seen((self.browse_atoms_for_execute(), - self.browse_atoms_for_revert()), + return iter_utils.unique_seen((self._browse_atoms_for_execute(), + self._browse_atoms_for_revert()), seen_selector=operator.itemgetter(0)) state = self._storage.get_atom_state(atom.name) intention = self._storage.get_atom_intention(atom.name) @@ -56,17 +56,17 @@ class Analyzer(object): (atom, deciders.NoOpDecider()), ]) elif intention == st.EXECUTE: - return self.browse_atoms_for_execute(atom=atom) + return self._browse_atoms_for_execute(atom=atom) else: return iter([]) elif state == st.REVERTED: - return self.browse_atoms_for_revert(atom=atom) + return self._browse_atoms_for_revert(atom=atom) elif state == st.FAILURE: - return self.browse_atoms_for_revert() + return self._browse_atoms_for_revert() else: return iter([]) - def browse_atoms_for_execute(self, atom=None): + def _browse_atoms_for_execute(self, atom=None): """Browse next atoms to execute. This returns a iterator of atoms that *may* be ready to be @@ -74,7 +74,7 @@ class Analyzer(object): of that atom, otherwise it will examine the whole graph. """ if atom is None: - atom_it = self.iterate_nodes(co.ATOMS) + atom_it = self._runtime.iterate_nodes(co.ATOMS) else: # NOTE(harlowja): the reason this uses breadth first is so that # when deciders are applied that those deciders can be applied @@ -90,7 +90,7 @@ class Analyzer(object): if is_ready: yield (atom, late_decider) - def browse_atoms_for_revert(self, atom=None): + def _browse_atoms_for_revert(self, atom=None): """Browse next atoms to revert. This returns a iterator of atoms that *may* be ready to be be @@ -99,7 +99,7 @@ class Analyzer(object): graph. """ if atom is None: - atom_it = self.iterate_nodes(co.ATOMS) + atom_it = self._runtime.iterate_nodes(co.ATOMS) else: atom_it = traversal.breadth_first_iterate( self._execution_graph, atom, traversal.Direction.BACKWARD, @@ -226,43 +226,3 @@ class Analyzer(object): return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY], connected_fetcher, ready_checker, decider_fetcher, for_what='revert') - - def iterate_retries(self, state=None): - """Iterates retry atoms that match the provided state. - - If no state is provided it will yield back all retry atoms. - """ - if state: - atoms = list(self.iterate_nodes((co.RETRY,))) - atom_states = self._storage.get_atoms_states(atom.name - for atom in atoms) - for atom in atoms: - atom_state, _atom_intention = atom_states[atom.name] - if atom_state == state: - yield atom - else: - for atom in self.iterate_nodes((co.RETRY,)): - yield atom - - def iterate_nodes(self, allowed_kinds): - """Yields back all nodes of specified kinds in the execution graph.""" - for node, node_data in self._execution_graph.nodes_iter(data=True): - if node_data['kind'] in allowed_kinds: - yield node - - def find_retry(self, node): - """Returns the retry atom associated to the given node (or none).""" - return self._execution_graph.node[node].get(co.RETRY) - - def is_success(self): - """Checks if all atoms in the execution graph are in 'happy' state.""" - atoms = list(self.iterate_nodes(co.ATOMS)) - atom_states = self._storage.get_atoms_states(atom.name - for atom in atoms) - for atom in atoms: - atom_state, _atom_intention = atom_states[atom.name] - if atom_state == st.IGNORE: - continue - if atom_state != st.SUCCESS: - return False - return True