Fixing execution state_info

* Now engine checks for all failed tasks
   and then assign state_info using messages
   from their action executions

Closes-Bug: #1476075

Change-Id: I7f6e0d0e59cf89c0204047515e6d62cc3adce304
This commit is contained in:
Nikolay Mahotkin
2015-07-20 16:11:18 +03:00
parent 0a6da145cd
commit 40d65f3d40
6 changed files with 143 additions and 10 deletions

View File

@@ -35,9 +35,9 @@ from mistral.workflow import data_flow
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
# Submodules of mistral.engine will throw NoSuchOptError if configuration
# options required at top level of this __init__.py are not imported before
# the submodules are referenced.
@@ -180,10 +180,10 @@ class DefaultEngine(base.Engine):
self._dispatch_workflow_commands(wf_ex, cmds)
self._check_workflow_completion(wf_ex, action_ex, wf_ctrl)
self._check_workflow_completion(wf_ex, wf_ctrl)
@staticmethod
def _check_workflow_completion(wf_ex, action_ex, wf_ctrl):
def _check_workflow_completion(wf_ex, wf_ctrl):
if states.is_paused_or_completed(wf_ex.state):
return
@@ -196,13 +196,7 @@ class DefaultEngine(base.Engine):
wf_ctrl.evaluate_workflow_final_context()
)
else:
result_str = (str(action_ex.output.get('result', 'Unknown'))
if action_ex.output else 'Unknown')
state_info = (
"Failure caused by error in task '%s': %s" %
(action_ex.task_execution.name, result_str)
)
state_info = wf_utils.construct_fail_info_message(wf_ctrl, wf_ex)
wf_handler.fail_workflow(wf_ex, state_info)

View File

@@ -0,0 +1,102 @@
# Copyright 2014 - Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_config import cfg
from oslo_log import log as logging
from mistral.db.v2 import api as db_api
from mistral.services import workflows as wf_service
from mistral.tests.unit.engine import base
LOG = logging.getLogger(__name__)
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
class ExecutionStateInfoTest(base.EngineTestCase):
def test_state_info(self):
workflow = """---
version: '2.0'
test_wf:
type: direct
tasks:
task1:
action: std.fail
task2:
action: std.noop
"""
wf_service.create_workflows(workflow)
# Start workflow.
wf_ex = self.engine.start_workflow('test_wf', {})
self._await(lambda: self.is_execution_error(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn("error in task 'task1'", wf_ex.state_info)
def test_state_info_two_failed_branches(self):
workflow = """---
version: '2.0'
test_wf:
type: direct
tasks:
task1:
action: std.fail
task2:
action: std.fail
"""
wf_service.create_workflows(workflow)
# Start workflow.
wf_ex = self.engine.start_workflow('test_wf', {})
self._await(lambda: self.is_execution_error(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn("error in task 'task1'", wf_ex.state_info)
self.assertIn("error in task 'task2'", wf_ex.state_info)
def test_state_info_with_policies(self):
workflow = """---
version: '2.0'
test_wf:
type: direct
tasks:
task1:
action: std.fail
wait-after: 1
task2:
action: std.noop
wait-after: 3
"""
wf_service.create_workflows(workflow)
# Start workflow.
wf_ex = self.engine.start_workflow('test_wf', {})
self._await(lambda: self.is_execution_error(wf_ex.id))
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn("error in task 'task1'", wf_ex.state_info)

View File

@@ -62,6 +62,15 @@ class WorkflowController(object):
return self._find_next_commands()
@abc.abstractmethod
def is_error_handled_for(self, task_ex):
"""Determines if error is handled for specific task.
:return: True if either there is no error at all or
error is considered handled.
"""
raise NotImplementedError
@abc.abstractmethod
def all_errors_handled(self):
"""Determines if all errors (if any) are handled.

View File

@@ -180,6 +180,9 @@ class DirectWorkflowController(base.WorkflowController):
return ctx
def is_error_handled_for(self, task_ex):
return bool(self.get_on_error_clause(task_ex.name))
def all_errors_handled(self):
for t_ex in wf_utils.find_error_tasks(self.wf_ex):
if not self.get_on_error_clause(t_ex.name):

View File

@@ -90,6 +90,9 @@ class ReverseWorkflowController(base.WorkflowController):
)
)
def is_error_handled_for(self, task_ex):
return task_ex.state != states.ERROR
def all_errors_handled(self):
return len(wf_utils.find_error_tasks(self.wf_ex)) == 0

View File

@@ -104,3 +104,25 @@ def find_incomplete_tasks(wf_ex):
def find_error_tasks(wf_ex):
return find_tasks_with_state(wf_ex, states.ERROR)
def construct_fail_info_message(wf_ctrl, wf_ex):
# Try to find where error is exactly.
failed_tasks = filter(
lambda t: not wf_ctrl.is_error_handled_for(t),
find_error_tasks(wf_ex)
)
errors = []
for t in failed_tasks:
errors += [
("error in task '%s': "
"%s" % (t.name, str(ex.output.get('result', 'Unknown')))
if ex.output else 'Unknown')
for ex in t.executions
]
state_info = "Failure caused by %s" % ';\n '.join(errors)
return state_info