Merge "Ensure workflow check is retried on DB error"
This commit is contained in:
commit
1361e8d2ad
@ -101,12 +101,28 @@ def _check_and_complete(wf_ex_id):
|
|||||||
|
|
||||||
wf = workflows.Workflow(wf_ex=wf_ex)
|
wf = workflows.Workflow(wf_ex=wf_ex)
|
||||||
|
|
||||||
incomplete_tasks_count = 0
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
check_and_fix_integrity(wf_ex)
|
check_and_fix_integrity(wf_ex)
|
||||||
|
|
||||||
incomplete_tasks_count = wf.check_and_complete()
|
num_incomplete_tasks = wf.check_and_complete()
|
||||||
|
|
||||||
|
if not states.is_completed(wf_ex.state):
|
||||||
|
delay = (
|
||||||
|
2 + int(num_incomplete_tasks * 0.1) if num_incomplete_tasks
|
||||||
|
else 4
|
||||||
|
)
|
||||||
|
|
||||||
|
# Rescheduling this check may not happen if errors are
|
||||||
|
# raised in the business logic. If the error is DB related
|
||||||
|
# and not considered fatal (e.g. disconnect, deadlock), the
|
||||||
|
# retry annotation around the method will ensure that the
|
||||||
|
# whole method is retried in a new transaction. On fatal
|
||||||
|
# errors, the check should not be rescheduled as it could
|
||||||
|
# result in undesired consequences.
|
||||||
|
# In case there are some errors that should not be
|
||||||
|
# considered fatal, those should be handled explicitly.
|
||||||
|
_schedule_check_and_complete(wf_ex, delay)
|
||||||
|
|
||||||
except exc.MistralException as e:
|
except exc.MistralException as e:
|
||||||
msg = (
|
msg = (
|
||||||
"Failed to check and complete [wf_ex_id=%s, wf_name=%s]:"
|
"Failed to check and complete [wf_ex_id=%s, wf_name=%s]:"
|
||||||
@ -117,27 +133,6 @@ def _check_and_complete(wf_ex_id):
|
|||||||
|
|
||||||
force_fail_workflow(wf.wf_ex, msg)
|
force_fail_workflow(wf.wf_ex, msg)
|
||||||
|
|
||||||
return
|
|
||||||
finally:
|
|
||||||
if states.is_completed(wf_ex.state):
|
|
||||||
return
|
|
||||||
|
|
||||||
# Let's assume that a task takes 0.01 sec on average to complete
|
|
||||||
# and based on this assumption calculate a time of the next check.
|
|
||||||
# The estimation is very rough but this delay will be decreasing
|
|
||||||
# as tasks will be completing which will give a decent
|
|
||||||
# approximation.
|
|
||||||
# For example, if a workflow has 100 incomplete tasks then the
|
|
||||||
# next check call will happen in 1 second. For 500 tasks it will
|
|
||||||
# be 5 seconds. The larger the workflow is, the more beneficial
|
|
||||||
# this mechanism will be.
|
|
||||||
delay = (
|
|
||||||
int(incomplete_tasks_count * 0.01) if incomplete_tasks_count
|
|
||||||
else 4
|
|
||||||
)
|
|
||||||
|
|
||||||
_schedule_check_and_complete(wf_ex, delay)
|
|
||||||
|
|
||||||
|
|
||||||
@profiler.trace('workflow-handler-check-and-fix-integrity')
|
@profiler.trace('workflow-handler-check-and-fix-integrity')
|
||||||
def check_and_fix_integrity(wf_ex):
|
def check_and_fix_integrity(wf_ex):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user