Allow env update on resume and rerun of workflows

Add an optional env kwarg to resume_workflow and rerun_workflow in the
default engine. Add a database API to update the env in the params and
context of an existing workflow execution. The updated env is merged into
the existing __env of the workflow context. For existing task executions,
the env is merged into the __env of in_context.

Partially implements: blueprint mistral-rerun-update-env

Change-Id: Id378762d90ca5fc62d14e22134f18f9bce71a123
Author: Winson Chan
Date:   2015-12-16 05:56:28 +00:00
parent  7822e2fce2
commit  0e7be02a48
9 changed files with 700 additions and 17 deletions
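
A minimal usage sketch of the new kwarg, mirroring the call sites exercised
by the tests in this commit; the engine handle and the execution/task IDs are
placeholders, not values taken from the diff:

    # Sketch only: uses the resume_workflow/rerun_workflow signatures added
    # by this commit. `engine`, `wf_ex_id` and `task_ex_id` stand in for a
    # real engine instance and real execution identifiers.
    updated_env = {'var1': 'new value'}

    # Re-run a failed task execution with a refreshed environment.
    engine.rerun_workflow(wf_ex_id, task_ex_id, reset=True, env=updated_env)

    # Resume a paused execution; the same merge into params['env'] and
    # context['__env'] applies.
    engine.resume_workflow(wf_ex_id, env=updated_env)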


@@ -28,6 +28,7 @@ from mistral.engine import task_handler
from mistral.engine import utils as eng_utils
from mistral.engine import workflow_handler as wf_handler
from mistral.services import action_manager as a_m
from mistral.services import workflows as wf_service
from mistral import utils as u
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
@@ -283,13 +284,15 @@ class DefaultEngine(base.Engine, coordination.Service):
return wf_ex
def _continue_workflow(self, wf_ex, task_ex=None, reset=True):
def _continue_workflow(self, wf_ex, task_ex=None, reset=True, env=None):
wf_ex = wf_service.update_workflow_execution_env(wf_ex, env)
wf_handler.set_execution_state(wf_ex, states.RUNNING)
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)
# Calculate commands to process next.
cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset)
cmds = wf_ctrl.continue_workflow(task_ex=task_ex, reset=reset, env=env)
# When resuming a workflow we need to ignore all 'pause'
# commands because workflow controller takes tasks that
@@ -321,7 +324,7 @@ class DefaultEngine(base.Engine, coordination.Service):
return wf_ex.get_clone()
@u.log_exec(LOG)
def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True):
def rerun_workflow(self, wf_ex_id, task_ex_id, reset=True, env=None):
try:
with db_api.transaction():
wf_ex = self._lock_workflow_execution(wf_ex_id)
@@ -334,7 +337,7 @@ class DefaultEngine(base.Engine, coordination.Service):
if wf_ex.state == states.PAUSED:
return wf_ex.get_clone()
return self._continue_workflow(wf_ex, task_ex, reset)
return self._continue_workflow(wf_ex, task_ex, reset, env=env)
except Exception as e:
LOG.error(
"Failed to rerun execution id=%s at task=%s: %s\n%s",
@@ -344,7 +347,7 @@ class DefaultEngine(base.Engine, coordination.Service):
raise e
@u.log_exec(LOG)
def resume_workflow(self, wf_ex_id):
def resume_workflow(self, wf_ex_id, env=None):
try:
with db_api.transaction():
wf_ex = self._lock_workflow_execution(wf_ex_id)
@@ -352,7 +355,7 @@ class DefaultEngine(base.Engine, coordination.Service):
if wf_ex.state != states.PAUSED:
return wf_ex.get_clone()
return self._continue_workflow(wf_ex)
return self._continue_workflow(wf_ex, env=env)
except Exception as e:
LOG.error(
"Failed to resume execution id=%s: %s\n%s",


@@ -16,6 +16,8 @@ from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral import utils
from mistral.workbook import parser as spec_parser
from mistral.workflow import data_flow
from mistral.workflow import states
STD_WF_PATH = 'resources/workflows'
@@ -102,6 +104,22 @@ def update_workflows(definition, scope='private', identifier=None):
return db_wfs
def update_workflow_execution_env(wf_ex, env):
if not env:
return wf_ex
if wf_ex.state not in [states.IDLE, states.PAUSED, states.ERROR]:
raise exc.NotAllowedException(
'Updating env to workflow execution is only permitted if '
'it is in idle, paused, or re-runnable state.'
)
wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env)
data_flow.add_environment_to_context(wf_ex)
return wf_ex
def _get_workflow_values(wf_spec, definition, scope, is_system=False):
values = {
'name': wf_spec.get_name(),
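
For illustration, the env merge performed by update_workflow_execution_env()
behaves roughly like the standalone sketch below: values from the new env win
on conflicting keys, as the rerun tests in this commit expect. The recursive
handling of nested dicts is an assumption about mistral.utils.merge_dicts,
not something this diff shows:

    # Standalone approximation only; the real code calls
    # mistral.utils.merge_dicts and then data_flow.add_environment_to_context
    # to refresh context['__env'].
    def merge_env(current, new):
        for key, value in new.items():
            if isinstance(value, dict) and isinstance(current.get(key), dict):
                merge_env(current[key], value)
            else:
                current[key] = value
        return current

    params_env = {'var1': 'fee fi fo fum', 'var2': 'mirror mirror'}
    merge_env(params_env, {'var1': 'Task 21'})
    # params_env is now {'var1': 'Task 21', 'var2': 'mirror mirror'}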


@@ -48,6 +48,33 @@ workflows:
action: std.echo output="Task 3"
"""
SIMPLE_WORKBOOK_DIFF_ENV_VAR = """
---
version: '2.0'
name: wb1
workflows:
wf1:
type: direct
tasks:
t10:
action: std.echo output="Task 10"
on-success:
- t21
- t30
t21:
action: std.echo output=<% env().var1 %>
on-success:
- t22
t22:
action: std.echo output="<% env().var2 %>"
on-success:
- t30
t30:
join: all
action: std.echo output="<% env().var3 %>"
wait-before: 1
"""
WITH_ITEMS_WORKBOOK = """
---
version: '2.0'
@@ -67,6 +94,24 @@ workflows:
action: std.echo output="Task 2"
"""
WITH_ITEMS_WORKBOOK_DIFF_ENV_VAR = """
---
version: '2.0'
name: wb3
workflows:
wf1:
type: direct
tasks:
t1:
with-items: i in <% list(range(0, 3)) %>
action: std.echo output="Task 1.<% $.i %> [<% env().var1 %>]"
publish:
v1: <% $.t1 %>
on-success:
- t2
t2:
action: std.echo output="Task 2"
"""
WITH_ITEMS_WORKBOOK_CONCURRENCY = """
---
@@ -192,6 +237,143 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(1, len(task_3_action_exs))
self.assertEqual(states.SUCCESS, task_3_action_exs[0].state)
@mock.patch.object(
std_actions.EchoAction,
'run',
mock.MagicMock(
side_effect=[
'Task 10', # Mock task10 success for first run.
exc.ActionException(), # Mock task21 exception for first run.
'Task 21', # Mock task21 success for rerun.
'Task 22', # Mock task22 success.
'Task 30' # Mock task30 success.
]
)
)
def test_rerun_diff_env_vars(self):
wb_service.create_workbook_v2(SIMPLE_WORKBOOK_DIFF_ENV_VAR)
# Initial environment variables for the workflow execution.
env = {
'var1': 'fee fi fo fum',
'var2': 'mirror mirror',
'var3': 'heigh-ho heigh-ho'
}
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', {}, env=env)
self._await(lambda: self.is_execution_error(wf_ex.id))
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(3, len(wf_ex.task_executions))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertDictEqual(env, wf_ex.context['__env'])
task_exs = wf_ex.task_executions
task_10_ex = self._assert_single_item(task_exs, name='t10')
task_21_ex = self._assert_single_item(task_exs, name='t21')
task_30_ex = self._assert_single_item(task_exs, name='t30')
self.assertEqual(states.SUCCESS, task_10_ex.state)
self.assertEqual(states.ERROR, task_21_ex.state)
self.assertIsNotNone(task_21_ex.state_info)
self.assertEqual(states.WAITING, task_30_ex.state)
# Update env in workflow execution with the following.
updated_env = {
'var1': 'Task 21',
'var2': 'Task 22',
'var3': 'Task 30'
}
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(wf_ex.id, task_21_ex.id, env=updated_env)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertDictEqual(updated_env, wf_ex.context['__env'])
# Wait for the workflow to succeed.
self._await(lambda: self.is_execution_success(wf_ex.id))
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(4, len(wf_ex.task_executions))
task_exs = wf_ex.task_executions
task_10_ex = self._assert_single_item(task_exs, name='t10')
task_21_ex = self._assert_single_item(task_exs, name='t21')
task_22_ex = self._assert_single_item(task_exs, name='t22')
task_30_ex = self._assert_single_item(task_exs, name='t30')
# Check action executions of task 10.
self.assertEqual(states.SUCCESS, task_10_ex.state)
task_10_action_exs = db_api.get_action_executions(
task_execution_id=task_10_ex.id)
self.assertEqual(1, len(task_10_action_exs))
self.assertEqual(states.SUCCESS, task_10_action_exs[0].state)
self.assertDictEqual(
{'output': 'Task 10'},
task_10_action_exs[0].input
)
# Check action executions of task 21.
self.assertEqual(states.SUCCESS, task_21_ex.state)
self.assertIsNone(task_21_ex.state_info)
task_21_action_exs = db_api.get_action_executions(
task_execution_id=task_21_ex.id)
self.assertEqual(2, len(task_21_action_exs))
self.assertEqual(states.ERROR, task_21_action_exs[0].state)
self.assertEqual(states.SUCCESS, task_21_action_exs[1].state)
self.assertDictEqual(
{'output': env['var1']},
task_21_action_exs[0].input
)
self.assertDictEqual(
{'output': updated_env['var1']},
task_21_action_exs[1].input
)
# Check action executions of task 22.
self.assertEqual(states.SUCCESS, task_22_ex.state)
task_22_action_exs = db_api.get_action_executions(
task_execution_id=task_22_ex.id)
self.assertEqual(1, len(task_22_action_exs))
self.assertEqual(states.SUCCESS, task_22_action_exs[0].state)
self.assertDictEqual(
{'output': updated_env['var2']},
task_22_action_exs[0].input
)
# Check action executions of task 30.
self.assertEqual(states.SUCCESS, task_30_ex.state)
task_30_action_exs = db_api.get_action_executions(
task_execution_id=task_30_ex.id)
self.assertEqual(1, len(task_30_action_exs))
self.assertEqual(states.SUCCESS, task_30_action_exs[0].state)
self.assertDictEqual(
{'output': updated_env['var3']},
task_30_action_exs[0].input
)
@mock.patch.object(
std_actions.EchoAction,
'run',
@@ -379,6 +561,106 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(1, len(task_2_action_exs))
@mock.patch.object(
std_actions.EchoAction,
'run',
mock.MagicMock(
side_effect=[
exc.ActionException(), # Mock task1 exception for initial run.
'Task 1.1', # Mock task1 success for initial run.
exc.ActionException(), # Mock task1 exception for initial run.
'Task 1.0', # Mock task1 success for rerun.
'Task 1.2', # Mock task1 success for rerun.
'Task 2' # Mock task2 success.
]
)
)
def test_rerun_with_items_diff_env_vars(self):
wb_service.create_workbook_v2(WITH_ITEMS_WORKBOOK_DIFF_ENV_VAR)
# Initial environment variables for the workflow execution.
env = {
'var1': 'fee fi fo fum'
}
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb3.wf1', {}, env=env)
self._await(lambda: self.is_execution_error(wf_ex.id))
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(1, len(wf_ex.task_executions))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
self.assertEqual(states.ERROR, task_1_ex.state)
self.assertIsNotNone(task_1_ex.state_info)
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id)
self.assertEqual(3, len(task_1_action_exs))
# Update env in workflow execution with the following.
updated_env = {
'var1': 'foobar'
}
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(
wf_ex.id,
task_1_ex.id,
reset=False,
env=updated_env
)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self._await(lambda: self.is_execution_success(wf_ex.id), delay=10)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(2, len(wf_ex.task_executions))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
# Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertIsNone(task_1_ex.state_info)
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id)
expected_inputs = [
'Task 1.0 [%s]' % env['var1'], # Task 1 item 0 (error).
'Task 1.1 [%s]' % env['var1'], # Task 1 item 1.
'Task 1.2 [%s]' % env['var1'], # Task 1 item 2 (error).
'Task 1.0 [%s]' % updated_env['var1'], # Task 1 item 0 (rerun).
'Task 1.2 [%s]' % updated_env['var1'] # Task 1 item 2 (rerun).
]
result = zip(task_1_action_exs, expected_inputs)
for (action_ex, expected_input) in result:
self.assertDictEqual(
{'output': expected_input},
action_ex.input
)
# Check action executions of task 2.
self.assertEqual(states.SUCCESS, task_2_ex.state)
task_2_action_exs = db_api.get_action_executions(
task_execution_id=task_2_ex.id)
self.assertEqual(1, len(task_2_action_exs))
@mock.patch.object(
std_actions.EchoAction,
'run',


@@ -49,6 +49,26 @@ workflows:
- t2
"""
SIMPLE_WORKBOOK_DIFF_ENV_VAR = """
---
version: '2.0'
name: wb1
workflows:
wf1:
type: reverse
tasks:
t1:
action: std.echo output="Task 1"
t2:
action: std.echo output=<% env().var1 %>
requires:
- t1
t3:
action: std.echo output=<% env().var2 %>
requires:
- t2
"""
class ReverseWorkflowRerunTest(base.EngineTestCase):
@@ -131,6 +151,127 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
self.assertEqual(1, len(task_3_action_exs))
self.assertEqual(states.SUCCESS, task_3_action_exs[0].state)
@mock.patch.object(
std_actions.EchoAction,
'run',
mock.MagicMock(
side_effect=[
'Task 1', # Mock task1 success for initial run.
exc.ActionException(), # Mock task2 exception for initial run.
'Task 2', # Mock task2 success for rerun.
'Task 3' # Mock task3 success.
]
)
)
def test_rerun_diff_env_vars(self):
wb_service.create_workbook_v2(SIMPLE_WORKBOOK_DIFF_ENV_VAR)
# Initial environment variables for the workflow execution.
env = {
'var1': 'fee fi fo fum',
'var2': 'foobar'
}
# Run workflow and fail task.
wf_ex = self.engine.start_workflow(
'wb1.wf1',
{},
task_name='t3',
env=env
)
self._await(lambda: self.is_execution_error(wf_ex.id))
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.ERROR, wf_ex.state)
self.assertIsNotNone(wf_ex.state_info)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertDictEqual(env, wf_ex.context['__env'])
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.ERROR, task_2_ex.state)
self.assertIsNotNone(task_2_ex.state_info)
# Update env in workflow execution with the following.
updated_env = {
'var1': 'Task 2',
'var2': 'Task 3'
}
# Resume workflow and re-run failed task.
self.engine.rerun_workflow(wf_ex.id, task_2_ex.id, env=updated_env)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.RUNNING, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertDictEqual(updated_env, wf_ex.context['__env'])
# Wait for the workflow to succeed.
self._await(lambda: self.is_execution_success(wf_ex.id))
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.SUCCESS, wf_ex.state)
self.assertIsNone(wf_ex.state_info)
self.assertEqual(3, len(wf_ex.task_executions))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
task_2_ex = self._assert_single_item(wf_ex.task_executions, name='t2')
task_3_ex = self._assert_single_item(wf_ex.task_executions, name='t3')
# Check action executions of task 1.
self.assertEqual(states.SUCCESS, task_1_ex.state)
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id)
self.assertEqual(1, len(task_1_action_exs))
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)
self.assertDictEqual(
{'output': 'Task 1'},
task_1_action_exs[0].input
)
# Check action executions of task 2.
self.assertEqual(states.SUCCESS, task_2_ex.state)
self.assertIsNone(task_2_ex.state_info)
task_2_action_exs = db_api.get_action_executions(
task_execution_id=task_2_ex.id)
self.assertEqual(2, len(task_2_action_exs))
self.assertEqual(states.ERROR, task_2_action_exs[0].state)
self.assertEqual(states.SUCCESS, task_2_action_exs[1].state)
self.assertDictEqual(
{'output': env['var1']},
task_2_action_exs[0].input
)
self.assertDictEqual(
{'output': updated_env['var1']},
task_2_action_exs[1].input
)
# Check action executions of task 3.
self.assertEqual(states.SUCCESS, task_3_ex.state)
task_3_action_exs = db_api.get_action_executions(
task_execution_id=task_3_ex.id)
self.assertEqual(1, len(task_3_action_exs))
self.assertEqual(states.SUCCESS, task_3_action_exs[0].state)
self.assertDictEqual(
{'output': updated_env['var2']},
task_3_action_exs[0].input
)
@mock.patch.object(
std_actions.EchoAction,
'run',


@@ -21,6 +21,7 @@ from mistral import exceptions as exc
from mistral.services import workbooks as wb_service
from mistral.tests.unit.engine import base
from mistral.workbook import parser as spec_parser
from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils
@@ -52,6 +53,33 @@ workflows:
"""
RESUME_WORKBOOK_DIFF_ENV_VAR = """
---
version: '2.0'
name: wb
workflows:
wf1:
type: direct
tasks:
task1:
action: std.echo output="Hi!"
on-complete:
- task2
task2:
action: std.echo output=<% env().var1 %>
pause-before: true
on-complete:
- task3
task3:
action: std.echo output=<% env().var2 %>
"""
RESUME_WORKBOOK_REVERSE = """
---
version: '2.0'
@@ -350,3 +378,75 @@ class WorkflowResumeTest(base.EngineTestCase):
)
mock_fw.assert_called_once_with(wf_ex.id, err)
def test_resume_diff_env_vars(self):
wb_service.create_workbook_v2(RESUME_WORKBOOK_DIFF_ENV_VAR)
# Initial environment variables for the workflow execution.
env = {
'var1': 'fee fi fo fum',
'var2': 'foobar'
}
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {}, env=env)
self._await(lambda: self.is_execution_paused(wf_ex.id))
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_1_ex = self._assert_single_item(
wf_ex.task_executions,
name='task1'
)
task_2_ex = self._assert_single_item(
wf_ex.task_executions,
name='task2'
)
self.assertEqual(states.PAUSED, wf_ex.state)
self.assertEqual(2, len(wf_ex.task_executions))
self.assertDictEqual(env, wf_ex.params['env'])
self.assertDictEqual(env, wf_ex.context['__env'])
self.assertEqual(states.SUCCESS, task_1_ex.state)
self.assertEqual(states.IDLE, task_2_ex.state)
# Update env in workflow execution with the following.
updated_env = {
'var1': 'Task 2',
'var2': 'Task 3'
}
# Update the env variables and resume workflow.
self.engine.resume_workflow(wf_ex.id, env=updated_env)
self._await(lambda: self.is_execution_success(wf_ex.id))
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertDictEqual(updated_env, wf_ex.params['env'])
self.assertDictEqual(updated_env, wf_ex.context['__env'])
self.assertEqual(3, len(wf_ex.task_executions))
# Check result of task2.
task_2_ex = self._assert_single_item(
wf_ex.task_executions,
name='task2'
)
self.assertEqual(states.SUCCESS, task_2_ex.state)
task_2_result = data_flow.get_task_execution_result(task_2_ex)
self.assertEqual(updated_env['var1'], task_2_result)
# Check result of task3.
task_3_ex = self._assert_single_item(
wf_ex.task_executions,
name='task3'
)
self.assertEqual(states.SUCCESS, task_3_ex.state)
task_3_result = data_flow.get_task_execution_result(task_3_ex)
self.assertEqual(updated_env['var2'], task_3_result)


@@ -1,4 +1,5 @@
# Copyright 2014 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,14 +13,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import copy
from oslo_config import cfg
from oslo_log import log as logging
from mistral.db.v2.sqlalchemy import api as db_api
from mistral import exceptions as exc
from mistral.services import workflows as wf_service
from mistral.tests.unit import base
from mistral import utils
from mistral.workbook import parser as spec_parser
from mistral.workflow import states
LOG = logging.getLogger(__name__)
@@ -177,3 +182,102 @@ class WorkflowServiceTest(base.DbTestCase):
)
self.assertIn("Invalid DSL", exception.message)
def test_update_workflow_execution_env(self):
wf_exec_template = {
'spec': {},
'start_params': {'task': 'my_task1'},
'state': 'PAUSED',
'state_info': None,
'params': {'env': {'k1': 'abc'}},
'created_at': None,
'updated_at': None,
'context': {'__env': {'k1': 'fee fi fo fum'}},
'task_id': None,
'trust_id': None,
'description': None,
'output': None
}
states_permitted = [
states.IDLE,
states.PAUSED,
states.ERROR
]
update_env = {'k1': 'foobar'}
for state in states_permitted:
wf_exec = copy.deepcopy(wf_exec_template)
wf_exec['state'] = state
with db_api.transaction():
created = db_api.create_workflow_execution(wf_exec)
self.assertIsNone(created.updated_at)
updated = wf_service.update_workflow_execution_env(
created,
update_env
)
self.assertDictEqual(update_env, updated.params['env'])
self.assertDictEqual(update_env, updated.context['__env'])
fetched = db_api.get_workflow_execution(created.id)
self.assertEqual(updated, fetched)
self.assertIsNotNone(fetched.updated_at)
def test_update_workflow_execution_env_wrong_state(self):
wf_exec_template = {
'spec': {},
'start_params': {'task': 'my_task1'},
'state': 'PAUSED',
'state_info': None,
'params': {'env': {'k1': 'abc'}},
'created_at': None,
'updated_at': None,
'context': {'__env': {'k1': 'fee fi fo fum'}},
'task_id': None,
'trust_id': None,
'description': None,
'output': None
}
states_not_permitted = [
states.RUNNING,
states.RUNNING_DELAYED,
states.SUCCESS,
states.WAITING
]
update_env = {'k1': 'foobar'}
for state in states_not_permitted:
wf_exec = copy.deepcopy(wf_exec_template)
wf_exec['state'] = state
with db_api.transaction():
created = db_api.create_workflow_execution(wf_exec)
self.assertIsNone(created.updated_at)
self.assertRaises(
exc.NotAllowedException,
wf_service.update_workflow_execution_env,
created,
update_env
)
fetched = db_api.get_workflow_execution(created.id)
self.assertDictEqual(
wf_exec['params']['env'],
fetched.params['env']
)
self.assertDictEqual(
wf_exec['context']['__env'],
fetched.context['__env']
)


@@ -48,7 +48,18 @@ class WorkflowController(object):
self.wf_ex = wf_ex
self.wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
def continue_workflow(self, task_ex=None, reset=True):
def _update_task_ex_env(self, task_ex, env):
if not env:
return task_ex
task_ex.in_context['__env'] = u.merge_dicts(
task_ex.in_context['__env'],
env
)
return task_ex
def continue_workflow(self, task_ex=None, reset=True, env=None):
"""Calculates a list of commands to continue the workflow.
Given a workflow specification this method makes required analysis
@@ -57,6 +68,7 @@
:param: task_ex: Task execution to rerun.
:param: reset: If true, then purge action executions for the tasks.
:param env: A set of environment variables to overwrite.
:return: List of workflow commands (instances of
mistral.workflow.commands.WorkflowCommand).
"""
@@ -64,9 +76,9 @@
return []
if task_ex:
return self._get_rerun_commands([task_ex], reset)
return self._get_rerun_commands([task_ex], reset, env=env)
return self._find_next_commands()
return self._find_next_commands(env=env)
@abc.abstractmethod
def is_error_handled_for(self, task_ex):
@@ -97,11 +109,21 @@
def _get_task_inbound_context(self, task_spec):
upstream_task_execs = self._get_upstream_task_executions(task_spec)
return u.merge_dicts(
upstream_ctx = data_flow.evaluate_upstream_context(upstream_task_execs)
ctx = u.merge_dicts(
copy.deepcopy(self.wf_ex.context),
data_flow.evaluate_upstream_context(upstream_task_execs)
upstream_ctx
)
if self.wf_ex.context:
ctx['__env'] = u.merge_dicts(
copy.deepcopy(upstream_ctx.get('__env', {})),
copy.deepcopy(self.wf_ex.context.get('__env', {}))
)
return ctx
@abc.abstractmethod
def _get_upstream_task_executions(self, task_spec):
"""Gets workflow upstream tasks for the given task.
@@ -112,11 +134,13 @@
raise NotImplementedError
@abc.abstractmethod
def _find_next_commands(self):
def _find_next_commands(self, env=None):
"""Finds commands that should run next.
A concrete algorithm of finding such tasks depends on a concrete
workflow controller.
:param env: A set of environment variables to overwrite.
:return: List of workflow commands.
"""
# Add all tasks in IDLE state.
@@ -125,15 +149,22 @@
states.IDLE
)
for task_ex in idle_tasks:
self._update_task_ex_env(task_ex, env)
return [commands.RunExistingTask(t) for t in idle_tasks]
def _get_rerun_commands(self, task_exs, reset=True):
def _get_rerun_commands(self, task_exs, reset=True, env=None):
"""Get commands to rerun existing task executions.
:param task_exs: List of task executions.
:param reset: If true, then purge action executions for the tasks.
:param env: A set of environment variables to overwrite.
:return: List of workflow commands.
"""
for task_ex in task_exs:
self._update_task_ex_env(task_ex, env)
cmds = [commands.RunExistingTask(t_e, reset) for t_e in task_exs]
LOG.debug("Found commands: %s" % cmds)
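
The __env override in _get_task_inbound_context() above is what lets an
updated env reach tasks whose upstream context still carries the old values.
A small illustration of the intended precedence, assuming merge_dicts gives
the right-hand dict priority on conflicts (which is what the rerun tests in
this commit expect):

    # Illustration only: plain dicts stand in for the task and workflow
    # execution contexts.
    upstream_env = {'var1': 'fee fi fo fum'}   # stale copy from an earlier task
    wf_context_env = {'var1': 'Task 21'}       # refreshed via rerun_workflow(..., env=...)

    inbound_env = dict(upstream_env)
    inbound_env.update(wf_context_env)         # workflow-level env wins
    # inbound_env == {'var1': 'Task 21'}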


@@ -65,8 +65,10 @@ class DirectWorkflowController(base.WorkflowController):
self.wf_spec.get_tasks()[t_ex_candidate.name]
)
def _find_next_commands(self):
cmds = super(DirectWorkflowController, self)._find_next_commands()
def _find_next_commands(self, env=None):
cmds = super(DirectWorkflowController, self)._find_next_commands(
env=env
)
if not self.wf_ex.task_executions:
return self._find_start_commands()


@@ -40,13 +40,15 @@ class ReverseWorkflowController(base.WorkflowController):
__workflow_type__ = "reverse"
def _find_next_commands(self):
def _find_next_commands(self, env=None):
"""Finds all tasks with resolved dependencies.
This method finds all tasks with resolved dependencies and
returns them in the form of workflow commands.
"""
cmds = super(ReverseWorkflowController, self)._find_next_commands()
cmds = super(ReverseWorkflowController, self)._find_next_commands(
env=env
)
task_specs = self._find_task_specs_with_satisfied_dependencies()