From be0c481e44e4f09b3c7fbe37f4b9ee62caa12a27 Mon Sep 17 00:00:00 2001 From: indianwhocodes Date: Thu, 13 Apr 2023 12:01:07 -0700 Subject: [PATCH] Fix proxy traceback for GeneratorExit in py3 Client when explicitly closed before finishing the download. leads to a 499, but the shutdown logging for proxy in py3 needs to be fixed. We have done it by killing all running coroutines in the ContextPool Change-Id: Ic372ea9866bb7f2659e02f8796cdee01406e2079 --- swift/common/utils/__init__.py | 3 +++ swift/proxy/controllers/obj.py | 11 ++++++----- test/unit/common/test_utils.py | 20 ++++++++++++++++++++ test/unit/proxy/controllers/test_obj.py | 15 +++++++++++++++ 4 files changed, 44 insertions(+), 5 deletions(-) diff --git a/swift/common/utils/__init__.py b/swift/common/utils/__init__.py index c8bdca2b65..6f5b3bae01 100644 --- a/swift/common/utils/__init__.py +++ b/swift/common/utils/__init__.py @@ -2908,6 +2908,9 @@ class ContextPool(GreenPool): return self def __exit__(self, type, value, traceback): + self.close() + + def close(self): for coro in list(self.coroutines_running): coro.kill() diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 113ea9d439..4f3ed304d6 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -38,7 +38,7 @@ import random import sys from greenlet import GreenletExit -from eventlet import GreenPile, sleep +from eventlet import GreenPile from eventlet.queue import Queue, Empty from eventlet.timeout import Timeout @@ -1166,14 +1166,15 @@ class ECAppIter(object): self.mime_boundary = None self.learned_content_type = None self.stashed_iter = None + self.pool = ContextPool(len(internal_parts_iters)) def close(self): - # close down the stashed iter first so the ContextPool can - # cleanup the frag queue feeding coros that may be currently + # close down the stashed iter and shutdown the context pool to + # clean up the frag queue feeding coroutines that may be currently # executing the internal_parts_iters. if self.stashed_iter: close_if_possible(self.stashed_iter) - sleep() # Give the per-frag threads a chance to clean up + self.pool.close() for it in self.internal_parts_iters: close_if_possible(it) @@ -1531,7 +1532,7 @@ class ECAppIter(object): frag_iter.close() segments_decoded = 0 - with ContextPool(len(fragment_iters)) as pool: + with self.pool as pool: for frag_iter, queue in zip(fragment_iters, queues): pool.spawn(put_fragments_in_queue, frag_iter, queue, self.logger.thread_locals) diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index 9fee92bb91..5779dd4214 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -8921,3 +8921,23 @@ class TestCooperativeIterator(unittest.TestCase): self.assertEqual(list(range(3)), actual) actual = do_test(utils.CooperativeIterator(itertools.count(), 0), 0) self.assertEqual(list(range(2)), actual) + + +class TestContextPool(unittest.TestCase): + def test_context_manager(self): + size = 5 + pool = utils.ContextPool(size) + with pool: + for _ in range(size): + pool.spawn(eventlet.sleep, 10) + self.assertEqual(pool.running(), size) + self.assertEqual(pool.running(), 0) + + def test_close(self): + size = 10 + pool = utils.ContextPool(size) + for _ in range(size): + pool.spawn(eventlet.sleep, 10) + self.assertEqual(pool.running(), size) + pool.close() + self.assertEqual(pool.running(), 0) diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index b268e008ea..db56ad3eb2 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -2411,6 +2411,10 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): def test_GET_disconnect(self): self.app.recoverable_node_timeout = 0.01 self.app.client_timeout = 0.1 + # Before, we used the default 64k chunk size, so the entire ~16k test + # data would come in the first chunk, and the generator should + # cleanly exit by the time we reiterate() the response. + self.app.object_chunk_size = 10 segment_size = self.policy.ec_segment_size test_data = (b'test' * segment_size)[:-743] etag = md5(test_data, usedforsecurity=False).hexdigest() @@ -2455,6 +2459,17 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): if stats: actual[self.app.error_limiter.node_key(n)] = stats self.assertEqual(actual, expected_error_limiting) + expected = ["Client disconnected on read of EC frag '/a/c/o'"] * 10 + self.assertEqual( + self.app.logger.get_lines_for_level('warning'), + expected) + for read_line in self.app.logger.get_lines_for_level('error'): + self.assertIn("Trying to read EC fragment during GET (retrying)", + read_line) + self.assertEqual( + len(self.logger.logger.records['ERROR']), 4, + 'Expected 4 ERROR lines, got %r' % ( + self.logger.logger.records['ERROR'], )) def test_GET_not_found_when_404_newer(self): # if proxy receives a 404, it keeps waiting for other connections until