Merge "Fix InternalClient to drain response body if the request fails"
This commit is contained in:
@@ -22,14 +22,13 @@ from six.moves import urllib
|
||||
import struct
|
||||
from sys import exc_info, exit
|
||||
import zlib
|
||||
from swift import gettext_ as _
|
||||
from time import gmtime, strftime, time
|
||||
from zlib import compressobj
|
||||
|
||||
from swift.common.exceptions import ClientException
|
||||
from swift.common.http import HTTP_NOT_FOUND, HTTP_MULTIPLE_CHOICES
|
||||
from swift.common.swob import Request
|
||||
from swift.common.utils import quote
|
||||
from swift.common.utils import quote, closing_if_possible
|
||||
from swift.common.wsgi import loadapp, pipeline_property
|
||||
|
||||
if six.PY3:
|
||||
@@ -199,10 +198,22 @@ class InternalClient(object):
|
||||
exc_type, exc_value, exc_traceback = exc_info()
|
||||
# sleep only between tries, not after each one
|
||||
if attempt < self.request_tries - 1:
|
||||
if resp:
|
||||
# always close any resp.app_iter before we discard it
|
||||
with closing_if_possible(resp.app_iter):
|
||||
# for non 2XX requests it's safe and useful to drain
|
||||
# the response body so we log the correct status code
|
||||
if resp.status_int // 100 != 2:
|
||||
for iter_body in resp.app_iter:
|
||||
pass
|
||||
sleep(2 ** (attempt + 1))
|
||||
if resp:
|
||||
raise UnexpectedResponse(
|
||||
_('Unexpected response: %s') % resp.status, resp)
|
||||
msg = 'Unexpected response: %s' % resp.status
|
||||
if resp.status_int // 100 != 2 and resp.body:
|
||||
# provide additional context (and drain the response body) for
|
||||
# non 2XX responses
|
||||
msg += ' (%s)' % resp.body
|
||||
raise UnexpectedResponse(msg, resp)
|
||||
if exc_type:
|
||||
# To make pep8 tool happy, in place of raise t, v, tb:
|
||||
six.reraise(exc_type(*exc_value.args), None, exc_traceback)
|
||||
|
||||
@@ -28,9 +28,9 @@ from test.unit import FakeLogger, FakeRing
|
||||
|
||||
|
||||
class LeakTrackingIter(object):
|
||||
def __init__(self, inner_iter, fake_swift, path):
|
||||
def __init__(self, inner_iter, mark_closed, path):
|
||||
self.inner_iter = inner_iter
|
||||
self.fake_swift = fake_swift
|
||||
self.mark_closed = mark_closed
|
||||
self.path = path
|
||||
|
||||
def __iter__(self):
|
||||
@@ -38,7 +38,7 @@ class LeakTrackingIter(object):
|
||||
yield x
|
||||
|
||||
def close(self):
|
||||
self.fake_swift.mark_closed(self.path)
|
||||
self.mark_closed(self.path)
|
||||
|
||||
|
||||
FakeSwiftCall = namedtuple('FakeSwiftCall', ['method', 'path', 'headers'])
|
||||
@@ -173,7 +173,7 @@ class FakeSwift(object):
|
||||
conditional_etag=conditional_etag)
|
||||
wsgi_iter = resp(env, start_response)
|
||||
self.mark_opened(path)
|
||||
return LeakTrackingIter(wsgi_iter, self, path)
|
||||
return LeakTrackingIter(wsgi_iter, self.mark_closed, path)
|
||||
|
||||
def mark_opened(self, path):
|
||||
self._unclosed_req_paths[path] += 1
|
||||
|
||||
@@ -19,6 +19,7 @@ import unittest
|
||||
import zlib
|
||||
from textwrap import dedent
|
||||
import os
|
||||
from itertools import izip_longest
|
||||
|
||||
import six
|
||||
from six import StringIO
|
||||
@@ -28,9 +29,11 @@ from test.unit import FakeLogger
|
||||
from swift.common import exceptions, internal_client, swob
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.storage_policy import StoragePolicy
|
||||
from swift.common.middleware.proxy_logging import ProxyLoggingMiddleware
|
||||
|
||||
from test.unit import with_tempdir, write_fake_ring, patch_policies
|
||||
from test.unit.common.middleware.helpers import FakeSwift
|
||||
from test.unit import with_tempdir, write_fake_ring, patch_policies, \
|
||||
DebugLogger
|
||||
from test.unit.common.middleware.helpers import FakeSwift, LeakTrackingIter
|
||||
|
||||
if six.PY3:
|
||||
from eventlet.green.urllib import request as urllib2
|
||||
@@ -438,6 +441,74 @@ class TestInternalClient(unittest.TestCase):
|
||||
self.assertEqual(client.env['PATH_INFO'], path)
|
||||
self.assertEqual(client.env['HTTP_X_TEST'], path)
|
||||
|
||||
def test_make_request_error_case(self):
|
||||
class InternalClient(internal_client.InternalClient):
|
||||
def __init__(self):
|
||||
self.logger = DebugLogger()
|
||||
# wrap the fake app with ProxyLoggingMiddleware
|
||||
self.app = ProxyLoggingMiddleware(
|
||||
self.fake_app, {}, self.logger)
|
||||
self.user_agent = 'some_agent'
|
||||
self.request_tries = 3
|
||||
|
||||
def fake_app(self, env, start_response):
|
||||
body = 'fake error response'
|
||||
start_response('409 Conflict',
|
||||
[('Content-Length', str(len(body)))])
|
||||
return [body]
|
||||
|
||||
client = InternalClient()
|
||||
with self.assertRaises(internal_client.UnexpectedResponse), \
|
||||
mock.patch('swift.common.internal_client.sleep'):
|
||||
client.make_request('DELETE', '/container', {}, (200,))
|
||||
|
||||
for logline in client.logger.get_lines_for_level('info'):
|
||||
# add spaces before/after 409 because it can match with
|
||||
# something like timestamp
|
||||
self.assertIn(' 409 ', logline)
|
||||
|
||||
def test_make_request_acceptable_status_not_2xx(self):
|
||||
closed_paths = []
|
||||
|
||||
def mark_closed(path):
|
||||
closed_paths.append(path)
|
||||
|
||||
class InternalClient(internal_client.InternalClient):
|
||||
def __init__(self):
|
||||
self.logger = DebugLogger()
|
||||
# wrap the fake app with ProxyLoggingMiddleware
|
||||
self.app = ProxyLoggingMiddleware(
|
||||
self.fake_app, {}, self.logger)
|
||||
self.user_agent = 'some_agent'
|
||||
self.request_tries = 3
|
||||
|
||||
def fake_app(self, env, start_response):
|
||||
body = 'fake error response'
|
||||
start_response('200 Ok', [('Content-Length', str(len(body)))])
|
||||
return LeakTrackingIter((c for c in body),
|
||||
mark_closed, env['PATH_INFO'])
|
||||
|
||||
client = InternalClient()
|
||||
with self.assertRaises(internal_client.UnexpectedResponse) as ctx, \
|
||||
mock.patch('swift.common.internal_client.sleep'):
|
||||
# This is obvious strange tests to expect only 400 Bad Request
|
||||
# but this test intended to avoid extra body drain if it's
|
||||
# correct object body with 2xx.
|
||||
client.make_request('GET', '/cont/obj', {}, (400,))
|
||||
self.assertEqual(closed_paths, ['/cont/obj'] * 2)
|
||||
# swob's body resp property will call close
|
||||
self.assertEqual(ctx.exception.resp.body, 'fake error response')
|
||||
self.assertEqual(closed_paths, ['/cont/obj'] * 3)
|
||||
|
||||
# Retries like this will cause 499 because internal_client won't
|
||||
# automatically consume (potentially large) 2XX responses.
|
||||
expected = (' 499 ', ' 499 ', ' 200 ')
|
||||
loglines = client.logger.get_lines_for_level('info')
|
||||
for expected, logline in izip_longest(expected, loglines):
|
||||
if not expected:
|
||||
self.fail('Unexpected extra log line: %r' % logline)
|
||||
self.assertIn(expected, logline)
|
||||
|
||||
def test_make_request_codes(self):
|
||||
class InternalClient(internal_client.InternalClient):
|
||||
def __init__(self):
|
||||
|
||||
Reference in New Issue
Block a user