From 7b5dad30ed660de6e09ad46152fe26dc13d92abd Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Mon, 2 Jun 2014 17:56:49 -0700 Subject: [PATCH] Rename the graph analyzer to analyzer Adjust the graph analyzer to be a more generic compilation analyzer which analyzes compilation objects (which are now changed to be an object and not a named tuple) and provides utility functions ontop of that object. This helps it become possible to provide other useful analysis functions that are not directly tied to the execution graph component but can be provided ontop of other compilation components. Change-Id: I2ab08db4f566d5f329d7e79b1c50ed65aad9e4b3 --- doc/source/engines.rst | 6 +++--- .../{graph_analyzer.py => analyzer.py} | 18 ++++++++++-------- taskflow/engines/action_engine/compiler.py | 18 +++++++++++++----- taskflow/engines/action_engine/runner.py | 3 +-- taskflow/engines/action_engine/runtime.py | 15 +++++++-------- 5 files changed, 34 insertions(+), 26 deletions(-) rename taskflow/engines/action_engine/{graph_analyzer.py => analyzer.py} (89%) diff --git a/doc/source/engines.rst b/doc/source/engines.rst index 03526491..752f9f0e 100644 --- a/doc/source/engines.rst +++ b/doc/source/engines.rst @@ -253,7 +253,7 @@ analyzing the current state of the task; which is determined by looking at the state in the task detail object for that task and analyzing edges of the graph for things like retry atom which can influence what a tasks intention should be (this is aided by the usage of the -:py:class:`~taskflow.engines.action_engine.graph_analyzer.GraphAnalyzer` helper +:py:class:`~taskflow.engines.action_engine.analyzer.Analyzer` helper object which was designed to provide helper methods for this analysis). Once these intentions are determined and associated with each task (the intention is also stored in the :py:class:`~taskflow.persistence.logbook.AtomDetail` object) @@ -268,7 +268,7 @@ This stage selects which atoms are eligible to run by using a :py:class:`~taskflow.engines.action_engine.runtime.Scheduler` implementation (the default implementation looks at there intention, checking if predecessor atoms have ran and so-on, using a -:py:class:`~taskflow.engines.action_engine.graph_analyzer.GraphAnalyzer` helper +:py:class:`~taskflow.engines.action_engine.analyzer.Analyzer` helper object as needed) and submits those atoms to a previously provided compatible `executor`_ for asynchronous execution. This :py:class:`~taskflow.engines.action_engine.runtime.Scheduler` will return a @@ -322,9 +322,9 @@ saved for this execution. Interfaces ========== +.. automodule:: taskflow.engines.action_engine.analyzer .. automodule:: taskflow.engines.action_engine.compiler .. automodule:: taskflow.engines.action_engine.engine -.. automodule:: taskflow.engines.action_engine.graph_analyzer .. automodule:: taskflow.engines.action_engine.runner .. automodule:: taskflow.engines.action_engine.runtime .. automodule:: taskflow.engines.base diff --git a/taskflow/engines/action_engine/graph_analyzer.py b/taskflow/engines/action_engine/analyzer.py similarity index 89% rename from taskflow/engines/action_engine/graph_analyzer.py rename to taskflow/engines/action_engine/analyzer.py index 2e910f6b..ef960afc 100644 --- a/taskflow/engines/action_engine/graph_analyzer.py +++ b/taskflow/engines/action_engine/analyzer.py @@ -17,19 +17,21 @@ from networkx.algorithms import traversal import six -from taskflow import retry as r +from taskflow import retry as retry_atom from taskflow import states as st -class GraphAnalyzer(object): - """Analyzes a execution graph to get the next nodes for execution or - reversion by utilizing the graphs nodes and edge relations and comparing - the node state against the states stored in storage. +class Analyzer(object): + """Analyzes a compilation output to get the next atoms for execution or + reversion by utilizing the compilations underlying structures (graphs, + nodes and edge relations...) and using this information along with the + atom state/states stored in storage to provide useful analysis functions + to the rest of the runtime system. """ - def __init__(self, graph, storage): - self._graph = graph + def __init__(self, compilation, storage): self._storage = storage + self._graph = compilation.execution_graph def get_next_nodes(self, node=None): if node is None: @@ -129,7 +131,7 @@ class GraphAnalyzer(object): retries if state is None. """ for node in self._graph.nodes_iter(): - if isinstance(node, r.Retry): + if isinstance(node, retry_atom.Retry): if not state or self.get_state(node) == state: yield node diff --git a/taskflow/engines/action_engine/compiler.py b/taskflow/engines/action_engine/compiler.py index 883e7f0b..f5c519cc 100644 --- a/taskflow/engines/action_engine/compiler.py +++ b/taskflow/engines/action_engine/compiler.py @@ -14,7 +14,6 @@ # License for the specific language governing permissions and limitations # under the License. -import collections import logging from taskflow import exceptions as exc @@ -27,10 +26,19 @@ from taskflow.utils import misc LOG = logging.getLogger(__name__) -# The result of a compilers compile() is this tuple (for now it is just a -# execution graph but in the future it may grow to include more attributes -# that help the runtime units execute in a more optimal/featureful manner). -Compilation = collections.namedtuple("Compilation", ["execution_graph"]) +class Compilation(object): + """The result of a compilers compile() is this *immutable* object. + + For now it is just a execution graph but in the future it will grow to + include more methods & properties that help the various runtime units + execute in a more optimal & featureful manner. + """ + def __init__(self, execution_graph): + self._execution_graph = execution_graph + + @property + def execution_graph(self): + return self._execution_graph class PatternCompiler(object): diff --git a/taskflow/engines/action_engine/runner.py b/taskflow/engines/action_engine/runner.py index dc8c1003..0120bd69 100644 --- a/taskflow/engines/action_engine/runner.py +++ b/taskflow/engines/action_engine/runner.py @@ -43,11 +43,10 @@ class Runner(object): ignorable_states = (st.SCHEDULING, st.WAITING, st.RESUMING, st.ANALYZING) def __init__(self, runtime, waiter): - self._runtime = runtime self._scheduler = runtime.scheduler self._completer = runtime.completer self._storage = runtime.storage - self._analyzer = runtime.graph_analyzer + self._analyzer = runtime.analyzer self._waiter = waiter def is_running(self): diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 709ff78a..146c93a7 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -20,8 +20,8 @@ from taskflow import states as st from taskflow import task as task_atom from taskflow.utils import misc +from taskflow.engines.action_engine import analyzer as ca from taskflow.engines.action_engine import executor as ex -from taskflow.engines.action_engine import graph_analyzer as ga from taskflow.engines.action_engine import retry_action as ra from taskflow.engines.action_engine import task_action as ta @@ -47,9 +47,8 @@ class Runtime(object): return self._storage @misc.cachedproperty - def graph_analyzer(self): - return ga.GraphAnalyzer(self._compilation.execution_graph, - self._storage) + def analyzer(self): + return ca.Analyzer(self._compilation, self._storage) @misc.cachedproperty def completer(self): @@ -82,11 +81,11 @@ class Runtime(object): self.storage.set_atom_intention(node.name, intention) def reset_all(self, state=st.PENDING, intention=st.EXECUTE): - self.reset_nodes(self.graph_analyzer.iterate_all_nodes(), + self.reset_nodes(self.analyzer.iterate_all_nodes(), state=state, intention=intention) def reset_subgraph(self, node, state=st.PENDING, intention=st.EXECUTE): - self.reset_nodes(self.graph_analyzer.iterate_subgraph(node), + self.reset_nodes(self.analyzer.iterate_subgraph(node), state=state, intention=intention) @@ -100,7 +99,7 @@ class Completer(object): """Completes atoms using actions to complete them.""" def __init__(self, runtime): - self._analyzer = runtime.graph_analyzer + self._analyzer = runtime.analyzer self._retry_action = runtime.retry_action self._runtime = runtime self._storage = runtime.storage @@ -183,7 +182,7 @@ class Scheduler(object): """Schedules atoms using actions to schedule.""" def __init__(self, runtime): - self._analyzer = runtime.graph_analyzer + self._analyzer = runtime.analyzer self._retry_action = runtime.retry_action self._runtime = runtime self._storage = runtime.storage