Merge "Use testresources for example files"

This commit is contained in:
Jenkins
2014-01-02 18:07:56 +00:00
committed by Gerrit Code Review
4 changed files with 409 additions and 336 deletions

View File

@@ -16,7 +16,9 @@
import os import os
import fixtures
import six import six
import testresources
from keystoneclient.common import cms from keystoneclient.common import cms
from keystoneclient.openstack.common import jsonutils from keystoneclient.openstack.common import jsonutils
@@ -30,279 +32,306 @@ CERTDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'certs')
CMSDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'cms') CMSDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'cms')
KEYDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'private') KEYDIR = os.path.join(ROOTDIR, 'examples', 'pki', 'private')
# @TODO(mordred) This should become a testresources resource attached to the
# class
# The data for these tests are signed using openssl and are stored in files
# in the signing subdirectory. In order to keep the values consistent between
# the tests and the signed documents, we read them in for use in the tests.
with open(os.path.join(CMSDIR, 'auth_token_scoped.pem')) as f:
SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_unscoped.pem')) as f:
SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_v3_token_scoped.pem')) as f:
SIGNED_v3_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_revoked.pem')) as f:
REVOKED_TOKEN = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_scoped_expired.pem')) as f:
SIGNED_TOKEN_SCOPED_EXPIRED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_v3_token_revoked.pem')) as f:
REVOKED_v3_TOKEN = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'revocation_list.json')) as f:
REVOCATION_LIST = jsonutils.loads(f.read())
with open(os.path.join(CMSDIR, 'revocation_list.pem')) as f:
SIGNED_REVOCATION_LIST = jsonutils.dumps({'signed': f.read()})
SIGNING_CERT_FILE = os.path.join(CERTDIR, 'signing_cert.pem') class Examples(fixtures.Fixture):
with open(SIGNING_CERT_FILE) as f: """Example tokens and certs loaded from the examples directory.
SIGNING_CERT = f.read()
SIGNING_KEY_FILE = os.path.join(KEYDIR, 'signing_key.pem') To use this class correctly, the module needs to override the test suite
with open(SIGNING_KEY_FILE) as f: class to use testresources.OptimisingTestSuite (otherwise the files will
SIGNING_KEY = f.read() be read on every test). This is done by defining a load_tests function
in the module, like this:
SIGNING_CA_FILE = os.path.join(CERTDIR, 'cacert.pem') def load_tests(loader, tests, pattern):
with open(SIGNING_CA_FILE) as f: return testresources.OptimisingTestSuite(tests)
SIGNING_CA = f.read()
UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d" (see http://docs.python.org/2/library/unittest.html#load-tests-protocol )
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
v3_UUID_TOKEN_DEFAULT = '5603457654b346fdbb93437bfe76f2f1'
v3_UUID_TOKEN_UNSCOPED = 'd34835fdaec447e695a0a024d84f8d79'
v3_UUID_TOKEN_DOMAIN_SCOPED = 'e8a7b63aaa4449f38f0c5c05c3581792'
REVOKED_TOKEN_HASH = utils.hash_signed_token(REVOKED_TOKEN) """
REVOKED_TOKEN_LIST = {'revoked': [{'id': REVOKED_TOKEN_HASH,
'expires': timeutils.utcnow()}]}
REVOKED_TOKEN_LIST_JSON = jsonutils.dumps(REVOKED_TOKEN_LIST)
REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(REVOKED_v3_TOKEN) def setUp(self):
REVOKED_v3_TOKEN_LIST = {'revoked': [{'id': REVOKED_v3_TOKEN_HASH, super(Examples, self).setUp()
'expires': timeutils.utcnow()}]}
REVOKED_v3_TOKEN_LIST_JSON = jsonutils.dumps(REVOKED_v3_TOKEN_LIST)
SIGNED_TOKEN_SCOPED_KEY = cms.cms_hash_token(SIGNED_TOKEN_SCOPED) # The data for several tests are signed using openssl and are stored in
SIGNED_TOKEN_UNSCOPED_KEY = cms.cms_hash_token(SIGNED_TOKEN_UNSCOPED) # files in the signing subdirectory. In order to keep the values
SIGNED_v3_TOKEN_SCOPED_KEY = cms.cms_hash_token(SIGNED_v3_TOKEN_SCOPED) # consistent between the tests and the signed documents, we read them
# in for use in the tests.
INVALID_SIGNED_TOKEN = \ with open(os.path.join(CMSDIR, 'auth_token_scoped.pem')) as f:
"MIIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" \ self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
"BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" \ with open(os.path.join(CMSDIR, 'auth_token_unscoped.pem')) as f:
"CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC" \ self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
"DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD" \ with open(os.path.join(CMSDIR, 'auth_v3_token_scoped.pem')) as f:
"EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE" \ self.SIGNED_v3_TOKEN_SCOPED = cms.cms_to_token(f.read())
"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF" \ with open(os.path.join(CMSDIR, 'auth_token_revoked.pem')) as f:
"0000000000000000000000000000000000000000000000000000000000000000" \ self.REVOKED_TOKEN = cms.cms_to_token(f.read())
"1111111111111111111111111111111111111111111111111111111111111111" \ with open(os.path.join(CMSDIR, 'auth_token_scoped_expired.pem')) as f:
"2222222222222222222222222222222222222222222222222222222222222222" \ self.SIGNED_TOKEN_SCOPED_EXPIRED = cms.cms_to_token(f.read())
"3333333333333333333333333333333333333333333333333333333333333333" \ with open(os.path.join(CMSDIR, 'auth_v3_token_revoked.pem')) as f:
"4444444444444444444444444444444444444444444444444444444444444444" \ self.REVOKED_v3_TOKEN = cms.cms_to_token(f.read())
"5555555555555555555555555555555555555555555555555555555555555555" \ with open(os.path.join(CMSDIR, 'revocation_list.json')) as f:
"6666666666666666666666666666666666666666666666666666666666666666" \ self.REVOCATION_LIST = jsonutils.loads(f.read())
"7777777777777777777777777777777777777777777777777777777777777777" \ with open(os.path.join(CMSDIR, 'revocation_list.pem')) as f:
"8888888888888888888888888888888888888888888888888888888888888888" \ self.SIGNED_REVOCATION_LIST = jsonutils.dumps({'signed': f.read()})
"9999999999999999999999999999999999999999999999999999999999999999" \
"0000000000000000000000000000000000000000000000000000000000000000" \
self.SIGNING_CERT_FILE = os.path.join(CERTDIR, 'signing_cert.pem')
with open(self.SIGNING_CERT_FILE) as f:
self.SIGNING_CERT = f.read()
# JSON responses keyed by token ID self.SIGNING_KEY_FILE = os.path.join(KEYDIR, 'signing_key.pem')
TOKEN_RESPONSES = { with open(self.SIGNING_KEY_FILE) as f:
UUID_TOKEN_DEFAULT: { self.SIGNING_KEY = f.read()
'access': {
'token': { self.SIGNING_CA_FILE = os.path.join(CERTDIR, 'cacert.pem')
'id': UUID_TOKEN_DEFAULT, with open(self.SIGNING_CA_FILE) as f:
'expires': '2020-01-01T00:00:10.000123Z', self.SIGNING_CA = f.read()
'tenant': {
'id': 'tenant_id1', self.UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
'name': 'tenant_name1', self.UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
self.UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
self.VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
self.v3_UUID_TOKEN_DEFAULT = '5603457654b346fdbb93437bfe76f2f1'
self.v3_UUID_TOKEN_UNSCOPED = 'd34835fdaec447e695a0a024d84f8d79'
self.v3_UUID_TOKEN_DOMAIN_SCOPED = 'e8a7b63aaa4449f38f0c5c05c3581792'
self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN)
self.REVOKED_TOKEN_LIST = (
{'revoked': [{'id': self.REVOKED_TOKEN_HASH,
'expires': timeutils.utcnow()}]})
self.REVOKED_TOKEN_LIST_JSON = jsonutils.dumps(self.REVOKED_TOKEN_LIST)
self.REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(
self.REVOKED_v3_TOKEN)
self.REVOKED_v3_TOKEN_LIST = (
{'revoked': [{'id': self.REVOKED_v3_TOKEN_HASH,
'expires': timeutils.utcnow()}]})
self.REVOKED_v3_TOKEN_LIST_JSON = jsonutils.dumps(
self.REVOKED_v3_TOKEN_LIST)
self.SIGNED_TOKEN_SCOPED_KEY = cms.cms_hash_token(
self.SIGNED_TOKEN_SCOPED)
self.SIGNED_TOKEN_UNSCOPED_KEY = cms.cms_hash_token(
self.SIGNED_TOKEN_UNSCOPED)
self.SIGNED_v3_TOKEN_SCOPED_KEY = cms.cms_hash_token(
self.SIGNED_v3_TOKEN_SCOPED)
self.INVALID_SIGNED_TOKEN = (
"MIIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA"
"BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"
"CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC"
"DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD"
"EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE"
"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF"
"0000000000000000000000000000000000000000000000000000000000000000"
"1111111111111111111111111111111111111111111111111111111111111111"
"2222222222222222222222222222222222222222222222222222222222222222"
"3333333333333333333333333333333333333333333333333333333333333333"
"4444444444444444444444444444444444444444444444444444444444444444"
"5555555555555555555555555555555555555555555555555555555555555555"
"6666666666666666666666666666666666666666666666666666666666666666"
"7777777777777777777777777777777777777777777777777777777777777777"
"8888888888888888888888888888888888888888888888888888888888888888"
"9999999999999999999999999999999999999999999999999999999999999999"
"0000000000000000000000000000000000000000000000000000000000000000")
# JSON responses keyed by token ID
self.TOKEN_RESPONSES = {
self.UUID_TOKEN_DEFAULT: {
'access': {
'token': {
'id': self.UUID_TOKEN_DEFAULT,
'expires': '2020-01-01T00:00:10.000123Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
'serviceCatalog': {}
}, },
}, },
'user': { self.VALID_DIABLO_TOKEN: {
'id': 'user_id1', 'access': {
'name': 'user_name1', 'token': {
'roles': [ 'id': self.VALID_DIABLO_TOKEN,
{'name': 'role1'}, 'expires': '2020-01-01T00:00:10.000123Z',
{'name': 'role2'}, 'tenantId': 'tenant_id1',
], },
}, 'user': {
'serviceCatalog': {} 'id': 'user_id1',
}, 'name': 'user_name1',
}, 'roles': [
VALID_DIABLO_TOKEN: { {'name': 'role1'},
'access': { {'name': 'role2'},
'token': { ],
'id': VALID_DIABLO_TOKEN, },
'expires': '2020-01-01T00:00:10.000123Z',
'tenantId': 'tenant_id1',
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
UUID_TOKEN_UNSCOPED: {
'access': {
'token': {
'id': UUID_TOKEN_UNSCOPED,
'expires': '2020-01-01T00:00:10.000123Z',
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
UUID_TOKEN_NO_SERVICE_CATALOG: {
'access': {
'token': {
'id': 'valid-token',
'expires': '2020-01-01T00:00:10.000123Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
}, },
}, },
'user': { self.UUID_TOKEN_UNSCOPED: {
'id': 'user_id1', 'access': {
'name': 'user_name1', 'token': {
'roles': [ 'id': self.UUID_TOKEN_UNSCOPED,
{'name': 'role1'}, 'expires': '2020-01-01T00:00:10.000123Z',
{'name': 'role2'}, },
], 'user': {
} 'id': 'user_id1',
}, 'name': 'user_name1',
}, 'roles': [
v3_UUID_TOKEN_DEFAULT: { {'name': 'role1'},
'token': { {'name': 'role2'},
'expires_at': '2020-01-01T00:00:10.000123Z', ],
'user': { },
'id': 'user_id1', },
'name': 'user_name1', },
'domain': { self.UUID_TOKEN_NO_SERVICE_CATALOG: {
'id': 'domain_id1', 'access': {
'name': 'domain_name1' 'token': {
'id': 'valid-token',
'expires': '2020-01-01T00:00:10.000123Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
}
},
},
self.v3_UUID_TOKEN_DEFAULT: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
} }
}, },
'project': { self.v3_UUID_TOKEN_UNSCOPED: {
'id': 'tenant_id1', 'token': {
'name': 'tenant_name1', 'expires_at': '2020-01-01T00:00:10.000123Z',
'domain': { 'user': {
'id': 'domain_id1', 'id': 'user_id1',
'name': 'domain_name1' 'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
}
}
},
self.v3_UUID_TOKEN_DOMAIN_SCOPED: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'domain': {
'id': 'domain_id1',
'name': 'domain_name1',
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
},
self.SIGNED_TOKEN_SCOPED_KEY: {
'access': {
'token': {
'id': self.SIGNED_TOKEN_SCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'tenantId': 'tenant_id1',
'tenantName': 'tenant_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
self.SIGNED_TOKEN_UNSCOPED_KEY: {
'access': {
'token': {
'id': self.SIGNED_TOKEN_UNSCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
self.SIGNED_v3_TOKEN_SCOPED_KEY: {
'token': {
'expires': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1'},
{'name': 'role2'}
],
'catalog': {}
} }
}, },
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
} }
},
v3_UUID_TOKEN_UNSCOPED: { self.JSON_TOKEN_RESPONSES = dict([(k, jsonutils.dumps(v)) for k, v in
'token': { six.iteritems(self.TOKEN_RESPONSES)])
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
}
}
},
v3_UUID_TOKEN_DOMAIN_SCOPED: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'domain': {
'id': 'domain_id1',
'name': 'domain_name1',
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
},
SIGNED_TOKEN_SCOPED_KEY: {
'access': {
'token': {
'id': SIGNED_TOKEN_SCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'tenantId': 'tenant_id1',
'tenantName': 'tenant_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
SIGNED_TOKEN_UNSCOPED_KEY: {
'access': {
'token': {
'id': SIGNED_TOKEN_UNSCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
SIGNED_v3_TOKEN_SCOPED_KEY: {
'token': {
'expires': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1'},
{'name': 'role2'}
],
'catalog': {}
}
},
}
JSON_TOKEN_RESPONSES = dict([(k, jsonutils.dumps(v)) for k, v in EXAMPLES_RESOURCE = testresources.FixtureResource(Examples())
six.iteritems(TOKEN_RESPONSES)])

View File

@@ -29,6 +29,7 @@ import uuid
import fixtures import fixtures
import httpretty import httpretty
import mock import mock
import testresources
import webob import webob
from keystoneclient.common import cms from keystoneclient.common import cms
@@ -320,7 +321,10 @@ if tuple(sys.version_info)[0:2] < (2, 7):
BaseAuthTokenMiddlewareTest = AdjustedBaseAuthTokenMiddlewareTest BaseAuthTokenMiddlewareTest = AdjustedBaseAuthTokenMiddlewareTest
class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
@httpretty.activate @httpretty.activate
def test_fetch_revocation_list_with_expire(self): def test_fetch_revocation_list_with_expire(self):
@@ -333,20 +337,24 @@ class MultiStepAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
responses = [httpretty.Response(body='', status=401), responses = [httpretty.Response(body='', status=401),
httpretty.Response( httpretty.Response(
body=client_fixtures.SIGNED_REVOCATION_LIST)] body=self.examples.SIGNED_REVOCATION_LIST)]
httpretty.register_uri(httpretty.GET, httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/revoked" % BASE_URI, "%s/v2.0/tokens/revoked" % BASE_URI,
responses=responses) responses=responses)
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list()) fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST) self.assertEqual(fetched_list, self.examples.REVOCATION_LIST)
# Check that 4 requests have been made # Check that 4 requests have been made
self.assertEqual(len(httpretty.httpretty.latest_requests), 4) self.assertEqual(len(httpretty.httpretty.latest_requests), 4)
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
"""Auth Token middleware should understand Diablo keystone responses.""" """Auth Token middleware should understand Diablo keystone responses."""
def setUp(self): def setUp(self):
# pre-diablo only had Tenant ID, which was also the Name # pre-diablo only had Tenant ID, which was also the Name
@@ -372,8 +380,8 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
"%s/v2.0/tokens" % BASE_URI, "%s/v2.0/tokens" % BASE_URI,
body=FAKE_ADMIN_TOKEN) body=FAKE_ADMIN_TOKEN)
self.token_id = client_fixtures.VALID_DIABLO_TOKEN self.token_id = self.examples.VALID_DIABLO_TOKEN
token_response = client_fixtures.JSON_TOKEN_RESPONSES[self.token_id] token_response = self.examples.JSON_TOKEN_RESPONSES[self.token_id]
httpretty.register_uri(httpretty.GET, httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/%s" % (BASE_URI, self.token_id), "%s/v2.0/tokens/%s" % (BASE_URI, self.token_id),
@@ -507,7 +515,7 @@ class CommonAuthTokenMiddlewareTest(object):
tmp_name = uuid.uuid4().hex tmp_name = uuid.uuid4().hex
test_parent_signing_dir = "/tmp/%s" % tmp_name test_parent_signing_dir = "/tmp/%s" % tmp_name
self.middleware.signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2) self.middleware.signing_dirname = "/tmp/%s/%s" % ((tmp_name,) * 2)
self.middleware.signing_cert_file_name = "%s/test.pem" %\ self.middleware.signing_cert_file_name = "%s/test.pem" % \
self.middleware.signing_dirname self.middleware.signing_dirname
self.middleware.verify_signing_dir() self.middleware.verify_signing_dir()
# NOTE(wu_wenxiang): Verify if the signing dir was created as expected. # NOTE(wu_wenxiang): Verify if the signing dir was created as expected.
@@ -546,7 +554,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_get_token_revocation_list_fetched_time_returns_utc(self): def test_get_token_revocation_list_fetched_time_returns_utc(self):
with TimezoneFixture('UTC-1'): with TimezoneFixture('UTC-1'):
self.middleware.token_revocation_list = jsonutils.dumps( self.middleware.token_revocation_list = jsonutils.dumps(
client_fixtures.REVOCATION_LIST) self.examples.REVOCATION_LIST)
self.middleware.token_revocation_list_fetched_time = None self.middleware.token_revocation_list_fetched_time = None
fetched_time = self.middleware.token_revocation_list_fetched_time fetched_time = self.middleware.token_revocation_list_fetched_time
self.assertTrue(timeutils.is_soon(fetched_time, 1)) self.assertTrue(timeutils.is_soon(fetched_time, 1))
@@ -562,7 +570,7 @@ class CommonAuthTokenMiddlewareTest(object):
self.middleware.token_revocation_list_fetched_time = None self.middleware.token_revocation_list_fetched_time = None
os.remove(self.middleware.revoked_file_name) os.remove(self.middleware.revoked_file_name)
self.assertEqual(self.middleware.token_revocation_list, self.assertEqual(self.middleware.token_revocation_list,
client_fixtures.REVOCATION_LIST) self.examples.REVOCATION_LIST)
def test_get_revocation_list_returns_current_list_from_memory(self): def test_get_revocation_list_returns_current_list_from_memory(self):
self.assertEqual(self.middleware.token_revocation_list, self.assertEqual(self.middleware.token_revocation_list,
@@ -586,7 +594,7 @@ class CommonAuthTokenMiddlewareTest(object):
# auth_token uses v2 to fetch this, so don't allow the v3 # auth_token uses v2 to fetch this, so don't allow the v3
# tests to override the fake http connection # tests to override the fake http connection
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list()) fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST) self.assertEqual(fetched_list, self.examples.REVOCATION_LIST)
def test_request_invalid_uuid_token(self): def test_request_invalid_uuid_token(self):
# remember because we are testing the middleware we stub the connection # remember because we are testing the middleware we stub the connection
@@ -603,7 +611,7 @@ class CommonAuthTokenMiddlewareTest(object):
def test_request_invalid_signed_token(self): def test_request_invalid_signed_token(self):
req = webob.Request.blank('/') req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = client_fixtures.INVALID_SIGNED_TOKEN req.headers['X-Auth-Token'] = self.examples.INVALID_SIGNED_TOKEN
self.middleware(req.environ, self.start_fake_response) self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401) self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'], self.assertEqual(self.response_headers['WWW-Authenticate'],
@@ -747,23 +755,23 @@ class CommonAuthTokenMiddlewareTest(object):
self.assertFalse(auth_token.will_expire_soon(fortyseconds)) self.assertFalse(auth_token.will_expire_soon(fortyseconds))
def test_token_is_v2_accepts_v2(self): def test_token_is_v2_accepts_v2(self):
token = client_fixtures.UUID_TOKEN_DEFAULT token = self.examples.UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token] token_response = self.examples.TOKEN_RESPONSES[token]
self.assertTrue(auth_token._token_is_v2(token_response)) self.assertTrue(auth_token._token_is_v2(token_response))
def test_token_is_v2_rejects_v3(self): def test_token_is_v2_rejects_v3(self):
token = client_fixtures.v3_UUID_TOKEN_DEFAULT token = self.examples.v3_UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token] token_response = self.examples.TOKEN_RESPONSES[token]
self.assertFalse(auth_token._token_is_v2(token_response)) self.assertFalse(auth_token._token_is_v2(token_response))
def test_token_is_v3_rejects_v2(self): def test_token_is_v3_rejects_v2(self):
token = client_fixtures.UUID_TOKEN_DEFAULT token = self.examples.UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token] token_response = self.examples.TOKEN_RESPONSES[token]
self.assertFalse(auth_token._token_is_v3(token_response)) self.assertFalse(auth_token._token_is_v3(token_response))
def test_token_is_v3_accepts_v3(self): def test_token_is_v3_accepts_v3(self):
token = client_fixtures.v3_UUID_TOKEN_DEFAULT token = self.examples.v3_UUID_TOKEN_DEFAULT
token_response = client_fixtures.TOKEN_RESPONSES[token] token_response = self.examples.TOKEN_RESPONSES[token]
self.assertTrue(auth_token._token_is_v3(token_response)) self.assertTrue(auth_token._token_is_v3(token_response))
def test_encrypt_cache_data(self): def test_encrypt_cache_data(self):
@@ -895,7 +903,11 @@ class CommonAuthTokenMiddlewareTest(object):
with_catalog=False) with_catalog=False)
class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest): class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
def setUp(self): def setUp(self):
super(CertDownloadMiddlewareTest, self).setUp() super(CertDownloadMiddlewareTest, self).setUp()
self.base_dir = tempfile.mkdtemp() self.base_dir = tempfile.mkdtemp()
@@ -927,7 +939,7 @@ class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest):
status=404) status=404)
self.assertRaises(exceptions.CertificateConfigError, self.assertRaises(exceptions.CertificateConfigError,
self.middleware.verify_signed_token, self.middleware.verify_signed_token,
client_fixtures.SIGNED_TOKEN_SCOPED) self.examples.SIGNED_TOKEN_SCOPED)
def test_fetch_signing_cert(self): def test_fetch_signing_cert(self):
data = 'FAKE CERT' data = 'FAKE CERT'
@@ -1005,7 +1017,8 @@ def network_error_response(method, uri, headers):
class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
CommonAuthTokenMiddlewareTest): CommonAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
"""v2 token specific tests. """v2 token specific tests.
There are some differences between how the auth-token middleware handles There are some differences between how the auth-token middleware handles
@@ -1022,17 +1035,19 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
""" """
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
def setUp(self): def setUp(self):
super(v2AuthTokenMiddlewareTest, self).setUp() super(v2AuthTokenMiddlewareTest, self).setUp()
self.token_dict = { self.token_dict = {
'uuid_token_default': client_fixtures.UUID_TOKEN_DEFAULT, 'uuid_token_default': self.examples.UUID_TOKEN_DEFAULT,
'uuid_token_unscoped': client_fixtures.UUID_TOKEN_UNSCOPED, 'uuid_token_unscoped': self.examples.UUID_TOKEN_UNSCOPED,
'signed_token_scoped': client_fixtures.SIGNED_TOKEN_SCOPED, 'signed_token_scoped': self.examples.SIGNED_TOKEN_SCOPED,
'signed_token_scoped_expired': 'signed_token_scoped_expired':
client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED, self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
'revoked_token': client_fixtures.REVOKED_TOKEN, 'revoked_token': self.examples.REVOKED_TOKEN,
'revoked_token_hash': client_fixtures.REVOKED_TOKEN_HASH 'revoked_token_hash': self.examples.REVOKED_TOKEN_HASH
} }
httpretty.httpretty.reset() httpretty.httpretty.reset()
@@ -1049,16 +1064,16 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
httpretty.register_uri(httpretty.GET, httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/revoked" % BASE_URI, "%s/v2.0/tokens/revoked" % BASE_URI,
body=client_fixtures.SIGNED_REVOCATION_LIST, body=self.examples.SIGNED_REVOCATION_LIST,
status=200) status=200)
for token in (client_fixtures.UUID_TOKEN_DEFAULT, for token in (self.examples.UUID_TOKEN_DEFAULT,
client_fixtures.UUID_TOKEN_UNSCOPED, self.examples.UUID_TOKEN_UNSCOPED,
client_fixtures.UUID_TOKEN_NO_SERVICE_CATALOG): self.examples.UUID_TOKEN_NO_SERVICE_CATALOG):
httpretty.register_uri(httpretty.GET, httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/%s" % (BASE_URI, token), "%s/v2.0/tokens/%s" % (BASE_URI, token),
body= body=
client_fixtures.JSON_TOKEN_RESPONSES[token]) self.examples.JSON_TOKEN_RESPONSES[token])
httpretty.register_uri(httpretty.GET, httpretty.register_uri(httpretty.GET,
'%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN), '%s/v2.0/tokens/%s' % (BASE_URI, ERROR_TOKEN),
@@ -1088,11 +1103,11 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_default_tenant_uuid_token(self): def test_default_tenant_uuid_token(self):
self.assert_unscoped_default_tenant_auto_scopes( self.assert_unscoped_default_tenant_auto_scopes(
client_fixtures.UUID_TOKEN_DEFAULT) self.examples.UUID_TOKEN_DEFAULT)
def test_default_tenant_signed_token(self): def test_default_tenant_signed_token(self):
self.assert_unscoped_default_tenant_auto_scopes( self.assert_unscoped_default_tenant_auto_scopes(
client_fixtures.SIGNED_TOKEN_SCOPED) self.examples.SIGNED_TOKEN_SCOPED)
def assert_unscoped_token_receives_401(self, token): def assert_unscoped_token_receives_401(self, token):
"""Unscoped requests with no default tenant ID should be rejected.""" """Unscoped requests with no default tenant ID should be rejected."""
@@ -1105,24 +1120,27 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
def test_unscoped_uuid_token_receives_401(self): def test_unscoped_uuid_token_receives_401(self):
self.assert_unscoped_token_receives_401( self.assert_unscoped_token_receives_401(
client_fixtures.UUID_TOKEN_UNSCOPED) self.examples.UUID_TOKEN_UNSCOPED)
def test_unscoped_pki_token_receives_401(self): def test_unscoped_pki_token_receives_401(self):
self.assert_unscoped_token_receives_401( self.assert_unscoped_token_receives_401(
client_fixtures.SIGNED_TOKEN_UNSCOPED) self.examples.SIGNED_TOKEN_UNSCOPED)
def test_request_prevent_service_catalog_injection(self): def test_request_prevent_service_catalog_injection(self):
req = webob.Request.blank('/') req = webob.Request.blank('/')
req.headers['X-Service-Catalog'] = '[]' req.headers['X-Service-Catalog'] = '[]'
req.headers['X-Auth-Token'] = \ req.headers['X-Auth-Token'] = \
client_fixtures.UUID_TOKEN_NO_SERVICE_CATALOG self.examples.UUID_TOKEN_NO_SERVICE_CATALOG
body = self.middleware(req.environ, self.start_fake_response) body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200) self.assertEqual(self.response_status, 200)
self.assertFalse(req.headers.get('X-Service-Catalog')) self.assertFalse(req.headers.get('X-Service-Catalog'))
self.assertEqual(body, ['SUCCESS']) self.assertEqual(body, ['SUCCESS'])
class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
@httpretty.activate @httpretty.activate
def test_valid_uuid_request_forced_to_2_0(self): def test_valid_uuid_request_forced_to_2_0(self):
@@ -1149,27 +1167,28 @@ class CrossVersionAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
"%s/v2.0/tokens" % BASE_URI, "%s/v2.0/tokens" % BASE_URI,
body=FAKE_ADMIN_TOKEN) body=FAKE_ADMIN_TOKEN)
token = client_fixtures.UUID_TOKEN_DEFAULT token = self.examples.UUID_TOKEN_DEFAULT
httpretty.register_uri(httpretty.GET, httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/%s" % (BASE_URI, token), "%s/v2.0/tokens/%s" % (BASE_URI, token),
body= body=
client_fixtures.JSON_TOKEN_RESPONSES[token]) self.examples.JSON_TOKEN_RESPONSES[token])
self.set_middleware(conf=conf) self.set_middleware(conf=conf)
# This tests will only work is auth_token has chosen to use the # This tests will only work is auth_token has chosen to use the
# lower, v2, api version # lower, v2, api version
req = webob.Request.blank('/') req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = client_fixtures.UUID_TOKEN_DEFAULT req.headers['X-Auth-Token'] = self.examples.UUID_TOKEN_DEFAULT
self.middleware(req.environ, self.start_fake_response) self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200) self.assertEqual(self.response_status, 200)
self.assertEqual("/testadmin/v2.0/tokens/%s" % self.assertEqual("/testadmin/v2.0/tokens/%s" %
client_fixtures.UUID_TOKEN_DEFAULT, self.examples.UUID_TOKEN_DEFAULT,
httpretty.httpretty.last_request.path) httpretty.httpretty.last_request.path)
class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest, class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
CommonAuthTokenMiddlewareTest): CommonAuthTokenMiddlewareTest,
testresources.ResourcedTestCase):
"""Test auth_token middleware with v3 tokens. """Test auth_token middleware with v3 tokens.
Re-execute the AuthTokenMiddlewareTest class tests, but with the Re-execute the AuthTokenMiddlewareTest class tests, but with the
@@ -1193,19 +1212,22 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
the highest available auth version, i.e. v3.0 the highest available auth version, i.e. v3.0
""" """
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
def setUp(self): def setUp(self):
super(v3AuthTokenMiddlewareTest, self).setUp( super(v3AuthTokenMiddlewareTest, self).setUp(
auth_version='v3.0', auth_version='v3.0',
fake_app=v3FakeApp) fake_app=v3FakeApp)
self.token_dict = { self.token_dict = {
'uuid_token_default': client_fixtures.v3_UUID_TOKEN_DEFAULT, 'uuid_token_default': self.examples.v3_UUID_TOKEN_DEFAULT,
'uuid_token_unscoped': client_fixtures.v3_UUID_TOKEN_UNSCOPED, 'uuid_token_unscoped': self.examples.v3_UUID_TOKEN_UNSCOPED,
'signed_token_scoped': client_fixtures.SIGNED_v3_TOKEN_SCOPED, 'signed_token_scoped': self.examples.SIGNED_v3_TOKEN_SCOPED,
'signed_token_scoped_expired': 'signed_token_scoped_expired':
client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED, self.examples.SIGNED_TOKEN_SCOPED_EXPIRED,
'revoked_token': client_fixtures.REVOKED_v3_TOKEN, 'revoked_token': self.examples.REVOKED_v3_TOKEN,
'revoked_token_hash': client_fixtures.REVOKED_v3_TOKEN_HASH 'revoked_token_hash': self.examples.REVOKED_v3_TOKEN_HASH
} }
httpretty.httpretty.reset() httpretty.httpretty.reset()
@@ -1225,7 +1247,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
# TODO(jamielennox): there is no v3 revocation url yet, it uses v2 # TODO(jamielennox): there is no v3 revocation url yet, it uses v2
httpretty.register_uri(httpretty.GET, httpretty.register_uri(httpretty.GET,
"%s/v2.0/tokens/revoked" % BASE_URI, "%s/v2.0/tokens/revoked" % BASE_URI,
body=client_fixtures.SIGNED_REVOCATION_LIST, body=self.examples.SIGNED_REVOCATION_LIST,
status=200) status=200)
httpretty.register_uri(httpretty.GET, httpretty.register_uri(httpretty.GET,
@@ -1251,7 +1273,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
raise auth_token.NetworkError("Network connection error.") raise auth_token.NetworkError("Network connection error.")
try: try:
response = client_fixtures.JSON_TOKEN_RESPONSES[token_id] response = self.examples.JSON_TOKEN_RESPONSES[token_id]
except KeyError: except KeyError:
status = 404 status = 404
@@ -1274,7 +1296,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
'HTTP_X_ROLE': '', 'HTTP_X_ROLE': '',
} }
self.set_middleware(expected_env=delta_expected_env) self.set_middleware(expected_env=delta_expected_env)
self.assert_valid_request_200(client_fixtures.v3_UUID_TOKEN_UNSCOPED, self.assert_valid_request_200(self.examples.v3_UUID_TOKEN_UNSCOPED,
with_catalog=False) with_catalog=False)
self.assertLastPath('/testadmin/v3/auth/tokens') self.assertLastPath('/testadmin/v3/auth/tokens')
@@ -1293,7 +1315,7 @@ class v3AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest,
} }
self.set_middleware(expected_env=delta_expected_env) self.set_middleware(expected_env=delta_expected_env)
self.assert_valid_request_200( self.assert_valid_request_200(
client_fixtures.v3_UUID_TOKEN_DOMAIN_SCOPED) self.examples.v3_UUID_TOKEN_DOMAIN_SCOPED)
self.assertLastPath('/testadmin/v3/auth/tokens') self.assertLastPath('/testadmin/v3/auth/tokens')
@@ -1512,3 +1534,7 @@ class TokenExpirationTest(BaseAuthTokenMiddlewareTest):
expires = timeutils.strtime(some_time_earlier) + '-02:00' expires = timeutils.strtime(some_time_earlier) + '-02:00'
self.middleware._cache_put(token, data, expires) self.middleware._cache_put(token, data, expires)
self.assertIsNone(self.middleware._cache_get(token)) self.assertIsNone(self.middleware._cache_get(token))
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)

