d7f1221684
OpenStack as a whole is moving towards using testrepository and testtools for running tests. To that end, bring Glance into line by switching it to use testrepository to run tests. This copies run_tests.sh and tools/colorizer.py from Nova. This change also has some minor changes to make run_tests.sh work well. Partial fixes bug: 1179009 Fixes bug: 1271806 Change-Id: Ic265bc0d2f1528358f6e8ee5b4139f991923fc72 Signed-off-by: Steve Kowalik <steven@wedontsleep.org> Signed-off-by: Zhi Yan Liu <zhiyanl@cn.ibm.com>
336 lines
12 KiB
Python
Executable File
336 lines
12 KiB
Python
Executable File
#!/usr/bin/env python
|
|
|
|
# Copyright (c) 2013, Nebula, Inc.
|
|
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
#
|
|
# Colorizer Code is borrowed from Twisted:
|
|
# Copyright (c) 2001-2010 Twisted Matrix Laboratories.
|
|
#
|
|
# Permission is hereby granted, free of charge, to any person obtaining
|
|
# a copy of this software and associated documentation files (the
|
|
# "Software"), to deal in the Software without restriction, including
|
|
# without limitation the rights to use, copy, modify, merge, publish,
|
|
# distribute, sublicense, and/or sell copies of the Software, and to
|
|
# permit persons to whom the Software is furnished to do so, subject to
|
|
# the following conditions:
|
|
#
|
|
# The above copyright notice and this permission notice shall be
|
|
# included in all copies or substantial portions of the Software.
|
|
#
|
|
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
|
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
|
# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
|
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
|
|
# LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
|
|
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
|
|
# WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
|
|
|
|
"""Display a subunit stream through a colorized unittest test runner."""
|
|
|
|
import heapq
|
|
import subunit
|
|
import sys
|
|
import unittest
|
|
|
|
import testtools
|
|
|
|
|
|
class _AnsiColorizer(object):
    """
    Colorizer that emits ANSI escape sequences.

    A colorizer is an object that loosely wraps around a stream, allowing
    callers to write text to the stream in a particular color.

    Colorizer classes must implement C{supported()} and C{write(text, color)}.
    """

    # Numeric codes placed in the escape sequence, keyed by color label.
    _colors = dict(black=30, red=31, green=32, yellow=33,
                   blue=34, magenta=35, cyan=36, white=37)

    def __init__(self, stream):
        """
        @param stream: File-like object that colored text is written to.
        """
        self.stream = stream

    @classmethod
    def supported(cls, stream=None):
        """
        Report whether colored output via this class is usable on C{stream}.

        @param stream: Stream whose C{isatty()} is consulted. When omitted,
            C{sys.stdout} is looked up at call time, so a caller that
            temporarily rebinds C{sys.stdout} before calling gets the
            rebound stream checked.

        @return: True if the stream is a TTY and curses reports more than
            two colors; False otherwise, including when curses is missing
            or the capability lookup fails.
        """
        if stream is None:
            # Late binding: a default of ``sys.stdout`` in the signature
            # would be frozen at class-definition time.
            stream = sys.stdout
        if not stream.isatty():
            return False  # auto color only on TTYs
        try:
            import curses
        except ImportError:
            return False
        try:
            try:
                return curses.tigetnum("colors") > 2
            except curses.error:
                # Presumably the terminal database has not been set up yet;
                # initialise it and ask once more.
                curses.setupterm()
                return curses.tigetnum("colors") > 2
        except Exception:
            # guess false in case of error
            return False

    def write(self, text, color):
        """
        Write the given text to the stream in the given color.

        @param text: Text to be written to the stream.

        @param color: A string label for a color. e.g. 'red', 'white'.
            A label missing from C{_colors} raises KeyError.
        """
        code = self._colors[color]
        self.stream.write('\x1b[%s;1m%s\x1b[0m' % (code, text))
