Implement role assignment inheritance (OS-INHERIT extension)

This extension allows for project roles to be optionally
inherited from the owning domain.  The v3 grant APIs are extended
to take an inherited_to_projects flag.  The GET role_assignments
API will also include these roles in its response, either showing them
as inherited roles assigned to the domain or, if the 'effective'
query parameter is set, will interpret the inheritance and reflect
those role assignments on the projects.

The inherited_to_projects flag is encoded in the role list in
the metadata of the relevant entries in the grant tables. The
'roles' key in the metadata is now a list of dicts, as opposed
to a simple list, where each dict is either

{'id': role_id} for a regular role, or
{'id': role_id, 'inherited_to': 'projects'} for an inherited role

Remember that a previous patch had rationalized the way metadata is
handled so that its structure is entirely hidden within the driver
layer.

The extension can be enabled/disabled via a config setting.

Limitations:

- The extension is not yet discoverable via url, this will be added
  as a separate patch when the v3/extensions work is complete.

A separate issue has been discovered with the fact that the v2
calls of 'get_projects_for_user()' and 'list_user_projects()'
should be rationalized and also honor both group (and inherited)
role assignments.  This is being raised as a separate bug.

DocImpact

Implements bp inherited-domain-roles

Change-Id: I35b57ce0df668f12462e96b3467cef0239594e97
This commit is contained in:
Henry Nash
2013-07-05 06:04:25 +01:00
parent 3a56c8a68d
commit 7f4891ddc3
18 changed files with 1859 additions and 188 deletions

View File

@@ -79,6 +79,7 @@ following sections:
* ``[signing]`` - cryptographic signatures for PKI based tokens
* ``[ssl]`` - SSL configuration
* ``[auth]`` - Authentication plugin configuration
* ``[os_inherit]`` - Inherited Role Assignment extension
* ``[paste_deploy]`` - Pointer to the PasteDeploy configuration file
The Keystone primary configuration file is expected to be named ``keystone.conf``.
@@ -485,6 +486,17 @@ In addition to changing their password all of the users current tokens will be
deleted (if the backend used is kvs or sql)
Inherited Role Assignment Extension
-----------------------------------
Keystone provides an optional extension that adds the capability to assign roles to a domain that, rather than
affect the domain itself, are instead inherited to all projects owned by theat domain. This extension is disabled by
default, but can be enabled by including the following in ``keystone.conf``.
[os_inherit]
enabled = True
Sample Configuration Files
--------------------------

View File

@@ -109,6 +109,11 @@
# delegation and impersonation features can be optionally disabled
# enabled = True
[os_inherit]
# role-assignment inheritance to projects from owning domain can be
# optionally enabled
# enabled = False
[catalog]
# dynamic, sql-based backend (supports API/CLI-based management commands)
# driver = keystone.catalog.backends.sql.Catalog

View File

@@ -33,10 +33,16 @@ class Assignment(kvs.Base, assignment.Driver):
except exception.NotFound:
raise exception.ProjectNotFound(project_id=tenant_id)
def list_projects(self):
tenant_keys = filter(lambda x: x.startswith("tenant-"),
self.db.keys())
return [self.db.get(key) for key in tenant_keys]
def list_projects(self, domain_id=None):
project_keys = filter(lambda x: x.startswith("tenant-"),
self.db.keys())
project_refs = [self.db.get(key) for key in project_keys]
if domain_id:
self.get_domain(domain_id)
project_refs = filter(lambda x: domain_id in x['domain_id'],
project_refs)
return project_refs
def get_project_by_name(self, tenant_name, domain_id):
try:
@@ -105,13 +111,16 @@ class Assignment(kvs.Base, assignment.Driver):
metadata_ref = self._get_metadata(user_id, tenant_id)
except exception.MetadataNotFound:
metadata_ref = {}
roles = set(metadata_ref.get('roles', []))
if role_id in roles:
try:
metadata_ref['roles'] = self._add_role_to_role_dicts(
role_id, False, metadata_ref.get('roles', []),
allow_existing=False)
except KeyError:
msg = ('User %s already has role %s in tenant %s'
% (user_id, role_id, tenant_id))
raise exception.Conflict(type='role grant', details=msg)
roles.add(role_id)
metadata_ref['roles'] = list(roles)
self._update_metadata(user_id, tenant_id, metadata_ref)
def remove_role_from_user_and_project(self, user_id, tenant_id, role_id):
@@ -119,23 +128,25 @@ class Assignment(kvs.Base, assignment.Driver):
metadata_ref = self._get_metadata(user_id, tenant_id)
except exception.MetadataNotFound:
metadata_ref = {}
roles = set(metadata_ref.get('roles', []))
if role_id not in roles:
msg = 'Cannot remove role that has not been granted, %s' % role_id
raise exception.RoleNotFound(message=msg)
roles.remove(role_id)
metadata_ref['roles'] = list(roles)
try:
metadata_ref['roles'] = self._remove_role_from_role_dicts(
role_id, False, metadata_ref.get('roles', []))
except KeyError:
raise exception.RoleNotFound(message=_(
'Cannot remove role that has not been granted, %s') %
role_id)
if len(metadata_ref['roles']):
self._update_metadata(user_id, tenant_id, metadata_ref)
else:
if not len(roles):
self.db.delete('metadata-%s-%s' % (tenant_id, user_id))
user_ref = self._get_user(user_id)
tenants = set(user_ref.get('tenants', []))
tenants.remove(tenant_id)
user_ref['tenants'] = list(tenants)
self.identity_api.update_user(user_id, user_ref)
else:
self._update_metadata(user_id, tenant_id, metadata_ref)
def list_role_assignments(self):
"""List the role assignments.
@@ -144,7 +155,7 @@ class Assignment(kvs.Base, assignment.Driver):
"metadata-{target}-{actor}", with the value being a role list
i.e. "metadata-MyProjectID-MyUserID" [role1, role2]
i.e. "metadata-MyProjectID-MyUserID" [{'id': role1}, {'id': role2}]
...so we enumerate the list and extract the targets, actors
and roles.
@@ -169,7 +180,8 @@ class Assignment(kvs.Base, assignment.Driver):
template['group_id'] = meta_id2
entry = self.db.get(key)
for r in entry.get('roles', []):
for r in self._roles_from_role_dicts(entry.get('roles', {}),
False):
role_assignment = template.copy()
role_assignment['role_id'] = r
assignment_list.append(role_assignment)
@@ -324,7 +336,8 @@ class Assignment(kvs.Base, assignment.Driver):
self.db.set('role_list', list(role_list))
def create_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
self.get_role(role_id)
if user_id:
@@ -341,14 +354,16 @@ class Assignment(kvs.Base, assignment.Driver):
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
roles = set(metadata_ref.get('roles', []))
roles.add(role_id)
metadata_ref['roles'] = list(roles)
metadata_ref['roles'] = self._add_role_to_role_dicts(
role_id, inherited_to_projects, metadata_ref.get('roles', []))
self._update_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
def list_grants(self, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
if user_id:
self.identity_api.get_user(user_id)
if group_id:
@@ -363,10 +378,14 @@ class Assignment(kvs.Base, assignment.Driver):
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
return [self.get_role(x) for x in metadata_ref.get('roles', [])]
return [self.get_role(x) for x in
self._roles_from_role_dicts(metadata_ref.get('roles', []),
inherited_to_projects)]
def get_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
self.get_role(role_id)
if user_id:
self.identity_api.get_user(user_id)
@@ -382,13 +401,17 @@ class Assignment(kvs.Base, assignment.Driver):
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
role_ids = set(metadata_ref.get('roles', []))
role_ids = set(self._roles_from_role_dicts(
metadata_ref.get('roles', []), inherited_to_projects))
if role_id not in role_ids:
raise exception.RoleNotFound(role_id=role_id)
return self.get_role(role_id)
def delete_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
self.get_role(role_id)
if user_id:
self.identity_api.get_user(user_id)
@@ -404,12 +427,13 @@ class Assignment(kvs.Base, assignment.Driver):
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
roles = set(metadata_ref.get('roles', []))
try:
roles.remove(role_id)
metadata_ref['roles'] = self._remove_role_from_role_dicts(
role_id, inherited_to_projects, metadata_ref.get('roles', []))
except KeyError:
raise exception.RoleNotFound(role_id=role_id)
metadata_ref['roles'] = list(roles)
self._update_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)

View File

@@ -72,7 +72,9 @@ class Assignment(assignment.Driver):
def get_project(self, tenant_id):
return self._set_default_domain(self.project.get(tenant_id))
def list_projects(self):
def list_projects(self, domain_id=None):
# We don't support multiple domains within this driver, so ignore
# any domain passed.
return self._set_default_domain(self.project.get_all())
def get_project_by_name(self, tenant_name, domain_id):
@@ -117,7 +119,7 @@ class Assignment(assignment.Driver):
metadata_ref = _get_roles_for_just_user_and_project(user_id, tenant_id)
if not metadata_ref:
return {}
return {'roles': metadata_ref}
return {'roles': [self._role_to_dict(r, False) for r in metadata_ref]}
def get_role(self, role_id):
return self.role.get(role_id)

View File

@@ -96,7 +96,8 @@ class Assignment(sql.Base, assignment.Driver):
raise exception.MetadataNotFound()
def create_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
if user_id:
self.identity_api.get_user(user_id)
if group_id:
@@ -110,6 +111,10 @@ class Assignment(sql.Base, assignment.Driver):
if project_id:
self._get_project(session, project_id)
if project_id and inherited_to_projects:
msg = _('Inherited roles can only be assigned to domains')
raise exception.Conflict(type='role grant', details=msg)
try:
metadata_ref = self._get_metadata(user_id, project_id,
domain_id, group_id)
@@ -117,9 +122,10 @@ class Assignment(sql.Base, assignment.Driver):
except exception.MetadataNotFound:
metadata_ref = {}
is_new = True
roles = set(metadata_ref.get('roles', []))
roles.add(role_id)
metadata_ref['roles'] = list(roles)
metadata_ref['roles'] = self._add_role_to_role_dicts(
role_id, inherited_to_projects, metadata_ref.get('roles', []))
if is_new:
self._create_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
@@ -128,7 +134,8 @@ class Assignment(sql.Base, assignment.Driver):
domain_id, group_id)
def list_grants(self, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
if user_id:
self.identity_api.get_user(user_id)
if group_id:
@@ -144,10 +151,14 @@ class Assignment(sql.Base, assignment.Driver):
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
return [self.get_role(x) for x in metadata_ref.get('roles', [])]
return [self.get_role(x) for x in
self._roles_from_role_dicts(metadata_ref.get('roles', []),
inherited_to_projects)]
def get_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
if user_id:
self.identity_api.get_user(user_id)
if group_id:
@@ -166,13 +177,15 @@ class Assignment(sql.Base, assignment.Driver):
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
role_ids = set(metadata_ref.get('roles', []))
role_ids = set(self._roles_from_role_dicts(
metadata_ref.get('roles', []), inherited_to_projects))
if role_id not in role_ids:
raise exception.RoleNotFound(role_id=role_id)
return role_ref.to_dict()
def delete_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
if user_id:
self.identity_api.get_user(user_id)
if group_id:
@@ -193,25 +206,43 @@ class Assignment(sql.Base, assignment.Driver):
except exception.MetadataNotFound:
metadata_ref = {}
is_new = True
roles = set(metadata_ref.get('roles', []))
try:
roles.remove(role_id)
metadata_ref['roles'] = self._remove_role_from_role_dicts(
role_id, inherited_to_projects, metadata_ref.get('roles', []))
except KeyError:
raise exception.RoleNotFound(role_id=role_id)
metadata_ref['roles'] = list(roles)
if is_new:
# TODO(henry-nash) It seems odd that you would create a new
# entry in response to trying to delete a role that was not
# assigned. Although benign, this should probably be removed.
self._create_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
else:
self._update_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
def list_projects(self):
def list_projects(self, domain_id=None):
session = self.get_session()
tenant_refs = session.query(Project).all()
return [tenant_ref.to_dict() for tenant_ref in tenant_refs]
if domain_id:
self._get_domain(session, domain_id)
query = session.query(Project)
if domain_id:
query = query.filter_by(domain_id=domain_id)
project_refs = query.all()
return [project_ref.to_dict() for project_ref in project_refs]
def get_projects_for_user(self, user_id):
# FIXME(henry-nash) The following should take into account
# both group and inherited roles. In fact, I don't see why this
# call can't be handled at the controller level like we do
# with 'get_roles_for_user_and_project()'. Further, this
# call seems essentially the same as 'list_user_projects()'
# later in this driver. Both should be removed.
self.identity_api.get_user(user_id)
session = self.get_session()
query = session.query(UserProjectGrant)
@@ -230,13 +261,16 @@ class Assignment(sql.Base, assignment.Driver):
except exception.MetadataNotFound:
metadata_ref = {}
is_new = True
roles = set(metadata_ref.get('roles', []))
if role_id in roles:
try:
metadata_ref['roles'] = self._add_role_to_role_dicts(
role_id, False, metadata_ref.get('roles', []),
allow_existing=False)
except KeyError:
msg = ('User %s already has role %s in tenant %s'
% (user_id, role_id, tenant_id))
raise exception.Conflict(type='role grant', details=msg)
roles.add(role_id)
metadata_ref['roles'] = list(roles)
if is_new:
self._create_metadata(user_id, tenant_id, metadata_ref)
else:
@@ -245,14 +279,15 @@ class Assignment(sql.Base, assignment.Driver):
def remove_role_from_user_and_project(self, user_id, tenant_id, role_id):
try:
metadata_ref = self._get_metadata(user_id, tenant_id)
roles = set(metadata_ref.get('roles', []))
if role_id not in roles:
try:
metadata_ref['roles'] = self._remove_role_from_role_dicts(
role_id, False, metadata_ref.get('roles', []))
except KeyError:
raise exception.RoleNotFound(message=_(
'Cannot remove role that has not been granted, %s') %
role_id)
roles.remove(role_id)
metadata_ref['roles'] = list(roles)
if len(roles):
if len(metadata_ref['roles']):
self._update_metadata(user_id, tenant_id, metadata_ref)
else:
session = self.get_session()
@@ -277,28 +312,44 @@ class Assignment(sql.Base, assignment.Driver):
assignment_list = []
refs = session.query(UserDomainGrant).all()
for x in refs:
for r in x.data.get('roles', []):
assignment_list.append({'user_id': x.user_id,
'domain_id': x.domain_id,
'role_id': r})
for r in self._roles_from_role_dicts(
x.data.get('roles', {}), False):
assignment_list.append({'user_id': x.user_id,
'domain_id': x.domain_id,
'role_id': r})
for r in self._roles_from_role_dicts(
x.data.get('roles', {}), True):
assignment_list.append({'user_id': x.user_id,
'domain_id': x.domain_id,
'role_id': r,
'inherited_to_projects': True})
refs = session.query(UserProjectGrant).all()
for x in refs:
for r in x.data.get('roles', []):
assignment_list.append({'user_id': x.user_id,
'project_id': x.project_id,
'role_id': r})
for r in self._roles_from_role_dicts(
x.data.get('roles', {}), False):
assignment_list.append({'user_id': x.user_id,
'project_id': x.project_id,
'role_id': r})
refs = session.query(GroupDomainGrant).all()
for x in refs:
for r in x.data.get('roles', []):
assignment_list.append({'group_id': x.group_id,
'domain_id': x.domain_id,
'role_id': r})
for r in self._roles_from_role_dicts(
x.data.get('roles', {}), False):
assignment_list.append({'group_id': x.group_id,
'domain_id': x.domain_id,
'role_id': r})
for r in self._roles_from_role_dicts(
x.data.get('roles', {}), True):
assignment_list.append({'group_id': x.group_id,
'domain_id': x.domain_id,
'role_id': r,
'inherited_to_projects': True})
refs = session.query(GroupProjectGrant).all()
for x in refs:
for r in x.data.get('roles', []):
assignment_list.append({'group_id': x.group_id,
'project_id': x.project_id,
'role_id': r})
for r in self._roles_from_role_dicts(
x.data.get('roles', {}), False):
assignment_list.append({'group_id': x.group_id,
'project_id': x.project_id,
'role_id': r})
return assignment_list
# CRUD
@@ -473,6 +524,14 @@ class Assignment(sql.Base, assignment.Driver):
session.flush()
def list_user_projects(self, user_id):
# FIXME(henry-nash) The following should take into account
# both group and inherited roles. In fact, I don't see why this
# call can't be handled at the controller level like we do
# with 'get_roles_for_user_and_project()'. Further, this
# call seems essentially the same as 'get_projects_for_user()'
# earlier in this driver. Both should be removed.
session = self.get_session()
user = self.identity_api.get_user(user_id)
metadata_refs = session\
@@ -627,6 +686,29 @@ class Role(sql.ModelBase, sql.DictBase):
class BaseGrant(sql.DictBase):
"""Base Grant class.
There are four grant tables in the current implementation, one for
each type of grant:
- User for Project
- User for Domain
- Group for Project
- Group for Domain
Each is a table with the two attributes above as a combined primary key,
with the data field holding all roles for that combination. The data
field is a list of dicts. For regular role assignments each dict in
the list of of the form:
{'id': role_id}
If the OS-INHERIT extension is enabled and the role on a domain is an
inherited role, the dict will be of the form:
{'id': role_id, 'inherited_to': 'projects'}
"""
def to_dict(self):
"""Override parent to_dict() method with a simpler implementation.

