diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 66978d0ec..a673433dd 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -138,9 +138,12 @@ class Workflow(object): wf_ctrl = wf_base.get_controller(self.wf_ex) # Calculate commands to process next. - cmds = wf_ctrl.continue_workflow(env) + cmds = wf_ctrl.continue_workflow() - # TODO(rakhmerov): How to update environment correctly? + if env: + for cmd in cmds: + if isinstance(cmd, commands.RunExistingTask): + _update_task_environment(cmd.task_ex, env) self._continue_workflow(cmds) @@ -159,11 +162,7 @@ class Workflow(object): self.set_state(states.RUNNING, recursive=True) - if env: - task_ex.in_context['__env'] = utils.merge_dicts( - task_ex.in_context['__env'], - env - ) + _update_task_environment(task_ex, env) wf_ctrl = wf_base.get_controller(self.wf_ex) @@ -356,6 +355,16 @@ class Workflow(object): ) +def _update_task_environment(task_ex, env): + if env is None: + return + + task_ex.in_context['__env'] = utils.merge_dicts( + task_ex.in_context['__env'], + env + ) + + def _get_environment(params): env = params.get('env', {}) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index eeead51e2..06d72d058 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -85,36 +85,29 @@ class WorkflowController(object): self.wf_spec = wf_spec - @staticmethod - def _update_task_ex_env(task_ex, env): - if not env: - return task_ex - - task_ex.in_context['__env'] = u.merge_dicts( - task_ex.in_context['__env'], - env - ) - - return task_ex - @profiler.trace('workflow-controller-continue-workflow') - def continue_workflow(self, env=None): + def continue_workflow(self): """Calculates a list of commands to continue the workflow. Given a workflow specification this method makes required analysis according to this workflow type rules and identifies a list of commands needed to continue the workflow. - :param env: A set of environment variables to overwrite. :return: List of workflow commands (instances of mistral.workflow.commands.WorkflowCommand). """ + + # TODO(rakhmerov): We now use this method for two cases: + # 1) to handle task completion + # 2) to resume a workflow after it's been paused + # Moving forward we need to introduce a separate method for + # resuming a workflow because it won't be operating with + # any concrete tasks that caused this operation. + if self._is_paused_or_completed(): return [] - # TODO(rakhmerov): Get rid of 'env' parameter completely, WF controller - # should not be updating any DB objects. - return self._find_next_commands(env=env) + return self._find_next_commands() def rerun_tasks(self, task_execs, reset=True): """Gets commands to rerun existing task executions. @@ -197,13 +190,12 @@ class WorkflowController(object): raise NotImplementedError @abc.abstractmethod - def _find_next_commands(self, env=None): + def _find_next_commands(self): """Finds commands that should run next. A concrete algorithm of finding such tasks depends on a concrete workflow controller. - :param env: A set of environment variables to overwrite. :return: List of workflow commands. """ # Add all tasks in IDLE state. @@ -212,11 +204,6 @@ class WorkflowController(object): states.IDLE ) - # TODO(rakhmerov): We should not be updating any DB objects in a WF - # controller. - for task_ex in idle_tasks: - self._update_task_ex_env(task_ex, env) - return [ commands.RunExistingTask(self.wf_ex, self.wf_spec, t) for t in idle_tasks diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index cbb1d7d23..3ef8990e1 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -65,10 +65,8 @@ class DirectWorkflowController(base.WorkflowController): self.wf_spec.get_tasks()[t_ex_candidate.name] ) - def _find_next_commands(self, env=None): - cmds = super(DirectWorkflowController, self)._find_next_commands( - env=env - ) + def _find_next_commands(self): + cmds = super(DirectWorkflowController, self)._find_next_commands() if not self.wf_ex.task_executions: return self._find_start_commands() diff --git a/mistral/workflow/reverse_workflow.py b/mistral/workflow/reverse_workflow.py index 5848f5db0..c94a1195a 100644 --- a/mistral/workflow/reverse_workflow.py +++ b/mistral/workflow/reverse_workflow.py @@ -40,15 +40,13 @@ class ReverseWorkflowController(base.WorkflowController): __workflow_type__ = "reverse" - def _find_next_commands(self, env=None): + def _find_next_commands(self): """Finds all tasks with resolved dependencies. This method finds all tasks with resolved dependencies and returns them in the form of workflow commands. """ - cmds = super(ReverseWorkflowController, self)._find_next_commands( - env=env - ) + cmds = super(ReverseWorkflowController, self)._find_next_commands() task_specs = self._find_task_specs_with_satisfied_dependencies()