Files
python-keystoneclient/tests/test_auth_token_middleware.py
Monty Taylor 11263ac318 Use testr instead of nose.
Part of blueprint grizzly-testtools

Change-Id: I76dee19781eaac21901b5c0258e83a42180c1702
2013-05-10 15:39:10 -07:00

1420 lines
52 KiB
Python

# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import datetime
import iso8601
import os
import string
import sys
import tempfile
import testtools
import fixtures
import webob
from keystoneclient.common import cms
from keystoneclient import utils
from keystoneclient.middleware import auth_token
from keystoneclient.middleware import memcache_crypt
from keystoneclient.openstack.common import memorycache
from keystoneclient.openstack.common import jsonutils
from keystoneclient.openstack.common import timeutils
# Repository root: two directory levels above this test module.
ROOTDIR = os.path.dirname(os.path.dirname(os.path.realpath(__file__)))
# Locations of the example PKI material, all relative to ROOTDIR.
CERTDIR = os.path.join(ROOTDIR, "examples/pki/certs")
KEYDIR = os.path.join(ROOTDIR, "examples/pki/private")
CMSDIR = os.path.join(ROOTDIR, "examples/pki/cms")
SIGNING_CERT = os.path.join(CERTDIR, 'signing_cert.pem')
SIGNING_KEY = os.path.join(KEYDIR, 'signing_key.pem')
CA = os.path.join(CERTDIR, 'ca.pem')
# Placeholders. Most of these are assigned later in this module from the
# signed example files; SIGNED_REVOCATION_LIST is (re)set by the base test
# class's setUp().
REVOCATION_LIST = None
REVOKED_TOKEN = None
REVOKED_TOKEN_HASH = None
REVOKED_v3_TOKEN = None
REVOKED_v3_TOKEN_HASH = None
SIGNED_REVOCATION_LIST = None
SIGNED_TOKEN_SCOPED = None
SIGNED_TOKEN_UNSCOPED = None
SIGNED_v3_TOKEN_SCOPED = None
SIGNED_v3_TOKEN_UNSCOPED = None
SIGNED_TOKEN_SCOPED_KEY = None
SIGNED_TOKEN_UNSCOPED_KEY = None
SIGNED_v3_TOKEN_SCOPED_KEY = None
VALID_SIGNED_REVOCATION_LIST = None
# Token IDs; each one is a key of the TOKEN_RESPONSES mapping.
UUID_TOKEN_DEFAULT = "ec6c0710ec2f471498484c1b53ab4f9d"
UUID_TOKEN_NO_SERVICE_CATALOG = '8286720fbe4941e69fa8241723bb02df'
UUID_TOKEN_UNSCOPED = '731f903721c14827be7b2dc912af7776'
VALID_DIABLO_TOKEN = 'b0cf19b55dbb4f20a6ee18e6c6cf1726'
v3_UUID_TOKEN_DEFAULT = '5603457654b346fdbb93437bfe76f2f1'
v3_UUID_TOKEN_UNSCOPED = 'd34835fdaec447e695a0a024d84f8d79'
v3_UUID_TOKEN_DOMAIN_SCOPED = 'e8a7b63aaa4449f38f0c5c05c3581792'
# A long, made-up payload with all newlines removed; it is sent as the
# X-Auth-Token by test_request_invalid_signed_token.
# The str.replace method is used instead of the module-level
# string.replace() function, which only exists in Python 2's string module.
INVALID_SIGNED_TOKEN = """AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA
BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB
CCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCCC
DDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDDD
EEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEEE
FFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFFF
0000000000000000000000000000000000000000000000000000000000000000
1111111111111111111111111111111111111111111111111111111111111111
2222222222222222222222222222222222222222222222222222222222222222
3333333333333333333333333333333333333333333333333333333333333333
4444444444444444444444444444444444444444444444444444444444444444
5555555555555555555555555555555555555555555555555555555555555555
6666666666666666666666666666666666666666666666666666666666666666
7777777777777777777777777777777777777777777777777777777777777777
8888888888888888888888888888888888888888888888888888888888888888
9999999999999999999999999999999999999999999999999999999999999999
0000000000000000000000000000000000000000000000000000000000000000
xg==""".replace("\n", "")
# Canned Keystone JSON responses, keyed by token ID.  The v2 entries are
# wrapped in an 'access' envelope; the v3 entries use a 'token' envelope.
TOKEN_RESPONSES = {
    # --- v2 tokens -------------------------------------------------------
    UUID_TOKEN_DEFAULT: {
        'access': {
            'token': {
                'id': UUID_TOKEN_DEFAULT,
                'expires': '2020-01-01T00:00:10.000123Z',
                'tenant': {
                    'id': 'tenant_id1',
                    'name': 'tenant_name1',
                },
            },
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'roles': [
                    {'name': 'role1'},
                    {'name': 'role2'},
                ],
            },
            'serviceCatalog': {}
        },
    },
    VALID_DIABLO_TOKEN: {
        'access': {
            'token': {
                'id': VALID_DIABLO_TOKEN,
                'expires': '2020-01-01T00:00:10.000123Z',
                'tenantId': 'tenant_id1',
            },
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'roles': [
                    {'name': 'role1'},
                    {'name': 'role2'},
                ],
            },
        },
    },
    UUID_TOKEN_UNSCOPED: {
        'access': {
            'token': {
                'id': UUID_TOKEN_UNSCOPED,
                'expires': '2020-01-01T00:00:10.000123Z',
            },
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'roles': [
                    {'name': 'role1'},
                    {'name': 'role2'},
                ],
            },
        },
    },
    UUID_TOKEN_NO_SERVICE_CATALOG: {
        'access': {
            'token': {
                'id': 'valid-token',
                'expires': '2020-01-01T00:00:10.000123Z',
                'tenant': {
                    'id': 'tenant_id1',
                    'name': 'tenant_name1',
                },
            },
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'roles': [
                    {'name': 'role1'},
                    {'name': 'role2'},
                ],
            }
        },
    },
    # --- v3 tokens -------------------------------------------------------
    v3_UUID_TOKEN_DEFAULT: {
        'token': {
            'expires_at': '2020-01-01T00:00:10.000123Z',
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'domain': {
                    'id': 'domain_id1',
                    'name': 'domain_name1'
                }
            },
            'project': {
                'id': 'tenant_id1',
                'name': 'tenant_name1',
                'domain': {
                    'id': 'domain_id1',
                    'name': 'domain_name1'
                }
            },
            'roles': [
                {'name': 'role1', 'id': 'Role1'},
                {'name': 'role2', 'id': 'Role2'},
            ],
            'catalog': {}
        }
    },
    v3_UUID_TOKEN_UNSCOPED: {
        'token': {
            'expires_at': '2020-01-01T00:00:10.000123Z',
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'domain': {
                    'id': 'domain_id1',
                    'name': 'domain_name1'
                }
            }
        }
    },
    v3_UUID_TOKEN_DOMAIN_SCOPED: {
        'token': {
            'expires_at': '2020-01-01T00:00:10.000123Z',
            'user': {
                'id': 'user_id1',
                'name': 'user_name1',
                'domain': {
                    'id': 'domain_id1',
                    'name': 'domain_name1'
                }
            },
            'domain': {
                'id': 'domain_id1',
                'name': 'domain_name1',
            },
            'roles': [
                {'name': 'role1', 'id': 'Role1'},
                {'name': 'role2', 'id': 'Role2'},
            ],
            'catalog': {}
        }
    }
}
# WSGI environ entries that the fake apps expect to find for the default
# v2 token (callers may overlay their own expectations on a copy).
EXPECTED_V2_DEFAULT_ENV_RESPONSE = {
    'HTTP_X_IDENTITY_STATUS': 'Confirmed',
    'HTTP_X_TENANT_ID': 'tenant_id1',
    'HTTP_X_TENANT_NAME': 'tenant_name1',
    'HTTP_X_USER_ID': 'user_id1',
    'HTTP_X_USER_NAME': 'user_name1',
    'HTTP_X_ROLES': 'role1,role2',
    # The remaining three are deprecated (diablo-compat) headers.
    'HTTP_X_USER': 'user_name1',
    'HTTP_X_TENANT': 'tenant_name1',
    'HTTP_X_ROLE': 'role1,role2',
}
# Pre-prepared responses; FakeHTTPConnection.getresponse() pops from this
# list when no response was recorded by a preceding request() call.
FAKE_RESPONSE_STACK = []

