diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py
index 7149ecfb7f..057c77ffa3 100644
--- a/swift/proxy/controllers/obj.py
+++ b/swift/proxy/controllers/obj.py
@@ -2620,171 +2620,172 @@ class ECFragGetter(object):
         it = self._get_response_parts_iter(req)
         return it
 
+    def get_next_doc_part(self):
+        node_timeout = self.app.recoverable_node_timeout
+
+        while True:
+            # the loop here is to resume if trying to parse
+            # multipart/byteranges response raises a ChunkReadTimeout
+            # and resets the source_parts_iter
+            try:
+                with WatchdogTimeout(self.app.watchdog, node_timeout,
+                                     ChunkReadTimeout):
+                    # If we don't have a multipart/byteranges response,
+                    # but just a 200 or a single-range 206, then this
+                    # performs no IO, and just returns source (or
+                    # raises StopIteration).
+                    # Otherwise, this call to next() performs IO when
+                    # we have a multipart/byteranges response; as it
+                    # will read the MIME boundary and part headers.
+                    start_byte, end_byte, length, headers, part = next(
+                        self.source_parts_iter)
+                return (start_byte, end_byte, length, headers, part)
+            except ChunkReadTimeout:
+                new_source, new_node = self._dig_for_source_and_node()
+                if not new_source:
+                    raise
+                self.app.error_occurred(
+                    self.node, 'Trying to read next part of '
+                    'EC multi-part GET (retrying)')
+                # Close-out the connection as best as possible.
+                if getattr(self.source, 'swift_conn', None):
+                    close_swift_conn(self.source)
+                self.source = new_source
+                self.node = new_node
+                # This is safe; it sets up a generator but does
+                # not call next() on it, so no IO is performed.
+                self.source_parts_iter = \
+                    http_response_to_document_iters(
+                        new_source,
+                        read_chunk_size=self.app.object_chunk_size)
+
+    def iter_bytes_from_response_part(self, part_file, nbytes):
+        client_chunk_size = self.client_chunk_size
+        node_timeout = self.app.recoverable_node_timeout
+        nchunks = 0
+        buf = b''
+        part_file = ByteCountEnforcer(part_file, nbytes)
+        while True:
+            try:
+                with WatchdogTimeout(self.app.watchdog, node_timeout,
+                                     ChunkReadTimeout):
+                    chunk = part_file.read(self.app.object_chunk_size)
+                    nchunks += 1
+                    # NB: this append must be *inside* the context
+                    # manager for test.unit.SlowBody to do its thing
+                    buf += chunk
+                    if nbytes is not None:
+                        nbytes -= len(chunk)
+            except (ChunkReadTimeout, ShortReadError):
+                exc_type, exc_value, exc_traceback = sys.exc_info()
+                try:
+                    self.fast_forward(self.bytes_used_from_backend)
+                except (HTTPException, ValueError):
+                    self.logger.exception('Unable to fast forward')
+                    six.reraise(exc_type, exc_value, exc_traceback)
+                except RangeAlreadyComplete:
+                    break
+                buf = b''
+                old_node = self.node
+                new_source, new_node = self._dig_for_source_and_node()
+                if new_source:
+                    self.app.error_occurred(
+                        old_node, 'Trying to read EC fragment '
+                        'during GET (retrying)')
+                    # Close-out the connection as best as possible.
+                    if getattr(self.source, 'swift_conn', None):
+                        close_swift_conn(self.source)
+                    self.source = new_source
+                    self.node = new_node
+                    # This is safe; it just sets up a generator but
+                    # does not call next() on it, so no IO is
+                    # performed.
+                    self.source_parts_iter = \
+                        http_response_to_document_iters(
+                            new_source,
+                            read_chunk_size=self.app.object_chunk_size)
+                    try:
+                        _junk, _junk, _junk, _junk, part_file = \
+                            self.get_next_doc_part()
+                    except StopIteration:
+                        # it's not clear to me how to make
+                        # get_next_doc_part raise StopIteration for the
+                        # first doc part of a new request
+                        six.reraise(exc_type, exc_value, exc_traceback)
+                    part_file = ByteCountEnforcer(part_file, nbytes)
+                else:
+                    six.reraise(exc_type, exc_value, exc_traceback)
+            else:
+                if buf and self.skip_bytes:
+                    if self.skip_bytes < len(buf):
+                        buf = buf[self.skip_bytes:]
+                        self.bytes_used_from_backend += self.skip_bytes
+                        self.skip_bytes = 0
+                    else:
+                        self.skip_bytes -= len(buf)
+                        self.bytes_used_from_backend += len(buf)
+                        buf = b''
+
+                if not chunk:
+                    if buf:
+                        with WatchdogTimeout(self.app.watchdog,
+                                             self.app.client_timeout,
+                                             ChunkWriteTimeout):
+                            self.bytes_used_from_backend += len(buf)
+                            yield buf
+                        buf = b''
+                    break
+
+                if client_chunk_size is not None:
+                    while len(buf) >= client_chunk_size:
+                        client_chunk = buf[:client_chunk_size]
+                        buf = buf[client_chunk_size:]
+                        with WatchdogTimeout(self.app.watchdog,
+                                             self.app.client_timeout,
+                                             ChunkWriteTimeout):
+                            self.bytes_used_from_backend += \
+                                len(client_chunk)
+                            yield client_chunk
+                else:
+                    with WatchdogTimeout(self.app.watchdog,
+                                         self.app.client_timeout,
+                                         ChunkWriteTimeout):
+                        self.bytes_used_from_backend += len(buf)
+                        yield buf
+                    buf = b''
+
+            # This is for fairness; if the network is outpacing
+            # the CPU, we'll always be able to read and write
+            # data without encountering an EWOULDBLOCK, and so
+            # eventlet will not switch greenthreads on its own.
+            # We do it manually so that clients don't starve.
+            #
+            # The number 5 here was chosen by making stuff up.
+            # It's not every single chunk, but it's not too big
+            # either, so it seemed like it would probably be an
+            # okay choice.
+            #
+            # Note that we may trampoline to other greenthreads
+            # more often than once every 5 chunks, depending on
+            # how blocking our network IO is; the explicit sleep
+            # here simply provides a lower bound on the rate of
+            # trampolining.
+            if nchunks % 5 == 0:
+                sleep()
+
     def _get_response_parts_iter(self, req):
         try:
-            client_chunk_size = self.client_chunk_size
-            node_timeout = self.app.recoverable_node_timeout
-
             # This is safe; it sets up a generator but does not call next()
             # on it, so no IO is performed.
             self.source_parts_iter = http_response_to_document_iters(
                 self.source, read_chunk_size=self.app.object_chunk_size)
 
-            def get_next_doc_part():
-                while True:
-                    # the loop here is to resume if trying to parse
-                    # multipart/byteranges response raises a ChunkReadTimeout
-                    # and resets the source_parts_iter
-                    try:
-                        with WatchdogTimeout(self.app.watchdog, node_timeout,
-                                             ChunkReadTimeout):
-                            # If we don't have a multipart/byteranges response,
-                            # but just a 200 or a single-range 206, then this
-                            # performs no IO, and just returns source (or
-                            # raises StopIteration).
-                            # Otherwise, this call to next() performs IO when
-                            # we have a multipart/byteranges response; as it
-                            # will read the MIME boundary and part headers.
-                            start_byte, end_byte, length, headers, part = next(
-                                self.source_parts_iter)
-                        return (start_byte, end_byte, length, headers, part)
-                    except ChunkReadTimeout:
-                        new_source, new_node = self._dig_for_source_and_node()
-                        if not new_source:
-                            raise
-                        self.app.error_occurred(
-                            self.node, 'Trying to read next part of '
-                            'EC multi-part GET (retrying)')
-                        # Close-out the connection as best as possible.
-                        if getattr(self.source, 'swift_conn', None):
-                            close_swift_conn(self.source)
-                        self.source = new_source
-                        self.node = new_node
-                        # This is safe; it sets up a generator but does
-                        # not call next() on it, so no IO is performed.
-                        self.source_parts_iter = \
-                            http_response_to_document_iters(
-                                new_source,
-                                read_chunk_size=self.app.object_chunk_size)
-
-            def iter_bytes_from_response_part(part_file, nbytes):
-                nchunks = 0
-                buf = b''
-                part_file = ByteCountEnforcer(part_file, nbytes)
-                while True:
-                    try:
-                        with WatchdogTimeout(self.app.watchdog, node_timeout,
-                                             ChunkReadTimeout):
-                            chunk = part_file.read(self.app.object_chunk_size)
-                            nchunks += 1
-                            # NB: this append must be *inside* the context
-                            # manager for test.unit.SlowBody to do its thing
-                            buf += chunk
-                            if nbytes is not None:
-                                nbytes -= len(chunk)
-                    except (ChunkReadTimeout, ShortReadError):
-                        exc_type, exc_value, exc_traceback = sys.exc_info()
-                        try:
-                            self.fast_forward(self.bytes_used_from_backend)
-                        except (HTTPException, ValueError):
-                            self.logger.exception('Unable to fast forward')
-                            six.reraise(exc_type, exc_value, exc_traceback)
-                        except RangeAlreadyComplete:
-                            break
-                        buf = b''
-                        old_node = self.node
-                        new_source, new_node = self._dig_for_source_and_node()
-                        if new_source:
-                            self.app.error_occurred(
-                                old_node, 'Trying to read EC fragment '
-                                'during GET (retrying)')
-                            # Close-out the connection as best as possible.
-                            if getattr(self.source, 'swift_conn', None):
-                                close_swift_conn(self.source)
-                            self.source = new_source
-                            self.node = new_node
-                            # This is safe; it just sets up a generator but
-                            # does not call next() on it, so no IO is
-                            # performed.
-                            self.source_parts_iter = \
-                                http_response_to_document_iters(
-                                    new_source,
-                                    read_chunk_size=self.app.object_chunk_size)
-                            try:
-                                _junk, _junk, _junk, _junk, part_file = \
-                                    get_next_doc_part()
-                            except StopIteration:
-                                # it's not clear to me how to make
-                                # get_next_doc_part raise StopIteration for the
-                                # first doc part of a new request
-                                six.reraise(exc_type, exc_value, exc_traceback)
-                            part_file = ByteCountEnforcer(part_file, nbytes)
-                        else:
-                            six.reraise(exc_type, exc_value, exc_traceback)
-                    else:
-                        if buf and self.skip_bytes:
-                            if self.skip_bytes < len(buf):
-                                buf = buf[self.skip_bytes:]
-                                self.bytes_used_from_backend += self.skip_bytes
-                                self.skip_bytes = 0
-                            else:
-                                self.skip_bytes -= len(buf)
-                                self.bytes_used_from_backend += len(buf)
-                                buf = b''
-
-                        if not chunk:
-                            if buf:
-                                with WatchdogTimeout(self.app.watchdog,
-                                                     self.app.client_timeout,
-                                                     ChunkWriteTimeout):
-                                    self.bytes_used_from_backend += len(buf)
-                                    yield buf
-                                buf = b''
-                            break
-
-                        if client_chunk_size is not None:
-                            while len(buf) >= client_chunk_size:
-                                client_chunk = buf[:client_chunk_size]
-                                buf = buf[client_chunk_size:]
-                                with WatchdogTimeout(self.app.watchdog,
-                                                     self.app.client_timeout,
-                                                     ChunkWriteTimeout):
-                                    self.bytes_used_from_backend += \
-                                        len(client_chunk)
-                                    yield client_chunk
-                        else:
-                            with WatchdogTimeout(self.app.watchdog,
-                                                 self.app.client_timeout,
-                                                 ChunkWriteTimeout):
-                                self.bytes_used_from_backend += len(buf)
-                                yield buf
-                            buf = b''
-
-                        # This is for fairness; if the network is outpacing
-                        # the CPU, we'll always be able to read and write
-                        # data without encountering an EWOULDBLOCK, and so
-                        # eventlet will not switch greenthreads on its own.
-                        # We do it manually so that clients don't starve.
-                        #
-                        # The number 5 here was chosen by making stuff up.
-                        # It's not every single chunk, but it's not too big
-                        # either, so it seemed like it would probably be an
-                        # okay choice.
-                        #
-                        # Note that we may trampoline to other greenthreads
-                        # more often than once every 5 chunks, depending on
-                        # how blocking our network IO is; the explicit sleep
-                        # here simply provides a lower bound on the rate of
-                        # trampolining.
-                        if nchunks % 5 == 0:
-                            sleep()
-
             part_iter = None
             try:
                 while True:
                     try:
                         start_byte, end_byte, length, headers, part = \
-                            get_next_doc_part()
+                            self.get_next_doc_part()
                     except StopIteration:
                         # it seems this is the only way out of the loop; not
                         # sure why the req.environ update is always needed
@@ -2801,7 +2802,8 @@ class ECFragGetter(object):
                               if (end_byte is not None
                                   and start_byte is not None)
                               else None)
-                part_iter = iter_bytes_from_response_part(part, byte_count)
+                part_iter = self.iter_bytes_from_response_part(
+                    part, byte_count)
                 yield {'start_byte': start_byte, 'end_byte': end_byte,
                        'entity_length': length, 'headers': headers,
                        'part_iter': part_iter}
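
Reviewer note (illustrative, not part of the patch): both extracted methods
share one recovery pattern -- read until a recoverable timeout fires, dig up
a replacement source, fast-forward past the bytes already accounted for, and
resume so the client never notices the failover. The sketch below distills
that pattern using only the stdlib; iter_with_failover, FlakyReader,
next_source and CHUNK are hypothetical names, not Swift APIs (the real code
uses _dig_for_source_and_node(), fast_forward() and ChunkReadTimeout).

    import io

    CHUNK = 8  # stand-in for app.object_chunk_size


    def iter_with_failover(source, next_source, nbytes):
        # Yield up to nbytes bytes from source; on a read error, swap in
        # a replacement positioned past the bytes already yielded.
        served = 0
        while served < nbytes:
            try:
                chunk = source.read(min(CHUNK, nbytes - served))
            except IOError:
                # analogue of the ChunkReadTimeout handler above: find a
                # new source and fast-forward it to where we left off
                source = next_source(served)
                continue
            if not chunk:
                break
            served += len(chunk)
            yield chunk


    class FlakyReader(object):
        # Fails exactly once, mid-stream, to exercise the failover path.
        def __init__(self, data, fail_at):
            self._bio = io.BytesIO(data)
            self._fail_at = fail_at
            self._failed = False

        def read(self, n=-1):
            if not self._failed and self._bio.tell() >= self._fail_at:
                self._failed = True
                raise IOError('simulated node timeout')
            return self._bio.read(n)


    data = b'0123456789abcdefghij'


    def next_source(offset):
        replacement = io.BytesIO(data)
        replacement.seek(offset)  # the fast_forward() analogue
        return replacement


    got = b''.join(iter_with_failover(FlakyReader(data, fail_at=8),
                                      next_source, len(data)))
    assert got == data

The production code additionally re-parses MIME part headers after a
failover (get_next_doc_part) and enforces byte counts with
ByteCountEnforcer; the sketch skips both to keep the control flow visible.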