middlewares: Clean up app iters better

Previously, logs would often show 499s in places where some other status
would be more appropriate.

Change-Id: I68dbb8593101cd3b5b64a1a947c68e340e36ce02
This commit is contained in:
Tim Burke 2020-01-02 15:19:27 -08:00
parent baaa5c57e1
commit 2a8d47f00e
13 changed files with 111 additions and 63 deletions

View File

@ -203,7 +203,7 @@ from cgi import parse_header
from swift.common.utils import get_logger, register_swift_info, split_path, \ from swift.common.utils import get_logger, register_swift_info, split_path, \
MD5_OF_EMPTY_STRING, close_if_possible, closing_if_possible, \ MD5_OF_EMPTY_STRING, close_if_possible, closing_if_possible, \
config_true_value config_true_value, drain_and_close
from swift.common.constraints import check_account_format from swift.common.constraints import check_account_format
from swift.common.wsgi import WSGIContext, make_subrequest from swift.common.wsgi import WSGIContext, make_subrequest
from swift.common.request_helpers import get_sys_meta_prefix, \ from swift.common.request_helpers import get_sys_meta_prefix, \
@ -468,7 +468,8 @@ class SymlinkObjectContext(WSGIContext):
resp_etag = self._response_header_value( resp_etag = self._response_header_value(
TGT_ETAG_SYSMETA_SYMLINK_HDR) TGT_ETAG_SYSMETA_SYMLINK_HDR)
if symlink_target and (resp_etag or follow_softlinks): if symlink_target and (resp_etag or follow_softlinks):
close_if_possible(resp) # Should be a zero-byte object
drain_and_close(resp)
found_etag = resp_etag or self._response_header_value('etag') found_etag = resp_etag or self._response_header_value('etag')
if target_etag and target_etag != found_etag: if target_etag and target_etag != found_etag:
raise HTTPConflict( raise HTTPConflict(
@ -491,6 +492,7 @@ class SymlinkObjectContext(WSGIContext):
else: else:
final_etag = self._response_header_value('etag') final_etag = self._response_header_value('etag')
if final_etag and target_etag and target_etag != final_etag: if final_etag and target_etag and target_etag != final_etag:
# do *not* drain; we don't know how big this is
close_if_possible(resp) close_if_possible(resp)
body = ('Object Etag %r does not match ' body = ('Object Etag %r does not match '
'X-Symlink-Target-Etag header %r') 'X-Symlink-Target-Etag header %r')
@ -538,9 +540,7 @@ class SymlinkObjectContext(WSGIContext):
'Content-Type': 'text/plain', 'Content-Type': 'text/plain',
'Content-Location': self._last_target_path}) 'Content-Location': self._last_target_path})
if not is_success(self._get_status_int()): if not is_success(self._get_status_int()):
with closing_if_possible(resp): drain_and_close(resp)
for chunk in resp:
pass
raise status_map[self._get_status_int()](request=req) raise status_map[self._get_status_int()](request=req)
response_headers = HeaderKeyDict(self._response_headers) response_headers = HeaderKeyDict(self._response_headers)
# carry forward any etag update params (e.g. "slo_etag"), we'll append # carry forward any etag update params (e.g. "slo_etag"), we'll append

View File

