diff --git a/mistral/tests/unit/engine/test_subworkflows_pause_resume.py b/mistral/tests/unit/engine/test_subworkflows_pause_resume.py index 83813c06e..58f1f9d1b 100644 --- a/mistral/tests/unit/engine/test_subworkflows_pause_resume.py +++ b/mistral/tests/unit/engine/test_subworkflows_pause_resume.py @@ -21,7 +21,9 @@ from mistral_lib import actions as ml_actions class SubworkflowPauseResumeTest(base.EngineTestCase): - def test_pause_resume_cascade_down_to_subworkflow(self): + def test_cascade_down(self): + # The purpose of this test is to check whether pausing a main wf will + # also pause subworkflows wb_text = """ version: '2.0' @@ -44,11 +46,11 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wf2: tasks: task1: - action: std.async_noop + action: std.noop on-success: task2 task2: - action: std.noop + action: std.async_noop wf3: tasks: @@ -62,306 +64,130 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wb_service.create_workbook_v2(wb_text) - # Start workflow execution. - wf_1_ex = self.engine.start_workflow('wb.wf1') - - self.await_workflow_state(wf_1_ex.id, states.RUNNING) + # Start wf1 + wf1_ex = self.engine.start_workflow('wb.wf1') + self.await_workflow_state(wf1_ex.id, states.RUNNING) with db_api.transaction(): + # Grab latest info wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_execs = wf_1_ex.task_executions - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, + wf1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + # Grab tasks + wf1_task1 = self._assert_single_item( + wf1_ex.task_executions, name='task1' ) - self.await_task_running(wf_1_task_1_ex.id) - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_execs = wf_1_ex.task_executions - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = wf_1_task_1_ex.executions - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, + wf1_task2 = self._assert_single_item( + wf1_ex.task_executions, name='task2' ) - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_execs = wf_2_ex.task_executions - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' - ) - - wf_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_1_ex.id - ) - - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_execs = wf_3_ex.task_executions - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - self.assertEqual(states.RUNNING, wf_1_ex.state) - self.assertEqual(2, len(wf_1_task_execs)) - self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) - self.assertEqual(1, len(wf_1_task_1_action_exs)) - self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) - self.assertEqual(wf_1_task_1_action_exs[0].id, wf_2_ex.id) - self.assertEqual(1, len(wf_1_task_2_action_exs)) - self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) - self.assertEqual(wf_1_task_2_action_exs[0].id, wf_3_ex.id) - self.assertEqual(states.RUNNING, wf_2_ex.state) - self.assertEqual(1, len(wf_2_task_execs)) - self.assertEqual(states.RUNNING, wf_2_task_1_ex.state) - self.assertEqual(1, len(wf_2_task_1_action_exs)) - self.assertEqual(states.RUNNING, wf_2_task_1_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_3_ex.state) - self.assertEqual(1, len(wf_3_task_execs)) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(1, len(wf_3_task_1_action_exs)) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - - # Pause the main workflow. - self.engine.pause_workflow(wf_1_ex.id) - - self.await_workflow_paused(wf_1_ex.id) - self.await_workflow_paused(wf_2_ex.id) - self.await_workflow_paused(wf_3_ex.id) + # Wait for tasks to be running + # We know that it will be running for a long time, because the wf + # use async_noop and should be running until we decide to send an + # action complete event + self.await_task_running(wf1_task1.id) + self.await_task_running(wf1_task2.id) with db_api.transaction(): + # Grab latest info wf_execs = db_api.get_workflow_executions() - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + # Get subworkflows + wf2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + wf3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, + # Grab wf2 first task (we cant yet be sure the second is executed) + # The first task is noop + wf2_task1 = self._assert_single_item( + wf2_ex.task_executions, name='task1' ) - wf_1_task_1_action_exs = wf_1_task_1_ex.executions + # wf3 task1 is async_noop + wf3_task1 = self._assert_single_item( + wf3_ex.task_executions, + name='task1' + ) - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, + # Make sure wf2 and wf3 are running + self.await_workflow_state(wf2_ex.id, states.RUNNING) + self.await_workflow_state(wf3_ex.id, states.RUNNING) + + # Make sure wf3 task1 is running + self.await_task_running(wf3_task1.id) + + # Wait for the wf2_task1 (noop) to be finished + self.await_task_success(wf2_task1.id) + + with db_api.transaction(): + # Grab latest info + wf_execs = db_api.get_workflow_executions() + + # Get subworkflows + wf2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + # We should be able now to grab the second task of wf2 + wf2_task2 = self._assert_single_item( + wf2_ex.task_executions, name='task2' ) - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' + # Grab action so we can complete it after + wf3_task1_action = db_api.get_action_executions( + task_execution_id=wf3_task1.id ) - wf_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_1_ex.id + # Make sure wf2 task2 is running + self.await_task_running(wf2_task2.id) + + with db_api.transaction(): + # Grab action so we can complete it after + wf2_task2_action = db_api.get_action_executions( + task_execution_id=wf2_task2.id ) - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') + # Pause the main workflow + self.engine.pause_workflow(wf1_ex.id) - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) + # We should have all wf paused now (aka cascade down) + self.await_workflow_paused(wf1_ex.id) + self.await_workflow_paused(wf2_ex.id) + self.await_workflow_paused(wf3_ex.id) - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - self.assertEqual(states.PAUSED, wf_2_ex.state) - self.assertEqual(states.RUNNING, wf_2_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_task_1_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_3_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_1_task_1_ex.state) - self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) - self.assertEqual(states.PAUSED, wf_1_ex.state) + # Async and success tasks stay the same + # Task that were creating subwf inherit the wf state (paused here) + self.await_task_paused(wf1_task1.id) + self.await_task_paused(wf1_task2.id) + self.await_task_success(wf2_task1.id) + self.await_task_running(wf2_task2.id) + self.await_task_running(wf3_task1.id) # Resume the main workflow. - self.engine.resume_workflow(wf_1_ex.id) - - self.await_workflow_running(wf_1_ex.id) - self.await_workflow_running(wf_2_ex.id) - self.await_workflow_running(wf_3_ex.id) - - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = wf_1_task_1_ex.executions - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' - ) - - wf_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_1_ex.id - ) - - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - self.assertEqual(states.RUNNING, wf_2_ex.state) - self.assertEqual(states.RUNNING, wf_2_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_task_1_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_3_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) - self.assertEqual(states.RUNNING, wf_1_ex.state) + # This will resume all workflows + self.engine.resume_workflow(wf1_ex.id) + self.await_workflow_running(wf1_ex.id) + self.await_workflow_running(wf2_ex.id) + self.await_workflow_running(wf3_ex.id) # Complete action executions of the subworkflows. self.engine.on_action_complete( - wf_2_task_1_action_exs[0].id, + wf2_task2_action[0].id, ml_actions.Result(data={'result': 'foobar'}) ) self.engine.on_action_complete( - wf_3_task_1_action_exs[0].id, + wf3_task1_action[0].id, ml_actions.Result(data={'result': 'foobar'}) ) - self.await_workflow_success(wf_2_ex.id) - self.await_workflow_success(wf_3_ex.id) - self.await_workflow_success(wf_1_ex.id) + self.await_workflow_success(wf2_ex.id) + self.await_workflow_success(wf3_ex.id) + self.await_workflow_success(wf1_ex.id) - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_execs = wf_1_ex.task_executions - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = wf_1_task_1_ex.executions - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - wf_1_task_3_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task3' - ) - - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_execs = wf_2_ex.task_executions - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' - ) - - wf_2_task_2_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task2' - ) - - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_execs = wf_3_ex.task_executions - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_2_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task2' - ) - - self.assertEqual(states.SUCCESS, wf_1_ex.state) - self.assertEqual(3, len(wf_1_task_execs)) - self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_2_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_3_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_2_ex.state) - self.assertEqual(2, len(wf_2_task_execs)) - self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_task_2_ex.state) - self.assertEqual(states.SUCCESS, wf_3_ex.state) - self.assertEqual(2, len(wf_3_task_execs)) - self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_3_task_2_ex.state) - - def test_pause_resume_cascade_up_from_subworkflow(self): + def test_cascade_up(self): + # The purpose of this test is to check whether pausing a subwf will + # also pause the main wf wb_text = """ version: '2.0' @@ -384,11 +210,11 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wf2: tasks: task1: - action: std.async_noop + action: std.noop on-success: task2 task2: - action: std.noop + action: std.async_noop wf3: tasks: @@ -402,420 +228,136 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wb_service.create_workbook_v2(wb_text) - # Start workflow execution. - wf_1_ex = self.engine.start_workflow('wb.wf1') - - self.await_workflow_state(wf_1_ex.id, states.RUNNING) + # Start wf1 + wf1_ex = self.engine.start_workflow('wb.wf1') + self.await_workflow_state(wf1_ex.id, states.RUNNING) with db_api.transaction(): + # Grab latest info wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_execs = wf_1_ex.task_executions - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, + wf1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + # Grab tasks + wf1_task1 = self._assert_single_item( + wf1_ex.task_executions, name='task1' ) - self.await_task_running(wf_1_task_1_ex.id) - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_execs = wf_1_ex.task_executions - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = wf_1_task_1_ex.executions - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, + wf1_task2 = self._assert_single_item( + wf1_ex.task_executions, name='task2' ) - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_execs = wf_2_ex.task_executions - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' - ) - - wf_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_1_ex.id - ) - - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_execs = wf_3_ex.task_executions - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - self.assertEqual(states.RUNNING, wf_1_ex.state) - self.assertEqual(2, len(wf_1_task_execs)) - self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) - self.assertEqual(1, len(wf_1_task_1_action_exs)) - self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) - self.assertEqual(wf_1_task_1_action_exs[0].id, wf_2_ex.id) - self.assertEqual(1, len(wf_1_task_2_action_exs)) - self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) - self.assertEqual(wf_1_task_2_action_exs[0].id, wf_3_ex.id) - self.assertEqual(states.RUNNING, wf_2_ex.state) - self.assertEqual(1, len(wf_2_task_execs)) - self.assertEqual(states.RUNNING, wf_2_task_1_ex.state) - self.assertEqual(1, len(wf_2_task_1_action_exs)) - self.assertEqual(states.RUNNING, wf_2_task_1_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_3_ex.state) - self.assertEqual(1, len(wf_3_task_execs)) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(1, len(wf_3_task_1_action_exs)) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - - # Pause the subworkflow. - self.engine.pause_workflow(wf_2_ex.id) - - self.await_workflow_paused(wf_2_ex.id) + # Wait for tasks to be running + # We know that it will be running for a long time, because the wf + # use async_noop and should be running until we decide to send an + # action complete event + self.await_task_running(wf1_task1.id) + self.await_task_running(wf1_task2.id) with db_api.transaction(): + # Grab latest info wf_execs = db_api.get_workflow_executions() - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + # Get subworkflows + wf2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + wf3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, + # Grab wf2 first task (we cant yet be sure the second is executed) + # The first task is noop + wf2_task1 = self._assert_single_item( + wf2_ex.task_executions, name='task1' ) - wf_1_task_1_action_exs = wf_1_task_1_ex.executions + # wf3 task1 is async_noop + wf3_task1 = self._assert_single_item( + wf3_ex.task_executions, + name='task1' + ) - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, + # Make sure wf2 and wf3 are running + self.await_workflow_state(wf2_ex.id, states.RUNNING) + self.await_workflow_state(wf3_ex.id, states.RUNNING) + + # Make sure wf3 task1 is running + self.await_task_running(wf3_task1.id) + + # Wait for the wf2_task1 (noop) to be finished + self.await_task_success(wf2_task1.id) + + with db_api.transaction(): + # Grab latest info + wf_execs = db_api.get_workflow_executions() + + # Get subworkflows + wf2_ex = self._assert_single_item(wf_execs, name='wb.wf2') + + # We should be able now to grab the second task of wf2 + wf2_task2 = self._assert_single_item( + wf2_ex.task_executions, name='task2' ) - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' + # Grab action so we can complete it after + wf3_task1_action = db_api.get_action_executions( + task_execution_id=wf3_task1.id ) - wf_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_1_ex.id - ) - - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - self.assertEqual(states.PAUSED, wf_2_ex.state) - self.assertEqual(states.RUNNING, wf_2_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_task_1_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_3_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_1_task_1_ex.state) - self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) - self.assertEqual(states.PAUSED, wf_1_ex.state) - - # Resume the 1st subworkflow. - self.engine.resume_workflow(wf_2_ex.id) - - self.await_workflow_running(wf_2_ex.id) + # Make sure wf2 task2 is running + self.await_task_running(wf2_task2.id) with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' + # Grab action so we can complete it after + wf2_task2_action = db_api.get_action_executions( + task_execution_id=wf2_task2.id ) - wf_1_task_1_action_exs = wf_1_task_1_ex.executions + # Pause the wf2 workflow + self.engine.pause_workflow(wf2_ex.id) - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) + # We should have all wf paused now (aka cascade up) + self.await_workflow_paused(wf1_ex.id) + self.await_workflow_paused(wf2_ex.id) + self.await_workflow_paused(wf3_ex.id) - wf_1_task_2_action_exs = wf_1_task_2_ex.executions + # Async and success tasks stay the same + # Task that were creating subwf inherit the wf state (paused here) + self.await_task_paused(wf1_task1.id) + self.await_task_paused(wf1_task2.id) + self.await_task_success(wf2_task1.id) + self.await_task_running(wf2_task2.id) + self.await_task_running(wf3_task1.id) - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' - ) - - wf_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_1_ex.id - ) - - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - self.assertEqual(states.RUNNING, wf_2_ex.state) - self.assertEqual(states.RUNNING, wf_2_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_task_1_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_3_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) - self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) - self.assertEqual(states.PAUSED, wf_1_ex.state) - - # Complete action execution of 1st subworkflow. + # Resume and complete wf2 + self.engine.resume_workflow(wf2_ex.id) + self.await_workflow_running(wf2_ex.id) self.engine.on_action_complete( - wf_2_task_1_action_exs[0].id, + wf2_task2_action[0].id, ml_actions.Result(data={'result': 'foobar'}) ) - self.await_workflow_success(wf_2_ex.id) - self.await_task_success(wf_1_task_1_ex.id) + # This is not resuming wf1 and wf3 + self.await_workflow_paused(wf1_ex.id) + self.await_workflow_paused(wf3_ex.id) - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = wf_1_task_1_ex.executions - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' - ) - - wf_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_1_ex.id - ) - - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - self.assertEqual(states.SUCCESS, wf_2_ex.state) - self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_3_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) - self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) - self.assertEqual(states.PAUSED, wf_1_ex.state) - - # Resume the 2nd subworkflow. - self.engine.resume_workflow(wf_3_ex.id) - - self.await_workflow_running(wf_3_ex.id) - - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = wf_1_task_1_ex.executions - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' - ) - - wf_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_1_ex.id - ) - - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - self.assertEqual(states.SUCCESS, wf_2_ex.state) - self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_3_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) - self.assertEqual(states.RUNNING, wf_1_ex.state) - - # Complete action execution of 2nd subworkflow. + # Resume and complete wf3 + self.engine.resume_workflow(wf3_ex.id) + self.await_workflow_running(wf3_ex.id) self.engine.on_action_complete( - wf_3_task_1_action_exs[0].id, + wf3_task1_action[0].id, ml_actions.Result(data={'result': 'foobar'}) ) - self.await_workflow_success(wf_3_ex.id) - self.await_workflow_success(wf_1_ex.id) + # Resuming both subwf is resuming wf1 + self.await_workflow_running(wf1_ex.id) - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() + self.await_workflow_success(wf2_ex.id) + self.await_workflow_success(wf3_ex.id) + self.await_workflow_success(wf1_ex.id) - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_execs = wf_1_ex.task_executions - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = wf_1_task_1_ex.executions - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - wf_1_task_3_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task3' - ) - - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_execs = wf_2_ex.task_executions - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' - ) - - wf_2_task_2_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task2' - ) - - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_execs = wf_3_ex.task_executions - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_2_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task2' - ) - - self.assertEqual(states.SUCCESS, wf_1_ex.state) - self.assertEqual(3, len(wf_1_task_execs)) - self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_2_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_3_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_2_ex.state) - self.assertEqual(2, len(wf_2_task_execs)) - self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_task_2_ex.state) - self.assertEqual(states.SUCCESS, wf_3_ex.state) - self.assertEqual(2, len(wf_3_task_execs)) - self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_3_task_2_ex.state) - - def test_pause_resume_cascade_down_to_with_items_subworkflows(self): + def test_with_items_cascade(self): + # Purpose of this tests is to test that a wf creating subwfs + # from items (with-items) can be paused and pause will cascade down + # to all subwf wb_text = """ version: '2.0' @@ -825,1186 +367,139 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): wf1: tasks: task1: - with-items: i in <% range(3) %> + with-items: i in <% range(2) %> workflow: wf2 - on-success: task3 - - task2: - workflow: wf3 - on-success: task3 - - task3: - join: all wf2: tasks: task1: action: std.async_noop - on-success: task2 - - task2: - action: std.noop - - wf3: - tasks: - task1: - action: std.async_noop - on-success: task2 - - task2: - action: std.noop """ wb_service.create_workbook_v2(wb_text) # Start workflow execution. - wf_1_ex = self.engine.start_workflow('wb.wf1') + wf1_ex = self.engine.start_workflow('wb.wf1') - self.await_workflow_state(wf_1_ex.id, states.RUNNING) + self.await_workflow_state(wf1_ex.id, states.RUNNING) with db_api.transaction(): + # Grab latest info wf_execs = db_api.get_workflow_executions() + wf1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_execs = wf_1_ex.task_executions - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - self.await_task_running(wf_1_task_1_ex.id) - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_execs = wf_1_ex.task_executions - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, + wf1_task1 = wf1_ex.task_executions + wf1_task1 = self._assert_single_item( + wf1_ex.task_executions, name='task1' ) - wf_1_task_1_action_exs = sorted( - wf_1_task_1_ex.executions, + # Wait for the first task to be running + # It will run until we send a action_complete (later in this test) + self.await_task_running(wf1_task1.id) + + with db_api.transaction(): + # Grab latest info + wf_execs = db_api.get_workflow_executions() + wf1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + + wf1_task1 = wf1_ex.task_executions + wf1_task1 = self._assert_single_item( + wf1_ex.task_executions, + name='task1' + ) + wf1_task1_actions = sorted( + wf1_task1.executions, key=lambda x: x['runtime_context']['index'] ) - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' + # Get the two with-items subworkflow executions. + wf2_ex1 = db_api.get_workflow_execution( + wf1_task1_actions[0].id ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the with-items subworkflow executions. - wf_2_ex_1 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[0].id + wf2_ex2 = db_api.get_workflow_execution( + wf1_task1_actions[1].id ) # NOTE(amorin) we need to await the wf to make sure it's actually # running and avoid race condition with IDLE state # See lp-2051040 - self.await_workflow_state(wf_2_ex_1.id, states.RUNNING) + self.await_workflow_state(wf2_ex1.id, states.RUNNING) + self.await_workflow_state(wf2_ex2.id, states.RUNNING) with db_api.transaction(): - # Get again now that it's running - wf_2_ex_1 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[0].id + # Grab latest info + wf2_ex1 = db_api.get_workflow_execution(wf2_ex1.id) + wf2_ex2 = db_api.get_workflow_execution(wf2_ex2.id) + + wf2_ex1_task1 = self._assert_single_item( + wf2_ex1.task_executions, + name='task1' ) - - wf_2_ex_1_task_execs = wf_2_ex_1.task_executions - - wf_2_ex_1_task_1_ex = self._assert_single_item( - wf_2_ex_1.task_executions, + wf2_ex2_task1 = self._assert_single_item( + wf2_ex2.task_executions, name='task1' ) - # And wait for the task to be RUNNING - self.await_task_running(wf_2_ex_1_task_1_ex.id) + # And wait for the tasks to be RUNNING + self.await_task_running(wf2_ex1_task1.id) + self.await_task_running(wf2_ex2_task1.id) with db_api.transaction(): - # Get again now that it's running - wf_2_ex_1_task_1_ex = db_api.get_task_execution( - wf_2_ex_1_task_execs[0].id + # Grab actions so we can complete it after + wf2_ex1_task1_actions = db_api.get_action_executions( + task_execution_id=wf2_ex1_task1.id ) - - wf_execs = db_api.get_workflow_executions() - - wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_1_task_1_ex.id + wf2_ex2_task1_actions = db_api.get_action_executions( + task_execution_id=wf2_ex2_task1.id ) - wf_2_ex_2 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[1].id - ) - - # And wait for the wf to be RUNNING - self.await_workflow_state(wf_2_ex_2.id, states.RUNNING) - - with db_api.transaction(): - # Get again now that it's running - wf_2_ex_2 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[1].id - ) - - wf_2_ex_2_task_execs = wf_2_ex_2.task_executions - - wf_2_ex_2_task_1_ex = self._assert_single_item( - wf_2_ex_2.task_executions, - name='task1' - ) - - # And wait for the task to be RUNNING - self.await_task_running(wf_2_ex_2_task_1_ex.id) - - with db_api.transaction(): - # Get again now that it's running - wf_2_ex_2_task_1_ex = db_api.get_task_execution( - wf_2_ex_2_task_execs[0].id - ) - - wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_2_task_1_ex.id - ) - - wf_2_ex_3 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[2].id - ) - - # And wait for the wf to be RUNNING - self.await_workflow_state(wf_2_ex_3.id, states.RUNNING) - - with db_api.transaction(): - # Get again now that it's running - wf_2_ex_3 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[2].id - ) - - wf_2_ex_3_task_execs = wf_2_ex_3.task_executions - - wf_2_ex_3_task_1_ex = self._assert_single_item( - wf_2_ex_3.task_executions, - name='task1' - ) - - # And wait for the task to be RUNNING - self.await_task_running(wf_2_ex_3_task_1_ex.id) - - with db_api.transaction(): - # Get again now that it's running - wf_2_ex_3_task_1_ex = db_api.get_task_execution( - wf_2_ex_3_task_execs[0].id - ) - - wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_3_task_1_ex.id - ) - - # Get objects for the wf3 subworkflow execution. - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - # And wait for the wf to be RUNNING - self.await_workflow_state(wf_3_ex.id, states.RUNNING) - - with db_api.transaction(): - # Get again now that it's running - wf_3_ex = db_api.get_workflow_execution( - wf_1_task_2_action_exs[0].id - ) - - wf_3_task_execs = wf_3_ex.task_executions - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - # And wait for the task to be RUNNING - self.await_task_running(wf_3_task_1_ex.id) - - with db_api.transaction(): - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - # Check state of parent workflow execution. - self.assertEqual(2, len(wf_1_task_execs)) - self.assertEqual(3, len(wf_1_task_1_action_exs)) - - # Check state of wf2 (1) subworkflow execution. - self.assertEqual(wf_1_task_1_action_exs[0].id, wf_2_ex_1.id) - self.assertEqual(1, len(wf_2_ex_1_task_execs)) - self.assertEqual(1, len(wf_2_ex_1_task_1_action_exs)) - - # Check state of wf2 (2) subworkflow execution. - self.assertEqual(wf_1_task_1_action_exs[1].id, wf_2_ex_2.id) - self.assertEqual(1, len(wf_2_ex_2_task_execs)) - self.assertEqual(1, len(wf_2_ex_2_task_1_action_exs)) - - # Check state of wf2 (3) subworkflow execution. - self.assertEqual(wf_1_task_1_action_exs[2].id, wf_2_ex_3.id) - self.assertEqual(1, len(wf_2_ex_3_task_execs)) - self.assertEqual(1, len(wf_2_ex_3_task_1_action_exs)) - - # Check state of wf3 subworkflow execution. - self.assertEqual(1, len(wf_1_task_2_action_exs)) - self.assertEqual(wf_1_task_2_action_exs[0].id, wf_3_ex.id) - self.assertEqual(1, len(wf_3_task_execs)) - self.assertEqual(1, len(wf_3_task_1_action_exs)) - # Pause the main workflow. - self.engine.pause_workflow(wf_1_ex.id) - self.await_workflow_paused(wf_2_ex_1.id) - self.await_workflow_paused(wf_2_ex_2.id) - self.await_workflow_paused(wf_2_ex_3.id) - self.await_workflow_paused(wf_3_ex.id) - self.await_task_paused(wf_1_task_1_ex.id) - self.await_task_paused(wf_1_task_2_ex.id) - self.await_workflow_paused(wf_1_ex.id) + self.engine.pause_workflow(wf1_ex.id) - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() + # This should pause wf1 and its task + self.await_workflow_paused(wf1_ex.id) + self.await_task_paused(wf1_task1.id) - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + # And this should cascade down to all subwf + self.await_workflow_paused(wf2_ex1.id) + self.await_workflow_paused(wf2_ex2.id) - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = sorted( - wf_1_task_1_ex.executions, - key=lambda x: x['runtime_context']['index'] - ) - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the with-items subworkflow executions. - wf_2_ex_1 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[0].id - ) - - wf_2_ex_1_task_1_ex = self._assert_single_item( - wf_2_ex_1.task_executions, - name='task1' - ) - - wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_1_task_1_ex.id - ) - - wf_2_ex_2 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[1].id - ) - - wf_2_ex_2_task_1_ex = self._assert_single_item( - wf_2_ex_2.task_executions, - name='task1' - ) - - wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_2_task_1_ex.id - ) - - wf_2_ex_3 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[2].id - ) - - wf_2_ex_3_task_1_ex = self._assert_single_item( - wf_2_ex_3.task_executions, - name='task1' - ) - - wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_3_task_1_ex.id - ) - - # Get objects for the wf3 subworkflow execution. - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - # Check state of parent workflow execution. - self.assertEqual(states.PAUSED, wf_1_ex.state) - self.assertEqual(states.PAUSED, wf_1_task_1_ex.state) - self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) - - # Check state of wf2 (1) subworkflow execution. - self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_2_ex_1.state) - self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_action_exs[0].state) - - # Check state of wf2 (2) subworkflow execution. - self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[1].state) - self.assertEqual(states.PAUSED, wf_2_ex_2.state) - self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_action_exs[0].state) - - # Check state of wf2 (3) subworkflow execution. - self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[2].state) - self.assertEqual(states.PAUSED, wf_2_ex_3.state) - self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_action_exs[0].state) - - # Check state of wf3 subworkflow execution. - self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_3_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + # But tasks stay running (async noop) + self.await_task_running(wf2_ex1_task1.id) + self.await_task_running(wf2_ex2_task1.id) # Resume the main workflow. - self.engine.resume_workflow(wf_1_ex.id) - self.await_workflow_running(wf_2_ex_1.id) - self.await_workflow_running(wf_2_ex_2.id) - self.await_workflow_running(wf_2_ex_3.id) - self.await_workflow_running(wf_3_ex.id) - self.await_task_running(wf_1_task_1_ex.id) - self.await_task_running(wf_1_task_2_ex.id) - self.await_workflow_running(wf_1_ex.id) + self.engine.resume_workflow(wf1_ex.id) - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() + # This should resume wf1 + self.await_workflow_running(wf1_ex.id) + self.await_task_running(wf1_task1.id) - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = sorted( - wf_1_task_1_ex.executions, - key=lambda x: x['runtime_context']['index'] - ) - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the with-items subworkflow executions. - wf_2_ex_1 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[0].id - ) - - wf_2_ex_1_task_1_ex = self._assert_single_item( - wf_2_ex_1.task_executions, - name='task1' - ) - - wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_1_task_1_ex.id - ) - - wf_2_ex_2 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[1].id - ) - - wf_2_ex_2_task_1_ex = self._assert_single_item( - wf_2_ex_2.task_executions, - name='task1' - ) - - wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_2_task_1_ex.id - ) - - wf_2_ex_3 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[2].id - ) - - wf_2_ex_3_task_1_ex = self._assert_single_item( - wf_2_ex_3.task_executions, - name='task1' - ) - - wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_3_task_1_ex.id - ) - - # Get objects for the wf3 subworkflow execution. - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - # Check state of parent workflow execution. - self.assertEqual(states.RUNNING, wf_1_ex.state) - self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) - - # Check state of wf2 (1) subworkflow execution. - self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_2_ex_1.state) - self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_action_exs[0].state) - - # Check state of wf2 (2) subworkflow execution. - self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[1].state) - self.assertEqual(states.RUNNING, wf_2_ex_2.state) - self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_action_exs[0].state) - - # Check state of wf2 (3) subworkflow execution. - self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[2].state) - self.assertEqual(states.RUNNING, wf_2_ex_3.state) - self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_action_exs[0].state) - - # Check state of wf3 subworkflow execution. - self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_3_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) + # But also wf2 subwfs + self.await_workflow_running(wf2_ex1.id) + self.await_workflow_running(wf2_ex2.id) + self.await_task_running(wf2_ex1_task1.id) + self.await_task_running(wf2_ex2_task1.id) # Complete action execution of subworkflows. self.engine.on_action_complete( - wf_2_ex_1_task_1_action_exs[0].id, + wf2_ex1_task1_actions[0].id, ml_actions.Result(data={'result': 'foobar'}) ) self.engine.on_action_complete( - wf_2_ex_2_task_1_action_exs[0].id, + wf2_ex2_task1_actions[0].id, ml_actions.Result(data={'result': 'foobar'}) ) - self.engine.on_action_complete( - wf_2_ex_3_task_1_action_exs[0].id, - ml_actions.Result(data={'result': 'foobar'}) - ) + self.await_workflow_success(wf2_ex1.id) + self.await_workflow_success(wf2_ex2.id) + self.await_workflow_success(wf1_ex.id) - self.engine.on_action_complete( - wf_3_task_1_action_exs[0].id, - ml_actions.Result(data={'result': 'foobar'}) - ) - - self.await_workflow_success(wf_2_ex_1.id) - self.await_workflow_success(wf_2_ex_2.id) - self.await_workflow_success(wf_2_ex_3.id) - self.await_workflow_success(wf_3_ex.id) - self.await_task_success(wf_1_task_1_ex.id) - self.await_task_success(wf_1_task_2_ex.id) - self.await_workflow_success(wf_1_ex.id) - - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = sorted( - wf_1_task_1_ex.executions, - key=lambda x: x['runtime_context']['index'] - ) - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the with-items subworkflow executions. - wf_2_ex_1 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[0].id - ) - - wf_2_ex_1_task_1_ex = self._assert_single_item( - wf_2_ex_1.task_executions, - name='task1' - ) - - wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_1_task_1_ex.id - ) - - wf_2_ex_2 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[1].id - ) - - wf_2_ex_2_task_1_ex = self._assert_single_item( - wf_2_ex_2.task_executions, - name='task1' - ) - - wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_2_task_1_ex.id - ) - - wf_2_ex_3 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[2].id - ) - - wf_2_ex_3_task_1_ex = self._assert_single_item( - wf_2_ex_3.task_executions, - name='task1' - ) - - wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_3_task_1_ex.id - ) - - # Get objects for the wf3 subworkflow execution. - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - # Check state of parent workflow execution. - self.assertEqual(states.SUCCESS, wf_1_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_2_ex.state) - - # Check state of wf2 (1) subworkflow execution. - self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_2_ex_1.state) - self.assertEqual(states.SUCCESS, wf_2_ex_1_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_ex_1_task_1_action_exs[0].state) - - # Check state of wf2 (2) subworkflow execution. - self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[1].state) - self.assertEqual(states.SUCCESS, wf_2_ex_2.state) - self.assertEqual(states.SUCCESS, wf_2_ex_2_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_ex_2_task_1_action_exs[0].state) - - # Check state of wf2 (3) subworkflow execution. - self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[2].state) - self.assertEqual(states.SUCCESS, wf_2_ex_3.state) - self.assertEqual(states.SUCCESS, wf_2_ex_3_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_ex_3_task_1_action_exs[0].state) - - # Check state of wf3 subworkflow execution. - self.assertEqual(states.SUCCESS, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_3_ex.state) - self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_3_task_1_action_exs[0].state) - - def test_pause_resume_cascade_up_from_with_items_subworkflow(self): - wb_text = """ - version: '2.0' - - name: wb - - workflows: - wf1: - tasks: - task1: - with-items: i in <% range(3) %> - workflow: wf2 - on-success: task3 - - task2: - workflow: wf3 - on-success: task3 - - task3: - join: all - - wf2: - tasks: - task1: - action: std.async_noop - on-success: task2 - - task2: - action: std.noop - - wf3: - tasks: - task1: - action: std.async_noop - on-success: task2 - - task2: - action: std.noop - """ - - wb_service.create_workbook_v2(wb_text) - - # Start workflow execution. - wf_1_ex = self.engine.start_workflow('wb.wf1') - - self.await_workflow_state(wf_1_ex.id, states.RUNNING) - - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_execs = wf_1_ex.task_executions - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - self.await_task_running(wf_1_task_1_ex.id) - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_execs = wf_1_ex.task_executions - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = sorted( - wf_1_task_1_ex.executions, - key=lambda x: x['runtime_context']['index'] - ) - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the with-items subworkflow executions. - wf_2_ex_1 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[0].id - ) - - wf_2_ex_1_task_execs = wf_2_ex_1.task_executions - - wf_2_ex_1_task_1_ex = self._assert_single_item( - wf_2_ex_1.task_executions, - name='task1' - ) - - wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_1_task_1_ex.id - ) - - wf_2_ex_2 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[1].id - ) - - wf_2_ex_2_task_execs = wf_2_ex_2.task_executions - - wf_2_ex_2_task_1_ex = self._assert_single_item( - wf_2_ex_2.task_executions, - name='task1' - ) - - wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_2_task_1_ex.id - ) - - wf_2_ex_3 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[2].id - ) - - wf_2_ex_3_task_execs = wf_2_ex_3.task_executions - - wf_2_ex_3_task_1_ex = self._assert_single_item( - wf_2_ex_3.task_executions, - name='task1' - ) - - wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_3_task_1_ex.id - ) - - # Get objects for the wf3 subworkflow execution. - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_execs = wf_3_ex.task_executions - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - # Check state of parent workflow execution. - self.assertEqual(states.RUNNING, wf_1_ex.state) - self.assertEqual(2, len(wf_1_task_execs)) - self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) - self.assertEqual(3, len(wf_1_task_1_action_exs)) - - # Check state of wf2 (1) subworkflow execution. - self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) - self.assertEqual(wf_1_task_1_action_exs[0].id, wf_2_ex_1.id) - self.assertEqual(states.RUNNING, wf_2_ex_1.state) - self.assertEqual(1, len(wf_2_ex_1_task_execs)) - self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_ex.state) - self.assertEqual(1, len(wf_2_ex_1_task_1_action_exs)) - self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_action_exs[0].state) - - # Check state of wf2 (2) subworkflow execution. - self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[1].state) - self.assertEqual(wf_1_task_1_action_exs[1].id, wf_2_ex_2.id) - self.assertEqual(states.RUNNING, wf_2_ex_2.state) - self.assertEqual(1, len(wf_2_ex_2_task_execs)) - self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_ex.state) - self.assertEqual(1, len(wf_2_ex_2_task_1_action_exs)) - self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_action_exs[0].state) - - # Check state of wf2 (3) subworkflow execution. - self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[2].state) - self.assertEqual(wf_1_task_1_action_exs[2].id, wf_2_ex_3.id) - self.assertEqual(states.RUNNING, wf_2_ex_3.state) - self.assertEqual(1, len(wf_2_ex_3_task_execs)) - self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_ex.state) - self.assertEqual(1, len(wf_2_ex_3_task_1_action_exs)) - self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_action_exs[0].state) - - # Check state of wf3 subworkflow execution. - self.assertEqual(1, len(wf_1_task_2_action_exs)) - self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) - self.assertEqual(wf_1_task_2_action_exs[0].id, wf_3_ex.id) - self.assertEqual(states.RUNNING, wf_3_ex.state) - self.assertEqual(1, len(wf_3_task_execs)) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(1, len(wf_3_task_1_action_exs)) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - - # Pause one of the subworkflows in the with-items task. - self.engine.pause_workflow(wf_2_ex_1.id) - - self.await_workflow_paused(wf_2_ex_1.id) - self.await_workflow_paused(wf_2_ex_2.id) - self.await_workflow_paused(wf_2_ex_3.id) - self.await_workflow_paused(wf_3_ex.id) - self.await_task_paused(wf_1_task_1_ex.id) - self.await_task_paused(wf_1_task_2_ex.id) - self.await_workflow_paused(wf_1_ex.id) - - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = sorted( - wf_1_task_1_ex.executions, - key=lambda x: x['runtime_context']['index'] - ) - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the with-items subworkflow executions. - wf_2_ex_1 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[0].id - ) - - wf_2_ex_1_task_1_ex = self._assert_single_item( - wf_2_ex_1.task_executions, - name='task1' - ) - - wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_1_task_1_ex.id - ) - - wf_2_ex_2 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[1].id - ) - - wf_2_ex_2_task_1_ex = self._assert_single_item( - wf_2_ex_2.task_executions, - name='task1' - ) - - wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_2_task_1_ex.id - ) - - wf_2_ex_3 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[2].id - ) - - wf_2_ex_3_task_1_ex = self._assert_single_item( - wf_2_ex_3.task_executions, - name='task1' - ) - - wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_3_task_1_ex.id - ) - - # Get objects for the wf3 subworkflow execution. - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - # Check state of parent workflow execution. - self.assertEqual(states.PAUSED, wf_1_ex.state) - self.assertEqual(states.PAUSED, wf_1_task_1_ex.state) - self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) - - # Check state of wf2 (1) subworkflow execution. - self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_2_ex_1.state) - self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_ex_1_task_1_action_exs[0].state) - - # Check state of wf2 (2) subworkflow execution. - self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[1].state) - self.assertEqual(states.PAUSED, wf_2_ex_2.state) - self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_action_exs[0].state) - - # Check state of wf2 (3) subworkflow execution. - self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[2].state) - self.assertEqual(states.PAUSED, wf_2_ex_3.state) - self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_action_exs[0].state) - - # Check state of wf3 subworkflow execution. - self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_3_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - - # NOTE(rakhmerov): Since cascade pausing is not atomic we need - # to make sure that all internal operations related to pausing - # one of workflow executions 'wb.wf2' are completed. So we have - # to look if any "_on_action_update" calls are scheduled. - - def _predicate(): - return all( - [ - '_on_action_update' not in c.target_method_name - for c in db_api.get_delayed_calls() - ] - ) - - self._await(_predicate) - - # Resume one of the subworkflows in the with-items task. - self.engine.resume_workflow(wf_2_ex_1.id) - - self.await_workflow_running(wf_2_ex_1.id) - self.await_workflow_paused(wf_2_ex_2.id) - self.await_workflow_paused(wf_2_ex_3.id) - self.await_workflow_paused(wf_3_ex.id) - self.await_task_paused(wf_1_task_1_ex.id) - self.await_task_paused(wf_1_task_2_ex.id) - self.await_workflow_paused(wf_1_ex.id) - - # Complete action execution of the subworkflow that is resumed. - self.engine.on_action_complete( - wf_2_ex_1_task_1_action_exs[0].id, - ml_actions.Result(data={'result': 'foobar'}) - ) - - self.await_workflow_success(wf_2_ex_1.id) - self.await_workflow_paused(wf_2_ex_2.id) - self.await_workflow_paused(wf_2_ex_3.id) - self.await_workflow_paused(wf_3_ex.id) - self.await_task_paused(wf_1_task_1_ex.id) - self.await_task_paused(wf_1_task_2_ex.id) - self.await_workflow_paused(wf_1_ex.id) - - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = sorted( - wf_1_task_1_ex.executions, - key=lambda x: x['runtime_context']['index'] - ) - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the with-items subworkflow executions. - wf_2_ex_1 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[0].id - ) - - wf_2_ex_1_task_1_ex = self._assert_single_item( - wf_2_ex_1.task_executions, - name='task1' - ) - - wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_1_task_1_ex.id - ) - - wf_2_ex_2 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[1].id - ) - - wf_2_ex_2_task_1_ex = self._assert_single_item( - wf_2_ex_2.task_executions, - name='task1' - ) - - wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_2_task_1_ex.id - ) - - wf_2_ex_3 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[2].id - ) - - wf_2_ex_3_task_1_ex = self._assert_single_item( - wf_2_ex_3.task_executions, - name='task1' - ) - - wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_3_task_1_ex.id - ) - - # Get objects for the wf3 subworkflow execution. - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - # Check state of parent workflow execution. - self.assertEqual(states.PAUSED, wf_1_ex.state) - self.assertEqual(states.PAUSED, wf_1_task_1_ex.state) - self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) - - # Check state of wf2 (1) subworkflow execution. - self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_2_ex_1.state) - self.assertEqual(states.SUCCESS, wf_2_ex_1_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_ex_1_task_1_action_exs[0].state) - - # Check state of wf2 (2) subworkflow execution. - self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[1].state) - self.assertEqual(states.PAUSED, wf_2_ex_2.state) - self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_ex_2_task_1_action_exs[0].state) - - # Check state of wf2 (3) subworkflow execution. - self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[2].state) - self.assertEqual(states.PAUSED, wf_2_ex_3.state) - self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_2_ex_3_task_1_action_exs[0].state) - - # Check state of wf3 subworkflow execution. - self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_3_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - - # Resume one of the remaining subworkflows. - self.engine.resume_workflow(wf_2_ex_2.id) - self.engine.resume_workflow(wf_2_ex_3.id) - self.engine.resume_workflow(wf_3_ex.id) - self.await_workflow_running(wf_2_ex_2.id) - self.await_workflow_running(wf_2_ex_3.id) - self.await_workflow_running(wf_3_ex.id) - self.await_task_running(wf_1_task_1_ex.id) - self.await_task_running(wf_1_task_2_ex.id) - self.await_workflow_running(wf_1_ex.id) - - # Complete action executions of the remaining subworkflows. - self.engine.on_action_complete( - wf_2_ex_2_task_1_action_exs[0].id, - ml_actions.Result(data={'result': 'foobar'}) - ) - - self.engine.on_action_complete( - wf_2_ex_3_task_1_action_exs[0].id, - ml_actions.Result(data={'result': 'foobar'}) - ) - - self.engine.on_action_complete( - wf_3_task_1_action_exs[0].id, - ml_actions.Result(data={'result': 'foobar'}) - ) - - self.await_workflow_success(wf_2_ex_1.id) - self.await_workflow_success(wf_2_ex_2.id) - self.await_workflow_success(wf_2_ex_3.id) - self.await_workflow_success(wf_3_ex.id) - self.await_task_success(wf_1_task_1_ex.id) - self.await_task_success(wf_1_task_2_ex.id) - self.await_workflow_success(wf_1_ex.id) - - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = sorted( - wf_1_task_1_ex.executions, - key=lambda x: x['runtime_context']['index'] - ) - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the with-items subworkflow executions. - wf_2_ex_1 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[0].id - ) - - wf_2_ex_1_task_1_ex = self._assert_single_item( - wf_2_ex_1.task_executions, - name='task1' - ) - - wf_2_ex_1_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_1_task_1_ex.id - ) - - wf_2_ex_2 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[1].id - ) - - wf_2_ex_2_task_1_ex = self._assert_single_item( - wf_2_ex_2.task_executions, - name='task1' - ) - - wf_2_ex_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_2_task_1_ex.id - ) - - wf_2_ex_3 = db_api.get_workflow_execution( - wf_1_task_1_action_exs[2].id - ) - - wf_2_ex_3_task_1_ex = self._assert_single_item( - wf_2_ex_3.task_executions, - name='task1' - ) - - wf_2_ex_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_ex_3_task_1_ex.id - ) - - # Get objects for the wf3 subworkflow execution. - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - # Check state of parent workflow execution. - self.assertEqual(states.SUCCESS, wf_1_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_2_ex.state) - - # Check state of wf2 (1) subworkflow execution. - self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_2_ex_1.state) - self.assertEqual(states.SUCCESS, wf_2_ex_1_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_ex_1_task_1_action_exs[0].state) - - # Check state of wf2 (2) subworkflow execution. - self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[1].state) - self.assertEqual(states.SUCCESS, wf_2_ex_2.state) - self.assertEqual(states.SUCCESS, wf_2_ex_2_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_ex_2_task_1_action_exs[0].state) - - # Check state of wf2 (3) subworkflow execution. - self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[2].state) - self.assertEqual(states.SUCCESS, wf_2_ex_3.state) - self.assertEqual(states.SUCCESS, wf_2_ex_3_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_ex_3_task_1_action_exs[0].state) - - # Check state of wf3 subworkflow execution. - self.assertEqual(states.SUCCESS, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_3_ex.state) - self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_3_task_1_action_exs[0].state) - - def test_pause_resume_cascade_up_from_subworkflow_pause_before(self): + def test_pause_before_cascade(self): + # Purpose of this test is to verify if a pause-before in a subwf will + # pause the main wf wb_text = """ version: '2.0' @@ -2015,296 +510,59 @@ class SubworkflowPauseResumeTest(base.EngineTestCase): tasks: task1: workflow: wf2 - on-success: task3 - - task2: - workflow: wf3 - on-success: task3 - - task3: - join: all wf2: tasks: task1: - action: std.noop - on-success: task2 - - task2: pause-before: true action: std.async_noop - - wf3: - tasks: - task1: - action: std.async_noop - on-success: task2 - - task2: - action: std.noop """ wb_service.create_workbook_v2(wb_text) # Start workflow execution. - wf_1_ex = self.engine.start_workflow('wb.wf1') + wf1_ex = self.engine.start_workflow('wb.wf1') - self.await_workflow_state(wf_1_ex.id, states.PAUSED) + self.await_workflow_state(wf1_ex.id, states.PAUSED) with db_api.transaction(): wf_execs = db_api.get_workflow_executions() + wf1_ex = self._assert_single_item(wf_execs, name='wb.wf1') + wf2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, + wf2_task1 = self._assert_single_item( + wf2_ex.task_executions, name='task1' ) - wf_1_task_1_action_exs = wf_1_task_1_ex.executions + # Make sure the task and wf are paused + self.await_workflow_state(wf2_ex.id, states.PAUSED) + # NOTE(amorin) not sure why the state is IDLE here, that was coded + # like this in the past, even if my logic would suggest to have + # PAUSED instead. + # Maybe because the task actually never started + # seen in logs: [RUNNING -> IDLE, msg=Set by 'pause-before' policy] + self.await_task_state(wf2_task1.id, states.IDLE) - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) + # Resume wf2 + self.engine.resume_workflow(wf2_ex.id) - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' - ) - - wf_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_1_ex.id - ) - - wf_2_task_2_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task2' - ) - - wf_2_task_2_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_2_ex.id - ) - - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - self.assertEqual(states.PAUSED, wf_2_ex.state) - self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state) - self.assertEqual(states.IDLE, wf_2_task_2_ex.state) - self.assertEqual(0, len(wf_2_task_2_action_exs)) - self.assertEqual(states.PAUSED, wf_3_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_1_task_1_ex.state) - self.assertEqual(states.PAUSED, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.PAUSED, wf_1_task_2_ex.state) - self.assertEqual(states.PAUSED, wf_1_ex.state) - - # Resume the main workflow. - self.engine.resume_workflow(wf_1_ex.id) - - self.await_workflow_running(wf_1_ex.id) - self.await_workflow_running(wf_2_ex.id) - self.await_workflow_running(wf_3_ex.id) + # Make sure it's back running + self.await_workflow_running(wf1_ex.id) + self.await_workflow_running(wf2_ex.id) + self.await_task_running(wf2_task1.id) with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - self.await_task_running(wf_1_task_1_ex.id) - self.await_task_running(wf_2_task_2_ex.id) - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' + # Grab action so we can complete it below + wf2_task1_actions = db_api.get_action_executions( + task_execution_id=wf2_task1.id ) - wf_1_task_1_action_exs = wf_1_task_1_ex.executions - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' - ) - - wf_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_1_ex.id - ) - - wf_2_task_2_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task2' - ) - - wf_2_task_2_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_2_ex.id - ) - - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - self.assertEqual(states.RUNNING, wf_2_ex.state) - self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_2_task_2_ex.state) - self.assertEqual(states.RUNNING, wf_2_task_2_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_3_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_3_task_1_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_1_task_1_ex.state) - self.assertEqual(states.RUNNING, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.RUNNING, wf_1_task_2_ex.state) - self.assertEqual(states.RUNNING, wf_1_ex.state) - - # Complete action executions of the subworkflows. + # Complete action execution of subworkflows. self.engine.on_action_complete( - wf_2_task_2_action_exs[0].id, + wf2_task1_actions[0].id, ml_actions.Result(data={'result': 'foobar'}) ) - self.engine.on_action_complete( - wf_3_task_1_action_exs[0].id, - ml_actions.Result(data={'result': 'foobar'}) - ) - - self.await_workflow_success(wf_2_ex.id) - self.await_workflow_success(wf_3_ex.id) - self.await_workflow_success(wf_1_ex.id) - - with db_api.transaction(): - wf_execs = db_api.get_workflow_executions() - - # Get objects for the parent workflow execution. - wf_1_ex = self._assert_single_item(wf_execs, name='wb.wf1') - - wf_1_task_execs = wf_1_ex.task_executions - - wf_1_task_1_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task1' - ) - - wf_1_task_1_action_exs = wf_1_task_1_ex.executions - - wf_1_task_2_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task2' - ) - - wf_1_task_2_action_exs = wf_1_task_2_ex.executions - - wf_1_task_3_ex = self._assert_single_item( - wf_1_ex.task_executions, - name='task3' - ) - - # Get objects for the subworkflow executions. - wf_2_ex = self._assert_single_item(wf_execs, name='wb.wf2') - - wf_2_task_execs = wf_2_ex.task_executions - - wf_2_task_1_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task1' - ) - - wf_2_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_1_ex.id - ) - - wf_2_task_2_ex = self._assert_single_item( - wf_2_ex.task_executions, - name='task2' - ) - - wf_2_task_2_action_exs = db_api.get_action_executions( - task_execution_id=wf_2_task_2_ex.id - ) - - wf_3_ex = self._assert_single_item(wf_execs, name='wb.wf3') - - wf_3_task_execs = wf_3_ex.task_executions - - wf_3_task_1_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task1' - ) - - wf_3_task_1_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_1_ex.id - ) - - wf_3_task_2_ex = self._assert_single_item( - wf_3_ex.task_executions, - name='task2' - ) - - wf_3_task_2_action_exs = db_api.get_action_executions( - task_execution_id=wf_3_task_2_ex.id - ) - - self.assertEqual(states.SUCCESS, wf_1_ex.state) - self.assertEqual(3, len(wf_1_task_execs)) - self.assertEqual(states.SUCCESS, wf_1_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_2_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_3_ex.state) - self.assertEqual(states.SUCCESS, wf_1_task_1_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_1_task_2_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_2_ex.state) - self.assertEqual(2, len(wf_2_task_execs)) - self.assertEqual(states.SUCCESS, wf_2_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_2_task_2_ex.state) - self.assertEqual(states.SUCCESS, wf_2_task_1_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_2_task_2_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_3_ex.state) - self.assertEqual(2, len(wf_3_task_execs)) - self.assertEqual(states.SUCCESS, wf_3_task_1_ex.state) - self.assertEqual(states.SUCCESS, wf_3_task_2_ex.state) - self.assertEqual(states.SUCCESS, wf_3_task_1_action_exs[0].state) - self.assertEqual(states.SUCCESS, wf_3_task_2_action_exs[0].state) + self.await_workflow_success(wf2_ex.id) + self.await_workflow_success(wf1_ex.id)