
Explicitly checks the expiry on the tokens and rejects tokens that have expired. The sample data for the tokens had to be regenerated, as they had all been generated with values that are now expired. bug 1179615 Change-Id: Ie06500d446f55fd0ad67ea540c92d8cfc57483f4
1432 lines
53 KiB
Python
1432 lines
53 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2012 OpenStack LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import datetime
|
|
import iso8601
|
|
import os
|
|
import string
|
|
import sys
|
|
import tempfile
|
|
import testtools
|
|
|
|
import fixtures
|
|
import webob
|
|
|
|
from keystoneclient.common import cms
|
|
from keystoneclient import utils
|
|
from keystoneclient.middleware import auth_token
|
|
from keystoneclient.middleware import memcache_crypt
|
|
from keystoneclient.openstack.common import memorycache
|
|
from keystoneclient.openstack.common import jsonutils
|
|
from keystoneclient.openstack.common import timeutils
|
|
|
|
|
|
# Directory two levels above this file; the example PKI material used by
# the tests is located relative to it.
ROOTDIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))

# Certificates, private keys and pre-signed CMS documents.
CERTDIR, KEYDIR, CMSDIR = (
    os.path.join(ROOTDIR, subdir)
    for subdir in ("examples/pki/certs",
                   "examples/pki/private",
                   "examples/pki/cms"))

SIGNING_CERT = os.path.join(CERTDIR, 'signing_cert.pem')
SIGNING_KEY = os.path.join(KEYDIR, 'signing_key.pem')
CA = os.path.join(CERTDIR, 'ca.pem')
|
|
|
|
# Placeholders for the signed sample data. They are declared here so the
# names exist at module level; the real values are read from the files in
# CMSDIR further down in this module.
REVOCATION_LIST = None
REVOKED_TOKEN = None
REVOKED_TOKEN_HASH = None
REVOKED_v3_TOKEN = None
REVOKED_v3_TOKEN_HASH = None
SIGNED_REVOCATION_LIST = None
SIGNED_TOKEN_SCOPED = None
# Declared alongside its siblings; it is loaded with the other signed
# tokens but was previously missing from this list.
SIGNED_TOKEN_SCOPED_EXPIRED = None
SIGNED_TOKEN_UNSCOPED = None
SIGNED_v3_TOKEN_SCOPED = None
SIGNED_v3_TOKEN_UNSCOPED = None
SIGNED_TOKEN_SCOPED_KEY = None
SIGNED_TOKEN_UNSCOPED_KEY = None
SIGNED_v3_TOKEN_SCOPED_KEY = None

VALID_SIGNED_REVOCATION_LIST = None
|
|
|
|
# Sample UUID-style token ids; they are used below as keys into
# TOKEN_RESPONSES.

# v2 tokens
UUID_TOKEN_DEFAULT = 'ec6c0710ec2f471498484c1b53ab4f9d'
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'

# v3 tokens
v3_UUID_TOKEN_DEFAULT = '5603457654b346fdbb93437bfe76f2f1'
v3_UUID_TOKEN_UNSCOPED = 'd34835fdaec447e695a0a024d84f8d79'
v3_UUID_TOKEN_DOMAIN_SCOPED = 'e8a7b63aaa4449f38f0c5c05c3581792'
|
|
|
|
# A token-shaped blob that is not a valid signed document. It is written as
# a multi-line literal for readability and the newlines are stripped. The
# str.replace() method is used instead of the string.replace() function,
# because the module-level function only exists on Python 2.
INVALID_SIGNED_TOKEN = """AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD
EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
0000000000000000000000000000000000000000000000000000000000000000
1111111111111111111111111111111111111111111111111111111111111111
2222222222222222222222222222222222222222222222222222222222222222
3333333333333333333333333333333333333333333333333333333333333333
4444444444444444444444444444444444444444444444444444444444444444
5555555555555555555555555555555555555555555555555555555555555555
6666666666666666666666666666666666666666666666666666666666666666
7777777777777777777777777777777777777777777777777777777777777777
8888888888888888888888888888888888888888888888888888888888888888
9999999999999999999999999999999999999999999999999999999999999999
0000000000000000000000000000000000000000000000000000000000000000
xg==""".replace("\n", "")
|
|
|
|
# Building blocks shared by the sample responses below. Each helper returns
# a brand new object so that no two responses share mutable state.
_SAMPLE_EXPIRY = '2020-01-01T00:00:10.000123Z'


def _sample_tenant():
    """Return a fresh tenant/project description."""
    return {
        'id': 'tenant_id1',
        'name': 'tenant_name1',
    }


def _sample_domain():
    """Return a fresh domain description."""
    return {
        'id': 'domain_id1',
        'name': 'domain_name1',
    }


def _sample_v2_user():
    """Return a fresh v2 'user' section, including its roles."""
    return {
        'id': 'user_id1',
        'name': 'user_name1',
        'roles': [
            {'name': 'role1'},
            {'name': 'role2'},
        ],
    }


def _sample_v3_user():
    """Return a fresh v3 'user' section (roles live on the token in v3)."""
    return {
        'id': 'user_id1',
        'name': 'user_name1',
        'domain': _sample_domain(),
    }


def _sample_v3_roles():
    """Return a fresh v3 role list."""
    return [
        {'name': 'role1', 'id': 'Role1'},
        {'name': 'role2', 'id': 'Role2'},
    ]


# JSON responses keyed by token ID
TOKEN_RESPONSES = {
    # v2: token scoped to a tenant, with an (empty) service catalog.
    UUID_TOKEN_DEFAULT: {
        'access': {
            'token': {
                'id': UUID_TOKEN_DEFAULT,
                'expires': _SAMPLE_EXPIRY,
                'tenant': _sample_tenant(),
            },
            'user': _sample_v2_user(),
            'serviceCatalog': {}
        },
    },
    # Diablo-style response: only a 'tenantId' on the token.
    VALID_DIABLO_TOKEN: {
        'access': {
            'token': {
                'id': VALID_DIABLO_TOKEN,
                'expires': _SAMPLE_EXPIRY,
                'tenantId': 'tenant_id1',
            },
            'user': _sample_v2_user(),
        },
    },
    # v2: token without any tenant information.
    UUID_TOKEN_UNSCOPED: {
        'access': {
            'token': {
                'id': UUID_TOKEN_UNSCOPED,
                'expires': _SAMPLE_EXPIRY,
            },
            'user': _sample_v2_user(),
        },
    },
    # v2: tenant-scoped token whose response has no 'serviceCatalog' key.
    UUID_TOKEN_NO_SERVICE_CATALOG: {
        'access': {
            'token': {
                'id': 'valid-token',
                'expires': _SAMPLE_EXPIRY,
                'tenant': _sample_tenant(),
            },
            'user': _sample_v2_user()
        },
    },
    # v3: project-scoped token.
    v3_UUID_TOKEN_DEFAULT: {
        'token': {
            'expires_at': _SAMPLE_EXPIRY,
            'user': _sample_v3_user(),
            'project': dict(_sample_tenant(), domain=_sample_domain()),
            'roles': _sample_v3_roles(),
            'catalog': {}
        }
    },
    # v3: token with user information only.
    v3_UUID_TOKEN_UNSCOPED: {
        'token': {
            'expires_at': _SAMPLE_EXPIRY,
            'user': _sample_v3_user()
        }
    },
    # v3: domain-scoped token.
    v3_UUID_TOKEN_DOMAIN_SCOPED: {
        'token': {
            'expires_at': _SAMPLE_EXPIRY,
            'user': _sample_v3_user(),
            'domain': _sample_domain(),
            'roles': _sample_v3_roles(),
            'catalog': {}
        }
    }
}
|
|
|
|
# WSGI environment entries that FakeApp expects to find for the default
# (tenant-scoped) sample token.
EXPECTED_V2_DEFAULT_ENV_RESPONSE = dict(
    HTTP_X_IDENTITY_STATUS='Confirmed',
    HTTP_X_TENANT_ID='tenant_id1',
    HTTP_X_TENANT_NAME='tenant_name1',
    HTTP_X_USER_ID='user_id1',
    HTTP_X_USER_NAME='user_name1',
    HTTP_X_ROLES='role1,role2',
    HTTP_X_USER='user_name1',  # deprecated (diablo-compat)
    HTTP_X_TENANT='tenant_name1',  # deprecated (diablo-compat)
    HTTP_X_ROLE='role1,role2',  # deprecated (diablo-compat)
)

