Expose access rules as its own API

This change creates a /v3/users/{user_id}/access_rules endpoint to allow
users to view and delete their own access rules. Access rules are not
automatically deleted when an application credential is deleted, so they
can be re-used for other application credentials or explicitly deleted
by the user. Access rules are automatically deleted when the user is
deleted, the same way that application credentials are. Access rules
that are in use by an application credential may not be deleted.

bp whitelist-extension-for-app-creds

Change-Id: I37d243d802cd538189ccfffee6ebf0624b7785d3
This commit is contained in:
Colleen Murphy 2019-06-28 12:43:08 -07:00 committed by Colleen Murphy
parent d7c424d220
commit 67682dcd07
14 changed files with 987 additions and 11 deletions

View File

@ -241,6 +241,11 @@ identity:get_application_credential GET /v3/users/{user_i
identity:list_application_credentials GET /v3/users/{user_id}/application_credentials
identity:create_application_credential POST /v3/users/{user_id}/application_credential
identity:delete_application_credential DELETE /v3/users/{user_id}/application_credential/{application_credential_id}
identity:get_access_rule GET /v3/users/{user_id}/access_rules/{access_rule_id}
identity:list_access_rules GET /v3/users/{user_id}/access_rules
identity:delete_access_rule DELETE /v3/users/{user_id}/access_rules/{access_rule_id}
========================================================= ===
.. _grant_resources:

View File

@ -663,6 +663,57 @@ class UserAppCredGetDeleteResource(ks_flask.ResourceBase):
return None, http_client.NO_CONTENT
class UserAccessRuleListResource(ks_flask.ResourceBase):
collection_key = 'access_rules'
member_key = 'access_rule'
def get(self, user_id):
"""List access rules for user.
GET/HEAD /v3/users/{user_id}/access_rules
"""
filters = ('service', 'path', 'method',)
ENFORCER.enforce_call(action='identity:list_access_rules',
filters=filters,
build_target=_build_user_target_enforcement)
app_cred_api = PROVIDERS.application_credential_api
hints = self.build_driver_hints(filters)
refs = app_cred_api.list_access_rules_for_user(user_id, hints=hints)
hints = self.build_driver_hints(filters)
return self.wrap_collection(refs, hints=hints)
class UserAccessRuleGetDeleteResource(ks_flask.ResourceBase):
collection_key = 'access_rules'
member_key = 'access_rule'
def get(self, user_id, access_rule_id):
"""Get access rule resource.
GET/HEAD /v3/users/{user_id}/access_rules/{access_rule_id}
"""
ENFORCER.enforce_call(
action='identity:get_access_rule',
build_target=_build_user_target_enforcement
)
ref = PROVIDERS.application_credential_api.get_access_rule(
access_rule_id)
return self.wrap_member(ref)
def delete(self, user_id, access_rule_id):
"""Delete access rule resource.
DELETE /v3/users/{user_id}/access_rules/{access_rule_id}
"""
ENFORCER.enforce_call(
action='identity:delete_access_rule',
build_target=_build_user_target_enforcement
)
PROVIDERS.application_credential_api.delete_access_rule(
access_rule_id, initiator=self.audit_initiator)
return None, http_client.NO_CONTENT
class UserAPI(ks_flask.APIBase):
_name = 'users'
_import_name = __name__
@ -772,6 +823,24 @@ class UserAPI(ks_flask.APIBase):
'user_id': json_home.Parameters.USER_ID,
'application_credential_id':
json_home.Parameters.APPLICATION_CRED_ID}
),
ks_flask.construct_resource_map(
resource=UserAccessRuleListResource,
url='/users/<string:user_id>/access_rules',
resource_kwargs={},
rel='access_rules',
path_vars={'user_id': json_home.Parameters.USER_ID}
),
ks_flask.construct_resource_map(
resource=UserAccessRuleGetDeleteResource,
url=('/users/<string:user_id>/access_rules/'
'<string:access_rule_id>'),
resource_kwargs={},
rel='access_rule',
path_vars={
'user_id': json_home.Parameters.USER_ID,
'access_rule_id':
json_home.Parameters.ACCESS_RULE_ID}
)
]

