Ensure identity sql driver supports domain-specific configuration.
For an identity driver to be used with a domain-specific config file, it must accept a ConfigOpts as an optional parameter. The ldap driver already supported this - so this patch adds this to the sql driver. The above change is trivial, but the rest of the patch is enhancing our testing to validate this operation, as well as enforcing the limitation listed below. Limitation: Currently Keystone only supports one SQL driver - this patch does not change that. What it does enable is for that one SQL driver to be assigned to a specific domain, as opposed to being just a catch-all for domains without a specific configuration file. This at least enables the (often requested) scenario of service users to be stored in SQL in a predominantly LDAP installation. Closes-Bug: 1217017 Change-Id: Ic531ebae28680fc518d8e036db516f5982241c40
This commit is contained in:
parent
a56d36311c
commit
8e6e6b392c
|
@ -147,6 +147,13 @@ configuration file.
|
|||
users and groups) from one backend to another, nor group membership across
|
||||
backend boundaries.
|
||||
|
||||
.. NOTE::
|
||||
|
||||
Although Keystone supports multiple LDAP backends via domain specific
|
||||
configuration files, it currently only supports one SQL backend. This
|
||||
could be either the default driver or a single domain-specific backend,
|
||||
perhaps for storing service users in a predominantly LDAP installation.
|
||||
|
||||
Due to the need for user and group IDs to be unique across an OpenStack
|
||||
installation and for Keystone to be able to deduce which domain and backend to
|
||||
use from just a user or group ID, it dynamically builds a persistent identity
|
||||
|
|
|
@ -398,6 +398,12 @@ class ConfigFileNotFound(UnexpectedError):
|
|||
"could not be found.")
|
||||
|
||||
|
||||
class MultipleSQLDriversInConfig(UnexpectedError):
|
||||
message_format = _('The Keystone domain configuration file '
|
||||
'%(config_file)s defines an additional SQL driver - '
|
||||
'only one is permitted.')
|
||||
|
||||
|
||||
class MigrationNotProvided(Exception):
|
||||
def __init__(self, mod_name, path):
|
||||
super(MigrationNotProvided, self).__init__(_(
|
||||
|
|
|
@ -14,6 +14,7 @@
|
|||
|
||||
from keystone.common import sql
|
||||
from keystone.common import utils
|
||||
from keystone import config
|
||||
from keystone import exception
|
||||
from keystone.i18n import _
|
||||
from keystone import identity
|
||||
|
@ -23,6 +24,9 @@ from keystone import identity
|
|||
from keystone.assignment.backends import sql as assignment_sql # noqa
|
||||
|
||||
|
||||
CONF = config.CONF
|
||||
|
||||
|
||||
class User(sql.ModelBase, sql.DictBase):
|
||||
__tablename__ = 'user'
|
||||
attributes = ['id', 'name', 'domain_id', 'password', 'enabled',
|
||||
|
@ -72,9 +76,18 @@ class UserGroupMembership(sql.ModelBase, sql.DictBase):
|
|||
|
||||
|
||||
class Identity(identity.Driver):
|
||||
# NOTE(henry-nash): Override the __init__() method so as to take a
|
||||
# config parameter to enable sql to be used as a domain-specific driver.
|
||||
def __init__(self, conf=None):
|
||||
super(Identity, self).__init__()
|
||||
|
||||
def default_assignment_driver(self):
|
||||
return "keystone.assignment.backends.sql.Assignment"
|
||||
|
||||
@property
|
||||
def is_sql(self):
|
||||
return True
|
||||
|
||||
def _check_password(self, password, user_ref):
|
||||
"""Check the specified password against the data store.
|
||||
|
||||
|
|
|
@ -85,15 +85,32 @@ class DomainConfigs(dict):
|
|||
"""
|
||||
configured = False
|
||||
driver = None
|
||||
_any_sql = False
|
||||
|
||||
def _load_driver(self, assignment_api, domain_id):
|
||||
domain_config = self[domain_id]
|
||||
domain_config['driver'] = (
|
||||
def _load_driver(self, domain_config, assignment_api):
|
||||
domain_config_driver = (
|
||||
importutils.import_object(
|
||||
domain_config['cfg'].identity.driver, domain_config['cfg']))
|
||||
domain_config['driver'].assignment_api = assignment_api
|
||||
domain_config_driver.assignment_api = assignment_api
|
||||
return domain_config_driver
|
||||
|
||||
def _load_config(self, assignment_api, file_list, domain_name):
|
||||
|
||||
def assert_no_more_than_one_sql_driver(new_config, config_file):
|
||||
"""Ensure there is more than one sql driver.
|
||||
|
||||
Check to see if the addition of the driver in this new config
|
||||
would cause there to now be more than one sql driver.
|
||||
|
||||
"""
|
||||
if (new_config['driver'].is_sql and
|
||||
(self.driver.is_sql or self._any_sql)):
|
||||
# The addition of this driver would cause us to have more than
|
||||
# one sql driver, so raise an exception.
|
||||
raise exception.MultipleSQLDriversInConfig(
|
||||
config_file=config_file)
|
||||
self._any_sql = new_config['driver'].is_sql
|
||||
|
||||
try:
|
||||
domain_ref = assignment_api.get_domain_by_name(domain_name)
|
||||
except exception.DomainNotFound:
|
||||
|
@ -107,13 +124,15 @@ class DomainConfigs(dict):
|
|||
# options defined in this set of config files. Later, when we
|
||||
# service calls via this Manager, we'll index via this domain
|
||||
# config dict to make sure we call the right driver
|
||||
domain = domain_ref['id']
|
||||
self[domain] = {}
|
||||
self[domain]['cfg'] = cfg.ConfigOpts()
|
||||
config.configure(conf=self[domain]['cfg'])
|
||||
self[domain]['cfg'](args=[], project='keystone',
|
||||
default_config_files=file_list)
|
||||
self._load_driver(assignment_api, domain)
|
||||
domain_config = {}
|
||||
domain_config['cfg'] = cfg.ConfigOpts()
|
||||
config.configure(conf=domain_config['cfg'])
|
||||
domain_config['cfg'](args=[], project='keystone',
|
||||
default_config_files=file_list)
|
||||
domain_config['driver'] = self._load_driver(
|
||||
domain_config, assignment_api)
|
||||
assert_no_more_than_one_sql_driver(domain_config, file_list)
|
||||
self[domain_ref['id']] = domain_config
|
||||
|
||||
def setup_domain_drivers(self, standard_driver, assignment_api):
|
||||
# This is called by the api call wrapper
|
||||
|
@ -156,7 +175,8 @@ class DomainConfigs(dict):
|
|||
# read.
|
||||
if self.configured:
|
||||
if domain_id in self:
|
||||
self._load_driver(assignment_api, domain_id)
|
||||
self[domain_id]['driver'] = (
|
||||
self._load_driver(self[domain_id], assignment_api))
|
||||
else:
|
||||
# The standard driver
|
||||
self.driver = self.driver()
|
||||
|
@ -840,6 +860,11 @@ class Driver(object):
|
|||
"""Indicates if Driver supports domains."""
|
||||
return True
|
||||
|
||||
@property
|
||||
def is_sql(self):
|
||||
"""Indicates if this Driver uses SQL."""
|
||||
return False
|
||||
|
||||
@property
|
||||
def multiple_domains_supported(self):
|
||||
return (self.is_domain_aware() or
|
||||
|
|
|
@ -0,0 +1,5 @@
|
|||
# The domain-specific configuration file for the test domain
|
||||
# 'domain1' for use with unit tests.
|
||||
|
||||
[identity]
|
||||
driver = keystone.identity.backends.sql.Identity
|
|
@ -0,0 +1,5 @@
|
|||
# The domain-specific configuration file for the test domain
|
||||
# 'domain2' for use with unit tests.
|
||||
|
||||
[identity]
|
||||
driver = keystone.identity.backends.sql.Identity
|
|
@ -0,0 +1,14 @@
|
|||
# The domain-specific configuration file for the default domain for
|
||||
# use with unit tests.
|
||||
#
|
||||
# The domain_name of the default domain is 'Default', hence the
|
||||
# strange mix of upper/lower case in the file name.
|
||||
|
||||
[ldap]
|
||||
url = fake://memory
|
||||
user = cn=Admin
|
||||
password = password
|
||||
suffix = cn=example,cn=com
|
||||
|
||||
[identity]
|
||||
driver = keystone.identity.backends.ldap.Identity
|
|
@ -0,0 +1,5 @@
|
|||
# The domain-specific configuration file for the test domain
|
||||
# 'domain1' for use with unit tests.
|
||||
|
||||
[identity]
|
||||
driver = keystone.identity.backends.sql.Identity
|
|
@ -2021,8 +2021,105 @@ class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment):
|
|||
'group in our mapping table')
|
||||
|
||||
|
||||
class BaseMultiLDAPandSQLIdentity(object):
|
||||
"""Mixin class with support methods for domain-specific config testing."""
|
||||
|
||||
def create_user(self, domain_id):
|
||||
user = {'name': uuid.uuid4().hex,
|
||||
'domain_id': domain_id,
|
||||
'password': uuid.uuid4().hex,
|
||||
'enabled': True}
|
||||
user_ref = self.identity_api.create_user(user)
|
||||
# Put the password back in, since this is used later by tests to
|
||||
# authenticate.
|
||||
user_ref['password'] = user['password']
|
||||
return user_ref
|
||||
|
||||
def create_users_across_domains(self):
|
||||
"""Create a set of users, each with a role on their own domain."""
|
||||
|
||||
# We also will check that the right number of id mappings get created
|
||||
initial_mappings = len(mapping_sql.list_id_mappings())
|
||||
|
||||
self.users['user0'] = self.create_user(
|
||||
self.domains['domain_default']['id'])
|
||||
self.assignment_api.create_grant(
|
||||
user_id=self.users['user0']['id'],
|
||||
domain_id=self.domains['domain_default']['id'],
|
||||
role_id=self.role_member['id'])
|
||||
for x in range(1, self.domain_count):
|
||||
self.users['user%s' % x] = self.create_user(
|
||||
self.domains['domain%s' % x]['id'])
|
||||
self.assignment_api.create_grant(
|
||||
user_id=self.users['user%s' % x]['id'],
|
||||
domain_id=self.domains['domain%s' % x]['id'],
|
||||
role_id=self.role_member['id'])
|
||||
|
||||
# So how many new id mappings should have been created? One for each
|
||||
# user created in a domain that is using the non default driver..
|
||||
self.assertEqual(initial_mappings + self.domain_specific_count,
|
||||
len(mapping_sql.list_id_mappings()))
|
||||
|
||||
def check_user(self, user, domain_id, expected_status):
|
||||
"""Check user is in correct backend.
|
||||
|
||||
As part of the tests, we want to force ourselves to manually
|
||||
select the driver for a given domain, to make sure the entity
|
||||
ended up in the correct backend.
|
||||
|
||||
"""
|
||||
driver = self.identity_api._select_identity_driver(domain_id)
|
||||
unused, unused, entity_id = (
|
||||
self.identity_api._get_domain_driver_and_entity_id(
|
||||
user['id']))
|
||||
|
||||
if expected_status == 200:
|
||||
ref = driver.get_user(entity_id)
|
||||
ref = self.identity_api._set_domain_id_and_mapping(
|
||||
ref, domain_id, driver, map.EntityType.USER)
|
||||
user = user.copy()
|
||||
del user['password']
|
||||
self.assertDictEqual(ref, user)
|
||||
else:
|
||||
# TODO(henry-nash): Use AssertRaises here, although
|
||||
# there appears to be an issue with using driver.get_user
|
||||
# inside that construct
|
||||
try:
|
||||
driver.get_user(entity_id)
|
||||
except expected_status:
|
||||
pass
|
||||
|
||||
def setup_initial_domains(self):
|
||||
|
||||
def create_domain(domain):
|
||||
try:
|
||||
ref = self.assignment_api.create_domain(
|
||||
domain['id'], domain)
|
||||
except exception.Conflict:
|
||||
ref = (
|
||||
self.assignment_api.get_domain_by_name(domain['name']))
|
||||
return ref
|
||||
|
||||
self.domains = {}
|
||||
for x in range(1, self.domain_count):
|
||||
domain = 'domain%s' % x
|
||||
self.domains[domain] = create_domain(
|
||||
{'id': uuid.uuid4().hex, 'name': domain})
|
||||
self.domains['domain_default'] = create_domain(
|
||||
assignment.calc_default_domain())
|
||||
|
||||
def test_authenticate_to_each_domain(self):
|
||||
"""Test that a user in each domain can authenticate."""
|
||||
for user_num in range(self.domain_count):
|
||||
user = 'user%s' % user_num
|
||||
self.identity_api.authenticate(
|
||||
context={},
|
||||
user_id=self.users[user]['id'],
|
||||
password=self.users[user]['password'])
|
||||
|
||||
|
||||
class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
|
||||
tests.TestCase):
|
||||
tests.TestCase, BaseMultiLDAPandSQLIdentity):
|
||||
"""Class to test common SQL plus individual LDAP backends.
|
||||
|
||||
We define a set of domains and domain-specific backends:
|
||||
|
@ -2052,21 +2149,23 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
|
|||
sql.ModelBase.metadata.create_all(bind=self.engine)
|
||||
self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine)
|
||||
|
||||
self._setup_initial_test_data()
|
||||
self.domain_count = 5
|
||||
self.domain_specific_count = 3
|
||||
self.setup_initial_domains()
|
||||
self._setup_initial_users()
|
||||
|
||||
# All initial test data setup complete, time to switch on support
|
||||
# for separate backends per domain.
|
||||
|
||||
self.config_fixture.config(group='identity',
|
||||
domain_specific_drivers_enabled=True,
|
||||
domain_config_dir=tests.TESTSDIR)
|
||||
self.config_fixture.config(
|
||||
group='identity', domain_specific_drivers_enabled=True,
|
||||
domain_config_dir=tests.TESTCONF + '/domain_configs_multi_ldap')
|
||||
self.config_fixture.config(group='identity_mapping',
|
||||
backward_compatible_ids=False)
|
||||
|
||||
self._set_domain_configs()
|
||||
self.clear_database()
|
||||
self.load_fixtures(default_fixtures)
|
||||
self._create_users_across_domains()
|
||||
self.create_users_across_domains()
|
||||
|
||||
def config_overrides(self):
|
||||
super(MultiLDAPandSQLIdentity, self).config_overrides()
|
||||
|
@ -2079,89 +2178,17 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
|
|||
group='assignment',
|
||||
driver='keystone.assignment.backends.sql.Assignment')
|
||||
|
||||
def _create_user(self, domain_id):
|
||||
user = {'name': uuid.uuid4().hex,
|
||||
'domain_id': domain_id,
|
||||
'password': uuid.uuid4().hex,
|
||||
'enabled': True}
|
||||
user_ref = self.identity_api.create_user(user)
|
||||
# Put the password back in, since this is used later by tests to
|
||||
# authenticate.
|
||||
user_ref['password'] = user['password']
|
||||
return user_ref
|
||||
|
||||
def _setup_initial_test_data(self):
|
||||
|
||||
def create_domain(domain):
|
||||
try:
|
||||
ref = self.assignment_api.create_domain(
|
||||
domain['id'], domain)
|
||||
except exception.Conflict:
|
||||
ref = (
|
||||
self.assignment_api.get_domain_by_name(domain['name']))
|
||||
return ref
|
||||
|
||||
self.domains = {}
|
||||
self.domain_count = 5
|
||||
for x in range(1, self.domain_count):
|
||||
domain = 'domain%s' % x
|
||||
self.domains[domain] = create_domain(
|
||||
{'id': uuid.uuid4().hex, 'name': domain})
|
||||
self.domains['domain_default'] = create_domain(
|
||||
assignment.calc_default_domain())
|
||||
|
||||
def _setup_initial_users(self):
|
||||
# Create some identity entities BEFORE we switch to multi-backend, so
|
||||
# we can test that these are still accessible
|
||||
self.users = {}
|
||||
self.users['userA'] = self._create_user(
|
||||
self.users['userA'] = self.create_user(
|
||||
self.domains['domain_default']['id'])
|
||||
self.users['userB'] = self._create_user(
|
||||
self.users['userB'] = self.create_user(
|
||||
self.domains['domain1']['id'])
|
||||
self.users['userC'] = self._create_user(
|
||||
self.users['userC'] = self.create_user(
|
||||
self.domains['domain3']['id'])
|
||||
|
||||
def _create_users_across_domains(self):
|
||||
"""Create a set of users, each with a role on their own domain."""
|
||||
|
||||
# We also will check that the right number of id mappings get created
|
||||
initial_mappings = len(mapping_sql.list_id_mappings())
|
||||
|
||||
self.users['user0'] = self._create_user(
|
||||
self.domains['domain_default']['id'])
|
||||
self.assignment_api.create_grant(
|
||||
user_id=self.users['user0']['id'],
|
||||
domain_id=self.domains['domain_default']['id'],
|
||||
role_id=self.role_member['id'])
|
||||
for x in range(1, self.domain_count):
|
||||
self.users['user%s' % x] = self._create_user(
|
||||
self.domains['domain%s' % x]['id'])
|
||||
self.assignment_api.create_grant(
|
||||
user_id=self.users['user%s' % x]['id'],
|
||||
domain_id=self.domains['domain%s' % x]['id'],
|
||||
role_id=self.role_member['id'])
|
||||
|
||||
# So how many new id mappings should have been created? One for each
|
||||
# user created in a domain that is using the non default driver - i.e.
|
||||
# the default domain and domains 1 and 2 - so 3 new mappings.
|
||||
self.assertEqual(initial_mappings + 3,
|
||||
len(mapping_sql.list_id_mappings()))
|
||||
|
||||
def _set_domain_configs(self):
|
||||
# We need to load the domain configs explicitly to ensure the
|
||||
# test overrides are included.
|
||||
self.identity_api.domain_configs._load_config(
|
||||
self.identity_api.assignment_api,
|
||||
[tests.dirs.tests_conf('keystone.Default.conf')],
|
||||
'Default')
|
||||
self.identity_api.domain_configs._load_config(
|
||||
self.identity_api.assignment_api,
|
||||
[tests.dirs.tests_conf('keystone.domain1.conf')],
|
||||
'domain1')
|
||||
self.identity_api.domain_configs._load_config(
|
||||
self.identity_api.assignment_api,
|
||||
[tests.dirs.tests_conf('keystone.domain2.conf')],
|
||||
'domain2')
|
||||
|
||||
def reload_backends(self, domain_id):
|
||||
# Just reload the driver for this domain - which will pickup
|
||||
# any updated cfg
|
||||
|
@ -2208,34 +2235,10 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
|
|||
you can get the users via any of its domains
|
||||
|
||||
"""
|
||||
def check_user(user, domain_id, expected_status):
|
||||
# As part of the test, we want to force ourselves to manually
|
||||
# select the driver for this domain, to make sure the entity
|
||||
# ended up in the correct backend
|
||||
driver = self.identity_api._select_identity_driver(domain_id)
|
||||
unused, unused, entity_id = (
|
||||
self.identity_api._get_domain_driver_and_entity_id(
|
||||
user['id']))
|
||||
|
||||
if expected_status == 200:
|
||||
ref = driver.get_user(entity_id)
|
||||
ref = self.identity_api._set_domain_id_and_mapping(
|
||||
ref, domain_id, driver, map.EntityType.USER)
|
||||
user = user.copy()
|
||||
del user['password']
|
||||
self.assertDictEqual(ref, user)
|
||||
else:
|
||||
# TODO(henry-nash): Use AssertRaises here, although
|
||||
# there appears to be an issue with using driver.get_user
|
||||
# inside that construct
|
||||
try:
|
||||
driver.get_user(entity_id)
|
||||
except expected_status:
|
||||
pass
|
||||
|
||||
# Check that I can read a user with the appropriate domain-selected
|
||||
# driver, but won't find it via any other domain driver
|
||||
|
||||
check_user = self.check_user
|
||||
check_user(self.users['user0'],
|
||||
self.domains['domain_default']['id'], 200)
|
||||
for domain in [self.domains['domain1']['id'],
|
||||
|
@ -2308,15 +2311,6 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
|
|||
self.identity_api.get_user(self.users['userB']['id'])
|
||||
self.identity_api.get_user(self.users['userC']['id'])
|
||||
|
||||
def test_authenticate_to_each_domain(self):
|
||||
"""Test that a user in each domain can authenticate."""
|
||||
for user_num in range(5):
|
||||
user = 'user%s' % user_num
|
||||
self.identity_api.authenticate(
|
||||
context={},
|
||||
user_id=self.users[user]['id'],
|
||||
password=self.users[user]['password'])
|
||||
|
||||
def test_scanning_of_config_dir(self):
|
||||
"""Test the Manager class scans the config directory.
|
||||
|
||||
|
@ -2378,3 +2372,272 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
|
|||
self.assertRaises(exception.DomainNotFound,
|
||||
self.assignment_api.get_domain,
|
||||
domain['id'])
|
||||
|
||||
|
||||
class DomainSpecificLDAPandSQLIdentity(
|
||||
BaseLDAPIdentity, tests.SQLDriverOverrides, tests.TestCase,
|
||||
BaseMultiLDAPandSQLIdentity):
|
||||
"""Class to test when all domains use specific configs, including SQL.
|
||||
|
||||
We define a set of domains and domain-specific backends:
|
||||
|
||||
- A separate LDAP backend for the default domain
|
||||
- A separate SQL backend for domain1
|
||||
|
||||
Although the default driver still exists, we don't use it.
|
||||
|
||||
"""
|
||||
def setUp(self):
|
||||
self.useFixture(database.Database())
|
||||
super(DomainSpecificLDAPandSQLIdentity, self).setUp()
|
||||
self.initial_setup()
|
||||
|
||||
def initial_setup(self):
|
||||
# We aren't setting up any initial data ahead of switching to
|
||||
# domain-specific operation, so make the switch straight away.
|
||||
self.config_fixture.config(
|
||||
group='identity', domain_specific_drivers_enabled=True,
|
||||
domain_config_dir=(
|
||||
tests.TESTCONF + '/domain_configs_one_sql_one_ldap'))
|
||||
self.config_fixture.config(group='identity_mapping',
|
||||
backward_compatible_ids=False)
|
||||
|
||||
self.load_backends()
|
||||
|
||||
self.engine = sql.get_engine()
|
||||
self.addCleanup(sql.cleanup)
|
||||
|
||||
sql.ModelBase.metadata.create_all(bind=self.engine)
|
||||
self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine)
|
||||
|
||||
self.domain_count = 2
|
||||
self.domain_specific_count = 2
|
||||
self.setup_initial_domains()
|
||||
self.users = {}
|
||||
|
||||
self.clear_database()
|
||||
self.load_fixtures(default_fixtures)
|
||||
self.create_users_across_domains()
|
||||
|
||||
def config_overrides(self):
|
||||
super(DomainSpecificLDAPandSQLIdentity, self).config_overrides()
|
||||
# Make sure assignment is actually an SQL driver,
|
||||
# BaseLDAPIdentity causes this option to use LDAP.
|
||||
self.config_fixture.config(
|
||||
group='assignment',
|
||||
driver='keystone.assignment.backends.sql.Assignment')
|
||||
|
||||
def reload_backends(self, domain_id):
|
||||
# Just reload the driver for this domain - which will pickup
|
||||
# any updated cfg
|
||||
self.identity_api.domain_configs.reload_domain_driver(
|
||||
self.identity_api.assignment_api, domain_id)
|
||||
|
||||
def get_config(self, domain_id):
|
||||
# Get the config for this domain, will return CONF
|
||||
# if no specific config defined for this domain
|
||||
return self.identity_api.domain_configs.get_domain_conf(domain_id)
|
||||
|
||||
def test_list_domains(self):
|
||||
self.skipTest(
|
||||
'N/A: Not relevant for multi ldap testing')
|
||||
|
||||
def test_list_domains_non_default_domain_id(self):
|
||||
self.skipTest(
|
||||
'N/A: Not relevant for multi ldap testing')
|
||||
|
||||
def test_domain_crud(self):
|
||||
self.skipTest(
|
||||
'N/A: Not relevant for multi ldap testing')
|
||||
|
||||
def test_list_users(self):
|
||||
# Override the standard list users, since we have added an extra user
|
||||
# to the default domain, so the number of expected users is one more
|
||||
# than in the standard test.
|
||||
users = self.identity_api.list_users(
|
||||
domain_scope=self._set_domain_scope(
|
||||
CONF.identity.default_domain_id))
|
||||
self.assertEqual(len(default_fixtures.USERS) + 1, len(users))
|
||||
user_ids = set(user['id'] for user in users)
|
||||
expected_user_ids = set(getattr(self, 'user_%s' % user['id'])['id']
|
||||
for user in default_fixtures.USERS)
|
||||
expected_user_ids.add(self.users['user0']['id'])
|
||||
for user_ref in users:
|
||||
self.assertNotIn('password', user_ref)
|
||||
self.assertEqual(expected_user_ids, user_ids)
|
||||
|
||||
def test_domain_segregation(self):
|
||||
"""Test that separate configs have segregated the domain.
|
||||
|
||||
Test Plan:
|
||||
|
||||
- Users were created in each domain as part of setup, now make sure
|
||||
you can only find a given user in its relevant domain/backend
|
||||
- Make sure that for a backend that supports multiple domains
|
||||
you can get the users via any of its domains
|
||||
|
||||
"""
|
||||
# Check that I can read a user with the appropriate domain-selected
|
||||
# driver, but won't find it via any other domain driver
|
||||
|
||||
self.check_user(self.users['user0'],
|
||||
self.domains['domain_default']['id'], 200)
|
||||
self.check_user(self.users['user0'],
|
||||
self.domains['domain1']['id'], exception.UserNotFound)
|
||||
|
||||
self.check_user(self.users['user1'],
|
||||
self.domains['domain1']['id'], 200)
|
||||
self.check_user(self.users['user1'],
|
||||
self.domains['domain_default']['id'],
|
||||
exception.UserNotFound)
|
||||
|
||||
# Finally, going through the regular manager layer, make sure we
|
||||
# only see the right number of users in the non-default domain.
|
||||
|
||||
self.assertThat(
|
||||
self.identity_api.list_users(
|
||||
domain_scope=self.domains['domain1']['id']),
|
||||
matchers.HasLength(1))
|
||||
|
||||
def test_add_role_grant_to_user_and_project_404(self):
|
||||
self.skipTest('Blocked by bug 1101287')
|
||||
|
||||
def test_get_role_grants_for_user_and_project_404(self):
|
||||
self.skipTest('Blocked by bug 1101287')
|
||||
|
||||
def test_list_projects_for_user_with_grants(self):
|
||||
self.skipTest('Blocked by bug 1221805')
|
||||
|
||||
def test_get_roles_for_user_and_project_user_group_same_id(self):
|
||||
self.skipTest('N/A: We never generate the same ID for a user and '
|
||||
'group in our mapping table')
|
||||
|
||||
def test_user_id_comma(self):
|
||||
self.skipTest('Only valid if it is guaranteed to be talking to '
|
||||
'the fakeldap backend')
|
||||
|
||||
def test_user_id_comma_grants(self):
|
||||
self.skipTest('Only valid if it is guaranteed to be talking to '
|
||||
'the fakeldap backend')
|
||||
|
||||
|
||||
class DomainSpecificSQLIdentity(DomainSpecificLDAPandSQLIdentity):
|
||||
"""Class to test simplest use of domain-specific SQL driver.
|
||||
|
||||
The simplest use of an SQL domain-specific backend is when it is used to
|
||||
augment the standard case when LDAP is the default driver defined in the
|
||||
main config file. This would allow, for example, service users to be
|
||||
stored in SQL while LDAP handles the rest. Hence we define:
|
||||
|
||||
- The default driver uses the LDAP backend for the default domain
|
||||
- A separate SQL backend for domain1
|
||||
|
||||
"""
|
||||
def initial_setup(self):
|
||||
# We aren't setting up any initial data ahead of switching to
|
||||
# domain-specific operation, so make the switch straight away.
|
||||
self.config_fixture.config(
|
||||
group='identity', domain_specific_drivers_enabled=True,
|
||||
domain_config_dir=(
|
||||
tests.TESTCONF + '/domain_configs_default_ldap_one_sql'))
|
||||
# Part of the testing counts how many new mappings get created as
|
||||
# we create users, so ensure we are NOT using mapping for the default
|
||||
# LDAP domain so this doesn't confuse the calculation.
|
||||
self.config_fixture.config(group='identity_mapping',
|
||||
backward_compatible_ids=True)
|
||||
|
||||
self.load_backends()
|
||||
|
||||
self.engine = sql.get_engine()
|
||||
self.addCleanup(sql.cleanup)
|
||||
|
||||
sql.ModelBase.metadata.create_all(bind=self.engine)
|
||||
self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine)
|
||||
|
||||
self.domain_count = 2
|
||||
self.domain_specific_count = 1
|
||||
self.setup_initial_domains()
|
||||
self.users = {}
|
||||
|
||||
self.load_fixtures(default_fixtures)
|
||||
self.create_users_across_domains()
|
||||
|
||||
def config_overrides(self):
|
||||
super(DomainSpecificSQLIdentity, self).config_overrides()
|
||||
self.config_fixture.config(
|
||||
group='identity',
|
||||
driver='keystone.identity.backends.ldap.Identity')
|
||||
self.config_fixture.config(
|
||||
group='assignment',
|
||||
driver='keystone.assignment.backends.sql.Assignment')
|
||||
|
||||
def get_config(self, domain_id):
|
||||
if domain_id == CONF.identity.default_domain_id:
|
||||
return CONF
|
||||
else:
|
||||
return self.identity_api.domain_configs.get_domain_conf(domain_id)
|
||||
|
||||
def reload_backends(self, domain_id):
|
||||
if domain_id == CONF.identity.default_domain_id:
|
||||
self.load_backends()
|
||||
else:
|
||||
# Just reload the driver for this domain - which will pickup
|
||||
# any updated cfg
|
||||
self.identity_api.domain_configs.reload_domain_driver(
|
||||
self.identity_api.assignment_api, domain_id)
|
||||
|
||||
def test_default_sql_plus_sql_specific_driver_fails(self):
|
||||
# First confirm that if ldap is default driver, domain1 can be
|
||||
# loaded as sql
|
||||
self.config_fixture.config(
|
||||
group='identity',
|
||||
driver='keystone.identity.backends.ldap.Identity')
|
||||
self.config_fixture.config(
|
||||
group='assignment',
|
||||
driver='keystone.assignment.backends.sql.Assignment')
|
||||
self.load_backends()
|
||||
# Make any identity call to initiate the lazy loading of configs
|
||||
self.identity_api.list_users(
|
||||
domain_scope=CONF.identity.default_domain_id)
|
||||
self.assertIsNotNone(self.get_config(self.domains['domain1']['id']))
|
||||
|
||||
# Now re-initialize, but with sql as the default identity driver
|
||||
self.config_fixture.config(
|
||||
group='identity',
|
||||
driver='keystone.identity.backends.sql.Identity')
|
||||
self.config_fixture.config(
|
||||
group='assignment',
|
||||
driver='keystone.assignment.backends.sql.Assignment')
|
||||
self.load_backends()
|
||||
# Make any identity call to initiate the lazy loading of configs, which
|
||||
# should fail since we would now have two sql drivers.
|
||||
self.assertRaises(exception.MultipleSQLDriversInConfig,
|
||||
self.identity_api.list_users,
|
||||
domain_scope=CONF.identity.default_domain_id)
|
||||
|
||||
def test_multiple_sql_specific_drivers_fails(self):
|
||||
self.config_fixture.config(
|
||||
group='identity',
|
||||
driver='keystone.identity.backends.ldap.Identity')
|
||||
self.config_fixture.config(
|
||||
group='assignment',
|
||||
driver='keystone.assignment.backends.sql.Assignment')
|
||||
self.load_backends()
|
||||
# Ensure default, domain1 and domain2 exist
|
||||
self.domain_count = 3
|
||||
self.setup_initial_domains()
|
||||
# Make any identity call to initiate the lazy loading of configs
|
||||
self.identity_api.list_users(
|
||||
domain_scope=CONF.identity.default_domain_id)
|
||||
# This will only load domain1, since the domain2 config file is
|
||||
# not stored in the same location
|
||||
self.assertIsNotNone(self.get_config(self.domains['domain1']['id']))
|
||||
|
||||
# Now try and manually load a 2nd sql specific driver, for domain2,
|
||||
# which should fail.
|
||||
self.assertRaises(exception.MultipleSQLDriversInConfig,
|
||||
self.identity_api.domain_configs._load_config,
|
||||
self.identity_api.assignment_api,
|
||||
[tests.TESTCONF + '/domain_configs_one_extra_sql/' +
|
||||
'keystone.domain2.conf'],
|
||||
'domain2')
|
||||
|
|
Loading…
Reference in New Issue