Fixing 'join' task completion logic

* The entire workflow was failing if one of a join task's inbound
  tasks failed, even if there was an 'on-error' clause in the task
  defaults.

Change-Id: Ie5864740ed6356d87bcedf9a6cfd0e733e8795a8
Closes-Bug: #1635841
This commit is contained in:
Renat Akhmerov 2016-10-22 21:39:33 +07:00
parent 20c3bcc251
commit 9ce3a75aa9
4 changed files with 66 additions and 12 deletions

View File

@ -44,7 +44,7 @@ def on_action_complete(action_ex, result):
action.fail(msg)
if task_ex:
task_handler.fail_task(task_ex, msg)
task_handler.force_fail_task(task_ex, msg)
return

View File

@ -68,7 +68,7 @@ def run_task(wf_cmd):
task.set_state(states.ERROR, msg)
wf_handler.fail_workflow(wf_ex, msg)
wf_handler.force_fail_workflow(wf_ex, msg)
return
@ -113,12 +113,24 @@ def _on_action_complete(action_ex):
task.set_state(states.ERROR, msg)
wf_handler.fail_workflow(wf_ex, msg)
wf_handler.force_fail_workflow(wf_ex, msg)
return
def fail_task(task_ex, msg):
def force_fail_task(task_ex, msg):
"""Forces the given task to fail.
This method implements the 'forced' task fail without giving a chance
to a workflow controller to handle the error. Its main purpose is to
reflect errors caused by workflow structure (errors in 'publish',
'on-xxx' clauses etc.) rather than failed actions. If such an error happens
we should also force the entire workflow to fail. I.e., this kind of
error must be propagated to a higher level, to the workflow.
:param task_ex: Task execution.
:param msg: Error message.
"""
wf_spec = spec_parser.get_workflow_spec_by_execution_id(
task_ex.workflow_execution_id
)
@ -127,7 +139,7 @@ def fail_task(task_ex, msg):
task.set_state(states.ERROR, msg)
wf_handler.fail_workflow(task_ex.workflow_execution, msg)
wf_handler.force_fail_workflow(task_ex.workflow_execution, msg)
def continue_task(task_ex):
@ -153,7 +165,7 @@ def continue_task(task_ex):
task.set_state(states.ERROR, msg)
wf_handler.fail_workflow(wf_ex, msg)
wf_handler.force_fail_workflow(wf_ex, msg)
return
@ -179,7 +191,7 @@ def complete_task(task_ex, state, state_info):
task.set_state(states.ERROR, msg)
wf_handler.fail_workflow(wf_ex, msg)
wf_handler.force_fail_workflow(wf_ex, msg)
return
@ -263,7 +275,9 @@ def _refresh_task_state(task_ex_id):
if state == states.RUNNING:
continue_task(task_ex)
elif state == states.ERROR:
fail_task(task_ex, state_info)
task = _build_task_from_execution(wf_spec, task_ex)
task.complete(state, state_info)
elif state == states.WAITING:
# Let's assume that a task takes 0.01 sec on average to complete
# and, based on this assumption, calculate the time of the next check.

View File

@ -69,7 +69,7 @@ def stop_workflow(wf_ex, state, msg=None):
stop_workflow(sub_wf_ex, state, msg=msg)
def fail_workflow(wf_ex, msg=None):
def force_fail_workflow(wf_ex, msg=None):
stop_workflow(wf_ex, states.ERROR, msg)
@ -101,7 +101,7 @@ def _check_and_complete(wf_ex_id):
LOG.error(msg)
fail_workflow(wf.wf_ex, msg)
force_fail_workflow(wf.wf_ex, msg)
return

View File

@ -812,7 +812,7 @@ class JoinEngineTest(base.EngineTestCase):
self._assert_multiple_items(task_execs, 4, state=states.SUCCESS)
def delete_join_completion_check_on_stop(self):
def test_delete_join_completion_check_on_stop(self):
wf_text = """---
version: '2.0'
@ -867,7 +867,7 @@ class JoinEngineTest(base.EngineTestCase):
len(db_api.get_delayed_calls(target_method_name=mtd_name)) == 0
)
def delete_join_completion_check_on_execution_delete(self):
def test_delete_join_completion_check_on_execution_delete(self):
wf_text = """---
version: '2.0'
@ -921,3 +921,43 @@ class JoinEngineTest(base.EngineTestCase):
lambda:
len(db_api.get_delayed_calls(target_method_name=mtd_name)) == 0
)
def test_no_workflow_error_after_inbound_error(self):
wf_text = """---
version: "2.0"
wf:
output:
continue_flag: <% $.get(continue_flag) %>
task-defaults:
on-error:
- change_continue_flag
tasks:
task_a:
action: std.fail
on-success:
- task_c: <% $.get(continue_flag) = null %>
- task_a_process
task_a_process:
action: std.noop
task_b:
on-success:
- task_c: <% $.get(continue_flag) = null %>
task_c:
join: all
change_continue_flag:
publish:
continue_flag: false
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self.await_workflow_success(wf_ex.id)