unittest2 recently added the ability to show local variables in tracebacks, as requested for us in https://github.com/testing-cabal/testtools/issues/111. Reusing that requires some refactoring of our code, in particular where we were reimplementing bits of the traceback module. Now we can just hard-depend on traceback2 and linecache2, which are brought in by unittest2 1.0.0. Change-Id: Ieb3268029d26b48ed4fcd25ed644bd339f6aa3fb
		
			
				
	
	
		
			371 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			371 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# Copyright (c) 2009-2012 testtools developers. See LICENSE for details.
 | 
						|
 | 
						|
"""Content - a MIME-like Content object."""
 | 
						|
 | 
						|
# Names exported by ``from <this module> import *``.
__all__ = [
    'attach_file',
    'Content',
    'content_from_file',
    'content_from_stream',
    'json_content',
    'text_content',
    'TracebackContent',
    ]
 | 
						|
 | 
						|
import codecs
 | 
						|
import inspect
 | 
						|
import json
 | 
						|
import os
 | 
						|
import sys
 | 
						|
 | 
						|
from extras import try_import
 | 
						|
# To let setup.py work, make this a conditional import.
 | 
						|
traceback = try_import('traceback2')
 | 
						|
 | 
						|
from testtools.compat import (
 | 
						|
    _b,
 | 
						|
    _u,
 | 
						|
    istext,
 | 
						|
    str_is_unicode,
 | 
						|
)
 | 
						|
from testtools.content_type import ContentType, JSON, UTF8_TEXT
 | 
						|
 | 
						|
 | 
						|
# Imported conditionally; maybe_wrap() checks for None before using it.
functools = try_import('functools')

# Joins an iterable of bytestrings into a single bytestring.
_join_b = _b("").join


# Default size handed to stream.read() when reading files/streams in chunks.
DEFAULT_CHUNK_SIZE = 4096

# %-format templates for labelling captured stdout / stderr text.
STDOUT_LINE = '\nStdout:\n%s'
STDERR_LINE = '\nStderr:\n%s'
 | 
						|
 | 
						|
 | 
						|
def _iter_chunks(stream, chunk_size, seek_offset=None, seek_whence=0):
 | 
						|
    """Read 'stream' in chunks of 'chunk_size'.
 | 
						|
 | 
						|
    :param stream: A file-like object to read from.
 | 
						|
    :param chunk_size: The size of each read from 'stream'.
 | 
						|
    :param seek_offset: If non-None, seek before iterating.
 | 
						|
    :param seek_whence: Pass through to the seek call, if seeking.
 | 
						|
    """
 | 
						|
    if seek_offset is not None:
 | 
						|
        stream.seek(seek_offset, seek_whence)
 | 
						|
    chunk = stream.read(chunk_size)
 | 
						|
    while chunk:
 | 
						|
        yield chunk
 | 
						|
        chunk = stream.read(chunk_size)
 | 
						|
 | 
						|
 | 
						|
