Implement scope type checking for EC2 credentials

This change updates the EC2 credentials policies to understand
the scope types for EC2 credentials. A follow on patch will
Remove obsolete credential policies.

To maintain the compatibility with the old rule the
equivalent ec2_list_credentials and ec2_get_credentials behaves
inconsistently. Same for ec2_create_credentials and the
ec2_delete_inconsistently.

Change-Id: I090e2470726d22b2670a2cca89025063419f5262
Partial-Bug: #1750678
This commit is contained in:
Vishakha Agarwal 2018-10-04 12:39:32 +05:30
parent f3d5dfe323
commit d009384c9b
3 changed files with 504 additions and 28 deletions

View File

@ -361,7 +361,10 @@ class UserOSEC2CredentialsResourceListCreate(_UserOSEC2CredBaseResource):
POST /v3/users/{user_id}/credentials/OS-EC2 POST /v3/users/{user_id}/credentials/OS-EC2
""" """
ENFORCER.enforce_call(action='identity:ec2_create_credential') target = {}
target['credential'] = {'user_id': user_id}
ENFORCER.enforce_call(action='identity:ec2_create_credential',
target_attr=target)
PROVIDERS.identity_api.get_user(user_id) PROVIDERS.identity_api.get_user(user_id)
tenant_id = self.request_body_json.get('tenant_id') tenant_id = self.request_body_json.get('tenant_id')
PROVIDERS.resource_api.get_project(tenant_id) PROVIDERS.resource_api.get_project(tenant_id)

View File

@ -10,56 +10,92 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from oslo_log import versionutils
from oslo_policy import policy from oslo_policy import policy
from keystone.common.policies import base from keystone.common.policies import base
SYSTEM_READER_OR_CRED_OWNER = (
'(role:reader and system_scope:all) '
'or user_id:%(target.credential.user_id)s'
)
SYSTEM_ADMIN_OR_CRED_OWNER = (
'(role:admin and system_scope:all) '
'or user_id:%(target.credential.user_id)s'
)
deprecated_ec2_get_credential = policy.DeprecatedRule(
name=base.IDENTITY % 'ec2_get_credential',
check_str=base.RULE_ADMIN_OR_CREDENTIAL_OWNER
)
deprecated_ec2_list_credentials = policy.DeprecatedRule(
name=base.IDENTITY % 'ec2_list_credentials',
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_ec2_create_credentials = policy.DeprecatedRule(
name=base.IDENTITY % 'ec2_create_credentials',
check_str=base.RULE_ADMIN_OR_OWNER
)
deprecated_ec2_delete_credentials = policy.DeprecatedRule(
name=base.IDENTITY % 'ec2_delete_credentials',
check_str=base.RULE_ADMIN_OR_CREDENTIAL_OWNER
)
DEPRECATED_REASON = """
As of the Train release, the EC2 credential API understands how to handle
system-scoped tokens in addition to project tokens, making the API more
accessible to users without compromising security or manageability for
administrators. The new default policies for this API account for these changes
automatically.
"""
ec2_credential_policies = [ ec2_credential_policies = [
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=base.IDENTITY % 'ec2_get_credential', name=base.IDENTITY % 'ec2_get_credential',
check_str=base.RULE_ADMIN_OR_CREDENTIAL_OWNER, check_str=SYSTEM_READER_OR_CRED_OWNER,
# FIXME(lbragstad): System administrator should be able to manage all scope_types=['system', 'project'],
# ec2 credentials. Users with a system role assignment should be able
# to manage only ec2 credentials keystone can assert belongs to them.
# This is going to require keystone to have "scope" checks in code to
# ensure this is enforced properly. Until keystone has support for
# those cases in code, we're going to have to comment this out. This
# would be a good candidate for a user-scoped operation. If we provide
# scope_types in these policies without proper scope checks in code we
# could expose credentials to users who are not supposed to access
# them.
# scope_types=['system', 'project'],
description='Show ec2 credential details.', description='Show ec2 credential details.',
operations=[{'path': ('/v3/users/{user_id}/credentials/OS-EC2/' operations=[{'path': ('/v3/users/{user_id}/credentials/OS-EC2/'
'{credential_id}'), '{credential_id}'),
'method': 'GET'}]), 'method': 'GET'}],
deprecated_rule=deprecated_ec2_get_credential,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.TRAIN
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=base.IDENTITY % 'ec2_list_credentials', name=base.IDENTITY % 'ec2_list_credentials',
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.RULE_SYSTEM_READER_OR_OWNER,
# FIXME(lbragstad): See the above comment as to why scope_types is scope_types=['system', 'project'],
# commented out.
# scope_types=['system', 'project'],
description='List ec2 credentials.', description='List ec2 credentials.',
operations=[{'path': '/v3/users/{user_id}/credentials/OS-EC2', operations=[{'path': '/v3/users/{user_id}/credentials/OS-EC2',
'method': 'GET'}]), 'method': 'GET'}],
deprecated_rule=deprecated_ec2_list_credentials,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.TRAIN
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=base.IDENTITY % 'ec2_create_credential', name=base.IDENTITY % 'ec2_create_credential',
check_str=base.RULE_ADMIN_OR_OWNER, check_str=base.RULE_SYSTEM_ADMIN_OR_OWNER,
# FIXME(lbragstad): See the above comment as to why scope_types is scope_types=['system', 'project'],
# commented out.
description='Create ec2 credential.', description='Create ec2 credential.',
operations=[{'path': '/v3/users/{user_id}/credentials/OS-EC2', operations=[{'path': '/v3/users/{user_id}/credentials/OS-EC2',
'method': 'POST'}]), 'method': 'POST'}],
deprecated_rule=deprecated_ec2_create_credentials,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.TRAIN
),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
name=base.IDENTITY % 'ec2_delete_credential', name=base.IDENTITY % 'ec2_delete_credential',
check_str=base.RULE_ADMIN_OR_CREDENTIAL_OWNER, check_str=SYSTEM_ADMIN_OR_CRED_OWNER,
# FIXME(lbragstad): See the above comment as to why scope_types is scope_types=['system', 'project'],
# commented out.
# scope_types=['system', 'project'],
description='Delete ec2 credential.', description='Delete ec2 credential.',
operations=[{'path': ('/v3/users/{user_id}/credentials/OS-EC2/' operations=[{'path': ('/v3/users/{user_id}/credentials/OS-EC2/'
'{credential_id}'), '{credential_id}'),
'method': 'DELETE'}]) 'method': 'DELETE'}],
deprecated_rule=deprecated_ec2_delete_credentials,
deprecated_reason=DEPRECATED_REASON,
deprecated_since=versionutils.deprecated.TRAIN
)
] ]

