Add an adapter to convert ExtendedTestResult to StreamResult.

This permits using code that uses any Python unittest test code
with a StreamResult.
This commit is contained in:
Robert Collins
2013-03-15 05:27:57 +13:00
parent 4598acee88
commit 0175baf265
6 changed files with 221 additions and 1 deletions

10
NEWS
View File

@@ -41,6 +41,16 @@ Improvements
* New support class ``StreamFailFast`` which calls a ``TestControl`` instance
to abort the test run when a failure is detected. (Robert Collins)
* New support class ``ExtendedToStreamDecorator`` which translates both regular
unittest TestResult API calls and the ExtendedTestResult API which testtools
has supported into the StreamResult API. ExtendedToStreamDecorator also
forwards calls made in the StreamResult API, permitting it to be used
anywhere a StreamResult is used. Key TestResult query methods like
wasSuccessful and shouldStop are synchronised with the StreamResult API
calls, but the detailed statistics like the list of errors are not - a
separate consumer will be created to support that.
(Robert Collins)
* New ``TestCase`` decorator ``DecorateTestCaseResult`` that adapts the
``TestResult`` or ``StreamResult`` a case will be run with, for ensuring that
a particular result object is used even if the runner running the test doesn't

View File

@@ -204,6 +204,15 @@ with the result. e.g.::
>>> # At stopTestRun() any incomplete buffered tests are announced.
>>> result.stopTestRun()
ExtendedToStreamDecorator
-------------------------
This is a hybrid object that combines both the ``Extended`` and ``Stream``
``TestResult`` APIs into one class, but only emits ``StreamResult`` events.
This is useful when a ``StreamResult`` stream is desired, but you cannot
be sure that the tests which will run have been updated to the ``StreamResult``
API.
ThreadsafeStreamResult
----------------------

View File

