Clean up new_credential_ref usage and surrounding code

Standardize use of unit.new_credential_ref(). Remove methods in preference
for the common function.

Refactor the credential creation code to simplify and standardize the tests.

Change-Id: I4274ea9ae17ae7b8b18dc0c86c9f9496a0803c71
This commit is contained in:
Sean Perry 2015-11-17 17:31:11 -08:00
parent d8d57fc1bf
commit 44d0c2f5a5
8 changed files with 140 additions and 166 deletions

View File

@ -16,6 +16,7 @@ from __future__ import absolute_import
import atexit
import datetime
import functools
import hashlib
import json
import logging
import os
@ -350,22 +351,54 @@ def new_group_ref(domain_id, **kwargs):
return ref
def new_credential_ref(user_id, project_id=None, cred_type=None):
def new_credential_ref(user_id, project_id=None, type='cert', **kwargs):
ref = {
'id': uuid.uuid4().hex,
'user_id': user_id,
'type': type,
}
if cred_type == 'ec2':
ref['type'] = 'ec2'
ref['blob'] = uuid.uuid4().hex
else:
ref['type'] = 'cert'
ref['blob'] = uuid.uuid4().hex
if project_id:
ref['project_id'] = project_id
if 'blob' not in kwargs:
ref['blob'] = uuid.uuid4().hex
ref.update(kwargs)
return ref
def new_cert_credential(user_id, project_id=None, blob=None, **kwargs):
if blob is None:
blob = {'access': uuid.uuid4().hex, 'secret': uuid.uuid4().hex}
credential = new_credential_ref(user_id=user_id,
project_id=project_id,
blob=json.dumps(blob),
type='cert',
**kwargs)
return blob, credential
def new_ec2_credential(user_id, project_id=None, blob=None, **kwargs):
if blob is None:
blob = {
'access': uuid.uuid4().hex,
'secret': uuid.uuid4().hex,
'trust_id': None
}
if 'id' not in kwargs:
access = blob['access'].encode('utf-8')
kwargs['id'] = hashlib.sha256(access).hexdigest()
credential = new_credential_ref(user_id=user_id,
project_id=project_id,
blob=json.dumps(blob),
type='ec2',
**kwargs)
return blob, credential
def new_role_ref(**kwargs):
ref = {
'id': uuid.uuid4().hex,

View File

@ -745,17 +745,10 @@ class SqlModuleInitialization(unit.TestCase):
class SqlCredential(SqlTests):
def _create_credential_with_user_id(self, user_id=uuid.uuid4().hex):
credential_id = uuid.uuid4().hex
new_credential = {
'id': credential_id,
'user_id': user_id,
'project_id': uuid.uuid4().hex,
'blob': uuid.uuid4().hex,
'type': uuid.uuid4().hex,
'extra': uuid.uuid4().hex
}
self.credential_api.create_credential(credential_id, new_credential)
return new_credential
credential = unit.new_credential_ref(user_id=user_id,
extra=uuid.uuid4().hex)
self.credential_api.create_credential(credential['id'], credential)
return credential
def _validateCredentialList(self, retrieved_credentials,
expected_credentials):

View File

@ -34,14 +34,13 @@ class TestCredentialEc2(unit.TestCase):
self.load_fixtures(default_fixtures)
self.user_id = self.user_foo['id']
self.project_id = self.tenant_bar['id']
self.blob = {'access': uuid.uuid4().hex,
'secret': uuid.uuid4().hex}
self.controller = controllers.Ec2Controller()
self.creds_ref = {'user_id': self.user_id,
'tenant_id': self.project_id,
'access': self.blob['access'],
'secret': self.blob['secret'],
'trust_id': None}
self.blob, tmp_ref = unit.new_ec2_credential(
user_id=self.user_id,
project_id=self.project_id)
self.creds_ref = (controllers.Ec2Controller
._convert_v3_to_ec2_credential(tmp_ref))
def test_signature_validate_no_host_port(self):
"""Test signature validation with the access/secret provided."""
@ -149,17 +148,10 @@ class TestCredentialEc2(unit.TestCase):
'path': '/bar',
'params': params}
creds_ref = {'user_id': self.user_id,
'tenant_id': self.project_id,
'access': self.blob['access'],
'secret': self.blob['secret'],
'trust_id': None
}
# Now validate the signature based on the dummy request
self.assertRaises(exception.Unauthorized,
self.controller.check_signature,
creds_ref, sig_ref)
self.creds_ref, sig_ref)
def test_signature_validate_invalid_signature(self):
"""Signature is not signed on the correct data."""
@ -182,17 +174,10 @@ class TestCredentialEc2(unit.TestCase):
'path': '/bar',
'params': params}
creds_ref = {'user_id': self.user_id,
'tenant_id': self.project_id,
'access': self.blob['access'],
'secret': self.blob['secret'],
'trust_id': None
}
# Now validate the signature based on the dummy request
self.assertRaises(exception.Unauthorized,
self.controller.check_signature,
creds_ref, sig_ref)
self.creds_ref, sig_ref)
def test_check_non_admin_user(self):
"""Checking if user is admin causes uncaught error.

View File

@ -246,10 +246,6 @@ class RestfulTestCase(unit.SQLDriverOverrides, rest.RestfulTestCase,
# The server adds 'enabled' and defaults to True.
self.endpoint['enabled'] = True
def new_credential_ref(self, user_id, project_id=None, cred_type=None):
return unit.new_credential_ref(user_id, project_id=project_id,
cred_type=cred_type)
def create_new_default_project_for_user(self, user_id, domain_id,
enable_project=True):
ref = unit.new_project_ref(domain_id=domain_id, enabled=enable_project)

View File

@ -31,25 +31,19 @@ CONF = cfg.CONF
class CredentialBaseTestCase(test_v3.RestfulTestCase):
def _create_dict_blob_credential(self):
blob = {"access": uuid.uuid4().hex,
"secret": uuid.uuid4().hex}
credential_id = hashlib.sha256(blob['access']).hexdigest()
credential = self.new_credential_ref(
user_id=self.user['id'],
project_id=self.project_id)
credential['id'] = credential_id
blob, credential = unit.new_ec2_credential(user_id=self.user['id'],
project_id=self.project_id)
# Store the blob as a dict *not* JSON ref bug #1259584
# This means we can test the dict->json workaround, added
# as part of the bugfix for backwards compatibility works.
credential['blob'] = blob
credential['type'] = 'ec2'
credential_id = credential['id']
# Create direct via the DB API to avoid validation failure
self.credential_api.create_credential(
credential_id,
credential)
expected_blob = json.dumps(blob)
return expected_blob, credential_id
self.credential_api.create_credential(credential_id, credential)
return json.dumps(blob), credential_id
class CredentialTestCase(CredentialBaseTestCase):
@ -59,13 +53,11 @@ class CredentialTestCase(CredentialBaseTestCase):
super(CredentialTestCase, self).setUp()
self.credential_id = uuid.uuid4().hex
self.credential = self.new_credential_ref(
user_id=self.user['id'],
project_id=self.project_id)
self.credential['id'] = self.credential_id
self.credential = unit.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id)
self.credential_api.create_credential(
self.credential_id,
self.credential['id'],
self.credential)
def test_credential_api_delete_credentials_for_project(self):
@ -74,7 +66,7 @@ class CredentialTestCase(CredentialBaseTestCase):
# once we delete all credentials for self.project_id
self.assertRaises(exception.CredentialNotFound,
self.credential_api.get_credential,
credential_id=self.credential_id)
credential_id=self.credential['id'])
def test_credential_api_delete_credentials_for_user(self):
self.credential_api.delete_credentials_for_user(self.user_id)
@ -82,7 +74,7 @@ class CredentialTestCase(CredentialBaseTestCase):
# once we delete all credentials for self.user_id
self.assertRaises(exception.CredentialNotFound,
self.credential_api.get_credential,
credential_id=self.credential_id)
credential_id=self.credential['id'])
def test_list_credentials(self):
"""Call ``GET /credentials``."""
@ -91,10 +83,8 @@ class CredentialTestCase(CredentialBaseTestCase):
def test_list_credentials_filtered_by_user_id(self):
"""Call ``GET /credentials?user_id={user_id}``."""
credential = self.new_credential_ref(
user_id=uuid.uuid4().hex)
self.credential_api.create_credential(
credential['id'], credential)
credential = unit.new_credential_ref(user_id=uuid.uuid4().hex)
self.credential_api.create_credential(credential['id'], credential)
r = self.get('/credentials?user_id=%s' % self.user['id'])
self.assertValidCredentialListResponse(r, ref=self.credential)
@ -105,9 +95,9 @@ class CredentialTestCase(CredentialBaseTestCase):
"""Call ``GET /credentials?type={type}``."""
# The type ec2 was chosen, instead of a random string,
# because the type must be in the list of supported types
ec2_credential = self.new_credential_ref(user_id=uuid.uuid4().hex,
ec2_credential = unit.new_credential_ref(user_id=uuid.uuid4().hex,
project_id=self.project_id,
cred_type='ec2')
type='ec2')
ec2_resp = self.credential_api.create_credential(
ec2_credential['id'], ec2_credential)
@ -134,12 +124,10 @@ class CredentialTestCase(CredentialBaseTestCase):
user2_id = uuid.uuid4().hex
# Creating credentials for two different users
credential_user1_ec2 = self.new_credential_ref(
user_id=user1_id, cred_type='ec2')
credential_user1_cert = self.new_credential_ref(
user_id=user1_id)
credential_user2_cert = self.new_credential_ref(
user_id=user2_id)
credential_user1_ec2 = unit.new_credential_ref(user_id=user1_id,
type='ec2')
credential_user1_cert = unit.new_credential_ref(user_id=user1_id)
credential_user2_cert = unit.new_credential_ref(user_id=user2_id)
self.credential_api.create_credential(
credential_user1_ec2['id'], credential_user1_ec2)
@ -157,7 +145,7 @@ class CredentialTestCase(CredentialBaseTestCase):
def test_create_credential(self):
"""Call ``POST /credentials``."""
ref = self.new_credential_ref(user_id=self.user['id'])
ref = unit.new_credential_ref(user_id=self.user['id'])
r = self.post(
'/credentials',
body={'credential': ref})
@ -167,18 +155,17 @@ class CredentialTestCase(CredentialBaseTestCase):
"""Call ``GET /credentials/{credential_id}``."""
r = self.get(
'/credentials/%(credential_id)s' % {
'credential_id': self.credential_id})
'credential_id': self.credential['id']})
self.assertValidCredentialResponse(r, self.credential)
def test_update_credential(self):
"""Call ``PATCH /credentials/{credential_id}``."""
ref = self.new_credential_ref(
user_id=self.user['id'],
project_id=self.project_id)
ref = unit.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id)
del ref['id']
r = self.patch(
'/credentials/%(credential_id)s' % {
'credential_id': self.credential_id},
'credential_id': self.credential['id']},
body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
@ -186,23 +173,18 @@ class CredentialTestCase(CredentialBaseTestCase):
"""Call ``DELETE /credentials/{credential_id}``."""
self.delete(
'/credentials/%(credential_id)s' % {
'credential_id': self.credential_id})
'credential_id': self.credential['id']})
def test_create_ec2_credential(self):
"""Call ``POST /credentials`` for creating ec2 credential."""
ref = self.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id)
blob = {"access": uuid.uuid4().hex,
"secret": uuid.uuid4().hex}
ref['blob'] = json.dumps(blob)
ref['type'] = 'ec2'
r = self.post(
'/credentials',
body={'credential': ref})
blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
project_id=self.project_id)
r = self.post('/credentials', body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
# Assert credential id is same as hash of access key id for
# ec2 credentials
self.assertEqual(hashlib.sha256(blob['access']).hexdigest(),
access = blob['access'].encode('utf-8')
self.assertEqual(hashlib.sha256(access).hexdigest(),
r.result['credential']['id'])
# Create second ec2 credential with the same access key id and check
# for conflict.
@ -217,7 +199,11 @@ class CredentialTestCase(CredentialBaseTestCase):
r = self.get(
'/credentials/%(credential_id)s' % {
'credential_id': credential_id})
self.assertEqual(expected_blob, r.result['credential']['blob'])
# use json.loads to transform the blobs back into Python dictionaries
# to avoid problems with the keys being in different orders.
self.assertEqual(json.loads(expected_blob),
json.loads(r.result['credential']['blob']))
def test_list_ec2_dict_blob(self):
"""Ensure non-JSON blob data is correctly converted."""
@ -227,34 +213,31 @@ class CredentialTestCase(CredentialBaseTestCase):
list_creds = list_r.result['credentials']
list_ids = [r['id'] for r in list_creds]
self.assertIn(credential_id, list_ids)
# use json.loads to transform the blobs back into Python dictionaries
# to avoid problems with the keys being in different orders.
for r in list_creds:
if r['id'] == credential_id:
self.assertEqual(expected_blob, r['blob'])
self.assertEqual(json.loads(expected_blob),
json.loads(r['blob']))
def test_create_non_ec2_credential(self):
"""Call ``POST /credentials`` for creating non-ec2 credential."""
ref = self.new_credential_ref(user_id=self.user['id'])
blob = {"access": uuid.uuid4().hex,
"secret": uuid.uuid4().hex}
ref['blob'] = json.dumps(blob)
r = self.post(
'/credentials',
body={'credential': ref})
blob, ref = unit.new_cert_credential(user_id=self.user['id'])
r = self.post('/credentials', body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
# Assert credential id is not same as hash of access key id for
# non-ec2 credentials
self.assertNotEqual(hashlib.sha256(blob['access']).hexdigest(),
access = blob['access'].encode('utf-8')
self.assertNotEqual(hashlib.sha256(access).hexdigest(),
r.result['credential']['id'])
def test_create_ec2_credential_with_missing_project_id(self):
"""Call ``POST /credentials`` for creating ec2 credential with missing
project_id.
"""
ref = self.new_credential_ref(user_id=self.user['id'])
blob = {"access": uuid.uuid4().hex,
"secret": uuid.uuid4().hex}
ref['blob'] = json.dumps(blob)
ref['type'] = 'ec2'
_, ref = unit.new_ec2_credential(user_id=self.user['id'],
project_id=None)
# Assert bad request status when missing project_id
self.post(
'/credentials',
@ -264,10 +247,10 @@ class CredentialTestCase(CredentialBaseTestCase):
"""Call ``POST /credentials`` for creating ec2 credential with invalid
blob.
"""
ref = self.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id)
ref['blob'] = '{"abc":"def"d}'
ref['type'] = 'ec2'
ref = unit.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id,
blob='{"abc":"def"d}',
type='ec2')
# Assert bad request status when request contains invalid blob
response = self.post(
'/credentials',
@ -276,7 +259,7 @@ class CredentialTestCase(CredentialBaseTestCase):
def test_create_credential_with_admin_token(self):
# Make sure we can create credential with the static admin token
ref = self.new_credential_ref(user_id=self.user['id'])
ref = unit.new_credential_ref(user_id=self.user['id'])
r = self.post(
'/credentials',
body={'credential': ref},
@ -325,16 +308,9 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase):
token_id = r.headers.get('X-Subject-Token')
# Create the credential with the trust scoped token
ref = self.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id)
blob = {"access": uuid.uuid4().hex,
"secret": uuid.uuid4().hex}
ref['blob'] = json.dumps(blob)
ref['type'] = 'ec2'
r = self.post(
'/credentials',
body={'credential': ref},
token=token_id)
blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
project_id=self.project_id)
r = self.post('/credentials', body={'credential': ref}, token=token_id)
# We expect the response blob to contain the trust_id
ret_ref = ref.copy()
@ -345,7 +321,8 @@ class TestCredentialTrustScoped(test_v3.RestfulTestCase):
# Assert credential id is same as hash of access key id for
# ec2 credentials
self.assertEqual(hashlib.sha256(blob['access']).hexdigest(),
access = blob['access'].encode('utf-8')
self.assertEqual(hashlib.sha256(access).hexdigest(),
r.result['credential']['id'])
# Create second ec2 credential with the same access key id and check
@ -391,19 +368,13 @@ class TestCredentialEc2(CredentialBaseTestCase):
def test_ec2_credential_signature_validate(self):
"""Test signature validation with a v3 ec2 credential."""
ref = self.new_credential_ref(
user_id=self.user['id'],
project_id=self.project_id)
blob = {"access": uuid.uuid4().hex,
"secret": uuid.uuid4().hex}
ref['blob'] = json.dumps(blob)
ref['type'] = 'ec2'
r = self.post(
'/credentials',
body={'credential': ref})
blob, ref = unit.new_ec2_credential(user_id=self.user['id'],
project_id=self.project_id)
r = self.post('/credentials', body={'credential': ref})
self.assertValidCredentialResponse(r, ref)
# Assert credential id is same as hash of access key id
self.assertEqual(hashlib.sha256(blob['access']).hexdigest(),
access = blob['access'].encode('utf-8')
self.assertEqual(hashlib.sha256(access).hexdigest(),
r.result['credential']['id'])
cred_blob = json.loads(r.result['credential']['blob'])
@ -413,7 +384,7 @@ class TestCredentialEc2(CredentialBaseTestCase):
def test_ec2_credential_signature_validate_legacy(self):
"""Test signature validation with a legacy v3 ec2 credential."""
cred_json, credential_id = self._create_dict_blob_credential()
cred_json, _ = self._create_dict_blob_credential()
cred_blob = json.loads(cred_json)
self._validate_signature(access=cred_blob['access'],
secret=cred_blob['secret'])

View File

@ -40,14 +40,12 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.group = self.identity_api.create_group(self.group)
self.group_id = self.group['id']
self.credential_id = uuid.uuid4().hex
self.credential = self.new_credential_ref(
self.credential = unit.new_credential_ref(
user_id=self.user['id'],
project_id=self.project_id)
self.credential['id'] = self.credential_id
self.credential_api.create_credential(
self.credential_id,
self.credential)
self.credential_api.create_credential(self.credential['id'],
self.credential)
# user crud tests
@ -336,15 +334,14 @@ class IdentityTestCase(test_v3.RestfulTestCase):
r = self.credential_api.get_credential(self.credential['id'])
self.assertDictEqual(self.credential, r)
# Create a second credential with a different user
user2 = unit.new_user_ref(domain_id=self.domain['id'],
project_id=self.project['id'])
user2 = self.identity_api.create_user(user2)
self.credential2 = self.new_credential_ref(
user_id=user2['id'],
project_id=self.project['id'])
self.credential_api.create_credential(
self.credential2['id'],
self.credential2)
credential2 = unit.new_credential_ref(user_id=user2['id'],
project_id=self.project['id'])
self.credential_api.create_credential(credential2['id'], credential2)
# Create a token for this user which we can check later
# gets deleted
auth_data = self.build_authentication_request(
@ -371,8 +368,8 @@ class IdentityTestCase(test_v3.RestfulTestCase):
self.user['id'])
self.assertEqual(0, len(tokens))
# But the credential for user2 is unaffected
r = self.credential_api.get_credential(self.credential2['id'])
self.assertDictEqual(self.credential2, r)
r = self.credential_api.get_credential(credential2['id'])
self.assertDictEqual(credential2, r)
# group crud tests

View File

@ -1021,13 +1021,12 @@ class IdentityTestv3CloudPolicySample(test_v3.RestfulTestCase,
self.get(entity_url, auth=self.auth)
def test_list_user_credentials(self):
self.credential_user = self.new_credential_ref(self.just_a_user['id'])
self.credential_api.create_credential(self.credential_user['id'],
self.credential_user)
self.credential_admin = self.new_credential_ref(
self.cloud_admin_user['id'])
self.credential_api.create_credential(self.credential_admin['id'],
self.credential_admin)
credential_user = unit.new_credential_ref(self.just_a_user['id'])
self.credential_api.create_credential(credential_user['id'],
credential_user)
credential_admin = unit.new_credential_ref(self.cloud_admin_user['id'])
self.credential_api.create_credential(credential_admin['id'],
credential_admin)
self.auth = self.build_authentication_request(
user_id=self.just_a_user['id'],

View File

@ -190,7 +190,7 @@ class ResourceTestCase(test_v3.RestfulTestCase,
group = unit.new_group_ref(domain_id=self.domain_id)
group = self.identity_api.create_group(group)
credential = self.new_credential_ref(user_id=self.user['id'],
credential = unit.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id)
self.credential_api.create_credential(credential['id'], credential)
@ -208,9 +208,10 @@ class ResourceTestCase(test_v3.RestfulTestCase,
group2 = unit.new_group_ref(domain_id=domain2['id'])
group2 = self.identity_api.create_group(group2)
credential2 = self.new_credential_ref(user_id=user2['id'],
credential2 = unit.new_credential_ref(user_id=user2['id'],
project_id=project2['id'])
self.credential_api.create_credential(credential2['id'], credential2)
self.credential_api.create_credential(credential2['id'],
credential2)
# Now disable the new domain and delete it
domain2['enabled'] = False
@ -1023,7 +1024,7 @@ class ResourceTestCase(test_v3.RestfulTestCase,
also deleted, while other credentials are unaffected.
"""
credential = self.new_credential_ref(user_id=self.user['id'],
credential = unit.new_credential_ref(user_id=self.user['id'],
project_id=self.project_id)
self.credential_api.create_credential(credential['id'], credential)
@ -1033,9 +1034,8 @@ class ResourceTestCase(test_v3.RestfulTestCase,
# Create a second credential with a different project
project2 = unit.new_project_ref(domain_id=self.domain['id'])
self.resource_api.create_project(project2['id'], project2)
credential2 = self.new_credential_ref(
user_id=self.user['id'],
project_id=project2['id'])
credential2 = unit.new_credential_ref(user_id=self.user['id'],
project_id=project2['id'])
self.credential_api.create_credential(credential2['id'], credential2)
# Now delete the project