Merge "Extract some closures to methods"

Zuul 2023-04-24 19:49:03 +00:00 committed by Gerrit Code Review
commit 0a970d0736


@@ -2620,171 +2620,172 @@ class ECFragGetter(object):
it = self._get_response_parts_iter(req)
return it
def get_next_doc_part(self):
node_timeout = self.app.recoverable_node_timeout
while True:
# the loop here is to resume if trying to parse
# multipart/byteranges response raises a ChunkReadTimeout
# and resets the source_parts_iter
try:
with WatchdogTimeout(self.app.watchdog, node_timeout,
ChunkReadTimeout):
# If we don't have a multipart/byteranges response,
# but just a 200 or a single-range 206, then this
# performs no IO, and just returns source (or
# raises StopIteration).
# Otherwise, this call to next() performs IO when
# we have a multipart/byteranges response; as it
# will read the MIME boundary and part headers.
start_byte, end_byte, length, headers, part = next(
self.source_parts_iter)
return (start_byte, end_byte, length, headers, part)
except ChunkReadTimeout:
new_source, new_node = self._dig_for_source_and_node()
if not new_source:
raise
self.app.error_occurred(
self.node, 'Trying to read next part of '
'EC multi-part GET (retrying)')
# Close-out the connection as best as possible.
if getattr(self.source, 'swift_conn', None):
close_swift_conn(self.source)
self.source = new_source
self.node = new_node
# This is safe; it sets up a generator but does
# not call next() on it, so no IO is performed.
self.source_parts_iter = \
http_response_to_document_iters(
new_source,
read_chunk_size=self.app.object_chunk_size)
def iter_bytes_from_response_part(self, part_file, nbytes):
client_chunk_size = self.client_chunk_size
node_timeout = self.app.recoverable_node_timeout
nchunks = 0
buf = b''
part_file = ByteCountEnforcer(part_file, nbytes)
while True:
try:
with WatchdogTimeout(self.app.watchdog, node_timeout,
ChunkReadTimeout):
chunk = part_file.read(self.app.object_chunk_size)
nchunks += 1
# NB: this append must be *inside* the context
# manager for test.unit.SlowBody to do its thing
buf += chunk
if nbytes is not None:
nbytes -= len(chunk)
except (ChunkReadTimeout, ShortReadError):
exc_type, exc_value, exc_traceback = sys.exc_info()
try:
self.fast_forward(self.bytes_used_from_backend)
except (HTTPException, ValueError):
self.logger.exception('Unable to fast forward')
six.reraise(exc_type, exc_value, exc_traceback)
except RangeAlreadyComplete:
break
buf = b''
old_node = self.node
new_source, new_node = self._dig_for_source_and_node()
if new_source:
self.app.error_occurred(
old_node, 'Trying to read EC fragment '
'during GET (retrying)')
# Close-out the connection as best as possible.
if getattr(self.source, 'swift_conn', None):
close_swift_conn(self.source)
self.source = new_source
self.node = new_node
# This is safe; it just sets up a generator but
# does not call next() on it, so no IO is
# performed.
self.source_parts_iter = \
http_response_to_document_iters(
new_source,
read_chunk_size=self.app.object_chunk_size)
try:
_junk, _junk, _junk, _junk, part_file = \
self.get_next_doc_part()
except StopIteration:
# it's not clear to me how to make
# get_next_doc_part raise StopIteration for the
# first doc part of a new request
six.reraise(exc_type, exc_value, exc_traceback)
part_file = ByteCountEnforcer(part_file, nbytes)
else:
six.reraise(exc_type, exc_value, exc_traceback)
else:
if buf and self.skip_bytes:
if self.skip_bytes < len(buf):
buf = buf[self.skip_bytes:]
self.bytes_used_from_backend += self.skip_bytes
self.skip_bytes = 0
else:
self.skip_bytes -= len(buf)
self.bytes_used_from_backend += len(buf)
buf = b''
if not chunk:
if buf:
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(buf)
yield buf
buf = b''
break
if client_chunk_size is not None:
while len(buf) >= client_chunk_size:
client_chunk = buf[:client_chunk_size]
buf = buf[client_chunk_size:]
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += \
len(client_chunk)
yield client_chunk
else:
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(buf)
yield buf
buf = b''
# This is for fairness; if the network is outpacing
# the CPU, we'll always be able to read and write
# data without encountering an EWOULDBLOCK, and so
# eventlet will not switch greenthreads on its own.
# We do it manually so that clients don't starve.
#
# The number 5 here was chosen by making stuff up.
# It's not every single chunk, but it's not too big
# either, so it seemed like it would probably be an
# okay choice.
#
# Note that we may trampoline to other greenthreads
# more often than once every 5 chunks, depending on
# how blocking our network IO is; the explicit sleep
# here simply provides a lower bound on the rate of
# trampolining.
if nchunks % 5 == 0:
sleep()
def _get_response_parts_iter(self, req):
try:
client_chunk_size = self.client_chunk_size
node_timeout = self.app.recoverable_node_timeout
# This is safe; it sets up a generator but does not call next()
# on it, so no IO is performed.
self.source_parts_iter = http_response_to_document_iters(
self.source, read_chunk_size=self.app.object_chunk_size)
def get_next_doc_part():
while True:
# the loop here is to resume if trying to parse
# multipart/byteranges response raises a ChunkReadTimeout
# and resets the source_parts_iter
try:
with WatchdogTimeout(self.app.watchdog, node_timeout,
ChunkReadTimeout):
# If we don't have a multipart/byteranges response,
# but just a 200 or a single-range 206, then this
# performs no IO, and just returns source (or
# raises StopIteration).
# Otherwise, this call to next() performs IO when
# we have a multipart/byteranges response; as it
# will read the MIME boundary and part headers.
start_byte, end_byte, length, headers, part = next(
self.source_parts_iter)
return (start_byte, end_byte, length, headers, part)
except ChunkReadTimeout:
new_source, new_node = self._dig_for_source_and_node()
if not new_source:
raise
self.app.error_occurred(
self.node, 'Trying to read next part of '
'EC multi-part GET (retrying)')
# Close-out the connection as best as possible.
if getattr(self.source, 'swift_conn', None):
close_swift_conn(self.source)
self.source = new_source
self.node = new_node
# This is safe; it sets up a generator but does
# not call next() on it, so no IO is performed.
self.source_parts_iter = \
http_response_to_document_iters(
new_source,
read_chunk_size=self.app.object_chunk_size)
def iter_bytes_from_response_part(part_file, nbytes):
nchunks = 0
buf = b''
part_file = ByteCountEnforcer(part_file, nbytes)
while True:
try:
with WatchdogTimeout(self.app.watchdog, node_timeout,
ChunkReadTimeout):
chunk = part_file.read(self.app.object_chunk_size)
nchunks += 1
# NB: this append must be *inside* the context
# manager for test.unit.SlowBody to do its thing
buf += chunk
if nbytes is not None:
nbytes -= len(chunk)
except (ChunkReadTimeout, ShortReadError):
exc_type, exc_value, exc_traceback = sys.exc_info()
try:
self.fast_forward(self.bytes_used_from_backend)
except (HTTPException, ValueError):
self.logger.exception('Unable to fast forward')
six.reraise(exc_type, exc_value, exc_traceback)
except RangeAlreadyComplete:
break
buf = b''
old_node = self.node
new_source, new_node = self._dig_for_source_and_node()
if new_source:
self.app.error_occurred(
old_node, 'Trying to read EC fragment '
'during GET (retrying)')
# Close-out the connection as best as possible.
if getattr(self.source, 'swift_conn', None):
close_swift_conn(self.source)
self.source = new_source
self.node = new_node
# This is safe; it just sets up a generator but
# does not call next() on it, so no IO is
# performed.
self.source_parts_iter = \
http_response_to_document_iters(
new_source,
read_chunk_size=self.app.object_chunk_size)
try:
_junk, _junk, _junk, _junk, part_file = \
get_next_doc_part()
except StopIteration:
# it's not clear to me how to make
# get_next_doc_part raise StopIteration for the
# first doc part of a new request
six.reraise(exc_type, exc_value, exc_traceback)
part_file = ByteCountEnforcer(part_file, nbytes)
else:
six.reraise(exc_type, exc_value, exc_traceback)
else:
if buf and self.skip_bytes:
if self.skip_bytes < len(buf):
buf = buf[self.skip_bytes:]
self.bytes_used_from_backend += self.skip_bytes
self.skip_bytes = 0
else:
self.skip_bytes -= len(buf)
self.bytes_used_from_backend += len(buf)
buf = b''
if not chunk:
if buf:
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(buf)
yield buf
buf = b''
break
if client_chunk_size is not None:
while len(buf) >= client_chunk_size:
client_chunk = buf[:client_chunk_size]
buf = buf[client_chunk_size:]
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += \
len(client_chunk)
yield client_chunk
else:
with WatchdogTimeout(self.app.watchdog,
self.app.client_timeout,
ChunkWriteTimeout):
self.bytes_used_from_backend += len(buf)
yield buf
buf = b''
# This is for fairness; if the network is outpacing
# the CPU, we'll always be able to read and write
# data without encountering an EWOULDBLOCK, and so
# eventlet will not switch greenthreads on its own.
# We do it manually so that clients don't starve.
#
# The number 5 here was chosen by making stuff up.
# It's not every single chunk, but it's not too big
# either, so it seemed like it would probably be an
# okay choice.
#
# Note that we may trampoline to other greenthreads
# more often than once every 5 chunks, depending on
# how blocking our network IO is; the explicit sleep
# here simply provides a lower bound on the rate of
# trampolining.
if nchunks % 5 == 0:
sleep()
part_iter = None
try:
while True:
try:
start_byte, end_byte, length, headers, part = \
get_next_doc_part()
self.get_next_doc_part()
except StopIteration:
# it seems this is the only way out of the loop; not
# sure why the req.environ update is always needed
@@ -2801,7 +2802,8 @@ class ECFragGetter(object):
if (end_byte is not None
and start_byte is not None)
else None)
part_iter = iter_bytes_from_response_part(part, byte_count)
part_iter = self.iter_bytes_from_response_part(
part, byte_count)
yield {'start_byte': start_byte, 'end_byte': end_byte,
'entity_length': length, 'headers': headers,
'part_iter': part_iter}
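
For reference, the refactor above is the classic "extract closure to method" move: nested functions defined inside _get_response_parts_iter, which only captured a few locals, are promoted to instance methods on ECFragGetter, with the shared state reached through self or passed in as arguments. Below is a minimal, hypothetical sketch of the pattern in isolation; the Fetcher class and its names are illustrative only, not Swift's actual API.

# Before: a closure captures locals inside one large generator method.
class Fetcher(object):
    def __init__(self, source, chunk_size):
        self.source = source
        self.chunk_size = chunk_size

    def iter_parts(self):
        chunk_size = self.chunk_size        # captured by the closure below

        def read_chunk(part):               # closure: awkward to test alone
            return part.read(chunk_size)

        for part in self.source:
            yield read_chunk(part)

# After: the closure becomes a method; the state it needs lives on self,
# so it can be called (and unit-tested) independently of iter_parts().
class FetcherRefactored(object):
    def __init__(self, source, chunk_size):
        self.source = source
        self.chunk_size = chunk_size

    def read_chunk(self, part):
        return part.read(self.chunk_size)

    def iter_parts(self):
        for part in self.source:
            yield self.read_chunk(part)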
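
The periodic sleep() every fifth chunk in iter_bytes_from_response_part is a separate, pre-existing detail worth noting: under eventlet, a greenthread that never blocks on IO never yields, so the explicit sleep puts a lower bound on how often other greenthreads get scheduled. A minimal sketch of that cooperative-yield pattern, assuming eventlet; yield_chunks_fairly and yield_every are illustrative names, not part of Swift.

import eventlet

def yield_chunks_fairly(chunks, yield_every=5):
    # Re-yield chunks, trampolining to other greenthreads periodically.
    # If reads never hit EWOULDBLOCK, eventlet would otherwise let this
    # greenthread monopolize the hub; eventlet.sleep(0) forces a switch.
    for nchunks, chunk in enumerate(chunks, start=1):
        yield chunk
        if nchunks % yield_every == 0:
            eventlet.sleep(0)  # cooperative yield; 5 mirrors the code above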