|
|
|
|
|
|
class _Win32Colorizer(object):
    """
    Colorizer that sets text attributes on a win32console handle.

    See _AnsiColorizer docstring.
    """

    def __init__(self, stream):
        """
        @param stream: File-like object that text is written to; colors
            are applied through the standard-output console handle.
        """
        import win32console
        fg_red = win32console.FOREGROUND_RED
        fg_green = win32console.FOREGROUND_GREEN
        fg_blue = win32console.FOREGROUND_BLUE
        intense = win32console.FOREGROUND_INTENSITY
        self.stream = stream
        self.screenBuffer = win32console.GetStdHandle(
            win32console.STD_OUT_HANDLE)
        # Attribute bitmasks per color label; 'normal' is what write()
        # restores after each colored write.
        self._colors = {
            'normal': fg_red | fg_green | fg_blue,
            'red': fg_red | intense,
            'green': fg_green | intense,
            'blue': fg_blue | intense,
            'yellow': fg_red | fg_green | intense,
            'magenta': fg_red | fg_blue | intense,
            'cyan': fg_green | fg_blue | intense,
            'white': fg_red | fg_green | fg_blue | intense
        }

    @classmethod
    def supported(cls, stream=sys.stdout):
        """
        Report whether the console accepts text-attribute changes.

        @param stream: Unused by this implementation.

        @return: False if win32console cannot be imported or setting the
            attribute raises C{pywintypes.error}; True otherwise.
        """
        try:
            import win32console
            handle = win32console.GetStdHandle(
                win32console.STD_OUT_HANDLE)
        except ImportError:
            return False
        import pywintypes
        try:
            # Probe by applying the plain red|green|blue foreground.
            handle.SetConsoleTextAttribute(
                win32console.FOREGROUND_RED |
                win32console.FOREGROUND_GREEN |
                win32console.FOREGROUND_BLUE)
        except pywintypes.error:
            return False
        return True

    def write(self, text, color):
        """
        Write C{text} in C{color}, then restore the 'normal' attribute.

        @param text: Text to be written to the stream.

        @param color: A key of C{self._colors}, e.g. 'red'.
        """
        attribute = self._colors[color]
        self.screenBuffer.SetConsoleTextAttribute(attribute)
        self.stream.write(text)
        self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
|
|
|
|
|
|
class _NullColorizer(object):
    """
    Colorizer that writes text unchanged, ignoring the requested color.

    See _AnsiColorizer docstring.
    """

    def __init__(self, stream):
        """
        @param stream: File-like object that text is written to.
        """
        self.stream = stream

    @classmethod
    def supported(cls, stream=sys.stdout):
        """
        Always report support.

        @param stream: Unused by this implementation.

        @return: True.
        """
        return True

    def write(self, text, color):
        """
        Write C{text} to the stream as-is.

        @param text: Text to be written to the stream.

        @param color: Ignored.
        """
        self.stream.write(text)
|
|
|
|
|
|
def get_elapsed_time_color(elapsed_time):
    """
    Pick the color label used to display an elapsed time.

    @param elapsed_time: Duration to classify (callers in this file pass
        seconds).

    @return: 'red' above 1.0, 'yellow' above 0.25, otherwise 'green'.
    """
    # Thresholds are checked from slowest to fastest; both are exclusive.
    for threshold, label in ((1.0, 'red'), (0.25, 'yellow')):
        if elapsed_time > threshold:
            return label
    return 'green'
