From 78b542c4c5e7ed6604bed4e50088690d187ab683 Mon Sep 17 00:00:00 2001 From: Vitalii Solodilov Date: Thu, 31 May 2018 12:36:37 +0400 Subject: [PATCH] Refresh a number of retry a task when task was rerun Change-Id: If0a8219bb54ee0d01084dbaf5c9ed5b2041c2bc4 Closes-Bug: #1772265 Signed-off-by: Vitalii Solodilov --- mistral/engine/policies.py | 9 ++++ mistral/engine/workflows.py | 6 +++ .../unit/engine/test_direct_workflow_rerun.py | 50 +++++++++++++++++++ 3 files changed, 65 insertions(+) diff --git a/mistral/engine/policies.py b/mistral/engine/policies.py index 418ca5ebc..844714bf0 100644 --- a/mistral/engine/policies.py +++ b/mistral/engine/policies.py @@ -405,6 +405,15 @@ class RetryPolicy(base.TaskPolicy): task_ex_id=task_ex.id, ) + @staticmethod + def refresh_runtime_context(task_ex): + runtime_context = task_ex.runtime_context or {} + retry_task_policy = runtime_context.get('retry_task_policy') + + if retry_task_policy: + retry_task_policy['retry_no'] = 0 + task_ex.runtime_context['retry_task_policy'] = retry_task_policy + class TimeoutPolicy(base.TaskPolicy): _schema = { diff --git a/mistral/engine/workflows.py b/mistral/engine/workflows.py index 3ee8247e1..e2fd62ffe 100644 --- a/mistral/engine/workflows.py +++ b/mistral/engine/workflows.py @@ -240,6 +240,12 @@ class Workflow(object): # Calculate commands to process next. cmds = wf_ctrl.rerun_tasks([task_ex], reset=reset) + if cmds: + # Import the task_handler module here to avoid circular reference. + from mistral.engine import policies + + policies.RetryPolicy.refresh_runtime_context(task_ex) + self._continue_workflow(cmds) def _get_backlog(self): diff --git a/mistral/tests/unit/engine/test_direct_workflow_rerun.py b/mistral/tests/unit/engine/test_direct_workflow_rerun.py index 0f0d39be7..c022e54fd 100644 --- a/mistral/tests/unit/engine/test_direct_workflow_rerun.py +++ b/mistral/tests/unit/engine/test_direct_workflow_rerun.py @@ -21,6 +21,7 @@ from mistral.actions import std_actions from mistral.db.v2 import api as db_api from mistral import exceptions as exc from mistral.services import workbooks as wb_service +from mistral.services import workflows as wf_service from mistral.tests.unit.engine import base from mistral.workflow import states @@ -1430,3 +1431,52 @@ class DirectWorkflowRerunTest(base.EngineTestCase): self.assertEqual(1, len(task_3_action_exs)) self.assertEqual(states.SUCCESS, task_3_action_exs[0].state) + + def test_rerun_task_with_retry_policy(self): + wf_service.create_workflows("""--- + version: '2.0' + wf_fail: + tasks: + task1: + action: std.fail + retry: + delay: 0 + count: 2""") + + wf_ex = self.engine.start_workflow("wf_fail") + + self.await_workflow_error(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_ex = self._assert_single_item(wf_ex.task_executions, + name="task1") + action_executions = task_ex.executions + + self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) + self.assertEqual(3, len(action_executions)) + self.assertTrue(all(a.state == states.ERROR + for a in action_executions)) + + self.engine.rerun_workflow(task_ex.id) + + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + self.assertEqual(states.RUNNING, wf_ex.state) + + self.await_workflow_error(wf_ex.id) + + with db_api.transaction(): + wf_ex = db_api.get_workflow_execution(wf_ex.id) + + task_ex = self._assert_single_item(wf_ex.task_executions, + name="task1") + action_executions = task_ex.executions + + self.assertEqual(states.ERROR, wf_ex.state) + self.assertIsNotNone(wf_ex.state_info) + self.assertEqual(6, len(action_executions)) + self.assertTrue(all(a.state == states.ERROR + for a in action_executions))