diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py index f5c68722..78d4c29f 100644 --- a/taskflow/engines/action_engine/analyzer.py +++ b/taskflow/engines/action_engine/analyzer.py @@ -15,8 +15,8 @@ # under the License. import abc -import functools import itertools +import weakref from networkx.algorithms import traversal import six @@ -101,12 +101,9 @@ class Analyzer(object): """ def __init__(self, runtime): + self._runtime = weakref.proxy(runtime) self._storage = runtime.storage self._execution_graph = runtime.compilation.execution_graph - self._check_atom_transition = runtime.check_atom_transition - self._fetch_edge_deciders = runtime.fetch_edge_deciders - self._fetch_retries = functools.partial( - runtime.fetch_atoms_by_kind, 'retry') def get_next_nodes(self, node=None): """Get next nodes to run (originating from node or all nodes).""" @@ -174,7 +171,8 @@ class Analyzer(object): state = self.get_state(atom) intention = self._storage.get_atom_intention(atom.name) - transition = self._check_atom_transition(atom, state, st.RUNNING) + transition = self._runtime.check_atom_transition(atom, state, + st.RUNNING) if not transition or intention != st.EXECUTE: return (False, None) @@ -190,7 +188,7 @@ class Analyzer(object): if not ok_to_run: return (False, None) else: - edge_deciders = self._fetch_edge_deciders(atom) + edge_deciders = self._runtime.fetch_edge_deciders(atom) return (True, IgnoreDecider(atom, edge_deciders)) def _get_maybe_ready_for_revert(self, atom): @@ -198,7 +196,8 @@ class Analyzer(object): state = self.get_state(atom) intention = self._storage.get_atom_intention(atom.name) - transition = self._check_atom_transition(atom, state, st.REVERTING) + transition = self._runtime.check_atom_transition(atom, state, + st.REVERTING) if not transition or intention not in (st.REVERT, st.RETRY): return (False, None) @@ -226,7 +225,7 @@ class Analyzer(object): If no state is provided it will yield back all retry atoms. """ - for atom in self._fetch_retries(): + for atom in self._runtime.fetch_atoms_by_kind('retry'): if not state or self.get_state(atom) == state: yield atom diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py index f46d4a1f..9ab26d4a 100644 --- a/taskflow/engines/action_engine/builder.py +++ b/taskflow/engines/action_engine/builder.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import weakref from automaton import machines @@ -91,7 +92,7 @@ class MachineBuilder(object): """ def __init__(self, runtime, waiter): - self._runtime = runtime + self._runtime = weakref.proxy(runtime) self._analyzer = runtime.analyzer self._completer = runtime.completer self._scheduler = runtime.scheduler diff --git a/taskflow/engines/action_engine/scheduler.py b/taskflow/engines/action_engine/scheduler.py index 404781e6..5fdc1995 100644 --- a/taskflow/engines/action_engine/scheduler.py +++ b/taskflow/engines/action_engine/scheduler.py @@ -76,7 +76,7 @@ class Scheduler(object): """Safely schedules atoms using a runtime ``fetch_scheduler`` routine.""" def __init__(self, runtime): - self._fetch_scheduler = runtime.fetch_scheduler + self._runtime = weakref.proxy(runtime) def schedule(self, atoms): """Schedules the provided atoms for *future* completion. @@ -89,7 +89,7 @@ class Scheduler(object): """ futures = set() for atom in atoms: - scheduler = self._fetch_scheduler(atom) + scheduler = self._runtime.fetch_scheduler(atom) try: futures.add(scheduler.schedule(atom)) except Exception: