Implement system reader role for users
This commit introduces the system reader role to the users API, making it easier for administrators to delegate subsets of responsibilities to the API by default. This commit also maintains the ability for any user to be able to fetch their own user reference. Subsequent patches will incorporate: - system member test coverage - system admin functionality - domain reader functionality - domain member test coverage - domain admin functionality - project user test coverage Change-Id: I9c362e515772540dfa93d05781d955009b9a154d Partial-Bug: 1805406 Partial-Bug: 1748027 Partial-Bug: 968696
This commit is contained in:
parent
30445574fc
commit
40d3458fcb
|
@ -148,7 +148,10 @@ class UserResource(ks_flask.ResourceBase):
|
|||
|
||||
GET/HEAD /v3/users/{user_id}
|
||||
"""
|
||||
ENFORCER.enforce_call(action='identity:get_user')
|
||||
ENFORCER.enforce_call(
|
||||
action='identity:get_user',
|
||||
build_target=_build_user_target_enforcement
|
||||
)
|
||||
ref = PROVIDERS.identity_api.get_user(user_id)
|
||||
return self.wrap_member(ref)
|
||||
|
||||
|
@ -199,7 +202,10 @@ class UserResource(ks_flask.ResourceBase):
|
|||
|
||||
DELETE /v3/users/{user_id}
|
||||
"""
|
||||
ENFORCER.enforce_call(action='identity:delete_user')
|
||||
ENFORCER.enforce_call(
|
||||
action='identity:delete_user',
|
||||
build_target=_build_user_target_enforcement
|
||||
)
|
||||
PROVIDERS.identity_api.delete_user(user_id)
|
||||
return None, http_client.NO_CONTENT
|
||||
|
||||
|
|
|
@ -10,31 +10,49 @@
|
|||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_log import versionutils
|
||||
from oslo_policy import policy
|
||||
|
||||
from keystone.common.policies import base
|
||||
|
||||
# Allow access for system readers or users attempting to list their owner user
|
||||
# reference.
|
||||
SYSTEM_READER_OR_USER = (
|
||||
'(' + base.SYSTEM_READER + ') or user_id:%(target.user.id)s'
|
||||
)
|
||||
|
||||
DEPRECATED_REASON = """
|
||||
As of the Stein release, the user API understands how to handle system-scoped
|
||||
tokens in addition to project and domain tokens, making the API more accessible
|
||||
to users without compromising security or manageability for administrators. The
|
||||
new default policies for this API account for these changes automatically.
|
||||
"""
|
||||
|
||||
deprecated_get_user = policy.DeprecatedRule(
|
||||
name=base.IDENTITY % 'get_user',
|
||||
check_str=base.RULE_ADMIN_OR_OWNER
|
||||
)
|
||||
deprecated_list_users = policy.DeprecatedRule(
|
||||
name=base.IDENTITY % 'list_users',
|
||||
check_str=base.RULE_ADMIN_REQUIRED
|
||||
)
|
||||
|
||||
user_policies = [
|
||||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'get_user',
|
||||
check_str=base.RULE_ADMIN_OR_OWNER,
|
||||
# FIXME(lbragstad): First, a system administrator should be able to get
|
||||
# a user reference for anyone in the system. Second, a project
|
||||
# administrator should be able to get references for users within the
|
||||
# project their token is scoped to or their domain. Third, a user
|
||||
# should be able to get a reference for themselves. This is going to
|
||||
# require keystone to be smarter about enforcing policy checks in code,
|
||||
# specifically for the last two cases. Once that is fixed, we can
|
||||
# uncomment the following line.
|
||||
# scope_types=['system', 'project'],
|
||||
check_str=SYSTEM_READER_OR_USER,
|
||||
scope_types=['system', 'project'],
|
||||
description='Show user details.',
|
||||
operations=[{'path': '/v3/users/{user_id}',
|
||||
'method': 'GET'},
|
||||
{'path': '/v3/users/{user_id}',
|
||||
'method': 'HEAD'}]),
|
||||
'method': 'HEAD'}],
|
||||
deprecated_rule=deprecated_get_user,
|
||||
deprecated_reason=DEPRECATED_REASON,
|
||||
deprecated_since=versionutils.deprecated.STEIN),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'list_users',
|
||||
check_str=base.RULE_ADMIN_REQUIRED,
|
||||
check_str=base.SYSTEM_READER,
|
||||
# FIXME(lbragstad): Since listing users has traditionally always been a
|
||||
# system-level API call, let's maintain that pattern here. A system
|
||||
# administrator should be able to list all users in the deployment,
|
||||
|
@ -49,7 +67,10 @@ user_policies = [
|
|||
operations=[{'path': '/v3/users',
|
||||
'method': 'GET'},
|
||||
{'path': '/v3/users',
|
||||
'method': 'HEAD'}]),
|
||||
'method': 'HEAD'}],
|
||||
deprecated_rule=deprecated_list_users,
|
||||
deprecated_reason=DEPRECATED_REASON,
|
||||
deprecated_since=versionutils.deprecated.STEIN),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'list_projects_for_user',
|
||||
check_str='',
|
||||
|
|
|
@ -0,0 +1,162 @@
|
|||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
from six.moves import http_client
|
||||
|
||||
from keystone.common import provider_api
|
||||
import keystone.conf
|
||||
from keystone.tests.common import auth as common_auth
|
||||
from keystone.tests import unit
|
||||
from keystone.tests.unit import base_classes
|
||||
from keystone.tests.unit import ksfixtures
|
||||
|
||||
CONF = keystone.conf.CONF
|
||||
PROVIDERS = provider_api.ProviderAPIs
|
||||
|
||||
|
||||
class _CommonUserTests(object):
|
||||
"""Common default functionality for all users."""
|
||||
|
||||
def test_user_can_get_their_own_user_reference(self):
|
||||
with self.test_client() as c:
|
||||
r = c.get('/v3/users/%s' % self.user_id, headers=self.headers)
|
||||
self.assertEqual(self.user_id, r.json['user']['id'])
|
||||
|
||||
|
||||
class _SystemUserTests(object):
|
||||
"""Common default functionality for all system users."""
|
||||
|
||||
def test_user_can_get_other_users(self):
|
||||
user = PROVIDERS.identity_api.create_user(
|
||||
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
)
|
||||
|
||||
with self.test_client() as c:
|
||||
r = c.get('/v3/users/%s' % user['id'], headers=self.headers)
|
||||
self.assertEqual(user['id'], r.json['user']['id'])
|
||||
|
||||
def test_user_cannot_get_non_existent_user_not_found(self):
|
||||
with self.test_client() as c:
|
||||
c.get(
|
||||
'/v3/users/%s' % uuid.uuid4().hex, headers=self.headers,
|
||||
expected_status_code=http_client.NOT_FOUND
|
||||
)
|
||||
|
||||
def test_user_can_list_users(self):
|
||||
expected_user_ids = []
|
||||
for _ in range(3):
|
||||
user = PROVIDERS.identity_api.create_user(
|
||||
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
)
|
||||
expected_user_ids.append(user['id'])
|
||||
|
||||
with self.test_client() as c:
|
||||
r = c.get('/v3/users', headers=self.headers)
|
||||
returned_user_ids = []
|
||||
for user in r.json['users']:
|
||||
returned_user_ids.append(user['id'])
|
||||
|
||||
for user_id in expected_user_ids:
|
||||
self.assertIn(user_id, returned_user_ids)
|
||||
|
||||
|
||||
class SystemReaderTests(base_classes.TestCaseWithBootstrap,
|
||||
common_auth.AuthTestMixin,
|
||||
_CommonUserTests,
|
||||
_SystemUserTests):
|
||||
|
||||
def setUp(self):
|
||||
super(SystemReaderTests, self).setUp()
|
||||
self.loadapp()
|
||||
self.useFixture(ksfixtures.Policy(self.config_fixture))
|
||||
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
|
||||
|
||||
system_reader = unit.new_user_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
self.user_id = PROVIDERS.identity_api.create_user(
|
||||
system_reader
|
||||
)['id']
|
||||
PROVIDERS.assignment_api.create_system_grant_for_user(
|
||||
self.user_id, self.bootstrapper.reader_role_id
|
||||
)
|
||||
|
||||
auth = self.build_authentication_request(
|
||||
user_id=self.user_id, password=system_reader['password'],
|
||||
system=True
|
||||
)
|
||||
|
||||
# Grab a token using the persona we're testing and prepare headers
|
||||
# for requests we'll be making in the tests.
|
||||
with self.test_client() as c:
|
||||
r = c.post('/v3/auth/tokens', json=auth)
|
||||
self.token_id = r.headers['X-Subject-Token']
|
||||
self.headers = {'X-Auth-Token': self.token_id}
|
||||
|
||||
def test_user_cannot_create_users(self):
|
||||
create = {
|
||||
'user': {
|
||||
'name': uuid.uuid4().hex,
|
||||
'domain': CONF.identity.default_domain_id
|
||||
}
|
||||
}
|
||||
|
||||
with self.test_client() as c:
|
||||
c.post(
|
||||
'/v3/users', json=create, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_update_users(self):
|
||||
user = PROVIDERS.identity_api.create_user(
|
||||
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
)
|
||||
|
||||
with self.test_client() as c:
|
||||
update = {'user': {'email': uuid.uuid4().hex}}
|
||||
|
||||
c.patch(
|
||||
'/v3/users/%s' % user['id'], json=update, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_update_non_existent_user_forbidden(self):
|
||||
user = PROVIDERS.identity_api.create_user(
|
||||
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
)
|
||||
|
||||
update = {'user': {'email': uuid.uuid4().hex}}
|
||||
with self.test_client() as c:
|
||||
c.patch(
|
||||
'/v3/users/%s' % user['id'], json=update, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_delete_users(self):
|
||||
user = PROVIDERS.identity_api.create_user(
|
||||
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
)
|
||||
|
||||
with self.test_client() as c:
|
||||
c.delete(
|
||||
'/v3/users/%s' % user['id'], headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_delete_non_existent_user_forbidden(self):
|
||||
with self.test_client() as c:
|
||||
c.delete(
|
||||
'/v3/users/%s' % uuid.uuid4().hex, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
|
@ -243,7 +243,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
|
|||
to Domain A - this should fail
|
||||
- Retry this for a user who is in Domain A, which should succeed.
|
||||
- Finally, try getting a user that does not exist, which should
|
||||
still return UserNotFound
|
||||
still return ForbiddenAction
|
||||
|
||||
"""
|
||||
new_policy = {'identity:get_user':
|
||||
|
@ -263,7 +263,7 @@ class IdentityTestProtectedCase(test_v3.RestfulTestCase):
|
|||
|
||||
url_by_name = '/users/%s' % uuid.uuid4().hex
|
||||
r = self.get(url_by_name, auth=self.auth,
|
||||
expected_status=exception.UserNotFound.code)
|
||||
expected_status=exception.ForbiddenAction.code)
|
||||
|
||||
def test_revoke_grant_protected_match_target(self):
|
||||
"""DELETE /domains/{id}/users/{id}/roles/{id} (match target).
|
||||
|
|
|
@ -0,0 +1,7 @@
|
|||
upgrade:
|
||||
- |
|
||||
[`bug 1805406 <https://bugs.launchpad.net/keystone/+bug/1805406>`_]
|
||||
The ``GET /v3/users/{user_id`` API now properly returns an ``HTTP
|
||||
403 Forbidden`` as opposed to ``HTTP 404 Not Found`` if the calling
|
||||
user doesn't have authorization to call the API. This applies consistent
|
||||
authorititive policy checks to the API.
|
Loading…
Reference in New Issue