View File

@@ -16,6 +16,7 @@ import os
import subprocess import subprocess
import mock import mock
import testresources
from keystoneclient.common import cms from keystoneclient.common import cms
from keystoneclient import exceptions from keystoneclient import exceptions
@@ -23,10 +24,12 @@ from keystoneclient.tests import client_fixtures
from keystoneclient.tests import utils from keystoneclient.tests import utils
class CMSTest(utils.TestCase): class CMSTest(utils.TestCase, testresources.ResourcedTestCase):
"""Unit tests for the keystoneclient.common.cms module.""" """Unit tests for the keystoneclient.common.cms module."""
resources = [('examples', client_fixtures.EXAMPLES_RESOURCE)]
def test_cms_verify(self): def test_cms_verify(self):
self.assertRaises(exceptions.CertificateConfigError, self.assertRaises(exceptions.CertificateConfigError,
cms.cms_verify, cms.cms_verify,
@@ -39,33 +42,33 @@ class CMSTest(utils.TestCase):
'auth_token_scoped.pem')) as f: 'auth_token_scoped.pem')) as f:
AUTH_TOKEN_SCOPED_CMS = f.read() AUTH_TOKEN_SCOPED_CMS = f.read()
self.assertEqual(cms.token_to_cms(client_fixtures.SIGNED_TOKEN_SCOPED), self.assertEqual(cms.token_to_cms(self.examples.SIGNED_TOKEN_SCOPED),
AUTH_TOKEN_SCOPED_CMS) AUTH_TOKEN_SCOPED_CMS)
tok = cms.cms_to_token(cms.token_to_cms( tok = cms.cms_to_token(cms.token_to_cms(
client_fixtures.SIGNED_TOKEN_SCOPED)) self.examples.SIGNED_TOKEN_SCOPED))
self.assertEqual(tok, client_fixtures.SIGNED_TOKEN_SCOPED) self.assertEqual(tok, self.examples.SIGNED_TOKEN_SCOPED)
def test_ans1_token(self): def test_ans1_token(self):
self.assertTrue(cms.is_ans1_token(client_fixtures.SIGNED_TOKEN_SCOPED)) self.assertTrue(cms.is_ans1_token(self.examples.SIGNED_TOKEN_SCOPED))
self.assertFalse(cms.is_ans1_token('FOOBAR')) self.assertFalse(cms.is_ans1_token('FOOBAR'))
def test_cms_sign_token_no_files(self): def test_cms_sign_token_no_files(self):
self.assertRaises(subprocess.CalledProcessError, self.assertRaises(subprocess.CalledProcessError,
cms.cms_sign_token, cms.cms_sign_token,
client_fixtures.SIGNED_TOKEN_SCOPED, self.examples.SIGNED_TOKEN_SCOPED,
'/no/such/file', '/no/such/key') '/no/such/file', '/no/such/key')
def test_cms_sign_token_success(self): def test_cms_sign_token_success(self):
self.assertTrue( self.assertTrue(
cms.cms_sign_token(client_fixtures.SIGNED_TOKEN_SCOPED, cms.cms_sign_token(self.examples.SIGNED_TOKEN_SCOPED,
client_fixtures.SIGNING_CERT_FILE, self.examples.SIGNING_CERT_FILE,
client_fixtures.SIGNING_KEY_FILE)) self.examples.SIGNING_KEY_FILE))
def test_cms_verify_token_no_files(self): def test_cms_verify_token_no_files(self):
self.assertRaises(exceptions.CertificateConfigError, self.assertRaises(exceptions.CertificateConfigError,
cms.cms_verify, cms.cms_verify,
client_fixtures.SIGNED_TOKEN_SCOPED, self.examples.SIGNED_TOKEN_SCOPED,
'/no/such/file', '/no/such/key') '/no/such/file', '/no/such/key')
def test_cms_verify_token_no_oserror(self): def test_cms_verify_token_no_oserror(self):
@@ -86,26 +89,30 @@ class CMSTest(utils.TestCase):
self.fail('Expected subprocess.CalledProcessError') self.fail('Expected subprocess.CalledProcessError')
def test_cms_verify_token_scoped(self): def test_cms_verify_token_scoped(self):
cms_content = cms.token_to_cms(client_fixtures.SIGNED_TOKEN_SCOPED) cms_content = cms.token_to_cms(self.examples.SIGNED_TOKEN_SCOPED)
self.assertTrue(cms.cms_verify(cms_content, self.assertTrue(cms.cms_verify(cms_content,
client_fixtures.SIGNING_CERT_FILE, self.examples.SIGNING_CERT_FILE,
client_fixtures.SIGNING_CA_FILE)) self.examples.SIGNING_CA_FILE))
def test_cms_verify_token_scoped_expired(self): def test_cms_verify_token_scoped_expired(self):
cms_content = cms.token_to_cms( cms_content = cms.token_to_cms(
client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED) self.examples.SIGNED_TOKEN_SCOPED_EXPIRED)
self.assertTrue(cms.cms_verify(cms_content, self.assertTrue(cms.cms_verify(cms_content,
client_fixtures.SIGNING_CERT_FILE, self.examples.SIGNING_CERT_FILE,
client_fixtures.SIGNING_CA_FILE)) self.examples.SIGNING_CA_FILE))
def test_cms_verify_token_unscoped(self): def test_cms_verify_token_unscoped(self):
cms_content = cms.token_to_cms(client_fixtures.SIGNED_TOKEN_UNSCOPED) cms_content = cms.token_to_cms(self.examples.SIGNED_TOKEN_UNSCOPED)
self.assertTrue(cms.cms_verify(cms_content, self.assertTrue(cms.cms_verify(cms_content,
client_fixtures.SIGNING_CERT_FILE, self.examples.SIGNING_CERT_FILE,
client_fixtures.SIGNING_CA_FILE)) self.examples.SIGNING_CA_FILE))
def test_cms_verify_token_v3_scoped(self): def test_cms_verify_token_v3_scoped(self):
cms_content = cms.token_to_cms(client_fixtures.SIGNED_v3_TOKEN_SCOPED) cms_content = cms.token_to_cms(self.examples.SIGNED_v3_TOKEN_SCOPED)
self.assertTrue(cms.cms_verify(cms_content, self.assertTrue(cms.cms_verify(cms_content,
client_fixtures.SIGNING_CERT_FILE, self.examples.SIGNING_CERT_FILE,
client_fixtures.SIGNING_CA_FILE)) self.examples.SIGNING_CA_FILE))
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)