@ -230,7 +230,7 @@ import json
import time import time
from swift.common.utils import get_logger, Timestamp, \ from swift.common.utils import get_logger, Timestamp, \
config_true_value, close_if_possible, FileLikeIter config_true_value, close_if_possible, FileLikeIter, drain_and_close
from swift.common.request_helpers import get_sys_meta_prefix, \ from swift.common.request_helpers import get_sys_meta_prefix, \
copy_header_subset copy_header_subset
from swift.common.wsgi import WSGIContext, make_pre_authed_request from swift.common.wsgi import WSGIContext, make_pre_authed_request
@ -341,7 +341,8 @@ class VersionedWritesContext(WSGIContext):
lreq.environ['QUERY_STRING'] += '&reverse=on' lreq.environ['QUERY_STRING'] += '&reverse=on'
lresp = lreq.get_response(self.app) lresp = lreq.get_response(self.app)
if not is_success(lresp.status_int): if not is_success(lresp.status_int):
close_if_possible(lresp.app_iter) # errors should be short
drain_and_close(lresp)
if lresp.status_int == HTTP_NOT_FOUND: if lresp.status_int == HTTP_NOT_FOUND:
raise ListingIterNotFound() raise ListingIterNotFound()
elif is_client_error(lresp.status_int): elif is_client_error(lresp.status_int):
@ -382,6 +383,8 @@ class VersionedWritesContext(WSGIContext):
if source_resp.content_length is None or \ if source_resp.content_length is None or \
source_resp.content_length > MAX_FILE_SIZE: source_resp.content_length > MAX_FILE_SIZE:
# Consciously *don't* drain the response before closing;
# any logged 499 is actually rather appropriate here
close_if_possible(source_resp.app_iter) close_if_possible(source_resp.app_iter)
return HTTPRequestEntityTooLarge(request=req) return HTTPRequestEntityTooLarge(request=req)
@ -402,6 +405,7 @@ class VersionedWritesContext(WSGIContext):
put_req.environ['wsgi.input'] = FileLikeIter(source_resp.app_iter) put_req.environ['wsgi.input'] = FileLikeIter(source_resp.app_iter)
put_resp = put_req.get_response(self.app) put_resp = put_req.get_response(self.app)
# the PUT was responsible for draining
close_if_possible(source_resp.app_iter) close_if_possible(source_resp.app_iter)
return put_resp return put_resp
@ -411,7 +415,8 @@ class VersionedWritesContext(WSGIContext):
""" """
if is_success(resp.status_int): if is_success(resp.status_int):
return return
close_if_possible(resp.app_iter) # any error should be short
drain_and_close(resp)
if is_client_error(resp.status_int): if is_client_error(resp.status_int):
# missing container or bad permissions # missing container or bad permissions
raise HTTPPreconditionFailed(request=req) raise HTTPPreconditionFailed(request=req)
@ -444,7 +449,7 @@ class VersionedWritesContext(WSGIContext):
if get_resp.status_int == HTTP_NOT_FOUND: if get_resp.status_int == HTTP_NOT_FOUND:
# nothing to version, proceed with original request # nothing to version, proceed with original request
close_if_possible(get_resp.app_iter) drain_and_close(get_resp)
return return
# check for any other errors # check for any other errors
@ -466,7 +471,8 @@ class VersionedWritesContext(WSGIContext):
put_resp = self._put_versioned_obj(req, put_path_info, get_resp) put_resp = self._put_versioned_obj(req, put_path_info, get_resp)
self._check_response_error(req, put_resp) self._check_response_error(req, put_resp)
close_if_possible(put_resp.app_iter) # successful PUT response should be short
drain_and_close(put_resp)
def handle_obj_versions_put(self, req, versions_cont, api_version, def handle_obj_versions_put(self, req, versions_cont, api_version,
account_name, object_name): account_name, object_name):
@ -521,7 +527,7 @@ class VersionedWritesContext(WSGIContext):
marker_req.environ['swift.content_type_overridden'] = True marker_req.environ['swift.content_type_overridden'] = True
marker_resp = marker_req.get_response(self.app) marker_resp = marker_req.get_response(self.app)
self._check_response_error(req, marker_resp) self._check_response_error(req, marker_resp)
close_if_possible(marker_resp.app_iter) drain_and_close(marker_resp)
# successfully copied and created delete marker; safe to delete # successfully copied and created delete marker; safe to delete
return self.app return self.app
@ -535,7 +541,7 @@ class VersionedWritesContext(WSGIContext):
# if the version isn't there, keep trying with previous version # if the version isn't there, keep trying with previous version
if get_resp.status_int == HTTP_NOT_FOUND: if get_resp.status_int == HTTP_NOT_FOUND:
close_if_possible(get_resp.app_iter) drain_and_close(get_resp)
return False return False
self._check_response_error(req, get_resp) self._check_response_error(req, get_resp)
@ -545,7 +551,7 @@ class VersionedWritesContext(WSGIContext):
put_resp = self._put_versioned_obj(req, put_path_info, get_resp) put_resp = self._put_versioned_obj(req, put_path_info, get_resp)
self._check_response_error(req, put_resp) self._check_response_error(req, put_resp)
close_if_possible(put_resp.app_iter) drain_and_close(put_resp)
return get_path return get_path
def handle_obj_versions_delete_pop(self, req, versions_cont, api_version, def handle_obj_versions_delete_pop(self, req, versions_cont, api_version,
@ -591,7 +597,7 @@ class VersionedWritesContext(WSGIContext):
req.environ, path=wsgi_quote(req.path_info), method='HEAD', req.environ, path=wsgi_quote(req.path_info), method='HEAD',
headers=obj_head_headers, swift_source='VW') headers=obj_head_headers, swift_source='VW')
hresp = head_req.get_response(self.app) hresp = head_req.get_response(self.app)
close_if_possible(hresp.app_iter) drain_and_close(hresp)
if hresp.status_int != HTTP_NOT_FOUND: if hresp.status_int != HTTP_NOT_FOUND:
self._check_response_error(req, hresp) self._check_response_error(req, hresp)
@ -619,7 +625,7 @@ class VersionedWritesContext(WSGIContext):
method='DELETE', headers=auth_token_header, method='DELETE', headers=auth_token_header,
swift_source='VW') swift_source='VW')
del_resp = old_del_req.get_response(self.app) del_resp = old_del_req.get_response(self.app)
close_if_possible(del_resp.app_iter) drain_and_close(del_resp)
if del_resp.status_int != HTTP_NOT_FOUND: if del_resp.status_int != HTTP_NOT_FOUND:
self._check_response_error(req, del_resp) self._check_response_error(req, del_resp)
# else, well, it existed long enough to do the # else, well, it existed long enough to do the

View File

