SLOs broken for range requests

Change-Id: I21175a4be0cda9a8a98c425bff11c80895cd6d3e
This commit is contained in:
David Goetz 2013-09-10 09:01:32 -07:00
parent ddcf26c729
commit 01f58d6826
5 changed files with 270 additions and 97 deletions

View File

@ -2076,6 +2076,24 @@ def parse_content_type(content_type):
return content_type, parm_list
def override_bytes_from_content_type(listing_dict, logger=None):
"""
Takes a dict from a container listing and overrides the content_type,
bytes fields if swift_bytes is set.
"""
content_type, params = parse_content_type(listing_dict['content_type'])
for key, value in params:
if key == 'swift_bytes':
try:
listing_dict['bytes'] = int(value)
except ValueError:
if logger:
logger.exception("Invalid swift_bytes")
else:
content_type += ';%s=%s' % (key, value)
listing_dict['content_type'] = content_type
def quote(value, safe='/'):
"""
Patched version of urllib.quote that encodes utf-8 strings before quoting

View File

@ -30,7 +30,8 @@ from swift.common.db import DatabaseAlreadyExists
from swift.common.request_helpers import get_param, get_listing_content_type, \
split_and_validate_path
from swift.common.utils import get_logger, public, validate_sync_to, \
config_true_value, json, timing_stats, replication, parse_content_type
config_true_value, json, timing_stats, replication, \
override_bytes_from_content_type
from swift.common.ondisk import hash_path, normalize_timestamp, \
storage_directory
from swift.common.constraints import CONTAINER_LISTING_LIMIT, \
@ -327,22 +328,14 @@ class ContainerController(object):
(name, created, size, content_type, etag) = record
if content_type is None:
return {'subdir': name}
response = {'bytes': size, 'hash': etag, 'name': name}
response = {'bytes': size, 'hash': etag, 'name': name,
'content_type': content_type}
last_modified = datetime.utcfromtimestamp(float(created)).isoformat()
# python isoformat() doesn't include msecs when zero
if len(last_modified) < len("1970-01-01T00:00:00.000000"):
last_modified += ".000000"
response['last_modified'] = last_modified
content_type, params = parse_content_type(content_type)
for key, value in params:
if key == 'swift_bytes':
try:
response['bytes'] = int(value)
except ValueError:
self.logger.exception("Invalid swift_bytes")
else:
content_type += ';%s=%s' % (key, value)
response['content_type'] = content_type
override_bytes_from_content_type(response, logger=self.logger)
return response
@public

View File

@ -32,17 +32,19 @@ from datetime import datetime
from swift import gettext_ as _
from urllib import unquote, quote
from hashlib import md5
from sys import exc_info
from eventlet import sleep, GreenPile
from eventlet.queue import Queue
from eventlet.timeout import Timeout
from swift.common.utils import ContextPool, config_true_value, public, json, \
csv_append, GreenthreadSafeIterator, quorum_size
csv_append, GreenthreadSafeIterator, quorum_size, split_path, \
override_bytes_from_content_type, get_valid_utf8_str
from swift.common.ondisk import normalize_timestamp
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation, \
CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE, MAX_BUFFERED_SLO_SEGMENTS
CONTAINER_LISTING_LIMIT, MAX_FILE_SIZE
from swift.common.exceptions import ChunkReadTimeout, \
ChunkWriteTimeout, ConnectionTimeout, ListingIterNotFound, \
ListingIterNotAuthorized, ListingIterError, SegmentError
@ -55,34 +57,16 @@ from swift.proxy.controllers.base import Controller, delay_denial, \
from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
HTTPPreconditionFailed, HTTPRequestEntityTooLarge, HTTPRequestTimeout, \
HTTPServerError, HTTPServiceUnavailable, Request, Response, \
HTTPClientDisconnect, HTTPNotImplemented
HTTPClientDisconnect, HTTPNotImplemented, HTTPException
class SegmentListing(object):
def __init__(self, listing):
self.listing = iter(listing)
self._prepended_segments = []
def prepend_segments(self, new_segs):
"""
Will prepend given segments to listing when iterating.
:raises SegmentError: when # segments > MAX_BUFFERED_SLO_SEGMENTS
"""
new_segs.extend(self._prepended_segments)
if len(new_segs) > MAX_BUFFERED_SLO_SEGMENTS:
raise SegmentError('Too many unread slo segments in buffer')
self._prepended_segments = new_segs
def listing_iter(self):
while True:
if self._prepended_segments:
seg_dict = self._prepended_segments.pop(0)
else:
seg_dict = self.listing.next()
if isinstance(seg_dict['name'], unicode):
seg_dict['name'] = seg_dict['name'].encode('utf-8')
yield seg_dict
def segment_listing_iter(listing):
listing = iter(listing)
while True:
seg_dict = listing.next()
if isinstance(seg_dict['name'], unicode):
seg_dict['name'] = seg_dict['name'].encode('utf-8')
yield seg_dict
def copy_headers_into(from_r, to_r):
@ -134,8 +118,7 @@ class SegmentedIterable(object):
is_slo=False, max_lo_time=86400):
self.controller = controller
self.container = container
self.segment_listing = SegmentListing(listing)
self.listing = self.segment_listing.listing_iter()
self.listing = segment_listing_iter(listing)
self.is_slo = is_slo
self.max_lo_time = max_lo_time
self.ratelimit_index = 0
@ -178,7 +161,8 @@ class SegmentedIterable(object):
path = '/%s/%s/%s' % (self.controller.account_name, container, obj)
req = Request.blank(path)
if self.seek or (self.length and self.length > 0):
bytes_available = self.segment_dict['bytes'] - self.seek
bytes_available = \
self.segment_dict['bytes'] - self.seek
range_tail = ''
if self.length:
if bytes_available >= self.length:
@ -206,27 +190,16 @@ class SegmentedIterable(object):
'Could not load object segment %(path)s:'
' %(status)s') % {'path': path, 'status': resp.status_int})
if self.is_slo:
if 'X-Static-Large-Object' in resp.headers:
# this segment is a nested slo object. read in the body
# and add its segments into this slo.
try:
sub_manifest = json.loads(resp.body)
self.segment_listing.prepend_segments(sub_manifest)
sub_etag = md5(''.join(
o['hash'] for o in sub_manifest)).hexdigest()
if sub_etag != self.segment_dict['hash']:
raise SegmentError(_(
'Object segment does not match sub-slo: '
'%(path)s etag: %(r_etag)s != %(s_etag)s.') %
{'path': path, 'r_etag': sub_etag,
's_etag': self.segment_dict['hash']})
return self._load_next_segment()
except ValueError:
raise SegmentError(_(
'Sub SLO has invalid manifest: %s') % path)
elif resp.etag != self.segment_dict['hash'] or \
resp.content_length != self.segment_dict['bytes']:
if (resp.etag != self.segment_dict['hash'] or
(resp.content_length != self.segment_dict['bytes'] and
not req.range)):
# The content-length check is for security reasons. Seems
# possible that an attacker could upload a >1mb object and
# then replace it with a much smaller object with same
# etag. Then create a big nested SLO that calls that
# object many times which would hammer our obj servers. If
# this is a range request, don't check content-length
# because it won't match.
raise SegmentError(_(
'Object segment no longer valid: '
'%(path)s etag: %(r_etag)s != %(s_etag)s or '
@ -288,6 +261,9 @@ class SegmentedIterable(object):
except StopIteration:
raise
except SegmentError:
# I have to save this error because yielding the ' ' below clears
# the exception from the current stack frame.
err = exc_info()
if not self.have_yielded_data:
# Normally, exceptions before any data has been yielded will
# cause Eventlet to send a 5xx response. In this particular
@ -297,7 +273,7 @@ class SegmentedIterable(object):
# Agreements and this SegmentError indicates the user has
# created an invalid condition.
yield ' '
raise
raise err
except (Exception, Timeout) as err:
if not getattr(err, 'swift_logged', False):
self.controller.app.logger.exception(_(
@ -376,6 +352,7 @@ class SegmentedIterable(object):
class ObjectController(Controller):
"""WSGI controller for object requests."""
server_type = 'Object'
max_slo_recusion_depth = 10
def __init__(self, app, account_name, container_name, object_name,
**kwargs):
@ -383,6 +360,7 @@ class ObjectController(Controller):
self.account_name = unquote(account_name)
self.container_name = unquote(container_name)
self.object_name = unquote(object_name)
self.slo_recursion_depth = 0
def _listing_iter(self, lcontainer, lprefix, env):
for page in self._listing_pages_iter(lcontainer, lprefix, env):
@ -422,6 +400,67 @@ class ObjectController(Controller):
marker = sublisting[-1]['name'].encode('utf-8')
yield sublisting
def _slo_listing_obj_iter(self, incoming_req, account, container, obj,
partition=None, initial_resp=None):
"""
The initial_resp indicated that this is a SLO manifest file. This will
create an iterable that will expand nested SLOs as it walks though the
listing.
:params incoming_req: The original GET request from client
:params initial_resp: the first resp from the above request
"""
if initial_resp and initial_resp.status_int == HTTP_OK and \
incoming_req.method == 'GET' and not incoming_req.range:
valid_resp = initial_resp
else:
new_req = incoming_req.copy_get()
new_req.method = 'GET'
new_req.range = None
if partition is None:
try:
partition = self.app.object_ring.get_part(
account, container, obj)
except ValueError:
raise HTTPException(
"Invalid path to whole SLO manifest: %s" %
new_req.path)
valid_resp = self.GETorHEAD_base(
new_req, _('Object'), self.app.object_ring, partition,
'/'.join(['', account, container, obj]))
if 'swift.authorize' in incoming_req.environ:
incoming_req.acl = valid_resp.headers.get('x-container-read')
auth_resp = incoming_req.environ['swift.authorize'](incoming_req)
if auth_resp:
raise ListingIterNotAuthorized(auth_resp)
if valid_resp.status_int == HTTP_NOT_FOUND:
raise ListingIterNotFound()
elif not is_success(valid_resp.status_int):
raise ListingIterError()
try:
listing = json.loads(valid_resp.body)
except ValueError:
listing = []
for seg_dict in listing:
if config_true_value(seg_dict.get('sub_slo')):
if incoming_req.method == 'HEAD':
override_bytes_from_content_type(seg_dict,
logger=self.app.logger)
yield seg_dict
continue
sub_path = get_valid_utf8_str(seg_dict['name'])
sub_cont, sub_obj = split_path(sub_path, 2, 2, True)
self.slo_recursion_depth += 1
if self.slo_recursion_depth >= self.max_slo_recusion_depth:
raise ListingIterError("Max recursion depth exceeded")
for sub_seg_dict in self._slo_listing_obj_iter(
incoming_req, account, sub_cont, sub_obj):
yield sub_seg_dict
self.slo_recursion_depth -= 1
else:
yield seg_dict
def _remaining_items(self, listing_iter):
"""
Returns an item-by-item iterator for a page-by-page iterator
@ -527,31 +566,29 @@ class ObjectController(Controller):
req.params.get('multipart-manifest') != 'get' and \
self.app.allow_static_large_object:
large_object = 'SLO'
listing_page1 = ()
listing = []
lcontainer = None # container name is included in listing
if resp.status_int == HTTP_OK and \
req.method == 'GET' and not req.range:
try:
listing = json.loads(resp.body)
except ValueError:
listing = []
else:
# need to make a second request to get whole manifest
new_req = req.copy_get()
new_req.method = 'GET'
new_req.range = None
new_resp = self.GETorHEAD_base(
new_req, _('Object'), self.app.object_ring, partition,
req.path_info)
if new_resp.status_int // 100 == 2:
try:
listing = json.loads(new_resp.body)
except ValueError:
listing = []
else:
return HTTPServiceUnavailable(
"Unable to load SLO manifest", request=req)
try:
seg_iter = iter(self._slo_listing_obj_iter(
req, self.account_name, self.container_name,
self.object_name, partition=partition, initial_resp=resp))
listing_page1 = []
for seg in seg_iter:
listing_page1.append(seg)
if len(listing_page1) >= CONTAINER_LISTING_LIMIT:
break
listing = itertools.chain(listing_page1,
self._remaining_items(seg_iter))
except ListingIterNotFound:
return HTTPNotFound(request=req)
except ListingIterNotAuthorized, err:
return err.aresp
except ListingIterError:
return HTTPServerError(request=req)
except StopIteration:
listing_page1 = listing = ()
except HTTPException:
return HTTPServiceUnavailable(
"Unable to load SLO manifest", request=req)
if 'x-object-manifest' in resp.headers and \
req.params.get('multipart-manifest') != 'get':
@ -602,8 +639,8 @@ class ObjectController(Controller):
else:
# For objects with a reasonable number of segments, we'll serve
# them with a set content-length and computed etag.
listing = list(listing)
if listing:
listing = list(listing)
try:
content_length = sum(o['bytes'] for o in listing)
last_modified = \

