Getting rid of task result proxies in workflow context

* Task result proxies were needed to be able to access
  task result as $.task_name. But this was deprecated in Mitaka
  in favor of using task() YAQL function. So it's time to get
  rid of complicated internal machinery before making further
  improvements

Change-Id: I9b8c1c9ac6e9561c6aa66151011ae2f7d906179a
Implements: blueprint mistral-remove-task-result-proxies
This commit is contained in:
Renat Akhmerov 2016-04-05 16:32:17 +07:00
parent ad07ba0d68
commit 4a88302f49
20 changed files with 131 additions and 181 deletions

View File

@ -287,8 +287,6 @@ def _get_input_dictionaries(wf_spec, task_ex, task_spec, ctx):
:return the list of tuples containing indexes
and the corresponding input dict.
"""
# TODO(rakhmerov): Think how to get rid of this.
ctx = data_flow.extract_task_result_proxies_to_context(ctx)
if not task_spec.get_with_items():
input_dict = _get_workflow_or_action_input(

View File

@ -9,7 +9,7 @@ workflows:
catalog:
action: keystone.service_catalog_get_data
publish:
result: <% $.catalog %>
result: <% task(catalog).result %>
nova:
type: direct
@ -17,7 +17,7 @@ workflows:
networks_list:
action: nova.networks_list
publish:
result: <% $.networks_list %>
result: <% task(networks_list).result %>
glance:
type: direct
@ -25,7 +25,7 @@ workflows:
images_list:
action: glance.images_list
publish:
result: <% $.images_list %>
result: <% task(images_list).result %>
heat:
type: direct
@ -33,7 +33,7 @@ workflows:
stacks_list:
action: heat.stacks_list
publish:
result: <% $.stacks_list %>
result: <% task(stacks_list).result %>
neutron:
type: direct
@ -41,7 +41,7 @@ workflows:
list_subnets:
action: neutron.list_subnets
publish:
result: <% $.list_subnets %>
result: <% task(list_subnets).result %>
cinder:
type: direct
@ -49,5 +49,5 @@ workflows:
volumes_list:
action: cinder.volumes_list
publish:
result: <% $.volumes_list %>
result: <% task(volumes_list).result %>

View File

@ -10,4 +10,4 @@ workflows:
hello:
action: std.echo output="Hello"
publish:
result: <% $.hello %>
result: <% task(hello).result %>

View File

@ -9,7 +9,7 @@ wf:
action: std.echo output="Hello"
wait-before: 1
publish:
result: <% $.hello %>
result: <% task(hello).result %>
wf1:
type: reverse
@ -20,7 +20,7 @@ wf1:
addressee:
action: std.echo output="John"
publish:
name: <% $.addressee %>
name: <% task(addressee).result %>
goodbye:
action: std.echo output="<% $.farewell %>, <% $.name %>"

View File

@ -45,7 +45,7 @@ workflows:
input:
url: https://wiki.openstack.org/wiki/mistral
publish:
result: <% $.task1 %>
result: <% task(task1).result %>
"""

View File

@ -49,13 +49,13 @@ workflows:
- str2
output:
workflow_result: <% $.result %> # Access to execution context variables
concat_task_result: <% $.concat %> # Access to the same but via task name
concat_task_result: <% task(concat).result %> # Same but via task name
tasks:
concat:
action: concat_twice s1=<% $.str1 %> s2=<% $.str2 %>
publish:
result: <% $.concat %>
result: <% task(concat).result %>
wf2:
type: direct
@ -64,13 +64,13 @@ workflows:
- str2
output:
workflow_result: <% $.result %> # Access to execution context variables
concat_task_result: <% $.concat %> # Access to the same but via task name
concat_task_result: <% task(concat).result %> # Same but via task name
tasks:
concat:
action: concat_twice s2=<% $.str2 %>
publish:
result: <% $.concat %>
result: <% task(concat).result %>
wf3:
type: direct

View File

