Additional middleware test coverage

Addresses bug 888142
Adds testing for Glance and Quantum middleware
Abstracts some middleware tests to reuse MiddlewareTestCase class
Minor fixes to quantum_auth_token.py based on issues raised by test

ZNS: rebased, pep8ed and had to update to support SSL communication between
middleware and Keystone which has since been merged.

Change-Id: I5b52b646487aa0fb0aeeff34ec5ee0c0d76099a9
This commit is contained in:
Ziad Sawalha 2011-11-22 21:08:25 -06:00
parent 4b9c2d9e2e
commit f1568369f5
4 changed files with 273 additions and 128 deletions

View File

@ -378,7 +378,7 @@ class AuthProtocol(object):
#TODO(ziad): use a more sophisticated proxy
# we are rewriting the headers now
if resp.status == 401 or resp.status == 305:
if resp.status in (401, 305):
# Add our own headers to the list
headers = [("WWW_AUTHENTICATE",
"Keystone uri='%s'" % self.auth_location)]

View File

@ -97,11 +97,15 @@ class AuthProtocol(object):
# where to find the auth service (we use this to validate tokens)
self.auth_host = conf.get('auth_host')
self.auth_port = int(conf.get('auth_port'))
self.auth_protocol = conf.get('auth_protocol', 'https')
self.auth_protocol = conf.get('auth_protocol', 'http')
self.cert_file = conf.get('certfile', None)
self.key_file = conf.get('keyfile', None)
self.auth_timeout = conf.get('auth_timeout', 30)
self.auth_api_version = conf.get('auth_version', '2.0')
self.auth_location = "%s://%s:%s" % (self.auth_protocol,
self.auth_host,
self.auth_port)
self.auth_uri = conf.get('auth_uri', self.auth_location)
LOG.debug("Authentication Service:%s", self.auth_location)
# Credentials used to verify this component with the Auth service
# since validating tokens is a privileged call
@ -126,8 +130,11 @@ class AuthProtocol(object):
self.auth_api_version = None
self.auth_host = None
self.auth_location = None
self.auth_uri = None
self.auth_port = None
self.auth_protocol = None
self.cert_file = None
self.key_file = None
self.service_host = None
self.service_port = None
self.service_protocol = None
@ -188,8 +195,20 @@ class AuthProtocol(object):
# like tenant and group info
self._decorate_request('X_AUTHORIZATION', "Proxy %s" %
claims['user'])
self._decorate_request('X_TENANT', claims['tenant'])
self._decorate_request('X_USER', claims['user'])
self._decorate_request('X_TENANT_ID',
claims['tenant']['id'],)
self._decorate_request('X_TENANT_NAME',
claims['tenant']['name'])
self._decorate_request('X_USER_ID',
claims['user']['id'])
self._decorate_request('X_USER_NAME',
claims['user']['name'])
self._decorate_request('X_TENANT', claims['tenant']['id'])
self._decorate_request('X_USER', claims['user']['id'])
if 'group' in claims:
self._decorate_request('X_GROUP', claims['group'])
if 'roles' in claims and len(claims['roles']) > 0:
@ -227,8 +246,11 @@ class AuthProtocol(object):
}
}
}
conn = httplib.HTTPConnection("%s:%s" \
% (self.auth_host, self.auth_port))
if self.auth_protocol == "http":
conn = httplib.HTTPConnection(self.auth_host, self.auth_port)
else:
conn = httplib.HTTPSConnection(self.auth_host, self.auth_port,
cert_file=self.cert_file)
conn.request("POST", self._build_token_uri(), json.dumps(params), \
headers=headers)
response = conn.getresponse()
@ -242,7 +264,10 @@ class AuthProtocol(object):
def _reject_request(self):
"""Redirect client to auth server"""
return HTTPUnauthorized()(self.env, self.start_response)
return HTTPUnauthorized("Authentication required",
[("WWW-Authenticate",
"Keystone uri='%s'" % self.auth_uri)])(self.env,
self.start_response)
def _reject_claims(self):
"""Client sent bad claims"""
@ -269,7 +294,10 @@ class AuthProtocol(object):
"Accept": "application/json",
"X-Auth-Token": self.admin_token}
conn = http_connect(self.auth_host, self.auth_port, 'GET',
self._build_token_uri(claims), headers=headers)
self._build_token_uri(claims), headers=headers,
ssl=(self.auth_protocol == 'https'),
key_file=self.key_file, cert_file=self.cert_file,
timeout=self.auth_timeout)
resp = conn.getresponse()
conn.close()
@ -302,7 +330,10 @@ class AuthProtocol(object):
"X-Auth-Token": self.admin_token}
conn = http_connect(self.auth_host, self.auth_port, 'GET',
self._build_token_uri(self.claims),
headers=headers)
headers=headers,
ssl=(self.auth_protocol == 'https'),
key_file=self.key_file, cert_file=self.cert_file,
timeout=self.auth_timeout)
resp = conn.getresponse()
data = resp.read()
conn.close()
@ -317,15 +348,35 @@ class AuthProtocol(object):
role_refs = token_info["access"]["user"]["roles"]
if role_refs != None:
for role_ref in role_refs:
roles.append(role_ref["roleId"])
roles.append(role_ref["Id"])
verified_claims = {'user': token_info['access']['user']['name'],
'tenant': token_info['access']['user']['tenantId'],
'roles': roles}
token_info = json.loads(data)
roles = [role['name'] for role in token_info[
"access"]["user"]["roles"]]
# in diablo, there were two ways to get tenant data
tenant = token_info['access']['token'].get('tenant')
if tenant:
# post diablo
tenant_id = tenant['id']
tenant_name = tenant['name']
else:
# diablo only
tenant_id = token_info['access']['user'].get('tenantId')
tenant_name = token_info['access']['user'].get('tenantName')
verified_claims = {
'user': {
'id': token_info['access']['user']['id'],
'name': token_info['access']['user']['name'],
},
'tenant': {
'id': tenant_id,
'name': tenant_name
},
'roles': roles}
# TODO(Ziad): removed groups for now
# ,'group': '%s/%s' % (first_group['id'],
# first_group['tenantId'])}
return verified_claims
def _decorate_request(self, index, value):

