def override_bytes_from_content_type(listing_dict, logger=None):
    """
    Pull the ``swift_bytes`` parameter out of a listing entry's content
    type and, when its value is an integer, use it as the entry's size.

    The entry is modified in place.  ``content_type`` is always rewritten
    from its parsed form: every parameter other than ``swift_bytes`` is
    re-attached as ``;key=value``.  ``bytes`` is replaced only when
    ``swift_bytes`` converts to an int.

    :param listing_dict: a dict from a container listing; it must have a
                         'content_type' key
    :param logger: optional logger; when given, a ``swift_bytes`` value
                   that is not an integer is reported through it
    """
    base_type, parameters = parse_content_type(
        listing_dict['content_type'])
    kept = []
    for name, val in parameters:
        if name != 'swift_bytes':
            kept.append(';%s=%s' % (name, val))
            continue
        try:
            listing_dict['bytes'] = int(val)
        except ValueError:
            # Keep the size already in the entry; the bad parameter is
            # still left out of the rewritten content type.
            if logger:
                logger.exception("Invalid swift_bytes")
    listing_dict['content_type'] = base_type + ''.join(kept)
isoformat() doesn't include msecs when zero if len(last_modified) < len("1970-01-01T00:00:00.000000"): last_modified += ".000000" response['last_modified'] = last_modified - content_type, params = parse_content_type(content_type) - for key, value in params: - if key == 'swift_bytes': - try: - response['bytes'] = int(value) - except ValueError: - self.logger.exception("Invalid swift_bytes") - else: - content_type += ';%s=%s' % (key, value) - response['content_type'] = content_type + override_bytes_from_content_type(response, logger=self.logger) return response @public diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index ee6afdac1c..6ac39b8ee6 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -32,17 +32,19 @@ from datetime import datetime from swift import gettext_ as _ from urllib import unquote, quote from hashlib import md5 +from sys import exc_info from eventlet import sleep, GreenPile from eventlet.queue import Queue from eventlet.timeout import Timeout from swift.common.utils import ContextPool, config_true_value, public, json, \ - csv_append, GreenthreadSafeIterator, quorum_size + csv_append, GreenthreadSafeIterator, quorum_size, split_path, \ + override_bytes_from_content_type, get_valid_utf8_str from swift.common.ondisk import normalize_timestamp from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ - CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE, MAX_BUFFERED_SLO_SEGMENTS + CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE from swift.common.exceptions import ChunkReadTimeout, \ ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \ ListingIterNotAuthorized, ListingIterError, SegmentError @@ -55,34 +57,16 @@ from swift.proxy.controllers.base import Controller, delay_denial, \ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \ 
def segment_listing_iter(listing):
    """
    Iterate over segment dicts, making sure each name is a UTF-8 encoded
    byte string.

    :param listing: an iterable of segment dicts, each with a 'name' key
    :returns: a generator yielding the same dict objects in order; a
              unicode 'name' is replaced in place by its UTF-8 encoding
    """
    # A plain for loop finishes the generator when the listing runs out.
    # The explicit listing.next() inside ``while True`` depended on
    # StopIteration leaking out of the generator body to end iteration.
    for seg_dict in listing:
        if isinstance(seg_dict['name'], unicode):
            seg_dict['name'] = seg_dict['name'].encode('utf-8')
        yield seg_dict
