Implementing 'continue-on' retry policy property

Implements blueprint mistral-retry-continue-on

Change-Id: Idf893fdb201a05521ab2503ad756bafda95ae7d5
This commit is contained in:
Nikolay Mahotkin 2015-06-03 17:08:02 +03:00
parent bee9395e35
commit b42a5b86a2
4 changed files with 111 additions and 11 deletions

View File

@ -16,8 +16,10 @@
from mistral.db.v2 import api as db_api
from mistral.engine import base
from mistral.engine import rpc
from mistral import expressions
from mistral.services import scheduler
from mistral.utils import wf_trace
from mistral.workflow import data_flow
from mistral.workflow import states
@ -99,7 +101,8 @@ def build_retry_policy(policies_spec):
return RetryPolicy(
retry.get_count(),
retry.get_delay(),
retry.get_break_on()
retry.get_break_on(),
retry.get_continue_on()
)
@ -245,16 +248,23 @@ class RetryPolicy(base.TaskPolicy):
}
}
def __init__(self, count, delay, break_on):
def __init__(self, count, delay, break_on, continue_on):
self.count = count
self.delay = delay
self.break_on = break_on
self._continue_on = continue_on
def after_task_complete(self, task_ex, task_spec):
"""Possible Cases:
1. state = SUCCESS
No need to move to next iteration.
if continue_on is not specified,
no need to move to next iteration;
if current:count achieve retry:count then policy
breaks the loop (regardless on continue-on condition);
otherwise - check continue_on condition and if
it is True - schedule the next iteration,
otherwise policy breaks the loop.
2. retry:count = 5, current:count = 2, state = ERROR,
state = IDLE/DELAYED, current:count = 3
3. retry:count = 5, current:count = 4, state = ERROR
@ -269,19 +279,18 @@ class RetryPolicy(base.TaskPolicy):
context_key
)
continue_on = expressions.evaluate(
self._continue_on,
data_flow.evaluate_task_outbound_context(task_ex)
)
task_ex.runtime_context = runtime_context
state = task_ex.state
if state != states.ERROR:
if not states.is_completed(state):
return
wf_trace.info(
task_ex,
"Task '%s' [%s -> ERROR]"
% (task_ex.name, task_ex.state)
)
policy_context = runtime_context[context_key]
retry_no = 0
@ -292,11 +301,15 @@ class RetryPolicy(base.TaskPolicy):
retries_remain = retry_no + 1 < self.count
if not retries_remain or self.break_on:
continue_triggered = self._continue_on and not continue_on
break_triggered = task_ex.state == states.ERROR and self.break_on
if not retries_remain or break_triggered or continue_triggered:
return
_log_task_delay(task_ex, self.delay)
data_flow.invalidate_task_execution_result(task_ex)
task_ex.state = states.DELAYED
policy_context['retry_no'] = retry_no + 1

View File

@ -12,8 +12,10 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import mock
from oslo.config import cfg
from mistral.actions import std_actions
from mistral.db.v2 import api as db_api
from mistral.engine import policies
from mistral import exceptions as exc
@ -485,6 +487,83 @@ class PoliciesTest(base.EngineTestCase):
task_ex.runtime_context['retry_task_policy']['retry_no']
)
@mock.patch.object(
std_actions.EchoAction, 'run', mock.Mock(side_effect=[1, 2, 3, 4])
)
def test_retry_continue_on(self):
retry_wb = """---
version: '2.0'
name: wb
workflows:
wf1:
tasks:
task1:
action: std.echo output="mocked result"
retry:
count: 4
delay: 1
continue-on: <% $.task1 < 3 %>
"""
wb_service.create_workbook_v2(retry_wb)
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self._await(lambda: self.is_task_success(task_ex.id))
self._await(lambda: self.is_execution_success(wf_ex.id))
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(
2,
task_ex.runtime_context['retry_task_policy']['retry_no']
)
def test_retry_continue_on_not_happened(self):
retry_wb = """---
version: '2.0'
name: wb
workflows:
wf1:
tasks:
task1:
action: std.echo output=4
retry:
count: 4
delay: 1
continue-on: <% $.task1 <= 3 %>
"""
wb_service.create_workbook_v2(retry_wb)
# Start workflow.
wf_ex = self.engine.start_workflow('wb.wf1', {})
# Note: We need to reread execution to access related tasks.
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self._await(lambda: self.is_task_success(task_ex.id))
self._await(lambda: self.is_execution_success(wf_ex.id))
wf_ex = db_api.get_workflow_execution(wf_ex.id)
task_ex = wf_ex.task_executions[0]
self.assertEqual(
{},
task_ex.runtime_context['retry_task_policy']
)
def test_retry_policy_one_line(self):
retry_wb = """---
version: '2.0'

View File

@ -85,5 +85,8 @@ class RetrySpec(base.BaseSpec):
def get_break_on(self):
return self._break_on
def get_continue_on(self):
return self._continue_on
def get_delay(self):
return self._delay

View File

@ -79,6 +79,11 @@ def _extract_execution_result(ex):
return ex.output['result']
def invalidate_task_execution_result(task_ex):
for ex in task_ex.executions:
ex.accepted = False
def get_task_execution_result(task_ex):
action_execs = task_ex.executions
action_execs.sort(