Merge content convenience functions.

This commit is contained in:
Jonathan Lange
2011-04-01 14:45:45 +01:00
9 changed files with 314 additions and 42 deletions

4
NEWS
View File

@@ -27,6 +27,10 @@ Improvements
* ``MultiTestResult`` now documented in the manual. (Jonathan Lange, #661116)
* New content helpers ``content_from_file``, ``content_from_stream`` and
``attach_file`` make it easier to attach file-like objects to a
test. (Jonathan Lange, Robert Collins, #694126)
* New ``ExpectedException`` context manager to help write tests against things
that are expected to raise exceptions. (Aaron Bentley)

View File

@@ -765,6 +765,10 @@ To make content out of an image stored on disk, you could do something like::
image = Content(ContentType('image', 'png'), lambda: open('foo.png').read())
Or you could use the convenience function::
image = content_from_file('foo.png', ContentType('image', 'png'))
The ``lambda`` helps make sure that the file is opened and the actual bytes
read only when they are needed by default, when the test is finished. This
means that tests can construct and add Content objects freely without worrying
@@ -782,7 +786,7 @@ see the client-side error *and* the logs from the server-side. Here's how you
might do it::
from testtools import TestCase
from testtools.content import Content
from testtools.content import attach_file, Content
from testtools.content_type import UTF8_TEXT
from myproject import SomeServer
@@ -794,7 +798,7 @@ might do it::
self.server = SomeServer()
self.server.start_up()
self.addCleanup(self.server.shut_down)
self.addCleanup(self.attach_log_file)
self.addCleanup(attach_file, self.server.logfile, self)
def attach_log_file(self):
self.addDetail(
@@ -809,10 +813,6 @@ This test will attach the log file of ``SomeServer`` to each test that is
run. testtools will only display the log file for failing tests, so it's not
such a big deal.
Note the callable passed to ``Content`` reads the log file line-by-line. This
is something of an optimization. If we naively returned all of the log file
as one bytestring, ``Content`` would treat that as a list of byte chunks.
If the act of adding at detail is expensive, you might want to use
addOnException_ so that you only do it when a test actually raises an
exception.

View File

@@ -2,6 +2,15 @@
"""Compatibility support for python 2 and 3."""
__metaclass__ = type
__all__ = [
'_b',
'_u',
'advance_iterator',
'str_is_unicode',
'StringIO',
'unicode_output_stream',
]
import codecs
import linecache
@@ -11,14 +20,9 @@ import re
import sys
import traceback
__metaclass__ = type
__all__ = [
'_b',
'_u',
'advance_iterator',
'str_is_unicode',
'unicode_output_stream',
]
from testtools.helpers import try_imports
StringIO = try_imports(['StringIO.StringIO', 'io.StringIO'])
__u_doc = """A function version of the 'u' prefix.

View File

@@ -1,17 +1,44 @@
# Copyright (c) 2009-2010 testtools developers. See LICENSE for details.
# Copyright (c) 2009-2011 testtools developers. See LICENSE for details.
"""Content - a MIME-like Content object."""
import codecs
__all__ = [
'attach_file',
'Content',
'content_from_file',
'content_from_stream',
'text_content',
'TracebackContent',
]
import codecs
import os
from testtools import try_import
from testtools.compat import _b
from testtools.content_type import ContentType, UTF8_TEXT
from testtools.testresult import TestResult
functools = try_import('functools')
_join_b = _b("").join
DEFAULT_CHUNK_SIZE = 4096
def _iter_chunks(stream, chunk_size):
"""Read 'stream' in chunks of 'chunk_size'.
:param stream: A file-like object to read from.
:param chunk_size: The size of each read from 'stream'.
"""
chunk = stream.read(chunk_size)
while chunk:
yield chunk
chunk = stream.read(chunk_size)
class Content(object):
"""A MIME-like Content object.
@@ -100,3 +127,112 @@ def text_content(text):
This is useful for adding details which are short strings.
"""
return Content(UTF8_TEXT, lambda: [text.encode('utf8')])
def maybe_wrap(wrapper, func):
"""Merge metadata for func into wrapper if functools is present."""
if functools is not None:
wrapper = functools.update_wrapper(wrapper, func)
return wrapper
def content_from_file(path, content_type=None, chunk_size=DEFAULT_CHUNK_SIZE,
buffer_now=False):
"""Create a `Content` object from a file on disk.
Note that unless 'read_now' is explicitly passed in as True, the file
will only be read from when ``iter_bytes`` is called.
:param path: The path to the file to be used as content.
:param content_type: The type of content. If not specified, defaults
to UTF8-encoded text/plain.
:param chunk_size: The size of chunks to read from the file.
Defaults to `DEFAULT_CHUNK_SIZE`.
:param buffer_now: If True, read the file from disk now and keep it in
memory. Otherwise, only read when the content is serialized.
"""
if content_type is None:
content_type = UTF8_TEXT
def reader():
# This should be try:finally:, but python2.4 makes that hard. When
# We drop older python support we can make this use a context manager
# for maximum simplicity.
stream = open(path, 'rb')
for chunk in _iter_chunks(stream, chunk_size):
yield chunk
stream.close()
return content_from_reader(reader, content_type, buffer_now)
def content_from_stream(stream, content_type=None,
chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=False):
"""Create a `Content` object from a file-like stream.
Note that the stream will only be read from when ``iter_bytes`` is
called.
:param stream: A file-like object to read the content from. The stream
is not closed by this function or the content object it returns.
:param content_type: The type of content. If not specified, defaults
to UTF8-encoded text/plain.
:param chunk_size: The size of chunks to read from the file.
Defaults to `DEFAULT_CHUNK_SIZE`.
:param buffer_now: If True, reads from the stream right now. Otherwise,
only reads when the content is serialized. Defaults to False.
"""
if content_type is None:
content_type = UTF8_TEXT
reader = lambda: _iter_chunks(stream, chunk_size)
return content_from_reader(reader, content_type, buffer_now)
def content_from_reader(reader, content_type, buffer_now):
"""Create a Content object that will obtain the content from reader.
:param reader: A callback to read the content. Should return an iterable of
bytestrings.
:param content_type: The content type to create.
:param buffer_now: If True the reader is evaluated immediately and
buffered.
"""
if content_type is None:
content_type = UTF8_TEXT
if buffer_now:
contents = list(reader())
reader = lambda: contents
return Content(content_type, reader)
def attach_file(detailed, path, name=None, content_type=None,
chunk_size=DEFAULT_CHUNK_SIZE, buffer_now=True):
"""Attach a file to this test as a detail.
This is a convenience method wrapping around `addDetail`.
Note that unless 'read_now' is explicitly passed in as True, the file
*must* exist when the test result is called with the results of this
test, after the test has been torn down.
:param detailed: An object with details
:param path: The path to the file to attach.
:param name: The name to give to the detail for the attached file.
:param content_type: The content type of the file. If not provided,
defaults to UTF8-encoded text/plain.
:param chunk_size: The size of chunks to read from the file. Defaults
to something sensible.
:param buffer_now: If False the file content is read when the content
object is evaluated rather than when attach_file is called.
Note that this may be after any cleanups that obj_with_details has, so
if the file is a temporary file disabling buffer_now may cause the file
to be read after it is deleted. To handle those cases, using
attach_file as a cleanup is recommended because it guarantees a
sequence for when the attach_file call is made::
detailed.addCleanup(attach_file, 'foo.txt', detailed)
"""
if name is None:
name = os.path.basename(path)
content_object = content_from_file(
path, content_type, chunk_size, buffer_now)
detailed.addDetail(name, content_object)

View File

@@ -15,7 +15,7 @@ __all__ = [
import sys
from testtools import try_imports
from testtools.compat import StringIO
from testtools.content import (
Content,
text_content,
@@ -34,8 +34,6 @@ from twisted.internet import defer
from twisted.python import log
from twisted.trial.unittest import _LogObserver
StringIO = try_imports(['StringIO.StringIO', 'io.StringIO'])
class _DeferredRunTest(RunTest):
"""Base for tests that return Deferreds."""

View File

@@ -1,11 +1,33 @@
# Copyright (c) 2008-2010 testtools developers. See LICENSE for details.
# Copyright (c) 2008-2011 testtools developers. See LICENSE for details.
import os
import tempfile
import unittest
from testtools import TestCase
from testtools.compat import _b, _u
from testtools.content import Content, TracebackContent, text_content
from testtools.content_type import ContentType, UTF8_TEXT
from testtools.matchers import MatchesException, Raises
from testtools.compat import (
_b,
_u,
StringIO,
)
from testtools.content import (
attach_file,
Content,
content_from_file,
content_from_stream,
TracebackContent,
text_content,
)
from testtools.content_type import (
ContentType,
UTF8_TEXT,
)
from testtools.matchers import (
Equals,
MatchesException,
Raises,
raises,
)
from testtools.tests.helpers import an_exc_info
@@ -15,10 +37,11 @@ raises_value_error = Raises(MatchesException(ValueError))
class TestContent(TestCase):
def test___init___None_errors(self):
self.assertThat(lambda:Content(None, None), raises_value_error)
self.assertThat(lambda:Content(None, lambda: ["traceback"]),
raises_value_error)
self.assertThat(lambda:Content(ContentType("text", "traceback"), None),
self.assertThat(lambda: Content(None, None), raises_value_error)
self.assertThat(
lambda: Content(None, lambda: ["traceback"]), raises_value_error)
self.assertThat(
lambda: Content(ContentType("text", "traceback"), None),
raises_value_error)
def test___init___sets_ivars(self):
@@ -64,12 +87,67 @@ class TestContent(TestCase):
content = Content(content_type, lambda: [iso_version])
self.assertEqual([text], list(content.iter_text()))
def test_from_file(self):
fd, path = tempfile.mkstemp()
self.addCleanup(os.remove, path)
os.write(fd, 'some data')
os.close(fd)
content = content_from_file(path, UTF8_TEXT, chunk_size=2)
self.assertThat(
list(content.iter_bytes()), Equals(['so', 'me', ' d', 'at', 'a']))
def test_from_nonexistent_file(self):
directory = tempfile.mkdtemp()
nonexistent = os.path.join(directory, 'nonexistent-file')
content = content_from_file(nonexistent)
self.assertThat(content.iter_bytes, raises(IOError))
def test_from_file_default_type(self):
content = content_from_file('/nonexistent/path')
self.assertThat(content.content_type, Equals(UTF8_TEXT))
def test_from_file_eager_loading(self):
fd, path = tempfile.mkstemp()
os.write(fd, 'some data')
os.close(fd)
content = content_from_file(path, UTF8_TEXT, buffer_now=True)
os.remove(path)
self.assertThat(
_b('').join(content.iter_bytes()), Equals('some data'))
def test_from_stream(self):
data = StringIO('some data')
content = content_from_stream(data, UTF8_TEXT, chunk_size=2)
self.assertThat(
list(content.iter_bytes()), Equals(['so', 'me', ' d', 'at', 'a']))
def test_from_stream_default_type(self):
data = StringIO('some data')
content = content_from_stream(data)
self.assertThat(content.content_type, Equals(UTF8_TEXT))
def test_from_stream_eager_loading(self):
fd, path = tempfile.mkstemp()
self.addCleanup(os.remove, path)
os.write(fd, 'some data')
stream = open(path, 'rb')
content = content_from_stream(stream, UTF8_TEXT, buffer_now=True)
os.write(fd, 'more data')
os.close(fd)
self.assertThat(
_b('').join(content.iter_bytes()), Equals('some data'))
def test_from_text(self):
data = _u("some data")
expected = Content(UTF8_TEXT, lambda: [data.encode('utf8')])
self.assertEqual(expected, text_content(data))
class TestTracebackContent(TestCase):
def test___init___None_errors(self):
self.assertThat(lambda:TracebackContent(None, None),
raises_value_error)
self.assertThat(
lambda: TracebackContent(None, None), raises_value_error)
def test___init___sets_ivars(self):
content = TracebackContent(an_exc_info, self)
@@ -81,12 +159,63 @@ class TestTracebackContent(TestCase):
self.assertEqual(expected, ''.join(list(content.iter_text())))
class TestBytesContent(TestCase):
class TestAttachFile(TestCase):
def test_bytes(self):
data = _u("some data")
expected = Content(UTF8_TEXT, lambda: [data.encode('utf8')])
self.assertEqual(expected, text_content(data))
def make_file(self, data):
fd, path = tempfile.mkstemp()
self.addCleanup(os.remove, path)
os.write(fd, data)
os.close(fd)
return path
def test_simple(self):
class SomeTest(TestCase):
def test_foo(self):
pass
test = SomeTest('test_foo')
data = 'some data'
path = self.make_file(data)
my_content = text_content(data)
attach_file(test, path, name='foo')
self.assertEqual({'foo': my_content}, test.getDetails())
def test_optional_name(self):
# If no name is provided, attach_file just uses the base name of the
# file.
class SomeTest(TestCase):
def test_foo(self):
pass
test = SomeTest('test_foo')
path = self.make_file('some data')
base_path = os.path.basename(path)
attach_file(test, path)
self.assertEqual([base_path], list(test.getDetails()))
def test_lazy_read(self):
class SomeTest(TestCase):
def test_foo(self):
pass
test = SomeTest('test_foo')
path = self.make_file('some data')
attach_file(test, path, name='foo', buffer_now=False)
content = test.getDetails()['foo']
content_file = open(path, 'w')
content_file.write('new data')
content_file.close()
self.assertEqual(''.join(content.iter_bytes()), 'new data')
def test_eager_read_by_default(self):
class SomeTest(TestCase):
def test_foo(self):
pass
test = SomeTest('test_foo')
path = self.make_file('some data')
attach_file(test, path, name='foo')
content = test.getDetails()['foo']
content_file = open(path, 'w')
content_file.write('new data')
content_file.close()
self.assertEqual(''.join(content.iter_bytes()), 'some data')
def test_suite():

View File

@@ -2,9 +2,9 @@
"""Tests for the test runner logic."""
from testtools.helpers import try_import, try_imports
from testtools.compat import StringIO
from testtools.helpers import try_import
fixtures = try_import('fixtures')
StringIO = try_imports(['StringIO.StringIO', 'io.StringIO'])
import testtools
from testtools import TestCase, run
@@ -41,7 +41,7 @@ class TestRun(TestCase):
def test_run_list(self):
if fixtures is None:
self.skipTest("Need fixtures")
package = self.useFixture(SampleTestFixture())
self.useFixture(SampleTestFixture())
out = StringIO()
run.main(['prog', '-l', 'testtools.runexample.test_suite'], out)
self.assertEqual("""testtools.runexample.TestFoo.test_bar
@@ -51,7 +51,7 @@ testtools.runexample.TestFoo.test_quux
def test_run_load_list(self):
if fixtures is None:
self.skipTest("Need fixtures")
package = self.useFixture(SampleTestFixture())
self.useFixture(SampleTestFixture())
out = StringIO()
# We load two tests - one that exists and one that doesn't, and we
# should get the one that exists and neither the one that doesn't nor
@@ -71,6 +71,7 @@ testtools.runexample.missingtest
self.assertEqual("""testtools.runexample.TestFoo.test_bar
""", out.getvalue())
def test_suite():
from unittest import TestLoader
return TestLoader().loadTestsFromName(__name__)

View File

@@ -22,7 +22,6 @@ from testtools import (
TextTestResult,
ThreadsafeForwardingResult,
testresult,
try_imports,
)
from testtools.compat import (
_b,
@@ -30,6 +29,7 @@ from testtools.compat import (
_r,
_u,
str_is_unicode,
StringIO,
)
from testtools.content import Content
from testtools.content_type import ContentType, UTF8_TEXT
@@ -47,8 +47,6 @@ from testtools.tests.helpers import (
)
from testtools.testresult.real import utc
StringIO = try_imports(['StringIO.StringIO', 'io.StringIO'])
class Python26Contract(object):

View File

@@ -3,7 +3,9 @@
"""Tests for extensions to the base test library."""
from pprint import pformat
import os
import sys
import tempfile
import unittest
from testtools import (