Merge "Implement system reader role for users"

This commit is contained in:
Zuul 2019-02-06 22:29:00 +00:00 committed by Gerrit Code Review
commit 1df2e24e09
5 changed files with 213 additions and 17 deletions

View File

@ -148,7 +148,10 @@ class UserResource(ks_flask.ResourceBase):
GET/HEAD /v3/users/{user_id}
"""
ENFORCER.enforce_call(action='identity:get_user')
ENFORCER.enforce_call(
action='identity:get_user',
build_target=_build_user_target_enforcement
)
ref = PROVIDERS.identity_api.get_user(user_id)
return self.wrap_member(ref)
@ -199,7 +202,10 @@ class UserResource(ks_flask.ResourceBase):
DELETE /v3/users/{user_id}
"""
ENFORCER.enforce_call(action='identity:delete_user')
ENFORCER.enforce_call(
action='identity:delete_user',
build_target=_build_user_target_enforcement
)
PROVIDERS.identity_api.delete_user(user_id)
return None, http_client.NO_CONTENT

View File

@ -10,31 +10,49 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import versionutils
from oslo_policy import policy
from keystone.common.policies import base
# Allow access for system readers or users attempting to list their owner user
# reference.
SYSTEM_READER_OR_USER = (
'(' + base.SYSTEM_READER + ') or user_id:%(target.user.id)s'
)
DEPRECATED_REASON = """
As of the Stein release, the user API understands how to handle system-scoped
tokens in addition to project and domain tokens, making the API more accessible
to users without compromising security or manageability for administrators. The
new default policies for this API account for these changes automatically.
"""
deprecated_get_user = policy.DeprecatedRule(
name=base.IDENTITY % 'get_user',
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_list_users = policy.DeprecatedRule(
name=base.IDENTITY % 'list_users',
check_str=base.RULE_ADMIN_REQUIRED
)
user_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'get_user',
check_str=base.RULE_ADMIN_OR_OWNER,
# FIXME(lbragstad): First, a system administrator should be able to get
# a user reference for anyone in the system. Second, a project
# administrator should be able to get references for users within the
# project their token is scoped to or their domain. Third, a user
# should be able to get a reference for themselves. This is going to
# require keystone to be smarter about enforcing policy checks in code,
# specifically for the last two cases. Once that is fixed, we can
# uncomment the following line.
# scope_types=['system', 'project'],
check_str=SYSTEM_READER_OR_USER,
scope_types=['system', 'project'],
description='Show user details.',
operations=[{'path': '/v3/users/{user_id}',
'method': 'GET'},
{'path': '/v3/users/{user_id}',
'method': 'HEAD'}]),
'method': 'HEAD'}],
deprecated_rule=deprecated_get_user,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_users',
check_str=base.RULE_ADMIN_REQUIRED,
check_str=base.SYSTEM_READER,
# FIXME(lbragstad): Since listing users has traditionally always been a
# system-level API call, let's maintain that pattern here. A system
# administrator should be able to list all users in the deployment,
@ -49,7 +67,10 @@ user_policies = [
operations=[{'path': '/v3/users',
'method': 'GET'},
{'path': '/v3/users',
'method': 'HEAD'}]),
'method': 'HEAD'}],
deprecated_rule=deprecated_list_users,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.STEIN),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_projects_for_user',
check_str='',

View File

@ -0,0 +1,162 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from six.moves import http_client
from keystone.common import provider_api
import keystone.conf
from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import base_classes
from keystone.tests.unit import ksfixtures
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class _CommonUserTests(object):
"""Common default functionality for all users."""
def test_user_can_get_their_own_user_reference(self):
with self.test_client() as c:
r = c.get('/v3/users/%s' % self.user_id, headers=self.headers)
self.assertEqual(self.user_id, r.json['user']['id'])
class _SystemUserTests(object):
"""Common default functionality for all system users."""
def test_user_can_get_other_users(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
)
with self.test_client() as c:
r = c.get('/v3/users/%s' % user['id'], headers=self.headers)
self.assertEqual(user['id'], r.json['user']['id'])
def test_user_cannot_get_non_existent_user_not_found(self):
with self.test_client() as c:
c.get(
'/v3/users/%s' % uuid.uuid4().hex, headers=self.headers,
expected_status_code=http_client.NOT_FOUND
)
def test_user_can_list_users(self):
expected_user_ids = []
for _ in range(3):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
)
expected_user_ids.append(user['id'])
with self.test_client() as c:
r = c.get('/v3/users', headers=self.headers)
returned_user_ids = []
for user in r.json['users']:
returned_user_ids.append(user['id'])
for user_id in expected_user_ids:
self.assertIn(user_id, returned_user_ids)
class SystemReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_CommonUserTests,
_SystemUserTests):
def setUp(self):
super(SystemReaderTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
system_reader = unit.new_user_ref(
domain_id=CONF.identity.default_domain_id
)
self.user_id = PROVIDERS.identity_api.create_user(
system_reader
)['id']
PROVIDERS.assignment_api.create_system_grant_for_user(
self.user_id, self.bootstrapper.reader_role_id
)
auth = self.build_authentication_request(
user_id=self.user_id, password=system_reader['password'],
system=True
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
def test_user_cannot_create_users(self):
create = {
'user': {
'name': uuid.uuid4().hex,
'domain': CONF.identity.default_domain_id
}
}
with self.test_client() as c:
c.post(
'/v3/users', json=create, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_users(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
)
with self.test_client() as c:
update = {'user': {'email': uuid.uuid4().hex}}
c.patch(
'/v3/users/%s' % user['id'], json=update, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_update_non_existent_user_forbidden(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
)
update = {'user': {'email': uuid.uuid4().hex}}
with self.test_client() as c:
c.patch(
'/v3/users/%s' % user['id'], json=update, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_users(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
)
with self.test_client() as c:
c.delete(
'/v3/users/%s' % user['id'], headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_non_existent_user_forbidden(self):
with self.test_client() as c:
c.delete(
'/v3/users/%s' % uuid.uuid4().hex, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)

View File

@ -243,7 +243,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
to Domain A - this should fail
- Retry this for a user who is in Domain A, which should succeed.
- Finally, try getting a user that does not exist, which should
still return UserNotFound
still return ForbiddenAction
"""
new_policy = {'identity:get_user':
@ -263,7 +263,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
url_by_name = '/users/%s' % uuid.uuid4().hex
r = self.get(url_by_name, auth=self.auth,
expected_status=exception.UserNotFound.code)
expected_status=exception.ForbiddenAction.code)
def test_revoke_grant_protected_match_target(self):
"""DELETE /domains/{id}/users/{id}/roles/{id} (match target).

View File

@ -0,0 +1,7 @@
upgrade:
- |
[`bug 1805406 <https://bugs.launchpad.net/keystone/+bug/1805406>`_]
The ``GET /v3/users/{user_id`` API now properly returns an ``HTTP
403 Forbidden`` as opposed to ``HTTP 404 Not Found`` if the calling
user doesn't have authorization to call the API. This applies consistent
authorititive policy checks to the API.