From 52b102163e48dc04a6a492a3430efa1f7778d7ec Mon Sep 17 00:00:00 2001
From: Clay Gerrard
Date: Wed, 15 Apr 2015 15:31:06 -0700
Subject: [PATCH] Don't apply the wrong Etag validation to rebuilt fragments

Because of the object-server's interaction with the ssync sender's
X-Backend-Replication-Headers, when an object (or fragment archive) is
pushed unmodified to another node its ETag value is duplicated into the
receiving end's metadata as Etag.  This interacts poorly with the
reconstructor's RebuildingECDiskFileStream, which cannot know ahead of
time the ETag of the fragment archive being rebuilt.

Don't send the Etag from the local source fragment archive being used
as the basis for the rebuilt fragment archive's metadata along to
ssync.

Change-Id: Ie59ad93a67a7f439c9a84cd9cff31540f97f334a
---
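Note (illustration only, not applied by the patch): a minimal sketch of
the metadata handling this change is about.  The dict below is a
stand-in for a fragment archive's metadata (the real code operates on
DiskFile metadata inside RebuildingECDiskFileStream); only the key
names and the pop loop come from the patch, the values are made up.

    # hypothetical metadata of a fragment archive that was pushed by ssync
    # with X-Backend-Replication-Headers: the etag appears under both
    # spellings on the receiving end
    source_metadata = {
        'ETag': 'etag-of-the-local-source-fragment',
        'Etag': 'etag-of-the-local-source-fragment',
        'X-Object-Sysmeta-Ec-Frag-Index': '7',
    }

    # old behaviour: only 'ETag' is removed, so the stale 'Etag' of the
    # local source fragment rides along to ssync and can end up being
    # applied as the wrong validation for the rebuilt fragment archive
    old = dict(source_metadata)
    del old['ETag']
    assert 'Etag' in old

    # new behaviour: both spellings are dropped and the receiving object
    # server recalculates the etag of the rebuilt fragment itself
    new = dict(source_metadata)
    for etag_key in ('ETag', 'Etag'):
        new.pop(etag_key, None)
    assert 'ETag' not in new and 'Etag' not in new
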
 swift/obj/reconstructor.py              |  34 +++----
 test/probe/common.py                    |   5 +
 test/probe/test_reconstructor_revert.py | 120 +++++++++++++++++++++++-
 test/unit/obj/test_reconstructor.py     |  18 +---
 4 files changed, 143 insertions(+), 34 deletions(-)

diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
index 0ee2afbf6d..db078de2fc 100644
--- a/swift/obj/reconstructor.py
+++ b/swift/obj/reconstructor.py
@@ -49,6 +49,21 @@ SYNC, REVERT = ('sync_only', 'sync_revert')
 hubs.use_hub(get_hub())
 
 
+def _get_partners(frag_index, part_nodes):
+    """
+    Returns the left and right partners of the node whose index is
+    equal to the given frag_index.
+
+    :param frag_index: a fragment index
+    :param part_nodes: a list of primary nodes
+    :returns: [<node-1>, <node+1>]
+    """
+    return [
+        part_nodes[(frag_index - 1) % len(part_nodes)],
+        part_nodes[(frag_index + 1) % len(part_nodes)],
+    ]
+
+
 class RebuildingECDiskFileStream(object):
     """
     This class wraps the the reconstructed fragment archive data and
@@ -65,7 +80,8 @@ class RebuildingECDiskFileStream(object):
         # update the FI and delete the ETag, the obj server will
         # recalc on the other side...
         self.metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
-        del self.metadata['ETag']
+        for etag_key in ('ETag', 'Etag'):
+            self.metadata.pop(etag_key, None)
         self.frag_index = frag_index
         self.rebuilt_fragment_iter = rebuilt_fragment_iter
 
@@ -382,20 +398,6 @@ class ObjectReconstructor(Daemon):
             self.kill_coros()
         self.last_reconstruction_count = self.reconstruction_count
 
-    def _get_partners(self, frag_index, part_nodes):
-        """
-        Returns the left and right partners of the node whose index is
-        equal to the given frag_index.
-
-        :param frag_index: a fragment index
-        :param part_nodes: a list of primary nodes
-        :returns: [<node-1>, <node+1>]
-        """
-        return [
-            part_nodes[(frag_index - 1) % len(part_nodes)],
-            part_nodes[(frag_index + 1) % len(part_nodes)],
-        ]
-
     def _get_hashes(self, policy, path, recalculate=None, do_listdir=False):
         df_mgr = self._df_router[policy]
         hashed, suffix_hashes = tpool_reraise(
@@ -715,7 +717,7 @@ class ObjectReconstructor(Daemon):
                 job_type=SYNC,
                 frag_index=frag_index,
                 suffixes=suffixes,
-                sync_to=self._get_partners(frag_index, part_nodes),
+                sync_to=_get_partners(frag_index, part_nodes),
             )
             # ssync callback to rebuild missing fragment_archives
             sync_job['sync_diskfile_builder'] = self.reconstruct_fa
diff --git a/test/probe/common.py b/test/probe/common.py
index 1311cc178a..7d1e754014 100644
--- a/test/probe/common.py
+++ b/test/probe/common.py
@@ -299,6 +299,11 @@ class ProbeTest(unittest.TestCase):
             path_parts.append(str(part))
         return os.path.join(*path_parts)
 
+    def config_number(self, node):
+        _server_type, config_number = get_server_number(
+            node['port'], self.port2server)
+        return config_number
+
     def get_to_final_state(self):
         # these .stop()s are probably not strictly necessary,
         # but may prevent race conditions
diff --git a/test/probe/test_reconstructor_revert.py b/test/probe/test_reconstructor_revert.py
index 2a7bd7c834..39739b617d 100755
--- a/test/probe/test_reconstructor_revert.py
+++ b/test/probe/test_reconstructor_revert.py
@@ -18,6 +18,9 @@ from hashlib import md5
 import unittest
 import uuid
 import os
+import random
+import shutil
+from collections import defaultdict
 
 from test.probe.common import ECProbeTest
 
@@ -25,6 +28,7 @@ from swift.common import direct_client
 from swift.common.storage_policy import EC_POLICY
 from swift.common.manager import Manager
 from swift.common.utils import renamer
+from swift.obj import reconstructor
 
 from swiftclient import client
 
@@ -233,7 +237,7 @@ class TestReconstructorRevert(ECProbeTest):
         # fire up reconstructor on handoff nodes only
         for hnode in hnodes:
             hnode_id = (hnode['port'] - 6000) / 10
-            self.reconstructor.once(number=hnode_id, override_devices=['sdb8'])
+            self.reconstructor.once(number=hnode_id)
 
         # check the first node to make sure its gone
         try:
@@ -253,6 +257,120 @@ class TestReconstructorRevert(ECProbeTest):
             self.fail('Node data on %r was not fully destoryed!'
                       % (onodes[0]))
 
+    def test_reconstruct_from_reverted_fragment_archive(self):
+        headers = {'X-Storage-Policy': self.policy.name}
+        client.put_container(self.url, self.token, self.container_name,
+                             headers=headers)
+
+        # get our node lists
+        opart, onodes = self.object_ring.get_nodes(
+            self.account, self.container_name, self.object_name)
+
+        # find a primary server that only has one of its devices in the
+        # primary node list
+        group_nodes_by_config = defaultdict(list)
+        for n in onodes:
+            group_nodes_by_config[self.config_number(n)].append(n)
+        for config_number, node_list in group_nodes_by_config.items():
+            if len(node_list) == 1:
+                break
+        else:
+            self.fail('ring balancing did not use all available nodes')
+        primary_node = node_list[0]
+        primary_device = self.device_dir('object', primary_node)
+        self.kill_drive(primary_device)
+
+        # PUT object
+        contents = Body()
+        etag = client.put_object(self.url, self.token, self.container_name,
+                                 self.object_name, contents=contents)
+        self.assertEqual(contents.etag, etag)
+
+        # fix the primary device and sanity GET
+        self.revive_drive(primary_device)
+        self.assertEqual(etag, self.proxy_get())
+
+        # find a handoff holding the fragment
+        for hnode in self.object_ring.get_more_nodes(opart):
+            try:
+                reverted_fragment_etag = self.direct_get(hnode, opart)
+            except direct_client.DirectClientException as err:
+                if err.http_status != 404:
+                    raise
+            else:
+                break
+        else:
+            self.fail('Unable to find handoff fragment!')
+
+        # we'll force the handoff device to revert instead of potentially
+        # racing with rebuild by deleting any other fragments that may be on
+        # the same server
+        handoff_fragment_etag = None
+        for node in onodes:
+            if node['port'] == hnode['port']:
+                # we'll keep track of the etag of this fragment we're removing
+                # in case we need it later (cue foreshadowing music)...
+                try:
+                    handoff_fragment_etag = self.direct_get(node, opart)
+                except direct_client.DirectClientException as err:
+                    if err.http_status != 404:
+                        raise
+                    # this just means our handoff device was on the same
+                    # machine as the primary!
+                    continue
+                # use the primary node's device - not the hnode device
+                part_dir = self.storage_dir('object', node, part=opart)
+                shutil.rmtree(part_dir, True)
+
+        # revert from handoff device with reconstructor
+        self.reconstructor.once(number=self.config_number(hnode))
+
+        # verify fragment reverted to primary server
+        self.assertEqual(reverted_fragment_etag,
+                         self.direct_get(primary_node, opart))
+
+        # now we'll remove some data on one of the primary node's partners
+        partner = random.choice(reconstructor._get_partners(
+            primary_node['index'], onodes))
+
+        try:
+            rebuilt_fragment_etag = self.direct_get(partner, opart)
+        except direct_client.DirectClientException as err:
+            if err.http_status != 404:
+                raise
+            # partner already had its fragment removed
+            if (handoff_fragment_etag is not None and
+                    hnode['port'] == partner['port']):
+                # oh, well that makes sense then...
+                rebuilt_fragment_etag = handoff_fragment_etag
+            else:
+                # I wonder what happened?
+                self.fail('Partner inexplicably missing fragment!')
+        part_dir = self.storage_dir('object', partner, part=opart)
+        shutil.rmtree(part_dir, True)
+
+        # sanity, it's gone
+        try:
+            self.direct_get(partner, opart)
+        except direct_client.DirectClientException as err:
+            if err.http_status != 404:
+                raise
+        else:
+            self.fail('successful GET of removed partner fragment archive!?')
+
+        # and force the primary node to do a rebuild
+        self.reconstructor.once(number=self.config_number(primary_node))
+
+        # and validate the partner's rebuilt_fragment_etag
+        try:
+            self.assertEqual(rebuilt_fragment_etag,
+                             self.direct_get(partner, opart))
+        except direct_client.DirectClientException as err:
+            if err.http_status != 404:
+                raise
+            else:
+                self.fail('Did not find rebuilt fragment on partner node')
+
 
 if __name__ == "__main__":
     unittest.main()
diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py
index 93a50e84de..b7254f4343 100755
--- a/test/unit/obj/test_reconstructor.py
+++ b/test/unit/obj/test_reconstructor.py
@@ -293,22 +293,6 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             writer.commit(timestamp)
         return df
 
-    def debug_wtf(self):
-        # won't include this in the final, just handy reminder of where
-        # things are...
-        for pol in [p for p in POLICIES if p.policy_type == EC_POLICY]:
-            obj_ring = pol.object_ring
-            for part_num in self.part_nums:
-                print "\n part_num %s " % part_num
-                part_nodes = obj_ring.get_part_nodes(int(part_num))
-                print "\n part_nodes %s " % part_nodes
-                for local_dev in obj_ring.devs:
-                    partners = self.reconstructor._get_partners(
-                        local_dev['id'], obj_ring, part_num)
-                    if partners:
-                        print "\n local_dev %s \n partners %s " % (local_dev,
-                                                                   partners)
-
     def assert_expected_jobs(self, part_num, jobs):
         for job in jobs:
             del job['path']
@@ -702,7 +686,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             part_nodes = obj_ring.get_part_nodes(int(part_num))
             primary_ids = [n['id'] for n in part_nodes]
             for node in part_nodes:
-                partners = self.reconstructor._get_partners(
+                partners = object_reconstructor._get_partners(
                     node['index'], part_nodes)
                 left = partners[0]['id']
                 right = partners[1]['id']
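
Appended for illustration only (not part of the diff): a standalone
sketch of the partner arithmetic this patch moves from an
ObjectReconstructor method to the module-level _get_partners() used by
both the daemon and the new probe test.  The node dicts below are
simplified stand-ins for real ring entries.

    # copy of the partner selection logic, for illustration; real callers
    # pass the ring's primary node dicts for a partition
    def _get_partners(frag_index, part_nodes):
        return [
            part_nodes[(frag_index - 1) % len(part_nodes)],
            part_nodes[(frag_index + 1) % len(part_nodes)],
        ]

    part_nodes = [{'index': i} for i in range(6)]

    # an interior fragment index gets its immediate left and right neighbours
    assert _get_partners(3, part_nodes) == [{'index': 2}, {'index': 4}]

    # the first and last indexes wrap around the primary node list
    assert _get_partners(0, part_nodes) == [{'index': 5}, {'index': 1}]
    assert _get_partners(5, part_nodes) == [{'index': 4}, {'index': 0}]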