# Copyright (c) 2008-2012 testtools developers. See LICENSE for details.
|
|
|
|
"""Test results and related things."""
|
|
|
|
__metaclass__ = type
|
|
__all__ = [
|
|
'ExtendedToOriginalDecorator',
|
|
'ExtendedToStreamDecorator',
|
|
'MultiTestResult',
|
|
'StreamFailFast',
|
|
'StreamResult',
|
|
'StreamSummary',
|
|
'StreamTagger',
|
|
'StreamToDict',
|
|
'StreamToExtendedDecorator',
|
|
'Tagger',
|
|
'TestControl',
|
|
'TestResult',
|
|
'TestResultDecorator',
|
|
'ThreadsafeForwardingResult',
|
|
]
|
|
|
|
import datetime
|
|
from operator import methodcaller
|
|
import sys
|
|
import unittest
|
|
|
|
from extras import safe_hasattr, try_import
|
|
parse_mime_type = try_import('mimeparse.parse_mime_type')
|
|
|
|
from testtools.compat import all, str_is_unicode, _u, _b
|
|
from testtools.content import (
|
|
Content,
|
|
text_content,
|
|
TracebackContent,
|
|
)
|
|
from testtools.content_type import ContentType
|
|
from testtools.tags import TagContext
|
|
# circular import
|
|
# from testtools.testcase import PlaceHolder
|
|
PlaceHolder = None
|
|
|
|
# From http://docs.python.org/library/datetime.html
_ZERO = datetime.timedelta(0)


class UTC(datetime.tzinfo):
    """A fixed tzinfo representing Coordinated Universal Time."""

    def utcoffset(self, dt):
        """UTC is, by definition, at zero offset from UTC."""
        return _ZERO

    def tzname(self, dt):
        """Return the canonical name of this timezone."""
        return "UTC"

    def dst(self, dt):
        """UTC never observes daylight saving time."""
        return _ZERO


utc = UTC()
|
|
|
|
|
|
class TestResult(unittest.TestResult):
    """Subclass of unittest.TestResult extending the protocol for flexibility.

    This test result supports an experimental protocol for providing
    additional data in test outcomes. All the outcome methods take an optional
    dict 'details'. If supplied any other detail parameters like 'err' or
    'reason' should not be provided. The details dict is a mapping from names
    to MIME content objects (see testtools.content). This permits attaching
    tracebacks, log files, or even large objects like databases that were
    part of the test fixture. Until this API is accepted into upstream
    Python it is considered experimental: it may be replaced at any point
    by a newer version more in line with upstream Python. Compatibility would
    be aimed for in this case, but may not be possible.

    :ivar skip_reasons: A dict of skip-reasons -> list of tests. See addSkip.
    """

    def __init__(self, failfast=False):
        """Create a TestResult.

        :param failfast: If True, stop the run after the first error or
            failure (matching Python 2.7's failfast behaviour).
        """
        # startTestRun resets all attributes, and older clients don't know to
        # call startTestRun, so it is called once here.
        # Because subclasses may reasonably not expect this, we call the
        # specific version we want to run.
        self.failfast = failfast
        TestResult.startTestRun(self)

    def addExpectedFailure(self, test, err=None, details=None):
        """Called when a test has failed in an expected manner.

        Like with addSuccess and addError, testStopped should still be called.

        :param test: The test that has been skipped.
        :param err: The exc_info of the error that was raised.
        :return: None
        """
        # This is the python 2.7 implementation
        self.expectedFailures.append(
            (test, self._err_details_to_string(test, err, details)))

    def addError(self, test, err=None, details=None):
        """Called when an error has occurred. 'err' is a tuple of values as
        returned by sys.exc_info().

        :param details: Alternative way to supply details about the outcome.
            see the class docstring for more information.
        """
        self.errors.append((test,
            self._err_details_to_string(test, err, details)))
        if self.failfast:
            self.stop()

    def addFailure(self, test, err=None, details=None):
        """Called when an error has occurred. 'err' is a tuple of values as
        returned by sys.exc_info().

        :param details: Alternative way to supply details about the outcome.
            see the class docstring for more information.
        """
        self.failures.append((test,
            self._err_details_to_string(test, err, details)))
        if self.failfast:
            self.stop()

    def addSkip(self, test, reason=None, details=None):
        """Called when a test has been skipped rather than running.

        Like with addSuccess and addError, testStopped should still be called.

        This must be called by the TestCase. 'addError' and 'addFailure' will
        not call addSkip, since they have no assumptions about the kind of
        errors that a test can raise.

        :param test: The test that has been skipped.
        :param reason: The reason for the test being skipped. For instance,
            u"pyGL is not available".
        :param details: Alternative way to supply details about the outcome.
            see the class docstring for more information.
        :return: None
        """
        if reason is None:
            # NOTE(review): when reason is None a details dict must have been
            # supplied - otherwise this raises AttributeError on None.get.
            reason = details.get('reason')
            if reason is None:
                reason = 'No reason given'
            else:
                # The details value is a Content object; render it as text.
                reason = reason.as_text()
        skip_list = self.skip_reasons.setdefault(reason, [])
        skip_list.append(test)

    def addSuccess(self, test, details=None):
        """Called when a test succeeded."""

    def addUnexpectedSuccess(self, test, details=None):
        """Called when a test was expected to fail, but succeed."""
        self.unexpectedSuccesses.append(test)
        if self.failfast:
            self.stop()

    def wasSuccessful(self):
        """Has this result been successful so far?

        If there have been any errors, failures or unexpected successes,
        return False. Otherwise, return True.

        Note: This differs from standard unittest in that we consider
        unexpected successes to be equivalent to failures, rather than
        successes.
        """
        return not (self.errors or self.failures or self.unexpectedSuccesses)

    def _err_details_to_string(self, test, err=None, details=None):
        """Convert an error in exc_info form or a contents dict to a string."""
        if err is not None:
            return TracebackContent(err, test).as_text()
        return _details_to_str(details, special='traceback')

    def _exc_info_to_unicode(self, err, test):
        # Deprecated. Only present because subunit upcalls to it. See
        # <https://bugs.launchpad.net/testtools/+bug/929063>.
        return TracebackContent(err, test).as_text()

    def _now(self):
        """Return the current 'test time'.

        If the time() method has not been called, this is equivalent to
        datetime.now(), otherwise its the last supplied datestamp given to the
        time() method.
        """
        if self.__now is None:
            return datetime.datetime.now(utc)
        else:
            return self.__now

    def startTestRun(self):
        """Called before a test run starts.

        New in Python 2.7. The testtools version resets the result to a
        pristine condition ready for use in another test run. Note that this
        is different from Python 2.7's startTestRun, which does nothing.
        """
        # failfast is reset by the super __init__, so stash it.
        failfast = self.failfast
        super(TestResult, self).__init__()
        self.skip_reasons = {}
        # Name-mangled (_TestResult__now); see _now() and time().
        self.__now = None
        self._tags = TagContext()
        # -- Start: As per python 2.7 --
        self.expectedFailures = []
        self.unexpectedSuccesses = []
        self.failfast = failfast
        # -- End: As per python 2.7 --

    def stopTestRun(self):
        """Called after a test run completes

        New in python 2.7
        """

    def startTest(self, test):
        super(TestResult, self).startTest(test)
        # Open a nested tag scope for the duration of this test.
        self._tags = TagContext(self._tags)

    def stopTest(self, test):
        # Discard the test-local tag scope, restoring the enclosing one.
        self._tags = self._tags.parent
        super(TestResult, self).stopTest(test)

    @property
    def current_tags(self):
        """The currently set tags."""
        return self._tags.get_current_tags()

    def tags(self, new_tags, gone_tags):
        """Add and remove tags from the test.

        :param new_tags: A set of tags to be added to the stream.
        :param gone_tags: A set of tags to be removed from the stream.
        """
        self._tags.change_tags(new_tags, gone_tags)

    def time(self, a_datetime):
        """Provide a timestamp to represent the current time.

        This is useful when test activity is time delayed, or happening
        concurrently and getting the system time between API calls will not
        accurately represent the duration of tests (or the whole run).

        Calling time() sets the datetime used by the TestResult object.
        Time is permitted to go backwards when using this call.

        :param a_datetime: A datetime.datetime object with TZ information or
            None to reset the TestResult to gathering time from the system.
        """
        self.__now = a_datetime

    def done(self):
        """Called when the test runner is done.

        deprecated in favour of stopTestRun.
        """
