Add migration tests with data

This commit adds a test class based on the glance test_migrations code
and using some of the code from the WIP oslo.db alembic migration test
mixin.[1] This framework will run the migrations against a real db and
using _pre_upgrade*, _post_upgrade*, and _check* methods data can be
injected and validated during migrations.


Change-Id: Idb8f203e9e0c721cc3459af2aa75d3c0b20b48fa
This commit is contained in:
Matthew Treinish 2014-09-29 20:37:22 -04:00
parent 1394000b39
commit 51a06bb7e3
5 changed files with 394 additions and 5 deletions

View File

@ -5,7 +5,6 @@ Short Term
* Add a migration that adds an ordering to the runs table
* Add more unit tests
* Migration tests
* DB API unit tests
* Write subunit module
* Flesh out query side of DB API to make it useful for building additional

View File

@ -36,7 +36,7 @@ def upgrade():
def downgrade():
op.drop_constraint('uq_test_runs', 'test_runs')
op.drop_index('ix_test_id', 'tests')
op.drop_index('ix_test_run_test_id', 'test_runs')
op.drop_index('ix_test_run_run_id', 'test_runs')

View File

@ -0,0 +1,17 @@
# Set up any number of databases to test concurrently.
# The "name" used in the test is the config variable key.
# Migration DB details are listed separately as they can't be connected to
# concurrently. These databases can't be the same as above

View File

