Fix ORM caching for with_for_update calls
The SQLAlchemy documentation recommends using populate_existing() when
using with_for_update() [0]; it fixes issues with the caching of objects.
Specifically, this patch fixes a bug in the locking of a load balancer
in the batch member update API call: the load balancer might not have
been locked correctly, and race conditions could have occurred (two
requests for the same load balancer being processed simultaneously in
the workers).
[0] https://docs.sqlalchemy.org/en/14/orm/query.html#sqlalchemy.orm.Query.with_for_update
Story 2010646
Task 47642
Co-Authored-By: Gaudenz Steinlin <gaudenz.steinlin@cloudscale.ch>
Merge conflict:
octavia/db/repositories.py
Change-Id: Ibd4da09079e83789d6cfe3658fcf1f266f5cf8b4
(cherry picked from commit 2e4ee9c65e)
This commit is contained in:
committed by
Sergey Kraynev
parent
304fc9ba81
commit
e1b7c78f85
@@ -393,8 +393,11 @@ class Repositories(object):
|
|||||||
# Note: You cannot just use the current count as the in-use
|
# Note: You cannot just use the current count as the in-use
|
||||||
# value as we don't want to lock the whole resource table
|
# value as we don't want to lock the whole resource table
|
||||||
try:
|
try:
|
||||||
quotas = lock_session.query(models.Quotas).filter_by(
|
quotas = (lock_session.query(models.Quotas)
|
||||||
project_id=project_id).with_for_update().first()
|
.filter_by(project_id=project_id)
|
||||||
|
.populate_existing()
|
||||||
|
.with_for_update()
|
||||||
|
.first())
|
||||||
if _class == data_models.LoadBalancer:
|
if _class == data_models.LoadBalancer:
|
||||||
# Decide which quota to use
|
# Decide which quota to use
|
||||||
if quotas.load_balancer is None:
|
if quotas.load_balancer is None:
|
||||||
@@ -562,8 +565,11 @@ class Repositories(object):
|
|||||||
|
|
||||||
# Lock the project record in the database to block other quota checks
|
# Lock the project record in the database to block other quota checks
|
||||||
try:
|
try:
|
||||||
quotas = lock_session.query(models.Quotas).filter_by(
|
quotas = (lock_session.query(models.Quotas)
|
||||||
project_id=project_id).with_for_update().first()
|
.filter_by(project_id=project_id)
|
||||||
|
.populate_existing()
|
||||||
|
.with_for_update()
|
||||||
|
.first())
|
||||||
if not quotas:
|
if not quotas:
|
||||||
if not CONF.api_settings.auth_strategy == consts.NOAUTH:
|
if not CONF.api_settings.auth_strategy == consts.NOAUTH:
|
||||||
LOG.error('Quota decrement on %(clss)s called on '
|
LOG.error('Quota decrement on %(clss)s called on '
|
||||||
@@ -875,8 +881,10 @@ class LoadBalancerRepository(BaseRepository):
|
|||||||
:returns: bool
|
:returns: bool
|
||||||
"""
|
"""
|
||||||
with session.begin(subtransactions=True):
|
with session.begin(subtransactions=True):
|
||||||
lb = session.query(self.model_class).with_for_update().filter_by(
|
lb = (session.query(self.model_class)
|
||||||
id=id).one()
|
.populate_existing()
|
||||||
|
.with_for_update()
|
||||||
|
.filter_by(id=id).one())
|
||||||
is_delete = status == consts.PENDING_DELETE
|
is_delete = status == consts.PENDING_DELETE
|
||||||
acceptable_statuses = (
|
acceptable_statuses = (
|
||||||
consts.DELETABLE_STATUSES
|
consts.DELETABLE_STATUSES
|
||||||
@@ -907,8 +915,10 @@ class LoadBalancerRepository(BaseRepository):
|
|||||||
:returns: bool
|
:returns: bool
|
||||||
"""
|
"""
|
||||||
with session.begin(subtransactions=True):
|
with session.begin(subtransactions=True):
|
||||||
lb = session.query(self.model_class).with_for_update().filter_by(
|
lb = (session.query(self.model_class)
|
||||||
id=id).one()
|
.populate_existing()
|
||||||
|
.with_for_update()
|
||||||
|
.filter_by(id=id).one())
|
||||||
if lb.provisioning_status not in consts.FAILOVERABLE_STATUSES:
|
if lb.provisioning_status not in consts.FAILOVERABLE_STATUSES:
|
||||||
if raise_exception:
|
if raise_exception:
|
||||||
raise exceptions.ImmutableObject(
|
raise exceptions.ImmutableObject(
|
||||||
@@ -1261,10 +1271,13 @@ class ListenerStatisticsRepository(BaseRepository):
|
|||||||
listener_id=delta_stats.listener_id,
|
listener_id=delta_stats.listener_id,
|
||||||
amphora_id=delta_stats.amphora_id).count()
|
amphora_id=delta_stats.amphora_id).count()
|
||||||
if count:
|
if count:
|
||||||
existing_stats = session.query(
|
existing_stats = (
|
||||||
self.model_class).with_for_update().filter_by(
|
session.query(self.model_class)
|
||||||
listener_id=delta_stats.listener_id,
|
.populate_existing()
|
||||||
amphora_id=delta_stats.amphora_id).one()
|
.with_for_update()
|
||||||
|
.filter_by(
|
||||||
|
listener_id=delta_stats.listener_id,
|
||||||
|
amphora_id=delta_stats.amphora_id).one())
|
||||||
existing_stats += delta_stats
|
existing_stats += delta_stats
|
||||||
existing_stats.active_connections = (
|
existing_stats.active_connections = (
|
||||||
delta_stats.active_connections)
|
delta_stats.active_connections)
|
||||||
@@ -1347,8 +1360,10 @@ class AmphoraRepository(BaseRepository):
|
|||||||
filters['cached_zone'] = availability_zone
|
filters['cached_zone'] = availability_zone
|
||||||
|
|
||||||
with session.begin(subtransactions=True):
|
with session.begin(subtransactions=True):
|
||||||
amp = session.query(self.model_class).with_for_update().filter_by(
|
amp = (session.query(self.model_class)
|
||||||
**filters).first()
|
.populate_existing()
|
||||||
|
.with_for_update()
|
||||||
|
.filter_by(**filters).first())
|
||||||
|
|
||||||
if amp is None:
|
if amp is None:
|
||||||
return None
|
return None
|
||||||
@@ -1427,12 +1442,15 @@ class AmphoraRepository(BaseRepository):
|
|||||||
seconds=expired_seconds)
|
seconds=expired_seconds)
|
||||||
|
|
||||||
with session.begin(subtransactions=True):
|
with session.begin(subtransactions=True):
|
||||||
amp = session.query(self.model_class).with_for_update().filter(
|
amp = (session.query(self.model_class)
|
||||||
self.model_class.status.notin_(
|
.populate_existing()
|
||||||
[consts.DELETED, consts.PENDING_DELETE]),
|
.with_for_update()
|
||||||
self.model_class.cert_busy == false(),
|
.filter(
|
||||||
self.model_class.cert_expiration < expired_date
|
self.model_class.status.notin_(
|
||||||
).first()
|
[consts.DELETED, consts.PENDING_DELETE]),
|
||||||
|
self.model_class.cert_busy == false(),
|
||||||
|
self.model_class.cert_expiration < expired_date)
|
||||||
|
.first())
|
||||||
|
|
||||||
if amp is None:
|
if amp is None:
|
||||||
return None
|
return None
|
||||||
@@ -1531,8 +1549,11 @@ class AmphoraRepository(BaseRepository):
|
|||||||
:raises NoResultFound: The amphora was not found or already deleted.
|
:raises NoResultFound: The amphora was not found or already deleted.
|
||||||
:returns: None
|
:returns: None
|
||||||
"""
|
"""
|
||||||
amp = lock_session.query(self.model_class).with_for_update().filter_by(
|
amp = (lock_session.query(self.model_class)
|
||||||
id=id).filter(self.model_class.status != consts.DELETED).one()
|
.populate_existing()
|
||||||
|
.with_for_update()
|
||||||
|
.filter_by(id=id)
|
||||||
|
.filter(self.model_class.status != consts.DELETED).one())
|
||||||
if amp.status not in [consts.AMPHORA_READY, consts.ERROR]:
|
if amp.status not in [consts.AMPHORA_READY, consts.ERROR]:
|
||||||
raise exceptions.ImmutableObject(resource=consts.AMPHORA, id=id)
|
raise exceptions.ImmutableObject(resource=consts.AMPHORA, id=id)
|
||||||
amp.status = consts.PENDING_DELETE
|
amp.status = consts.PENDING_DELETE
|
||||||
@@ -1691,10 +1712,17 @@ class AmphoraHealthRepository(BaseRepository):
|
|||||||
expired_time = datetime.datetime.utcnow() - datetime.timedelta(
|
expired_time = datetime.datetime.utcnow() - datetime.timedelta(
|
||||||
seconds=timeout)
|
seconds=timeout)
|
||||||
|
|
||||||
amp = session.query(self.model_class).with_for_update().filter_by(
|
amp = session.query(
|
||||||
busy=False).filter(
|
self.model_class
|
||||||
self.model_class.last_update < expired_time).order_by(
|
).populate_existing(
|
||||||
func.random()).first()
|
).with_for_update(
|
||||||
|
).filter_by(
|
||||||
|
busy=False
|
||||||
|
).filter(
|
||||||
|
self.model_class.last_update < expired_time
|
||||||
|
).order_by(
|
||||||
|
func.random()
|
||||||
|
).first()
|
||||||
|
|
||||||
if amp is None:
|
if amp is None:
|
||||||
return None
|
return None
|
||||||
@@ -1986,8 +2014,11 @@ class QuotasRepository(BaseRepository):
|
|||||||
def update(self, session, project_id, **model_kwargs):
|
def update(self, session, project_id, **model_kwargs):
|
||||||
with session.begin(subtransactions=True):
|
with session.begin(subtransactions=True):
|
||||||
kwargs_quota = model_kwargs['quota']
|
kwargs_quota = model_kwargs['quota']
|
||||||
quotas = session.query(self.model_class).filter_by(
|
quotas = (
|
||||||
project_id=project_id).with_for_update().first()
|
session.query(self.model_class)
|
||||||
|
.filter_by(project_id=project_id)
|
||||||
|
.populate_existing()
|
||||||
|
.with_for_update().first())
|
||||||
if not quotas:
|
if not quotas:
|
||||||
quotas = models.Quotas(project_id=project_id)
|
quotas = models.Quotas(project_id=project_id)
|
||||||
|
|
||||||
@@ -1999,8 +2030,11 @@ class QuotasRepository(BaseRepository):
|
|||||||
|
|
||||||
def delete(self, session, project_id):
|
def delete(self, session, project_id):
|
||||||
with session.begin(subtransactions=True):
|
with session.begin(subtransactions=True):
|
||||||
quotas = session.query(self.model_class).filter_by(
|
quotas = (
|
||||||
project_id=project_id).with_for_update().first()
|
session.query(self.model_class)
|
||||||
|
.filter_by(project_id=project_id)
|
||||||
|
.populate_existing()
|
||||||
|
.with_for_update().first())
|
||||||
if not quotas:
|
if not quotas:
|
||||||
raise exceptions.NotFound(
|
raise exceptions.NotFound(
|
||||||
resource=data_models.Quotas._name(), id=project_id)
|
resource=data_models.Quotas._name(), id=project_id)
|
||||||
@@ -2104,7 +2138,8 @@ class SparesPoolRepository(BaseRepository):
|
|||||||
so that other processes cannot read or write it.
|
so that other processes cannot read or write it.
|
||||||
:returns: expected_spares_count, updated_at
|
:returns: expected_spares_count, updated_at
|
||||||
"""
|
"""
|
||||||
row = lock_session.query(models.SparesPool).with_for_update().one()
|
row = lock_session.query(
|
||||||
|
models.SparesPool).populate_existing().with_for_update().one()
|
||||||
return row
|
return row
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -3679,6 +3679,30 @@ class LoadBalancerRepositoryTest(BaseRepositoryTest):
|
|||||||
lb = self.lb_repo.get(self.session, id=lb_id)
|
lb = self.lb_repo.get(self.session, id=lb_id)
|
||||||
self.assertEqual(constants.PENDING_DELETE, lb.provisioning_status)
|
self.assertEqual(constants.PENDING_DELETE, lb.provisioning_status)
|
||||||
|
|
||||||
|
def test_test_and_set_provisioning_status_concurrent(self):
|
||||||
|
lb_id = uuidutils.generate_uuid()
|
||||||
|
lock_session1 = db_api.get_session(autocommit=False)
|
||||||
|
self.lb_repo.create(lock_session1, id=lb_id,
|
||||||
|
provisioning_status=constants.ACTIVE,
|
||||||
|
operating_status=constants.ONLINE,
|
||||||
|
enabled=True)
|
||||||
|
|
||||||
|
# Create a concurrent session
|
||||||
|
lock_session2 = db_api.get_session(autocommit=False)
|
||||||
|
|
||||||
|
# Load LB into lock_session2's identity map
|
||||||
|
lock_session2.query(db_models.LoadBalancer).filter_by(
|
||||||
|
id=lb_id).one()
|
||||||
|
|
||||||
|
# Update provisioning status in lock_session1
|
||||||
|
self.lb_repo.test_and_set_provisioning_status(
|
||||||
|
self.session, lb_id, constants.PENDING_UPDATE)
|
||||||
|
lock_session1.commit()
|
||||||
|
|
||||||
|
# Assert concurrent updates are rejected
|
||||||
|
self.assertFalse(self.lb_repo.test_and_set_provisioning_status(
|
||||||
|
lock_session2, lb_id, constants.PENDING_UPDATE))
|
||||||
|
|
||||||
def test_set_status_for_failover_immutable(self):
|
def test_set_status_for_failover_immutable(self):
|
||||||
lb_id = uuidutils.generate_uuid()
|
lb_id = uuidutils.generate_uuid()
|
||||||
self.lb_repo.create(self.session, id=lb_id,
|
self.lb_repo.create(self.session, id=lb_id,
|
||||||
|
|||||||
@@ -0,0 +1,5 @@
|
|||||||
|
---
|
||||||
|
fixes:
|
||||||
|
- |
|
||||||
|
Fixed a potential race condition in the member batch update API call, the
|
||||||
|
load balancers might not have been locked properly.
|
||||||
Reference in New Issue
Block a user