From 02548717acb4b59d3fdc90fa32097d1eac00a3a6 Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Fri, 15 May 2020 16:12:58 -0700 Subject: [PATCH] s3api: Allow CompleteMultipartUpload requests to be retried When completing a multipart-upload, include the upload-id in sysmeta. If we can't find the upload marker, check the final object name; if it has an upload-id in sysmeta and it matches the upload-id that we're trying to complete, allow the complete to continue. Also add an early return if the already-completed upload's ETag matches the computed ETag for the user's request. This should help clients that can't take advantage of how we dribble out whitespace to try to keep the connection alive: The client times out, retries, and if the upload actually completed, it gets a fast 200 response. Change-Id: I38958839be5b250c9d268ec7c50a56cdb56c2fa2 --- .../s3api/controllers/multi_upload.py | 89 +++++++------ swift/common/middleware/s3api/utils.py | 4 - test/functional/s3api/__init__.py | 10 ++ test/functional/s3api/test_multi_upload.py | 47 ++++++- .../middleware/s3api/test_multi_upload.py | 119 +++++++++++++++++- 5 files changed, 223 insertions(+), 46 deletions(-) diff --git a/swift/common/middleware/s3api/controllers/multi_upload.py b/swift/common/middleware/s3api/controllers/multi_upload.py index 0dbbb9934e..3ab7e78016 100644 --- a/swift/common/middleware/s3api/controllers/multi_upload.py +++ b/swift/common/middleware/s3api/controllers/multi_upload.py @@ -108,6 +108,13 @@ def _get_upload_info(req, app, upload_id): try: return req.get_response(app, 'HEAD', container=container, obj=obj) except NoSuchKey: + try: + resp = req.get_response(app, 'HEAD') + if resp.sysmeta_headers.get(sysmeta_header( + 'object', 'upload-id')) == upload_id: + return resp + except NoSuchKey: + pass raise NoSuchUpload(upload_id=upload_id) finally: # ...making sure to restore any copy-source before returning @@ -115,9 +122,34 @@ 
req.headers['X-Amz-Copy-Source'] = copy_source -def _check_upload_info(req, app, upload_id): +def _make_complete_body(req, s3_etag, yielded_anything): + result_elem = Element('CompleteMultipartUploadResult') - _get_upload_info(req, app, upload_id) + # NOTE: boto with sig v4 appends port to HTTP_HOST value at + # the request header when the port is non default value and it + # makes req.host_url like as http://localhost:8080:8080/path + # that obviously invalid. Probably it should be resolved at + # swift.common.swob though, tentatively we are parsing and + # reconstructing the correct host_url info here. + # in detail, https://github.com/boto/boto/pull/3513 + parsed_url = urlparse(req.host_url) + host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname) + # Why are we doing our own port parsing? Because py3 decided + # to start raising ValueErrors on access after parsing such + # an invalid port + netloc = parsed_url.netloc.split('@')[-1].split(']')[-1] + if ':' in netloc: + port = netloc.split(':', 2)[1] + host_url += ':%s' % port + + SubElement(result_elem, 'Location').text = host_url + req.path + SubElement(result_elem, 'Bucket').text = req.container_name + SubElement(result_elem, 'Key').text = req.object_name + SubElement(result_elem, 'ETag').text = '"%s"' % s3_etag + body = tostring(result_elem, xml_declaration=not yielded_anything) + if yielded_anything: + return b'\n' + body + return body class PartController(Controller): @@ -152,7 +184,7 @@ class PartController(Controller): err_msg) upload_id = req.params['uploadId'] - _check_upload_info(req, self.app, upload_id) + _get_upload_info(req, self.app, upload_id) req.container_name += MULTIUPLOAD_SUFFIX req.object_name = '%s/%s/%d' % (req.object_name, upload_id, @@ -449,7 +481,7 @@ class UploadController(Controller): raise InvalidArgument('encoding-type', encoding_type, err_msg) upload_id = req.params['uploadId'] - _check_upload_info(req, self.app, upload_id) + _get_upload_info(req, self.app, upload_id) 
maxparts = req.get_validated_param( 'max-parts', DEFAULT_MAX_PARTS_LISTING, @@ -536,7 +568,7 @@ class UploadController(Controller): Handles Abort Multipart Upload. """ upload_id = req.params['uploadId'] - _check_upload_info(req, self.app, upload_id) + _get_upload_info(req, self.app, upload_id) # First check to see if this multi-part upload was already # completed. Look in the primary container, if the object exists, @@ -580,7 +612,8 @@ class UploadController(Controller): """ upload_id = req.params['uploadId'] resp = _get_upload_info(req, self.app, upload_id) - headers = {'Accept': 'application/json'} + headers = {'Accept': 'application/json', + sysmeta_header('object', 'upload-id'): upload_id} for key, val in resp.headers.items(): _key = key.lower() if _key.startswith('x-amz-meta-'): @@ -650,7 +683,15 @@ class UploadController(Controller): raise s3_etag = '%s-%d' % (s3_etag_hasher.hexdigest(), len(manifest)) - headers[sysmeta_header('object', 'etag')] = s3_etag + s3_etag_header = sysmeta_header('object', 'etag') + if resp.sysmeta_headers.get(s3_etag_header) == s3_etag: + # This header should only already be present if the upload marker + # has been cleaned up and the current target uses the same + # upload-id; assuming the segments to use haven't changed, the work + # is already done + return HTTPOk(body=_make_complete_body(req, s3_etag, False), + content_type='application/xml') + headers[s3_etag_header] = s3_etag # Leave base header value blank; SLO will populate c_etag = '; s3_etag=%s' % s3_etag headers[get_container_update_override_key('etag')] = c_etag @@ -730,37 +771,13 @@ class UploadController(Controller): try: req.get_response(self.app, 'DELETE', container, obj) except NoSuchKey: - # We know that this existed long enough for us to HEAD + # The important thing is that we wrote out a tombstone to + # make sure the marker got cleaned up. If it's already + # gone (e.g., because of concurrent completes or a retried + # complete), so much the better. 
pass - result_elem = Element('CompleteMultipartUploadResult') - - # NOTE: boto with sig v4 appends port to HTTP_HOST value at - # the request header when the port is non default value and it - # makes req.host_url like as http://localhost:8080:8080/path - # that obviously invalid. Probably it should be resolved at - # swift.common.swob though, tentatively we are parsing and - # reconstructing the correct host_url info here. - # in detail, https://github.com/boto/boto/pull/3513 - parsed_url = urlparse(req.host_url) - host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname) - # Why are we doing our own port parsing? Because py3 decided - # to start raising ValueErrors on access after parsing such - # an invalid port - netloc = parsed_url.netloc.split('@')[-1].split(']')[-1] - if ':' in netloc: - port = netloc.split(':', 2)[1] - host_url += ':%s' % port - - SubElement(result_elem, 'Location').text = host_url + req.path - SubElement(result_elem, 'Bucket').text = req.container_name - SubElement(result_elem, 'Key').text = req.object_name - SubElement(result_elem, 'ETag').text = '"%s"' % s3_etag - resp.headers.pop('ETag', None) - if yielded_anything: - yield b'\n' - yield tostring(result_elem, - xml_declaration=not yielded_anything) + yield _make_complete_body(req, s3_etag, yielded_anything) except ErrorResponse as err_resp: if yielded_anything: err_resp.xml_declaration = False diff --git a/swift/common/middleware/s3api/utils.py b/swift/common/middleware/s3api/utils.py index 5cf501e4e3..613d43a849 100644 --- a/swift/common/middleware/s3api/utils.py +++ b/swift/common/middleware/s3api/utils.py @@ -120,10 +120,6 @@ class S3Timestamp(utils.Timestamp): return self.isoformat.replace( '-', '').replace(':', '')[:-7] + 'Z' - @classmethod - def now(cls): - return cls(time.time()) - def mktime(timestamp_str, time_format='%Y-%m-%dT%H:%M:%S'): """ diff --git a/test/functional/s3api/__init__.py b/test/functional/s3api/__init__.py index 7ad2c077b0..4d0075dfc1 100644 --- 
a/test/functional/s3api/__init__.py +++ b/test/functional/s3api/__init__.py @@ -15,6 +15,8 @@ import unittest import traceback +from contextlib import contextmanager +import logging import test.functional as tf from test.functional.s3api.s3_test_client import ( Connection, get_boto3_conn, tear_down_s3) @@ -33,6 +35,14 @@ class S3ApiBase(unittest.TestCase): super(S3ApiBase, self).__init__(method_name) self.method_name = method_name + @contextmanager + def quiet_boto_logging(self): + try: + logging.getLogger('boto').setLevel(logging.INFO) + yield + finally: + logging.getLogger('boto').setLevel(logging.DEBUG) + def setUp(self): if 's3api' not in tf.cluster_info: raise tf.SkipTest('s3api middleware is not enabled') diff --git a/test/functional/s3api/test_multi_upload.py b/test/functional/s3api/test_multi_upload.py index c2e1c0f93d..b2aa006a28 100644 --- a/test/functional/s3api/test_multi_upload.py +++ b/test/functional/s3api/test_multi_upload.py @@ -78,9 +78,9 @@ class TestS3ApiMultiUpload(S3ApiBase): def _upload_part(self, bucket, key, upload_id, content=None, part_num=1): query = 'partNumber=%s&uploadId=%s' % (part_num, upload_id) content = content if content else b'a' * self.min_segment_size - status, headers, body = \ - self.conn.make_request('PUT', bucket, key, body=content, - query=query) + with self.quiet_boto_logging(): + status, headers, body = self.conn.make_request( + 'PUT', bucket, key, body=content, query=query) return status, headers, body def _upload_part_copy(self, src_bucket, src_obj, dst_bucket, dst_key, @@ -113,7 +113,7 @@ class TestS3ApiMultiUpload(S3ApiBase): bucket = 'bucket' keys = ['obj1', 'obj2', 'obj3'] bad_content_md5 = base64.b64encode(b'a' * 16).strip().decode('ascii') - headers = [None, + headers = [{'Content-Type': 'foo/bar', 'x-amz-meta-baz': 'quux'}, {'Content-MD5': bad_content_md5}, {'Etag': 'nonsense'}] uploads = [] @@ -293,7 +293,7 @@ class TestS3ApiMultiUpload(S3ApiBase): self._complete_multi_upload(bucket, key, upload_id, xml) 
self.assertEqual(status, 200) self.assertCommonResponseHeaders(headers) - self.assertTrue('content-type' in headers) + self.assertIn('content-type', headers) self.assertEqual(headers['content-type'], 'application/xml') if 'content-length' in headers: self.assertEqual(headers['content-length'], str(len(body))) @@ -317,9 +317,46 @@ class TestS3ApiMultiUpload(S3ApiBase): self.assertEqual(etag, exp_etag) exp_size = self.min_segment_size * len(etags) + status, headers, body = \ + self.conn.make_request('HEAD', bucket, key) + self.assertEqual(status, 200) + self.assertEqual(headers['content-length'], str(exp_size)) + self.assertEqual(headers['content-type'], 'foo/bar') + self.assertEqual(headers['x-amz-meta-baz'], 'quux') + swift_etag = '"%s"' % md5(concatted_etags).hexdigest() # TODO: GET via swift api, check against swift_etag + # Should be safe to retry + status, headers, body = \ + self._complete_multi_upload(bucket, key, upload_id, xml) + self.assertEqual(status, 200) + self.assertCommonResponseHeaders(headers) + self.assertIn('content-type', headers) + self.assertEqual(headers['content-type'], 'application/xml') + if 'content-length' in headers: + self.assertEqual(headers['content-length'], str(len(body))) + else: + self.assertIn('transfer-encoding', headers) + self.assertEqual(headers['transfer-encoding'], 'chunked') + lines = body.split(b'\n') + self.assertTrue(lines[0].startswith(b''), body) + elem = fromstring(body, 'CompleteMultipartUploadResult') + self.assertEqual( + '%s/bucket/obj1' % tf.config['s3_storage_url'].rstrip('/'), + elem.find('Location').text) + self.assertEqual(elem.find('Bucket').text, bucket) + self.assertEqual(elem.find('Key').text, key) + self.assertEqual(elem.find('ETag').text, exp_etag) + + status, headers, body = \ + self.conn.make_request('HEAD', bucket, key) + self.assertEqual(status, 200) + self.assertEqual(headers['content-length'], str(exp_size)) + self.assertEqual(headers['content-type'], 'foo/bar') + 
self.assertEqual(headers['x-amz-meta-baz'], 'quux') + # Upload Part Copy -- MU as source key, upload_id = uploads[1] status, headers, body, resp_etag = \ diff --git a/test/unit/common/middleware/s3api/test_multi_upload.py b/test/unit/common/middleware/s3api/test_multi_upload.py index 15eb022a4a..16aeb4126a 100644 --- a/test/unit/common/middleware/s3api/test_multi_upload.py +++ b/test/unit/common/middleware/s3api/test_multi_upload.py @@ -827,7 +827,7 @@ class TestS3ApiMultiUpload(S3ApiTestCase): self.assertEqual(self.swift.calls, [ # Bucket exists ('HEAD', '/v1/AUTH_test/bucket'), - # Segment container exists + # Upload marker exists ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'), # Create the SLO ('PUT', '/v1/AUTH_test/bucket/object' @@ -843,6 +843,123 @@ class TestS3ApiMultiUpload(S3ApiTestCase): override_etag = '; s3_etag=%s' % S3_ETAG.strip('"') h = 'X-Object-Sysmeta-Container-Update-Override-Etag' self.assertEqual(headers.get(h), override_etag) + self.assertEqual(headers.get('X-Object-Sysmeta-S3Api-Upload-Id'), 'X') + + def test_object_multipart_upload_retry_complete(self): + content_md5 = base64.b64encode(hashlib.md5( + XML.encode('ascii')).digest()) + self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/object/X', + swob.HTTPNotFound, {}, None) + recent_ts = S3Timestamp.now(delta=-1000000).internal # 10s ago + self.swift.register('HEAD', '/v1/AUTH_test/bucket/object', + swob.HTTPOk, + {'x-object-meta-foo': 'bar', + 'content-type': 'baz/quux', + 'x-object-sysmeta-s3api-upload-id': 'X', + 'x-object-sysmeta-s3api-etag': S3_ETAG.strip('"'), + 'x-timestamp': recent_ts}, None) + req = Request.blank('/bucket/object?uploadId=X', + environ={'REQUEST_METHOD': 'POST'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'Date': self.get_date_header(), + 'Content-MD5': content_md5, }, + body=XML) + status, headers, body = self.call_s3api(req) + elem = fromstring(body, 'CompleteMultipartUploadResult') + self.assertNotIn('Etag', headers) + 
self.assertEqual(elem.find('ETag').text, S3_ETAG) + self.assertEqual(status.split()[0], '200') + + self.assertEqual(self.swift.calls, [ + # Bucket exists + ('HEAD', '/v1/AUTH_test/bucket'), + # Upload marker does not exist + ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'), + # But the object does, and with the same upload ID + ('HEAD', '/v1/AUTH_test/bucket/object'), + # So no PUT necessary + ]) + + def test_object_multipart_upload_retry_complete_etag_mismatch(self): + content_md5 = base64.b64encode(hashlib.md5( + XML.encode('ascii')).digest()) + self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/object/X', + swob.HTTPNotFound, {}, None) + recent_ts = S3Timestamp.now(delta=-1000000).internal + self.swift.register('HEAD', '/v1/AUTH_test/bucket/object', + swob.HTTPOk, + {'x-object-meta-foo': 'bar', + 'content-type': 'baz/quux', + 'x-object-sysmeta-s3api-upload-id': 'X', + 'x-object-sysmeta-s3api-etag': 'not-the-etag', + 'x-timestamp': recent_ts}, None) + req = Request.blank('/bucket/object?uploadId=X', + environ={'REQUEST_METHOD': 'POST'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'Date': self.get_date_header(), + 'Content-MD5': content_md5, }, + body=XML) + status, headers, body = self.call_s3api(req) + elem = fromstring(body, 'CompleteMultipartUploadResult') + self.assertNotIn('Etag', headers) + self.assertEqual(elem.find('ETag').text, S3_ETAG) + self.assertEqual(status.split()[0], '200') + + self.assertEqual(self.swift.calls, [ + # Bucket exists + ('HEAD', '/v1/AUTH_test/bucket'), + # Upload marker does not exist + ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'), + # But the object does, and with the same upload ID + ('HEAD', '/v1/AUTH_test/bucket/object'), + # Create the SLO + ('PUT', '/v1/AUTH_test/bucket/object' + '?heartbeat=on&multipart-manifest=put'), + # Retry deleting the marker for the sake of completeness + ('DELETE', '/v1/AUTH_test/bucket+segments/object/X') + ]) + + _, _, headers = self.swift.calls_with_headers[-2] + 
self.assertEqual(headers.get('X-Object-Meta-Foo'), 'bar') + self.assertEqual(headers.get('Content-Type'), 'baz/quux') + # SLO will provide a base value + override_etag = '; s3_etag=%s' % S3_ETAG.strip('"') + h = 'X-Object-Sysmeta-Container-Update-Override-Etag' + self.assertEqual(headers.get(h), override_etag) + self.assertEqual(headers.get('X-Object-Sysmeta-S3Api-Upload-Id'), 'X') + + def test_object_multipart_upload_retry_complete_upload_id_mismatch(self): + content_md5 = base64.b64encode(hashlib.md5( + XML.encode('ascii')).digest()) + self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/object/X', + swob.HTTPNotFound, {}, None) + recent_ts = S3Timestamp.now(delta=-1000000).internal + self.swift.register('HEAD', '/v1/AUTH_test/bucket/object', + swob.HTTPOk, + {'x-object-meta-foo': 'bar', + 'content-type': 'baz/quux', + 'x-object-sysmeta-s3api-upload-id': 'Y', + 'x-object-sysmeta-s3api-etag': S3_ETAG.strip('"'), + 'x-timestamp': recent_ts}, None) + req = Request.blank('/bucket/object?uploadId=X', + environ={'REQUEST_METHOD': 'POST'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'Date': self.get_date_header(), + 'Content-MD5': content_md5, }, + body=XML) + status, headers, body = self.call_s3api(req) + elem = fromstring(body, 'Error') + self.assertEqual(elem.find('Code').text, 'NoSuchUpload') + self.assertEqual(status.split()[0], '404') + + self.assertEqual(self.swift.calls, [ + # Bucket exists + ('HEAD', '/v1/AUTH_test/bucket'), + # Upload marker does not exist + ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'), + # But the object does, and with the same upload ID + ('HEAD', '/v1/AUTH_test/bucket/object'), + ]) def test_object_multipart_upload_invalid_md5(self): bad_md5 = base64.b64encode(hashlib.md5(