diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 6e915141..eece7a31 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -145,8 +145,23 @@ def continue_task(task_ex): def complete_task(task_ex, state, state_info): task = _build_task_from_execution(task_ex) - # TODO(rakhmerov): Error handling. - task.complete(state, state_info) + try: + task.complete(state, state_info) + except exc.MistralException as e: + wf_ex = task_ex.workflow_execution + + msg = ( + "Failed to complete task [wf=%s, task=%s]: %s\n%s" % + (wf_ex, task_ex.name, e, tb.format_exc()) + ) + + LOG.error(msg) + + task.set_state(states.ERROR, msg) + + wf_handler.fail_workflow(wf_ex, msg) + + return if task.is_completed(): wf_handler.on_task_complete(task_ex) diff --git a/mistral/tests/unit/engine/test_error_handling.py b/mistral/tests/unit/engine/test_error_handling.py index 934f1384..481d0f2f 100644 --- a/mistral/tests/unit/engine/test_error_handling.py +++ b/mistral/tests/unit/engine/test_error_handling.py @@ -250,3 +250,56 @@ class ErrorHandlingEngineTest(base.EngineTestCase): action_execs = task_ex.executions self.assertEqual(0, len(action_execs)) + + def test_action_error_with_wait_after_policy(self): + # Check that state of all workflow objects (workflow executions, + # task executions, action executions) is properly persisted in case + # of action error and task has 'wait-after' policy. It is an + # implicit test for task completion because 'wait-after' inserts + # a delay between actual task completion and logic that calculates + # next workflow commands. If an an error happens while calculating + # next commands (e.g. invalid YAQL in on-XXX clauses) then we also + # need to handle this properly, meaning that task and workflow state + # should go into ERROR state. + wf_text = """ + version: '2.0' + + wf: + tasks: + task1: + action: std.noop + wait-after: 1 + on-success: + - task2: <% invalid_yaql_function() %> + + task2: + action: std.noop + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf', {}) + + self.await_workflow_error(wf_ex.id) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_execs = wf_ex.task_executions + + self.assertEqual(1, len(task_execs)) + + task_ex = self._assert_single_item( + task_execs, + name='task1', + state=states.ERROR + ) + + action_execs = task_ex.executions + + self.assertEqual(1, len(action_execs)) + + self._assert_single_item( + action_execs, + name='std.noop', + state=states.SUCCESS + )