Browse Source

Implement system reader role for projects

This commit introduces the system reader role to the project API, making
it easier for administrators to delegate subsets of responsibilities
to the API by default.

Subsequent patches will incorporate:

  - system member test coverage
  - system admin functionality
  - domain reader functionality
  - domain member test coverage
  - domain admin functionality
  - project user test coverage

Change-Id: I089ada1e314688e60f9041095138bc53cd465fa0
Related-Bug: 1805403
Related-Bug: 1750660
Related-Bug: 1806762
changes/15/624215/5
Lance Bragstad 3 years ago
parent
commit
b35928d5dc
  1. 32
      keystone/api/projects.py
  2. 17
      keystone/api/users.py
  3. 53
      keystone/common/policies/project.py
  4. 351
      keystone/tests/unit/protection/v3/test_projects.py

32
keystone/api/projects.py

@ -32,6 +32,20 @@ ENFORCER = rbac_enforcer.RBACEnforcer
PROVIDERS = provider_api.ProviderAPIs
def _build_project_target_enforcement():
target = {}
try:
target['project'] = PROVIDERS.resource_api.get_project(
flask.request.view_args.get('project_id')
)
except exception.NotFound: # nosec
# Defer existence in the event the project doesn't exist, we'll
# check this later anyway.
pass
return target
class ProjectResource(ks_flask.ResourceBase):
collection_key = 'projects'
member_key = 'project'
@ -86,7 +100,10 @@ class ProjectResource(ks_flask.ResourceBase):
GET/HEAD /v3/projects/{project_id}
"""
ENFORCER.enforce_call(action='identity:get_project')
ENFORCER.enforce_call(
action='identity:get_project',
build_target=_build_project_target_enforcement
)
project = PROVIDERS.resource_api.get_project(project_id)
self._expand_project_ref(project)
return self.wrap_member(project)
@ -97,8 +114,7 @@ class ProjectResource(ks_flask.ResourceBase):
GET/HEAD /v3/projects
"""
filters = ('domain_id', 'enabled', 'name', 'parent_id', 'is_domain')
ENFORCER.enforce_call(action='identity:list_projects',
filters=filters)
ENFORCER.enforce_call(action='identity:list_projects', filters=filters)
hints = self.build_driver_hints(filters)
# If 'is_domain' has not been included as a query, we default it to
@ -155,7 +171,10 @@ class ProjectResource(ks_flask.ResourceBase):
PATCH /v3/projects/{project_id}
"""
ENFORCER.enforce_call(action='identity:update_project')
ENFORCER.enforce_call(
action='identity:update_project',
build_target=_build_project_target_enforcement
)
project = self.request_body_json.get('project', {})
validation.lazy_validate(schema.project_update, project)
self._require_matching_id(project)
@ -170,7 +189,10 @@ class ProjectResource(ks_flask.ResourceBase):
DELETE /v3/projects/{project_id}
"""
ENFORCER.enforce_call(action='identity:delete_project')
ENFORCER.enforce_call(
action='identity:delete_project',
build_target=_build_project_target_enforcement
)
PROVIDERS.resource_api.delete_project(
project_id,
initiator=self.audit_initiator)

17
keystone/api/users.py

@ -92,6 +92,20 @@ def _check_unrestricted_application_credential(token):
raise ks_exception.ForbiddenAction(action=action)
def _build_user_target_enforcement():
target = {}
try:
target['user'] = PROVIDERS.identity_api.get_user(
flask.request.view_args.get('user_id')
)
except ks_exception.NotFound: # nosec
# Defer existence in the event the user doesn't exist, we'll
# check this later anyway.
pass
return target
def _build_enforcer_target_data_owner_and_user_id_match():
ref = {}
if flask.request.view_args:
@ -223,7 +237,8 @@ class UserProjectsResource(ks_flask.ResourceBase):
def get(self, user_id):
filters = ('domain_id', 'enabled', 'name')
ENFORCER.enforce_call(action='identity:list_user_projects',
filters=filters)
filters=filters,
build_target=_build_user_target_enforcement)
hints = self.build_driver_hints(filters)
refs = PROVIDERS.assignment_api.list_projects_for_user(user_id)
return self.wrap_collection(refs, hints=hints)

53
keystone/common/policies/project.py

