diff --git a/swift/common/exceptions.py b/swift/common/exceptions.py index abc09bf36b..54b4e2e2ea 100644 --- a/swift/common/exceptions.py +++ b/swift/common/exceptions.py @@ -117,6 +117,10 @@ class PathNotDir(OSError): pass +class ChunkReadError(SwiftException): + pass + + class ChunkReadTimeout(Timeout): pass diff --git a/swift/common/utils.py b/swift/common/utils.py index 996486916d..99a1fa8417 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -3356,7 +3356,10 @@ class _MultipartMimeFileLikeObject(object): return '' boundary_pos = newline_pos = -1 while newline_pos < 0 and boundary_pos < 0: - chunk = self.wsgi_input.read(self.read_chunk_size) + try: + chunk = self.wsgi_input.read(self.read_chunk_size) + except (IOError, ValueError) as e: + raise swift.common.exceptions.ChunkReadError(str(e)) self.input_buffer += chunk newline_pos = self.input_buffer.find('\r\n') boundary_pos = self.input_buffer.find(self.boundary) diff --git a/swift/obj/server.py b/swift/obj/server.py index e0d43920c5..31ab993869 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -40,7 +40,7 @@ from swift.common.constraints import check_object_creation, \ from swift.common.exceptions import ConnectionTimeout, DiskFileQuarantined, \ DiskFileNotExist, DiskFileCollision, DiskFileNoSpace, DiskFileDeleted, \ DiskFileDeviceUnavailable, DiskFileExpired, ChunkReadTimeout, \ - DiskFileXattrNotSupported + ChunkReadError, DiskFileXattrNotSupported from swift.obj import ssync_receiver from swift.common.http import is_success from swift.common.base_storage_server import BaseStorageServer @@ -405,7 +405,7 @@ class ObjectController(BaseStorageServer): if commit_hdrs.get('X-Document', None) == "put commit": rcvd_commit = True drain(commit_iter, self.network_chunk_size, self.client_timeout) - except ChunkReadTimeout: + except (ChunkReadTimeout, ChunkReadError): raise HTTPClientDisconnect() except StopIteration: raise HTTPBadRequest(body="couldn't find PUT commit MIME doc") diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 937d9f4106..83d546b490 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -34,6 +34,7 @@ from tempfile import mkdtemp from hashlib import md5 import itertools import tempfile +from collections import defaultdict from contextlib import contextmanager from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool, greenthread @@ -5162,6 +5163,95 @@ class TestObjectController(unittest.TestCase): self.assertEqual(resp.status_int, 200) self.assertEqual(resp.headers['X-Timestamp'], put_timestamp) + def test_multiphase_put_draining(self): + # We want to ensure that we read the whole response body even if + # it's multipart MIME and there's document parts that we don't + # expect or understand. This'll help save our bacon if we ever jam + # more stuff in there. + in_a_timeout = [False] + + # inherit from BaseException so we get a stack trace when the test + # fails instead of just a 500 + class NotInATimeout(BaseException): + pass + + class FakeTimeout(BaseException): + def __enter__(self): + in_a_timeout[0] = True + + def __exit__(self, typ, value, tb): + in_a_timeout[0] = False + + class PickyWsgiBytesIO(WsgiBytesIO): + def read(self, *a, **kw): + if not in_a_timeout[0]: + raise NotInATimeout() + return WsgiBytesIO.read(self, *a, **kw) + + def readline(self, *a, **kw): + if not in_a_timeout[0]: + raise NotInATimeout() + return WsgiBytesIO.readline(self, *a, **kw) + + test_data = 'obj data' + footer_meta = { + "X-Object-Sysmeta-Ec-Frag-Index": "7", + "Etag": md5(test_data).hexdigest(), + } + footer_json = json.dumps(footer_meta) + footer_meta_cksum = md5(footer_json).hexdigest() + test_doc = "\r\n".join(( + "--boundary123", + "X-Document: object body", + "", + test_data, + "--boundary123", + "X-Document: object metadata", + "Content-MD5: " + footer_meta_cksum, + "", + footer_json, + "--boundary123", + "X-Document: we got cleverer", + "", + "stuff stuff meaningless stuuuuuuuuuuff", + "--boundary123", + "X-Document: we got even cleverer; can you believe it?", + "Waneshaft: ambifacient lunar", + "Casing: malleable logarithmic", + "", + "potato potato potato potato potato potato potato", + "--boundary123--" + )) + if six.PY3: + test_doc = test_doc.encode('utf-8') + + # phase1 - PUT request with object metadata in footer and + # multiphase commit conversation + put_timestamp = utils.Timestamp(time()).internal + headers = { + 'Content-Type': 'text/plain', + 'X-Timestamp': put_timestamp, + 'Transfer-Encoding': 'chunked', + 'Expect': '100-continue', + 'X-Backend-Storage-Policy-Index': '1', + 'X-Backend-Obj-Content-Length': len(test_data), + 'X-Backend-Obj-Metadata-Footer': 'yes', + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', + } + wsgi_input = PickyWsgiBytesIO(test_doc) + req = Request.blank( + "/sda1/0/a/c/o", + environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': wsgi_input}, + headers=headers) + + app = object_server.ObjectController(self.conf, logger=self.logger) + with mock.patch('swift.obj.server.ChunkReadTimeout', FakeTimeout): + resp = req.get_response(app) + self.assertEqual(resp.status_int, 201) # sanity check + + in_a_timeout[0] = True # so we can check without an exception + self.assertEqual(wsgi_input.read(), '') # we read all the bytes + @patch_policies(test_policies) class TestObjectServer(unittest.TestCase): @@ -5277,46 +5367,33 @@ class TestObjectServer(unittest.TestCase): resp.read() resp.close() - def test_expect_on_multiphase_put(self): - test_data = 'obj data' - test_doc = "\r\n".join(( - "--boundary123", - "X-Document: object body", - "", - test_data, - "--boundary123", - )) + def find_files(self): + found_files = defaultdict(list) + for root, dirs, files in os.walk(self.devices): + for filename in files: + _name, ext = os.path.splitext(filename) + file_path = os.path.join(root, filename) + found_files[ext].append(file_path) + return found_files - put_timestamp = utils.Timestamp(time()).internal - headers = { - 'Content-Type': 'text/plain', - 'X-Timestamp': put_timestamp, - 'Transfer-Encoding': 'chunked', - 'Expect': '100-continue', - 'X-Backend-Obj-Content-Length': len(test_data), - 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', - 'X-Backend-Obj-Multiphase-Commit': 'yes', - } - conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0', - 'PUT', '/a/c/o', headers=headers) - resp = conn.getexpect() - self.assertEqual(resp.status, 100) - headers = HeaderKeyDict(resp.getheaders()) - self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes') + @contextmanager + def _check_multiphase_put_commit_handling(self, test_doc=None, + headers=None): + """ + This helper will setup a multiphase chunked PUT request and yield at + the context at the commit phase (after getting the second expect-100 + continue response. - to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc) - conn.send(to_send) + It can setup a resonable stub request, but you can over-ride some + characteristics of the request via kwargs. - # verify 100-continue response to mark end of phase1 - resp = conn.getexpect() - self.assertEqual(resp.status, 100) - resp.close() - - def test_multiphase_put_metadata_footer(self): - # Test 2-phase commit conversation - end of 1st phase marked - # by 100-continue response from the object server, with a - # successful 2nd phase marked by the presence of a .durable - # file along with .data file in the object data directory + :param test_doc: first part of the mime conversation before the object + server will send the 100-continue, this includes the + object body + :param headers: headers to send along with the initial request; some + object-metadata (e.g. X-Backend-Obj-Content-Length) + is generally expected tomatch the test_doc) + """ test_data = 'obj data' footer_meta = { "X-Object-Sysmeta-Ec-Frag-Index": "2", @@ -5324,7 +5401,7 @@ class TestObjectServer(unittest.TestCase): } footer_json = json.dumps(footer_meta) footer_meta_cksum = md5(footer_json).hexdigest() - test_doc = "\r\n".join(( + test_doc = test_doc or "\r\n".join(( "--boundary123", "X-Document: object body", "", @@ -5339,10 +5416,9 @@ class TestObjectServer(unittest.TestCase): # phase1 - PUT request with object metadata in footer and # multiphase commit conversation - put_timestamp = utils.Timestamp(time()).internal - headers = { + put_timestamp = utils.Timestamp(time()) + headers = headers or { 'Content-Type': 'text/plain', - 'X-Timestamp': put_timestamp, 'Transfer-Encoding': 'chunked', 'Expect': '100-continue', 'X-Backend-Storage-Policy-Index': '1', @@ -5351,55 +5427,200 @@ class TestObjectServer(unittest.TestCase): 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', 'X-Backend-Obj-Multiphase-Commit': 'yes', } - conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0', - 'PUT', '/a/c/o', headers=headers) - resp = conn.getexpect() - self.assertEqual(resp.status, 100) - headers = HeaderKeyDict(resp.getheaders()) - self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes') - self.assertEqual(headers['X-Obj-Metadata-Footer'], 'yes') + put_timestamp = utils.Timestamp(headers.setdefault( + 'X-Timestamp', utils.Timestamp(time()).internal)) + container_update = \ + 'swift.obj.server.ObjectController.container_update' + with mock.patch(container_update) as _container_update: + conn = bufferedhttp.http_connect( + '127.0.0.1', self.port, 'sda1', '0', + 'PUT', '/a/c/o', headers=headers) + resp = conn.getexpect() + self.assertEqual(resp.status, 100) + expect_headers = HeaderKeyDict(resp.getheaders()) - to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc) - conn.send(to_send) - # verify 100-continue response to mark end of phase1 - resp = conn.getexpect() - self.assertEqual(resp.status, 100) + to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc) + conn.send(to_send) + # verify 100-continue response to mark end of phase1 + resp = conn.getexpect() + self.assertEqual(resp.status, 100) - # send commit confirmation to start phase2 - commit_confirmation_doc = "\r\n".join(( - "X-Document: put commit", - "", - "commit_confirmation", - "--boundary123--", - )) - to_send = "%x\r\n%s\r\n0\r\n\r\n" % \ - (len(commit_confirmation_doc), commit_confirmation_doc) - conn.send(to_send) + # yield relevant context for test + yield { + 'conn': conn, + 'expect_headers': expect_headers, + 'put_timestamp': put_timestamp, + 'mock_container_update': _container_update, + } - # verify success (2xx) to make end of phase2 - resp = conn.getresponse() - self.assertEqual(resp.status, 201) - resp.read() - resp.close() + for i in range(3): + # give the object server a few trampolines to recognize request + # has finished, or socket has closed or whatever + sleep(0) + + def test_multiphase_put_client_disconnect_right_before_commit(self): + with self._check_multiphase_put_commit_handling() as context: + conn = context['conn'] + # just bail stright out + conn.sock.fd._sock.close() + + put_timestamp = context['put_timestamp'] + _container_update = context['mock_container_update'] + + # and make sure it demonstrates the client disconnect + log_lines = self.logger.get_lines_for_level('info') + self.assertEqual(len(log_lines), 1) + self.assertIn(' 499 ', log_lines[0]) # verify successful object data and durable state file write - obj_basename = os.path.join( - self.devices, 'sda1', - storage_directory(diskfile.get_data_dir(POLICIES[1]), '0', - hash_path('a', 'c', 'o')), - put_timestamp) - obj_datafile = obj_basename + '#2.data' - self.assertTrue(os.path.isfile(obj_datafile)) - obj_durablefile = obj_basename + '.durable' - self.assertTrue(os.path.isfile(obj_durablefile)) + found_files = self.find_files() + # .data file is there + self.assertEqual(len(found_files['.data']), 1) + obj_datafile = found_files['.data'][0] + self.assertEqual("%s#2.data" % put_timestamp.internal, + os.path.basename(obj_datafile)) + # but .durable isn't + self.assertEqual(found_files['.druable'], []) + # And no continer update + self.assertFalse(_container_update.called) - def test_multiphase_put_no_metadata_footer(self): - # Test 2-phase commit conversation, with no metadata footer - # at the end of object data - end of 1st phase marked - # by 100-continue response from the object server, with a - # successful 2nd phase marked by the presence of a .durable - # file along with .data file in the object data directory - # (No metadata footer case) + def test_multiphase_put_client_disconnect_in_the_middle_of_commit(self): + with self._check_multiphase_put_commit_handling() as context: + conn = context['conn'] + # start commit confirmation to start phase2 + commit_confirmation_doc = "\r\n".join(( + "X-Document: put commit", + "", + "commit_confirmation", + "--boundary123--", + )) + # but don't quite the commit body + to_send = "%x\r\n%s" % \ + (len(commit_confirmation_doc), commit_confirmation_doc[:-1]) + conn.send(to_send) + + # and then bail out + conn.sock.fd._sock.close() + + put_timestamp = context['put_timestamp'] + _container_update = context['mock_container_update'] + + # and make sure it demonstrates the client disconnect + log_lines = self.logger.get_lines_for_level('info') + self.assertEqual(len(log_lines), 1) + self.assertIn(' 499 ', log_lines[0]) + + # verify successful object data and durable state file write + found_files = self.find_files() + # .data file is there + self.assertEqual(len(found_files['.data']), 1) + obj_datafile = found_files['.data'][0] + self.assertEqual("%s#2.data" % put_timestamp.internal, + os.path.basename(obj_datafile)) + # but .durable isn't + self.assertEqual(found_files['.druable'], []) + # And no continer update + self.assertFalse(_container_update.called) + + def test_multiphase_put_no_metadata_replicated(self): + test_data = 'obj data' + test_doc = "\r\n".join(( + "--boundary123", + "X-Document: object body", + "", + test_data, + "--boundary123", + )) + + put_timestamp = utils.Timestamp(time()).internal + headers = { + 'Content-Type': 'text/plain', + 'X-Timestamp': put_timestamp, + 'Transfer-Encoding': 'chunked', + 'Expect': '100-continue', + 'X-Backend-Obj-Content-Length': len(test_data), + 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', + 'X-Backend-Obj-Multiphase-Commit': 'yes', + } + with self._check_multiphase_put_commit_handling( + test_doc=test_doc, headers=headers) as context: + expect_headers = context['expect_headers'] + self.assertEqual(expect_headers['X-Obj-Multiphase-Commit'], 'yes') + # N.B. no X-Obj-Metadata-Footer header + self.assertNotIn('X-Obj-Metadata-Footer', expect_headers) + + conn = context['conn'] + # send commit confirmation to start phase2 + commit_confirmation_doc = "\r\n".join(( + "X-Document: put commit", + "", + "commit_confirmation", + "--boundary123--", + )) + to_send = "%x\r\n%s\r\n0\r\n\r\n" % \ + (len(commit_confirmation_doc), commit_confirmation_doc) + conn.send(to_send) + + # verify success (2xx) to make end of phase2 + resp = conn.getresponse() + self.assertEqual(resp.status, 201) + resp.read() + resp.close() + + # verify successful object data and durable state file write + put_timestamp = context['put_timestamp'] + found_files = self.find_files() + # .data file is there + self.assertEqual(len(found_files['.data']), 1) + obj_datafile = found_files['.data'][0] + self.assertEqual("%s.data" % put_timestamp.internal, + os.path.basename(obj_datafile)) + # replicated objects do not have a .durable file + self.assertEqual(found_files['.durable'], []) + # And continer update was called + self.assertTrue(context['mock_container_update'].called) + + def test_multiphase_put_metadata_footer(self): + with self._check_multiphase_put_commit_handling() as context: + expect_headers = context['expect_headers'] + self.assertEqual(expect_headers['X-Obj-Multiphase-Commit'], 'yes') + self.assertEqual(expect_headers['X-Obj-Metadata-Footer'], 'yes') + + conn = context['conn'] + # send commit confirmation to start phase2 + commit_confirmation_doc = "\r\n".join(( + "X-Document: put commit", + "", + "commit_confirmation", + "--boundary123--", + )) + to_send = "%x\r\n%s\r\n0\r\n\r\n" % \ + (len(commit_confirmation_doc), commit_confirmation_doc) + conn.send(to_send) + + # verify success (2xx) to make end of phase2 + resp = conn.getresponse() + self.assertEqual(resp.status, 201) + resp.read() + resp.close() + + # verify successful object data and durable state file write + put_timestamp = context['put_timestamp'] + found_files = self.find_files() + # .data file is there + self.assertEqual(len(found_files['.data']), 1) + obj_datafile = found_files['.data'][0] + self.assertEqual("%s#2.data" % put_timestamp.internal, + os.path.basename(obj_datafile)) + # .durable file is there + self.assertEqual(len(found_files['.durable']), 1) + durable_file = found_files['.durable'][0] + self.assertEqual("%s.durable" % put_timestamp.internal, + os.path.basename(durable_file)) + # And continer update was called + self.assertTrue(context['mock_container_update'].called) + + def test_multiphase_put_ec_fragment_in_headers_no_footers(self): test_data = 'obj data' test_doc = "\r\n".join(( "--boundary123", @@ -5427,210 +5648,130 @@ class TestObjectServer(unittest.TestCase): 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', 'X-Backend-Obj-Multiphase-Commit': 'yes', } - conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0', - 'PUT', '/a/c/o', headers=headers) - resp = conn.getexpect() - self.assertEqual(resp.status, 100) - headers = HeaderKeyDict(resp.getheaders()) - self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes') + with self._check_multiphase_put_commit_handling( + test_doc=test_doc, headers=headers) as context: + expect_headers = context['expect_headers'] + self.assertEqual(expect_headers['X-Obj-Multiphase-Commit'], 'yes') + # N.B. no X-Obj-Metadata-Footer header + self.assertNotIn('X-Obj-Metadata-Footer', expect_headers) - to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc) - conn.send(to_send) - # verify 100-continue response to mark end of phase1 - resp = conn.getexpect() - self.assertEqual(resp.status, 100) + conn = context['conn'] + # send commit confirmation to start phase2 + commit_confirmation_doc = "\r\n".join(( + "X-Document: put commit", + "", + "commit_confirmation", + "--boundary123--", + )) + to_send = "%x\r\n%s\r\n0\r\n\r\n" % \ + (len(commit_confirmation_doc), commit_confirmation_doc) + conn.send(to_send) - # send commit confirmation to start phase2 - commit_confirmation_doc = "\r\n".join(( - "X-Document: put commit", - "", - "commit_confirmation", - "--boundary123--", - )) - to_send = "%x\r\n%s\r\n0\r\n\r\n" % \ - (len(commit_confirmation_doc), commit_confirmation_doc) - conn.send(to_send) - - # verify success (2xx) to make end of phase2 - resp = conn.getresponse() - self.assertEqual(resp.status, 201) - resp.read() - resp.close() + # verify success (2xx) to make end of phase2 + resp = conn.getresponse() + self.assertEqual(resp.status, 201) + resp.read() + resp.close() # verify successful object data and durable state file write - obj_basename = os.path.join( - self.devices, 'sda1', - storage_directory(diskfile.get_data_dir(POLICIES[1]), '0', - hash_path('a', 'c', 'o')), - put_timestamp) - obj_datafile = obj_basename + '#2.data' - self.assertTrue(os.path.isfile(obj_datafile)) - obj_durablefile = obj_basename + '.durable' - self.assertTrue(os.path.isfile(obj_durablefile)) - - def test_multiphase_put_draining(self): - # We want to ensure that we read the whole response body even if - # it's multipart MIME and there's document parts that we don't - # expect or understand. This'll help save our bacon if we ever jam - # more stuff in there. - in_a_timeout = [False] - - # inherit from BaseException so we get a stack trace when the test - # fails instead of just a 500 - class NotInATimeout(BaseException): - pass - - class FakeTimeout(BaseException): - def __enter__(self): - in_a_timeout[0] = True - - def __exit__(self, typ, value, tb): - in_a_timeout[0] = False - - class PickyWsgiBytesIO(WsgiBytesIO): - def read(self, *a, **kw): - if not in_a_timeout[0]: - raise NotInATimeout() - return WsgiBytesIO.read(self, *a, **kw) - - def readline(self, *a, **kw): - if not in_a_timeout[0]: - raise NotInATimeout() - return WsgiBytesIO.readline(self, *a, **kw) - - test_data = 'obj data' - footer_meta = { - "X-Object-Sysmeta-Ec-Frag-Index": "7", - "Etag": md5(test_data).hexdigest(), - } - footer_json = json.dumps(footer_meta) - footer_meta_cksum = md5(footer_json).hexdigest() - test_doc = "\r\n".join(( - "--boundary123", - "X-Document: object body", - "", - test_data, - "--boundary123", - "X-Document: object metadata", - "Content-MD5: " + footer_meta_cksum, - "", - footer_json, - "--boundary123", - "X-Document: we got cleverer", - "", - "stuff stuff meaningless stuuuuuuuuuuff", - "--boundary123", - "X-Document: we got even cleverer; can you believe it?", - "Waneshaft: ambifacient lunar", - "Casing: malleable logarithmic", - "", - "potato potato potato potato potato potato potato", - "--boundary123--" - )) - if six.PY3: - test_doc = test_doc.encode('utf-8') - - # phase1 - PUT request with object metadata in footer and - # multiphase commit conversation - put_timestamp = utils.Timestamp(time()).internal - headers = { - 'Content-Type': 'text/plain', - 'X-Timestamp': put_timestamp, - 'Transfer-Encoding': 'chunked', - 'Expect': '100-continue', - 'X-Backend-Storage-Policy-Index': '1', - 'X-Backend-Obj-Content-Length': len(test_data), - 'X-Backend-Obj-Metadata-Footer': 'yes', - 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', - } - wsgi_input = PickyWsgiBytesIO(test_doc) - req = Request.blank( - "/sda1/0/a/c/o", - environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': wsgi_input}, - headers=headers) - - app = object_server.ObjectController(self.conf, logger=self.logger) - with mock.patch('swift.obj.server.ChunkReadTimeout', FakeTimeout): - resp = req.get_response(app) - self.assertEqual(resp.status_int, 201) # sanity check - - in_a_timeout[0] = True # so we can check without an exception - self.assertEqual(wsgi_input.read(), '') # we read all the bytes + put_timestamp = context['put_timestamp'] + found_files = self.find_files() + # .data file is there + self.assertEqual(len(found_files['.data']), 1) + obj_datafile = found_files['.data'][0] + self.assertEqual("%s#2.data" % put_timestamp.internal, + os.path.basename(obj_datafile)) + # .durable file is there + self.assertEqual(len(found_files['.durable']), 1) + durable_file = found_files['.durable'][0] + self.assertEqual("%s.durable" % put_timestamp.internal, + os.path.basename(durable_file)) + # And continer update was called + self.assertTrue(context['mock_container_update'].called) def test_multiphase_put_bad_commit_message(self): - # Test 2-phase commit conversation - end of 1st phase marked - # by 100-continue response from the object server, with 2nd - # phase commit confirmation being received corrupt - test_data = 'obj data' - footer_meta = { - "X-Object-Sysmeta-Ec-Frag-Index": "7", - "Etag": md5(test_data).hexdigest(), - } - footer_json = json.dumps(footer_meta) - footer_meta_cksum = md5(footer_json).hexdigest() - test_doc = "\r\n".join(( - "--boundary123", - "X-Document: object body", - "", - test_data, - "--boundary123", - "X-Document: object metadata", - "Content-MD5: " + footer_meta_cksum, - "", - footer_json, - "--boundary123", - )) - - # phase1 - PUT request with object metadata in footer and - # multiphase commit conversation - put_timestamp = utils.Timestamp(time()).internal - headers = { - 'Content-Type': 'text/plain', - 'X-Timestamp': put_timestamp, - 'Transfer-Encoding': 'chunked', - 'Expect': '100-continue', - 'X-Backend-Storage-Policy-Index': '1', - 'X-Backend-Obj-Content-Length': len(test_data), - 'X-Backend-Obj-Metadata-Footer': 'yes', - 'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123', - 'X-Backend-Obj-Multiphase-Commit': 'yes', - } - conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0', - 'PUT', '/a/c/o', headers=headers) - resp = conn.getexpect() - self.assertEqual(resp.status, 100) - headers = HeaderKeyDict(resp.getheaders()) - self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes') - self.assertEqual(headers['X-Obj-Metadata-Footer'], 'yes') - - to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc) - conn.send(to_send) - # verify 100-continue response to mark end of phase1 - resp = conn.getexpect() - self.assertEqual(resp.status, 100) - - # send commit confirmation to start phase2 - commit_confirmation_doc = "\r\n".join(( - "junkjunk", - "--boundary123--", - )) - to_send = "%x\r\n%s\r\n0\r\n\r\n" % \ - (len(commit_confirmation_doc), commit_confirmation_doc) - conn.send(to_send) - resp = conn.getresponse() - self.assertEqual(resp.status, 500) - resp.read() - resp.close() + with self._check_multiphase_put_commit_handling() as context: + conn = context['conn'] + # send commit confirmation to start phase2 + commit_confirmation_doc = "\r\n".join(( + "junkjunk", + "--boundary123--", + )) + to_send = "%x\r\n%s\r\n0\r\n\r\n" % \ + (len(commit_confirmation_doc), commit_confirmation_doc) + conn.send(to_send) + resp = conn.getresponse() + self.assertEqual(resp.status, 500) + resp.read() + resp.close() + put_timestamp = context['put_timestamp'] + _container_update = context['mock_container_update'] # verify that durable file was NOT created - obj_basename = os.path.join( - self.devices, 'sda1', - storage_directory(diskfile.get_data_dir(1), '0', - hash_path('a', 'c', 'o')), - put_timestamp) - obj_datafile = obj_basename + '#7.data' - self.assertTrue(os.path.isfile(obj_datafile)) - obj_durablefile = obj_basename + '.durable' - self.assertFalse(os.path.isfile(obj_durablefile)) + found_files = self.find_files() + # .data file is there + self.assertEqual(len(found_files['.data']), 1) + obj_datafile = found_files['.data'][0] + self.assertEqual("%s#2.data" % put_timestamp.internal, + os.path.basename(obj_datafile)) + # but .durable isn't + self.assertEqual(found_files['.druable'], []) + # And no continer update + self.assertFalse(_container_update.called) + + def test_multiphase_put_drains_extra_commit_junk(self): + with self._check_multiphase_put_commit_handling() as context: + conn = context['conn'] + # send commit confirmation to start phase2 + commit_confirmation_doc = "\r\n".join(( + "X-Document: put commit", + "", + "commit_confirmation", + "--boundary123", + "X-Document: we got cleverer", + "", + "stuff stuff meaningless stuuuuuuuuuuff", + "--boundary123", + "X-Document: we got even cleverer; can you believe it?", + "Waneshaft: ambifacient lunar", + "Casing: malleable logarithmic", + "", + "potato potato potato potato potato potato potato", + "--boundary123--", + )) + to_send = "%x\r\n%s\r\n0\r\n\r\n" % \ + (len(commit_confirmation_doc), commit_confirmation_doc) + conn.send(to_send) + + # verify success (2xx) to make end of phase2 + resp = conn.getresponse() + self.assertEqual(resp.status, 201) + resp.read() + + # make another request to validate the HTTP protocol state + conn.putrequest('GET', '/sda1/0/a/c/o') + conn.putheader('X-Backend-Storage-Policy-Index', '1') + conn.endheaders() + resp = conn.getresponse() + self.assertEqual(resp.status, 200) + resp.read() + + resp.close() + + # verify successful object data and durable state file write + put_timestamp = context['put_timestamp'] + found_files = self.find_files() + # .data file is there + self.assertEqual(len(found_files['.data']), 1) + obj_datafile = found_files['.data'][0] + self.assertEqual("%s#2.data" % put_timestamp.internal, + os.path.basename(obj_datafile)) + # .durable file is there + self.assertEqual(len(found_files['.durable']), 1) + durable_file = found_files['.durable'][0] + self.assertEqual("%s.durable" % put_timestamp.internal, + os.path.basename(durable_file)) + # And continer update was called + self.assertTrue(context['mock_container_update'].called) @patch_policies