Merge "Fix DB deadlock in quotas" into stable/2023.2
This commit is contained in:
commit
c0cd920166
@ -40,6 +40,7 @@ from octavia.common import constants as consts
|
|||||||
from octavia.common import data_models
|
from octavia.common import data_models
|
||||||
from octavia.common import exceptions
|
from octavia.common import exceptions
|
||||||
from octavia.common import validate
|
from octavia.common import validate
|
||||||
|
from octavia.db import api as db_api
|
||||||
from octavia.db import models
|
from octavia.db import models
|
||||||
|
|
||||||
CONF = cfg.CONF
|
CONF = cfg.CONF
|
||||||
@ -390,13 +391,8 @@ class Repositories(object):
|
|||||||
if not project_id:
|
if not project_id:
|
||||||
raise exceptions.MissingProjectID()
|
raise exceptions.MissingProjectID()
|
||||||
|
|
||||||
quotas = self.quotas.get(session, project_id=project_id)
|
self.quotas.ensure_project_exists(project_id)
|
||||||
if not quotas:
|
|
||||||
# Make sure we have a record to lock
|
|
||||||
self.quotas.update(
|
|
||||||
session,
|
|
||||||
project_id,
|
|
||||||
quota={})
|
|
||||||
# Lock the project record in the database to block other quota checks
|
# Lock the project record in the database to block other quota checks
|
||||||
#
|
#
|
||||||
# Note: You cannot just use the current count as the in-use
|
# Note: You cannot just use the current count as the in-use
|
||||||
@ -1883,11 +1879,6 @@ class L7PolicyRepository(BaseRepository):
|
|||||||
class QuotasRepository(BaseRepository):
|
class QuotasRepository(BaseRepository):
|
||||||
model_class = models.Quotas
|
model_class = models.Quotas
|
||||||
|
|
||||||
# Since this is for the initial quota record creation it locks the table
|
|
||||||
# which can lead to recoverable deadlocks. Thus we use the deadlock
|
|
||||||
# retry wrapper here. This may not be appropriate for other sessions
|
|
||||||
# and or queries. Use with caution.
|
|
||||||
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
|
|
||||||
def update(self, session, project_id, **model_kwargs):
|
def update(self, session, project_id, **model_kwargs):
|
||||||
kwargs_quota = model_kwargs['quota']
|
kwargs_quota = model_kwargs['quota']
|
||||||
quotas = (
|
quotas = (
|
||||||
@ -1904,6 +1895,19 @@ class QuotasRepository(BaseRepository):
|
|||||||
session.flush()
|
session.flush()
|
||||||
return self.get(session, project_id=project_id)
|
return self.get(session, project_id=project_id)
|
||||||
|
|
||||||
|
# Since this is for the initial quota record creation it locks the table
|
||||||
|
# which can lead to recoverable deadlocks. Thus we use the deadlock
|
||||||
|
# retry wrapper here. This may not be appropriate for other sessions
|
||||||
|
# and or queries. Use with caution.
|
||||||
|
@oslo_db_api.wrap_db_retry(max_retries=5, retry_on_deadlock=True)
|
||||||
|
def ensure_project_exists(self, project_id):
|
||||||
|
with db_api.session().begin() as session:
|
||||||
|
quotas = self.get(session, project_id=project_id)
|
||||||
|
if not quotas:
|
||||||
|
# Make sure we have a record to lock
|
||||||
|
self.update(session, project_id, quota={})
|
||||||
|
session.commit()
|
||||||
|
|
||||||
def delete(self, session, project_id):
|
def delete(self, session, project_id):
|
||||||
quotas = (
|
quotas = (
|
||||||
session.query(self.model_class)
|
session.query(self.model_class)
|
||||||
|
@ -30,6 +30,8 @@ from octavia.tests import fixtures as oc_fixtures
|
|||||||
|
|
||||||
class OctaviaDBTestBase(test_base.BaseTestCase):
|
class OctaviaDBTestBase(test_base.BaseTestCase):
|
||||||
|
|
||||||
|
facade = None
|
||||||
|
|
||||||
def setUp(self, connection_string='sqlite://'):
|
def setUp(self, connection_string='sqlite://'):
|
||||||
super().setUp()
|
super().setUp()
|
||||||
|
|
||||||
@ -73,11 +75,18 @@ class OctaviaDBTestBase(test_base.BaseTestCase):
|
|||||||
sqlite_fk=True)
|
sqlite_fk=True)
|
||||||
engine = facade.get_engine()
|
engine = facade.get_engine()
|
||||||
session = facade.get_session(expire_on_commit=True)
|
session = facade.get_session(expire_on_commit=True)
|
||||||
|
self.facade = facade
|
||||||
else:
|
else:
|
||||||
engine = db_api.get_engine()
|
engine = db_api.get_engine()
|
||||||
session = db_api.get_session()
|
session = db_api.get_session()
|
||||||
return engine, session
|
return engine, session
|
||||||
|
|
||||||
|
def get_session(self):
|
||||||
|
if 'sqlite:///' in self.connection_string:
|
||||||
|
return self.facade.get_session(expire_on_commit=True)
|
||||||
|
else:
|
||||||
|
return db_api.get_session()
|
||||||
|
|
||||||
def _seed_lookup_tables(self, session):
|
def _seed_lookup_tables(self, session):
|
||||||
self._seed_lookup_table(
|
self._seed_lookup_table(
|
||||||
session, constants.SUPPORTED_PROVISIONING_STATUSES,
|
session, constants.SUPPORTED_PROVISIONING_STATUSES,
|
||||||
|
@ -12,6 +12,7 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
import datetime
|
import datetime
|
||||||
import random
|
import random
|
||||||
from unittest import mock
|
from unittest import mock
|
||||||
@ -509,6 +510,47 @@ class AllRepositoriesTest(base.OctaviaDBTestBase):
|
|||||||
project_id=project_id).all()
|
project_id=project_id).all()
|
||||||
self.assertEqual(1, len(lbs)) # After rollback: 1 (broken!)
|
self.assertEqual(1, len(lbs)) # After rollback: 1 (broken!)
|
||||||
|
|
||||||
|
def test_check_quota_met_check_deadlock(self):
|
||||||
|
# This test doesn't work with sqlite, using another backend is not
|
||||||
|
# straighforward, we need to update the connection_string passed to the
|
||||||
|
# __init__ func and also change some calls in the constructor (don't
|
||||||
|
# create the DB objects if we use a DB that was deployed for Octavia)
|
||||||
|
if 'sqlite://' in self.connection_string:
|
||||||
|
self.skipTest("The test for checking potential deadlocks "
|
||||||
|
"doesn't work with the sqlite backend")
|
||||||
|
|
||||||
|
conf = self.useFixture(oslo_fixture.Config(cfg.CONF))
|
||||||
|
conf.config(group='api_settings', auth_strategy=constants.TESTING)
|
||||||
|
conf.config(group='quotas', default_load_balancer_quota=-1)
|
||||||
|
|
||||||
|
# Calling check_quota_met concurrently from many threads may
|
||||||
|
# have triggered a deadlock in the DB
|
||||||
|
# (Note: we run the test 8 times because it's not 100% reproducible)
|
||||||
|
# https://bugs.launchpad.net/octavia/+bug/2038798
|
||||||
|
for _ in range(8):
|
||||||
|
number_of_projects = 8
|
||||||
|
project_ids = (
|
||||||
|
uuidutils.generate_uuid()
|
||||||
|
for _ in range(number_of_projects))
|
||||||
|
|
||||||
|
with ThreadPoolExecutor(
|
||||||
|
max_workers=number_of_projects) as executor:
|
||||||
|
def _test_check_quota_met(project_id):
|
||||||
|
session = self.get_session()
|
||||||
|
session.begin()
|
||||||
|
self.assertFalse(self.repos.check_quota_met(
|
||||||
|
session, data_models.LoadBalancer,
|
||||||
|
project_id))
|
||||||
|
session.commit()
|
||||||
|
|
||||||
|
futs = []
|
||||||
|
for project_id in project_ids:
|
||||||
|
future = executor.submit(_test_check_quota_met, project_id)
|
||||||
|
futs.append(future)
|
||||||
|
|
||||||
|
for fut in futs:
|
||||||
|
fut.result()
|
||||||
|
|
||||||
def test_check_quota_met(self):
|
def test_check_quota_met(self):
|
||||||
|
|
||||||
project_id = uuidutils.generate_uuid()
|
project_id = uuidutils.generate_uuid()
|
||||||
|
Loading…
Reference in New Issue
Block a user