multi-backend support for identity

The current code has a number of problems and limitations in its
support for having domain-specific backends (e.g. a different LDAP
server per domain).  Not least of the problems is that you cannot
always infer the domain if an API call is just handed a user_id or
group_id. These issues are so severe that this feature is currently
marked as experimental.

This patch fixes these issues by using a mapping layer to store
the domain and local ID for the public facing user and group IDs.
No API changes are required for this new support. An important
consequence of this change is that non-UUID IDs for backends
like LDAP do not escape from keystone.

To ensure backward compatibility with existing single backend
installations, the mapping is not used for the default driver.
An exception to this is that if a cloud provider wants to enable
mapping for the default LDAP driver then they can set a config
option to achieve this.

keystone-manage has been extended to provide options to purge
the mapping table.

Blueprint: multi-backend-uuids

Change-Id: I60f8965bb74b248e6a6c8f141289affa431ee3cf
This commit is contained in:
Henry Nash 2014-06-25 06:26:38 +01:00
parent 1cf858d451
commit 1a50986e7c
20 changed files with 1202 additions and 339 deletions

View File

@ -106,11 +106,9 @@ is not an absolute path, then Keystone looks for it in the same directories as a
Domain-specific Drivers
-----------------------
.. WARNING::
.. NOTE::
This feature is experimental and unsupported in Havana & Icehouse (with
several known issues that will not be fixed). Feedback welcome for
Juno!
This functionality is new in Juno.
Keystone supports the option (disabled by default) to specify identity driver
configurations on a domain by domain basis, allowing, for example, a specific
@ -132,6 +130,80 @@ the primary configuration file for the specified domain only. Domains without a
specific configuration file will continue to use the options from the primary
configuration file.
.. NOTE::
Keystone does not support moving the contents of a domain (i.e. "it's"
users and groups) from one backend to another, nor group membership across
backend boundaries.
Due to the need for user and group IDs to be unique across an OpenStack
installation and for Keystone to be able to deduce which domain and backend to
use from just a user or group ID, it dynamically builds a persistent identity
mapping table from a public ID to the actual domain, local ID (within that
backend) and entity type. The public ID is automatically generated by Keystone
when it first encounters the entity. If the local ID of the entity is from
a backend that does not guarantee to generate UUIDs, a hash algorithm will
generate a public ID for that entity, which is what will be exposed by
Keystone.
The use of a hash will ensure that if the public ID needs to be regenerated
then the same public ID will be created. This is useful if you are running
multiple keystones and want to ensure the same ID would be generated whichever
server you hit.
While Keystone will dynamically maintain the identity mapping, including
removing entries when entities are deleted via the Keystone, for those
entities in backends that are managed outside of Keystone (e.g. a Read Only
LDAP), Keystone will not know if entities have been deleted and hence will
continue to carry stale identity mappings in its table. While benign, keystone
provides an ability for operators to purge the mapping table of such stale
entries using the keystone-manage command, for example:
$ keystone-manage mapping_purge --domain-name DOMAINA --local-id abc@de.com
A typical usage would be for an operator to obtain a list of those entries
in an external backend that had been deleted out-of-band to Keystone, and then
call keystone-manage to purge those entries by specifying the domain and
local-id. The type of the entity (i.e. user or group) may also be specified
if this is needed to uniquely identify the mapping.
Since public IDs are be regeneratable **with the correct generator
implementation**, then, if the details of those entries that have
been deleted are not available, then it is safe to simply bulk purge
identity mappings periodically, for example:
$ keystone-manage mapping_purge --domain-name DOMAINA
will purge all the mappings for DOMAINA. The entire mapping table can be
purged with the following command:
$ keystone-manage mapping_purge --all
Public ID Generators
--------------------
Keystone supports a customizable public ID generator and it is specified in the
``[identity_mapping]`` section of the configuration file. Keystone provides a
sha256 generator as default, which produces regeneratable public IDs. The
generator algorithm for public IDs is a balance between key size (i.e. the
length of the public ID), the probability of collision and, in some
circumstances, the security of the public ID. The maximum length of public ID
supported by Keystone is 64 characters, and the default generator (sha256) uses
this full capability. Since the public ID is what is exposed externally by
Keystone and potentially stored in external systems, some installations may
wish to make use of other generator algorithms that have a different trade-off
of attributes. A different generator can be installed by configuring the
following property:
* ``generator`` - identity mapping generator. Defaults to
``keystone.identity.generators.sha256.Generator``
.. WARNING::
Changing the generator may cause all existing public IDs to be become
invalid, so typically the generator selection should be considered
immutable for a given installation.
Authentication Plugins
----------------------
@ -949,6 +1021,7 @@ through the normal REST API. At the moment, the following calls are supported:
* ``db_sync``: Sync the database schema.
* ``pki_setup``: Initialize the certificates for PKI based tokens.
* ``purge_mapping``: Purge user and group public ID mappings.
* ``ssl_setup``: Generate certificates for HTTPS.
Invoking ``keystone-manage`` by itself will give you additional usage

View File

@ -153,7 +153,9 @@ The contract for a driver for ``list_{entity}`` methods is therefore:
* It MUST return a list of entities of the specified type
* It MAY either just return all such entities, or alternatively reduce the
list by filtering for one or more of the specified filters in the passed
Hints reference, and removing any such satisfied filters.
Hints reference, and removing any such satisfied filters. An exception to
this is that for identity drivers that support domains, then they should
at least support filtering by domain_id.
Entity list truncation by drivers
---------------------------------
@ -170,6 +172,27 @@ honor any such limit if possible, but if it is unable to do so then it may
ignore it (and the truncation of the returned list of entities will happen at
the controller level).
Identity entity ID management between controllers and drivers
-------------------------------------------------------------
Keystone supports the option of having domain-specific backends for the
identity driver (i.e. for user and group storage), allowing, for example,
a different LDAP server for each domain. To ensure that Keystone can determine
to which backend it should route an API call, starting with Juno, the
identity manager will, provided that domain-specific backends are enabled,
build on-the-fly a persistent mapping table between Keystone Public IDs that
are presented to the controller and the domain that holds the entity, along
with whatever local ID is understood by the driver. This hides, for instance,
the LDAP specifics of whatever ID is being used.
To ensure backward compatibility, the default configuration of either a
single SQL or LDAP backend for Identity will not use the mapping table,
meaning that public facing IDs will be the unchanged. If keeping these IDs
the same for the default LDAP backend is not required, then setting the
configuration variable ``backward_compatible_ids`` to ``False`` will enable
the mapping for the default LDAP driver, hence hiding the LDAP specifics of the
IDs being used.
Testing
-------

View File

@ -456,7 +456,7 @@
#log_dir=<None>
# Use syslog for logging. Existing syslog format is DEPRECATED
# during I, and will chang in J to honor RFC5424. (boolean
# during I, and will change in J to honor RFC5424. (boolean
# value)
#use_syslog=false
@ -786,6 +786,39 @@
#list_limit=<None>
[identity_mapping]
#
# Options defined in keystone
#
# Keystone Identity Mapping backend driver. (string value)
#driver=keystone.identity.mapping_backends.sql.Mapping
# Public ID generator for user and group entities. The
# Keystone identity mapper only supports generators that
# produce no more than 64 characters. (string value)
#generator=keystone.identity.id_generators.sha256.Generator
# The format of user and group IDs changed in Juno for
# backends that do not generate UUIDs (e.g. LDAP), with
# keystone providing a hash mapping to the underlying
# attribute in LDAP. By default this mapping is disabled,
# which ensures that existing IDs will not change. Even when
# the mapping is enabled by using domain specific drivers, any
# users and groups from the default domain being handled by
# LDAP will still not be mapped to ensure their IDs remain
# backward compatible. Setting this value to False will enable
# the mapping for even the default LDAP driver. It is only
# safe to do this if you do not already have assignments for
# users and groups from the default LDAP domain, and it is
# acceptable for Keystone to provide the different IDs to
# clients than it did previously. Typically this means that
# the only time you can set this value to False is when
# configuring a fresh installation. (boolean value)
#backward_compatible_ids=true
[kvs]
#

View File

