Merge "refactored data upgrade tests in test_migrations"

This commit is contained in:
Jenkins
2013-01-29 15:33:27 +00:00
committed by Gerrit Code Review
2 changed files with 135 additions and 241 deletions

View File

@@ -37,9 +37,9 @@ def upgrade(migrate_engine):
if rec['binary'] != 'nova-compute':
continue
# if zone doesn't exist create
result = aggregate_metadata.select().where(aggregate_metadata.c.key ==
'availability_zone' and
aggregate_metadata.c.key == rec['availability_zone']).execute()
result = aggregate_metadata.select().where(
aggregate_metadata.c.key == 'availability_zone').where(
aggregate_metadata.c.value == rec['availability_zone']).execute()
result = [r for r in result]
if len(result) > 0:
agg_id = result[0].aggregate_id

View File

@@ -87,6 +87,16 @@ def _have_mysql():
return present.lower() in ('', 'true')
def get_table(engine, name):
"""Returns an sqlalchemy table dynamically from db.
Needed because the models don't work for us in migrations
as models will be far out of sync with the current data."""
metadata = sqlalchemy.schema.MetaData()
metadata.bind = engine
return sqlalchemy.Table(name, metadata, autoload=True)
class TestMigrations(test.TestCase):
"""Test sqlalchemy-migrate migrations."""
@@ -227,19 +237,11 @@ class TestMigrations(test.TestCase):
self.engines["mysqlcitest"] = engine
self.test_databases["mysqlcitest"] = connect_string
# Test that we end in an innodb
self._check_mysql_innodb(engine)
# Test IP transition
self._check_mysql_migration_149(engine)
def _check_mysql_innodb(self, engine):
# build a fully populated mysql database with all the tables
self._reset_databases()
self._walk_versions(engine, False, False)
uri = _get_connect_string("mysql", database="information_schema")
connection = sqlalchemy.create_engine(uri).connect()
connection = engine.connect()
# sanity check
total = connection.execute("SELECT count(*) "
"from information_schema.TABLES "
@@ -253,92 +255,8 @@ class TestMigrations(test.TestCase):
"and TABLE_NAME!='migrate_version'")
count = noninnodb.scalar()
self.assertEqual(count, 0, "%d non InnoDB tables created" % count)
def test_migration_149_postgres(self):
"""Test updating a table with IPAddress columns."""
if not _is_backend_avail('postgres'):
self.skipTest("postgres not available")
connect_string = _get_connect_string("postgres")
engine = sqlalchemy.create_engine(connect_string)
self.engines["postgrescitest"] = engine
self.test_databases["postgrescitest"] = connect_string
self._reset_databases()
migration_api.version_control(engine, TestMigrations.REPOSITORY,
migration.INIT_VERSION)
connection = engine.connect()
self._migrate_up(engine, 148)
IPS = ("127.0.0.1", "255.255.255.255", "2001:db8::1:2", "::1")
connection.execute("INSERT INTO provider_fw_rules "
" (protocol, from_port, to_port, cidr)"
"VALUES ('tcp', 1234, 1234, '%s'), "
" ('tcp', 1234, 1234, '%s'), "
" ('tcp', 1234, 1234, '%s'), "
" ('tcp', 1234, 1234, '%s')" % IPS)
self.assertEqual('character varying',
connection.execute(
"SELECT data_type FROM INFORMATION_SCHEMA.COLUMNS "
"WHERE table_name='provider_fw_rules' "
"AND table_catalog='openstack_citest' "
"AND column_name='cidr'").scalar())
self._migrate_up(engine, 149)
self.assertEqual(IPS,
tuple(tup[0] for tup in connection.execute(
"SELECT cidr from provider_fw_rules").fetchall()))
self.assertEqual('inet',
connection.execute(
"SELECT data_type FROM INFORMATION_SCHEMA.COLUMNS "
"WHERE table_name='provider_fw_rules' "
"AND table_catalog='openstack_citest' "
"AND column_name='cidr'").scalar())
connection.close()
def _check_mysql_migration_149(self, engine):
"""Test updating a table with IPAddress columns."""
self._reset_databases()
migration_api.version_control(engine, TestMigrations.REPOSITORY,
migration.INIT_VERSION)
uri = _get_connect_string("mysql", database="openstack_citest")
connection = sqlalchemy.create_engine(uri).connect()
self._migrate_up(engine, 148)
IPS = ("127.0.0.1", "255.255.255.255", "2001:db8::1:2", "::1")
connection.execute("INSERT INTO provider_fw_rules "
" (protocol, from_port, to_port, cidr)"
"VALUES ('tcp', 1234, 1234, '%s'), "
" ('tcp', 1234, 1234, '%s'), "
" ('tcp', 1234, 1234, '%s'), "
" ('tcp', 1234, 1234, '%s')" % IPS)
self.assertEqual('varchar(255)',
connection.execute(
"SELECT column_type FROM INFORMATION_SCHEMA.COLUMNS "
"WHERE table_name='provider_fw_rules' "
"AND table_schema='openstack_citest' "
"AND column_name='cidr'").scalar())
connection.close()
self._migrate_up(engine, 149)
connection = sqlalchemy.create_engine(uri).connect()
self.assertEqual(IPS,
tuple(tup[0] for tup in connection.execute(
"SELECT cidr from provider_fw_rules").fetchall()))
self.assertEqual('varchar(39)',
connection.execute(
"SELECT column_type FROM INFORMATION_SCHEMA.COLUMNS "
"WHERE table_name='provider_fw_rules' "
"AND table_schema='openstack_citest' "
"AND column_name='cidr'").scalar())
def _walk_versions(self, engine=None, snake_walk=False, downgrade=True):
# Determine latest version script from the repo, then
# upgrade from 1 through to the latest, with no data
@@ -360,7 +278,7 @@ class TestMigrations(test.TestCase):
for version in xrange(migration.INIT_VERSION + 2,
TestMigrations.REPOSITORY.latest + 1):
# upgrade -> downgrade -> upgrade
self._migrate_up(engine, version)
self._migrate_up(engine, version, with_data=True)
if snake_walk:
self._migrate_down(engine, version)
self._migrate_up(engine, version)
@@ -385,7 +303,19 @@ class TestMigrations(test.TestCase):
migration_api.db_version(engine,
TestMigrations.REPOSITORY))
def _migrate_up(self, engine, version):
def _migrate_up(self, engine, version, with_data=False):
"""migrate up to a new version of the db.
We allow for data insertion and post checks at every
migration version with special _prerun_### and
_check_### functions in the main test.
"""
if with_data:
data = None
prerun = getattr(self, "_prerun_%d" % version, None)
if prerun:
data = prerun(engine)
migration_api.upgrade(engine,
TestMigrations.REPOSITORY,
version)
@@ -393,168 +323,132 @@ class TestMigrations(test.TestCase):
migration_api.db_version(engine,
TestMigrations.REPOSITORY))
def test_migration_146(self):
name = 'name'
az = 'custom_az'
if with_data:
check = getattr(self, "_check_%d" % version, None)
if check:
check(engine, data)
def _145_check():
agg = aggregates.select(aggregates.c.id == 1).execute().first()
self.assertEqual(name, agg.name)
self.assertEqual(az, agg.availability_zone)
# migration 146, availability zone transition
def _prerun_146(self, engine):
data = {
'id': 1,
'availability_zone': 'custom_az',
'aggregate_name': 1,
'name': 'name',
}
for key, engine in self.engines.items():
migration_api.version_control(engine, TestMigrations.REPOSITORY,
migration.INIT_VERSION)
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 145)
metadata = sqlalchemy.schema.MetaData()
metadata.bind = engine
aggregates = sqlalchemy.Table('aggregates', metadata,
autoload=True)
aggregates = get_table(engine, 'aggregates')
aggregates.insert().values(data).execute()
return data
aggregates.insert().values(id=1, availability_zone=az,
aggregate_name=1, name=name).execute()
def _check_146(self, engine, data):
aggregate_md = get_table(engine, 'aggregate_metadata')
md = aggregate_md.select(
aggregate_md.c.aggregate_id == 1).execute().first()
self.assertEqual(data['availability_zone'], md['value'])
_145_check()
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 146)
aggregate_metadata = sqlalchemy.Table('aggregate_metadata',
metadata, autoload=True)
metadata = aggregate_metadata.select(aggregate_metadata.c.
aggregate_id == 1).execute().first()
self.assertEqual(az, metadata['value'])
migration_api.downgrade(engine, TestMigrations.REPOSITORY, 145)
_145_check()
def test_migration_147(self):
# migration 147, availability zone transition for services
def _prerun_147(self, engine):
az = 'test_zone'
host1 = 'compute-host1'
host2 = 'compute-host2'
# start at id == 2 because we already inserted one
data = [
{'id': 1, 'host': host1,
'binary': 'nova-compute', 'topic': 'compute',
'report_count': 0, 'availability_zone': az},
{'id': 2, 'host': 'sched-host',
'binary': 'nova-scheduler', 'topic': 'scheduler',
'report_count': 0, 'availability_zone': 'ignore_me'},
{'id': 3, 'host': host2,
'binary': 'nova-compute', 'topic': 'compute',
'report_count': 0, 'availability_zone': az},
]
def _146_check():
service = services.select(services.c.id == 1).execute().first()
self.assertEqual(az, service.availability_zone)
self.assertEqual(host1, service.host)
service = services.select(services.c.id == 2).execute().first()
self.assertNotEqual(az, service.availability_zone)
service = services.select(services.c.id == 3).execute().first()
self.assertEqual(az, service.availability_zone)
self.assertEqual(host2, service.host)
services = get_table(engine, 'services')
engine.execute(services.insert(), data)
return data
for key, engine in self.engines.items():
migration_api.version_control(engine, TestMigrations.REPOSITORY,
migration.INIT_VERSION)
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 146)
metadata = sqlalchemy.schema.MetaData()
metadata.bind = engine
def _check_147(self, engine, data):
aggregate_md = get_table(engine, 'aggregate_metadata')
aggregate_hosts = get_table(engine, 'aggregate_hosts')
# NOTE(sdague): hard coded to id == 2, because we added to
# aggregate_metadata previously
for item in data:
md = aggregate_md.select(
aggregate_md.c.aggregate_id == 2).execute().first()
if item['binary'] == "nova-compute":
self.assertEqual(item['availability_zone'], md['value'])
#populate service table
services = sqlalchemy.Table('services', metadata,
autoload=True)
services.insert().values(id=1, host=host1,
binary='nova-compute', topic='compute', report_count=0,
availability_zone=az).execute()
services.insert().values(id=2, host='sched-host',
binary='nova-scheduler', topic='scheduler', report_count=0,
availability_zone='ignore_me').execute()
services.insert().values(id=3, host=host2,
binary='nova-compute', topic='compute', report_count=0,
availability_zone=az).execute()
host = aggregate_hosts.select(
aggregate_hosts.c.aggregate_id == 2
).execute().first()
self.assertEqual(host['host'], data[0]['host'])
_146_check()
# NOTE(sdague): id 3 is just non-existent
host = aggregate_hosts.select(
aggregate_hosts.c.aggregate_id == 3
).execute().first()
self.assertEqual(host, None)
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 147)
# migration 149, changes IPAddr storage format
def _prerun_149(self, engine):
provider_fw_rules = get_table(engine, 'provider_fw_rules')
data = [
{'protocol': 'tcp', 'from_port': 1234,
'to_port': 1234, 'cidr': "127.0.0.1"},
{'protocol': 'tcp', 'from_port': 1234,
'to_port': 1234, 'cidr': "255.255.255.255"},
{'protocol': 'tcp', 'from_port': 1234,
'to_port': 1234, 'cidr': "2001:db8::1:2"},
{'protocol': 'tcp', 'from_port': 1234,
'to_port': 1234, 'cidr': "::1"}
]
engine.execute(provider_fw_rules.insert(), data)
return data
# check aggregate metadata
aggregate_metadata = sqlalchemy.Table('aggregate_metadata',
metadata, autoload=True)
aggregate_hosts = sqlalchemy.Table('aggregate_hosts',
metadata, autoload=True)
metadata = aggregate_metadata.select(aggregate_metadata.c.
aggregate_id == 1).execute().first()
self.assertEqual(az, metadata['value'])
self.assertEqual(aggregate_hosts.select(
aggregate_hosts.c.aggregate_id == 1).execute().
first().host, host1)
blank = [h for h in aggregate_hosts.select(
aggregate_hosts.c.aggregate_id == 2).execute()]
self.assertEqual(blank, [])
def _check_149(self, engine, data):
provider_fw_rules = get_table(engine, 'provider_fw_rules')
result = provider_fw_rules.select().execute()
migration_api.downgrade(engine, TestMigrations.REPOSITORY, 146)
iplist = map(lambda x: x['cidr'], data)
_146_check()
for row in result:
self.assertIn(row['cidr'], iplist)
def test_migration_152(self):
# migration 152 - convert deleted from boolean to int
def _prerun_152(self, engine):
host1 = 'compute-host1'
host2 = 'compute-host2'
# NOTE(sdague): start at #4 because services data already in table
# from 147
services_data = [
{'id': 4, 'host': host1, 'binary': 'nova-compute',
'report_count': 0, 'topic': 'compute', 'deleted': False},
{'id': 5, 'host': host1, 'binary': 'nova-compute',
'report_count': 0, 'topic': 'compute', 'deleted': True}
]
volumes_data = [
{'id': 'first', 'host': host1, 'deleted': False},
{'id': 'second', 'host': host2, 'deleted': True}
]
def _151_check(services, volumes):
service = services.select(services.c.id == 1).execute().first()
self.assertEqual(False, service.deleted)
service = services.select(services.c.id == 2).execute().first()
self.assertEqual(True, service.deleted)
services = get_table(engine, 'services')
engine.execute(services.insert(), services_data)
volume = volumes.select(volumes.c.id == "first").execute().first()
self.assertEqual(False, volume.deleted)
volume = volumes.select(volumes.c.id == "second").execute().first()
self.assertEqual(True, volume.deleted)
volumes = get_table(engine, 'volumes')
engine.execute(volumes.insert(), volumes_data)
return dict(services=services_data, volumes=volumes_data)
for key, engine in self.engines.items():
migration_api.version_control(engine, TestMigrations.REPOSITORY,
migration.INIT_VERSION)
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 151)
metadata = sqlalchemy.schema.MetaData()
metadata.bind = engine
def _check_152(self, engine, data):
services = get_table(engine, 'services')
service = services.select(services.c.id == 4).execute().first()
self.assertEqual(0, service.deleted)
service = services.select(services.c.id == 5).execute().first()
self.assertEqual(service.id, service.deleted)
# NOTE(boris-42): It is enough to test one table with type of `id`
# column Integer and one with type String.
services = sqlalchemy.Table('services', metadata, autoload=True)
volumes = sqlalchemy.Table('volumes', metadata, autoload=True)
engine.execute(
services.insert(),
[
{'id': 1, 'host': host1, 'binary': 'nova-compute',
'report_count': 0, 'topic': 'compute', 'deleted': False},
{'id': 2, 'host': host1, 'binary': 'nova-compute',
'report_count': 0, 'topic': 'compute', 'deleted': True}
]
)
engine.execute(
volumes.insert(),
[
{'id': 'first', 'host': host1, 'deleted': False},
{'id': 'second', 'host': host2, 'deleted': True}
]
)
_151_check(services, volumes)
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 152)
# NOTE(boris-42): One more time get from DB info about tables.
metadata2 = sqlalchemy.schema.MetaData()
metadata2.bind = engine
services = sqlalchemy.Table('services', metadata2, autoload=True)
service = services.select(services.c.id == 1).execute().first()
self.assertEqual(0, service.deleted)
service = services.select(services.c.id == 2).execute().first()
self.assertEqual(service.id, service.deleted)
volumes = sqlalchemy.Table('volumes', metadata2, autoload=True)
volume = volumes.select(volumes.c.id == "first").execute().first()
self.assertEqual("", volume.deleted)
volume = volumes.select(volumes.c.id == "second").execute().first()
self.assertEqual(volume.id, volume.deleted)
migration_api.downgrade(engine, TestMigrations.REPOSITORY, 151)
# NOTE(boris-42): One more time get from DB info about tables.
metadata = sqlalchemy.schema.MetaData()
metadata.bind = engine
services = sqlalchemy.Table('services', metadata, autoload=True)
volumes = sqlalchemy.Table('volumes', metadata, autoload=True)
_151_check(services, volumes)
volumes = get_table(engine, 'volumes')
volume = volumes.select(volumes.c.id == "first").execute().first()
self.assertEqual("", volume.deleted)
volume = volumes.select(volumes.c.id == "second").execute().first()
self.assertEqual(volume.id, volume.deleted)