SegmentedIterable(object): 'Could not load object segment %(path)s:' ' %(status)s') % {'path': path, 'status': resp.status_int}) if self.is_slo: - if 'X-Static-Large-Object' in resp.headers: - # this segment is a nested slo object. read in the body - # and add its segments into this slo. - try: - sub_manifest = json.loads(resp.body) - self.segment_listing.prepend_segments(sub_manifest) - sub_etag = md5(''.join( - o['hash'] for o in sub_manifest)).hexdigest() - if sub_etag != self.segment_dict['hash']: - raise SegmentError(_( - 'Object segment does not match sub-slo: ' - '%(path)s etag: %(r_etag)s != %(s_etag)s.') % - {'path': path, 'r_etag': sub_etag, - 's_etag': self.segment_dict['hash']}) - return self._load_next_segment() - except ValueError: - raise SegmentError(_( - 'Sub SLO has invalid manifest: %s') % path) - - elif resp.etag != self.segment_dict['hash'] or \ - resp.content_length != self.segment_dict['bytes']: + if (resp.etag != self.segment_dict['hash'] or + (resp.content_length != self.segment_dict['bytes'] and + not req.range)): + # The content-length check is for security reasons. Seems + # possible that an attacker could upload a >1mb object and + # then replace it with a much smaller object with same + # etag. Then create a big nested SLO that calls that + # object many times which would hammer our obj servers. If + # this is a range request, don't check content-length + # because it won't match. raise SegmentError(_( 'Object segment no longer valid: ' '%(path)s etag: %(r_etag)s != %(s_etag)s or ' @@ -288,6 +261,9 @@ class SegmentedIterable(object): except StopIteration: raise except SegmentError: + # I have to save this error because yielding the ' ' below clears + # the exception from the current stack frame. + err = exc_info() if not self.have_yielded_data: # Normally, exceptions before any data has been yielded will # cause Eventlet to send a 5xx response. 
def _slo_listing_obj_iter(self, incoming_req, account, container, obj,
                          partition=None, initial_resp=None):
    """
    Walk an SLO manifest and yield its segment dicts, expanding nested
    SLO manifests (entries flagged ``sub_slo``) as they are reached.

    The manifest body is taken from ``initial_resp`` when that response is
    a 200 to a GET without a Range header.  Otherwise the whole manifest
    is fetched with a fresh, range-less GET.  For HEAD requests nested
    manifests are not fetched: the ``sub_slo`` entry itself is yielded
    after its size is overridden from its content type.

    :param incoming_req: the original request from the client
    :param account: account name of the manifest object
    :param container: container name of the manifest object
    :param obj: object name of the manifest
    :param partition: object ring partition of the manifest; looked up
                      from the ring when None
    :param initial_resp: the response already received for incoming_req,
                         if any
    :raises ListingIterNotAuthorized: the request's 'swift.authorize'
                                      callback returned a response
    :raises ListingIterNotFound: the manifest GET returned 404
    :raises ListingIterError: the manifest GET failed for another reason,
                              or the nesting limit was reached
    :raises HTTPException: the ring lookup for the manifest path raised
                           ValueError
    """
    if initial_resp and initial_resp.status_int == HTTP_OK and \
            incoming_req.method == 'GET' and not incoming_req.range:
        valid_resp = initial_resp
    else:
        # The response in hand is partial, a HEAD, or missing; fetch the
        # whole manifest.
        new_req = incoming_req.copy_get()
        new_req.method = 'GET'
        new_req.range = None
        if partition is None:
            try:
                partition = self.app.object_ring.get_part(
                    account, container, obj)
            except ValueError:
                raise HTTPException(
                    "Invalid path to whole SLO manifest: %s" %
                    new_req.path)
        valid_resp = self.GETorHEAD_base(
            new_req, _('Object'), self.app.object_ring, partition,
            '/'.join(['', account, container, obj]))

    if 'swift.authorize' in incoming_req.environ:
        incoming_req.acl = valid_resp.headers.get('x-container-read')
        auth_resp = incoming_req.environ['swift.authorize'](incoming_req)
        if auth_resp:
            raise ListingIterNotAuthorized(auth_resp)
    if valid_resp.status_int == HTTP_NOT_FOUND:
        raise ListingIterNotFound()
    elif not is_success(valid_resp.status_int):
        raise ListingIterError()
    try:
        listing = json.loads(valid_resp.body)
    except ValueError:
        # An unparseable manifest is treated as having no segments.
        listing = []
    for seg_dict in listing:
        if not config_true_value(seg_dict.get('sub_slo')):
            yield seg_dict
            continue
        if incoming_req.method == 'HEAD':
            override_bytes_from_content_type(seg_dict,
                                             logger=self.app.logger)
            yield seg_dict
            continue
        sub_path = get_valid_utf8_str(seg_dict['name'])
        sub_cont, sub_obj = split_path(sub_path, 2, 2, True)
        self.slo_recursion_depth += 1
        try:
            if self.slo_recursion_depth >= self.max_slo_recusion_depth:
                raise ListingIterError("Max recursion depth exceeded")
            for sub_seg_dict in self._slo_listing_obj_iter(
                    incoming_req, account, sub_cont, sub_obj):
                yield sub_seg_dict
        finally:
            # Restore the depth even when the nested walk raises or this
            # generator is closed early, so the counter on the controller
            # never stays elevated.
            self.slo_recursion_depth -= 1