@ -379,9 +379,9 @@ class Manager(manager.Manager):
Users: Reference domains for grants
"""
user_refs = self.identity_api.list_users()
user_refs = self.identity_api.list_users(domain_scope=domain_id)
proj_refs = self.list_projects()
group_refs = self.identity_api.list_groups()
group_refs = self.identity_api.list_groups(domain_scope=domain_id)
# First delete the projects themselves
for project in proj_refs:
@ -399,8 +399,7 @@ class Manager(manager.Manager):
# Cleanup any existing groups.
if group['domain_id'] == domain_id:
try:
self.identity_api.delete_group(group['id'],
domain_scope=domain_id)
self.identity_api.delete_group(group['id'])
except exception.GroupNotFound:
LOG.debug(_('Group %(groupid)s not found when deleting '
'domain contents for %(domainid)s, continuing '
@ -411,8 +410,7 @@ class Manager(manager.Manager):
for user in user_refs:
if user['domain_id'] == domain_id:
try:
self.identity_api.delete_user(user['id'],
domain_scope=domain_id)
self.identity_api.delete_user(user['id'])
except exception.UserNotFound:
LOG.debug(_('User %(userid)s not found when '
'deleting domain contents for %(domainid)s, '

View File

@ -116,8 +116,7 @@ class Password(auth.AuthMethodHandler):
self.identity_api.authenticate(
context,
user_id=user_info.user_id,
password=user_info.password,
domain_scope=user_info.domain_id)
password=user_info.password)
except AssertionError:
# authentication failed because of invalid username or password
msg = _('Invalid username or password')

View File

@ -19,11 +19,13 @@ import os
from oslo.config import cfg
import pbr.version
from keystone import assignment
from keystone.common import openssl
from keystone.common import sql
from keystone.common.sql import migration_helpers
from keystone.common import utils
from keystone import config
from keystone import identity
from keystone.openstack.common.gettextutils import _
from keystone.openstack.common import log
from keystone import token
@ -174,9 +176,89 @@ class TokenFlush(BaseApp):
token_manager.driver.flush_expired_tokens()
class MappingPurge(BaseApp):
"""Purge the mapping table."""
name = 'mapping_purge'
@classmethod
def add_argument_parser(cls, subparsers):
parser = super(MappingPurge, cls).add_argument_parser(subparsers)
parser.add_argument('--all', default=False, action='store_true',
help=('Purge all mappings.'))
parser.add_argument('--domain-name', default=None,
help=('Purge any mappings for the domain '
'specified.'))
parser.add_argument('--public-id', default=None,
help=('Purge the mapping for the Public ID '
'specified.'))
parser.add_argument('--local-id', default=None,
help=('Purge the mappings for the Local ID '
'specified.'))
parser.add_argument('--type', default=None, choices=['user', 'group'],
help=('Purge any mappings for the type '
'specified.'))
return parser
@staticmethod
def main():
def validate_options():
# NOTE(henry-nash); It would be nice to use the argparse automated
# checking for this validation, but the only way I can see doing
# that is to make the default (i.e. if no optional parameters
# are specified) to purge all mappings - and that sounds too
# dangerous as a default. So we use it in a slightly
# unconventional way, where all parameters are optional, but you
# must specify at least one.
if (CONF.command.all is False and
CONF.command.domain_name is None and
CONF.command.public_id is None and
CONF.command.local_id is None and
CONF.command.type is None):
raise ValueError(_('At least one option must be provided'))
if (CONF.command.all is True and
(CONF.command.domain_name is not None or
CONF.command.public_id is not None or
CONF.command.local_id is not None or
CONF.command.type is not None)):
raise ValueError(_('--all option cannot be mixed with '
'other options'))
def get_domain_id(name):
try:
identity.Manager()
assignment_manager = assignment.Manager()
return assignment_manager.driver.get_domain_by_name(name)['id']
except KeyError:
raise ValueError(_("Unknown domain '%(name)s' specified by "
"--domain-name") % {'name': name})
validate_options()
# Now that we have validated the options, we know that at least one
# option has been specified, and if it was the --all option then this
# was the only option specified.
#
# The mapping dict is used to filter which mappings are purged, so
# leaving it empty means purge them all
mapping = {}
if CONF.command.domain_name is not None:
mapping['domain_id'] = get_domain_id(CONF.command.domain_name)
if CONF.command.public_id is not None:
mapping['public_id'] = CONF.command.public_id
if CONF.command.local_id is not None:
mapping['local_id'] = CONF.command.local_id
if CONF.command.type is not None:
mapping['type'] = CONF.command.type
mapping_manager = identity.MappingManager()
mapping_manager.driver.purge_mappings(mapping)
CMDS = [
DbSync,
DbVersion,
MappingPurge,
PKISetup,
SSLSetup,
TokenFlush,

View File

@ -168,6 +168,40 @@ FILE_OPTIONS = {
help='Maximum number of entities that will be returned in '
'an identity collection.'),
],
'identity_mapping': [
cfg.StrOpt('driver',
default=('keystone.identity.mapping_backends'
'.sql.Mapping'),
help='Keystone Identity Mapping backend driver.'),
cfg.StrOpt('generator',
default=('keystone.identity.id_generators'
'.sha256.Generator'),
help='Public ID generator for user and group entities. '
'The Keystone identity mapper only supports '
'generators that produce no more than 64 characters.'),
cfg.BoolOpt('backward_compatible_ids',
default=True,
help='The format of user and group IDs changed '
'in Juno for backends that do not generate UUIDs '
'(e.g. LDAP), with keystone providing a hash mapping '
'to the underlying attribute in LDAP. By default '
'this mapping is disabled, which ensures that '
'existing IDs will not change. Even when the '
'mapping is enabled by using domain specific '
'drivers, any users and groups from the default '
'domain being handled by LDAP will still not be '
'mapped to ensure their IDs remain backward '
'compatible. Setting this value to False will '
'enable the mapping for even the default LDAP '
'driver. It is only safe to do this if you do not '
'already have assignments for users and '
'groups from the default LDAP domain, and it is '
'acceptable for Keystone to provide the different '
'IDs to clients than it did previously. Typically '
'this means that the only time you can set this '
'value to False is when configuring a fresh '
'installation.'),
],
'trust': [
cfg.BoolOpt('enabled', default=True,
help='Delegation and impersonation features can be '

View File

@ -549,14 +549,46 @@ class V3Controller(wsgi.Application):
ref['id'] = uuid.uuid4().hex
return ref
def _get_domain_id_for_request(self, context):
"""Get the domain_id for a v3 call."""
def _get_domain_id_for_list_request(self, context):
"""Get the domain_id for a v3 list call.
if context['is_admin']:
return CONF.identity.default_domain_id
If we running with multiple domain drivers, then the caller must
specify a domain_id either as a filter or as part of the token scope.
# Fish the domain_id out of the token
#
"""
if not CONF.identity.domain_specific_drivers_enabled:
# We don't need to specify a domain ID in this case
return
if context['query_string'].get('domain_id') is not None:
return context['query_string'].get('domain_id')
try:
token_ref = self.token_api.get_token(context['token_id'])
token = token_ref['token_data']['token']
except exception.KeyError:
raise exception.ValidationError(
_('domain_id is required as part of entity'))
except exception.TokenNotFound:
LOG.warning(_('Invalid token found while getting domain ID '
'for list request'))
raise exception.Unauthorized()
if 'domain' in token:
return token['domain']['id']
else:
LOG.warning(
_('No domain information specified as part of list request'))
raise exception.Unauthorized()
def _get_domain_id_from_token(self, context):
"""Get the domain_id for a v3 create call.
In the case of a v3 create entity call that does not specify a domain
ID, the spec says that we should use the domain scoping from the token
being used.
"""
# We could make this more efficient by loading the domain_id
# into the context in the wrapper function above (since
# this version of normalize_domain will only be called inside
@ -564,19 +596,30 @@ class V3Controller(wsgi.Application):
# worth the duplication of state
try:
token_ref = self.token_api.get_token(context['token_id'])
except exception.KeyError:
# This might happen if we use the Admin token, for instance
raise exception.ValidationError(
_('A domain-scoped token must be used'))
except exception.TokenNotFound:
LOG.warning(_('Invalid token in _get_domain_id_for_request'))
LOG.warning(_('Invalid token found while getting domain ID '
'for list request'))
raise exception.Unauthorized()
if 'domain' in token_ref:
return token_ref['domain']['id']
if token_ref.get('token_data', {}).get('token', {}).get('domain', {}):
return token_ref['token_data']['token']['domain']['id']
else:
# TODO(henry-nash): We should issue an exception here since if
# a v3 call does not explicitly specify the domain_id in the
# entity, it should be using a domain scoped token. However,
# the current tempest heat tests issue a v3 call without this.
# This is raised as bug #1283539. Once this is fixed, we
# should remove the line below and replace it with an error.
return CONF.identity.default_domain_id
def _normalize_domain_id(self, context, ref):
"""Fill in domain_id if not specified in a v3 call."""
if 'domain_id' not in ref:
ref['domain_id'] = self._get_domain_id_for_request(context)
ref['domain_id'] = self._get_domain_id_from_token(context)
return ref
@staticmethod

View File

@ -191,6 +191,12 @@ class ImmutableAttributeError(Forbidden):
"'%(attributes)s' in target %(target)s")
class CrossBackendNotAllowed(Forbidden):
message_format = _("Group membership across backend boundaries is not "
"allowed, group in question is %(group_id)s, "
"user is %(user_id)s")
class NotFound(Error):
message_format = _("Could not find: %(target)s")
code = 404
@ -274,6 +280,12 @@ class FederatedProtocolNotFound(NotFound):
" Identity Provider: %(idp_id)s")
class PublicIDNotFound(NotFound):
# This is used internally and mapped to either User/GroupNotFound or,
# Assertion before the exception leaves Keystone.
message_format = "%(id)s"
class Conflict(Error):
message_format = _("Conflict occurred attempting to store %(type)s -"
" %(details)s")

View File

@ -60,9 +60,6 @@ class Identity(kvs.Base, identity.Driver):
def default_assignment_driver(self):
return "keystone.assignment.backends.kvs.Assignment"
def is_domain_aware(self):
return True
# Public interface
def authenticate(self, user_id, password):
user_ref = None
@ -156,8 +153,6 @@ class Identity(kvs.Base, identity.Driver):
new_user = old_user.copy()
user = utils.hash_user_password(user)
new_user.update(user)
if new_user['id'] != user_id:
raise exception.ValidationError('Cannot change user ID')
self.db.delete(self._calc_user_name_key(old_user['name'], domain_id))
self.db.set('user-%s' % user_id, new_user)
user_name_key = self._calc_user_name_key(new_user['name'], domain_id)

View File

@ -49,6 +49,9 @@ class Identity(identity.Driver):
def is_domain_aware(self):
return False
def generates_uuids(self):
return False
# Identity interface
def authenticate(self, user_id, password):
@ -93,8 +96,6 @@ class Identity(identity.Driver):
def update_user(self, user_id, user):
self.user.check_allow_update()
if 'id' in user and user['id'] != user_id:
raise exception.ValidationError(_('Cannot change user ID'))
old_obj = self.user.get(user_id)
if 'name' in user and old_obj.get('name') != user['name']:
raise exception.Conflict(_('Cannot change user name'))

View File

@ -91,9 +91,6 @@ class Identity(identity.Driver):
"""
return utils.check_password(password, user_ref.password)
def is_domain_aware(self):
return True
# Identity interface
def authenticate(self, user_id, password):
session = sql.get_session()
@ -148,8 +145,6 @@ class Identity(identity.Driver):
@sql.handle_conflicts(conflict_type='user')
def update_user(self, user_id, user):
session = sql.get_session()
if 'id' in user and user_id != user['id']:
raise exception.ValidationError(_('Cannot change user ID'))
with session.begin():
user_ref = self._get_user(session, user_id)

