Browse Source

Merge "reconstructor: log more details when rebuild fails"

changes/02/774002/15
Zuul 2 weeks ago
committed by Gerrit Code Review
parent
commit
020a13ed3c
3 changed files with 310 additions and 142 deletions
  1. +183
    -108
      swift/obj/reconstructor.py
  2. +126
    -34
      test/unit/obj/test_reconstructor.py
  3. +1
    -0
      test/unit/obj/test_ssync.py

+ 183
- 108
swift/obj/reconstructor.py View File

@ -48,6 +48,7 @@ from swift.common.exceptions import ConnectionTimeout, DiskFileError, \
SuffixSyncError, PartitionLockTimeout
SYNC, REVERT = ('sync_only', 'sync_revert')
UNKNOWN_RESPONSE_STATUS = 0 # used as response status for timeouts, exceptions
def _get_partners(node_index, part_nodes):
@ -94,6 +95,22 @@ def _full_path(node, part, relative_path, policy):
}
class ResponseBucket(object):
"""
Encapsulates fragment GET response data related to a single timestamp.
"""
def __init__(self):
# count of all responses associated with this Bucket
self.num_responses = 0
# map {frag_index: response} for subset of responses that
# could be used to rebuild the missing fragment
self.useful_responses = {}
# set if a durable timestamp was seen in responses
self.durable = False
# etag of the first response associated with the Bucket
self.etag = None
class RebuildingECDiskFileStream(object):
"""
This class wraps the reconstructed fragment archive data and
@ -371,36 +388,33 @@ class ObjectReconstructor(Daemon):
with Timeout(self.node_timeout):
resp = conn.getresponse()
resp.full_path = full_path
if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]:
self.logger.warning(
_("Invalid response %(resp)s from %(full_path)s"),
{'resp': resp.status, 'full_path': full_path})
resp = None
elif resp.status == HTTP_NOT_FOUND:
resp = None
resp.node = node
except (Exception, Timeout):
self.logger.exception(
_("Trying to GET %(full_path)s"), {
'full_path': full_path})
return resp
def reconstruct_fa(self, job, node, datafile_metadata):
def _make_fragment_requests(self, job, node, datafile_metadata, buckets,
error_responses, nodes):
"""
Reconstructs a fragment archive - this method is called from ssync
after a remote node responds that is missing this object - the local
diskfile is opened to provide metadata - but to reconstruct the
missing fragment archive we must connect to multiple object servers.
Issue requests for fragments to the list of ``nodes`` and sort the
responses into per-timestamp ``buckets`` or per-status
``error_responses``. If any bucket accumulates sufficient responses to
rebuild the missing fragment then return that bucket.
:param job: job from ssync_sender
:param node: node that we're rebuilding to
:param job: job from ssync_sender.
:param node: node to which we're rebuilding.
:param datafile_metadata: the datafile metadata to attach to
the rebuilt fragment archive
:returns: a DiskFile like class for use by ssync
:raises DiskFileError: if the fragment archive cannot be reconstructed
:param buckets: dict of per-timestamp buckets for ok responses.
:param error_responses: dict of per-status lists of error responses.
:param nodes: A list of nodes.
:return: A per-timestamp with sufficient responses, or None if
there is no such bucket.
"""
# don't try and fetch a fragment from the node we're rebuilding to
part_nodes = [n for n in job['policy'].object_ring.get_part_nodes(
job['partition']) if n['id'] != node['id']]
policy = job['policy']
partition = job['partition']
# the fragment index we need to reconstruct is the position index
# of the node we're rebuilding to within the primary part list
@ -409,29 +423,37 @@ class ObjectReconstructor(Daemon):
# KISS send out connection requests to all nodes, see what sticks.
# Use fragment preferences header to tell other nodes that we want
# fragments at the same timestamp as our fragment, and that they don't
# need to be durable.
# need to be durable. Accumulate responses into per-timestamp buckets
# and if any buckets gets enough responses then use those responses to
# rebuild.
headers = self.headers.copy()
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
headers['X-Backend-Storage-Policy-Index'] = int(policy)
headers['X-Backend-Replication'] = 'True'
frag_prefs = [{'timestamp': datafile_metadata['X-Timestamp'],
'exclude': []}]
local_timestamp = Timestamp(datafile_metadata['X-Timestamp'])
frag_prefs = [{'timestamp': local_timestamp.normal, 'exclude': []}]
headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs)
pile = GreenAsyncPile(len(part_nodes))
path = datafile_metadata['name']
for _node in part_nodes:
full_get_path = _full_path(
_node, job['partition'], path, job['policy'])
pile.spawn(self._get_response, _node, job['partition'],
path, headers, full_get_path)
buckets = defaultdict(dict)
durable_buckets = {}
etag_buckets = {}
error_resp_count = 0
for resp in pile:
def handle_fragment_response(resp):
"""
Place ok responses into a per-timestamp bucket. Append bad
responses to a list per-status-code in error_responses.
:return: the per-timestamp bucket if the response is ok, otherwise
None.
"""
if not resp:
error_resp_count += 1
continue
error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
return None
if resp.status not in [HTTP_OK, HTTP_NOT_FOUND]:
self.logger.warning(
_("Invalid response %(resp)s from %(full_path)s"),
{'resp': resp.status, 'full_path': resp.full_path})
if resp.status != HTTP_OK:
error_responses[resp.status].append(resp)
return None
resp.headers = HeaderKeyDict(resp.getheaders())
frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
try:
@ -445,90 +467,143 @@ class ObjectReconstructor(Daemon):
'Invalid resp from %s '
'(invalid X-Object-Sysmeta-Ec-Frag-Index: %r)',
resp.full_path, frag_index)
continue
if fi_to_rebuild == resp_frag_index:
# TODO: With duplicated EC frags it's not unreasonable to find
# the very fragment we're trying to rebuild exists on another
# primary node. In this case we should stream it directly from
# the remote node to our target instead of rebuild. But
# instead we ignore it.
self.logger.debug(
'Found existing frag #%s at %s while rebuilding to %s',
fi_to_rebuild, resp.full_path,
_full_path(
node, job['partition'], datafile_metadata['name'],
job['policy']))
continue
error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
return None
timestamp = resp.headers.get('X-Backend-Timestamp')
if not timestamp:
self.logger.warning('Invalid resp from %s, frag index %s '
'(missing X-Backend-Timestamp)',
resp.full_path, resp_frag_index)
continue
error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
return None
timestamp = Timestamp(timestamp)
durable = resp.headers.get('X-Backend-Durable-Timestamp')
if durable:
durable_buckets[Timestamp(durable)] = True
etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag')
if not etag:
self.logger.warning('Invalid resp from %s, frag index %s '
'(missing Etag)',
resp.full_path, resp_frag_index)
continue
self.logger.warning(
'Invalid resp from %s, frag index %s (missing Etag)',
resp.full_path, resp_frag_index)
error_responses[UNKNOWN_RESPONSE_STATUS].append(resp)
return None
bucket = buckets[timestamp]
bucket.num_responses += 1
if bucket.etag is None:
bucket.etag = etag
elif bucket.etag != etag:
self.logger.error('Mixed Etag (%s, %s) for %s frag#%s',
etag, bucket.etag,
_full_path(node, partition, path, policy),
fi_to_rebuild)
return None
if etag != etag_buckets.setdefault(timestamp, etag):
self.logger.error(
'Mixed Etag (%s, %s) for %s frag#%s',
etag, etag_buckets[timestamp],
_full_path(node, job['partition'],
datafile_metadata['name'], job['policy']),
fi_to_rebuild)
continue
if fi_to_rebuild == resp_frag_index:
# TODO: With duplicated EC frags it's not unreasonable to
# find the very fragment we're trying to rebuild exists on
# another primary node. In this case we should stream it
# directly from the remote node to our target instead of
# rebuild. But instead we ignore it.
self.logger.debug(
'Found existing frag #%s at %s while rebuilding to %s',
fi_to_rebuild, resp.full_path,
_full_path(node, partition, path, policy))
return None
durable_timestamp = resp.headers.get('X-Backend-Durable-Timestamp')
if durable_timestamp:
buckets[Timestamp(durable_timestamp)].durable = True
if resp_frag_index not in bucket.useful_responses:
bucket.useful_responses[resp_frag_index] = resp
return bucket
return None
pile = GreenAsyncPile(len(nodes))
for _node in nodes:
full_get_path = _full_path(_node, partition, path, policy)
pile.spawn(self._get_response, _node, partition,
path, headers, full_get_path)
if resp_frag_index not in buckets[timestamp]:
buckets[timestamp][resp_frag_index] = resp
if len(buckets[timestamp]) >= job['policy'].ec_ndata:
responses = list(buckets[timestamp].values())
self.logger.debug(
'Reconstruct frag #%s with frag indexes %s'
% (fi_to_rebuild, list(buckets[timestamp])))
break
else:
path = _full_path(node, job['partition'],
datafile_metadata['name'],
job['policy'])
for timestamp, resp in sorted(buckets.items()):
etag = etag_buckets[timestamp]
durable = durable_buckets.get(timestamp)
self.logger.error(
'Unable to get enough responses (%s/%s) to reconstruct '
'%s %s frag#%s with ETag %s and timestamp %s' % (
len(resp), job['policy'].ec_ndata,
'durable' if durable else 'non-durable',
path, fi_to_rebuild, etag, timestamp.internal))
if error_resp_count:
durable = durable_buckets.get(Timestamp(
datafile_metadata['X-Timestamp']))
self.logger.error(
'Unable to get enough responses (%s error responses) '
'to reconstruct %s %s frag#%s' % (
error_resp_count,
'durable' if durable else 'non-durable',
path, fi_to_rebuild))
raise DiskFileError('Unable to reconstruct EC archive')
rebuilt_fragment_iter = self.make_rebuilt_fragment_iter(
responses[:job['policy'].ec_ndata], path, job['policy'],
fi_to_rebuild)
return RebuildingECDiskFileStream(datafile_metadata, fi_to_rebuild,
rebuilt_fragment_iter)
for resp in pile:
bucket = handle_fragment_response(resp)
if bucket and len(bucket.useful_responses) >= policy.ec_ndata:
frag_indexes = list(bucket.useful_responses.keys())
self.logger.debug('Reconstruct frag #%s with frag indexes %s'
% (fi_to_rebuild, frag_indexes))
return bucket
return None
def reconstruct_fa(self, job, node, datafile_metadata):
"""
Reconstructs a fragment archive - this method is called from ssync
after a remote node responds that is missing this object - the local
diskfile is opened to provide metadata - but to reconstruct the
missing fragment archive we must connect to multiple object servers.
:param job: job from ssync_sender.
:param node: node to which we're rebuilding.
:param datafile_metadata: the datafile metadata to attach to
the rebuilt fragment archive
:returns: a DiskFile like class for use by ssync.
:raises DiskFileQuarantined: if the fragment archive cannot be
reconstructed and has as a result been quarantined.
:raises DiskFileError: if the fragment archive cannot be reconstructed.
"""
# KISS send out connection requests to all nodes, see what sticks.
# Use fragment preferences header to tell other nodes that we want
# fragments at the same timestamp as our fragment, and that they don't
# need to be durable. Accumulate responses into per-timestamp buckets
# and if any buckets gets enough responses then use those responses to
# rebuild.
policy = job['policy']
partition = job['partition']
# the fragment index we need to reconstruct is the position index
# of the node we're rebuilding to within the primary part list
fi_to_rebuild = node['backend_index']
local_timestamp = Timestamp(datafile_metadata['X-Timestamp'])
path = datafile_metadata['name']
buckets = defaultdict(ResponseBucket) # map timestamp -> Bucket
error_responses = defaultdict(list) # map status code -> response list
# don't try and fetch a fragment from the node we're rebuilding to
part_nodes = [n for n in policy.object_ring.get_part_nodes(partition)
if n['id'] != node['id']]
useful_bucket = self._make_fragment_requests(
job, node, datafile_metadata, buckets, error_responses, part_nodes)
if useful_bucket:
responses = list(useful_bucket.useful_responses.values())
rebuilt_fragment_iter = self.make_rebuilt_fragment_iter(
responses[:policy.ec_ndata], path, policy, fi_to_rebuild)
return RebuildingECDiskFileStream(datafile_metadata, fi_to_rebuild,
rebuilt_fragment_iter)
full_path = _full_path(node, partition, path, policy)
for timestamp, bucket in sorted(buckets.items()):
self.logger.error(
'Unable to get enough responses (%s/%s from %s ok responses) '
'to reconstruct %s %s frag#%s with ETag %s and timestamp %s' %
(len(bucket.useful_responses), policy.ec_ndata,
bucket.num_responses,
'durable' if bucket.durable else 'non-durable',
full_path, fi_to_rebuild, bucket.etag, timestamp.internal))
if error_responses:
durable = buckets[local_timestamp].durable
errors = ', '.join(
'%s x %s' % (len(responses),
'unknown' if status == UNKNOWN_RESPONSE_STATUS
else status)
for status, responses in sorted(error_responses.items()))
self.logger.error(
'Unable to get enough responses (%s error responses) '
'to reconstruct %s %s frag#%s' % (
errors, 'durable' if durable else 'non-durable',
full_path, fi_to_rebuild))
raise DiskFileError('Unable to reconstruct EC archive')
def _reconstruct(self, policy, fragment_payload, frag_index):
return policy.pyeclib_driver.reconstruct(fragment_payload,


+ 126
- 34
test/unit/obj/test_reconstructor.py View File

@ -791,16 +791,9 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
full_path='nada/nada')
return resp
resp = do_test(200)
self.assertEqual(resp.status, 200)
resp = do_test(400)
# on the error case return value will be None instead of response
self.assertIsNone(resp)
# ... and log warnings for 400
for line in self.logger.get_lines_for_level('warning'):
self.assertIn('Invalid response 400', line)
self.logger._clear()
for status in (200, 400, 404, 503):
resp = do_test(status)
self.assertEqual(status, resp.status)
resp = do_test(Exception())
self.assertIsNone(resp)
@ -818,20 +811,6 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.assertIn('Timeout', line)
self.logger.clear()
# we should get a warning on 503 (sanity)
resp = do_test(503)
self.assertIsNone(resp)
warnings = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warnings))
self.assertIn('Invalid response 503', warnings[0])
self.logger.clear()
# ... but no messages should be emitted for 404
resp = do_test(404)
self.assertIsNone(resp)
for level, msgs in self.logger.lines_dict.items():
self.assertFalse(msgs)
def test_reconstructor_skips_bogus_partition_dirs(self):
# A directory in the wrong place shouldn't crash the reconstructor
self.reconstructor._reset_stats()
@ -4771,7 +4750,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
for line in error_lines[:-1]:
self.assertIn("Trying to GET", line)
self.assertIn(
'Unable to get enough responses (%s error responses)'
'Unable to get enough responses (%s x unknown error responses)'
% (policy.object_ring.replicas - 1),
error_lines[-1],
"Unexpected error line found: %s" % error_lines[-1])
@ -4796,13 +4775,50 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
# only 1 log to report not enough responses
self.assertEqual(1, len(error_lines))
self.assertIn(
'Unable to get enough responses (%s error responses)'
'Unable to get enough responses (%s x 404 error responses)'
% (policy.object_ring.replicas - 1),
error_lines[0],
"Unexpected error line found: %s" % error_lines[0])
# no warning
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_mixture_of_errors_fails(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
policy = self.policy
# ensure at least one of each error type
possible_errors = [Timeout(), 404, 507]
codes = possible_errors + [random.choice(possible_errors) for i in
range(policy.object_ring.replicas - 4)]
with mocked_http_conn(*codes):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, self.obj_metadata)
exp_timeouts = len([c for c in codes if isinstance(c, Timeout)])
exp_404s = len([c for c in codes if c == 404])
exp_507s = len([c for c in codes if c == 507])
error_lines = self.logger.get_lines_for_level('error')
# 1 error log to report not enough responses and possibly some to
# report Timeouts
self.assertEqual(len(error_lines), exp_timeouts + 1, error_lines)
for line in error_lines[:-1]:
self.assertIn("Trying to GET", line)
self.assertIn(
'Unable to get enough responses '
'(%s x unknown, %s x 404, %s x 507 error responses)'
% (exp_timeouts, exp_404s, exp_507s), error_lines[-1],
"Unexpected error line found: %s" % error_lines[-1])
# no warning
warning_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(exp_507s, len(warning_lines), warning_lines)
for line in warning_lines:
self.assertIn('Invalid response 507', line)
def test_reconstruct_fa_with_mixed_old_etag(self):
job = {
'partition': 0,
@ -4976,7 +4992,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
error_log_lines[0])
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_with_mixed_not_enough_etags_fail(self):
def test_reconstruct_fa_with_mixed_timestamps_etags_fail(self):
job = {
'partition': 0,
'policy': self.policy,
@ -5040,13 +5056,12 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
# following error lines
del ec_archive_dict[(expected_etag, ts, durable)]
expected = 'Unable to get enough responses (%s/10) to ' \
'reconstruct %s 10.0.0.1:1001/sdb/0%s policy#0 ' \
'frag#1 with ETag %s and timestamp %s' % \
(etag_count[expected_etag],
expected = 'Unable to get enough responses (%s/10 from %s ok ' \
'responses) to reconstruct %s 10.0.0.1:1001/sdb/0%s ' \
'policy#0 frag#1 with ETag %s and timestamp %s' %\
(etag_count[expected_etag], etag_count[expected_etag],
'durable' if durable else 'non-durable',
self.obj_path.decode('utf8'),
expected_etag, ts)
self.obj_path.decode('utf8'), expected_etag, ts)
self.assertIn(
expected, error_line,
"Unexpected error line found: Expected: %s Got: %s"
@ -5054,7 +5069,84 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
# no warning
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_finds_itself_does_not_fail(self):
def test_reconstruct_fa_with_mixed_etags_same_timestamp_fail(self):
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
node['backend_index'] = self.policy.get_backend_index(node['index'])
test_data = (b'rebuild' * self.policy.ec_segment_size)[:-777]
ec_archive_dict = dict()
ts = next(make_timestamp_iter())
# create 3 different ec bodies
for i in range(3):
body = test_data[i:]
archive_bodies = encode_frag_archive_bodies(self.policy, body)
# pop the index to the destination node
archive_bodies.pop(1)
key = (md5(body, usedforsecurity=False).hexdigest(),
ts.internal, bool(i % 2))
ec_archive_dict[key] = archive_bodies
responses = list()
# fill out response list by 3 different etag bodies, same timestamp
for etag, ts, durable in itertools.cycle(ec_archive_dict):
body = ec_archive_dict[(etag, ts, durable)].pop(0)
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag,
'X-Backend-Timestamp': ts})
if durable:
headers['X-Backend-Durable-Timestamp'] = ts
responses.append((200, body, headers))
if len(responses) >= (self.policy.object_ring.replicas - 1):
break
# sanity, there is 3 different etag and each etag
# doesn't have > ec_k bodies
etag_count = collections.Counter(
[in_resp_headers['X-Object-Sysmeta-Ec-Etag']
for _, _, in_resp_headers in responses])
self.assertEqual(3, len(etag_count))
for etag, count in etag_count.items():
self.assertLess(count, self.policy.ec_ndata)
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, self.obj_metadata)
error_lines = self.logger.get_lines_for_level('error')
self.assertGreater(len(error_lines), 1)
for expected_etag, ts, durable in ec_archive_dict:
if expected_etag in error_lines[-1]:
break
else:
self.fail(
"no expected etag %s found: %s" %
(list(ec_archive_dict), error_lines[0]))
other_etags_count = sum(count for etag, count in etag_count.items()
if etag != expected_etag)
self.assertEqual(other_etags_count + 1, len(error_lines))
for line in error_lines[:-1]:
self.assertIn('Mixed Etag', line)
expected = 'Unable to get enough responses (%s/10 from %s ok ' \
'responses) to reconstruct %s 10.0.0.1:1001/sdb/0%s ' \
'policy#0 frag#1 with ETag %s and timestamp %s' % \
(etag_count[expected_etag], len(responses),
'durable' if durable else 'non-durable',
self.obj_path.decode('utf8'), expected_etag, ts)
self.assertIn(
expected, error_lines[-1],
"Unexpected error line found: Expected: %s Got: %s"
% (expected, error_lines[0]))
# no warning
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_finds_missing_frag_does_not_fail(self):
# verify that reconstruction of a missing frag can cope with finding
# that missing frag in the responses it gets from other nodes while
# attempting to rebuild the missing frag


+ 1
- 0
test/unit/obj/test_ssync.py View File

@ -853,6 +853,7 @@ class FakeResponse(object):
self.obj_data = obj_data
self.data = b''
self.length = length
self.status = 200
def init(self, path):
if isinstance(self.obj_data, Exception):


Loading…
Cancel
Save