Merge "Fix task state for YAQL error in subflow output"

This commit is contained in:
Jenkins 2015-11-26 12:36:16 +00:00 committed by Gerrit Code Review
commit edeb6fe67c
6 changed files with 101 additions and 39 deletions

View File

@ -181,17 +181,23 @@ def on_action_complete(action_ex, result):
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
task_spec = wf_spec.get_tasks()[task_ex.name]
task_state = states.SUCCESS if result.is_success() else states.ERROR
if result.is_success():
task_state = states.SUCCESS
task_state_info = None
else:
task_state = states.ERROR
task_state_info = result.error
if not task_spec.get_with_items():
_complete_task(task_ex, task_spec, task_state)
_complete_task(task_ex, task_spec, task_state, task_state_info)
else:
with_items.increase_capacity(task_ex)
if with_items.is_completed(task_ex):
_complete_task(
task_ex,
task_spec,
with_items.get_final_state(task_ex)
with_items.get_final_state(task_ex),
task_state_info
)
return task_ex
@ -490,12 +496,12 @@ def run_workflow(wf_name, wf_input, wf_params):
)
def _complete_task(task_ex, task_spec, state):
def _complete_task(task_ex, task_spec, state, state_info=None):
# Ignore if task already completed.
if states.is_completed(task_ex.state):
return []
_set_task_state(task_ex, state)
_set_task_state(task_ex, state, state_info=state_info)
try:
data_flow.publish_variables(

View File

@ -24,11 +24,19 @@ from mistral.workflow import utils as wf_utils
def succeed_workflow(wf_ex, final_context, state_info=None):
set_execution_state(wf_ex, states.SUCCESS, state_info)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
wf_ex.output = data_flow.evaluate_workflow_output(wf_spec, final_context)
# Fail workflow if output is not successfully evaluated.
try:
wf_ex.output = data_flow.evaluate_workflow_output(
wf_spec,
final_context
)
except Exception as e:
return fail_workflow(wf_ex, e.message)
# Set workflow execution to success until after output is evaluated.
set_execution_state(wf_ex, states.SUCCESS, state_info)
if wf_ex.task_execution_id:
_schedule_send_result_to_parent_workflow(wf_ex)
@ -66,7 +74,10 @@ def send_result_to_parent_workflow(wf_ex_id):
wf_utils.Result(data=wf_ex.output)
)
elif wf_ex.state == states.ERROR:
err_msg = 'Failed subworkflow [execution_id=%s]' % wf_ex.id
err_msg = (
wf_ex.state_info or
'Failed subworkflow [execution_id=%s]' % wf_ex.id
)
rpc.get_engine_client().on_action_complete(
wf_ex.id,

View File

@ -176,8 +176,8 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(
"Failure caused by error in task 'task1': ",
self.assertIn(
"Failure caused by error in tasks: task1",
wf_ex.state_info
)

View File

@ -48,7 +48,7 @@ class ExecutionStateInfoTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn("error in task 'task1'", wf_ex.state_info)
self.assertIn("error in tasks: task1", wf_ex.state_info)
def test_state_info_two_failed_branches(self):
workflow = """---
@ -72,8 +72,7 @@ class ExecutionStateInfoTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn("error in task 'task1'", wf_ex.state_info)
self.assertIn("error in task 'task2'", wf_ex.state_info)
self.assertIn("error in tasks: task1, task2", wf_ex.state_info)
def test_state_info_with_policies(self):
workflow = """---
@ -99,4 +98,4 @@ class ExecutionStateInfoTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertIn("error in task 'task1'", wf_ex.state_info)
self.assertIn("error in tasks: task1", wf_ex.state_info)

View File

@ -23,6 +23,7 @@ from mistral.db.v2 import api as db_api
from mistral import exceptions as exc
from mistral.services import workbooks as wb_service
from mistral.tests.unit.engine import base
from mistral.workflow import states
LOG = logging.getLogger(__name__)
@ -31,11 +32,11 @@ LOG = logging.getLogger(__name__)
cfg.CONF.set_default('auth_enable', False, group='pecan')
WORKBOOK = """
WB1 = """
---
version: '2.0'
name: my_wb
name: wb1
workflows:
wf1:
@ -70,15 +71,38 @@ workflows:
slogan: "<% $.task1.final_result %> is a cool movie!"
"""
WB2 = """
---
version: '2.0'
name: wb2
workflows:
wf1:
type: direct
tasks:
task1:
workflow: wf2
wf2:
type: direct
output:
var1: <% $.does_not_exist %>
tasks:
task1:
action: std.noop
"""
class SubworkflowsTest(base.EngineTestCase):
def setUp(self):
super(SubworkflowsTest, self).setUp()
wb_service.create_workbook_v2(WORKBOOK)
wb_service.create_workbook_v2(WB1)
wb_service.create_workbook_v2(WB2)
def test_subworkflow_success(self):
wf2_ex = self.engine.start_workflow('my_wb.wf2', None)
wf2_ex = self.engine.start_workflow('wb1.wf2', None)
project_id = auth_context.ctx().project_id
@ -95,8 +119,8 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertEqual(2, len(wf_execs))
# Execution of 'wf2'.
wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2')
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
self.assertEqual(project_id, wf1_ex.project_id)
self.assertIsNotNone(wf1_ex.task_execution_id)
@ -154,7 +178,7 @@ class SubworkflowsTest(base.EngineTestCase):
@mock.patch.object(std_actions.EchoAction, 'run',
mock.MagicMock(side_effect=exc.ActionException))
def test_subworkflow_error(self):
wf2_ex = self.engine.start_workflow('my_wb.wf2', None)
wf2_ex = self.engine.start_workflow('wb1.wf2', None)
self._await(lambda: len(db_api.get_workflow_executions()) == 2, 0.5, 5)
@ -162,8 +186,8 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertEqual(2, len(wf_execs))
wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2')
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
# Wait till workflow 'wf1' is completed.
self._await(lambda: self.is_execution_error(wf1_ex.id))
@ -171,10 +195,28 @@ class SubworkflowsTest(base.EngineTestCase):
# Wait till workflow 'wf2' is completed, its state must be ERROR.
self._await(lambda: self.is_execution_error(wf2_ex.id))
def test_subworkflow_yaql_error(self):
wf_ex = self.engine.start_workflow('wb2.wf1', None)
self._await(lambda: self.is_execution_error(wf_ex.id))
wf_execs = db_api.get_workflow_executions()
self.assertEqual(2, len(wf_execs))
wf2_ex = self._assert_single_item(wf_execs, name='wb2.wf2')
self.assertEqual(states.ERROR, wf2_ex.state)
self.assertIn('Can not evaluate YAQL expression', wf2_ex.state_info)
# Ensure error message is bubbled up to the main workflow.
wf1_ex = self._assert_single_item(wf_execs, name='wb2.wf1')
self.assertEqual(states.ERROR, wf1_ex.state)
self.assertIn('Can not evaluate YAQL expression', wf1_ex.state_info)
def test_subworkflow_environment_inheritance(self):
env = {'key1': 'abc'}
wf2_ex = self.engine.start_workflow('my_wb.wf2', None, env=env)
wf2_ex = self.engine.start_workflow('wb1.wf2', None, env=env)
# Execution of 'wf2'.
self.assertIsNotNone(wf2_ex)
@ -188,8 +230,8 @@ class SubworkflowsTest(base.EngineTestCase):
self.assertEqual(2, len(wf_execs))
# Execution of 'wf1'.
wf1_ex = self._assert_single_item(wf_execs, name='my_wb.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='my_wb.wf2')
wf1_ex = self._assert_single_item(wf_execs, name='wb1.wf1')
wf2_ex = self._assert_single_item(wf_execs, name='wb1.wf2')
expected_start_params = {
'task_name': 'task2',

View File

@ -109,21 +109,25 @@ def find_error_task_executions(wf_ex):
def construct_fail_info_message(wf_ctrl, wf_ex):
# Try to find where error is exactly.
failed_tasks = filter(
lambda t: not wf_ctrl.is_error_handled_for(t),
find_error_task_executions(wf_ex)
failed_tasks = sorted(
filter(
lambda t: not wf_ctrl.is_error_handled_for(t),
find_error_task_executions(wf_ex)
),
key=lambda t: t.name
)
errors = []
msg = ('Failure caused by error in tasks: %s\n' %
', '.join([t.name for t in failed_tasks]))
for t in failed_tasks:
errors += [
("error in task '%s': "
"%s" % (t.name, str(ex.output.get('result', 'Unknown')))
if ex.output else 'Unknown')
for ex in t.executions
]
msg += '\n %s -> ' % t.name
state_info = "Failure caused by %s" % ';\n '.join(errors)
if t.state_info:
msg += t.state_info + '\n'
else:
for i, ex in enumerate(t.executions):
output = (ex.output or dict()).get('result', 'Unknown')
msg += ' action execution #%s: %s\n' % (i + 1, str(output))
return state_info
return msg