Remove environment data from task inbound context
* It's redundant to keep environment data in task inbound context, it is immutable and we can always take it from workflow execution object which is more efficient from DB space consumption standpoint. The only case when it's allowed to modify data in a workflow environment is when we either resume or re-run a workflow and in this case we can change it for the whole workflow execution. Change-Id: I244c1768aaa306f8ad41084325107a40005d874c
This commit is contained in:
@@ -14,7 +14,6 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
import copy
|
|
||||||
import operator
|
import operator
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from osprofiler import profiler
|
from osprofiler import profiler
|
||||||
@@ -241,7 +240,7 @@ class Task(object):
|
|||||||
if not action_name:
|
if not action_name:
|
||||||
return {}
|
return {}
|
||||||
|
|
||||||
env = self.task_ex.in_context.get('__env', {})
|
env = self.wf_ex.context.get('__env', {})
|
||||||
|
|
||||||
return env.get('__actions', {}).get(action_name, {})
|
return env.get('__actions', {}).get(action_name, {})
|
||||||
|
|
||||||
@@ -351,12 +350,16 @@ class RegularTask(Task):
|
|||||||
)
|
)
|
||||||
|
|
||||||
def _get_target(self, input_dict):
|
def _get_target(self, input_dict):
|
||||||
|
ctx_view = data_flow.ContextView(
|
||||||
|
input_dict,
|
||||||
|
self.ctx,
|
||||||
|
self.wf_ex.context,
|
||||||
|
self.wf_ex.input
|
||||||
|
)
|
||||||
|
|
||||||
return expr.evaluate_recursively(
|
return expr.evaluate_recursively(
|
||||||
self.task_spec.get_target(),
|
self.task_spec.get_target(),
|
||||||
utils.merge_dicts(
|
ctx_view
|
||||||
copy.deepcopy(input_dict),
|
|
||||||
copy.deepcopy(self.ctx)
|
|
||||||
)
|
|
||||||
)
|
)
|
||||||
|
|
||||||
def _get_action_input(self, ctx=None):
|
def _get_action_input(self, ctx=None):
|
||||||
|
|||||||
@@ -140,11 +140,6 @@ class Workflow(object):
|
|||||||
# Calculate commands to process next.
|
# Calculate commands to process next.
|
||||||
cmds = wf_ctrl.continue_workflow()
|
cmds = wf_ctrl.continue_workflow()
|
||||||
|
|
||||||
if env:
|
|
||||||
for cmd in cmds:
|
|
||||||
if isinstance(cmd, commands.RunExistingTask):
|
|
||||||
_update_task_environment(cmd.task_ex, env)
|
|
||||||
|
|
||||||
self._continue_workflow(cmds)
|
self._continue_workflow(cmds)
|
||||||
|
|
||||||
def rerun(self, task_ex, reset=True, env=None):
|
def rerun(self, task_ex, reset=True, env=None):
|
||||||
@@ -167,8 +162,6 @@ class Workflow(object):
|
|||||||
|
|
||||||
self.set_state(states.RUNNING, recursive=True)
|
self.set_state(states.RUNNING, recursive=True)
|
||||||
|
|
||||||
_update_task_environment(task_ex, env)
|
|
||||||
|
|
||||||
wf_ctrl = wf_base.get_controller(self.wf_ex)
|
wf_ctrl = wf_base.get_controller(self.wf_ex)
|
||||||
|
|
||||||
# Calculate commands to process next.
|
# Calculate commands to process next.
|
||||||
@@ -379,16 +372,6 @@ class Workflow(object):
|
|||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
def _update_task_environment(task_ex, env):
|
|
||||||
if env is None:
|
|
||||||
return
|
|
||||||
|
|
||||||
task_ex.in_context['__env'] = utils.merge_dicts(
|
|
||||||
task_ex.in_context['__env'],
|
|
||||||
env
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def _get_environment(params):
|
def _get_environment(params):
|
||||||
env = params.get('env', {})
|
env = params.get('env', {})
|
||||||
|
|
||||||
|
|||||||
@@ -116,6 +116,7 @@ def update_workflow_execution_env(wf_ex, env):
|
|||||||
)
|
)
|
||||||
|
|
||||||
wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env)
|
wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env)
|
||||||
|
|
||||||
data_flow.add_environment_to_context(wf_ex)
|
data_flow.add_environment_to_context(wf_ex)
|
||||||
|
|
||||||
return wf_ex
|
return wf_ex
|
||||||
|
|||||||
@@ -16,7 +16,6 @@
|
|||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
import abc
|
import abc
|
||||||
import copy
|
|
||||||
|
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from osprofiler import profiler
|
from osprofiler import profiler
|
||||||
@@ -182,19 +181,7 @@ class WorkflowController(object):
|
|||||||
# to cover 'split' (aka 'merge') use case.
|
# to cover 'split' (aka 'merge') use case.
|
||||||
upstream_task_execs = self._get_upstream_task_executions(task_spec)
|
upstream_task_execs = self._get_upstream_task_executions(task_spec)
|
||||||
|
|
||||||
ctx = data_flow.evaluate_upstream_context(upstream_task_execs)
|
return data_flow.evaluate_upstream_context(upstream_task_execs)
|
||||||
|
|
||||||
# TODO(rakhmerov): Seems like we can fully get rid of '__env' in
|
|
||||||
# task context if we are OK to have it only in workflow execution
|
|
||||||
# object (wf_ex.context). Now we can selectively modify env
|
|
||||||
# for some tasks if we resume or re-run a workflow.
|
|
||||||
if self.wf_ex.context:
|
|
||||||
ctx['__env'] = u.merge_dicts(
|
|
||||||
copy.deepcopy(ctx.get('__env', {})),
|
|
||||||
copy.deepcopy(self.wf_ex.context.get('__env', {}))
|
|
||||||
)
|
|
||||||
|
|
||||||
return ctx
|
|
||||||
|
|
||||||
@abc.abstractmethod
|
@abc.abstractmethod
|
||||||
def _get_upstream_task_executions(self, task_spec):
|
def _get_upstream_task_executions(self, task_spec):
|
||||||
|
|||||||
Reference in New Issue
Block a user