Refactor reconstructor probe tests
Refactor the reconstructor probe test to share common setup and helper methods. Change-Id: If75803648169f85b854c3d5d8784aaebbd93805b
This commit is contained in:
parent
73cc68f55b
commit
128f199508
@ -16,6 +16,7 @@
|
||||
from __future__ import print_function
|
||||
|
||||
import errno
|
||||
import json
|
||||
import os
|
||||
from subprocess import Popen, PIPE
|
||||
import sys
|
||||
@ -26,6 +27,7 @@ from collections import defaultdict
|
||||
import unittest
|
||||
from uuid import uuid4
|
||||
import shutil
|
||||
import six
|
||||
from six.moves.http_client import HTTPConnection
|
||||
from six.moves.urllib.parse import urlparse
|
||||
|
||||
@ -606,6 +608,113 @@ class ECProbeTest(ProbeTest):
|
||||
obj_required_devices = 8
|
||||
policy_requirements = {'policy_type': EC_POLICY}
|
||||
|
||||
def _make_name(self, prefix):
|
||||
return ('%s%s' % (prefix, uuid4())).encode()
|
||||
|
||||
def setUp(self):
|
||||
super(ECProbeTest, self).setUp()
|
||||
self.container_name = self._make_name('container-')
|
||||
self.object_name = self._make_name('object-')
|
||||
# sanity
|
||||
self.assertEqual(self.policy.policy_type, EC_POLICY)
|
||||
self.reconstructor = Manager(["object-reconstructor"])
|
||||
|
||||
def proxy_put(self, extra_headers=None):
|
||||
contents = Body()
|
||||
headers = {
|
||||
self._make_name('x-object-meta-').decode('utf8'):
|
||||
self._make_name('meta-foo-').decode('utf8'),
|
||||
}
|
||||
if extra_headers:
|
||||
headers.update(extra_headers)
|
||||
self.etag = client.put_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name,
|
||||
contents=contents, headers=headers)
|
||||
|
||||
def proxy_get(self):
|
||||
# GET object
|
||||
headers, body = client.get_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name,
|
||||
resp_chunk_size=64 * 2 ** 10)
|
||||
resp_checksum = md5(usedforsecurity=False)
|
||||
for chunk in body:
|
||||
resp_checksum.update(chunk)
|
||||
return headers, resp_checksum.hexdigest()
|
||||
|
||||
def direct_get(self, node, part, require_durable=True, extra_headers=None):
|
||||
req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
|
||||
if extra_headers:
|
||||
req_headers.update(extra_headers)
|
||||
if not require_durable:
|
||||
req_headers.update(
|
||||
{'X-Backend-Fragment-Preferences': json.dumps([])})
|
||||
# node dict has unicode values so utf8 decode our path parts too in
|
||||
# case they have non-ascii characters
|
||||
if six.PY2:
|
||||
acc, con, obj = (s.decode('utf8') for s in (
|
||||
self.account, self.container_name, self.object_name))
|
||||
else:
|
||||
acc, con, obj = self.account, self.container_name, self.object_name
|
||||
headers, data = direct_client.direct_get_object(
|
||||
node, part, acc, con, obj, headers=req_headers,
|
||||
resp_chunk_size=64 * 2 ** 20)
|
||||
hasher = md5(usedforsecurity=False)
|
||||
for chunk in data:
|
||||
hasher.update(chunk)
|
||||
return headers, hasher.hexdigest()
|
||||
|
||||
def assert_direct_get_fails(self, onode, opart, status,
|
||||
require_durable=True):
|
||||
try:
|
||||
self.direct_get(onode, opart, require_durable=require_durable)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, status)
|
||||
else:
|
||||
self.fail('Node data on %r was not fully destroyed!' % (onode,))
|
||||
|
||||
def assert_direct_get_succeeds(self, onode, opart, require_durable=True,
|
||||
extra_headers=None):
|
||||
try:
|
||||
self.direct_get(onode, opart, require_durable=require_durable,
|
||||
extra_headers=extra_headers)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.fail('Node data on %r was not available: %s' % (onode, err))
|
||||
|
||||
def break_nodes(self, nodes, opart, failed, non_durable):
|
||||
# delete partitions on the failed nodes and remove durable marker from
|
||||
# non-durable nodes
|
||||
made_non_durable = 0
|
||||
for i, node in enumerate(nodes):
|
||||
part_dir = self.storage_dir(node, part=opart)
|
||||
if i in failed:
|
||||
shutil.rmtree(part_dir, True)
|
||||
try:
|
||||
self.direct_get(node, opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 404)
|
||||
elif i in non_durable:
|
||||
for dirs, subdirs, files in os.walk(part_dir):
|
||||
for fname in sorted(files, reverse=True):
|
||||
# make the newest durable be non-durable
|
||||
if fname.endswith('.data'):
|
||||
made_non_durable += 1
|
||||
non_durable_fname = fname.replace('#d', '')
|
||||
os.rename(os.path.join(dirs, fname),
|
||||
os.path.join(dirs, non_durable_fname))
|
||||
|
||||
break
|
||||
headers, etag = self.direct_get(node, opart,
|
||||
require_durable=False)
|
||||
self.assertNotIn('X-Backend-Durable-Timestamp', headers)
|
||||
try:
|
||||
os.remove(os.path.join(part_dir, 'hashes.pkl'))
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
return made_non_durable
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
for server in ('account', 'container'):
|
||||
|
@ -14,14 +14,10 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import errno
|
||||
import json
|
||||
from contextlib import contextmanager
|
||||
import unittest
|
||||
import uuid
|
||||
import shutil
|
||||
import random
|
||||
import os
|
||||
import time
|
||||
import six
|
||||
|
||||
@ -30,8 +26,6 @@ from swift.common.utils import md5
|
||||
from test.probe.common import ECProbeTest
|
||||
|
||||
from swift.common import direct_client
|
||||
from swift.common.storage_policy import EC_POLICY
|
||||
from swift.common.manager import Manager
|
||||
|
||||
from swiftclient import client, ClientException
|
||||
|
||||
@ -64,17 +58,8 @@ class Body(object):
|
||||
|
||||
class TestReconstructorRebuild(ECProbeTest):
|
||||
|
||||
def _make_name(self, prefix):
|
||||
return ('%s%s' % (prefix, uuid.uuid4())).encode()
|
||||
|
||||
def setUp(self):
|
||||
super(TestReconstructorRebuild, self).setUp()
|
||||
self.container_name = self._make_name('container-')
|
||||
self.object_name = self._make_name('object-')
|
||||
# sanity
|
||||
self.assertEqual(self.policy.policy_type, EC_POLICY)
|
||||
self.reconstructor = Manager(["object-reconstructor"])
|
||||
|
||||
# create EC container
|
||||
headers = {'X-Storage-Policy': self.policy.name}
|
||||
client.put_container(self.url, self.token, self.container_name,
|
||||
@ -99,80 +84,6 @@ class TestReconstructorRebuild(ECProbeTest):
|
||||
'X-Backend-Durable-Timestamp', hdrs,
|
||||
'Missing durable timestamp in %r' % self.frag_headers)
|
||||
|
||||
def proxy_put(self, extra_headers=None):
|
||||
contents = Body()
|
||||
headers = {
|
||||
self._make_name('x-object-meta-').decode('utf8'):
|
||||
self._make_name('meta-foo-').decode('utf8'),
|
||||
}
|
||||
if extra_headers:
|
||||
headers.update(extra_headers)
|
||||
self.etag = client.put_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name,
|
||||
contents=contents, headers=headers)
|
||||
|
||||
def proxy_get(self):
|
||||
# GET object
|
||||
headers, body = client.get_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name,
|
||||
resp_chunk_size=64 * 2 ** 10)
|
||||
resp_checksum = md5(usedforsecurity=False)
|
||||
for chunk in body:
|
||||
resp_checksum.update(chunk)
|
||||
return headers, resp_checksum.hexdigest()
|
||||
|
||||
def direct_get(self, node, part, require_durable=True, extra_headers=None):
|
||||
req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
|
||||
if extra_headers:
|
||||
req_headers.update(extra_headers)
|
||||
if not require_durable:
|
||||
req_headers.update(
|
||||
{'X-Backend-Fragment-Preferences': json.dumps([])})
|
||||
# node dict has unicode values so utf8 decode our path parts too in
|
||||
# case they have non-ascii characters
|
||||
if six.PY2:
|
||||
acc, con, obj = (s.decode('utf8') for s in (
|
||||
self.account, self.container_name, self.object_name))
|
||||
else:
|
||||
acc, con, obj = self.account, self.container_name, self.object_name
|
||||
headers, data = direct_client.direct_get_object(
|
||||
node, part, acc, con, obj, headers=req_headers,
|
||||
resp_chunk_size=64 * 2 ** 20)
|
||||
hasher = md5(usedforsecurity=False)
|
||||
for chunk in data:
|
||||
hasher.update(chunk)
|
||||
return headers, hasher.hexdigest()
|
||||
|
||||
def _break_nodes(self, failed, non_durable):
|
||||
# delete partitions on the failed nodes and remove durable marker from
|
||||
# non-durable nodes
|
||||
for i, node in enumerate(self.onodes):
|
||||
part_dir = self.storage_dir(node, part=self.opart)
|
||||
if i in failed:
|
||||
shutil.rmtree(part_dir, True)
|
||||
try:
|
||||
self.direct_get(node, self.opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 404)
|
||||
elif i in non_durable:
|
||||
for dirs, subdirs, files in os.walk(part_dir):
|
||||
for fname in files:
|
||||
if fname.endswith('.data'):
|
||||
non_durable_fname = fname.replace('#d', '')
|
||||
os.rename(os.path.join(dirs, fname),
|
||||
os.path.join(dirs, non_durable_fname))
|
||||
break
|
||||
headers, etag = self.direct_get(node, self.opart,
|
||||
require_durable=False)
|
||||
self.assertNotIn('X-Backend-Durable-Timestamp', headers)
|
||||
try:
|
||||
os.remove(os.path.join(part_dir, 'hashes.pkl'))
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
def _format_node(self, node):
|
||||
return '%s#%s' % (node['device'], node['index'])
|
||||
|
||||
@ -213,7 +124,7 @@ class TestReconstructorRebuild(ECProbeTest):
|
||||
# helper method to test a scenario with some nodes missing their
|
||||
# fragment and some nodes having non-durable fragments
|
||||
with self._annotate_failure_with_scenario(failed, non_durable):
|
||||
self._break_nodes(failed, non_durable)
|
||||
self.break_nodes(self.onodes, self.opart, failed, non_durable)
|
||||
|
||||
# make sure we can still GET the object and it is correct; the
|
||||
# proxy is doing decode on remaining fragments to get the obj
|
||||
@ -370,10 +281,12 @@ class TestReconstructorRebuild(ECProbeTest):
|
||||
|
||||
# sanity check - X-Backend-Replication let's us get expired frag...
|
||||
fail_node = random.choice(self.onodes)
|
||||
self.direct_get(fail_node, self.opart,
|
||||
extra_headers={'X-Backend-Replication': 'True'})
|
||||
self.assert_direct_get_succeeds(
|
||||
fail_node, self.opart,
|
||||
extra_headers={'X-Backend-Replication': 'True'})
|
||||
# ...until we remove the frag from fail_node
|
||||
self._break_nodes([self.onodes.index(fail_node)], [])
|
||||
self.break_nodes(
|
||||
self.onodes, self.opart, [self.onodes.index(fail_node)], [])
|
||||
# ...now it's really gone
|
||||
with self.assertRaises(DirectClientException) as cm:
|
||||
self.direct_get(fail_node, self.opart,
|
||||
|
@ -16,7 +16,6 @@
|
||||
|
||||
import itertools
|
||||
import unittest
|
||||
import uuid
|
||||
import random
|
||||
import shutil
|
||||
from collections import defaultdict
|
||||
@ -24,9 +23,6 @@ from collections import defaultdict
|
||||
from test.probe.common import ECProbeTest, Body
|
||||
|
||||
from swift.common import direct_client
|
||||
from swift.common.storage_policy import EC_POLICY
|
||||
from swift.common.manager import Manager
|
||||
from swift.common.utils import md5
|
||||
from swift.obj import reconstructor
|
||||
|
||||
from swiftclient import client
|
||||
@ -34,37 +30,6 @@ from swiftclient import client
|
||||
|
||||
class TestReconstructorRevert(ECProbeTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestReconstructorRevert, self).setUp()
|
||||
self.container_name = 'container-%s' % uuid.uuid4()
|
||||
self.object_name = 'object-%s' % uuid.uuid4()
|
||||
|
||||
# sanity
|
||||
self.assertEqual(self.policy.policy_type, EC_POLICY)
|
||||
self.reconstructor = Manager(["object-reconstructor"])
|
||||
|
||||
def proxy_get(self):
|
||||
# GET object
|
||||
headers, body = client.get_object(self.url, self.token,
|
||||
self.container_name,
|
||||
self.object_name,
|
||||
resp_chunk_size=64 * 2 ** 10)
|
||||
resp_checksum = md5(usedforsecurity=False)
|
||||
for chunk in body:
|
||||
resp_checksum.update(chunk)
|
||||
return resp_checksum.hexdigest()
|
||||
|
||||
def direct_get(self, node, part):
|
||||
req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
|
||||
headers, data = direct_client.direct_get_object(
|
||||
node, part, self.account, self.container_name,
|
||||
self.object_name, headers=req_headers,
|
||||
resp_chunk_size=64 * 2 ** 20)
|
||||
hasher = md5(usedforsecurity=False)
|
||||
for chunk in data:
|
||||
hasher.update(chunk)
|
||||
return hasher.hexdigest()
|
||||
|
||||
def test_revert_object(self):
|
||||
# create EC container
|
||||
headers = {'X-Storage-Policy': self.policy.name}
|
||||
@ -99,30 +64,18 @@ class TestReconstructorRevert(ECProbeTest):
|
||||
# these primaries can't serve the data any more, we expect 507
|
||||
# here and not 404 because we're using mount_check to kill nodes
|
||||
for onode in (onodes[0], onodes[1]):
|
||||
try:
|
||||
self.direct_get(onode, opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 507)
|
||||
else:
|
||||
self.fail('Node data on %r was not fully destroyed!' %
|
||||
(onode,))
|
||||
self.assert_direct_get_fails(onode, opart, 507)
|
||||
|
||||
# now take out another primary
|
||||
p_dev3 = self.device_dir(onodes[2])
|
||||
self.kill_drive(p_dev3)
|
||||
|
||||
# this node can't servce the data any more
|
||||
try:
|
||||
self.direct_get(onodes[2], opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 507)
|
||||
else:
|
||||
self.fail('Node data on %r was not fully destroyed!' %
|
||||
(onode,))
|
||||
# this node can't serve the data any more
|
||||
self.assert_direct_get_fails(onodes[2], opart, 507)
|
||||
|
||||
# make sure we can still GET the object and its correct
|
||||
# we're now pulling from handoffs and reconstructing
|
||||
etag = self.proxy_get()
|
||||
_headers, etag = self.proxy_get()
|
||||
self.assertEqual(etag, contents.etag)
|
||||
|
||||
# rename the dev dirs so they don't 507 anymore
|
||||
@ -137,7 +90,7 @@ class TestReconstructorRevert(ECProbeTest):
|
||||
|
||||
# first three primaries have data again
|
||||
for onode in (onodes[0], onodes[2]):
|
||||
self.direct_get(onode, opart)
|
||||
self.assert_direct_get_succeeds(onode, opart)
|
||||
|
||||
# check meta
|
||||
meta = client.head_object(self.url, self.token,
|
||||
@ -149,13 +102,7 @@ class TestReconstructorRevert(ECProbeTest):
|
||||
|
||||
# handoffs are empty
|
||||
for hnode in hnodes:
|
||||
try:
|
||||
self.direct_get(hnode, opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
self.assertEqual(err.http_status, 404)
|
||||
else:
|
||||
self.fail('Node data on %r was not fully destroyed!' %
|
||||
(hnode,))
|
||||
self.assert_direct_get_fails(hnode, opart, 404)
|
||||
|
||||
def test_delete_propagate(self):
|
||||
# create EC container
|
||||
@ -285,12 +232,13 @@ class TestReconstructorRevert(ECProbeTest):
|
||||
|
||||
# fix the primary device and sanity GET
|
||||
self.revive_drive(primary_device)
|
||||
self.assertEqual(etag, self.proxy_get())
|
||||
_headers, actual_etag = self.proxy_get()
|
||||
self.assertEqual(etag, actual_etag)
|
||||
|
||||
# find a handoff holding the fragment
|
||||
for hnode in self.object_ring.get_more_nodes(opart):
|
||||
try:
|
||||
reverted_fragment_etag = self.direct_get(hnode, opart)
|
||||
_hdrs, reverted_fragment_etag = self.direct_get(hnode, opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
if err.http_status != 404:
|
||||
raise
|
||||
@ -308,7 +256,7 @@ class TestReconstructorRevert(ECProbeTest):
|
||||
# we'll keep track of the etag of this fragment we're removing
|
||||
# in case we need it later (queue forshadowing music)...
|
||||
try:
|
||||
handoff_fragment_etag = self.direct_get(node, opart)
|
||||
_hdrs, handoff_fragment_etag = self.direct_get(node, opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
if err.http_status != 404:
|
||||
raise
|
||||
@ -324,14 +272,14 @@ class TestReconstructorRevert(ECProbeTest):
|
||||
|
||||
# verify fragment reverted to primary server
|
||||
self.assertEqual(reverted_fragment_etag,
|
||||
self.direct_get(primary_node, opart))
|
||||
self.direct_get(primary_node, opart)[1])
|
||||
|
||||
# now we'll remove some data on one of the primary node's partners
|
||||
partner = random.choice(reconstructor._get_partners(
|
||||
primary_node['index'], onodes))
|
||||
|
||||
try:
|
||||
rebuilt_fragment_etag = self.direct_get(partner, opart)
|
||||
_hdrs, rebuilt_fragment_etag = self.direct_get(partner, opart)
|
||||
except direct_client.DirectClientException as err:
|
||||
if err.http_status != 404:
|
||||
raise
|
||||
@ -361,7 +309,7 @@ class TestReconstructorRevert(ECProbeTest):
|
||||
# and validate the partners rebuilt_fragment_etag
|
||||
try:
|
||||
self.assertEqual(rebuilt_fragment_etag,
|
||||
self.direct_get(partner, opart))
|
||||
self.direct_get(partner, opart)[1])
|
||||
except direct_client.DirectClientException as err:
|
||||
if err.http_status != 404:
|
||||
raise
|
||||
|
Loading…
Reference in New Issue
Block a user