View File

@ -14,8 +14,6 @@
"""Workflow Logic the Identity service."""
import functools
from keystone.common import controller
from keystone.common import dependency
from keystone import config
@ -46,7 +44,8 @@ class User(controller.V2Controller):
context, context['query_string'].get('name'))
self.assert_admin(context)
user_list = self.identity_api.list_users()
user_list = self.identity_api.list_users(
CONF.identity.default_domain_id)
return {'users': self.v3_to_v2_user(user_list)}
@controller.v2_deprecated
@ -217,67 +216,47 @@ class UserV3(controller.V3Controller):
def list_users(self, context, filters):
hints = UserV3.build_driver_hints(context, filters)
refs = self.identity_api.list_users(
domain_scope=self._get_domain_id_for_request(context),
domain_scope=self._get_domain_id_for_list_request(context),
hints=hints)
return UserV3.wrap_collection(context, refs, hints=hints)
@controller.filterprotected('domain_id', 'enabled', 'name')
def list_users_in_group(self, context, filters, group_id):
hints = UserV3.build_driver_hints(context, filters)
refs = self.identity_api.list_users_in_group(
group_id,
domain_scope=self._get_domain_id_for_request(context),
hints=hints)
refs = self.identity_api.list_users_in_group(group_id, hints=hints)
return UserV3.wrap_collection(context, refs, hints=hints)
@controller.protected()
def get_user(self, context, user_id):
ref = self.identity_api.get_user(
user_id,
domain_scope=self._get_domain_id_for_request(context))
ref = self.identity_api.get_user(user_id)
return UserV3.wrap_member(context, ref)
def _update_user(self, context, user_id, user, domain_scope):
def _update_user(self, context, user_id, user):
self._require_matching_id(user_id, user)
self._require_matching_domain_id(
user_id, user,
functools.partial(self.identity_api.get_user,
domain_scope=domain_scope))
ref = self.identity_api.update_user(
user_id, user, domain_scope=domain_scope)
user_id, user, self.identity_api.get_user)
ref = self.identity_api.update_user(user_id, user)
return UserV3.wrap_member(context, ref)
@controller.protected()
def update_user(self, context, user_id, user):
domain_scope = self._get_domain_id_for_request(context)
return self._update_user(context, user_id, user, domain_scope)
return self._update_user(context, user_id, user)
@controller.protected(callback=_check_user_and_group_protection)
def add_user_to_group(self, context, user_id, group_id):
self.identity_api.add_user_to_group(
user_id, group_id,
domain_scope=self._get_domain_id_for_request(context))
self.identity_api.add_user_to_group(user_id, group_id)
@controller.protected(callback=_check_user_and_group_protection)
def check_user_in_group(self, context, user_id, group_id):
self.identity_api.check_user_in_group(
user_id, group_id,
domain_scope=self._get_domain_id_for_request(context))
return self.identity_api.check_user_in_group(user_id, group_id)
@controller.protected(callback=_check_user_and_group_protection)
def remove_user_from_group(self, context, user_id, group_id):
self.identity_api.remove_user_from_group(
user_id, group_id,
domain_scope=self._get_domain_id_for_request(context))
self.identity_api.remove_user_from_group(user_id, group_id)
@controller.protected()
def delete_user(self, context, user_id):
# Make sure any tokens are marked as deleted
domain_id = self._get_domain_id_for_request(context)
# Finally delete the user itself - the backend is
# responsible for deleting any role assignments related
# to this user
return self.identity_api.delete_user(user_id, domain_scope=domain_id)
return self.identity_api.delete_user(user_id)
@controller.protected()
def change_password(self, context, user_id, user):
@ -290,11 +269,9 @@ class UserV3(controller.V3Controller):
if password is None:
raise exception.ValidationError(target='user',
attribute='password')
domain_scope = self._get_domain_id_for_request(context)
try:
self.identity_api.change_password(
context, user_id, original_password, password, domain_scope)
context, user_id, original_password, password)
except AssertionError:
raise exception.Unauthorized()
@ -322,40 +299,29 @@ class GroupV3(controller.V3Controller):
def list_groups(self, context, filters):
hints = GroupV3.build_driver_hints(context, filters)
refs = self.identity_api.list_groups(
domain_scope=self._get_domain_id_for_request(context),
domain_scope=self._get_domain_id_for_list_request(context),
hints=hints)
return GroupV3.wrap_collection(context, refs, hints=hints)
@controller.filterprotected('name')
def list_groups_for_user(self, context, filters, user_id):
hints = GroupV3.build_driver_hints(context, filters)
refs = self.identity_api.list_groups_for_user(
user_id,
domain_scope=self._get_domain_id_for_request(context),
hints=hints)
refs = self.identity_api.list_groups_for_user(user_id, hints=hints)
return GroupV3.wrap_collection(context, refs, hints=hints)
@controller.protected()
def get_group(self, context, group_id):
ref = self.identity_api.get_group(
group_id,
domain_scope=self._get_domain_id_for_request(context))
ref = self.identity_api.get_group(group_id)
return GroupV3.wrap_member(context, ref)
@controller.protected()
def update_group(self, context, group_id, group):
self._require_matching_id(group_id, group)
domain_scope = self._get_domain_id_for_request(context)
self._require_matching_domain_id(
group_id, group,
functools.partial(self.identity_api.get_group,
domain_scope=domain_scope))
ref = self.identity_api.update_group(
group_id, group,
domain_scope=domain_scope)
group_id, group, self.identity_api.get_group)
ref = self.identity_api.update_group(group_id, group)
return GroupV3.wrap_member(context, ref)
@controller.protected()
def delete_group(self, context, group_id):
domain_id = self._get_domain_id_for_request(context)
self.identity_api.delete_group(group_id, domain_scope=domain_id)
self.identity_api.delete_group(group_id)

View File