View File

@@ -59,33 +59,72 @@ class Manager(manager.Manager):
self.identity_api.assignment_api = self
def get_roles_for_user_and_project(self, user_id, tenant_id):
def _get_group_project_roles(user_id, tenant_id):
"""Get the roles associated with a user within given project.
This includes roles directly assigned to the user on the
project, as well as those by virtue of group membership. If
the OS-INHERIT extension is enabled, then this will also
include roles inherited from the domain.
:returns: a list of role ids.
:raises: keystone.exception.UserNotFound,
keystone.exception.ProjectNotFound
"""
def _get_group_project_roles(user_id, project_ref):
role_list = []
group_refs = (self.identity_api.list_groups_for_user
(user_id=user_id))
for x in group_refs:
try:
metadata_ref = self._get_metadata(group_id=x['id'],
tenant_id=tenant_id)
role_list += metadata_ref.get('roles', [])
metadata_ref = self._get_metadata(
group_id=x['id'], tenant_id=project_ref['id'])
role_list += self._roles_from_role_dicts(
metadata_ref.get('roles', {}), False)
except exception.MetadataNotFound:
# no group grant, skip
pass
if CONF.os_inherit.enabled:
# Now get any inherited group roles for the owning domain
try:
metadata_ref = self._get_metadata(
group_id=x['id'],
domain_id=project_ref['domain_id'])
role_list += self._roles_from_role_dicts(
metadata_ref.get('roles', {}), True)
except (exception.MetadataNotFound,
exception.NotImplemented):
pass
return role_list
def _get_user_project_roles(user_id, tenant_id):
metadata_ref = {}
def _get_user_project_roles(user_id, project_ref):
role_list = []
try:
metadata_ref = self._get_metadata(user_id=user_id,
tenant_id=tenant_id)
tenant_id=project_ref['id'])
role_list = self._roles_from_role_dicts(
metadata_ref.get('roles', {}), False)
except exception.MetadataNotFound:
pass
return metadata_ref.get('roles', [])
if CONF.os_inherit.enabled:
# Now get any inherited roles for the owning domain
try:
metadata_ref = self._get_metadata(
user_id=user_id, domain_id=project_ref['domain_id'])
role_list += self._roles_from_role_dicts(
metadata_ref.get('roles', {}), True)
except (exception.MetadataNotFound, exception.NotImplemented):
pass
return role_list
self.identity_api.get_user(user_id)
self.get_project(tenant_id)
user_role_list = _get_user_project_roles(user_id, tenant_id)
group_role_list = _get_group_project_roles(user_id, tenant_id)
project_ref = self.get_project(tenant_id)
user_role_list = _get_user_project_roles(user_id, project_ref)
group_role_list = _get_group_project_roles(user_id, project_ref)
# Use set() to process the list to remove any duplicates
return list(set(user_role_list + group_role_list))
@@ -106,11 +145,12 @@ class Manager(manager.Manager):
try:
metadata_ref = self._get_metadata(group_id=x['id'],
domain_id=domain_id)
role_list += metadata_ref.get('roles', [])
role_list += self._roles_from_role_dicts(
metadata_ref.get('roles', {}), False)
except (exception.MetadataNotFound, exception.NotImplemented):
# MetadataNotFound implies no group grant, so skip.
# Ignore NotImplemented since not all backends support
# domains. pass
# domains.
pass
return role_list
@@ -124,7 +164,8 @@ class Manager(manager.Manager):
# Ignore NotImplemented since not all backends support
# domains
pass
return metadata_ref.get('roles', [])
return self._roles_from_role_dicts(
metadata_ref.get('roles', {}), False)
self.identity_api.get_user(user_id)
self.get_domain(domain_id)
@@ -160,6 +201,40 @@ class Manager(manager.Manager):
class Driver(object):
def _role_to_dict(self, role_id, inherited):
role_dict = {'id': role_id}
if inherited:
role_dict['inherited_to'] = 'projects'
return role_dict
def _roles_from_role_dicts(self, dict_list, inherited):
role_list = []
for d in dict_list:
if ((not d.get('inherited_to') and not inherited) or
(d.get('inherited_to') == 'projects' and inherited)):
role_list.append(d['id'])
return role_list
def _add_role_to_role_dicts(self, role_id, inherited, dict_list,
allow_existing=True):
# There is a difference in error semantics when trying to
# assign a role that already exists between the coded v2 and v3
# API calls. v2 will error if the assignment already exists,
# while v3 is silent. Setting the 'allow_existing' parameter
# appropriately lets this call be used for both.
role_set = set([frozenset(r.items()) for r in dict_list])
key = frozenset(self._role_to_dict(role_id, inherited).items())
if not allow_existing and key in role_set:
raise KeyError
role_set.add(key)
return [dict(r) for r in role_set]
def _remove_role_from_role_dicts(self, role_id, inherited, dict_list):
role_set = set([frozenset(r.items()) for r in dict_list])
role_set.remove(frozenset(self._role_to_dict(role_id,
inherited).items()))
return [dict(r) for r in role_set]
def get_project_by_name(self, tenant_name, domain_id):
"""Get a tenant by name.
@@ -209,9 +284,14 @@ class Driver(object):
# assignment/grant crud
def create_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
"""Creates a new assignment/grant.
If the assignment is to a domain, then optionally it may be
specified as inherited to owned projects (this requires
the OS-INHERIT extension to be enabled).
:raises: keystone.exception.UserNotFound,
keystone.exception.GroupNotFound,
keystone.exception.ProjectNotFound,
@@ -223,7 +303,8 @@ class Driver(object):
raise exception.NotImplemented()
def list_grants(self, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
"""Lists assignments/grants.
:raises: keystone.exception.UserNotFound,
@@ -237,7 +318,8 @@ class Driver(object):
raise exception.NotImplemented()
def get_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
"""Lists assignments/grants.
:raises: keystone.exception.UserNotFound,
@@ -251,7 +333,8 @@ class Driver(object):
raise exception.NotImplemented()
def delete_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
"""Lists assignments/grants.
:raises: keystone.exception.UserNotFound,
@@ -329,7 +412,7 @@ class Driver(object):
"""
raise exception.NotImplemented()
def list_projects(self):
def list_projects(self, domain_id=None):
"""List all projects in the system.
:returns: a list of project_refs or an empty list.

View File

@@ -214,6 +214,9 @@ def configure():
# trust
register_bool('enabled', group='trust', default=True)
# os_inherit
register_bool('enabled', group='os_inherit', default=False)
# ssl
register_bool('enable', group='ssl', default=False)
register_str('certfile', group='ssl',

View File

@@ -0,0 +1,190 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sql
def upgrade(migrate_engine):
# The group_project_metadata table was not updated in terms of its
# FK to the tenant table when the tenant->project change was made at
# the 015 migration for sqlite. This upgrade fixes that.
# We need to create a fake tenant table so that we can first load
# the group_project_metadata at all, then do a dance of copying tables
# to get us to the correct schema.
meta = sql.MetaData()
meta.bind = migrate_engine
if migrate_engine.name != 'sqlite':
return
temp_tenant_table = sql.Table(
'tenant',
meta,
sql.Column('id', sql.String(64), primary_key=True))
temp_tenant_table.create(migrate_engine, checkfirst=True)
sql.Table('user', meta, autoload=True)
old_group_metadata_table = sql.Table('group_project_metadata',
meta, autoload=True)
# OK, we now have the table loaded, create a first
# temporary table of a different name with the correct FK
sql.Table('project', meta, autoload=True)
temp_group_project_metadata_table = sql.Table(
'temp_group_project_metadata',
meta,
sql.Column(
'group_id',
sql.String(64),
primary_key=True),
sql.Column(
'project_id',
sql.String(64),
sql.ForeignKey('project.id'),
primary_key=True),
sql.Column('data', sql.Text()))
temp_group_project_metadata_table.create(migrate_engine, checkfirst=True)
# Populate the new temporary table, and then drop the old one
session = sql.orm.sessionmaker(bind=migrate_engine)()
for metadata in session.query(old_group_metadata_table):
q = temp_group_project_metadata_table.insert().values(
group_id=metadata.group_id,
project_id=metadata.project_id,
data=metadata.data)
session.execute(q)
session.commit()
old_group_metadata_table.drop()
temp_tenant_table.drop()
# Now do a final table copy to get the table of the right name.
# Re-init the metadata so that sqlalchemy does not get confused with
# multiple versions of the same named table.
meta2 = sql.MetaData()
meta2.bind = migrate_engine
sql.Table('project', meta2, autoload=True)
new_group_project_metadata_table = sql.Table(
'group_project_metadata',
meta2,
sql.Column(
'group_id',
sql.String(64),
primary_key=True),
sql.Column(
'project_id',
sql.String(64),
sql.ForeignKey('project.id'),
primary_key=True),
sql.Column('data', sql.Text()))
new_group_project_metadata_table.create(migrate_engine, checkfirst=True)
for metadata in session.query(temp_group_project_metadata_table):
q = new_group_project_metadata_table.insert().values(
group_id=metadata.group_id,
project_id=metadata.project_id,
data=metadata.data)
session.execute(q)
session.commit()
temp_group_project_metadata_table.drop()
def downgrade(migrate_engine):
# Put the group_project_metadata table back the way it was in its rather
# broken state. We don't try and re-write history, since otherwise people
# get out of step.
meta = sql.MetaData()
meta.bind = migrate_engine
if migrate_engine.name != 'sqlite':
return
sql.Table('user', meta, autoload=True)
sql.Table('project', meta, autoload=True)
group_metadata_table = sql.Table('group_project_metadata',
meta, autoload=True)
# We want to create a temp group meta table with the FK
# set to the wrong place.
temp_tenant_table = sql.Table(
'tenant',
meta,
sql.Column('id', sql.String(64), primary_key=True))
temp_tenant_table.create(migrate_engine, checkfirst=True)
temp_group_project_metadata_table = sql.Table(
'temp_group_project_metadata',
meta,
sql.Column(
'group_id',
sql.String(64),
primary_key=True),
sql.Column(
'project_id',
sql.String(64),
sql.ForeignKey('tenant.id'),
primary_key=True),
sql.Column('data', sql.Text()))
temp_group_project_metadata_table.create(migrate_engine, checkfirst=True)
# Now populate the temp table and drop the real one
session = sql.orm.sessionmaker(bind=migrate_engine)()
for metadata in session.query(group_metadata_table):
q = temp_group_project_metadata_table.insert().values(
group_id=metadata.group_id,
project_id=metadata.project_id,
data=metadata.data)
session.execute(q)
session.commit()
group_metadata_table.drop()
# Now copy again into the correctly named table. Re-init the metadata
# so that sqlalchemy does not get confused with multiple versions of the
# same named table.
meta2 = sql.MetaData()
meta2.bind = migrate_engine
sql.Table('tenant', meta2, autoload=True)
new_group_project_metadata_table = sql.Table(
'group_project_metadata',
meta2,
sql.Column(
'group_id',
sql.String(64),
primary_key=True),
sql.Column(
'project_id',
sql.String(64),
sql.ForeignKey('tenant.id'),
primary_key=True),
sql.Column('data', sql.Text()))
new_group_project_metadata_table.create(migrate_engine, checkfirst=True)
for metadata in session.query(temp_group_project_metadata_table):
q = new_group_project_metadata_table.insert().values(
group_id=metadata.group_id,
project_id=metadata.project_id,
data=metadata.data)
session.execute(q)
session.commit()
temp_group_project_metadata_table.drop()
temp_tenant_table.drop()

View File

@@ -0,0 +1,102 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import sqlalchemy as sql
def build_update(table_name, upgrade_table, row, values):
if table_name == 'user_project_metadata':
update = upgrade_table.update().where(
upgrade_table.c.user_id == row.user_id).where(
upgrade_table.c.project_id == row.project_id).values(values)
elif table_name == 'group_project_metadata':
update = upgrade_table.update().where(
upgrade_table.c.group_id == row.group_id).where(
upgrade_table.c.project_id == row.project_id).values(values)
elif table_name == 'user_domain_metadata':
update = upgrade_table.update().where(
upgrade_table.c.user_id == row.user_id).where(
upgrade_table.c.domain_id == row.domain_id).values(values)
else:
update = upgrade_table.update().where(
upgrade_table.c.group_id == row.group_id).where(
upgrade_table.c.domain_id == row.domain_id).values(values)
return update
def upgrade_grant_table(meta, migrate_engine, session, table_name):
# Convert the roles component of the metadata from a list
# of ids to a list of dicts
def list_to_dict_list(metadata):
json_metadata = json.loads(metadata)
if 'roles' in json_metadata:
json_metadata['roles'] = (
[{'id': x} for x in json_metadata['roles']])
return json.dumps(json_metadata)
upgrade_table = sql.Table(table_name, meta, autoload=True)
for assignment in session.query(upgrade_table):
values = {'data': list_to_dict_list(assignment.data)}
update = build_update(table_name, upgrade_table, assignment, values)
migrate_engine.execute(update)
def downgrade_grant_table(meta, migrate_engine, session, table_name):
# Convert the roles component of the metadata from a list
# of dicts to a simple list of ids. Any inherited roles are deleted
# since they would have no meaning
def dict_list_to_list(metadata):
json_metadata = json.loads(metadata)
if 'roles' in json_metadata:
json_metadata['roles'] = ([x['id'] for x in json_metadata['roles']
if 'inherited_to' not in x])
return json.dumps(json_metadata)
downgrade_table = sql.Table(table_name, meta, autoload=True)
for assignment in session.query(downgrade_table):
values = {'data': dict_list_to_list(assignment.data)}
update = build_update(table_name, downgrade_table, assignment, values)
migrate_engine.execute(update)
def upgrade(migrate_engine):
meta = sql.MetaData()
meta.bind = migrate_engine
session = sql.orm.sessionmaker(bind=migrate_engine)()
for grant_table in ['user_project_metadata', 'user_domain_metadata',
'group_project_metadata', 'group_domain_metadata']:
upgrade_grant_table(meta, migrate_engine, session, grant_table)
session.commit()
session.close()
def downgrade(migrate_engine):
meta = sql.MetaData()
meta.bind = migrate_engine
session = sql.orm.sessionmaker(bind=migrate_engine)()
for grant_table in ['user_project_metadata', 'user_domain_metadata',
'group_project_metadata', 'group_domain_metadata']:
downgrade_grant_table(meta, migrate_engine, session, grant_table)
session.commit()
session.close()

View File

@@ -748,6 +748,11 @@ class RoleV3(controller.V3Controller):
msg = 'Specify a user or group, not both'
raise exception.ValidationError(msg)
def _check_if_inherited(self, context):
return (CONF.os_inherit.enabled and
context['path'].startswith('/OS-INHERIT') and
context['path'].endswith('/inherited_to_projects'))
@controller.protected
def create_grant(self, context, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
@@ -756,7 +761,8 @@ class RoleV3(controller.V3Controller):
self._require_user_xor_group(user_id, group_id)
self.identity_api.create_grant(
role_id, user_id, group_id, domain_id, project_id)
role_id, user_id, group_id, domain_id, project_id,
self._check_if_inherited(context))
@controller.protected
def list_grants(self, context, user_id=None, group_id=None,
@@ -766,7 +772,8 @@ class RoleV3(controller.V3Controller):
self._require_user_xor_group(user_id, group_id)
refs = self.identity_api.list_grants(
user_id, group_id, domain_id, project_id)
user_id, group_id, domain_id, project_id,
self._check_if_inherited(context))
return RoleV3.wrap_collection(context, refs)
@controller.protected
@@ -777,7 +784,8 @@ class RoleV3(controller.V3Controller):
self._require_user_xor_group(user_id, group_id)
self.identity_api.get_grant(
role_id, user_id, group_id, domain_id, project_id)
role_id, user_id, group_id, domain_id, project_id,
self._check_if_inherited(context))
@controller.protected
def revoke_grant(self, context, role_id, user_id=None, group_id=None,
@@ -787,7 +795,8 @@ class RoleV3(controller.V3Controller):
self._require_user_xor_group(user_id, group_id)
self.identity_api.delete_grant(
role_id, user_id, group_id, domain_id, project_id)
role_id, user_id, group_id, domain_id, project_id,
self._check_if_inherited(context))
# Now delete any tokens for this user or, in the case of a group,
# tokens from all the uses who are members of this group.
@@ -815,13 +824,55 @@ class RoleAssignmentV3(controller.V3Controller):
pass
def _format_entity(self, entity):
"""Format an assignment entity for API response.
The driver layer returns entities as dicts containing the ids of the
actor (e.g. user or group), target (e.g. domain or project) and role.
If it is an inherited role, then this is also indicated. Examples:
{'user_id': user_id,
'project_id': domain_id,
'role_id': role_id}
or, for an inherited role:
{'user_id': user_id,
'domain_id': domain_id,
'role_id': role_id,
'inherited_to_projects': true}
This function maps this into the format to be returned via the API,
e.g. for the second example above:
{
'user': {
{'id': user_id}
},
'scope': {
'domain': {
{'id': domain_id}
},
'OS-INHERIT:inherited_to': 'projects
},
'role': {
{'id': role_id}
},
'links': {
'assignment': '/domains/domain_id/users/user_id/roles/'
'role_id/inherited_to_projects'
}
}
"""
formatted_entity = {}
suffix = ""
if 'user_id' in entity:
formatted_entity['user'] = {'id': entity['user_id']}
actor_link = '/users/%s' % entity['user_id']
actor_link = 'users/%s' % entity['user_id']
if 'group_id' in entity:
formatted_entity['group'] = {'id': entity['group_id']}
actor_link = '/groups/%s' % entity['group_id']
actor_link = 'groups/%s' % entity['group_id']
if 'role_id' in entity:
formatted_entity['role'] = {'id': entity['role_id']}
if 'project_id' in entity:
@@ -831,12 +882,21 @@ class RoleAssignmentV3(controller.V3Controller):
if 'domain_id' in entity:
formatted_entity['scope'] = (
{'domain': {'id': entity['domain_id']}})
target_link = '/domains/%s' % entity['domain_id']
if 'inherited_to_projects' in entity:
formatted_entity['scope']['OS-INHERIT:inherited_to'] = (
'projects')
target_link = '/OS-INHERIT/domains/%s' % entity['domain_id']
suffix = '/inherited_to_projects'
else:
target_link = '/domains/%s' % entity['domain_id']
formatted_entity.setdefault('links', {})
formatted_entity['links']['assignment'] = (
self.base_url(target_link + actor_link +
'/roles/%s' % entity['role_id']))
self.base_url('%(target)s/%(actor)s/roles/%(role)s%(suffix)s' % {
'target': target_link,
'actor': actor_link,
'role': entity['role_id'],
'suffix': suffix}))
return formatted_entity
def _expand_indirect_assignments(self, refs):
@@ -846,6 +906,10 @@ class RoleAssignmentV3(controller.V3Controller):
entity for each member of that group, and then remove the group
assignment entity itself from the list.
If the OS-INHERIT extension is enabled, then honor any inherited
roles on the domain by creating the equivalent on all projects
owned by the domain.
For any new entity created by virtue of group membership, add in an
additional link to that membership.
@@ -881,7 +945,8 @@ class RoleAssignmentV3(controller.V3Controller):
'role': ref.get('role_id')})
return members
def _build_equivalent_user_assignment(user, group_id, template):
def _build_user_assignment_equivalent_of_group(
user, group_id, template):
"""Create a user assignment equivalent to the group one.
The template has had the 'group' entity removed, so
@@ -900,33 +965,112 @@ class RoleAssignmentV3(controller.V3Controller):
'/projects/%s' % scope['project']['id'])
user_entry['links']['assignment'] = (
self.base_url('%s/users/%s/roles/%s' %
(target_link, m['id'],
(target_link, user['id'],
user_entry['role']['id'])))
user_entry['links']['membership'] = (
self.base_url('/groups/%s/users/%s' %
(group_id, user['id'])))
return user_entry
# Scan the list of entities for any group assignments, expanding
# them into equivalent user entities. Due to potential large
# expansion of group entities, rather than modify the
def _build_project_equivalent_of_user_domain_role(
project_id, domain_id, template):
"""Create a user project assignment equivalent to the domain one.
The template has had the 'domain' entity removed, so
substitute a 'project' one, modifying the 'assignment' link
to match.
"""
project_entry = copy.deepcopy(template)
project_entry['scope']['project'] = {'id': project_id}
project_entry['links']['assignment'] = (
self.base_url(
'/OS-INHERIT/domains/%s/users/%s/roles/%s'
'/inherited_to_projects' % (
domain_id, project_entry['user']['id'],
project_entry['role']['id'])))
return project_entry
def _build_project_equivalent_of_group_domain_role(
user_id, group_id, project_id, domain_id, template):
"""Create a user project equivalent to the domain group one.
The template has had the 'domain' and 'group' entities removed, so
substitute a 'user-project' one, modifying the 'assignment' link
to match.
"""
project_entry = copy.deepcopy(template)
project_entry['user'] = {'id': user_id}
project_entry['scope']['project'] = {'id': project_id}
project_entry['links']['assignment'] = (
self.base_url('/OS-INHERIT/domains/%s/groups/%s/roles/%s'
'/inherited_to_projects' % (
domain_id, group_id,
project_entry['role']['id'])))
project_entry['links']['membership'] = (
self.base_url('/groups/%s/users/%s' %
(group_id, user_id)))
return project_entry
# Scan the list of entities for any assignments that need to be
# expanded.
#
# If the OS-INERIT extension is enabled, the refs lists may
# contain roles to be inherited from domain to project, so expand
# these as well into project equivalents
#
# For any regular group entries, expand these into user entries based
# on membership of that group.
#
# Due to the potentially large expansions, rather than modify the
# list we are enumerating, we build a new one as we go.
#
new_refs = []
for r in refs:
if 'group' in r:
# As it is a group role assignment, first get the list of
# members.
if 'OS-INHERIT:inherited_to' in r['scope']:
# It's an inherited domain role - so get the list of projects
# owned by this domain. A domain scope is guaranteed since we
# checked this when we built the refs list
project_ids = (
[x['id'] for x in self.assignment_api.list_projects(
r['scope']['domain']['id'])])
base_entry = copy.deepcopy(r)
domain_id = base_entry['scope']['domain']['id']
base_entry['scope'].pop('domain')
# For each project, create an equivalent role assignment
for p in project_ids:
# If it's a group assignment, then create equivalent user
# roles based on membership of the group
if 'group' in base_entry:
members = _get_group_members(base_entry)
sub_entry = copy.deepcopy(base_entry)
group_id = sub_entry['group']['id']
sub_entry.pop('group')
for m in members:
new_entry = (
_build_project_equivalent_of_group_domain_role(
m['id'], group_id, p,
domain_id, sub_entry))
new_refs.append(new_entry)
else:
new_entry = (
_build_project_equivalent_of_user_domain_role(
p, domain_id, base_entry))
new_refs.append(new_entry)
elif 'group' in r:
# It's a non-inherited group role assignment, so get the list
# of members.
members = _get_group_members(r)
# Now replace that group role assignment entry with an
# equivalent user role assignment for each of the group members
base_entry = copy.deepcopy(r)
group_id = base_entry['group']['id']
base_entry.pop('group')
for m in members:
user_entry = _build_equivalent_user_assignment(
user_entry = _build_user_assignment_equivalent_of_group(
m, group_id, base_entry)
new_refs.append(user_entry)
else:
@@ -954,9 +1098,16 @@ class RoleAssignmentV3(controller.V3Controller):
val = True
return val
def _filter_inherited(self, entry):
if ('inherited_to_projects' in entry and
not CONF.os_inherit.enabled):
return False
else:
return True
@controller.filterprotected('group.id', 'role.id',
'scope.domain.id', 'scope.project.id',
'user.id')
'scope.OS-INHERIT:inherited_to', 'user.id')
def list_role_assignments(self, context, filters):
# TODO(henry-nash): This implementation uses the standard filtering
@@ -966,7 +1117,9 @@ class RoleAssignmentV3(controller.V3Controller):
# kept a minimum.
refs = self.identity_api.list_role_assignments()
formatted_refs = [self._format_entity(x) for x in refs]
formatted_refs = (
[self._format_entity(x) for x in refs
if self._filter_inherited(x)])
if ('effective' in context['query_string'] and
self._query_filter_is_true(

View File

@@ -107,8 +107,8 @@ class Manager(manager.Manager):
def get_project(self, tenant_id):
return self.assignment.get_project(tenant_id)
def list_projects(self):
return self.assignment.list_projects()
def list_projects(self, domain_id=None):
return self.assignment.list_projects(domain_id)
def get_role(self, role_id):
return self.assignment.get_role(role_id)
@@ -156,24 +156,32 @@ class Manager(manager.Manager):
return self.assignment.update_role(role_id, role)
def create_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
return (self.assignment.create_grant
(role_id, user_id, group_id, domain_id, project_id))
(role_id, user_id, group_id, domain_id, project_id,
inherited_to_projects))
def list_grants(self, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
return (self.assignment.list_grants
(user_id, group_id, domain_id, project_id))
(user_id, group_id, domain_id, project_id,
inherited_to_projects))
def get_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
return (self.assignment.get_grant
(role_id, user_id, group_id, domain_id, project_id))
(role_id, user_id, group_id, domain_id, project_id,
inherited_to_projects))
def delete_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
domain_id=None, project_id=None,
inherited_to_projects=False):
return (self.assignment.delete_grant
(role_id, user_id, group_id, domain_id, project_id))
(role_id, user_id, group_id, domain_id, project_id,
inherited_to_projects))
def create_domain(self, domain_id, domain):
return self.assignment.create_domain(domain_id, domain)
@@ -214,6 +222,7 @@ class Driver(object):
:raises: AssertionError
"""
raise exception.NotImplemented()
# user crud
def create_user(self, user_id, user):

View File

@@ -16,6 +16,7 @@
"""WSGI Routers for the Identity service."""
from keystone.common import router
from keystone.common import wsgi
from keystone import config
from keystone.identity import controllers
@@ -174,6 +175,47 @@ def append_v3_routers(mapper, routers):
action='revoke_grant',
conditions=dict(method=['DELETE']))
if config.CONF.os_inherit.enabled:
mapper.connect(('/OS-INHERIT/domains/{domain_id}/users/{user_id}'
'/roles/{role_id}/inherited_to_projects'),
controller=role_controller,
action='create_grant',
conditions=dict(method=['PUT']))
mapper.connect(('/OS-INHERIT/domains/{domain_id}/groups/{group_id}'
'/roles/{role_id}/inherited_to_projects'),
controller=role_controller,
action='create_grant',
conditions=dict(method=['PUT']))
mapper.connect(('/OS-INHERIT/domains/{domain_id}/users/{user_id}'
'/roles/{role_id}/inherited_to_projects'),
controller=role_controller,
action='check_grant',
conditions=dict(method=['HEAD']))
mapper.connect(('/OS-INHERIT/domains/{domain_id}/groups/{group_id}'
'/roles/{role_id}/inherited_to_projects'),
controller=role_controller,
action='check_grant',
conditions=dict(method=['HEAD']))
mapper.connect(('/OS-INHERIT/domains/{domain_id}/users/{user_id}'
'/roles/inherited_to_projects'),
controller=role_controller,
action='list_grants',
conditions=dict(method=['GET']))
mapper.connect(('/OS-INHERIT/domains/{domain_id}/groups/{group_id}'
'/roles/inherited_to_projects'),
controller=role_controller,
action='list_grants',
conditions=dict(method=['GET']))
mapper.connect(('/OS-INHERIT/domains/{domain_id}/users/{user_id}'
'/roles/{role_id}/inherited_to_projects'),
controller=role_controller,
action='revoke_grant',
conditions=dict(method=['DELETE']))
mapper.connect(('/OS-INHERIT/domains/{domain_id}/groups/{group_id}'
'/roles/{role_id}/inherited_to_projects'),
controller=role_controller,
action='revoke_grant',
conditions=dict(method=['DELETE']))
routers.append(
router.Router(controllers.RoleAssignmentV3(),
'role_assignments', 'role_assignment'))

View File

@@ -1106,12 +1106,17 @@ class IdentityTests(object):
group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id'], 'enabled': True}
self.identity_api.create_group(group1['id'], group1)
group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id'], 'enabled': True}
self.identity_api.create_group(group2['id'], group2)
project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id']}
self.identity_api.create_project(project1['id'], project1)
self.identity_api.add_user_to_group(user1['id'],
group1['id'])
self.identity_api.add_user_to_group(user1['id'],
group2['id'])
roles_ref = self.identity_api.list_grants(
user_id=user1['id'],
@@ -1766,6 +1771,30 @@ class IdentityTests(object):
self.assertIn(self.tenant_bar['id'], project_ids)
self.assertIn(self.tenant_baz['id'], project_ids)
def test_list_projects_for_domain(self):
project_ids = ([x['id'] for x in
self.assignment_api.list_projects(DEFAULT_DOMAIN_ID)])
self.assertEquals(len(project_ids), 4)
self.assertIn(self.tenant_bar['id'], project_ids)
self.assertIn(self.tenant_baz['id'], project_ids)
self.assertIn(self.tenant_mtu['id'], project_ids)
self.assertIn(self.tenant_service['id'], project_ids)
def test_list_projects_for_alternate_domain(self):
domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.assignment_api.create_domain(domain1['id'], domain1)
project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id']}
self.assignment_api.create_project(project1['id'], project1)
project2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id']}
self.assignment_api.create_project(project2['id'], project2)
project_ids = ([x['id'] for x in
self.assignment_api.list_projects(domain1['id'])])
self.assertEquals(len(project_ids), 2)
self.assertIn(project1['id'], project_ids)
self.assertIn(project2['id'], project_ids)
def test_list_roles(self):
roles = self.identity_api.list_roles()
for test_role in default_fixtures.ROLES:
@@ -2690,3 +2719,164 @@ class PolicyTests(object):
self.assertRaises(exception.PolicyNotFound,
self.policy_api.delete_policy,
uuid.uuid4().hex)
class InheritanceTests(object):
def test_inherited_role_grants_for_user(self):
"""Test inherited user roles.
Test Plan:
- Enable OS-INHERIT extension
- Create 3 roles
- Create a domain, with a project and a user
- Check no roles yet exit
- Assign a direct user role to the project and a (non-inherited)
user role to the domain
- Get a list of effective roles - should only get the one direct role
- Now add an inherited user role to the domain
- Get a list of effective roles - should have two roles, one
direct and one by virtue of the inherited user role
- Also get effective roles for the domain - the role marked as
inherited should not show up
"""
self.opt_in_group('os_inherit', enabled=True)
role_list = []
for _ in range(3):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_role(role['id'], role)
role_list.append(role)
domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(domain1['id'], domain1)
user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id'], 'password': uuid.uuid4().hex,
'enabled': True}
self.identity_api.create_user(user1['id'], user1)
project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id']}
self.identity_api.create_project(project1['id'], project1)
roles_ref = self.identity_api.list_grants(
user_id=user1['id'],
project_id=project1['id'])
self.assertEquals(len(roles_ref), 0)
# Create the first two roles - the domain one is not inherited
self.identity_api.create_grant(user_id=user1['id'],
project_id=project1['id'],
role_id=role_list[0]['id'])
self.identity_api.create_grant(user_id=user1['id'],
domain_id=domain1['id'],
role_id=role_list[1]['id'])
# Now get the effective roles for the user and project, this
# should only include the direct role assignment on the project
combined_role_list = self.identity_api.get_roles_for_user_and_project(
user1['id'], project1['id'])
self.assertEquals(len(combined_role_list), 1)
self.assertIn(role_list[0]['id'], combined_role_list)
# Now add an inherited role on the domain
self.identity_api.create_grant(user_id=user1['id'],
domain_id=domain1['id'],
role_id=role_list[2]['id'],
inherited_to_projects=True)
# Now get the effective roles for the user and project again, this
# should now include the inherited role on the domain
combined_role_list = self.identity_api.get_roles_for_user_and_project(
user1['id'], project1['id'])
self.assertEquals(len(combined_role_list), 2)
self.assertIn(role_list[0]['id'], combined_role_list)
self.assertIn(role_list[2]['id'], combined_role_list)
# Finally, check that the inherited role does not appear as a valid
# directly assigned role on the domain itself
combined_role_list = self.identity_api.get_roles_for_user_and_domain(
user1['id'], domain1['id'])
self.assertEquals(len(combined_role_list), 1)
self.assertIn(role_list[1]['id'], combined_role_list)
def test_inherited_role_grants_for_group(self):
"""Test inherited group roles.
Test Plan:
- Enable OS-INHERIT extension
- Create 4 roles
- Create a domain, with a project, user and two groups
- Make the user a member of both groups
- Check no roles yet exit
- Assign a direct user role to the project and a (non-inherited)
group role on the domain
- Get a list of effective roles - should only get the one direct role
- Now add two inherited group roles to the domain
- Get a list of effective roles - should have three roles, one
direct and two by virtue of inherited group roles
"""
self.opt_in_group('os_inherit', enabled=True)
role_list = []
for _ in range(4):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_role(role['id'], role)
role_list.append(role)
domain1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(domain1['id'], domain1)
user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id'], 'password': uuid.uuid4().hex,
'enabled': True}
self.identity_api.create_user(user1['id'], user1)
group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id'], 'enabled': True}
self.identity_api.create_group(group1['id'], group1)
group2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id'], 'enabled': True}
self.identity_api.create_group(group2['id'], group2)
project1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
'domain_id': domain1['id']}
self.identity_api.create_project(project1['id'], project1)
self.identity_api.add_user_to_group(user1['id'],
group1['id'])
self.identity_api.add_user_to_group(user1['id'],
group2['id'])
roles_ref = self.identity_api.list_grants(
user_id=user1['id'],
project_id=project1['id'])
self.assertEquals(len(roles_ref), 0)
# Create two roles - the domain one is not inherited
self.identity_api.create_grant(user_id=user1['id'],
project_id=project1['id'],
role_id=role_list[0]['id'])
self.identity_api.create_grant(group_id=group1['id'],
domain_id=domain1['id'],
role_id=role_list[1]['id'])
# Now get the effective roles for the user and project, this
# should only include the direct role assignment on the project
combined_role_list = self.identity_api.get_roles_for_user_and_project(
user1['id'], project1['id'])
self.assertEquals(len(combined_role_list), 1)
self.assertIn(role_list[0]['id'], combined_role_list)
# Now add to more group roles, both inherited, to the domain
self.identity_api.create_grant(group_id=group2['id'],
domain_id=domain1['id'],
role_id=role_list[2]['id'],
inherited_to_projects=True)
self.identity_api.create_grant(group_id=group2['id'],
domain_id=domain1['id'],
role_id=role_list[3]['id'],
inherited_to_projects=True)
# Now get the effective roles for the user and project again, this
# should now include the inherited roles on the domain
combined_role_list = self.identity_api.get_roles_for_user_and_project(
user1['id'], project1['id'])
self.assertEquals(len(combined_role_list), 3)
self.assertIn(role_list[0]['id'], combined_role_list)
self.assertIn(role_list[2]['id'], combined_role_list)
self.assertIn(role_list[3]['id'], combined_role_list)

View File

@@ -630,6 +630,10 @@ class LDAPIdentity(test.TestCase, BaseLDAPIdentity):
user1['id'], CONF.identity.default_domain_id)
self.assertEquals(len(combined_role_list), 0)
def test_list_projects_for_alternate_domain(self):
raise nose.exc.SkipTest(
'N/A: LDAP does not support multiple domains')
class LDAPIdentityEnabledEmulation(LDAPIdentity):
def setUp(self):

View File

@@ -410,3 +410,7 @@ class SqlCatalog(SqlTests, test_backend.CatalogTests):
class SqlPolicy(SqlTests, test_backend.PolicyTests):
pass
class SqlInheritance(SqlTests, test_backend.InheritanceTests):
pass

View File

@@ -824,6 +824,304 @@ class SqlUpgradeTests(test.TestCase):
self.assertEqual(ref.legacy_endpoint_id, legacy_endpoint_id)
self.assertEqual(ref.extra, '{}')
def test_group_project_FK_fixup(self):
# To create test data we must start before we broke in the
# group_project_metadata table in 015.
self.upgrade(14)
session = self.Session()
domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
group_table = sqlalchemy.Table('group', self.metadata, autoload=True)
tenant_table = sqlalchemy.Table('tenant', self.metadata, autoload=True)
role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
group_project_metadata_table = sqlalchemy.Table(
'group_project_metadata', self.metadata, autoload=True)
# Create a Domain
domain = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'enabled': True}
session.execute(domain_table.insert().values(domain))
# Create two Tenants
tenant = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'extra': "{}"}
session.execute(tenant_table.insert().values(tenant))
tenant1 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'extra': "{}"}
session.execute(tenant_table.insert().values(tenant1))
# Create a Group
group = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': domain['id'],
'extra': json.dumps({})}
session.execute(group_table.insert().values(group))
# Create roles
role_list = []
for _ in range(2):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
session.execute(role_table.insert().values(role))
role_list.append(role)
# Grant Role to User on Project
role_grant = {'group_id': group['id'],
'project_id': tenant['id'],
'data': json.dumps({'roles': [role_list[0]['id']]})}
session.execute(
group_project_metadata_table.insert().values(role_grant))
role_grant = {'group_id': group['id'],
'project_id': tenant1['id'],
'data': json.dumps({'roles': [role_list[1]['id']]})}
session.execute(
group_project_metadata_table.insert().values(role_grant))
session.commit()
# Now upgrade and fix up the FKs
self.upgrade(28)
self.assertTableExists('group_project_metadata')
self.assertTableExists('project')
self.assertTableDoesNotExist('tenant')
s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
(group_project_metadata_table.c.group_id == group['id']) &
(group_project_metadata_table.c.project_id == tenant['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn(role_list[0]['id'], data['roles'])
s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
(group_project_metadata_table.c.group_id == group['id']) &
(group_project_metadata_table.c.project_id == tenant1['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn(role_list[1]['id'], data['roles'])
self.downgrade(27)
self.assertTableExists('group_project_metadata')
self.assertTableExists('project')
self.assertTableDoesNotExist('tenant')
def test_assignment_metadata_migration(self):
self.upgrade(28)
# Scaffolding
session = self.Session()
domain_table = sqlalchemy.Table('domain', self.metadata, autoload=True)
user_table = sqlalchemy.Table('user', self.metadata, autoload=True)
group_table = sqlalchemy.Table('group', self.metadata, autoload=True)
role_table = sqlalchemy.Table('role', self.metadata, autoload=True)
project_table = sqlalchemy.Table(
'project', self.metadata, autoload=True)
user_project_metadata_table = sqlalchemy.Table(
'user_project_metadata', self.metadata, autoload=True)
user_domain_metadata_table = sqlalchemy.Table(
'user_domain_metadata', self.metadata, autoload=True)
group_project_metadata_table = sqlalchemy.Table(
'group_project_metadata', self.metadata, autoload=True)
group_domain_metadata_table = sqlalchemy.Table(
'group_domain_metadata', self.metadata, autoload=True)
# Create a Domain
domain = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'enabled': True}
session.execute(domain_table.insert().values(domain))
# Create anther Domain
domain2 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'enabled': True}
session.execute(domain_table.insert().values(domain2))
# Create a Project
project = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': domain['id'],
'extra': "{}"}
session.execute(project_table.insert().values(project))
# Create another Project
project2 = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': domain['id'],
'extra': "{}"}
session.execute(project_table.insert().values(project2))
# Create a User
user = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': domain['id'],
'password': uuid.uuid4().hex,
'enabled': True,
'extra': json.dumps({})}
session.execute(user_table.insert().values(user))
# Create a Group
group = {'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'domain_id': domain['id'],
'extra': json.dumps({})}
session.execute(group_table.insert().values(group))
# Create roles
role_list = []
for _ in range(7):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
session.execute(role_table.insert().values(role))
role_list.append(role)
# Grant Role to User on Project
role_grant = {'user_id': user['id'],
'project_id': project['id'],
'data': json.dumps({'roles': [role_list[0]['id']]})}
session.execute(
user_project_metadata_table.insert().values(role_grant))
role_grant = {'user_id': user['id'],
'project_id': project2['id'],
'data': json.dumps({'roles': [role_list[1]['id']]})}
session.execute(
user_project_metadata_table.insert().values(role_grant))
# Grant Role to Group on different Project
role_grant = {'group_id': group['id'],
'project_id': project2['id'],
'data': json.dumps({'roles': [role_list[2]['id']]})}
session.execute(
group_project_metadata_table.insert().values(role_grant))
# Grant Role to User on Domain
role_grant = {'user_id': user['id'],
'domain_id': domain['id'],
'data': json.dumps({'roles': [role_list[3]['id']]})}
session.execute(user_domain_metadata_table.insert().values(role_grant))
# Grant Role to Group on Domain
role_grant = {'group_id': group['id'],
'domain_id': domain['id'],
'data': json.dumps(
{'roles': [role_list[4]['id']],
'other': 'somedata'})}
session.execute(
group_domain_metadata_table.insert().values(role_grant))
session.commit()
self.upgrade(29)
s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
(user_project_metadata_table.c.user_id == user['id']) &
(user_project_metadata_table.c.project_id == project['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn({'id': role_list[0]['id']}, data['roles'])
s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
(user_project_metadata_table.c.user_id == user['id']) &
(user_project_metadata_table.c.project_id == project2['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn({'id': role_list[1]['id']}, data['roles'])
s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
(group_project_metadata_table.c.group_id == group['id']) &
(group_project_metadata_table.c.project_id == project2['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn({'id': role_list[2]['id']}, data['roles'])
s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
(user_domain_metadata_table.c.user_id == user['id']) &
(user_domain_metadata_table.c.domain_id == domain['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn({'id': role_list[3]['id']}, data['roles'])
s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(
(group_domain_metadata_table.c.group_id == group['id']) &
(group_domain_metadata_table.c.domain_id == domain['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn({'id': role_list[4]['id']}, data['roles'])
self.assertIn('other', data)
# Now add an entry that has one regular and one inherited role
role_grant = {'user_id': user['id'],
'domain_id': domain2['id'],
'data': json.dumps(
{'roles': [{'id': role_list[5]['id']},
{'id': role_list[6]['id'],
'inherited_to': 'projects'}]})}
session.execute(user_domain_metadata_table.insert().values(role_grant))
session.commit()
self.downgrade(28)
s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
(user_project_metadata_table.c.user_id == user['id']) &
(user_project_metadata_table.c.project_id == project['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn(role_list[0]['id'], data['roles'])
s = sqlalchemy.select([user_project_metadata_table.c.data]).where(
(user_project_metadata_table.c.user_id == user['id']) &
(user_project_metadata_table.c.project_id == project2['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn(role_list[1]['id'], data['roles'])
s = sqlalchemy.select([group_project_metadata_table.c.data]).where(
(group_project_metadata_table.c.group_id == group['id']) &
(group_project_metadata_table.c.project_id == project2['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn(role_list[2]['id'], data['roles'])
s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
(user_domain_metadata_table.c.user_id == user['id']) &
(user_domain_metadata_table.c.domain_id == domain['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn(role_list[3]['id'], data['roles'])
s = sqlalchemy.select([group_domain_metadata_table.c.data]).where(
(group_domain_metadata_table.c.group_id == group['id']) &
(group_domain_metadata_table.c.domain_id == domain['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn(role_list[4]['id'], data['roles'])
self.assertIn('other', data)
# For user-domain2, where we had one regular and one inherited role,
# only the direct role should remain, the inherited role should
# have been deleted during the downgrade
s = sqlalchemy.select([user_domain_metadata_table.c.data]).where(
(user_domain_metadata_table.c.user_id == user['id']) &
(user_domain_metadata_table.c.domain_id == domain2['id']))
r = session.execute(s)
data = json.loads(r.fetchone()['data'])
self.assertEqual(len(data['roles']), 1)
self.assertIn(role_list[5]['id'], data['roles'])
def populate_user_table(self, with_pass_enab=False,
with_pass_enab_domain=False):
# Populate the appropriate fields in the user

View File

@@ -338,7 +338,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
links['previous'])
def assertValidListResponse(self, resp, key, entity_validator, ref=None,
expected_length=None):
expected_length=None, keys_to_check=None):
"""Make assertions common to all API list responses.
If a reference is provided, it's ID will be searched for in the
@@ -359,11 +359,12 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
for entity in entities:
self.assertIsNotNone(entity)
self.assertValidEntity(entity)
self.assertValidEntity(entity, keys_to_check=keys_to_check)
entity_validator(entity)
if ref:
entity = [x for x in entities if x['id'] == ref['id']][0]
self.assertValidEntity(entity, ref)
self.assertValidEntity(entity, ref=ref,
keys_to_check=keys_to_check)
entity_validator(entity, ref)
return entities
@@ -372,17 +373,21 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
"""Make assertions common to all API responses."""
entity = resp.result.get(key)
self.assertIsNotNone(entity)
self.assertValidEntity(entity, *args, **kwargs)
keys = kwargs.pop('keys_to_check', None)
self.assertValidEntity(entity, keys_to_check=keys, *args, **kwargs)
entity_validator(entity, *args, **kwargs)
return entity
def assertValidEntity(self, entity, ref=None):
def assertValidEntity(self, entity, ref=None, keys_to_check=None):
"""Make assertions common to all API entities.
If a reference is provided, the entity will also be compared against
the reference.
"""
keys = ['name', 'description', 'enabled']
if keys_to_check:
keys = keys_to_check
else:
keys = ['name', 'description', 'enabled']
for k in ['id'] + keys:
msg = '%s unexpectedly None in %s' % (k, entity)
@@ -705,6 +710,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
resp,
'roles',
self.assertValidRole,
keys_to_check=['name'],
*args,
**kwargs)
@@ -713,6 +719,7 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
resp,
'role',
self.assertValidRole,
keys_to_check=['name'],
*args,
**kwargs)

