Catch workflow errors

Avoid keeping the workflow in a zombie state: on unexpected errors,
fail the workflow execution and save error information. If possible,
fail the corresponding tasks, too.

Done:
- [x] Introduce state_info into Execution, fill it with error info
when workflow fails.
- [x] Add error handlers to `on_task_result` and `run_task`
- [x] Add error handlers and tests to `start_workflow`
- [x] Add tests for YAQL eval exceptions
- [x] Add error handler and tests to `resume_workflow`

TODO:
- [ ] Make _fail_workflow public and let the user fail a workflow via the API
(in a separate commit)

Closes-bug: 1415886
Change-Id: Ib1fd661580d3a426dbbc7401a3622a98c7840e00
This commit is contained in:
Dmitri Zimine 2015-02-01 16:12:47 -08:00
parent 9300408fb1
commit 6a2481c6c6
6 changed files with 371 additions and 136 deletions

View File

@ -47,6 +47,9 @@ class Execution(resource.Resource):
state = wtypes.text
"state can be one of: RUNNING, SUCCESS, ERROR, PAUSED"
state_info = wtypes.text
"an optional state information string"
input = wtypes.text
"input is a JSON structure containing workflow input values."
output = wtypes.text

View File

@ -64,6 +64,7 @@ class Execution(mb.MistralSecureModelBase):
wf_spec = sa.Column(st.JsonDictType())
start_params = sa.Column(st.JsonDictType())
state = sa.Column(sa.String(20))
state_info = sa.Column(sa.Text(), nullable=True)
input = sa.Column(st.JsonDictType())
output = sa.Column(st.JsonDictType())
context = sa.Column(st.JsonDictType())

View File

