Add proper error handling for task continuation

* In case if task needs to be continued, e.g. in case of 'wait-before'
  policy which inserts a delay into normal task execution flow (between
  creation of task policy and scheduling actions), possible exceptions
  also need to be handled properly (move task and worklfow into ERROR).
  This patch adds error handling and the test to check this.
* Other minor changes related to addressing a few TODO's across engine
  code.

Change-Id: I525f193a149e3b0341aa8d0ffa0858ded96ba94f
This commit is contained in:
Renat Akhmerov 2016-07-08 13:41:07 +07:00
parent eda639fe87
commit 633eb0fe6d
4 changed files with 63 additions and 6 deletions

View File

@ -267,7 +267,6 @@ class PythonAction(Action):
if self.action_def.action_class:
self._inject_action_ctx_for_validating(input_dict)
# TODO(rakhmerov): I'm not sure what this is for.
# NOTE(xylan): Don't validate action input if action initialization
# method contains ** argument.
if '**' not in self.action_def.input:

View File

@ -197,8 +197,6 @@ class WaitBeforePolicy(base.TaskPolicy):
task_ex.state = states.RUNNING_DELAYED
# TODO(rakhmerov): This is wrong as task handler doesn't manage
# transactions and hence it can't be called explicitly.
scheduler.schedule_call(
None,
_CONTINUE_TASK_PATH,
@ -410,7 +408,6 @@ class PauseBeforePolicy(base.TaskPolicy):
task_ex.state = states.IDLE
# TODO(rakhmerov): In progress.
class ConcurrencyPolicy(base.TaskPolicy):
_schema = {
"properties": {
@ -424,6 +421,10 @@ class ConcurrencyPolicy(base.TaskPolicy):
def before_task_start(self, task_ex, task_spec):
super(ConcurrencyPolicy, self).before_task_start(task_ex, task_spec)
# This policy doesn't do anything except validating "concurrency"
# property value and setting a variable into task runtime context.
# This variable is then used to define how many action executions
# may be started in parallel.
context_key = 'concurrency'
runtime_context = _ensure_context_has_key(

View File

@ -120,8 +120,23 @@ def fail_task(task_ex, msg):
def continue_task(task_ex):
task = _build_task_from_execution(task_ex)
# TODO(rakhmerov): Error handling.
task.run()
try:
task.run()
except exc.MistralException as e:
wf_ex = task_ex.workflow_execution
msg = (
"Failed to run task [wf=%s, task=%s]: %s\n%s" %
(wf_ex, task_ex.name, e, tb.format_exc())
)
LOG.error(msg)
task.set_state(states.ERROR, msg)
wf_handler.fail_workflow(wf_ex, msg)
return
if task.is_completed():
wf_handler.on_task_complete(task_ex)

View File

@ -208,3 +208,45 @@ class ErrorHandlingEngineTest(base.EngineTestCase):
name='std.noop',
state=states.SUCCESS
)
def test_action_error_with_wait_before_policy(self):
# Check that state of all workflow objects (workflow executions,
# task executions, action executions) is properly persisted in case
# of action error and task has 'wait-before' policy. It is an
# implicit test for task continuation because 'wait-before' inserts
# a delay between preparing task execution object and scheduling
# actions. If an an error happens during scheduling actions (e.g.
# invalid YAQL in action parameters) then we also need to handle
# this properly, meaning that task and workflow state should go
# into ERROR state.
wf_text = """
version: '2.0'
wf:
tasks:
task1:
action: std.echo output=<% invalid_yaql_function() %>
wait-before: 1
"""
wf_service.create_workflows(wf_text)
wf_ex = self.engine.start_workflow('wf', {})
self.await_workflow_error(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_execs = wf_ex.task_executions
self.assertEqual(1, len(task_execs))
task_ex = self._assert_single_item(
task_execs,
name='task1',
state=states.ERROR
)
action_execs = task_ex.executions
self.assertEqual(0, len(action_execs))