Updated testing

- Leveraging other OpenStack code (mainly Nova)
- Enabled duration and colorized reporting
- Support for python setup.py test
- Updated testing to run with in-memory database
- load_fixture called from test (no longer in core code)

Change-Id: Ic602f7fb9be38f0aceed521c41e83a71bbc20e2f
This commit is contained in:
Ziad Sawalha 2011-11-28 08:15:32 -06:00
parent d410637b90
commit 1778dcf006
17 changed files with 682 additions and 177 deletions

View File

@ -184,7 +184,7 @@ class FakeLDAP(object):
if key in self.db:
LOG.error(
"FakeLDAP add item failed: dn '%s' is already in store." % dn)
raise ldap.ALREADY_EXISTS
raise ldap.ALREADY_EXISTS(dn)
self.db[key] = dict([(k, v if isinstance(v, list) else [v])
for k, v in attrs])
self.db.sync()

View File

@ -1,7 +1,6 @@
from collections import Mapping
__all__ = ['UserRoleAssociation', 'Endpoints', 'Role', 'Tenant', 'User',
'Credentials']
__all__ = ['UserRoleAssociation', 'Role', 'Tenant', 'User']
def create_model(name, attrs):
@ -41,13 +40,13 @@ def create_model(name, attrs):
UserRoleAssociation = create_model(
'UserRoleAssociation', ['id', 'user_id', 'role_id', 'tenant_id'])
Endpoints = create_model(
'Endpoints', ['tenant_id', 'endpoint_template_id'])
Role = create_model(
'Role', ['id', 'desc', 'service_id'])
Tenant = create_model(
'Tenant', ['id', 'name', 'desc', 'enabled'])
User = create_model(
'User', ['id', 'name', 'password', 'email', 'enabled', 'tenant_id'])
Credentials = create_model(
'Credentials', ['user_id', 'type', 'key', 'secret'])
#Endpoints = create_model(
# 'Endpoints', ['id', 'tenant_id', 'endpoint_template_id'])
#Credentials = create_model(
# 'Credentials', ['id', 'user_id', 'type', 'key', 'secret'])

View File

@ -55,3 +55,5 @@ def set_value(variable_name, value):
elif variable_name == 'Service':
global Service
Service = value
else:
raise IndexError("Unrecognized model type: %s" % variable_name)

View File

@ -17,6 +17,7 @@
import ast
import logging
import sys
from sqlalchemy import create_engine
from sqlalchemy.orm import joinedload, aliased, sessionmaker
@ -33,7 +34,6 @@ BASE = models.Base
MODEL_PREFIX = 'keystone.backends.sqlalchemy.models.'
API_PREFIX = 'keystone.backends.sqlalchemy.api.'
FOR_TESTING_ONLY = 'for_testing_only'
def configure_backend(options):
@ -52,10 +52,10 @@ def configure_backend(options):
timeout = config.get_option(
options, 'sql_idle_timeout', type='int', default=3600)
if options['sql_connection'] == FOR_TESTING_ONLY:
_ENGINE = create_engine('sqlite://',
connect_args={'check_same_thread': False},
poolclass=StaticPool)
if options['sql_connection'] == "sqlite://":
_ENGINE = create_engine(options['sql_connection'],
connect_args={'check_same_thread': False},
poolclass=StaticPool)
else:
_ENGINE = create_engine(options['sql_connection'],
pool_recycle=timeout)
@ -68,12 +68,6 @@ def configure_backend(options):
register_models(options)
# this is TERRIBLE coupling, but...
# if we're starting up a test database, load sample fixtures
if options['sql_connection'] == FOR_TESTING_ONLY:
from keystone.test import sampledata
sampledata.load_fixture()
def get_session(autocommit=True, expire_on_commit=False):
"""Helper method to grab session"""
@ -100,7 +94,7 @@ def register_models(options):
model = utils.import_module(MODEL_PREFIX + supported_alchemy_model)
supported_alchemy_tables.append(model.__table__)
top_models.set_value(supported_alchemy_model, model)
if model.__api__ != None:
if model.__api__ is not None:
model_api = utils.import_module(API_PREFIX + model.__api__)
top_api.set_value(model.__api__, model_api.get())
creation_tables = []
@ -111,7 +105,9 @@ def register_models(options):
def unregister_models():
"""Unregister Models, useful clearing out data before testing"""
"""Unregister Models and reset _ENGINE,
useful clearing out data before testing"""
global _ENGINE
assert _ENGINE
BASE.metadata.drop_all(_ENGINE)
if _ENGINE:
BASE.metadata.drop_all(_ENGINE)
_ENGINE = None

View File

