Merge "Fix proxy handling of EC client disconnect"
This commit is contained in:
commit
a7837c785a
@ -53,7 +53,7 @@ from swift.common import constraints
|
||||
from swift.common.exceptions import ChunkReadTimeout, \
|
||||
ChunkWriteTimeout, ConnectionTimeout, ResponseTimeout, \
|
||||
InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \
|
||||
PutterConnectError
|
||||
PutterConnectError, ChunkReadError
|
||||
from swift.common.http import (
|
||||
is_success, is_server_error, HTTP_CONTINUE, HTTP_CREATED,
|
||||
HTTP_MULTIPLE_CHOICES, HTTP_INTERNAL_SERVER_ERROR,
|
||||
@ -721,8 +721,13 @@ class BaseObjectController(Controller):
|
||||
if error_response:
|
||||
return error_response
|
||||
else:
|
||||
reader = req.environ['wsgi.input'].read
|
||||
data_source = iter(lambda: reader(self.app.client_chunk_size), '')
|
||||
def reader():
|
||||
try:
|
||||
return req.environ['wsgi.input'].read(
|
||||
self.app.client_chunk_size)
|
||||
except (ValueError, IOError) as e:
|
||||
raise ChunkReadError(str(e))
|
||||
data_source = iter(reader, '')
|
||||
update_response = lambda req, resp: resp
|
||||
|
||||
# check if object is set to be automatically deleted (i.e. expired)
|
||||
@ -962,6 +967,12 @@ class ReplicatedObjectController(BaseObjectController):
|
||||
raise HTTPRequestTimeout(request=req)
|
||||
except HTTPException:
|
||||
raise
|
||||
except ChunkReadError:
|
||||
req.client_disconnect = True
|
||||
self.app.logger.warn(
|
||||
_('Client disconnected without sending last chunk'))
|
||||
self.app.logger.increment('client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
except (Exception, Timeout):
|
||||
self.app.logger.exception(
|
||||
_('ERROR Exception causing client disconnect'))
|
||||
@ -2162,24 +2173,6 @@ class ECObjectController(BaseObjectController):
|
||||
try:
|
||||
chunk = next(data_source)
|
||||
except StopIteration:
|
||||
computed_etag = (etag_hasher.hexdigest()
|
||||
if etag_hasher else None)
|
||||
received_etag = req.headers.get(
|
||||
'etag', '').strip('"')
|
||||
if (computed_etag and received_etag and
|
||||
computed_etag != received_etag):
|
||||
raise HTTPUnprocessableEntity(request=req)
|
||||
|
||||
send_chunk('') # flush out any buffered data
|
||||
|
||||
for putter in putters:
|
||||
trail_md = trailing_metadata(
|
||||
policy, etag_hasher,
|
||||
bytes_transferred,
|
||||
chunk_index[putter])
|
||||
trail_md['Etag'] = \
|
||||
putter.chunk_hasher.hexdigest()
|
||||
putter.end_of_object_data(trail_md)
|
||||
break
|
||||
bytes_transferred += len(chunk)
|
||||
if bytes_transferred > constraints.MAX_FILE_SIZE:
|
||||
@ -2187,6 +2180,33 @@ class ECObjectController(BaseObjectController):
|
||||
|
||||
send_chunk(chunk)
|
||||
|
||||
if req.content_length and (
|
||||
bytes_transferred < req.content_length):
|
||||
req.client_disconnect = True
|
||||
self.app.logger.warn(
|
||||
_('Client disconnected without sending enough data'))
|
||||
self.app.logger.increment('client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
|
||||
computed_etag = (etag_hasher.hexdigest()
|
||||
if etag_hasher else None)
|
||||
received_etag = req.headers.get(
|
||||
'etag', '').strip('"')
|
||||
if (computed_etag and received_etag and
|
||||
computed_etag != received_etag):
|
||||
raise HTTPUnprocessableEntity(request=req)
|
||||
|
||||
send_chunk('') # flush out any buffered data
|
||||
|
||||
for putter in putters:
|
||||
trail_md = trailing_metadata(
|
||||
policy, etag_hasher,
|
||||
bytes_transferred,
|
||||
chunk_index[putter])
|
||||
trail_md['Etag'] = \
|
||||
putter.chunk_hasher.hexdigest()
|
||||
putter.end_of_object_data(trail_md)
|
||||
|
||||
for putter in putters:
|
||||
putter.wait()
|
||||
|
||||
@ -2219,18 +2239,18 @@ class ECObjectController(BaseObjectController):
|
||||
_('ERROR Client read timeout (%ss)'), err.seconds)
|
||||
self.app.logger.increment('client_timeouts')
|
||||
raise HTTPRequestTimeout(request=req)
|
||||
except ChunkReadError:
|
||||
req.client_disconnect = True
|
||||
self.app.logger.warn(
|
||||
_('Client disconnected without sending last chunk'))
|
||||
self.app.logger.increment('client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
except HTTPException:
|
||||
raise
|
||||
except (Exception, Timeout):
|
||||
self.app.logger.exception(
|
||||
_('ERROR Exception causing client disconnect'))
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
if req.content_length and bytes_transferred < req.content_length:
|
||||
req.client_disconnect = True
|
||||
self.app.logger.warn(
|
||||
_('Client disconnected without sending enough data'))
|
||||
self.app.logger.increment('client_disconnects')
|
||||
raise HTTPClientDisconnect(request=req)
|
||||
|
||||
def _have_adequate_successes(self, statuses, min_responses):
|
||||
"""
|
||||
|
@ -39,9 +39,11 @@ import functools
|
||||
from swift.obj import diskfile
|
||||
import re
|
||||
import random
|
||||
from collections import defaultdict
|
||||
|
||||
import mock
|
||||
from eventlet import sleep, spawn, wsgi, listen, Timeout
|
||||
from eventlet import sleep, spawn, wsgi, listen, Timeout, debug
|
||||
from eventlet.green import httplib
|
||||
from six import BytesIO
|
||||
from six import StringIO
|
||||
from six.moves import range
|
||||
@ -6072,6 +6074,119 @@ class TestECMismatchedFA(unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 503)
|
||||
|
||||
|
||||
class TestObjectDisconnectCleanup(unittest.TestCase):
|
||||
|
||||
# update this if you need to make more different devices in do_setup
|
||||
device_pattern = re.compile('sd[a-z][0-9]')
|
||||
|
||||
def _cleanup_devices(self):
|
||||
# make sure all the object data is cleaned up
|
||||
for dev in os.listdir(_testdir):
|
||||
if not self.device_pattern.match(dev):
|
||||
continue
|
||||
device_path = os.path.join(_testdir, dev)
|
||||
for datadir in os.listdir(device_path):
|
||||
if 'object' not in datadir:
|
||||
continue
|
||||
data_path = os.path.join(device_path, datadir)
|
||||
rmtree(data_path, ignore_errors=True)
|
||||
mkdirs(data_path)
|
||||
|
||||
def setUp(self):
|
||||
debug.hub_exceptions(False)
|
||||
self._cleanup_devices()
|
||||
|
||||
def tearDown(self):
|
||||
debug.hub_exceptions(True)
|
||||
self._cleanup_devices()
|
||||
|
||||
def _check_disconnect_cleans_up(self, policy_name, is_chunked=False):
|
||||
proxy_port = _test_sockets[0].getsockname()[1]
|
||||
|
||||
def put(path, headers=None, body=None):
|
||||
conn = httplib.HTTPConnection('localhost', proxy_port)
|
||||
try:
|
||||
conn.connect()
|
||||
conn.putrequest('PUT', path)
|
||||
for k, v in (headers or {}).items():
|
||||
conn.putheader(k, v)
|
||||
conn.endheaders()
|
||||
body = body or ['']
|
||||
for chunk in body:
|
||||
if is_chunked:
|
||||
chunk = '%x\r\n%s\r\n' % (len(chunk), chunk)
|
||||
conn.send(chunk)
|
||||
resp = conn.getresponse()
|
||||
body = resp.read()
|
||||
finally:
|
||||
# seriously - shut this mother down
|
||||
if conn.sock:
|
||||
conn.sock.fd._sock.close()
|
||||
return resp, body
|
||||
|
||||
# ensure container
|
||||
container_path = '/v1/a/%s-disconnect-test' % policy_name
|
||||
resp, _body = put(container_path, headers={
|
||||
'Connection': 'close',
|
||||
'X-Storage-Policy': policy_name,
|
||||
'Content-Length': '0',
|
||||
})
|
||||
self.assertIn(resp.status, (201, 202))
|
||||
|
||||
def exploding_body():
|
||||
for i in range(3):
|
||||
yield '\x00' * (64 * 2 ** 10)
|
||||
raise Exception('kaboom!')
|
||||
|
||||
headers = {}
|
||||
if is_chunked:
|
||||
headers['Transfer-Encoding'] = 'chunked'
|
||||
else:
|
||||
headers['Content-Length'] = 64 * 2 ** 20
|
||||
|
||||
obj_path = container_path + '/disconnect-data'
|
||||
try:
|
||||
resp, _body = put(obj_path, headers=headers,
|
||||
body=exploding_body())
|
||||
except Exception as e:
|
||||
if str(e) != 'kaboom!':
|
||||
raise
|
||||
else:
|
||||
self.fail('obj put connection did not ka-splod')
|
||||
|
||||
sleep(0.1)
|
||||
|
||||
def find_files(self):
|
||||
found_files = defaultdict(list)
|
||||
for root, dirs, files in os.walk(_testdir):
|
||||
for fname in files:
|
||||
filename, ext = os.path.splitext(fname)
|
||||
found_files[ext].append(os.path.join(root, fname))
|
||||
return found_files
|
||||
|
||||
def test_repl_disconnect_cleans_up(self):
|
||||
self._check_disconnect_cleans_up('zero')
|
||||
found_files = self.find_files()
|
||||
self.assertEqual(found_files['.data'], [])
|
||||
|
||||
def test_ec_disconnect_cleans_up(self):
|
||||
self._check_disconnect_cleans_up('ec')
|
||||
found_files = self.find_files()
|
||||
self.assertEqual(found_files['.durable'], [])
|
||||
self.assertEqual(found_files['.data'], [])
|
||||
|
||||
def test_repl_chunked_transfer_disconnect_cleans_up(self):
|
||||
self._check_disconnect_cleans_up('zero', is_chunked=True)
|
||||
found_files = self.find_files()
|
||||
self.assertEqual(found_files['.data'], [])
|
||||
|
||||
def test_ec_chunked_transfer_disconnect_cleans_up(self):
|
||||
self._check_disconnect_cleans_up('ec', is_chunked=True)
|
||||
found_files = self.find_files()
|
||||
self.assertEqual(found_files['.durable'], [])
|
||||
self.assertEqual(found_files['.data'], [])
|
||||
|
||||
|
||||
class TestObjectECRangedGET(unittest.TestCase):
|
||||
def setUp(self):
|
||||
_test_servers[0].logger._clear()
|
||||
|
Loading…
Reference in New Issue
Block a user