Ensure workflow check is retried on DB error

On DB error, the _check_and_complete logic is retried. DB disconnect errors
are not shadowed by subsequent DB operations in the failed transaction. Other
non DB disconnect or MistralExceptions are considered fatal, thus, the consistency
check is not rescheduled.

Change-Id: I093d5e2442a5489d406f4b8a188552f201b1a076
Closes-Bug: 1751018
This commit is contained in:
Andras Kovi 2018-02-22 17:42:13 +01:00 committed by Adriano Petrich
parent 2af724bf04
commit a59af3f247

View File

@ -101,12 +101,28 @@ def _check_and_complete(wf_ex_id):
wf = workflows.Workflow(wf_ex=wf_ex)
incomplete_tasks_count = 0
try:
check_and_fix_integrity(wf_ex)
incomplete_tasks_count = wf.check_and_complete()
num_incomplete_tasks = wf.check_and_complete()
if not states.is_completed(wf_ex.state):
delay = (
2 + int(num_incomplete_tasks * 0.1) if num_incomplete_tasks
else 4
)
# Rescheduling this check may not happen if erros are
# raised in the business logic. If the error is DB related
# and not considered fatal (e.g. disconnect, deadlock), the
# retry annotation around the method will ensure that the
# whole method is retried in a new transaction. On fatal
# errors, the check should not be rescheduled as it could
# result in undesired consequences.
# In case there are some errors that should not be
# considered fatal, those should be handled explicitly.
_schedule_check_and_complete(wf_ex, delay)
except exc.MistralException as e:
msg = (
"Failed to check and complete [wf_ex_id=%s, wf_name=%s]:"
@ -117,27 +133,6 @@ def _check_and_complete(wf_ex_id):
force_fail_workflow(wf.wf_ex, msg)
return
finally:
if states.is_completed(wf_ex.state):
return
# Let's assume that a task takes 0.01 sec in average to complete
# and based on this assumption calculate a time of the next check.
# The estimation is very rough but this delay will be decreasing
# as tasks will be completing which will give a decent
# approximation.
# For example, if a workflow has 100 incomplete tasks then the
# next check call will happen in 1 second. For 500 tasks it will
# be 5 seconds. The larger the workflow is, the more beneficial
# this mechanism will be.
delay = (
int(incomplete_tasks_count * 0.01) if incomplete_tasks_count
else 4
)
_schedule_check_and_complete(wf_ex, delay)
@profiler.trace('workflow-handler-check-and-fix-integrity')
def check_and_fix_integrity(wf_ex):