@ -90,11 +90,16 @@ class Server(object):
def __init__(self, threads=1000):
self.pool = eventlet.GreenPool(threads)
self.socket_info = {}
self.threads = {}
def start(self, application, port, host='0.0.0.0', backlog=128):
def start(self, application, port, host='0.0.0.0', key=None, backlog=128):
"""Run a WSGI server with the given application."""
socket = eventlet.listen((host, port), backlog=backlog)
self.pool.spawn_n(self._run, application, socket)
thread = self.pool.spawn(self._run, application, socket)
if key:
self.socket_info[key] = socket
self.threads[key] = thread
def wait(self):
"""Wait until all servers have completed running."""
@ -116,7 +121,7 @@ class SslServer(Server):
"""SSL Server class to manage multiple WSGI sockets and applications."""
def start(self, application, port, host='0.0.0.0', backlog=128,
certfile=None, keyfile=None, ca_certs=None,
cert_required='True'):
cert_required='True', key=None):
"""Run a 2-way SSL WSGI server with the given application."""
socket = eventlet.listen((host, port), backlog=backlog)
if cert_required == 'True':
@ -127,7 +132,10 @@ class SslServer(Server):
keyfile=keyfile,
server_side=True, cert_reqs=cert_reqs,
ca_certs=ca_certs)
self.pool.spawn_n(self._run, application, sslsocket)
thread = self.pool.spawn(self._run, application, sslsocket)
if key:
self.socket_info[key] = sslsocket
self.threads[key] = thread
class Middleware(object):

View File

@ -1,46 +1,339 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# Colorizer Code is borrowed from Twisted:
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
#
# Permission is hereby granted, free of charge, to any person obtaining
# a copy of this software and associated documentation files (the
# "Software"), to deal in the Software without restriction, including
# without limitation the rights to use, copy, modify, merge, publish,
# distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so, subject to
# the following conditions:
#
# The above copyright notice and this permission notice shall be
# included in all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
#
# Code copied from Nova and other OpenStack projects:
# Colorizers
# Classes starting with Nova
# other setup and initialization code
#
""" Module that handles starting the Keystone server and running
test suites"""
import cgitb
import gettext
import heapq
import logging
from nose import config as noseconfig
from nose import core
from nose import result
import optparse
import os
import sys
import subprocess
import tempfile
import time
import unittest2 as unittest
import unittest
cgitb.enable(format="text")
from functional.common import HttpTestCase
import keystone
from keystone.common import config, wsgi
from keystone import backends
TEST_DIR = os.path.abspath(os.path.dirname(__file__))
BASE_DIR = os.path.abspath(os.path.join(TEST_DIR, '..', '..'))
TEST_CERT = os.path.join(BASE_DIR, 'examples/ssl/certs/middleware-key.pem')
def execute(cmd, raise_error=True):
class _AnsiColorizer(object):
"""
Executes a command in a subprocess. Returns a tuple
of (exitcode, out, err), where out is the string output
from stdout and err is the string output from stderr when
executing the command.
A colorizer is an object that loosely wraps around a stream, allowing
callers to write text to the stream in a particular color.
:param cmd: Command string to execute
:param raise_error: If returncode is not 0 (success), then
raise a RuntimeError? Default: True)
Colorizer classes must implement C{supported()} and C{write(text, color)}.
"""
_colors = dict(black=30, red=31, green=32, yellow=33,
blue=34, magenta=35, cyan=36, white=37)
env = os.environ.copy()
# Make sure that we use the programs in the
# current source directory's bin/ directory.
env['PATH'] = os.path.join(BASE_DIR, 'bin') + ':' + env['PATH']
process = subprocess.Popen(cmd,
shell=True,
env=env)
result = process.communicate()
exitcode = process.returncode
if process.returncode != 0 and raise_error:
msg = "Command %(cmd)s did not succeed. Returned an exit "\
"code of %(exitcode)d." % locals()
raise RuntimeError(msg)
return exitcode, result
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
"""
A class method that returns True if the current platform supports
coloring terminal output using this method. Returns False otherwise.
"""
if not stream.isatty():
return False # auto color only on TTYs
try:
import curses
except ImportError:
return False
else:
try:
try:
return curses.tigetnum("colors") > 2
except curses.error:
curses.setupterm()
return curses.tigetnum("colors") > 2
except:
raise
# guess false in case of error
return False
supported = classmethod(supported)
def write(self, text, color):
"""
Write the given text to the stream in the given color.
@param text: Text to be written to the stream.
@param color: A string label for a color. e.g. 'red', 'white'.
"""
color = self._colors[color]
self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
class _Win32Colorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
from win32console import GetStdHandle, STD_OUT_HANDLE, \
FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
FOREGROUND_INTENSITY
red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
FOREGROUND_BLUE, FOREGROUND_INTENSITY)
self.stream = stream
self.screenBuffer = GetStdHandle(STD_OUT_HANDLE)
self._colors = {
'normal': red | green | blue,
'red': red | bold,
'green': green | bold,
'blue': blue | bold,
'yellow': red | green | bold,
'magenta': red | blue | bold,
'cyan': green | blue | bold,
'white': red | green | blue | bold
}
def supported(cls, stream=sys.stdout):
try:
import win32console
screenBuffer = win32console.GetStdHandle(
win32console.STD_OUT_HANDLE)
except ImportError:
return False
import pywintypes
try:
screenBuffer.SetConsoleTextAttribute(
win32console.FOREGROUND_RED |
win32console.FOREGROUND_GREEN |
win32console.FOREGROUND_BLUE)
except pywintypes.error:
return False
else:
return True
supported = classmethod(supported)
def write(self, text, color):
color = self._colors[color]
self.screenBuffer.SetConsoleTextAttribute(color)
self.stream.write(text)
self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
class _NullColorizer(object):
"""
See _AnsiColorizer docstring.
"""
def __init__(self, stream):
self.stream = stream
def supported(cls, stream=sys.stdout):
return True
supported = classmethod(supported)
def write(self, text, color):
self.stream.write(text)
def get_elapsed_time_color(elapsed_time):
if elapsed_time > 1.0:
return 'red'
elif elapsed_time > 0.25:
return 'yellow'
else:
return 'green'
class NovaTestResult(result.TextTestResult):
def __init__(self, *args, **kw):
self.show_elapsed = kw.pop('show_elapsed')
result.TextTestResult.__init__(self, *args, **kw)
self.num_slow_tests = 5
self.slow_tests = [] # this is a fixed-sized heap
self._last_case = None
self.colorizer = None
# NOTE(vish): reset stdout for the terminal check
stdout = sys.stdout
sys.stdout = sys.__stdout__
for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
if colorizer.supported():
self.colorizer = colorizer(self.stream)
break
sys.stdout = stdout
# NOTE(lorinh): Initialize start_time in case a sqlalchemy-migrate
# error results in it failing to be initialized later. Otherwise,
# _handleElapsedTime will fail, causing the wrong error message to
# be outputted.
self.start_time = time.time()
def getDescription(self, test):
return str(test)
def _handleElapsedTime(self, test):
self.elapsed_time = time.time() - self.start_time
item = (self.elapsed_time, test)
# Record only the n-slowest tests using heap
if len(self.slow_tests) >= self.num_slow_tests:
heapq.heappushpop(self.slow_tests, item)
else:
heapq.heappush(self.slow_tests, item)
def _writeElapsedTime(self, test):
color = get_elapsed_time_color(self.elapsed_time)
self.colorizer.write(" %.2f" % self.elapsed_time, color)
def _writeResult(self, test, long_result, color, short_result, success):
if self.showAll:
self.colorizer.write(long_result, color)
if self.show_elapsed and success:
self._writeElapsedTime(test)
self.stream.writeln()
elif self.dots:
self.stream.write(short_result)
self.stream.flush()
# NOTE(vish): copied from unittest with edit to add color
def addSuccess(self, test):
unittest.TestResult.addSuccess(self, test)
self._handleElapsedTime(test)
self._writeResult(test, 'OK', 'green', '.', True)
# NOTE(vish): copied from unittest with edit to add color
def addFailure(self, test, err):
unittest.TestResult.addFailure(self, test, err)
self._handleElapsedTime(test)
self._writeResult(test, 'FAIL', 'red', 'F', False)
# NOTE(vish): copied from nose with edit to add color
def addError(self, test, err):
"""Overrides normal addError to add support for
errorClasses. If the exception is a registered class, the
error will be added to the list for that class, not errors.
"""
self._handleElapsedTime(test)
stream = getattr(self, 'stream', None)
ec, ev, tb = err
try:
exc_info = self._exc_info_to_string(err, test)
except TypeError:
# 2.3 compat
exc_info = self._exc_info_to_string(err)
for cls, (storage, label, isfail) in self.errorClasses.items():
if result.isclass(ec) and issubclass(ec, cls):
if isfail:
test.passed = False
storage.append((test, exc_info))
# Might get patched into a streamless result
if stream is not None:
if self.showAll:
message = [label]
detail = result._exception_detail(err[1])
if detail:
message.append(detail)
stream.writeln(": ".join(message))
elif self.dots:
stream.write(label[:1])
return
self.errors.append((test, exc_info))
test.passed = False
if stream is not None:
self._writeResult(test, 'ERROR', 'red', 'E', False)
def startTest(self, test):
unittest.TestResult.startTest(self, test)
self.start_time = time.time()
current_case = test.test.__class__.__name__
if self.showAll:
if current_case != self._last_case:
self.stream.writeln(current_case)
self._last_case = current_case
self.stream.write(
' %s' % str(test.test._testMethodName).ljust(60))
self.stream.flush()
class NovaTestRunner(core.TextTestRunner):
def __init__(self, *args, **kwargs):
self.show_elapsed = kwargs.pop('show_elapsed')
core.TextTestRunner.__init__(self, *args, **kwargs)
def _makeResult(self):
return NovaTestResult(self.stream,
self.descriptions,
self.verbosity,
self.config,
show_elapsed=self.show_elapsed)
def _writeSlowTests(self, result_):
# Pare out 'fast' tests
slow_tests = [item for item in result_.slow_tests
if get_elapsed_time_color(item[0]) != 'green']
if slow_tests:
slow_total_time = sum(item[0] for item in slow_tests)
self.stream.writeln("Slowest %i tests took %.2f secs:"
% (len(slow_tests), slow_total_time))
for elapsed_time, test in sorted(slow_tests, reverse=True):
time_str = "%.2f" % elapsed_time
self.stream.writeln(" %s %s" % (time_str.ljust(10), test))
def run(self, test):
result_ = core.TextTestRunner.run(self, test)
if self.show_elapsed:
self._writeSlowTests(result_)
return result_
class KeystoneTest(object):
@ -62,10 +355,13 @@ class KeystoneTest(object):
def clear_database(self):
"""Remove any test databases or files generated by previous tests."""
for fname in self.test_files:
fpath = os.path.join(TEST_DIR, fname)
if os.path.exists(fpath):
print "Removing test file %s" % fname
os.unlink(fpath)
paths = [os.path.join(os.curdir, fname),
os.path.join(os.getcwd(), fname),
os.path.join(TEST_DIR, fname)]
for fpath in paths:
if os.path.exists(fpath):
print "Removing test file %s" % fname
os.unlink(fpath)
def construct_temp_conf_file(self):
"""Populates a configuration template, and writes to a file pointer."""
@ -77,6 +373,8 @@ class KeystoneTest(object):
self.conf_fp.flush()
def setUp(self):
self.server = None
self.admin_server = None
self.clear_database()
self.construct_temp_conf_file()
@ -86,58 +384,183 @@ class KeystoneTest(object):
# run the keystone server
print "Starting the keystone server..."
params = [os.path.join(BASE_DIR, 'bin/keystone'),
'-c', self.conf_fp.name]
if '--debug' in sys.argv:
params += ['-d']
self.server = subprocess.Popen(params)
# blatant hack.
time.sleep(5)
if self.server.poll() is not None:
raise RuntimeError('Failed to start server')
parser = optparse.OptionParser(version='%%prog %s' % keystone.version)
common_group = config.add_common_options(parser)
config.add_log_options(parser)
# Handle a special argument to support starting two endpoints
common_group.add_option(
'-a', '--admin-port', dest="admin_port", metavar="PORT",
help="specifies port for Admin API to listen " \
"on (default is 35357)")
# Parse arguments and load config
(options, args) = config.parse_options(parser)
options['config_file'] = self.conf_fp.name
# Start services
try:
# Load Service API server
conf, app = config.load_paste_app(
'keystone-legacy-auth', options, args)
admin_conf, admin_app = config.load_paste_app(
'admin', options, args)
port = int(options['bind_port'] or conf['service_port'] or 5000)
host = options['bind_host'] or conf['service_host']
if (self.isSsl == True):
server = wsgi.SslServer()
server.start(app, port, host,
certfile=conf['certfile'],
keyfile=conf['keyfile'],
ca_certs=conf['ca_certs'],
cert_required=conf['cert_required'])
# Load Admin API server
port = int(options['admin_port'] or conf['admin_port']
or 35357)
host = options['bind_host'] or conf['admin_host']
admin_server = wsgi.SslServer()
admin_server.start(admin_app,
port, host,
certfile=conf['certfile'],
keyfile=conf['keyfile'],
ca_certs=conf['ca_certs'],
cert_required=conf['cert_required'])
else:
server = wsgi.Server()
server.start(app, port, host, key="Test")
print "Service API (ssl=%s) listening on %s:%s" % (
conf['service_ssl'], host, port)
# Load Admin API server
port = int(options['admin_port'] or conf['admin_port']
or 35357)
host = options['bind_host'] or conf['admin_host']
admin_server = wsgi.Server()
admin_server.start(admin_app, port, host, key="Test")
print "Admin API (ssl=%s) listening on %s:%s" % (
conf['admin_ssl'], host, port)
except RuntimeError, e:
print e
sys.exit("ERROR: %s" % e)
self.server = server
self.admin_server = admin_server
# Load sample data
from keystone.test import sampledata
manage_args = ['--config-file', self.conf_fp.name]
sampledata.load_fixture(args=manage_args)
def tearDown(self):
# kill the keystone server
print "Stopping the keystone server..."
self.server.kill()
self.clear_database()
try:
if self.server is not None:
if 'Test' in self.server.threads:
self.server.threads['Test'].kill()
self.server = None
if self.admin_server is not None:
if 'Test' in self.admin_server.threads:
self.admin_server.threads['Test'].kill()
self.admin_server = None
self.conf_fp.close()
self.conf_fp = None
except Exception as e:
print "Error cleaning up %s" % e
finally:
self.clear_database()
def run(self):
try:
self.setUp()
# discover and run tests
print "Running tests..."
if '--with-progress' in sys.argv:
loader = unittest.TestLoader()
suite = loader.discover(TEST_DIR, top_level_dir=BASE_DIR)
verbosity = 1
if '--verbose' in sys.argv:
verbosity = 2
result = unittest.TextTestRunner(verbosity=verbosity). \
run(suite)
if not result.wasSuccessful():
raise RuntimeError("%s unresolved issues." %
(len(result.errors) + len(result.failures),))
elif '--with-coverage' in sys.argv:
print "running coverage"
options = ''
if '--verbose' in sys.argv:
options += ' -v'
verbosity = 1
if '--verbose' in sys.argv:
verbosity = 2
cmd = 'coverage run %s%s discover -t %s -s %s' % \
('/usr/bin/unit2', options, BASE_DIR, TEST_DIR)
# If any argument looks like a test name but doesn't have
# "nova.tests" in front of it, automatically add that so we don't
# have to type as much
show_elapsed = True
argv = []
for x in sys.argv:
if x.startswith('functional') or x.startswith('unit'):
argv.append('keystone.test.%s' % x)
elif x.startswith('--hide-elapsed'):
show_elapsed = False
elif x.startswith('--trace-calls'):
pass
elif x.startswith('--debug'):
pass
else:
argv.append(x)
execute(cmd)
c = noseconfig.Config(stream=sys.stdout,
env=os.environ,
verbosity=3,
workingDir=TEST_DIR,
plugins=core.DefaultPluginManager())
else:
options = ''
if '--verbose' in sys.argv:
options += ' -v'
runner = NovaTestRunner(stream=c.stream,
verbosity=c.verbosity,
config=c,
show_elapsed=show_elapsed)
cmd = 'unit2 discover%s -f -t %s -s %s' % \
(options, BASE_DIR, TEST_DIR)
execute(cmd)
return not core.run(config=c, testRunner=runner,
argv=argv + ['-P'])
except Exception as e:
print 'Error %s' % e
finally:
self.tearDown()
def runtests():
"""This function can be called from 'python setup.py test'."""
return SQLTest().run()
class SQLTest(KeystoneTest):
"""Test defined using only SQLAlchemy back-end"""
config_name = 'sql.conf.template'
test_files = ('keystone.sqltest.db',)
def clear_database(self):
# Disconnect the database before deleting
from keystone.backends import sqlalchemy
sqlalchemy.unregister_models()
super(SQLTest, self).clear_database()
class SSLTest(SQLTest):
config_name = 'ssl.conf.template'
isSsl = True
test_files = ('keystone.ssltest.db',)
class MemcacheTest(SQLTest):
"""Test defined using only SQLAlchemy and Memcache back-end"""
config_name = 'memcache.conf.template'
test_files = ('keystone.memcachetest.db',)
class LDAPTest(SQLTest):
"""Test defined using only SQLAlchemy and LDAP back-end"""
config_name = 'ldap.conf.template'
test_files = ('keystone.ldaptest.db', 'ldap.db', 'ldap.db.db',)
def clear_database(self):
super(LDAPTest, self).clear_database()
from keystone.backends.ldap.fakeldap import FakeShelve
db = FakeShelve().get_instance()
db.clear()

