EC Reconstructor: Do not reconstruct existing fragments.

The EC reconstructor needs to verify that the fragment needing to
be reconstructed does not reside in the collection of node responses.
Otherwise, resources will be spent unnecessarily reconstructing
the fragment. Moreover, this could cause a segfault on some backends.

This change adds the necessary verification steps to make sure
that a fragment is only rebuilt when it is actually missing from
the other fragment archives.

Added some tests to provide coverage for these scenarios.

Change-Id: I91f3d4af52cbc66c9f7ce00726f247b5462e66f9
Closes-Bug: #1452553
This commit is contained in:
Minwoo Bae 2015-06-18 14:21:06 -05:00
parent 09e7477a39
commit 44b76a1b1b
2 changed files with 153 additions and 37 deletions

View File

@ -249,6 +249,13 @@ class ObjectReconstructor(Daemon):
if not resp: if not resp:
continue continue
resp.headers = HeaderKeyDict(resp.getheaders()) resp.headers = HeaderKeyDict(resp.getheaders())
if str(fi_to_rebuild) == \
resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index'):
continue
if resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') in set(
r.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
for r in responses):
continue
responses.append(resp) responses.append(resp)
etag = sorted(responses, reverse=True, etag = sorted(responses, reverse=True,
key=lambda r: Timestamp( key=lambda r: Timestamp(

View File

@ -23,6 +23,7 @@ import time
import shutil import shutil
import re import re
import random import random
import struct
from eventlet import Timeout from eventlet import Timeout
from contextlib import closing, nested, contextmanager from contextlib import closing, nested, contextmanager
@ -126,6 +127,14 @@ def count_stats(logger, key, metric):
return count return count
def get_header_frag_index(self, body):
    """Build the EC frag-index header dict for an archive *body*.

    The frag index is stored by pyeclib as a little 'h' (short) at the
    start of the fragment metadata blob.
    """
    raw_metadata = self.policy.pyeclib_driver.get_metadata(body)
    (frag_index,) = struct.unpack('h', raw_metadata[:2])
    return {'X-Object-Sysmeta-Ec-Frag-Index': frag_index}
@patch_policies([StoragePolicy(0, name='zero', is_default=True), @patch_policies([StoragePolicy(0, name='zero', is_default=True),
ECStoragePolicy(1, name='one', ec_type='jerasure_rs_vand', ECStoragePolicy(1, name='one', ec_type='jerasure_rs_vand',
ec_ndata=2, ec_nparity=1)]) ec_ndata=2, ec_nparity=1)])
@ -2309,9 +2318,13 @@ class TestObjectReconstructor(unittest.TestCase):
broken_body = ec_archive_bodies.pop(1) broken_body = ec_archive_bodies.pop(1)
responses = list((200, body) for body in ec_archive_bodies) responses = list()
headers = {'X-Object-Sysmeta-Ec-Etag': etag} for body in ec_archive_bodies:
codes, body_iter = zip(*responses) headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
responses.append((200, body, headers))
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, metadata) job, node, metadata)
@ -2339,17 +2352,21 @@ class TestObjectReconstructor(unittest.TestCase):
broken_body = ec_archive_bodies.pop(4) broken_body = ec_archive_bodies.pop(4)
base_responses = list((200, body) for body in ec_archive_bodies) base_responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
base_responses.append((200, body, headers))
# since we're already missing a fragment a +2 scheme can only support # since we're already missing a fragment a +2 scheme can only support
# one additional failure at a time # one additional failure at a time
for error in (Timeout(), 404, Exception('kaboom!')): for error in (Timeout(), 404, Exception('kaboom!')):
responses = list(base_responses) responses = base_responses
error_index = random.randint(0, len(responses) - 1) error_index = random.randint(0, len(responses) - 1)
responses[error_index] = (error, '') responses[error_index] = (error, '', '')
headers = {'X-Object-Sysmeta-Ec-Etag': etag} codes, body_iter, headers_iter = zip(*responses)
codes, body_iter = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers): headers=headers_iter):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata)) job, node, dict(metadata))
fixed_body = ''.join(df.reader()) fixed_body = ''.join(df.reader())
@ -2379,16 +2396,19 @@ class TestObjectReconstructor(unittest.TestCase):
# the scheme is 10+4, so this gets a parity node # the scheme is 10+4, so this gets a parity node
broken_body = ec_archive_bodies.pop(-4) broken_body = ec_archive_bodies.pop(-4)
base_responses = list((200, body) for body in ec_archive_bodies) responses = list()
for body in ec_archive_bodies:
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
responses.append((200, body, headers))
for error in (Timeout(), 404, Exception('kaboom!')): for error in (Timeout(), 404, Exception('kaboom!')):
responses = list(base_responses)
# grab a data node index # grab a data node index
error_index = random.randint(0, self.policy.ec_ndata - 1) error_index = random.randint(0, self.policy.ec_ndata - 1)
responses[error_index] = (error, '') responses[error_index] = (error, '', '')
headers = {'X-Object-Sysmeta-Ec-Etag': etag} codes, body_iter, headers_iter = zip(*responses)
codes, body_iter = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, with mocked_http_conn(*codes, body_iter=body_iter,
headers=headers): headers=headers_iter):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata)) job, node, dict(metadata))
fixed_body = ''.join(df.reader()) fixed_body = ''.join(df.reader())
@ -2435,23 +2455,28 @@ class TestObjectReconstructor(unittest.TestCase):
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data) ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1) broken_body = ec_archive_bodies.pop(1)
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
# bad response # bad response
bad_response = (200, '', { bad_headers = {
'X-Object-Sysmeta-Ec-Etag': 'some garbage', 'X-Object-Sysmeta-Ec-Etag': 'some garbage',
'X-Backend-Timestamp': next(ts).internal, 'X-Backend-Timestamp': next(ts).internal,
}) }
# good responses # good responses
headers = { responses = list()
'X-Object-Sysmeta-Ec-Etag': etag, t1 = next(ts).internal
'X-Backend-Timestamp': next(ts).internal for body in ec_archive_bodies:
} headers = get_header_frag_index(self, body)
responses = [(200, body, headers) headers.update({'X-Object-Sysmeta-Ec-Etag': etag,
for body in ec_archive_bodies] 'X-Backend-Timestamp': t1})
responses.append((200, body, headers))
# mixed together # mixed together
error_index = random.randint(0, len(responses) - 2) error_index = random.randint(0, self.policy.ec_ndata)
error_headers = get_header_frag_index(self,
(responses[error_index])[1])
error_headers.update(bad_headers)
bad_response = (200, '', bad_headers)
responses[error_index] = bad_response responses[error_index] = bad_response
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
@ -2480,18 +2505,19 @@ class TestObjectReconstructor(unittest.TestCase):
ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data) ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)
broken_body = ec_archive_bodies.pop(1) broken_body = ec_archive_bodies.pop(1)
ts = (utils.Timestamp(t) for t in itertools.count(int(time.time()))) ts = (utils.Timestamp(t) for t in itertools.count(int(time.time())))
# good responses # good responses
headers = { responses = list()
'X-Object-Sysmeta-Ec-Etag': etag, t0 = next(ts).internal
'X-Backend-Timestamp': next(ts).internal for body in ec_archive_bodies:
} headers = get_header_frag_index(self, body)
responses = [(200, body, headers) headers.update({'X-Object-Sysmeta-Ec-Etag': etag,
for body in ec_archive_bodies] 'X-Backend-Timestamp': t0})
codes, body_iter, headers = zip(*responses) responses.append((200, body, headers))
# sanity check before negative test # sanity check before negative test
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa( df = self.reconstructor.reconstruct_fa(
job, node, dict(metadata)) job, node, dict(metadata))
@ -2501,17 +2527,100 @@ class TestObjectReconstructor(unittest.TestCase):
md5(broken_body).hexdigest()) md5(broken_body).hexdigest())
# one newer etag can spoil the bunch # one newer etag can spoil the bunch
new_response = (200, '', {
'X-Object-Sysmeta-Ec-Etag': 'some garbage',
'X-Backend-Timestamp': next(ts).internal,
})
new_index = random.randint(0, len(responses) - self.policy.ec_nparity) new_index = random.randint(0, len(responses) - self.policy.ec_nparity)
new_headers = get_header_frag_index(self, (responses[new_index])[1])
new_headers.update({'X-Object-Sysmeta-Ec-Etag': 'some garbage',
'X-Backend-Timestamp': next(ts).internal})
new_response = (200, '', new_headers)
responses[new_index] = new_response responses[new_index] = new_response
codes, body_iter, headers = zip(*responses) codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers): with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa, self.assertRaises(DiskFileError, self.reconstructor.reconstruct_fa,
job, node, dict(metadata)) job, node, dict(metadata))
def test_reconstruct_fa_finds_itself_does_not_fail(self):
    """The local fragment showing up in the GET responses must be
    skipped rather than fed to the decoder (bug 1452553).
    """
    job = {
        'partition': 0,
        'policy': self.policy,
    }
    part_nodes = self.policy.object_ring.get_part_nodes(0)
    node = part_nodes[1]
    metadata = {
        'name': '/a/c/o',
        'Content-Length': 0,
        'ETag': 'etag',
    }

    test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
    etag = md5(test_data).hexdigest()
    ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)

    # instead of popping the broken body, we'll just leave it in the
    # list of responses and take away something else.
    broken_body = ec_archive_bodies[1]
    ec_archive_bodies = ec_archive_bodies[:-1]

    # reuse the module-level helper rather than re-deriving the frag
    # index by hand, so all tests build headers the same way
    responses = list()
    for body in ec_archive_bodies:
        headers = get_header_frag_index(self, body)
        headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
        responses.append((200, body, headers))

    codes, body_iter, headers = zip(*responses)
    with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
        df = self.reconstructor.reconstruct_fa(
            job, node, metadata)
        fixed_body = ''.join(df.reader())
        self.assertEqual(len(fixed_body), len(broken_body))
        self.assertEqual(md5(fixed_body).hexdigest(),
                         md5(broken_body).hexdigest())