@ -10,14 +10,44 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import versionutils
from oslo_policy import policy
from keystone.common.policies import base
SYSTEM_READER_OR_PROJECT_USER = (
'(' + base.SYSTEM_READER + ') or project_id:%(target.project.id)s'
)
SYSTEM_READER_OR_OWNER = (
'(' + base.SYSTEM_READER + ') or user_id:%(target.user.id)s'
)
deprecated_list_projects = policy.DeprecatedRule(
name=base.IDENTITY % 'list_projects',
check_str=base.RULE_ADMIN_REQUIRED
)
deprecated_get_project = policy.DeprecatedRule(
name=base.IDENTITY % 'get_project',
check_str=base.RULE_ADMIN_OR_TARGET_PROJECT
)
deprecated_list_user_projects = policy.DeprecatedRule(
name=base.IDENTITY % 'list_user_projects',
check_str=base.RULE_ADMIN_OR_OWNER
)
DEPRECATED_REASON = """
As of the Stein release, the project API understands how to handle
system-scoped tokens in addition to project and domain tokens, making the API
more accessible to users without compromising security or manageability for
administrators. The new default policies for this API account for these changes
automatically.
"""
project_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'get_project',
check_str=base.RULE_ADMIN_OR_TARGET_PROJECT,
check_str=SYSTEM_READER_OR_PROJECT_USER,
# FIXME(lbragstad): The default check_str here should change to be just
# a role. The OR_TARGET_PROJECT bit of this check_str should actually
# be moved into keystone. A system administrator should be able to get
@ -29,13 +59,16 @@ project_policies = [
# place, we should keep scope_type commented out. Otherwise, we risk
# exposing information to people who don't have the correct
# authorization.
# scope_types=['system', 'project'],
scope_types=['system', 'project'],
description='Show project details.',
operations=[{'path': '/v3/projects/{project_id}',
'method': 'GET'}]),
'method': 'GET'}],
deprecated_rule=deprecated_get_project,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_projects',
check_str=base.RULE_ADMIN_REQUIRED,
check_str=base.SYSTEM_READER,
# FIXME(lbragstad): This is set to 'system' until keystone is smart
# enough to tailor list_project responses for project-scoped tokens
# without exposing information that doesn't pertain to the scope of the
@ -48,10 +81,13 @@ project_policies = [
scope_types=['system'],
description='List projects.',
operations=[{'path': '/v3/projects',
'method': 'GET'}]),
'method': 'GET'}],
deprecated_rule=deprecated_list_projects,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_user_projects',
check_str=base.RULE_ADMIN_OR_OWNER,
check_str=SYSTEM_READER_OR_OWNER,
# FIXME(lbragstad): This is going to require keystone to be smarter
# about how it authorizes this API. A system administrator should be
# able to list all projects for a user. A domain administrator should
@ -62,7 +98,10 @@ project_policies = [
# scope_types=['system', 'project'],
description='List projects for user.',
operations=[{'path': '/v3/users/{user_id}/projects',
'method': 'GET'}]),
'method': 'GET'}],
deprecated_rule=deprecated_list_user_projects,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'create_project',
check_str=base.RULE_ADMIN_REQUIRED,

351
keystone/tests/unit/protection/v3/test_projects.py