class Content(object):
    """A MIME-like Content object.

    'Content' objects can be serialised to bytes using the iter_bytes method.
    If the 'Content-Type' is recognised by other code, they are welcome to
    look for richer contents than mere byte serialisation - for example in
    memory object graphs etc. However, such code MUST be prepared to receive
    a generic 'Content' object that has been reconstructed from a byte stream.

    :ivar content_type: The content type of this Content.
    """

    def __init__(self, content_type, get_bytes):
        """Create a Content object.

        :param content_type: The content type of the bytes. ``iter_text``
            reads its ``type`` and ``parameters`` attributes.
        :param get_bytes: A callable taking no arguments which returns an
            iterable of bytestrings.
        :raises ValueError: If either argument is None.
        """
        if content_type is None or get_bytes is None:
            raise ValueError("None not permitted in %r, %r" % (
                content_type, get_bytes))
        self.content_type = content_type
        self._get_bytes = get_bytes

    def __eq__(self, other):
        """Compare by content type and by the fully serialised bytes.

        Objects that do not offer ``content_type`` and ``iter_bytes`` are not
        comparable; NotImplemented is returned for them so that Python falls
        back to its default handling instead of raising AttributeError.
        """
        try:
            # Only the attribute lookups are guarded, so errors raised while
            # actually producing the bytes still propagate.
            other_type = other.content_type
            other_iter_bytes = other.iter_bytes
        except AttributeError:
            return NotImplemented
        return (self.content_type == other_type and
            _join_b(self.iter_bytes()) == _join_b(other_iter_bytes()))

    def __ne__(self, other):
        """Inverse of ``__eq__``.

        Python 2 does not derive ``!=`` from ``__eq__``, so it is spelled out.
        """
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def as_text(self):
        """Return all of the content as text.

        This is only valid where ``iter_text`` is.  It will load all of the
        content into memory.  Where this is a concern, use ``iter_text``
        instead.
        """
        return _u('').join(self.iter_text())

    def iter_bytes(self):
        """Iterate over bytestrings of the serialised content."""
        return self._get_bytes()

    def iter_text(self):
        """Iterate over the text of the serialised content.

        This is only valid for text MIME types, and will use ISO-8859-1 if
        no charset parameter is present in the MIME type. (This is somewhat
        arbitrary, but consistent with RFC2616 3.7.1).

        :raises ValueError: If the content type is not text/\*.
        """
        # The check lives here rather than in the generator below so that the
        # error is raised at call time, not on first iteration.
        if self.content_type.type != "text":
            raise ValueError("Not a text type %r" % self.content_type)
        return self._iter_text()

    def _iter_text(self):
        """Worker for iter_text - does the decoding."""
        encoding = self.content_type.parameters.get('charset', 'ISO-8859-1')
        # An incremental decoder copes with multi-byte sequences that are
        # split across chunk boundaries.
        decoder = codecs.getincrementaldecoder(encoding)()
        for chunk in self.iter_bytes():
            yield decoder.decode(chunk)
        # Flush whatever the decoder is still holding.
        final = decoder.decode(_b(''), True)
        if final:
            yield final

    def __repr__(self):
        return "<Content type=%r, value=%r>" % (
            self.content_type, _join_b(self.iter_bytes()))
 | 
						|
 | 
						|
 | 
						|
class StackLinesContent(Content):
    """Content object for stack lines.

    This adapts a list of "preprocessed" stack lines into a 'Content' object.
    The stack lines are most likely produced from ``traceback.extract_stack``
    or ``traceback.extract_tb``.

    text/x-traceback;language=python is used for the mime type, in order to
    provide room for other languages to format their tracebacks differently.
    """

    # Whether or not to hide layers of the stack trace that are
    # unittest/testtools internal code.  Defaults to True since the
    # system-under-test is rarely unittest or testtools.
    HIDE_INTERNAL_STACK = True

    def __init__(self, stack_lines, prefix_content="", postfix_content=""):
        """Create a StackLinesContent for ``stack_lines``.

        :param stack_lines: A list of preprocessed stack lines, probably
            obtained by calling ``traceback.extract_stack`` or
            ``traceback.extract_tb``.
        :param prefix_content: If specified, a unicode string to prepend to the
            text content.
        :param postfix_content: If specified, a unicode string to append to the
            text content.
        """
        content_type = ContentType('text', 'x-traceback',
            {"language": "python", "charset": "utf8"})
        # The text is assembled once here; encoding to UTF-8 is deferred until
        # the bytes are requested.
        value = (
            prefix_content
            + self._stack_lines_to_unicode(stack_lines)
            + postfix_content)
        super(StackLinesContent, self).__init__(
            content_type, lambda: [value.encode("utf8")])

    def _stack_lines_to_unicode(self, stack_lines):
        """Converts a list of pre-processed stack lines into a unicode string.

        :param stack_lines: The entries to hand to ``traceback.format_list``.
        :return: The formatted lines joined into one unicode string.
        """
        msg_lines = traceback.format_list(stack_lines)
        return _u('').join(msg_lines)
 | 
						|
 | 
						|
 | 
						|
