Merge "Use helper function for post-atom-completion work"

This commit is contained in:
Jenkins 2016-02-01 08:47:42 +00:00 committed by Gerrit Code Review
commit 6a48709d76
2 changed files with 51 additions and 38 deletions

View File

@ -109,6 +109,8 @@ class MachineBuilder(object):
# Cache some local functions/methods...
do_complete = self._completer.complete
do_complete_failure = self._completer.complete_failure
get_atom_intention = self._storage.get_atom_intention
def do_schedule(next_nodes):
return self._scheduler.schedule(
@ -180,6 +182,36 @@ class MachineBuilder(object):
memory.next_up.intersection_update(not_done)
return WAIT
def complete_an_atom(fut):
# This completes a single atom saving its result in
# storage and preparing whatever predecessors or successors will
# now be ready to execute (or revert or retry...); it also
# handles failures that occur during this process safely...
atom = fut.atom
try:
outcome, result = fut.result()
do_complete(atom, outcome, result)
if isinstance(result, failure.Failure):
retain = do_complete_failure(atom, outcome, result)
if retain:
memory.failures.append(result)
else:
# NOTE(harlowja): avoid making any intention request
# to storage unless we are sure we are in DEBUG
# enabled logging (otherwise we will call this all
# the time even when DEBUG is not enabled, which
# would suck...)
if LOG.isEnabledFor(logging.DEBUG):
intention = get_atom_intention(atom.name)
LOG.debug("Discarding failure '%s' (in response"
" to outcome '%s') under completion"
" units request during completion of"
" atom '%s' (intention is to %s)",
result, outcome, atom, intention)
except Exception:
memory.failures.append(failure.Failure())
LOG.exception("Engine '%s' atom post-completion failed", atom)
def wait(old_state, new_state, event):
# TODO(harlowja): maybe we should start doing 'yield from' this
# call sometime in the future, or equivalent that will work in
@ -192,40 +224,19 @@ class MachineBuilder(object):
def analyze(old_state, new_state, event):
# This reaction function is responsible for analyzing all nodes
# that have finished executing and completing them and figuring
# that have finished executing/reverting and figuring
# out what nodes are now ready to be ran (and then triggering those
# nodes to be scheduled in the future); handles failures that
# occur during this process safely...
next_up = set()
while memory.done:
fut = memory.done.pop()
atom = fut.atom
try:
outcome, result = fut.result()
retain = do_complete(atom, outcome, result)
if isinstance(result, failure.Failure):
if retain:
memory.failures.append(result)
else:
# NOTE(harlowja): avoid making any
# intention request to storage unless we are
# sure we are in DEBUG enabled logging (otherwise
# we will call this all the time even when DEBUG
# is not enabled, which would suck...)
if LOG.isEnabledFor(logging.DEBUG):
intention = self._storage.get_atom_intention(
atom.name)
LOG.debug("Discarding failure '%s' (in"
" response to outcome '%s') under"
" completion units request during"
" completion of atom '%s' (intention"
" is to %s)", result, outcome,
atom, intention)
except Exception:
memory.failures.append(failure.Failure())
LOG.exception("Engine '%s' atom post-completion"
" failed", atom)
else:
# Force it to be completed so that we can ensure that
# before we iterate over any successors or predecessors
# that we know it has been completed and saved and so on...
complete_an_atom(fut)
if not memory.failures:
atom = fut.atom
try:
more_work = set(iter_next_atoms(atom=atom))
except Exception:

View File

@ -26,7 +26,6 @@ from taskflow.engines.action_engine import executor as ex
from taskflow import logging
from taskflow import retry as retry_atom
from taskflow import states as st
from taskflow.types import failure
LOG = logging.getLogger(__name__)
@ -144,24 +143,27 @@ class Completer(object):
" state %s", atom, atom_state)
return unfinished_atoms
def complete(self, node, outcome, result):
"""Performs post-execution completion of a node.
def complete_failure(self, node, outcome, failure):
"""Performs post-execution completion of a nodes failure.
Returns whether the result should be saved into an accumulator of
failures or whether this should not be done.
"""
if outcome == ex.EXECUTED:
self._process_atom_failure(node, failure)
# We resolved something, carry on...
return False
else:
# Reverting failed, always retain the failure...
return True
def complete(self, node, outcome, result):
"""Performs post-execution completion of a node result."""
handler = self._runtime.fetch_action(node)
if outcome == ex.EXECUTED:
handler.complete_execution(node, result)
else:
handler.complete_reversion(node, result)
if isinstance(result, failure.Failure):
if outcome == ex.EXECUTED:
self._process_atom_failure(node, result)
else:
# Reverting failed, always retain the failure...
return True
return False
def _determine_resolution(self, atom, failure):
"""Determines which resolution strategy to activate/apply."""