Fix proxy handling of EC client disconnect
The ECObjectController was unconditionally sending down the frag archive
commit document after the client source stream terminated - even if the
client disconnected early.

We can detect early disconnect in two ways:

1. Content-Length and not enough bytes_transferred

   When eventlet.wsgi is reading from a Content-Length body, the read
   returns the empty string and our iterable raises StopIteration - but we
   can check content-length against bytes_transferred and know whether the
   client disconnected.

2. Transfer-Encoding: chunked - without a terminating 0\r\n\r\n

   When eventlet.wsgi is reading from a Transfer-Encoding: chunked body,
   the socket read returns the empty string, eventlet.wsgi's chunked parser
   raises ValueError (which we translate to ChunkReadError*), and we know
   the client disconnected.

If we detect either of these conditions the proxy should:

1. *not* send down the commit document to the object servers
2. disconnect from the backend servers
3. log the client disconnect

Oddly, the code on master was only getting the first part wrong: backend
connections were terminated (gracefully, after the commit document), and
the disconnect was then logged as a 499.

So now we only send down the commit document after a successful, complete
client HTTP request (either the whole Content-Length, or a clean
Transfer-Encoding: chunked 0\r\n\r\n).

* To detect the early disconnect on Transfer-Encoding: chunked, a new
swift.common.exceptions.ChunkReadError is used to translate eventlet.wsgi's
more general IOError and ValueError into something more appropriate to
catch and handle alongside our generic ChunkReadTimeout handling.

Co-Author: Alistair Coles <alistair.coles@hp.com>
Closes-Bug: #1496205
Change-Id: I028a530aba82d50baa4ee1d05ddce18d4cce4e81
parent 9046676968
commit 3afdcf6b8f
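
To make the mechanism described above concrete, here is a condensed,
self-contained sketch of the two detection paths: a reader wrapper that
converts eventlet.wsgi's chunked-parse errors into a dedicated exception,
plus a Content-Length vs. bytes_transferred check once the body iterator is
exhausted. The names make_data_source, feed_backends, and the two exception
classes below are illustrative stand-ins rather than the real Swift classes;
the actual patch (in the diff that follows) threads this logic through the
existing proxy controllers instead.

    class ChunkReadError(Exception):
        """Stand-in for swift.common.exceptions.ChunkReadError."""


    class ClientDisconnect(Exception):
        """Stand-in for the proxy's HTTPClientDisconnect response."""


    def make_data_source(wsgi_input, chunk_size):
        # Wrap the WSGI input so that eventlet.wsgi's chunked-parser errors
        # (ValueError/IOError) surface to the consumer as ChunkReadError.
        def reader():
            try:
                return wsgi_input.read(chunk_size)
            except (ValueError, IOError) as e:
                raise ChunkReadError(str(e))
        # two-arg iter(): stops cleanly on '', but lets exceptions propagate
        return iter(reader, '')


    def feed_backends(content_length, data_source, send_chunk):
        bytes_transferred = 0
        try:
            for chunk in data_source:
                bytes_transferred += len(chunk)
                send_chunk(chunk)
        except ChunkReadError:
            # chunked request body ended without the terminating 0\r\n\r\n
            raise ClientDisconnect('client stopped mid-chunk')
        if content_length and bytes_transferred < content_length:
            # Content-Length request body ended short of the promised size
            raise ClientDisconnect('client sent too few bytes')
        # only a fully received body reaches this point, so it is now safe to
        # send the commit document / trailing metadata to the object servers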
swift/proxy/controllers/obj.py

@@ -53,7 +53,7 @@ from swift.common import constraints
 from swift.common.exceptions import ChunkReadTimeout, \
     ChunkWriteTimeout, ConnectionTimeout, ResponseTimeout, \
     InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \
-    PutterConnectError
+    PutterConnectError, ChunkReadError
 from swift.common.http import (
     is_success, is_server_error, HTTP_CONTINUE, HTTP_CREATED,
     HTTP_MULTIPLE_CHOICES, HTTP_INTERNAL_SERVER_ERROR,
@@ -721,8 +721,13 @@ class BaseObjectController(Controller):
         if error_response:
             return error_response
         else:
-            reader = req.environ['wsgi.input'].read
-            data_source = iter(lambda: reader(self.app.client_chunk_size), '')
+            def reader():
+                try:
+                    return req.environ['wsgi.input'].read(
+                        self.app.client_chunk_size)
+                except (ValueError, IOError) as e:
+                    raise ChunkReadError(str(e))
+            data_source = iter(reader, '')
             update_response = lambda req, resp: resp

         # check if object is set to be automatically deleted (i.e. expired)
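
The hunk above relies on a property of the two-argument iter() builtin that
is easy to miss: iteration ends cleanly when the callable returns the
sentinel, but any exception the callable raises propagates to the consumer
of the iterator. A tiny standalone demonstration (plain Python, not Swift
code):

    def make_reader(values):
        # returns a zero-argument callable, like the wrapped wsgi.input read
        it = iter(values)

        def reader():
            value = next(it)
            if isinstance(value, Exception):
                raise value  # simulates eventlet's chunked-parse ValueError
            return value
        return reader


    # a complete body: iteration stops cleanly at the '' sentinel
    print(list(iter(make_reader(['abc', 'def', '']), '')))   # ['abc', 'def']

    # a truncated chunked body: the error reaches the consumer, which can
    # translate it into "the client disconnected"
    try:
        list(iter(make_reader(['abc', ValueError('bad chunk')]), ''))
    except ValueError as e:
        print('treated as client disconnect: %s' % e)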
@@ -962,6 +967,12 @@ class ReplicatedObjectController(BaseObjectController):
                raise HTTPRequestTimeout(request=req)
            except HTTPException:
                raise
+           except ChunkReadError:
+               req.client_disconnect = True
+               self.app.logger.warn(
+                   _('Client disconnected without sending last chunk'))
+               self.app.logger.increment('client_disconnects')
+               raise HTTPClientDisconnect(request=req)
            except (Exception, Timeout):
                self.app.logger.exception(
                    _('ERROR Exception causing client disconnect'))
@@ -2162,24 +2173,6 @@ class ECObjectController(BaseObjectController):
                try:
                    chunk = next(data_source)
                except StopIteration:
-                   computed_etag = (etag_hasher.hexdigest()
-                                    if etag_hasher else None)
-                   received_etag = req.headers.get(
-                       'etag', '').strip('"')
-                   if (computed_etag and received_etag and
-                           computed_etag != received_etag):
-                       raise HTTPUnprocessableEntity(request=req)
-
-                   send_chunk('')  # flush out any buffered data
-
-                   for putter in putters:
-                       trail_md = trailing_metadata(
-                           policy, etag_hasher,
-                           bytes_transferred,
-                           chunk_index[putter])
-                       trail_md['Etag'] = \
-                           putter.chunk_hasher.hexdigest()
-                       putter.end_of_object_data(trail_md)
                    break
                bytes_transferred += len(chunk)
                if bytes_transferred > constraints.MAX_FILE_SIZE:
@@ -2187,6 +2180,33 @@ class ECObjectController(BaseObjectController):

                send_chunk(chunk)

+           if req.content_length and (
+                   bytes_transferred < req.content_length):
+               req.client_disconnect = True
+               self.app.logger.warn(
+                   _('Client disconnected without sending enough data'))
+               self.app.logger.increment('client_disconnects')
+               raise HTTPClientDisconnect(request=req)
+
+           computed_etag = (etag_hasher.hexdigest()
+                            if etag_hasher else None)
+           received_etag = req.headers.get(
+               'etag', '').strip('"')
+           if (computed_etag and received_etag and
+                   computed_etag != received_etag):
+               raise HTTPUnprocessableEntity(request=req)
+
+           send_chunk('')  # flush out any buffered data
+
+           for putter in putters:
+               trail_md = trailing_metadata(
+                   policy, etag_hasher,
+                   bytes_transferred,
+                   chunk_index[putter])
+               trail_md['Etag'] = \
+                   putter.chunk_hasher.hexdigest()
+               putter.end_of_object_data(trail_md)
+
            for putter in putters:
                putter.wait()

@@ -2219,18 +2239,18 @@ class ECObjectController(BaseObjectController):
                _('ERROR Client read timeout (%ss)'), err.seconds)
            self.app.logger.increment('client_timeouts')
            raise HTTPRequestTimeout(request=req)
+       except ChunkReadError:
+           req.client_disconnect = True
+           self.app.logger.warn(
+               _('Client disconnected without sending last chunk'))
+           self.app.logger.increment('client_disconnects')
+           raise HTTPClientDisconnect(request=req)
        except HTTPException:
            raise
        except (Exception, Timeout):
            self.app.logger.exception(
                _('ERROR Exception causing client disconnect'))
            raise HTTPClientDisconnect(request=req)
-       if req.content_length and bytes_transferred < req.content_length:
-           req.client_disconnect = True
-           self.app.logger.warn(
-               _('Client disconnected without sending enough data'))
-           self.app.logger.increment('client_disconnects')
-           raise HTTPClientDisconnect(request=req)

    def _have_adequate_successes(self, statuses, min_responses):
        """
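
The hunks above import ChunkReadError, but the change that adds it to
swift/common/exceptions.py is not shown here. Based on the commit message,
it is presumably a trivial exception subclass; the following is a guess at
its shape (using Exception as a placeholder base class), not a copy of the
actual definition:

    # sketch of the new exception; the real one lives in
    # swift/common/exceptions.py and likely subclasses Swift's own base class
    class ChunkReadError(Exception):
        """Client hung up mid-chunked-body; treated like a disconnect."""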
test/unit/proxy/test_server.py

@@ -39,9 +39,11 @@ import functools
 from swift.obj import diskfile
 import re
 import random
+from collections import defaultdict

 import mock
-from eventlet import sleep, spawn, wsgi, listen, Timeout
+from eventlet import sleep, spawn, wsgi, listen, Timeout, debug
 from eventlet.green import httplib
 from six import BytesIO
+from six import StringIO
 from six.moves import range
@@ -6072,6 +6074,119 @@ class TestECMismatchedFA(unittest.TestCase):
         self.assertEqual(resp.status_int, 503)


+class TestObjectDisconnectCleanup(unittest.TestCase):
+
+    # update this if you need to make more different devices in do_setup
+    device_pattern = re.compile('sd[a-z][0-9]')
+
+    def _cleanup_devices(self):
+        # make sure all the object data is cleaned up
+        for dev in os.listdir(_testdir):
+            if not self.device_pattern.match(dev):
+                continue
+            device_path = os.path.join(_testdir, dev)
+            for datadir in os.listdir(device_path):
+                if 'object' not in datadir:
+                    continue
+                data_path = os.path.join(device_path, datadir)
+                rmtree(data_path, ignore_errors=True)
+                mkdirs(data_path)
+
+    def setUp(self):
+        debug.hub_exceptions(False)
+        self._cleanup_devices()
+
+    def tearDown(self):
+        debug.hub_exceptions(True)
+        self._cleanup_devices()
+
+    def _check_disconnect_cleans_up(self, policy_name, is_chunked=False):
+        proxy_port = _test_sockets[0].getsockname()[1]
+
+        def put(path, headers=None, body=None):
+            conn = httplib.HTTPConnection('localhost', proxy_port)
+            try:
+                conn.connect()
+                conn.putrequest('PUT', path)
+                for k, v in (headers or {}).items():
+                    conn.putheader(k, v)
+                conn.endheaders()
+                body = body or ['']
+                for chunk in body:
+                    if is_chunked:
+                        chunk = '%x\r\n%s\r\n' % (len(chunk), chunk)
+                    conn.send(chunk)
+                resp = conn.getresponse()
+                body = resp.read()
+            finally:
+                # seriously - shut this mother down
+                if conn.sock:
+                    conn.sock.fd._sock.close()
+            return resp, body
+
+        # ensure container
+        container_path = '/v1/a/%s-disconnect-test' % policy_name
+        resp, _body = put(container_path, headers={
+            'Connection': 'close',
+            'X-Storage-Policy': policy_name,
+            'Content-Length': '0',
+        })
+        self.assertIn(resp.status, (201, 202))
+
+        def exploding_body():
+            for i in range(3):
+                yield '\x00' * (64 * 2 ** 10)
+            raise Exception('kaboom!')
+
+        headers = {}
+        if is_chunked:
+            headers['Transfer-Encoding'] = 'chunked'
+        else:
+            headers['Content-Length'] = 64 * 2 ** 20
+
+        obj_path = container_path + '/disconnect-data'
+        try:
+            resp, _body = put(obj_path, headers=headers,
+                              body=exploding_body())
+        except Exception as e:
+            if str(e) != 'kaboom!':
+                raise
+        else:
+            self.fail('obj put connection did not ka-splod')
+
+        sleep(0.1)
+
+    def find_files(self):
+        found_files = defaultdict(list)
+        for root, dirs, files in os.walk(_testdir):
+            for fname in files:
+                filename, ext = os.path.splitext(fname)
+                found_files[ext].append(os.path.join(root, fname))
+        return found_files
+
+    def test_repl_disconnect_cleans_up(self):
+        self._check_disconnect_cleans_up('zero')
+        found_files = self.find_files()
+        self.assertEqual(found_files['.data'], [])
+
+    def test_ec_disconnect_cleans_up(self):
+        self._check_disconnect_cleans_up('ec')
+        found_files = self.find_files()
+        self.assertEqual(found_files['.durable'], [])
+        self.assertEqual(found_files['.data'], [])
+
+    def test_repl_chunked_transfer_disconnect_cleans_up(self):
+        self._check_disconnect_cleans_up('zero', is_chunked=True)
+        found_files = self.find_files()
+        self.assertEqual(found_files['.data'], [])
+
+    def test_ec_chunked_transfer_disconnect_cleans_up(self):
+        self._check_disconnect_cleans_up('ec', is_chunked=True)
+        found_files = self.find_files()
+        self.assertEqual(found_files['.durable'], [])
+        self.assertEqual(found_files['.data'], [])
+
+
 class TestObjectECRangedGET(unittest.TestCase):
     def setUp(self):
         _test_servers[0].logger._clear()