Merge "proxy: add periodic zero-time sleep during object PUT"

This commit is contained in:
Zuul 2023-05-19 23:14:32 +00:00 committed by Gerrit Code Review
commit ef64b63fdf
4 changed files with 119 additions and 48 deletions

View File

@ -6323,3 +6323,48 @@ class WatchdogTimeout(object):
def __exit__(self, type, value, traceback):
self.watchdog.stop(self.key)
class CooperativeIterator(object):
    """
    Iterator wrapper that calls ``sleep()`` once for every ``period`` items
    drawn from the wrapped iterator, giving eventlet an opportunity to
    switch greenthreads.

    This exists for fairness: when the network keeps pace with the CPU,
    reads and writes never hit EWOULDBLOCK, so eventlet never switches
    greenthreads on its own and other clients can starve. The explicit
    zero-time sleep provides a lower bound on the rate of trampolining;
    switches may still happen more often than once per ``period`` items
    whenever network IO actually blocks.

    The default period of 5 was chosen by making stuff up. It's not every
    single chunk, but it's not too big either, so it seemed like it would
    probably be an okay choice.

    :param iterable: iterator to wrap.
    :param period: number of items yielded from this iterator between calls
        to ``sleep()``.
    """
    __slots__ = ('period', 'count', 'wrapped_iter')

    def __init__(self, iterable, period=5):
        self.wrapped_iter = iterable
        self.count = 0
        self.period = period

    def __iter__(self):
        return self

    def __next__(self):
        # Yield to other greenthreads once every ``period`` items, then
        # restart the count so the cadence repeats indefinitely.
        if self.count < self.period:
            self.count += 1
        else:
            self.count = 1
            sleep()
        return next(self.wrapped_iter)

    # py2-style alias for the iterator protocol
    next = __next__

    def close(self):
        close_if_possible(self.wrapped_iter)

View File

@ -36,7 +36,6 @@ import random
from copy import deepcopy
from sys import exc_info
from eventlet import sleep
from eventlet.timeout import Timeout
import six
@ -45,7 +44,7 @@ from swift.common.utils import Timestamp, WatchdogTimeout, config_true_value, \
public, split_path, list_from_csv, GreenthreadSafeIterator, \
GreenAsyncPile, quorum_size, parse_content_type, drain_and_close, \
document_iters_to_http_response_body, ShardRange, cache_from_env, \
MetricsPrefixLoggerAdapter
MetricsPrefixLoggerAdapter, CooperativeIterator
from swift.common.bufferedhttp import http_connect
from swift.common import constraints
from swift.common.exceptions import ChunkReadTimeout, ChunkWriteTimeout, \
@ -1233,7 +1232,6 @@ class GetOrHeadHandler(object):
raise StopIteration()
def iter_bytes_from_response_part(part_file, nbytes):
nchunks = 0
buf = b''
part_file = ByteCountEnforcer(part_file, nbytes)
while True:
@ -1241,7 +1239,6 @@ class GetOrHeadHandler(object):
with WatchdogTimeout(self.app.watchdog, node_timeout,
ChunkReadTimeout):
chunk = part_file.read(self.app.object_chunk_size)
nchunks += 1
# NB: this append must be *inside* the context
# manager for test.unit.SlowBody to do its thing
buf += chunk
@ -1325,25 +1322,6 @@ class GetOrHeadHandler(object):
yield buf
buf = b''
# This is for fairness; if the network is outpacing
# the CPU, we'll always be able to read and write
# data without encountering an EWOULDBLOCK, and so
# eventlet will not switch greenthreads on its own.
# We do it manually so that clients don't starve.
#
# The number 5 here was chosen by making stuff up.
# It's not every single chunk, but it's not too big
# either, so it seemed like it would probably be an
# okay choice.
#
# Note that we may trampoline to other greenthreads
# more often than once every 5 chunks, depending on
# how blocking our network IO is; the explicit sleep
# here simply provides a lower bound on the rate of
# trampolining.
if nchunks % 5 == 0:
sleep()
part_iter = None
try:
while True:
@ -1360,7 +1338,8 @@ class GetOrHeadHandler(object):
if (end_byte is not None
and start_byte is not None)
else None)
part_iter = iter_bytes_from_response_part(part, byte_count)
part_iter = CooperativeIterator(
iter_bytes_from_response_part(part, byte_count))
yield {'start_byte': start_byte, 'end_byte': end_byte,
'entity_length': length, 'headers': headers,
'part_iter': part_iter}

View File

