84693b4a16
Fixes bug 897155 Also adds a new fake rpc implementation that tests use by default. This speeds up the test run by ~10% on my system. We can decide to ditch fake_rabbit at some point later.. Change-Id: I8877fad3d41ae055c15b1adff99e535c34e9ce92
366 lines
13 KiB
Python
366 lines
13 KiB
Python
#!/usr/bin/env python
|
|
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
# Colorizer Code is borrowed from Twisted:
|
|
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining
|
|
# a copy of this software and associated documentation files (the
|
|
# "Software"), to deal in the Software without restriction, including
|
|
# without limitation the rights to use, copy, modify, merge, publish,
|
|
# distribute, sublicense, and/or sell copies of the Software, and to
|
|
# permit persons to whom the Software is furnished to do so, subject to
|
|
# the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be
|
|
# included in all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
|
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
|
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
|
|
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
|
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
|
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
"""Unittest runner for Nova.
|
|
|
|
To run all tests
|
|
python run_tests.py
|
|
|
|
To run a single test:
|
|
python run_tests.py test_compute:ComputeTestCase.test_run_terminate
|
|
|
|
To run a single test module:
|
|
python run_tests.py test_compute
|
|
|
|
or
|
|
|
|
python run_tests.py api.test_wsgi
|
|
|
|
"""
|
|
|
|
import gettext
|
|
import heapq
|
|
import os
|
|
import unittest
|
|
import sys
|
|
import time
|
|
|
|
gettext.install('nova', unicode=1)
|
|
|
|
import eventlet
|
|
from nose import config
|
|
from nose import core
|
|
from nose import result
|
|
|
|
from nova import log as logging
|
|
|
|
|
|
class _AnsiColorizer(object):
|
|
"""
|
|
A colorizer is an object that loosely wraps around a stream, allowing
|
|
callers to write text to the stream in a particular color.
|
|
|
|
Colorizer classes must implement C{supported()} and C{write(text, color)}.
|
|
"""
|
|
_colors = dict(black=30, red=31, green=32, yellow=33,
|
|
blue=34, magenta=35, cyan=36, white=37)
|
|
|
|
def __init__(self, stream):
|
|
self.stream = stream
|
|
|
|
def supported(cls, stream=sys.stdout):
|
|
"""
|
|
A class method that returns True if the current platform supports
|
|
coloring terminal output using this method. Returns False otherwise.
|
|
"""
|
|
if not stream.isatty():
|
|
return False # auto color only on TTYs
|
|
try:
|
|
import curses
|
|
except ImportError:
|
|
return False
|
|
else:
|
|
try:
|
|
try:
|
|
return curses.tigetnum("colors") > 2
|
|
except curses.error:
|
|
curses.setupterm()
|
|
return curses.tigetnum("colors") > 2
|
|
except:
|
|
raise
|
|
# guess false in case of error
|
|
return False
|
|
supported = classmethod(supported)
|
|
|
|
def write(self, text, color):
|
|
"""
|
|
Write the given text to the stream in the given color.
|
|
|
|
@param text: Text to be written to the stream.
|
|
|
|
@param color: A string label for a color. e.g. 'red', 'white'.
|
|
"""
|
|
color = self._colors[color]
|
|
self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
|
|
|
|
|
|
class _Win32Colorizer(object):
|
|
"""
|
|
See _AnsiColorizer docstring.
|
|
"""
|
|
def __init__(self, stream):
|
|
from win32console import GetStdHandle, STD_OUT_HANDLE, \
|
|
FOREGROUND_RED, FOREGROUND_BLUE, FOREGROUND_GREEN, \
|
|
FOREGROUND_INTENSITY
|
|
red, green, blue, bold = (FOREGROUND_RED, FOREGROUND_GREEN,
|
|
FOREGROUND_BLUE, FOREGROUND_INTENSITY)
|
|
self.stream = stream
|
|
self.screenBuffer = GetStdHandle(STD_OUT_HANDLE)
|
|
self._colors = {
|
|
'normal': red | green | blue,
|
|
'red': red | bold,
|
|
'green': green | bold,
|
|
'blue': blue | bold,
|
|
'yellow': red | green | bold,
|
|
'magenta': red | blue | bold,
|
|
'cyan': green | blue | bold,
|
|
'white': red | green | blue | bold
|
|
}
|
|
|
|
def supported(cls, stream=sys.stdout):
|
|
try:
|
|
import win32console
|
|
screenBuffer = win32console.GetStdHandle(
|
|
win32console.STD_OUT_HANDLE)
|
|
except ImportError:
|
|
return False
|
|
import pywintypes
|
|
try:
|
|
screenBuffer.SetConsoleTextAttribute(
|
|
win32console.FOREGROUND_RED |
|
|
win32console.FOREGROUND_GREEN |
|
|
win32console.FOREGROUND_BLUE)
|
|
except pywintypes.error:
|
|
return False
|
|
else:
|
|
return True
|
|
supported = classmethod(supported)
|
|
|
|
def write(self, text, color):
|
|
color = self._colors[color]
|
|
self.screenBuffer.SetConsoleTextAttribute(color)
|
|
self.stream.write(text)
|
|
self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
|
|
|
|
|
|
class _NullColorizer(object):
|
|
"""
|
|
See _AnsiColorizer docstring.
|
|
"""
|
|
def __init__(self, stream):
|
|
self.stream = stream
|
|
|
|
def supported(cls, stream=sys.stdout):
|
|
return True
|
|
supported = classmethod(supported)
|
|
|
|
def write(self, text, color):
|
|
self.stream.write(text)
|
|
|
|
|
|
def get_elapsed_time_color(elapsed_time):
|
|
if elapsed_time > 1.0:
|
|
return 'red'
|
|
elif elapsed_time > 0.25:
|
|
return 'yellow'
|
|
else:
|
|
return 'green'
|
|
|
|
|
|
class NovaTestResult(result.TextTestResult):
|
|
def __init__(self, *args, **kw):
|
|
self.show_elapsed = kw.pop('show_elapsed')
|
|
result.TextTestResult.__init__(self, *args, **kw)
|
|
self.num_slow_tests = 5
|
|
self.slow_tests = [] # this is a fixed-sized heap
|
|
self._last_case = None
|
|
self.colorizer = None
|
|
# NOTE(vish): reset stdout for the terminal check
|
|
stdout = sys.stdout
|
|
sys.stdout = sys.__stdout__
|
|
for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
|
|
if colorizer.supported():
|
|
self.colorizer = colorizer(self.stream)
|
|
break
|
|
sys.stdout = stdout
|
|
|
|
# NOTE(lorinh): Initialize start_time in case a sqlalchemy-migrate
|
|
# error results in it failing to be initialized later. Otherwise,
|
|
# _handleElapsedTime will fail, causing the wrong error message to
|
|
# be outputted.
|
|
self.start_time = time.time()
|
|
|
|
def getDescription(self, test):
|
|
return str(test)
|
|
|
|
def _handleElapsedTime(self, test):
|
|
self.elapsed_time = time.time() - self.start_time
|
|
item = (self.elapsed_time, test)
|
|
# Record only the n-slowest tests using heap
|
|
if len(self.slow_tests) >= self.num_slow_tests:
|
|
heapq.heappushpop(self.slow_tests, item)
|
|
else:
|
|
heapq.heappush(self.slow_tests, item)
|
|
|
|
def _writeElapsedTime(self, test):
|
|
color = get_elapsed_time_color(self.elapsed_time)
|
|
self.colorizer.write(" %.2f" % self.elapsed_time, color)
|
|
|
|
def _writeResult(self, test, long_result, color, short_result, success):
|
|
if self.showAll:
|
|
self.colorizer.write(long_result, color)
|
|
if self.show_elapsed and success:
|
|
self._writeElapsedTime(test)
|
|
self.stream.writeln()
|
|
elif self.dots:
|
|
self.stream.write(short_result)
|
|
self.stream.flush()
|
|
|
|
# NOTE(vish): copied from unittest with edit to add color
|
|
def addSuccess(self, test):
|
|
unittest.TestResult.addSuccess(self, test)
|
|
self._handleElapsedTime(test)
|
|
self._writeResult(test, 'OK', 'green', '.', True)
|
|
|
|
# NOTE(vish): copied from unittest with edit to add color
|
|
def addFailure(self, test, err):
|
|
unittest.TestResult.addFailure(self, test, err)
|
|
self._handleElapsedTime(test)
|
|
self._writeResult(test, 'FAIL', 'red', 'F', False)
|
|
|
|
# NOTE(vish): copied from nose with edit to add color
|
|
def addError(self, test, err):
|
|
"""Overrides normal addError to add support for
|
|
errorClasses. If the exception is a registered class, the
|
|
error will be added to the list for that class, not errors.
|
|
"""
|
|
self._handleElapsedTime(test)
|
|
stream = getattr(self, 'stream', None)
|
|
ec, ev, tb = err
|
|
try:
|
|
exc_info = self._exc_info_to_string(err, test)
|
|
except TypeError:
|
|
# 2.3 compat
|
|
exc_info = self._exc_info_to_string(err)
|
|
for cls, (storage, label, isfail) in self.errorClasses.items():
|
|
if result.isclass(ec) and issubclass(ec, cls):
|
|
if isfail:
|
|
test.passed = False
|
|
storage.append((test, exc_info))
|
|
# Might get patched into a streamless result
|
|
if stream is not None:
|
|
if self.showAll:
|
|
message = [label]
|
|
detail = result._exception_detail(err[1])
|
|
if detail:
|
|
message.append(detail)
|
|
stream.writeln(": ".join(message))
|
|
elif self.dots:
|
|
stream.write(label[:1])
|
|
return
|
|
self.errors.append((test, exc_info))
|
|
test.passed = False
|
|
if stream is not None:
|
|
self._writeResult(test, 'ERROR', 'red', 'E', False)
|
|
|
|
def startTest(self, test):
|
|
unittest.TestResult.startTest(self, test)
|
|
self.start_time = time.time()
|
|
current_case = test.test.__class__.__name__
|
|
|
|
if self.showAll:
|
|
if current_case != self._last_case:
|
|
self.stream.writeln(current_case)
|
|
self._last_case = current_case
|
|
|
|
self.stream.write(
|
|
' %s' % str(test.test._testMethodName).ljust(60))
|
|
self.stream.flush()
|
|
|
|
|
|
class NovaTestRunner(core.TextTestRunner):
|
|
def __init__(self, *args, **kwargs):
|
|
self.show_elapsed = kwargs.pop('show_elapsed')
|
|
core.TextTestRunner.__init__(self, *args, **kwargs)
|
|
|
|
def _makeResult(self):
|
|
return NovaTestResult(self.stream,
|
|
self.descriptions,
|
|
self.verbosity,
|
|
self.config,
|
|
show_elapsed=self.show_elapsed)
|
|
|
|
def _writeSlowTests(self, result_):
|
|
# Pare out 'fast' tests
|
|
slow_tests = [item for item in result_.slow_tests
|
|
if get_elapsed_time_color(item[0]) != 'green']
|
|
if slow_tests:
|
|
slow_total_time = sum(item[0] for item in slow_tests)
|
|
self.stream.writeln("Slowest %i tests took %.2f secs:"
|
|
% (len(slow_tests), slow_total_time))
|
|
for elapsed_time, test in sorted(slow_tests, reverse=True):
|
|
time_str = "%.2f" % elapsed_time
|
|
self.stream.writeln(" %s %s" % (time_str.ljust(10), test))
|
|
|
|
def run(self, test):
|
|
result_ = core.TextTestRunner.run(self, test)
|
|
if self.show_elapsed:
|
|
self._writeSlowTests(result_)
|
|
return result_
|
|
|
|
|
|
if __name__ == '__main__':
|
|
eventlet.monkey_patch()
|
|
logging.setup()
|
|
# If any argument looks like a test name but doesn't have "nova.tests" in
|
|
# front of it, automatically add that so we don't have to type as much
|
|
show_elapsed = True
|
|
argv = []
|
|
for x in sys.argv:
|
|
if x.startswith('test_'):
|
|
argv.append('nova.tests.%s' % x)
|
|
elif x.startswith('--hide-elapsed'):
|
|
show_elapsed = False
|
|
else:
|
|
argv.append(x)
|
|
|
|
testdir = os.path.abspath(os.path.join("nova", "tests"))
|
|
c = config.Config(stream=sys.stdout,
|
|
env=os.environ,
|
|
verbosity=3,
|
|
workingDir=testdir,
|
|
plugins=core.DefaultPluginManager())
|
|
|
|
runner = NovaTestRunner(stream=c.stream,
|
|
verbosity=c.verbosity,
|
|
config=c,
|
|
show_elapsed=show_elapsed)
|
|
sys.exit(not core.run(config=c, testRunner=runner, argv=argv))
|