diff --git a/mistral/engine/default_engine.py b/mistral/engine/default_engine.py index c4949f232..ae1dd17bf 100644 --- a/mistral/engine/default_engine.py +++ b/mistral/engine/default_engine.py @@ -35,9 +35,9 @@ from mistral.workflow import data_flow from mistral.workflow import states from mistral.workflow import utils as wf_utils - LOG = logging.getLogger(__name__) + # Submodules of mistral.engine will throw NoSuchOptError if configuration # options required at top level of this __init__.py are not imported before # the submodules are referenced. @@ -180,10 +180,10 @@ class DefaultEngine(base.Engine): self._dispatch_workflow_commands(wf_ex, cmds) - self._check_workflow_completion(wf_ex, action_ex, wf_ctrl) + self._check_workflow_completion(wf_ex, wf_ctrl) @staticmethod - def _check_workflow_completion(wf_ex, action_ex, wf_ctrl): + def _check_workflow_completion(wf_ex, wf_ctrl): if states.is_paused_or_completed(wf_ex.state): return @@ -196,13 +196,7 @@ class DefaultEngine(base.Engine): wf_ctrl.evaluate_workflow_final_context() ) else: - result_str = (str(action_ex.output.get('result', 'Unknown')) - if action_ex.output else 'Unknown') - - state_info = ( - "Failure caused by error in task '%s': %s" % - (action_ex.task_execution.name, result_str) - ) + state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex) wf_handler.fail_workflow(wf_ex, state_info) diff --git a/mistral/tests/unit/engine/test_state_info.py b/mistral/tests/unit/engine/test_state_info.py new file mode 100644 index 000000000..186188df6 --- /dev/null +++ b/mistral/tests/unit/engine/test_state_info.py @@ -0,0 +1,102 @@ +# Copyright 2014 - Mirantis, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_config import cfg +from oslo_log import log as logging + +from mistral.db.v2 import api as db_api +from mistral.services import workflows as wf_service +from mistral.tests.unit.engine import base + +LOG = logging.getLogger(__name__) +# Use the set_default method to set value otherwise in certain test cases +# the change in value is not permanent. +cfg.CONF.set_default('auth_enable', False, group='pecan') + + +class ExecutionStateInfoTest(base.EngineTestCase): + def test_state_info(self): + workflow = """--- + version: '2.0' + test_wf: + type: direct + tasks: + task1: + action: std.fail + + task2: + action: std.noop + """ + wf_service.create_workflows(workflow) + + # Start workflow. + wf_ex = self.engine.start_workflow('test_wf', {}) + + self._await(lambda: self.is_execution_error(wf_ex.id)) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertIn("error in task 'task1'", wf_ex.state_info) + + def test_state_info_two_failed_branches(self): + workflow = """--- + version: '2.0' + test_wf: + type: direct + tasks: + task1: + action: std.fail + + task2: + action: std.fail + """ + wf_service.create_workflows(workflow) + + # Start workflow. + wf_ex = self.engine.start_workflow('test_wf', {}) + + self._await(lambda: self.is_execution_error(wf_ex.id)) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertIn("error in task 'task1'", wf_ex.state_info) + self.assertIn("error in task 'task2'", wf_ex.state_info) + + def test_state_info_with_policies(self): + workflow = """--- + version: '2.0' + test_wf: + type: direct + tasks: + task1: + action: std.fail + wait-after: 1 + + task2: + action: std.noop + wait-after: 3 + """ + wf_service.create_workflows(workflow) + + # Start workflow. + wf_ex = self.engine.start_workflow('test_wf', {}) + + self._await(lambda: self.is_execution_error(wf_ex.id)) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertIn("error in task 'task1'", wf_ex.state_info) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index cec0386e0..767bbad8c 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -62,6 +62,15 @@ class WorkflowController(object): return self._find_next_commands() + @abc.abstractmethod + def is_error_handled_for(self, task_ex): + """Determines if error is handled for specific task. + + :return: True if either there is no error at all or + error is considered handled. + """ + raise NotImplementedError + @abc.abstractmethod def all_errors_handled(self): """Determines if all errors (if any) are handled. diff --git a/mistral/workflow/direct_workflow.py b/mistral/workflow/direct_workflow.py index f1ccc393d..37df31f64 100644 --- a/mistral/workflow/direct_workflow.py +++ b/mistral/workflow/direct_workflow.py @@ -180,6 +180,9 @@ class DirectWorkflowController(base.WorkflowController): return ctx + def is_error_handled_for(self, task_ex): + return bool(self.get_on_error_clause(task_ex.name)) + def all_errors_handled(self): for t_ex in wf_utils.find_error_tasks(self.wf_ex): if not self.get_on_error_clause(t_ex.name): diff --git a/mistral/workflow/reverse_workflow.py b/mistral/workflow/reverse_workflow.py index e99bbf8b2..53d637b7d 100644 --- a/mistral/workflow/reverse_workflow.py +++ b/mistral/workflow/reverse_workflow.py @@ -90,6 +90,9 @@ class ReverseWorkflowController(base.WorkflowController): ) ) + def is_error_handled_for(self, task_ex): + return task_ex.state != states.ERROR + def all_errors_handled(self): return len(wf_utils.find_error_tasks(self.wf_ex)) == 0 diff --git a/mistral/workflow/utils.py b/mistral/workflow/utils.py index c819a73c9..e9df29aa9 100644 --- a/mistral/workflow/utils.py +++ b/mistral/workflow/utils.py @@ -104,3 +104,25 @@ def find_incomplete_tasks(wf_ex): def find_error_tasks(wf_ex): return find_tasks_with_state(wf_ex, states.ERROR) + + +def construct_fail_info_message(wf_ctrl, wf_ex): + # Try to find where error is exactly. + failed_tasks = filter( + lambda t: not wf_ctrl.is_error_handled_for(t), + find_error_tasks(wf_ex) + ) + + errors = [] + + for t in failed_tasks: + errors += [ + ("error in task '%s': " + "%s" % (t.name, str(ex.output.get('result', 'Unknown'))) + if ex.output else 'Unknown') + for ex in t.executions + ] + + state_info = "Failure caused by %s" % ';\n '.join(errors) + + return state_info