View File

@ -21,9 +21,9 @@ keystone-service-admin-role = KeystoneServiceAdmin
hash-password = True
[keystone.backends.sqlalchemy]
sql_connection = for_testing_only
sql_connection = sqlite://
sql_idle_timeout = 30
backend_entities = ['Endpoints', 'Credentials', 'EndpointTemplates', 'Token', 'Service']
backend_entities = ['Endpoints', 'Credentials', 'EndpointTemplates', 'Token', 'Service']
[keystone.backends.ldap]
ldap_url = fake://memory

View File

@ -20,7 +20,7 @@ keystone-admin-role = Admin
keystone-service-admin-role = KeystoneServiceAdmin
[keystone.backends.sqlalchemy]
sql_connection = for_testing_only
sql_connection = sqlite://
sql_idle_timeout = 30
backend_entities = ['Endpoints', 'Credentials', 'EndpointTemplates', 'Tenant', 'User', 'UserRoleAssociation', 'Role', 'Service']

View File

@ -21,7 +21,7 @@ keystone-service-admin-role = KeystoneServiceAdmin
hash-password = True
[keystone.backends.sqlalchemy]
sql_connection = for_testing_only
sql_connection = sqlite://
sql_idle_timeout = 30
backend_entities = ['Endpoints', 'Credentials', 'EndpointTemplates', 'Tenant', 'User', 'UserRoleAssociation', 'Role', 'Token', 'Service']