@ -48,7 +48,8 @@ from swift.common.utils import (
normalize_delete_at_timestamp, public, get_expirer_container,
document_iters_to_http_response_body, parse_content_range,
quorum_size, reiterate, close_if_possible, safe_json_loads, md5,
ShardRange, find_namespace, cache_from_env, NamespaceBoundList)
ShardRange, find_namespace, cache_from_env, NamespaceBoundList,
CooperativeIterator)
from swift.common.bufferedhttp import http_connect
from swift.common.constraints import check_metadata, check_object_creation
from swift.common import constraints
@ -1007,6 +1008,7 @@ class ReplicatedObjectController(BaseObjectController):
This method was added in the PUT method extraction change
"""
bytes_transferred = 0
data_source = CooperativeIterator(data_source)
def send_chunk(chunk):
timeout_at = time.time() + self.app.node_timeout
@ -2659,7 +2661,6 @@ class ECFragGetter(object):
read_chunk_size=self.app.object_chunk_size)
def iter_bytes_from_response_part(self, part_file, nbytes):
nchunks = 0
buf = b''
part_file = ByteCountEnforcer(part_file, nbytes)
while True:
@ -2668,7 +2669,6 @@ class ECFragGetter(object):
self.app.recoverable_node_timeout,
ChunkReadTimeout):
chunk = part_file.read(self.app.object_chunk_size)
nchunks += 1
# NB: this append must be *inside* the context
# manager for test.unit.SlowBody to do its thing
buf += chunk
@ -2736,25 +2736,6 @@ class ECFragGetter(object):
if not chunk:
break
# This is for fairness; if the network is outpacing
# the CPU, we'll always be able to read and write
# data without encountering an EWOULDBLOCK, and so
# eventlet will not switch greenthreads on its own.
# We do it manually so that clients don't starve.
#
# The number 5 here was chosen by making stuff up.
# It's not every single chunk, but it's not too big
# either, so it seemed like it would probably be an
# okay choice.
#
# Note that we may trampoline to other greenthreads
# more often than once every 5 chunks, depending on
# how blocking our network IO is; the explicit sleep
# here simply provides a lower bound on the rate of
# trampolining.
if nchunks % 5 == 0:
sleep()
def _get_response_parts_iter(self, req):
try:
# This is safe; it sets up a generator but does not call next()
@ -2784,8 +2765,8 @@ class ECFragGetter(object):
if (end_byte is not None
and start_byte is not None)
else None)
part_iter = self.iter_bytes_from_response_part(
part, byte_count)
part_iter = CooperativeIterator(
self.iter_bytes_from_response_part(part, byte_count))
yield {'start_byte': start_byte, 'end_byte': end_byte,
'entity_length': length, 'headers': headers,
'part_iter': part_iter}
@ -3360,6 +3341,7 @@ class ECObjectController(BaseObjectController):
# same part nodes index as the primaries they are covering
putter_to_frag_index = self._determine_chunk_destinations(
putters, policy)
data_source = CooperativeIterator(data_source)
while True:
with WatchdogTimeout(self.app.watchdog,

View File

@ -17,6 +17,7 @@
from __future__ import print_function
import hashlib
import itertools
from test import annotate_failure
from test.debug_logger import debug_logger
@ -8984,3 +8985,67 @@ class TestCloseableChain(unittest.TestCase):
chain.close()
self.assertEqual(1, test_iter1.close_call_count)
self.assertTrue(generator_closed[0])
class TestCooperativeIterator(unittest.TestCase):
    """Tests for utils.CooperativeIterator's periodic sleep() behavior."""

    def test_init(self):
        inner = itertools.count()
        coop = utils.CooperativeIterator(inner, period=3)
        self.assertIs(inner, coop.wrapped_iter)
        self.assertEqual(0, coop.count)
        self.assertEqual(3, coop.period)

    def test_iter(self):
        coop = utils.CooperativeIterator(itertools.count())
        seen = []
        with mock.patch('swift.common.utils.sleep') as mock_sleep:
            for val in coop:
                if val >= 100:
                    break
                seen.append(val)
        self.assertEqual(list(range(100)), seen)
        # default period of 5 -> one sleep per 5 items over 101 next() calls
        self.assertEqual(20, mock_sleep.call_count)

    def test_close(self):
        # closing over a wrapped iterable without close() is a no-op
        utils.CooperativeIterator(range(5)).close()
        wrapped = mock.MagicMock()
        wrapped.close = mock.MagicMock()
        coop = utils.CooperativeIterator(wrapped)
        coop.close()
        self.assertTrue(wrapped.close.called)

    def test_next(self):
        def run(coop, period):
            collected = []

            def step(expect_sleep, note=None):
                # patch sleep around each individual next() so we can tell
                # exactly which call trampolined
                with mock.patch('swift.common.utils.sleep') as mock_sleep:
                    collected.append(next(coop))
                if expect_sleep:
                    self.assertTrue(mock_sleep.called)
                else:
                    self.assertFalse(mock_sleep.called, note)

            for idx in range(period):
                step(False, idx)
            step(True)
            for idx in range(period - 1):
                step(False, idx)
            step(True)
            return collected

        self.assertEqual(list(range(11)),
                         run(utils.CooperativeIterator(itertools.count()), 5))
        self.assertEqual(
            list(range(11)),
            run(utils.CooperativeIterator(itertools.count(), 5), 5))
        self.assertEqual(
            list(range(7)),
            run(utils.CooperativeIterator(itertools.count(), 3), 3))
        self.assertEqual(
            list(range(3)),
            run(utils.CooperativeIterator(itertools.count(), 1), 1))
        self.assertEqual(
            list(range(2)),
            run(utils.CooperativeIterator(itertools.count(), 0), 0))