Fix retrying tasks after workflow is paused

Current behaviour if we pause an execution is task will
continue execution until retry limit. We should also pause
task execution and continue it after workflow execution is
resumed. It's more strange when Mistral itself is paused,
because we also countinue run tasks.
As reties are mostly used when there is the likelihood
of temporary problems like network problems, access to
exeternal API etc. I think it's better to start retry
execution from the begining:
Retry count: 10 ->
Pause execution at 5 retry ->
Resume execution ->
15 retries finally

So we need to stop task retrying if workflow is paused
and add marker that this task should be restarted after
mistral is resumed.
After workflow is resumed we should find all task to restart
and create RunExistingTask commands.

Closes-Bug: #1769012
Change-Id: Ie00d481b5e579afc76b67006c006f0c0b690e5b6
Signed-off-by: Vadim Zelenevsky <wortellen@gmail.com>
This commit is contained in:
Vadim Zelenevsky 2024-04-08 09:55:24 +03:00
parent 288da4d5b0
commit 45d6edd151
3 changed files with 29 additions and 1 deletions

View File

@ -315,6 +315,8 @@ class RetryPolicy(base.TaskPolicy):
# iteration behind.
ex = task.task_ex.executions # noqa
wf_ex = task.task_ex.workflow_execution
ctx_key = 'retry_task_policy'
expr_ctx = task.get_expression_context(
@ -357,6 +359,15 @@ class RetryPolicy(base.TaskPolicy):
(self._continue_on_clause and not continue_on_evaluation)
)
stop_continue_flag = (
stop_continue_flag or
wf_ex.state == states.PAUSED
)
if wf_ex.state == states.PAUSED:
policy_ctx['to_retry'] = 'true'
task.touch_runtime_context()
break_triggered = (
task.get_state() == states.ERROR and
break_on_evaluation

View File

@ -318,7 +318,10 @@ class Workflow(object, metaclass=abc.ABCMeta):
# because workflow controller takes only completed tasks
# with flag 'processed' equal to False.
for t_ex in self.wf_ex.task_executions:
if states.is_completed(t_ex.state) and not t_ex.processed:
retry_policy = t_ex.runtime_context.get('retry_task_policy', {})
to_retry = retry_policy.get('to_retry', 'false')
if (states.is_completed(t_ex.state) and not t_ex.processed
and to_retry != 'true'):
t_ex.processed = True
if cmds or self._get_backlog():

View File

@ -277,6 +277,20 @@ class WorkflowController(object):
if task_ex:
return []
# If workflow execution was paused, but failed tasks executions with
# retries weren't completely executed
cmds = []
for t in self._get_task_executions(state=states.ERROR):
retry_policy = t.runtime_context.get('retry_task_policy', {})
to_retry = retry_policy.get('to_retry', 'false')
if to_retry == 'true':
cmds.append(
commands.RunExistingTask(self.wf_ex, self.wf_spec, t)
)
if cmds:
return cmds
# Add all tasks in IDLE state.
return [
commands.RunExistingTask(self.wf_ex, self.wf_spec, t)