Merge "Unimplemented get roles by group for project list"
This commit is contained in:
commit
6f12495ed1
@ -134,12 +134,16 @@ class Assignment(assignment.Driver):
|
||||
return self.role.get_all()
|
||||
|
||||
def list_projects_for_user(self, user_id, group_ids, hints):
|
||||
# NOTE(henry-nash): The LDAP backend is being deprecated, so no
|
||||
# support is provided for projects that the user has a role on solely
|
||||
# by virtue of group membership.
|
||||
user_dn = self.user._id_to_dn(user_id)
|
||||
associations = (self.role.list_project_roles_for_user
|
||||
(user_dn, self.project.tree_dn))
|
||||
|
||||
for group_id in group_ids:
|
||||
group_dn = self.group._id_to_dn(group_id)
|
||||
for group_role in self.role.list_project_roles_for_group(
|
||||
group_dn, self.project.tree_dn):
|
||||
associations.append(group_role)
|
||||
|
||||
# Since the LDAP backend doesn't store the domain_id in the LDAP
|
||||
# records (and only supports the default domain), we fill in the
|
||||
# domain_id before we return the list.
|
||||
@ -590,6 +594,40 @@ class RoleApi(common_ldap.BaseLdap):
|
||||
tenant_dn=tenant_dn))
|
||||
return res
|
||||
|
||||
def list_project_roles_for_group(self, group_dn, project_subtree):
|
||||
group_dn_esc = ldap.filter.escape_filter_chars(group_dn)
|
||||
conn = self.get_connection()
|
||||
query = '(&(objectClass=%s)(%s=%s))' % (self.object_class,
|
||||
self.member_attribute,
|
||||
group_dn_esc)
|
||||
try:
|
||||
roles = conn.search_s(project_subtree,
|
||||
ldap.SCOPE_SUBTREE,
|
||||
query,
|
||||
attrlist=['1.1'])
|
||||
except ldap.NO_SUCH_OBJECT:
|
||||
# Return no roles rather than raise an exception if the project
|
||||
# subtree entry doesn't exist because an empty subtree is not
|
||||
# an error.
|
||||
return []
|
||||
finally:
|
||||
conn.unbind_s()
|
||||
|
||||
res = []
|
||||
for role_dn, _ in roles:
|
||||
# ldap.dn.str2dn returns a list, where the first
|
||||
# element is the first RDN.
|
||||
# For a role assignment, this contains the role ID,
|
||||
# the remainder is the DN of the project.
|
||||
project = ldap.dn.str2dn(role_dn)
|
||||
project.pop(0)
|
||||
project_dn = ldap.dn.dn2str(project)
|
||||
res.append(GroupRoleAssociation(
|
||||
group_dn=group_dn,
|
||||
role_dn=role_dn,
|
||||
tenant_dn=project_dn))
|
||||
return res
|
||||
|
||||
def roles_delete_subtree_by_project(self, tenant_dn):
|
||||
conn = self.get_connection()
|
||||
query = '(objectClass=%s)' % self.object_class
|
||||
|
@ -260,15 +260,86 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
|
||||
'enabled': True}
|
||||
self.identity_api.create_user(user1['id'], user1)
|
||||
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
|
||||
self.assertEqual(0, len(user_projects))
|
||||
self.assertThat(user_projects, matchers.HasLength(0))
|
||||
|
||||
# new grant(user1, role_member, tenant_bar)
|
||||
self.assignment_api.create_grant(user_id=user1['id'],
|
||||
project_id=self.tenant_bar['id'],
|
||||
role_id=self.role_member['id'])
|
||||
# new grant(user1, role_member, tenant_baz)
|
||||
self.assignment_api.create_grant(user_id=user1['id'],
|
||||
project_id=self.tenant_baz['id'],
|
||||
role_id=self.role_member['id'])
|
||||
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
|
||||
self.assertEqual(2, len(user_projects))
|
||||
self.assertThat(user_projects, matchers.HasLength(2))
|
||||
|
||||
# Now, check number of projects through groups
|
||||
user2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
|
||||
'password': uuid.uuid4().hex, 'domain_id': domain['id'],
|
||||
'enabled': True}
|
||||
self.identity_api.create_user(user2['id'], user2)
|
||||
|
||||
group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
|
||||
'domain_id': domain['id']}
|
||||
self.identity_api.create_group(group1['id'], group1)
|
||||
|
||||
self.identity_api.add_user_to_group(user2['id'], group1['id'])
|
||||
|
||||
# new grant(group1(user2), role_member, tenant_bar)
|
||||
self.assignment_api.create_grant(group_id=group1['id'],
|
||||
project_id=self.tenant_bar['id'],
|
||||
role_id=self.role_member['id'])
|
||||
# new grant(group1(user2), role_member, tenant_baz)
|
||||
self.assignment_api.create_grant(group_id=group1['id'],
|
||||
project_id=self.tenant_baz['id'],
|
||||
role_id=self.role_member['id'])
|
||||
user_projects = self.assignment_api.list_projects_for_user(user2['id'])
|
||||
self.assertThat(user_projects, matchers.HasLength(2))
|
||||
|
||||
# new grant(group1(user2), role_other, tenant_bar)
|
||||
self.assignment_api.create_grant(group_id=group1['id'],
|
||||
project_id=self.tenant_bar['id'],
|
||||
role_id=self.role_other['id'])
|
||||
user_projects = self.assignment_api.list_projects_for_user(user2['id'])
|
||||
self.assertThat(user_projects, matchers.HasLength(2))
|
||||
|
||||
def test_list_projects_for_user_and_groups(self):
|
||||
domain = self._get_domain_fixture()
|
||||
# Create user1
|
||||
user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
|
||||
'password': uuid.uuid4().hex, 'domain_id': domain['id'],
|
||||
'enabled': True}
|
||||
self.identity_api.create_user(user1['id'], user1)
|
||||
|
||||
# Create new group for user1
|
||||
group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
|
||||
'domain_id': domain['id']}
|
||||
self.identity_api.create_group(group1['id'], group1)
|
||||
|
||||
# Add user1 to group1
|
||||
self.identity_api.add_user_to_group(user1['id'], group1['id'])
|
||||
|
||||
# Now, add grant to user1 and group1 in tenant_bar
|
||||
self.assignment_api.create_grant(user_id=user1['id'],
|
||||
project_id=self.tenant_bar['id'],
|
||||
role_id=self.role_member['id'])
|
||||
self.assignment_api.create_grant(group_id=group1['id'],
|
||||
project_id=self.tenant_bar['id'],
|
||||
role_id=self.role_member['id'])
|
||||
|
||||
# The result is user1 has only one project granted
|
||||
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
|
||||
self.assertThat(user_projects, matchers.HasLength(1))
|
||||
|
||||
# Now, delete user1 grant into tenant_bar and check
|
||||
self.assignment_api.delete_grant(user_id=user1['id'],
|
||||
project_id=self.tenant_bar['id'],
|
||||
role_id=self.role_member['id'])
|
||||
|
||||
# The result is user1 has only one project granted.
|
||||
# Granted through group1.
|
||||
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
|
||||
self.assertThat(user_projects, matchers.HasLength(1))
|
||||
|
||||
def test_list_projects_for_user_with_grants(self):
|
||||
domain = self._get_domain_fixture()
|
||||
@ -308,7 +379,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
|
||||
|
||||
user_projects = self.assignment_api.list_projects_for_user(
|
||||
new_user['id'])
|
||||
self.assertEqual(2, len(user_projects))
|
||||
self.assertEqual(3, len(user_projects))
|
||||
|
||||
def test_create_duplicate_user_name_in_different_domains(self):
|
||||
self.skipTest('Blocked by bug 1101276')
|
||||
|
Loading…
Reference in New Issue
Block a user