@ -0,0 +1,351 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from oslo_serialization import jsonutils
from six.moves import http_client
from keystone.common.policies import base
from keystone.common.policies import project as pp
from keystone.common import provider_api
import keystone.conf
from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import base_classes
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import temporaryfile
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class _SystemUserTests(object):
"""Common default functionality for all system users."""
def test_user_can_list_projects(self):
PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
with self.test_client() as c:
r = c.get('/v3/projects', headers=self.headers)
self.assertEqual(2, len(r.json['projects']))
def test_user_can_list_projects_for_other_users(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(
CONF.identity.default_domain_id,
id=uuid.uuid4().hex
)
)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=user['id'],
project_id=project['id']
)
with self.test_client() as c:
r = c.get(
'/v3/users/%s/projects' % user['id'], headers=self.headers,
)
self.assertEqual(1, len(r.json['projects']))
self.assertEqual(project['id'], r.json['projects'][0]['id'])
def test_user_can_get_a_project(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
with self.test_client() as c:
r = c.get('/v3/projects/%s' % project['id'], headers=self.headers)
self.assertEqual(project['id'], r.json['project']['id'])
def test_user_cannot_get_non_existent_project_not_found(self):
with self.test_client() as c:
c.get(
'/v3/projects/%s' % uuid.uuid4().hex, headers=self.headers,
expected_status_code=http_client.NOT_FOUND
)
class _SystemMemberAndReaderProjectTests(object):
"""Common default functionality for system members and system readers."""
def test_user_cannot_create_projects(self):
create = {
'project': unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
}
with self.test_client() as c:
c.post(
'/v3/projects', json=create, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_projects(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
update = {'project': {'description': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/projects/%s' % project['id'], json=update,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_non_existent_project_forbidden(self):
update = {'project': {'description': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/projects/%s' % uuid.uuid4().hex, json=update,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_projects(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
with self.test_client() as c:
c.delete(
'/v3/projects/%s' % project['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_non_existent_project_forbidden(self):
with self.test_client() as c:
c.delete(
'/v3/projects/%s' % uuid.uuid4().hex, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
class SystemReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_SystemUserTests,
_SystemMemberAndReaderProjectTests):
def setUp(self):
super(SystemReaderTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
system_reader = unit.new_user_ref(
domain_id=CONF.identity.default_domain_id
)
self.user_id = PROVIDERS.identity_api.create_user(
system_reader
)['id']
PROVIDERS.assignment_api.create_system_grant_for_user(
self.user_id, self.bootstrapper.reader_role_id
)
auth = self.build_authentication_request(
user_id=self.user_id, password=system_reader['password'],
system=True
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
class ProjectUserTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin):
def setUp(self):
super(ProjectUserTests, self).setUp()
self.loadapp()
self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
self.policy_file_name = self.policy_file.file_name
self.useFixture(
ksfixtures.Policy(
self.config_fixture, policy_file=self.policy_file_name
)
)
self._override_policy()
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
self.user_id = self.bootstrapper.admin_user_id
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=self.user_id,
project_id=self.bootstrapper.project_id
)
self.project_id = self.bootstrapper.project_id
auth = self.build_authentication_request(
user_id=self.user_id, password=self.bootstrapper.admin_password,
project_id=self.bootstrapper.project_id
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
def _override_policy(self):
# TODO(lbragstad): Remove this once the deprecated policies in
# keystone.common.policies.project have been removed. This is only
# here to make sure we test the new policies instead of the deprecated
# ones. Oslo.policy will OR deprecated policies with new policies to
# maintain compatibility and give operators a chance to update
# permissions or update policies without breaking users. This will
# cause these specific tests to fail since we're trying to correct this
# broken behavior with better scope checking.
with open(self.policy_file_name, 'w') as f:
overridden_policies = {
'identity:get_project': pp.SYSTEM_READER_OR_PROJECT_USER,
'identity:list_projects': base.SYSTEM_READER,
'identity:list_user_projects': pp.SYSTEM_READER_OR_OWNER
}
f.write(jsonutils.dumps(overridden_policies))
def test_user_cannot_list_projects(self):
# This test is assuming the user calling the API has a role assignment
# on the project created by ``keystone-manage bootstrap``.
with self.test_client() as c:
c.get(
'/v3/projects', headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_list_projects_for_others(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(
CONF.identity.default_domain_id,
id=uuid.uuid4().hex
)
)
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=user['id'],
project_id=project['id']
)
with self.test_client() as c:
c.get(
'/v3/users/%s/projects' % user['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_can_list_their_projects(self):
# Users can get this information from the GET /v3/auth/projects API or
# the GET /v3/users/{user_id}/projects API. The GET /v3/projects API is
# administrative, reserved for system and domain users.
with self.test_client() as c:
r = c.get(
'/v3/users/%s/projects' % self.user_id, headers=self.headers,
)
self.assertEqual(1, len(r.json['projects']))
self.assertEqual(self.project_id, r.json['projects'][0]['id'])
def test_user_can_get_their_project(self):
with self.test_client() as c:
c.get('/v3/projects/%s' % self.project_id, headers=self.headers)
def test_user_cannot_get_other_projects(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
with self.test_client() as c:
c.get(
'/v3/projects/%s' % project['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_create_projects(self):
create = {
'project': unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
}
with self.test_client() as c:
c.post(
'/v3/projects', json=create, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_projects(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
update = {'project': {'description': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/projects/%s' % project['id'], json=update,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_non_existent_project_forbidden(self):
update = {'project': {'description': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/projects/%s' % uuid.uuid4().hex, json=update,
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_projects(self):
project = PROVIDERS.resource_api.create_project(
uuid.uuid4().hex,
unit.new_project_ref(domain_id=CONF.identity.default_domain_id)
)
with self.test_client() as c:
c.delete(
'/v3/projects/%s' % project['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_non_existent_project_forbidden(self):
with self.test_client() as c:
c.delete(
'/v3/projects/%s' % uuid.uuid4().hex, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
Loading…
Cancel
Save