Merge "Constant defined for sqlAlchemy VARCHAR & INTEGER"

This commit is contained in:
Jenkins 2016-01-06 21:30:40 +00:00 committed by Gerrit Code Review
commit a7b057aeff

View File

@ -38,6 +38,8 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
BOOL_TYPE = sqlalchemy.types.BOOLEAN BOOL_TYPE = sqlalchemy.types.BOOLEAN
TIME_TYPE = sqlalchemy.types.DATETIME TIME_TYPE = sqlalchemy.types.DATETIME
INTEGER_TYPE = sqlalchemy.types.INTEGER
VARCHAR_TYPE = sqlalchemy.types.VARCHAR
@property @property
def INIT_VERSION(self): def INIT_VERSION(self):
@ -152,12 +154,12 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
"""Test that adding source_volid column works correctly.""" """Test that adding source_volid column works correctly."""
volumes = db_utils.get_table(engine, 'volumes') volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.source_volid.type, self.assertIsInstance(volumes.c.source_volid.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_006(self, engine, data): def _check_006(self, engine, data):
snapshots = db_utils.get_table(engine, 'snapshots') snapshots = db_utils.get_table(engine, 'snapshots')
self.assertIsInstance(snapshots.c.provider_location.type, self.assertIsInstance(snapshots.c.provider_location.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_007(self, engine, data): def _check_007(self, engine, data):
snapshots = db_utils.get_table(engine, 'snapshots') snapshots = db_utils.get_table(engine, 'snapshots')
@ -185,35 +187,35 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.assertIsInstance(backups.c.deleted.type, self.assertIsInstance(backups.c.deleted.type,
self.BOOL_TYPE) self.BOOL_TYPE)
self.assertIsInstance(backups.c.id.type, self.assertIsInstance(backups.c.id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.volume_id.type, self.assertIsInstance(backups.c.volume_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.user_id.type, self.assertIsInstance(backups.c.user_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.project_id.type, self.assertIsInstance(backups.c.project_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.host.type, self.assertIsInstance(backups.c.host.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.availability_zone.type, self.assertIsInstance(backups.c.availability_zone.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.display_name.type, self.assertIsInstance(backups.c.display_name.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.display_description.type, self.assertIsInstance(backups.c.display_description.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.container.type, self.assertIsInstance(backups.c.container.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.status.type, self.assertIsInstance(backups.c.status.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.fail_reason.type, self.assertIsInstance(backups.c.fail_reason.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.service_metadata.type, self.assertIsInstance(backups.c.service_metadata.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.service.type, self.assertIsInstance(backups.c.service.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.size.type, self.assertIsInstance(backups.c.size.type,
sqlalchemy.types.INTEGER) self.INTEGER_TYPE)
self.assertIsInstance(backups.c.object_count.type, self.assertIsInstance(backups.c.object_count.type,
sqlalchemy.types.INTEGER) self.INTEGER_TYPE)
def _check_009(self, engine, data): def _check_009(self, engine, data):
"""Test adding snapshot_metadata table works correctly.""" """Test adding snapshot_metadata table works correctly."""
@ -232,13 +234,13 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.assertIsInstance(snapshot_metadata.c.deleted.type, self.assertIsInstance(snapshot_metadata.c.deleted.type,
self.BOOL_TYPE) self.BOOL_TYPE)
self.assertIsInstance(snapshot_metadata.c.id.type, self.assertIsInstance(snapshot_metadata.c.id.type,
sqlalchemy.types.INTEGER) self.INTEGER_TYPE)
self.assertIsInstance(snapshot_metadata.c.snapshot_id.type, self.assertIsInstance(snapshot_metadata.c.snapshot_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(snapshot_metadata.c.key.type, self.assertIsInstance(snapshot_metadata.c.key.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(snapshot_metadata.c.value.type, self.assertIsInstance(snapshot_metadata.c.value.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_010(self, engine, data): def _check_010(self, engine, data):
"""Test adding transfers table works correctly.""" """Test adding transfers table works correctly."""
@ -255,15 +257,15 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.assertIsInstance(transfers.c.deleted.type, self.assertIsInstance(transfers.c.deleted.type,
self.BOOL_TYPE) self.BOOL_TYPE)
self.assertIsInstance(transfers.c.id.type, self.assertIsInstance(transfers.c.id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(transfers.c.volume_id.type, self.assertIsInstance(transfers.c.volume_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(transfers.c.display_name.type, self.assertIsInstance(transfers.c.display_name.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(transfers.c.salt.type, self.assertIsInstance(transfers.c.salt.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(transfers.c.crypt_hash.type, self.assertIsInstance(transfers.c.crypt_hash.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(transfers.c.expires_at.type, self.assertIsInstance(transfers.c.expires_at.type,
self.TIME_TYPE) self.TIME_TYPE)
@ -278,19 +280,19 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
"""Test that adding attached_host column works correctly.""" """Test that adding attached_host column works correctly."""
volumes = db_utils.get_table(engine, 'volumes') volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.attached_host.type, self.assertIsInstance(volumes.c.attached_host.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_013(self, engine, data): def _check_013(self, engine, data):
"""Test that adding provider_geometry column works correctly.""" """Test that adding provider_geometry column works correctly."""
volumes = db_utils.get_table(engine, 'volumes') volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.provider_geometry.type, self.assertIsInstance(volumes.c.provider_geometry.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_014(self, engine, data): def _check_014(self, engine, data):
"""Test that adding _name_id column works correctly.""" """Test that adding _name_id column works correctly."""
volumes = db_utils.get_table(engine, 'volumes') volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c._name_id.type, self.assertIsInstance(volumes.c._name_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_015(self, engine, data): def _check_015(self, engine, data):
"""Test removing migrations table works correctly.""" """Test removing migrations table works correctly."""
@ -312,26 +314,26 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
volumes = db_utils.get_table(engine, 'volumes') volumes = db_utils.get_table(engine, 'volumes')
self.assertIn('encryption_key_id', volumes.c) self.assertIn('encryption_key_id', volumes.c)
self.assertIsInstance(volumes.c.encryption_key_id.type, self.assertIsInstance(volumes.c.encryption_key_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
snapshots = db_utils.get_table(engine, 'snapshots') snapshots = db_utils.get_table(engine, 'snapshots')
self.assertIn('encryption_key_id', snapshots.c) self.assertIn('encryption_key_id', snapshots.c)
self.assertIsInstance(snapshots.c.encryption_key_id.type, self.assertIsInstance(snapshots.c.encryption_key_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIn('volume_type_id', snapshots.c) self.assertIn('volume_type_id', snapshots.c)
self.assertIsInstance(snapshots.c.volume_type_id.type, self.assertIsInstance(snapshots.c.volume_type_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
# encryption types table # encryption types table
encryption = db_utils.get_table(engine, 'encryption') encryption = db_utils.get_table(engine, 'encryption')
self.assertIsInstance(encryption.c.volume_type_id.type, self.assertIsInstance(encryption.c.volume_type_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(encryption.c.cipher.type, self.assertIsInstance(encryption.c.cipher.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(encryption.c.key_size.type, self.assertIsInstance(encryption.c.key_size.type,
sqlalchemy.types.INTEGER) self.INTEGER_TYPE)
self.assertIsInstance(encryption.c.provider.type, self.assertIsInstance(encryption.c.provider.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_018(self, engine, data): def _check_018(self, engine, data):
"""Test that added qos_specs table works correctly.""" """Test that added qos_specs table works correctly."""
@ -347,19 +349,19 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.assertIsInstance(qos_specs.c.deleted.type, self.assertIsInstance(qos_specs.c.deleted.type,
self.BOOL_TYPE) self.BOOL_TYPE)
self.assertIsInstance(qos_specs.c.id.type, self.assertIsInstance(qos_specs.c.id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(qos_specs.c.specs_id.type, self.assertIsInstance(qos_specs.c.specs_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(qos_specs.c.key.type, self.assertIsInstance(qos_specs.c.key.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(qos_specs.c.value.type, self.assertIsInstance(qos_specs.c.value.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_019(self, engine, data): def _check_019(self, engine, data):
"""Test that adding migration_status column works correctly.""" """Test that adding migration_status column works correctly."""
volumes = db_utils.get_table(engine, 'volumes') volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.migration_status.type, self.assertIsInstance(volumes.c.migration_status.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_020(self, engine, data): def _check_020(self, engine, data):
"""Test adding volume_admin_metadata table works correctly.""" """Test adding volume_admin_metadata table works correctly."""
@ -377,13 +379,13 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.assertIsInstance(volume_admin_metadata.c.deleted.type, self.assertIsInstance(volume_admin_metadata.c.deleted.type,
self.BOOL_TYPE) self.BOOL_TYPE)
self.assertIsInstance(volume_admin_metadata.c.id.type, self.assertIsInstance(volume_admin_metadata.c.id.type,
sqlalchemy.types.INTEGER) self.INTEGER_TYPE)
self.assertIsInstance(volume_admin_metadata.c.volume_id.type, self.assertIsInstance(volume_admin_metadata.c.volume_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(volume_admin_metadata.c.key.type, self.assertIsInstance(volume_admin_metadata.c.key.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(volume_admin_metadata.c.value.type, self.assertIsInstance(volume_admin_metadata.c.value.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _verify_quota_defaults(self, engine): def _verify_quota_defaults(self, engine):
quota_class_metadata = db_utils.get_table(engine, 'quota_classes') quota_class_metadata = db_utils.get_table(engine, 'quota_classes')
@ -402,7 +404,7 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
"""Test that adding disabled_reason column works correctly.""" """Test that adding disabled_reason column works correctly."""
services = db_utils.get_table(engine, 'services') services = db_utils.get_table(engine, 'services')
self.assertIsInstance(services.c.disabled_reason.type, self.assertIsInstance(services.c.disabled_reason.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_023(self, engine, data): def _check_023(self, engine, data):
"""Test that adding reservations index works correctly.""" """Test that adding reservations index works correctly."""
@ -420,11 +422,11 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
"""Test adding replication columns to volume table.""" """Test adding replication columns to volume table."""
volumes = db_utils.get_table(engine, 'volumes') volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.replication_status.type, self.assertIsInstance(volumes.c.replication_status.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(volumes.c.replication_extended_status.type, self.assertIsInstance(volumes.c.replication_extended_status.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(volumes.c.replication_driver_data.type, self.assertIsInstance(volumes.c.replication_driver_data.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_025(self, engine, data): def _check_025(self, engine, data):
"""Test adding table and columns for consistencygroups.""" """Test adding table and columns for consistencygroups."""
@ -432,12 +434,12 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
metadata = sqlalchemy.MetaData() metadata = sqlalchemy.MetaData()
volumes = self.get_table_ref(engine, 'volumes', metadata) volumes = self.get_table_ref(engine, 'volumes', metadata)
self.assertIsInstance(volumes.c.consistencygroup_id.type, self.assertIsInstance(volumes.c.consistencygroup_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
# Test cgsnapshot_id is in Table snapshots # Test cgsnapshot_id is in Table snapshots
snapshots = self.get_table_ref(engine, 'snapshots', metadata) snapshots = self.get_table_ref(engine, 'snapshots', metadata)
self.assertIsInstance(snapshots.c.cgsnapshot_id.type, self.assertIsInstance(snapshots.c.cgsnapshot_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
# Test Table consistencygroups exists # Test Table consistencygroups exists
self.assertTrue(engine.dialect.has_table(engine.connect(), self.assertTrue(engine.dialect.has_table(engine.connect(),
@ -454,23 +456,23 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.assertIsInstance(consistencygroups.c.deleted.type, self.assertIsInstance(consistencygroups.c.deleted.type,
self.BOOL_TYPE) self.BOOL_TYPE)
self.assertIsInstance(consistencygroups.c.id.type, self.assertIsInstance(consistencygroups.c.id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.user_id.type, self.assertIsInstance(consistencygroups.c.user_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.project_id.type, self.assertIsInstance(consistencygroups.c.project_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.host.type, self.assertIsInstance(consistencygroups.c.host.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.availability_zone.type, self.assertIsInstance(consistencygroups.c.availability_zone.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.name.type, self.assertIsInstance(consistencygroups.c.name.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.description.type, self.assertIsInstance(consistencygroups.c.description.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.volume_type_id.type, self.assertIsInstance(consistencygroups.c.volume_type_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(consistencygroups.c.status.type, self.assertIsInstance(consistencygroups.c.status.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
# Test Table cgsnapshots exists # Test Table cgsnapshots exists
self.assertTrue(engine.dialect.has_table(engine.connect(), self.assertTrue(engine.dialect.has_table(engine.connect(),
@ -488,19 +490,19 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.assertIsInstance(cgsnapshots.c.deleted.type, self.assertIsInstance(cgsnapshots.c.deleted.type,
self.BOOL_TYPE) self.BOOL_TYPE)
self.assertIsInstance(cgsnapshots.c.id.type, self.assertIsInstance(cgsnapshots.c.id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(cgsnapshots.c.user_id.type, self.assertIsInstance(cgsnapshots.c.user_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(cgsnapshots.c.project_id.type, self.assertIsInstance(cgsnapshots.c.project_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(cgsnapshots.c.consistencygroup_id.type, self.assertIsInstance(cgsnapshots.c.consistencygroup_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(cgsnapshots.c.name.type, self.assertIsInstance(cgsnapshots.c.name.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(cgsnapshots.c.description.type, self.assertIsInstance(cgsnapshots.c.description.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(cgsnapshots.c.status.type, self.assertIsInstance(cgsnapshots.c.status.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
# Verify foreign keys are created # Verify foreign keys are created
fkey, = volumes.c.consistencygroup_id.foreign_keys fkey, = volumes.c.consistencygroup_id.foreign_keys
@ -545,11 +547,11 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.assertIsInstance(volume_type_projects.c.deleted.type, self.assertIsInstance(volume_type_projects.c.deleted.type,
self.BOOL_TYPE) self.BOOL_TYPE)
self.assertIsInstance(volume_type_projects.c.id.type, self.assertIsInstance(volume_type_projects.c.id.type,
sqlalchemy.types.INTEGER) self.INTEGER_TYPE)
self.assertIsInstance(volume_type_projects.c.volume_type_id.type, self.assertIsInstance(volume_type_projects.c.volume_type_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(volume_type_projects.c.project_id.type, self.assertIsInstance(volume_type_projects.c.project_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
volume_types = db_utils.get_table(engine, 'volume_types') volume_types = db_utils.get_table(engine, 'volume_types')
self.assertIsInstance(volume_types.c.is_public.type, self.assertIsInstance(volume_types.c.is_public.type,
@ -559,28 +561,28 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
"""Test adding encryption_id column to encryption table.""" """Test adding encryption_id column to encryption table."""
encryptions = db_utils.get_table(engine, 'encryption') encryptions = db_utils.get_table(engine, 'encryption')
self.assertIsInstance(encryptions.c.encryption_id.type, self.assertIsInstance(encryptions.c.encryption_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_034(self, engine, data): def _check_034(self, engine, data):
"""Test adding description columns to volume_types table.""" """Test adding description columns to volume_types table."""
volume_types = db_utils.get_table(engine, 'volume_types') volume_types = db_utils.get_table(engine, 'volume_types')
self.assertIsInstance(volume_types.c.description.type, self.assertIsInstance(volume_types.c.description.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_035(self, engine, data): def _check_035(self, engine, data):
volumes = db_utils.get_table(engine, 'volumes') volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.provider_id.type, self.assertIsInstance(volumes.c.provider_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_036(self, engine, data): def _check_036(self, engine, data):
snapshots = db_utils.get_table(engine, 'snapshots') snapshots = db_utils.get_table(engine, 'snapshots')
self.assertIsInstance(snapshots.c.provider_id.type, self.assertIsInstance(snapshots.c.provider_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_037(self, engine, data): def _check_037(self, engine, data):
consistencygroups = db_utils.get_table(engine, 'consistencygroups') consistencygroups = db_utils.get_table(engine, 'consistencygroups')
self.assertIsInstance(consistencygroups.c.cgsnapshot_id.type, self.assertIsInstance(consistencygroups.c.cgsnapshot_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_038(self, engine, data): def _check_038(self, engine, data):
"""Test adding and removing driver_initiator_data table.""" """Test adding and removing driver_initiator_data table."""
@ -599,20 +601,20 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
self.assertIsInstance(private_data.c.updated_at.type, self.assertIsInstance(private_data.c.updated_at.type,
self.TIME_TYPE) self.TIME_TYPE)
self.assertIsInstance(private_data.c.id.type, self.assertIsInstance(private_data.c.id.type,
sqlalchemy.types.INTEGER) self.INTEGER_TYPE)
self.assertIsInstance(private_data.c.initiator.type, self.assertIsInstance(private_data.c.initiator.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(private_data.c.namespace.type, self.assertIsInstance(private_data.c.namespace.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(private_data.c.key.type, self.assertIsInstance(private_data.c.key.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(private_data.c.value.type, self.assertIsInstance(private_data.c.value.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_039(self, engine, data): def _check_039(self, engine, data):
backups = db_utils.get_table(engine, 'backups') backups = db_utils.get_table(engine, 'backups')
self.assertIsInstance(backups.c.parent_id.type, self.assertIsInstance(backups.c.parent_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_40(self, engine, data): def _check_40(self, engine, data):
volumes = db_utils.get_table(engine, 'volumes') volumes = db_utils.get_table(engine, 'volumes')
@ -625,15 +627,15 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
attachments = db_utils.get_table(engine, 'volume_attachment') attachments = db_utils.get_table(engine, 'volume_attachment')
self.assertIsInstance(attachments.c.attach_mode.type, self.assertIsInstance(attachments.c.attach_mode.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(attachments.c.instance_uuid.type, self.assertIsInstance(attachments.c.instance_uuid.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(attachments.c.attached_host.type, self.assertIsInstance(attachments.c.attached_host.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(attachments.c.mountpoint.type, self.assertIsInstance(attachments.c.mountpoint.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(attachments.c.attach_status.type, self.assertIsInstance(attachments.c.attach_status.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_041(self, engine, data): def _check_041(self, engine, data):
"""Test that adding modified_at column works correctly.""" """Test that adding modified_at column works correctly."""
@ -644,45 +646,45 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
def _check_048(self, engine, data): def _check_048(self, engine, data):
quotas = db_utils.get_table(engine, 'quotas') quotas = db_utils.get_table(engine, 'quotas')
self.assertIsInstance(quotas.c.allocated.type, self.assertIsInstance(quotas.c.allocated.type,
sqlalchemy.types.INTEGER) self.INTEGER_TYPE)
def _check_049(self, engine, data): def _check_049(self, engine, data):
backups = db_utils.get_table(engine, 'backups') backups = db_utils.get_table(engine, 'backups')
self.assertIsInstance(backups.c.temp_volume_id.type, self.assertIsInstance(backups.c.temp_volume_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.temp_snapshot_id.type, self.assertIsInstance(backups.c.temp_snapshot_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_050(self, engine, data): def _check_050(self, engine, data):
volumes = db_utils.get_table(engine, 'volumes') volumes = db_utils.get_table(engine, 'volumes')
self.assertIsInstance(volumes.c.previous_status.type, self.assertIsInstance(volumes.c.previous_status.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_051(self, engine, data): def _check_051(self, engine, data):
consistencygroups = db_utils.get_table(engine, 'consistencygroups') consistencygroups = db_utils.get_table(engine, 'consistencygroups')
self.assertIsInstance(consistencygroups.c.source_cgid.type, self.assertIsInstance(consistencygroups.c.source_cgid.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_052(self, engine, data): def _check_052(self, engine, data):
snapshots = db_utils.get_table(engine, 'snapshots') snapshots = db_utils.get_table(engine, 'snapshots')
self.assertIsInstance(snapshots.c.provider_auth.type, self.assertIsInstance(snapshots.c.provider_auth.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_053(self, engine, data): def _check_053(self, engine, data):
services = db_utils.get_table(engine, 'services') services = db_utils.get_table(engine, 'services')
self.assertIsInstance(services.c.rpc_current_version.type, self.assertIsInstance(services.c.rpc_current_version.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(services.c.rpc_available_version.type, self.assertIsInstance(services.c.rpc_available_version.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(services.c.object_current_version.type, self.assertIsInstance(services.c.object_current_version.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(services.c.object_available_version.type, self.assertIsInstance(services.c.object_available_version.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
def _check_054(self, engine, data): def _check_054(self, engine, data):
backups = db_utils.get_table(engine, 'backups') backups = db_utils.get_table(engine, 'backups')
self.assertIsInstance(backups.c.num_dependent_backups.type, self.assertIsInstance(backups.c.num_dependent_backups.type,
sqlalchemy.types.INTEGER) self.INTEGER_TYPE)
def _check_055(self, engine, data): def _check_055(self, engine, data):
"""Test adding image_volume_cache_entries table.""" """Test adding image_volume_cache_entries table."""
@ -696,24 +698,24 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
) )
self.assertIsInstance(private_data.c.id.type, self.assertIsInstance(private_data.c.id.type,
sqlalchemy.types.INTEGER) self.INTEGER_TYPE)
self.assertIsInstance(private_data.c.host.type, self.assertIsInstance(private_data.c.host.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(private_data.c.image_id.type, self.assertIsInstance(private_data.c.image_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(private_data.c.image_updated_at.type, self.assertIsInstance(private_data.c.image_updated_at.type,
self.TIME_TYPE) self.TIME_TYPE)
self.assertIsInstance(private_data.c.volume_id.type, self.assertIsInstance(private_data.c.volume_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(private_data.c.size.type, self.assertIsInstance(private_data.c.size.type,
sqlalchemy.types.INTEGER) self.INTEGER_TYPE)
self.assertIsInstance(private_data.c.last_used.type, self.assertIsInstance(private_data.c.last_used.type,
self.TIME_TYPE) self.TIME_TYPE)
def _check_061(self, engine, data): def _check_061(self, engine, data):
backups = db_utils.get_table(engine, 'backups') backups = db_utils.get_table(engine, 'backups')
self.assertIsInstance(backups.c.snapshot_id.type, self.assertIsInstance(backups.c.snapshot_id.type,
sqlalchemy.types.VARCHAR) self.VARCHAR_TYPE)
self.assertIsInstance(backups.c.data_timestamp.type, self.assertIsInstance(backups.c.data_timestamp.type,
self.TIME_TYPE) self.TIME_TYPE)
@ -721,7 +723,7 @@ class MigrationsMixin(test_migrations.WalkVersionsMixin):
volume_type_projects = db_utils.get_table(engine, volume_type_projects = db_utils.get_table(engine,
'volume_type_projects') 'volume_type_projects')
self.assertIsInstance(volume_type_projects.c.id.type, self.assertIsInstance(volume_type_projects.c.id.type,
sqlalchemy.types.INTEGER) self.INTEGER_TYPE)
def test_walk_versions(self): def test_walk_versions(self):
self.walk_versions(False, False) self.walk_versions(False, False)