req.params.get('multipart-manifest') != 'get' and \ self.app.allow_static_large_object: large_object = 'SLO' - listing_page1 = () - listing = [] lcontainer = None # container name is included in listing - if resp.status_int == HTTP_OK and \ - req.method == 'GET' and not req.range: - try: - listing = json.loads(resp.body) - except ValueError: - listing = [] - else: - # need to make a second request to get whole manifest - new_req = req.copy_get() - new_req.method = 'GET' - new_req.range = None - new_resp = self.GETorHEAD_base( - new_req, _('Object'), self.app.object_ring, partition, - req.path_info) - if new_resp.status_int // 100 == 2: - try: - listing = json.loads(new_resp.body) - except ValueError: - listing = [] - else: - return HTTPServiceUnavailable( - "Unable to load SLO manifest", request=req) + try: + seg_iter = iter(self._slo_listing_obj_iter( + req, self.account_name, self.container_name, + self.object_name, partition=partition, initial_resp=resp)) + listing_page1 = [] + for seg in seg_iter: + listing_page1.append(seg) + if len(listing_page1) >= CONTAINER_LISTING_LIMIT: + break + listing = itertools.chain(listing_page1, + self._remaining_items(seg_iter)) + except ListingIterNotFound: + return HTTPNotFound(request=req) + except ListingIterNotAuthorized, err: + return err.aresp + except ListingIterError: + return HTTPServerError(request=req) + except StopIteration: + listing_page1 = listing = () + except HTTPException: + return HTTPServiceUnavailable( + "Unable to load SLO manifest", request=req) if 'x-object-manifest' in resp.headers and \ req.params.get('multipart-manifest') != 'get': @@ -602,8 +639,8 @@ class ObjectController(Controller): else: # For objects with a reasonable number of segments, we'll serve # them with a set content-length and computed etag. 
def test_override_bytes_from_content_type(self):
    """
    An integer swift_bytes parameter replaces the entry's 'bytes'; a
    non-integer one leaves 'bytes' alone.  Either way swift_bytes is
    removed from the rewritten content type.
    """
    scenarios = (
        # (content type as stored, expected 'bytes' afterwards)
        ('text/plain; hello="world"; swift_bytes=15', 15),
        ('text/plain; hello="world"; swift_bytes=hey', 1234),
    )
    for raw_content_type, want_bytes in scenarios:
        entry = {
            'bytes': 1234, 'hash': 'asdf', 'name': 'zxcv',
            'content_type': raw_content_type}
        utils.override_bytes_from_content_type(entry, logger=FakeLogger())
        self.assertEquals(entry['bytes'], want_bytes)
        self.assertEquals(entry['content_type'], 'text/plain;hello="world"')
