SLO: Concurrently HEAD segments

Before creating a static large object, we must verify that all of the
referenced segments exist. Previously, this was done sequentially; due
to latency between proxy and object nodes, clients must be careful to
either keep their segment count low or use very long (minute+) timeouts.
We mitigate this somewhat by enforcing a hard limit on segment count,
but even then, HEADing a thousand segments (the default limit) with an
average latency of (say) 100ms will require more than a minute and a
half.

Further, the nested-SLO approach requires multiple requests from the
client -- as a result, Swift3 is in the position of enforcing a lower
limit than S3's 10,000 (which will break some clients) or requiring that
clients have timeouts on the order of 15-20 minutes (!).

Now, we'll perform the segment HEADs in parallel, with a concurrency
factor set by the operator. This is very similar to (and builds upon)
the parallel-bulk-delete work. By default, two HEAD requests will be
allowed at a time.

As a side-effect, we'll also only ever HEAD a path once per manifest.
Previously, if a manifest alternated between two paths repeatedly (for
instance, because the user wanted to splice together various ranges from
two sub-SLOs), then each entry in the manifest would trigger a fresh
HEAD request.

Upgrade Consideration
=====================
If operators would like to preserve the prior (single-threaded) SLO
creation behavior, they must add the following line to their
[filter:slo] proxy config section:

   concurrency = 1

This may be done prior to upgrading Swift.

UpgradeImpact
Closes-Bug: #1637133
Related-Change: I128374d74a4cef7a479b221fd15eec785cc4694a
Change-Id: I567949567ecdbd94fa06d1dd5d3cdab0d97207b6
This commit is contained in:
Tim Burke 2016-10-29 22:00:01 +02:00
parent e55f3ad203
commit f850ff065e
3 changed files with 124 additions and 99 deletions

View File

@ -698,10 +698,17 @@ use = egg:swift#slo
# Time limit on GET requests (seconds) # Time limit on GET requests (seconds)
# max_get_time = 86400 # max_get_time = 86400
# #
# When deleting with ?multipart-manifest=delete, multiple deletes may be # When creating an SLO, multiple segment validations may be executed in
# executed in parallel. Avoid setting this too high, as it gives clients a # parallel. Further, multiple deletes may be executed in parallel when deleting
# force multiplier which may be used in DoS attacks. The suggested range is # with ?multipart-manifest=delete. Use this setting to limit how many
# between 2 and 10. # subrequests may be executed concurrently. Avoid setting it too high, as it
# gives clients a force multiplier which may be used in DoS attacks. The
# suggested range is between 2 and 10.
# concurrency = 2
#
# This may be used to separately tune validation and delete concurrency values.
# Default is to use the concurrency value from above; all of the same caveats
# apply regarding recommended ranges.
# delete_concurrency = 2 # delete_concurrency = 2
# Note: Put after auth and staticweb in the pipeline. # Note: Put after auth and staticweb in the pipeline.

View File

