# Copyright (c) 2013 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""
Middleware that will provide Dynamic Large Object (DLO) support.

---------------
Using ``swift``
---------------

The quickest way to try out this feature is to use the ``swift`` Swift Tool
included with the `python-swiftclient`_ library. You can use the ``-S``
option to specify the segment size to use when splitting a large file. For
example::

    swift upload test_container -S 1073741824 large_file

This would split the large_file into 1 GiB segments and begin uploading those
segments in parallel. Once all the segments have been uploaded, ``swift`` will
then create the manifest file so the segments can be downloaded as one.

So now, the following ``swift`` command would download the entire large
object::

    swift download test_container large_file

The ``swift`` command uses a strict convention for its segmented object
support. In the above example it will upload all the segments into a
second container named test_container_segments. These segments will
have names like large_file/1290206778.25/21474836480/00000000,
large_file/1290206778.25/21474836480/00000001, etc.

The main benefit of using a separate container is that the main container
listings will not be polluted with all the segment names. The reason for using
the segment name format of <name>/<timestamp>/<size>/<segment> is so that an
upload of a new file with the same name won't overwrite the contents of the
first until the last moment when the manifest file is updated.
``swift`` will manage these segment files for you, deleting old segments on
deletes and overwrites, etc. You can override this behavior with the
``--leave-segments`` option if desired; this is useful if you want to have
multiple versions of the same large object available.

.. _`python-swiftclient`: http://github.com/openstack/python-swiftclient

----------
Direct API
----------

You can also work with the segments and manifests directly with HTTP
requests instead of having ``swift`` do that for you. You can just
upload the segments like you would any other object, and the manifest
is just a zero-byte (not enforced) file with an extra
``X-Object-Manifest`` header.

All the object segments need to be in the same container, have a common object
name prefix, and sort in the order in which they should be concatenated.
Object names are sorted lexicographically as UTF-8 byte strings.
The segments do not have to be in the same container as the manifest file,
which is useful to keep container listings clean as explained above with
``swift``.

The manifest file is simply a zero-byte (not enforced) file with the extra
``X-Object-Manifest: <container>/<prefix>`` header, where ``<container>`` is
the container the object segments are in and ``<prefix>`` is the common prefix
for all the segments.

It is best to upload all the segments first and then create or update the
manifest. In this way, the full object won't be available for downloading
until the upload is complete. Also, you can upload a new set of segments to
a second location and then update the manifest to point to this new location.
During the upload of the new segments, the original manifest will still be
available to download the first set of segments.

.. note::

    When updating a manifest object using a POST request, an
    ``X-Object-Manifest`` header must be included for the object to
    continue to behave as a manifest object.