|
|
|
|
class StreamResult(object):
    """A test result for reporting the activity of a test run.

    Typical use
    -----------

    >>> result = StreamResult()
    >>> result.startTestRun()
    >>> try:
    ...     case.run(result)
    ... finally:
    ...     result.stopTestRun()

    The case object will be either a TestCase or a TestSuite, and
    generally make a sequence of calls like::

    >>> result.status(self.id(), 'inprogress')
    >>> result.status(self.id(), 'success')

    General concepts
    ----------------

    StreamResult is built to process events that are emitted by tests during a
    test run or test enumeration. The test run may be running concurrently, and
    even be spread out across multiple machines.

    All events are timestamped to prevent network buffering or scheduling
    latency causing false timing reports. Timestamps are datetime objects in
    the UTC timezone.

    A route_code is a unicode string that identifies where a particular test
    was run. This is optional in the API but very useful when multiplexing
    multiple streams together as it allows identification of interactions
    between tests that were run on the same hardware or in the same test
    process. Generally actual tests never need to bother with this - it is
    added and processed by StreamResult's that do multiplexing / run analysis.
    route_codes are also used to route stdin back to pdb instances.

    The StreamResult base class does no accounting or processing, rather it
    just provides an empty implementation of every method, suitable for use
    as a base class regardless of intent.
    """

    def startTestRun(self):
        """Start a test run.

        This will prepare the test result to process results (which might imply
        connecting to a database or remote machine).
        """

    def stopTestRun(self):
        """Stop a test run.

        This informs the result that no more test updates will be received. At
        this point any test ids that have started and not completed can be
        considered failed-or-hung.
        """

    def status(self, test_id=None, test_status=None, test_tags=None,
        runnable=True, file_name=None, file_bytes=None, eof=False,
        mime_type=None, route_code=None, timestamp=None):
        """Inform the result about a test status.

        :param test_id: The test whose status is being reported. None to
            report status about the test run as a whole.
        :param test_status: The status for the test. There are two sorts of
            status - interim and final status events. As many interim events
            can be generated as desired, but only one final event. After a
            final status event any further file or status events from the
            same test_id+route_code may be discarded or associated with a new
            test by the StreamResult. (But no exception will be thrown).

            Interim states:
              * None - no particular status is being reported, or status being
                reported is not associated with a test (e.g. when reporting on
                stdout / stderr chatter).
              * inprogress - the test is currently running. Emitted by tests when
                they start running and at any intermediary point they might
                choose to indicate their continual operation.

            Final states:
              * exists - the test exists. This is used when a test is not being
                executed. Typically this is when querying what tests could be run
                in a test run (which is useful for selecting tests to run).
              * xfail - the test failed but that was expected. This is purely
                informative - the test is not considered to be a failure.
              * uxsuccess - the test passed but was expected to fail. The test
                will be considered a failure.
              * success - the test has finished without error.
              * fail - the test failed (or errored). The test will be considered
                a failure.
              * skip - the test was selected to run but chose to be skipped. E.g.
                a test dependency was missing. This is purely informative - the
                test is not considered to be a failure.

        :param test_tags: Optional set of tags to apply to the test. Tags
            have no intrinsic meaning - that is up to the test author.
        :param runnable: Allows status reports to mark that they are for
            tests which are not able to be explicitly run. For instance,
            subtests will report themselves as non-runnable.
        :param file_name: The name for the file_bytes. Any unicode string may
            be used. While there is no semantic value attached to the name
            of any attachment, the names 'stdout' and 'stderr' and 'traceback'
            are recommended for use only for output sent to stdout, stderr and
            tracebacks of exceptions. When file_name is supplied, file_bytes
            must be a bytes instance.
        :param file_bytes: A bytes object containing content for the named
            file. This can just be a single chunk of the file - emitting
            another file event with more later. Must be None unless a
            file_name is supplied.
        :param eof: True if this chunk is the last chunk of the file, any
            additional chunks with the same name should be treated as an error
            and discarded. Ignored unless file_name has been supplied.
        :param mime_type: An optional MIME type for the file. stdout and
            stderr will generally be "text/plain; charset=utf8". If None,
            defaults to application/octet-stream. Ignored unless file_name
            has been supplied.
        """
|
|
|
|
|
|
def domap(*args, **kwargs):
    """Apply ``map`` eagerly, returning the results as a list.

    On Python 3 ``map`` is lazy; this forces evaluation so that the mapped
    calls run for their side effects.
    """
    return [item for item in map(*args, **kwargs)]
|
|
|
|
|
|
class CopyStreamResult(StreamResult):
    """Forward every event received to a number of target results.

    This provides an easy facility for combining multiple StreamResults.

    For TestResult the equivalent class was ``MultiTestResult``.
    """

    def __init__(self, targets):
        super(CopyStreamResult, self).__init__()
        self.targets = targets

    def startTestRun(self):
        super(CopyStreamResult, self).startTestRun()
        # Propagate the run start to every target.
        for target in self.targets:
            target.startTestRun()

    def stopTestRun(self):
        super(CopyStreamResult, self).stopTestRun()
        # Propagate the run stop to every target.
        for target in self.targets:
            target.stopTestRun()

    def status(self, *args, **kwargs):
        super(CopyStreamResult, self).status(*args, **kwargs)
        # Replay the event verbatim on each target.
        for target in self.targets:
            target.status(*args, **kwargs)
