diff --git a/api-ref/source/v3/roles.inc b/api-ref/source/v3/roles.inc index efb6ba7995..076017717a 100644 --- a/api-ref/source/v3/roles.inc +++ b/api-ref/source/v3/roles.inc @@ -41,6 +41,10 @@ of assignments returned in the collection. For example: GET /role_assignments?scope.system=all?user.id={user_id} +- List system role assignments for all users and groups:: + + GET /role_assignments?scope.system=all + Since Identity API v3.10, you can grant role assignments to users and groups on an entity called the ``system``. The role assignment API also supports listing and filtering role assignments on the system. diff --git a/keystone/assignment/backends/sql.py b/keystone/assignment/backends/sql.py index 2d93a1690d..47987c6742 100644 --- a/keystone/assignment/backends/sql.py +++ b/keystone/assignment/backends/sql.py @@ -308,9 +308,12 @@ class Assignment(base.AssignmentDriverBase): def list_system_grants(self, actor_id, target_id, assignment_type): with sql.session_for_read() as session: query = session.query(SystemRoleAssignment) - query = query.filter_by(actor_id=actor_id) - query = query.filter_by(target_id=target_id) - query = query.filter_by(type=assignment_type) + if actor_id: + query = query.filter_by(actor_id=actor_id) + if target_id: + query = query.filter_by(target_id=target_id) + if assignment_type: + query = query.filter_by(type=assignment_type) results = query.all() return [role.to_dict() for role in results] diff --git a/keystone/assignment/core.py b/keystone/assignment/core.py index e08bfcfeee..f4df860d90 100644 --- a/keystone/assignment/core.py +++ b/keystone/assignment/core.py @@ -880,7 +880,7 @@ class Manager(manager.Manager): 'user_id': user_id, 'role_id': assignment['id']} ) - if group_id: + elif group_id: assignments = self.list_system_grants_for_group(group_id) for assignment in assignments: system_assignments.append( @@ -888,6 +888,17 @@ class Manager(manager.Manager): 'group_id': group_id, 'role_id': assignment['id']} ) + else: + assignments = self.list_all_system_grants() + for assignment in assignments: + a = {} + if assignment['type'] == self._GROUP_SYSTEM: + a['group_id'] = assignment['actor_id'] + elif assignment['type'] == self._USER_SYSTEM: + a['user_id'] = assignment['actor_id'] + a['role_id'] = assignment['role_id'] + a['system'] = {'all': True} + system_assignments.append(a) assignments = [] for assignment in itertools.chain( @@ -1240,6 +1251,15 @@ class Manager(manager.Manager): role_id, group_id, target_id, inherited ) + def list_all_system_grants(self): + """Return a list of all system grants.""" + actor_id = None + target_id = self._SYSTEM_SCOPE_TOKEN + assignment_type = None + return self.driver.list_system_grants( + actor_id, target_id, assignment_type + ) + class RoleManager(manager.Manager): """Default pivot point for the Role backend.""" diff --git a/keystone/tests/unit/test_v3_assignment.py b/keystone/tests/unit/test_v3_assignment.py index 602743fbd4..9afe97fa45 100644 --- a/keystone/tests/unit/test_v3_assignment.py +++ b/keystone/tests/unit/test_v3_assignment.py @@ -28,8 +28,40 @@ from keystone.tests.unit import test_v3 CONF = keystone.conf.CONF +class SystemRoleAssignmentMixin(object): + + def _create_new_role(self): + """Create a role available for use anywhere and return the ID.""" + ref = unit.new_role_ref() + response = self.post('/roles', body={'role': ref}) + # We only really need the role ID, so omit the rest of the response and + # return the ID of the role we just created. + return response.json_body['role']['id'] + + def _create_group(self): + body = { + 'group': { + 'domain_id': self.domain_id, + 'name': uuid.uuid4().hex + } + } + response = self.post('/groups/', body=body) + return response.json_body['group'] + + def _create_user(self): + body = { + 'user': { + 'domain_id': self.domain_id, + 'name': uuid.uuid4().hex + } + } + response = self.post('/users/', body=body) + return response.json_body['user'] + + class AssignmentTestCase(test_v3.RestfulTestCase, - test_v3.AssignmentTestMixin): + test_v3.AssignmentTestMixin, + SystemRoleAssignmentMixin): """Test roles and role assignments.""" def setUp(self): @@ -953,6 +985,116 @@ class AssignmentTestCase(test_v3.RestfulTestCase, self.assertRoleAssignmentInListResponse(r, up_entity) self.assertRoleAssignmentInListResponse(r, up1_entity) + def test_list_system_role_assignments(self): + # create a bunch of roles + user_system_role_id = self._create_new_role() + user_domain_role_id = self._create_new_role() + user_project_role_id = self._create_new_role() + group_system_role_id = self._create_new_role() + group_domain_role_id = self._create_new_role() + group_project_role_id = self._create_new_role() + + # create a user and grant the user a role on the system, domain, and + # project + user = self._create_user() + url = '/system/users/%s/roles/%s' % (user['id'], user_system_role_id) + self.put(url) + url = '/domains/%s/users/%s/roles/%s' % ( + self.domain_id, user['id'], user_domain_role_id + ) + self.put(url) + url = '/projects/%s/users/%s/roles/%s' % ( + self.project_id, user['id'], user_project_role_id + ) + self.put(url) + + # create a group and grant the group a role on the system, domain, and + # project + group = self._create_group() + url = '/system/groups/%s/roles/%s' % ( + group['id'], group_system_role_id + ) + self.put(url) + url = '/domains/%s/groups/%s/roles/%s' % ( + self.domain_id, group['id'], group_domain_role_id + ) + self.put(url) + url = '/projects/%s/groups/%s/roles/%s' % ( + self.project_id, group['id'], group_project_role_id + ) + self.put(url) + + # /v3/role_assignments?scope.system=all should return two assignments + response = self.get('/role_assignments?scope.system=all') + self.assertValidRoleAssignmentListResponse(response, expected_length=2) + for assignment in response.json_body['role_assignments']: + self.assertTrue(assignment['scope']['system']['all']) + if assignment.get('user'): + self.assertEqual(user_system_role_id, assignment['role']['id']) + if assignment.get('group'): + self.assertEqual( + group_system_role_id, + assignment['role']['id'] + ) + + # /v3/role_assignments?scope_system=all&user.id=$USER_ID should return + # one role assignment + url = '/role_assignments?scope.system=all&user.id=%s' % user['id'] + response = self.get(url) + self.assertValidRoleAssignmentListResponse(response, expected_length=1) + self.assertEqual( + user_system_role_id, + response.json_body['role_assignments'][0]['role']['id'] + ) + + # /v3/role_assignments?scope_system=all&group.id=$GROUP_ID should + # return one role assignment + url = '/role_assignments?scope.system=all&group.id=%s' % group['id'] + response = self.get(url) + self.assertValidRoleAssignmentListResponse(response, expected_length=1) + self.assertEqual( + group_system_role_id, + response.json_body['role_assignments'][0]['role']['id'] + ) + + # /v3/role_assignments?user.id=$USER_ID should return 3 assignments + # and system should be in that list of assignments + url = '/role_assignments?user.id=%s' % user['id'] + response = self.get(url) + self.assertValidRoleAssignmentListResponse(response, expected_length=3) + for assignment in response.json_body['role_assignments']: + if 'system' in assignment['scope']: + self.assertEqual( + user_system_role_id, assignment['role']['id'] + ) + if 'domain' in assignment['scope']: + self.assertEqual( + user_domain_role_id, assignment['role']['id'] + ) + if 'project' in assignment['scope']: + self.assertEqual( + user_project_role_id, assignment['role']['id'] + ) + + # /v3/role_assignments?group.id=$GROUP_ID should return 3 assignments + # and system should be in that list of assignments + url = '/role_assignments?group.id=%s' % group['id'] + response = self.get(url) + self.assertValidRoleAssignmentListResponse(response, expected_length=3) + for assignment in response.json_body['role_assignments']: + if 'system' in assignment['scope']: + self.assertEqual( + group_system_role_id, assignment['role']['id'] + ) + if 'domain' in assignment['scope']: + self.assertEqual( + group_domain_role_id, assignment['role']['id'] + ) + if 'project' in assignment['scope']: + self.assertEqual( + group_project_role_id, assignment['role']['id'] + ) + class RoleAssignmentBaseTestCase(test_v3.RestfulTestCase, test_v3.AssignmentTestMixin): @@ -3100,17 +3242,6 @@ class ListUserProjectsTestCase(test_v3.RestfulTestCase): self.assertEqual(self.projects[i]['id'], projects_result[0]['id']) -class SystemRoleAssignmentMixin(object): - - def _create_new_role(self): - """Create a role available for use anywhere and return the ID.""" - ref = unit.new_role_ref() - response = self.post('/roles', body={'role': ref}) - # We only really need the role ID, so omit the rest of the response and - # return the ID of the role we just created. - return response.json_body['role']['id'] - - # FIXME(lbragstad): These tests contain system-level API calls, which means # they will log a warning message if they are called with a project-scoped # token, regardless of the role assignment on the project. We need to fix @@ -3406,16 +3537,6 @@ class UserSystemRoleAssignmentTestCase(test_v3.RestfulTestCase, class GroupSystemRoleAssignmentTestCase(test_v3.RestfulTestCase, SystemRoleAssignmentMixin): - def _create_group(self): - body = { - 'group': { - 'domain_id': self.domain_id, - 'name': uuid.uuid4().hex - } - } - response = self.post('/groups/', body=body) - return response.json_body['group'] - def test_assign_system_role_to_group(self): system_role_id = self._create_new_role() group = self._create_group()