# @TODO(mordred) This should become a testresources resource attached to the
#                class
# The data for these tests are signed using openssl and are stored in files
# in the signing subdirectory.  In order to keep the values consistent between
# the tests and the signed documents, we read them in for use in the tests.
signing_path = CMSDIR

with open(os.path.join(signing_path, 'auth_token_scoped.pem')) as f:
    SIGNED_TOKEN_SCOPED = cms.cms_to_token(f.read())

with open(os.path.join(signing_path, 'auth_token_unscoped.pem')) as f:
    SIGNED_TOKEN_UNSCOPED = cms.cms_to_token(f.read())

with open(os.path.join(signing_path, 'auth_v3_token_scoped.pem')) as f:
    SIGNED_v3_TOKEN_SCOPED = cms.cms_to_token(f.read())

with open(os.path.join(signing_path, 'auth_token_revoked.pem')) as f:
    REVOKED_TOKEN = cms.cms_to_token(f.read())
REVOKED_TOKEN_HASH = utils.hash_signed_token(REVOKED_TOKEN)

with open(os.path.join(signing_path, 'auth_v3_token_revoked.pem')) as f:
    REVOKED_v3_TOKEN = cms.cms_to_token(f.read())
REVOKED_v3_TOKEN_HASH = utils.hash_signed_token(REVOKED_v3_TOKEN)

with open(os.path.join(signing_path, 'revocation_list.json')) as f:
    REVOCATION_LIST = jsonutils.loads(f.read())

with open(os.path.join(signing_path, 'revocation_list.pem')) as f:
    VALID_SIGNED_REVOCATION_LIST = jsonutils.dumps({'signed': f.read()})

# Hashed forms of the signed tokens; used as keys into TOKEN_RESPONSES.
SIGNED_TOKEN_SCOPED_KEY = cms.cms_hash_token(SIGNED_TOKEN_SCOPED)
SIGNED_TOKEN_UNSCOPED_KEY = cms.cms_hash_token(SIGNED_TOKEN_UNSCOPED)
SIGNED_v3_TOKEN_SCOPED_KEY = cms.cms_hash_token(SIGNED_v3_TOKEN_SCOPED)
# Canned responses for the signed (PKI) example tokens, registered under
# the hashed token value.

# v2, scoped to a tenant.
TOKEN_RESPONSES[SIGNED_TOKEN_SCOPED_KEY] = {
    'access': {
        'token': {
            'id': SIGNED_TOKEN_SCOPED_KEY,
        },
        'user': {
            'id': 'user_id1',
            'name': 'user_name1',
            'tenantId': 'tenant_id1',
            'tenantName': 'tenant_name1',
            'roles': [
                {'name': 'role1'},
                {'name': 'role2'},
            ],
        },
    },
}

# v2, unscoped.
TOKEN_RESPONSES[SIGNED_TOKEN_UNSCOPED_KEY] = {
    'access': {
        'token': {
            'id': SIGNED_TOKEN_UNSCOPED_KEY,
        },
        'user': {
            'id': 'user_id1',
            'name': 'user_name1',
            'roles': [
                {'name': 'role1'},
                {'name': 'role2'},
            ],
        },
    },
}

# v3, scoped to a project.
# NOTE(review): this entry uses 'expires' whereas the UUID v3 entries in
# TOKEN_RESPONSES use 'expires_at' -- confirm this is intentional.
TOKEN_RESPONSES[SIGNED_v3_TOKEN_SCOPED_KEY] = {
    'token': {
        'expires': '2020-01-01T00:00:10.000123Z',
        'user': {
            'id': 'user_id1',
            'name': 'user_name1',
            'domain': {
                'id': 'domain_id1',
                'name': 'domain_name1'
            }
        },
        'project': {
            'id': 'tenant_id1',
            'name': 'tenant_name1',
            'domain': {
                'id': 'domain_id1',
                'name': 'domain_name1'
            }
        },
        'roles': [
            {'name': 'role1'},
            {'name': 'role2'}
        ],
        'catalog': {}
    }
}
# Version-discovery document served by v3FakeHTTPConnection for
# 'GET /testadmin/': advertises both v3.0 and v2.0.
VERSION_LIST_v3 = {
    "versions": {
        "values": [
            {
                "id": "v3.0",
                "status": "stable",
                "updated": "2013-03-06T00:00:00Z",
                "links": []
            },
            {
                "id": "v2.0",
                "status": "beta",
                "updated": "2011-11-19T00:00:00Z",
                "links": []
            }
        ]
    }
}

# Version-discovery document served by FakeHTTPConnection for
# 'GET /testadmin/': advertises v2.0 only.
VERSION_LIST_v2 = {
    "versions": {
        "values": [
            {
                "id": "v2.0",
                "status": "beta",
                "updated": "2011-11-19T00:00:00Z",
                "links": []
            }
        ]
    }
}
class NoModuleFinder(object):
    """Meta-path finder that disallows further imports of ``module``.

    Intended to be inserted into ``sys.meta_path``.  A lookup for the
    blocked module, or for any of its dotted submodules, raises
    ``ImportError``; every other lookup returns ``None`` so that the
    remaining finders are consulted.
    """

    def __init__(self, module):
        """:param module: dotted name of the module to block."""
        self.module = module

    def _is_blocked(self, fullname):
        """Return True if ``fullname`` is the blocked module or a submodule."""
        return (fullname == self.module or
                fullname.startswith(self.module + '.'))

    def find_module(self, fullname, path=None):
        """PEP 302 finder hook.

        ``path`` is unused; it defaults to ``None`` because the finder
        protocol defines it as an optional argument.

        :raises ImportError: when ``fullname`` is blocked.
        """
        if self._is_blocked(fullname):
            raise ImportError
        return None

    def find_spec(self, fullname, path=None, target=None):
        """Spec-based finder hook used by newer import machinery.

        Applies exactly the same rule as :meth:`find_module`.
        """
        return self.find_module(fullname, path)
class DisableModuleFixture(fixtures.Fixture):
    """A fixture to provide support for unloading/disabling modules.

    While the fixture is set up, the named module (and its submodules) is
    removed from ``sys.modules`` and a ``NoModuleFinder`` placed at the
    front of ``sys.meta_path`` makes new imports of it raise
    ``ImportError``.  Both changes are undone on clean-up.
    """

    def __init__(self, module, *args, **kw):
        """:param module: dotted name of the module to disable."""
        super(DisableModuleFixture, self).__init__(*args, **kw)
        self.module = module
        self._finders = []
        self._cleared_modules = {}

    def _restore(self):
        """Undo setUp(): remove our finders and reinstate the modules."""
        for finder in self._finders:
            if finder in sys.meta_path:
                sys.meta_path.remove(finder)
        sys.modules.update(self._cleared_modules)
        # Reset the bookkeeping so that a second clean-up (or a later
        # setUp/cleanUp cycle) does not act on stale entries.
        self._finders = []
        self._cleared_modules = {}

    def tearDown(self):
        """Backward-compatible alias that runs the registered clean-ups."""
        self.cleanUp()

    def clear_module(self):
        """Pop the module and its submodules out of ``sys.modules``.

        :returns: dict mapping each removed module name to its module object.
        """
        cleared_modules = {}
        # Iterate over a snapshot: entries are popped inside the loop, and
        # mutating a dict while iterating over it directly is an error.
        for fullname in list(sys.modules):
            if (fullname == self.module or
                    fullname.startswith(self.module + '.')):
                cleared_modules[fullname] = sys.modules.pop(fullname)
        return cleared_modules

    def setUp(self):
        """Ensure ImportError for the specified module."""
        super(DisableModuleFixture, self).setUp()

        # Restoration is registered as a clean-up (rather than living only
        # in tearDown) because testtools' useFixture() schedules the
        # fixture's cleanUp().
        self.addCleanup(self._restore)

        # Clear 'module' references in sys.modules
        self._cleared_modules.update(self.clear_module())

        finder = NoModuleFinder(self.module)
        self._finders.append(finder)
        sys.meta_path.insert(0, finder)
