Towards non-locking model: decouple WF completion check via scheduler
* In non-locking model, when a task completes we must perform workflow completion check in a separate scheduled transaction to deal with phantom reads of multiple tasks to determine if the entire workflow is completed. This new scheduled transaction checks workfow completion and if it's not it reschedules itself to repeat at a later time. TODO: * Implement a smarter algorithm of calculating delay for rescheduling Change-Id: Ic475ff75c9fbf120fa73cfe8c2e58ac54d7d7659
This commit is contained in:
@@ -21,12 +21,16 @@ from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models as db_models
|
||||
from mistral.engine import workflows
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import scheduler
|
||||
from mistral.workflow import states
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
_ON_TASK_COMPLETE_PATH = 'mistral.engine.workflow_handler.on_task_complete'
|
||||
|
||||
|
||||
@profiler.trace('workflow-handler-start-workflow')
|
||||
def start_workflow(wf_identifier, wf_input, desc, params):
|
||||
wf = workflows.Workflow(
|
||||
@@ -71,25 +75,38 @@ def cancel_workflow(wf_ex, msg=None):
|
||||
|
||||
|
||||
@profiler.trace('workflow-handler-on-task-complete')
|
||||
def on_task_complete(task_ex):
|
||||
wf_ex = task_ex.workflow_execution
|
||||
def on_task_complete(task_ex_id):
|
||||
# Note: This method can only be called via scheduler.
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex_id)
|
||||
|
||||
wf = workflows.Workflow(
|
||||
db_api.get_workflow_definition(wf_ex.workflow_id),
|
||||
wf_ex=wf_ex
|
||||
)
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
try:
|
||||
wf.on_task_complete(task_ex)
|
||||
except exc.MistralException as e:
|
||||
msg = (
|
||||
"Failed to handle task completion [wf_ex=%s, task_ex=%s]: %s\n%s"
|
||||
% (wf_ex, task_ex, e, tb.format_exc())
|
||||
wf = workflows.Workflow(
|
||||
db_api.get_workflow_definition(wf_ex.workflow_id),
|
||||
wf_ex=wf_ex
|
||||
)
|
||||
|
||||
LOG.error(msg)
|
||||
try:
|
||||
wf.on_task_complete(task_ex)
|
||||
except exc.MistralException as e:
|
||||
msg = (
|
||||
"Failed to handle task completion [wf_ex=%s, task_ex=%s]:"
|
||||
" %s\n%s" % (wf_ex, task_ex, e, tb.format_exc())
|
||||
)
|
||||
|
||||
fail_workflow(wf.wf_ex, msg)
|
||||
LOG.error(msg)
|
||||
|
||||
fail_workflow(wf.wf_ex, msg)
|
||||
|
||||
return
|
||||
|
||||
if not states.is_completed(wf_ex.state):
|
||||
# TODO(rakhmerov): Moving forward we can implement some more fancy
|
||||
# algorithm for increasing delay for rescheduling so that we don't
|
||||
# put too serious load onto scheduler.
|
||||
delay = 1
|
||||
schedule_on_task_complete(task_ex, delay)
|
||||
|
||||
|
||||
def pause_workflow(wf_ex, msg=None):
|
||||
@@ -143,3 +160,30 @@ def lock_workflow_execution(wf_ex_id):
|
||||
# The method expires all session objects and returns the up-to-date
|
||||
# workflow execution from the DB.
|
||||
return db_api.acquire_lock(db_models.WorkflowExecution, wf_ex_id)
|
||||
|
||||
|
||||
@profiler.trace('workflow-handler-schedule-on-task-complete')
|
||||
def schedule_on_task_complete(task_ex, delay=0):
|
||||
"""Schedules task completion check.
|
||||
|
||||
This method provides transactional decoupling of task completion from
|
||||
workflow completion check. It's needed in non-locking model in order to
|
||||
avoid 'phantom read' phenomena when reading state of multiple tasks
|
||||
to see if a workflow is completed. Just starting a separate transaction
|
||||
without using scheduler is not safe due to concurrency window that we'll
|
||||
have in this case (time between transactions) whereas scheduler is a
|
||||
special component that is designed to be resistant to failures.
|
||||
|
||||
:param task_ex: Task execution.
|
||||
:param delay: Minimum amount of time before task completion check
|
||||
should be made.
|
||||
"""
|
||||
key = 'wfh_on_t_c-%s' % task_ex.workflow_execution_id
|
||||
|
||||
scheduler.schedule_call(
|
||||
None,
|
||||
_ON_TASK_COMPLETE_PATH,
|
||||
delay,
|
||||
unique_key=key,
|
||||
task_ex_id=task_ex.id
|
||||
)
|
||||
|
||||
Reference in New Issue
Block a user