@ -15,6 +15,7 @@
import copy
from oslo.config import cfg
import traceback
from mistral.db.v2 import api as db_api
from mistral.engine1 import base
@ -49,69 +50,130 @@ class DefaultEngine(base.Engine):
"Starting the execution of workflow '%s'"
% workflow_name
)
try:
execution_id = None
params = self._canonize_workflow_params(params)
params = self._canonize_workflow_params(params)
with db_api.transaction():
wf_db = db_api.get_workflow(workflow_name)
with db_api.transaction():
wf_db = db_api.get_workflow(workflow_name)
wf_spec = spec_parser.get_workflow_spec(wf_db.spec)
wf_spec = spec_parser.get_workflow_spec(wf_db.spec)
utils.validate_workflow_input(wf_db, wf_spec, workflow_input)
utils.validate_workflow_input(wf_db, wf_spec, workflow_input)
exec_db = self._create_db_execution(
wf_db,
wf_spec,
workflow_input,
params
)
execution_id = exec_db.id
exec_db = self._create_db_execution(
wf_db,
wf_spec,
workflow_input,
params
)
wf_handler = wfh_factory.create_workflow_handler(
exec_db,
wf_spec
)
wf_handler = wfh_factory.create_workflow_handler(exec_db, wf_spec)
# Calculate commands to process next.
cmds = wf_handler.start_workflow(**params)
# Calculate commands to process next.
cmds = wf_handler.start_workflow(**params)
self._run_local_commands(cmds, exec_db, wf_handler)
self._run_local_commands(cmds, exec_db, wf_handler)
self._run_remote_commands(cmds, exec_db, wf_handler)
self._run_remote_commands(cmds, exec_db, wf_handler)
except Exception as e:
LOG.error("Failed to start workflow '%s' id=%s: %s\n%s",
workflow_name, execution_id, e, traceback.format_exc())
self._fail_workflow(execution_id, e)
raise e
return exec_db
@u.log_exec(LOG)
def on_task_result(self, task_id, raw_result):
with db_api.transaction():
task_db = db_api.get_task(task_id)
exec_db = db_api.get_execution(task_db.execution_id)
try:
task_name = "Unknown"
execution_id = None
raw_result = utils.transform_result(exec_db, task_db, raw_result)
wf_handler = wfh_factory.create_workflow_handler(exec_db)
with db_api.transaction():
task_db = db_api.get_task(task_id)
task_name = task_db.name
exec_db = db_api.get_execution(task_db.execution_id)
execution_id = exec_db.id
self._after_task_complete(
task_db,
spec_parser.get_task_spec(task_db.spec),
raw_result,
wf_handler.wf_spec
)
raw_result = utils.transform_result(
exec_db, task_db, raw_result)
wf_handler = wfh_factory.create_workflow_handler(exec_db)
if task_db.state == states.DELAYED:
return task_db
self._after_task_complete(
task_db,
spec_parser.get_task_spec(task_db.spec),
raw_result,
wf_handler.wf_spec
)
# Calculate commands to process next.
cmds = wf_handler.on_task_result(task_db, raw_result)
if task_db.state == states.DELAYED:
return task_db
self._run_local_commands(
cmds,
exec_db,
wf_handler,
task_db
)
# Calculate commands to process next.
cmds = wf_handler.on_task_result(task_db, raw_result)
self._run_remote_commands(cmds, exec_db, wf_handler)
self._run_local_commands(
cmds,
exec_db,
wf_handler,
task_db
)
self._check_subworkflow_completion(exec_db)
self._run_remote_commands(cmds, exec_db, wf_handler)
self._check_subworkflow_completion(exec_db)
except Exception as e:
LOG.error("Failed to handle results for task '%s' id=%s: %s\n%s",
task_name, task_id, e, traceback.format_exc())
# TODO(dzimine): try to find out which command caused failure.
self._fail_workflow(execution_id, e)
raise e
return task_db
@u.log_exec(LOG)
def run_task(self, task_id):
try:
execution_id = None
with db_api.transaction():
task_db = db_api.get_task(task_id)
WF_TRACE.info(
"Task '%s' [%s -> %s]"
% (task_db.name, task_db.state, states.RUNNING)
)
task_db = db_api.update_task(
task_id,
{'state': states.RUNNING}
)
task_spec = spec_parser.get_task_spec(task_db.spec)
exec_db = task_db.execution
execution_id = exec_db.id
wf_handler = wfh_factory.create_workflow_handler(exec_db)
cmd = commands.RunTask(task_spec, task_db)
cmd.run_local(exec_db, wf_handler)
cmd.run_remote(exec_db, wf_handler)
except Exception as e:
LOG.error("Failed to run task '%s': %s\n%s",
task_db.name, e, traceback.format_exc())
self._fail_workflow(execution_id, e, task_id)
raise e
@u.log_exec(LOG)
def pause_workflow(self, execution_id):
with db_api.transaction():
@ -125,17 +187,50 @@ class DefaultEngine(base.Engine):
@u.log_exec(LOG)
def resume_workflow(self, execution_id):
try:
with db_api.transaction():
exec_db = db_api.get_execution(execution_id)
wf_handler = wfh_factory.create_workflow_handler(exec_db)
# Calculate commands to process next.
cmds = wf_handler.resume_workflow()
self._run_local_commands(cmds, exec_db, wf_handler)
self._run_remote_commands(cmds, exec_db, wf_handler)
except Exception as e:
LOG.error("Failed to resume execution id=%s: %s\n%s",
execution_id, e, traceback.format_exc())
self._fail_workflow(execution_id, e)
raise e
return exec_db
# TODO(dzimine): make public, add to RPC, expose in API
def _fail_workflow(self, execution_id, err, task_id=None):
with db_api.transaction():
exec_db = db_api.get_execution(execution_id)
err_msg = str(err)
exec_db = db_api.load_execution(execution_id)
if exec_db is None:
LOG.error("Cant fail workflow execution id='%s': not found.",
execution_id)
return
wf_handler = wfh_factory.create_workflow_handler(exec_db)
wf_handler.fail_workflow(err_msg)
# Calculate commands to process next.
cmds = wf_handler.resume_workflow()
self._run_local_commands(cmds, exec_db, wf_handler)
self._run_remote_commands(cmds, exec_db, wf_handler)
if task_id:
task_db = db_api.get_task(task_id)
# Note(dzimine): Don't call self.engine_client:
# 1) to avoid computing and triggering next tasks
# 2) to avoid a loop in case of error in transport
wf_handler.on_task_result(
task_db,
wf_utils.TaskResult(error=err_msg)
)
return exec_db
@ -206,29 +301,6 @@ class DefaultEngine(base.Engine):
for p in policies.build_policies(task_spec.get_policies(), wf_spec):
p.after_task_complete(task_db, task_spec, raw_result)
@u.log_exec(LOG)
def run_task(self, task_id):
with db_api.transaction():
task_db = db_api.get_task(task_id)
WF_TRACE.info(
"Task '%s' [%s -> %s]"
% (task_db.name, task_db.state, states.RUNNING)
)
task_db = db_api.update_task(task_id, {'state': states.RUNNING})
task_spec = spec_parser.get_task_spec(task_db.spec)
exec_db = task_db.execution
wf_handler = wfh_factory.create_workflow_handler(exec_db)
cmd = commands.RunTask(task_spec, task_db)\
cmd.run_local(exec_db, wf_handler)
cmd.run_remote(exec_db, wf_handler)
def _check_subworkflow_completion(self, exec_db):
if not exec_db.parent_task_id:
return

View File