View File

@ -51,6 +51,7 @@ from swift.common.exceptions import (Timeout, MessageTimeout,
ConnectionTimeout, LockTimeout)
from swift.common import utils
from swift.common.swob import Response
from test.unit import FakeLogger
class MockOs():
@ -1492,6 +1493,25 @@ log_name = %(yarr)s'''
utils.parse_content_type(r'text/plain; x="\""; a'),
('text/plain', [('x', r'"\""'), ('a', '')]))
def test_override_bytes_from_content_type(self):
listing_dict = {
'bytes': 1234, 'hash': 'asdf', 'name': 'zxcv',
'content_type': 'text/plain; hello="world"; swift_bytes=15'}
utils.override_bytes_from_content_type(listing_dict,
logger=FakeLogger())
self.assertEquals(listing_dict['bytes'], 15)
self.assertEquals(listing_dict['content_type'],
'text/plain;hello="world"')
listing_dict = {
'bytes': 1234, 'hash': 'asdf', 'name': 'zxcv',
'content_type': 'text/plain; hello="world"; swift_bytes=hey'}
utils.override_bytes_from_content_type(listing_dict,
logger=FakeLogger())
self.assertEquals(listing_dict['bytes'], 1234)
self.assertEquals(listing_dict['content_type'],
'text/plain;hello="world"')
def test_quote(self):
res = utils.quote('/v1/a/c3/subdirx/')
assert res == '/v1/a/c3/subdirx/'

View File

@ -1046,7 +1046,7 @@ class TestObjectController(unittest.TestCase):
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
'', # GET manifest
simplejson.dumps([]), # GET manifest
simplejson.dumps([])) # GET empty listing
with save_globals():
@ -1393,7 +1393,7 @@ class TestObjectController(unittest.TestCase):
{"hash": "8681fb3ada2715c8754706ee5f23d4f8",
"last_modified": "2012-11-08T04:05:37.846710",
"bytes": 4,
"name": "/d2/sub_manifest",
"name": u"/d2/sub_manifest \u2661", "sub_slo": True,
"content_type": "application/octet-stream"},
{"hash": "419af6d362a14b7a789ba1c7e772bbae",
"last_modified": "2012-11-08T04:05:37.866820",
@ -1416,8 +1416,8 @@ class TestObjectController(unittest.TestCase):
'', # HEAD /a
'', # HEAD /a/c
simplejson.dumps(listing), # GET manifest
'Aa', # GET seg01
simplejson.dumps(sub_listing), # GET sub_manifest
'Aa', # GET seg01
'Bb', # GET seg02
'Cc', # GET seg03
'Dd') # GET seg04
@ -1439,12 +1439,12 @@ class TestObjectController(unittest.TestCase):
200, # HEAD /a
200, # HEAD /a/c
200, # GET listing1
200, # GET seg01
200, # GET sub listing1
200, # GET seg01
200, # GET seg02
200, # GET seg03
200, # GET seg04
headers=[{}, {}, slob_headers, {}, slob_headers, {}, {}, {}],
headers=[{}, {}, slob_headers, slob_headers, {}, {}, {}, {}],
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/a/c/manifest')
@ -1457,7 +1457,8 @@ class TestObjectController(unittest.TestCase):
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}]])
['GET', '/a/c/manifest', {}],
['GET', '/a/d2/sub_manifest \xe2\x99\xa1', {}]])
# iterating over body will retrieve manifest and sub manifest's
# objects
self.assertEqual(resp.body, 'AaBbCcDd')
@ -1466,12 +1467,116 @@ class TestObjectController(unittest.TestCase):
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}],
['GET', '/a/d2/sub_manifest \xe2\x99\xa1', {}],
['GET', '/a/d1/seg01', {}],
['GET', '/a/d2/sub_manifest', {}],
['GET', '/a/d1/seg02', {}],
['GET', '/a/d2/seg03', {}],
['GET', '/a/d1/seg04', {}]])
def test_GET_nested_manifest_slo_with_range(self):
"""
Original whole slo is Aa1234Bb where 1234 is a sub-manifests. I'm
pulling out 34Bb
"""
listing = [{"hash": "98568d540134639be4655198a36614a4", # Aa
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg01",
"content_type": "application/octet-stream"},
{"hash": "7b4b0ffa275d404bdc2fc6384916714f", # SubManifest1
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 4, "sub_slo": True,
"name": "/d2/subManifest01",
"content_type": "application/octet-stream"},
{"hash": "d526f1c8ef6c1e4e980e2b8471352d23", # Bb
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d1/seg02",
"content_type": "application/octet-stream"}]
sublisting = [{"hash": "c20ad4d76fe97759aa27a0c99bff6710", # 12
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d2/subSeg01",
"content_type": "application/octet-stream"},
{"hash": "e369853df766fa44e1ed0ff613f563bd", # 34
"last_modified": "2012-11-08T04:05:37.866820",
"bytes": 2,
"name": "/d2/subSeg02",
"content_type": "application/octet-stream"}]
response_bodies = (
'', # HEAD /a
'', # HEAD /a/c
simplejson.dumps(listing)[1:1], # GET incomplete manifest
simplejson.dumps(listing), # GET complete manifest
simplejson.dumps(sublisting), # GET complete submanifest
'34', # GET subseg02
'Bb') # GET seg02
etag_iter = ['', '', '', '', '',
'e369853df766fa44e1ed0ff613f563bd', # subSeg02
'd526f1c8ef6c1e4e980e2b8471352d23'] # seg02
headers = [{}, {},
{'X-Static-Large-Object': 'True',
'content-type': 'text/html; swift_bytes=4'},
{'X-Static-Large-Object': 'True',
'content-type': 'text/html; swift_bytes=4'},
{'X-Static-Large-Object': 'True',
'content-type': 'text/html; swift_bytes=4'},
{}, {}]
self.assertTrue(len(response_bodies) == len(etag_iter) == len(headers))
with save_globals():
controller = proxy_server.ObjectController(
self.app, 'a', 'c', 'manifest')
requested = []
def capture_requested_paths(ipaddr, port, device, partition,
method, path, headers=None,
query_string=None):
qs_dict = dict(urlparse.parse_qsl(query_string or ''))
requested.append([method, path, qs_dict])
set_http_connect(
200, # HEAD /a
200, # HEAD /a/c
206, # GET incomplete listing
200, # GET complete listing
200, # GET complete sublisting
200, # GET subSeg02
200, # GET seg02
headers=headers,
etags=etag_iter,
body_iter=response_bodies,
give_connect=capture_requested_paths)
req = Request.blank('/a/c/manifest')
req.range = 'bytes=4-7'
resp = controller.GET(req)
got_called = [False, ]
def fake_start_response(*args, **kwargs):
got_called[0] = True
self.assertTrue(args[0].startswith('206'))
app_iter = resp(req.environ, fake_start_response)
resp_body = ''.join(app_iter) # read in entire resp
self.assertEqual(resp.status_int, 206)
self.assertEqual(resp_body, '34Bb')
self.assertTrue(got_called[0])
self.assertEqual(resp.content_length, 4)
self.assertEqual(resp.content_type, 'text/html')
self.assertEqual(
requested,
[['HEAD', '/a', {}],
['HEAD', '/a/c', {}],
['GET', '/a/c/manifest', {}], # for incomplete manifest
['GET', '/a/c/manifest', {}],
['GET', '/a/d2/subManifest01', {}],
['GET', '/a/d2/subSeg02', {}],
['GET', '/a/d1/seg02', {}]])
def test_GET_bad_404_manifest_slo(self):
listing = [{"hash": "98568d540134639be4655198a36614a4",
"last_modified": "2012-11-08T04:05:37.866820",