diff --git a/octavia/db/repositories.py b/octavia/db/repositories.py index 776d756ca4..dd355f348d 100644 --- a/octavia/db/repositories.py +++ b/octavia/db/repositories.py @@ -393,8 +393,11 @@ class Repositories(object): # Note: You cannot just use the current count as the in-use # value as we don't want to lock the whole resource table try: - quotas = lock_session.query(models.Quotas).filter_by( - project_id=project_id).with_for_update().first() + quotas = (lock_session.query(models.Quotas) + .filter_by(project_id=project_id) + .populate_existing() + .with_for_update() + .first()) if _class == data_models.LoadBalancer: # Decide which quota to use if quotas.load_balancer is None: @@ -562,8 +565,11 @@ class Repositories(object): # Lock the project record in the database to block other quota checks try: - quotas = lock_session.query(models.Quotas).filter_by( - project_id=project_id).with_for_update().first() + quotas = (lock_session.query(models.Quotas) + .filter_by(project_id=project_id) + .populate_existing() + .with_for_update() + .first()) if not quotas: if not CONF.api_settings.auth_strategy == consts.NOAUTH: LOG.error('Quota decrement on %(clss)s called on ' @@ -875,8 +881,10 @@ class LoadBalancerRepository(BaseRepository): :returns: bool """ with session.begin(subtransactions=True): - lb = session.query(self.model_class).with_for_update().filter_by( - id=id).one() + lb = (session.query(self.model_class) + .populate_existing() + .with_for_update() + .filter_by(id=id).one()) is_delete = status == consts.PENDING_DELETE acceptable_statuses = ( consts.DELETABLE_STATUSES @@ -907,8 +915,10 @@ class LoadBalancerRepository(BaseRepository): :returns: bool """ with session.begin(subtransactions=True): - lb = session.query(self.model_class).with_for_update().filter_by( - id=id).one() + lb = (session.query(self.model_class) + .populate_existing() + .with_for_update() + .filter_by(id=id).one()) if lb.provisioning_status not in consts.FAILOVERABLE_STATUSES: if raise_exception: raise exceptions.ImmutableObject( @@ -1261,10 +1271,13 @@ class ListenerStatisticsRepository(BaseRepository): listener_id=delta_stats.listener_id, amphora_id=delta_stats.amphora_id).count() if count: - existing_stats = session.query( - self.model_class).with_for_update().filter_by( - listener_id=delta_stats.listener_id, - amphora_id=delta_stats.amphora_id).one() + existing_stats = ( + session.query(self.model_class) + .populate_existing() + .with_for_update() + .filter_by( + listener_id=delta_stats.listener_id, + amphora_id=delta_stats.amphora_id).one()) existing_stats += delta_stats existing_stats.active_connections = ( delta_stats.active_connections) @@ -1347,8 +1360,10 @@ class AmphoraRepository(BaseRepository): filters['cached_zone'] = availability_zone with session.begin(subtransactions=True): - amp = session.query(self.model_class).with_for_update().filter_by( - **filters).first() + amp = (session.query(self.model_class) + .populate_existing() + .with_for_update() + .filter_by(**filters).first()) if amp is None: return None @@ -1427,12 +1442,15 @@ class AmphoraRepository(BaseRepository): seconds=expired_seconds) with session.begin(subtransactions=True): - amp = session.query(self.model_class).with_for_update().filter( - self.model_class.status.notin_( - [consts.DELETED, consts.PENDING_DELETE]), - self.model_class.cert_busy == false(), - self.model_class.cert_expiration < expired_date - ).first() + amp = (session.query(self.model_class) + .populate_existing() + .with_for_update() + .filter( + self.model_class.status.notin_( + [consts.DELETED, consts.PENDING_DELETE]), + self.model_class.cert_busy == false(), + self.model_class.cert_expiration < expired_date) + .first()) if amp is None: return None @@ -1531,8 +1549,11 @@ class AmphoraRepository(BaseRepository): :raises NoResultFound: The amphora was not found or already deleted. :returns: None """ - amp = lock_session.query(self.model_class).with_for_update().filter_by( - id=id).filter(self.model_class.status != consts.DELETED).one() + amp = (lock_session.query(self.model_class) + .populate_existing() + .with_for_update() + .filter_by(id=id) + .filter(self.model_class.status != consts.DELETED).one()) if amp.status not in [consts.AMPHORA_READY, consts.ERROR]: raise exceptions.ImmutableObject(resource=consts.AMPHORA, id=id) amp.status = consts.PENDING_DELETE @@ -1691,10 +1712,17 @@ class AmphoraHealthRepository(BaseRepository): expired_time = datetime.datetime.utcnow() - datetime.timedelta( seconds=timeout) - amp = session.query(self.model_class).with_for_update().filter_by( - busy=False).filter( - self.model_class.last_update < expired_time).order_by( - func.random()).first() + amp = session.query( + self.model_class + ).populate_existing( + ).with_for_update( + ).filter_by( + busy=False + ).filter( + self.model_class.last_update < expired_time + ).order_by( + func.random() + ).first() if amp is None: return None @@ -1986,8 +2014,11 @@ class QuotasRepository(BaseRepository): def update(self, session, project_id, **model_kwargs): with session.begin(subtransactions=True): kwargs_quota = model_kwargs['quota'] - quotas = session.query(self.model_class).filter_by( - project_id=project_id).with_for_update().first() + quotas = ( + session.query(self.model_class) + .filter_by(project_id=project_id) + .populate_existing() + .with_for_update().first()) if not quotas: quotas = models.Quotas(project_id=project_id) @@ -1999,8 +2030,11 @@ class QuotasRepository(BaseRepository): def delete(self, session, project_id): with session.begin(subtransactions=True): - quotas = session.query(self.model_class).filter_by( - project_id=project_id).with_for_update().first() + quotas = ( + session.query(self.model_class) + .filter_by(project_id=project_id) + .populate_existing() + .with_for_update().first()) if not quotas: raise exceptions.NotFound( resource=data_models.Quotas._name(), id=project_id) @@ -2104,7 +2138,8 @@ class SparesPoolRepository(BaseRepository): so that other processes cannot read or write it. :returns: expected_spares_count, updated_at """ - row = lock_session.query(models.SparesPool).with_for_update().one() + row = lock_session.query( + models.SparesPool).populate_existing().with_for_update().one() return row diff --git a/octavia/tests/functional/db/test_repositories.py b/octavia/tests/functional/db/test_repositories.py index 30032a8d6f..575476d8ba 100644 --- a/octavia/tests/functional/db/test_repositories.py +++ b/octavia/tests/functional/db/test_repositories.py @@ -3679,6 +3679,30 @@ class LoadBalancerRepositoryTest(BaseRepositoryTest): lb = self.lb_repo.get(self.session, id=lb_id) self.assertEqual(constants.PENDING_DELETE, lb.provisioning_status) + def test_test_and_set_provisioning_status_concurrent(self): + lb_id = uuidutils.generate_uuid() + lock_session1 = db_api.get_session(autocommit=False) + self.lb_repo.create(lock_session1, id=lb_id, + provisioning_status=constants.ACTIVE, + operating_status=constants.ONLINE, + enabled=True) + + # Create a concurrent session + lock_session2 = db_api.get_session(autocommit=False) + + # Load LB into lock_session2's identity map + lock_session2.query(db_models.LoadBalancer).filter_by( + id=lb_id).one() + + # Update provisioning status in lock_session1 + self.lb_repo.test_and_set_provisioning_status( + self.session, lb_id, constants.PENDING_UPDATE) + lock_session1.commit() + + # Assert concurrent updates are rejected + self.assertFalse(self.lb_repo.test_and_set_provisioning_status( + lock_session2, lb_id, constants.PENDING_UPDATE)) + def test_set_status_for_failover_immutable(self): lb_id = uuidutils.generate_uuid() self.lb_repo.create(self.session, id=lb_id, diff --git a/releasenotes/notes/fix-batch-member-update-race-condition-09b82e2cc3121e03.yaml b/releasenotes/notes/fix-batch-member-update-race-condition-09b82e2cc3121e03.yaml new file mode 100644 index 0000000000..80307992fc --- /dev/null +++ b/releasenotes/notes/fix-batch-member-update-race-condition-09b82e2cc3121e03.yaml @@ -0,0 +1,5 @@ +--- +fixes: + - | + Fixed a potential race condition in the member batch update API call, the + load balancers might not have been locked properly.