Adding a revert_all option to retry controllers

When a retry controller is added to a nested flow, with the exception of
AlwaysRetryAll, existing retry controllers do not cause a revert of
predecessor tasks in the surrounding flow.  This is due to the fact that
these retry controllers always return RETRY.

The revert_all option, when set True, will result in the retry
controller returning REVERT_ALL rather than REVERT, which will cause
predecessor tasks in the surrounding flow to also be REVERTed.

Change-Id: I2d867b2d05e8559121ec48fd7249f15078450532
This commit is contained in:
Min Pae
2015-06-10 02:04:05 -07:00
parent ea51f8b109
commit 3d4bbb92d8
2 changed files with 277 additions and 5 deletions

View File

@@ -241,15 +241,20 @@ class Times(Retry):
"""Retries subflow given number of times. Returns attempt number."""
def __init__(self, attempts=1, name=None, provides=None, requires=None,
auto_extract=True, rebind=None):
auto_extract=True, rebind=None, revert_all=False):
super(Times, self).__init__(name, provides, requires,
auto_extract, rebind)
self._attempts = attempts
if revert_all:
self._revert_action = REVERT_ALL
else:
self._revert_action = REVERT
def on_failure(self, history, *args, **kwargs):
if len(history) < self._attempts:
return RETRY
return REVERT
return self._revert_action
def execute(self, history, *args, **kwargs):
return len(history) + 1
@@ -258,6 +263,16 @@ class Times(Retry):
class ForEachBase(Retry):
"""Base class for retries that iterate over a given collection."""
def __init__(self, name=None, provides=None, requires=None,
auto_extract=True, rebind=None, revert_all=False):
super(ForEachBase, self).__init__(name, provides, requires,
auto_extract, rebind)
if revert_all:
self._revert_action = REVERT_ALL
else:
self._revert_action = REVERT
def _get_next_value(self, values, history):
# Fetches the next resolution result to try, removes overlapping
# entries with what has already been tried and then returns the first
@@ -272,7 +287,7 @@ class ForEachBase(Retry):
try:
self._get_next_value(values, history)
except exc.NotFound:
return REVERT
return self._revert_action
else:
return RETRY
@@ -285,9 +300,9 @@ class ForEach(ForEachBase):
"""
def __init__(self, values, name=None, provides=None, requires=None,
auto_extract=True, rebind=None):
auto_extract=True, rebind=None, revert_all=False):
super(ForEach, self).__init__(name, provides, requires,
auto_extract, rebind)
auto_extract, rebind, revert_all)
self._values = values
def on_failure(self, history, *args, **kwargs):
@@ -307,6 +322,12 @@ class ParameterizedForEach(ForEachBase):
each try.
"""
def __init__(self, name=None, provides=None, requires=None,
auto_extract=True, rebind=None, revert_all=False):
super(ParameterizedForEach, self).__init__(name, provides, requires,
auto_extract, rebind,
revert_all)
def on_failure(self, values, history, *args, **kwargs):
return self._on_failure(values, history)

View File

