From dd75a4c6f66f0cb0ce05e628044930e8305d855c Mon Sep 17 00:00:00 2001 From: W Chan Date: Tue, 13 Jan 2015 16:59:23 -0800 Subject: [PATCH] Implement workflow execution environment - part 3 Environment is recursively self evaluated at the start of the workflow to support expressions that reference other environment variables. Environment is evaluated for action inputs, executor target, and published outputs. Change-Id: I12b9d0f42b9aa8c338cd40bab33285ff98859778 Implements: blueprint mistral-execution-environment --- mistral/engine1/default_engine.py | 1 + .../tests/unit/engine1/test_environment.py | 178 ++++++++++++++++++ mistral/tests/unit/test_expressions.py | 26 ++- mistral/workflow/base.py | 7 +- mistral/workflow/data_flow.py | 28 ++- 5 files changed, 230 insertions(+), 10 deletions(-) create mode 100644 mistral/tests/unit/engine1/test_environment.py diff --git a/mistral/engine1/default_engine.py b/mistral/engine1/default_engine.py index d9ebe838..03908ef9 100644 --- a/mistral/engine1/default_engine.py +++ b/mistral/engine1/default_engine.py @@ -199,6 +199,7 @@ class DefaultEngine(base.Engine): data_flow.add_openstack_data_to_context(wf_db, exec_db.context) data_flow.add_execution_to_context(exec_db, exec_db.context) + data_flow.add_environment_to_context(exec_db, exec_db.context) return exec_db diff --git a/mistral/tests/unit/engine1/test_environment.py b/mistral/tests/unit/engine1/test_environment.py new file mode 100644 index 00000000..31b1a13d --- /dev/null +++ b/mistral/tests/unit/engine1/test_environment.py @@ -0,0 +1,178 @@ +# Copyright 2015 - StackStorm, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import mock +from oslo.config import cfg + +from mistral.db.v2 import api as db_api +from mistral.engine1 import rpc +from mistral.openstack.common import log as logging +from mistral.services import workbooks as wb_service +from mistral.tests.unit.engine1 import base + +LOG = logging.getLogger(__name__) + +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + + +TARGET = '10.1.15.251' + +WORKBOOK = """ +--- +version: '2.0' +name: my_wb +workflows: + wf1: + type: reverse + input: + - param1 + - param2 + output: + final_result: $.final_result + tasks: + task1: + action: std.echo output='{$.param1}' + target: $.__env.var1 + publish: + result1: $ + task2: + action: std.echo output="'{$.result1} & {$.param2}'" + target: $.__env.var1 + publish: + final_result: $ + requires: [task1] + wf2: + type: direct + output: + slogan: $.slogan + tasks: + task1: + workflow: wf1 + input: + param1: $.__env.var2 + param2: $.__env.var3 + task_name: "task2" + publish: + slogan: "{$.final_result} is a cool {$.__env.var4}!" +""" + + +def _run_at_target(task_id, action_class_str, attributes, + action_params, target=None): + kwargs = { + 'task_id': task_id, + 'action_class_str': action_class_str, + 'attributes': attributes, + 'params': action_params + } + + rpc_client = rpc.get_executor_client() + rpc_client._cast_run_action(rpc_client.topic, **kwargs) + + +MOCK_RUN_AT_TARGET = mock.MagicMock(side_effect=_run_at_target) + + +class SubworkflowsTest(base.EngineTestCase): + def setUp(self): + super(SubworkflowsTest, self).setUp() + + wb_service.create_workbook_v2(WORKBOOK) + + @mock.patch.object(rpc.ExecutorClient, 'run_action', MOCK_RUN_AT_TARGET) + def _test_subworkflow(self, env): + exec1_db = self.engine.start_workflow('my_wb.wf2', + None, + environment=env) + + # Execution 1. + self.assertIsNotNone(exec1_db) + self.assertDictEqual({}, exec1_db.input) + self.assertDictEqual({'environment': env}, exec1_db.start_params) + + db_execs = db_api.get_executions() + + self.assertEqual(2, len(db_execs)) + + # Execution 2. + if db_execs[0].id != exec1_db.id: + exec2_db = db_execs[0] + else: + exec2_db = db_execs[1] + + expected_start_params = { + 'task_name': 'task2', + 'parent_task_id': exec2_db.parent_task_id, + 'environment': env + } + + expected_wf1_input = { + 'param1': 'Bonnie', + 'param2': 'Clyde' + } + + self.assertIsNotNone(exec2_db.parent_task_id) + self.assertDictEqual(exec2_db.start_params, expected_start_params) + self.assertDictEqual(exec2_db.input, expected_wf1_input) + + # Wait till workflow 'wf1' is completed. + self._await(lambda: self.is_execution_success(exec2_db.id)) + + exec2_db = db_api.get_execution(exec2_db.id) + + expected_wf1_output = {'final_result': "'Bonnie & Clyde'"} + + self.assertDictEqual(exec2_db.output, expected_wf1_output) + + # Wait till workflow 'wf2' is completed. + self._await(lambda: self.is_execution_success(exec1_db.id)) + + exec1_db = db_api.get_execution(exec1_db.id) + + expected_wf2_output = {'slogan': "'Bonnie & Clyde' is a cool movie!"} + + self.assertDictEqual(exec1_db.output, expected_wf2_output) + + # Check if target is resolved. + tasks_exec2 = db_api.get_tasks(execution_id=exec2_db.id) + self._assert_single_item(tasks_exec2, name="task1") + self._assert_single_item(tasks_exec2, name="task2") + + for task in tasks_exec2: + rpc.ExecutorClient.run_action.assert_any_call( + task.id, 'mistral.actions.std_actions.EchoAction', {}, + task.input, TARGET) + + def test_subworkflow_env_task_input(self): + env = { + 'var1': TARGET, + 'var2': 'Bonnie', + 'var3': 'Clyde', + 'var4': 'movie' + } + + self._test_subworkflow(env) + + def test_subworkflow_env_recursive(self): + env = { + 'var1': TARGET, + 'var2': 'Bonnie', + 'var3': '{$.__env.var5}', + 'var4': 'movie', + 'var5': 'Clyde' + } + + self._test_subworkflow(env) diff --git a/mistral/tests/unit/test_expressions.py b/mistral/tests/unit/test_expressions.py index fb80d3bb..b7507a66 100644 --- a/mistral/tests/unit/test_expressions.py +++ b/mistral/tests/unit/test_expressions.py @@ -1,6 +1,5 @@ -# -*- coding: utf-8 -*- -# # Copyright 2013 - Mirantis, Inc. +# Copyright 2015 - StackStorm, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -210,3 +209,26 @@ class ExpressionsTest(base.BaseTest): }, applied ) + + def test_evaluate_recursively_environment(self): + environment = { + 'host': 'vm1234.example.com', + 'db': 'test', + 'timeout': 600, + 'verbose': True, + '__actions': { + 'std.sql': { + 'conn': 'mysql://admin:secrete@{$.__env.host}/{$.__env.db}' + } + } + } + + context = { + '__env': environment + } + + defaults = context['__env']['__actions']['std.sql'] + applied = expr.evaluate_recursively(defaults, context) + expected = 'mysql://admin:secrete@vm1234.example.com/test' + + self.assertEqual(applied['conn'], expected) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index e113cc37..0f218705 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -1,4 +1,5 @@ # Copyright 2014 - Mirantis, Inc. +# Copyright 2015 - StackStorm, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -134,10 +135,10 @@ class WorkflowHandler(object): if with_items_spec: return with_items.get_output( - task_db, task_spec, raw_result - ) + task_db, task_spec, raw_result) else: - return data_flow.evaluate_task_output(task_spec, raw_result) + return data_flow.evaluate_task_output( + task_db, task_spec, raw_result) @staticmethod def _determine_task_state(task_db, task_spec, raw_result): diff --git a/mistral/workflow/data_flow.py b/mistral/workflow/data_flow.py index 3acb1498..1600d9c4 100644 --- a/mistral/workflow/data_flow.py +++ b/mistral/workflow/data_flow.py @@ -1,6 +1,5 @@ -# -*- coding: utf-8 -*- -# # Copyright 2013 - Mirantis, Inc. +# Copyright 2015 - StackStorm, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -84,9 +83,10 @@ def _evaluate_upstream_context(upstream_db_tasks): return ctx -def evaluate_task_output(task_spec, raw_result): +def evaluate_task_output(task_db, task_spec, raw_result): """Evaluates task output given a raw task result from action/workflow. + :param task_db: DB task :param task_spec: Task specification :param raw_result: Raw task result that comes from action/workflow (before publisher). Instance of mistral.workflow.base.TaskResult @@ -94,8 +94,13 @@ def evaluate_task_output(task_spec, raw_result): """ publish_dict = task_spec.get_publish() - # Evaluate 'publish' clause using raw result as a context. - output = expr.evaluate_recursively(publish_dict, raw_result.data) or {} + # Combine the raw result with the environment variables as the context + # for evaulating the 'publish' clause. + out_context = copy.deepcopy(raw_result.data) or {} + if (task_db.in_context and '__env' in task_db.in_context and + isinstance(out_context, dict)): + out_context['__env'] = task_db.in_context['__env'] + output = expr.evaluate_recursively(publish_dict, out_context) or {} # Add same result to task output under key 'task'. output['task'] = { @@ -170,3 +175,16 @@ def add_execution_to_context(exec_db, context): } return context + + +def add_environment_to_context(exec_db, context): + if context is None: + context = {} + + # If env variables are provided, add an evaluated copy into the context. + if 'environment' in exec_db.start_params: + env = copy.deepcopy(exec_db.start_params['environment']) + # An env variable can be an expression of other env variables. + context['__env'] = expr.evaluate_recursively(env, {'__env': env}) + + return context