# Canned FakeHTTPResponse objects that FakeHTTPConnection.getresponse()
# falls back to when no response was produced by a preceding request().
FAKE_RESPONSE_STACK = list()
|
|
|
|
|
|
# @TODO(mordred) This should become a testresources resource attached to the
# class
# The data for these tests are signed using openssl and are stored in files
# in the signing subdirectory. In order to keep the values consistent between
# the tests and the signed documents, we read them in for use in the tests.
signing_path = CMSDIR


def _read_signing_file(filename):
    """Return the complete text of ``filename`` located in signing_path."""
    with open(os.path.join(signing_path, filename)) as handle:
        return handle.read()


# Signed tokens, converted from their PEM/CMS form into token ids.
SIGNED_TOKEN_SCOPED = cms.cms_to_token(
    _read_signing_file('auth_token_scoped.pem'))
SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(
    _read_signing_file('auth_token_unscoped.pem'))
SIGNED_v3_TOKEN_SCOPED = cms.cms_to_token(
    _read_signing_file('auth_v3_token_scoped.pem'))
REVOKED_TOKEN = cms.cms_to_token(
    _read_signing_file('auth_token_revoked.pem'))
SIGNED_TOKEN_SCOPED_EXPIRED = cms.cms_to_token(
    _read_signing_file('auth_token_scoped_expired.pem'))
REVOKED_TOKEN_HASH = utils.hash_signed_token(REVOKED_TOKEN)
REVOKED_v3_TOKEN = cms.cms_to_token(
    _read_signing_file('auth_v3_token_revoked.pem'))
REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(REVOKED_v3_TOKEN)

# The revocation list, both as parsed JSON and as the signed document
# wrapped in the {'signed': ...} envelope.
REVOCATION_LIST = jsonutils.loads(_read_signing_file('revocation_list.json'))
VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
    {'signed': _read_signing_file('revocation_list.pem')})

# Hashed ids of the signed tokens; used as TOKEN_RESPONSES keys below.
SIGNED_TOKEN_SCOPED_KEY = cms.cms_hash_token(SIGNED_TOKEN_SCOPED)
SIGNED_TOKEN_UNSCOPED_KEY = cms.cms_hash_token(SIGNED_TOKEN_UNSCOPED)
SIGNED_v3_TOKEN_SCOPED_KEY = cms.cms_hash_token(SIGNED_v3_TOKEN_SCOPED)
|
|
|
|
# Register responses for the signed sample tokens, keyed by their hashed ids.
TOKEN_RESPONSES.update({
    SIGNED_TOKEN_SCOPED_KEY: {
        'access': {
            'token': {
                'id': SIGNED_TOKEN_SCOPED_KEY,
            },
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'tenantId': 'tenant_id1',
                'tenantName': 'tenant_name1',
                'roles': [
                    {'name': 'role1'},
                    {'name': 'role2'},
                ],
            },
        },
    },
    SIGNED_TOKEN_UNSCOPED_KEY: {
        'access': {
            'token': {
                'id': SIGNED_TOKEN_UNSCOPED_KEY,
            },
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'roles': [
                    {'name': 'role1'},
                    {'name': 'role2'},
                ],
            },
        },
    },
    SIGNED_v3_TOKEN_SCOPED_KEY: {
        'token': {
            # NOTE(review): the v3 UUID samples in this module use
            # 'expires_at' rather than 'expires' -- confirm this key is
            # intentional.
            'expires': '2020-01-01T00:00:10.000123Z',
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'domain': {
                    'id': 'domain_id1',
                    'name': 'domain_name1'
                }
            },
            'project': {
                'id': 'tenant_id1',
                'name': 'tenant_name1',
                'domain': {
                    'id': 'domain_id1',
                    'name': 'domain_name1'
                }
            },
            'roles': [
                {'name': 'role1'},
                {'name': 'role2'}
            ],
            'catalog': {}
        }
    },
})
|
|
|
|
# Body returned by the fake v3 server for a 'GET versions' call.
VERSION_LIST_v3 = {
    'versions': {
        'values': [
            {'id': 'v3.0',
             'status': 'stable',
             'updated': '2013-03-06T00:00:00Z',
             'links': []},
            {'id': 'v2.0',
             'status': 'beta',
             'updated': '2011-11-19T00:00:00Z',
             'links': []},
        ],
    },
}

# Body returned by the fake v2 server for a 'GET versions' call.
VERSION_LIST_v2 = {
    'versions': {
        'values': [
            {'id': 'v2.0',
             'status': 'beta',
             'updated': '2011-11-19T00:00:00Z',
             'links': []},
        ],
    },
}
|
|
|
|
|
|
class NoModuleFinder(object):
    """Meta-path finder that disallows further imports of 'module'.

    Any import of ``module`` itself or of one of its submodules
    (``module.<anything>``) fails with ImportError; every other name is
    left to the remaining finders.

    Both the legacy ``find_module`` hook and the ``find_spec`` hook are
    implemented, so the finder keeps working on interpreters whose import
    system no longer consults ``find_module``.
    """

    def __init__(self, module):
        self.module = module

    def _is_blocked(self, fullname):
        """Return True if ``fullname`` is the module or one of its children."""
        return (fullname == self.module or
                fullname.startswith(self.module + '.'))

    def find_module(self, fullname, path=None):
        """Legacy finder hook: raise ImportError for blocked names.

        Returns None (i.e. "not handled here") for everything else.
        """
        if self._is_blocked(fullname):
            raise ImportError
        return None

    def find_spec(self, fullname, path=None, target=None):
        """Spec-based finder hook with the same semantics as find_module."""
        if self._is_blocked(fullname):
            raise ImportError
        return None
|
|
|
|
|
|
class DisableModuleFixture(fixtures.Fixture):
    """A fixture to provide support for unloading/disabling modules.

    setUp() removes the module (and its submodules) from ``sys.modules``
    and installs a NoModuleFinder at the front of ``sys.meta_path`` so that
    importing it again raises ImportError. tearDown() removes the finder
    and puts the cleared modules back.
    """

    def __init__(self, module, *args, **kw):
        super(DisableModuleFixture, self).__init__(*args, **kw)
        self.module = module
        self._finders = []
        self._cleared_modules = {}

    def tearDown(self):
        # NOTE(review): confirm that the fixtures framework actually invokes
        # tearDown(); if it only drives cleanUp()/addCleanup, this
        # restoration would never run.
        super(DisableModuleFixture, self).tearDown()
        for finder in self._finders:
            sys.meta_path.remove(finder)
        sys.modules.update(self._cleared_modules)

    def clear_module(self):
        """Pop the module and its submodules out of ``sys.modules``.

        :returns: dict mapping each removed module name to the module
                  object that was removed.
        """
        cleared_modules = {}
        # Iterate over a snapshot of the names: entries are popped inside
        # the loop, and mutating a dict while iterating its live key view
        # raises RuntimeError on Python 3.
        for fullname in list(sys.modules):
            if (fullname == self.module or
                    fullname.startswith(self.module + '.')):
                cleared_modules[fullname] = sys.modules.pop(fullname)
        return cleared_modules

    def setUp(self):
        """Ensure ImportError for the specified module."""

        super(DisableModuleFixture, self).setUp()

        # Clear 'module' references in sys.modules
        self._cleared_modules.update(self.clear_module())

        # Put the blocking finder first so it is consulted before any other.
        finder = NoModuleFinder(self.module)
        self._finders.append(finder)
        sys.meta_path.insert(0, finder)