class FakeSwiftMemcacheRing(memorycache.Client):
    """In-memory cache client whose ``set`` takes swift's ``timeout`` name."""

    # NOTE(vish): swift memcache uses param timeout instead of time
    def set(self, key, value, timeout=0, min_compress_len=0):
        """Forward to the parent ``set``, passing the arguments positionally."""
        super(FakeSwiftMemcacheRing, self).set(
            key, value, timeout, min_compress_len)
class FakeHTTPResponse(object):
    """Bare-bones HTTP response double: a status plus a readable body."""

    def __init__(self, status, body):
        self.status, self.body = status, body

    def read(self):
        """Return the body given at construction time."""
        return self.body
class BaseFakeHTTPConnection(object):
    """Response builders shared by the fake Keystone HTTP connections."""

    def _user_token_responses(self, token_id):
        """Emulate user token responses.

        A token ID found in TOKEN_RESPONSES yields 200 with that entry
        serialised as JSON.  The special ID "revoked" yields 200 with the
        signed revocation list.  Anything else yields 404 with an empty
        body, indicating an unknown (therefore unauthorized) token.

        :returns: ``(status, body)`` tuple.
        """
        if token_id in TOKEN_RESPONSES:
            return 200, jsonutils.dumps(TOKEN_RESPONSES[token_id])
        if token_id == "revoked":
            return 200, SIGNED_REVOCATION_LIST
        return 404, ''

    def fake_v2_responses(self, path):
        """Answer a v2 request; the token ID is the last path segment."""
        return self._user_token_responses(path.rsplit('/', 1)[1])

    def fake_v3_responses(self, path, **kwargs):
        """Answer a v3 request; the token ID is the X-Subject-Token header."""
        request_headers = kwargs.get('headers')
        return self._user_token_responses(request_headers['X-Subject-Token'])

    def fake_v2_admin_token(self, path):
        """Return a 200 response carrying a fixed admin token document."""
        admin_token = {
            'access': {
                'token': {'id': 'admin_token2',
                          'expires': '2012-10-03T16:58:01Z'}
            },
        }
        return 200, jsonutils.dumps(admin_token)
class FakeHTTPConnection(BaseFakeHTTPConnection):
    """Emulate a fake Keystone v2 server."""

    def __init__(self, *args, **kwargs):
        # Constructor arguments are accepted and ignored.
        self.send_valid_revocation_list = True
        self.resp = None

    def request(self, method, path, **kwargs):
        """Fakes out several http responses.

        Support the following requests:

        - Create admin token ('POST /testadmin/v2.0/tokens')
        - Get versions ('GET /testadmin/')
        - Get v2 user token responses (see fake_v2_responses)

        The path is also recorded on the class as ``last_requested_url``.
        """
        FakeHTTPConnection.last_requested_url = path
        if method == 'POST' and path == '/testadmin/v2.0/tokens':
            status, body = self.fake_v2_admin_token(path)
        elif path == '/testadmin/':
            # It's a GET versions call
            status, body = 300, jsonutils.dumps(VERSION_LIST_v2)
        else:
            status, body = self.fake_v2_responses(path)
        self.resp = FakeHTTPResponse(status, body)

    def getresponse(self):
        """Return the response for the earlier request, if there was one.

        When no response has been recorded, fall back to the pre-prepared
        FAKE_RESPONSE_STACK (most recently appended entry first), and
        finally to a 500 'UNEXPECTED RESPONSE'.
        """
        if self.resp:
            return self.resp
        if FAKE_RESPONSE_STACK:
            return FAKE_RESPONSE_STACK.pop()
        return FakeHTTPResponse(500, jsonutils.dumps('UNEXPECTED RESPONSE'))

    def close(self):
        """Nothing to release."""
        pass
class v3FakeHTTPConnection(FakeHTTPConnection):
    """Emulate a fake Keystone v3 server."""

    def request(self, method, path, **kwargs):
        """Fakes out several http responses.

        Support the following requests:

        - Create admin token ('POST /testadmin/v2.0/tokens')
        - Get versions ('GET /testadmin/')
        - Get v2 user token responses (see fake_v2_responses)
        - Get v3 user token responses (see fake_v3_responses)

        The path is also recorded on this class as ``last_requested_url``.
        """
        v3FakeHTTPConnection.last_requested_url = path
        if method == 'POST' and path == '/testadmin/v2.0/tokens':
            status, body = self.fake_v2_admin_token(path)
        elif path == '/testadmin/':
            # It's a GET versions call
            status, body = 300, jsonutils.dumps(VERSION_LIST_v3)
        elif path.split('/')[2] == 'v2.0':
            # Third '/'-separated component names the API version.
            status, body = self.fake_v2_responses(path)
        else:
            status, body = self.fake_v3_responses(path, **kwargs)
        self.resp = FakeHTTPResponse(status, body)
class RaisingHTTPConnection(FakeHTTPConnection):
    """An HTTPConnection whose ``request`` always raises AssertionError."""

    def request(self, method, path, **kwargs):
        """Fail immediately: no HTTP request is expected."""
        raise AssertionError("HTTP request was called.")
class FakeApp(object):
    """This represents a WSGI app protected by the auth_token middleware."""

    def __init__(self, expected_env=None):
        """Start from the default v2 expectations and overlay the given ones."""
        merged = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)
        merged.update(expected_env or {})
        self.expected_env = merged

    def __call__(self, env, start_response):
        """Check every expected environ entry, then answer 'SUCCESS'."""
        for key, expected in self.expected_env.items():
            assert env[key] == expected, '%s != %s' % (env[key], expected)

        response = webob.Response()
        response.body = 'SUCCESS'
        return response(env, start_response)
class v3FakeApp(object):
    """This represents a v3 WSGI app protected by the auth_token middleware."""

    def __init__(self, expected_env=None):
        """Layer the expectations: v2 defaults, v3 additions, then overrides."""
        # We should always get back the same v2 items
        merged = dict(EXPECTED_V2_DEFAULT_ENV_RESPONSE)

        # ...and with v3 additions, these are for the DEFAULT TOKEN
        merged.update({
            'HTTP_X_PROJECT_ID': 'tenant_id1',
            'HTTP_X_PROJECT_NAME': 'tenant_name1',
            'HTTP_X_PROJECT_DOMAIN_ID': 'domain_id1',
            'HTTP_X_PROJECT_DOMAIN_NAME': 'domain_name1',
            'HTTP_X_USER_DOMAIN_ID': 'domain_id1',
            'HTTP_X_USER_DOMAIN_NAME': 'domain_name1'
        })

        # And finally update for anything passed in
        merged.update(expected_env or {})
        self.expected_env = merged

    def __call__(self, env, start_response):
        """Check every expected environ entry, then answer 'SUCCESS'."""
        for key, expected in self.expected_env.items():
            assert env[key] == expected, '%s != %s' % (env[key], expected)

        response = webob.Response()
        response.body = 'SUCCESS'
        return response(env, start_response)
