diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 2b3db0217b..bc71699d63 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -48,6 +48,7 @@ from swift.common.exceptions import ConnectionTimeout, DiskFileError, \ SuffixSyncError, PartitionLockTimeout SYNC, REVERT = ('sync_only', 'sync_revert') +UNKNOWN_RESPONSE_STATUS = 0 # used as response status for timeouts, exceptions def _get_partners(node_index, part_nodes): @@ -94,6 +95,22 @@ def _full_path(node, part, relative_path, policy): } +class ResponseBucket(object): + """ + Encapsulates fragment GET response data related to a single timestamp. + """ + def __init__(self): + # count of all responses associated with this Bucket + self.num_responses = 0 + # map {frag_index: response} for subset of responses that + # could be used to rebuild the missing fragment + self.useful_responses = {} + # set if a durable timestamp was seen in responses + self.durable = False + # etag of the first response associated with the Bucket + self.etag = None + + class RebuildingECDiskFileStream(object): """ This class wraps the reconstructed fragment archive data and @@ -371,36 +388,33 @@ class ObjectReconstructor(Daemon): with Timeout(self.node_timeout): resp = conn.getresponse() resp.full_path = full_path - if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]: - self.logger.warning( - _("Invalid response %(resp)s from %(full_path)s"), - {'resp': resp.status, 'full_path': full_path}) - resp = None - elif resp.status == HTTP_NOT_FOUND: - resp = None + resp.node = node except (Exception, Timeout): self.logger.exception( _("Trying to GET %(full_path)s"), { 'full_path': full_path}) return resp - def reconstruct_fa(self, job, node, datafile_metadata): + def _make_fragment_requests(self, job, node, datafile_metadata, buckets, + error_responses, nodes): """ - Reconstructs a fragment archive - this method is called from ssync - after a remote node responds that is missing this object - the local - diskfile is opened to provide metadata - but to reconstruct the - missing fragment archive we must connect to multiple object servers. + Issue requests for fragments to the list of ``nodes`` and sort the + responses into per-timestamp ``buckets`` or per-status + ``error_responses``. If any bucket accumulates sufficient responses to + rebuild the missing fragment then return that bucket. - :param job: job from ssync_sender - :param node: node that we're rebuilding to + :param job: job from ssync_sender. + :param node: node to which we're rebuilding. :param datafile_metadata: the datafile metadata to attach to the rebuilt fragment archive - :returns: a DiskFile like class for use by ssync - :raises DiskFileError: if the fragment archive cannot be reconstructed + :param buckets: dict of per-timestamp buckets for ok responses. + :param error_responses: dict of per-status lists of error responses. + :param nodes: A list of nodes. + :return: A per-timestamp with sufficient responses, or None if + there is no such bucket. """ - # don't try and fetch a fragment from the node we're rebuilding to - part_nodes = [n for n in job['policy'].object_ring.get_part_nodes( - job['partition']) if n['id'] != node['id']] + policy = job['policy'] + partition = job['partition'] # the fragment index we need to reconstruct is the position index # of the node we're rebuilding to within the primary part list @@ -409,29 +423,37 @@ class ObjectReconstructor(Daemon): # KISS send out connection requests to all nodes, see what sticks. 
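An illustrative sketch (not part of the patch itself) of how GET responses end up grouped into the per-timestamp ResponseBuckets introduced above. The FakeResp namedtuple, the toy timestamps/etags and the assumed 2+1 EC policy are hypothetical stand-ins; the real code keys buckets by swift.common.utils.Timestamp and stores live HTTP responses rather than tuples.

from collections import defaultdict, namedtuple


class ResponseBucket(object):
    """Simplified copy of the class added in this patch."""
    def __init__(self):
        self.num_responses = 0
        self.useful_responses = {}   # frag_index -> response
        self.durable = False
        self.etag = None


FakeResp = namedtuple('FakeResp', 'timestamp frag_index etag')

ec_ndata = 2  # assume a toy 2+1 EC policy
buckets = defaultdict(ResponseBucket)

responses = [
    FakeResp('0000000001.00000', 0, 'etag-a'),
    FakeResp('0000000002.00000', 3, 'etag-b'),   # newer frag, other bucket
    FakeResp('0000000001.00000', 2, 'etag-a'),
]

for resp in responses:
    bucket = buckets[resp.timestamp]
    bucket.num_responses += 1
    if bucket.etag is None:
        bucket.etag = resp.etag
    if resp.frag_index not in bucket.useful_responses:
        bucket.useful_responses[resp.frag_index] = resp

# the older timestamp has gathered ec_ndata distinct fragment indexes and
# could be used to rebuild; the newer timestamp has not
assert len(buckets['0000000001.00000'].useful_responses) >= ec_ndata
assert len(buckets['0000000002.00000'].useful_responses) < ec_ndata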
# Use fragment preferences header to tell other nodes that we want # fragments at the same timestamp as our fragment, and that they don't - # need to be durable. + # need to be durable. Accumulate responses into per-timestamp buckets + # and if any buckets gets enough responses then use those responses to + # rebuild. headers = self.headers.copy() - headers['X-Backend-Storage-Policy-Index'] = int(job['policy']) + headers['X-Backend-Storage-Policy-Index'] = int(policy) headers['X-Backend-Replication'] = 'True' - frag_prefs = [{'timestamp': datafile_metadata['X-Timestamp'], - 'exclude': []}] + local_timestamp = Timestamp(datafile_metadata['X-Timestamp']) + frag_prefs = [{'timestamp': local_timestamp.normal, 'exclude': []}] headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs) - pile = GreenAsyncPile(len(part_nodes)) path = datafile_metadata['name'] - for _node in part_nodes: - full_get_path = _full_path( - _node, job['partition'], path, job['policy']) - pile.spawn(self._get_response, _node, job['partition'], - path, headers, full_get_path) - buckets = defaultdict(dict) - durable_buckets = {} - etag_buckets = {} - error_resp_count = 0 - for resp in pile: + def handle_fragment_response(resp): + """ + Place ok responses into a per-timestamp bucket. Append bad + responses to a list per-status-code in error_responses. + + :return: the per-timestamp bucket if the response is ok, otherwise + None. + """ if not resp: - error_resp_count += 1 - continue + error_responses[UNKNOWN_RESPONSE_STATUS].append(resp) + return None + + if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]: + self.logger.warning( + _("Invalid response %(resp)s from %(full_path)s"), + {'resp': resp.status, 'full_path': resp.full_path}) + if resp.status != HTTP_OK: + error_responses[resp.status].append(resp) + return None + resp.headers = HeaderKeyDict(resp.getheaders()) frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') try: @@ -445,90 +467,143 @@ class ObjectReconstructor(Daemon): 'Invalid resp from %s ' '(invalid X-Object-Sysmeta-Ec-Frag-Index: %r)', resp.full_path, frag_index) - continue - - if fi_to_rebuild == resp_frag_index: - # TODO: With duplicated EC frags it's not unreasonable to find - # the very fragment we're trying to rebuild exists on another - # primary node. In this case we should stream it directly from - # the remote node to our target instead of rebuild. But - # instead we ignore it. 
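A rough sketch (not part of the patch) of what the GET headers built above look like on the wire. The timestamp and policy index are example values only; the real code derives them from the local datafile metadata (Timestamp(...).normal) and the job's policy, and merges them into self.headers.

import json

local_timestamp_normal = '1609459200.00000'   # example Timestamp(...).normal
frag_prefs = [{'timestamp': local_timestamp_normal, 'exclude': []}]

headers = {
    'X-Backend-Storage-Policy-Index': 1,        # int(policy)
    'X-Backend-Replication': 'True',
    'X-Backend-Fragment-Preferences': json.dumps(frag_prefs),
}
# tells the other object servers: prefer fragments at our local timestamp,
# and they do not need to be durable
print(headers['X-Backend-Fragment-Preferences'])
# -> [{"timestamp": "1609459200.00000", "exclude": []}]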
- self.logger.debug( - 'Found existing frag #%s at %s while rebuilding to %s', - fi_to_rebuild, resp.full_path, - _full_path( - node, job['partition'], datafile_metadata['name'], - job['policy'])) - continue + error_responses[UNKNOWN_RESPONSE_STATUS].append(resp) + return None timestamp = resp.headers.get('X-Backend-Timestamp') if not timestamp: self.logger.warning('Invalid resp from %s, frag index %s ' '(missing X-Backend-Timestamp)', resp.full_path, resp_frag_index) - continue + error_responses[UNKNOWN_RESPONSE_STATUS].append(resp) + return None timestamp = Timestamp(timestamp) - durable = resp.headers.get('X-Backend-Durable-Timestamp') - if durable: - durable_buckets[Timestamp(durable)] = True - etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag') if not etag: - self.logger.warning('Invalid resp from %s, frag index %s ' - '(missing Etag)', - resp.full_path, resp_frag_index) - continue + self.logger.warning( + 'Invalid resp from %s, frag index %s (missing Etag)', + resp.full_path, resp_frag_index) + error_responses[UNKNOWN_RESPONSE_STATUS].append(resp) + return None - if etag != etag_buckets.setdefault(timestamp, etag): - self.logger.error( - 'Mixed Etag (%s, %s) for %s frag#%s', - etag, etag_buckets[timestamp], - _full_path(node, job['partition'], - datafile_metadata['name'], job['policy']), - fi_to_rebuild) - continue + bucket = buckets[timestamp] + bucket.num_responses += 1 + if bucket.etag is None: + bucket.etag = etag + elif bucket.etag != etag: + self.logger.error('Mixed Etag (%s, %s) for %s frag#%s', + etag, bucket.etag, + _full_path(node, partition, path, policy), + fi_to_rebuild) + return None - if resp_frag_index not in buckets[timestamp]: - buckets[timestamp][resp_frag_index] = resp - if len(buckets[timestamp]) >= job['policy'].ec_ndata: - responses = list(buckets[timestamp].values()) - self.logger.debug( - 'Reconstruct frag #%s with frag indexes %s' - % (fi_to_rebuild, list(buckets[timestamp]))) - break - else: - path = _full_path(node, job['partition'], - datafile_metadata['name'], - job['policy']) + if fi_to_rebuild == resp_frag_index: + # TODO: With duplicated EC frags it's not unreasonable to + # find the very fragment we're trying to rebuild exists on + # another primary node. In this case we should stream it + # directly from the remote node to our target instead of + # rebuild. But instead we ignore it. 
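An illustrative sketch (not part of the patch) of the mixed-Etag guard in handle_fragment_response, reduced to plain data: the first response pins the bucket's etag, and a later response at the same timestamp with a different etag is counted but never added to useful_responses. The Bucket class and etag values are hypothetical stand-ins; the real code also logs an error for the mismatch.

class Bucket(object):          # minimal stand-in for ResponseBucket
    def __init__(self):
        self.num_responses = 0
        self.useful_responses = {}
        self.etag = None


bucket = Bucket()
for frag_index, etag in [(0, 'etag-a'), (1, 'etag-b'), (2, 'etag-a')]:
    bucket.num_responses += 1
    if bucket.etag is None:
        bucket.etag = etag            # first response pins the bucket's etag
    elif bucket.etag != etag:
        continue                      # mismatch: counted, but not useful
    bucket.useful_responses.setdefault(frag_index, etag)

assert bucket.num_responses == 3
assert sorted(bucket.useful_responses) == [0, 2]   # frag #1 was rejected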
+ self.logger.debug( + 'Found existing frag #%s at %s while rebuilding to %s', + fi_to_rebuild, resp.full_path, + _full_path(node, partition, path, policy)) + return None - for timestamp, resp in sorted(buckets.items()): - etag = etag_buckets[timestamp] - durable = durable_buckets.get(timestamp) - self.logger.error( - 'Unable to get enough responses (%s/%s) to reconstruct ' - '%s %s frag#%s with ETag %s and timestamp %s' % ( - len(resp), job['policy'].ec_ndata, - 'durable' if durable else 'non-durable', - path, fi_to_rebuild, etag, timestamp.internal)) + durable_timestamp = resp.headers.get('X-Backend-Durable-Timestamp') + if durable_timestamp: + buckets[Timestamp(durable_timestamp)].durable = True - if error_resp_count: - durable = durable_buckets.get(Timestamp( - datafile_metadata['X-Timestamp'])) - self.logger.error( - 'Unable to get enough responses (%s error responses) ' - 'to reconstruct %s %s frag#%s' % ( - error_resp_count, - 'durable' if durable else 'non-durable', - path, fi_to_rebuild)) + if resp_frag_index not in bucket.useful_responses: + bucket.useful_responses[resp_frag_index] = resp + return bucket + return None - raise DiskFileError('Unable to reconstruct EC archive') + pile = GreenAsyncPile(len(nodes)) + for _node in nodes: + full_get_path = _full_path(_node, partition, path, policy) + pile.spawn(self._get_response, _node, partition, + path, headers, full_get_path) - rebuilt_fragment_iter = self.make_rebuilt_fragment_iter( - responses[:job['policy'].ec_ndata], path, job['policy'], - fi_to_rebuild) - return RebuildingECDiskFileStream(datafile_metadata, fi_to_rebuild, - rebuilt_fragment_iter) + for resp in pile: + bucket = handle_fragment_response(resp) + if bucket and len(bucket.useful_responses) >= policy.ec_ndata: + frag_indexes = list(bucket.useful_responses.keys()) + self.logger.debug('Reconstruct frag #%s with frag indexes %s' + % (fi_to_rebuild, frag_indexes)) + return bucket + return None + + def reconstruct_fa(self, job, node, datafile_metadata): + """ + Reconstructs a fragment archive - this method is called from ssync + after a remote node responds that is missing this object - the local + diskfile is opened to provide metadata - but to reconstruct the + missing fragment archive we must connect to multiple object servers. + + :param job: job from ssync_sender. + :param node: node to which we're rebuilding. + :param datafile_metadata: the datafile metadata to attach to + the rebuilt fragment archive + :returns: a DiskFile like class for use by ssync. + :raises DiskFileQuarantined: if the fragment archive cannot be + reconstructed and has as a result been quarantined. + :raises DiskFileError: if the fragment archive cannot be reconstructed. + """ + # KISS send out connection requests to all nodes, see what sticks. + # Use fragment preferences header to tell other nodes that we want + # fragments at the same timestamp as our fragment, and that they don't + # need to be durable. Accumulate responses into per-timestamp buckets + # and if any buckets gets enough responses then use those responses to + # rebuild. 
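A small sketch (not part of the patch) of the early exit over the response pile described in the comment above: as soon as any bucket holds ec_ndata useful responses it is returned and the remaining responses are never consumed. A plain list stands in for GreenAsyncPile, integers stand in for responses, and handle() is a hypothetical stand-in for handle_fragment_response.

ec_ndata = 4


class Bucket(object):
    def __init__(self):
        self.useful_responses = {}


bucket = Bucket()


def handle(resp):
    if resp is None:               # error/timeout: would go to error_responses
        return None
    bucket.useful_responses[resp] = resp   # resp doubles as a frag index here
    return bucket


pile = [0, None, 1, 2, 3, 5, 6]    # trailing responses are never consumed
useful_bucket = None
consumed = 0
for resp in pile:
    consumed += 1
    b = handle(resp)
    if b and len(b.useful_responses) >= ec_ndata:
        useful_bucket = b
        break

assert useful_bucket is bucket
assert consumed == 5               # stopped once 4 useful frags had arrived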
+ policy = job['policy'] + partition = job['partition'] + # the fragment index we need to reconstruct is the position index + # of the node we're rebuilding to within the primary part list + fi_to_rebuild = node['backend_index'] + local_timestamp = Timestamp(datafile_metadata['X-Timestamp']) + path = datafile_metadata['name'] + + buckets = defaultdict(ResponseBucket) # map timestamp -> Bucket + error_responses = defaultdict(list) # map status code -> response list + + # don't try and fetch a fragment from the node we're rebuilding to + part_nodes = [n for n in policy.object_ring.get_part_nodes(partition) + if n['id'] != node['id']] + useful_bucket = self._make_fragment_requests( + job, node, datafile_metadata, buckets, error_responses, part_nodes) + + if useful_bucket: + responses = list(useful_bucket.useful_responses.values()) + rebuilt_fragment_iter = self.make_rebuilt_fragment_iter( + responses[:policy.ec_ndata], path, policy, fi_to_rebuild) + return RebuildingECDiskFileStream(datafile_metadata, fi_to_rebuild, + rebuilt_fragment_iter) + + full_path = _full_path(node, partition, path, policy) + for timestamp, bucket in sorted(buckets.items()): + self.logger.error( + 'Unable to get enough responses (%s/%s from %s ok responses) ' + 'to reconstruct %s %s frag#%s with ETag %s and timestamp %s' % + (len(bucket.useful_responses), policy.ec_ndata, + bucket.num_responses, + 'durable' if bucket.durable else 'non-durable', + full_path, fi_to_rebuild, bucket.etag, timestamp.internal)) + + if error_responses: + durable = buckets[local_timestamp].durable + errors = ', '.join( + '%s x %s' % (len(responses), + 'unknown' if status == UNKNOWN_RESPONSE_STATUS + else status) + for status, responses in sorted(error_responses.items())) + self.logger.error( + 'Unable to get enough responses (%s error responses) ' + 'to reconstruct %s %s frag#%s' % ( + errors, 'durable' if durable else 'non-durable', + full_path, fi_to_rebuild)) + + raise DiskFileError('Unable to reconstruct EC archive') def _reconstruct(self, policy, fragment_payload, frag_index): return policy.pyeclib_driver.reconstruct(fragment_payload, diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 7ca6cbdddd..10a667af0f 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -791,16 +791,9 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): full_path='nada/nada') return resp - resp = do_test(200) - self.assertEqual(resp.status, 200) - - resp = do_test(400) - # on the error case return value will be None instead of response - self.assertIsNone(resp) - # ... and log warnings for 400 - for line in self.logger.get_lines_for_level('warning'): - self.assertIn('Invalid response 400', line) - self.logger._clear() + for status in (200, 400, 404, 503): + resp = do_test(status) + self.assertEqual(status, resp.status) resp = do_test(Exception()) self.assertIsNone(resp) @@ -818,20 +811,6 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertIn('Timeout', line) self.logger.clear() - # we should get a warning on 503 (sanity) - resp = do_test(503) - self.assertIsNone(resp) - warnings = self.logger.get_lines_for_level('warning') - self.assertEqual(1, len(warnings)) - self.assertIn('Invalid response 503', warnings[0]) - self.logger.clear() - - # ... 
but no messages should be emitted for 404 - resp = do_test(404) - self.assertIsNone(resp) - for level, msgs in self.logger.lines_dict.items(): - self.assertFalse(msgs) - def test_reconstructor_skips_bogus_partition_dirs(self): # A directory in the wrong place shouldn't crash the reconstructor self.reconstructor._reset_stats() @@ -4771,7 +4750,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): for line in error_lines[:-1]: self.assertIn("Trying to GET", line) self.assertIn( - 'Unable to get enough responses (%s error responses)' + 'Unable to get enough responses (%s x unknown error responses)' % (policy.object_ring.replicas - 1), error_lines[-1], "Unexpected error line found: %s" % error_lines[-1]) @@ -4796,13 +4775,50 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): # only 1 log to report not enough responses self.assertEqual(1, len(error_lines)) self.assertIn( - 'Unable to get enough responses (%s error responses)' + 'Unable to get enough responses (%s x 404 error responses)' % (policy.object_ring.replicas - 1), error_lines[0], "Unexpected error line found: %s" % error_lines[0]) # no warning self.assertFalse(self.logger.get_lines_for_level('warning')) + def test_reconstruct_fa_mixture_of_errors_fails(self): + job = { + 'partition': 0, + 'policy': self.policy, + } + part_nodes = self.policy.object_ring.get_part_nodes(0) + node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) + policy = self.policy + + # ensure at least one of each error type + possible_errors = [Timeout(), 404, 507] + codes = possible_errors + [random.choice(possible_errors) for i in + range(policy.object_ring.replicas - 4)] + with mocked_http_conn(*codes): + self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, + job, node, self.obj_metadata) + exp_timeouts = len([c for c in codes if isinstance(c, Timeout)]) + exp_404s = len([c for c in codes if c == 404]) + exp_507s = len([c for c in codes if c == 507]) + error_lines = self.logger.get_lines_for_level('error') + # 1 error log to report not enough responses and possibly some to + # report Timeouts + self.assertEqual(len(error_lines), exp_timeouts + 1, error_lines) + for line in error_lines[:-1]: + self.assertIn("Trying to GET", line) + self.assertIn( + 'Unable to get enough responses ' + '(%s x unknown, %s x 404, %s x 507 error responses)' + % (exp_timeouts, exp_404s, exp_507s), error_lines[-1], + "Unexpected error line found: %s" % error_lines[-1]) + # no warning + warning_lines = self.logger.get_lines_for_level('warning') + self.assertEqual(exp_507s, len(warning_lines), warning_lines) + for line in warning_lines: + self.assertIn('Invalid response 507', line) + def test_reconstruct_fa_with_mixed_old_etag(self): job = { 'partition': 0, @@ -4976,7 +4992,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): error_log_lines[0]) self.assertFalse(self.logger.get_lines_for_level('warning')) - def test_reconstruct_fa_with_mixed_not_enough_etags_fail(self): + def test_reconstruct_fa_with_mixed_timestamps_etags_fail(self): job = { 'partition': 0, 'policy': self.policy, @@ -5040,13 +5056,12 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): # following error lines del ec_archive_dict[(expected_etag, ts, durable)] - expected = 'Unable to get enough responses (%s/10) to ' \ - 'reconstruct %s 10.0.0.1:1001/sdb/0%s policy#0 ' \ - 'frag#1 with ETag %s and timestamp %s' % \ - (etag_count[expected_etag], + expected = 'Unable to get enough responses (%s/10 from 
%s ok ' \ + 'responses) to reconstruct %s 10.0.0.1:1001/sdb/0%s ' \ + 'policy#0 frag#1 with ETag %s and timestamp %s' %\ + (etag_count[expected_etag], etag_count[expected_etag], 'durable' if durable else 'non-durable', - self.obj_path.decode('utf8'), - expected_etag, ts) + self.obj_path.decode('utf8'), expected_etag, ts) self.assertIn( expected, error_line, "Unexpected error line found: Expected: %s Got: %s" @@ -5054,7 +5069,84 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor): # no warning self.assertFalse(self.logger.get_lines_for_level('warning')) - def test_reconstruct_fa_finds_itself_does_not_fail(self): + def test_reconstruct_fa_with_mixed_etags_same_timestamp_fail(self): + job = { + 'partition': 0, + 'policy': self.policy, + } + part_nodes = self.policy.object_ring.get_part_nodes(0) + node = part_nodes[1] + node['backend_index'] = self.policy.get_backend_index(node['index']) + + test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777] + ec_archive_dict = dict() + ts = next(make_timestamp_iter()) + # create 3 different ec bodies + for i in range(3): + body = test_data[i:] + archive_bodies = encode_frag_archive_bodies(self.policy, body) + # pop the index to the destination node + archive_bodies.pop(1) + key = (md5(body, usedforsecurity=False).hexdigest(), + ts.internal, bool(i % 2)) + ec_archive_dict[key] = archive_bodies + + responses = list() + # fill out response list by 3 different etag bodies, same timestamp + for etag, ts, durable in itertools.cycle(ec_archive_dict): + body = ec_archive_dict[(etag, ts, durable)].pop(0) + headers = get_header_frag_index(self, body) + headers.update({'X-Object-Sysmeta-Ec-Etag': etag, + 'X-Backend-Timestamp': ts}) + if durable: + headers['X-Backend-Durable-Timestamp'] = ts + responses.append((200, body, headers)) + if len(responses) >= (self.policy.object_ring.replicas - 1): + break + + # sanity, there is 3 different etag and each etag + # doesn't have > ec_k bodies + etag_count = collections.Counter( + [in_resp_headers['X-Object-Sysmeta-Ec-Etag'] + for _, _, in_resp_headers in responses]) + self.assertEqual(3, len(etag_count)) + for etag, count in etag_count.items(): + self.assertLess(count, self.policy.ec_ndata) + + codes, body_iter, headers = zip(*responses) + with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): + self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, + job, node, self.obj_metadata) + + error_lines = self.logger.get_lines_for_level('error') + self.assertGreater(len(error_lines), 1) + for expected_etag, ts, durable in ec_archive_dict: + if expected_etag in error_lines[-1]: + break + else: + self.fail( + "no expected etag %s found: %s" % + (list(ec_archive_dict), error_lines[0])) + + other_etags_count = sum(count for etag, count in etag_count.items() + if etag != expected_etag) + self.assertEqual(other_etags_count + 1, len(error_lines)) + for line in error_lines[:-1]: + self.assertIn('Mixed Etag', line) + expected = 'Unable to get enough responses (%s/10 from %s ok ' \ + 'responses) to reconstruct %s 10.0.0.1:1001/sdb/0%s ' \ + 'policy#0 frag#1 with ETag %s and timestamp %s' % \ + (etag_count[expected_etag], len(responses), + 'durable' if durable else 'non-durable', + self.obj_path.decode('utf8'), expected_etag, ts) + self.assertIn( + expected, error_lines[-1], + "Unexpected error line found: Expected: %s Got: %s" + % (expected, error_lines[0])) + # no warning + self.assertFalse(self.logger.get_lines_for_level('warning')) + + def 
test_reconstruct_fa_finds_missing_frag_does_not_fail(self):
         # verify that reconstruction of a missing frag can cope with finding
         # that missing frag in the responses it gets from other nodes while
         # attempting to rebuild the missing frag
diff --git a/test/unit/obj/test_ssync.py b/test/unit/obj/test_ssync.py
index 3646e7ef60..1322e89c32 100644
--- a/test/unit/obj/test_ssync.py
+++ b/test/unit/obj/test_ssync.py
@@ -853,6 +853,7 @@ class FakeResponse(object):
         self.obj_data = obj_data
         self.data = b''
         self.length = length
+        self.status = 200
 
     def init(self, path):
         if isinstance(self.obj_data, Exception):
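A final sketch (not part of the patch) of the aggregated "N x status" error summary that the new reconstructor tests above assert on, e.g. "1 x unknown, 2 x 404, 1 x 507 error responses". UNKNOWN_RESPONSE_STATUS = 0 matches the constant added in reconstructor.py; the placeholder strings are hypothetical, as the real map holds the failed (or None) responses themselves.

from collections import defaultdict

UNKNOWN_RESPONSE_STATUS = 0          # timeouts and exceptions

error_responses = defaultdict(list)
error_responses[UNKNOWN_RESPONSE_STATUS].append('timeout')
error_responses[404].extend(['miss', 'miss'])
error_responses[507].append('unmounted')

errors = ', '.join(
    '%s x %s' % (len(responses),
                 'unknown' if status == UNKNOWN_RESPONSE_STATUS else status)
    for status, responses in sorted(error_responses.items()))

assert errors == '1 x unknown, 2 x 404, 1 x 507'
# which the reconstructor logs as, for example:
#   Unable to get enough responses (1 x unknown, 2 x 404, 1 x 507 error
#   responses) to reconstruct ... frag#...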