|
|
|
|
|
|
class FakeSwiftMemcacheRing(memorycache.Client):
    """In-memory cache client whose set() takes swift's 'timeout' keyword."""

    # NOTE(vish): swift memcache uses param timeout instead of time
    def set(self, key, value, timeout=0, min_compress_len=0):
        # Forward positionally so 'timeout' lands in the parent's third
        # parameter.
        super(FakeSwiftMemcacheRing, self).set(
            key, value, timeout, min_compress_len)
|
|
|
|
|
|
class FakeHTTPResponse(object):
    """Minimal stand-in for an HTTP response: a status plus a body."""

    def __init__(self, status, body):
        self.status, self.body = status, body

    def read(self):
        """Return the stored body; it is not consumed by reading."""
        return self.body
|
|
|
|
|
|
class BaseFakeHTTPConnection(object):
    """Response helpers shared by the fake Keystone HTTP connections."""

    def _user_token_responses(self, token_id):
        """Emulate user token responses.

        Return success if the token is in the list we know
        about. If the request is for revoked tokens, then return
        the revoked list, else if a different token is provided,
        return 404 indicating an unknown (therefore unauthorized) token.

        :returns: a ``(status, body)`` tuple.
        """
        if token_id in TOKEN_RESPONSES:
            return 200, jsonutils.dumps(TOKEN_RESPONSES[token_id])
        if token_id == "revoked":
            return 200, SIGNED_REVOCATION_LIST
        return 404, ''

    def fake_v2_responses(self, path):
        """Answer a v2 request; the token id is the last path segment."""
        return self._user_token_responses(path.rsplit('/', 1)[1])

    def fake_v3_responses(self, path, **kwargs):
        """Answer a v3 request; the token id is in 'X-Subject-Token'."""
        request_headers = kwargs.get('headers')
        return self._user_token_responses(
            request_headers['X-Subject-Token'])

    def fake_v2_admin_token(self, path):
        """Return a successful admin-token response as ``(status, body)``."""
        admin_token = {'id': 'admin_token2',
                       'expires': '2022-10-03T16:58:01Z'}
        payload = {
            'access': {
                'token': admin_token
            },
        }
        return 200, jsonutils.dumps(payload)
|
|
|
|
|
|
class FakeHTTPConnection(BaseFakeHTTPConnection):
    """Emulate a fake Keystone v2 server."""

    def __init__(self, *args, **kwargs):
        # Constructor arguments are accepted and ignored.
        self.send_valid_revocation_list = True
        self.resp = None

    def request(self, method, path, **kwargs):
        """Fakes out several http responses.

        Support the following requests:

        - Create admin token ('POST /testadmin/v2.0/tokens')
        - Get versions ('GET /testadmin/')
        - Get v2 user token responses (see fake_v2_responses)

        The resulting response is stored on ``self.resp`` and the requested
        path is recorded on the class as ``last_requested_url``.
        """
        FakeHTTPConnection.last_requested_url = path
        if method == 'POST' and path == '/testadmin/v2.0/tokens':
            status, body = self.fake_v2_admin_token(path)
        elif path == '/testadmin/':
            # It's a GET versions call
            status, body = 300, jsonutils.dumps(VERSION_LIST_v2)
        else:
            status, body = self.fake_v2_responses(path)

        self.resp = FakeHTTPResponse(status, body)

    def getresponse(self):
        """Return the response for the preceding request.

        If self.resp is set then this is just the response to the earlier
        request. If it is not set, then we expect a stack of responses to
        have been pre-prepared; when that stack is empty as well, a 500
        response is returned.
        """
        if self.resp:
            return self.resp
        if FAKE_RESPONSE_STACK:
            return FAKE_RESPONSE_STACK.pop()
        return FakeHTTPResponse(
            500, jsonutils.dumps('UNEXPECTED RESPONSE'))

    def close(self):
        """Nothing to release for the fake connection."""
        pass
|
|
|
|
|
|
class v3FakeHTTPConnection(FakeHTTPConnection):
    """Emulate a fake Keystone v3 server."""

    def request(self, method, path, **kwargs):
        """Fakes out several http responses.

        Support the following requests:

        - Create admin token ('POST /testadmin/v2.0/tokens')
        - Get versions ('GET /testadmin/')
        - Get v2 user token responses (see fake_v2_responses)
        - Get v3 user token responses (see fake_v3_responses)

        The resulting response is stored on ``self.resp`` and the requested
        path is recorded on the class as ``last_requested_url``.
        """
        v3FakeHTTPConnection.last_requested_url = path
        if method == 'POST' and path == '/testadmin/v2.0/tokens':
            status, body = self.fake_v2_admin_token(path)
        elif path == '/testadmin/':
            # It's a GET versions call
            status, body = 300, jsonutils.dumps(VERSION_LIST_v3)
        elif path.split('/')[2] == 'v2.0':
            # Third '/'-separated component names the API version.
            status, body = self.fake_v2_responses(path)
        else:
            status, body = self.fake_v3_responses(path, **kwargs)

        self.resp = FakeHTTPResponse(status, body)
|
|
|
|
|
|
class RaisingHTTPConnection(FakeHTTPConnection):
    """An HTTPConnection that always raises.

    Used to prove that a code path issues no HTTP request at all.
    """

    def request(self, method, path, **kwargs):
        raise AssertionError("HTTP request was called.")
|
|
|
|
|
|
class FakeApp(object):
    """This represents a WSGI app protected by the auth_token middleware.

    When called it asserts that the environment contains every expected
    key/value pair and then answers with the body 'SUCCESS'.
    """

    def __init__(self, expected_env=None):
        # Start from the v2 defaults; caller-supplied entries override them.
        merged_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
        merged_env.update(expected_env or {})
        self.expected_env = merged_env

    def __call__(self, env, start_response):
        for key, expected in self.expected_env.items():
            assert env[key] == expected, '%s != %s' % (env[key], expected)

        response = webob.Response()
        response.body = 'SUCCESS'
        return response(env, start_response)
|
|
|
|
|
|
class v3FakeApp(object):
    """This represents a v3 WSGI app protected by the auth_token middleware.

    When called it asserts that the environment contains every expected
    key/value pair and then answers with the body 'SUCCESS'.
    """

    def __init__(self, expected_env=None):
        # We should always get back the same v2 items
        merged_env = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
        # ...and with v3 additions, these are for the DEFAULT TOKEN
        merged_env.update({
            'HTTP_X_PROJECT_ID': 'tenant_id1',
            'HTTP_X_PROJECT_NAME': 'tenant_name1',
            'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1',
            'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1',
            'HTTP_X_USER_DOMAIN_ID': 'domain_id1',
            'HTTP_X_USER_DOMAIN_NAME': 'domain_name1'
        })
        # And finally update for anything passed in
        merged_env.update(expected_env or {})
        self.expected_env = merged_env

    def __call__(self, env, start_response):
        for key, expected in self.expected_env.items():
            assert env[key] == expected, '%s != %s' % (env[key], expected)
        response = webob.Response()
        response.body = 'SUCCESS'
        return response(env, start_response)
