Files
python-keystoneclient/tests/test_auth_token_middleware.py

945 lines
36 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import iso8601
import os
import string
import sys
import tempfile
import testtools
import webob
from keystoneclient.common import cms
from keystoneclient import utils
from keystoneclient.middleware import auth_token
from keystoneclient.middleware import memcache_crypt
from keystoneclient.openstack.common import jsonutils
from keystoneclient.openstack.common import timeutils
from keystoneclient.middleware import test
CERTDIR = test.rootdir('examples', 'pki', 'certs')
KEYDIR = test.rootdir('examples', 'pki', 'private')
CMSDIR = test.rootdir('examples', 'pki', 'cms')
SIGNING_CERT = os.path.join(CERTDIR, 'signing_cert.pem')
SIGNING_KEY = os.path.join(KEYDIR, 'signing_key.pem')
CA = os.path.join(CERTDIR, 'ca.pem')
REVOCATION_LIST = None
REVOKED_TOKEN = None
REVOKED_TOKEN_HASH = None
SIGNED_REVOCATION_LIST = None
SIGNED_TOKEN_SCOPED = None
SIGNED_TOKEN_UNSCOPED = None
SIGNED_TOKEN_SCOPED_KEY = None
SIGNED_TOKEN_UNSCOPED_KEY = None
VALID_SIGNED_REVOCATION_LIST = None
UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
INVALID_SIGNED_TOKEN = string.replace(
"""AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD
EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
0000000000000000000000000000000000000000000000000000000000000000
1111111111111111111111111111111111111111111111111111111111111111
2222222222222222222222222222222222222222222222222222222222222222
3333333333333333333333333333333333333333333333333333333333333333
4444444444444444444444444444444444444444444444444444444444444444
5555555555555555555555555555555555555555555555555555555555555555
6666666666666666666666666666666666666666666666666666666666666666
7777777777777777777777777777777777777777777777777777777777777777
8888888888888888888888888888888888888888888888888888888888888888
9999999999999999999999999999999999999999999999999999999999999999
0000000000000000000000000000000000000000000000000000000000000000
xg==""", "\n", "")
# JSON responses keyed by token ID
TOKEN_RESPONSES = {
UUID_TOKEN_DEFAULT: {
'access': {
'token': {
'id': UUID_TOKEN_DEFAULT,
'expires': '2999-01-01T00:00:10Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
'serviceCatalog': {}
},
},
VALID_DIABLO_TOKEN: {
'access': {
'token': {
'id': VALID_DIABLO_TOKEN,
'expires': '2999-01-01T00:00:10',
'tenantId': 'tenant_id1',
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
UUID_TOKEN_UNSCOPED: {
'access': {
'token': {
'id': UUID_TOKEN_UNSCOPED,
'expires': '2999-01-01T00:00:10Z',
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
UUID_TOKEN_NO_SERVICE_CATALOG: {
'access': {
'token': {
'id': 'valid-token',
'expires': '2999-01-01T00:00:10Z',
'tenant': {
'id': 'tenant_id1',
'name': 'tenant_name1',
},
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
}
},
},
}
FAKE_RESPONSE_STACK = []
# The data for these tests are signed using openssl and are stored in files
# in the signing subdirectory. In order to keep the values consistent between
# the tests and the signed documents, we read them in for use in the tests.
def setUpModule(self):
signing_path = CMSDIR
with open(os.path.join(signing_path, 'auth_token_scoped.pem')) as f:
self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f:
self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
with open(os.path.join(signing_path, 'auth_token_revoked.pem')) as f:
self.REVOKED_TOKEN = cms.cms_to_token(f.read())
self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN)
with open(os.path.join(signing_path, 'revocation_list.json')) as f:
self.REVOCATION_LIST = jsonutils.loads(f.read())
with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
{'signed': f.read()})
self.SIGNED_TOKEN_SCOPED_KEY =\
cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED)
self.SIGNED_TOKEN_UNSCOPED_KEY =\
cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED)
self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = {
'access': {
'token': {
'id': self.SIGNED_TOKEN_SCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'tenantId': 'tenant_id1',
'tenantName': 'tenant_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
}
self.TOKEN_RESPONSES[SIGNED_TOKEN_UNSCOPED_KEY] = {
'access': {
'token': {
'id': SIGNED_TOKEN_UNSCOPED_KEY,
},
'user': {
'id': 'user_id1',
'name': 'user_name1',
'roles': [
{'name': 'role1'},
{'name': 'role2'},
],
},
},
},
class FakeMemcache(object):
def __init__(self):
self.set_key = None
self.set_value = None
self.token_expiration = None
def get(self, key):
data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
if not data or key != "tokens/%s" % (data['access']['token']['id']):
return
if not self.token_expiration:
dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
self.token_expiration = dt.strftime("%s")
dt = datetime.datetime.now() + datetime.timedelta(hours=24)
ks_expires = dt.isoformat()
data['access']['token']['expires'] = ks_expires
return (data, str(self.token_expiration))
def set(self, key, value, time=None):
self.set_value = value
self.set_key = key
class FakeSwiftMemcacheRing(object):
def __init__(self):
self.set_key = None
self.set_value = None
self.token_expiration = None
def get(self, key):
data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
if not data or key != "tokens/%s" % (data['access']['token']['id']):
return
if not self.token_expiration:
dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
self.token_expiration = dt.strftime("%s")
dt = datetime.datetime.now() + datetime.timedelta(hours=24)
ks_expires = dt.isoformat()
data['access']['token']['expires'] = ks_expires
return (data, str(self.token_expiration))
def set(self, key, value, serialize=True, timeout=0):
self.set_value = value
self.set_key = key
class FakeHTTPResponse(object):
def __init__(self, status, body):
self.status = status
self.body = body
def read(self):
return self.body
class FakeStackHTTPConnection(object):
def __init__(self, *args, **kwargs):
pass
def getresponse(self):
if len(FAKE_RESPONSE_STACK):
return FAKE_RESPONSE_STACK.pop()
return FakeHTTPResponse(500, jsonutils.dumps('UNEXPECTED RESPONSE'))
def request(self, *_args, **_kwargs):
pass
def close(self):
pass
class FakeHTTPConnection(object):
last_requested_url = ''
def __init__(self, *args, **kwargs):
self.send_valid_revocation_list = True
def request(self, method, path, **kwargs):
"""Fakes out several http responses.
If a POST request is made, we assume the calling code is trying
to get a new admin token.
If a GET request is made to validate a token, return success
if the token is 'token1'. If a different token is provided, return
a 404, indicating an unknown (therefore unauthorized) token.
"""
FakeHTTPConnection.last_requested_url = path
if method == 'POST':
status = 200
body = jsonutils.dumps({
'access': {
'token': {'id': 'admin_token2'},
},
})
else:
token_id = path.rsplit('/', 1)[1]
if token_id in TOKEN_RESPONSES.keys():
status = 200
body = jsonutils.dumps(TOKEN_RESPONSES[token_id])
elif token_id == "revoked":
status = 200
body = SIGNED_REVOCATION_LIST
else:
status = 404
body = str()
self.resp = FakeHTTPResponse(status, body)
def getresponse(self):
return self.resp
def close(self):
pass
class FakeApp(object):
"""This represents a WSGI app protected by the auth_token middleware."""
def __init__(self, expected_env=None):
expected_env = expected_env or {}
self.expected_env = {
'HTTP_X_IDENTITY_STATUS': 'Confirmed',
'HTTP_X_TENANT_ID': 'tenant_id1',
'HTTP_X_TENANT_NAME': 'tenant_name1',
'HTTP_X_USER_ID': 'user_id1',
'HTTP_X_USER_NAME': 'user_name1',
'HTTP_X_ROLES': 'role1,role2',
'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
}
self.expected_env.update(expected_env)
def __call__(self, env, start_response):
for k, v in self.expected_env.items():
assert env[k] == v, '%s != %s' % (env[k], v)
resp = webob.Response()
resp.body = 'SUCCESS'
return resp(env, start_response)
class BaseAuthTokenMiddlewareTest(testtools.TestCase):
def setUp(self, expected_env=None):
testtools.TestCase.setUp(self)
expected_env = expected_env or {}
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_admin_prefix': '/testadmin',
'signing_dir': CERTDIR,
}
self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf)
self.middleware.http_client_class = FakeHTTPConnection
self.middleware._iso8601 = iso8601
self.response_status = None
self.response_headers = None
self.middleware.revoked_file_name = tempfile.mkstemp()[1]
cache_timeout = datetime.timedelta(days=1)
self.middleware.token_revocation_list_cache_timeout = cache_timeout
self.middleware.token_revocation_list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
signed_list = 'SIGNED_REVOCATION_LIST'
valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
globals()[signed_list] = globals()[valid_signed_list]
def tearDown(self):
testtools.TestCase.tearDown(self)
try:
os.remove(self.middleware.revoked_file_name)
except OSError:
pass
def start_fake_response(self, status, headers):
self.response_status = int(status.split(' ', 1)[0])
self.response_headers = dict(headers)
if tuple(sys.version_info)[0:2] < (2, 7):
# 2.6 doesn't have the assert dict equals so make sure that it exists
class AdjustedBaseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
def assertIsInstance(self, obj, cls, msg=None):
"""Same as self.assertTrue(isinstance(obj, cls)), with a nicer
default message."""
if not isinstance(obj, cls):
standardMsg = '%s is not an instance of %r' % (obj, cls)
self.fail(self._formatMessage(msg, standardMsg))
def assertDictEqual(self, d1, d2, msg=None):
# Simple version taken from 2.7
self.assertIsInstance(d1, dict,
'First argument is not a dictionary')
self.assertIsInstance(d2, dict,
'Second argument is not a dictionary')
if d1 != d2:
if msg:
self.fail(msg)
else:
standardMsg = '%r != %r' % (d1, d2)
self.fail(standardMsg)
BaseAuthTokenMiddlewareTest = AdjustedBaseAuthTokenMiddlewareTest
class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
"""Auth Token middleware test setup that allows the tests to define
a stack of responses to HTTP requests in the test and get those
responses back in sequence for testing.
Example::
resp1 = FakeHTTPResponse(401, jsonutils.dumps(''))
resp2 = FakeHTTPResponse(200, jsonutils.dumps({
'access': {
'token': {'id': 'admin_token2'},
},
})
FAKE_RESPONSE_STACK.append(resp1)
FAKE_RESPONSE_STACK.append(resp2)
... do your testing code here ...
"""
def setUp(self, expected_env=None):
super(StackResponseAuthTokenMiddlewareTest, self).setUp(expected_env)
self.middleware.http_client_class = FakeStackHTTPConnection
def test_fetch_revocation_list_with_expire(self):
# first response to revocation list should return 401 Unauthorized
# to pretend to be an expired token
resp1 = FakeHTTPResponse(200, jsonutils.dumps({
'access': {
'token': {'id': 'admin_token2'},
},
}))
resp2 = FakeHTTPResponse(401, jsonutils.dumps(''))
resp3 = FakeHTTPResponse(200, jsonutils.dumps({
'access': {
'token': {'id': 'admin_token2'},
},
}))
resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST)
# first get_admin_token() call
FAKE_RESPONSE_STACK.append(resp1)
# request revocation list, get "unauthorized" due to simulated expired
# token
FAKE_RESPONSE_STACK.append(resp2)
# request a new admin_token
FAKE_RESPONSE_STACK.append(resp3)
# request revocation list, get the revocation list properly
FAKE_RESPONSE_STACK.append(resp4)
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, REVOCATION_LIST)
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
"""Auth Token middleware should understand Diablo keystone responses."""
def setUp(self):
# pre-diablo only had Tenant ID, which was also the Name
expected_env = {
'HTTP_X_TENANT_ID': 'tenant_id1',
'HTTP_X_TENANT_NAME': 'tenant_id1',
# now deprecated (diablo-compat)
'HTTP_X_TENANT': 'tenant_id1',
}
super(DiabloAuthTokenMiddlewareTest, self).setUp(expected_env)
def test_valid_diablo_response(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = VALID_DIABLO_TOKEN
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
self.assertTrue('keystone.token_info' in req.environ)
class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
def assert_valid_request_200(self, token):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = token
body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
self.assertTrue(req.headers.get('X-Service-Catalog'))
self.assertEqual(body, ['SUCCESS'])
self.assertTrue('keystone.token_info' in req.environ)
def test_valid_uuid_request(self):
self.assert_valid_request_200(UUID_TOKEN_DEFAULT)
self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT,
FakeHTTPConnection.last_requested_url)
def test_valid_signed_request(self):
FakeHTTPConnection.last_requested_url = ''
self.assert_valid_request_200(SIGNED_TOKEN_SCOPED)
self.assertEqual(self.middleware.conf['auth_admin_prefix'],
"/testadmin")
#ensure that signed requests do not generate HTTP traffic
self.assertEqual('', FakeHTTPConnection.last_requested_url)
def assert_unscoped_default_tenant_auto_scopes(self, token):
"""Unscoped requests with a default tenant should "auto-scope."
The implied scope is the user's tenant ID.
"""
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = token
body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
self.assertEqual(body, ['SUCCESS'])
self.assertTrue('keystone.token_info' in req.environ)
def test_default_tenant_uuid_token(self):
self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT)
def test_default_tenant_signed_token(self):
self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED)
def assert_unscoped_token_receives_401(self, token):
"""Unscoped requests with no default tenant ID should be rejected."""
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = token
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
'Keystone uri=\'https://keystone.example.com:1234\'')
def test_unscoped_uuid_token_receives_401(self):
self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED)
def test_unscoped_pki_token_receives_401(self):
self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED)
def test_revoked_token_receives_401(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = REVOKED_TOKEN
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
def get_revocation_list_json(self, token_ids=None):
if token_ids is None:
token_ids = [REVOKED_TOKEN_HASH]
revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
for x in token_ids]}
return jsonutils.dumps(revocation_list)
def test_is_signed_token_revoked_returns_false(self):
#explicitly setting an empty revocation list here to document intent
self.middleware.token_revocation_list = jsonutils.dumps(
{"revoked": [], "extra": "success"})
result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
self.assertFalse(result)
def test_is_signed_token_revoked_returns_true(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
self.assertTrue(result)
def test_verify_signed_token_raises_exception_for_revoked_token(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
self.assertRaises(auth_token.InvalidUserToken,
self.middleware.verify_signed_token, REVOKED_TOKEN)
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
self.middleware.token_revocation_list = self.get_revocation_list_json()
self.middleware.verify_signed_token(SIGNED_TOKEN_SCOPED)
def test_cert_file_missing(self):
self.assertFalse(self.middleware.cert_file_missing(
"openstack: /tmp/haystack: No such file or directory",
"/tmp/needle"))
self.assertTrue(self.middleware.cert_file_missing(
"openstack: /not/exist: No such file or directory",
"/not/exist"))
def test_get_token_revocation_list_fetched_time_returns_min(self):
self.middleware.token_revocation_list_fetched_time = None
self.middleware.revoked_file_name = ''
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
datetime.datetime.min)
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
self.middleware.token_revocation_list_fetched_time = None
mtime = os.path.getmtime(self.middleware.revoked_file_name)
fetched_time = datetime.datetime.fromtimestamp(mtime)
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
fetched_time)
def test_get_token_revocation_list_fetched_time_returns_value(self):
expected = self.middleware._token_revocation_list_fetched_time
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
expected)
def test_get_revocation_list_returns_fetched_list(self):
self.middleware.token_revocation_list_fetched_time = None
os.remove(self.middleware.revoked_file_name)
self.assertEqual(self.middleware.token_revocation_list,
REVOCATION_LIST)
def test_get_revocation_list_returns_current_list_from_memory(self):
self.assertEqual(self.middleware.token_revocation_list,
self.middleware._token_revocation_list)
def test_get_revocation_list_returns_current_list_from_disk(self):
in_memory_list = self.middleware.token_revocation_list
self.middleware._token_revocation_list = None
self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
def test_invalid_revocation_list_raises_service_error(self):
globals()['SIGNED_REVOCATION_LIST'] = "{}"
self.assertRaises(auth_token.ServiceError,
self.middleware.fetch_revocation_list)
def test_fetch_revocation_list(self):
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
self.assertEqual(fetched_list, REVOCATION_LIST)
def test_request_invalid_uuid_token(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'invalid-token'
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
'Keystone uri=\'https://keystone.example.com:1234\'')
def test_request_invalid_signed_token(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = INVALID_SIGNED_TOKEN
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
'Keystone uri=\'https://keystone.example.com:1234\'')
def test_request_no_token(self):
req = webob.Request.blank('/')
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
'Keystone uri=\'https://keystone.example.com:1234\'')
def test_request_no_token_log_message(self):
class FakeLog(object):
def __init__(self):
self.msg = None
self.debugmsg = None
def warn(self, msg=None, *args, **kwargs):
self.msg = msg
def debug(self, msg=None, *args, **kwargs):
self.debugmsg = msg
self.middleware.LOG = FakeLog()
self.middleware.delay_auth_decision = False
self.assertRaises(auth_token.InvalidUserToken,
self.middleware._get_user_token_from_header, {})
self.assertIsNotNone(self.middleware.LOG.msg)
self.assertIsNotNone(self.middleware.LOG.debugmsg)
def test_request_blank_token(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = ''
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
'Keystone uri=\'https://keystone.example.com:1234\'')
def test_memcache(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
self.middleware._cache = FakeMemcache()
self.middleware._use_keystone_cache = True
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.middleware._cache.set_value, None)
def test_memcache_set_invalid(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = 'invalid-token'
self.middleware._cache = FakeMemcache()
self.middleware._use_keystone_cache = True
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.middleware._cache.set_value, "invalid")
def test_memcache_set_expired(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
self.middleware._cache = FakeMemcache()
self.middleware._use_keystone_cache = True
expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
self.middleware._cache.token_expiration = float(expired.strftime("%s"))
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(len(self.middleware._cache.set_value), 2)
def test_swift_memcache_set_expired(self):
req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
self.middleware._cache = FakeSwiftMemcacheRing()
self.middleware._use_keystone_cache = False
expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
self.middleware._cache.token_expiration = float(expired.strftime("%s"))
self.middleware(req.environ, self.start_fake_response)
self.assertEqual(len(self.middleware._cache.set_value), 2)
def test_nomemcache(self):
self.disable_module('memcache')
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
}
auth_token.AuthProtocol(FakeApp(), conf)
def test_use_cache_from_env(self):
env = {'swift.cache': 'CACHE_TEST'}
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'cache': 'swift.cache',
'memcache_servers': 'localhost:11211',
}
auth = auth_token.AuthProtocol(FakeApp(), conf)
auth._init_cache(env)
self.assertEqual(auth._cache, 'CACHE_TEST')
def test_not_use_cache_from_env(self):
self.disable_module('memcache')
env = {'swift.cache': 'CACHE_TEST'}
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
}
auth = auth_token.AuthProtocol(FakeApp(), conf)
auth._init_cache(env)
self.assertEqual(auth._cache, None)
def test_request_prevent_service_catalog_injection(self):
req = webob.Request.blank('/')
req.headers['X-Service-Catalog'] = '[]'
req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG
body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 200)
self.assertFalse(req.headers.get('X-Service-Catalog'))
self.assertEqual(body, ['SUCCESS'])
def test_will_expire_soon(self):
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
seconds=10)
self.assertTrue(auth_token.will_expire_soon(tenseconds))
fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
seconds=40)
self.assertFalse(auth_token.will_expire_soon(fortyseconds))
def test_encrypt_cache_data(self):
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'encrypt',
'memcache_secret_key': 'mysecret',
}
auth = auth_token.AuthProtocol(FakeApp(), conf)
encrypted_data = \
auth._protect_cache_value('token',
TOKEN_RESPONSES[UUID_TOKEN_DEFAULT])
self.assertEqual('{ENCRYPT:AES256}', encrypted_data[:16])
self.assertEqual(
TOKEN_RESPONSES[UUID_TOKEN_DEFAULT],
auth._unprotect_cache_value('token', encrypted_data))
# should return None if unable to decrypt
self.assertIsNone(
auth._unprotect_cache_value('token', '{ENCRYPT:AES256}corrupted'))
self.assertIsNone(
auth._unprotect_cache_value('mykey', encrypted_data))
def test_sign_cache_data(self):
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'mac',
'memcache_secret_key': 'mysecret',
}
auth = auth_token.AuthProtocol(FakeApp(), conf)
signed_data = \
auth._protect_cache_value('mykey',
TOKEN_RESPONSES[UUID_TOKEN_DEFAULT])
expected = '{MAC:SHA1}'
self.assertEqual(
signed_data[:10],
expected)
self.assertEqual(
TOKEN_RESPONSES[UUID_TOKEN_DEFAULT],
auth._unprotect_cache_value('mykey', signed_data))
# should return None on corrupted data
self.assertIsNone(
auth._unprotect_cache_value('mykey', '{MAC:SHA1}corrupted'))
def test_no_memcache_protection(self):
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
'memcache_secret_key': 'mysecret',
}
auth = auth_token.AuthProtocol(FakeApp(), conf)
data = auth._protect_cache_value('mykey', 'This is a test!')
self.assertEqual(data, 'This is a test!')
self.assertEqual(
'This is a test!',
auth._unprotect_cache_value('mykey', data))
def test_get_cache_key(self):
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
'memcache_secret_key': 'mysecret',
}
auth = auth_token.AuthProtocol(FakeApp(), conf)
self.assertEqual(
'tokens/mytoken',
auth._get_cache_key('mytoken'))
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'mac',
'memcache_secret_key': 'mysecret',
}
auth = auth_token.AuthProtocol(FakeApp(), conf)
expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'mysecret')
self.assertEqual(auth._get_cache_key('mytoken'), expected)
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'Encrypt',
'memcache_secret_key': 'abc!',
}
auth = auth_token.AuthProtocol(FakeApp(), conf)
expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'abc!')
self.assertEqual(auth._get_cache_key('mytoken'), expected)
def test_assert_valid_memcache_protection_config(self):
# test missing memcache_secret_key
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'Encrypt',
}
self.assertRaises(Exception, auth_token.AuthProtocol,
FakeApp(), conf)
# test invalue memcache_security_strategy
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'whatever',
}
self.assertRaises(Exception, auth_token.AuthProtocol,
FakeApp(), conf)
# test missing memcache_secret_key
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'mac',
}
self.assertRaises(Exception, auth_token.AuthProtocol,
FakeApp(), conf)
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'Encrypt',
'memcache_secret_key': ''
}
self.assertRaises(Exception, auth_token.AuthProtocol,
FakeApp(), conf)
conf = {
'admin_token': 'admin_token1',
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'memcache_servers': 'localhost:11211',
'memcache_security_strategy': 'mAc',
'memcache_secret_key': ''
}
self.assertRaises(Exception, auth_token.AuthProtocol,
FakeApp(), conf)
class TokenEncodingTest(testtools.TestCase):
def test_unquoted_token(self):
self.assertEqual('foo%20bar', auth_token.safe_quote('foo bar'))
def test_quoted_token(self):
self.assertEqual('foo%20bar', auth_token.safe_quote('foo%20bar'))