From aa8a45b3d3004e84e7e1e5791581bee6e5622dc5 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Thu, 9 Jul 2015 21:23:15 -0700 Subject: [PATCH] Give the GC more of a break with regard to cycles We can avoid creating reference cycles relatively easily which will make the GC have to do less to garbage collect these objects so let's just give it a break to start. This is *safe* to do since the runtime components have the same lifetime as the runtime itself and they will never outlive the runtime objects existence (a runtime objects lifetime is directly the same as the engine objects lifetime). Change-Id: I7f1ee91e04f29dd27da1e57a462573e068aee45c --- taskflow/engines/action_engine/analyzer.py | 17 ++++++++--------- taskflow/engines/action_engine/builder.py | 3 ++- taskflow/engines/action_engine/scheduler.py | 4 ++-- 3 files changed, 12 insertions(+), 12 deletions(-) diff --git a/taskflow/engines/action_engine/analyzer.py b/taskflow/engines/action_engine/analyzer.py index f5c68722..78d4c29f 100644 --- a/taskflow/engines/action_engine/analyzer.py +++ b/taskflow/engines/action_engine/analyzer.py @@ -15,8 +15,8 @@ # under the License. import abc -import functools import itertools +import weakref from networkx.algorithms import traversal import six @@ -101,12 +101,9 @@ class Analyzer(object): """ def __init__(self, runtime): + self._runtime = weakref.proxy(runtime) self._storage = runtime.storage self._execution_graph = runtime.compilation.execution_graph - self._check_atom_transition = runtime.check_atom_transition - self._fetch_edge_deciders = runtime.fetch_edge_deciders - self._fetch_retries = functools.partial( - runtime.fetch_atoms_by_kind, 'retry') def get_next_nodes(self, node=None): """Get next nodes to run (originating from node or all nodes).""" @@ -174,7 +171,8 @@ class Analyzer(object): state = self.get_state(atom) intention = self._storage.get_atom_intention(atom.name) - transition = self._check_atom_transition(atom, state, st.RUNNING) + transition = self._runtime.check_atom_transition(atom, state, + st.RUNNING) if not transition or intention != st.EXECUTE: return (False, None) @@ -190,7 +188,7 @@ class Analyzer(object): if not ok_to_run: return (False, None) else: - edge_deciders = self._fetch_edge_deciders(atom) + edge_deciders = self._runtime.fetch_edge_deciders(atom) return (True, IgnoreDecider(atom, edge_deciders)) def _get_maybe_ready_for_revert(self, atom): @@ -198,7 +196,8 @@ class Analyzer(object): state = self.get_state(atom) intention = self._storage.get_atom_intention(atom.name) - transition = self._check_atom_transition(atom, state, st.REVERTING) + transition = self._runtime.check_atom_transition(atom, state, + st.REVERTING) if not transition or intention not in (st.REVERT, st.RETRY): return (False, None) @@ -226,7 +225,7 @@ class Analyzer(object): If no state is provided it will yield back all retry atoms. """ - for atom in self._fetch_retries(): + for atom in self._runtime.fetch_atoms_by_kind('retry'): if not state or self.get_state(atom) == state: yield atom diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py index f46d4a1f..9ab26d4a 100644 --- a/taskflow/engines/action_engine/builder.py +++ b/taskflow/engines/action_engine/builder.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +import weakref from automaton import machines @@ -91,7 +92,7 @@ class MachineBuilder(object): """ def __init__(self, runtime, waiter): - self._runtime = runtime + self._runtime = weakref.proxy(runtime) self._analyzer = runtime.analyzer self._completer = runtime.completer self._scheduler = runtime.scheduler diff --git a/taskflow/engines/action_engine/scheduler.py b/taskflow/engines/action_engine/scheduler.py index 404781e6..5fdc1995 100644 --- a/taskflow/engines/action_engine/scheduler.py +++ b/taskflow/engines/action_engine/scheduler.py @@ -76,7 +76,7 @@ class Scheduler(object): """Safely schedules atoms using a runtime ``fetch_scheduler`` routine.""" def __init__(self, runtime): - self._fetch_scheduler = runtime.fetch_scheduler + self._runtime = weakref.proxy(runtime) def schedule(self, atoms): """Schedules the provided atoms for *future* completion. @@ -89,7 +89,7 @@ class Scheduler(object): """ futures = set() for atom in atoms: - scheduler = self._fetch_scheduler(atom) + scheduler = self._runtime.fetch_scheduler(atom) try: futures.add(scheduler.schedule(atom)) except Exception: