Adding middleware tests

Middleware tests start a fake echo app and put the auth_token.py
middleware in front of it and then simulate calls. The tests
check the correct response for unauthenticated calls and also
that the right headers are being passed down to the fake app from
the middleware (the fake app echos the headers it receives)

Includes fixes discovered in testing and non-breaking fix
for Bug 878431

Update testing options to support verbosity and selecting
individual tests

Addresses bug 890777

Fixes to middleware tests that were hanging Jenkins
- needed to add support for SSL tests

Change-Id: Iea273196f1782653eccdcf0f2391eacb1434aa8e
This commit is contained in:
Ziad Sawalha 2011-11-11 17:16:41 -06:00
parent 7bb4733450
commit 719d6ed06a
10 changed files with 197 additions and 25 deletions

View File

@ -468,7 +468,9 @@ class ValidateData(object):
user = {
"id": unicode(self.user.id),
"name": unicode(self.user.username)}
"name": unicode(self.user.username),
# TODO(ziad) temporary until we are comfortable clients are updated
"username": unicode(self.user.username)}
if self.user.tenant_id is not None:
user['tenantId'] = unicode(self.user.tenant_id)

View File

@ -129,7 +129,9 @@ class AuthProtocol(object):
# and the OpenSTack service is running remotely
self.service_protocol = conf.get('service_protocol', 'https')
self.service_host = conf.get('service_host')
self.service_port = int(conf.get('service_port'))
service_port = conf.get('service_port')
if service_port:
self.service_port = int(service_port)
self.service_url = '%s://%s:%s' % (self.service_protocol,
self.service_host,
self.service_port)
@ -279,7 +281,7 @@ class AuthProtocol(object):
start_response)
def _validate_claims(self, claims):
"""Validate claims, and provide identity information isf applicable """
"""Validate claims, and provide identity information if applicable """
# Step 1: We need to auth with the keystone service, so get an
# admin token
@ -304,16 +306,15 @@ class AuthProtocol(object):
ssl=(self.auth_protocol == 'https'),
key_file=self.key_file, cert_file=self.cert_file)
resp = conn.getresponse()
# data = resp.read()
conn.close()
if not str(resp.status).startswith('20'):
# Keystone rejected claim
return False
else:
#TODO(Ziad): there is an optimization we can do here. We have just
#received data from Keystone that we can use instead of making
#another call in _expound_claims
#TODO(Ziad): there is an optimization we can do here. We can make
#one call and reuse the data instead of calling again
#in _expound_claims
return True
def _expound_claims(self, claims):

View File

@ -1,9 +1,11 @@
import cgitb
import os
import sys
import subprocess
import tempfile
import time
import unittest2 as unittest
cgitb.enable(format="text")
from functional.common import HttpTestCase
@ -31,20 +33,14 @@ def execute(cmd, raise_error=True):
env['PATH'] = os.path.join(BASE_DIR, 'bin') + ':' + env['PATH']
process = subprocess.Popen(cmd,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env)
result = process.communicate()
(out, err) = result
exitcode = process.returncode
if process.returncode != 0 and raise_error:
msg = "Command %(cmd)s did not succeed. Returned an exit "\
"code of %(exitcode)d."\
"\n\nSTDOUT: %(out)s"\
"\n\nSTDERR: %(err)s" % locals()
"code of %(exitcode)d." % locals()
raise RuntimeError(msg)
return exitcode, out, err
return exitcode, result
class KeystoneTest(object):
@ -90,8 +86,11 @@ class KeystoneTest(object):
# run the keystone server
print "Starting the keystone server..."
self.server = subprocess.Popen(
[os.path.join(BASE_DIR, 'bin/keystone'), '-c', self.conf_fp.name])
params = [os.path.join(BASE_DIR, 'bin/keystone'),
'-c', self.conf_fp.name]
if '--debug' in sys.argv:
params += ['-d']
self.server = subprocess.Popen(params)
# blatant hack.
time.sleep(5)
@ -113,15 +112,32 @@ class KeystoneTest(object):
if '--with-progress' in sys.argv:
loader = unittest.TestLoader()
suite = loader.discover(TEST_DIR, top_level_dir=BASE_DIR)
result = unittest.TextTestRunner(verbosity=1).run(suite)
verbosity = 1
if '--verbose' in sys.argv:
verbosity = 2
result = unittest.TextTestRunner(verbosity=verbosity). \
run(suite)
if not result.wasSuccessful():
raise RuntimeError("%s unresolved issues." %
(len(result.errors) + len(result.failures),))
elif '--with-coverage' in sys.argv:
print "running coverage"
execute('coverage run %s discover -t %s -s %s' %
('/usr/bin/unit2', BASE_DIR, TEST_DIR))
options = ''
if '--verbose' in sys.argv:
options += ' -v'
cmd = 'coverage run %s%s discover -t %s -s %s' % \
('/usr/bin/unit2', options, BASE_DIR, TEST_DIR)
execute(cmd)
else:
execute('unit2 discover -f -t %s -s %s' % (BASE_DIR, TEST_DIR))
options = ''
if '--verbose' in sys.argv:
options += ' -v'
cmd = 'unit2 discover %s -f -t%s -s %s' % \
(options, BASE_DIR, TEST_DIR)
execute(cmd)
finally:
self.tearDown()

View File

@ -8,7 +8,7 @@ extensions= osksadm,oskscatalog
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',
'cdn' : 'X-CDN-Manageent-Url'}
'cdn' : 'X-CDN-Management-Url'}
service_host = 0.0.0.0
service_port = 5000
service_ssl = False

View File

@ -8,7 +8,7 @@ extensions= osksadm,oskscatalog
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',
'cdn' : 'X-CDN-Manageent-Url'}
'cdn' : 'X-CDN-Management-Url'}
service_host = 0.0.0.0
service_port = 5000
service_ssl = False

View File

@ -8,7 +8,7 @@ extensions= osksadm,oskscatalog
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',
'cdn' : 'X-CDN-Manageent-Url'}
'cdn' : 'X-CDN-Management-Url'}
service_host = 0.0.0.0
service_port = 5000
service_ssl = False

View File

@ -7,7 +7,7 @@ backends = keystone.backends.sqlalchemy
service-header-mappings = {
'nova' : 'X-Server-Management-Url',
'swift' : 'X-Storage-Url',
'cdn' : 'X-CDN-Manageent-Url'}
'cdn' : 'X-CDN-Management-Url'}
service_host = 0.0.0.0
service_port = 5000
service_ssl = True

View File

@ -0,0 +1,135 @@
import unittest2 as unittest
from webob import Request, Response
import keystone.common.exception
from keystone.test.functional import common
from keystone.middleware import auth_token
class HeaderApp(object):
"""
Dummy WSGI app the returns HTTP headers in the body
This is useful for making sure the headers we want
aer being passwed down to the downstream WSGI app.
"""
def __init__(self):
pass
def __call__(self, env, start_response):
self.request = Request.blank('', environ=env)
body = ''
for key in env:
if key.startswith('HTTP_'):
body += '%s: %s\n' % (key, env[key])
return Response(status="200 OK",
body=body)(env, start_response)
class BlankApp(object):
"""
Dummy WSGI app - does not do anything
"""
def __init__(self):
pass
def __call__(self, env, start_response):
self.request = Request.blank('', environ=env)
return Response(status="200 OK",
body={})(env, start_response)
class TestMiddleware(common.FunctionalTestCase):
"""
Tests for Keystone WSGI middleware.
"""
def setUp(self):
super(TestMiddleware, self).setUp()
settings = {'delay_auth_decision': '0',
'auth_host': '127.0.0.1',
'auth_port': '35357',
'auth_protocol': 'http',
'auth_uri': 'http://localhost:35357/',
'admin_token': '999888777666'}
cert_file = common.isSsl()
if cert_file:
settings['auth_protocol'] = 'https'
settings['certfile'] = cert_file
settings['auth_uri'] = 'https://localhost:35357/'
self.test_middleware = \
auth_token.filter_factory(settings)(HeaderApp())
password = common.unique_str()
self.tenant = self.create_tenant().json['tenant']
self.user = self.create_user(user_password=password,
tenant_id=self.tenant['id']).json['user']
self.user['password'] = password
self.services = {}
self.endpoint_templates = {}
for x in range(0, 5):
self.services[x] = self.create_service().json['OS-KSADM:service']
self.endpoint_templates[x] = self.create_endpoint_template(
name=self.services[x]['name'], \
type=self.services[x]['type']).\
json['OS-KSCATALOG:endpointTemplate']
self.create_endpoint_for_tenant(self.tenant['id'],
self.endpoint_templates[x]['id'])
r = self.authenticate(self.user['name'], self.user['password'],
self.tenant['id'], assert_status=200)
self.user_token = r.json['access']['token']['id']
def test_401_without_token(self):
resp = Request.blank('/').get_response(self.test_middleware)
self.assertEquals(resp.status_int, 401)
headers = resp.headers
self.assertTrue("WWW-Authenticate" in headers)
if common.isSsl():
self.assertEquals(headers['WWW-Authenticate'],
"Keystone uri='https://localhost:35357/'")
else:
self.assertEquals(headers['WWW-Authenticate'],
"Keystone uri='http://localhost:35357/'")
def test_401_bad_token(self):
resp = Request.blank('/',
headers={'X-Auth-Token': 'MADE_THIS_UP'}) \
.get_response(self.test_middleware)
self.assertEquals(resp.status_int, 401)
def test_200_good_token(self):
resp = Request.blank('/',
headers={'X-Auth-Token': self.user_token}) \
.get_response(self.test_middleware)
self.assertEquals(resp.status_int, 200)
headers = resp.body.split('\n')
header = "HTTP_X_IDENTITY_STATUS: Confirmed"
self.assertTrue(header in headers)
header = "HTTP_X_USER_ID: %s" % self.user['id']
self.assertTrue(header in headers)
header = "HTTP_X_USER_NAME: %s" % self.user['name']
self.assertTrue(header in headers)
header = "HTTP_X_TENANT_ID: %s" % self.tenant['id']
self.assertTrue(header in headers)
header = "HTTP_X_TENANT_NAME: %s" % self.tenant['name']
self.assertTrue(header in headers)
# These are here for legacy support and should be removed by F
header = "HTTP_X_TENANT: %s" % self.tenant['id']
self.assertTrue(header in headers)
header = "HTTP_X_USER: %s" % self.user['id']
self.assertTrue(header in headers)
if __name__ == '__main__':
unittest.main()

View File

@ -1,4 +1,5 @@
#!/usr/bin/env python
import sys
"""Manages execution of keystone test suites"""
from keystone.test import KeystoneTest
@ -35,6 +36,19 @@ TESTS = [
]
if __name__ == '__main__':
if '-O' in sys.argv:
filter = None
for i in range(len(sys.argv)):
if sys.argv[i] == '-O':
if len(sys.argv) > i + 1:
filter = sys.argv[i + 1]
break
if filter:
TESTS = [t for t in TESTS if filter in str(t)]
if not TESTS:
print 'No tests by the name %s found' % filter
exit()
for test_num, test_cls in enumerate(TESTS):
print 'Starting test %d of %d with config: %s' % \
(test_num + 1, len(TESTS), test_cls.config_name)

View File

@ -4,6 +4,8 @@ function usage {
echo "Usage: $0 [OPTION]..."
echo "Run Keystone's test suite(s)"
echo ""
echo " -O test_name Only run the specified test"
echo " Note: valid options now are SQLTest, LDAPTest, SSLTest, and MemcacheTest"
echo " -V, --virtual-env Always use virtualenv. Install automatically if not present"
echo " -N, --no-virtual-env Don't use virtualenv. Run tests in local environment"
echo " -f, --force Force a clean re-build of the virtual environment. Useful when dependencies have been added."
@ -11,6 +13,8 @@ function usage {
echo " Note: cannot be used in combination --with-progress"
echo " --with-progress Runs tests with progress (useful for developers)"
echo " Note: cannot be used in combination --with-coverage"
echo " --verbose Print additional logging"
echo " --debug Enable debug logging in Keystone instances"
echo " -p, --pep8 Just run pep8"
echo " -l, --pylint Just run pylint"
echo " -h, --help Print this usage message"