Fix ORM caching for with_for_update calls

SQLAlchemy recommends using populate_existing() together with
with_for_update() [0]; doing so fixes issues with the caching of objects.

Specifically, this patch fixes a bug in the locking of a load balancer in
the batch member update API call: the load balancer might not have been
locked correctly, and race conditions could have occurred (the workers
processing two requests for the same load balancer simultaneously).

[0] https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.Query.with_for_update

Story 2010646
Task 47642

Co-Authored-By: Gaudenz Steinlin <gaudenz.steinlin@cloudscale.ch>

Change-Id: Ibd4da09079e83789d6cfe3658fcf1f266f5cf8b4
This commit is contained in:
Gregory Thiemonge 2023-03-14 13:09:42 -04:00
parent 5ed6f37519
commit 2e4ee9c65e
3 changed files with 83 additions and 26 deletions

View File

@ -406,8 +406,11 @@ class Repositories(object):
# Note: You cannot just use the current count as the in-use
# value as we don't want to lock the whole resource table
try:
quotas = lock_session.query(models.Quotas).filter_by(
project_id=project_id).with_for_update().first()
quotas = (lock_session.query(models.Quotas)
.filter_by(project_id=project_id)
.populate_existing()
.with_for_update()
.first())
if _class == data_models.LoadBalancer:
# Decide which quota to use
if quotas.load_balancer is None:
@ -575,8 +578,11 @@ class Repositories(object):
# Lock the project record in the database to block other quota checks
try:
quotas = lock_session.query(models.Quotas).filter_by(
project_id=project_id).with_for_update().first()
quotas = (lock_session.query(models.Quotas)
.filter_by(project_id=project_id)
.populate_existing()
.with_for_update()
.first())
if not quotas:
if not CONF.api_settings.auth_strategy == consts.NOAUTH:
LOG.error('Quota decrement on %(clss)s called on '
@ -742,8 +748,10 @@ class LoadBalancerRepository(BaseRepository):
:returns: bool
"""
with session.begin(subtransactions=True):
lb = session.query(self.model_class).with_for_update().filter_by(
id=id).one()
lb = (session.query(self.model_class)
.populate_existing()
.with_for_update()
.filter_by(id=id).one())
is_delete = status == consts.PENDING_DELETE
acceptable_statuses = (
consts.DELETABLE_STATUSES
@ -774,8 +782,10 @@ class LoadBalancerRepository(BaseRepository):
:returns: bool
"""
with session.begin(subtransactions=True):
lb = session.query(self.model_class).with_for_update().filter_by(
id=id).one()
lb = (session.query(self.model_class)
.populate_existing()
.with_for_update()
.filter_by(id=id).one())
if lb.provisioning_status not in consts.FAILOVERABLE_STATUSES:
if raise_exception:
raise exceptions.ImmutableObject(
@ -1143,10 +1153,13 @@ class ListenerStatisticsRepository(BaseRepository):
listener_id=delta_stats.listener_id,
amphora_id=delta_stats.amphora_id).count()
if count:
existing_stats = session.query(
self.model_class).with_for_update().filter_by(
listener_id=delta_stats.listener_id,
amphora_id=delta_stats.amphora_id).one()
existing_stats = (
session.query(self.model_class)
.populate_existing()
.with_for_update()
.filter_by(
listener_id=delta_stats.listener_id,
amphora_id=delta_stats.amphora_id).one())
existing_stats += delta_stats
existing_stats.active_connections = (
delta_stats.active_connections)
@ -1229,8 +1242,10 @@ class AmphoraRepository(BaseRepository):
filters['cached_zone'] = availability_zone
with session.begin(subtransactions=True):
amp = session.query(self.model_class).with_for_update().filter_by(
**filters).first()
amp = (session.query(self.model_class)
.populate_existing()
.with_for_update()
.filter_by(**filters).first())
if amp is None:
return None
@ -1281,12 +1296,15 @@ class AmphoraRepository(BaseRepository):
seconds=expired_seconds)
with session.begin(subtransactions=True):
amp = session.query(self.model_class).with_for_update().filter(
self.model_class.status.notin_(
[consts.DELETED, consts.PENDING_DELETE]),
self.model_class.cert_busy == false(),
self.model_class.cert_expiration < expired_date
).first()
amp = (session.query(self.model_class)
.populate_existing()
.with_for_update()
.filter(
self.model_class.status.notin_(
[consts.DELETED, consts.PENDING_DELETE]),
self.model_class.cert_busy == false(),
self.model_class.cert_expiration < expired_date)
.first())
if amp is None:
return None
@ -1385,8 +1403,11 @@ class AmphoraRepository(BaseRepository):
:raises NoResultFound: The amphora was not found or already deleted.
:returns: None
"""
amp = lock_session.query(self.model_class).with_for_update().filter_by(
id=id).filter(self.model_class.status != consts.DELETED).one()
amp = (lock_session.query(self.model_class)
.populate_existing()
.with_for_update()
.filter_by(id=id)
.filter(self.model_class.status != consts.DELETED).one())
if amp.status not in [consts.AMPHORA_READY, consts.ERROR]:
raise exceptions.ImmutableObject(resource=consts.AMPHORA, id=id)
amp.status = consts.PENDING_DELETE
@ -1587,6 +1608,7 @@ class AmphoraHealthRepository(BaseRepository):
# Pick one expired amphora for automatic failover
amp_health = lock_session.query(
self.model_class
).populate_existing(
).with_for_update(
).filter(
self.model_class.amphora_id.in_(expired_ids_query)
@ -1911,8 +1933,11 @@ class QuotasRepository(BaseRepository):
def update(self, session, project_id, **model_kwargs):
with session.begin(subtransactions=True):
kwargs_quota = model_kwargs['quota']
quotas = session.query(self.model_class).filter_by(
project_id=project_id).with_for_update().first()
quotas = (
session.query(self.model_class)
.filter_by(project_id=project_id)
.populate_existing()
.with_for_update().first())
if not quotas:
quotas = models.Quotas(project_id=project_id)
@ -1924,8 +1949,11 @@ class QuotasRepository(BaseRepository):
def delete(self, session, project_id):
with session.begin(subtransactions=True):
quotas = session.query(self.model_class).filter_by(
project_id=project_id).with_for_update().first()
quotas = (
session.query(self.model_class)
.filter_by(project_id=project_id)
.populate_existing()
.with_for_update().first())
if not quotas:
raise exceptions.NotFound(
resource=data_models.Quotas._name(), id=project_id)

View File

@ -3094,6 +3094,30 @@ class LoadBalancerRepositoryTest(BaseRepositoryTest):
lb = self.lb_repo.get(self.session, id=lb_id)
self.assertEqual(constants.PENDING_DELETE, lb.provisioning_status)
def test_test_and_set_provisioning_status_concurrent(self):
# Checks that test_and_set_provisioning_status() returns a falsy result
# when called through a second session that had already loaded the load
# balancer before a first call requested PENDING_UPDATE for it.
lb_id = uuidutils.generate_uuid()
lock_session1 = db_api.get_session(autocommit=False)
self.lb_repo.create(lock_session1, id=lb_id,
provisioning_status=constants.ACTIVE,
operating_status=constants.ONLINE,
enabled=True)
# Create a concurrent session
lock_session2 = db_api.get_session(autocommit=False)
# Load LB into lock_session2's identity map
lock_session2.query(db_models.LoadBalancer).filter_by(
id=lb_id).one()
# Update provisioning status in lock_session1
# NOTE(review): the call below passes self.session, not lock_session1,
# although lock_session1 is the session committed afterwards -- confirm
# which session is intended.
self.lb_repo.test_and_set_provisioning_status(
self.session, lb_id, constants.PENDING_UPDATE)
lock_session1.commit()
# Assert concurrent updates are rejected
# The second request for PENDING_UPDATE goes through lock_session2, which
# loaded the load balancer before the first request was made.
self.assertFalse(self.lb_repo.test_and_set_provisioning_status(
lock_session2, lb_id, constants.PENDING_UPDATE))
def test_set_status_for_failover_immutable(self):
lb_id = uuidutils.generate_uuid()
self.lb_repo.create(self.session, id=lb_id,

View File

@ -0,0 +1,5 @@
---
fixes:
- |
Fixed a potential race condition in the member batch update API call; the
load balancers might not have been locked properly.