From 33a3822ecd3095f36983d88439644dc90446e896 Mon Sep 17 00:00:00 2001 From: Marcos Lobo Date: Wed, 26 Feb 2014 10:10:00 +0100 Subject: [PATCH] Unimplemented get roles by group for project list The list_projects_for_user() function on LDAP assignment backend only listed projects with associations across user_id, not across group_ids. This function admits the group_ids parameter, but never is used on the body of the function. Change-Id: I0ff6791e11aa18ffb3a4407e8e5958ac03f2086b Closes-Bug: #1284639 --- keystone/assignment/backends/ldap.py | 44 ++++++++++++++-- keystone/tests/test_backend_ldap.py | 77 ++++++++++++++++++++++++++-- 2 files changed, 115 insertions(+), 6 deletions(-) diff --git a/keystone/assignment/backends/ldap.py b/keystone/assignment/backends/ldap.py index 48058bddba..994960f510 100644 --- a/keystone/assignment/backends/ldap.py +++ b/keystone/assignment/backends/ldap.py @@ -133,12 +133,16 @@ class Assignment(assignment.Driver): return self.role.get_all() def list_projects_for_user(self, user_id, group_ids, hints): - # NOTE(henry-nash): The LDAP backend is being deprecated, so no - # support is provided for projects that the user has a role on solely - # by virtue of group membership. user_dn = self.user._id_to_dn(user_id) associations = (self.role.list_project_roles_for_user (user_dn, self.project.tree_dn)) + + for group_id in group_ids: + group_dn = self.group._id_to_dn(group_id) + for group_role in self.role.list_project_roles_for_group( + group_dn, self.project.tree_dn): + associations.append(group_role) + # Since the LDAP backend doesn't store the domain_id in the LDAP # records (and only supports the default domain), we fill in the # domain_id before we return the list. @@ -590,6 +594,40 @@ class RoleApi(common_ldap.BaseLdap): tenant_dn=tenant_dn)) return res + def list_project_roles_for_group(self, group_dn, project_subtree): + group_dn_esc = ldap.filter.escape_filter_chars(group_dn) + conn = self.get_connection() + query = '(&(objectClass=%s)(%s=%s))' % (self.object_class, + self.member_attribute, + group_dn_esc) + try: + roles = conn.search_s(project_subtree, + ldap.SCOPE_SUBTREE, + query, + attrlist=['1.1']) + except ldap.NO_SUCH_OBJECT: + # Return no roles rather than raise an exception if the project + # subtree entry doesn't exist because an empty subtree is not + # an error. + return [] + finally: + conn.unbind_s() + + res = [] + for role_dn, _ in roles: + # ldap.dn.str2dn returns a list, where the first + # element is the first RDN. + # For a role assignment, this contains the role ID, + # the remainder is the DN of the project. + project = ldap.dn.str2dn(role_dn) + project.pop(0) + project_dn = ldap.dn.dn2str(project) + res.append(GroupRoleAssociation( + group_dn=group_dn, + role_dn=role_dn, + tenant_dn=project_dn)) + return res + def roles_delete_subtree_by_project(self, tenant_dn): conn = self.get_connection() query = '(objectClass=%s)' % self.object_class diff --git a/keystone/tests/test_backend_ldap.py b/keystone/tests/test_backend_ldap.py index 538ef063ba..01a7bcc5eb 100644 --- a/keystone/tests/test_backend_ldap.py +++ b/keystone/tests/test_backend_ldap.py @@ -260,15 +260,86 @@ class BaseLDAPIdentity(test_backend.IdentityTests): 'enabled': True} self.identity_api.create_user(user1['id'], user1) user_projects = self.assignment_api.list_projects_for_user(user1['id']) - self.assertEqual(0, len(user_projects)) + self.assertThat(user_projects, matchers.HasLength(0)) + + # new grant(user1, role_member, tenant_bar) self.assignment_api.create_grant(user_id=user1['id'], project_id=self.tenant_bar['id'], role_id=self.role_member['id']) + # new grant(user1, role_member, tenant_baz) self.assignment_api.create_grant(user_id=user1['id'], project_id=self.tenant_baz['id'], role_id=self.role_member['id']) user_projects = self.assignment_api.list_projects_for_user(user1['id']) - self.assertEqual(2, len(user_projects)) + self.assertThat(user_projects, matchers.HasLength(2)) + + # Now, check number of projects through groups + user2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'password': uuid.uuid4().hex, 'domain_id': domain['id'], + 'enabled': True} + self.identity_api.create_user(user2['id'], user2) + + group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain['id']} + self.identity_api.create_group(group1['id'], group1) + + self.identity_api.add_user_to_group(user2['id'], group1['id']) + + # new grant(group1(user2), role_member, tenant_bar) + self.assignment_api.create_grant(group_id=group1['id'], + project_id=self.tenant_bar['id'], + role_id=self.role_member['id']) + # new grant(group1(user2), role_member, tenant_baz) + self.assignment_api.create_grant(group_id=group1['id'], + project_id=self.tenant_baz['id'], + role_id=self.role_member['id']) + user_projects = self.assignment_api.list_projects_for_user(user2['id']) + self.assertThat(user_projects, matchers.HasLength(2)) + + # new grant(group1(user2), role_other, tenant_bar) + self.assignment_api.create_grant(group_id=group1['id'], + project_id=self.tenant_bar['id'], + role_id=self.role_other['id']) + user_projects = self.assignment_api.list_projects_for_user(user2['id']) + self.assertThat(user_projects, matchers.HasLength(2)) + + def test_list_projects_for_user_and_groups(self): + domain = self._get_domain_fixture() + # Create user1 + user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'password': uuid.uuid4().hex, 'domain_id': domain['id'], + 'enabled': True} + self.identity_api.create_user(user1['id'], user1) + + # Create new group for user1 + group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex, + 'domain_id': domain['id']} + self.identity_api.create_group(group1['id'], group1) + + # Add user1 to group1 + self.identity_api.add_user_to_group(user1['id'], group1['id']) + + # Now, add grant to user1 and group1 in tenant_bar + self.assignment_api.create_grant(user_id=user1['id'], + project_id=self.tenant_bar['id'], + role_id=self.role_member['id']) + self.assignment_api.create_grant(group_id=group1['id'], + project_id=self.tenant_bar['id'], + role_id=self.role_member['id']) + + # The result is user1 has only one project granted + user_projects = self.assignment_api.list_projects_for_user(user1['id']) + self.assertThat(user_projects, matchers.HasLength(1)) + + # Now, delete user1 grant into tenant_bar and check + self.assignment_api.delete_grant(user_id=user1['id'], + project_id=self.tenant_bar['id'], + role_id=self.role_member['id']) + + # The result is user1 has only one project granted. + # Granted through group1. + user_projects = self.assignment_api.list_projects_for_user(user1['id']) + self.assertThat(user_projects, matchers.HasLength(1)) def test_list_projects_for_user_with_grants(self): domain = self._get_domain_fixture() @@ -308,7 +379,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests): user_projects = self.assignment_api.list_projects_for_user( new_user['id']) - self.assertEqual(2, len(user_projects)) + self.assertEqual(3, len(user_projects)) def test_create_duplicate_user_name_in_different_domains(self): self.skipTest('Blocked by bug 1101276')