Use testresources for example files

The example files were loaded at import time rather than used as
a test fixture.

Change-Id: I3c1ac4db4b269725bf83904c1568a86b45eb7e55
This commit is contained in:
Brant Knudson
2013-12-03 10:59:03 -06:00
parent ffc699c1ac
commit 7fadf95a14
4 changed files with 409 additions and 336 deletions

View File

@@ -16,7 +16,9 @@
import os
import fixtures
import six
import testresources
from keystoneclient.common import cms
from keystoneclient.openstack.common import jsonutils
@@ -30,279 +32,306 @@ CERTDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'certs')
CMSDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'cms')
KEYDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'private')
# @TODO(mordred) This should become a testresources resource attached to the
# class
# The data for these tests are signed using openssl and are stored in files
# in the signing subdirectory. In order to keep the values consistent between
# the tests and the signed documents, we read them in for use in the tests.
with open(os.path.join(CMSDIR, 'auth_token_scoped.pem')) as f:
SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_unscoped.pem')) as f:
SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_v3_token_scoped.pem')) as f:
SIGNED_v3_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_revoked.pem')) as f:
REVOKED_TOKEN = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_scoped_expired.pem')) as f:
SIGNED_TOKEN_SCOPED_EXPIRED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_v3_token_revoked.pem')) as f:
REVOKED_v3_TOKEN = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'revocation_list.json')) as f:
REVOCATION_LIST = jsonutils.loads(f.read())
with open(os.path.join(CMSDIR, 'revocation_list.pem')) as f:
SIGNED_REVOCATION_LIST = jsonutils.dumps({'signed': f.read()})
SIGNING_CERT_FILE = os.path.join(CERTDIR, 'signing_cert.pem')
with open(SIGNING_CERT_FILE) as f:
SIGNING_CERT = f.read()
class Examples(fixtures.Fixture):
"""Example tokens and certs loaded from the examples directory.
SIGNING_KEY_FILE = os.path.join(KEYDIR, 'signing_key.pem')
with open(SIGNING_KEY_FILE) as f:
SIGNING_KEY = f.read()
To use this class correctly, the module needs to override the test suite
class to use testresources.OptimisingTestSuite (otherwise the files will
be read on every test). This is done by defining a load_tests function
in the module, like this:
SIGNING_CA_FILE = os.path.join(CERTDIR, 'cacert.pem')
with open(SIGNING_CA_FILE) as f:
SIGNING_CA = f.read()
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)
UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
v3_UUID_TOKEN_DEFAULT = '5603457654b346fdbb93437bfe76f2f1'
v3_UUID_TOKEN_UNSCOPED = 'd34835fdaec447e695a0a024d84f8d79'
v3_UUID_TOKEN_DOMAIN_SCOPED = 'e8a7b63aaa4449f38f0c5c05c3581792'
(see http://docs.python.org/2/library/unittest.html#load-tests-protocol )
REVOKED_TOKEN_HASH = utils.hash_signed_token(REVOKED_TOKEN)
REVOKED_TOKEN_LIST = {'revoked': [{'id': REVOKED_TOKEN_HASH,
'expires': timeutils.utcnow()}]}
REVOKED_TOKEN_LIST_JSON = jsonutils.dumps(REVOKED_TOKEN_LIST)
"""
REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(REVOKED_v3_TOKEN)
REVOKED_v3_TOKEN_LIST = {'revoked': [{'id': REVOKED_v3_TOKEN_HASH,
'expires': timeutils.utcnow()}]}
REVOKED_v3_TOKEN_LIST_JSON = jsonutils.dumps(REVOKED_v3_TOKEN_LIST)
def setUp(self):
super(Examples, self).setUp()
SIGNED_TOKEN_SCOPED_KEY = cms.cms_hash_token(SIGNED_TOKEN_SCOPED)
SIGNED_TOKEN_UNSCOPED_KEY = cms.cms_hash_token(SIGNED_TOKEN_UNSCOPED)
SIGNED_v3_TOKEN_SCOPED_KEY = cms.cms_hash_token(SIGNED_v3_TOKEN_SCOPED)
# The data for several tests are signed using openssl and are stored in
# files in the signing subdirectory. In order to keep the values
# consistent between the tests and the signed documents, we read them
# in for use in the tests.
INVALID_SIGNED_TOKEN = \
"MIIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" \
"BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" \
"CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC" \
"DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD" \
"EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE" \
"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF" \
"0000000000000000000000000000000000000000000000000000000000000000" \
"1111111111111111111111111111111111111111111111111111111111111111" \
"2222222222222222222222222222222222222222222222222222222222222222" \
"3333333333333333333333333333333333333333333333333333333333333333" \
"4444444444444444444444444444444444444444444444444444444444444444" \
"5555555555555555555555555555555555555555555555555555555555555555" \
"6666666666666666666666666666666666666666666666666666666666666666" \
"7777777777777777777777777777777777777777777777777777777777777777" \
"8888888888888888888888888888888888888888888888888888888888888888" \
"9999999999999999999999999999999999999999999999999999999999999999" \
"0000000000000000000000000000000000000000000000000000000000000000" \
with open(os.path.join(CMSDIR, 'auth_token_scoped.pem')) as f:
self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_unscoped.pem')) as f:
self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_v3_token_scoped.pem')) as f:
self.SIGNED_v3_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_revoked.pem')) as f:
self.REVOKED_TOKEN = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_scoped_expired.pem')) as f:
self.SIGNED_TOKEN_SCOPED_EXPIRED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_v3_token_revoked.pem')) as f:
self.REVOKED_v3_TOKEN = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'revocation_list.json')) as f:
self.REVOCATION_LIST = jsonutils.loads(f.read())
with open(os.path.join(CMSDIR, 'revocation_list.pem')) as f:
self.SIGNED_REVOCATION_LIST = jsonutils.dumps({'signed': f.read()})
self.SIGNING_CERT_FILE = os.path.join(CERTDIR, 'signing_cert.pem')
with open(self.SIGNING_CERT_FILE) as f:
self.SIGNING_CERT = f.read()
# JSON responses keyed by token ID
TOKEN_RESPONSES = {
UUID_TOKEN_DEFAULT: {
'access': {
'token': {
'id': UUID_TOKEN_DEFAULT,
'expires': '2020-01-01T00:00:10.000123Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
self.SIGNING_KEY_FILE = os.path.join(KEYDIR, 'signing_key.pem')
with open(self.SIGNING_KEY_FILE) as f:
self.SIGNING_KEY = f.read()
self.SIGNING_CA_FILE = os.path.join(CERTDIR, 'cacert.pem')
with open(self.SIGNING_CA_FILE) as f:
self.SIGNING_CA = f.read()
self.UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
self.UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
self.UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
self.VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
self.v3_UUID_TOKEN_DEFAULT = '5603457654b346fdbb93437bfe76f2f1'
self.v3_UUID_TOKEN_UNSCOPED = 'd34835fdaec447e695a0a024d84f8d79'
self.v3_UUID_TOKEN_DOMAIN_SCOPED = 'e8a7b63aaa4449f38f0c5c05c3581792'
self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN)
self.REVOKED_TOKEN_LIST = (
{'revoked': [{'id': self.REVOKED_TOKEN_HASH,
'expires': timeutils.utcnow()}]})
self.REVOKED_TOKEN_LIST_JSON = jsonutils.dumps(self.REVOKED_TOKEN_LIST)
self.REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(
self.REVOKED_v3_TOKEN)
self.REVOKED_v3_TOKEN_LIST = (
{'revoked': [{'id': self.REVOKED_v3_TOKEN_HASH,
'expires': timeutils.utcnow()}]})
self.REVOKED_v3_TOKEN_LIST_JSON = jsonutils.dumps(
self.REVOKED_v3_TOKEN_LIST)
self.SIGNED_TOKEN_SCOPED_KEY = cms.cms_hash_token(
self.SIGNED_TOKEN_SCOPED)
self.SIGNED_TOKEN_UNSCOPED_KEY = cms.cms_hash_token(
self.SIGNED_TOKEN_UNSCOPED)
self.SIGNED_v3_TOKEN_SCOPED_KEY = cms.cms_hash_token(
self.SIGNED_v3_TOKEN_SCOPED)
self.INVALID_SIGNED_TOKEN = (
"MIIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
"BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
"CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"
"DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD"
"EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE"
"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
"0000000000000000000000000000000000000000000000000000000000000000"
"1111111111111111111111111111111111111111111111111111111111111111"
"2222222222222222222222222222222222222222222222222222222222222222"
"3333333333333333333333333333333333333333333333333333333333333333"
"4444444444444444444444444444444444444444444444444444444444444444"
"5555555555555555555555555555555555555555555555555555555555555555"
"6666666666666666666666666666666666666666666666666666666666666666"
"7777777777777777777777777777777777777777777777777777777777777777"
"8888888888888888888888888888888888888888888888888888888888888888"
"9999999999999999999999999999999999999999999999999999999999999999"
"0000000000000000000000000000000000000000000000000000000000000000")
# JSON responses keyed by token ID
self.TOKEN_RESPONSES = {
self.UUID_TOKEN_DEFAULT: {
'access': {
'token': {
'id': self.UUID_TOKEN_DEFAULT,
'expires': '2020-01-01T00:00:10.000123Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
'serviceCatalog': {}
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
'serviceCatalog': {}
},
},
VALID_DIABLO_TOKEN: {
'access': {
'token': {
'id': VALID_DIABLO_TOKEN,
'expires': '2020-01-01T00:00:10.000123Z',
'tenantId': 'tenant_id1',
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
UUID_TOKEN_UNSCOPED: {
'access': {
'token': {
'id': UUID_TOKEN_UNSCOPED,
'expires': '2020-01-01T00:00:10.000123Z',
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
UUID_TOKEN_NO_SERVICE_CATALOG: {
'access': {
'token': {
'id': 'valid-token',
'expires': '2020-01-01T00:00:10.000123Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
self.VALID_DIABLO_TOKEN: {
'access': {
'token': {
'id': self.VALID_DIABLO_TOKEN,
'expires': '2020-01-01T00:00:10.000123Z',
'tenantId': 'tenant_id1',
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
}
},
},
v3_UUID_TOKEN_DEFAULT: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
self.UUID_TOKEN_UNSCOPED: {
'access': {
'token': {
'id': self.UUID_TOKEN_UNSCOPED,
'expires': '2020-01-01T00:00:10.000123Z',
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
self.UUID_TOKEN_NO_SERVICE_CATALOG: {
'access': {
'token': {
'id': 'valid-token',
'expires': '2020-01-01T00:00:10.000123Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
}
},
},
self.v3_UUID_TOKEN_DEFAULT: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
self.v3_UUID_TOKEN_UNSCOPED: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
}
}
},
self.v3_UUID_TOKEN_DOMAIN_SCOPED: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'domain': {
'id': 'domain_id1',
'name': 'domain_name1',
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
},
self.SIGNED_TOKEN_SCOPED_KEY: {
'access': {
'token': {
'id': self.SIGNED_TOKEN_SCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'tenantId': 'tenant_id1',
'tenantName': 'tenant_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
self.SIGNED_TOKEN_UNSCOPED_KEY: {
'access': {
'token': {
'id': self.SIGNED_TOKEN_UNSCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
self.SIGNED_v3_TOKEN_SCOPED_KEY: {
'token': {
'expires': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1'},
{'name': 'role2'}
],
'catalog': {}
}
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
},
v3_UUID_TOKEN_UNSCOPED: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
}
}
},
v3_UUID_TOKEN_DOMAIN_SCOPED: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'domain': {
'id': 'domain_id1',
'name': 'domain_name1',
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
},
SIGNED_TOKEN_SCOPED_KEY: {
'access': {
'token': {
'id': SIGNED_TOKEN_SCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'tenantId': 'tenant_id1',
'tenantName': 'tenant_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
SIGNED_TOKEN_UNSCOPED_KEY: {
'access': {
'token': {
'id': SIGNED_TOKEN_UNSCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
SIGNED_v3_TOKEN_SCOPED_KEY: {
'token': {
'expires': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1'},
{'name': 'role2'}
],
'catalog': {}
}
},
}
self.JSON_TOKEN_RESPONSES = dict([(k, jsonutils.dumps(v)) for k, v in
six.iteritems(self.TOKEN_RESPONSES)])
JSON_TOKEN_RESPONSES = dict([(k, jsonutils.dumps(v)) for k, v in
six.iteritems(TOKEN_RESPONSES)])
EXAMPLES_RESOURCE = testresources.FixtureResource(Examples())

View File

@@ -29,6 +29,7 @@ import uuid
import fixtures
import httpretty
import mock
import testresources
import webob
from keystoneclient.common import cms
@@ -320,7 +321,10 @@ if tuple(sys.version_info)[0:2] < (2, 7):
BaseAuthTokenMiddlewareTest = AdjustedBaseAuthTokenMiddlewareTest
class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
@httpretty.activate
def test_fetch_revocation_list_with_expire(self):
@@ -333,20 +337,24 @@ class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
responses = [httpretty.Response(body='', status=401),
httpretty.Response(
body=client_fixtures.SIGNED_REVOCATION_LIST)]
body=self.examples.SIGNED_REVOCATION_LIST)]
httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/revoked" % BASE_URI,
responses=responses)
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST)
self.assertEqual(fetched_list, self.examples.REVOCATION_LIST)
# Check that 4 requests have been made
self.assertEqual(len(httpretty.httpretty.latest_requests), 4)
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
"""Auth Token middleware should understand Diablo keystone responses."""
def setUp(self):
# pre-diablo only had Tenant ID, which was also the Name
@@ -372,8 +380,8 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
"%s/v2.0/tokens" % BASE_URI,
body=FAKE_ADMIN_TOKEN)
self.token_id = client_fixtures.VALID_DIABLO_TOKEN
token_response = client_fixtures.JSON_TOKEN_RESPONSES[self.token_id]
self.token_id = self.examples.VALID_DIABLO_TOKEN
token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id]
httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/%s" % (BASE_URI, self.token_id),
@@ -507,7 +515,7 @@ class CommonAuthTokenMiddlewareTest(object):
tmp_name = uuid.uuid4().hex
test_parent_signing_dir = "/tmp/%s" % tmp_name
self.middleware.signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
self.middleware.signing_cert_file_name = "%s/test.pem" %\
self.middleware.signing_cert_file_name = "%s/test.pem" % \
self.middleware.signing_dirname
self.middleware.verify_signing_dir()
# NOTE(wu_wenxiang): Verify if the signing dir was created as expected.
@@ -546,7 +554,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_get_token_revocation_list_fetched_time_returns_utc(self):
with TimezoneFixture('UTC-1'):
self.middleware.token_revocation_list = jsonutils.dumps(
client_fixtures.REVOCATION_LIST)
self.examples.REVOCATION_LIST)
self.middleware.token_revocation_list_fetched_time = None
fetched_time = self.middleware.token_revocation_list_fetched_time
self.assertTrue(timeutils.is_soon(fetched_time, 1))
@@ -562,7 +570,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.middleware.token_revocation_list_fetched_time = None
os.remove(self.middleware.revoked_file_name)
self.assertEqual(self.middleware.token_revocation_list,
client_fixtures.REVOCATION_LIST)
self.examples.REVOCATION_LIST)
def test_get_revocation_list_returns_current_list_from_memory(self):
self.assertEqual(self.middleware.token_revocation_list,
@@ -586,7 +594,7 @@ class CommonAuthTokenMiddlewareTest(object):
# auth_token uses v2 to fetch this, so don't allow the v3
# tests to override the fake http connection
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST)
self.assertEqual(fetched_list, self.examples.REVOCATION_LIST)
def test_request_invalid_uuid_token(self):
# remember because we are testing the middleware we stub the connection
@@ -603,7 +611,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_request_invalid_signed_token(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = client_fixtures.INVALID_SIGNED_TOKEN
req.headers['X-Auth-Token'] = self.examples.INVALID_SIGNED_TOKEN
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
@@ -747,23 +755,23 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertFalse(auth_token.will_expire_soon(fortyseconds))
def test_token_is_v2_accepts_v2(self):
token = client_fixtures.UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token]
token = self.examples.UUID_TOKEN_DEFAULT
token_response = self.examples.TOKEN_RESPONSES[token]
self.assertTrue(auth_token._token_is_v2(token_response))
def test_token_is_v2_rejects_v3(self):
token = client_fixtures.v3_UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token]
token = self.examples.v3_UUID_TOKEN_DEFAULT
token_response = self.examples.TOKEN_RESPONSES[token]
self.assertFalse(auth_token._token_is_v2(token_response))
def test_token_is_v3_rejects_v2(self):
token = client_fixtures.UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token]
token = self.examples.UUID_TOKEN_DEFAULT
token_response = self.examples.TOKEN_RESPONSES[token]
self.assertFalse(auth_token._token_is_v3(token_response))
def test_token_is_v3_accepts_v3(self):
token = client_fixtures.v3_UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token]
token = self.examples.v3_UUID_TOKEN_DEFAULT
token_response = self.examples.TOKEN_RESPONSES[token]
self.assertTrue(auth_token._token_is_v3(token_response))
def test_encrypt_cache_data(self):
@@ -895,7 +903,11 @@ class CommonAuthTokenMiddlewareTest(object):
with_catalog=False)
class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest):
class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
def setUp(self):
super(CertDownloadMiddlewareTest, self).setUp()
self.base_dir = tempfile.mkdtemp()
@@ -927,7 +939,7 @@ class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest):
status=404)
self.assertRaises(exceptions.CertificateConfigError,
self.middleware.verify_signed_token,
client_fixtures.SIGNED_TOKEN_SCOPED)
self.examples.SIGNED_TOKEN_SCOPED)
def test_fetch_signing_cert(self):
data = 'FAKE CERT'
@@ -1005,7 +1017,8 @@ def network_error_response(method, uri, headers):
class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
CommonAuthTokenMiddlewareTest):
CommonAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
"""v2 token specific tests.
There are some differences between how the auth-token middleware handles
@@ -1022,17 +1035,19 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
"""
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
def setUp(self):
super(v2AuthTokenMiddlewareTest, self).setUp()
self.token_dict = {
'uuid_token_default': client_fixtures.UUID_TOKEN_DEFAULT,
'uuid_token_unscoped': client_fixtures.UUID_TOKEN_UNSCOPED,
'signed_token_scoped': client_fixtures.SIGNED_TOKEN_SCOPED,
'uuid_token_default': self.examples.UUID_TOKEN_DEFAULT,
'uuid_token_unscoped': self.examples.UUID_TOKEN_UNSCOPED,
'signed_token_scoped': self.examples.SIGNED_TOKEN_SCOPED,
'signed_token_scoped_expired':
client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED,
'revoked_token': client_fixtures.REVOKED_TOKEN,
'revoked_token_hash': client_fixtures.REVOKED_TOKEN_HASH
self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
'revoked_token': self.examples.REVOKED_TOKEN,
'revoked_token_hash': self.examples.REVOKED_TOKEN_HASH
}
httpretty.httpretty.reset()
@@ -1049,16 +1064,16 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/revoked" % BASE_URI,
body=client_fixtures.SIGNED_REVOCATION_LIST,
body=self.examples.SIGNED_REVOCATION_LIST,
status=200)
for token in (client_fixtures.UUID_TOKEN_DEFAULT,
client_fixtures.UUID_TOKEN_UNSCOPED,
client_fixtures.UUID_TOKEN_NO_SERVICE_CATALOG):
for token in (self.examples.UUID_TOKEN_DEFAULT,
self.examples.UUID_TOKEN_UNSCOPED,
self.examples.UUID_TOKEN_NO_SERVICE_CATALOG):
httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/%s" % (BASE_URI, token),
body=
client_fixtures.JSON_TOKEN_RESPONSES[token])
self.examples.JSON_TOKEN_RESPONSES[token])
httpretty.register_uri(httpretty.GET,
'%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN),
@@ -1088,11 +1103,11 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_default_tenant_uuid_token(self):
self.assert_unscoped_default_tenant_auto_scopes(
client_fixtures.UUID_TOKEN_DEFAULT)
self.examples.UUID_TOKEN_DEFAULT)
def test_default_tenant_signed_token(self):
self.assert_unscoped_default_tenant_auto_scopes(
client_fixtures.SIGNED_TOKEN_SCOPED)
self.examples.SIGNED_TOKEN_SCOPED)
def assert_unscoped_token_receives_401(self, token):
"""Unscoped requests with no default tenant ID should be rejected."""
@@ -1105,24 +1120,27 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_unscoped_uuid_token_receives_401(self):
self.assert_unscoped_token_receives_401(
client_fixtures.UUID_TOKEN_UNSCOPED)
self.examples.UUID_TOKEN_UNSCOPED)
def test_unscoped_pki_token_receives_401(self):
self.assert_unscoped_token_receives_401(
client_fixtures.SIGNED_TOKEN_UNSCOPED)
self.examples.SIGNED_TOKEN_UNSCOPED)
def test_request_prevent_service_catalog_injection(self):
req = webob.Request.blank('/')
req.headers['X-Service-Catalog'] = '[]'
req.headers['X-Auth-Token'] = \
client_fixtures.UUID_TOKEN_NO_SERVICE_CATALOG
self.examples.UUID_TOKEN_NO_SERVICE_CATALOG
body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
self.assertFalse(req.headers.get('X-Service-Catalog'))
self.assertEqual(body, ['SUCCESS'])
class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
@httpretty.activate
def test_valid_uuid_request_forced_to_2_0(self):
@@ -1149,27 +1167,28 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
"%s/v2.0/tokens" % BASE_URI,
body=FAKE_ADMIN_TOKEN)
token = client_fixtures.UUID_TOKEN_DEFAULT
token = self.examples.UUID_TOKEN_DEFAULT
httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/%s" % (BASE_URI, token),
body=
client_fixtures.JSON_TOKEN_RESPONSES[token])
self.examples.JSON_TOKEN_RESPONSES[token])
self.set_middleware(conf=conf)
# This tests will only work is auth_token has chosen to use the
# lower, v2, api version
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = client_fixtures.UUID_TOKEN_DEFAULT
req.headers['X-Auth-Token'] = self.examples.UUID_TOKEN_DEFAULT
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
self.assertEqual("/testadmin/v2.0/tokens/%s" %
client_fixtures.UUID_TOKEN_DEFAULT,
self.examples.UUID_TOKEN_DEFAULT,
httpretty.httpretty.last_request.path)
class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
CommonAuthTokenMiddlewareTest):
CommonAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
"""Test auth_token middleware with v3 tokens.
Re-execute the AuthTokenMiddlewareTest class tests, but with the
@@ -1193,19 +1212,22 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
the highest available auth version, i.e. v3.0
"""
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
def setUp(self):
super(v3AuthTokenMiddlewareTest, self).setUp(
auth_version='v3.0',
fake_app=v3FakeApp)
self.token_dict = {
'uuid_token_default': client_fixtures.v3_UUID_TOKEN_DEFAULT,
'uuid_token_unscoped': client_fixtures.v3_UUID_TOKEN_UNSCOPED,
'signed_token_scoped': client_fixtures.SIGNED_v3_TOKEN_SCOPED,
'uuid_token_default': self.examples.v3_UUID_TOKEN_DEFAULT,
'uuid_token_unscoped': self.examples.v3_UUID_TOKEN_UNSCOPED,
'signed_token_scoped': self.examples.SIGNED_v3_TOKEN_SCOPED,
'signed_token_scoped_expired':
client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED,
'revoked_token': client_fixtures.REVOKED_v3_TOKEN,
'revoked_token_hash': client_fixtures.REVOKED_v3_TOKEN_HASH
self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
'revoked_token': self.examples.REVOKED_v3_TOKEN,
'revoked_token_hash': self.examples.REVOKED_v3_TOKEN_HASH
}
httpretty.httpretty.reset()
@@ -1225,7 +1247,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
# TODO(jamielennox): there is no v3 revocation url yet, it uses v2
httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/revoked" % BASE_URI,
body=client_fixtures.SIGNED_REVOCATION_LIST,
body=self.examples.SIGNED_REVOCATION_LIST,
status=200)
httpretty.register_uri(httpretty.GET,
@@ -1251,7 +1273,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
raise auth_token.NetworkError("Network connection error.")
try:
response = client_fixtures.JSON_TOKEN_RESPONSES[token_id]
response = self.examples.JSON_TOKEN_RESPONSES[token_id]
except KeyError:
status = 404
@@ -1274,7 +1296,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'HTTP_X_ROLE': '',
}
self.set_middleware(expected_env=delta_expected_env)
self.assert_valid_request_200(client_fixtures.v3_UUID_TOKEN_UNSCOPED,
self.assert_valid_request_200(self.examples.v3_UUID_TOKEN_UNSCOPED,
with_catalog=False)
self.assertLastPath('/testadmin/v3/auth/tokens')
@@ -1293,7 +1315,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
}
self.set_middleware(expected_env=delta_expected_env)
self.assert_valid_request_200(
client_fixtures.v3_UUID_TOKEN_DOMAIN_SCOPED)
self.examples.v3_UUID_TOKEN_DOMAIN_SCOPED)
self.assertLastPath('/testadmin/v3/auth/tokens')
@@ -1512,3 +1534,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
expires = timeutils.strtime(some_time_earlier) + '-02:00'
self.middleware._cache_put(token, data, expires)
self.assertIsNone(self.middleware._cache_get(token))
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)

