Extract test token data from auth_token middleware

This data can be used elsewhere as well.

Change-Id: I38b6c782b6d38e9e7d091f1c045a13ca10dfcc85
This commit is contained in:
Jamie Lennox
2013-08-12 12:15:02 +10:00
parent becec90286
commit 903575cf4a
2 changed files with 342 additions and 327 deletions

297
tests/client_fixtures.py Normal file
View File

@@ -0,0 +1,297 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from keystoneclient.common import cms
from keystoneclient.openstack.common import jsonutils
from keystoneclient.openstack.common import timeutils
from keystoneclient import utils
ROOTDIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
CERTDIR = os.path.join(ROOTDIR, "examples/pki/certs")
CMSDIR = os.path.join(ROOTDIR, "examples/pki/cms")
# @TODO(mordred) This should become a testresources resource attached to the
# class
# The data for these tests are signed using openssl and are stored in files
# in the signing subdirectory. In order to keep the values consistent between
# the tests and the signed documents, we read them in for use in the tests.
with open(os.path.join(CMSDIR, 'auth_token_scoped.pem')) as f:
SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_unscoped.pem')) as f:
SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_v3_token_scoped.pem')) as f:
SIGNED_v3_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_revoked.pem')) as f:
REVOKED_TOKEN = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_token_scoped_expired.pem')) as f:
SIGNED_TOKEN_SCOPED_EXPIRED = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'auth_v3_token_revoked.pem')) as f:
REVOKED_v3_TOKEN = cms.cms_to_token(f.read())
with open(os.path.join(CMSDIR, 'revocation_list.json')) as f:
REVOCATION_LIST = jsonutils.loads(f.read())
with open(os.path.join(CMSDIR, 'revocation_list.pem')) as f:
SIGNED_REVOCATION_LIST = jsonutils.dumps({'signed': f.read()})
with open(os.path.join(CERTDIR, 'signing_cert.pem')) as f:
SIGNING_CERT = f.read()
with open(os.path.join(CERTDIR, 'cacert.pem')) as f:
SIGNING_CA = f.read()
UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
v3_UUID_TOKEN_DEFAULT = '5603457654b346fdbb93437bfe76f2f1'
v3_UUID_TOKEN_UNSCOPED = 'd34835fdaec447e695a0a024d84f8d79'
v3_UUID_TOKEN_DOMAIN_SCOPED = 'e8a7b63aaa4449f38f0c5c05c3581792'
REVOKED_TOKEN_HASH = utils.hash_signed_token(REVOKED_TOKEN)
REVOKED_TOKEN_LIST = {'revoked': [{'id': REVOKED_TOKEN_HASH,
'expires': timeutils.utcnow()}]}
REVOKED_TOKEN_LIST_JSON = jsonutils.dumps(REVOKED_TOKEN_LIST)
REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(REVOKED_v3_TOKEN)
REVOKED_v3_TOKEN_LIST = {'revoked': [{'id': REVOKED_v3_TOKEN_HASH,
'expires': timeutils.utcnow()}]}
REVOKED_v3_TOKEN_LIST_JSON = jsonutils.dumps(REVOKED_v3_TOKEN_LIST)
SIGNED_TOKEN_SCOPED_KEY = cms.cms_hash_token(SIGNED_TOKEN_SCOPED)
SIGNED_TOKEN_UNSCOPED_KEY = cms.cms_hash_token(SIGNED_TOKEN_UNSCOPED)
SIGNED_v3_TOKEN_SCOPED_KEY = cms.cms_hash_token(SIGNED_v3_TOKEN_SCOPED)
INVALID_SIGNED_TOKEN = \
"MIIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA" \
"BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB" \
"CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC" \
"DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD" \
"EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE" \
"FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF" \
"0000000000000000000000000000000000000000000000000000000000000000" \
"1111111111111111111111111111111111111111111111111111111111111111" \
"2222222222222222222222222222222222222222222222222222222222222222" \
"3333333333333333333333333333333333333333333333333333333333333333" \
"4444444444444444444444444444444444444444444444444444444444444444" \
"5555555555555555555555555555555555555555555555555555555555555555" \
"6666666666666666666666666666666666666666666666666666666666666666" \
"7777777777777777777777777777777777777777777777777777777777777777" \
"8888888888888888888888888888888888888888888888888888888888888888" \
"9999999999999999999999999999999999999999999999999999999999999999" \
"0000000000000000000000000000000000000000000000000000000000000000" \
# JSON responses keyed by token ID
TOKEN_RESPONSES = {
UUID_TOKEN_DEFAULT: {
'access': {
'token': {
'id': UUID_TOKEN_DEFAULT,
'expires': '2020-01-01T00:00:10.000123Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
'serviceCatalog': {}
},
},
VALID_DIABLO_TOKEN: {
'access': {
'token': {
'id': VALID_DIABLO_TOKEN,
'expires': '2020-01-01T00:00:10.000123Z',
'tenantId': 'tenant_id1',
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
UUID_TOKEN_UNSCOPED: {
'access': {
'token': {
'id': UUID_TOKEN_UNSCOPED,
'expires': '2020-01-01T00:00:10.000123Z',
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
UUID_TOKEN_NO_SERVICE_CATALOG: {
'access': {
'token': {
'id': 'valid-token',
'expires': '2020-01-01T00:00:10.000123Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
}
},
},
v3_UUID_TOKEN_DEFAULT: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
},
v3_UUID_TOKEN_UNSCOPED: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
}
}
},
v3_UUID_TOKEN_DOMAIN_SCOPED: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'domain': {
'id': 'domain_id1',
'name': 'domain_name1',
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
},
SIGNED_TOKEN_SCOPED_KEY: {
'access': {
'token': {
'id': SIGNED_TOKEN_SCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'tenantId': 'tenant_id1',
'tenantName': 'tenant_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
SIGNED_TOKEN_UNSCOPED_KEY: {
'access': {
'token': {
'id': SIGNED_TOKEN_UNSCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
SIGNED_v3_TOKEN_SCOPED_KEY: {
'token': {
'expires': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1'},
{'name': 'role2'}
],
'catalog': {}
}
},
}
JSON_TOKEN_RESPONSES = dict([(k, jsonutils.dumps(v)) for k, v in
TOKEN_RESPONSES.iteritems()])

View File

@@ -19,7 +19,6 @@ import iso8601
import os import os
import shutil import shutil
import stat import stat
import string
import sys import sys
import tempfile import tempfile
import testtools import testtools
@@ -33,200 +32,11 @@ from keystoneclient.middleware import auth_token
from keystoneclient.openstack.common import jsonutils from keystoneclient.openstack.common import jsonutils
from keystoneclient.openstack.common import memorycache from keystoneclient.openstack.common import memorycache
from keystoneclient.openstack.common import timeutils from keystoneclient.openstack.common import timeutils
from keystoneclient import utils
import client_fixtures
ROOTDIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
CERTDIR = os.path.join(ROOTDIR, "examples/pki/certs")
KEYDIR = os.path.join(ROOTDIR, "examples/pki/private")
CMSDIR = os.path.join(ROOTDIR, "examples/pki/cms")
SIGNING_CERT = os.path.join(CERTDIR, 'signing_cert.pem')
SIGNING_KEY = os.path.join(KEYDIR, 'signing_key.pem')
CA = os.path.join(CERTDIR, 'ca.pem')
REVOCATION_LIST = None
REVOKED_TOKEN = None
REVOKED_TOKEN_HASH = None
REVOKED_v3_TOKEN = None
REVOKED_v3_TOKEN_HASH = None
SIGNED_REVOCATION_LIST = None SIGNED_REVOCATION_LIST = None
SIGNED_TOKEN_SCOPED = None VALID_SIGNED_REVOCATION_LIST = client_fixtures.SIGNED_REVOCATION_LIST
SIGNED_TOKEN_UNSCOPED = None
SIGNED_v3_TOKEN_SCOPED = None
SIGNED_v3_TOKEN_UNSCOPED = None
SIGNED_TOKEN_SCOPED_KEY = None
SIGNED_TOKEN_UNSCOPED_KEY = None
SIGNED_v3_TOKEN_SCOPED_KEY = None
VALID_SIGNED_REVOCATION_LIST = None
UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
v3_UUID_TOKEN_DEFAULT = '5603457654b346fdbb93437bfe76f2f1'
v3_UUID_TOKEN_UNSCOPED = 'd34835fdaec447e695a0a024d84f8d79'
v3_UUID_TOKEN_DOMAIN_SCOPED = 'e8a7b63aaa4449f38f0c5c05c3581792'
INVALID_SIGNED_TOKEN = string.replace(
"""AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD
EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
0000000000000000000000000000000000000000000000000000000000000000
1111111111111111111111111111111111111111111111111111111111111111
2222222222222222222222222222222222222222222222222222222222222222
3333333333333333333333333333333333333333333333333333333333333333
4444444444444444444444444444444444444444444444444444444444444444
5555555555555555555555555555555555555555555555555555555555555555
6666666666666666666666666666666666666666666666666666666666666666
7777777777777777777777777777777777777777777777777777777777777777
8888888888888888888888888888888888888888888888888888888888888888
9999999999999999999999999999999999999999999999999999999999999999
0000000000000000000000000000000000000000000000000000000000000000
xg==""", "\n", "")
# JSON responses keyed by token ID
TOKEN_RESPONSES = {
UUID_TOKEN_DEFAULT: {
'access': {
'token': {
'id': UUID_TOKEN_DEFAULT,
'expires': '2020-01-01T00:00:10.000123Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
'serviceCatalog': {}
},
},
VALID_DIABLO_TOKEN: {
'access': {
'token': {
'id': VALID_DIABLO_TOKEN,
'expires': '2020-01-01T00:00:10.000123Z',
'tenantId': 'tenant_id1',
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
UUID_TOKEN_UNSCOPED: {
'access': {
'token': {
'id': UUID_TOKEN_UNSCOPED,
'expires': '2020-01-01T00:00:10.000123Z',
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
UUID_TOKEN_NO_SERVICE_CATALOG: {
'access': {
'token': {
'id': 'valid-token',
'expires': '2020-01-01T00:00:10.000123Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
}
},
},
v3_UUID_TOKEN_DEFAULT: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
},
v3_UUID_TOKEN_UNSCOPED: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
}
}
},
v3_UUID_TOKEN_DOMAIN_SCOPED: {
'token': {
'expires_at': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'domain': {
'id': 'domain_id1',
'name': 'domain_name1',
},
'roles': [
{'name': 'role1', 'id': 'Role1'},
{'name': 'role2', 'id': 'Role2'},
],
'catalog': {}
}
}
}
EXPECTED_V2_DEFAULT_ENV_RESPONSE = { EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
'HTTP_X_IDENTITY_STATUS': 'Confirmed', 'HTTP_X_IDENTITY_STATUS': 'Confirmed',
@@ -242,101 +52,6 @@ EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
FAKE_RESPONSE_STACK = [] FAKE_RESPONSE_STACK = []
# @TODO(mordred) This should become a testresources resource attached to the
# class
# The data for these tests are signed using openssl and are stored in files
# in the signing subdirectory. In order to keep the values consistent between
# the tests and the signed documents, we read them in for use in the tests.
signing_path = CMSDIR
with open(os.path.join(signing_path, 'auth_token_scoped.pem')) as f:
SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f:
SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
with open(os.path.join(signing_path, 'auth_v3_token_scoped.pem')) as f:
SIGNED_v3_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(signing_path, 'auth_token_revoked.pem')) as f:
REVOKED_TOKEN = cms.cms_to_token(f.read())
with open(os.path.join(signing_path,
'auth_token_scoped_expired.pem')) as f:
SIGNED_TOKEN_SCOPED_EXPIRED = cms.cms_to_token(f.read())
REVOKED_TOKEN_HASH = utils.hash_signed_token(REVOKED_TOKEN)
with open(os.path.join(signing_path, 'auth_v3_token_revoked.pem')) as f:
REVOKED_v3_TOKEN = cms.cms_to_token(f.read())
REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(REVOKED_v3_TOKEN)
with open(os.path.join(signing_path, 'revocation_list.json')) as f:
REVOCATION_LIST = jsonutils.loads(f.read())
with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
{'signed': f.read()})
SIGNED_TOKEN_SCOPED_KEY =\
cms.cms_hash_token(SIGNED_TOKEN_SCOPED)
SIGNED_TOKEN_UNSCOPED_KEY =\
cms.cms_hash_token(SIGNED_TOKEN_UNSCOPED)
SIGNED_v3_TOKEN_SCOPED_KEY = (
cms.cms_hash_token(SIGNED_v3_TOKEN_SCOPED))
TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY] = {
'access': {
'token': {
'id': SIGNED_TOKEN_SCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'tenantId': 'tenant_id1',
'tenantName': 'tenant_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
}
TOKEN_RESPONSES[SIGNED_TOKEN_UNSCOPED_KEY] = {
'access': {
'token': {
'id': SIGNED_TOKEN_UNSCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
}
TOKEN_RESPONSES[SIGNED_v3_TOKEN_SCOPED_KEY] = {
'token': {
'expires': '2020-01-01T00:00:10.000123Z',
'user': {
'id': 'user_id1',
'name': 'user_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'project': {
'id': 'tenant_id1',
'name': 'tenant_name1',
'domain': {
'id': 'domain_id1',
'name': 'domain_name1'
}
},
'roles': [
{'name': 'role1'},
{'name': 'role2'}
],
'catalog': {}
}
}
VERSION_LIST_v3 = { VERSION_LIST_v3 = {
"versions": { "versions": {
"values": [ "values": [
@@ -444,9 +159,9 @@ class BaseFakeHTTPConnection(object):
return 404 indicating an unknown (therefore unauthorized) token. return 404 indicating an unknown (therefore unauthorized) token.
""" """
if token_id in TOKEN_RESPONSES.keys(): if token_id in client_fixtures.JSON_TOKEN_RESPONSES.keys():
status = 200 status = 200
body = jsonutils.dumps(TOKEN_RESPONSES[token_id]) body = client_fixtures.JSON_TOKEN_RESPONSES[token_id]
elif token_id == "revoked": elif token_id == "revoked":
status = 200 status = 200
body = SIGNED_REVOCATION_LIST body = SIGNED_REVOCATION_LIST
@@ -655,19 +370,20 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
self.token_dict = token_dict self.token_dict = token_dict
else: else:
self.token_dict = { self.token_dict = {
'uuid_token_default': UUID_TOKEN_DEFAULT, 'uuid_token_default': client_fixtures.UUID_TOKEN_DEFAULT,
'uuid_token_unscoped': UUID_TOKEN_UNSCOPED, 'uuid_token_unscoped': client_fixtures.UUID_TOKEN_UNSCOPED,
'signed_token_scoped': SIGNED_TOKEN_SCOPED, 'signed_token_scoped': client_fixtures.SIGNED_TOKEN_SCOPED,
'signed_token_scoped_expired': SIGNED_TOKEN_SCOPED_EXPIRED, 'signed_token_scoped_expired':
'revoked_token': REVOKED_TOKEN, client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED,
'revoked_token_hash': REVOKED_TOKEN_HASH 'revoked_token': client_fixtures.REVOKED_TOKEN,
'revoked_token_hash': client_fixtures.REVOKED_TOKEN_HASH
} }
self.conf = { self.conf = {
'auth_host': 'keystone.example.com', 'auth_host': 'keystone.example.com',
'auth_port': 1234, 'auth_port': 1234,
'auth_admin_prefix': '/testadmin', 'auth_admin_prefix': '/testadmin',
'signing_dir': CERTDIR, 'signing_dir': client_fixtures.CERTDIR,
'auth_version': auth_version 'auth_version': auth_version
} }
@@ -811,7 +527,7 @@ class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
FAKE_RESPONSE_STACK.append(resp4) FAKE_RESPONSE_STACK.append(resp4)
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list()) fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, REVOCATION_LIST) self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST)
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
@@ -829,7 +545,7 @@ class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
def test_valid_diablo_response(self): def test_valid_diablo_response(self):
req = webob.Request.blank('/') req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = VALID_DIABLO_TOKEN req.headers['X-Auth-Token'] = client_fixtures.VALID_DIABLO_TOKEN
self.middleware(req.environ, self.start_fake_response) self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200) self.assertEqual(self.response_status, 200)
self.assertTrue('keystone.token_info' in req.environ) self.assertTrue('keystone.token_info' in req.environ)
@@ -994,7 +710,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
self.middleware.token_revocation_list_fetched_time = None self.middleware.token_revocation_list_fetched_time = None
os.remove(self.middleware.revoked_file_name) os.remove(self.middleware.revoked_file_name)
self.assertEqual(self.middleware.token_revocation_list, self.assertEqual(self.middleware.token_revocation_list,
REVOCATION_LIST) client_fixtures.REVOCATION_LIST)
def test_get_revocation_list_returns_current_list_from_memory(self): def test_get_revocation_list_returns_current_list_from_memory(self):
self.assertEqual(self.middleware.token_revocation_list, self.assertEqual(self.middleware.token_revocation_list,
@@ -1015,7 +731,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
# tests to override the fake http connection # tests to override the fake http connection
self.set_fake_http(FakeHTTPConnection) self.set_fake_http(FakeHTTPConnection)
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list()) fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, REVOCATION_LIST) self.assertEqual(fetched_list, client_fixtures.REVOCATION_LIST)
def test_request_invalid_uuid_token(self): def test_request_invalid_uuid_token(self):
req = webob.Request.blank('/') req = webob.Request.blank('/')
@@ -1027,7 +743,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
def test_request_invalid_signed_token(self): def test_request_invalid_signed_token(self):
req = webob.Request.blank('/') req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = INVALID_SIGNED_TOKEN req.headers['X-Auth-Token'] = client_fixtures.INVALID_SIGNED_TOKEN
self.middleware(req.environ, self.start_fake_response) self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401) self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'], self.assertEqual(self.response_headers['WWW-Authenticate'],
@@ -1061,17 +777,11 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
def test_request_no_token_http(self): def test_request_no_token_http(self):
req = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'}) req = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'})
conf = { self.set_middleware()
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_protocol': 'http',
'auth_admin_prefix': '/testadmin',
}
self.set_middleware(conf=conf)
body = self.middleware(req.environ, self.start_fake_response) body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401) self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'], self.assertEqual(self.response_headers['WWW-Authenticate'],
"Keystone uri='http://keystone.example.com:1234'") "Keystone uri='https://keystone.example.com:1234'")
self.assertEqual(body, ['']) self.assertEqual(body, [''])
def test_request_blank_token(self): def test_request_blank_token(self):
@@ -1113,7 +823,7 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
token_cache_time = 10 token_cache_time = 10
conf = { conf = {
'token_cache_time': token_cache_time, 'token_cache_time': token_cache_time,
'signing_dir': CERTDIR, 'signing_dir': client_fixtures.CERTDIR,
} }
conf.update(extra_conf) conf.update(extra_conf)
self.set_middleware(conf=conf) self.set_middleware(conf=conf)
@@ -1402,10 +1112,12 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
self.assertTrue('keystone.token_info' in req.environ) self.assertTrue('keystone.token_info' in req.environ)
def test_default_tenant_uuid_token(self): def test_default_tenant_uuid_token(self):
self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT) self.assert_unscoped_default_tenant_auto_scopes(
client_fixtures.UUID_TOKEN_DEFAULT)
def test_default_tenant_signed_token(self): def test_default_tenant_signed_token(self):
self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED) self.assert_unscoped_default_tenant_auto_scopes(
client_fixtures.SIGNED_TOKEN_SCOPED)
def assert_unscoped_token_receives_401(self, token): def assert_unscoped_token_receives_401(self, token):
"""Unscoped requests with no default tenant ID should be rejected.""" """Unscoped requests with no default tenant ID should be rejected."""
@@ -1417,15 +1129,18 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
"Keystone uri='https://keystone.example.com:1234'") "Keystone uri='https://keystone.example.com:1234'")
def test_unscoped_uuid_token_receives_401(self): def test_unscoped_uuid_token_receives_401(self):
self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED) self.assert_unscoped_token_receives_401(
client_fixtures.UUID_TOKEN_UNSCOPED)
def test_unscoped_pki_token_receives_401(self): def test_unscoped_pki_token_receives_401(self):
self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED) self.assert_unscoped_token_receives_401(
client_fixtures.SIGNED_TOKEN_UNSCOPED)
def test_request_prevent_service_catalog_injection(self): def test_request_prevent_service_catalog_injection(self):
req = webob.Request.blank('/') req = webob.Request.blank('/')
req.headers['X-Service-Catalog'] = '[]' req.headers['X-Service-Catalog'] = '[]'
req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG req.headers['X-Auth-Token'] = \
client_fixtures.UUID_TOKEN_NO_SERVICE_CATALOG
body = self.middleware(req.environ, self.start_fake_response) body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200) self.assertEqual(self.response_status, 200)
self.assertFalse(req.headers.get('X-Service-Catalog')) self.assertFalse(req.headers.get('X-Service-Catalog'))
@@ -1445,17 +1160,18 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
'auth_host': 'keystone.example.com', 'auth_host': 'keystone.example.com',
'auth_port': 1234, 'auth_port': 1234,
'auth_admin_prefix': '/testadmin', 'auth_admin_prefix': '/testadmin',
'signing_dir': CERTDIR, 'signing_dir': client_fixtures.CERTDIR,
'auth_version': 'v2.0' 'auth_version': 'v2.0'
} }
self.set_middleware(fake_http=v3FakeHTTPConnection, conf=conf) self.set_middleware(fake_http=v3FakeHTTPConnection, conf=conf)
# This tests will only work is auth_token has chosen to use the # This tests will only work is auth_token has chosen to use the
# lower, v2, api version # lower, v2, api version
req = webob.Request.blank('/') req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = UUID_TOKEN_DEFAULT req.headers['X-Auth-Token'] = client_fixtures.UUID_TOKEN_DEFAULT
body = self.middleware(req.environ, self.start_fake_response) body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200) self.assertEqual(self.response_status, 200)
self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT, self.assertEqual("/testadmin/v2.0/tokens/%s" %
client_fixtures.UUID_TOKEN_DEFAULT,
v3FakeHTTPConnection.last_requested_url) v3FakeHTTPConnection.last_requested_url)
def test_invalid_auth_version_request(self): def test_invalid_auth_version_request(self):
@@ -1463,7 +1179,7 @@ class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
'auth_host': 'keystone.example.com', 'auth_host': 'keystone.example.com',
'auth_port': 1234, 'auth_port': 1234,
'auth_admin_prefix': '/testadmin', 'auth_admin_prefix': '/testadmin',
'signing_dir': CERTDIR, 'signing_dir': client_fixtures.CERTDIR,
'auth_version': 'v1.0' # v1.0 is no longer supported 'auth_version': 'v1.0' # v1.0 is no longer supported
} }
self.assertRaises(Exception, self.set_middleware, conf) self.assertRaises(Exception, self.set_middleware, conf)
@@ -1495,12 +1211,13 @@ class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest):
""" """
def setUp(self): def setUp(self):
token_dict = { token_dict = {
'uuid_token_default': v3_UUID_TOKEN_DEFAULT, 'uuid_token_default': client_fixtures.v3_UUID_TOKEN_DEFAULT,
'uuid_token_unscoped': v3_UUID_TOKEN_UNSCOPED, 'uuid_token_unscoped': client_fixtures.v3_UUID_TOKEN_UNSCOPED,
'signed_token_scoped': SIGNED_v3_TOKEN_SCOPED, 'signed_token_scoped': client_fixtures.SIGNED_v3_TOKEN_SCOPED,
'signed_token_scoped_expired': SIGNED_TOKEN_SCOPED_EXPIRED, 'signed_token_scoped_expired':
'revoked_token': REVOKED_v3_TOKEN, client_fixtures.SIGNED_TOKEN_SCOPED_EXPIRED,
'revoked_token_hash': REVOKED_v3_TOKEN_HASH 'revoked_token': client_fixtures.REVOKED_v3_TOKEN,
'revoked_token_hash': client_fixtures.REVOKED_v3_TOKEN_HASH
} }
super(v3AuthTokenMiddlewareTest, self).setUp( super(v3AuthTokenMiddlewareTest, self).setUp(
auth_version='v3.0', auth_version='v3.0',
@@ -1528,7 +1245,7 @@ class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest):
'HTTP_X_ROLE': '', 'HTTP_X_ROLE': '',
} }
self.set_middleware(expected_env=delta_expected_env) self.set_middleware(expected_env=delta_expected_env)
self.assert_valid_request_200(v3_UUID_TOKEN_UNSCOPED, self.assert_valid_request_200(client_fixtures.v3_UUID_TOKEN_UNSCOPED,
with_catalog=False) with_catalog=False)
self.assertEqual('/testadmin/v3/auth/tokens', self.assertEqual('/testadmin/v3/auth/tokens',
v3FakeHTTPConnection.last_requested_url) v3FakeHTTPConnection.last_requested_url)
@@ -1547,7 +1264,8 @@ class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest):
'HTTP_X_TENANT': None 'HTTP_X_TENANT': None
} }
self.set_middleware(expected_env=delta_expected_env) self.set_middleware(expected_env=delta_expected_env)
self.assert_valid_request_200(v3_UUID_TOKEN_DOMAIN_SCOPED) self.assert_valid_request_200(
client_fixtures.v3_UUID_TOKEN_DOMAIN_SCOPED)
self.assertEqual('/testadmin/v3/auth/tokens', self.assertEqual('/testadmin/v3/auth/tokens',
v3FakeHTTPConnection.last_requested_url) v3FakeHTTPConnection.last_requested_url)