Merge "ssync-receiver: terminate session if subreq read times out"
This commit is contained in:
@@ -16,7 +16,7 @@
|
||||
|
||||
import eventlet.greenio
|
||||
import eventlet.wsgi
|
||||
from eventlet import sleep
|
||||
from eventlet import sleep, Timeout
|
||||
import urllib
|
||||
|
||||
from swift.common import exceptions
|
||||
@@ -104,6 +104,93 @@ def encode_wanted(remote, local):
|
||||
return None
|
||||
|
||||
|
||||
class SsyncInputProxy:
|
||||
"""
|
||||
Wraps a wsgi input to provide ssync specific read methods.
|
||||
|
||||
If any exception or timeout is raised while reading from the input then
|
||||
subsequent calls will raise the same exception. Callers are thereby
|
||||
prevented from reading the input after it has raised an exception, when its
|
||||
state may be uncertain. This enables the input to be safely shared by
|
||||
multiple callers (typically an ssync Receiver and an ObjectController) who
|
||||
may otherwise each be unaware that the other has encountered an exception.
|
||||
|
||||
:param wsgi_input: a wsgi input
|
||||
:param chunk_size: the number of bytes to read at a time
|
||||
:param timeout: the timeout in seconds applied to each read
|
||||
"""
|
||||
def __init__(self, wsgi_input, chunk_size, timeout):
|
||||
self.wsgi_input = wsgi_input
|
||||
self.chunk_size = chunk_size
|
||||
self.timeout = timeout
|
||||
self.exception = None
|
||||
|
||||
def read_line(self, context):
|
||||
"""
|
||||
Try to read a line from the wsgi input; annotate any timeout or read
|
||||
errors with a description of the calling context.
|
||||
|
||||
:param context: string to annotate any exception raised
|
||||
"""
|
||||
if self.exception:
|
||||
raise self.exception
|
||||
try:
|
||||
try:
|
||||
with exceptions.MessageTimeout(self.timeout, context):
|
||||
line = self.wsgi_input.readline(self.chunk_size)
|
||||
except (eventlet.wsgi.ChunkReadError, IOError) as err:
|
||||
raise exceptions.ChunkReadError('%s: %s' % (context, err))
|
||||
except (Exception, Timeout) as err:
|
||||
self.exception = err
|
||||
raise
|
||||
|
||||
if line and not line.endswith(b'\n'):
|
||||
# Everywhere we would call readline, we should always get
|
||||
# a clean end-of-line as we should be reading
|
||||
# SSYNC-specific messages or HTTP request lines/headers.
|
||||
# If we didn't, it indicates that the wsgi input readline reached a
|
||||
# valid end of chunked body without finding a newline.
|
||||
raise exceptions.ChunkReadError(
|
||||
'%s: %s' % (context, 'missing newline'))
|
||||
|
||||
return line
|
||||
|
||||
def _read_chunk(self, context, size):
|
||||
if self.exception:
|
||||
raise self.exception
|
||||
try:
|
||||
try:
|
||||
with exceptions.MessageTimeout(self.timeout, context):
|
||||
chunk = self.wsgi_input.read(size)
|
||||
except (eventlet.wsgi.ChunkReadError, IOError) as err:
|
||||
raise exceptions.ChunkReadError('%s: %s' % (context, err))
|
||||
if not chunk:
|
||||
raise exceptions.ChunkReadError(
|
||||
'Early termination for %s' % context)
|
||||
except (Exception, Timeout) as err:
|
||||
self.exception = err
|
||||
raise
|
||||
return chunk
|
||||
|
||||
def make_subreq_input(self, context, content_length):
|
||||
"""
|
||||
Returns a wsgi input that will read up to the given ``content-length``
|
||||
from the wrapped wsgi input.
|
||||
|
||||
:param context: string to annotate any exception raised
|
||||
:param content_length: maximum number of bytes to read
|
||||
"""
|
||||
def subreq_iter():
|
||||
bytes_left = content_length
|
||||
while bytes_left > 0:
|
||||
size = min(bytes_left, self.chunk_size)
|
||||
chunk = self._read_chunk(context, size)
|
||||
bytes_left -= len(chunk)
|
||||
yield chunk
|
||||
|
||||
return utils.FileLikeIter(subreq_iter())
|
||||
|
||||
|
||||
class Receiver(object):
|
||||
"""
|
||||
Handles incoming SSYNC requests to the object server.
|
||||
@@ -142,7 +229,7 @@ class Receiver(object):
|
||||
self.request = request
|
||||
self.device = None
|
||||
self.partition = None
|
||||
self.fp = None
|
||||
self.input = None
|
||||
# We default to dropping the connection in case there is any exception
|
||||
# raised during processing because otherwise the sender could send for
|
||||
# quite some time before realizing it was all in vain.
|
||||
@@ -210,6 +297,8 @@ class Receiver(object):
|
||||
'%s/%s/%s read failed in ssync.Receiver: %s' % (
|
||||
self.request.remote_addr, self.device, self.partition,
|
||||
err))
|
||||
# Since the client (presumably) hung up, no point in trying to
|
||||
# send anything about the error
|
||||
except swob.HTTPException as err:
|
||||
body = b''.join(err({}, lambda *args: None))
|
||||
yield (':ERROR: %d %r\n' % (
|
||||
@@ -260,18 +349,9 @@ class Receiver(object):
|
||||
self.diskfile_mgr = self.app._diskfile_router[self.policy]
|
||||
if not self.diskfile_mgr.get_dev_path(self.device):
|
||||
raise swob.HTTPInsufficientStorage(drive=self.device)
|
||||
self.fp = self.request.environ['wsgi.input']
|
||||
|
||||
def _readline(self, context):
|
||||
# try to read a line from the wsgi input; annotate any timeout or read
|
||||
# errors with a description of the calling context
|
||||
with exceptions.MessageTimeout(
|
||||
self.app.client_timeout, context):
|
||||
try:
|
||||
line = self.fp.readline(self.app.network_chunk_size)
|
||||
except (eventlet.wsgi.ChunkReadError, IOError) as err:
|
||||
raise exceptions.ChunkReadError('%s: %s' % (context, err))
|
||||
return line
|
||||
self.input = SsyncInputProxy(self.request.environ['wsgi.input'],
|
||||
self.app.network_chunk_size,
|
||||
self.app.client_timeout)
|
||||
|
||||
def _check_local(self, remote, make_durable=True):
|
||||
"""
|
||||
@@ -382,7 +462,7 @@ class Receiver(object):
|
||||
have to read while it writes to ensure network buffers don't
|
||||
fill up and block everything.
|
||||
"""
|
||||
line = self._readline('missing_check start')
|
||||
line = self.input.read_line('missing_check start')
|
||||
if not line:
|
||||
# Guess they hung up
|
||||
raise SsyncClientDisconnected
|
||||
@@ -393,7 +473,7 @@ class Receiver(object):
|
||||
object_hashes = []
|
||||
nlines = 0
|
||||
while True:
|
||||
line = self._readline('missing_check line')
|
||||
line = self.input.read_line('missing_check line')
|
||||
if not line or line.strip() == b':MISSING_CHECK: END':
|
||||
break
|
||||
want = self._check_missing(line)
|
||||
@@ -446,7 +526,7 @@ class Receiver(object):
|
||||
success. This is so the sender knows if it can remove an out
|
||||
of place partition, for example.
|
||||
"""
|
||||
line = self._readline('updates start')
|
||||
line = self.input.read_line('updates start')
|
||||
if not line:
|
||||
# Guess they hung up waiting for us to process the missing check
|
||||
raise SsyncClientDisconnected
|
||||
@@ -457,11 +537,12 @@ class Receiver(object):
|
||||
failures = 0
|
||||
updates = 0
|
||||
while True:
|
||||
line = self._readline('updates line')
|
||||
line = self.input.read_line('updates line')
|
||||
if not line or line.strip() == b':UPDATES: END':
|
||||
break
|
||||
# Read first line METHOD PATH of subrequest.
|
||||
method, path = swob.bytes_to_wsgi(line.strip()).split(' ', 1)
|
||||
context = swob.bytes_to_wsgi(line.strip())
|
||||
method, path = context.split(' ', 1)
|
||||
subreq = swob.Request.blank(
|
||||
'/%s/%s%s' % (self.device, self.partition, path),
|
||||
environ={'REQUEST_METHOD': method})
|
||||
@@ -469,10 +550,9 @@ class Receiver(object):
|
||||
content_length = None
|
||||
replication_headers = []
|
||||
while True:
|
||||
line = self._readline('updates line')
|
||||
line = self.input.read_line('updates line')
|
||||
if not line:
|
||||
raise Exception(
|
||||
'Got no headers for %s %s' % (method, path))
|
||||
raise Exception('Got no headers for %s' % context)
|
||||
line = line.strip()
|
||||
if not line:
|
||||
break
|
||||
@@ -500,24 +580,9 @@ class Receiver(object):
|
||||
% (method, path))
|
||||
elif method == 'PUT':
|
||||
if content_length is None:
|
||||
raise Exception(
|
||||
'No content-length sent for %s %s' % (method, path))
|
||||
|
||||
def subreq_iter():
|
||||
left = content_length
|
||||
while left > 0:
|
||||
with exceptions.MessageTimeout(
|
||||
self.app.client_timeout,
|
||||
'updates content'):
|
||||
chunk = self.fp.read(
|
||||
min(left, self.app.network_chunk_size))
|
||||
if not chunk:
|
||||
raise exceptions.ChunkReadError(
|
||||
'Early termination for %s %s' % (method, path))
|
||||
left -= len(chunk)
|
||||
yield chunk
|
||||
subreq.environ['wsgi.input'] = utils.FileLikeIter(
|
||||
subreq_iter())
|
||||
raise Exception('No content-length sent for %s' % context)
|
||||
subreq.environ['wsgi.input'] = self.input.make_subreq_input(
|
||||
context, content_length)
|
||||
else:
|
||||
raise Exception('Invalid subrequest method %s' % method)
|
||||
subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy)
|
||||
@@ -535,8 +600,8 @@ class Receiver(object):
|
||||
successes += 1
|
||||
else:
|
||||
self.app.logger.warning(
|
||||
'ssync subrequest failed with %s: %s %s (%s)' %
|
||||
(resp.status_int, method, subreq.path, resp.body))
|
||||
'ssync subrequest failed with %s: %s (%s)' %
|
||||
(resp.status_int, context, resp.body))
|
||||
failures += 1
|
||||
if failures >= self.app.replication_failure_threshold and (
|
||||
not successes or
|
||||
@@ -546,8 +611,8 @@ class Receiver(object):
|
||||
'Too many %d failures to %d successes' %
|
||||
(failures, successes))
|
||||
# The subreq may have failed, but we want to read the rest of the
|
||||
# body from the remote side so we can continue on with the next
|
||||
# subreq.
|
||||
# body from the remote side so we can either detect a broken input
|
||||
# or continue on with the next subreq.
|
||||
for junk in subreq.environ['wsgi.input']:
|
||||
pass
|
||||
if updates % 5 == 0:
|
||||
|
||||
@@ -991,18 +991,22 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
|
||||
pass # expected outcome
|
||||
if msgs:
|
||||
self.fail('Failed with:\n%s' % '\n'.join(msgs))
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
tx_error_log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('Sent data length does not match content-length',
|
||||
log_lines[0])
|
||||
self.assertFalse(log_lines[1:])
|
||||
tx_error_log_lines[0])
|
||||
self.assertFalse(tx_error_log_lines[1:])
|
||||
# trampoline for the receiver to write a log
|
||||
eventlet.sleep(0)
|
||||
log_lines = self.rx_logger.get_lines_for_level('warning')
|
||||
self.assertEqual(1, len(log_lines), self.rx_logger.all_log_lines())
|
||||
eventlet.sleep(0.001)
|
||||
rx_warning_log_lines = self.rx_logger.get_lines_for_level('warning')
|
||||
self.assertEqual(1, len(rx_warning_log_lines),
|
||||
self.rx_logger.all_log_lines())
|
||||
self.assertIn('ssync subrequest failed with 499',
|
||||
log_lines[0])
|
||||
self.assertFalse(log_lines[1:])
|
||||
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
|
||||
rx_warning_log_lines[0])
|
||||
self.assertFalse(rx_warning_log_lines[1:])
|
||||
rx_error_lines = self.rx_logger.get_lines_for_level('error')
|
||||
self.assertEqual(1, len(rx_error_lines), rx_error_lines)
|
||||
self.assertIn('127.0.0.1/dev/9 read failed in ssync.Receiver: Early '
|
||||
'termination for PUT', rx_error_lines[0])
|
||||
|
||||
def test_sync_reconstructor_no_rebuilt_content(self):
|
||||
# First fragment to sync gets no content in any response to
|
||||
@@ -1026,17 +1030,20 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
|
||||
pass # expected outcome
|
||||
if msgs:
|
||||
self.fail('Failed with:\n%s' % '\n'.join(msgs))
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
tx_error_log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('Sent data length does not match content-length',
|
||||
log_lines[0])
|
||||
self.assertFalse(log_lines[1:])
|
||||
tx_error_log_lines[0])
|
||||
self.assertFalse(tx_error_log_lines[1:])
|
||||
# trampoline for the receiver to write a log
|
||||
eventlet.sleep(0)
|
||||
log_lines = self.rx_logger.get_lines_for_level('warning')
|
||||
eventlet.sleep(0.001)
|
||||
rx_warning_log_lines = self.rx_logger.get_lines_for_level('warning')
|
||||
self.assertIn('ssync subrequest failed with 499',
|
||||
log_lines[0])
|
||||
self.assertFalse(log_lines[1:])
|
||||
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
|
||||
rx_warning_log_lines[0])
|
||||
self.assertFalse(rx_warning_log_lines[1:])
|
||||
rx_error_lines = self.rx_logger.get_lines_for_level('error')
|
||||
self.assertEqual(1, len(rx_error_lines), rx_error_lines)
|
||||
self.assertIn('127.0.0.1/dev/9 read failed in ssync.Receiver: Early '
|
||||
'termination for PUT', rx_error_lines[0])
|
||||
|
||||
def test_sync_reconstructor_exception_during_rebuild(self):
|
||||
# First fragment to sync has some reconstructor get responses raise
|
||||
@@ -1071,18 +1078,21 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
|
||||
if msgs:
|
||||
self.fail('Failed with:\n%s' % '\n'.join(msgs))
|
||||
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('Error trying to rebuild', log_lines[0])
|
||||
tx_error_log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('Error trying to rebuild', tx_error_log_lines[0])
|
||||
self.assertIn('Sent data length does not match content-length',
|
||||
log_lines[1])
|
||||
self.assertFalse(log_lines[2:])
|
||||
tx_error_log_lines[1])
|
||||
self.assertFalse(tx_error_log_lines[2:])
|
||||
# trampoline for the receiver to write a log
|
||||
eventlet.sleep(0)
|
||||
log_lines = self.rx_logger.get_lines_for_level('warning')
|
||||
eventlet.sleep(0.001)
|
||||
rx_warning_log_lines = self.rx_logger.get_lines_for_level('warning')
|
||||
self.assertIn('ssync subrequest failed with 499',
|
||||
log_lines[0])
|
||||
self.assertFalse(log_lines[1:])
|
||||
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
|
||||
rx_warning_log_lines[0])
|
||||
self.assertFalse(rx_warning_log_lines[1:])
|
||||
rx_error_lines = self.rx_logger.get_lines_for_level('error')
|
||||
self.assertEqual(1, len(rx_error_lines), rx_error_lines)
|
||||
self.assertIn('127.0.0.1/dev/9 read failed in ssync.Receiver: Early '
|
||||
'termination for PUT', rx_error_lines[0])
|
||||
|
||||
def test_sync_reconstructor_no_responses(self):
|
||||
# First fragment to sync gets no responses for reconstructor to rebuild
|
||||
@@ -1131,7 +1141,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertIn('Unable to get enough responses', log_lines[0])
|
||||
# trampoline for the receiver to write a log
|
||||
eventlet.sleep(0)
|
||||
eventlet.sleep(0.001)
|
||||
self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
|
||||
|
||||
@@ -1234,7 +1244,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
|
||||
fd.read())
|
||||
|
||||
# trampoline for the receiver to write a log
|
||||
eventlet.sleep(0)
|
||||
eventlet.sleep(0.001)
|
||||
self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
|
||||
|
||||
@@ -1270,7 +1280,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
|
||||
self.assertFalse(
|
||||
self.logger.get_lines_for_level('error'))
|
||||
# trampoline for the receiver to write a log
|
||||
eventlet.sleep(0)
|
||||
eventlet.sleep(0.001)
|
||||
self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(self.rx_logger.get_lines_for_level('error'))
|
||||
|
||||
|
||||
@@ -20,19 +20,24 @@ import tempfile
|
||||
import unittest
|
||||
|
||||
import eventlet
|
||||
import eventlet.wsgi
|
||||
from unittest import mock
|
||||
import itertools
|
||||
|
||||
from swift.common import bufferedhttp
|
||||
from swift.common import exceptions
|
||||
from swift.common import swob
|
||||
from swift.common.exceptions import MessageTimeout, ChunkReadError
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common import utils
|
||||
from swift.common.swob import HTTPException
|
||||
from swift.common.swob import HTTPException, HTTPCreated, Request, \
|
||||
HTTPNoContent
|
||||
from swift.common.utils import public
|
||||
from swift.obj import diskfile
|
||||
from swift.obj import server
|
||||
from swift.obj import ssync_receiver, ssync_sender
|
||||
from swift.obj.reconstructor import ObjectReconstructor
|
||||
from swift.obj.ssync_receiver import SsyncInputProxy
|
||||
|
||||
from test import listen_zero, unit
|
||||
from test.debug_logger import debug_logger
|
||||
@@ -44,6 +49,70 @@ from test.unit.obj.common import write_diskfile
|
||||
UNPACK_ERR = b":ERROR: 0 'not enough values to unpack (expected 2, got 1)'"
|
||||
|
||||
|
||||
class FakeController(server.ObjectController):
|
||||
def __init__(self, conf, logger=None):
|
||||
super().__init__(conf, logger)
|
||||
self.requests = []
|
||||
|
||||
def __call__(self, environ, start_response):
|
||||
self.requests.append(Request(environ))
|
||||
return super().__call__(environ, start_response)
|
||||
|
||||
@public
|
||||
def PUT(self, req):
|
||||
b''.join(req.environ['wsgi.input'])
|
||||
return HTTPCreated()
|
||||
|
||||
@public
|
||||
def DELETE(self, req):
|
||||
b''.join(req.environ['wsgi.input'])
|
||||
return HTTPNoContent()
|
||||
|
||||
|
||||
class SlowBytesIO(io.BytesIO):
|
||||
"""
|
||||
A BytesIO that will sleep once for sleep_time before reading the byte at
|
||||
sleep_index. If a read or readline call is completed by the byte at
|
||||
(sleep_index - 1) then the call returns without sleeping, and the sleep
|
||||
will occur at the start of the next read or readline call.
|
||||
"""
|
||||
def __init__(self, value, sleep_index=-1, sleep_time=0.1):
|
||||
io.BytesIO.__init__(self, value)
|
||||
self.sleep_index = sleep_index
|
||||
self.sleep_time = sleep_time
|
||||
self.bytes_read = []
|
||||
self.num_bytes_read = 0
|
||||
|
||||
def _read(self, size=-1, readline=False):
|
||||
size = -1 if size is None else size
|
||||
num_read = 0
|
||||
data = b''
|
||||
self.bytes_read.append(data)
|
||||
while True:
|
||||
if self.num_bytes_read == self.sleep_index:
|
||||
self.sleep_index = -1
|
||||
eventlet.sleep(self.sleep_time)
|
||||
next_byte = io.BytesIO.read(self, 1)
|
||||
data = data + next_byte
|
||||
self.bytes_read[-1] = data
|
||||
num_read += 1
|
||||
self.num_bytes_read += 1
|
||||
if len(data) < num_read:
|
||||
break
|
||||
if readline and data[-1:] == b'\n':
|
||||
break
|
||||
if 0 <= size <= num_read:
|
||||
break
|
||||
|
||||
return data
|
||||
|
||||
def read(self, size=-1):
|
||||
return self._read(size, False)
|
||||
|
||||
def readline(self, size=-1):
|
||||
return self._read(size, True)
|
||||
|
||||
|
||||
@unit.patch_policies()
|
||||
class TestReceiver(unittest.TestCase):
|
||||
|
||||
@@ -498,7 +567,7 @@ class TestReceiver(unittest.TestCase):
|
||||
'/device/partition',
|
||||
environ={'REQUEST_METHOD': 'SSYNC'},
|
||||
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
|
||||
':UPDATES: START\r\nBad content is here')
|
||||
':UPDATES: START\r\nBad content is here\n')
|
||||
req.remote_addr = '1.2.3.4'
|
||||
mock_wsgi_input = _Wrapper(req.body)
|
||||
req.environ['wsgi.input'] = mock_wsgi_input
|
||||
@@ -533,7 +602,7 @@ class TestReceiver(unittest.TestCase):
|
||||
'/device/partition',
|
||||
environ={'REQUEST_METHOD': 'SSYNC'},
|
||||
body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n'
|
||||
':UPDATES: START\r\nBad content is here')
|
||||
':UPDATES: START\r\nBad content is here\n')
|
||||
req.remote_addr = mock.MagicMock()
|
||||
req.remote_addr.__str__ = mock.Mock(
|
||||
side_effect=Exception("can't stringify this"))
|
||||
@@ -633,6 +702,22 @@ class TestReceiver(unittest.TestCase):
|
||||
self.controller.logger.exception.assert_called_once_with(
|
||||
'3.4.5.6/sda1/1 EXCEPTION in ssync.Receiver')
|
||||
|
||||
def test_MISSING_CHECK_partial_line(self):
|
||||
req = swob.Request.blank(
|
||||
'/sda1/1',
|
||||
environ={'REQUEST_METHOD': 'SSYNC'},
|
||||
# not sure this would ever be yielded by the wsgi input since the
|
||||
# bytes read wouldn't match the chunk size that was sent
|
||||
body=':MISSING_CHECK: START\r\nhash no_newline'
|
||||
)
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertFalse(self.body_lines(resp.body))
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(
|
||||
['None/sda1/1 read failed in ssync.Receiver: missing_check line: '
|
||||
'missing newline'], lines)
|
||||
|
||||
def test_MISSING_CHECK_empty_list(self):
|
||||
|
||||
self.controller.logger = mock.MagicMock()
|
||||
@@ -1308,6 +1393,133 @@ class TestReceiver(unittest.TestCase):
|
||||
'2.3.4.5/device/partition TIMEOUT in ssync.Receiver: '
|
||||
'0.01 seconds: updates line')
|
||||
|
||||
def test_UPDATES_timeout_reading_PUT_subreq_input_1(self):
|
||||
# timeout reading from wsgi input part way through a PUT subreq body
|
||||
body_chunks = [
|
||||
':MISSING_CHECK: START\r\n',
|
||||
':MISSING_CHECK: END\r\n',
|
||||
':UPDATES: START\r\n',
|
||||
'PUT /a/c/o\r\nContent-Length: 28\r\n\r\n',
|
||||
'body_chunk_one',
|
||||
'body_chunk_two',
|
||||
':UPDATES: END\r\n',
|
||||
''
|
||||
]
|
||||
chunked_body = ''.join([
|
||||
'%x\r\n%s\r\n' % (len(line), line) for line in body_chunks
|
||||
])
|
||||
req = swob.Request.blank(
|
||||
'/device/partition',
|
||||
environ={'REQUEST_METHOD': 'SSYNC'},
|
||||
body=chunked_body)
|
||||
req.remote_addr = '2.3.4.5'
|
||||
slow_down_index = chunked_body.find('chunk_one')
|
||||
slow_io = SlowBytesIO(req.body, sleep_index=slow_down_index)
|
||||
wsgi_input = eventlet.wsgi.Input(
|
||||
rfile=slow_io, content_length=123, sock=mock.MagicMock(),
|
||||
chunked_input=True)
|
||||
req.environ['wsgi.input'] = wsgi_input
|
||||
controller = FakeController(self.conf, logger=self.logger)
|
||||
controller.client_timeout = 0.01
|
||||
with mock.patch.object(
|
||||
ssync_receiver.eventlet.greenio, 'shutdown_safe') as \
|
||||
mock_shutdown_safe:
|
||||
resp = req.get_response(controller)
|
||||
resp_body_lines = self.body_lines(resp.body)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(
|
||||
[b':MISSING_CHECK: START',
|
||||
b':MISSING_CHECK: END',
|
||||
b":ERROR: 408 '0.01 seconds: PUT /a/c/o'"], resp_body_lines)
|
||||
self.assertEqual([
|
||||
b'17\r\n',
|
||||
b':MISSING_CHECK: START\r\n',
|
||||
b'\r\n',
|
||||
b'15\r\n',
|
||||
b':MISSING_CHECK: END\r\n',
|
||||
b'\r\n',
|
||||
b'11\r\n',
|
||||
b':UPDATES: START\r\n',
|
||||
b'\r\n',
|
||||
b'22\r\n',
|
||||
b'PUT /a/c/o\r\n',
|
||||
b'Content-Length: 28\r\n',
|
||||
b'\r\n',
|
||||
b'\r\n',
|
||||
b'e\r\n',
|
||||
b'body_',
|
||||
], slow_io.bytes_read)
|
||||
# oops,the subreq body was not drained
|
||||
self.assertEqual(
|
||||
b'chunk_one\r\ne\r\nbody_chunk_two\r\n'
|
||||
b'f\r\n:UPDATES: END\r\n\r\n'
|
||||
b'0\r\n\r\n', slow_io.read())
|
||||
mock_shutdown_safe.assert_called_once_with(
|
||||
wsgi_input.get_socket())
|
||||
self.assertTrue(wsgi_input.get_socket().closed)
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(
|
||||
['ERROR __call__ error with PUT /device/partition/a/c/o : '
|
||||
'MessageTimeout (0.01s) PUT /a/c/o',
|
||||
'2.3.4.5/device/partition TIMEOUT in ssync.Receiver: '
|
||||
'0.01 seconds: PUT /a/c/o'],
|
||||
log_lines)
|
||||
|
||||
def test_UPDATES_timeout_reading_PUT_subreq_input_2(self):
|
||||
# timeout immediately before reading PUT subreq chunk content
|
||||
body_chunks = [
|
||||
':MISSING_CHECK: START\r\n',
|
||||
':MISSING_CHECK: END\r\n',
|
||||
':UPDATES: START\r\n',
|
||||
'PUT /a/c/o\r\nContent-Length: 99\r\n\r\n',
|
||||
'first body chunk',
|
||||
# NB: this is still the PUT subreq body, it just happens to look
|
||||
# like the start of another subreq...
|
||||
'DELETE /in/second/body chunk\r\n'
|
||||
'X-Timestamp: 123456789.12345\r\nContent-Length: 0\r\n\r\n',
|
||||
':UPDATES: END\r\n',
|
||||
]
|
||||
chunked_body = ''.join([
|
||||
'%x\r\n%s\r\n' % (len(line), line) for line in body_chunks
|
||||
])
|
||||
req = swob.Request.blank(
|
||||
'/device/partition',
|
||||
environ={'REQUEST_METHOD': 'SSYNC'},
|
||||
body=chunked_body)
|
||||
req.remote_addr = '2.3.4.5'
|
||||
slow_down_index = chunked_body.find('DELETE /in/second/body chunk')
|
||||
slow_io = SlowBytesIO(req.body, sleep_index=slow_down_index)
|
||||
wsgi_input = eventlet.wsgi.Input(
|
||||
rfile=slow_io, content_length=123, sock=mock.MagicMock(),
|
||||
chunked_input=True)
|
||||
req.environ['wsgi.input'] = wsgi_input
|
||||
controller = FakeController(self.conf, logger=self.logger)
|
||||
controller.client_timeout = 0.01
|
||||
with mock.patch.object(
|
||||
ssync_receiver.eventlet.greenio, 'shutdown_safe') as \
|
||||
mock_shutdown_safe:
|
||||
resp = req.get_response(controller)
|
||||
resp_body_lines = self.body_lines(resp.body)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(['SSYNC', 'PUT'],
|
||||
[req.method for req in controller.requests])
|
||||
self.assertEqual(chunked_body.encode('utf-8')[:slow_down_index],
|
||||
b''.join(slow_io.bytes_read))
|
||||
self.assertEqual([
|
||||
b':MISSING_CHECK: START',
|
||||
b':MISSING_CHECK: END',
|
||||
b":ERROR: 408 '0.01 seconds: PUT /a/c/o'"], resp_body_lines)
|
||||
mock_shutdown_safe.assert_called_once_with(
|
||||
wsgi_input.get_socket())
|
||||
self.assertTrue(wsgi_input.get_socket().closed)
|
||||
log_lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(
|
||||
['ERROR __call__ error with PUT /device/partition/a/c/o : '
|
||||
'MessageTimeout (0.01s) PUT /a/c/o',
|
||||
'2.3.4.5/device/partition TIMEOUT in ssync.Receiver: '
|
||||
'0.01 seconds: PUT /a/c/o'],
|
||||
log_lines)
|
||||
|
||||
def test_UPDATES_other_exception(self):
|
||||
|
||||
class _Wrapper(io.BytesIO):
|
||||
@@ -1391,8 +1603,7 @@ class TestReceiver(unittest.TestCase):
|
||||
self.assertFalse(mock_shutdown_safe.called)
|
||||
self.assertFalse(mock_wsgi_input.mock_socket.close.called)
|
||||
|
||||
def test_UPDATES_bad_subrequest_line(self):
|
||||
self.controller.logger = mock.MagicMock()
|
||||
def test_UPDATES_bad_subrequest_line_1(self):
|
||||
req = swob.Request.blank(
|
||||
'/device/partition',
|
||||
environ={'REQUEST_METHOD': 'SSYNC'},
|
||||
@@ -1405,13 +1616,16 @@ class TestReceiver(unittest.TestCase):
|
||||
[b':MISSING_CHECK: START', b':MISSING_CHECK: END',
|
||||
UNPACK_ERR])
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.controller.logger.exception.assert_called_once_with(
|
||||
'None/device/partition EXCEPTION in ssync.Receiver')
|
||||
lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(
|
||||
['None/device/partition EXCEPTION in ssync.Receiver: '], lines)
|
||||
|
||||
def test_UPDATES_bad_subrequest_line_2(self):
|
||||
# If there's no line feed, we probably read a partial buffer
|
||||
# because the client hung up
|
||||
with mock.patch.object(
|
||||
self.controller, 'DELETE',
|
||||
return_value=swob.HTTPNoContent()):
|
||||
self.controller.logger = mock.MagicMock()
|
||||
req = swob.Request.blank(
|
||||
'/device/partition',
|
||||
environ={'REQUEST_METHOD': 'SSYNC'},
|
||||
@@ -1424,11 +1638,14 @@ class TestReceiver(unittest.TestCase):
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(
|
||||
self.body_lines(resp.body),
|
||||
[b':MISSING_CHECK: START', b':MISSING_CHECK: END',
|
||||
UNPACK_ERR])
|
||||
[b':MISSING_CHECK: START', b':MISSING_CHECK: END'])
|
||||
# Since the client (presumably) hung up, no point in sending
|
||||
# anything about the error
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.controller.logger.exception.assert_called_once_with(
|
||||
'None/device/partition EXCEPTION in ssync.Receiver')
|
||||
lines = self.logger.get_lines_for_level('error')
|
||||
self.assertEqual(
|
||||
['None/device/partition read failed in ssync.Receiver: '
|
||||
'updates line: missing newline'], lines)
|
||||
|
||||
def test_UPDATES_no_headers(self):
|
||||
self.controller.logger = mock.MagicMock()
|
||||
@@ -2675,5 +2892,162 @@ class TestModuleMethods(unittest.TestCase):
|
||||
expected)
|
||||
|
||||
|
||||
class TestSsyncInputProxy(unittest.TestCase):
|
||||
def test_read_line(self):
|
||||
body = io.BytesIO(b'f\r\nDELETE /a/c/o\r\n\r\n'
|
||||
b'10\r\nDELETE /a/c/o1\r\n\r\n'
|
||||
b'13\r\nDELETE /a/c/oh my\r\n\r\n')
|
||||
wsgi_input = eventlet.wsgi.Input(
|
||||
rfile=body, content_length=123, sock=mock.MagicMock(),
|
||||
chunked_input=True)
|
||||
inpt = SsyncInputProxy(wsgi_input, chunk_size=65536, timeout=60)
|
||||
self.assertEqual(b'DELETE /a/c/o\r\n', inpt.read_line('ctxt'))
|
||||
self.assertEqual(b'DELETE /a/c/o1\r\n', inpt.read_line('ctxt'))
|
||||
self.assertEqual(b'DELETE /a/c/oh my\r\n', inpt.read_line('ctxt'))
|
||||
|
||||
def test_read_line_timeout(self):
|
||||
body = SlowBytesIO(b'f\r\nDELETE /a/c/o\r\n\r\n'
|
||||
b'10\r\nDELETE /a/c/o1\r\n\r\n',
|
||||
# timeout reading second line...
|
||||
sleep_index=23)
|
||||
wsgi_input = eventlet.wsgi.Input(
|
||||
rfile=body, content_length=123, sock=mock.MagicMock(),
|
||||
chunked_input=True)
|
||||
inpt = SsyncInputProxy(wsgi_input, chunk_size=65536, timeout=0.01)
|
||||
self.assertEqual(b'DELETE /a/c/o\r\n', inpt.read_line('ctxt'))
|
||||
with self.assertRaises(MessageTimeout) as cm:
|
||||
inpt.read_line('ctxt')
|
||||
self.assertEqual('0.01 seconds: ctxt', str(cm.exception))
|
||||
# repeat
|
||||
with self.assertRaises(MessageTimeout) as cm:
|
||||
inpt.read_line('ctxt')
|
||||
self.assertEqual('0.01 seconds: ctxt', str(cm.exception))
|
||||
# check subreq input will also fail
|
||||
sub_input = inpt.make_subreq_input('ctxt2', 123)
|
||||
with self.assertRaises(MessageTimeout) as cm:
|
||||
sub_input.read()
|
||||
self.assertEqual('0.01 seconds: ctxt', str(cm.exception))
|
||||
|
||||
def test_read_line_chunk_read_error(self):
|
||||
body = SlowBytesIO(b'f\r\nDELETE /a/c/o\r\n\r\n'
|
||||
# bad chunk length...
|
||||
b'x\r\nDELETE /a/c/o1\r\n\r\n',
|
||||
sleep_index=23)
|
||||
wsgi_input = eventlet.wsgi.Input(
|
||||
rfile=body, content_length=123, sock=mock.MagicMock(),
|
||||
chunked_input=True)
|
||||
inpt = SsyncInputProxy(wsgi_input, chunk_size=65536, timeout=0.01)
|
||||
self.assertEqual(b'DELETE /a/c/o\r\n', inpt.read_line('ctxt'))
|
||||
with self.assertRaises(ChunkReadError) as cm:
|
||||
inpt.read_line('ctxt')
|
||||
self.assertEqual(
|
||||
"ctxt: invalid literal for int() with base 16: b'x\\r\\n'",
|
||||
str(cm.exception))
|
||||
# repeat
|
||||
with self.assertRaises(ChunkReadError) as cm:
|
||||
inpt.read_line('ctxt')
|
||||
self.assertEqual(
|
||||
"ctxt: invalid literal for int() with base 16: b'x\\r\\n'",
|
||||
str(cm.exception))
|
||||
# check subreq input will also fail
|
||||
sub_input = inpt.make_subreq_input('ctxt2', 123)
|
||||
with self.assertRaises(ChunkReadError) as cm:
|
||||
sub_input.read()
|
||||
self.assertEqual(
|
||||
"ctxt: invalid literal for int() with base 16: b'x\\r\\n'",
|
||||
str(cm.exception))
|
||||
|
||||
def test_read_line_protocol_error(self):
|
||||
body = io.BytesIO(
|
||||
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
|
||||
b'15\r\n:MISSING_CHECK: END\r\n\r\n'
|
||||
b'11\r\n:UPDATES: START\r\n\r\n'
|
||||
b'd\r\n:UPDATES: END\r\n' # note: chunk is missing its newline
|
||||
b'0\r\n\r\n'
|
||||
)
|
||||
wsgi_input = eventlet.wsgi.Input(
|
||||
rfile=body, content_length=123, sock=mock.MagicMock(),
|
||||
chunked_input=True)
|
||||
inpt = SsyncInputProxy(wsgi_input, chunk_size=65536, timeout=0.01)
|
||||
self.assertEqual(b':MISSING_CHECK: START\r\n', inpt.read_line('ctxt'))
|
||||
self.assertEqual(b':MISSING_CHECK: END\r\n', inpt.read_line('ctxt'))
|
||||
self.assertEqual(b':UPDATES: START\r\n', inpt.read_line('ctxt'))
|
||||
with self.assertRaises(ChunkReadError) as cm:
|
||||
inpt.read_line('ctxt')
|
||||
self.assertEqual('ctxt: missing newline', str(cm.exception))
|
||||
|
||||
def test_subreq_input(self):
|
||||
body = io.BytesIO(b'1a\r\nchunk1 \r\n'
|
||||
b'1b\r\nchunktwo \r\n'
|
||||
b'1c\r\nchunkthree \r\n'
|
||||
b'f\r\nDELETE /a/c/o\r\n\r\n')
|
||||
wsgi_input = eventlet.wsgi.Input(
|
||||
rfile=body, content_length=123, sock=mock.MagicMock(),
|
||||
chunked_input=True)
|
||||
inpt = SsyncInputProxy(wsgi_input, chunk_size=20, timeout=60)
|
||||
sub_input = inpt.make_subreq_input('ctxt', content_length=81)
|
||||
self.assertEqual(b'chunk1 '
|
||||
b'chunktwo '
|
||||
b'chunkthree ',
|
||||
sub_input.read())
|
||||
# check next read_line (note: chunk_size needs to be big enough to read
|
||||
# whole ssync protocol 'line'
|
||||
self.assertEqual(b'DELETE /a/c/o\r\n', inpt.read_line('ctxt'))
|
||||
|
||||
def test_subreq_input_content_length_less_than_body(self):
|
||||
body = io.BytesIO(b'1a\r\nchunk1 \r\n'
|
||||
b'1b\r\nchunktwo \r\n')
|
||||
wsgi_input = eventlet.wsgi.Input(
|
||||
rfile=body, content_length=123, sock=mock.MagicMock(),
|
||||
chunked_input=True)
|
||||
inpt = SsyncInputProxy(wsgi_input, chunk_size=20, timeout=60)
|
||||
sub_input = inpt.make_subreq_input('ctxt', content_length=3)
|
||||
self.assertEqual(b'chu', sub_input.read())
|
||||
|
||||
def test_subreq_input_content_length_more_than_body(self):
|
||||
body = io.BytesIO(b'1a\r\nchunk1 \r\n')
|
||||
wsgi_input = eventlet.wsgi.Input(
|
||||
rfile=body, content_length=123, sock=mock.MagicMock(),
|
||||
chunked_input=True)
|
||||
inpt = SsyncInputProxy(wsgi_input, chunk_size=20, timeout=60)
|
||||
sub_input = inpt.make_subreq_input('ctxt', content_length=81)
|
||||
with self.assertRaises(ChunkReadError) as cm:
|
||||
sub_input.read()
|
||||
self.assertEqual("ctxt: invalid literal for int() with base 16: b''",
|
||||
str(cm.exception))
|
||||
|
||||
def test_subreq_input_early_termination(self):
|
||||
body = io.BytesIO(b'1a\r\nchunk1 \r\n'
|
||||
b'0\r\n\r\n') # the sender disconnected
|
||||
wsgi_input = eventlet.wsgi.Input(
|
||||
rfile=body, content_length=123, sock=mock.MagicMock(),
|
||||
chunked_input=True)
|
||||
inpt = SsyncInputProxy(wsgi_input, chunk_size=20, timeout=60)
|
||||
sub_input = inpt.make_subreq_input('ctxt', content_length=81)
|
||||
with self.assertRaises(ChunkReadError) as cm:
|
||||
sub_input.read()
|
||||
self.assertEqual('Early termination for ctxt', str(cm.exception))
|
||||
|
||||
def test_subreq_input_timeout(self):
|
||||
body = SlowBytesIO(b'1a\r\nchunk1 \r\n'
|
||||
b'1b\r\nchunktwo \r\n',
|
||||
sleep_index=25)
|
||||
wsgi_input = eventlet.wsgi.Input(
|
||||
rfile=body, content_length=123, sock=mock.MagicMock(),
|
||||
chunked_input=True)
|
||||
inpt = SsyncInputProxy(wsgi_input, chunk_size=16, timeout=0.01)
|
||||
sub_input = inpt.make_subreq_input('ctxt', content_length=81)
|
||||
self.assertEqual(b'chunk1 ', sub_input.read(16))
|
||||
with self.assertRaises(MessageTimeout) as cm:
|
||||
sub_input.read()
|
||||
self.assertEqual('0.01 seconds: ctxt', str(cm.exception))
|
||||
# repeat
|
||||
self.assertEqual(b'', sub_input.read())
|
||||
# check next read_line
|
||||
with self.assertRaises(MessageTimeout) as cm:
|
||||
inpt.read_line('ctxt2')
|
||||
self.assertEqual('0.01 seconds: ctxt', str(cm.exception))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
@@ -82,12 +82,15 @@ class FakeResponse(ssync_sender.SsyncBufferedHTTPResponse):
|
||||
|
||||
class FakeConnection(object):
|
||||
|
||||
def __init__(self):
|
||||
def __init__(self, sleeps=None):
|
||||
self.sleeps = sleeps
|
||||
self.sent = []
|
||||
self.closed = False
|
||||
|
||||
def send(self, data):
|
||||
self.sent.append(data)
|
||||
if self.sleeps:
|
||||
eventlet.sleep(self.sleeps.pop(0))
|
||||
|
||||
def close(self):
|
||||
self.closed = True
|
||||
@@ -791,18 +794,16 @@ class TestSender(BaseTest):
|
||||
self.assertEqual(response.readline(), b'')
|
||||
|
||||
def test_missing_check_timeout_start(self):
|
||||
connection = FakeConnection()
|
||||
connection = FakeConnection(sleeps=[1])
|
||||
response = FakeResponse()
|
||||
self.sender.daemon.node_timeout = 0.01
|
||||
self.assertFalse(self.sender.limited_by_max_objects)
|
||||
with mock.patch.object(connection, 'send',
|
||||
side_effect=lambda *args: eventlet.sleep(1)):
|
||||
with self.assertRaises(exceptions.MessageTimeout) as cm:
|
||||
self.sender.missing_check(connection, response)
|
||||
with self.assertRaises(exceptions.MessageTimeout) as cm:
|
||||
self.sender.missing_check(connection, response)
|
||||
self.assertIn('0.01 seconds: missing_check start', str(cm.exception))
|
||||
self.assertFalse(self.sender.limited_by_max_objects)
|
||||
|
||||
def test_missing_check_timeout_send_line(self):
|
||||
def test_call_and_missing_check_timeout_send_line(self):
|
||||
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
|
||||
yield (
|
||||
'9d41d8cd98f00b204e9800998ecf0abc',
|
||||
@@ -810,23 +811,36 @@ class TestSender(BaseTest):
|
||||
yield (
|
||||
'9d41d8cd98f00b204e9800998ecf0def',
|
||||
{'ts_data': Timestamp(1380144471.00000)})
|
||||
connection = FakeConnection()
|
||||
|
||||
response = FakeResponse()
|
||||
# max_objects unlimited
|
||||
self.sender = ssync_sender.Sender(self.daemon, None, self.job, None,
|
||||
node = {'replication_ip': '1.2.3.4',
|
||||
'replication_port': 5678,
|
||||
'device': 'sda1'}
|
||||
self.sender = ssync_sender.Sender(self.daemon, node, self.job, None,
|
||||
max_objects=0)
|
||||
self.sender.daemon.node_timeout = 0.01
|
||||
self.sender.suffixes = ['abc']
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
# arrange for timeout while sending first missing check item
|
||||
self.sender.daemon.node_timeout = 0.01
|
||||
connection = FakeConnection(sleeps=[0, 1])
|
||||
self.sender.connect = mock.MagicMock(return_value=(connection,
|
||||
response))
|
||||
self.sender.updates = mock.MagicMock()
|
||||
self.assertFalse(self.sender.limited_by_max_objects)
|
||||
sleeps = [0, 0, 1]
|
||||
with mock.patch.object(
|
||||
connection, 'send',
|
||||
side_effect=lambda *args: eventlet.sleep(sleeps.pop(0))):
|
||||
with self.assertRaises(exceptions.MessageTimeout) as cm:
|
||||
self.sender.missing_check(connection, response)
|
||||
self.assertIn('0.01 seconds: missing_check send line: '
|
||||
'1 lines (57 bytes) sent', str(cm.exception))
|
||||
success, candidates = self.sender()
|
||||
self.assertFalse(success)
|
||||
log_lines = self.daemon_logger.get_lines_for_level('error')
|
||||
self.assertIn(
|
||||
'1.2.3.4:5678/sda1/99 0.01 seconds: missing_check send line: '
|
||||
'0 lines (0 bytes) sent', log_lines)
|
||||
self.assertFalse(self.sender.limited_by_max_objects)
|
||||
# only the first missing check item was sent, plus a disconnect line
|
||||
self.assertEqual(
|
||||
b''.join(connection.sent),
|
||||
b'17\r\n:MISSING_CHECK: START\r\n\r\n'
|
||||
b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
|
||||
b'0\r\n\r\n')
|
||||
|
||||
def test_missing_check_has_empty_suffixes(self):
|
||||
def yield_hashes(device, partition, policy, suffixes=None, **kwargs):
|
||||
|
||||
Reference in New Issue
Block a user