View File

@@ -16,11 +16,64 @@
import uuid
from keystone import config
from keystone import exception
import test_v3
def _build_role_assignment_url_and_entity(
role_id, user_id=None, group_id=None, domain_id=None,
project_id=None, inherited_to_projects=False,
effective=False):
if user_id and domain_id:
url = ('/domains/%(domain_id)s/users/%(user_id)s'
'/roles/%(role_id)s' % {
'domain_id': domain_id,
'user_id': user_id,
'role_id': role_id})
entity = {'role': {'id': role_id},
'user': {'id': user_id},
'scope': {'domain': {'id': domain_id}}}
if inherited_to_projects:
url = '/OS-INHERIT%s/inherited_to_projects' % url
if not effective:
entity['OS-INHERIT:inherited_to'] = 'projects'
elif user_id and project_id:
url = ('/projects/%(project_id)s/users/%(user_id)s'
'/roles/%(role_id)s' % {
'project_id': project_id,
'user_id': user_id,
'role_id': role_id})
entity = {'role': {'id': role_id},
'user': {'id': user_id},
'scope': {'project': {'id': project_id}}}
if group_id and domain_id:
url = ('/domains/%(domain_id)s/groups/%(group_id)s'
'/roles/%(role_id)s' % {
'domain_id': domain_id,
'group_id': group_id,
'role_id': role_id})
entity = {'role': {'id': role_id},
'group': {'id': group_id},
'scope': {'domain': {'id': domain_id}}}
if inherited_to_projects:
url = '/OS-INHERIT%s/inherited_to_projects' % url
if not effective:
entity['OS-INHERIT:inherited_to'] = 'projects'
elif group_id and project_id:
url = ('/projects/%(project_id)s/groups/%(group_id)s'
'/roles/%(role_id)s' % {
'project_id': project_id,
'group_id': group_id,
'role_id': role_id})
entity = {'role': {'id': role_id},
'group': {'id': group_id},
'scope': {'project': {'id': project_id}}}
return (url, entity)
class IdentityTestCase(test_v3.RestfulTestCase):
"""Test domains, projects, users, groups, & role CRUD."""
@@ -628,48 +681,6 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertValidRoleListResponse(r, expected_length=0)
self.assertIn(collection_url, r.result['links']['self'])
def _build_role_assignment_url_and_entity(
self, role_id, user_id=None, group_id=None, domain_id=None,
project_id=None):
if user_id and domain_id:
url = ('/domains/%(domain_id)s/users/%(user_id)s'
'/roles/%(role_id)s' % {
'domain_id': domain_id,
'user_id': user_id,
'role_id': role_id})
entity = {'role': {'id': role_id},
'user': {'id': user_id},
'scope': {'domain': {'id': domain_id}}}
elif user_id and project_id:
url = ('/projects/%(project_id)s/users/%(user_id)s'
'/roles/%(role_id)s' % {
'project_id': project_id,
'user_id': user_id,
'role_id': role_id})
entity = {'role': {'id': role_id},
'user': {'id': user_id},
'scope': {'project': {'id': project_id}}}
if group_id and domain_id:
url = ('/domains/%(domain_id)s/groups/%(group_id)s'
'/roles/%(role_id)s' % {
'domain_id': domain_id,
'group_id': group_id,
'role_id': role_id})
entity = {'role': {'id': role_id},
'group': {'id': group_id},
'scope': {'domain': {'id': domain_id}}}
elif group_id and project_id:
url = ('/projects/%(project_id)s/groups/%(group_id)s'
'/roles/%(role_id)s' % {
'project_id': project_id,
'group_id': group_id,
'role_id': role_id})
entity = {'role': {'id': role_id},
'group': {'id': group_id},
'scope': {'project': {'id': project_id}}}
return (url, entity)
def test_get_role_assignments(self):
"""Call ``GET /role_assignments``.
@@ -712,7 +723,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
# Now add one of each of the four types of assignment, making sure
# that we get them all back.
gd_url, gd_entity = self._build_role_assignment_url_and_entity(
gd_url, gd_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gd_url)
@@ -722,7 +733,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
existing_assignments + 1)
self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url)
ud_url, ud_entity = self._build_role_assignment_url_and_entity(
ud_url, ud_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user1['id'],
role_id=self.role_id)
self.put(ud_url)
@@ -732,7 +743,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
existing_assignments + 2)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
gp_url, gp_entity = self._build_role_assignment_url_and_entity(
gp_url, gp_entity = _build_role_assignment_url_and_entity(
project_id=self.project_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gp_url)
@@ -742,7 +753,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
existing_assignments + 3)
self.assertRoleAssignmentInListResponse(r, gp_entity, link_url=gp_url)
up_url, up_entity = self._build_role_assignment_url_and_entity(
up_url, up_entity = _build_role_assignment_url_and_entity(
project_id=self.project_id, user_id=self.user1['id'],
role_id=self.role_id)
self.put(up_url)
@@ -798,7 +809,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertIn(collection_url, r.result['links']['self'])
existing_assignments = len(r.result.get('role_assignments'))
gd_url, gd_entity = self._build_role_assignment_url_and_entity(
gd_url, gd_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gd_url)
@@ -816,11 +827,11 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')),
existing_assignments + 2)
ud_url, ud_entity = self._build_role_assignment_url_and_entity(
ud_url, ud_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user1['id'],
role_id=self.role_id)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
ud_url, ud_entity = self._build_role_assignment_url_and_entity(
ud_url, ud_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user2['id'],
role_id=self.role_id)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
@@ -866,7 +877,7 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertValidRoleAssignmentListResponse(r)
existing_assignments = len(r.result.get('role_assignments'))
gd_url, gd_entity = self._build_role_assignment_url_and_entity(
gd_url, gd_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, group_id=self.group_id,
role_id=self.role_id)
self.put(gd_url)
@@ -954,22 +965,22 @@ class IdentityTestCase(test_v3.RestfulTestCase):
# Now add one of each of the four types of assignment
gd_url, gd_entity = self._build_role_assignment_url_and_entity(
gd_url, gd_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, group_id=self.group1['id'],
role_id=self.role1['id'])
self.put(gd_url)
ud_url, ud_entity = self._build_role_assignment_url_and_entity(
ud_url, ud_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user1['id'],
role_id=self.role2['id'])
self.put(ud_url)
gp_url, gp_entity = self._build_role_assignment_url_and_entity(
gp_url, gp_entity = _build_role_assignment_url_and_entity(
project_id=self.project1['id'], group_id=self.group1['id'],
role_id=self.role1['id'])
self.put(gp_url)
up_url, up_entity = self._build_role_assignment_url_and_entity(
up_url, up_entity = _build_role_assignment_url_and_entity(
project_id=self.project1['id'], user_id=self.user1['id'],
role_id=self.role2['id'])
self.put(up_url)
@@ -1038,10 +1049,10 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
# ...and the two via group membership...
up1_url, up1_entity = self._build_role_assignment_url_and_entity(
up1_url, up1_entity = _build_role_assignment_url_and_entity(
project_id=self.project1['id'], user_id=self.user1['id'],
role_id=self.role1['id'])
ud1_url, ud1_entity = self._build_role_assignment_url_and_entity(
ud1_url, ud1_entity = _build_role_assignment_url_and_entity(
domain_id=self.domain_id, user_id=self.user1['id'],
role_id=self.role1['id'])
self.assertRoleAssignmentInListResponse(r, up1_entity,
@@ -1062,9 +1073,459 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 2)
# Should have one direct role and one from group membership...
up1_url, up1_entity = self._build_role_assignment_url_and_entity(
up1_url, up1_entity = _build_role_assignment_url_and_entity(
project_id=self.project1['id'], user_id=self.user1['id'],
role_id=self.role1['id'])
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=up_url)
self.assertRoleAssignmentInListResponse(r, up1_entity,
link_url=up1_url)
class IdentityIneritanceTestCase(test_v3.RestfulTestCase):
"""Test inheritance crud and its effects."""
def setUp(self):
self.orig_extension_enablement = config.CONF.os_inherit.enabled
self.opt_in_group('os_inherit', enabled=True)
super(IdentityIneritanceTestCase, self).setUp()
def tearDown(self):
super(IdentityIneritanceTestCase, self).tearDown()
self.opt_in_group('os_inherit', enabled=self.orig_extension_enablement)
def test_crud_user_inherited_domain_role_grants(self):
role_list = []
for _ in range(2):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.assignment_api.create_role(role['id'], role)
role_list.append(role)
# Create a non-inherited role as a spoiler
self.assignment_api.create_grant(
role_list[1]['id'], user_id=self.user['id'],
domain_id=self.domain_id)
base_collection_url = (
'/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
'domain_id': self.domain_id,
'user_id': self.user['id']})
member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
'collection_url': base_collection_url,
'role_id': role_list[0]['id']}
collection_url = base_collection_url + '/inherited_to_projects'
self.put(member_url)
# Check we can read it back
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=role_list[0])
self.assertIn(collection_url, r.result['links']['self'])
# Now delete and check its gone
self.delete(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, expected_length=0)
self.assertIn(collection_url, r.result['links']['self'])
def test_crud_inherited_role_grants_failed_if_disabled(self):
# Disable the extension and check no API calls can be issued
self.opt_in_group('os_inherit', enabled=False)
super(IdentityIneritanceTestCase, self).setUp()
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.assignment_api.create_role(role['id'], role)
base_collection_url = (
'/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
'domain_id': self.domain_id,
'user_id': self.user['id']})
member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
'collection_url': base_collection_url,
'role_id': role['id']}
collection_url = base_collection_url + '/inherited_to_projects'
self.put(member_url, expected_status=404)
self.head(member_url, expected_status=404)
self.get(collection_url, expected_status=404)
self.delete(member_url, expected_status=404)
def test_list_role_assignments_for_inherited_domain_grants(self):
"""Call ``GET /role_assignments with inherited domain grants``.
Test Plan:
- Create 4 roles
- Create a domain with a user and two projects
- Assign two direct roles to project1
- Assign a spoiler role to project2
- Issue the URL to add inherited role to the domain
- Issue the URL to check it is indeed on the domain
- Issue the URL to check effective roles on project1 - this
should return 3 roles.
"""
role_list = []
for _ in range(4):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.assignment_api.create_role(role['id'], role)
role_list.append(role)
domain = self.new_domain_ref()
self.identity_api.create_domain(domain['id'], domain)
user1 = self.new_user_ref(
domain_id=domain['id'])
user1['password'] = uuid.uuid4().hex
self.identity_api.create_user(user1['id'], user1)
project1 = self.new_project_ref(
domain_id=domain['id'])
self.assignment_api.create_project(project1['id'], project1)
project2 = self.new_project_ref(
domain_id=domain['id'])
self.assignment_api.create_project(project2['id'], project2)
# Add some roles to the project
self.assignment_api.add_role_to_user_and_project(
user1['id'], project1['id'], role_list[0]['id'])
self.assignment_api.add_role_to_user_and_project(
user1['id'], project1['id'], role_list[1]['id'])
# ..and one on a different project as a spoiler
self.assignment_api.add_role_to_user_and_project(
user1['id'], project2['id'], role_list[2]['id'])
# Now create our inherited role on the domain
base_collection_url = (
'/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
'domain_id': domain['id'],
'user_id': user1['id']})
member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
'collection_url': base_collection_url,
'role_id': role_list[3]['id']}
collection_url = base_collection_url + '/inherited_to_projects'
self.put(member_url)
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=role_list[3])
self.assertIn(collection_url, r.result['links']['self'])
# Now use the list domain role assignments api to check if this
# is included
collection_url = (
'/role_assignments?user.id=%(user_id)s'
'&scope.domain.id=%(domain_id)s' % {
'user_id': user1['id'],
'domain_id': domain['id']})
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 1)
ud_url, ud_entity = _build_role_assignment_url_and_entity(
domain_id=domain['id'], user_id=user1['id'],
role_id=role_list[3]['id'], inherited_to_projects=True)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
# Now ask for effective list role assignments - the role should
# turn into a project role, along with the two direct roles that are
# on the project
collection_url = (
'/role_assignments?effective&user.id=%(user_id)s'
'&scope.project.id=%(project_id)s' % {
'user_id': user1['id'],
'project_id': project1['id']})
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 3)
# An effective role for an inherited role will be a project
# entity, with a domain link to the inherited assignment
unused, up_entity = _build_role_assignment_url_and_entity(
project_id=project1['id'], user_id=user1['id'],
role_id=role_list[3]['id'])
ud_url, unused = _build_role_assignment_url_and_entity(
domain_id=domain['id'], user_id=user1['id'],
role_id=role_list[3]['id'], inherited_to_projects=True)
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=ud_url)
def test_list_role_assignments_for_disabled_inheritance_extension(self):
"""Call ``GET /role_assignments with inherited domain grants``.
Test Plan:
- Issue the URL to add inherited role to the domain
- Issue the URL to check effective roles on project include the
inherited role
- Disable the extension
- Re-check the effective roles, proving the inherited role no longer
shows up.
"""
role_list = []
for _ in range(4):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.assignment_api.create_role(role['id'], role)
role_list.append(role)
domain = self.new_domain_ref()
self.identity_api.create_domain(domain['id'], domain)
user1 = self.new_user_ref(
domain_id=domain['id'])
user1['password'] = uuid.uuid4().hex
self.identity_api.create_user(user1['id'], user1)
project1 = self.new_project_ref(
domain_id=domain['id'])
self.assignment_api.create_project(project1['id'], project1)
project2 = self.new_project_ref(
domain_id=domain['id'])
self.assignment_api.create_project(project2['id'], project2)
# Add some roles to the project
self.assignment_api.add_role_to_user_and_project(
user1['id'], project1['id'], role_list[0]['id'])
self.assignment_api.add_role_to_user_and_project(
user1['id'], project1['id'], role_list[1]['id'])
# ..and one on a different project as a spoiler
self.assignment_api.add_role_to_user_and_project(
user1['id'], project2['id'], role_list[2]['id'])
# Now create our inherited role on the domain
base_collection_url = (
'/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
'domain_id': domain['id'],
'user_id': user1['id']})
member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
'collection_url': base_collection_url,
'role_id': role_list[3]['id']}
collection_url = base_collection_url + '/inherited_to_projects'
self.put(member_url)
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=role_list[3])
self.assertIn(collection_url, r.result['links']['self'])
# Get effective list role assignments - the role should
# turn into a project role, along with the two direct roles that are
# on the project
collection_url = (
'/role_assignments?effective&user.id=%(user_id)s'
'&scope.project.id=%(project_id)s' % {
'user_id': user1['id'],
'project_id': project1['id']})
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 3)
unused, up_entity = _build_role_assignment_url_and_entity(
project_id=project1['id'], user_id=user1['id'],
role_id=role_list[3]['id'])
ud_url, unused = _build_role_assignment_url_and_entity(
domain_id=domain['id'], user_id=user1['id'],
role_id=role_list[3]['id'], inherited_to_projects=True)
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=ud_url)
# Disable the extension and re-check the list, the role inherited
# from the project should no longer show up
self.opt_in_group('os_inherit', enabled=False)
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 2)
unused, up_entity = _build_role_assignment_url_and_entity(
project_id=project1['id'], user_id=user1['id'],
role_id=role_list[3]['id'])
ud_url, unused = _build_role_assignment_url_and_entity(
domain_id=domain['id'], user_id=user1['id'],
role_id=role_list[3]['id'], inherited_to_projects=True)
self.assertRoleAssignmentNotInListResponse(r, up_entity,
link_url=ud_url)
def test_list_role_assignments_for_inherited_group_domain_grants(self):
"""Call ``GET /role_assignments with inherited group domain grants``.
Test Plan:
- Create 4 roles
- Create a domain with a user and two projects
- Assign two direct roles to project1
- Assign a spoiler role to project2
- Issue the URL to add inherited role to the domain
- Issue the URL to check it is indeed on the domain
- Issue the URL to check effective roles on project1 - this
should return 3 roles.
"""
role_list = []
for _ in range(4):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.assignment_api.create_role(role['id'], role)
role_list.append(role)
domain = self.new_domain_ref()
self.identity_api.create_domain(domain['id'], domain)
user1 = self.new_user_ref(
domain_id=domain['id'])
user1['password'] = uuid.uuid4().hex
self.identity_api.create_user(user1['id'], user1)
user2 = self.new_user_ref(
domain_id=domain['id'])
user2['password'] = uuid.uuid4().hex
self.identity_api.create_user(user2['id'], user2)
group1 = self.new_group_ref(
domain_id=domain['id'])
self.identity_api.create_group(group1['id'], group1)
self.identity_api.add_user_to_group(user1['id'],
group1['id'])
self.identity_api.add_user_to_group(user2['id'],
group1['id'])
project1 = self.new_project_ref(
domain_id=domain['id'])
self.assignment_api.create_project(project1['id'], project1)
project2 = self.new_project_ref(
domain_id=domain['id'])
self.assignment_api.create_project(project2['id'], project2)
# Add some roles to the project
self.assignment_api.add_role_to_user_and_project(
user1['id'], project1['id'], role_list[0]['id'])
self.assignment_api.add_role_to_user_and_project(
user1['id'], project1['id'], role_list[1]['id'])
# ..and one on a different project as a spoiler
self.assignment_api.add_role_to_user_and_project(
user1['id'], project2['id'], role_list[2]['id'])
# Now create our inherited role on the domain
base_collection_url = (
'/OS-INHERIT/domains/%(domain_id)s/groups/%(group_id)s/roles' % {
'domain_id': domain['id'],
'group_id': group1['id']})
member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
'collection_url': base_collection_url,
'role_id': role_list[3]['id']}
collection_url = base_collection_url + '/inherited_to_projects'
self.put(member_url)
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=role_list[3])
self.assertIn(collection_url, r.result['links']['self'])
# Now use the list domain role assignments api to check if this
# is included
collection_url = (
'/role_assignments?group.id=%(group_id)s'
'&scope.domain.id=%(domain_id)s' % {
'group_id': group1['id'],
'domain_id': domain['id']})
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 1)
gd_url, gd_entity = _build_role_assignment_url_and_entity(
domain_id=domain['id'], group_id=group1['id'],
role_id=role_list[3]['id'], inherited_to_projects=True)
self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url)
# Now ask for effective list role assignments - the role should
# turn into a user project role, along with the two direct roles
# that are on the project
collection_url = (
'/role_assignments?effective&user.id=%(user_id)s'
'&scope.project.id=%(project_id)s' % {
'user_id': user1['id'],
'project_id': project1['id']})
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 3)
# An effective role for an inherited role will be a project
# entity, with a domain link to the inherited assignment
unused, up_entity = _build_role_assignment_url_and_entity(
project_id=project1['id'], user_id=user1['id'],
role_id=role_list[3]['id'])
gd_url, unused = _build_role_assignment_url_and_entity(
domain_id=domain['id'], group_id=group1['id'],
role_id=role_list[3]['id'], inherited_to_projects=True)
self.assertRoleAssignmentInListResponse(r, up_entity, link_url=gd_url)
def test_filtered_role_assignments_for_inherited_grants(self):
"""Call ``GET /role_assignments?scope.OS-INHERIT:inherited_to``.
Test Plan:
- Create 5 roles
- Create a domain with a user, group and two projects
- Assign three direct spoiler roles to projects
- Issue the URL to add an inherited user role to the domain
- Issue the URL to add an inherited group role to the domain
- Issue the URL to filter by inherited roles - this should
return just the 2 inherited roles.
"""
role_list = []
for _ in range(5):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.assignment_api.create_role(role['id'], role)
role_list.append(role)
domain = self.new_domain_ref()
self.identity_api.create_domain(domain['id'], domain)
user1 = self.new_user_ref(
domain_id=domain['id'])
user1['password'] = uuid.uuid4().hex
self.identity_api.create_user(user1['id'], user1)
group1 = self.new_group_ref(
domain_id=domain['id'])
self.identity_api.create_group(group1['id'], group1)
project1 = self.new_project_ref(
domain_id=domain['id'])
self.assignment_api.create_project(project1['id'], project1)
project2 = self.new_project_ref(
domain_id=domain['id'])
self.assignment_api.create_project(project2['id'], project2)
# Add some spoiler roles to the projects
self.assignment_api.add_role_to_user_and_project(
user1['id'], project1['id'], role_list[0]['id'])
self.assignment_api.add_role_to_user_and_project(
user1['id'], project2['id'], role_list[1]['id'])
# Create a non-inherited role as a spoiler
self.assignment_api.create_grant(
role_list[2]['id'], user_id=user1['id'], domain_id=domain['id'])
# Now create two inherited roles on the domain, one for a user
# and one for a domain
base_collection_url = (
'/OS-INHERIT/domains/%(domain_id)s/users/%(user_id)s/roles' % {
'domain_id': domain['id'],
'user_id': user1['id']})
member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
'collection_url': base_collection_url,
'role_id': role_list[3]['id']}
collection_url = base_collection_url + '/inherited_to_projects'
self.put(member_url)
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=role_list[3])
self.assertIn(collection_url, r.result['links']['self'])
base_collection_url = (
'/OS-INHERIT/domains/%(domain_id)s/groups/%(group_id)s/roles' % {
'domain_id': domain['id'],
'group_id': group1['id']})
member_url = '%(collection_url)s/%(role_id)s/inherited_to_projects' % {
'collection_url': base_collection_url,
'role_id': role_list[4]['id']}
collection_url = base_collection_url + '/inherited_to_projects'
self.put(member_url)
self.head(member_url)
r = self.get(collection_url)
self.assertValidRoleListResponse(r, ref=role_list[4])
self.assertIn(collection_url, r.result['links']['self'])
# Now use the list role assignments api to get a list of inherited
# roles on the domain - should get back the two roles
collection_url = (
'/role_assignments?scope.OS-INHERIT:inherited_to=projects')
r = self.get(collection_url)
self.assertValidRoleAssignmentListResponse(r)
self.assertEqual(len(r.result.get('role_assignments')), 2)
ud_url, ud_entity = _build_role_assignment_url_and_entity(
domain_id=domain['id'], user_id=user1['id'],
role_id=role_list[3]['id'], inherited_to_projects=True)
gd_url, gd_entity = _build_role_assignment_url_and_entity(
domain_id=domain['id'], group_id=group1['id'],
role_id=role_list[4]['id'], inherited_to_projects=True)
self.assertRoleAssignmentInListResponse(r, ud_entity, link_url=ud_url)
self.assertRoleAssignmentInListResponse(r, gd_entity, link_url=gd_url)