Make migration tests postgres & mysql friendly.

Add some session commits to prevent database deadlocks.
Force close all open sessions before performing tear down.
Use escaped parameter handling for raw sql statements.

Change-Id: I0ef670ddc416a02e78570ab6ebed2b4bf2a8635b
This commit is contained in:
Jamie Lennox 2013-04-17 10:26:22 +10:00
parent 2eab5fd2b7
commit c9174850cc
2 changed files with 29 additions and 27 deletions

View File

@ -42,6 +42,7 @@ def downgrade_user_table_with_copy(meta, migrate_engine):
'name': user.name,
'extra': user.extra})
session.execute("drop table orig_user;")
session.close()
def downgrade_tenant_table_with_copy(meta, migrate_engine):
@ -65,6 +66,7 @@ def downgrade_tenant_table_with_copy(meta, migrate_engine):
'name': tenant.name,
'extra': tenant.extra})
session.execute("drop table orig_tenant;")
session.close()
def downgrade_user_table_with_column_drop(meta, migrate_engine):

View File

@ -72,6 +72,7 @@ class SqlUpgradeTests(test.TestCase):
self.max_version = self.schema.repository.version().version
def tearDown(self):
sqlalchemy.orm.session.Session.close_all()
table = sqlalchemy.Table("migrate_version", self.metadata,
autoload=True)
self.downgrade(0)
@ -204,12 +205,11 @@ class SqlUpgradeTests(test.TestCase):
for user in users.values():
self.insert_dict(session, 'user', user)
session.commit()
session.close()
self.upgrade(10)
user_table = sqlalchemy.Table('user', self.metadata, autoload=True)
q = self.Session().query(user_table, 'enabled')
q = session.query(user_table, 'enabled')
user = q.filter_by(id=users['bool_enabled_user']['id']).one()
self.assertTrue(user.enabled)
@ -536,21 +536,21 @@ class SqlUpgradeTests(test.TestCase):
domain = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'enabled': True}
self.engine.execute(domain_table.insert().values(domain))
session.execute(domain_table.insert().values(domain))
# Create a Project
project = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': domain['id'],
'extra': "{}"}
self.engine.execute(project_table.insert().values(project))
session.execute(project_table.insert().values(project))
# Create another Project
project2 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': domain['id'],
'extra': "{}"}
self.engine.execute(project_table.insert().values(project2))
session.execute(project_table.insert().values(project2))
# Create a User
user = {'id': uuid.uuid4().hex,
@ -559,28 +559,28 @@ class SqlUpgradeTests(test.TestCase):
'password': uuid.uuid4().hex,
'enabled': True,
'extra': json.dumps({})}
self.engine.execute(user_table.insert().values(user))
session.execute(user_table.insert().values(user))
# Create a Role
role = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.engine.execute(role_table.insert().values(role))
session.execute(role_table.insert().values(role))
# And another role
role2 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.engine.execute(role_table.insert().values(role2))
session.execute(role_table.insert().values(role2))
# Grant Role to User
role_grant = {'user_id': user['id'],
'tenant_id': project['id'],
'data': json.dumps({"roles": [role['id']]})}
self.engine.execute(metadata_table.insert().values(role_grant))
session.execute(metadata_table.insert().values(role_grant))
role_grant = {'user_id': user['id'],
'tenant_id': project2['id'],
'data': json.dumps({"roles": [role2['id']]})}
self.engine.execute(metadata_table.insert().values(role_grant))
session.execute(metadata_table.insert().values(role_grant))
session.commit()
@ -589,18 +589,17 @@ class SqlUpgradeTests(test.TestCase):
user_project_metadata_table = sqlalchemy.Table(
'user_project_metadata', self.metadata, autoload=True)
# Test user in project has role
r = session.execute('select data from metadata where user_id="%s"'
'and tenant_id="%s"' %
(user['id'], project['id']))
r = session.execute('select data from metadata where '
'user_id=:user and tenant_id=:tenant',
{'user': user['id'], 'tenant': project['id']})
test_project1 = json.loads(r.fetchone()['data'])
self.assertEqual(len(test_project1['roles']), 1)
self.assertIn(role['id'], test_project1['roles'])
# Test user in project2 has role2
r = session.execute('select data from metadata where user_id="%s"'
' and tenant_id="%s"' %
(user['id'], project2['id']))
r = session.execute('select data from metadata where '
'user_id=:user and tenant_id=:tenant',
{'user': user['id'], 'tenant': project2['id']})
test_project2 = json.loads(r.fetchone()['data'])
self.assertEqual(len(test_project2['roles']), 1)
self.assertIn(role2['id'], test_project2['roles'])
@ -609,8 +608,8 @@ class SqlUpgradeTests(test.TestCase):
# Migration 17 does not properly migrate this data, so this should
# be None.
r = session.execute('select data from user_project_metadata where '
'user_id="%s" and project_id="%s"' %
(user['id'], project['id']))
'user_id=:user and project_id=:project',
{'user': user['id'], 'project': project['id']})
self.assertIsNone(r.fetchone())
# Create a conflicting user-project in user_project_metadata with
@ -623,15 +622,17 @@ class SqlUpgradeTests(test.TestCase):
self.engine.execute(cmd)
# End Scaffolding
session.commit()
# Migrate to 20
self.upgrade(20)
# The user-project pairs should have all roles from the previous
# metadata table in addition to any roles currently in
# user_project_metadata
r = session.execute('select data from user_project_metadata '
'where user_id="%s" and project_id="%s"' %
(user['id'], project['id']))
r = session.execute('select data from user_project_metadata where '
'user_id=:user and project_id=:project',
{'user': user['id'], 'project': project['id']})
role_ids = json.loads(r.fetchone()['data'])['roles']
self.assertEqual(len(role_ids), 3)
self.assertIn(CONF.member_role_id, role_ids)
@ -641,8 +642,8 @@ class SqlUpgradeTests(test.TestCase):
# pairs that only existed in old metadata table should be in
# user_project_metadata
r = session.execute('select data from user_project_metadata where '
'user_id="%s" and project_id="%s"' %
(user['id'], project2['id']))
'user_id=:user and project_id=:project',
{'user': user['id'], 'project': project2['id']})
role_ids = json.loads(r.fetchone()['data'])['roles']
self.assertEqual(len(role_ids), 2)
self.assertIn(CONF.member_role_id, role_ids)
@ -766,6 +767,7 @@ class SqlUpgradeTests(test.TestCase):
r = session.execute('select count(*) as c from role '
'where extra is null')
self.assertEqual(r.fetchone()['c'], 2)
session.commit()
self.upgrade(19)
r = session.execute('select count(*) as c from role '
'where extra is null')
@ -791,11 +793,9 @@ class SqlUpgradeTests(test.TestCase):
'legacy_endpoint_id': legacy_endpoint_id})}
self.insert_dict(session, 'endpoint', endpoint)
self.upgrade(22)
session.commit()
session.close()
self.upgrade(22)
session = self.Session()
endpoint_table = sqlalchemy.Table(
'endpoint', self.metadata, autoload=True)