Attempt to cancel active futures when suspending is underway

Instead of waiting on futures to complete that may still be
in an executors backlog (aka, not even started yet) we can
avoid even executing (and waiting on these to finish) by
forcing them to be cancelled and handling this when analyzing
the futures result (by doing nothing with the cancelled result
of a future).

This saves some time when doing suspension with executors that
have a very small pool of workers that actually run tasks/atoms.

Change-Id: Ie79466c44ac6af5ff8936f192ebce4428aa12d98
This commit is contained in:
Joshua Harlow
2016-01-26 17:20:43 -08:00
committed by Joshua Harlow
parent 1bc8dd9bca
commit f4eac0ff4e

View File

@@ -14,6 +14,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from concurrent import futures
import weakref
from automaton import machines
@@ -43,6 +44,12 @@ SUCCESS = 'success'
REVERTED = 'reverted'
START = 'start'
# Internal enums used to denote how/if a atom was completed."""
FAILED_COMPLETING = 'failed_completing'
WAS_CANCELLED = 'was_cancelled'
SUCCESSFULLY_COMPLETED = 'successfully_completed'
# For these states we will gather how long (in seconds) the
# state was in-progress (cumulatively if the state is entered multiple
# times)
@@ -60,6 +67,11 @@ class MachineMemory(object):
self.failures = []
self.done = set()
def cancel_futures(self):
"""Attempts to cancel any not done futures."""
for fut in self.not_done:
fut.cancel()
class MachineBuilder(object):
"""State machine *builder* that powers the engine components.
@@ -136,10 +148,6 @@ class MachineBuilder(object):
key=lambda node: getattr(node, 'priority', 0),
reverse=True))
def is_runnable():
# Checks if the storage says the flow is still runnable...
return self._storage.get_flow_state() == st.RUNNING
def iter_next_atoms(atom=None, apply_deciders=True):
# Yields and filters and tweaks the next atoms to run...
maybe_atoms_it = self._analyzer.iter_next_atoms(atom=atom)
@@ -191,13 +199,21 @@ class MachineBuilder(object):
# if the user of this engine has requested the engine/storage
# that holds this information to stop or suspend); handles failures
# that occur during this process safely...
if is_runnable() and memory.next_up:
current_flow_state = self._storage.get_flow_state()
if current_flow_state == st.RUNNING and memory.next_up:
not_done, failures = do_schedule(memory.next_up)
if not_done:
memory.not_done.update(not_done)
if failures:
memory.failures.extend(failures)
memory.next_up.intersection_update(not_done)
elif current_flow_state == st.SUSPENDING and memory.not_done:
# Try to force anything not cancelled to now be cancelled
# so that the executor that gets it does not continue to
# try to work on it (if the future execution is still in
# its backlog, if it's already being executed, this will
# do nothing).
memory.cancel_futures()
return WAIT
def complete_an_atom(fut):
@@ -230,9 +246,19 @@ class MachineBuilder(object):
statistics['discarded_failures'] += 1
if gather_statistics:
statistics['completed'] += 1
except futures.CancelledError:
# Well it got cancelled, skip doing anything
# and move on; at a further time it will be resumed
# and something should be done with it to get it
# going again.
return WAS_CANCELLED
except Exception:
memory.failures.append(failure.Failure())
LOG.exception("Engine '%s' atom post-completion failed", atom)
LOG.exception("Engine '%s' atom post-completion"
" failed", atom)
return FAILED_COMPLETING
else:
return SUCCESSFULLY_COMPLETED
def wait(old_state, new_state, event):
# TODO(harlowja): maybe we should start doing 'yield from' this
@@ -256,8 +282,9 @@ class MachineBuilder(object):
# Force it to be completed so that we can ensure that
# before we iterate over any successors or predecessors
# that we know it has been completed and saved and so on...
complete_an_atom(fut)
if not memory.failures:
completion_status = complete_an_atom(fut)
if (not memory.failures
and completion_status != WAS_CANCELLED):
atom = fut.atom
try:
more_work = set(iter_next_atoms(atom=atom))
@@ -267,10 +294,14 @@ class MachineBuilder(object):
" next atom searching failed", atom)
else:
next_up.update(more_work)
if is_runnable() and next_up and not memory.failures:
current_flow_state = self._storage.get_flow_state()
if (current_flow_state == st.RUNNING
and next_up and not memory.failures):
memory.next_up.update(next_up)
return SCHEDULE
elif memory.not_done:
if current_flow_state == st.SUSPENDING:
memory.cancel_futures()
return WAIT
else:
return FINISH