Get rid of a extra copy of workflow environment
* Previously, we had two copies of the workflow environment passed by a user: one was in the 'params' fields under ke 'env' key and another one was copied into the 'context' field under the '__env' key so that we can evaluate expressions involving the env() function (YAQL or Jinja). This patch removes the copy from the 'context' field in favor of using an ad-hoc ContextView structure where we now also weave in the environment dictionary under the same key '__env'. Related-Bug: #1757966 Change-Id: I1204b082794b376787d126136a79dd204ec3af07
This commit is contained in:
parent
56adb2ed65
commit
b77769cf44
@ -381,7 +381,8 @@ class AdHocAction(PythonAction):
|
||||
)
|
||||
|
||||
base_action_def = self._gather_base_actions(
|
||||
action_def, base_action_def
|
||||
action_def,
|
||||
base_action_def
|
||||
)
|
||||
|
||||
super(AdHocAction, self).__init__(
|
||||
@ -421,11 +422,17 @@ class AdHocAction(PythonAction):
|
||||
base_input_expr = action_spec.get_base_input()
|
||||
|
||||
if base_input_expr:
|
||||
wf_ex = (
|
||||
self.task_ex.workflow_execution if self.task_ex else None
|
||||
)
|
||||
|
||||
ctx_view = data_flow.ContextView(
|
||||
base_input_dict,
|
||||
self.task_ctx,
|
||||
data_flow.get_workflow_environment_dict(wf_ex),
|
||||
self.wf_ctx
|
||||
)
|
||||
|
||||
base_input_dict = expr.evaluate_recursively(
|
||||
base_input_expr,
|
||||
ctx_view
|
||||
@ -447,8 +454,10 @@ class AdHocAction(PythonAction):
|
||||
|
||||
if transformer is not None:
|
||||
result = ml_actions.Result(
|
||||
data=expr.evaluate_recursively(transformer,
|
||||
result.data),
|
||||
data=expr.evaluate_recursively(
|
||||
transformer,
|
||||
result.data
|
||||
),
|
||||
error=result.error
|
||||
)
|
||||
|
||||
|
@ -152,6 +152,7 @@ class TaskPolicy(object):
|
||||
|
||||
ctx_view = data_flow.ContextView(
|
||||
task_ex.in_context,
|
||||
data_flow.get_workflow_environment_dict(wf_ex),
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
)
|
||||
@ -170,6 +171,7 @@ class TaskPolicy(object):
|
||||
|
||||
ctx_view = data_flow.ContextView(
|
||||
task_ex.in_context,
|
||||
data_flow.get_workflow_environment_dict(wf_ex),
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
)
|
||||
|
@ -342,7 +342,7 @@ class Task(object):
|
||||
if not action_name:
|
||||
return {}
|
||||
|
||||
env = self.wf_ex.context.get('__env', {})
|
||||
env = self.wf_ex.params['env']
|
||||
|
||||
return env.get('__actions', {}).get(action_name, {})
|
||||
|
||||
@ -483,6 +483,7 @@ class RegularTask(Task):
|
||||
ctx_view = data_flow.ContextView(
|
||||
input_dict,
|
||||
self.ctx,
|
||||
data_flow.get_workflow_environment_dict(self.wf_ex),
|
||||
self.wf_ex.context,
|
||||
self.wf_ex.input
|
||||
)
|
||||
@ -512,6 +513,7 @@ class RegularTask(Task):
|
||||
def _evaluate_expression(self, expression, ctx=None):
|
||||
ctx_view = data_flow.ContextView(
|
||||
data_flow.get_current_task_dict(self.task_ex),
|
||||
data_flow.get_workflow_environment_dict(self.wf_ex),
|
||||
ctx or self.ctx,
|
||||
self.wf_ex.context,
|
||||
self.wf_ex.input
|
||||
|
@ -27,6 +27,7 @@ from mistral.engine import action_queue
|
||||
from mistral.engine import dispatcher
|
||||
from mistral.engine import utils as engine_utils
|
||||
from mistral import exceptions as exc
|
||||
from mistral import expressions as expr
|
||||
from mistral.lang import parser as spec_parser
|
||||
from mistral.notifiers import base as notif
|
||||
from mistral.notifiers import notification_events as events
|
||||
@ -272,10 +273,12 @@ class Workflow(object):
|
||||
return db_api.acquire_lock(db_models.WorkflowExecution, self.wf_ex.id)
|
||||
|
||||
def _get_final_context(self):
|
||||
final_ctx = {}
|
||||
|
||||
wf_ctrl = wf_base.get_controller(self.wf_ex)
|
||||
final_context = {}
|
||||
|
||||
try:
|
||||
final_context = wf_ctrl.evaluate_workflow_final_context()
|
||||
final_ctx = wf_ctrl.evaluate_workflow_final_context()
|
||||
except Exception as e:
|
||||
LOG.warning(
|
||||
'Failed to get final context for workflow execution. '
|
||||
@ -285,7 +288,7 @@ class Workflow(object):
|
||||
str(e)
|
||||
)
|
||||
|
||||
return final_context
|
||||
return final_ctx
|
||||
|
||||
def _create_execution(self, wf_def, wf_ex_id, input_dict, desc, params):
|
||||
self.wf_ex = db_api.create_workflow_execution({
|
||||
@ -307,16 +310,12 @@ class Workflow(object):
|
||||
|
||||
self.wf_ex.input = input_dict or {}
|
||||
|
||||
env = _get_environment(params)
|
||||
|
||||
if env:
|
||||
params['env'] = env
|
||||
params['env'] = _get_environment(params)
|
||||
|
||||
self.wf_ex.params = params
|
||||
|
||||
data_flow.add_openstack_data_to_context(self.wf_ex)
|
||||
data_flow.add_execution_to_context(self.wf_ex)
|
||||
data_flow.add_environment_to_context(self.wf_ex)
|
||||
data_flow.add_workflow_variables_to_context(self.wf_ex, self.wf_spec)
|
||||
|
||||
spec_parser.cache_workflow_spec_by_execution_id(
|
||||
@ -545,10 +544,12 @@ class Workflow(object):
|
||||
def _get_environment(params):
|
||||
env = params.get('env', {})
|
||||
|
||||
if isinstance(env, dict):
|
||||
return env
|
||||
if not env:
|
||||
return {}
|
||||
|
||||
if isinstance(env, six.string_types):
|
||||
if isinstance(env, dict):
|
||||
env_dict = env
|
||||
elif isinstance(env, six.string_types):
|
||||
env_db = db_api.load_environment(env)
|
||||
|
||||
if not env_db:
|
||||
@ -556,12 +557,18 @@ def _get_environment(params):
|
||||
'Environment is not found: %s' % env
|
||||
)
|
||||
|
||||
return env_db.variables
|
||||
env_dict = env_db.variables
|
||||
else:
|
||||
raise exc.InputException(
|
||||
'Unexpected value type for environment [env=%s, type=%s]'
|
||||
% (env, type(env))
|
||||
)
|
||||
|
||||
raise exc.InputException(
|
||||
'Unexpected value type for environment [env=%s, type=%s]'
|
||||
% (env, type(env))
|
||||
)
|
||||
if ('evaluate_env' in params and
|
||||
not params['evaluate_env']):
|
||||
return env_dict
|
||||
else:
|
||||
return expr.evaluate_recursively(env_dict, {'__env': env_dict})
|
||||
|
||||
|
||||
def _build_fail_info_message(wf_ctrl, wf_ex):
|
||||
|
@ -16,7 +16,6 @@ from mistral.db.v2 import api as db_api
|
||||
from mistral import exceptions as exc
|
||||
from mistral.lang import parser as spec_parser
|
||||
from mistral import utils
|
||||
from mistral.workflow import data_flow
|
||||
from mistral.workflow import states
|
||||
from oslo_log import log as logging
|
||||
|
||||
@ -136,8 +135,6 @@ def update_workflow_execution_env(wf_ex, env):
|
||||
|
||||
wf_ex.params['env'] = utils.merge_dicts(wf_ex.params['env'], env)
|
||||
|
||||
data_flow.add_environment_to_context(wf_ex)
|
||||
|
||||
return wf_ex
|
||||
|
||||
|
||||
|
@ -42,8 +42,8 @@ EXPECTED_ENV_AUTH = ('librarian', 'password123')
|
||||
WORKFLOW1 = """
|
||||
---
|
||||
version: "2.0"
|
||||
|
||||
wf1:
|
||||
type: direct
|
||||
tasks:
|
||||
task1:
|
||||
action: std.http url="https://api.library.org/books"
|
||||
@ -54,8 +54,8 @@ wf1:
|
||||
WORKFLOW2 = """
|
||||
---
|
||||
version: "2.0"
|
||||
|
||||
wf2:
|
||||
type: direct
|
||||
tasks:
|
||||
task1:
|
||||
action: std.http url="https://api.library.org/books" timeout=60
|
||||
@ -66,10 +66,11 @@ wf2:
|
||||
WORKFLOW1_WITH_ITEMS = """
|
||||
---
|
||||
version: "2.0"
|
||||
|
||||
wf1_with_items:
|
||||
type: direct
|
||||
input:
|
||||
- links
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
with-items: link in <% $.links %>
|
||||
@ -81,10 +82,11 @@ wf1_with_items:
|
||||
WORKFLOW2_WITH_ITEMS = """
|
||||
---
|
||||
version: "2.0"
|
||||
|
||||
wf2_with_items:
|
||||
type: direct
|
||||
input:
|
||||
- links
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
with-items: link in <% $.links %>
|
||||
@ -95,7 +97,6 @@ wf2_with_items:
|
||||
|
||||
|
||||
class ActionDefaultTest(base.EngineTestCase):
|
||||
|
||||
@mock.patch.object(
|
||||
requests, 'request',
|
||||
mock.MagicMock(return_value=test_base.FakeHTTPResponse('', 200, 'OK')))
|
||||
@ -116,11 +117,18 @@ class ActionDefaultTest(base.EngineTestCase):
|
||||
self._assert_single_item(wf_ex.task_executions, name='task1')
|
||||
|
||||
requests.request.assert_called_with(
|
||||
'GET', 'https://api.library.org/books',
|
||||
params=None, data=None, headers=None, cookies=None,
|
||||
allow_redirects=None, proxies=None, verify=None,
|
||||
'GET',
|
||||
'https://api.library.org/books',
|
||||
params=None,
|
||||
data=None,
|
||||
headers=None,
|
||||
cookies=None,
|
||||
allow_redirects=None,
|
||||
proxies=None,
|
||||
verify=None,
|
||||
auth=EXPECTED_ENV_AUTH,
|
||||
timeout=ENV['__actions']['std.http']['timeout'])
|
||||
timeout=ENV['__actions']['std.http']['timeout']
|
||||
)
|
||||
|
||||
@mock.patch.object(
|
||||
requests, 'request',
|
||||
|
@ -338,7 +338,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
||||
self.assertIsNotNone(wf_ex.state_info)
|
||||
self.assertEqual(3, len(task_execs))
|
||||
self.assertDictEqual(env, wf_ex.params['env'])
|
||||
self.assertDictEqual(env, wf_ex.context['__env'])
|
||||
|
||||
task_10_ex = self._assert_single_item(task_execs, name='t10')
|
||||
task_21_ex = self._assert_single_item(task_execs, name='t21')
|
||||
@ -362,7 +361,6 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
self.assertIsNone(wf_ex.state_info)
|
||||
self.assertDictEqual(updated_env, wf_ex.params['env'])
|
||||
self.assertDictEqual(updated_env, wf_ex.context['__env'])
|
||||
|
||||
# Await t30 success.
|
||||
self.await_task_success(task_30_ex.id)
|
||||
|
@ -14,6 +14,7 @@
|
||||
|
||||
import mock
|
||||
from oslo_config import cfg
|
||||
import testtools
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.executors import default_executor as d_exe
|
||||
@ -113,7 +114,6 @@ class EnvironmentTest(base.EngineTestCase):
|
||||
# Execution of 'wf2'.
|
||||
self.assertIsNotNone(wf2_ex)
|
||||
self.assertDictEqual({}, wf2_ex.input)
|
||||
self.assertDictContainsSubset({'env': env}, wf2_ex.params)
|
||||
|
||||
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
|
||||
|
||||
@ -126,19 +126,12 @@ class EnvironmentTest(base.EngineTestCase):
|
||||
wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2')
|
||||
wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1')
|
||||
|
||||
expected_start_params = {
|
||||
'task_name': 'task2',
|
||||
'task_execution_id': wf1_ex.task_execution_id,
|
||||
'env': env
|
||||
}
|
||||
|
||||
expected_wf1_input = {
|
||||
'param1': 'Bonnie',
|
||||
'param2': 'Clyde'
|
||||
}
|
||||
|
||||
self.assertIsNotNone(wf1_ex.task_execution_id)
|
||||
self.assertDictContainsSubset(expected_start_params, wf1_ex.params)
|
||||
self.assertDictEqual(wf1_ex.input, expected_wf1_input)
|
||||
|
||||
# Wait till workflow 'wf1' is completed.
|
||||
@ -364,3 +357,97 @@ class EnvironmentTest(base.EngineTestCase):
|
||||
},
|
||||
sub_wf_ex.output
|
||||
)
|
||||
|
||||
def test_env_not_copied_to_context(self):
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
||||
wf:
|
||||
tasks:
|
||||
task1:
|
||||
action: std.echo output="<% env().param1 %>"
|
||||
publish:
|
||||
result: <% task().result %>
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
env = {
|
||||
'param1': 'val1',
|
||||
'param2': 'val2',
|
||||
'param3': 'val3'
|
||||
}
|
||||
|
||||
wf_ex = self.engine.start_workflow('wf', env=env)
|
||||
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
t = self._assert_single_item(
|
||||
wf_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
|
||||
self.assertDictEqual({'result': 'val1'}, t.published)
|
||||
|
||||
self.assertNotIn('__env', wf_ex.context)
|
||||
|
||||
@testtools.skip("Not implemented yet")
|
||||
def test_subworkflow_env_no_duplicate(self):
|
||||
wf_text = """---
|
||||
version: '2.0'
|
||||
|
||||
parent_wf:
|
||||
tasks:
|
||||
task1:
|
||||
workflow: sub_wf
|
||||
|
||||
sub_wf:
|
||||
output:
|
||||
result: <% $.result %>
|
||||
|
||||
tasks:
|
||||
task1:
|
||||
action: std.noop
|
||||
publish:
|
||||
result: <% env().param1 %>
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_text)
|
||||
|
||||
env = {
|
||||
'param1': 'val1',
|
||||
'param2': 'val2',
|
||||
'param3': 'val3'
|
||||
}
|
||||
|
||||
parent_wf_ex = self.engine.start_workflow('parent_wf', env=env)
|
||||
|
||||
self.await_workflow_success(parent_wf_ex.id)
|
||||
|
||||
with db_api.transaction():
|
||||
parent_wf_ex = db_api.get_workflow_execution(parent_wf_ex.id)
|
||||
|
||||
t = self._assert_single_item(
|
||||
parent_wf_ex.task_executions,
|
||||
name='task1'
|
||||
)
|
||||
|
||||
sub_wf_ex = db_api.get_workflow_executions(
|
||||
task_execution_id=t.id
|
||||
)[0]
|
||||
|
||||
self.assertDictEqual(
|
||||
{
|
||||
"result": "val1"
|
||||
},
|
||||
sub_wf_ex.output
|
||||
)
|
||||
|
||||
# The environment of the subworkflow must be empty.
|
||||
# To evaluate expressions it should be taken from the
|
||||
# parent workflow execution.
|
||||
self.assertIsNone(sub_wf_ex.params['env'])
|
||||
self.assertIsNone(sub_wf_ex.context['__env'])
|
||||
|
@ -374,7 +374,8 @@ class PoliciesTest(base.EngineTestCase):
|
||||
wf_ex = models.WorkflowExecution(
|
||||
id='1-2-3-4',
|
||||
context={},
|
||||
input={}
|
||||
input={},
|
||||
params={}
|
||||
)
|
||||
|
||||
task_ex = models.TaskExecution(in_context={'int_var': 5})
|
||||
|
@ -68,7 +68,10 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
|
||||
wb_service.create_workbook_v2(WORKBOOK)
|
||||
|
||||
def test_start_task1(self):
|
||||
wf_input = {'param1': 'a', 'param2': 'b'}
|
||||
wf_input = {
|
||||
'param1': 'a',
|
||||
'param2': 'b'
|
||||
}
|
||||
|
||||
wf_ex = self.engine.start_workflow(
|
||||
'my_wb.wf1',
|
||||
@ -80,7 +83,11 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
|
||||
self.assertIsNotNone(wf_ex)
|
||||
self.assertDictEqual(wf_input, wf_ex.input)
|
||||
self.assertDictEqual(
|
||||
{'task_name': 'task1', 'namespace': ''},
|
||||
{
|
||||
'task_name': 'task1',
|
||||
'namespace': '',
|
||||
'env': {}
|
||||
},
|
||||
wf_ex.params
|
||||
)
|
||||
|
||||
@ -104,7 +111,10 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
|
||||
self.assertDictEqual({'result1': 'a'}, task_ex.published)
|
||||
|
||||
def test_start_task2(self):
|
||||
wf_input = {'param1': 'a', 'param2': 'b'}
|
||||
wf_input = {
|
||||
'param1': 'a',
|
||||
'param2': 'b'
|
||||
}
|
||||
|
||||
wf_ex = self.engine.start_workflow(
|
||||
'my_wb.wf1',
|
||||
@ -116,7 +126,11 @@ class ReverseWorkflowEngineTest(base.EngineTestCase):
|
||||
self.assertIsNotNone(wf_ex)
|
||||
self.assertDictEqual(wf_input, wf_ex.input)
|
||||
self.assertDictEqual(
|
||||
{'task_name': 'task2', 'namespace': ''},
|
||||
{
|
||||
'task_name': 'task2',
|
||||
'namespace': '',
|
||||
'env': {}
|
||||
},
|
||||
wf_ex.params
|
||||
)
|
||||
|
||||
|
@ -215,7 +215,6 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
|
||||
self.assertIsNotNone(wf_ex.state_info)
|
||||
self.assertEqual(2, len(task_execs))
|
||||
self.assertDictEqual(env, wf_ex.params['env'])
|
||||
self.assertDictEqual(env, wf_ex.context['__env'])
|
||||
|
||||
task_1_ex = self._assert_single_item(task_execs, name='t1')
|
||||
task_2_ex = self._assert_single_item(task_execs, name='t2')
|
||||
@ -238,7 +237,6 @@ class ReverseWorkflowRerunTest(base.EngineTestCase):
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
self.assertIsNone(wf_ex.state_info)
|
||||
self.assertDictEqual(updated_env, wf_ex.params['env'])
|
||||
self.assertDictEqual(updated_env, wf_ex.context['__env'])
|
||||
|
||||
# Wait for the workflow to succeed.
|
||||
self.await_workflow_success(wf_ex.id)
|
||||
|
@ -233,7 +233,7 @@ class SubworkflowsTest(base.EngineTestCase):
|
||||
self.assertEqual(project_id, wf2_ex.project_id)
|
||||
self.assertIsNotNone(wf2_ex)
|
||||
self.assertDictEqual({}, wf2_ex.input)
|
||||
self.assertDictEqual({'namespace': ''}, wf2_ex.params)
|
||||
self.assertDictEqual({'namespace': '', 'env': {}}, wf2_ex.params)
|
||||
|
||||
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
|
||||
|
||||
|
@ -427,7 +427,6 @@ class WorkflowResumeTest(base.EngineTestCase):
|
||||
self.assertEqual(states.PAUSED, wf_ex.state)
|
||||
self.assertEqual(2, len(task_execs))
|
||||
self.assertDictEqual(env, wf_ex.params['env'])
|
||||
self.assertDictEqual(env, wf_ex.context['__env'])
|
||||
self.assertEqual(states.SUCCESS, task_1_ex.state)
|
||||
self.assertEqual(states.IDLE, task_2_ex.state)
|
||||
|
||||
@ -448,7 +447,6 @@ class WorkflowResumeTest(base.EngineTestCase):
|
||||
task_execs = wf_ex.task_executions
|
||||
|
||||
self.assertDictEqual(updated_env, wf_ex.params['env'])
|
||||
self.assertDictEqual(updated_env, wf_ex.context['__env'])
|
||||
self.assertEqual(3, len(task_execs))
|
||||
|
||||
# Check result of task2.
|
||||
|
@ -377,7 +377,11 @@ class YAQLFunctionsEngineTest(engine_test_base.EngineTestCase):
|
||||
)
|
||||
|
||||
self.assertDictEqual(
|
||||
{'param1': 'blablabla', 'namespace': ''},
|
||||
{
|
||||
'param1': 'blablabla',
|
||||
'namespace': '',
|
||||
'env': {}
|
||||
},
|
||||
execution['params']
|
||||
)
|
||||
|
||||
|
@ -260,7 +260,6 @@ class WorkflowServiceTest(base.DbTestCase):
|
||||
)
|
||||
|
||||
self.assertDictEqual(update_env, updated.params['env'])
|
||||
self.assertDictEqual(update_env, updated.context['__env'])
|
||||
|
||||
fetched = db_api.get_workflow_execution(created.id)
|
||||
|
||||
|
@ -38,6 +38,7 @@ class DirectWorkflowControllerTest(base.DbTestCase):
|
||||
state=states.RUNNING,
|
||||
workflow_id=wfs[0].id,
|
||||
input={},
|
||||
params={},
|
||||
context={}
|
||||
)
|
||||
|
||||
|
@ -13,8 +13,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import copy
|
||||
|
||||
from oslo_config import cfg
|
||||
from oslo_log import log as logging
|
||||
|
||||
@ -193,6 +191,7 @@ def publish_variables(task_ex, task_spec):
|
||||
expr_ctx = ContextView(
|
||||
get_current_task_dict(task_ex),
|
||||
task_ex.in_context,
|
||||
get_workflow_environment_dict(wf_ex),
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
)
|
||||
@ -264,7 +263,12 @@ def evaluate_workflow_output(wf_ex, wf_output, ctx):
|
||||
"""
|
||||
|
||||
# Evaluate workflow 'output' clause using the final workflow context.
|
||||
ctx_view = ContextView(ctx, wf_ex.context, wf_ex.input)
|
||||
ctx_view = ContextView(
|
||||
ctx,
|
||||
get_workflow_environment_dict(wf_ex),
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
)
|
||||
|
||||
output = expr.evaluate_recursively(wf_output, ctx_view)
|
||||
|
||||
@ -298,30 +302,16 @@ def add_execution_to_context(wf_ex):
|
||||
wf_ex.context['__execution'] = {'id': wf_ex.id}
|
||||
|
||||
|
||||
def add_environment_to_context(wf_ex):
|
||||
# TODO(rakhmerov): This is redundant, we can always get env from WF params
|
||||
wf_ex.context = wf_ex.context or {}
|
||||
|
||||
# If env variables are provided, add an evaluated copy into the context.
|
||||
if 'env' in wf_ex.params:
|
||||
env = copy.deepcopy(wf_ex.params['env'])
|
||||
|
||||
if ('evaluate_env' in wf_ex.params and
|
||||
not wf_ex.params['evaluate_env']):
|
||||
wf_ex.context['__env'] = env
|
||||
else:
|
||||
wf_ex.context['__env'] = expr.evaluate_recursively(
|
||||
env,
|
||||
{'__env': env}
|
||||
)
|
||||
|
||||
|
||||
def add_workflow_variables_to_context(wf_ex, wf_spec):
|
||||
wf_ex.context = wf_ex.context or {}
|
||||
|
||||
# The context for calculating workflow variables is workflow input
|
||||
# and other data already stored in workflow initial context.
|
||||
ctx_view = ContextView(wf_ex.context, wf_ex.input)
|
||||
ctx_view = ContextView(
|
||||
get_workflow_environment_dict(wf_ex),
|
||||
wf_ex.context,
|
||||
wf_ex.input
|
||||
)
|
||||
|
||||
wf_vars = expr.evaluate_recursively(wf_spec.get_vars(), ctx_view)
|
||||
|
||||
@ -335,3 +325,9 @@ def evaluate_object_fields(obj, context):
|
||||
|
||||
for k, v in evaluated_fields.items():
|
||||
setattr(obj, k, v)
|
||||
|
||||
|
||||
def get_workflow_environment_dict(wf_ex):
|
||||
env_dict = wf_ex.params['env'] if wf_ex and 'env' in wf_ex.params else {}
|
||||
|
||||
return {'__env': env_dict}
|
||||
|
@ -198,6 +198,7 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
for t_ex in lookup_utils.find_error_task_executions(self.wf_ex.id):
|
||||
ctx_view = data_flow.ContextView(
|
||||
data_flow.evaluate_task_outbound_context(t_ex),
|
||||
data_flow.get_workflow_environment_dict(self.wf_ex),
|
||||
self.wf_ex.context,
|
||||
self.wf_ex.input
|
||||
)
|
||||
@ -252,6 +253,7 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
ctx_view = data_flow.ContextView(
|
||||
data_flow.get_current_task_dict(task_ex),
|
||||
ctx or data_flow.evaluate_task_outbound_context(task_ex),
|
||||
data_flow.get_workflow_environment_dict(self.wf_ex),
|
||||
self.wf_ex.context,
|
||||
self.wf_ex.input
|
||||
)
|
||||
|
Loading…
x
Reference in New Issue
Block a user