Merge "Add tests for consistency groups DB migration"
This commit is contained in:
commit
b35c97cd25
@ -1126,3 +1126,156 @@ class TestMigrations(test.TestCase):
|
||||
self.assertNotIn('replication_status', volumes.c)
|
||||
self.assertNotIn('replication_extended_status', volumes.c)
|
||||
self.assertNotIn('replication_driver_data', volumes.c)
|
||||
|
||||
def test_migration_025(self):
|
||||
"""Test adding table and columns for consistencygroups."""
|
||||
for (key, engine) in self.engines.items():
|
||||
migration_api.version_control(engine,
|
||||
TestMigrations.REPOSITORY,
|
||||
migration.db_initial_version())
|
||||
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 24)
|
||||
metadata = sqlalchemy.schema.MetaData()
|
||||
metadata.bind = engine
|
||||
|
||||
# Upgrade
|
||||
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 25)
|
||||
|
||||
# Test consistencygroup_id is in Table volumes
|
||||
volumes = sqlalchemy.Table('volumes',
|
||||
metadata,
|
||||
autoload=True)
|
||||
self.assertIsInstance(volumes.c.consistencygroup_id.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
|
||||
# Test cgsnapshot_id is in Table snapshots
|
||||
snapshots = sqlalchemy.Table('snapshots',
|
||||
metadata,
|
||||
autoload=True)
|
||||
self.assertIsInstance(snapshots.c.cgsnapshot_id.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
|
||||
# Test Table consistencygroups exists
|
||||
self.assertTrue(engine.dialect.has_table(engine.connect(),
|
||||
"consistencygroups"))
|
||||
consistencygroups = sqlalchemy.Table('consistencygroups',
|
||||
metadata,
|
||||
autoload=True)
|
||||
|
||||
self.assertIsInstance(consistencygroups.c.created_at.type,
|
||||
self.time_type[engine.name])
|
||||
self.assertIsInstance(consistencygroups.c.updated_at.type,
|
||||
self.time_type[engine.name])
|
||||
self.assertIsInstance(consistencygroups.c.deleted_at.type,
|
||||
self.time_type[engine.name])
|
||||
self.assertIsInstance(consistencygroups.c.deleted.type,
|
||||
self.bool_type[engine.name])
|
||||
self.assertIsInstance(consistencygroups.c.id.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(consistencygroups.c.user_id.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(consistencygroups.c.project_id.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(consistencygroups.c.host.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(consistencygroups.c.availability_zone.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(consistencygroups.c.name.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(consistencygroups.c.description.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(consistencygroups.c.volume_type_id.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(consistencygroups.c.status.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
|
||||
# Test Table cgsnapshots exists
|
||||
self.assertTrue(engine.dialect.has_table(engine.connect(),
|
||||
"cgsnapshots"))
|
||||
cgsnapshots = sqlalchemy.Table('cgsnapshots',
|
||||
metadata,
|
||||
autoload=True)
|
||||
|
||||
self.assertIsInstance(cgsnapshots.c.created_at.type,
|
||||
self.time_type[engine.name])
|
||||
self.assertIsInstance(cgsnapshots.c.updated_at.type,
|
||||
self.time_type[engine.name])
|
||||
self.assertIsInstance(cgsnapshots.c.deleted_at.type,
|
||||
self.time_type[engine.name])
|
||||
self.assertIsInstance(cgsnapshots.c.deleted.type,
|
||||
self.bool_type[engine.name])
|
||||
self.assertIsInstance(cgsnapshots.c.id.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(cgsnapshots.c.user_id.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(cgsnapshots.c.project_id.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(cgsnapshots.c.consistencygroup_id.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(cgsnapshots.c.name.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(cgsnapshots.c.description.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
self.assertIsInstance(cgsnapshots.c.status.type,
|
||||
sqlalchemy.types.VARCHAR)
|
||||
|
||||
# Downgrade
|
||||
migration_api.downgrade(engine, TestMigrations.REPOSITORY, 24)
|
||||
metadata = sqlalchemy.schema.MetaData()
|
||||
metadata.bind = engine
|
||||
|
||||
# Test consistencygroup_id is not in Table volumes
|
||||
volumes = sqlalchemy.Table('volumes',
|
||||
metadata,
|
||||
autoload=True)
|
||||
self.assertNotIn('consistencygroup_id', volumes.c)
|
||||
|
||||
# Test cgsnapshot_id is not in Table snapshots
|
||||
snapshots = sqlalchemy.Table('snapshots',
|
||||
metadata,
|
||||
autoload=True)
|
||||
self.assertNotIn('cgsnapshot_id', snapshots.c)
|
||||
|
||||
# Test Table cgsnapshots doesn't exist any more
|
||||
self.assertFalse(engine.dialect.has_table(engine.connect(),
|
||||
"cgsnapshots"))
|
||||
|
||||
# Test Table consistencygroups doesn't exist any more
|
||||
self.assertFalse(engine.dialect.has_table(engine.connect(),
|
||||
"consistencygroups"))
|
||||
|
||||
def test_migration_026(self):
|
||||
"""Test adding default data for consistencygroups quota class."""
|
||||
for (key, engine) in self.engines.items():
|
||||
migration_api.version_control(engine,
|
||||
TestMigrations.REPOSITORY,
|
||||
migration.db_initial_version())
|
||||
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 25)
|
||||
metadata = sqlalchemy.schema.MetaData()
|
||||
metadata.bind = engine
|
||||
|
||||
quota_class_metadata = sqlalchemy.Table('quota_classes',
|
||||
metadata,
|
||||
autoload=True)
|
||||
|
||||
num_defaults = quota_class_metadata.count().\
|
||||
where(quota_class_metadata.c.class_name == 'default').\
|
||||
execute().scalar()
|
||||
|
||||
self.assertEqual(3, num_defaults)
|
||||
|
||||
migration_api.upgrade(engine, TestMigrations.REPOSITORY, 26)
|
||||
|
||||
num_defaults = quota_class_metadata.count().\
|
||||
where(quota_class_metadata.c.class_name == 'default').\
|
||||
execute().scalar()
|
||||
|
||||
self.assertEqual(4, num_defaults)
|
||||
|
||||
migration_api.downgrade(engine, TestMigrations.REPOSITORY, 25)
|
||||
|
||||
# Defaults should not be deleted during downgrade
|
||||
num_defaults = quota_class_metadata.count().\
|
||||
where(quota_class_metadata.c.class_name == 'default').\
|
||||
execute().scalar()
|
||||
|
||||
self.assertEqual(4, num_defaults)
|
||||
|
Loading…
Reference in New Issue
Block a user