Don't apply the wrong Etag validation to rebuilt fragments

Because of the object-server's interaction with the ssync sender's
X-Backend-Replication-Headers, when an object (or fragment archive) is
pushed unmodified to another node, its ETag value is duplicated into the
receiving end's metadata as Etag.  This interacts poorly with the
reconstructor's RebuildingECDiskFileStream, which cannot know ahead of
time the ETag of the fragment archive being rebuilt.
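
As an illustration (a minimal sketch with made-up values, not code from
this change), the duplicated key means that deleting only 'ETag' before
rebuilding still leaves the source fragment's checksum behind as 'Etag',
and the receiving object-server ends up validating the freshly rebuilt
fragment against that stale value:

    # hypothetical metadata for a fragment archive that was synced via
    # ssync; X-Backend-Replication-Headers causes the sender's ETag to be
    # recorded again on the receiving end under the 'Etag' key
    received_metadata = {
        'ETag': 'd41d8cd98f00b204e9800998ecf8427e',
        'Etag': 'd41d8cd98f00b204e9800998ecf8427e',
        'X-Object-Sysmeta-Ec-Frag-Index': '3',
    }

    # the old behavior dropped only the canonical spelling...
    rebuilt_metadata = dict(received_metadata)
    del rebuilt_metadata['ETag']

    # ...so the checksum of the *source* fragment survives and gets
    # applied to the rebuilt fragment, whose data (and md5) differ
    assert 'Etag' in rebuilt_metadata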

Don't send the Etag from the local source fragment archive being used as
the basis for the rebuilt fragment archive's metadata along to ssync.
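
A minimal sketch of the approach (the same loop appears in the
RebuildingECDiskFileStream hunk below): pop every spelling of the etag
key from the rebuilt fragment's metadata so the receiving object-server
recalculates it from the rebuilt data instead.

    # drop both capitalizations; 'metadata' here stands in for the dict
    # copied from the local source fragment archive
    for etag_key in ('ETag', 'Etag'):
        metadata.pop(etag_key, None)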

Closes-Bug: 1446800
Change-Id: Ie59ad93a67a7f439c9a84cd9cff31540f97f334a
Clay Gerrard 2015-04-15 15:31:06 -07:00 committed by John Dickinson
parent f6482bdece
commit 51e31c5c71
4 changed files with 143 additions and 34 deletions


@@ -49,6 +49,21 @@ SYNC, REVERT = ('sync_only', 'sync_revert')
hubs.use_hub(get_hub())
def _get_partners(frag_index, part_nodes):
"""
Returns the left and right partners of the node whose index is
equal to the given frag_index.
:param frag_index: a fragment index
:param part_nodes: a list of primary nodes
:returns: [<node-to-left>, <node-to-right>]
"""
return [
part_nodes[(frag_index - 1) % len(part_nodes)],
part_nodes[(frag_index + 1) % len(part_nodes)],
]
class RebuildingECDiskFileStream(object):
"""
This class wraps the reconstructed fragment archive data and
@@ -65,7 +80,8 @@ class RebuildingECDiskFileStream(object):
# update the FI and delete the ETag, the obj server will
# recalc on the other side...
self.metadata['X-Object-Sysmeta-Ec-Frag-Index'] = frag_index
del self.metadata['ETag']
for etag_key in ('ETag', 'Etag'):
self.metadata.pop(etag_key, None)
self.frag_index = frag_index
self.rebuilt_fragment_iter = rebuilt_fragment_iter
@@ -382,20 +398,6 @@ class ObjectReconstructor(Daemon):
self.kill_coros()
self.last_reconstruction_count = self.reconstruction_count
def _get_partners(self, frag_index, part_nodes):
"""
Returns the left and right partners of the node whose index is
equal to the given frag_index.
:param frag_index: a fragment index
:param part_nodes: a list of primary nodes
:returns: [<node-to-left>, <node-to-right>]
"""
return [
part_nodes[(frag_index - 1) % len(part_nodes)],
part_nodes[(frag_index + 1) % len(part_nodes)],
]
def _get_hashes(self, policy, path, recalculate=None, do_listdir=False):
df_mgr = self._df_router[policy]
hashed, suffix_hashes = tpool_reraise(
@@ -715,7 +717,7 @@ class ObjectReconstructor(Daemon):
job_type=SYNC,
frag_index=frag_index,
suffixes=suffixes,
sync_to=self._get_partners(frag_index, part_nodes),
sync_to=_get_partners(frag_index, part_nodes),
)
# ssync callback to rebuild missing fragment_archives
sync_job['sync_diskfile_builder'] = self.reconstruct_fa


@@ -299,6 +299,11 @@ class ProbeTest(unittest.TestCase):
path_parts.append(str(part))
return os.path.join(*path_parts)
def config_number(self, node):
_server_type, config_number = get_server_number(
node['port'], self.port2server)
return config_number
def get_to_final_state(self):
# these .stop()s are probably not strictly necessary,
# but may prevent race conditions


@@ -18,6 +18,9 @@ from hashlib import md5
import unittest
import uuid
import os
import random
import shutil
from collections import defaultdict
from test.probe.common import ECProbeTest
@@ -25,6 +28,7 @@ from swift.common import direct_client
from swift.common.storage_policy import EC_POLICY
from swift.common.manager import Manager
from swift.common.utils import renamer
from swift.obj import reconstructor
from swiftclient import client
@@ -233,7 +237,7 @@ class TestReconstructorRevert(ECProbeTest):
# fire up reconstructor on handoff nodes only
for hnode in hnodes:
hnode_id = (hnode['port'] - 6000) / 10
self.reconstructor.once(number=hnode_id, override_devices=['sdb8'])
self.reconstructor.once(number=hnode_id)
# check the first node to make sure it's gone
try:
@@ -253,6 +257,120 @@ class TestReconstructorRevert(ECProbeTest):
self.fail('Node data on %r was not fully destroyed!' %
(onodes[0]))
def test_reconstruct_from_reverted_fragment_archive(self):
headers = {'X-Storage-Policy': self.policy.name}
client.put_container(self.url, self.token, self.container_name,
headers=headers)
# get our node lists
opart, onodes = self.object_ring.get_nodes(
self.account, self.container_name, self.object_name)
# find a primary server that only has one of its devices in the
# primary node list
group_nodes_by_config = defaultdict(list)
for n in onodes:
group_nodes_by_config[self.config_number(n)].append(n)
for config_number, node_list in group_nodes_by_config.items():
if len(node_list) == 1:
break
else:
self.fail('ring balancing did not use all available nodes')
primary_node = node_list[0]
primary_device = self.device_dir('object', primary_node)
self.kill_drive(primary_device)
# PUT object
contents = Body()
etag = client.put_object(self.url, self.token, self.container_name,
self.object_name, contents=contents)
self.assertEqual(contents.etag, etag)
# fix the primary device and sanity GET
self.revive_drive(primary_device)
self.assertEqual(etag, self.proxy_get())
# find a handoff holding the fragment
for hnode in self.object_ring.get_more_nodes(opart):
try:
reverted_fragment_etag = self.direct_get(hnode, opart)
except direct_client.DirectClientException as err:
if err.http_status != 404:
raise
else:
break
else:
self.fail('Unable to find handoff fragment!')
# we'll force the handoff device to revert instead of potentially
# racing with rebuild by deleting any other fragments that may be on
# the same server
handoff_fragment_etag = None
for node in onodes:
if node['port'] == hnode['port']:
# we'll keep track of the etag of this fragment we're removing
# in case we need it later (cue foreshadowing music)...
try:
handoff_fragment_etag = self.direct_get(node, opart)
except direct_client.DirectClientException as err:
if err.http_status != 404:
raise
# this just means our handoff device was on the same
# machine as the primary!
continue
# use the primary node's device - not the hnode device
part_dir = self.storage_dir('object', node, part=opart)
shutil.rmtree(part_dir, True)
# revert from handoff device with reconstructor
self.reconstructor.once(number=self.config_number(hnode))
# verify fragment reverted to primary server
self.assertEqual(reverted_fragment_etag,
self.direct_get(primary_node, opart))
# now we'll remove some data on one of the primary node's partners
partner = random.choice(reconstructor._get_partners(
primary_node['index'], onodes))
try:
rebuilt_fragment_etag = self.direct_get(partner, opart)
except direct_client.DirectClientException as err:
if err.http_status != 404:
raise
# partner already had its fragment removed
if (handoff_fragment_etag is not None and
hnode['port'] == partner['port']):
# oh, well that makes sense then...
rebuilt_fragment_etag = handoff_fragment_etag
else:
# I wonder what happened?
self.fail('Partner inexplicably missing fragment!')
part_dir = self.storage_dir('object', partner, part=opart)
shutil.rmtree(part_dir, True)
# sanity, it's gone
try:
self.direct_get(partner, opart)
except direct_client.DirectClientException as err:
if err.http_status != 404:
raise
else:
self.fail('successful GET of removed partner fragment archive!?')
# and force the primary node to do a rebuild
self.reconstructor.once(number=self.config_number(primary_node))
# and validate the partner's rebuilt_fragment_etag
try:
self.assertEqual(rebuilt_fragment_etag,
self.direct_get(partner, opart))
except direct_client.DirectClientException as err:
if err.http_status != 404:
raise
else:
self.fail('Did not find rebuilt fragment on partner node')
if __name__ == "__main__":
unittest.main()


@@ -293,22 +293,6 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
writer.commit(timestamp)
return df
def debug_wtf(self):
# won't include this in the final, just handy reminder of where
# things are...
for pol in [p for p in POLICIES if p.policy_type == EC_POLICY]:
obj_ring = pol.object_ring
for part_num in self.part_nums:
print "\n part_num %s " % part_num
part_nodes = obj_ring.get_part_nodes(int(part_num))
print "\n part_nodes %s " % part_nodes
for local_dev in obj_ring.devs:
partners = self.reconstructor._get_partners(
local_dev['id'], obj_ring, part_num)
if partners:
print "\n local_dev %s \n partners %s " % (local_dev,
partners)
def assert_expected_jobs(self, part_num, jobs):
for job in jobs:
del job['path']
@@ -702,7 +686,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
part_nodes = obj_ring.get_part_nodes(int(part_num))
primary_ids = [n['id'] for n in part_nodes]
for node in part_nodes:
partners = self.reconstructor._get_partners(
partners = object_reconstructor._get_partners(
node['index'], part_nodes)
left = partners[0]['id']
right = partners[1]['id']