diff --git a/mistral/engine/workflow_handler.py b/mistral/engine/workflow_handler.py index f1c8ae075..7a9e3bfae 100644 --- a/mistral/engine/workflow_handler.py +++ b/mistral/engine/workflow_handler.py @@ -101,12 +101,28 @@ def _check_and_complete(wf_ex_id): wf = workflows.Workflow(wf_ex=wf_ex) - incomplete_tasks_count = 0 - try: check_and_fix_integrity(wf_ex) - incomplete_tasks_count = wf.check_and_complete() + num_incomplete_tasks = wf.check_and_complete() + + if not states.is_completed(wf_ex.state): + delay = ( + 2 + int(num_incomplete_tasks * 0.1) if num_incomplete_tasks + else 4 + ) + + # Rescheduling this check may not happen if errors are + # raised in the business logic. If the error is DB related + # and not considered fatal (e.g. disconnect, deadlock), the + # retry annotation around the method will ensure that the + # whole method is retried in a new transaction. On fatal + # errors, the check should not be rescheduled as it could + # result in undesired consequences. + # In case there are some errors that should not be + # considered fatal, those should be handled explicitly. + _schedule_check_and_complete(wf_ex, delay) + except exc.MistralException as e: msg = ( "Failed to check and complete [wf_ex_id=%s, wf_name=%s]:" @@ -117,27 +133,6 @@ def _check_and_complete(wf_ex_id): force_fail_workflow(wf.wf_ex, msg) - return - finally: - if states.is_completed(wf_ex.state): - return - - # Let's assume that a task takes 0.01 sec in average to complete - # and based on this assumption calculate a time of the next check. - # The estimation is very rough but this delay will be decreasing - # as tasks will be completing which will give a decent - # approximation. - # For example, if a workflow has 100 incomplete tasks then the - # next check call will happen in 1 second. For 500 tasks it will - # be 5 seconds. The larger the workflow is, the more beneficial - # this mechanism will be. 
- delay = ( - int(incomplete_tasks_count * 0.01) if incomplete_tasks_count - else 4 - ) - - _schedule_check_and_complete(wf_ex, delay) - @profiler.trace('workflow-handler-check-and-fix-integrity') def check_and_fix_integrity(wf_ex):