Port existing filters to v2.

This commit is contained in:
Robert Collins
2013-03-03 22:28:51 +13:00
parent 1565f398b5
commit b1d5d5881f
12 changed files with 358 additions and 382 deletions

View File

@@ -33,7 +33,7 @@ def make_options(description):
def main():
parser = make_options(__doc__)
(options, args) = parser.parse_args()
case = ByteStreamToStreamResult(sys.stdin)
case = ByteStreamToStreamResult(sys.stdin, non_subunit_name='stdout')
result = StreamToExtendedDecorator(TestProtocolClient(sys.stdout))
# What about stdout chunks?
result.startTestRun()

View File

@@ -19,6 +19,7 @@
import pygtk
pygtk.require('2.0')
import pynotify
from testtools import StreamToExtendedDecorator
from subunit import TestResultStats
from subunit.filters import run_filter_script
@@ -28,6 +29,7 @@ if not pynotify.init("Subunit-notify"):
def notify_of_result(result):
result = result.decorated
if result.failed_tests > 0:
summary = "Test run failed"
else:
@@ -41,4 +43,6 @@ def notify_of_result(result):
nw.show()
run_filter_script(TestResultStats, __doc__, notify_of_result)
run_filter_script(
lambda output:StreamToExtendedDecorator(TestResultStats(output)),
__doc__, notify_of_result, protocol_version=2)

View File

@@ -16,26 +16,17 @@
"""Filter a subunit stream to get aggregate statistics."""
from optparse import OptionParser
import sys
import unittest
from subunit import DiscardStream, ProtocolTestCase, TestResultStats
from testtools import StreamToExtendedDecorator
from subunit import TestResultStats
from subunit.filters import run_filter_script
parser = OptionParser(description=__doc__)
parser.add_option("--no-passthrough", action="store_true",
help="Hide all non subunit input.", default=False, dest="no_passthrough")
(options, args) = parser.parse_args()
result = TestResultStats(sys.stdout)
if options.no_passthrough:
passthrough_stream = DiscardStream()
else:
passthrough_stream = None
test = ProtocolTestCase(sys.stdin, passthrough=passthrough_stream)
test.run(result)
result.formatStats()
if result.wasSuccessful():
exit_code = 0
else:
exit_code = 1
sys.exit(exit_code)
def show_stats(r):
r.decorated.formatStats()
run_filter_script(
lambda output:StreamToExtendedDecorator(result),
__doc__, show_stats, protocol_version=2)

View File

@@ -16,8 +16,11 @@
"""Turn a subunit stream into a CSV"""
from testtools import StreamToExtendedDecorator
from subunit.filters import run_filter_script
from subunit.test_results import CsvResult
run_filter_script(CsvResult, __doc__)
run_filter_script(lambda output:StreamToExtendedDecorator(CsvResult(output)),
__doc__, protocol_version=2)

View File

