From ff75c7fc105f923a7acfcb52a94a08721b968a61 Mon Sep 17 00:00:00 2001 From: Arnaud M Date: Fri, 3 Jan 2025 14:04:27 +0100 Subject: [PATCH] Rewrite test_retry_async_action This test was failing from time to time with new eventlet release. The issue was a race condition. By refactoring this test, we make sure everything is executed in the correct order and that we wait properly for expected states before finishing. Change-Id: I0e5c8eaa9d88a1ba9a1bd21621aad15ae8e460ca Signed-off-by: Arnaud M --- mistral/tests/unit/engine/test_policies.py | 75 ++++++++++++++-------- 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/mistral/tests/unit/engine/test_policies.py b/mistral/tests/unit/engine/test_policies.py index 865df5044..0e314a999 100644 --- a/mistral/tests/unit/engine/test_policies.py +++ b/mistral/tests/unit/engine/test_policies.py @@ -1351,6 +1351,14 @@ class PoliciesTest(base.EngineTestCase): mock.MagicMock(return_value='mock') ) def test_retry_async_action(self): + """Test that an action won't be done twice + + When completing an action, we are supposed to receive the completion + only once. If we receive it twice or more, it should raise an error. + + If we want to retry that action, then the engine will create a new + action execution. + """ retry_wf = """--- version: '2.0' repeated_retry: @@ -1358,61 +1366,74 @@ class PoliciesTest(base.EngineTestCase): async_http: retry: delay: 0 - count: 100 - action: std.mistral_http url='https://google.com' + count: 2 + action: std.mistral_http url='https://opendev.org' """ + # Create and wait for the wf to be running wf_service.create_workflows(retry_wf) - wf_ex = self.engine.start_workflow('repeated_retry') - self.await_workflow_running(wf_ex.id) with db_api.transaction(): wf_ex = db_api.get_workflow_execution(wf_ex.id) task_ex = wf_ex.task_executions[0] + + # Wait for the task to be running self.await_task_running(task_ex.id) with db_api.transaction(): - wf_ex = db_api.get_workflow_execution(wf_ex.id) - - task_ex = wf_ex.task_executions[0] - - self.await_task_running(task_ex.id) - + # We need to it back to be bound to a db transaction + task_ex = db_api.get_task_execution(task_ex.id) first_action_ex = task_ex.executions[0] - self.await_action_state(first_action_ex.id, states.RUNNING) + # Now wait for this action to be running + # Note that the task won't actually run because it's mocked + self.await_action_state(first_action_ex.id, states.RUNNING) + # Send an action complete event with error so the task will retry complete_action_params = ( first_action_ex.id, ml_actions.Result(error="mock") ) - rpc.get_engine_client().on_action_complete(*complete_action_params) - for _ in range(2): - self.assertRaises( - exc.MistralException, - rpc.get_engine_client().on_action_complete, - *complete_action_params - ) + # Send it a second time -- this should raise a ValueError + self.assertRaises( + exc.MistralException, + rpc.get_engine_client().on_action_complete, + *complete_action_params + ) + # And the task should be restarted self.await_task_running(task_ex.id) + with db_api.transaction(): + task_ex = db_api.get_task_execution(task_ex.id) + second_action_ex = task_ex.executions[1] + + # This time, send a success action complete + complete_action_params = ( + second_action_ex.id, + ml_actions.Result(data="mock") + ) + rpc.get_engine_client().on_action_complete(*complete_action_params) + + # Wait for the wf to finish + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + self.await_workflow_success(wf_ex.id) + + # Now check the results with db_api.transaction(): task_ex = db_api.get_task_execution(task_ex.id) action_exs = task_ex.executions - + # We are supposed to have two actions self.assertEqual(2, len(action_exs)) - - for action_ex in action_exs: - if action_ex.id == first_action_ex.id: - expected_state = states.ERROR - else: - expected_state = states.RUNNING - - self.assertEqual(expected_state, action_ex.state) + # First is in error + self.assertEqual(states.ERROR, action_exs[0].state) + # Second is success + self.assertEqual(states.SUCCESS, action_exs[1].state) def test_timeout_policy(self): wb_service.create_workbook_v2(TIMEOUT_WB % 2)