The python function string.find() returns -1 on a miss, which is also evaluated as True. Therefore use the "X in Y" approach instead. Also added a rather trivial test to test for this code bug. In order to make the code easier to test, I've changed the parameters to operate on the command output, not the exception object and updated all callers. Change-Id: If0b4fed6fe676cad50512267c1b601a3a8a631e5
919 lines
35 KiB
Python
919 lines
35 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2012 OpenStack LLC
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import datetime
|
|
import iso8601
|
|
import os
|
|
import string
|
|
import tempfile
|
|
import testtools
|
|
|
|
import webob
|
|
|
|
from keystoneclient.common import cms
|
|
from keystoneclient import utils
|
|
from keystoneclient.middleware import auth_token
|
|
from keystoneclient.middleware import memcache_crypt
|
|
from keystoneclient.openstack.common import jsonutils
|
|
from keystoneclient.openstack.common import timeutils
|
|
from keystoneclient.middleware import test
|
|
|
|
|
|
CERTDIR = test.rootdir("python-keystoneclient/examples/pki/certs")
|
|
KEYDIR = test.rootdir("python-keystoneclient/examples/pki/private")
|
|
CMSDIR = test.rootdir("python-keystoneclient/examples/pki/cms")
|
|
SIGNING_CERT = os.path.join(CERTDIR, 'signing_cert.pem')
|
|
SIGNING_KEY = os.path.join(KEYDIR, 'signing_key.pem')
|
|
CA = os.path.join(CERTDIR, 'ca.pem')
|
|
|
|
REVOCATION_LIST = None
|
|
REVOKED_TOKEN = None
|
|
REVOKED_TOKEN_HASH = None
|
|
SIGNED_REVOCATION_LIST = None
|
|
SIGNED_TOKEN_SCOPED = None
|
|
SIGNED_TOKEN_UNSCOPED = None
|
|
SIGNED_TOKEN_SCOPED_KEY = None
|
|
SIGNED_TOKEN_UNSCOPED_KEY = None
|
|
|
|
VALID_SIGNED_REVOCATION_LIST = None
|
|
|
|
UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
|
|
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
|
|
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
|
|
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
|
|
|
|
INVALID_SIGNED_TOKEN = string.replace(
|
|
"""AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
|
|
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
|
|
CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
|
|
DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD
|
|
EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
|
|
FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
|
|
0000000000000000000000000000000000000000000000000000000000000000
|
|
1111111111111111111111111111111111111111111111111111111111111111
|
|
2222222222222222222222222222222222222222222222222222222222222222
|
|
3333333333333333333333333333333333333333333333333333333333333333
|
|
4444444444444444444444444444444444444444444444444444444444444444
|
|
5555555555555555555555555555555555555555555555555555555555555555
|
|
6666666666666666666666666666666666666666666666666666666666666666
|
|
7777777777777777777777777777777777777777777777777777777777777777
|
|
8888888888888888888888888888888888888888888888888888888888888888
|
|
9999999999999999999999999999999999999999999999999999999999999999
|
|
0000000000000000000000000000000000000000000000000000000000000000
|
|
xg==""", "\n", "")
|
|
|
|
# JSON responses keyed by token ID
|
|
TOKEN_RESPONSES = {
|
|
UUID_TOKEN_DEFAULT: {
|
|
'access': {
|
|
'token': {
|
|
'id': UUID_TOKEN_DEFAULT,
|
|
'expires': '2999-01-01T00:00:10Z',
|
|
'tenant': {
|
|
'id': 'tenant_id1',
|
|
'name': 'tenant_name1',
|
|
},
|
|
},
|
|
'user': {
|
|
'id': 'user_id1',
|
|
'name': 'user_name1',
|
|
'roles': [
|
|
{'name': 'role1'},
|
|
{'name': 'role2'},
|
|
],
|
|
},
|
|
'serviceCatalog': {}
|
|
},
|
|
},
|
|
VALID_DIABLO_TOKEN: {
|
|
'access': {
|
|
'token': {
|
|
'id': VALID_DIABLO_TOKEN,
|
|
'expires': '2999-01-01T00:00:10',
|
|
'tenantId': 'tenant_id1',
|
|
},
|
|
'user': {
|
|
'id': 'user_id1',
|
|
'name': 'user_name1',
|
|
'roles': [
|
|
{'name': 'role1'},
|
|
{'name': 'role2'},
|
|
],
|
|
},
|
|
},
|
|
},
|
|
UUID_TOKEN_UNSCOPED: {
|
|
'access': {
|
|
'token': {
|
|
'id': UUID_TOKEN_UNSCOPED,
|
|
'expires': '2999-01-01T00:00:10Z',
|
|
},
|
|
'user': {
|
|
'id': 'user_id1',
|
|
'name': 'user_name1',
|
|
'roles': [
|
|
{'name': 'role1'},
|
|
{'name': 'role2'},
|
|
],
|
|
},
|
|
},
|
|
},
|
|
UUID_TOKEN_NO_SERVICE_CATALOG: {
|
|
'access': {
|
|
'token': {
|
|
'id': 'valid-token',
|
|
'expires': '2999-01-01T00:00:10Z',
|
|
'tenant': {
|
|
'id': 'tenant_id1',
|
|
'name': 'tenant_name1',
|
|
},
|
|
},
|
|
'user': {
|
|
'id': 'user_id1',
|
|
'name': 'user_name1',
|
|
'roles': [
|
|
{'name': 'role1'},
|
|
{'name': 'role2'},
|
|
],
|
|
}
|
|
},
|
|
},
|
|
}
|
|
|
|
FAKE_RESPONSE_STACK = []
|
|
|
|
|
|
# The data for these tests are signed using openssl and are stored in files
|
|
# in the signing subdirectory. In order to keep the values consistent between
|
|
# the tests and the signed documents, we read them in for use in the tests.
|
|
def setUpModule(self):
|
|
signing_path = CMSDIR
|
|
with open(os.path.join(signing_path, 'auth_token_scoped.pem')) as f:
|
|
self.SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())
|
|
with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f:
|
|
self.SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())
|
|
with open(os.path.join(signing_path, 'auth_token_revoked.pem')) as f:
|
|
self.REVOKED_TOKEN = cms.cms_to_token(f.read())
|
|
self.REVOKED_TOKEN_HASH = utils.hash_signed_token(self.REVOKED_TOKEN)
|
|
with open(os.path.join(signing_path, 'revocation_list.json')) as f:
|
|
self.REVOCATION_LIST = jsonutils.loads(f.read())
|
|
with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
|
|
self.VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps(
|
|
{'signed': f.read()})
|
|
self.SIGNED_TOKEN_SCOPED_KEY =\
|
|
cms.cms_hash_token(self.SIGNED_TOKEN_SCOPED)
|
|
self.SIGNED_TOKEN_UNSCOPED_KEY =\
|
|
cms.cms_hash_token(self.SIGNED_TOKEN_UNSCOPED)
|
|
|
|
self.TOKEN_RESPONSES[self.SIGNED_TOKEN_SCOPED_KEY] = {
|
|
'access': {
|
|
'token': {
|
|
'id': self.SIGNED_TOKEN_SCOPED_KEY,
|
|
},
|
|
'user': {
|
|
'id': 'user_id1',
|
|
'name': 'user_name1',
|
|
'tenantId': 'tenant_id1',
|
|
'tenantName': 'tenant_name1',
|
|
'roles': [
|
|
{'name': 'role1'},
|
|
{'name': 'role2'},
|
|
],
|
|
},
|
|
},
|
|
}
|
|
|
|
self.TOKEN_RESPONSES[SIGNED_TOKEN_UNSCOPED_KEY] = {
|
|
'access': {
|
|
'token': {
|
|
'id': SIGNED_TOKEN_UNSCOPED_KEY,
|
|
},
|
|
'user': {
|
|
'id': 'user_id1',
|
|
'name': 'user_name1',
|
|
'roles': [
|
|
{'name': 'role1'},
|
|
{'name': 'role2'},
|
|
],
|
|
},
|
|
},
|
|
},
|
|
|
|
|
|
class FakeMemcache(object):
|
|
def __init__(self):
|
|
self.set_key = None
|
|
self.set_value = None
|
|
self.token_expiration = None
|
|
|
|
def get(self, key):
|
|
data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
|
|
if not data or key != "tokens/%s" % (data['access']['token']['id']):
|
|
return
|
|
if not self.token_expiration:
|
|
dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
|
|
self.token_expiration = dt.strftime("%s")
|
|
dt = datetime.datetime.now() + datetime.timedelta(hours=24)
|
|
ks_expires = dt.isoformat()
|
|
data['access']['token']['expires'] = ks_expires
|
|
return (data, str(self.token_expiration))
|
|
|
|
def set(self, key, value, time=None):
|
|
self.set_value = value
|
|
self.set_key = key
|
|
|
|
|
|
class FakeSwiftMemcacheRing(object):
|
|
def __init__(self):
|
|
self.set_key = None
|
|
self.set_value = None
|
|
self.token_expiration = None
|
|
|
|
def get(self, key):
|
|
data = TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY].copy()
|
|
if not data or key != "tokens/%s" % (data['access']['token']['id']):
|
|
return
|
|
if not self.token_expiration:
|
|
dt = datetime.datetime.now() + datetime.timedelta(minutes=5)
|
|
self.token_expiration = dt.strftime("%s")
|
|
dt = datetime.datetime.now() + datetime.timedelta(hours=24)
|
|
ks_expires = dt.isoformat()
|
|
data['access']['token']['expires'] = ks_expires
|
|
return (data, str(self.token_expiration))
|
|
|
|
def set(self, key, value, serialize=True, timeout=0):
|
|
self.set_value = value
|
|
self.set_key = key
|
|
|
|
|
|
class FakeHTTPResponse(object):
|
|
def __init__(self, status, body):
|
|
self.status = status
|
|
self.body = body
|
|
|
|
def read(self):
|
|
return self.body
|
|
|
|
|
|
class FakeStackHTTPConnection(object):
|
|
|
|
def __init__(self, *args, **kwargs):
|
|
pass
|
|
|
|
def getresponse(self):
|
|
if len(FAKE_RESPONSE_STACK):
|
|
return FAKE_RESPONSE_STACK.pop()
|
|
return FakeHTTPResponse(500, jsonutils.dumps('UNEXPECTED RESPONSE'))
|
|
|
|
def request(self, *_args, **_kwargs):
|
|
pass
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
|
|
class FakeHTTPConnection(object):
|
|
|
|
last_requested_url = ''
|
|
|
|
def __init__(self, *args):
|
|
self.send_valid_revocation_list = True
|
|
|
|
def request(self, method, path, **kwargs):
|
|
"""Fakes out several http responses.
|
|
|
|
If a POST request is made, we assume the calling code is trying
|
|
to get a new admin token.
|
|
|
|
If a GET request is made to validate a token, return success
|
|
if the token is 'token1'. If a different token is provided, return
|
|
a 404, indicating an unknown (therefore unauthorized) token.
|
|
|
|
"""
|
|
FakeHTTPConnection.last_requested_url = path
|
|
if method == 'POST':
|
|
status = 200
|
|
body = jsonutils.dumps({
|
|
'access': {
|
|
'token': {'id': 'admin_token2'},
|
|
},
|
|
})
|
|
|
|
else:
|
|
token_id = path.rsplit('/', 1)[1]
|
|
if token_id in TOKEN_RESPONSES.keys():
|
|
status = 200
|
|
body = jsonutils.dumps(TOKEN_RESPONSES[token_id])
|
|
elif token_id == "revoked":
|
|
status = 200
|
|
body = SIGNED_REVOCATION_LIST
|
|
else:
|
|
status = 404
|
|
body = str()
|
|
|
|
self.resp = FakeHTTPResponse(status, body)
|
|
|
|
def getresponse(self):
|
|
return self.resp
|
|
|
|
def close(self):
|
|
pass
|
|
|
|
|
|
class FakeApp(object):
|
|
"""This represents a WSGI app protected by the auth_token middleware."""
|
|
def __init__(self, expected_env=None):
|
|
expected_env = expected_env or {}
|
|
self.expected_env = {
|
|
'HTTP_X_IDENTITY_STATUS': 'Confirmed',
|
|
'HTTP_X_TENANT_ID': 'tenant_id1',
|
|
'HTTP_X_TENANT_NAME': 'tenant_name1',
|
|
'HTTP_X_USER_ID': 'user_id1',
|
|
'HTTP_X_USER_NAME': 'user_name1',
|
|
'HTTP_X_ROLES': 'role1,role2',
|
|
'HTTP_X_USER': 'user_name1', # deprecated (diablo-compat)
|
|
'HTTP_X_TENANT': 'tenant_name1', # deprecated (diablo-compat)
|
|
'HTTP_X_ROLE': 'role1,role2', # deprecated (diablo-compat)
|
|
}
|
|
self.expected_env.update(expected_env)
|
|
|
|
def __call__(self, env, start_response):
|
|
for k, v in self.expected_env.items():
|
|
assert env[k] == v, '%s != %s' % (env[k], v)
|
|
|
|
resp = webob.Response()
|
|
resp.body = 'SUCCESS'
|
|
return resp(env, start_response)
|
|
|
|
|
|
class BaseAuthTokenMiddlewareTest(testtools.TestCase):
|
|
|
|
def setUp(self, expected_env=None):
|
|
super(BaseAuthTokenMiddlewareTest, self).setUp()
|
|
expected_env = expected_env or {}
|
|
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'auth_admin_prefix': '/testadmin',
|
|
'signing_dir': CERTDIR,
|
|
}
|
|
|
|
self.middleware = auth_token.AuthProtocol(FakeApp(expected_env), conf)
|
|
self.middleware.http_client_class = FakeHTTPConnection
|
|
self.middleware._iso8601 = iso8601
|
|
|
|
self.response_status = None
|
|
self.response_headers = None
|
|
self.middleware.revoked_file_name = tempfile.mkstemp()[1]
|
|
cache_timeout = datetime.timedelta(days=1)
|
|
self.middleware.token_revocation_list_cache_timeout = cache_timeout
|
|
self.middleware.token_revocation_list = jsonutils.dumps(
|
|
{"revoked": [], "extra": "success"})
|
|
|
|
signed_list = 'SIGNED_REVOCATION_LIST'
|
|
valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
|
|
globals()[signed_list] = globals()[valid_signed_list]
|
|
|
|
super(BaseAuthTokenMiddlewareTest, self).setUp()
|
|
|
|
def tearDown(self):
|
|
super(BaseAuthTokenMiddlewareTest, self).tearDown()
|
|
try:
|
|
os.remove(self.middleware.revoked_file_name)
|
|
except OSError:
|
|
pass
|
|
|
|
def start_fake_response(self, status, headers):
|
|
self.response_status = int(status.split(' ', 1)[0])
|
|
self.response_headers = dict(headers)
|
|
|
|
|
|
class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
|
|
"""Auth Token middleware test setup that allows the tests to define
|
|
a stack of responses to HTTP requests in the test and get those
|
|
responses back in sequence for testing.
|
|
|
|
Example::
|
|
|
|
resp1 = FakeHTTPResponse(401, jsonutils.dumps(''))
|
|
resp2 = FakeHTTPResponse(200, jsonutils.dumps({
|
|
'access': {
|
|
'token': {'id': 'admin_token2'},
|
|
},
|
|
})
|
|
FAKE_RESPONSE_STACK.append(resp1)
|
|
FAKE_RESPONSE_STACK.append(resp2)
|
|
|
|
... do your testing code here ...
|
|
|
|
"""
|
|
|
|
def setUp(self, expected_env=None):
|
|
super(StackResponseAuthTokenMiddlewareTest, self).setUp(expected_env)
|
|
self.middleware.http_client_class = FakeStackHTTPConnection
|
|
|
|
def test_fetch_revocation_list_with_expire(self):
|
|
# first response to revocation list should return 401 Unauthorized
|
|
# to pretend to be an expired token
|
|
resp1 = FakeHTTPResponse(200, jsonutils.dumps({
|
|
'access': {
|
|
'token': {'id': 'admin_token2'},
|
|
},
|
|
}))
|
|
resp2 = FakeHTTPResponse(401, jsonutils.dumps(''))
|
|
resp3 = FakeHTTPResponse(200, jsonutils.dumps({
|
|
'access': {
|
|
'token': {'id': 'admin_token2'},
|
|
},
|
|
}))
|
|
resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST)
|
|
|
|
# first get_admin_token() call
|
|
FAKE_RESPONSE_STACK.append(resp1)
|
|
# request revocation list, get "unauthorized" due to simulated expired
|
|
# token
|
|
FAKE_RESPONSE_STACK.append(resp2)
|
|
# request a new admin_token
|
|
FAKE_RESPONSE_STACK.append(resp3)
|
|
# request revocation list, get the revocation list properly
|
|
FAKE_RESPONSE_STACK.append(resp4)
|
|
|
|
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
|
|
self.assertEqual(fetched_list, REVOCATION_LIST)
|
|
|
|
|
|
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
|
|
"""Auth Token middleware should understand Diablo keystone responses."""
|
|
def setUp(self):
|
|
# pre-diablo only had Tenant ID, which was also the Name
|
|
expected_env = {
|
|
'HTTP_X_TENANT_ID': 'tenant_id1',
|
|
'HTTP_X_TENANT_NAME': 'tenant_id1',
|
|
# now deprecated (diablo-compat)
|
|
'HTTP_X_TENANT': 'tenant_id1',
|
|
}
|
|
super(DiabloAuthTokenMiddlewareTest, self).setUp(expected_env)
|
|
|
|
def test_valid_diablo_response(self):
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Auth-Token'] = VALID_DIABLO_TOKEN
|
|
self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(self.response_status, 200)
|
|
self.assertTrue('keystone.token_info' in req.environ)
|
|
|
|
|
|
class AuthTokenMiddlewareTest(test.NoModule, BaseAuthTokenMiddlewareTest):
|
|
def assert_valid_request_200(self, token):
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Auth-Token'] = token
|
|
body = self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(self.response_status, 200)
|
|
self.assertTrue(req.headers.get('X-Service-Catalog'))
|
|
self.assertEqual(body, ['SUCCESS'])
|
|
self.assertTrue('keystone.token_info' in req.environ)
|
|
|
|
def test_valid_uuid_request(self):
|
|
self.assert_valid_request_200(UUID_TOKEN_DEFAULT)
|
|
self.assertEqual("/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT,
|
|
FakeHTTPConnection.last_requested_url)
|
|
|
|
def test_valid_signed_request(self):
|
|
FakeHTTPConnection.last_requested_url = ''
|
|
self.assert_valid_request_200(SIGNED_TOKEN_SCOPED)
|
|
self.assertEqual(self.middleware.conf['auth_admin_prefix'],
|
|
"/testadmin")
|
|
#ensure that signed requests do not generate HTTP traffic
|
|
self.assertEqual('', FakeHTTPConnection.last_requested_url)
|
|
|
|
def assert_unscoped_default_tenant_auto_scopes(self, token):
|
|
"""Unscoped requests with a default tenant should "auto-scope."
|
|
|
|
The implied scope is the user's tenant ID.
|
|
|
|
"""
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Auth-Token'] = token
|
|
body = self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(self.response_status, 200)
|
|
self.assertEqual(body, ['SUCCESS'])
|
|
self.assertTrue('keystone.token_info' in req.environ)
|
|
|
|
def test_default_tenant_uuid_token(self):
|
|
self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT)
|
|
|
|
def test_default_tenant_signed_token(self):
|
|
self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED)
|
|
|
|
def assert_unscoped_token_receives_401(self, token):
|
|
"""Unscoped requests with no default tenant ID should be rejected."""
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Auth-Token'] = token
|
|
self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(self.response_status, 401)
|
|
self.assertEqual(self.response_headers['WWW-Authenticate'],
|
|
'Keystone uri=\'https://keystone.example.com:1234\'')
|
|
|
|
def test_unscoped_uuid_token_receives_401(self):
|
|
self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED)
|
|
|
|
def test_unscoped_pki_token_receives_401(self):
|
|
self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED)
|
|
|
|
def test_revoked_token_receives_401(self):
|
|
self.middleware.token_revocation_list = self.get_revocation_list_json()
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Auth-Token'] = REVOKED_TOKEN
|
|
self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(self.response_status, 401)
|
|
|
|
def get_revocation_list_json(self, token_ids=None):
|
|
if token_ids is None:
|
|
token_ids = [REVOKED_TOKEN_HASH]
|
|
revocation_list = {'revoked': [{'id': x, 'expires': timeutils.utcnow()}
|
|
for x in token_ids]}
|
|
return jsonutils.dumps(revocation_list)
|
|
|
|
def test_is_signed_token_revoked_returns_false(self):
|
|
#explicitly setting an empty revocation list here to document intent
|
|
self.middleware.token_revocation_list = jsonutils.dumps(
|
|
{"revoked": [], "extra": "success"})
|
|
result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
|
|
self.assertFalse(result)
|
|
|
|
def test_is_signed_token_revoked_returns_true(self):
|
|
self.middleware.token_revocation_list = self.get_revocation_list_json()
|
|
result = self.middleware.is_signed_token_revoked(REVOKED_TOKEN)
|
|
self.assertTrue(result)
|
|
|
|
def test_verify_signed_token_raises_exception_for_revoked_token(self):
|
|
self.middleware.token_revocation_list = self.get_revocation_list_json()
|
|
self.assertRaises(auth_token.InvalidUserToken,
|
|
self.middleware.verify_signed_token, REVOKED_TOKEN)
|
|
|
|
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
|
|
self.middleware.token_revocation_list = self.get_revocation_list_json()
|
|
self.middleware.verify_signed_token(SIGNED_TOKEN_SCOPED)
|
|
|
|
def test_cert_file_missing(self):
|
|
self.assertFalse(self.middleware.cert_file_missing(
|
|
"openstack: /tmp/haystack: No such file or directory",
|
|
"/tmp/needle"))
|
|
self.assertTrue(self.middleware.cert_file_missing(
|
|
"openstack: /not/exist: No such file or directory",
|
|
"/not/exist"))
|
|
|
|
def test_get_token_revocation_list_fetched_time_returns_min(self):
|
|
self.middleware.token_revocation_list_fetched_time = None
|
|
self.middleware.revoked_file_name = ''
|
|
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
|
|
datetime.datetime.min)
|
|
|
|
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
|
|
self.middleware.token_revocation_list_fetched_time = None
|
|
mtime = os.path.getmtime(self.middleware.revoked_file_name)
|
|
fetched_time = datetime.datetime.fromtimestamp(mtime)
|
|
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
|
|
fetched_time)
|
|
|
|
def test_get_token_revocation_list_fetched_time_returns_value(self):
|
|
expected = self.middleware._token_revocation_list_fetched_time
|
|
self.assertEqual(self.middleware.token_revocation_list_fetched_time,
|
|
expected)
|
|
|
|
def test_get_revocation_list_returns_fetched_list(self):
|
|
self.middleware.token_revocation_list_fetched_time = None
|
|
os.remove(self.middleware.revoked_file_name)
|
|
self.assertEqual(self.middleware.token_revocation_list,
|
|
REVOCATION_LIST)
|
|
|
|
def test_get_revocation_list_returns_current_list_from_memory(self):
|
|
self.assertEqual(self.middleware.token_revocation_list,
|
|
self.middleware._token_revocation_list)
|
|
|
|
def test_get_revocation_list_returns_current_list_from_disk(self):
|
|
in_memory_list = self.middleware.token_revocation_list
|
|
self.middleware._token_revocation_list = None
|
|
self.assertEqual(self.middleware.token_revocation_list, in_memory_list)
|
|
|
|
def test_invalid_revocation_list_raises_service_error(self):
|
|
globals()['SIGNED_REVOCATION_LIST'] = "{}"
|
|
self.assertRaises(auth_token.ServiceError,
|
|
self.middleware.fetch_revocation_list)
|
|
|
|
def test_fetch_revocation_list(self):
|
|
fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
|
|
self.assertEqual(fetched_list, REVOCATION_LIST)
|
|
|
|
def test_request_invalid_uuid_token(self):
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Auth-Token'] = 'invalid-token'
|
|
self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(self.response_status, 401)
|
|
self.assertEqual(self.response_headers['WWW-Authenticate'],
|
|
'Keystone uri=\'https://keystone.example.com:1234\'')
|
|
|
|
def test_request_invalid_signed_token(self):
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Auth-Token'] = INVALID_SIGNED_TOKEN
|
|
self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(self.response_status, 401)
|
|
self.assertEqual(self.response_headers['WWW-Authenticate'],
|
|
'Keystone uri=\'https://keystone.example.com:1234\'')
|
|
|
|
def test_request_no_token(self):
|
|
req = webob.Request.blank('/')
|
|
self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(self.response_status, 401)
|
|
self.assertEqual(self.response_headers['WWW-Authenticate'],
|
|
'Keystone uri=\'https://keystone.example.com:1234\'')
|
|
|
|
def test_request_no_token_log_message(self):
|
|
class FakeLog(object):
|
|
def __init__(self):
|
|
self.msg = None
|
|
self.debugmsg = None
|
|
|
|
def warn(self, msg=None, *args, **kwargs):
|
|
self.msg = msg
|
|
|
|
def debug(self, msg=None, *args, **kwargs):
|
|
self.debugmsg = msg
|
|
|
|
self.middleware.LOG = FakeLog()
|
|
self.middleware.delay_auth_decision = False
|
|
self.assertRaises(auth_token.InvalidUserToken,
|
|
self.middleware._get_user_token_from_header, {})
|
|
self.assertIsNotNone(self.middleware.LOG.msg)
|
|
self.assertIsNotNone(self.middleware.LOG.debugmsg)
|
|
|
|
def test_request_blank_token(self):
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Auth-Token'] = ''
|
|
self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(self.response_status, 401)
|
|
self.assertEqual(self.response_headers['WWW-Authenticate'],
|
|
'Keystone uri=\'https://keystone.example.com:1234\'')
|
|
|
|
def test_memcache(self):
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
|
|
self.middleware._cache = FakeMemcache()
|
|
self.middleware._use_keystone_cache = True
|
|
self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(self.middleware._cache.set_value, None)
|
|
|
|
def test_memcache_set_invalid(self):
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Auth-Token'] = 'invalid-token'
|
|
self.middleware._cache = FakeMemcache()
|
|
self.middleware._use_keystone_cache = True
|
|
self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(self.middleware._cache.set_value, "invalid")
|
|
|
|
def test_memcache_set_expired(self):
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
|
|
self.middleware._cache = FakeMemcache()
|
|
self.middleware._use_keystone_cache = True
|
|
expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
|
|
self.middleware._cache.token_expiration = float(expired.strftime("%s"))
|
|
self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(len(self.middleware._cache.set_value), 2)
|
|
|
|
def test_swift_memcache_set_expired(self):
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Auth-Token'] = SIGNED_TOKEN_SCOPED
|
|
self.middleware._cache = FakeSwiftMemcacheRing()
|
|
self.middleware._use_keystone_cache = False
|
|
expired = datetime.datetime.now() - datetime.timedelta(minutes=1)
|
|
self.middleware._cache.token_expiration = float(expired.strftime("%s"))
|
|
self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(len(self.middleware._cache.set_value), 2)
|
|
|
|
def test_nomemcache(self):
|
|
self.disable_module('memcache')
|
|
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
}
|
|
|
|
auth_token.AuthProtocol(FakeApp(), conf)
|
|
|
|
def test_use_cache_from_env(self):
|
|
env = {'swift.cache': 'CACHE_TEST'}
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'cache': 'swift.cache',
|
|
'memcache_servers': 'localhost:11211',
|
|
}
|
|
auth = auth_token.AuthProtocol(FakeApp(), conf)
|
|
auth._init_cache(env)
|
|
self.assertEqual(auth._cache, 'CACHE_TEST')
|
|
|
|
def test_not_use_cache_from_env(self):
|
|
self.disable_module('memcache')
|
|
env = {'swift.cache': 'CACHE_TEST'}
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
}
|
|
auth = auth_token.AuthProtocol(FakeApp(), conf)
|
|
auth._init_cache(env)
|
|
self.assertEqual(auth._cache, None)
|
|
|
|
def test_request_prevent_service_catalog_injection(self):
|
|
req = webob.Request.blank('/')
|
|
req.headers['X-Service-Catalog'] = '[]'
|
|
req.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG
|
|
body = self.middleware(req.environ, self.start_fake_response)
|
|
self.assertEqual(self.response_status, 200)
|
|
self.assertFalse(req.headers.get('X-Service-Catalog'))
|
|
self.assertEqual(body, ['SUCCESS'])
|
|
|
|
def test_will_expire_soon(self):
|
|
tenseconds = datetime.datetime.utcnow() + datetime.timedelta(
|
|
seconds=10)
|
|
self.assertTrue(auth_token.will_expire_soon(tenseconds))
|
|
fortyseconds = datetime.datetime.utcnow() + datetime.timedelta(
|
|
seconds=40)
|
|
self.assertFalse(auth_token.will_expire_soon(fortyseconds))
|
|
|
|
def test_encrypt_cache_data(self):
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
'memcache_security_strategy': 'encrypt',
|
|
'memcache_secret_key': 'mysecret',
|
|
}
|
|
auth = auth_token.AuthProtocol(FakeApp(), conf)
|
|
encrypted_data = \
|
|
auth._protect_cache_value('token',
|
|
TOKEN_RESPONSES[UUID_TOKEN_DEFAULT])
|
|
self.assertEqual('{ENCRYPT:AES256}', encrypted_data[:16])
|
|
self.assertDictEqual(
|
|
TOKEN_RESPONSES[UUID_TOKEN_DEFAULT],
|
|
auth._unprotect_cache_value('token', encrypted_data))
|
|
# should return None if unable to decrypt
|
|
self.assertIsNone(
|
|
auth._unprotect_cache_value('token', '{ENCRYPT:AES256}corrupted'))
|
|
self.assertIsNone(
|
|
auth._unprotect_cache_value('mykey', encrypted_data))
|
|
|
|
def test_sign_cache_data(self):
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
'memcache_security_strategy': 'mac',
|
|
'memcache_secret_key': 'mysecret',
|
|
}
|
|
auth = auth_token.AuthProtocol(FakeApp(), conf)
|
|
signed_data = \
|
|
auth._protect_cache_value('mykey',
|
|
TOKEN_RESPONSES[UUID_TOKEN_DEFAULT])
|
|
expected = '{MAC:SHA1}'
|
|
self.assertEqual(
|
|
signed_data[:10],
|
|
expected)
|
|
self.assertDictEqual(
|
|
TOKEN_RESPONSES[UUID_TOKEN_DEFAULT],
|
|
auth._unprotect_cache_value('mykey', signed_data))
|
|
# should return None on corrupted data
|
|
self.assertIsNone(
|
|
auth._unprotect_cache_value('mykey', '{MAC:SHA1}corrupted'))
|
|
|
|
def test_no_memcache_protection(self):
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
'memcache_secret_key': 'mysecret',
|
|
}
|
|
auth = auth_token.AuthProtocol(FakeApp(), conf)
|
|
data = auth._protect_cache_value('mykey', 'This is a test!')
|
|
self.assertEqual(data, 'This is a test!')
|
|
self.assertEqual(
|
|
'This is a test!',
|
|
auth._unprotect_cache_value('mykey', data))
|
|
|
|
def test_get_cache_key(self):
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
'memcache_secret_key': 'mysecret',
|
|
}
|
|
auth = auth_token.AuthProtocol(FakeApp(), conf)
|
|
self.assertEqual(
|
|
'tokens/mytoken',
|
|
auth._get_cache_key('mytoken'))
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
'memcache_security_strategy': 'mac',
|
|
'memcache_secret_key': 'mysecret',
|
|
}
|
|
auth = auth_token.AuthProtocol(FakeApp(), conf)
|
|
expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'mysecret')
|
|
self.assertEqual(auth._get_cache_key('mytoken'), expected)
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
'memcache_security_strategy': 'Encrypt',
|
|
'memcache_secret_key': 'abc!',
|
|
}
|
|
auth = auth_token.AuthProtocol(FakeApp(), conf)
|
|
expected = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'abc!')
|
|
self.assertEqual(auth._get_cache_key('mytoken'), expected)
|
|
|
|
def test_assert_valid_memcache_protection_config(self):
|
|
# test missing memcache_secret_key
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
'memcache_security_strategy': 'Encrypt',
|
|
}
|
|
self.assertRaises(Exception, auth_token.AuthProtocol,
|
|
FakeApp(), conf)
|
|
# test invalue memcache_security_strategy
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
'memcache_security_strategy': 'whatever',
|
|
}
|
|
self.assertRaises(Exception, auth_token.AuthProtocol,
|
|
FakeApp(), conf)
|
|
# test missing memcache_secret_key
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
'memcache_security_strategy': 'mac',
|
|
}
|
|
self.assertRaises(Exception, auth_token.AuthProtocol,
|
|
FakeApp(), conf)
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
'memcache_security_strategy': 'Encrypt',
|
|
'memcache_secret_key': ''
|
|
}
|
|
self.assertRaises(Exception, auth_token.AuthProtocol,
|
|
FakeApp(), conf)
|
|
conf = {
|
|
'admin_token': 'admin_token1',
|
|
'auth_host': 'keystone.example.com',
|
|
'auth_port': 1234,
|
|
'memcache_servers': 'localhost:11211',
|
|
'memcache_security_strategy': 'mAc',
|
|
'memcache_secret_key': ''
|
|
}
|
|
self.assertRaises(Exception, auth_token.AuthProtocol,
|
|
FakeApp(), conf)
|
|
|
|
|
|
class TokenEncodingTest(testtools.TestCase):
|
|
def test_unquoted_token(self):
|
|
self.assertEqual('foo%20bar', auth_token.safe_quote('foo bar'))
|
|
|
|
def test_quoted_token(self):
|
|
self.assertEqual('foo%20bar', auth_token.safe_quote('foo%20bar'))
|