View File

@@ -14,6 +14,8 @@
import datetime import datetime
import testresources
from keystoneclient import access from keystoneclient import access
from keystoneclient.openstack.common import timeutils from keystoneclient.openstack.common import timeutils
from keystoneclient.tests import client_fixtures as token_data from keystoneclient.tests import client_fixtures as token_data
@@ -22,11 +24,12 @@ from keystoneclient.tests.v2_0 import utils
UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN UNSCOPED_TOKEN = client_fixtures.UNSCOPED_TOKEN
PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN PROJECT_SCOPED_TOKEN = client_fixtures.PROJECT_SCOPED_TOKEN
DIABLO_TOKEN = token_data.TOKEN_RESPONSES[token_data.VALID_DIABLO_TOKEN]
GRIZZLY_TOKEN = token_data.TOKEN_RESPONSES[token_data.SIGNED_TOKEN_SCOPED_KEY]
class AccessInfoTest(utils.TestCase): class AccessInfoTest(utils.TestCase, testresources.ResourcedTestCase):
resources = [('examples', token_data.EXAMPLES_RESOURCE)]
def test_building_unscoped_accessinfo(self): def test_building_unscoped_accessinfo(self):
auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN) auth_ref = access.AccessInfo.factory(body=UNSCOPED_TOKEN)
@@ -100,7 +103,9 @@ class AccessInfoTest(utils.TestCase):
self.assertFalse(auth_ref.domain_scoped) self.assertFalse(auth_ref.domain_scoped)
def test_diablo_token(self): def test_diablo_token(self):
auth_ref = access.AccessInfo.factory(body=DIABLO_TOKEN) diablo_token = self.examples.TOKEN_RESPONSES[
self.examples.VALID_DIABLO_TOKEN]
auth_ref = access.AccessInfo.factory(body=diablo_token)
self.assertTrue(auth_ref) self.assertTrue(auth_ref)
self.assertEqual(auth_ref.username, 'user_name1') self.assertEqual(auth_ref.username, 'user_name1')
@@ -113,7 +118,9 @@ class AccessInfoTest(utils.TestCase):
self.assertFalse(auth_ref.scoped) self.assertFalse(auth_ref.scoped)
def test_grizzly_token(self): def test_grizzly_token(self):
auth_ref = access.AccessInfo.factory(body=GRIZZLY_TOKEN) grizzly_token = self.examples.TOKEN_RESPONSES[
self.examples.SIGNED_TOKEN_SCOPED_KEY]
auth_ref = access.AccessInfo.factory(body=grizzly_token)
self.assertEqual(auth_ref.project_id, 'tenant_id1') self.assertEqual(auth_ref.project_id, 'tenant_id1')
self.assertEqual(auth_ref.project_name, 'tenant_name1') self.assertEqual(auth_ref.project_name, 'tenant_name1')
@@ -121,3 +128,7 @@ class AccessInfoTest(utils.TestCase):
self.assertEqual(auth_ref.project_domain_name, 'Default') self.assertEqual(auth_ref.project_domain_name, 'Default')
self.assertEqual(auth_ref.user_domain_id, 'default') self.assertEqual(auth_ref.user_domain_id, 'default')
self.assertEqual(auth_ref.user_domain_name, 'Default') self.assertEqual(auth_ref.user_domain_name, 'Default')
def load_tests(loader, tests, pattern):
return testresources.OptimisingTestSuite(tests)