|
|
|
|
|
|
class BaseAuthTokenMiddlewareTest(testtools.TestCase):
    """Base test class for auth_token middleware.

    All the tests allow for running with auth_token
    configured for receiving v2 or v3 tokens, with the
    choice being made by passing configuration data into
    setUp().

    The base class will, by default, run all the tests
    expecting v2 token formats. Child classes can override
    this to specify, for instance, v3 format.

    """

    def setUp(self, expected_env=None, auth_version=None,
              fake_app=None, fake_http=None, token_dict=None):
        """Build the middleware under test together with its fakes.

        :param expected_env: extra environment entries the fake app
                             should assert on.
        :param auth_version: value stored under 'auth_version' in the
                             middleware configuration.
        :param fake_app: fake WSGI app class (defaults to FakeApp).
        :param fake_http: fake HTTP connection class (defaults to
                          FakeHTTPConnection).
        :param token_dict: mapping of symbolic token names to token
                           values; defaults to the v2 sample tokens.
        """
        global SIGNED_REVOCATION_LIST

        testtools.TestCase.setUp(self)
        expected_env = expected_env or {}

        if token_dict:
            self.token_dict = token_dict
        else:
            self.token_dict = {
                'uuid_token_default': UUID_TOKEN_DEFAULT,
                'uuid_token_unscoped': UUID_TOKEN_UNSCOPED,
                'signed_token_scoped': SIGNED_TOKEN_SCOPED,
                'signed_token_scoped_expired': SIGNED_TOKEN_SCOPED_EXPIRED,
                'revoked_token': REVOKED_TOKEN,
                'revoked_token_hash': REVOKED_TOKEN_HASH
            }

        self.conf = {
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_admin_prefix': '/testadmin',
            'signing_dir': CERTDIR,
            'auth_version': auth_version
        }

        # Base assumes v2 for fake app and http, can be overridden for
        # child classes by calling set_middleware() directly
        self.fake_app = fake_app or FakeApp
        self.fake_http = fake_http or FakeHTTPConnection
        self.set_middleware(self.fake_app, self.fake_http,
                            expected_env, self.conf)

        self.response_status = None
        self.response_headers = None

        # Reset the module-level revocation list served by the fake HTTP
        # connection, since individual tests may replace it.
        SIGNED_REVOCATION_LIST = VALID_SIGNED_REVOCATION_LIST

    def set_fake_http(self, http_handler):
        """Configure the http handler for the auth_token middleware.

        Allows tests to override the default handler on specific tests,
        e.g. to use v2 for those parts of auth_token that still use v2
        tokens while running the v3 test class, i.e. getting an admin
        token or revocation list.

        """
        self.middleware.http_client_class = http_handler

    def set_middleware(self, fake_app=None, fake_http=None,
                       expected_env=None, conf=None):
        """Configure the class ready to call the auth_token middleware.

        Set up the various fake items needed to run the middleware.
        Individual tests that need to further refine these can call this
        function to override the class defaults.

        Note that 'http_handler' is added to the ``conf`` dict in place when
        it is not already present.

        """
        conf = conf or self.conf
        if 'http_handler' not in conf:
            fake_http = fake_http or self.fake_http
            conf['http_handler'] = fake_http
        fake_app = fake_app or self.fake_app
        self.middleware = auth_token.AuthProtocol(fake_app(expected_env), conf)
        self.middleware._iso8601 = iso8601

        # mkstemp() hands back an already-open OS-level descriptor along
        # with the path. Only the path is needed, so close the descriptor
        # right away instead of leaking one per call.
        revoked_fd, revoked_path = tempfile.mkstemp()
        os.close(revoked_fd)
        self.middleware.revoked_file_name = revoked_path

        self.middleware.token_revocation_list = jsonutils.dumps(
            {"revoked": [], "extra": "success"})

    def tearDown(self):
        testtools.TestCase.tearDown(self)
        # Best effort: some tests remove the revocation file themselves.
        try:
            os.remove(self.middleware.revoked_file_name)
        except OSError:
            pass

    def start_fake_response(self, status, headers):
        """WSGI start_response stand-in that records status and headers.

        ``status`` is a string such as '200 OK'; only the leading integer
        is kept.
        """
        self.response_status = int(status.split(' ', 1)[0])
        self.response_headers = dict(headers)
|
|
|
|
|
|
if sys.version_info[:2] < (2, 7):

    # 2.6 doesn't have the assert dict equals so make sure that it exists
    class AdjustedBaseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
        """Base test class with assertion helpers added for Python 2.6."""

        def assertIsInstance(self, obj, cls, msg=None):
            """Same as self.assertTrue(isinstance(obj, cls)), with a nicer
            default message."""
            if isinstance(obj, cls):
                return
            standardMsg = '%s is not an instance of %r' % (obj, cls)
            self.fail(self._formatMessage(msg, standardMsg))

        def assertDictEqual(self, d1, d2, msg=None):
            # Simple version taken from 2.7
            for candidate, complaint in (
                    (d1, 'First argument is not a dictionary'),
                    (d2, 'Second argument is not a dictionary')):
                self.assertIsInstance(candidate, dict, complaint)
            if d1 != d2:
                # A caller-supplied message wins over the generated one.
                self.fail(msg if msg else '%r != %r' % (d1, d2))

    # Rebind the base name so the test classes below pick up the helpers.
    BaseAuthTokenMiddlewareTest = AdjustedBaseAuthTokenMiddlewareTest
|
|
|
|
|
|
class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """Auth Token middleware test setup that allows the tests to define
    a stack of responses to HTTP requests in the test and get those
    responses back in sequence for testing.

    Example::

        resp1 = FakeHTTPResponse(401, jsonutils.dumps(''))
        resp2 = FakeHTTPResponse(200, jsonutils.dumps({
            'access': {
                'token': {'id': 'admin_token2'},
            },
        }))
        FAKE_RESPONSE_STACK.append(resp1)
        FAKE_RESPONSE_STACK.append(resp2)

        ... do your testing code here ...

    """

    def setUp(self):
        super(StackResponseAuthTokenMiddlewareTest, self).setUp()

    def test_fetch_revocation_list_with_expire(self):
        # Simulate an admin token that turns out to be expired: the first
        # revocation list request is answered with 401 Unauthorized, after
        # which a new admin token is obtained and the request repeated.
        #
        # NOTE(review): FakeHTTPConnection.getresponse() only consults
        # FAKE_RESPONSE_STACK when no response was set by request(), and
        # it pops from the end of the list -- confirm these responses are
        # consumed, and in the order described here.
        def admin_token_response():
            return FakeHTTPResponse(200, jsonutils.dumps({
                'access': {
                    'token': {'id': 'admin_token2'},
                },
            }))

        FAKE_RESPONSE_STACK.extend([
            # first get_admin_token() call
            admin_token_response(),
            # request revocation list, get "unauthorized" due to simulated
            # expired token
            FakeHTTPResponse(401, jsonutils.dumps('')),
            # request a new admin_token
            admin_token_response(),
            # request revocation list, get the revocation list properly
            FakeHTTPResponse(200, SIGNED_REVOCATION_LIST),
        ])

        raw_list = self.middleware.fetch_revocation_list()
        self.assertEqual(jsonutils.loads(raw_list), REVOCATION_LIST)
