From 67682dcd0793ee5efb4afb8ab5737eb468ef8577 Mon Sep 17 00:00:00 2001 From: Colleen Murphy Date: Fri, 28 Jun 2019 12:43:08 -0700 Subject: [PATCH] Expose access rules as its own API This change creates a /v3/users/{user_id}/access_rules endpoint to allow users to view and delete their own access rules. Access rules are not automatically deleted when an application credential is deleted, so they can be re-used for other application credentials or explicitly deleted by the user. Access rules are automatically deleted when the user is deleted, the same way that application credentials are. Access rules that are in use by an application credential may not be deleted. bp whitelist-extension-for-app-creds Change-Id: I37d243d802cd538189ccfffee6ebf0624b7785d3 --- doc/source/getting-started/policy_mapping.rst | 5 + keystone/api/users.py | 69 ++ .../application_credential/backends/base.py | 37 ++ .../application_credential/backends/sql.py | 49 +- keystone/application_credential/core.py | 51 ++ keystone/common/json_home.py | 2 + keystone/common/policies/__init__.py | 4 +- keystone/common/policies/access_rule.py | 62 ++ keystone/exception.py | 4 + .../application_credential/test_backends.py | 40 ++ .../unit/protection/v3/test_access_rules.py | 626 ++++++++++++++++++ keystone/tests/unit/test_policy.py | 3 + .../unit/test_v3_application_credential.py | 31 +- keystone/tests/unit/test_versions.py | 15 +- 14 files changed, 987 insertions(+), 11 deletions(-) create mode 100644 keystone/common/policies/access_rule.py create mode 100644 keystone/tests/unit/protection/v3/test_access_rules.py diff --git a/doc/source/getting-started/policy_mapping.rst b/doc/source/getting-started/policy_mapping.rst index 771a7a746c..2975b45b3f 100644 --- a/doc/source/getting-started/policy_mapping.rst +++ b/doc/source/getting-started/policy_mapping.rst @@ -241,6 +241,11 @@ identity:get_application_credential GET /v3/users/{user_i identity:list_application_credentials GET /v3/users/{user_id}/application_credentials identity:create_application_credential POST /v3/users/{user_id}/application_credential identity:delete_application_credential DELETE /v3/users/{user_id}/application_credential/{application_credential_id} + +identity:get_access_rule GET /v3/users/{user_id}/access_rules/{access_rule_id} +identity:list_access_rules GET /v3/users/{user_id}/access_rules +identity:delete_access_rule DELETE /v3/users/{user_id}/access_rules/{access_rule_id} + ========================================================= === .. _grant_resources: diff --git a/keystone/api/users.py b/keystone/api/users.py index 3ddcc3291b..7df6410b81 100644 --- a/keystone/api/users.py +++ b/keystone/api/users.py @@ -663,6 +663,57 @@ class UserAppCredGetDeleteResource(ks_flask.ResourceBase): return None, http_client.NO_CONTENT +class UserAccessRuleListResource(ks_flask.ResourceBase): + collection_key = 'access_rules' + member_key = 'access_rule' + + def get(self, user_id): + """List access rules for user. + + GET/HEAD /v3/users/{user_id}/access_rules + """ + filters = ('service', 'path', 'method',) + ENFORCER.enforce_call(action='identity:list_access_rules', + filters=filters, + build_target=_build_user_target_enforcement) + app_cred_api = PROVIDERS.application_credential_api + hints = self.build_driver_hints(filters) + refs = app_cred_api.list_access_rules_for_user(user_id, hints=hints) + hints = self.build_driver_hints(filters) + return self.wrap_collection(refs, hints=hints) + + +class UserAccessRuleGetDeleteResource(ks_flask.ResourceBase): + collection_key = 'access_rules' + member_key = 'access_rule' + + def get(self, user_id, access_rule_id): + """Get access rule resource. + + GET/HEAD /v3/users/{user_id}/access_rules/{access_rule_id} + """ + ENFORCER.enforce_call( + action='identity:get_access_rule', + build_target=_build_user_target_enforcement + ) + ref = PROVIDERS.application_credential_api.get_access_rule( + access_rule_id) + return self.wrap_member(ref) + + def delete(self, user_id, access_rule_id): + """Delete access rule resource. + + DELETE /v3/users/{user_id}/access_rules/{access_rule_id} + """ + ENFORCER.enforce_call( + action='identity:delete_access_rule', + build_target=_build_user_target_enforcement + ) + PROVIDERS.application_credential_api.delete_access_rule( + access_rule_id, initiator=self.audit_initiator) + return None, http_client.NO_CONTENT + + class UserAPI(ks_flask.APIBase): _name = 'users' _import_name = __name__ @@ -772,6 +823,24 @@ class UserAPI(ks_flask.APIBase): 'user_id': json_home.Parameters.USER_ID, 'application_credential_id': json_home.Parameters.APPLICATION_CRED_ID} + ), + ks_flask.construct_resource_map( + resource=UserAccessRuleListResource, + url='/users//access_rules', + resource_kwargs={}, + rel='access_rules', + path_vars={'user_id': json_home.Parameters.USER_ID} + ), + ks_flask.construct_resource_map( + resource=UserAccessRuleGetDeleteResource, + url=('/users//access_rules/' + ''), + resource_kwargs={}, + rel='access_rule', + path_vars={ + 'user_id': json_home.Parameters.USER_ID, + 'access_rule_id': + json_home.Parameters.ACCESS_RULE_ID} ) ] diff --git a/keystone/application_credential/backends/base.py b/keystone/application_credential/backends/base.py index 745509d2fb..0a28c5062e 100644 --- a/keystone/application_credential/backends/base.py +++ b/keystone/application_credential/backends/base.py @@ -95,3 +95,40 @@ class ApplicationCredentialDriverBase(object): """ raise exception.NotImplemented() # pragma: no cover + + @abc.abstractmethod + def get_access_rule(self, access_rule_id): + """Get an access rule by its ID. + + :param str access_rule_id: Access Rule ID + """ + raise exception.NotImplemented() # pragma: no cover + + @abc.abstractmethod + def list_access_rules_for_user(self, user_id): + """List the access rules that a user has created. + + Access rules are only created as attributes of application credentials, + they cannot be created independently. + + :param str user_id: User ID + """ + raise exception.NotImplemented() # pragma: no cover + + @abc.abstractmethod + def delete_access_rule(self, access_rule_id): + """Delete one access rule. + + :param str access_rule_id: Access Rule ID + """ + raise exception.NotImplemented() # pragma: no cover + + @abc.abstractmethod + def delete_access_rules_for_user(self, user_id): + """Delete all access rules for user. + + This is called when the user itself is deleted. + + :param str user_id: User ID + """ + raise exception.NotImplemented() # pragma: no cover diff --git a/keystone/application_credential/backends/sql.py b/keystone/application_credential/backends/sql.py index 488ce9e95b..a7b4e6bea0 100644 --- a/keystone/application_credential/backends/sql.py +++ b/keystone/application_credential/backends/sql.py @@ -78,8 +78,7 @@ class AccessRuleModel(sql.ModelBase, sql.ModelDictMixin): ) application_credential = sqlalchemy.orm.relationship( 'ApplicationCredentialAccessRuleModel', - backref=sqlalchemy.orm.backref('access_rule'), - cascade='all, delete-orphan') + backref=sqlalchemy.orm.backref('access_rule')) class ApplicationCredentialAccessRuleModel(sql.ModelBase, sql.ModelDictMixin): @@ -168,15 +167,21 @@ class ApplicationCredential(base.ApplicationCredentialDriverBase): app_cred['roles'] = roles if ref.access_rules: access_rules = [ - {k.replace('external_id', 'id'): v - for k, v in c.access_rule.to_dict().items() - if k != 'user_id' and k != 'id'} + self._access_rule_to_dict(c.access_rule) for c in ref.access_rules ] app_cred['access_rules'] = access_rules app_cred.pop('internal_id') return app_cred + def _access_rule_to_dict(self, ref): + access_rule = ref.to_dict() + return { + k.replace('external_id', 'id'): v + for k, v in access_rule.items() + if k != 'user_id' and k != 'id' + } + def get_application_credential(self, application_credential_id): with sql.session_for_read() as session: query = session.query(ApplicationCredentialModel).filter_by( @@ -220,3 +225,37 @@ class ApplicationCredential(base.ApplicationCredentialDriverBase): query = query.filter_by(user_id=user_id) query = query.filter_by(project_id=project_id) query.delete() + + def get_access_rule(self, access_rule_id): + with sql.session_for_read() as session: + query = session.query(AccessRuleModel).filter_by( + external_id=access_rule_id) + ref = query.first() + if not ref: + raise exception.AccessRuleNotFound( + access_rule_id=access_rule_id) + access_rule = self._access_rule_to_dict(ref) + return access_rule + + def list_access_rules_for_user(self, user_id, hints): + with sql.session_for_read() as session: + query = session.query(AccessRuleModel).filter_by(user_id=user_id) + refs = sql.filter_limit_query(AccessRuleModel, query, hints) + return [self._access_rule_to_dict(ref) for ref in refs] + + def delete_access_rule(self, access_rule_id): + try: + with sql.session_for_write() as session: + query = session.query(AccessRuleModel) + ref = query.filter_by(external_id=access_rule_id).first() + if not ref: + raise exception.AccessRuleNotFound( + access_rule_id=access_rule_id) + session.delete(ref) + except AssertionError: + raise exception.ForbiddenNotSecurity("May not delete access rule in use") + + def delete_access_rules_for_user(self, user_id): + with sql.session_for_write() as session: + query = session.query(AccessRuleModel).filter_by(user_id=user_id) + query.delete() diff --git a/keystone/application_credential/core.py b/keystone/application_credential/core.py index 363c4af6a8..00abb873e0 100644 --- a/keystone/application_credential/core.py +++ b/keystone/application_credential/core.py @@ -40,6 +40,7 @@ class Manager(manager.Manager): _provides_api = 'application_credential_api' _APP_CRED = 'application_credential' + _ACCESS_RULE = 'access_rule' def __init__(self): super(Manager, self).__init__(CONF.application_credential.driver) @@ -61,6 +62,7 @@ class Manager(manager.Manager): self, service, resource_type, operation, payload): user_id = payload['resource_info'] self._delete_application_credentials_for_user(user_id) + self._delete_access_rules_for_user(user_id) def _delete_app_creds_on_assignment_removal( self, service, resource_type, operation, payload): @@ -167,6 +169,26 @@ class Manager(manager.Manager): user_id, hints) return [self._process_app_cred(app_cred) for app_cred in app_cred_list] + @MEMOIZE + def get_access_rule(self, access_rule_id): + """Get access rule details. + + :param str access_rule_id: Access Rule ID + + :returns: an access rule + """ + return self.driver.get_access_rule(access_rule_id) + + def list_access_rules_for_user(self, user_id, hints=None): + """List access rules for user. + + :param str user_id: User ID + + :returns: a list of access rules + """ + hints = hints or driver_hints.Hints() + return self.driver.list_access_rules_for_user(user_id, hints) + def delete_application_credential(self, application_credential_id, initiator=None): """Delete an application credential. @@ -214,3 +236,32 @@ class Manager(manager.Manager): user_id, project_id) for app_cred in app_creds: self.get_application_credential.invalidate(self, app_cred['id']) + + def delete_access_rule(self, access_rule_id, initiator=None): + """Delete an access rule. + + :param str: access_rule_id: Access Rule ID + :param initiator: CADF initiator + + :raises keystone.exception.AccessRuleNotFound: If the access rule + doesn't exist. + """ + self.driver.delete_access_rule(access_rule_id) + self.get_access_rule.invalidate(self, access_rule_id) + notifications.Audit.deleted( + self._ACCESS_RULE, access_rule_id, initiator) + + def _delete_access_rules_for_user(self, user_id, initiator=None): + """Delete all access rules for a user. + + :param str user_id: User ID + + This is triggered when a user is deleted. + """ + access_rules = self.driver.list_access_rules_for_user( + user_id, driver_hints.Hints()) + self.driver.delete_access_rules_for_user(user_id) + for rule in access_rules: + self.get_access_rule.invalidate(self, rule['id']) + notifications.Audit.deleted(self._ACCESS_RULE, rule['id'], + initiator) diff --git a/keystone/common/json_home.py b/keystone/common/json_home.py index 111c516bca..ef4f61f914 100644 --- a/keystone/common/json_home.py +++ b/keystone/common/json_home.py @@ -59,6 +59,8 @@ class Parameters(object): LIMIT_ID = build_v3_parameter_relation('limit_id') APPLICATION_CRED_ID = build_v3_parameter_relation( 'application_credential_id') + ACCESS_RULE_ID = build_v3_parameter_relation( + 'access_rule_id') class Status(object): diff --git a/keystone/common/policies/__init__.py b/keystone/common/policies/__init__.py index e24b5f41b7..02608c185a 100644 --- a/keystone/common/policies/__init__.py +++ b/keystone/common/policies/__init__.py @@ -12,6 +12,7 @@ import itertools +from keystone.common.policies import access_rule from keystone.common.policies import access_token from keystone.common.policies import application_credential from keystone.common.policies import auth @@ -50,8 +51,9 @@ from keystone.common.policies import user def list_rules(): return itertools.chain( base.list_rules(), - application_credential.list_rules(), + access_rule.list_rules(), access_token.list_rules(), + application_credential.list_rules(), auth.list_rules(), consumer.list_rules(), credential.list_rules(), diff --git a/keystone/common/policies/access_rule.py b/keystone/common/policies/access_rule.py new file mode 100644 index 0000000000..bab6c7ddc4 --- /dev/null +++ b/keystone/common/policies/access_rule.py @@ -0,0 +1,62 @@ +# Copyright 2019 SUSE LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_policy import policy + +from keystone.common.policies import base + +collection_path = '/v3/users/{user_id}/access_rules' +resource_path = collection_path + '/{access_rule_id}' + +SYSTEM_READER_OR_OWNER = ( + '(' + base.SYSTEM_READER + ') or ' + 'user_id:%(target.user.id)s' +) + +SYSTEM_ADMIN_OR_OWNER = ( + '(' + base.SYSTEM_ADMIN + ') or ' + 'user_id:%(target.user.id)s' +) + +access_rule_policies = [ + policy.DocumentedRuleDefault( + name=base.IDENTITY % 'get_access_rule', + check_str=SYSTEM_READER_OR_OWNER, + scope_types=['system', 'project'], + description='Show access rule details.', + operations=[{'path': resource_path, + 'method': 'GET'}, + {'path': resource_path, + 'method': 'HEAD'}]), + policy.DocumentedRuleDefault( + name=base.IDENTITY % 'list_access_rules', + check_str=SYSTEM_READER_OR_OWNER, + scope_types=['system', 'project'], + description='List access rules for a user.', + operations=[{'path': collection_path, + 'method': 'GET'}, + {'path': collection_path, + 'method': 'HEAD'}]), + policy.DocumentedRuleDefault( + name=base.IDENTITY % 'delete_access_rule', + check_str=SYSTEM_ADMIN_OR_OWNER, + scope_types=['system', 'project'], + description='Delete an access_rule.', + operations=[{'path': resource_path, + 'method': 'DELETE'}]) +] + + +def list_rules(): + return access_rule_policies diff --git a/keystone/exception.py b/keystone/exception.py index a4d3f80ed6..4fe00d90ba 100644 --- a/keystone/exception.py +++ b/keystone/exception.py @@ -549,6 +549,10 @@ class ApplicationCredentialNotFound(NotFound): "%(application_credential_id)s.") +class AccessRuleNotFound(NotFound): + message_format = _("Could not find Access Rule: %(access_rule_id)s.") + + class Conflict(Error): message_format = _("Conflict occurred attempting to store %(type)s -" " %(details)s.") diff --git a/keystone/tests/unit/application_credential/test_backends.py b/keystone/tests/unit/application_credential/test_backends.py index b590f9a9c0..8baef27fdc 100644 --- a/keystone/tests/unit/application_credential/test_backends.py +++ b/keystone/tests/unit/application_credential/test_backends.py @@ -322,3 +322,43 @@ class ApplicationCredentialTests(object): self.app_cred_api.authenticate, resp['id'], badpass) + + def test_get_delete_access_rules(self): + app_cred = self._new_app_cred_data(self.user_foo['id'], + project_id=self.project_bar['id']) + access_rule_id = uuid.uuid4().hex + app_cred['access_rules'] = [{ + 'id': access_rule_id, + 'service': uuid.uuid4().hex, + 'path': uuid.uuid4().hex, + 'method': uuid.uuid4().hex[16:] + }] + self.app_cred_api.create_application_credential(app_cred) + self.assertDictEqual(app_cred['access_rules'][0], + self.app_cred_api.get_access_rule(access_rule_id)) + self.app_cred_api.delete_application_credential(app_cred['id']) + self.app_cred_api.delete_access_rule(access_rule_id) + self.assertRaises(exception.AccessRuleNotFound, + self.app_cred_api.get_access_rule, + access_rule_id) + + def test_list_delete_access_rule_for_user(self): + app_cred = self._new_app_cred_data(self.user_foo['id'], + project_id=self.project_bar['id']) + access_rule_id = uuid.uuid4().hex + app_cred['access_rules'] = [{ + 'id': access_rule_id, + 'service': uuid.uuid4().hex, + 'path': uuid.uuid4().hex, + 'method': uuid.uuid4().hex[16:] + }] + self.app_cred_api.create_application_credential(app_cred) + self.assertEqual(1, len(self.app_cred_api.list_access_rules_for_user( + self.user_foo['id']))) + self.app_cred_api.delete_application_credential(app_cred['id']) + # access rule should still exist + self.assertEqual(1, len(self.app_cred_api.list_access_rules_for_user( + self.user_foo['id']))) + self.app_cred_api.delete_access_rules_for_user(self.user_foo['id']) + self.assertEqual(0, len(self.app_cred_api.list_access_rules_for_user( + self.user_foo['id']))) diff --git a/keystone/tests/unit/protection/v3/test_access_rules.py b/keystone/tests/unit/protection/v3/test_access_rules.py new file mode 100644 index 0000000000..be0706a130 --- /dev/null +++ b/keystone/tests/unit/protection/v3/test_access_rules.py @@ -0,0 +1,626 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import uuid + +from six.moves import http_client + +from keystone.common import provider_api +import keystone.conf +from keystone.tests.common import auth as common_auth +from keystone.tests import unit +from keystone.tests.unit import base_classes +from keystone.tests.unit import ksfixtures +from keystone.tests.unit.ksfixtures import temporaryfile + +CONF = keystone.conf.CONF +PROVIDERS = provider_api.ProviderAPIs + + +class _UserAccessRuleTests(object): + """Test cases for anyone that has a valid user token.""" + + def test_user_can_get_their_access_rules(self): + access_rule_id = uuid.uuid4().hex + app_cred = { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'user_id': self.user_id, + 'project_id': self.project_id, + 'secret': uuid.uuid4().hex, + 'access_rules': [{ + 'id': access_rule_id, + 'service': uuid.uuid4().hex, + 'path': uuid.uuid4().hex, + 'method': uuid.uuid4().hex[16:] + }] + } + PROVIDERS.application_credential_api.create_application_credential(app_cred) + with self.test_client() as c: + path = '/v3/users/%s/access_rules/%s' % (self.user_id, app_cred['access_rules'][0]['id']) + c.get(path, headers=self.headers) + + def test_user_can_list_their_access_rules(self): + app_cred = { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'user_id': self.user_id, + 'project_id': self.project_id, + 'secret': uuid.uuid4().hex, + 'access_rules': [{ + 'id': uuid.uuid4().hex, + 'service': uuid.uuid4().hex, + 'path': uuid.uuid4().hex, + 'method': uuid.uuid4().hex[16:] + }] + } + PROVIDERS.application_credential_api.create_application_credential(app_cred) + with self.test_client() as c: + r = c.get('/v3/users/%s/access_rules' % self.user_id, headers=self.headers) + self.assertEqual(len(r.json['access_rules']), 1) + + def test_user_can_delete_their_access_rules(self): + access_rule_id = uuid.uuid4().hex + app_cred = { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'user_id': self.user_id, + 'project_id': self.project_id, + 'secret': uuid.uuid4().hex, + 'access_rules': [{ + 'id': access_rule_id, + 'service': uuid.uuid4().hex, + 'path': uuid.uuid4().hex, + 'method': uuid.uuid4().hex[16:] + }] + } + PROVIDERS.application_credential_api.create_application_credential(app_cred) + PROVIDERS.application_credential_api.delete_application_credential(app_cred['id']) + with self.test_client() as c: + path = '/v3/users/%s/access_rules/%s' % (self.user_id, access_rule_id) + c.delete(path, headers=self.headers) + + +class _ProjectUsersTests(object): + """Users who have project role authorization observe the same behavior.""" + + def test_user_cannot_get_access_rules_for_other_users(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.member_role_id, user_id=user['id'], + project_id=project['id'] + ) + + access_rule_id = uuid.uuid4().hex + app_cred = { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'user_id': user['id'], + 'project_id': project['id'], + 'secret': uuid.uuid4().hex, + 'access_rules': [{ + 'id': access_rule_id, + 'service': uuid.uuid4().hex, + 'path': uuid.uuid4().hex, + 'method': uuid.uuid4().hex[16:] + }] + } + PROVIDERS.application_credential_api.create_application_credential(app_cred) + with self.test_client() as c: + path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id) + c.get( + path, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_get_own_non_existent_access_rule_not_found(self): + with self.test_client() as c: + c.get( + '/v3/users/%s/access_rules/%s' % ( + self.user_id, uuid.uuid4().hex), + headers=self.headers, + expected_status_code=http_client.NOT_FOUND + ) + + def test_user_cannot_get_non_existent_access_rule_other_user_forbidden(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + with self.test_client() as c: + c.get( + '/v3/users/%s/access_rules/%s' % ( + user['id'], uuid.uuid4().hex), + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_list_access_rules_for_other_users(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.member_role_id, user_id=user['id'], + project_id=project['id'] + ) + app_cred = { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'user_id': user['id'], + 'project_id': project['id'], + 'secret': uuid.uuid4().hex, + 'access_rules': [{ + 'id': uuid.uuid4().hex, + 'service': uuid.uuid4().hex, + 'path': uuid.uuid4().hex, + 'method': uuid.uuid4().hex[16:] + }] + } + PROVIDERS.application_credential_api.create_application_credential(app_cred) + + with self.test_client() as c: + path = '/v3/users/%s/access_rules' % user['id'] + c.get(path, headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_cannot_delete_access_rules_for_others(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.member_role_id, user_id=user['id'], + project_id=project['id'] + ) + access_rule_id = uuid.uuid4().hex + app_cred = { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'user_id': user['id'], + 'project_id': project['id'], + 'secret': uuid.uuid4().hex, + 'access_rules': [{ + 'id': access_rule_id, + 'service': uuid.uuid4().hex, + 'path': uuid.uuid4().hex, + 'method': uuid.uuid4().hex[16:] + }] + } + PROVIDERS.application_credential_api.create_application_credential(app_cred) + PROVIDERS.application_credential_api.delete_application_credential(app_cred['id']) + with self.test_client() as c: + path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id) + c.delete( + path, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_delete_non_existent_access_rule_other_user_forbidden(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + with self.test_client() as c: + c.delete( + '/v3/users/%s/access_rules/%s' % ( + user['id'], uuid.uuid4().hex), + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + +class _SystemUserAccessRuleTests(object): + """Tests that are common across all system users.""" + + def test_user_can_list_access_rules_for_other_users(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.member_role_id, user_id=user['id'], + project_id=project['id'] + ) + + app_cred = { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'user_id': user['id'], + 'project_id': project['id'], + 'secret': uuid.uuid4().hex, + 'access_rules': [{ + 'id': uuid.uuid4().hex, + 'service': uuid.uuid4().hex, + 'path': uuid.uuid4().hex, + 'method': uuid.uuid4().hex[16:] + }] + } + PROVIDERS.application_credential_api.create_application_credential(app_cred) + + with self.test_client() as c: + r = c.get('/v3/users/%s/access_rules' % user['id'], + headers=self.headers) + self.assertEqual(1, len(r.json['access_rules'])) + + def test_user_cannot_get_non_existent_access_rule_not_found(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + with self.test_client() as c: + c.get( + '/v3/users/%s/access_rules/%s' % ( + user['id'], uuid.uuid4().hex), + headers=self.headers, + expected_status_code=http_client.NOT_FOUND + ) + + +class SystemReaderTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _SystemUserAccessRuleTests): + + def setUp(self): + super(SystemReaderTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + system_reader = unit.new_user_ref( + domain_id=CONF.identity.default_domain_id + ) + self.user_id = PROVIDERS.identity_api.create_user( + system_reader + )['id'] + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.bootstrapper.reader_role_id + ) + + auth = self.build_authentication_request( + user_id=self.user_id, password=system_reader['password'], + system=True + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + def test_user_cannot_delete_access_rules_for_others(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.member_role_id, user_id=user['id'], + project_id=project['id'] + ) + + access_rule_id = uuid.uuid4().hex + app_cred = { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'user_id': user['id'], + 'project_id': project['id'], + 'secret': uuid.uuid4().hex, + 'access_rules': [{ + 'id': access_rule_id, + 'service': uuid.uuid4().hex, + 'path': uuid.uuid4().hex, + 'method': uuid.uuid4().hex[16:] + }] + } + PROVIDERS.application_credential_api.create_application_credential(app_cred) + PROVIDERS.application_credential_api.delete_application_credential(app_cred['id']) + with self.test_client() as c: + path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id) + c.delete( + path, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_delete_non_existent_access_rule_forbidden(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + with self.test_client() as c: + c.delete( + '/v3/users/%s/access_rules/%s' % ( + user['id'], uuid.uuid4().hex), + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + +class SystemMemberTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _SystemUserAccessRuleTests): + + def setUp(self): + super(SystemMemberTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + system_member = unit.new_user_ref( + domain_id=CONF.identity.default_domain_id + ) + self.user_id = PROVIDERS.identity_api.create_user( + system_member + )['id'] + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.bootstrapper.member_role_id + ) + + auth = self.build_authentication_request( + user_id=self.user_id, password=system_member['password'], + system=True + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + def test_user_cannot_delete_access_rules_for_others(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.member_role_id, user_id=user['id'], + project_id=project['id'] + ) + + access_rule_id = uuid.uuid4().hex + app_cred = { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'user_id': user['id'], + 'project_id': project['id'], + 'secret': uuid.uuid4().hex, + 'access_rules': [{ + 'id': access_rule_id, + 'service': uuid.uuid4().hex, + 'path': uuid.uuid4().hex, + 'method': uuid.uuid4().hex[16:] + }] + } + PROVIDERS.application_credential_api.create_application_credential(app_cred) + PROVIDERS.application_credential_api.delete_application_credential(app_cred['id']) + with self.test_client() as c: + path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id) + c.delete( + path, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + with self.test_client() as c: + path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id) + c.delete( + path, headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + def test_user_cannot_delete_non_existent_access_rule_forbidden(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + with self.test_client() as c: + c.delete( + '/v3/users/%s/access_rules/%s' % ( + user['id'], uuid.uuid4().hex), + headers=self.headers, + expected_status_code=http_client.FORBIDDEN + ) + + +class SystemAdminTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _SystemUserAccessRuleTests): + + def setUp(self): + super(SystemAdminTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + # Reuse the system administrator account created during + # ``keystone-manage bootstrap`` + self.user_id = self.bootstrapper.admin_user_id + auth = self.build_authentication_request( + user_id=self.user_id, + password=self.bootstrapper.admin_password, + system=True + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + def test_user_can_delete_access_rules_for_others(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.member_role_id, user_id=user['id'], + project_id=project['id'] + ) + access_rule_id = uuid.uuid4().hex + app_cred = { + 'id': uuid.uuid4().hex, + 'name': uuid.uuid4().hex, + 'user_id': user['id'], + 'project_id': project['id'], + 'secret': uuid.uuid4().hex, + 'access_rules': [{ + 'id': access_rule_id, + 'service': uuid.uuid4().hex, + 'path': uuid.uuid4().hex, + 'method': uuid.uuid4().hex[16:] + }] + } + PROVIDERS.application_credential_api.create_application_credential(app_cred) + PROVIDERS.application_credential_api.delete_application_credential(app_cred['id']) + + with self.test_client() as c: + path = '/v3/users/%s/access_rules/%s' % (user['id'], access_rule_id) + c.delete(path, headers=self.headers) + + def test_user_cannot_delete_non_existent_access_rule_not_found(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user = PROVIDERS.identity_api.create_user(user) + with self.test_client() as c: + c.delete( + '/v3/users/%s/access_rules/%s' % ( + user['id'], uuid.uuid4().hex), + headers=self.headers, + expected_status_code=http_client.NOT_FOUND + ) + + +class ProjectReaderTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _UserAccessRuleTests, + _ProjectUsersTests): + + def setUp(self): + super(ProjectReaderTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + project_reader = unit.new_user_ref( + domain_id=CONF.identity.default_domain_id + ) + self.user_id = PROVIDERS.identity_api.create_user( + project_reader + )['id'] + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + self.project_id = PROVIDERS.resource_api.create_project( + project['id'], project + )['id'] + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=self.user_id, + project_id=self.project_id + ) + + auth = self.build_authentication_request( + user_id=self.user_id, + password=project_reader['password'], + project_id=self.project_id + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + +class ProjectMemberTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _UserAccessRuleTests, + _ProjectUsersTests): + + def setUp(self): + super(ProjectMemberTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + project_member = unit.new_user_ref( + domain_id=CONF.identity.default_domain_id + ) + self.user_id = PROVIDERS.identity_api.create_user( + project_member + )['id'] + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + self.project_id = PROVIDERS.resource_api.create_project( + project['id'], project + )['id'] + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.member_role_id, user_id=self.user_id, + project_id=self.project_id + ) + + auth = self.build_authentication_request( + user_id=self.user_id, + password=project_member['password'], + project_id=self.project_id + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + +class ProjectAdminTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _UserAccessRuleTests, + _ProjectUsersTests): + + def setUp(self): + super(ProjectAdminTests, self).setUp() + self.loadapp() + + self.policy_file = self.useFixture(temporaryfile.SecureTempFile()) + self.policy_file_name = self.policy_file.file_name + self.useFixture( + ksfixtures.Policy( + self.config_fixture, policy_file=self.policy_file_name + ) + ) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + # Reuse the system administrator account created during + # ``keystone-manage bootstrap`` + self.user_id = self.bootstrapper.admin_user_id + self.project_id = self.bootstrapper.project_id + auth = self.build_authentication_request( + user_id=self.user_id, + password=self.bootstrapper.admin_password, + project_id=self.project_id + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} diff --git a/keystone/tests/unit/test_policy.py b/keystone/tests/unit/test_policy.py index 6255f1eeb1..67df10ecf3 100644 --- a/keystone/tests/unit/test_policy.py +++ b/keystone/tests/unit/test_policy.py @@ -220,6 +220,7 @@ class PolicyJsonTestCase(unit.TestCase): 'identity:create_system_grant_for_user', 'identity:create_trust', 'identity:create_user', + 'identity:delete_access_rule', 'identity:delete_access_token', 'identity:delete_application_credential', 'identity:delete_consumer', @@ -249,6 +250,7 @@ class PolicyJsonTestCase(unit.TestCase): 'identity:delete_user', 'identity:ec2_delete_credential', 'identity:ec2_get_credential', + 'identity:get_access_rule', 'identity:get_access_token', 'identity:get_access_token_role', 'identity:get_application_credential', @@ -281,6 +283,7 @@ class PolicyJsonTestCase(unit.TestCase): 'identity:get_service_provider', 'identity:get_trust', 'identity:get_user', + 'identity:list_access_rules', 'identity:list_access_token_roles', 'identity:list_access_tokens', 'identity:list_application_credentials', diff --git a/keystone/tests/unit/test_v3_application_credential.py b/keystone/tests/unit/test_v3_application_credential.py index cd87587850..55a81fb8d2 100644 --- a/keystone/tests/unit/test_v3_application_credential.py +++ b/keystone/tests/unit/test_v3_application_credential.py @@ -207,10 +207,33 @@ class ApplicationCredentialTestCase(test_v3.RestfulTestCase): headers={'X-Auth-Token': token}, json=app_cred_body, expected_status_code=http_client.CREATED) - resp_access_rules = resp.json['application_credential']['access_rules'] - self.assertIn('id', resp_access_rules[0]) - resp_access_rules[0].pop('id') - self.assertEqual(access_rules[0], resp_access_rules[0]) + app_cred_id = resp.json['application_credential']['id'] + resp_access_rules = resp.json['application_credential']['access_rules'] + access_rule_id = resp_access_rules[0].pop('id') + self.assertEqual(access_rules[0], resp_access_rules[0]) + resp = c.get('/v3/users/%s/access_rules' % self.user_id, + headers={'X-Auth-Token': token}) + resp_access_rule = resp.json['access_rules'][0] + resp_access_rule.pop('id') + resp_access_rule.pop('links') + self.assertEqual(access_rules[0], resp_access_rule) + resp = c.get('/v3/users/%s/access_rules/%s' % ( + self.user_id, access_rule_id), headers={'X-Auth-Token': token}) + resp_access_rule = resp.json['access_rule'] + resp_access_rule.pop('id') + resp_access_rule.pop('links') + self.assertEqual(access_rules[0], resp_access_rule) + # can't delete an access rule in use + c.delete('/v3/users/%s/access_rules/%s' % ( + self.user_id, access_rule_id), + headers={'X-Auth-Token': token}, + expected_status_code=http_client.FORBIDDEN) + c.delete('/v3/users/%s/application_credentials/%s' % ( + self.user_id, app_cred_id), + headers={'X-Auth-Token': token}) + c.delete('/v3/users/%s/access_rules/%s' % ( + self.user_id, access_rule_id), + headers={'X-Auth-Token': token}) def test_create_application_credential_with_duplicate_access_rule(self): roles = [{'id': self.role_id}] diff --git a/keystone/tests/unit/test_versions.py b/keystone/tests/unit/test_versions.py index b84efbdc92..32ce01b254 100644 --- a/keystone/tests/unit/test_versions.py +++ b/keystone/tests/unit/test_versions.py @@ -145,6 +145,10 @@ APPLICATION_CREDENTIALS = '/users/{user_id}/application_credentials' APPLICATION_CREDENTIAL_RELATION = ( json_home.build_v3_parameter_relation('application_credential_id')) +ACCESS_RULE = '/users/{user_id}/access_rules/{access_rule_id}' +ACCESS_RULES = '/users/{user_id}/access_rules' +ACCESS_RULE_RELATION = json_home.build_v3_parameter_relation('access_rule_id') + V3_JSON_HOME_RESOURCES = { json_home.build_v3_resource_relation('auth_tokens'): { 'href': '/auth/tokens'}, @@ -641,7 +645,16 @@ V3_JSON_HOME_RESOURCES = { 'href-template': APPLICATION_CREDENTIAL, 'href-vars': { 'application_credential_id': APPLICATION_CREDENTIAL_RELATION, - 'user_id': json_home.build_v3_parameter_relation('user_id')}} + 'user_id': json_home.build_v3_parameter_relation('user_id')}}, + json_home.build_v3_resource_relation('access_rules'): { + 'href-template': ACCESS_RULES, + 'href-vars': { + 'user_id': json_home.build_v3_parameter_relation('user_id')}}, + json_home.build_v3_resource_relation('access_rule'): { + 'href-template': ACCESS_RULE, + 'href-vars': { + 'access_rule_id': ACCESS_RULE_RELATION, + 'user_id': json_home.build_v3_parameter_relation('user_id')}}, }