Merge "Stop reading from object server when client disconnects."
This commit is contained in:
commit
3206132339
@ -41,6 +41,10 @@ class BufferedHTTPResponse(HTTPResponse):
|
|||||||
def __init__(self, sock, debuglevel=0, strict=0,
|
def __init__(self, sock, debuglevel=0, strict=0,
|
||||||
method=None): # pragma: no cover
|
method=None): # pragma: no cover
|
||||||
self.sock = sock
|
self.sock = sock
|
||||||
|
# sock is an eventlet.greenio.GreenSocket
|
||||||
|
# sock.fd is a socket._socketobject
|
||||||
|
# sock.fd._sock is a socket._socket object, which is what we want.
|
||||||
|
self._real_socket = sock.fd._sock
|
||||||
self.fp = sock.makefile('rb')
|
self.fp = sock.makefile('rb')
|
||||||
self.debuglevel = debuglevel
|
self.debuglevel = debuglevel
|
||||||
self.strict = strict
|
self.strict = strict
|
||||||
@ -74,9 +78,25 @@ class BufferedHTTPResponse(HTTPResponse):
|
|||||||
self.msg = HTTPMessage(self.fp, 0)
|
self.msg = HTTPMessage(self.fp, 0)
|
||||||
self.msg.fp = None
|
self.msg.fp = None
|
||||||
|
|
||||||
|
def nuke_from_orbit(self):
|
||||||
|
"""
|
||||||
|
Terminate the socket with extreme prejudice.
|
||||||
|
|
||||||
|
Closes the underlying socket regardless of whether or not anyone else
|
||||||
|
has references to it. Use this when you are certain that nobody else
|
||||||
|
you care about has a reference to this socket.
|
||||||
|
"""
|
||||||
|
if self._real_socket:
|
||||||
|
# this is idempotent; see sock_close in Modules/socketmodule.c in
|
||||||
|
# the Python source for details.
|
||||||
|
self._real_socket.close()
|
||||||
|
self._real_socket = None
|
||||||
|
self.close()
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
HTTPResponse.close(self)
|
HTTPResponse.close(self)
|
||||||
self.sock = None
|
self.sock = None
|
||||||
|
self._real_socket = None
|
||||||
|
|
||||||
|
|
||||||
class BufferedHTTPConnection(HTTPConnection):
|
class BufferedHTTPConnection(HTTPConnection):
|
||||||
|
@ -987,17 +987,17 @@ class Controller(object):
|
|||||||
:param src: the response from the backend
|
:param src: the response from the backend
|
||||||
"""
|
"""
|
||||||
try:
|
try:
|
||||||
src.swift_conn.close()
|
# Since the backends set "Connection: close" in their response
|
||||||
except Exception:
|
# headers, the response object (src) is solely responsible for the
|
||||||
pass
|
# socket. The connection object (src.swift_conn) has no references
|
||||||
src.swift_conn = None
|
# to the socket, so calling its close() method does nothing, and
|
||||||
try:
|
# therefore we don't do it.
|
||||||
while src.read(self.app.object_chunk_size):
|
#
|
||||||
pass
|
# Also, since calling the response's close() method might not
|
||||||
except Exception:
|
# close the underlying socket but only decrement some
|
||||||
pass
|
# reference-counter, we have a special method here that really,
|
||||||
try:
|
# really kills the underlying socket with a close() syscall.
|
||||||
src.close()
|
src.nuke_from_orbit() # it's the only way to be sure
|
||||||
except Exception:
|
except Exception:
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@ -20,7 +20,6 @@ import os
|
|||||||
import sys
|
import sys
|
||||||
import unittest
|
import unittest
|
||||||
import urlparse
|
import urlparse
|
||||||
import signal
|
|
||||||
from contextlib import contextmanager, nested, closing
|
from contextlib import contextmanager, nested, closing
|
||||||
from gzip import GzipFile
|
from gzip import GzipFile
|
||||||
from shutil import rmtree
|
from shutil import rmtree
|
||||||
@ -681,41 +680,31 @@ class TestObjectController(unittest.TestCase):
|
|||||||
self.assertEquals(res.status_int, expected)
|
self.assertEquals(res.status_int, expected)
|
||||||
|
|
||||||
def test_GET_newest_large_file(self):
|
def test_GET_newest_large_file(self):
|
||||||
calls = [0]
|
prolis = _test_sockets[0]
|
||||||
|
prosrv = _test_servers[0]
|
||||||
def handler(_junk1, _junk2):
|
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
||||||
calls[0] += 1
|
fd = sock.makefile()
|
||||||
|
obj = 'a' * (1024 * 1024)
|
||||||
old_handler = signal.signal(signal.SIGPIPE, handler)
|
path = '/v1/a/c/o.large'
|
||||||
try:
|
fd.write('PUT %s HTTP/1.1\r\n'
|
||||||
prolis = _test_sockets[0]
|
'Host: localhost\r\n'
|
||||||
prosrv = _test_servers[0]
|
'Connection: close\r\n'
|
||||||
sock = connect_tcp(('localhost', prolis.getsockname()[1]))
|
'X-Storage-Token: t\r\n'
|
||||||
fd = sock.makefile()
|
'Content-Length: %s\r\n'
|
||||||
obj = 'a' * (1024 * 1024)
|
'Content-Type: application/octet-stream\r\n'
|
||||||
path = '/v1/a/c/o.large'
|
'\r\n%s' % (path, str(len(obj)), obj))
|
||||||
fd.write('PUT %s HTTP/1.1\r\n'
|
fd.flush()
|
||||||
'Host: localhost\r\n'
|
headers = readuntil2crlfs(fd)
|
||||||
'Connection: close\r\n'
|
exp = 'HTTP/1.1 201'
|
||||||
'X-Storage-Token: t\r\n'
|
self.assertEqual(headers[:len(exp)], exp)
|
||||||
'Content-Length: %s\r\n'
|
req = Request.blank(path,
|
||||||
'Content-Type: application/octet-stream\r\n'
|
environ={'REQUEST_METHOD': 'GET'},
|
||||||
'\r\n%s' % (path, str(len(obj)), obj))
|
headers={'Content-Type':
|
||||||
fd.flush()
|
'application/octet-stream',
|
||||||
headers = readuntil2crlfs(fd)
|
'X-Newest': 'true'})
|
||||||
exp = 'HTTP/1.1 201'
|
res = req.get_response(prosrv)
|
||||||
self.assertEqual(headers[:len(exp)], exp)
|
self.assertEqual(res.status_int, 200)
|
||||||
req = Request.blank(path,
|
self.assertEqual(res.body, obj)
|
||||||
environ={'REQUEST_METHOD': 'GET'},
|
|
||||||
headers={'Content-Type':
|
|
||||||
'application/octet-stream',
|
|
||||||
'X-Newest': 'true'})
|
|
||||||
res = req.get_response(prosrv)
|
|
||||||
self.assertEqual(res.status_int, 200)
|
|
||||||
self.assertEqual(res.body, obj)
|
|
||||||
self.assertEqual(calls[0], 0)
|
|
||||||
finally:
|
|
||||||
signal.signal(signal.SIGPIPE, old_handler)
|
|
||||||
|
|
||||||
def test_PUT_expect_header_zero_content_length(self):
|
def test_PUT_expect_header_zero_content_length(self):
|
||||||
test_errors = []
|
test_errors = []
|
||||||
|
Loading…
x
Reference in New Issue
Block a user