View File

@ -0,0 +1,437 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_serialization import jsonutils
from six.moves import http_client
from keystone.common.policies import base as bp
from keystone.common.policies import ec2_credential as ec
from keystone.common import provider_api
import keystone.conf
from keystone.tests.common import auth as common_auth
from keystone.tests import unit
from keystone.tests.unit import base_classes
from keystone.tests.unit import ksfixtures
from keystone.tests.unit.ksfixtures import temporaryfile
CONF = keystone.conf.CONF
PROVIDERS = provider_api.ProviderAPIs
class _UserEC2CredentialTests(object):
def test_user_can_get_their_ec2_credentials(self):
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=self.user_id,
project_id=project['id']
)
with self.test_client() as c:
r = c.post('/v3/users/%s/credentials/OS-EC2' % self.user_id,
json={'tenant_id': project['id']}, headers=self.headers)
credential_id = r.json['credential']['access']
path = '/v3/users/%s/credentials/OS-EC2/%s' % (self.user_id, credential_id)
r = c.get(path, headers=self.headers)
self.assertEqual(
self.user_id, r.json['credential']['user_id']
)
def test_user_can_list_their_ec2_credentials(self):
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=self.user_id,
project_id=project['id']
)
with self.test_client() as c:
c.post('/v3/users/%s/credentials/OS-EC2' % self.user_id,
json={'tenant_id': project['id']}, headers=self.headers)
path = '/v3/users/%s/credentials/OS-EC2' % self.user_id
r = c.get(path, headers=self.headers)
for credential in r.json['credentials']:
self.assertEqual(
self.user_id, credential['user_id']
)
def test_user_create_their_ec2_credentials(self):
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=self.user_id,
project_id=project['id']
)
with self.test_client() as c:
c.post('/v3/users/%s/credentials/OS-EC2' % self.user_id,
json={'tenant_id': project['id']}, headers=self.headers,
expected_status_code=http_client.CREATED)
def test_user_delete_their_ec2_credentials(self):
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=self.user_id,
project_id=project['id']
)
with self.test_client() as c:
r = c.post('/v3/users/%s/credentials/OS-EC2' % self.user_id,
json={'tenant_id': project['id']}, headers=self.headers)
credential_id = r.json['credential']['access']
c.delete('/v3/users/%s/credentials/OS-EC2/%s' % (self.user_id, credential_id),
headers=self.headers)
def test_user_cannot_create_ec2_credentials_for_others(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=user['id'],
project_id=project['id']
)
with self.test_client() as c:
c.post('/v3/users/%s/credentials/OS-EC2' % user['id'],
json={'tenant_id': project['id']}, headers=self.headers,
expected_status_code=http_client.FORBIDDEN)
def test_user_cannot_delete_ec2_credentials_for_others(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user_password = user['password']
user = PROVIDERS.identity_api.create_user(user)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=user['id'],
project_id=project['id']
)
user_auth = self.build_authentication_request(
user_id=user['id'], password=user_password,
project_id=project['id']
)
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=user_auth)
token_id = r.headers['X-Subject-Token']
headers = {'X-Auth-Token': token_id}
r = c.post('/v3/users/%s/credentials/OS-EC2' % user['id'],
json={'tenant_id': project['id']}, headers=headers)
credential_id = r.json['credential']['access']
c.delete('/v3/users/%s/credentials/OS-EC2/%s' % (self.user_id, credential_id),
headers=self.headers, expected_status_code=http_client.FORBIDDEN)
class _SystemUserTests(object):
def test_user_can_get_ec2_credentials_for_others(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user_password = user['password']
user = PROVIDERS.identity_api.create_user(user)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=user['id'],
project_id=project['id']
)
user_auth = self.build_authentication_request(
user_id=user['id'], password=user_password,
project_id=project['id']
)
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=user_auth)
token_id = r.headers['X-Subject-Token']
headers = {'X-Auth-Token': token_id}
r = c.post('/v3/users/%s/credentials/OS-EC2' % user['id'],
json={'tenant_id': project['id']}, headers=headers)
credential_id = r.json['credential']['access']
path = '/v3/users/%s/credentials/OS-EC2/%s' % (self.user_id, credential_id)
c.get(path, headers=self.headers, expected_status_code=http_client.OK)
class _SystemReaderAndMemberTests(object):
def test_user_cannot_list_ec2_credentials_for_others(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user_password = user['password']
user = PROVIDERS.identity_api.create_user(user)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=user['id'],
project_id=project['id']
)
user_auth = self.build_authentication_request(
user_id=user['id'], password=user_password,
project_id=project['id']
)
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=user_auth)
token_id = r.headers['X-Subject-Token']
headers = {'X-Auth-Token': token_id}
c.post('/v3/users/%s/credentials/OS-EC2' % user['id'],
json={'tenant_id': project['id']}, headers=headers)
path = '/v3/users/%s/credentials/OS-EC2' % self.user_id
r = c.get(path, headers=self.headers)
self.assertEqual([], r.json['credentials'])
class SystemReaderTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_SystemUserTests,
_SystemReaderAndMemberTests):
def setUp(self):
super(SystemReaderTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
system_reader = unit.new_user_ref(
domain_id=CONF.identity.default_domain_id
)
self.user_id = PROVIDERS.identity_api.create_user(
system_reader
)['id']
PROVIDERS.assignment_api.create_system_grant_for_user(
self.user_id, self.bootstrapper.reader_role_id
)
auth = self.build_authentication_request(
user_id=self.user_id, password=system_reader['password'],
system=True
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
class SystemMemberTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_SystemUserTests,
_SystemReaderAndMemberTests):
def setUp(self):
super(SystemMemberTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
system_member = unit.new_user_ref(
domain_id=CONF.identity.default_domain_id
)
self.user_id = PROVIDERS.identity_api.create_user(
system_member
)['id']
PROVIDERS.assignment_api.create_system_grant_for_user(
self.user_id, self.bootstrapper.member_role_id
)
auth = self.build_authentication_request(
user_id=self.user_id, password=system_member['password'],
system=True
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
class SystemAdminTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_SystemUserTests):
def setUp(self):
super(SystemAdminTests, self).setUp()
self.loadapp()
self.useFixture(ksfixtures.Policy(self.config_fixture))
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
# Reuse the system administrator account created during
# ``keystone-manage bootstrap``
self.user_id = self.bootstrapper.admin_user_id
auth = self.build_authentication_request(
user_id=self.user_id,
password=self.bootstrapper.admin_password,
system=True
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}
def test_user_can_list_ec2_credentials_for_others(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user_password = user['password']
user = PROVIDERS.identity_api.create_user(user)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=user['id'],
project_id=project['id']
)
user_auth = self.build_authentication_request(
user_id=user['id'], password=user_password,
project_id=project['id']
)
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=user_auth)
token_id = r.headers['X-Subject-Token']
headers = {'X-Auth-Token': token_id}
c.post('/v3/users/%s/credentials/OS-EC2' % user['id'],
json={'tenant_id': project['id']}, headers=headers)
path = '/v3/users/%s/credentials/OS-EC2' % self.user_id
r = c.get(path, headers=self.headers)
self.assertEqual([], r.json['credentials'])
def test_user_can_create_ec2_credentials_for_others(self):
user = PROVIDERS.identity_api.create_user(
unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=user['id'],
project_id=project['id']
)
with self.test_client() as c:
c.post('/v3/users/%s/credentials/OS-EC2' % user['id'],
json={'tenant_id': project['id']}, headers=self.headers)
def test_user_can_delete_ec2_credentials_for_others(self):
user = unit.new_user_ref(domain_id=CONF.identity.default_domain_id)
user_password = user['password']
user = PROVIDERS.identity_api.create_user(user)
project = unit.new_project_ref(
domain_id=CONF.identity.default_domain_id
)
project = PROVIDERS.resource_api.create_project(project['id'], project)
PROVIDERS.assignment_api.create_grant(
self.bootstrapper.reader_role_id, user_id=user['id'],
project_id=project['id']
)
user_auth = self.build_authentication_request(
user_id=user['id'], password=user_password,
project_id=project['id']
)
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=user_auth)
token_id = r.headers['X-Subject-Token']
headers = {'X-Auth-Token': token_id}
r = c.post('/v3/users/%s/credentials/OS-EC2' % user['id'],
json={'tenant_id': project['id']}, headers=headers)
credential_id = r.json['credential']['access']
c.delete('/v3/users/%s/credentials/OS-EC2/%s' % (self.user_id, credential_id),
headers=self.headers)
class ProjectAdminTests(base_classes.TestCaseWithBootstrap,
common_auth.AuthTestMixin,
_UserEC2CredentialTests,
_SystemReaderAndMemberTests):
def _override_policy(self):
# TODO(cmurphy): Remove this once the deprecated policies in
# keystone.common.policies.ec2_credential have been removed. This is
# only here to make sure we test the new policies instead of the
# deprecated ones. Oslo.policy will OR deprecated policies with new
# policies to maintain compatibility and give operators a chance to
# update permissions or update policies without breaking users. This
# will cause these specific tests to fail since we're trying to correct
# this broken behavior with better scope checking.
with open(self.policy_file_name, 'w') as f:
overridden_policies = {
'identity:ec2_get_credential': ec.SYSTEM_READER_OR_CRED_OWNER,
'identity:ec2_list_credentials': bp.RULE_SYSTEM_READER_OR_OWNER,
'identity:ec2_create_credential': ec.SYSTEM_ADMIN_OR_CRED_OWNER,
'identity:ec2_update_credential': ec.SYSTEM_ADMIN_OR_CRED_OWNER,
'identity:ec2_delete_credential': ec.SYSTEM_ADMIN_OR_CRED_OWNER
}
f.write(jsonutils.dumps(overridden_policies))
def setUp(self):
super(ProjectAdminTests, self).setUp()
self.loadapp()
self.policy_file = self.useFixture(temporaryfile.SecureTempFile())
self.policy_file_name = self.policy_file.file_name
self.useFixture(
ksfixtures.Policy(
self.config_fixture, policy_file=self.policy_file_name
)
)
self._override_policy()
self.config_fixture.config(group='oslo_policy', enforce_scope=True)
# Reuse the system administrator account created during
# ``keystone-manage bootstrap``
self.user_id = self.bootstrapper.admin_user_id
auth = self.build_authentication_request(
user_id=self.user_id,
password=self.bootstrapper.admin_password,
project_id=self.bootstrapper.project_id
)
# Grab a token using the persona we're testing and prepare headers
# for requests we'll be making in the tests.
with self.test_client() as c:
r = c.post('/v3/auth/tokens', json=auth)
self.token_id = r.headers['X-Subject-Token']
self.headers = {'X-Auth-Token': self.token_id}