Raise the last exception thrown into DependencyTaskGroup
Normally when exceptions are caught in DependencyTaskGroup, we cancel with a grace period and then re-raise the first exception once all subtasks are stopped. This means that e.g. when a resource fails, the exception the stack gets back is from the first resource failure and not e.g. any resulting exceptions from cancelled in-progress resources. However, for exceptions thrown in to the task from the outside (as opposed to raised by one of the subtasks), we generally want the *last* one to be re-raised. Specifically, if a cancel-without-rollback exception is thrown followed by a cancel-with-rollback, we want the with-rollback exception to be re-raised so that the stack knows to roll back. Change-Id: I4843f9a55a4897a7af86a9aa69df073913fd03e6
This commit is contained in:
parent
991d41f255
commit
796cea6cf4
@ -430,6 +430,8 @@ class DependencyTaskGroup(object):
|
||||
def __call__(self):
|
||||
"""Return a co-routine which runs the task group."""
|
||||
raised_exceptions = []
|
||||
thrown_exceptions = []
|
||||
|
||||
while any(six.itervalues(self._runners)):
|
||||
try:
|
||||
for k, r in self._ready():
|
||||
@ -438,7 +440,11 @@ class DependencyTaskGroup(object):
|
||||
del self._graph[k]
|
||||
|
||||
if self._graph:
|
||||
yield
|
||||
try:
|
||||
yield
|
||||
except Exception:
|
||||
thrown_exceptions.append(sys.exc_info())
|
||||
raise
|
||||
|
||||
for k, r in self._running():
|
||||
if r.step():
|
||||
@ -450,6 +456,7 @@ class DependencyTaskGroup(object):
|
||||
else:
|
||||
self.cancel_all(grace_period=self.error_wait_time)
|
||||
raised_exceptions.append(exc_info)
|
||||
del exc_info
|
||||
except: # noqa
|
||||
with excutils.save_and_reraise_exception():
|
||||
self.cancel_all()
|
||||
@ -459,10 +466,13 @@ class DependencyTaskGroup(object):
|
||||
if self.aggregate_exceptions:
|
||||
raise ExceptionGroup(v for t, v, tb in raised_exceptions)
|
||||
else:
|
||||
exc_type, exc_val, traceback = raised_exceptions[0]
|
||||
raise_(exc_type, exc_val, traceback)
|
||||
if thrown_exceptions:
|
||||
raise_(*thrown_exceptions[-1])
|
||||
else:
|
||||
raise_(*raised_exceptions[0])
|
||||
finally:
|
||||
del raised_exceptions
|
||||
del thrown_exceptions
|
||||
|
||||
def cancel_all(self, grace_period=None):
|
||||
if callable(grace_period):
|
||||
|
@ -371,6 +371,26 @@ class DependencyTaskGroupTest(common.HeatTestCase):
|
||||
exc = self.assertRaises(type(e1), run_tasks_with_exceptions)
|
||||
self.assertEqual(e1, exc)
|
||||
|
||||
def test_thrown_exception_order(self):
|
||||
e1 = Exception('e1')
|
||||
e2 = Exception('e2')
|
||||
|
||||
tasks = (('A', None), ('B', None), ('C', 'A'))
|
||||
deps = dependencies.Dependencies(tasks)
|
||||
|
||||
tg = scheduler.DependencyTaskGroup(
|
||||
deps, DummyTask(), reverse=self.reverse_order,
|
||||
error_wait_time=1,
|
||||
aggregate_exceptions=self.aggregate_exceptions)
|
||||
task = tg()
|
||||
|
||||
next(task)
|
||||
task.throw(e1)
|
||||
next(task)
|
||||
tg.error_wait_time = None
|
||||
exc = self.assertRaises(type(e2), task.throw, e2)
|
||||
self.assertIs(e2, exc)
|
||||
|
||||
|
||||
class TaskTest(common.HeatTestCase):
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user