Switch migration tests to oslo.db DbTestCase

oslo.db provides a DbTestCase base class that configures oslo.db for
in-memory sqlite testing and can also run the tests against other live
databases.

Any support we had for live database testing is removed. It will be
added back in a later patch that uses oslo.db DbTestCase features to
implement live database testing in OpenStack's common way.

Change-Id: I5b691d0adc812d687b318f639544ecaa165b9cfc
Brant Knudson 2016-03-17 14:27:11 -05:00
parent 1f675cfdd2
commit 92749e4861
8 changed files with 45 additions and 138 deletions
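
For orientation, a minimal sketch (not part of this commit) of what a test
gains by inheriting oslo.db's DbTestCase: the base class provisions an engine
(in-memory sqlite by default) and exposes self.engine and self.sessionmaker,
which the migration tests in this change rely on. Class and test names below
are illustrative only.

from oslo_db.sqlalchemy import test_base
import sqlalchemy


class ExampleDbTest(test_base.DbTestCase):
    # Illustrative test case: DbTestCase provisions the engine and
    # sessionmaker before each test runs.

    def test_engine_is_provisioned(self):
        # In-memory sqlite unless another backend is provisioned.
        self.assertIsNotNone(self.engine)

    def test_sessionmaker_yields_bound_sessions(self):
        session = self.sessionmaker()
        self.assertEqual(
            1, session.execute(sqlalchemy.text('SELECT 1')).scalar())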

View File

@@ -196,12 +196,26 @@ def _get_context():
return _CONTEXT
# Unit tests set this to True so that oslo.db's global engine is used.
# This allows oslo_db.test_base.DbTestCase to override the transaction manager
# with its test transaction manager.
_TESTING_USE_GLOBAL_CONTEXT_MANAGER = False
def session_for_read():
return _get_main_context_manager().reader.using(_get_context())
if _TESTING_USE_GLOBAL_CONTEXT_MANAGER:
reader = enginefacade.reader
else:
reader = _get_main_context_manager().reader
return reader.using(_get_context())
def session_for_write():
return _get_main_context_manager().writer.using(_get_context())
if _TESTING_USE_GLOBAL_CONTEXT_MANAGER:
writer = enginefacade.writer
else:
writer = _get_main_context_manager().writer
return writer.using(_get_context())
def truncated(f):

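The new _TESTING_USE_GLOBAL_CONTEXT_MANAGER flag is meant to be flipped only
by tests. A hedged sketch of that pattern, mirroring the SqlMigrateBase.setUp
change further down in this commit (the test class name is illustrative):

from keystone.common import sql
from oslo_db.sqlalchemy import test_base


class ExampleKeystoneSqlTest(test_base.DbTestCase):
    def setUp(self):
        super(ExampleKeystoneSqlTest, self).setUp()
        # Route session_for_read()/session_for_write() through oslo.db's
        # global enginefacade, which the DbTestCase fixture overrides with
        # its test transaction manager.
        sql.core._TESTING_USE_GLOBAL_CONTEXT_MANAGER = True
        # Restore the module default once the test finishes.
        self.addCleanup(
            setattr, sql.core, '_TESTING_USE_GLOBAL_CONTEXT_MANAGER', False)
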
View File

@@ -1,4 +0,0 @@
#Used for running the Migrate tests against a live DB2 Server
#See _sql_livetest.py
[database]
connection = ibm_db_sa://keystone:keystone@/staktest?charset=utf8

View File

@@ -1,4 +0,0 @@
#Used for running the Migrate tests against a live MySQL Server
#See _sql_livetest.py
[database]
connection = mysql+pymysql://keystone:keystone@localhost/keystone_test?charset=utf8

View File

@@ -1,4 +0,0 @@
#Used for running the Migrate tests against a live Postgresql Server
#See _sql_livetest.py
[database]
connection = postgresql://keystone:keystone@localhost/keystone_test?client_encoding=utf8

View File

@@ -42,13 +42,13 @@ def run_once(f):
# NOTE(I159): All the options are cleared on every execution. The method must
# be called at every fixture initialization.
def initialize_sql_session():
def initialize_sql_session(connection_str=unit.IN_MEM_DB_CONN_STRING):
# Make sure the DB is in the correct location; set the default value here,
# since some test cases need to be able to override it.
db_options.set_defaults(
CONF,
connection=unit.IN_MEM_DB_CONN_STRING)
connection=connection_str)
@run_once

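With the new connection_str parameter, the fixture can point keystone's
[database] connection option at an arbitrary URL instead of the in-memory
sqlite default. A hedged usage sketch; the import path assumes keystone's
database test fixture module, and the file URL is hypothetical:

from keystone.tests.unit.ksfixtures import database

# Default: keep the in-memory sqlite connection string.
database.initialize_sql_session()

# Explicit URL, e.g. the URL of the engine provisioned by oslo.db's
# DbTestCase (hypothetical sqlite file shown here).
database.initialize_sql_session('sqlite:////tmp/keystone_example.db')
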
View File

@@ -1,49 +0,0 @@
# Copyright 2013 Red Hat, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from keystone.tests import unit
from keystone.tests.unit import test_sql_upgrade
class PostgresqlMigrateTests(test_sql_upgrade.SqlUpgradeTests):
def setUp(self):
self.skip_if_env_not_set('ENABLE_LIVE_POSTGRES_TEST')
super(PostgresqlMigrateTests, self).setUp()
def config_files(self):
files = super(PostgresqlMigrateTests, self).config_files()
files.append(unit.dirs.tests_conf("backend_postgresql.conf"))
return files
class MysqlMigrateTests(test_sql_upgrade.SqlUpgradeTests):
def setUp(self):
self.skip_if_env_not_set('ENABLE_LIVE_MYSQL_TEST')
super(MysqlMigrateTests, self).setUp()
def config_files(self):
files = super(MysqlMigrateTests, self).config_files()
files.append(unit.dirs.tests_conf("backend_mysql.conf"))
return files
class Db2MigrateTests(test_sql_upgrade.SqlUpgradeTests):
def setUp(self):
self.skip_if_env_not_set('ENABLE_LIVE_DB2_TEST')
super(Db2MigrateTests, self).setUp()
def config_files(self):
files = super(Db2MigrateTests, self).config_files()
files.append(unit.dirs.tests_conf("backend_db2.conf"))
return files

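The classes removed above are the config-file-driven live-database variants.
When live testing returns, oslo.db's opportunistic test cases are the common
OpenStack mechanism; a hedged sketch of that approach (not part of this
commit, class names illustrative), which skips automatically when the backend
is unreachable instead of relying on an ENABLE_LIVE_* environment variable:

from oslo_db.sqlalchemy import test_base


class MySQLExampleTest(test_base.MySQLOpportunisticTestCase):
    # Runs only when the opportunistic MySQL database (typically the
    # openstack_citest credentials) is reachable; skipped otherwise.

    def test_engine_targets_mysql(self):
        self.assertEqual('mysql', self.engine.name)


class PostgreSQLExampleTest(test_base.PostgreSQLOpportunisticTestCase):
    def test_engine_targets_postgresql(self):
        self.assertEqual('postgresql', self.engine.name)
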
View File

@@ -38,10 +38,9 @@ import mock
from oslo_config import cfg
from oslo_db import exception as db_exception
from oslo_db.sqlalchemy import migration
from oslo_db.sqlalchemy import session as db_session
from oslo_db.sqlalchemy import test_base
from sqlalchemy.engine import reflection
import sqlalchemy.exc
from sqlalchemy import schema
from testtools import matchers
from keystone.common import sql
@@ -178,7 +177,7 @@ class MigrationHelpersGetInitVersionTests(unit.TestCase):
self.assertEqual(initial_version, version)
class SqlMigrateBase(unit.BaseTestCase):
class SqlMigrateBase(test_base.DbTestCase):
# override this in subclasses. The default of zero covers tests such
# as extensions upgrades.
_initial_db_version = 0
@@ -192,72 +191,27 @@ class SqlMigrateBase(unit.BaseTestCase):
def setUp(self):
super(SqlMigrateBase, self).setUp()
database.initialize_sql_session()
conn_str = CONF.database.connection
if (conn_str != unit.IN_MEM_DB_CONN_STRING and
conn_str.startswith('sqlite') and
conn_str[10:] == unit.DEFAULT_TEST_DB_FILE):
# Override the default with a DB that is specific to the migration
# tests only if the DB Connection string is the same as the global
# default. This is required so that no conflicts occur due to the
# global default DB already being under migrate control. This is
# only needed if the DB is not-in-memory
db_file = unit.dirs.tmp('keystone_migrate_test.db')
self.config_fixture.config(
group='database',
connection='sqlite:///%s' % db_file)
# create and share a single sqlalchemy engine for testing
with sql.session_for_write() as session:
self.engine = session.get_bind()
self.addCleanup(self.cleanup_instance('engine'))
self.Session = db_session.get_maker(self.engine, autocommit=False)
self.addCleanup(sqlalchemy.orm.session.Session.close_all)
# Set keystone's connection URL to be the test engine's url.
database.initialize_sql_session(self.engine.url)
# Override keystone's context manager to be oslo.db's global context
# manager.
sql.core._TESTING_USE_GLOBAL_CONTEXT_MANAGER = True
self.addCleanup(setattr,
sql.core, '_TESTING_USE_GLOBAL_CONTEXT_MANAGER', False)
self.addCleanup(sql.cleanup)
self.initialize_sql()
self.repo_path = migration_helpers.find_migrate_repo(
self.repo_package())
self.schema = versioning_api.ControlledSchema.create(
self.schema_ = versioning_api.ControlledSchema.create(
self.engine,
self.repo_path,
self._initial_db_version)
# auto-detect the highest available schema version in the migrate_repo
self.max_version = self.schema.repository.version().version
self.addCleanup(sql.cleanup)
# drop tables and FKs.
self.addCleanup(self._cleanupDB)
def _cleanupDB(self):
meta = sqlalchemy.MetaData()
meta.bind = self.engine
meta.reflect(self.engine)
with self.engine.begin() as conn:
inspector = reflection.Inspector.from_engine(self.engine)
metadata = schema.MetaData()
tbs = []
all_fks = []
for table_name in inspector.get_table_names():
fks = []
for fk in inspector.get_foreign_keys(table_name):
if not fk['name']:
continue
fks.append(
schema.ForeignKeyConstraint((), (), name=fk['name']))
table = schema.Table(table_name, metadata, *fks)
tbs.append(table)
all_fks.extend(fks)
for fkc in all_fks:
if self.engine.name != 'sqlite':
conn.execute(schema.DropConstraint(fkc))
for table in tbs:
conn.execute(schema.DropTable(table))
self.max_version = self.schema_.repository.version().version
def select_table(self, name):
table = sqlalchemy.Table(name,
@@ -294,7 +248,7 @@ class SqlMigrateBase(unit.BaseTestCase):
table2 = self.select_table(table2_name)
except sqlalchemy.exc.NoSuchTableError:
raise AssertionError('Table "%s" does not exist' % table2_name)
session = self.Session()
session = self.sessionmaker()
table1_count = session.execute(table1.count()).scalar()
table2_count = session.execute(table2.count()).scalar()
if table1_count != table2_count:
@@ -309,16 +263,16 @@ class SqlMigrateBase(unit.BaseTestCase):
current_schema=None):
repository = repository or self.repo_path
err = ''
version = versioning_api._migrate_version(self.schema,
version = versioning_api._migrate_version(self.schema_,
version,
not downgrade,
err)
if not current_schema:
current_schema = self.schema
current_schema = self.schema_
changeset = current_schema.changeset(version)
for ver, change in changeset:
self.schema.runchange(ver, change, changeset.step)
self.assertEqual(self.schema.version, version)
self.schema_.runchange(ver, change, changeset.step)
self.assertEqual(self.schema_.version, version)
def assertTableColumns(self, table_name, expected_cols):
"""Asserts that the table contains the expected set of columns."""
@@ -363,7 +317,6 @@ class SqlUpgradeTests(SqlMigrateBase):
this_table = table
insert = this_table.insert().values(**d)
session.execute(insert)
session.commit()
def test_kilo_squash(self):
self.upgrade(67)
@@ -414,7 +367,7 @@ class SqlUpgradeTests(SqlMigrateBase):
self.assertFalse(self.does_pk_exist(ASSIGNMENT_TABLE_NAME,
INHERITED_COLUMN_NAME))
session = self.Session()
session = self.sessionmaker()
role = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
@@ -448,7 +401,7 @@ class SqlUpgradeTests(SqlMigrateBase):
self.upgrade(73)
session = self.Session()
session = self.sessionmaker()
self.metadata.clear()
# Check that the 'inherited' column is now part of the PK
@@ -708,7 +661,7 @@ class SqlUpgradeTests(SqlMigrateBase):
NULL_DOMAIN_ID = '<<null>>'
self.upgrade(87)
session = self.Session()
session = self.sessionmaker()
role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
# Add a role before we upgrade, so we can check that its new domain_id
# attribute is handled correctly
@@ -719,7 +672,7 @@ class SqlUpgradeTests(SqlMigrateBase):
self.upgrade(88)
session = self.Session()
session = self.sessionmaker()
self.metadata.clear()
self.assertTableColumns('role', ['id', 'name', 'domain_id', 'extra'])
# Check the domain_id has been added to the uniqueness constraint
@@ -741,7 +694,7 @@ class SqlUpgradeTests(SqlMigrateBase):
def test_add_root_of_all_domains(self):
NULL_DOMAIN_ID = '<<keystone.domain.root>>'
self.upgrade(89)
session = self.Session()
session = self.sessionmaker()
domain_table = sqlalchemy.Table(
'domain', self.metadata, autoload=True)
@@ -867,7 +820,7 @@ class SqlUpgradeTests(SqlMigrateBase):
# pop extra attribute which doesn't recognized by SQL expression
# layer.
user_ref.pop('email')
session = self.Session()
session = self.sessionmaker()
self.insert_dict(session, USER_TABLE_NAME, user_ref)
self.metadata.clear()
self.upgrade(91)
@@ -889,7 +842,7 @@ class SqlUpgradeTests(SqlMigrateBase):
user1_ref.pop('email')
user2_ref = unit.new_user_ref(uuid.uuid4().hex)
user2_ref.pop('email')
session = self.Session()
session = self.sessionmaker()
self.insert_dict(session, USER_TABLE_NAME, user1_ref)
self.insert_dict(session, USER_TABLE_NAME, user2_ref)
user_id = user1_ref.pop('id')
@@ -1014,7 +967,7 @@ class SqlUpgradeTests(SqlMigrateBase):
NULL_DOMAIN_ID = '<<keystone.domain.root>>'
self.upgrade(92)
session = self.Session()
session = self.sessionmaker()
_populate_domain_and_project_tables(session)

View File

@@ -7,6 +7,7 @@ pep257==0.7.0 # MIT License
flake8-docstrings==0.2.1.post1 # MIT
bashate>=0.2 # Apache-2.0
os-testr>=0.4.1 # Apache-2.0
oslo.db[fixtures]>=4.1.0 # Apache-2.0
# computes code coverage percentages
coverage>=3.6 # Apache-2.0