pyeclib/test/test_pyeclib_api.py
2024-10-22 18:37:26 +00:00

849 lines
33 KiB
Python

# Copyright (c) 2013, Kevin Greenan (kmgreen2@gmail.com)
# All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions are met:
#
# Redistributions of source code must retain the above copyright notice, this
# list of conditions and the following disclaimer.
#
# Redistributions in binary form must reproduce the above copyright notice,
# this list of conditions and the following disclaimer in the documentation
# and/or other materials provided with the distribution. THIS SOFTWARE IS
# PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS
# OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES
# OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN
# NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE FOR ANY
# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
# (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
# LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF
# THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import os
try:
import queue
except ImportError: # py2
import Queue as queue
import random
import resource
import string
import sys
import tempfile
import threading
import unittest
from itertools import combinations
import six
import pyeclib.ec_iface
from pyeclib.ec_iface import ECBackendNotSupported
from pyeclib.ec_iface import ECDriver
from pyeclib.ec_iface import ECDriverError
from pyeclib.ec_iface import ECInsufficientFragments
from pyeclib.ec_iface import ECInvalidFragmentMetadata
from pyeclib.ec_iface import ECInvalidParameter
from pyeclib.ec_iface import PyECLib_EC_Types
from pyeclib.ec_iface import ALL_EC_TYPES
from pyeclib.ec_iface import VALID_EC_TYPES
if sys.version < '3':
def b2i(b):
return ord(b)
else:
def b2i(b):
return b
class TestNullDriver(unittest.TestCase):
def setUp(self):
self.null_driver = ECDriver(
library_import_str="pyeclib.core.ECNullDriver", k=8, m=2)
def tearDown(self):
pass
def test_null_driver(self):
self.null_driver.encode('')
self.null_driver.decode([])
class TestStripeDriver(unittest.TestCase):
def setUp(self):
self.stripe_driver = ECDriver(
library_import_str="pyeclib.core.ECStripingDriver", k=8, m=0)
def tearDown(self):
pass
class TestPyECLibDriver(unittest.TestCase):
@classmethod
def setUpClass(cls):
# Create the temp files needed for testing
cls.file_sizes = ["100-K"]
cls.files = {}
cls.num_iterations = 100
cls._create_tmp_files()
@classmethod
def _create_tmp_files(cls):
"""
Create the temporary files needed for testing. Use the tempfile
package so that the files will be automatically removed during
garbage collection.
"""
for size_str in cls.file_sizes:
# Determine the size of the file to create
size_desc = size_str.split("-")
size = int(size_desc[0])
if size_desc[1] == 'M':
size *= 1000000
elif size_desc[1] == 'K':
size *= 1000
# Create the dictionary of files to test with
buf = ''.join(random.choice(string.ascii_letters)
for i in range(size))
if sys.version_info >= (3,):
buf = buf.encode('ascii')
tmp_file = tempfile.NamedTemporaryFile()
tmp_file.write(buf)
cls.files[size_str] = tmp_file
def setUp(self):
# Ensure that the file offset is set to the head of the file
for _, tmp_file in self.files.items():
tmp_file.seek(0, 0)
@classmethod
def tearDownClass(cls):
for tmp_file in cls.files.values():
tmp_file.close()
def test_missing_required_args(self):
# missing ec_type
with self.assertRaises(ECDriverError) as err_context:
ECDriver(k=1, m=1)
self.assertEqual(str(err_context.exception),
"Invalid Argument: either ec_type or "
"library_import_str must be provided")
for ec_type in VALID_EC_TYPES:
# missing k
with self.assertRaises(ECDriverError) as err_context:
ECDriver(ec_type=ec_type, m=1)
self.assertEqual(str(err_context.exception),
"Invalid Argument: k is required")
# missing m
with self.assertRaises(ECDriverError) as err_context:
ECDriver(ec_type=ec_type, k=1)
self.assertEqual(str(err_context.exception),
"Invalid Argument: m is required")
def test_invalid_km_args(self):
for ec_type in VALID_EC_TYPES:
with self.assertRaises(ECDriverError) as err_context:
# k is smaller than 1
ECDriver(ec_type=ec_type, k=-100, m=1)
self.assertEqual(str(err_context.exception),
"Invalid number of data fragments (k)")
with self.assertRaises(ECDriverError) as err_context:
# m is smaller than 1
ECDriver(ec_type=ec_type, k=1, m=-100)
self.assertEqual(str(err_context.exception),
"Invalid number of parity fragments (m)")
def test_valid_ec_types(self):
# Build list of available types and compare to VALID_EC_TYPES
available_ec_types = []
for _type in ALL_EC_TYPES:
try:
if _type == 'shss':
_k = 10
_m = 4
elif _type == 'libphazr':
_k = 4
_m = 4
else:
_k = 10
_m = 5
ECDriver(k=_k, m=_m, ec_type=_type, validate=True)
available_ec_types.append(_type)
except Exception:
# ignore any errors, assume backend not available
pass
self.assertEqual(available_ec_types, VALID_EC_TYPES)
def test_create_in_threads(self):
def create_backend(kwargs, q):
q.put(ECDriver(**kwargs))
for backend in VALID_EC_TYPES:
q = queue.Queue()
threads = [
threading.Thread(
target=create_backend,
args=({'k': 10, 'm': 5, 'ec_type': backend}, q))
for _ in range(5)]
for t in threads:
t.start()
for t in threads:
t.join()
results = []
while not q.empty():
results.append(q.get())
self.assertEqual(len(results), 5)
def test_valid_algo(self):
print("")
for _type in ALL_EC_TYPES:
# Check if this algo works
if _type not in VALID_EC_TYPES:
print("Skipping test for %s backend" % _type)
continue
try:
if _type == 'shss':
ECDriver(k=10, m=4, ec_type=_type)
elif _type == 'libphazr':
ECDriver(k=4, m=4, ec_type=_type)
else:
ECDriver(k=10, m=5, ec_type=_type)
except ECDriverError:
self.fail("%s algorithm not supported" % _type)
self.assertRaises(ECBackendNotSupported, ECDriver, k=10, m=5,
ec_type="invalid_algo")
def get_pyeclib_testspec(self, csum="none"):
pyeclib_drivers = []
_type1 = 'jerasure_rs_vand'
if _type1 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=12, m=2, ec_type=_type1,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=11, m=2, ec_type=_type1,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=10, m=2, ec_type=_type1,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=8, m=4, ec_type=_type1,
chksum_type=csum))
_type2 = 'liberasurecode_rs_vand'
if _type2 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=12, m=2, ec_type=_type2,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=11, m=2, ec_type=_type2,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=10, m=2, ec_type=_type2,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=8, m=4, ec_type=_type2,
chksum_type=csum))
_type3_1 = 'flat_xor_hd'
if _type3_1 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=12, m=6, ec_type=_type3_1,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=10, m=5, ec_type=_type3_1,
chksum_type=csum))
_type3_2 = 'flat_xor_hd_4'
if _type3_2 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=12, m=6, ec_type=_type3_2,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=10, m=5, ec_type=_type3_2,
chksum_type=csum))
_type4 = 'shss'
if _type4 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=10, m=4, ec_type=_type4,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=20, m=4, ec_type=_type4,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=11, m=7, ec_type=_type4,
chksum_type=csum))
_type5 = 'isa_l_rs_vand'
if _type5 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=12, m=2, ec_type=_type5,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=11, m=2, ec_type=_type5,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=10, m=2, ec_type=_type5,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=8, m=4, ec_type=_type5,
chksum_type=csum))
_type6 = 'isa_l_rs_cauchy'
if _type6 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=12, m=2, ec_type=_type6,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=11, m=2, ec_type=_type6,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=10, m=2, ec_type=_type6,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=8, m=4, ec_type=_type6,
chksum_type=csum))
pyeclib_drivers.append(ECDriver(k=11, m=7, ec_type=_type6,
chksum_type=csum))
_type7 = 'libphazr'
if _type7 in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=4, m=4, ec_type=_type7,
chksum_type=csum))
return pyeclib_drivers
def test_small_encode(self):
pyeclib_drivers = self.get_pyeclib_testspec()
encode_strs = [b"a", b"hello", b"hellohyhi", b"yo"]
for pyeclib_driver in pyeclib_drivers:
for encode_str in encode_strs:
encoded_fragments = pyeclib_driver.encode(encode_str)
decoded_str = pyeclib_driver.decode(encoded_fragments)
self.assertTrue(decoded_str == encode_str)
def test_encode_invalid_params(self):
pyeclib_drivers = self.get_pyeclib_testspec()
encode_args = [u'\U0001F0A1', 3, object(), None, {}, []]
for pyeclib_driver in pyeclib_drivers:
for encode_str in encode_args:
with self.assertRaises(ECInvalidParameter):
pyeclib_driver.encode(encode_str)
def test_attribute_error_in_the_error_handling(self):
pyeclib_drivers = self.get_pyeclib_testspec()
self.assertGreater(len(pyeclib_drivers), 0)
pyeclib_driver = pyeclib_drivers[0]
del pyeclib.ec_iface.ECInvalidParameter
try:
with self.assertRaises(AttributeError): # !!
pyeclib_driver.encode(3)
finally:
pyeclib.ec_iface.ECInvalidParameter = ECInvalidParameter
def test_import_error_in_the_error_handling(self):
pyeclib_drivers = self.get_pyeclib_testspec()
self.assertGreater(len(pyeclib_drivers), 0)
pyeclib_driver = pyeclib_drivers[0]
from six.moves import builtins
real_import = builtins.__import__
def fake_import(*a, **kw):
raise ImportError
builtins.__import__ = fake_import
try:
with self.assertRaises(ImportError): # !!
pyeclib_driver.encode(3)
finally:
builtins.__import__ = real_import
def test_decode_reconstruct_with_fragment_iterator(self):
pyeclib_drivers = self.get_pyeclib_testspec()
encode_strs = [b"a", b"hello", b"hellohyhi", b"yo"]
for pyeclib_driver in pyeclib_drivers:
for encode_str in encode_strs:
encoded_fragments = pyeclib_driver.encode(encode_str)
idxs_to_remove = random.sample(range(
pyeclib_driver.k + pyeclib_driver.m), 2)
available_fragments = encoded_fragments[:]
for i in sorted(idxs_to_remove, reverse=True):
available_fragments.pop(i)
frag_iter = iter(available_fragments)
decoded_str = pyeclib_driver.decode(frag_iter)
self.assertEqual(decoded_str, encode_str)
# Since the iterator is exhausted, we can't decode again
with self.assertRaises(ECDriverError) as exc_mgr:
pyeclib_driver.decode(frag_iter)
self.assertEqual(
'Invalid fragment payload in ECPyECLibDriver.decode',
str(exc_mgr.exception))
frag_iter = iter(available_fragments)
reconstructed_fragments = pyeclib_driver.reconstruct(
frag_iter, idxs_to_remove)
self.assertEqual(len(reconstructed_fragments),
len(idxs_to_remove))
for i, data in zip(idxs_to_remove, reconstructed_fragments):
self.assertEqual(data, encoded_fragments[i])
# Since the iterator is exhausted, we can't decode again
with self.assertRaises(ECDriverError) as exc_mgr:
pyeclib_driver.reconstruct(frag_iter, idxs_to_remove)
self.assertEqual(
'Invalid fragment payload in ECPyECLibDriver.reconstruct',
str(exc_mgr.exception))
def check_metadata_formatted(self, k, m, ec_type, chksum_type):
if ec_type not in VALID_EC_TYPES:
return
filesize = 1024 * 1024 * 3
file_str = ''.join(random.choice(string.ascii_letters)
for i in range(filesize))
file_bytes = file_str.encode('utf-8')
pyeclib_driver = ECDriver(k=k, m=m, ec_type=ec_type,
chksum_type=chksum_type)
fragments = pyeclib_driver.encode(file_bytes)
f = 0
for fragment in fragments:
metadata = pyeclib_driver.get_metadata(fragment, 1)
if 'index' in metadata:
self.assertEqual(metadata['index'], f)
else:
self.assertTrue(False)
if 'chksum_mismatch' in metadata:
self.assertEqual(metadata['chksum_mismatch'], 0)
else:
self.assertTrue(False)
if 'backend_id' in metadata:
self.assertEqual(metadata['backend_id'], ec_type)
else:
self.assertTrue(False)
if 'orig_data_size' in metadata:
self.assertEqual(metadata['orig_data_size'], 3145728)
else:
self.assertTrue(False)
if 'chksum_type' in metadata:
self.assertEqual(metadata['chksum_type'], 'crc32')
else:
self.assertTrue(False)
if 'backend_version' not in metadata:
self.assertTrue(False)
if 'chksum' not in metadata:
self.assertTrue(False)
if 'size' not in metadata:
self.assertTrue(False)
f += 1
def test_get_metadata_formatted(self):
self.check_metadata_formatted(12, 2, "jerasure_rs_vand",
"inline_crc32")
self.check_metadata_formatted(12, 2, "liberasurecode_rs_vand",
"inline_crc32")
self.check_metadata_formatted(8, 4, "liberasurecode_rs_vand",
"inline_crc32")
def test_verify_fragment_inline_chksum_fail(self):
pyeclib_drivers = self.get_pyeclib_testspec("inline_crc32")
filesize = 1024 * 1024 * 3
file_str = ''.join(random.choice(string.ascii_letters)
for i in range(filesize))
file_bytes = file_str.encode('utf-8')
for pyeclib_driver in pyeclib_drivers:
fragments = pyeclib_driver.encode(file_bytes)
fragment_metadata_list = []
first_fragment_to_corrupt = random.randint(0, len(fragments))
num_to_corrupt = 2
fragments_to_corrupt = [
(first_fragment_to_corrupt + i) % len(fragments)
for i in range(num_to_corrupt + 1)]
fragments_to_corrupt.sort()
i = 0
for fragment in fragments:
if i in fragments_to_corrupt:
corrupted_fragment = fragment[:100] +\
(str(chr((b2i(fragment[100]) + 0x1)
% 128))).encode('utf-8') + fragment[101:]
fragment_metadata_list.append(
pyeclib_driver.get_metadata(corrupted_fragment))
else:
fragment_metadata_list.append(
pyeclib_driver.get_metadata(fragment))
i += 1
expected_ret_value = {"status": -206,
"reason": "Bad checksum",
"bad_fragments": fragments_to_corrupt}
self.assertEqual(
pyeclib_driver.verify_stripe_metadata(fragment_metadata_list),
expected_ret_value)
def test_verify_fragment_inline_chksum_succeed(self):
pyeclib_drivers = self.get_pyeclib_testspec("inline_crc32")
filesize = 1024 * 1024 * 3
file_str = ''.join(random.choice(string.ascii_letters)
for i in range(filesize))
file_bytes = file_str.encode('utf-8')
for pyeclib_driver in pyeclib_drivers:
fragments = pyeclib_driver.encode(file_bytes)
fragment_metadata_list = []
for fragment in fragments:
fragment_metadata_list.append(
pyeclib_driver.get_metadata(fragment))
expected_ret_value = {"status": 0}
self.assertTrue(pyeclib_driver.verify_stripe_metadata(
fragment_metadata_list) == expected_ret_value)
def test_get_segment_byterange_info(self):
pyeclib_drivers = self.get_pyeclib_testspec()
if not pyeclib_drivers:
return
file_size = 1024 * 1024
segment_size = 3 * 1024
ranges = [
(0, 1),
(1, 12),
(10, 1000),
(0, segment_size - 1),
(1, segment_size + 1),
(segment_size - 1, 2 * segment_size)]
expected_results = {}
expected_results[(0, 1)] = {0: (0, 1)}
expected_results[(1, 12)] = {0: (1, 12)}
expected_results[(10, 1000)] = {0: (10, 1000)}
expected_results[(0, segment_size - 1)] = {0: (0, segment_size - 1)}
expected_results[(1, segment_size + 1)
] = {0: (1, segment_size - 1), 1: (0, 1)}
expected_results[
(segment_size - 1, 2 * segment_size)] = {
0: (segment_size - 1, segment_size - 1),
1: (0, segment_size - 1),
2: (0, 0)}
results = pyeclib_drivers[0].get_segment_info_byterange(
ranges, file_size, segment_size)
for exp_result_key in expected_results:
self.assertIn(exp_result_key, results)
self.assertTrue(
len(results[exp_result_key]) ==
len(expected_results[exp_result_key]))
exp_result_map = expected_results[exp_result_key]
for segment_key in exp_result_map:
self.assertIn(segment_key, results[exp_result_key])
self.assertTrue(
results[exp_result_key][segment_key] ==
expected_results[exp_result_key][segment_key])
def test_get_segment_info(self):
pyeclib_drivers = self.get_pyeclib_testspec()
file_sizes = [
1024 * 1024,
2 * 1024 * 1024,
10 * 1024 * 1024,
10 * 1024 * 1024 + 7]
segment_sizes = [3 * 1024, 1024 * 1024]
segment_strings = {}
#
# Generate some test segments for each segment size.
# Use 2 * segment size, because last segment may be
# greater than segment_size
#
char_set = string.ascii_uppercase + string.digits
for segment_size in segment_sizes:
segment_strings[segment_size] = ''.join(
random.choice(char_set) for i in range(segment_size * 2))
for pyeclib_driver in pyeclib_drivers:
for file_size in file_sizes:
for segment_size in segment_sizes:
#
# Compute the segment info
#
segment_info = pyeclib_driver.get_segment_info(
file_size,
segment_size)
num_segments = segment_info['num_segments']
segment_size = segment_info['segment_size']
fragment_size = segment_info['fragment_size']
last_segment_size = segment_info['last_segment_size']
last_fragment_size = segment_info['last_fragment_size']
computed_file_size = (
(num_segments - 1) * segment_size) + last_segment_size
#
# Verify that the segment sizes add up
#
self.assertTrue(computed_file_size == file_size)
encoded_fragments = pyeclib_driver.encode(
(segment_strings[segment_size][
:segment_size]).encode('utf-8'))
#
# Verify the fragment size
#
self.assertTrue(fragment_size == len(encoded_fragments[0]))
if last_segment_size > 0:
encoded_fragments = pyeclib_driver.encode(
(segment_strings[segment_size][
:last_segment_size]).encode('utf-8'))
#
# Verify the last fragment size, if there is one
#
self.assertTrue(
last_fragment_size == len(
encoded_fragments[0]))
def test_greedy_decode_reconstruct_combination(self):
# the testing spec defined at get_pyeclib_testspec() method
# and if you want to test either other parameters or backends,
# you can add the spec you want to test there.
pyeclib_drivers = self.get_pyeclib_testspec()
orig_data = os.urandom(1024 ** 2)
for pyeclib_driver in pyeclib_drivers:
encoded = pyeclib_driver.encode(orig_data)
# make all fragment like (index, frag_data) format to feed
# to combinations
frags = [(i, frag) for i, frag in enumerate(encoded)]
num_frags = pyeclib_driver.k + pyeclib_driver.m
if pyeclib_driver.ec_type == PyECLib_EC_Types.flat_xor_hd:
# flat_xord_hd is guaranteed to work with 2 or 3 failures
tolerable_failures = pyeclib_driver.hd - 1
else:
# ... while others can tolerate more
tolerable_failures = pyeclib_driver.m
for check_frags_tuples in combinations(
frags, num_frags - tolerable_failures):
# extract check_frags_tuples from [(index, data bytes), ...]
# to [index, index, ...] and [data bytes, data bytes, ...]
indexes, check_frags = zip(*check_frags_tuples)
decoded = pyeclib_driver.decode(check_frags)
self.assertEqual(
orig_data, decoded,
"assertion fail in decode %s from:%s" %
(pyeclib_driver, indexes))
holes = [index for index in range(num_frags) if
index not in indexes]
for hole in holes:
reconed = pyeclib_driver.reconstruct(
check_frags, [hole])[0]
self.assertEqual(
frags[hole][1], reconed,
"assertion fail in reconstruct %s target:%s "
"from:%s" % (pyeclib_driver, hole, indexes))
def test_rs(self):
pyeclib_drivers = self.get_pyeclib_testspec()
for pyeclib_driver in pyeclib_drivers:
for file_size in self.file_sizes:
tmp_file = self.files[file_size]
tmp_file.seek(0)
whole_file_bytes = tmp_file.read()
encode_input = whole_file_bytes
orig_fragments = pyeclib_driver.encode(encode_input)
for iter in range(self.num_iterations):
num_missing = 2
idxs_to_remove = random.sample(range(
pyeclib_driver.k + pyeclib_driver.m), num_missing)
fragments = orig_fragments[:]
# Reverse sort the list, so we can always
# remove from the original index
idxs_to_remove.sort(reverse=True)
for idx in idxs_to_remove:
fragments.pop(idx)
#
# Test decoder
#
decoded_string = pyeclib_driver.decode(fragments)
self.assertTrue(encode_input == decoded_string)
#
# Test reconstructor
#
reconstructed_fragments = pyeclib_driver.reconstruct(
fragments,
idxs_to_remove)
self.assertEqual(len(reconstructed_fragments),
len(idxs_to_remove))
for idx, frag_data in zip(idxs_to_remove,
reconstructed_fragments):
self.assertEqual(
frag_data, orig_fragments[idx],
'Failed to reconstruct fragment %d!' % idx)
#
# Test decode with integrity checks
#
first_fragment_to_corrupt = random.randint(
0, len(fragments))
num_to_corrupt = min(len(fragments), pyeclib_driver.m + 1)
fragments_to_corrupt = [
(first_fragment_to_corrupt + i) % len(fragments)
for i in range(num_to_corrupt)]
i = 0
for fragment in fragments:
if i in fragments_to_corrupt:
corrupted_fragment = (
"0" * len(fragment)).encode('utf-8')
fragments[i] = corrupted_fragment
i += 1
self.assertRaises(ECInvalidFragmentMetadata,
pyeclib_driver.decode,
fragments, force_metadata_checks=True)
def get_available_backend(self, k, m, ec_type, chksum_type="inline_crc32"):
if ec_type[:11] == "flat_xor_hd":
return ECDriver(k=k, m=m, ec_type="flat_xor_hd",
chksum_type=chksum_type)
elif ec_type in VALID_EC_TYPES:
return ECDriver(k=k, m=m, ec_type=ec_type, chksum_type=chksum_type)
else:
return None
def test_liberasurecode_insufficient_frags_error(self):
file_size = self.file_sizes[0]
tmp_file = self.files[file_size]
tmp_file.seek(0)
whole_file_bytes = tmp_file.read()
for ec_type in ['flat_xor_hd_3', 'liberasurecode_rs_vand']:
if ec_type in VALID_EC_TYPES:
pyeclib_driver = self.get_available_backend(
k=10, m=5, ec_type=ec_type)
fragments = pyeclib_driver.encode(whole_file_bytes)
self.assertRaises(ECInsufficientFragments,
pyeclib_driver.reconstruct,
[fragments[0]], [1, 2, 3, 4, 5, 6])
def test_min_parity_fragments_needed(self):
pyeclib_drivers = []
for ec_type in ['flat_xor_hd_3', 'liberasurecode_rs_vand']:
if ec_type in VALID_EC_TYPES:
pyeclib_drivers.append(ECDriver(k=10, m=5, ec_type=ec_type))
self.assertTrue(
pyeclib_drivers[0].min_parity_fragments_needed() == 1)
def test_pyeclib_driver_repr_expression(self):
pyeclib_drivers = self.get_pyeclib_testspec()
for driver in pyeclib_drivers:
if driver.ec_type.name == 'flat_xor_hd':
name = 'flat_xor_hd_%s' % driver.hd
else:
name = driver.ec_type.name
self.assertEqual(
"ECDriver(ec_type='%s', k=%s, m=%s)" %
(name, driver.k, driver.m), repr(driver))
def test_get_segment_info_memory_usage(self):
for ec_driver in self.get_pyeclib_testspec():
self._test_get_segment_info_memory_usage(ec_driver)
def _test_get_segment_info_memory_usage(self, ec_driver):
# 1. Preapre the expected memory allocation
ec_driver.get_segment_info(1024 * 1024, 1024 * 1024)
loop_range = range(1000)
# 2. Get current memory usage
usage = resource.getrusage(resource.RUSAGE_SELF)[2]
# 3. Loop to call get_segment_info
for x in loop_range:
ec_driver.get_segment_info(1024 * 1024, 1024 * 1024)
# 4. memory usage shoudln't be increased
self.assertEqual(usage, resource.getrusage(resource.RUSAGE_SELF)[2],
'Memory usage is increased unexpectedly %s - %s' %
(usage, resource.getrusage(resource.RUSAGE_SELF)[2]))
def test_get_metadata_memory_usage(self):
for ec_driver in self.get_pyeclib_testspec():
self._test_get_metadata_memory_usage(ec_driver)
def _test_get_metadata_memory_usage(self, ec_driver):
# 1. Prepare the expected memory allocation
encoded = ec_driver.encode(b'aaa')
ec_driver.get_metadata(encoded[0], formatted=True)
loop_range = range(400000)
# 2. Get current memory usage
baseline_usage = resource.getrusage(resource.RUSAGE_SELF)[2]
# 3. Loop to call get_metadata
for x in loop_range:
ec_driver.get_metadata(encoded[0], formatted=True)
# 4. memory usage shouldn't increase
new_usage = resource.getrusage(resource.RUSAGE_SELF)[2]
self.assertEqual(baseline_usage, new_usage,
'Memory usage is increased unexpectedly %s -> %s' %
(baseline_usage, new_usage))
class BackendsEnabledMetaclass(type):
def __new__(meta, cls_name, cls_bases, cls_dict):
for ec_type in ALL_EC_TYPES:
def dummy(self, ec_type=ec_type):
if ec_type not in VALID_EC_TYPES:
raise unittest.SkipTest
if ec_type == 'shss':
k = 10
m = 4
elif ec_type == 'libphazr':
k = 4
m = 4
else:
k = 10
m = 5
ECDriver(k=k, m=m, ec_type=ec_type)
dummy.__name__ = 'test_%s_available' % ec_type
cls_dict[dummy.__name__] = dummy
return type.__new__(meta, cls_name, cls_bases, cls_dict)
class TestBackendsEnabled(six.with_metaclass(BackendsEnabledMetaclass,
unittest.TestCase)):
'''
Based on TestPyECLibDriver.test_valid_algo above, but these tests
should *always* either pass or skip.
'''
if __name__ == '__main__':
unittest.main()