View File

@ -3,6 +3,7 @@ import httplib
import uuid
import json
import os
from webob import Request, Response
from xml.etree import ElementTree
@ -1008,3 +1009,137 @@ class FunctionalTestCase(ApiTestCase):
user_id = optional_str(user_id)
return self.delete_user_credentials_by_type(
user_id, 'passwordCredentials', **kwargs)
class HeaderApp(object):
"""
Dummy WSGI app the returns HTTP headers in the body
This is useful for making sure the headers we want
aer being passwed down to the downstream WSGI app.
"""
def __init__(self):
pass
def __call__(self, env, start_response):
self.request = Request.blank('', environ=env)
body = ''
for key in env:
if key.startswith('HTTP_'):
body += '%s: %s\n' % (key, env[key])
return Response(status="200 OK",
body=body)(env, start_response)
class BlankApp(object):
"""
Dummy WSGI app - does not do anything
"""
def __init__(self):
pass
def __call__(self, env, start_response):
self.request = Request.blank('', environ=env)
return Response(status="200 OK",
body={})(env, start_response)
class MiddlewareTestCase(FunctionalTestCase):
"""
Base class to run tests for Keystone WSGI middleware.
"""
def setUp(self, middleware, settings=None):
super(MiddlewareTestCase, self).setUp()
if settings is None:
settings = {'delay_auth_decision': '0',
'auth_host': '127.0.0.1',
'auth_port': '35357',
'auth_protocol': 'http',
'auth_uri': 'http://localhost:35357/',
'admin_token': '999888777666',
'auth_admin_user': self.admin_username,
'auth_admin_password': self.admin_password}
cert_file = isSsl()
if cert_file:
settings['auth_protocol'] = 'https'
settings['certfile'] = cert_file
settings['auth_uri'] = 'https://localhost:35357/'
if isinstance(middleware, tuple):
self.test_middleware = HeaderApp()
for filter in middleware:
self.test_middleware = \
filter.filter_factory(settings)(self.test_middleware)
else:
self.test_middleware = \
middleware.filter_factory(settings)(HeaderApp())
password = unique_str()
self.tenant = self.create_tenant().json['tenant']
self.user = self.create_user(user_password=password,
tenant_id=self.tenant['id']).json['user']
self.user['password'] = password
self.services = {}
self.endpoint_templates = {}
for x in range(0, 5):
self.services[x] = self.create_service().json['OS-KSADM:service']
self.endpoint_templates[x] = self.create_endpoint_template(
name=self.services[x]['name'], \
type=self.services[x]['type']).\
json['OS-KSCATALOG:endpointTemplate']
self.create_endpoint_for_tenant(self.tenant['id'],
self.endpoint_templates[x]['id'])
r = self.authenticate(self.user['name'], self.user['password'],
self.tenant['id'], assert_status=200)
self.user_token = r.json['access']['token']['id']
def test_401_without_token(self):
resp = Request.blank('/').get_response(self.test_middleware)
self.assertEquals(resp.status_int, 401)
headers = resp.headers
self.assertTrue("WWW-Authenticate" in headers)
if isSsl():
self.assertEquals(headers['WWW-Authenticate'],
"Keystone uri='https://localhost:35357/'")
else:
self.assertEquals(headers['WWW-Authenticate'],
"Keystone uri='http://localhost:35357/'")
def test_401_bad_token(self):
resp = Request.blank('/',
headers={'X-Auth-Token': 'MADE_THIS_UP'}) \
.get_response(self.test_middleware)
self.assertEquals(resp.status_int, 401)
def test_200_good_token(self):
resp = Request.blank('/',
headers={'X-Auth-Token': self.user_token}) \
.get_response(self.test_middleware)
self.assertEquals(resp.status_int, 200)
headers = resp.body.split('\n')
header = "HTTP_X_IDENTITY_STATUS: Confirmed"
self.assertTrue(header in headers, "Missing %s" % header)
header = "HTTP_X_USER_ID: %s" % self.user['id']
self.assertTrue(header in headers, "Missing %s" % header)
header = "HTTP_X_USER_NAME: %s" % self.user['name']
self.assertTrue(header in headers, "Missing %s" % header)
header = "HTTP_X_TENANT_ID: %s" % self.tenant['id']
self.assertTrue(header in headers, "Missing %s" % header)
header = "HTTP_X_TENANT_NAME: %s" % self.tenant['name']
self.assertTrue(header in headers, "Missing %s" % header)
# These are here for legacy support and should be removed by F
header = "HTTP_X_TENANT: %s" % self.tenant['id']
self.assertTrue(header in headers, "Missing %s" % header)
header = "HTTP_X_USER: %s" % self.user['id']
self.assertTrue(header in headers, "Missing %s" % header)

