Merge "s3api: Stop relying on container listings during multipart uploads"

This commit is contained in:
Zuul 2019-01-10 04:35:15 +00:00 committed by Gerrit Code Review
commit d1e9a8e0cb
3 changed files with 133 additions and 116 deletions

View File

@ -62,6 +62,7 @@ Static Large Object when the multipart upload is completed.
from hashlib import md5
import os
import re
import time
from swift.common.swob import Range
from swift.common.utils import json, public, reiterate
@ -562,21 +563,7 @@ class UploadController(Controller):
if content_type:
headers['Content-Type'] = content_type
# Query for the objects in the segments area to make sure it completed
query = {
'format': 'json',
'prefix': '%s/%s/' % (req.object_name, upload_id),
'delimiter': '/'
}
container = req.container_name + MULTIUPLOAD_SUFFIX
resp = req.get_response(self.app, 'GET', container, '', query=query)
objinfo = json.loads(resp.body)
objtable = dict((o['name'],
{'path': '/'.join(['', container, o['name']]),
'etag': o['hash'],
'size_bytes': o['bytes']}) for o in objinfo)
s3_etag_hasher = md5()
manifest = []
previous_number = 0
@ -598,16 +585,16 @@ class UploadController(Controller):
if len(etag) >= 2 and etag[0] == '"' and etag[-1] == '"':
# strip double quotes
etag = etag[1:-1]
info = objtable.get("%s/%s/%s" % (req.object_name, upload_id,
part_number))
if info is None or info['etag'] != etag:
if len(etag) != 32 or any(c not in '0123456789abcdef'
for c in etag):
raise InvalidPart(upload_id=upload_id,
part_number=part_number)
manifest.append({
'path': '/%s/%s/%s/%d' % (
container, req.object_name, upload_id, part_number),
'etag': etag})
s3_etag_hasher.update(etag.decode('hex'))
info['size_bytes'] = int(info['size_bytes'])
manifest.append(info)
except (XMLSyntaxError, DocumentInvalid):
# NB: our schema definitions catch uploads with no parts here
raise MalformedXML()
@ -623,11 +610,21 @@ class UploadController(Controller):
c_etag = '; s3_etag=%s' % s3_etag
headers['X-Object-Sysmeta-Container-Update-Override-Etag'] = c_etag
# Check the size of each segment except the last and make sure they are
# all more than the minimum upload chunk size
for info in manifest[:-1]:
if info['size_bytes'] < self.conf.min_segment_size:
raise EntityTooSmall()
too_small_message = ('s3api requires that each segment be at least '
'%d bytes' % self.conf.min_segment_size)
def size_checker(manifest):
# Check the size of each segment except the last and make sure
# they are all more than the minimum upload chunk size.
# Note that we need to use the *internal* keys, since we're
# looking at the manifest that's about to be written.
return [
(item['name'], too_small_message)
for item in manifest[:-1]
if item and item['bytes'] < self.conf.min_segment_size]
req.environ['swift.callback.slo_manifest_hook'] = size_checker
start_time = time.time()
def response_iter():
# NB: XML requires that the XML declaration, if present, be at the
@ -650,27 +647,36 @@ class UploadController(Controller):
put_resp.fix_conditional_response()
for chunk in put_resp.response_iter:
if not chunk.strip():
if time.time() - start_time < 10:
# Include some grace period to keep
# ceph-s3tests happy
continue
if not yielded_anything:
yield ('<?xml version="1.0" '
'encoding="UTF-8"?>\n')
yielded_anything = True
yield chunk
continue
body.append(chunk)
body = json.loads(''.join(body))
body = json.loads(b''.join(body))
if body['Response Status'] != '201 Created':
for seg, err in body['Errors']:
if err == too_small_message:
raise EntityTooSmall()
elif err in ('Etag Mismatch', '404 Not Found'):
raise InvalidPart(upload_id=upload_id)
raise InvalidRequest(
status=body['Response Status'],
msg='\n'.join(': '.join(err)
for err in body['Errors']))
except BadSwiftRequest as e:
msg = str(e)
expected_msg = ('too small; each segment must be '
'at least 1 byte')
if expected_msg in msg:
# FIXME: AWS S3 allows a smaller object than 5 MB if
# there is only one part. Use a COPY request to copy
# the part object from the segments container instead.
if too_small_message in msg:
raise EntityTooSmall(msg)
elif ', Etag Mismatch' in msg:
raise InvalidPart(upload_id=upload_id)
elif ', 404 Not Found' in msg:
raise InvalidPart(upload_id=upload_id)
else:
raise

View File