|
|
|
|
|
|
class SubunitTestResult(testtools.TestResult):
    """
    Test result that prints colorized output grouped by test class.

    Results are buffered per test class in C{self.results} and flushed by
    C{writeTests()}. A bounded heap of the slowest tests is kept and
    summarised by C{writeSlowTests()} when the run stops.
    """

    def __init__(self, stream, descriptions, verbosity):
        """
        @param stream: Output stream; this class calls its C{write()},
            C{writeln()} and C{flush()} methods.

        @param descriptions: Accepted but not used by this class.

        @param verbosity: Values above 1 enable one line per test instead
            of a single character per test.
        """
        super(SubunitTestResult, self).__init__()
        self.stream = stream
        self.showAll = verbosity > 1
        self.num_slow_tests = 10
        self.slow_tests = []  # this is a fixed-sized heap
        self.colorizer = None
        candidates = (_Win32Colorizer, _AnsiColorizer, _NullColorizer)
        # NOTE(vish): reset stdout for the terminal check
        stdout = sys.stdout
        sys.stdout = sys.__stdout__
        try:
            for colorizer in candidates:
                if colorizer.supported():
                    self.colorizer = colorizer(self.stream)
                    break
        finally:
            # Put the caller's stdout back even if a check raised.
            sys.stdout = stdout
        self.start_time = None
        self.last_time = {}
        self.results = {}
        self.last_written = None

    def _writeElapsedTime(self, elapsed):
        """Write C{elapsed} with two decimals, colored by its magnitude."""
        color = get_elapsed_time_color(elapsed)
        self.colorizer.write("  %.2f" % elapsed, color)

    def _addResult(self, test, *args):
        """
        Buffer one test outcome, track it as a slow-test candidate, and
        trigger output.

        @param test: The test; its C{id()} is split on the last dot into
            class name and test name.

        @param args: C{(long_result, color, short_result, success)} as
            later unpacked by C{_writeResult()}.
        """
        try:
            name = test.id()
        except AttributeError:
            name = 'Unknown.unknown'
        # rpartition copes with ids that contain no dot, where unpacking
        # the result of rsplit('.', 1) would raise ValueError.
        test_class, _, test_name = name.rpartition('.')
        if not test_class:
            test_class = 'Unknown'

        elapsed = (self._now() - self.start_time).total_seconds()
        item = (elapsed, test_class, test_name)
        # Once the heap is full, pushing and popping the smallest entry
        # keeps only the num_slow_tests largest items.
        if len(self.slow_tests) >= self.num_slow_tests:
            heapq.heappushpop(self.slow_tests, item)
        else:
            heapq.heappush(self.slow_tests, item)

        self.results.setdefault(test_class, [])
        self.results[test_class].append((test_name, elapsed) + args)
        self.last_time[test_class] = self._now()
        self.writeTests()

    def _writeResult(self, test_name, elapsed, long_result, color,
                     short_result, success):
        """
        Write a single buffered result.

        In verbose mode this is a full line (name, long result, and the
        elapsed time when C{success} is true); otherwise only the
        one-character C{short_result} is written.
        """
        if self.showAll:
            self.stream.write('    %s' % str(test_name).ljust(66))
            self.colorizer.write(long_result, color)
            if success:
                self._writeElapsedTime(elapsed)
            self.stream.writeln()
        else:
            self.colorizer.write(short_result, color)

    def addSuccess(self, test):
        """Record a passing test and buffer it for display."""
        super(SubunitTestResult, self).addSuccess(test)
        self._addResult(test, 'OK', 'green', '.', True)

    def addFailure(self, test, err):
        """
        Record a failing test and buffer it for display.

        A test whose id is 'process-returncode' is ignored entirely.
        """
        if test.id() == 'process-returncode':
            return
        super(SubunitTestResult, self).addFailure(test, err)
        self._addResult(test, 'FAIL', 'red', 'F', False)

    def addError(self, test, err):
        """Record an errored test and buffer it for display."""
        # Delegate to the parent's addError (not addFailure) so the error
        # is stored in self.errors and printErrors() reports it as ERROR.
        super(SubunitTestResult, self).addError(test, err)
        self._addResult(test, 'ERROR', 'red', 'E', False)

    def addSkip(self, test, reason=None, details=None):
        """Record a skipped test and buffer it for display."""
        super(SubunitTestResult, self).addSkip(test, reason, details)
        self._addResult(test, 'SKIP', 'blue', 'S', True)

    def startTest(self, test):
        """Remember when the test started, then notify the parent."""
        self.start_time = self._now()
        super(SubunitTestResult, self).startTest(test)

    def writeTestCase(self, cls):
        """
        Flush every buffered result of test class C{cls}.

        Does nothing when C{cls} has no buffered results. The class name
        is written as a heading unless C{cls} was also the class written
        last.
        """
        if not self.results.get(cls):
            return
        if cls != self.last_written:
            self.colorizer.write(cls, 'white')
            self.stream.writeln()
        for result in self.results[cls]:
            self._writeResult(*result)
        del self.results[cls]
        self.stream.flush()
        self.last_written = cls

    def writeTests(self):
        """
        Decide which buffered test classes to flush now.

        If nothing has been written yet, or the class written last has had
        no new result for more than 2 seconds, flush buffered classes
        oldest-first, stopping after the first one whose latest result is
        at most 2 seconds old. Otherwise only flush the class written
        last.
        """
        last_seen = self.last_time.get(self.last_written, self._now())
        stale = (self._now() - last_seen).total_seconds() > 2.0
        if not self.last_written or stale:
            diff = 3.0
            # The emptiness check is defensive: min() raises ValueError on
            # an empty dict.
            while diff > 2.0 and self.results:
                oldest = min(self.results, key=lambda x: self.last_time[x])
                diff = (self._now() - self.last_time[oldest]).total_seconds()
                self.writeTestCase(oldest)
        else:
            self.writeTestCase(self.last_written)

    def done(self):
        """Alias for C{stopTestRun()}."""
        self.stopTestRun()

    def stopTestRun(self):
        """Flush all remaining buffered results, then list slow tests."""
        # Iterate over a snapshot: writeTestCase() deletes keys.
        for cls in list(self.results):
            self.writeTestCase(cls)
        self.stream.writeln()
        self.writeSlowTests()

    def writeSlowTests(self):
        """
        Print the retained slow tests, grouped by class and sorted by name.

        Tests whose elapsed time would be colored 'green' are left out;
        nothing is printed if none remain.
        """
        # Pare out 'fast' tests
        slow_tests = [item for item in self.slow_tests
                      if get_elapsed_time_color(item[0]) != 'green']
        if slow_tests:
            slow_total_time = sum(item[0] for item in slow_tests)
            slow = ("Slowest %i tests took %.2f secs:"
                    % (len(slow_tests), slow_total_time))
            self.colorizer.write(slow, 'yellow')
            self.stream.writeln()
            last_cls = None
            # sort by name
            for elapsed, cls, name in sorted(slow_tests,
                                             key=lambda x: x[1] + x[2]):
                if cls != last_cls:
                    self.colorizer.write(cls, 'white')
                    self.stream.writeln()
                last_cls = cls
                self.stream.write('    %s' % str(name).ljust(68))
                self._writeElapsedTime(elapsed)
                self.stream.writeln()

    def printErrors(self):
        """Print the recorded errors, then the recorded failures."""
        if self.showAll:
            self.stream.writeln()
        self.printErrorList('ERROR', self.errors)
        self.printErrorList('FAIL', self.failures)

    def printErrorList(self, flavor, errors):
        """
        Print each entry of C{errors} under a red banner.

        @param flavor: Label written before the test id, e.g. 'ERROR'.

        @param errors: Iterable of C{(test, err)} pairs.
        """
        for test, err in errors:
            self.colorizer.write("=" * 70, 'red')
            self.stream.writeln()
            self.colorizer.write(flavor, 'red')
            self.stream.writeln(": %s" % test.id())
            self.colorizer.write("-" * 70, 'red')
            self.stream.writeln()
            self.stream.writeln("%s" % err)
|
|
|
|
|
|
# Wrap the subunit stream arriving on stdin as a runnable test case.
test = subunit.ProtocolTestCase(sys.stdin, passthrough=None)

# The colorized result class is only passed to the runner on interpreters
# newer than 2.6; older ones get the runner's default result.
if sys.version_info[0:2] > (2, 6):
    runner = unittest.TextTestRunner(
        verbosity=2, resultclass=SubunitTestResult)
else:
    runner = unittest.TextTestRunner(verbosity=2)

# Exit status: 0 when the replayed run was successful, 1 otherwise.
exit_code = 0 if runner.run(test).wasSuccessful() else 1
sys.exit(exit_code)
|