@ -165,7 +165,7 @@ from swift.common.swob import HTTPPreconditionFailed, HTTPServiceUnavailable, \
HTTPRequestEntityTooLarge, HTTPInternalServerError, HTTPNotAcceptable, \ HTTPRequestEntityTooLarge, HTTPInternalServerError, HTTPNotAcceptable, \
HTTPConflict HTTPConflict
from swift.common.storage_policy import POLICIES from swift.common.storage_policy import POLICIES
from swift.common.utils import get_logger, Timestamp, \ from swift.common.utils import get_logger, Timestamp, drain_and_close, \
config_true_value, close_if_possible, closing_if_possible, \ config_true_value, close_if_possible, closing_if_possible, \
FileLikeIter, split_path, parse_content_type, RESERVED_STR FileLikeIter, split_path, parse_content_type, RESERVED_STR
from swift.common.wsgi import WSGIContext, make_pre_authed_request from swift.common.wsgi import WSGIContext, make_pre_authed_request
@ -288,6 +288,8 @@ class ObjectContext(ObjectVersioningContext):
put_req.headers['Content-Type'] += '; swift_bytes=%s' % slo_size put_req.headers['Content-Type'] += '; swift_bytes=%s' % slo_size
put_req.environ['swift.content_type_overridden'] = True put_req.environ['swift.content_type_overridden'] = True
put_resp = put_req.get_response(self.app) put_resp = put_req.get_response(self.app)
drain_and_close(put_resp)
# the PUT should have already drained source_resp
close_if_possible(source_resp.app_iter) close_if_possible(source_resp.app_iter)
return put_resp return put_resp
@ -324,19 +326,16 @@ class ObjectContext(ObjectVersioningContext):
# do the write # do the write
put_resp = put_req.get_response(self.app) put_resp = put_req.get_response(self.app)
drain_and_close(put_resp)
close_if_possible(put_req.environ['wsgi.input'])
if put_resp.status_int == HTTP_NOT_FOUND: if put_resp.status_int == HTTP_NOT_FOUND:
close_if_possible(put_resp.app_iter)
raise HTTPInternalServerError( raise HTTPInternalServerError(
request=req, content_type='text/plain', request=req, content_type='text/plain',
body=b'The versions container does not exist. You may ' body=b'The versions container does not exist. You may '
b'want to re-enable object versioning.') b'want to re-enable object versioning.')
self._check_response_error(req, put_resp) self._check_response_error(req, put_resp)
with closing_if_possible(put_resp.app_iter), closing_if_possible(
put_req.environ['wsgi.input']):
for chunk in put_resp.app_iter:
pass
put_bytes = byte_counter.bytes_read put_bytes = byte_counter.bytes_read
# N.B. this is essentially the same hack that symlink does in # N.B. this is essentially the same hack that symlink does in
# _validate_etag_and_update_sysmeta to deal with SLO # _validate_etag_and_update_sysmeta to deal with SLO
@ -390,7 +389,7 @@ class ObjectContext(ObjectVersioningContext):
""" """
if is_success(resp.status_int): if is_success(resp.status_int):
return return
close_if_possible(resp.app_iter) drain_and_close(resp)
if is_client_error(resp.status_int): if is_client_error(resp.status_int):
# missing container or bad permissions # missing container or bad permissions
if resp.status_int == 404: if resp.status_int == 404:
@ -429,10 +428,7 @@ class ObjectContext(ObjectVersioningContext):
if get_resp.status_int == HTTP_NOT_FOUND: if get_resp.status_int == HTTP_NOT_FOUND:
# nothing to version, proceed with original request # nothing to version, proceed with original request
for chunk in get_resp.app_iter: drain_and_close(get_resp)
# Should be short; just avoiding the 499
pass
close_if_possible(get_resp.app_iter)
return get_resp return get_resp
# check for any other errors # check for any other errors
@ -440,7 +436,7 @@ class ObjectContext(ObjectVersioningContext):
if get_resp.headers.get(SYSMETA_VERSIONS_SYMLINK) == 'true': if get_resp.headers.get(SYSMETA_VERSIONS_SYMLINK) == 'true':
# existing object is a VW symlink; no action required # existing object is a VW symlink; no action required
close_if_possible(get_resp.app_iter) drain_and_close(get_resp)
return get_resp return get_resp
# if there's an existing object, then copy it to # if there's an existing object, then copy it to
@ -458,15 +454,12 @@ class ObjectContext(ObjectVersioningContext):
put_resp = self._put_versioned_obj(req, put_path_info, get_resp) put_resp = self._put_versioned_obj(req, put_path_info, get_resp)
if put_resp.status_int == HTTP_NOT_FOUND: if put_resp.status_int == HTTP_NOT_FOUND:
close_if_possible(put_resp.app_iter)
raise HTTPInternalServerError( raise HTTPInternalServerError(
request=req, content_type='text/plain', request=req, content_type='text/plain',
body=b'The versions container does not exist. You may ' body=b'The versions container does not exist. You may '
b'want to re-enable object versioning.') b'want to re-enable object versioning.')
self._check_response_error(req, put_resp) self._check_response_error(req, put_resp)
close_if_possible(put_resp.app_iter)
return put_resp
def handle_put(self, req, versions_cont, api_version, def handle_put(self, req, versions_cont, api_version,
account_name, object_name, is_enabled): account_name, object_name, is_enabled):
@ -553,7 +546,7 @@ class ObjectContext(ObjectVersioningContext):
marker_req.environ['swift.content_type_overridden'] = True marker_req.environ['swift.content_type_overridden'] = True
marker_resp = marker_req.get_response(self.app) marker_resp = marker_req.get_response(self.app)
self._check_response_error(req, marker_resp) self._check_response_error(req, marker_resp)
close_if_possible(marker_resp.app_iter) drain_and_close(marker_resp)
# successfully copied and created delete marker; safe to delete # successfully copied and created delete marker; safe to delete
resp = req.get_response(self.app) resp = req.get_response(self.app)
@ -561,7 +554,7 @@ class ObjectContext(ObjectVersioningContext):
resp.headers['X-Object-Version-Id'] = \ resp.headers['X-Object-Version-Id'] = \
self._split_version_from_name(marker_name)[1].internal self._split_version_from_name(marker_name)[1].internal
resp.headers['X-Backend-Content-Type'] = DELETE_MARKER_CONTENT_TYPE resp.headers['X-Backend-Content-Type'] = DELETE_MARKER_CONTENT_TYPE
close_if_possible(resp.app_iter) drain_and_close(resp)
return resp return resp
def handle_post(self, req, versions_cont, account): def handle_post(self, req, versions_cont, account):
@ -595,7 +588,7 @@ class ObjectContext(ObjectVersioningContext):
# Only follow if the version container matches # Only follow if the version container matches
if split_path(loc, 4, 4, True)[1:3] == [ if split_path(loc, 4, 4, True)[1:3] == [
account, versions_cont]: account, versions_cont]:
close_if_possible(resp.app_iter) drain_and_close(resp)
post_req.path_info = loc post_req.path_info = loc
resp = post_req.get_response(self.app) resp = post_req.get_response(self.app)
return resp return resp
@ -620,7 +613,7 @@ class ObjectContext(ObjectVersioningContext):
self._check_response_error(req, hresp) self._check_response_error(req, hresp)
if hresp.headers.get(SYSMETA_VERSIONS_SYMLINK) == 'true': if hresp.headers.get(SYSMETA_VERSIONS_SYMLINK) == 'true':
symlink_target = hresp.headers.get(TGT_OBJ_SYMLINK_HDR) symlink_target = hresp.headers.get(TGT_OBJ_SYMLINK_HDR)
close_if_possible(hresp.app_iter) drain_and_close(hresp)
return head_is_tombstone, symlink_target return head_is_tombstone, symlink_target
def handle_delete_version(self, req, versions_cont, api_version, def handle_delete_version(self, req, versions_cont, api_version,
@ -656,7 +649,7 @@ class ObjectContext(ObjectVersioningContext):
req.environ['QUERY_STRING'] = '' req.environ['QUERY_STRING'] = ''
link_resp = req.get_response(self.app) link_resp = req.get_response(self.app)
self._check_response_error(req, link_resp) self._check_response_error(req, link_resp)
close_if_possible(link_resp.app_iter) drain_and_close(link_resp)
# *then* the backing data # *then* the backing data
req.path_info = "/%s/%s/%s/%s" % ( req.path_info = "/%s/%s/%s/%s" % (
@ -693,7 +686,7 @@ class ObjectContext(ObjectVersioningContext):
method='HEAD', headers=obj_head_headers, swift_source='OV') method='HEAD', headers=obj_head_headers, swift_source='OV')
head_resp = head_req.get_response(self.app) head_resp = head_req.get_response(self.app)
if head_resp.status_int == HTTP_NOT_FOUND: if head_resp.status_int == HTTP_NOT_FOUND:
close_if_possible(head_resp.app_iter) drain_and_close(head_resp)
if is_success(get_container_info( if is_success(get_container_info(
head_req.environ, self.app, swift_source='OV')['status']): head_req.environ, self.app, swift_source='OV')['status']):
raise HTTPNotFound( raise HTTPNotFound(
@ -706,7 +699,7 @@ class ObjectContext(ObjectVersioningContext):
b'want to re-enable object versioning.') b'want to re-enable object versioning.')
self._check_response_error(req, head_resp) self._check_response_error(req, head_resp)
close_if_possible(head_resp.app_iter) drain_and_close(head_resp)
put_etag = head_resp.headers['ETag'] put_etag = head_resp.headers['ETag']
put_bytes = head_resp.content_length put_bytes = head_resp.content_length
@ -773,7 +766,7 @@ class ObjectContext(ObjectVersioningContext):
raise HTTPNotFound(request=req) raise HTTPNotFound(request=req)
resp.headers['X-Object-Version-Id'] = 'null' resp.headers['X-Object-Version-Id'] = 'null'
if req.method == 'HEAD': if req.method == 'HEAD':
close_if_possible(resp.app_iter) drain_and_close(resp)
return resp return resp
else: else:
# Re-write the path; most everything else goes through normally # Re-write the path; most everything else goes through normally
@ -791,7 +784,7 @@ class ObjectContext(ObjectVersioningContext):
'X-Backend-Content-Type', resp.headers['Content-Type']) 'X-Backend-Content-Type', resp.headers['Content-Type'])
if req.method == 'HEAD': if req.method == 'HEAD':
close_if_possible(resp.app_iter) drain_and_close(resp)
if is_del_marker: if is_del_marker:
hdrs = {'X-Object-Version-Id': version, hdrs = {'X-Object-Version-Id': version,
@ -880,7 +873,7 @@ class ContainerContext(ObjectVersioningContext):
self._response_headers[bytes_idx] = ( self._response_headers[bytes_idx] = (
'X-Container-Bytes-Used', 'X-Container-Bytes-Used',
str(int(curr_bytes) + int(ver_bytes))) str(int(curr_bytes) + int(ver_bytes)))
close_if_possible(vresp.app_iter) drain_and_close(vresp)
elif is_success(self._get_status_int()): elif is_success(self._get_status_int()):
# If client is doing a version-aware listing for a container that # If client is doing a version-aware listing for a container that
# (as best we could tell) has never had versioning enabled, # (as best we could tell) has never had versioning enabled,
@ -972,7 +965,7 @@ class ContainerContext(ObjectVersioningContext):
account, str_to_wsgi(versions_cont))), account, str_to_wsgi(versions_cont))),
headers={'X-Backend-Allow-Reserved-Names': 'true'}) headers={'X-Backend-Allow-Reserved-Names': 'true'})
vresp = versions_req.get_response(self.app) vresp = versions_req.get_response(self.app)
close_if_possible(vresp.app_iter) drain_and_close(vresp)
if vresp.is_success and int(vresp.headers.get( if vresp.is_success and int(vresp.headers.get(
'X-Container-Object-Count', 0)) > 0: 'X-Container-Object-Count', 0)) > 0:
raise HTTPConflict( raise HTTPConflict(
@ -984,7 +977,7 @@ class ContainerContext(ObjectVersioningContext):
else: else:
versions_req.method = 'DELETE' versions_req.method = 'DELETE'
resp = versions_req.get_response(self.app) resp = versions_req.get_response(self.app)
close_if_possible(resp.app_iter) drain_and_close(resp)
if not is_success(resp.status_int) and resp.status_int != 404: if not is_success(resp.status_int) and resp.status_int != 404:
raise HTTPInternalServerError( raise HTTPInternalServerError(
'Error deleting versioned container') 'Error deleting versioned container')
@ -1072,9 +1065,7 @@ class ContainerContext(ObjectVersioningContext):
method='PUT', headers=hdrs, swift_source='OV') method='PUT', headers=hdrs, swift_source='OV')
resp = ver_cont_req.get_response(self.app) resp = ver_cont_req.get_response(self.app)
# Should always be short; consume the body # Should always be short; consume the body
for chunk in resp.app_iter: drain_and_close(resp)
pass
close_if_possible(resp.app_iter)
if is_success(resp.status_int) or resp.status_int == HTTP_CONFLICT: if is_success(resp.status_int) or resp.status_int == HTTP_CONFLICT:
req.headers[SYSMETA_VERSIONS_CONT] = wsgi_quote(versions_cont) req.headers[SYSMETA_VERSIONS_CONT] = wsgi_quote(versions_cont)
else: else:
@ -1097,7 +1088,7 @@ class ContainerContext(ObjectVersioningContext):
# TODO: what if this one fails?? # TODO: what if this one fails??
resp = ver_cont_req.get_response(self.app) resp = ver_cont_req.get_response(self.app)
close_if_possible(resp.app_iter) drain_and_close(resp)
if self._response_headers is None: if self._response_headers is None:
self._response_headers = [] self._response_headers = []
@ -1202,7 +1193,7 @@ class ContainerContext(ObjectVersioningContext):
reverse=config_true_value(params.get('reverse', 'no'))) reverse=config_true_value(params.get('reverse', 'no')))
self.update_content_length(len(body)) self.update_content_length(len(body))
app_resp = [body] app_resp = [body]
close_if_possible(versions_resp.app_iter) drain_and_close(versions_resp)
elif is_success(versions_resp.status_int): elif is_success(versions_resp.status_int):
try: try:
listing = json.loads(versions_resp.body) listing = json.loads(versions_resp.body)
@ -1324,9 +1315,8 @@ class AccountContext(ObjectVersioningContext):
try: try:
versions_listing = json.loads(versions_resp.body) versions_listing = json.loads(versions_resp.body)
except ValueError: except ValueError:
close_if_possible(versions_resp.app_iter)
versions_listing = [] versions_listing = []
else: finally:
close_if_possible(versions_resp.app_iter) close_if_possible(versions_resp.app_iter)
# create a dict from versions listing to facilitate # create a dict from versions listing to facilitate

