Add proper handling for implicit task completion

* 'wait-after' policy inserts a delay into normal task execution flow
  so that, break happens right after task has completed but next workflow
  commands are not yet calculated. Since this calculation runs separately
  after scheduler delay, if something bad happens at this stage it should
  also be handled properly, workflow and task state should go into ERROR,
  but not action (if action completed successfully).
  This patch adds error handling and the test to check this.

Change-Id: Ie2b9f7fa2a7f45e0236497d3134a749f431b20e1
This commit is contained in:
Renat Akhmerov 2016-07-08 15:50:42 +07:00
parent 633eb0fe6d
commit 87a75cb2df
2 changed files with 70 additions and 2 deletions

View File

@ -145,8 +145,23 @@ def continue_task(task_ex):
def complete_task(task_ex, state, state_info):
task = _build_task_from_execution(task_ex)
# TODO(rakhmerov): Error handling.
task.complete(state, state_info)
try:
task.complete(state, state_info)
except exc.MistralException as e:
wf_ex = task_ex.workflow_execution
msg = (
"Failed to complete task [wf=%s, task=%s]: %s\n%s" %
(wf_ex, task_ex.name, e, tb.format_exc())
)
LOG.error(msg)
task.set_state(states.ERROR, msg)
wf_handler.fail_workflow(wf_ex, msg)
return
if task.is_completed():
wf_handler.on_task_complete(task_ex)

View File

@ -250,3 +250,56 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
action_execs = task_ex.executions
self.assertEqual(0, len(action_execs))
def test_action_error_with_wait_after_policy(self):
# Check that state of all workflow objects (workflow executions,
# task executions, action executions) is properly persisted in case
# of action error and task has 'wait-after' policy. It is an
# implicit test for task completion because 'wait-after' inserts
# a delay between actual task completion and logic that calculates
# next workflow commands. If an an error happens while calculating
# next commands (e.g. invalid YAQL in on-XXX clauses) then we also
# need to handle this properly, meaning that task and workflow state
# should go into ERROR state.
wf_text = """
version: '2.0'
wf:
tasks:
task1:
action: std.noop
wait-after: 1
on-success:
- task2: <% invalid_yaql_function() %>
task2:
action: std.noop
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
task_ex = self._assert_single_item(
task_execs,
name='task1',
state=states.ERROR
)
action_execs = task_ex.executions
self.assertEqual(1, len(action_execs))
self._assert_single_item(
action_execs,
name='std.noop',
state=states.SUCCESS
)