From 0cb423b64209f9412ebe20fb562b916fa09c5cbc Mon Sep 17 00:00:00 2001 From: Gregory Thiemonge Date: Sat, 11 Nov 2023 05:50:17 -0500 Subject: [PATCH] Fix REVERT_ALL with Retries in unordered Flows Fix a bug when using retries with unordered flows, a REVERT_ALL triggered by one of the subflow was overriden by an other subflow running in parallel, leading to an incomplete revert of the flow. Closes-Bug: #2043808 Change-Id: Icf6f99e00621fb9c5c7b79a7f2cbb14df80eb6ac --- ...ix-revert-all-revert-a0310cd7beaa7409.yaml | 6 +++ taskflow/engines/action_engine/runtime.py | 12 +++++ taskflow/tests/unit/test_retries.py | 52 +++++++++++++++++++ taskflow/tests/utils.py | 26 ++++++++++ 4 files changed, 96 insertions(+) create mode 100644 releasenotes/notes/fix-revert-all-revert-a0310cd7beaa7409.yaml diff --git a/releasenotes/notes/fix-revert-all-revert-a0310cd7beaa7409.yaml b/releasenotes/notes/fix-revert-all-revert-a0310cd7beaa7409.yaml new file mode 100644 index 000000000..d7e8e126e --- /dev/null +++ b/releasenotes/notes/fix-revert-all-revert-a0310cd7beaa7409.yaml @@ -0,0 +1,6 @@ +--- +fixes: + - | + Fixed a bug when using retries with unordered flows, a REVERT_ALL triggered + by one of the subflow was overriden by an other subflow running in parallel, + leading to an incomplete revert of the flow. diff --git a/taskflow/engines/action_engine/runtime.py b/taskflow/engines/action_engine/runtime.py index 233c3b260..09577501b 100644 --- a/taskflow/engines/action_engine/runtime.py +++ b/taskflow/engines/action_engine/runtime.py @@ -291,6 +291,18 @@ class Runtime(object): """Resets all the provided atoms to the given state and intention.""" tweaked = [] for atom in atoms: + cur_intention = self.storage.get_atom_intention(atom.name) + # Don't trigger a RETRY if the atom needs to be REVERTED. + # This is a workaround for a bug when REVERT_ALL is applied to + # unordered flows + # (https://bugs.launchpad.net/taskflow/+bug/2043808) + # A subflow may trigger a REVERT_ALL, all the atoms of all the + # related subflows are marked as REVERT but a task of a related + # flow may still be running in another thread. If this task + # triggers a RETRY, it overrides the previously set REVERT status, + # breaking the revert path of the flow. + if cur_intention == st.REVERT and intention == st.RETRY: + continue if state or intention: tweaked.append((atom, state, intention)) if state: diff --git a/taskflow/tests/unit/test_retries.py b/taskflow/tests/unit/test_retries.py index 1fb0303e3..f057ab417 100644 --- a/taskflow/tests/unit/test_retries.py +++ b/taskflow/tests/unit/test_retries.py @@ -15,8 +15,10 @@ # under the License. import testtools +import time import taskflow.engines +from taskflow.engines.action_engine import executor from taskflow import exceptions as exc from taskflow.patterns import graph_flow as gf from taskflow.patterns import linear_flow as lf @@ -497,6 +499,56 @@ class RetryTest(utils.EngineTestBase): self.assertRaisesRegex(RuntimeError, '^Woot', engine.run) self.assertRaisesRegex(RuntimeError, '^Woot', engine.run) + def test_restart_reverted_unordered_flows_with_retries(self): + now = time.time() + + # First flow of an unordered flow: + subflow1 = lf.Flow('subflow1') + + # * a task that completes in 3 sec with a few retries + subsubflow1 = lf.Flow('subflow1.subsubflow1', + retry=utils.RetryFiveTimes()) + subsubflow1.add(utils.SuccessAfter3Sec('subflow1.fail1', + inject={'start_time': now})) + subflow1.add(subsubflow1) + + # * a task that fails and triggers a revert after 5 retries + subsubflow2 = lf.Flow('subflow1.subsubflow2', + retry=utils.RetryFiveTimes()) + subsubflow2.add(utils.FailingTask('subflow1.fail2')) + subflow1.add(subsubflow2) + + # Second flow of the unordered flow: + subflow2 = lf.Flow('subflow2') + + # * a task that always fails and retries + subsubflow1 = lf.Flow('subflow2.subsubflow1', + retry=utils.AlwaysRetry()) + subsubflow1.add(utils.FailingTask('subflow2.fail1')) + subflow2.add(subsubflow1) + + unordered_flow = uf.Flow('unordered_flow') + unordered_flow.add(subflow1, subflow2) + + # Main flow, contains a simple task and an unordered flow + flow = lf.Flow('test') + flow.add(utils.NoopTask('task1')) + flow.add(unordered_flow) + + engine = self._make_engine(flow) + + # This test fails when using Green threads, skipping it for now + if isinstance(engine._task_executor, + executor.ParallelGreenThreadTaskExecutor): + self.skipTest("Skipping this test when using green threads.") + + with utils.CaptureListener(engine) as capturer: + self.assertRaisesRegex(exc.WrappedFailure, + '.*RuntimeError: Woot!', + engine.run) + # task1 should have been reverted + self.assertIn('task1.t REVERTED(None)', capturer.values) + def test_run_just_retry(self): flow = utils.OneReturnRetry(provides='x') engine = self._make_engine(flow) diff --git a/taskflow/tests/utils.py b/taskflow/tests/utils.py index 2b027599e..402c31f0f 100644 --- a/taskflow/tests/utils.py +++ b/taskflow/tests/utils.py @@ -217,6 +217,32 @@ class FailingTask(ProgressingTask): raise RuntimeError('Woot!') +class SimpleTask(task.Task): + def execute(self, time_sleep=0, **kwargs): + time.sleep(time_sleep) + + +class SuccessAfter3Sec(task.Task): + def execute(self, start_time, **kwargs): + now = time.time() + if now - start_time >= 3: + return None + raise RuntimeError('Woot!') + + +class RetryFiveTimes(retry.Times): + def on_failure(self, history, *args, **kwargs): + if len(history) < 5: + time.sleep(1) + return retry.RETRY + return retry.REVERT_ALL + + +class AlwaysRetry(retry.Times): + def on_failure(self, history, *args, **kwargs): + return retry.RETRY + + class OptionalTask(task.Task): def execute(self, a, b=5): result = a * b