diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 2ead5d74..378dfa96 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -16,8 +16,10 @@ from mistral.db.v2 import api as db_api from mistral.engine import base from mistral.engine import rpc +from mistral import expressions from mistral.services import scheduler from mistral.utils import wf_trace +from mistral.workflow import data_flow from mistral.workflow import states @@ -99,7 +101,8 @@ def build_retry_policy(policies_spec): return RetryPolicy( retry.get_count(), retry.get_delay(), - retry.get_break_on() + retry.get_break_on(), + retry.get_continue_on() ) @@ -245,16 +248,23 @@ class RetryPolicy(base.TaskPolicy): } } - def __init__(self, count, delay, break_on): + def __init__(self, count, delay, break_on, continue_on): self.count = count self.delay = delay self.break_on = break_on + self._continue_on = continue_on def after_task_complete(self, task_ex, task_spec): """Possible Cases: 1. state = SUCCESS - No need to move to next iteration. + if continue_on is not specified, + no need to move to next iteration; + if current:count reaches retry:count then policy + breaks the loop (regardless of continue-on condition); + otherwise - check continue_on condition and if + it is True - schedule the next iteration, + otherwise policy breaks the loop. 2. retry:count = 5, current:count = 2, state = ERROR, state = IDLE/DELAYED, current:count = 3 3. 
retry:count = 5, current:count = 4, state = ERROR @@ -269,19 +279,18 @@ class RetryPolicy(base.TaskPolicy): context_key ) + continue_on = expressions.evaluate( + self._continue_on, + data_flow.evaluate_task_outbound_context(task_ex) + ) + task_ex.runtime_context = runtime_context state = task_ex.state - if state != states.ERROR: + if not states.is_completed(state): return - wf_trace.info( - task_ex, - "Task '%s' [%s -> ERROR]" - % (task_ex.name, task_ex.state) - ) - policy_context = runtime_context[context_key] retry_no = 0 @@ -292,11 +301,15 @@ class RetryPolicy(base.TaskPolicy): retries_remain = retry_no + 1 < self.count - if not retries_remain or self.break_on: + continue_triggered = self._continue_on and not continue_on + break_triggered = task_ex.state == states.ERROR and self.break_on + + if not retries_remain or break_triggered or continue_triggered: return _log_task_delay(task_ex, self.delay) + data_flow.invalidate_task_execution_result(task_ex) task_ex.state = states.DELAYED policy_context['retry_no'] = retry_no + 1 diff --git a/mistral/tests/unit/engine/test_policies.py b/mistral/tests/unit/engine/test_policies.py index 9330495f..3fcc5a7c 100644 --- a/mistral/tests/unit/engine/test_policies.py +++ b/mistral/tests/unit/engine/test_policies.py @@ -12,8 +12,10 @@ # See the License for the specific language governing permissions and # limitations under the License. 
+import mock from oslo.config import cfg +from mistral.actions import std_actions from mistral.db.v2 import api as db_api from mistral.engine import policies from mistral import exceptions as exc @@ -485,6 +487,83 @@ class PoliciesTest(base.EngineTestCase): task_ex.runtime_context['retry_task_policy']['retry_no'] ) + @mock.patch.object( + std_actions.EchoAction, 'run', mock.Mock(side_effect=[1, 2, 3, 4]) + ) + def test_retry_continue_on(self): + retry_wb = """--- + version: '2.0' + + name: wb + + workflows: + wf1: + tasks: + task1: + action: std.echo output="mocked result" + retry: + count: 4 + delay: 1 + continue-on: <% $.task1 < 3 %> + """ + wb_service.create_workbook_v2(retry_wb) + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1', {}) + + # Note: We need to reread execution to access related tasks. + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self._await(lambda: self.is_task_success(task_ex.id)) + + self._await(lambda: self.is_execution_success(wf_ex.id)) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self.assertEqual( + 2, + task_ex.runtime_context['retry_task_policy']['retry_no'] + ) + + def test_retry_continue_on_not_happened(self): + retry_wb = """--- + version: '2.0' + + name: wb + + workflows: + wf1: + tasks: + task1: + action: std.echo output=4 + retry: + count: 4 + delay: 1 + continue-on: <% $.task1 <= 3 %> + """ + wb_service.create_workbook_v2(retry_wb) + + # Start workflow. + wf_ex = self.engine.start_workflow('wb.wf1', {}) + + # Note: We need to reread execution to access related tasks. 
+ wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self._await(lambda: self.is_task_success(task_ex.id)) + + self._await(lambda: self.is_execution_success(wf_ex.id)) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + task_ex = wf_ex.task_executions[0] + + self.assertEqual( + {}, + task_ex.runtime_context['retry_task_policy'] + ) + def test_retry_policy_one_line(self): retry_wb = """--- version: '2.0' diff --git a/mistral/workbook/v2/retry_policy.py b/mistral/workbook/v2/retry_policy.py index 08394176..aa363a9a 100644 --- a/mistral/workbook/v2/retry_policy.py +++ b/mistral/workbook/v2/retry_policy.py @@ -85,5 +85,8 @@ class RetrySpec(base.BaseSpec): def get_break_on(self): return self._break_on + def get_continue_on(self): + return self._continue_on + def get_delay(self): return self._delay diff --git a/mistral/workflow/data_flow.py b/mistral/workflow/data_flow.py index 1f23ff7b..c535850e 100644 --- a/mistral/workflow/data_flow.py +++ b/mistral/workflow/data_flow.py @@ -79,6 +79,11 @@ def _extract_execution_result(ex): return ex.output['result'] +def invalidate_task_execution_result(task_ex): + for ex in task_ex.executions: + ex.accepted = False + + def get_task_execution_result(task_ex): action_execs = task_ex.executions action_execs.sort(