From 1394240da57bce270235782a9f1fb6f02661d299 Mon Sep 17 00:00:00 2001 From: Winson Chan Date: Mon, 23 Nov 2015 23:05:12 +0000 Subject: [PATCH] Fix task state for YAQL error in subflow output If evaluation of YAQL in the subworkflow output, the corresponding task will be stuck in a RUNNING state. Also, error in the state_info of the subworkflow does not bubble up to the parent workflow. Change-Id: I13e9f389ad923a43caee41072979eeba578d0f3e Closes-Bug: #1518506 --- mistral/engine/task_handler.py | 16 +++-- mistral/engine/workflow_handler.py | 19 ++++-- .../test_execution_fields_size_limitation.py | 4 +- mistral/tests/unit/engine/test_state_info.py | 7 +- .../tests/unit/engine/test_subworkflows.py | 66 +++++++++++++++---- mistral/workflow/utils.py | 28 ++++---- 6 files changed, 101 insertions(+), 39 deletions(-) diff --git a/mistral/engine/task_handler.py b/mistral/engine/task_handler.py index 68d2bd80..47f191c6 100644 --- a/mistral/engine/task_handler.py +++ b/mistral/engine/task_handler.py @@ -181,17 +181,23 @@ def on_action_complete(action_ex, result): wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) task_spec = wf_spec.get_tasks()[task_ex.name] - task_state = states.SUCCESS if result.is_success() else states.ERROR + if result.is_success(): + task_state = states.SUCCESS + task_state_info = None + else: + task_state = states.ERROR + task_state_info = result.error if not task_spec.get_with_items(): - _complete_task(task_ex, task_spec, task_state) + _complete_task(task_ex, task_spec, task_state, task_state_info) else: with_items.increase_capacity(task_ex) if with_items.is_completed(task_ex): _complete_task( task_ex, task_spec, - with_items.get_final_state(task_ex) + with_items.get_final_state(task_ex), + task_state_info ) return task_ex @@ -490,12 +496,12 @@ def run_workflow(wf_name, wf_input, wf_params): ) -def _complete_task(task_ex, task_spec, state): +def _complete_task(task_ex, task_spec, state, state_info=None): # Ignore if task already completed. if states.is_completed(task_ex.state): return [] - _set_task_state(task_ex, state) + _set_task_state(task_ex, state, state_info=state_info) try: data_flow.publish_variables( diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index 41916927..224d1040 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -24,11 +24,19 @@ from mistral.workflow import utils as wf_utils def succeed_workflow(wf_ex, final_context, state_info=None): - set_execution_state(wf_ex, states.SUCCESS, state_info) - wf_spec = spec_parser.get_workflow_spec(wf_ex.spec) - wf_ex.output = data_flow.evaluate_workflow_output(wf_spec, final_context) + # Fail workflow if output is not successfully evaluated. + try: + wf_ex.output = data_flow.evaluate_workflow_output( + wf_spec, + final_context + ) + except Exception as e: + return fail_workflow(wf_ex, e.message) + + # Set workflow execution to success until after output is evaluated. + set_execution_state(wf_ex, states.SUCCESS, state_info) if wf_ex.task_execution_id: _schedule_send_result_to_parent_workflow(wf_ex) @@ -66,7 +74,10 @@ def send_result_to_parent_workflow(wf_ex_id): wf_utils.Result(data=wf_ex.output) ) elif wf_ex.state == states.ERROR: - err_msg = 'Failed subworkflow [execution_id=%s]' % wf_ex.id + err_msg = ( + wf_ex.state_info or + 'Failed subworkflow [execution_id=%s]' % wf_ex.id + ) rpc.get_engine_client().on_action_complete( wf_ex.id, diff --git a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py index abe00d62..a0c9b374 100644 --- a/mistral/tests/unit/engine/test_execution_fields_size_limitation.py +++ b/mistral/tests/unit/engine/test_execution_fields_size_limitation.py @@ -176,8 +176,8 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase): # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - self.assertEqual( - "Failure caused by error in task 'task1': ", + self.assertIn( + "Failure caused by error in tasks: task1", wf_ex.state_info ) diff --git a/mistral/tests/unit/engine/test_state_info.py b/mistral/tests/unit/engine/test_state_info.py index 186188df..d00e25d9 100644 --- a/mistral/tests/unit/engine/test_state_info.py +++ b/mistral/tests/unit/engine/test_state_info.py @@ -48,7 +48,7 @@ class ExecutionStateInfoTest(base.EngineTestCase): # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - self.assertIn("error in task 'task1'", wf_ex.state_info) + self.assertIn("error in tasks: task1", wf_ex.state_info) def test_state_info_two_failed_branches(self): workflow = """--- @@ -72,8 +72,7 @@ class ExecutionStateInfoTest(base.EngineTestCase): # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - self.assertIn("error in task 'task1'", wf_ex.state_info) - self.assertIn("error in task 'task2'", wf_ex.state_info) + self.assertIn("error in tasks: task1, task2", wf_ex.state_info) def test_state_info_with_policies(self): workflow = """--- @@ -99,4 +98,4 @@ class ExecutionStateInfoTest(base.EngineTestCase): # Note: We need to reread execution to access related tasks. wf_ex = db_api.get_workflow_execution(wf_ex.id) - self.assertIn("error in task 'task1'", wf_ex.state_info) + self.assertIn("error in tasks: task1", wf_ex.state_info) diff --git a/mistral/tests/unit/engine/test_subworkflows.py b/mistral/tests/unit/engine/test_subworkflows.py index 8ecadfb4..1c6a7341 100644 --- a/mistral/tests/unit/engine/test_subworkflows.py +++ b/mistral/tests/unit/engine/test_subworkflows.py @@ -23,6 +23,7 @@ from mistral.db.v2 import api as db_api from mistral import exceptions as exc from mistral.services import workbooks as wb_service from mistral.tests.unit.engine import base +from mistral.workflow import states LOG = logging.getLogger(__name__) @@ -31,11 +32,11 @@ LOG = logging.getLogger(__name__) cfg.CONF.set_default('auth_enable', False, group='pecan') -WORKBOOK = """ +WB1 = """ --- version: '2.0' -name: my_wb +name: wb1 workflows: wf1: @@ -70,15 +71,38 @@ workflows: slogan: "<% $.task1.final_result %> is a cool movie!" """ +WB2 = """ +--- +version: '2.0' + +name: wb2 + +workflows: + wf1: + type: direct + tasks: + task1: + workflow: wf2 + + wf2: + type: direct + output: + var1: <% $.does_not_exist %> + tasks: + task1: + action: std.noop +""" + class SubworkflowsTest(base.EngineTestCase): def setUp(self): super(SubworkflowsTest, self).setUp() - wb_service.create_workbook_v2(WORKBOOK) + wb_service.create_workbook_v2(WB1) + wb_service.create_workbook_v2(WB2) def test_subworkflow_success(self): - wf2_ex = self.engine.start_workflow('my_wb.wf2', None) + wf2_ex = self.engine.start_workflow('wb1.wf2', None) project_id = auth_context.ctx().project_id @@ -95,8 +119,8 @@ class SubworkflowsTest(base.EngineTestCase): self.assertEqual(2, len(wf_execs)) # Execution of 'wf2'. - wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1') - wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2') + wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1') + wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2') self.assertEqual(project_id, wf1_ex.project_id) self.assertIsNotNone(wf1_ex.task_execution_id) @@ -154,7 +178,7 @@ class SubworkflowsTest(base.EngineTestCase): @mock.patch.object(std_actions.EchoAction, 'run', mock.MagicMock(side_effect=exc.ActionException)) def test_subworkflow_error(self): - wf2_ex = self.engine.start_workflow('my_wb.wf2', None) + wf2_ex = self.engine.start_workflow('wb1.wf2', None) self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5) @@ -162,8 +186,8 @@ class SubworkflowsTest(base.EngineTestCase): self.assertEqual(2, len(wf_execs)) - wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1') - wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2') + wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1') + wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2') # Wait till workflow 'wf1' is completed. self._await(lambda: self.is_execution_error(wf1_ex.id)) @@ -171,10 +195,28 @@ class SubworkflowsTest(base.EngineTestCase): # Wait till workflow 'wf2' is completed, its state must be ERROR. self._await(lambda: self.is_execution_error(wf2_ex.id)) + def test_subworkflow_yaql_error(self): + wf_ex = self.engine.start_workflow('wb2.wf1', None) + + self._await(lambda: self.is_execution_error(wf_ex.id)) + + wf_execs = db_api.get_workflow_executions() + + self.assertEqual(2, len(wf_execs)) + + wf2_ex = self._assert_single_item(wf_execs, name='wb2.wf2') + self.assertEqual(states.ERROR, wf2_ex.state) + self.assertIn('Can not evaluate YAQL expression', wf2_ex.state_info) + + # Ensure error message is bubbled up to the main workflow. + wf1_ex = self._assert_single_item(wf_execs, name='wb2.wf1') + self.assertEqual(states.ERROR, wf1_ex.state) + self.assertIn('Can not evaluate YAQL expression', wf1_ex.state_info) + def test_subworkflow_environment_inheritance(self): env = {'key1': 'abc'} - wf2_ex = self.engine.start_workflow('my_wb.wf2', None, env=env) + wf2_ex = self.engine.start_workflow('wb1.wf2', None, env=env) # Execution of 'wf2'. self.assertIsNotNone(wf2_ex) @@ -188,8 +230,8 @@ class SubworkflowsTest(base.EngineTestCase): self.assertEqual(2, len(wf_execs)) # Execution of 'wf1'. - wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1') - wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2') + wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1') + wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2') expected_start_params = { 'task_name': 'task2', diff --git a/mistral/workflow/utils.py b/mistral/workflow/utils.py index 42753031..7c499501 100644 --- a/mistral/workflow/utils.py +++ b/mistral/workflow/utils.py @@ -109,21 +109,25 @@ def find_error_task_executions(wf_ex): def construct_fail_info_message(wf_ctrl, wf_ex): # Try to find where error is exactly. - failed_tasks = filter( - lambda t: not wf_ctrl.is_error_handled_for(t), - find_error_task_executions(wf_ex) + failed_tasks = sorted( + filter( + lambda t: not wf_ctrl.is_error_handled_for(t), + find_error_task_executions(wf_ex) + ), + key=lambda t: t.name ) - errors = [] + msg = ('Failure caused by error in tasks: %s\n' % + ', '.join([t.name for t in failed_tasks])) for t in failed_tasks: - errors += [ - ("error in task '%s': " - "%s" % (t.name, str(ex.output.get('result', 'Unknown'))) - if ex.output else 'Unknown') - for ex in t.executions - ] + msg += '\n %s -> ' % t.name - state_info = "Failure caused by %s" % ';\n '.join(errors) + if t.state_info: + msg += t.state_info + '\n' + else: + for i, ex in enumerate(t.executions): + output = (ex.output or dict()).get('result', 'Unknown') + msg += ' action execution #%s: %s\n' % (i + 1, str(output)) - return state_info + return msg