@@ -46,17 +46,20 @@
"""Display a subunit stream in a gtk progress window."""
import sys
import threading
import unittest
import pygtk
pygtk.require('2.0')
import gtk, gtk.gdk, gobject
from testtools import StreamToExtendedDecorator
from subunit import (
PROGRESS_POP,
PROGRESS_PUSH,
PROGRESS_SET,
TestProtocolServer,
ByteStreamToStreamResult,
)
from subunit.progress_model import ProgressModel
@@ -139,6 +142,9 @@ class GTKTestResult(unittest.TestResult):
def stopTest(self, test):
super(GTKTestResult, self).stopTest(test)
gobject.idle_add(self._stopTest)
def _stopTest(self):
self.progress_model.advance()
if self.progress_model.width() == 0:
self.pbar.pulse()
@@ -153,26 +159,26 @@ class GTKTestResult(unittest.TestResult):
super(GTKTestResult, self).stopTestRun()
except AttributeError:
pass
self.pbar.set_text('Finished')
gobject.idle_add(self.pbar.set_text, 'Finished')
def addError(self, test, err):
super(GTKTestResult, self).addError(test, err)
self.update_counts()
gobject.idle_add(self.update_counts)
def addFailure(self, test, err):
super(GTKTestResult, self).addFailure(test, err)
self.update_counts()
gobject.idle_add(self.update_counts)
def addSuccess(self, test):
super(GTKTestResult, self).addSuccess(test)
self.update_counts()
gobject.idle_add(self.update_counts)
def addSkip(self, test, reason):
# addSkip is new in Python 2.7/3.1
addSkip = getattr(super(GTKTestResult, self), 'addSkip', None)
if callable(addSkip):
addSkip(test, reason)
self.update_counts()
gobject.idle_add(self.update_counts)
def addExpectedFailure(self, test, err):
# addExpectedFailure is new in Python 2.7/3.1
@@ -180,7 +186,7 @@ class GTKTestResult(unittest.TestResult):
'addExpectedFailure', None)
if callable(addExpectedFailure):
addExpectedFailure(test, err)
self.update_counts()
gobject.idle_add(self.update_counts)
def addUnexpectedSuccess(self, test):
# addUnexpectedSuccess is new in Python 2.7/3.1
@@ -188,7 +194,7 @@ class GTKTestResult(unittest.TestResult):
'addUnexpectedSuccess', None)
if callable(addUnexpectedSuccess):
addUnexpectedSuccess(test)
self.update_counts()
gobject.idle_add(self.update_counts)
def progress(self, offset, whence):
if whence == PROGRESS_PUSH:
@@ -212,47 +218,22 @@ class GTKTestResult(unittest.TestResult):
self.ok_label.set_text(str(self.testsRun - bad))
self.not_ok_label.set_text(str(bad))
class GIOProtocolTestCase(object):
def __init__(self, stream, result, on_finish):
self.stream = stream
self.schedule_read()
self.hup_id = gobject.io_add_watch(stream, gobject.IO_HUP, self.hup)
self.protocol = TestProtocolServer(result)
self.on_finish = on_finish
def read(self, source, condition, all=False):
#NB: \o/ actually blocks
line = source.readline()
if not line:
self.protocol.lostConnection()
self.on_finish()
return False
self.protocol.lineReceived(line)
# schedule more IO shortly - if we say we're willing to do it
# immediately we starve things.
if not all:
source_id = gobject.timeout_add(1, self.schedule_read)
return False
else:
return True
def schedule_read(self):
self.read_id = gobject.io_add_watch(self.stream, gobject.IO_IN, self.read)
def hup(self, source, condition):
while self.read(source, condition, all=True): pass
self.protocol.lostConnection()
gobject.source_remove(self.read_id)
self.on_finish()
return False
result = GTKTestResult()
test = GIOProtocolTestCase(sys.stdin, result, result.stopTestRun)
gobject.threads_init()
result = StreamToExtendedDecorator(GTKTestResult())
test = ByteStreamToStreamResult(sys.stdin, non_subunit_name='stdout')
# Get setup
while gtk.events_pending():
gtk.main_iteration()
# Start IO
def run_and_finish():
test.run(result)
result.stopTestRun()
t = threading.Thread(target=run_and_finish)
t.daemon = True
result.startTestRun()
t.start()
gtk.main()
if result.wasSuccessful():
if result.decorated.wasSuccessful():
exit_code = 0
else:
exit_code = 1

View File

@@ -18,6 +18,9 @@
import sys
from testtools import StreamToExtendedDecorator
from subunit.filters import run_filter_script
try:
@@ -28,4 +31,6 @@ except ImportError:
raise
run_filter_script(JUnitXmlResult, __doc__)
run_filter_script(
lambda output:StreamToExtendedDecorator(JUnitXmlResult(output)), __doc__,
protocol_version=2)

View File

@@ -16,11 +16,15 @@
"""Display a subunit stream through python's unittest test runner."""
from operator import methodcaller
from optparse import OptionParser
import sys
import unittest
from subunit import DiscardStream, ProtocolTestCase, TestProtocolServer
from testtools import StreamToExtendedDecorator, DecorateTestCaseResult, StreamResultRouter
from subunit import ByteStreamToStreamResult
from subunit.test_results import CatFiles
parser = OptionParser(description=__doc__)
parser.add_option("--no-passthrough", action="store_true",
@@ -29,11 +33,16 @@ parser.add_option("--progress", action="store_true",
help="Use bzrlib's test reporter (requires bzrlib)",
default=False)
(options, args) = parser.parse_args()
if options.no_passthrough:
passthrough_stream = DiscardStream()
else:
passthrough_stream = None
test = ProtocolTestCase(sys.stdin, passthrough=passthrough_stream)
test = ByteStreamToStreamResult(sys.stdin, non_subunit_name='stdout')
def wrap_result(result):
result = StreamToExtendedDecorator(result)
if not options.no_passthrough:
result = StreamResultRouter(result)
result.map(CatFiles(sys.stdout), 'test_id', test_id=None)
return result
test = DecorateTestCaseResult(test, wrap_result,
before_run=methodcaller('startTestRun'),
after_run=methodcaller('stopTestRun'))
if options.progress:
from bzrlib.tests import TextTestRunner
from bzrlib import ui