Fix REVERT_ALL with Retries in unordered Flows

Fix a bug when using retries with unordered flows: a REVERT_ALL
triggered by one of the subflows was overridden by another subflow
running in parallel, leading to an incomplete revert of the flow.

Closes-Bug: #2043808
Change-Id: Icf6f99e00621fb9c5c7b79a7f2cbb14df80eb6ac
This commit is contained in:
Gregory Thiemonge 2023-11-11 05:50:17 -05:00
parent cc3bd412b6
commit 0cb423b642
4 changed files with 96 additions and 0 deletions

View File

@ -0,0 +1,6 @@
---
fixes:
- |
Fixed a bug when using retries with unordered flows: a REVERT_ALL triggered
by one of the subflows was overridden by another subflow running in parallel,
leading to an incomplete revert of the flow.

View File

@ -291,6 +291,18 @@ class Runtime(object):
"""Resets all the provided atoms to the given state and intention."""
tweaked = []
for atom in atoms:
cur_intention = self.storage.get_atom_intention(atom.name)
# Don't trigger a RETRY if the atom needs to be REVERTED.
# This is a workaround for a bug when REVERT_ALL is applied to
# unordered flows
# (https://bugs.launchpad.net/taskflow/+bug/2043808)
# A subflow may trigger a REVERT_ALL, all the atoms of all the
# related subflows are marked as REVERT but a task of a related
# flow may still be running in another thread. If this task
# triggers a RETRY, it overrides the previously set REVERT status,
# breaking the revert path of the flow.
if cur_intention == st.REVERT and intention == st.RETRY:
continue
if state or intention:
tweaked.append((atom, state, intention))
if state:

View File

@ -15,8 +15,10 @@
# under the License.
import testtools
import time
import taskflow.engines
from taskflow.engines.action_engine import executor
from taskflow import exceptions as exc
from taskflow.patterns import graph_flow as gf
from taskflow.patterns import linear_flow as lf
@ -497,6 +499,56 @@ class RetryTest(utils.EngineTestBase):
self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
self.assertRaisesRegex(RuntimeError, '^Woot', engine.run)
def test_restart_reverted_unordered_flows_with_retries(self):
    """Check that a REVERT_ALL in an unordered flow reverts the whole flow.

    Two linear subflows are placed in an unordered flow that itself
    follows a no-op task ('task1'). One subflow ends up returning
    REVERT_ALL from its retry controller while the other one keeps
    answering RETRY. Running the engine must raise a wrapped 'Woot!'
    failure and 'task1' must be reported as reverted.
    """
    start_time = time.time()

    # Branch 1 of the unordered flow.
    branch_one = lf.Flow('subflow1')

    # Branch 1, step 1: a task that only succeeds once 3 seconds have
    # elapsed since start_time, guarded by the RetryFiveTimes controller.
    delayed_success = lf.Flow('subflow1.subsubflow1',
                              retry=utils.RetryFiveTimes())
    delayed_success_task = utils.SuccessAfter3Sec(
        'subflow1.fail1', inject={'start_time': start_time})
    delayed_success.add(delayed_success_task)
    branch_one.add(delayed_success)

    # Branch 1, step 2: a task that always fails, so its RetryFiveTimes
    # controller eventually answers REVERT_ALL.
    reverting = lf.Flow('subflow1.subsubflow2',
                        retry=utils.RetryFiveTimes())
    reverting_task = utils.FailingTask('subflow1.fail2')
    reverting.add(reverting_task)
    branch_one.add(reverting)

    # Branch 2 of the unordered flow: a task that always fails under a
    # controller that always asks for another retry.
    branch_two = lf.Flow('subflow2')
    endless_retry = lf.Flow('subflow2.subsubflow1',
                            retry=utils.AlwaysRetry())
    endless_retry_task = utils.FailingTask('subflow2.fail1')
    endless_retry.add(endless_retry_task)
    branch_two.add(endless_retry)

    parallel_part = uf.Flow('unordered_flow')
    parallel_part.add(branch_one, branch_two)

    # Top-level flow: a simple task followed by the unordered flow.
    main_flow = lf.Flow('test')
    main_flow.add(utils.NoopTask('task1'))
    main_flow.add(parallel_part)

    engine = self._make_engine(main_flow)

    # This test fails when using green threads, skipping it for now.
    task_executor = engine._task_executor
    if isinstance(task_executor,
                  executor.ParallelGreenThreadTaskExecutor):
        self.skipTest("Skipping this test when using green threads.")

    with utils.CaptureListener(engine) as capturer:
        with self.assertRaisesRegex(exc.WrappedFailure,
                                    '.*RuntimeError: Woot!'):
            engine.run()

    # task1 should have been reverted
    self.assertIn('task1.t REVERTED(None)', capturer.values)
def test_run_just_retry(self):
flow = utils.OneReturnRetry(provides='x')
engine = self._make_engine(flow)

View File

@ -217,6 +217,32 @@ class FailingTask(ProgressingTask):
raise RuntimeError('Woot!')
class SimpleTask(task.Task):
    """Task whose execution only sleeps for ``time_sleep`` seconds."""

    def execute(self, time_sleep=0, **kwargs):
        # Extra keyword arguments are accepted and ignored.
        delay = time_sleep
        time.sleep(delay)
class SuccessAfter3Sec(task.Task):
    """Task that raises until 3 seconds have elapsed since ``start_time``."""

    def execute(self, start_time, **kwargs):
        # ``start_time`` is compared against ``time.time()``.
        waited_long_enough = (time.time() - start_time) >= 3
        if not waited_long_enough:
            raise RuntimeError('Woot!')
        return None
class RetryFiveTimes(retry.Times):
    """Retry controller that gives up once ``history`` has five entries.

    While ``history`` holds fewer than five entries, the controller waits
    one second and answers RETRY; otherwise it answers REVERT_ALL.
    """

    def on_failure(self, history, *args, **kwargs):
        if len(history) >= 5:
            return retry.REVERT_ALL
        # Pause before asking for another attempt.
        time.sleep(1)
        return retry.RETRY
class AlwaysRetry(retry.Times):
    """Retry controller that answers RETRY on every failure."""

    def on_failure(self, history, *args, **kwargs):
        # The failure history and any extra arguments are ignored.
        decision = retry.RETRY
        return decision
class OptionalTask(task.Task):
def execute(self, a, b=5):
result = a * b