Refactor db migration tests

Common oslo.db code contains base test classes for migration testing,
so we can use them in Sahara.

Change-Id: I1bf2831c0534498be3f9365d307607c46676658f
Victor Sergeyev 2014-12-29 18:12:01 +02:00
parent 1366594bd8
commit d8d132a421
3 changed files with 36 additions and 515 deletions
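
In short: oslo.db now ships both the "opportunistic" test cases (test_base.MySQLOpportunisticTestCase, test_base.PostgreSQLOpportunisticTestCase), which provision an engine against a locally running server or skip the test, and the models-vs-migrations checker (test_migrations.ModelsMigrationsSync), so Sahara's hand-rolled copies of that machinery can go. A minimal sketch of the pattern this change adopts, outside the commit itself (MigrationCheckers and TestOnMySQL are illustrative names, not Sahara code):

    # Sketch only: how oslo.db opportunistic test cases are composed.
    from oslo.db.sqlalchemy import test_base


    class MigrationCheckers(object):
        # Plain mixin holding the test_* methods; it assumes whatever it
        # is mixed into provides self.engine.
        def test_engine_provisioned(self):
            self.assertIsNotNone(self.engine)


    class TestOnMySQL(MigrationCheckers,
                      test_base.MySQLOpportunisticTestCase):
        # oslo.db creates self.engine against a local MySQL server, or
        # skips the test when the server/credentials are unavailable.
        pass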

sahara/tests/unit/db/migration/test_migrations.conf View File

@ -1,26 +0,0 @@
-[unit_tests]
-# Set up any number of databases to test concurrently.
-# The "name" used in the test is the config variable key.
-# A few tests rely on one sqlite database with 'sqlite' as the key.
-sqlite=sqlite://
-#sqlitefile=sqlite:///test_migrations_utils.db
-#mysql=mysql+mysqldb://user:pass@localhost/test_migrations_utils
-#postgresql=postgresql+psycopg2://user:pass@localhost/test_migrations_utils
-
-[migration_dbs]
-# Migration DB details are listed separately as they can't be connected to
-# concurrently. These databases can't be the same as above
-# Note, sqlite:// is in-memory and unique each time it is spawned.
-# However file sqlite's are not unique.
-#sqlite=sqlite://
-#sqlitefile=sqlite:///test_migrations.db
-#mysql=mysql+mysqldb://user:pass@localhost/test_migrations
-#postgresql=postgresql+psycopg2://user:pass@localhost/test_migrations
-
-[walk_style]
-snake_walk=yes
-downgrade=yes

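With this file deleted, nothing is read from disk any more: the opportunistic test cases connect with the fixed openstack_citest credentials described in the docstring below. Roughly the following URLs, though the exact assembly happens inside oslo.db (a sketch, not code from this commit):

    # Sketch only: the well-known opportunistic credentials oslo.db targets.
    MYSQL_URI = ("mysql+mysqldb://openstack_citest:openstack_citest"
                 "@localhost/openstack_citest")
    PGSQL_URI = ("postgresql+psycopg2://openstack_citest:openstack_citest"
                 "@localhost/openstack_citest")
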
sahara/tests/unit/db/migration/test_migrations.py View File

@ -14,15 +14,7 @@
 # under the License.
 """
-Tests for database migrations. This test case reads the configuration
-file test_migrations.conf for database connection settings
-to use in the tests. For each connection found in the config file,
-the test case runs a series of test cases to ensure that migrations work
-properly.
-
-There are also "opportunistic" tests for both mysql and postgresql in here,
-which allows testing against mysql and pg) in a properly configured unit
-test environment.
+Tests for database migrations.

 For the opportunistic testing you need to set up a db named 'openstack_citest'
 with user 'openstack_citest' and password 'openstack_citest' on localhost.
@ -39,27 +31,18 @@ postgres=# create database openstack_citest with owner openstack_citest;

 import os

-from oslo.config import cfg
+from oslo.db.sqlalchemy import test_base
 from oslo.db.sqlalchemy import utils as db_utils

 from sahara.tests.unit.db.migration import test_migrations_base as base
-import sqlalchemy
-
-CONF = cfg.CONF
-
-
-class TestMigrations(base.BaseWalkMigrationTestCase):
-    """Test sqlalchemy-migrate migrations."""
-    USER = "openstack_citest"
-    PASSWD = "openstack_citest"
-    DATABASE = "openstack_citest"
-
-    def __init__(self, *args, **kwargs):
-        super(TestMigrations, self).__init__(*args, **kwargs)
-
-    def setUp(self):
-        super(TestMigrations, self).setUp()
+
+
+class SaharaMigrationsCheckers(object):
+
+    TIMEOUT_SCALING_FACTOR = 2
+
+    snake_walk = True
+    downgrade = True

     def assertColumnExists(self, engine, table, column):
         t = db_utils.get_table(engine, table)
@ -94,6 +77,9 @@ class TestMigrations(base.BaseWalkMigrationTestCase):
         self.assertEqual(sorted(members), sorted(index_columns))

+    def test_walk_versions(self):
+        self.walk_versions(self.engine, self.snake_walk, self.downgrade)
+
     def _pre_upgrade_001(self, engine):
         # Anything returned from this method will be
         # passed to corresponding _check_xxx method as 'data'.
@ -451,25 +437,16 @@ class TestMigrations(base.BaseWalkMigrationTestCase):
         self.assertColumnsExists(engine, 'cluster_events', events_columns)

-class TestMigrationsMySQL(TestMigrations, base.MySQLTestsMixIn,
-                          base.TestModelsMigrationsSync):
-    def get_engine(self):
-        conn_string = ("mysql+mysqldb://%s:%s@localhost/%s"
-                       % (self.USER, self.PASSWD, self.DATABASE))
-        engine = sqlalchemy.create_engine(conn_string)
-        return engine
-
-    def have_database(self):
-        return base.have_mysql(self.USER, self.PASSWD, self.DATABASE)
+class TestMigrationsMySQL(SaharaMigrationsCheckers,
+                          base.BaseWalkMigrationTestCase,
+                          base.TestModelsMigrationsSync,
+                          test_base.MySQLOpportunisticTestCase,
+                          ):
+    pass

-class TestMigrationsPostgresql(TestMigrations, base.PostgresqlTestsMixIn,
-                               base.TestModelsMigrationsSync):
-    def get_engine(self):
-        conn_string = ("postgresql+psycopg2://%s:%s@localhost/%s"
-                       % (self.USER, self.PASSWD, self.DATABASE))
-        engine = sqlalchemy.create_engine(conn_string)
-        return engine
-
-    def have_database(self):
-        return base.have_postgresql(self.USER, self.PASSWD, self.DATABASE)
+class TestMigrationsPostgresql(SaharaMigrationsCheckers,
+                               base.BaseWalkMigrationTestCase,
+                               base.TestModelsMigrationsSync,
+                               test_base.PostgreSQLOpportunisticTestCase):
+    pass
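
The two concrete classes are now pure composition: the checker mixin supplies the test methods, the walk/sync bases supply the drivers, and the oslo.db opportunistic case supplies (or skips) the engine. A toy model of that MRO, with stand-in names only:

    # Toy illustration of the composition above; every name is a stand-in.
    class Checkers(object):                # like SaharaMigrationsCheckers
        snake_walk = True
        downgrade = True


    class WalkDriver(object):              # like base.BaseWalkMigrationTestCase
        def walk_versions(self, engine, snake_walk, downgrade):
            print("would walk migrations on", engine, snake_walk, downgrade)


    class Opportunistic(object):           # like test_base.*OpportunisticTestCase
        engine = "provisioned-engine"


    class TestOnBackend(Checkers, WalkDriver, Opportunistic):
        pass


    t = TestOnBackend()
    t.walk_versions(t.engine, t.snake_walk, t.downgrade)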

sahara/tests/unit/db/migration/test_migrations_base.py View File

@ -22,7 +22,6 @@
 # There is an ongoing work to extact similar code to oslo incubator. Once it is
 # extracted we'll be able to remove this file and use oslo.

-import ConfigParser
 import io
 import os
@ -33,12 +32,6 @@ from alembic import migration
 from alembic import script as alembic_script
 from oslo.config import cfg
 from oslo.db.sqlalchemy import test_migrations as t_m
-from oslo_concurrency import lockutils
-from oslo_concurrency import processutils
-import six.moves.urllib.parse as urlparse
-import sqlalchemy
-import sqlalchemy.exc
-import testtools

 import sahara.db.migration
 from sahara.db.sqlalchemy import api as sa
@ -49,371 +42,15 @@ from sahara.openstack.common import log as logging
 LOG = logging.getLogger(__name__)
 CONF = cfg.CONF

-synchronized = lockutils.synchronized_with_prefix('sahara-')
-
-
-def _get_connect_string(backend, user, passwd, database):
-    """Tries to get a connection with a very specific set of values.
-
-    If we get these then we'll run the tests, otherwise they are skipped
-    """
-    if backend == "postgres":
-        backend = "postgresql+psycopg2"
-    elif backend == "mysql":
-        backend = "mysql+mysqldb"
-    else:
-        raise Exception("Unrecognized backend: '%s'" % backend)
-
-    return ("%s://%s:%s@localhost/%s" % (backend, user, passwd, database))
-
-
-def _is_backend_avail(backend, user, passwd, database):
-    try:
-        connect_uri = _get_connect_string(backend, user, passwd, database)
-        engine = sqlalchemy.create_engine(connect_uri)
-        connection = engine.connect()
-    except Exception:
-        # intentionally catch all to handle exceptions even if we don't
-        # have any backend code loaded.
-        return False
-    else:
-        connection.close()
-        engine.dispose()
-        return True
-
-
-def have_mysql(user, passwd, database):
-    present = os.environ.get('SAHARA_MYSQL_PRESENT')
-    if present is None:
-        return _is_backend_avail('mysql', user, passwd, database)
-    return present.lower() in ('', 'true')
-
-
-def have_postgresql(user, passwd, database):
-    present = os.environ.get('SAHARA_TEST_POSTGRESQL_PRESENT')
-    if present is None:
-        return _is_backend_avail('postgres', user, passwd, database)
-    return present.lower() in ('', 'true')
-
-
-def get_mysql_connection_info(conn_pieces):
-    database = conn_pieces.path.strip('/')
-    loc_pieces = conn_pieces.netloc.split('@')
-    host = loc_pieces[1]
-    auth_pieces = loc_pieces[0].split(':')
-    user = auth_pieces[0]
-    password = ""
-    if len(auth_pieces) > 1:
-        if auth_pieces[1].strip():
-            password = "-p\"%s\"" % auth_pieces[1]
-
-    return (user, password, database, host)
-
-
-def get_pgsql_connection_info(conn_pieces):
-    database = conn_pieces.path.strip('/')
-    loc_pieces = conn_pieces.netloc.split('@')
-    host = loc_pieces[1]
-    auth_pieces = loc_pieces[0].split(':')
-    user = auth_pieces[0]
-    password = ""
-    if len(auth_pieces) > 1:
-        password = auth_pieces[1].strip()
-
-    return (user, password, database, host)
-
-
-class CommonTestsMixIn(object):
-    """TestSaharaMigrations and TestBaremetalMigrations use these tests.
-
-    BaseMigrationTestCase is effectively an abstract class, meant to be derived
-    from and not directly tested against; that's why these `test_` methods need
-    to be on a Mixin, so that they won't be picked up as valid tests for
-    BaseMigrationTestCase.
-    """
-    def test_walk_versions(self):
-        for key, engine in self.engines.items():
-            # We start each walk with a completely blank slate.
-            self._reset_database(key)
-            self._walk_versions(engine, self.snake_walk, self.downgrade)
-
-
-class MySQLTestsMixIn(CommonTestsMixIn):
-    def test_mysql_opportunistically(self):
-        self._test_mysql_opportunistically()
-
-    def test_mysql_connect_fail(self):
-        """Test that we can trigger a mysql connection failure.
-
-        We fail gracefully here to ensure we don't break people without mysql
-        """
-        if _is_backend_avail('mysql', "openstack_cifail", self.PASSWD,
-                             self.DATABASE):
-            self.fail("Shouldn't have connected")
-
-
-class PostgresqlTestsMixIn(CommonTestsMixIn):
-    def test_postgresql_opportunistically(self):
-        self._test_postgresql_opportunistically()
-
-    def test_postgresql_connect_fail(self):
-        """Test that we can trigger a postgres connection failure.
-
-        We fail gracefully to ensure we don't break people without postgres
-        """
-        if _is_backend_avail('postgres', "openstack_cifail", self.PASSWD,
-                             self.DATABASE):
-            self.fail("Shouldn't have connected")
-
-
-class BaseMigrationTestCase(testtools.TestCase):
-    """Base class for testing migrations and migration utils.
-
-    This sets up and configures the databases to run tests against.
-    """
-    # NOTE(jhesketh): It is expected that tests clean up after themselves.
-    # This is necessary for concurrency to allow multiple tests to work on
-    # one database.
-    # The full migration walk tests however do call the old _reset_databases()
-    # to throw away whatever was there so they need to operate on their own
-    # database that we know isn't accessed concurrently.
-    # Hence, BaseWalkMigrationTestCase overwrites the engine list.
-
-    USER = None
-    PASSWD = None
-    DATABASE = None
-
-    TIMEOUT_SCALING_FACTOR = 2
-
-    def __init__(self, *args, **kwargs):
-        super(BaseMigrationTestCase, self).__init__(*args, **kwargs)
-
-        self.DEFAULT_CONFIG_FILE = os.path.join(
-            os.path.dirname(__file__),
-            'test_migrations.conf')
-        # Test machines can set the SAHARA_TEST_MIGRATIONS_CONF variable
-        # to override the location of the config file for migration testing
-        self.CONFIG_FILE_PATH = os.environ.get(
-            'SAHARA_TEST_MIGRATIONS_CONF',
-            self.DEFAULT_CONFIG_FILE)
-
-        self.ALEMBIC_CONFIG = alembic_config.Config(
-            os.path.join(os.path.dirname(sahara.db.migration.__file__),
-                         'alembic.ini')
-        )
-        self.ALEMBIC_CONFIG.sahara_config = CONF
-
-        self.snake_walk = False
-        self.downgrade = False
-        self.test_databases = {}
-        self.migration = None
-        self.migration_api = None
-
-    def setUp(self):
-        super(BaseMigrationTestCase, self).setUp()
-        self._load_config()
-
-    def _load_config(self):
-        # Load test databases from the config file. Only do this
-        # once. No need to re-run this on each test...
-        LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH)
-        if os.path.exists(self.CONFIG_FILE_PATH):
-            cp = ConfigParser.RawConfigParser()
-            try:
-                cp.read(self.CONFIG_FILE_PATH)
-                config = cp.options('unit_tests')
-                for key in config:
-                    self.test_databases[key] = cp.get('unit_tests', key)
-                self.snake_walk = cp.getboolean('walk_style', 'snake_walk')
-                self.downgrade = cp.getboolean('walk_style', 'downgrade')
-            except ConfigParser.ParsingError as e:
-                self.fail("Failed to read test_migrations.conf config "
-                          "file. Got error: %s" % e)
-        else:
-            self.fail("Failed to find test_migrations.conf config "
-                      "file.")
-
-        self.engines = {}
-        for key, value in self.test_databases.items():
-            self.engines[key] = sqlalchemy.create_engine(value)
-
-        # NOTE(jhesketh): We only need to make sure the databases are created
-        # not necessarily clean of tables.
-        self._create_databases()
-
-    def execute_cmd(self, cmd=None):
-        out, err = processutils.trycmd(cmd, shell=True, discard_warnings=True)
-        output = out or err
-        LOG.debug(output)
-        self.assertEqual('', err,
-                         "Failed to run: %s\n%s" % (cmd, output))
-
-    @synchronized('pgadmin', external=True, lock_path='/tmp')
-    def _reset_pg(self, conn_pieces):
-        (user, password, database, host) = (
-            get_pgsql_connection_info(conn_pieces))
-        os.environ['PGPASSWORD'] = password
-        os.environ['PGUSER'] = user
-        # note(boris-42): We must create and drop database, we can't
-        # drop database which we have connected to, so for such
-        # operations there is a special database template1.
-        sqlcmd = ('psql -w -U %(user)s -h %(host)s -c '
-                  '"%(sql)s" -d template1')
-        sqldict = {'user': user, 'host': host}
-
-        # note(apavlov): We need to kill all connections before dropping
-        # database. Otherwise it does not happen.
-        sqldict['sql'] = "select version();"
-        getversion = sqlcmd % sqldict
-        out, err = processutils.trycmd(getversion, shell=True)
-        version = out.split()
-
-        sqldict['sql'] = ("SELECT pg_terminate_backend(%s) FROM "
-                          "pg_stat_activity WHERE datname = "
-                          "'%s';")
-        if float(version[version.index('PostgreSQL')+1][:3]) < 9.2:
-            sqldict['sql'] = sqldict['sql'] % ('procpid', database)
-        else:
-            sqldict['sql'] = sqldict['sql'] % ('pid', database)
-        killconnections = sqlcmd % sqldict
-        self.execute_cmd(killconnections)
-
-        sqldict['sql'] = "drop database if exists %s;" % database
-        droptable = sqlcmd % sqldict
-        self.execute_cmd(droptable)
-
-        sqldict['sql'] = ("create database %s;") % database
-        createtable = sqlcmd % sqldict
-        self.execute_cmd(createtable)
-
-        os.unsetenv('PGPASSWORD')
-        os.unsetenv('PGUSER')
-
-    @synchronized('mysql', external=True, lock_path='/tmp')
-    def _reset_mysql(self, conn_pieces):
-        # We can execute the MySQL client to destroy and re-create
-        # the MYSQL database, which is easier and less error-prone
-        # than using SQLAlchemy to do this via MetaData...trust me.
-        (user, password, database, host) = (
-            get_mysql_connection_info(conn_pieces))
-        sql = ("drop database if exists %(database)s; "
-               "create database %(database)s;" % {'database': database})
-        cmd = ("mysql -u \"%(user)s\" %(password)s -h %(host)s -e \"%(sql)s\""
-               % {'user': user, 'password': password,
-                  'host': host, 'sql': sql})
-        self.execute_cmd(cmd)
-
-    @synchronized('sqlite', external=True, lock_path='/tmp')
-    def _reset_sqlite(self, conn_pieces):
-        # We can just delete the SQLite database, which is
-        # the easiest and cleanest solution
-        db_path = conn_pieces.path.strip('/')
-        if os.path.exists(db_path):
-            os.unlink(db_path)
-        # No need to recreate the SQLite DB. SQLite will
-        # create it for us if it's not there...
-
-    def _create_databases(self):
-        """Create all configured databases as needed."""
-        for key, engine in self.engines.items():
-            self._create_database(key)
-
-    def _create_database(self, key):
-        """Create database if it doesn't exist."""
-        conn_string = self.test_databases[key]
-        conn_pieces = urlparse.urlparse(conn_string)
-
-        if conn_string.startswith('mysql'):
-            (user, password, database, host) = (
-                get_mysql_connection_info(conn_pieces))
-            sql = "create database if not exists %s;" % database
-            cmd = ("mysql -u \"%(user)s\" %(password)s -h %(host)s "
-                   "-e \"%(sql)s\"" % {'user': user, 'password': password,
-                                       'host': host, 'sql': sql})
-            self.execute_cmd(cmd)
-        elif conn_string.startswith('postgresql'):
-            (user, password, database, host) = (
-                get_pgsql_connection_info(conn_pieces))
-            os.environ['PGPASSWORD'] = password
-            os.environ['PGUSER'] = user
-
-            sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
-                      " '%(sql)s' -d template1")
-
-            sql = ("create database if not exists %s;") % database
-            createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
-            # 0 means databases is created
-            # 256 means it already exists (which is fine)
-            # otherwise raise an error
-            out, err = processutils.trycmd(createtable, shell=True,
-                                           check_exit_code=[0, 256],
-                                           discard_warnings=True)
-            output = out or err
-            if err != '':
-                self.fail("Failed to run: %s\n%s" % (createtable, output))
-
-            os.unsetenv('PGPASSWORD')
-            os.unsetenv('PGUSER')
-
-    def _reset_databases(self):
-        """Reset all configured databases."""
-        for key, engine in self.engines.items():
-            self._reset_database(key)
-
-    def _reset_database(self, key):
-        """Reset specific database."""
-        engine = self.engines[key]
-        conn_string = self.test_databases[key]
-        conn_pieces = urlparse.urlparse(conn_string)
-        engine.dispose()
-        if conn_string.startswith('sqlite'):
-            self._reset_sqlite(conn_pieces)
-        elif conn_string.startswith('mysql'):
-            self._reset_mysql(conn_pieces)
-        elif conn_string.startswith('postgresql'):
-            self._reset_pg(conn_pieces)
-
-
-class BaseWalkMigrationTestCase(BaseMigrationTestCase):
-    """Loads in an alternative set of databases for testing against.
-
-    This is necessary as the default databases can run tests
-    concurrently without interfering with itself. It is expected that
-    databases listed under [migraiton_dbs] in the configuration are only being
-    accessed by one test at a time. Currently only test_walk_versions accesses
-    the databases (and is the only method that calls _reset_database() which
-    is clearly problematic for concurrency).
-    """
-    def _load_config(self):
-        # Load test databases from the config file. Only do this
-        # once. No need to re-run this on each test...
-        LOG.debug('config_path is %s' % self.CONFIG_FILE_PATH)
-        if os.path.exists(self.CONFIG_FILE_PATH):
-            cp = ConfigParser.RawConfigParser()
-            try:
-                cp.read(self.CONFIG_FILE_PATH)
-                config = cp.options('migration_dbs')
-                for key in config:
-                    self.test_databases[key] = cp.get('migration_dbs', key)
-                self.snake_walk = cp.getboolean('walk_style', 'snake_walk')
-                self.downgrade = cp.getboolean('walk_style', 'downgrade')
-            except ConfigParser.ParsingError as e:
-                self.fail("Failed to read test_migrations.conf config "
-                          "file. Got error: %s" % e)
-        else:
-            self.fail("Failed to find test_migrations.conf config "
-                      "file.")
-
-        self.engines = {}
-        for key, value in self.test_databases.items():
-            self.engines[key] = sqlalchemy.create_engine(value)
-
-        self._create_databases()
+
+class BaseWalkMigrationTestCase(object):
+
+    ALEMBIC_CONFIG = alembic_config.Config(
+        os.path.join(os.path.dirname(sahara.db.migration.__file__),
+                     'alembic.ini')
+    )
+    ALEMBIC_CONFIG.sahara_config = CONF

     def _configure(self, engine):
         """For each type of repository we should do some of configure steps.
@ -426,57 +63,6 @@ class BaseWalkMigrationTestCase(BaseMigrationTestCase):
         CONF.set_override('connection', str(engine.url), group='database')
         sa.cleanup()

-    def _test_mysql_opportunistically(self):
-        # Test that table creation on mysql only builds InnoDB tables
-        if not have_mysql(self.USER, self.PASSWD, self.DATABASE):
-            self.skipTest("mysql not available")
-        # add this to the global lists to make reset work with it, it's removed
-        # automatically in tearDown so no need to clean it up here.
-        connect_string = _get_connect_string(
-            "mysql", self.USER, self.PASSWD, self.DATABASE)
-        (user, password, database, host) = (
-            get_mysql_connection_info(urlparse.urlparse(connect_string)))
-        engine = sqlalchemy.create_engine(connect_string)
-        self.engines[database] = engine
-        self.test_databases[database] = connect_string
-
-        # build a fully populated mysql database with all the tables
-        self._reset_database(database)
-        self._walk_versions(engine, self.snake_walk, self.downgrade)
-
-        connection = engine.connect()
-        # sanity check
-        total = connection.execute("SELECT count(*) "
-                                   "from information_schema.TABLES "
-                                   "where TABLE_SCHEMA='%(database)s'" %
-                                   {'database': database})
-        self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")
-        connection.close()
-
-        del(self.engines[database])
-        del(self.test_databases[database])
-
-    def _test_postgresql_opportunistically(self):
-        # Test postgresql database migration walk
-        if not have_postgresql(self.USER, self.PASSWD, self.DATABASE):
-            self.skipTest("postgresql not available")
-        # add this to the global lists to make reset work with it, it's removed
-        # automatically in tearDown so no need to clean it up here.
-        connect_string = _get_connect_string(
-            "postgres", self.USER, self.PASSWD, self.DATABASE)
-        engine = sqlalchemy.create_engine(connect_string)
-        (user, password, database, host) = (
-            get_mysql_connection_info(urlparse.urlparse(connect_string)))
-        self.engines[database] = engine
-        self.test_databases[database] = connect_string
-
-        # build a fully populated postgresql database with all the tables
-        self._reset_database(database)
-        self._walk_versions(engine, self.snake_walk, self.downgrade)
-
-        del(self.engines[database])
-        del(self.test_databases[database])
-
     def _alembic_command(self, alembic_command, engine, *args, **kwargs):
         """Most of alembic command return data into output.
@ -508,8 +94,7 @@ class BaseWalkMigrationTestCase(BaseMigrationTestCase):
         versions.reverse()
         return versions

-    def _walk_versions(self, engine=None, snake_walk=False,
-                       downgrade=True):
+    def walk_versions(self, engine=None, snake_walk=False, downgrade=True):
         # Determine latest version script from the repo, then
         # upgrade from 1 through to the latest, with no data
         # in the databases. This just checks that the schema itself
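
For readers new to the flags: snake_walk re-verifies each revision by stepping down and back up right after every upgrade, and downgrade walks the whole chain back down at the end. Roughly this order (a sketch of the usual OpenStack walk semantics; the real method also fires the _pre_upgrade_XXX/_check_XXX hooks around each step):

    # Sketch of the walk order; the version ids are made up.
    def walk(versions, snake_walk=True, downgrade=True):
        for i, version in enumerate(versions):
            print("upgrade to", version)
            if snake_walk and i > 0:
                print("  downgrade to", versions[i - 1])
                print("  upgrade to", version)
        if downgrade:
            for version in reversed(versions[:-1]):
                print("downgrade to", version)

    walk(["001", "002", "003"])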
@ -617,33 +202,18 @@ class TestModelsMigrationsSync(t_m.ModelsMigrationsSync):
     scripts is equal to the one produced from models definitions.
     """

-    def __init__(self):
-        self.ALEMBIC_CONFIG = alembic_config.Config(
-            os.path.join(os.path.dirname(sahara.db.migration.__file__),
-                         'alembic.ini')
-        )
-        self.ALEMBIC_CONFIG.sahara_config = CONF
-
-    def get_engine(self):
-        return self.engine
+    ALEMBIC_CONFIG = alembic_config.Config(
+        os.path.join(os.path.dirname(sahara.db.migration.__file__),
+                     'alembic.ini')
+    )
+    ALEMBIC_CONFIG.sahara_config = CONF

     def db_sync(self, engine):
         CONF.set_override('connection', str(engine.url), group='database')
         alembic.command.upgrade(self.ALEMBIC_CONFIG, 'head')

     def get_metadata(self):
-        metadata = model_base.SaharaBase.metadata
-        return metadata
-
-    def have_database(self):
-        '''return True if the database is available
-
-        This function should be overridden by subclasses to allow skipping of
-        tests based on the presence of the database server. Since this
-        implementation is a base class, there is no associated database and
-        this function will always return False.
-        '''
-        return False
-
-    def test_models_sync(self):
-        '''check for database and run test if available.'''
-        if not self.have_database():
-            self.skipTest('database not available')
-        super(TestModelsMigrationsSync, self).test_models_sync()
+        return model_base.SaharaBase.metadata
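
oslo.db's ModelsMigrationsSync drives three hooks from its own test_models_sync: db_sync() must bring a database to the latest migration, get_engine() supplies the connection, and get_metadata() returns the models' metadata to diff against the migrated schema. A self-contained sketch of that contract (ExampleSync and the sqlite engine are illustrative, not Sahara code):

    # Sketch of the ModelsMigrationsSync contract; ExampleSync is hypothetical.
    import sqlalchemy

    from oslo.db.sqlalchemy import test_migrations as t_m


    class ExampleSync(t_m.ModelsMigrationsSync):
        def db_sync(self, engine):
            # Would apply all migrations up to 'head' against `engine`.
            pass

        def get_engine(self):
            return sqlalchemy.create_engine("sqlite://")

        def get_metadata(self):
            # Normally the declarative Base.metadata of the project's models.
            return sqlalchemy.MetaData()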