@ -43,14 +43,14 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
task1:
action: std.echo output="Hi"
publish:
hi: <% $.task1 %>
hi: <% task(task1).result %>
on-success:
- task2
task2:
action: std.echo output="Morpheus"
publish:
to: <% $.task2 %>
to: <% task(task2).result %>
on-success:
- task3
@ -96,7 +96,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
task1:
action: std.echo output="Hi"
publish:
hi: <% $.task1 %>
hi: <% task(task1).result %>
progress: "completed task1"
on-success:
- notify
@ -105,7 +105,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
task2:
action: std.echo output="Morpheus"
publish:
to: <% $.task2 %>
to: <% task(task2).result %>
progress: "completed task2"
on-success:
- notify
@ -121,7 +121,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
notify:
action: std.echo output=<% $.progress %>
publish:
progress: <% $.notify %>
progress: <% task(notify).result %>
"""
wf_service.create_workflows(linear_with_branches_wf)
@ -180,12 +180,12 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
task1:
action: std.echo output=1
publish:
var1: <% $.task1 %>
var1: <% task(task1).result %>
task2:
action: std.echo output=2
publish:
var2: <% $.task2 %>
var2: <% task(task2).result %>
"""
wf_service.create_workflows(parallel_tasks_wf)
@ -318,21 +318,21 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
task1:
action: std.echo output="Hi"
publish:
greeting: <% $.task1 %>
greeting: <% task(task1).result %>
on-success:
- task2
task2:
action: std.echo output="Yo"
publish:
greeting: <% $.task2 %>
greeting: <% task(task2).result %>
on-success:
- task3
task3:
action: std.echo output="Morpheus"
publish:
to: <% $.task3 %>
to: <% task(task3).result %>
on-success:
- task4
@ -400,7 +400,9 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
task4:
join: all
publish:
result: "<% $.task1 %>, <% $.task21 %>! Your <% $.task22 %>."
result: >
<% task(task1).result %>, <% task(task21).result %>!
Your <% task(task22).result %>.
"""
wf_service.create_workflows(linear_wf)
@ -416,7 +418,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
task4 = self._assert_single_item(tasks, name='task4')
self.assertDictEqual(
{'result': 'Hi, Morpheus! Your Neo.'},
{'result': 'Hi, Morpheus! Your Neo.\n'},
task4.published
)
@ -431,7 +433,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
task1:
action: std.echo output=["Hi", "John Doe!"]
publish:
hi: <% $.task1 %>
hi: <% task(task1).result %>
keep-result: false
"""
@ -446,6 +448,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
task1 = self._assert_single_item(tasks, name='task1')
result = data_flow.get_task_execution_result(task1)
@ -471,7 +474,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
with-items: i in <% list() %>
action: std.echo output= "Task 1.<% $.i %>"
publish:
result: <% $.task1 %>
result: <% task(task1).result %>
"""
wf_service.create_workflows(wf)
@ -505,14 +508,12 @@ class DataFlowTest(test_base.BaseTest):
}
)
action_exs = []
action_exs.append(models.ActionExecution(
action_exs = [models.ActionExecution(
name='my_action',
output={'result': 1},
accepted=True,
runtime_context={'with_items_index': 0}
))
)]
with mock.patch.object(db_api, 'get_action_executions',
return_value=action_exs):

View File

@ -53,7 +53,7 @@ workflows:
task1:
action: std.echo output=<% $.param1 %>
publish:
var: <% $.task1 %>
var: <% task(task1).result %>
task2:
action: std.echo output=<% $.param2 %>
@ -364,7 +364,6 @@ class DefaultEngineTest(base.DbTestCase):
self.assertEqual(states.SUCCESS, task2_action_ex.state)
# Data Flow properties.
self.assertIn('__tasks', task2_ex.in_context)
self.assertIn('__execution', task1_ex.in_context)
self.assertDictEqual({'output': 'Hi'}, task2_action_ex.input)
self.assertDictEqual({}, task2_ex.published)

View File

@ -31,10 +31,13 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
SIMPLE_WORKBOOK = """
---
version: '2.0'
name: wb1
workflows:
wf1:
type: direct
tasks:
t1:
action: std.echo output="Task 1"
@ -51,10 +54,13 @@ workflows:
SIMPLE_WORKBOOK_DIFF_ENV_VAR = """
---
version: '2.0'
name: wb1
workflows:
wf1:
type: direct
tasks:
t10:
action: std.echo output="Task 10"
@ -78,16 +84,19 @@ workflows:
WITH_ITEMS_WORKBOOK = """
---
version: '2.0'
name: wb3
workflows:
wf1:
type: direct
tasks:
t1:
with-items: i in <% list(range(0, 3)) %>
action: std.echo output="Task 1.<% $.i %>"
publish:
v1: <% $.t1 %>
v1: <% task(t1).result %>
on-success:
- t2
t2:
@ -97,16 +106,19 @@ workflows:
WITH_ITEMS_WORKBOOK_DIFF_ENV_VAR = """
---
version: '2.0'
name: wb3
workflows:
wf1:
type: direct
tasks:
t1:
with-items: i in <% list(range(0, 3)) %>
action: std.echo output="Task 1.<% $.i %> [<% env().var1 %>]"
publish:
v1: <% $.t1 %>
v1: <% task(t1).result %>
on-success:
- t2
t2:
@ -116,17 +128,20 @@ workflows:
WITH_ITEMS_WORKBOOK_CONCURRENCY = """
---
version: '2.0'
name: wb3
workflows:
wf1:
type: direct
tasks:
t1:
with-items: i in <% list(range(0, 4)) %>
action: std.echo output="Task 1.<% $.i %>"
concurrency: 2
publish:
v1: <% $.t1 %>
v1: <% task(t1).result %>
on-success:
- t2
t2:
@ -136,10 +151,13 @@ workflows:
JOIN_WORKBOOK = """
---
version: '2.0'
name: wb1
workflows:
wf1:
type: direct
tasks:
t1:
action: std.echo output="Task 1"
@ -156,10 +174,13 @@ workflows:
SUBFLOW_WORKBOOK = """
version: '2.0'
name: wb1
workflows:
wf1:
type: direct
tasks:
t1:
action: std.echo output="Task 1"
@ -171,10 +192,13 @@ workflows:
- t3
t3:
action: std.echo output="Task 3"
wf2:
type: direct
output:
result: <% $.wf2_t1 %>
result: <% task(wf2_t1).result %>
tasks:
wf2_t1:
action: std.echo output="Task 2"

