Fail task on publish error

If there's YAQL expression evaluation error in published variables for
any task, the on_action_complete fails, DB transactions rollback, and
the task is stuck in a RUNNING state. This patch sets the task state
to ERROR and save the error message in the task state_info.

Change-Id: I9003157f57f4e649610428b046c237f8f51038fb
Closes-Bug: #1496685
This commit is contained in:
Winson Chan 2015-09-18 22:37:34 +00:00
parent 1d293aced5
commit bf684b6adc
4 changed files with 113 additions and 8 deletions

View File

@ -49,6 +49,9 @@ class Task(resource.Resource):
"state can take one of the following values: \
IDLE, RUNNING, SUCCESS, ERROR, DELAYED"
state_info = wtypes.text
"an optional state information string"
result = wtypes.text
published = types.jsontype
processed = bool

View File

@ -497,16 +497,19 @@ def _complete_task(task_ex, task_spec, state):
_set_task_state(task_ex, state)
data_flow.publish_variables(
task_ex,
task_spec
)
try:
data_flow.publish_variables(
task_ex,
task_spec
)
except Exception as e:
_set_task_state(task_ex, states.ERROR, state_info=str(e))
if not task_spec.get_keep_result():
data_flow.destroy_task_result(task_ex)
def _set_task_state(task_ex, state):
def _set_task_state(task_ex, state, state_info=None):
# TODO(rakhmerov): How do we log task result?
wf_trace.info(
task_ex.workflow_execution,
@ -516,6 +519,9 @@ def _set_task_state(task_ex, state):
task_ex.state = state
if state_info:
task_ex.state_info = state_info
def is_task_completed(task_ex, task_spec):
if task_spec.get_with_items():

View File

@ -168,9 +168,17 @@ class ExecutionFieldsSizeLimitTest(base.EngineTestCase):
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual("Size of 'published' is 1KB which exceeds "
"the limit of 0KB",
wf_ex.state_info)
self.assertEqual(
"Failure caused by error in task 'task1': ",
wf_ex.state_info
)
task_ex = self._assert_single_item(wf_ex.task_executions, name='task1')
self.assertEqual(
"Size of 'published' is 1KB which exceeds the limit of 0KB",
task_ex.state_info
)
@expect_size_limit_exception('params')
def test_workflow_params_limit(self):

View File

@ -0,0 +1,88 @@
# Copyright 2015 - StackStorm, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo_config import cfg
from mistral.actions import std_actions
from mistral.db.v2 import api as db_api
from mistral.services import workbooks as wb_service
from mistral.tests.unit.engine import base
from mistral.workflow import states
# Use the set_default method to set value otherwise in certain test cases
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
SIMPLE_WORKBOOK = """
---
version: '2.0'
name: wb1
workflows:
wf1:
type: direct
tasks:
t1:
action: std.echo output="Task 1"
publish:
v1: <% $.t1.get($foobar) %>
on-success:
- t2
t2:
action: std.echo output="Task 2"
on-success:
- t3
t3:
action: std.echo output="Task 3"
"""
class TaskPublishTest(base.EngineTestCase):
@mock.patch.object(
std_actions.EchoAction,
'run',
mock.MagicMock(
side_effect=[
'Task 1', # Mock task1 success.
'Task 2', # Mock task2 success.
'Task 3' # Mock task3 success.
]
)
)
def test_publish_failure(self):
wb_service.create_workbook_v2(SIMPLE_WORKBOOK)
# Run workflow and fail task.
wf_ex = self.engine.start_workflow('wb1.wf1', {})
self._await(lambda: self.is_execution_error(wf_ex.id))
wf_ex = db_api.get_workflow_execution(wf_ex.id)
self.assertEqual(states.ERROR, wf_ex.state)
self.assertEqual(1, len(wf_ex.task_executions))
task_1_ex = self._assert_single_item(wf_ex.task_executions, name='t1')
# Task 1 should have failed.
self.assertEqual(states.ERROR, task_1_ex.state)
self.assertIn('Can not evaluate YAQL expression', task_1_ex.state_info)
# Action execution of task 1 should have succeeded.
task_1_action_exs = db_api.get_action_executions(
task_execution_id=task_1_ex.id)
self.assertEqual(1, len(task_1_action_exs))
self.assertEqual(states.SUCCESS, task_1_action_exs[0].state)