From fd6a1ad659a0c1eee1c50206117f5f74b7c34d6f Mon Sep 17 00:00:00 2001 From: Shreeya Deshpande Date: Thu, 11 May 2023 08:21:30 -0700 Subject: [PATCH] Refactor for extract closure to method Change-Id: Iccf43ec7d355e26ae4efe58b167ffccd89b158b3 --- swift/proxy/controllers/base.py | 296 +++++++++++------------ test/unit/proxy/controllers/test_base.py | 20 +- 2 files changed, 163 insertions(+), 153 deletions(-) diff --git a/swift/proxy/controllers/base.py b/swift/proxy/controllers/base.py index 648ed71988..620e724677 100644 --- a/swift/proxy/controllers/base.py +++ b/swift/proxy/controllers/base.py @@ -1034,7 +1034,13 @@ class GetOrHeadHandler(object): self.concurrency = concurrency self.policy = policy self.node = None + self.source = None + self.source_parts_iter = None self.latest_404_timestamp = Timestamp(0) + if self.server_type == 'Object': + self.node_timeout = self.app.recoverable_node_timeout + else: + self.node_timeout = self.app.node_timeout policy_options = self.app.get_policy_options(self.policy) self.rebalance_missing_suppression_count = min( policy_options.rebalance_missing_suppression_count, @@ -1177,156 +1183,148 @@ class GetOrHeadHandler(object): return True return is_success(src.status) or is_redirection(src.status) - def _get_response_parts_iter(self, req, node, source): - # Someday we can replace this [mess] with python 3's "nonlocal" - source = [source] - node = [node] + def get_next_doc_part(self): + while True: + try: + # This call to next() performs IO when we have a + # multipart/byteranges response; it reads the MIME + # boundary and part headers. + # + # If we don't have a multipart/byteranges response, + # but just a 200 or a single-range 206, then this + # performs no IO, and either just returns source or + # raises StopIteration. + with WatchdogTimeout(self.app.watchdog, self.node_timeout, + ChunkReadTimeout): + # if StopIteration is raised, it escapes and is + # handled elsewhere + start_byte, end_byte, length, headers, part = next( + self.source_parts_iter) + return (start_byte, end_byte, length, headers, part) + except ChunkReadTimeout: + new_source, new_node = self._get_source_and_node() + if new_source: + self.app.error_occurred( + self.node, 'Trying to read object during ' + 'GET (retrying)') + # Close-out the connection as best as possible. + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) + self.source = new_source + self.node = new_node + # This is safe; it sets up a generator but does + # not call next() on it, so no IO is performed. + self.source_parts_iter = http_response_to_document_iters( + new_source, + read_chunk_size=self.app.object_chunk_size) + else: + raise StopIteration() + def iter_bytes_from_response_part(self, part_file, nbytes): + buf = b'' + part_file = ByteCountEnforcer(part_file, nbytes) + while True: + try: + with WatchdogTimeout(self.app.watchdog, self.node_timeout, + ChunkReadTimeout): + chunk = part_file.read(self.app.object_chunk_size) + # NB: this append must be *inside* the context + # manager for test.unit.SlowBody to do its thing + buf += chunk + if nbytes is not None: + nbytes -= len(chunk) + except (ChunkReadTimeout, ShortReadError): + exc_type, exc_value, exc_traceback = exc_info() + if self.newest or self.server_type != 'Object': + raise + try: + self.fast_forward(self.bytes_used_from_backend) + except (HTTPException, ValueError): + six.reraise(exc_type, exc_value, exc_traceback) + except RangeAlreadyComplete: + break + buf = b'' + new_source, new_node = self._get_source_and_node() + if new_source: + self.app.error_occurred( + self.node, 'Trying to read object during ' + 'GET (retrying)') + # Close-out the connection as best as possible. + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) + self.source = new_source + self.node = new_node + # This is safe; it just sets up a generator but + # does not call next() on it, so no IO is + # performed. + self.source_parts_iter = \ + http_response_to_document_iters( + new_source, + read_chunk_size=self.app.object_chunk_size) + + try: + _junk, _junk, _junk, _junk, part_file = \ + self.get_next_doc_part() + except StopIteration: + # Tried to find a new node from which to + # finish the GET, but failed. There's + # nothing more we can do here. + six.reraise(exc_type, exc_value, exc_traceback) + part_file = ByteCountEnforcer(part_file, nbytes) + else: + six.reraise(exc_type, exc_value, exc_traceback) + else: + if buf and self.skip_bytes: + if self.skip_bytes < len(buf): + buf = buf[self.skip_bytes:] + self.bytes_used_from_backend += self.skip_bytes + self.skip_bytes = 0 + else: + self.skip_bytes -= len(buf) + self.bytes_used_from_backend += len(buf) + buf = b'' + + if not chunk: + if buf: + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): + self.bytes_used_from_backend += len(buf) + yield buf + buf = b'' + break + + if self.client_chunk_size is not None: + while len(buf) >= self.client_chunk_size: + client_chunk = buf[:self.client_chunk_size] + buf = buf[self.client_chunk_size:] + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): + self.bytes_used_from_backend += \ + len(client_chunk) + yield client_chunk + else: + with WatchdogTimeout(self.app.watchdog, + self.app.client_timeout, + ChunkWriteTimeout): + self.bytes_used_from_backend += len(buf) + yield buf + buf = b'' + + def _get_response_parts_iter(self, req): try: - client_chunk_size = self.client_chunk_size - node_timeout = self.app.node_timeout - if self.server_type == 'Object': - node_timeout = self.app.recoverable_node_timeout # This is safe; it sets up a generator but does not call next() # on it, so no IO is performed. - parts_iter = [ - http_response_to_document_iters( - source[0], read_chunk_size=self.app.object_chunk_size)] - - def get_next_doc_part(): - while True: - try: - # This call to next() performs IO when we have a - # multipart/byteranges response; it reads the MIME - # boundary and part headers. - # - # If we don't have a multipart/byteranges response, - # but just a 200 or a single-range 206, then this - # performs no IO, and either just returns source or - # raises StopIteration. - with WatchdogTimeout(self.app.watchdog, node_timeout, - ChunkReadTimeout): - # if StopIteration is raised, it escapes and is - # handled elsewhere - start_byte, end_byte, length, headers, part = next( - parts_iter[0]) - return (start_byte, end_byte, length, headers, part) - except ChunkReadTimeout: - new_source, new_node = self._get_source_and_node() - if new_source: - self.app.error_occurred( - node[0], 'Trying to read object during ' - 'GET (retrying)') - # Close-out the connection as best as possible. - if getattr(source[0], 'swift_conn', None): - close_swift_conn(source[0]) - source[0] = new_source - node[0] = new_node - # This is safe; it sets up a generator but does - # not call next() on it, so no IO is performed. - parts_iter[0] = http_response_to_document_iters( - new_source, - read_chunk_size=self.app.object_chunk_size) - else: - raise StopIteration() - - def iter_bytes_from_response_part(part_file, nbytes): - buf = b'' - part_file = ByteCountEnforcer(part_file, nbytes) - while True: - try: - with WatchdogTimeout(self.app.watchdog, node_timeout, - ChunkReadTimeout): - chunk = part_file.read(self.app.object_chunk_size) - # NB: this append must be *inside* the context - # manager for test.unit.SlowBody to do its thing - buf += chunk - if nbytes is not None: - nbytes -= len(chunk) - except (ChunkReadTimeout, ShortReadError): - exc_type, exc_value, exc_traceback = exc_info() - if self.newest or self.server_type != 'Object': - raise - try: - self.fast_forward(self.bytes_used_from_backend) - except (HTTPException, ValueError): - six.reraise(exc_type, exc_value, exc_traceback) - except RangeAlreadyComplete: - break - buf = b'' - new_source, new_node = self._get_source_and_node() - if new_source: - self.app.error_occurred( - node[0], 'Trying to read object during ' - 'GET (retrying)') - # Close-out the connection as best as possible. - if getattr(source[0], 'swift_conn', None): - close_swift_conn(source[0]) - source[0] = new_source - node[0] = new_node - # This is safe; it just sets up a generator but - # does not call next() on it, so no IO is - # performed. - parts_iter[0] = http_response_to_document_iters( - new_source, - read_chunk_size=self.app.object_chunk_size) - - try: - _junk, _junk, _junk, _junk, part_file = \ - get_next_doc_part() - except StopIteration: - # Tried to find a new node from which to - # finish the GET, but failed. There's - # nothing more we can do here. - six.reraise(exc_type, exc_value, exc_traceback) - part_file = ByteCountEnforcer(part_file, nbytes) - else: - six.reraise(exc_type, exc_value, exc_traceback) - else: - if buf and self.skip_bytes: - if self.skip_bytes < len(buf): - buf = buf[self.skip_bytes:] - self.bytes_used_from_backend += self.skip_bytes - self.skip_bytes = 0 - else: - self.skip_bytes -= len(buf) - self.bytes_used_from_backend += len(buf) - buf = b'' - - if not chunk: - if buf: - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += len(buf) - yield buf - buf = b'' - break - - if client_chunk_size is not None: - while len(buf) >= client_chunk_size: - client_chunk = buf[:client_chunk_size] - buf = buf[client_chunk_size:] - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += \ - len(client_chunk) - yield client_chunk - else: - with WatchdogTimeout(self.app.watchdog, - self.app.client_timeout, - ChunkWriteTimeout): - self.bytes_used_from_backend += len(buf) - yield buf - buf = b'' + self.source_parts_iter = http_response_to_document_iters( + self.source, read_chunk_size=self.app.object_chunk_size) part_iter = None try: while True: start_byte, end_byte, length, headers, part = \ - get_next_doc_part() + self.get_next_doc_part() # note: learn_size_from_content_range() sets # self.skip_bytes self.learn_size_from_content_range( @@ -1339,7 +1337,7 @@ class GetOrHeadHandler(object): and start_byte is not None) else None) part_iter = CooperativeIterator( - iter_bytes_from_response_part(part, byte_count)) + self.iter_bytes_from_response_part(part, byte_count)) yield {'start_byte': start_byte, 'end_byte': end_byte, 'entity_length': length, 'headers': headers, 'part_iter': part_iter} @@ -1351,7 +1349,7 @@ class GetOrHeadHandler(object): part_iter.close() except ChunkReadTimeout: - self.app.exception_occurred(node[0], 'Object', + self.app.exception_occurred(self.node, 'Object', 'Trying to read during GET') raise except ChunkWriteTimeout: @@ -1378,8 +1376,8 @@ class GetOrHeadHandler(object): raise finally: # Close-out the connection as best as possible. - if getattr(source[0], 'swift_conn', None): - close_swift_conn(source[0]) + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) @property def last_status(self): @@ -1546,7 +1544,7 @@ class GetOrHeadHandler(object): return source, node return None, None - def _make_app_iter(self, req, node, source): + def _make_app_iter(self, req): """ Returns an iterator over the contents of the source (via its read func). There is also quite a bit of cleanup to ensure garbage @@ -1558,7 +1556,7 @@ class GetOrHeadHandler(object): :param node: The node the source is reading from, for logging purposes. """ - ct = source.getheader('Content-Type') + ct = self.source.getheader('Content-Type') if ct: content_type, content_type_attrs = parse_content_type(ct) is_multipart = content_type == 'multipart/byteranges' @@ -1571,7 +1569,7 @@ class GetOrHeadHandler(object): # furnished one for us, so we'll just re-use it boundary = dict(content_type_attrs)["boundary"] - parts_iter = self._get_response_parts_iter(req, node, source) + parts_iter = self._get_response_parts_iter(req) def add_content_type(response_part): response_part["content_type"] = \ @@ -1591,7 +1589,9 @@ class GetOrHeadHandler(object): update_headers(res, source.getheaders()) if req.method == 'GET' and \ source.status in (HTTP_OK, HTTP_PARTIAL_CONTENT): - res.app_iter = self._make_app_iter(req, node, source) + self.source = source + self.node = node + res.app_iter = self._make_app_iter(req) # See NOTE: swift_conn at top of file about this. res.swift_conn = source.swift_conn if not res.environ: diff --git a/test/unit/proxy/controllers/test_base.py b/test/unit/proxy/controllers/test_base.py index c5004bc12a..a2bd055427 100644 --- a/test/unit/proxy/controllers/test_base.py +++ b/test/unit/proxy/controllers/test_base.py @@ -1299,7 +1299,9 @@ class TestFuncs(BaseTest): self.app, req, None, Namespace(num_primary_nodes=3), None, None, {}, client_chunk_size=8) - app_iter = handler._make_app_iter(req, node, source) + handler.source = source + handler.node = node + app_iter = handler._make_app_iter(req) client_chunks = list(app_iter) self.assertEqual(client_chunks, [ b'abcd1234', b'abcd1234', b'abcd1234', b'abcd12']) @@ -1350,7 +1352,9 @@ class TestFuncs(BaseTest): range_headers.append(handler.backend_headers['Range']) return sources.pop(0) - app_iter = handler._make_app_iter(req, node, source1) + handler.source = source1 + handler.node = node + app_iter = handler._make_app_iter(req) with mock.patch.object(handler, '_get_source_and_node', side_effect=mock_get_source_and_node): client_chunks = list(app_iter) @@ -1392,7 +1396,9 @@ class TestFuncs(BaseTest): self.app, req, 'Object', Namespace(num_primary_nodes=1), None, None, {}, client_chunk_size=8) - app_iter = handler._make_app_iter(req, node, source1) + handler.source = source1 + handler.node = node + app_iter = handler._make_app_iter(req) with mock.patch.object(handler, '_get_source_and_node', lambda: (source2, node)): client_chunks = list(app_iter) @@ -1423,7 +1429,9 @@ class TestFuncs(BaseTest): handler = GetOrHeadHandler( self.app, req, 'Object', Namespace(num_primary_nodes=1), None, 'some-path', {}) - app_iter = handler._make_app_iter(req, node, source) + handler.source = source + handler.node = node + app_iter = handler._make_app_iter(req) app_iter.close() self.app.logger.info.assert_called_once_with( 'Client disconnected on read of %r', 'some-path') @@ -1433,7 +1441,9 @@ class TestFuncs(BaseTest): handler = GetOrHeadHandler( self.app, req, 'Object', Namespace(num_primary_nodes=1), None, None, {}) - app_iter = handler._make_app_iter(req, node, source) + handler.source = source + handler.node = node + app_iter = handler._make_app_iter(req) next(app_iter) app_iter.close() self.app.logger.warning.assert_not_called()