Implement domain reader functionality for projects

This commit adds explicit testing for how users with the reader role
on a domain should interact with projects both inside and outside of
the domain they have authorization on.

Subsequent patches will continue to build on this by incorporating:

 - domain member test coverage
 - domain admin functionality
 - project user test coverage

Depends-On: https://review.openstack.org/#/c/642102/
Depends-On: https://review.openstack.org/#/c/624794/
Change-Id: I28db6b9bdb16a1ecdacdc2b9ecbb8674ef4d8fe4
Related-Bug: 1750660
Related-Bug: 1806762
This commit is contained in:
Lance Bragstad 2018-12-10 20:49:32 +00:00
parent ed45883380
commit 65165e7e8b
3 changed files with 281 additions and 43 deletions

View File

@ -114,7 +114,12 @@ class ProjectResource(ks_flask.ResourceBase):
GET/HEAD /v3/projects
"""
filters = ('domain_id', 'enabled', 'name', 'parent_id', 'is_domain')
ENFORCER.enforce_call(action='identity:list_projects', filters=filters)
target = None
if self.oslo_context.domain_id:
target = {'domain_id': self.oslo_context.domain_id}
ENFORCER.enforce_call(action='identity:list_projects',
filters=filters,
target_attr=target)
hints = self.build_driver_hints(filters)
# If 'is_domain' has not been included as a query, we default it to
@ -127,7 +132,14 @@ class ProjectResource(ks_flask.ResourceBase):
if t in flask.request.args:
hints.add_filter(t, flask.request.args[t])
refs = PROVIDERS.resource_api.list_projects(hints=hints)
return self.wrap_collection(refs, hints=hints)
if self.oslo_context.domain_id:
domain_id = self.oslo_context.domain_id
filtered_refs = [
ref for ref in refs if ref['domain_id'] == domain_id
]
else:
filtered_refs = refs
return self.wrap_collection(filtered_refs, hints=hints)
def get(self, project_id=None):
"""Get project or list projects.

View File

