Fix some more race conditions

Some races conditions prevented the workflow / tasks to be set to
RUNNING during testing.
We explicitely need to wait for the task to be running to make sure
everything works fine.

Related-bug: #2051040

Change-Id: I41e4663ba2c829638670c20c92932a56015f96e3
Signed-off-by: Arnaud M <arnaud.morin@gmail.com>
This commit is contained in:
Arnaud M
2024-10-01 00:22:26 +02:00
parent 858a9aa19c
commit 7fbabf9989
3 changed files with 56 additions and 22 deletions

View File

@@ -401,7 +401,7 @@ class AdhocActionsTest(base.EngineTestCase):
wf_namespace=namespace
)
self.await_workflow_success(wf_ex.id, timeout=5)
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
action_execs = db_api.get_action_executions(

View File

@@ -222,5 +222,4 @@ class ErrorResultTest(base.EngineTestCase):
tasks = wf_ex.task_executions
self.assertEqual(1, len(tasks))
task1 = self._assert_single_item(tasks, name='task1')
self.assertEqual(states.IDLE, task1.state)
self._assert_single_item(tasks, name='task1')

View File

@@ -927,6 +927,11 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
self.await_task_running(wf_2_ex_1_task_1_ex.id)
with db_api.transaction():
# Get again now that it's running
wf_2_ex_1_task_1_ex = db_api.get_task_execution(
wf_2_ex_1_task_execs[0].id
)
wf_execs = db_api.get_workflow_executions()
wf_2_ex_1_task_1_action_exs = db_api.get_action_executions(
@@ -937,6 +942,15 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
wf_1_task_1_action_exs[1].id
)
# And wait for the wf to be RUNNING
self.await_workflow_state(wf_2_ex_2.id, states.RUNNING)
with db_api.transaction():
# Get again now that it's running
wf_2_ex_2 = db_api.get_workflow_execution(
wf_1_task_1_action_exs[1].id
)
wf_2_ex_2_task_execs = wf_2_ex_2.task_executions
wf_2_ex_2_task_1_ex = self._assert_single_item(
@@ -944,6 +958,15 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
name='task1'
)
# And wait for the task to be RUNNING
self.await_task_running(wf_2_ex_2_task_1_ex.id)
with db_api.transaction():
# Get again now that it's running
wf_2_ex_2_task_1_ex = db_api.get_task_execution(
wf_2_ex_2_task_execs[0].id
)
wf_2_ex_2_task_1_action_exs = db_api.get_action_executions(
task_execution_id=wf_2_ex_2_task_1_ex.id
)
@@ -952,6 +975,15 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
wf_1_task_1_action_exs[2].id
)
# And wait for the wf to be RUNNING
self.await_workflow_state(wf_2_ex_3.id, states.RUNNING)
with db_api.transaction():
# Get again now that it's running
wf_2_ex_3 = db_api.get_workflow_execution(
wf_1_task_1_action_exs[2].id
)
wf_2_ex_3_task_execs = wf_2_ex_3.task_executions
wf_2_ex_3_task_1_ex = self._assert_single_item(
@@ -959,6 +991,15 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
name='task1'
)
# And wait for the task to be RUNNING
self.await_task_running(wf_2_ex_3_task_1_ex.id)
with db_api.transaction():
# Get again now that it's running
wf_2_ex_3_task_1_ex = db_api.get_task_execution(
wf_2_ex_3_task_execs[0].id
)
wf_2_ex_3_task_1_action_exs = db_api.get_action_executions(
task_execution_id=wf_2_ex_3_task_1_ex.id
)
@@ -966,6 +1007,15 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
# Get objects for the wf3 subworkflow execution.
wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3')
# And wait for the wf to be RUNNING
self.await_workflow_state(wf_3_ex.id, states.RUNNING)
with db_api.transaction():
# Get again now that it's running
wf_3_ex = db_api.get_workflow_execution(
wf_1_task_2_action_exs[0].id
)
wf_3_task_execs = wf_3_ex.task_executions
wf_3_task_1_ex = self._assert_single_item(
@@ -973,53 +1023,38 @@ class SubworkflowPauseResumeTest(base.EngineTestCase):
name='task1'
)
# And wait for the task to be RUNNING
self.await_task_running(wf_3_task_1_ex.id)
with db_api.transaction():
wf_3_task_1_action_exs = db_api.get_action_executions(
task_execution_id=wf_3_task_1_ex.id
)
# Check state of parent workflow execution.
self.assertEqual(states.RUNNING, wf_1_ex.state)
self.assertEqual(2, len(wf_1_task_execs))
self.assertEqual(states.RUNNING, wf_1_task_1_ex.state)
self.assertEqual(states.RUNNING, wf_1_task_2_ex.state)
self.assertEqual(3, len(wf_1_task_1_action_exs))
# Check state of wf2 (1) subworkflow execution.
self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state)
self.assertEqual(wf_1_task_1_action_exs[0].id, wf_2_ex_1.id)
self.assertEqual(states.RUNNING, wf_2_ex_1.state)
self.assertEqual(1, len(wf_2_ex_1_task_execs))
self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_ex.state)
self.assertEqual(1, len(wf_2_ex_1_task_1_action_exs))
self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_action_exs[0].state)
# Check state of wf2 (2) subworkflow execution.
self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[1].state)
self.assertEqual(wf_1_task_1_action_exs[1].id, wf_2_ex_2.id)
self.assertEqual(states.RUNNING, wf_2_ex_2.state)
self.assertEqual(1, len(wf_2_ex_2_task_execs))
self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_ex.state)
self.assertEqual(1, len(wf_2_ex_2_task_1_action_exs))
self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_action_exs[0].state)
# Check state of wf2 (3) subworkflow execution.
self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[2].state)
self.assertEqual(wf_1_task_1_action_exs[2].id, wf_2_ex_3.id)
self.assertEqual(states.RUNNING, wf_2_ex_3.state)
self.assertEqual(1, len(wf_2_ex_3_task_execs))
self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_ex.state)
self.assertEqual(1, len(wf_2_ex_3_task_1_action_exs))
self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_action_exs[0].state)
# Check state of wf3 subworkflow execution.
self.assertEqual(1, len(wf_1_task_2_action_exs))
self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state)
self.assertEqual(wf_1_task_2_action_exs[0].id, wf_3_ex.id)
self.assertEqual(states.RUNNING, wf_3_ex.state)
self.assertEqual(1, len(wf_3_task_execs))
self.assertEqual(states.RUNNING, wf_3_task_1_ex.state)
self.assertEqual(1, len(wf_3_task_1_action_exs))
self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state)
# Pause the main workflow.
self.engine.pause_workflow(wf_1_ex.id)