diff --git a/swift/common/middleware/dlo.py b/swift/common/middleware/dlo.py index d2761acb..9330ccb8 100644 --- a/swift/common/middleware/dlo.py +++ b/swift/common/middleware/dlo.py @@ -22,7 +22,8 @@ from swift.common.http import is_success from swift.common.swob import Request, Response, \ HTTPRequestedRangeNotSatisfiable, HTTPBadRequest, HTTPConflict from swift.common.utils import get_logger, json, \ - RateLimitedIterator, read_conf_dir, quote + RateLimitedIterator, read_conf_dir, quote, close_if_possible, \ + closing_if_possible from swift.common.request_helpers import SegmentedIterable from swift.common.wsgi import WSGIContext, make_subrequest from urllib import unquote @@ -48,7 +49,8 @@ class GetContext(WSGIContext): con_resp = con_req.get_response(self.dlo.app) if not is_success(con_resp.status_int): return con_resp, None - return None, json.loads(''.join(con_resp.app_iter)) + with closing_if_possible(con_resp.app_iter): + return None, json.loads(''.join(con_resp.app_iter)) def _segment_listing_iterator(self, req, version, account, container, prefix, segments, first_byte=None, @@ -107,6 +109,7 @@ class GetContext(WSGIContext): # we've already started sending the response body to the # client, so all we can do is raise an exception to make the # WSGI server close the connection early + close_if_possible(error_response.app_iter) raise ListingIterError( "Got status %d listing container /%s/%s" % (error_response.status_int, account, container)) @@ -233,6 +236,7 @@ class GetContext(WSGIContext): # make sure this response is for a dynamic large object manifest for header, value in self._response_headers: if (header.lower() == 'x-object-manifest'): + close_if_possible(resp_iter) response = self.get_or_head_response(req, value) return response(req.environ, start_response) else: diff --git a/swift/common/middleware/slo.py b/swift/common/middleware/slo.py index 241210d6..4fce4f9d 100644 --- a/swift/common/middleware/slo.py +++ b/swift/common/middleware/slo.py @@ -159,9 +159,9 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \ Response from swift.common.utils import json, get_logger, config_true_value, \ get_valid_utf8_str, override_bytes_from_content_type, split_path, \ - register_swift_info, RateLimitedIterator, quote -from swift.common.request_helpers import SegmentedIterable, \ - closing_if_possible, close_if_possible + register_swift_info, RateLimitedIterator, quote, close_if_possible, \ + closing_if_possible +from swift.common.request_helpers import SegmentedIterable from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success from swift.common.wsgi import WSGIContext, make_subrequest @@ -239,6 +239,7 @@ class SloGetContext(WSGIContext): sub_resp = sub_req.get_response(self.slo.app) if not is_success(sub_resp.status_int): + close_if_possible(sub_resp.app_iter) raise ListingIterError( 'ERROR: while fetching %s, GET of submanifest %s ' 'failed with status %d' % (req.path, sub_req.path, @@ -412,7 +413,8 @@ class SloGetContext(WSGIContext): return response(req.environ, start_response) def get_or_head_response(self, req, resp_headers, resp_iter): - resp_body = ''.join(resp_iter) + with closing_if_possible(resp_iter): + resp_body = ''.join(resp_iter) try: segments = json.loads(resp_body) except ValueError: diff --git a/swift/common/request_helpers.py b/swift/common/request_helpers.py index c9da1cb7..c7d551c3 100644 --- a/swift/common/request_helpers.py +++ b/swift/common/request_helpers.py @@ -23,7 +23,6 @@ from swob in here without creating circular imports. import hashlib import itertools import time -from contextlib import contextmanager from urllib import unquote from swift import gettext_ as _ from swift.common.storage_policy import POLICIES @@ -32,7 +31,8 @@ from swift.common.exceptions import ListingIterError, SegmentError from swift.common.http import is_success from swift.common.swob import (HTTPBadRequest, HTTPNotAcceptable, HTTPServiceUnavailable) -from swift.common.utils import split_path, validate_device_partition +from swift.common.utils import split_path, validate_device_partition, \ + close_if_possible from swift.common.wsgi import make_subrequest @@ -249,26 +249,6 @@ def copy_header_subset(from_r, to_r, condition): to_r.headers[k] = v -def close_if_possible(maybe_closable): - close_method = getattr(maybe_closable, 'close', None) - if callable(close_method): - return close_method() - - -@contextmanager -def closing_if_possible(maybe_closable): - """ - Like contextlib.closing(), but doesn't crash if the object lacks a close() - method. - - PEP 333 (WSGI) says: "If the iterable returned by the application has a - close() method, the server or gateway must call that method upon - completion of the current request[.]" This function makes that easier. - """ - yield maybe_closable - close_if_possible(maybe_closable) - - class SegmentedIterable(object): """ Iterable that returns the object contents for a large object. @@ -304,6 +284,7 @@ class SegmentedIterable(object): self.peeked_chunk = None self.app_iter = self._internal_iter() self.validated_first_segment = False + self.current_resp = None def _internal_iter(self): start_time = time.time() @@ -360,6 +341,8 @@ class SegmentedIterable(object): 'r_size': seg_resp.content_length, 's_etag': seg_etag, 's_size': seg_size}) + else: + self.current_resp = seg_resp seg_hash = hashlib.md5() for chunk in seg_resp.app_iter: @@ -431,3 +414,11 @@ class SegmentedIterable(object): return itertools.chain([pc], self.app_iter) else: return self.app_iter + + def close(self): + """ + Called when the client disconnect. Ensure that the connection to the + backend server is closed. + """ + if self.current_resp: + close_if_possible(self.current_resp.app_iter) diff --git a/swift/common/swob.py b/swift/common/swob.py index 39f0c0e3..b35be684 100644 --- a/swift/common/swob.py +++ b/swift/common/swob.py @@ -49,7 +49,8 @@ import random import functools import inspect -from swift.common.utils import reiterate, split_path, Timestamp, pairs +from swift.common.utils import reiterate, split_path, Timestamp, pairs, \ + close_if_possible from swift.common.exceptions import InvalidTimestamp @@ -1220,12 +1221,14 @@ class Response(object): etag in self.request.if_none_match: self.status = 304 self.content_length = 0 + close_if_possible(app_iter) return [''] if etag and self.request.if_match and \ etag not in self.request.if_match: self.status = 412 self.content_length = 0 + close_if_possible(app_iter) return [''] if self.status_int == 404 and self.request.if_match \ @@ -1236,18 +1239,21 @@ class Response(object): # Failed) response. [RFC 2616 section 14.24] self.status = 412 self.content_length = 0 + close_if_possible(app_iter) return [''] if self.last_modified and self.request.if_modified_since \ and self.last_modified <= self.request.if_modified_since: self.status = 304 self.content_length = 0 + close_if_possible(app_iter) return [''] if self.last_modified and self.request.if_unmodified_since \ and self.last_modified > self.request.if_unmodified_since: self.status = 412 self.content_length = 0 + close_if_possible(app_iter) return [''] if self.request and self.request.method == 'HEAD': @@ -1261,6 +1267,7 @@ class Response(object): if ranges == []: self.status = 416 self.content_length = 0 + close_if_possible(app_iter) return [''] elif ranges: range_size = len(ranges) diff --git a/swift/common/utils.py b/swift/common/utils.py index 63919af1..e2e94b2e 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -3161,6 +3161,28 @@ def ismount_raw(path): return False +def close_if_possible(maybe_closable): + close_method = getattr(maybe_closable, 'close', None) + if callable(close_method): + return close_method() + + +@contextmanager +def closing_if_possible(maybe_closable): + """ + Like contextlib.closing(), but doesn't crash if the object lacks a close() + method. + + PEP 333 (WSGI) says: "If the iterable returned by the application has a + close() method, the server or gateway must call that method upon + completion of the current request[.]" This function makes that easier. + """ + try: + yield maybe_closable + finally: + close_if_possible(maybe_closable) + + _rfc_token = r'[^()<>@,;:\"/\[\]?={}\x00-\x20\x7f]+' _rfc_extension_pattern = re.compile( r'(?:\s*;\s*(' + _rfc_token + r")\s*(?:=\s*(" + _rfc_token + diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 10e83bca..609f21b5 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -44,7 +44,7 @@ from swift.common.utils import ( GreenAsyncPile, GreenthreadSafeIterator, json, Timestamp, normalize_delete_at_timestamp, public, get_expirer_container, document_iters_to_http_response_body, parse_content_range, - quorum_size, reiterate) + quorum_size, reiterate, close_if_possible) from swift.common.bufferedhttp import http_connect from swift.common.constraints import check_metadata, check_object_creation, \ check_copy_from_header, check_destination_header, \ @@ -70,7 +70,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \ HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException, \ HTTPRequestedRangeNotSatisfiable, Range from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \ - remove_items, copy_header_subset, close_if_possible + remove_items, copy_header_subset def copy_headers_into(from_r, to_r): diff --git a/test/unit/common/middleware/helpers.py b/test/unit/common/middleware/helpers.py index 68a4bfee..7c1b4557 100644 --- a/test/unit/common/middleware/helpers.py +++ b/test/unit/common/middleware/helpers.py @@ -15,6 +15,7 @@ # This stuff can't live in test/unit/__init__.py due to its swob dependency. +from collections import defaultdict from copy import deepcopy from hashlib import md5 from swift.common import swob @@ -23,6 +24,20 @@ from swift.common.utils import split_path from test.unit import FakeLogger, FakeRing +class LeakTrackingIter(object): + def __init__(self, inner_iter, fake_swift, path): + self.inner_iter = inner_iter + self.fake_swift = fake_swift + self.path = path + + def __iter__(self): + for x in self.inner_iter: + yield x + + def close(self): + self.fake_swift.mark_closed(self.path) + + class FakeSwift(object): """ A good-enough fake Swift proxy server to use in testing middleware. @@ -30,6 +45,7 @@ class FakeSwift(object): def __init__(self): self._calls = [] + self._unclosed_req_paths = defaultdict(int) self.req_method_paths = [] self.swift_sources = [] self.uploaded = {} @@ -105,7 +121,21 @@ class FakeSwift(object): req = swob.Request(env) resp = resp_class(req=req, headers=headers, body=body, conditional_response=True) - return resp(env, start_response) + wsgi_iter = resp(env, start_response) + self.mark_opened(path) + return LeakTrackingIter(wsgi_iter, self, path) + + def mark_opened(self, path): + self._unclosed_req_paths[path] += 1 + + def mark_closed(self, path): + self._unclosed_req_paths[path] -= 1 + + @property + def unclosed_requests(self): + return {path: count + for path, count in self._unclosed_req_paths.items() + if count > 0} @property def calls(self): diff --git a/test/unit/common/middleware/test_dlo.py b/test/unit/common/middleware/test_dlo.py index 16237eb1..119e4aba 100644 --- a/test/unit/common/middleware/test_dlo.py +++ b/test/unit/common/middleware/test_dlo.py @@ -26,6 +26,7 @@ import unittest from swift.common import exceptions, swob from swift.common.middleware import dlo +from swift.common.utils import closing_if_possible from test.unit.common.middleware.helpers import FakeSwift @@ -54,8 +55,10 @@ class DloTestCase(unittest.TestCase): body = '' caught_exc = None try: - for chunk in body_iter: - body += chunk + # appease the close-checker + with closing_if_possible(body_iter): + for chunk in body_iter: + body += chunk except Exception as exc: if expect_exception: caught_exc = exc @@ -279,6 +282,9 @@ class TestDloHeadManifest(DloTestCase): class TestDloGetManifest(DloTestCase): + def tearDown(self): + self.assertEqual(self.app.unclosed_requests, {}) + def test_get_manifest(self): expected_etag = '"%s"' % md5hex( md5hex("aaaaa") + md5hex("bbbbb") + md5hex("ccccc") + diff --git a/test/unit/common/middleware/test_slo.py b/test/unit/common/middleware/test_slo.py index 86a11734..d5129da4 100644 --- a/test/unit/common/middleware/test_slo.py +++ b/test/unit/common/middleware/test_slo.py @@ -24,7 +24,7 @@ from swift.common import swob, utils from swift.common.exceptions import ListingIterError, SegmentError from swift.common.middleware import slo from swift.common.swob import Request, Response, HTTPException -from swift.common.utils import quote, json +from swift.common.utils import quote, json, closing_if_possible from test.unit.common.middleware.helpers import FakeSwift @@ -74,8 +74,10 @@ class SloTestCase(unittest.TestCase): body = '' caught_exc = None try: - for chunk in body_iter: - body += chunk + # appease the close-checker + with closing_if_possible(body_iter): + for chunk in body_iter: + body += chunk except Exception as exc: if expect_exception: caught_exc = exc @@ -232,7 +234,7 @@ class TestSloPutManifest(SloTestCase): '/?multipart-manifest=put', environ={'REQUEST_METHOD': 'PUT'}, body=test_json_data) self.assertEquals( - self.slo.handle_multipart_put(req, fake_start_response), + list(self.slo.handle_multipart_put(req, fake_start_response)), ['passed']) def test_handle_multipart_put_success(self): @@ -949,6 +951,9 @@ class TestSloGetManifest(SloTestCase): 'X-Object-Meta-Fish': 'Bass'}, "[not {json (at ++++all") + def tearDown(self): + self.assertEqual(self.app.unclosed_requests, {}) + def test_get_manifest_passthrough(self): req = Request.blank( '/v1/AUTH_test/gettest/manifest-bc?multipart-manifest=get',