167 lines
5.5 KiB
Python
167 lines
5.5 KiB
Python
"""
|
|
This code is adapted from the Werkzeug project, under the BSD license.
|
|
|
|
:copyright: (c) 2011 by the Werkzeug Team, see AUTHORS for more details.
|
|
:license: BSD, see LICENSE for more details.
|
|
"""
|
|
|
|
import os
|
|
import mimetypes
|
|
from datetime import datetime
|
|
from time import gmtime
|
|
|
|
import six
|
|
|
|
|
|
class FileWrapper(object):
|
|
"""This class can be used to convert a :class:`file`-like object into
|
|
an iterable. It yields `buffer_size` blocks until the file is fully
|
|
read.
|
|
|
|
You should not use this class directly but rather use the
|
|
:func:`wrap_file` function that uses the WSGI server's file wrapper
|
|
support if it's available.
|
|
|
|
:param file: a :class:`file`-like object with a :meth:`~file.read` method.
|
|
:param buffer_size: number of bytes for one iteration.
|
|
"""
|
|
|
|
def __init__(self, file, buffer_size=8192):
|
|
self.file = file
|
|
self.buffer_size = buffer_size
|
|
|
|
def close(self):
|
|
if hasattr(self.file, 'close'):
|
|
self.file.close()
|
|
|
|
def __iter__(self):
|
|
return self
|
|
|
|
def next(self):
|
|
data = self.file.read(self.buffer_size)
|
|
if data:
|
|
return data
|
|
raise StopIteration()
|
|
|
|
|
|
if six.PY3:
|
|
FileWrapper.__next__ = FileWrapper.next
|
|
|
|
|
|
def wrap_file(environ, file, buffer_size=8192):
|
|
"""Wraps a file. This uses the WSGI server's file wrapper if available
|
|
or otherwise the generic :class:`FileWrapper`.
|
|
|
|
If the file wrapper from the WSGI server is used it's important to not
|
|
iterate over it from inside the application but to pass it through
|
|
unchanged.
|
|
|
|
More information about file wrappers are available in :pep:`333`.
|
|
|
|
:param file: a :class:`file`-like object with a :meth:`~file.read` method.
|
|
:param buffer_size: number of bytes for one iteration.
|
|
"""
|
|
return environ.get('wsgi.file_wrapper', FileWrapper)(file, buffer_size)
|
|
|
|
|
|
def _dump_date(d, delim):
|
|
"""Used for `http_date` and `cookie_date`."""
|
|
if d is None:
|
|
d = gmtime()
|
|
elif isinstance(d, datetime):
|
|
d = d.utctimetuple()
|
|
elif isinstance(d, (int, float)):
|
|
d = gmtime(d)
|
|
return '%s, %02d%s%s%s%s %02d:%02d:%02d GMT' % (
|
|
('Mon', 'Tue', 'Wed', 'Thu', 'Fri', 'Sat', 'Sun')[d.tm_wday],
|
|
d.tm_mday, delim,
|
|
('Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep',
|
|
'Oct', 'Nov', 'Dec')[d.tm_mon - 1],
|
|
delim, str(d.tm_year), d.tm_hour, d.tm_min, d.tm_sec
|
|
)
|
|
|
|
|
|
def http_date(timestamp=None):
|
|
"""Formats the time to match the RFC1123 date format.
|
|
|
|
Accepts a floating point number expressed in seconds since the epoch in, a
|
|
datetime object or a timetuple. All times in UTC.
|
|
|
|
Outputs a string in the format ``Wdy, DD Mon YYYY HH:MM:SS GMT``.
|
|
|
|
:param timestamp: If provided that date is used, otherwise the current.
|
|
"""
|
|
return _dump_date(timestamp, ' ')
|
|
|
|
|
|
class StaticFileMiddleware(object):
|
|
"""A WSGI middleware that provides static content for development
|
|
environments.
|
|
|
|
Currently the middleware does not support non ASCII filenames. If the
|
|
encoding on the file system happens to be the encoding of the URI it may
|
|
work but this could also be by accident. We strongly suggest using ASCII
|
|
only file names for static files.
|
|
|
|
The middleware will guess the mimetype using the Python `mimetype`
|
|
module. If it's unable to figure out the charset it will fall back
|
|
to `fallback_mimetype`.
|
|
|
|
:param app: the application to wrap. If you don't want to wrap an
|
|
application you can pass it :exc:`NotFound`.
|
|
:param directory: the directory to serve up.
|
|
:param fallback_mimetype: the fallback mimetype for unknown files.
|
|
"""
|
|
|
|
def __init__(self, app, directory, fallback_mimetype='text/plain'):
|
|
self.app = app
|
|
self.loader = self.get_directory_loader(directory)
|
|
self.fallback_mimetype = fallback_mimetype
|
|
|
|
def _opener(self, filename):
|
|
return lambda: (
|
|
open(filename, 'rb'),
|
|
datetime.utcfromtimestamp(os.path.getmtime(filename)),
|
|
int(os.path.getsize(filename))
|
|
)
|
|
|
|
def get_directory_loader(self, directory):
|
|
def loader(path):
|
|
path = path or directory
|
|
if path is not None:
|
|
path = os.path.join(directory, path)
|
|
if os.path.isfile(path):
|
|
return os.path.basename(path), self._opener(path)
|
|
return None, None
|
|
return loader
|
|
|
|
def __call__(self, environ, start_response):
|
|
# sanitize the path for non unix systems
|
|
cleaned_path = environ.get('PATH_INFO', '').strip('/')
|
|
for sep in os.sep, os.altsep:
|
|
if sep and sep != '/':
|
|
cleaned_path = cleaned_path.replace(sep, '/')
|
|
path = '/'.join([''] + [x for x in cleaned_path.split('/')
|
|
if x and x != '..'])
|
|
|
|
# attempt to find a loader for the file
|
|
real_filename, file_loader = self.loader(path[1:])
|
|
if file_loader is None:
|
|
return self.app(environ, start_response)
|
|
|
|
# serve the file with the appropriate name if we found it
|
|
guessed_type = mimetypes.guess_type(real_filename)
|
|
mime_type = guessed_type[0] or self.fallback_mimetype
|
|
f, mtime, file_size = file_loader()
|
|
|
|
headers = [('Date', http_date())]
|
|
headers.append(('Cache-Control', 'public'))
|
|
headers.extend((
|
|
('Content-Type', mime_type),
|
|
('Content-Length', str(file_size)),
|
|
('Last-Modified', http_date(mtime))
|
|
))
|
|
|
|
start_response('200 OK', headers)
|
|
return wrap_file(environ, f)
|