|
|
|
|
|
|
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """Auth Token middleware should understand Diablo keystone responses."""

    def setUp(self):
        # pre-diablo only had Tenant ID, which was also the Name
        diablo_env = dict.fromkeys(
            ('HTTP_X_TENANT_ID',
             'HTTP_X_TENANT_NAME',
             # now deprecated (diablo-compat)
             'HTTP_X_TENANT'),
            'tenant_id1')
        super(DiabloAuthTokenMiddlewareTest, self).setUp(
            expected_env=diablo_env)

    def test_valid_diablo_response(self):
        request = webob.Request.blank('/')
        request.headers['X-Auth-Token'] = VALID_DIABLO_TOKEN
        self.middleware(request.environ, self.start_fake_response)
        self.assertEqual(self.response_status, 200)
        self.assertTrue('keystone.token_info' in request.environ)
|
|
|
|
|
|
class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest):
    """Middleware behaviour when the 'memcache' module cannot be imported."""

    def setUp(self):
        super(NoMemcacheAuthToken, self).setUp()
        # From here on, importing 'memcache' raises ImportError.
        self.useFixture(DisableModuleFixture('memcache'))

    def test_nomemcache(self):
        # Constructing the middleware with memcache servers configured must
        # not blow up; there is nothing further to assert.
        auth_token.AuthProtocol(FakeApp(), {
            'admin_token': 'admin_token1',
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'memcache_servers': 'localhost:11211',
        })

    def test_not_use_cache_from_env(self):
        wsgi_env = {'swift.cache': 'CACHE_TEST'}
        self.set_middleware(conf={
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_admin_prefix': '/testadmin',
            'memcache_servers': 'localhost:11211'
        })
        self.middleware._init_cache(wsgi_env)
        # The cache object offered through the environment must not be used.
        self.assertNotEqual(self.middleware._cache, 'CACHE_TEST')
|
|
|
|
|
|
class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
|
|
|
|
def test_init_does_not_call_http(self):
    # RaisingHTTPConnection.request() raises AssertionError, so this test
    # fails if constructing the middleware issues any HTTP request.
    self.set_fake_http(RaisingHTTPConnection)
    self.set_middleware(
        fake_http=RaisingHTTPConnection,
        conf={
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'revocation_cache_time': 1
        })
|
|
|
|
def assert_valid_last_url(self, token_id):
    """Assert that the last fake HTTP request validated ``token_id``."""
    # Default version (v2) has id in the token, override this
    # method for v3 and other versions
    expected_url = "/testadmin/v2.0/tokens/%s" % token_id
    last_url = self.middleware.http_client_class.last_requested_url
    self.assertEqual(expected_url, last_url)
|
|
|
|
def assert_valid_request_200(self, token, with_catalog=True):
    """Send ``token`` through the middleware and expect a 200 response.

    When ``with_catalog`` is true, the request must also have gained an
    'X-Service-Catalog' header.
    """
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = token
    response_body = self.middleware(request.environ,
                                    self.start_fake_response)
    self.assertEqual(self.response_status, 200)
    if with_catalog:
        self.assertTrue(request.headers.get('X-Service-Catalog'))
    self.assertEqual(response_body, ['SUCCESS'])
    self.assertTrue('keystone.token_info' in request.environ)
|
|
|
|
def test_valid_uuid_request(self):
    # A UUID token is accepted, and it is validated over (fake) HTTP.
    uuid_token = self.token_dict['uuid_token_default']
    self.assert_valid_request_200(uuid_token)
    self.assert_valid_last_url(uuid_token)
|
|
|
|
def test_valid_signed_request(self):
    # Clear the recorded URL so any HTTP request made below is noticed.
    self.middleware.http_client_class.last_requested_url = ''
    signed_token = self.token_dict['signed_token_scoped']
    self.assert_valid_request_200(signed_token)
    self.assertEqual(self.middleware.conf['auth_admin_prefix'],
                     "/testadmin")
    # ensure that signed requests do not generate HTTP traffic
    last_url = self.middleware.http_client_class.last_requested_url
    self.assertEqual('', last_url)
|
|
|
|
def test_revoked_token_receives_401(self):
    # With the revoked token's hash on the revocation list, a request
    # carrying that token must be rejected.
    self.middleware.token_revocation_list = self.get_revocation_list_json()
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = self.token_dict['revoked_token']
    self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 401)
|
|
|
|
def get_revocation_list_json(self, token_ids=None):
    """Return a JSON revocation list covering ``token_ids``.

    When ``token_ids`` is None, the list contains only the hash of the
    sample revoked token. Every entry gets the current UTC time as its
    'expires' value.
    """
    if token_ids is None:
        token_ids = [self.token_dict['revoked_token_hash']]
    revoked_entries = []
    for token_id in token_ids:
        revoked_entries.append(
            {'id': token_id, 'expires': timeutils.utcnow()})
    return jsonutils.dumps({'revoked': revoked_entries})
|
|
|
|
def test_is_signed_token_revoked_returns_false(self):
    # explicitly setting an empty revocation list here to document intent
    empty_list = jsonutils.dumps({"revoked": [], "extra": "success"})
    self.middleware.token_revocation_list = empty_list
    is_revoked = self.middleware.is_signed_token_revoked(
        self.token_dict['revoked_token'])
    self.assertFalse(is_revoked)
|
|
|
|
def test_is_signed_token_revoked_returns_true(self):
    # The default list from get_revocation_list_json() contains the hash
    # of the sample revoked token.
    self.middleware.token_revocation_list = self.get_revocation_list_json()
    is_revoked = self.middleware.is_signed_token_revoked(
        self.token_dict['revoked_token'])
    self.assertTrue(is_revoked)
|
|
|
|
def test_verify_signed_token_raises_exception_for_revoked_token(self):
    # Verifying a token whose hash is on the revocation list must fail.
    self.middleware.token_revocation_list = self.get_revocation_list_json()
    revoked_token = self.token_dict['revoked_token']
    self.assertRaises(auth_token.InvalidUserToken,
                      self.middleware.verify_signed_token,
                      revoked_token)
|
|
|
|
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
    # The revocation list names a different token, so verification is
    # expected to complete without raising; nothing else is asserted.
    self.middleware.token_revocation_list = self.get_revocation_list_json()
    unrevoked_token = self.token_dict['signed_token_scoped']
    self.middleware.verify_signed_token(unrevoked_token)
|
|
|
|
def test_cert_file_missing(self):
    # The error text names a different file than the one asked about.
    unrelated = self.middleware.cert_file_missing(
        "openstack: /tmp/haystack: No such file or directory",
        "/tmp/needle")
    self.assertFalse(unrelated)
    # The error text names exactly the file asked about.
    matching = self.middleware.cert_file_missing(
        "openstack: /not/exist: No such file or directory",
        "/not/exist")
    self.assertTrue(matching)
|
|
|
|
def test_get_token_revocation_list_fetched_time_returns_min(self):
    # With no recorded fetch time and no revocation file to consult, the
    # fetched time is expected to be datetime.min.
    self.middleware.token_revocation_list_fetched_time = None
    self.middleware.revoked_file_name = ''
    fetched_time = self.middleware.token_revocation_list_fetched_time
    self.assertEqual(fetched_time, datetime.datetime.min)
|
|
|
|
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
    # With no recorded fetch time, the revocation file's modification time
    # is expected instead.
    self.middleware.token_revocation_list_fetched_time = None
    expected = datetime.datetime.fromtimestamp(
        os.path.getmtime(self.middleware.revoked_file_name))
    self.assertEqual(self.middleware.token_revocation_list_fetched_time,
                     expected)
