diff --git a/mistral/engine1/policies.py b/mistral/engine1/policies.py index a9d52ea0..354ac564 100644 --- a/mistral/engine1/policies.py +++ b/mistral/engine1/policies.py @@ -145,9 +145,10 @@ class WaitAfterPolicy(base.TaskPolicy): policy_context = runtime_context[context_key] if policy_context.get('skip'): - # Unset state 'DELAYED'. - task_db.state = \ - states.ERROR if raw_result.is_error() else states.SUCCESS + # Need to avoid terminal states. + if not states.is_finished(task_db.state): + # Unset state 'DELAYED'. + task_db.state = states.RUNNING return diff --git a/mistral/tests/base.py b/mistral/tests/base.py index 90f41ce4..141f821c 100644 --- a/mistral/tests/base.py +++ b/mistral/tests/base.py @@ -167,6 +167,9 @@ class BaseTest(base.BaseTestCase): time.sleep(delay) + def _sleep(self, seconds): + time.sleep(seconds) + class DbTestCase(BaseTest): is_heavy_init_called = False diff --git a/mistral/tests/unit/engine1/test_policies.py b/mistral/tests/unit/engine1/test_policies.py index 7ef2f558..96eae0e7 100644 --- a/mistral/tests/unit/engine1/test_policies.py +++ b/mistral/tests/unit/engine1/test_policies.py @@ -142,6 +142,23 @@ workflows: """ +TIMEOUT_WB2 = """ +--- +version: '2.0' +name: wb +workflows: + wf1: + type: direct + + tasks: + task1: + action: std.echo output="Hi!" + policies: + wait-after: 2 + timeout: 1 +""" + + class PoliciesTest(base.EngineTestCase): def setUp(self): super(PoliciesTest, self).setUp() @@ -288,3 +305,29 @@ class PoliciesTest(base.EngineTestCase): exec_db = db_api.get_execution(exec_db.id) self.assertEqual(states.SUCCESS, exec_db.state) + + def test_timeout_policy_success_after_timeout(self): + wb_service.create_workbook_v2({'definition': TIMEOUT_WB2}) + + # Start workflow. + exec_db = self.engine.start_workflow('wb.wf1', {}) + + # Note: We need to reread execution to access related tasks. + exec_db = db_api.get_execution(exec_db.id) + task_db = exec_db.tasks[0] + + self.assertEqual(states.RUNNING, task_db.state) + + self._await( + lambda: self.is_execution_error(exec_db.id), + ) + + # Wait until timeout exceeds. + self._sleep(2) + + exec_db = db_api.get_execution(exec_db.id) + tasks_db = exec_db.tasks + + # Make sure that engine did not create extra tasks. + self.assertEqual(1, len(tasks_db)) + self.assertEqual(states.ERROR, tasks_db[0].state) diff --git a/mistral/workflow/base.py b/mistral/workflow/base.py index c46073d3..388f5242 100644 --- a/mistral/workflow/base.py +++ b/mistral/workflow/base.py @@ -63,11 +63,13 @@ class WorkflowHandler(object): (before publisher). Instance of mistral.workflow.utils.TaskResult :return List of engine commands that needs to be performed. """ - # TODO(rakhmerov): need to ignore result if task is complete. + + # Ignore if task already completed. + if states.is_finished(task_db.state): + return [] task_db.state = \ states.ERROR if raw_result.is_error() else states.SUCCESS - task_spec = self.wf_spec.get_tasks()[task_db.name] task_db.output =\