Merge "Better error handling for EC PUT path when client goes away"
@@ -3330,7 +3330,10 @@ class _MultipartMimeFileLikeObject(object):
         if len(self.input_buffer) < length + len(self.boundary) + 2:
             to_read = length + len(self.boundary) + 2
             while to_read > 0:
-                chunk = self.wsgi_input.read(to_read)
+                try:
+                    chunk = self.wsgi_input.read(to_read)
+                except (IOError, ValueError) as e:
+                    raise swift.common.exceptions.ChunkReadError(str(e))
                 to_read -= len(chunk)
                 self.input_buffer += chunk
                 if not chunk:
@@ -3400,9 +3403,12 @@ def iter_multipart_mime_documents(wsgi_input, boundary, read_chunk_size=4096):
     """
     boundary = '--' + boundary
     blen = len(boundary) + 2  # \r\n
-    got = wsgi_input.readline(blen)
-    while got == '\r\n':
-        got = wsgi_input.readline(blen)
+    try:
+        got = wsgi_input.readline(blen)
+        while got == '\r\n':
+            got = wsgi_input.readline(blen)
+    except (IOError, ValueError) as e:
+        raise swift.common.exceptions.ChunkReadError(str(e))

     if got.strip() != boundary:
         raise swift.common.exceptions.MimeInvalid(
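
The two hunks above teach the low-level MIME readers to turn socket-level
failures from the WSGI input (IOError/ValueError under eventlet) into a single
ChunkReadError, so callers can tell "client went away" apart from "client is
slow". A minimal sketch of that pattern, assuming only that ChunkReadError is
a plain exception class like the one in swift.common.exceptions:

# Sketch only -- not Swift's actual module layout.
class ChunkReadError(Exception):
    """Assumed shape: raised when the request body can no longer be read."""


def read_some(wsgi_input, amount):
    """Read up to ``amount`` bytes, surfacing disconnects as ChunkReadError."""
    try:
        # eventlet's chunked-input wrapper raises IOError/ValueError when the
        # socket goes away or the chunked framing is cut short
        return wsgi_input.read(amount)
    except (IOError, ValueError) as e:
        raise ChunkReadError(str(e))
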
@@ -405,8 +405,10 @@ class ObjectController(BaseStorageServer):
                 if commit_hdrs.get('X-Document', None) == "put commit":
                     rcvd_commit = True
             drain(commit_iter, self.network_chunk_size, self.client_timeout)
-        except (ChunkReadTimeout, ChunkReadError):
+        except ChunkReadError:
+            raise HTTPClientDisconnect()
+        except ChunkReadTimeout:
             raise HTTPRequestTimeout()
         except StopIteration:
             raise HTTPBadRequest(body="couldn't find PUT commit MIME doc")
         return rcvd_commit
@@ -415,16 +417,20 @@ class ObjectController(BaseStorageServer):
         try:
             with ChunkReadTimeout(self.client_timeout):
                 footer_hdrs, footer_iter = next(mime_documents_iter)
-        except ChunkReadTimeout:
+        except ChunkReadError:
+            raise HTTPClientDisconnect()
+        except ChunkReadTimeout:
             raise HTTPRequestTimeout()
         except StopIteration:
             raise HTTPBadRequest(body="couldn't find footer MIME doc")

         timeout_reader = self._make_timeout_reader(footer_iter)
         try:
             footer_body = ''.join(iter(timeout_reader, ''))
-        except ChunkReadTimeout:
+        except ChunkReadError:
+            raise HTTPClientDisconnect()
+        except ChunkReadTimeout:
             raise HTTPRequestTimeout()

         footer_md5 = footer_hdrs.get('Content-MD5')
         if not footer_md5:
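
The footer metadata read above is only trusted if its body matches the
Content-MD5 header of the footer MIME document; the tests further down compute
that header as md5(footer_json).hexdigest(). A hedged sketch of such a check
(the helper name and failure handling are illustrative, not taken from the
source):

from hashlib import md5


def footer_is_valid(footer_hdrs, footer_body):
    """Return True if the footer body matches its Content-MD5 header."""
    footer_md5 = footer_hdrs.get('Content-MD5')
    return bool(footer_md5) and footer_md5 == md5(footer_body).hexdigest()
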
@@ -609,6 +615,8 @@ class ObjectController(BaseStorageServer):
                     request.environ['wsgi.input'],
                     mime_boundary, self.network_chunk_size)
                 _junk_hdrs, obj_input = next(mime_documents_iter)
+            except ChunkReadError:
+                return HTTPClientDisconnect(request=request)
             except ChunkReadTimeout:
                 return HTTPRequestTimeout(request=request)

@@ -622,6 +630,8 @@ class ObjectController(BaseStorageServer):
                         etag.update(chunk)
                         upload_size = writer.write(chunk)
                         elapsed_time += time.time() - start_time
+                except ChunkReadError:
+                    return HTTPClientDisconnect(request=request)
                 except ChunkReadTimeout:
                     return HTTPRequestTimeout(request=request)
                 if upload_size:
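
The upload loop above pulls chunks through a timeout-wrapped reader
(self._make_timeout_reader, also used for the footer body earlier): a stalled
client raises ChunkReadTimeout, while a vanished client surfaces as
ChunkReadError from the MIME layer, and the new except clauses map those to
408 and 499 respectively. A sketch of that wrapper reshaped as a free
function -- the signature is inferred from the calls in these hunks, so treat
the names as assumptions:

from swift.common.exceptions import ChunkReadTimeout  # an eventlet Timeout


def make_timeout_reader(file_like, chunk_size, timeout):
    """Return a zero-argument reader; each call is bounded by ``timeout``."""
    def timeout_reader():
        with ChunkReadTimeout(timeout):
            return file_like.read(chunk_size)
    return timeout_reader

# usage mirroring the loop above:
#   for chunk in iter(make_timeout_reader(obj_input, 65536, 60), ''):
#       ...
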
@@ -682,8 +692,10 @@ class ObjectController(BaseStorageServer):
                         _junk_hdrs, _junk_body = next(mime_documents_iter)
                     drain(_junk_body, self.network_chunk_size,
                           self.client_timeout)
-            except ChunkReadTimeout:
+            except ChunkReadError:
+                raise HTTPClientDisconnect()
+            except ChunkReadTimeout:
                 raise HTTPRequestTimeout()
             except StopIteration:
                 pass

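
drain() is used here and in the commit hunk above to discard the remainder of
an already-parsed MIME document within the client timeout, so leftover bytes
cannot wedge the connection. A sketch of such a helper, assuming the
(file_like, read_size, timeout) signature visible at the call sites:

from swift.common.exceptions import ChunkReadTimeout


def drain(file_like, read_size, timeout):
    """Read and discard everything from file_like, bounding each read."""
    while True:
        with ChunkReadTimeout(timeout):
            chunk = file_like.read(read_size)
        if not chunk:
            break
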
@@ -5271,9 +5271,10 @@ class TestObjectServer(unittest.TestCase):
             'mount_check': 'false',
         }
         self.logger = debug_logger('test-object-server')
-        app = object_server.ObjectController(self.conf, logger=self.logger)
+        self.app = object_server.ObjectController(
+            self.conf, logger=self.logger)
         sock = listen(('127.0.0.1', 0))
-        self.server = spawn(wsgi.server, sock, app, utils.NullLogger())
+        self.server = spawn(wsgi.server, sock, self.app, utils.NullLogger())
         self.port = sock.getsockname()[1]

     def tearDown(self):
@@ -5367,6 +5368,34 @@ class TestObjectServer(unittest.TestCase):
         resp.read()
         resp.close()

+    def test_expect_on_multiphase_put_diconnect(self):
+        put_timestamp = utils.Timestamp(time()).internal
+        headers = {
+            'Content-Type': 'text/plain',
+            'X-Timestamp': put_timestamp,
+            'Transfer-Encoding': 'chunked',
+            'Expect': '100-continue',
+            'X-Backend-Obj-Content-Length': 0,
+            'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
+            'X-Backend-Obj-Multiphase-Commit': 'yes',
+        }
+        conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
+                                         'PUT', '/a/c/o', headers=headers)
+        resp = conn.getexpect()
+        self.assertEqual(resp.status, 100)
+        headers = HeaderKeyDict(resp.getheaders())
+        self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
+
+        conn.send('c\r\n--boundary123\r\n')
+
+        # disconnect client
+        conn.sock.fd._sock.close()
+        for i in range(2):
+            sleep(0)
+        self.assertFalse(self.logger.get_lines_for_level('error'))
+        for line in self.logger.get_lines_for_level('info'):
+            self.assertIn(' 499 ', line)
+
     def find_files(self):
         found_files = defaultdict(list)
         for root, dirs, files in os.walk(self.devices):
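
The new test, like the ones below, frames its writes by hand using HTTP/1.1
chunked transfer encoding: each send is "<hex length>\r\n<payload>\r\n", and a
final "0\r\n\r\n" would terminate the body (which these disconnect tests
deliberately never send). A tiny illustrative helper, hypothetical and not
part of the test module:

def chunk_frame(payload):
    """Wrap a payload in HTTP/1.1 chunked transfer-encoding framing."""
    return "%x\r\n%s\r\n" % (len(payload), payload)

# e.g. chunk_frame("obj data") == "8\r\nobj data\r\n";
# sending "0\r\n\r\n" afterwards marks the end of the chunked request body.
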
@@ -5377,8 +5406,10 @@ class TestObjectServer(unittest.TestCase):
         return found_files

     @contextmanager
-    def _check_multiphase_put_commit_handling(self, test_doc=None,
-                                              headers=None):
+    def _check_multiphase_put_commit_handling(self,
+                                              test_doc=None,
+                                              headers=None,
+                                              finish_body=True):
         """
         This helper will setup a multiphase chunked PUT request and yield at
         the context at the commit phase (after getting the second expect-100
@@ -5393,6 +5424,8 @@ class TestObjectServer(unittest.TestCase):
         :param headers: headers to send along with the initial request; some
                         object-metadata (e.g. X-Backend-Obj-Content-Length)
                         is generally expected to match the test_doc)
+        :param finish_body: boolean, if true send "0\r\n\r\n" after test_doc
+                            and wait for 100-continue before yielding context
         """
         test_data = 'obj data'
         footer_meta = {
@@ -5439,11 +5472,13 @@ class TestObjectServer(unittest.TestCase):
         self.assertEqual(resp.status, 100)
         expect_headers = HeaderKeyDict(resp.getheaders())

-        to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
+        to_send = "%x\r\n%s\r\n" % (len(test_doc), test_doc)
         conn.send(to_send)
-        # verify 100-continue response to mark end of phase1
-        resp = conn.getexpect()
-        self.assertEqual(resp.status, 100)
+        if finish_body:
+            conn.send("0\r\n\r\n")
+            # verify 100-continue response to mark end of phase1
+            resp = conn.getexpect()
+            self.assertEqual(resp.status, 100)

         # yield relevant context for test
         yield {
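
When finish_body is left True the helper completes phase 1 normally; assembled
from the documents the individual tests construct, a complete multiphase PUT
body looks roughly like this (the footer JSON and its MD5 are placeholders):

# Illustrative only; the boundary and X-Document names are the ones the tests
# use, the values are placeholders.
full_put_body = "\r\n".join((
    "--boundary123",
    "X-Document: object body",
    "",
    "obj data",
    "--boundary123",
    "X-Document: object metadata",
    "Content-MD5: <md5 of the footer JSON>",
    "",
    '{"Etag": "...", "X-Object-Sysmeta-Ec-Frag-Index": "2"}',
    "--boundary123",
    "X-Document: put commit",
    "",
    "commit_confirmation",
    "--boundary123--",
))
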
@@ -5453,10 +5488,9 @@ class TestObjectServer(unittest.TestCase):
             'mock_container_update': _container_update,
         }

-        for i in range(3):
-            # give the object server a few trampolines to recognize request
-            # has finished, or socket has closed or whatever
-            sleep(0)
+        # give the object server a few trampolines to recognize request
+        # has finished, or socket has closed or whatever
+        sleep(0.1)

     def test_multiphase_put_client_disconnect_right_before_commit(self):
         with self._check_multiphase_put_commit_handling() as context:
@@ -5480,7 +5514,7 @@ class TestObjectServer(unittest.TestCase):
             self.assertEqual("%s#2.data" % put_timestamp.internal,
                              os.path.basename(obj_datafile))
             # but .durable isn't
-            self.assertEqual(found_files['.druable'], [])
+            self.assertEqual(found_files['.durable'], [])
             # And no container update
             self.assertFalse(_container_update.called)

@@ -5518,7 +5552,7 @@ class TestObjectServer(unittest.TestCase):
             self.assertEqual("%s#2.data" % put_timestamp.internal,
                              os.path.basename(obj_datafile))
             # but .durable isn't
-            self.assertEqual(found_files['.druable'], [])
+            self.assertEqual(found_files['.durable'], [])
             # And no container update
             self.assertFalse(_container_update.called)

@@ -5620,6 +5654,58 @@ class TestObjectServer(unittest.TestCase):
             # And container update was called
             self.assertTrue(context['mock_container_update'].called)

+    def test_multiphase_put_metadata_footer_disconnect(self):
+        test_data = 'obj data'
+        test_doc = "\r\n".join((
+            "--boundary123",
+            "X-Document: object body",
+            "",
+            test_data,
+            "--boundary123",
+        ))
+        # eventlet.wsgi won't return < network_chunk_size from a chunked read
+        self.app.network_chunk_size = 16
+        with self._check_multiphase_put_commit_handling(
+                test_doc=test_doc, finish_body=False) as context:
+            conn = context['conn']
+
+            # make footer doc
+            footer_meta = {
+                "X-Object-Sysmeta-Ec-Frag-Index": "2",
+                "Etag": md5(test_data).hexdigest(),
+            }
+            footer_json = json.dumps(footer_meta)
+            footer_meta_cksum = md5(footer_json).hexdigest()
+
+            # send most of the footer doc
+            footer_doc = "\r\n".join((
+                "X-Document: object metadata",
+                "Content-MD5: " + footer_meta_cksum,
+                "",
+                footer_json,
+            ))
+
+            # but don't send final boundary or last chunk
+            to_send = "%x\r\n%s\r\n" % \
+                (len(footer_doc), footer_doc)
+            conn.send(to_send)
+
+            # and then bail out
+            conn.sock.fd._sock.close()
+
+            # and make sure it demonstrates the client disconnect
+            log_lines = self.logger.get_lines_for_level('info')
+            self.assertEqual(len(log_lines), 1)
+            self.assertIn(' 499 ', log_lines[0])
+
+            # no artifacts left on disk
+            found_files = self.find_files()
+            self.assertEqual(len(found_files['.data']), 0)
+            self.assertEqual(len(found_files['.durable']), 0)
+            # ... and no container update
+            _container_update = context['mock_container_update']
+            self.assertFalse(_container_update.called)
+
     def test_multiphase_put_ec_fragment_in_headers_no_footers(self):
         test_data = 'obj data'
         test_doc = "\r\n".join((
@@ -5714,7 +5800,7 @@ class TestObjectServer(unittest.TestCase):
             self.assertEqual("%s#2.data" % put_timestamp.internal,
                              os.path.basename(obj_datafile))
             # but .durable isn't
-            self.assertEqual(found_files['.druable'], [])
+            self.assertEqual(found_files['.durable'], [])
             # And no container update
             self.assertFalse(_container_update.called)

@@ -5773,6 +5859,56 @@ class TestObjectServer(unittest.TestCase):
             # And container update was called
             self.assertTrue(context['mock_container_update'].called)

+    def test_multiphase_put_drains_extra_commit_junk_disconnect(self):
+        commit_confirmation_doc = "\r\n".join((
+            "X-Document: put commit",
+            "",
+            "commit_confirmation",
+            "--boundary123",
+            "X-Document: we got cleverer",
+            "",
+            "stuff stuff meaningless stuuuuuuuuuuff",
+            "--boundary123",
+            "X-Document: we got even cleverer; can you believe it?",
+            "Waneshaft: ambifacient lunar",
+            "Casing: malleable logarithmic",
+            "",
+            "potato potato potato potato potato potato potato",
+        ))
+        # eventlet.wsgi won't return < network_chunk_size from a chunked read
+        self.app.network_chunk_size = 16
+        with self._check_multiphase_put_commit_handling() as context:
+            conn = context['conn']
+            # send commit confirmation and some other stuff
+            # but don't send final boundary or last chunk
+            to_send = "%x\r\n%s\r\n" % \
+                (len(commit_confirmation_doc), commit_confirmation_doc)
+            conn.send(to_send)
+
+            # and then bail out
+            conn.sock.fd._sock.close()
+
+            # and make sure it demonstrates the client disconnect
+            log_lines = self.logger.get_lines_for_level('info')
+            self.assertEqual(len(log_lines), 1)
+            self.assertIn(' 499 ', log_lines[0])
+
+            # verify successful object data and durable state file write
+            put_timestamp = context['put_timestamp']
+            found_files = self.find_files()
+            # .data file is there
+            self.assertEqual(len(found_files['.data']), 1)
+            obj_datafile = found_files['.data'][0]
+            self.assertEqual("%s#2.data" % put_timestamp.internal,
+                             os.path.basename(obj_datafile))
+            # ... and .durable is there
+            self.assertEqual(len(found_files['.durable']), 1)
+            durable_file = found_files['.durable'][0]
+            self.assertEqual("%s.durable" % put_timestamp.internal,
+                             os.path.basename(durable_file))
+            # but no container update
+            self.assertFalse(context['mock_container_update'].called)
+

 @patch_policies
 class TestZeroCopy(unittest.TestCase):