class BaseAuthTokenMiddlewareTest(testtools.TestCase):
    """Base test class for auth_token middleware.

    All the tests allow for running with auth_token
    configured for receiving v2 or v3 tokens, with the
    choice being made by passing configuration data into
    setUp().

    The base class will, by default, run all the tests
    expecting v2 token formats.  Child classes can override
    this to specify, for instance, v3 format.
    """

    def setUp(self, expected_env=None, auth_version=None,
              fake_app=None, fake_http=None, token_dict=None):
        """Build the middleware under test.

        :param expected_env: extra environ expectations handed to the fake app
        :param auth_version: value stored under conf['auth_version']
        :param fake_app: fake WSGI app class (defaults to FakeApp)
        :param fake_http: fake HTTP connection class
                          (defaults to FakeHTTPConnection)
        :param token_dict: mapping of symbolic token names to token values;
                           defaults to the v2 example tokens
        """
        testtools.TestCase.setUp(self)
        expected_env = expected_env or {}

        if token_dict:
            self.token_dict = token_dict
        else:
            self.token_dict = {
                'uuid_token_default': UUID_TOKEN_DEFAULT,
                'uuid_token_unscoped': UUID_TOKEN_UNSCOPED,
                'signed_token_scoped': SIGNED_TOKEN_SCOPED,
                'revoked_token': REVOKED_TOKEN,
                'revoked_token_hash': REVOKED_TOKEN_HASH
            }

        self.conf = {
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_admin_prefix': '/testadmin',
            'signing_dir': CERTDIR,
            'auth_version': auth_version
        }

        # Base assumes v2 for fake app and http, can be overridden for
        # child classes by called set_middleware() directly
        self.fake_app = fake_app or FakeApp
        self.fake_http = fake_http or FakeHTTPConnection
        self.set_middleware(self.fake_app, self.fake_http,
                            expected_env, self.conf)

        self.response_status = None
        self.response_headers = None

        # Point the module-level SIGNED_REVOCATION_LIST back at the valid
        # list for every test; individual tests overwrite it.
        signed_list = 'SIGNED_REVOCATION_LIST'
        valid_signed_list = 'VALID_SIGNED_REVOCATION_LIST'
        globals()[signed_list] = globals()[valid_signed_list]

    @staticmethod
    def _remove_quietly(path):
        """Delete ``path``, ignoring the error if it cannot be removed."""
        try:
            os.remove(path)
        except OSError:
            pass

    def set_fake_http(self, http_handler):
        """Configure the http handler for the auth_token middleware.

        Allows tests to override the default handler on specific tests,
        e.g. to use v2 for those parts of auth_token that still use v2
        tokens while running the v3 test class, i.e. getting an admin
        token or revocation list.
        """
        self.middleware.http_client_class = http_handler

    def set_middleware(self, fake_app=None, fake_http=None,
                       expected_env=None, conf=None):
        """Configure the class ready to call the auth_token middleware.

        Set up the various fake items needed to run the middleware.
        Individual tests that need to further refine these can call this
        function to override the class defaults.

        Note that ``conf`` is modified in place when it has no
        'http_handler' entry.
        """
        conf = conf or self.conf
        if 'http_handler' not in conf:
            fake_http = fake_http or self.fake_http
            conf['http_handler'] = fake_http
        fake_app = fake_app or self.fake_app
        self.middleware = auth_token.AuthProtocol(fake_app(expected_env), conf)
        self.middleware._iso8601 = iso8601

        # mkstemp() returns an already-open descriptor together with the
        # path; only the path is needed, so close the descriptor instead
        # of leaking it.
        fd, revoked_file_name = tempfile.mkstemp()
        os.close(fd)
        # Every call creates a new file while tearDown() only knows about
        # the latest middleware, so register a clean-up per file.
        self.addCleanup(self._remove_quietly, revoked_file_name)
        self.middleware.revoked_file_name = revoked_file_name

        self.middleware.token_revocation_list = jsonutils.dumps(
            {"revoked": [], "extra": "success"})

    def tearDown(self):
        """Remove the current middleware's revocation file, if still there."""
        testtools.TestCase.tearDown(self)
        try:
            os.remove(self.middleware.revoked_file_name)
        except OSError:
            pass

    def start_fake_response(self, status, headers):
        """WSGI start_response double recording status code and headers."""
        self.response_status = int(status.split(' ', 1)[0])
        self.response_headers = dict(headers)
if sys.version_info[:2] < (2, 7):

    # 2.6 doesn't have the assert dict equals so make sure that it exists
    class AdjustedBaseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
        """Base test class extended with assertion helpers for Python 2.6."""

        def assertIsInstance(self, obj, cls, msg=None):
            """Same as self.assertTrue(isinstance(obj, cls)), with a nicer
            default message."""
            if isinstance(obj, cls):
                return
            standardMsg = '%s is not an instance of %r' % (obj, cls)
            self.fail(self._formatMessage(msg, standardMsg))

        def assertDictEqual(self, d1, d2, msg=None):
            """Fail unless both arguments are dicts that compare equal."""
            # Simple version taken from 2.7
            for candidate, complaint in (
                    (d1, 'First argument is not a dictionary'),
                    (d2, 'Second argument is not a dictionary')):
                self.assertIsInstance(candidate, dict, complaint)

            if d1 != d2:
                self.fail(msg if msg else '%r != %r' % (d1, d2))

    # Subsequent test classes pick up the adjusted base under the old name.
    BaseAuthTokenMiddlewareTest = AdjustedBaseAuthTokenMiddlewareTest
class StackResponseAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """Auth Token middleware test setup that allows the tests to define
    a stack of responses to HTTP requests in the test and get those
    responses back for testing.

    Example::

        resp1 = FakeHTTPResponse(401, jsonutils.dumps(''))
        resp2 = FakeHTTPResponse(200, jsonutils.dumps({
            'access': {
                'token': {'id': 'admin_token2'},
            },
        }))
        FAKE_RESPONSE_STACK.append(resp1)
        FAKE_RESPONSE_STACK.append(resp2)

        ... do your testing code here ...

    """

    def setUp(self):
        super(StackResponseAuthTokenMiddlewareTest, self).setUp()
        # FAKE_RESPONSE_STACK is module-level state: empty it afterwards so
        # that responses queued by one test cannot leak into another.
        self.addCleanup(self._clear_response_stack)

    @staticmethod
    def _clear_response_stack():
        """Remove every queued response from FAKE_RESPONSE_STACK in place."""
        del FAKE_RESPONSE_STACK[:]

    def test_fetch_revocation_list_with_expire(self):
        """Fetching the revocation list copes with an expired admin token.

        NOTE(review): FakeHTTPConnection.getresponse() only consults
        FAKE_RESPONSE_STACK when request() recorded no response, and it
        pops the most recently appended entry first -- confirm that the
        responses queued below are actually consumed, and in this order.
        """
        # get_admin_token() succeeds, the revocation list request is then
        # answered with 401 Unauthorized to pretend the token has expired,
        # a fresh admin token is issued and the list is finally returned.
        resp1 = FakeHTTPResponse(200, jsonutils.dumps({
            'access': {
                'token': {'id': 'admin_token2'},
            },
        }))
        resp2 = FakeHTTPResponse(401, jsonutils.dumps(''))
        resp3 = FakeHTTPResponse(200, jsonutils.dumps({
            'access': {
                'token': {'id': 'admin_token2'},
            },
        }))
        resp4 = FakeHTTPResponse(200, SIGNED_REVOCATION_LIST)

        # first get_admin_token() call
        FAKE_RESPONSE_STACK.append(resp1)
        # request revocation list, get "unauthorized" due to simulated expired
        # token
        FAKE_RESPONSE_STACK.append(resp2)
        # request a new admin_token
        FAKE_RESPONSE_STACK.append(resp3)
        # request revocation list, get the revocation list properly
        FAKE_RESPONSE_STACK.append(resp4)

        fetched_list = jsonutils.loads(self.middleware.fetch_revocation_list())
        self.assertEqual(fetched_list, REVOCATION_LIST)
class DiabloAuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """Auth Token middleware should understand Diablo keystone responses."""

    def setUp(self):
        # pre-diablo only had Tenant ID, which was also the Name
        diablo_env = {
            'HTTP_X_TENANT_ID': 'tenant_id1',
            'HTTP_X_TENANT_NAME': 'tenant_id1',
            # now deprecated (diablo-compat)
            'HTTP_X_TENANT': 'tenant_id1',
        }
        super(DiabloAuthTokenMiddlewareTest, self).setUp(
            expected_env=diablo_env)

    def test_valid_diablo_response(self):
        """A Diablo-format token is accepted and token info is exposed."""
        request = webob.Request.blank('/')
        request.headers['X-Auth-Token'] = VALID_DIABLO_TOKEN
        self.middleware(request.environ, self.start_fake_response)

        self.assertEqual(self.response_status, 200)
        self.assertTrue('keystone.token_info' in request.environ)
class NoMemcacheAuthToken(BaseAuthTokenMiddlewareTest):
    """Middleware behaviour while the 'memcache' module cannot be imported."""

    def setUp(self):
        super(NoMemcacheAuthToken, self).setUp()
        self.useFixture(DisableModuleFixture('memcache'))

    def test_nomemcache(self):
        """Constructing the middleware with memcache_servers must not fail."""
        auth_token.AuthProtocol(FakeApp(), {
            'admin_token': 'admin_token1',
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'memcache_servers': 'localhost:11211',
        })

    def test_not_use_cache_from_env(self):
        """Without a 'cache' option the environ's swift.cache is not used."""
        environ = {'swift.cache': 'CACHE_TEST'}
        self.set_middleware(conf={
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_admin_prefix': '/testadmin',
            'memcache_servers': 'localhost:11211'
        })
        self.middleware._init_cache(environ)
        self.assertNotEqual(self.middleware._cache, 'CACHE_TEST')
class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
def test_init_does_not_call_http(self):
    """Building the middleware must not issue any HTTP request."""
    raising_http = RaisingHTTPConnection
    middleware_conf = {
        'auth_host': 'keystone.example.com',
        'auth_port': 1234,
        'revocation_cache_time': 1
    }
    self.set_fake_http(raising_http)
    self.set_middleware(conf=middleware_conf, fake_http=raising_http)
def assert_valid_last_url(self, token_id):
    """Assert the last faked request was the v2 lookup of ``token_id``."""
    # Default version (v2) has id in the token, override this
    # method for v3 and other versions
    expected_url = "/testadmin/v2.0/tokens/%s" % token_id
    last_url = self.middleware.http_client_class.last_requested_url
    self.assertEqual(expected_url, last_url)
def assert_valid_request_200(self, token, with_catalog=True):
    """Send ``token`` through the middleware and expect a 200 'SUCCESS'.

    When ``with_catalog`` is true the X-Service-Catalog request header
    must also have been populated.
    """
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = token
    app_body = self.middleware(request.environ, self.start_fake_response)

    self.assertEqual(self.response_status, 200)
    if with_catalog:
        self.assertTrue(request.headers.get('X-Service-Catalog'))
    self.assertEqual(app_body, ['SUCCESS'])
    self.assertTrue('keystone.token_info' in request.environ)
def test_valid_uuid_request(self):
    """The default UUID token is accepted and looked up over HTTP."""
    uuid_token = self.token_dict['uuid_token_default']
    self.assert_valid_request_200(uuid_token)
    self.assert_valid_last_url(uuid_token)
def test_valid_signed_request(self):
    """A signed token is accepted without any HTTP traffic."""
    http_class = self.middleware.http_client_class
    http_class.last_requested_url = ''

    self.assert_valid_request_200(self.token_dict['signed_token_scoped'])
    self.assertEqual(self.middleware.conf['auth_admin_prefix'],
                     "/testadmin")

    # ensure that signed requests do not generate HTTP traffic
    self.assertEqual(
        '', self.middleware.http_client_class.last_requested_url)
def test_revoked_token_receives_401(self):
    """A token present in the revocation list is rejected with 401."""
    self.middleware.token_revocation_list = self.get_revocation_list_json()

    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = self.token_dict['revoked_token']
    self.middleware(request.environ, self.start_fake_response)

    self.assertEqual(self.response_status, 401)
def get_revocation_list_json(self, token_ids=None):
    """Return a JSON revocation list for ``token_ids``.

    Defaults to a single entry holding the revoked example token's hash.
    Each entry's 'expires' is the current time from ``timeutils.utcnow()``.
    """
    if token_ids is None:
        token_ids = [self.token_dict['revoked_token_hash']]

    revoked_entries = []
    for token_id in token_ids:
        revoked_entries.append(
            {'id': token_id, 'expires': timeutils.utcnow()})
    return jsonutils.dumps({'revoked': revoked_entries})
def test_is_signed_token_revoked_returns_false(self):
    """With an empty revocation list no token is reported as revoked."""
    # explicitly setting an empty revocation list here to document intent
    empty_list = jsonutils.dumps({"revoked": [], "extra": "success"})
    self.middleware.token_revocation_list = empty_list

    revoked = self.middleware.is_signed_token_revoked(
        self.token_dict['revoked_token'])
    self.assertFalse(revoked)
def test_is_signed_token_revoked_returns_true(self):
    """A token whose hash is in the revocation list is reported revoked."""
    self.middleware.token_revocation_list = self.get_revocation_list_json()

    revoked = self.middleware.is_signed_token_revoked(
        self.token_dict['revoked_token'])
    self.assertTrue(revoked)
def test_verify_signed_token_raises_exception_for_revoked_token(self):
    """Verifying a revoked signed token raises InvalidUserToken."""
    self.middleware.token_revocation_list = self.get_revocation_list_json()
    revoked_token = self.token_dict['revoked_token']
    self.assertRaises(auth_token.InvalidUserToken,
                      self.middleware.verify_signed_token,
                      revoked_token)
def test_verify_signed_token_succeeds_for_unrevoked_token(self):
    """Verifying a signed token that is not revoked does not raise."""
    self.middleware.token_revocation_list = self.get_revocation_list_json()
    unrevoked_token = self.token_dict['signed_token_scoped']
    self.middleware.verify_signed_token(unrevoked_token)
def test_cert_file_missing(self):
    """cert_file_missing() is true only when the message names the file."""
    other_file_message = "openstack: /tmp/haystack: No such file or directory"
    self.assertFalse(
        self.middleware.cert_file_missing(other_file_message, "/tmp/needle"))

    same_file_message = "openstack: /not/exist: No such file or directory"
    self.assertTrue(
        self.middleware.cert_file_missing(same_file_message, "/not/exist"))
def test_get_token_revocation_list_fetched_time_returns_min(self):
    """With no stored time and no file name, datetime.min is reported."""
    middleware = self.middleware
    middleware.token_revocation_list_fetched_time = None
    middleware.revoked_file_name = ''
    self.assertEqual(self.middleware.token_revocation_list_fetched_time,
                     datetime.datetime.min)
def test_get_token_revocation_list_fetched_time_returns_mtime(self):
    """With no stored time, the revocation file's mtime is reported."""
    self.middleware.token_revocation_list_fetched_time = None
    file_mtime = os.path.getmtime(self.middleware.revoked_file_name)
    expected_time = datetime.datetime.fromtimestamp(file_mtime)
    self.assertEqual(self.middleware.token_revocation_list_fetched_time,
                     expected_time)
