Prepare SM queries for sqlalchemy 1.4 upversion
For subcloud scalability purposes, sqlalchemy will be upgraded from version 1.3 to 1.4. This version change introduces some incompatibilities from the current version: - Session management: When calling model_query without providing a session, a new one would be created inside a context manager. This means that, upon exiting model_query, the query would be executed and any new filtering after would reopen the session. Version 1.3 was more forgiving regarding transactions and this was not an issue. On version 1.4, sessions called after transaction ended will still remain open [1], causing the pool to eventually overflow with sessions stuck in "idle in transaction" state. To fix this, the session creation inside model_query is changed to only start the context manager, which will then be closed only when exiting the db function itself. - Changed deprecated methods like PoolListener (changed to events) and removed autocommit (which although is still supported in 1.4, will be removed in 2.0). Test plan: - PASS: Build a custom ISO with the changes and deploy a DX system controller and a SX subcloud. Verify the system works as expected. - PASS: Manage a subcloud and verify the sync_status is "in-sync". - PASS: Soak the system and verify there was no connection leak and no sessions stuck in "idle in transaction" state. - PASS: Run DC sanity and regression. [1]: https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#session-features-new-autobegin-behavior [2]: https://docs.sqlalchemy.org/en/14/changelog/migration_14.html#rowproxy-is-no-longer-a-proxy-is-now-called-row-and-behaves-like-an-enhanced-named-tuple Story: 2011311 Task: 51732 Change-Id: Ic90dfd98a23ec0604256ae8db7437a8d1bc8bfed Signed-off-by: Victor Romano <victor.gluzromano@windriver.com>
This commit is contained in:
@@ -21,6 +21,10 @@
|
||||
|
||||
"""SQLAlchemy storage backend."""
|
||||
|
||||
import functools
|
||||
import sys
|
||||
|
||||
import eventlet
|
||||
from oslo_config import cfg
|
||||
|
||||
# TODO(deva): import MultipleResultsFound and handle it appropriately
|
||||
@@ -44,7 +48,7 @@ CONF.import_opt('connection',
|
||||
LOG = log.getLogger(__name__)
|
||||
|
||||
get_engine = db_session.get_engine
|
||||
get_session = db_session.get_session
|
||||
get_session_manager = db_session.get_session_manager
|
||||
|
||||
|
||||
def _paginate_query(model, limit=None, marker=None, sort_key=None,
|
||||
@@ -64,15 +68,65 @@ def get_backend():
|
||||
return Connection()
|
||||
|
||||
|
||||
def db_session_cleanup(cls):
|
||||
"""Class decorator that automatically adds session cleanup to all non-special methods."""
|
||||
|
||||
def method_decorator(method):
|
||||
@functools.wraps(method)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
_context = eventlet.greenthread.getcurrent()
|
||||
|
||||
try:
|
||||
return method(self, *args, **kwargs)
|
||||
finally:
|
||||
if (hasattr(_context, "_db_session_context") and
|
||||
_context._db_session_context is not None):
|
||||
try:
|
||||
if hasattr(_context, "_db_session_context"):
|
||||
exc_info = sys.exc_info()
|
||||
_context._db_session_context.__exit__(*exc_info)
|
||||
except Exception as e:
|
||||
LOG.warning(f"Error closing database session: {e}")
|
||||
|
||||
# Clear the session
|
||||
_context._db_session = None
|
||||
_context._db_session_context = None
|
||||
|
||||
return wrapper
|
||||
|
||||
for attr_name in dir(cls):
|
||||
# Skip special methods
|
||||
if not attr_name.startswith("__"):
|
||||
attr = getattr(cls, attr_name)
|
||||
if callable(attr):
|
||||
setattr(cls, attr_name, method_decorator(attr))
|
||||
|
||||
return cls
|
||||
|
||||
|
||||
def model_query(model, *args, **kwargs):
|
||||
"""Query helper for simpler session usage.
|
||||
|
||||
If the session is already provided in the kwargs, use it. Otherwise,
|
||||
try to get it from thread context. If it's not there, create a new one.
|
||||
|
||||
:param session: if present, the session to use
|
||||
"""
|
||||
session = kwargs.get('session')
|
||||
if not session:
|
||||
_context = eventlet.greenthread.getcurrent()
|
||||
if hasattr(_context, '_db_session') and _context._db_session is not None:
|
||||
session = _context._db_session
|
||||
else:
|
||||
session_context = get_session_manager()
|
||||
session = session_context.__enter__()
|
||||
_context._db_session = session
|
||||
# Need to store the session context to call __exit__ method later
|
||||
_context._db_session_context = session_context
|
||||
|
||||
session = kwargs.get('session') or get_session()
|
||||
query = session.query(model, *args)
|
||||
return query
|
||||
query = session.query(model, *args)
|
||||
|
||||
return session.query(model, *args)
|
||||
|
||||
|
||||
def add_identity_filter(query, value, model, use_name=False):
|
||||
@@ -119,6 +173,7 @@ def add_filter_by_many_identities(query, model, values):
|
||||
raise exception.InvalidIdentity(identity=value)
|
||||
|
||||
|
||||
@db_session_cleanup
|
||||
class Connection(api.Connection):
|
||||
"""SqlAlchemy connection."""
|
||||
|
||||
|
@@ -35,10 +35,11 @@ Initializing:
|
||||
|
||||
Recommended ways to use sessions within this framework:
|
||||
|
||||
* Don't use them explicitly; this is like running with AUTOCOMMIT=1.
|
||||
model_query() will implicitly use a session when called without one
|
||||
supplied. This is the ideal situation because it will allow queries
|
||||
to be automatically retried if the database connection is interrupted.
|
||||
* Don't use them explicitly; when entering model_query(), a session will be created
|
||||
and stored in the thread context; upon exiting the db function, the __exit__ method of
|
||||
the session will be called, which will commit/rollback and then close it. It's important
|
||||
to note that AUTOCOMMIT was deprecated in sqlalchemy 1.4 and removed in 2.0, so that's
|
||||
why we need to manually manage the session this way.
|
||||
|
||||
Note: Automatic retry will be enabled in a future patch.
|
||||
|
||||
@@ -246,18 +247,19 @@ Efficient use of soft deletes:
|
||||
|
||||
"""
|
||||
|
||||
from contextlib import contextmanager
|
||||
import re
|
||||
import time
|
||||
|
||||
from eventlet import greenthread
|
||||
from oslo_config import cfg
|
||||
import six
|
||||
from sqlalchemy import event
|
||||
from sqlalchemy import exc as sqla_exc
|
||||
import sqlalchemy.interfaces
|
||||
from sqlalchemy.interfaces import PoolListener
|
||||
import sqlalchemy.orm
|
||||
from sqlalchemy.pool import NullPool, StaticPool
|
||||
from sqlalchemy.sql.expression import literal_column
|
||||
from sqlalchemy import __version__ as sa_version
|
||||
|
||||
from sm_api.openstack.common.db import exception
|
||||
from sm_api.openstack.common import log as logging
|
||||
@@ -357,7 +359,7 @@ def cleanup():
|
||||
_ENGINE = None
|
||||
|
||||
|
||||
class SqliteForeignKeysListener(PoolListener):
|
||||
class SqliteForeignKeysListener:
|
||||
"""
|
||||
Ensures that the foreign key constraints are enforced in SQLite.
|
||||
|
||||
@@ -370,17 +372,28 @@ class SqliteForeignKeysListener(PoolListener):
|
||||
dbapi_con.execute('pragma foreign_keys=ON')
|
||||
|
||||
|
||||
def get_session(autocommit=True, expire_on_commit=False,
|
||||
sqlite_fk=False):
|
||||
def get_session(expire_on_commit=False, sqlite_fk=False):
|
||||
"""Return a SQLAlchemy session."""
|
||||
global _MAKER
|
||||
|
||||
if _MAKER is None:
|
||||
engine = get_engine(sqlite_fk=sqlite_fk)
|
||||
_MAKER = get_maker(engine, autocommit, expire_on_commit)
|
||||
_MAKER = get_maker(engine, expire_on_commit)
|
||||
|
||||
session = _MAKER()
|
||||
return session
|
||||
return _MAKER()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def get_session_manager():
|
||||
session = get_session()
|
||||
try:
|
||||
yield session
|
||||
session.commit()
|
||||
except Exception:
|
||||
session.rollback()
|
||||
raise
|
||||
finally:
|
||||
session.close()
|
||||
|
||||
|
||||
# note(boris-42): In current versions of DB backends unique constraint
|
||||
@@ -557,12 +570,15 @@ def _is_db_connection_error(args):
|
||||
|
||||
def create_engine(sql_connection, sqlite_fk=False):
|
||||
"""Return a new SQLAlchemy engine."""
|
||||
connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
|
||||
if sa_version >= '1.4.0':
|
||||
connection_dict = sqlalchemy.engine.URL.create(sql_connection)
|
||||
else:
|
||||
connection_dict = sqlalchemy.engine.url.make_url(sql_connection)
|
||||
|
||||
engine_args = {
|
||||
"pool_recycle": CONF.database.idle_timeout,
|
||||
"echo": False,
|
||||
'convert_unicode': True,
|
||||
'echo_pool': False,
|
||||
}
|
||||
|
||||
# Map our SQL debug level to SQLAlchemy's options
|
||||
@@ -571,9 +587,12 @@ def create_engine(sql_connection, sqlite_fk=False):
|
||||
elif CONF.database.connection_debug >= 50:
|
||||
engine_args['echo'] = True
|
||||
|
||||
engine = None
|
||||
|
||||
if "sqlite" in connection_dict.drivername:
|
||||
if sqlite_fk:
|
||||
engine_args["listeners"] = [SqliteForeignKeysListener()]
|
||||
engine = sqlalchemy.create_engine(sql_connection, **engine_args)
|
||||
event.listen(engine, 'connect', SqliteForeignKeysListener())
|
||||
engine_args["poolclass"] = NullPool
|
||||
|
||||
if CONF.database.connection == "sqlite://":
|
||||
@@ -584,7 +603,8 @@ def create_engine(sql_connection, sqlite_fk=False):
|
||||
if CONF.database.max_overflow is not None:
|
||||
engine_args['max_overflow'] = CONF.database.max_overflow
|
||||
|
||||
engine = sqlalchemy.create_engine(sql_connection, **engine_args)
|
||||
if not engine:
|
||||
engine = sqlalchemy.create_engine(sql_connection, **engine_args)
|
||||
|
||||
sqlalchemy.event.listen(engine, 'checkin', _greenthread_yield)
|
||||
|
||||
@@ -601,8 +621,9 @@ def create_engine(sql_connection, sqlite_fk=False):
|
||||
_patch_mysqldb_with_stacktrace_comments()
|
||||
|
||||
try:
|
||||
engine.connect()
|
||||
except sqla_exc.OperationalError as e:
|
||||
with engine.connect() as conn:
|
||||
conn.execute(sqlalchemy.text("SELECT 1"))
|
||||
except sqlalchemy.exc.OperationalError as e:
|
||||
if not _is_db_connection_error(e.args[0]):
|
||||
raise
|
||||
|
||||
@@ -616,9 +637,10 @@ def create_engine(sql_connection, sqlite_fk=False):
|
||||
remaining -= 1
|
||||
time.sleep(CONF.database.retry_interval)
|
||||
try:
|
||||
engine.connect()
|
||||
with engine.connect() as conn:
|
||||
conn.execute(sqlalchemy.text("SELECT 1"))
|
||||
break
|
||||
except sqla_exc.OperationalError as e:
|
||||
except sqlalchemy.exc.OperationalError as e:
|
||||
if (remaining != 'infinite' and remaining == 0) or \
|
||||
not _is_db_connection_error(e.args[0]):
|
||||
raise
|
||||
@@ -650,13 +672,14 @@ class Session(sqlalchemy.orm.session.Session):
|
||||
return super(Session, self).execute(*args, **kwargs)
|
||||
|
||||
|
||||
def get_maker(engine, autocommit=True, expire_on_commit=False):
|
||||
def get_maker(engine, expire_on_commit=False):
|
||||
"""Return a SQLAlchemy sessionmaker using the given engine."""
|
||||
return sqlalchemy.orm.sessionmaker(bind=engine,
|
||||
class_=Session,
|
||||
autocommit=autocommit,
|
||||
expire_on_commit=expire_on_commit,
|
||||
query_cls=Query)
|
||||
return sqlalchemy.orm.sessionmaker(
|
||||
bind=engine,
|
||||
class_=Session,
|
||||
expire_on_commit=expire_on_commit,
|
||||
query_cls=Query
|
||||
)
|
||||
|
||||
|
||||
def _patch_mysqldb_with_stacktrace_comments():
|
||||
|
Reference in New Issue
Block a user