From cbef0ee2866adb22925c7a8dbc0c587de21136ab Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 1 Oct 2013 10:15:15 -0700 Subject: [PATCH] Avoid setting object variables Instead of setting a object state variable, which makes a graph_action object now have state we can instead just have a new event boolean that will be set to true when a suspension is detected. This value will then be used to alter the status returned from the parallel graph action. Also fixes a lack of returned status when there are no items to actually run (aka the deps counter dictionary is empty). Change-Id: Ifa3ba7293613ba1e70e77d29504ea214f0b4c9e9 --- taskflow/engines/action_engine/graph_action.py | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index 505f0b01b..d19f876df 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -119,7 +119,7 @@ class ParallelGraphAction(SequentialGraphAction): has_failed = threading.Event() deps_lock = threading.RLock() deps_counter = self._get_nodes_dependencies_count() - self._future_flow_state = st.SUCCESS + was_suspended = threading.Event() def submit_followups(node): # Mutating the deps_counter isn't thread safe. @@ -146,7 +146,7 @@ class ParallelGraphAction(SequentialGraphAction): if engine.is_running: action.execute(engine) else: - self._future_flow_state = st.SUSPENDED + was_suspended.set() return except Exception: # Make sure others don't continue working (although they may @@ -167,7 +167,7 @@ class ParallelGraphAction(SequentialGraphAction): # Nothing to execute in the first place if not deps_counter: - return + return st.SUCCESS # Ensure that we obtain the lock just in-case the functions submitted # immediately themselves start submitting there own jobs (which could @@ -204,4 +204,7 @@ class ParallelGraphAction(SequentialGraphAction): elif len(failures) == 1: failures[0].reraise() - return self._future_flow_state + if was_suspended.is_set(): + return st.SUSPENDED + else: + return st.SUCCESS