Rework S3Token middleware tests.
- Split between good and bad tests. - Add more tests to get to 100% coverage. Change-Id: Iffd00c2b557e54b122f29f8b0ec7f7ab7a92d16e
This commit is contained in:
parent
557cb9411a
commit
ad7e1c2641
@ -85,15 +85,14 @@ class S3Token(object):
|
||||
|
||||
def _json_request(self, creds_json):
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
|
||||
if self.auth_protocol == 'http':
|
||||
conn = self.http_client_class(self.auth_host, self.auth_port)
|
||||
else:
|
||||
conn = self.http_client_class(self.auth_host,
|
||||
self.auth_port,
|
||||
self.key_file,
|
||||
self.cert_file)
|
||||
try:
|
||||
if self.auth_protocol == 'http':
|
||||
conn = self.http_client_class(self.auth_host, self.auth_port)
|
||||
else:
|
||||
conn = self.http_client_class(self.auth_host,
|
||||
self.auth_port,
|
||||
self.key_file,
|
||||
self.cert_file)
|
||||
conn.request('POST', '/v2.0/s3tokens',
|
||||
body=creds_json,
|
||||
headers=headers)
|
||||
|
@ -26,17 +26,6 @@ from keystone.middleware import s3_token
|
||||
from keystone.openstack.common import jsonutils
|
||||
|
||||
|
||||
def denied_request(code):
|
||||
error_table = {
|
||||
'AccessDenied': (401, 'Access denied'),
|
||||
'InvalidURI': (400, 'Could not parse the specified URI'),
|
||||
}
|
||||
xml = ('<?xml version="1.0" encoding="UTF-8"?>\r\n<Error>\r\n '
|
||||
'<Code>%s</Code>\r\n <Message>%s</Message>\r\n</Error>\r\n' %
|
||||
(code, error_table[code][1]))
|
||||
return xml
|
||||
|
||||
|
||||
def setUpModule(self):
|
||||
self.stubs = stubout.StubOutForTesting()
|
||||
# Stub out swift_utils.get_logger. get_logger tries to configure
|
||||
@ -61,28 +50,6 @@ class FakeHTTPResponse(object):
|
||||
return self.body
|
||||
|
||||
|
||||
class FakeHTTPConnection(object):
|
||||
status = 201
|
||||
|
||||
def __init__(self, *args):
|
||||
pass
|
||||
|
||||
def request(self, method, path, **kwargs):
|
||||
if self.status == 503:
|
||||
raise Exception
|
||||
ret = {'access': {'token': {'id': 'TOKEN_ID',
|
||||
'tenant': {'id': 'TENANT_ID'}}}}
|
||||
body = jsonutils.dumps(ret)
|
||||
status = self.status
|
||||
self.resp = FakeHTTPResponse(status, body)
|
||||
|
||||
def getresponse(self):
|
||||
return self.resp
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
class FakeApp(object):
|
||||
"""This represents a WSGI app protected by the auth_token middleware."""
|
||||
def __call__(self, env, start_response):
|
||||
@ -91,65 +58,68 @@ class FakeApp(object):
|
||||
return resp(env, start_response)
|
||||
|
||||
|
||||
class S3TokenMiddlewareTest(unittest.TestCase):
|
||||
def setUp(self, expected_env=None):
|
||||
self.middleware = s3_token.S3Token(FakeApp(), {})
|
||||
self.middleware.http_client_class = FakeHTTPConnection
|
||||
class FakeHTTPConnection(object):
|
||||
def __init__(self, *args):
|
||||
return
|
||||
|
||||
self.response_status = None
|
||||
self.response_headers = None
|
||||
super(S3TokenMiddlewareTest, self).setUp()
|
||||
def getresponse(self):
|
||||
return self.resp
|
||||
|
||||
def _start_fake_response(self, status, headers):
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
def request(self, method, path, **kwargs):
|
||||
pass
|
||||
|
||||
|
||||
class S3TokenMiddlewareTestBase(unittest.TestCase):
|
||||
def setUp(self):
|
||||
super(S3TokenMiddlewareTestBase, self).setUp()
|
||||
|
||||
def start_fake_response(self, status, headers):
|
||||
self.response_status = int(status.split(' ', 1)[0])
|
||||
self.response_headers = dict(headers)
|
||||
|
||||
|
||||
def good_request(cls, method, path, **kwargs):
|
||||
cls.status = 201
|
||||
ret = {'access': {'token':
|
||||
{'id': 'TOKEN_ID',
|
||||
'tenant': {'id': 'TENANT_ID'}}}}
|
||||
body = jsonutils.dumps(ret)
|
||||
cls.resp = FakeHTTPResponse(cls.status, body)
|
||||
|
||||
|
||||
class S3TokenMiddlewareTestGood(S3TokenMiddlewareTestBase):
|
||||
def setup_middleware_fake(self):
|
||||
self.middleware.http_client_class = FakeHTTPConnection
|
||||
self.middleware.http_client_class.request = good_request
|
||||
|
||||
def setUp(self):
|
||||
self.middleware = s3_token.S3Token(FakeApp(), {})
|
||||
self.setup_middleware_fake()
|
||||
super(S3TokenMiddlewareTestGood, self).setUp()
|
||||
|
||||
# Ignore the request and pass to the next middleware in the
|
||||
# pipeline if no path has been specified.
|
||||
def test_no_path_request(self):
|
||||
req = webob.Request.blank('/')
|
||||
self.middleware(req.environ, self._start_fake_response)
|
||||
self.middleware(req.environ, self.start_fake_response)
|
||||
self.assertEqual(self.response_status, 200)
|
||||
|
||||
# Ignore the request and pass to the next middleware in the
|
||||
# pipeline if no Authorization header has been specified
|
||||
def test_without_authorization(self):
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
self.middleware(req.environ, self._start_fake_response)
|
||||
self.middleware(req.environ, self.start_fake_response)
|
||||
self.assertEqual(self.response_status, 200)
|
||||
|
||||
def test_without_auth_storage_token(self):
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'badboy'
|
||||
self.middleware(req.environ, self._start_fake_response)
|
||||
self.middleware(req.environ, self.start_fake_response)
|
||||
self.assertEqual(self.response_status, 200)
|
||||
|
||||
def test_with_bogus_authorization(self):
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'badboy'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
resp = req.get_response(self.middleware)
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertEqual(resp.body, denied_request('InvalidURI'))
|
||||
|
||||
def test_bad_token(self):
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
self.middleware.http_client_class.status = 403
|
||||
resp = req.get_response(self.middleware)
|
||||
self.assertEqual(resp.status_int, 401)
|
||||
self.assertEqual(resp.body, denied_request('AccessDenied'))
|
||||
|
||||
def test_fail_to_connect_to_keystone(self):
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
self.middleware.http_client_class.status = 503
|
||||
resp = req.get_response(self.middleware)
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertEqual(resp.body, denied_request('InvalidURI'))
|
||||
|
||||
def test_authorized(self):
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'access:signature'
|
||||
@ -158,6 +128,17 @@ class S3TokenMiddlewareTest(unittest.TestCase):
|
||||
self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
|
||||
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
|
||||
|
||||
def test_authorized_http(self):
|
||||
self.middleware = (
|
||||
s3_token.filter_factory({'auth_protocol': 'http'})(FakeApp()))
|
||||
self.setup_middleware_fake()
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
req.get_response(self.middleware)
|
||||
self.assertTrue(req.path.startswith('/v1/AUTH_TENANT_ID'))
|
||||
self.assertEqual(req.headers['X-Auth-Token'], 'TOKEN_ID')
|
||||
|
||||
def test_authorization_nova_toconnect(self):
|
||||
req = webob.Request.blank('/v1/AUTH_swiftint/c/o')
|
||||
req.headers['Authorization'] = 'access:FORCED_TENANT_ID:signature'
|
||||
@ -165,3 +146,68 @@ class S3TokenMiddlewareTest(unittest.TestCase):
|
||||
req.get_response(self.middleware)
|
||||
path = req.environ['PATH_INFO']
|
||||
self.assertTrue(path.startswith('/v1/AUTH_FORCED_TENANT_ID'))
|
||||
|
||||
|
||||
class S3TokenMiddlewareTestBad(S3TokenMiddlewareTestBase):
|
||||
def setUp(self):
|
||||
self.middleware = s3_token.S3Token(FakeApp(), {})
|
||||
self.middleware.http_client_class = FakeHTTPConnection
|
||||
super(S3TokenMiddlewareTestBad, self).setUp()
|
||||
|
||||
def test_unauthorized_token(self):
|
||||
def request(self, method, path, **kwargs):
|
||||
ret = {"error":
|
||||
{"message": "EC2 access key not found.",
|
||||
"code": 401,
|
||||
"title": "Not Authorized"}}
|
||||
body = jsonutils.dumps(ret)
|
||||
self.status = 403
|
||||
self.resp = FakeHTTPResponse(self.status, body)
|
||||
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
self.middleware.http_client_class.request = request
|
||||
resp = req.get_response(self.middleware)
|
||||
s3_denied_req = self.middleware.deny_request('AccessDenied')
|
||||
self.assertEqual(resp.body, s3_denied_req.body)
|
||||
self.assertEqual(resp.status_int, s3_denied_req.status_int)
|
||||
|
||||
def test_bogus_authorization(self):
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'badboy'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
resp = req.get_response(self.middleware)
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
s3_invalid_req = self.middleware.deny_request('InvalidURI')
|
||||
self.assertEqual(resp.body, s3_invalid_req.body)
|
||||
self.assertEqual(resp.status_int, s3_invalid_req.status_int)
|
||||
|
||||
def test_fail_to_connect_to_keystone(self):
|
||||
def request(self, method, path, **kwargs):
|
||||
raise s3_token.ServiceError
|
||||
self.middleware.http_client_class.request = request
|
||||
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
self.middleware.http_client_class.status = 503
|
||||
resp = req.get_response(self.middleware)
|
||||
s3_invalid_req = self.middleware.deny_request('InvalidURI')
|
||||
self.assertEqual(resp.body, s3_invalid_req.body)
|
||||
self.assertEqual(resp.status_int, s3_invalid_req.status_int)
|
||||
|
||||
def test_bad_reply(self):
|
||||
def request(self, method, path, **kwargs):
|
||||
body = "<badreply>"
|
||||
self.status = 201
|
||||
self.resp = FakeHTTPResponse(self.status, body)
|
||||
|
||||
req = webob.Request.blank('/v1/AUTH_cfa/c/o')
|
||||
req.headers['Authorization'] = 'access:signature'
|
||||
req.headers['X-Storage-Token'] = 'token'
|
||||
self.middleware.http_client_class.request = request
|
||||
resp = req.get_response(self.middleware)
|
||||
s3_invalid_req = self.middleware.deny_request('InvalidURI')
|
||||
self.assertEqual(resp.body, s3_invalid_req.body)
|
||||
self.assertEqual(resp.status_int, s3_invalid_req.status_int)
|
||||
|
Loading…
Reference in New Issue
Block a user