diff --git a/taskflow/patterns/ordered_flow.py b/taskflow/patterns/ordered_flow.py index 04ee4393..ef4cf21c 100644 --- a/taskflow/patterns/ordered_flow.py +++ b/taskflow/patterns/ordered_flow.py @@ -107,12 +107,6 @@ class Flow(object): # some type of undo of the tasks already completed. cause = FlowFailure(task, self, excp) with excutils.save_and_reraise_exception(): - try: - self._change_state(context, states.REVERTING) - except Exception: - LOG.exception("Dropping exception catched when" - " changing state to reverting while performing" - " reconcilation on a tasks exception.") try: self._on_task_error(context, task) except Exception: @@ -122,15 +116,7 @@ class Flow(object): # The default strategy will be to rollback all the contained # tasks by calling there reverting methods, and then calling # any parent flows rollbacks (and so-on). - try: - self.rollback(context, cause) - finally: - try: - self._change_state(context, states.FAILURE) - except Exception: - LOG.exception("Dropping exception catched when" - " changing state to failure while performing" - " reconcilation on a tasks exception.") + self.rollback(context, cause) def run(self, context, *args, **kwargs): if self.state != states.PENDING: @@ -263,23 +249,41 @@ class Flow(object): # For example, if each task was creating a file in a directory, then # it's easier to just remove the directory than to ask each task to # delete its file individually. - for (i, (task, result)) in enumerate(reversed(self._reversions)): + try: + self._change_state(context, states.REVERTING) + except Exception: + LOG.exception("Dropping exception catched when" + " changing state to reverting while performing" + " reconcilation on a tasks exception.") + + def rollback_tasks(reversions, tolerant): + for (i, (task, result)) in enumerate(reversions): + try: + task.revert(context, result, cause) + except Exception: + # Ex: WARN: Failed rolling back stage 1 (validate_request) of + # chain validation due to Y exception. + log_f = LOG.warn + if not tolerant: + log_f = LOG.exception + msg = ("Failed rolling back stage %(index)s (%(task)s)" + " of flow %(flow)s, due to inner exception.") + log_f(msg % {'index': (i + 1), 'task': task, 'flow': self}) + if not tolerant: + # NOTE(harlowja): LOG a msg AND re-raise the exception + # if the flow does not tolerate exceptions happening + # in the rollback method. + raise + + try: + rollback_tasks(reversed(self._reversions), self.tolerant) + finally: try: - task.revert(context, result, cause) + self._change_state(context, states.FAILURE) except Exception: - # Ex: WARN: Failed rolling back stage 1 (validate_request) of - # chain validation due to Y exception. - log_f = LOG.warn - if not self.tolerant: - log_f = LOG.exception - msg = ("Failed rolling back stage %(index)s (%(task)s)" - " of flow %(flow)s, due to inner exception.") - log_f(msg % {'index': (i + 1), 'task': task, 'flow': self}) - if not self.tolerant: - # NOTE(harlowja): LOG a msg AND re-raise the exception if - # the chain does not tolerate exceptions happening in the - # rollback method. - raise + LOG.exception("Dropping exception catched when" + " changing state to failure while performing" + " reconcilation on a tasks exception.") if self.parents: # Rollback any parents flows if they exist... for p in self.parents: