From dd785ee692118a56ea0e3aaaf7f5bd6c73ea9c91 Mon Sep 17 00:00:00 2001 From: Markus Hentsch Date: Fri, 3 Nov 2023 10:43:34 +0100 Subject: [PATCH] Add domain scoping to list_domains Introduces domain-scoped filtering of the response list of the list_domains endpoint when the user is authenticated in domain scope instead of returning all domains. This aligns the implementation with other endpoints like list_projects or list_groups and allows for a domain-scoped reader role. Changes the default policy rule for identity:list_domains to incorporate this new behavior for the reader role. Closes-Bug: 2041611 Change-Id: I8ee50efc3b4850060cce840fc904bae17f1503a9 --- keystone/api/domains.py | 15 +++- keystone/common/policies/domain.py | 8 +- keystone/tests/protection/v3/test_domains.py | 81 ++++++++++++++++++-- 3 files changed, 93 insertions(+), 11 deletions(-) diff --git a/keystone/api/domains.py b/keystone/api/domains.py index 9852a0b75a..0f8477bca7 100644 --- a/keystone/api/domains.py +++ b/keystone/api/domains.py @@ -99,11 +99,22 @@ class DomainResource(ks_flask.ResourceBase): def _list_domains(self): filters = ['name', 'enabled'] + target = None + if self.oslo_context.domain_id: + target = {'domain': {'id': self.oslo_context.domain_id}} ENFORCER.enforce_call(action='identity:list_domains', - filters=filters) + filters=filters, + target_attr=target) hints = self.build_driver_hints(filters) refs = PROVIDERS.resource_api.list_domains(hints=hints) - return self.wrap_collection(refs, hints=hints) + if self.oslo_context.domain_id: + domain_id = self.oslo_context.domain_id + filtered_refs = [ + ref for ref in refs if ref['id'] == domain_id + ] + else: + filtered_refs = refs + return self.wrap_collection(filtered_refs, hints=hints) def post(self): """Create domain. diff --git a/keystone/common/policies/domain.py b/keystone/common/policies/domain.py index 643d0220ea..f2be68d11d 100644 --- a/keystone/common/policies/domain.py +++ b/keystone/common/policies/domain.py @@ -55,6 +55,10 @@ ADMIN_OR_SYSTEM_USER_OR_DOMAIN_USER_OR_PROJECT_USER = ( 'token.domain.id:%(target.domain.id)s or ' 'token.project.domain.id:%(target.domain.id)s' ) +ADMIN_OR_SYSTEM_READER_OR_DOMAIN_READER = ( + base.RULE_ADMIN_OR_SYSTEM_READER + ' or ' + '(role:reader and domain_id:%(target.domain.id)s)' +) domain_policies = [ @@ -70,8 +74,8 @@ domain_policies = [ deprecated_rule=deprecated_get_domain), policy.DocumentedRuleDefault( name=base.IDENTITY % 'list_domains', - check_str=base.RULE_ADMIN_OR_SYSTEM_READER, - scope_types=['system', 'project'], + check_str=ADMIN_OR_SYSTEM_READER_OR_DOMAIN_READER, + scope_types=['system', 'domain', 'project'], description='List domains.', operations=[{'path': '/v3/domains', 'method': 'GET'}], diff --git a/keystone/tests/protection/v3/test_domains.py b/keystone/tests/protection/v3/test_domains.py index 5cc3bd7d44..35a4bdd36a 100644 --- a/keystone/tests/protection/v3/test_domains.py +++ b/keystone/tests/protection/v3/test_domains.py @@ -128,7 +128,74 @@ class _SystemMemberAndReaderDomainTests(object): ) -class _DomainAndProjectUserDomainTests(object): +class _DomainReaderDomainTests(object): + + def test_user_can_list_domains(self): + # second domain, should be invisible to scoped reader + second_domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + + # user should only see their own domain + with self.test_client() as c: + r = c.get('/v3/domains', headers=self.headers) + self.assertEqual(1, len(r.json['domains'])) + self.assertNotIn( + second_domain['id'], [d['id'] for d in r.json['domains']] + ) + self.assertEqual(self.domain_id, r.json['domains'][0]['id']) + + def test_user_can_filter_domains_by_name(self): + # second domain, should be invisible to domain-scoped reader + second_domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + + with self.test_client() as c: + # filtering for own domain should succeed + r = c.get( + '/v3/domains?name=%s' % self.domain['name'], + headers=self.headers + ) + self.assertEqual(1, len(r.json['domains'])) + self.assertNotIn( + second_domain['id'], [d['id'] for d in r.json['domains']] + ) + self.assertEqual(self.domain['id'], r.json['domains'][0]['id']) + + # filtering for the second domain should yield no results + r = c.get( + '/v3/domains?name=%s' % second_domain['name'], + headers=self.headers + ) + self.assertEqual(0, len(r.json['domains'])) + + def test_user_can_filter_domains_by_enabled(self): + # additional domains, neither should be visible to domain-scoped reader + enabled_domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref() + ) + disabled_domain = PROVIDERS.resource_api.create_domain( + uuid.uuid4().hex, unit.new_domain_ref(enabled=False) + ) + + # user should only see their own domain when filtering for enabled + with self.test_client() as c: + r = c.get('/v3/domains?enabled=true', headers=self.headers) + enabled_domain_ids = [] + for domain in r.json['domains']: + enabled_domain_ids.append(domain['id']) + self.assertEqual(1, len(r.json['domains'])) + self.assertEqual(self.domain_id, r.json['domains'][0]['id']) + self.assertNotIn(enabled_domain['id'], enabled_domain_ids) + self.assertNotIn(disabled_domain['id'], enabled_domain_ids) + + # filtering for disabled should yield no results + r = c.get('/v3/domains?enabled=false', headers=self.headers) + self.assertEqual(0, len(r.json['domains'])) + + +class _ProjectUserDomainTests(object): def test_user_can_get_a_domain(self): with self.test_client() as c: @@ -355,7 +422,7 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap, class DomainUserTests(base_classes.TestCaseWithBootstrap, common_auth.AuthTestMixin, - _DomainAndProjectUserDomainTests): + _DomainReaderDomainTests): def setUp(self): super(DomainUserTests, self).setUp() @@ -363,10 +430,10 @@ class DomainUserTests(base_classes.TestCaseWithBootstrap, self.useFixture(ksfixtures.Policy(self.config_fixture)) self.config_fixture.config(group='oslo_policy', enforce_scope=True) - domain = PROVIDERS.resource_api.create_domain( + self.domain = PROVIDERS.resource_api.create_domain( uuid.uuid4().hex, unit.new_domain_ref() ) - self.domain_id = domain['id'] + self.domain_id = self.domain['id'] domain_user = unit.new_user_ref(domain_id=self.domain_id) self.domain_user_id = PROVIDERS.identity_api.create_user( domain_user @@ -391,7 +458,7 @@ class DomainUserTests(base_classes.TestCaseWithBootstrap, class ProjectReaderTests(base_classes.TestCaseWithBootstrap, common_auth.AuthTestMixin, - _DomainAndProjectUserDomainTests): + _ProjectUserDomainTests): def setUp(self): super(ProjectReaderTests, self).setUp() @@ -434,7 +501,7 @@ class ProjectReaderTests(base_classes.TestCaseWithBootstrap, class ProjectMemberTests(base_classes.TestCaseWithBootstrap, common_auth.AuthTestMixin, - _DomainAndProjectUserDomainTests): + _ProjectUserDomainTests): def setUp(self): super(ProjectMemberTests, self).setUp() @@ -477,7 +544,7 @@ class ProjectMemberTests(base_classes.TestCaseWithBootstrap, class ProjectAdminTests(base_classes.TestCaseWithBootstrap, common_auth.AuthTestMixin, - _DomainAndProjectUserDomainTests): + _ProjectUserDomainTests): def setUp(self): super(ProjectAdminTests, self).setUp()