Merge "Remove environment data from task inbound context"
This commit is contained in:
commit
82c6bdbebd
@ -14,7 +14,6 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
import copy
|
|
||||||
import operator
|
import operator
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from osprofiler import profiler
|
from osprofiler import profiler
|
||||||
@ -241,7 +240,7 @@ class Task(object):
|
|||||||
if not action_name:
|
if not action_name:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
env = self.task_ex.in_context.get('__env', {})
|
env = self.wf_ex.context.get('__env', {})
|
||||||
|
|
||||||
return env.get('__actions', {}).get(action_name, {})
|
return env.get('__actions', {}).get(action_name, {})
|
||||||
|
|
||||||
@ -351,12 +350,16 @@ class RegularTask(Task):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def _get_target(self, input_dict):
|
def _get_target(self, input_dict):
|
||||||
|
ctx_view = data_flow.ContextView(
|
||||||
|
input_dict,
|
||||||
|
self.ctx,
|
||||||
|
self.wf_ex.context,
|
||||||
|
self.wf_ex.input
|
||||||
|
)
|
||||||
|
|
||||||
return expr.evaluate_recursively(
|
return expr.evaluate_recursively(
|
||||||
self.task_spec.get_target(),
|
self.task_spec.get_target(),
|
||||||
utils.merge_dicts(
|
ctx_view
|
||||||
copy.deepcopy(input_dict),
|
|
||||||
copy.deepcopy(self.ctx)
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def _get_action_input(self, ctx=None):
|
def _get_action_input(self, ctx=None):
|
||||||
|
@ -140,11 +140,6 @@ class Workflow(object):
|
|||||||
# Calculate commands to process next.
|
# Calculate commands to process next.
|
||||||
cmds = wf_ctrl.continue_workflow()
|
cmds = wf_ctrl.continue_workflow()
|
||||||
|
|
||||||
if env:
|
|
||||||
for cmd in cmds:
|
|
||||||
if isinstance(cmd, commands.RunExistingTask):
|
|
||||||
_update_task_environment(cmd.task_ex, env)
|
|
||||||
|
|
||||||
self._continue_workflow(cmds)
|
self._continue_workflow(cmds)
|
||||||
|
|
||||||
def rerun(self, task_ex, reset=True, env=None):
|
def rerun(self, task_ex, reset=True, env=None):
|
||||||
@ -167,8 +162,6 @@ class Workflow(object):
|
|||||||
|
|
||||||
self.set_state(states.RUNNING, recursive=True)
|
self.set_state(states.RUNNING, recursive=True)
|
||||||
|
|
||||||
_update_task_environment(task_ex, env)
|
|
||||||
|
|
||||||
wf_ctrl = wf_base.get_controller(self.wf_ex)
|
wf_ctrl = wf_base.get_controller(self.wf_ex)
|
||||||
|
|
||||||
# Calculate commands to process next.
|
# Calculate commands to process next.
|
||||||
@ -379,16 +372,6 @@ class Workflow(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _update_task_environment(task_ex, env):
|
|
||||||
if env is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
task_ex.in_context['__env'] = utils.merge_dicts(
|
|
||||||
task_ex.in_context['__env'],
|
|
||||||
env
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _get_environment(params):
|
def _get_environment(params):
|
||||||
env = params.get('env', {})
|
env = params.get('env', {})
|
||||||
|
|
||||||
|
@ -116,6 +116,7 @@ def update_workflow_execution_env(wf_ex, env):
|
|||||||
)
|
)
|
||||||
|
|
||||||
wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env)
|
wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env)
|
||||||
|
|
||||||
data_flow.add_environment_to_context(wf_ex)
|
data_flow.add_environment_to_context(wf_ex)
|
||||||
|
|
||||||
return wf_ex
|
return wf_ex
|
||||||
|
@ -16,7 +16,6 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
import copy
|
|
||||||
|
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from osprofiler import profiler
|
from osprofiler import profiler
|
||||||
@ -182,19 +181,7 @@ class WorkflowController(object):
|
|||||||
# to cover 'split' (aka 'merge') use case.
|
# to cover 'split' (aka 'merge') use case.
|
||||||
upstream_task_execs = self._get_upstream_task_executions(task_spec)
|
upstream_task_execs = self._get_upstream_task_executions(task_spec)
|
||||||
|
|
||||||
ctx = data_flow.evaluate_upstream_context(upstream_task_execs)
|
return data_flow.evaluate_upstream_context(upstream_task_execs)
|
||||||
|
|
||||||
# TODO(rakhmerov): Seems like we can fully get rid of '__env' in
|
|
||||||
# task context if we are OK to have it only in workflow execution
|
|
||||||
# object (wf_ex.context). Now we can selectively modify env
|
|
||||||
# for some tasks if we resume or re-run a workflow.
|
|
||||||
if self.wf_ex.context:
|
|
||||||
ctx['__env'] = u.merge_dicts(
|
|
||||||
copy.deepcopy(ctx.get('__env', {})),
|
|
||||||
copy.deepcopy(self.wf_ex.context.get('__env', {}))
|
|
||||||
)
|
|
||||||
|
|
||||||
return ctx
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def _get_upstream_task_executions(self, task_spec):
|
def _get_upstream_task_executions(self, task_spec):
|
||||||
|
Loading…
Reference in New Issue
Block a user