Browse Source

Use opportunistic approach for migration testing

Refactored migration tests due to use OpportunisticTestCase, removed
unused code and ``test_migrations.conf`` file.

This change allows tests use database ``openstack_citest`` only
for connection to the database backend. The main feature of this
approach is - for each migration test will be created new database
with random name. This will avoid migration tests of race conditions
and reduce tests intersection.

``test_migrations.conf`` file was removed, because we create test
database for migration test, so we no longer need test database credentials.

Closes-Bug: #1327397
Closes-Bug: #1328997

Change-Id: I95ad140ba5f483cd3dc36e2b78f140826d57624f
changes/53/107053/3
Victor Sergeyev 8 years ago
parent
commit
ffd83dc3af
  1. 6
      ironic/tests/db/sqlalchemy/test_migrations.conf
  2. 256
      ironic/tests/db/sqlalchemy/test_migrations.py
  3. 1
      test-requirements.txt

6
ironic/tests/db/sqlalchemy/test_migrations.conf

@ -1,6 +0,0 @@
[DEFAULT]
# Set up any number of migration data stores you want, one
# The "name" used in the test is the config variable key.
# sqlite migrations not supported by alembic
#mysql=mysql://root:@localhost/test_migrations
#postgresql=postgresql://user:pass@localhost/test_migrations

256
ironic/tests/db/sqlalchemy/test_migrations.py