|
|
|
|
def test_get_token_revocation_list_fetched_time_returns_value(self):
    # The public attribute must report the privately stored value.
    stored = self.middleware._token_revocation_list_fetched_time
    reported = self.middleware.token_revocation_list_fetched_time
    self.assertEqual(reported, stored)
|
|
|
|
def test_get_revocation_list_returns_fetched_list(self):
    # auth_token uses v2 to fetch this, so don't allow the v3
    # tests to override the fake http connection
    self.set_fake_http(FakeHTTPConnection)
    # Drop both the recorded fetch time and the on-disk copy.
    self.middleware.token_revocation_list_fetched_time = None
    os.remove(self.middleware.revoked_file_name)
    current_list = self.middleware.token_revocation_list
    self.assertEqual(current_list, REVOCATION_LIST)
|
|
|
|
def test_get_revocation_list_returns_current_list_from_memory(self):
    # The public attribute must match the privately held list.
    reported = self.middleware.token_revocation_list
    held = self.middleware._token_revocation_list
    self.assertEqual(reported, held)
|
|
|
|
def test_get_revocation_list_returns_current_list_from_disk(self):
    # Remember the list, discard the in-memory copy, and expect the same
    # list to come back.
    remembered = self.middleware.token_revocation_list
    self.middleware._token_revocation_list = None
    self.assertEqual(self.middleware.token_revocation_list, remembered)
|
|
|
|
def test_invalid_revocation_list_raises_service_error(self):
    # Make the fake server hand out a body without a signed list.
    global SIGNED_REVOCATION_LIST
    SIGNED_REVOCATION_LIST = "{}"
    self.assertRaises(auth_token.ServiceError,
                      self.middleware.fetch_revocation_list)
|
|
|
|
def test_fetch_revocation_list(self):
    # auth_token uses v2 to fetch this, so don't allow the v3
    # tests to override the fake http connection
    self.set_fake_http(FakeHTTPConnection)
    raw_list = self.middleware.fetch_revocation_list()
    self.assertEqual(jsonutils.loads(raw_list), REVOCATION_LIST)
|
|
|
|
def test_request_invalid_uuid_token(self):
    # A token the fake server does not know is rejected with a challenge.
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = 'invalid-token'
    self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 401)
    challenge = self.response_headers['WWW-Authenticate']
    self.assertEqual(challenge,
                     "Keystone uri='https://keystone.example.com:1234'")
|
|
|
|
def test_request_invalid_signed_token(self):
    # A malformed signed token is rejected with a challenge.
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = INVALID_SIGNED_TOKEN
    self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 401)
    challenge = self.response_headers['WWW-Authenticate']
    self.assertEqual(challenge,
                     "Keystone uri='https://keystone.example.com:1234'")
|
|
|
|
def test_request_no_token(self):
    """A request without any token header gets 401 and a challenge."""
    challenge = "Keystone uri='https://keystone.example.com:1234'"
    request = webob.Request.blank('/')
    self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 401)
    self.assertEqual(self.response_headers['WWW-Authenticate'], challenge)
|
|
|
|
def test_request_no_token_log_message(self):
    """A missing token header produces both a warn and a debug message."""

    class FakeLog(object):
        """Logger stand-in that remembers the last warn/debug message."""

        def __init__(self):
            self.msg = self.debugmsg = None

        def warn(self, msg=None, *args, **kwargs):
            self.msg = msg

        def debug(self, msg=None, *args, **kwargs):
            self.debugmsg = msg

    middleware = self.middleware
    middleware.LOG = FakeLog()
    middleware.delay_auth_decision = False
    # An empty header mapping carries no token at all.
    empty_headers = {}
    self.assertRaises(auth_token.InvalidUserToken,
                      middleware._get_user_token_from_header,
                      empty_headers)
    self.assertIsNotNone(middleware.LOG.msg)
    self.assertIsNotNone(middleware.LOG.debugmsg)
|
|
|
|
def test_request_blank_token(self):
    """A request with an empty token header gets 401 and a challenge."""
    challenge = "Keystone uri='https://keystone.example.com:1234'"
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = ''
    self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 401)
    self.assertEqual(self.response_headers['WWW-Authenticate'], challenge)
|
|
|
|
def _get_cached_token(self, token):
    """Return the unprotected cache entry stored for ``token``.

    The entry is read directly from the middleware's cache object using
    the key derived from the hashed token.
    """
    hashed_token = cms.cms_hash_token(token)
    middleware = self.middleware
    # NOTE(vish): example tokens are expired so skip the expiration check.
    cache_key = middleware._get_cache_key(hashed_token)
    protected_value = middleware._cache.get(cache_key)
    return middleware._unprotect_cache_value(token, protected_value)
|
|
|
|
def test_memcache(self):
    """Processing a scoped signed token leaves an entry in the cache."""
    req = webob.Request.blank('/')
    token = self.token_dict['signed_token_scoped']
    req.headers['X-Auth-Token'] = token
    self.middleware(req.environ, self.start_fake_response)
    # Identity check: "not None" must not depend on the cached value's
    # own equality implementation.
    self.assertIsNotNone(self._get_cached_token(token))
|
|
|
|
def test_expired(self):
    """A request carrying the expired scoped signed token gets a 401."""
    expired_token = self.token_dict['signed_token_scoped_expired']
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = expired_token
    self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self.response_status, 401)
|
|
|
|
def test_memcache_set_invalid(self):
    """An invalid token is remembered in the cache as "invalid"."""
    bad_token = 'invalid-token'
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = bad_token
    self.middleware(request.environ, self.start_fake_response)
    cached_value = self._get_cached_token(bad_token)
    self.assertEqual(cached_value, "invalid")
|
|
|
|
def test_memcache_set_expired(self):
    """A cached token is gone once ``token_cache_time`` has elapsed.

    The clock is frozen through ``timeutils`` overrides: the token is
    cached at ``now`` and looked up again at ``now + token_cache_time``.
    """
    token_cache_time = 10
    conf = {
        'token_cache_time': token_cache_time,
        'signing_dir': CERTDIR,
    }
    self.set_middleware(conf=conf)
    req = webob.Request.blank('/')
    token = self.token_dict['signed_token_scoped']
    req.headers['X-Auth-Token'] = token
    try:
        now = datetime.datetime.utcnow()
        timeutils.set_time_override(now)
        self.middleware(req.environ, self.start_fake_response)
        # Identity checks against None, consistent with the other
        # assertIsNone/assertIsNotNone uses in this file.
        self.assertIsNotNone(self._get_cached_token(token))
        expired = now + datetime.timedelta(seconds=token_cache_time)
        timeutils.set_time_override(expired)
        self.assertIsNone(self._get_cached_token(token))
    finally:
        # Always release the time override so later tests see real time.
        timeutils.clear_time_override()
|
|
|
|
def test_swift_memcache_set_expired(self):
    """Re-run the cache-expiry test after installing a swift-style cache."""
    # NOTE(review): test_memcache_set_expired() starts by calling
    # set_middleware(conf=...). Confirm that call keeps the middleware
    # object configured here; otherwise the FakeSwiftMemcacheRing set up
    # below is not the cache that actually gets exercised.
    middleware = self.middleware
    middleware._cache = FakeSwiftMemcacheRing()
    middleware._use_keystone_cache = False
    middleware._cache_initialized = True
    self.test_memcache_set_expired()
