Rename engine analyzer to be named selector

This moves out the engine next to run (or revert)
selection process to a single class that just does
this and moves out the common functions the analyzer
class provided to the runtime object (which all
components can access).

This makes it easier to adjust the selection algorithm
in different ways.

Change-Id: I091c69297a7bff60729791d3ca6c3fae14d6eea5
This commit is contained in:
Joshua Harlow 2016-06-10 17:35:01 -07:00 committed by ChangBo Guo(gcb)
parent dc15edc842
commit 774d598134
6 changed files with 70 additions and 70 deletions

View File

@ -284,7 +284,7 @@ analyzing the current state of the task; which is determined by looking at the
state in the task detail object for that task and analyzing edges of the graph
for things like retry atom which can influence what a tasks intention should be
(this is aided by the usage of the
:py:class:`~taskflow.engines.action_engine.analyzer.Analyzer` helper
:py:class:`~taskflow.engines.action_engine.selector.Selector` helper
object which was designed to provide helper methods for this analysis). Once
these intentions are determined and associated with each task (the intention is
also stored in the :py:class:`~taskflow.persistence.models.AtomDetail` object)
@ -299,7 +299,7 @@ This stage selects which atoms are eligible to run by using a
:py:class:`~taskflow.engines.action_engine.scheduler.Scheduler` implementation
(the default implementation looks at their intention, checking if predecessor
atoms have ran and so-on, using a
:py:class:`~taskflow.engines.action_engine.analyzer.Analyzer` helper
:py:class:`~taskflow.engines.action_engine.selector.Selector` helper
object as needed) and submits those atoms to a previously provided compatible
`executor`_ for asynchronous execution. This
:py:class:`~taskflow.engines.action_engine.scheduler.Scheduler` will return a
@ -444,7 +444,6 @@ Components
other locations **without** notice (and without the typical deprecation
cycle).
.. automodule:: taskflow.engines.action_engine.analyzer
.. automodule:: taskflow.engines.action_engine.builder
.. automodule:: taskflow.engines.action_engine.compiler
.. automodule:: taskflow.engines.action_engine.completer
@ -453,6 +452,7 @@ Components
.. automodule:: taskflow.engines.action_engine.process_executor
.. automodule:: taskflow.engines.action_engine.runtime
.. automodule:: taskflow.engines.action_engine.scheduler
.. automodule:: taskflow.engines.action_engine.selector
.. autoclass:: taskflow.engines.action_engine.scopes.ScopeWalker
:special-members: __iter__
.. automodule:: taskflow.engines.action_engine.traversal

View File

@ -112,7 +112,7 @@ class MachineBuilder(object):
def __init__(self, runtime, waiter):
self._runtime = weakref.proxy(runtime)
self._analyzer = runtime.analyzer
self._selector = runtime.selector
self._completer = runtime.completer
self._scheduler = runtime.scheduler
self._storage = runtime.storage
@ -150,7 +150,7 @@ class MachineBuilder(object):
def iter_next_atoms(atom=None, apply_deciders=True):
# Yields and filters and tweaks the next atoms to run...
maybe_atoms_it = self._analyzer.iter_next_atoms(atom=atom)
maybe_atoms_it = self._selector.iter_next_atoms(atom=atom)
for atom, late_decider in maybe_atoms_it:
if apply_deciders:
proceed = late_decider.check_and_affect(self._runtime)
@ -188,7 +188,7 @@ class MachineBuilder(object):
" since (at least) %s atoms have been left in an"
" unfinished state", leftover_atoms)
return SUSPENDED
elif self._analyzer.is_success():
elif self._runtime.is_success():
return SUCCESS
else:
return REVERTED

View File

