Add domain scoping to list_domains

Introduces domain-scoped filtering of the response list of the
list_domains endpoint when the user is authenticated in domain scope
instead of returning all domains. This aligns the implementation with
other endpoints like list_projects or list_groups and allows for a
domain-scoped reader role.
Changes the default policy rule for identity:list_domains to
incorporate this new behavior for the reader role.

Closes-Bug: 2041611
Change-Id: I8ee50efc3b4850060cce840fc904bae17f1503a9
This commit is contained in:
Markus Hentsch 2023-11-03 10:43:34 +01:00 committed by Josephine Seifert
parent db0ff10476
commit dd785ee692
3 changed files with 93 additions and 11 deletions

View File

@ -99,11 +99,22 @@ class DomainResource(ks_flask.ResourceBase):
def _list_domains(self):
filters = ['name', 'enabled']
target = None
if self.oslo_context.domain_id:
target = {'domain': {'id': self.oslo_context.domain_id}}
ENFORCER.enforce_call(action='identity:list_domains',
filters=filters)
filters=filters,
target_attr=target)
hints = self.build_driver_hints(filters)
refs = PROVIDERS.resource_api.list_domains(hints=hints)
return self.wrap_collection(refs, hints=hints)
if self.oslo_context.domain_id:
domain_id = self.oslo_context.domain_id
filtered_refs = [
ref for ref in refs if ref['id'] == domain_id
]
else:
filtered_refs = refs
return self.wrap_collection(filtered_refs, hints=hints)
def post(self):
"""Create domain.

View File

@ -55,6 +55,10 @@ ADMIN_OR_SYSTEM_USER_OR_DOMAIN_USER_OR_PROJECT_USER = (
'token.domain.id:%(target.domain.id)s or '
'token.project.domain.id:%(target.domain.id)s'
)
ADMIN_OR_SYSTEM_READER_OR_DOMAIN_READER = (
base.RULE_ADMIN_OR_SYSTEM_READER + ' or '
'(role:reader and domain_id:%(target.domain.id)s)'
)
domain_policies = [
@ -70,8 +74,8 @@ domain_policies = [
deprecated_rule=deprecated_get_domain),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_domains',
check_str=base.RULE_ADMIN_OR_SYSTEM_READER,
scope_types=['system', 'project'],
check_str=ADMIN_OR_SYSTEM_READER_OR_DOMAIN_READER,
scope_types=['system', 'domain', 'project'],
description='List domains.',
operations=[{'path': '/v3/domains',
'method': 'GET'}],

View File

@ -128,7 +128,74 @@ class _SystemMemberAndReaderDomainTests(object):
)
class _DomainAndProjectUserDomainTests(object):
class _DomainReaderDomainTests(object):
def test_user_can_list_domains(self):
# second domain, should be invisible to scoped reader
second_domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
# user should only see their own domain
with self.test_client() as c:
r = c.get('/v3/domains', headers=self.headers)
self.assertEqual(1, len(r.json['domains']))
self.assertNotIn(
second_domain['id'], [d['id'] for d in r.json['domains']]
)
self.assertEqual(self.domain_id, r.json['domains'][0]['id'])
def test_user_can_filter_domains_by_name(self):
# second domain, should be invisible to domain-scoped reader
second_domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
with self.test_client() as c:
# filtering for own domain should succeed
r = c.get(
'/v3/domains?name=%s' % self.domain['name'],
headers=self.headers
)
self.assertEqual(1, len(r.json['domains']))
self.assertNotIn(
second_domain['id'], [d['id'] for d in r.json['domains']]
)
self.assertEqual(self.domain['id'], r.json['domains'][0]['id'])
# filtering for the second domain should yield no results
r = c.get(
'/v3/domains?name=%s' % second_domain['name'],
headers=self.headers
)
self.assertEqual(0, len(r.json['domains']))
def test_user_can_filter_domains_by_enabled(self):
# additional domains, neither should be visible to domain-scoped reader
enabled_domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
disabled_domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref(enabled=False)
)
# user should only see their own domain when filtering for enabled
with self.test_client() as c:
r = c.get('/v3/domains?enabled=true', headers=self.headers)
enabled_domain_ids = []
for domain in r.json['domains']:
enabled_domain_ids.append(domain['id'])
self.assertEqual(1, len(r.json['domains']))
self.assertEqual(self.domain_id, r.json['domains'][0]['id'])
self.assertNotIn(enabled_domain['id'], enabled_domain_ids)
self.assertNotIn(disabled_domain['id'], enabled_domain_ids)
# filtering for disabled should yield no results
r = c.get('/v3/domains?enabled=false', headers=self.headers)
self.assertEqual(0, len(r.json['domains']))
class _ProjectUserDomainTests(object):
def test_user_can_get_a_domain(self):
with self.test_client() as c:
@ -355,7 +422,7 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap,
class DomainUserTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_DomainAndProjectUserDomainTests):
_DomainReaderDomainTests):
def setUp(self):
super(DomainUserTests, self).setUp()
@ -363,10 +430,10 @@ class DomainUserTests(base_classes.TestCaseWithBootstrap,
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
domain = PROVIDERS.resource_api.create_domain(
self.domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
self.domain_id = domain['id']
self.domain_id = self.domain['id']
domain_user = unit.new_user_ref(domain_id=self.domain_id)
self.domain_user_id = PROVIDERS.identity_api.create_user(
domain_user
@ -391,7 +458,7 @@ class DomainUserTests(base_classes.TestCaseWithBootstrap,
class ProjectReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_DomainAndProjectUserDomainTests):
_ProjectUserDomainTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
@ -434,7 +501,7 @@ class ProjectReaderTests(base_classes.TestCaseWithBootstrap,
class ProjectMemberTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_DomainAndProjectUserDomainTests):
_ProjectUserDomainTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
@ -477,7 +544,7 @@ class ProjectMemberTests(base_classes.TestCaseWithBootstrap,
class ProjectAdminTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_DomainAndProjectUserDomainTests):
_ProjectUserDomainTests):
def setUp(self):
super(ProjectAdminTests, self).setUp()