def test_GET_nested_manifest_slo_with_range(self):
    """
    The original whole SLO is Aa1234Bb, where 1234 comes from a
    sub-manifest.  This ranged GET (bytes=4-7) pulls out 34Bb.
    """
    # Top-level manifest: seg01 ("Aa"), a nested manifest, seg02 ("Bb").
    listing = [{"hash": "98568d540134639be4655198a36614a4",  # Aa
                "last_modified": "2012-11-08T04:05:37.866820",
                "bytes": 2,
                "name": "/d1/seg01",
                "content_type": "application/octet-stream"},
               {"hash": "7b4b0ffa275d404bdc2fc6384916714f",  # SubManifest1
                "last_modified": "2012-11-08T04:05:37.866820",
                "bytes": 4, "sub_slo": True,
                "name": "/d2/subManifest01",
                "content_type": "application/octet-stream"},
               {"hash": "d526f1c8ef6c1e4e980e2b8471352d23",  # Bb
                "last_modified": "2012-11-08T04:05:37.866820",
                "bytes": 2,
                "name": "/d1/seg02",
                "content_type": "application/octet-stream"}]

    # Nested manifest: subSeg01 ("12") and subSeg02 ("34").
    sublisting = [{"hash": "c20ad4d76fe97759aa27a0c99bff6710",  # 12
                   "last_modified": "2012-11-08T04:05:37.866820",
                   "bytes": 2,
                   "name": "/d2/subSeg01",
                   "content_type": "application/octet-stream"},
                  {"hash": "e369853df766fa44e1ed0ff613f563bd",  # 34
                   "last_modified": "2012-11-08T04:05:37.866820",
                   "bytes": 2,
                   "name": "/d2/subSeg02",
                   "content_type": "application/octet-stream"}]

    # One body per faked backend response, in the order the requests
    # are expected.  The [1:1] slice is empty, so the first (206)
    # manifest response carries no manifest data.
    response_bodies = (
        '',  # HEAD /a
        '',  # HEAD /a/c
        simplejson.dumps(listing)[1:1],  # GET incomplete manifest
        simplejson.dumps(listing),  # GET complete manifest
        simplejson.dumps(sublisting),  # GET complete submanifest
        '34',  # GET subseg02
        'Bb')  # GET seg02
    etag_iter = ['', '', '', '', '',
                 'e369853df766fa44e1ed0ff613f563bd',  # subSeg02
                 'd526f1c8ef6c1e4e980e2b8471352d23']  # seg02
    headers = [{}, {},
               {'X-Static-Large-Object': 'True',
                'content-type': 'text/html; swift_bytes=4'},
               {'X-Static-Large-Object': 'True',
                'content-type': 'text/html; swift_bytes=4'},
               {'X-Static-Large-Object': 'True',
                'content-type': 'text/html; swift_bytes=4'},
               {}, {}]
    # Guard against the three per-response fixtures drifting out of step.
    self.assertTrue(len(response_bodies) == len(etag_iter) == len(headers))
    with save_globals():
        controller = proxy_server.ObjectController(
            self.app, 'a', 'c', 'manifest')

        requested = []

        def capture_requested_paths(ipaddr, port, device, partition,
                                    method, path, headers=None,
                                    query_string=None):
            # Record each backend request as [method, path, query dict].
            qs_dict = dict(urlparse.parse_qsl(query_string or ''))
            requested.append([method, path, qs_dict])

        set_http_connect(
            200,    # HEAD /a
            200,    # HEAD /a/c
            206,    # GET incomplete listing
            200,    # GET complete listing
            200,    # GET complete sublisting
            200,    # GET subSeg02
            200,    # GET seg02
            headers=headers,
            etags=etag_iter,
            body_iter=response_bodies,
            give_connect=capture_requested_paths)

        req = Request.blank('/a/c/manifest')
        req.range = 'bytes=4-7'
        resp = controller.GET(req)
        # A one-element list lets the nested function record the call.
        got_called = [False, ]

        def fake_start_response(*args, **kwargs):
            got_called[0] = True
            self.assertTrue(args[0].startswith('206'))

        app_iter = resp(req.environ, fake_start_response)
        resp_body = ''.join(app_iter)  # read in entire resp
        self.assertEqual(resp.status_int, 206)
        self.assertEqual(resp_body, '34Bb')
        self.assertTrue(got_called[0])
        self.assertEqual(resp.content_length, 4)
        self.assertEqual(resp.content_type, 'text/html')

        # seg01 and subSeg01 lie outside the requested range and must
        # not be fetched.
        self.assertEqual(
            requested,
            [['HEAD', '/a', {}],
             ['HEAD', '/a/c', {}],
             ['GET', '/a/c/manifest', {}],  # for incomplete manifest
             ['GET', '/a/c/manifest', {}],
             ['GET', '/a/d2/subManifest01', {}],
             ['GET', '/a/d2/subSeg02', {}],
             ['GET', '/a/d1/seg02', {}]])