Task publish is ignored silently

bug-fix, on-success, on complete published variables are not merged
 with regular published variables

Closes-Bug: #1791449
Change-Id: Iba05b9acbfea06214a3ab93911d2eaaa4729278f
This commit is contained in:
ali 2019-09-25 11:21:54 +00:00
parent fd24972bef
commit c7a54d22df
3 changed files with 226 additions and 10 deletions

View File

@ -16,6 +16,7 @@
from mistral import exceptions as exc
from mistral.lang import types
from mistral.lang.v2 import base
from mistral_lib import utils
class PublishSpec(base.BaseSpec):
@ -59,3 +60,12 @@ class PublishSpec(base.BaseSpec):
def get_atomic(self):
return self._atomic
def merge(self, spec_to_merge):
if spec_to_merge:
if spec_to_merge.get_branch():
utils.merge_dicts(self._branch, spec_to_merge.get_branch())
if spec_to_merge.get_global():
utils.merge_dicts(self._global, spec_to_merge.get_global())
if spec_to_merge.get_atomic():
utils.merge_dicts(self._atomic, spec_to_merge.get_atomic())

View File

@ -248,7 +248,6 @@ class TaskSpec(base.BaseSpec):
{'branch': self._publish_on_error},
validate=self._validate
)
return spec
def get_keep_result(self):
@ -323,22 +322,23 @@ class DirectWorkflowTaskSpec(TaskSpec):
def get_publish(self, state):
spec = super(DirectWorkflowTaskSpec, self).get_publish(state)
# TODO(rakhmerov): How do we need to resolve a possible conflict
# between 'on-complete' and 'on-success/on-error' and
# 'publish/publish-on-error'? For now we assume that 'on-error'
# and 'on-success' take precedence over on-complete.
on_clause = self._on_complete
if self._on_complete and self._on_complete.get_publish():
if spec:
spec.merge(self._on_complete.get_publish())
else:
spec = self._on_complete.get_publish()
if state == states.SUCCESS:
on_clause = self._on_success
elif state == states.ERROR:
on_clause = self._on_error
if not on_clause:
return spec
if on_clause and on_clause.get_publish():
if spec:
on_clause.get_publish().merge(spec)
return on_clause.get_publish()
return on_clause.get_publish() or spec
return spec
def get_join(self):
return self._join

View File

@ -616,6 +616,212 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
wf_output['result']
)
def test_publish_with_all(self):
wf_text = """---
version: '2.0'
wf:
tasks:
main-task:
publish:
res_x1: 111
on-complete:
next: complete-task
publish:
branch:
res_x3: 222
on-success:
next: success-task
publish:
branch:
res_x2: 222
success-task:
action: std.noop
publish:
success_x2: <% $.res_x2 %>
success_x1: <% $.res_x1 %>
complete-task:
action: std.noop
publish:
complete_x2: <% $.res_x3 %>
complete_x1: <% $.res_x1 %>
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
wf_ex = db_api.get_workflow_execution(wf_ex.id)
wf_output = wf_ex.output
tasks = wf_ex.task_executions
main_task = self._assert_single_item(tasks, name='main-task')
main_task_published_vars = main_task.get("published")
expected_main_variables = {'res_x3', 'res_x2', 'res_x1'}
self.assertEqual(set(main_task_published_vars.keys()),
expected_main_variables)
complete_task = self._assert_single_item(tasks,
name='complete-task')
complete_task_published_vars = complete_task.get("published")
expected_complete_variables = {'complete_x2', 'complete_x1'}
self.assertEqual(set(complete_task_published_vars.keys()),
expected_complete_variables)
success_task = self._assert_single_item(tasks, name='success-task')
success_task_published_vars = success_task.get("published")
expected_success_variables = {'success_x2', 'success_x1'}
self.assertEqual(set(success_task_published_vars.keys()),
expected_success_variables)
all_expected_published_variables = expected_main_variables.union(
expected_success_variables,
expected_complete_variables
)
self.assertEqual(set(wf_output), all_expected_published_variables)
def test_publish_no_success(self):
wf_text = """---
version: '2.0'
wf:
tasks:
main-task:
publish:
res_x1: 111
on-complete:
next: complete-task
publish:
branch:
res_x3: 222
complete-task:
action: std.noop
publish:
complete_x2: <% $.res_x3 %>
complete_x1: <% $.res_x1 %>
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
wf_output = wf_ex.output
tasks = wf_ex.task_executions
main_task = self._assert_single_item(tasks, name='main-task')
main_task_published_vars = main_task.get("published")
expected_main_variables = {'res_x3', 'res_x1'}
self.assertEqual(set(main_task_published_vars.keys()),
expected_main_variables)
complete_task = self._assert_single_item(tasks,
name='complete-task')
complete_task_published_vars = complete_task.get("published")
expected_complete_variables = {'complete_x2', 'complete_x1'}
self.assertEqual(set(complete_task_published_vars.keys()),
expected_complete_variables)
all_expected_published_variables = expected_main_variables.union(
expected_complete_variables)
self.assertEqual(set(wf_output), all_expected_published_variables)
def test_publish_no_complete(self):
wf_text = """---
version: '2.0'
wf:
tasks:
main-task:
publish:
res_x1: 111
on-success:
next: success-task
publish:
branch:
res_x2: 222
success-task:
action: std.noop
publish:
success_x2: <% $.res_x2 %>
success_x1: <% $.res_x1 %>
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf')
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
wf_output = wf_ex.output
main_task = self._assert_single_item(tasks, name='main-task')
main_task_published_vars = main_task.get("published")
expected_main_variables = {'res_x2', 'res_x1'}
self.assertEqual(set(main_task_published_vars.keys()),
expected_main_variables)
success_task = self._assert_single_item(tasks, name='success-task')
success_task_published_vars = success_task.get("published")
expected_success_variables = {'success_x2', 'success_x1'}
self.assertEqual(set(success_task_published_vars.keys()),
expected_success_variables)
all_expected_published_variables = expected_main_variables.union(
expected_success_variables)
self.assertEqual(set(wf_output), all_expected_published_variables)
def test_publish_no_regular_publish(self):
wf_text = """---
version: '2.0'
wf2:
tasks:
main-task:
on-success:
next: success-task
publish:
branch:
res_x2: 222
success-task:
action: std.noop
publish:
success_x2: <% $.res_x2 %>
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf2')
self.await_workflow_success(wf_ex.id)
with db_api.transaction():
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
tasks = wf_ex.task_executions
wf_output = wf_ex.output
main_task = self._assert_single_item(tasks, name='main-task')
main_task_published_vars = main_task.get("published")
expected_main_variables = {'res_x2'}
self.assertEqual(set(main_task_published_vars.keys()),
expected_main_variables)
success_task = self._assert_single_item(tasks, name='success-task')
success_task_published_vars = success_task.get("published")
expected_success_variables = {'success_x2'}
self.assertEqual(set(success_task_published_vars.keys()),
expected_success_variables)
all_expected_published_variables = expected_main_variables.union(
expected_success_variables)
self.assertEqual(set(wf_output), all_expected_published_variables)
def test_output_on_error_wb_yaql_failed(self):
wb_text = """---
version: '2.0'