Use in-memory SQLite for testing
Use the in-memory sqlite db for testing. Changes have been made to populate data for in-memory databases differently as there are now no longer any migrations being used to create the schema when in-memory SQLite is used (with exception of the explicit migration test cases). The most notible case where the creation of the schema differs from using the migrations, is the creation of the default domain. Typically this has been done by migration 008. The restful test cases now properly populate the default domain explicitly if needed. As part of this patch to ensure extensions received the correct schema additions, the EXTENSION_NAME property is now used in the test case to do migrations when not utilizing in-memory SQLite. The model for ROLE table in assignment did not properly indicate that the name column was "unique" (as would be the case using the migrations). This unique column identifier is needed since the migration no longer occurs (it becomes even more important that the models match). To support proper testing, the revoke tests in test_v3_auth now provide thier own load_sample_data method and re-use the already defined admin role based upon the super() call. Change-Id: I4cd08cdf3fa9c7b3265223d1baf6d3cf0d1c12dd
This commit is contained in:
parent
0a1cb0e202
commit
748f8bf6ca
@ -572,7 +572,7 @@ class Role(sql.ModelBase, sql.DictBase):
|
||||
__tablename__ = 'role'
|
||||
attributes = ['id', 'name']
|
||||
id = sql.Column(sql.String(64), primary_key=True)
|
||||
name = sql.Column(sql.String(255), nullable=False)
|
||||
name = sql.Column(sql.String(255), unique=True, nullable=False)
|
||||
extra = sql.Column(sql.JsonBlob())
|
||||
__table_args__ = (sql.UniqueConstraint('name'), {})
|
||||
|
||||
|
@ -88,6 +88,8 @@ TMPDIR = _calc_tmpdir()
|
||||
|
||||
CONF = config.CONF
|
||||
|
||||
IN_MEM_DB_CONN_STRING = 'sqlite://'
|
||||
|
||||
exception._FATAL_EXCEPTION_FORMAT_ERRORS = True
|
||||
os.makedirs(TMPDIR)
|
||||
atexit.register(shutil.rmtree, TMPDIR)
|
||||
@ -125,7 +127,7 @@ def _initialize_sql_session():
|
||||
# test cases.
|
||||
db_file = DEFAULT_TEST_DB_FILE
|
||||
db_options.set_defaults(
|
||||
sql_connection='sqlite:///%s' % db_file,
|
||||
sql_connection=IN_MEM_DB_CONN_STRING,
|
||||
sqlite_db=db_file)
|
||||
|
||||
|
||||
@ -164,19 +166,21 @@ def checkout_vendor(repo, rev):
|
||||
return revdir
|
||||
|
||||
|
||||
def setup_database():
|
||||
db = dirs.tmp('test.db')
|
||||
pristine = dirs.tmp('test.db.pristine')
|
||||
def setup_database(extensions=None):
|
||||
if CONF.database.connection != IN_MEM_DB_CONN_STRING:
|
||||
db = dirs.tmp('test.db')
|
||||
pristine = dirs.tmp('test.db.pristine')
|
||||
|
||||
if os.path.exists(db):
|
||||
os.unlink(db)
|
||||
if not os.path.exists(pristine):
|
||||
migration.db_sync(sql.get_engine(),
|
||||
migration_helpers.find_migrate_repo())
|
||||
migration_helpers.sync_database_to_version(extension='revoke')
|
||||
shutil.copyfile(db, pristine)
|
||||
else:
|
||||
shutil.copyfile(pristine, db)
|
||||
if os.path.exists(db):
|
||||
os.unlink(db)
|
||||
if not os.path.exists(pristine):
|
||||
migration.db_sync(sql.get_engine(),
|
||||
migration_helpers.find_migrate_repo())
|
||||
for extension in (extensions or []):
|
||||
migration_helpers.sync_database_to_version(extension=extension)
|
||||
shutil.copyfile(db, pristine)
|
||||
else:
|
||||
shutil.copyfile(pristine, db)
|
||||
|
||||
|
||||
def teardown_database():
|
||||
|
@ -15,11 +15,8 @@
|
||||
import copy
|
||||
import uuid
|
||||
|
||||
from keystone.common import sql
|
||||
from keystone.common.sql import migration_helpers
|
||||
from keystone import contrib
|
||||
from keystone.openstack.common.db.sqlalchemy import migration
|
||||
from keystone.openstack.common import importutils
|
||||
# NOTE(morganfainberg): import endpoint filter to populate the SQL model
|
||||
from keystone.contrib import endpoint_filter # flake8: noqa
|
||||
from keystone.tests import test_v3
|
||||
|
||||
|
||||
@ -28,14 +25,6 @@ class TestExtensionCase(test_v3.RestfulTestCase):
|
||||
EXTENSION_NAME = 'endpoint_filter'
|
||||
EXTENSION_TO_ADD = 'endpoint_filter_extension'
|
||||
|
||||
def setup_database(self):
|
||||
super(TestExtensionCase, self).setup_database()
|
||||
package_name = '.'.join((contrib.__name__, self.EXTENSION_NAME))
|
||||
package = importutils.import_module(package_name)
|
||||
abs_path = migration_helpers.find_migrate_repo(package)
|
||||
migration.db_version_control(sql.get_engine(), abs_path)
|
||||
migration.db_sync(sql.get_engine(), abs_path)
|
||||
|
||||
def config_overrides(self):
|
||||
super(TestExtensionCase, self).config_overrides()
|
||||
self.config_fixture.config(
|
||||
|
@ -70,9 +70,6 @@ class SqlMigrateBase(tests.SQLDriverOverrides, tests.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
super(SqlMigrateBase, self).setUp()
|
||||
|
||||
self.config(self.config_files())
|
||||
|
||||
conn_str = CONF.database.connection
|
||||
if (conn_str.startswith('sqlite') and
|
||||
conn_str[10:] == tests.DEFAULT_TEST_DB_FILE):
|
||||
|
@ -24,6 +24,7 @@ from keystone.common import authorization
|
||||
from keystone.common import cache
|
||||
from keystone.common import serializer
|
||||
from keystone import config
|
||||
from keystone import exception
|
||||
from keystone import middleware
|
||||
from keystone.openstack.common import timeutils
|
||||
from keystone.policy.backends import rules
|
||||
@ -43,8 +44,14 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase):
|
||||
config_files.append(tests.dirs.tests_conf('backend_sql.conf'))
|
||||
return config_files
|
||||
|
||||
def get_extensions(self):
|
||||
extensions = set(['revoke'])
|
||||
if hasattr(self, 'EXTENSION_NAME'):
|
||||
extensions.add(self.EXTENSION_NAME)
|
||||
return extensions
|
||||
|
||||
def setup_database(self):
|
||||
tests.setup_database()
|
||||
tests.setup_database(self.get_extensions())
|
||||
|
||||
def teardown_database(self):
|
||||
tests.teardown_database()
|
||||
@ -95,7 +102,25 @@ class RestfulTestCase(tests.SQLDriverOverrides, rest.RestfulTestCase):
|
||||
def load_fixtures(self, fixtures):
|
||||
self.load_sample_data()
|
||||
|
||||
def _populate_default_domain(self):
|
||||
if CONF.database.connection == tests.IN_MEM_DB_CONN_STRING:
|
||||
# NOTE(morganfainberg): If an in-memory db is being used, be sure
|
||||
# to populate the default domain, this is typically done by
|
||||
# a migration, but the in-mem db uses model definitions to create
|
||||
# the schema (no migrations are run).
|
||||
try:
|
||||
self.assignment_api.get_domain(DEFAULT_DOMAIN_ID)
|
||||
except exception.DomainNotFound:
|
||||
domain = {'description': (u'Owns users and tenants (i.e. '
|
||||
u'projects) available on Identity '
|
||||
u'API v2.'),
|
||||
'enabled': True,
|
||||
'id': DEFAULT_DOMAIN_ID,
|
||||
'name': u'Default'}
|
||||
self.assignment_api.create_domain(DEFAULT_DOMAIN_ID, domain)
|
||||
|
||||
def load_sample_data(self):
|
||||
self._populate_default_domain()
|
||||
self.domain_id = uuid.uuid4().hex
|
||||
self.domain = self.new_domain_ref()
|
||||
self.domain['id'] = self.domain_id
|
||||
|
@ -422,14 +422,15 @@ class TestUUIDTokenAPIs(test_v3.RestfulTestCase, TokenAPITests):
|
||||
|
||||
class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
|
||||
"""Test token revoke using v3 Identity API by token owner and admin."""
|
||||
def setUp(self):
|
||||
"""Setup for Test Cases.
|
||||
def load_sample_data(self):
|
||||
"""Load Sample Data for Test Cases.
|
||||
Two domains, domainA and domainB
|
||||
Two users in domainA, userNormalA and userAdminA
|
||||
One user in domainB, userAdminB
|
||||
|
||||
"""
|
||||
super(TestTokenRevokeSelfAndAdmin, self).setUp()
|
||||
super(TestTokenRevokeSelfAndAdmin, self).load_sample_data()
|
||||
self._populate_default_domain()
|
||||
# DomainA setup
|
||||
self.domainA = self.new_domain_ref()
|
||||
self.assignment_api.create_domain(self.domainA['id'], self.domainA)
|
||||
@ -443,11 +444,7 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
|
||||
self.userNormalA['password'] = uuid.uuid4().hex
|
||||
self.identity_api.create_user(self.userNormalA['id'], self.userNormalA)
|
||||
|
||||
self.role1 = self.new_role_ref()
|
||||
self.role1['name'] = 'admin'
|
||||
self.assignment_api.create_role(self.role1['id'], self.role1)
|
||||
|
||||
self.assignment_api.create_grant(self.role1['id'],
|
||||
self.assignment_api.create_grant(self.role['id'],
|
||||
user_id=self.userAdminA['id'],
|
||||
domain_id=self.domainA['id'])
|
||||
|
||||
@ -543,7 +540,7 @@ class TestTokenRevokeSelfAndAdmin(test_v3.RestfulTestCase):
|
||||
self.userAdminB = self.new_user_ref(domain_id=self.domainB['id'])
|
||||
self.userAdminB['password'] = uuid.uuid4().hex
|
||||
self.identity_api.create_user(self.userAdminB['id'], self.userAdminB)
|
||||
self.assignment_api.create_grant(self.role1['id'],
|
||||
self.assignment_api.create_grant(self.role['id'],
|
||||
user_id=self.userAdminB['id'],
|
||||
domain_id=self.domainB['id'])
|
||||
r = self.post(
|
||||
|
@ -16,15 +16,10 @@ import uuid
|
||||
from keystone.auth import controllers as auth_controllers
|
||||
from keystone.common import dependency
|
||||
from keystone.common import serializer
|
||||
from keystone.common import sql
|
||||
from keystone.common.sql import migration_helpers
|
||||
from keystone import config
|
||||
from keystone import contrib
|
||||
from keystone.contrib.federation import controllers as federation_controllers
|
||||
from keystone.contrib.federation import utils as mapping_utils
|
||||
from keystone import exception
|
||||
from keystone.openstack.common.db.sqlalchemy import migration
|
||||
from keystone.openstack.common import importutils
|
||||
from keystone.openstack.common import jsonutils
|
||||
from keystone.openstack.common import log
|
||||
from keystone.tests import mapping_fixtures
|
||||
@ -45,14 +40,6 @@ class FederationTests(test_v3.RestfulTestCase):
|
||||
EXTENSION_NAME = 'federation'
|
||||
EXTENSION_TO_ADD = 'federation_extension'
|
||||
|
||||
def setup_database(self):
|
||||
super(FederationTests, self).setup_database()
|
||||
package_name = '.'.join((contrib.__name__, self.EXTENSION_NAME))
|
||||
package = importutils.import_module(package_name)
|
||||
abs_path = migration_helpers.find_migrate_repo(package)
|
||||
migration.db_version_control(sql.get_engine(), abs_path)
|
||||
migration.db_sync(sql.get_engine(), abs_path)
|
||||
|
||||
|
||||
class FederatedIdentityProviderTests(FederationTests):
|
||||
"""A test class for Identity Providers."""
|
||||
@ -763,9 +750,8 @@ class FederatedTokenTests(FederationTests):
|
||||
|
||||
AUTH_URL = '/auth/tokens'
|
||||
|
||||
def setUp(self):
|
||||
super(FederationTests, self).setUp()
|
||||
self.load_sample_data()
|
||||
def load_fixtures(self, fixtures):
|
||||
super(FederationTests, self).load_fixtures(fixtures)
|
||||
self.load_federation_sample_data()
|
||||
|
||||
def idp_ref(self, id=None):
|
||||
|
@ -62,6 +62,7 @@ class IdentityTestFilteredCase(filtering.FilterTests,
|
||||
|
||||
"""
|
||||
# Start by creating a few domains
|
||||
self._populate_default_domain()
|
||||
self.domainA = self.new_domain_ref()
|
||||
self.assignment_api.create_domain(self.domainA['id'], self.domainA)
|
||||
self.domainB = self.new_domain_ref()
|
||||
|
@ -17,15 +17,10 @@ import uuid
|
||||
|
||||
from six.moves import urllib
|
||||
|
||||
from keystone.common import sql
|
||||
from keystone.common.sql import migration_helpers
|
||||
from keystone import config
|
||||
from keystone import contrib
|
||||
from keystone.contrib import oauth1
|
||||
from keystone.contrib.oauth1 import controllers
|
||||
from keystone import exception
|
||||
from keystone.openstack.common.db.sqlalchemy import migration
|
||||
from keystone.openstack.common import importutils
|
||||
from keystone.tests import test_v3
|
||||
|
||||
|
||||
@ -39,14 +34,6 @@ class OAuth1Tests(test_v3.RestfulTestCase):
|
||||
|
||||
CONSUMER_URL = '/OS-OAUTH1/consumers'
|
||||
|
||||
def setup_database(self):
|
||||
super(OAuth1Tests, self).setup_database()
|
||||
package_name = '.'.join((contrib.__name__, self.EXTENSION_NAME))
|
||||
package = importutils.import_module(package_name)
|
||||
abs_path = migration_helpers.find_migrate_repo(package)
|
||||
migration.db_version_control(sql.get_engine(), abs_path)
|
||||
migration.db_sync(sql.get_engine(), abs_path)
|
||||
|
||||
def setUp(self):
|
||||
super(OAuth1Tests, self).setUp()
|
||||
|
||||
|
@ -65,6 +65,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
|
||||
password=self.user1['password'])
|
||||
|
||||
def load_sample_data(self):
|
||||
self._populate_default_domain()
|
||||
# Start by creating a couple of domains
|
||||
self.domainA = self.new_domain_ref()
|
||||
self.assignment_api.create_domain(self.domainA['id'], self.domainA)
|
||||
@ -372,6 +373,7 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase):
|
||||
|
||||
def load_sample_data(self):
|
||||
# Start by creating a couple of domains
|
||||
self._populate_default_domain()
|
||||
self.domainA = self.new_domain_ref()
|
||||
self.assignment_api.create_domain(self.domainA['id'], self.domainA)
|
||||
self.domainB = self.new_domain_ref()
|
||||
|
Loading…
Reference in New Issue
Block a user