Get correct inbound tasks context for retry policy

Change-Id: Ifd18684f2756b8a23f06c94197edd1f348da8aa0
Cloese-Bug: #1634812
This commit is contained in:
Lingxian Kong 2016-10-19 22:50:13 +13:00
parent 430f4e106b
commit 455e12e3a7
2 changed files with 47 additions and 1 deletions

View File

@ -872,6 +872,52 @@ class PoliciesTest(base.EngineTestCase):
self.assertDictEqual({'result': 'mocked result'}, wf_ex.output)
@mock.patch.object(
std_actions.EchoAction,
'run',
mock.MagicMock(side_effect=[exc.ActionException(), 'value'])
)
def test_retry_policy_succeed_after_failure_with_publish(self):
retry_wf = """---
version: '2.0'
wf1:
output:
result: <% task(task2).result %>
tasks:
task1:
action: std.noop
publish:
key: value
on-success:
- task2
task2:
action: std.echo output=<% $.key %>
retry:
count: 3
delay: 1
"""
wf_service.create_workflows(retry_wf)
wf_ex = self.engine.start_workflow('wf1', {})
self.await_workflow_success(wf_ex.id)
wf_ex = db_api.get_workflow_execution(wf_ex.id)
retry_task = self._assert_single_item(
wf_ex.task_executions,
name='task2'
)
self.assertDictEqual(
{'retry_no': 1},
retry_task.runtime_context['retry_task_policy']
)
self.assertDictEqual({'result': 'value'}, wf_ex.output)
def test_timeout_policy(self):
wb_service.create_workbook_v2(TIMEOUT_WB)

View File

@ -59,7 +59,7 @@ class DirectWorkflowController(base.WorkflowController):
return False
if not t_spec.get_join():
return not t_ex_candidate.processed
return t_ex_candidate.processed
induced_state, _ = self._get_induced_join_state(
self.wf_spec.get_tasks()[t_ex_candidate.name],