diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 2c076888db..d9181c579a 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -1030,6 +1030,31 @@ class GetterBase(object): self.source = None self.source_parts_iter = None + def _get_source_and_node(self): + raise NotImplementedError() + + def _replace_source_and_node(self, err_msg): + # be defensive against _get_source_and_node modifying self.source + # or self.node... + old_source = self.source + old_node = self.node + + new_source, new_node = self._get_source_and_node() + if not new_source: + return False + + self.app.error_occurred(old_node, err_msg) + # Close-out the connection as best as possible. + if getattr(old_source, 'swift_conn', None): + close_swift_conn(old_source) + self.source = new_source + self.node = new_node + # This is safe; it sets up a generator but does + # not call next() on it, so no IO is performed. + self.source_parts_iter = http_response_to_document_iters( + new_source, read_chunk_size=self.app.object_chunk_size) + return True + def fast_forward(self, num_bytes): """ Will skip num_bytes into the current ranges. @@ -1207,22 +1232,8 @@ class GetOrHeadHandler(GetterBase): self.source_parts_iter) return (start_byte, end_byte, length, headers, part) except ChunkReadTimeout: - new_source, new_node = self._get_source_and_node() - if new_source: - self.app.error_occurred( - self.node, 'Trying to read object during ' - 'GET (retrying)') - # Close-out the connection as best as possible. - if getattr(self.source, 'swift_conn', None): - close_swift_conn(self.source) - self.source = new_source - self.node = new_node - # This is safe; it sets up a generator but does - # not call next() on it, so no IO is performed. - self.source_parts_iter = http_response_to_document_iters( - new_source, - read_chunk_size=self.app.object_chunk_size) - else: + if not self._replace_source_and_node( + 'Trying to read object during GET (retrying)'): raise StopIteration() def iter_bytes_from_response_part(self, part_file, nbytes): @@ -1249,24 +1260,8 @@ class GetOrHeadHandler(GetterBase): except RangeAlreadyComplete: break buf = b'' - new_source, new_node = self._get_source_and_node() - if new_source: - self.app.error_occurred( - self.node, 'Trying to read object during ' - 'GET (retrying)') - # Close-out the connection as best as possible. - if getattr(self.source, 'swift_conn', None): - close_swift_conn(self.source) - self.source = new_source - self.node = new_node - # This is safe; it just sets up a generator but - # does not call next() on it, so no IO is - # performed. - self.source_parts_iter = \ - http_response_to_document_iters( - new_source, - read_chunk_size=self.app.object_chunk_size) - + if self._replace_source_and_node( + 'Trying to read object during GET (retrying)'): try: _junk, _junk, _junk, _junk, part_file = \ self.get_next_doc_part() diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index d1221c7667..43dffdfbab 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -2568,23 +2568,10 @@ class ECFragGetter(GetterBase): self.source_parts_iter) return (start_byte, end_byte, length, headers, part) except ChunkReadTimeout: - new_source, new_node = self._dig_for_source_and_node() - if not new_source: + if not self._replace_source_and_node( + 'Trying to read next part of EC multi-part GET ' + '(retrying)'): raise - self.app.error_occurred( - self.node, 'Trying to read next part of ' - 'EC multi-part GET (retrying)') - # Close-out the connection as best as possible. - if getattr(self.source, 'swift_conn', None): - close_swift_conn(self.source) - self.source = new_source - self.node = new_node - # This is safe; it sets up a generator but does - # not call next() on it, so no IO is performed. - self.source_parts_iter = \ - http_response_to_document_iters( - new_source, - read_chunk_size=self.app.object_chunk_size) def iter_bytes_from_response_part(self, part_file, nbytes): buf = b'' @@ -2610,24 +2597,8 @@ class ECFragGetter(GetterBase): except RangeAlreadyComplete: break buf = b'' - old_node = self.node - new_source, new_node = self._dig_for_source_and_node() - if new_source: - self.app.error_occurred( - old_node, 'Trying to read EC fragment ' - 'during GET (retrying)') - # Close-out the connection as best as possible. - if getattr(self.source, 'swift_conn', None): - close_swift_conn(self.source) - self.source = new_source - self.node = new_node - # This is safe; it just sets up a generator but - # does not call next() on it, so no IO is - # performed. - self.source_parts_iter = \ - http_response_to_document_iters( - new_source, - read_chunk_size=self.app.object_chunk_size) + if self._replace_source_and_node( + 'Trying to read EC fragment during GET (retrying)'): try: _junk, _junk, _junk, _junk, part_file = \ self.get_next_doc_part() @@ -2814,13 +2785,12 @@ class ECFragGetter(GetterBase): node, self.app.recoverable_node_timeout) if source: - self.node = node yield source, node else: yield None, None self.status = self.reason = self.body = self.source_headers = None - def _dig_for_source_and_node(self): + def _get_source_and_node(self): # capture last used etag before continuation used_etag = self.last_headers.get('X-Object-Sysmeta-EC-ETag') for source, node in self.source_and_node_iter: diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index db56ad3eb2..0c2eb50d4b 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -1575,6 +1575,54 @@ class TestReplicatedObjController(CommonObjectControllerMixin, self.assertIn('Accept-Ranges', resp.headers) self.assertNotIn('Connection', resp.headers) + def test_GET_slow_read(self): + self.app.recoverable_node_timeout = 0.01 + self.app.client_timeout = 0.1 + self.app.object_chunk_size = 10 + body = b'test' + etag = md5(body, usedforsecurity=False).hexdigest() + headers = { + 'Etag': etag, + 'Content-Length': len(body), + 'X-Timestamp': Timestamp(self.ts()).normal, + } + responses = [(200, body, headers)] * 2 + status_codes, body_iter, headers = zip(*responses) + req = swift.common.swob.Request.blank('/v1/a/c/o') + # make the first response slow... + read_sleeps = [0.1, 0] + with mocked_http_conn(*status_codes, body_iter=body_iter, + headers=headers, slow=read_sleeps) as log: + resp = req.get_response(self.app) + self.assertEqual(resp.status_int, 200) + _ = resp.body + self.assertEqual(len(log.requests), 2) + + def make_key(r): + r['device'] = r['path'].split('/')[1] + return '%(ip)s:%(port)s/%(device)s' % r + # the first node got errors incr'd + expected_error_limiting = { + make_key(log.requests[0]): { + 'errors': 1, + 'last_error': mock.ANY, + } + } + actual = {} + for n in self.app.get_object_ring(int(self.policy)).devs: + node_key = self.app.error_limiter.node_key(n) + stats = self.app.error_limiter.stats.get(node_key) or {} + if stats: + actual[self.app.error_limiter.node_key(n)] = stats + self.assertEqual(actual, expected_error_limiting) + for read_line in self.app.logger.get_lines_for_level('error'): + self.assertIn("Trying to read object during GET (retrying)", + read_line) + self.assertEqual( + len(self.logger.logger.records['ERROR']), 1, + 'Expected 1 ERROR lines, got %r' % ( + self.logger.logger.records['ERROR'], )) + def test_GET_transfer_encoding_chunked(self): req = swift.common.swob.Request.blank('/v1/a/c/o') with set_http_connect(200, headers={'transfer-encoding': 'chunked'}): @@ -6710,7 +6758,7 @@ class TestECFragGetter(BaseObjectControllerMixin, unittest.TestCase): def test_iter_bytes_from_response_part_insufficient_bytes(self): part = FileLikeIter([b'some', b'thing']) it = self.getter.iter_bytes_from_response_part(part, nbytes=100) - with mock.patch.object(self.getter, '_dig_for_source_and_node', + with mock.patch.object(self.getter, '_get_source_and_node', return_value=(None, None)): with self.assertRaises(ShortReadError) as cm: b''.join(it) @@ -6722,7 +6770,7 @@ class TestECFragGetter(BaseObjectControllerMixin, unittest.TestCase): self.app.recoverable_node_timeout = 0.05 self.app.client_timeout = 0.8 it = self.getter.iter_bytes_from_response_part(part, nbytes=9) - with mock.patch.object(self.getter, '_dig_for_source_and_node', + with mock.patch.object(self.getter, '_get_source_and_node', return_value=(None, None)): with mock.patch.object(part, 'read', side_effect=[b'some', ChunkReadTimeout(9)]):