Unimplemented get roles by group for project list
The list_projects_for_user() function on LDAP assignment backend only listed projects with associations across user_id, not across group_ids. This function admits the group_ids parameter, but never is used on the body of the function. Change-Id: I0ff6791e11aa18ffb3a4407e8e5958ac03f2086b Closes-Bug: #1284639
This commit is contained in:
parent
455d50e8ae
commit
33a3822ecd
|
@ -133,12 +133,16 @@ class Assignment(assignment.Driver):
|
||||||
return self.role.get_all()
|
return self.role.get_all()
|
||||||
|
|
||||||
def list_projects_for_user(self, user_id, group_ids, hints):
|
def list_projects_for_user(self, user_id, group_ids, hints):
|
||||||
# NOTE(henry-nash): The LDAP backend is being deprecated, so no
|
|
||||||
# support is provided for projects that the user has a role on solely
|
|
||||||
# by virtue of group membership.
|
|
||||||
user_dn = self.user._id_to_dn(user_id)
|
user_dn = self.user._id_to_dn(user_id)
|
||||||
associations = (self.role.list_project_roles_for_user
|
associations = (self.role.list_project_roles_for_user
|
||||||
(user_dn, self.project.tree_dn))
|
(user_dn, self.project.tree_dn))
|
||||||
|
|
||||||
|
for group_id in group_ids:
|
||||||
|
group_dn = self.group._id_to_dn(group_id)
|
||||||
|
for group_role in self.role.list_project_roles_for_group(
|
||||||
|
group_dn, self.project.tree_dn):
|
||||||
|
associations.append(group_role)
|
||||||
|
|
||||||
# Since the LDAP backend doesn't store the domain_id in the LDAP
|
# Since the LDAP backend doesn't store the domain_id in the LDAP
|
||||||
# records (and only supports the default domain), we fill in the
|
# records (and only supports the default domain), we fill in the
|
||||||
# domain_id before we return the list.
|
# domain_id before we return the list.
|
||||||
|
@ -590,6 +594,40 @@ class RoleApi(common_ldap.BaseLdap):
|
||||||
tenant_dn=tenant_dn))
|
tenant_dn=tenant_dn))
|
||||||
return res
|
return res
|
||||||
|
|
||||||
|
def list_project_roles_for_group(self, group_dn, project_subtree):
|
||||||
|
group_dn_esc = ldap.filter.escape_filter_chars(group_dn)
|
||||||
|
conn = self.get_connection()
|
||||||
|
query = '(&(objectClass=%s)(%s=%s))' % (self.object_class,
|
||||||
|
self.member_attribute,
|
||||||
|
group_dn_esc)
|
||||||
|
try:
|
||||||
|
roles = conn.search_s(project_subtree,
|
||||||
|
ldap.SCOPE_SUBTREE,
|
||||||
|
query,
|
||||||
|
attrlist=['1.1'])
|
||||||
|
except ldap.NO_SUCH_OBJECT:
|
||||||
|
# Return no roles rather than raise an exception if the project
|
||||||
|
# subtree entry doesn't exist because an empty subtree is not
|
||||||
|
# an error.
|
||||||
|
return []
|
||||||
|
finally:
|
||||||
|
conn.unbind_s()
|
||||||
|
|
||||||
|
res = []
|
||||||
|
for role_dn, _ in roles:
|
||||||
|
# ldap.dn.str2dn returns a list, where the first
|
||||||
|
# element is the first RDN.
|
||||||
|
# For a role assignment, this contains the role ID,
|
||||||
|
# the remainder is the DN of the project.
|
||||||
|
project = ldap.dn.str2dn(role_dn)
|
||||||
|
project.pop(0)
|
||||||
|
project_dn = ldap.dn.dn2str(project)
|
||||||
|
res.append(GroupRoleAssociation(
|
||||||
|
group_dn=group_dn,
|
||||||
|
role_dn=role_dn,
|
||||||
|
tenant_dn=project_dn))
|
||||||
|
return res
|
||||||
|
|
||||||
def roles_delete_subtree_by_project(self, tenant_dn):
|
def roles_delete_subtree_by_project(self, tenant_dn):
|
||||||
conn = self.get_connection()
|
conn = self.get_connection()
|
||||||
query = '(objectClass=%s)' % self.object_class
|
query = '(objectClass=%s)' % self.object_class
|
||||||
|
|
|
@ -260,15 +260,86 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
|
||||||
'enabled': True}
|
'enabled': True}
|
||||||
self.identity_api.create_user(user1['id'], user1)
|
self.identity_api.create_user(user1['id'], user1)
|
||||||
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
|
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
|
||||||
self.assertEqual(0, len(user_projects))
|
self.assertThat(user_projects, matchers.HasLength(0))
|
||||||
|
|
||||||
|
# new grant(user1, role_member, tenant_bar)
|
||||||
self.assignment_api.create_grant(user_id=user1['id'],
|
self.assignment_api.create_grant(user_id=user1['id'],
|
||||||
project_id=self.tenant_bar['id'],
|
project_id=self.tenant_bar['id'],
|
||||||
role_id=self.role_member['id'])
|
role_id=self.role_member['id'])
|
||||||
|
# new grant(user1, role_member, tenant_baz)
|
||||||
self.assignment_api.create_grant(user_id=user1['id'],
|
self.assignment_api.create_grant(user_id=user1['id'],
|
||||||
project_id=self.tenant_baz['id'],
|
project_id=self.tenant_baz['id'],
|
||||||
role_id=self.role_member['id'])
|
role_id=self.role_member['id'])
|
||||||
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
|
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
|
||||||
self.assertEqual(2, len(user_projects))
|
self.assertThat(user_projects, matchers.HasLength(2))
|
||||||
|
|
||||||
|
# Now, check number of projects through groups
|
||||||
|
user2 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
|
||||||
|
'password': uuid.uuid4().hex, 'domain_id': domain['id'],
|
||||||
|
'enabled': True}
|
||||||
|
self.identity_api.create_user(user2['id'], user2)
|
||||||
|
|
||||||
|
group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
|
||||||
|
'domain_id': domain['id']}
|
||||||
|
self.identity_api.create_group(group1['id'], group1)
|
||||||
|
|
||||||
|
self.identity_api.add_user_to_group(user2['id'], group1['id'])
|
||||||
|
|
||||||
|
# new grant(group1(user2), role_member, tenant_bar)
|
||||||
|
self.assignment_api.create_grant(group_id=group1['id'],
|
||||||
|
project_id=self.tenant_bar['id'],
|
||||||
|
role_id=self.role_member['id'])
|
||||||
|
# new grant(group1(user2), role_member, tenant_baz)
|
||||||
|
self.assignment_api.create_grant(group_id=group1['id'],
|
||||||
|
project_id=self.tenant_baz['id'],
|
||||||
|
role_id=self.role_member['id'])
|
||||||
|
user_projects = self.assignment_api.list_projects_for_user(user2['id'])
|
||||||
|
self.assertThat(user_projects, matchers.HasLength(2))
|
||||||
|
|
||||||
|
# new grant(group1(user2), role_other, tenant_bar)
|
||||||
|
self.assignment_api.create_grant(group_id=group1['id'],
|
||||||
|
project_id=self.tenant_bar['id'],
|
||||||
|
role_id=self.role_other['id'])
|
||||||
|
user_projects = self.assignment_api.list_projects_for_user(user2['id'])
|
||||||
|
self.assertThat(user_projects, matchers.HasLength(2))
|
||||||
|
|
||||||
|
def test_list_projects_for_user_and_groups(self):
|
||||||
|
domain = self._get_domain_fixture()
|
||||||
|
# Create user1
|
||||||
|
user1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
|
||||||
|
'password': uuid.uuid4().hex, 'domain_id': domain['id'],
|
||||||
|
'enabled': True}
|
||||||
|
self.identity_api.create_user(user1['id'], user1)
|
||||||
|
|
||||||
|
# Create new group for user1
|
||||||
|
group1 = {'id': uuid.uuid4().hex, 'name': uuid.uuid4().hex,
|
||||||
|
'domain_id': domain['id']}
|
||||||
|
self.identity_api.create_group(group1['id'], group1)
|
||||||
|
|
||||||
|
# Add user1 to group1
|
||||||
|
self.identity_api.add_user_to_group(user1['id'], group1['id'])
|
||||||
|
|
||||||
|
# Now, add grant to user1 and group1 in tenant_bar
|
||||||
|
self.assignment_api.create_grant(user_id=user1['id'],
|
||||||
|
project_id=self.tenant_bar['id'],
|
||||||
|
role_id=self.role_member['id'])
|
||||||
|
self.assignment_api.create_grant(group_id=group1['id'],
|
||||||
|
project_id=self.tenant_bar['id'],
|
||||||
|
role_id=self.role_member['id'])
|
||||||
|
|
||||||
|
# The result is user1 has only one project granted
|
||||||
|
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
|
||||||
|
self.assertThat(user_projects, matchers.HasLength(1))
|
||||||
|
|
||||||
|
# Now, delete user1 grant into tenant_bar and check
|
||||||
|
self.assignment_api.delete_grant(user_id=user1['id'],
|
||||||
|
project_id=self.tenant_bar['id'],
|
||||||
|
role_id=self.role_member['id'])
|
||||||
|
|
||||||
|
# The result is user1 has only one project granted.
|
||||||
|
# Granted through group1.
|
||||||
|
user_projects = self.assignment_api.list_projects_for_user(user1['id'])
|
||||||
|
self.assertThat(user_projects, matchers.HasLength(1))
|
||||||
|
|
||||||
def test_list_projects_for_user_with_grants(self):
|
def test_list_projects_for_user_with_grants(self):
|
||||||
domain = self._get_domain_fixture()
|
domain = self._get_domain_fixture()
|
||||||
|
@ -308,7 +379,7 @@ class BaseLDAPIdentity(test_backend.IdentityTests):
|
||||||
|
|
||||||
user_projects = self.assignment_api.list_projects_for_user(
|
user_projects = self.assignment_api.list_projects_for_user(
|
||||||
new_user['id'])
|
new_user['id'])
|
||||||
self.assertEqual(2, len(user_projects))
|
self.assertEqual(3, len(user_projects))
|
||||||
|
|
||||||
def test_create_duplicate_user_name_in_different_domains(self):
|
def test_create_duplicate_user_name_in_different_domains(self):
|
||||||
self.skipTest('Blocked by bug 1101276')
|
self.skipTest('Blocked by bug 1101276')
|
||||||
|
|
Loading…
Reference in New Issue