diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 9265e683d7..16e9f0f436 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -1404,11 +1404,17 @@ class ECAppIter(object): pass except ChunkReadTimeout: # unable to resume in ECFragGetter - self.logger.exception(_("Timeout fetching fragments for %r"), - quote(self.path)) + self.logger.exception( + "ChunkReadTimeout fetching fragments for %r", + quote(self.path)) + except ChunkWriteTimeout: + # slow client disconnect + self.logger.exception( + "ChunkWriteTimeout fetching fragments for %r", + quote(self.path)) except: # noqa - self.logger.exception(_("Exception fetching fragments for" - " %r"), quote(self.path)) + self.logger.exception("Exception fetching fragments for %r", + quote(self.path)) finally: queue.resize(2) # ensure there's room queue.put(None) @@ -1437,8 +1443,8 @@ class ECAppIter(object): try: segment = self.policy.pyeclib_driver.decode(fragments) except ECDriverError: - self.logger.exception(_("Error decoding fragments for" - " %r"), quote(self.path)) + self.logger.exception("Error decoding fragments for %r", + quote(self.path)) raise yield segment @@ -2522,22 +2528,21 @@ class ECFragGetter(object): return (start_byte, end_byte, length, headers, part) except ChunkReadTimeout: new_source, new_node = self._dig_for_source_and_node() - if new_source: - self.app.error_occurred( - self.node, _('Trying to read object during ' - 'GET (retrying)')) - # Close-out the connection as best as possible. - if getattr(self.source, 'swift_conn', None): - close_swift_conn(self.source) - self.source = new_source - self.node = new_node - # This is safe; it sets up a generator but does - # not call next() on it, so no IO is performed. - parts_iter[0] = http_response_to_document_iters( - new_source, - read_chunk_size=self.app.object_chunk_size) - else: + if not new_source: raise + self.app.error_occurred( + self.node, 'Trying to read next part of ' + 'EC multi-part GET (retrying)') + # Close-out the connection as best as possible. + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) + self.source = new_source + self.node = new_node + # This is safe; it sets up a generator but does + # not call next() on it, so no IO is performed. + parts_iter[0] = http_response_to_document_iters( + new_source, + read_chunk_size=self.app.object_chunk_size) def iter_bytes_from_response_part(part_file, nbytes): nchunks = 0 @@ -2559,6 +2564,7 @@ class ECFragGetter(object): try: self.fast_forward(self.bytes_used_from_backend) except (HTTPException, ValueError): + self.app.logger.exception('Unable to fast forward') six.reraise(exc_type, exc_value, exc_traceback) except RangeAlreadyComplete: break @@ -2566,8 +2572,8 @@ class ECFragGetter(object): new_source, new_node = self._dig_for_source_and_node() if new_source: self.app.error_occurred( - self.node, _('Trying to read object during ' - 'GET (retrying)')) + self.node, 'Trying to read EC fragment ' + 'during GET (retrying)') # Close-out the connection as best as possible. if getattr(self.source, 'swift_conn', None): close_swift_conn(self.source) @@ -2699,8 +2705,8 @@ class ECFragGetter(object): if end - begin + 1 == self.bytes_used_from_backend: warn = False if not req.environ.get('swift.non_client_disconnect') and warn: - self.app.logger.warning('Client disconnected on read of %r', - self.path) + self.app.logger.warning( + 'Client disconnected on read of EC frag %r', self.path) raise except Exception: self.app.logger.exception(_('Trying to send to client')) @@ -2743,7 +2749,7 @@ class ECFragGetter(object): except (Exception, Timeout): self.app.exception_occurred( node, 'Object', - _('Trying to %(method)s %(path)s') % + 'Trying to %(method)s %(path)s' % {'method': self.req.method, 'path': self.req.path}) return None @@ -2757,6 +2763,8 @@ class ECFragGetter(object): not Timestamp(src_headers.get('x-backend-timestamp', 0)): # throw out 5XX and 404s from handoff nodes unless the data is # really on disk and had been DELETEd + self.app.logger.debug('Ignoring %s from handoff' % + possible_source.status) conn.close() return None @@ -2778,6 +2786,10 @@ class ECFragGetter(object): 'From Object Server') % {'status': possible_source.status, 'body': self.body[:1024]}) + else: + self.app.logger.debug( + 'Ignoring %s from primary' % possible_source.status) + return None @property @@ -2803,8 +2815,14 @@ class ECFragGetter(object): # capture last used etag before continuation used_etag = self.last_headers.get('X-Object-Sysmeta-EC-ETag') for source, node in self.source_and_node_iter: - if source and is_good_source(source.status) and \ - source.getheader('X-Object-Sysmeta-EC-ETag') == used_etag: + if not source: + # _make_node_request only returns good sources + continue + if source.getheader('X-Object-Sysmeta-EC-ETag') != used_etag: + self.app.logger.warning( + 'Skipping source (etag mismatch: got %s, expected %s)', + source.getheader('X-Object-Sysmeta-EC-ETag'), used_etag) + else: return source, node return None, None diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index c137de9f0d..0c552865b2 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -4314,8 +4314,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.status_int, 206) self.assertEqual(len(log), self.policy.ec_n_unique_fragments * 2) log_lines = self.app.logger.get_lines_for_level('error') - self.assertIn("Trying to read object during GET (retrying)", - log_lines[0]) + self.assertIn("Trying to read next part of EC multi-part " + "GET (retrying)", log_lines[0]) # not the most graceful ending self.assertIn("Exception fetching fragments for '/a/c/o'", log_lines[-1]) @@ -4755,6 +4755,44 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): for line in self.logger.logger.records['ERROR']: self.assertIn(req.headers['x-trans-id'], line) + def test_GET_write_timeout(self): + # verify EC GET behavior when there's a timeout sending decoded frags + # via the queue. + segment_size = self.policy.ec_segment_size + test_data = (b'test' * segment_size)[:-333] + etag = md5(test_data, usedforsecurity=False).hexdigest() + ec_archive_bodies = self._make_ec_archive_bodies(test_data) + headers = {'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Object-Sysmeta-Ec-Content-Length': '333'} + ndata = self.policy.ec_ndata + responses = [ + (200, body, self._add_frag_index(i, headers)) + for i, body in enumerate(ec_archive_bodies[:ndata]) + ] * self.policy.ec_duplication_factor + + req = swob.Request.blank('/v1/a/c/o') + + status_codes, body_iter, headers = zip(*responses) + self.app.client_timeout = 0.01 + with mocked_http_conn(*status_codes, body_iter=body_iter, + headers=headers): + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + resp_body = next(resp.app_iter) + sleep(0.5) # lazy client + # remaining resp truncated + resp_body += b''.join(resp.app_iter) + # we log errors + log_lines = self.app.logger.get_lines_for_level('error') + for line in log_lines: + self.assertIn('ChunkWriteTimeout fetching fragments', line) + # client gets a short read + self.assertEqual(16051, len(test_data)) + self.assertEqual(8192, len(resp_body)) + self.assertNotEqual( + md5(resp_body, usedforsecurity=False).hexdigest(), + etag) + def test_GET_read_timeout_retrying_but_no_more_useful_nodes(self): # verify EC GET behavior when initial batch of nodes time out then # remaining nodes either return 404 or return data for different etag @@ -4801,6 +4839,22 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): for line in self.logger.logger.records['ERROR']: self.assertIn(req.headers['x-trans-id'], line) + debug_lines = self.logger.get_lines_for_level('debug') + nparity = self.policy.ec_nparity + nhandoffs = self.policy.object_ring.max_more_nodes + ignore_404 = ignore_404_handoff = 0 + for line in debug_lines: + if 'Ignoring 404 from primary' in line: + ignore_404 += 1 + if 'Ignoring 404 from handoff' in line: + ignore_404_handoff += 1 + self.assertEqual(nparity - 2, ignore_404, debug_lines) + self.assertEqual(nhandoffs, ignore_404_handoff, debug_lines) + self.assertEqual(len(debug_lines), ignore_404_handoff + ignore_404) + self.assertEqual(self.logger.get_lines_for_level('warning'), [ + 'Skipping source (etag mismatch: got other_etag, ' + 'expected %s)' % etag] * 2) + def test_GET_read_timeout_resume(self): segment_size = self.policy.ec_segment_size test_data = (b'test' * segment_size)[:-333] @@ -4867,8 +4921,9 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.status_int, 200) self.assertNotEqual(md5(resp.body).hexdigest(), etag) error_lines = self.logger.get_lines_for_level('error') - self.assertEqual(1, len(error_lines)) - self.assertIn('Timeout fetching', error_lines[0]) + self.assertEqual(2, len(error_lines)) + self.assertIn('Unable to fast forward', error_lines[0]) + self.assertIn('Timeout fetching', error_lines[1]) for line in self.logger.logger.records['ERROR']: self.assertIn(req.headers['x-trans-id'], line) diff --git a/test/unit/proxy/test_server.py b/test/unit/proxy/test_server.py index 691041832a..3b9fef8b4e 100644 --- a/test/unit/proxy/test_server.py +++ b/test/unit/proxy/test_server.py @@ -7471,8 +7471,9 @@ class BaseTestECObjectController(BaseTestObjectController): _test_servers[0].logger.get_lines_for_level('warning')) # check for disconnect message! - expected = ["Client disconnected on read of '/a/%s-discon/test'" - % self.ec_policy.name] * 2 + expected = [ + "Client disconnected on read of EC frag '/a/%s-discon/test'" + % self.ec_policy.name] * 2 self.assertEqual( _test_servers[0].logger.get_lines_for_level('warning'), expected)