Browse Source

Implement domain reader for role_assignments

This change adds tests cases for the default roles
keystone supports at install time. It also modifies
the policies for the role_assignments API to be more
self-service by properly checking for scopes if accessed
with a domain-scoped tokens. This gives domain users the
power to query role assignments within the domain they
have authorization on without exposing other assignment
information in the deployment, domains, or projects.

Subsequent patches will:

  - add functionality for domain members
  - add functionality for domain admins
  - add functionality for project readers
  - add functionality for project members
  - add functionality for project admins
  - remove the obsolete policies from policy.v3cloudsample.json

Co-Authored-By: Lance Bragstad <lbragstad@gmail.com>

Partial-Bug: 1750673
Change-Id: I0c6d202a315d4683e2589f0d9121e93c97fb13e4
tags/16.0.0.0rc1
Vishakha Agarwal 1 year ago
committed by Lance Bragstad
parent
commit
425d48ec0a
3 changed files with 342 additions and 4 deletions
  1. +24
    -2
      keystone/api/role_assignments.py
  2. +7
    -2
      keystone/common/policies/role_assignment.py
  3. +311
    -0
      keystone/tests/unit/protection/v3/test_assignment.py

+ 24
- 2
keystone/api/role_assignments.py View File

@@ -49,9 +49,31 @@ class RoleAssignmentsResource(ks_flask.ResourceBase):
'group.id', 'role.id', 'scope.domain.id', 'scope.project.id',
'scope.OS-INHERIT:inherited_to', 'user.id', 'scope.system'
]
target = None
if self.oslo_context.domain_id:
target = {'domain_id': self.oslo_context.domain_id}
ENFORCER.enforce_call(action='identity:list_role_assignments',
filters=filters)
return self._build_role_assignments_list()
filters=filters,
target_attr=target)

assignments = self._build_role_assignments_list()

if self.oslo_context.domain_id:
domain_assignments = []
for assignment in assignments['role_assignments']:
domain_id = assignment['scope'].get('domain', {}).get('id')
project_id = assignment['scope'].get('project', {}).get('id')
if domain_id == self.oslo_context.domain_id:
domain_assignments.append(assignment)
continue
elif project_id:
project = PROVIDERS.resource_api.get_project(project_id)
if project.get('domain_id') == self.oslo_context.domain_id:
domain_assignments.append(assignment)

assignments['role_assignments'] = domain_assignments

return assignments