|
|
|
|
|
|
class StreamFailFast(StreamResult):
    """Call the supplied callback if an error is seen in a stream.

    An example callback::

        def do_something():
            pass
    """

    def __init__(self, on_error):
        self.on_error = on_error

    def status(self, test_id=None, test_status=None, test_tags=None,
        runnable=True, file_name=None, file_bytes=None, eof=False,
        mime_type=None, route_code=None, timestamp=None):
        # Both a hard failure and an unexpected success count as errors
        # for fail-fast purposes; everything else is ignored.
        failing = test_status == 'fail' or test_status == 'uxsuccess'
        if failing:
            self.on_error()
|
|
|
|
|
|
class StreamTagger(CopyStreamResult):
    """Adds or discards tags from StreamResult events."""

    def __init__(self, targets, add=None, discard=None):
        """Create a StreamTagger.

        :param targets: A list of targets to forward events onto.
        :param add: Either None or an iterable of tags to add to each event.
        :param discard: Either None or an iterable of tags to discard from each
            event.
        """
        super(StreamTagger, self).__init__(targets)
        self.add = frozenset(add or ())
        self.discard = frozenset(discard or ())

    def status(self, *args, **kwargs):
        # Copy rather than mutate in place: the caller's test_tags set must
        # not be modified as a side effect of forwarding the event.
        test_tags = set(kwargs.get('test_tags') or ())
        test_tags.update(self.add)
        test_tags.difference_update(self.discard)
        # Normalise an empty set back to None, per the status() contract.
        kwargs['test_tags'] = test_tags or None
        super(StreamTagger, self).status(*args, **kwargs)
|
|
|
|
|
|
class StreamToDict(StreamResult):
    """A specialised StreamResult that emits a callback as tests complete.

    Top level file attachments are simply discarded. Hung tests are detected
    by stopTestRun and notified there and then.

    The callback is passed a dict with the following keys:

      * id: the test id.
      * tags: The tags for the test. A set of unicode strings.
      * details: A dict of file attachments - ``testtools.content.Content``
        objects.
      * status: One of the StreamResult status codes (including inprogress) or
        'unknown' (used if only file events for a test were received...)
      * timestamps: A pair of timestamps - the first one received with this
        test id, and the one in the event that triggered the notification.
        Hung tests have a None for the second end event. Timestamps are not
        compared - their ordering is purely order received in the stream.

    Only the most recent tags observed in the stream are reported.
    """

    def __init__(self, on_test):
        """Create a StreamToDict calling on_test on test completions.

        :param on_test: A callback that accepts one parameter - a dict
            describing a test.
        :raises ImportError: If the optional mimeparse dependency is not
            installed.
        """
        super(StreamToDict, self).__init__()
        self.on_test = on_test
        if parse_mime_type is None:
            raise ImportError("mimeparse module missing.")

    def startTestRun(self):
        super(StreamToDict, self).startTestRun()
        # Maps (test_id, route_code) -> the partially built test dict.
        self._inprogress = {}

    def status(self, test_id=None, test_status=None, test_tags=None,
        runnable=True, file_name=None, file_bytes=None, eof=False,
        mime_type=None, route_code=None, timestamp=None):
        super(StreamToDict, self).status(test_id, test_status,
            test_tags=test_tags, runnable=runnable, file_name=file_name,
            file_bytes=file_bytes, eof=eof, mime_type=mime_type,
            route_code=route_code, timestamp=timestamp)
        key = self._ensure_key(test_id, route_code, timestamp)
        # update fields
        if not key:
            # Events without a test_id describe the run, not a test; discard.
            return
        if test_status is not None:
            self._inprogress[key]['status'] = test_status
        self._inprogress[key]['timestamps'][1] = timestamp
        case = self._inprogress[key]
        if file_name is not None:
            if file_name not in case['details']:
                # First chunk for this attachment: create the Content object.
                if mime_type is None:
                    mime_type = 'application/octet-stream'
                primary, sub, parameters = parse_mime_type(mime_type)
                if 'charset' in parameters:
                    if ',' in parameters['charset']:
                        # testtools was emitting a bad encoding, workaround it,
                        # Though this does lose data - probably want to drop
                        # this in a few releases.
                        parameters['charset'] = parameters['charset'][
                            :parameters['charset'].find(',')]
                content_type = ContentType(primary, sub, parameters)
                content_bytes = []
                case['details'][file_name] = Content(
                    content_type, lambda:content_bytes)
            # NOTE(review): this relies on Content.iter_bytes returning the
            # same list object captured by the lambda above, so appending
            # here extends the stored attachment in place - confirm that
            # implementation detail before changing Content.
            case['details'][file_name].iter_bytes().append(file_bytes)
        if test_tags is not None:
            self._inprogress[key]['tags'] = test_tags
        # notify completed tests.
        if test_status not in (None, 'inprogress'):
            self.on_test(self._inprogress.pop(key))

    def stopTestRun(self):
        super(StreamToDict, self).stopTestRun()
        # Anything still in progress at run end is failed-or-hung; report it
        # with a None end timestamp.
        while self._inprogress:
            case = self._inprogress.popitem()[1]
            case['timestamps'][1] = None
            self.on_test(case)

    def _ensure_key(self, test_id, route_code, timestamp):
        """Return the dict key for (test_id, route_code), creating the
        placeholder test dict on first sight. Returns None when test_id is
        None (a run-level event)."""
        if test_id is None:
            return
        key = (test_id, route_code)
        if key not in self._inprogress:
            self._inprogress[key] = {
                'id': test_id,
                'tags': set(),
                'details': {},
                'status': 'unknown',
                'timestamps': [timestamp, None]}
        return key
|
|
|
|
|
|
# Map each final StreamResult status string to the TestResult outcome method
# that a replaying PlaceHolder should invoke (see test_dict_to_case below).
# 'inprogress' and 'unknown' map to addFailure because a test that never
# reached a final state is treated as failed-or-hung.
_status_map = {
    'inprogress': 'addFailure',
    'unknown': 'addFailure',
    'success': 'addSuccess',
    'skip': 'addSkip',
    'fail': 'addFailure',
    'xfail': 'addExpectedFailure',
    'uxsuccess': 'addUnexpectedSuccess',
    }
|
|
|
|
|
|
def test_dict_to_case(test_dict):
    """Convert a test dict into a TestCase object.

    :param test_dict: A test dict as generated by StreamToDict.
    :return: A PlaceHolder test object.
    """
    # Import lazily to dodge the circular dependency on testtools.testcase;
    # the module-level PlaceHolder starts out as None.
    global PlaceHolder
    if PlaceHolder is None:
        from testtools.testcase import PlaceHolder
    return PlaceHolder(
        test_dict['id'],
        outcome=_status_map[test_dict['status']],
        details=test_dict['details'],
        tags=test_dict['tags'],
        timestamps=test_dict['timestamps'])
