reconstructor: extract closure for handle_response

Since _get_response was already a method, and was called after the inline
definition of the handle_response closure (but before we called the
closure), I found the control flow to be confusingly different from the
visual layout of the code.

Passing a few extra params around felt like a price worth paying for:

def _get_response(self, ...
def _handle_response(self, ...

... and then using them, in that order, in the next method we define,
_make_fragment_requests (which is now quite short and obvious, despite
doing some concurrency with a GreenAsyncPile).
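
For illustration, a minimal sketch of the resulting shape. The
FragmentSketch class and its toy bodies are invented for this example
(it only assumes swift.common.utils.GreenAsyncPile is importable); the
method ordering and the pile fan-out mirror the real code, the bodies
do not:

# Sketch only: toy stand-ins for the reconstructor methods named above.
from swift.common.utils import GreenAsyncPile


class FragmentSketch(object):

    def _get_response(self, node):
        # the real method issues a backend GET; here we just fake a response
        return {'node': node, 'status': 200}

    def _handle_fragment_response(self, resp, buckets, error_responses):
        # the real method files ok responses into per-timestamp buckets;
        # here we only split ok responses from errors
        if resp and resp['status'] == 200:
            buckets.setdefault('ok', []).append(resp)
            return buckets['ok']
        error_responses.setdefault('error', []).append(resp)
        return None

    def _make_fragment_requests(self, nodes):
        # reads top to bottom in the order the helpers are defined above:
        # spawn the GETs, then handle each response as it completes
        buckets, error_responses = {}, {}
        pile = GreenAsyncPile(len(nodes))
        for node in nodes:
            pile.spawn(self._get_response, node)
        for resp in pile:
            self._handle_fragment_response(resp, buckets, error_responses)
        return buckets, error_responses

e.g. FragmentSketch()._make_fragment_requests(['n1', 'n2']) fans out the
fake GETs and collects them into the two dicts.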

Related-Change-Id: I3f87933f788685775ce59f3724f17d5db948d502
Change-Id: I8bca2d0804569952d31aee7de4ffe60ede4343d2
Clay Gerrard 2021-04-28 10:54:58 -05:00 committed by Alistair Coles
parent 7960097f02
commit ab8accbb0a
1 changed file with 88 additions and 92 deletions


@@ -395,6 +395,91 @@ class ObjectReconstructor(Daemon):
                     'full_path': full_path})
         return resp
 
+    def _handle_fragment_response(self, node, policy, partition, fi_to_rebuild,
+                                  path, buckets, error_responses, resp):
+        """
+        Place ok responses into a per-timestamp bucket. Append bad responses to
+        a list per-status-code in error_responses.
+
+        :return: the per-timestamp bucket if the response is ok, otherwise
+            None.
+        """
+        if not resp:
+            error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
+            return None
+        if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]:
+            self.logger.warning(
+                _("Invalid response %(resp)s from %(full_path)s"),
+                {'resp': resp.status, 'full_path': resp.full_path})
+        if resp.status != HTTP_OK:
+            error_responses[resp.status].append(resp)
+            return None
+        resp.headers = HeaderKeyDict(resp.getheaders())
+        frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
+        try:
+            resp_frag_index = int(frag_index)
+        except (TypeError, ValueError):
+            # The successful response should include valid X-Object-
+            # Sysmeta-Ec-Frag-Index but for safety, catching the case either
+            # missing X-Object-Sysmeta-Ec-Frag-Index or invalid frag index to
+            # reconstruct and dump warning log for that
+            self.logger.warning(
+                'Invalid resp from %s '
+                '(invalid X-Object-Sysmeta-Ec-Frag-Index: %r)',
+                resp.full_path, frag_index)
+            error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
+            return None
+        timestamp = resp.headers.get('X-Backend-Timestamp')
+        if not timestamp:
+            self.logger.warning('Invalid resp from %s, frag index %s '
+                                '(missing X-Backend-Timestamp)',
+                                resp.full_path, resp_frag_index)
+            error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
+            return None
+        timestamp = Timestamp(timestamp)
+        etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag')
+        if not etag:
+            self.logger.warning(
+                'Invalid resp from %s, frag index %s (missing Etag)',
+                resp.full_path, resp_frag_index)
+            error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
+            return None
+        bucket = buckets[timestamp]
+        bucket.num_responses += 1
+        if bucket.etag is None:
+            bucket.etag = etag
+        elif bucket.etag != etag:
+            self.logger.error('Mixed Etag (%s, %s) for %s frag#%s',
+                              etag, bucket.etag,
+                              _full_path(node, partition, path, policy),
+                              fi_to_rebuild)
+            return None
+        if fi_to_rebuild == resp_frag_index:
+            # TODO: With duplicated EC frags it's not unreasonable to find the
+            # very fragment we're trying to rebuild exists on another primary
+            # node. In this case we should stream it directly from the remote
+            # node to our target instead of rebuild. But instead we ignore it.
+            self.logger.debug(
+                'Found existing frag #%s at %s while rebuilding to %s',
+                fi_to_rebuild, resp.full_path,
+                _full_path(node, partition, path, policy))
+            return None
+        durable_timestamp = resp.headers.get('X-Backend-Durable-Timestamp')
+        if durable_timestamp:
+            buckets[Timestamp(durable_timestamp)].durable = True
+        if resp_frag_index not in bucket.useful_responses:
+            bucket.useful_responses[resp_frag_index] = resp
+            return bucket
+        return None
+
     def _make_fragment_requests(self, job, node, datafile_metadata, buckets,
                                 error_responses, nodes):
         """
@@ -434,91 +519,6 @@ class ObjectReconstructor(Daemon):
         headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs)
         path = datafile_metadata['name']
 
-        def handle_fragment_response(resp):
-            """
-            Place ok responses into a per-timestamp bucket. Append bad
-            responses to a list per-status-code in error_responses.
-
-            :return: the per-timestamp bucket if the response is ok, otherwise
-                None.
-            """
-            if not resp:
-                error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
-                return None
-            if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]:
-                self.logger.warning(
-                    _("Invalid response %(resp)s from %(full_path)s"),
-                    {'resp': resp.status, 'full_path': resp.full_path})
-            if resp.status != HTTP_OK:
-                error_responses[resp.status].append(resp)
-                return None
-            resp.headers = HeaderKeyDict(resp.getheaders())
-            frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
-            try:
-                resp_frag_index = int(frag_index)
-            except (TypeError, ValueError):
-                # The successful response should include valid X-Object-
-                # Sysmeta-Ec-Frag-Index but for safety, catching the case
-                # either missing X-Object-Sysmeta-Ec-Frag-Index or invalid
-                # frag index to reconstruct and dump warning log for that
-                self.logger.warning(
-                    'Invalid resp from %s '
-                    '(invalid X-Object-Sysmeta-Ec-Frag-Index: %r)',
-                    resp.full_path, frag_index)
-                error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
-                return None
-            timestamp = resp.headers.get('X-Backend-Timestamp')
-            if not timestamp:
-                self.logger.warning('Invalid resp from %s, frag index %s '
-                                    '(missing X-Backend-Timestamp)',
-                                    resp.full_path, resp_frag_index)
-                error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
-                return None
-            timestamp = Timestamp(timestamp)
-            etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag')
-            if not etag:
-                self.logger.warning(
-                    'Invalid resp from %s, frag index %s (missing Etag)',
-                    resp.full_path, resp_frag_index)
-                error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
-                return None
-            bucket = buckets[timestamp]
-            bucket.num_responses += 1
-            if bucket.etag is None:
-                bucket.etag = etag
-            elif bucket.etag != etag:
-                self.logger.error('Mixed Etag (%s, %s) for %s frag#%s',
-                                  etag, bucket.etag,
-                                  _full_path(node, partition, path, policy),
-                                  fi_to_rebuild)
-                return None
-            if fi_to_rebuild == resp_frag_index:
-                # TODO: With duplicated EC frags it's not unreasonable to
-                # find the very fragment we're trying to rebuild exists on
-                # another primary node. In this case we should stream it
-                # directly from the remote node to our target instead of
-                # rebuild. But instead we ignore it.
-                self.logger.debug(
-                    'Found existing frag #%s at %s while rebuilding to %s',
-                    fi_to_rebuild, resp.full_path,
-                    _full_path(node, partition, path, policy))
-                return None
-            durable_timestamp = resp.headers.get('X-Backend-Durable-Timestamp')
-            if durable_timestamp:
-                buckets[Timestamp(durable_timestamp)].durable = True
-            if resp_frag_index not in bucket.useful_responses:
-                bucket.useful_responses[resp_frag_index] = resp
-                return bucket
-            return None
-
         pile = GreenAsyncPile(len(nodes))
         for _node in nodes:
             full_get_path = _full_path(_node, partition, path, policy)
@@ -526,7 +526,9 @@ class ObjectReconstructor(Daemon):
                        path, headers, full_get_path)
         for resp in pile:
-            bucket = handle_fragment_response(resp)
+            bucket = self._handle_fragment_response(
+                node, policy, partition, fi_to_rebuild, path, buckets,
+                error_responses, resp)
             if bucket and len(bucket.useful_responses) >= policy.ec_ndata:
                 frag_indexes = list(bucket.useful_responses.keys())
                 self.logger.debug('Reconstruct frag #%s with frag indexes %s'
@@ -550,12 +552,6 @@ class ObjectReconstructor(Daemon):
             reconstructed and has as a result been quarantined.
         :raises DiskFileError: if the fragment archive cannot be reconstructed.
         """
-        # KISS send out connection requests to all nodes, see what sticks.
-        # Use fragment preferences header to tell other nodes that we want
-        # fragments at the same timestamp as our fragment, and that they don't
-        # need to be durable. Accumulate responses into per-timestamp buckets
-        # and if any buckets gets enough responses then use those responses to
-        # rebuild.
         policy = job['policy']
         partition = job['partition']
         # the fragment index we need to reconstruct is the position index