Keystone server support for user groups

This implements the server side of groups of users.  This
set of code provides all the crud functionality for groups as
well as the corresponding support for role assignments.

blueprint user-groups

The following deficiencies existing with the current version and
will be corrected ahead of the final Grizzly release:

1) There is only placeholder support for LDAP (Bug #1092187)
2) Domain role grants are accepted but not yet honored (Bug #1093248)
3) Token invalidation does not occur with group changes (Bug #1093493)

This update also fills in missing v3 grant unit testing and v3 grant
support within the kvs backend.  In addition, there is a fix for
Bug #1092200 (uncaught exception when listing grants)

DocImpact

Change-Id: Ibd1783b04b2d7804eff90312e5ef591dca4d0695
This commit is contained in:
Henry Nash 2012-12-13 16:48:13 +00:00
parent 9460ff5c35
commit 4fae928c59
20 changed files with 1677 additions and 114 deletions

View File

@ -55,3 +55,7 @@ def tenant_name(name):
def user_name(name):
return check_name('User', name)
def group_name(name):
return check_name('Group', name)

View File

@ -99,6 +99,23 @@ class User(Model):
optional_keys = ('password', 'description', 'email', 'enabled')
class Group(Model):
"""Group object.
Required keys:
id
name
Optional keys:
domain_id
description
"""
required_keys = ('id', 'name')
optional_keys = ('domain_id', 'description')
class Tenant(Model):
"""Tenant object.

View File

@ -0,0 +1,93 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sql
def upgrade(migrate_engine):
meta = sql.MetaData()
meta.bind = migrate_engine
sql.Table('domain', meta, autoload=True)
group_table = sql.Table(
'group',
meta,
sql.Column('id', sql.String(64), primary_key=True),
sql.Column('domain_id', sql.String(64), sql.ForeignKey('domain.id')),
sql.Column('name', sql.String(64), unique=True, nullable=False),
sql.Column('description', sql.Text()),
sql.Column('extra', sql.Text()))
group_table.create(migrate_engine, checkfirst=True)
sql.Table('user', meta, autoload=True)
user_group_membership_table = sql.Table(
'user_group_membership',
meta,
sql.Column(
'user_id',
sql.String(64),
sql.ForeignKey('user.id'),
primary_key=True),
sql.Column(
'group_id',
sql.String(64),
sql.ForeignKey('group.id'),
primary_key=True))
user_group_membership_table.create(migrate_engine, checkfirst=True)
sql.Table('tenant', meta, autoload=True)
group_project_metadata_table = sql.Table(
'group_project_metadata',
meta,
sql.Column(
'group_id',
sql.String(64),
sql.ForeignKey('group.id'),
primary_key=True),
sql.Column(
'project_id',
sql.String(64),
sql.ForeignKey('tenant.id'),
primary_key=True),
sql.Column('data', sql.Text()))
group_project_metadata_table.create(migrate_engine, checkfirst=True)
group_domain_metadata_table = sql.Table(
'group_domain_metadata',
meta,
sql.Column(
'group_id',
sql.String(64),
sql.ForeignKey('group.id'),
primary_key=True),
sql.Column(
'domain_id',
sql.String(64),
sql.ForeignKey('domain.id'),
primary_key=True),
sql.Column('data', sql.Text()))
group_domain_metadata_table.create(migrate_engine, checkfirst=True)
def downgrade(migrate_engine):
meta = sql.MetaData()
meta.bind = migrate_engine
tables = ['user_group_membership', 'group_project_metadata',
'group_domain_metadata', 'group']
for t in tables:
table = sql.Table(t, meta, autoload=True)
table.drop(migrate_engine, checkfirst=True)

View File

@ -226,6 +226,17 @@ register_bool('role_allow_create', group='ldap', default=True)
register_bool('role_allow_update', group='ldap', default=True)
register_bool('role_allow_delete', group='ldap', default=True)
register_str('group_tree_dn', group='ldap', default=None)
register_str('group_filter', group='ldap', default=None)
register_str('group_objectclass', group='ldap', default='groupOfNames')
register_str('group_id_attribute', group='ldap', default='cn')
register_str('group_name_attribute', group='ldap', default='ou')
register_str('group_member_attribute', group='ldap', default='member')
register_str('group_desc_attribute', group='ldap', default='desc')
register_list('group_attribute_ignore', group='ldap', default='')
register_bool('group_allow_create', group='ldap', default=True)
register_bool('group_allow_update', group='ldap', default=True)
register_bool('group_allow_delete', group='ldap', default=True)
#pam
register_str('url', group='pam', default=None)
register_str('userid', group='pam', default=None)

View File

@ -148,6 +148,10 @@ class UserNotFound(NotFound):
"""Could not find user: %(user_id)s"""
class GroupNotFound(NotFound):
"""Could not find group: %(group_id)s"""
class Conflict(Error):
"""Conflict occurred attempting to store %(type)s.

View File

@ -97,9 +97,13 @@ class Identity(kvs.Base, identity.Driver):
def get_user_by_name(self, user_name):
return identity.filter_user(self._get_user_by_name(user_name))
def get_metadata(self, user_id, tenant_id):
def get_metadata(self, user_id=None, tenant_id=None,
domain_id=None, group_id=None):
try:
if user_id:
return self.db.get('metadata-%s-%s' % (tenant_id, user_id))
else:
return self.db.get('metadata-%s-%s' % (tenant_id, group_id))
except exception.NotFound:
raise exception.MetadataNotFound()
@ -199,12 +203,16 @@ class Identity(kvs.Base, identity.Driver):
raise exception.Conflict(type='user', details=msg)
user = utils.hash_user_password(user)
self.db.set('user-%s' % user_id, user)
self.db.set('user_name-%s' % user['name'], user)
new_user = user.copy()
new_user.setdefault('groups', [])
self.db.set('user-%s' % user_id, new_user)
self.db.set('user_name-%s' % new_user['name'], new_user)
user_list = set(self.db.get('user_list', []))
user_list.add(user_id)
self.db.set('user_list', list(user_list))
return identity.filter_user(user)
return identity.filter_user(new_user)
def update_user(self, user_id, user):
if 'name' in user:
@ -228,6 +236,42 @@ class Identity(kvs.Base, identity.Driver):
self.db.set('user_name-%s' % new_user['name'], new_user)
return new_user
def add_user_to_group(self, user_id, group_id):
self.get_group(group_id)
user_ref = self._get_user(user_id)
groups = set(user_ref.get('groups', []))
groups.add(group_id)
self.update_user(user_id, {'groups': list(groups)})
def check_user_in_group(self, user_id, group_id):
self.get_group(group_id)
user_ref = self._get_user(user_id)
if not group_id in set(user_ref.get('groups', [])):
raise exception.NotFound(_('User not found in group'))
def remove_user_from_group(self, user_id, group_id):
self.get_group(group_id)
user_ref = self._get_user(user_id)
groups = set(user_ref.get('groups', []))
try:
groups.remove(group_id)
except KeyError:
raise exception.NotFound(_('User not found in group'))
self.update_user(user_id, {'groups': list(groups)})
def list_users_in_group(self, group_id):
self.get_group(group_id)
user_keys = filter(lambda x: x.startswith("user-"), self.db.keys())
user_refs = [self.db.get(key) for key in user_keys]
user_refs_for_group = filter(lambda x: group_id in x['groups'],
user_refs)
return [identity.filter_user(x) for x in user_refs_for_group]
def list_groups_for_user(self, user_id):
user_ref = self._get_user(user_id)
group_ids = user_ref.get('groups', [])
return [self.get_group(x) for x in group_ids]
def delete_user(self, user_id):
try:
old_user = self.db.get('user-%s' % user_id)
@ -292,17 +336,22 @@ class Identity(kvs.Base, identity.Driver):
self.db.delete('tenant_name-%s' % old_tenant['name'])
self.db.delete('tenant-%s' % tenant_id)
def create_metadata(self, user_id, tenant_id, metadata):
def create_metadata(self, user_id, tenant_id, metadata,
domain_id=None, group_id=None):
if user_id:
self.db.set('metadata-%s-%s' % (tenant_id, user_id), metadata)
else:
self.db.set('metadata-%s-%s' % (tenant_id, group_id), metadata)
return metadata
def update_metadata(self, user_id, tenant_id, metadata):
def update_metadata(self, user_id, tenant_id, metadata,
domain_id=None, group_id=None):
if user_id:
self.db.set('metadata-%s-%s' % (tenant_id, user_id), metadata)
else:
self.db.set('metadata-%s-%s' % (tenant_id, group_id), metadata)
return metadata
def delete_metadata(self, user_id, tenant_id):
self.db.delete('metadata-%s-%s' % (tenant_id, user_id))
def create_role(self, role_id, role):
try:
self.get_role(role_id)
@ -358,6 +407,96 @@ class Identity(kvs.Base, identity.Driver):
role_list.remove(role_id)
self.db.set('role_list', list(role_list))
def create_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
self.get_role(role_id)
if user_id:
self.get_user(user_id)
if group_id:
self.get_group(group_id)
if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_tenant(project_id)
try:
metadata_ref = self.get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
roles = set(metadata_ref.get('roles', []))
roles.add(role_id)
metadata_ref['roles'] = list(roles)
self.update_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
def list_grants(self, user_id=None, group_id=None,
domain_id=None, project_id=None):
if user_id:
self.get_user(user_id)
if group_id:
self.get_group(group_id)
if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_tenant(project_id)
try:
metadata_ref = self.get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
return [self.get_role(x) for x in metadata_ref.get('roles', [])]
def get_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
self.get_role(role_id)
if user_id:
self.get_user(user_id)
if group_id:
self.get_group(group_id)
if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_tenant(project_id)
try:
metadata_ref = self.get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
role_ids = set(metadata_ref.get('roles', []))
if role_id not in role_ids:
raise exception.RoleNotFound(role_id=role_id)
return self.get_role(role_id)
def delete_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
self.get_role(role_id)
if user_id:
self.get_user(user_id)
if group_id:
self.get_group(group_id)
if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_tenant(project_id)
try:
metadata_ref = self.get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
roles = set(metadata_ref.get('roles', []))
try:
roles.remove(role_id)
except KeyError:
raise exception.RoleNotFound(role_id=role_id)
metadata_ref['roles'] = list(roles)
self.update_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
# domain crud
def create_domain(self, domain_id, domain):
@ -382,3 +521,41 @@ class Identity(kvs.Base, identity.Driver):
domain_list = set(self.db.get('domain_list', []))
domain_list.remove(domain_id)
self.db.set('domain_list', list(domain_list))
# group crud
def create_group(self, group_id, group):
self.db.set('group-%s' % group_id, group)
group_list = set(self.db.get('group_list', []))
group_list.add(group_id)
self.db.set('group_list', list(group_list))
return group
def list_groups(self):
return self.db.get('group_list', [])
def get_group(self, group_id):
try:
return self.db.get('group-%s' % group_id)
except exception.NotFound:
raise exception.GroupNotFound(group_id=group_id)
def update_group(self, group_id, group):
self.db.set('group-%s' % group_id, group)
return group
def delete_group(self, group_id):
# Delete any entries in the group lists of all users
user_keys = filter(lambda x: x.startswith("user-"), self.db.keys())
user_refs = [self.db.get(key) for key in user_keys]
for user_ref in user_refs:
groups = set(user_ref.get('groups', []))
if group_id in groups:
groups.remove(group_id)
self.update_user(user_ref['id'], {'groups': list(groups)})
# Now delete the group itself
self.db.delete('group-%s' % group_id)
group_list = set(self.db.get('group_list', []))
group_list.remove(group_id)
self.db.set('group_list', list(group_list))

View File

@ -43,6 +43,7 @@ class Identity(identity.Driver):
self.user = UserApi(CONF)
self.tenant = TenantApi(CONF)
self.role = RoleApi(CONF)
self.group = GroupApi(CONF)
def get_connection(self, user=None, password=None):
if self.LDAP_URL.startswith('fake://'):
@ -259,6 +260,27 @@ class Identity(identity.Driver):
self.get_role(role_id)
self.role.update(role_id, role)
def create_group(self, group_id, group):
group['name'] = clean.group_name(group['name'])
return self.group.create(group)
def get_group(self, group_id):
try:
return self.group.get(group_id)
except exception.NotFound:
raise exception.GroupNotFound(group_id=group_id)
def update_group(self, group_id, group):
if 'name' in group:
group['name'] = clean.group_name(group['name'])
return self.group.update(group_id, group)
def delete_group(self, group_id):
try:
return self.group.delete(group_id)
except ldap.NO_SUCH_OBJECT:
raise exception.GroupNotFound(group_id=group_id)
# TODO(termie): remove this and move cross-api calls into driver
class ApiShim(object):
@ -271,6 +293,7 @@ class ApiShim(object):
_role = None
_tenant = None
_user = None
_group = None
def __init__(self, conf):
self.conf = conf
@ -293,6 +316,12 @@ class ApiShim(object):
self._user = UserApi(self.conf)
return self._user
@property
def group(self):
if not self.group:
self.group = GroupApi(self.conf)
return self.group
# TODO(termie): remove this and move cross-api calls into driver
class ApiShimMixin(object):
@ -310,6 +339,10 @@ class ApiShimMixin(object):
def user_api(self):
return self.api.user
@property
def group_api(self):
return self.api.group
# TODO(termie): turn this into a data object and move logic to driver
class UserApi(common_ldap.BaseLdap, ApiShimMixin):
@ -618,6 +651,16 @@ class UserRoleAssociation(object):
self.tenant_id = str(tenant_id)
class GroupRoleAssociation(object):
"""Role Grant model."""
def __init__(self, group_id=None, role_id=None, tenant_id=None,
*args, **kw):
self.group_id = str(group_id)
self.role_id = role_id
self.tenant_id = str(tenant_id)
# TODO(termie): turn this into a data object and move logic to driver
class RoleApi(common_ldap.BaseLdap, ApiShimMixin):
DEFAULT_OU = 'ou=Roles'
@ -1005,3 +1048,69 @@ class RoleApi(common_ldap.BaseLdap, ApiShimMixin):
except ldap.NO_SUCH_OBJECT:
pass
super(RoleApi, self).delete(id)
# TODO (henry-nash) This is a placeholder for the full LDPA implementation
# This needs to be completed (see Bug #1092187)
class GroupApi(common_ldap.BaseLdap, ApiShimMixin):
DEFAULT_OU = 'ou=UserGroups'
DEFAULT_STRUCTURAL_CLASSES = []
DEFAULT_OBJECTCLASS = 'groupOfNames'
DEFAULT_ID_ATTR = 'cn'
DEFAULT_MEMBER_ATTRIBUTE = 'member'
DEFAULT_ATTRIBUTE_IGNORE = []
options_name = 'group'
attribute_mapping = {'name': 'ou',
'description': 'desc',
'groupId': 'cn'}
model = models.Group
def __init__(self, conf):
super(GroupApi, self).__init__(conf)
self.api = ApiShim(conf)
self.attribute_mapping['name'] = conf.ldap.group_name_attribute
self.attribute_mapping['description'] = conf.ldap.group_desc_attribute
self.member_attribute = (getattr(conf.ldap, 'group_member_attribute')
or self.DEFAULT_MEMBER_ATTRIBUTE)
self.attribute_ignore = (getattr(conf.ldap, 'group_attribute_ignore')
or self.DEFAULT_ATTRIBUTE_IGNORE)
def get(self, id, filter=None):
"""Replaces exception.NotFound with exception.GroupNotFound."""
try:
return super(GroupApi, self).get(id, filter)
except exception.NotFound:
raise exception.GroupNotFound(group_id=id)
def get_by_name(self, name, filter=None):
query = ('(%s=%s)' % (self.attribute_mapping['name'],
ldap_filter.escape_filter_chars(name)))
groups = self.get_all(query)
try:
return groups[0]
except IndexError:
raise exception.GroupNotFound(group_id=name)
def create(self, values):
self.affirm_unique(values)
data = values.copy()
if data.get('id') is None:
data['id'] = uuid.uuid4().hex
return super(GroupApi, self).create(data)
def delete(self, id):
if self.subtree_delete_enabled:
super(GroupApi, self).deleteTree(id)
else:
self.role_api.roles_delete_subtree_by_group(id)
super(GroupApi, self).delete(id)
def update(self, id, values):
try:
old_obj = self.get(id)
except exception.NotFound:
raise exception.GroupNotFound(group_id=id)
if old_obj['name'] != values['name']:
msg = _('Changing Name not supported by LDAP')
raise exception.NotImplemented(message=msg)
super(GroupApi, self).update(id, values, old_obj)

View File

@ -143,9 +143,6 @@ class PamIdentity(identity.Driver):
def update_metadata(self, user_id, tenant_id, metadata):
raise NotImplementedError()
def delete_metadata(self, user_id, tenant_id, metadata):
raise NotImplementedError()
def create_role(self, role_id, role):
raise NotImplementedError()

View File

@ -47,6 +47,16 @@ class User(sql.ModelBase, sql.DictBase):
extra = sql.Column(sql.JsonBlob())
class Group(sql.ModelBase, sql.DictBase):
__tablename__ = 'group'
attributes = ['id', 'name', 'domain_id']
id = sql.Column(sql.String(64), primary_key=True)
name = sql.Column(sql.String(64), unique=True, nullable=False)
domain_id = sql.Column(sql.String(64), sql.ForeignKey('domain.id'))
description = sql.Column(sql.Text())
extra = sql.Column(sql.JsonBlob())
class Credential(sql.ModelBase, sql.DictBase):
__tablename__ = 'credential'
attributes = ['id', 'user_id', 'project_id', 'blob', 'type']
@ -88,7 +98,17 @@ class Role(sql.ModelBase, sql.DictBase):
extra = sql.Column(sql.JsonBlob())
class UserProjectMetadata(sql.ModelBase, sql.DictBase):
class BaseGrant(sql.DictBase):
def to_dict(self):
"""Override parent to_dict() method with a simpler implementation.
Grant tables don't have non-indexed 'extra' attributes, so the
parent implementation is not applicable.
"""
return dict(self.iteritems())
class UserProjectGrant(sql.ModelBase, BaseGrant):
# TODO(dolph): rename to user_project_metadata (needs a migration)
__tablename__ = 'metadata'
user_id = sql.Column(sql.String(64), primary_key=True)
@ -96,22 +116,28 @@ class UserProjectMetadata(sql.ModelBase, sql.DictBase):
tenant_id = sql.Column(sql.String(64), primary_key=True)
data = sql.Column(sql.JsonBlob())
def to_dict(self):
"""Override parent to_dict() method with a simpler implementation.
Metadata doesn't have non-indexed 'extra' attributes, so the parent
implementation is not applicable.
"""
return dict(self.iteritems())
class UserDomainMetadata(sql.ModelBase, sql.DictBase):
class UserDomainGrant(sql.ModelBase, BaseGrant):
__tablename__ = 'user_domain_metadata'
user_id = sql.Column(sql.String(64), primary_key=True)
domain_id = sql.Column(sql.String(64), primary_key=True)
data = sql.Column(sql.JsonBlob())
class GroupProjectGrant(sql.ModelBase, BaseGrant):
__tablename__ = 'group_project_metadata'
group_id = sql.Column(sql.String(64), primary_key=True)
project_id = sql.Column(sql.String(64), primary_key=True)
data = sql.Column(sql.JsonBlob())
class GroupDomainGrant(sql.ModelBase, BaseGrant):
__tablename__ = 'group_domain_metadata'
group_id = sql.Column(sql.String(64), primary_key=True)
domain_id = sql.Column(sql.String(64), primary_key=True)
data = sql.Column(sql.JsonBlob())
# TODO(dolph): ... do we need this table?
class UserTenantMembership(sql.ModelBase, sql.DictBase):
"""Tenant membership join table."""
@ -124,6 +150,17 @@ class UserTenantMembership(sql.ModelBase, sql.DictBase):
primary_key=True)
class UserGroupMembership(sql.ModelBase, sql.DictBase):
"""Group membership join table."""
__tablename__ = 'user_group_membership'
user_id = sql.Column(sql.String(64),
sql.ForeignKey('user.id'),
primary_key=True)
group_id = sql.Column(sql.String(64),
sql.ForeignKey('group.id'),
primary_key=True)
class Identity(sql.Base, identity.Driver):
# Internal interface to manage the database
def db_sync(self):
@ -202,32 +239,47 @@ class Identity(sql.Base, identity.Driver):
return [identity.filter_user(user_ref.to_dict())
for user_ref in user_refs]
def get_metadata(self, user_id, tenant_id=None, domain_id=None):
def get_metadata(self, user_id=None, tenant_id=None,
domain_id=None, group_id=None):
session = self.get_session()
if user_id:
if tenant_id:
q = session.query(UserProjectMetadata)
q = session.query(UserProjectGrant)
q = q.filter_by(tenant_id=tenant_id)
elif domain_id:
q = session.query(UserDomainMetadata)
q = session.query(UserDomainGrant)
q = q.filter_by(domain_id=domain_id)
q = q.filter_by(user_id=user_id)
elif group_id:
if tenant_id:
q = session.query(GroupProjectGrant)
q = q.filter_by(project_id=tenant_id)
elif domain_id:
q = session.query(GroupDomainGrant)
q = q.filter_by(domain_id=domain_id)
q = q.filter_by(group_id=group_id)
try:
return q.one().data
except sql.NotFound:
raise exception.MetadataNotFound()
def create_grant(self, role_id, user_id, domain_id, project_id):
def create_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
self.get_role(role_id)
if user_id:
self.get_user(user_id)
if group_id:
self.get_group(group_id)
if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_tenant(project_id)
try:
metadata_ref = self.get_metadata(user_id, project_id, domain_id)
metadata_ref = self.get_metadata(user_id, project_id,
domain_id, group_id)
is_new = False
except exception.MetadataNotFound:
metadata_ref = {}
@ -236,31 +288,67 @@ class Identity(sql.Base, identity.Driver):
roles.add(role_id)
metadata_ref['roles'] = list(roles)
if is_new:
self.create_metadata(user_id, project_id, metadata_ref, domain_id)
self.create_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
else:
self.update_metadata(user_id, project_id, metadata_ref, domain_id)
self.update_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
def list_grants(self, user_id, domain_id, project_id):
metadata_ref = self.get_metadata(user_id, project_id, domain_id)
return [self.get_role(x) for x in metadata_ref.get('roles', [])]
def get_grant(self, role_id, user_id, domain_id, project_id):
metadata_ref = self.get_metadata(user_id, project_id, domain_id)
role_ids = set(metadata_ref.get('roles', []))
if role_id not in role_ids:
raise exception.RoleNotFound(role_id=role_id)
return self.get_role(role_id)
def delete_grant(self, role_id, user_id, domain_id, project_id):
self.get_role(role_id)
def list_grants(self, user_id=None, group_id=None,
domain_id=None, project_id=None):
if user_id:
self.get_user(user_id)
if group_id:
self.get_group(group_id)
if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_tenant(project_id)
try:
metadata_ref = self.get_metadata(user_id, project_id, domain_id)
metadata_ref = self.get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
return [self.get_role(x) for x in metadata_ref.get('roles', [])]
def get_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
self.get_role(role_id)
if user_id:
self.get_user(user_id)
if group_id:
self.get_group(group_id)
if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_tenant(project_id)
try:
metadata_ref = self.get_metadata(user_id, project_id,
domain_id, group_id)
except exception.MetadataNotFound:
metadata_ref = {}
role_ids = set(metadata_ref.get('roles', []))
if role_id not in role_ids:
raise exception.RoleNotFound(role_id=role_id)
return self.get_role(role_id)
def delete_grant(self, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
self.get_role(role_id)
if user_id:
self.get_user(user_id)
if group_id:
self.get_group(group_id)
if domain_id:
self.get_domain(domain_id)
if project_id:
self.get_tenant(project_id)
try:
metadata_ref = self.get_metadata(user_id, project_id,
domain_id, group_id)
is_new = False
except exception.MetadataNotFound:
metadata_ref = {}
@ -272,9 +360,11 @@ class Identity(sql.Base, identity.Driver):
raise exception.RoleNotFound(role_id=role_id)
metadata_ref['roles'] = list(roles)
if is_new:
self.create_metadata(user_id, project_id, metadata_ref, domain_id)
self.create_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
else:
self.update_metadata(user_id, project_id, metadata_ref, domain_id)
self.update_metadata(user_id, project_id, metadata_ref,
domain_id, group_id)
# These should probably be part of the high-level API
def add_user_to_tenant(self, tenant_id, user_id):
@ -416,10 +506,14 @@ class Identity(sql.Base, identity.Driver):
q = q.filter_by(tenant_id=tenant_id)
q.delete(False)
q = session.query(UserProjectMetadata)
q = session.query(UserProjectGrant)
q = q.filter_by(tenant_id=tenant_id)
q.delete(False)
q = session.query(GroupProjectGrant)
q = q.filter_by(project_id=tenant_id)
q.delete(False)
if not session.query(Tenant).filter_by(id=tenant_id).delete(False):
raise exception.TenantNotFound(tenant_id=tenant_id)
@ -427,34 +521,55 @@ class Identity(sql.Base, identity.Driver):
session.flush()
@handle_conflicts(type='metadata')
def create_metadata(self, user_id, tenant_id, metadata, domain_id=None):
def create_metadata(self, user_id, tenant_id, metadata,
domain_id=None, group_id=None):
session = self.get_session()
with session.begin():
if user_id:
if tenant_id:
session.add(UserProjectMetadata(user_id=user_id,
session.add(UserProjectGrant(user_id=user_id,
tenant_id=tenant_id,
data=metadata))
elif domain_id:
session.add(UserDomainMetadata(user_id=user_id,
session.add(UserDomainGrant(user_id=user_id,
domain_id=domain_id,
data=metadata))
elif group_id:
if tenant_id:
session.add(GroupProjectGrant(group_id=group_id,
project_id=tenant_id,
data=metadata))
elif domain_id:
session.add(GroupDomainGrant(group_id=group_id,
domain_id=domain_id,
data=metadata))
session.flush()
return metadata
@handle_conflicts(type='metadata')
def update_metadata(self, user_id, tenant_id, metadata, domain_id=None):
def update_metadata(self, user_id, tenant_id, metadata,
domain_id=None, group_id=None):
session = self.get_session()
with session.begin():
if user_id:
if tenant_id:
metadata_ref = session.query(UserProjectMetadata)\
.filter_by(user_id=user_id)\
.filter_by(tenant_id=tenant_id)\
.first()
q = session.query(UserProjectGrant)
q = q.filter_by(user_id=user_id)
q = q.filter_by(tenant_id=tenant_id)
elif domain_id:
metadata_ref = session.query(UserDomainMetadata)\
.filter_by(user_id=user_id)\
.filter_by(domain_id=domain_id)\
.first()
q = session.query(UserDomainGrant)
q = q.filter_by(user_id=user_id)
q = q.filter_by(domain_id=domain_id)
elif group_id:
if tenant_id:
q = session.query(GroupProjectGrant)
q = q.filter_by(group_id=group_id)
q = q.filter_by(project_id=tenant_id)
elif domain_id:
q = session.query(GroupDomainGrant)
q = q.filter_by(group_id=group_id)
q = q.filter_by(domain_id=domain_id)
metadata_ref = q.first()
data = metadata_ref.data.copy()
data.update(metadata)
metadata_ref.data = data
@ -548,7 +663,7 @@ class Identity(sql.Base, identity.Driver):
session = self.get_session()
user = self.get_user(user_id)
metadata_refs = session\
.query(UserProjectMetadata)\
.query(UserProjectGrant)\
.filter_by(user_id=user_id)
project_ids = set([x.tenant_id for x in metadata_refs
if x.data.get('roles')])
@ -624,6 +739,62 @@ class Identity(sql.Base, identity.Driver):
session.flush()
return identity.filter_user(user_ref.to_dict(include_extra_dict=True))
def add_user_to_group(self, user_id, group_id):
session = self.get_session()
self.get_group(group_id)
self.get_user(user_id)
query = session.query(UserGroupMembership)
query = query.filter_by(user_id=user_id)
query = query.filter_by(group_id=group_id)
rv = query.first()
if rv:
return
with session.begin():
session.add(UserGroupMembership(user_id=user_id,
group_id=group_id))
session.flush()
def check_user_in_group(self, user_id, group_id):
session = self.get_session()
self.get_group(group_id)
self.get_user(user_id)
query = session.query(UserGroupMembership)
query = query.filter_by(user_id=user_id)
query = query.filter_by(group_id=group_id)
if not query.first():
raise exception.NotFound('User not found in group')
def remove_user_from_group(self, user_id, group_id):
session = self.get_session()
# We don't check if user or group are still valid and let the remove
# be tried anyway - in case this is some kind of clean-up operation
query = session.query(UserGroupMembership)
query = query.filter_by(user_id=user_id)
query = query.filter_by(group_id=group_id)
membership_ref = query.first()
if membership_ref is None:
raise exception.NotFound('User not found in group')
with session.begin():
session.delete(membership_ref)
session.flush()
def list_groups_for_user(self, user_id):
session = self.get_session()
self.get_user(user_id)
query = session.query(UserGroupMembership)
query = query.filter_by(user_id=user_id)
membership_refs = query.all()
return [self.get_group(x.group_id) for x in membership_refs]
def list_users_in_group(self, group_id):
session = self.get_session()
self.get_group(group_id)
query = session.query(UserGroupMembership)
query = query.filter_by(group_id=group_id)
membership_refs = query.all()
return [self.get_user(x.user_id) for x in membership_refs]
def delete_user(self, user_id):
session = self.get_session()
@ -637,7 +808,15 @@ class Identity(sql.Base, identity.Driver):
q = q.filter_by(user_id=user_id)
q.delete(False)
q = session.query(UserProjectMetadata)
q = session.query(UserProjectGrant)
q = q.filter_by(user_id=user_id)
q.delete(False)
q = session.query(UserDomainGrant)
q = q.filter_by(user_id=user_id)
q.delete(False)
q = session.query(UserGroupMembership)
q = q.filter_by(user_id=user_id)
q.delete(False)
@ -647,6 +826,77 @@ class Identity(sql.Base, identity.Driver):
session.delete(ref)
session.flush()
# group crud
@handle_conflicts(type='group')
def create_group(self, group_id, group):
session = self.get_session()
with session.begin():
ref = Group.from_dict(group)
session.add(ref)
session.flush()
return ref.to_dict()
def list_groups(self):
session = self.get_session()
refs = session.query(Group).all()
return [ref.to_dict() for ref in refs]
def _get_group(self, group_id):
session = self.get_session()
ref = session.query(Group).filter_by(id=group_id).first()
if not ref:
raise exception.GroupNotFound(group_id=group_id)
return ref.to_dict()
def get_group(self, group_id):
return self._get_group(group_id)
@handle_conflicts(type='group')
def update_group(self, group_id, group):
session = self.get_session()
with session.begin():
ref = session.query(Group).filter_by(id=group_id).first()
if ref is None:
raise exception.GroupNotFound(group_id=group_id)
old_dict = ref.to_dict()
for k in group:
old_dict[k] = group[k]
new_group = Group.from_dict(old_dict)
for attr in Group.attributes:
if attr != 'id':
setattr(ref, attr, getattr(new_group, attr))
ref.extra = new_group.extra
session.flush()
return ref.to_dict()
def delete_group(self, group_id):
session = self.get_session()
try:
ref = session.query(Group).filter_by(id=group_id).one()
except sql.NotFound:
raise exception.GroupNotFound(group_id=group_id)
with session.begin():
q = session.query(GroupProjectGrant)
q = q.filter_by(group_id=group_id)
q.delete(False)
q = session.query(GroupDomainGrant)
q = q.filter_by(group_id=group_id)
q.delete(False)
q = session.query(UserGroupMembership)
q = q.filter_by(group_id=group_id)
q.delete(False)
if not session.query(Group).filter_by(id=group_id).delete(False):
raise exception.GroupNotFound(group_id=group_id)
session.delete(ref)
session.flush()
# credential crud
@handle_conflicts(type='credential')
@ -750,7 +1000,7 @@ class Identity(sql.Base, identity.Driver):
raise exception.RoleNotFound(role_id=role_id)
with session.begin():
for metadata_ref in session.query(UserProjectMetadata):
for metadata_ref in session.query(UserProjectGrant):
metadata = metadata_ref.to_dict()
try:
self.remove_role_from_user_and_tenant(

View File

@ -462,6 +462,11 @@ class UserV3(controller.V3Controller):
refs = self.identity_api.list_users(context)
return {'users': self._paginate(context, refs)}
@controller.protected
def list_users_in_group(self, context, group_id):
refs = self.identity_api.list_users_in_group(context, group_id)
return {'users': self._paginate(context, refs)}
@controller.protected
def get_user(self, context, user_id):
ref = self.identity_api.get_user(context, user_id)
@ -474,11 +479,60 @@ class UserV3(controller.V3Controller):
ref = self.identity_api.update_user(context, user_id, user)
return {'user': ref}
@controller.protected
def add_user_to_group(self, context, user_id, group_id):
return self.identity_api.add_user_to_group(context,
user_id, group_id)
@controller.protected
def check_user_in_group(self, context, user_id, group_id):
return self.identity_api.check_user_in_group(context,
user_id, group_id)
@controller.protected
def remove_user_from_group(self, context, user_id, group_id):
return self.identity_api.remove_user_from_group(context,
user_id, group_id)
@controller.protected
def delete_user(self, context, user_id):
return self.identity_api.delete_user(context, user_id)
class GroupV3(controller.V3Controller):
@controller.protected
def create_group(self, context, group):
ref = self._assign_unique_id(self._normalize_dict(group))
ref = self.identity_api.create_group(context, ref['id'], ref)
return {'group': ref}
@controller.protected
def list_groups(self, context):
refs = self.identity_api.list_groups(context)
return {'groups': self._paginate(context, refs)}
@controller.protected
def list_groups_for_user(self, context, user_id):
refs = self.identity_api.list_groups_for_user(context, user_id)
return {'groups': self._paginate(context, refs)}
@controller.protected
def get_group(self, context, group_id):
ref = self.identity_api.get_group(context, group_id)
return {'group': ref}
@controller.protected
def update_group(self, context, group_id, group):
self._require_matching_id(group_id, group)
ref = self.identity_api.update_group(context, group_id, group)
return {'group': ref}
@controller.protected
def delete_group(self, context, group_id):
return self.identity_api.delete_group(context, group_id)
class CredentialV3(controller.V3Controller):
@controller.protected
def create_credential(self, context, credential):
@ -539,43 +593,52 @@ class RoleV3(controller.V3Controller):
def delete_role(self, context, role_id):
return self.identity_api.delete_role(context, role_id)
def _require_domain_or_project(self, domain_id, project_id):
def _require_domain_xor_project(self, domain_id, project_id):
if (domain_id and project_id) or (not domain_id and not project_id):
msg = 'Specify a domain or project, not both'
raise exception.ValidationError(msg)
def _require_user_xor_group(self, user_id, group_id):
if (user_id and group_id) or (not user_id and not group_id):
msg = 'Specify a user or group, not both'
raise exception.ValidationError(msg)
@controller.protected
def create_grant(self, context, role_id, user_id, domain_id=None,
project_id=None):
"""Grants a role to a user on either a domain or project."""
self._require_domain_or_project(domain_id, project_id)
def create_grant(self, context, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
"""Grants a role to a user or group on either a domain or project."""
self._require_domain_xor_project(domain_id, project_id)
self._require_user_xor_group(user_id, group_id)
return self.identity_api.create_grant(
context, role_id, user_id, domain_id, project_id)
context, role_id, user_id, group_id, domain_id, project_id)
@controller.protected
def list_grants(self, context, user_id, domain_id=None,
project_id=None):
"""Lists roles granted to a user on either a domain or project."""
self._require_domain_or_project(domain_id, project_id)
def list_grants(self, context, user_id=None, group_id=None,
domain_id=None, project_id=None):
"""Lists roles granted to user/group on either a domain or project."""
self._require_domain_xor_project(domain_id, project_id)
self._require_user_xor_group(user_id, group_id)
return self.identity_api.list_grants(
context, user_id, domain_id, project_id)
context, user_id, group_id, domain_id, project_id)
@controller.protected
def check_grant(self, context, role_id, user_id, domain_id=None,
project_id=None):
def check_grant(self, context, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
"""Checks if a role has been granted on either a domain or project."""
self._require_domain_or_project(domain_id, project_id)
self._require_domain_xor_project(domain_id, project_id)
self._require_user_xor_group(user_id, group_id)
self.identity_api.get_grant(
context, role_id, user_id, domain_id, project_id)
context, role_id, user_id, group_id, domain_id, project_id)
@controller.protected
def revoke_grant(self, context, role_id, user_id, domain_id=None,
project_id=None):
"""Revokes a role from a user on either a domain or project."""
self._require_domain_or_project(domain_id, project_id)
def revoke_grant(self, context, role_id, user_id=None, group_id=None,
domain_id=None, project_id=None):
"""Revokes a role from user/group on either a domain or project."""
self._require_domain_xor_project(domain_id, project_id)
self._require_user_xor_group(user_id, group_id)
self.identity_api.delete_grant(
context, role_id, user_id, domain_id, project_id)
context, role_id, user_id, group_id, domain_id, project_id)

View File

@ -38,6 +38,7 @@ def filter_user(user_ref):
user_ref = user_ref.copy()
user_ref.pop('password', None)
user_ref.pop('tenants', None)
user_ref.pop('groups', None)
try:
user_ref['extra'].pop('password', None)
user_ref['extra'].pop('tenants', None)
@ -202,16 +203,32 @@ class Driver(object):
raise exception.NotImplemented()
# metadata crud
def get_metadata(self, user_id, tenant_id):
def get_metadata(self, user_id=None, tenant_id=None,
domain_id=None, group_id=None):
"""Gets the metadata for the specified user/group on project/domain.
:raises: keystone.exception.MetadataNotFound
:returns: metadata
"""
raise exception.NotImplemented()
def create_metadata(self, user_id, tenant_id, metadata):
def create_metadata(self, user_id, tenant_id, metadata,
domain_id=None, group_id=None):
"""Creates the metadata for the specified user/group on project/domain.
:returns: metadata created
"""
raise exception.NotImplemented()
def update_metadata(self, user_id, tenant_id, metadata):
raise exception.NotImplemented()
def update_metadata(self, user_id, tenant_id, metadata,
domain_id=None, group_id=None):
"""Updates the metadata for the specified user/group on project/domain.
def delete_metadata(self, user_id, tenant_id):
:returns: metadata updated
"""
raise exception.NotImplemented()
# domain crud
@ -318,6 +335,14 @@ class Driver(object):
"""
raise exception.NotImplemented()
def list_users_in_group(self, group_id, group):
"""List all users in a group.
:returns: a list of user_refs or an empty list.
"""
raise exception.NotImplemented()
def get_user(self, user_id):
"""Get a user by ID.
@ -336,6 +361,32 @@ class Driver(object):
"""
raise exception.NotImplemented()
def add_user_to_group(self, user_id, group_id):
"""Adds a user to a group.
:raises: keystone.exception.UserNotFound,
keystone.exception.GroupNotFound
"""
raise exception.NotImplemented()
def check_user_in_group(self, user_id, group_id):
"""Checks if a user is a member of a group.
:raises: keystone.exception.UserNotFound,
keystone.exception.GroupNotFound
"""
raise exception.NotImplemented()
def remove_user_from_group(self, user_id, group_id):
"""Removes a user from a group.
:raises: keystone.exception.NotFound
"""
raise exception.NotImplemented()
def delete_user(self, user_id):
"""Deletes an existing user.
@ -431,3 +482,55 @@ class Driver(object):
"""
raise exception.NotImplemented()
# group crud
def create_group(self, group_id, group):
"""Creates a new group.
:raises: keystone.exception.Conflict
"""
raise exception.NotImplemented()
def list_groups(self):
"""List all groups in the system.
:returns: a list of group_refs or an empty list.
"""
raise exception.NotImplemented()
def list_groups_for_user(self, user_id, user):
"""List all groups a user is in
:returns: a list of group_refs or an empty list.
"""
raise exception.NotImplemented()
def get_group(self, group_id):
"""Get a group by ID.
:returns: group_ref
:raises: keystone.exception.GroupNotFound
"""
raise exception.NotImplemented()
def update_group(self, group_id, group):
"""Updates an existing group.
:raises: keystone.exceptionGroupNotFound,
keystone.exception.Conflict
"""
raise exception.NotImplemented()
def delete_group(self, group_id):
"""Deletes an existing group.
:raises: keystone.exception.GroupNotFound
"""
raise exception.NotImplemented()

View File

@ -64,6 +64,7 @@ def append_v3_routers(mapper, routers):
routers.append(
router.Router(controllers.DomainV3(),
'domains', 'domain'))
project_controller = controllers.ProjectV3()
routers.append(
router.Router(project_controller,
@ -72,43 +73,107 @@ def append_v3_routers(mapper, routers):
controller=project_controller,
action='list_user_projects',
conditions=dict(method=['GET']))
user_controller = controllers.UserV3()
routers.append(
router.Router(controllers.UserV3(),
router.Router(user_controller,
'users', 'user'))
mapper.connect('/groups/{group_id}/users',
controller=user_controller,
action='list_users_in_group',
conditions=dict(method=['GET']))
mapper.connect('/groups/{group_id}/users/{user_id}',
controller=user_controller,
action='add_user_to_group',
conditions=dict(method=['PUT']))
mapper.connect('/groups/{group_id}/users/{user_id}',
controller=user_controller,
action='check_user_in_group',
conditions=dict(method=['HEAD']))
mapper.connect('/groups/{group_id}/users/{user_id}',
controller=user_controller,
action='remove_user_from_group',
conditions=dict(method=['DELETE']))
group_controller = controllers.GroupV3()
routers.append(
router.Router(group_controller,
'groups', 'group'))
mapper.connect('/users/{user_id}/groups',
controller=group_controller,
action='list_groups_for_user',
conditions=dict(method=['GET']))
routers.append(
router.Router(controllers.CredentialV3(),
'credentials', 'credential'))
role_controller = controllers.RoleV3()
routers.append(router.Router(role_controller, 'roles', 'role'))
mapper.connect('/projects/{project_id}/users/{user_id}/roles/{role_id}',
controller=role_controller,
action='create_grant',
conditions=dict(method=['PUT']))
mapper.connect('/projects/{project_id}/groups/{group_id}/roles/{role_id}',
controller=role_controller,
action='create_grant',
conditions=dict(method=['PUT']))
mapper.connect('/projects/{project_id}/users/{user_id}/roles/{role_id}',
controller=role_controller,
action='check_grant',
conditions=dict(method=['HEAD']))
mapper.connect('/projects/{project_id}/groups/{group_id}/roles/{role_id}',
controller=role_controller,
action='check_grant',
conditions=dict(method=['HEAD']))
mapper.connect('/projects/{project_id}/users/{user_id}/roles',
controller=role_controller,
action='list_grants',
conditions=dict(method=['GET']))
mapper.connect('/projects/{project_id}/groups/{group_id}/roles',
controller=role_controller,
action='list_grants',
conditions=dict(method=['GET']))
mapper.connect('/projects/{project_id}/users/{user_id}/roles/{role_id}',
controller=role_controller,
action='revoke_grant',
conditions=dict(method=['DELETE']))
mapper.connect('/projects/{project_id}/groups/{group_id}/roles/{role_id}',
controller=role_controller,
action='revoke_grant',
conditions=dict(method=['DELETE']))
mapper.connect('/domains/{domain_id}/users/{user_id}/roles/{role_id}',
controller=role_controller,
action='create_grant',
conditions=dict(method=['PUT']))
mapper.connect('/domains/{domain_id}/groups/{group_id}/roles/{role_id}',
controller=role_controller,
action='create_grant',
conditions=dict(method=['PUT']))
mapper.connect('/domains/{domain_id}/users/{user_id}/roles/{role_id}',
controller=role_controller,
action='check_grant',
conditions=dict(method=['HEAD']))
mapper.connect('/domains/{domain_id}/groups/{group_id}/roles/{role_id}',
controller=role_controller,
action='check_grant',
conditions=dict(method=['HEAD']))
mapper.connect('/domains/{domain_id}/users/{user_id}/roles',
controller=role_controller,
action='list_grants',
conditions=dict(method=['GET']))
mapper.connect('/domains/{domain_id}/groups/{group_id}/roles',
controller=role_controller,
action='list_grants',
conditions=dict(method=['GET']))
mapper.connect('/domains/{domain_id}/users/{user_id}/roles/{role_id}',
controller=role_controller,
action='revoke_grant',
conditions=dict(method=['DELETE']))
mapper.connect('/domains/{domain_id}/groups/{group_id}/roles/{role_id}',
controller=role_controller,
action='revoke_grant',
conditions=dict(method=['DELETE']))

View File

@ -27,6 +27,7 @@ from keystone import token
LOG = logging.getLogger(__name__)
DRIVERS = dict(
catalog_api=catalog.Manager(),
ec2_api=ec2.Manager(),

View File

@ -174,6 +174,14 @@ class Auth(controller.V2Controller):
tenant_ref = self._get_tenant_ref(context, user_id, tenant_id)
metadata_ref = self._get_metadata_ref(context, user_id, tenant_id)
self._append_roles(metadata_ref,
self._get_group_metadata_ref(
context, user_id, tenant_id))
self._append_roles(metadata_ref,
self._get_domain_metadata_ref(
context, user_id, tenant_id))
expiry = old_token_ref['expires']
auth_token_data = self._get_auth_token_data(current_user_ref,
tenant_ref,
@ -226,6 +234,14 @@ class Auth(controller.V2Controller):
raise exception.Unauthorized(e)
(user_ref, tenant_ref, metadata_ref) = auth_info
self._append_roles(metadata_ref,
self._get_group_metadata_ref(
context, user_id, tenant_id))
self._append_roles(metadata_ref,
self._get_domain_metadata_ref(
context, user_id, tenant_id))
expiry = core.default_expire_time()
auth_token_data = self._get_auth_token_data(user_ref,
tenant_ref,
@ -255,6 +271,14 @@ class Auth(controller.V2Controller):
tenant_ref = self._get_tenant_ref(context, user_id, tenant_id)
metadata_ref = self._get_metadata_ref(context, user_id, tenant_id)
self._append_roles(metadata_ref,
self._get_group_metadata_ref(
context, user_id, tenant_id))
self._append_roles(metadata_ref,
self._get_domain_metadata_ref(
context, user_id, tenant_id))
expiry = core.default_expire_time()
auth_token_data = self._get_auth_token_data(user_ref,
tenant_ref,
@ -303,20 +327,54 @@ class Auth(controller.V2Controller):
exception.Unauthorized(e)
return tenant_ref
def _get_metadata_ref(self, context, user_id, tenant_id):
"""Returns the metadata_ref for a user in a tenant"""
def _get_metadata_ref(self, context, user_id=None, tenant_id=None,
group_id=None):
"""Returns the metadata_ref for a user or group in a tenant"""
metadata_ref = {}
if tenant_id:
try:
if user_id:
metadata_ref = self.identity_api.get_metadata(
context=context,
user_id=user_id,
tenant_id=tenant_id)
elif group_id:
metadata_ref = self.identity_api.get_metadata(
context=context,
group_id=group_id,
tenant_id=tenant_id)
except exception.MetadataNotFound:
metadata_ref = {}
return metadata_ref
def _get_group_metadata_ref(self, context, user_id, tenant_id):
"""Return any metadata for this project due to group grants"""
group_refs = self.identity_api.list_groups_for_user(context=context,
user_id=user_id)
metadata_ref = {}
for x in group_refs:
metadata_ref.update(self._get_metadata_ref(context,
group_id=x['id'],
tenant_id=tenant_id))
return metadata_ref
def _get_domain_metadata_ref(self, context, user_id, tenant_id):
"""Return any metadata for this project due to domain grants"""
# TODO (henry-nashe) Get the domain for this tenant...and then see if
# any domain grants apply. Bug #1093248
return {}
def _append_roles(self, metadata, additional_metadata):
"""
Update the roles in metadata to be the union of the roles from
both of the passed metadatas
"""
first = set(metadata.get('roles', []))
second = set(additional_metadata.get('roles', []))
metadata['roles'] = list(first.union(second))
def _get_token_ref(self, context, token_id, belongs_to=None):
"""Returns a token if a valid one exists.

View File

@ -165,6 +165,11 @@ class AuthWithToken(AuthTest):
def test_auth_unscoped_token_tenant(self):
"""Verify getting a token in a tenant with an unscoped token"""
# Add a role in so we can check we get this back
self.identity_api.add_role_to_user_and_tenant(
self.user_foo['id'],
self.tenant_bar['id'],
self.role_member['id'])
# Get an unscoped tenant
body_dict = _build_user_auth(
username='FOO',
@ -177,7 +182,41 @@ class AuthWithToken(AuthTest):
scoped_token = self.api.authenticate({}, body_dict)
tenant = scoped_token["access"]["token"]["tenant"]
roles = scoped_token["access"]["metadata"]["roles"]
self.assertEquals(tenant["id"], self.tenant_bar['id'])
self.assertEquals(roles[0], self.role_member['id'])
def test_auth_token_tenant_group_role(self):
"""Verify getting a token in a tenant with group roles"""
# Add a v2 style role in so we can check we get this back
self.identity_api.add_role_to_user_and_tenant(
self.user_foo['id'],
self.tenant_bar['id'],
self.role_member['id'])
# Now create a group role for this user as well
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
self.identity_api.add_user_to_group(self.user_foo['id'],
new_group['id'])
self.identity_api.create_grant(
group_id=new_group['id'],
project_id=self.tenant_bar['id'],
role_id=self.role_keystone_admin['id'])
# Get a scoped token for the tenant
body_dict = _build_user_auth(
username='FOO',
password='foo2',
tenant_name="BAR")
scoped_token = self.api.authenticate({}, body_dict)
tenant = scoped_token["access"]["token"]["tenant"]
roles = scoped_token["access"]["metadata"]["roles"]
self.assertEquals(tenant["id"], self.tenant_bar['id'])
self.assertIn(self.role_member['id'], roles)
self.assertIn(self.role_keystone_admin['id'], roles)
class AuthWithPasswordCredentials(AuthTest):

View File

@ -385,6 +385,185 @@ class IdentityTests(object):
self.tenant_bar['id'],
'member')
def test_get_role_grant_by_user_and_project(self):
roles_ref = self.identity_api.list_grants(
user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'])
self.assertEquals(len(roles_ref), 0)
self.identity_api.create_grant(user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'],
role_id='keystone_admin')
roles_ref = self.identity_api.list_grants(
user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'])
self.assertDictEqual(roles_ref[0], self.role_keystone_admin)
self.identity_api.create_grant(user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'],
role_id='member')
roles_ref = self.identity_api.list_grants(
user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'])
roles_ref_ids = []
for i, ref in enumerate(roles_ref):
roles_ref_ids.append(ref['id'])
self.assertIn('keystone_admin', roles_ref_ids)
self.assertIn('member', roles_ref_ids)
def test_get_role_grants_for_user_and_project_404(self):
self.assertRaises(exception.UserNotFound,
self.identity_api.list_grants,
user_id=uuid.uuid4().hex,
project_id=self.tenant_bar['id'])
self.assertRaises(exception.TenantNotFound,
self.identity_api.list_grants,
user_id=self.user_foo['id'],
project_id=uuid.uuid4().hex)
def test_add_role_grant_to_user_and_project_404(self):
self.assertRaises(exception.UserNotFound,
self.identity_api.create_grant,
user_id=uuid.uuid4().hex,
project_id=self.tenant_bar['id'],
role_id='keystone_admin')
self.assertRaises(exception.TenantNotFound,
self.identity_api.create_grant,
user_id=self.user_foo['id'],
project_id=uuid.uuid4().hex,
role_id='keystone_admin')
self.assertRaises(exception.RoleNotFound,
self.identity_api.create_grant,
user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'],
role_id=uuid.uuid4().hex)
def test_remove_role_grant_from_user_and_project(self):
self.identity_api.create_grant(user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'],
role_id='member')
roles_ref = self.identity_api.list_grants(
user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'])
self.assertDictEqual(roles_ref[0], self.role_member)
self.identity_api.delete_grant(user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'],
role_id='member')
roles_ref = self.identity_api.list_grants(
user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'])
self.assertEquals(len(roles_ref), 0)
self.assertRaises(exception.NotFound,
self.identity_api.delete_grant,
user_id=self.user_foo['id'],
project_id=self.tenant_bar['id'],
role_id='member')
def test_get_and_remove_role_grant_by_group_and_project(self):
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': 'secret', 'enabled': True}
self.identity_api.create_user(new_user['id'], new_user)
self.identity_api.add_user_to_group(new_user['id'],
new_group['id'])
roles_ref = self.identity_api.list_grants(
group_id=new_group['id'],
project_id=self.tenant_bar['id'])
self.assertEquals(len(roles_ref), 0)
self.identity_api.create_grant(group_id=new_group['id'],
project_id=self.tenant_bar['id'],
role_id='member')
roles_ref = self.identity_api.list_grants(
group_id=new_group['id'],
project_id=self.tenant_bar['id'])
self.assertDictEqual(roles_ref[0], self.role_member)
self.identity_api.delete_grant(group_id=new_group['id'],
project_id=self.tenant_bar['id'],
role_id='member')
roles_ref = self.identity_api.list_grants(
group_id=new_group['id'],
project_id=self.tenant_bar['id'])
self.assertEquals(len(roles_ref), 0)
self.assertRaises(exception.NotFound,
self.identity_api.delete_grant,
group_id=new_group['id'],
project_id=self.tenant_bar['id'],
role_id='member')
def test_get_and_remove_role_grant_by_group_and_domain(self):
new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(new_domain['id'], new_domain)
new_group = {'id': uuid.uuid4().hex, 'domain_id': new_domain['id'],
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': 'secret', 'enabled': True}
self.identity_api.create_user(new_user['id'], new_user)
self.identity_api.add_user_to_group(new_user['id'],
new_group['id'])
roles_ref = self.identity_api.list_grants(
group_id=new_group['id'],
domain_id=new_domain['id'])
self.assertEquals(len(roles_ref), 0)
self.identity_api.create_grant(group_id=new_group['id'],
domain_id=new_domain['id'],
role_id='member')
roles_ref = self.identity_api.list_grants(
group_id=new_group['id'],
domain_id=new_domain['id'])
self.assertDictEqual(roles_ref[0], self.role_member)
self.identity_api.delete_grant(group_id=new_group['id'],
domain_id=new_domain['id'],
role_id='member')
roles_ref = self.identity_api.list_grants(
group_id=new_group['id'],
domain_id=new_domain['id'])
self.assertEquals(len(roles_ref), 0)
self.assertRaises(exception.NotFound,
self.identity_api.delete_grant,
group_id=new_group['id'],
domain_id=new_domain['id'],
role_id='member')
def test_get_and_remove_role_grant_by_user_and_domain(self):
new_domain = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_domain(new_domain['id'], new_domain)
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': 'secret', 'enabled': True}
self.identity_api.create_user(new_user['id'], new_user)
roles_ref = self.identity_api.list_grants(
user_id=new_user['id'],
domain_id=new_domain['id'])
self.assertEquals(len(roles_ref), 0)
self.identity_api.create_grant(user_id=new_user['id'],
domain_id=new_domain['id'],
role_id='member')
roles_ref = self.identity_api.list_grants(
user_id=new_user['id'],
domain_id=new_domain['id'])
self.assertDictEqual(roles_ref[0], self.role_member)
self.identity_api.delete_grant(user_id=new_user['id'],
domain_id=new_domain['id'],
role_id='member')
roles_ref = self.identity_api.list_grants(
user_id=new_user['id'],
domain_id=new_domain['id'])
self.assertEquals(len(roles_ref), 0)
self.assertRaises(exception.NotFound,
self.identity_api.delete_grant,
user_id=new_user['id'],
domain_id=new_domain['id'],
role_id='member')
def test_role_crud(self):
role = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex}
self.identity_api.create_role(role['id'], role)
@ -696,6 +875,133 @@ class IdentityTests(object):
tenant_ref = self.identity_api.get_tenant('fake1')
self.assertEqual(tenant_ref['enabled'], tenant['enabled'])
def test_add_user_to_group(self):
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': 'secret', 'enabled': True}
self.identity_api.create_user(new_user['id'], new_user)
self.identity_api.add_user_to_group(new_user['id'],
new_group['id'])
groups = self.identity_api.list_groups_for_user(new_user['id'])
found = False
for x in groups:
if (x['id'] == new_group['id']):
found = True
self.assertTrue(found)
def test_add_user_to_group_404(self):
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': 'secret', 'enabled': True}
self.identity_api.create_user(new_user['id'], new_user)
self.assertRaises(exception.GroupNotFound,
self.identity_api.add_user_to_group,
new_user['id'],
uuid.uuid4().hex)
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
self.assertRaises(exception.UserNotFound,
self.identity_api.add_user_to_group,
uuid.uuid4().hex,
new_group['id'])
def test_check_user_in_group(self):
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': 'secret', 'enabled': True}
self.identity_api.create_user(new_user['id'], new_user)
self.identity_api.add_user_to_group(new_user['id'],
new_group['id'])
self.identity_api.check_user_in_group(new_user['id'], new_group['id'])
def test_check_user_not_in_group(self):
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
self.assertRaises(exception.UserNotFound,
self.identity_api.check_user_in_group,
uuid.uuid4().hex,
new_group['id'])
def test_list_users_in_group(self):
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': 'secret', 'enabled': True}
self.identity_api.create_user(new_user['id'], new_user)
self.identity_api.add_user_to_group(new_user['id'],
new_group['id'])
user_refs = self.identity_api.list_users_in_group(new_group['id'])
found = False
for x in user_refs:
if (x['id'] == new_user['id']):
found = True
self.assertTrue(found)
def test_remove_user_from_group(self):
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': 'secret', 'enabled': True}
self.identity_api.create_user(new_user['id'], new_user)
self.identity_api.add_user_to_group(new_user['id'],
new_group['id'])
agroups = self.identity_api.list_groups_for_user(new_user['id'])
self.identity_api.remove_user_from_group(new_user['id'],
new_group['id'])
groups = self.identity_api.list_groups_for_user(new_user['id'])
for x in groups:
self.assertFalse(x['id'] == new_group['id'])
def test_remove_user_from_group_404(self):
new_user = {'id': uuid.uuid4().hex, 'name': 'new_user',
'password': 'secret', 'enabled': True}
self.identity_api.create_user(new_user['id'], new_user)
new_group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.identity_api.create_group(new_group['id'], new_group)
self.assertRaises(exception.NotFound,
self.identity_api.remove_user_from_group,
new_user['id'],
uuid.uuid4().hex)
self.assertRaises(exception.NotFound,
self.identity_api.remove_user_from_group,
uuid.uuid4().hex,
new_group['id'])
self.assertRaises(exception.NotFound,
self.identity_api.remove_user_from_group,
uuid.uuid4().hex,
uuid.uuid4().hex)
def test_group_crud(self):
group = {'id': uuid.uuid4().hex, 'domain_id': uuid.uuid4().hex,
'name': uuid.uuid4().hex}
self.identity_api.create_group(group['id'], group)
group_ref = self.identity_api.get_group(group['id'])
group_ref_dict = dict((x, group_ref[x]) for x in group_ref)
self.assertDictEqual(group_ref_dict, group)
group['name'] = uuid.uuid4().hex
self.identity_api.update_group(group['id'], group)
group_ref = self.identity_api.get_group(group['id'])
group_ref_dict = dict((x, group_ref[x]) for x in group_ref)
self.assertDictEqual(group_ref_dict, group)
self.identity_api.delete_group(group['id'])
self.assertRaises(exception.GroupNotFound,
self.identity_api.get_group,
group['id'])
class TokenTests(object):
def test_token_crud(self):

View File

@ -395,3 +395,50 @@ class LDAPIdentity(test.TestCase, test_backend.IdentityTests):
self.identity_api.update_user('fake1', user)
user_ref = self.identity_api.get_user('fake1')
self.assertEqual(user_ref['enabled'], True)
# TODO (henry-nash) These need to be removed when the full LDAP implementation
# is submitted - see BugL #1092187
def test_group_crud(self):
pass
def test_add_user_to_group(self):
pass
def test_add_user_to_group_404(self):
pass
def test_check_user_in_group(self):
pass
def test_check_user_not_in_group(self):
pass
def test_list_users_in_group(self):
pass
def test_remove_user_from_group(self):
pass
def test_remove_user_from_group_404(self):
pass
def test_get_role_grant_by_user_and_project(self):
pass
def test_get_role_grants_for_user_and_project_404(self):
pass
def test_add_role_grant_to_user_and_project_404(self):
pass
def test_remove_role_grant_from_user_and_project(self):
pass
def test_get_and_remove_role_grant_by_group_and_project(self):
pass
def test_get_and_remove_role_grant_by_group_and_domain(self):
pass
def test_get_and_remove_role_grant_by_user_and_domain(self):
pass

View File

@ -176,6 +176,22 @@ class SqlUpgradeTests(test.TestCase):
self.assertEqual(ref.url, endpoint_extra['%surl' % interface])
self.assertEqual(ref.extra, '{}')
def test_upgrade_12_to_13(self):
self.upgrade(12)
self.upgrade(13)
self.assertTableExists('group')
self.assertTableExists('group_project_metadata')
self.assertTableExists('group_domain_metadata')
self.assertTableExists('user_group_membership')
def test_downgrade_13_to_12(self):
self.upgrade(13)
self.downgrade(12)
self.assertTableDoesNotExist('group')
self.assertTableDoesNotExist('group_project_metadata')
self.assertTableDoesNotExist('group_domain_metadata')
self.assertTableDoesNotExist('user_group_membership')
def test_downgrade_12_to_9(self):
self.upgrade(12)
@ -253,7 +269,7 @@ class SqlUpgradeTests(test.TestCase):
', '.join("'%s'" % v for v in d.values())))
def test_downgrade_to_0(self):
self.upgrade(12)
self.upgrade(13)
self.downgrade(0)
for table_name in ["user", "token", "role", "user_tenant_membership",
"metadata"]:

View File

@ -64,6 +64,11 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
ref['project_id'] = project_id
return ref
def new_group_ref(self, domain_id):
ref = self.new_ref()
ref['domain_id'] = domain_id
return ref
def new_credential_ref(self, user_id, project_id=None):
ref = self.new_ref()
ref['user_id'] = user_id
@ -118,6 +123,9 @@ class RestfulTestCase(test_content_types.RestfulTestCase):
def post(self, path, **kwargs):
return self.v3_request(method='POST', path=path, **kwargs)
def put(self, path, **kwargs):
return self.v3_request(method='PUT', path=path, **kwargs)
def patch(self, path, **kwargs):
return self.v3_request(method='PATCH', path=path, **kwargs)

View File

@ -4,7 +4,7 @@ import test_v3
class IdentityTestCase(test_v3.RestfulTestCase):
"""Test domains, projects, users, credential & role CRUD"""
"""Test domains, projects, users, groups, credential & role CRUD"""
def setUp(self):
super(IdentityTestCase, self).setUp()
@ -33,6 +33,14 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.user_id,
self.user.copy())
self.group_id = uuid.uuid4().hex
self.group = self.new_group_ref(
domain_id=self.domain_id)
self.group['id'] = self.group_id
self.identity_api.create_group(
self.group_id,
self.group.copy())
self.credential_id = uuid.uuid4().hex
self.credential = self.new_credential_ref(
user_id=self.user_id,
@ -117,6 +125,28 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.assertEqual(ref['email'], entity['email'])
return entity
# group validation
def assertValidGroupListResponse(self, resp, ref):
return self.assertValidListResponse(
resp,
'groups',
self.assertValidGroup,
ref)
def assertValidGroupResponse(self, resp, ref):
return self.assertValidResponse(
resp,
'group',
self.assertValidGroup,
ref)
def assertValidGroup(self, entity, ref=None):
self.assertIsNotNone(entity.get('name'))
if ref:
self.assertEqual(ref['name'], entity['name'])
return entity
# credential validation
def assertValidCredentialListResponse(self, resp, ref):
@ -161,8 +191,31 @@ class IdentityTestCase(test_v3.RestfulTestCase):
ref)
def assertValidRole(self, entity, ref=None):
self.assertIsNotNone(entity.get('name'))
if ref:
pass
self.assertEqual(ref['name'], entity['name'])
return entity
# grant validation
def assertValidGrantListResponse(self, resp, ref):
entities = resp.body
self.assertIsNotNone(entities)
self.assertTrue(len(entities))
roles_ref_ids = []
for i, entity in enumerate(entities):
self.assertValidEntity(entity)
self.assertValidGrant(entity, ref)
if ref and entity['id'] == ref['id'][0]:
self.assertValidEntity(entity, ref)
self.assertValidGrant(entity, ref)
def assertValidGrant(self, entity, ref=None):
self.assertIsNotNone(entity.get('id'))
self.assertIsNotNone(entity.get('name'))
if ref:
self.assertEqual(ref['id'], entity['id'])
self.assertEqual(ref['name'], entity['name'])
return entity
# domain crud tests
@ -259,6 +312,33 @@ class IdentityTestCase(test_v3.RestfulTestCase):
'user_id': self.user_id})
self.assertValidUserResponse(r, self.user)
def test_add_user_to_group(self):
"""PUT /groups/{group_id}/users/{user_id}"""
r = self.put('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group_id, 'user_id': self.user_id})
def test_check_user_in_group(self):
"""HEAD /groups/{group_id}/users/{user_id}"""
r = self.put('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group_id, 'user_id': self.user_id})
r = self.head('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group_id, 'user_id': self.user_id})
def test_list_users_in_group(self):
"""GET /groups/{group_id}/users"""
r = self.put('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group_id, 'user_id': self.user_id})
r = self.get('/groups/%(group_id)s/users' % {
'group_id': self.group_id})
self.assertValidUserListResponse(r, self.user)
def test_remove_user_from_group(self):
"""DELETE /groups/{group_id}/users/{user_id}"""
r = self.put('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group_id, 'user_id': self.user_id})
r = self.delete('/groups/%(group_id)s/users/%(user_id)s' % {
'group_id': self.group_id, 'user_id': self.user_id})
def test_update_user(self):
"""PATCH /users/{user_id}"""
user = self.new_user_ref(domain_id=self.domain_id)
@ -273,6 +353,41 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.delete('/users/%(user_id)s' % {
'user_id': self.user_id})
# group crud tests
def test_create_group(self):
"""POST /groups"""
ref = self.new_group_ref(domain_id=self.domain_id)
r = self.post(
'/groups',
body={'group': ref})
return self.assertValidGroupResponse(r, ref)
def test_list_groups(self):
"""GET /groups"""
r = self.get('/groups')
self.assertValidGroupListResponse(r, self.group)
def test_get_group(self):
"""GET /groups/{group_id}"""
r = self.get('/groups/%(group_id)s' % {
'group_id': self.group_id})
self.assertValidGroupResponse(r, self.group)
def test_update_group(self):
"""PATCH /groups/{group_id}"""
group = self.new_group_ref(domain_id=self.domain_id)
del group['id']
r = self.patch('/groups/%(group_id)s' % {
'group_id': self.group_id},
body={'group': group})
self.assertValidGroupResponse(r, group)
def test_delete_group(self):
"""DELETE /groups/{group_id}"""
self.delete('/groups/%(group_id)s' % {
'group_id': self.group_id})
# credential crud tests
def test_list_credentials(self):
@ -347,3 +462,83 @@ class IdentityTestCase(test_v3.RestfulTestCase):
"""DELETE /roles/{role_id}"""
self.delete('/roles/%(role_id)s' % {
'role_id': self.role_id})
def test_create_user_project_grant(self):
"""PUT /projects/{project_id}/users/{user_id}/roles/{role_id}"""
self.put('/projects/%(project_id)s/users/%(user_id)s/roles/'
'%(role_id)s' % {
'project_id': self.project_id,
'user_id': self.user_id,
'role_id': self.role_id})
self.head('/projects/%(project_id)s/users/%(user_id)s/roles/'
'%(role_id)s' % {
'project_id': self.project_id,
'user_id': self.user_id,
'role_id': self.role_id})
def test_create_group_project_grant(self):
"""PUT /projects/{project_id}/groups/{group_id}/roles/{role_id}"""
self.put('/projects/%(project_id)s/groups/%(group_id)s/roles/'
'%(role_id)s' % {
'project_id': self.project_id,
'group_id': self.group_id,
'role_id': self.role_id})
self.head('/projects/%(project_id)s/groups/%(group_id)s/roles/'
'%(role_id)s' % {
'project_id': self.project_id,
'group_id': self.group_id,
'role_id': self.role_id})
def test_create_group_domain_grant(self):
"""PUT /domains/{domain_id}/groups/{group_id}/roles/{role_id}"""
self.put('/domains/%(domain_id)s/groups/%(group_id)s/roles/'
'%(role_id)s' % {
'domain_id': self.domain_id,
'group_id': self.group_id,
'role_id': self.role_id})
self.head('/domains/%(domain_id)s/groups/%(group_id)s/roles/'
'%(role_id)s' % {
'domain_id': self.domain_id,
'group_id': self.group_id,
'role_id': self.role_id})
def test_list_user_project_grants(self):
"""GET /projects/{project_id}/users/{user_id}/roles"""
self.put('/projects/%(project_id)s/users/%(user_id)s/roles/'
'%(role_id)s' % {
'project_id': self.project_id,
'user_id': self.user_id,
'role_id': self.role_id})
r = self.get('/projects/%(project_id)s/users/%(user_id)s/roles' % {
'project_id': self.project_id,
'user_id': self.user_id})
self.assertValidGrantListResponse(r, self.role)
def test_list_group_project_grants(self):
"""GET /projects/{project_id}/groups/{group_id}/roles"""
self.put('/projects/%(project_id)s/groups/%(group_id)s/roles/'
'%(role_id)s' % {
'project_id': self.project_id,
'group_id': self.group_id,
'role_id': self.role_id})
r = self.get('/projects/%(project_id)s/groups/%(group_id)s/roles' % {
'project_id': self.project_id,
'group_id': self.group_id})
self.assertValidGrantListResponse(r, self.role)
def test_delete_group_project_grant(self):
"""DELETE /projects/{project_id}/groups/{group_id}/roles/{role_id}"""
self.put('/projects/%(project_id)s/groups/%(group_id)s/roles/'
'%(role_id)s' % {
'project_id': self.project_id,
'group_id': self.group_id,
'role_id': self.role_id})
self.delete('/projects/%(project_id)s/groups/%(group_id)s/roles/'
'%(role_id)s' % {
'project_id': self.project_id,
'group_id': self.group_id,
'role_id': self.role_id})
r = self.get('/projects/%(project_id)s/groups/%(group_id)s/roles' % {
'project_id': self.project_id,
'group_id': self.group_id})
self.assertEquals(len(r.body), 0)