Fix join on branch error
If a workflow has multiple branches that join on a task and there is one or more unhandled errors in upstream tasks at these branches, the workflow execution state will be RUNNING indefinitely. In this case, the workflow execution will never reach completion and the engine should fail the workflow. Change-Id: I237ed0ac481f946de1626e8d7936cfb7bf9081d6 Closes-Bug: #1472790
This commit is contained in:
parent
6b2976aead
commit
9edd86dba5
@ -200,7 +200,14 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
if states.is_paused_or_completed(wf_ex.state):
|
||||
return
|
||||
|
||||
if wf_utils.find_incomplete_task_executions(wf_ex):
|
||||
# Workflow is not completed if there are any incomplete task
|
||||
# executions that are not in WAITING state. If all incomplete
|
||||
# tasks are waiting and there are unhandled errors, then these
|
||||
# tasks will not reach completion. In this case, mark the
|
||||
# workflow complete.
|
||||
incomplete_tasks = wf_utils.find_incomplete_task_executions(wf_ex)
|
||||
|
||||
if any(not states.is_waiting(t.state) for t in incomplete_tasks):
|
||||
return
|
||||
|
||||
if wf_ctrl.all_errors_handled():
|
||||
|
@ -513,8 +513,8 @@ class DirectWorkflowRerunTest(base.EngineTestCase):
|
||||
states.SUCCESS))
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
|
||||
self.assertEqual(states.RUNNING, wf_ex.state)
|
||||
self.assertIsNone(wf_ex.state_info)
|
||||
self.assertEqual(states.ERROR, wf_ex.state)
|
||||
self.assertIsNotNone(wf_ex.state_info)
|
||||
self.assertEqual(3, len(wf_ex.task_executions))
|
||||
|
||||
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
|
||||
|
@ -556,3 +556,61 @@ class JoinEngineTest(base.EngineTestCase):
|
||||
},
|
||||
exec_db.output
|
||||
)
|
||||
|
||||
def test_full_join_with_branch_errors(self):
|
||||
wf_full_join_with_errors = """---
|
||||
version: '2.0'
|
||||
|
||||
main:
|
||||
type: direct
|
||||
|
||||
tasks:
|
||||
task10:
|
||||
action: std.noop
|
||||
on-success:
|
||||
- task21
|
||||
- task31
|
||||
|
||||
task21:
|
||||
action: std.noop
|
||||
on-success:
|
||||
- task22
|
||||
task22:
|
||||
action: std.noop
|
||||
on-success:
|
||||
- task40
|
||||
|
||||
task31:
|
||||
action: std.fail
|
||||
on-success:
|
||||
- task32
|
||||
task32:
|
||||
action: std.noop
|
||||
on-success:
|
||||
- task40
|
||||
|
||||
task40:
|
||||
join: all
|
||||
action: std.noop
|
||||
"""
|
||||
|
||||
wf_service.create_workflows(wf_full_join_with_errors)
|
||||
wf_ex = self.engine.start_workflow('main', {})
|
||||
|
||||
self._await(lambda: self.is_execution_error(wf_ex.id))
|
||||
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
tasks = wf_ex.task_executions
|
||||
|
||||
task10 = self._assert_single_item(tasks, name='task10')
|
||||
task21 = self._assert_single_item(tasks, name='task21')
|
||||
task22 = self._assert_single_item(tasks, name='task22')
|
||||
task31 = self._assert_single_item(tasks, name='task31')
|
||||
task40 = self._assert_single_item(tasks, name='task40')
|
||||
|
||||
self.assertEqual(states.SUCCESS, task10.state)
|
||||
self.assertEqual(states.SUCCESS, task21.state)
|
||||
self.assertEqual(states.SUCCESS, task22.state)
|
||||
self.assertEqual(states.ERROR, task31.state)
|
||||
self.assertNotIn('task32', [task.name for task in tasks])
|
||||
self.assertEqual(states.WAITING, task40.state)
|
||||
|
@ -287,7 +287,7 @@ class DirectWorkflowController(base.WorkflowController):
|
||||
)
|
||||
|
||||
# TODO(rakhmerov): Temporary hack. See the previous comment.
|
||||
in_t_ex = in_t_execs[-1]
|
||||
in_t_ex = in_t_execs[-1] if in_t_execs else None
|
||||
|
||||
if not in_t_ex or not states.is_completed(in_t_ex.state):
|
||||
return False
|
||||
|
Loading…
x
Reference in New Issue
Block a user