From f4eac0ff4e083ad02833d50f18bab250fc1d1553 Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Tue, 26 Jan 2016 17:20:43 -0800 Subject: [PATCH] Attempt to cancel active futures when suspending is underway Instead of waiting on futures to complete that may still be in an executors backlog (aka, not even started yet) we can avoid even executing (and waiting on these to finish) by forcing them to be cancelled and handling this when analyzing the futures result (by doing nothing with the cancelled result of a future). This saves some time when doing suspension with executors that have a very small pool of workers that actually run tasks/atoms. Change-Id: Ie79466c44ac6af5ff8936f192ebce4428aa12d98 --- taskflow/engines/action_engine/builder.py | 49 ++++++++++++++++++----- 1 file changed, 40 insertions(+), 9 deletions(-) diff --git a/taskflow/engines/action_engine/builder.py b/taskflow/engines/action_engine/builder.py index 75ab3591..a5746af2 100644 --- a/taskflow/engines/action_engine/builder.py +++ b/taskflow/engines/action_engine/builder.py @@ -14,6 +14,7 @@ # License for the specific language governing permissions and limitations # under the License. +from concurrent import futures import weakref from automaton import machines @@ -43,6 +44,12 @@ SUCCESS = 'success' REVERTED = 'reverted' START = 'start' +# Internal enums used to denote how/if a atom was completed.""" +FAILED_COMPLETING = 'failed_completing' +WAS_CANCELLED = 'was_cancelled' +SUCCESSFULLY_COMPLETED = 'successfully_completed' + + # For these states we will gather how long (in seconds) the # state was in-progress (cumulatively if the state is entered multiple # times) @@ -60,6 +67,11 @@ class MachineMemory(object): self.failures = [] self.done = set() + def cancel_futures(self): + """Attempts to cancel any not done futures.""" + for fut in self.not_done: + fut.cancel() + class MachineBuilder(object): """State machine *builder* that powers the engine components. @@ -136,10 +148,6 @@ class MachineBuilder(object): key=lambda node: getattr(node, 'priority', 0), reverse=True)) - def is_runnable(): - # Checks if the storage says the flow is still runnable... - return self._storage.get_flow_state() == st.RUNNING - def iter_next_atoms(atom=None, apply_deciders=True): # Yields and filters and tweaks the next atoms to run... maybe_atoms_it = self._analyzer.iter_next_atoms(atom=atom) @@ -191,13 +199,21 @@ class MachineBuilder(object): # if the user of this engine has requested the engine/storage # that holds this information to stop or suspend); handles failures # that occur during this process safely... - if is_runnable() and memory.next_up: + current_flow_state = self._storage.get_flow_state() + if current_flow_state == st.RUNNING and memory.next_up: not_done, failures = do_schedule(memory.next_up) if not_done: memory.not_done.update(not_done) if failures: memory.failures.extend(failures) memory.next_up.intersection_update(not_done) + elif current_flow_state == st.SUSPENDING and memory.not_done: + # Try to force anything not cancelled to now be cancelled + # so that the executor that gets it does not continue to + # try to work on it (if the future execution is still in + # its backlog, if it's already being executed, this will + # do nothing). + memory.cancel_futures() return WAIT def complete_an_atom(fut): @@ -230,9 +246,19 @@ class MachineBuilder(object): statistics['discarded_failures'] += 1 if gather_statistics: statistics['completed'] += 1 + except futures.CancelledError: + # Well it got cancelled, skip doing anything + # and move on; at a further time it will be resumed + # and something should be done with it to get it + # going again. + return WAS_CANCELLED except Exception: memory.failures.append(failure.Failure()) - LOG.exception("Engine '%s' atom post-completion failed", atom) + LOG.exception("Engine '%s' atom post-completion" + " failed", atom) + return FAILED_COMPLETING + else: + return SUCCESSFULLY_COMPLETED def wait(old_state, new_state, event): # TODO(harlowja): maybe we should start doing 'yield from' this @@ -256,8 +282,9 @@ class MachineBuilder(object): # Force it to be completed so that we can ensure that # before we iterate over any successors or predecessors # that we know it has been completed and saved and so on... - complete_an_atom(fut) - if not memory.failures: + completion_status = complete_an_atom(fut) + if (not memory.failures + and completion_status != WAS_CANCELLED): atom = fut.atom try: more_work = set(iter_next_atoms(atom=atom)) @@ -267,10 +294,14 @@ class MachineBuilder(object): " next atom searching failed", atom) else: next_up.update(more_work) - if is_runnable() and next_up and not memory.failures: + current_flow_state = self._storage.get_flow_state() + if (current_flow_state == st.RUNNING + and next_up and not memory.failures): memory.next_up.update(next_up) return SCHEDULE elif memory.not_done: + if current_flow_state == st.SUSPENDING: + memory.cancel_futures() return WAIT else: return FINISH