|
|
|
|
def test_use_cache_from_env(self):
|
|
env = {'swift.cache': 'CACHE_TEST'}
|
|
conf = {
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'auth_admin_prefix': '/testadmin',
|
|
'cache': 'swift.cache',
|
|
'memcache_servers': ['localhost:11211']
|
|
}
|
|
self.set_middleware(conf=conf)
|
|
self.middleware._init_cache(env)
|
|
self.assertEqual(self.middleware._cache, 'CACHE_TEST')
|
|
|
|
def test_will_expire_soon(self):
    """Expiry 10s away counts as soon; expiry 40s away does not."""
    in_ten_seconds = (datetime.datetime.utcnow() +
                      datetime.timedelta(seconds=10))
    self.assertTrue(auth_token.will_expire_soon(in_ten_seconds))

    in_forty_seconds = (datetime.datetime.utcnow() +
                        datetime.timedelta(seconds=40))
    self.assertFalse(auth_token.will_expire_soon(in_forty_seconds))
|
|
|
|
def test_encrypt_cache_data(self):
    """With the 'encrypt' strategy, cache values round-trip encrypted."""
    self.set_middleware(conf={
        'auth_host': 'keystone.example.com',
        'auth_port': 1234,
        'auth_admin_prefix': '/testadmin',
        'memcache_servers': ['localhost:11211'],
        'memcache_security_strategy': 'encrypt',
        'memcache_secret_key': 'mysecret',
    })
    token_data = TOKEN_RESPONSES[self.token_dict['uuid_token_default']]
    middleware = self.middleware

    encrypted_data = middleware._protect_cache_value('token', token_data)
    self.assertEqual('{ENCRYPT:AES256}', encrypted_data[:16])

    decrypted_data = middleware._unprotect_cache_value('token',
                                                       encrypted_data)
    self.assertEqual(token_data, decrypted_data)

    # should return None if unable to decrypt
    corrupted = middleware._unprotect_cache_value(
        'token', '{ENCRYPT:AES256}corrupted')
    self.assertIsNone(corrupted)
    wrong_key = middleware._unprotect_cache_value('mykey', encrypted_data)
    self.assertIsNone(wrong_key)
|
|
|
|
def test_sign_cache_data(self):
    """With the 'mac' strategy, cache values round-trip with a MAC prefix."""
    self.set_middleware(conf={
        'auth_host': 'keystone.example.com',
        'auth_port': 1234,
        'auth_admin_prefix': '/testadmin',
        'memcache_servers': ['localhost:11211'],
        'memcache_security_strategy': 'mac',
        'memcache_secret_key': 'mysecret',
    })
    token_data = TOKEN_RESPONSES[self.token_dict['uuid_token_default']]
    middleware = self.middleware

    signed_data = middleware._protect_cache_value('mykey', token_data)
    expected_prefix = '{MAC:SHA1}'
    self.assertEqual(signed_data[:10], expected_prefix)

    verified_data = middleware._unprotect_cache_value('mykey', signed_data)
    self.assertEqual(token_data, verified_data)

    # should return None on corrupted data
    corrupted = middleware._unprotect_cache_value('mykey',
                                                  '{MAC:SHA1}corrupted')
    self.assertIsNone(corrupted)
|
|
|
|
def test_no_memcache_protection(self):
|
|
conf = {
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'auth_admin_prefix': '/testadmin',
|
|
'memcache_servers': ['localhost:11211'],
|
|
'memcache_secret_key': 'mysecret'
|
|
}
|
|
self.set_middleware(conf=conf)
|
|
data = self.middleware._protect_cache_value('mykey',
|
|
'This is a test!')
|
|
self.assertEqual(data, 'This is a test!')
|
|
self.assertEqual(
|
|
'This is a test!',
|
|
self.middleware._unprotect_cache_value('mykey', data))
|
|
|
|
def test_get_cache_key(self):
    """Cache keys are plain without a strategy and hashed with one."""

    def build_conf(**extra):
        # A fresh dict (and fresh server list) for every middleware built.
        conf = {
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_admin_prefix': '/testadmin',
            'memcache_servers': ['localhost:11211'],
        }
        conf.update(extra)
        return conf

    # No security strategy: the token appears in the key as-is.
    self.set_middleware(conf=build_conf(memcache_secret_key='mysecret'))
    self.assertEqual('tokens/mytoken',
                     self.middleware._get_cache_key('mytoken'))

    # 'mac' strategy: the key is a hash of token + secret.
    self.set_middleware(conf=build_conf(memcache_security_strategy='mac',
                                        memcache_secret_key='mysecret'))
    expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'mysecret')
    self.assertEqual(self.middleware._get_cache_key('mytoken'), expected)

    # 'Encrypt' strategy (mixed case) with a different secret.
    self.set_middleware(
        conf=build_conf(memcache_security_strategy='Encrypt',
                        memcache_secret_key='abc!'))
    expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'abc!')
    self.assertEqual(self.middleware._get_cache_key('mytoken'), expected)
|
|
|
|
def test_assert_valid_memcache_protection_config(self):
    """Invalid memcache protection settings make set_middleware raise.

    Each case builds a configuration that is missing the secret key, has
    an empty secret key, or names an unknown security strategy.

    The configuration is handed over as ``conf=...``, like every other
    ``set_middleware`` call in this file, so it cannot be bound to some
    other positional parameter and raise for an unrelated reason.
    """

    def build_conf(**extra):
        # A fresh dict (and fresh server list) for every case.
        conf = {
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_admin_prefix': '/testadmin',
            'memcache_servers': ['localhost:11211'],
        }
        conf.update(extra)
        return conf

    # test missing memcache_secret_key
    conf = build_conf(memcache_security_strategy='Encrypt')
    self.assertRaises(Exception, self.set_middleware, conf=conf)

    # test invalid memcache_security_strategy
    conf = build_conf(memcache_security_strategy='whatever')
    self.assertRaises(Exception, self.set_middleware, conf=conf)

    # test missing memcache_secret_key
    conf = build_conf(memcache_security_strategy='mac')
    self.assertRaises(Exception, self.set_middleware, conf=conf)

    # test empty memcache_secret_key
    conf = build_conf(memcache_security_strategy='Encrypt',
                      memcache_secret_key='')
    self.assertRaises(Exception, self.set_middleware, conf=conf)

    # test empty memcache_secret_key with a mixed-case strategy name
    conf = build_conf(memcache_security_strategy='mAc',
                      memcache_secret_key='')
    self.assertRaises(Exception, self.set_middleware, conf=conf)
|
|
|
|
def test_config_revocation_cache_timeout(self):
    """``revocation_cache_time`` is exposed as a timedelta in seconds."""
    conf = {
        'auth_host': 'keystone.example.com',
        'auth_port': 1234,
        'auth_admin_prefix': '/testadmin',
        'revocation_cache_time': 24
    }
    middleware = auth_token.AuthProtocol(self.fake_app, conf)
    # assertEqual rather than the deprecated assertEquals alias.
    self.assertEqual(middleware.token_revocation_list_cache_timeout,
                     datetime.timedelta(seconds=24))
