Merge "Expose access rules as its own API"
This commit is contained in:
@@ -241,6 +241,11 @@ identity:get_application_credential GET /v3/users/{user_i
|
||||
identity:list_application_credentials GET /v3/users/{user_id}/application_credentials
|
||||
identity:create_application_credential POST /v3/users/{user_id}/application_credential
|
||||
identity:delete_application_credential DELETE /v3/users/{user_id}/application_credential/{application_credential_id}
|
||||
|
||||
identity:get_access_rule GET /v3/users/{user_id}/access_rules/{access_rule_id}
|
||||
identity:list_access_rules GET /v3/users/{user_id}/access_rules
|
||||
identity:delete_access_rule DELETE /v3/users/{user_id}/access_rules/{access_rule_id}
|
||||
|
||||
========================================================= ===
|
||||
|
||||
.. _grant_resources:
|
||||
|
||||
@@ -663,6 +663,57 @@ class UserAppCredGetDeleteResource(ks_flask.ResourceBase):
|
||||
return None, http_client.NO_CONTENT
|
||||
|
||||
|
||||
class UserAccessRuleListResource(ks_flask.ResourceBase):
|
||||
collection_key = 'access_rules'
|
||||
member_key = 'access_rule'
|
||||
|
||||
def get(self, user_id):
|
||||
"""List access rules for user.
|
||||
|
||||
GET/HEAD /v3/users/{user_id}/access_rules
|
||||
"""
|
||||
filters = ('service', 'path', 'method',)
|
||||
ENFORCER.enforce_call(action='identity:list_access_rules',
|
||||
filters=filters,
|
||||
build_target=_build_user_target_enforcement)
|
||||
app_cred_api = PROVIDERS.application_credential_api
|
||||
hints = self.build_driver_hints(filters)
|
||||
refs = app_cred_api.list_access_rules_for_user(user_id, hints=hints)
|
||||
hints = self.build_driver_hints(filters)
|
||||
return self.wrap_collection(refs, hints=hints)
|
||||
|
||||
|
||||
class UserAccessRuleGetDeleteResource(ks_flask.ResourceBase):
|
||||
collection_key = 'access_rules'
|
||||
member_key = 'access_rule'
|
||||
|
||||
def get(self, user_id, access_rule_id):
|
||||
"""Get access rule resource.
|
||||
|
||||
GET/HEAD /v3/users/{user_id}/access_rules/{access_rule_id}
|
||||
"""
|
||||
ENFORCER.enforce_call(
|
||||
action='identity:get_access_rule',
|
||||
build_target=_build_user_target_enforcement
|
||||
)
|
||||
ref = PROVIDERS.application_credential_api.get_access_rule(
|
||||
access_rule_id)
|
||||
return self.wrap_member(ref)
|
||||
|
||||
def delete(self, user_id, access_rule_id):
|
||||
"""Delete access rule resource.
|
||||
|
||||
DELETE /v3/users/{user_id}/access_rules/{access_rule_id}
|
||||
"""
|
||||
ENFORCER.enforce_call(
|
||||
action='identity:delete_access_rule',
|
||||
build_target=_build_user_target_enforcement
|
||||
)
|
||||
PROVIDERS.application_credential_api.delete_access_rule(
|
||||
access_rule_id, initiator=self.audit_initiator)
|
||||
return None, http_client.NO_CONTENT
|
||||
|
||||
|
||||
class UserAPI(ks_flask.APIBase):
|
||||
_name = 'users'
|
||||
_import_name = __name__
|
||||
@@ -772,6 +823,24 @@ class UserAPI(ks_flask.APIBase):
|
||||
'user_id': json_home.Parameters.USER_ID,
|
||||
'application_credential_id':
|
||||
json_home.Parameters.APPLICATION_CRED_ID}
|
||||
),
|
||||
ks_flask.construct_resource_map(
|
||||
resource=UserAccessRuleListResource,
|
||||
url='/users/<string:user_id>/access_rules',
|
||||
resource_kwargs={},
|
||||
rel='access_rules',
|
||||
path_vars={'user_id': json_home.Parameters.USER_ID}
|
||||
),
|
||||
ks_flask.construct_resource_map(
|
||||
resource=UserAccessRuleGetDeleteResource,
|
||||
url=('/users/<string:user_id>/access_rules/'
|
||||
'<string:access_rule_id>'),
|
||||
resource_kwargs={},
|
||||
rel='access_rule',
|
||||
path_vars={
|
||||
'user_id': json_home.Parameters.USER_ID,
|
||||
'access_rule_id':
|
||||
json_home.Parameters.ACCESS_RULE_ID}
|
||||
)
|
||||
]
|
||||
|
||||
|
||||
@@ -95,3 +95,40 @@ class ApplicationCredentialDriverBase(object):
|
||||
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_access_rule(self, access_rule_id):
|
||||
"""Get an access rule by its ID.
|
||||
|
||||
:param str access_rule_id: Access Rule ID
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
||||
@abc.abstractmethod
|
||||
def list_access_rules_for_user(self, user_id):
|
||||
"""List the access rules that a user has created.
|
||||
|
||||
Access rules are only created as attributes of application credentials,
|
||||
they cannot be created independently.
|
||||
|
||||
:param str user_id: User ID
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete_access_rule(self, access_rule_id):
|
||||
"""Delete one access rule.
|
||||
|
||||
:param str access_rule_id: Access Rule ID
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
||||
@abc.abstractmethod
|
||||
def delete_access_rules_for_user(self, user_id):
|
||||
"""Delete all access rules for user.
|
||||
|
||||
This is called when the user itself is deleted.
|
||||
|
||||
:param str user_id: User ID
|
||||
"""
|
||||
raise exception.NotImplemented() # pragma: no cover
|
||||
|
||||
@@ -78,8 +78,7 @@ class AccessRuleModel(sql.ModelBase, sql.ModelDictMixin):
|
||||
)
|
||||
application_credential = sqlalchemy.orm.relationship(
|
||||
'ApplicationCredentialAccessRuleModel',
|
||||
backref=sqlalchemy.orm.backref('access_rule'),
|
||||
cascade='all, delete-orphan')
|
||||
backref=sqlalchemy.orm.backref('access_rule'))
|
||||
|
||||
|
||||
class ApplicationCredentialAccessRuleModel(sql.ModelBase, sql.ModelDictMixin):
|
||||
@@ -168,15 +167,21 @@ class ApplicationCredential(base.ApplicationCredentialDriverBase):
|
||||
app_cred['roles'] = roles
|
||||
if ref.access_rules:
|
||||
access_rules = [
|
||||
{k.replace('external_id', 'id'): v
|
||||
for k, v in c.access_rule.to_dict().items()
|
||||
if k != 'user_id' and k != 'id'}
|
||||
self._access_rule_to_dict(c.access_rule)
|
||||
for c in ref.access_rules
|
||||
]
|
||||
app_cred['access_rules'] = access_rules
|
||||
app_cred.pop('internal_id')
|
||||
return app_cred
|
||||
|
||||
def _access_rule_to_dict(self, ref):
|
||||
access_rule = ref.to_dict()
|
||||
return {
|
||||
k.replace('external_id', 'id'): v
|
||||
for k, v in access_rule.items()
|
||||
if k != 'user_id' and k != 'id'
|
||||
}
|
||||
|
||||
def get_application_credential(self, application_credential_id):
|
||||
with sql.session_for_read() as session:
|
||||
query = session.query(ApplicationCredentialModel).filter_by(
|
||||
@@ -220,3 +225,37 @@ class ApplicationCredential(base.ApplicationCredentialDriverBase):
|
||||
query = query.filter_by(user_id=user_id)
|
||||
query = query.filter_by(project_id=project_id)
|
||||
query.delete()
|
||||
|
||||
def get_access_rule(self, access_rule_id):
|
||||
with sql.session_for_read() as session:
|
||||
query = session.query(AccessRuleModel).filter_by(
|
||||
external_id=access_rule_id)
|
||||
ref = query.first()
|
||||
if not ref:
|
||||
raise exception.AccessRuleNotFound(
|
||||
access_rule_id=access_rule_id)
|
||||
access_rule = self._access_rule_to_dict(ref)
|
||||
return access_rule
|
||||
|
||||
def list_access_rules_for_user(self, user_id, hints):
|
||||
with sql.session_for_read() as session:
|
||||
query = session.query(AccessRuleModel).filter_by(user_id=user_id)
|
||||
refs = sql.filter_limit_query(AccessRuleModel, query, hints)
|
||||
return [self._access_rule_to_dict(ref) for ref in refs]
|
||||
|
||||
def delete_access_rule(self, access_rule_id):
|
||||
try:
|
||||
with sql.session_for_write() as session:
|
||||
query = session.query(AccessRuleModel)
|
||||
ref = query.filter_by(external_id=access_rule_id).first()
|
||||
if not ref:
|
||||
raise exception.AccessRuleNotFound(
|
||||
access_rule_id=access_rule_id)
|
||||
session.delete(ref)
|
||||
except AssertionError:
|
||||
raise exception.ForbiddenNotSecurity("May not delete access rule in use")
|
||||
|
||||
def delete_access_rules_for_user(self, user_id):
|
||||
with sql.session_for_write() as session:
|
||||
query = session.query(AccessRuleModel).filter_by(user_id=user_id)
|
||||
query.delete()
|
||||
|
||||
@@ -40,6 +40,7 @@ class Manager(manager.Manager):
|
||||
_provides_api = 'application_credential_api'
|
||||
|
||||
_APP_CRED = 'application_credential'
|
||||
_ACCESS_RULE = 'access_rule'
|
||||
|
||||
def __init__(self):
|
||||
super(Manager, self).__init__(CONF.application_credential.driver)
|
||||
@@ -61,6 +62,7 @@ class Manager(manager.Manager):
|
||||
self, service, resource_type, operation, payload):
|
||||
user_id = payload['resource_info']
|
||||
self._delete_application_credentials_for_user(user_id)
|
||||
self._delete_access_rules_for_user(user_id)
|
||||
|
||||
def _delete_app_creds_on_assignment_removal(
|
||||
self, service, resource_type, operation, payload):
|
||||
@@ -167,6 +169,26 @@ class Manager(manager.Manager):
|
||||
user_id, hints)
|
||||
return [self._process_app_cred(app_cred) for app_cred in app_cred_list]
|
||||
|
||||
@MEMOIZE
|
||||
def get_access_rule(self, access_rule_id):
|
||||
"""Get access rule details.
|
||||
|
||||
:param str access_rule_id: Access Rule ID
|
||||
|
||||
:returns: an access rule
|
||||
"""
|
||||
return self.driver.get_access_rule(access_rule_id)
|
||||
|
||||
def list_access_rules_for_user(self, user_id, hints=None):
|
||||
"""List access rules for user.
|
||||
|
||||
:param str user_id: User ID
|
||||
|
||||
:returns: a list of access rules
|
||||
"""
|
||||
hints = hints or driver_hints.Hints()
|
||||
return self.driver.list_access_rules_for_user(user_id, hints)
|
||||
|
||||
def delete_application_credential(self, application_credential_id,
|
||||
initiator=None):
|
||||
"""Delete an application credential.
|
||||
@@ -214,3 +236,32 @@ class Manager(manager.Manager):
|
||||
user_id, project_id)
|
||||
for app_cred in app_creds:
|
||||
self.get_application_credential.invalidate(self, app_cred['id'])
|
||||
|
||||
def delete_access_rule(self, access_rule_id, initiator=None):
|
||||
"""Delete an access rule.
|
||||
|
||||
:param str: access_rule_id: Access Rule ID
|
||||
:param initiator: CADF initiator
|
||||
|
||||
:raises keystone.exception.AccessRuleNotFound: If the access rule
|
||||
doesn't exist.
|
||||
"""
|
||||
self.driver.delete_access_rule(access_rule_id)
|
||||
self.get_access_rule.invalidate(self, access_rule_id)
|
||||
notifications.Audit.deleted(
|
||||
self._ACCESS_RULE, access_rule_id, initiator)
|
||||
|
||||
def _delete_access_rules_for_user(self, user_id, initiator=None):
|
||||
"""Delete all access rules for a user.
|
||||
|
||||
:param str user_id: User ID
|
||||
|
||||
This is triggered when a user is deleted.
|
||||
"""
|
||||
access_rules = self.driver.list_access_rules_for_user(
|
||||
user_id, driver_hints.Hints())
|
||||
self.driver.delete_access_rules_for_user(user_id)
|
||||
for rule in access_rules:
|
||||
self.get_access_rule.invalidate(self, rule['id'])
|
||||
notifications.Audit.deleted(self._ACCESS_RULE, rule['id'],
|
||||
initiator)
|
||||
|
||||
@@ -59,6 +59,8 @@ class Parameters(object):
|
||||
LIMIT_ID = build_v3_parameter_relation('limit_id')
|
||||
APPLICATION_CRED_ID = build_v3_parameter_relation(
|
||||
'application_credential_id')
|
||||
ACCESS_RULE_ID = build_v3_parameter_relation(
|
||||
'access_rule_id')
|
||||
|
||||
|
||||
class Status(object):
|
||||
|
||||
@@ -12,6 +12,7 @@
|
||||
|
||||
import itertools
|
||||
|
||||
from keystone.common.policies import access_rule
|
||||
from keystone.common.policies import access_token
|
||||
from keystone.common.policies import application_credential
|
||||
from keystone.common.policies import auth
|
||||
@@ -50,8 +51,9 @@ from keystone.common.policies import user
|
||||
def list_rules():
|
||||
return itertools.chain(
|
||||
base.list_rules(),
|
||||
application_credential.list_rules(),
|
||||
access_rule.list_rules(),
|
||||
access_token.list_rules(),
|
||||
application_credential.list_rules(),
|
||||
auth.list_rules(),
|
||||
consumer.list_rules(),
|
||||
credential.list_rules(),
|
||||
|
||||
62
keystone/common/policies/access_rule.py
Normal file
62
keystone/common/policies/access_rule.py
Normal file
@@ -0,0 +1,62 @@
|
||||
# Copyright 2019 SUSE LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
from oslo_policy import policy
|
||||
|
||||
from keystone.common.policies import base
|
||||
|
||||
collection_path = '/v3/users/{user_id}/access_rules'
|
||||
resource_path = collection_path + '/{access_rule_id}'
|
||||
|
||||
SYSTEM_READER_OR_OWNER = (
|
||||
'(' + base.SYSTEM_READER + ') or '
|
||||
'user_id:%(target.user.id)s'
|
||||
)
|
||||
|
||||
SYSTEM_ADMIN_OR_OWNER = (
|
||||
'(' + base.SYSTEM_ADMIN + ') or '
|
||||
'user_id:%(target.user.id)s'
|
||||
)
|
||||
|
||||
access_rule_policies = [
|
||||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'get_access_rule',
|
||||
check_str=SYSTEM_READER_OR_OWNER,
|
||||
scope_types=['system', 'project'],
|
||||
description='Show access rule details.',
|
||||
operations=[{'path': resource_path,
|
||||
'method': 'GET'},
|
||||
{'path': resource_path,
|
||||
'method': 'HEAD'}]),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'list_access_rules',
|
||||
check_str=SYSTEM_READER_OR_OWNER,
|
||||
scope_types=['system', 'project'],
|
||||
description='List access rules for a user.',
|
||||
operations=[{'path': collection_path,
|
||||
'method': 'GET'},
|
||||
{'path': collection_path,
|
||||
'method': 'HEAD'}]),
|
||||
policy.DocumentedRuleDefault(
|
||||
name=base.IDENTITY % 'delete_access_rule',
|
||||
check_str=SYSTEM_ADMIN_OR_OWNER,
|
||||
scope_types=['system', 'project'],
|
||||
description='Delete an access_rule.',
|
||||
operations=[{'path': resource_path,
|
||||
'method': 'DELETE'}])
|
||||
]
|
||||
|
||||
|
||||
def list_rules():
|
||||
return access_rule_policies
|
||||
@@ -549,6 +549,10 @@ class ApplicationCredentialNotFound(NotFound):
|
||||
"%(application_credential_id)s.")
|
||||
|
||||
|
||||
class AccessRuleNotFound(NotFound):
|
||||
message_format = _("Could not find Access Rule: %(access_rule_id)s.")
|
||||
|
||||
|
||||
class Conflict(Error):
|
||||
message_format = _("Conflict occurred attempting to store %(type)s -"
|
||||
" %(details)s.")
|
||||
|
||||
@@ -322,3 +322,43 @@ class ApplicationCredentialTests(object):
|
||||
self.app_cred_api.authenticate,
|
||||
resp['id'],
|
||||
badpass)
|
||||
|
||||
def test_get_delete_access_rules(self):
|
||||
app_cred = self._new_app_cred_data(self.user_foo['id'],
|
||||
project_id=self.project_bar['id'])
|
||||
access_rule_id = uuid.uuid4().hex
|
||||
app_cred['access_rules'] = [{
|
||||
'id': access_rule_id,
|
||||
'service': uuid.uuid4().hex,
|
||||
'path': uuid.uuid4().hex,
|
||||
'method': uuid.uuid4().hex[16:]
|
||||
}]
|
||||
self.app_cred_api.create_application_credential(app_cred)
|
||||
self.assertDictEqual(app_cred['access_rules'][0],
|
||||
self.app_cred_api.get_access_rule(access_rule_id))
|
||||
self.app_cred_api.delete_application_credential(app_cred['id'])
|
||||
self.app_cred_api.delete_access_rule(access_rule_id)
|
||||
self.assertRaises(exception.AccessRuleNotFound,
|
||||
self.app_cred_api.get_access_rule,
|
||||
access_rule_id)
|
||||
|
||||
def test_list_delete_access_rule_for_user(self):
|
||||
app_cred = self._new_app_cred_data(self.user_foo['id'],
|
||||
project_id=self.project_bar['id'])
|
||||
access_rule_id = uuid.uuid4().hex
|
||||
app_cred['access_rules'] = [{
|
||||
'id': access_rule_id,
|
||||
'service': uuid.uuid4().hex,
|
||||
'path': uuid.uuid4().hex,
|
||||
'method': uuid.uuid4().hex[16:]
|
||||
}]
|
||||
self.app_cred_api.create_application_credential(app_cred)
|
||||
self.assertEqual(1, len(self.app_cred_api.list_access_rules_for_user(
|
||||
self.user_foo['id'])))
|
||||
self.app_cred_api.delete_application_credential(app_cred['id'])
|
||||
# access rule should still exist
|
||||
self.assertEqual(1, len(self.app_cred_api.list_access_rules_for_user(
|
||||
self.user_foo['id'])))
|
||||
self.app_cred_api.delete_access_rules_for_user(self.user_foo['id'])
|
||||
self.assertEqual(0, len(self.app_cred_api.list_access_rules_for_user(
|
||||
self.user_foo['id'])))
|
||||
|
||||
626
keystone/tests/unit/protection/v3/test_access_rules.py
Normal file
626
keystone/tests/unit/protection/v3/test_access_rules.py
Normal file
@@ -0,0 +1,626 @@
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import uuid
|
||||
|
||||
from six.moves import http_client
|
||||
|
||||
from keystone.common import provider_api
|
||||
import keystone.conf
|
||||
from keystone.tests.common import auth as common_auth
|
||||
from keystone.tests import unit
|
||||
from keystone.tests.unit import base_classes
|
||||
from keystone.tests.unit import ksfixtures
|
||||
from keystone.tests.unit.ksfixtures import temporaryfile
|
||||
|
||||
CONF = keystone.conf.CONF
|
||||
PROVIDERS = provider_api.ProviderAPIs
|
||||
|
||||
|
||||
class _UserAccessRuleTests(object):
|
||||
"""Test cases for anyone that has a valid user token."""
|
||||
|
||||
def test_user_can_get_their_access_rules(self):
|
||||
access_rule_id = uuid.uuid4().hex
|
||||
app_cred = {
|
||||
'id': uuid.uuid4().hex,
|
||||
'name': uuid.uuid4().hex,
|
||||
'user_id': self.user_id,
|
||||
'project_id': self.project_id,
|
||||
'secret': uuid.uuid4().hex,
|
||||
'access_rules': [{
|
||||
'id': access_rule_id,
|
||||
'service': uuid.uuid4().hex,
|
||||
'path': uuid.uuid4().hex,
|
||||
'method': uuid.uuid4().hex[16:]
|
||||
}]
|
||||
}
|
||||
PROVIDERS.application_credential_api.create_application_credential(app_cred)
|
||||
with self.test_client() as c:
|
||||
path = '/v3/users/%s/access_rules/%s' % (self.user_id, app_cred['access_rules'][0]['id'])
|
||||
c.get(path, headers=self.headers)
|
||||
|
||||
def test_user_can_list_their_access_rules(self):
|
||||
app_cred = {
|
||||
'id': uuid.uuid4().hex,
|
||||
'name': uuid.uuid4().hex,
|
||||
'user_id': self.user_id,
|
||||
'project_id': self.project_id,
|
||||
'secret': uuid.uuid4().hex,
|
||||
'access_rules': [{
|
||||
'id': uuid.uuid4().hex,
|
||||
'service': uuid.uuid4().hex,
|
||||
'path': uuid.uuid4().hex,
|
||||
'method': uuid.uuid4().hex[16:]
|
||||
}]
|
||||
}
|
||||
PROVIDERS.application_credential_api.create_application_credential(app_cred)
|
||||
with self.test_client() as c:
|
||||
r = c.get('/v3/users/%s/access_rules' % self.user_id, headers=self.headers)
|
||||
self.assertEqual(len(r.json['access_rules']), 1)
|
||||
|
||||
def test_user_can_delete_their_access_rules(self):
|
||||
access_rule_id = uuid.uuid4().hex
|
||||
app_cred = {
|
||||
'id': uuid.uuid4().hex,
|
||||
'name': uuid.uuid4().hex,
|
||||
'user_id': self.user_id,
|
||||
'project_id': self.project_id,
|
||||
'secret': uuid.uuid4().hex,
|
||||
'access_rules': [{
|
||||
'id': access_rule_id,
|
||||
'service': uuid.uuid4().hex,
|
||||
'path': uuid.uuid4().hex,
|
||||
'method': uuid.uuid4().hex[16:]
|
||||
}]
|
||||
}
|
||||
PROVIDERS.application_credential_api.create_application_credential(app_cred)
|
||||
PROVIDERS.application_credential_api.delete_application_credential(app_cred['id'])
|
||||
with self.test_client() as c:
|
||||
path = '/v3/users/%s/access_rules/%s' % (self.user_id, access_rule_id)
|
||||
c.delete(path, headers=self.headers)
|
||||
|
||||
|
||||
class _ProjectUsersTests(object):
|
||||
"""Users who have project role authorization observe the same behavior."""
|
||||
|
||||
def test_user_cannot_get_access_rules_for_other_users(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
project = PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
PROVIDERS.assignment_api.create_grant(
|
||||
self.bootstrapper.member_role_id, user_id=user['id'],
|
||||
project_id=project['id']
|
||||
)
|
||||
|
||||
access_rule_id = uuid.uuid4().hex
|
||||
app_cred = {
|
||||
'id': uuid.uuid4().hex,
|
||||
'name': uuid.uuid4().hex,
|
||||
'user_id': user['id'],
|
||||
'project_id': project['id'],
|
||||
'secret': uuid.uuid4().hex,
|
||||
'access_rules': [{
|
||||
'id': access_rule_id,
|
||||
'service': uuid.uuid4().hex,
|
||||
'path': uuid.uuid4().hex,
|
||||
'method': uuid.uuid4().hex[16:]
|
||||
}]
|
||||
}
|
||||
PROVIDERS.application_credential_api.create_application_credential(app_cred)
|
||||
with self.test_client() as c:
|
||||
path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id)
|
||||
c.get(
|
||||
path, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_get_own_non_existent_access_rule_not_found(self):
|
||||
with self.test_client() as c:
|
||||
c.get(
|
||||
'/v3/users/%s/access_rules/%s' % (
|
||||
self.user_id, uuid.uuid4().hex),
|
||||
headers=self.headers,
|
||||
expected_status_code=http_client.NOT_FOUND
|
||||
)
|
||||
|
||||
def test_user_cannot_get_non_existent_access_rule_other_user_forbidden(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
with self.test_client() as c:
|
||||
c.get(
|
||||
'/v3/users/%s/access_rules/%s' % (
|
||||
user['id'], uuid.uuid4().hex),
|
||||
headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_list_access_rules_for_other_users(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
project = PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
PROVIDERS.assignment_api.create_grant(
|
||||
self.bootstrapper.member_role_id, user_id=user['id'],
|
||||
project_id=project['id']
|
||||
)
|
||||
app_cred = {
|
||||
'id': uuid.uuid4().hex,
|
||||
'name': uuid.uuid4().hex,
|
||||
'user_id': user['id'],
|
||||
'project_id': project['id'],
|
||||
'secret': uuid.uuid4().hex,
|
||||
'access_rules': [{
|
||||
'id': uuid.uuid4().hex,
|
||||
'service': uuid.uuid4().hex,
|
||||
'path': uuid.uuid4().hex,
|
||||
'method': uuid.uuid4().hex[16:]
|
||||
}]
|
||||
}
|
||||
PROVIDERS.application_credential_api.create_application_credential(app_cred)
|
||||
|
||||
with self.test_client() as c:
|
||||
path = '/v3/users/%s/access_rules' % user['id']
|
||||
c.get(path, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN)
|
||||
|
||||
def test_user_cannot_delete_access_rules_for_others(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
project = PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
PROVIDERS.assignment_api.create_grant(
|
||||
self.bootstrapper.member_role_id, user_id=user['id'],
|
||||
project_id=project['id']
|
||||
)
|
||||
access_rule_id = uuid.uuid4().hex
|
||||
app_cred = {
|
||||
'id': uuid.uuid4().hex,
|
||||
'name': uuid.uuid4().hex,
|
||||
'user_id': user['id'],
|
||||
'project_id': project['id'],
|
||||
'secret': uuid.uuid4().hex,
|
||||
'access_rules': [{
|
||||
'id': access_rule_id,
|
||||
'service': uuid.uuid4().hex,
|
||||
'path': uuid.uuid4().hex,
|
||||
'method': uuid.uuid4().hex[16:]
|
||||
}]
|
||||
}
|
||||
PROVIDERS.application_credential_api.create_application_credential(app_cred)
|
||||
PROVIDERS.application_credential_api.delete_application_credential(app_cred['id'])
|
||||
with self.test_client() as c:
|
||||
path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id)
|
||||
c.delete(
|
||||
path, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_delete_non_existent_access_rule_other_user_forbidden(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
with self.test_client() as c:
|
||||
c.delete(
|
||||
'/v3/users/%s/access_rules/%s' % (
|
||||
user['id'], uuid.uuid4().hex),
|
||||
headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
|
||||
class _SystemUserAccessRuleTests(object):
|
||||
"""Tests that are common across all system users."""
|
||||
|
||||
def test_user_can_list_access_rules_for_other_users(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
project = PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
PROVIDERS.assignment_api.create_grant(
|
||||
self.bootstrapper.member_role_id, user_id=user['id'],
|
||||
project_id=project['id']
|
||||
)
|
||||
|
||||
app_cred = {
|
||||
'id': uuid.uuid4().hex,
|
||||
'name': uuid.uuid4().hex,
|
||||
'user_id': user['id'],
|
||||
'project_id': project['id'],
|
||||
'secret': uuid.uuid4().hex,
|
||||
'access_rules': [{
|
||||
'id': uuid.uuid4().hex,
|
||||
'service': uuid.uuid4().hex,
|
||||
'path': uuid.uuid4().hex,
|
||||
'method': uuid.uuid4().hex[16:]
|
||||
}]
|
||||
}
|
||||
PROVIDERS.application_credential_api.create_application_credential(app_cred)
|
||||
|
||||
with self.test_client() as c:
|
||||
r = c.get('/v3/users/%s/access_rules' % user['id'],
|
||||
headers=self.headers)
|
||||
self.assertEqual(1, len(r.json['access_rules']))
|
||||
|
||||
def test_user_cannot_get_non_existent_access_rule_not_found(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
with self.test_client() as c:
|
||||
c.get(
|
||||
'/v3/users/%s/access_rules/%s' % (
|
||||
user['id'], uuid.uuid4().hex),
|
||||
headers=self.headers,
|
||||
expected_status_code=http_client.NOT_FOUND
|
||||
)
|
||||
|
||||
|
||||
class SystemReaderTests(base_classes.TestCaseWithBootstrap,
|
||||
common_auth.AuthTestMixin,
|
||||
_SystemUserAccessRuleTests):
|
||||
|
||||
def setUp(self):
|
||||
super(SystemReaderTests, self).setUp()
|
||||
self.loadapp()
|
||||
self.useFixture(ksfixtures.Policy(self.config_fixture))
|
||||
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
|
||||
|
||||
system_reader = unit.new_user_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
self.user_id = PROVIDERS.identity_api.create_user(
|
||||
system_reader
|
||||
)['id']
|
||||
PROVIDERS.assignment_api.create_system_grant_for_user(
|
||||
self.user_id, self.bootstrapper.reader_role_id
|
||||
)
|
||||
|
||||
auth = self.build_authentication_request(
|
||||
user_id=self.user_id, password=system_reader['password'],
|
||||
system=True
|
||||
)
|
||||
|
||||
# Grab a token using the persona we're testing and prepare headers
|
||||
# for requests we'll be making in the tests.
|
||||
with self.test_client() as c:
|
||||
r = c.post('/v3/auth/tokens', json=auth)
|
||||
self.token_id = r.headers['X-Subject-Token']
|
||||
self.headers = {'X-Auth-Token': self.token_id}
|
||||
|
||||
def test_user_cannot_delete_access_rules_for_others(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
project = PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
PROVIDERS.assignment_api.create_grant(
|
||||
self.bootstrapper.member_role_id, user_id=user['id'],
|
||||
project_id=project['id']
|
||||
)
|
||||
|
||||
access_rule_id = uuid.uuid4().hex
|
||||
app_cred = {
|
||||
'id': uuid.uuid4().hex,
|
||||
'name': uuid.uuid4().hex,
|
||||
'user_id': user['id'],
|
||||
'project_id': project['id'],
|
||||
'secret': uuid.uuid4().hex,
|
||||
'access_rules': [{
|
||||
'id': access_rule_id,
|
||||
'service': uuid.uuid4().hex,
|
||||
'path': uuid.uuid4().hex,
|
||||
'method': uuid.uuid4().hex[16:]
|
||||
}]
|
||||
}
|
||||
PROVIDERS.application_credential_api.create_application_credential(app_cred)
|
||||
PROVIDERS.application_credential_api.delete_application_credential(app_cred['id'])
|
||||
with self.test_client() as c:
|
||||
path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id)
|
||||
c.delete(
|
||||
path, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_delete_non_existent_access_rule_forbidden(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
with self.test_client() as c:
|
||||
c.delete(
|
||||
'/v3/users/%s/access_rules/%s' % (
|
||||
user['id'], uuid.uuid4().hex),
|
||||
headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
|
||||
class SystemMemberTests(base_classes.TestCaseWithBootstrap,
|
||||
common_auth.AuthTestMixin,
|
||||
_SystemUserAccessRuleTests):
|
||||
|
||||
def setUp(self):
|
||||
super(SystemMemberTests, self).setUp()
|
||||
self.loadapp()
|
||||
self.useFixture(ksfixtures.Policy(self.config_fixture))
|
||||
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
|
||||
|
||||
system_member = unit.new_user_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
self.user_id = PROVIDERS.identity_api.create_user(
|
||||
system_member
|
||||
)['id']
|
||||
PROVIDERS.assignment_api.create_system_grant_for_user(
|
||||
self.user_id, self.bootstrapper.member_role_id
|
||||
)
|
||||
|
||||
auth = self.build_authentication_request(
|
||||
user_id=self.user_id, password=system_member['password'],
|
||||
system=True
|
||||
)
|
||||
|
||||
# Grab a token using the persona we're testing and prepare headers
|
||||
# for requests we'll be making in the tests.
|
||||
with self.test_client() as c:
|
||||
r = c.post('/v3/auth/tokens', json=auth)
|
||||
self.token_id = r.headers['X-Subject-Token']
|
||||
self.headers = {'X-Auth-Token': self.token_id}
|
||||
|
||||
def test_user_cannot_delete_access_rules_for_others(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
project = PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
PROVIDERS.assignment_api.create_grant(
|
||||
self.bootstrapper.member_role_id, user_id=user['id'],
|
||||
project_id=project['id']
|
||||
)
|
||||
|
||||
access_rule_id = uuid.uuid4().hex
|
||||
app_cred = {
|
||||
'id': uuid.uuid4().hex,
|
||||
'name': uuid.uuid4().hex,
|
||||
'user_id': user['id'],
|
||||
'project_id': project['id'],
|
||||
'secret': uuid.uuid4().hex,
|
||||
'access_rules': [{
|
||||
'id': access_rule_id,
|
||||
'service': uuid.uuid4().hex,
|
||||
'path': uuid.uuid4().hex,
|
||||
'method': uuid.uuid4().hex[16:]
|
||||
}]
|
||||
}
|
||||
PROVIDERS.application_credential_api.create_application_credential(app_cred)
|
||||
PROVIDERS.application_credential_api.delete_application_credential(app_cred['id'])
|
||||
with self.test_client() as c:
|
||||
path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id)
|
||||
c.delete(
|
||||
path, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
with self.test_client() as c:
|
||||
path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id)
|
||||
c.delete(
|
||||
path, headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
def test_user_cannot_delete_non_existent_access_rule_forbidden(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
with self.test_client() as c:
|
||||
c.delete(
|
||||
'/v3/users/%s/access_rules/%s' % (
|
||||
user['id'], uuid.uuid4().hex),
|
||||
headers=self.headers,
|
||||
expected_status_code=http_client.FORBIDDEN
|
||||
)
|
||||
|
||||
|
||||
class SystemAdminTests(base_classes.TestCaseWithBootstrap,
|
||||
common_auth.AuthTestMixin,
|
||||
_SystemUserAccessRuleTests):
|
||||
|
||||
def setUp(self):
|
||||
super(SystemAdminTests, self).setUp()
|
||||
self.loadapp()
|
||||
self.useFixture(ksfixtures.Policy(self.config_fixture))
|
||||
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
|
||||
|
||||
# Reuse the system administrator account created during
|
||||
# ``keystone-manage bootstrap``
|
||||
self.user_id = self.bootstrapper.admin_user_id
|
||||
auth = self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.bootstrapper.admin_password,
|
||||
system=True
|
||||
)
|
||||
|
||||
# Grab a token using the persona we're testing and prepare headers
|
||||
# for requests we'll be making in the tests.
|
||||
with self.test_client() as c:
|
||||
r = c.post('/v3/auth/tokens', json=auth)
|
||||
self.token_id = r.headers['X-Subject-Token']
|
||||
self.headers = {'X-Auth-Token': self.token_id}
|
||||
|
||||
def test_user_can_delete_access_rules_for_others(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
project = PROVIDERS.resource_api.create_project(project['id'], project)
|
||||
PROVIDERS.assignment_api.create_grant(
|
||||
self.bootstrapper.member_role_id, user_id=user['id'],
|
||||
project_id=project['id']
|
||||
)
|
||||
access_rule_id = uuid.uuid4().hex
|
||||
app_cred = {
|
||||
'id': uuid.uuid4().hex,
|
||||
'name': uuid.uuid4().hex,
|
||||
'user_id': user['id'],
|
||||
'project_id': project['id'],
|
||||
'secret': uuid.uuid4().hex,
|
||||
'access_rules': [{
|
||||
'id': access_rule_id,
|
||||
'service': uuid.uuid4().hex,
|
||||
'path': uuid.uuid4().hex,
|
||||
'method': uuid.uuid4().hex[16:]
|
||||
}]
|
||||
}
|
||||
PROVIDERS.application_credential_api.create_application_credential(app_cred)
|
||||
PROVIDERS.application_credential_api.delete_application_credential(app_cred['id'])
|
||||
|
||||
with self.test_client() as c:
|
||||
path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id)
|
||||
c.delete(path, headers=self.headers)
|
||||
|
||||
def test_user_cannot_delete_non_existent_access_rule_not_found(self):
|
||||
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
|
||||
user = PROVIDERS.identity_api.create_user(user)
|
||||
with self.test_client() as c:
|
||||
c.delete(
|
||||
'/v3/users/%s/access_rules/%s' % (
|
||||
user['id'], uuid.uuid4().hex),
|
||||
headers=self.headers,
|
||||
expected_status_code=http_client.NOT_FOUND
|
||||
)
|
||||
|
||||
|
||||
class ProjectReaderTests(base_classes.TestCaseWithBootstrap,
|
||||
common_auth.AuthTestMixin,
|
||||
_UserAccessRuleTests,
|
||||
_ProjectUsersTests):
|
||||
|
||||
def setUp(self):
|
||||
super(ProjectReaderTests, self).setUp()
|
||||
self.loadapp()
|
||||
self.useFixture(ksfixtures.Policy(self.config_fixture))
|
||||
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
|
||||
|
||||
project_reader = unit.new_user_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
self.user_id = PROVIDERS.identity_api.create_user(
|
||||
project_reader
|
||||
)['id']
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
self.project_id = PROVIDERS.resource_api.create_project(
|
||||
project['id'], project
|
||||
)['id']
|
||||
PROVIDERS.assignment_api.create_grant(
|
||||
self.bootstrapper.reader_role_id, user_id=self.user_id,
|
||||
project_id=self.project_id
|
||||
)
|
||||
|
||||
auth = self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=project_reader['password'],
|
||||
project_id=self.project_id
|
||||
)
|
||||
|
||||
# Grab a token using the persona we're testing and prepare headers
|
||||
# for requests we'll be making in the tests.
|
||||
with self.test_client() as c:
|
||||
r = c.post('/v3/auth/tokens', json=auth)
|
||||
self.token_id = r.headers['X-Subject-Token']
|
||||
self.headers = {'X-Auth-Token': self.token_id}
|
||||
|
||||
|
||||
class ProjectMemberTests(base_classes.TestCaseWithBootstrap,
|
||||
common_auth.AuthTestMixin,
|
||||
_UserAccessRuleTests,
|
||||
_ProjectUsersTests):
|
||||
|
||||
def setUp(self):
|
||||
super(ProjectMemberTests, self).setUp()
|
||||
self.loadapp()
|
||||
self.useFixture(ksfixtures.Policy(self.config_fixture))
|
||||
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
|
||||
|
||||
project_member = unit.new_user_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
self.user_id = PROVIDERS.identity_api.create_user(
|
||||
project_member
|
||||
)['id']
|
||||
project = unit.new_project_ref(
|
||||
domain_id=CONF.identity.default_domain_id
|
||||
)
|
||||
self.project_id = PROVIDERS.resource_api.create_project(
|
||||
project['id'], project
|
||||
)['id']
|
||||
PROVIDERS.assignment_api.create_grant(
|
||||
self.bootstrapper.member_role_id, user_id=self.user_id,
|
||||
project_id=self.project_id
|
||||
)
|
||||
|
||||
auth = self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=project_member['password'],
|
||||
project_id=self.project_id
|
||||
)
|
||||
|
||||
# Grab a token using the persona we're testing and prepare headers
|
||||
# for requests we'll be making in the tests.
|
||||
with self.test_client() as c:
|
||||
r = c.post('/v3/auth/tokens', json=auth)
|
||||
self.token_id = r.headers['X-Subject-Token']
|
||||
self.headers = {'X-Auth-Token': self.token_id}
|
||||
|
||||
|
||||
class ProjectAdminTests(base_classes.TestCaseWithBootstrap,
|
||||
common_auth.AuthTestMixin,
|
||||
_UserAccessRuleTests,
|
||||
_ProjectUsersTests):
|
||||
|
||||
def setUp(self):
|
||||
super(ProjectAdminTests, self).setUp()
|
||||
self.loadapp()
|
||||
|
||||
self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
|
||||
self.policy_file_name = self.policy_file.file_name
|
||||
self.useFixture(
|
||||
ksfixtures.Policy(
|
||||
self.config_fixture, policy_file=self.policy_file_name
|
||||
)
|
||||
)
|
||||
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
|
||||
|
||||
# Reuse the system administrator account created during
|
||||
# ``keystone-manage bootstrap``
|
||||
self.user_id = self.bootstrapper.admin_user_id
|
||||
self.project_id = self.bootstrapper.project_id
|
||||
auth = self.build_authentication_request(
|
||||
user_id=self.user_id,
|
||||
password=self.bootstrapper.admin_password,
|
||||
project_id=self.project_id
|
||||
)
|
||||
|
||||
# Grab a token using the persona we're testing and prepare headers
|
||||
# for requests we'll be making in the tests.
|
||||
with self.test_client() as c:
|
||||
r = c.post('/v3/auth/tokens', json=auth)
|
||||
self.token_id = r.headers['X-Subject-Token']
|
||||
self.headers = {'X-Auth-Token': self.token_id}
|
||||
@@ -220,6 +220,7 @@ class PolicyJsonTestCase(unit.TestCase):
|
||||
'identity:create_system_grant_for_user',
|
||||
'identity:create_trust',
|
||||
'identity:create_user',
|
||||
'identity:delete_access_rule',
|
||||
'identity:delete_access_token',
|
||||
'identity:delete_application_credential',
|
||||
'identity:delete_consumer',
|
||||
@@ -249,6 +250,7 @@ class PolicyJsonTestCase(unit.TestCase):
|
||||
'identity:delete_user',
|
||||
'identity:ec2_delete_credential',
|
||||
'identity:ec2_get_credential',
|
||||
'identity:get_access_rule',
|
||||
'identity:get_access_token',
|
||||
'identity:get_access_token_role',
|
||||
'identity:get_application_credential',
|
||||
@@ -281,6 +283,7 @@ class PolicyJsonTestCase(unit.TestCase):
|
||||
'identity:get_service_provider',
|
||||
'identity:get_trust',
|
||||
'identity:get_user',
|
||||
'identity:list_access_rules',
|
||||
'identity:list_access_token_roles',
|
||||
'identity:list_access_tokens',
|
||||
'identity:list_application_credentials',
|
||||
|
||||
@@ -207,10 +207,33 @@ class ApplicationCredentialTestCase(test_v3.RestfulTestCase):
|
||||
headers={'X-Auth-Token': token},
|
||||
json=app_cred_body,
|
||||
expected_status_code=http_client.CREATED)
|
||||
resp_access_rules = resp.json['application_credential']['access_rules']
|
||||
self.assertIn('id', resp_access_rules[0])
|
||||
resp_access_rules[0].pop('id')
|
||||
self.assertEqual(access_rules[0], resp_access_rules[0])
|
||||
app_cred_id = resp.json['application_credential']['id']
|
||||
resp_access_rules = resp.json['application_credential']['access_rules']
|
||||
access_rule_id = resp_access_rules[0].pop('id')
|
||||
self.assertEqual(access_rules[0], resp_access_rules[0])
|
||||
resp = c.get('/v3/users/%s/access_rules' % self.user_id,
|
||||
headers={'X-Auth-Token': token})
|
||||
resp_access_rule = resp.json['access_rules'][0]
|
||||
resp_access_rule.pop('id')
|
||||
resp_access_rule.pop('links')
|
||||
self.assertEqual(access_rules[0], resp_access_rule)
|
||||
resp = c.get('/v3/users/%s/access_rules/%s' % (
|
||||
self.user_id, access_rule_id), headers={'X-Auth-Token': token})
|
||||
resp_access_rule = resp.json['access_rule']
|
||||
resp_access_rule.pop('id')
|
||||
resp_access_rule.pop('links')
|
||||
self.assertEqual(access_rules[0], resp_access_rule)
|
||||
# can't delete an access rule in use
|
||||
c.delete('/v3/users/%s/access_rules/%s' % (
|
||||
self.user_id, access_rule_id),
|
||||
headers={'X-Auth-Token': token},
|
||||
expected_status_code=http_client.FORBIDDEN)
|
||||
c.delete('/v3/users/%s/application_credentials/%s' % (
|
||||
self.user_id, app_cred_id),
|
||||
headers={'X-Auth-Token': token})
|
||||
c.delete('/v3/users/%s/access_rules/%s' % (
|
||||
self.user_id, access_rule_id),
|
||||
headers={'X-Auth-Token': token})
|
||||
|
||||
def test_create_application_credential_with_duplicate_access_rule(self):
|
||||
roles = [{'id': self.role_id}]
|
||||
|
||||
@@ -145,6 +145,10 @@ APPLICATION_CREDENTIALS = '/users/{user_id}/application_credentials'
|
||||
APPLICATION_CREDENTIAL_RELATION = (
|
||||
json_home.build_v3_parameter_relation('application_credential_id'))
|
||||
|
||||
ACCESS_RULE = '/users/{user_id}/access_rules/{access_rule_id}'
|
||||
ACCESS_RULES = '/users/{user_id}/access_rules'
|
||||
ACCESS_RULE_RELATION = json_home.build_v3_parameter_relation('access_rule_id')
|
||||
|
||||
V3_JSON_HOME_RESOURCES = {
|
||||
json_home.build_v3_resource_relation('auth_tokens'): {
|
||||
'href': '/auth/tokens'},
|
||||
@@ -641,7 +645,16 @@ V3_JSON_HOME_RESOURCES = {
|
||||
'href-template': APPLICATION_CREDENTIAL,
|
||||
'href-vars': {
|
||||
'application_credential_id': APPLICATION_CREDENTIAL_RELATION,
|
||||
'user_id': json_home.build_v3_parameter_relation('user_id')}}
|
||||
'user_id': json_home.build_v3_parameter_relation('user_id')}},
|
||||
json_home.build_v3_resource_relation('access_rules'): {
|
||||
'href-template': ACCESS_RULES,
|
||||
'href-vars': {
|
||||
'user_id': json_home.build_v3_parameter_relation('user_id')}},
|
||||
json_home.build_v3_resource_relation('access_rule'): {
|
||||
'href-template': ACCESS_RULE,
|
||||
'href-vars': {
|
||||
'access_rule_id': ACCESS_RULE_RELATION,
|
||||
'user_id': json_home.build_v3_parameter_relation('user_id')}},
|
||||
}
|
||||
|
||||
|
||||
|
||||
Reference in New Issue
Block a user