Replacing the Paste static file serving middleware and cascade
middleware with a heavily-simplified implementation from Werkzeug, which was thankfully BSD-licensed. Includes a full suite of tests, providing 100% coverage.
This commit is contained in:
@@ -1,8 +1,6 @@
|
||||
from paste.cascade import Cascade
|
||||
from paste.errordocument import make_errordocument
|
||||
from paste.recursive import RecursiveMiddleware
|
||||
from paste.translogger import TransLogger
|
||||
from paste.urlparser import StaticURLParser
|
||||
from weberror.errormiddleware import ErrorMiddleware
|
||||
from weberror.evalexception import EvalException
|
||||
|
||||
@@ -13,10 +11,14 @@ from core import (
|
||||
from decorators import expose
|
||||
from hooks import RequestViewerHook
|
||||
from templating import error_formatters
|
||||
from static import SharedDataMiddleware
|
||||
|
||||
from configuration import set_config
|
||||
from configuration import _runtime_conf as conf
|
||||
|
||||
import os
|
||||
|
||||
|
||||
__all__ = [
|
||||
'make_app', 'load_app', 'Pecan', 'request', 'response',
|
||||
'override_template', 'expose', 'conf', 'set_config', 'render',
|
||||
@@ -54,7 +56,7 @@ def make_app(root, static_root=None, debug=False, errorcfg={},
|
||||
app = make_errordocument(app, conf, **dict(conf.app.errors))
|
||||
# Support for serving static files (for development convenience)
|
||||
if static_root:
|
||||
app = Cascade([StaticURLParser(static_root), app])
|
||||
app = SharedDataMiddleware(app, static_root)
|
||||
# Support for simple Apache-style logs
|
||||
if isinstance(logging, dict) or logging == True:
|
||||
app = TransLogger(app, **(isinstance(logging, dict) and logging or {}))
|
||||
|
||||
161
pecan/static.py
Normal file
161
pecan/static.py
Normal file
@@ -0,0 +1,161 @@
|
||||
"""
|
||||
This code is adapted from the Werkzeug project, under the BSD license.
|
||||
|
||||
:copyright: (c) 2011 by the Werkzeug Team, see AUTHORS for more details.
|
||||
:license: BSD, see LICENSE for more details.
|
||||
"""
|
||||
|
||||
import os
|
||||
import mimetypes
|
||||
from datetime import datetime
|
||||
from time import gmtime
|
||||
|
||||
|
||||
class FileWrapper(object):
|
||||
"""This class can be used to convert a :class:`file`-like object into
|
||||
an iterable. It yields `buffer_size` blocks until the file is fully
|
||||
read.
|
||||
|
||||
You should not use this class directly but rather use the
|
||||
:func:`wrap_file` function that uses the WSGI server's file wrapper
|
||||
support if it's available.
|
||||
|
||||
:param file: a :class:`file`-like object with a :meth:`~file.read` method.
|
||||
:param buffer_size: number of bytes for one iteration.
|
||||
"""
|
||||
|
||||
def __init__(self, file, buffer_size=8192):
|
||||
self.file = file
|
||||
self.buffer_size = buffer_size
|
||||
|
||||
def close(self):
|
||||
if hasattr(self.file, 'close'):
|
||||
self.file.close()
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def next(self):
|
||||
data = self.file.read(self.buffer_size)
|
||||
if data:
|
||||
return data
|
||||
raise StopIteration()
|
||||
|
||||
|
||||
def wrap_file(environ, file, buffer_size=8192):
|
||||
"""Wraps a file. This uses the WSGI server's file wrapper if available
|
||||
or otherwise the generic :class:`FileWrapper`.
|
||||
|
||||
If the file wrapper from the WSGI server is used it's important to not
|
||||
iterate over it from inside the application but to pass it through
|
||||
unchanged.
|
||||
|
||||
More information about file wrappers are available in :pep:`333`.
|
||||
|
||||
:param file: a :class:`file`-like object with a :meth:`~file.read` method.
|
||||
:param buffer_size: number of bytes for one iteration.
|
||||
"""
|
||||
return environ.get('wsgi.file_wrapper', FileWrapper)(file, buffer_size)
|
||||
|
||||
|
||||
def _dump_date(d, delim):
|
||||
"""Used for `http_date` and `cookie_date`."""
|
||||
if d is None:
|
||||
d = gmtime()
|
||||
elif isinstance(d, datetime):
|
||||
d = d.utctimetuple()
|
||||
elif isinstance(d, (int, long, float)):
|
||||
d = gmtime(d)
|
||||
return '%s, %02d%s%s%s%s %02d:%02d:%02d GMT' % (
|
||||
('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun')[d.tm_wday],
|
||||
d.tm_mday, delim,
|
||||
('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep',
|
||||
'Oct', 'Nov', 'Dec')[d.tm_mon - 1],
|
||||
delim, str(d.tm_year), d.tm_hour, d.tm_min, d.tm_sec
|
||||
)
|
||||
|
||||
|
||||
def http_date(timestamp=None):
|
||||
"""Formats the time to match the RFC1123 date format.
|
||||
|
||||
Accepts a floating point number expressed in seconds since the epoch in, a
|
||||
datetime object or a timetuple. All times in UTC.
|
||||
|
||||
Outputs a string in the format ``Wdy, DD Mon YYYY HH:MM:SS GMT``.
|
||||
|
||||
:param timestamp: If provided that date is used, otherwise the current.
|
||||
"""
|
||||
return _dump_date(timestamp, ' ')
|
||||
|
||||
|
||||
class SharedDataMiddleware(object):
|
||||
"""A WSGI middleware that provides static content for development
|
||||
environments.
|
||||
|
||||
Currently the middleware does not support non ASCII filenames. If the
|
||||
encoding on the file system happens to be the encoding of the URI it may
|
||||
work but this could also be by accident. We strongly suggest using ASCII
|
||||
only file names for static files.
|
||||
|
||||
The middleware will guess the mimetype using the Python `mimetype`
|
||||
module. If it's unable to figure out the charset it will fall back
|
||||
to `fallback_mimetype`.
|
||||
|
||||
:param app: the application to wrap. If you don't want to wrap an
|
||||
application you can pass it :exc:`NotFound`.
|
||||
:param directory: the directory to serve up.
|
||||
:param fallback_mimetype: the fallback mimetype for unknown files.
|
||||
"""
|
||||
|
||||
def __init__(self, app, directory, fallback_mimetype='text/plain'):
|
||||
self.app = app
|
||||
self.directory = directory
|
||||
self.loader = self.get_directory_loader(directory)
|
||||
self.fallback_mimetype = fallback_mimetype
|
||||
|
||||
def _opener(self, filename):
|
||||
return lambda: (
|
||||
open(filename, 'rb'),
|
||||
datetime.utcfromtimestamp(os.path.getmtime(filename)),
|
||||
int(os.path.getsize(filename))
|
||||
)
|
||||
|
||||
def get_directory_loader(self, directory):
|
||||
def loader(path):
|
||||
path = path or directory
|
||||
if path is not None:
|
||||
path = os.path.join(directory, path)
|
||||
if os.path.isfile(path):
|
||||
return os.path.basename(path), self._opener(path)
|
||||
return None, None
|
||||
return loader
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
# sanitize the path for non unix systems
|
||||
cleaned_path = environ.get('PATH_INFO', '').strip('/')
|
||||
for sep in os.sep, os.altsep:
|
||||
if sep and sep != '/':
|
||||
cleaned_path = cleaned_path.replace(sep, '/')
|
||||
path = '/'.join([''] + [x for x in cleaned_path.split('/')
|
||||
if x and x != '..'])
|
||||
|
||||
# attempt to find a loader for the file
|
||||
real_filename, file_loader = self.loader(path[1:])
|
||||
if file_loader is None:
|
||||
return self.app(environ, start_response)
|
||||
|
||||
# serve the file with the appropriate name if we found it
|
||||
guessed_type = mimetypes.guess_type(real_filename)
|
||||
mime_type = guessed_type[0] or self.fallback_mimetype
|
||||
f, mtime, file_size = file_loader()
|
||||
|
||||
headers = [('Date', http_date())]
|
||||
headers.append(('Cache-Control', 'public'))
|
||||
headers.extend((
|
||||
('Content-Type', mime_type),
|
||||
('Content-Length', str(file_size)),
|
||||
('Last-Modified', http_date(mtime))
|
||||
))
|
||||
|
||||
start_response('200 OK', headers)
|
||||
return wrap_file(environ, f)
|
||||
BIN
pecan/tests/static/self.png
Normal file
BIN
pecan/tests/static/self.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 6.8 KiB |
65
pecan/tests/test_static.py
Normal file
65
pecan/tests/test_static.py
Normal file
@@ -0,0 +1,65 @@
|
||||
from unittest import TestCase
|
||||
from pecan.static import SharedDataMiddleware, FileWrapper, _dump_date
|
||||
|
||||
import os
|
||||
|
||||
|
||||
class TestStaticFileMiddleware(TestCase):
|
||||
|
||||
def setUp(self):
|
||||
def app(environ, start_response):
|
||||
response_headers = [('Content-type', 'text/plain')]
|
||||
start_response('200 OK', response_headers)
|
||||
return ['Hello world!\n']
|
||||
|
||||
self.app = SharedDataMiddleware(
|
||||
app, os.path.dirname(__file__)
|
||||
)
|
||||
|
||||
self._status = None
|
||||
self._response_headers = None
|
||||
|
||||
def _request(self, path):
|
||||
def start_response(status, response_headers, exc_info=None):
|
||||
self._status = status
|
||||
self._response_headers = response_headers
|
||||
return self.app(
|
||||
dict(PATH_INFO=path),
|
||||
start_response
|
||||
)
|
||||
|
||||
def _get_response_header(self, header):
|
||||
for k, v in self._response_headers:
|
||||
if k.upper() == header.upper():
|
||||
return v
|
||||
return None
|
||||
|
||||
def test_file_can_be_found(self):
|
||||
result = self._request('/static/text.txt')
|
||||
assert isinstance(result, FileWrapper)
|
||||
|
||||
def test_no_file_found_causes_passthrough(self):
|
||||
result = self._request('/static/nosuchfile.txt')
|
||||
assert not isinstance(result, FileWrapper)
|
||||
assert result == ['Hello world!\n']
|
||||
|
||||
def test_mime_type_works_for_png_files(self):
|
||||
self._request('/static/self.png')
|
||||
assert self._get_response_header('Content-Type') == 'image/png'
|
||||
|
||||
def test_file_can_be_closed(self):
|
||||
result = self._request('/static/text.txt')
|
||||
assert result.close() == None
|
||||
|
||||
def test_file_can_be_iterated_over(self):
|
||||
result = self._request('/static/text.txt')
|
||||
assert len([x for x in result])
|
||||
|
||||
def test_date_dumping_on_unix_timestamps(self):
|
||||
result = _dump_date(1331755274.59, ' ')
|
||||
assert result == 'Wed, 14 Mar 2012 20:01:14 GMT'
|
||||
|
||||
def test_separator_sanitization_still_finds_file(self):
|
||||
os.altsep = ':'
|
||||
result = self._request(':static:text.txt')
|
||||
assert isinstance(result, FileWrapper)
|
||||
Reference in New Issue
Block a user