@ -28,6 +28,7 @@ from keystone.common import driver_hints
from keystone.common import manager
from keystone import config
from keystone import exception
from keystone.identity.mapping_backends import mapping
from keystone import notifications
from keystone.openstack.common.gettextutils import _
from keystone.openstack.common import importutils
@ -176,19 +177,37 @@ def domains_configured(f):
def wrapper(self, *args, **kwargs):
if (not self.domain_configs.configured and
CONF.identity.domain_specific_drivers_enabled):
LOG.warning(_(
'Running an experimental and unsupported configuration '
'(domain_specific_drivers_enabled = True); '
'this will result in known issues.'))
self.domain_configs.setup_domain_drivers(
self.driver, self.assignment_api)
return f(self, *args, **kwargs)
return wrapper
def exception_translated(exception_type):
"""Wraps API calls to map to correct exception."""
def _exception_translated(f):
@functools.wraps(f)
def wrapper(self, *args, **kwargs):
try:
return f(self, *args, **kwargs)
except exception.PublicIDNotFound as e:
if exception_type == 'user':
raise exception.UserNotFound(user_id=e.message)
elif exception_type == 'group':
raise exception.GroupNotFound(group_id=e.message)
elif exception_type == 'assertion':
raise AssertionError(_('Invalid user / password'))
else:
raise
return wrapper
return _exception_translated
@dependency.provider('identity_api')
@dependency.optional('revoke_api')
@dependency.requires('assignment_api', 'credential_api', 'token_api')
@dependency.requires('assignment_api', 'credential_api', 'id_mapping_api',
'token_api')
class Manager(manager.Manager):
"""Default pivot point for the Identity backend.
@ -205,7 +224,26 @@ class Manager(manager.Manager):
Each of the identity calls are pre-processed here to choose, based on
domain, which of the drivers should be called. The non-domain-specific
driver is still in place, and is used if there is no specific driver for
the domain in question.
the domain in question (or we are not using multiple domain drivers).
Starting with Juno, in order to be able to obtain the domain from
just an ID being presented as part of an API call, a public ID to domain
and local ID mapping is maintained. This mapping also allows for the local
ID of drivers that do not provide simple UUIDs (such as LDAP) to be
referenced via a public facing ID. The mapping itself is automatically
generated as entities are accessed via the driver.
This mapping is only used when:
- the entity is being handled by anything other than the default driver, or
- the entity is being handled by the default LDAP driver and backward
compatible IDs are not required.
This means that in the standard case of a single SQL backend or the default
settings of a single LDAP backend (since backward compatible IDs is set to
True by default), no mapping is used. An alternative approach would be to
always use the mapping table, but in the cases where we don't need it to
make the public and local IDs the same. It is felt that not using the
mapping by default is a more prudent way to introduce this functionality.
"""
_USER = 'user'
@ -217,45 +255,227 @@ class Manager(manager.Manager):
# Domain ID normalization methods
def _set_domain_id(self, ref, domain_id):
def _set_domain_id_and_mapping(self, ref, domain_id, driver,
entity_type):
"""Patch the domain_id/public_id into the resulting entity(ies).
:param ref: the entity or list of entities to post process
:param domain_id: the domain scope used for the call
:param driver: the driver used to execute the call
:param entity_type: whether this is a user or group
:returns: post processed entity or list or entities
Called to post-process the entity being returned, using a mapping
to substitute a public facing ID as necessary. This method must
take into account:
- If the driver is not domain aware, then we must set the domain
attribute of all entities irrespective of mapping.
- If the driver does not support UUIDs, then we always want to provide
a mapping, except for the special case of this being the default
driver and backward_compatible_ids is set to True. This is to ensure
that entity IDs do not change for an existing LDAP installation (only
single domain/driver LDAP configurations were previously supported).
- If the driver does support UUIDs, then we always create a mapping
entry, but use the local UUID as the public ID. The exception to
- this is that if we just have single driver (i.e. not using specific
multi-domain configs), then we don't both with the mapping at all.
"""
conf = CONF.identity
if (driver is self.driver and driver.generates_uuids() and
driver.is_domain_aware()):
# The default driver that needs no help, e.g. SQL
return ref
LOG.debug('ID Mapping - Domain ID: %(domain)s, '
'Default Driver: %(driver)s, '
'Domains: %(aware)s, UUIDs: %(generate)s, '
'Compatible IDs: %(compat)s',
{'domain': domain_id,
'driver': (driver == self.driver),
'aware': driver.is_domain_aware(),
'generate': driver.generates_uuids(),
'compat': CONF.identity_mapping.backward_compatible_ids})
if isinstance(ref, dict):
LOG.debug('Local ID: %s', ref['id'])
ref = ref.copy()
ref['domain_id'] = domain_id
# If the driver can't handle domains, then we need to insert the
# domain_id into the entity being returned. If the domain_id is
# None that means we are running in a single backend mode, so to
# remain backwardly compatible, we put in the default domain ID.
if not driver.is_domain_aware():
if domain_id is None:
domain_id = conf.default_domain_id
ref['domain_id'] = domain_id
# There are two situations where we must now use the mapping:
# - this isn't the default driver (i.e. multiple backends), or
# - we have a single backend that doesn't use UUIDs
# The exception to the above is that we must honor backward
# compatibility if this is the default driver (e.g. to support
# current LDAP)
if (driver is not self.driver or
(not driver.generates_uuids() and
not CONF.identity_mapping.backward_compatible_ids)):
local_entity = {'domain_id': ref['domain_id'],
'local_id': ref['id'],
'entity_type': entity_type}
public_id = self.id_mapping_api.get_public_id(local_entity)
if public_id:
ref['id'] = public_id
LOG.debug('Found existing mapping to public ID: %s',
ref['id'])
else:
# Need to create a mapping. If the driver generates UUIDs
# then pass the local UUID in as the public ID to use.
if driver.generates_uuids():
public_id = ref['id']
ref['id'] = self.id_mapping_api.create_id_mapping(
local_entity, public_id)
LOG.debug('Created new mapping to public ID: %s',
ref['id'])
return ref
elif isinstance(ref, list):
return [self._set_domain_id(x, domain_id) for x in ref]
return [self._set_domain_id_and_mapping(
x, domain_id, driver, entity_type) for x in ref]
else:
raise ValueError(_('Expected dict or list: %s') % type(ref))
def _clear_domain_id(self, ref):
# Clear the domain_id, and then check to ensure that if this
# was not the default domain, it is being handled by its own
# backend driver.
ref = ref.copy()
domain_id = ref.pop('domain_id', CONF.identity.default_domain_id)
if (domain_id != CONF.identity.default_domain_id and
domain_id not in self.domain_configs):
raise exception.DomainNotFound(domain_id=domain_id)
def _clear_domain_id_if_domain_unaware(self, driver, ref):
"""Clear domain_id details if driver is not domain aware."""
if not driver.is_domain_aware() and 'domain_id' in ref:
ref = ref.copy()
ref.pop('domain_id')
return ref
def _normalize_scope(self, domain_scope):
if domain_scope is None:
return CONF.identity.default_domain_id
else:
return domain_scope
def _select_identity_driver(self, domain_id):
driver = self.domain_configs.get_domain_driver(domain_id)
if driver:
return driver
else:
self.assignment_api.get_domain(domain_id)
return self.driver
"""Choose a backend driver for the given domain_id.
def _get_domain_id_and_driver(self, domain_scope):
domain_id = self._normalize_scope(domain_scope)
driver = self._select_identity_driver(domain_id)
return (domain_id, driver)
:param domain_id: The domain_id for which we want to find a driver. If
the domain_id is specified as None, then this means
we need a driver that handles multiple domains.
:returns: chosen backend driver
If there is a specific driver defined for this domain then choose it.
If the domain is None, or there no specific backend for the given
domain is found, then we chose the default driver.
"""
if domain_id is None:
driver = self.driver
else:
driver = (self.domain_configs.get_domain_driver(domain_id) or
self.driver)
# If the driver is not domain aware (e.g. LDAP) then check to
# ensure we are not mapping multiple domains onto it - the only way
# that would happen is that the default driver is LDAP and the
# domain is anything other than None or the default domain.
if (not driver.is_domain_aware() and driver == self.driver and
domain_id != CONF.identity.default_domain_id and
domain_id is not None):
LOG.warning('Found multiple domains being mapped to a '
'driver that does not support that (e.g. '
'LDAP) - Domain ID: %(domain)s, '
'Default Driver: %(driver)s',
{'domain': domain_id,
'driver': (driver == self.driver)})
raise exception.DomainNotFound(domain_id=domain_id)
return driver
def _get_domain_driver_and_entity_id(self, public_id):
"""Look up details using the public ID.
:param public_id: the ID provided in the call
:returns: domain_id, which can be None to indicate that the driver
in question supports multiple domains
driver selected based on this domain
entity_id which will is understood by the driver.
Use the mapping table to look up the domain, driver and local entity
that is represented by the provided public ID. Handle the situations
were we do not use the mapping (e.g. single driver that understands
UUIDs etc.)
"""
conf = CONF.identity
# First, since we don't know anything about the entity yet, we must
# assume it needs mapping, so long as we are using domain specific
# drivers.
if conf.domain_specific_drivers_enabled:
local_id_ref = self.id_mapping_api.get_id_mapping(public_id)
if local_id_ref:
return (
local_id_ref['domain_id'],
self._select_identity_driver(local_id_ref['domain_id']),
local_id_ref['local_id'])
# So either we are using multiple drivers but the public ID is invalid
# (and hence was not found in the mapping table), or the public ID is
# being handled by the default driver. Either way, the only place left
# to look is in that standard driver. However, we don't yet know if
# this driver also needs mapping (e.g. LDAP in non backward
# compatibility mode).
driver = self.driver
if driver.generates_uuids():
if driver.is_domain_aware:
# No mapping required, and the driver can handle the domain
# information itself. The classic case of this is the
# current SQL driver.
return (None, driver, public_id)
else:
# Although we don't have any drivers of this type, i.e. that
# understand UUIDs but not domains, conceptually you could.
return (conf.default_domain_id, driver, public_id)
# So the only place left to find the ID is in the default driver which
# we now know doesn't generate UUIDs
if not CONF.identity_mapping.backward_compatible_ids:
# We are not running in backward compatibility mode, so we
# must use a mapping.
local_id_ref = self.id_mapping_api.get_id_mapping(public_id)
if local_id_ref:
return (
local_id_ref['domain_id'],
driver,
local_id_ref['local_id'])
else:
raise exception.PublicIDNotFound(id=public_id)
# If we reach here, this means that the default driver
# requires no mapping - but also doesn't understand domains
# (e.g. the classic single LDAP driver situation). Hence we pass
# back the public_ID unmodified and use the default domain (to
# keep backwards compatibility with existing installations).
#
# It is still possible that the public ID is just invalid in
# which case we leave this to the caller to check.
return (conf.default_domain_id, driver, public_id)
def _assert_user_and_group_in_same_backend(
self, user_entity_id, user_driver, group_entity_id, group_driver):
"""Ensures that user and group IDs are backed by the same backend.
Raise a CrossBackendNotAllowed exception if they are not from the same
backend, otherwise return None.
"""
if user_driver is not group_driver:
# Determine first if either IDs don't exist by calling
# the driver.get methods (which will raise a NotFound
# exception).
user_driver.get_user(user_entity_id)
group_driver.get_group(group_entity_id)
# If we get here, then someone is attempting to create a cross
# backend membership, which is not allowed.
raise exception.CrossBackendNotAllowed(group_id=group_entity_id,
user_id=user_entity_id)
def _mark_domain_id_filter_satisfied(self, hints):
if hints:
@ -264,142 +484,178 @@ class Manager(manager.Manager):
filter['comparator'] == 'equals'):
hints.filters.remove(filter)
def _ensure_domain_id_in_hints(self, hints, domain_id):
if (domain_id is not None and
not hints.get_exact_filter_by_name('domain_id')):
hints.add_filter('domain_id', domain_id)
# The actual driver calls - these are pre/post processed here as
# part of the Manager layer to make sure we:
#
# - select the right driver for this domain
# - clear/set domain_ids for drivers that do not support domains
# - create any ID mapping that might be required
@notifications.emit_event('authenticate')
@domains_configured
def authenticate(self, context, user_id, password, domain_scope=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
ref = driver.authenticate(user_id, password)
if not driver.is_domain_aware():
ref = self._set_domain_id(ref, domain_id)
return ref
@exception_translated('assertion')
def authenticate(self, context, user_id, password):
domain_id, driver, entity_id = (
self._get_domain_driver_and_entity_id(user_id))
ref = driver.authenticate(entity_id, password)
return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.USER)
@notifications.created(_USER, result_id_arg_attr='id')
@domains_configured
@exception_translated('user')
def create_user(self, user_ref):
user = user_ref.copy()
user['name'] = clean.user_name(user['name'])
user.setdefault('enabled', True)
user['enabled'] = clean.user_enabled(user['enabled'])
domain_id = user['domain_id']
self.assignment_api.get_domain(domain_id)
# For creating a user, the domain is in the object itself
domain_id = user_ref['domain_id']
driver = self._select_identity_driver(domain_id)
if not driver.is_domain_aware():
user = self._clear_domain_id(user)
user = self._clear_domain_id_if_domain_unaware(driver, user)
# Generate a local ID - in the future this might become a function of
# the underlying driver so that it could conform to rules set down by
# that particular driver type.
user['id'] = uuid.uuid4().hex
ref = driver.create_user(user['id'], user)
if not driver.is_domain_aware():
ref = self._set_domain_id(ref, domain_id)
return ref
return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.USER)
@domains_configured
def get_user(self, user_id, domain_scope=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
ref = driver.get_user(user_id)
if not driver.is_domain_aware():
ref = self._set_domain_id(ref, domain_id)
return ref
@exception_translated('user')
def get_user(self, user_id):
domain_id, driver, entity_id = (
self._get_domain_driver_and_entity_id(user_id))
ref = driver.get_user(entity_id)
return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.USER)
@domains_configured
@exception_translated('user')
def get_user_by_name(self, user_name, domain_id):
driver = self._select_identity_driver(domain_id)
ref = driver.get_user_by_name(user_name, domain_id)
if not driver.is_domain_aware():
ref = self._set_domain_id(ref, domain_id)
return ref
return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.USER)
@manager.response_truncated
@domains_configured
@exception_translated('user')
def list_users(self, domain_scope=None, hints=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
if not driver.is_domain_aware():
driver = self._select_identity_driver(domain_scope)
hints = hints or driver_hints.Hints()
if driver.is_domain_aware():
# Force the domain_scope into the hint to ensure that we only get
# back domains for that scope.
self._ensure_domain_id_in_hints(hints, domain_scope)
else:
# We are effectively satisfying any domain_id filter by the above
# driver selection, so remove any such filter
# driver selection, so remove any such filter.
self._mark_domain_id_filter_satisfied(hints)
ref_list = driver.list_users(hints or driver_hints.Hints())
if not driver.is_domain_aware():
ref_list = self._set_domain_id(ref_list, domain_id)
return ref_list
ref_list = driver.list_users(hints)
return self._set_domain_id_and_mapping(
ref_list, domain_scope, driver, mapping.EntityType.USER)
@notifications.updated(_USER)
@domains_configured
def update_user(self, user_id, user_ref, domain_scope=None):
@exception_translated('user')
def update_user(self, user_id, user_ref):
user = user_ref.copy()
if 'name' in user:
user['name'] = clean.user_name(user['name'])
if 'enabled' in user:
user['enabled'] = clean.user_enabled(user['enabled'])
if 'domain_id' in user:
self.assignment_api.get_domain(user['domain_id'])
if 'id' in user:
if user_id != user['id']:
raise exception.ValidationError(_('Cannot change user ID'))
# Since any ID in the user dict is now irrelevant, remove its so as
# the driver layer won't be confused by the fact the this is the
# public ID not the local ID
user.pop('id')
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
if not driver.is_domain_aware():
user = self._clear_domain_id(user)
ref = driver.update_user(user_id, user)
domain_id, driver, entity_id = (
self._get_domain_driver_and_entity_id(user_id))
user = self._clear_domain_id_if_domain_unaware(driver, user)
ref = driver.update_user(entity_id, user)
if user.get('enabled') is False or user.get('password') is not None:
if self.revoke_api:
self.revoke_api.revoke_by_user(user_id)
self.token_api.delete_tokens_for_user(user_id)
if not driver.is_domain_aware():
ref = self._set_domain_id(ref, domain_id)
return ref
return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.USER)
@notifications.deleted(_USER)
@domains_configured
def delete_user(self, user_id, domain_scope=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
driver.delete_user(user_id)
@exception_translated('user')
def delete_user(self, user_id):
domain_id, driver, entity_id = (
self._get_domain_driver_and_entity_id(user_id))
driver.delete_user(entity_id)
self.credential_api.delete_credentials_for_user(user_id)
self.token_api.delete_tokens_for_user(user_id)
self.id_mapping_api.delete_id_mapping(user_id)
@notifications.created(_GROUP, result_id_arg_attr='id')
@domains_configured
@exception_translated('group')
def create_group(self, group_ref):
group = group_ref.copy()
group.setdefault('description', '')
domain_id = group['domain_id']
self.assignment_api.get_domain(domain_id)
# For creating a group, the domain is in the object itself
domain_id = group_ref['domain_id']
driver = self._select_identity_driver(domain_id)
if not driver.is_domain_aware():
group = self._clear_domain_id(group)
group = self._clear_domain_id_if_domain_unaware(driver, group)
# Generate a local ID - in the future this might become a function of
# the underlying driver so that it could conform to rules set down by
# that particular driver type.
group['id'] = uuid.uuid4().hex
ref = driver.create_group(group['id'], group)
if not driver.is_domain_aware():
ref = self._set_domain_id(ref, domain_id)
return ref
return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.GROUP)
@domains_configured
def get_group(self, group_id, domain_scope=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
ref = driver.get_group(group_id)
if not driver.is_domain_aware():
ref = self._set_domain_id(ref, domain_id)
return ref
@exception_translated('group')
def get_group(self, group_id):
domain_id, driver, entity_id = (
self._get_domain_driver_and_entity_id(group_id))
ref = driver.get_group(entity_id)
return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.GROUP)
@notifications.updated(_GROUP)
@domains_configured
def update_group(self, group_id, group, domain_scope=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
if not driver.is_domain_aware():
group = self._clear_domain_id(group)
ref = driver.update_group(group_id, group)
if not driver.is_domain_aware():
ref = self._set_domain_id(ref, domain_id)
return ref
@exception_translated('group')
def update_group(self, group_id, group):
if 'domain_id' in group:
self.assignment_api.get_domain(group['domain_id'])
domain_id, driver, entity_id = (
self._get_domain_driver_and_entity_id(group_id))
group = self._clear_domain_id_if_domain_unaware(driver, group)
ref = driver.update_group(entity_id, group)
return self._set_domain_id_and_mapping(
ref, domain_id, driver, mapping.EntityType.GROUP)
def revoke_tokens_for_group(self, group_id, domain_scope):
def revoke_tokens_for_group(self, group_id):
# We get the list of users before we attempt the group
# deletion, so that we can remove these tokens after we know
# the group deletion succeeded.
# TODO(ayoung): revoke based on group and roleids instead
user_ids = []
for u in self.list_users_in_group(group_id, domain_scope):
for u in self.list_users_in_group(group_id):
user_ids.append(u['id'])
if self.revoke_api:
self.revoke_api.revoke_by_user(u['id'])
@ -407,23 +663,54 @@ class Manager(manager.Manager):
@notifications.deleted(_GROUP)
@domains_configured
def delete_group(self, group_id, domain_scope=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
@exception_translated('group')
def delete_group(self, group_id):
domain_id, driver, entity_id = (
self._get_domain_driver_and_entity_id(group_id))
# As well as deleting the group, we need to invalidate
# any tokens for the users who are members of the group.
self.revoke_tokens_for_group(group_id, domain_scope)
driver.delete_group(group_id)
self.revoke_tokens_for_group(group_id)
driver.delete_group(entity_id)
self.id_mapping_api.delete_id_mapping(group_id)
@domains_configured
def add_user_to_group(self, user_id, group_id, domain_scope=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
driver.add_user_to_group(user_id, group_id)
@exception_translated('group')
def add_user_to_group(self, user_id, group_id):
@exception_translated('user')
def get_entity_info_for_user(public_id):
return self._get_domain_driver_and_entity_id(public_id)
_domain_id, group_driver, group_entity_id = (
self._get_domain_driver_and_entity_id(group_id))
# Get the same info for the user_id, taking care to map any
# exceptions correctly
_domain_id, user_driver, user_entity_id = (
get_entity_info_for_user(user_id))
self._assert_user_and_group_in_same_backend(
user_entity_id, user_driver, group_entity_id, group_driver)
group_driver.add_user_to_group(user_entity_id, group_entity_id)
self.token_api.delete_tokens_for_user(user_id)
@domains_configured
def remove_user_from_group(self, user_id, group_id, domain_scope=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
driver.remove_user_from_group(user_id, group_id)
@exception_translated('group')
def remove_user_from_group(self, user_id, group_id):
@exception_translated('user')
def get_entity_info_for_user(public_id):
return self._get_domain_driver_and_entity_id(public_id)
_domain_id, group_driver, group_entity_id = (
self._get_domain_driver_and_entity_id(group_id))
# Get the same info for the user_id, taking care to map any
# exceptions correctly
_domain_id, user_driver, user_entity_id = (
get_entity_info_for_user(user_id))
self._assert_user_and_group_in_same_backend(
user_entity_id, user_driver, group_entity_id, group_driver)
group_driver.remove_user_from_group(user_entity_id, group_entity_id)
# TODO(ayoung) revoking all tokens for a user based on group
# membership is overkill, as we only would need to revoke tokens
# that had role assignments via the group. Calculating those
@ -434,62 +721,81 @@ class Manager(manager.Manager):
@manager.response_truncated
@domains_configured
def list_groups_for_user(self, user_id, domain_scope=None,
hints=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
@exception_translated('user')
def list_groups_for_user(self, user_id, hints=None):
domain_id, driver, entity_id = (
self._get_domain_driver_and_entity_id(user_id))
hints = hints or driver_hints.Hints()
if not driver.is_domain_aware():
# We are effectively satisfying any domain_id filter by the above
# driver selection, so remove any such filter
self._mark_domain_id_filter_satisfied(hints)
ref_list = driver.list_groups_for_user(
user_id, hints or driver_hints.Hints())
if not driver.is_domain_aware():
ref_list = self._set_domain_id(ref_list, domain_id)
return ref_list
ref_list = driver.list_groups_for_user(entity_id, hints)
return self._set_domain_id_and_mapping(
ref_list, domain_id, driver, mapping.EntityType.GROUP)
@manager.response_truncated
@domains_configured
@exception_translated('group')
def list_groups(self, domain_scope=None, hints=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
if not driver.is_domain_aware():
driver = self._select_identity_driver(domain_scope)
hints = hints or driver_hints.Hints()
if driver.is_domain_aware():
# Force the domain_scope into the hint to ensure that we only get
# back domains for that scope.
self._ensure_domain_id_in_hints(hints, domain_scope)
else:
# We are effectively satisfying any domain_id filter by the above
# driver selection, so remove any such filter
# driver selection, so remove any such filter.
self._mark_domain_id_filter_satisfied(hints)
ref_list = driver.list_groups(hints or driver_hints.Hints())
if not driver.is_domain_aware():
ref_list = self._set_domain_id(ref_list, domain_id)
return ref_list
ref_list = driver.list_groups(hints)
return self._set_domain_id_and_mapping(
ref_list, domain_scope, driver, mapping.EntityType.GROUP)
@manager.response_truncated
@domains_configured
def list_users_in_group(self, group_id, domain_scope=None,
hints=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
@exception_translated('group')
def list_users_in_group(self, group_id, hints=None):
domain_id, driver, entity_id = (
self._get_domain_driver_and_entity_id(group_id))
hints = hints or driver_hints.Hints()
if not driver.is_domain_aware():
# We are effectively satisfying any domain_id filter by the above
# driver selection, so remove any such filter
self._mark_domain_id_filter_satisfied(hints)
ref_list = driver.list_users_in_group(
group_id, hints or driver_hints.Hints())
if not driver.is_domain_aware():
ref_list = self._set_domain_id(ref_list, domain_id)
return ref_list
ref_list = driver.list_users_in_group(entity_id, hints)
return self._set_domain_id_and_mapping(
ref_list, domain_id, driver, mapping.EntityType.USER)
@domains_configured
def check_user_in_group(self, user_id, group_id, domain_scope=None):
domain_id, driver = self._get_domain_id_and_driver(domain_scope)
driver.check_user_in_group(user_id, group_id)
@exception_translated('group')
def check_user_in_group(self, user_id, group_id):
@exception_translated('user')
def get_entity_info_for_user(public_id):
return self._get_domain_driver_and_entity_id(public_id)
_domain_id, group_driver, group_entity_id = (
self._get_domain_driver_and_entity_id(group_id))
# Get the same info for the user_id, taking care to map any
# exceptions correctly
_domain_id, user_driver, user_entity_id = (
get_entity_info_for_user(user_id))
self._assert_user_and_group_in_same_backend(
user_entity_id, user_driver, group_entity_id, group_driver)
return group_driver.check_user_in_group(user_entity_id,
group_entity_id)
@domains_configured
def change_password(self, context, user_id, original_password,
new_password, domain_scope):
new_password):
# authenticate() will raise an AssertionError if authentication fails
self.authenticate(context, user_id, original_password,
domain_scope=domain_scope)
self.authenticate(context, user_id, original_password)
update_dict = {'password': new_password}
self.update_user(user_id, update_dict, domain_scope=domain_scope)
self.update_user(user_id, update_dict)
@six.add_metaclass(abc.ABCMeta)
@ -499,6 +805,14 @@ class Driver(object):
def _get_list_limit(self):
return CONF.identity.list_limit or CONF.list_limit
def is_domain_aware(self):
"""Indicates if Driver supports domains."""
return True
def generates_uuids(self):
"""Indicates if Driver generates UUIDs as the local entity ID."""
return True
@abc.abstractmethod
def authenticate(self, user_id, password):
"""Authenticate a given user and password.
@ -676,11 +990,6 @@ class Driver(object):
"""
raise exception.NotImplemented()
@abc.abstractmethod
def is_domain_aware(self):
"""Indicates if Driver supports domains."""
raise exception.NotImplemented()
# end of identity
@ -689,9 +998,7 @@ class MappingManager(manager.Manager):
"""Default pivot point for the ID Mapping backend."""
def __init__(self):
# TODO(henry-nash): Use a config option to select the mapping driver
super(MappingManager, self).__init__(
'keystone.identity.mapping_backends.sql.Mapping')
super(MappingManager, self).__init__(CONF.identity_mapping.driver)
@six.add_metaclass(abc.ABCMeta)

View File

@ -20,17 +20,18 @@ import six
from keystone.common import dependency
from keystone.common import manager
from keystone import config
from keystone import exception
CONF = config.CONF
@dependency.provider('id_generator_api')
class Manager(manager.Manager):
"""Default pivot point for the identifier generator backend."""
def __init__(self):
# TODO(henry-nash): Use a config option to select the generator driver
super(Manager, self).__init__(
'keystone.identity.id_generators.sha256.Generator')
super(Manager, self).__init__(CONF.identity_mapping.generator)
@six.add_metaclass(abc.ABCMeta)

View File

@ -474,11 +474,14 @@ class TestCase(BaseTestCase):
fixtures_to_cleanup.append(attrname)
for tenant in fixtures.TENANTS:
try:
rv = self.assignment_api.create_project(
tenant['id'], tenant)
except exception.Conflict:
rv = self.assignment_api.get_project(tenant['id'])
if hasattr(self, 'tenant_%s' % tenant['id']):
try:
# This will clear out any roles on the project as well
self.assignment_api.delete_project(tenant['id'])
except exception.ProjectNotFound:
pass
rv = self.assignment_api.create_project(
tenant['id'], tenant)
attrname = 'tenant_%s' % tenant['id']
setattr(self, attrname, rv)

View File

@ -14,13 +14,6 @@
from keystone.common import sql
from keystone.identity.mapping_backends import sql as mapping_sql
# NOTE(henry-nash): This function is defined in a separate file since it will
# be used across multiple unit test files once the full support for cross
# backend identifiers is implemented.
#
# TODO(henry-nash): Remove this comment once the full support mentioned above
# has landed, since the reason for this separate file will be obvious.
def list_id_mappings():
"""List all id_mappings for testing purposes."""

View File

@ -45,6 +45,11 @@ class IdentityTests(object):
self.assignment_api.create_domain(domain['id'], domain)
return domain
def _set_domain_scope(self, domain_id):
# We only provide a domain scope if we have multiple drivers
if CONF.identity.domain_specific_drivers_enabled:
return domain_id
def test_project_add_and_remove_user_role(self):
user_ids = self.assignment_api.list_user_ids_for_project(
self.tenant_bar['id'])
@ -1909,7 +1914,8 @@ class IdentityTests(object):
user)
def test_list_users(self):
users = self.identity_api.list_users()
users = self.identity_api.list_users(
domain_scope=self._set_domain_scope(DEFAULT_DOMAIN_ID))
self.assertEqual(len(default_fixtures.USERS), len(users))
user_ids = set(user['id'] for user in users)
expected_user_ids = set(getattr(self, 'user_%s' % user['id'])['id']
@ -1927,7 +1933,8 @@ class IdentityTests(object):
'name': uuid.uuid4().hex}
group1 = self.identity_api.create_group(group1)
group2 = self.identity_api.create_group(group2)
groups = self.identity_api.list_groups()
groups = self.identity_api.list_groups(
domain_scope=self._set_domain_scope(DEFAULT_DOMAIN_ID))
self.assertEqual(2, len(groups))
group_ids = []
for group in groups:

View File

@ -29,9 +29,11 @@ from keystone.common import sql
from keystone import config
from keystone import exception
from keystone import identity
from keystone.identity.mapping_backends import mapping as map
from keystone import tests
from keystone.tests import default_fixtures
from keystone.tests import fakeldap
from keystone.tests import identity_mapping as mapping_sql
from keystone.tests.ksfixtures import database
from keystone.tests import test_backend
@ -490,13 +492,13 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
group_id = self.identity_api.create_group(group)['id']
# Create a couple of users and add them to the group.
user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex,
user = dict(name=uuid.uuid4().hex,
domain_id=CONF.identity.default_domain_id)
user_1_id = self.identity_api.create_user(user)['id']
self.identity_api.add_user_to_group(user_1_id, group_id)
user = dict(name=uuid.uuid4().hex, id=uuid.uuid4().hex,
user = dict(name=uuid.uuid4().hex,
domain_id=CONF.identity.default_domain_id)
user_2_id = self.identity_api.create_user(user)['id']
@ -505,9 +507,9 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
# Delete user 2
# NOTE(blk-u): need to go directly to user interface to keep from
# updating the group.
driver = self.identity_api._select_identity_driver(
user['domain_id'])
driver.user.delete(user_2_id)
unused, driver, entity_id = (
self.identity_api._get_domain_driver_and_entity_id(user_2_id))
driver.user.delete(entity_id)
# List group users and verify only user 1.
res = self.identity_api.list_users_in_group(group_id)
@ -588,8 +590,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
self.identity_api.authenticate,
context={},
user_id=user['id'],
password=None,
domain_scope=user['domain_id'])
password=None)
# (spzala)The group and domain crud tests below override the standard ones
# in test_backend.py so that we can exclude the update name test, since we
@ -651,6 +652,12 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
def test_updated_arbitrary_attributes_are_returned_from_update_user(self):
self.skipTest("Using arbitrary attributes doesn't work under LDAP")
def test_cache_layer_domain_crud(self):
# TODO(morganfainberg): This also needs to be removed when full LDAP
# implementation is submitted. No need to duplicate the above test,
# just skip this time.
self.skipTest('Domains are read-only against LDAP')
def test_user_id_comma(self):
"""Even if the user has a , in their ID, groups can be listed."""
@ -668,6 +675,15 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
}
user = self.identity_api.driver.create_user(user_id, user)
# Now we'll use the manager to discover it, which will create a
# Public ID for it.
ref_list = self.identity_api.list_users()
public_user_id = None
for ref in ref_list:
if ref['name'] == user['name']:
public_user_id = ref['id']
break
# Create a group
group_id = uuid.uuid4().hex
group = {
@ -676,14 +692,23 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
'description': self.getUniqueString(),
'domain_id': CONF.identity.default_domain_id,
}
self.identity_api.driver.create_group(group_id, group)
group = self.identity_api.driver.create_group(group_id, group)
# Now we'll use the manager to discover it, which will create a
# Public ID for it.
ref_list = self.identity_api.list_groups()
public_group_id = None
for ref in ref_list:
if ref['name'] == group['name']:
public_group_id = ref['id']
break
# Put the user in the group
self.identity_api.add_user_to_group(user_id, group_id)
self.identity_api.add_user_to_group(public_user_id, public_group_id)
# List groups for user.
ref_list = self.identity_api.list_groups_for_user(user_id)
ref_list = self.identity_api.list_groups_for_user(public_user_id)
group['id'] = public_group_id
self.assertThat(ref_list, matchers.Equals([group]))
def test_user_id_comma_grants(self):
@ -704,15 +729,25 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
}
self.identity_api.driver.create_user(user_id, user)
# Now we'll use the manager to discover it, which will create a
# Public ID for it.
ref_list = self.identity_api.list_users()
public_user_id = None
for ref in ref_list:
if ref['name'] == user['name']:
public_user_id = ref['id']
break
# Grant the user a role on a project.
role_id = 'member'
project_id = self.tenant_baz['id']
self.assignment_api.create_grant(role_id, user_id=user_id,
self.assignment_api.create_grant(role_id, user_id=public_user_id,
project_id=project_id)
role_ref = self.assignment_api.get_grant(role_id, user_id=user_id,
role_ref = self.assignment_api.get_grant(role_id,
user_id=public_user_id,
project_id=project_id)
self.assertEqual(role_id, role_ref['id'])
@ -1685,6 +1720,64 @@ class LdapIdentitySqlAssignment(BaseLDAPIdentity, tests.SQLDriverOverrides,
self.skipTest('Blocked by bug 1221805')
class LdapIdentitySqlAssignmentWithMapping(LdapIdentitySqlAssignment):
"""Class to test mapping of default LDAP backend.
The default configuration is not to enable mapping when using a single
backend LDAP driver. However, a cloud provider might want to enable
the mapping, hence hiding the LDAP IDs from any clients of keystone.
Setting backward_compatible_ids to False will enable this mapping.
"""
def config_overrides(self):
super(LdapIdentitySqlAssignmentWithMapping, self).config_overrides()
self.config_fixture.config(group='identity_mapping',
backward_compatible_ids=False)
def test_dynamic_mapping_build(self):
"""Test to ensure entities not create via controller are mapped.
Many LDAP backends will, essentially, by Read Only. In these cases
the mapping is not built by creating objects, rather from enumerating
the entries. We test this here my manually deleting the mapping and
then trying to re-read the entries.
"""
initial_mappings = len(mapping_sql.list_id_mappings())
user1 = {'name': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id,
'password': uuid.uuid4().hex, 'enabled': True}
user1 = self.identity_api.create_user(user1)
user2 = {'name': uuid.uuid4().hex,
'domain_id': CONF.identity.default_domain_id,
'password': uuid.uuid4().hex, 'enabled': True}
user2 = self.identity_api.create_user(user2)
mappings = mapping_sql.list_id_mappings()
self.assertEqual(initial_mappings + 2, len(mappings))
# Now delete the mappings for the two users above
self.id_mapping_api.purge_mappings({'public_id': user1['id']})
self.id_mapping_api.purge_mappings({'public_id': user2['id']})
# We should no longer be able to get these users via their old IDs
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
user1['id'])
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
user2['id'])
# Now enumerate all users...this should re-build the mapping, and
# we should be able to find the users via their original public IDs.
self.identity_api.list_users()
self.identity_api.get_user(user1['id'])
self.identity_api.get_user(user2['id'])
def test_get_roles_for_user_and_project_user_group_same_id(self):
self.skipTest('N/A: We never generate the same ID for a user and '
'group in our mapping table')
class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
tests.TestCase):
"""Class to test common SQL plus individual LDAP backends.
@ -1716,18 +1809,21 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
sql.ModelBase.metadata.create_all(bind=self.engine)
self.addCleanup(sql.ModelBase.metadata.drop_all, bind=self.engine)
self._setup_domain_test_data()
self._setup_initial_test_data()
# All initial domain data setup complete, time to switch on support
# All initial test data setup complete, time to switch on support
# for separate backends per domain.
self.config_fixture.config(group='identity',
domain_specific_drivers_enabled=True,
domain_config_dir=tests.TESTSDIR)
self.config_fixture.config(group='identity_mapping',
backward_compatible_ids=False)
self._set_domain_configs()
self.clear_database()
self.load_fixtures(default_fixtures)
self._create_users_across_domains()
def config_overrides(self):
super(MultiLDAPandSQLIdentity, self).config_overrides()
@ -1740,7 +1836,18 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
group='assignment',
driver='keystone.assignment.backends.sql.Assignment')
def _setup_domain_test_data(self):
def _create_user(self, domain_id):
user = {'name': uuid.uuid4().hex,
'domain_id': domain_id,
'password': uuid.uuid4().hex,
'enabled': True}
user_ref = self.identity_api.create_user(user)
# Put the password back in, since this is used later by tests to
# authenticate.
user_ref['password'] = user['password']
return user_ref
def _setup_initial_test_data(self):
def create_domain(domain):
try:
@ -1751,15 +1858,50 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
self.assignment_api.get_domain_by_name(domain['name']))
return ref
self.domain_default = create_domain(assignment.calc_default_domain())
self.domain1 = create_domain(
{'id': uuid.uuid4().hex, 'name': 'domain1'})
self.domain2 = create_domain(
{'id': uuid.uuid4().hex, 'name': 'domain2'})
self.domain3 = create_domain(
{'id': uuid.uuid4().hex, 'name': 'domain3'})
self.domain4 = create_domain(
{'id': uuid.uuid4().hex, 'name': 'domain4'})
self.domains = {}
self.domain_count = 5
for x in range(1, self.domain_count):
domain = 'domain%s' % x
self.domains[domain] = create_domain(
{'id': uuid.uuid4().hex, 'name': domain})
self.domains['domain_default'] = create_domain(
assignment.calc_default_domain())
# Create some identity entities BEFORE we switch to multi-backend, so
# we can test that these are still accessible
self.users = {}
self.users['userA'] = self._create_user(
self.domains['domain_default']['id'])
self.users['userB'] = self._create_user(
self.domains['domain1']['id'])
self.users['userC'] = self._create_user(
self.domains['domain3']['id'])
def _create_users_across_domains(self):
"""Create a set of users, each with a role on their own domain."""
# We also will check that the right number of id mappings get created
initial_mappings = len(mapping_sql.list_id_mappings())
self.users['user0'] = self._create_user(
self.domains['domain_default']['id'])
self.assignment_api.create_grant(
user_id=self.users['user0']['id'],
domain_id=self.domains['domain_default']['id'],
role_id=self.role_member['id'])
for x in range(1, self.domain_count):
self.users['user%s' % x] = self._create_user(
self.domains['domain%s' % x]['id'])
self.assignment_api.create_grant(
user_id=self.users['user%s' % x]['id'],
domain_id=self.domains['domain%s' % x]['id'],
role_id=self.role_member['id'])
# So how many new id mappings should have been created? One for each
# user created in a domain that is using the non default driver - i.e.
# the default domain and domains 1 and 2 - so 3 new mappings.
self.assertEqual(initial_mappings + 3,
len(mapping_sql.list_id_mappings()))
def _set_domain_configs(self):
# We need to load the domain configs explicitly to ensure the
@ -1796,83 +1938,141 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
self.skipTest(
'N/A: Not relevant for multi ldap testing')
def test_list_users(self):
# Override the standard list users, since we have added an extra user
# to the default domain, so the number of expected users is one more
# than in the standard test.
users = self.identity_api.list_users(
domain_scope=self._set_domain_scope(
CONF.identity.default_domain_id))
self.assertEqual(len(default_fixtures.USERS) + 1, len(users))
user_ids = set(user['id'] for user in users)
expected_user_ids = set(getattr(self, 'user_%s' % user['id'])['id']
for user in default_fixtures.USERS)
expected_user_ids.add(self.users['user0']['id'])
for user_ref in users:
self.assertNotIn('password', user_ref)
self.assertEqual(expected_user_ids, user_ids)
def test_domain_segregation(self):
"""Test that separate configs have segregated the domain.
Test Plan:
- Create a user in each of the domains
- Make sure that you can only find a given user in its
relevant domain
- Users were created in each domain as part of setup, now make sure
you can only find a given user in its relevant domain/backend
- Make sure that for a backend that supports multiple domains
you can get the users via any of the domain scopes
you can get the users via any of its domains
"""
def create_user(domain_id):
user = {'name': uuid.uuid4().hex,
'domain_id': domain_id,
'password': uuid.uuid4().hex,
'enabled': True}
user_ref = self.identity_api.create_user(user)
user_ref['password'] = user['password']
return user_ref
def check_user(user, domain_id, expected_status):
# As part of the test, we want to force ourselves to manually
# select the driver for this domain, to make sure the entity
# ended up in the correct backend
driver = self.identity_api._select_identity_driver(domain_id)
unused, unused, entity_id = (
self.identity_api._get_domain_driver_and_entity_id(
user['id']))
userd = create_user(CONF.identity.default_domain_id)
user1 = create_user(self.domain1['id'])
user2 = create_user(self.domain2['id'])
user3 = create_user(self.domain3['id'])
user4 = create_user(self.domain4['id'])
if expected_status == 200:
ref = driver.get_user(entity_id)
ref = self.identity_api._set_domain_id_and_mapping(
ref, domain_id, driver, map.EntityType.USER)
user = user.copy()
del user['password']
self.assertDictEqual(ref, user)
else:
# TODO(henry-nash): Use AssertRaises here, although
# there appears to be an issue with using driver.get_user
# inside that construct
try:
driver.get_user(entity_id)
except expected_status:
pass
# Now check that I can read user1 with the appropriate domain
# scope, but won't find it if the wrong scope is used
# Check that I can read a user with the appropriate domain-selected
# driver, but won't find it via any other domain driver
ref = self.identity_api.get_user(
userd['id'], domain_scope=CONF.identity.default_domain_id)
del userd['password']
self.assertDictEqual(ref, userd)
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
userd['id'],
domain_scope=self.domain1['id'])
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
userd['id'],
domain_scope=self.domain2['id'])
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
userd['id'],
domain_scope=self.domain3['id'])
self.assertRaises(exception.UserNotFound,
self.identity_api.get_user,
userd['id'],
domain_scope=self.domain4['id'])
check_user(self.users['user0'],
self.domains['domain_default']['id'], 200)
for domain in [self.domains['domain1']['id'],
self.domains['domain2']['id'],
self.domains['domain3']['id'],
self.domains['domain4']['id']]:
check_user(self.users['user0'], domain, exception.UserNotFound)
ref = self.identity_api.get_user(
user1['id'], domain_scope=self.domain1['id'])
del user1['password']
self.assertDictEqual(ref, user1)
ref = self.identity_api.get_user(
user2['id'], domain_scope=self.domain2['id'])
del user2['password']
self.assertDictEqual(ref, user2)
check_user(self.users['user1'], self.domains['domain1']['id'], 200)
for domain in [self.domains['domain_default']['id'],
self.domains['domain2']['id'],
self.domains['domain3']['id'],
self.domains['domain4']['id']]:
check_user(self.users['user1'], domain, exception.UserNotFound)
check_user(self.users['user2'], self.domains['domain2']['id'], 200)
for domain in [self.domains['domain_default']['id'],
self.domains['domain1']['id'],
self.domains['domain3']['id'],
self.domains['domain4']['id']]:
check_user(self.users['user2'], domain, exception.UserNotFound)
# Domains 3 and 4 share the same backend, so you should be
# able to see user3 and 4 from either
ref = self.identity_api.get_user(
user3['id'], domain_scope=self.domain3['id'])
del user3['password']
self.assertDictEqual(ref, user3)
ref = self.identity_api.get_user(
user4['id'], domain_scope=self.domain4['id'])
del user4['password']
self.assertDictEqual(ref, user4)
ref = self.identity_api.get_user(
user3['id'], domain_scope=self.domain4['id'])
self.assertDictEqual(ref, user3)
ref = self.identity_api.get_user(
user4['id'], domain_scope=self.domain3['id'])
self.assertDictEqual(ref, user4)
check_user(self.users['user3'], self.domains['domain3']['id'], 200)
check_user(self.users['user3'], self.domains['domain4']['id'], 200)
check_user(self.users['user4'], self.domains['domain3']['id'], 200)
check_user(self.users['user4'], self.domains['domain4']['id'], 200)
for domain in [self.domains['domain_default']['id'],
self.domains['domain1']['id'],
self.domains['domain2']['id']]:
check_user(self.users['user3'], domain, exception.UserNotFound)
check_user(self.users['user4'], domain, exception.UserNotFound)
# Finally, going through the regular manager layer, make sure we
# only see the right number of users in each of the non-default
# domains. One might have expected two users in domain1 (since we
# created one before we switched to multi-backend), however since
# that domain changed backends in the switch we don't find it anymore.
# This is as designed - we don't support moving domains between
# backends.
#
# The listing of the default domain is already handled in the
# test_lists_users() method.
for domain in [self.domains['domain1']['id'],
self.domains['domain2']['id'],
self.domains['domain4']['id']]:
self.assertThat(
self.identity_api.list_users(domain_scope=domain),
matchers.HasLength(1))
# Domain3 has a user created before we switched on
# multiple backends, plus one created afterwards - and it's
# backend has not changed - so we should fined two.
self.assertThat(
self.identity_api.list_users(
domain_scope=self.domains['domain3']['id']),
matchers.HasLength(2))
def test_existing_uuids_work(self):
"""Test that 'uni-domain' created IDs still work.
Throwing the switch to domain-specific backends should not cause
existing identities to be inaccessible via ID.
"""
self.identity_api.get_user(self.users['userA']['id'])
self.identity_api.get_user(self.users['userB']['id'])
self.identity_api.get_user(self.users['userC']['id'])
def test_authenticate_to_each_domain(self):
"""Test that a user in each domain can authenticate."""
for user_num in range(5):
user = 'user%s' % user_num
self.identity_api.authenticate(
context={},
user_id=self.users[user]['id'],
password=self.users[user]['password'])
def test_scanning_of_config_dir(self):
"""Test the Manager class scans the config directory.
@ -1888,18 +2088,23 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
self.assertTrue(config.CONF.identity.domain_specific_drivers_enabled)
self.load_backends()
# Execute any command to trigger the lazy loading of domain configs
self.identity_api.list_users(domain_scope=self.domain1['id'])
self.identity_api.list_users(
domain_scope=self.domains['domain1']['id'])
# ...and now check the domain configs have been set up
self.assertIn('default', self.identity_api.domain_configs)
self.assertIn(self.domain1['id'], self.identity_api.domain_configs)
self.assertIn(self.domain2['id'], self.identity_api.domain_configs)
self.assertNotIn(self.domain3['id'], self.identity_api.domain_configs)
self.assertNotIn(self.domain4['id'], self.identity_api.domain_configs)
self.assertIn(self.domains['domain1']['id'],
self.identity_api.domain_configs)
self.assertIn(self.domains['domain2']['id'],
self.identity_api.domain_configs)
self.assertNotIn(self.domains['domain3']['id'],
self.identity_api.domain_configs)
self.assertNotIn(self.domains['domain4']['id'],
self.identity_api.domain_configs)
# Finally check that a domain specific config contains items from both
# the primary config and the domain specific config
conf = self.identity_api.domain_configs.get_domain_conf(
self.domain1['id'])
self.domains['domain1']['id'])
# This should now be false, as is the default, since this is not
# set in the standard primary config file
self.assertFalse(conf.identity.domain_specific_drivers_enabled)
@ -1915,6 +2120,10 @@ class MultiLDAPandSQLIdentity(BaseLDAPIdentity, tests.SQLDriverOverrides,
def test_list_projects_for_user_with_grants(self):
self.skipTest('Blocked by bug 1221805')
def test_get_roles_for_user_and_project_user_group_same_id(self):
self.skipTest('N/A: We never generate the same ID for a user and '
'group in our mapping table')
def test_user_id_comma(self):
self.skipTest('Only valid if it is guaranteed to be taling to '
'the fakeldap backend')

View File

@ -484,6 +484,52 @@ class IdentityTestCase(test_v3.RestfulTestCase):
body={'user': ref})
return self.assertValidUserResponse(r, ref)
def test_create_user_without_domain(self):
"""Call ``POST /users`` without specifying domain.
According to the identity-api specification, if you do not
explicitly specific the domain_id in the entity, it should
take the domain scope of the token as the domain_id.
"""
# Create a user with a role on the domain so we can get a
# domain scoped token
domain = self.new_domain_ref()
self.assignment_api.create_domain(domain['id'], domain)
user = self.new_user_ref(domain_id=domain['id'])
password = user['password']
user = self.identity_api.create_user(user)
user['password'] = password
self.assignment_api.create_grant(
role_id=self.role_id, user_id=user['id'],
domain_id=domain['id'])
ref = self.new_user_ref(domain_id=domain['id'])
ref_nd = ref.copy()
ref_nd.pop('domain_id')
auth = self.build_authentication_request(
user_id=user['id'],
password=user['password'],
domain_id=domain['id'])
r = self.post('/users', body={'user': ref_nd}, auth=auth)
self.assertValidUserResponse(r, ref)
# Now try the same thing without a domain token - which should fail
ref = self.new_user_ref(domain_id=domain['id'])
ref_nd = ref.copy()
ref_nd.pop('domain_id')
auth = self.build_authentication_request(
user_id=self.user['id'],
password=self.user['password'],
project_id=self.project['id'])
r = self.post('/users', body={'user': ref_nd}, auth=auth)
# TODO(henry-nash): Due to bug #1283539 we currently automatically
# use the default domain_id if a domain scoped token is not being
# used. Change the code below to expect a failure once this bug is
# fixed.
ref['domain_id'] = CONF.identity.default_domain_id
return self.assertValidUserResponse(r, ref)
def test_create_user_400(self):
"""Call ``POST /users``."""
self.post('/users', body={'user': {}}, expected_status=400)
@ -493,6 +539,49 @@ class IdentityTestCase(test_v3.RestfulTestCase):
r = self.get('/users')
self.assertValidUserListResponse(r, ref=self.user)
def test_list_users_with_multiple_backends(self):
"""Call ``GET /users`` when multiple backends is enabled.
In this scenario, the controller requires a domain to be specified
either as a filter or by using a domain scoped token.
"""
self.config_fixture.config(group='identity',
domain_specific_drivers_enabled=True)
# Create a user with a role on the domain so we can get a
# domain scoped token
domain = self.new_domain_ref()
self.assignment_api.create_domain(domain['id'], domain)
user = self.new_user_ref(domain_id=domain['id'])
password = user['password']
user = self.identity_api.create_user(user)
user['password'] = password
self.assignment_api.create_grant(
role_id=self.role_id, user_id=user['id'],
domain_id=domain['id'])
ref = self.new_user_ref(domain_id=domain['id'])
ref_nd = ref.copy()
ref_nd.pop('domain_id')
auth = self.build_authentication_request(
user_id=user['id'],
password=user['password'],
domain_id=domain['id'])
# First try using a domain scoped token
r = self.get('/users', auth=auth)
self.assertValidUserListResponse(r, ref=user)
# Now try with an explicit filter
r = self.get('/users?domain_id=%(domain_id)s' %
{'domain_id': domain['id']})
self.assertValidUserListResponse(r, ref=user)
# Now try the same thing without a domain token or filter,
# which should fail
r = self.get('/users', expected_status=exception.Unauthorized.code)
def test_list_users_no_default_project(self):
"""Call ``GET /users`` making sure no default_project_id."""
user = self.new_user_ref(self.domain_id)