Merge "Drop webob from auth_token.py"

This commit is contained in:
Jenkins
2013-08-01 23:37:19 +00:00
committed by Gerrit Code Review
2 changed files with 64 additions and 7 deletions

View File

@@ -153,7 +153,6 @@ import stat
import tempfile import tempfile
import time import time
import urllib import urllib
import webob.exc
from keystoneclient.openstack.common import jsonutils from keystoneclient.openstack.common import jsonutils
from keystoneclient.common import cms from keystoneclient.common import cms
@@ -252,6 +251,19 @@ class ConfigurationError(Exception):
pass pass
class MiniResp(object):
def __init__(self, error_message, env, headers=[]):
# The HEAD method is unique: it must never return a body, even if
# it reports an error (RFC-2616 clause 9.4). We relieve callers
# from varying the error responses depending on the method.
if env['REQUEST_METHOD'] == 'HEAD':
self.body = ['']
else:
self.body = [error_message]
self.headers = list(headers)
self.headers.append(('Content-type', 'text/plain'))
class AuthProtocol(object): class AuthProtocol(object):
"""Auth Middleware that handles authenticating client calls.""" """Auth Middleware that handles authenticating client calls."""
@@ -472,8 +484,9 @@ class AuthProtocol(object):
except ServiceError as e: except ServiceError as e:
self.LOG.critical('Unable to obtain admin token: %s' % e) self.LOG.critical('Unable to obtain admin token: %s' % e)
resp = webob.exc.HTTPServiceUnavailable() resp = MiniResp('Service unavailable', env)
return resp(env, start_response) start_response('503 Service Unavailable', resp.headers)
return resp.body
def _remove_auth_headers(self, env): def _remove_auth_headers(self, env):
"""Remove headers so a user can't fake authentication. """Remove headers so a user can't fake authentication.
@@ -534,8 +547,9 @@ class AuthProtocol(object):
""" """
headers = [('WWW-Authenticate', 'Keystone uri=\'%s\'' % self.auth_uri)] headers = [('WWW-Authenticate', 'Keystone uri=\'%s\'' % self.auth_uri)]
resp = webob.exc.HTTPUnauthorized('Authentication required', headers) resp = MiniResp('Authentication required', env, headers)
return resp(env, start_response) start_response('401 Unauthorized', resp.headers)
return resp.body
def get_admin_token(self): def get_admin_token(self):
"""Return admin token, possibly fetching a new one. """Return admin token, possibly fetching a new one.

View File

@@ -17,6 +17,7 @@
import datetime import datetime
import iso8601 import iso8601
import os import os
import shutil
import string import string
import sys import sys
import tempfile import tempfile
@@ -671,8 +672,7 @@ class BaseAuthTokenMiddlewareTest(testtools.TestCase):
""" """
conf = conf or self.conf conf = conf or self.conf
if 'http_handler' not in conf: if fake_http:
fake_http = fake_http or self.fake_http
conf['http_handler'] = fake_http conf['http_handler'] = fake_http
fake_app = fake_app or self.fake_app fake_app = fake_app or self.fake_app
self.middleware = auth_token.AuthProtocol(fake_app(expected_env), conf) self.middleware = auth_token.AuthProtocol(fake_app(expected_env), conf)
@@ -1002,6 +1002,21 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
self.assertIsNotNone(self.middleware.LOG.msg) self.assertIsNotNone(self.middleware.LOG.msg)
self.assertIsNotNone(self.middleware.LOG.debugmsg) self.assertIsNotNone(self.middleware.LOG.debugmsg)
def test_request_no_token_http(self):
req = webob.Request.blank('/', environ={'REQUEST_METHOD': 'HEAD'})
conf = {
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_protocol': 'http',
'auth_admin_prefix': '/testadmin',
}
self.set_middleware(conf=conf)
body = self.middleware(req.environ, self.start_fake_response)
self.assertEqual(self.response_status, 401)
self.assertEqual(self.response_headers['WWW-Authenticate'],
"Keystone uri='http://keystone.example.com:1234'")
self.assertEqual(body, [''])
def test_request_blank_token(self): def test_request_blank_token(self):
req = webob.Request.blank('/') req = webob.Request.blank('/')
req.headers['X-Auth-Token'] = '' req.headers['X-Auth-Token'] = ''
@@ -1197,6 +1212,34 @@ class AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
datetime.timedelta(seconds=24)) datetime.timedelta(seconds=24))
class CertDownloadMiddlewareTest(BaseAuthTokenMiddlewareTest):
def setUp(self):
super(CertDownloadMiddlewareTest, self).setUp()
self.base_dir = tempfile.mkdtemp()
def tearDown(self):
shutil.rmtree(self.base_dir)
super(CertDownloadMiddlewareTest, self).tearDown()
# Usually we supply a signed_dir with pre-installed certificates,
# so invocation of /usr/bin/openssl succeeds. This time we give it
# an empty directory, so it fails.
def test_request_no_token_dummy(self):
cert_dir = os.path.join(self.base_dir, 'certs')
os.mkdir(cert_dir)
conf = {
'auth_host': 'keystone.example.com',
'auth_port': 1234,
'auth_protocol': 'http',
'auth_admin_prefix': '/testadmin',
'signing_dir': cert_dir,
}
self.set_middleware(fake_http=self.fake_http, conf=conf)
self.assertRaises(cms.subprocess.CalledProcessError,
self.middleware.verify_signed_token,
self.token_dict['signed_token_scoped'])
class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest): class v2AuthTokenMiddlewareTest(BaseAuthTokenMiddlewareTest):
"""v2 token specific tests. """v2 token specific tests.