def test_get_token_revocation_list_fetched_time_returns_value(self):
    """The public attribute reports the privately stored fetched time."""
    stored_time = self.middleware._token_revocation_list_fetched_time
    self.assertEqual(self.middleware.token_revocation_list_fetched_time,
                     stored_time)
def test_get_revocation_list_returns_fetched_list(self):
    """With no fetched time and no file, the list is fetched afresh."""
    # auth_token uses v2 to fetch this, so don't allow the v3
    # tests to override the fake http connection
    self.set_fake_http(FakeHTTPConnection)

    middleware = self.middleware
    middleware.token_revocation_list_fetched_time = None
    os.remove(middleware.revoked_file_name)

    self.assertEqual(self.middleware.token_revocation_list,
                     REVOCATION_LIST)
def test_get_revocation_list_returns_current_list_from_memory(self):
    """The public attribute equals the privately held in-memory list."""
    public_list = self.middleware.token_revocation_list
    self.assertEqual(public_list, self.middleware._token_revocation_list)
def test_get_revocation_list_returns_current_list_from_disk(self):
    """After dropping the in-memory copy the same list is still returned."""
    expected_list = self.middleware.token_revocation_list
    self.middleware._token_revocation_list = None
    self.assertEqual(self.middleware.token_revocation_list, expected_list)
def test_invalid_revocation_list_raises_service_error(self):
    """A revocation list body of "{}" makes the fetch raise ServiceError."""
    # Rebind the module-level list that the fake connection serves.
    global SIGNED_REVOCATION_LIST
    SIGNED_REVOCATION_LIST = "{}"
    self.assertRaises(auth_token.ServiceError,
                      self.middleware.fetch_revocation_list)
def test_fetch_revocation_list(self):
    """The fetched revocation list decodes to REVOCATION_LIST."""
    # auth_token uses v2 to fetch this, so don't allow the v3
    # tests to override the fake http connection
    self.set_fake_http(FakeHTTPConnection)
    raw_list = self.middleware.fetch_revocation_list()
    self.assertEqual(jsonutils.loads(raw_list), REVOCATION_LIST)
def test_request_invalid_uuid_token(self):
    """An unknown UUID token gets a 401 and a WWW-Authenticate header."""
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = 'invalid-token'
    self.middleware(request.environ, self.start_fake_response)

    self.assertEqual(self.response_status, 401)
    challenge = self.response_headers['WWW-Authenticate']
    self.assertEqual(challenge,
                     "Keystone uri='https://keystone.example.com:1234'")
def test_request_invalid_signed_token(self):
    """An invalid signed token gets a 401 and a WWW-Authenticate header."""
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = INVALID_SIGNED_TOKEN
    self.middleware(request.environ, self.start_fake_response)

    self.assertEqual(self.response_status, 401)
    challenge = self.response_headers['WWW-Authenticate']
    self.assertEqual(challenge,
                     "Keystone uri='https://keystone.example.com:1234'")
def test_request_no_token(self):
    """A request without a token gets a 401 and a WWW-Authenticate header."""
    request = webob.Request.blank('/')
    self.middleware(request.environ, self.start_fake_response)

    self.assertEqual(self.response_status, 401)
    challenge = self.response_headers['WWW-Authenticate']
    self.assertEqual(challenge,
                     "Keystone uri='https://keystone.example.com:1234'")
def test_request_no_token_log_message(self):
    """A missing token header is reported via both warn() and debug()."""

    class FakeLog(object):
        """Logger double remembering the last warn and debug messages."""

        def __init__(self):
            self.msg = self.debugmsg = None

        def warn(self, msg=None, *args, **kwargs):
            self.msg = msg

        def debug(self, msg=None, *args, **kwargs):
            self.debugmsg = msg

    self.middleware.LOG = FakeLog()
    self.middleware.delay_auth_decision = False

    empty_environ = {}
    self.assertRaises(auth_token.InvalidUserToken,
                      self.middleware._get_user_token_from_header,
                      empty_environ)
    self.assertIsNotNone(self.middleware.LOG.msg)
    self.assertIsNotNone(self.middleware.LOG.debugmsg)
def test_request_blank_token(self):
    """An empty token gets a 401 and a WWW-Authenticate header."""
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = ''
    self.middleware(request.environ, self.start_fake_response)

    self.assertEqual(self.response_status, 401)
    challenge = self.response_headers['WWW-Authenticate']
    self.assertEqual(challenge,
                     "Keystone uri='https://keystone.example.com:1234'")
def _get_cached_token(self, token):
    """Read ``token``'s entry straight from the middleware's cache.

    The token is hashed, turned into a cache key, fetched from
    ``self.middleware._cache`` and passed through the middleware's
    ``_unprotect_cache_value``.
    """
    # NOTE(vish): example tokens are expired so skip the expiration check.
    cache_key = self.middleware._get_cache_key(cms.cms_hash_token(token))
    protected_value = self.middleware._cache.get(cache_key)
    return self.middleware._unprotect_cache_value(token, protected_value)
def test_memcache(self):
    """A processed signed token leaves an entry in the cache."""
    signed_token = self.token_dict['signed_token_scoped']
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = signed_token
    self.middleware(request.environ, self.start_fake_response)
    self.assertNotEqual(self._get_cached_token(signed_token), None)
def test_memcache_set_invalid(self):
    """A rejected token is cached with the marker value "invalid"."""
    bad_token = 'invalid-token'
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = bad_token
    self.middleware(request.environ, self.start_fake_response)
    self.assertEqual(self._get_cached_token(bad_token), "invalid")
def test_memcache_set_expired(self):
    """A cached token is gone once ``token_cache_time`` seconds passed."""
    token_cache_time = 10
    self.set_middleware(conf={
        'token_cache_time': token_cache_time,
        'signing_dir': CERTDIR,
    })

    signed_token = self.token_dict['signed_token_scoped']
    request = webob.Request.blank('/')
    request.headers['X-Auth-Token'] = signed_token

    try:
        # Freeze the clock, cache the token, then jump to the expiry time.
        now = datetime.datetime.utcnow()
        timeutils.set_time_override(now)
        self.middleware(request.environ, self.start_fake_response)
        self.assertNotEqual(self._get_cached_token(signed_token), None)

        expired = now + datetime.timedelta(seconds=token_cache_time)
        timeutils.set_time_override(expired)
        self.assertEqual(self._get_cached_token(signed_token), None)
    finally:
        timeutils.clear_time_override()
def test_swift_memcache_set_expired(self):
    """Repeat the cache-expiry test after installing a swift-style cache.

    NOTE(review): test_memcache_set_expired() calls set_middleware(),
    which assigns a new self.middleware, so the three attributes set
    below live on the object that gets replaced -- confirm the swift
    cache is really exercised.
    """
    middleware = self.middleware
    middleware._cache = FakeSwiftMemcacheRing()
    middleware._use_keystone_cache = False
    middleware._cache_initialized = True
    self.test_memcache_set_expired()
def test_use_cache_from_env(self):
    """With conf['cache'] set, the cache object comes from the environ."""
    environ = {'swift.cache': 'CACHE_TEST'}
    self.set_middleware(conf={
        'auth_host': 'keystone.example.com',
        'auth_port': 1234,
        'auth_admin_prefix': '/testadmin',
        'cache': 'swift.cache',
        'memcache_servers': 'localhost:11211'
    })
    self.middleware._init_cache(environ)
    self.assertEqual(self.middleware._cache, 'CACHE_TEST')
