Towards non-locking model: removing env update from WF controller
* Removed updating task execution environment from WF controller completely and moving it to Workflow class. It was wrong for a number of reasons. For example, from architectural perspective workflow controller must not update any DB objects, it doesn't have such a responsibility and priviledge. Change-Id: I69a2de97933638d0c4adde443dc67d60893c7d45
This commit is contained in:
@@ -138,9 +138,12 @@ class Workflow(object):
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex)
|
||||
|
||||
# Calculate commands to process next.
|
||||
cmds = wf_ctrl.continue_workflow(env)
|
||||
cmds = wf_ctrl.continue_workflow()
|
||||
|
||||
# TODO(rakhmerov): How to update environment correctly?
|
||||
if env:
|
||||
for cmd in cmds:
|
||||
if isinstance(cmd, commands.RunExistingTask):
|
||||
_update_task_environment(cmd.task_ex, env)
|
||||
|
||||
self._continue_workflow(cmds)
|
||||
|
||||
@@ -159,11 +162,7 @@ class Workflow(object):
|
||||
|
||||
self.set_state(states.RUNNING, recursive=True)
|
||||
|
||||
if env:
|
||||
task_ex.in_context['__env'] = utils.merge_dicts(
|
||||
task_ex.in_context['__env'],
|
||||
env
|
||||
)
|
||||
_update_task_environment(task_ex, env)
|
||||
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex)
|
||||
|
||||
@@ -356,6 +355,16 @@ class Workflow(object):
|
||||
)
|
||||
|
||||
|
||||
def _update_task_environment(task_ex, env):
|
||||
if env is None:
|
||||
return
|
||||
|
||||
task_ex.in_context['__env'] = utils.merge_dicts(
|
||||
task_ex.in_context['__env'],
|
||||
env
|
||||
)
|
||||
|
||||
|
||||
def _get_environment(params):
|
||||
env = params.get('env', {})
|
||||
|
||||
|
||||
@@ -85,36 +85,29 @@ class WorkflowController(object):
|
||||
|
||||
self.wf_spec = wf_spec
|
||||
|
||||
@staticmethod
|
||||
def _update_task_ex_env(task_ex, env):
|
||||
if not env:
|
||||
return task_ex
|
||||
|
||||
task_ex.in_context['__env'] = u.merge_dicts(
|
||||
task_ex.in_context['__env'],
|
||||
env
|
||||
)
|
||||
|
||||
return task_ex
|
||||
|
||||
@profiler.trace('workflow-controller-continue-workflow')
|
||||
def continue_workflow(self, env=None):
|
||||
def continue_workflow(self):
|
||||
"""Calculates a list of commands to continue the workflow.
|
||||
|
||||
Given a workflow specification this method makes required analysis
|
||||
according to this workflow type rules and identifies a list of
|
||||
commands needed to continue the workflow.
|
||||
|
||||
:param env: A set of environment variables to overwrite.
|
||||
:return: List of workflow commands (instances of
|
||||
mistral.workflow.commands.WorkflowCommand).
|
||||
"""
|
||||
|
||||
# TODO(rakhmerov): We now use this method for two cases:
|
||||
# 1) to handle task completion
|
||||
# 2) to resume a workflow after it's been paused
|
||||
# Moving forward we need to introduce a separate method for
|
||||
# resuming a workflow because it won't be operating with
|
||||
# any concrete tasks that caused this operation.
|
||||
|
||||
if self._is_paused_or_completed():
|
||||
return []
|
||||
|
||||
# TODO(rakhmerov): Get rid of 'env' parameter completely, WF controller
|
||||
# should not be updating any DB objects.
|
||||
return self._find_next_commands(env=env)
|
||||
return self._find_next_commands()
|
||||
|
||||
def rerun_tasks(self, task_execs, reset=True):
|
||||
"""Gets commands to rerun existing task executions.
|
||||
@@ -197,13 +190,12 @@ class WorkflowController(object):
|
||||
raise NotImplementedError
|
||||
|
||||
@abc.abstractmethod
|
||||
def _find_next_commands(self, env=None):
|
||||
def _find_next_commands(self):
|
||||
"""Finds commands that should run next.
|
||||
|
||||
A concrete algorithm of finding such tasks depends on a concrete
|
||||
workflow controller.
|
||||
|
||||
:param env: A set of environment variables to overwrite.
|
||||
:return: List of workflow commands.
|
||||
"""
|
||||
# Add all tasks in IDLE state.
|
||||
@@ -212,11 +204,6 @@ class WorkflowController(object):
|
||||
states.IDLE
|
||||
)
|
||||
|
||||
# TODO(rakhmerov): We should not be updating any DB objects in a WF
|
||||
# controller.
|
||||
for task_ex in idle_tasks:
|
||||
self._update_task_ex_env(task_ex, env)
|
||||
|
||||
return [
|
||||
commands.RunExistingTask(self.wf_ex, self.wf_spec, t)
|
||||
for t in idle_tasks
|
||||
|
||||
@@ -65,10 +65,8 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
self.wf_spec.get_tasks()[t_ex_candidate.name]
|
||||
)
|
||||
|
||||
def _find_next_commands(self, env=None):
|
||||
cmds = super(DirectWorkflowController, self)._find_next_commands(
|
||||
env=env
|
||||
)
|
||||
def _find_next_commands(self):
|
||||
cmds = super(DirectWorkflowController, self)._find_next_commands()
|
||||
|
||||
if not self.wf_ex.task_executions:
|
||||
return self._find_start_commands()
|
||||
|
||||
@@ -40,15 +40,13 @@ class ReverseWorkflowController(base.WorkflowController):
|
||||
|
||||
__workflow_type__ = "reverse"
|
||||
|
||||
def _find_next_commands(self, env=None):
|
||||
def _find_next_commands(self):
|
||||
"""Finds all tasks with resolved dependencies.
|
||||
|
||||
This method finds all tasks with resolved dependencies and
|
||||
returns them in the form of workflow commands.
|
||||
"""
|
||||
cmds = super(ReverseWorkflowController, self)._find_next_commands(
|
||||
env=env
|
||||
)
|
||||
cmds = super(ReverseWorkflowController, self)._find_next_commands()
|
||||
|
||||
task_specs = self._find_task_specs_with_satisfied_dependencies()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user