Implement workflow execution environment - part 3

Environment is recursively self evaluated at the start of the workflow
to support expressions that reference other environment variables.
Environment is evaluated for action inputs, executor target, and
published outputs.

Change-Id: I12b9d0f42b9aa8c338cd40bab33285ff98859778
Implements: blueprint mistral-execution-environment
This commit is contained in:
W Chan 2015-01-13 16:59:23 -08:00
parent 318c097c5c
commit dd75a4c6f6
5 changed files with 230 additions and 10 deletions

View File

@ -199,6 +199,7 @@ class DefaultEngine(base.Engine):
data_flow.add_openstack_data_to_context(wf_db, exec_db.context) data_flow.add_openstack_data_to_context(wf_db, exec_db.context)
data_flow.add_execution_to_context(exec_db, exec_db.context) data_flow.add_execution_to_context(exec_db, exec_db.context)
data_flow.add_environment_to_context(exec_db, exec_db.context)
return exec_db return exec_db

View File

@ -0,0 +1,178 @@
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo.config import cfg
from mistral.db.v2 import api as db_api
from mistral.engine1 import rpc
from mistral.openstack.common import log as logging
from mistral.services import workbooks as wb_service
from mistral.tests.unit.engine1 import base
LOG = logging.getLogger(__name__)
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
TARGET = '10.1.15.251'
WORKBOOK = """
---
version: '2.0'
name: my_wb
workflows:
wf1:
type: reverse
input:
- param1
- param2
output:
final_result: $.final_result
tasks:
task1:
action: std.echo output='{$.param1}'
target: $.__env.var1
publish:
result1: $
task2:
action: std.echo output="'{$.result1} & {$.param2}'"
target: $.__env.var1
publish:
final_result: $
requires: [task1]
wf2:
type: direct
output:
slogan: $.slogan
tasks:
task1:
workflow: wf1
input:
param1: $.__env.var2
param2: $.__env.var3
task_name: "task2"
publish:
slogan: "{$.final_result} is a cool {$.__env.var4}!"
"""
def _run_at_target(task_id, action_class_str, attributes,
action_params, target=None):
kwargs = {
'task_id': task_id,
'action_class_str': action_class_str,
'attributes': attributes,
'params': action_params
}
rpc_client = rpc.get_executor_client()
rpc_client._cast_run_action(rpc_client.topic, **kwargs)
MOCK_RUN_AT_TARGET = mock.MagicMock(side_effect=_run_at_target)
class SubworkflowsTest(base.EngineTestCase):
def setUp(self):
super(SubworkflowsTest, self).setUp()
wb_service.create_workbook_v2(WORKBOOK)
@mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET)
def _test_subworkflow(self, env):
exec1_db = self.engine.start_workflow('my_wb.wf2',
None,
environment=env)
# Execution 1.
self.assertIsNotNone(exec1_db)
self.assertDictEqual({}, exec1_db.input)
self.assertDictEqual({'environment': env}, exec1_db.start_params)
db_execs = db_api.get_executions()
self.assertEqual(2, len(db_execs))
# Execution 2.
if db_execs[0].id != exec1_db.id:
exec2_db = db_execs[0]
else:
exec2_db = db_execs[1]
expected_start_params = {
'task_name': 'task2',
'parent_task_id': exec2_db.parent_task_id,
'environment': env
}
expected_wf1_input = {
'param1': 'Bonnie',
'param2': 'Clyde'
}
self.assertIsNotNone(exec2_db.parent_task_id)
self.assertDictEqual(exec2_db.start_params, expected_start_params)
self.assertDictEqual(exec2_db.input, expected_wf1_input)
# Wait till workflow 'wf1' is completed.
self._await(lambda: self.is_execution_success(exec2_db.id))
exec2_db = db_api.get_execution(exec2_db.id)
expected_wf1_output = {'final_result': "'Bonnie & Clyde'"}
self.assertDictEqual(exec2_db.output, expected_wf1_output)
# Wait till workflow 'wf2' is completed.
self._await(lambda: self.is_execution_success(exec1_db.id))
exec1_db = db_api.get_execution(exec1_db.id)
expected_wf2_output = {'slogan': "'Bonnie & Clyde' is a cool movie!"}
self.assertDictEqual(exec1_db.output, expected_wf2_output)
# Check if target is resolved.
tasks_exec2 = db_api.get_tasks(execution_id=exec2_db.id)
self._assert_single_item(tasks_exec2, name="task1")
self._assert_single_item(tasks_exec2, name="task2")
for task in tasks_exec2:
rpc.ExecutorClient.run_action.assert_any_call(
task.id, 'mistral.actions.std_actions.EchoAction', {},
task.input, TARGET)
def test_subworkflow_env_task_input(self):
env = {
'var1': TARGET,
'var2': 'Bonnie',
'var3': 'Clyde',
'var4': 'movie'
}
self._test_subworkflow(env)
def test_subworkflow_env_recursive(self):
env = {
'var1': TARGET,
'var2': 'Bonnie',
'var3': '{$.__env.var5}',
'var4': 'movie',
'var5': 'Clyde'
}
self._test_subworkflow(env)

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc. # Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -210,3 +209,26 @@ class ExpressionsTest(base.BaseTest):
}, },
applied applied
) )
def test_evaluate_recursively_environment(self):
environment = {
'host': 'vm1234.example.com',
'db': 'test',
'timeout': 600,
'verbose': True,
'__actions': {
'std.sql': {
'conn': 'mysql://admin:secrete@{$.__env.host}/{$.__env.db}'
}
}
}
context = {
'__env': environment
}
defaults = context['__env']['__actions']['std.sql']
applied = expr.evaluate_recursively(defaults, context)
expected = 'mysql://admin:secrete@vm1234.example.com/test'
self.assertEqual(applied['conn'], expected)