@ -52,11 +52,11 @@ class TestS3ApiMultiUpload(S3ApiBase):
self.min_segment_size = int(tf.cluster_info['s3api'].get(
'min_segment_size', 5242880))
def _gen_comp_xml(self, etags):
def _gen_comp_xml(self, etags, step=1):
elem = Element('CompleteMultipartUpload')
for i, etag in enumerate(etags):
elem_part = SubElement(elem, 'Part')
SubElement(elem_part, 'PartNumber').text = str(i + 1)
SubElement(elem_part, 'PartNumber').text = str(i * step + 1)
SubElement(elem_part, 'ETag').text = etag
return tostring(elem)
@ -731,13 +731,13 @@ class TestS3ApiMultiUpload(S3ApiBase):
etags = []
for i in range(1, 4):
query = 'partNumber=%s&uploadId=%s' % (i, upload_id)
query = 'partNumber=%s&uploadId=%s' % (2 * i - 1, upload_id)
status, headers, body = \
self.conn.make_request('PUT', bucket, key,
body='A' * 1024 * 1024 * 5, query=query)
etags.append(headers['etag'])
query = 'uploadId=%s' % upload_id
xml = self._gen_comp_xml(etags[:-1])
xml = self._gen_comp_xml(etags[:-1], step=2)
status, headers, body = \
self.conn.make_request('POST', bucket, key, body=xml,
query=query)

View File

