Advanced publishing: add publishing of global variables
Partially implements: blueprint mistral-advanced-publishing-global-vars Change-Id: I964e367063f7e9a846b86c1057116d202d62715a
This commit is contained in:
parent
6c6e212688
commit
e721e7ff23
@ -713,7 +713,7 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
|
|||||||
|
|
||||||
self.assertDictEqual(wf_input['a'], task1.published['published_a'])
|
self.assertDictEqual(wf_input['a'], task1.published['published_a'])
|
||||||
|
|
||||||
def test_advanced_publishing_branch(self):
|
def test_branch_publishing_success(self):
|
||||||
wf_text = """---
|
wf_text = """---
|
||||||
version: 2.0
|
version: 2.0
|
||||||
|
|
||||||
@ -750,6 +750,88 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
|
|||||||
|
|
||||||
self.assertDictEqual({"my_var": "my branch value"}, task1.published)
|
self.assertDictEqual({"my_var": "my branch value"}, task1.published)
|
||||||
|
|
||||||
|
def test_global_publishing_success_access_via_root_context_(self):
|
||||||
|
wf_text = """---
|
||||||
|
version: '2.0'
|
||||||
|
|
||||||
|
wf:
|
||||||
|
tasks:
|
||||||
|
task1:
|
||||||
|
action: std.echo output="Hi"
|
||||||
|
on-success:
|
||||||
|
publish:
|
||||||
|
global:
|
||||||
|
my_var: <% task().result %>
|
||||||
|
next:
|
||||||
|
- task2
|
||||||
|
|
||||||
|
task2:
|
||||||
|
action: std.echo output=<% $.my_var %>
|
||||||
|
publish:
|
||||||
|
result: <% task().result %>
|
||||||
|
"""
|
||||||
|
|
||||||
|
wf_service.create_workflows(wf_text)
|
||||||
|
|
||||||
|
wf_ex = self.engine.start_workflow('wf', {})
|
||||||
|
|
||||||
|
self.await_workflow_success(wf_ex.id)
|
||||||
|
|
||||||
|
with db_api.transaction():
|
||||||
|
# Note: We need to reread execution to access related tasks.
|
||||||
|
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||||
|
|
||||||
|
tasks = wf_ex.task_executions
|
||||||
|
|
||||||
|
self._assert_single_item(tasks, name='task1')
|
||||||
|
task2 = self._assert_single_item(tasks, name='task2')
|
||||||
|
|
||||||
|
self.assertDictEqual({'result': 'Hi'}, task2.published)
|
||||||
|
|
||||||
|
def test_global_publishing_error_access_via_root_context(self):
|
||||||
|
wf_text = """---
|
||||||
|
version: '2.0'
|
||||||
|
|
||||||
|
wf:
|
||||||
|
tasks:
|
||||||
|
task1:
|
||||||
|
action: std.fail
|
||||||
|
on-success:
|
||||||
|
publish:
|
||||||
|
global:
|
||||||
|
my_var: "We got success"
|
||||||
|
next:
|
||||||
|
- task2
|
||||||
|
on-error:
|
||||||
|
publish:
|
||||||
|
global:
|
||||||
|
my_var: "We got an error"
|
||||||
|
next:
|
||||||
|
- task2
|
||||||
|
|
||||||
|
task2:
|
||||||
|
action: std.echo output=<% $.my_var %>
|
||||||
|
publish:
|
||||||
|
result: <% task().result %>
|
||||||
|
"""
|
||||||
|
|
||||||
|
wf_service.create_workflows(wf_text)
|
||||||
|
|
||||||
|
wf_ex = self.engine.start_workflow('wf', {})
|
||||||
|
|
||||||
|
self.await_workflow_success(wf_ex.id)
|
||||||
|
|
||||||
|
with db_api.transaction():
|
||||||
|
# Note: We need to reread execution to access related tasks.
|
||||||
|
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||||
|
|
||||||
|
tasks = wf_ex.task_executions
|
||||||
|
|
||||||
|
self._assert_single_item(tasks, name='task1')
|
||||||
|
task2 = self._assert_single_item(tasks, name='task2')
|
||||||
|
|
||||||
|
self.assertDictEqual({'result': 'We got an error'}, task2.published)
|
||||||
|
|
||||||
|
|
||||||
class DataFlowTest(test_base.BaseTest):
|
class DataFlowTest(test_base.BaseTest):
|
||||||
def test_get_task_execution_result(self):
|
def test_get_task_execution_result(self):
|
||||||
|
@ -203,12 +203,21 @@ def publish_variables(task_ex, task_spec):
|
|||||||
if not publish_spec:
|
if not publish_spec:
|
||||||
return
|
return
|
||||||
|
|
||||||
|
# Publish branch variables.
|
||||||
branch_vars = publish_spec.get_branch()
|
branch_vars = publish_spec.get_branch()
|
||||||
|
|
||||||
task_ex.published = expr.evaluate_recursively(branch_vars, expr_ctx)
|
task_ex.published = expr.evaluate_recursively(branch_vars, expr_ctx)
|
||||||
|
|
||||||
|
# Publish global variables.
|
||||||
|
global_vars = publish_spec.get_global()
|
||||||
|
|
||||||
|
utils.merge_dicts(
|
||||||
|
task_ex.workflow_execution.context,
|
||||||
|
expr.evaluate_recursively(global_vars, expr_ctx)
|
||||||
|
)
|
||||||
|
|
||||||
# TODO(rakhmerov):
|
# TODO(rakhmerov):
|
||||||
# 1. Publish global and atomic variables.
|
# 1. Publish atomic variables.
|
||||||
# 2. Add the field "publish" in TaskExecution model similar to "published"
|
# 2. Add the field "publish" in TaskExecution model similar to "published"
|
||||||
# but containing info as
|
# but containing info as
|
||||||
# {'branch': {vars}, 'global': {vars}, 'atomic': {vars}}
|
# {'branch': {vars}, 'global': {vars}, 'atomic': {vars}}
|
||||||
|
Loading…
Reference in New Issue
Block a user