ec: Close down some unused responses more quickly

These should get GC'ed eventually, but sooner is probably better than later.

Change-Id: I4daa18c36235e6df65e8b1c00a12dbf10677ca61
This commit is contained in:
Tim Burke 2020-08-20 15:13:41 -07:00
parent 8f60e0a260
commit b7b45eadcd
3 changed files with 144 additions and 29 deletions

View File

@ -2087,6 +2087,14 @@ class ECGetResponseBucket(object):
result = self.policy.ec_ndata - (len(self.get_responses()) + len(alts))
return max(result, 0)
def close_conns(self):
"""
Close bucket's responses; they won't be used for a client response.
"""
for getter, frag_iter in self.get_responses():
if getattr(getter.source, 'swift_conn', None):
close_swift_conn(getter.source)
def __str__(self):
# return a string summarising bucket state, useful for debugging.
return '<%s, %s, %s, %s(%s), %s>' \
@ -2227,6 +2235,15 @@ class ECGetResponseCollection(object):
return bucket
return self.least_bad_bucket
def choose_best_bucket(self):
best_bucket = self.best_bucket
# it's now or never -- close down any other requests
for bucket in self.buckets.values():
if bucket is best_bucket:
continue
bucket.close_conns()
return best_bucket
@property
def least_bad_bucket(self):
"""
@ -2343,6 +2360,7 @@ class ECFragGetter(object):
self.client_chunk_size = policy.fragment_size
self.skip_bytes = 0
self.bytes_used_from_backend = 0
self.source = None
def fast_forward(self, num_bytes):
"""
@ -2452,19 +2470,15 @@ class ECFragGetter(object):
def response_parts_iter(self, req):
try:
source, node = next(self.source_and_node_iter)
self.source, self.node = next(self.source_and_node_iter)
except StopIteration:
return
it = None
if source:
it = self._get_response_parts_iter(req, node, source)
if self.source:
it = self._get_response_parts_iter(req)
return it
def _get_response_parts_iter(self, req, node, source):
# Someday we can replace this [mess] with python 3's "nonlocal"
source = [source]
node = [node]
def _get_response_parts_iter(self, req):
try:
client_chunk_size = self.client_chunk_size
node_timeout = self.app.recoverable_node_timeout
@ -2473,7 +2487,7 @@ class ECFragGetter(object):
# on it, so no IO is performed.
parts_iter = [
http_response_to_document_iters(
source[0], read_chunk_size=self.app.object_chunk_size)]
self.source, read_chunk_size=self.app.object_chunk_size)]
def get_next_doc_part():
while True:
@ -2497,13 +2511,13 @@ class ECFragGetter(object):
new_source, new_node = self._dig_for_source_and_node()
if new_source:
self.app.error_occurred(
node[0], _('Trying to read object during '
'GET (retrying)'))
self.node, _('Trying to read object during '
'GET (retrying)'))
# Close-out the connection as best as possible.
if getattr(source[0], 'swift_conn', None):
close_swift_conn(source[0])
source[0] = new_source
node[0] = new_node
if getattr(self.source, 'swift_conn', None):
close_swift_conn(self.source)
self.source = new_source
self.node = new_node
# This is safe; it sets up a generator but does
# not call next() on it, so no IO is performed.
parts_iter[0] = http_response_to_document_iters(
@ -2539,13 +2553,13 @@ class ECFragGetter(object):
new_source, new_node = self._dig_for_source_and_node()
if new_source:
self.app.error_occurred(
node[0], _('Trying to read object during '
'GET (retrying)'))
self.node, _('Trying to read object during '
'GET (retrying)'))
# Close-out the connection as best as possible.
if getattr(source[0], 'swift_conn', None):
close_swift_conn(source[0])
source[0] = new_source
node[0] = new_node
if getattr(self.source, 'swift_conn', None):
close_swift_conn(self.source)
self.source = new_source
self.node = new_node
# This is safe; it just sets up a generator but
# does not call next() on it, so no IO is
# performed.
@ -2650,7 +2664,7 @@ class ECFragGetter(object):
part_iter.close()
except ChunkReadTimeout:
self.app.exception_occurred(node[0], _('Object'),
self.app.exception_occurred(self.node, _('Object'),
_('Trying to read during GET'))
raise
except ChunkWriteTimeout:
@ -2677,8 +2691,8 @@ class ECFragGetter(object):
raise
finally:
# Close-out the connection as best as possible.
if getattr(source[0], 'swift_conn', None):
close_swift_conn(source[0])
if getattr(self.source, 'swift_conn', None):
close_swift_conn(self.source)
@property
def last_status(self):
@ -2727,6 +2741,7 @@ class ECFragGetter(object):
not Timestamp(src_headers.get('x-backend-timestamp', 0)):
# throw out 5XX and 404s from handoff nodes unless the data is
# really on disk and had been DELETEd
conn.close()
return None
self.status = possible_source.status
@ -2737,6 +2752,7 @@ class ECFragGetter(object):
return possible_source
else:
self.body = possible_source.read()
conn.close()
if possible_source.status == HTTP_INSUFFICIENT_STORAGE:
self.app.error_limit(node, _('ERROR Insufficient Storage'))
@ -2939,7 +2955,7 @@ class ECObjectController(BaseObjectController):
# Put this back, since we *may* need it for kickoff()/_fix_response()
# (but note that _fix_ranges() may also pop it back off before then)
req.range = orig_range
best_bucket = buckets.best_bucket
best_bucket = buckets.choose_best_bucket()
if best_bucket.shortfall <= 0 and best_bucket.durable:
# headers can come from any of the getters
resp_headers = best_bucket.headers
@ -2979,6 +2995,7 @@ class ECObjectController(BaseObjectController):
reasons = []
bodies = []
headers = []
best_bucket.close_conns()
for status, bad_bucket in buckets.bad_buckets.items():
for getter, _parts_iter in bad_bucket.get_responses():
if best_bucket.durable:

View File

@ -1029,6 +1029,10 @@ def fake_http_connect(*code_iter, **kwargs):
def getheader(self, name, default=None):
return HeaderKeyDict(self.getheaders()).get(name, default)
def nuke_from_orbit(self):
# wrapped connections from buffered_http have this helper
self.close()
def close(self):
self.closed = True
@ -1091,10 +1095,13 @@ def fake_http_connect(*code_iter, **kwargs):
body = static_body or b''
else:
body = next(body_iter)
return FakeConn(status, etag, body=body, timestamp=timestamp,
conn = FakeConn(status, etag, body=body, timestamp=timestamp,
headers=headers, expect_headers=expect_headers,
connection_id=i, give_send=kwargs.get('give_send'),
give_expect=kwargs.get('give_expect'))
if 'capture_connections' in kwargs:
kwargs['capture_connections'].append(conn)
return conn
connect.unexpected_requests = unexpected_requests
connect.code_iter = code_iter
@ -1105,6 +1112,7 @@ def fake_http_connect(*code_iter, **kwargs):
@contextmanager
def mocked_http_conn(*args, **kwargs):
requests = []
responses = []
def capture_requests(ip, port, method, path, headers, qs, ssl):
if six.PY2 and not isinstance(ip, bytes):
@ -1120,8 +1128,10 @@ def mocked_http_conn(*args, **kwargs):
}
requests.append(req)
kwargs.setdefault('give_connect', capture_requests)
kwargs['capture_connections'] = responses
fake_conn = fake_http_connect(*args, **kwargs)
fake_conn.requests = requests
fake_conn.responses = responses
with mocklib.patch('swift.common.bufferedhttp.http_connect_raw',
new=fake_conn):
yield fake_conn
@ -1185,6 +1195,10 @@ class StubResponse(object):
fake_reason = ('Fake', 'This response is a lie.')
self.reason = swob.RESPONSE_REASONS.get(status, fake_reason)[0]
def nuke_from_orbit(self):
if hasattr(self, 'swift_conn'):
self.swift_conn.close()
def getheader(self, header_name, default=None):
return self.headers.get(header_name, default)

View File

@ -2061,6 +2061,7 @@ def capture_http_requests(get_response):
self.req = req
self.resp = None
self.path = "/"
self.closed = False
def getresponse(self):
self.resp = get_response(self.req)
@ -2075,6 +2076,9 @@ def capture_http_requests(get_response):
def endheaders(self):
pass
def close(self):
self.closed = True
class ConnectionLog(object):
def __init__(self):
@ -2826,7 +2830,16 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.headers['etag'], obj2['etag'])
closed_conn = defaultdict(set)
for conn in log:
etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
closed_conn[etag].add(conn.closed)
self.assertEqual({
obj1['etag']: {True},
obj2['etag']: {False},
}, closed_conn)
self.assertEqual(md5(resp.body).hexdigest(), obj2['etag'])
self.assertEqual({True}, {conn.closed for conn in log})
collected_responses = defaultdict(set)
for conn in log:
@ -2999,6 +3012,15 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
with capture_http_requests(fake_response) as log:
resp = req.get_response(self.app)
closed_conn = defaultdict(set)
for conn in log:
etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag')
closed_conn[etag].add(conn.closed)
self.assertEqual({
obj1['etag']: {True},
obj2['etag']: {True},
None: {True},
}, dict(closed_conn))
self.assertEqual(resp.status_int, 503)
collected_responses = defaultdict(set)
@ -3148,10 +3170,12 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
collected_etags = set()
collected_status = set()
closed_conn = defaultdict(set)
for conn in log:
etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
collected_etags.add(etag)
collected_status.add(conn.resp.status)
closed_conn[etag].add(conn.closed)
# default node_iter will exhaust at 2 * replicas
self.assertEqual(len(log), 2 * self.replicas())
@ -3159,6 +3183,12 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
{obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']},
collected_etags)
self.assertEqual({200}, collected_status)
self.assertEqual({
obj1['etag']: {True},
obj2['etag']: {True},
obj3['etag']: {True},
obj4['etag']: {True},
}, closed_conn)
def test_GET_with_mixed_durable_and_nondurable_frags_will_503(self):
# all nodes have a frag but there is no one set that reaches quorum,
@ -3208,12 +3238,14 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
self.assertEqual(resp.status_int, 503)
closed_conn = defaultdict(set)
collected_etags = set()
collected_status = set()
for conn in log:
etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
collected_etags.add(etag)
collected_status.add(conn.resp.status)
closed_conn[etag].add(conn.closed)
# default node_iter will exhaust at 2 * replicas
self.assertEqual(len(log), 2 * self.replicas())
@ -3221,6 +3253,12 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
{obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']},
collected_etags)
self.assertEqual({200}, collected_status)
self.assertEqual({
obj1['etag']: {True},
obj2['etag']: {True},
obj3['etag']: {True},
obj4['etag']: {True},
}, closed_conn)
def test_GET_with_mixed_durable_frags_and_no_quorum_will_503(self):
# all nodes have a frag but there is no one set that reaches quorum,
@ -3270,12 +3308,17 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
self.assertEqual(resp.status_int, 503)
for conn in log:
etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag')
collected_etags = set()
collected_status = set()
closed_conn = defaultdict(set)
for conn in log:
etag = conn.resp.headers['X-Object-Sysmeta-Ec-Etag']
collected_etags.add(etag)
collected_status.add(conn.resp.status)
closed_conn[etag].add(conn.closed)
# default node_iter will exhaust at 2 * replicas
self.assertEqual(len(log), 2 * self.replicas())
@ -3283,6 +3326,12 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
{obj1['etag'], obj2['etag'], obj3['etag'], obj4['etag']},
collected_etags)
self.assertEqual({200}, collected_status)
self.assertEqual({
obj1['etag']: {True},
obj2['etag']: {True},
obj3['etag']: {True},
obj4['etag']: {True},
}, closed_conn)
def test_GET_with_quorum_durable_files(self):
# verify that only (ec_nparity + 1) nodes need to be durable for a GET
@ -3436,9 +3485,18 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
fake_response = self._fake_ec_node_response(list(node_frags))
req = swob.Request.blank('/v1/a/c/o')
with capture_http_requests(fake_response):
with capture_http_requests(fake_response) as log:
resp = req.get_response(self.app)
closed_conn = defaultdict(set)
for conn in log:
etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag')
closed_conn[etag].add(conn.closed)
self.assertEqual({
obj1['etag']: {False},
obj2['etag']: {True},
}, closed_conn)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.headers['etag'], obj1['etag'])
self.assertEqual(md5(resp.body).hexdigest(), obj1['etag'])
@ -3530,6 +3588,15 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
with capture_http_requests(fake_response) as log:
resp = req.get_response(self.app)
closed_conn = defaultdict(set)
for conn in log:
etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag')
closed_conn[etag].add(conn.closed)
self.assertEqual({
obj1['etag']: {True},
obj2['etag']: {False},
}, closed_conn)
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.headers['etag'], obj2['etag'])
self.assertEqual(md5(resp.body).hexdigest(), obj2['etag'])
@ -3922,6 +3989,16 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
with capture_http_requests(get_response) as log:
resp = req.get_response(self.app)
closed_conn = defaultdict(set)
for conn in log:
etag = conn.resp.headers.get('X-Object-Sysmeta-Ec-Etag')
closed_conn[etag].add(conn.closed)
self.assertEqual({
old_etag: {True},
new_etag: {False},
None: {True},
}, dict(closed_conn))
self.assertEqual(resp.status_int, 200)
self.assertEqual(resp.body, new_data[:segment_size])
self.assertEqual(len(log), self.policy.ec_ndata + 10)
@ -4134,8 +4211,8 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
self.app.recoverable_node_timeout = 0.01
req = swob.Request.blank('/v1/a/c/o')
status_codes, body_iter, headers = zip(*responses)
with set_http_connect(*status_codes, body_iter=body_iter,
headers=headers):
with mocked_http_conn(*status_codes, body_iter=body_iter,
headers=headers) as log:
resp = req.get_response(self.app)
self.assertEqual(resp.status_int, 200)
self.assertEqual(md5(resp.body).hexdigest(), etag1)
@ -4143,6 +4220,13 @@ class TestECObjController(ECObjectControllerMixin, unittest.TestCase):
self.assertEqual(2, len(error_lines))
for line in error_lines:
self.assertIn('retrying', line)
etag2_conns = []
for conn in log.responses:
if conn.headers.get('X-Object-Sysmeta-Ec-Etag') == etag2:
etag2_conns.append(conn)
self.assertEqual(
([True] * 8) + [False], # the resumed etag2 doesn't get closed
[conn.closed for conn in etag2_conns])
def test_fix_response_HEAD(self):
headers = {'X-Object-Sysmeta-Ec-Content-Length': '10',