View File

@@ -16,6 +16,7 @@ import os
import subprocess
import mock
import testresources
from keystoneclient.common import cms
from keystoneclient import exceptions
@@ -23,10 +24,12 @@ from keystoneclient.tests import client_fixtures
from keystoneclient.tests import utils
class CMSTest(utils.TestCase):
class CMSTest(utils.TestCase, testresources.ResourcedTestCase):
"""Unit tests for the keystoneclient.common.cms module."""
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
def test_cms_verify(self):
self.assertRaises(exceptions.CertificateConfigError,
cms.cms_verify,
@@ -39,33 +42,33 @@ class CMSTest(utils.TestCase):
'auth_token_scoped.pem')) as f:
AUTH_TOKEN_SCOPED_CMS = f.read()
self.assertEqual(cms.token_to_cms(client_fixtures.SIGNED_TOKEN_SCOPED),
self.assertEqual(cms.token_to_cms(self.examples.SIGNED_TOKEN_SCOPED),
AUTH_TOKEN_SCOPED_CMS)
tok = cms.cms_to_token(cms.token_to_cms(
client_fixtures.SIGNED_TOKEN_SCOPED))
self.assertEqual(tok, client_fixtures.SIGNED_TOKEN_SCOPED)
self.examples.SIGNED_TOKEN_SCOPED))
self.assertEqual(tok, self.examples.SIGNED_TOKEN_SCOPED)
def test_ans1_token(self):
self.assertTrue(cms.is_ans1_token(client_fixtures.SIGNED_TOKEN_SCOPED))
self.assertTrue(cms.is_ans1_token(self.examples.SIGNED_TOKEN_SCOPED))
self.assertFalse(cms.is_ans1_token('FOOBAR'))
def test_cms_sign_token_no_files(self):
self.assertRaises(subprocess.CalledProcessError,
cms.cms_sign_token,
client_fixtures.SIGNED_TOKEN_SCOPED,
self.examples.SIGNED_TOKEN_SCOPED,
'/no/such/file', '/no/such/key')
def test_cms_sign_token_success(self):
self.assertTrue(
cms.cms_sign_token(client_fixtures.SIGNED_TOKEN_SCOPED,
client_fixtures.SIGNING_CERT_FILE,
client_fixtures.SIGNING_KEY_FILE))
cms.cms_sign_token(self.examples.SIGNED_TOKEN_SCOPED,
self.examples.SIGNING_CERT_FILE,
self.examples.SIGNING_KEY_FILE))
def test_cms_verify_token_no_files(self):
self.assertRaises(exceptions.CertificateConfigError,
cms.cms_verify,
client_fixtures.SIGNED_TOKEN_SCOPED,
self.examples.SIGNED_TOKEN_SCOPED,
'/no/such/file', '/no/such/key')
def test_cms_verify_token_no_oserror(self):
@@ -86,26 +89,30 @@ class CMSTest(utils.TestCase):
self.fail('Expected subprocess.CalledProcessError')
def test_cms_verify_token_scoped(self):
cms_content = cms.token_to_cms(client_fixtures.SIGNED_TOKEN_SCOPED)
cms_content = cms.token_to_cms(self.examples.SIGNED_TOKEN_SCOPED)
self.assertTrue(cms.cms_verify(cms_content,
client_fixtures.SIGNING_CERT_FILE,
client_fixtures.SIGNING_CA_FILE))
self.examples.SIGNING_CERT_FILE,
self.examples.SIGNING_CA_FILE))
def test_cms_verify_token_scoped_expired(self):
cms_content = cms.token_to_cms(
client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED)
self.examples.SIGNED_TOKEN_SCOPED_EXPIRED)
self.assertTrue(cms.cms_verify(cms_content,
client_fixtures.SIGNING_CERT_FILE,
client_fixtures.SIGNING_CA_FILE))
self.examples.SIGNING_CERT_FILE,
self.examples.SIGNING_CA_FILE))
def test_cms_verify_token_unscoped(self):
cms_content = cms.token_to_cms(client_fixtures.SIGNED_TOKEN_UNSCOPED)
cms_content = cms.token_to_cms(self.examples.SIGNED_TOKEN_UNSCOPED)
self.assertTrue(cms.cms_verify(cms_content,
client_fixtures.SIGNING_CERT_FILE,
client_fixtures.SIGNING_CA_FILE))
self.examples.SIGNING_CERT_FILE,
self.examples.SIGNING_CA_FILE))
def test_cms_verify_token_v3_scoped(self):
cms_content = cms.token_to_cms(client_fixtures.SIGNED_v3_TOKEN_SCOPED)
cms_content = cms.token_to_cms(self.examples.SIGNED_v3_TOKEN_SCOPED)
self.assertTrue(cms.cms_verify(cms_content,
client_fixtures.SIGNING_CERT_FILE,
client_fixtures.SIGNING_CA_FILE))
self.examples.SIGNING_CERT_FILE,
self.examples.SIGNING_CA_FILE))
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)