@ -76,11 +76,10 @@ class RevertAll(Strategy):
def __init__(self, runtime):
super(RevertAll, self).__init__(runtime)
self._analyzer = runtime.analyzer
def apply(self):
return self._runtime.reset_atoms(
self._analyzer.iterate_nodes(co.ATOMS),
self._runtime.iterate_nodes(co.ATOMS),
state=None, intention=st.REVERT)
@ -106,7 +105,6 @@ class Completer(object):
def __init__(self, runtime):
self._runtime = weakref.proxy(runtime)
self._analyzer = runtime.analyzer
self._storage = runtime.storage
self._undefined_resolver = RevertAll(self._runtime)
self._defer_reverts = strutils.bool_from_string(
@ -125,7 +123,7 @@ class Completer(object):
atoms that were previously not finished (due to a RUNNING or REVERTING
attempt not previously finishing).
"""
atoms = list(self._analyzer.iterate_nodes(co.ATOMS))
atoms = list(self._runtime.iterate_nodes(co.ATOMS))
atom_states = self._storage.get_atoms_states(atom.name
for atom in atoms)
if self._resolve:
@ -134,7 +132,7 @@ class Completer(object):
if atom_state == st.FAILURE:
self._process_atom_failure(
atom, self._storage.get(atom.name))
for retry in self._analyzer.iterate_retries(st.RETRYING):
for retry in self._runtime.iterate_retries(st.RETRYING):
retry_affected_atoms_it = self._runtime.retry_subflow(retry)
for atom, state, intention in retry_affected_atoms_it:
if state:
@ -173,7 +171,7 @@ class Completer(object):
def _determine_resolution(self, atom, failure):
"""Determines which resolution strategy to activate/apply."""
retry = self._analyzer.find_retry(atom)
retry = self._runtime.find_retry(atom)
if retry is not None:
# Ask retry controller what to do in case of failure.
handler = self._runtime.fetch_action(retry)

View File

@ -375,8 +375,8 @@ class ActionEngine(base.Engine):
def _ensure_storage(self):
"""Ensure all contained atoms exist in the storage unit."""
self.storage.ensure_atoms(
self._runtime.analyzer.iterate_nodes(compiler.ATOMS))
for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
self._runtime.iterate_nodes(compiler.ATOMS))
for atom in self._runtime.iterate_nodes(compiler.ATOMS):
if atom.inject:
self.storage.inject_atom_args(atom.name, atom.inject,
transient=self._inject_transient)
@ -402,7 +402,7 @@ class ActionEngine(base.Engine):
last_cause = None
last_node = None
missing_nodes = 0
for atom in self._runtime.analyzer.iterate_nodes(compiler.ATOMS):
for atom in self._runtime.iterate_nodes(compiler.ATOMS):
exec_missing = self.storage.fetch_unsatisfied_args(
atom.name, atom.rebind, optional_args=atom.optional)
revert_missing = self.storage.fetch_unsatisfied_args(

View File

@ -22,12 +22,12 @@ from futurist import waiters
from taskflow import deciders as de
from taskflow.engines.action_engine.actions import retry as ra
from taskflow.engines.action_engine.actions import task as ta
from taskflow.engines.action_engine import analyzer as an
from taskflow.engines.action_engine import builder as bu
from taskflow.engines.action_engine import compiler as com
from taskflow.engines.action_engine import completer as co
from taskflow.engines.action_engine import scheduler as sched
from taskflow.engines.action_engine import scopes as sc
from taskflow.engines.action_engine import selector as se
from taskflow.engines.action_engine import traversal as tr
from taskflow import exceptions as exc
from taskflow import logging
@ -163,8 +163,8 @@ class Runtime(object):
return self._options
@misc.cachedproperty
def analyzer(self):
return an.Analyzer(self)
def selector(self):
return se.Selector(self)
@misc.cachedproperty
def builder(self):
@ -245,6 +245,48 @@ class Runtime(object):
# Various helper methods used by the runtime components; not for public
# consumption...
def iterate_retries(self, state=None):
"""Iterates retry atoms that match the provided state.
If no state is provided it will yield back all retry atoms.
"""
if state:
atoms = list(self.iterate_nodes((com.RETRY,)))
atom_states = self._storage.get_atoms_states(atom.name
for atom in atoms)
for atom in atoms:
atom_state, _atom_intention = atom_states[atom.name]
if atom_state == state:
yield atom
else:
for atom in self.iterate_nodes((com.RETRY,)):
yield atom
def iterate_nodes(self, allowed_kinds):
"""Yields back all nodes of specified kinds in the execution graph."""
graph = self._compilation.execution_graph
for node, node_data in graph.nodes_iter(data=True):
if node_data['kind'] in allowed_kinds:
yield node
def is_success(self):
"""Checks if all atoms in the execution graph are in 'happy' state."""
atoms = list(self.iterate_nodes(com.ATOMS))
atom_states = self._storage.get_atoms_states(atom.name
for atom in atoms)
for atom in atoms:
atom_state, _atom_intention = atom_states[atom.name]
if atom_state == st.IGNORE:
continue
if atom_state != st.SUCCESS:
return False
return True
def find_retry(self, node):
"""Returns the retry atom associated to the given node (or none)."""
graph = self._compilation.execution_graph
return graph.node[node].get(com.RETRY)
def reset_atoms(self, atoms, state=st.PENDING, intention=st.EXECUTE):
"""Resets all the provided atoms to the given state and intention."""
tweaked = []
@ -261,7 +303,7 @@ class Runtime(object):
def reset_all(self, state=st.PENDING, intention=st.EXECUTE):
"""Resets all atoms to the given state and intention."""
return self.reset_atoms(self.analyzer.iterate_nodes(com.ATOMS),
return self.reset_atoms(self.iterate_nodes(com.ATOMS),
state=state, intention=intention)
def reset_subgraph(self, atom, state=st.PENDING, intention=st.EXECUTE):

View File

@ -27,8 +27,8 @@ from taskflow.utils import iter_utils
LOG = logging.getLogger(__name__)
class Analyzer(object):
"""Analyzes a compilation and aids in execution processes.
class Selector(object):
"""Selector that uses a compilation and aids in execution processes.
Its primary purpose is to get the next atoms for execution or reversion
by utilizing the compilations underlying structures (graphs, nodes and
@ -45,8 +45,8 @@ class Analyzer(object):
def iter_next_atoms(self, atom=None):
"""Iterate next atoms to run (originating from atom or all atoms)."""
if atom is None:
return iter_utils.unique_seen((self.browse_atoms_for_execute(),
self.browse_atoms_for_revert()),
return iter_utils.unique_seen((self._browse_atoms_for_execute(),
self._browse_atoms_for_revert()),
seen_selector=operator.itemgetter(0))
state = self._storage.get_atom_state(atom.name)
intention = self._storage.get_atom_intention(atom.name)
@ -56,17 +56,17 @@ class Analyzer(object):
(atom, deciders.NoOpDecider()),
])
elif intention == st.EXECUTE:
return self.browse_atoms_for_execute(atom=atom)
return self._browse_atoms_for_execute(atom=atom)
else:
return iter([])
elif state == st.REVERTED:
return self.browse_atoms_for_revert(atom=atom)
return self._browse_atoms_for_revert(atom=atom)
elif state == st.FAILURE:
return self.browse_atoms_for_revert()
return self._browse_atoms_for_revert()
else:
return iter([])
def browse_atoms_for_execute(self, atom=None):
def _browse_atoms_for_execute(self, atom=None):
"""Browse next atoms to execute.
This returns a iterator of atoms that *may* be ready to be
@ -74,7 +74,7 @@ class Analyzer(object):
of that atom, otherwise it will examine the whole graph.
"""
if atom is None:
atom_it = self.iterate_nodes(co.ATOMS)
atom_it = self._runtime.iterate_nodes(co.ATOMS)
else:
# NOTE(harlowja): the reason this uses breadth first is so that
# when deciders are applied that those deciders can be applied
@ -90,7 +90,7 @@ class Analyzer(object):
if is_ready:
yield (atom, late_decider)
def browse_atoms_for_revert(self, atom=None):
def _browse_atoms_for_revert(self, atom=None):
"""Browse next atoms to revert.
This returns a iterator of atoms that *may* be ready to be be
@ -99,7 +99,7 @@ class Analyzer(object):
graph.
"""
if atom is None:
atom_it = self.iterate_nodes(co.ATOMS)
atom_it = self._runtime.iterate_nodes(co.ATOMS)
else:
atom_it = traversal.breadth_first_iterate(
self._execution_graph, atom, traversal.Direction.BACKWARD,
@ -226,43 +226,3 @@ class Analyzer(object):
return self._get_maybe_ready(atom, st.REVERTING, [st.REVERT, st.RETRY],
connected_fetcher, ready_checker,
decider_fetcher, for_what='revert')
def iterate_retries(self, state=None):
"""Iterates retry atoms that match the provided state.
If no state is provided it will yield back all retry atoms.
"""
if state:
atoms = list(self.iterate_nodes((co.RETRY,)))
atom_states = self._storage.get_atoms_states(atom.name
for atom in atoms)
for atom in atoms:
atom_state, _atom_intention = atom_states[atom.name]
if atom_state == state:
yield atom
else:
for atom in self.iterate_nodes((co.RETRY,)):
yield atom
def iterate_nodes(self, allowed_kinds):
"""Yields back all nodes of specified kinds in the execution graph."""
for node, node_data in self._execution_graph.nodes_iter(data=True):
if node_data['kind'] in allowed_kinds:
yield node
def find_retry(self, node):
"""Returns the retry atom associated to the given node (or none)."""
return self._execution_graph.node[node].get(co.RETRY)
def is_success(self):
"""Checks if all atoms in the execution graph are in 'happy' state."""
atoms = list(self.iterate_nodes(co.ATOMS))
atom_states = self._storage.get_atoms_states(atom.name
for atom in atoms)
for atom in atoms:
atom_state, _atom_intention = atom_states[atom.name]
if atom_state == st.IGNORE:
continue
if atom_state != st.SUCCESS:
return False
return True