diff --git a/keystone/api/role_assignments.py b/keystone/api/role_assignments.py index 76cf55fc9a..d1cfd90c48 100644 --- a/keystone/api/role_assignments.py +++ b/keystone/api/role_assignments.py @@ -49,9 +49,31 @@ class RoleAssignmentsResource(ks_flask.ResourceBase): 'group.id', 'role.id', 'scope.domain.id', 'scope.project.id', 'scope.OS-INHERIT:inherited_to', 'user.id', 'scope.system' ] + target = None + if self.oslo_context.domain_id: + target = {'domain_id': self.oslo_context.domain_id} ENFORCER.enforce_call(action='identity:list_role_assignments', - filters=filters) - return self._build_role_assignments_list() + filters=filters, + target_attr=target) + + assignments = self._build_role_assignments_list() + + if self.oslo_context.domain_id: + domain_assignments = [] + for assignment in assignments['role_assignments']: + domain_id = assignment['scope'].get('domain', {}).get('id') + project_id = assignment['scope'].get('project', {}).get('id') + if domain_id == self.oslo_context.domain_id: + domain_assignments.append(assignment) + continue + elif project_id: + project = PROVIDERS.resource_api.get_project(project_id) + if project.get('domain_id') == self.oslo_context.domain_id: + domain_assignments.append(assignment) + + assignments['role_assignments'] = domain_assignments + + return assignments def _list_role_assignments_for_tree(self): filters = [ diff --git a/keystone/common/policies/role_assignment.py b/keystone/common/policies/role_assignment.py index 86ad9046b6..2dda64d00a 100644 --- a/keystone/common/policies/role_assignment.py +++ b/keystone/common/policies/role_assignment.py @@ -15,6 +15,11 @@ from oslo_policy import policy from keystone.common.policies import base +SYSTEM_READER_OR_DOMAIN_READER = ( + '(' + base.SYSTEM_READER + ') or ' + '(role:reader and domain_id:%(target.domain_id)s)' +) + deprecated_list_role_assignments = policy.DeprecatedRule( name=base.IDENTITY % 'list_role_assignments', check_str=base.RULE_ADMIN_REQUIRED @@ -31,7 +36,7 @@ account for these changes automatically. role_assignment_policies = [ policy.DocumentedRuleDefault( name=base.IDENTITY % 'list_role_assignments', - check_str=base.SYSTEM_READER, + check_str=SYSTEM_READER_OR_DOMAIN_READER, # FIXME(lbragstad): This API will behave differently depending on the # token scope used to call the API. A system administrator should be # able to list all role assignment across the entire deployment. A @@ -40,7 +45,7 @@ role_assignment_policies = [ # make keystone smart enough to handle those cases in code, we can add # 'project' to the scope_types below. For now, this should be a system # administrator only operation to maintain backwards compatibility. - scope_types=['system'], + scope_types=['system', 'domain'], description='List role assignments.', operations=[{'path': '/v3/role_assignments', 'method': 'GET'}, diff --git a/keystone/tests/unit/protection/v3/test_assignment.py b/keystone/tests/unit/protection/v3/test_assignment.py index 1827447c4b..3e85f1d885 100644 --- a/keystone/tests/unit/protection/v3/test_assignment.py +++ b/keystone/tests/unit/protection/v3/test_assignment.py @@ -649,6 +649,275 @@ class _SystemUserTests(object): self.assertIn(assignment, expected) +class _DomainUserTests(object): + """Common functionality for domain users.""" + + def _setup_test_role_assignments_for_domain(self): + # Populate role assignment within `self.domain_id` so that we can + # assert users can view assignments within the domain they have + # authorization on + role_id = self.bootstrapper.reader_role_id + + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + ) + + group = PROVIDERS.identity_api.create_group( + unit.new_group_ref(domain_id=CONF.identity.default_domain_id) + ) + + project = PROVIDERS.resource_api.create_project( + uuid.uuid4().hex, + unit.new_project_ref(domain_id=self.domain_id) + ) + + # create a user+project role assignment. + PROVIDERS.assignment_api.create_grant( + role_id, user_id=user['id'], project_id=project['id'] + ) + + # create a user+domain role assignment. + PROVIDERS.assignment_api.create_grant( + role_id, user_id=user['id'], domain_id=self.domain_id + ) + + # create a group+project role assignment. + PROVIDERS.assignment_api.create_grant( + role_id, group_id=group['id'], project_id=project['id'] + ) + + # create a group+domain role assignment. + PROVIDERS.assignment_api.create_grant( + role_id, group_id=group['id'], domain_id=self.domain_id + ) + + return { + 'user_id': user['id'], + 'group_id': group['id'], + 'project_id': project['id'], + 'role_id': role_id, + } + + def test_user_can_list_all_assignments_in_their_domain(self): + self._setup_test_role_assignments() + domain_assignments = self._setup_test_role_assignments_for_domain() + + self.expected.append({ + 'user_id': domain_assignments['user_id'], + 'domain_id': self.domain_id, + 'role_id': domain_assignments['role_id'] + }) + self.expected.append({ + 'user_id': domain_assignments['user_id'], + 'project_id': domain_assignments['project_id'], + 'role_id': domain_assignments['role_id'] + }) + self.expected.append({ + 'group_id': domain_assignments['group_id'], + 'domain_id': self.domain_id, + 'role_id': domain_assignments['role_id'] + }) + self.expected.append({ + 'group_id': domain_assignments['group_id'], + 'project_id': domain_assignments['project_id'], + 'role_id': domain_assignments['role_id'] + }) + + with self.test_client() as c: + r = c.get('/v3/role_assignments', headers=self.headers) + self.assertEqual( + len(self.expected), len(r.json['role_assignments']) + ) + actual = self._extract_role_assignments_from_response_body(r) + for assignment in actual: + self.assertIn(assignment, self.expected) + + def test_user_can_filter_role_assignments_by_project_in_domain(self): + self._setup_test_role_assignments() + domain_assignments = self._setup_test_role_assignments_for_domain() + + expected = [ + { + 'user_id': domain_assignments['user_id'], + 'project_id': domain_assignments['project_id'], + 'role_id': domain_assignments['role_id'] + }, + { + 'group_id': domain_assignments['group_id'], + 'project_id': domain_assignments['project_id'], + 'role_id': domain_assignments['role_id'] + } + ] + + project_id = domain_assignments['project_id'] + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?scope.project.id=%s' % project_id, + headers=self.headers + ) + self.assertEqual(len(expected), len(r.json['role_assignments'])) + actual = self._extract_role_assignments_from_response_body(r) + for assignment in actual: + self.assertIn(assignment, expected) + + def test_user_can_filter_role_assignments_by_domain(self): + # This shouldn't really provide any more value than just calling GET + # /v3/role_assignments with a domain-scoped token, but we test it + # anyway. + self._setup_test_role_assignments() + domain_assignments = self._setup_test_role_assignments_for_domain() + + self.expected.append({ + 'user_id': domain_assignments['user_id'], + 'domain_id': self.domain_id, + 'role_id': domain_assignments['role_id'] + }) + self.expected.append({ + 'group_id': domain_assignments['group_id'], + 'domain_id': self.domain_id, + 'role_id': domain_assignments['role_id'] + }) + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?scope.domain.id=%s' % self.domain_id, + headers=self.headers + ) + self.assertEqual( + len(self.expected), len(r.json['role_assignments']) + ) + actual = self._extract_role_assignments_from_response_body(r) + for assignment in actual: + self.assertIn(assignment, self.expected) + + def test_user_can_filter_role_assignments_by_user_of_domain(self): + self._setup_test_role_assignments() + domain_assignments = self._setup_test_role_assignments_for_domain() + + expected = [ + { + 'user_id': domain_assignments['user_id'], + 'domain_id': self.domain_id, + 'role_id': domain_assignments['role_id'] + }, + { + 'user_id': domain_assignments['user_id'], + 'project_id': domain_assignments['project_id'], + 'role_id': domain_assignments['role_id'] + } + ] + + user_id = domain_assignments['user_id'] + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?user.id=%s' % user_id, + headers=self.headers + ) + self.assertEqual(len(expected), len(r.json['role_assignments'])) + actual = self._extract_role_assignments_from_response_body(r) + for assignment in actual: + self.assertIn(assignment, expected) + + def test_user_can_filter_role_assignments_by_group_of_domain(self): + self._setup_test_role_assignments() + domain_assignments = self._setup_test_role_assignments_for_domain() + + expected = [ + { + 'group_id': domain_assignments['group_id'], + 'domain_id': self.domain_id, + 'role_id': domain_assignments['role_id'] + }, + { + 'group_id': domain_assignments['group_id'], + 'project_id': domain_assignments['project_id'], + 'role_id': domain_assignments['role_id'] + } + ] + + group_id = domain_assignments['group_id'] + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?group.id=%s' % group_id, + headers=self.headers + ) + self.assertEqual(len(expected), len(r.json['role_assignments'])) + actual = self._extract_role_assignments_from_response_body(r) + for assignment in actual: + self.assertIn(assignment, expected) + + def test_user_cannot_filter_role_assignments_by_system(self): + self._setup_test_role_assignments() + self._setup_test_role_assignments_for_domain() + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?scope.system=all', + headers=self.headers + ) + self.assertEqual(0, len(r.json['role_assignments'])) + + def test_user_cannot_filter_role_assignments_by_other_domain(self): + assignments = self._setup_test_role_assignments() + domain = assignments['domain_id'] + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?scope.domain.id=%s' % domain, + headers=self.headers + ) + self.assertEqual([], r.json['role_assignments']) + + def test_user_cannot_filter_role_assignments_by_other_domain_project(self): + assignments = self._setup_test_role_assignments() + self._setup_test_role_assignments_for_domain() + + # This project is in an entirely separate domain that this user doesn't + # have authorization to access, so they should only see an empty list + project_id = assignments['project_id'] + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?scope.project.id=%s' % project_id, + headers=self.headers + ) + self.assertEqual(0, len(r.json['role_assignments'])) + + def test_user_cannot_filter_role_assignments_by_other_domain_user(self): + assignments = self._setup_test_role_assignments() + self._setup_test_role_assignments_for_domain() + + # This user doesn't have any role assignments on self.domain_id, so the + # domain user of self.domain_id should only see an empty list of role + # assignments. + user_id = assignments['user_id'] + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?user.id=%s' % user_id, + headers=self.headers + ) + self.assertEqual(0, len(r.json['role_assignments'])) + + def test_user_cannot_filter_role_assignments_by_other_domain_group(self): + assignments = self._setup_test_role_assignments() + self._setup_test_role_assignments_for_domain() + + # This group doesn't have any role assignments on self.domain_id, so + # the domain user of self.domain_id should only see an empty list of + # role assignments. + group_id = assignments['group_id'] + + with self.test_client() as c: + r = c.get( + '/v3/role_assignments?group.id=%s' % group_id, + headers=self.headers, + ) + self.assertEqual(0, len(r.json['role_assignments'])) + + class SystemReaderTests(base_classes.TestCaseWithBootstrap, common_auth.AuthTestMixin, _AssignmentTestUtilities, @@ -758,3 +1027,45 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap, r = c.post('/v3/auth/tokens', json=auth) self.token_id = r.headers['X-Subject-Token'] self.headers = {'X-Auth-Token': self.token_id} + + +class DomainReaderTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _AssignmentTestUtilities, + _DomainUserTests): + + def setUp(self): + super(DomainReaderTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + self.domain_id = domain['id'] + domain_reader = unit.new_user_ref(domain_id=self.domain_id) + self.user_id = PROVIDERS.identity_api.create_user(domain_reader)['id'] + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=self.user_id, + domain_id=self.domain_id + ) + self.expected = [ + # assignment of the user running the test case + { + 'user_id': self.user_id, + 'domain_id': self.domain_id, + 'role_id': self.bootstrapper.reader_role_id + }] + + auth = self.build_authentication_request( + user_id=self.user_id, password=domain_reader['password'], + domain_id=self.domain_id, + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id}