Rewrite test_retry_async_action

This test was failing from time to time with new eventlet release.
The issue was a race condition.
By refactoring this test, we make sure everything is executed in the
correct order and that we wait properly for expected states before
finishing.

Change-Id: I0e5c8eaa9d88a1ba9a1bd21621aad15ae8e460ca
Signed-off-by: Arnaud M <arnaud.morin@gmail.com>
This commit is contained in:
Arnaud M 2025-01-03 14:04:27 +01:00
parent 84d6baa481
commit ff75c7fc10

View File

@ -1351,6 +1351,14 @@ class PoliciesTest(base.EngineTestCase):
mock.MagicMock(return_value='mock')
)
def test_retry_async_action(self):
"""Test that an action won't be done twice
When completing an action, we are supposed to receive the completion
only once. If we receive it twice or more, it should raise an error.
If we want to retry that action, then the engine will create a new
action execution.
"""
retry_wf = """---
version: '2.0'
repeated_retry:
@ -1358,61 +1366,74 @@ class PoliciesTest(base.EngineTestCase):
async_http:
retry:
delay: 0
count: 100
action: std.mistral_http url='https://google.com'
count: 2
action: std.mistral_http url='https://opendev.org'
"""
# Create and wait for the wf to be running
wf_service.create_workflows(retry_wf)
wf_ex = self.engine.start_workflow('repeated_retry')
self.await_workflow_running(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
# Wait for the task to be running
self.await_task_running(task_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.await_task_running(task_ex.id)
# We need to it back to be bound to a db transaction
task_ex = db_api.get_task_execution(task_ex.id)
first_action_ex = task_ex.executions[0]
self.await_action_state(first_action_ex.id, states.RUNNING)
# Now wait for this action to be running
# Note that the task won't actually run because it's mocked
self.await_action_state(first_action_ex.id, states.RUNNING)
# Send an action complete event with error so the task will retry
complete_action_params = (
first_action_ex.id,
ml_actions.Result(error="mock")
)
rpc.get_engine_client().on_action_complete(*complete_action_params)
for _ in range(2):
self.assertRaises(
exc.MistralException,
rpc.get_engine_client().on_action_complete,
*complete_action_params
)
# Send it a second time -- this should raise a ValueError
self.assertRaises(
exc.MistralException,
rpc.get_engine_client().on_action_complete,
*complete_action_params
)
# And the task should be restarted
self.await_task_running(task_ex.id)
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
second_action_ex = task_ex.executions[1]
# This time, send a success action complete
complete_action_params = (
second_action_ex.id,
ml_actions.Result(data="mock")
)
rpc.get_engine_client().on_action_complete(*complete_action_params)
# Wait for the wf to finish
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.await_workflow_success(wf_ex.id)
# Now check the results
with db_api.transaction():
task_ex = db_api.get_task_execution(task_ex.id)
action_exs = task_ex.executions
# We are supposed to have two actions
self.assertEqual(2, len(action_exs))
for action_ex in action_exs:
if action_ex.id == first_action_ex.id:
expected_state = states.ERROR
else:
expected_state = states.RUNNING
self.assertEqual(expected_state, action_ex.state)
# First is in error
self.assertEqual(states.ERROR, action_exs[0].state)
# Second is success
self.assertEqual(states.SUCCESS, action_exs[1].state)
def test_timeout_policy(self):
wb_service.create_workbook_v2(TIMEOUT_WB % 2)