diff --git a/swift/common/middleware/s3api/controllers/multi_upload.py b/swift/common/middleware/s3api/controllers/multi_upload.py index 0dbbb9934e..3ab7e78016 100644 --- a/swift/common/middleware/s3api/controllers/multi_upload.py +++ b/swift/common/middleware/s3api/controllers/multi_upload.py @@ -108,6 +108,13 @@ def _get_upload_info(req, app, upload_id): try: return req.get_response(app, 'HEAD', container=container, obj=obj) except NoSuchKey: + try: + resp = req.get_response(app, 'HEAD') + if resp.sysmeta_headers.get(sysmeta_header( + 'object', 'upload-id')) == upload_id: + return resp + except NoSuchKey: + pass raise NoSuchUpload(upload_id=upload_id) finally: # ...making sure to restore any copy-source before returning @@ -115,9 +122,34 @@ def _get_upload_info(req, app, upload_id): req.headers['X-Amz-Copy-Source'] = copy_source -def _check_upload_info(req, app, upload_id): +def _make_complete_body(req, s3_etag, yielded_anything): + result_elem = Element('CompleteMultipartUploadResult') - _get_upload_info(req, app, upload_id) + # NOTE: boto with sig v4 appends port to HTTP_HOST value at + # the request header when the port is non default value and it + # makes req.host_url like as http://localhost:8080:8080/path + # that obviously invalid. Probably it should be resolved at + # swift.common.swob though, tentatively we are parsing and + # reconstructing the correct host_url info here. + # in detail, https://github.com/boto/boto/pull/3513 + parsed_url = urlparse(req.host_url) + host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname) + # Why are we doing our own port parsing? 
Because py3 decided + # to start raising ValueErrors on access after parsing such + # an invalid port + netloc = parsed_url.netloc.split('@')[-1].split(']')[-1] + if ':' in netloc: + port = netloc.split(':', 2)[1] + host_url += ':%s' % port + + SubElement(result_elem, 'Location').text = host_url + req.path + SubElement(result_elem, 'Bucket').text = req.container_name + SubElement(result_elem, 'Key').text = req.object_name + SubElement(result_elem, 'ETag').text = '"%s"' % s3_etag + body = tostring(result_elem, xml_declaration=not yielded_anything) + if yielded_anything: + return b'\n' + body + return body class PartController(Controller): @@ -152,7 +184,7 @@ class PartController(Controller): err_msg) upload_id = req.params['uploadId'] - _check_upload_info(req, self.app, upload_id) + _get_upload_info(req, self.app, upload_id) req.container_name += MULTIUPLOAD_SUFFIX req.object_name = '%s/%s/%d' % (req.object_name, upload_id, @@ -449,7 +481,7 @@ class UploadController(Controller): raise InvalidArgument('encoding-type', encoding_type, err_msg) upload_id = req.params['uploadId'] - _check_upload_info(req, self.app, upload_id) + _get_upload_info(req, self.app, upload_id) maxparts = req.get_validated_param( 'max-parts', DEFAULT_MAX_PARTS_LISTING, @@ -536,7 +568,7 @@ class UploadController(Controller): Handles Abort Multipart Upload. """ upload_id = req.params['uploadId'] - _check_upload_info(req, self.app, upload_id) + _get_upload_info(req, self.app, upload_id) # First check to see if this multi-part upload was already # completed. 
Look in the primary container, if the object exists, @@ -580,7 +612,8 @@ class UploadController(Controller): """ upload_id = req.params['uploadId'] resp = _get_upload_info(req, self.app, upload_id) - headers = {'Accept': 'application/json'} + headers = {'Accept': 'application/json', + sysmeta_header('object', 'upload-id'): upload_id} for key, val in resp.headers.items(): _key = key.lower() if _key.startswith('x-amz-meta-'): @@ -650,7 +683,15 @@ class UploadController(Controller): raise s3_etag = '%s-%d' % (s3_etag_hasher.hexdigest(), len(manifest)) - headers[sysmeta_header('object', 'etag')] = s3_etag + s3_etag_header = sysmeta_header('object', 'etag') + if resp.sysmeta_headers.get(s3_etag_header) == s3_etag: + # This header should only already be present if the upload marker + # has been cleaned up and the current target uses the same + # upload-id; assuming the segments to use haven't changed, the work + # is already done + return HTTPOk(body=_make_complete_body(req, s3_etag, False), + content_type='application/xml') + headers[s3_etag_header] = s3_etag # Leave base header value blank; SLO will populate c_etag = '; s3_etag=%s' % s3_etag headers[get_container_update_override_key('etag')] = c_etag @@ -730,37 +771,13 @@ class UploadController(Controller): try: req.get_response(self.app, 'DELETE', container, obj) except NoSuchKey: - # We know that this existed long enough for us to HEAD + # The important thing is that we wrote out a tombstone to + # make sure the marker got cleaned up. If it's already + # gone (e.g., because of concurrent completes or a retried + # complete), so much the better. pass - result_elem = Element('CompleteMultipartUploadResult') - - # NOTE: boto with sig v4 appends port to HTTP_HOST value at - # the request header when the port is non default value and it - # makes req.host_url like as http://localhost:8080:8080/path - # that obviously invalid. 
Probably it should be resolved at - # swift.common.swob though, tentatively we are parsing and - # reconstructing the correct host_url info here. - # in detail, https://github.com/boto/boto/pull/3513 - parsed_url = urlparse(req.host_url) - host_url = '%s://%s' % (parsed_url.scheme, parsed_url.hostname) - # Why are we doing our own port parsing? Because py3 decided - # to start raising ValueErrors on access after parsing such - # an invalid port - netloc = parsed_url.netloc.split('@')[-1].split(']')[-1] - if ':' in netloc: - port = netloc.split(':', 2)[1] - host_url += ':%s' % port - - SubElement(result_elem, 'Location').text = host_url + req.path - SubElement(result_elem, 'Bucket').text = req.container_name - SubElement(result_elem, 'Key').text = req.object_name - SubElement(result_elem, 'ETag').text = '"%s"' % s3_etag - resp.headers.pop('ETag', None) - if yielded_anything: - yield b'\n' - yield tostring(result_elem, - xml_declaration=not yielded_anything) + yield _make_complete_body(req, s3_etag, yielded_anything) except ErrorResponse as err_resp: if yielded_anything: err_resp.xml_declaration = False diff --git a/swift/common/middleware/s3api/utils.py b/swift/common/middleware/s3api/utils.py index 5cf501e4e3..613d43a849 100644 --- a/swift/common/middleware/s3api/utils.py +++ b/swift/common/middleware/s3api/utils.py @@ -120,10 +120,6 @@ class S3Timestamp(utils.Timestamp): return self.isoformat.replace( '-', '').replace(':', '')[:-7] + 'Z' - @classmethod - def now(cls): - return cls(time.time()) - def mktime(timestamp_str, time_format='%Y-%m-%dT%H:%M:%S'): """ diff --git a/test/functional/s3api/__init__.py b/test/functional/s3api/__init__.py index 7ad2c077b0..4d0075dfc1 100644 --- a/test/functional/s3api/__init__.py +++ b/test/functional/s3api/__init__.py @@ -15,6 +15,8 @@ import unittest import traceback +from contextlib import contextmanager +import logging import test.functional as tf from test.functional.s3api.s3_test_client import ( Connection, get_boto3_conn, 
tear_down_s3) @@ -33,6 +35,14 @@ class S3ApiBase(unittest.TestCase): super(S3ApiBase, self).__init__(method_name) self.method_name = method_name + @contextmanager + def quiet_boto_logging(self): + try: + logging.getLogger('boto').setLevel(logging.INFO) + yield + finally: + logging.getLogger('boto').setLevel(logging.DEBUG) + def setUp(self): if 's3api' not in tf.cluster_info: raise tf.SkipTest('s3api middleware is not enabled') diff --git a/test/functional/s3api/test_multi_upload.py b/test/functional/s3api/test_multi_upload.py index c2e1c0f93d..b2aa006a28 100644 --- a/test/functional/s3api/test_multi_upload.py +++ b/test/functional/s3api/test_multi_upload.py @@ -78,9 +78,9 @@ class TestS3ApiMultiUpload(S3ApiBase): def _upload_part(self, bucket, key, upload_id, content=None, part_num=1): query = 'partNumber=%s&uploadId=%s' % (part_num, upload_id) content = content if content else b'a' * self.min_segment_size - status, headers, body = \ - self.conn.make_request('PUT', bucket, key, body=content, - query=query) + with self.quiet_boto_logging(): + status, headers, body = self.conn.make_request( + 'PUT', bucket, key, body=content, query=query) return status, headers, body def _upload_part_copy(self, src_bucket, src_obj, dst_bucket, dst_key, @@ -113,7 +113,7 @@ class TestS3ApiMultiUpload(S3ApiBase): bucket = 'bucket' keys = ['obj1', 'obj2', 'obj3'] bad_content_md5 = base64.b64encode(b'a' * 16).strip().decode('ascii') - headers = [None, + headers = [{'Content-Type': 'foo/bar', 'x-amz-meta-baz': 'quux'}, {'Content-MD5': bad_content_md5}, {'Etag': 'nonsense'}] uploads = [] @@ -293,7 +293,7 @@ class TestS3ApiMultiUpload(S3ApiBase): self._complete_multi_upload(bucket, key, upload_id, xml) self.assertEqual(status, 200) self.assertCommonResponseHeaders(headers) - self.assertTrue('content-type' in headers) + self.assertIn('content-type', headers) self.assertEqual(headers['content-type'], 'application/xml') if 'content-length' in headers: self.assertEqual(headers['content-length'], 
str(len(body))) @@ -317,9 +317,46 @@ class TestS3ApiMultiUpload(S3ApiBase): self.assertEqual(etag, exp_etag) exp_size = self.min_segment_size * len(etags) + status, headers, body = \ + self.conn.make_request('HEAD', bucket, key) + self.assertEqual(status, 200) + self.assertEqual(headers['content-length'], str(exp_size)) + self.assertEqual(headers['content-type'], 'foo/bar') + self.assertEqual(headers['x-amz-meta-baz'], 'quux') + swift_etag = '"%s"' % md5(concatted_etags).hexdigest() # TODO: GET via swift api, check against swift_etag + # Should be safe to retry + status, headers, body = \ + self._complete_multi_upload(bucket, key, upload_id, xml) + self.assertEqual(status, 200) + self.assertCommonResponseHeaders(headers) + self.assertIn('content-type', headers) + self.assertEqual(headers['content-type'], 'application/xml') + if 'content-length' in headers: + self.assertEqual(headers['content-length'], str(len(body))) + else: + self.assertIn('transfer-encoding', headers) + self.assertEqual(headers['transfer-encoding'], 'chunked') + lines = body.split(b'\n') + self.assertTrue(lines[0].startswith(b'<?xml'), body) + self.assertTrue(lines[0].endswith(b'?>'), body) + elem = fromstring(body, 'CompleteMultipartUploadResult') + self.assertEqual( + '%s/bucket/obj1' % tf.config['s3_storage_url'].rstrip('/'), + elem.find('Location').text) + self.assertEqual(elem.find('Bucket').text, bucket) + self.assertEqual(elem.find('Key').text, key) + self.assertEqual(elem.find('ETag').text, exp_etag) + + status, headers, body = \ + self.conn.make_request('HEAD', bucket, key) + self.assertEqual(status, 200) + self.assertEqual(headers['content-length'], str(exp_size)) + self.assertEqual(headers['content-type'], 'foo/bar') + self.assertEqual(headers['x-amz-meta-baz'], 'quux') + # Upload Part Copy -- MU as source key, upload_id = uploads[1] status, headers, body, resp_etag = \ diff --git a/test/unit/common/middleware/s3api/test_multi_upload.py b/test/unit/common/middleware/s3api/test_multi_upload.py index 15eb022a4a..16aeb4126a 100644 --- 
a/test/unit/common/middleware/s3api/test_multi_upload.py +++ b/test/unit/common/middleware/s3api/test_multi_upload.py @@ -827,7 +827,7 @@ class TestS3ApiMultiUpload(S3ApiTestCase): self.assertEqual(self.swift.calls, [ # Bucket exists ('HEAD', '/v1/AUTH_test/bucket'), - # Segment container exists + # Upload marker exists ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'), # Create the SLO ('PUT', '/v1/AUTH_test/bucket/object' @@ -843,6 +843,123 @@ class TestS3ApiMultiUpload(S3ApiTestCase): override_etag = '; s3_etag=%s' % S3_ETAG.strip('"') h = 'X-Object-Sysmeta-Container-Update-Override-Etag' self.assertEqual(headers.get(h), override_etag) + self.assertEqual(headers.get('X-Object-Sysmeta-S3Api-Upload-Id'), 'X') + + def test_object_multipart_upload_retry_complete(self): + content_md5 = base64.b64encode(hashlib.md5( + XML.encode('ascii')).digest()) + self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/object/X', + swob.HTTPNotFound, {}, None) + recent_ts = S3Timestamp.now(delta=-1000000).internal # 10s ago + self.swift.register('HEAD', '/v1/AUTH_test/bucket/object', + swob.HTTPOk, + {'x-object-meta-foo': 'bar', + 'content-type': 'baz/quux', + 'x-object-sysmeta-s3api-upload-id': 'X', + 'x-object-sysmeta-s3api-etag': S3_ETAG.strip('"'), + 'x-timestamp': recent_ts}, None) + req = Request.blank('/bucket/object?uploadId=X', + environ={'REQUEST_METHOD': 'POST'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'Date': self.get_date_header(), + 'Content-MD5': content_md5, }, + body=XML) + status, headers, body = self.call_s3api(req) + elem = fromstring(body, 'CompleteMultipartUploadResult') + self.assertNotIn('Etag', headers) + self.assertEqual(elem.find('ETag').text, S3_ETAG) + self.assertEqual(status.split()[0], '200') + + self.assertEqual(self.swift.calls, [ + # Bucket exists + ('HEAD', '/v1/AUTH_test/bucket'), + # Upload marker does not exist + ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'), + # But the object does, and with the same upload ID + ('HEAD', 
'/v1/AUTH_test/bucket/object'), + # So no PUT necessary + ]) + + def test_object_multipart_upload_retry_complete_etag_mismatch(self): + content_md5 = base64.b64encode(hashlib.md5( + XML.encode('ascii')).digest()) + self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/object/X', + swob.HTTPNotFound, {}, None) + recent_ts = S3Timestamp.now(delta=-1000000).internal + self.swift.register('HEAD', '/v1/AUTH_test/bucket/object', + swob.HTTPOk, + {'x-object-meta-foo': 'bar', + 'content-type': 'baz/quux', + 'x-object-sysmeta-s3api-upload-id': 'X', + 'x-object-sysmeta-s3api-etag': 'not-the-etag', + 'x-timestamp': recent_ts}, None) + req = Request.blank('/bucket/object?uploadId=X', + environ={'REQUEST_METHOD': 'POST'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'Date': self.get_date_header(), + 'Content-MD5': content_md5, }, + body=XML) + status, headers, body = self.call_s3api(req) + elem = fromstring(body, 'CompleteMultipartUploadResult') + self.assertNotIn('Etag', headers) + self.assertEqual(elem.find('ETag').text, S3_ETAG) + self.assertEqual(status.split()[0], '200') + + self.assertEqual(self.swift.calls, [ + # Bucket exists + ('HEAD', '/v1/AUTH_test/bucket'), + # Upload marker does not exist + ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'), + # But the object does, and with the same upload ID + ('HEAD', '/v1/AUTH_test/bucket/object'), + # Create the SLO + ('PUT', '/v1/AUTH_test/bucket/object' + '?heartbeat=on&multipart-manifest=put'), + # Retry deleting the marker for the sake of completeness + ('DELETE', '/v1/AUTH_test/bucket+segments/object/X') + ]) + + _, _, headers = self.swift.calls_with_headers[-2] + self.assertEqual(headers.get('X-Object-Meta-Foo'), 'bar') + self.assertEqual(headers.get('Content-Type'), 'baz/quux') + # SLO will provide a base value + override_etag = '; s3_etag=%s' % S3_ETAG.strip('"') + h = 'X-Object-Sysmeta-Container-Update-Override-Etag' + self.assertEqual(headers.get(h), override_etag) + 
self.assertEqual(headers.get('X-Object-Sysmeta-S3Api-Upload-Id'), 'X') + + def test_object_multipart_upload_retry_complete_upload_id_mismatch(self): + content_md5 = base64.b64encode(hashlib.md5( + XML.encode('ascii')).digest()) + self.swift.register('HEAD', '/v1/AUTH_test/bucket+segments/object/X', + swob.HTTPNotFound, {}, None) + recent_ts = S3Timestamp.now(delta=-1000000).internal + self.swift.register('HEAD', '/v1/AUTH_test/bucket/object', + swob.HTTPOk, + {'x-object-meta-foo': 'bar', + 'content-type': 'baz/quux', + 'x-object-sysmeta-s3api-upload-id': 'Y', + 'x-object-sysmeta-s3api-etag': S3_ETAG.strip('"'), + 'x-timestamp': recent_ts}, None) + req = Request.blank('/bucket/object?uploadId=X', + environ={'REQUEST_METHOD': 'POST'}, + headers={'Authorization': 'AWS test:tester:hmac', + 'Date': self.get_date_header(), + 'Content-MD5': content_md5, }, + body=XML) + status, headers, body = self.call_s3api(req) + elem = fromstring(body, 'Error') + self.assertEqual(elem.find('Code').text, 'NoSuchUpload') + self.assertEqual(status.split()[0], '404') + + self.assertEqual(self.swift.calls, [ + # Bucket exists + ('HEAD', '/v1/AUTH_test/bucket'), + # Upload marker does not exist + ('HEAD', '/v1/AUTH_test/bucket+segments/object/X'), + # The object exists, but with a different upload ID + ('HEAD', '/v1/AUTH_test/bucket/object'), + ]) def test_object_multipart_upload_invalid_md5(self): bad_md5 = base64.b64encode(hashlib.md5(