View File

@ -1,4 +1,5 @@
# Copyright 2014 - Mirantis, Inc. # Copyright 2014 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -134,10 +135,10 @@ class WorkflowHandler(object):
if with_items_spec: if with_items_spec:
return with_items.get_output( return with_items.get_output(
task_db, task_spec, raw_result task_db, task_spec, raw_result)
)
else: else:
return data_flow.evaluate_task_output(task_spec, raw_result) return data_flow.evaluate_task_output(
task_db, task_spec, raw_result)
@staticmethod @staticmethod
def _determine_task_state(task_db, task_spec, raw_result): def _determine_task_state(task_db, task_spec, raw_result):

View File

@ -1,6 +1,5 @@
# -*- coding: utf-8 -*-
#
# Copyright 2013 - Mirantis, Inc. # Copyright 2013 - Mirantis, Inc.
# Copyright 2015 - StackStorm, Inc.
# #
# Licensed under the Apache License, Version 2.0 (the "License"); # Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License. # you may not use this file except in compliance with the License.
@ -84,9 +83,10 @@ def _evaluate_upstream_context(upstream_db_tasks):
return ctx return ctx
def evaluate_task_output(task_spec, raw_result): def evaluate_task_output(task_db, task_spec, raw_result):
"""Evaluates task output given a raw task result from action/workflow. """Evaluates task output given a raw task result from action/workflow.
:param task_db: DB task
:param task_spec: Task specification :param task_spec: Task specification
:param raw_result: Raw task result that comes from action/workflow :param raw_result: Raw task result that comes from action/workflow
(before publisher). Instance of mistral.workflow.base.TaskResult (before publisher). Instance of mistral.workflow.base.TaskResult
@ -94,8 +94,13 @@ def evaluate_task_output(task_spec, raw_result):
""" """
publish_dict = task_spec.get_publish() publish_dict = task_spec.get_publish()
# Evaluate 'publish' clause using raw result as a context. # Combine the raw result with the environment variables as the context
output = expr.evaluate_recursively(publish_dict, raw_result.data) or {} # for evaulating the 'publish' clause.
out_context = copy.deepcopy(raw_result.data) or {}
if (task_db.in_context and '__env' in task_db.in_context and
isinstance(out_context, dict)):
out_context['__env'] = task_db.in_context['__env']
output = expr.evaluate_recursively(publish_dict, out_context) or {}
# Add same result to task output under key 'task'. # Add same result to task output under key 'task'.
output['task'] = { output['task'] = {
@ -170,3 +175,16 @@ def add_execution_to_context(exec_db, context):
} }
return context return context
def add_environment_to_context(exec_db, context):
if context is None:
context = {}
# If env variables are provided, add an evaluated copy into the context.
if 'environment' in exec_db.start_params:
env = copy.deepcopy(exec_db.start_params['environment'])
# An env variable can be an expression of other env variables.
context['__env'] = expr.evaluate_recursively(env, {'__env': env})
return context