Add locking logic to database update/delete transactions
This patch removes all occurances of with_lockmode("update") and replaces them with with_for_update(). with_for_update() does the same thing as with_lockmode("update") (adds FOR UPDATE to the sql statement). This patch also addresses the action_acquire issue that can lead to deadlocks. with_lockmode("update") will lock the entire table leading up to the matching row when the unique ID of the row is not specified. action_acquire_first_ready has been updated to first seach for the unique ID and then lock the single row using action_acquire. Change-Id: I2637504c7d294b9993e8c90570974cef8c246fbf
This commit is contained in:
parent
d124c778ce
commit
0bdffc31b9
|
@ -87,7 +87,7 @@ engine_opts = [
|
|||
help=_('Number of times retrying a failed operation on the '
|
||||
'database.')),
|
||||
cfg.IntOpt('database_retry_interval',
|
||||
default=0.1,
|
||||
default=0.3,
|
||||
help=_('Initial number of seconds between database retries.')),
|
||||
cfg.IntOpt('database_max_retry_interval',
|
||||
default=2,
|
||||
|
|
|
@ -199,7 +199,8 @@ def cluster_count_all(context, filters=None, project_safe=True):
|
|||
@retry_on_deadlock
|
||||
def cluster_update(context, cluster_id, values):
|
||||
with session_for_write() as session:
|
||||
cluster = session.query(models.Cluster).get(cluster_id)
|
||||
cluster = session.query(
|
||||
models.Cluster).with_for_update().get(cluster_id)
|
||||
|
||||
if not cluster:
|
||||
raise exception.ResourceNotFound(type='cluster', id=cluster_id)
|
||||
|
@ -445,7 +446,7 @@ def cluster_lock_acquire(cluster_id, action_id, scope):
|
|||
:return: A list of action IDs that currently works on the cluster.
|
||||
'''
|
||||
with session_for_write() as session:
|
||||
query = session.query(models.ClusterLock).with_lockmode('update')
|
||||
query = session.query(models.ClusterLock).with_for_update()
|
||||
lock = query.get(cluster_id)
|
||||
if lock is not None:
|
||||
if scope == 1 and lock.semaphore > 0:
|
||||
|
@ -491,7 +492,7 @@ def cluster_lock_release(cluster_id, action_id, scope):
|
|||
'''
|
||||
with session_for_write() as session:
|
||||
lock = session.query(
|
||||
models.ClusterLock).with_lockmode('update').get(cluster_id)
|
||||
models.ClusterLock).with_for_update().get(cluster_id)
|
||||
if lock is None:
|
||||
return False
|
||||
|
||||
|
@ -502,7 +503,7 @@ def cluster_lock_release(cluster_id, action_id, scope):
|
|||
def cluster_lock_steal(cluster_id, action_id):
|
||||
with session_for_write() as session:
|
||||
lock = session.query(
|
||||
models.ClusterLock).with_lockmode('update').get(cluster_id)
|
||||
models.ClusterLock).with_for_update().get(cluster_id)
|
||||
if lock is not None:
|
||||
lock.action_ids = [action_id]
|
||||
lock.semaphore = -1
|
||||
|
@ -520,7 +521,7 @@ def cluster_lock_steal(cluster_id, action_id):
|
|||
def node_lock_acquire(node_id, action_id):
|
||||
with session_for_write() as session:
|
||||
lock = session.query(
|
||||
models.NodeLock).with_lockmode('update').get(node_id)
|
||||
models.NodeLock).with_for_update().get(node_id)
|
||||
if lock is None:
|
||||
lock = models.NodeLock(node_id=node_id, action_id=action_id)
|
||||
session.add(lock)
|
||||
|
@ -533,7 +534,7 @@ def node_lock_release(node_id, action_id):
|
|||
with session_for_write() as session:
|
||||
success = False
|
||||
lock = session.query(
|
||||
models.NodeLock).with_lockmode('update').get(node_id)
|
||||
models.NodeLock).with_for_update().get(node_id)
|
||||
if lock is not None and lock.action_id == action_id:
|
||||
session.delete(lock)
|
||||
success = True
|
||||
|
@ -545,7 +546,7 @@ def node_lock_release(node_id, action_id):
|
|||
def node_lock_steal(node_id, action_id):
|
||||
with session_for_write() as session:
|
||||
lock = session.query(
|
||||
models.NodeLock).with_lockmode('update').get(node_id)
|
||||
models.NodeLock).with_for_update().get(node_id)
|
||||
if lock is not None:
|
||||
lock.action_id = action_id
|
||||
lock.save(session)
|
||||
|
@ -1081,6 +1082,7 @@ def action_get_all(context, filters=None, limit=None, marker=None, sort=None,
|
|||
marker=marker, sort_dirs=dirs).all()
|
||||
|
||||
|
||||
@retry_on_deadlock
|
||||
def action_check_status(context, action_id, timestamp):
|
||||
with session_for_write() as session:
|
||||
q = session.query(models.ActionDependency)
|
||||
|
@ -1098,6 +1100,7 @@ def action_check_status(context, action_id, timestamp):
|
|||
return action.status
|
||||
|
||||
|
||||
@retry_on_deadlock
|
||||
def dependency_get_depended(context, action_id):
|
||||
with session_for_read() as session:
|
||||
q = session.query(models.ActionDependency).filter_by(
|
||||
|
@ -1105,6 +1108,7 @@ def dependency_get_depended(context, action_id):
|
|||
return [d.depended for d in q.all()]
|
||||
|
||||
|
||||
@retry_on_deadlock
|
||||
def dependency_get_dependents(context, action_id):
|
||||
with session_for_read() as session:
|
||||
q = session.query(models.ActionDependency).filter_by(
|
||||
|
@ -1124,7 +1128,7 @@ def dependency_add(context, depended, dependent):
|
|||
r = models.ActionDependency(depended=d, dependent=dependent)
|
||||
session.add(r)
|
||||
|
||||
query = session.query(models.Action).with_lockmode('update')
|
||||
query = session.query(models.Action).with_for_update()
|
||||
query = query.filter_by(id=dependent)
|
||||
query.update({'status': consts.ACTION_WAITING,
|
||||
'status_reason': 'Waiting for depended actions.'},
|
||||
|
@ -1142,7 +1146,7 @@ def dependency_add(context, depended, dependent):
|
|||
r = models.ActionDependency(depended=depended, dependent=d)
|
||||
session.add(r)
|
||||
|
||||
q = session.query(models.Action).with_lockmode('update')
|
||||
q = session.query(models.Action).with_for_update()
|
||||
q = q.filter(models.Action.id.in_(dependents))
|
||||
q.update({'status': consts.ACTION_WAITING,
|
||||
'status_reason': 'Waiting for depended actions.'},
|
||||
|
@ -1239,8 +1243,7 @@ def action_mark_cancelled(context, action_id, timestamp, reason=None):
|
|||
@retry_on_deadlock
|
||||
def action_acquire(context, action_id, owner, timestamp):
|
||||
with session_for_write() as session:
|
||||
action = session.query(models.Action).with_for_update().\
|
||||
get(action_id)
|
||||
action = session.query(models.Action).with_for_update().get(action_id)
|
||||
if not action:
|
||||
return None
|
||||
|
||||
|
@ -1278,29 +1281,15 @@ def action_acquire_random_ready(context, owner, timestamp):
|
|||
return action
|
||||
|
||||
|
||||
@retry_on_deadlock
|
||||
def _action_acquire_ready(session, owner, timestamp, order=None):
|
||||
action = session.query(models.Action).\
|
||||
filter_by(status=consts.ACTION_READY).\
|
||||
filter_by(owner=None).\
|
||||
order_by(order or func.random()).\
|
||||
with_for_update().first()
|
||||
|
||||
if action:
|
||||
action.owner = owner
|
||||
action.start_time = timestamp
|
||||
action.status = consts.ACTION_RUNNING
|
||||
action.status_reason = 'The action is being processed.'
|
||||
action.save(session)
|
||||
|
||||
return action
|
||||
|
||||
|
||||
@retry_on_deadlock
|
||||
def action_acquire_first_ready(context, owner, timestamp):
|
||||
with session_for_write() as session:
|
||||
return _action_acquire_ready(session, owner, timestamp,
|
||||
consts.ACTION_CREATED_AT)
|
||||
action = session.query(models.Action).filter_by(
|
||||
status=consts.ACTION_READY).filter_by(
|
||||
owner=None).order_by(
|
||||
consts.ACTION_CREATED_AT or func.random()).first()
|
||||
if action:
|
||||
return action_acquire(context, action.id, owner, timestamp)
|
||||
|
||||
|
||||
@retry_on_deadlock
|
||||
|
@ -1590,7 +1579,7 @@ def registry_create(context, cluster_id, check_type, interval, params,
|
|||
@retry_on_deadlock
|
||||
def registry_update(context, cluster_id, values):
|
||||
with session_for_write() as session:
|
||||
query = session.query(models.HealthRegistry).with_lockmode('update')
|
||||
query = session.query(models.HealthRegistry).with_for_update()
|
||||
registry = query.filter_by(cluster_id=cluster_id).first()
|
||||
if registry:
|
||||
registry.update(values)
|
||||
|
@ -1602,7 +1591,7 @@ def registry_claim(context, engine_id):
|
|||
with session_for_write() as session:
|
||||
engines = session.query(models.Service).all()
|
||||
svc_ids = [e.id for e in engines if not utils.is_service_dead(e)]
|
||||
q_reg = session.query(models.HealthRegistry).with_lockmode('update')
|
||||
q_reg = session.query(models.HealthRegistry).with_for_update()
|
||||
if svc_ids:
|
||||
q_reg = q_reg.filter(
|
||||
models.HealthRegistry.engine_id.notin_(svc_ids))
|
||||
|
|
|
@ -165,7 +165,7 @@ class DBAPIActionTest(base.SenlinTestCase):
|
|||
self.assertIsNone(result)
|
||||
|
||||
def test_acquire_first_ready_one(self):
|
||||
data = {'created_at': tu.utcnow(True)}
|
||||
data = {'created_at': tu.utcnow(True), 'id': 'fake_UUID'}
|
||||
_create_action(self.ctx, **data)
|
||||
|
||||
result = db_api.action_acquire_first_ready(self.ctx, 'fake_o',
|
||||
|
|
Loading…
Reference in New Issue