@@ -336,6 +336,66 @@ class RetryTest(utils.EngineTestBase):
'flow-1.f SUCCESS']
self.assertEqual(expected, capturer.values)
def test_nested_flow_with_retry_revert(self):
retry1 = retry.Times(0, 'r1', provides='x2')
flow = lf.Flow('flow-1').add(
utils.ProgressingTask("task1"),
lf.Flow('flow-2', retry1).add(
utils.ConditionalTask("task2", inject={'x': 1}))
)
engine = self._make_engine(flow)
engine.storage.inject({'y': 2})
with utils.CaptureListener(engine) as capturer:
try:
engine.run()
except Exception:
pass
self.assertEqual(engine.storage.fetch_all(), {'y': 2})
expected = ['flow-1.f RUNNING',
'task1.t RUNNING',
'task1.t SUCCESS(5)',
'r1.r RUNNING',
'r1.r SUCCESS(1)',
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTING',
'task2.t REVERTED',
'r1.r REVERTING',
'r1.r REVERTED',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
def test_nested_flow_with_retry_revert_all(self):
retry1 = retry.Times(0, 'r1', provides='x2', revert_all=True)
flow = lf.Flow('flow-1').add(
utils.ProgressingTask("task1"),
lf.Flow('flow-2', retry1).add(
utils.ConditionalTask("task2", inject={'x': 1}))
)
engine = self._make_engine(flow)
engine.storage.inject({'y': 2})
with utils.CaptureListener(engine) as capturer:
try:
engine.run()
except Exception:
pass
self.assertEqual(engine.storage.fetch_all(), {'y': 2})
expected = ['flow-1.f RUNNING',
'task1.t RUNNING',
'task1.t SUCCESS(5)',
'r1.r RUNNING',
'r1.r SUCCESS(1)',
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot!)',
'task2.t REVERTING',
'task2.t REVERTED',
'r1.r REVERTING',
'r1.r REVERTED',
'task1.t REVERTING',
'task1.t REVERTED',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
def test_revert_all_retry(self):
flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x')).add(
utils.ProgressingTask("task1"),
@@ -594,6 +654,108 @@ class RetryTest(utils.EngineTestBase):
'flow-1.f REVERTED']
self.assertItemsEqual(capturer.values, expected)
def test_nested_for_each_revert(self):
collection = [3, 2, 3, 5]
retry1 = retry.ForEach(collection, 'r1', provides='x')
flow = lf.Flow('flow-1').add(
utils.ProgressingTask("task1"),
lf.Flow('flow-2', retry1).add(
utils.FailingTaskWithOneArg('task2')
)
)
engine = self._make_engine(flow)
with utils.CaptureListener(engine) as capturer:
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'task1.t RUNNING',
'task1.t SUCCESS(5)',
'r1.r RUNNING',
'r1.r SUCCESS(3)',
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 3)',
'task2.t REVERTING',
'task2.t REVERTED',
'r1.r RETRYING',
'task2.t PENDING',
'r1.r RUNNING',
'r1.r SUCCESS(2)',
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 2)',
'task2.t REVERTING',
'task2.t REVERTED',
'r1.r RETRYING',
'task2.t PENDING',
'r1.r RUNNING',
'r1.r SUCCESS(3)',
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 3)',
'task2.t REVERTING',
'task2.t REVERTED',
'r1.r RETRYING',
'task2.t PENDING',
'r1.r RUNNING',
'r1.r SUCCESS(5)',
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 5)',
'task2.t REVERTING',
'task2.t REVERTED',
'r1.r REVERTING',
'r1.r REVERTED',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
def test_nested_for_each_revert_all(self):
collection = [3, 2, 3, 5]
retry1 = retry.ForEach(collection, 'r1', provides='x', revert_all=True)
flow = lf.Flow('flow-1').add(
utils.ProgressingTask("task1"),
lf.Flow('flow-2', retry1).add(
utils.FailingTaskWithOneArg('task2')
)
)
engine = self._make_engine(flow)
with utils.CaptureListener(engine) as capturer:
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'task1.t RUNNING',
'task1.t SUCCESS(5)',
'r1.r RUNNING',
'r1.r SUCCESS(3)',
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 3)',
'task2.t REVERTING',
'task2.t REVERTED',
'r1.r RETRYING',
'task2.t PENDING',
'r1.r RUNNING',
'r1.r SUCCESS(2)',
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 2)',
'task2.t REVERTING',
'task2.t REVERTED',
'r1.r RETRYING',
'task2.t PENDING',
'r1.r RUNNING',
'r1.r SUCCESS(3)',
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 3)',
'task2.t REVERTING',
'task2.t REVERTED',
'r1.r RETRYING',
'task2.t PENDING',
'r1.r RUNNING',
'r1.r SUCCESS(5)',
'task2.t RUNNING',
'task2.t FAILURE(Failure: RuntimeError: Woot with 5)',
'task2.t REVERTING',
'task2.t REVERTED',
'r1.r REVERTING',
'r1.r REVERTED',
'task1.t REVERTING',
'task1.t REVERTED',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
def test_for_each_empty_collection(self):
values = []
retry1 = retry.ForEach(values, 'r1', provides='x')
@@ -674,6 +836,95 @@ class RetryTest(utils.EngineTestBase):
'flow-1.f REVERTED']
self.assertItemsEqual(capturer.values, expected)
def test_nested_parameterized_for_each_revert(self):
values = [3, 2, 5]
retry1 = retry.ParameterizedForEach('r1', provides='x')
flow = lf.Flow('flow-1').add(
utils.ProgressingTask('task-1'),
lf.Flow('flow-2', retry1).add(
utils.FailingTaskWithOneArg('task-2')
)
)
engine = self._make_engine(flow)
engine.storage.inject({'values': values, 'y': 1})
with utils.CaptureListener(engine) as capturer:
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'task-1.t RUNNING',
'task-1.t SUCCESS(5)',
'r1.r RUNNING',
'r1.r SUCCESS(3)',
'task-2.t RUNNING',
'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)',
'task-2.t REVERTING',
'task-2.t REVERTED',
'r1.r RETRYING',
'task-2.t PENDING',
'r1.r RUNNING',
'r1.r SUCCESS(2)',
'task-2.t RUNNING',
'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)',
'task-2.t REVERTING',
'task-2.t REVERTED',
'r1.r RETRYING',
'task-2.t PENDING',
'r1.r RUNNING',
'r1.r SUCCESS(5)',
'task-2.t RUNNING',
'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)',
'task-2.t REVERTING',
'task-2.t REVERTED',
'r1.r REVERTING',
'r1.r REVERTED',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
def test_nested_parameterized_for_each_revert_all(self):
values = [3, 2, 5]
retry1 = retry.ParameterizedForEach('r1', provides='x',
revert_all=True)
flow = lf.Flow('flow-1').add(
utils.ProgressingTask('task-1'),
lf.Flow('flow-2', retry1).add(
utils.FailingTaskWithOneArg('task-2')
)
)
engine = self._make_engine(flow)
engine.storage.inject({'values': values, 'y': 1})
with utils.CaptureListener(engine) as capturer:
self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run)
expected = ['flow-1.f RUNNING',
'task-1.t RUNNING',
'task-1.t SUCCESS(5)',
'r1.r RUNNING',
'r1.r SUCCESS(3)',
'task-2.t RUNNING',
'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)',
'task-2.t REVERTING',
'task-2.t REVERTED',
'r1.r RETRYING',
'task-2.t PENDING',
'r1.r RUNNING',
'r1.r SUCCESS(2)',
'task-2.t RUNNING',
'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)',
'task-2.t REVERTING',
'task-2.t REVERTED',
'r1.r RETRYING',
'task-2.t PENDING',
'r1.r RUNNING',
'r1.r SUCCESS(5)',
'task-2.t RUNNING',
'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)',
'task-2.t REVERTING',
'task-2.t REVERTED',
'r1.r REVERTING',
'r1.r REVERTED',
'task-1.t REVERTING',
'task-1.t REVERTED',
'flow-1.f REVERTED']
self.assertEqual(expected, capturer.values)
def test_parameterized_for_each_empty_collection(self):
values = []
retry1 = retry.ParameterizedForEach('r1', provides='x')