View File

@@ -14,6 +14,8 @@
import datetime
import testresources
from keystoneclient import access
from keystoneclient.openstack.common import timeutils
from keystoneclient.tests import client_fixtures as token_data
@@ -22,11 +24,12 @@ from keystoneclient.tests.v2_0 import utils
UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN
PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
DIABLO_TOKEN = token_data.TOKEN_RESPONSES[token_data.VALID_DIABLO_TOKEN]
GRIZZLY_TOKEN = token_data.TOKEN_RESPONSES[token_data.SIGNED_TOKEN_SCOPED_KEY]
class AccessInfoTest(utils.TestCase):
class AccessInfoTest(utils.TestCase, testresources.ResourcedTestCase):
resources = [('examples', token_data.EXAMPLES_RESOURCE)]
def test_building_unscoped_accessinfo(self):
auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN)
@@ -100,7 +103,9 @@ class AccessInfoTest(utils.TestCase):
self.assertFalse(auth_ref.domain_scoped)
def test_diablo_token(self):
auth_ref = access.AccessInfo.factory(body=DIABLO_TOKEN)
diablo_token = self.examples.TOKEN_RESPONSES[
self.examples.VALID_DIABLO_TOKEN]
auth_ref = access.AccessInfo.factory(body=diablo_token)
self.assertTrue(auth_ref)
self.assertEqual(auth_ref.username, 'user_name1')
@@ -113,7 +118,9 @@ class AccessInfoTest(utils.TestCase):
self.assertFalse(auth_ref.scoped)
def test_grizzly_token(self):
auth_ref = access.AccessInfo.factory(body=GRIZZLY_TOKEN)
grizzly_token = self.examples.TOKEN_RESPONSES[
self.examples.SIGNED_TOKEN_SCOPED_KEY]
auth_ref = access.AccessInfo.factory(body=grizzly_token)
self.assertEqual(auth_ref.project_id, 'tenant_id1')
self.assertEqual(auth_ref.project_name, 'tenant_name1')
@@ -121,3 +128,7 @@ class AccessInfoTest(utils.TestCase):
self.assertEqual(auth_ref.project_domain_name, 'Default')
self.assertEqual(auth_ref.user_domain_id, 'default')
self.assertEqual(auth_ref.user_domain_name, 'Default')
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)