Publish/output in case of task/workflow failure

Currently it is not possible to provide any reasonable output in case of
a task or workflow failure. Implementing this would greatly simplify error
handling in workflows. This blueprint is a proposal to introduce two new
attributes, publish-on-error for tasks and output-on-error for workflows
for this purpose.

Implements: blueprint mistral-publish-on-error
Change-Id: Ib3a64971effb02390206dc6f993e772a51f8f237
This commit is contained in:
Istvan Imre 2016-09-22 14:17:23 +02:00
parent 7685cdb1b6
commit 3f5092d7f4
11 changed files with 203 additions and 40 deletions

View File

@ -128,6 +128,9 @@ Common workflow attributes
optionally their default values in a form "my_param: 123". *Optional*.
- **output** - Any data structure arbitrarily containing
expressions that defines workflow output. May be nested. *Optional*.
- **output-on-error** - Any data structure arbitrarily containing YAQL
expressions that defines the output of a workflow to be returned if it goes
into an error state. May be nested. *Optional*.
- **task-defaults** - Default settings for some of task attributes
defined at workflow level. *Optional*. Corresponding attribute
defined for a specific task always takes precedence. Specific task
@ -201,6 +204,8 @@ attributes:
expression to select precisely what needs to be published.
Published variables will be accessible for downstream tasks via using
expressions. *Optional*.
- **publish-on-error** - Same as **publish**, but evaluated in case of
task execution failure. *Optional*.
- **with-items** - If configured, allows running the action or workflow
associated with a task multiple times over a provided list of items.
See `Processing collections using

View File

@ -308,6 +308,10 @@ def get_task_executions(limit=None, marker=None, sort_keys=['created_at'],
)
def get_completed_task_executions(**kwargs):
    """Return task executions that reached a terminal state.

    Thin facade delegating to the configured backend implementation
    (IMPL); filter criteria are passed through as keyword arguments.
    """
    return IMPL.get_completed_task_executions(**kwargs)
def get_incomplete_task_executions(**kwargs):
    """Return task executions that have not yet completed.

    Thin facade delegating to the configured backend implementation
    (IMPL); filter criteria are passed through as keyword arguments.
    """
    return IMPL.get_incomplete_task_executions(**kwargs)

View File

@ -35,6 +35,7 @@ from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral.services import security
from mistral import utils
from mistral.workflow import states
CONF = cfg.CONF
@ -797,6 +798,28 @@ def get_task_executions(**kwargs):
return _get_task_executions(**kwargs)
def _get_completed_task_executions_query(kwargs):
    """Build a query for task executions in a terminal state.

    :param kwargs: Equality filter criteria applied with ``filter_by``
        (e.g. workflow_execution_id).
    :return: SQLAlchemy query selecting task executions whose state is
        one of ERROR, CANCELLED or SUCCESS.
    """
    query = b.model_query(models.TaskExecution)

    query = query.filter_by(**kwargs)

    # in_() states "state is terminal" directly instead of a chain of
    # or_-ed equality comparisons.
    query = query.filter(
        models.TaskExecution.state.in_(
            [states.ERROR, states.CANCELLED, states.SUCCESS]
        )
    )

    return query
def get_completed_task_executions(**kwargs):
    """Return all task executions in a terminal state matching the filters."""
    return _get_completed_task_executions_query(kwargs).all()
def _get_incomplete_task_executions_query(kwargs):
query = b.model_query(models.TaskExecution)
@ -804,10 +827,10 @@ def _get_incomplete_task_executions_query(kwargs):
query = query.filter(
sa.or_(
models.TaskExecution.state == 'IDLE',
models.TaskExecution.state == 'RUNNING',
models.TaskExecution.state == 'WAITING',
models.TaskExecution.state == 'DELAYED'
models.TaskExecution.state == states.IDLE,
models.TaskExecution.state == states.RUNNING,
models.TaskExecution.state == states.WAITING,
models.TaskExecution.state == states.RUNNING_DELAYED
)
)
@ -971,9 +994,9 @@ def get_expired_executions(time, session=None):
query = query.filter(models.WorkflowExecution.updated_at < time)
query = query.filter(
sa.or_(
models.WorkflowExecution.state == "SUCCESS",
models.WorkflowExecution.state == "ERROR",
models.WorkflowExecution.state == "CANCELLED"
models.WorkflowExecution.state == states.SUCCESS,
models.WorkflowExecution.state == states.ERROR,
models.WorkflowExecution.state == states.CANCELLED
)
)

View File

@ -28,6 +28,7 @@ from mistral import exceptions as exc
from mistral.services import scheduler
from mistral.services import workflows as wf_service
from mistral import utils
from mistral.utils import merge_dicts
from mistral.utils import wf_trace
from mistral.workbook import parser as spec_parser
from mistral.workflow import base as wf_base
@ -106,21 +107,11 @@ class Workflow(object):
assert self.wf_ex
if state == states.SUCCESS:
wf_ctrl = wf_base.get_controller(self.wf_ex)
final_context = {}
try:
final_context = wf_ctrl.evaluate_workflow_final_context()
except Exception as e:
LOG.warning(
'Failed to get final context for %s: %s' % (self.wf_ex, e)
)
return self._succeed_workflow(final_context, msg)
return self._succeed_workflow(self._get_final_context(), msg)
elif state == states.ERROR:
return self._fail_workflow(msg)
return self._fail_workflow(self._get_final_context(), msg)
elif state == states.CANCELLED:
return self._cancel_workflow(msg)
def resume(self, env=None):
@ -196,6 +187,17 @@ class Workflow(object):
return db_api.acquire_lock(db_models.WorkflowExecution, self.wf_ex.id)
def _get_final_context(self):
    """Evaluate the final workflow context via the workflow controller.

    :return: The evaluated final Data Flow context, or an empty dict if
        evaluation fails. The failure is logged as a warning so that the
        workflow state transition can still proceed.
    """
    wf_ctrl = wf_base.get_controller(self.wf_ex)

    final_context = {}

    try:
        final_context = wf_ctrl.evaluate_workflow_final_context()
    except Exception as e:
        # Lazy %-style logging args: the message is rendered only if
        # this record is actually emitted.
        LOG.warning('Failed to get final context for %s: %s', self.wf_ex, e)

    return final_context
def _create_execution(self, input_dict, desc, params):
self.wf_ex = db_api.create_workflow_execution({
'name': self.wf_def.name,
@ -309,15 +311,15 @@ class Workflow(object):
self._succeed_workflow(ctx)
else:
msg = _build_fail_info_message(wf_ctrl, self.wf_ex)
self._fail_workflow(msg)
final_context = wf_ctrl.evaluate_workflow_final_context()
self._fail_workflow(final_context, msg)
return 0
def _succeed_workflow(self, final_context, msg=None):
self.wf_ex.output = data_flow.evaluate_workflow_output(
self.wf_ex,
self.wf_spec,
self.wf_spec.get_output(),
final_context
)
@ -327,10 +329,25 @@ class Workflow(object):
if self.wf_ex.task_execution_id:
self._schedule_send_result_to_parent_workflow()
def _fail_workflow(self, msg):
def _fail_workflow(self, final_context, msg):
if states.is_paused_or_completed(self.wf_ex.state):
return
output_on_error = {}
try:
output_on_error = data_flow.evaluate_workflow_output(
self.wf_ex,
self.wf_spec.get_output_on_error(),
final_context
)
except exc.MistralException as e:
msg = (
"Failed to evaluate expression in output-on-error! "
"(output-on-error: '%s', exception: '%s' Cause: '%s'"
% (self.wf_spec.get_output_on_error(), e, msg)
)
LOG.error(msg)
self.set_state(states.ERROR, state_info=msg)
# When we set an ERROR state we should safely set output value getting
@ -340,7 +357,7 @@ class Workflow(object):
cfg.CONF.engine.execution_field_size_limit_kb
)
self.wf_ex.output = {'result': msg}
self.wf_ex.output = merge_dicts({'result': msg}, output_on_error)
if self.wf_ex.task_execution_id:
self._schedule_send_result_to_parent_workflow()

View File

@ -19,6 +19,7 @@ from mistral.db.v2 import api as db_api
from mistral.db.v2.sqlalchemy import models
from mistral import exceptions as exc
from mistral import expressions as expr
from mistral.services import workbooks as wb_service
from mistral.services import workflows as wf_service
from mistral.tests.unit import base as test_base
from mistral.tests.unit.engine import base as engine_test_base
@ -549,6 +550,96 @@ class DataFlowEngineTest(engine_test_base.EngineTestCase):
self.assertListEqual([], result)
def test_publish_on_error(self):
    # Verifies the error path of data publishing: task1 always fails
    # (std.fail), so 'publish-on-error' must populate the task's
    # published variables and 'output-on-error' must shape the workflow
    # output.
    # NOTE(review): YAML indentation below was reconstructed — the
    # original formatting was lost; verify against the repository.
    wf_def = """---
    version: '2.0'
    wf:
      type: direct
      output-on-error:
        out: <% $.hi %>
      tasks:
        task1:
          action: std.fail
          publish-on-error:
            hi: hello_from_error
            err: <% task(task1).result %>
    """

    wf_service.create_workflows(wf_def)

    # Start workflow.
    wf_ex = self.engine.start_workflow('wf', {})

    self.await_workflow_error(wf_ex.id)

    # Note: We need to reread execution to access related tasks.
    wf_ex = db_api.get_workflow_execution(wf_ex.id)

    self.assertEqual(states.ERROR, wf_ex.state)

    tasks = wf_ex.task_executions

    task1 = self._assert_single_item(tasks, name='task1')

    self.assertEqual(states.ERROR, task1.state)
    # Static value published on task failure.
    self.assertEqual('hello_from_error', task1.published['hi'])
    # Expression evaluated against the failed task's result.
    self.assertIn('Fail action expected exception', task1.published['err'])
    # Workflow output carries the output-on-error key plus the default
    # 'result' key with the failure message.
    self.assertEqual('hello_from_error', wf_ex.output['out'])
    self.assertIn('Fail action expected exception', wf_ex.output['result'])
def test_output_on_error_wb_yaql_failed(self):
    # A failing YAQL expression inside output-on-error
    # ($.not_existing_variable in wf2) must not crash the engine: the
    # workflow ends up in ERROR and state_info names the expression
    # that failed to evaluate.
    # NOTE(review): YAML indentation below was reconstructed — the
    # original formatting was lost; verify against the repository.
    wb_def = """---
    version: '2.0'
    name: wb
    workflows:
      wf1:
        type: direct
        output-on-error:
          message: <% $.message %>
        tasks:
          task1:
            workflow: wf2
            publish-on-error:
              message: <% task(task1).result.message %>
      wf2:
        type: direct
        output-on-error:
          message: <% $.not_existing_variable %>
        tasks:
          task1:
            action: std.fail
            publish-on-error:
              message: <% task(task1).result %>
    """

    wb_service.create_workbook_v2(wb_def)

    # Start workflow.
    wf_ex = self.engine.start_workflow('wb.wf1', {})

    self.await_workflow_error(wf_ex.id)

    # Note: We need to reread execution to access related tasks.
    wf_ex = db_api.get_workflow_execution(wf_ex.id)

    self.assertEqual(states.ERROR, wf_ex.state)
    # The evaluation failure of wf2's output-on-error surfaces in the
    # parent workflow's state_info.
    self.assertIn('Failed to evaluate expression in output-on-error!',
                  wf_ex.state_info)
    self.assertIn('$.message', wf_ex.state_info)

    tasks = wf_ex.task_executions

    task1 = self._assert_single_item(tasks, name='task1')

    self.assertIn('task(task1).result.message', task1.state_info)
class DataFlowTest(test_base.BaseTest):
def test_get_task_execution_result(self):

View File

@ -55,6 +55,7 @@ class TaskSpec(base.BaseSpec):
]
},
"publish": types.NONEMPTY_DICT,
"publish-on-error": types.NONEMPTY_DICT,
"retry": policies.RETRY_SCHEMA,
"wait-before": policies.WAIT_BEFORE_SCHEMA,
"wait-after": policies.WAIT_AFTER_SCHEMA,
@ -98,6 +99,7 @@ class TaskSpec(base.BaseSpec):
self._input = data.get('input', {})
self._with_items = self._transform_with_items()
self._publish = data.get('publish', {})
self._publish_on_error = data.get('publish-on-error', {})
self._policies = self._group_spec(
policies.PoliciesSpec,
'retry',
@ -212,6 +214,9 @@ class TaskSpec(base.BaseSpec):
def get_publish(self):
    # Dict of variables to publish on task success; {} when the
    # 'publish' attribute is not specified.
    return self._publish
def get_publish_on_error(self):
    # Dict of variables to publish on task failure; {} when the
    # 'publish-on-error' attribute is not specified.
    return self._publish_on_error
def get_keep_result(self):
    # Value of the 'keep-result' task attribute.
    return self._keep_result

View File

@ -40,6 +40,7 @@ class WorkflowSpec(base.BaseSpec):
"task-defaults": _task_defaults_schema,
"input": types.UNIQUE_STRING_OR_ONE_KEY_DICT_LIST,
"output": types.NONEMPTY_DICT,
"output-on-error": types.NONEMPTY_DICT,
"vars": types.NONEMPTY_DICT
},
"required": ["tasks"],
@ -55,6 +56,7 @@ class WorkflowSpec(base.BaseSpec):
self._type = data['type'] if 'type' in data else 'direct'
self._input = utils.get_input_dict(data.get('input', []))
self._output = data.get('output', {})
self._output_on_error = data.get('output-on-error', {})
self._vars = data.get('vars', {})
self._task_defaults = self._spec_property(
@ -122,6 +124,9 @@ class WorkflowSpec(base.BaseSpec):
def get_output(self):
    # Workflow 'output' structure; {} when not specified.
    return self._output
def get_output_on_error(self):
    # Workflow 'output-on-error' structure, evaluated when the workflow
    # goes into an error state; {} when not specified.
    return self._output_on_error
def get_vars(self):
    # Workflow 'vars' structure; {} when not specified.
    return self._vars

View File

@ -183,7 +183,7 @@ def get_task_execution_result(task_ex):
def publish_variables(task_ex, task_spec):
if task_ex.state != states.SUCCESS:
if task_ex.state not in [states.SUCCESS, states.ERROR]:
return
wf_ex = task_ex.workflow_execution
@ -200,10 +200,10 @@ def publish_variables(task_ex, task_spec):
task_ex.name
)
task_ex.published = expr.evaluate_recursively(
task_spec.get_publish(),
expr_ctx
)
data = (task_spec.get_publish()
if task_ex.state == states.SUCCESS
else task_spec.get_publish_on_error())
task_ex.published = expr.evaluate_recursively(data, expr_ctx)
def evaluate_task_outbound_context(task_ex):
@ -222,20 +222,18 @@ def evaluate_task_outbound_context(task_ex):
return utils.update_dict(in_context, task_ex.published)
def evaluate_workflow_output(wf_ex, wf_spec, ctx):
def evaluate_workflow_output(wf_ex, wf_output, ctx):
"""Evaluates workflow output.
:param wf_ex: Workflow execution.
:param wf_spec: Workflow specification.
:param wf_output: Workflow output.
:param ctx: Final Data Flow context (cause task's outbound context).
"""
output_dict = wf_spec.get_output()
# Evaluate workflow 'output' clause using the final workflow context.
ctx_view = ContextView(ctx, wf_ex.context, wf_ex.input)
output = expr.evaluate_recursively(output_dict, ctx_view)
output = expr.evaluate_recursively(wf_output, ctx_view)
# TODO(rakhmerov): Many don't like that we return the whole context
# if 'output' is not explicitly defined.

View File

@ -200,11 +200,19 @@ class DirectWorkflowController(base.WorkflowController):
return True
def _find_end_tasks(self):
def is_end_task(t_ex):
try:
return not self._has_outbound_tasks(t_ex)
except exc.MistralException:
# If some error happened during the evaluation of outbound
# tasks we consider that the given task is an end task.
# Due to this output-on-error could reach the outbound context
# of given task also.
return True
return list(
filter(
lambda t_ex: not self._has_outbound_tasks(t_ex),
lookup_utils.find_successful_task_executions(self.wf_ex.id)
)
filter(is_end_task,
lookup_utils.find_completed_tasks(self.wf_ex.id))
)
def _has_outbound_tasks(self, task_ex):

View File

@ -104,6 +104,10 @@ def find_cancelled_task_executions(wf_ex_id):
return find_task_executions_with_state(wf_ex_id, states.CANCELLED)
def find_completed_tasks(wf_ex_id):
    """Return task executions of the given workflow execution that have
    reached a terminal state.

    :param wf_ex_id: Workflow execution id used as the filter.
    """
    return db_api.get_completed_task_executions(workflow_execution_id=wf_ex_id)
def clean_caches():
    """Clear the module-level task-executions cache under its lock."""
    with _TASK_EXECUTIONS_CACHE_LOCK:
        _TASK_EXECUTIONS_CACHE.clear()

View File

@ -109,7 +109,10 @@ class ReverseWorkflowController(base.WorkflowController):
# executions for one task.
assert len(task_execs) <= 1
return data_flow.evaluate_task_outbound_context(task_execs[0])
if len(task_execs) == 1:
return data_flow.evaluate_task_outbound_context(task_execs[0])
else:
return {}
def get_logical_task_state(self, task_ex):
# TODO(rakhmerov): Implement.