Add a StreamResult safe for use in threaded/concurrent code.

This commit is contained in:
Robert Collins
2013-03-16 19:45:05 +13:00
parent 04384df526
commit 877058e8dc
8 changed files with 354 additions and 8 deletions

6
NEWS
View File

@@ -27,6 +27,9 @@ Improvements
* New class ``StreamResult`` which defines the API for the new result type.
(Robert Collins)
* New support class ``ConcurrentStreamTestSuite`` for convenient construction
and utilisation of ``StreamToQueue`` objects. (Robert Collins)
* New support class ``CopyStreamResult`` which forwards events onto multiple
``StreamResult`` objects (each of which receives all the events).
(Robert Collins)
@@ -62,6 +65,9 @@ Improvements
``TestResult``) calls. This permits using un-migrated result objects with
new runners / tests. (Robert Collins)
* New support class ``StreamToQueue`` for sending messages to one
``StreamResult`` from multiple threads. (Robert Collins)
* New support class ``TimestampingStreamResult`` which adds a timestamp to
events with no timestamp. (Robert Collins)

View File

@@ -233,13 +233,12 @@ response to events from the ``StreamResult`` API. Useful when outputting
``StreamResult`` events from a ``TestCase`` but the supplied ``TestResult``
does not support the ``status`` and ``file`` methods.
ThreadsafeStreamResult
----------------------
StreamToQueue
-------------
This is a ``StreamResult`` decorator for reporting tests from multiple threads
at once. Each method takes out a lock around the decorated result to prevent
race conditions. The ``startTestRun`` and ``stopTestRun`` methods are not
forwarded to prevent the decorated result having them called multiple times.
at once. Each method submits an event to a supplied Queue object as a simple
dict. See ``ConcurrentStreamTestSuite`` for a convenient way to use this.
TimestampingStreamResult
------------------------
@@ -358,6 +357,20 @@ ConcurrentTestSuite uses the helper to get a number of separate runnable
objects with a run(result), runs them all in threads using the
ThreadsafeForwardingResult to coalesce their activity.
ConcurrentStreamTestSuite
-------------------------
A variant of ConcurrentTestSuite that uses the new StreamResult API instead of
the TestResult API. ConcurrentStreamTestSuite coordinates running some number
of test/suites concurrently, with one StreamToQueue per test/suite.
Each test/suite gets given its own ExtendedToStreamDecorator +
TimestampingStreamResult wrapped StreamToQueue instance, forwarding onto the
StreamResult that ConcurrentStreamTestSuite.run was called with.
ConcurrentStreamTestSuite is a thin shim and it is easy to implement your own
specialised form if that is needed.
FixtureSuite
------------

View File

