diff --git a/taskflow/engines/action_engine/graph_action.py b/taskflow/engines/action_engine/graph_action.py index 505f0b01b..d19f876df 100644 --- a/taskflow/engines/action_engine/graph_action.py +++ b/taskflow/engines/action_engine/graph_action.py @@ -119,7 +119,7 @@ class ParallelGraphAction(SequentialGraphAction): has_failed = threading.Event() deps_lock = threading.RLock() deps_counter = self._get_nodes_dependencies_count() - self._future_flow_state = st.SUCCESS + was_suspended = threading.Event() def submit_followups(node): # Mutating the deps_counter isn't thread safe. @@ -146,7 +146,7 @@ class ParallelGraphAction(SequentialGraphAction): if engine.is_running: action.execute(engine) else: - self._future_flow_state = st.SUSPENDED + was_suspended.set() return except Exception: # Make sure others don't continue working (although they may @@ -167,7 +167,7 @@ class ParallelGraphAction(SequentialGraphAction): # Nothing to execute in the first place if not deps_counter: - return + return st.SUCCESS # Ensure that we obtain the lock just in-case the functions submitted # immediately themselves start submitting there own jobs (which could @@ -204,4 +204,7 @@ class ParallelGraphAction(SequentialGraphAction): elif len(failures) == 1: failures[0].reraise() - return self._future_flow_state + if was_suspended.is_set(): + return st.SUSPENDED + else: + return st.SUCCESS