View File

@ -51,14 +51,14 @@ workflows:
action: std.echo output=<% $.param1 %>
target: <% env().var1 %>
publish:
result1: <% $.task1 %>
result1: <% task(task1).result %>
task2:
requires: [task1]
action: std.echo output="'<% $.result1 %> & <% $.param2 %>'"
target: <% env().var1 %>
publish:
final_result: <% $.task2 %>
final_result: <% task(task2).result %>
wf2:
output:
@ -72,7 +72,8 @@ workflows:
param2: <% env().var3 %>
task_name: task2
publish:
slogan: "<% $.task1.final_result %> is a cool <% env().var4 %>!"
slogan: >
<% task(task1).result.final_result %> is a cool <% env().var4 %>!
"""
@ -92,9 +93,9 @@ def _run_at_target(action_ex_id, action_class_str, attributes,
MOCK_RUN_AT_TARGET = mock.MagicMock(side_effect=_run_at_target)
class SubworkflowsTest(base.EngineTestCase):
class EnvironmentTest(base.EngineTestCase):
def setUp(self):
super(SubworkflowsTest, self).setUp()
super(EnvironmentTest, self).setUp()
wb_service.create_workbook_v2(WORKBOOK)
@ -151,7 +152,7 @@ class SubworkflowsTest(base.EngineTestCase):
wf2_ex = db_api.get_workflow_execution(wf2_ex.id)
expected_wf2_output = {'slogan': "'Bonnie & Clyde' is a cool movie!"}
expected_wf2_output = {'slogan': "'Bonnie & Clyde' is a cool movie!\n"}
self.assertDictEqual(wf2_ex.output, expected_wf2_output)

View File

@ -45,10 +45,10 @@ wf:
success_result: <% $.success_result %>
error_result: <% $.error_result %>
publish:
p_var: <% $.task1 %>
p_var: <% task(task1).result %>
on-error:
- task2: <% $.task1 = 2 %>
- task3: <% $.task1 = 3 %>
- task2: <% task(task1).result = 2 %>
- task3: <% task(task1).result = 3 %>
task2:
action: std.noop

View File

@ -54,7 +54,7 @@ workflows:
context: <% $ %>
publish:
result: <% $.task1 %>
result: <% task(task1).result %>
"""

