Tweaks for Jono's consideration.

This commit is contained in:
Robert Collins
2011-02-19 21:54:14 +13:00
parent da41ba01b6
commit 8a48f927c4
2 changed files with 75 additions and 31 deletions

View File

@@ -14,10 +14,12 @@ __all__ = [
import codecs
import os
from testtools import try_import
from testtools.compat import _b
from testtools.content_type import ContentType, UTF8_TEXT
from testtools.testresult import TestResult
functools = try_import('functools')
_join_b = _b("").join
@@ -127,8 +129,42 @@ def text_content(text):
return Content(UTF8_TEXT, lambda: [text.encode('utf8')])
def maybe_wrap(wrapper, func):
    """Merge metadata for func into wrapper if functools is present.

    :param wrapper: The wrapping callable to receive func's metadata.
    :param func: The wrapped callable whose metadata should be copied.
    :return: ``wrapper``, updated via ``functools.update_wrapper`` when the
        optionally-imported ``functools`` module is available, otherwise
        unchanged.
    """
    # functools is loaded with try_import at module level; it is None on
    # interpreters where the module is unavailable.
    if functools is None:
        return wrapper
    return functools.update_wrapper(wrapper, func)
def _default_parameter(offset, name, default):
    """Create a decorator which will default a parameter to a value.

    :param offset: the offset if the parameter is supplied in *args.
        e.g. in 'def foo(bar, quux)' the offset of quux is 2.
    :param name: The key for the parameter if supplied in **kwargs.
    :param default: The default value to use for the parameter.
    """
    def decorator(func):
        def wrapper(*args, **kwargs):
            # The parameter was passed positionally only when enough
            # positional arguments were supplied to reach its slot.
            if len(args) >= offset:
                if args[offset - 1] is None:
                    args = args[:offset - 1] + (default,) + args[offset:]
            elif kwargs.get(name, None) is None:
                # Covers both "absent" and "explicitly passed as None".
                kwargs[name] = default
            return func(*args, **kwargs)
        return maybe_wrap(wrapper, func)
    return decorator
# Decorators that substitute the documented defaults when a caller passes
# None for these parameters, whether positionally or by keyword.
_set_content_type = _default_parameter(2, 'content_type', UTF8_TEXT)
_set_chunk_size = _default_parameter(3, 'chunk_size', DEFAULT_CHUNK_SIZE)
@_set_content_type
@_set_chunk_size
def content_from_file(path, content_type=None, chunk_size=None,
                      buffer_now=False):
    """Create a `Content` object from a file on disk.

    Note that unless ``buffer_now`` is explicitly passed in as True, the
    file is only opened and read when the content is serialized; by then
    ``path`` may have been changed or deleted.

    :param path: The path to the file to create content from.
    :param content_type: The type of content. If not specified, defaults
        to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file.
        Defaults to `DEFAULT_CHUNK_SIZE`.
    :param buffer_now: If True, read the file from disk now and keep it in
        memory. Otherwise, only read when the content is serialized.
    """
    # content_type and chunk_size are defaulted by the decorators above,
    # so no explicit None-handling is needed here.
    def reader():
        # This should be try:finally:, but python2.4 makes that hard. When
        # we drop older python support we can make this use a context
        # manager for maximum simplicity.
        stream = open(path, 'rb')
        for chunk in _iter_chunks(stream, chunk_size):
            yield chunk
        stream.close()
    return content_from_reader(reader, content_type, buffer_now)
@_set_content_type
@_set_chunk_size
def content_from_stream(stream, content_type=None, chunk_size=None,
                        buffer_now=False):
    """Create a `Content` object from a file-like stream.

    Note that unless ``buffer_now`` is True, the stream will only be read
    from when ``iter_bytes`` is called.

    :param stream: A file-like object to read the content from. The stream
        is not closed by this function or the content object it returns.
    :param content_type: The type of content. If not specified, defaults
        to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file.
        Defaults to `DEFAULT_CHUNK_SIZE`.
    :param buffer_now: If True, reads from the stream right now. Otherwise,
        only reads when the content is serialized. Defaults to False.
    """
    # content_type and chunk_size are defaulted by the decorators above,
    # so no explicit None-handling is needed here.
    reader = lambda: _iter_chunks(stream, chunk_size)
    return content_from_reader(reader, content_type, buffer_now)
def content_from_reader(reader, content_type, buffer_now):
    """Create a Content object that will obtain the content from reader.

    :param reader: A callback to read the content. Should return an
        iterable of bytestrings.
    :param content_type: The content type to create.
    :param buffer_now: If True the reader is evaluated immediately and
        buffered.
    """
    if buffer_now:
        # Evaluate once up front and hand Content a reader that replays
        # the buffered chunks.
        chunks = list(reader())
        def reader():
            return chunks
    return Content(content_type, reader)
def attach_file(detailed, path, name=None, content_type=None,
                chunk_size=None, buffer_now=True):
    """Attach a file to this test as a detail.

    This is a convenience method wrapping around `addDetail`.

    :param detailed: An object with an ``addDetail`` method to attach the
        file content to (e.g. a test case).
    :param path: The path of the file to attach.
    :param name: The name to give the detail. Defaults to the base name
        of ``path``.
    :param content_type: The type of the content of the file. If not
        provided, defaults to UTF8-encoded text/plain.
    :param chunk_size: The size of chunks to read from the file. Defaults
        to something sensible.
    :param buffer_now: If False the file content is read when the content
        object is evaluated rather than when attach_file is called.
        Note that this may be after any cleanups that obj_with_details has,
        so if the file is a temporary file disabling buffer_now may cause
        the file to be read after it is deleted. To handle those cases,
        using attach_file as a cleanup is recommended because it guarantees
        a sequence for when the attach_file call is made::

          detailed.addCleanup(attach_file, 'foo.txt', detailed)
    """
    if name is None:
        name = os.path.basename(path)
    content_object = content_from_file(
        path, content_type, chunk_size, buffer_now)
    detailed.addDetail(name, content_object)

View File

@@ -110,7 +110,7 @@ class TestContent(TestCase):
fd, path = tempfile.mkstemp()
os.write(fd, 'some data')
os.close(fd)
content = content_from_file(path, UTF8_TEXT, lazy_read=False)
content = content_from_file(path, UTF8_TEXT, buffer_now=True)
os.remove(path)
self.assertThat(
_b('').join(content.iter_bytes()), Equals('some data'))
@@ -131,7 +131,7 @@ class TestContent(TestCase):
self.addCleanup(os.remove, path)
os.write(fd, 'some data')
stream = open(path, 'rb')
content = content_from_stream(stream, UTF8_TEXT, lazy_read=False)
content = content_from_stream(stream, UTF8_TEXT, buffer_now=True)
os.write(fd, 'more data')
os.close(fd)
self.assertThat(
@@ -197,7 +197,7 @@ class TestAttachFile(TestCase):
pass
test = SomeTest('test_foo')
path = self.make_file('some data')
attach_file(test, path, name='foo', lazy_read=True)
attach_file(test, path, name='foo', buffer_now=False)
content = test.getDetails()['foo']
content_file = open(path, 'w')
content_file.write('new data')