From 87a75cb2dfb9d75a36b8e837071132577ec17fde Mon Sep 17 00:00:00 2001 From: Renat Akhmerov Date: Fri, 8 Jul 2016 15:50:42 +0700 Subject: [PATCH] Add proper handling for implicit task completion * 'wait-after' policy inserts a delay into normal task execution flow so that, break happens right after task has completed but next workflow commands are not yet calculated. Since this calculation runs separately after scheduler delay, if something bad happens at this stage it should also be handled properly, workflow and task state should go into ERROR, but not action (if action completed successfully). This patch adds error handling and the test to check this. Change-Id: Ie2b9f7fa2a7f45e0236497d3134a749f431b20e1 --- mistral/engine/task_handler.py | 19 ++++++- .../tests/unit/engine/test_error_handling.py | 53 +++++++++++++++++++ 2 files changed, 70 insertions(+), 2 deletions(-) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 6e915141..eece7a31 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -145,8 +145,23 @@ def continue_task(task_ex): def complete_task(task_ex, state, state_info): task = _build_task_from_execution(task_ex) - # TODO(rakhmerov): Error handling. - task.complete(state, state_info) + try: + task.complete(state, state_info) + except exc.MistralException as e: + wf_ex = task_ex.workflow_execution + + msg = ( + "Failed to complete task [wf=%s, task=%s]: %s\n%s" % + (wf_ex, task_ex.name, e, tb.format_exc()) + ) + + LOG.error(msg) + + task.set_state(states.ERROR, msg) + + wf_handler.fail_workflow(wf_ex, msg) + + return if task.is_completed(): wf_handler.on_task_complete(task_ex) diff --git a/mistral/tests/unit/engine/test_error_handling.py b/mistral/tests/unit/engine/test_error_handling.py index 934f1384..481d0f2f 100644 --- a/mistral/tests/unit/engine/test_error_handling.py +++ b/mistral/tests/unit/engine/test_error_handling.py @@ -250,3 +250,56 @@ class ErrorHandlingEngineTest(base.EngineTestCase): action_execs = task_ex.executions self.assertEqual(0, len(action_execs)) + + def test_action_error_with_wait_after_policy(self): + # Check that state of all workflow objects (workflow executions, + # task executions, action executions) is properly persisted in case + # of action error and task has 'wait-after' policy. It is an + # implicit test for task completion because 'wait-after' inserts + # a delay between actual task completion and logic that calculates + # next workflow commands. If an an error happens while calculating + # next commands (e.g. invalid YAQL in on-XXX clauses) then we also + # need to handle this properly, meaning that task and workflow state + # should go into ERROR state. + wf_text = """ + version: '2.0' + + wf: + tasks: + task1: + action: std.noop + wait-after: 1 + on-success: + - task2: <% invalid_yaql_function() %> + + task2: + action: std.noop + """ + + wf_service.create_workflows(wf_text) + + wf_ex = self.engine.start_workflow('wf', {}) + + self.await_workflow_error(wf_ex.id) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_execs = wf_ex.task_executions + + self.assertEqual(1, len(task_execs)) + + task_ex = self._assert_single_item( + task_execs, + name='task1', + state=states.ERROR + ) + + action_execs = task_ex.executions + + self.assertEqual(1, len(action_execs)) + + self._assert_single_item( + action_execs, + name='std.noop', + state=states.SUCCESS + )