From 086dd2738b314e1502fbdb4c1e828544c6f3cf02 Mon Sep 17 00:00:00 2001 From: Lance Bragstad Date: Wed, 29 Nov 2017 22:58:37 +0000 Subject: [PATCH] Implement controller logic for system group assignments This commit wires up the remaining bits to expose system role assignments for groups via the assignment API. bp system-scope Change-Id: I5051aa97dbecb88ee706749b26a4140f9798e084 --- keystone/assignment/controllers.py | 53 ++++ keystone/assignment/core.py | 15 +- keystone/assignment/routers.py | 19 ++ .../tests/unit/assignment/test_backends.py | 7 +- keystone/tests/unit/test_v3.py | 2 +- keystone/tests/unit/test_v3_assignment.py | 296 +++++++++++++++++- keystone/tests/unit/test_versions.py | 13 + 7 files changed, 393 insertions(+), 12 deletions(-) diff --git a/keystone/assignment/controllers.py b/keystone/assignment/controllers.py index 07f96419b1..3cb2770100 100644 --- a/keystone/assignment/controllers.py +++ b/keystone/assignment/controllers.py @@ -439,6 +439,8 @@ class GrantAssignmentV3(controller.V3Controller): else: ref['group'] = PROVIDERS.identity_api.get_group(group_id) + # NOTE(lbragstad): This if/else check will need to be expanded in the + # future to handle system hierarchies if that is implemented. if domain_id: ref['domain'] = PROVIDERS.resource_api.get_domain(domain_id) elif project_id: @@ -549,6 +551,57 @@ class GrantAssignmentV3(controller.V3Controller): """ PROVIDERS.assignment_api.delete_system_grant_for_user(user_id, role_id) + @controller.protected(callback=_check_grant_protection) + def list_system_grants_for_group(self, request, group_id): + """List all system grants for a specific group. + + :param request: the request object + :param group_id: ID of the group + :returns: a list of grants the group has on the system + + """ + refs = PROVIDERS.assignment_api.list_system_grants_for_group(group_id) + return GrantAssignmentV3.wrap_collection(request.context_dict, refs) + + @controller.protected(callback=_check_grant_protection) + def check_system_grant_for_group(self, request, role_id, group_id): + """Check if a group has a specific role on the system. + + :param request: the request object + :param role_id: the ID of the role to check + :param group_id: the ID of the group to check + + """ + PROVIDERS.assignment_api.check_system_grant_for_group( + group_id, role_id + ) + + @controller.protected(callback=_check_grant_protection) + def create_system_grant_for_group(self, request, role_id, group_id): + """Grant a role to a group on the system. + + :param request: the request object + :param role_id: the ID of the role to grant to the group + :param group_id: the ID of the group + + """ + PROVIDERS.assignment_api.create_system_grant_for_group( + group_id, role_id + ) + + @controller.protected(callback=functools.partial(_check_grant_protection)) + def revoke_system_grant_for_group(self, request, role_id, group_id): + """Revoke a role from the group on the system. + + :param request: the request object + :param role_id: the ID of the role to remove + :param user_id: the ID of the user + + """ + PROVIDERS.assignment_api.delete_system_grant_for_group( + group_id, role_id + ) + class RoleAssignmentV3(controller.V3Controller): """The V3 Role Assignment APIs, really just list_role_assignment().""" diff --git a/keystone/assignment/core.py b/keystone/assignment/core.py index 8326081e34..e08bfcfeee 100644 --- a/keystone/assignment/core.py +++ b/keystone/assignment/core.py @@ -880,6 +880,14 @@ class Manager(manager.Manager): 'user_id': user_id, 'role_id': assignment['id']} ) + if group_id: + assignments = self.list_system_grants_for_group(group_id) + for assignment in assignments: + system_assignments.append( + {'system': {'all': True}, + 'group_id': group_id, + 'role_id': assignment['id']} + ) assignments = [] for assignment in itertools.chain( @@ -1184,9 +1192,14 @@ class Manager(manager.Manager): """ target_id = self._SYSTEM_SCOPE_TOKEN assignment_type = self._GROUP_SYSTEM - return self.driver.list_system_grants( + grants = self.driver.list_system_grants( group_id, target_id, assignment_type ) + grant_ids = [] + for grant in grants: + grant_ids.append(grant['role_id']) + + return PROVIDERS.role_api.list_roles_from_ids(grant_ids) def create_system_grant_for_group(self, group_id, role_id): """Grant a group a role on the system. diff --git a/keystone/assignment/routers.py b/keystone/assignment/routers.py index 9e7f39be40..a97d81f0f8 100644 --- a/keystone/assignment/routers.py +++ b/keystone/assignment/routers.py @@ -195,6 +195,25 @@ class Routers(wsgi.RoutersBase): 'role_id': json_home.Parameters.ROLE_ID, 'user_id': json_home.Parameters.USER_ID }) + self._add_resource( + mapper, grant_controller, + path='/system/groups/{group_id}/roles', + get_head_action='list_system_grants_for_group', + rel=json_home.build_v3_resource_relation('system_group_roles'), + path_vars={ + 'group_id': json_home.Parameters.GROUP_ID + }) + self._add_resource( + mapper, grant_controller, + path='/system/groups/{group_id}/roles/{role_id}', + get_head_action='check_system_grant_for_group', + put_action='create_system_grant_for_group', + delete_action='revoke_system_grant_for_group', + rel=json_home.build_v3_resource_relation('system_group_role'), + path_vars={ + 'role_id': json_home.Parameters.ROLE_ID, + 'group_id': json_home.Parameters.GROUP_ID + }) self._add_resource( mapper, controllers.RoleAssignmentV3(), diff --git a/keystone/tests/unit/assignment/test_backends.py b/keystone/tests/unit/assignment/test_backends.py index 754fd34f13..6fb045918f 100644 --- a/keystone/tests/unit/assignment/test_backends.py +++ b/keystone/tests/unit/assignment/test_backends.py @@ -3851,10 +3851,9 @@ class SystemAssignmentTests(AssignmentTestHelperMixin): group_id ) self.assertEqual(len(system_roles), 1) - self.assertEqual(system_roles[0]['type'], 'GroupSystem') - self.assertEqual(system_roles[0]['target_id'], 'system') - self.assertEqual(system_roles[0]['actor_id'], group_id) - self.assertFalse(system_roles[0]['inherited']) + self.assertIsNone(system_roles[0]['domain_id']) + self.assertEqual(system_roles[0]['id'], role_ref['id']) + self.assertEqual(system_roles[0]['name'], role_ref['name']) def test_list_system_grants_for_group(self): group_ref = unit.new_group_ref(CONF.identity.default_domain_id) diff --git a/keystone/tests/unit/test_v3.py b/keystone/tests/unit/test_v3.py index 0f858885b8..599a6afb13 100644 --- a/keystone/tests/unit/test_v3.py +++ b/keystone/tests/unit/test_v3.py @@ -1017,7 +1017,7 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase, resource_url=None): entities = resp.result.get('role_assignments') - if expected_length: + if expected_length or expected_length == 0: self.assertEqual(expected_length, len(entities)) # Collections should have relational links diff --git a/keystone/tests/unit/test_v3_assignment.py b/keystone/tests/unit/test_v3_assignment.py index 42dc6cac9b..602743fbd4 100644 --- a/keystone/tests/unit/test_v3_assignment.py +++ b/keystone/tests/unit/test_v3_assignment.py @@ -3100,7 +3100,7 @@ class ListUserProjectsTestCase(test_v3.RestfulTestCase): self.assertEqual(self.projects[i]['id'], projects_result[0]['id']) -class UserSystemRoleAssignmentTestCase(test_v3.RestfulTestCase): +class SystemRoleAssignmentMixin(object): def _create_new_role(self): """Create a role available for use anywhere and return the ID.""" @@ -3110,11 +3110,15 @@ class UserSystemRoleAssignmentTestCase(test_v3.RestfulTestCase): # return the ID of the role we just created. return response.json_body['role']['id'] - # FIXME(lbragstad): These tests contain system-level API calls, which means - # they will log a warning message if they are called with a project-scoped - # token, regardless of the role assignment on the project. We need to fix - # them by using a proper system-scoped admin token to make the call instead - # of a project scoped token. + +# FIXME(lbragstad): These tests contain system-level API calls, which means +# they will log a warning message if they are called with a project-scoped +# token, regardless of the role assignment on the project. We need to fix +# them by using a proper system-scoped admin token to make the call instead +# of a project scoped token. +class UserSystemRoleAssignmentTestCase(test_v3.RestfulTestCase, + SystemRoleAssignmentMixin): + def test_assign_system_role_to_user(self): system_role_id = self._create_new_role() @@ -3392,3 +3396,283 @@ class UserSystemRoleAssignmentTestCase(test_v3.RestfulTestCase): '&scope.project.id=%(project_id)s' ) % {'project_id': self.project_id} self.get(path, expected_status=http_client.BAD_REQUEST) + + +# FIXME(lbragstad): These tests contain system-level API calls, which means +# they will log a warning message if they are called with a project-scoped +# token, regardless of the role assignment on the project. We need to fix +# them by using a proper system-scoped admin token to make the call instead +# of a project scoped token. +class GroupSystemRoleAssignmentTestCase(test_v3.RestfulTestCase, + SystemRoleAssignmentMixin): + + def _create_group(self): + body = { + 'group': { + 'domain_id': self.domain_id, + 'name': uuid.uuid4().hex + } + } + response = self.post('/groups/', body=body) + return response.json_body['group'] + + def test_assign_system_role_to_group(self): + system_role_id = self._create_new_role() + group = self._create_group() + + # assign the role to the group globally + member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % { + 'group_id': group['id'], + 'role_id': system_role_id + } + self.put(member_url) + + # validate the role assignment + self.head(member_url) + + # list global roles + collection_url = '/system/groups/%(group_id)s/roles' % { + 'group_id': group['id'] + } + roles = self.get(collection_url).json_body['roles'] + self.assertEqual(len(roles), 1) + self.assertEqual(roles[0]['id'], system_role_id) + self.head(collection_url, expected_status=http_client.OK) + + response = self.get( + '/role_assignments?scope.system=all&group.id=%(group_id)s' % { + 'group_id': group['id'] + } + ) + self.assertValidRoleAssignmentListResponse(response, expected_length=1) + self.assertEqual( + response.json_body['role_assignments'][0]['role']['id'], + system_role_id + ) + + def test_assign_system_role_to_non_existant_group_fails(self): + system_role_id = self._create_new_role() + group_id = uuid.uuid4().hex + + # assign the role to the group globally + member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % { + 'group_id': group_id, + 'role_id': system_role_id + } + self.put(member_url, expected_status=http_client.NOT_FOUND) + + def test_list_role_assignments_for_group_returns_all_assignments(self): + system_role_id = self._create_new_role() + group = self._create_group() + + # assign the role to the group globally and on a single project + member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % { + 'group_id': group['id'], + 'role_id': system_role_id + } + self.put(member_url) + member_url = ( + '/projects/%(project_id)s/groups/%(group_id)s/' + 'roles/%(role_id)s' + ) % { + 'project_id': self.project_id, + 'group_id': group['id'], + 'role_id': system_role_id + } + self.put(member_url) + + # make sure both assignments exist in the response, there should be two + response = self.get( + '/role_assignments?group.id=%(group_id)s' % { + 'group_id': group['id'] + } + ) + self.assertValidRoleAssignmentListResponse(response, expected_length=2) + + def test_list_system_roles_for_group_returns_none_without_assignment(self): + group = self._create_group() + + # list global roles for group + collection_url = '/system/groups/%(group_id)s/roles' % { + 'group_id': group['id'] + } + response = self.get(collection_url) + + # assert that the group doesn't have any system role assignments, which + # is denoted by an empty list + self.assertEqual(response.json_body['roles'], []) + + response = self.get( + '/role_assignments?scope.system=all&group.id=%(group_id)s' % { + 'group_id': group['id'] + } + ) + self.assertValidRoleAssignmentListResponse(response, expected_length=0) + + def test_list_system_roles_for_group_does_not_return_project_roles(self): + system_role_id = self._create_new_role() + project_role_id = self._create_new_role() + group = self._create_group() + + # assign the group a role on the system and a role on a project + member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % { + 'group_id': group['id'], 'role_id': system_role_id + } + self.put(member_url) + member_url = ( + '/projects/%(project_id)s/groups/%(group_id)s/' + 'roles/%(role_id)s' + ) % { + 'project_id': self.project_id, + 'group_id': group['id'], + 'role_id': project_role_id + } + self.put(member_url) + + # list system role assignments + collection_url = '/system/groups/%(group_id)s/roles' % { + 'group_id': group['id'] + } + response = self.get(collection_url) + + # assert the project role assignment is not in the system role + # assignments + for role in response.json_body['roles']: + self.assertNotEqual(role['id'], project_role_id) + + response = self.get( + '/role_assignments?scope.system=all&group.id=%(group_id)s' % { + 'group_id': group['id'] + } + ) + self.assertValidRoleAssignmentListResponse(response, expected_length=1) + + def test_list_system_roles_for_group_does_not_return_domain_roles(self): + system_role_id = self._create_new_role() + domain_role_id = self._create_new_role() + group = self._create_group() + + # assign a role to the group on a domain + domain_member_url = ( + '/domains/%(domain_id)s/groups/%(group_id)s/' + 'roles/%(role_id)s' % { + 'domain_id': group['domain_id'], + 'group_id': group['id'], + 'role_id': domain_role_id + } + ) + self.put(domain_member_url) + + # assign the group a role on the system + member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % { + 'group_id': group['id'], + 'role_id': system_role_id + } + self.put(member_url) + + # list domain role assignments + response = self.get( + '/domains/%(domain_id)s/groups/%(group_id)s/roles' % { + 'domain_id': group['domain_id'], 'group_id': group['id'] + } + ) + self.assertEqual(len(response.json_body['roles']), 1) + + # list system role assignments + collection_url = '/system/groups/%(group_id)s/roles' % { + 'group_id': group['id'] + } + response = self.get(collection_url) + + # assert the domain role assignment is not in the system role + # assignments + for role in response.json_body['roles']: + self.assertNotEqual(role['id'], domain_role_id) + + response = self.get( + '/role_assignments?scope.system=all&group.id=%(group_id)s' % { + 'group_id': group['id'] + } + ) + self.assertValidRoleAssignmentListResponse(response, expected_length=1) + + def test_check_group_has_system_role_when_assignment_exists(self): + system_role_id = self._create_new_role() + group = self._create_group() + + # assign the group a role on the system + member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % { + 'group_id': group['id'], + 'role_id': system_role_id + } + self.put(member_url) + + # check the group has the system role assignment + self.head(member_url) + + response = self.get( + '/role_assignments?scope.system=all&group.id=%(group_id)s' % { + 'group_id': group['id'] + } + ) + self.assertValidRoleAssignmentListResponse(response, expected_length=1) + self.assertEqual( + response.json_body['role_assignments'][0]['role']['id'], + system_role_id + ) + + def test_check_group_does_not_have_system_role_without_assignment(self): + system_role_id = self._create_new_role() + group = self._create_group() + + # check the group does't have the system role assignment + member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % { + 'group_id': group['id'], + 'role_id': system_role_id + } + self.head(member_url, expected_status=http_client.NOT_FOUND) + + response = self.get( + '/role_assignments?scope.system=all&group.id=%(group_id)s' % { + 'group_id': group['id'] + } + ) + self.assertValidRoleAssignmentListResponse(response, expected_length=0) + + def test_unassign_system_role_from_group(self): + system_role_id = self._create_new_role() + group = self._create_group() + + # assign the group a role on the system + member_url = '/system/groups/%(group_id)s/roles/%(role_id)s' % { + 'group_id': group['id'], + 'role_id': system_role_id + } + self.put(member_url) + + # ensure the group has the role assignment + self.head(member_url) + + response = self.get( + '/role_assignments?scope.system=all&group.id=%(group_id)s' % { + 'group_id': group['id'] + } + ) + self.assertEqual(len(response.json_body['role_assignments']), 1) + self.assertValidRoleAssignmentListResponse(response) + + # remove the system role assignment from the group + self.delete(member_url) + + # ensure the group doesn't have any system role assignments + collection_url = '/system/groups/%(group_id)s/roles' % { + 'group_id': group['id'] + } + response = self.get(collection_url) + self.assertEqual(len(response.json_body['roles']), 0) + response = self.get( + '/role_assignments?scope.system=all&group.id=%(group_id)s' % { + 'group_id': group['id'] + } + ) + self.assertValidRoleAssignmentListResponse(response, expected_length=0) diff --git a/keystone/tests/unit/test_versions.py b/keystone/tests/unit/test_versions.py index 0baa694fd1..7efbf73a7a 100644 --- a/keystone/tests/unit/test_versions.py +++ b/keystone/tests/unit/test_versions.py @@ -204,6 +204,19 @@ V3_JSON_HOME_RESOURCES = { 'user_id': json_home.Parameters.USER_ID } }, + json_home.build_v3_resource_relation('system_group_role'): { + 'href-template': '/system/groups/{group_id}/roles/{role_id}', + 'href-vars': { + 'group_id': json_home.Parameters.GROUP_ID, + 'role_id': json_home.Parameters.ROLE_ID + } + }, + json_home.build_v3_resource_relation('system_group_roles'): { + 'href-template': '/system/groups/{group_id}/roles', + 'href-vars': { + 'group_id': json_home.Parameters.GROUP_ID + } + }, json_home.build_v3_resource_relation('domain'): { 'href-template': '/domains/{domain_id}', 'href-vars': {'domain_id': json_home.Parameters.DOMAIN_ID, }},