Towards non-locking model: remove pessimistic locks
TODO: * Fix reverse workflow * Do more testing on complex workflows Partially implements: blueprint mistral-non-locking-tx-model Change-Id: Ieceb0ea31492f4641b4f48acded9fca0d1475606
This commit is contained in:
@@ -93,13 +93,6 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
else:
|
||||
action_ex = db_api.get_action_execution(action_ex_id)
|
||||
|
||||
task_ex = action_ex.task_execution
|
||||
|
||||
if task_ex:
|
||||
wf_handler.lock_workflow_execution(
|
||||
task_ex.workflow_execution_id
|
||||
)
|
||||
|
||||
action_handler.on_action_complete(action_ex, result)
|
||||
|
||||
return action_ex.get_clone()
|
||||
@@ -107,7 +100,7 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
@u.log_exec(LOG)
|
||||
def pause_workflow(self, wf_ex_id):
|
||||
with db_api.transaction():
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex_id)
|
||||
|
||||
wf_handler.pause_workflow(wf_ex)
|
||||
|
||||
@@ -118,9 +111,7 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
with db_api.transaction():
|
||||
task_ex = db_api.get_task_execution(task_ex_id)
|
||||
|
||||
wf_ex = wf_handler.lock_workflow_execution(
|
||||
task_ex.workflow_execution_id
|
||||
)
|
||||
wf_ex = task_ex.workflow_execution
|
||||
|
||||
wf_handler.rerun_workflow(wf_ex, task_ex, reset=reset, env=env)
|
||||
|
||||
@@ -129,7 +120,7 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
@u.log_exec(LOG)
|
||||
def resume_workflow(self, wf_ex_id, env=None):
|
||||
with db_api.transaction():
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex_id)
|
||||
|
||||
wf_handler.resume_workflow(wf_ex, env=env)
|
||||
|
||||
@@ -138,7 +129,7 @@ class DefaultEngine(base.Engine, coordination.Service):
|
||||
@u.log_exec(LOG)
|
||||
def stop_workflow(self, wf_ex_id, state, message=None):
|
||||
with db_api.transaction():
|
||||
wf_ex = wf_handler.lock_workflow_execution(wf_ex_id)
|
||||
wf_ex = db_api.get_workflow_execution(wf_ex_id)
|
||||
|
||||
wf_handler.stop_workflow(wf_ex, state, message)
|
||||
|
||||
|
||||
@@ -18,7 +18,6 @@ from osprofiler import profiler
|
||||
import traceback as tb
|
||||
|
||||
from mistral.db.v2 import api as db_api
|
||||
from mistral.db.v2.sqlalchemy import models as db_models
|
||||
from mistral.engine import workflows
|
||||
from mistral import exceptions as exc
|
||||
from mistral.services import scheduler
|
||||
@@ -154,14 +153,6 @@ def set_workflow_state(wf_ex, state, msg=None):
|
||||
)
|
||||
|
||||
|
||||
@profiler.trace('workflow-handler-lock-execution')
|
||||
def lock_workflow_execution(wf_ex_id):
|
||||
# Locks a workflow execution using the db_api.acquire_lock function.
|
||||
# The method expires all session objects and returns the up-to-date
|
||||
# workflow execution from the DB.
|
||||
return db_api.acquire_lock(db_models.WorkflowExecution, wf_ex_id)
|
||||
|
||||
|
||||
@profiler.trace('workflow-handler-schedule-on-task-complete')
|
||||
def schedule_on_task_complete(task_ex, delay=0):
|
||||
"""Schedules task completion check.
|
||||
|
||||
@@ -367,7 +367,7 @@ class WorkflowResumeTest(base.EngineTestCase):
|
||||
|
||||
with mock.patch.object(
|
||||
db_api,
|
||||
'acquire_lock',
|
||||
'get_workflow_execution',
|
||||
side_effect=err):
|
||||
|
||||
self.assertRaises(
|
||||
|
||||
Reference in New Issue
Block a user