def _list_role_assignments_for_tree(self):
filters = [


+ 7
- 2
keystone/common/policies/role_assignment.py View File

@@ -15,6 +15,11 @@ from oslo_policy import policy

from keystone.common.policies import base

SYSTEM_READER_OR_DOMAIN_READER = (
'(' + base.SYSTEM_READER + ') or '
'(role:reader and domain_id:%(target.domain_id)s)'
)

deprecated_list_role_assignments = policy.DeprecatedRule(
name=base.IDENTITY % 'list_role_assignments',
check_str=base.RULE_ADMIN_REQUIRED
@@ -31,7 +36,7 @@ account for these changes automatically.
role_assignment_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_role_assignments',
check_str=base.SYSTEM_READER,
check_str=SYSTEM_READER_OR_DOMAIN_READER,
# FIXME(lbragstad): This API will behave differently depending on the
# token scope used to call the API. A system administrator should be
# able to list all role assignment across the entire deployment. A
@@ -40,7 +45,7 @@ role_assignment_policies = [
# make keystone smart enough to handle those cases in code, we can add
# 'project' to the scope_types below. For now, this should be a system
# administrator only operation to maintain backwards compatibility.
scope_types=['system'],
scope_types=['system', 'domain'],
description='List role assignments.',
operations=[{'path': '/v3/role_assignments',
'method': 'GET'},


+ 311
- 0
keystone/tests/unit/protection/v3/test_assignment.py View File

@@ -649,6 +649,275 @@ class _SystemUserTests(object):
self.assertIn(assignment, expected)


class _DomainUserTests(object):
"""Common functionality for domain users."""

def _setup_test_role_assignments_for_domain(self):
# Populate role assignment within `self.domain_id` so that we can
# assert users can view assignments within the domain they have
# authorization on
role_id = self.bootstrapper.reader_role_id

user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
)

group = PROVIDERS.identity_api.create_group(
unit.new_group_ref(domain_id=CONF.identity.default_domain_id)
)

project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=self.domain_id)
)

# create a user+project role assignment.
PROVIDERS.assignment_api.create_grant(
role_id, user_id=user['id'], project_id=project['id']
)

# create a user+domain role assignment.
PROVIDERS.assignment_api.create_grant(
role_id, user_id=user['id'], domain_id=self.domain_id
)

# create a group+project role assignment.
PROVIDERS.assignment_api.create_grant(
role_id, group_id=group['id'], project_id=project['id']
)

# create a group+domain role assignment.
PROVIDERS.assignment_api.create_grant(
role_id, group_id=group['id'], domain_id=self.domain_id
)

return {
'user_id': user['id'],
'group_id': group['id'],
'project_id': project['id'],
'role_id': role_id,
}

def test_user_can_list_all_assignments_in_their_domain(self):
self._setup_test_role_assignments()
domain_assignments = self._setup_test_role_assignments_for_domain()

self.expected.append({
'user_id': domain_assignments['user_id'],
'domain_id': self.domain_id,
'role_id': domain_assignments['role_id']
})
self.expected.append({
'user_id': domain_assignments['user_id'],
'project_id': domain_assignments['project_id'],
'role_id': domain_assignments['role_id']
})
self.expected.append({
'group_id': domain_assignments['group_id'],
'domain_id': self.domain_id,
'role_id': domain_assignments['role_id']
})
self.expected.append({
'group_id': domain_assignments['group_id'],
'project_id': domain_assignments['project_id'],
'role_id': domain_assignments['role_id']
})

with self.test_client() as c:
r = c.get('/v3/role_assignments', headers=self.headers)
self.assertEqual(
len(self.expected), len(r.json['role_assignments'])
)
actual = self._extract_role_assignments_from_response_body(r)
for assignment in actual:
self.assertIn(assignment, self.expected)

def test_user_can_filter_role_assignments_by_project_in_domain(self):
self._setup_test_role_assignments()
domain_assignments = self._setup_test_role_assignments_for_domain()

expected = [
{
'user_id': domain_assignments['user_id'],
'project_id': domain_assignments['project_id'],
'role_id': domain_assignments['role_id']
},
{
'group_id': domain_assignments['group_id'],
'project_id': domain_assignments['project_id'],
'role_id': domain_assignments['role_id']
}
]

project_id = domain_assignments['project_id']

with self.test_client() as c:
r = c.get(
'/v3/role_assignments?scope.project.id=%s' % project_id,
headers=self.headers
)
self.assertEqual(len(expected), len(r.json['role_assignments']))
actual = self._extract_role_assignments_from_response_body(r)
for assignment in actual:
self.assertIn(assignment, expected)

def test_user_can_filter_role_assignments_by_domain(self):
# This shouldn't really provide any more value than just calling GET
# /v3/role_assignments with a domain-scoped token, but we test it
# anyway.
self._setup_test_role_assignments()
domain_assignments = self._setup_test_role_assignments_for_domain()

self.expected.append({
'user_id': domain_assignments['user_id'],
'domain_id': self.domain_id,
'role_id': domain_assignments['role_id']
})
self.expected.append({
'group_id': domain_assignments['group_id'],
'domain_id': self.domain_id,
'role_id': domain_assignments['role_id']
})

with self.test_client() as c:
r = c.get(
'/v3/role_assignments?scope.domain.id=%s' % self.domain_id,
headers=self.headers
)
self.assertEqual(
len(self.expected), len(r.json['role_assignments'])
)
actual = self._extract_role_assignments_from_response_body(r)
for assignment in actual:
self.assertIn(assignment, self.expected)

def test_user_can_filter_role_assignments_by_user_of_domain(self):
self._setup_test_role_assignments()
domain_assignments = self._setup_test_role_assignments_for_domain()

expected = [
{
'user_id': domain_assignments['user_id'],
'domain_id': self.domain_id,
'role_id': domain_assignments['role_id']
},
{
'user_id': domain_assignments['user_id'],
'project_id': domain_assignments['project_id'],
'role_id': domain_assignments['role_id']
}
]

user_id = domain_assignments['user_id']

with self.test_client() as c:
r = c.get(
'/v3/role_assignments?user.id=%s' % user_id,
headers=self.headers
)
self.assertEqual(len(expected), len(r.json['role_assignments']))
actual = self._extract_role_assignments_from_response_body(r)
for assignment in actual:
self.assertIn(assignment, expected)

def test_user_can_filter_role_assignments_by_group_of_domain(self):
self._setup_test_role_assignments()
domain_assignments = self._setup_test_role_assignments_for_domain()

expected = [
{
'group_id': domain_assignments['group_id'],
'domain_id': self.domain_id,
'role_id': domain_assignments['role_id']
},
{
'group_id': domain_assignments['group_id'],
'project_id': domain_assignments['project_id'],
'role_id': domain_assignments['role_id']
}
]

group_id = domain_assignments['group_id']

with self.test_client() as c:
r = c.get(
'/v3/role_assignments?group.id=%s' % group_id,
headers=self.headers
)
self.assertEqual(len(expected), len(r.json['role_assignments']))
actual = self._extract_role_assignments_from_response_body(r)
for assignment in actual:
self.assertIn(assignment, expected)

def test_user_cannot_filter_role_assignments_by_system(self):
self._setup_test_role_assignments()
self._setup_test_role_assignments_for_domain()

with self.test_client() as c:
r = c.get(
'/v3/role_assignments?scope.system=all',
headers=self.headers
)
self.assertEqual(0, len(r.json['role_assignments']))

def test_user_cannot_filter_role_assignments_by_other_domain(self):
assignments = self._setup_test_role_assignments()
domain = assignments['domain_id']
with self.test_client() as c:
r = c.get(
'/v3/role_assignments?scope.domain.id=%s' % domain,
headers=self.headers
)
self.assertEqual([], r.json['role_assignments'])

def test_user_cannot_filter_role_assignments_by_other_domain_project(self):
assignments = self._setup_test_role_assignments()
self._setup_test_role_assignments_for_domain()

# This project is in an entirely separate domain that this user doesn't
# have authorization to access, so they should only see an empty list
project_id = assignments['project_id']

with self.test_client() as c:
r = c.get(
'/v3/role_assignments?scope.project.id=%s' % project_id,
headers=self.headers
)
self.assertEqual(0, len(r.json['role_assignments']))

def test_user_cannot_filter_role_assignments_by_other_domain_user(self):
assignments = self._setup_test_role_assignments()
self._setup_test_role_assignments_for_domain()

# This user doesn't have any role assignments on self.domain_id, so the
# domain user of self.domain_id should only see an empty list of role
# assignments.
user_id = assignments['user_id']

with self.test_client() as c:
r = c.get(
'/v3/role_assignments?user.id=%s' % user_id,
headers=self.headers
)
self.assertEqual(0, len(r.json['role_assignments']))

def test_user_cannot_filter_role_assignments_by_other_domain_group(self):
assignments = self._setup_test_role_assignments()
self._setup_test_role_assignments_for_domain()

# This group doesn't have any role assignments on self.domain_id, so
# the domain user of self.domain_id should only see an empty list of
# role assignments.
group_id = assignments['group_id']

with self.test_client() as c:
r = c.get(
'/v3/role_assignments?group.id=%s' % group_id,
headers=self.headers,
)
self.assertEqual(0, len(r.json['role_assignments']))


class SystemReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_AssignmentTestUtilities,
@@ -758,3 +1027,45 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap,
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}


class DomainReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_AssignmentTestUtilities,
_DomainUserTests):

def setUp(self):
super(DomainReaderTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)

domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
self.domain_id = domain['id']
domain_reader = unit.new_user_ref(domain_id=self.domain_id)
self.user_id = PROVIDERS.identity_api.create_user(domain_reader)['id']
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=self.user_id,
domain_id=self.domain_id
)
self.expected = [
# assignment of the user running the test case
{
'user_id': self.user_id,
'domain_id': self.domain_id,
'role_id': self.bootstrapper.reader_role_id
}]

auth = self.build_authentication_request(
user_id=self.user_id, password=domain_reader['password'],
domain_id=self.domain_id,
)

# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}

Loading…
Cancel
Save