View File

@ -95,3 +95,40 @@ class ApplicationCredentialDriverBase(object):
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def get_access_rule(self, access_rule_id):
"""Get an access rule by its ID.
:param str access_rule_id: Access Rule ID
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def list_access_rules_for_user(self, user_id):
"""List the access rules that a user has created.
Access rules are only created as attributes of application credentials,
they cannot be created independently.
:param str user_id: User ID
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def delete_access_rule(self, access_rule_id):
"""Delete one access rule.
:param str access_rule_id: Access Rule ID
"""
raise exception.NotImplemented() # pragma: no cover
@abc.abstractmethod
def delete_access_rules_for_user(self, user_id):
"""Delete all access rules for user.
This is called when the user itself is deleted.
:param str user_id: User ID
"""
raise exception.NotImplemented() # pragma: no cover

View File

@ -78,8 +78,7 @@ class AccessRuleModel(sql.ModelBase, sql.ModelDictMixin):
)
application_credential = sqlalchemy.orm.relationship(
'ApplicationCredentialAccessRuleModel',
backref=sqlalchemy.orm.backref('access_rule'),
cascade='all, delete-orphan')
backref=sqlalchemy.orm.backref('access_rule'))
class ApplicationCredentialAccessRuleModel(sql.ModelBase, sql.ModelDictMixin):
@ -168,15 +167,21 @@ class ApplicationCredential(base.ApplicationCredentialDriverBase):
app_cred['roles'] = roles
if ref.access_rules:
access_rules = [
{k.replace('external_id', 'id'): v
for k, v in c.access_rule.to_dict().items()
if k != 'user_id' and k != 'id'}
self._access_rule_to_dict(c.access_rule)
for c in ref.access_rules
]
app_cred['access_rules'] = access_rules
app_cred.pop('internal_id')
return app_cred
def _access_rule_to_dict(self, ref):
access_rule = ref.to_dict()
return {
k.replace('external_id', 'id'): v
for k, v in access_rule.items()
if k != 'user_id' and k != 'id'
}
def get_application_credential(self, application_credential_id):
with sql.session_for_read() as session:
query = session.query(ApplicationCredentialModel).filter_by(
@ -220,3 +225,37 @@ class ApplicationCredential(base.ApplicationCredentialDriverBase):
query = query.filter_by(user_id=user_id)
query = query.filter_by(project_id=project_id)
query.delete()
def get_access_rule(self, access_rule_id):
with sql.session_for_read() as session:
query = session.query(AccessRuleModel).filter_by(
external_id=access_rule_id)
ref = query.first()
if not ref:
raise exception.AccessRuleNotFound(
access_rule_id=access_rule_id)
access_rule = self._access_rule_to_dict(ref)
return access_rule
def list_access_rules_for_user(self, user_id, hints):
with sql.session_for_read() as session:
query = session.query(AccessRuleModel).filter_by(user_id=user_id)
refs = sql.filter_limit_query(AccessRuleModel, query, hints)
return [self._access_rule_to_dict(ref) for ref in refs]
def delete_access_rule(self, access_rule_id):
try:
with sql.session_for_write() as session:
query = session.query(AccessRuleModel)
ref = query.filter_by(external_id=access_rule_id).first()
if not ref:
raise exception.AccessRuleNotFound(
access_rule_id=access_rule_id)
session.delete(ref)
except AssertionError:
raise exception.ForbiddenNotSecurity("May not delete access rule in use")
def delete_access_rules_for_user(self, user_id):
with sql.session_for_write() as session:
query = session.query(AccessRuleModel).filter_by(user_id=user_id)
query.delete()

View File

@ -40,6 +40,7 @@ class Manager(manager.Manager):
_provides_api = 'application_credential_api'
_APP_CRED = 'application_credential'
_ACCESS_RULE = 'access_rule'
def __init__(self):
super(Manager, self).__init__(CONF.application_credential.driver)
@ -61,6 +62,7 @@ class Manager(manager.Manager):
self, service, resource_type, operation, payload):
user_id = payload['resource_info']
self._delete_application_credentials_for_user(user_id)
self._delete_access_rules_for_user(user_id)
def _delete_app_creds_on_assignment_removal(
self, service, resource_type, operation, payload):
@ -167,6 +169,26 @@ class Manager(manager.Manager):
user_id, hints)
return [self._process_app_cred(app_cred) for app_cred in app_cred_list]
@MEMOIZE
def get_access_rule(self, access_rule_id):
"""Get access rule details.
:param str access_rule_id: Access Rule ID
:returns: an access rule
"""
return self.driver.get_access_rule(access_rule_id)
def list_access_rules_for_user(self, user_id, hints=None):
"""List access rules for user.
:param str user_id: User ID
:returns: a list of access rules
"""
hints = hints or driver_hints.Hints()
return self.driver.list_access_rules_for_user(user_id, hints)
def delete_application_credential(self, application_credential_id,
initiator=None):
"""Delete an application credential.
@ -214,3 +236,32 @@ class Manager(manager.Manager):
user_id, project_id)
for app_cred in app_creds:
self.get_application_credential.invalidate(self, app_cred['id'])
def delete_access_rule(self, access_rule_id, initiator=None):
"""Delete an access rule.
:param str: access_rule_id: Access Rule ID
:param initiator: CADF initiator
:raises keystone.exception.AccessRuleNotFound: If the access rule
doesn't exist.
"""
self.driver.delete_access_rule(access_rule_id)
self.get_access_rule.invalidate(self, access_rule_id)
notifications.Audit.deleted(
self._ACCESS_RULE, access_rule_id, initiator)
def _delete_access_rules_for_user(self, user_id, initiator=None):
"""Delete all access rules for a user.
:param str user_id: User ID
This is triggered when a user is deleted.
"""
access_rules = self.driver.list_access_rules_for_user(
user_id, driver_hints.Hints())
self.driver.delete_access_rules_for_user(user_id)
for rule in access_rules:
self.get_access_rule.invalidate(self, rule['id'])
notifications.Audit.deleted(self._ACCESS_RULE, rule['id'],
initiator)

View File

@ -59,6 +59,8 @@ class Parameters(object):
LIMIT_ID = build_v3_parameter_relation('limit_id')
APPLICATION_CRED_ID = build_v3_parameter_relation(
'application_credential_id')
ACCESS_RULE_ID = build_v3_parameter_relation(
'access_rule_id')
class Status(object):

View File

@ -12,6 +12,7 @@
import itertools
from keystone.common.policies import access_rule
from keystone.common.policies import access_token
from keystone.common.policies import application_credential
from keystone.common.policies import auth
@ -50,8 +51,9 @@ from keystone.common.policies import user
def list_rules():
return itertools.chain(
base.list_rules(),
application_credential.list_rules(),
access_rule.list_rules(),
access_token.list_rules(),
application_credential.list_rules(),
auth.list_rules(),
consumer.list_rules(),
credential.list_rules(),

View File

@ -0,0 +1,62 @@
# Copyright 2019 SUSE LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_policy import policy
from keystone.common.policies import base
collection_path = '/v3/users/{user_id}/access_rules'
resource_path = collection_path + '/{access_rule_id}'
SYSTEM_READER_OR_OWNER = (
'(' + base.SYSTEM_READER + ') or '
'user_id:%(target.user.id)s'
)
SYSTEM_ADMIN_OR_OWNER = (
'(' + base.SYSTEM_ADMIN + ') or '
'user_id:%(target.user.id)s'
)
access_rule_policies = [
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'get_access_rule',
check_str=SYSTEM_READER_OR_OWNER,
scope_types=['system', 'project'],
description='Show access rule details.',
operations=[{'path': resource_path,
'method': 'GET'},
{'path': resource_path,
'method': 'HEAD'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'list_access_rules',
check_str=SYSTEM_READER_OR_OWNER,
scope_types=['system', 'project'],
description='List access rules for a user.',
operations=[{'path': collection_path,
'method': 'GET'},
{'path': collection_path,
'method': 'HEAD'}]),
policy.DocumentedRuleDefault(
name=base.IDENTITY % 'delete_access_rule',
check_str=SYSTEM_ADMIN_OR_OWNER,
scope_types=['system', 'project'],
description='Delete an access_rule.',
operations=[{'path': resource_path,
'method': 'DELETE'}])
]
def list_rules():
return access_rule_policies

View File

@ -549,6 +549,10 @@ class ApplicationCredentialNotFound(NotFound):
"%(application_credential_id)s.")
class AccessRuleNotFound(NotFound):
message_format = _("Could not find Access Rule: %(access_rule_id)s.")
class Conflict(Error):
message_format = _("Conflict occurred attempting to store %(type)s -"
" %(details)s.")

View File

@ -322,3 +322,43 @@ class ApplicationCredentialTests(object):
self.app_cred_api.authenticate,
resp['id'],
badpass)
def test_get_delete_access_rules(self):
app_cred = self._new_app_cred_data(self.user_foo['id'],
project_id=self.project_bar['id'])
access_rule_id = uuid.uuid4().hex
app_cred['access_rules'] = [{
'id': access_rule_id,
'service': uuid.uuid4().hex,
'path': uuid.uuid4().hex,
'method': uuid.uuid4().hex[16:]
}]
self.app_cred_api.create_application_credential(app_cred)
self.assertDictEqual(app_cred['access_rules'][0],
self.app_cred_api.get_access_rule(access_rule_id))
self.app_cred_api.delete_application_credential(app_cred['id'])
self.app_cred_api.delete_access_rule(access_rule_id)
self.assertRaises(exception.AccessRuleNotFound,
self.app_cred_api.get_access_rule,
access_rule_id)
def test_list_delete_access_rule_for_user(self):
app_cred = self._new_app_cred_data(self.user_foo['id'],
project_id=self.project_bar['id'])
access_rule_id = uuid.uuid4().hex
app_cred['access_rules'] = [{
'id': access_rule_id,
'service': uuid.uuid4().hex,
'path': uuid.uuid4().hex,
'method': uuid.uuid4().hex[16:]
}]
self.app_cred_api.create_application_credential(app_cred)
self.assertEqual(1, len(self.app_cred_api.list_access_rules_for_user(
self.user_foo['id'])))
self.app_cred_api.delete_application_credential(app_cred['id'])
# access rule should still exist
self.assertEqual(1, len(self.app_cred_api.list_access_rules_for_user(
self.user_foo['id'])))
self.app_cred_api.delete_access_rules_for_user(self.user_foo['id'])
self.assertEqual(0, len(self.app_cred_api.list_access_rules_for_user(
self.user_foo['id'])))

View File

@ -0,0 +1,626 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import uuid
from six.moves import http_client
from keystone.common import provider_api
import keystone.conf
from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import base_classes
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import temporaryfile
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class _UserAccessRuleTests(object):
"""Test cases for anyone that has a valid user token."""
def test_user_can_get_their_access_rules(self):
access_rule_id = uuid.uuid4().hex
app_cred = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'user_id': self.user_id,
'project_id': self.project_id,
'secret': uuid.uuid4().hex,
'access_rules': [{
'id': access_rule_id,
'service': uuid.uuid4().hex,
'path': uuid.uuid4().hex,
'method': uuid.uuid4().hex[16:]
}]
}
PROVIDERS.application_credential_api.create_application_credential(app_cred)
with self.test_client() as c:
path = '/v3/users/%s/access_rules/%s' % (self.user_id, app_cred['access_rules'][0]['id'])
c.get(path, headers=self.headers)
def test_user_can_list_their_access_rules(self):
app_cred = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'user_id': self.user_id,
'project_id': self.project_id,
'secret': uuid.uuid4().hex,
'access_rules': [{
'id': uuid.uuid4().hex,
'service': uuid.uuid4().hex,
'path': uuid.uuid4().hex,
'method': uuid.uuid4().hex[16:]
}]
}
PROVIDERS.application_credential_api.create_application_credential(app_cred)
with self.test_client() as c:
r = c.get('/v3/users/%s/access_rules' % self.user_id, headers=self.headers)
self.assertEqual(len(r.json['access_rules']), 1)
def test_user_can_delete_their_access_rules(self):
access_rule_id = uuid.uuid4().hex
app_cred = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'user_id': self.user_id,
'project_id': self.project_id,
'secret': uuid.uuid4().hex,
'access_rules': [{
'id': access_rule_id,
'service': uuid.uuid4().hex,
'path': uuid.uuid4().hex,
'method': uuid.uuid4().hex[16:]
}]
}
PROVIDERS.application_credential_api.create_application_credential(app_cred)
PROVIDERS.application_credential_api.delete_application_credential(app_cred['id'])
with self.test_client() as c:
path = '/v3/users/%s/access_rules/%s' % (self.user_id, access_rule_id)
c.delete(path, headers=self.headers)
class _ProjectUsersTests(object):
"""Users who have project role authorization observe the same behavior."""
def test_user_cannot_get_access_rules_for_other_users(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.member_role_id, user_id=user['id'],
project_id=project['id']
)
access_rule_id = uuid.uuid4().hex
app_cred = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'user_id': user['id'],
'project_id': project['id'],
'secret': uuid.uuid4().hex,
'access_rules': [{
'id': access_rule_id,
'service': uuid.uuid4().hex,
'path': uuid.uuid4().hex,
'method': uuid.uuid4().hex[16:]
}]
}
PROVIDERS.application_credential_api.create_application_credential(app_cred)
with self.test_client() as c:
path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id)
c.get(
path, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_get_own_non_existent_access_rule_not_found(self):
with self.test_client() as c:
c.get(
'/v3/users/%s/access_rules/%s' % (
self.user_id, uuid.uuid4().hex),
headers=self.headers,
expected_status_code=http_client.NOT_FOUND
)
def test_user_cannot_get_non_existent_access_rule_other_user_forbidden(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
with self.test_client() as c:
c.get(
'/v3/users/%s/access_rules/%s' % (
user['id'], uuid.uuid4().hex),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_list_access_rules_for_other_users(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.member_role_id, user_id=user['id'],
project_id=project['id']
)
app_cred = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'user_id': user['id'],
'project_id': project['id'],
'secret': uuid.uuid4().hex,
'access_rules': [{
'id': uuid.uuid4().hex,
'service': uuid.uuid4().hex,
'path': uuid.uuid4().hex,
'method': uuid.uuid4().hex[16:]
}]
}
PROVIDERS.application_credential_api.create_application_credential(app_cred)
with self.test_client() as c:
path = '/v3/users/%s/access_rules' % user['id']
c.get(path, headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_delete_access_rules_for_others(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.member_role_id, user_id=user['id'],
project_id=project['id']
)
access_rule_id = uuid.uuid4().hex
app_cred = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'user_id': user['id'],
'project_id': project['id'],
'secret': uuid.uuid4().hex,
'access_rules': [{
'id': access_rule_id,
'service': uuid.uuid4().hex,
'path': uuid.uuid4().hex,
'method': uuid.uuid4().hex[16:]
}]
}
PROVIDERS.application_credential_api.create_application_credential(app_cred)
PROVIDERS.application_credential_api.delete_application_credential(app_cred['id'])
with self.test_client() as c:
path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id)
c.delete(
path, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_non_existent_access_rule_other_user_forbidden(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
with self.test_client() as c:
c.delete(
'/v3/users/%s/access_rules/%s' % (
user['id'], uuid.uuid4().hex),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
class _SystemUserAccessRuleTests(object):
"""Tests that are common across all system users."""
def test_user_can_list_access_rules_for_other_users(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.member_role_id, user_id=user['id'],
project_id=project['id']
)
app_cred = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'user_id': user['id'],
'project_id': project['id'],
'secret': uuid.uuid4().hex,
'access_rules': [{
'id': uuid.uuid4().hex,
'service': uuid.uuid4().hex,
'path': uuid.uuid4().hex,
'method': uuid.uuid4().hex[16:]
}]
}
PROVIDERS.application_credential_api.create_application_credential(app_cred)
with self.test_client() as c:
r = c.get('/v3/users/%s/access_rules' % user['id'],
headers=self.headers)
self.assertEqual(1, len(r.json['access_rules']))
def test_user_cannot_get_non_existent_access_rule_not_found(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
with self.test_client() as c:
c.get(
'/v3/users/%s/access_rules/%s' % (
user['id'], uuid.uuid4().hex),
headers=self.headers,
expected_status_code=http_client.NOT_FOUND
)
class SystemReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_SystemUserAccessRuleTests):
def setUp(self):
super(SystemReaderTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
system_reader = unit.new_user_ref(
domain_id=CONF.identity.default_domain_id
)
self.user_id = PROVIDERS.identity_api.create_user(
system_reader
)['id']
PROVIDERS.assignment_api.create_system_grant_for_user(
self.user_id, self.bootstrapper.reader_role_id
)
auth = self.build_authentication_request(
user_id=self.user_id, password=system_reader['password'],
system=True
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
def test_user_cannot_delete_access_rules_for_others(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.member_role_id, user_id=user['id'],
project_id=project['id']
)
access_rule_id = uuid.uuid4().hex
app_cred = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'user_id': user['id'],
'project_id': project['id'],
'secret': uuid.uuid4().hex,
'access_rules': [{
'id': access_rule_id,
'service': uuid.uuid4().hex,
'path': uuid.uuid4().hex,
'method': uuid.uuid4().hex[16:]
}]
}
PROVIDERS.application_credential_api.create_application_credential(app_cred)
PROVIDERS.application_credential_api.delete_application_credential(app_cred['id'])
with self.test_client() as c:
path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id)
c.delete(
path, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_non_existent_access_rule_forbidden(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
with self.test_client() as c:
c.delete(
'/v3/users/%s/access_rules/%s' % (
user['id'], uuid.uuid4().hex),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
class SystemMemberTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_SystemUserAccessRuleTests):
def setUp(self):
super(SystemMemberTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
system_member = unit.new_user_ref(
domain_id=CONF.identity.default_domain_id
)
self.user_id = PROVIDERS.identity_api.create_user(
system_member
)['id']
PROVIDERS.assignment_api.create_system_grant_for_user(
self.user_id, self.bootstrapper.member_role_id
)
auth = self.build_authentication_request(
user_id=self.user_id, password=system_member['password'],
system=True
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
def test_user_cannot_delete_access_rules_for_others(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.member_role_id, user_id=user['id'],
project_id=project['id']
)
access_rule_id = uuid.uuid4().hex
app_cred = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'user_id': user['id'],
'project_id': project['id'],
'secret': uuid.uuid4().hex,
'access_rules': [{
'id': access_rule_id,
'service': uuid.uuid4().hex,
'path': uuid.uuid4().hex,
'method': uuid.uuid4().hex[16:]
}]
}
PROVIDERS.application_credential_api.create_application_credential(app_cred)
PROVIDERS.application_credential_api.delete_application_credential(app_cred['id'])
with self.test_client() as c:
path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id)
c.delete(
path, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
with self.test_client() as c:
path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id)
c.delete(
path, headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
def test_user_cannot_delete_non_existent_access_rule_forbidden(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
with self.test_client() as c:
c.delete(
'/v3/users/%s/access_rules/%s' % (
user['id'], uuid.uuid4().hex),
headers=self.headers,
expected_status_code=http_client.FORBIDDEN
)
class SystemAdminTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_SystemUserAccessRuleTests):
def setUp(self):
super(SystemAdminTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
# Reuse the system administrator account created during
# ``keystone-manage bootstrap``
self.user_id = self.bootstrapper.admin_user_id
auth = self.build_authentication_request(
user_id=self.user_id,
password=self.bootstrapper.admin_password,
system=True
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
def test_user_can_delete_access_rules_for_others(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.member_role_id, user_id=user['id'],
project_id=project['id']
)
access_rule_id = uuid.uuid4().hex
app_cred = {
'id': uuid.uuid4().hex,
'name': uuid.uuid4().hex,
'user_id': user['id'],
'project_id': project['id'],
'secret': uuid.uuid4().hex,
'access_rules': [{
'id': access_rule_id,
'service': uuid.uuid4().hex,
'path': uuid.uuid4().hex,
'method': uuid.uuid4().hex[16:]
}]
}
PROVIDERS.application_credential_api.create_application_credential(app_cred)
PROVIDERS.application_credential_api.delete_application_credential(app_cred['id'])
with self.test_client() as c:
path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id)
c.delete(path, headers=self.headers)
def test_user_cannot_delete_non_existent_access_rule_not_found(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user = PROVIDERS.identity_api.create_user(user)
with self.test_client() as c:
c.delete(
'/v3/users/%s/access_rules/%s' % (
user['id'], uuid.uuid4().hex),
headers=self.headers,
expected_status_code=http_client.NOT_FOUND
)
class ProjectReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_UserAccessRuleTests,
_ProjectUsersTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
project_reader = unit.new_user_ref(
domain_id=CONF.identity.default_domain_id
)
self.user_id = PROVIDERS.identity_api.create_user(
project_reader
)['id']
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
self.project_id = PROVIDERS.resource_api.create_project(
project['id'], project
)['id']
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=self.user_id,
project_id=self.project_id
)
auth = self.build_authentication_request(
user_id=self.user_id,
password=project_reader['password'],
project_id=self.project_id
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
class ProjectMemberTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_UserAccessRuleTests,
_ProjectUsersTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
project_member = unit.new_user_ref(
domain_id=CONF.identity.default_domain_id
)
self.user_id = PROVIDERS.identity_api.create_user(
project_member
)['id']
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
self.project_id = PROVIDERS.resource_api.create_project(
project['id'], project
)['id']
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.member_role_id, user_id=self.user_id,
project_id=self.project_id
)
auth = self.build_authentication_request(
user_id=self.user_id,
password=project_member['password'],
project_id=self.project_id
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
class ProjectAdminTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_UserAccessRuleTests,
_ProjectUsersTests):
def setUp(self):
super(ProjectAdminTests, self).setUp()
self.loadapp()
self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
self.policy_file_name = self.policy_file.file_name
self.useFixture(
ksfixtures.Policy(
self.config_fixture, policy_file=self.policy_file_name
)
)
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
# Reuse the system administrator account created during
# ``keystone-manage bootstrap``
self.user_id = self.bootstrapper.admin_user_id
self.project_id = self.bootstrapper.project_id
auth = self.build_authentication_request(
user_id=self.user_id,
password=self.bootstrapper.admin_password,
project_id=self.project_id
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}

View File

@ -220,6 +220,7 @@ class PolicyJsonTestCase(unit.TestCase):
'identity:create_system_grant_for_user',
'identity:create_trust',
'identity:create_user',
'identity:delete_access_rule',
'identity:delete_access_token',
'identity:delete_application_credential',
'identity:delete_consumer',
@ -249,6 +250,7 @@ class PolicyJsonTestCase(unit.TestCase):
'identity:delete_user',
'identity:ec2_delete_credential',
'identity:ec2_get_credential',
'identity:get_access_rule',
'identity:get_access_token',
'identity:get_access_token_role',
'identity:get_application_credential',
@ -281,6 +283,7 @@ class PolicyJsonTestCase(unit.TestCase):
'identity:get_service_provider',
'identity:get_trust',
'identity:get_user',
'identity:list_access_rules',
'identity:list_access_token_roles',
'identity:list_access_tokens',
'identity:list_application_credentials',

View File

@ -207,10 +207,33 @@ class ApplicationCredentialTestCase(test_v3.RestfulTestCase):
headers={'X-Auth-Token': token},
json=app_cred_body,
expected_status_code=http_client.CREATED)
resp_access_rules = resp.json['application_credential']['access_rules']
self.assertIn('id', resp_access_rules[0])
resp_access_rules[0].pop('id')
self.assertEqual(access_rules[0], resp_access_rules[0])
app_cred_id = resp.json['application_credential']['id']
resp_access_rules = resp.json['application_credential']['access_rules']
access_rule_id = resp_access_rules[0].pop('id')
self.assertEqual(access_rules[0], resp_access_rules[0])
resp = c.get('/v3/users/%s/access_rules' % self.user_id,
headers={'X-Auth-Token': token})
resp_access_rule = resp.json['access_rules'][0]
resp_access_rule.pop('id')
resp_access_rule.pop('links')
self.assertEqual(access_rules[0], resp_access_rule)
resp = c.get('/v3/users/%s/access_rules/%s' % (
self.user_id, access_rule_id), headers={'X-Auth-Token': token})
resp_access_rule = resp.json['access_rule']
resp_access_rule.pop('id')
resp_access_rule.pop('links')
self.assertEqual(access_rules[0], resp_access_rule)
# can't delete an access rule in use
c.delete('/v3/users/%s/access_rules/%s' % (
self.user_id, access_rule_id),
headers={'X-Auth-Token': token},
expected_status_code=http_client.FORBIDDEN)
c.delete('/v3/users/%s/application_credentials/%s' % (
self.user_id, app_cred_id),
headers={'X-Auth-Token': token})
c.delete('/v3/users/%s/access_rules/%s' % (
self.user_id, access_rule_id),
headers={'X-Auth-Token': token})
def test_create_application_credential_with_duplicate_access_rule(self):
roles = [{'id': self.role_id}]

View File

@ -145,6 +145,10 @@ APPLICATION_CREDENTIALS = '/users/{user_id}/application_credentials'
APPLICATION_CREDENTIAL_RELATION = (
json_home.build_v3_parameter_relation('application_credential_id'))
ACCESS_RULE = '/users/{user_id}/access_rules/{access_rule_id}'
ACCESS_RULES = '/users/{user_id}/access_rules'
ACCESS_RULE_RELATION = json_home.build_v3_parameter_relation('access_rule_id')
V3_JSON_HOME_RESOURCES = {
json_home.build_v3_resource_relation('auth_tokens'): {
'href': '/auth/tokens'},
@ -641,7 +645,16 @@ V3_JSON_HOME_RESOURCES = {
'href-template': APPLICATION_CREDENTIAL,
'href-vars': {
'application_credential_id': APPLICATION_CREDENTIAL_RELATION,
'user_id': json_home.build_v3_parameter_relation('user_id')}}
'user_id': json_home.build_v3_parameter_relation('user_id')}},
json_home.build_v3_resource_relation('access_rules'): {
'href-template': ACCESS_RULES,
'href-vars': {
'user_id': json_home.build_v3_parameter_relation('user_id')}},
json_home.build_v3_resource_relation('access_rule'): {
'href-template': ACCESS_RULE,
'href-vars': {
'access_rule_id': ACCESS_RULE_RELATION,
'user_id': json_home.build_v3_parameter_relation('user_id')}},
}