EC: reconstruct using non-durable fragments
Previously the reconstructor would only reconstruct a missing fragment when a set of ec_ndata other fragments was available, *all* of which were durable. Since change [1] it has been possible to retrieve non-durable fragments from object servers. This patch changes the reconstructor to take advantage of [1] and use non-durable fragments. A new probe test is added to test scenarios with a mix of failed and non-durable nodes. The existing probe tests in test_reconstructor_rebuild.py and test_reconstructor_durable.py were broken. These were intended to simulate cases where combinations of nodes were either failed or had non-durable fragments, but the test scenarios defined were not actually created - every test scenario broke only one node instead of the intent of breaking multiple nodes. The existing tests have been refactored to re-use most of their setup and assertion code, and merged with the new test into a single class in test_reconstructor_rebuild.py. test_reconstructor_durable.py is removed. [1] Related-Change: I2310981fd1c4622ff5d1a739cbcc59637ffe3fc3 Change-Id: Ic0cdbc7cee657cea0330c2eb1edabe8eb52c0567 Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> Closes-Bug: #1624088
This commit is contained in:
parent
a852b1ecd7
commit
6574ce31ee
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import os
|
||||
from os.path import join
|
||||
import random
|
||||
@ -244,9 +245,15 @@ class ObjectReconstructor(Daemon):
|
||||
# of the node we're rebuilding to within the primary part list
|
||||
fi_to_rebuild = node['index']
|
||||
|
||||
# KISS send out connection requests to all nodes, see what sticks
|
||||
# KISS send out connection requests to all nodes, see what sticks.
|
||||
# Use fragment preferences header to tell other nodes that we want
|
||||
# fragments at the same timestamp as our fragment, and that they don't
|
||||
# need to be durable.
|
||||
headers = self.headers.copy()
|
||||
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
|
||||
frag_prefs = [{'timestamp': datafile_metadata['X-Timestamp'],
|
||||
'exclude': []}]
|
||||
headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs)
|
||||
pile = GreenAsyncPile(len(part_nodes))
|
||||
path = datafile_metadata['name']
|
||||
for node in part_nodes:
|
||||
|
@ -1,169 +0,0 @@
|
||||
#!/usr/bin/python -u
|
||||
# Copyright (c) 2010-2012 OpenStack Foundation
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||||
# you may not use this file except in compliance with the License.
|
||||
# You may obtain a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import json
|
||||
from hashlib import md5
|
||||
import unittest
|
||||
import uuid
|
||||
import random
|
||||
import os
|
||||
import errno
|
||||
|
||||
from test.probe.common import ECProbeTest
|
||||
|
||||
from swift.common import direct_client
|
||||
from swift.common.storage_policy import EC_POLICY
|
||||
from swift.common.manager import Manager
|
||||
|
||||
from swiftclient import client
|
||||
|
||||
|
||||
class Body(object):
|
||||
|
||||
def __init__(self, total=3.5 * 2 ** 20):
|
||||
self.total = total
|
||||
self.hasher = md5()
|
||||
self.size = 0
|
||||
self.chunk = 'test' * 16 * 2 ** 10
|
||||
|
||||
@property
|
||||
def etag(self):
|
||||
return self.hasher.hexdigest()
|
||||
|
||||
def __iter__(self):
|
||||
return self
|
||||
|
||||
def next(self):
|
||||
if self.size > self.total:
|
||||
raise StopIteration()
|
||||
self.size += len(self.chunk)
|
||||
self.hasher.update(self.chunk)
|
||||
return self.chunk
|
||||
|
||||
def __next__(self):
|
||||
return next(self)
|
||||
|
||||
|
||||
class TestReconstructorPropDurable(ECProbeTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestReconstructorPropDurable, self).setUp()
|
||||
self.container_name = 'container-%s' % uuid.uuid4()
|
||||
self.object_name = 'object-%s' % uuid.uuid4()
|
||||
# sanity
|
||||
self.assertEqual(self.policy.policy_type, EC_POLICY)
|
||||
self.reconstructor = Manager(["object-reconstructor"])
|
||||
|
||||
def direct_get(self, node, part):
|
||||
req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
|
||||
headers, data = direct_client.direct_get_object(
|
||||
node, part, self.account, self.container_name,
|
||||
self.object_name, headers=req_headers,
|
||||
resp_chunk_size=64 * 2 ** 20)
|
||||
hasher = md5()
|
||||
for chunk in data:
|
||||
hasher.update(chunk)
|
||||
return headers, hasher.hexdigest()
|
||||
|
||||
def _check_node(self, node, part, etag, headers_post):
|
||||
# get fragment archive etag
|
||||
headers, fragment_archive_etag = self.direct_get(node, part)
|
||||
self.assertIn('X-Backend-Durable-Timestamp', headers) # sanity check
|
||||
durable_timestamp = headers['X-Backend-Durable-Timestamp']
|
||||
|
||||
# make the data file non-durable on the selected node
|
||||
part_dir = self.storage_dir('object', node, part=part)
|
||||
for dirs, subdirs, files in os.walk(part_dir):
|
||||
for fname in files:
|
||||
if fname.endswith('.data'):
|
||||
non_durable_fname = fname.replace('#d', '')
|
||||
os.rename(os.path.join(dirs, fname),
|
||||
os.path.join(dirs, non_durable_fname))
|
||||
try:
|
||||
os.remove(os.path.join(part_dir, 'hashes.pkl'))
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
# sanity check that fragment is no longer durable
|
||||
headers = direct_client.direct_head_object(
|
||||
node, part, self.account, self.container_name, self.object_name,
|
||||
headers={'X-Backend-Storage-Policy-Index': int(self.policy),
|
||||
'X-Backend-Fragment-Preferences': json.dumps([])})
|
||||
self.assertNotIn('X-Backend-Durable-Timestamp', headers)
|
||||
|
||||
# fire up reconstructor to propagate durable state
|
||||
self.reconstructor.once()
|
||||
|
||||
# fragment is still exactly as it was before!
|
||||
headers, fragment_archive_etag_2 = self.direct_get(node, part)
|
||||
self.assertEqual(fragment_archive_etag, fragment_archive_etag_2)
|
||||
self.assertIn('X-Backend-Durable-Timestamp', headers)
|
||||
self.assertEqual(durable_timestamp,
|
||||
headers['X-Backend-Durable-Timestamp'])
|
||||
|
||||
# check meta
|
||||
meta = client.head_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name)
|
||||
for key in headers_post:
|
||||
self.assertIn(key, meta)
|
||||
self.assertEqual(meta[key], headers_post[key])
|
||||
|
||||
def _format_node(self, node):
|
||||
return '%s#%s' % (node['device'], node['index'])
|
||||
|
||||
def test_main(self):
|
||||
# create EC container
|
||||
headers = {'X-Storage-Policy': self.policy.name}
|
||||
client.put_container(self.url, self.token, self.container_name,
|
||||
headers=headers)
|
||||
|
||||
# PUT object
|
||||
contents = Body()
|
||||
headers = {'x-object-meta-foo': 'meta-foo'}
|
||||
headers_post = {'x-object-meta-bar': 'meta-bar'}
|
||||
|
||||
etag = client.put_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name,
|
||||
contents=contents, headers=headers)
|
||||
client.post_object(self.url, self.token, self.container_name,
|
||||
self.object_name, headers=headers_post)
|
||||
del headers_post['X-Auth-Token'] # WTF, where did this come from?
|
||||
|
||||
# built up a list of node lists to make non-durable,
|
||||
# first try a single node
|
||||
# then adjacent nodes and then nodes >1 node apart
|
||||
opart, onodes = self.object_ring.get_nodes(
|
||||
self.account, self.container_name, self.object_name)
|
||||
single_node = [random.choice(onodes)]
|
||||
adj_nodes = [onodes[0], onodes[-1]]
|
||||
far_nodes = [onodes[0], onodes[-2]]
|
||||
test_list = [single_node, adj_nodes, far_nodes]
|
||||
|
||||
for node_list in test_list:
|
||||
for onode in node_list:
|
||||
try:
|
||||
self._check_node(onode, opart, etag, headers_post)
|
||||
except AssertionError as e:
|
||||
self.fail(
|
||||
str(e) + '\n... for node %r of scenario %r' % (
|
||||
self._format_node(onode),
|
||||
[self._format_node(n) for n in node_list]))
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
@ -14,12 +14,16 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import errno
|
||||
import json
|
||||
from contextlib import contextmanager
|
||||
from hashlib import md5
|
||||
import unittest
|
||||
import uuid
|
||||
import shutil
|
||||
import random
|
||||
from collections import defaultdict
|
||||
import os
|
||||
|
||||
from test.probe.common import ECProbeTest
|
||||
|
||||
@ -28,7 +32,7 @@ from swift.common.storage_policy import EC_POLICY
|
||||
from swift.common.manager import Manager
|
||||
from swift.obj.reconstructor import _get_partners
|
||||
|
||||
from swiftclient import client
|
||||
from swiftclient import client, ClientException
|
||||
|
||||
|
||||
class Body(object):
|
||||
@ -67,6 +71,34 @@ class TestReconstructorRebuild(ECProbeTest):
|
||||
self.assertEqual(self.policy.policy_type, EC_POLICY)
|
||||
self.reconstructor = Manager(["object-reconstructor"])
|
||||
|
||||
# create EC container
|
||||
headers = {'X-Storage-Policy': self.policy.name}
|
||||
client.put_container(self.url, self.token, self.container_name,
|
||||
headers=headers)
|
||||
|
||||
# PUT object and POST some metadata
|
||||
contents = Body()
|
||||
headers = {'x-object-meta-foo': 'meta-foo'}
|
||||
self.headers_post = {'x-object-meta-bar': 'meta-bar'}
|
||||
|
||||
self.etag = client.put_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name,
|
||||
contents=contents, headers=headers)
|
||||
client.post_object(self.url, self.token, self.container_name,
|
||||
self.object_name, headers=dict(self.headers_post))
|
||||
|
||||
self.opart, self.onodes = self.object_ring.get_nodes(
|
||||
self.account, self.container_name, self.object_name)
|
||||
|
||||
# stash frag etags and metadata for later comparison
|
||||
self.frag_headers, self.frag_etags = self._assert_all_nodes_have_frag()
|
||||
for node_index, hdrs in self.frag_headers.items():
|
||||
# sanity check
|
||||
self.assertIn(
|
||||
'X-Backend-Durable-Timestamp', hdrs,
|
||||
'Missing durable timestamp in %r' % self.frag_headers)
|
||||
|
||||
def proxy_get(self):
|
||||
# GET object
|
||||
headers, body = client.get_object(self.url, self.token,
|
||||
@ -76,10 +108,13 @@ class TestReconstructorRebuild(ECProbeTest):
|
||||
resp_checksum = md5()
|
||||
for chunk in body:
|
||||
resp_checksum.update(chunk)
|
||||
return resp_checksum.hexdigest()
|
||||
return headers, resp_checksum.hexdigest()
|
||||
|
||||
def direct_get(self, node, part):
|
||||
def direct_get(self, node, part, require_durable=True):
|
||||
req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
|
||||
if not require_durable:
|
||||
req_headers.update(
|
||||
{'X-Backend-Fragment-Preferences': json.dumps([])})
|
||||
headers, data = direct_client.direct_get_object(
|
||||
node, part, self.account, self.container_name,
|
||||
self.object_name, headers=req_headers,
|
||||
@ -87,106 +122,168 @@ class TestReconstructorRebuild(ECProbeTest):
|
||||
hasher = md5()
|
||||
for chunk in data:
|
||||
hasher.update(chunk)
|
||||
return hasher.hexdigest()
|
||||
return headers, hasher.hexdigest()
|
||||
|
||||
def _check_node(self, node, part, etag, headers_post):
|
||||
# get fragment archive etag
|
||||
fragment_archive_etag = self.direct_get(node, part)
|
||||
|
||||
# remove data from the selected node
|
||||
part_dir = self.storage_dir('object', node, part=part)
|
||||
shutil.rmtree(part_dir, True)
|
||||
|
||||
# this node can't servce the data any more
|
||||
try:
|
||||
self.direct_get(node, part)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 404)
|
||||
else:
|
||||
self.fail('Node data on %r was not fully destoryed!' %
|
||||
(node,))
|
||||
|
||||
# make sure we can still GET the object and its correct, the
|
||||
# proxy is doing decode on remaining fragments to get the obj
|
||||
self.assertEqual(etag, self.proxy_get())
|
||||
|
||||
# fire up reconstructor
|
||||
self.reconstructor.once()
|
||||
|
||||
# fragment is rebuilt exactly as it was before!
|
||||
self.assertEqual(fragment_archive_etag,
|
||||
self.direct_get(node, part))
|
||||
|
||||
# check meta
|
||||
meta = client.head_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name)
|
||||
for key in headers_post:
|
||||
self.assertIn(key, meta)
|
||||
self.assertEqual(meta[key], headers_post[key])
|
||||
def _break_nodes(self, failed, non_durable):
|
||||
# delete partitions on the failed nodes and remove durable marker from
|
||||
# non-durable nodes
|
||||
for i, node in enumerate(self.onodes):
|
||||
part_dir = self.storage_dir('object', node, part=self.opart)
|
||||
if i in failed:
|
||||
shutil.rmtree(part_dir, True)
|
||||
try:
|
||||
self.direct_get(node, self.opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 404)
|
||||
elif i in non_durable:
|
||||
for dirs, subdirs, files in os.walk(part_dir):
|
||||
for fname in files:
|
||||
if fname.endswith('.data'):
|
||||
non_durable_fname = fname.replace('#d', '')
|
||||
os.rename(os.path.join(dirs, fname),
|
||||
os.path.join(dirs, non_durable_fname))
|
||||
break
|
||||
headers, etag = self.direct_get(node, self.opart,
|
||||
require_durable=False)
|
||||
self.assertNotIn('X-Backend-Durable-Timestamp', headers)
|
||||
try:
|
||||
os.remove(os.path.join(part_dir, 'hashes.pkl'))
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
def _format_node(self, node):
|
||||
return '%s#%s' % (node['device'], node['index'])
|
||||
|
||||
def test_main(self):
|
||||
# create EC container
|
||||
headers = {'X-Storage-Policy': self.policy.name}
|
||||
client.put_container(self.url, self.token, self.container_name,
|
||||
headers=headers)
|
||||
def _assert_all_nodes_have_frag(self):
|
||||
# check all frags are in place
|
||||
failures = []
|
||||
frag_etags = {}
|
||||
frag_headers = {}
|
||||
for node in self.onodes:
|
||||
try:
|
||||
headers, etag = self.direct_get(node, self.opart)
|
||||
frag_etags[node['index']] = etag
|
||||
del headers['Date'] # Date header will vary so remove it
|
||||
frag_headers[node['index']] = headers
|
||||
except direct_client.DirectClientException as err:
|
||||
failures.append((node, err))
|
||||
if failures:
|
||||
self.fail('\n'.join([' Node %r raised %r' %
|
||||
(self._format_node(node), exc)
|
||||
for (node, exc) in failures]))
|
||||
return frag_headers, frag_etags
|
||||
|
||||
# PUT object
|
||||
contents = Body()
|
||||
headers = {'x-object-meta-foo': 'meta-foo'}
|
||||
headers_post = {'x-object-meta-bar': 'meta-bar'}
|
||||
@contextmanager
|
||||
def _annotate_failure_with_scenario(self, failed, non_durable):
|
||||
try:
|
||||
yield
|
||||
except (AssertionError, ClientException) as err:
|
||||
self.fail(
|
||||
'Scenario with failed nodes: %r, non-durable nodes: %r\n'
|
||||
' failed with:\n%s' %
|
||||
([self._format_node(self.onodes[n]) for n in failed],
|
||||
[self._format_node(self.onodes[n]) for n in non_durable], err)
|
||||
)
|
||||
|
||||
etag = client.put_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name,
|
||||
contents=contents, headers=headers)
|
||||
client.post_object(self.url, self.token, self.container_name,
|
||||
self.object_name, headers=headers_post)
|
||||
del headers_post['X-Auth-Token'] # WTF, where did this come from?
|
||||
def _test_rebuild_scenario(self, failed, non_durable,
|
||||
reconstructor_cycles):
|
||||
# helper method to test a scenario with some nodes missing their
|
||||
# fragment and some nodes having non-durable fragments
|
||||
with self._annotate_failure_with_scenario(failed, non_durable):
|
||||
self._break_nodes(failed, non_durable)
|
||||
|
||||
# built up a list of node lists to kill data from,
|
||||
# make sure we can still GET the object and it is correct; the
|
||||
# proxy is doing decode on remaining fragments to get the obj
|
||||
with self._annotate_failure_with_scenario(failed, non_durable):
|
||||
headers, etag = self.proxy_get()
|
||||
self.assertEqual(self.etag, etag)
|
||||
for key in self.headers_post:
|
||||
self.assertIn(key, headers)
|
||||
self.assertEqual(self.headers_post[key], headers[key])
|
||||
|
||||
# fire up reconstructor
|
||||
for i in range(reconstructor_cycles):
|
||||
self.reconstructor.once()
|
||||
|
||||
# check GET via proxy returns expected data and metadata
|
||||
with self._annotate_failure_with_scenario(failed, non_durable):
|
||||
headers, etag = self.proxy_get()
|
||||
self.assertEqual(self.etag, etag)
|
||||
for key in self.headers_post:
|
||||
self.assertIn(key, headers)
|
||||
self.assertEqual(self.headers_post[key], headers[key])
|
||||
# check all frags are intact, durable and have expected metadata
|
||||
with self._annotate_failure_with_scenario(failed, non_durable):
|
||||
frag_headers, frag_etags = self._assert_all_nodes_have_frag()
|
||||
self.assertEqual(self.frag_etags, frag_etags)
|
||||
# self._frag_headers include X-Backend-Durable-Timestamp so this
|
||||
# assertion confirms that the rebuilt frags are all durable
|
||||
self.assertEqual(self.frag_headers, frag_headers)
|
||||
|
||||
def test_rebuild_missing_frags(self):
|
||||
# build up a list of node lists to kill data from,
|
||||
# first try a single node
|
||||
# then adjacent nodes and then nodes >1 node apart
|
||||
opart, onodes = self.object_ring.get_nodes(
|
||||
self.account, self.container_name, self.object_name)
|
||||
single_node = [random.choice(onodes)]
|
||||
adj_nodes = [onodes[0], onodes[-1]]
|
||||
far_nodes = [onodes[0], onodes[-2]]
|
||||
test_list = [single_node, adj_nodes, far_nodes]
|
||||
single_node = (random.randint(0, 5),)
|
||||
adj_nodes = (0, 5)
|
||||
far_nodes = (0, 4)
|
||||
|
||||
for node_list in test_list:
|
||||
for onode in node_list:
|
||||
try:
|
||||
self._check_node(onode, opart, etag, headers_post)
|
||||
except AssertionError as e:
|
||||
self.fail(
|
||||
str(e) + '\n... for node %r of scenario %r' % (
|
||||
self._format_node(onode),
|
||||
[self._format_node(n) for n in node_list]))
|
||||
for failed_nodes in [single_node, adj_nodes, far_nodes]:
|
||||
self._test_rebuild_scenario(failed_nodes, [], 1)
|
||||
|
||||
def test_rebuild_non_durable_frags(self):
|
||||
# build up a list of node lists to make non-durable,
|
||||
# first try a single node
|
||||
# then adjacent nodes and then nodes >1 node apart
|
||||
single_node = (random.randint(0, 5),)
|
||||
adj_nodes = (0, 5)
|
||||
far_nodes = (0, 4)
|
||||
|
||||
for non_durable_nodes in [single_node, adj_nodes, far_nodes]:
|
||||
self._test_rebuild_scenario([], non_durable_nodes, 1)
|
||||
|
||||
def test_rebuild_with_missing_frags_and_non_durable_frags(self):
|
||||
# pick some nodes with parts deleted, some with non-durable fragments
|
||||
scenarios = [
|
||||
# failed, non-durable
|
||||
((0, 2), (4,)),
|
||||
((0, 4), (2,)),
|
||||
]
|
||||
for failed, non_durable in scenarios:
|
||||
self._test_rebuild_scenario(failed, non_durable, 3)
|
||||
scenarios = [
|
||||
# failed, non-durable
|
||||
((0, 1), (2,)),
|
||||
((0, 2), (1,)),
|
||||
]
|
||||
for failed, non_durable in scenarios:
|
||||
# why 2 repeats? consider missing fragment on nodes 0, 1 and
|
||||
# missing durable on node 2: first reconstructor cycle on node 3
|
||||
# will make node 2 durable, first cycle on node 5 will rebuild on
|
||||
# node 0; second cycle on node 0 or 2 will rebuild on node 1. Note
|
||||
# that it is possible, that reconstructor processes on each node
|
||||
# run in order such that all rebuild complete in once cycle, but
|
||||
# that is not guaranteed, we allow 2 cycles to be sure.
|
||||
self._test_rebuild_scenario(failed, non_durable, 2)
|
||||
scenarios = [
|
||||
# failed, non-durable
|
||||
((0, 2), (1, 3, 5)),
|
||||
((0,), (1, 2, 4, 5)),
|
||||
]
|
||||
for failed, non_durable in scenarios:
|
||||
# why 3 repeats? consider missing fragment on node 0 and single
|
||||
# durable on node 3: first reconstructor cycle on node 3 will make
|
||||
# nodes 2 and 4 durable, second cycle on nodes 2 and 4 will make
|
||||
# node 1 and 5 durable, third cycle on nodes 1 or 5 will
|
||||
# reconstruct the missing fragment on node 0.
|
||||
self._test_rebuild_scenario(failed, non_durable, 3)
|
||||
|
||||
def test_rebuild_partner_down(self):
|
||||
# create EC container
|
||||
headers = {'X-Storage-Policy': self.policy.name}
|
||||
client.put_container(self.url, self.token, self.container_name,
|
||||
headers=headers)
|
||||
|
||||
# PUT object
|
||||
contents = Body()
|
||||
client.put_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name,
|
||||
contents=contents)
|
||||
|
||||
opart, onodes = self.object_ring.get_nodes(
|
||||
self.account, self.container_name, self.object_name)
|
||||
|
||||
# find a primary server that only has one of it's devices in the
|
||||
# primary node list
|
||||
group_nodes_by_config = defaultdict(list)
|
||||
for n in onodes:
|
||||
for n in self.onodes:
|
||||
group_nodes_by_config[self.config_number(n)].append(n)
|
||||
for config_number, node_list in group_nodes_by_config.items():
|
||||
if len(node_list) == 1:
|
||||
@ -197,27 +294,32 @@ class TestReconstructorRebuild(ECProbeTest):
|
||||
|
||||
# pick one it's partners to fail randomly
|
||||
partner_node = random.choice(_get_partners(
|
||||
primary_node['index'], onodes))
|
||||
primary_node['index'], self.onodes))
|
||||
|
||||
# 507 the partner device
|
||||
device_path = self.device_dir('object', partner_node)
|
||||
self.kill_drive(device_path)
|
||||
|
||||
# select another primary sync_to node to fail
|
||||
failed_primary = [n for n in onodes if n['id'] not in
|
||||
failed_primary = [n for n in self.onodes if n['id'] not in
|
||||
(primary_node['id'], partner_node['id'])][0]
|
||||
# ... capture it's fragment etag
|
||||
failed_primary_etag = self.direct_get(failed_primary, opart)
|
||||
failed_primary_meta, failed_primary_etag = self.direct_get(
|
||||
failed_primary, self.opart)
|
||||
# ... and delete it
|
||||
part_dir = self.storage_dir('object', failed_primary, part=opart)
|
||||
part_dir = self.storage_dir('object', failed_primary, part=self.opart)
|
||||
shutil.rmtree(part_dir, True)
|
||||
|
||||
# reconstruct from the primary, while one of it's partners is 507'd
|
||||
self.reconstructor.once(number=self.config_number(primary_node))
|
||||
|
||||
# the other failed primary will get it's fragment rebuilt instead
|
||||
self.assertEqual(failed_primary_etag,
|
||||
self.direct_get(failed_primary, opart))
|
||||
failed_primary_meta_new, failed_primary_etag_new = self.direct_get(
|
||||
failed_primary, self.opart)
|
||||
del failed_primary_meta['Date']
|
||||
del failed_primary_meta_new['Date']
|
||||
self.assertEqual(failed_primary_etag, failed_primary_etag_new)
|
||||
self.assertEqual(failed_primary_meta, failed_primary_meta_new)
|
||||
|
||||
# just to be nice
|
||||
self.revive_drive(device_path)
|
||||
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
import itertools
|
||||
import json
|
||||
import unittest
|
||||
import os
|
||||
from hashlib import md5
|
||||
@ -2508,6 +2509,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
'name': '/a/c/o',
|
||||
'Content-Length': '0',
|
||||
'ETag': 'etag',
|
||||
'X-Timestamp': '1234567890.12345'
|
||||
}
|
||||
|
||||
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
||||
@ -2541,16 +2543,24 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
job, node, metadata)
|
||||
self.assertEqual(0, df.content_length)
|
||||
fixed_body = ''.join(df.reader())
|
||||
self.assertEqual(len(fixed_body), len(broken_body))
|
||||
self.assertEqual(md5(fixed_body).hexdigest(),
|
||||
md5(broken_body).hexdigest())
|
||||
for called_header in called_headers:
|
||||
called_header = HeaderKeyDict(called_header)
|
||||
self.assertTrue('Content-Length' in called_header)
|
||||
self.assertEqual(called_header['Content-Length'], '0')
|
||||
self.assertTrue('User-Agent' in called_header)
|
||||
user_agent = called_header['User-Agent']
|
||||
self.assertTrue(user_agent.startswith('obj-reconstructor'))
|
||||
self.assertEqual(len(fixed_body), len(broken_body))
|
||||
self.assertEqual(md5(fixed_body).hexdigest(),
|
||||
md5(broken_body).hexdigest())
|
||||
self.assertEqual(len(part_nodes) - 1, len(called_headers))
|
||||
for called_header in called_headers:
|
||||
called_header = HeaderKeyDict(called_header)
|
||||
self.assertIn('Content-Length', called_header)
|
||||
self.assertEqual(called_header['Content-Length'], '0')
|
||||
self.assertIn('User-Agent', called_header)
|
||||
user_agent = called_header['User-Agent']
|
||||
self.assertTrue(user_agent.startswith('obj-reconstructor'))
|
||||
self.assertIn('X-Backend-Storage-Policy-Index', called_header)
|
||||
self.assertEqual(called_header['X-Backend-Storage-Policy-Index'],
|
||||
self.policy)
|
||||
self.assertIn('X-Backend-Fragment-Preferences', called_header)
|
||||
self.assertEqual(
|
||||
[{'timestamp': '1234567890.12345', 'exclude': []}],
|
||||
json.loads(called_header['X-Backend-Fragment-Preferences']))
|
||||
|
||||
def test_reconstruct_fa_errors_works(self):
|
||||
job = {
|
||||
@ -2563,6 +2573,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
'name': '/a/c/o',
|
||||
'Content-Length': 0,
|
||||
'ETag': 'etag',
|
||||
'X-Timestamp': '1234567890.12345'
|
||||
}
|
||||
|
||||
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
||||
@ -2604,6 +2615,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
'name': '/a/c/o',
|
||||
'Content-Length': 0,
|
||||
'ETag': 'etag',
|
||||
'X-Timestamp': '1234567890.12345'
|
||||
}
|
||||
|
||||
# make up some data (trim some amount to make it unaligned with
|
||||
@ -2647,6 +2659,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
'name': '/a/c/o',
|
||||
'Content-Length': 0,
|
||||
'ETag': 'etag',
|
||||
'X-Timestamp': '1234567890.12345'
|
||||
}
|
||||
|
||||
possible_errors = [404, Timeout(), Exception('kaboom!')]
|
||||
@ -2667,6 +2680,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
'name': '/a/c/o',
|
||||
'Content-Length': 0,
|
||||
'ETag': 'etag',
|
||||
'X-Timestamp': '1234567890.12345'
|
||||
}
|
||||
|
||||
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
||||
@ -2717,6 +2731,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
'name': '/a/c/o',
|
||||
'Content-Length': 0,
|
||||
'ETag': 'etag',
|
||||
'X-Timestamp': '1234567890.12345'
|
||||
}
|
||||
|
||||
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
||||
@ -2768,6 +2783,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
'name': '/a/c/o',
|
||||
'Content-Length': 0,
|
||||
'ETag': 'etag',
|
||||
'X-Timestamp': '1234567890.12345'
|
||||
}
|
||||
|
||||
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
||||
@ -2809,6 +2825,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
||||
'name': '/a/c/o',
|
||||
'Content-Length': 0,
|
||||
'ETag': 'etag',
|
||||
'X-Timestamp': '1234567890.12345'
|
||||
}
|
||||
|
||||
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
||||
|
Loading…
Reference in New Issue
Block a user