Fixing sending the result of subworkflow
* This is why retry didn't work for subworkflow * Engine commands such as fail or succeed doesn't send the result of current workflow to parent workflow if needed. Closes-Bug: #1449298 Change-Id: Ia0d87160a51f53a686ca0451a2000298cec5a929
This commit is contained in:
parent
4e699ce21b
commit
766698bd43
@ -265,17 +265,36 @@ class DefaultEngine(base.Engine):
|
||||
|
||||
wf_ex = db_api.get_execution(execution_id)
|
||||
|
||||
wf_handler.set_execution_state(wf_ex, state, message)
|
||||
return self._stop_workflow(wf_ex, state, message)
|
||||
|
||||
return wf_ex
|
||||
@staticmethod
|
||||
def _stop_workflow(wf_ex, state, message=None):
|
||||
if state == states.SUCCESS:
|
||||
wf_ctrl = wf_base.WorkflowController.get_controller(wf_ex)
|
||||
|
||||
final_context = {}
|
||||
try:
|
||||
final_context = wf_ctrl.evaluate_workflow_final_context()
|
||||
except Exception as e:
|
||||
LOG.warn(
|
||||
"Failed to get final context for %s: %s" % (wf_ex, e)
|
||||
)
|
||||
wf_handler.succeed_workflow(
|
||||
wf_ex,
|
||||
final_context,
|
||||
message
|
||||
)
|
||||
elif state == states.ERROR:
|
||||
wf_handler.fail_workflow(wf_ex, message)
|
||||
|
||||
return wf_ex
|
||||
|
||||
@u.log_exec(LOG)
|
||||
def rollback_workflow(self, execution_id):
|
||||
# TODO(rakhmerov): Implement.
|
||||
raise NotImplementedError
|
||||
|
||||
@staticmethod
|
||||
def _dispatch_workflow_commands(wf_ex, wf_cmds):
|
||||
def _dispatch_workflow_commands(self, wf_ex, wf_cmds):
|
||||
if not wf_cmds:
|
||||
return
|
||||
|
||||
@ -285,8 +304,10 @@ class DefaultEngine(base.Engine):
|
||||
elif isinstance(cmd, commands.RunExistingTask):
|
||||
task_handler.run_existing_task(cmd.task_ex.id)
|
||||
elif isinstance(cmd, commands.SetWorkflowState):
|
||||
# TODO(rakhmerov): Special commands should be persisted too.
|
||||
wf_handler.set_execution_state(wf_ex, cmd.new_state)
|
||||
if states.is_completed(cmd.new_state):
|
||||
self._stop_workflow(cmd.wf_ex, cmd.new_state, cmd.msg)
|
||||
else:
|
||||
wf_handler.set_execution_state(wf_ex, cmd.new_state)
|
||||
elif isinstance(cmd, commands.Noop):
|
||||
# Do nothing.
|
||||
pass
|
||||
|
@ -23,8 +23,8 @@ from mistral.workflow import states
|
||||
from mistral.workflow import utils as wf_utils
|
||||
|
||||
|
||||
def succeed_workflow(wf_ex, final_context):
|
||||
set_execution_state(wf_ex, states.SUCCESS)
|
||||
def succeed_workflow(wf_ex, final_context, state_info=None):
|
||||
set_execution_state(wf_ex, states.SUCCESS, state_info)
|
||||
|
||||
wf_spec = spec_parser.get_workflow_spec(wf_ex.spec)
|
||||
|
||||
|
@ -412,11 +412,9 @@ class DefaultEngineTest(base.DbTestCase):
|
||||
# Re-read execution to access related tasks.
|
||||
wf_ex = db_api.get_execution(wf_ex.id)
|
||||
|
||||
self.assertRaises(
|
||||
exc.WorkflowException,
|
||||
self.engine.stop_workflow,
|
||||
wf_ex.id,
|
||||
'PAUSE'
|
||||
self.assertNotEqual(
|
||||
'PAUSE',
|
||||
self.engine.stop_workflow(wf_ex.id, 'PAUSE')
|
||||
)
|
||||
|
||||
def test_resume_workflow(self):
|
||||
|
@ -521,6 +521,48 @@ class PoliciesTest(base.EngineTestCase):
|
||||
task_ex.runtime_context['retry_task_policy']['retry_no']
|
||||
)
|
||||
|
||||
def test_retry_policy_subworkflow_force_fail(self):
|
||||
retry_wb = """---
|
||||
version: '2.0'
|
||||
|
||||
name: wb
|
||||
|
||||
workflows:
|
||||
main:
|
||||
tasks:
|
||||
task1:
|
||||
workflow: work
|
||||
retry:
|
||||
count: 3
|
||||
delay: 1
|
||||
|
||||
work:
|
||||
tasks:
|
||||
do:
|
||||
action: std.fail
|
||||
on-error:
|
||||
- fail
|
||||
"""
|
||||
wb_service.create_workbook_v2(retry_wb)
|
||||
|
||||
# Start workflow.
|
||||
wf_ex = self.engine.start_workflow('wb.main', {})
|
||||
|
||||
# Note: We need to reread execution to access related tasks.
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
task_ex = wf_ex.task_executions[0]
|
||||
|
||||
self._await(lambda: self.is_task_error(task_ex.id))
|
||||
self._await(lambda: self.is_execution_error(wf_ex.id))
|
||||
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex.id)
|
||||
task_ex = wf_ex.task_executions[0]
|
||||
|
||||
self.assertEqual(
|
||||
2,
|
||||
task_ex.runtime_context['retry_task_policy']['retry_no']
|
||||
)
|
||||
|
||||
def test_timeout_policy(self):
|
||||
wb_service.create_workbook_v2(TIMEOUT_WB)
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user