|
|
|
|
|
|
class StreamSummary(StreamToDict):
    """A specialised StreamResult that summarises a stream.

    The summary uses the same representation as the original
    unittest.TestResult contract, allowing it to be consumed by any test
    runner.
    """

    def __init__(self):
        super(StreamSummary, self).__init__(self._gather_test)
        # Dispatch table from final test status to the handler that files
        # the test under the right accounting bucket.
        self._handle_status = {
            'success': self._success,
            'skip': self._skip,
            'exists': self._exists,
            'fail': self._fail,
            'xfail': self._xfail,
            'uxsuccess': self._uxsuccess,
            'unknown': self._incomplete,
            'inprogress': self._incomplete,
            }

    def startTestRun(self):
        super(StreamSummary, self).startTestRun()
        # Reset all accumulated state ready for a fresh run.
        self.testsRun = 0
        self.failures = []
        self.errors = []
        self.skipped = []
        self.expectedFailures = []
        self.unexpectedSuccesses = []

    def wasSuccessful(self):
        """Return False if any failure has occurred.

        Note that incomplete tests can only be detected when stopTestRun is
        called, so that should be called before checking wasSuccessful.
        """
        return not (self.failures or self.errors)

    def _gather_test(self, test_dict):
        self.testsRun += 1
        status = test_dict['status']
        if status == 'exists':
            # Enumeration-only events are counted but have no outcome.
            return
        self._handle_status[status](test_dict_to_case(test_dict))

    def _incomplete(self, case):
        # A test that never reached a final state is reported as an error.
        self.errors.append((case, "Test did not complete"))

    def _success(self, case):
        pass

    def _skip(self, case):
        if 'reason' in case._details:
            reason = case._details['reason'].as_text()
        else:
            reason = "Unknown"
        self.skipped.append((case, reason))

    def _exists(self, case):
        pass

    def _fail(self, case):
        self.errors.append(
            (case, _details_to_str(case._details, special="traceback")))

    def _xfail(self, case):
        self.expectedFailures.append(
            (case, _details_to_str(case._details, special="traceback")))

    def _uxsuccess(self, case):
        case._outcome = 'addUnexpectedSuccess'
        self.unexpectedSuccesses.append(case)
|
|
|
|
|
|
class TestControl(object):
    """Controls a running test run, allowing it to be interrupted.

    :attribute shouldStop: If True, tests should not run and should instead
        return immediately. Similarly a TestSuite should check this between
        each test and if set stop dispatching any new tests and return.
    """

    def __init__(self):
        super(TestControl, self).__init__()
        # Nothing has asked the run to stop yet.
        self.shouldStop = False

    def stop(self):
        """Request that the test run stop at the next opportunity."""
        self.shouldStop = True
