diff --git a/taskflow/patterns/ordered_workflow.py b/taskflow/patterns/ordered_workflow.py index ddc22a81..64c212b3 100644 --- a/taskflow/patterns/ordered_workflow.py +++ b/taskflow/patterns/ordered_workflow.py @@ -85,6 +85,38 @@ class Workflow(object): def _fetch_inputs(self, task): return {} + def _perform_reconcilation(self, task, excp): + # Attempt to reconcile the given exception that occured while applying + # the given task and either reconcile said task and its associated + # failure, so that the workflow can continue or abort and perform + # some type of undo of the tasks already completed. + try: + self._change_state(context, states.REVERTING) + except Exception: + LOG.exception("Dropping exception catched when" + " changing state to reverting while performing" + " reconcilation on a tasks exception.") + cause = exc.TaskException(task, self, excp) + with excutils.save_and_reraise_exception(): + try: + self._on_task_error(context, task) + except Exception: + LOG.exception("Dropping exception catched when" + " notifying about existing task" + " exception.") + # The default strategy will be to rollback all the contained + # tasks by calling there reverting methods, and then calling + # any parent workflows rollbacks (and so-on). + try: + self.rollback(context, cause) + finally: + try: + self._change_state(context, states.FAILURE) + except Exception: + LOG.exception("Dropping exception catched when" + " changing state to failure while performing" + " reconcilation on a tasks exception.") + def run(self, context, *args, **kwargs): if self.state != states.PENDING: raise exc.InvalidStateException("Unable to run workflow when " @@ -96,21 +128,6 @@ class Workflow(object): result_fetcher = None self._change_state(context, states.STARTED) - - # TODO(harlowja): we can likely add in custom reconcilation strategies - # here or around here... - def do_rollback_for(task, ex): - self._change_state(context, states.REVERTING) - with excutils.save_and_reraise_exception(): - try: - self._on_task_error(context, task) - except Exception: - LOG.exception("Dropping exception catched when" - " notifying about existing task" - " exception.") - self.rollback(context, exc.TaskException(task, self, ex)) - self._change_state(context, states.FAILURE) - task_order = self.order() last_task = 0 if result_fetcher: @@ -132,8 +149,8 @@ class Workflow(object): # result it returned and not a modified one. self.results.append((task, copy.deepcopy(result))) self._on_task_finish(context, task, result) - except Exception as ex: - do_rollback_for(task, ex) + except Exception as e: + self._perform_reconcilation(task, e) self._change_state(context, states.RUNNING) was_interrupted = False @@ -158,8 +175,8 @@ class Workflow(object): # and not a modified one. self.results.append((task, copy.deepcopy(result))) self._on_task_finish(context, task, result) - except Exception as ex: - do_rollback_for(task, ex) + except Exception as e: + self._perform_reconcilation(task, e) if not was_interrupted: # Only gets here if everything went successfully. @@ -201,6 +218,18 @@ class Workflow(object): f(context, states.SUCCESS, self, task, result=result) def rollback(self, context, cause): + # Performs basic task by task rollback by going through the reverse + # order that tasks have finished and asking said task to undo whatever + # it has done. If this workflow has any parent workflows then they will + # also be called to rollback any tasks said parents contain. + # + # Note(harlowja): if a workflow can more simply revert a whole set of + # tasks via a simpler command then it can override this method to + # accomplish that. + # + # For example, if each task was creating a file in a directory, then + # it's easier to just remove the directory than to ask each task to + # delete its file individually. for (i, (task, result)) in enumerate(reversed(self._reversions)): try: task.revert(context, result, cause)