Merge "Don't apply the wrong Etag validation to rebuilt fragments"

Jenkins 2015-04-16 12:01:58 +00:00 committed by Gerrit Code Review
commit 399a66fb12
4 changed files with 143 additions and 34 deletions

View File

@@ -49,6 +49,21 @@ SYNC, REVERT = ('sync_only', 'sync_revert')
 hubs.use_hub(get_hub())
 
 
+def _get_partners(frag_index, part_nodes):
+    """
+    Returns the left and right partners of the node whose index is
+    equal to the given frag_index.
+
+    :param frag_index: a fragment index
+    :param part_nodes: a list of primary nodes
+    :returns: [<node-to-left>, <node-to-right>]
+    """
+    return [
+        part_nodes[(frag_index - 1) % len(part_nodes)],
+        part_nodes[(frag_index + 1) % len(part_nodes)],
+    ]
+
+
 class RebuildingECDiskFileStream(object):
     """
     This class wraps the reconstructed fragment archive data and
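For reference, the relocated helper is pure modulo arithmetic over the primary node list, so it can now be exercised without an ObjectReconstructor instance. A minimal standalone sketch with toy node dicts (not part of the diff; real ring nodes carry more keys such as ip, port and device):

def _get_partners(frag_index, part_nodes):
    # same body as the module-level helper above
    return [
        part_nodes[(frag_index - 1) % len(part_nodes)],
        part_nodes[(frag_index + 1) % len(part_nodes)],
    ]

# toy 4-node primary list
part_nodes = [{'index': i, 'id': i} for i in range(4)]

assert _get_partners(0, part_nodes) == [part_nodes[3], part_nodes[1]]  # wraps left
assert _get_partners(3, part_nodes) == [part_nodes[2], part_nodes[0]]  # wraps right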
@@ -65,7 +80,8 @@ class RebuildingECDiskFileStream(object):
         # update the FI and delete the ETag, the obj server will
         # recalc on the other side...
         self.metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
-        del self.metadata['ETag']
+        for etag_key in ('ETag', 'Etag'):
+            self.metadata.pop(etag_key, None)
 
         self.frag_index = frag_index
         self.rebuilt_fragment_iter = rebuilt_fragment_iter
@@ -382,20 +398,6 @@ class ObjectReconstructor(Daemon):
             self.kill_coros()
         self.last_reconstruction_count = self.reconstruction_count
 
-    def _get_partners(self, frag_index, part_nodes):
-        """
-        Returns the left and right partners of the node whose index is
-        equal to the given frag_index.
-
-        :param frag_index: a fragment index
-        :param part_nodes: a list of primary nodes
-        :returns: [<node-to-left>, <node-to-right>]
-        """
-        return [
-            part_nodes[(frag_index - 1) % len(part_nodes)],
-            part_nodes[(frag_index + 1) % len(part_nodes)],
-        ]
-
     def _get_hashes(self, policy, path, recalculate=None, do_listdir=False):
         df_mgr = self._df_router[policy]
         hashed, suffix_hashes = tpool_reraise(
@@ -715,7 +717,7 @@ class ObjectReconstructor(Daemon):
                 job_type=SYNC,
                 frag_index=frag_index,
                 suffixes=suffixes,
-                sync_to=self._get_partners(frag_index, part_nodes),
+                sync_to=_get_partners(frag_index, part_nodes),
             )
             # ssync callback to rebuild missing fragment_archives
             sync_job['sync_diskfile_builder'] = self.reconstruct_fa
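For orientation, a rough sketch of the job shape this hunk implies: the SYNC job now points its ssync targets straight at the ring neighbours of the fragment index. The values here are invented, and the real job dict carries additional keys beyond those shown in the hunk (it also gets the sync_diskfile_builder callback set on the following line):

SYNC = 'sync_only'  # matches the constant shown in the first hunk
part_nodes = [{'index': i, 'id': i} for i in range(4)]  # toy primaries
frag_index = 2

sync_job = dict(
    job_type=SYNC,
    frag_index=frag_index,
    suffixes=['abc', '123'],  # made-up suffix dirs
    # left and right ring neighbours, exactly what _get_partners() returns
    sync_to=[part_nodes[(frag_index - 1) % len(part_nodes)],
             part_nodes[(frag_index + 1) % len(part_nodes)]],
)
assert [n['index'] for n in sync_job['sync_to']] == [1, 3]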

View File

@@ -299,6 +299,11 @@ class ProbeTest(unittest.TestCase):
         path_parts.append(str(part))
         return os.path.join(*path_parts)
 
+    def config_number(self, node):
+        _server_type, config_number = get_server_number(
+            node['port'], self.port2server)
+        return config_number
+
     def get_to_final_state(self):
         # these .stop()s are probably not strictly necessary,
         # but may prevent race conditions
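The new config_number() helper gives probe tests one place to turn a node into the numbered server config that Manager's once(number=...) expects; the test file below still shows the hand-rolled arithmetic it replaces, (hnode['port'] - 6000) / 10. A standalone sketch of that mapping, assuming the conventional probe/SAIO port layout where object servers listen on 6010, 6020, and so on:

def config_number_from_port(port):
    # hand-rolled equivalent of the helper, valid only under the assumed
    # port convention; the real helper defers to get_server_number()
    return (port - 6000) // 10

assert config_number_from_port(6010) == 1
assert config_number_from_port(6040) == 4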

View File

@@ -18,6 +18,9 @@ from hashlib import md5
 import unittest
 import uuid
 import os
+import random
+import shutil
+from collections import defaultdict
 
 from test.probe.common import ECProbeTest
@@ -25,6 +28,7 @@ from swift.common import direct_client
 from swift.common.storage_policy import EC_POLICY
 from swift.common.manager import Manager
 from swift.common.utils import renamer
+from swift.obj import reconstructor
 
 from swiftclient import client
@@ -233,7 +237,7 @@ class TestReconstructorRevert(ECProbeTest):
         # fire up reconstructor on handoff nodes only
         for hnode in hnodes:
             hnode_id = (hnode['port'] - 6000) / 10
-            self.reconstructor.once(number=hnode_id, override_devices=['sdb8'])
+            self.reconstructor.once(number=hnode_id)
 
         # check the first node to make sure it's gone
         try:
@@ -253,6 +257,120 @@ class TestReconstructorRevert(ECProbeTest):
             self.fail('Node data on %r was not fully destroyed!' %
                       (onodes[0]))
 
+    def test_reconstruct_from_reverted_fragment_archive(self):
+        headers = {'X-Storage-Policy': self.policy.name}
+        client.put_container(self.url, self.token, self.container_name,
+                             headers=headers)
+
+        # get our node lists
+        opart, onodes = self.object_ring.get_nodes(
+            self.account, self.container_name, self.object_name)
+
+        # find a primary server that only has one of its devices in the
+        # primary node list
+        group_nodes_by_config = defaultdict(list)
+        for n in onodes:
+            group_nodes_by_config[self.config_number(n)].append(n)
+        for config_number, node_list in group_nodes_by_config.items():
+            if len(node_list) == 1:
+                break
+        else:
+            self.fail('ring balancing did not use all available nodes')
+        primary_node = node_list[0]
+        primary_device = self.device_dir('object', primary_node)
+        self.kill_drive(primary_device)
+
+        # PUT object
+        contents = Body()
+        etag = client.put_object(self.url, self.token, self.container_name,
+                                 self.object_name, contents=contents)
+        self.assertEqual(contents.etag, etag)
+
+        # fix the primary device and sanity GET
+        self.revive_drive(primary_device)
+        self.assertEqual(etag, self.proxy_get())
+
+        # find a handoff holding the fragment
+        for hnode in self.object_ring.get_more_nodes(opart):
+            try:
+                reverted_fragment_etag = self.direct_get(hnode, opart)
+            except direct_client.DirectClientException as err:
+                if err.http_status != 404:
+                    raise
+            else:
+                break
+        else:
+            self.fail('Unable to find handoff fragment!')
+
+        # we'll force the handoff device to revert instead of potentially
+        # racing with rebuild by deleting any other fragments that may be on
+        # the same server
+        handoff_fragment_etag = None
+        for node in onodes:
+            if node['port'] == hnode['port']:
+                # we'll keep track of the etag of this fragment we're removing
+                # in case we need it later (cue foreshadowing music)...
+                try:
+                    handoff_fragment_etag = self.direct_get(node, opart)
+                except direct_client.DirectClientException as err:
+                    if err.http_status != 404:
+                        raise
+                    # this just means our handoff device was on the same
+                    # machine as the primary!
+                    continue
+                # use the primary node's device - not the hnode device
+                part_dir = self.storage_dir('object', node, part=opart)
+                shutil.rmtree(part_dir, True)
+
+        # revert from handoff device with reconstructor
+        self.reconstructor.once(number=self.config_number(hnode))
+
+        # verify fragment reverted to primary server
+        self.assertEqual(reverted_fragment_etag,
+                         self.direct_get(primary_node, opart))
+
+        # now we'll remove some data on one of the primary node's partners
+        partner = random.choice(reconstructor._get_partners(
+            primary_node['index'], onodes))
+        try:
+            rebuilt_fragment_etag = self.direct_get(partner, opart)
+        except direct_client.DirectClientException as err:
+            if err.http_status != 404:
+                raise
+            # partner already had its fragment removed
+            if (handoff_fragment_etag is not None and
+                    hnode['port'] == partner['port']):
+                # oh, well that makes sense then...
+                rebuilt_fragment_etag = handoff_fragment_etag
+            else:
+                # I wonder what happened?
+                self.fail('Partner inexplicably missing fragment!')
+        part_dir = self.storage_dir('object', partner, part=opart)
+        shutil.rmtree(part_dir, True)
+
+        # sanity, it's gone
+        try:
+            self.direct_get(partner, opart)
+        except direct_client.DirectClientException as err:
+            if err.http_status != 404:
+                raise
+        else:
+            self.fail('successful GET of removed partner fragment archive!?')
+
+        # and force the primary node to do a rebuild
+        self.reconstructor.once(number=self.config_number(primary_node))
+
+        # and validate the partner's rebuilt_fragment_etag
+        try:
+            self.assertEqual(rebuilt_fragment_etag,
+                             self.direct_get(partner, opart))
+        except direct_client.DirectClientException as err:
+            if err.http_status != 404:
+                raise
+        else:
+            self.fail('Did not find rebuilt fragment on partner node')
+
 
 if __name__ == "__main__":
     unittest.main()
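One idiom worth calling out in the test above: every direct fragment read tolerates a 404 (the fragment simply is not on that node) while re-raising anything else. Pulled out as a standalone helper sketch, where get_etag is a hypothetical stand-in for the probe test's self.direct_get:

from swift.common import direct_client


def get_etag_or_none(get_etag, node, part):
    # returns the fragment's etag, or None if this node just doesn't have it
    try:
        return get_etag(node, part)
    except direct_client.DirectClientException as err:
        if err.http_status != 404:
            raise
        return None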

View File

@@ -293,22 +293,6 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             writer.commit(timestamp)
         return df
 
-    def debug_wtf(self):
-        # won't include this in the final, just handy reminder of where
-        # things are...
-        for pol in [p for p in POLICIES if p.policy_type == EC_POLICY]:
-            obj_ring = pol.object_ring
-            for part_num in self.part_nums:
-                print "\n part_num %s " % part_num
-                part_nodes = obj_ring.get_part_nodes(int(part_num))
-                print "\n part_nodes %s " % part_nodes
-                for local_dev in obj_ring.devs:
-                    partners = self.reconstructor._get_partners(
-                        local_dev['id'], obj_ring, part_num)
-                    if partners:
-                        print "\n local_dev %s \n partners %s " % (local_dev,
-                                                                   partners)
-
     def assert_expected_jobs(self, part_num, jobs):
         for job in jobs:
             del job['path']
@@ -702,7 +686,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             part_nodes = obj_ring.get_part_nodes(int(part_num))
             primary_ids = [n['id'] for n in part_nodes]
             for node in part_nodes:
-                partners = self.reconstructor._get_partners(
+                partners = object_reconstructor._get_partners(
                     node['index'], part_nodes)
                 left = partners[0]['id']
                 right = partners[1]['id']