class TracebackContent(Content):
    """Content object for tracebacks.

    This adapts an exc_info tuple to the 'Content' interface.
    'text/x-traceback;language=python' is used for the mime type, in order to
    provide room for other languages to format their tracebacks differently.
    """

    def __init__(self, err, test, capture_locals=False):
        """Create a TracebackContent for ``err``.

        :param err: An exc_info error tuple.
        :param test: A test object used to obtain failureException.
        :param capture_locals: If true, show locals in the traceback.
        :raises ValueError: If ``err`` is None.
        """
        if err is None:
            raise ValueError("err may not be None")

        exctype, value, tb = err
        # Skip test runner traceback levels
        if StackLinesContent.HIDE_INTERNAL_STACK:
            while tb and '__unittest' in tb.tb_frame.f_globals:
                tb = tb.tb_next

        limit = None
        # Disabled due to https://bugs.launchpad.net/testtools/+bug/1188420
        # NOTE(review): the leading ``False`` makes this branch unreachable,
        # so ``limit`` is always None and ``test`` is effectively unused.
        # ``_is_relevant_tb_level`` is not defined in this file, so it must be
        # supplied before this branch is re-enabled.
        if (False
            and StackLinesContent.HIDE_INTERNAL_STACK
            and test.failureException
            and isinstance(value, test.failureException)):
            # Skip assert*() traceback levels
            limit = 0
            while tb and not self._is_relevant_tb_level(tb):
                limit += 1
                tb = tb.tb_next

        # Format eagerly so the content does not keep the traceback alive;
        # only the UTF-8 encoding is deferred until the bytes are requested.
        stack_lines = list(traceback.TracebackException(exctype, value, tb,
            limit=limit, capture_locals=capture_locals).format())
        content_type = ContentType('text', 'x-traceback',
            {"language": "python", "charset": "utf8"})
        super(TracebackContent, self).__init__(
            content_type, lambda: [x.encode('utf8') for x in stack_lines])
 | 
						|
 | 
						|
 | 
						|
def StacktraceContent(prefix_content="", postfix_content=""):
    """Content object for stack traces.

    This function will create and return a 'Content' object that contains a
    stack trace.

    The mime type is set to 'text/x-traceback;language=python', so other
    languages can format their stack traces differently.

    :param prefix_content: A unicode string to add before the stack lines.
    :param postfix_content: A unicode string to add after the stack lines.
    :return: A ``StackLinesContent`` for the current stack.
    """
    stack = traceback.walk_stack(None)
    def filter_stack(stack):
        # Discard the filter_stack frame.
        next(stack)
        # Discard the StacktraceContent frame.
        next(stack)
        for f, f_lineno in stack:
            # Stop at the first unittest/testtools internal frame, but only
            # when hiding is enabled.  Frames must be yielded regardless of
            # the flag: yielding only inside the HIDE_INTERNAL_STACK branch
            # produced an empty trace whenever hiding was turned off.
            if (StackLinesContent.HIDE_INTERNAL_STACK
                    and '__unittest' in f.f_globals):
                return
            yield f, f_lineno
    extract = traceback.StackSummary.extract(filter_stack(stack))
    # walk_stack goes from the innermost frame outwards; reverse so the
    # entries are listed oldest call first.
    extract.reverse()
    return StackLinesContent(extract, prefix_content, postfix_content)
 | 
						|
 | 
						|
 | 
						|
def json_content(json_data):
    """Create a JSON Content object from JSON-encodeable data.

    The data is serialised immediately, so later changes to ``json_data`` do
    not affect the returned content.

    :param json_data: A value that ``json.dumps`` can serialise.
    :return: A ``Content`` of type ``JSON`` holding the serialised bytes.
    """
    data = json.dumps(json_data)
    if str_is_unicode:
        # The json module perversely returns native str not bytes
        data = data.encode('utf8')
    return Content(JSON, lambda: [data])
 | 
						|
 | 
						|
 | 
						|
def text_content(text):
    """Create a Content object from some text.

    This is useful for adding details which are short strings.

    :param text: The text to wrap; it is encoded as UTF-8 each time the
        content's bytes are requested.
    :raises TypeError: If ``text`` is not text according to ``istext``.
    """
    if not istext(text):
        raise TypeError(
            "text_content must be given text, not '%s'." % type(text).__name__
        )
    return Content(UTF8_TEXT, lambda: [text.encode('utf8')])
 | 
						|
 | 
						|
 | 
						|
def maybe_wrap(wrapper, func):
    """Merge metadata for func into wrapper if functools is present.

    :param wrapper: The callable that should take on ``func``'s metadata.
    :param func: The callable whose metadata is copied.
    :return: ``wrapper``, updated in place when functools is available and
        untouched otherwise.
    """
    if functools is None:
        return wrapper
    return functools.update_wrapper(wrapper, func)
 | 
						|
 | 
						|
 | 
						|