View File

@ -24,7 +24,7 @@ ca_certs = %(base_dir)s/examples/ssl/certs/ca.pem
cert_required = True
[keystone.backends.sqlalchemy]
sql_connection = for_testing_only
sql_connection = sqlite://
sql_idle_timeout = 30
backend_entities = ['Endpoints', 'Credentials', 'EndpointTemplates', 'Tenant', 'User', 'UserRoleAssociation', 'Role', 'Token', 'Service']

View File

@ -31,9 +31,8 @@ class HttpTestCase(unittest.TestCase):
# Initialize headers dictionary
headers = {} if not headers else headers
# Initialize a connection
cert_file = isSsl()
if (cert_file != None):
if (cert_file is not None):
connection = httplib.HTTPSConnection(host, port,
cert_file=cert_file,
timeout=20)
@ -370,7 +369,7 @@ class ApiTestCase(RestfulTestCase):
path='/tenants/%s/users/%s/roles/OS-KSADM/%s' % (tenant_id,
user_id, role_id,), **kwargs)
def delete_user_role(self, user_id, role_id, tenant_id, **kwargs):
def delete_user_role(self, user_id, role_id, tenant_id, **kwargs):
"""DELETE /users/{user_id}/roles/{role_id}"""
if tenant_id is None:
return self.admin_request(method='DELETE',

View File

@ -13,7 +13,7 @@ class TestGetCredentials(common.FunctionalTestCase):
def test_get_user_credentials(self):
password_credentials = self.fetch_user_credentials(
self.user['id']).json['credentials'][0]['passwordCredentials']
self.assertEquals(password_credentials['username'], self.user['name'])
self.assertEquals(password_credentials['username'], self.user['name'])
def test_get_user_credentials_xml(self):
r = self.fetch_user_credentials(self.user['id'],
@ -57,7 +57,7 @@ class TestGetPasswordCredentials(common.FunctionalTestCase):
def test_get_user_credentials(self):
password_credentials = self.fetch_password_credentials(
self.user['id']).json['passwordCredentials']
self.assertEquals(password_credentials['username'], self.user['name'])
self.assertEquals(password_credentials['username'], self.user['name'])
def test_get_user_credentials_xml(self):
r = self.fetch_password_credentials(self.user['id'],

View File

@ -4,8 +4,10 @@ from keystone.test.functional import common
class TestStaticFiles(common.ApiTestCase):
def test_pdf_contract(self):
r = self.service_request(path='/identitydevguide.pdf')
self.assertTrue('pdf' in r.getheader('Content-Type'))
if not common.isSsl():
#TODO(ziad): Caller hangs in SSL (but works with cURL)
r = self.service_request(path='/identitydevguide.pdf')
self.assertTrue('pdf' in r.getheader('Content-Type'))
def test_wadl_contract(self):
r = self.service_request(path='/identity.wadl')
@ -46,8 +48,10 @@ class TestStaticFiles(common.ApiTestCase):
class TestAdminStaticFiles(common.FunctionalTestCase):
def test_pdf_contract(self):
r = self.admin_request(path='/identityadminguide.pdf')
self.assertTrue('pdf' in r.getheader('Content-Type'))
if not common.isSsl():
#TODO(ziad): Caller hangs in SSL (but works with cURL)
r = self.admin_request(path='/identityadminguide.pdf')
self.assertTrue('pdf' in r.getheader('Content-Type'))
def test_wadl_contract(self):
r = self.admin_request(path='/identity-admin.wadl')

View File

@ -1,42 +1,34 @@
#!/usr/bin/env python
"""
To run all tests
python run_tests.py
To run a single test:
python run_tests.py
functional.test_extensions:TestExtensions.test_extensions_json
To run a single test module:
python run_tests.py functional.test_extensions
"""
import sys
import subprocess
"""Manages execution of keystone test suites"""
from keystone.test import KeystoneTest
import keystone.tools.tracer # @UnusedImport # module runs on import
from keystone import test
class SQLTest(KeystoneTest):
"""Test defined using only SQLAlchemy back-end"""
config_name = 'sql.conf.template'
test_files = ('keystone.db',)
class SSLTest(KeystoneTest):
config_name = 'ssl.conf.template'
test_files = ('keystone.db',)
isSsl = True
class MemcacheTest(KeystoneTest):
"""Test defined using only SQLAlchemy and Memcache back-end"""
config_name = 'memcache.conf.template'
test_files = ('keystone.db',)
class LDAPTest(KeystoneTest):
"""Test defined using only SQLAlchemy and LDAP back-end"""
config_name = 'ldap.conf.template'
test_files = ('keystone.db', 'ldap.db', 'ldap.db.db',)
TESTS = [
SQLTest,
test.SQLTest,
test.LDAPTest,
# Waiting on instructions on how to start memcached in jenkins:
# But tests pass
# MemcacheTest,
LDAPTest,
SSLTest,
test.SSLTest,
]
if __name__ == '__main__':
if '-O' in sys.argv:
filter = None
@ -44,6 +36,11 @@ if __name__ == '__main__':
if sys.argv[i] == '-O':
if len(sys.argv) > i + 1:
filter = sys.argv[i + 1]
# Remove -O settings from sys.argv
argv = sys.argv[0:i]
if len(sys.argv) > i:
argv += sys.argv[i + 2:]
sys.argv = argv[:]
break
if filter:
TESTS = [t for t in TESTS if filter in str(t)]
@ -51,7 +48,19 @@ if __name__ == '__main__':
print 'No tests by the name %s found' % filter
exit()
for test_num, test_cls in enumerate(TESTS):
print 'Starting test %d of %d with config: %s' % \
(test_num + 1, len(TESTS), test_cls.config_name)
test_cls().run()
if len(TESTS) > 1:
# We have a problem with resetting SQLAlchemy, so we need to fire
# off a separate process for each test now
for test_num, test_cls in enumerate(TESTS):
params = ["python", __file__, '-O',
str(test_cls.__name__)] + sys.argv[1:]
p = subprocess.Popen(params)
result = p.wait()
if result:
sys.exit(result)
else:
for test_num, test_cls in enumerate(TESTS):
print 'Starting test %d of %d with config: %s' % \
(test_num + 1, len(TESTS), test_cls.config_name)
test_cls().run()

View File

@ -1,5 +1,7 @@
#!/bin/bash
set -eu
function usage {
echo "Usage: $0 [OPTION]..."
echo "Run Keystone's test suite(s)"
@ -8,16 +10,16 @@ function usage {
echo " Note: valid options now are SQLTest, LDAPTest, SSLTest, and MemcacheTest"
echo " -V, --virtual-env Always use virtualenv. Install automatically if not present"
echo " -N, --no-virtual-env Don't use virtualenv. Run tests in local environment"
echo " -x, --stop Stop running tests after the first error or failure."
echo " -f, --force Force a clean re-build of the virtual environment. Useful when dependencies have been added."
echo " --with-coverage Runs tests with python code coverage (useful for jenkins)"
echo " Note: cannot be used in combination --with-progress"
echo " --with-progress Runs tests with progress (useful for developers)"
echo " Note: cannot be used in combination --with-coverage"
echo " --verbose Print additional logging"
echo " --debug Enable debug logging in Keystone instances"
echo " Note: you might need to 'sudo' this since it pip installs into the vitual environment"
echo " -p, --pep8 Just run pep8"
echo " -l, --pylint Just run pylint"
echo " -c, --coverage Generate coverage report"
echo " -h, --help Print this usage message"
echo " --hide-elapsed Don't print the elapsed time for each test along with slow test list"
echo " --verbose Print additional logging"
echo " --debug Enable debug logging in Keystone instances"
echo ""
echo "Note: with no options specified, the script will try to run the tests in a virtual environment,"
echo " If no virtualenv is found, the script will ask if you would like to create one. If you "
@ -25,16 +27,27 @@ function usage {
exit
}
only_run_flag=0
only_run=""
function process_option {
case "$1" in
-h|--help) usage;;
-V|--virtual-env) let always_venv=1; let never_venv=0;;
-N|--no-virtual-env) let always_venv=0; let never_venv=1;;
-p|--pep8) let just_pep8=1;;
-l|--pylint) let just_pylint=1; let never_venv=0;;
-f|--force) let force=1;;
*) addlargs="$addlargs $1"
esac
if [ $only_run_flag -eq 1 ]; then
only_run_flag=0
only_run=$1
return
else
case "$1" in
-h|--help) usage;;
-V|--virtual-env) always_venv=1; never_venv=0;;
-N|--no-virtual-env) always_venv=0; never_venv=1;;
-O) only_run_flag=1;;
-f|--force) force=1;;
-p|--pep8) just_pep8=1;;
-l|--pylint) just_pylint=1;;
-c|--coverage) coverage=1;;
-*) addlopts="$addlopts $1";;
*) addlargs="$addlargs $1"
esac
fi
}
venv=.keystone-venv
@ -43,36 +56,27 @@ always_venv=0
never_venv=0
force=0
addlargs=
addlopts=
wrapper=""
just_pep8=0
no_pep8=0
just_pylint=0
coverage=0
for arg in "$@"; do
process_option $arg
done
RUNTESTS="python run_tests.py $addlargs"
function run_tests {
# Just run the test suites in current environment
${wrapper} $RUNTESTS
}
# If enabled, tell nose/unittest to collect coverage data
if [ $coverage -eq 1 ]; then
addlopts="$addlopts --with-coverage --cover-package=keystone"
fi
function run_pep8 {
echo "Running pep8 ..."
PEP8_EXCLUDE="vcsversion.py"
PEP8_OPTIONS="--exclude=$PEP8_EXCLUDE --repeat --show-pep8 --show-source"
PEP8_INCLUDE="bin/k* keystone examples tools setup.py run_tests.py"
${wrapper} pep8 $PEP8_OPTIONS $PEP8_INCLUDE
}
function run_pylint {
echo "Running pylint ..."
PYLINT_OPTIONS="--rcfile=pylintrc --output-format=parseable"
PYLINT_INCLUDE="keystone"
echo "Pylint messages count: "
pylint $PYLINT_OPTIONS $PYLINT_INCLUDE | grep 'keystone/' | wc -l
echo "Run 'pylint $PYLINT_OPTIONS $PYLINT_INCLUDE' for a full report."
}
if [ "x$only_run" = "x" ]; then
RUNTESTS="python run_tests.py$addlopts$addlargs"
else
RUNTESTS="python run_tests.py$addlopts$addlargs -O $only_run"
fi
if [ $never_venv -eq 0 ]
then
@ -100,6 +104,51 @@ then
fi
fi
function run_tests {
# Just run the test suites in current environment
${wrapper} $RUNTESTS 2> run_tests.log
# If we get some short import error right away, print the error log directly
RESULT=$?
if [ "$RESULT" -ne "0" ];
then
ERRSIZE=`wc -l run_tests.log | awk '{print \$1}'`
if [ "$ERRSIZE" -lt "40" ];
then
cat run_tests.log
fi
fi
return $RESULT
}
function run_pep8 {
echo "Running pep8 ..."
# Opt-out files from pep8
ignore_scripts="*.sh"
ignore_files="*eventlet-patch,*pip-requires,*.log"
ignore_dirs="*ajaxterm*"
GLOBIGNORE="$ignore_scripts,$ignore_files,$ignore_dirs"
srcfiles=`find bin -type f -not -name "*.log" -not -name "*.db"`
srcfiles+=" keystone examples tools setup.py run_tests.py"
# Just run PEP8 in current environment
${wrapper} pep8 --repeat --show-pep8 --show-source \
--ignore=E202,E111 \
--exclude=vcsversion.py,$GLOBIGNORE ${srcfiles}
}
function run_pylint {
echo "Running pylint ..."
PYLINT_OPTIONS="--rcfile=pylintrc --output-format=parseable"
PYLINT_INCLUDE="keystone"
echo "Pylint messages count: "
pylint $PYLINT_OPTIONS $PYLINT_INCLUDE | grep 'keystone/' | wc -l
echo "Run 'pylint $PYLINT_OPTIONS $PYLINT_INCLUDE' for a full report."
}
# Delete old coverage data from previous runs
if [ $coverage -eq 1 ]; then
${wrapper} coverage erase
fi
if [ $just_pep8 -eq 1 ]; then
run_pep8
exit
@ -110,4 +159,11 @@ if [ $just_pylint -eq 1 ]; then
exit
fi
run_tests || exit
run_tests
if [ $coverage -eq 1 ]; then
echo "Generating coverage report in covhtml/"
${wrapper} coverage html -d covhtml -i
fi

