diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 61ff11763a..a91ce2aa72 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -53,7 +53,7 @@ from swift.common import constraints from swift.common.exceptions import ChunkReadTimeout, \ ChunkWriteTimeout, ConnectionTimeout, ResponseTimeout, \ InsufficientStorage, FooterNotSupported, MultiphasePUTNotSupported, \ - PutterConnectError + PutterConnectError, ChunkReadError from swift.common.http import ( is_success, is_server_error, HTTP_CONTINUE, HTTP_CREATED, HTTP_MULTIPLE_CHOICES, HTTP_INTERNAL_SERVER_ERROR, @@ -721,8 +721,13 @@ class BaseObjectController(Controller): if error_response: return error_response else: - reader = req.environ['wsgi.input'].read - data_source = iter(lambda: reader(self.app.client_chunk_size), '') + def reader(): + try: + return req.environ['wsgi.input'].read( + self.app.client_chunk_size) + except (ValueError, IOError) as e: + raise ChunkReadError(str(e)) + data_source = iter(reader, '') update_response = lambda req, resp: resp # check if object is set to be automatically deleted (i.e. expired) @@ -962,6 +967,12 @@ class ReplicatedObjectController(BaseObjectController): raise HTTPRequestTimeout(request=req) except HTTPException: raise + except ChunkReadError: + req.client_disconnect = True + self.app.logger.warn( + _('Client disconnected without sending last chunk')) + self.app.logger.increment('client_disconnects') + raise HTTPClientDisconnect(request=req) except (Exception, Timeout): self.app.logger.exception( _('ERROR Exception causing client disconnect')) @@ -2162,24 +2173,6 @@ class ECObjectController(BaseObjectController): try: chunk = next(data_source) except StopIteration: - computed_etag = (etag_hasher.hexdigest() - if etag_hasher else None) - received_etag = req.headers.get( - 'etag', '').strip('"') - if (computed_etag and received_etag and - computed_etag != received_etag): - raise HTTPUnprocessableEntity(request=req) - - send_chunk('') # flush out any buffered data - - for putter in putters: - trail_md = trailing_metadata( - policy, etag_hasher, - bytes_transferred, - chunk_index[putter]) - trail_md['Etag'] = \ - putter.chunk_hasher.hexdigest() - putter.end_of_object_data(trail_md) break bytes_transferred += len(chunk) if bytes_transferred > constraints.MAX_FILE_SIZE: @@ -2187,6 +2180,33 @@ class ECObjectController(BaseObjectController): send_chunk(chunk) + if req.content_length and ( + bytes_transferred < req.content_length): + req.client_disconnect = True + self.app.logger.warn( + _('Client disconnected without sending enough data')) + self.app.logger.increment('client_disconnects') + raise HTTPClientDisconnect(request=req) + + computed_etag = (etag_hasher.hexdigest() + if etag_hasher else None) + received_etag = req.headers.get( + 'etag', '').strip('"') + if (computed_etag and received_etag and + computed_etag != received_etag): + raise HTTPUnprocessableEntity(request=req) + + send_chunk('') # flush out any buffered data + + for putter in putters: + trail_md = trailing_metadata( + policy, etag_hasher, + bytes_transferred, + chunk_index[putter]) + trail_md['Etag'] = \ + putter.chunk_hasher.hexdigest() + putter.end_of_object_data(trail_md) + for putter in putters: putter.wait() @@ -2219,18 +2239,18 @@ class ECObjectController(BaseObjectController): _('ERROR Client read timeout (%ss)'), err.seconds) self.app.logger.increment('client_timeouts') raise HTTPRequestTimeout(request=req) + except ChunkReadError: + req.client_disconnect = True + self.app.logger.warn( + _('Client disconnected without sending last chunk')) + self.app.logger.increment('client_disconnects') + raise HTTPClientDisconnect(request=req) except HTTPException: raise except (Exception, Timeout): self.app.logger.exception( _('ERROR Exception causing client disconnect')) raise HTTPClientDisconnect(request=req) - if req.content_length and bytes_transferred < req.content_length: - req.client_disconnect = True - self.app.logger.warn( - _('Client disconnected without sending enough data')) - self.app.logger.increment('client_disconnects') - raise HTTPClientDisconnect(request=req) def _have_adequate_successes(self, statuses, min_responses): """ diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 4081f3a025..68518c2681 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -39,9 +39,11 @@ import functools from swift.obj import diskfile import re import random +from collections import defaultdict import mock -from eventlet import sleep, spawn, wsgi, listen, Timeout +from eventlet import sleep, spawn, wsgi, listen, Timeout, debug +from eventlet.green import httplib from six import BytesIO from six import StringIO from six.moves import range @@ -6072,6 +6074,119 @@ class TestECMismatchedFA(unittest.TestCase): self.assertEqual(resp.status_int, 503) +class TestObjectDisconnectCleanup(unittest.TestCase): + + # update this if you need to make more different devices in do_setup + device_pattern = re.compile('sd[a-z][0-9]') + + def _cleanup_devices(self): + # make sure all the object data is cleaned up + for dev in os.listdir(_testdir): + if not self.device_pattern.match(dev): + continue + device_path = os.path.join(_testdir, dev) + for datadir in os.listdir(device_path): + if 'object' not in datadir: + continue + data_path = os.path.join(device_path, datadir) + rmtree(data_path, ignore_errors=True) + mkdirs(data_path) + + def setUp(self): + debug.hub_exceptions(False) + self._cleanup_devices() + + def tearDown(self): + debug.hub_exceptions(True) + self._cleanup_devices() + + def _check_disconnect_cleans_up(self, policy_name, is_chunked=False): + proxy_port = _test_sockets[0].getsockname()[1] + + def put(path, headers=None, body=None): + conn = httplib.HTTPConnection('localhost', proxy_port) + try: + conn.connect() + conn.putrequest('PUT', path) + for k, v in (headers or {}).items(): + conn.putheader(k, v) + conn.endheaders() + body = body or [''] + for chunk in body: + if is_chunked: + chunk = '%x\r\n%s\r\n' % (len(chunk), chunk) + conn.send(chunk) + resp = conn.getresponse() + body = resp.read() + finally: + # seriously - shut this mother down + if conn.sock: + conn.sock.fd._sock.close() + return resp, body + + # ensure container + container_path = '/v1/a/%s-disconnect-test' % policy_name + resp, _body = put(container_path, headers={ + 'Connection': 'close', + 'X-Storage-Policy': policy_name, + 'Content-Length': '0', + }) + self.assertIn(resp.status, (201, 202)) + + def exploding_body(): + for i in range(3): + yield '\x00' * (64 * 2 ** 10) + raise Exception('kaboom!') + + headers = {} + if is_chunked: + headers['Transfer-Encoding'] = 'chunked' + else: + headers['Content-Length'] = 64 * 2 ** 20 + + obj_path = container_path + '/disconnect-data' + try: + resp, _body = put(obj_path, headers=headers, + body=exploding_body()) + except Exception as e: + if str(e) != 'kaboom!': + raise + else: + self.fail('obj put connection did not ka-splod') + + sleep(0.1) + + def find_files(self): + found_files = defaultdict(list) + for root, dirs, files in os.walk(_testdir): + for fname in files: + filename, ext = os.path.splitext(fname) + found_files[ext].append(os.path.join(root, fname)) + return found_files + + def test_repl_disconnect_cleans_up(self): + self._check_disconnect_cleans_up('zero') + found_files = self.find_files() + self.assertEqual(found_files['.data'], []) + + def test_ec_disconnect_cleans_up(self): + self._check_disconnect_cleans_up('ec') + found_files = self.find_files() + self.assertEqual(found_files['.durable'], []) + self.assertEqual(found_files['.data'], []) + + def test_repl_chunked_transfer_disconnect_cleans_up(self): + self._check_disconnect_cleans_up('zero', is_chunked=True) + found_files = self.find_files() + self.assertEqual(found_files['.data'], []) + + def test_ec_chunked_transfer_disconnect_cleans_up(self): + self._check_disconnect_cleans_up('ec', is_chunked=True) + found_files = self.find_files() + self.assertEqual(found_files['.durable'], []) + self.assertEqual(found_files['.data'], []) + + class TestObjectECRangedGET(unittest.TestCase): def setUp(self): _test_servers[0].logger._clear()