def test_will_expire_soon(self):
    """10 seconds ahead counts as expiring soon; 40 seconds does not."""
    in_ten_seconds = (datetime.datetime.utcnow() +
                      datetime.timedelta(seconds=10))
    self.assertTrue(auth_token.will_expire_soon(in_ten_seconds))

    in_forty_seconds = (datetime.datetime.utcnow() +
                        datetime.timedelta(seconds=40))
    self.assertFalse(auth_token.will_expire_soon(in_forty_seconds))
def test_encrypt_cache_data(self):
    """The 'encrypt' strategy round-trips data and rejects bad input."""
    self.set_middleware(conf={
        'auth_host': 'keystone.example.com',
        'auth_port': 1234,
        'auth_admin_prefix': '/testadmin',
        'memcache_servers': 'localhost:11211',
        'memcache_security_strategy': 'encrypt',
        'memcache_secret_key': 'mysecret'
    })

    encrypted_data = self.middleware._protect_cache_value(
        'token', TOKEN_RESPONSES[self.token_dict['uuid_token_default']])
    self.assertEqual('{ENCRYPT:AES256}', encrypted_data[:16])

    decrypted = self.middleware._unprotect_cache_value(
        'token', encrypted_data)
    self.assertEqual(
        TOKEN_RESPONSES[self.token_dict['uuid_token_default']], decrypted)

    # should return None if unable to decrypt
    corrupted = self.middleware._unprotect_cache_value(
        'token', '{ENCRYPT:AES256}corrupted')
    self.assertIsNone(corrupted)

    # Same ciphertext, different token value passed as the first argument.
    wrong_key = self.middleware._unprotect_cache_value(
        'mykey', encrypted_data)
    self.assertIsNone(wrong_key)
def test_sign_cache_data(self):
    """MAC-signed cache values round-trip; corrupted ones give None."""
    conf = {
        'auth_host': 'keystone.example.com',
        'auth_port': 1234,
        'auth_admin_prefix': '/testadmin',
        'memcache_servers': 'localhost:11211',
        'memcache_security_strategy': 'mac',
        'memcache_secret_key': 'mysecret',
    }
    self.set_middleware(conf=conf)

    middleware = self.middleware
    token_response = TOKEN_RESPONSES[self.token_dict['uuid_token_default']]

    signed_data = middleware._protect_cache_value('mykey', token_response)
    # The signed value starts with a 10-character strategy marker.
    marker = '{MAC:SHA1}'
    self.assertEqual(signed_data[:10], marker)

    # Unprotecting with the same key name restores the original data.
    restored = middleware._unprotect_cache_value('mykey', signed_data)
    self.assertEqual(token_response, restored)

    # Corrupted signed data must produce None.
    corrupted = '{MAC:SHA1}corrupted'
    self.assertIsNone(
        middleware._unprotect_cache_value('mykey', corrupted))
def test_no_memcache_protection(self):
    """Without a security strategy, cache values pass through unchanged."""
    conf = {
        'auth_host': 'keystone.example.com',
        'auth_port': 1234,
        'auth_admin_prefix': '/testadmin',
        'memcache_servers': 'localhost:11211',
        # A secret key alone, with no strategy, enables no protection.
        'memcache_secret_key': 'mysecret',
    }
    self.set_middleware(conf=conf)

    middleware = self.middleware
    data = middleware._protect_cache_value('mykey', 'This is a test!')
    self.assertEqual(data, 'This is a test!')

    unprotected = middleware._unprotect_cache_value('mykey', data)
    self.assertEqual('This is a test!', unprotected)
def test_get_cache_key(self):
    """Cache keys are plain without a strategy and hashed with one."""

    def build_conf(**memcache_options):
        # Common connection settings plus the per-case memcache options.
        conf = {
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_admin_prefix': '/testadmin',
            'memcache_servers': 'localhost:11211',
        }
        conf.update(memcache_options)
        return conf

    # No security strategy: the token id is used verbatim.
    self.set_middleware(conf=build_conf(memcache_secret_key='mysecret'))
    self.assertEqual(
        'tokens/mytoken',
        self.middleware._get_cache_key('mytoken'))

    # 'mac' strategy: the key is a hash of token id + secret key.
    self.set_middleware(conf=build_conf(
        memcache_security_strategy='mac',
        memcache_secret_key='mysecret'))
    hashed_key = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'mysecret')
    self.assertEqual(self.middleware._get_cache_key('mytoken'), hashed_key)

    # 'Encrypt' strategy (mixed case) hashes the same way.
    self.set_middleware(conf=build_conf(
        memcache_security_strategy='Encrypt',
        memcache_secret_key='abc!'))
    hashed_key = 'tokens/' + memcache_crypt.hash_data('mytoken' + 'abc!')
    self.assertEqual(self.middleware._get_cache_key('mytoken'), hashed_key)
def test_assert_valid_memcache_protection_config(self):
    """Invalid memcache protection settings must make set_middleware raise.

    Each entry in ``invalid_options`` is merged into the common connection
    settings and handed to ``set_middleware``, which is expected to raise.
    The cases are run in the order listed.
    """
    common_conf = {
        'auth_host': 'keystone.example.com',
        'auth_port': 1234,
        'auth_admin_prefix': '/testadmin',
        'memcache_servers': 'localhost:11211',
    }
    invalid_options = (
        # strategy given but memcache_secret_key missing
        {'memcache_security_strategy': 'Encrypt'},
        # invalid memcache_security_strategy
        {'memcache_security_strategy': 'whatever'},
        # strategy given but memcache_secret_key missing
        {'memcache_security_strategy': 'mac'},
        # strategy given but memcache_secret_key empty
        {'memcache_security_strategy': 'Encrypt',
         'memcache_secret_key': ''},
        {'memcache_security_strategy': 'mAc',
         'memcache_secret_key': ''},
    )

    for options in invalid_options:
        conf = dict(common_conf, **options)
        # Pass conf by keyword, matching the other set_middleware call
        # sites in this file. Passed positionally it could bind to a
        # different parameter and raise for an unrelated reason, letting
        # this test pass without exercising the configuration check.
        self.assertRaises(Exception, self.set_middleware, conf=conf)
def test_config_revocation_cache_timeout(self):
    """'revocation_cache_time' is exposed as a timedelta in seconds."""
    conf = {
        'auth_host': 'keystone.example.com',
        'auth_port': 1234,
        'auth_admin_prefix': '/testadmin',
        'revocation_cache_time': 24,
    }
    middleware = auth_token.AuthProtocol(self.fake_app, conf)
    # assertEqual replaces the deprecated assertEquals alias and matches
    # the rest of this file.
    self.assertEqual(middleware.token_revocation_list_cache_timeout,
                     datetime.timedelta(seconds=24))
