Merge "sql: Remove dead helpers"

This commit is contained in:
Zuul 2022-02-11 17:41:34 +00:00 committed by Gerrit Code Review
commit 3d173b9b18
1 changed files with 1 additions and 69 deletions

View File

@ -52,7 +52,6 @@ from oslo_log import fixture as log_fixture
from oslo_log import log
from oslotest import base as test_base
import sqlalchemy.exc
from sqlalchemy import inspect
from keystone.cmd import cli
from keystone.common import sql
@ -335,17 +334,9 @@ class MigrateBase(
return sqlalchemy.MetaData(self.engine)
def load_table(self, name):
table = sqlalchemy.Table(name,
self.metadata,
autoload=True)
table = sqlalchemy.Table(name, self.metadata, autoload=True)
return table
def assertTableExists(self, table_name):
try:
self.load_table(table_name)
except sqlalchemy.exc.NoSuchTableError:
raise AssertionError('Table "%s" does not exist' % table_name)
def assertTableDoesNotExist(self, table_name):
"""Assert that a given table exists cannot be selected by name."""
# Switch to a different metadata otherwise you might still
@ -357,22 +348,6 @@ class MigrateBase(
else:
raise AssertionError('Table "%s" already exists' % table_name)
def calc_table_row_count(self, table_name):
"""Return the number of rows in the table."""
t = sqlalchemy.Table(table_name, self.metadata, autoload=True)
session = self.sessionmaker()
row_count = session.query(
sqlalchemy.func.count('*')).select_from(t).scalar()
return row_count
def assertTableCountsMatch(self, table1_name, table2_name):
table1_count = self.calc_table_row_count(table1_name)
table2_count = self.calc_table_row_count(table2_name)
if table1_count != table2_count:
raise AssertionError('Table counts do not match: {0} ({1}), {2} '
'({3})'.format(table1_name, table1_count,
table2_name, table2_count))
def assertTableColumns(self, table_name, expected_cols):
"""Assert that the table contains the expected set of columns."""
table = self.load_table(table_name)
@ -382,49 +357,6 @@ class MigrateBase(
self.assertCountEqual(expected_cols, actual_cols,
'%s table' % table_name)
def insert_dict(self, session, table_name, d, table=None):
"""Naively inserts key-value pairs into a table, given a dictionary."""
if table is None:
this_table = sqlalchemy.Table(table_name, self.metadata,
autoload=True)
else:
this_table = table
insert = this_table.insert().values(**d)
session.execute(insert)
def does_pk_exist(self, table, pk_column):
"""Check whether a column is primary key on a table."""
inspector = inspect(self.engine)
pk_columns = inspector.get_pk_constraint(table)['constrained_columns']
return pk_column in pk_columns
def does_fk_exist(self, table, fk_column):
inspector = inspect(self.engine)
for fk in inspector.get_foreign_keys(table):
if fk_column in fk['constrained_columns']:
return True
return False
def does_constraint_exist(self, table_name, constraint_name):
table = sqlalchemy.Table(table_name, self.metadata, autoload=True)
return constraint_name in [con.name for con in table.constraints]
def does_index_exist(self, table_name, index_name):
table = sqlalchemy.Table(table_name, self.metadata, autoload=True)
return index_name in [idx.name for idx in table.indexes]
def does_unique_constraint_exist(self, table_name, column_names):
inspector = inspect(self.engine)
constraints = inspector.get_unique_constraints(table_name)
for c in constraints:
if (len(c['column_names']) == 1 and
column_names in c['column_names']):
return True
if (len(c['column_names'])) > 1 and isinstance(column_names, list):
return set(c['column_names']) == set(column_names)
return False
class ExpandSchemaUpgradeTests(MigrateBase):