991 lines
41 KiB
Python
991 lines
41 KiB
Python
# Copyright 2013 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import hashlib
|
|
import json
|
|
from unittest import mock
|
|
import uuid
|
|
|
|
import http.client
|
|
from keystoneclient.contrib.ec2 import utils as ec2_utils
|
|
from oslo_db import exception as oslo_db_exception
|
|
from testtools import matchers
|
|
import urllib
|
|
|
|
from keystone.api import ec2tokens
|
|
from keystone.common import provider_api
|
|
from keystone.common import utils
|
|
from keystone.credential.providers import fernet as credential_fernet
|
|
from keystone import exception
|
|
from keystone import oauth1
|
|
from keystone.tests import unit
|
|
from keystone.tests.unit import ksfixtures
|
|
from keystone.tests.unit import test_v3
|
|
|
|
|
|
PROVIDERS = provider_api.ProviderAPIs
|
|
CRED_TYPE_EC2 = ec2tokens.CRED_TYPE_EC2
|
|
|
|
|
|
class CredentialBaseTestCase(test_v3.RestfulTestCase):
|
|
|
|
def setUp(self):
|
|
super(CredentialBaseTestCase, self).setUp()
|
|
self.useFixture(
|
|
ksfixtures.KeyRepository(
|
|
self.config_fixture,
|
|
'credential',
|
|
credential_fernet.MAX_ACTIVE_KEYS
|
|
)
|
|
)
|
|
|
|
def _create_dict_blob_credential(self):
|
|
blob, credential = unit.new_ec2_credential(user_id=self.user['id'],
|
|
project_id=self.project_id)
|
|
|
|
# Store the blob as a dict *not* JSON ref bug #1259584
|
|
# This means we can test the dict->json workaround, added
|
|
# as part of the bugfix for backwards compatibility works.
|
|
credential['blob'] = blob
|
|
credential_id = credential['id']
|
|
|
|
# Create direct via the DB API to avoid validation failure
|
|
PROVIDERS.credential_api.create_credential(credential_id, credential)
|
|
|
|
return json.dumps(blob), credential_id
|
|
|
|
def _test_get_token(self, access, secret):
|
|
"""Test signature validation with the access/secret provided."""
|
|
signer = ec2_utils.Ec2Signer(secret)
|
|
params = {'SignatureMethod': 'HmacSHA256',
|
|
'SignatureVersion': '2',
|
|
'AWSAccessKeyId': access}
|
|
request = {'host': 'foo',
|
|
'verb': 'GET',
|
|
'path': '/bar',
|
|
'params': params}
|
|
signature = signer.generate(request)
|
|
|
|
# Now make a request to validate the signed dummy request via the
|
|
# ec2tokens API. This proves the v3 ec2 credentials actually work.
|
|
sig_ref = {'access': access,
|
|
'signature': signature,
|
|
'host': 'foo',
|
|
'verb': 'GET',
|
|
'path': '/bar',
|
|
'params': params}
|
|
r = self.post(
|
|
'/ec2tokens',
|
|
body={'ec2Credentials': sig_ref},
|
|
expected_status=http.client.OK)
|
|
self.assertValidTokenResponse(r)
|
|
return r.result['token']
|
|
|
|
|
|
class CredentialTestCase(CredentialBaseTestCase):
|
|
"""Test credential CRUD."""
|
|
|
|
def setUp(self):
|
|
|
|
super(CredentialTestCase, self).setUp()
|
|
|
|
self.credential = unit.new_credential_ref(user_id=self.user['id'],
|
|
project_id=self.project_id)
|
|
|
|
PROVIDERS.credential_api.create_credential(
|
|
self.credential['id'],
|
|
self.credential)
|
|
|
|
def test_credential_api_delete_credentials_for_project(self):
|
|
PROVIDERS.credential_api.delete_credentials_for_project(
|
|
self.project_id
|
|
)
|
|
# Test that the credential that we created in .setUp no longer exists
|
|
# once we delete all credentials for self.project_id
|
|
self.assertRaises(exception.CredentialNotFound,
|
|
PROVIDERS.credential_api.get_credential,
|
|
credential_id=self.credential['id'])
|
|
|
|
def test_credential_api_delete_credentials_for_user(self):
|
|
PROVIDERS.credential_api.delete_credentials_for_user(self.user_id)
|
|
# Test that the credential that we created in .setUp no longer exists
|
|
# once we delete all credentials for self.user_id
|
|
self.assertRaises(exception.CredentialNotFound,
|
|
PROVIDERS.credential_api.get_credential,
|
|
credential_id=self.credential['id'])
|
|
|
|
def test_list_credentials(self):
|
|
"""Call ``GET /credentials``."""
|
|
r = self.get('/credentials')
|
|
self.assertValidCredentialListResponse(r, ref=self.credential)
|
|
|
|
def test_list_credentials_filtered_by_user_id(self):
|
|
"""Call ``GET /credentials?user_id={user_id}``."""
|
|
credential = unit.new_credential_ref(user_id=uuid.uuid4().hex)
|
|
PROVIDERS.credential_api.create_credential(
|
|
credential['id'], credential
|
|
)
|
|
|
|
r = self.get('/credentials?user_id=%s' % self.user['id'])
|
|
self.assertValidCredentialListResponse(r, ref=self.credential)
|
|
for cred in r.result['credentials']:
|
|
self.assertEqual(self.user['id'], cred['user_id'])
|
|
|
|
def test_list_credentials_filtered_by_type(self):
|
|
"""Call ``GET /credentials?type={type}``."""
|
|
PROVIDERS.assignment_api.create_system_grant_for_user(
|
|
self.user_id, self.role_id
|
|
)
|
|
token = self.get_system_scoped_token()
|
|
|
|
# The type ec2 was chosen, instead of a random string,
|
|
# because the type must be in the list of supported types
|
|
ec2_credential = unit.new_credential_ref(user_id=uuid.uuid4().hex,
|
|
project_id=self.project_id,
|
|
type=CRED_TYPE_EC2)
|
|
|
|
ec2_resp = PROVIDERS.credential_api.create_credential(
|
|
ec2_credential['id'], ec2_credential)
|
|
|
|
# The type cert was chosen for the same reason as ec2
|
|
r = self.get('/credentials?type=cert', token=token)
|
|
|
|
# Testing the filter for two different types
|
|
self.assertValidCredentialListResponse(r, ref=self.credential)
|
|
for cred in r.result['credentials']:
|
|
self.assertEqual('cert', cred['type'])
|
|
|
|
r_ec2 = self.get('/credentials?type=ec2', token=token)
|
|
self.assertThat(r_ec2.result['credentials'], matchers.HasLength(1))
|
|
cred_ec2 = r_ec2.result['credentials'][0]
|
|
|
|
self.assertValidCredentialListResponse(r_ec2, ref=ec2_resp)
|
|
self.assertEqual(CRED_TYPE_EC2, cred_ec2['type'])
|
|
self.assertEqual(ec2_credential['id'], cred_ec2['id'])
|
|
|
|
def test_list_credentials_filtered_by_type_and_user_id(self):
|
|
"""Call ``GET /credentials?user_id={user_id}&type={type}``."""
|
|
user1_id = uuid.uuid4().hex
|
|
user2_id = uuid.uuid4().hex
|
|
|
|
PROVIDERS.assignment_api.create_system_grant_for_user(
|
|
self.user_id, self.role_id
|
|
)
|
|
token = self.get_system_scoped_token()
|
|
|
|
# Creating credentials for two different users
|
|
credential_user1_ec2 = unit.new_credential_ref(user_id=user1_id,
|
|
type=CRED_TYPE_EC2)
|
|
credential_user1_cert = unit.new_credential_ref(user_id=user1_id)
|
|
credential_user2_cert = unit.new_credential_ref(user_id=user2_id)
|
|
|
|
PROVIDERS.credential_api.create_credential(
|
|
credential_user1_ec2['id'], credential_user1_ec2)
|
|
PROVIDERS.credential_api.create_credential(
|
|
credential_user1_cert['id'], credential_user1_cert)
|
|
PROVIDERS.credential_api.create_credential(
|
|
credential_user2_cert['id'], credential_user2_cert)
|
|
|
|
r = self.get(
|
|
'/credentials?user_id=%s&type=ec2' % user1_id, token=token
|
|
)
|
|
self.assertValidCredentialListResponse(r, ref=credential_user1_ec2)
|
|
self.assertThat(r.result['credentials'], matchers.HasLength(1))
|
|
cred = r.result['credentials'][0]
|
|
self.assertEqual(CRED_TYPE_EC2, cred['type'])
|
|
self.assertEqual(user1_id, cred['user_id'])
|
|
|
|
def test_create_credential(self):
|
|
"""Call ``POST /credentials``."""
|
|
ref = unit.new_credential_ref(user_id=self.user['id'])
|
|
r = self.post(
|
|
'/credentials',
|
|
body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
|
|
def test_get_credential(self):
|
|
"""Call ``GET /credentials/{credential_id}``."""
|
|
r = self.get(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': self.credential['id']})
|
|
self.assertValidCredentialResponse(r, self.credential)
|
|
|
|
def test_update_credential(self):
|
|
"""Call ``PATCH /credentials/{credential_id}``."""
|
|
ref = unit.new_credential_ref(user_id=self.user['id'],
|
|
project_id=self.project_id)
|
|
del ref['id']
|
|
r = self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': self.credential['id']},
|
|
body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
|
|
def test_update_credential_to_ec2_type(self):
|
|
"""Call ``PATCH /credentials/{credential_id}``."""
|
|
# Create a credential without providing a project_id
|
|
ref = unit.new_credential_ref(user_id=self.user['id'])
|
|
r = self.post(
|
|
'/credentials',
|
|
body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
credential_id = r.result.get('credential')['id']
|
|
|
|
# Updating the credential to ec2 requires a project_id
|
|
update_ref = {'type': 'ec2', 'project_id': self.project_id}
|
|
self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id},
|
|
body={'credential': update_ref})
|
|
|
|
def test_update_credential_to_ec2_missing_project_id(self):
|
|
"""Call ``PATCH /credentials/{credential_id}``."""
|
|
# Create a credential without providing a project_id
|
|
ref = unit.new_credential_ref(user_id=self.user['id'])
|
|
r = self.post(
|
|
'/credentials',
|
|
body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
credential_id = r.result.get('credential')['id']
|
|
|
|
# Updating such credential to ec2 type without providing a project_id
|
|
# will fail
|
|
update_ref = {'type': 'ec2'}
|
|
self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id},
|
|
body={'credential': update_ref},
|
|
expected_status=http.client.BAD_REQUEST)
|
|
|
|
def test_update_credential_to_ec2_with_previously_set_project_id(self):
|
|
"""Call ``PATCH /credentials/{credential_id}``."""
|
|
# Create a credential providing a project_id
|
|
ref = unit.new_credential_ref(user_id=self.user['id'],
|
|
project_id=self.project_id)
|
|
r = self.post(
|
|
'/credentials',
|
|
body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
credential_id = r.result.get('credential')['id']
|
|
|
|
# Since the created credential above already has a project_id, the
|
|
# update request will not fail
|
|
update_ref = {'type': 'ec2'}
|
|
self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id},
|
|
body={'credential': update_ref})
|
|
|
|
def test_update_credential_non_owner(self):
|
|
"""Call ``PATCH /credentials/{credential_id}``."""
|
|
alt_user = unit.create_user(
|
|
PROVIDERS.identity_api, domain_id=self.domain_id)
|
|
alt_user_id = alt_user['id']
|
|
alt_project = unit.new_project_ref(domain_id=self.domain_id)
|
|
alt_project_id = alt_project['id']
|
|
PROVIDERS.resource_api.create_project(
|
|
alt_project['id'], alt_project)
|
|
alt_role = unit.new_role_ref(name='reader')
|
|
alt_role_id = alt_role['id']
|
|
PROVIDERS.role_api.create_role(alt_role_id, alt_role)
|
|
PROVIDERS.assignment_api.add_role_to_user_and_project(
|
|
alt_user_id, alt_project_id, alt_role_id)
|
|
auth = self.build_authentication_request(
|
|
user_id=alt_user_id,
|
|
password=alt_user['password'],
|
|
project_id=alt_project_id)
|
|
ref = unit.new_credential_ref(user_id=alt_user_id,
|
|
project_id=alt_project_id)
|
|
r = self.post(
|
|
'/credentials',
|
|
auth=auth,
|
|
body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
credential_id = r.result.get('credential')['id']
|
|
|
|
# Cannot change the credential to be owned by another user
|
|
update_ref = {'user_id': self.user_id, 'project_id': self.project_id}
|
|
self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id},
|
|
expected_status=403,
|
|
auth=auth,
|
|
body={'credential': update_ref})
|
|
|
|
def test_update_ec2_credential_change_trust_id(self):
|
|
"""Call ``PATCH /credentials/{credential_id}``."""
|
|
blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
|
|
project_id=self.project_id)
|
|
blob['trust_id'] = uuid.uuid4().hex
|
|
ref['blob'] = json.dumps(blob)
|
|
r = self.post(
|
|
'/credentials',
|
|
body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
credential_id = r.result.get('credential')['id']
|
|
# Try changing to a different trust
|
|
blob['trust_id'] = uuid.uuid4().hex
|
|
update_ref = {'blob': json.dumps(blob)}
|
|
self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id},
|
|
body={'credential': update_ref},
|
|
expected_status=http.client.BAD_REQUEST)
|
|
# Try removing the trust
|
|
del blob['trust_id']
|
|
update_ref = {'blob': json.dumps(blob)}
|
|
self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id},
|
|
body={'credential': update_ref},
|
|
expected_status=http.client.BAD_REQUEST)
|
|
|
|
def test_update_ec2_credential_change_app_cred_id(self):
|
|
"""Call ``PATCH /credentials/{credential_id}``."""
|
|
blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
|
|
project_id=self.project_id)
|
|
blob['app_cred_id'] = uuid.uuid4().hex
|
|
ref['blob'] = json.dumps(blob)
|
|
r = self.post(
|
|
'/credentials',
|
|
body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
credential_id = r.result.get('credential')['id']
|
|
# Try changing to a different app cred
|
|
blob['app_cred_id'] = uuid.uuid4().hex
|
|
update_ref = {'blob': json.dumps(blob)}
|
|
self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id},
|
|
body={'credential': update_ref},
|
|
expected_status=http.client.BAD_REQUEST)
|
|
# Try removing the app cred
|
|
del blob['app_cred_id']
|
|
update_ref = {'blob': json.dumps(blob)}
|
|
self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id},
|
|
body={'credential': update_ref},
|
|
expected_status=http.client.BAD_REQUEST)
|
|
|
|
def test_update_ec2_credential_change_access_token_id(self):
|
|
"""Call ``PATCH /credentials/{credential_id}``."""
|
|
blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
|
|
project_id=self.project_id)
|
|
blob['access_token_id'] = uuid.uuid4().hex
|
|
ref['blob'] = json.dumps(blob)
|
|
r = self.post(
|
|
'/credentials',
|
|
body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
credential_id = r.result.get('credential')['id']
|
|
# Try changing to a different access token
|
|
blob['access_token_id'] = uuid.uuid4().hex
|
|
update_ref = {'blob': json.dumps(blob)}
|
|
self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id},
|
|
body={'credential': update_ref},
|
|
expected_status=http.client.BAD_REQUEST)
|
|
# Try removing the access token
|
|
del blob['access_token_id']
|
|
update_ref = {'blob': json.dumps(blob)}
|
|
self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id},
|
|
body={'credential': update_ref},
|
|
expected_status=http.client.BAD_REQUEST)
|
|
|
|
def test_update_ec2_credential_change_access_id(self):
|
|
"""Call ``PATCH /credentials/{credential_id}``."""
|
|
blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
|
|
project_id=self.project_id)
|
|
blob['access_id'] = uuid.uuid4().hex
|
|
ref['blob'] = json.dumps(blob)
|
|
r = self.post(
|
|
'/credentials',
|
|
body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
credential_id = r.result.get('credential')['id']
|
|
# Try changing to a different access_id
|
|
blob['access_id'] = uuid.uuid4().hex
|
|
update_ref = {'blob': json.dumps(blob)}
|
|
self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id},
|
|
body={'credential': update_ref},
|
|
expected_status=http.client.BAD_REQUEST)
|
|
# Try removing the access_id
|
|
del blob['access_id']
|
|
update_ref = {'blob': json.dumps(blob)}
|
|
self.patch(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id},
|
|
body={'credential': update_ref},
|
|
expected_status=http.client.BAD_REQUEST)
|
|
|
|
def test_delete_credential(self):
|
|
"""Call ``DELETE /credentials/{credential_id}``."""
|
|
self.delete(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': self.credential['id']})
|
|
|
|
def test_delete_credential_retries_on_deadlock(self):
|
|
patcher = mock.patch('sqlalchemy.orm.query.Query.delete',
|
|
autospec=True)
|
|
|
|
class FakeDeadlock(object):
|
|
def __init__(self, mock_patcher):
|
|
self.deadlock_count = 2
|
|
self.mock_patcher = mock_patcher
|
|
self.patched = True
|
|
|
|
def __call__(self, *args, **kwargs):
|
|
if self.deadlock_count > 1:
|
|
self.deadlock_count -= 1
|
|
else:
|
|
self.mock_patcher.stop()
|
|
self.patched = False
|
|
raise oslo_db_exception.DBDeadlock
|
|
|
|
sql_delete_mock = patcher.start()
|
|
side_effect = FakeDeadlock(patcher)
|
|
sql_delete_mock.side_effect = side_effect
|
|
|
|
try:
|
|
PROVIDERS.credential_api.delete_credentials_for_user(
|
|
user_id=self.user['id'])
|
|
finally:
|
|
if side_effect.patched:
|
|
patcher.stop()
|
|
|
|
# initial attempt + 1 retry
|
|
self.assertEqual(sql_delete_mock.call_count, 2)
|
|
|
|
def test_create_ec2_credential(self):
|
|
"""Call ``POST /credentials`` for creating ec2 credential."""
|
|
blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
|
|
project_id=self.project_id)
|
|
r = self.post('/credentials', body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
# Assert credential id is same as hash of access key id for
|
|
# ec2 credentials
|
|
access = blob['access'].encode('utf-8')
|
|
self.assertEqual(hashlib.sha256(access).hexdigest(),
|
|
r.result['credential']['id'])
|
|
# Create second ec2 credential with the same access key id and check
|
|
# for conflict.
|
|
self.post(
|
|
'/credentials',
|
|
body={'credential': ref}, expected_status=http.client.CONFLICT)
|
|
|
|
def test_get_ec2_dict_blob(self):
|
|
"""Ensure non-JSON blob data is correctly converted."""
|
|
expected_blob, credential_id = self._create_dict_blob_credential()
|
|
|
|
r = self.get(
|
|
'/credentials/%(credential_id)s' % {
|
|
'credential_id': credential_id})
|
|
|
|
# use json.loads to transform the blobs back into Python dictionaries
|
|
# to avoid problems with the keys being in different orders.
|
|
self.assertEqual(json.loads(expected_blob),
|
|
json.loads(r.result['credential']['blob']))
|
|
|
|
def test_list_ec2_dict_blob(self):
|
|
"""Ensure non-JSON blob data is correctly converted."""
|
|
expected_blob, credential_id = self._create_dict_blob_credential()
|
|
|
|
list_r = self.get('/credentials')
|
|
list_creds = list_r.result['credentials']
|
|
list_ids = [r['id'] for r in list_creds]
|
|
self.assertIn(credential_id, list_ids)
|
|
# use json.loads to transform the blobs back into Python dictionaries
|
|
# to avoid problems with the keys being in different orders.
|
|
for r in list_creds:
|
|
if r['id'] == credential_id:
|
|
self.assertEqual(json.loads(expected_blob),
|
|
json.loads(r['blob']))
|
|
|
|
def test_create_non_ec2_credential(self):
|
|
"""Test creating non-ec2 credential.
|
|
|
|
Call ``POST /credentials``.
|
|
"""
|
|
blob, ref = unit.new_cert_credential(user_id=self.user['id'])
|
|
|
|
r = self.post('/credentials', body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
# Assert credential id is not same as hash of access key id for
|
|
# non-ec2 credentials
|
|
access = blob['access'].encode('utf-8')
|
|
self.assertNotEqual(hashlib.sha256(access).hexdigest(),
|
|
r.result['credential']['id'])
|
|
|
|
def test_create_ec2_credential_with_missing_project_id(self):
|
|
"""Test Creating ec2 credential with missing project_id.
|
|
|
|
Call ``POST /credentials``.
|
|
"""
|
|
_, ref = unit.new_ec2_credential(user_id=self.user['id'],
|
|
project_id=None)
|
|
# Assert bad request status when missing project_id
|
|
self.post(
|
|
'/credentials',
|
|
body={'credential': ref}, expected_status=http.client.BAD_REQUEST)
|
|
|
|
def test_create_ec2_credential_with_invalid_blob(self):
|
|
"""Test creating ec2 credential with invalid blob.
|
|
|
|
Call ``POST /credentials``.
|
|
"""
|
|
ref = unit.new_credential_ref(user_id=self.user['id'],
|
|
project_id=self.project_id,
|
|
blob='{"abc":"def"d}',
|
|
type=CRED_TYPE_EC2)
|
|
# Assert bad request status when request contains invalid blob
|
|
response = self.post(
|
|
'/credentials',
|
|
body={'credential': ref}, expected_status=http.client.BAD_REQUEST)
|
|
self.assertValidErrorResponse(response)
|
|
|
|
def test_create_credential_with_admin_token(self):
|
|
# Make sure we can create credential with the static admin token
|
|
ref = unit.new_credential_ref(user_id=self.user['id'])
|
|
r = self.post(
|
|
'/credentials',
|
|
body={'credential': ref},
|
|
token=self.get_admin_token())
|
|
self.assertValidCredentialResponse(r, ref)
|
|
|
|
|
|
class TestCredentialTrustScoped(CredentialBaseTestCase):
|
|
"""Test credential with trust scoped token."""
|
|
|
|
def setUp(self):
|
|
super(TestCredentialTrustScoped, self).setUp()
|
|
|
|
self.trustee_user = unit.new_user_ref(domain_id=self.domain_id)
|
|
password = self.trustee_user['password']
|
|
self.trustee_user = PROVIDERS.identity_api.create_user(
|
|
self.trustee_user
|
|
)
|
|
self.trustee_user['password'] = password
|
|
self.trustee_user_id = self.trustee_user['id']
|
|
self.useFixture(
|
|
ksfixtures.KeyRepository(
|
|
self.config_fixture,
|
|
'credential',
|
|
credential_fernet.MAX_ACTIVE_KEYS
|
|
)
|
|
)
|
|
|
|
def config_overrides(self):
|
|
super(TestCredentialTrustScoped, self).config_overrides()
|
|
self.config_fixture.config(group='trust')
|
|
|
|
def test_trust_scoped_ec2_credential(self):
|
|
"""Test creating trust scoped ec2 credential.
|
|
|
|
Call ``POST /credentials``.
|
|
"""
|
|
# Create the trust
|
|
ref = unit.new_trust_ref(
|
|
trustor_user_id=self.user_id,
|
|
trustee_user_id=self.trustee_user_id,
|
|
project_id=self.project_id,
|
|
impersonation=True,
|
|
expires=dict(minutes=1),
|
|
role_ids=[self.role_id])
|
|
del ref['id']
|
|
r = self.post('/OS-TRUST/trusts', body={'trust': ref})
|
|
trust = self.assertValidTrustResponse(r)
|
|
|
|
# Get a trust scoped token
|
|
auth_data = self.build_authentication_request(
|
|
user_id=self.trustee_user['id'],
|
|
password=self.trustee_user['password'],
|
|
trust_id=trust['id'])
|
|
r = self.v3_create_token(auth_data)
|
|
self.assertValidProjectScopedTokenResponse(r, self.user)
|
|
trust_id = r.result['token']['OS-TRUST:trust']['id']
|
|
token_id = r.headers.get('X-Subject-Token')
|
|
|
|
# Create the credential with the trust scoped token
|
|
blob, ref = unit.new_ec2_credential(user_id=self.user_id,
|
|
project_id=self.project_id)
|
|
r = self.post('/credentials', body={'credential': ref}, token=token_id)
|
|
|
|
# We expect the response blob to contain the trust_id
|
|
ret_ref = ref.copy()
|
|
ret_blob = blob.copy()
|
|
ret_blob['trust_id'] = trust_id
|
|
ret_ref['blob'] = json.dumps(ret_blob)
|
|
self.assertValidCredentialResponse(r, ref=ret_ref)
|
|
|
|
# Assert credential id is same as hash of access key id for
|
|
# ec2 credentials
|
|
access = blob['access'].encode('utf-8')
|
|
self.assertEqual(hashlib.sha256(access).hexdigest(),
|
|
r.result['credential']['id'])
|
|
|
|
# Create a role assignment to ensure that it is ignored and only the
|
|
# trust-delegated roles are used
|
|
role = unit.new_role_ref(name='reader')
|
|
role_id = role['id']
|
|
PROVIDERS.role_api.create_role(role_id, role)
|
|
PROVIDERS.assignment_api.add_role_to_user_and_project(
|
|
self.user_id, self.project_id, role_id)
|
|
|
|
ret_blob = json.loads(r.result['credential']['blob'])
|
|
ec2token = self._test_get_token(
|
|
access=ret_blob['access'], secret=ret_blob['secret'])
|
|
ec2_roles = [role['id'] for role in ec2token['roles']]
|
|
self.assertIn(self.role_id, ec2_roles)
|
|
self.assertNotIn(role_id, ec2_roles)
|
|
|
|
# Create second ec2 credential with the same access key id and check
|
|
# for conflict.
|
|
self.post(
|
|
'/credentials',
|
|
body={'credential': ref},
|
|
token=token_id,
|
|
expected_status=http.client.CONFLICT)
|
|
|
|
|
|
class TestCredentialAppCreds(CredentialBaseTestCase):
|
|
"""Test credential with application credential token."""
|
|
|
|
def setUp(self):
|
|
super(TestCredentialAppCreds, self).setUp()
|
|
self.useFixture(
|
|
ksfixtures.KeyRepository(
|
|
self.config_fixture,
|
|
'credential',
|
|
credential_fernet.MAX_ACTIVE_KEYS
|
|
)
|
|
)
|
|
|
|
def test_app_cred_ec2_credential(self):
|
|
"""Test creating ec2 credential from an application credential.
|
|
|
|
Call ``POST /credentials``.
|
|
"""
|
|
# Create the app cred
|
|
ref = unit.new_application_credential_ref(roles=[{'id': self.role_id}])
|
|
del ref['id']
|
|
r = self.post('/users/%s/application_credentials' % self.user_id,
|
|
body={'application_credential': ref})
|
|
app_cred = r.result['application_credential']
|
|
|
|
# Get an application credential token
|
|
auth_data = self.build_authentication_request(
|
|
app_cred_id=app_cred['id'],
|
|
secret=app_cred['secret'])
|
|
r = self.v3_create_token(auth_data)
|
|
token_id = r.headers.get('X-Subject-Token')
|
|
|
|
# Create the credential with the app cred token
|
|
blob, ref = unit.new_ec2_credential(user_id=self.user_id,
|
|
project_id=self.project_id)
|
|
r = self.post('/credentials', body={'credential': ref}, token=token_id)
|
|
|
|
# We expect the response blob to contain the app_cred_id
|
|
ret_ref = ref.copy()
|
|
ret_blob = blob.copy()
|
|
ret_blob['app_cred_id'] = app_cred['id']
|
|
ret_ref['blob'] = json.dumps(ret_blob)
|
|
self.assertValidCredentialResponse(r, ref=ret_ref)
|
|
|
|
# Assert credential id is same as hash of access key id for
|
|
# ec2 credentials
|
|
access = blob['access'].encode('utf-8')
|
|
self.assertEqual(hashlib.sha256(access).hexdigest(),
|
|
r.result['credential']['id'])
|
|
|
|
# Create a role assignment to ensure that it is ignored and only the
|
|
# roles in the app cred are used
|
|
role = unit.new_role_ref(name='reader')
|
|
role_id = role['id']
|
|
PROVIDERS.role_api.create_role(role_id, role)
|
|
PROVIDERS.assignment_api.add_role_to_user_and_project(
|
|
self.user_id, self.project_id, role_id)
|
|
|
|
ret_blob = json.loads(r.result['credential']['blob'])
|
|
ec2token = self._test_get_token(
|
|
access=ret_blob['access'], secret=ret_blob['secret'])
|
|
ec2_roles = [role['id'] for role in ec2token['roles']]
|
|
self.assertIn(self.role_id, ec2_roles)
|
|
self.assertNotIn(role_id, ec2_roles)
|
|
|
|
# Create second ec2 credential with the same access key id and check
|
|
# for conflict.
|
|
self.post(
|
|
'/credentials',
|
|
body={'credential': ref},
|
|
token=token_id,
|
|
expected_status=http.client.CONFLICT)
|
|
|
|
|
|
class TestCredentialAccessToken(CredentialBaseTestCase):
|
|
"""Test credential with access token."""
|
|
|
|
def setUp(self):
|
|
super(TestCredentialAccessToken, self).setUp()
|
|
self.useFixture(
|
|
ksfixtures.KeyRepository(
|
|
self.config_fixture,
|
|
'credential',
|
|
credential_fernet.MAX_ACTIVE_KEYS
|
|
)
|
|
)
|
|
self.base_url = 'http://localhost/v3'
|
|
|
|
def _urllib_parse_qs_text_keys(self, content):
|
|
results = urllib.parse.parse_qs(content)
|
|
return {key.decode('utf-8'): value for key, value in results.items()}
|
|
|
|
def _create_single_consumer(self):
|
|
endpoint = '/OS-OAUTH1/consumers'
|
|
|
|
ref = {'description': uuid.uuid4().hex}
|
|
resp = self.post(endpoint, body={'consumer': ref})
|
|
return resp.result['consumer']
|
|
|
|
def _create_request_token(self, consumer, project_id, base_url=None):
|
|
endpoint = '/OS-OAUTH1/request_token'
|
|
client = oauth1.Client(consumer['key'],
|
|
client_secret=consumer['secret'],
|
|
signature_method=oauth1.SIG_HMAC,
|
|
callback_uri="oob")
|
|
headers = {'requested_project_id': project_id}
|
|
if not base_url:
|
|
base_url = self.base_url
|
|
url, headers, body = client.sign(base_url + endpoint,
|
|
http_method='POST',
|
|
headers=headers)
|
|
return endpoint, headers
|
|
|
|
def _create_access_token(self, consumer, token, base_url=None):
|
|
endpoint = '/OS-OAUTH1/access_token'
|
|
client = oauth1.Client(consumer['key'],
|
|
client_secret=consumer['secret'],
|
|
resource_owner_key=token.key,
|
|
resource_owner_secret=token.secret,
|
|
signature_method=oauth1.SIG_HMAC,
|
|
verifier=token.verifier)
|
|
if not base_url:
|
|
base_url = self.base_url
|
|
url, headers, body = client.sign(base_url + endpoint,
|
|
http_method='POST')
|
|
headers.update({'Content-Type': 'application/json'})
|
|
return endpoint, headers
|
|
|
|
def _get_oauth_token(self, consumer, token):
|
|
client = oauth1.Client(consumer['key'],
|
|
client_secret=consumer['secret'],
|
|
resource_owner_key=token.key,
|
|
resource_owner_secret=token.secret,
|
|
signature_method=oauth1.SIG_HMAC)
|
|
endpoint = '/auth/tokens'
|
|
url, headers, body = client.sign(self.base_url + endpoint,
|
|
http_method='POST')
|
|
headers.update({'Content-Type': 'application/json'})
|
|
ref = {'auth': {'identity': {'oauth1': {}, 'methods': ['oauth1']}}}
|
|
return endpoint, headers, ref
|
|
|
|
def _authorize_request_token(self, request_id):
|
|
if isinstance(request_id, bytes):
|
|
request_id = request_id.decode()
|
|
return '/OS-OAUTH1/authorize/%s' % (request_id)
|
|
|
|
def _get_access_token(self):
|
|
consumer = self._create_single_consumer()
|
|
consumer_id = consumer['id']
|
|
consumer_secret = consumer['secret']
|
|
consumer = {'key': consumer_id, 'secret': consumer_secret}
|
|
|
|
url, headers = self._create_request_token(consumer, self.project_id)
|
|
content = self.post(
|
|
url, headers=headers,
|
|
response_content_type='application/x-www-form-urlencoded')
|
|
credentials = self._urllib_parse_qs_text_keys(content.result)
|
|
request_key = credentials['oauth_token'][0]
|
|
request_secret = credentials['oauth_token_secret'][0]
|
|
request_token = oauth1.Token(request_key, request_secret)
|
|
|
|
url = self._authorize_request_token(request_key)
|
|
body = {'roles': [{'id': self.role_id}]}
|
|
resp = self.put(url, body=body, expected_status=http.client.OK)
|
|
verifier = resp.result['token']['oauth_verifier']
|
|
|
|
request_token.set_verifier(verifier)
|
|
url, headers = self._create_access_token(consumer, request_token)
|
|
content = self.post(
|
|
url, headers=headers,
|
|
response_content_type='application/x-www-form-urlencoded')
|
|
credentials = self._urllib_parse_qs_text_keys(content.result)
|
|
access_key = credentials['oauth_token'][0]
|
|
access_secret = credentials['oauth_token_secret'][0]
|
|
access_token = oauth1.Token(access_key, access_secret)
|
|
|
|
url, headers, body = self._get_oauth_token(consumer, access_token)
|
|
content = self.post(url, headers=headers, body=body)
|
|
return access_key, content.headers['X-Subject-Token']
|
|
|
|
def test_access_token_ec2_credential(self):
|
|
"""Test creating ec2 credential from an oauth access token.
|
|
|
|
Call ``POST /credentials``.
|
|
"""
|
|
access_key, token_id = self._get_access_token()
|
|
|
|
# Create the credential with the access token
|
|
blob, ref = unit.new_ec2_credential(user_id=self.user_id,
|
|
project_id=self.project_id)
|
|
r = self.post('/credentials', body={'credential': ref}, token=token_id)
|
|
|
|
# We expect the response blob to contain the access_token_id
|
|
ret_ref = ref.copy()
|
|
ret_blob = blob.copy()
|
|
ret_blob['access_token_id'] = access_key.decode('utf-8')
|
|
ret_ref['blob'] = json.dumps(ret_blob)
|
|
self.assertValidCredentialResponse(r, ref=ret_ref)
|
|
|
|
# Assert credential id is same as hash of access key id for
|
|
# ec2 credentials
|
|
access = blob['access'].encode('utf-8')
|
|
self.assertEqual(hashlib.sha256(access).hexdigest(),
|
|
r.result['credential']['id'])
|
|
|
|
# Create a role assignment to ensure that it is ignored and only the
|
|
# roles in the access token are used
|
|
role = unit.new_role_ref(name='reader')
|
|
role_id = role['id']
|
|
PROVIDERS.role_api.create_role(role_id, role)
|
|
PROVIDERS.assignment_api.add_role_to_user_and_project(
|
|
self.user_id, self.project_id, role_id)
|
|
|
|
ret_blob = json.loads(r.result['credential']['blob'])
|
|
ec2token = self._test_get_token(
|
|
access=ret_blob['access'], secret=ret_blob['secret'])
|
|
ec2_roles = [role['id'] for role in ec2token['roles']]
|
|
self.assertIn(self.role_id, ec2_roles)
|
|
self.assertNotIn(role_id, ec2_roles)
|
|
|
|
|
|
class TestCredentialEc2(CredentialBaseTestCase):
|
|
"""Test v3 credential compatibility with ec2tokens."""
|
|
|
|
def test_ec2_credential_signature_validate(self):
|
|
"""Test signature validation with a v3 ec2 credential."""
|
|
blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
|
|
project_id=self.project_id)
|
|
r = self.post('/credentials', body={'credential': ref})
|
|
self.assertValidCredentialResponse(r, ref)
|
|
# Assert credential id is same as hash of access key id
|
|
access = blob['access'].encode('utf-8')
|
|
self.assertEqual(hashlib.sha256(access).hexdigest(),
|
|
r.result['credential']['id'])
|
|
|
|
cred_blob = json.loads(r.result['credential']['blob'])
|
|
self.assertEqual(blob, cred_blob)
|
|
self._test_get_token(access=cred_blob['access'],
|
|
secret=cred_blob['secret'])
|
|
|
|
def test_ec2_credential_signature_validate_legacy(self):
|
|
"""Test signature validation with a legacy v3 ec2 credential."""
|
|
cred_json, _ = self._create_dict_blob_credential()
|
|
cred_blob = json.loads(cred_json)
|
|
self._test_get_token(access=cred_blob['access'],
|
|
secret=cred_blob['secret'])
|
|
|
|
def _get_ec2_cred_uri(self):
|
|
return '/users/%s/credentials/OS-EC2' % self.user_id
|
|
|
|
def _get_ec2_cred(self):
|
|
uri = self._get_ec2_cred_uri()
|
|
r = self.post(uri, body={'tenant_id': self.project_id})
|
|
return r.result['credential']
|
|
|
|
def test_ec2_create_credential(self):
|
|
"""Test ec2 credential creation."""
|
|
ec2_cred = self._get_ec2_cred()
|
|
self.assertEqual(self.user_id, ec2_cred['user_id'])
|
|
self.assertEqual(self.project_id, ec2_cred['tenant_id'])
|
|
self.assertIsNone(ec2_cred['trust_id'])
|
|
self._test_get_token(access=ec2_cred['access'],
|
|
secret=ec2_cred['secret'])
|
|
uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
|
|
self.assertThat(ec2_cred['links']['self'],
|
|
matchers.EndsWith(uri))
|
|
|
|
def test_ec2_get_credential(self):
|
|
ec2_cred = self._get_ec2_cred()
|
|
uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
|
|
r = self.get(uri)
|
|
self.assertDictEqual(ec2_cred, r.result['credential'])
|
|
self.assertThat(ec2_cred['links']['self'],
|
|
matchers.EndsWith(uri))
|
|
|
|
def test_ec2_cannot_get_non_ec2_credential(self):
|
|
access_key = uuid.uuid4().hex
|
|
cred_id = utils.hash_access_key(access_key)
|
|
non_ec2_cred = unit.new_credential_ref(
|
|
user_id=self.user_id,
|
|
project_id=self.project_id)
|
|
non_ec2_cred['id'] = cred_id
|
|
PROVIDERS.credential_api.create_credential(cred_id, non_ec2_cred)
|
|
uri = '/'.join([self._get_ec2_cred_uri(), access_key])
|
|
# if access_key is not found, ec2 controller raises Unauthorized
|
|
# exception
|
|
self.get(uri, expected_status=http.client.UNAUTHORIZED)
|
|
|
|
def test_ec2_list_credentials(self):
|
|
"""Test ec2 credential listing."""
|
|
self._get_ec2_cred()
|
|
uri = self._get_ec2_cred_uri()
|
|
r = self.get(uri)
|
|
cred_list = r.result['credentials']
|
|
self.assertEqual(1, len(cred_list))
|
|
self.assertThat(r.result['links']['self'],
|
|
matchers.EndsWith(uri))
|
|
|
|
# non-EC2 credentials won't be fetched
|
|
non_ec2_cred = unit.new_credential_ref(
|
|
user_id=self.user_id,
|
|
project_id=self.project_id)
|
|
non_ec2_cred['type'] = uuid.uuid4().hex
|
|
PROVIDERS.credential_api.create_credential(
|
|
non_ec2_cred['id'], non_ec2_cred
|
|
)
|
|
r = self.get(uri)
|
|
cred_list_2 = r.result['credentials']
|
|
# still one element because non-EC2 credentials are not returned.
|
|
self.assertEqual(1, len(cred_list_2))
|
|
self.assertEqual(cred_list[0], cred_list_2[0])
|
|
|
|
def test_ec2_delete_credential(self):
|
|
"""Test ec2 credential deletion."""
|
|
ec2_cred = self._get_ec2_cred()
|
|
uri = '/'.join([self._get_ec2_cred_uri(), ec2_cred['access']])
|
|
cred_from_credential_api = (
|
|
PROVIDERS.credential_api
|
|
.list_credentials_for_user(self.user_id, type=CRED_TYPE_EC2))
|
|
self.assertEqual(1, len(cred_from_credential_api))
|
|
self.delete(uri)
|
|
self.assertRaises(exception.CredentialNotFound,
|
|
PROVIDERS.credential_api.get_credential,
|
|
cred_from_credential_api[0]['id'])
|