|
|
|
|
|
|
class MultiTestResult(TestResult):
    """A test result that dispatches to many test results."""

    def __init__(self, *results):
        # Setup _results first, as the base class __init__ assigns to failfast.
        self._results = list(map(ExtendedToOriginalDecorator, results))
        super(MultiTestResult, self).__init__()

    def __repr__(self):
        return '<%s (%s)>' % (
            self.__class__.__name__, ', '.join(map(repr, self._results)))

    def _dispatch(self, message, *args, **kwargs):
        # Invoke the named method on every decorated result, collecting the
        # return values in order.
        return tuple(
            getattr(result, message)(*args, **kwargs)
            for result in self._results)

    def _get_failfast(self):
        # Read from the first result only; the setter keeps them in sync.
        return getattr(self._results[0], 'failfast', False)
    def _set_failfast(self, value):
        # Dispatch via __setattr__ so the assignment reaches every result.
        self._dispatch('__setattr__', 'failfast', value)
    failfast = property(_get_failfast, _set_failfast)

    def _get_shouldStop(self):
        # Stop if any constituent result wants to stop.
        return any(self._dispatch('__getattr__', 'shouldStop'))
    def _set_shouldStop(self, value):
        # Called because we subclass TestResult. Probably should not do that.
        pass
    shouldStop = property(_get_shouldStop, _set_shouldStop)

    def startTest(self, test):
        super(MultiTestResult, self).startTest(test)
        return self._dispatch('startTest', test)

    def stop(self):
        return self._dispatch('stop')

    def stopTest(self, test):
        super(MultiTestResult, self).stopTest(test)
        return self._dispatch('stopTest', test)

    def addError(self, test, error=None, details=None):
        return self._dispatch('addError', test, error, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        return self._dispatch(
            'addExpectedFailure', test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        return self._dispatch('addFailure', test, err, details=details)

    def addSkip(self, test, reason=None, details=None):
        return self._dispatch('addSkip', test, reason, details=details)

    def addSuccess(self, test, details=None):
        return self._dispatch('addSuccess', test, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        return self._dispatch('addUnexpectedSuccess', test, details=details)

    def startTestRun(self):
        super(MultiTestResult, self).startTestRun()
        return self._dispatch('startTestRun')

    def stopTestRun(self):
        return self._dispatch('stopTestRun')

    def tags(self, new_tags, gone_tags):
        super(MultiTestResult, self).tags(new_tags, gone_tags)
        return self._dispatch('tags', new_tags, gone_tags)

    def time(self, a_datetime):
        return self._dispatch('time', a_datetime)

    def done(self):
        return self._dispatch('done')

    def wasSuccessful(self):
        """Was this result successful?

        Only returns True if every constituent result was successful.
        """
        return all(self._dispatch('wasSuccessful'))
|
|
|
|
|
|
class TextTestResult(TestResult):
    """A TestResult which outputs activity to a text stream."""

    def __init__(self, stream, failfast=False):
        """Construct a TextTestResult writing to stream."""
        super(TextTestResult, self).__init__(failfast=failfast)
        self.stream = stream
        # Banner separators matching unittest's traditional output style.
        self.sep1 = '=' * 70 + '\n'
        self.sep2 = '-' * 70 + '\n'

    def _delta_to_float(self, a_timedelta):
        # Convert a timedelta to float seconds (equivalent of
        # timedelta.total_seconds, computed by hand).
        return (a_timedelta.days * 86400.0 + a_timedelta.seconds +
            a_timedelta.microseconds / 1000000.0)

    def _show_list(self, label, error_list):
        # Print each (test, output) pair under its own labelled banner.
        for test, output in error_list:
            self.stream.write(self.sep1)
            self.stream.write("%s: %s\n" % (label, test.id()))
            self.stream.write(self.sep2)
            self.stream.write(output)

    def startTestRun(self):
        super(TextTestResult, self).startTestRun()
        # Record the start time so stopTestRun can report elapsed time.
        self.__start = self._now()
        self.stream.write("Tests running...\n")

    def stopTestRun(self):
        if self.testsRun != 1:
            plural = 's'
        else:
            plural = ''
        stop = self._now()
        self._show_list('ERROR', self.errors)
        self._show_list('FAIL', self.failures)
        for test in self.unexpectedSuccesses:
            self.stream.write(
                "%sUNEXPECTED SUCCESS: %s\n%s" % (
                    self.sep1, test.id(), self.sep2))
        self.stream.write("\nRan %d test%s in %.3fs\n" %
            (self.testsRun, plural,
            self._delta_to_float(stop - self.__start)))
        if self.wasSuccessful():
            self.stream.write("OK\n")
        else:
            self.stream.write("FAILED (")
            details = []
            # Unexpected successes count towards the failure total, matching
            # TestResult.wasSuccessful.
            details.append("failures=%d" % (
                sum(map(len, (
                    self.failures, self.errors, self.unexpectedSuccesses)))))
            self.stream.write(", ".join(details))
            self.stream.write(")\n")
        super(TextTestResult, self).stopTestRun()
|
|
|
|
|
|
class ThreadsafeForwardingResult(TestResult):
|
|
"""A TestResult which ensures the target does not receive mixed up calls.
|
|
|
|
Multiple ``ThreadsafeForwardingResults`` can forward to the same target
|
|
result, and that target result will only ever receive the complete set of
|
|
events for one test at a time.
|
|
|
|
This is enforced using a semaphore, which further guarantees that tests
|
|
will be sent atomically even if the ``ThreadsafeForwardingResults`` are in
|
|
different threads.
|
|
|
|
``ThreadsafeForwardingResult`` is typically used by
|
|
``ConcurrentTestSuite``, which creates one ``ThreadsafeForwardingResult``
|
|
per thread, each of which wraps of the TestResult that
|
|
``ConcurrentTestSuite.run()`` is called with.
|
|
|
|
target.startTestRun() and target.stopTestRun() are called once for each
|
|
ThreadsafeForwardingResult that forwards to the same target. If the target
|
|
takes special action on these events, it should take care to accommodate
|
|
this.
|
|
|
|
time() and tags() calls are batched to be adjacent to the test result and
|
|
in the case of tags() are coerced into test-local scope, avoiding the
|
|
opportunity for bugs around global state in the target.
|
|
"""
|
|
|
|
def __init__(self, target, semaphore):
|
|
"""Create a ThreadsafeForwardingResult forwarding to target.
|
|
|
|
:param target: A ``TestResult``.
|
|
:param semaphore: A ``threading.Semaphore`` with limit 1.
|
|
"""
|
|
TestResult.__init__(self)
|
|
self.result = ExtendedToOriginalDecorator(target)
|
|
self.semaphore = semaphore
|
|
self._test_start = None
|
|
self._global_tags = set(), set()
|
|
self._test_tags = set(), set()
|
|
|
|
def __repr__(self):
|
|
return '<%s %r>' % (self.__class__.__name__, self.result)
|
|
|
|
def _any_tags(self, tags):
|
|
return bool(tags[0] or tags[1])
|
|
|
|
def _add_result_with_semaphore(self, method, test, *args, **kwargs):
|
|
now = self._now()
|
|
self.semaphore.acquire()
|
|
try:
|
|
self.result.time(self._test_start)
|
|
self.result.startTest(test)
|
|
self.result.time(now)
|
|
if self._any_tags(self._global_tags):
|
|
self.result.tags(*self._global_tags)
|
|
if self._any_tags(self._test_tags):
|
|
self.result.tags(*self._test_tags)
|
|
self._test_tags = set(), set()
|
|
try:
|
|
method(test, *args, **kwargs)
|
|
finally:
|
|
self.result.stopTest(test)
|
|
finally:
|
|
self.semaphore.release()
|
|
self._test_start = None
|
|
|
|
def addError(self, test, err=None, details=None):
|
|
self._add_result_with_semaphore(self.result.addError,
|
|
test, err, details=details)
|
|
|
|
def addExpectedFailure(self, test, err=None, details=None):
|
|
self._add_result_with_semaphore(self.result.addExpectedFailure,
|
|
test, err, details=details)
|
|
|
|
def addFailure(self, test, err=None, details=None):
|
|
self._add_result_with_semaphore(self.result.addFailure,
|
|
test, err, details=details)
|
|
|
|
def addSkip(self, test, reason=None, details=None):
|
|
self._add_result_with_semaphore(self.result.addSkip,
|
|
test, reason, details=details)
|
|
|
|
def addSuccess(self, test, details=None):
|
|
self._add_result_with_semaphore(self.result.addSuccess,
|
|
test, details=details)
|
|
|
|
def addUnexpectedSuccess(self, test, details=None):
|
|
self._add_result_with_semaphore(self.result.addUnexpectedSuccess,
|
|
test, details=details)
|
|
|
|
def progress(self, offset, whence):
|
|
pass
|
|
|
|
def startTestRun(self):
|
|
super(ThreadsafeForwardingResult, self).startTestRun()
|
|
self.semaphore.acquire()
|
|
try:
|
|
self.result.startTestRun()
|
|
finally:
|
|
self.semaphore.release()
|
|
|
|
def _get_shouldStop(self):
|
|
self.semaphore.acquire()
|
|
try:
|
|
return self.result.shouldStop
|
|
finally:
|
|
self.semaphore.release()
|
|
def _set_shouldStop(self, value):
|
|
# Another case where we should not subclass TestResult
|
|
pass
|
|
shouldStop = property(_get_shouldStop, _set_shouldStop)
|
|
|
|
def stop(self):
|
|
self.semaphore.acquire()
|
|
try:
|
|
self.result.stop()
|
|
finally:
|
|
self.semaphore.release()
|
|
|
|
def stopTestRun(self):
|
|
self.semaphore.acquire()
|
|
try:
|
|
self.result.stopTestRun()
|
|
finally:
|
|
self.semaphore.release()
|
|
|
|
def done(self):
|
|
self.semaphore.acquire()
|
|
try:
|
|
self.result.done()
|
|
finally:
|
|
self.semaphore.release()
|
|
|
|
    def startTest(self, test):
        # Record when this test started locally; the forwarded result only
        # hears about the test later, when the outcome is replayed (see
        # _add_result_with_semaphore, which also resets _test_start).
        self._test_start = self._now()
        super(ThreadsafeForwardingResult, self).startTest(test)
|
|
|
|
    def wasSuccessful(self):
        """Return the shared result's success state.

        Note: this reads the shared result without taking the semaphore.
        """
        return self.result.wasSuccessful()
|
|
|
|
def tags(self, new_tags, gone_tags):
|
|
"""See `TestResult`."""
|
|
super(ThreadsafeForwardingResult, self).tags(new_tags, gone_tags)
|
|
if self._test_start is not None:
|
|
self._test_tags = _merge_tags(
|
|
self._test_tags, (new_tags, gone_tags))
|
|
else:
|
|
self._global_tags = _merge_tags(
|
|
self._global_tags, (new_tags, gone_tags))
|
|
|
|
|
|
def _merge_tags(existing, changed):
|
|
new_tags, gone_tags = changed
|
|
result_new = set(existing[0])
|
|
result_gone = set(existing[1])
|
|
result_new.update(new_tags)
|
|
result_new.difference_update(gone_tags)
|
|
result_gone.update(gone_tags)
|
|
result_gone.difference_update(new_tags)
|
|
return result_new, result_gone
|
|
|
|
|
|
class ExtendedToOriginalDecorator(object):
    """Permit new TestResult API code to degrade gracefully with old results.

    This decorates an existing TestResult and converts missing outcomes
    such as addSkip to older outcomes such as addSuccess. It also supports
    the extended details protocol. In all cases the most recent protocol
    is attempted first, and fallbacks only occur when the decorated result
    does not support the newer style of calling.
    """

    def __init__(self, decorated):
        """Wrap ``decorated``, tracking tags and failfast locally if needed."""
        self.decorated = decorated
        # Local tag tracking for decorated results with no tag support.
        self._tags = TagContext()
        # Only used for old TestResults that do not have failfast.
        self._failfast = False

    def __repr__(self):
        return '<%s %r>' % (self.__class__.__name__, self.decorated)

    def __getattr__(self, name):
        # Anything not explicitly wrapped here is forwarded verbatim.
        return getattr(self.decorated, name)

    def addError(self, test, err=None, details=None):
        """Report an error, preferring the details protocol.

        Exactly one of err (an exc_info tuple) and details may be given;
        details are converted to an exc_info tuple if the decorated result
        does not accept them.
        """
        try:
            self._check_args(err, details)
            if details is not None:
                try:
                    return self.decorated.addError(test, details=details)
                except TypeError:
                    # have to convert
                    err = self._details_to_exc_info(details)
            return self.decorated.addError(test, err)
        finally:
            if self.failfast:
                self.stop()

    def addExpectedFailure(self, test, err=None, details=None):
        """Report an expected failure, degrading to addSuccess if needed."""
        self._check_args(err, details)
        addExpectedFailure = getattr(
            self.decorated, 'addExpectedFailure', None)
        if addExpectedFailure is None:
            # Very old results have no expected-failure support at all.
            return self.addSuccess(test)
        if details is not None:
            try:
                return addExpectedFailure(test, details=details)
            except TypeError:
                # have to convert
                err = self._details_to_exc_info(details)
        return addExpectedFailure(test, err)

    def addFailure(self, test, err=None, details=None):
        """Report a failure, preferring the details protocol."""
        try:
            self._check_args(err, details)
            if details is not None:
                try:
                    return self.decorated.addFailure(test, details=details)
                except TypeError:
                    # have to convert
                    err = self._details_to_exc_info(details)
            return self.decorated.addFailure(test, err)
        finally:
            if self.failfast:
                self.stop()

    def addSkip(self, test, reason=None, details=None):
        """Report a skip, degrading to addSuccess if unsupported."""
        self._check_args(reason, details)
        addSkip = getattr(self.decorated, 'addSkip', None)
        if addSkip is None:
            return self.decorated.addSuccess(test)
        if details is not None:
            try:
                return addSkip(test, details=details)
            except TypeError:
                # extract the reason if it's available
                try:
                    reason = details['reason'].as_text()
                except KeyError:
                    reason = _details_to_str(details)
        return addSkip(test, reason)

    def addUnexpectedSuccess(self, test, details=None):
        """Report an unexpected success, degrading to a failure if needed."""
        try:
            outcome = getattr(self.decorated, 'addUnexpectedSuccess', None)
            if outcome is None:
                # No support in the decorated result: raise and catch a real
                # assertion so a genuine traceback is attached.
                try:
                    test.fail("")
                except test.failureException:
                    return self.addFailure(test, sys.exc_info())
            if details is not None:
                try:
                    return outcome(test, details=details)
                except TypeError:
                    pass
            return outcome(test)
        finally:
            if self.failfast:
                self.stop()

    def addSuccess(self, test, details=None):
        """Report a success, dropping details if the result rejects them."""
        if details is not None:
            try:
                return self.decorated.addSuccess(test, details=details)
            except TypeError:
                pass
        return self.decorated.addSuccess(test)

    def _check_args(self, err, details):
        # Enforce the "exactly one of err/details" calling convention.
        param_count = 0
        if err is not None:
            param_count += 1
        if details is not None:
            param_count += 1
        if param_count != 1:
            # Fixed: message previously had an unterminated quote after the
            # second %s.
            raise ValueError("Must pass only one of err '%s' and details '%s'"
                % (err, details))

    def _details_to_exc_info(self, details):
        """Convert a details dict to an exc_info tuple."""
        return (
            _StringException,
            _StringException(_details_to_str(details, special='traceback')),
            None)

    @property
    def current_tags(self):
        # Prefer the decorated result's view; fall back to local tracking.
        return getattr(
            self.decorated, 'current_tags', self._tags.get_current_tags())

    def done(self):
        # done() is optional on old results.
        try:
            return self.decorated.done()
        except AttributeError:
            return

    def _get_failfast(self):
        return getattr(self.decorated, 'failfast', self._failfast)
    def _set_failfast(self, value):
        if safe_hasattr(self.decorated, 'failfast'):
            self.decorated.failfast = value
        else:
            # Old result without failfast: emulate it locally.
            self._failfast = value
    failfast = property(_get_failfast, _set_failfast)

    def progress(self, offset, whence):
        # progress() is optional; silently ignored when unsupported.
        method = getattr(self.decorated, 'progress', None)
        if method is None:
            return
        return method(offset, whence)

    @property
    def shouldStop(self):
        return self.decorated.shouldStop

    def startTest(self, test):
        # Push a per-test tag scope, popped again in stopTest.
        self._tags = TagContext(self._tags)
        return self.decorated.startTest(test)

    def startTestRun(self):
        self._tags = TagContext()
        # startTestRun() is optional on old results.
        try:
            return self.decorated.startTestRun()
        except AttributeError:
            return

    def stop(self):
        return self.decorated.stop()

    def stopTest(self, test):
        # Pop the per-test tag scope pushed by startTest.
        self._tags = self._tags.parent
        return self.decorated.stopTest(test)

    def stopTestRun(self):
        # stopTestRun() is optional on old results.
        try:
            return self.decorated.stopTestRun()
        except AttributeError:
            return

    def tags(self, new_tags, gone_tags):
        # Forward tag changes when supported, otherwise track locally.
        method = getattr(self.decorated, 'tags', None)
        if method is not None:
            return method(new_tags, gone_tags)
        else:
            self._tags.change_tags(new_tags, gone_tags)

    def time(self, a_datetime):
        # time() is optional; silently ignored when unsupported.
        method = getattr(self.decorated, 'time', None)
        if method is None:
            return
        return method(a_datetime)

    def wasSuccessful(self):
        return self.decorated.wasSuccessful()
|
|
|
|
|
|
class ExtendedToStreamDecorator(CopyStreamResult, StreamSummary, TestControl):
    """Permit using old TestResult API code with new StreamResult objects.

    This decorates a StreamResult and converts old (Python 2.6 / 2.7 /
    Extended) TestResult API calls into StreamResult calls.

    It also supports regular StreamResult calls, making it safe to wrap around
    any StreamResult.
    """

    def __init__(self, decorated):
        super(ExtendedToStreamDecorator, self).__init__([decorated])
        # Deal with mismatched base class constructors.
        TestControl.__init__(self)
        self._started = False

    def _get_failfast(self):
        # failfast is modelled as the presence of a StreamFailFast target.
        return len(self.targets) == 2
    def _set_failfast(self, value):
        if value:
            if len(self.targets) == 2:
                # Already failfast.
                return
            self.targets.append(StreamFailFast(self.stop))
        else:
            del self.targets[1:]
    failfast = property(_get_failfast, _set_failfast)

    def startTest(self, test):
        if not self._started:
            self.startTestRun()
        self.status(test.id(), 'inprogress', timestamp=self._now())
        # Push a per-test tag scope, popped again in stopTest.
        self._tags = TagContext(self._tags)

    def stopTest(self, test):
        # Note: this was previously defined twice (identically) in this
        # class; the redundant second definition has been removed.
        self._tags = self._tags.parent

    def addError(self, test, err=None, details=None):
        """Report an error; errors and failures share the 'fail' status."""
        self._check_args(err, details)
        self._convert(test, err, details, 'fail')
    addFailure = addError

    def _convert(self, test, err, details, status, reason=None):
        """Emit one old-style outcome as a sequence of status events.

        Attachments (and any traceback built from err) are streamed first,
        then an optional 'reason' attachment, then the final status event.
        """
        if not self._started:
            self.startTestRun()
        test_id = test.id()
        now = self._now()
        if err is not None:
            if details is None:
                details = {}
            details['traceback'] = TracebackContent(err, test)
        if details is not None:
            for name, content in details.items():
                mime_type = repr(content.content_type)
                for file_bytes in content.iter_bytes():
                    self.status(file_name=name, file_bytes=file_bytes,
                        mime_type=mime_type, test_id=test_id, timestamp=now)
                # Terminate each attachment with an explicit zero-length
                # eof event.
                self.status(file_name=name, file_bytes=_b(""), eof=True,
                    mime_type=mime_type, test_id=test_id, timestamp=now)
        if reason is not None:
            self.status(file_name='reason', file_bytes=reason.encode('utf8'),
                eof=True, mime_type="text/plain; charset=utf8",
                test_id=test_id, timestamp=now)
        self.status(test_id, status, test_tags=self.current_tags,
            timestamp=now)

    def addExpectedFailure(self, test, err=None, details=None):
        self._check_args(err, details)
        self._convert(test, err, details, 'xfail')

    def addSkip(self, test, reason=None, details=None):
        self._convert(test, None, details, 'skip', reason)

    def addUnexpectedSuccess(self, test, details=None):
        self._convert(test, None, details, 'uxsuccess')

    def addSuccess(self, test, details=None):
        self._convert(test, None, details, 'success')

    def _check_args(self, err, details):
        # Enforce the "exactly one of err/details" calling convention.
        param_count = 0
        if err is not None:
            param_count += 1
        if details is not None:
            param_count += 1
        if param_count != 1:
            # Fixed: message previously had an unterminated quote after the
            # second %s.
            raise ValueError("Must pass only one of err '%s' and details '%s'"
                % (err, details))

    def startTestRun(self):
        super(ExtendedToStreamDecorator, self).startTestRun()
        self._tags = TagContext()
        self.shouldStop = False
        self.__now = None
        self._started = True

    @property
    def current_tags(self):
        """The currently set tags."""
        return self._tags.get_current_tags()

    def tags(self, new_tags, gone_tags):
        """Add and remove tags from the test.

        :param new_tags: A set of tags to be added to the stream.
        :param gone_tags: A set of tags to be removed from the stream.
        """
        self._tags.change_tags(new_tags, gone_tags)

    def _now(self):
        """Return the current 'test time'.

        If the time() method has not been called, this is equivalent to
        datetime.now(), otherwise its the last supplied datestamp given to the
        time() method.
        """
        if self.__now is None:
            return datetime.datetime.now(utc)
        else:
            return self.__now

    def time(self, a_datetime):
        self.__now = a_datetime

    def wasSuccessful(self):
        if not self._started:
            self.startTestRun()
        return super(ExtendedToStreamDecorator, self).wasSuccessful()
|
|
|
|
|
|
class StreamToExtendedDecorator(StreamResult):
    """Convert StreamResult API calls into ExtendedTestResult calls.

    Events are buffered per test (several tests may be concurrently
    active) and each test is flushed to the wrapped result once it
    completes.

    Tests still incomplete when the run stops are flushed as errors, and
    non-test file attachments are gathered under a test named
    'testtools.extradata' emitted at the end of the run.
    """

    def __init__(self, decorated):
        # ExtendedToOriginalDecorator takes care of thunking details back
        # to exceptions/reasons etc. for old-style results.
        self.decorated = ExtendedToOriginalDecorator(decorated)
        # StreamToDict buffers the stream and hands us one dict per test.
        self.hook = StreamToDict(self._handle_tests)

    def status(self, test_id=None, test_status=None, *args, **kwargs):
        # Drop 'exists' events: they carry no outcome to forward.
        if test_status != 'exists':
            self.hook.status(
                test_id=test_id, test_status=test_status, *args, **kwargs)

    def startTestRun(self):
        # Start the target first so buffered tests can flush into it.
        self.decorated.startTestRun()
        self.hook.startTestRun()

    def stopTestRun(self):
        # Stop the buffer first: that flushes any remaining tests.
        self.hook.stopTestRun()
        self.decorated.stopTestRun()

    def _handle_tests(self, test_dict):
        # Replay one buffered, completed test against the wrapped result.
        test_dict_to_case(test_dict).run(self.decorated)
|
|
|
|
|
|
class TestResultDecorator(object):
    """General pass-through decorator.

    This provides a base that other TestResults can inherit from to
    gain basic forwarding functionality.
    """

    def __init__(self, decorated):
        """Create a TestResultDecorator forwarding to decorated."""
        self.decorated = decorated

    def startTest(self, test):
        """Forward startTest to the decorated result."""
        return self.decorated.startTest(test)

    def startTestRun(self):
        """Forward startTestRun to the decorated result."""
        return self.decorated.startTestRun()

    def stopTest(self, test):
        """Forward stopTest to the decorated result."""
        return self.decorated.stopTest(test)

    def stopTestRun(self):
        """Forward stopTestRun to the decorated result."""
        return self.decorated.stopTestRun()

    def addError(self, test, err=None, details=None):
        """Forward an error to the decorated result."""
        return self.decorated.addError(test, err, details=details)

    def addFailure(self, test, err=None, details=None):
        """Forward a failure to the decorated result."""
        return self.decorated.addFailure(test, err, details=details)

    def addSuccess(self, test, details=None):
        """Forward a success to the decorated result."""
        return self.decorated.addSuccess(test, details=details)

    def addSkip(self, test, reason=None, details=None):
        """Forward a skip to the decorated result."""
        return self.decorated.addSkip(test, reason, details=details)

    def addExpectedFailure(self, test, err=None, details=None):
        """Forward an expected failure to the decorated result."""
        return self.decorated.addExpectedFailure(test, err, details=details)

    def addUnexpectedSuccess(self, test, details=None):
        """Forward an unexpected success to the decorated result."""
        return self.decorated.addUnexpectedSuccess(test, details=details)

    def progress(self, offset, whence):
        """Forward a progress report to the decorated result."""
        return self.decorated.progress(offset, whence)

    def wasSuccessful(self):
        """Ask the decorated result whether the run was successful."""
        return self.decorated.wasSuccessful()

    @property
    def current_tags(self):
        """The decorated result's current tags."""
        return self.decorated.current_tags

    @property
    def shouldStop(self):
        """The decorated result's stop flag."""
        return self.decorated.shouldStop

    def stop(self):
        """Ask the decorated result to stop."""
        return self.decorated.stop()

    @property
    def testsRun(self):
        """The decorated result's count of started tests."""
        return self.decorated.testsRun

    def tags(self, new_tags, gone_tags):
        """Forward a tag change to the decorated result."""
        return self.decorated.tags(new_tags, gone_tags)

    def time(self, a_datetime):
        """Forward a time update to the decorated result."""
        return self.decorated.time(a_datetime)
|
|
|
|
|
|
class Tagger(TestResultDecorator):
    """Tag each test individually."""

    def __init__(self, decorated, new_tags, gone_tags):
        """Wrap 'decorated' such that each test is tagged.

        :param new_tags: Tags to be added for each test.
        :param gone_tags: Tags to be removed for each test.
        """
        super(Tagger, self).__init__(decorated)
        # Normalised to sets once; the same change applies to every test.
        self._tag_change = (set(new_tags), set(gone_tags))

    def startTest(self, test):
        """Start 'test', then apply this tagger's tag change to it."""
        super(Tagger, self).startTest(test)
        self.tags(*self._tag_change)
|
|
|
|
|
|
class TestByTestResult(TestResult):
    """Call something every time a test completes."""

    def __init__(self, on_test):
        """Construct a ``TestByTestResult``.

        :param on_test: A callable that take a test case, a status (one of
            "success", "failure", "error", "skip", or "xfail"), a start time
            (a ``datetime`` with timezone), a stop time, an iterable of tags,
            and a details dict. Is called at the end of each test (i.e. on
            ``stopTest``) with the accumulated values for that test.
        """
        super(TestByTestResult, self).__init__()
        self._on_test = on_test

    def startTest(self, test):
        # Record the start time and reset all per-test state before the
        # test body runs.
        super(TestByTestResult, self).startTest(test)
        self._start_time = self._now()
        # There's no supported (i.e. tested) behaviour that relies on these
        # being set, but it makes me more comfortable all the same. -- jml
        self._status = None
        self._details = None
        self._stop_time = None

    def stopTest(self, test):
        # Snapshot the stop time and the current tags before calling super()
        # (which may finalise per-test tag state), then report the
        # accumulated outcome for this test.
        self._stop_time = self._now()
        tags = set(self.current_tags)
        super(TestByTestResult, self).stopTest(test)
        self._on_test(
            test=test,
            status=self._status,
            start_time=self._start_time,
            stop_time=self._stop_time,
            tags=tags,
            details=self._details)

    def _err_to_details(self, test, err, details):
        # Prefer explicitly supplied (truthy) details; otherwise synthesise
        # a details dict from the exc_info tuple.
        if details:
            return details
        return {'traceback': TracebackContent(err, test)}

    def addSuccess(self, test, details=None):
        super(TestByTestResult, self).addSuccess(test)
        self._status = 'success'
        self._details = details

    def addFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addFailure(test, err, details)
        self._status = 'failure'
        self._details = self._err_to_details(test, err, details)

    def addError(self, test, err=None, details=None):
        super(TestByTestResult, self).addError(test, err, details)
        self._status = 'error'
        self._details = self._err_to_details(test, err, details)

    def addSkip(self, test, reason=None, details=None):
        super(TestByTestResult, self).addSkip(test, reason, details)
        self._status = 'skip'
        if details is None:
            details = {'reason': text_content(reason)}
        elif reason:
            # XXX: What if details already has 'reason' key?
            # NOTE(review): this mutates the caller's details dict in place.
            details['reason'] = text_content(reason)
        self._details = details

    def addExpectedFailure(self, test, err=None, details=None):
        super(TestByTestResult, self).addExpectedFailure(test, err, details)
        self._status = 'xfail'
        self._details = self._err_to_details(test, err, details)

    def addUnexpectedSuccess(self, test, details=None):
        super(TestByTestResult, self).addUnexpectedSuccess(test, details)
        # Unexpected successes are reported with the plain 'success' status.
        self._status = 'success'
        self._details = details
|
|
|
|
|
|
class _StringException(Exception):
    """An exception made from an arbitrary string."""

    if not str_is_unicode:
        # Python 2 only: require unicode input so the encoding used for
        # str() output is explicit and consistent.
        def __init__(self, string):
            if type(string) is not unicode:
                raise TypeError("_StringException expects unicode, got %r" %
                    (string,))
            Exception.__init__(self, string)

        def __str__(self):
            return self.args[0].encode("utf-8")

        def __unicode__(self):
            return self.args[0]
    # For 3.0 and above the default __str__ is fine, so we don't define one.

    def __hash__(self):
        # Identity-based hash: needed because defining __eq__ would
        # otherwise make the class unhashable. NOTE(review): equal
        # instances may therefore hash differently.
        return id(self)

    def __eq__(self, other):
        # Compare by message; any object without .args is unequal.
        try:
            return self.args == other.args
        except AttributeError:
            return False
|
|
|
|
|
|
def _format_text_attachment(name, text):
|
|
if '\n' in text:
|
|
return "%s: {{{\n%s\n}}}\n" % (name, text)
|
|
return "%s: {{{%s}}}" % (name, text)
|
|
|
|
|
|
def _details_to_str(details, special=None):
    """Convert a details dict to a string.

    :param details: A dictionary mapping short names to ``Content`` objects.
    :param special: If specified, an attachment that should have special
        attention drawn to it. The primary attachment. Normally it's the
        traceback that caused the test to fail.
    :return: A formatted string that can be included in text test results.
    """
    empty_names = []
    binary_entries = []
    text_chunks = []
    special_text = None
    # sorted is for testing, may want to remove that and use a dict
    # subclass with defined order for items instead.
    for name, content in sorted(details.items()):
        if content.content_type.type != 'text':
            # Non-text attachments are only listed, never inlined.
            binary_entries.append((name, content.content_type))
            continue
        text = content.as_text().strip()
        if not text:
            empty_names.append(name)
            continue
        if name == special:
            # The special attachment is held back so it ends up at the
            # bottom of the output.
            special_text = '%s\n' % (text,)
            continue
        text_chunks.append(_format_text_attachment(name, text))
    if text_chunks and not text_chunks[-1].endswith('\n'):
        text_chunks.append('')
    if special_text:
        text_chunks.append(special_text)
    lines = []
    if binary_entries:
        lines.append('Binary content:\n')
        for name, content_type in binary_entries:
            lines.append('  %s (%s)\n' % (name, content_type))
    if empty_names:
        lines.append('Empty attachments:\n')
        for name in empty_names:
            lines.append('  %s\n' % (name,))
    if (binary_entries or empty_names) and text_chunks:
        lines.append('\n')
    lines.append('\n'.join(text_chunks))
    return _u('').join(lines)
|