|
|
|
|
|
|
class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """v2 token specific tests.

    There are some differences between how the auth-token middleware handles
    v2 and v3 tokens over and above the token formats, namely:

    - A v3 keystone server will auto scope a token to a user's default project
      if no scope is specified. A v2 server assumes that the auth-token
      middleware will do that.
    - A v2 keystone server may issue a token without a catalog, even with a
      tenant

    The tests below were originally part of the generic AuthTokenMiddlewareTest
    class, but now, since they really are v2 specific, they are included here.

    """

    def assert_unscoped_default_tenant_auto_scopes(self, token):
        """Unscoped v2 requests with a default tenant should "auto-scope."

        The implied scope is the user's tenant ID.

        """
        req = webob.Request.blank('/')
        req.headers['X-Auth-Token'] = token
        body = self.middleware(req.environ, self.start_fake_response)
        self.assertEqual(self.response_status, 200)
        self.assertEqual(body, ['SUCCESS'])
        # assertIn reports the missing key and the container on failure.
        self.assertIn('keystone.token_info', req.environ)

    def test_default_tenant_uuid_token(self):
        """The default UUID token is accepted and auto-scoped."""
        self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT)

    def test_default_tenant_signed_token(self):
        """The scoped signed token is accepted."""
        self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED)

    def assert_unscoped_token_receives_401(self, token):
        """Unscoped requests with no default tenant ID should be rejected."""
        req = webob.Request.blank('/')
        req.headers['X-Auth-Token'] = token
        self.middleware(req.environ, self.start_fake_response)
        self.assertEqual(self.response_status, 401)
        self.assertEqual(self.response_headers['WWW-Authenticate'],
                         "Keystone uri='https://keystone.example.com:1234'")

    def test_unscoped_uuid_token_receives_401(self):
        """An unscoped UUID token is rejected with 401."""
        self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED)

    def test_unscoped_pki_token_receives_401(self):
        """An unscoped signed (PKI) token is rejected with 401."""
        self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED)

    def test_request_prevent_service_catalog_injection(self):
        """A client-supplied X-Service-Catalog header is not passed on."""
        req = webob.Request.blank('/')
        req.headers['X-Service-Catalog'] = '[]'
        req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG
        body = self.middleware(req.environ, self.start_fake_response)
        self.assertEqual(self.response_status, 200)
        self.assertFalse(req.headers.get('X-Service-Catalog'))
        self.assertEqual(body, ['SUCCESS'])

    def test_valid_uuid_request_forced_to_2_0(self):
        """Test forcing auth_token to use lower api version.

        By installing the v3 http handler, auth_token will get
        a version list that looks like a v3 server - from which it
        would normally choose v3.0 as the auth version. However, here
        we specify v2.0 in the configuration - which should force
        auth_token to use that version instead.

        """
        conf = {
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_admin_prefix': '/testadmin',
            'signing_dir': CERTDIR,
            'auth_version': 'v2.0'
        }
        self.set_middleware(fake_http=v3FakeHTTPConnection, conf=conf)
        # This test will only work if auth_token has chosen to use the
        # lower, v2, api version
        req = webob.Request.blank('/')
        req.headers['X-Auth-Token'] = UUID_TOKEN_DEFAULT
        # The response body is not inspected here, only status and URL.
        self.middleware(req.environ, self.start_fake_response)
        self.assertEqual(self.response_status, 200)
        self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT,
                         v3FakeHTTPConnection.last_requested_url)

    def test_invalid_auth_version_request(self):
        """Configuring an unsupported auth_version makes set_middleware raise."""
        conf = {
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_admin_prefix': '/testadmin',
            'signing_dir': CERTDIR,
            'auth_version': 'v1.0'      # v1.0 is no longer supported
        }
        # NOTE(review): conf is passed positionally here, whereas other
        # call sites use set_middleware(conf=...). Confirm it binds to the
        # intended parameter and that the exception raised really comes
        # from the unsupported auth_version.
        self.assertRaises(Exception, self.set_middleware, conf)
|
|
|
|
|
|
class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest):
    """Test auth_token middleware with v3 tokens.

    Re-execute the AuthTokenMiddlewareTest class tests, but with the
    auth_token middleware configured to expect v3 tokens back from
    a keystone server.

    This is done by configuring the AuthTokenMiddlewareTest class via
    its setUp(), passing in v3 style data that will then be used by
    the tests themselves. This approach has been used to ensure we
    really are running the same tests for both v2 and v3 tokens.

    There are a few additional tests specific to v3 only:

    - We allow an unscoped token to be validated (as unscoped), whereas
      for v2 tokens, the auth_token middleware is expected to try and
      auto-scope it (and fail if there is no default tenant)
    - Domain scoped tokens

    Since we don't specify an auth version for auth_token to use, by
    definition we are therefore implicitly testing that it will use
    the highest available auth version, i.e. v3.0

    """

    def setUp(self):
        """Run the base setUp with v3 tokens, app and http connection."""
        v3_tokens = {
            'uuid_token_default': v3_UUID_TOKEN_DEFAULT,
            'uuid_token_unscoped': v3_UUID_TOKEN_UNSCOPED,
            'signed_token_scoped': SIGNED_v3_TOKEN_SCOPED,
            'signed_token_scoped_expired': SIGNED_TOKEN_SCOPED_EXPIRED,
            'revoked_token': REVOKED_v3_TOKEN,
            'revoked_token_hash': REVOKED_v3_TOKEN_HASH,
        }
        base = super(v3AuthTokenMiddlewareTest, self)
        base.setUp(auth_version='v3.0',
                   fake_app=v3FakeApp,
                   fake_http=v3FakeHTTPConnection,
                   token_dict=v3_tokens)

    def assert_valid_last_url(self, token_id):
        """Check that the last request went to the v3 tokens URL."""
        # Token ID is not part of the url in v3, so override
        # this assert test in the base class
        last_url = v3FakeHTTPConnection.last_requested_url
        self.assertEqual('/testadmin/v3/auth/tokens', last_url)

    def test_valid_unscoped_uuid_request(self):
        """An unscoped v3 UUID token validates without project headers."""
        # Remove items that won't be in an unscoped token
        env_overrides = {
            'HTTP_X_PROJECT_ID': None,
            'HTTP_X_PROJECT_NAME': None,
            'HTTP_X_PROJECT_DOMAIN_ID': None,
            'HTTP_X_PROJECT_DOMAIN_NAME': None,
            'HTTP_X_TENANT_ID': None,
            'HTTP_X_TENANT_NAME': None,
            'HTTP_X_ROLES': '',
            'HTTP_X_TENANT': None,
            'HTTP_X_ROLE': '',
        }
        self.set_middleware(expected_env=env_overrides)
        self.assert_valid_request_200(v3_UUID_TOKEN_UNSCOPED,
                                      with_catalog=False)
        last_url = v3FakeHTTPConnection.last_requested_url
        self.assertEqual('/testadmin/v3/auth/tokens', last_url)

    def test_domain_scoped_uuid_request(self):
        """A domain scoped v3 UUID token yields domain headers only."""
        # Modify items compared to default token for a domain scope
        env_overrides = {
            'HTTP_X_DOMAIN_ID': 'domain_id1',
            'HTTP_X_DOMAIN_NAME': 'domain_name1',
            'HTTP_X_PROJECT_ID': None,
            'HTTP_X_PROJECT_NAME': None,
            'HTTP_X_PROJECT_DOMAIN_ID': None,
            'HTTP_X_PROJECT_DOMAIN_NAME': None,
            'HTTP_X_TENANT_ID': None,
            'HTTP_X_TENANT_NAME': None,
            'HTTP_X_TENANT': None,
        }
        self.set_middleware(expected_env=env_overrides)
        self.assert_valid_request_200(v3_UUID_TOKEN_DOMAIN_SCOPED)
        last_url = v3FakeHTTPConnection.last_requested_url
        self.assertEqual('/testadmin/v3/auth/tokens', last_url)
|
|
|
|
|
|
class TokenEncodingTest(testtools.TestCase):
    """Checks for ``auth_token.safe_quote``."""

    def test_unquoted_token(self):
        """A token containing a raw space comes back percent-encoded."""
        quoted = auth_token.safe_quote('foo bar')
        self.assertEqual('foo%20bar', quoted)

    def test_quoted_token(self):
        """An already percent-encoded token comes back unchanged."""
        quoted = auth_token.safe_quote('foo%20bar')
        self.assertEqual('foo%20bar', quoted)
|