diff --git a/mistral/api/controllers/v2/action_execution.py b/mistral/api/controllers/v2/action_execution.py index 2010a64a..88384321 100644 --- a/mistral/api/controllers/v2/action_execution.py +++ b/mistral/api/controllers/v2/action_execution.py @@ -1,6 +1,5 @@ -# -*- coding: utf-8 -*- -# # Copyright 2015 - Mirantis, Inc. +# Copyright 2016 - Brocade Communications Systems, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -166,10 +165,14 @@ class ActionExecutionsController(rest.RestController): if not output: output = 'Unknown error' result = wf_utils.Result(error=output) + elif action_ex.state == states.CANCELLED: + result = wf_utils.Result(cancel=True) else: raise exc.InvalidResultException( - "Error. Expected on of %s, actual: %s" % - ([states.SUCCESS, states.ERROR], action_ex.state) + "Error. Expected one of %s, actual: %s" % ( + [states.SUCCESS, states.ERROR, states.CANCELLED], + action_ex.state + ) ) values = rpc.get_engine_client().on_action_complete(id, result) diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 4c8835c3..1f80301c 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -323,7 +323,7 @@ class RetryPolicy(base.TaskPolicy): state = task_ex.state - if not states.is_completed(state): + if not states.is_completed(state) or states.is_cancelled(state): return policy_context = runtime_context[context_key] @@ -336,12 +336,20 @@ class RetryPolicy(base.TaskPolicy): retries_remain = retry_no + 1 < self.count - stop_continue_flag = (task_ex.state == states.SUCCESS and - not self._continue_on_clause) - stop_continue_flag = (stop_continue_flag or - (self._continue_on_clause and - not continue_on_evaluation)) - break_triggered = task_ex.state == states.ERROR and self.break_on + stop_continue_flag = ( + task_ex.state == states.SUCCESS and + not self._continue_on_clause + ) + + stop_continue_flag = ( + stop_continue_flag or + (self._continue_on_clause and not continue_on_evaluation) + ) + + break_triggered = ( + task_ex.state == states.ERROR and + self.break_on + ) if not retries_remain or break_triggered or stop_continue_flag: return diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index d850ce25..836feff1 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -327,7 +327,10 @@ class RegularTask(Task): execs = self.task_ex.executions else: execs = filter( - lambda e: e.accepted and e.state == states.ERROR, + lambda e: ( + e.accepted and + e.state in [states.ERROR, states.CANCELLED] + ), self.task_ex.executions ) diff --git a/mistral/tests/unit/api/v2/test_action_executions.py b/mistral/tests/unit/api/v2/test_action_executions.py index b7efe65e..e448b792 100644 --- a/mistral/tests/unit/api/v2/test_action_executions.py +++ b/mistral/tests/unit/api/v2/test_action_executions.py @@ -1,6 +1,5 @@ -# -*- coding: utf-8 -*- -# # Copyright 2015 - Mirantis, Inc. +# Copyright 2016 - Brocade Communications Systems, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -132,6 +131,12 @@ UPDATED_ACTION = copy.deepcopy(ACTION_EX) UPDATED_ACTION['state'] = 'SUCCESS' UPDATED_ACTION_OUTPUT = UPDATED_ACTION['output'] +CANCELLED_ACTION_EX_DB = copy.copy(ACTION_EX_DB).to_dict() +CANCELLED_ACTION_EX_DB['state'] = 'CANCELLED' +CANCELLED_ACTION_EX_DB['task_name'] = 'task1' +CANCELLED_ACTION = copy.deepcopy(ACTION_EX) +CANCELLED_ACTION['state'] = 'CANCELLED' + ERROR_ACTION_EX = copy.copy(ACTION_EX_DB).to_dict() ERROR_ACTION_EX['state'] = 'ERROR' ERROR_ACTION_EX['task_name'] = 'task1' @@ -373,6 +378,20 @@ class TestActionExecutionsController(base.APITest): wf_utils.Result(error=DEFAULT_ERROR_OUTPUT) ) + @mock.patch.object(rpc.EngineClient, 'on_action_complete') + def test_put_cancelled(self, on_action_complete_mock_func): + on_action_complete_mock_func.return_value = CANCELLED_ACTION_EX_DB + + resp = self.app.put_json('/v2/action_executions/123', CANCELLED_ACTION) + + self.assertEqual(200, resp.status_int) + self.assertDictEqual(CANCELLED_ACTION, resp.json) + + on_action_complete_mock_func.assert_called_once_with( + CANCELLED_ACTION['id'], + wf_utils.Result(cancel=True) + ) + @mock.patch.object( rpc.EngineClient, 'on_action_complete', @@ -387,6 +406,19 @@ class TestActionExecutionsController(base.APITest): self.assertEqual(404, resp.status_int) + def test_put_bad_state(self): + action = copy.deepcopy(ACTION_EX) + action['state'] = 'PAUSED' + + resp = self.app.put_json( + '/v2/action_executions/123', + action, + expect_errors=True + ) + + self.assertEqual(400, resp.status_int) + self.assertIn('Expected one of', resp.json['faultstring']) + def test_put_bad_result(self): resp = self.app.put_json( '/v2/action_executions/123', diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun_cancelled.py b/mistral/tests/unit/engine/test_direct_workflow_rerun_cancelled.py new file mode 100644 index 00000000..8cfae327 --- /dev/null +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun_cancelled.py @@ -0,0 +1,646 @@ +# Copyright 2016 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from oslo_config import cfg + +from mistral.actions import std_actions +from mistral.db.v2 import api as db_api +from mistral.services import workbooks as wb_service +from mistral.tests.unit.engine import base +from mistral.workflow import states +from mistral.workflow import utils as wf_utils + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + + +class DirectWorkflowRerunCancelledTest(base.EngineTestCase): + + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.MagicMock( + side_effect=[ + 'Task 2', # Mock task2 success. + 'Task 3' # Mock task3 success. + ] + ) + ) + def test_rerun_cancelled_task(self): + wb_def = """ + version: '2.0' + + name: wb1 + + workflows: + wf1: + type: direct + + tasks: + t1: + action: std.async_noop + on-success: + - t2 + t2: + action: std.echo output="Task 2" + on-success: + - t3 + t3: + action: std.echo output="Task 3" + """ + + wb_service.create_workbook_v2(wb_def) + + wf1_ex = self.engine.start_workflow('wb1.wf1', {}) + + self.await_workflow_state(wf1_ex.id, states.RUNNING) + + with db_api.transaction(): + wf1_execs = db_api.get_workflow_executions() + + wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1') + wf1_t1_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t1' + ) + + wf1_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t1_ex.id + ) + + self.assertEqual(1, len(wf1_t1_action_exs)) + self.assertEqual(states.RUNNING, wf1_t1_action_exs[0].state) + + # Cancel action execution for task. + self.engine.on_action_complete( + wf1_t1_action_exs[0].id, + wf_utils.Result(cancel=True) + ) + + self.await_task_cancelled(wf1_t1_ex.id) + self.await_workflow_cancelled(wf1_ex.id) + + with db_api.transaction(): + wf1_ex = db_api.get_workflow_execution(wf1_ex.id) + + wf1_task_execs = wf1_ex.task_executions + + wf1_t1_ex = self._assert_single_item(wf1_task_execs, name='t1') + + self.assertEqual(states.CANCELLED, wf1_ex.state) + self.assertEqual("Cancelled tasks: t1", wf1_ex.state_info) + self.assertEqual(1, len(wf1_task_execs)) + self.assertEqual(states.CANCELLED, wf1_t1_ex.state) + self.assertIsNone(wf1_t1_ex.state_info) + + # Resume workflow and re-run cancelled task. + self.engine.rerun_workflow(wf1_t1_ex.id) + + with db_api.transaction(): + wf1_ex = db_api.get_workflow_execution(wf1_ex.id) + + wf1_task_execs = wf1_ex.task_executions + + self.assertEqual(states.RUNNING, wf1_ex.state) + self.assertIsNone(wf1_ex.state_info) + + # Mark async action execution complete. + wf1_t1_ex = self._assert_single_item(wf1_task_execs, name='t1') + + wf1_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t1_ex.id + ) + + self.assertEqual(states.RUNNING, wf1_t1_ex.state) + self.assertEqual(2, len(wf1_t1_action_exs)) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf1_t1_action_exs[1].state) + + self.engine.on_action_complete( + wf1_t1_action_exs[1].id, + wf_utils.Result(data={'foo': 'bar'}) + ) + + # Wait for the workflow to succeed. + self.await_workflow_success(wf1_ex.id) + + with db_api.transaction(): + wf1_ex = db_api.get_workflow_execution(wf1_ex.id) + + wf1_task_execs = wf1_ex.task_executions + + self.assertEqual(states.SUCCESS, wf1_ex.state) + self.assertIsNone(wf1_ex.state_info) + self.assertEqual(3, len(wf1_task_execs)) + + wf1_t1_ex = self._assert_single_item(wf1_task_execs, name='t1') + wf1_t2_ex = self._assert_single_item(wf1_task_execs, name='t2') + wf1_t3_ex = self._assert_single_item(wf1_task_execs, name='t3') + + # Check action executions of task 1. + self.assertEqual(states.SUCCESS, wf1_t1_ex.state) + self.assertIsNone(wf1_t1_ex.state_info) + + wf1_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t1_ex.id + ) + + self.assertEqual(2, len(wf1_t1_action_exs)) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf1_t1_action_exs[1].state) + + # Check action executions of task 2. + self.assertEqual(states.SUCCESS, wf1_t2_ex.state) + + wf1_t2_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t2_ex.id + ) + + self.assertEqual(1, len(wf1_t2_action_exs)) + self.assertEqual(states.SUCCESS, wf1_t2_action_exs[0].state) + + # Check action executions of task 3. + self.assertEqual(states.SUCCESS, wf1_t3_ex.state) + + wf1_t3_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t3_ex.id + ) + + self.assertEqual(1, len(wf1_t3_action_exs)) + self.assertEqual(states.SUCCESS, wf1_t3_action_exs[0].state) + + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.MagicMock( + side_effect=[ + 'Task 1', # Mock task1 success. + 'Task 3' # Mock task3 success. + ] + ) + ) + def test_rerun_cancelled_subflow(self): + wb_def = """ + version: '2.0' + + name: wb1 + + workflows: + wf1: + type: direct + + tasks: + t1: + action: std.echo output="Task 1" + on-success: + - t2 + t2: + workflow: wf2 + on-success: + - t3 + t3: + action: std.echo output="Task 3" + + wf2: + type: direct + + output: + result: <% task(wf2_t1).result %> + + tasks: + wf2_t1: + action: std.async_noop + """ + + wb_service.create_workbook_v2(wb_def) + + wf1_ex = self.engine.start_workflow('wb1.wf1', {}) + + self.await_workflow_state(wf1_ex.id, states.RUNNING) + + with db_api.transaction(): + # Wait for task 1 to complete. + wf1_execs = db_api.get_workflow_executions() + + wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1') + wf1_t1_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t1' + ) + + self.await_task_success(wf1_t1_ex.id) + + with db_api.transaction(): + # Wait for the async task to run. + wf1_execs = db_api.get_workflow_executions() + + wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1') + wf1_t2_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t2' + ) + + self.await_task_state(wf1_t2_ex.id, states.RUNNING) + + with db_api.transaction(): + sub_wf_exs = db_api.get_workflow_executions( + task_execution_id=wf1_t2_ex.id + ) + + self.assertEqual(1, len(sub_wf_exs)) + self.assertEqual(states.RUNNING, sub_wf_exs[0].state) + + wf2_ex = sub_wf_exs[0] + + wf2_t1_ex = self._assert_single_item( + wf2_ex.task_executions, + name='wf2_t1' + ) + + self.await_task_state(wf2_t1_ex.id, states.RUNNING) + + wf2_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf2_t1_ex.id + ) + + self.assertEqual(1, len(wf2_t1_action_exs)) + self.assertEqual(states.RUNNING, wf2_t1_action_exs[0].state) + + # Cancel subworkflow. + self.engine.stop_workflow(wf2_ex.id, states.CANCELLED) + + self.await_workflow_cancelled(wf2_ex.id) + self.await_workflow_cancelled(wf1_ex.id) + + # Resume workflow and re-run failed subworkflow task. + self.engine.rerun_workflow(wf1_t2_ex.id) + + with db_api.transaction(): + wf1_execs = db_api.get_workflow_executions() + + wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1') + wf1_t2_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t2' + ) + + self.await_task_state(wf1_t2_ex.id, states.RUNNING) + + with db_api.transaction(): + sub_wf_exs = db_api.get_workflow_executions( + task_execution_id=wf1_t2_ex.id + ) + + self.assertEqual(2, len(sub_wf_exs)) + self.assertEqual(states.CANCELLED, sub_wf_exs[0].state) + self.assertEqual(states.RUNNING, sub_wf_exs[1].state) + + wf2_ex = sub_wf_exs[1] + + wf2_t1_ex = self._assert_single_item( + wf2_ex.task_executions, name='wf2_t1' + ) + + self.await_task_state(wf2_t1_ex.id, states.RUNNING) + + wf2_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf2_t1_ex.id + ) + + self.assertEqual(1, len(wf2_t1_action_exs)) + self.assertEqual(states.RUNNING, wf2_t1_action_exs[0].state) + + # Mark async action execution complete. + self.engine.on_action_complete( + wf2_t1_action_exs[0].id, + wf_utils.Result(data={'foo': 'bar'}) + ) + + # Wait for the workflows to succeed. + self.await_workflow_success(wf1_ex.id) + self.await_workflow_success(wf2_ex.id) + + sub_wf_exs = db_api.get_workflow_executions( + task_execution_id=wf1_t2_ex.id + ) + + self.assertEqual(2, len(sub_wf_exs)) + self.assertEqual(states.CANCELLED, sub_wf_exs[0].state) + self.assertEqual(states.SUCCESS, sub_wf_exs[1].state) + + wf2_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf2_t1_ex.id + ) + + self.assertEqual(1, len(wf2_t1_action_exs)) + self.assertEqual(states.SUCCESS, wf2_t1_action_exs[0].state) + + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.MagicMock( + side_effect=[ + 'Task 1', # Mock task1 success. + 'Task 3' # Mock task3 success. + ] + ) + ) + def test_rerun_cancelled_subflow_task(self): + wb_def = """ + version: '2.0' + + name: wb1 + + workflows: + wf1: + type: direct + + tasks: + t1: + action: std.echo output="Task 1" + on-success: + - t2 + t2: + workflow: wf2 + on-success: + - t3 + t3: + action: std.echo output="Task 3" + + wf2: + type: direct + + output: + result: <% task(wf2_t1).result %> + + tasks: + wf2_t1: + action: std.async_noop + """ + + wb_service.create_workbook_v2(wb_def) + + wf1_ex = self.engine.start_workflow('wb1.wf1', {}) + + self.await_workflow_state(wf1_ex.id, states.RUNNING) + + with db_api.transaction(): + # Wait for task 1 to complete. + wf1_execs = db_api.get_workflow_executions() + + wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1') + wf1_t1_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t1' + ) + + self.await_task_success(wf1_t1_ex.id) + + with db_api.transaction(): + # Wait for the async task to run. + wf1_execs = db_api.get_workflow_executions() + + wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1') + wf1_t2_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t2' + ) + + self.await_task_state(wf1_t2_ex.id, states.RUNNING) + + with db_api.transaction(): + sub_wf_exs = db_api.get_workflow_executions( + task_execution_id=wf1_t2_ex.id + ) + + self.assertEqual(1, len(sub_wf_exs)) + self.assertEqual(states.RUNNING, sub_wf_exs[0].state) + + wf2_ex = sub_wf_exs[0] + + wf2_t1_ex = self._assert_single_item( + wf2_ex.task_executions, + name='wf2_t1' + ) + + self.await_task_state(wf2_t1_ex.id, states.RUNNING) + + wf2_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf2_t1_ex.id + ) + + self.assertEqual(1, len(wf2_t1_action_exs)) + self.assertEqual(states.RUNNING, wf2_t1_action_exs[0].state) + + # Cancel action execution for task. + self.engine.on_action_complete( + wf2_t1_action_exs[0].id, + wf_utils.Result(cancel=True) + ) + + self.await_workflow_cancelled(wf2_ex.id) + self.await_workflow_cancelled(wf1_ex.id) + + # Resume workflow and re-run failed subworkflow task. + self.engine.rerun_workflow(wf2_t1_ex.id) + + with db_api.transaction(): + wf1_execs = db_api.get_workflow_executions() + + wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1') + wf1_t2_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t2' + ) + + self.await_task_state(wf1_t2_ex.id, states.RUNNING) + + with db_api.transaction(): + sub_wf_exs = db_api.get_workflow_executions( + task_execution_id=wf1_t2_ex.id + ) + + self.assertEqual(1, len(sub_wf_exs)) + self.assertEqual(states.RUNNING, sub_wf_exs[0].state) + + wf2_ex = sub_wf_exs[0] + + wf2_t1_ex = self._assert_single_item( + wf2_ex.task_executions, + name='wf2_t1' + ) + + self.await_task_state(wf2_t1_ex.id, states.RUNNING) + + wf2_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf2_t1_ex.id + ) + + self.assertEqual(2, len(wf2_t1_action_exs)) + self.assertEqual(states.CANCELLED, wf2_t1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf2_t1_action_exs[1].state) + + # Mark async action execution complete. + self.engine.on_action_complete( + wf2_t1_action_exs[1].id, + wf_utils.Result(data={'foo': 'bar'}) + ) + + # Wait for the workflows to succeed. + self.await_workflow_success(wf1_ex.id) + self.await_workflow_success(wf2_ex.id) + + sub_wf_exs = db_api.get_workflow_executions( + task_execution_id=wf1_t2_ex.id + ) + + self.assertEqual(1, len(sub_wf_exs)) + self.assertEqual(states.SUCCESS, sub_wf_exs[0].state) + + wf2_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf2_t1_ex.id + ) + + self.assertEqual(2, len(wf2_t1_action_exs)) + self.assertEqual(states.CANCELLED, wf2_t1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf2_t1_action_exs[1].state) + + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.MagicMock( + side_effect=[ + 'Task 2' # Mock task2 success. + ] + ) + ) + def test_rerun_cancelled_with_items(self): + wb_def = """ + version: '2.0' + + name: wb1 + + workflows: + wf1: + type: direct + tasks: + t1: + with-items: i in <% list(range(0, 3)) %> + action: std.async_noop + on-success: + - t2 + t2: + action: std.echo output="Task 2" + """ + + wb_service.create_workbook_v2(wb_def) + + wf1_ex = self.engine.start_workflow('wb1.wf1', {}) + + self.await_workflow_state(wf1_ex.id, states.RUNNING) + + with db_api.transaction(): + wf1_execs = db_api.get_workflow_executions() + + wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1') + wf1_t1_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t1' + ) + + wf1_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t1_ex.id + ) + + self.assertEqual(3, len(wf1_t1_action_exs)) + self.assertEqual(states.RUNNING, wf1_t1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf1_t1_action_exs[1].state) + self.assertEqual(states.RUNNING, wf1_t1_action_exs[2].state) + + # Cancel action execution for task. + for wf1_t1_action_ex in wf1_t1_action_exs: + self.engine.on_action_complete( + wf1_t1_action_ex.id, + wf_utils.Result(cancel=True) + ) + + self.await_workflow_cancelled(wf1_ex.id) + + wf1_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t1_ex.id + ) + + self.assertEqual(3, len(wf1_t1_action_exs)) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[0].state) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[1].state) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[2].state) + + # Resume workflow and re-run failed with items task. + self.engine.rerun_workflow(wf1_t1_ex.id, reset=False) + + with db_api.transaction(): + wf1_execs = db_api.get_workflow_executions() + + wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1') + wf1_t1_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t1' + ) + + self.await_workflow_state(wf1_ex.id, states.RUNNING) + + wf1_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t1_ex.id + ) + + self.assertEqual(6, len(wf1_t1_action_exs)) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[0].state) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[1].state) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[2].state) + self.assertEqual(states.RUNNING, wf1_t1_action_exs[3].state) + self.assertEqual(states.RUNNING, wf1_t1_action_exs[4].state) + self.assertEqual(states.RUNNING, wf1_t1_action_exs[5].state) + + # Mark async action execution complete. + for i in range(3, 6): + self.engine.on_action_complete( + wf1_t1_action_exs[i].id, + wf_utils.Result(data={'foo': 'bar'}) + ) + + # Wait for the workflows to succeed. + self.await_workflow_success(wf1_ex.id) + + with db_api.transaction(): + wf1_ex = db_api.get_workflow_execution(wf1_ex.id) + + wf1_t1_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t1' + ) + + wf1_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t1_ex.id + ) + + self.assertEqual(6, len(wf1_t1_action_exs)) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[0].state) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[1].state) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[2].state) + self.assertEqual(states.SUCCESS, wf1_t1_action_exs[3].state) + self.assertEqual(states.SUCCESS, wf1_t1_action_exs[4].state) + self.assertEqual(states.SUCCESS, wf1_t1_action_exs[5].state) diff --git a/mistral/tests/unit/engine/test_reverse_workflow_rerun_cancelled.py b/mistral/tests/unit/engine/test_reverse_workflow_rerun_cancelled.py new file mode 100644 index 00000000..cfbcbfed --- /dev/null +++ b/mistral/tests/unit/engine/test_reverse_workflow_rerun_cancelled.py @@ -0,0 +1,192 @@ +# Copyright 2016 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock + +from oslo_config import cfg + +from mistral.actions import std_actions +from mistral.db.v2 import api as db_api +from mistral.services import workbooks as wb_service +from mistral.tests.unit.engine import base +from mistral.workflow import states +from mistral.workflow import utils as wf_utils + + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + + +class ReverseWorkflowRerunCancelledTest(base.EngineTestCase): + + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.MagicMock( + side_effect=[ + 'Task 2', # Mock task2 success. + 'Task 3' # Mock task3 success. + ] + ) + ) + def test_rerun_cancelled_task(self): + wb_def = """ + version: '2.0' + name: wb1 + workflows: + wf1: + type: reverse + tasks: + t1: + action: std.async_noop + t2: + action: std.echo output="Task 2" + requires: + - t1 + t3: + action: std.echo output="Task 3" + requires: + - t2 + """ + + wb_service.create_workbook_v2(wb_def) + + wf1_ex = self.engine.start_workflow('wb1.wf1', {}, task_name='t3') + + self.await_workflow_state(wf1_ex.id, states.RUNNING) + + with db_api.transaction(): + wf1_execs = db_api.get_workflow_executions() + + wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1') + wf1_t1_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t1' + ) + + wf1_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t1_ex.id + ) + + self.assertEqual(1, len(wf1_t1_action_exs)) + self.assertEqual(states.RUNNING, wf1_t1_action_exs[0].state) + + # Cancel action execution for task. + self.engine.on_action_complete( + wf1_t1_action_exs[0].id, + wf_utils.Result(cancel=True) + ) + + self.await_workflow_cancelled(wf1_ex.id) + + with db_api.transaction(): + wf1_ex = db_api.get_workflow_execution(wf1_ex.id) + + wf1_t1_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t1' + ) + + self.await_task_cancelled(wf1_t1_ex.id) + + with db_api.transaction(): + wf1_ex = db_api.get_workflow_execution(wf1_ex.id) + + wf1_t1_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t1' + ) + + self.assertEqual(states.CANCELLED, wf1_ex.state) + self.assertEqual("Cancelled tasks: t1", wf1_ex.state_info) + self.assertEqual(1, len(wf1_ex.task_executions)) + self.assertEqual(states.CANCELLED, wf1_t1_ex.state) + self.assertIsNone(wf1_t1_ex.state_info) + + # Resume workflow and re-run cancelled task. + self.engine.rerun_workflow(wf1_t1_ex.id) + + with db_api.transaction(): + wf1_ex = db_api.get_workflow_execution(wf1_ex.id) + + wf1_task_execs = wf1_ex.task_executions + + self.assertEqual(states.RUNNING, wf1_ex.state) + self.assertIsNone(wf1_ex.state_info) + + # Mark async action execution complete. + wf1_t1_ex = self._assert_single_item(wf1_task_execs, name='t1') + + wf1_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t1_ex.id + ) + + self.assertEqual(states.RUNNING, wf1_t1_ex.state) + self.assertEqual(2, len(wf1_t1_action_exs)) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf1_t1_action_exs[1].state) + + self.engine.on_action_complete( + wf1_t1_action_exs[1].id, + wf_utils.Result(data={'foo': 'bar'}) + ) + + # Wait for the workflow to succeed. + self.await_workflow_success(wf1_ex.id) + + with db_api.transaction(): + wf1_ex = db_api.get_workflow_execution(wf1_ex.id) + + wf1_task_execs = wf1_ex.task_executions + + self.assertEqual(states.SUCCESS, wf1_ex.state) + self.assertIsNone(wf1_ex.state_info) + self.assertEqual(3, len(wf1_task_execs)) + + wf1_t1_ex = self._assert_single_item(wf1_task_execs, name='t1') + wf1_t2_ex = self._assert_single_item(wf1_task_execs, name='t2') + wf1_t3_ex = self._assert_single_item(wf1_task_execs, name='t3') + + # Check action executions of task 1. + self.assertEqual(states.SUCCESS, wf1_t1_ex.state) + self.assertIsNone(wf1_t2_ex.state_info) + + wf1_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t1_ex.id + ) + + self.assertEqual(2, len(wf1_t1_action_exs)) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[0].state) + self.assertEqual(states.SUCCESS, wf1_t1_action_exs[1].state) + + # Check action executions of task 2. + self.assertEqual(states.SUCCESS, wf1_t2_ex.state) + + wf1_t2_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t2_ex.id + ) + + self.assertEqual(1, len(wf1_t2_action_exs)) + self.assertEqual(states.SUCCESS, wf1_t2_action_exs[0].state) + + # Check action executions of task 3. + self.assertEqual(states.SUCCESS, wf1_t3_ex.state) + + wf1_t3_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t3_ex.id + ) + + self.assertEqual(1, len(wf1_t3_action_exs)) + self.assertEqual(states.SUCCESS, wf1_t3_action_exs[0].state) diff --git a/mistral/tests/unit/engine/test_task_cancel.py b/mistral/tests/unit/engine/test_task_cancel.py new file mode 100644 index 00000000..184c9593 --- /dev/null +++ b/mistral/tests/unit/engine/test_task_cancel.py @@ -0,0 +1,351 @@ +# Copyright 2015 - StackStorm, Inc. +# Copyright 2016 - Brocade Communications Systems, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +import testtools + +from mistral.actions import std_actions +from mistral.db.v2 import api as db_api +from mistral.services import workbooks as wb_service +from mistral.services import workflows as wf_service +from mistral.tests.unit.engine import base +from mistral.workflow import states +from mistral.workflow import utils as wf_utils + + +class TaskCancelTest(base.EngineTestCase): + + def test_cancel_action_execution(self): + workflow = """ + version: '2.0' + + wf: + type: direct + tasks: + task1: + action: std.async_noop + on-success: + - task2 + on-error: + - task3 + on-complete: + - task4 + + task2: + action: std.noop + task3: + action: std.noop + task4: + action: std.noop + """ + + wf_service.create_workflows(workflow) + + wf_ex = self.engine.start_workflow('wf', {}) + + self.await_workflow_state(wf_ex.id, states.RUNNING) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + wf_ex = self._assert_single_item(wf_execs, name='wf') + + task_1_ex = self._assert_single_item( + wf_ex.task_executions, + name='task1' + ) + + task_1_action_exs = db_api.get_action_executions( + task_execution_id=task_1_ex.id + ) + + self.assertEqual(1, len(task_1_action_exs)) + self.assertEqual(states.RUNNING, task_1_action_exs[0].state) + + self.engine.on_action_complete( + task_1_action_exs[0].id, + wf_utils.Result(cancel=True) + ) + + self.await_workflow_cancelled(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_1_ex = self._assert_single_item( + wf_ex.task_executions, + name='task1' + ) + + self.await_task_cancelled(task_1_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_execs = wf_ex.task_executions + + task_1_ex = self._assert_single_item(task_execs, name='task1') + + task_1_action_exs = db_api.get_action_executions( + task_execution_id=task_1_ex.id + ) + + self.assertEqual(states.CANCELLED, wf_ex.state) + self.assertEqual("Cancelled tasks: task1", wf_ex.state_info) + self.assertEqual(1, len(task_execs)) + self.assertEqual(states.CANCELLED, task_1_ex.state) + self.assertIsNone(task_1_ex.state_info) + self.assertEqual(1, len(task_1_action_exs)) + self.assertEqual(states.CANCELLED, task_1_action_exs[0].state) + self.assertIsNone(task_1_action_exs[0].state_info) + + def test_cancel_child_workflow_action_execution(self): + workbook = """ + version: '2.0' + + name: wb + + workflows: + wf: + type: direct + tasks: + taskx: + workflow: subwf + + subwf: + type: direct + tasks: + task1: + action: std.async_noop + on-success: + - task2 + on-error: + - task3 + on-complete: + - task4 + + task2: + action: std.noop + task3: + action: std.noop + task4: + action: std.noop + """ + + wb_service.create_workbook_v2(workbook) + + wf_ex = self.engine.start_workflow('wb.wf', {}) + + self.await_workflow_state(wf_ex.id, states.RUNNING) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + wf_ex = self._assert_single_item(wf_execs, name='wb.wf') + task_ex = self._assert_single_item( + wf_ex.task_executions, + name='taskx' + ) + subwf_ex = self._assert_single_item(wf_execs, name='wb.subwf') + + task_1_ex = self._assert_single_item( + subwf_ex.task_executions, + name='task1' + ) + + task_1_action_exs = db_api.get_action_executions( + task_execution_id=task_1_ex.id + ) + + self.assertEqual(1, len(task_1_action_exs)) + self.assertEqual(states.RUNNING, task_1_action_exs[0].state) + + self.engine.on_action_complete( + task_1_action_exs[0].id, + wf_utils.Result(cancel=True) + ) + + self.await_workflow_cancelled(subwf_ex.id) + self.await_task_cancelled(task_ex.id) + self.await_workflow_cancelled(wf_ex.id) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + wf_ex = self._assert_single_item(wf_execs, name='wb.wf') + task_ex = self._assert_single_item( + wf_ex.task_executions, + name='taskx' + ) + + subwf_ex = self._assert_single_item(wf_execs, name='wb.subwf') + + subwf_task_execs = subwf_ex.task_executions + + self.assertEqual(states.CANCELLED, subwf_ex.state) + self.assertEqual("Cancelled tasks: task1", subwf_ex.state_info) + self.assertEqual(1, len(subwf_task_execs)) + self.assertEqual(states.CANCELLED, task_ex.state) + self.assertEqual("Cancelled tasks: task1", task_ex.state_info) + self.assertEqual(states.CANCELLED, wf_ex.state) + self.assertEqual("Cancelled tasks: taskx", wf_ex.state_info) + + def test_cancel_action_execution_with_task_retry(self): + workflow = """ + version: '2.0' + + wf: + type: direct + tasks: + task1: + action: std.async_noop + retry: + count: 3 + delay: 0 + on-success: + - task2 + + task2: + action: std.noop + """ + + wf_service.create_workflows(workflow) + + wf_ex = self.engine.start_workflow('wf', {}) + + self.await_workflow_state(wf_ex.id, states.RUNNING) + + with db_api.transaction(): + wf_execs = db_api.get_workflow_executions() + + wf_ex = self._assert_single_item(wf_execs, name='wf') + + task_1_ex = self._assert_single_item( + wf_ex.task_executions, + name='task1' + ) + + task_1_action_exs = db_api.get_action_executions( + task_execution_id=task_1_ex.id + ) + + self.assertEqual(1, len(task_1_action_exs)) + self.assertEqual(states.RUNNING, task_1_action_exs[0].state) + + self.engine.on_action_complete( + task_1_action_exs[0].id, + wf_utils.Result(cancel=True) + ) + + self.await_workflow_cancelled(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_1_ex = self._assert_single_item( + wf_ex.task_executions, + name='task1' + ) + + self.await_task_cancelled(task_1_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_execs = wf_ex.task_executions + + task_1_ex = self._assert_single_item(task_execs, name='task1') + + task_1_action_exs = db_api.get_action_executions( + task_execution_id=task_1_ex.id + ) + + self.assertEqual(states.CANCELLED, wf_ex.state) + self.assertEqual("Cancelled tasks: task1", wf_ex.state_info) + self.assertEqual(1, len(task_execs)) + self.assertEqual(states.CANCELLED, task_1_ex.state) + self.assertIsNone(task_1_ex.state_info) + self.assertEqual(1, len(task_1_action_exs)) + self.assertEqual(states.CANCELLED, task_1_action_exs[0].state) + self.assertIsNone(task_1_action_exs[0].state_info) + + @testtools.skip('Restore concurrency support.') + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.MagicMock( + side_effect=[ + 'Task 2' # Mock task2 success. + ] + ) + ) + def test_cancel_with_items_concurrency(self): + wb_def = """ + version: '2.0' + name: wb1 + workflows: + wf1: + type: direct + tasks: + t1: + with-items: i in <% list(range(0, 4)) %> + action: std.async_noop + concurrency: 2 + on-success: + - t2 + t2: + action: std.echo output="Task 2" + """ + + wb_service.create_workbook_v2(wb_def) + + wf1_ex = self.engine.start_workflow('wb1.wf1', {}) + + self.await_workflow_state(wf1_ex.id, states.RUNNING) + + with db_api.transaction(): + wf1_execs = db_api.get_workflow_executions() + + wf1_ex = self._assert_single_item(wf1_execs, name='wb1.wf1') + wf1_t1_ex = self._assert_single_item( + wf1_ex.task_executions, + name='t1' + ) + + wf1_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t1_ex.id + ) + + self.assertEqual(2, len(wf1_t1_action_exs)) + self.assertEqual(states.RUNNING, wf1_t1_action_exs[0].state) + self.assertEqual(states.RUNNING, wf1_t1_action_exs[1].state) + + # Cancel action execution for task. + for wf1_t1_action_ex in wf1_t1_action_exs: + self.engine.on_action_complete( + wf1_t1_action_ex.id, + wf_utils.Result(cancel=True) + ) + + self.await_task_cancelled(wf1_t1_ex.id) + self.await_workflow_cancelled(wf1_ex.id) + + wf1_t1_action_exs = db_api.get_action_executions( + task_execution_id=wf1_t1_ex.id + ) + + self.assertEqual(2, len(wf1_t1_action_exs)) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[0].state) + self.assertEqual(states.CANCELLED, wf1_t1_action_exs[1].state) diff --git a/mistral/tests/unit/engine/test_task_defaults.py b/mistral/tests/unit/engine/test_task_defaults.py index 52bd1438..52fab3e0 100644 --- a/mistral/tests/unit/engine/test_task_defaults.py +++ b/mistral/tests/unit/engine/test_task_defaults.py @@ -13,7 +13,9 @@ # limitations under the License. import datetime as dt +import mock from oslo_config import cfg +import requests from mistral.db.v2 import api as db_api from mistral.services import scheduler @@ -28,6 +30,12 @@ cfg.CONF.set_default('auth_enable', False, group='pecan') class TaskDefaultsDirectWorkflowEngineTest(base.EngineTestCase): + + @mock.patch.object( + requests, + 'request', + mock.MagicMock(side_effect=Exception()) + ) def test_task_defaults_on_error(self): wf_text = """--- version: '2.0' diff --git a/mistral/tests/unit/engine/test_workflow_cancel.py b/mistral/tests/unit/engine/test_workflow_cancel.py index f378fc99..8a8ae8a6 100644 --- a/mistral/tests/unit/engine/test_workflow_cancel.py +++ b/mistral/tests/unit/engine/test_workflow_cancel.py @@ -451,8 +451,8 @@ class WorkflowCancelTest(base.EngineTestCase): self.await_workflow_cancelled(subwf_exs[0].id) self.await_workflow_error(subwf_exs[1].id) - self.await_task_error(task_ex.id) - self.await_workflow_error(wf_ex.id) + self.await_task_cancelled(task_ex.id) + self.await_workflow_cancelled(wf_ex.id) wf_execs = db_api.get_workflow_executions() @@ -464,10 +464,10 @@ class WorkflowCancelTest(base.EngineTestCase): self.assertEqual("Cancelled by user.", subwf_exs[0].state_info) self.assertEqual(states.ERROR, subwf_exs[1].state) self.assertEqual("Failed by user.", subwf_exs[1].state_info) - self.assertEqual(states.ERROR, task_ex.state) - self.assertIn("failed", task_ex.state_info) - self.assertEqual(states.ERROR, wf_ex.state) - self.assertIn("Failed by user.", wf_ex.state_info) + self.assertEqual(states.CANCELLED, task_ex.state) + self.assertIn("cancelled", task_ex.state_info) + self.assertEqual(states.CANCELLED, wf_ex.state) + self.assertEqual("Cancelled tasks: taskx", wf_ex.state_info) def test_fail_then_cancel_with_items_child_workflow(self): workbook = """ @@ -520,8 +520,8 @@ class WorkflowCancelTest(base.EngineTestCase): self.await_workflow_cancelled(subwf_exs[0].id) self.await_workflow_error(subwf_exs[1].id) - self.await_task_error(task_ex.id) - self.await_workflow_error(wf_ex.id) + self.await_task_cancelled(task_ex.id) + self.await_workflow_cancelled(wf_ex.id) wf_execs = db_api.get_workflow_executions() @@ -533,7 +533,7 @@ class WorkflowCancelTest(base.EngineTestCase): self.assertEqual("Cancelled by user.", subwf_exs[0].state_info) self.assertEqual(states.ERROR, subwf_exs[1].state) self.assertEqual("Failed by user.", subwf_exs[1].state_info) - self.assertEqual(states.ERROR, task_ex.state) - self.assertIn("failed", task_ex.state_info) - self.assertEqual(states.ERROR, wf_ex.state) - self.assertIn("Failed by user.", wf_ex.state_info) + self.assertEqual(states.CANCELLED, task_ex.state) + self.assertIn("cancelled", task_ex.state_info) + self.assertEqual(states.CANCELLED, wf_ex.state) + self.assertEqual("Cancelled tasks: taskx", wf_ex.state_info) diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index 712d49e8..80b8ae80 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -240,7 +240,7 @@ class DirectWorkflowController(base.WorkflowController): t_names_and_params = [] - if states.is_completed(t_state): + if states.is_completed(t_state) and not states.is_cancelled(t_state): t_names_and_params += ( self._find_next_tasks_for_clause( self.wf_spec.get_on_complete_clause(t_name), diff --git a/mistral/workflow/states.py b/mistral/workflow/states.py index 983653f6..1c657e89 100644 --- a/mistral/workflow/states.py +++ b/mistral/workflow/states.py @@ -42,7 +42,7 @@ _VALID_TRANSITIONS = { RUNNING_DELAYED: [RUNNING, ERROR, CANCELLED], PAUSED: [RUNNING, ERROR, CANCELLED], SUCCESS: [], - CANCELLED: [], + CANCELLED: [RUNNING], ERROR: [RUNNING] } @@ -59,6 +59,10 @@ def is_completed(state): return state in [SUCCESS, ERROR, CANCELLED] +def is_cancelled(state): + return state == CANCELLED + + def is_running(state): return state in [RUNNING, RUNNING_DELAYED] diff --git a/mistral/workflow/with_items.py b/mistral/workflow/with_items.py index bc91c9e2..f4a4de4a 100644 --- a/mistral/workflow/with_items.py +++ b/mistral/workflow/with_items.py @@ -44,8 +44,12 @@ def get_count(task_ex): def is_completed(task_ex): - execs = list(filter(lambda t: t.accepted, task_ex.executions)) + find_cancel = lambda x: x.accepted and x.state == states.CANCELLED + if list(filter(find_cancel, task_ex.executions)): + return True + + execs = list(filter(lambda t: t.accepted, task_ex.executions)) count = get_count(task_ex) or 1 return count == len(execs) @@ -69,10 +73,10 @@ def get_final_state(task_ex): find_error = lambda x: x.accepted and x.state == states.ERROR find_cancel = lambda x: x.accepted and x.state == states.CANCELLED - if list(filter(find_error, task_ex.executions)): - return states.ERROR - elif list(filter(find_cancel, task_ex.executions)): + if list(filter(find_cancel, task_ex.executions)): return states.CANCELLED + elif list(filter(find_error, task_ex.executions)): + return states.ERROR else: return states.SUCCESS