View File

@ -4217,6 +4217,19 @@ def closing_if_possible(maybe_closable):
close_if_possible(maybe_closable) close_if_possible(maybe_closable)
def drain_and_close(response_or_app_iter):
"""
Drain and close a swob or WSGI response.
This ensures we don't log a 499 in the proxy just because we realized we
don't care about the body of an error.
"""
app_iter = getattr(response_or_app_iter, 'app_iter', response_or_app_iter)
for _chunk in app_iter:
pass
close_if_possible(app_iter)
_rfc_token = r'[^()<>@,;:\"/\[\]?={}\x00-\x20\x7f]+' _rfc_token = r'[^()<>@,;:\"/\[\]?={}\x00-\x20\x7f]+'
_rfc_extension_pattern = re.compile( _rfc_extension_pattern = re.compile(
r'(?:\s*;\s*(' + _rfc_token + r")\s*(?:=\s*(" + _rfc_token + r'(?:\s*;\s*(' + _rfc_token + r")\s*(?:=\s*(" + _rfc_token +

View File

@ -44,7 +44,7 @@ import six
from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request from swift.common.wsgi import make_pre_authed_env, make_pre_authed_request
from swift.common.utils import Timestamp, config_true_value, \ from swift.common.utils import Timestamp, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \ public, split_path, list_from_csv, GreenthreadSafeIterator, \
GreenAsyncPile, quorum_size, parse_content_type, close_if_possible, \ GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \
document_iters_to_http_response_body, ShardRange, find_shard_range document_iters_to_http_response_body, ShardRange, find_shard_range
from swift.common.bufferedhttp import http_connect from swift.common.bufferedhttp import http_connect
from swift.common import constraints from swift.common import constraints
@ -369,7 +369,7 @@ def get_container_info(env, app, swift_source=None):
# caller to keep the result private-ish # caller to keep the result private-ish
req.headers['X-Backend-Allow-Reserved-Names'] = 'true' req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
resp = req.get_response(app) resp = req.get_response(app)
close_if_possible(resp.app_iter) drain_and_close(resp)
# Check in infocache to see if the proxy (or anyone else) already # Check in infocache to see if the proxy (or anyone else) already
# populated the cache for us. If they did, just use what's there. # populated the cache for us. If they did, just use what's there.
# #
@ -443,7 +443,7 @@ def get_account_info(env, app, swift_source=None):
# caller to keep the result private-ish # caller to keep the result private-ish
req.headers['X-Backend-Allow-Reserved-Names'] = 'true' req.headers['X-Backend-Allow-Reserved-Names'] = 'true'
resp = req.get_response(app) resp = req.get_response(app)
close_if_possible(resp.app_iter) drain_and_close(resp)
# Check in infocache to see if the proxy (or anyone else) already # Check in infocache to see if the proxy (or anyone else) already
# populated the cache for us. If they did, just use what's there. # populated the cache for us. If they did, just use what's there.
# #
@ -1226,7 +1226,8 @@ class ResumingGetter(object):
if end - begin + 1 == self.bytes_used_from_backend: if end - begin + 1 == self.bytes_used_from_backend:
warn = False warn = False
if not req.environ.get('swift.non_client_disconnect') and warn: if not req.environ.get('swift.non_client_disconnect') and warn:
self.app.logger.warning(_('Client disconnected on read')) self.app.logger.warning('Client disconnected on read of %r',
self.path)
raise raise
except Exception: except Exception:
self.app.logger.exception(_('Trying to send to client')) self.app.logger.exception(_('Trying to send to client'))

View File

@ -29,16 +29,18 @@ from test.unit import FakeLogger, FakeRing
class LeakTrackingIter(object): class LeakTrackingIter(object):
def __init__(self, inner_iter, mark_closed, key): def __init__(self, inner_iter, mark_closed, mark_read, key):
if isinstance(inner_iter, bytes): if isinstance(inner_iter, bytes):
inner_iter = (inner_iter, ) inner_iter = (inner_iter, )
self.inner_iter = inner_iter self.inner_iter = inner_iter
self.mark_closed = mark_closed self.mark_closed = mark_closed
self.mark_read = mark_read
self.key = key self.key = key
def __iter__(self): def __iter__(self):
for x in self.inner_iter: for x in self.inner_iter:
yield x yield x
self.mark_read(self.key)
def close(self): def close(self):
self.mark_closed(self.key) self.mark_closed(self.key)
@ -71,6 +73,7 @@ class FakeSwift(object):
def __init__(self): def __init__(self):
self._calls = [] self._calls = []
self._unclosed_req_keys = defaultdict(int) self._unclosed_req_keys = defaultdict(int)
self._unread_req_paths = defaultdict(int)
self.req_method_paths = [] self.req_method_paths = []
self.swift_sources = [] self.swift_sources = []
self.txn_ids = [] self.txn_ids = []
@ -189,20 +192,31 @@ class FakeSwift(object):
conditional_etag=conditional_etag) conditional_etag=conditional_etag)
wsgi_iter = resp(env, start_response) wsgi_iter = resp(env, start_response)
self.mark_opened((method, path)) self.mark_opened((method, path))
return LeakTrackingIter(wsgi_iter, self.mark_closed, (method, path)) return LeakTrackingIter(wsgi_iter, self.mark_closed,
self.mark_read, (method, path))
def mark_opened(self, key): def mark_opened(self, key):
self._unclosed_req_keys[key] += 1 self._unclosed_req_keys[key] += 1
self._unread_req_paths[key] += 1
def mark_closed(self, key): def mark_closed(self, key):
self._unclosed_req_keys[key] -= 1 self._unclosed_req_keys[key] -= 1
def mark_read(self, key):
self._unread_req_paths[key] -= 1
@property @property
def unclosed_requests(self): def unclosed_requests(self):
return {key: count return {key: count
for key, count in self._unclosed_req_keys.items() for key, count in self._unclosed_req_keys.items()
if count > 0} if count > 0}
@property
def unread_requests(self):
return {path: count
for path, count in self._unread_req_paths.items()
if count > 0}
@property @property
def calls(self): def calls(self):
return [(method, path) for method, path, headers in self._calls] return [(method, path) for method, path, headers in self._calls]

