
Otherwise, we waste a request on some 416/206 response that won't be helpful. To do this, add a new X-Backend-Ignore-Range-If-Metadata-Present header whose value is a comma-separated list of header names. Middlewares may include this header to tell object-servers to send the whole object (rather than a 206 or 416) if *any* of the metadata are present. Have dlo and symlink use it, too; it won't save us any round-trips, but it should clean up some object-server logging.

Change-Id: I4ff2a178d0456e7e37d561109ef57dd0d92cbd4e
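
For illustration, a middleware opts in by appending the name of the metadata it
cares about to that header before passing the request on; this module does
exactly that for DLO manifests in ``GetContext.handle_request`` below (a
minimal sketch, where ``req`` is the incoming ``swob.Request``):

    from swift.common.request_helpers import update_ignore_range_header

    # Ask object-servers to send the whole object, Range or no Range,
    # whenever X-Object-Manifest is present on it.
    update_ignore_range_header(req, 'X-Object-Manifest')
    # The backend request now carries something like:
    #   X-Backend-Ignore-Range-If-Metadata-Present: X-Object-Manifest
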
# Copyright (c) 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Middleware that will provide Dynamic Large Object (DLO) support.

---------------
Using ``swift``
---------------

The quickest way to try out this feature is to use the ``swift`` command-line
tool included with the `python-swiftclient`_ library. You can use the ``-S``
option to specify the segment size to use when splitting a large file. For
example::

    swift upload test_container -S 1073741824 large_file

This would split ``large_file`` into 1G segments and begin uploading those
segments in parallel. Once all the segments have been uploaded, ``swift`` will
then create the manifest file so the segments can be downloaded as one.

So now, the following ``swift`` command would download the entire large
object::

    swift download test_container large_file

The ``swift`` command uses a strict convention for its segmented object
support. In the above example it will upload all the segments into a
second container named test_container_segments. These segments will
have names like large_file/1290206778.25/21474836480/00000000,
large_file/1290206778.25/21474836480/00000001, etc.

The main benefit of using a separate container is that the main container
listings will not be polluted with all the segment names. The reason for using
the segment name format of <name>/<timestamp>/<size>/<segment> is so that an
upload of a new file with the same name won't overwrite the contents of the
first until the last moment when the manifest file is updated.

``swift`` will manage these segment files for you, deleting old segments on
deletes and overwrites, etc. You can override this behavior with the
``--leave-segments`` option if desired; this is useful if you want to have
multiple versions of the same large object available.

.. _`python-swiftclient`: http://github.com/openstack/python-swiftclient

----------
Direct API
----------

You can also work with the segments and manifests directly with HTTP
requests instead of having ``swift`` do that for you. You can just
upload the segments like you would any other object and the manifest
is just a zero-byte (not enforced) file with an extra
``X-Object-Manifest`` header.

All the object segments need to be in the same container, have a common object
name prefix, and sort in the order in which they should be concatenated.
Object names are sorted lexicographically as UTF-8 byte strings.
The segments don't have to be in the same container as the manifest file,
which is useful for keeping container listings clean, as explained above for
``swift``.

The manifest file is simply a zero-byte (not enforced) file with the extra
``X-Object-Manifest: <container>/<prefix>`` header, where ``<container>`` is
the container the object segments are in and ``<prefix>`` is the common prefix
for all the segments.

It is best to upload all the segments first and then create or update the
manifest. In this way, the full object won't be available for downloading
until the upload is complete. Also, you can upload a new set of segments to
a second location and then update the manifest to point to this new location.
During the upload of the new segments, the original manifest will still be
available to download the first set of segments.

.. note::

    When updating a manifest object using a POST request, an
    ``X-Object-Manifest`` header must be included for the object to
    continue to behave as a manifest object.
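
For example, with `python-swiftclient`_'s Python API, a POST that refreshes
object metadata would need to re-send the manifest header (a rough sketch;
``storage_url`` and ``token`` are assumed to be valid credentials)::

    from swiftclient import client

    client.post_object(storage_url, token, 'container', 'myobject',
                       headers={'X-Object-Manifest': 'container/myobject/',
                                'X-Object-Meta-Color': 'blue'})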

The manifest file should have no content. However, this is not enforced.
If the manifest path itself matches the container/prefix specified in
``X-Object-Manifest`` and the manifest has content, the manifest is also
treated as a segment, and its content becomes part of the concatenated GET
response. The order of concatenation follows the usual DLO logic: segments
are concatenated in the order in which their names sort.

Here's an example using ``curl`` with tiny 1-byte segments::

    # First, upload the segments
    curl -X PUT -H 'X-Auth-Token: <token>' \
        http://<storage_url>/container/myobject/00000001 --data-binary '1'
    curl -X PUT -H 'X-Auth-Token: <token>' \
        http://<storage_url>/container/myobject/00000002 --data-binary '2'
    curl -X PUT -H 'X-Auth-Token: <token>' \
        http://<storage_url>/container/myobject/00000003 --data-binary '3'

    # Next, create the manifest file
    curl -X PUT -H 'X-Auth-Token: <token>' \
        -H 'X-Object-Manifest: container/myobject/' \
        http://<storage_url>/container/myobject --data-binary ''

    # And now we can download the segments as a single object
    curl -H 'X-Auth-Token: <token>' \
        http://<storage_url>/container/myobject
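
The same flow with `python-swiftclient`_'s Python API looks roughly like this
(a sketch; ``storage_url`` and ``token`` are assumed to be valid)::

    from swiftclient import client

    # Upload the segments
    for i, chunk in enumerate([b'1', b'2', b'3'], start=1):
        client.put_object(storage_url, token, 'container',
                          'myobject/%08d' % i, contents=chunk)

    # Create the manifest, then read everything back as one object
    client.put_object(storage_url, token, 'container', 'myobject',
                      contents=b'',
                      headers={'X-Object-Manifest': 'container/myobject/'})
    headers, body = client.get_object(storage_url, token, 'container',
                                      'myobject')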
"""

import json

import six

from hashlib import md5
from swift.common import constraints
from swift.common.exceptions import ListingIterError, SegmentError
from swift.common.http import is_success
from swift.common.swob import Request, Response, \
    HTTPRequestedRangeNotSatisfiable, HTTPBadRequest, HTTPConflict, \
    str_to_wsgi, wsgi_to_str, wsgi_quote, wsgi_unquote, normalize_etag
from swift.common.utils import get_logger, \
    RateLimitedIterator, quote, close_if_possible, closing_if_possible
from swift.common.request_helpers import SegmentedIterable, \
    update_ignore_range_header
from swift.common.wsgi import WSGIContext, make_subrequest, load_app_config


class GetContext(WSGIContext):
    def __init__(self, dlo, logger):
        super(GetContext, self).__init__(dlo.app)
        self.dlo = dlo
        self.logger = logger

    def _get_container_listing(self, req, version, account, container,
                               prefix, marker=''):
        '''
        :param version: native
        :param account: native
        :param container: native
        :param prefix: native
        :param marker: native
        '''
        con_req = make_subrequest(
            req.environ,
            path=wsgi_quote('/'.join([
                '', str_to_wsgi(version),
                str_to_wsgi(account), str_to_wsgi(container)])),
            method='GET',
            headers={'x-auth-token': req.headers.get('x-auth-token')},
            agent=('%(orig)s ' + 'DLO MultipartGET'), swift_source='DLO')
        con_req.query_string = 'prefix=%s' % quote(prefix)
        if marker:
            con_req.query_string += '&marker=%s' % quote(marker)

        con_resp = con_req.get_response(self.dlo.app)
        if not is_success(con_resp.status_int):
            if req.method == 'HEAD':
                con_resp.body = b''
            return con_resp, None
        with closing_if_possible(con_resp.app_iter):
            return None, json.loads(b''.join(con_resp.app_iter))

    def _segment_listing_iterator(self, req, version, account, container,
                                  prefix, segments, first_byte=None,
                                  last_byte=None):
        '''
        :param req: upstream request
        :param version: native
        :param account: native
        :param container: native
        :param prefix: native
        :param segments: array of dicts, with native strings
        :param first_byte: number
        :param last_byte: number
        '''
        # It's sort of hokey that this thing takes in the first page of
        # segments as an argument, but we need to compute the etag and content
        # length from the first page, and it's better to have a hokey
        # interface than to make redundant requests.
        if first_byte is None:
            first_byte = 0
        if last_byte is None:
            last_byte = float("inf")

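        # Walk the listing page by page, yielding one dict per segment for
        # SegmentedIterable. first_byte/last_byte are tracked relative to the
        # segments not yet visited: after each segment they are reduced by
        # its length (floored at -1), so once last_byte goes negative no
        # further segments are needed.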
        while True:
            for segment in segments:
                seg_length = int(segment['bytes'])

                if first_byte >= seg_length:
                    # don't need any bytes from this segment
                    first_byte = max(first_byte - seg_length, -1)
                    last_byte = max(last_byte - seg_length, -1)
                    continue
                elif last_byte < 0:
                    # no bytes are needed from this or any future segment
                    break

                seg_name = segment['name']
                if six.PY2:
                    seg_name = seg_name.encode("utf-8")

                # We deliberately omit the etag and size here;
                # SegmentedIterable will check size and etag if
                # specified, but we don't want it to. DLOs only care
                # that the objects' names match the specified prefix.
                # SegmentedIterable will instead check that the data read
                # from each segment matches the response headers.
                _path = "/".join(["", version, account, container, seg_name])
                _first = None if first_byte <= 0 else first_byte
                _last = None if last_byte >= seg_length - 1 else last_byte
                yield {
                    'path': _path,
                    'first_byte': _first,
                    'last_byte': _last
                }

                first_byte = max(first_byte - seg_length, -1)
                last_byte = max(last_byte - seg_length, -1)

            if len(segments) < constraints.CONTAINER_LISTING_LIMIT:
                # a short page means that we're done with the listing
                break
            elif last_byte < 0:
                break

            marker = segments[-1]['name']
            error_response, segments = self._get_container_listing(
                req, version, account, container, prefix, marker)
            if error_response:
                # we've already started sending the response body to the
                # client, so all we can do is raise an exception to make the
                # WSGI server close the connection early
                close_if_possible(error_response.app_iter)
                raise ListingIterError(
                    "Got status %d listing container /%s/%s" %
                    (error_response.status_int, account, container))

    def get_or_head_response(self, req, x_object_manifest):
        '''
        :param req: user's request
        :param x_object_manifest: as unquoted, native string
        '''
        response_headers = self._response_headers

        container, obj_prefix = x_object_manifest.split('/', 1)

        version, account, _junk = req.split_path(2, 3, True)
        version = wsgi_to_str(version)
        account = wsgi_to_str(account)
        error_response, segments = self._get_container_listing(
            req, version, account, container, obj_prefix)
        if error_response:
            return error_response
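        # A short first page means we have already seen every segment; a full
        # page means there may be more, so the total content length and etag
        # can only be computed when the listing is complete.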
        have_complete_listing = len(segments) < \
            constraints.CONTAINER_LISTING_LIMIT

        first_byte = last_byte = None
        actual_content_length = None
        content_length_for_swob_range = None
        if req.range and len(req.range.ranges) == 1:
            content_length_for_swob_range = sum(o['bytes'] for o in segments)

            # This is a hack to handle suffix byte ranges (e.g. "bytes=-5"),
            # which we can't honor unless we have a complete listing.
            _junk, range_end = req.range.ranges_for_length(float("inf"))[0]

            # If this is all the segments, we know whether or not this
            # range request is satisfiable.
            #
            # Alternately, we may not have all the segments, but this range
            # falls entirely within the first page's segments, so we know
            # that it is satisfiable.
            if (have_complete_listing
                    or range_end < content_length_for_swob_range):
                byteranges = req.range.ranges_for_length(
                    content_length_for_swob_range)
                if not byteranges:
                    headers = {'Accept-Ranges': 'bytes'}
                    if have_complete_listing:
                        headers['Content-Range'] = 'bytes */%d' % (
                            content_length_for_swob_range, )
                    return HTTPRequestedRangeNotSatisfiable(
                        request=req, headers=headers)
                first_byte, last_byte = byteranges[0]
                # For some reason, swob.Range.ranges_for_length adds 1 to the
                # last byte's position.
                last_byte -= 1
                actual_content_length = last_byte - first_byte + 1
            else:
                # The range may or may not be satisfiable, but we can't tell
                # based on just one page of listing, and we're not going to go
                # get more pages because that would use up too many resources,
                # so we ignore the Range header and return the whole object.
                actual_content_length = None
                content_length_for_swob_range = None
                req.range = None
        else:
            req.range = None

        response_headers = [
            (h, v) for h, v in response_headers
            if h.lower() not in ("content-length", "content-range")]

        if content_length_for_swob_range is not None:
            # Here, we have to give swob a big-enough content length so that
            # it can compute the actual content length based on the Range
            # header. This value will not be visible to the client; swob will
            # substitute its own Content-Length.
            #
            # Note: if the manifest points to at least CONTAINER_LISTING_LIMIT
            # segments, this may be less than the sum of all the segments'
            # sizes. However, it'll still be greater than the last byte in the
            # Range header, so it's good enough for swob.
            response_headers.append(('Content-Length',
                                     str(content_length_for_swob_range)))
        elif have_complete_listing:
            actual_content_length = sum(o['bytes'] for o in segments)
            response_headers.append(('Content-Length',
                                     str(actual_content_length)))

        if have_complete_listing:
            response_headers = [(h, v) for h, v in response_headers
                                if h.lower() != "etag"]
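            # The DLO's Etag is the md5 of the concatenated segment etags
            # (not of the object data), so it changes whenever any segment
            # changes.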
            etag = md5()
            for seg_dict in segments:
                etag.update(normalize_etag(seg_dict['hash']).encode('utf8'))
            response_headers.append(('Etag', '"%s"' % etag.hexdigest()))

        app_iter = None
        if req.method == 'GET':
            listing_iter = RateLimitedIterator(
                self._segment_listing_iterator(
                    req, version, account, container, obj_prefix, segments,
                    first_byte=first_byte, last_byte=last_byte),
                self.dlo.rate_limit_segments_per_sec,
                limit_after=self.dlo.rate_limit_after_segment)

            app_iter = SegmentedIterable(
                req, self.dlo.app, listing_iter, ua_suffix="DLO MultipartGET",
                swift_source="DLO", name=req.path, logger=self.logger,
                max_get_time=self.dlo.max_get_time,
                response_body_length=actual_content_length)

            try:
                app_iter.validate_first_segment()
            except (SegmentError, ListingIterError):
                return HTTPConflict(request=req)

        resp = Response(request=req, headers=response_headers,
                        conditional_response=True,
                        app_iter=app_iter)

        return resp

    def handle_request(self, req, start_response):
        """
        Take a GET or HEAD request, and if it is for a dynamic large object
        manifest, return an appropriate response.

        Otherwise, simply pass it through.
        """
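        # Ask the object-server to ignore any Range header when the object
        # has an X-Object-Manifest header (i.e. it is a DLO manifest): a
        # 206/416 for the tiny manifest itself would be useless, since the
        # real response is built from the segments below.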
        update_ignore_range_header(req, 'X-Object-Manifest')
        resp_iter = self._app_call(req.environ)

        # make sure this response is for a dynamic large object manifest
        for header, value in self._response_headers:
            if header.lower() == 'x-object-manifest':
                close_if_possible(resp_iter)
                response = self.get_or_head_response(
                    req, wsgi_to_str(wsgi_unquote(value)))
                return response(req.environ, start_response)
        # Not a dynamic large object manifest; just pass it through.
        start_response(self._response_status,
                       self._response_headers,
                       self._response_exc_info)
        return resp_iter


class DynamicLargeObject(object):
    def __init__(self, app, conf):
        self.app = app
        self.logger = get_logger(conf, log_route='dlo')

        # DLO functionality used to live in the proxy server, not middleware,
        # so let's try to go find config values in the proxy's config section
        # to ease cluster upgrades.
        self._populate_config_from_old_location(conf)

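        # max_get_time is in seconds; segment GETs are rate-limited to
        # rate_limit_segments_per_sec per second once more than
        # rate_limit_after_segment segments have been served.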
        self.max_get_time = int(conf.get('max_get_time', '86400'))
        self.rate_limit_after_segment = int(conf.get(
            'rate_limit_after_segment', '10'))
        self.rate_limit_segments_per_sec = int(conf.get(
            'rate_limit_segments_per_sec', '1'))

    def _populate_config_from_old_location(self, conf):
        if ('rate_limit_after_segment' in conf or
                'rate_limit_segments_per_sec' in conf or
                'max_get_time' in conf or
                '__file__' not in conf):
            return

        proxy_conf = load_app_config(conf['__file__'])
        for setting in ('rate_limit_after_segment',
                        'rate_limit_segments_per_sec',
                        'max_get_time'):
            if setting in proxy_conf:
                conf[setting] = proxy_conf[setting]

    def __call__(self, env, start_response):
        """
        WSGI entry point
        """
        req = Request(env)
        try:
            vrs, account, container, obj = req.split_path(4, 4, True)
            is_obj_req = True
        except ValueError:
            is_obj_req = False
        if not is_obj_req:
            return self.app(env, start_response)

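        # GET/HEAD may need to be expanded into a multi-segment response;
        # ?multipart-manifest=get asks for the manifest itself and is passed
        # straight through. PUT only needs its X-Object-Manifest validated.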
        if ((req.method == 'GET' or req.method == 'HEAD') and
                req.params.get('multipart-manifest') != 'get'):
            return GetContext(self, self.logger).\
                handle_request(req, start_response)
        elif req.method == 'PUT':
            error_response = self._validate_x_object_manifest_header(req)
            if error_response:
                return error_response(env, start_response)
        return self.app(env, start_response)

    def _validate_x_object_manifest_header(self, req):
        """
        Make sure that X-Object-Manifest is valid if present.
        """
        if 'X-Object-Manifest' in req.headers:
            value = req.headers['X-Object-Manifest']
            container = prefix = None
            try:
                container, prefix = value.split('/', 1)
            except ValueError:
                pass
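            # e.g. 'segments_container/myobject/' is valid; reject values
            # missing a container or prefix, containing '?' or '&', or whose
            # prefix starts with '/'.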
            if not container or not prefix or '?' in value or '&' in value or \
                    prefix.startswith('/'):
                return HTTPBadRequest(
                    request=req,
                    body=('X-Object-Manifest must be in the '
                          'format container/prefix'))


def filter_factory(global_conf, **local_conf):
    conf = global_conf.copy()
    conf.update(local_conf)

    def dlo_filter(app):
        return DynamicLargeObject(app, conf)
    return dlo_filter