@ -0,0 +1,371 @@
# Copyright 2010-2011 OpenStack Foundation
# Copyright 2013 IBM Corp.
# Copyright (c) 2014 Hewlett-Packard Development Company, L.P.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ConfigParser
import os
import subprocess
from alembic import config
from alembic import script
from oslo.config import cfg
from oslo.db import options
import six
from six.moves.urllib import parse
import sqlalchemy
from subunit2sql import exceptions as exc
from subunit2sql.migrations import cli
from subunit2sql.tests import base
CONF.register_cli_opts(options.database_opts, group='database')
def _get_connect_string(backend,
"""Generate a db uri for testing locally.
Try to get a connection with a very specific set of values, if we get
these then we'll run the tests, otherwise they are skipped
if backend == "mysql":
backend = "mysql+mysqldb"
elif backend == "postgres":
backend = "postgresql+psycopg2"
return ("%(backend)s://%(user)s:%(passwd)s@localhost/%(database)s"
% {'backend': backend, 'user': user, 'passwd': passwd,
'database': database})
def _is_backend_avail(backend,
if backend == "mysql":
connect_uri = _get_connect_string("mysql", user=user,
passwd=passwd, database=database)
elif backend == "postgres":
connect_uri = _get_connect_string("postgres", user=user,
passwd=passwd, database=database)
engine = sqlalchemy.create_engine(connect_uri)
connection = engine.connect()
except Exception:
# intentionally catch all to handle exceptions even if we don't
# have any backend code loaded.
return False
return True
def get_table(engine, name):
"""Returns an sqlalchemy table dynamically from db.
Needed because the models don't work for us in migrations
as models will be far out of sync with the current data.
metadata = sqlalchemy.schema.MetaData()
metadata.bind = engine
return sqlalchemy.Table(name, metadata, autoload=True)
class TestWalkMigrations(base.TestCase):
DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
script_location = os.path.join(os.path.dirname(os.path.dirname(
os.path.dirname(__file__))), 'migrations')
def setUp(self):
super(TestWalkMigrations, self).setUp()
self.snake_walk = False
self.test_databases = {}
if os.path.exists(self.CONFIG_FILE_PATH):
cp = ConfigParser.RawConfigParser()
defaults = cp.options('unit_tests')
for key in defaults:
self.test_databases[key] = cp.get('unit_tests', key)
self.snake_walk = cp.getboolean('walk_style', 'snake_walk')
except ConfigParser.ParsingError as e:"Failed to read test_migrations.conf config "
"file. Got error: %s" % e)
else:"Failed to find test_migrations.conf config "
self.engines = {}
for key, value in self.test_databases.items():
self.engines[key] = sqlalchemy.create_engine(value)
# We start each test case with a completely blank slate.
def assertColumnExists(self, engine, table, column):
table = get_table(engine, table)
self.assertIn(column, table.c)
def _reset_databases(self):
def execute_cmd(cmd=None):
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, shell=True)
output = proc.communicate()[0]
self.assertEqual(0, proc.returncode, 'Command failed with '
'output:\n%s' % output)
for key, engine in self.engines.items():
conn_string = self.test_databases[key]
conn_pieces = parse.urlparse(conn_string)
if conn_string.startswith('sqlite'):
# We can just delete the SQLite database, which is
# the easiest and cleanest solution
db_path = conn_pieces.path[1:]
if os.path.exists(db_path):
# No need to recreate the SQLite DB. SQLite will
# create it for us if it's not there...
elif conn_string.startswith('mysql'):
# We can execute the MySQL client to destroy and re-create
# the MYSQL database, which is easier and less error-prone
# than using SQLAlchemy to do this via me.
database = conn_pieces.path.strip('/')
loc_pieces = conn_pieces.netloc.split('@')
host = loc_pieces[1]
auth_pieces = loc_pieces[0].split(':')
user = auth_pieces[0]
password = ""
if len(auth_pieces) > 1:
if auth_pieces[1].strip():
password = "-p\"%s\"" % auth_pieces[1]
sql = ("drop database if exists %(database)s; create "
"database %(database)s;") % {'database': database}
cmd = ("mysql -u \"%(user)s\" %(password)s -h %(host)s "
"-e \"%(sql)s\"") % {'user': user, 'password': password,
'host': host, 'sql': sql}
elif conn_string.startswith('postgresql'):
database = conn_pieces.path.strip('/')
loc_pieces = conn_pieces.netloc.split('@')
host = loc_pieces[1]
auth_pieces = loc_pieces[0].split(':')
user = auth_pieces[0]
password = ""
if len(auth_pieces) > 1:
password = auth_pieces[1].strip()
# note(boris-42): This file is used for authentication
# without password prompt.
createpgpass = ("echo '*:*:*:%(user)s:%(password)s' > "
"~/.pgpass && chmod 0600 ~/.pgpass" %
{'user': user, 'password': password})
# note(boris-42): We must create and drop database, we can't
# drop database which we have connected to, so for such
# operations there is a special database template1.
sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
" '%(sql)s' -d template1")
sql = ("drop database if exists %(database)s;")
sql = sql % {'database': database}
droptable = sqlcmd % {'user': user, 'host': host,
'sql': sql}
sql = ("create database %(database)s;")
sql = sql % {'database': database}
createtable = sqlcmd % {'user': user, 'host': host,
'sql': sql}
def _get_alembic_config(self, uri):
db_config = config.Config(os.path.join(self.script_location,
db_config.set_main_option('script_location', 'subunit2sql:migrations')
db_config.subunit2sql_config = CONF
self.script_dir = script.ScriptDirectory.from_config(db_config)
return db_config
def _revisions(self, downgrade=False):
"""Provides revisions and its parent revisions.
:param downgrade: whether to include downgrade behavior or not.
:type downgrade: Bool
:return: List of tuples. Every tuple contains revision and its parent
revisions = list(self.script_dir.walk_revisions("base", "head"))
if not downgrade:
revisions = list(reversed(revisions))
if not revisions:
raise exc.DbMigrationError('There is no suitable migrations.')
for rev in revisions:
if downgrade:
# Destination, current
yield rev.down_revision, rev.revision
# Destination, current
yield rev.revision, rev.down_revision
def _walk_versions(self, config, engine, downgrade=True, snake_walk=False):
"""Test migrations ability to upgrade and downgrade.
:param downgrade: whether to include downgrade behavior or not.
:type downgrade: Bool
:snake_walk: enable mode when at every upgrade revision will be
downgraded and upgraded in previous state at upgrade and backward at
:type snake_walk: Bool
revisions = self._revisions()
for dest, curr in revisions:
self._migrate_up(config, engine, dest, curr, with_data=True)
if snake_walk and dest != 'None':
# NOTE(I159): Pass reversed arguments into `_migrate_down`
# method because we have been upgraded to a destination
# revision and now we going to downgrade back.
self._migrate_down(config, curr, dest, with_data=True)
self._migrate_up(config, dest, curr, with_data=True)
if downgrade:
revisions = self._revisions(downgrade)
for dest, curr in revisions:
self._migrate_down(config, engine, dest, curr, with_data=True)
if snake_walk:
self._migrate_up(config, engine, curr, dest,
self._migrate_down(config, engine, dest, curr,
def _migrate_down(self, config, engine, dest, curr, with_data=False):
if dest:
cli.do_alembic_command(config, 'downgrade', dest)
meta = sqlalchemy.MetaData(bind=engine)
if with_data:
post_downgrade = getattr(
self, "_post_downgrade_%s" % curr, None)
if post_downgrade:
def _migrate_up(self, config, engine, dest, curr, with_data=False):
if with_data:
data = None
pre_upgrade = getattr(
self, "_pre_upgrade_%s" % dest, None)
if pre_upgrade:
data = pre_upgrade(engine)
cli.do_alembic_command(config, 'upgrade', dest)
if with_data:
check = getattr(self, "_check_%s" % dest, None)
if check and data:
check(engine, data)
def test_walk_versions(self):
"""Test walk versions.
Walks all version scripts for each tested database, ensuring
that there are no errors in the version scripts for each engine
for key, engine in self.engines.items():
config = self._get_alembic_config(self.test_databases[key])
self._walk_versions(config, engine, self.snake_walk)
def test_mysql_connect_fail(self):
"""Test graceful mysql connection failure.
Test that we can trigger a mysql connection failure and we fail
gracefully to ensure we don't break people without mysql
if _is_backend_avail('mysql', user="openstack_cifail"):"Shouldn't have connected")
def test_mysql_opportunistically(self):
# Test that table creation on mysql only builds InnoDB tables
if not _is_backend_avail('mysql'):
self.skipTest("mysql not available")
# add this to the global lists to make reset work with it, it's removed
# automatically in tearDown so no need to clean it up here.
connect_string = _get_connect_string("mysql")
engine = sqlalchemy.create_engine(connect_string)
config = self._get_alembic_config(connect_string)
self.engines["mysqlcitest"] = engine
self.test_databases["mysqlcitest"] = connect_string
# build a fully populated mysql database with all the tables
self._walk_versions(config, engine, False, False)
connection = engine.connect()
# sanity check
total = connection.execute("SELECT count(*) "
"from information_schema.TABLES "
"where TABLE_SCHEMA='openstack_citest'")
self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")
noninnodb = connection.execute("SELECT count(*) "
"from information_schema.TABLES "
"where TABLE_SCHEMA='openstack_citest' "
"and ENGINE!='InnoDB' "
"and TABLE_NAME!='alembic_version'")
count = noninnodb.scalar()
self.assertEqual(count, 0, "%d non InnoDB tables created" % count)
def test_postgresql_connect_fail(self):
"""Test graceful postgresql connection failure.
Test that we can trigger a postgres connection failure and we fail
gracefully to ensure we don't break people without postgres
if _is_backend_avail('postgresql', user="openstack_cifail"):"Shouldn't have connected")
def test_postgresql_opportunistically(self):
# Test postgresql database migration walk
if not _is_backend_avail('postgres'):
self.skipTest("postgresql not available")
# add this to the global lists to make reset work with it, it's removed
# automatically in tearDown so no need to clean it up here.
connect_string = _get_connect_string("postgres")
engine = sqlalchemy.create_engine(connect_string)
config = self._get_alembic_config(connect_string)
self.engines["postgresqlcitest"] = engine
self.test_databases["postgresqlcitest"] = connect_string
# build a fully populated postgresql database with all the tables
self._walk_versions(config, engine, False, False)

View File

@ -7,3 +7,5 @@ sphinx>=1.1.2,<1.2