View File

@ -41,14 +41,14 @@ class JoinEngineTest(base.EngineTestCase):
task1:
action: std.echo output=1
publish:
result1: <% $.task1 %>
result1: <% task(task1).result %>
on-complete:
- task3
task2:
action: std.echo output=2
publish:
result2: <% $.task2 %>
result2: <% task(task2).result %>
on-complete:
- task3
@ -56,7 +56,7 @@ class JoinEngineTest(base.EngineTestCase):
join: all
action: std.echo output="<% $.result1 %>,<% $.result2 %>"
publish:
result3: <% $.task3 %>
result3: <% task(task3).result %>
"""
wf_service.create_workflows(wf_full_join)
@ -95,7 +95,7 @@ class JoinEngineTest(base.EngineTestCase):
task1:
action: std.echo output=1
publish:
result1: <% $.task1 %>
result1: <% task(task1).result %>
on-complete:
- task3
@ -108,7 +108,7 @@ class JoinEngineTest(base.EngineTestCase):
join: all
action: std.echo output="<% $.result1 %>-<% $.result1 %>"
publish:
result3: <% $.task3 %>
result3: <% task(task3).result %>
"""
wf_service.create_workflows(wf_full_join_with_errors)
@ -147,14 +147,14 @@ class JoinEngineTest(base.EngineTestCase):
task1:
action: std.echo output=1
publish:
result1: <% $.task1 %>
result1: <% task(task1).result %>
on-complete:
- task3
task2:
action: std.echo output=2
publish:
result2: <% $.task2 %>
result2: <% task(task2).result %>
on-complete:
- task3: <% $.result2 = 11111 %>
- task4: <% $.result2 = 2 %>
@ -163,12 +163,12 @@ class JoinEngineTest(base.EngineTestCase):
join: all
action: std.echo output="<% $.result1 %>-<% $.result1 %>"
publish:
result3: <% $.task3 %>
result3: <% task(task3).result %>
task4:
action: std.echo output=4
publish:
result4: <% $.task4 %>
result4: <% task(task4).result %>
"""
wf_service.create_workflows(wf_full_join_with_conditions)
@ -213,14 +213,14 @@ class JoinEngineTest(base.EngineTestCase):
task1:
action: std.echo output=1
publish:
result1: <% $.task1 %>
result1: <% task(task1).result %>
on-complete:
- task4
task2:
action: std.echo output=2
publish:
result2: <% $.task2 %>
result2: <% task(task2).result %>
on-complete:
- task4
@ -240,7 +240,7 @@ class JoinEngineTest(base.EngineTestCase):
join: 2
action: std.echo output="<% $.result1 %>,<% $.result2 %>"
publish:
result4: <% $.task4 %>
result4: <% task(task4).result %>
"""
wf_service.create_workflows(wf_partial_join)
@ -320,7 +320,7 @@ class JoinEngineTest(base.EngineTestCase):
<% result1 in $.keys() %>,<% result2 in $.keys() %>,
<% result3 in $.keys() %>,<% result4 in $.keys() %>
publish:
result5: <% $.task5 %>
result5: <% task(task5).result %>
"""
wf_service.create_workflows(wf_partial_join_triggers_once)
@ -391,7 +391,7 @@ class JoinEngineTest(base.EngineTestCase):
<% result1 in $.keys() %>,<% result2 in $.keys() %>,
<% result3 in $.keys() %>
publish:
result4: <% $.task4 %>
result4: <% task(task4).result %>
"""
wf_service.create_workflows(wf_discriminator)
@ -428,6 +428,7 @@ class JoinEngineTest(base.EngineTestCase):
main:
type: direct
output:
var1: <% $.var1 %>
var2: <% $.var2 %>
@ -455,6 +456,7 @@ class JoinEngineTest(base.EngineTestCase):
var2: true
on-success:
- done
done:
join: all
publish:
@ -462,6 +464,7 @@ class JoinEngineTest(base.EngineTestCase):
work:
type: direct
tasks:
do:
action: std.echo output="Doing..."

