Merge "Fix client disconnect during multhiphase commit"
This commit is contained in:
commit
ce8f503ead
@ -117,6 +117,10 @@ class PathNotDir(OSError):
|
||||
pass
|
||||
|
||||
|
||||
class ChunkReadError(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class ChunkReadTimeout(Timeout):
|
||||
pass
|
||||
|
||||
|
@ -3356,7 +3356,10 @@ class _MultipartMimeFileLikeObject(object):
|
||||
return ''
|
||||
boundary_pos = newline_pos = -1
|
||||
while newline_pos < 0 and boundary_pos < 0:
|
||||
chunk = self.wsgi_input.read(self.read_chunk_size)
|
||||
try:
|
||||
chunk = self.wsgi_input.read(self.read_chunk_size)
|
||||
except (IOError, ValueError) as e:
|
||||
raise swift.common.exceptions.ChunkReadError(str(e))
|
||||
self.input_buffer += chunk
|
||||
newline_pos = self.input_buffer.find('\r\n')
|
||||
boundary_pos = self.input_buffer.find(self.boundary)
|
||||
|
@ -40,7 +40,7 @@ from swift.common.constraints import check_object_creation, \
|
||||
from swift.common.exceptions import ConnectionTimeout, DiskFileQuarantined, \
|
||||
DiskFileNotExist, DiskFileCollision, DiskFileNoSpace, DiskFileDeleted, \
|
||||
DiskFileDeviceUnavailable, DiskFileExpired, ChunkReadTimeout, \
|
||||
DiskFileXattrNotSupported
|
||||
ChunkReadError, DiskFileXattrNotSupported
|
||||
from swift.obj import ssync_receiver
|
||||
from swift.common.http import is_success
|
||||
from swift.common.base_storage_server import BaseStorageServer
|
||||
@ -405,7 +405,7 @@ class ObjectController(BaseStorageServer):
|
||||
if commit_hdrs.get('X-Document', None) == "put commit":
|
||||
rcvd_commit = True
|
||||
drain(commit_iter, self.network_chunk_size, self.client_timeout)
|
||||
except ChunkReadTimeout:
|
||||
except (ChunkReadTimeout, ChunkReadError):
|
||||
raise HTTPClientDisconnect()
|
||||
except StopIteration:
|
||||
raise HTTPBadRequest(body="couldn't find PUT commit MIME doc")
|
||||
|
@ -34,6 +34,7 @@ from tempfile import mkdtemp
|
||||
from hashlib import md5
|
||||
import itertools
|
||||
import tempfile
|
||||
from collections import defaultdict
|
||||
from contextlib import contextmanager
|
||||
|
||||
from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool, greenthread
|
||||
@ -5162,6 +5163,95 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.headers['X-Timestamp'], put_timestamp)
|
||||
|
||||
def test_multiphase_put_draining(self):
|
||||
# We want to ensure that we read the whole response body even if
|
||||
# it's multipart MIME and there's document parts that we don't
|
||||
# expect or understand. This'll help save our bacon if we ever jam
|
||||
# more stuff in there.
|
||||
in_a_timeout = [False]
|
||||
|
||||
# inherit from BaseException so we get a stack trace when the test
|
||||
# fails instead of just a 500
|
||||
class NotInATimeout(BaseException):
|
||||
pass
|
||||
|
||||
class FakeTimeout(BaseException):
|
||||
def __enter__(self):
|
||||
in_a_timeout[0] = True
|
||||
|
||||
def __exit__(self, typ, value, tb):
|
||||
in_a_timeout[0] = False
|
||||
|
||||
class PickyWsgiBytesIO(WsgiBytesIO):
|
||||
def read(self, *a, **kw):
|
||||
if not in_a_timeout[0]:
|
||||
raise NotInATimeout()
|
||||
return WsgiBytesIO.read(self, *a, **kw)
|
||||
|
||||
def readline(self, *a, **kw):
|
||||
if not in_a_timeout[0]:
|
||||
raise NotInATimeout()
|
||||
return WsgiBytesIO.readline(self, *a, **kw)
|
||||
|
||||
test_data = 'obj data'
|
||||
footer_meta = {
|
||||
"X-Object-Sysmeta-Ec-Frag-Index": "7",
|
||||
"Etag": md5(test_data).hexdigest(),
|
||||
}
|
||||
footer_json = json.dumps(footer_meta)
|
||||
footer_meta_cksum = md5(footer_json).hexdigest()
|
||||
test_doc = "\r\n".join((
|
||||
"--boundary123",
|
||||
"X-Document: object body",
|
||||
"",
|
||||
test_data,
|
||||
"--boundary123",
|
||||
"X-Document: object metadata",
|
||||
"Content-MD5: " + footer_meta_cksum,
|
||||
"",
|
||||
footer_json,
|
||||
"--boundary123",
|
||||
"X-Document: we got cleverer",
|
||||
"",
|
||||
"stuff stuff meaningless stuuuuuuuuuuff",
|
||||
"--boundary123",
|
||||
"X-Document: we got even cleverer; can you believe it?",
|
||||
"Waneshaft: ambifacient lunar",
|
||||
"Casing: malleable logarithmic",
|
||||
"",
|
||||
"potato potato potato potato potato potato potato",
|
||||
"--boundary123--"
|
||||
))
|
||||
if six.PY3:
|
||||
test_doc = test_doc.encode('utf-8')
|
||||
|
||||
# phase1 - PUT request with object metadata in footer and
|
||||
# multiphase commit conversation
|
||||
put_timestamp = utils.Timestamp(time()).internal
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': put_timestamp,
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Expect': '100-continue',
|
||||
'X-Backend-Storage-Policy-Index': '1',
|
||||
'X-Backend-Obj-Content-Length': len(test_data),
|
||||
'X-Backend-Obj-Metadata-Footer': 'yes',
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
}
|
||||
wsgi_input = PickyWsgiBytesIO(test_doc)
|
||||
req = Request.blank(
|
||||
"/sda1/0/a/c/o",
|
||||
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': wsgi_input},
|
||||
headers=headers)
|
||||
|
||||
app = object_server.ObjectController(self.conf, logger=self.logger)
|
||||
with mock.patch('swift.obj.server.ChunkReadTimeout', FakeTimeout):
|
||||
resp = req.get_response(app)
|
||||
self.assertEqual(resp.status_int, 201) # sanity check
|
||||
|
||||
in_a_timeout[0] = True # so we can check without an exception
|
||||
self.assertEqual(wsgi_input.read(), '') # we read all the bytes
|
||||
|
||||
|
||||
@patch_policies(test_policies)
|
||||
class TestObjectServer(unittest.TestCase):
|
||||
@ -5277,46 +5367,33 @@ class TestObjectServer(unittest.TestCase):
|
||||
resp.read()
|
||||
resp.close()
|
||||
|
||||
def test_expect_on_multiphase_put(self):
|
||||
test_data = 'obj data'
|
||||
test_doc = "\r\n".join((
|
||||
"--boundary123",
|
||||
"X-Document: object body",
|
||||
"",
|
||||
test_data,
|
||||
"--boundary123",
|
||||
))
|
||||
def find_files(self):
|
||||
found_files = defaultdict(list)
|
||||
for root, dirs, files in os.walk(self.devices):
|
||||
for filename in files:
|
||||
_name, ext = os.path.splitext(filename)
|
||||
file_path = os.path.join(root, filename)
|
||||
found_files[ext].append(file_path)
|
||||
return found_files
|
||||
|
||||
put_timestamp = utils.Timestamp(time()).internal
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': put_timestamp,
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Expect': '100-continue',
|
||||
'X-Backend-Obj-Content-Length': len(test_data),
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
'X-Backend-Obj-Multiphase-Commit': 'yes',
|
||||
}
|
||||
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
|
||||
'PUT', '/a/c/o', headers=headers)
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
headers = HeaderKeyDict(resp.getheaders())
|
||||
self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
@contextmanager
|
||||
def _check_multiphase_put_commit_handling(self, test_doc=None,
|
||||
headers=None):
|
||||
"""
|
||||
This helper will setup a multiphase chunked PUT request and yield at
|
||||
the context at the commit phase (after getting the second expect-100
|
||||
continue response.
|
||||
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
|
||||
conn.send(to_send)
|
||||
It can setup a resonable stub request, but you can over-ride some
|
||||
characteristics of the request via kwargs.
|
||||
|
||||
# verify 100-continue response to mark end of phase1
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
resp.close()
|
||||
|
||||
def test_multiphase_put_metadata_footer(self):
|
||||
# Test 2-phase commit conversation - end of 1st phase marked
|
||||
# by 100-continue response from the object server, with a
|
||||
# successful 2nd phase marked by the presence of a .durable
|
||||
# file along with .data file in the object data directory
|
||||
:param test_doc: first part of the mime conversation before the object
|
||||
server will send the 100-continue, this includes the
|
||||
object body
|
||||
:param headers: headers to send along with the initial request; some
|
||||
object-metadata (e.g. X-Backend-Obj-Content-Length)
|
||||
is generally expected tomatch the test_doc)
|
||||
"""
|
||||
test_data = 'obj data'
|
||||
footer_meta = {
|
||||
"X-Object-Sysmeta-Ec-Frag-Index": "2",
|
||||
@ -5324,7 +5401,7 @@ class TestObjectServer(unittest.TestCase):
|
||||
}
|
||||
footer_json = json.dumps(footer_meta)
|
||||
footer_meta_cksum = md5(footer_json).hexdigest()
|
||||
test_doc = "\r\n".join((
|
||||
test_doc = test_doc or "\r\n".join((
|
||||
"--boundary123",
|
||||
"X-Document: object body",
|
||||
"",
|
||||
@ -5339,10 +5416,9 @@ class TestObjectServer(unittest.TestCase):
|
||||
|
||||
# phase1 - PUT request with object metadata in footer and
|
||||
# multiphase commit conversation
|
||||
put_timestamp = utils.Timestamp(time()).internal
|
||||
headers = {
|
||||
put_timestamp = utils.Timestamp(time())
|
||||
headers = headers or {
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': put_timestamp,
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Expect': '100-continue',
|
||||
'X-Backend-Storage-Policy-Index': '1',
|
||||
@ -5351,55 +5427,200 @@ class TestObjectServer(unittest.TestCase):
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
'X-Backend-Obj-Multiphase-Commit': 'yes',
|
||||
}
|
||||
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
|
||||
'PUT', '/a/c/o', headers=headers)
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
headers = HeaderKeyDict(resp.getheaders())
|
||||
self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
self.assertEqual(headers['X-Obj-Metadata-Footer'], 'yes')
|
||||
put_timestamp = utils.Timestamp(headers.setdefault(
|
||||
'X-Timestamp', utils.Timestamp(time()).internal))
|
||||
container_update = \
|
||||
'swift.obj.server.ObjectController.container_update'
|
||||
with mock.patch(container_update) as _container_update:
|
||||
conn = bufferedhttp.http_connect(
|
||||
'127.0.0.1', self.port, 'sda1', '0',
|
||||
'PUT', '/a/c/o', headers=headers)
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
expect_headers = HeaderKeyDict(resp.getheaders())
|
||||
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
|
||||
conn.send(to_send)
|
||||
# verify 100-continue response to mark end of phase1
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
|
||||
conn.send(to_send)
|
||||
# verify 100-continue response to mark end of phase1
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
# yield relevant context for test
|
||||
yield {
|
||||
'conn': conn,
|
||||
'expect_headers': expect_headers,
|
||||
'put_timestamp': put_timestamp,
|
||||
'mock_container_update': _container_update,
|
||||
}
|
||||
|
||||
# verify success (2xx) to make end of phase2
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 201)
|
||||
resp.read()
|
||||
resp.close()
|
||||
for i in range(3):
|
||||
# give the object server a few trampolines to recognize request
|
||||
# has finished, or socket has closed or whatever
|
||||
sleep(0)
|
||||
|
||||
def test_multiphase_put_client_disconnect_right_before_commit(self):
|
||||
with self._check_multiphase_put_commit_handling() as context:
|
||||
conn = context['conn']
|
||||
# just bail stright out
|
||||
conn.sock.fd._sock.close()
|
||||
|
||||
put_timestamp = context['put_timestamp']
|
||||
_container_update = context['mock_container_update']
|
||||
|
||||
# and make sure it demonstrates the client disconnect
|
||||
log_lines = self.logger.get_lines_for_level('info')
|
||||
self.assertEqual(len(log_lines), 1)
|
||||
self.assertIn(' 499 ', log_lines[0])
|
||||
|
||||
# verify successful object data and durable state file write
|
||||
obj_basename = os.path.join(
|
||||
self.devices, 'sda1',
|
||||
storage_directory(diskfile.get_data_dir(POLICIES[1]), '0',
|
||||
hash_path('a', 'c', 'o')),
|
||||
put_timestamp)
|
||||
obj_datafile = obj_basename + '#2.data'
|
||||
self.assertTrue(os.path.isfile(obj_datafile))
|
||||
obj_durablefile = obj_basename + '.durable'
|
||||
self.assertTrue(os.path.isfile(obj_durablefile))
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s#2.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# but .durable isn't
|
||||
self.assertEqual(found_files['.druable'], [])
|
||||
# And no continer update
|
||||
self.assertFalse(_container_update.called)
|
||||
|
||||
def test_multiphase_put_no_metadata_footer(self):
|
||||
# Test 2-phase commit conversation, with no metadata footer
|
||||
# at the end of object data - end of 1st phase marked
|
||||
# by 100-continue response from the object server, with a
|
||||
# successful 2nd phase marked by the presence of a .durable
|
||||
# file along with .data file in the object data directory
|
||||
# (No metadata footer case)
|
||||
def test_multiphase_put_client_disconnect_in_the_middle_of_commit(self):
|
||||
with self._check_multiphase_put_commit_handling() as context:
|
||||
conn = context['conn']
|
||||
# start commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123--",
|
||||
))
|
||||
# but don't quite the commit body
|
||||
to_send = "%x\r\n%s" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc[:-1])
|
||||
conn.send(to_send)
|
||||
|
||||
# and then bail out
|
||||
conn.sock.fd._sock.close()
|
||||
|
||||
put_timestamp = context['put_timestamp']
|
||||
_container_update = context['mock_container_update']
|
||||
|
||||
# and make sure it demonstrates the client disconnect
|
||||
log_lines = self.logger.get_lines_for_level('info')
|
||||
self.assertEqual(len(log_lines), 1)
|
||||
self.assertIn(' 499 ', log_lines[0])
|
||||
|
||||
# verify successful object data and durable state file write
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s#2.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# but .durable isn't
|
||||
self.assertEqual(found_files['.druable'], [])
|
||||
# And no continer update
|
||||
self.assertFalse(_container_update.called)
|
||||
|
||||
def test_multiphase_put_no_metadata_replicated(self):
|
||||
test_data = 'obj data'
|
||||
test_doc = "\r\n".join((
|
||||
"--boundary123",
|
||||
"X-Document: object body",
|
||||
"",
|
||||
test_data,
|
||||
"--boundary123",
|
||||
))
|
||||
|
||||
put_timestamp = utils.Timestamp(time()).internal
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': put_timestamp,
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Expect': '100-continue',
|
||||
'X-Backend-Obj-Content-Length': len(test_data),
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
'X-Backend-Obj-Multiphase-Commit': 'yes',
|
||||
}
|
||||
with self._check_multiphase_put_commit_handling(
|
||||
test_doc=test_doc, headers=headers) as context:
|
||||
expect_headers = context['expect_headers']
|
||||
self.assertEqual(expect_headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
# N.B. no X-Obj-Metadata-Footer header
|
||||
self.assertNotIn('X-Obj-Metadata-Footer', expect_headers)
|
||||
|
||||
conn = context['conn']
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
|
||||
# verify success (2xx) to make end of phase2
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 201)
|
||||
resp.read()
|
||||
resp.close()
|
||||
|
||||
# verify successful object data and durable state file write
|
||||
put_timestamp = context['put_timestamp']
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# replicated objects do not have a .durable file
|
||||
self.assertEqual(found_files['.durable'], [])
|
||||
# And continer update was called
|
||||
self.assertTrue(context['mock_container_update'].called)
|
||||
|
||||
def test_multiphase_put_metadata_footer(self):
|
||||
with self._check_multiphase_put_commit_handling() as context:
|
||||
expect_headers = context['expect_headers']
|
||||
self.assertEqual(expect_headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
self.assertEqual(expect_headers['X-Obj-Metadata-Footer'], 'yes')
|
||||
|
||||
conn = context['conn']
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
|
||||
# verify success (2xx) to make end of phase2
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 201)
|
||||
resp.read()
|
||||
resp.close()
|
||||
|
||||
# verify successful object data and durable state file write
|
||||
put_timestamp = context['put_timestamp']
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s#2.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# .durable file is there
|
||||
self.assertEqual(len(found_files['.durable']), 1)
|
||||
durable_file = found_files['.durable'][0]
|
||||
self.assertEqual("%s.durable" % put_timestamp.internal,
|
||||
os.path.basename(durable_file))
|
||||
# And continer update was called
|
||||
self.assertTrue(context['mock_container_update'].called)
|
||||
|
||||
def test_multiphase_put_ec_fragment_in_headers_no_footers(self):
|
||||
test_data = 'obj data'
|
||||
test_doc = "\r\n".join((
|
||||
"--boundary123",
|
||||
@ -5427,210 +5648,130 @@ class TestObjectServer(unittest.TestCase):
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
'X-Backend-Obj-Multiphase-Commit': 'yes',
|
||||
}
|
||||
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
|
||||
'PUT', '/a/c/o', headers=headers)
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
headers = HeaderKeyDict(resp.getheaders())
|
||||
self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
with self._check_multiphase_put_commit_handling(
|
||||
test_doc=test_doc, headers=headers) as context:
|
||||
expect_headers = context['expect_headers']
|
||||
self.assertEqual(expect_headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
# N.B. no X-Obj-Metadata-Footer header
|
||||
self.assertNotIn('X-Obj-Metadata-Footer', expect_headers)
|
||||
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
|
||||
conn.send(to_send)
|
||||
# verify 100-continue response to mark end of phase1
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
conn = context['conn']
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
|
||||
# verify success (2xx) to make end of phase2
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 201)
|
||||
resp.read()
|
||||
resp.close()
|
||||
# verify success (2xx) to make end of phase2
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 201)
|
||||
resp.read()
|
||||
resp.close()
|
||||
|
||||
# verify successful object data and durable state file write
|
||||
obj_basename = os.path.join(
|
||||
self.devices, 'sda1',
|
||||
storage_directory(diskfile.get_data_dir(POLICIES[1]), '0',
|
||||
hash_path('a', 'c', 'o')),
|
||||
put_timestamp)
|
||||
obj_datafile = obj_basename + '#2.data'
|
||||
self.assertTrue(os.path.isfile(obj_datafile))
|
||||
obj_durablefile = obj_basename + '.durable'
|
||||
self.assertTrue(os.path.isfile(obj_durablefile))
|
||||
|
||||
def test_multiphase_put_draining(self):
|
||||
# We want to ensure that we read the whole response body even if
|
||||
# it's multipart MIME and there's document parts that we don't
|
||||
# expect or understand. This'll help save our bacon if we ever jam
|
||||
# more stuff in there.
|
||||
in_a_timeout = [False]
|
||||
|
||||
# inherit from BaseException so we get a stack trace when the test
|
||||
# fails instead of just a 500
|
||||
class NotInATimeout(BaseException):
|
||||
pass
|
||||
|
||||
class FakeTimeout(BaseException):
|
||||
def __enter__(self):
|
||||
in_a_timeout[0] = True
|
||||
|
||||
def __exit__(self, typ, value, tb):
|
||||
in_a_timeout[0] = False
|
||||
|
||||
class PickyWsgiBytesIO(WsgiBytesIO):
|
||||
def read(self, *a, **kw):
|
||||
if not in_a_timeout[0]:
|
||||
raise NotInATimeout()
|
||||
return WsgiBytesIO.read(self, *a, **kw)
|
||||
|
||||
def readline(self, *a, **kw):
|
||||
if not in_a_timeout[0]:
|
||||
raise NotInATimeout()
|
||||
return WsgiBytesIO.readline(self, *a, **kw)
|
||||
|
||||
test_data = 'obj data'
|
||||
footer_meta = {
|
||||
"X-Object-Sysmeta-Ec-Frag-Index": "7",
|
||||
"Etag": md5(test_data).hexdigest(),
|
||||
}
|
||||
footer_json = json.dumps(footer_meta)
|
||||
footer_meta_cksum = md5(footer_json).hexdigest()
|
||||
test_doc = "\r\n".join((
|
||||
"--boundary123",
|
||||
"X-Document: object body",
|
||||
"",
|
||||
test_data,
|
||||
"--boundary123",
|
||||
"X-Document: object metadata",
|
||||
"Content-MD5: " + footer_meta_cksum,
|
||||
"",
|
||||
footer_json,
|
||||
"--boundary123",
|
||||
"X-Document: we got cleverer",
|
||||
"",
|
||||
"stuff stuff meaningless stuuuuuuuuuuff",
|
||||
"--boundary123",
|
||||
"X-Document: we got even cleverer; can you believe it?",
|
||||
"Waneshaft: ambifacient lunar",
|
||||
"Casing: malleable logarithmic",
|
||||
"",
|
||||
"potato potato potato potato potato potato potato",
|
||||
"--boundary123--"
|
||||
))
|
||||
if six.PY3:
|
||||
test_doc = test_doc.encode('utf-8')
|
||||
|
||||
# phase1 - PUT request with object metadata in footer and
|
||||
# multiphase commit conversation
|
||||
put_timestamp = utils.Timestamp(time()).internal
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': put_timestamp,
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Expect': '100-continue',
|
||||
'X-Backend-Storage-Policy-Index': '1',
|
||||
'X-Backend-Obj-Content-Length': len(test_data),
|
||||
'X-Backend-Obj-Metadata-Footer': 'yes',
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
}
|
||||
wsgi_input = PickyWsgiBytesIO(test_doc)
|
||||
req = Request.blank(
|
||||
"/sda1/0/a/c/o",
|
||||
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': wsgi_input},
|
||||
headers=headers)
|
||||
|
||||
app = object_server.ObjectController(self.conf, logger=self.logger)
|
||||
with mock.patch('swift.obj.server.ChunkReadTimeout', FakeTimeout):
|
||||
resp = req.get_response(app)
|
||||
self.assertEqual(resp.status_int, 201) # sanity check
|
||||
|
||||
in_a_timeout[0] = True # so we can check without an exception
|
||||
self.assertEqual(wsgi_input.read(), '') # we read all the bytes
|
||||
put_timestamp = context['put_timestamp']
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s#2.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# .durable file is there
|
||||
self.assertEqual(len(found_files['.durable']), 1)
|
||||
durable_file = found_files['.durable'][0]
|
||||
self.assertEqual("%s.durable" % put_timestamp.internal,
|
||||
os.path.basename(durable_file))
|
||||
# And continer update was called
|
||||
self.assertTrue(context['mock_container_update'].called)
|
||||
|
||||
def test_multiphase_put_bad_commit_message(self):
|
||||
# Test 2-phase commit conversation - end of 1st phase marked
|
||||
# by 100-continue response from the object server, with 2nd
|
||||
# phase commit confirmation being received corrupt
|
||||
test_data = 'obj data'
|
||||
footer_meta = {
|
||||
"X-Object-Sysmeta-Ec-Frag-Index": "7",
|
||||
"Etag": md5(test_data).hexdigest(),
|
||||
}
|
||||
footer_json = json.dumps(footer_meta)
|
||||
footer_meta_cksum = md5(footer_json).hexdigest()
|
||||
test_doc = "\r\n".join((
|
||||
"--boundary123",
|
||||
"X-Document: object body",
|
||||
"",
|
||||
test_data,
|
||||
"--boundary123",
|
||||
"X-Document: object metadata",
|
||||
"Content-MD5: " + footer_meta_cksum,
|
||||
"",
|
||||
footer_json,
|
||||
"--boundary123",
|
||||
))
|
||||
|
||||
# phase1 - PUT request with object metadata in footer and
|
||||
# multiphase commit conversation
|
||||
put_timestamp = utils.Timestamp(time()).internal
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': put_timestamp,
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Expect': '100-continue',
|
||||
'X-Backend-Storage-Policy-Index': '1',
|
||||
'X-Backend-Obj-Content-Length': len(test_data),
|
||||
'X-Backend-Obj-Metadata-Footer': 'yes',
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
'X-Backend-Obj-Multiphase-Commit': 'yes',
|
||||
}
|
||||
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
|
||||
'PUT', '/a/c/o', headers=headers)
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
headers = HeaderKeyDict(resp.getheaders())
|
||||
self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
self.assertEqual(headers['X-Obj-Metadata-Footer'], 'yes')
|
||||
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
|
||||
conn.send(to_send)
|
||||
# verify 100-continue response to mark end of phase1
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"junkjunk",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 500)
|
||||
resp.read()
|
||||
resp.close()
|
||||
with self._check_multiphase_put_commit_handling() as context:
|
||||
conn = context['conn']
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"junkjunk",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 500)
|
||||
resp.read()
|
||||
resp.close()
|
||||
put_timestamp = context['put_timestamp']
|
||||
_container_update = context['mock_container_update']
|
||||
# verify that durable file was NOT created
|
||||
obj_basename = os.path.join(
|
||||
self.devices, 'sda1',
|
||||
storage_directory(diskfile.get_data_dir(1), '0',
|
||||
hash_path('a', 'c', 'o')),
|
||||
put_timestamp)
|
||||
obj_datafile = obj_basename + '#7.data'
|
||||
self.assertTrue(os.path.isfile(obj_datafile))
|
||||
obj_durablefile = obj_basename + '.durable'
|
||||
self.assertFalse(os.path.isfile(obj_durablefile))
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s#2.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# but .durable isn't
|
||||
self.assertEqual(found_files['.druable'], [])
|
||||
# And no continer update
|
||||
self.assertFalse(_container_update.called)
|
||||
|
||||
def test_multiphase_put_drains_extra_commit_junk(self):
|
||||
with self._check_multiphase_put_commit_handling() as context:
|
||||
conn = context['conn']
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123",
|
||||
"X-Document: we got cleverer",
|
||||
"",
|
||||
"stuff stuff meaningless stuuuuuuuuuuff",
|
||||
"--boundary123",
|
||||
"X-Document: we got even cleverer; can you believe it?",
|
||||
"Waneshaft: ambifacient lunar",
|
||||
"Casing: malleable logarithmic",
|
||||
"",
|
||||
"potato potato potato potato potato potato potato",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
|
||||
# verify success (2xx) to make end of phase2
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 201)
|
||||
resp.read()
|
||||
|
||||
# make another request to validate the HTTP protocol state
|
||||
conn.putrequest('GET', '/sda1/0/a/c/o')
|
||||
conn.putheader('X-Backend-Storage-Policy-Index', '1')
|
||||
conn.endheaders()
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 200)
|
||||
resp.read()
|
||||
|
||||
resp.close()
|
||||
|
||||
# verify successful object data and durable state file write
|
||||
put_timestamp = context['put_timestamp']
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s#2.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# .durable file is there
|
||||
self.assertEqual(len(found_files['.durable']), 1)
|
||||
durable_file = found_files['.durable'][0]
|
||||
self.assertEqual("%s.durable" % put_timestamp.internal,
|
||||
os.path.basename(durable_file))
|
||||
# And continer update was called
|
||||
self.assertTrue(context['mock_container_update'].called)
|
||||
|
||||
|
||||
@patch_policies
|
||||
|
Loading…
x
Reference in New Issue
Block a user