Merge "Rename the graph analyzer to analyzer"
This commit is contained in:
@@ -253,7 +253,7 @@ analyzing the current state of the task; which is determined by looking at the
|
||||
state in the task detail object for that task and analyzing edges of the graph
|
||||
for things like retry atom which can influence what a tasks intention should be
|
||||
(this is aided by the usage of the
|
||||
:py:class:`~taskflow.engines.action_engine.graph_analyzer.GraphAnalyzer` helper
|
||||
:py:class:`~taskflow.engines.action_engine.analyzer.Analyzer` helper
|
||||
object which was designed to provide helper methods for this analysis). Once
|
||||
these intentions are determined and associated with each task (the intention is
|
||||
also stored in the :py:class:`~taskflow.persistence.logbook.AtomDetail` object)
|
||||
@@ -268,7 +268,7 @@ This stage selects which atoms are eligible to run by using a
|
||||
:py:class:`~taskflow.engines.action_engine.runtime.Scheduler` implementation
|
||||
(the default implementation looks at there intention, checking if predecessor
|
||||
atoms have ran and so-on, using a
|
||||
:py:class:`~taskflow.engines.action_engine.graph_analyzer.GraphAnalyzer` helper
|
||||
:py:class:`~taskflow.engines.action_engine.analyzer.Analyzer` helper
|
||||
object as needed) and submits those atoms to a previously provided compatible
|
||||
`executor`_ for asynchronous execution. This
|
||||
:py:class:`~taskflow.engines.action_engine.runtime.Scheduler` will return a
|
||||
@@ -322,9 +322,9 @@ saved for this execution.
|
||||
Interfaces
|
||||
==========
|
||||
|
||||
.. automodule:: taskflow.engines.action_engine.analyzer
|
||||
.. automodule:: taskflow.engines.action_engine.compiler
|
||||
.. automodule:: taskflow.engines.action_engine.engine
|
||||
.. automodule:: taskflow.engines.action_engine.graph_analyzer
|
||||
.. automodule:: taskflow.engines.action_engine.runner
|
||||
.. automodule:: taskflow.engines.action_engine.runtime
|
||||
.. automodule:: taskflow.engines.base
|
||||
|
||||
@@ -17,19 +17,21 @@
|
||||
from networkx.algorithms import traversal
|
||||
import six
|
||||
|
||||
from taskflow import retry as r
|
||||
from taskflow import retry as retry_atom
|
||||
from taskflow import states as st
|
||||
|
||||
|
||||
class GraphAnalyzer(object):
|
||||
"""Analyzes a execution graph to get the next nodes for execution or
|
||||
reversion by utilizing the graphs nodes and edge relations and comparing
|
||||
the node state against the states stored in storage.
|
||||
class Analyzer(object):
|
||||
"""Analyzes a compilation output to get the next atoms for execution or
|
||||
reversion by utilizing the compilations underlying structures (graphs,
|
||||
nodes and edge relations...) and using this information along with the
|
||||
atom state/states stored in storage to provide useful analysis functions
|
||||
to the rest of the runtime system.
|
||||
"""
|
||||
|
||||
def __init__(self, graph, storage):
|
||||
self._graph = graph
|
||||
def __init__(self, compilation, storage):
|
||||
self._storage = storage
|
||||
self._graph = compilation.execution_graph
|
||||
|
||||
def get_next_nodes(self, node=None):
|
||||
if node is None:
|
||||
@@ -129,7 +131,7 @@ class GraphAnalyzer(object):
|
||||
retries if state is None.
|
||||
"""
|
||||
for node in self._graph.nodes_iter():
|
||||
if isinstance(node, r.Retry):
|
||||
if isinstance(node, retry_atom.Retry):
|
||||
if not state or self.get_state(node) == state:
|
||||
yield node
|
||||
|
||||
@@ -14,7 +14,6 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import collections
|
||||
import logging
|
||||
|
||||
from taskflow import exceptions as exc
|
||||
@@ -27,10 +26,19 @@ from taskflow.utils import misc
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# The result of a compilers compile() is this tuple (for now it is just a
|
||||
# execution graph but in the future it may grow to include more attributes
|
||||
# that help the runtime units execute in a more optimal/featureful manner).
|
||||
Compilation = collections.namedtuple("Compilation", ["execution_graph"])
|
||||
class Compilation(object):
|
||||
"""The result of a compilers compile() is this *immutable* object.
|
||||
|
||||
For now it is just a execution graph but in the future it will grow to
|
||||
include more methods & properties that help the various runtime units
|
||||
execute in a more optimal & featureful manner.
|
||||
"""
|
||||
def __init__(self, execution_graph):
|
||||
self._execution_graph = execution_graph
|
||||
|
||||
@property
|
||||
def execution_graph(self):
|
||||
return self._execution_graph
|
||||
|
||||
|
||||
class PatternCompiler(object):
|
||||
|
||||
@@ -43,11 +43,10 @@ class Runner(object):
|
||||
ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING)
|
||||
|
||||
def __init__(self, runtime, waiter):
|
||||
self._runtime = runtime
|
||||
self._scheduler = runtime.scheduler
|
||||
self._completer = runtime.completer
|
||||
self._storage = runtime.storage
|
||||
self._analyzer = runtime.graph_analyzer
|
||||
self._analyzer = runtime.analyzer
|
||||
self._waiter = waiter
|
||||
|
||||
def is_running(self):
|
||||
|
||||
@@ -20,8 +20,8 @@ from taskflow import states as st
|
||||
from taskflow import task as task_atom
|
||||
from taskflow.utils import misc
|
||||
|
||||
from taskflow.engines.action_engine import analyzer as ca
|
||||
from taskflow.engines.action_engine import executor as ex
|
||||
from taskflow.engines.action_engine import graph_analyzer as ga
|
||||
from taskflow.engines.action_engine import retry_action as ra
|
||||
from taskflow.engines.action_engine import task_action as ta
|
||||
|
||||
@@ -47,9 +47,8 @@ class Runtime(object):
|
||||
return self._storage
|
||||
|
||||
@misc.cachedproperty
|
||||
def graph_analyzer(self):
|
||||
return ga.GraphAnalyzer(self._compilation.execution_graph,
|
||||
self._storage)
|
||||
def analyzer(self):
|
||||
return ca.Analyzer(self._compilation, self._storage)
|
||||
|
||||
@misc.cachedproperty
|
||||
def completer(self):
|
||||
@@ -82,11 +81,11 @@ class Runtime(object):
|
||||
self.storage.set_atom_intention(node.name, intention)
|
||||
|
||||
def reset_all(self, state=st.PENDING, intention=st.EXECUTE):
|
||||
self.reset_nodes(self.graph_analyzer.iterate_all_nodes(),
|
||||
self.reset_nodes(self.analyzer.iterate_all_nodes(),
|
||||
state=state, intention=intention)
|
||||
|
||||
def reset_subgraph(self, node, state=st.PENDING, intention=st.EXECUTE):
|
||||
self.reset_nodes(self.graph_analyzer.iterate_subgraph(node),
|
||||
self.reset_nodes(self.analyzer.iterate_subgraph(node),
|
||||
state=state, intention=intention)
|
||||
|
||||
|
||||
@@ -100,7 +99,7 @@ class Completer(object):
|
||||
"""Completes atoms using actions to complete them."""
|
||||
|
||||
def __init__(self, runtime):
|
||||
self._analyzer = runtime.graph_analyzer
|
||||
self._analyzer = runtime.analyzer
|
||||
self._retry_action = runtime.retry_action
|
||||
self._runtime = runtime
|
||||
self._storage = runtime.storage
|
||||
@@ -183,7 +182,7 @@ class Scheduler(object):
|
||||
"""Schedules atoms using actions to schedule."""
|
||||
|
||||
def __init__(self, runtime):
|
||||
self._analyzer = runtime.graph_analyzer
|
||||
self._analyzer = runtime.analyzer
|
||||
self._retry_action = runtime.retry_action
|
||||
self._runtime = runtime
|
||||
self._storage = runtime.storage
|
||||
|
||||
Reference in New Issue
Block a user