Add the ability to alter the task failure reconcilation.
This commit is contained in:
@@ -85,6 +85,38 @@ class Workflow(object):
|
|||||||
def _fetch_inputs(self, task):
|
def _fetch_inputs(self, task):
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
|
def _perform_reconcilation(self, task, excp):
|
||||||
|
# Attempt to reconcile the given exception that occured while applying
|
||||||
|
# the given task and either reconcile said task and its associated
|
||||||
|
# failure, so that the workflow can continue or abort and perform
|
||||||
|
# some type of undo of the tasks already completed.
|
||||||
|
try:
|
||||||
|
self._change_state(context, states.REVERTING)
|
||||||
|
except Exception:
|
||||||
|
LOG.exception("Dropping exception catched when"
|
||||||
|
" changing state to reverting while performing"
|
||||||
|
" reconcilation on a tasks exception.")
|
||||||
|
cause = exc.TaskException(task, self, excp)
|
||||||
|
with excutils.save_and_reraise_exception():
|
||||||
|
try:
|
||||||
|
self._on_task_error(context, task)
|
||||||
|
except Exception:
|
||||||
|
LOG.exception("Dropping exception catched when"
|
||||||
|
" notifying about existing task"
|
||||||
|
" exception.")
|
||||||
|
# The default strategy will be to rollback all the contained
|
||||||
|
# tasks by calling there reverting methods, and then calling
|
||||||
|
# any parent workflows rollbacks (and so-on).
|
||||||
|
try:
|
||||||
|
self.rollback(context, cause)
|
||||||
|
finally:
|
||||||
|
try:
|
||||||
|
self._change_state(context, states.FAILURE)
|
||||||
|
except Exception:
|
||||||
|
LOG.exception("Dropping exception catched when"
|
||||||
|
" changing state to failure while performing"
|
||||||
|
" reconcilation on a tasks exception.")
|
||||||
|
|
||||||
def run(self, context, *args, **kwargs):
|
def run(self, context, *args, **kwargs):
|
||||||
if self.state != states.PENDING:
|
if self.state != states.PENDING:
|
||||||
raise exc.InvalidStateException("Unable to run workflow when "
|
raise exc.InvalidStateException("Unable to run workflow when "
|
||||||
@@ -96,21 +128,6 @@ class Workflow(object):
|
|||||||
result_fetcher = None
|
result_fetcher = None
|
||||||
|
|
||||||
self._change_state(context, states.STARTED)
|
self._change_state(context, states.STARTED)
|
||||||
|
|
||||||
# TODO(harlowja): we can likely add in custom reconcilation strategies
|
|
||||||
# here or around here...
|
|
||||||
def do_rollback_for(task, ex):
|
|
||||||
self._change_state(context, states.REVERTING)
|
|
||||||
with excutils.save_and_reraise_exception():
|
|
||||||
try:
|
|
||||||
self._on_task_error(context, task)
|
|
||||||
except Exception:
|
|
||||||
LOG.exception("Dropping exception catched when"
|
|
||||||
" notifying about existing task"
|
|
||||||
" exception.")
|
|
||||||
self.rollback(context, exc.TaskException(task, self, ex))
|
|
||||||
self._change_state(context, states.FAILURE)
|
|
||||||
|
|
||||||
task_order = self.order()
|
task_order = self.order()
|
||||||
last_task = 0
|
last_task = 0
|
||||||
if result_fetcher:
|
if result_fetcher:
|
||||||
@@ -132,8 +149,8 @@ class Workflow(object):
|
|||||||
# result it returned and not a modified one.
|
# result it returned and not a modified one.
|
||||||
self.results.append((task, copy.deepcopy(result)))
|
self.results.append((task, copy.deepcopy(result)))
|
||||||
self._on_task_finish(context, task, result)
|
self._on_task_finish(context, task, result)
|
||||||
except Exception as ex:
|
except Exception as e:
|
||||||
do_rollback_for(task, ex)
|
self._perform_reconcilation(task, e)
|
||||||
|
|
||||||
self._change_state(context, states.RUNNING)
|
self._change_state(context, states.RUNNING)
|
||||||
was_interrupted = False
|
was_interrupted = False
|
||||||
@@ -158,8 +175,8 @@ class Workflow(object):
|
|||||||
# and not a modified one.
|
# and not a modified one.
|
||||||
self.results.append((task, copy.deepcopy(result)))
|
self.results.append((task, copy.deepcopy(result)))
|
||||||
self._on_task_finish(context, task, result)
|
self._on_task_finish(context, task, result)
|
||||||
except Exception as ex:
|
except Exception as e:
|
||||||
do_rollback_for(task, ex)
|
self._perform_reconcilation(task, e)
|
||||||
|
|
||||||
if not was_interrupted:
|
if not was_interrupted:
|
||||||
# Only gets here if everything went successfully.
|
# Only gets here if everything went successfully.
|
||||||
@@ -201,6 +218,18 @@ class Workflow(object):
|
|||||||
f(context, states.SUCCESS, self, task, result=result)
|
f(context, states.SUCCESS, self, task, result=result)
|
||||||
|
|
||||||
def rollback(self, context, cause):
|
def rollback(self, context, cause):
|
||||||
|
# Performs basic task by task rollback by going through the reverse
|
||||||
|
# order that tasks have finished and asking said task to undo whatever
|
||||||
|
# it has done. If this workflow has any parent workflows then they will
|
||||||
|
# also be called to rollback any tasks said parents contain.
|
||||||
|
#
|
||||||
|
# Note(harlowja): if a workflow can more simply revert a whole set of
|
||||||
|
# tasks via a simpler command then it can override this method to
|
||||||
|
# accomplish that.
|
||||||
|
#
|
||||||
|
# For example, if each task was creating a file in a directory, then
|
||||||
|
# it's easier to just remove the directory than to ask each task to
|
||||||
|
# delete its file individually.
|
||||||
for (i, (task, result)) in enumerate(reversed(self._reversions)):
|
for (i, (task, result)) in enumerate(reversed(self._reversions)):
|
||||||
try:
|
try:
|
||||||
task.revert(context, result, cause)
|
task.revert(context, result, cause)
|
||||||
|
|||||||
Reference in New Issue
Block a user