The manifest file should have no content. However, this is not enforced.
If the manifest path itself conforms to container/prefix specified in ``X-Object-Manifest``, and if manifest has some content/data in it, it would also be considered as segment and manifest's content will be part of the concatenated GET response. The order of concatenation follows the usual DLO logic which is - the order of concatenation adheres to order returned when segment names are sorted. Here's an example using ``curl`` with tiny 1-byte segments:: # First, upload the segments curl -X PUT -H 'X-Auth-Token: ' \ http:///container/myobject/00000001 --data-binary '1' curl -X PUT -H 'X-Auth-Token: ' \ http:///container/myobject/00000002 --data-binary '2' curl -X PUT -H 'X-Auth-Token: ' \ http:///container/myobject/00000003 --data-binary '3' # Next, create the manifest file curl -X PUT -H 'X-Auth-Token: ' \ -H 'X-Object-Manifest: container/myobject/' \ http:///container/myobject --data-binary '' # And now we can download the segments as a single object curl -H 'X-Auth-Token: ' \ http:///container/myobject """ import json import six from six.moves.urllib.parse import unquote from hashlib import md5 from swift.common import constraints from swift.common.exceptions import ListingIterError, SegmentError from swift.common.http import is_success from swift.common.swob import Request, Response, \ HTTPRequestedRangeNotSatisfiable, HTTPBadRequest, HTTPConflict from swift.common.utils import get_logger, \ RateLimitedIterator, quote, close_if_possible, closing_if_possible from swift.common.request_helpers import SegmentedIterable from swift.common.wsgi import WSGIContext, make_subrequest, load_app_config class GetContext(WSGIContext): def __init__(self, dlo, logger): super(GetContext, self).__init__(dlo.app) self.dlo = dlo self.logger = logger def _get_container_listing(self, req, version, account, container, prefix, marker=''): con_req = make_subrequest( req.environ, path='/'.join(['', version, account, container]), method='GET', headers={'x-auth-token': 
req.headers.get('x-auth-token')}, agent=('%(orig)s ' + 'DLO MultipartGET'), swift_source='DLO') con_req.query_string = 'prefix=%s' % quote(prefix) if marker: con_req.query_string += '&marker=%s' % quote(marker) con_resp = con_req.get_response(self.dlo.app) if not is_success(con_resp.status_int): return con_resp, None with closing_if_possible(con_resp.app_iter): return None, json.loads(''.join(con_resp.app_iter)) def _segment_listing_iterator(self, req, version, account, container, prefix, segments, first_byte=None, last_byte=None): # It's sort of hokey that this thing takes in the first page of # segments as an argument, but we need to compute the etag and content # length from the first page, and it's better to have a hokey # interface than to make redundant requests. if first_byte is None: first_byte = 0 if last_byte is None: last_byte = float("inf") marker = '' while True: for segment in segments: seg_length = int(segment['bytes']) if first_byte >= seg_length: # don't need any bytes from this segment first_byte = max(first_byte - seg_length, -1) last_byte = max(last_byte - seg_length, -1) continue elif last_byte < 0: # no bytes are needed from this or any future segment break seg_name = segment['name'] if isinstance(seg_name, six.text_type): seg_name = seg_name.encode("utf-8") # We deliberately omit the etag and size here; # SegmentedIterable will check size and etag if # specified, but we don't want it to. DLOs only care # that the objects' names match the specified prefix. # SegmentedIterable will instead check that the data read # from each segment matches the response headers. 
_path = "/".join(["", version, account, container, seg_name]) _first = None if first_byte <= 0 else first_byte _last = None if last_byte >= seg_length - 1 else last_byte yield { 'path': _path, 'first_byte': _first, 'last_byte': _last } first_byte = max(first_byte - seg_length, -1) last_byte = max(last_byte - seg_length, -1) if len(segments) < constraints.CONTAINER_LISTING_LIMIT: # a short page means that we're done with the listing break elif last_byte < 0: break marker = segments[-1]['name'] error_response, segments = self._get_container_listing( req, version, account, container, prefix, marker) if error_response: # we've already started sending the response body to the # client, so all we can do is raise an exception to make the # WSGI server close the connection early close_if_possible(error_response.app_iter) raise ListingIterError( "Got status %d listing container /%s/%s" % (error_response.status_int, account, container)) def get_or_head_response(self, req, x_object_manifest, response_headers=None): if response_headers is None: response_headers = self._response_headers container, obj_prefix = x_object_manifest.split('/', 1) container = unquote(container) obj_prefix = unquote(obj_prefix) version, account, _junk = req.split_path(2, 3, True) error_response, segments = self._get_container_listing( req, version, account, container, obj_prefix) if error_response: return error_response have_complete_listing = len(segments) < \ constraints.CONTAINER_LISTING_LIMIT first_byte = last_byte = None actual_content_length = None content_length_for_swob_range = None if req.range and len(req.range.ranges) == 1: content_length_for_swob_range = sum(o['bytes'] for o in segments) # This is a hack to handle suffix byte ranges (e.g. "bytes=-5"), # which we can't honor unless we have a complete listing. _junk, range_end = req.range.ranges_for_length(float("inf"))[0] # If this is all the segments, we know whether or not this # range request is satisfiable. 
# # Alternately, we may not have all the segments, but this range # falls entirely within the first page's segments, so we know # that it is satisfiable. if (have_complete_listing or range_end < content_length_for_swob_range): byteranges = req.range.ranges_for_length( content_length_for_swob_range) if not byteranges: headers = {'Accept-Ranges': 'bytes'} if have_complete_listing: headers['Content-Range'] = 'bytes */%d' % ( content_length_for_swob_range, ) return HTTPRequestedRangeNotSatisfiable( request=req, headers=headers) first_byte, last_byte = byteranges[0] # For some reason, swob.Range.ranges_for_length adds 1 to the # last byte's position. last_byte -= 1 actual_content_length = last_byte - first_byte + 1 else: # The range may or may not be satisfiable, but we can't tell # based on just one page of listing, and we're not going to go # get more pages because that would use up too many resources, # so we ignore the Range header and return the whole object. actual_content_length = None content_length_for_swob_range = None req.range = None else: req.range = None response_headers = [ (h, v) for h, v in response_headers if h.lower() not in ("content-length", "content-range")] if content_length_for_swob_range is not None: # Here, we have to give swob a big-enough content length so that # it can compute the actual content length based on the Range # header. This value will not be visible to the client; swob will # substitute its own Content-Length. # # Note: if the manifest points to at least CONTAINER_LISTING_LIMIT # segments, this may be less than the sum of all the segments' # sizes. However, it'll still be greater than the last byte in the # Range header, so it's good enough for swob. 
response_headers.append(('Content-Length', str(content_length_for_swob_range))) elif have_complete_listing: actual_content_length = sum(o['bytes'] for o in segments) response_headers.append(('Content-Length', str(actual_content_length))) if have_complete_listing: response_headers = [(h, v) for h, v in response_headers if h.lower() != "etag"] etag = md5() for seg_dict in segments: etag.update(seg_dict['hash'].strip('"')) response_headers.append(('Etag', '"%s"' % etag.hexdigest())) app_iter = None if req.method == 'GET': listing_iter = RateLimitedIterator( self._segment_listing_iterator( req, version, account, container, obj_prefix, segments, first_byte=first_byte, last_byte=last_byte), self.dlo.rate_limit_segments_per_sec, limit_after=self.dlo.rate_limit_after_segment) app_iter = SegmentedIterable( req, self.dlo.app, listing_iter, ua_suffix="DLO MultipartGET", swift_source="DLO", name=req.path, logger=self.logger, max_get_time=self.dlo.max_get_time, response_body_length=actual_content_length) try: app_iter.validate_first_segment() except (SegmentError, ListingIterError): return HTTPConflict(request=req) resp = Response(request=req, headers=response_headers, conditional_response=True, app_iter=app_iter) return resp def handle_request(self, req, start_response): """ Take a GET or HEAD request, and if it is for a dynamic large object manifest, return an appropriate response. Otherwise, simply pass it through. """ resp_iter = self._app_call(req.environ) # make sure this response is for a dynamic large object manifest for header, value in self._response_headers: if (header.lower() == 'x-object-manifest'): close_if_possible(resp_iter) response = self.get_or_head_response(req, value) return response(req.environ, start_response) # Not a dynamic large object manifest; just pass it through. 
start_response(self._response_status, self._response_headers, self._response_exc_info) return resp_iter class DynamicLargeObject(object): def __init__(self, app, conf): self.app = app self.logger = get_logger(conf, log_route='dlo') # DLO functionality used to live in the proxy server, not middleware, # so let's try to go find config values in the proxy's config section # to ease cluster upgrades. self._populate_config_from_old_location(conf) self.max_get_time = int(conf.get('max_get_time', '86400')) self.rate_limit_after_segment = int(conf.get( 'rate_limit_after_segment', '10')) self.rate_limit_segments_per_sec = int(conf.get( 'rate_limit_segments_per_sec', '1')) def _populate_config_from_old_location(self, conf): if ('rate_limit_after_segment' in conf or 'rate_limit_segments_per_sec' in conf or 'max_get_time' in conf or '__file__' not in conf): return proxy_conf = load_app_config(conf['__file__']) for setting in ('rate_limit_after_segment', 'rate_limit_segments_per_sec', 'max_get_time'): if setting in proxy_conf: conf[setting] = proxy_conf[setting] def __call__(self, env, start_response): """ WSGI entry point """ req = Request(env) try: vrs, account, container, obj = req.split_path(4, 4, True) except ValueError: return self.app(env, start_response) if ((req.method == 'GET' or req.method == 'HEAD') and req.params.get('multipart-manifest') != 'get'): return GetContext(self, self.logger).\ handle_request(req, start_response) elif req.method == 'PUT': error_response = self._validate_x_object_manifest_header(req) if error_response: return error_response(env, start_response) return self.app(env, start_response) def _validate_x_object_manifest_header(self, req): """ Make sure that X-Object-Manifest is valid if present. """ if 'X-Object-Manifest' in req.headers: value = req.headers['X-Object-Manifest'] container = prefix = None try: container, prefix = value.split('/', 1) except ValueError: pass if not container or not prefix or '?' 
in value or '&' in value or \ prefix.startswith('/'): return HTTPBadRequest( request=req, body=('X-Object-Manifest must be in the ' 'format container/prefix')) def filter_factory(global_conf, **local_conf): conf = global_conf.copy() conf.update(local_conf) def dlo_filter(app): return DynamicLargeObject(app, conf) return dlo_filter