@ -12,12 +12,17 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo.config import cfg
from yaql import exceptions as yaql_exc
from mistral.db.v2 import api as db_api
from mistral.engine1 import default_engine as de
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.services import workbooks as wb_service
from mistral.services import workflows as wf_service
from mistral.tests.unit.engine1 import base
from mistral.workflow import states
# TODO(nmakhotkin) Need to write more tests.
@ -27,90 +32,209 @@ LOG = logging.getLogger(__name__)
# the change in value is not permanent.
cfg.CONF.set_default('auth_enable', False, group='pecan')
WORKBOOK = """
---
version: '2.0'
name: wb
workflows:
wf1:
type: direct
tasks:
task1:
description: That should lead to workflow fail.
action: std.echo output="Echo"
on-success:
- task2
- succeed
on-complete:
- task3
- task4
- fail
task2:
action: std.echo output="Morpheus"
task3:
action: std.echo output="output"
task4:
action: std.echo output="output"
"""
WORKBOOK_WRONG_TASK_INPUT = """
---
version: '2.0'
name: wb
workflows:
wf:
type: direct
tasks:
task1:
description: That should lead to workflow fail.
action: std.echo outpu="Echo"
"""
class DirectWorkflowEngineTest(base.EngineTestCase):
def test_direct_workflow_on_closures(self):
wb_service.create_workbook_v2(WORKBOOK)
# Start workflow.
exec_db = self.engine.start_workflow('wb.wf1', {})
def _run_workflow(self, worklfow_yaml):
wf_service.create_workflows(worklfow_yaml)
exec_db = self.engine.start_workflow('wf', {})
self._await(lambda: self.is_execution_error(exec_db.id))
# Note: We need to reread execution to access related tasks.
exec_db = db_api.get_execution(exec_db.id)
return db_api.get_execution(exec_db.id)
def test_direct_workflow_on_closures(self):
WORKFLOW = """
version: '2.0'
wf:
type: direct
tasks:
task1:
description: That should lead to workflow fail.
action: std.echo output="Echo"
on-success:
- task2
- succeed
on-complete:
- task3
- task4
- fail
- never_gets_here
task2:
action: std.echo output="Morpheus"
task3:
action: std.echo output="output"
task4:
action: std.echo output="output"
never_gets_here:
action: std.noop
"""
exec_db = self._run_workflow(WORKFLOW)
tasks = exec_db.tasks
task1 = self._assert_single_item(tasks, name='task1')
task3 = self._assert_single_item(tasks, name='task3')
task4 = self._assert_single_item(tasks, name='task4')
self.assertEqual(3, len(tasks))
self._await(lambda: self.is_task_success(task1.id))
self._await(lambda: self.is_task_success(task3.id))
self._await(lambda: self.is_task_success(task4.id))
self.assertTrue(exec_db.state, states.ERROR)
def test_wrong_task_input(self):
wb_service.create_workbook_v2(WORKBOOK_WRONG_TASK_INPUT)
WORKFLOW_WRONG_TASK_INPUT = """
version: '2.0'
# Start workflow.
exec_db = self.engine.start_workflow('wb.wf', {})
wf:
type: direct
self._await(lambda: self.is_execution_error(exec_db.id))
tasks:
task1:
action: std.echo output="Echo"
on-complete:
- task2
task2:
description: Wrong task output should lead to workflow failure
action: std.echo wrong_input="Hahaha"
"""
exec_db = exec_db = self._run_workflow(WORKFLOW_WRONG_TASK_INPUT)
task_db2 = exec_db.tasks[1]
exec_db = db_api.get_execution(exec_db.id)
self.assertIn(
"Failed to initialize action",
task_db2.output['task'][task_db2.name]
)
self.assertIn(
"unexpected keyword argument",
task_db2.output['task'][task_db2.name]
)
self.assertTrue(exec_db.state, states.ERROR)
self.assertIn(task_db2.output['error'], exec_db.state_info)
def test_wrong_first_task_input(self):
WORKFLOW_WRONG_FIRST_TASK_INPUT = """
version: '2.0'
wf:
type: direct
tasks:
task1:
action: std.echo wrong_input="Ha-ha"
"""
exec_db = self._run_workflow(WORKFLOW_WRONG_FIRST_TASK_INPUT)
task_db = exec_db.tasks[0]
self.assertIn(
"Failed to initialize action",
task_db.output['task'][task_db.name]
)
self.assertIn(
"unexpected keyword argument",
task_db.output['task'][task_db.name]
)
self.assertTrue(exec_db.state, states.ERROR)
self.assertIn(task_db.output['error'], exec_db.state_info)
def test_wrong_action(self):
WORKFLOW_WRONG_ACTION = """
version: '2.0'
wf:
type: direct
tasks:
task1:
action: std.echo output="Echo"
on-complete:
- task2
task2:
action: action.doesnt_exist
"""
exec_db = self._run_workflow(WORKFLOW_WRONG_ACTION)
# TODO(dzimine): Catch tasks caused error, and set them to ERROR:
# TODO(dzimine): self.assertTrue(task_db.state, states.ERROR)
self.assertTrue(exec_db.state, states.ERROR)
self.assertIn("Failed to find action", exec_db.state_info)
def test_wrong_action_first_task(self):
WORKFLOW_WRONG_ACTION_FIRST_TASK = """
version: '2.0'
wf:
type: direct
tasks:
task1:
action: wrong.task
"""
wf_service.create_workflows(WORKFLOW_WRONG_ACTION_FIRST_TASK)
with mock.patch.object(de.DefaultEngine, '_fail_workflow') as mock_fw:
self.assertRaises(
exc.InvalidActionException,
self.engine.start_workflow, 'wf', None)
mock_fw.assert_called_once()
self.assertTrue(
issubclass(
type(mock_fw.call_args[0][1]),
exc.InvalidActionException
),
"Called with a right exception"
)
def test_messed_yaql(self):
WORKFLOW_MESSED_YAQL = """
version: '2.0'
wf:
type: direct
tasks:
task1:
action: std.echo output="Echo"
# publish: {wrong yaql}
on-complete:
- task2
task2:
action: std.echo output="{wrong yaql}"
"""
exec_db = self._run_workflow(WORKFLOW_MESSED_YAQL)
self.assertTrue(exec_db.state, states.ERROR)
def test_messed_yaql_in_first_task(self):
WORKFLOW_MESSED_YAQL_IN_FIRST_TASK = """
version: '2.0'
wf:
type: direct
tasks:
task1:
action: std.echo output="{wrong(yaql)}"
"""
wf_service.create_workflows(WORKFLOW_MESSED_YAQL_IN_FIRST_TASK)
with mock.patch.object(de.DefaultEngine, '_fail_workflow') as mock_fw:
self.assertRaises(
yaql_exc.YaqlException, self.engine.start_workflow, 'wf', None)
mock_fw.assert_called_once()
self.assertTrue(
issubclass(
type(mock_fw.call_args[0][1]),
yaql_exc.YaqlException
),
"Called with a right exception"
)