View File

@ -41,14 +41,14 @@ wf:
task1:
action: std.echo output=<% $.num1 %>
publish:
result1: <% $.task1 %>
result1: <% task(task1).result %>
on-complete:
- task3
task2:
action: std.echo output=<% $.num2 %>
publish:
result2: <% $.task2 %>
result2: <% task(task2).result %>
on-complete:
- task3
@ -64,12 +64,12 @@ wf:
task4:
action: std.echo output=4
publish:
result: <% $.task4 %>
result: <% task(task4).result %>
task5:
action: std.echo output=5
publish:
result: <% $.task5 %>
result: <% task(task5).result %>
"""

View File

@ -647,7 +647,7 @@ class PoliciesTest(base.EngineTestCase):
retry:
count: 4
delay: 1
continue-on: <% $.task1 < 3 %>
continue-on: <% task(task1).result < 3 %>
"""
wb_service.create_workbook_v2(retry_wb)
@ -684,7 +684,7 @@ class PoliciesTest(base.EngineTestCase):
retry:
count: 4
delay: 1
continue-on: <% $.task1 <= 3 %>
continue-on: <% task(task1).result <= 3 %>
"""
wb_service.create_workbook_v2(retry_wb)
@ -799,7 +799,8 @@ class PoliciesTest(base.EngineTestCase):
workflows:
wf1:
output:
result: <% $.task1 %>
result: <% task(task1).result %>
tasks:
task1:
action: std.echo output="mocked result"

View File

@ -48,7 +48,7 @@ wf:
task1:
action: test.block
publish:
result: <% $.task1 %>
result: <% task(task1).result %>
"""
WF_SHORT_ACTION = """

View File

@ -44,12 +44,12 @@ workflows:
task1:
action: std.echo output=<% $.param1 %>
publish:
result1: <% $.task1 %>
result1: <% task(task1).result %>
task2:
action: std.echo output="<% $.result1 %> & <% $.param2 %>"
publish:
result2: <% $.task2 %>
result2: <% task(task2).result %>
requires: [task1]
task3:

View File

@ -39,6 +39,7 @@ name: wb1
workflows:
wf1:
type: reverse
input:
- param1
- param2
@ -49,16 +50,17 @@ workflows:
task1:
action: std.echo output=<% $.param1 %>
publish:
result1: <% $.task1 %>
result1: <% task(task1).result %>
task2:
action: std.echo output="'<% $.param1 %> & <% $.param2 %>'"
publish:
final_result: <% $.task2 %>
final_result: <% task(task2).result %>
requires: [task1]
wf2:
type: direct
output:
slogan: <% $.slogan %>
@ -66,7 +68,7 @@ workflows:
task1:
workflow: wf1 param1='Bonnie' param2='Clyde' task_name='task2'
publish:
slogan: "<% $.task1.final_result %> is a cool movie!"
slogan: "<% task(task1).result.final_result %> is a cool movie!"
"""
WB2 = """
@ -78,14 +80,17 @@ name: wb2
workflows:
wf1:
type: direct
tasks:
task1:
workflow: wf2
wf2:
type: direct
output:
var1: <% $.does_not_exist %>
tasks:
task1:
action: std.noop

View File

@ -51,7 +51,7 @@ workflows:
with-items: name_info in <% $.names_info %>
action: std.echo output=<% $.name_info.name %>
publish:
result: <% $.task1[0] %>
result: <% task(task1).result[0] %>
"""
@ -74,7 +74,7 @@ workflows:
with-items: name_info in <% $.names_info %>
action: std.echo output="<% $.greeting %>, <% $.name_info.name %>!"
publish:
result: <% $.task1 %>
result: <% task(task1).result %>
"""
@ -99,7 +99,7 @@ workflows:
- itemY in <% $.arrayJ %>
action: std.echo output="<% $.itemX %> <% $.itemY %>"
publish:
result: <% $.task1 %>
result: <% task(task1).result %>
"""
@ -107,19 +107,22 @@ workflows:
WB_ACTION_CONTEXT = """
---
version: "2.0"
name: wb1
workflows:
wf1_with_items:
type: direct
input:
- links
tasks:
task1:
with-items: link in <% $.links %>
action: std.http url=<% $.link %>
publish:
result: <% $.task1 %>
result: <% task(task1) %>
"""
@ -496,7 +499,7 @@ class WithItemsEngineTest(base.EngineTestCase):
with-items: i in [1, 2, 3]
action: sleep_echo output=<% $.i %>
publish:
one_two_three: <% $.task1 %>
one_two_three: <% task(task1).result %>
"""
# Register random sleep action in the DB.
test_base.register_action_class('sleep_echo', RandomSleepEchoAction)

View File

@ -14,7 +14,6 @@
# limitations under the License.
import copy
import six
from oslo_config import cfg
from oslo_log import log as logging
@ -39,7 +38,7 @@ def evaluate_upstream_context(upstream_task_execs):
for t_ex in upstream_task_execs:
# TODO(rakhmerov): These two merges look confusing. So it's a
# temporary solution.There's still the bug
# temporary solution. There's still the bug
# https://bugs.launchpad.net/mistral/+bug/1424461 that needs to be
# fixed using context variable versioning.
published_vars = utils.merge_dicts(
@ -47,30 +46,9 @@ def evaluate_upstream_context(upstream_task_execs):
t_ex.published
)
utils.merge_dicts(
ctx,
evaluate_task_outbound_context(t_ex, include_result=False)
)
utils.merge_dicts(ctx, evaluate_task_outbound_context(t_ex))
ctx = utils.merge_dicts(ctx, published_vars)
# TODO(rakhmerov): IMO, this method shouldn't deal with these task ids or
# anything else related to task proxies. Need to refactor.
return utils.merge_dicts(
ctx,
_get_task_identifiers_dict(upstream_task_execs)
)
# TODO(rakhmerov): Think how to gt rid of this method and the whole trick
# with upstream tasks. It doesn't look clear from design standpoint.
def _get_task_identifiers_dict(task_execs):
tasks = {}
for task_ex in task_execs:
tasks[task_ex.id] = task_ex.name
return {"__tasks": tasks}
return utils.merge_dicts(ctx, published_vars)
def _extract_execution_result(ex):
@ -115,48 +93,11 @@ def get_task_execution_result(task_ex):
return results[0] if len(results) == 1 else results
class TaskResultProxy(object):
def __init__(self, task_id):
self.task_id = task_id
def get(self):
task_ex = db_api.get_task_execution(self.task_id)
return get_task_execution_result(task_ex)
def __str__(self):
return "%s [task_id = '%s']" % (self.__class__.__name__, self.task_id)
def __repr__(self):
return self.__str__()
class ProxyAwareDict(dict):
def __getitem__(self, item):
val = super(ProxyAwareDict, self).__getitem__(item)
if isinstance(val, TaskResultProxy):
return val.get()
return val
def get(self, k, d=None):
try:
return self.__getitem__(k)
except KeyError:
return d
def iteritems(self):
for k, _ in six.iteritems(super(ProxyAwareDict, self)):
yield k, self[k]
def to_builtin_dict(self):
return {k: self[k] for k, _ in self.iteritems()}
def publish_variables(task_ex, task_spec):
if task_ex.state != states.SUCCESS:
return
expr_ctx = extract_task_result_proxies_to_context(task_ex.in_context)
expr_ctx = task_ex.in_context
if task_ex.name in expr_ctx:
LOG.warning(
@ -164,9 +105,6 @@ def publish_variables(task_ex, task_spec):
task_ex.name
)
# Add result of current task to context for variables evaluation.
expr_ctx[task_ex.name] = TaskResultProxy(task_ex.id)
task_ex.published = expr.evaluate_recursively(
task_spec.get_publish(),
expr_ctx
@ -179,51 +117,38 @@ def destroy_task_result(task_ex):
ex.output = {}
def evaluate_task_outbound_context(task_ex, include_result=True):
def evaluate_task_outbound_context(task_ex):
"""Evaluates task outbound Data Flow context.
This method assumes that complete task output (after publisher etc.)
has already been evaluated.
:param task_ex: DB task.
:param include_result: boolean argument, if True - include the
TaskResultProxy in outbound context under <task_name> key.
:return: Outbound task Data Flow context.
"""
in_context = (copy.deepcopy(dict(task_ex.in_context))
if task_ex.in_context is not None else {})
out_ctx = utils.merge_dicts(in_context, task_ex.published)
# Add task output under key 'taskName'.
if include_result:
task_ex_result = TaskResultProxy(task_ex.id)
out_ctx = utils.merge_dicts(
out_ctx,
{task_ex.name: task_ex_result or None}
)
return ProxyAwareDict(out_ctx)
return utils.merge_dicts(in_context, task_ex.published)
def evaluate_workflow_output(wf_spec, context):
def evaluate_workflow_output(wf_spec, ctx):
"""Evaluates workflow output.
:param wf_spec: Workflow specification.
:param context: Final Data Flow context (cause task's outbound context).
:param ctx: Final Data Flow context (cause task's outbound context).
"""
# Convert context to ProxyAwareDict for correct output evaluation.
context = ProxyAwareDict(copy.deepcopy(context))
ctx = copy.deepcopy(ctx)
output_dict = wf_spec.get_output()
# Evaluate workflow 'publish' clause using the final workflow context.
output = expr.evaluate_recursively(output_dict, context)
output = expr.evaluate_recursively(output_dict, ctx)
# TODO(rakhmerov): Many don't like that we return the whole context
# TODO(rakhmerov): if 'output' is not explicitly defined.
return ProxyAwareDict(output or context).to_builtin_dict()
# if 'output' is not explicitly defined.
return output or ctx
def add_openstack_data_to_context(wf_ex):
@ -255,6 +180,7 @@ def add_environment_to_context(wf_ex):
# If env variables are provided, add an evaluated copy into the context.
if 'env' in wf_ex.params:
env = copy.deepcopy(wf_ex.params['env'])
# An env variable can be an expression of other env variables.
wf_ex.context['__env'] = expr.evaluate_recursively(env, {'__env': env})
@ -268,17 +194,6 @@ def add_workflow_variables_to_context(wf_ex, wf_spec):
)
# TODO(rakhmerov): Think how to get rid of this method. It should not be
# exposed in API.
def extract_task_result_proxies_to_context(ctx):
ctx = ProxyAwareDict(copy.deepcopy(ctx))
for task_ex_id, task_ex_name in six.iteritems(ctx['__tasks']):
ctx[task_ex_name] = TaskResultProxy(task_ex_id)
return ctx
def evaluate_object_fields(obj, context):
fields = inspect_utils.get_public_fields(obj)