diff --git a/mistral/engine/tasks.py b/mistral/engine/tasks.py index fbeeccb0..d850ce25 100644 --- a/mistral/engine/tasks.py +++ b/mistral/engine/tasks.py @@ -14,7 +14,6 @@ # limitations under the License. import abc -import copy import operator from oslo_log import log as logging from osprofiler import profiler @@ -241,7 +240,7 @@ class Task(object): if not action_name: return {} - env = self.task_ex.in_context.get('__env', {}) + env = self.wf_ex.context.get('__env', {}) return env.get('__actions', {}).get(action_name, {}) @@ -351,12 +350,16 @@ class RegularTask(Task): ) def _get_target(self, input_dict): + ctx_view = data_flow.ContextView( + input_dict, + self.ctx, + self.wf_ex.context, + self.wf_ex.input + ) + return expr.evaluate_recursively( self.task_spec.get_target(), - utils.merge_dicts( - copy.deepcopy(input_dict), - copy.deepcopy(self.ctx) - ) + ctx_view ) def _get_action_input(self, ctx=None): diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index d2e6ffda..9a53a94e 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -140,11 +140,6 @@ class Workflow(object): # Calculate commands to process next. cmds = wf_ctrl.continue_workflow() - if env: - for cmd in cmds: - if isinstance(cmd, commands.RunExistingTask): - _update_task_environment(cmd.task_ex, env) - self._continue_workflow(cmds) def rerun(self, task_ex, reset=True, env=None): @@ -167,8 +162,6 @@ class Workflow(object): self.set_state(states.RUNNING, recursive=True) - _update_task_environment(task_ex, env) - wf_ctrl = wf_base.get_controller(self.wf_ex) # Calculate commands to process next. @@ -379,16 +372,6 @@ class Workflow(object): ) -def _update_task_environment(task_ex, env): - if env is None: - return - - task_ex.in_context['__env'] = utils.merge_dicts( - task_ex.in_context['__env'], - env - ) - - def _get_environment(params): env = params.get('env', {}) diff --git a/mistral/services/workflows.py b/mistral/services/workflows.py index 649edc1c..ebfed895 100644 --- a/mistral/services/workflows.py +++ b/mistral/services/workflows.py @@ -116,6 +116,7 @@ def update_workflow_execution_env(wf_ex, env): ) wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env) + data_flow.add_environment_to_context(wf_ex) return wf_ex diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index 3015a7d6..cc7d5dd0 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -16,7 +16,6 @@ # limitations under the License. import abc -import copy from oslo_log import log as logging from osprofiler import profiler @@ -182,19 +181,7 @@ class WorkflowController(object): # to cover 'split' (aka 'merge') use case. upstream_task_execs = self._get_upstream_task_executions(task_spec) - ctx = data_flow.evaluate_upstream_context(upstream_task_execs) - - # TODO(rakhmerov): Seems like we can fully get rid of '__env' in - # task context if we are OK to have it only in workflow execution - # object (wf_ex.context). Now we can selectively modify env - # for some tasks if we resume or re-run a workflow. - if self.wf_ex.context: - ctx['__env'] = u.merge_dicts( - copy.deepcopy(ctx.get('__env', {})), - copy.deepcopy(self.wf_ex.context.get('__env', {})) - ) - - return ctx + return data_flow.evaluate_upstream_context(upstream_task_execs) @abc.abstractmethod def _get_upstream_task_executions(self, task_spec):