View File

@ -12,9 +12,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo.config import cfg
from mistral.db.v2 import api as db_api
from mistral.engine1 import default_engine as de
from mistral import exceptions as exc
from mistral.openstack.common import log as logging
from mistral.services import scheduler
from mistral.services import workbooks as wb_service
@ -300,3 +303,28 @@ class WorkflowResumeTest(base.EngineTestCase):
self.assertEqual(states.SUCCESS, exec_db.state)
self.assertEqual(4, len(exec_db.tasks))
@mock.patch.object(de.DefaultEngine, '_fail_workflow')
def test_resume_fails(self, mock_fw):
# Start and pause workflow.
wb_service.create_workbook_v2(WORKBOOK_DIFFERENT_TASK_STATES)
exec_db = self.engine.start_workflow('wb.wf1', {})
self._await(lambda: self.is_execution_paused(exec_db.id))
exec_db = db_api.get_execution(exec_db.id)
self.assertEqual(states.PAUSED, exec_db.state)
# Simulate failure and check if it is handled.
err = exc.MistralException('foo')
with mock.patch.object(
de.DefaultEngine,
'_run_remote_commands',
side_effect=err):
self.assertRaises(
exc.MistralException,
self.engine.resume_workflow,
exec_db.id
)
mock_fw.assert_called_once_with(exec_db.id, err)

View File

@ -109,7 +109,13 @@ class WorkflowHandler(object):
if (task_db.state == states.ERROR and
not self._is_error_handled(task_db)):
if not self.is_paused_or_completed():
self._set_execution_state(states.ERROR)
# TODO(dzimine): pass task_db.result when Model refactored.
msg = str(task_db.output.get('error', "Unknown"))
self._set_execution_state(
states.ERROR,
"Failure caused by error in task '%s': %s"
% (task_db.name, msg)
)
return []
@ -254,12 +260,12 @@ class WorkflowHandler(object):
return self.exec_db
def fail_workflow(self):
def fail_workflow(self, err_msg=None):
"""Stops workflow with ERROR state.
:return: Execution object.
"""
self._set_execution_state(states.ERROR)
self._set_execution_state(states.ERROR, err_msg)
return self.exec_db
@ -286,13 +292,14 @@ class WorkflowHandler(object):
"""
raise NotImplementedError
def _set_execution_state(self, state):
def _set_execution_state(self, state, state_info=None):
cur_state = self.exec_db.state
if states.is_valid_transition(cur_state, state):
WF_TRACE.info("Execution of workflow '%s' [%s -> %s]"
% (self.exec_db.wf_name, cur_state, state))
self.exec_db.state = state
self.exec_db.state_info = state_info
else:
msg = "Can't change workflow state [execution=%s," \
" state=%s -> %s]" % (self.exec_db, cur_state, state)