Merge "EC: reconstruct using non-durable fragments"
This commit is contained in:
commit
78527aaf7a
@ -13,6 +13,7 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import json
|
||||||
import os
|
import os
|
||||||
from os.path import join
|
from os.path import join
|
||||||
import random
|
import random
|
||||||
@ -244,9 +245,15 @@ class ObjectReconstructor(Daemon):
|
|||||||
# of the node we're rebuilding to within the primary part list
|
# of the node we're rebuilding to within the primary part list
|
||||||
fi_to_rebuild = node['index']
|
fi_to_rebuild = node['index']
|
||||||
|
|
||||||
# KISS send out connection requests to all nodes, see what sticks
|
# KISS send out connection requests to all nodes, see what sticks.
|
||||||
|
# Use fragment preferences header to tell other nodes that we want
|
||||||
|
# fragments at the same timestamp as our fragment, and that they don't
|
||||||
|
# need to be durable.
|
||||||
headers = self.headers.copy()
|
headers = self.headers.copy()
|
||||||
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
|
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
|
||||||
|
frag_prefs = [{'timestamp': datafile_metadata['X-Timestamp'],
|
||||||
|
'exclude': []}]
|
||||||
|
headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs)
|
||||||
pile = GreenAsyncPile(len(part_nodes))
|
pile = GreenAsyncPile(len(part_nodes))
|
||||||
path = datafile_metadata['name']
|
path = datafile_metadata['name']
|
||||||
for node in part_nodes:
|
for node in part_nodes:
|
||||||
|
@ -1,169 +0,0 @@
|
|||||||
#!/usr/bin/python -u
|
|
||||||
# Copyright (c) 2010-2012 OpenStack Foundation
|
|
||||||
#
|
|
||||||
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
# you may not use this file except in compliance with the License.
|
|
||||||
# You may obtain a copy of the License at
|
|
||||||
#
|
|
||||||
# http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
#
|
|
||||||
# Unless required by applicable law or agreed to in writing, software
|
|
||||||
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
||||||
# implied.
|
|
||||||
# See the License for the specific language governing permissions and
|
|
||||||
# limitations under the License.
|
|
||||||
import json
|
|
||||||
from hashlib import md5
|
|
||||||
import unittest
|
|
||||||
import uuid
|
|
||||||
import random
|
|
||||||
import os
|
|
||||||
import errno
|
|
||||||
|
|
||||||
from test.probe.common import ECProbeTest
|
|
||||||
|
|
||||||
from swift.common import direct_client
|
|
||||||
from swift.common.storage_policy import EC_POLICY
|
|
||||||
from swift.common.manager import Manager
|
|
||||||
|
|
||||||
from swiftclient import client
|
|
||||||
|
|
||||||
|
|
||||||
class Body(object):
|
|
||||||
|
|
||||||
def __init__(self, total=3.5 * 2 ** 20):
|
|
||||||
self.total = total
|
|
||||||
self.hasher = md5()
|
|
||||||
self.size = 0
|
|
||||||
self.chunk = 'test' * 16 * 2 ** 10
|
|
||||||
|
|
||||||
@property
|
|
||||||
def etag(self):
|
|
||||||
return self.hasher.hexdigest()
|
|
||||||
|
|
||||||
def __iter__(self):
|
|
||||||
return self
|
|
||||||
|
|
||||||
def next(self):
|
|
||||||
if self.size > self.total:
|
|
||||||
raise StopIteration()
|
|
||||||
self.size += len(self.chunk)
|
|
||||||
self.hasher.update(self.chunk)
|
|
||||||
return self.chunk
|
|
||||||
|
|
||||||
def __next__(self):
|
|
||||||
return next(self)
|
|
||||||
|
|
||||||
|
|
||||||
class TestReconstructorPropDurable(ECProbeTest):
|
|
||||||
|
|
||||||
def setUp(self):
|
|
||||||
super(TestReconstructorPropDurable, self).setUp()
|
|
||||||
self.container_name = 'container-%s' % uuid.uuid4()
|
|
||||||
self.object_name = 'object-%s' % uuid.uuid4()
|
|
||||||
# sanity
|
|
||||||
self.assertEqual(self.policy.policy_type, EC_POLICY)
|
|
||||||
self.reconstructor = Manager(["object-reconstructor"])
|
|
||||||
|
|
||||||
def direct_get(self, node, part):
|
|
||||||
req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
|
|
||||||
headers, data = direct_client.direct_get_object(
|
|
||||||
node, part, self.account, self.container_name,
|
|
||||||
self.object_name, headers=req_headers,
|
|
||||||
resp_chunk_size=64 * 2 ** 20)
|
|
||||||
hasher = md5()
|
|
||||||
for chunk in data:
|
|
||||||
hasher.update(chunk)
|
|
||||||
return headers, hasher.hexdigest()
|
|
||||||
|
|
||||||
def _check_node(self, node, part, etag, headers_post):
|
|
||||||
# get fragment archive etag
|
|
||||||
headers, fragment_archive_etag = self.direct_get(node, part)
|
|
||||||
self.assertIn('X-Backend-Durable-Timestamp', headers) # sanity check
|
|
||||||
durable_timestamp = headers['X-Backend-Durable-Timestamp']
|
|
||||||
|
|
||||||
# make the data file non-durable on the selected node
|
|
||||||
part_dir = self.storage_dir('object', node, part=part)
|
|
||||||
for dirs, subdirs, files in os.walk(part_dir):
|
|
||||||
for fname in files:
|
|
||||||
if fname.endswith('.data'):
|
|
||||||
non_durable_fname = fname.replace('#d', '')
|
|
||||||
os.rename(os.path.join(dirs, fname),
|
|
||||||
os.path.join(dirs, non_durable_fname))
|
|
||||||
try:
|
|
||||||
os.remove(os.path.join(part_dir, 'hashes.pkl'))
|
|
||||||
except OSError as e:
|
|
||||||
if e.errno != errno.ENOENT:
|
|
||||||
raise
|
|
||||||
|
|
||||||
# sanity check that fragment is no longer durable
|
|
||||||
headers = direct_client.direct_head_object(
|
|
||||||
node, part, self.account, self.container_name, self.object_name,
|
|
||||||
headers={'X-Backend-Storage-Policy-Index': int(self.policy),
|
|
||||||
'X-Backend-Fragment-Preferences': json.dumps([])})
|
|
||||||
self.assertNotIn('X-Backend-Durable-Timestamp', headers)
|
|
||||||
|
|
||||||
# fire up reconstructor to propagate durable state
|
|
||||||
self.reconstructor.once()
|
|
||||||
|
|
||||||
# fragment is still exactly as it was before!
|
|
||||||
headers, fragment_archive_etag_2 = self.direct_get(node, part)
|
|
||||||
self.assertEqual(fragment_archive_etag, fragment_archive_etag_2)
|
|
||||||
self.assertIn('X-Backend-Durable-Timestamp', headers)
|
|
||||||
self.assertEqual(durable_timestamp,
|
|
||||||
headers['X-Backend-Durable-Timestamp'])
|
|
||||||
|
|
||||||
# check meta
|
|
||||||
meta = client.head_object(self.url, self.token,
|
|
||||||
self.container_name,
|
|
||||||
self.object_name)
|
|
||||||
for key in headers_post:
|
|
||||||
self.assertIn(key, meta)
|
|
||||||
self.assertEqual(meta[key], headers_post[key])
|
|
||||||
|
|
||||||
def _format_node(self, node):
|
|
||||||
return '%s#%s' % (node['device'], node['index'])
|
|
||||||
|
|
||||||
def test_main(self):
|
|
||||||
# create EC container
|
|
||||||
headers = {'X-Storage-Policy': self.policy.name}
|
|
||||||
client.put_container(self.url, self.token, self.container_name,
|
|
||||||
headers=headers)
|
|
||||||
|
|
||||||
# PUT object
|
|
||||||
contents = Body()
|
|
||||||
headers = {'x-object-meta-foo': 'meta-foo'}
|
|
||||||
headers_post = {'x-object-meta-bar': 'meta-bar'}
|
|
||||||
|
|
||||||
etag = client.put_object(self.url, self.token,
|
|
||||||
self.container_name,
|
|
||||||
self.object_name,
|
|
||||||
contents=contents, headers=headers)
|
|
||||||
client.post_object(self.url, self.token, self.container_name,
|
|
||||||
self.object_name, headers=headers_post)
|
|
||||||
del headers_post['X-Auth-Token'] # WTF, where did this come from?
|
|
||||||
|
|
||||||
# built up a list of node lists to make non-durable,
|
|
||||||
# first try a single node
|
|
||||||
# then adjacent nodes and then nodes >1 node apart
|
|
||||||
opart, onodes = self.object_ring.get_nodes(
|
|
||||||
self.account, self.container_name, self.object_name)
|
|
||||||
single_node = [random.choice(onodes)]
|
|
||||||
adj_nodes = [onodes[0], onodes[-1]]
|
|
||||||
far_nodes = [onodes[0], onodes[-2]]
|
|
||||||
test_list = [single_node, adj_nodes, far_nodes]
|
|
||||||
|
|
||||||
for node_list in test_list:
|
|
||||||
for onode in node_list:
|
|
||||||
try:
|
|
||||||
self._check_node(onode, opart, etag, headers_post)
|
|
||||||
except AssertionError as e:
|
|
||||||
self.fail(
|
|
||||||
str(e) + '\n... for node %r of scenario %r' % (
|
|
||||||
self._format_node(onode),
|
|
||||||
[self._format_node(n) for n in node_list]))
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
unittest.main()
|
|
@ -14,12 +14,16 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
|
|
||||||
|
import errno
|
||||||
|
import json
|
||||||
|
from contextlib import contextmanager
|
||||||
from hashlib import md5
|
from hashlib import md5
|
||||||
import unittest
|
import unittest
|
||||||
import uuid
|
import uuid
|
||||||
import shutil
|
import shutil
|
||||||
import random
|
import random
|
||||||
from collections import defaultdict
|
from collections import defaultdict
|
||||||
|
import os
|
||||||
|
|
||||||
from test.probe.common import ECProbeTest
|
from test.probe.common import ECProbeTest
|
||||||
|
|
||||||
@ -28,7 +32,7 @@ from swift.common.storage_policy import EC_POLICY
|
|||||||
from swift.common.manager import Manager
|
from swift.common.manager import Manager
|
||||||
from swift.obj.reconstructor import _get_partners
|
from swift.obj.reconstructor import _get_partners
|
||||||
|
|
||||||
from swiftclient import client
|
from swiftclient import client, ClientException
|
||||||
|
|
||||||
|
|
||||||
class Body(object):
|
class Body(object):
|
||||||
@ -67,6 +71,34 @@ class TestReconstructorRebuild(ECProbeTest):
|
|||||||
self.assertEqual(self.policy.policy_type, EC_POLICY)
|
self.assertEqual(self.policy.policy_type, EC_POLICY)
|
||||||
self.reconstructor = Manager(["object-reconstructor"])
|
self.reconstructor = Manager(["object-reconstructor"])
|
||||||
|
|
||||||
|
# create EC container
|
||||||
|
headers = {'X-Storage-Policy': self.policy.name}
|
||||||
|
client.put_container(self.url, self.token, self.container_name,
|
||||||
|
headers=headers)
|
||||||
|
|
||||||
|
# PUT object and POST some metadata
|
||||||
|
contents = Body()
|
||||||
|
headers = {'x-object-meta-foo': 'meta-foo'}
|
||||||
|
self.headers_post = {'x-object-meta-bar': 'meta-bar'}
|
||||||
|
|
||||||
|
self.etag = client.put_object(self.url, self.token,
|
||||||
|
self.container_name,
|
||||||
|
self.object_name,
|
||||||
|
contents=contents, headers=headers)
|
||||||
|
client.post_object(self.url, self.token, self.container_name,
|
||||||
|
self.object_name, headers=dict(self.headers_post))
|
||||||
|
|
||||||
|
self.opart, self.onodes = self.object_ring.get_nodes(
|
||||||
|
self.account, self.container_name, self.object_name)
|
||||||
|
|
||||||
|
# stash frag etags and metadata for later comparison
|
||||||
|
self.frag_headers, self.frag_etags = self._assert_all_nodes_have_frag()
|
||||||
|
for node_index, hdrs in self.frag_headers.items():
|
||||||
|
# sanity check
|
||||||
|
self.assertIn(
|
||||||
|
'X-Backend-Durable-Timestamp', hdrs,
|
||||||
|
'Missing durable timestamp in %r' % self.frag_headers)
|
||||||
|
|
||||||
def proxy_get(self):
|
def proxy_get(self):
|
||||||
# GET object
|
# GET object
|
||||||
headers, body = client.get_object(self.url, self.token,
|
headers, body = client.get_object(self.url, self.token,
|
||||||
@ -76,10 +108,13 @@ class TestReconstructorRebuild(ECProbeTest):
|
|||||||
resp_checksum = md5()
|
resp_checksum = md5()
|
||||||
for chunk in body:
|
for chunk in body:
|
||||||
resp_checksum.update(chunk)
|
resp_checksum.update(chunk)
|
||||||
return resp_checksum.hexdigest()
|
return headers, resp_checksum.hexdigest()
|
||||||
|
|
||||||
def direct_get(self, node, part):
|
def direct_get(self, node, part, require_durable=True):
|
||||||
req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
|
req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
|
||||||
|
if not require_durable:
|
||||||
|
req_headers.update(
|
||||||
|
{'X-Backend-Fragment-Preferences': json.dumps([])})
|
||||||
headers, data = direct_client.direct_get_object(
|
headers, data = direct_client.direct_get_object(
|
||||||
node, part, self.account, self.container_name,
|
node, part, self.account, self.container_name,
|
||||||
self.object_name, headers=req_headers,
|
self.object_name, headers=req_headers,
|
||||||
@ -87,106 +122,168 @@ class TestReconstructorRebuild(ECProbeTest):
|
|||||||
hasher = md5()
|
hasher = md5()
|
||||||
for chunk in data:
|
for chunk in data:
|
||||||
hasher.update(chunk)
|
hasher.update(chunk)
|
||||||
return hasher.hexdigest()
|
return headers, hasher.hexdigest()
|
||||||
|
|
||||||
def _check_node(self, node, part, etag, headers_post):
|
def _break_nodes(self, failed, non_durable):
|
||||||
# get fragment archive etag
|
# delete partitions on the failed nodes and remove durable marker from
|
||||||
fragment_archive_etag = self.direct_get(node, part)
|
# non-durable nodes
|
||||||
|
for i, node in enumerate(self.onodes):
|
||||||
# remove data from the selected node
|
part_dir = self.storage_dir('object', node, part=self.opart)
|
||||||
part_dir = self.storage_dir('object', node, part=part)
|
if i in failed:
|
||||||
shutil.rmtree(part_dir, True)
|
shutil.rmtree(part_dir, True)
|
||||||
|
try:
|
||||||
# this node can't servce the data any more
|
self.direct_get(node, self.opart)
|
||||||
try:
|
except direct_client.DirectClientException as err:
|
||||||
self.direct_get(node, part)
|
self.assertEqual(err.http_status, 404)
|
||||||
except direct_client.DirectClientException as err:
|
elif i in non_durable:
|
||||||
self.assertEqual(err.http_status, 404)
|
for dirs, subdirs, files in os.walk(part_dir):
|
||||||
else:
|
for fname in files:
|
||||||
self.fail('Node data on %r was not fully destoryed!' %
|
if fname.endswith('.data'):
|
||||||
(node,))
|
non_durable_fname = fname.replace('#d', '')
|
||||||
|
os.rename(os.path.join(dirs, fname),
|
||||||
# make sure we can still GET the object and its correct, the
|
os.path.join(dirs, non_durable_fname))
|
||||||
# proxy is doing decode on remaining fragments to get the obj
|
break
|
||||||
self.assertEqual(etag, self.proxy_get())
|
headers, etag = self.direct_get(node, self.opart,
|
||||||
|
require_durable=False)
|
||||||
# fire up reconstructor
|
self.assertNotIn('X-Backend-Durable-Timestamp', headers)
|
||||||
self.reconstructor.once()
|
try:
|
||||||
|
os.remove(os.path.join(part_dir, 'hashes.pkl'))
|
||||||
# fragment is rebuilt exactly as it was before!
|
except OSError as e:
|
||||||
self.assertEqual(fragment_archive_etag,
|
if e.errno != errno.ENOENT:
|
||||||
self.direct_get(node, part))
|
raise
|
||||||
|
|
||||||
# check meta
|
|
||||||
meta = client.head_object(self.url, self.token,
|
|
||||||
self.container_name,
|
|
||||||
self.object_name)
|
|
||||||
for key in headers_post:
|
|
||||||
self.assertIn(key, meta)
|
|
||||||
self.assertEqual(meta[key], headers_post[key])
|
|
||||||
|
|
||||||
def _format_node(self, node):
|
def _format_node(self, node):
|
||||||
return '%s#%s' % (node['device'], node['index'])
|
return '%s#%s' % (node['device'], node['index'])
|
||||||
|
|
||||||
def test_main(self):
|
def _assert_all_nodes_have_frag(self):
|
||||||
# create EC container
|
# check all frags are in place
|
||||||
headers = {'X-Storage-Policy': self.policy.name}
|
failures = []
|
||||||
client.put_container(self.url, self.token, self.container_name,
|
frag_etags = {}
|
||||||
headers=headers)
|
frag_headers = {}
|
||||||
|
for node in self.onodes:
|
||||||
|
try:
|
||||||
|
headers, etag = self.direct_get(node, self.opart)
|
||||||
|
frag_etags[node['index']] = etag
|
||||||
|
del headers['Date'] # Date header will vary so remove it
|
||||||
|
frag_headers[node['index']] = headers
|
||||||
|
except direct_client.DirectClientException as err:
|
||||||
|
failures.append((node, err))
|
||||||
|
if failures:
|
||||||
|
self.fail('\n'.join([' Node %r raised %r' %
|
||||||
|
(self._format_node(node), exc)
|
||||||
|
for (node, exc) in failures]))
|
||||||
|
return frag_headers, frag_etags
|
||||||
|
|
||||||
# PUT object
|
@contextmanager
|
||||||
contents = Body()
|
def _annotate_failure_with_scenario(self, failed, non_durable):
|
||||||
headers = {'x-object-meta-foo': 'meta-foo'}
|
try:
|
||||||
headers_post = {'x-object-meta-bar': 'meta-bar'}
|
yield
|
||||||
|
except (AssertionError, ClientException) as err:
|
||||||
|
self.fail(
|
||||||
|
'Scenario with failed nodes: %r, non-durable nodes: %r\n'
|
||||||
|
' failed with:\n%s' %
|
||||||
|
([self._format_node(self.onodes[n]) for n in failed],
|
||||||
|
[self._format_node(self.onodes[n]) for n in non_durable], err)
|
||||||
|
)
|
||||||
|
|
||||||
etag = client.put_object(self.url, self.token,
|
def _test_rebuild_scenario(self, failed, non_durable,
|
||||||
self.container_name,
|
reconstructor_cycles):
|
||||||
self.object_name,
|
# helper method to test a scenario with some nodes missing their
|
||||||
contents=contents, headers=headers)
|
# fragment and some nodes having non-durable fragments
|
||||||
client.post_object(self.url, self.token, self.container_name,
|
with self._annotate_failure_with_scenario(failed, non_durable):
|
||||||
self.object_name, headers=headers_post)
|
self._break_nodes(failed, non_durable)
|
||||||
del headers_post['X-Auth-Token'] # WTF, where did this come from?
|
|
||||||
|
|
||||||
# built up a list of node lists to kill data from,
|
# make sure we can still GET the object and it is correct; the
|
||||||
|
# proxy is doing decode on remaining fragments to get the obj
|
||||||
|
with self._annotate_failure_with_scenario(failed, non_durable):
|
||||||
|
headers, etag = self.proxy_get()
|
||||||
|
self.assertEqual(self.etag, etag)
|
||||||
|
for key in self.headers_post:
|
||||||
|
self.assertIn(key, headers)
|
||||||
|
self.assertEqual(self.headers_post[key], headers[key])
|
||||||
|
|
||||||
|
# fire up reconstructor
|
||||||
|
for i in range(reconstructor_cycles):
|
||||||
|
self.reconstructor.once()
|
||||||
|
|
||||||
|
# check GET via proxy returns expected data and metadata
|
||||||
|
with self._annotate_failure_with_scenario(failed, non_durable):
|
||||||
|
headers, etag = self.proxy_get()
|
||||||
|
self.assertEqual(self.etag, etag)
|
||||||
|
for key in self.headers_post:
|
||||||
|
self.assertIn(key, headers)
|
||||||
|
self.assertEqual(self.headers_post[key], headers[key])
|
||||||
|
# check all frags are intact, durable and have expected metadata
|
||||||
|
with self._annotate_failure_with_scenario(failed, non_durable):
|
||||||
|
frag_headers, frag_etags = self._assert_all_nodes_have_frag()
|
||||||
|
self.assertEqual(self.frag_etags, frag_etags)
|
||||||
|
# self._frag_headers include X-Backend-Durable-Timestamp so this
|
||||||
|
# assertion confirms that the rebuilt frags are all durable
|
||||||
|
self.assertEqual(self.frag_headers, frag_headers)
|
||||||
|
|
||||||
|
def test_rebuild_missing_frags(self):
|
||||||
|
# build up a list of node lists to kill data from,
|
||||||
# first try a single node
|
# first try a single node
|
||||||
# then adjacent nodes and then nodes >1 node apart
|
# then adjacent nodes and then nodes >1 node apart
|
||||||
opart, onodes = self.object_ring.get_nodes(
|
single_node = (random.randint(0, 5),)
|
||||||
self.account, self.container_name, self.object_name)
|
adj_nodes = (0, 5)
|
||||||
single_node = [random.choice(onodes)]
|
far_nodes = (0, 4)
|
||||||
adj_nodes = [onodes[0], onodes[-1]]
|
|
||||||
far_nodes = [onodes[0], onodes[-2]]
|
|
||||||
test_list = [single_node, adj_nodes, far_nodes]
|
|
||||||
|
|
||||||
for node_list in test_list:
|
for failed_nodes in [single_node, adj_nodes, far_nodes]:
|
||||||
for onode in node_list:
|
self._test_rebuild_scenario(failed_nodes, [], 1)
|
||||||
try:
|
|
||||||
self._check_node(onode, opart, etag, headers_post)
|
def test_rebuild_non_durable_frags(self):
|
||||||
except AssertionError as e:
|
# build up a list of node lists to make non-durable,
|
||||||
self.fail(
|
# first try a single node
|
||||||
str(e) + '\n... for node %r of scenario %r' % (
|
# then adjacent nodes and then nodes >1 node apart
|
||||||
self._format_node(onode),
|
single_node = (random.randint(0, 5),)
|
||||||
[self._format_node(n) for n in node_list]))
|
adj_nodes = (0, 5)
|
||||||
|
far_nodes = (0, 4)
|
||||||
|
|
||||||
|
for non_durable_nodes in [single_node, adj_nodes, far_nodes]:
|
||||||
|
self._test_rebuild_scenario([], non_durable_nodes, 1)
|
||||||
|
|
||||||
|
def test_rebuild_with_missing_frags_and_non_durable_frags(self):
|
||||||
|
# pick some nodes with parts deleted, some with non-durable fragments
|
||||||
|
scenarios = [
|
||||||
|
# failed, non-durable
|
||||||
|
((0, 2), (4,)),
|
||||||
|
((0, 4), (2,)),
|
||||||
|
]
|
||||||
|
for failed, non_durable in scenarios:
|
||||||
|
self._test_rebuild_scenario(failed, non_durable, 3)
|
||||||
|
scenarios = [
|
||||||
|
# failed, non-durable
|
||||||
|
((0, 1), (2,)),
|
||||||
|
((0, 2), (1,)),
|
||||||
|
]
|
||||||
|
for failed, non_durable in scenarios:
|
||||||
|
# why 2 repeats? consider missing fragment on nodes 0, 1 and
|
||||||
|
# missing durable on node 2: first reconstructor cycle on node 3
|
||||||
|
# will make node 2 durable, first cycle on node 5 will rebuild on
|
||||||
|
# node 0; second cycle on node 0 or 2 will rebuild on node 1. Note
|
||||||
|
# that it is possible, that reconstructor processes on each node
|
||||||
|
# run in order such that all rebuild complete in once cycle, but
|
||||||
|
# that is not guaranteed, we allow 2 cycles to be sure.
|
||||||
|
self._test_rebuild_scenario(failed, non_durable, 2)
|
||||||
|
scenarios = [
|
||||||
|
# failed, non-durable
|
||||||
|
((0, 2), (1, 3, 5)),
|
||||||
|
((0,), (1, 2, 4, 5)),
|
||||||
|
]
|
||||||
|
for failed, non_durable in scenarios:
|
||||||
|
# why 3 repeats? consider missing fragment on node 0 and single
|
||||||
|
# durable on node 3: first reconstructor cycle on node 3 will make
|
||||||
|
# nodes 2 and 4 durable, second cycle on nodes 2 and 4 will make
|
||||||
|
# node 1 and 5 durable, third cycle on nodes 1 or 5 will
|
||||||
|
# reconstruct the missing fragment on node 0.
|
||||||
|
self._test_rebuild_scenario(failed, non_durable, 3)
|
||||||
|
|
||||||
def test_rebuild_partner_down(self):
|
def test_rebuild_partner_down(self):
|
||||||
# create EC container
|
|
||||||
headers = {'X-Storage-Policy': self.policy.name}
|
|
||||||
client.put_container(self.url, self.token, self.container_name,
|
|
||||||
headers=headers)
|
|
||||||
|
|
||||||
# PUT object
|
|
||||||
contents = Body()
|
|
||||||
client.put_object(self.url, self.token,
|
|
||||||
self.container_name,
|
|
||||||
self.object_name,
|
|
||||||
contents=contents)
|
|
||||||
|
|
||||||
opart, onodes = self.object_ring.get_nodes(
|
|
||||||
self.account, self.container_name, self.object_name)
|
|
||||||
|
|
||||||
# find a primary server that only has one of it's devices in the
|
# find a primary server that only has one of it's devices in the
|
||||||
# primary node list
|
# primary node list
|
||||||
group_nodes_by_config = defaultdict(list)
|
group_nodes_by_config = defaultdict(list)
|
||||||
for n in onodes:
|
for n in self.onodes:
|
||||||
group_nodes_by_config[self.config_number(n)].append(n)
|
group_nodes_by_config[self.config_number(n)].append(n)
|
||||||
for config_number, node_list in group_nodes_by_config.items():
|
for config_number, node_list in group_nodes_by_config.items():
|
||||||
if len(node_list) == 1:
|
if len(node_list) == 1:
|
||||||
@ -197,27 +294,32 @@ class TestReconstructorRebuild(ECProbeTest):
|
|||||||
|
|
||||||
# pick one it's partners to fail randomly
|
# pick one it's partners to fail randomly
|
||||||
partner_node = random.choice(_get_partners(
|
partner_node = random.choice(_get_partners(
|
||||||
primary_node['index'], onodes))
|
primary_node['index'], self.onodes))
|
||||||
|
|
||||||
# 507 the partner device
|
# 507 the partner device
|
||||||
device_path = self.device_dir('object', partner_node)
|
device_path = self.device_dir('object', partner_node)
|
||||||
self.kill_drive(device_path)
|
self.kill_drive(device_path)
|
||||||
|
|
||||||
# select another primary sync_to node to fail
|
# select another primary sync_to node to fail
|
||||||
failed_primary = [n for n in onodes if n['id'] not in
|
failed_primary = [n for n in self.onodes if n['id'] not in
|
||||||
(primary_node['id'], partner_node['id'])][0]
|
(primary_node['id'], partner_node['id'])][0]
|
||||||
# ... capture it's fragment etag
|
# ... capture it's fragment etag
|
||||||
failed_primary_etag = self.direct_get(failed_primary, opart)
|
failed_primary_meta, failed_primary_etag = self.direct_get(
|
||||||
|
failed_primary, self.opart)
|
||||||
# ... and delete it
|
# ... and delete it
|
||||||
part_dir = self.storage_dir('object', failed_primary, part=opart)
|
part_dir = self.storage_dir('object', failed_primary, part=self.opart)
|
||||||
shutil.rmtree(part_dir, True)
|
shutil.rmtree(part_dir, True)
|
||||||
|
|
||||||
# reconstruct from the primary, while one of it's partners is 507'd
|
# reconstruct from the primary, while one of it's partners is 507'd
|
||||||
self.reconstructor.once(number=self.config_number(primary_node))
|
self.reconstructor.once(number=self.config_number(primary_node))
|
||||||
|
|
||||||
# the other failed primary will get it's fragment rebuilt instead
|
# the other failed primary will get it's fragment rebuilt instead
|
||||||
self.assertEqual(failed_primary_etag,
|
failed_primary_meta_new, failed_primary_etag_new = self.direct_get(
|
||||||
self.direct_get(failed_primary, opart))
|
failed_primary, self.opart)
|
||||||
|
del failed_primary_meta['Date']
|
||||||
|
del failed_primary_meta_new['Date']
|
||||||
|
self.assertEqual(failed_primary_etag, failed_primary_etag_new)
|
||||||
|
self.assertEqual(failed_primary_meta, failed_primary_meta_new)
|
||||||
|
|
||||||
# just to be nice
|
# just to be nice
|
||||||
self.revive_drive(device_path)
|
self.revive_drive(device_path)
|
||||||
|
@ -13,6 +13,7 @@
|
|||||||
# See the License for the specific language governing permissions and
|
# See the License for the specific language governing permissions and
|
||||||
# limitations under the License.
|
# limitations under the License.
|
||||||
import itertools
|
import itertools
|
||||||
|
import json
|
||||||
import unittest
|
import unittest
|
||||||
import os
|
import os
|
||||||
from hashlib import md5
|
from hashlib import md5
|
||||||
@ -2508,6 +2509,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||||||
'name': '/a/c/o',
|
'name': '/a/c/o',
|
||||||
'Content-Length': '0',
|
'Content-Length': '0',
|
||||||
'ETag': 'etag',
|
'ETag': 'etag',
|
||||||
|
'X-Timestamp': '1234567890.12345'
|
||||||
}
|
}
|
||||||
|
|
||||||
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
||||||
@ -2541,16 +2543,24 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||||||
job, node, metadata)
|
job, node, metadata)
|
||||||
self.assertEqual(0, df.content_length)
|
self.assertEqual(0, df.content_length)
|
||||||
fixed_body = ''.join(df.reader())
|
fixed_body = ''.join(df.reader())
|
||||||
self.assertEqual(len(fixed_body), len(broken_body))
|
self.assertEqual(len(fixed_body), len(broken_body))
|
||||||
self.assertEqual(md5(fixed_body).hexdigest(),
|
self.assertEqual(md5(fixed_body).hexdigest(),
|
||||||
md5(broken_body).hexdigest())
|
md5(broken_body).hexdigest())
|
||||||
for called_header in called_headers:
|
self.assertEqual(len(part_nodes) - 1, len(called_headers))
|
||||||
called_header = HeaderKeyDict(called_header)
|
for called_header in called_headers:
|
||||||
self.assertTrue('Content-Length' in called_header)
|
called_header = HeaderKeyDict(called_header)
|
||||||
self.assertEqual(called_header['Content-Length'], '0')
|
self.assertIn('Content-Length', called_header)
|
||||||
self.assertTrue('User-Agent' in called_header)
|
self.assertEqual(called_header['Content-Length'], '0')
|
||||||
user_agent = called_header['User-Agent']
|
self.assertIn('User-Agent', called_header)
|
||||||
self.assertTrue(user_agent.startswith('obj-reconstructor'))
|
user_agent = called_header['User-Agent']
|
||||||
|
self.assertTrue(user_agent.startswith('obj-reconstructor'))
|
||||||
|
self.assertIn('X-Backend-Storage-Policy-Index', called_header)
|
||||||
|
self.assertEqual(called_header['X-Backend-Storage-Policy-Index'],
|
||||||
|
self.policy)
|
||||||
|
self.assertIn('X-Backend-Fragment-Preferences', called_header)
|
||||||
|
self.assertEqual(
|
||||||
|
[{'timestamp': '1234567890.12345', 'exclude': []}],
|
||||||
|
json.loads(called_header['X-Backend-Fragment-Preferences']))
|
||||||
|
|
||||||
def test_reconstruct_fa_errors_works(self):
|
def test_reconstruct_fa_errors_works(self):
|
||||||
job = {
|
job = {
|
||||||
@ -2563,6 +2573,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||||||
'name': '/a/c/o',
|
'name': '/a/c/o',
|
||||||
'Content-Length': 0,
|
'Content-Length': 0,
|
||||||
'ETag': 'etag',
|
'ETag': 'etag',
|
||||||
|
'X-Timestamp': '1234567890.12345'
|
||||||
}
|
}
|
||||||
|
|
||||||
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
||||||
@ -2604,6 +2615,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||||||
'name': '/a/c/o',
|
'name': '/a/c/o',
|
||||||
'Content-Length': 0,
|
'Content-Length': 0,
|
||||||
'ETag': 'etag',
|
'ETag': 'etag',
|
||||||
|
'X-Timestamp': '1234567890.12345'
|
||||||
}
|
}
|
||||||
|
|
||||||
# make up some data (trim some amount to make it unaligned with
|
# make up some data (trim some amount to make it unaligned with
|
||||||
@ -2647,6 +2659,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||||||
'name': '/a/c/o',
|
'name': '/a/c/o',
|
||||||
'Content-Length': 0,
|
'Content-Length': 0,
|
||||||
'ETag': 'etag',
|
'ETag': 'etag',
|
||||||
|
'X-Timestamp': '1234567890.12345'
|
||||||
}
|
}
|
||||||
|
|
||||||
possible_errors = [404, Timeout(), Exception('kaboom!')]
|
possible_errors = [404, Timeout(), Exception('kaboom!')]
|
||||||
@ -2667,6 +2680,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||||||
'name': '/a/c/o',
|
'name': '/a/c/o',
|
||||||
'Content-Length': 0,
|
'Content-Length': 0,
|
||||||
'ETag': 'etag',
|
'ETag': 'etag',
|
||||||
|
'X-Timestamp': '1234567890.12345'
|
||||||
}
|
}
|
||||||
|
|
||||||
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
||||||
@ -2717,6 +2731,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||||||
'name': '/a/c/o',
|
'name': '/a/c/o',
|
||||||
'Content-Length': 0,
|
'Content-Length': 0,
|
||||||
'ETag': 'etag',
|
'ETag': 'etag',
|
||||||
|
'X-Timestamp': '1234567890.12345'
|
||||||
}
|
}
|
||||||
|
|
||||||
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
||||||
@ -2768,6 +2783,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||||||
'name': '/a/c/o',
|
'name': '/a/c/o',
|
||||||
'Content-Length': 0,
|
'Content-Length': 0,
|
||||||
'ETag': 'etag',
|
'ETag': 'etag',
|
||||||
|
'X-Timestamp': '1234567890.12345'
|
||||||
}
|
}
|
||||||
|
|
||||||
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
||||||
@ -2809,6 +2825,7 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||||||
'name': '/a/c/o',
|
'name': '/a/c/o',
|
||||||
'Content-Length': 0,
|
'Content-Length': 0,
|
||||||
'ETag': 'etag',
|
'ETag': 'etag',
|
||||||
|
'X-Timestamp': '1234567890.12345'
|
||||||
}
|
}
|
||||||
|
|
||||||
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
|
||||||
|
Loading…
Reference in New Issue
Block a user