AWS EC2 and VPC API support in standalone service for OpenStack.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.

colorizer.py 12KB


  1. #!/usr/bin/env python
  2. # Copyright (c) 2013, Nebula, Inc.
  3. # Copyright 2010 United States Government as represented by the
  4. # Administrator of the National Aeronautics and Space Administration.
  5. # All Rights Reserved.
  6. #
  7. # Licensed under the Apache License, Version 2.0 (the "License"); you may
  8. # not use this file except in compliance with the License. You may obtain
  9. # a copy of the License at
  10. #
  11. # http://www.apache.org/licenses/LICENSE-2.0
  12. #
  13. # Unless required by applicable law or agreed to in writing, software
  14. # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
  15. # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
  16. # License for the specific language governing permissions and limitations
  17. # under the License.
  18. #
  19. # Colorizer Code is borrowed from Twisted:
  20. # Copyright (c) 2001-2010 Twisted Matrix Laboratories.
  21. #
  22. # Permission is hereby granted, free of charge, to any person obtaining
  23. # a copy of this software and associated documentation files (the
  24. # "Software"), to deal in the Software without restriction, including
  25. # without limitation the rights to use, copy, modify, merge, publish,
  26. # distribute, sublicense, and/or sell copies of the Software, and to
  27. # permit persons to whom the Software is furnished to do so, subject to
  28. # the following conditions:
  29. #
  30. # The above copyright notice and this permission notice shall be
  31. # included in all copies or substantial portions of the Software.
  32. #
  33. # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
  34. # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
  35. # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
  36. # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
  37. # LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
  38. # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
  39. # WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
  40. """Display a subunit stream through a colorized unittest test runner."""
  41. import heapq
  42. import sys
  43. import unittest
  44. import subunit
  45. import testtools
  46. class _AnsiColorizer(object):
  47. """
  48. A colorizer is an object that loosely wraps around a stream, allowing
  49. callers to write text to the stream in a particular color.
  50. Colorizer classes must implement C{supported()} and C{write(text, color)}.
  51. """
  52. _colors = dict(black=30, red=31, green=32, yellow=33,
  53. blue=34, magenta=35, cyan=36, white=37)
  54. def __init__(self, stream):
  55. self.stream = stream
  56. def supported(cls, stream=sys.stdout):
  57. """
  58. A class method that returns True if the current platform supports
  59. coloring terminal output using this method. Returns False otherwise.
  60. """
  61. if not stream.isatty():
  62. return False # auto color only on TTYs
  63. try:
  64. import curses
  65. except ImportError:
  66. return False
  67. else:
  68. try:
  69. try:
  70. return curses.tigetnum("colors") > 2
  71. except curses.error:
  72. curses.setupterm()
  73. return curses.tigetnum("colors") > 2
  74. except Exception:
  75. # guess false in case of error
  76. return False
  77. supported = classmethod(supported)
  78. def write(self, text, color):
  79. """
  80. Write the given text to the stream in the given color.
  81. @param text: Text to be written to the stream.
  82. @param color: A string label for a color. e.g. 'red', 'white'.
  83. """
  84. color = self._colors[color]
  85. self.stream.write('\x1b[%s;1m%s\x1b[0m' % (color, text))
  86. class _Win32Colorizer(object):
  87. """
  88. See _AnsiColorizer docstring.
  89. """
  90. def __init__(self, stream):
  91. import win32console
  92. red, green, blue, bold = (win32console.FOREGROUND_RED,
  93. win32console.FOREGROUND_GREEN,
  94. win32console.FOREGROUND_BLUE,
  95. win32console.FOREGROUND_INTENSITY)
  96. self.stream = stream
  97. self.screenBuffer = win32console.GetStdHandle(
  98. win32console.STD_OUT_HANDLE)
  99. self._colors = {'normal': red | green | blue,
  100. 'red': red | bold,
  101. 'green': green | bold,
  102. 'blue': blue | bold,
  103. 'yellow': red | green | bold,
  104. 'magenta': red | blue | bold,
  105. 'cyan': green | blue | bold,
  106. 'white': red | green | blue | bold}
  107. def supported(cls, stream=sys.stdout):
  108. try:
  109. import win32console
  110. screenBuffer = win32console.GetStdHandle(
  111. win32console.STD_OUT_HANDLE)
  112. except ImportError:
  113. return False
  114. import pywintypes
  115. try:
  116. screenBuffer.SetConsoleTextAttribute(
  117. win32console.FOREGROUND_RED |
  118. win32console.FOREGROUND_GREEN |
  119. win32console.FOREGROUND_BLUE)
  120. except pywintypes.error:
  121. return False
  122. else:
  123. return True
  124. supported = classmethod(supported)
  125. def write(self, text, color):
  126. color = self._colors[color]
  127. self.screenBuffer.SetConsoleTextAttribute(color)
  128. self.stream.write(text)
  129. self.screenBuffer.SetConsoleTextAttribute(self._colors['normal'])
  130. class _NullColorizer(object):
  131. """
  132. See _AnsiColorizer docstring.
  133. """
  134. def __init__(self, stream):
  135. self.stream = stream
  136. def supported(cls, stream=sys.stdout):
  137. return True
  138. supported = classmethod(supported)
  139. def write(self, text, color):
  140. self.stream.write(text)
  141. def get_elapsed_time_color(elapsed_time):
  142. if elapsed_time > 1.0:
  143. return 'red'
  144. elif elapsed_time > 0.25:
  145. return 'yellow'
  146. else:
  147. return 'green'
  148. class EC2ApiTestResult(testtools.TestResult):
  149. def __init__(self, stream, descriptions, verbosity):
  150. super(EC2ApiTestResult, self).__init__()
  151. self.stream = stream
  152. self.showAll = verbosity > 1
  153. self.num_slow_tests = 10
  154. self.slow_tests = [] # this is a fixed-sized heap
  155. self.colorizer = None
  156. # NOTE(vish): reset stdout for the terminal check
  157. stdout = sys.stdout
  158. sys.stdout = sys.__stdout__
  159. for colorizer in [_Win32Colorizer, _AnsiColorizer, _NullColorizer]:
  160. if colorizer.supported():
  161. self.colorizer = colorizer(self.stream)
  162. break
  163. sys.stdout = stdout
  164. self.start_time = None
  165. self.last_time = {}
  166. self.results = {}
  167. self.last_written = None
  168. def _writeElapsedTime(self, elapsed):
  169. color = get_elapsed_time_color(elapsed)
  170. self.colorizer.write(" %.2f" % elapsed, color)
  171. def _addResult(self, test, *args):
  172. try:
  173. name = test.id()
  174. except AttributeError:
  175. name = 'Unknown.unknown'
  176. test_class, test_name = name.rsplit('.', 1)
  177. elapsed = (self._now() - self.start_time).total_seconds()
  178. item = (elapsed, test_class, test_name)
  179. if len(self.slow_tests) >= self.num_slow_tests:
  180. heapq.heappushpop(self.slow_tests, item)
  181. else:
  182. heapq.heappush(self.slow_tests, item)
  183. self.results.setdefault(test_class, [])
  184. self.results[test_class].append((test_name, elapsed) + args)
  185. self.last_time[test_class] = self._now()
  186. self.writeTests()
  187. def _writeResult(self, test_name, elapsed, long_result, color,
  188. short_result, success):
  189. if self.showAll:
  190. self.stream.write(' %s' % str(test_name).ljust(66))
  191. self.colorizer.write(long_result, color)
  192. if success:
  193. self._writeElapsedTime(elapsed)
  194. self.stream.writeln()
  195. else:
  196. self.colorizer.write(short_result, color)
  197. def addSuccess(self, test):
  198. super(EC2ApiTestResult, self).addSuccess(test)
  199. self._addResult(test, 'OK', 'green', '.', True)
  200. def addFailure(self, test, err):
  201. if test.id() == 'process-returncode':
  202. return
  203. super(EC2ApiTestResult, self).addFailure(test, err)
  204. self._addResult(test, 'FAIL', 'red', 'F', False)
  205. def addError(self, test, err):
  206. super(EC2ApiTestResult, self).addFailure(test, err)
  207. self._addResult(test, 'ERROR', 'red', 'E', False)
  208. def addSkip(self, test, reason=None, details=None):
  209. super(EC2ApiTestResult, self).addSkip(test, reason, details)
  210. self._addResult(test, 'SKIP', 'blue', 'S', True)
  211. def startTest(self, test):
  212. self.start_time = self._now()
  213. super(EC2ApiTestResult, self).startTest(test)
  214. def writeTestCase(self, cls):
  215. if not self.results.get(cls):
  216. return
  217. if cls != self.last_written:
  218. self.colorizer.write(cls, 'white')
  219. self.stream.writeln()
  220. for result in self.results[cls]:
  221. self._writeResult(*result)
  222. del self.results[cls]
  223. self.stream.flush()
  224. self.last_written = cls
  225. def writeTests(self):
  226. time = self.last_time.get(self.last_written, self._now())
  227. if not self.last_written or (self._now() - time).total_seconds() > 2.0:
  228. diff = 3.0
  229. while diff > 2.0:
  230. classes = self.results.keys()
  231. oldest = min(classes, key=lambda x: self.last_time[x])
  232. diff = (self._now() - self.last_time[oldest]).total_seconds()
  233. self.writeTestCase(oldest)
  234. else:
  235. self.writeTestCase(self.last_written)
  236. def done(self):
  237. self.stopTestRun()
  238. def stopTestRun(self):
  239. for cls in list(self.results.iterkeys()):
  240. self.writeTestCase(cls)
  241. self.stream.writeln()
  242. self.writeSlowTests()
  243. def writeSlowTests(self):
  244. # Pare out 'fast' tests
  245. slow_tests = [item for item in self.slow_tests
  246. if get_elapsed_time_color(item[0]) != 'green']
  247. if slow_tests:
  248. slow_total_time = sum(item[0] for item in slow_tests)
  249. slow = ("Slowest %i tests took %.2f secs:"
  250. % (len(slow_tests), slow_total_time))
  251. self.colorizer.write(slow, 'yellow')
  252. self.stream.writeln()
  253. last_cls = None
  254. # sort by name
  255. for elapsed, cls, name in sorted(slow_tests,
  256. key=lambda x: x[1] + x[2]):
  257. if cls != last_cls:
  258. self.colorizer.write(cls, 'white')
  259. self.stream.writeln()
  260. last_cls = cls
  261. self.stream.write(' %s' % str(name).ljust(68))
  262. self._writeElapsedTime(elapsed)
  263. self.stream.writeln()
  264. def printErrors(self):
  265. if self.showAll:
  266. self.stream.writeln()
  267. self.printErrorList('ERROR', self.errors)
  268. self.printErrorList('FAIL', self.failures)
  269. def printErrorList(self, flavor, errors):
  270. for test, err in errors:
  271. self.colorizer.write("=" * 70, 'red')
  272. self.stream.writeln()
  273. self.colorizer.write(flavor, 'red')
  274. self.stream.writeln(": %s" % test.id())
  275. self.colorizer.write("-" * 70, 'red')
  276. self.stream.writeln()
  277. self.stream.writeln("%s" % err)
  278. test = subunit.ProtocolTestCase(sys.stdin, passthrough=None)
  279. if sys.version_info[0:2] <= (2, 6):
  280. runner = unittest.TextTestRunner(verbosity=2)
  281. else:
  282. runner = unittest.TextTestRunner(verbosity=2, resultclass=EC2ApiTestResult)
  283. if runner.run(test).wasSuccessful():
  284. exit_code = 0
  285. else:
  286. exit_code = 1
  287. sys.exit(exit_code)