def content_from_file(path, content_type=None, chunk_size=DEFAULT_CHUNK_SIZE,
                      buffer_now=False, seek_offset=None, seek_whence=0):
    """Create a Content object from a file on disk.

    Note that unless ``buffer_now`` is explicitly passed in as True, the file
    will only be read from when ``iter_bytes`` is called.

    :param path: The path to the file to be used as content.
    :param content_type: The type of content.  If not specified, defaults
        to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file.
        Defaults to ``DEFAULT_CHUNK_SIZE``.
    :param buffer_now: If True, read the file from disk now and keep it in
        memory. Otherwise, only read when the content is serialized.
    :param seek_offset: If non-None, seek within the stream before reading it.
    :param seek_whence: If supplied, pass to ``stream.seek()`` when seeking.
    """
    if content_type is None:
        content_type = UTF8_TEXT
    def reader():
        # The file is opened afresh (in binary mode) each time the content is
        # iterated, and closed when the iteration finishes.
        with open(path, 'rb') as stream:
            for chunk in _iter_chunks(stream,
                                      chunk_size,
                                      seek_offset,
                                      seek_whence):
                yield chunk
    return content_from_reader(reader, content_type, buffer_now)
 | 
						|
 | 
						|
 | 
						|
def content_from_stream(stream, content_type=None,
                        chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=False,
                        seek_offset=None, seek_whence=0):
    """Create a Content object from a file-like stream.

    Note that unless ``buffer_now`` is explicitly passed in as True, the stream
    will only be read from when ``iter_bytes`` is called.

    :param stream: A file-like object to read the content from. The stream
        is not closed by this function or the 'Content' object it returns.
    :param content_type: The type of content. If not specified, defaults
        to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file.
        Defaults to ``DEFAULT_CHUNK_SIZE``.
    :param buffer_now: If True, reads from the stream right now. Otherwise,
        only reads when the content is serialized. Defaults to False.
    :param seek_offset: If non-None, seek within the stream before reading it.
    :param seek_whence: If supplied, pass to ``stream.seek()`` when seeking.
    """
    if content_type is None:
        content_type = UTF8_TEXT
    def reader():
        # Each call starts a new pass over the same stream object, seeking
        # first when a seek_offset was given.
        return _iter_chunks(stream, chunk_size, seek_offset, seek_whence)
    return content_from_reader(reader, content_type, buffer_now)
 | 
						|
 | 
						|
 | 
						|
def content_from_reader(reader, content_type, buffer_now):
    """Create a Content object that will obtain the content from reader.

    :param reader: A callback to read the content. Should return an iterable of
        bytestrings.
    :param content_type: The content type to create. If None, defaults to
        UTF8-encoded text/plain.
    :param buffer_now: If True the reader is evaluated immediately and
        buffered.
    """
    if content_type is None:
        content_type = UTF8_TEXT
    if buffer_now:
        # Drain the reader once and serve the stored chunks from then on.
        contents = list(reader())
        reader = lambda: contents
    return Content(content_type, reader)
 | 
						|
 | 
						|
 | 
						|
def attach_file(detailed, path, name=None, content_type=None,
                chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=True):
    """Attach a file to this test as a detail.

    This is a convenience method wrapping around ``addDetail``.

    Note that by default the contents of the file will be read immediately. If
    ``buffer_now`` is False, then the file *must* exist when the test result is
    called with the results of this test, after the test has been torn down.

    :param detailed: An object with details
    :param path: The path to the file to attach.
    :param name: The name to give to the detail for the attached file.
        Defaults to the base name of ``path``.
    :param content_type: The content type of the file.  If not provided,
        defaults to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file.  Defaults
        to something sensible.
    :param buffer_now: If False the file content is read when the content
        object is evaluated rather than when attach_file is called.
        Note that this may be after any cleanups that ``detailed`` has, so
        if the file is a temporary file disabling buffer_now may cause the file
        to be read after it is deleted. To handle those cases, using
        attach_file as a cleanup is recommended because it guarantees a
        sequence for when the attach_file call is made::

            detailed.addCleanup(attach_file, detailed, 'foo.txt')
    """
    if name is None:
        name = os.path.basename(path)
    content_object = content_from_file(
        path, content_type, chunk_size, buffer_now)
    detailed.addDetail(name, content_object)
 |