From ab8accbb0a388c56b43736082daa0ac43d2b8ba1 Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Wed, 28 Apr 2021 10:54:58 -0500 Subject: [PATCH] reconstructor: extract closure for handle_response Since _get_response was already a method, and called after the inline definition of the handle_response closure (but before we called the closure) I found the control flow to be confusingly different from visual layout of the code. Passing a few extra params around felt worth doing a: def _get_response(self, ... def _handle_response(self, ... ... and then using them, in that order, in the next method we define _make_fragment_requests (which is now quite short and obvoius, despite doing some concurrency with a GreenAsyncPile) Related-Change-Id: I3f87933f788685775ce59f3724f17d5db948d502 Change-Id: I8bca2d0804569952d31aee7de4ffe60ede4343d2 --- swift/obj/reconstructor.py | 180 ++++++++++++++++++------------------- 1 file changed, 88 insertions(+), 92 deletions(-) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index bc71699d63..7d3477bf94 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -395,6 +395,91 @@ class ObjectReconstructor(Daemon): 'full_path': full_path}) return resp + def _handle_fragment_response(self, node, policy, partition, fi_to_rebuild, + path, buckets, error_responses, resp): + """ + Place ok responses into a per-timestamp bucket. Append bad responses to + a list per-status-code in error_responses. + + :return: the per-timestamp bucket if the response is ok, otherwise + None. + """ + if not resp: + error_responses[UNKNOWN_RESPONSE_STATUS].append(resp) + return None + + if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]: + self.logger.warning( + _("Invalid response %(resp)s from %(full_path)s"), + {'resp': resp.status, 'full_path': resp.full_path}) + if resp.status != HTTP_OK: + error_responses[resp.status].append(resp) + return None + + resp.headers = HeaderKeyDict(resp.getheaders()) + frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') + try: + resp_frag_index = int(frag_index) + except (TypeError, ValueError): + # The successful response should include valid X-Object- + # Sysmeta-Ec-Frag-Index but for safety, catching the case either + # missing X-Object-Sysmeta-Ec-Frag-Index or invalid frag index to + # reconstruct and dump warning log for that + self.logger.warning( + 'Invalid resp from %s ' + '(invalid X-Object-Sysmeta-Ec-Frag-Index: %r)', + resp.full_path, frag_index) + error_responses[UNKNOWN_RESPONSE_STATUS].append(resp) + return None + + timestamp = resp.headers.get('X-Backend-Timestamp') + if not timestamp: + self.logger.warning('Invalid resp from %s, frag index %s ' + '(missing X-Backend-Timestamp)', + resp.full_path, resp_frag_index) + error_responses[UNKNOWN_RESPONSE_STATUS].append(resp) + return None + timestamp = Timestamp(timestamp) + + etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag') + if not etag: + self.logger.warning( + 'Invalid resp from %s, frag index %s (missing Etag)', + resp.full_path, resp_frag_index) + error_responses[UNKNOWN_RESPONSE_STATUS].append(resp) + return None + + bucket = buckets[timestamp] + bucket.num_responses += 1 + if bucket.etag is None: + bucket.etag = etag + elif bucket.etag != etag: + self.logger.error('Mixed Etag (%s, %s) for %s frag#%s', + etag, bucket.etag, + _full_path(node, partition, path, policy), + fi_to_rebuild) + return None + + if fi_to_rebuild == resp_frag_index: + # TODO: With duplicated EC frags it's not unreasonable to find the + # very fragment we're trying to rebuild exists on another primary + # node. In this case we should stream it directly from the remote + # node to our target instead of rebuild. But instead we ignore it. + self.logger.debug( + 'Found existing frag #%s at %s while rebuilding to %s', + fi_to_rebuild, resp.full_path, + _full_path(node, partition, path, policy)) + return None + + durable_timestamp = resp.headers.get('X-Backend-Durable-Timestamp') + if durable_timestamp: + buckets[Timestamp(durable_timestamp)].durable = True + + if resp_frag_index not in bucket.useful_responses: + bucket.useful_responses[resp_frag_index] = resp + return bucket + return None + def _make_fragment_requests(self, job, node, datafile_metadata, buckets, error_responses, nodes): """ @@ -434,91 +519,6 @@ class ObjectReconstructor(Daemon): headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs) path = datafile_metadata['name'] - def handle_fragment_response(resp): - """ - Place ok responses into a per-timestamp bucket. Append bad - responses to a list per-status-code in error_responses. - - :return: the per-timestamp bucket if the response is ok, otherwise - None. - """ - if not resp: - error_responses[UNKNOWN_RESPONSE_STATUS].append(resp) - return None - - if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]: - self.logger.warning( - _("Invalid response %(resp)s from %(full_path)s"), - {'resp': resp.status, 'full_path': resp.full_path}) - if resp.status != HTTP_OK: - error_responses[resp.status].append(resp) - return None - - resp.headers = HeaderKeyDict(resp.getheaders()) - frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') - try: - resp_frag_index = int(frag_index) - except (TypeError, ValueError): - # The successful response should include valid X-Object- - # Sysmeta-Ec-Frag-Index but for safety, catching the case - # either missing X-Object-Sysmeta-Ec-Frag-Index or invalid - # frag index to reconstruct and dump warning log for that - self.logger.warning( - 'Invalid resp from %s ' - '(invalid X-Object-Sysmeta-Ec-Frag-Index: %r)', - resp.full_path, frag_index) - error_responses[UNKNOWN_RESPONSE_STATUS].append(resp) - return None - - timestamp = resp.headers.get('X-Backend-Timestamp') - if not timestamp: - self.logger.warning('Invalid resp from %s, frag index %s ' - '(missing X-Backend-Timestamp)', - resp.full_path, resp_frag_index) - error_responses[UNKNOWN_RESPONSE_STATUS].append(resp) - return None - timestamp = Timestamp(timestamp) - - etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag') - if not etag: - self.logger.warning( - 'Invalid resp from %s, frag index %s (missing Etag)', - resp.full_path, resp_frag_index) - error_responses[UNKNOWN_RESPONSE_STATUS].append(resp) - return None - - bucket = buckets[timestamp] - bucket.num_responses += 1 - if bucket.etag is None: - bucket.etag = etag - elif bucket.etag != etag: - self.logger.error('Mixed Etag (%s, %s) for %s frag#%s', - etag, bucket.etag, - _full_path(node, partition, path, policy), - fi_to_rebuild) - return None - - if fi_to_rebuild == resp_frag_index: - # TODO: With duplicated EC frags it's not unreasonable to - # find the very fragment we're trying to rebuild exists on - # another primary node. In this case we should stream it - # directly from the remote node to our target instead of - # rebuild. But instead we ignore it. - self.logger.debug( - 'Found existing frag #%s at %s while rebuilding to %s', - fi_to_rebuild, resp.full_path, - _full_path(node, partition, path, policy)) - return None - - durable_timestamp = resp.headers.get('X-Backend-Durable-Timestamp') - if durable_timestamp: - buckets[Timestamp(durable_timestamp)].durable = True - - if resp_frag_index not in bucket.useful_responses: - bucket.useful_responses[resp_frag_index] = resp - return bucket - return None - pile = GreenAsyncPile(len(nodes)) for _node in nodes: full_get_path = _full_path(_node, partition, path, policy) @@ -526,7 +526,9 @@ class ObjectReconstructor(Daemon): path, headers, full_get_path) for resp in pile: - bucket = handle_fragment_response(resp) + bucket = self._handle_fragment_response( + node, policy, partition, fi_to_rebuild, path, buckets, + error_responses, resp) if bucket and len(bucket.useful_responses) >= policy.ec_ndata: frag_indexes = list(bucket.useful_responses.keys()) self.logger.debug('Reconstruct frag #%s with frag indexes %s' @@ -550,12 +552,6 @@ class ObjectReconstructor(Daemon): reconstructed and has as a result been quarantined. :raises DiskFileError: if the fragment archive cannot be reconstructed. """ - # KISS send out connection requests to all nodes, see what sticks. - # Use fragment preferences header to tell other nodes that we want - # fragments at the same timestamp as our fragment, and that they don't - # need to be durable. Accumulate responses into per-timestamp buckets - # and if any buckets gets enough responses then use those responses to - # rebuild. policy = job['policy'] partition = job['partition'] # the fragment index we need to reconstruct is the position index