Merge "reconstructor: extract closure for handle_response"

This commit is contained in:
Zuul 2021-04-28 23:07:47 +00:00 committed by Gerrit Code Review
commit 7cfdb50f93
1 changed file with 88 additions and 92 deletions
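The change follows a standard refactoring pattern: a nested closure that silently captures enclosing-scope variables is promoted to a method whose inputs are explicit parameters, which makes the logic reachable from unit tests in isolation. A minimal sketch of the pattern with hypothetical names (not Swift's actual code):

    class Worker(object):
        def process_old(self, items):
            threshold = 10

            # before: the closure captures `threshold` from the enclosing scope
            def handle(item):
                return item if item > threshold else None

            return [handle(item) for item in items]

        def _handle(self, item, threshold):
            # after: the captured variable is now an explicit parameter
            return item if item > threshold else None

        def process_new(self, items):
            threshold = 10
            return [self._handle(item, threshold) for item in items]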


@@ -395,6 +395,91 @@ class ObjectReconstructor(Daemon):
                     'full_path': full_path})
         return resp
+
+    def _handle_fragment_response(self, node, policy, partition, fi_to_rebuild,
+                                  path, buckets, error_responses, resp):
+        """
+        Place ok responses into a per-timestamp bucket. Append bad responses
+        to a list per-status-code in error_responses.
+
+        :return: the per-timestamp bucket if the response is ok, otherwise
+            None.
+        """
+        if not resp:
+            error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
+            return None
+
+        if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]:
+            self.logger.warning(
+                _("Invalid response %(resp)s from %(full_path)s"),
+                {'resp': resp.status, 'full_path': resp.full_path})
+        if resp.status != HTTP_OK:
+            error_responses[resp.status].append(resp)
+            return None
+
+        resp.headers = HeaderKeyDict(resp.getheaders())
+        frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
+        try:
+            resp_frag_index = int(frag_index)
+        except (TypeError, ValueError):
+            # a successful response should include a valid
+            # X-Object-Sysmeta-Ec-Frag-Index; for safety, catch either a
+            # missing or an invalid frag index and log a warning
+            self.logger.warning(
+                'Invalid resp from %s '
+                '(invalid X-Object-Sysmeta-Ec-Frag-Index: %r)',
+                resp.full_path, frag_index)
+            error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
+            return None
+
+        timestamp = resp.headers.get('X-Backend-Timestamp')
+        if not timestamp:
+            self.logger.warning('Invalid resp from %s, frag index %s '
+                                '(missing X-Backend-Timestamp)',
+                                resp.full_path, resp_frag_index)
+            error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
+            return None
+        timestamp = Timestamp(timestamp)
+
+        etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag')
+        if not etag:
+            self.logger.warning(
+                'Invalid resp from %s, frag index %s (missing Etag)',
+                resp.full_path, resp_frag_index)
+            error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
+            return None
+
+        bucket = buckets[timestamp]
+        bucket.num_responses += 1
+        if bucket.etag is None:
+            bucket.etag = etag
+        elif bucket.etag != etag:
+            self.logger.error('Mixed Etag (%s, %s) for %s frag#%s',
+                              etag, bucket.etag,
+                              _full_path(node, partition, path, policy),
+                              fi_to_rebuild)
+            return None
+
+        if fi_to_rebuild == resp_frag_index:
+            # TODO: with duplicated EC frags it's not unreasonable to find
+            # that the very fragment we're trying to rebuild exists on
+            # another primary node; in that case we could stream it directly
+            # from the remote node to our target instead of rebuilding, but
+            # for now we ignore it
+            self.logger.debug(
+                'Found existing frag #%s at %s while rebuilding to %s',
+                fi_to_rebuild, resp.full_path,
+                _full_path(node, partition, path, policy))
+            return None
+
+        durable_timestamp = resp.headers.get('X-Backend-Durable-Timestamp')
+        if durable_timestamp:
+            buckets[Timestamp(durable_timestamp)].durable = True
+
+        if resp_frag_index not in bucket.useful_responses:
+            bucket.useful_responses[resp_frag_index] = resp
+            return bucket
+        return None

     def _make_fragment_requests(self, job, node, datafile_metadata, buckets,
                                 error_responses, nodes):
         """
@@ -434,91 +519,6 @@ class ObjectReconstructor(Daemon):
         headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs)
         path = datafile_metadata['name']
-
-        def handle_fragment_response(resp):
-            """
-            Place ok responses into a per-timestamp bucket. Append bad
-            responses to a list per-status-code in error_responses.
-
-            :return: the per-timestamp bucket if the response is ok, otherwise
-                None.
-            """
-            if not resp:
-                error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
-                return None
-
-            if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]:
-                self.logger.warning(
-                    _("Invalid response %(resp)s from %(full_path)s"),
-                    {'resp': resp.status, 'full_path': resp.full_path})
-            if resp.status != HTTP_OK:
-                error_responses[resp.status].append(resp)
-                return None
-
-            resp.headers = HeaderKeyDict(resp.getheaders())
-            frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
-            try:
-                resp_frag_index = int(frag_index)
-            except (TypeError, ValueError):
-                # The successful response should include valid X-Object-
-                # Sysmeta-Ec-Frag-Index but for safety, catching the case
-                # either missing X-Object-Sysmeta-Ec-Frag-Index or invalid
-                # frag index to reconstruct and dump warning log for that
-                self.logger.warning(
-                    'Invalid resp from %s '
-                    '(invalid X-Object-Sysmeta-Ec-Frag-Index: %r)',
-                    resp.full_path, frag_index)
-                error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
-                return None
-
-            timestamp = resp.headers.get('X-Backend-Timestamp')
-            if not timestamp:
-                self.logger.warning('Invalid resp from %s, frag index %s '
-                                    '(missing X-Backend-Timestamp)',
-                                    resp.full_path, resp_frag_index)
-                error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
-                return None
-            timestamp = Timestamp(timestamp)
-
-            etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag')
-            if not etag:
-                self.logger.warning(
-                    'Invalid resp from %s, frag index %s (missing Etag)',
-                    resp.full_path, resp_frag_index)
-                error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
-                return None
-
-            bucket = buckets[timestamp]
-            bucket.num_responses += 1
-            if bucket.etag is None:
-                bucket.etag = etag
-            elif bucket.etag != etag:
-                self.logger.error('Mixed Etag (%s, %s) for %s frag#%s',
-                                  etag, bucket.etag,
-                                  _full_path(node, partition, path, policy),
-                                  fi_to_rebuild)
-                return None
-
-            if fi_to_rebuild == resp_frag_index:
-                # TODO: With duplicated EC frags it's not unreasonable to
-                # find the very fragment we're trying to rebuild exists on
-                # another primary node. In this case we should stream it
-                # directly from the remote node to our target instead of
-                # rebuild. But instead we ignore it.
-                self.logger.debug(
-                    'Found existing frag #%s at %s while rebuilding to %s',
-                    fi_to_rebuild, resp.full_path,
-                    _full_path(node, partition, path, policy))
-                return None
-
-            durable_timestamp = resp.headers.get('X-Backend-Durable-Timestamp')
-            if durable_timestamp:
-                buckets[Timestamp(durable_timestamp)].durable = True
-
-            if resp_frag_index not in bucket.useful_responses:
-                bucket.useful_responses[resp_frag_index] = resp
-                return bucket
-            return None

         pile = GreenAsyncPile(len(nodes))
         for _node in nodes:
             full_get_path = _full_path(_node, partition, path, policy)
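The context above serializes `frag_prefs` into the X-Backend-Fragment-Preferences header before fanning GETs out to `nodes`; the header tells each object server which timestamp the reconstructor wants fragments for and that they need not be durable. A sketch of what the serialized preference list might look like (the structure and values shown are an illustrative assumption, not taken from this diff):

    import json

    # prefer frags at the local fragment's timestamp; exclude no frag indexes
    frag_prefs = [{'timestamp': '1619650067.12345', 'exclude': []}]
    headers = {'X-Backend-Fragment-Preferences': json.dumps(frag_prefs)}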
@@ -526,7 +526,9 @@ class ObjectReconstructor(Daemon):
                        path, headers, full_get_path)

         for resp in pile:
-            bucket = handle_fragment_response(resp)
+            bucket = self._handle_fragment_response(
+                node, policy, partition, fi_to_rebuild, path, buckets,
+                error_responses, resp)
             if bucket and len(bucket.useful_responses) >= policy.ec_ndata:
                 frag_indexes = list(bucket.useful_responses.keys())
                 self.logger.debug('Reconstruct frag #%s with frag indexes %s'
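Once any bucket collects `policy.ec_ndata` distinct useful fragments, enough data exists to rebuild the missing one. Swift drives the rebuild through the policy's configured EC backend; a rough standalone sketch of that step with pyeclib (the parameters and in-memory payload handling are simplifications, not this diff's code):

    from pyeclib.ec_iface import ECDriver

    def rebuild_fragment(payloads, fi_to_rebuild, k=10, m=4):
        """Rebuild one missing fragment archive from k available payloads."""
        driver = ECDriver(k=k, m=m, ec_type='liberasurecode_rs_vand')
        # reconstruct() takes the available fragment payloads plus the list
        # of missing indexes and returns the rebuilt fragments in order
        return driver.reconstruct(payloads, [fi_to_rebuild])[0]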
@@ -550,12 +552,6 @@ class ObjectReconstructor(Daemon):
             reconstructed and has as a result been quarantined.
         :raises DiskFileError: if the fragment archive cannot be reconstructed.
         """
-        # KISS send out connection requests to all nodes, see what sticks.
-        # Use fragment preferences header to tell other nodes that we want
-        # fragments at the same timestamp as our fragment, and that they don't
-        # need to be durable. Accumulate responses into per-timestamp buckets
-        # and if any buckets gets enough responses then use those responses to
-        # rebuild.
         policy = job['policy']
         partition = job['partition']
         # the fragment index we need to reconstruct is the position index
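One payoff of extracting the closure is that the response handling is now reachable from a unit test without standing up the whole GET pile. A hypothetical sketch (the test name and scaffolding are assumptions; only the method under test comes from this change):

    import unittest
    from collections import defaultdict
    from unittest import mock

    from swift.obj.reconstructor import (
        ObjectReconstructor, UNKNOWN_RESPONSE_STATUS)

    class TestHandleFragmentResponse(unittest.TestCase):
        def test_empty_response_counts_as_unknown_error(self):
            rec = mock.Mock()  # stands in for self; logger calls are mocked
            buckets = defaultdict(mock.Mock)
            errors = defaultdict(list)
            result = ObjectReconstructor._handle_fragment_response(
                rec, node={}, policy=None, partition=0, fi_to_rebuild=1,
                path='/a/c/o', buckets=buckets, error_responses=errors,
                resp=None)
            self.assertIsNone(result)
            self.assertEqual([None], errors[UNKNOWN_RESPONSE_STATUS])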