From d009384c9b683008d572f9996e1fe4e5e5d82096 Mon Sep 17 00:00:00 2001 From: Vishakha Agarwal Date: Thu, 4 Oct 2018 12:39:32 +0530 Subject: [PATCH] Implement scope type checking for EC2 credentials This change updates the EC2 credentials policies to understand the scope types for EC2 credentials. A follow on patch will Remove obsolete credential policies. To maintain the compatibility with the old rule the equivalent ec2_list_credentials and ec2_get_credentials behaves inconsistently. Same for ec2_create_credentials and the ec2_delete_inconsistently. Change-Id: I090e2470726d22b2670a2cca89025063419f5262 Partial-Bug: #1750678 --- keystone/api/users.py | 5 +- keystone/common/policies/ec2_credential.py | 90 ++-- .../unit/protection/v3/test_ec2_credential.py | 437 ++++++++++++++++++ 3 files changed, 504 insertions(+), 28 deletions(-) create mode 100644 keystone/tests/unit/protection/v3/test_ec2_credential.py diff --git a/keystone/api/users.py b/keystone/api/users.py index 3ddcc3291b..7118590a14 100644 --- a/keystone/api/users.py +++ b/keystone/api/users.py @@ -361,7 +361,10 @@ class UserOSEC2CredentialsResourceListCreate(_UserOSEC2CredBaseResource): POST /v3/users/{user_id}/credentials/OS-EC2 """ - ENFORCER.enforce_call(action='identity:ec2_create_credential') + target = {} + target['credential'] = {'user_id': user_id} + ENFORCER.enforce_call(action='identity:ec2_create_credential', + target_attr=target) PROVIDERS.identity_api.get_user(user_id) tenant_id = self.request_body_json.get('tenant_id') PROVIDERS.resource_api.get_project(tenant_id) diff --git a/keystone/common/policies/ec2_credential.py b/keystone/common/policies/ec2_credential.py index d417ab1e2c..ea8603c7ed 100644 --- a/keystone/common/policies/ec2_credential.py +++ b/keystone/common/policies/ec2_credential.py @@ -10,56 +10,92 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo_log import versionutils from oslo_policy import policy from keystone.common.policies import base +SYSTEM_READER_OR_CRED_OWNER = ( + '(role:reader and system_scope:all) ' + 'or user_id:%(target.credential.user_id)s' +) +SYSTEM_ADMIN_OR_CRED_OWNER = ( + '(role:admin and system_scope:all) ' + 'or user_id:%(target.credential.user_id)s' +) + +deprecated_ec2_get_credential = policy.DeprecatedRule( + name=base.IDENTITY % 'ec2_get_credential', + check_str=base.RULE_ADMIN_OR_CREDENTIAL_OWNER +) +deprecated_ec2_list_credentials = policy.DeprecatedRule( + name=base.IDENTITY % 'ec2_list_credentials', + check_str=base.RULE_ADMIN_OR_OWNER +) +deprecated_ec2_create_credentials = policy.DeprecatedRule( + name=base.IDENTITY % 'ec2_create_credentials', + check_str=base.RULE_ADMIN_OR_OWNER +) +deprecated_ec2_delete_credentials = policy.DeprecatedRule( + name=base.IDENTITY % 'ec2_delete_credentials', + check_str=base.RULE_ADMIN_OR_CREDENTIAL_OWNER +) + +DEPRECATED_REASON = """ +As of the Train release, the EC2 credential API understands how to handle +system-scoped tokens in addition to project tokens, making the API more +accessible to users without compromising security or manageability for +administrators. The new default policies for this API account for these changes +automatically. +""" + ec2_credential_policies = [ policy.DocumentedRuleDefault( name=base.IDENTITY % 'ec2_get_credential', - check_str=base.RULE_ADMIN_OR_CREDENTIAL_OWNER, - # FIXME(lbragstad): System administrator should be able to manage all - # ec2 credentials. Users with a system role assignment should be able - # to manage only ec2 credentials keystone can assert belongs to them. - # This is going to require keystone to have "scope" checks in code to - # ensure this is enforced properly. Until keystone has support for - # those cases in code, we're going to have to comment this out. This - # would be a good candidate for a user-scoped operation. If we provide - # scope_types in these policies without proper scope checks in code we - # could expose credentials to users who are not supposed to access - # them. - # scope_types=['system', 'project'], + check_str=SYSTEM_READER_OR_CRED_OWNER, + scope_types=['system', 'project'], description='Show ec2 credential details.', operations=[{'path': ('/v3/users/{user_id}/credentials/OS-EC2/' '{credential_id}'), - 'method': 'GET'}]), + 'method': 'GET'}], + deprecated_rule=deprecated_ec2_get_credential, + deprecated_reason=DEPRECATED_REASON, + deprecated_since=versionutils.deprecated.TRAIN + ), policy.DocumentedRuleDefault( name=base.IDENTITY % 'ec2_list_credentials', - check_str=base.RULE_ADMIN_OR_OWNER, - # FIXME(lbragstad): See the above comment as to why scope_types is - # commented out. - # scope_types=['system', 'project'], + check_str=base.RULE_SYSTEM_READER_OR_OWNER, + scope_types=['system', 'project'], description='List ec2 credentials.', operations=[{'path': '/v3/users/{user_id}/credentials/OS-EC2', - 'method': 'GET'}]), + 'method': 'GET'}], + deprecated_rule=deprecated_ec2_list_credentials, + deprecated_reason=DEPRECATED_REASON, + deprecated_since=versionutils.deprecated.TRAIN + ), policy.DocumentedRuleDefault( name=base.IDENTITY % 'ec2_create_credential', - check_str=base.RULE_ADMIN_OR_OWNER, - # FIXME(lbragstad): See the above comment as to why scope_types is - # commented out. + check_str=base.RULE_SYSTEM_ADMIN_OR_OWNER, + scope_types=['system', 'project'], description='Create ec2 credential.', operations=[{'path': '/v3/users/{user_id}/credentials/OS-EC2', - 'method': 'POST'}]), + 'method': 'POST'}], + deprecated_rule=deprecated_ec2_create_credentials, + deprecated_reason=DEPRECATED_REASON, + deprecated_since=versionutils.deprecated.TRAIN + ), policy.DocumentedRuleDefault( name=base.IDENTITY % 'ec2_delete_credential', - check_str=base.RULE_ADMIN_OR_CREDENTIAL_OWNER, - # FIXME(lbragstad): See the above comment as to why scope_types is - # commented out. - # scope_types=['system', 'project'], + check_str=SYSTEM_ADMIN_OR_CRED_OWNER, + scope_types=['system', 'project'], description='Delete ec2 credential.', operations=[{'path': ('/v3/users/{user_id}/credentials/OS-EC2/' '{credential_id}'), - 'method': 'DELETE'}]) + 'method': 'DELETE'}], + deprecated_rule=deprecated_ec2_delete_credentials, + deprecated_reason=DEPRECATED_REASON, + deprecated_since=versionutils.deprecated.TRAIN + ) ] diff --git a/keystone/tests/unit/protection/v3/test_ec2_credential.py b/keystone/tests/unit/protection/v3/test_ec2_credential.py new file mode 100644 index 0000000000..cb6bf135f2 --- /dev/null +++ b/keystone/tests/unit/protection/v3/test_ec2_credential.py @@ -0,0 +1,437 @@ +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from oslo_serialization import jsonutils +from six.moves import http_client + +from keystone.common.policies import base as bp +from keystone.common.policies import ec2_credential as ec +from keystone.common import provider_api +import keystone.conf +from keystone.tests.common import auth as common_auth +from keystone.tests import unit +from keystone.tests.unit import base_classes +from keystone.tests.unit import ksfixtures +from keystone.tests.unit.ksfixtures import temporaryfile + +CONF = keystone.conf.CONF +PROVIDERS = provider_api.ProviderAPIs + + +class _UserEC2CredentialTests(object): + + def test_user_can_get_their_ec2_credentials(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=self.user_id, + project_id=project['id'] + ) + + with self.test_client() as c: + r = c.post('/v3/users/%s/credentials/OS-EC2' % self.user_id, + json={'tenant_id': project['id']}, headers=self.headers) + + credential_id = r.json['credential']['access'] + + path = '/v3/users/%s/credentials/OS-EC2/%s' % (self.user_id, credential_id) + r = c.get(path, headers=self.headers) + self.assertEqual( + self.user_id, r.json['credential']['user_id'] + ) + + def test_user_can_list_their_ec2_credentials(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=self.user_id, + project_id=project['id'] + ) + + with self.test_client() as c: + c.post('/v3/users/%s/credentials/OS-EC2' % self.user_id, + json={'tenant_id': project['id']}, headers=self.headers) + + path = '/v3/users/%s/credentials/OS-EC2' % self.user_id + r = c.get(path, headers=self.headers) + for credential in r.json['credentials']: + self.assertEqual( + self.user_id, credential['user_id'] + ) + + def test_user_create_their_ec2_credentials(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=self.user_id, + project_id=project['id'] + ) + + with self.test_client() as c: + c.post('/v3/users/%s/credentials/OS-EC2' % self.user_id, + json={'tenant_id': project['id']}, headers=self.headers, + expected_status_code=http_client.CREATED) + + def test_user_delete_their_ec2_credentials(self): + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=self.user_id, + project_id=project['id'] + ) + + with self.test_client() as c: + r = c.post('/v3/users/%s/credentials/OS-EC2' % self.user_id, + json={'tenant_id': project['id']}, headers=self.headers) + credential_id = r.json['credential']['access'] + + c.delete('/v3/users/%s/credentials/OS-EC2/%s' % (self.user_id, credential_id), + headers=self.headers) + + def test_user_cannot_create_ec2_credentials_for_others(self): + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + ) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=user['id'], + project_id=project['id'] + ) + + with self.test_client() as c: + c.post('/v3/users/%s/credentials/OS-EC2' % user['id'], + json={'tenant_id': project['id']}, headers=self.headers, + expected_status_code=http_client.FORBIDDEN) + + def test_user_cannot_delete_ec2_credentials_for_others(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user_password = user['password'] + user = PROVIDERS.identity_api.create_user(user) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=user['id'], + project_id=project['id'] + ) + user_auth = self.build_authentication_request( + user_id=user['id'], password=user_password, + project_id=project['id'] + ) + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=user_auth) + token_id = r.headers['X-Subject-Token'] + headers = {'X-Auth-Token': token_id} + + r = c.post('/v3/users/%s/credentials/OS-EC2' % user['id'], + json={'tenant_id': project['id']}, headers=headers) + credential_id = r.json['credential']['access'] + + c.delete('/v3/users/%s/credentials/OS-EC2/%s' % (self.user_id, credential_id), + headers=self.headers, expected_status_code=http_client.FORBIDDEN) + + +class _SystemUserTests(object): + + def test_user_can_get_ec2_credentials_for_others(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user_password = user['password'] + user = PROVIDERS.identity_api.create_user(user) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=user['id'], + project_id=project['id'] + ) + user_auth = self.build_authentication_request( + user_id=user['id'], password=user_password, + project_id=project['id'] + ) + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=user_auth) + token_id = r.headers['X-Subject-Token'] + headers = {'X-Auth-Token': token_id} + + r = c.post('/v3/users/%s/credentials/OS-EC2' % user['id'], + json={'tenant_id': project['id']}, headers=headers) + credential_id = r.json['credential']['access'] + + path = '/v3/users/%s/credentials/OS-EC2/%s' % (self.user_id, credential_id) + c.get(path, headers=self.headers, expected_status_code=http_client.OK) + + +class _SystemReaderAndMemberTests(object): + + def test_user_cannot_list_ec2_credentials_for_others(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user_password = user['password'] + user = PROVIDERS.identity_api.create_user(user) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=user['id'], + project_id=project['id'] + ) + user_auth = self.build_authentication_request( + user_id=user['id'], password=user_password, + project_id=project['id'] + ) + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=user_auth) + token_id = r.headers['X-Subject-Token'] + headers = {'X-Auth-Token': token_id} + + c.post('/v3/users/%s/credentials/OS-EC2' % user['id'], + json={'tenant_id': project['id']}, headers=headers) + + path = '/v3/users/%s/credentials/OS-EC2' % self.user_id + r = c.get(path, headers=self.headers) + self.assertEqual([], r.json['credentials']) + + +class SystemReaderTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _SystemUserTests, + _SystemReaderAndMemberTests): + + def setUp(self): + super(SystemReaderTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + system_reader = unit.new_user_ref( + domain_id=CONF.identity.default_domain_id + ) + self.user_id = PROVIDERS.identity_api.create_user( + system_reader + )['id'] + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.bootstrapper.reader_role_id + ) + + auth = self.build_authentication_request( + user_id=self.user_id, password=system_reader['password'], + system=True + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + +class SystemMemberTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _SystemUserTests, + _SystemReaderAndMemberTests): + + def setUp(self): + super(SystemMemberTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + system_member = unit.new_user_ref( + domain_id=CONF.identity.default_domain_id + ) + self.user_id = PROVIDERS.identity_api.create_user( + system_member + )['id'] + PROVIDERS.assignment_api.create_system_grant_for_user( + self.user_id, self.bootstrapper.member_role_id + ) + + auth = self.build_authentication_request( + user_id=self.user_id, password=system_member['password'], + system=True + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + +class SystemAdminTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _SystemUserTests): + + def setUp(self): + super(SystemAdminTests, self).setUp() + self.loadapp() + self.useFixture(ksfixtures.Policy(self.config_fixture)) + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + # Reuse the system administrator account created during + # ``keystone-manage bootstrap`` + self.user_id = self.bootstrapper.admin_user_id + auth = self.build_authentication_request( + user_id=self.user_id, + password=self.bootstrapper.admin_password, + system=True + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id} + + def test_user_can_list_ec2_credentials_for_others(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user_password = user['password'] + user = PROVIDERS.identity_api.create_user(user) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=user['id'], + project_id=project['id'] + ) + user_auth = self.build_authentication_request( + user_id=user['id'], password=user_password, + project_id=project['id'] + ) + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=user_auth) + token_id = r.headers['X-Subject-Token'] + headers = {'X-Auth-Token': token_id} + + c.post('/v3/users/%s/credentials/OS-EC2' % user['id'], + json={'tenant_id': project['id']}, headers=headers) + + path = '/v3/users/%s/credentials/OS-EC2' % self.user_id + r = c.get(path, headers=self.headers) + self.assertEqual([], r.json['credentials']) + + def test_user_can_create_ec2_credentials_for_others(self): + user = PROVIDERS.identity_api.create_user( + unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + ) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=user['id'], + project_id=project['id'] + ) + + with self.test_client() as c: + c.post('/v3/users/%s/credentials/OS-EC2' % user['id'], + json={'tenant_id': project['id']}, headers=self.headers) + + def test_user_can_delete_ec2_credentials_for_others(self): + user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id) + user_password = user['password'] + user = PROVIDERS.identity_api.create_user(user) + project = unit.new_project_ref( + domain_id=CONF.identity.default_domain_id + ) + project = PROVIDERS.resource_api.create_project(project['id'], project) + PROVIDERS.assignment_api.create_grant( + self.bootstrapper.reader_role_id, user_id=user['id'], + project_id=project['id'] + ) + user_auth = self.build_authentication_request( + user_id=user['id'], password=user_password, + project_id=project['id'] + ) + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=user_auth) + token_id = r.headers['X-Subject-Token'] + headers = {'X-Auth-Token': token_id} + + r = c.post('/v3/users/%s/credentials/OS-EC2' % user['id'], + json={'tenant_id': project['id']}, headers=headers) + credential_id = r.json['credential']['access'] + + c.delete('/v3/users/%s/credentials/OS-EC2/%s' % (self.user_id, credential_id), + headers=self.headers) + + +class ProjectAdminTests(base_classes.TestCaseWithBootstrap, + common_auth.AuthTestMixin, + _UserEC2CredentialTests, + _SystemReaderAndMemberTests): + + def _override_policy(self): + # TODO(cmurphy): Remove this once the deprecated policies in + # keystone.common.policies.ec2_credential have been removed. This is + # only here to make sure we test the new policies instead of the + # deprecated ones. Oslo.policy will OR deprecated policies with new + # policies to maintain compatibility and give operators a chance to + # update permissions or update policies without breaking users. This + # will cause these specific tests to fail since we're trying to correct + # this broken behavior with better scope checking. + with open(self.policy_file_name, 'w') as f: + overridden_policies = { + 'identity:ec2_get_credential': ec.SYSTEM_READER_OR_CRED_OWNER, + 'identity:ec2_list_credentials': bp.RULE_SYSTEM_READER_OR_OWNER, + 'identity:ec2_create_credential': ec.SYSTEM_ADMIN_OR_CRED_OWNER, + 'identity:ec2_update_credential': ec.SYSTEM_ADMIN_OR_CRED_OWNER, + 'identity:ec2_delete_credential': ec.SYSTEM_ADMIN_OR_CRED_OWNER + } + f.write(jsonutils.dumps(overridden_policies)) + + def setUp(self): + super(ProjectAdminTests, self).setUp() + self.loadapp() + + self.policy_file = self.useFixture(temporaryfile.SecureTempFile()) + self.policy_file_name = self.policy_file.file_name + self.useFixture( + ksfixtures.Policy( + self.config_fixture, policy_file=self.policy_file_name + ) + ) + self._override_policy() + self.config_fixture.config(group='oslo_policy', enforce_scope=True) + + # Reuse the system administrator account created during + # ``keystone-manage bootstrap`` + self.user_id = self.bootstrapper.admin_user_id + auth = self.build_authentication_request( + user_id=self.user_id, + password=self.bootstrapper.admin_password, + project_id=self.bootstrapper.project_id + ) + + # Grab a token using the persona we're testing and prepare headers + # for requests we'll be making in the tests. + with self.test_client() as c: + r = c.post('/v3/auth/tokens', json=auth) + self.token_id = r.headers['X-Subject-Token'] + self.headers = {'X-Auth-Token': self.token_id}