From d4f74720cb57c06e7c4b291f1c25fb79997ff98e Mon Sep 17 00:00:00 2001 From: Joshua Harlow Date: Wed, 22 May 2013 14:25:11 -0700 Subject: [PATCH] Ensure we change the self and parents states correctly. Change the state of the current flow in the rollback method itself so that if we have parent workflows they also get there states changed. --- taskflow/patterns/ordered_flow.py | 64 ++++++++++++++++--------------- 1 file changed, 34 insertions(+), 30 deletions(-) diff --git a/taskflow/patterns/ordered_flow.py b/taskflow/patterns/ordered_flow.py index 04ee4393..ef4cf21c 100644 --- a/taskflow/patterns/ordered_flow.py +++ b/taskflow/patterns/ordered_flow.py @@ -107,12 +107,6 @@ class Flow(object): # some type of undo of the tasks already completed. cause = FlowFailure(task, self, excp) with excutils.save_and_reraise_exception(): - try: - self._change_state(context, states.REVERTING) - except Exception: - LOG.exception("Dropping exception catched when" - " changing state to reverting while performing" - " reconcilation on a tasks exception.") try: self._on_task_error(context, task) except Exception: @@ -122,15 +116,7 @@ class Flow(object): # The default strategy will be to rollback all the contained # tasks by calling there reverting methods, and then calling # any parent flows rollbacks (and so-on). - try: - self.rollback(context, cause) - finally: - try: - self._change_state(context, states.FAILURE) - except Exception: - LOG.exception("Dropping exception catched when" - " changing state to failure while performing" - " reconcilation on a tasks exception.") + self.rollback(context, cause) def run(self, context, *args, **kwargs): if self.state != states.PENDING: @@ -263,23 +249,41 @@ class Flow(object): # For example, if each task was creating a file in a directory, then # it's easier to just remove the directory than to ask each task to # delete its file individually. - for (i, (task, result)) in enumerate(reversed(self._reversions)): + try: + self._change_state(context, states.REVERTING) + except Exception: + LOG.exception("Dropping exception catched when" + " changing state to reverting while performing" + " reconcilation on a tasks exception.") + + def rollback_tasks(reversions, tolerant): + for (i, (task, result)) in enumerate(reversions): + try: + task.revert(context, result, cause) + except Exception: + # Ex: WARN: Failed rolling back stage 1 (validate_request) of + # chain validation due to Y exception. + log_f = LOG.warn + if not tolerant: + log_f = LOG.exception + msg = ("Failed rolling back stage %(index)s (%(task)s)" + " of flow %(flow)s, due to inner exception.") + log_f(msg % {'index': (i + 1), 'task': task, 'flow': self}) + if not tolerant: + # NOTE(harlowja): LOG a msg AND re-raise the exception + # if the flow does not tolerate exceptions happening + # in the rollback method. + raise + + try: + rollback_tasks(reversed(self._reversions), self.tolerant) + finally: try: - task.revert(context, result, cause) + self._change_state(context, states.FAILURE) except Exception: - # Ex: WARN: Failed rolling back stage 1 (validate_request) of - # chain validation due to Y exception. - log_f = LOG.warn - if not self.tolerant: - log_f = LOG.exception - msg = ("Failed rolling back stage %(index)s (%(task)s)" - " of flow %(flow)s, due to inner exception.") - log_f(msg % {'index': (i + 1), 'task': task, 'flow': self}) - if not self.tolerant: - # NOTE(harlowja): LOG a msg AND re-raise the exception if - # the chain does not tolerate exceptions happening in the - # rollback method. - raise + LOG.exception("Dropping exception catched when" + " changing state to failure while performing" + " reconcilation on a tasks exception.") if self.parents: # Rollback any parents flows if they exist... for p in self.parents: