Fix client disconnect during multhiphase commit
This patch add a test to figure out the failure case behavior of object-server when the connection from proxy-server disconnected during commit phase. Especially, this patch was made to focus on making sure whether or not contaienr updates occurs in the situation. In the process of working on that test we made the behavior of the object-server when the connection from the proxy-server disconnected during the commit phase - reasonable. We capture the IOError/ValueError's that eventlet.wsgi might barf out really close to the wsgi_input read and translate them to a swift.common.exceptions.ChunkReadError so we can handle them at a higher level in the ObjectController's generic PUT disconnect handling. Since that test went so well, we refactored the other ones to use some common context management and wrote a few more. Co-Author: Clay Gerrard <clay.gerrard@gmail.com> Change-Id: I60c98172e524869b06bdf23fd1c4e1bce7a98f80
This commit is contained in:
parent
9e95613d71
commit
9046676968
@ -117,6 +117,10 @@ class PathNotDir(OSError):
|
||||
pass
|
||||
|
||||
|
||||
class ChunkReadError(SwiftException):
|
||||
pass
|
||||
|
||||
|
||||
class ChunkReadTimeout(Timeout):
|
||||
pass
|
||||
|
||||
|
@ -3356,7 +3356,10 @@ class _MultipartMimeFileLikeObject(object):
|
||||
return ''
|
||||
boundary_pos = newline_pos = -1
|
||||
while newline_pos < 0 and boundary_pos < 0:
|
||||
chunk = self.wsgi_input.read(self.read_chunk_size)
|
||||
try:
|
||||
chunk = self.wsgi_input.read(self.read_chunk_size)
|
||||
except (IOError, ValueError) as e:
|
||||
raise swift.common.exceptions.ChunkReadError(str(e))
|
||||
self.input_buffer += chunk
|
||||
newline_pos = self.input_buffer.find('\r\n')
|
||||
boundary_pos = self.input_buffer.find(self.boundary)
|
||||
|
@ -40,7 +40,7 @@ from swift.common.constraints import check_object_creation, \
|
||||
from swift.common.exceptions import ConnectionTimeout, DiskFileQuarantined, \
|
||||
DiskFileNotExist, DiskFileCollision, DiskFileNoSpace, DiskFileDeleted, \
|
||||
DiskFileDeviceUnavailable, DiskFileExpired, ChunkReadTimeout, \
|
||||
DiskFileXattrNotSupported
|
||||
ChunkReadError, DiskFileXattrNotSupported
|
||||
from swift.obj import ssync_receiver
|
||||
from swift.common.http import is_success
|
||||
from swift.common.base_storage_server import BaseStorageServer
|
||||
@ -405,7 +405,7 @@ class ObjectController(BaseStorageServer):
|
||||
if commit_hdrs.get('X-Document', None) == "put commit":
|
||||
rcvd_commit = True
|
||||
drain(commit_iter, self.network_chunk_size, self.client_timeout)
|
||||
except ChunkReadTimeout:
|
||||
except (ChunkReadTimeout, ChunkReadError):
|
||||
raise HTTPClientDisconnect()
|
||||
except StopIteration:
|
||||
raise HTTPBadRequest(body="couldn't find PUT commit MIME doc")
|
||||
|
@ -34,6 +34,7 @@ from tempfile import mkdtemp
|
||||
from hashlib import md5
|
||||
import itertools
|
||||
import tempfile
|
||||
from collections import defaultdict
|
||||
from contextlib import contextmanager
|
||||
|
||||
from eventlet import sleep, spawn, wsgi, listen, Timeout, tpool, greenthread
|
||||
@ -5162,6 +5163,95 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.headers['X-Timestamp'], put_timestamp)
|
||||
|
||||
def test_multiphase_put_draining(self):
|
||||
# We want to ensure that we read the whole response body even if
|
||||
# it's multipart MIME and there's document parts that we don't
|
||||
# expect or understand. This'll help save our bacon if we ever jam
|
||||
# more stuff in there.
|
||||
in_a_timeout = [False]
|
||||
|
||||
# inherit from BaseException so we get a stack trace when the test
|
||||
# fails instead of just a 500
|
||||
class NotInATimeout(BaseException):
|
||||
pass
|
||||
|
||||
class FakeTimeout(BaseException):
|
||||
def __enter__(self):
|
||||
in_a_timeout[0] = True
|
||||
|
||||
def __exit__(self, typ, value, tb):
|
||||
in_a_timeout[0] = False
|
||||
|
||||
class PickyWsgiBytesIO(WsgiBytesIO):
|
||||
def read(self, *a, **kw):
|
||||
if not in_a_timeout[0]:
|
||||
raise NotInATimeout()
|
||||
return WsgiBytesIO.read(self, *a, **kw)
|
||||
|
||||
def readline(self, *a, **kw):
|
||||
if not in_a_timeout[0]:
|
||||
raise NotInATimeout()
|
||||
return WsgiBytesIO.readline(self, *a, **kw)
|
||||
|
||||
test_data = 'obj data'
|
||||
footer_meta = {
|
||||
"X-Object-Sysmeta-Ec-Frag-Index": "7",
|
||||
"Etag": md5(test_data).hexdigest(),
|
||||
}
|
||||
footer_json = json.dumps(footer_meta)
|
||||
footer_meta_cksum = md5(footer_json).hexdigest()
|
||||
test_doc = "\r\n".join((
|
||||
"--boundary123",
|
||||
"X-Document: object body",
|
||||
"",
|
||||
test_data,
|
||||
"--boundary123",
|
||||
"X-Document: object metadata",
|
||||
"Content-MD5: " + footer_meta_cksum,
|
||||
"",
|
||||
footer_json,
|
||||
"--boundary123",
|
||||
"X-Document: we got cleverer",
|
||||
"",
|
||||
"stuff stuff meaningless stuuuuuuuuuuff",
|
||||
"--boundary123",
|
||||
"X-Document: we got even cleverer; can you believe it?",
|
||||
"Waneshaft: ambifacient lunar",
|
||||
"Casing: malleable logarithmic",
|
||||
"",
|
||||
"potato potato potato potato potato potato potato",
|
||||
"--boundary123--"
|
||||
))
|
||||
if six.PY3:
|
||||
test_doc = test_doc.encode('utf-8')
|
||||
|
||||
# phase1 - PUT request with object metadata in footer and
|
||||
# multiphase commit conversation
|
||||
put_timestamp = utils.Timestamp(time()).internal
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': put_timestamp,
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Expect': '100-continue',
|
||||
'X-Backend-Storage-Policy-Index': '1',
|
||||
'X-Backend-Obj-Content-Length': len(test_data),
|
||||
'X-Backend-Obj-Metadata-Footer': 'yes',
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
}
|
||||
wsgi_input = PickyWsgiBytesIO(test_doc)
|
||||
req = Request.blank(
|
||||
"/sda1/0/a/c/o",
|
||||
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': wsgi_input},
|
||||
headers=headers)
|
||||
|
||||
app = object_server.ObjectController(self.conf, logger=self.logger)
|
||||
with mock.patch('swift.obj.server.ChunkReadTimeout', FakeTimeout):
|
||||
resp = req.get_response(app)
|
||||
self.assertEqual(resp.status_int, 201) # sanity check
|
||||
|
||||
in_a_timeout[0] = True # so we can check without an exception
|
||||
self.assertEqual(wsgi_input.read(), '') # we read all the bytes
|
||||
|
||||
|
||||
@patch_policies(test_policies)
|
||||
class TestObjectServer(unittest.TestCase):
|
||||
@ -5277,46 +5367,33 @@ class TestObjectServer(unittest.TestCase):
|
||||
resp.read()
|
||||
resp.close()
|
||||
|
||||
def test_expect_on_multiphase_put(self):
|
||||
test_data = 'obj data'
|
||||
test_doc = "\r\n".join((
|
||||
"--boundary123",
|
||||
"X-Document: object body",
|
||||
"",
|
||||
test_data,
|
||||
"--boundary123",
|
||||
))
|
||||
def find_files(self):
|
||||
found_files = defaultdict(list)
|
||||
for root, dirs, files in os.walk(self.devices):
|
||||
for filename in files:
|
||||
_name, ext = os.path.splitext(filename)
|
||||
file_path = os.path.join(root, filename)
|
||||
found_files[ext].append(file_path)
|
||||
return found_files
|
||||
|
||||
put_timestamp = utils.Timestamp(time()).internal
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': put_timestamp,
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Expect': '100-continue',
|
||||
'X-Backend-Obj-Content-Length': len(test_data),
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
'X-Backend-Obj-Multiphase-Commit': 'yes',
|
||||
}
|
||||
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
|
||||
'PUT', '/a/c/o', headers=headers)
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
headers = HeaderKeyDict(resp.getheaders())
|
||||
self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
@contextmanager
|
||||
def _check_multiphase_put_commit_handling(self, test_doc=None,
|
||||
headers=None):
|
||||
"""
|
||||
This helper will setup a multiphase chunked PUT request and yield at
|
||||
the context at the commit phase (after getting the second expect-100
|
||||
continue response.
|
||||
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
|
||||
conn.send(to_send)
|
||||
It can setup a resonable stub request, but you can over-ride some
|
||||
characteristics of the request via kwargs.
|
||||
|
||||
# verify 100-continue response to mark end of phase1
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
resp.close()
|
||||
|
||||
def test_multiphase_put_metadata_footer(self):
|
||||
# Test 2-phase commit conversation - end of 1st phase marked
|
||||
# by 100-continue response from the object server, with a
|
||||
# successful 2nd phase marked by the presence of a .durable
|
||||
# file along with .data file in the object data directory
|
||||
:param test_doc: first part of the mime conversation before the object
|
||||
server will send the 100-continue, this includes the
|
||||
object body
|
||||
:param headers: headers to send along with the initial request; some
|
||||
object-metadata (e.g. X-Backend-Obj-Content-Length)
|
||||
is generally expected tomatch the test_doc)
|
||||
"""
|
||||
test_data = 'obj data'
|
||||
footer_meta = {
|
||||
"X-Object-Sysmeta-Ec-Frag-Index": "2",
|
||||
@ -5324,7 +5401,7 @@ class TestObjectServer(unittest.TestCase):
|
||||
}
|
||||
footer_json = json.dumps(footer_meta)
|
||||
footer_meta_cksum = md5(footer_json).hexdigest()
|
||||
test_doc = "\r\n".join((
|
||||
test_doc = test_doc or "\r\n".join((
|
||||
"--boundary123",
|
||||
"X-Document: object body",
|
||||
"",
|
||||
@ -5339,10 +5416,9 @@ class TestObjectServer(unittest.TestCase):
|
||||
|
||||
# phase1 - PUT request with object metadata in footer and
|
||||
# multiphase commit conversation
|
||||
put_timestamp = utils.Timestamp(time()).internal
|
||||
headers = {
|
||||
put_timestamp = utils.Timestamp(time())
|
||||
headers = headers or {
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': put_timestamp,
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Expect': '100-continue',
|
||||
'X-Backend-Storage-Policy-Index': '1',
|
||||
@ -5351,55 +5427,200 @@ class TestObjectServer(unittest.TestCase):
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
'X-Backend-Obj-Multiphase-Commit': 'yes',
|
||||
}
|
||||
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
|
||||
'PUT', '/a/c/o', headers=headers)
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
headers = HeaderKeyDict(resp.getheaders())
|
||||
self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
self.assertEqual(headers['X-Obj-Metadata-Footer'], 'yes')
|
||||
put_timestamp = utils.Timestamp(headers.setdefault(
|
||||
'X-Timestamp', utils.Timestamp(time()).internal))
|
||||
container_update = \
|
||||
'swift.obj.server.ObjectController.container_update'
|
||||
with mock.patch(container_update) as _container_update:
|
||||
conn = bufferedhttp.http_connect(
|
||||
'127.0.0.1', self.port, 'sda1', '0',
|
||||
'PUT', '/a/c/o', headers=headers)
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
expect_headers = HeaderKeyDict(resp.getheaders())
|
||||
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
|
||||
conn.send(to_send)
|
||||
# verify 100-continue response to mark end of phase1
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
|
||||
conn.send(to_send)
|
||||
# verify 100-continue response to mark end of phase1
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
# yield relevant context for test
|
||||
yield {
|
||||
'conn': conn,
|
||||
'expect_headers': expect_headers,
|
||||
'put_timestamp': put_timestamp,
|
||||
'mock_container_update': _container_update,
|
||||
}
|
||||
|
||||
# verify success (2xx) to make end of phase2
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 201)
|
||||
resp.read()
|
||||
resp.close()
|
||||
for i in range(3):
|
||||
# give the object server a few trampolines to recognize request
|
||||
# has finished, or socket has closed or whatever
|
||||
sleep(0)
|
||||
|
||||
def test_multiphase_put_client_disconnect_right_before_commit(self):
|
||||
with self._check_multiphase_put_commit_handling() as context:
|
||||
conn = context['conn']
|
||||
# just bail stright out
|
||||
conn.sock.fd._sock.close()
|
||||
|
||||
put_timestamp = context['put_timestamp']
|
||||
_container_update = context['mock_container_update']
|
||||
|
||||
# and make sure it demonstrates the client disconnect
|
||||
log_lines = self.logger.get_lines_for_level('info')
|
||||
self.assertEqual(len(log_lines), 1)
|
||||
self.assertIn(' 499 ', log_lines[0])
|
||||
|
||||
# verify successful object data and durable state file write
|
||||
obj_basename = os.path.join(
|
||||
self.devices, 'sda1',
|
||||
storage_directory(diskfile.get_data_dir(POLICIES[1]), '0',
|
||||
hash_path('a', 'c', 'o')),
|
||||
put_timestamp)
|
||||
obj_datafile = obj_basename + '#2.data'
|
||||
self.assertTrue(os.path.isfile(obj_datafile))
|
||||
obj_durablefile = obj_basename + '.durable'
|
||||
self.assertTrue(os.path.isfile(obj_durablefile))
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s#2.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# but .durable isn't
|
||||
self.assertEqual(found_files['.druable'], [])
|
||||
# And no continer update
|
||||
self.assertFalse(_container_update.called)
|
||||
|
||||
def test_multiphase_put_no_metadata_footer(self):
|
||||
# Test 2-phase commit conversation, with no metadata footer
|
||||
# at the end of object data - end of 1st phase marked
|
||||
# by 100-continue response from the object server, with a
|
||||
# successful 2nd phase marked by the presence of a .durable
|
||||
# file along with .data file in the object data directory
|
||||
# (No metadata footer case)
|
||||
def test_multiphase_put_client_disconnect_in_the_middle_of_commit(self):
|
||||
with self._check_multiphase_put_commit_handling() as context:
|
||||
conn = context['conn']
|
||||
# start commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123--",
|
||||
))
|
||||
# but don't quite the commit body
|
||||
to_send = "%x\r\n%s" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc[:-1])
|
||||
conn.send(to_send)
|
||||
|
||||
# and then bail out
|
||||
conn.sock.fd._sock.close()
|
||||
|
||||
put_timestamp = context['put_timestamp']
|
||||
_container_update = context['mock_container_update']
|
||||
|
||||
# and make sure it demonstrates the client disconnect
|
||||
log_lines = self.logger.get_lines_for_level('info')
|
||||
self.assertEqual(len(log_lines), 1)
|
||||
self.assertIn(' 499 ', log_lines[0])
|
||||
|
||||
# verify successful object data and durable state file write
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s#2.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# but .durable isn't
|
||||
self.assertEqual(found_files['.druable'], [])
|
||||
# And no continer update
|
||||
self.assertFalse(_container_update.called)
|
||||
|
||||
def test_multiphase_put_no_metadata_replicated(self):
|
||||
test_data = 'obj data'
|
||||
test_doc = "\r\n".join((
|
||||
"--boundary123",
|
||||
"X-Document: object body",
|
||||
"",
|
||||
test_data,
|
||||
"--boundary123",
|
||||
))
|
||||
|
||||
put_timestamp = utils.Timestamp(time()).internal
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': put_timestamp,
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Expect': '100-continue',
|
||||
'X-Backend-Obj-Content-Length': len(test_data),
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
'X-Backend-Obj-Multiphase-Commit': 'yes',
|
||||
}
|
||||
with self._check_multiphase_put_commit_handling(
|
||||
test_doc=test_doc, headers=headers) as context:
|
||||
expect_headers = context['expect_headers']
|
||||
self.assertEqual(expect_headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
# N.B. no X-Obj-Metadata-Footer header
|
||||
self.assertNotIn('X-Obj-Metadata-Footer', expect_headers)
|
||||
|
||||
conn = context['conn']
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
|
||||
# verify success (2xx) to make end of phase2
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 201)
|
||||
resp.read()
|
||||
resp.close()
|
||||
|
||||
# verify successful object data and durable state file write
|
||||
put_timestamp = context['put_timestamp']
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# replicated objects do not have a .durable file
|
||||
self.assertEqual(found_files['.durable'], [])
|
||||
# And continer update was called
|
||||
self.assertTrue(context['mock_container_update'].called)
|
||||
|
||||
def test_multiphase_put_metadata_footer(self):
|
||||
with self._check_multiphase_put_commit_handling() as context:
|
||||
expect_headers = context['expect_headers']
|
||||
self.assertEqual(expect_headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
self.assertEqual(expect_headers['X-Obj-Metadata-Footer'], 'yes')
|
||||
|
||||
conn = context['conn']
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
|
||||
# verify success (2xx) to make end of phase2
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 201)
|
||||
resp.read()
|
||||
resp.close()
|
||||
|
||||
# verify successful object data and durable state file write
|
||||
put_timestamp = context['put_timestamp']
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s#2.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# .durable file is there
|
||||
self.assertEqual(len(found_files['.durable']), 1)
|
||||
durable_file = found_files['.durable'][0]
|
||||
self.assertEqual("%s.durable" % put_timestamp.internal,
|
||||
os.path.basename(durable_file))
|
||||
# And continer update was called
|
||||
self.assertTrue(context['mock_container_update'].called)
|
||||
|
||||
def test_multiphase_put_ec_fragment_in_headers_no_footers(self):
|
||||
test_data = 'obj data'
|
||||
test_doc = "\r\n".join((
|
||||
"--boundary123",
|
||||
@ -5427,210 +5648,130 @@ class TestObjectServer(unittest.TestCase):
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
'X-Backend-Obj-Multiphase-Commit': 'yes',
|
||||
}
|
||||
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
|
||||
'PUT', '/a/c/o', headers=headers)
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
headers = HeaderKeyDict(resp.getheaders())
|
||||
self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
with self._check_multiphase_put_commit_handling(
|
||||
test_doc=test_doc, headers=headers) as context:
|
||||
expect_headers = context['expect_headers']
|
||||
self.assertEqual(expect_headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
# N.B. no X-Obj-Metadata-Footer header
|
||||
self.assertNotIn('X-Obj-Metadata-Footer', expect_headers)
|
||||
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
|
||||
conn.send(to_send)
|
||||
# verify 100-continue response to mark end of phase1
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
conn = context['conn']
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
|
||||
# verify success (2xx) to make end of phase2
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 201)
|
||||
resp.read()
|
||||
resp.close()
|
||||
# verify success (2xx) to make end of phase2
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 201)
|
||||
resp.read()
|
||||
resp.close()
|
||||
|
||||
# verify successful object data and durable state file write
|
||||
obj_basename = os.path.join(
|
||||
self.devices, 'sda1',
|
||||
storage_directory(diskfile.get_data_dir(POLICIES[1]), '0',
|
||||
hash_path('a', 'c', 'o')),
|
||||
put_timestamp)
|
||||
obj_datafile = obj_basename + '#2.data'
|
||||
self.assertTrue(os.path.isfile(obj_datafile))
|
||||
obj_durablefile = obj_basename + '.durable'
|
||||
self.assertTrue(os.path.isfile(obj_durablefile))
|
||||
|
||||
def test_multiphase_put_draining(self):
|
||||
# We want to ensure that we read the whole response body even if
|
||||
# it's multipart MIME and there's document parts that we don't
|
||||
# expect or understand. This'll help save our bacon if we ever jam
|
||||
# more stuff in there.
|
||||
in_a_timeout = [False]
|
||||
|
||||
# inherit from BaseException so we get a stack trace when the test
|
||||
# fails instead of just a 500
|
||||
class NotInATimeout(BaseException):
|
||||
pass
|
||||
|
||||
class FakeTimeout(BaseException):
|
||||
def __enter__(self):
|
||||
in_a_timeout[0] = True
|
||||
|
||||
def __exit__(self, typ, value, tb):
|
||||
in_a_timeout[0] = False
|
||||
|
||||
class PickyWsgiBytesIO(WsgiBytesIO):
|
||||
def read(self, *a, **kw):
|
||||
if not in_a_timeout[0]:
|
||||
raise NotInATimeout()
|
||||
return WsgiBytesIO.read(self, *a, **kw)
|
||||
|
||||
def readline(self, *a, **kw):
|
||||
if not in_a_timeout[0]:
|
||||
raise NotInATimeout()
|
||||
return WsgiBytesIO.readline(self, *a, **kw)
|
||||
|
||||
test_data = 'obj data'
|
||||
footer_meta = {
|
||||
"X-Object-Sysmeta-Ec-Frag-Index": "7",
|
||||
"Etag": md5(test_data).hexdigest(),
|
||||
}
|
||||
footer_json = json.dumps(footer_meta)
|
||||
footer_meta_cksum = md5(footer_json).hexdigest()
|
||||
test_doc = "\r\n".join((
|
||||
"--boundary123",
|
||||
"X-Document: object body",
|
||||
"",
|
||||
test_data,
|
||||
"--boundary123",
|
||||
"X-Document: object metadata",
|
||||
"Content-MD5: " + footer_meta_cksum,
|
||||
"",
|
||||
footer_json,
|
||||
"--boundary123",
|
||||
"X-Document: we got cleverer",
|
||||
"",
|
||||
"stuff stuff meaningless stuuuuuuuuuuff",
|
||||
"--boundary123",
|
||||
"X-Document: we got even cleverer; can you believe it?",
|
||||
"Waneshaft: ambifacient lunar",
|
||||
"Casing: malleable logarithmic",
|
||||
"",
|
||||
"potato potato potato potato potato potato potato",
|
||||
"--boundary123--"
|
||||
))
|
||||
if six.PY3:
|
||||
test_doc = test_doc.encode('utf-8')
|
||||
|
||||
# phase1 - PUT request with object metadata in footer and
|
||||
# multiphase commit conversation
|
||||
put_timestamp = utils.Timestamp(time()).internal
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': put_timestamp,
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Expect': '100-continue',
|
||||
'X-Backend-Storage-Policy-Index': '1',
|
||||
'X-Backend-Obj-Content-Length': len(test_data),
|
||||
'X-Backend-Obj-Metadata-Footer': 'yes',
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
}
|
||||
wsgi_input = PickyWsgiBytesIO(test_doc)
|
||||
req = Request.blank(
|
||||
"/sda1/0/a/c/o",
|
||||
environ={'REQUEST_METHOD': 'PUT', 'wsgi.input': wsgi_input},
|
||||
headers=headers)
|
||||
|
||||
app = object_server.ObjectController(self.conf, logger=self.logger)
|
||||
with mock.patch('swift.obj.server.ChunkReadTimeout', FakeTimeout):
|
||||
resp = req.get_response(app)
|
||||
self.assertEqual(resp.status_int, 201) # sanity check
|
||||
|
||||
in_a_timeout[0] = True # so we can check without an exception
|
||||
self.assertEqual(wsgi_input.read(), '') # we read all the bytes
|
||||
put_timestamp = context['put_timestamp']
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s#2.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# .durable file is there
|
||||
self.assertEqual(len(found_files['.durable']), 1)
|
||||
durable_file = found_files['.durable'][0]
|
||||
self.assertEqual("%s.durable" % put_timestamp.internal,
|
||||
os.path.basename(durable_file))
|
||||
# And continer update was called
|
||||
self.assertTrue(context['mock_container_update'].called)
|
||||
|
||||
def test_multiphase_put_bad_commit_message(self):
|
||||
# Test 2-phase commit conversation - end of 1st phase marked
|
||||
# by 100-continue response from the object server, with 2nd
|
||||
# phase commit confirmation being received corrupt
|
||||
test_data = 'obj data'
|
||||
footer_meta = {
|
||||
"X-Object-Sysmeta-Ec-Frag-Index": "7",
|
||||
"Etag": md5(test_data).hexdigest(),
|
||||
}
|
||||
footer_json = json.dumps(footer_meta)
|
||||
footer_meta_cksum = md5(footer_json).hexdigest()
|
||||
test_doc = "\r\n".join((
|
||||
"--boundary123",
|
||||
"X-Document: object body",
|
||||
"",
|
||||
test_data,
|
||||
"--boundary123",
|
||||
"X-Document: object metadata",
|
||||
"Content-MD5: " + footer_meta_cksum,
|
||||
"",
|
||||
footer_json,
|
||||
"--boundary123",
|
||||
))
|
||||
|
||||
# phase1 - PUT request with object metadata in footer and
|
||||
# multiphase commit conversation
|
||||
put_timestamp = utils.Timestamp(time()).internal
|
||||
headers = {
|
||||
'Content-Type': 'text/plain',
|
||||
'X-Timestamp': put_timestamp,
|
||||
'Transfer-Encoding': 'chunked',
|
||||
'Expect': '100-continue',
|
||||
'X-Backend-Storage-Policy-Index': '1',
|
||||
'X-Backend-Obj-Content-Length': len(test_data),
|
||||
'X-Backend-Obj-Metadata-Footer': 'yes',
|
||||
'X-Backend-Obj-Multipart-Mime-Boundary': 'boundary123',
|
||||
'X-Backend-Obj-Multiphase-Commit': 'yes',
|
||||
}
|
||||
conn = bufferedhttp.http_connect('127.0.0.1', self.port, 'sda1', '0',
|
||||
'PUT', '/a/c/o', headers=headers)
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
headers = HeaderKeyDict(resp.getheaders())
|
||||
self.assertEqual(headers['X-Obj-Multiphase-Commit'], 'yes')
|
||||
self.assertEqual(headers['X-Obj-Metadata-Footer'], 'yes')
|
||||
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % (len(test_doc), test_doc)
|
||||
conn.send(to_send)
|
||||
# verify 100-continue response to mark end of phase1
|
||||
resp = conn.getexpect()
|
||||
self.assertEqual(resp.status, 100)
|
||||
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"junkjunk",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 500)
|
||||
resp.read()
|
||||
resp.close()
|
||||
with self._check_multiphase_put_commit_handling() as context:
|
||||
conn = context['conn']
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"junkjunk",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 500)
|
||||
resp.read()
|
||||
resp.close()
|
||||
put_timestamp = context['put_timestamp']
|
||||
_container_update = context['mock_container_update']
|
||||
# verify that durable file was NOT created
|
||||
obj_basename = os.path.join(
|
||||
self.devices, 'sda1',
|
||||
storage_directory(diskfile.get_data_dir(1), '0',
|
||||
hash_path('a', 'c', 'o')),
|
||||
put_timestamp)
|
||||
obj_datafile = obj_basename + '#7.data'
|
||||
self.assertTrue(os.path.isfile(obj_datafile))
|
||||
obj_durablefile = obj_basename + '.durable'
|
||||
self.assertFalse(os.path.isfile(obj_durablefile))
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s#2.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# but .durable isn't
|
||||
self.assertEqual(found_files['.druable'], [])
|
||||
# And no continer update
|
||||
self.assertFalse(_container_update.called)
|
||||
|
||||
def test_multiphase_put_drains_extra_commit_junk(self):
|
||||
with self._check_multiphase_put_commit_handling() as context:
|
||||
conn = context['conn']
|
||||
# send commit confirmation to start phase2
|
||||
commit_confirmation_doc = "\r\n".join((
|
||||
"X-Document: put commit",
|
||||
"",
|
||||
"commit_confirmation",
|
||||
"--boundary123",
|
||||
"X-Document: we got cleverer",
|
||||
"",
|
||||
"stuff stuff meaningless stuuuuuuuuuuff",
|
||||
"--boundary123",
|
||||
"X-Document: we got even cleverer; can you believe it?",
|
||||
"Waneshaft: ambifacient lunar",
|
||||
"Casing: malleable logarithmic",
|
||||
"",
|
||||
"potato potato potato potato potato potato potato",
|
||||
"--boundary123--",
|
||||
))
|
||||
to_send = "%x\r\n%s\r\n0\r\n\r\n" % \
|
||||
(len(commit_confirmation_doc), commit_confirmation_doc)
|
||||
conn.send(to_send)
|
||||
|
||||
# verify success (2xx) to make end of phase2
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 201)
|
||||
resp.read()
|
||||
|
||||
# make another request to validate the HTTP protocol state
|
||||
conn.putrequest('GET', '/sda1/0/a/c/o')
|
||||
conn.putheader('X-Backend-Storage-Policy-Index', '1')
|
||||
conn.endheaders()
|
||||
resp = conn.getresponse()
|
||||
self.assertEqual(resp.status, 200)
|
||||
resp.read()
|
||||
|
||||
resp.close()
|
||||
|
||||
# verify successful object data and durable state file write
|
||||
put_timestamp = context['put_timestamp']
|
||||
found_files = self.find_files()
|
||||
# .data file is there
|
||||
self.assertEqual(len(found_files['.data']), 1)
|
||||
obj_datafile = found_files['.data'][0]
|
||||
self.assertEqual("%s#2.data" % put_timestamp.internal,
|
||||
os.path.basename(obj_datafile))
|
||||
# .durable file is there
|
||||
self.assertEqual(len(found_files['.durable']), 1)
|
||||
durable_file = found_files['.durable'][0]
|
||||
self.assertEqual("%s.durable" % put_timestamp.internal,
|
||||
os.path.basename(durable_file))
|
||||
# And continer update was called
|
||||
self.assertTrue(context['mock_container_update'].called)
|
||||
|
||||
|
||||
@patch_policies
|
||||
|
Loading…
Reference in New Issue
Block a user