Sync common db code from Oslo
This sync contains commit ``f7705f3 Make table utf-8 charset checking be optional for DB migration`` which will be used to support bug #1279000 fix. Full list of changes: 9fed4ed Fix Keystone doc build errors with SQLAlchemy 0.9 f7705f3 Make table utf-8 charset checking be optional for DB migration 5b7e61c Dispose db connections pool on disconnect d1988b9 Set sql_mode callback on connect instead of checkout a1a8280 Fix excessive logging from db.sqlalchemy.session dc2d829 Add lockutils fixture to OpportunisticTestCase d10f871 Adapt DB provisioning code for CI requirements 5920bed Make db utils importable without migrate 9933bdd Get mysql_sql_mode parameter from config 96a2217 Prevent incorrect usage of _wrap_db_error() 6cab37c Python3: define a __next__() method for ModelBase 20a7510 Add from_config() method to EngineFacade Change-Id: I115f5610101a8512af484111469f5060dce27919 Signed-off-by: Zhi Yan Liu <zhiyanl@cn.ibm.com>
This commit is contained in:
parent
84cb790ce9
commit
3e301eb960
@ -40,9 +40,12 @@ database_opts = [
|
||||
cfg.DeprecatedOpt('connection',
|
||||
group='sql'), ]),
|
||||
cfg.StrOpt('mysql_sql_mode',
|
||||
help='The SQL mode to be used for MySQL sessions '
|
||||
'(default is empty, meaning do not override '
|
||||
'any server-side SQL mode setting)'),
|
||||
default='TRADITIONAL',
|
||||
help='The SQL mode to be used for MySQL sessions. '
|
||||
'This option, including the default, overrides any '
|
||||
'server-set SQL mode. To use whatever SQL mode '
|
||||
'is set by the server configuration, '
|
||||
'set this to no value. Example: mysql_sql_mode='),
|
||||
cfg.IntOpt('idle_timeout',
|
||||
default=3600,
|
||||
deprecated_opts=[cfg.DeprecatedOpt('sql_idle_timeout',
|
||||
|
@ -168,7 +168,7 @@ def patch_migrate():
|
||||
sqlite.SQLiteConstraintGenerator)
|
||||
|
||||
|
||||
def db_sync(engine, abs_path, version=None, init_version=0):
|
||||
def db_sync(engine, abs_path, version=None, init_version=0, sanity_check=True):
|
||||
"""Upgrade or downgrade a database.
|
||||
|
||||
Function runs the upgrade() or downgrade() functions in change scripts.
|
||||
@ -179,7 +179,9 @@ def db_sync(engine, abs_path, version=None, init_version=0):
|
||||
If None - database will update to the latest
|
||||
available version.
|
||||
:param init_version: Initial database version
|
||||
:param sanity_check: Require schema sanity checking for all tables
|
||||
"""
|
||||
|
||||
if version is not None:
|
||||
try:
|
||||
version = int(version)
|
||||
@ -189,6 +191,7 @@ def db_sync(engine, abs_path, version=None, init_version=0):
|
||||
|
||||
current_version = db_version(engine, abs_path, init_version)
|
||||
repository = _find_migrate_repo(abs_path)
|
||||
if sanity_check:
|
||||
_db_schema_sanity_check(engine)
|
||||
if version is None or version > current_version:
|
||||
return versioning_api.upgrade(engine, repository, version)
|
||||
|
@ -29,7 +29,7 @@ from sqlalchemy.orm import object_mapper
|
||||
from glance.openstack.common import timeutils
|
||||
|
||||
|
||||
class ModelBase(object):
|
||||
class ModelBase(six.Iterator):
|
||||
"""Base class for models."""
|
||||
__table_initialized__ = False
|
||||
|
||||
@ -78,10 +78,14 @@ class ModelBase(object):
|
||||
self._i = iter(columns)
|
||||
return self
|
||||
|
||||
def next(self):
|
||||
# In Python 3, __next__() has replaced next().
|
||||
def __next__(self):
|
||||
n = six.advance_iterator(self._i)
|
||||
return n, getattr(self, n)
|
||||
|
||||
def next(self):
|
||||
return self.__next__()
|
||||
|
||||
def update(self, values):
|
||||
"""Make the model object behave like a dict."""
|
||||
for k, v in six.iteritems(values):
|
||||
|
@ -16,6 +16,7 @@
|
||||
"""Provision test environment for specific DB backends"""
|
||||
|
||||
import argparse
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import string
|
||||
@ -26,23 +27,12 @@ import sqlalchemy
|
||||
from glance.openstack.common.db import exception as exc
|
||||
|
||||
|
||||
SQL_CONNECTION = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION', 'sqlite://')
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _gen_credentials(*names):
|
||||
"""Generate credentials."""
|
||||
auth_dict = {}
|
||||
for name in names:
|
||||
val = ''.join(random.choice(string.ascii_lowercase)
|
||||
for i in moves.range(10))
|
||||
auth_dict[name] = val
|
||||
return auth_dict
|
||||
|
||||
|
||||
def _get_engine(uri=SQL_CONNECTION):
|
||||
def get_engine(uri):
|
||||
"""Engine creation
|
||||
|
||||
By default the uri is SQL_CONNECTION which is admin credentials.
|
||||
Call the function without arguments to get admin connection. Admin
|
||||
connection required to create temporary user and database for each
|
||||
particular test. Otherwise use existing connection to recreate connection
|
||||
@ -62,50 +52,43 @@ def _execute_sql(engine, sql, driver):
|
||||
except sqlalchemy.exc.OperationalError:
|
||||
msg = ('%s does not match database admin '
|
||||
'credentials or database does not exist.')
|
||||
raise exc.DBConnectionError(msg % SQL_CONNECTION)
|
||||
LOG.exception(msg % engine.url)
|
||||
raise exc.DBConnectionError(msg % engine.url)
|
||||
|
||||
|
||||
def create_database(engine):
|
||||
"""Provide temporary user and database for each particular test."""
|
||||
driver = engine.name
|
||||
|
||||
auth = _gen_credentials('database', 'user', 'passwd')
|
||||
|
||||
sqls = {
|
||||
'mysql': [
|
||||
"drop database if exists %(database)s;",
|
||||
"grant all on %(database)s.* to '%(user)s'@'localhost'"
|
||||
" identified by '%(passwd)s';",
|
||||
"create database %(database)s;",
|
||||
],
|
||||
'postgresql': [
|
||||
"drop database if exists %(database)s;",
|
||||
"drop user if exists %(user)s;",
|
||||
"create user %(user)s with password '%(passwd)s';",
|
||||
"create database %(database)s owner %(user)s;",
|
||||
]
|
||||
auth = {
|
||||
'database': ''.join(random.choice(string.ascii_lowercase)
|
||||
for i in moves.range(10)),
|
||||
'user': engine.url.username,
|
||||
'passwd': engine.url.password,
|
||||
}
|
||||
|
||||
sqls = [
|
||||
"drop database if exists %(database)s;",
|
||||
"create database %(database)s;"
|
||||
]
|
||||
|
||||
if driver == 'sqlite':
|
||||
return 'sqlite:////tmp/%s' % auth['database']
|
||||
|
||||
try:
|
||||
sql_rows = sqls[driver]
|
||||
except KeyError:
|
||||
raise ValueError('Unsupported RDBMS %s' % driver)
|
||||
sql_query = map(lambda x: x % auth, sql_rows)
|
||||
|
||||
elif driver in ['mysql', 'postgresql']:
|
||||
sql_query = map(lambda x: x % auth, sqls)
|
||||
_execute_sql(engine, sql_query, driver)
|
||||
else:
|
||||
raise ValueError('Unsupported RDBMS %s' % driver)
|
||||
|
||||
params = auth.copy()
|
||||
params['backend'] = driver
|
||||
return "%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s" % params
|
||||
|
||||
|
||||
def drop_database(engine, current_uri):
|
||||
def drop_database(admin_engine, current_uri):
|
||||
"""Drop temporary database and user after each particular test."""
|
||||
engine = _get_engine(current_uri)
|
||||
admin_engine = _get_engine()
|
||||
|
||||
engine = get_engine(current_uri)
|
||||
driver = engine.name
|
||||
auth = {'database': engine.url.database, 'user': engine.url.username}
|
||||
|
||||
@ -114,26 +97,11 @@ def drop_database(engine, current_uri):
|
||||
os.remove(auth['database'])
|
||||
except OSError:
|
||||
pass
|
||||
return
|
||||
|
||||
sqls = {
|
||||
'mysql': [
|
||||
"drop database if exists %(database)s;",
|
||||
"drop user '%(user)s'@'localhost';",
|
||||
],
|
||||
'postgresql': [
|
||||
"drop database if exists %(database)s;",
|
||||
"drop user if exists %(user)s;",
|
||||
]
|
||||
}
|
||||
|
||||
try:
|
||||
sql_rows = sqls[driver]
|
||||
except KeyError:
|
||||
elif driver in ['mysql', 'postgresql']:
|
||||
sql = "drop database if exists %(database)s;"
|
||||
_execute_sql(admin_engine, [sql % auth], driver)
|
||||
else:
|
||||
raise ValueError('Unsupported RDBMS %s' % driver)
|
||||
sql_query = map(lambda x: x % auth, sql_rows)
|
||||
|
||||
_execute_sql(admin_engine, sql_query, driver)
|
||||
|
||||
|
||||
def main():
|
||||
@ -172,7 +140,9 @@ def main():
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
engine = _get_engine()
|
||||
connection_string = os.getenv('OS_TEST_DBAPI_ADMIN_CONNECTION',
|
||||
'sqlite://')
|
||||
engine = get_engine(connection_string)
|
||||
which = args.which
|
||||
|
||||
if which == "create":
|
||||
|
@ -291,7 +291,7 @@ from sqlalchemy.pool import NullPool, StaticPool
|
||||
from sqlalchemy.sql.expression import literal_column
|
||||
|
||||
from glance.openstack.common.db import exception
|
||||
from glance.openstack.common.gettextutils import _LE, _LW, _LI
|
||||
from glance.openstack.common.gettextutils import _LE, _LW
|
||||
from glance.openstack.common import timeutils
|
||||
|
||||
|
||||
@ -428,13 +428,14 @@ def _raise_if_deadlock_error(operational_error, engine_name):
|
||||
|
||||
|
||||
def _wrap_db_error(f):
|
||||
#TODO(rpodolyaka): in a subsequent commit make this a class decorator to
|
||||
# ensure it can only applied to Session subclasses instances (as we use
|
||||
# Session instance bind attribute below)
|
||||
|
||||
@functools.wraps(f)
|
||||
def _wrap(self, *args, **kwargs):
|
||||
try:
|
||||
assert issubclass(
|
||||
self.__class__, sqlalchemy.orm.session.Session
|
||||
), ('_wrap_db_error() can only be applied to methods of '
|
||||
'subclasses of sqlalchemy.orm.session.Session.')
|
||||
|
||||
return f(self, *args, **kwargs)
|
||||
except UnicodeEncodeError:
|
||||
raise exception.DBInvalidUnicodeParameter()
|
||||
@ -504,25 +505,20 @@ def _ping_listener(engine, dbapi_conn, connection_rec, connection_proxy):
|
||||
if engine.dialect.is_disconnect(ex, dbapi_conn, cursor):
|
||||
msg = _LW('Database server has gone away: %s') % ex
|
||||
LOG.warning(msg)
|
||||
|
||||
# if the database server has gone away, all connections in the pool
|
||||
# have become invalid and we can safely close all of them here,
|
||||
# rather than waste time on checking of every single connection
|
||||
engine.dispose()
|
||||
|
||||
# this will be handled by SQLAlchemy and will force it to create
|
||||
# a new connection and retry the original action
|
||||
raise sqla_exc.DisconnectionError(msg)
|
||||
else:
|
||||
raise
|
||||
|
||||
|
||||
def _set_mode_traditional(dbapi_con, connection_rec, connection_proxy):
|
||||
"""Set engine mode to 'traditional'.
|
||||
|
||||
Required to prevent silent truncates at insert or update operations
|
||||
under MySQL. By default MySQL truncates inserted string if it longer
|
||||
than a declared field just with warning. That is fraught with data
|
||||
corruption.
|
||||
"""
|
||||
_set_session_sql_mode(dbapi_con, connection_rec,
|
||||
connection_proxy, 'TRADITIONAL')
|
||||
|
||||
|
||||
def _set_session_sql_mode(dbapi_con, connection_rec,
|
||||
connection_proxy, sql_mode=None):
|
||||
def _set_session_sql_mode(dbapi_con, connection_rec, sql_mode=None):
|
||||
"""Set the sql_mode session variable.
|
||||
|
||||
MySQL supports several server modes. The default is None, but sessions
|
||||
@ -531,30 +527,54 @@ def _set_session_sql_mode(dbapi_con, connection_rec,
|
||||
|
||||
Note: passing in '' (empty string) for sql_mode clears
|
||||
the SQL mode for the session, overriding a potentially set
|
||||
server default. Passing in None (the default) makes this
|
||||
a no-op, meaning if a server-side SQL mode is set, it still applies.
|
||||
server default.
|
||||
"""
|
||||
|
||||
cursor = dbapi_con.cursor()
|
||||
if sql_mode is not None:
|
||||
cursor.execute("SET SESSION sql_mode = %s", [sql_mode])
|
||||
|
||||
# Check against the real effective SQL mode. Even when unset by
|
||||
|
||||
def _mysql_get_effective_sql_mode(engine):
|
||||
"""Returns the effective SQL mode for connections from the engine pool.
|
||||
|
||||
Returns ``None`` if the mode isn't available, otherwise returns the mode.
|
||||
|
||||
"""
|
||||
# Get the real effective SQL mode. Even when unset by
|
||||
# our own config, the server may still be operating in a specific
|
||||
# SQL mode as set by the server configuration
|
||||
cursor.execute("SHOW VARIABLES LIKE 'sql_mode'")
|
||||
row = cursor.fetchone()
|
||||
# SQL mode as set by the server configuration.
|
||||
# Also note that the checkout listener will be called on execute to
|
||||
# set the mode if it's registered.
|
||||
row = engine.execute("SHOW VARIABLES LIKE 'sql_mode'").fetchone()
|
||||
if row is None:
|
||||
return
|
||||
return row[1]
|
||||
|
||||
|
||||
def _mysql_check_effective_sql_mode(engine):
|
||||
"""Logs a message based on the effective SQL mode for MySQL connections."""
|
||||
realmode = _mysql_get_effective_sql_mode(engine)
|
||||
|
||||
if realmode is None:
|
||||
LOG.warning(_LW('Unable to detect effective SQL mode'))
|
||||
return
|
||||
realmode = row[1]
|
||||
LOG.info(_LI('MySQL server mode set to %s') % realmode)
|
||||
|
||||
LOG.debug('MySQL server mode set to %s', realmode)
|
||||
# 'TRADITIONAL' mode enables several other modes, so
|
||||
# we need a substring match here
|
||||
if not ('TRADITIONAL' in realmode.upper() or
|
||||
'STRICT_ALL_TABLES' in realmode.upper()):
|
||||
LOG.warning(_LW("MySQL SQL mode is '%s', "
|
||||
"consider enabling TRADITIONAL or STRICT_ALL_TABLES")
|
||||
% realmode)
|
||||
"consider enabling TRADITIONAL or STRICT_ALL_TABLES"),
|
||||
realmode)
|
||||
|
||||
|
||||
def _mysql_set_mode_callback(engine, sql_mode):
|
||||
if sql_mode is not None:
|
||||
mode_callback = functools.partial(_set_session_sql_mode,
|
||||
sql_mode=sql_mode)
|
||||
sqlalchemy.event.listen(engine, 'connect', mode_callback)
|
||||
_mysql_check_effective_sql_mode(engine)
|
||||
|
||||
|
||||
def _is_db_connection_error(args):
|
||||
@ -582,7 +602,7 @@ def _raise_if_db_connection_lost(error, engine):
|
||||
|
||||
|
||||
def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
|
||||
mysql_traditional_mode=False, idle_timeout=3600,
|
||||
idle_timeout=3600,
|
||||
connection_debug=0, max_pool_size=None, max_overflow=None,
|
||||
pool_timeout=None, sqlite_synchronous=True,
|
||||
connection_trace=False, max_retries=10, retry_interval=10):
|
||||
@ -629,12 +649,8 @@ def create_engine(sql_connection, sqlite_fk=False, mysql_sql_mode=None,
|
||||
ping_callback = functools.partial(_ping_listener, engine)
|
||||
sqlalchemy.event.listen(engine, 'checkout', ping_callback)
|
||||
if engine.name == 'mysql':
|
||||
if mysql_traditional_mode:
|
||||
mysql_sql_mode = 'TRADITIONAL'
|
||||
if mysql_sql_mode:
|
||||
mode_callback = functools.partial(_set_session_sql_mode,
|
||||
sql_mode=mysql_sql_mode)
|
||||
sqlalchemy.event.listen(engine, 'checkout', mode_callback)
|
||||
_mysql_set_mode_callback(engine, mysql_sql_mode)
|
||||
elif 'sqlite' in connection_dict.drivername:
|
||||
if not sqlite_synchronous:
|
||||
sqlalchemy.event.listen(engine, 'connect',
|
||||
@ -765,6 +781,7 @@ class EngineFacade(object):
|
||||
integrate engine/sessionmaker instances into your apps any way you like
|
||||
(e.g. one might want to bind a session to a request context). Two important
|
||||
things to remember:
|
||||
|
||||
1. An Engine instance is effectively a pool of DB connections, so it's
|
||||
meant to be shared (and it's thread-safe).
|
||||
2. A Session instance is not meant to be shared and represents a DB
|
||||
@ -774,16 +791,13 @@ class EngineFacade(object):
|
||||
"""
|
||||
|
||||
def __init__(self, sql_connection,
|
||||
sqlite_fk=False, mysql_sql_mode=None,
|
||||
autocommit=True, expire_on_commit=False, **kwargs):
|
||||
sqlite_fk=False, autocommit=True,
|
||||
expire_on_commit=False, **kwargs):
|
||||
"""Initialize engine and sessionmaker instances.
|
||||
|
||||
:param sqlite_fk: enable foreign keys in SQLite
|
||||
:type sqlite_fk: bool
|
||||
|
||||
:param mysql_sql_mode: set SQL mode in MySQL
|
||||
:type mysql_sql_mode: string
|
||||
|
||||
:param autocommit: use autocommit mode for created Session instances
|
||||
:type autocommit: bool
|
||||
|
||||
@ -792,6 +806,8 @@ class EngineFacade(object):
|
||||
|
||||
Keyword arguments:
|
||||
|
||||
:keyword mysql_sql_mode: the SQL mode to be used for MySQL sessions.
|
||||
(defaults to TRADITIONAL)
|
||||
:keyword idle_timeout: timeout before idle sql connections are reaped
|
||||
(defaults to 3600)
|
||||
:keyword connection_debug: verbosity of SQL debugging information.
|
||||
@ -819,7 +835,7 @@ class EngineFacade(object):
|
||||
self._engine = create_engine(
|
||||
sql_connection=sql_connection,
|
||||
sqlite_fk=sqlite_fk,
|
||||
mysql_sql_mode=mysql_sql_mode,
|
||||
mysql_sql_mode=kwargs.get('mysql_sql_mode', 'TRADITIONAL'),
|
||||
idle_timeout=kwargs.get('idle_timeout', 3600),
|
||||
connection_debug=kwargs.get('connection_debug', 0),
|
||||
max_pool_size=kwargs.get('max_pool_size'),
|
||||
@ -858,3 +874,31 @@ class EngineFacade(object):
|
||||
del kwargs[arg]
|
||||
|
||||
return self._session_maker(**kwargs)
|
||||
|
||||
@classmethod
|
||||
def from_config(cls, connection_string, conf,
|
||||
sqlite_fk=False, autocommit=True, expire_on_commit=False):
|
||||
"""Initialize EngineFacade using oslo.config config instance options.
|
||||
|
||||
:param connection_string: SQLAlchemy connection string
|
||||
:type connection_string: string
|
||||
|
||||
:param conf: oslo.config config instance
|
||||
:type conf: oslo.config.cfg.ConfigOpts
|
||||
|
||||
:param sqlite_fk: enable foreign keys in SQLite
|
||||
:type sqlite_fk: bool
|
||||
|
||||
:param autocommit: use autocommit mode for created Session instances
|
||||
:type autocommit: bool
|
||||
|
||||
:param expire_on_commit: expire session objects on commit
|
||||
:type expire_on_commit: bool
|
||||
|
||||
"""
|
||||
|
||||
return cls(sql_connection=connection_string,
|
||||
sqlite_fk=sqlite_fk,
|
||||
autocommit=autocommit,
|
||||
expire_on_commit=expire_on_commit,
|
||||
**dict(conf.database.items()))
|
||||
|
@ -22,6 +22,7 @@ import six
|
||||
|
||||
from glance.openstack.common.db.sqlalchemy import session
|
||||
from glance.openstack.common.db.sqlalchemy import utils
|
||||
from glance.openstack.common.fixture import lockutils
|
||||
from glance.openstack.common import test
|
||||
|
||||
|
||||
@ -120,6 +121,9 @@ class OpportunisticTestCase(DbTestCase):
|
||||
FIXTURE = abc.abstractproperty(lambda: None)
|
||||
|
||||
def setUp(self):
|
||||
# TODO(bnemec): Remove this once infra is ready for
|
||||
# https://review.openstack.org/#/c/74963/ to merge.
|
||||
self.useFixture(lockutils.LockFixture('opportunistic-db'))
|
||||
credentials = {
|
||||
'backend': self.FIXTURE.DRIVER,
|
||||
'user': self.FIXTURE.USERNAME,
|
||||
|
@ -19,7 +19,6 @@
|
||||
import logging
|
||||
import re
|
||||
|
||||
from migrate.changeset import UniqueConstraint
|
||||
import sqlalchemy
|
||||
from sqlalchemy import Boolean
|
||||
from sqlalchemy import CheckConstraint
|
||||
@ -33,7 +32,6 @@ from sqlalchemy import MetaData
|
||||
from sqlalchemy import or_
|
||||
from sqlalchemy.sql.expression import literal_column
|
||||
from sqlalchemy.sql.expression import UpdateBase
|
||||
from sqlalchemy.sql import select
|
||||
from sqlalchemy import String
|
||||
from sqlalchemy import Table
|
||||
from sqlalchemy.types import NullType
|
||||
@ -218,6 +216,9 @@ def model_query(context, model, session, args=None, project_only=False,
|
||||
:type read_deleted: bool
|
||||
|
||||
Usage:
|
||||
|
||||
..code:: python
|
||||
|
||||
result = (utils.model_query(context, models.Instance, session=session)
|
||||
.filter_by(uuid=instance_uuid)
|
||||
.all())
|
||||
@ -227,6 +228,7 @@ def model_query(context, model, session, args=None, project_only=False,
|
||||
session=session,
|
||||
args=(func.count(Node.id), func.sum(Node.ram))
|
||||
).filter_by(project_id=project_id)
|
||||
|
||||
"""
|
||||
|
||||
if not read_deleted:
|
||||
@ -298,6 +300,10 @@ def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
|
||||
**col_name_col_instance):
|
||||
"""Drop unique constraint from table.
|
||||
|
||||
DEPRECATED: this function is deprecated and will be removed from glance.db
|
||||
in a few releases. Please use UniqueConstraint.drop() method directly for
|
||||
sqlalchemy-migrate migration scripts.
|
||||
|
||||
This method drops UC from table and works for mysql, postgresql and sqlite.
|
||||
In mysql and postgresql we are able to use "alter table" construction.
|
||||
Sqlalchemy doesn't support some sqlite column types and replaces their
|
||||
@ -314,6 +320,8 @@ def drop_unique_constraint(migrate_engine, table_name, uc_name, *columns,
|
||||
types by sqlite. For example BigInteger.
|
||||
"""
|
||||
|
||||
from migrate.changeset import UniqueConstraint
|
||||
|
||||
meta = MetaData()
|
||||
meta.bind = migrate_engine
|
||||
t = Table(table_name, meta, autoload=True)
|
||||
@ -353,8 +361,8 @@ def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
|
||||
columns_for_select = [func.max(table.c.id)]
|
||||
columns_for_select.extend(columns_for_group_by)
|
||||
|
||||
duplicated_rows_select = select(columns_for_select,
|
||||
group_by=columns_for_group_by,
|
||||
duplicated_rows_select = sqlalchemy.sql.select(
|
||||
columns_for_select, group_by=columns_for_group_by,
|
||||
having=func.count(table.c.id) > 1)
|
||||
|
||||
for row in migrate_engine.execute(duplicated_rows_select):
|
||||
@ -365,7 +373,8 @@ def drop_old_duplicate_entries_from_table(migrate_engine, table_name,
|
||||
for name in uc_column_names:
|
||||
delete_condition &= table.c[name] == row[name]
|
||||
|
||||
rows_to_delete_select = select([table.c.id]).where(delete_condition)
|
||||
rows_to_delete_select = sqlalchemy.sql.select(
|
||||
[table.c.id]).where(delete_condition)
|
||||
for row in migrate_engine.execute(rows_to_delete_select).fetchall():
|
||||
LOG.info(_LI("Deleting duplicated row with id: %(id)s from table: "
|
||||
"%(table)s") % dict(id=row[0], table=table_name))
|
||||
@ -476,7 +485,7 @@ def _change_deleted_column_type_to_boolean_sqlite(migrate_engine, table_name,
|
||||
else:
|
||||
c_select.append(table.c.deleted == table.c.id)
|
||||
|
||||
ins = InsertFromSelect(new_table, select(c_select))
|
||||
ins = InsertFromSelect(new_table, sqlalchemy.sql.select(c_select))
|
||||
migrate_engine.execute(ins)
|
||||
|
||||
table.drop()
|
||||
|
Loading…
Reference in New Issue
Block a user