Fix proxy traceback for GeneratorExit in py3

When a client explicitly disconnects before finishing a download, the
proxy correctly responds with a 499, but under py3 the shutdown path
also logs a spurious GeneratorExit traceback. Fix that by killing all
running coroutines in the ContextPool when the response is closed.

Change-Id: Ic372ea9866bb7f2659e02f8796cdee01406e2079
Author: indianwhocodes
Date: 2023-04-13 12:01:07 -07:00
Committer: Tim Burke
Parent: cc26229eef
Commit: be0c481e44

4 changed files with 44 additions and 5 deletions
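
For reviewers following along, here is a minimal, self-contained sketch of
the ContextPool pattern this change relies on, assuming only that eventlet
is installed (the usage demo at the bottom is illustrative, not part of the
change):

import eventlet
from eventlet import GreenPool


class ContextPool(GreenPool):
    """GreenPool that kills any still-running coroutines on exit."""

    def __enter__(self):
        return self

    def __exit__(self, type, value, traceback):
        self.close()

    def close(self):
        # GreenThread.kill() raises GreenletExit inside each coroutine;
        # eventlet treats that as a normal shutdown, so no traceback is
        # logged.
        for coro in list(self.coroutines_running):
            coro.kill()


with ContextPool(2) as pool:
    pool.spawn(eventlet.sleep, 10)
    pool.spawn(eventlet.sleep, 10)
    assert pool.running() == 2
assert pool.running() == 0  # both sleepers were killed by __exit__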


@@ -2908,6 +2908,9 @@ class ContextPool(GreenPool):
         return self
 
     def __exit__(self, type, value, traceback):
+        self.close()
+
+    def close(self):
         for coro in list(self.coroutines_running):
             coro.kill()
 


@@ -38,7 +38,7 @@ import random
 import sys
 
 from greenlet import GreenletExit
-from eventlet import GreenPile, sleep
+from eventlet import GreenPile
 from eventlet.queue import Queue, Empty
 from eventlet.timeout import Timeout
@@ -1166,14 +1166,15 @@ class ECAppIter(object):
         self.mime_boundary = None
         self.learned_content_type = None
         self.stashed_iter = None
+        self.pool = ContextPool(len(internal_parts_iters))
 
     def close(self):
-        # close down the stashed iter first so the ContextPool can
-        # cleanup the frag queue feeding coros that may be currently
+        # close down the stashed iter and shutdown the context pool to
+        # clean up the frag queue feeding coroutines that may be currently
         # executing the internal_parts_iters.
         if self.stashed_iter:
             close_if_possible(self.stashed_iter)
-            sleep()  # Give the per-frag threads a chance to clean up
+        self.pool.close()
         for it in self.internal_parts_iters:
             close_if_possible(it)
@@ -1531,7 +1532,7 @@ class ECAppIter(object):
                 frag_iter.close()
 
         segments_decoded = 0
-        with ContextPool(len(fragment_iters)) as pool:
+        with self.pool as pool:
             for frag_iter, queue in zip(fragment_iters, queues):
                 pool.spawn(put_fragments_in_queue, frag_iter, queue,
                            self.logger.thread_locals)
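
A note on why kill() replaces the old sleep()-and-hope cleanup:
GreenThread.kill() raises GreenletExit inside each frag-queue feeding
coroutine, which eventlet treats as a normal green-thread exit, whereas
leaving the generators to be reaped later is what surfaced the py3
GeneratorExit traceback this change fixes. A hedged, standalone
demonstration, assuming eventlet; feeder() is an illustrative stand-in for
put_fragments_in_queue:

import eventlet
from greenlet import GreenletExit


def feeder():
    try:
        while True:
            eventlet.sleep(0.1)  # stand-in for feeding a frag queue
    except GreenletExit:
        # clean shutdown path taken when ContextPool.close() kills us
        raise


gt = eventlet.spawn(feeder)
eventlet.sleep(0)  # yield so the feeder actually starts
gt.kill()  # raises GreenletExit in feeder(); exits quietly, no traceback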


@@ -8921,3 +8921,23 @@ class TestCooperativeIterator(unittest.TestCase):
         self.assertEqual(list(range(3)), actual)
         actual = do_test(utils.CooperativeIterator(itertools.count(), 0), 0)
         self.assertEqual(list(range(2)), actual)
+
+
+class TestContextPool(unittest.TestCase):
+    def test_context_manager(self):
+        size = 5
+        pool = utils.ContextPool(size)
+        with pool:
+            for _ in range(size):
+                pool.spawn(eventlet.sleep, 10)
+            self.assertEqual(pool.running(), size)
+        self.assertEqual(pool.running(), 0)
+
+    def test_close(self):
+        size = 10
+        pool = utils.ContextPool(size)
+        for _ in range(size):
+            pool.spawn(eventlet.sleep, 10)
+        self.assertEqual(pool.running(), size)
+        pool.close()
+        self.assertEqual(pool.running(), 0)


@@ -2411,6 +2411,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
     def test_GET_disconnect(self):
         self.app.recoverable_node_timeout = 0.01
         self.app.client_timeout = 0.1
+        # Before, we used the default 64k chunk size, so the entire ~16k test
+        # data would come in the first chunk, and the generator should
+        # cleanly exit by the time we reiterate() the response.
+        self.app.object_chunk_size = 10
         segment_size = self.policy.ec_segment_size
         test_data = (b'test' * segment_size)[:-743]
         etag = md5(test_data, usedforsecurity=False).hexdigest()
@@ -2455,6 +2459,17 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
             if stats:
                 actual[self.app.error_limiter.node_key(n)] = stats
         self.assertEqual(actual, expected_error_limiting)
+        expected = ["Client disconnected on read of EC frag '/a/c/o'"] * 10
+        self.assertEqual(
+            self.app.logger.get_lines_for_level('warning'),
+            expected)
+        for read_line in self.app.logger.get_lines_for_level('error'):
+            self.assertIn("Trying to read EC fragment during GET (retrying)",
+                          read_line)
+        self.assertEqual(
+            len(self.logger.logger.records['ERROR']), 4,
+            'Expected 4 ERROR lines, got %r' % (
+                self.logger.logger.records['ERROR'], ))
 
     def test_GET_not_found_when_404_newer(self):
         # if proxy receives a 404, it keeps waiting for other connections until