View File

@ -315,6 +315,7 @@ class TestDloGetManifest(DloTestCase):
self.assertEqual(status, '200 OK') self.assertEqual(status, '200 OK')
self.assertEqual(body, b'useful stuff here') self.assertEqual(body, b'useful stuff here')
self.assertEqual(self.app.call_count, 1) self.assertEqual(self.app.call_count, 1)
self.assertFalse(self.app.unread_requests)
def test_get_manifest_passthrough(self): def test_get_manifest_passthrough(self):
# reregister it with the query param # reregister it with the query param
@ -331,6 +332,7 @@ class TestDloGetManifest(DloTestCase):
headers = HeaderKeyDict(headers) headers = HeaderKeyDict(headers)
self.assertEqual(headers["Etag"], "manifest-etag") self.assertEqual(headers["Etag"], "manifest-etag")
self.assertEqual(body, b'manifest-contents') self.assertEqual(body, b'manifest-contents')
self.assertFalse(self.app.unread_requests)
def test_error_passthrough(self): def test_error_passthrough(self):
self.app.register( self.app.register(
@ -354,6 +356,10 @@ class TestDloGetManifest(DloTestCase):
md5hex("aaaaa") + md5hex("bbbbb") + md5hex("ccccc") + md5hex("aaaaa") + md5hex("bbbbb") + md5hex("ccccc") +
md5hex("ddddd") + md5hex("eeeee")) md5hex("ddddd") + md5hex("eeeee"))
self.assertEqual(headers.get("Etag"), expected_etag) self.assertEqual(headers.get("Etag"), expected_etag)
self.assertEqual(self.app.unread_requests, {
# Since we don't know how big this will be, we just disconnect
('GET', '/v1/AUTH_test/mancon/manifest'): 1,
})
def test_get_range_on_segment_boundaries(self): def test_get_range_on_segment_boundaries(self):
req = swob.Request.blank('/v1/AUTH_test/mancon/manifest', req = swob.Request.blank('/v1/AUTH_test/mancon/manifest',

View File

@ -101,9 +101,12 @@ class ObjectVersioningBaseTestCase(unittest.TestCase):
self.cache_version_off.set( self.cache_version_off.set(
get_cache_key('a', self.build_container_name('c')), get_cache_key('a', self.build_container_name('c')),
{'status': 200}) {'status': 200})
self.expected_unread_requests = {}
def tearDown(self): def tearDown(self):
self.assertEqual(self.app.unclosed_requests, {}) self.assertEqual(self.app.unclosed_requests, {})
self.assertEqual(self.app.unread_requests,
self.expected_unread_requests)
def call_ov(self, req): def call_ov(self, req):
self.authorized = [] self.authorized = []
@ -1949,6 +1952,9 @@ class ObjectVersioningTestVersionAPI(ObjectVersioningBaseTestCase):
self.assertEqual(1, len(self.authorized)) self.assertEqual(1, len(self.authorized))
self.assertEqual(1, len(self.app.calls)) self.assertEqual(1, len(self.app.calls))
self.assertNotIn(('X-Object-Version-Id', '0000001234.00000'), headers) self.assertNotIn(('X-Object-Version-Id', '0000001234.00000'), headers)
# This will log a 499 but (at the moment, anyway)
# we don't have a good way to avoid it
self.expected_unread_requests[('GET', '/v1/a/c/o?version-id=null')] = 1
def test_GET_null_id_404(self): def test_GET_null_id_404(self):
self.app.register( self.app.register(

View File

@ -417,6 +417,7 @@ class TestSymlinkMiddleware(TestSymlinkMiddlewareBase):
req_headers['User-Agent'] = 'Swift' req_headers['User-Agent'] = 'Swift'
self.assertEqual(req_headers, calls[1].headers) self.assertEqual(req_headers, calls[1].headers)
self.assertFalse(calls[2:]) self.assertFalse(calls[2:])
self.assertFalse(self.app.unread_requests)
def test_get_target_object_not_found(self): def test_get_target_object_not_found(self):
self.app.register('GET', '/v1/a/c/symlink', swob.HTTPOk, self.app.register('GET', '/v1/a/c/symlink', swob.HTTPOk,
@ -430,6 +431,7 @@ class TestSymlinkMiddleware(TestSymlinkMiddlewareBase):
self.assertNotIn('X-Symlink-Target', dict(headers)) self.assertNotIn('X-Symlink-Target', dict(headers))
self.assertNotIn('X-Symlink-Target-Account', dict(headers)) self.assertNotIn('X-Symlink-Target-Account', dict(headers))
self.assertIn(('Content-Location', '/v1/a2/c1/o'), headers) self.assertIn(('Content-Location', '/v1/a2/c1/o'), headers)
self.assertFalse(self.app.unread_requests)
def test_get_target_object_range_not_satisfiable(self): def test_get_target_object_range_not_satisfiable(self):
self.app.register('GET', '/v1/a/c/symlink', swob.HTTPOk, self.app.register('GET', '/v1/a/c/symlink', swob.HTTPOk,
@ -447,6 +449,7 @@ class TestSymlinkMiddleware(TestSymlinkMiddlewareBase):
self.assertNotIn('X-Symlink-Target', dict(headers)) self.assertNotIn('X-Symlink-Target', dict(headers))
self.assertNotIn('X-Symlink-Target-Account', dict(headers)) self.assertNotIn('X-Symlink-Target-Account', dict(headers))
self.assertIn(('Content-Location', '/v1/a2/c1/o'), headers) self.assertIn(('Content-Location', '/v1/a2/c1/o'), headers)
self.assertFalse(self.app.unread_requests)
def test_get_ec_symlink_range_unsatisfiable_can_redirect_to_target(self): def test_get_ec_symlink_range_unsatisfiable_can_redirect_to_target(self):
self.app.register('GET', '/v1/a/c/symlink', self.app.register('GET', '/v1/a/c/symlink',

View File

@ -65,6 +65,7 @@ class VersionedWritesBaseTestCase(unittest.TestCase):
def tearDown(self): def tearDown(self):
self.assertEqual(self.app.unclosed_requests, {}) self.assertEqual(self.app.unclosed_requests, {})
self.assertEqual(self.app.unread_requests, {})
def call_app(self, req, app=None): def call_app(self, req, app=None):
if app is None: if app is None:

View File

@ -500,12 +500,14 @@ class TestInternalClient(unittest.TestCase):
self.resp_status = resp_status self.resp_status = resp_status
self.request_tries = 3 self.request_tries = 3
self.closed_paths = [] self.closed_paths = []
self.fully_read_paths = []
def fake_app(self, env, start_response): def fake_app(self, env, start_response):
body = b'fake error response' body = b'fake error response'
start_response(self.resp_status, start_response(self.resp_status,
[('Content-Length', str(len(body)))]) [('Content-Length', str(len(body)))])
return LeakTrackingIter(body, self.closed_paths.append, return LeakTrackingIter(body, self.closed_paths.append,
self.fully_read_paths.append,
env['PATH_INFO']) env['PATH_INFO'])
def do_test(resp_status): def do_test(resp_status):
@ -517,14 +519,17 @@ class TestInternalClient(unittest.TestCase):
# correct object body with 2xx. # correct object body with 2xx.
client.make_request('GET', '/cont/obj', {}, (400,)) client.make_request('GET', '/cont/obj', {}, (400,))
loglines = client.logger.get_lines_for_level('info') loglines = client.logger.get_lines_for_level('info')
return client.closed_paths, ctx.exception.resp, loglines return (client.fully_read_paths, client.closed_paths,
ctx.exception.resp, loglines)
closed_paths, resp, loglines = do_test('200 OK') fully_read_paths, closed_paths, resp, loglines = do_test('200 OK')
# Since the 200 is considered "properly handled", it won't be retried # Since the 200 is considered "properly handled", it won't be retried
self.assertEqual(fully_read_paths, [])
self.assertEqual(closed_paths, []) self.assertEqual(closed_paths, [])
# ...and it'll be on us (the caller) to close (for example, by using # ...and it'll be on us (the caller) to read and close (for example,
# swob.Response's body property) # by using swob.Response's body property)
self.assertEqual(resp.body, b'fake error response') self.assertEqual(resp.body, b'fake error response')
self.assertEqual(fully_read_paths, ['/cont/obj'])
self.assertEqual(closed_paths, ['/cont/obj']) self.assertEqual(closed_paths, ['/cont/obj'])
expected = (' HTTP/1.0 200 ', ) expected = (' HTTP/1.0 200 ', )
@ -533,9 +538,11 @@ class TestInternalClient(unittest.TestCase):
self.fail('Unexpected extra log line: %r' % logline) self.fail('Unexpected extra log line: %r' % logline)
self.assertIn(expected, logline) self.assertIn(expected, logline)
closed_paths, resp, loglines = do_test('503 Service Unavailable') fully_read_paths, closed_paths, resp, loglines = do_test(
'503 Service Unavailable')
# But since 5xx is neither "properly handled" not likely to include # But since 5xx is neither "properly handled" not likely to include
# a large body, it will be retried and responses will already be closed # a large body, it will be retried and responses will already be closed
self.assertEqual(fully_read_paths, ['/cont/obj'] * 3)
self.assertEqual(closed_paths, ['/cont/obj'] * 3) self.assertEqual(closed_paths, ['/cont/obj'] * 3)
expected = (' HTTP/1.0 503 ', ' HTTP/1.0 503 ', ' HTTP/1.0 503 ', ) expected = (' HTTP/1.0 503 ', ' HTTP/1.0 503 ', ' HTTP/1.0 503 ', )

View File

@ -1098,11 +1098,11 @@ class TestFuncs(unittest.TestCase):
node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'} node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'}
handler = GetOrHeadHandler( handler = GetOrHeadHandler(
self.app, req, 'Object', None, None, None, {}) self.app, req, 'Object', None, None, 'some-path', {})
app_iter = handler._make_app_iter(req, node, source) app_iter = handler._make_app_iter(req, node, source)
app_iter.close() app_iter.close()
self.app.logger.warning.assert_called_once_with( self.app.logger.warning.assert_called_once_with(
'Client disconnected on read') 'Client disconnected on read of %r', 'some-path')
self.app.logger = mock.Mock() self.app.logger = mock.Mock()
node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'} node = {'ip': '1.2.3.4', 'port': 6200, 'device': 'sda'}

View File

@ -7316,7 +7316,8 @@ class BaseTestECObjectController(BaseTestObjectController):
_test_servers[0].logger.get_lines_for_level('warning')) _test_servers[0].logger.get_lines_for_level('warning'))
# check for disconnect message! # check for disconnect message!
expected = ['Client disconnected on read'] * 2 expected = ["Client disconnected on read of '/a/%s-discon/test'"
% self.ec_policy.name] * 2
self.assertEqual( self.assertEqual(
_test_servers[0].logger.get_lines_for_level('warning'), _test_servers[0].logger.get_lines_for_level('warning'),
expected) expected)