class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
    """v2 token specific tests.

    Beyond the token formats themselves, the auth-token middleware treats
    v2 and v3 tokens differently in a couple of ways:

    - A v3 keystone server will auto scope a token to a user's default
      project if no scope is specified. A v2 server assumes that the
      auth-token middleware will do that.
    - A v2 keystone server may issue a token without a catalog, even with
      a tenant.

    The tests below were originally part of the generic
    AuthTokenMiddlewareTest class but, since they really are v2 specific,
    they now live here.

    """

    def assert_unscoped_default_tenant_auto_scopes(self, token):
        """Unscoped v2 requests with a default tenant should "auto-scope."

        The implied scope is the user's tenant ID.

        """
        request = webob.Request.blank('/')
        request.headers['X-Auth-Token'] = token
        response_body = self.middleware(request.environ,
                                        self.start_fake_response)

        self.assertEqual(self.response_status, 200)
        self.assertEqual(response_body, ['SUCCESS'])
        self.assertTrue('keystone.token_info' in request.environ)

    def test_default_tenant_uuid_token(self):
        """A UUID token with a default tenant is accepted."""
        self.assert_unscoped_default_tenant_auto_scopes(UUID_TOKEN_DEFAULT)

    def test_default_tenant_signed_token(self):
        """A signed token with a default tenant is accepted."""
        self.assert_unscoped_default_tenant_auto_scopes(SIGNED_TOKEN_SCOPED)

    def assert_unscoped_token_receives_401(self, token):
        """Unscoped requests with no default tenant ID should be rejected."""
        request = webob.Request.blank('/')
        request.headers['X-Auth-Token'] = token
        self.middleware(request.environ, self.start_fake_response)

        self.assertEqual(self.response_status, 401)
        challenge = self.response_headers['WWW-Authenticate']
        self.assertEqual(challenge,
                         "Keystone uri='https://keystone.example.com:1234'")

    def test_unscoped_uuid_token_receives_401(self):
        """An unscoped UUID token is answered with a 401."""
        self.assert_unscoped_token_receives_401(UUID_TOKEN_UNSCOPED)

    def test_unscoped_pki_token_receives_401(self):
        """An unscoped signed (PKI) token is answered with a 401."""
        self.assert_unscoped_token_receives_401(SIGNED_TOKEN_UNSCOPED)

    def test_request_prevent_service_catalog_injection(self):
        """A client-supplied X-Service-Catalog header must not survive."""
        request = webob.Request.blank('/')
        request.headers['X-Service-Catalog'] = '[]'
        request.headers['X-Auth-Token'] = UUID_TOKEN_NO_SERVICE_CATALOG
        response_body = self.middleware(request.environ,
                                        self.start_fake_response)

        self.assertEqual(self.response_status, 200)
        self.assertFalse(request.headers.get('X-Service-Catalog'))
        self.assertEqual(response_body, ['SUCCESS'])

    def test_valid_uuid_request_forced_to_2_0(self):
        """Test forcing auth_token to use a lower api version.

        By installing the v3 http handler, auth_token will get a version
        list that looks like a v3 server - from which it would normally
        choose v3.0 as the auth version. However, here we specify v2.0 in
        the configuration - which should force auth_token to use that
        version instead.

        """
        conf = {
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_admin_prefix': '/testadmin',
            'signing_dir': CERTDIR,
            'auth_version': 'v2.0',
        }
        self.set_middleware(fake_http=v3FakeHTTPConnection, conf=conf)

        # This test will only work if auth_token has chosen to use the
        # lower, v2, api version.
        request = webob.Request.blank('/')
        request.headers['X-Auth-Token'] = UUID_TOKEN_DEFAULT
        self.middleware(request.environ, self.start_fake_response)

        self.assertEqual(self.response_status, 200)
        expected_url = "/testadmin/v2.0/tokens/%s" % UUID_TOKEN_DEFAULT
        self.assertEqual(expected_url,
                         v3FakeHTTPConnection.last_requested_url)

    def test_invalid_auth_version_request(self):
        """Configuring an unsupported auth_version is expected to raise."""
        conf = {
            'auth_host': 'keystone.example.com',
            'auth_port': 1234,
            'auth_admin_prefix': '/testadmin',
            'signing_dir': CERTDIR,
            'auth_version': 'v1.0',  # v1.0 is no longer supported
        }
        # NOTE(review): conf is passed positionally here, unlike the
        # conf=... keyword used by the other set_middleware calls in this
        # class. Confirm it binds to the intended parameter, so that the
        # exception really comes from the unsupported auth_version.
        self.assertRaises(Exception, self.set_middleware, conf)
class v3AuthTokenMiddlewareTest(AuthTokenMiddlewareTest):
    """Test auth_token middleware with v3 tokens.

    Re-execute the AuthTokenMiddlewareTest class tests, but with the
    auth_token middleware configured to expect v3 tokens back from a
    keystone server.

    This is done by configuring the AuthTokenMiddlewareTest class via its
    setUp(), passing in v3 style data that will then be used by the tests
    themselves. This approach has been used to ensure we really are
    running the same tests for both v2 and v3 tokens.

    There are a few additional tests specific to v3:

    - We allow an unscoped token to be validated (as unscoped), whereas
      for v2 tokens the auth_token middleware is expected to try and
      auto-scope it (and fail if there is no default tenant)
    - Domain scoped tokens

    Since we don't specify an auth version for auth_token to use, by
    definition we are therefore implicitly testing that it will use the
    highest available auth version, i.e. v3.0

    """

    def setUp(self):
        """Set up the shared tests with v3 tokens, fake app and fake http."""
        v3_tokens = {
            'uuid_token_default': v3_UUID_TOKEN_DEFAULT,
            'uuid_token_unscoped': v3_UUID_TOKEN_UNSCOPED,
            'signed_token_scoped': SIGNED_v3_TOKEN_SCOPED,
            'revoked_token': REVOKED_v3_TOKEN,
            'revoked_token_hash': REVOKED_v3_TOKEN_HASH,
        }
        super(v3AuthTokenMiddlewareTest, self).setUp(
            auth_version='v3.0',
            fake_app=v3FakeApp,
            fake_http=v3FakeHTTPConnection,
            token_dict=v3_tokens)

    def assert_valid_last_url(self, token_id):
        """Check that the last request hit the v3 token endpoint."""
        # Token ID is not part of the url in v3, so override
        # this assert test in the base class
        last_url = v3FakeHTTPConnection.last_requested_url
        self.assertEqual('/testadmin/v3/auth/tokens', last_url)

    def test_valid_unscoped_uuid_request(self):
        """An unscoped v3 UUID token validates without project data."""
        # Remove items that won't be in an unscoped token
        delta_expected_env = dict.fromkeys((
            'HTTP_X_PROJECT_ID',
            'HTTP_X_PROJECT_NAME',
            'HTTP_X_PROJECT_DOMAIN_ID',
            'HTTP_X_PROJECT_DOMAIN_NAME',
            'HTTP_X_TENANT_ID',
            'HTTP_X_TENANT_NAME',
            'HTTP_X_TENANT',
        ))
        # Role headers are still expected, but empty.
        delta_expected_env['HTTP_X_ROLES'] = ''
        delta_expected_env['HTTP_X_ROLE'] = ''

        self.set_middleware(expected_env=delta_expected_env)
        self.assert_valid_request_200(v3_UUID_TOKEN_UNSCOPED,
                                      with_catalog=False)
        self.assert_valid_last_url(v3_UUID_TOKEN_UNSCOPED)

    def test_domain_scoped_uuid_request(self):
        """A domain scoped v3 UUID token validates with domain data."""
        # Modify items compared to default token for a domain scope
        delta_expected_env = dict.fromkeys((
            'HTTP_X_PROJECT_ID',
            'HTTP_X_PROJECT_NAME',
            'HTTP_X_PROJECT_DOMAIN_ID',
            'HTTP_X_PROJECT_DOMAIN_NAME',
            'HTTP_X_TENANT_ID',
            'HTTP_X_TENANT_NAME',
            'HTTP_X_TENANT',
        ))
        delta_expected_env['HTTP_X_DOMAIN_ID'] = 'domain_id1'
        delta_expected_env['HTTP_X_DOMAIN_NAME'] = 'domain_name1'

        self.set_middleware(expected_env=delta_expected_env)
        self.assert_valid_request_200(v3_UUID_TOKEN_DOMAIN_SCOPED)
        self.assert_valid_last_url(v3_UUID_TOKEN_DOMAIN_SCOPED)
class TokenEncodingTest(testtools.TestCase):
    """Checks for auth_token.safe_quote."""

    def test_unquoted_token(self):
        """A token containing a raw space comes back percent-encoded."""
        quoted = auth_token.safe_quote('foo bar')
        self.assertEqual('foo%20bar', quoted)

    def test_quoted_token(self):
        """An already percent-encoded token comes back unchanged."""
        quoted = auth_token.safe_quote('foo%20bar')
        self.assertEqual('foo%20bar', quoted)