View File

@ -1,134 +1,93 @@
import unittest2 as unittest
from webob import Request, Response
import keystone.common.exception
from keystone.test.functional import common
#
# Auth Token
#
from keystone.middleware import auth_token
class HeaderApp(object):
class TestAuthTokenMiddleware(common.MiddlewareTestCase):
"""
Dummy WSGI app the returns HTTP headers in the body
This is useful for making sure the headers we want
aer being passwed down to the downstream WSGI app.
"""
def __init__(self):
pass
def __call__(self, env, start_response):
self.request = Request.blank('', environ=env)
body = ''
for key in env:
if key.startswith('HTTP_'):
body += '%s: %s\n' % (key, env[key])
return Response(status="200 OK",
body=body)(env, start_response)
class BlankApp(object):
"""
Dummy WSGI app - does not do anything
"""
def __init__(self):
pass
def __call__(self, env, start_response):
self.request = Request.blank('', environ=env)
return Response(status="200 OK",
body={})(env, start_response)
class TestMiddleware(common.FunctionalTestCase):
"""
Tests for Keystone WSGI middleware.
Tests for Keystone WSGI middleware: Auth Token
"""
def setUp(self):
super(TestAuthTokenMiddleware, self).setUp(auth_token)
#
# Glance
#
try:
from keystone.middleware import glance_auth_token
except ImportError as e:
print 'Could not load glance_auth_token: %s' % e
@unittest.skipUnless('glance_auth_token' in vars(),
"Glance Auth Token not imported")
class TestGlanceMiddleware(common.MiddlewareTestCase):
"""
Tests for Keystone WSGI middleware: Glance
"""
def setUp(self):
super(TestGlanceMiddleware, self).setUp(
(auth_token, glance_auth_token))
#
# Quantum
#
from keystone.middleware import quantum_auth_token
class TestQuantumMiddleware(common.MiddlewareTestCase):
"""
Tests for Keystone WSGI middleware: Glance
"""
def setUp(self):
super(TestMiddleware, self).setUp()
settings = {'delay_auth_decision': '0',
'auth_host': '127.0.0.1',
'auth_port': '35357',
'auth_protocol': 'http',
'auth_uri': 'http://localhost:35357/',
'admin_token': '999888777666'}
cert_file = common.isSsl()
if cert_file:
settings['auth_protocol'] = 'https'
settings['certfile'] = cert_file
settings['auth_uri'] = 'https://localhost:35357/'
self.test_middleware = \
auth_token.filter_factory(settings)(HeaderApp())
'auth_version': '2.0',
'auth_admin_token': self.admin_token,
'auth_admin_user': self.admin_username,
'auth_admin_password': self.admin_password}
super(TestQuantumMiddleware, self).setUp(quantum_auth_token, settings)
password = common.unique_str()
self.tenant = self.create_tenant().json['tenant']
self.user = self.create_user(user_password=password,
tenant_id=self.tenant['id']).json['user']
self.user['password'] = password
self.services = {}
self.endpoint_templates = {}
for x in range(0, 5):
self.services[x] = self.create_service().json['OS-KSADM:service']
self.endpoint_templates[x] = self.create_endpoint_template(
name=self.services[x]['name'], \
type=self.services[x]['type']).\
json['OS-KSCATALOG:endpointTemplate']
self.create_endpoint_for_tenant(self.tenant['id'],
self.endpoint_templates[x]['id'])
#
# Swift
#
try:
from keystone.middleware import swift_auth
except ImportError as e:
print 'Could not load swift_auth: %s' % e
r = self.authenticate(self.user['name'], self.user['password'],
self.tenant['id'], assert_status=200)
self.user_token = r.json['access']['token']['id']
def test_401_without_token(self):
resp = Request.blank('/').get_response(self.test_middleware)
self.assertEquals(resp.status_int, 401)
headers = resp.headers
self.assertTrue("WWW-Authenticate" in headers)
if common.isSsl():
self.assertEquals(headers['WWW-Authenticate'],
"Keystone uri='https://localhost:35357/'")
else:
self.assertEquals(headers['WWW-Authenticate'],
"Keystone uri='http://localhost:35357/'")
def test_401_bad_token(self):
resp = Request.blank('/',
headers={'X-Auth-Token': 'MADE_THIS_UP'}) \
.get_response(self.test_middleware)
self.assertEquals(resp.status_int, 401)
def test_200_good_token(self):
resp = Request.blank('/',
headers={'X-Auth-Token': self.user_token}) \
.get_response(self.test_middleware)
self.assertEquals(resp.status_int, 200)
headers = resp.body.split('\n')
header = "HTTP_X_IDENTITY_STATUS: Confirmed"
self.assertTrue(header in headers)
header = "HTTP_X_USER_ID: %s" % self.user['id']
self.assertTrue(header in headers)
header = "HTTP_X_USER_NAME: %s" % self.user['name']
self.assertTrue(header in headers)
header = "HTTP_X_TENANT_ID: %s" % self.tenant['id']
self.assertTrue(header in headers)
header = "HTTP_X_TENANT_NAME: %s" % self.tenant['name']
self.assertTrue(header in headers)
# These are here for legacy support and should be removed by F
header = "HTTP_X_TENANT: %s" % self.tenant['id']
self.assertTrue(header in headers)
header = "HTTP_X_USER: %s" % self.user['id']
self.assertTrue(header in headers)
#TODO(Ziad): find out how to disable swift logging
#@unittest.skipUnless('swift_auth' in vars(),
# "swift_auth not imported")
#class TestSwiftMiddleware(common.MiddlewareTestCase):
# """
# Tests for Keystone WSGI middleware: Glance
# """
#
# def setUp(self):
# settings = {'delay_auth_decision': '0',
# 'auth_host': '127.0.0.1',
# 'auth_port': '35357',
# 'auth_protocol': 'http',
# 'auth_uri': 'http://localhost:35357/',
# 'admin_token': '999888777666',
# 'set log_facility': 'LOG_NULL'}
# super(TestSwiftMiddleware, self).setUp(swift_auth, settings)
if __name__ == '__main__':