db: Remove use of autocommit

Change-Id: I3f0d384f353069f1eb186bbfa2aa3d8afa969e05
wu.chunyang 2024-03-13 22:42:00 +08:00
parent 860772e1b0
commit ab83ad7c85
12 changed files with 157 additions and 209 deletions
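
In short, the patch removes the implicit-autocommit session in four moves: DatabaseModelBase.query() becomes a commit-on-exit context manager, save() and delete() wrap their merge/flush in an explicit session.begin() transaction, Trove's hand-rolled [database] option list gives way to the options oslo.db registers itself, and the EngineFacade is built via from_config() instead of by hand with autocommit=True.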

View File

@@ -189,14 +189,15 @@ class Backup(object):
:param instance_id: Id of the instance
:param exclude: Backup ID to exclude from the query (any other running)
"""
query = DBBackup.query()
query = query.filter(DBBackup.instance_id == instance_id,
DBBackup.state.in_(BackupState.RUNNING_STATES))
# filter out deleted backups, PEP8 does not like field == False!
query = query.filter_by(deleted=False)
if exclude:
query = query.filter(DBBackup.id != exclude)
return query.first()
with DBBackup.query() as query:
query = query.filter(DBBackup.instance_id == instance_id,
DBBackup.state.in_(
BackupState.RUNNING_STATES))
# filter out deleted backups, PEP8 does not like field == False!
query = query.filter_by(deleted=False)
if exclude:
query = query.filter(DBBackup.id != exclude)
return query.first()
@classmethod
def get_by_id(cls, context, backup_id, deleted=False):
@@ -233,30 +234,31 @@ class Backup(object):
marker = None
else:
marker += limit
return query.all(), marker
res = query.all()
return res, marker
@classmethod
def list(cls, context, datastore=None, instance_id=None, project_id=None,
all_projects=False):
query = DBBackup.query()
filters = [DBBackup.deleted == 0]
with DBBackup.query() as query:
filters = [DBBackup.deleted == 0]
if project_id:
filters.append(DBBackup.tenant_id == project_id)
elif not all_projects:
filters.append(DBBackup.tenant_id == context.project_id)
if project_id:
filters.append(DBBackup.tenant_id == project_id)
elif not all_projects:
filters.append(DBBackup.tenant_id == context.project_id)
if instance_id:
filters.append(DBBackup.instance_id == instance_id)
if instance_id:
filters.append(DBBackup.instance_id == instance_id)
if datastore:
ds = datastore_models.Datastore.load(datastore)
filters.append(datastore_models.DBDatastoreVersion.
datastore_id == ds.id)
query = query.join(datastore_models.DBDatastoreVersion)
if datastore:
ds = datastore_models.Datastore.load(datastore)
filters.append(datastore_models.DBDatastoreVersion.
datastore_id == ds.id)
query = query.join(datastore_models.DBDatastoreVersion)
query = query.filter(*filters)
return cls._paginate(context, query)
query = query.filter(*filters)
return cls._paginate(context, query)
@classmethod
def list_for_instance(cls, context, instance_id):
@@ -266,15 +268,15 @@ class Backup(object):
:param instance_id:
:return:
"""
query = DBBackup.query()
if context.is_admin:
query = query.filter_by(instance_id=instance_id,
deleted=False)
else:
query = query.filter_by(instance_id=instance_id,
tenant_id=context.project_id,
deleted=False)
return cls._paginate(context, query)
with DBBackup.query() as query:
if context.is_admin:
query = query.filter_by(instance_id=instance_id,
deleted=False)
else:
query = query.filter_by(instance_id=instance_id,
tenant_id=context.project_id,
deleted=False)
return cls._paginate(context, query)
@classmethod
def get_last_completed(cls, context, instance_id,
@@ -300,13 +302,14 @@ class Backup(object):
@classmethod
def fail_for_instance(cls, instance_id):
query = DBBackup.query()
query = query.filter(DBBackup.instance_id == instance_id,
DBBackup.state.in_(BackupState.RUNNING_STATES))
query = query.filter_by(deleted=False)
for backup in query.all():
backup.state = BackupState.FAILED
backup.save()
with DBBackup.query() as query:
query = query.filter(DBBackup.instance_id == instance_id,
DBBackup.state.in_(
BackupState.RUNNING_STATES))
query = query.filter_by(deleted=False)
for backup in query.all():
backup.state = BackupState.FAILED
backup.save()
@classmethod
def delete(cls, context, backup_id):
@@ -319,13 +322,13 @@ class Backup(object):
"""
# Recursively delete all children and grandchildren of this backup.
query = DBBackup.query()
query = query.filter_by(parent_id=backup_id, deleted=False)
for child in query.all():
try:
cls.delete(context, child.id)
except exception.NotFound:
LOG.warning("Backup %s cannot be found.", backup_id)
with DBBackup.query() as query:
query = query.filter_by(parent_id=backup_id, deleted=False)
for child in query.all():
try:
cls.delete(context, child.id)
except exception.NotFound:
LOG.warning("Backup %s cannot be found.", backup_id)
def _delete_resources():
backup = cls.get_by_id(context, backup_id)
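
Several of the rewritten methods return from inside the new with block. A toy sketch (names here are illustrative, not Trove's) showing that the commit in the context manager still runs on that path:

    from contextlib import contextmanager

    @contextmanager
    def tracked_query():
        # stand-in for DBBackup.query(); prints instead of opening a session
        print("open session")
        yield "query-object"
        print("commit")  # runs even when the with block is left via return

    def first_backup():
        with tracked_query() as query:
            return query  # returning still triggers the commit above

    first_backup()  # prints "open session", then "commit"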

View File

@@ -41,10 +41,10 @@ class Checks(upgradecheck.UpgradeCommands):
db_api = db.get_db_api()
db_api.configure_db(cfg.CONF)
query = DBInstance.query()
query = query.filter(DBInstance.task_status != InstanceTasks.NONE)
query = query.filter_by(deleted=False)
instances_with_tasks = query.count()
with DBInstance.query() as query:
query = query.filter(DBInstance.task_status != InstanceTasks.NONE)
query = query.filter_by(deleted=False)
instances_with_tasks = query.count()
if instances_with_tasks:
return upgradecheck.Result(

View File

@@ -22,6 +22,7 @@ from keystonemiddleware import auth_token
from oslo_config import cfg
from oslo_config.cfg import NoSuchOptError
from oslo_config import types
from oslo_db import options as db_options
from oslo_log import log as logging
from oslo_log import versionutils
from oslo_middleware import cors
@@ -536,64 +537,6 @@ common_opts = [
'instance'),
]
database_opts = [
cfg.StrOpt('connection',
default='sqlite:///trove_test.sqlite',
help='SQL Connection.',
secret=True,
deprecated_name='sql_connection',
deprecated_group='DEFAULT'),
cfg.IntOpt('connection_recycle_time',
default=3600),
cfg.BoolOpt('query_log',
default=False,
deprecated_name='sql_query_log',
deprecated_group='DEFAULT',
deprecated_for_removal=True),
cfg.BoolOpt('sqlite_synchronous',
default=True,
help='If True, SQLite uses synchronous mode.'),
cfg.StrOpt('slave_connection',
secret=True,
help='The SQLAlchemy connection string to use to connect to the'
' slave database.'),
cfg.StrOpt('mysql_sql_mode',
default='TRADITIONAL',
help='The SQL mode to be used for MySQL sessions. '
'This option, including the default, overrides any '
'server-set SQL mode. To use whatever SQL mode '
'is set by the server configuration, '
'set this to no value. Example: mysql_sql_mode='),
cfg.IntOpt('max_pool_size',
help='Maximum number of SQL connections to keep open in a '
'pool.'),
cfg.IntOpt('max_retries',
default=10,
help='Maximum number of database connection retries '
'during startup. Set to -1 to specify an infinite '
'retry count.'),
cfg.IntOpt('retry_interval',
default=10,
help='Interval between retries of opening a SQL connection.'),
cfg.IntOpt('max_overflow',
help='If set, use this value for max_overflow with '
'SQLAlchemy.'),
cfg.IntOpt('connection_debug',
default=0,
help='Verbosity of SQL debugging information: 0=None, '
'100=Everything.'),
cfg.BoolOpt('connection_trace',
default=False,
help='Add Python stack traces to SQL as comment strings.'),
cfg.IntOpt('pool_timeout',
help='If set, use this value for pool_timeout with '
'SQLAlchemy.'),
]
# Datastore specific option groups
# Mysql
mysql_group = cfg.OptGroup(
'mysql', title='MySQL options',
@@ -1541,9 +1484,6 @@ CONF = cfg.CONF
CONF.register_opts(path_opts)
CONF.register_opts(versions_opts)
CONF.register_opts(common_opts)
CONF.register_opts(database_opts, 'database')
CONF.register_group(mysql_group)
CONF.register_group(percona_group)
CONF.register_group(pxc_group)
@@ -1577,6 +1517,7 @@ CONF.register_opts(service_credentials_opts, service_credentials_group)
CONF.register_opts(guest_agent_opts, guest_agent_group)
CONF.register_opts(rpcapi_cap_opts, upgrade_levels)
db_options.set_defaults(CONF, connection='sqlite://')
profiler.set_defaults(CONF)
logging.register_options(CONF)
@@ -1590,7 +1531,6 @@ def list_opts():
trove_opts = [
(None, path_opts + versions_opts + common_opts),
('database', database_opts),
(mysql_group, mysql_opts),
(postgresql_group, postgresql_opts),
(mariadb_group, mariadb_opts),
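
The deleted block above was Trove's private copy of the [database] options; oslo.db ships the same group and can register it itself. A minimal sketch of the replacement wiring, standalone rather than Trove's actual startup path:

    from oslo_config import cfg
    from oslo_db import options as db_options

    CONF = cfg.CONF
    # Registers oslo.db's canonical [database] option group on this config
    # object and sets its connection default, replacing the hand-rolled
    # database_opts list deleted above.
    db_options.set_defaults(CONF, connection='sqlite://')
    CONF([])  # parse an empty command line so the options are readable
    print(CONF.database.connection)  # sqlite://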

View File

@@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from contextlib import contextmanager
from oslo_log import log as logging
from oslo_utils import strutils
@@ -53,8 +55,11 @@ class DatabaseModelBase(models.ModelBase):
return hasattr(self, 'deleted') and hasattr(self, 'deleted_at')
@classmethod
@contextmanager
def query(cls):
return get_db_api()._base_query(cls)
query = get_db_api()._base_query(cls)
yield query
query.session.commit()
def save(self):
if not self.is_valid():
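
The new query() is the heart of the change: callers get a live Query, and the session commits once their with block finishes. A sketch of the same commit-on-exit idea in plain SQLAlchemy (illustrative; Trove's version obtains the query via get_db_api()._base_query and does not close or roll back):

    from contextlib import contextmanager

    from sqlalchemy.orm import Session

    @contextmanager
    def model_query(engine, model):
        session = Session(engine)
        try:
            yield session.query(model)
            session.commit()  # clean exit only; an exception skips the commit
        finally:
            session.close()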

View File

@@ -21,19 +21,31 @@ from trove.db.sqlalchemy import session
def list(query_func, *args, **kwargs):
return query_func(*args, **kwargs).all()
query = query_func(*args, **kwargs)
res = query.all()
query.session.commit()
return res
def count(query, *args, **kwargs):
return query(*args, **kwargs).count()
query = query(*args, **kwargs)
res = query.count()
query.session.commit()
return res
def first(query, *args, **kwargs):
return query(*args, **kwargs).first()
query = query(*args, **kwargs)
res = query.first()
query.session.commit()
return res
def join(query, model, *args):
return query(model).join(*args)
query = query(model)
res = query.join(*args)
query.session.commit()
return res
def find_all(model, **conditions):
@@ -47,7 +59,10 @@ def find_all_by_limit(query_func, model, conditions, limit, marker=None,
def find_by(model, **kwargs):
return _query_by(model, **kwargs).first()
query = _query_by(model, **kwargs)
res = query.first()
query.session.commit()
return res
def find_by_filter(model, **kwargs):
@@ -58,9 +73,10 @@ def find_by_filter(model, **kwargs):
def save(model):
try:
db_session = session.get_session()
model = db_session.merge(model)
db_session.flush()
return model
with db_session.begin():
model = db_session.merge(model)
db_session.flush()
return model
except sqlalchemy.exc.IntegrityError as error:
raise exception.DBConstraintError(model_name=model.__class__.__name__,
error=str(error.orig))
@@ -68,13 +84,16 @@ def save(model):
def delete(model):
db_session = session.get_session()
model = db_session.merge(model)
db_session.delete(model)
db_session.flush()
with db_session.begin():
model = db_session.merge(model)
db_session.delete(model)
db_session.flush()
def delete_all(query_func, model, **conditions):
query_func(model, **conditions).delete()
query = query_func(model, **conditions)
query.delete()
query.session.commit()
def update(model, **values):
@@ -83,7 +102,9 @@ def update(model, **values):
def update_all(query_func, model, conditions, values):
query_func(model, **conditions).update(values)
query = query_func(model, **conditions)
query.update(values)
query.session.commit()
def configure_db(options, *plugins):
@@ -119,7 +140,9 @@ def db_reset(options, *plugins):
def _base_query(cls):
return session.get_session().query(cls)
db_session = session.get_session()
query = db_session.query(cls)
return query
def _query_by(cls, **conditions):
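
save() and delete() now run their merge/flush inside session.begin(), which commits on success and rolls back if the block raises. A self-contained sketch of that pattern (SQLAlchemy 1.4+; the model is illustrative, not Trove's):

    from sqlalchemy import Column, Integer, String, create_engine
    from sqlalchemy.orm import Session, declarative_base

    Base = declarative_base()

    class Record(Base):  # illustrative stand-in for a Trove model
        __tablename__ = 'records'
        id = Column(Integer, primary_key=True)
        state = Column(String(16))

    engine = create_engine('sqlite://')
    Base.metadata.create_all(engine)

    session = Session(engine)
    with session.begin():  # explicit transaction
        session.merge(Record(id=1, state='NEW'))
        session.flush()
    # committed on clean exit; rolled back automatically if the block raised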

View File

@@ -14,9 +14,8 @@
# under the License.
import contextlib
import threading
from oslo_db.sqlalchemy import session
from oslo_db.sqlalchemy import session as db_session
from oslo_log import log as logging
from sqlalchemy import MetaData
@@ -25,8 +24,6 @@ from trove.common.i18n import _
from trove.db.sqlalchemy import mappers
_FACADE = None
_LOCK = threading.Lock()
LOG = logging.getLogger(__name__)
@@ -73,35 +70,9 @@ def configure_db(options, models_mapper=None):
def _create_facade(options):
global _LOCK, _FACADE
# TODO(mvandijk): Refactor this once oslo.db spec is implemented:
# https://specs.openstack.org/openstack/oslo-specs/specs/kilo/
# make-enginefacade-a-facade.html
global _FACADE
if _FACADE is None:
with _LOCK:
if _FACADE is None:
conf = CONF.database
# pop the deprecated config option 'query_log'
if conf.query_log:
if conf.connection_debug < 50:
conf['connection_debug'] = 50
LOG.warning(('Configuration option "query_log" has been '
'deprecated. Use "connection_debug" '
'instead. Setting connection_debug = '
'%(debug_level)s instead.'),
conf.get('connection_debug'))
# TODO(mvandijk): once query_log is removed,
# use enginefacade.from_config() instead
database_opts = dict(CONF.database)
database_opts.pop('query_log')
# FIXME(wuchunyang): we need to remove reliance on autocommit
# semantics ASAP. since it's not compatible with
# SQLAlchemy 2.0
database_opts['autocommit'] = True
_FACADE = session.EngineFacade(
options['database']['connection'],
**database_opts
)
_FACADE = db_session.EngineFacade.from_config(options)
return _FACADE
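
With the query_log shim gone, the hand-built facade (and its autocommit=True escape hatch) collapses into from_config(), which reads the [database] group registered earlier. A hedged sketch using oslo.db's legacy facade API:

    from oslo_config import cfg
    from oslo_db import options as db_options
    from oslo_db.sqlalchemy import session as db_session

    CONF = cfg.CONF
    db_options.set_defaults(CONF, connection='sqlite://')

    # from_config() builds the engine and sessionmaker straight from
    # CONF.database, with no hand-copied kwargs and no autocommit flag.
    facade = db_session.EngineFacade.from_config(CONF)
    engine = facade.get_engine()
    session = facade.get_session()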

View File

@@ -2069,22 +2069,23 @@ def module_instance_count(context, module_id, include_clustered=False):
func.max(module_models.DBInstanceModule.updated)]
filters = [module_models.DBInstanceModule.module_id == module_id,
module_models.DBInstanceModule.deleted == 0]
query = module_models.DBInstanceModule.query()
query = query.join(
module_models.DBModule,
module_models.DBInstanceModule.module_id == module_models.DBModule.id)
query = query.join(
DBInstance,
module_models.DBInstanceModule.instance_id == DBInstance.id)
if not include_clustered:
filters.append(DBInstance.cluster_id.is_(None))
if not context.is_admin:
filters.append(DBInstance.tenant_id == context.project_id)
query = query.group_by(module_models.DBInstanceModule.md5)
query = query.add_columns(*columns)
query = query.filter(*filters)
query = query.order_by(module_models.DBInstanceModule.updated)
return query.all()
with module_models.DBInstanceModule.query() as query:
query = query.join(
module_models.DBModule,
module_models.DBInstanceModule.module_id ==
module_models.DBModule.id)
query = query.join(
DBInstance,
module_models.DBInstanceModule.instance_id == DBInstance.id)
if not include_clustered:
filters.append(DBInstance.cluster_id.is_(None))
if not context.is_admin:
filters.append(DBInstance.tenant_id == context.project_id)
query = query.group_by(module_models.DBInstanceModule.md5)
query = query.add_columns(*columns)
query = query.filter(*filters)
query = query.order_by(module_models.DBInstanceModule.updated)
return query.all()
def persist_instance_fault(notification, event_qualifier):

View File

@@ -63,14 +63,16 @@ class Modules(object):
# build a query manually, since we need current tenant
# plus the 'all' tenant ones
query_opts['visible'] = True
db_info = DBModule.query().filter_by(**query_opts)
db_info = db_info.filter(
or_(DBModule.tenant_id == context.project_id,
DBModule.tenant_id.is_(None))
)
if db_info.count() == 0:
LOG.debug("No modules found for tenant %s", context.project_id)
modules = db_info.all()
with DBModule.query() as query:
db_info = query.filter_by(**query_opts)
db_info = db_info.filter(
or_(DBModule.tenant_id == context.project_id,
DBModule.tenant_id.is_(None))
)
if db_info.count() == 0:
LOG.debug("No modules found for tenant %s",
context.project_id)
modules = db_info.all()
return modules
@staticmethod
@@ -83,14 +85,16 @@ class Modules(object):
query_opts = {'deleted': False,
'auto_apply': True}
db_info = DBModule.query().filter_by(**query_opts)
db_info = Modules.add_tenant_filter(db_info, context.project_id)
db_info = Modules.add_datastore_filter(db_info, datastore_id)
db_info = Modules.add_ds_version_filter(db_info, datastore_version_id)
if db_info.count() == 0:
LOG.debug("No auto-apply modules found for tenant %s",
context.project_id)
modules = db_info.all()
with DBModule.query() as query:
db_info = query.filter_by(**query_opts)
db_info = Modules.add_tenant_filter(db_info, context.project_id)
db_info = Modules.add_datastore_filter(db_info, datastore_id)
db_info = Modules.add_ds_version_filter(db_info,
datastore_version_id)
if db_info.count() == 0:
LOG.debug("No auto-apply modules found for tenant %s",
context.project_id)
modules = db_info.all()
return modules
@staticmethod
@@ -122,12 +126,13 @@ class Modules(object):
modules = []
if module_ids:
query_opts = {'deleted': False}
db_info = DBModule.query().filter_by(**query_opts)
if not context.is_admin:
db_info = Modules.add_tenant_filter(db_info,
context.project_id)
db_info = db_info.filter(DBModule.id.in_(module_ids))
modules = db_info.all()
with DBModule.query() as query:
db_info = query.filter_by(**query_opts)
if not context.is_admin:
db_info = Modules.add_tenant_filter(db_info,
context.project_id)
db_info = db_info.filter(DBModule.id.in_(module_ids))
modules = db_info.all()
return modules
@staticmethod

View File

@@ -145,7 +145,6 @@ class TestBackupController(trove_testtools.TestCase):
req = mock.MagicMock(GET={'project_id': str(uuid.uuid4())},
environ={wsgi.CONTEXT_KEY: self.context},
url='http://localhost')
res = self.controller.index(req, 'fake_tenant_id')
self.assertEqual(200, res.status)

View File

@@ -468,8 +468,8 @@ class PaginationTests(trove_testtools.TestCase):
def tearDown(self):
super(PaginationTests, self).tearDown()
query = models.DBBackup.query()
query.filter_by(instance_id=self.instance_id).delete()
with models.DBBackup.query() as query:
query.filter_by(instance_id=self.instance_id).delete()
def test_pagination_list(self):
# page one
@@ -541,8 +541,8 @@ class OrderingTests(trove_testtools.TestCase):
def tearDown(self):
super(OrderingTests, self).tearDown()
query = models.DBBackup.query()
query.filter_by(instance_id=self.instance_id).delete()
with models.DBBackup.query() as query:
query.filter_by(instance_id=self.instance_id).delete()
def test_list(self):
backups, marker = models.Backup.list(self.context)

View File

@@ -33,9 +33,8 @@ class TestUpgradeChecksInstancesWithTasks(trove_testtools.TestCase):
fake_get_db_api):
fake_get_db_api.return_value = self.fake_db_api
mock_instance.query.return_value.filter.return_value.filter_by.\
return_value.count.return_value = 0
mock_instance.query.return_value.__enter__.return_value.filter.\
return_value.filter_by.return_value.count.return_value = 0
check_result = self.cmd._check_instances_with_running_tasks()
self.assertEqual(Code.SUCCESS, check_result.code)
@@ -43,8 +42,7 @@ class TestUpgradeChecksInstancesWithTasks(trove_testtools.TestCase):
fake_get_db_api):
fake_get_db_api.return_value = self.fake_db_api
mock_instance.query.return_value.filter.return_value.filter_by.\
return_value.count.return_value = 1
mock_instance.query.return_value.__enter__.return_value.filter.\
return_value.filter_by.return_value.count.return_value = 1
check_result = self.cmd._check_instances_with_running_tasks()
self.assertEqual(Code.WARNING, check_result.code)
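
Because query() is now a context manager, the mock chain has to route through __enter__. A tiny illustration of why that works out of the box (MagicMock implements the context-manager protocol):

    from unittest import mock

    DBInstance = mock.MagicMock()
    DBInstance.query.return_value.__enter__.return_value.filter.\
        return_value.filter_by.return_value.count.return_value = 0

    with DBInstance.query() as query:  # __enter__ hands back the stubbed query
        assert query.filter('expr').filter_by(deleted=False).count() == 0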

View File

@@ -27,6 +27,9 @@ def init_db():
with LOCK:
global DB_SETUP
if not DB_SETUP:
CONF.set_override("connection",
"sqlite:///trove_test.sqlite",
"database")
db_api = get_db_api()
db_api.db_sync(CONF)
session.configure_db(CONF)
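
The override is needed because the sqlite:///trove_test.sqlite default lived in the database_opts list deleted from cfg.py, and the oslo.db defaults now point the connection at the in-memory sqlite:// instead.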