@@ -6,6 +6,7 @@ __all__ = [
'clone_test_with_new_id',
'CopyStreamResult',
'ConcurrentTestSuite',
'ConcurrentStreamTestSuite',
'DecorateTestCaseResult',
'ErrorHolder',
'ExpectedException',
@@ -34,6 +35,7 @@ __all__ = [
'StreamTagger',
'StreamToDict',
'StreamToExtendedDecorator',
'StreamToQueue',
'TestControl',
'ThreadsafeForwardingResult',
'TimestampingStreamResult',
@@ -86,6 +88,7 @@ else:
StreamTagger,
StreamToDict,
StreamToExtendedDecorator,
StreamToQueue,
Tagger,
TestByTestResult,
TestControl,
@@ -97,6 +100,7 @@ else:
)
from testtools.testsuite import (
ConcurrentTestSuite,
ConcurrentStreamTestSuite,
FixtureSuite,
iterate_tests,
)

View File

@@ -13,6 +13,7 @@ __all__ = [
'StreamTagger',
'StreamToDict',
'StreamToExtendedDecorator',
'StreamToQueue',
'Tagger',
'TestByTestResult',
'TestControl',
@@ -34,6 +35,7 @@ from testtools.testresult.real import (
StreamTagger,
StreamToDict,
StreamToExtendedDecorator,
StreamToQueue,
Tagger,
TestByTestResult,
TestControl,

View File

@@ -13,6 +13,7 @@ __all__ = [
'StreamTagger',
'StreamToDict',
'StreamToExtendedDecorator',
'StreamToQueue',
'Tagger',
'TestControl',
'TestResult',
@@ -26,8 +27,9 @@ from operator import methodcaller
import sys
import unittest
from extras import safe_hasattr, try_import
from extras import safe_hasattr, try_import, try_imports
parse_mime_type = try_import('mimeparse.parse_mime_type')
Queue = try_imports(['Queue.Queue', 'queue.Queue'])
from testtools.compat import all, str_is_unicode, _u, _b
from testtools.content import (
@@ -1325,6 +1327,68 @@ class StreamToExtendedDecorator(StreamResult):
case.run(self.decorated)
class StreamToQueue(StreamResult):
"""A StreamResult which enqueues events as a dict to a queue.Queue.
Events have their route code updated to include the route code
StreamToQueue was constructed with before they are submitted. If the event
route code is None, it is replaced with the StreamToQueue route code,
otherwise it is prefixed with the supplied code + a hyphen.
startTestRun and stopTestRun are forwarded to the queue. Implementors that
dequeue events back into StreamResult calls should take care not to call
startTestRun / stopTestRun on other StreamResult objects multiple times
(e.g. by filtering startTestRun and stopTestRun).
``StreamToQueue`` is typically used by
``ConcurrentStreamTestSuite``, which creates one ``StreamToQueue``
per thread, forwards status events to the the StreamResult that
``ConcurrentStreamTestSuite.run()`` was called with, and uses the
stopTestRun event to trigger calling join() on the each thread.
Unlike ThreadsafeForwardingResult which this supercedes, no buffering takes
place - any event supplied to a StreamToQueue will be inserted into the
queue immediately.
Events are forwarded as a dict with a key ``event`` which is one of
``startTestRun``, ``stopTestRun`` or ``status``. When ``event`` is
``status`` the dict also has keys matching the keyword arguments
of ``StreamResult.status``, otherwise it has one other key ``result`` which
is the result that invoked ``startTestRun``.
"""
def __init__(self, queue, routing_code):
"""Create a StreamToQueue forwarding to target.
:param queue: A ``queue.Queue`` to receive events.
:param routing_code: The routing code to apply to messages.
"""
super(StreamToQueue, self).__init__()
self.queue = queue
self.routing_code = routing_code
def startTestRun(self):
self.queue.put(dict(event='startTestRun', result=self))
def status(self, test_id=None, test_status=None, test_tags=None,
runnable=True, file_name=None, file_bytes=None, eof=False,
mime_type=None, route_code=None, timestamp=None):
self.queue.put(dict(event='status', test_id=test_id,
test_status=test_status, test_tags=test_tags, runnable=runnable,
file_name=file_name, file_bytes=file_bytes, eof=eof,
mime_type=mime_type, route_code=self.route_code(route_code),
timestamp=timestamp))
def stopTestRun(self):
self.queue.put(dict(event='stopTestRun', result=self))
def route_code(self, route_code):
"""Adjust route_code on the way through."""
if route_code is None:
return self.routing_code
return self.routing_code + _u("/") + route_code
class TestResultDecorator(object):
"""General pass-through decorator.

View File

@@ -16,7 +16,9 @@ import threading
from unittest import TestSuite
import warnings
from extras import safe_hasattr
from extras import safe_hasattr, try_imports
Queue = try_imports(['Queue.Queue', 'queue.Queue'])
from testtools import (
CopyStreamResult,
@@ -30,6 +32,7 @@ from testtools import (
StreamTagger,
StreamToDict,
StreamToExtendedDecorator,
StreamToQueue,
Tagger,
TestCase,
TestControl,
@@ -583,6 +586,13 @@ class TestStreamToExtendedDecoratorContract(TestCase, TestStreamResultContract):
return StreamToExtendedDecorator(ExtendedTestResult())
class TestStreamToQueueContract(TestCase, TestStreamResultContract):
def _make_result(self):
queue = Queue()
return StreamToQueue(queue, "foo")
class TestStreamFailFastContract(TestCase, TestStreamResultContract):
def _make_result(self):
@@ -1683,6 +1693,54 @@ class TestMergeTags(TestCase):
expected, _merge_tags(current_tags, changing_tags))
class TestStreamToQueue(TestCase):
def make_result(self):
queue = Queue()
return queue, StreamToQueue(queue, "foo")
def test_status(self):
def check_event(event_dict, route=None, time=None):
self.assertEqual("status", event_dict['event'])
self.assertEqual("test", event_dict['test_id'])
self.assertEqual("fail", event_dict['test_status'])
self.assertEqual(set(["quux"]), event_dict['test_tags'])
self.assertEqual(False, event_dict['runnable'])
self.assertEqual("file", event_dict['file_name'])
self.assertEqual(_b("content"), event_dict['file_bytes'])
self.assertEqual(True, event_dict['eof'])
self.assertEqual("quux", event_dict['mime_type'])
self.assertEqual("test", event_dict['test_id'])
self.assertEqual(route, event_dict['route_code'])
self.assertEqual(time, event_dict['timestamp'])
queue, result = self.make_result()
result.status("test", "fail", test_tags=set(["quux"]), runnable=False,
file_name="file", file_bytes=_b("content"), eof=True,
mime_type="quux", route_code=None, timestamp=None)
self.assertEqual(1, queue.qsize())
a_time = datetime.datetime.now(utc)
result.status("test", "fail", test_tags=set(["quux"]), runnable=False,
file_name="file", file_bytes=_b("content"), eof=True,
mime_type="quux", route_code="bar", timestamp=a_time)
self.assertEqual(2, queue.qsize())
check_event(queue.get(False), route="foo", time=None)
check_event(queue.get(False), route="foo/bar", time=a_time)
def testStartTestRun(self):
queue, result = self.make_result()
result.startTestRun()
self.assertEqual(
{'event':'startTestRun', 'result':result}, queue.get(False))
self.assertTrue(queue.empty())
def testStopTestRun(self):
queue, result = self.make_result()
result.stopTestRun()
self.assertEqual(
{'event':'stopTestRun', 'result':result}, queue.get(False))
self.assertTrue(queue.empty())
class TestExtendedToOriginalResultDecoratorBase(TestCase):
def make_26_result(self):

View File

@@ -4,6 +4,8 @@
__metaclass__ = type
import doctest
from functools import partial
import sys
import unittest
@@ -11,14 +13,17 @@ from extras import try_import
from testtools import (
ConcurrentTestSuite,
ConcurrentStreamTestSuite,
iterate_tests,
PlaceHolder,
TestByTestResult,
TestCase,
)
from testtools.compat import _u
from testtools.compat import _b, _u
from testtools.matchers import DocTestMatches
from testtools.testsuite import FixtureSuite, iterate_tests, sorted_tests
from testtools.tests.helpers import LoggingResult
from testtools.testresult.doubles import StreamResult as LoggingStream
FunctionFixture = try_import('fixtures.FunctionFixture')
@@ -30,6 +35,7 @@ class Sample(TestCase):
def test_method2(self):
pass
class TestConcurrentTestSuiteRun(TestCase):
def test_broken_test(self):
@@ -89,6 +95,114 @@ class TestConcurrentTestSuiteRun(TestCase):
return list(iterate_tests(suite))
class TestConcurrentStreamTestSuiteRun(TestCase):
def test_trivial(self):
result = LoggingStream()
test1 = Sample('test_method1')
test2 = Sample('test_method2')
cases = lambda:[(test1, '0'), (test2, '1')]
suite = ConcurrentStreamTestSuite(cases)
suite.run(result)
def freeze(set_or_none):
if set_or_none is None:
return set_or_none
return frozenset(set_or_none)
# Ignore event order: we're testing the code is all glued together,
# which just means we can pump events through and they get route codes
# added appropriately.
self.assertEqual(set([
('status',
'testtools.tests.test_testsuite.Sample.test_method1',
'inprogress',
None,
True,
None,
None,
False,
None,
'0',
None,
),
('status',
'testtools.tests.test_testsuite.Sample.test_method1',
'success',
frozenset(),
True,
None,
None,
False,
None,
'0',
None,
),
('status',
'testtools.tests.test_testsuite.Sample.test_method2',
'inprogress',
None,
True,
None,
None,
False,
None,
'1',
None,
),
('status',
'testtools.tests.test_testsuite.Sample.test_method2',
'success',
frozenset(),
True,
None,
None,
False,
None,
'1',
None,
),
]), set(event[0:3] + (freeze(event[3]),) + event[4:10] + (None,)
for event in result._events))
def test_broken_runner(self):
# If the object called breaks, the stream is informed about it
# regardless.
class BrokenTest(object):
# broken - no result parameter!
def __call__(self):
pass
def run(self):
pass
result = LoggingStream()
cases = lambda:[(BrokenTest(), '0')]
suite = ConcurrentStreamTestSuite(cases)
suite.run(result)
events = result._events
# Check the traceback loosely.
self.assertThat(events[1][6].decode('utf8'), DocTestMatches("""\
Traceback (most recent call last):
File "...testtools/testsuite.py", line ..., in _run_test
test.run(process_result)
TypeError: run() takes ...1 ...argument...2...given...
""", doctest.ELLIPSIS))
events = [event[0:10] + (None,) for event in events]
events[1] = events[1][:6] + (None,) + events[1][7:]
self.assertEqual([
('status', "broken-runner-'0'", 'inprogress', None, True, None, None, False, None, _u('0'), None),
('status', "broken-runner-'0'", None, None, True, 'traceback', None,
False,
'text/x-traceback; charset="utf8"; language="python"',
'0',
None),
('status', "broken-runner-'0'", None, None, True, 'traceback', b'', True,
'text/x-traceback; charset="utf8"; language="python"', '0', None),
('status', "broken-runner-'0'", 'fail', set(), True, None, None, False, None, _u('0'), None)
], events)
def split_suite(self, suite):
tests = list(enumerate(iterate_tests(suite)))
return [(test, _u(str(pos))) for pos, test in tests]
class TestFixtureSuite(TestCase):
def setUp(self):

View File

@@ -5,6 +5,7 @@
__metaclass__ = type
__all__ = [
'ConcurrentTestSuite',
'ConcurrentStreamTestSuite',
'filter_by_ids',
'iterate_tests',
'sorted_tests',
@@ -112,6 +113,90 @@ class ConcurrentTestSuite(unittest.TestSuite):
queue.put(test)
class ConcurrentStreamTestSuite(object):
"""A TestSuite whose run() parallelises."""
def __init__(self, make_tests):
"""Create a ConcurrentTestSuite to execute tests returned by make_tests.
:param make_tests: A helper function that should return some number
of concurrently executable test suite / test case objects.
make_tests must take no parameters and return an iterable of
tuples. Each tuple must be of the form (case, route_code), where
case is a TestCase-like object with a run(result) method, and
route_code is either None or a unicode string.
"""
super(ConcurrentStreamTestSuite, self).__init__()
self.make_tests = make_tests
def run(self, result):
"""Run the tests concurrently.
This calls out to the provided make_tests helper to determine the
concurrency to use and to assign routing codes to each worker.
ConcurrentTestSuite provides no special mechanism to stop the tests
returned by make_tests, it is up to the made tests to honour the
shouldStop attribute on the result object they are run with, which will
be set if the test run is to be aborted.
The tests are run with an ExtendedToStreamDecorator wrapped around a
StreamToQueue instance. ConcurrentStreamTestSuite dequeues events from
the queue and forwards them to result. Tests can therefore be either
original unittest tests (or compatible tests), or new tests that emit
StreamResult events directly.
:param result: A StreamResult instance. The caller is responsible for
calling startTestRun on this instance prior to invoking suite.run,
and stopTestRun subsequent to the run method returning.
"""
tests = self.make_tests()
try:
threads = {}
queue = Queue()
for test, route_code in tests:
to_queue = testtools.StreamToQueue(queue, route_code)
process_result = testtools.ExtendedToStreamDecorator(
testtools.TimestampingStreamResult(to_queue))
runner_thread = threading.Thread(
target=self._run_test,
args=(test, process_result, route_code))
threads[to_queue] = runner_thread, process_result
runner_thread.start()
while threads:
event_dict = queue.get()
event = event_dict.pop('event')
if event == 'status':
result.status(**event_dict)
elif event == 'stopTestRun':
thread = threads.pop(event_dict['result'])[0]
thread.join()
elif event == 'startTestRun':
pass
else:
raise ValueError('unknown event type %r' % (event,))
except:
for thread, process_result in threads.values():
# Signal to each TestControl in the ExtendedToStreamDecorator
# that the thread should stop running tests and cleanup
process_result.stop()
raise
def _run_test(self, test, process_result, route_code):
process_result.startTestRun()
try:
try:
test.run(process_result)
except Exception as e:
# The run logic itself failed.
case = testtools.ErrorHolder(
"broken-runner-'%s'" % (route_code,),
error=sys.exc_info())
case.run(process_result)
finally:
process_result.stopTestRun()
class FixtureSuite(unittest.TestSuite):
def __init__(self, fixture, tests):