Ensure we change the self and parents states correctly.
Change the state of the current flow in the rollback method itself so that if we have parent workflows they also get there states changed.
This commit is contained in:
@@ -107,12 +107,6 @@ class Flow(object):
|
||||
# some type of undo of the tasks already completed.
|
||||
cause = FlowFailure(task, self, excp)
|
||||
with excutils.save_and_reraise_exception():
|
||||
try:
|
||||
self._change_state(context, states.REVERTING)
|
||||
except Exception:
|
||||
LOG.exception("Dropping exception catched when"
|
||||
" changing state to reverting while performing"
|
||||
" reconcilation on a tasks exception.")
|
||||
try:
|
||||
self._on_task_error(context, task)
|
||||
except Exception:
|
||||
@@ -122,15 +116,7 @@ class Flow(object):
|
||||
# The default strategy will be to rollback all the contained
|
||||
# tasks by calling there reverting methods, and then calling
|
||||
# any parent flows rollbacks (and so-on).
|
||||
try:
|
||||
self.rollback(context, cause)
|
||||
finally:
|
||||
try:
|
||||
self._change_state(context, states.FAILURE)
|
||||
except Exception:
|
||||
LOG.exception("Dropping exception catched when"
|
||||
" changing state to failure while performing"
|
||||
" reconcilation on a tasks exception.")
|
||||
self.rollback(context, cause)
|
||||
|
||||
def run(self, context, *args, **kwargs):
|
||||
if self.state != states.PENDING:
|
||||
@@ -263,23 +249,41 @@ class Flow(object):
|
||||
# For example, if each task was creating a file in a directory, then
|
||||
# it's easier to just remove the directory than to ask each task to
|
||||
# delete its file individually.
|
||||
for (i, (task, result)) in enumerate(reversed(self._reversions)):
|
||||
try:
|
||||
self._change_state(context, states.REVERTING)
|
||||
except Exception:
|
||||
LOG.exception("Dropping exception catched when"
|
||||
" changing state to reverting while performing"
|
||||
" reconcilation on a tasks exception.")
|
||||
|
||||
def rollback_tasks(reversions, tolerant):
|
||||
for (i, (task, result)) in enumerate(reversions):
|
||||
try:
|
||||
task.revert(context, result, cause)
|
||||
except Exception:
|
||||
# Ex: WARN: Failed rolling back stage 1 (validate_request) of
|
||||
# chain validation due to Y exception.
|
||||
log_f = LOG.warn
|
||||
if not tolerant:
|
||||
log_f = LOG.exception
|
||||
msg = ("Failed rolling back stage %(index)s (%(task)s)"
|
||||
" of flow %(flow)s, due to inner exception.")
|
||||
log_f(msg % {'index': (i + 1), 'task': task, 'flow': self})
|
||||
if not tolerant:
|
||||
# NOTE(harlowja): LOG a msg AND re-raise the exception
|
||||
# if the flow does not tolerate exceptions happening
|
||||
# in the rollback method.
|
||||
raise
|
||||
|
||||
try:
|
||||
rollback_tasks(reversed(self._reversions), self.tolerant)
|
||||
finally:
|
||||
try:
|
||||
task.revert(context, result, cause)
|
||||
self._change_state(context, states.FAILURE)
|
||||
except Exception:
|
||||
# Ex: WARN: Failed rolling back stage 1 (validate_request) of
|
||||
# chain validation due to Y exception.
|
||||
log_f = LOG.warn
|
||||
if not self.tolerant:
|
||||
log_f = LOG.exception
|
||||
msg = ("Failed rolling back stage %(index)s (%(task)s)"
|
||||
" of flow %(flow)s, due to inner exception.")
|
||||
log_f(msg % {'index': (i + 1), 'task': task, 'flow': self})
|
||||
if not self.tolerant:
|
||||
# NOTE(harlowja): LOG a msg AND re-raise the exception if
|
||||
# the chain does not tolerate exceptions happening in the
|
||||
# rollback method.
|
||||
raise
|
||||
LOG.exception("Dropping exception catched when"
|
||||
" changing state to failure while performing"
|
||||
" reconcilation on a tasks exception.")
|
||||
if self.parents:
|
||||
# Rollback any parents flows if they exist...
|
||||
for p in self.parents:
|
||||
|
||||
Reference in New Issue
Block a user