diff --git a/taskflow/patterns/linear_flow.py b/taskflow/patterns/linear_flow.py index 7ae17c54..14f69f0b 100644 --- a/taskflow/patterns/linear_flow.py +++ b/taskflow/patterns/linear_flow.py @@ -19,6 +19,7 @@ import collections import functools import logging +import sys import threading from taskflow.openstack.common import excutils @@ -209,9 +210,9 @@ class Flow(flow.Flow): 'flow': self, 'runner': runner, }) - except Exception as e: - runner.result = e - cause = utils.FlowFailure(runner, self, e) + except Exception: + runner.result = None + runner.exc_info = sys.exc_info() with excutils.save_and_reraise_exception(): # Notify any listeners that the task has errored. self.task_notifier.notify(states.FAILURE, details={ @@ -219,7 +220,7 @@ class Flow(flow.Flow): 'flow': self, 'runner': runner, }) - self.rollback(context, cause) + self.rollback(context, utils.FlowFailure(runner, self)) run_check_functor = functools.partial(abort_if, ok_states=[states.STARTED, diff --git a/taskflow/patterns/threaded_flow.py b/taskflow/patterns/threaded_flow.py index 6869b63f..38267388 100644 --- a/taskflow/patterns/threaded_flow.py +++ b/taskflow/patterns/threaded_flow.py @@ -342,8 +342,7 @@ class Flow(flow.Flow): return causes = [] for r in failures: - causes.append(utils.FlowFailure(r, self, - r.exc, r.exc_info)) + causes.append(utils.FlowFailure(r, self)) try: self.rollback(context, causes) except exc.InvalidStateException: @@ -470,9 +469,6 @@ class ThreadRunner(utils.Runner): self._predecessors = [] self._successors = [] self._siblings = [] - # Ensure we capture any exceptions that may have been triggered. - self.exc = None - self.exc_info = (None, None, None) # This callback will be called before the underlying task is actually # returned and it should either return a tuple of (has_result, result) self._result_cb = None @@ -538,10 +534,10 @@ class ThreadRunner(utils.Runner): " state: %s" % (self.state)) def do_reset(): + super(ThreadRunner, self).reset() self._latch.count = len(self._predecessors) - self.exc = None - self.exc_info = (None, None, None) - self.result = None + + def change_state(): self._change_state(None, states.PENDING) # We need to acquire both locks here so that we can not be running @@ -552,6 +548,7 @@ class ThreadRunner(utils.Runner): with self._cancel_lock: check() do_reset() + change_state() @property def runs_before(self): @@ -583,10 +580,10 @@ class ThreadRunner(utils.Runner): if not has_result: super(ThreadRunner, self).__call__(*args, **kwargs) self._change_state(context, states.SUCCESS) - except Exception as e: - self._change_state(context, states.FAILURE) - self.exc = e + except Exception: + self.result = None self.exc_info = sys.exc_info() + self._change_state(context, states.FAILURE) def signal(): if not self._successors: diff --git a/taskflow/utils.py b/taskflow/utils.py index 2a5db454..f82bdd47 100644 --- a/taskflow/utils.py +++ b/taskflow/utils.py @@ -22,12 +22,12 @@ import contextlib import copy import inspect import logging -import sys import threading -import threading2 import time import types +import threading2 + from distutils import version from taskflow.openstack.common import uuidutils @@ -250,14 +250,17 @@ class FlowFailure(object): and can be used to interrogate what caused the failure. """ - def __init__(self, runner, flow, exc, exc_info=None): + def __init__(self, runner, flow): self.runner = runner self.flow = flow - self.exc = exc - if not exc_info: - self.exc_info = sys.exc_info() - else: - self.exc_info = exc_info + + @property + def exc_info(self): + return self.runner.exc_info + + @property + def exc(self): + return self.runner.exc_info[1] class RollbackTask(object): @@ -301,6 +304,7 @@ class Runner(object): self._id = uuidutils.generate_uuid() else: self._id = str(uuid) + self.exc_info = (None, None, None) @property def uuid(self): @@ -332,6 +336,7 @@ class Runner(object): def reset(self): self.result = None + self.exc_info = (None, None, None) def __str__(self): lines = ["Runner: %s" % (self.name)]