@ -202,6 +202,7 @@ metadata which can be used for stats purposes.
from six.moves import range from six.moves import range
from collections import defaultdict
from datetime import datetime from datetime import datetime
import json import json
import mimetypes import mimetypes
@ -217,7 +218,7 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
from swift.common.utils import get_logger, config_true_value, \ from swift.common.utils import get_logger, config_true_value, \
get_valid_utf8_str, override_bytes_from_content_type, split_path, \ get_valid_utf8_str, override_bytes_from_content_type, split_path, \
register_swift_info, RateLimitedIterator, quote, close_if_possible, \ register_swift_info, RateLimitedIterator, quote, close_if_possible, \
closing_if_possible, LRUCache closing_if_possible, LRUCache, StreamingPile
from swift.common.request_helpers import SegmentedIterable from swift.common.request_helpers import SegmentedIterable
from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success
@ -798,7 +799,10 @@ class StaticLargeObject(object):
'rate_limit_after_segment', '10')) 'rate_limit_after_segment', '10'))
self.rate_limit_segments_per_sec = int(self.conf.get( self.rate_limit_segments_per_sec = int(self.conf.get(
'rate_limit_segments_per_sec', '1')) 'rate_limit_segments_per_sec', '1'))
delete_concurrency = int(self.conf.get('delete_concurrency', '2')) self.concurrency = min(1000, max(0, int(self.conf.get(
'concurrency', '2'))))
delete_concurrency = int(self.conf.get(
'delete_concurrency', self.concurrency))
self.bulk_deleter = Bulk( self.bulk_deleter = Bulk(
app, {}, delete_concurrency=delete_concurrency, logger=self.logger) app, {}, delete_concurrency=delete_concurrency, logger=self.logger)
@ -851,93 +855,103 @@ class StaticLargeObject(object):
if not out_content_type: if not out_content_type:
out_content_type = 'text/plain' out_content_type = 'text/plain'
data_for_storage = [] data_for_storage = []
slo_etag = md5() path2indices = defaultdict(list)
last_obj_path = None
for index, seg_dict in enumerate(parsed_data): for index, seg_dict in enumerate(parsed_data):
obj_name = seg_dict['path'] path2indices[seg_dict['path']].append(index)
if isinstance(obj_name, six.text_type):
obj_name = obj_name.encode('utf-8')
obj_path = '/'.join(['', vrs, account, obj_name.lstrip('/')])
if obj_path != last_obj_path: def do_head(obj_name):
last_obj_path = obj_path obj_path = '/'.join(['', vrs, account,
sub_req = make_subrequest( get_valid_utf8_str(obj_name).lstrip('/')])
req.environ, path=obj_path + '?', # kill the query string
method='HEAD',
headers={'x-auth-token': req.headers.get('x-auth-token')},
agent='%(orig)s SLO MultipartPUT', swift_source='SLO')
head_seg_resp = sub_req.get_response(self)
if head_seg_resp.is_success: sub_req = make_subrequest(
segment_length = head_seg_resp.content_length req.environ, path=obj_path + '?', # kill the query string
if seg_dict.get('range'): method='HEAD',
# Since we now know the length, we can normalize the headers={'x-auth-token': req.headers.get('x-auth-token')},
# range. We know that there is exactly one range agent='%(orig)s SLO MultipartPUT', swift_source='SLO')
# requested since we checked that earlier in return obj_name, sub_req.get_response(self)
# parse_and_validate_input().
ranges = seg_dict['range'].ranges_for_length(
head_seg_resp.content_length)
if not ranges: def validate_seg_dict(seg_dict, head_seg_resp):
problem_segments.append([quote(obj_name), if not head_seg_resp.is_success:
'Unsatisfiable Range'])
elif ranges == [(0, head_seg_resp.content_length)]:
# Just one range, and it exactly matches the object.
# Why'd we do this again?
del seg_dict['range']
segment_length = head_seg_resp.content_length
else:
rng = ranges[0]
seg_dict['range'] = '%d-%d' % (rng[0], rng[1] - 1)
segment_length = rng[1] - rng[0]
if segment_length < 1:
problem_segments.append(
[quote(obj_name),
'Too small; each segment must be at least 1 byte.'])
total_size += segment_length
if seg_dict['size_bytes'] is not None and \
seg_dict['size_bytes'] != head_seg_resp.content_length:
problem_segments.append([quote(obj_name), 'Size Mismatch'])
if seg_dict['etag'] is None or \
seg_dict['etag'] == head_seg_resp.etag:
if seg_dict.get('range'):
slo_etag.update('%s:%s;' % (head_seg_resp.etag,
seg_dict['range']))
else:
slo_etag.update(head_seg_resp.etag)
else:
problem_segments.append([quote(obj_name), 'Etag Mismatch'])
if head_seg_resp.last_modified:
last_modified = head_seg_resp.last_modified
else:
# shouldn't happen
last_modified = datetime.now()
last_modified_formatted = \
last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f')
seg_data = {'name': '/' + seg_dict['path'].lstrip('/'),
'bytes': head_seg_resp.content_length,
'hash': head_seg_resp.etag,
'content_type': head_seg_resp.content_type,
'last_modified': last_modified_formatted}
if seg_dict.get('range'):
seg_data['range'] = seg_dict['range']
if config_true_value(
head_seg_resp.headers.get('X-Static-Large-Object')):
seg_data['sub_slo'] = True
data_for_storage.append(seg_data)
else:
problem_segments.append([quote(obj_name), problem_segments.append([quote(obj_name),
head_seg_resp.status]) head_seg_resp.status])
return 0, None
segment_length = head_seg_resp.content_length
if seg_dict.get('range'):
# Since we now know the length, we can normalize the
# range. We know that there is exactly one range
# requested since we checked that earlier in
# parse_and_validate_input().
ranges = seg_dict['range'].ranges_for_length(
head_seg_resp.content_length)
if not ranges:
problem_segments.append([quote(obj_name),
'Unsatisfiable Range'])
elif ranges == [(0, head_seg_resp.content_length)]:
# Just one range, and it exactly matches the object.
# Why'd we do this again?
del seg_dict['range']
segment_length = head_seg_resp.content_length
else:
rng = ranges[0]
seg_dict['range'] = '%d-%d' % (rng[0], rng[1] - 1)
segment_length = rng[1] - rng[0]
if segment_length < 1:
problem_segments.append(
[quote(obj_name),
'Too small; each segment must be at least 1 byte.'])
if seg_dict['size_bytes'] is not None and \
seg_dict['size_bytes'] != head_seg_resp.content_length:
problem_segments.append([quote(obj_name), 'Size Mismatch'])
if seg_dict['etag'] is not None and \
seg_dict['etag'] != head_seg_resp.etag:
problem_segments.append([quote(obj_name), 'Etag Mismatch'])
if head_seg_resp.last_modified:
last_modified = head_seg_resp.last_modified
else:
# shouldn't happen
last_modified = datetime.now()
last_modified_formatted = \
last_modified.strftime('%Y-%m-%dT%H:%M:%S.%f')
seg_data = {'name': '/' + seg_dict['path'].lstrip('/'),
'bytes': head_seg_resp.content_length,
'hash': head_seg_resp.etag,
'content_type': head_seg_resp.content_type,
'last_modified': last_modified_formatted}
if seg_dict.get('range'):
seg_data['range'] = seg_dict['range']
if config_true_value(
head_seg_resp.headers.get('X-Static-Large-Object')):
seg_data['sub_slo'] = True
return segment_length, seg_data
data_for_storage = [None] * len(parsed_data)
with StreamingPile(self.concurrency) as pile:
for obj_name, resp in pile.asyncstarmap(do_head, (
(path, ) for path in path2indices)):
for i in path2indices[obj_name]:
segment_length, seg_data = validate_seg_dict(
parsed_data[i], resp)
data_for_storage[i] = seg_data
total_size += segment_length
if problem_segments: if problem_segments:
resp_body = get_response_body( resp_body = get_response_body(
out_content_type, {}, problem_segments) out_content_type, {}, problem_segments)
raise HTTPBadRequest(resp_body, content_type=out_content_type) raise HTTPBadRequest(resp_body, content_type=out_content_type)
env = req.environ env = req.environ
slo_etag = md5()
for seg_data in data_for_storage:
if seg_data.get('range'):
slo_etag.update('%s:%s;' % (seg_data['hash'],
seg_data['range']))
else:
slo_etag.update(seg_data['hash'])
if not env.get('CONTENT_TYPE'): if not env.get('CONTENT_TYPE'):
guessed_type, _junk = mimetypes.guess_type(req.path_info) guessed_type, _junk = mimetypes.guess_type(req.path_info)
env['CONTENT_TYPE'] = guessed_type or 'application/octet-stream' env['CONTENT_TYPE'] = guessed_type or 'application/octet-stream'

View File

@ -418,7 +418,7 @@ class TestSloPutManifest(SloTestCase):
def my_fake_start_response(*args, **kwargs): def my_fake_start_response(*args, **kwargs):
gen_etag = '"' + md5hex('etagoftheobjectsegment') + '"' gen_etag = '"' + md5hex('etagoftheobjectsegment') + '"'
self.assertTrue(('Etag', gen_etag) in args[1]) self.assertIn(('Etag', gen_etag), args[1])
self.slo(req.environ, my_fake_start_response) self.slo(req.environ, my_fake_start_response)
self.assertIn('X-Static-Large-Object', req.headers) self.assertIn('X-Static-Large-Object', req.headers)
@ -552,17 +552,13 @@ class TestSloPutManifest(SloTestCase):
self.assertEqual(self.app.call_count, 5) self.assertEqual(self.app.call_count, 5)
errors = json.loads(body)['Errors'] errors = json.loads(body)['Errors']
self.assertEqual(len(errors), 5) self.assertEqual([
self.assertEqual(errors[0][0], '/checktest/a_1') [u'/checktest/a_1', u'Size Mismatch'],
self.assertEqual(errors[0][1], 'Size Mismatch') [u'/checktest/b_2', u'Etag Mismatch'],
self.assertEqual(errors[1][0], '/checktest/badreq') [u'/checktest/badreq', u'400 Bad Request'],
self.assertEqual(errors[1][1], '400 Bad Request') [u'/checktest/slob', u'Etag Mismatch'],
self.assertEqual(errors[2][0], '/checktest/b_2') [u'/checktest/slob', u'Size Mismatch'],
self.assertEqual(errors[2][1], 'Etag Mismatch') ], sorted(errors))
self.assertEqual(errors[3][0], '/checktest/slob')
self.assertEqual(errors[3][1], 'Size Mismatch')
self.assertEqual(errors[4][0], '/checktest/slob')
self.assertEqual(errors[4][1], 'Etag Mismatch')
def test_handle_multipart_put_skip_size_check(self): def test_handle_multipart_put_skip_size_check(self):
good_data = json.dumps( good_data = json.dumps(
@ -675,21 +671,25 @@ class TestSloPutManifest(SloTestCase):
'size_bytes': 2, 'range': '-1'}, 'size_bytes': 2, 'range': '-1'},
{'path': '/checktest/b_2', 'etag': None, {'path': '/checktest/b_2', 'etag': None,
'size_bytes': 2, 'range': '0-0'}, 'size_bytes': 2, 'range': '0-0'},
{'path': '/checktest/a_1', 'etag': None,
'size_bytes': None},
{'path': '/cont/object', 'etag': None, {'path': '/cont/object', 'etag': None,
'size_bytes': None, 'range': '10-40'}]) 'size_bytes': None, 'range': '10-40'}])
req = Request.blank( req = Request.blank(
'/v1/AUTH_test/checktest/man_3?multipart-manifest=put', '/v1/AUTH_test/checktest/man_3?multipart-manifest=put',
environ={'REQUEST_METHOD': 'PUT'}, body=good_data) environ={'REQUEST_METHOD': 'PUT'}, body=good_data)
status, headers, body = self.call_slo(req) status, headers, body = self.call_slo(req)
expected_etag = '"%s"' % md5hex('ab:1-1;b:0-0;etagoftheobjectsegment:' expected_etag = '"%s"' % md5hex('ab:1-1;b:0-0;aetagoftheobjectsegment:'
'10-40;') '10-40;')
self.assertEqual(expected_etag, dict(headers)['Etag']) self.assertEqual(expected_etag, dict(headers)['Etag'])
self.assertEqual([ self.assertEqual([
('HEAD', '/v1/AUTH_test/checktest/a_1'), ('HEAD', '/v1/AUTH_test/checktest/a_1'), # Only once!
('HEAD', '/v1/AUTH_test/checktest/b_2'), # Only once! ('HEAD', '/v1/AUTH_test/checktest/b_2'), # Only once!
('HEAD', '/v1/AUTH_test/cont/object'), ('HEAD', '/v1/AUTH_test/cont/object'),
], sorted(self.app.calls[:-1]))
self.assertEqual(
('PUT', '/v1/AUTH_test/checktest/man_3?multipart-manifest=put'), ('PUT', '/v1/AUTH_test/checktest/man_3?multipart-manifest=put'),
], self.app.calls) self.app.calls[-1])
# Check that we still populated the manifest properly from our HEADs # Check that we still populated the manifest properly from our HEADs
req = Request.blank( req = Request.blank(
@ -699,9 +699,10 @@ class TestSloPutManifest(SloTestCase):
environ={'REQUEST_METHOD': 'GET'}) environ={'REQUEST_METHOD': 'GET'})
status, headers, body = self.call_app(req) status, headers, body = self.call_app(req)
manifest_data = json.loads(body) manifest_data = json.loads(body)
self.assertEqual(len(manifest_data), 5)
self.assertEqual('a', manifest_data[0]['hash']) self.assertEqual('a', manifest_data[0]['hash'])
self.assertNotIn('range', manifest_data[0]) self.assertNotIn('range', manifest_data[0])
self.assertNotIn('segment_bytes', manifest_data[0])
self.assertEqual('b', manifest_data[1]['hash']) self.assertEqual('b', manifest_data[1]['hash'])
self.assertEqual('1-1', manifest_data[1]['range']) self.assertEqual('1-1', manifest_data[1]['range'])
@ -709,8 +710,11 @@ class TestSloPutManifest(SloTestCase):
self.assertEqual('b', manifest_data[2]['hash']) self.assertEqual('b', manifest_data[2]['hash'])
self.assertEqual('0-0', manifest_data[2]['range']) self.assertEqual('0-0', manifest_data[2]['range'])
self.assertEqual('etagoftheobjectsegment', manifest_data[3]['hash']) self.assertEqual('a', manifest_data[3]['hash'])
self.assertEqual('10-40', manifest_data[3]['range']) self.assertNotIn('range', manifest_data[3])
self.assertEqual('etagoftheobjectsegment', manifest_data[4]['hash'])
self.assertEqual('10-40', manifest_data[4]['range'])
class TestSloDeleteManifest(SloTestCase): class TestSloDeleteManifest(SloTestCase):