def test_reconstruct_fa_finds_duplicate_does_not_fail(self):
    """Duplicate frag indexes among the GET responses must be
    de-duplicated, not passed twice to the decoder (bug 1452553).
    """
    job = {
        'partition': 0,
        'policy': self.policy,
    }
    part_nodes = self.policy.object_ring.get_part_nodes(0)
    node = part_nodes[1]
    metadata = {
        'name': '/a/c/o',
        'Content-Length': 0,
        'ETag': 'etag',
    }

    test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
    etag = md5(test_data).hexdigest()
    ec_archive_bodies = make_ec_archive_bodies(self.policy, test_data)

    broken_body = ec_archive_bodies.pop(1)
    # add some duplicates
    num_duplicates = self.policy.ec_nparity - 1
    ec_archive_bodies = (ec_archive_bodies[:num_duplicates] +
                         ec_archive_bodies)[:-num_duplicates]

    # reuse the module-level helper rather than re-deriving the frag
    # index by hand, so all tests build headers the same way
    responses = list()
    for body in ec_archive_bodies:
        headers = get_header_frag_index(self, body)
        headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
        responses.append((200, body, headers))

    codes, body_iter, headers = zip(*responses)
    with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
        df = self.reconstructor.reconstruct_fa(
            job, node, metadata)
        fixed_body = ''.join(df.reader())
        self.assertEqual(len(fixed_body), len(broken_body))
        self.assertEqual(md5(fixed_body).hexdigest(),
                         md5(broken_body).hexdigest())
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()