@ -15,12 +15,30 @@ from oslo_policy import policy
from keystone.common.policies import base
SYSTEM_READER_OR_PROJECT_USER = (
'(' + base.SYSTEM_READER + ') or project_id:%(target.project.id)s'
SYSTEM_READER_OR_DOMAIN_READER_OR_PROJECT_USER = (
'(' + base.SYSTEM_READER + ') or '
'(role:reader and domain_id:%(target.project.domain_id)s) or '
'project_id:%(target.project.id)s'
)
SYSTEM_READER_OR_OWNER = (
'(' + base.SYSTEM_READER + ') or user_id:%(target.user.id)s'
# This policy is only written to be used to protect the
# /v3/users/{user_id}/projects API. It should not be used to protect
# /v3/project APIs because the target information contained in the last check
# is specific to user targets from the user id passed in the
# /v3/users/{user_id}/project path.
SYSTEM_READER_OR_DOMAIN_READER_OR_OWNER = (
# System reader policy
'(' + base.SYSTEM_READER + ') or '
# Domain reader policy
'(role:reader and domain_id:%(target.user.domain_id)s) or '
# User accessing the API with a token they've obtained, matching
# the context user_id to the target user id.
'user_id:%(target.user.id)s'
)
SYSTEM_READER_OR_DOMAIN_READER = (
'(' + base.SYSTEM_READER + ') or '
'(role:reader and domain_id:%(target.domain_id)s)'
)
deprecated_list_projects = policy.DeprecatedRule(
@ -59,19 +77,8 @@ automatically.
project_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'get_project',
check_str=SYSTEM_READER_OR_PROJECT_USER,
# FIXME(lbragstad): The default check_str here should change to be just
# a role. The OR_TARGET_PROJECT bit of this check_str should actually
# be moved into keystone. A system administrator should be able to get
# any project in the deployement. A domain administrator should be
# able to get any project within their domain. A project administrator
# should be able to get their project or children of their project
# (maybe). This is going to require policy checks in code that make
# keystone smarter about handling these cases. Until we have those in
# place, we should keep scope_type commented out. Otherwise, we risk
# exposing information to people who don't have the correct
# authorization.
scope_types=['system', 'project'],
check_str=SYSTEM_READER_OR_DOMAIN_READER_OR_PROJECT_USER,
scope_types=['system', 'domain', 'project'],
description='Show project details.',
operations=[{'path': '/v3/projects/{project_id}',
'method': 'GET'}],
@ -80,17 +87,12 @@ project_policies = [
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_projects',
check_str=base.SYSTEM_READER,
# FIXME(lbragstad): This is set to 'system' until keystone is smart
# enough to tailor list_project responses for project-scoped tokens
# without exposing information that doesn't pertain to the scope of the
# token used to make the request. System administrators should be able
# to list all projects in the deployment. Domain administrators should
# be able to list all projects within their domain. Project
# administrators should be able to list projects they administer or
# possibly their children. Until keystone is smart enought to handle
# those cases, keep scope_types set to 'system'.
scope_types=['system'],
check_str=SYSTEM_READER_OR_DOMAIN_READER,
# FIXME(lbragstad): Project administrators should be able to list
# projects they administer or possibly their children. Until keystone
# is smart enough to handle those cases, keep scope_types set to
# 'system' and 'domain'.
scope_types=['system', 'domain'],
description='List projects.',
operations=[{'path': '/v3/projects',
'method': 'GET'}],
@ -99,15 +101,8 @@ project_policies = [
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_user_projects',
check_str=SYSTEM_READER_OR_OWNER,
# FIXME(lbragstad): This is going to require keystone to be smarter
# about how it authorizes this API. A system administrator should be
# able to list all projects for a user. A domain administrator should
# be able to list all projects to users within their domain. A user
# should be able to list projects for themselves, including the
# hierarchy in place. Until we have those cases covered in code and
# tested, we should keep scope_types commented out.
# scope_types=['system', 'project'],
check_str=SYSTEM_READER_OR_DOMAIN_READER_OR_OWNER,
scope_types=['system', 'domain', 'project'],
description='List projects for user.',
operations=[{'path': '/v3/users/{user_id}/projects',
'method': 'GET'}],

View File

@ -15,7 +15,6 @@ import uuid
from oslo_serialization import jsonutils
from six.moves import http_client
from keystone.common.policies import base
from keystone.common.policies import project as pp
from keystone.common import provider_api
import keystone.conf
@ -146,6 +145,199 @@ class _SystemMemberAndReaderProjectTests(object):
)
class _DomainUsersTests(object):
"""Common default functionality for all domain users."""
def test_user_can_list_projects_within_domain(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=self.domain_id)
)
with self.test_client() as c:
r = c.get('/v3/projects', headers=self.headers)
self.assertEqual(1, len(r.json['projects']))
self.assertEqual(project['id'], r.json['projects'][0]['id'])
def test_user_cannot_list_projects_in_other_domain(self):
PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
with self.test_client() as c:
r = c.get('/v3/projects', headers=self.headers)
self.assertEqual(0, len(r.json['projects']))
def test_user_can_get_a_project_within_domain(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=self.domain_id)
)
with self.test_client() as c:
r = c.get('/v3/projects/%s' % project['id'], headers=self.headers)
self.assertEqual(project['id'], r.json['project']['id'])
def test_user_cannot_get_a_project_in_other_domain(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
with self.test_client() as c:
c.get(
'/v3/projects/%s' % project['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_can_list_projects_for_user_in_domain(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(
self.domain_id,
id=uuid.uuid4().hex
)
)
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=self.domain_id)
)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=user['id'],
project_id=project['id']
)
with self.test_client() as c:
r = c.get(
'/v3/users/%s/projects' % user['id'], headers=self.headers
)
self.assertEqual(1, len(r.json['projects']))
self.assertEqual(project['id'], r.json['projects'][0]['id'])
def test_user_cannot_list_projects_for_user_in_other_domain(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(
CONF.identity.default_domain_id,
id=uuid.uuid4().hex
)
)
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=user['id'],
project_id=project['id']
)
with self.test_client() as c:
c.get(
'/v3/users/%s/projects' % user['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
class _DomainMemberAndReaderProjectTests(object):
"""Common default functionality for domain member and domain readers."""
def test_user_cannot_create_projects_within_domain(self):
create = {'project': unit.new_project_ref(domain_id=self.domain_id)}
with self.test_client() as c:
c.post(
'/v3/projects', json=create, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_create_projects_in_other_domains(self):
create = {
'project': unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
}
with self.test_client() as c:
c.post(
'/v3/projects', json=create, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_projects_within_domain(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=self.domain_id)
)
update = {'project': {'description': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/projects/%s' % project['id'], json=update,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_projects_in_other_domain(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
update = {'project': {'description': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/projects/%s' % project['id'], json=update,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_non_existent_project_forbidden(self):
update = {'project': {'description': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/projects/%s' % uuid.uuid4().hex, json=update,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_projects_within_domain(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=self.domain_id)
)
with self.test_client() as c:
c.delete(
'/v3/projects/%s' % project['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_projects_in_other_domain(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
with self.test_client() as c:
c.delete(
'/v3/projects/%s' % project['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_non_existent_projects_forbidden(self):
with self.test_client() as c:
c.delete(
'/v3/projects/%s' % uuid.uuid4().hex, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
class SystemReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_SystemUserTests,
@ -318,6 +510,41 @@ class SystemAdminTests(base_classes.TestCaseWithBootstrap,
self.assertNotIn(other_project['id'], project_ids)
class DomainReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_DomainUsersTests,
_DomainMemberAndReaderProjectTests):
def setUp(self):
super(DomainReaderTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
domain = PROVIDERS.resource_api.create_domain(
uuid.uuid4().hex, unit.new_domain_ref()
)
self.domain_id = domain['id']
domain_user = unit.new_user_ref(domain_id=self.domain_id)
self.user_id = PROVIDERS.identity_api.create_user(domain_user)['id']
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=self.user_id,
domain_id=self.domain_id
)
auth = self.build_authentication_request(
user_id=self.user_id, password=domain_user['password'],
domain_id=self.domain_id
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
class ProjectUserTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin):
@ -365,9 +592,13 @@ class ProjectUserTests(base_classes.TestCaseWithBootstrap,
# broken behavior with better scope checking.
with open(self.policy_file_name, 'w') as f:
overridden_policies = {
'identity:get_project': pp.SYSTEM_READER_OR_PROJECT_USER,
'identity:list_projects': base.SYSTEM_READER,
'identity:list_user_projects': pp.SYSTEM_READER_OR_OWNER
'identity:get_project': (
pp.SYSTEM_READER_OR_DOMAIN_READER_OR_PROJECT_USER
),
'identity:list_projects': pp.SYSTEM_READER_OR_DOMAIN_READER,
'identity:list_user_projects': (
pp.SYSTEM_READER_OR_DOMAIN_READER_OR_OWNER
),
}
f.write(jsonutils.dumps(overridden_policies))