diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index ab821f8680..6415d9fd46 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -16,7 +16,7 @@ import eventlet.greenio import eventlet.wsgi -from eventlet import sleep +from eventlet import sleep, Timeout import urllib from swift.common import exceptions @@ -104,6 +104,93 @@ def encode_wanted(remote, local): return None +class SsyncInputProxy: + """ + Wraps a wsgi input to provide ssync specific read methods. + + If any exception or timeout is raised while reading from the input then + subsequent calls will raise the same exception. Callers are thereby + prevented from reading the input after it has raised an exception, when its + state may be uncertain. This enables the input to be safely shared by + multiple callers (typically an ssync Receiver and an ObjectController) who + may otherwise each be unaware that the other has encountered an exception. + + :param wsgi_input: a wsgi input + :param chunk_size: the number of bytes to read at a time + :param timeout: the timeout in seconds applied to each read + """ + def __init__(self, wsgi_input, chunk_size, timeout): + self.wsgi_input = wsgi_input + self.chunk_size = chunk_size + self.timeout = timeout + self.exception = None + + def read_line(self, context): + """ + Try to read a line from the wsgi input; annotate any timeout or read + errors with a description of the calling context. + + :param context: string to annotate any exception raised + """ + if self.exception: + raise self.exception + try: + try: + with exceptions.MessageTimeout(self.timeout, context): + line = self.wsgi_input.readline(self.chunk_size) + except (eventlet.wsgi.ChunkReadError, IOError) as err: + raise exceptions.ChunkReadError('%s: %s' % (context, err)) + except (Exception, Timeout) as err: + self.exception = err + raise + + if line and not line.endswith(b'\n'): + # Everywhere we would call readline, we should always get + # a clean end-of-line as we should be reading + # SSYNC-specific messages or HTTP request lines/headers. + # If we didn't, it indicates that the wsgi input readline reached a + # valid end of chunked body without finding a newline. + raise exceptions.ChunkReadError( + '%s: %s' % (context, 'missing newline')) + + return line + + def _read_chunk(self, context, size): + if self.exception: + raise self.exception + try: + try: + with exceptions.MessageTimeout(self.timeout, context): + chunk = self.wsgi_input.read(size) + except (eventlet.wsgi.ChunkReadError, IOError) as err: + raise exceptions.ChunkReadError('%s: %s' % (context, err)) + if not chunk: + raise exceptions.ChunkReadError( + 'Early termination for %s' % context) + except (Exception, Timeout) as err: + self.exception = err + raise + return chunk + + def make_subreq_input(self, context, content_length): + """ + Returns a wsgi input that will read up to the given ``content-length`` + from the wrapped wsgi input. + + :param context: string to annotate any exception raised + :param content_length: maximum number of bytes to read + """ + def subreq_iter(): + bytes_left = content_length + while bytes_left > 0: + size = min(bytes_left, self.chunk_size) + chunk = self._read_chunk(context, size) + bytes_left -= len(chunk) + yield chunk + + return utils.FileLikeIter(subreq_iter()) + + class Receiver(object): """ Handles incoming SSYNC requests to the object server. @@ -142,7 +229,7 @@ class Receiver(object): self.request = request self.device = None self.partition = None - self.fp = None + self.input = None # We default to dropping the connection in case there is any exception # raised during processing because otherwise the sender could send for # quite some time before realizing it was all in vain. @@ -210,6 +297,8 @@ class Receiver(object): '%s/%s/%s read failed in ssync.Receiver: %s' % ( self.request.remote_addr, self.device, self.partition, err)) + # Since the client (presumably) hung up, no point in trying to + # send anything about the error except swob.HTTPException as err: body = b''.join(err({}, lambda *args: None)) yield (':ERROR: %d %r\n' % ( @@ -260,18 +349,9 @@ class Receiver(object): self.diskfile_mgr = self.app._diskfile_router[self.policy] if not self.diskfile_mgr.get_dev_path(self.device): raise swob.HTTPInsufficientStorage(drive=self.device) - self.fp = self.request.environ['wsgi.input'] - - def _readline(self, context): - # try to read a line from the wsgi input; annotate any timeout or read - # errors with a description of the calling context - with exceptions.MessageTimeout( - self.app.client_timeout, context): - try: - line = self.fp.readline(self.app.network_chunk_size) - except (eventlet.wsgi.ChunkReadError, IOError) as err: - raise exceptions.ChunkReadError('%s: %s' % (context, err)) - return line + self.input = SsyncInputProxy(self.request.environ['wsgi.input'], + self.app.network_chunk_size, + self.app.client_timeout) def _check_local(self, remote, make_durable=True): """ @@ -382,7 +462,7 @@ class Receiver(object): have to read while it writes to ensure network buffers don't fill up and block everything. """ - line = self._readline('missing_check start') + line = self.input.read_line('missing_check start') if not line: # Guess they hung up raise SsyncClientDisconnected @@ -393,7 +473,7 @@ class Receiver(object): object_hashes = [] nlines = 0 while True: - line = self._readline('missing_check line') + line = self.input.read_line('missing_check line') if not line or line.strip() == b':MISSING_CHECK: END': break want = self._check_missing(line) @@ -446,7 +526,7 @@ class Receiver(object): success. This is so the sender knows if it can remove an out of place partition, for example. """ - line = self._readline('updates start') + line = self.input.read_line('updates start') if not line: # Guess they hung up waiting for us to process the missing check raise SsyncClientDisconnected @@ -457,11 +537,12 @@ class Receiver(object): failures = 0 updates = 0 while True: - line = self._readline('updates line') + line = self.input.read_line('updates line') if not line or line.strip() == b':UPDATES: END': break # Read first line METHOD PATH of subrequest. - method, path = swob.bytes_to_wsgi(line.strip()).split(' ', 1) + context = swob.bytes_to_wsgi(line.strip()) + method, path = context.split(' ', 1) subreq = swob.Request.blank( '/%s/%s%s' % (self.device, self.partition, path), environ={'REQUEST_METHOD': method}) @@ -469,10 +550,9 @@ class Receiver(object): content_length = None replication_headers = [] while True: - line = self._readline('updates line') + line = self.input.read_line('updates line') if not line: - raise Exception( - 'Got no headers for %s %s' % (method, path)) + raise Exception('Got no headers for %s' % context) line = line.strip() if not line: break @@ -500,24 +580,9 @@ class Receiver(object): % (method, path)) elif method == 'PUT': if content_length is None: - raise Exception( - 'No content-length sent for %s %s' % (method, path)) - - def subreq_iter(): - left = content_length - while left > 0: - with exceptions.MessageTimeout( - self.app.client_timeout, - 'updates content'): - chunk = self.fp.read( - min(left, self.app.network_chunk_size)) - if not chunk: - raise exceptions.ChunkReadError( - 'Early termination for %s %s' % (method, path)) - left -= len(chunk) - yield chunk - subreq.environ['wsgi.input'] = utils.FileLikeIter( - subreq_iter()) + raise Exception('No content-length sent for %s' % context) + subreq.environ['wsgi.input'] = self.input.make_subreq_input( + context, content_length) else: raise Exception('Invalid subrequest method %s' % method) subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy) @@ -535,8 +600,8 @@ class Receiver(object): successes += 1 else: self.app.logger.warning( - 'ssync subrequest failed with %s: %s %s (%s)' % - (resp.status_int, method, subreq.path, resp.body)) + 'ssync subrequest failed with %s: %s (%s)' % + (resp.status_int, context, resp.body)) failures += 1 if failures >= self.app.replication_failure_threshold and ( not successes or @@ -546,8 +611,8 @@ class Receiver(object): 'Too many %d failures to %d successes' % (failures, successes)) # The subreq may have failed, but we want to read the rest of the - # body from the remote side so we can continue on with the next - # subreq. + # body from the remote side so we can either detect a broken input + # or continue on with the next subreq. for junk in subreq.environ['wsgi.input']: pass if updates % 5 == 0: diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py index f7178a5ff4..a69eeb0093 100644 --- a/test/unit/obj/test_ssync.py +++ b/test/unit/obj/test_ssync.py @@ -991,18 +991,22 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): pass # expected outcome if msgs: self.fail('Failed with:\n%s' % '\n'.join(msgs)) - log_lines = self.logger.get_lines_for_level('error') + tx_error_log_lines = self.logger.get_lines_for_level('error') self.assertIn('Sent data length does not match content-length', - log_lines[0]) - self.assertFalse(log_lines[1:]) + tx_error_log_lines[0]) + self.assertFalse(tx_error_log_lines[1:]) # trampoline for the receiver to write a log - eventlet.sleep(0) - log_lines = self.rx_logger.get_lines_for_level('warning') - self.assertEqual(1, len(log_lines), self.rx_logger.all_log_lines()) + eventlet.sleep(0.001) + rx_warning_log_lines = self.rx_logger.get_lines_for_level('warning') + self.assertEqual(1, len(rx_warning_log_lines), + self.rx_logger.all_log_lines()) self.assertIn('ssync subrequest failed with 499', - log_lines[0]) - self.assertFalse(log_lines[1:]) - self.assertFalse(self.rx_logger.get_lines_for_level('error')) + rx_warning_log_lines[0]) + self.assertFalse(rx_warning_log_lines[1:]) + rx_error_lines = self.rx_logger.get_lines_for_level('error') + self.assertEqual(1, len(rx_error_lines), rx_error_lines) + self.assertIn('127.0.0.1/dev/9 read failed in ssync.Receiver: Early ' + 'termination for PUT', rx_error_lines[0]) def test_sync_reconstructor_no_rebuilt_content(self): # First fragment to sync gets no content in any response to @@ -1026,17 +1030,20 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): pass # expected outcome if msgs: self.fail('Failed with:\n%s' % '\n'.join(msgs)) - log_lines = self.logger.get_lines_for_level('error') + tx_error_log_lines = self.logger.get_lines_for_level('error') self.assertIn('Sent data length does not match content-length', - log_lines[0]) - self.assertFalse(log_lines[1:]) + tx_error_log_lines[0]) + self.assertFalse(tx_error_log_lines[1:]) # trampoline for the receiver to write a log - eventlet.sleep(0) - log_lines = self.rx_logger.get_lines_for_level('warning') + eventlet.sleep(0.001) + rx_warning_log_lines = self.rx_logger.get_lines_for_level('warning') self.assertIn('ssync subrequest failed with 499', - log_lines[0]) - self.assertFalse(log_lines[1:]) - self.assertFalse(self.rx_logger.get_lines_for_level('error')) + rx_warning_log_lines[0]) + self.assertFalse(rx_warning_log_lines[1:]) + rx_error_lines = self.rx_logger.get_lines_for_level('error') + self.assertEqual(1, len(rx_error_lines), rx_error_lines) + self.assertIn('127.0.0.1/dev/9 read failed in ssync.Receiver: Early ' + 'termination for PUT', rx_error_lines[0]) def test_sync_reconstructor_exception_during_rebuild(self): # First fragment to sync has some reconstructor get responses raise @@ -1071,18 +1078,21 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): if msgs: self.fail('Failed with:\n%s' % '\n'.join(msgs)) - log_lines = self.logger.get_lines_for_level('error') - self.assertIn('Error trying to rebuild', log_lines[0]) + tx_error_log_lines = self.logger.get_lines_for_level('error') + self.assertIn('Error trying to rebuild', tx_error_log_lines[0]) self.assertIn('Sent data length does not match content-length', - log_lines[1]) - self.assertFalse(log_lines[2:]) + tx_error_log_lines[1]) + self.assertFalse(tx_error_log_lines[2:]) # trampoline for the receiver to write a log - eventlet.sleep(0) - log_lines = self.rx_logger.get_lines_for_level('warning') + eventlet.sleep(0.001) + rx_warning_log_lines = self.rx_logger.get_lines_for_level('warning') self.assertIn('ssync subrequest failed with 499', - log_lines[0]) - self.assertFalse(log_lines[1:]) - self.assertFalse(self.rx_logger.get_lines_for_level('error')) + rx_warning_log_lines[0]) + self.assertFalse(rx_warning_log_lines[1:]) + rx_error_lines = self.rx_logger.get_lines_for_level('error') + self.assertEqual(1, len(rx_error_lines), rx_error_lines) + self.assertIn('127.0.0.1/dev/9 read failed in ssync.Receiver: Early ' + 'termination for PUT', rx_error_lines[0]) def test_sync_reconstructor_no_responses(self): # First fragment to sync gets no responses for reconstructor to rebuild @@ -1131,7 +1141,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): log_lines = self.logger.get_lines_for_level('error') self.assertIn('Unable to get enough responses', log_lines[0]) # trampoline for the receiver to write a log - eventlet.sleep(0) + eventlet.sleep(0.001) self.assertFalse(self.rx_logger.get_lines_for_level('warning')) self.assertFalse(self.rx_logger.get_lines_for_level('error')) @@ -1234,7 +1244,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): fd.read()) # trampoline for the receiver to write a log - eventlet.sleep(0) + eventlet.sleep(0.001) self.assertFalse(self.rx_logger.get_lines_for_level('warning')) self.assertFalse(self.rx_logger.get_lines_for_level('error')) @@ -1270,7 +1280,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC): self.assertFalse( self.logger.get_lines_for_level('error')) # trampoline for the receiver to write a log - eventlet.sleep(0) + eventlet.sleep(0.001) self.assertFalse(self.rx_logger.get_lines_for_level('warning')) self.assertFalse(self.rx_logger.get_lines_for_level('error')) diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index 6a80193d55..7df5086520 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -20,19 +20,24 @@ import tempfile import unittest import eventlet +import eventlet.wsgi from unittest import mock import itertools from swift.common import bufferedhttp from swift.common import exceptions from swift.common import swob +from swift.common.exceptions import MessageTimeout, ChunkReadError from swift.common.storage_policy import POLICIES from swift.common import utils -from swift.common.swob import HTTPException +from swift.common.swob import HTTPException, HTTPCreated, Request, \ + HTTPNoContent +from swift.common.utils import public from swift.obj import diskfile from swift.obj import server from swift.obj import ssync_receiver, ssync_sender from swift.obj.reconstructor import ObjectReconstructor +from swift.obj.ssync_receiver import SsyncInputProxy from test import listen_zero, unit from test.debug_logger import debug_logger @@ -44,6 +49,70 @@ from test.unit.obj.common import write_diskfile UNPACK_ERR = b":ERROR: 0 'not enough values to unpack (expected 2, got 1)'" +class FakeController(server.ObjectController): + def __init__(self, conf, logger=None): + super().__init__(conf, logger) + self.requests = [] + + def __call__(self, environ, start_response): + self.requests.append(Request(environ)) + return super().__call__(environ, start_response) + + @public + def PUT(self, req): + b''.join(req.environ['wsgi.input']) + return HTTPCreated() + + @public + def DELETE(self, req): + b''.join(req.environ['wsgi.input']) + return HTTPNoContent() + + +class SlowBytesIO(io.BytesIO): + """ + A BytesIO that will sleep once for sleep_time before reading the byte at + sleep_index. If a read or readline call is completed by the byte at + (sleep_index - 1) then the call returns without sleeping, and the sleep + will occur at the start of the next read or readline call. + """ + def __init__(self, value, sleep_index=-1, sleep_time=0.1): + io.BytesIO.__init__(self, value) + self.sleep_index = sleep_index + self.sleep_time = sleep_time + self.bytes_read = [] + self.num_bytes_read = 0 + + def _read(self, size=-1, readline=False): + size = -1 if size is None else size + num_read = 0 + data = b'' + self.bytes_read.append(data) + while True: + if self.num_bytes_read == self.sleep_index: + self.sleep_index = -1 + eventlet.sleep(self.sleep_time) + next_byte = io.BytesIO.read(self, 1) + data = data + next_byte + self.bytes_read[-1] = data + num_read += 1 + self.num_bytes_read += 1 + if len(data) < num_read: + break + if readline and data[-1:] == b'\n': + break + if 0 <= size <= num_read: + break + + return data + + def read(self, size=-1): + return self._read(size, False) + + def readline(self, size=-1): + return self._read(size, True) + + @unit.patch_policies() class TestReceiver(unittest.TestCase): @@ -498,7 +567,7 @@ class TestReceiver(unittest.TestCase): '/device/partition', environ={'REQUEST_METHOD': 'SSYNC'}, body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' - ':UPDATES: START\r\nBad content is here') + ':UPDATES: START\r\nBad content is here\n') req.remote_addr = '1.2.3.4' mock_wsgi_input = _Wrapper(req.body) req.environ['wsgi.input'] = mock_wsgi_input @@ -533,7 +602,7 @@ class TestReceiver(unittest.TestCase): '/device/partition', environ={'REQUEST_METHOD': 'SSYNC'}, body=':MISSING_CHECK: START\r\n:MISSING_CHECK: END\r\n' - ':UPDATES: START\r\nBad content is here') + ':UPDATES: START\r\nBad content is here\n') req.remote_addr = mock.MagicMock() req.remote_addr.__str__ = mock.Mock( side_effect=Exception("can't stringify this")) @@ -633,6 +702,22 @@ class TestReceiver(unittest.TestCase): self.controller.logger.exception.assert_called_once_with( '3.4.5.6/sda1/1 EXCEPTION in ssync.Receiver') + def test_MISSING_CHECK_partial_line(self): + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC'}, + # not sure this would ever be yielded by the wsgi input since the + # bytes read wouldn't match the chunk size that was sent + body=':MISSING_CHECK: START\r\nhash no_newline' + ) + resp = req.get_response(self.controller) + self.assertFalse(self.body_lines(resp.body)) + self.assertEqual(resp.status_int, 200) + lines = self.logger.get_lines_for_level('error') + self.assertEqual( + ['None/sda1/1 read failed in ssync.Receiver: missing_check line: ' + 'missing newline'], lines) + def test_MISSING_CHECK_empty_list(self): self.controller.logger = mock.MagicMock() @@ -1308,6 +1393,133 @@ class TestReceiver(unittest.TestCase): '2.3.4.5/device/partition TIMEOUT in ssync.Receiver: ' '0.01 seconds: updates line') + def test_UPDATES_timeout_reading_PUT_subreq_input_1(self): + # timeout reading from wsgi input part way through a PUT subreq body + body_chunks = [ + ':MISSING_CHECK: START\r\n', + ':MISSING_CHECK: END\r\n', + ':UPDATES: START\r\n', + 'PUT /a/c/o\r\nContent-Length: 28\r\n\r\n', + 'body_chunk_one', + 'body_chunk_two', + ':UPDATES: END\r\n', + '' + ] + chunked_body = ''.join([ + '%x\r\n%s\r\n' % (len(line), line) for line in body_chunks + ]) + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'SSYNC'}, + body=chunked_body) + req.remote_addr = '2.3.4.5' + slow_down_index = chunked_body.find('chunk_one') + slow_io = SlowBytesIO(req.body, sleep_index=slow_down_index) + wsgi_input = eventlet.wsgi.Input( + rfile=slow_io, content_length=123, sock=mock.MagicMock(), + chunked_input=True) + req.environ['wsgi.input'] = wsgi_input + controller = FakeController(self.conf, logger=self.logger) + controller.client_timeout = 0.01 + with mock.patch.object( + ssync_receiver.eventlet.greenio, 'shutdown_safe') as \ + mock_shutdown_safe: + resp = req.get_response(controller) + resp_body_lines = self.body_lines(resp.body) + self.assertEqual(resp.status_int, 200) + self.assertEqual( + [b':MISSING_CHECK: START', + b':MISSING_CHECK: END', + b":ERROR: 408 '0.01 seconds: PUT /a/c/o'"], resp_body_lines) + self.assertEqual([ + b'17\r\n', + b':MISSING_CHECK: START\r\n', + b'\r\n', + b'15\r\n', + b':MISSING_CHECK: END\r\n', + b'\r\n', + b'11\r\n', + b':UPDATES: START\r\n', + b'\r\n', + b'22\r\n', + b'PUT /a/c/o\r\n', + b'Content-Length: 28\r\n', + b'\r\n', + b'\r\n', + b'e\r\n', + b'body_', + ], slow_io.bytes_read) + # oops,the subreq body was not drained + self.assertEqual( + b'chunk_one\r\ne\r\nbody_chunk_two\r\n' + b'f\r\n:UPDATES: END\r\n\r\n' + b'0\r\n\r\n', slow_io.read()) + mock_shutdown_safe.assert_called_once_with( + wsgi_input.get_socket()) + self.assertTrue(wsgi_input.get_socket().closed) + log_lines = self.logger.get_lines_for_level('error') + self.assertEqual( + ['ERROR __call__ error with PUT /device/partition/a/c/o : ' + 'MessageTimeout (0.01s) PUT /a/c/o', + '2.3.4.5/device/partition TIMEOUT in ssync.Receiver: ' + '0.01 seconds: PUT /a/c/o'], + log_lines) + + def test_UPDATES_timeout_reading_PUT_subreq_input_2(self): + # timeout immediately before reading PUT subreq chunk content + body_chunks = [ + ':MISSING_CHECK: START\r\n', + ':MISSING_CHECK: END\r\n', + ':UPDATES: START\r\n', + 'PUT /a/c/o\r\nContent-Length: 99\r\n\r\n', + 'first body chunk', + # NB: this is still the PUT subreq body, it just happens to look + # like the start of another subreq... + 'DELETE /in/second/body chunk\r\n' + 'X-Timestamp: 123456789.12345\r\nContent-Length: 0\r\n\r\n', + ':UPDATES: END\r\n', + ] + chunked_body = ''.join([ + '%x\r\n%s\r\n' % (len(line), line) for line in body_chunks + ]) + req = swob.Request.blank( + '/device/partition', + environ={'REQUEST_METHOD': 'SSYNC'}, + body=chunked_body) + req.remote_addr = '2.3.4.5' + slow_down_index = chunked_body.find('DELETE /in/second/body chunk') + slow_io = SlowBytesIO(req.body, sleep_index=slow_down_index) + wsgi_input = eventlet.wsgi.Input( + rfile=slow_io, content_length=123, sock=mock.MagicMock(), + chunked_input=True) + req.environ['wsgi.input'] = wsgi_input + controller = FakeController(self.conf, logger=self.logger) + controller.client_timeout = 0.01 + with mock.patch.object( + ssync_receiver.eventlet.greenio, 'shutdown_safe') as \ + mock_shutdown_safe: + resp = req.get_response(controller) + resp_body_lines = self.body_lines(resp.body) + self.assertEqual(resp.status_int, 200) + self.assertEqual(['SSYNC', 'PUT'], + [req.method for req in controller.requests]) + self.assertEqual(chunked_body.encode('utf-8')[:slow_down_index], + b''.join(slow_io.bytes_read)) + self.assertEqual([ + b':MISSING_CHECK: START', + b':MISSING_CHECK: END', + b":ERROR: 408 '0.01 seconds: PUT /a/c/o'"], resp_body_lines) + mock_shutdown_safe.assert_called_once_with( + wsgi_input.get_socket()) + self.assertTrue(wsgi_input.get_socket().closed) + log_lines = self.logger.get_lines_for_level('error') + self.assertEqual( + ['ERROR __call__ error with PUT /device/partition/a/c/o : ' + 'MessageTimeout (0.01s) PUT /a/c/o', + '2.3.4.5/device/partition TIMEOUT in ssync.Receiver: ' + '0.01 seconds: PUT /a/c/o'], + log_lines) + def test_UPDATES_other_exception(self): class _Wrapper(io.BytesIO): @@ -1391,8 +1603,7 @@ class TestReceiver(unittest.TestCase): self.assertFalse(mock_shutdown_safe.called) self.assertFalse(mock_wsgi_input.mock_socket.close.called) - def test_UPDATES_bad_subrequest_line(self): - self.controller.logger = mock.MagicMock() + def test_UPDATES_bad_subrequest_line_1(self): req = swob.Request.blank( '/device/partition', environ={'REQUEST_METHOD': 'SSYNC'}, @@ -1405,13 +1616,16 @@ class TestReceiver(unittest.TestCase): [b':MISSING_CHECK: START', b':MISSING_CHECK: END', UNPACK_ERR]) self.assertEqual(resp.status_int, 200) - self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in ssync.Receiver') + lines = self.logger.get_lines_for_level('error') + self.assertEqual( + ['None/device/partition EXCEPTION in ssync.Receiver: '], lines) + def test_UPDATES_bad_subrequest_line_2(self): + # If there's no line feed, we probably read a partial buffer + # because the client hung up with mock.patch.object( self.controller, 'DELETE', return_value=swob.HTTPNoContent()): - self.controller.logger = mock.MagicMock() req = swob.Request.blank( '/device/partition', environ={'REQUEST_METHOD': 'SSYNC'}, @@ -1424,11 +1638,14 @@ class TestReceiver(unittest.TestCase): resp = req.get_response(self.controller) self.assertEqual( self.body_lines(resp.body), - [b':MISSING_CHECK: START', b':MISSING_CHECK: END', - UNPACK_ERR]) + [b':MISSING_CHECK: START', b':MISSING_CHECK: END']) + # Since the client (presumably) hung up, no point in sending + # anything about the error self.assertEqual(resp.status_int, 200) - self.controller.logger.exception.assert_called_once_with( - 'None/device/partition EXCEPTION in ssync.Receiver') + lines = self.logger.get_lines_for_level('error') + self.assertEqual( + ['None/device/partition read failed in ssync.Receiver: ' + 'updates line: missing newline'], lines) def test_UPDATES_no_headers(self): self.controller.logger = mock.MagicMock() @@ -2675,5 +2892,162 @@ class TestModuleMethods(unittest.TestCase): expected) +class TestSsyncInputProxy(unittest.TestCase): + def test_read_line(self): + body = io.BytesIO(b'f\r\nDELETE /a/c/o\r\n\r\n' + b'10\r\nDELETE /a/c/o1\r\n\r\n' + b'13\r\nDELETE /a/c/oh my\r\n\r\n') + wsgi_input = eventlet.wsgi.Input( + rfile=body, content_length=123, sock=mock.MagicMock(), + chunked_input=True) + inpt = SsyncInputProxy(wsgi_input, chunk_size=65536, timeout=60) + self.assertEqual(b'DELETE /a/c/o\r\n', inpt.read_line('ctxt')) + self.assertEqual(b'DELETE /a/c/o1\r\n', inpt.read_line('ctxt')) + self.assertEqual(b'DELETE /a/c/oh my\r\n', inpt.read_line('ctxt')) + + def test_read_line_timeout(self): + body = SlowBytesIO(b'f\r\nDELETE /a/c/o\r\n\r\n' + b'10\r\nDELETE /a/c/o1\r\n\r\n', + # timeout reading second line... + sleep_index=23) + wsgi_input = eventlet.wsgi.Input( + rfile=body, content_length=123, sock=mock.MagicMock(), + chunked_input=True) + inpt = SsyncInputProxy(wsgi_input, chunk_size=65536, timeout=0.01) + self.assertEqual(b'DELETE /a/c/o\r\n', inpt.read_line('ctxt')) + with self.assertRaises(MessageTimeout) as cm: + inpt.read_line('ctxt') + self.assertEqual('0.01 seconds: ctxt', str(cm.exception)) + # repeat + with self.assertRaises(MessageTimeout) as cm: + inpt.read_line('ctxt') + self.assertEqual('0.01 seconds: ctxt', str(cm.exception)) + # check subreq input will also fail + sub_input = inpt.make_subreq_input('ctxt2', 123) + with self.assertRaises(MessageTimeout) as cm: + sub_input.read() + self.assertEqual('0.01 seconds: ctxt', str(cm.exception)) + + def test_read_line_chunk_read_error(self): + body = SlowBytesIO(b'f\r\nDELETE /a/c/o\r\n\r\n' + # bad chunk length... + b'x\r\nDELETE /a/c/o1\r\n\r\n', + sleep_index=23) + wsgi_input = eventlet.wsgi.Input( + rfile=body, content_length=123, sock=mock.MagicMock(), + chunked_input=True) + inpt = SsyncInputProxy(wsgi_input, chunk_size=65536, timeout=0.01) + self.assertEqual(b'DELETE /a/c/o\r\n', inpt.read_line('ctxt')) + with self.assertRaises(ChunkReadError) as cm: + inpt.read_line('ctxt') + self.assertEqual( + "ctxt: invalid literal for int() with base 16: b'x\\r\\n'", + str(cm.exception)) + # repeat + with self.assertRaises(ChunkReadError) as cm: + inpt.read_line('ctxt') + self.assertEqual( + "ctxt: invalid literal for int() with base 16: b'x\\r\\n'", + str(cm.exception)) + # check subreq input will also fail + sub_input = inpt.make_subreq_input('ctxt2', 123) + with self.assertRaises(ChunkReadError) as cm: + sub_input.read() + self.assertEqual( + "ctxt: invalid literal for int() with base 16: b'x\\r\\n'", + str(cm.exception)) + + def test_read_line_protocol_error(self): + body = io.BytesIO( + b'17\r\n:MISSING_CHECK: START\r\n\r\n' + b'15\r\n:MISSING_CHECK: END\r\n\r\n' + b'11\r\n:UPDATES: START\r\n\r\n' + b'd\r\n:UPDATES: END\r\n' # note: chunk is missing its newline + b'0\r\n\r\n' + ) + wsgi_input = eventlet.wsgi.Input( + rfile=body, content_length=123, sock=mock.MagicMock(), + chunked_input=True) + inpt = SsyncInputProxy(wsgi_input, chunk_size=65536, timeout=0.01) + self.assertEqual(b':MISSING_CHECK: START\r\n', inpt.read_line('ctxt')) + self.assertEqual(b':MISSING_CHECK: END\r\n', inpt.read_line('ctxt')) + self.assertEqual(b':UPDATES: START\r\n', inpt.read_line('ctxt')) + with self.assertRaises(ChunkReadError) as cm: + inpt.read_line('ctxt') + self.assertEqual('ctxt: missing newline', str(cm.exception)) + + def test_subreq_input(self): + body = io.BytesIO(b'1a\r\nchunk1 \r\n' + b'1b\r\nchunktwo \r\n' + b'1c\r\nchunkthree \r\n' + b'f\r\nDELETE /a/c/o\r\n\r\n') + wsgi_input = eventlet.wsgi.Input( + rfile=body, content_length=123, sock=mock.MagicMock(), + chunked_input=True) + inpt = SsyncInputProxy(wsgi_input, chunk_size=20, timeout=60) + sub_input = inpt.make_subreq_input('ctxt', content_length=81) + self.assertEqual(b'chunk1 ' + b'chunktwo ' + b'chunkthree ', + sub_input.read()) + # check next read_line (note: chunk_size needs to be big enough to read + # whole ssync protocol 'line' + self.assertEqual(b'DELETE /a/c/o\r\n', inpt.read_line('ctxt')) + + def test_subreq_input_content_length_less_than_body(self): + body = io.BytesIO(b'1a\r\nchunk1 \r\n' + b'1b\r\nchunktwo \r\n') + wsgi_input = eventlet.wsgi.Input( + rfile=body, content_length=123, sock=mock.MagicMock(), + chunked_input=True) + inpt = SsyncInputProxy(wsgi_input, chunk_size=20, timeout=60) + sub_input = inpt.make_subreq_input('ctxt', content_length=3) + self.assertEqual(b'chu', sub_input.read()) + + def test_subreq_input_content_length_more_than_body(self): + body = io.BytesIO(b'1a\r\nchunk1 \r\n') + wsgi_input = eventlet.wsgi.Input( + rfile=body, content_length=123, sock=mock.MagicMock(), + chunked_input=True) + inpt = SsyncInputProxy(wsgi_input, chunk_size=20, timeout=60) + sub_input = inpt.make_subreq_input('ctxt', content_length=81) + with self.assertRaises(ChunkReadError) as cm: + sub_input.read() + self.assertEqual("ctxt: invalid literal for int() with base 16: b''", + str(cm.exception)) + + def test_subreq_input_early_termination(self): + body = io.BytesIO(b'1a\r\nchunk1 \r\n' + b'0\r\n\r\n') # the sender disconnected + wsgi_input = eventlet.wsgi.Input( + rfile=body, content_length=123, sock=mock.MagicMock(), + chunked_input=True) + inpt = SsyncInputProxy(wsgi_input, chunk_size=20, timeout=60) + sub_input = inpt.make_subreq_input('ctxt', content_length=81) + with self.assertRaises(ChunkReadError) as cm: + sub_input.read() + self.assertEqual('Early termination for ctxt', str(cm.exception)) + + def test_subreq_input_timeout(self): + body = SlowBytesIO(b'1a\r\nchunk1 \r\n' + b'1b\r\nchunktwo \r\n', + sleep_index=25) + wsgi_input = eventlet.wsgi.Input( + rfile=body, content_length=123, sock=mock.MagicMock(), + chunked_input=True) + inpt = SsyncInputProxy(wsgi_input, chunk_size=16, timeout=0.01) + sub_input = inpt.make_subreq_input('ctxt', content_length=81) + self.assertEqual(b'chunk1 ', sub_input.read(16)) + with self.assertRaises(MessageTimeout) as cm: + sub_input.read() + self.assertEqual('0.01 seconds: ctxt', str(cm.exception)) + # repeat + self.assertEqual(b'', sub_input.read()) + # check next read_line + with self.assertRaises(MessageTimeout) as cm: + inpt.read_line('ctxt2') + self.assertEqual('0.01 seconds: ctxt', str(cm.exception)) + + if __name__ == '__main__': unittest.main() diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index 887f20e281..413937e7a7 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -82,12 +82,15 @@ class FakeResponse(ssync_sender.SsyncBufferedHTTPResponse): class FakeConnection(object): - def __init__(self): + def __init__(self, sleeps=None): + self.sleeps = sleeps self.sent = [] self.closed = False def send(self, data): self.sent.append(data) + if self.sleeps: + eventlet.sleep(self.sleeps.pop(0)) def close(self): self.closed = True @@ -791,18 +794,16 @@ class TestSender(BaseTest): self.assertEqual(response.readline(), b'') def test_missing_check_timeout_start(self): - connection = FakeConnection() + connection = FakeConnection(sleeps=[1]) response = FakeResponse() self.sender.daemon.node_timeout = 0.01 self.assertFalse(self.sender.limited_by_max_objects) - with mock.patch.object(connection, 'send', - side_effect=lambda *args: eventlet.sleep(1)): - with self.assertRaises(exceptions.MessageTimeout) as cm: - self.sender.missing_check(connection, response) + with self.assertRaises(exceptions.MessageTimeout) as cm: + self.sender.missing_check(connection, response) self.assertIn('0.01 seconds: missing_check start', str(cm.exception)) self.assertFalse(self.sender.limited_by_max_objects) - def test_missing_check_timeout_send_line(self): + def test_call_and_missing_check_timeout_send_line(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): yield ( '9d41d8cd98f00b204e9800998ecf0abc', @@ -810,23 +811,36 @@ class TestSender(BaseTest): yield ( '9d41d8cd98f00b204e9800998ecf0def', {'ts_data': Timestamp(1380144471.00000)}) - connection = FakeConnection() + response = FakeResponse() # max_objects unlimited - self.sender = ssync_sender.Sender(self.daemon, None, self.job, None, + node = {'replication_ip': '1.2.3.4', + 'replication_port': 5678, + 'device': 'sda1'} + self.sender = ssync_sender.Sender(self.daemon, node, self.job, None, max_objects=0) - self.sender.daemon.node_timeout = 0.01 + self.sender.suffixes = ['abc'] self.sender.df_mgr.yield_hashes = yield_hashes + # arrange for timeout while sending first missing check item + self.sender.daemon.node_timeout = 0.01 + connection = FakeConnection(sleeps=[0, 1]) + self.sender.connect = mock.MagicMock(return_value=(connection, + response)) + self.sender.updates = mock.MagicMock() self.assertFalse(self.sender.limited_by_max_objects) - sleeps = [0, 0, 1] - with mock.patch.object( - connection, 'send', - side_effect=lambda *args: eventlet.sleep(sleeps.pop(0))): - with self.assertRaises(exceptions.MessageTimeout) as cm: - self.sender.missing_check(connection, response) - self.assertIn('0.01 seconds: missing_check send line: ' - '1 lines (57 bytes) sent', str(cm.exception)) + success, candidates = self.sender() + self.assertFalse(success) + log_lines = self.daemon_logger.get_lines_for_level('error') + self.assertIn( + '1.2.3.4:5678/sda1/99 0.01 seconds: missing_check send line: ' + '0 lines (0 bytes) sent', log_lines) self.assertFalse(self.sender.limited_by_max_objects) + # only the first missing check item was sent, plus a disconnect line + self.assertEqual( + b''.join(connection.sent), + b'17\r\n:MISSING_CHECK: START\r\n\r\n' + b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' + b'0\r\n\r\n') def test_missing_check_has_empty_suffixes(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs):