diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 47f191c68..f218b45c7 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -80,6 +80,7 @@ def run_existing_task(task_ex_id, reset=True): # Explicitly change task state to RUNNING. task_ex.state = states.RUNNING + task_ex.state_info = None task_ex.processed = False _run_existing_task(task_ex, task_spec, wf_spec) diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun.py b/mistral/tests/unit/engine/test_direct_workflow_rerun.py index e3a4af2e5..54788a864 100644 --- a/mistral/tests/unit/engine/test_direct_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun.py @@ -134,6 +134,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) self.assertEqual(2, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -141,18 +142,21 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.ERROR, task_2_ex.state) + self.assertIsNotNone(task_2_ex.state_info) # Resume workflow and re-run failed task. self.engine.rerun_workflow(wf_ex.id, task_2_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.RUNNING, wf_ex.state) + self.assertIsNone(wf_ex.state_info) # Wait for the workflow to succeed. self._await(lambda: self.is_execution_success(wf_ex.id)) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) self.assertEqual(3, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -170,6 +174,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): # Check action executions of task 2. self.assertEqual(states.SUCCESS, task_2_ex.state) + self.assertIsNone(task_2_ex.state_info) task_2_action_exs = db_api.get_action_executions( task_execution_id=task_2_ex.id) @@ -206,6 +211,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) self.assertEqual(2, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -213,6 +219,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.ERROR, task_2_ex.state) + self.assertIsNotNone(task_2_ex.state_info) # Resume workflow and re-run failed task. e = self.assertRaises( @@ -247,11 +254,13 @@ class DirectWorkflowRerunTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) self.assertEqual(1, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') self.assertEqual(states.ERROR, task_1_ex.state) + self.assertIsNotNone(task_1_ex.state_info) task_1_action_exs = db_api.get_action_executions( task_execution_id=task_1_ex.id) @@ -263,11 +272,13 @@ class DirectWorkflowRerunTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.RUNNING, wf_ex.state) + self.assertIsNone(wf_ex.state_info) self._await(lambda: self.is_execution_success(wf_ex.id), delay=10) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) self.assertEqual(2, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -275,6 +286,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): # Check action executions of task 1. self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertIsNone(task_1_ex.state_info) task_1_action_exs = db_api.get_action_executions( task_execution_id=task_1_ex.id) @@ -314,10 +326,10 @@ class DirectWorkflowRerunTest(base.EngineTestCase): # Run workflow and fail task. wf_ex = self.engine.start_workflow('wb3.wf1', {}) self._await(lambda: self.is_execution_error(wf_ex.id)) - wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) self.assertEqual(1, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -335,11 +347,13 @@ class DirectWorkflowRerunTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.RUNNING, wf_ex.state) + self.assertIsNone(wf_ex.state_info) self._await(lambda: self.is_execution_success(wf_ex.id), delay=10) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) self.assertEqual(2, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -347,6 +361,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): # Check action executions of task 1. self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertIsNone(task_1_ex.state_info) task_1_action_exs = db_api.get_action_executions( task_execution_id=task_1_ex.id) @@ -386,6 +401,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) self.assertEqual(3, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -395,18 +411,21 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.SUCCESS, task_2_ex.state) self.assertEqual(states.ERROR, task_3_ex.state) + self.assertIsNotNone(task_3_ex.state_info) # Resume workflow and re-run failed task. self.engine.rerun_workflow(wf_ex.id, task_3_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.RUNNING, wf_ex.state) + self.assertIsNone(wf_ex.state_info) # Wait for the workflow to succeed. self._await(lambda: self.is_execution_success(wf_ex.id)) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) self.assertEqual(3, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -414,6 +433,8 @@ class DirectWorkflowRerunTest(base.EngineTestCase): task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3') # Check action executions of task 1. + self.assertEqual(states.SUCCESS, task_1_ex.state) + task_1_action_exs = db_api.get_action_executions( task_execution_id=task_1_ex.id) @@ -421,6 +442,8 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_1_action_exs[0].state) # Check action executions of task 2. + self.assertEqual(states.SUCCESS, task_2_ex.state) + task_2_action_exs = db_api.get_action_executions( task_execution_id=task_2_ex.id) @@ -428,6 +451,9 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_2_action_exs[0].state) # Check action executions of task 3. + self.assertEqual(states.SUCCESS, task_3_ex.state) + self.assertIsNone(task_3_ex.state_info) + task_3_action_exs = db_api.get_action_executions( task_execution_id=wf_ex.task_executions[2].id) @@ -442,7 +468,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): side_effect=[ exc.ActionException(), # Mock task1 exception for initial run. exc.ActionException(), # Mock task2 exception for initial run. - 'Task 1', # Mock task2 success for rerun. + 'Task 1', # Mock task1 success for rerun. 'Task 2', # Mock task2 success for rerun. 'Task 3' # Mock task3 success. ] @@ -463,19 +489,23 @@ class DirectWorkflowRerunTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) self.assertEqual(2, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2') self.assertEqual(states.ERROR, task_1_ex.state) + self.assertIsNotNone(task_1_ex.state_info) self.assertEqual(states.ERROR, task_2_ex.state) + self.assertIsNotNone(task_2_ex.state_info) # Resume workflow and re-run failed task. self.engine.rerun_workflow(wf_ex.id, task_1_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.RUNNING, wf_ex.state) + self.assertIsNone(wf_ex.state_info) # Wait for the task to succeed. task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -484,6 +514,7 @@ class DirectWorkflowRerunTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.RUNNING, wf_ex.state) + self.assertIsNone(wf_ex.state_info) self.assertEqual(3, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -499,12 +530,14 @@ class DirectWorkflowRerunTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.RUNNING, wf_ex.state) + self.assertIsNone(wf_ex.state_info) # Wait for the workflow to succeed. self._await(lambda: self.is_execution_success(wf_ex.id)) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) self.assertEqual(3, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -512,6 +545,9 @@ class DirectWorkflowRerunTest(base.EngineTestCase): task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3') # Check action executions of task 1. + self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertIsNone(task_1_ex.state_info) + task_1_action_exs = db_api.get_action_executions( task_execution_id=task_1_ex.id) @@ -520,6 +556,9 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_1_action_exs[1].state) # Check action executions of task 2. + self.assertEqual(states.SUCCESS, task_2_ex.state) + self.assertIsNone(task_2_ex.state_info) + task_2_action_exs = db_api.get_action_executions( task_execution_id=task_2_ex.id) @@ -528,6 +567,8 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_2_action_exs[1].state) # Check action executions of task 3. + self.assertEqual(states.SUCCESS, task_3_ex.state) + task_3_action_exs = db_api.get_action_executions( task_execution_id=wf_ex.task_executions[2].id) diff --git a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py index d7f492322..afe65b11b 100644 --- a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py @@ -73,6 +73,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) self.assertEqual(2, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -80,18 +81,21 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.ERROR, task_2_ex.state) + self.assertIsNotNone(task_2_ex.state_info) # Resume workflow and re-run failed task. self.engine.rerun_workflow(wf_ex.id, task_2_ex.id) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.RUNNING, wf_ex.state) + self.assertIsNone(wf_ex.state_info) # Wait for the workflow to succeed. self._await(lambda: self.is_execution_success(wf_ex.id)) wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) self.assertEqual(3, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -109,6 +113,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): # Check action executions of task 2. self.assertEqual(states.SUCCESS, task_2_ex.state) + self.assertIsNone(task_2_ex.state_info) task_2_action_exs = db_api.get_action_executions( task_execution_id=task_2_ex.id) @@ -145,6 +150,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): wf_ex = db_api.get_workflow_execution(wf_ex.id) self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) self.assertEqual(2, len(wf_ex.task_executions)) task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') @@ -152,6 +158,7 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): self.assertEqual(states.SUCCESS, task_1_ex.state) self.assertEqual(states.ERROR, task_2_ex.state) + self.assertIsNotNone(task_2_ex.state_info) # Resume workflow and re-run failed task. e = self.assertRaises(