From b7b45eadcd3b948e3313ce88e34ed5a7152eaa7b Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Thu, 20 Aug 2020 15:13:41 -0700 Subject: [PATCH] ec: Close down some unused responses more quickly These should get GC'ed eventually, but sooner is probably better than later. Change-Id: I4daa18c36235e6df65e8b1c00a12dbf10677ca61 --- swift/proxy/controllers/obj.py | 67 +++++++++++------- test/unit/__init__.py | 16 ++++- test/unit/proxy/controllers/test_obj.py | 90 ++++++++++++++++++++++++- 3 files changed, 144 insertions(+), 29 deletions(-) diff --git a/swift/proxy/controllers/obj.py b/swift/proxy/controllers/obj.py index 6fa359d679..5662f4f2d9 100644 --- a/swift/proxy/controllers/obj.py +++ b/swift/proxy/controllers/obj.py @@ -2087,6 +2087,14 @@ class ECGetResponseBucket(object): result = self.policy.ec_ndata - (len(self.get_responses()) + len(alts)) return max(result, 0) + def close_conns(self): + """ + Close bucket's responses; they won't be used for a client response. + """ + for getter, frag_iter in self.get_responses(): + if getattr(getter.source, 'swift_conn', None): + close_swift_conn(getter.source) + def __str__(self): # return a string summarising bucket state, useful for debugging. return '<%s, %s, %s, %s(%s), %s>' \ @@ -2227,6 +2235,15 @@ class ECGetResponseCollection(object): return bucket return self.least_bad_bucket + def choose_best_bucket(self): + best_bucket = self.best_bucket + # it's now or never -- close down any other requests + for bucket in self.buckets.values(): + if bucket is best_bucket: + continue + bucket.close_conns() + return best_bucket + @property def least_bad_bucket(self): """ @@ -2343,6 +2360,7 @@ class ECFragGetter(object): self.client_chunk_size = policy.fragment_size self.skip_bytes = 0 self.bytes_used_from_backend = 0 + self.source = None def fast_forward(self, num_bytes): """ @@ -2452,19 +2470,15 @@ class ECFragGetter(object): def response_parts_iter(self, req): try: - source, node = next(self.source_and_node_iter) + self.source, self.node = next(self.source_and_node_iter) except StopIteration: return it = None - if source: - it = self._get_response_parts_iter(req, node, source) + if self.source: + it = self._get_response_parts_iter(req) return it - def _get_response_parts_iter(self, req, node, source): - # Someday we can replace this [mess] with python 3's "nonlocal" - source = [source] - node = [node] - + def _get_response_parts_iter(self, req): try: client_chunk_size = self.client_chunk_size node_timeout = self.app.recoverable_node_timeout @@ -2473,7 +2487,7 @@ class ECFragGetter(object): # on it, so no IO is performed. parts_iter = [ http_response_to_document_iters( - source[0], read_chunk_size=self.app.object_chunk_size)] + self.source, read_chunk_size=self.app.object_chunk_size)] def get_next_doc_part(): while True: @@ -2497,13 +2511,13 @@ class ECFragGetter(object): new_source, new_node = self._dig_for_source_and_node() if new_source: self.app.error_occurred( - node[0], _('Trying to read object during ' - 'GET (retrying)')) + self.node, _('Trying to read object during ' + 'GET (retrying)')) # Close-out the connection as best as possible. - if getattr(source[0], 'swift_conn', None): - close_swift_conn(source[0]) - source[0] = new_source - node[0] = new_node + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) + self.source = new_source + self.node = new_node # This is safe; it sets up a generator but does # not call next() on it, so no IO is performed. parts_iter[0] = http_response_to_document_iters( @@ -2539,13 +2553,13 @@ class ECFragGetter(object): new_source, new_node = self._dig_for_source_and_node() if new_source: self.app.error_occurred( - node[0], _('Trying to read object during ' - 'GET (retrying)')) + self.node, _('Trying to read object during ' + 'GET (retrying)')) # Close-out the connection as best as possible. - if getattr(source[0], 'swift_conn', None): - close_swift_conn(source[0]) - source[0] = new_source - node[0] = new_node + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) + self.source = new_source + self.node = new_node # This is safe; it just sets up a generator but # does not call next() on it, so no IO is # performed. @@ -2650,7 +2664,7 @@ class ECFragGetter(object): part_iter.close() except ChunkReadTimeout: - self.app.exception_occurred(node[0], _('Object'), + self.app.exception_occurred(self.node, _('Object'), _('Trying to read during GET')) raise except ChunkWriteTimeout: @@ -2677,8 +2691,8 @@ class ECFragGetter(object): raise finally: # Close-out the connection as best as possible. - if getattr(source[0], 'swift_conn', None): - close_swift_conn(source[0]) + if getattr(self.source, 'swift_conn', None): + close_swift_conn(self.source) @property def last_status(self): @@ -2727,6 +2741,7 @@ class ECFragGetter(object): not Timestamp(src_headers.get('x-backend-timestamp', 0)): # throw out 5XX and 404s from handoff nodes unless the data is # really on disk and had been DELETEd + conn.close() return None self.status = possible_source.status @@ -2737,6 +2752,7 @@ class ECFragGetter(object): return possible_source else: self.body = possible_source.read() + conn.close() if possible_source.status == HTTP_INSUFFICIENT_STORAGE: self.app.error_limit(node, _('ERROR Insufficient Storage')) @@ -2939,7 +2955,7 @@ class ECObjectController(BaseObjectController): # Put this back, since we *may* need it for kickoff()/_fix_response() # (but note that _fix_ranges() may also pop it back off before then) req.range = orig_range - best_bucket = buckets.best_bucket + best_bucket = buckets.choose_best_bucket() if best_bucket.shortfall <= 0 and best_bucket.durable: # headers can come from any of the getters resp_headers = best_bucket.headers @@ -2979,6 +2995,7 @@ class ECObjectController(BaseObjectController): reasons = [] bodies = [] headers = [] + best_bucket.close_conns() for status, bad_bucket in buckets.bad_buckets.items(): for getter, _parts_iter in bad_bucket.get_responses(): if best_bucket.durable: diff --git a/test/unit/__init__.py b/test/unit/__init__.py index adb6663903..b80df0c19c 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -1029,6 +1029,10 @@ def fake_http_connect(*code_iter, **kwargs): def getheader(self, name, default=None): return HeaderKeyDict(self.getheaders()).get(name, default) + def nuke_from_orbit(self): + # wrapped connections from buffered_http have this helper + self.close() + def close(self): self.closed = True @@ -1091,10 +1095,13 @@ def fake_http_connect(*code_iter, **kwargs): body = static_body or b'' else: body = next(body_iter) - return FakeConn(status, etag, body=body, timestamp=timestamp, + conn = FakeConn(status, etag, body=body, timestamp=timestamp, headers=headers, expect_headers=expect_headers, connection_id=i, give_send=kwargs.get('give_send'), give_expect=kwargs.get('give_expect')) + if 'capture_connections' in kwargs: + kwargs['capture_connections'].append(conn) + return conn connect.unexpected_requests = unexpected_requests connect.code_iter = code_iter @@ -1105,6 +1112,7 @@ def fake_http_connect(*code_iter, **kwargs): @contextmanager def mocked_http_conn(*args, **kwargs): requests = [] + responses = [] def capture_requests(ip, port, method, path, headers, qs, ssl): if six.PY2 and not isinstance(ip, bytes): @@ -1120,8 +1128,10 @@ def mocked_http_conn(*args, **kwargs): } requests.append(req) kwargs.setdefault('give_connect', capture_requests) + kwargs['capture_connections'] = responses fake_conn = fake_http_connect(*args, **kwargs) fake_conn.requests = requests + fake_conn.responses = responses with mocklib.patch('swift.common.bufferedhttp.http_connect_raw', new=fake_conn): yield fake_conn @@ -1185,6 +1195,10 @@ class StubResponse(object): fake_reason = ('Fake', 'This response is a lie.') self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0] + def nuke_from_orbit(self): + if hasattr(self, 'swift_conn'): + self.swift_conn.close() + def getheader(self, header_name, default=None): return self.headers.get(header_name, default) diff --git a/test/unit/proxy/controllers/test_obj.py b/test/unit/proxy/controllers/test_obj.py index fb84af7fd9..4eb71380e9 100644 --- a/test/unit/proxy/controllers/test_obj.py +++ b/test/unit/proxy/controllers/test_obj.py @@ -2061,6 +2061,7 @@ def capture_http_requests(get_response): self.req = req self.resp = None self.path = "/" + self.closed = False def getresponse(self): self.resp = get_response(self.req) @@ -2075,6 +2076,9 @@ def capture_http_requests(get_response): def endheaders(self): pass + def close(self): + self.closed = True + class ConnectionLog(object): def __init__(self): @@ -2826,7 +2830,16 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.status_int, 200) self.assertEqual(resp.headers['etag'], obj2['etag']) + closed_conn = defaultdict(set) + for conn in log: + etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] + closed_conn[etag].add(conn.closed) + self.assertEqual({ + obj1['etag']: {True}, + obj2['etag']: {False}, + }, closed_conn) self.assertEqual(md5(resp.body).hexdigest(), obj2['etag']) + self.assertEqual({True}, {conn.closed for conn in log}) collected_responses = defaultdict(set) for conn in log: @@ -2999,6 +3012,15 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): with capture_http_requests(fake_response) as log: resp = req.get_response(self.app) + closed_conn = defaultdict(set) + for conn in log: + etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag') + closed_conn[etag].add(conn.closed) + self.assertEqual({ + obj1['etag']: {True}, + obj2['etag']: {True}, + None: {True}, + }, dict(closed_conn)) self.assertEqual(resp.status_int, 503) collected_responses = defaultdict(set) @@ -3148,10 +3170,12 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): collected_etags = set() collected_status = set() + closed_conn = defaultdict(set) for conn in log: etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] collected_etags.add(etag) collected_status.add(conn.resp.status) + closed_conn[etag].add(conn.closed) # default node_iter will exhaust at 2 * replicas self.assertEqual(len(log), 2 * self.replicas()) @@ -3159,6 +3183,12 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): {obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']}, collected_etags) self.assertEqual({200}, collected_status) + self.assertEqual({ + obj1['etag']: {True}, + obj2['etag']: {True}, + obj3['etag']: {True}, + obj4['etag']: {True}, + }, closed_conn) def test_GET_with_mixed_durable_and_nondurable_frags_will_503(self): # all nodes have a frag but there is no one set that reaches quorum, @@ -3208,12 +3238,14 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.status_int, 503) + closed_conn = defaultdict(set) collected_etags = set() collected_status = set() for conn in log: etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] collected_etags.add(etag) collected_status.add(conn.resp.status) + closed_conn[etag].add(conn.closed) # default node_iter will exhaust at 2 * replicas self.assertEqual(len(log), 2 * self.replicas()) @@ -3221,6 +3253,12 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): {obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']}, collected_etags) self.assertEqual({200}, collected_status) + self.assertEqual({ + obj1['etag']: {True}, + obj2['etag']: {True}, + obj3['etag']: {True}, + obj4['etag']: {True}, + }, closed_conn) def test_GET_with_mixed_durable_frags_and_no_quorum_will_503(self): # all nodes have a frag but there is no one set that reaches quorum, @@ -3270,12 +3308,17 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(resp.status_int, 503) + for conn in log: + etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag') + collected_etags = set() collected_status = set() + closed_conn = defaultdict(set) for conn in log: etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag'] collected_etags.add(etag) collected_status.add(conn.resp.status) + closed_conn[etag].add(conn.closed) # default node_iter will exhaust at 2 * replicas self.assertEqual(len(log), 2 * self.replicas()) @@ -3283,6 +3326,12 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): {obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']}, collected_etags) self.assertEqual({200}, collected_status) + self.assertEqual({ + obj1['etag']: {True}, + obj2['etag']: {True}, + obj3['etag']: {True}, + obj4['etag']: {True}, + }, closed_conn) def test_GET_with_quorum_durable_files(self): # verify that only (ec_nparity + 1) nodes need to be durable for a GET @@ -3436,9 +3485,18 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): fake_response = self._fake_ec_node_response(list(node_frags)) req = swob.Request.blank('/v1/a/c/o') - with capture_http_requests(fake_response): + with capture_http_requests(fake_response) as log: resp = req.get_response(self.app) + closed_conn = defaultdict(set) + for conn in log: + etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag') + closed_conn[etag].add(conn.closed) + self.assertEqual({ + obj1['etag']: {False}, + obj2['etag']: {True}, + }, closed_conn) + self.assertEqual(resp.status_int, 200) self.assertEqual(resp.headers['etag'], obj1['etag']) self.assertEqual(md5(resp.body).hexdigest(), obj1['etag']) @@ -3530,6 +3588,15 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): with capture_http_requests(fake_response) as log: resp = req.get_response(self.app) + closed_conn = defaultdict(set) + for conn in log: + etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag') + closed_conn[etag].add(conn.closed) + self.assertEqual({ + obj1['etag']: {True}, + obj2['etag']: {False}, + }, closed_conn) + self.assertEqual(resp.status_int, 200) self.assertEqual(resp.headers['etag'], obj2['etag']) self.assertEqual(md5(resp.body).hexdigest(), obj2['etag']) @@ -3922,6 +3989,16 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): with capture_http_requests(get_response) as log: resp = req.get_response(self.app) + closed_conn = defaultdict(set) + for conn in log: + etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag') + closed_conn[etag].add(conn.closed) + self.assertEqual({ + old_etag: {True}, + new_etag: {False}, + None: {True}, + }, dict(closed_conn)) + self.assertEqual(resp.status_int, 200) self.assertEqual(resp.body, new_data[:segment_size]) self.assertEqual(len(log), self.policy.ec_ndata + 10) @@ -4134,8 +4211,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.app.recoverable_node_timeout = 0.01 req = swob.Request.blank('/v1/a/c/o') status_codes, body_iter, headers = zip(*responses) - with set_http_connect(*status_codes, body_iter=body_iter, - headers=headers): + with mocked_http_conn(*status_codes, body_iter=body_iter, + headers=headers) as log: resp = req.get_response(self.app) self.assertEqual(resp.status_int, 200) self.assertEqual(md5(resp.body).hexdigest(), etag1) @@ -4143,6 +4220,13 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase): self.assertEqual(2, len(error_lines)) for line in error_lines: self.assertIn('retrying', line) + etag2_conns = [] + for conn in log.responses: + if conn.headers.get('X-Object-Sysmeta-Ec-Etag') == etag2: + etag2_conns.append(conn) + self.assertEqual( + ([True] * 8) + [False], # the resumed etag2 doesn't get closed + [conn.closed for conn in etag2_conns]) def test_fix_response_HEAD(self): headers = {'X-Object-Sysmeta-Ec-Content-Length': '10',