Get better at closing WSGI iterables.
PEP 333 (WSGI) says: "If the iterable returned by the application has a close() method, the server or gateway must call that method upon completion of the current request[.]" There's a bunch of places where we weren't doing that; some of them matter more than others. Calling .close() can prevent a connection leak in some cases. In others, it just provides a certain pedantic smugness. Either way, we should do what WSGI requires. Noteworthy goofs include: * If a client is downloading a large object and disconnects halfway through, a proxy -> obj connection may be leaked. In this case, the WSGI iterable is a SegmentedIterable, which lacked a close() method. Thus, when the WSGI server noticed the client disconnect, it had no way of telling the SegmentedIterable about it, and so the underlying iterable for the segment's data didn't get closed. Here, it seems likely (though unproven) that the object server would time out and kill the connection, or that a ChunkWriteTimeout would fire down in the proxy server, so the leaked connection would eventually go away. However, a flurry of client disconnects could leave a big pile of useless connections. * If a conditional request receives a 304 or 412, the underlying app_iter is not closed. This mostly affects conditional requests for large objects. The leaked connections were noticed by this patch's co-author, who made the changes to SegmentedIterable. Those changes helped, but did not completely fix, the issue. The rest of the patch is an attempt to plug the rest of the holes. Co-Authored-By: Romain LE DISEZ <romain.ledisez@ovh.net> Change-Id: I168e147aae7c1728e7e3fdabb7fba6f2d747d937 Closes-Bug: #1466549
This commit is contained in:
parent
65fce49b3b
commit
12d8a53fff
@ -22,7 +22,8 @@ from swift.common.http import is_success
|
||||
from swift.common.swob import Request, Response, \
|
||||
HTTPRequestedRangeNotSatisfiable, HTTPBadRequest, HTTPConflict
|
||||
from swift.common.utils import get_logger, json, \
|
||||
RateLimitedIterator, read_conf_dir, quote
|
||||
RateLimitedIterator, read_conf_dir, quote, close_if_possible, \
|
||||
closing_if_possible
|
||||
from swift.common.request_helpers import SegmentedIterable
|
||||
from swift.common.wsgi import WSGIContext, make_subrequest
|
||||
from urllib import unquote
|
||||
@ -48,7 +49,8 @@ class GetContext(WSGIContext):
|
||||
con_resp = con_req.get_response(self.dlo.app)
|
||||
if not is_success(con_resp.status_int):
|
||||
return con_resp, None
|
||||
return None, json.loads(''.join(con_resp.app_iter))
|
||||
with closing_if_possible(con_resp.app_iter):
|
||||
return None, json.loads(''.join(con_resp.app_iter))
|
||||
|
||||
def _segment_listing_iterator(self, req, version, account, container,
|
||||
prefix, segments, first_byte=None,
|
||||
@ -107,6 +109,7 @@ class GetContext(WSGIContext):
|
||||
# we've already started sending the response body to the
|
||||
# client, so all we can do is raise an exception to make the
|
||||
# WSGI server close the connection early
|
||||
close_if_possible(error_response.app_iter)
|
||||
raise ListingIterError(
|
||||
"Got status %d listing container /%s/%s" %
|
||||
(error_response.status_int, account, container))
|
||||
@ -233,6 +236,7 @@ class GetContext(WSGIContext):
|
||||
# make sure this response is for a dynamic large object manifest
|
||||
for header, value in self._response_headers:
|
||||
if (header.lower() == 'x-object-manifest'):
|
||||
close_if_possible(resp_iter)
|
||||
response = self.get_or_head_response(req, value)
|
||||
return response(req.environ, start_response)
|
||||
else:
|
||||
|
@ -159,9 +159,9 @@ from swift.common.swob import Request, HTTPBadRequest, HTTPServerError, \
|
||||
Response
|
||||
from swift.common.utils import json, get_logger, config_true_value, \
|
||||
get_valid_utf8_str, override_bytes_from_content_type, split_path, \
|
||||
register_swift_info, RateLimitedIterator, quote
|
||||
from swift.common.request_helpers import SegmentedIterable, \
|
||||
closing_if_possible, close_if_possible
|
||||
register_swift_info, RateLimitedIterator, quote, close_if_possible, \
|
||||
closing_if_possible
|
||||
from swift.common.request_helpers import SegmentedIterable
|
||||
from swift.common.constraints import check_utf8, MAX_BUFFERED_SLO_SEGMENTS
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_UNAUTHORIZED, is_success
|
||||
from swift.common.wsgi import WSGIContext, make_subrequest
|
||||
@ -239,6 +239,7 @@ class SloGetContext(WSGIContext):
|
||||
sub_resp = sub_req.get_response(self.slo.app)
|
||||
|
||||
if not is_success(sub_resp.status_int):
|
||||
close_if_possible(sub_resp.app_iter)
|
||||
raise ListingIterError(
|
||||
'ERROR: while fetching %s, GET of submanifest %s '
|
||||
'failed with status %d' % (req.path, sub_req.path,
|
||||
@ -412,7 +413,8 @@ class SloGetContext(WSGIContext):
|
||||
return response(req.environ, start_response)
|
||||
|
||||
def get_or_head_response(self, req, resp_headers, resp_iter):
|
||||
resp_body = ''.join(resp_iter)
|
||||
with closing_if_possible(resp_iter):
|
||||
resp_body = ''.join(resp_iter)
|
||||
try:
|
||||
segments = json.loads(resp_body)
|
||||
except ValueError:
|
||||
|
@ -23,7 +23,6 @@ from swob in here without creating circular imports.
|
||||
import hashlib
|
||||
import itertools
|
||||
import time
|
||||
from contextlib import contextmanager
|
||||
from urllib import unquote
|
||||
from swift import gettext_ as _
|
||||
from swift.common.storage_policy import POLICIES
|
||||
@ -32,7 +31,8 @@ from swift.common.exceptions import ListingIterError, SegmentError
|
||||
from swift.common.http import is_success
|
||||
from swift.common.swob import (HTTPBadRequest, HTTPNotAcceptable,
|
||||
HTTPServiceUnavailable)
|
||||
from swift.common.utils import split_path, validate_device_partition
|
||||
from swift.common.utils import split_path, validate_device_partition, \
|
||||
close_if_possible
|
||||
from swift.common.wsgi import make_subrequest
|
||||
|
||||
|
||||
@ -249,26 +249,6 @@ def copy_header_subset(from_r, to_r, condition):
|
||||
to_r.headers[k] = v
|
||||
|
||||
|
||||
def close_if_possible(maybe_closable):
|
||||
close_method = getattr(maybe_closable, 'close', None)
|
||||
if callable(close_method):
|
||||
return close_method()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def closing_if_possible(maybe_closable):
|
||||
"""
|
||||
Like contextlib.closing(), but doesn't crash if the object lacks a close()
|
||||
method.
|
||||
|
||||
PEP 333 (WSGI) says: "If the iterable returned by the application has a
|
||||
close() method, the server or gateway must call that method upon
|
||||
completion of the current request[.]" This function makes that easier.
|
||||
"""
|
||||
yield maybe_closable
|
||||
close_if_possible(maybe_closable)
|
||||
|
||||
|
||||
class SegmentedIterable(object):
|
||||
"""
|
||||
Iterable that returns the object contents for a large object.
|
||||
@ -304,6 +284,7 @@ class SegmentedIterable(object):
|
||||
self.peeked_chunk = None
|
||||
self.app_iter = self._internal_iter()
|
||||
self.validated_first_segment = False
|
||||
self.current_resp = None
|
||||
|
||||
def _internal_iter(self):
|
||||
start_time = time.time()
|
||||
@ -360,6 +341,8 @@ class SegmentedIterable(object):
|
||||
'r_size': seg_resp.content_length,
|
||||
's_etag': seg_etag,
|
||||
's_size': seg_size})
|
||||
else:
|
||||
self.current_resp = seg_resp
|
||||
|
||||
seg_hash = hashlib.md5()
|
||||
for chunk in seg_resp.app_iter:
|
||||
@ -431,3 +414,11 @@ class SegmentedIterable(object):
|
||||
return itertools.chain([pc], self.app_iter)
|
||||
else:
|
||||
return self.app_iter
|
||||
|
||||
def close(self):
|
||||
"""
|
||||
Called when the client disconnect. Ensure that the connection to the
|
||||
backend server is closed.
|
||||
"""
|
||||
if self.current_resp:
|
||||
close_if_possible(self.current_resp.app_iter)
|
||||
|
@ -49,7 +49,8 @@ import random
|
||||
import functools
|
||||
import inspect
|
||||
|
||||
from swift.common.utils import reiterate, split_path, Timestamp, pairs
|
||||
from swift.common.utils import reiterate, split_path, Timestamp, pairs, \
|
||||
close_if_possible
|
||||
from swift.common.exceptions import InvalidTimestamp
|
||||
|
||||
|
||||
@ -1220,12 +1221,14 @@ class Response(object):
|
||||
etag in self.request.if_none_match:
|
||||
self.status = 304
|
||||
self.content_length = 0
|
||||
close_if_possible(app_iter)
|
||||
return ['']
|
||||
|
||||
if etag and self.request.if_match and \
|
||||
etag not in self.request.if_match:
|
||||
self.status = 412
|
||||
self.content_length = 0
|
||||
close_if_possible(app_iter)
|
||||
return ['']
|
||||
|
||||
if self.status_int == 404 and self.request.if_match \
|
||||
@ -1236,18 +1239,21 @@ class Response(object):
|
||||
# Failed) response. [RFC 2616 section 14.24]
|
||||
self.status = 412
|
||||
self.content_length = 0
|
||||
close_if_possible(app_iter)
|
||||
return ['']
|
||||
|
||||
if self.last_modified and self.request.if_modified_since \
|
||||
and self.last_modified <= self.request.if_modified_since:
|
||||
self.status = 304
|
||||
self.content_length = 0
|
||||
close_if_possible(app_iter)
|
||||
return ['']
|
||||
|
||||
if self.last_modified and self.request.if_unmodified_since \
|
||||
and self.last_modified > self.request.if_unmodified_since:
|
||||
self.status = 412
|
||||
self.content_length = 0
|
||||
close_if_possible(app_iter)
|
||||
return ['']
|
||||
|
||||
if self.request and self.request.method == 'HEAD':
|
||||
@ -1261,6 +1267,7 @@ class Response(object):
|
||||
if ranges == []:
|
||||
self.status = 416
|
||||
self.content_length = 0
|
||||
close_if_possible(app_iter)
|
||||
return ['']
|
||||
elif ranges:
|
||||
range_size = len(ranges)
|
||||
|
@ -3144,6 +3144,28 @@ def ismount_raw(path):
|
||||
return False
|
||||
|
||||
|
||||
def close_if_possible(maybe_closable):
|
||||
close_method = getattr(maybe_closable, 'close', None)
|
||||
if callable(close_method):
|
||||
return close_method()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def closing_if_possible(maybe_closable):
|
||||
"""
|
||||
Like contextlib.closing(), but doesn't crash if the object lacks a close()
|
||||
method.
|
||||
|
||||
PEP 333 (WSGI) says: "If the iterable returned by the application has a
|
||||
close() method, the server or gateway must call that method upon
|
||||
completion of the current request[.]" This function makes that easier.
|
||||
"""
|
||||
try:
|
||||
yield maybe_closable
|
||||
finally:
|
||||
close_if_possible(maybe_closable)
|
||||
|
||||
|
||||
_rfc_token = r'[^()<>@,;:\"/\[\]?={}\x00-\x20\x7f]+'
|
||||
_rfc_extension_pattern = re.compile(
|
||||
r'(?:\s*;\s*(' + _rfc_token + r")\s*(?:=\s*(" + _rfc_token +
|
||||
|
@ -44,7 +44,7 @@ from swift.common.utils import (
|
||||
GreenAsyncPile, GreenthreadSafeIterator, json, Timestamp,
|
||||
normalize_delete_at_timestamp, public, get_expirer_container,
|
||||
document_iters_to_http_response_body, parse_content_range,
|
||||
quorum_size, reiterate)
|
||||
quorum_size, reiterate, close_if_possible)
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.constraints import check_metadata, check_object_creation, \
|
||||
check_copy_from_header, check_destination_header, \
|
||||
@ -70,7 +70,7 @@ from swift.common.swob import HTTPAccepted, HTTPBadRequest, HTTPNotFound, \
|
||||
HTTPClientDisconnect, HTTPUnprocessableEntity, Response, HTTPException, \
|
||||
HTTPRequestedRangeNotSatisfiable, Range
|
||||
from swift.common.request_helpers import is_sys_or_user_meta, is_sys_meta, \
|
||||
remove_items, copy_header_subset, close_if_possible
|
||||
remove_items, copy_header_subset
|
||||
|
||||
|
||||
def copy_headers_into(from_r, to_r):
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
# This stuff can't live in test/unit/__init__.py due to its swob dependency.
|
||||
|
||||
from collections import defaultdict
|
||||
from copy import deepcopy
|
||||
from hashlib import md5
|
||||
from swift.common import swob
|
||||
@ -23,6 +24,20 @@ from swift.common.utils import split_path
|
||||
from test.unit import FakeLogger, FakeRing
|
||||
|
||||
|
||||
class LeakTrackingIter(object):
|
||||
def __init__(self, inner_iter, fake_swift, path):
|
||||
self.inner_iter = inner_iter
|
||||
self.fake_swift = fake_swift
|
||||
self.path = path
|
||||
|
||||
def __iter__(self):
|
||||
for x in self.inner_iter:
|
||||
yield x
|
||||
|
||||
def close(self):
|
||||
self.fake_swift.mark_closed(self.path)
|
||||
|
||||
|
||||
class FakeSwift(object):
|
||||
"""
|
||||
A good-enough fake Swift proxy server to use in testing middleware.
|
||||
@ -30,6 +45,7 @@ class FakeSwift(object):
|
||||
|
||||
def __init__(self):
|
||||
self._calls = []
|
||||
self._unclosed_req_paths = defaultdict(int)
|
||||
self.req_method_paths = []
|
||||
self.swift_sources = []
|
||||
self.uploaded = {}
|
||||
@ -105,7 +121,21 @@ class FakeSwift(object):
|
||||
req = swob.Request(env)
|
||||
resp = resp_class(req=req, headers=headers, body=body,
|
||||
conditional_response=True)
|
||||
return resp(env, start_response)
|
||||
wsgi_iter = resp(env, start_response)
|
||||
self.mark_opened(path)
|
||||
return LeakTrackingIter(wsgi_iter, self, path)
|
||||
|
||||
def mark_opened(self, path):
|
||||
self._unclosed_req_paths[path] += 1
|
||||
|
||||
def mark_closed(self, path):
|
||||
self._unclosed_req_paths[path] -= 1
|
||||
|
||||
@property
|
||||
def unclosed_requests(self):
|
||||
return {path: count
|
||||
for path, count in self._unclosed_req_paths.items()
|
||||
if count > 0}
|
||||
|
||||
@property
|
||||
def calls(self):
|
||||
|
@ -26,6 +26,7 @@ import unittest
|
||||
|
||||
from swift.common import exceptions, swob
|
||||
from swift.common.middleware import dlo
|
||||
from swift.common.utils import closing_if_possible
|
||||
from test.unit.common.middleware.helpers import FakeSwift
|
||||
|
||||
|
||||
@ -54,8 +55,10 @@ class DloTestCase(unittest.TestCase):
|
||||
body = ''
|
||||
caught_exc = None
|
||||
try:
|
||||
for chunk in body_iter:
|
||||
body += chunk
|
||||
# appease the close-checker
|
||||
with closing_if_possible(body_iter):
|
||||
for chunk in body_iter:
|
||||
body += chunk
|
||||
except Exception as exc:
|
||||
if expect_exception:
|
||||
caught_exc = exc
|
||||
@ -279,6 +282,9 @@ class TestDloHeadManifest(DloTestCase):
|
||||
|
||||
|
||||
class TestDloGetManifest(DloTestCase):
|
||||
def tearDown(self):
|
||||
self.assertEqual(self.app.unclosed_requests, {})
|
||||
|
||||
def test_get_manifest(self):
|
||||
expected_etag = '"%s"' % md5hex(
|
||||
md5hex("aaaaa") + md5hex("bbbbb") + md5hex("ccccc") +
|
||||
|
@ -24,7 +24,7 @@ from swift.common import swob, utils
|
||||
from swift.common.exceptions import ListingIterError, SegmentError
|
||||
from swift.common.middleware import slo
|
||||
from swift.common.swob import Request, Response, HTTPException
|
||||
from swift.common.utils import quote, json
|
||||
from swift.common.utils import quote, json, closing_if_possible
|
||||
from test.unit.common.middleware.helpers import FakeSwift
|
||||
|
||||
|
||||
@ -74,8 +74,10 @@ class SloTestCase(unittest.TestCase):
|
||||
body = ''
|
||||
caught_exc = None
|
||||
try:
|
||||
for chunk in body_iter:
|
||||
body += chunk
|
||||
# appease the close-checker
|
||||
with closing_if_possible(body_iter):
|
||||
for chunk in body_iter:
|
||||
body += chunk
|
||||
except Exception as exc:
|
||||
if expect_exception:
|
||||
caught_exc = exc
|
||||
@ -232,7 +234,7 @@ class TestSloPutManifest(SloTestCase):
|
||||
'/?multipart-manifest=put',
|
||||
environ={'REQUEST_METHOD': 'PUT'}, body=test_json_data)
|
||||
self.assertEquals(
|
||||
self.slo.handle_multipart_put(req, fake_start_response),
|
||||
list(self.slo.handle_multipart_put(req, fake_start_response)),
|
||||
['passed'])
|
||||
|
||||
def test_handle_multipart_put_success(self):
|
||||
@ -949,6 +951,9 @@ class TestSloGetManifest(SloTestCase):
|
||||
'X-Object-Meta-Fish': 'Bass'},
|
||||
"[not {json (at ++++all")
|
||||
|
||||
def tearDown(self):
|
||||
self.assertEqual(self.app.unclosed_requests, {})
|
||||
|
||||
def test_get_manifest_passthrough(self):
|
||||
req = Request.blank(
|
||||
'/v1/AUTH_test/gettest/manifest-bc?multipart-manifest=get',
|
||||
|
Loading…
Reference in New Issue
Block a user