View File

@ -17,6 +17,7 @@
import keystone
import os
import subprocess
import sys
from setuptools import setup, find_packages
@ -42,6 +43,12 @@ try:
except:
pass
requirements = ['setuptools', 'httplib2', 'eventlet', 'paste', 'pastedeploy',
'webob', 'Routes', 'sqlalchemy', 'sqlalchemy-migrate',
'pysqlite', 'lxml', 'passlib']
if sys.version_info < (2, 6):
requirements.append('simplejson')
setup(
name='keystone',
version=keystone.canonical_version(),
@ -59,8 +66,9 @@ setup(
'bin/keystone-control'],
zip_safe=False,
cmdclass=cmdclass,
install_requires=['setuptools'],
test_suite='nose.collector',
install_requires=requirements,
tests_require=['nose', 'unittest2', 'webtest', 'mox', 'pylint', 'pep8'],
test_suite='keystone.test.runtests',
entry_points={
'paste.app_factory': ['main=identity:app_factory'],
'paste.filter_factory': [

View File

@ -26,6 +26,7 @@ Sphinx # required to build documentation
coverage # computes code coverage percentages
# Testing
nose # for test discovery and console feedback
unittest2 # backport of unittest lib in python 2.7
webtest # test wsgi apps without starting an http server
pylint # static code analysis