@ -38,11 +38,11 @@ from swift.common.middleware.s3api.s3request import MAX_32BIT_INT
xml = '<CompleteMultipartUpload>' \
'<Part>' \
'<PartNumber>1</PartNumber>' \
'<ETag>0123456789abcdef</ETag>' \
'<ETag>0123456789abcdef0123456789abcdef</ETag>' \
'</Part>' \
'<Part>' \
'<PartNumber>2</PartNumber>' \
'<ETag>"fedcba9876543210"</ETag>' \
'<ETag>"fedcba9876543210fedcba9876543210"</ETag>' \
'</Part>' \
'</CompleteMultipartUpload>'
@ -66,8 +66,9 @@ multiparts_template = \
('subdir/object/Z/2', '2014-05-07T19:47:58.592270', 'fedcba9876543210',
41))
s3_etag = '"%s-2"' % hashlib.md5(
'0123456789abcdeffedcba9876543210'.decode('hex')).hexdigest()
s3_etag = '"%s-2"' % hashlib.md5((
'0123456789abcdef0123456789abcdef'
'fedcba9876543210fedcba9876543210').decode('hex')).hexdigest()
class TestS3ApiMultiUpload(S3ApiTestCase):
@ -682,9 +683,6 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
('HEAD', '/v1/AUTH_test/bucket'),
# Segment container exists
('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
# Get the currently-uploaded segments
('GET', '/v1/AUTH_test/bucket+segments?delimiter=/'
'&format=json&prefix=object/X/'),
# Create the SLO
('PUT', '/v1/AUTH_test/bucket/object'
'?heartbeat=on&multipart-manifest=put'),
@ -700,7 +698,8 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
h = 'X-Object-Sysmeta-Container-Update-Override-Etag'
self.assertEqual(headers.get(h), override_etag)
def test_object_multipart_upload_complete_with_heartbeat(self):
@patch('swift.common.middleware.s3api.controllers.multi_upload.time')
def test_object_multipart_upload_complete_with_heartbeat(self, mock_time):
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-ok/X',
swob.HTTPOk, {}, None)
@ -718,6 +717,13 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
'Response Status': '201 Created',
'Errors': [],
})])
mock_time.return_value.time.side_effect = (
1, # start_time
12, # first whitespace
13, # second...
14, # third...
15, # JSON body
)
self.swift.register(
'DELETE', '/v1/AUTH_test/bucket+segments/heartbeat-ok/X',
swob.HTTPNoContent, {}, None)
@ -739,14 +745,63 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
self.assertEqual(self.swift.calls, [
('HEAD', '/v1/AUTH_test/bucket'),
('HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-ok/X'),
('GET', '/v1/AUTH_test/bucket+segments?'
'delimiter=/&format=json&prefix=heartbeat-ok/X/'),
('PUT', '/v1/AUTH_test/bucket/heartbeat-ok?'
'heartbeat=on&multipart-manifest=put'),
('DELETE', '/v1/AUTH_test/bucket+segments/heartbeat-ok/X'),
])
def test_object_multipart_upload_complete_failure_with_heartbeat(self):
@patch('swift.common.middleware.s3api.controllers.multi_upload.time')
def test_object_multipart_upload_complete_failure_with_heartbeat(
self, mock_time):
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-fail/X',
swob.HTTPOk, {}, None)
self.swift.register(
'GET', '/v1/AUTH_test/bucket+segments', swob.HTTPOk, {},
json.dumps([
{'name': item[0].replace('object', 'heartbeat-fail'),
'last_modified': item[1], 'hash': item[2], 'bytes': item[3]}
for item in objects_template
]))
self.swift.register(
'PUT', '/v1/AUTH_test/bucket/heartbeat-fail',
swob.HTTPAccepted, {}, [' ', ' ', ' ', json.dumps({
'Response Status': '400 Bad Request',
'Errors': [['some/object', '403 Forbidden']],
})])
mock_time.return_value.time.side_effect = (
1, # start_time
12, # first whitespace
13, # second...
14, # third...
15, # JSON body
)
req = Request.blank('/bucket/heartbeat-fail?uploadId=X',
environ={'REQUEST_METHOD': 'POST'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(), },
body=xml)
status, headers, body = self.call_s3api(req)
lines = body.split('\n')
self.assertTrue(lines[0].startswith('<?xml '), (status, lines))
self.assertTrue(lines[1])
self.assertFalse(lines[1].strip())
fromstring(body, 'Error')
self.assertEqual(status.split()[0], '200')
self.assertEqual(self._get_error_code(body), 'InvalidRequest')
self.assertEqual(self._get_error_message(body),
'some/object: 403 Forbidden')
self.assertEqual(self.swift.calls, [
('HEAD', '/v1/AUTH_test/bucket'),
('HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-fail/X'),
('PUT', '/v1/AUTH_test/bucket/heartbeat-fail?'
'heartbeat=on&multipart-manifest=put'),
])
@patch('swift.common.middleware.s3api.controllers.multi_upload.time')
def test_object_multipart_upload_missing_part_with_heartbeat(
self, mock_time):
self.swift.register(
'HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-fail/X',
swob.HTTPOk, {}, None)
@ -763,6 +818,13 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
'Response Status': '400 Bad Request',
'Errors': [['some/object', '404 Not Found']],
})])
mock_time.return_value.time.side_effect = (
1, # start_time
12, # first whitespace
13, # second...
14, # third...
15, # JSON body
)
req = Request.blank('/bucket/heartbeat-fail?uploadId=X',
environ={'REQUEST_METHOD': 'POST'},
@ -776,14 +838,12 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
self.assertFalse(lines[1].strip())
fromstring(body, 'Error')
self.assertEqual(status.split()[0], '200')
self.assertEqual(self._get_error_code(body), 'InvalidRequest')
self.assertEqual(self._get_error_message(body),
'some/object: 404 Not Found')
self.assertEqual(self._get_error_code(body), 'InvalidPart')
self.assertIn('One or more of the specified parts could not be found',
self._get_error_message(body))
self.assertEqual(self.swift.calls, [
('HEAD', '/v1/AUTH_test/bucket'),
('HEAD', '/v1/AUTH_test/bucket+segments/heartbeat-fail/X'),
('GET', '/v1/AUTH_test/bucket+segments?'
'delimiter=/&format=json&prefix=heartbeat-fail/X/'),
('PUT', '/v1/AUTH_test/bucket/heartbeat-fail?'
'heartbeat=on&multipart-manifest=put'),
])
@ -856,8 +916,10 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
self.assertEqual(headers.get('X-Object-Meta-Foo'), 'bar')
def test_object_multipart_upload_complete_segment_too_small(self):
msg = 'Index 0: too small; each segment must be at least 1 byte.'
msg = ('some/path: s3api requires that each segment be at least '
'%d bytes') % self.s3api.conf.min_segment_size
self.swift.register('PUT', '/v1/AUTH_test/bucket/object',
swob.HTTPBadRequest, {}, msg)
req = Request.blank(
'/bucket/object?uploadId=X',
environ={'REQUEST_METHOD': 'POST'},
@ -865,15 +927,20 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
'Date': self.get_date_header(), },
body=xml)
self.swift.register('PUT', '/v1/AUTH_test/bucket/object',
swob.HTTPBadRequest, {}, msg)
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '400')
self.assertEqual(self._get_error_code(body), 'EntityTooSmall')
self.assertEqual(self._get_error_message(body), msg)
# We punt to SLO to do the validation
self.assertEqual([method for method, _ in self.swift.calls],
['HEAD', 'HEAD', 'PUT'])
self.swift.clear_calls()
self.s3api.conf.min_segment_size = 5242880
msg = ('some/path: s3api requires that each segment be at least '
'%d bytes') % self.s3api.conf.min_segment_size
self.swift.register('PUT', '/v1/AUTH_test/bucket/object',
swob.HTTPBadRequest, {}, msg)
req = Request.blank(
'/bucket/object?uploadId=X',
environ={'REQUEST_METHOD': 'POST'},
@ -884,10 +951,10 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
status, headers, body = self.call_s3api(req)
self.assertEqual(status.split()[0], '400')
self.assertEqual(self._get_error_code(body), 'EntityTooSmall')
self.assertEqual(self._get_error_message(body),
'Your proposed upload is smaller than the minimum '
'allowed object size.')
self.assertNotIn('PUT', [method for method, _ in self.swift.calls])
self.assertEqual(self._get_error_message(body), msg)
# Again, we punt to SLO to do the validation
self.assertEqual([method for method, _ in self.swift.calls],
['HEAD', 'HEAD', 'PUT'])
def test_object_multipart_upload_complete_zero_segments(self):
segment_bucket = '/v1/AUTH_test/empty-bucket+segments'
@ -927,8 +994,6 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
self.assertEqual(self.swift.calls, [
('HEAD', '/v1/AUTH_test/empty-bucket'),
('HEAD', '/v1/AUTH_test/empty-bucket+segments/object/X'),
('GET', '/v1/AUTH_test/empty-bucket+segments?delimiter=/&'
'format=json&prefix=object/X/'),
])
def test_object_multipart_upload_complete_single_zero_length_segment(self):
@ -975,8 +1040,6 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
self.assertEqual(self.swift.calls, [
('HEAD', '/v1/AUTH_test/empty-bucket'),
('HEAD', '/v1/AUTH_test/empty-bucket+segments/object/X'),
('GET', '/v1/AUTH_test/empty-bucket+segments?delimiter=/&'
'format=json&prefix=object/X/'),
('PUT', '/v1/AUTH_test/empty-bucket/object?'
'heartbeat=on&multipart-manifest=put'),
('DELETE', '/v1/AUTH_test/empty-bucket+segments/object/X'),
@ -985,56 +1048,6 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
self.assertEqual(put_headers.get('X-Object-Meta-Foo'), 'bar')
self.assertEqual(put_headers.get('Content-Type'), 'baz/quux')
def test_object_multipart_upload_complete_double_zero_length_segment(self):
segment_bucket = '/v1/AUTH_test/empty-bucket+segments'
object_list = [{
'name': 'object/X/1',
'last_modified': self.last_modified,
'hash': 'd41d8cd98f00b204e9800998ecf8427e',
'bytes': '0',
}, {
'name': 'object/X/2',
'last_modified': self.last_modified,
'hash': 'd41d8cd98f00b204e9800998ecf8427e',
'bytes': '0',
}]
self.swift.register('GET', segment_bucket, swob.HTTPOk, {},
json.dumps(object_list))
self.swift.register('HEAD', '/v1/AUTH_test/empty-bucket',
swob.HTTPNoContent, {}, None)
self.swift.register('HEAD', segment_bucket + '/object/X',
swob.HTTPOk, {'x-object-meta-foo': 'bar',
'content-type': 'baz/quux'}, None)
xml = '<CompleteMultipartUpload>' \
'<Part>' \
'<PartNumber>1</PartNumber>' \
'<ETag>d41d8cd98f00b204e9800998ecf8427e</ETag>' \
'</Part>' \
'<Part>' \
'<PartNumber>2</PartNumber>' \
'<ETag>d41d8cd98f00b204e9800998ecf8427e</ETag>' \
'</Part>' \
'</CompleteMultipartUpload>'
req = Request.blank('/empty-bucket/object?uploadId=X',
environ={'REQUEST_METHOD': 'POST'},
headers={'Authorization': 'AWS test:tester:hmac',
'Date': self.get_date_header(), },
body=xml)
status, headers, body = self.call_s3api(req)
self.assertEqual(self._get_error_code(body), 'EntityTooSmall')
self.assertEqual(status.split()[0], '400')
self.assertEqual(self.swift.calls, [
('HEAD', '/v1/AUTH_test/empty-bucket'),
('HEAD', '/v1/AUTH_test/empty-bucket+segments/object/X'),
('GET', '/v1/AUTH_test/empty-bucket+segments?delimiter=/&'
'format=json&prefix=object/X/'),
])
def test_object_multipart_upload_complete_zero_length_final_segment(self):
segment_bucket = '/v1/AUTH_test/bucket+segments'
@ -1096,8 +1109,6 @@ class TestS3ApiMultiUpload(S3ApiTestCase):
self.assertEqual(self.swift.calls, [
('HEAD', '/v1/AUTH_test/bucket'),
('HEAD', '/v1/AUTH_test/bucket+segments/object/X'),
('GET', '/v1/AUTH_test/bucket+segments?delimiter=/&'
'format=json&prefix=object/X/'),
('PUT', '/v1/AUTH_test/bucket/object?'
'heartbeat=on&multipart-manifest=put'),
('DELETE', '/v1/AUTH_test/bucket+segments/object/X'),