@@ -10,6 +10,7 @@ __all__ = [
'ErrorHolder',
'ExpectedException',
'ExtendedToOriginalDecorator',
'ExtendedToStreamDecorator',
'FixtureSuite',
'iterate_tests',
'MultipleExceptions',
@@ -74,6 +75,7 @@ else:
from testtools.testresult import (
CopyStreamResult,
ExtendedToOriginalDecorator,
ExtendedToStreamDecorator,
MultiTestResult,
StreamFailFast,
StreamResult,

View File

@@ -5,6 +5,7 @@
__all__ = [
'CopyStreamResult',
'ExtendedToOriginalDecorator',
'ExtendedToStreamDecorator',
'MultiTestResult',
'StreamFailFast',
'StreamResult',
@@ -22,6 +23,7 @@ __all__ = [
from testtools.testresult.real import (
CopyStreamResult,
ExtendedToOriginalDecorator,
ExtendedToStreamDecorator,
MultiTestResult,
StreamFailFast,
StreamResult,

View File

@@ -5,6 +5,7 @@
__metaclass__ = type
__all__ = [
'ExtendedToOriginalDecorator',
'ExtendedToStreamDecorator',
'MultiTestResult',
'StreamFailFast',
'StreamResult',
@@ -25,7 +26,7 @@ import unittest
from extras import safe_hasattr, try_import
parse_mime_type = try_import('mimeparse.parse_mime_type')
from testtools.compat import all, str_is_unicode, _u
from testtools.compat import all, str_is_unicode, _u, _b
from testtools.content import (
Content,
text_content,
@@ -1127,6 +1128,138 @@ class ExtendedToOriginalDecorator(object):
return self.decorated.wasSuccessful()
class ExtendedToStreamDecorator(CopyStreamResult, StreamSummary, TestControl):
"""Permit using old TestResult API code with new StreamResult objects.
This decorates a StreamResult and converts old (Python 2.6 / 2.7 /
Extended) TestResult API calls into StreamResult calls.
It also supports regular StreamResult calls, making it safe to wrap around
any StreamResult.
"""
def __init__(self, decorated):
super(ExtendedToStreamDecorator, self).__init__([decorated])
# Deal with mismatched base class constructors.
TestControl.__init__(self)
self._started = False
def _get_failfast(self):
return len(self.targets) == 2
def _set_failfast(self, value):
if value:
if len(self.targets) == 2:
return
self.targets.append(StreamFailFast(self.stop))
else:
del self.targets[1:]
failfast = property(_get_failfast, _set_failfast)
def startTest(self, test):
if not self._started:
self.startTestRun()
self.status(test.id(), 'inprogress', timestamp=self._now())
self._tags = TagContext(self._tags)
def stopTest(self, test):
self._tags = self._tags.parent
def addError(self, test, err=None, details=None):
self._check_args(err, details)
self._convert(test, err, details, 'fail')
addFailure = addError
def _convert(self, test, err, details, status, reason=None):
if not self._started:
self.startTestRun()
test_id = test.id()
now = self._now()
if err is not None:
if details is None:
details = {}
details['traceback'] = TracebackContent(err, test)
if details is not None:
for name, content in details.items():
mime_type = repr(content.content_type)
for file_bytes in content.iter_bytes():
self.status(file_name=name, file_bytes=file_bytes,
mime_type=mime_type, test_id=test_id, timestamp=now)
self.status(file_name=name, file_bytes=_b(""), eof=True,
mime_type=mime_type, test_id=test_id, timestamp=now)
if reason is not None:
self.status(file_name='reason', file_bytes=reason.encode('utf8'),
eof=True, mime_type="text/plain; charset=utf8",
test_id=test_id, timestamp=now)
self.status(test_id, status, test_tags=self.current_tags,
timestamp=now)
def addExpectedFailure(self, test, err=None, details=None):
self._check_args(err, details)
self._convert(test, err, details, 'xfail')
def addSkip(self, test, reason=None, details=None):
self._convert(test, None, details, 'skip', reason)
def addUnexpectedSuccess(self, test, details=None):
self._convert(test, None, details, 'uxsuccess')
def addSuccess(self, test, details=None):
self._convert(test, None, details, 'success')
def _check_args(self, err, details):
param_count = 0
if err is not None:
param_count += 1
if details is not None:
param_count += 1
if param_count != 1:
raise ValueError("Must pass only one of err '%s' and details '%s"
% (err, details))
def startTestRun(self):
super(ExtendedToStreamDecorator, self).startTestRun()
self._tags = TagContext()
self.shouldStop = False
self.__now = None
self._started = True
def stopTest(self, test):
self._tags = self._tags.parent
@property
def current_tags(self):
"""The currently set tags."""
return self._tags.get_current_tags()
def tags(self, new_tags, gone_tags):
"""Add and remove tags from the test.
:param new_tags: A set of tags to be added to the stream.
:param gone_tags: A set of tags to be removed from the stream.
"""
self._tags.change_tags(new_tags, gone_tags)
def _now(self):
"""Return the current 'test time'.
If the time() method has not been called, this is equivalent to
datetime.now(), otherwise its the last supplied datestamp given to the
time() method.
"""
if self.__now is None:
return datetime.datetime.now(utc)
else:
return self.__now
def time(self, a_datetime):
self.__now = a_datetime
def wasSuccessful(self):
if not self._started:
self.startTestRun()
return super(ExtendedToStreamDecorator, self).wasSuccessful()
class TestResultDecorator(object):
"""General pass-through decorator.

View File

@@ -21,6 +21,7 @@ from extras import safe_hasattr
from testtools import (
CopyStreamResult,
ExtendedToOriginalDecorator,
ExtendedToStreamDecorator,
MultiTestResult,
PlaceHolder,
StreamFailFast,
@@ -232,18 +233,21 @@ class TagsContract(Python27Contract):
def test_no_tags_by_default(self):
# Results initially have no tags.
result = self.makeResult()
result.startTestRun()
self.assertEqual(frozenset(), result.current_tags)
def test_adding_tags(self):
# Tags are added using 'tags' and thus become visible in
# 'current_tags'.
result = self.makeResult()
result.startTestRun()
result.tags(set(['foo']), set())
self.assertEqual(set(['foo']), result.current_tags)
def test_removing_tags(self):
# Tags are removed using 'tags'.
result = self.makeResult()
result.startTestRun()
result.tags(set(['foo']), set())
result.tags(set(), set(['foo']))
self.assertEqual(set(), result.current_tags)
@@ -251,6 +255,7 @@ class TagsContract(Python27Contract):
def test_startTestRun_resets_tags(self):
# startTestRun makes a new test run, and thus clears all the tags.
result = self.makeResult()
result.startTestRun()
result.tags(set(['foo']), set())
result.startTestRun()
self.assertEqual(set(), result.current_tags)
@@ -448,6 +453,12 @@ class TestAdaptedPython27TestResultContract(TestCase, DetailsContract):
return ExtendedToOriginalDecorator(Python27TestResult())
class TestAdaptedStreamResult(TestCase, DetailsContract):
def makeResult(self):
return ExtendedToStreamDecorator(StreamResult())
class TestTestResultDecoratorContract(TestCase, StartTestRunContract):
run_test_with = FullStackRunTest
@@ -530,6 +541,12 @@ class TestDoubleStreamResultContract(TestCase, TestStreamResultContract):
return LoggingStreamResult()
class TestExtendedToStreamDecoratorContract(TestCase, TestStreamResultContract):
def _make_result(self):
return ExtendedToStreamDecorator(StreamResult())
class TestStreamSummaryResultContract(TestCase, TestStreamResultContract):
def _make_result(self):
@@ -702,6 +719,53 @@ class TestStreamToDict(TestCase):
self.assertEqual(["C", None], tests[1]['timestamps'])
class TestExtendedToStreamDecorator(TestCase):
def test_explicit_time(self):
log = LoggingStreamResult()
result = ExtendedToStreamDecorator(log)
result.startTestRun()
now = datetime.datetime.now(utc)
result.time(now)
result.startTest(self)
result.addSuccess(self)
result.stopTest(self)
result.stopTestRun()
self.assertEqual([
('startTestRun',),
('status',
'testtools.tests.test_testresult.TestExtendedToStreamDecorator.test_explicit_time',
'inprogress',
None,
True,
None,
None,
False,
None,
None,
now),
('status',
'testtools.tests.test_testresult.TestExtendedToStreamDecorator.test_explicit_time',
'success',
set(),
True,
None,
None,
False,
None,
None,
now),
('stopTestRun',)], log._events)
def test_wasSuccessful_after_stopTestRun(self):
log = LoggingStreamResult()
result = ExtendedToStreamDecorator(log)
result.startTestRun()
result.status(test_id='foo', test_status='fail')
result.stopTestRun()
self.assertEqual(False, result.wasSuccessful())
class TestStreamFailFast(TestCase):
def test_inprogress(self):