Error result: allow actions to return instance of wf_utils.Result

* Actions are now allowed to return instance of workflow.utils.Result
  if they need a notion of "error result" which means that something
  went wrong but we can see what exactly is wrong. An example is std.http
  action which can return HTTP status code instead of just blindly raising
  an exception. See the bug description for more details.
* In "on-error" clause we now can evaluate condition expressions based
  on an error result returned by task action.
* "publish" clause is still ignored when task has ERROR state.

TODO:
 * Add more tests, especially for various corner cases.
 * Fix standard actions (e.g std.http).
 * Make sure workflows can also return error result.
 * Add corresponding functional tests.

Change-Id: If1531b3da31de95be9717570a29d87a1a424d7a5
This commit is contained in:
Renat Akhmerov
2015-07-15 14:35:39 +06:00
parent 990f711e71
commit b27d0d3ca8
7 changed files with 55 additions and 34 deletions

View File

@@ -157,11 +157,11 @@ def store_action_result(action_ex, result):
if result.is_success():
action_ex.state = states.SUCCESS
action_ex.output = {'result': result.data}
action_ex.accepted = True
else:
action_ex.state = states.ERROR
action_ex.output = {'result': result.error}
action_ex.accepted = False
action_ex.accepted = True
_log_action_result(action_ex, prev_state, action_ex.state, result)

View File

@@ -218,7 +218,8 @@ class DefaultEngine(base.Engine):
# assigned task execution.
if not action_ex.task_execution:
return action_handler.store_action_result(
action_ex, result
action_ex,
result
).get_clone()
wf_ex_id = action_ex.task_execution.workflow_execution_id

View File

@@ -52,13 +52,17 @@ class DefaultExecutor(base.Executor):
try:
action = action_cls(**action_params)
result = action.run()
if action.is_sync() and action_ex_id:
self._engine_client.on_action_complete(
action_ex_id,
wf_utils.Result(data=result)
)
# Note: it's made for backwards compatibility with already
# existing Mistral actions which don't return result as
# instance of workflow.utils.Result.
if not isinstance(result, wf_utils.Result):
result = wf_utils.Result(data=result)
self._engine_client.on_action_complete(action_ex_id, result)
return result
except TypeError as e:

View File

@@ -249,7 +249,7 @@ class EngineClient(base.Engine):
)
@wrap_messaging_exception
def start_workflow(self, wf_name, wf_input, description, **params):
def start_workflow(self, wf_name, wf_input, description='', **params):
"""Starts workflow sending a request to engine over RPC.
:return: Workflow execution.

View File

@@ -147,22 +147,21 @@ def on_action_complete(action_ex, result):
wf_ex = task_ex.workflow_execution
# Ignore workflow executions because they're handled during
# workflow completion
# workflow completion.
if not isinstance(action_ex, models.WorkflowExecution):
action_handler.store_action_result(action_ex, result)
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
task_spec = wf_spec.get_tasks()[task_ex.name]
if result.is_success():
if not task_spec.get_with_items():
_complete_task(task_ex, task_spec, states.SUCCESS)
else:
if with_items.iterations_completed(task_ex):
_complete_task(task_ex, task_spec, states.SUCCESS)
task_state = states.SUCCESS if result.is_success() else states.ERROR
if not task_spec.get_with_items():
_complete_task(task_ex, task_spec, task_state)
else:
_complete_task(task_ex, task_spec, states.ERROR)
if (task_state == states.ERROR or
with_items.iterations_completed(task_ex)):
_complete_task(task_ex, task_spec, task_state)
return task_ex
@@ -449,11 +448,10 @@ def _complete_task(task_ex, task_spec, state):
_set_task_state(task_ex, state)
if task_ex.state == states.SUCCESS:
data_flow.publish_variables(
task_ex,
task_spec
)
data_flow.publish_variables(
task_ex,
task_spec
)
if not task_spec.get_keep_result():
data_flow.destroy_task_result(task_ex)

View File

@@ -14,7 +14,6 @@
from oslo_config import cfg
from oslo_log import log as logging
import testtools
from mistral.actions import base as actions_base
from mistral.db.v2 import api as db_api
@@ -22,6 +21,7 @@ from mistral.services import workflows as wf_service
from mistral.tests import base as test_base
from mistral.tests.unit.engine import base
from mistral.workflow import states
from mistral.workflow import utils as wf_utils
LOG = logging.getLogger(__name__)
@@ -31,15 +31,18 @@ cfg.CONF.set_default('auth_enable', False, group='pecan')
class MyAction(actions_base.Action):
def __init__(self, error_result):
def __init__(self, success_result, error_result):
self.success_result = success_result
self.error_result = error_result
def run(self):
# TODO(rakhmerov): The current state of the code shows that action
# contract is not complete if we want to handle wf errors (like
# http status codes) because action should be able to pass a result
# type (success / error) as well as the result itself. This is TBD.
return {'error_result': self.error_result}
return wf_utils.Result(
data=self.success_result,
error=self.error_result
)
def test(self):
raise NotImplementedError
class ErrorResultTest(base.EngineTestCase):
@@ -48,18 +51,23 @@ class ErrorResultTest(base.EngineTestCase):
test_base.register_action_class('my_action', MyAction)
@testtools.skip('Make it work.')
def test_error_result(self):
wf_text = """---
version: '2.0'
wf:
input:
- success_result
- error_result
tasks:
task1:
action: my_action error_result=<% $.error_result %>
action: my_action
input:
success_result: <% $.success_result %>
error_result: <% $.error_result %>
publish:
p_var: <% $.task1.some_field %>
on-error:
- task2: <% $.task1 = 2 %>
- task3: <% $.task1 = 3 %>
@@ -74,7 +82,13 @@ class ErrorResultTest(base.EngineTestCase):
wf_service.create_workflows(wf_text)
# Start workflow.
wf_ex = self.engine.start_workflow('wf', {'error_result': 2})
wf_ex = self.engine.start_workflow(
'wf',
{
'success_result': None,
'error_result': 2
}
)
self._await(lambda: self.is_execution_success(wf_ex.id))
@@ -90,3 +104,7 @@ class ErrorResultTest(base.EngineTestCase):
self.assertEqual(states.ERROR, task1.state)
self.assertEqual(states.SUCCESS, task2.state)
# "publish" clause is ignored in case of ERROR so task execution field
# must be empty.
self.assertDictEqual({}, task1.published)

View File

@@ -143,6 +143,9 @@ class ProxyAwareDict(dict):
def publish_variables(task_ex, task_spec):
if task_ex.state != states.SUCCESS:
return
expr_ctx = extract_task_result_proxies_to_context(task_ex.in_context)
if task_ex.name in expr_ctx:
@@ -177,9 +180,6 @@ def evaluate_task_outbound_context(task_ex, include_result=True):
:return: Outbound task Data Flow context.
"""
if task_ex.state != states.SUCCESS:
return task_ex.in_context
in_context = (copy.deepcopy(dict(task_ex.in_context))
if task_ex.in_context is not None else {})