diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py index 75ab3591..a5746af2 100644 --- a/taskflow/engines/action_engine/builder.py +++ b/taskflow/engines/action_engine/builder.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +from concurrent import futures import weakref from automaton import machines @@ -43,6 +44,12 @@ SUCCESS = 'success' REVERTED = 'reverted' START = 'start' +# Internal enums used to denote how/if a atom was completed.""" +FAILED_COMPLETING = 'failed_completing' +WAS_CANCELLED = 'was_cancelled' +SUCCESSFULLY_COMPLETED = 'successfully_completed' + + # For these states we will gather how long (in seconds) the # state was in-progress (cumulatively if the state is entered multiple # times) @@ -60,6 +67,11 @@ class MachineMemory(object): self.failures = [] self.done = set() + def cancel_futures(self): + """Attempts to cancel any not done futures.""" + for fut in self.not_done: + fut.cancel() + class MachineBuilder(object): """State machine *builder* that powers the engine components. @@ -136,10 +148,6 @@ class MachineBuilder(object): key=lambda node: getattr(node, 'priority', 0), reverse=True)) - def is_runnable(): - # Checks if the storage says the flow is still runnable... - return self._storage.get_flow_state() == st.RUNNING - def iter_next_atoms(atom=None, apply_deciders=True): # Yields and filters and tweaks the next atoms to run... maybe_atoms_it = self._analyzer.iter_next_atoms(atom=atom) @@ -191,13 +199,21 @@ class MachineBuilder(object): # if the user of this engine has requested the engine/storage # that holds this information to stop or suspend); handles failures # that occur during this process safely... - if is_runnable() and memory.next_up: + current_flow_state = self._storage.get_flow_state() + if current_flow_state == st.RUNNING and memory.next_up: not_done, failures = do_schedule(memory.next_up) if not_done: memory.not_done.update(not_done) if failures: memory.failures.extend(failures) memory.next_up.intersection_update(not_done) + elif current_flow_state == st.SUSPENDING and memory.not_done: + # Try to force anything not cancelled to now be cancelled + # so that the executor that gets it does not continue to + # try to work on it (if the future execution is still in + # its backlog, if it's already being executed, this will + # do nothing). + memory.cancel_futures() return WAIT def complete_an_atom(fut): @@ -230,9 +246,19 @@ class MachineBuilder(object): statistics['discarded_failures'] += 1 if gather_statistics: statistics['completed'] += 1 + except futures.CancelledError: + # Well it got cancelled, skip doing anything + # and move on; at a further time it will be resumed + # and something should be done with it to get it + # going again. + return WAS_CANCELLED except Exception: memory.failures.append(failure.Failure()) - LOG.exception("Engine '%s' atom post-completion failed", atom) + LOG.exception("Engine '%s' atom post-completion" + " failed", atom) + return FAILED_COMPLETING + else: + return SUCCESSFULLY_COMPLETED def wait(old_state, new_state, event): # TODO(harlowja): maybe we should start doing 'yield from' this @@ -256,8 +282,9 @@ class MachineBuilder(object): # Force it to be completed so that we can ensure that # before we iterate over any successors or predecessors # that we know it has been completed and saved and so on... - complete_an_atom(fut) - if not memory.failures: + completion_status = complete_an_atom(fut) + if (not memory.failures + and completion_status != WAS_CANCELLED): atom = fut.atom try: more_work = set(iter_next_atoms(atom=atom)) @@ -267,10 +294,14 @@ class MachineBuilder(object): " next atom searching failed", atom) else: next_up.update(more_work) - if is_runnable() and next_up and not memory.failures: + current_flow_state = self._storage.get_flow_state() + if (current_flow_state == st.RUNNING + and next_up and not memory.failures): memory.next_up.update(next_up) return SCHEDULE elif memory.not_done: + if current_flow_state == st.SUSPENDING: + memory.cancel_futures() return WAIT else: return FINISH