@ -38,22 +38,17 @@ postgres=# create database openstack_citest with owner openstack_citest;
"""
import ConfigParser
import contextlib
import fixtures
import os
import subprocess
from alembic import script
import mock
from oslo.db.sqlalchemy import test_base
from oslo.db.sqlalchemy import utils as db_utils
import six.moves.urllib.parse as urlparse
import sqlalchemy
import sqlalchemy.exc
from ironic.common import utils
from ironic.db.sqlalchemy import migration
from ironic.openstack.common import lockutils
from ironic.openstack.common import log as logging
from ironic.tests import base
@ -93,34 +88,6 @@ def _is_backend_avail(backend, user, passwd, database):
return True
def _have_mysql(user, passwd, database):
present = os.environ.get('TEST_MYSQL_PRESENT')
if present is None:
return _is_backend_avail('mysql', user, passwd, database)
return present.lower() in ('', 'true')
def _have_postgresql(user, passwd, database):
present = os.environ.get('TEST_POSTGRESQL_PRESENT')
if present is None:
return _is_backend_avail('postgres', user, passwd, database)
return present.lower() in ('', 'true')
def get_db_connection_info(conn_pieces):
database = conn_pieces.path.strip('/')
loc_pieces = conn_pieces.netloc.split('@')
host = loc_pieces[1]
auth_pieces = loc_pieces[0].split(':')
user = auth_pieces[0]
password = ""
if len(auth_pieces) > 1:
password = auth_pieces[1].strip()
return (user, password, database, host)
@contextlib.contextmanager
def patch_with_engine(engine):
with mock.patch(('ironic.db'
@ -129,117 +96,6 @@ def patch_with_engine(engine):
yield
class BaseMigrationTestCase(base.TestCase):
"""Base class fort testing of migration utils."""
def __init__(self, *args, **kwargs):
super(BaseMigrationTestCase, self).__init__(*args, **kwargs)
self.DEFAULT_CONFIG_FILE = os.path.join(os.path.dirname(__file__),
'test_migrations.conf')
# Test machines can set the TEST_MIGRATIONS_CONF variable
# to override the location of the config file for migration testing
self.CONFIG_FILE_PATH = os.environ.get('TEST_MIGRATIONS_CONF',
self.DEFAULT_CONFIG_FILE)
self.test_databases = {}
self.migration_api = None
def setUp(self):
super(BaseMigrationTestCase, self).setUp()
# Load test databases from the config file. Only do this
# once. No need to re-run this on each test...
if os.path.exists(self.CONFIG_FILE_PATH):
cp = ConfigParser.RawConfigParser()
try:
cp.read(self.CONFIG_FILE_PATH)
defaults = cp.defaults()
for key, value in defaults.items():
self.test_databases[key] = value
except ConfigParser.ParsingError as e:
self.fail("Failed to read test_migrations.conf config "
"file. Got error: %s" % e)
else:
self.fail("Failed to find test_migrations.conf config "
"file.")
self.engines = {}
for key, value in self.test_databases.items():
self.engines[key] = sqlalchemy.create_engine(value)
# We start each test case with a completely blank slate.
self.temp_dir = self.useFixture(fixtures.TempDir())
self._reset_databases()
# We also want to clean up, eg. in case of a failing test
self.addCleanup(self._reset_databases)
def execute_cmd(self, cmd=None):
proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
stderr=subprocess.STDOUT, shell=True)
output = proc.communicate()[0]
LOG.debug(output)
self.assertEqual(0, proc.returncode,
"Failed to run: %s\n%s" % (cmd, output))
def _reset_pg(self, conn_pieces):
@lockutils.synchronized('pgadmin', 'tests-', external=True,
lock_path=self.temp_dir.path)
def _reset_pg_inner():
(user, password, database, host) = \
get_db_connection_info(conn_pieces)
os.environ['PGPASSWORD'] = password
os.environ['PGUSER'] = user
# note(boris-42): We must create and drop database, we can't
# drop database which we have connected to, so for such
# operations there is a special database template1.
sqlcmd = ("psql -w -U %(user)s -h %(host)s -c"
" '%(sql)s' -d template1")
sql = ("drop database if exists %s;") % database
droptable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
self.execute_cmd(droptable)
sql = ("create database %s;") % database
createtable = sqlcmd % {'user': user, 'host': host, 'sql': sql}
self.execute_cmd(createtable)
os.unsetenv('PGPASSWORD')
os.unsetenv('PGUSER')
_reset_pg_inner()
def _reset_databases(self):
for key, engine in self.engines.items():
conn_string = self.test_databases[key]
conn_pieces = urlparse.urlparse(conn_string)
engine.dispose()
if conn_string.startswith('sqlite'):
# We can just delete the SQLite database, which is
# the easiest and cleanest solution
db_path = conn_pieces.path.strip('/')
if os.path.exists(db_path):
os.unlink(db_path)
# No need to recreate the SQLite DB. SQLite will
# create it for us if it's not there...
elif conn_string.startswith('mysql'):
# We can execute the MySQL client to destroy and re-create
# the MYSQL database, which is easier and less error-prone
# than using SQLAlchemy to do this via MetaData...trust me.
(user, password, database, host) = \
get_db_connection_info(conn_pieces)
sql = ("drop database if exists %(database)s; "
"create database %(database)s;") % \
{'database': database}
cmd = ("mysql -u \"%(user)s\" -p\"%(password)s\" -h %(host)s "
"-e \"%(sql)s\"") % {'user': user, 'password': password,
'host': host, 'sql': sql}
self.execute_cmd(cmd)
elif conn_string.startswith('postgresql'):
self._reset_pg(conn_pieces)
class WalkVersionsMixin(object):
def _walk_versions(self, engine=None, alembic_cfg=None, downgrade=True):
# Determine latest version script from the repo, then
@ -398,104 +254,24 @@ class TestWalkVersions(base.TestCase, WalkVersionsMixin):
self.assertEqual(upgraded, self._migrate_up.call_args_list)
class TestMigrations(BaseMigrationTestCase, WalkVersionsMixin):
USER = "openstack_citest"
PASSWD = "openstack_citest"
DATABASE = "openstack_citest"
def __init__(self, *args, **kwargs):
super(TestMigrations, self).__init__(*args, **kwargs)
class MigrationCheckersMixin(object):
def setUp(self):
super(TestMigrations, self).setUp()
super(MigrationCheckersMixin, self).setUp()
self.config = migration._alembic_config()
if self.migration_api is None:
self.migration_api = migration
def _test_mysql_opportunistically(self):
# Test that table creation on mysql only builds InnoDB tables
if not _have_mysql(self.USER, self.PASSWD, self.DATABASE):
self.skipTest("mysql not available")
# add this to the global lists to make reset work with it, it's removed
# automatically during Cleanup so no need to clean it up here.
connect_string = _get_connect_string("mysql", self.USER, self.PASSWD,
self.DATABASE)
(user, password, database, host) = \
get_db_connection_info(urlparse.urlparse(connect_string))
engine = sqlalchemy.create_engine(connect_string)
self.engines[database] = engine
self.test_databases[database] = connect_string
# build a fully populated mysql database with all the tables
self._reset_databases()
self._walk_versions(engine, self.config, downgrade=False)
connection = engine.connect()
# sanity check
total = connection.execute("SELECT count(*) "
"from information_schema.TABLES "
"where TABLE_SCHEMA='%s'" % database)
self.assertTrue(total.scalar() > 0, "No tables found. Wrong schema?")
noninnodb = connection.execute("SELECT count(*) "
"from information_schema.TABLES "
"where TABLE_SCHEMA='%s' "
"and ENGINE!='InnoDB' "
"and TABLE_NAME!='alembic_version'" %
database)
count = noninnodb.scalar()
self.assertEqual(0, count, "%d non InnoDB tables created" % count)
connection.close()
def _test_postgresql_opportunistically(self):
# Test postgresql database migration walk
if not _have_postgresql(self.USER, self.PASSWD, self.DATABASE):
self.skipTest("postgresql not available")
# add this to the global lists to make reset work with it, it's removed
# automatically during Cleanup so no need to clean it up here.
connect_string = _get_connect_string("postgres", self.USER,
self.PASSWD, self.DATABASE)
engine = sqlalchemy.create_engine(connect_string)
(user, password, database, host) = \
get_db_connection_info(urlparse.urlparse(connect_string))
self.engines[database] = engine
self.test_databases[database] = connect_string
# build a fully populated postgresql database with all the tables
self._reset_databases()
self._walk_versions(engine, self.config, downgrade=False)
self.migration_api = migration
def test_walk_versions(self):
if not len(self.engines.values()):
self.skipTest("No engines initialized")
for engine in self.engines.values():
self._walk_versions(engine, self.config, downgrade=False)
def test_mysql_opportunistically(self):
self._test_mysql_opportunistically()
def test_mysql_connect_fail(self):
"""Test that we can trigger a mysql connection failure
Test that we can fail gracefully to ensure we don't break people
without mysql
"""
if _is_backend_avail('mysql', "openstack_cifail", self.PASSWD,
self.DATABASE):
self.fail("Shouldn't have connected")
def test_postgresql_opportunistically(self):
self._test_postgresql_opportunistically()
self._walk_versions(self.engine, self.config, downgrade=False)
def test_postgresql_connect_fail(self):
"""Test that we can trigger a postgres connection failure
def test_connect_fail(self):
"""Test that we can trigger a database connection failure
Test that we can fail gracefully to ensure we don't break people
without postgres
without specific database backend
"""
if _is_backend_avail('postgres', "openstack_cifail", self.PASSWD,
self.DATABASE):
if _is_backend_avail(self.FIXTURE.DRIVER, "openstack_cifail",
self.FIXTURE.USERNAME, self.FIXTURE.DBNAME):
self.fail("Shouldn't have connected")
def _check_21b331f883ef(self, engine, data):
@ -533,3 +309,15 @@ class TestMigrations(BaseMigrationTestCase, WalkVersionsMixin):
data['uuid'] = utils.generate_uuid()
self.assertRaises(sqlalchemy.exc.IntegrityError,
nodes.insert().execute, data)
class TestMigrationsMySQL(MigrationCheckersMixin,
WalkVersionsMixin,
test_base.MySQLOpportunisticTestCase):
pass
class TestMigrationsPostgreSQL(MigrationCheckersMixin,
WalkVersionsMixin,
test_base.PostgreSQLOpportunisticTestCase):
pass

1
test-requirements.txt

@ -5,6 +5,7 @@ fixtures>=0.3.14
mock>=1.0
Babel>=1.3
MySQL-python
oslotest
psycopg2
python-ironicclient
python-subunit>=0.0.18

Loading…
Cancel
Save