diff --git a/taskflow/retry.py b/taskflow/retry.py index b7135a92..3015c79d 100644 --- a/taskflow/retry.py +++ b/taskflow/retry.py @@ -241,15 +241,20 @@ class Times(Retry): """Retries subflow given number of times. Returns attempt number.""" def __init__(self, attempts=1, name=None, provides=None, requires=None, - auto_extract=True, rebind=None): + auto_extract=True, rebind=None, revert_all=False): super(Times, self).__init__(name, provides, requires, auto_extract, rebind) self._attempts = attempts + if revert_all: + self._revert_action = REVERT_ALL + else: + self._revert_action = REVERT + def on_failure(self, history, *args, **kwargs): if len(history) < self._attempts: return RETRY - return REVERT + return self._revert_action def execute(self, history, *args, **kwargs): return len(history) + 1 @@ -258,6 +263,16 @@ class Times(Retry): class ForEachBase(Retry): """Base class for retries that iterate over a given collection.""" + def __init__(self, name=None, provides=None, requires=None, + auto_extract=True, rebind=None, revert_all=False): + super(ForEachBase, self).__init__(name, provides, requires, + auto_extract, rebind) + + if revert_all: + self._revert_action = REVERT_ALL + else: + self._revert_action = REVERT + def _get_next_value(self, values, history): # Fetches the next resolution result to try, removes overlapping # entries with what has already been tried and then returns the first @@ -272,7 +287,7 @@ class ForEachBase(Retry): try: self._get_next_value(values, history) except exc.NotFound: - return REVERT + return self._revert_action else: return RETRY @@ -285,9 +300,9 @@ class ForEach(ForEachBase): """ def __init__(self, values, name=None, provides=None, requires=None, - auto_extract=True, rebind=None): + auto_extract=True, rebind=None, revert_all=False): super(ForEach, self).__init__(name, provides, requires, - auto_extract, rebind) + auto_extract, rebind, revert_all) self._values = values def on_failure(self, history, *args, **kwargs): @@ -307,6 +322,12 @@ class ParameterizedForEach(ForEachBase): each try. """ + def __init__(self, name=None, provides=None, requires=None, + auto_extract=True, rebind=None, revert_all=False): + super(ParameterizedForEach, self).__init__(name, provides, requires, + auto_extract, rebind, + revert_all) + def on_failure(self, values, history, *args, **kwargs): return self._on_failure(values, history) diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index edcc6d8b..ddb256b0 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -336,6 +336,66 @@ class RetryTest(utils.EngineTestBase): 'flow-1.f SUCCESS'] self.assertEqual(expected, capturer.values) + def test_nested_flow_with_retry_revert(self): + retry1 = retry.Times(0, 'r1', provides='x2') + flow = lf.Flow('flow-1').add( + utils.ProgressingTask("task1"), + lf.Flow('flow-2', retry1).add( + utils.ConditionalTask("task2", inject={'x': 1})) + ) + engine = self._make_engine(flow) + engine.storage.inject({'y': 2}) + with utils.CaptureListener(engine) as capturer: + try: + engine.run() + except Exception: + pass + self.assertEqual(engine.storage.fetch_all(), {'y': 2}) + expected = ['flow-1.f RUNNING', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + + def test_nested_flow_with_retry_revert_all(self): + retry1 = retry.Times(0, 'r1', provides='x2', revert_all=True) + flow = lf.Flow('flow-1').add( + utils.ProgressingTask("task1"), + lf.Flow('flow-2', retry1).add( + utils.ConditionalTask("task2", inject={'x': 1})) + ) + engine = self._make_engine(flow) + engine.storage.inject({'y': 2}) + with utils.CaptureListener(engine) as capturer: + try: + engine.run() + except Exception: + pass + self.assertEqual(engine.storage.fetch_all(), {'y': 2}) + expected = ['flow-1.f RUNNING', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(1)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot!)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'task1.t REVERTING', + 'task1.t REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + def test_revert_all_retry(self): flow = lf.Flow('flow-1', retry.Times(3, 'r1', provides='x')).add( utils.ProgressingTask("task1"), @@ -594,6 +654,108 @@ class RetryTest(utils.EngineTestBase): 'flow-1.f REVERTED'] self.assertItemsEqual(capturer.values, expected) + def test_nested_for_each_revert(self): + collection = [3, 2, 3, 5] + retry1 = retry.ForEach(collection, 'r1', provides='x') + flow = lf.Flow('flow-1').add( + utils.ProgressingTask("task1"), + lf.Flow('flow-2', retry1).add( + utils.FailingTaskWithOneArg('task2') + ) + ) + engine = self._make_engine(flow) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r RETRYING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 2)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r RETRYING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r RETRYING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(5)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 5)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + + def test_nested_for_each_revert_all(self): + collection = [3, 2, 3, 5] + retry1 = retry.ForEach(collection, 'r1', provides='x', revert_all=True) + flow = lf.Flow('flow-1').add( + utils.ProgressingTask("task1"), + lf.Flow('flow-2', retry1).add( + utils.FailingTaskWithOneArg('task2') + ) + ) + engine = self._make_engine(flow) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'task1.t RUNNING', + 'task1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r RETRYING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 2)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r RETRYING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 3)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r RETRYING', + 'task2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(5)', + 'task2.t RUNNING', + 'task2.t FAILURE(Failure: RuntimeError: Woot with 5)', + 'task2.t REVERTING', + 'task2.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'task1.t REVERTING', + 'task1.t REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + def test_for_each_empty_collection(self): values = [] retry1 = retry.ForEach(values, 'r1', provides='x') @@ -674,6 +836,95 @@ class RetryTest(utils.EngineTestBase): 'flow-1.f REVERTED'] self.assertItemsEqual(capturer.values, expected) + def test_nested_parameterized_for_each_revert(self): + values = [3, 2, 5] + retry1 = retry.ParameterizedForEach('r1', provides='x') + flow = lf.Flow('flow-1').add( + utils.ProgressingTask('task-1'), + lf.Flow('flow-2', retry1).add( + utils.FailingTaskWithOneArg('task-2') + ) + ) + engine = self._make_engine(flow) + engine.storage.inject({'values': values, 'y': 1}) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'task-1.t RUNNING', + 'task-1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)', + 'task-2.t REVERTING', + 'task-2.t REVERTED', + 'r1.r RETRYING', + 'task-2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)', + 'task-2.t REVERTING', + 'task-2.t REVERTED', + 'r1.r RETRYING', + 'task-2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(5)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)', + 'task-2.t REVERTING', + 'task-2.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + + def test_nested_parameterized_for_each_revert_all(self): + values = [3, 2, 5] + retry1 = retry.ParameterizedForEach('r1', provides='x', + revert_all=True) + flow = lf.Flow('flow-1').add( + utils.ProgressingTask('task-1'), + lf.Flow('flow-2', retry1).add( + utils.FailingTaskWithOneArg('task-2') + ) + ) + engine = self._make_engine(flow) + engine.storage.inject({'values': values, 'y': 1}) + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegexp(RuntimeError, '^Woot', engine.run) + expected = ['flow-1.f RUNNING', + 'task-1.t RUNNING', + 'task-1.t SUCCESS(5)', + 'r1.r RUNNING', + 'r1.r SUCCESS(3)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot with 3)', + 'task-2.t REVERTING', + 'task-2.t REVERTED', + 'r1.r RETRYING', + 'task-2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(2)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot with 2)', + 'task-2.t REVERTING', + 'task-2.t REVERTED', + 'r1.r RETRYING', + 'task-2.t PENDING', + 'r1.r RUNNING', + 'r1.r SUCCESS(5)', + 'task-2.t RUNNING', + 'task-2.t FAILURE(Failure: RuntimeError: Woot with 5)', + 'task-2.t REVERTING', + 'task-2.t REVERTED', + 'r1.r REVERTING', + 'r1.r REVERTED', + 'task-1.t REVERTING', + 'task-1.t REVERTED', + 'flow-1.f REVERTED'] + self.assertEqual(expected, capturer.values) + def test_parameterized_for_each_empty_collection(self): values = [] retry1 = retry.ParameterizedForEach('r1', provides='x')