From 0e7be02a4812f6d2016e9bdcd31be8135c4d095e Mon Sep 17 00:00:00 2001 From: Winson Chan Date: Wed, 16 Dec 2015 05:56:28 +0000 Subject: [PATCH] Allow env update on resume and rerun workflows Add optional env kwarg to resume_workflow and rerun_workflow in default engine. Add database API to update env in the params and context of the existing workflow execution. The updated env will be merged into existing __env of workflow context. For existing task, the env will be merged into __env of in_context. Partially implements: blueprint mistral-rerun-update-env Change-Id: Id378762d90ca5fc62d14e22134f18f9bce71a123 --- mistral/engine/default_engine.py | 15 +- mistral/services/workflows.py | 18 ++ .../unit/engine/test_direct_workflow_rerun.py | 282 ++++++++++++++++++ .../engine/test_reverse_workflow_rerun.py | 141 +++++++++ .../tests/unit/engine/test_workflow_resume.py | 100 +++++++ .../unit/services/test_workflow_service.py | 104 +++++++ mistral/workflow/base.py | 45 ++- mistral/workflow/direct_workflow.py | 6 +- mistral/workflow/reverse_workflow.py | 6 +- 9 files changed, 700 insertions(+), 17 deletions(-) diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index c95a4c02..404959fa 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -28,6 +28,7 @@ from mistral.engine import task_handler from mistral.engine import utils as eng_utils from mistral.engine import workflow_handler as wf_handler from mistral.services import action_manager as a_m +from mistral.services import workflows as wf_service from mistral import utils as u from mistral.utils import wf_trace from mistral.workbook import parser as spec_parser @@ -283,13 +284,15 @@ class DefaultEngine(base.Engine, coordination.Service): return wf_ex - def _continue_workflow(self, wf_ex, task_ex=None, reset=True): + def _continue_workflow(self, wf_ex, task_ex=None, reset=True, env=None): + wf_ex = wf_service.update_workflow_execution_env(wf_ex, env) + wf_handler.set_execution_state(wf_ex, states.RUNNING) wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex) # Calculate commands to process next. - cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset) + cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env) # When resuming a workflow we need to ignore all 'pause' # commands because workflow controller takes tasks that @@ -321,7 +324,7 @@ class DefaultEngine(base.Engine, coordination.Service): return wf_ex.get_clone() @u.log_exec(LOG) - def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True): + def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None): try: with db_api.transaction(): wf_ex = self._lock_workflow_execution(wf_ex_id) @@ -334,7 +337,7 @@ class DefaultEngine(base.Engine, coordination.Service): if wf_ex.state == states.PAUSED: return wf_ex.get_clone() - return self._continue_workflow(wf_ex, task_ex, reset) + return self._continue_workflow(wf_ex, task_ex, reset, env=env) except Exception as e: LOG.error( "Failed to rerun execution id=%s at task=%s: %s\n%s", @@ -344,7 +347,7 @@ class DefaultEngine(base.Engine, coordination.Service): raise e @u.log_exec(LOG) - def resume_workflow(self, wf_ex_id): + def resume_workflow(self, wf_ex_id, env=None): try: with db_api.transaction(): wf_ex = self._lock_workflow_execution(wf_ex_id) @@ -352,7 +355,7 @@ class DefaultEngine(base.Engine, coordination.Service): if wf_ex.state != states.PAUSED: return wf_ex.get_clone() - return self._continue_workflow(wf_ex) + return self._continue_workflow(wf_ex, env=env) except Exception as e: LOG.error( "Failed to resume execution id=%s: %s\n%s", diff --git a/mistral/services/workflows.py b/mistral/services/workflows.py index 020a9183..657e5831 100644 --- a/mistral/services/workflows.py +++ b/mistral/services/workflows.py @@ -16,6 +16,8 @@ from mistral.db.v2 import api as db_api from mistral import exceptions as exc from mistral import utils from mistral.workbook import parser as spec_parser +from mistral.workflow import data_flow +from mistral.workflow import states STD_WF_PATH = 'resources/workflows' @@ -102,6 +104,22 @@ def update_workflows(definition, scope='private', identifier=None): return db_wfs +def update_workflow_execution_env(wf_ex, env): + if not env: + return wf_ex + + if wf_ex.state not in [states.IDLE, states.PAUSED, states.ERROR]: + raise exc.NotAllowedException( + 'Updating env to workflow execution is only permitted if ' + 'it is in idle, paused, or re-runnable state.' + ) + + wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env) + data_flow.add_environment_to_context(wf_ex) + + return wf_ex + + def _get_workflow_values(wf_spec, definition, scope, is_system=False): values = { 'name': wf_spec.get_name(), diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun.py b/mistral/tests/unit/engine/test_direct_workflow_rerun.py index 17d733a3..b7a79f60 100644 --- a/mistral/tests/unit/engine/test_direct_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun.py @@ -48,6 +48,33 @@ workflows: action: std.echo output="Task 3" """ +SIMPLE_WORKBOOK_DIFF_ENV_VAR = """ +--- +version: '2.0' +name: wb1 +workflows: + wf1: + type: direct + tasks: + t10: + action: std.echo output="Task 10" + on-success: + - t21 + - t30 + t21: + action: std.echo output=<% env().var1 %> + on-success: + - t22 + t22: + action: std.echo output="<% env().var2 %>" + on-success: + - t30 + t30: + join: all + action: std.echo output="<% env().var3 %>" + wait-before: 1 +""" + WITH_ITEMS_WORKBOOK = """ --- version: '2.0' @@ -67,6 +94,24 @@ workflows: action: std.echo output="Task 2" """ +WITH_ITEMS_WORKBOOK_DIFF_ENV_VAR = """ +--- +version: '2.0' +name: wb3 +workflows: + wf1: + type: direct + tasks: + t1: + with-items: i in <% list(range(0, 3)) %> + action: std.echo output="Task 1.<% $.i %> [<% env().var1 %>]" + publish: + v1: <% $.t1 %> + on-success: + - t2 + t2: + action: std.echo output="Task 2" +""" WITH_ITEMS_WORKBOOK_CONCURRENCY = """ --- @@ -192,6 +237,143 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(1, len(task_3_action_exs)) self.assertEqual(states.SUCCESS, task_3_action_exs[0].state) + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.MagicMock( + side_effect=[ + 'Task 10', # Mock task10 success for first run. + exc.ActionException(), # Mock task21 exception for first run. + 'Task 21', # Mock task21 success for rerun. + 'Task 22', # Mock task22 success. + 'Task 30' # Mock task30 success. + ] + ) + ) + def test_rerun_diff_env_vars(self): + wb_service.create_workbook_v2(SIMPLE_WORKBOOK_DIFF_ENV_VAR) + + # Initial environment variables for the workflow execution. + env = { + 'var1': 'fee fi fo fum', + 'var2': 'mirror mirror', + 'var3': 'heigh-ho heigh-ho' + } + + # Run workflow and fail task. + wf_ex = self.engine.start_workflow('wb1.wf1', {}, env=env) + self._await(lambda: self.is_execution_error(wf_ex.id)) + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) + self.assertEqual(3, len(wf_ex.task_executions)) + self.assertDictEqual(env, wf_ex.params['env']) + self.assertDictEqual(env, wf_ex.context['__env']) + + task_exs = wf_ex.task_executions + task_10_ex = self._assert_single_item(task_exs, name='t10') + task_21_ex = self._assert_single_item(task_exs, name='t21') + task_30_ex = self._assert_single_item(task_exs, name='t30') + + self.assertEqual(states.SUCCESS, task_10_ex.state) + self.assertEqual(states.ERROR, task_21_ex.state) + self.assertIsNotNone(task_21_ex.state_info) + self.assertEqual(states.WAITING, task_30_ex.state) + + # Update env in workflow execution with the following. + updated_env = { + 'var1': 'Task 21', + 'var2': 'Task 22', + 'var3': 'Task 30' + } + + # Resume workflow and re-run failed task. + self.engine.rerun_workflow(wf_ex.id, task_21_ex.id, env=updated_env) + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertDictEqual(updated_env, wf_ex.params['env']) + self.assertDictEqual(updated_env, wf_ex.context['__env']) + + # Wait for the workflow to succeed. + self._await(lambda: self.is_execution_success(wf_ex.id)) + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(4, len(wf_ex.task_executions)) + + task_exs = wf_ex.task_executions + task_10_ex = self._assert_single_item(task_exs, name='t10') + task_21_ex = self._assert_single_item(task_exs, name='t21') + task_22_ex = self._assert_single_item(task_exs, name='t22') + task_30_ex = self._assert_single_item(task_exs, name='t30') + + # Check action executions of task 10. + self.assertEqual(states.SUCCESS, task_10_ex.state) + + task_10_action_exs = db_api.get_action_executions( + task_execution_id=task_10_ex.id) + + self.assertEqual(1, len(task_10_action_exs)) + self.assertEqual(states.SUCCESS, task_10_action_exs[0].state) + + self.assertDictEqual( + {'output': 'Task 10'}, + task_10_action_exs[0].input + ) + + # Check action executions of task 21. + self.assertEqual(states.SUCCESS, task_21_ex.state) + self.assertIsNone(task_21_ex.state_info) + + task_21_action_exs = db_api.get_action_executions( + task_execution_id=task_21_ex.id) + + self.assertEqual(2, len(task_21_action_exs)) + self.assertEqual(states.ERROR, task_21_action_exs[0].state) + self.assertEqual(states.SUCCESS, task_21_action_exs[1].state) + + self.assertDictEqual( + {'output': env['var1']}, + task_21_action_exs[0].input + ) + + self.assertDictEqual( + {'output': updated_env['var1']}, + task_21_action_exs[1].input + ) + + # Check action executions of task 22. + self.assertEqual(states.SUCCESS, task_22_ex.state) + + task_22_action_exs = db_api.get_action_executions( + task_execution_id=task_22_ex.id) + + self.assertEqual(1, len(task_22_action_exs)) + self.assertEqual(states.SUCCESS, task_22_action_exs[0].state) + + self.assertDictEqual( + {'output': updated_env['var2']}, + task_22_action_exs[0].input + ) + + # Check action executions of task 30. + self.assertEqual(states.SUCCESS, task_30_ex.state) + + task_30_action_exs = db_api.get_action_executions( + task_execution_id=task_30_ex.id) + + self.assertEqual(1, len(task_30_action_exs)) + self.assertEqual(states.SUCCESS, task_30_action_exs[0].state) + + self.assertDictEqual( + {'output': updated_env['var3']}, + task_30_action_exs[0].input + ) + @mock.patch.object( std_actions.EchoAction, 'run', @@ -379,6 +561,106 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(1, len(task_2_action_exs)) + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.MagicMock( + side_effect=[ + exc.ActionException(), # Mock task1 exception for initial run. + 'Task 1.1', # Mock task1 success for initial run. + exc.ActionException(), # Mock task1 exception for initial run. + 'Task 1.0', # Mock task1 success for rerun. + 'Task 1.2', # Mock task1 success for rerun. + 'Task 2' # Mock task2 success. + ] + ) + ) + def test_rerun_with_items_diff_env_vars(self): + wb_service.create_workbook_v2(WITH_ITEMS_WORKBOOK_DIFF_ENV_VAR) + + # Initial environment variables for the workflow execution. + env = { + 'var1': 'fee fi fo fum' + } + + # Run workflow and fail task. + wf_ex = self.engine.start_workflow('wb3.wf1', {}, env=env) + self._await(lambda: self.is_execution_error(wf_ex.id)) + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) + self.assertEqual(1, len(wf_ex.task_executions)) + + task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') + + self.assertEqual(states.ERROR, task_1_ex.state) + self.assertIsNotNone(task_1_ex.state_info) + + task_1_action_exs = db_api.get_action_executions( + task_execution_id=task_1_ex.id) + + self.assertEqual(3, len(task_1_action_exs)) + + # Update env in workflow execution with the following. + updated_env = { + 'var1': 'foobar' + } + + # Resume workflow and re-run failed task. + self.engine.rerun_workflow( + wf_ex.id, + task_1_ex.id, + reset=False, + env=updated_env + ) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + + self._await(lambda: self.is_execution_success(wf_ex.id), delay=10) + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(2, len(wf_ex.task_executions)) + + task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') + task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2') + + # Check action executions of task 1. + self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertIsNone(task_1_ex.state_info) + + task_1_action_exs = db_api.get_action_executions( + task_execution_id=task_1_ex.id) + + expected_inputs = [ + 'Task 1.0 [%s]' % env['var1'], # Task 1 item 0 (error). + 'Task 1.1 [%s]' % env['var1'], # Task 1 item 1. + 'Task 1.2 [%s]' % env['var1'], # Task 1 item 2 (error). + 'Task 1.0 [%s]' % updated_env['var1'], # Task 1 item 0 (rerun). + 'Task 1.2 [%s]' % updated_env['var1'] # Task 1 item 2 (rerun). + ] + + result = zip(task_1_action_exs, expected_inputs) + + for (action_ex, expected_input) in result: + self.assertDictEqual( + {'output': expected_input}, + action_ex.input + ) + + # Check action executions of task 2. + self.assertEqual(states.SUCCESS, task_2_ex.state) + + task_2_action_exs = db_api.get_action_executions( + task_execution_id=task_2_ex.id) + + self.assertEqual(1, len(task_2_action_exs)) + @mock.patch.object( std_actions.EchoAction, 'run', diff --git a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py index afe65b11..64d86911 100644 --- a/mistral/tests/unit/engine/test_reverse_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_reverse_workflow_rerun.py @@ -49,6 +49,26 @@ workflows: - t2 """ +SIMPLE_WORKBOOK_DIFF_ENV_VAR = """ +--- +version: '2.0' +name: wb1 +workflows: + wf1: + type: reverse + tasks: + t1: + action: std.echo output="Task 1" + t2: + action: std.echo output=<% env().var1 %> + requires: + - t1 + t3: + action: std.echo output=<% env().var2 %> + requires: + - t2 +""" + class ReverseWorkflowRerunTest(base.EngineTestCase): @@ -131,6 +151,127 @@ class ReverseWorkflowRerunTest(base.EngineTestCase): self.assertEqual(1, len(task_3_action_exs)) self.assertEqual(states.SUCCESS, task_3_action_exs[0].state) + @mock.patch.object( + std_actions.EchoAction, + 'run', + mock.MagicMock( + side_effect=[ + 'Task 1', # Mock task1 success for initial run. + exc.ActionException(), # Mock task2 exception for initial run. + 'Task 2', # Mock task2 success for rerun. + 'Task 3' # Mock task3 success. + ] + ) + ) + def test_rerun_diff_env_vars(self): + wb_service.create_workbook_v2(SIMPLE_WORKBOOK_DIFF_ENV_VAR) + + # Initial environment variables for the workflow execution. + env = { + 'var1': 'fee fi fo fum', + 'var2': 'foobar' + } + + # Run workflow and fail task. + wf_ex = self.engine.start_workflow( + 'wb1.wf1', + {}, + task_name='t3', + env=env + ) + + self._await(lambda: self.is_execution_error(wf_ex.id)) + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) + self.assertEqual(2, len(wf_ex.task_executions)) + self.assertDictEqual(env, wf_ex.params['env']) + self.assertDictEqual(env, wf_ex.context['__env']) + + task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') + task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2') + + self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertEqual(states.ERROR, task_2_ex.state) + self.assertIsNotNone(task_2_ex.state_info) + + # Update env in workflow execution with the following. + updated_env = { + 'var1': 'Task 2', + 'var2': 'Task 3' + } + + # Resume workflow and re-run failed task. + self.engine.rerun_workflow(wf_ex.id, task_2_ex.id, env=updated_env) + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertDictEqual(updated_env, wf_ex.params['env']) + self.assertDictEqual(updated_env, wf_ex.context['__env']) + + # Wait for the workflow to succeed. + self._await(lambda: self.is_execution_success(wf_ex.id)) + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(states.SUCCESS, wf_ex.state) + self.assertIsNone(wf_ex.state_info) + self.assertEqual(3, len(wf_ex.task_executions)) + + task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1') + task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2') + task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3') + + # Check action executions of task 1. + self.assertEqual(states.SUCCESS, task_1_ex.state) + + task_1_action_exs = db_api.get_action_executions( + task_execution_id=task_1_ex.id) + + self.assertEqual(1, len(task_1_action_exs)) + self.assertEqual(states.SUCCESS, task_1_action_exs[0].state) + + self.assertDictEqual( + {'output': 'Task 1'}, + task_1_action_exs[0].input + ) + + # Check action executions of task 2. + self.assertEqual(states.SUCCESS, task_2_ex.state) + self.assertIsNone(task_2_ex.state_info) + + task_2_action_exs = db_api.get_action_executions( + task_execution_id=task_2_ex.id) + + self.assertEqual(2, len(task_2_action_exs)) + self.assertEqual(states.ERROR, task_2_action_exs[0].state) + self.assertEqual(states.SUCCESS, task_2_action_exs[1].state) + + self.assertDictEqual( + {'output': env['var1']}, + task_2_action_exs[0].input + ) + + self.assertDictEqual( + {'output': updated_env['var1']}, + task_2_action_exs[1].input + ) + + # Check action executions of task 3. + self.assertEqual(states.SUCCESS, task_3_ex.state) + + task_3_action_exs = db_api.get_action_executions( + task_execution_id=task_3_ex.id) + + self.assertEqual(1, len(task_3_action_exs)) + self.assertEqual(states.SUCCESS, task_3_action_exs[0].state) + + self.assertDictEqual( + {'output': updated_env['var2']}, + task_3_action_exs[0].input + ) + @mock.patch.object( std_actions.EchoAction, 'run', diff --git a/mistral/tests/unit/engine/test_workflow_resume.py b/mistral/tests/unit/engine/test_workflow_resume.py index b69a1a97..90445038 100644 --- a/mistral/tests/unit/engine/test_workflow_resume.py +++ b/mistral/tests/unit/engine/test_workflow_resume.py @@ -21,6 +21,7 @@ from mistral import exceptions as exc from mistral.services import workbooks as wb_service from mistral.tests.unit.engine import base from mistral.workbook import parser as spec_parser +from mistral.workflow import data_flow from mistral.workflow import states from mistral.workflow import utils @@ -52,6 +53,33 @@ workflows: """ +RESUME_WORKBOOK_DIFF_ENV_VAR = """ +--- +version: '2.0' + +name: wb + +workflows: + wf1: + type: direct + + tasks: + task1: + action: std.echo output="Hi!" + on-complete: + - task2 + + task2: + action: std.echo output=<% env().var1 %> + pause-before: true + on-complete: + - task3 + + task3: + action: std.echo output=<% env().var2 %> +""" + + RESUME_WORKBOOK_REVERSE = """ --- version: '2.0' @@ -350,3 +378,75 @@ class WorkflowResumeTest(base.EngineTestCase): ) mock_fw.assert_called_once_with(wf_ex.id, err) + + def test_resume_diff_env_vars(self): + wb_service.create_workbook_v2(RESUME_WORKBOOK_DIFF_ENV_VAR) + + # Initial environment variables for the workflow execution. + env = { + 'var1': 'fee fi fo fum', + 'var2': 'foobar' + } + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1', {}, env=env) + + self._await(lambda: self.is_execution_paused(wf_ex.id)) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_1_ex = self._assert_single_item( + wf_ex.task_executions, + name='task1' + ) + + task_2_ex = self._assert_single_item( + wf_ex.task_executions, + name='task2' + ) + + self.assertEqual(states.PAUSED, wf_ex.state) + self.assertEqual(2, len(wf_ex.task_executions)) + self.assertDictEqual(env, wf_ex.params['env']) + self.assertDictEqual(env, wf_ex.context['__env']) + self.assertEqual(states.SUCCESS, task_1_ex.state) + self.assertEqual(states.IDLE, task_2_ex.state) + + # Update env in workflow execution with the following. + updated_env = { + 'var1': 'Task 2', + 'var2': 'Task 3' + } + + # Update the env variables and resume workflow. + self.engine.resume_workflow(wf_ex.id, env=updated_env) + self._await(lambda: self.is_execution_success(wf_ex.id)) + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertDictEqual(updated_env, wf_ex.params['env']) + self.assertDictEqual(updated_env, wf_ex.context['__env']) + self.assertEqual(3, len(wf_ex.task_executions)) + + # Check result of task2. + task_2_ex = self._assert_single_item( + wf_ex.task_executions, + name='task2' + ) + + self.assertEqual(states.SUCCESS, task_2_ex.state) + + task_2_result = data_flow.get_task_execution_result(task_2_ex) + + self.assertEqual(updated_env['var1'], task_2_result) + + # Check result of task3. + task_3_ex = self._assert_single_item( + wf_ex.task_executions, + name='task3' + ) + + self.assertEqual(states.SUCCESS, task_3_ex.state) + + task_3_result = data_flow.get_task_execution_result(task_3_ex) + + self.assertEqual(updated_env['var2'], task_3_result) diff --git a/mistral/tests/unit/services/test_workflow_service.py b/mistral/tests/unit/services/test_workflow_service.py index a650b3a9..fb55c03d 100644 --- a/mistral/tests/unit/services/test_workflow_service.py +++ b/mistral/tests/unit/services/test_workflow_service.py @@ -1,4 +1,5 @@ # Copyright 2014 - Mirantis, Inc. +# Copyright 2015 - StackStorm, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,14 +13,18 @@ # See the License for the specific language governing permissions and # limitations under the License. +import copy + from oslo_config import cfg from oslo_log import log as logging +from mistral.db.v2.sqlalchemy import api as db_api from mistral import exceptions as exc from mistral.services import workflows as wf_service from mistral.tests.unit import base from mistral import utils from mistral.workbook import parser as spec_parser +from mistral.workflow import states LOG = logging.getLogger(__name__) @@ -177,3 +182,102 @@ class WorkflowServiceTest(base.DbTestCase): ) self.assertIn("Invalid DSL", exception.message) + + def test_update_workflow_execution_env(self): + wf_exec_template = { + 'spec': {}, + 'start_params': {'task': 'my_task1'}, + 'state': 'PAUSED', + 'state_info': None, + 'params': {'env': {'k1': 'abc'}}, + 'created_at': None, + 'updated_at': None, + 'context': {'__env': {'k1': 'fee fi fo fum'}}, + 'task_id': None, + 'trust_id': None, + 'description': None, + 'output': None + } + + states_permitted = [ + states.IDLE, + states.PAUSED, + states.ERROR + ] + + update_env = {'k1': 'foobar'} + + for state in states_permitted: + wf_exec = copy.deepcopy(wf_exec_template) + wf_exec['state'] = state + + with db_api.transaction(): + created = db_api.create_workflow_execution(wf_exec) + + self.assertIsNone(created.updated_at) + + updated = wf_service.update_workflow_execution_env( + created, + update_env + ) + + self.assertDictEqual(update_env, updated.params['env']) + self.assertDictEqual(update_env, updated.context['__env']) + + fetched = db_api.get_workflow_execution(created.id) + + self.assertEqual(updated, fetched) + self.assertIsNotNone(fetched.updated_at) + + def test_update_workflow_execution_env_wrong_state(self): + wf_exec_template = { + 'spec': {}, + 'start_params': {'task': 'my_task1'}, + 'state': 'PAUSED', + 'state_info': None, + 'params': {'env': {'k1': 'abc'}}, + 'created_at': None, + 'updated_at': None, + 'context': {'__env': {'k1': 'fee fi fo fum'}}, + 'task_id': None, + 'trust_id': None, + 'description': None, + 'output': None + } + + states_not_permitted = [ + states.RUNNING, + states.RUNNING_DELAYED, + states.SUCCESS, + states.WAITING + ] + + update_env = {'k1': 'foobar'} + + for state in states_not_permitted: + wf_exec = copy.deepcopy(wf_exec_template) + wf_exec['state'] = state + + with db_api.transaction(): + created = db_api.create_workflow_execution(wf_exec) + + self.assertIsNone(created.updated_at) + + self.assertRaises( + exc.NotAllowedException, + wf_service.update_workflow_execution_env, + created, + update_env + ) + + fetched = db_api.get_workflow_execution(created.id) + + self.assertDictEqual( + wf_exec['params']['env'], + fetched.params['env'] + ) + + self.assertDictEqual( + wf_exec['context']['__env'], + fetched.context['__env'] + ) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index 33daed3f..61bde005 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -48,7 +48,18 @@ class WorkflowController(object): self.wf_ex = wf_ex self.wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) - def continue_workflow(self, task_ex=None, reset=True): + def _update_task_ex_env(self, task_ex, env): + if not env: + return task_ex + + task_ex.in_context['__env'] = u.merge_dicts( + task_ex.in_context['__env'], + env + ) + + return task_ex + + def continue_workflow(self, task_ex=None, reset=True, env=None): """Calculates a list of commands to continue the workflow. Given a workflow specification this method makes required analysis @@ -57,6 +68,7 @@ class WorkflowController(object): :param: task_ex: Task execution to rerun. :param: reset: If true, then purge action executions for the tasks. + :param env: A set of environment variables to overwrite. :return: List of workflow commands (instances of mistral.workflow.commands.WorkflowCommand). """ @@ -64,9 +76,9 @@ class WorkflowController(object): return [] if task_ex: - return self._get_rerun_commands([task_ex], reset) + return self._get_rerun_commands([task_ex], reset, env=env) - return self._find_next_commands() + return self._find_next_commands(env=env) @abc.abstractmethod def is_error_handled_for(self, task_ex): @@ -97,11 +109,21 @@ class WorkflowController(object): def _get_task_inbound_context(self, task_spec): upstream_task_execs = self._get_upstream_task_executions(task_spec) - return u.merge_dicts( + upstream_ctx = data_flow.evaluate_upstream_context(upstream_task_execs) + + ctx = u.merge_dicts( copy.deepcopy(self.wf_ex.context), - data_flow.evaluate_upstream_context(upstream_task_execs) + upstream_ctx ) + if self.wf_ex.context: + ctx['__env'] = u.merge_dicts( + copy.deepcopy(upstream_ctx.get('__env', {})), + copy.deepcopy(self.wf_ex.context.get('__env', {})) + ) + + return ctx + @abc.abstractmethod def _get_upstream_task_executions(self, task_spec): """Gets workflow upstream tasks for the given task. @@ -112,11 +134,13 @@ class WorkflowController(object): raise NotImplementedError @abc.abstractmethod - def _find_next_commands(self): + def _find_next_commands(self, env=None): """Finds commands that should run next. A concrete algorithm of finding such tasks depends on a concrete workflow controller. + + :param env: A set of environment variables to overwrite. :return: List of workflow commands. """ # Add all tasks in IDLE state. @@ -125,15 +149,22 @@ class WorkflowController(object): states.IDLE ) + for task_ex in idle_tasks: + self._update_task_ex_env(task_ex, env) + return [commands.RunExistingTask(t) for t in idle_tasks] - def _get_rerun_commands(self, task_exs, reset=True): + def _get_rerun_commands(self, task_exs, reset=True, env=None): """Get commands to rerun existing task executions. :param task_exs: List of task executions. :param reset: If true, then purge action executions for the tasks. + :param env: A set of environment variables to overwrite. :return: List of workflow commands. """ + for task_ex in task_exs: + self._update_task_ex_env(task_ex, env) + cmds = [commands.RunExistingTask(t_e, reset) for t_e in task_exs] LOG.debug("Found commands: %s" % cmds) diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index edd2d6b5..d4aef7c8 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -65,8 +65,10 @@ class DirectWorkflowController(base.WorkflowController): self.wf_spec.get_tasks()[t_ex_candidate.name] ) - def _find_next_commands(self): - cmds = super(DirectWorkflowController, self)._find_next_commands() + def _find_next_commands(self, env=None): + cmds = super(DirectWorkflowController, self)._find_next_commands( + env=env + ) if not self.wf_ex.task_executions: return self._find_start_commands() diff --git a/mistral/workflow/reverse_workflow.py b/mistral/workflow/reverse_workflow.py index 96ba95ed..c3afa18f 100644 --- a/mistral/workflow/reverse_workflow.py +++ b/mistral/workflow/reverse_workflow.py @@ -40,13 +40,15 @@ class ReverseWorkflowController(base.WorkflowController): __workflow_type__ = "reverse" - def _find_next_commands(self): + def _find_next_commands(self, env=None): """Finds all tasks with resolved dependencies. This method finds all tasks with resolved dependencies and returns them in the form of workflow commands. """ - cmds = super(ReverseWorkflowController, self)._find_next_commands() + cmds = super(ReverseWorkflowController, self)._find_next_commands( + env=env + ) task_specs = self._find_task_specs_with_satisfied_dependencies()