Merge "Better optimistic lock in get_hashes"
This commit is contained in:
commit
fc09dda92b
@ -31,6 +31,7 @@ are also not considered part of the backend API.
|
||||
"""
|
||||
|
||||
import six.moves.cPickle as pickle
|
||||
import copy
|
||||
import errno
|
||||
import fcntl
|
||||
import json
|
||||
@ -42,7 +43,7 @@ import hashlib
|
||||
import logging
|
||||
import traceback
|
||||
import xattr
|
||||
from os.path import basename, dirname, exists, getmtime, join, splitext
|
||||
from os.path import basename, dirname, exists, join, splitext
|
||||
from random import shuffle
|
||||
from tempfile import mkstemp
|
||||
from contextlib import contextmanager
|
||||
@ -232,6 +233,48 @@ def quarantine_renamer(device_path, corrupted_file_path):
|
||||
return to_dir
|
||||
|
||||
|
||||
def read_hashes(partition_dir):
|
||||
"""
|
||||
Read the existing hashes.pkl
|
||||
|
||||
:returns: a dict, the suffix hashes (if any), the key 'valid' will be False
|
||||
if hashes.pkl is corrupt, cannot be read or does not exist
|
||||
"""
|
||||
hashes_file = join(partition_dir, HASH_FILE)
|
||||
hashes = {'valid': False}
|
||||
try:
|
||||
with open(hashes_file, 'rb') as hashes_fp:
|
||||
pickled_hashes = hashes_fp.read()
|
||||
except (IOError, OSError):
|
||||
pass
|
||||
else:
|
||||
try:
|
||||
hashes = pickle.loads(pickled_hashes)
|
||||
except Exception:
|
||||
# pickle.loads() can raise a wide variety of exceptions when
|
||||
# given invalid input depending on the way in which the
|
||||
# input is invalid.
|
||||
pass
|
||||
# hashes.pkl w/o valid updated key is "valid" but "forever old"
|
||||
hashes.setdefault('valid', True)
|
||||
hashes.setdefault('updated', -1)
|
||||
return hashes
|
||||
|
||||
|
||||
def write_hashes(partition_dir, hashes):
|
||||
"""
|
||||
Write hashes to hashes.pkl
|
||||
|
||||
The updated key is added to hashes before it is written.
|
||||
"""
|
||||
hashes_file = join(partition_dir, HASH_FILE)
|
||||
# 'valid' key should always be set by the caller; however, if there's a bug
|
||||
# setting invalid is most safe
|
||||
hashes.setdefault('valid', False)
|
||||
hashes['updated'] = time.time()
|
||||
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
|
||||
|
||||
|
||||
def consolidate_hashes(partition_dir):
|
||||
"""
|
||||
Take what's in hashes.pkl and hashes.invalid, combine them, write the
|
||||
@ -258,41 +301,23 @@ def consolidate_hashes(partition_dir):
|
||||
return None
|
||||
|
||||
with lock_path(partition_dir):
|
||||
try:
|
||||
with open(hashes_file, 'rb') as hashes_fp:
|
||||
pickled_hashes = hashes_fp.read()
|
||||
except (IOError, OSError):
|
||||
hashes = {}
|
||||
else:
|
||||
try:
|
||||
hashes = pickle.loads(pickled_hashes)
|
||||
except Exception:
|
||||
# pickle.loads() can raise a wide variety of exceptions when
|
||||
# given invalid input depending on the way in which the
|
||||
# input is invalid.
|
||||
hashes = None
|
||||
hashes = read_hashes(partition_dir)
|
||||
|
||||
modified = False
|
||||
found_invalidation_entry = False
|
||||
try:
|
||||
with open(invalidations_file, 'rb') as inv_fh:
|
||||
for line in inv_fh:
|
||||
found_invalidation_entry = True
|
||||
suffix = line.strip()
|
||||
if hashes is not None and \
|
||||
hashes.get(suffix, '') is not None:
|
||||
hashes[suffix] = None
|
||||
modified = True
|
||||
except (IOError, OSError) as e:
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
if modified:
|
||||
write_pickle(hashes, hashes_file, partition_dir, PICKLE_PROTOCOL)
|
||||
|
||||
if found_invalidation_entry:
|
||||
write_hashes(partition_dir, hashes)
|
||||
# Now that all the invalidations are reflected in hashes.pkl, it's
|
||||
# safe to clear out the invalidations file.
|
||||
if found_invalidation_entry:
|
||||
with open(invalidations_file, 'wb') as inv_fh:
|
||||
pass
|
||||
|
||||
@ -1010,7 +1035,13 @@ class BaseDiskFileManager(object):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def _get_hashes(self, partition_path, recalculate=None, do_listdir=False):
|
||||
def _get_hashes(self, *args, **kwargs):
|
||||
hashed, hashes = self.__get_hashes(*args, **kwargs)
|
||||
hashes.pop('updated', None)
|
||||
hashes.pop('valid', None)
|
||||
return hashed, hashes
|
||||
|
||||
def __get_hashes(self, partition_path, recalculate=None, do_listdir=False):
|
||||
"""
|
||||
Get hashes for each suffix dir in a partition. do_listdir causes it to
|
||||
mistrust the hash cache for suffix existence at the (unexpectedly high)
|
||||
@ -1027,31 +1058,39 @@ class BaseDiskFileManager(object):
|
||||
hashed = 0
|
||||
hashes_file = join(partition_path, HASH_FILE)
|
||||
modified = False
|
||||
force_rewrite = False
|
||||
hashes = {}
|
||||
mtime = -1
|
||||
orig_hashes = {'valid': False}
|
||||
|
||||
if recalculate is None:
|
||||
recalculate = []
|
||||
|
||||
try:
|
||||
mtime = getmtime(hashes_file)
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
try:
|
||||
hashes = self.consolidate_hashes(partition_path)
|
||||
orig_hashes = self.consolidate_hashes(partition_path)
|
||||
except Exception:
|
||||
self.logger.warning('Unable to read %r', hashes_file,
|
||||
exc_info=True)
|
||||
|
||||
if orig_hashes is None:
|
||||
# consolidate_hashes returns None if hashes.pkl does not exist
|
||||
orig_hashes = {'valid': False}
|
||||
if not orig_hashes['valid']:
|
||||
# This is the only path to a valid hashes from invalid read (e.g.
|
||||
# does not exist, corrupt, etc.). Moreover, in order to write this
|
||||
# valid hashes we must read *the exact same* invalid state or we'll
|
||||
# trigger race detection.
|
||||
do_listdir = True
|
||||
force_rewrite = True
|
||||
hashes = {'valid': True}
|
||||
# If the exception handling around consolidate_hashes fired we're
|
||||
# going to do a full rehash regardless; but we need to avoid
|
||||
# needless recursion if the on-disk hashes.pkl is actually readable
|
||||
# (worst case is consolidate_hashes keeps raising exceptions and we
|
||||
# eventually run out of stack).
|
||||
# N.B. orig_hashes invalid only effects new parts and error/edge
|
||||
# conditions - so try not to get overly caught up trying to
|
||||
# optimize it out unless you manage to convince yourself there's a
|
||||
# bad behavior.
|
||||
orig_hashes = read_hashes(partition_path)
|
||||
else:
|
||||
if hashes is None: # no hashes.pkl file; let's build it
|
||||
do_listdir = True
|
||||
force_rewrite = True
|
||||
hashes = {}
|
||||
hashes = copy.deepcopy(orig_hashes)
|
||||
|
||||
if do_listdir:
|
||||
for suff in os.listdir(partition_path):
|
||||
@ -1073,12 +1112,10 @@ class BaseDiskFileManager(object):
|
||||
modified = True
|
||||
if modified:
|
||||
with lock_path(partition_path):
|
||||
if force_rewrite or not exists(hashes_file) or \
|
||||
getmtime(hashes_file) == mtime:
|
||||
write_pickle(
|
||||
hashes, hashes_file, partition_path, PICKLE_PROTOCOL)
|
||||
if read_hashes(partition_path) == orig_hashes:
|
||||
write_hashes(partition_path, hashes)
|
||||
return hashed, hashes
|
||||
return self._get_hashes(partition_path, recalculate, do_listdir)
|
||||
return self.__get_hashes(partition_path, recalculate, do_listdir)
|
||||
else:
|
||||
return hashed, hashes
|
||||
|
||||
|
@ -39,8 +39,7 @@ from gzip import GzipFile
|
||||
import pyeclib.ec_iface
|
||||
|
||||
from eventlet import hubs, timeout, tpool
|
||||
from swift.obj.diskfile import (MD5_OF_EMPTY_STRING, update_auditor_status,
|
||||
write_pickle)
|
||||
from swift.obj.diskfile import MD5_OF_EMPTY_STRING, update_auditor_status
|
||||
from test.unit import (FakeLogger, mock as unit_mock, temptree,
|
||||
patch_policies, debug_logger, EMPTY_ETAG,
|
||||
make_timestamp_iter, DEFAULT_TEST_EC_TYPE,
|
||||
@ -5726,7 +5725,7 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
with self.policy_in_message():
|
||||
unittest.TestCase.assertEqual(self, *args)
|
||||
|
||||
def get_different_suffix_df(self, df):
|
||||
def get_different_suffix_df(self, df, **kwargs):
|
||||
# returns diskfile in the same partition with different suffix
|
||||
suffix_dir = os.path.dirname(df._datadir)
|
||||
for i in itertools.count():
|
||||
@ -5736,7 +5735,8 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
df._account,
|
||||
df._container,
|
||||
'o%d' % i,
|
||||
policy=df.policy)
|
||||
policy=df.policy,
|
||||
**kwargs)
|
||||
suffix_dir2 = os.path.dirname(df2._datadir)
|
||||
if suffix_dir != suffix_dir2:
|
||||
return df2
|
||||
@ -6085,7 +6085,10 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
self.assertTrue(os.path.exists(hashes_file))
|
||||
self.assertIn(os.path.basename(suffix_dir), hashes)
|
||||
with open(hashes_file) as f:
|
||||
self.assertEqual(hashes, pickle.load(f))
|
||||
found_hashes = pickle.load(f)
|
||||
found_hashes.pop('updated')
|
||||
self.assertTrue(found_hashes.pop('valid'))
|
||||
self.assertEqual(hashes, found_hashes)
|
||||
# ... and truncates the invalidations file
|
||||
with open(inv_file) as f:
|
||||
self.assertEqual('', f.read().strip('\n'))
|
||||
@ -6182,29 +6185,11 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
self.assertIn(suffix, hashes)
|
||||
self.assertIn(suffix2, hashes)
|
||||
|
||||
@mock.patch('swift.obj.diskfile.getmtime')
|
||||
@mock.patch('swift.obj.diskfile.write_pickle')
|
||||
def test_contains_hashes_of_existing_partition(self, mock_write_pickle,
|
||||
mock_getmtime):
|
||||
def test_hash_invalidations_survive_racing_get_hashes_diff_suffix(self):
|
||||
# get_hashes must repeat path listing and return all hashes when
|
||||
# another concurrent process created new pkl before hashes are stored
|
||||
# by the first process
|
||||
non_local = {}
|
||||
|
||||
def mock_write_pickle_def(*args, **kwargs):
|
||||
if 'mtime' not in non_local:
|
||||
non_local['mtime'] = time()
|
||||
non_local['mtime'] += 1
|
||||
write_pickle(*args, **kwargs)
|
||||
|
||||
def mock_getmtime_def(filename):
|
||||
if 'mtime' not in non_local:
|
||||
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
|
||||
return non_local['mtime']
|
||||
|
||||
mock_write_pickle.side_effect = mock_write_pickle_def
|
||||
mock_getmtime.side_effect = mock_getmtime_def
|
||||
|
||||
for policy in self.iter_policies():
|
||||
df_mgr = self.df_router[policy]
|
||||
# force hashes.pkl to exist; when it does not exist that's fine,
|
||||
@ -6231,18 +6216,121 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
if not non_local['df2touched']:
|
||||
non_local['df2touched'] = True
|
||||
df2.delete(self.ts())
|
||||
# simulate pkl update by other process - mtime is updated
|
||||
self.assertIn('mtime', non_local, "hashes.pkl must exist")
|
||||
non_local['mtime'] += 1
|
||||
return result
|
||||
|
||||
with mock.patch('swift.obj.diskfile.os.listdir',
|
||||
mock_listdir):
|
||||
# creates pkl file but leaves invalidation alone
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
|
||||
# suffix2 just sits in the invalidations file
|
||||
self.assertIn(suffix, hashes)
|
||||
self.assertNotIn(suffix2, hashes)
|
||||
|
||||
# it'll show up next hash
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
self.assertIn(suffix, hashes)
|
||||
self.assertIn(suffix2, hashes)
|
||||
|
||||
def test_hash_invalidations_survive_racing_get_hashes_same_suffix(self):
|
||||
# verify that when two processes concurrently call get_hashes, then any
|
||||
# concurrent hash invalidation will survive and be consolidated on a
|
||||
# subsequent call to get_hashes (i.e. ensure first get_hashes process
|
||||
# does not ignore the concurrent hash invalidation that second
|
||||
# get_hashes might have consolidated to hashes.pkl)
|
||||
non_local = {}
|
||||
|
||||
for policy in self.iter_policies():
|
||||
df_mgr = self.df_router[policy]
|
||||
orig_hash_suffix = df_mgr._hash_suffix
|
||||
# create hashes.pkl
|
||||
df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
|
||||
df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
|
||||
policy=policy)
|
||||
suffix_dir = os.path.dirname(df._datadir)
|
||||
suffix = os.path.basename(suffix_dir)
|
||||
part_dir = os.path.dirname(suffix_dir)
|
||||
invalidations_file = os.path.join(
|
||||
part_dir, diskfile.HASH_INVALIDATIONS_FILE)
|
||||
|
||||
non_local['hash'] = None
|
||||
non_local['called'] = False
|
||||
|
||||
# delete will append suffix to hashes.invalid
|
||||
df.delete(self.ts())
|
||||
with open(invalidations_file) as f:
|
||||
self.assertEqual(suffix, f.read().strip('\n')) # sanity
|
||||
hash1 = df_mgr._hash_suffix(suffix_dir)
|
||||
|
||||
def mock_hash_suffix(*args, **kwargs):
|
||||
# after first get_hashes has called _hash_suffix, simulate a
|
||||
# second process invalidating the same suffix, followed by a
|
||||
# third process calling get_hashes and failing (or yielding)
|
||||
# after consolidate_hashes has completed
|
||||
result = orig_hash_suffix(*args, **kwargs)
|
||||
if not non_local['called']:
|
||||
non_local['called'] = True
|
||||
# appends suffix to hashes.invalid
|
||||
df.delete(self.ts())
|
||||
# simulate another process calling get_hashes but failing
|
||||
# after hash invalidation have been consolidated
|
||||
hashes = df_mgr.consolidate_hashes(part_dir)
|
||||
self.assertTrue(hashes['valid'])
|
||||
# get the updated suffix hash...
|
||||
non_local['hash'] = orig_hash_suffix(suffix_dir)
|
||||
return result
|
||||
|
||||
with mock.patch.object(df_mgr, '_hash_suffix', mock_hash_suffix):
|
||||
# creates pkl file and repeats listing when pkl modified
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
|
||||
# first get_hashes should complete with suffix1 state
|
||||
self.assertIn(suffix, hashes)
|
||||
self.assertIn(suffix2, hashes)
|
||||
# sanity check - the suffix hash has changed...
|
||||
self.assertNotEqual(hash1, non_local['hash'])
|
||||
# the invalidation file has been truncated...
|
||||
with open(invalidations_file, 'r') as f:
|
||||
self.assertEqual('', f.read())
|
||||
# so hashes should have the latest suffix hash...
|
||||
self.assertEqual(hashes[suffix], non_local['hash'])
|
||||
|
||||
def _check_unpickle_error_and_get_hashes_failure(self, existing):
|
||||
for policy in self.iter_policies():
|
||||
df_mgr = self.df_router[policy]
|
||||
df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
|
||||
policy=policy)
|
||||
suffix = os.path.basename(os.path.dirname(df._datadir))
|
||||
if existing:
|
||||
df.delete(self.ts())
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
df.delete(self.ts())
|
||||
part_path = os.path.join(self.devices, 'sda1',
|
||||
diskfile.get_data_dir(policy), '0')
|
||||
hashes_file = os.path.join(part_path, diskfile.HASH_FILE)
|
||||
# write a corrupt hashes.pkl
|
||||
open(hashes_file, 'w')
|
||||
# simulate first call to get_hashes failing after attempting to
|
||||
# consolidate hashes
|
||||
with mock.patch('swift.obj.diskfile.os.listdir',
|
||||
side_effect=Exception()):
|
||||
self.assertRaises(
|
||||
Exception, df_mgr.get_hashes, 'sda1', '0', [], policy)
|
||||
# sanity on-disk state is invalid
|
||||
with open(hashes_file) as f:
|
||||
found_hashes = pickle.load(f)
|
||||
found_hashes.pop('updated')
|
||||
self.assertEqual(False, found_hashes.pop('valid'))
|
||||
# verify subsequent call to get_hashes reaches correct outcome
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
self.assertIn(suffix, hashes)
|
||||
self.assertEqual([], df_mgr.logger.get_lines_for_level('warning'))
|
||||
|
||||
def test_unpickle_error_and_get_hashes_failure_new_part(self):
|
||||
self._check_unpickle_error_and_get_hashes_failure(False)
|
||||
|
||||
def test_unpickle_error_and_get_hashes_failure_existing_part(self):
|
||||
self._check_unpickle_error_and_get_hashes_failure(True)
|
||||
|
||||
def test_invalidate_hash_consolidation(self):
|
||||
def assert_consolidation(suffixes):
|
||||
@ -6254,7 +6342,9 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
self.assertIn(suffix, hashes)
|
||||
self.assertIsNone(hashes[suffix])
|
||||
with open(hashes_file, 'rb') as f:
|
||||
self.assertEqual(hashes, pickle.load(f))
|
||||
found_hashes = pickle.load(f)
|
||||
self.assertTrue(hashes['valid'])
|
||||
self.assertEqual(hashes, found_hashes)
|
||||
with open(invalidations_file, 'rb') as f:
|
||||
self.assertEqual("", f.read())
|
||||
return hashes
|
||||
@ -6278,7 +6368,10 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
invalidations_file = os.path.join(
|
||||
part_path, diskfile.HASH_INVALIDATIONS_FILE)
|
||||
with open(hashes_file, 'rb') as f:
|
||||
self.assertEqual(original_hashes, pickle.load(f))
|
||||
found_hashes = pickle.load(f)
|
||||
found_hashes.pop('updated')
|
||||
self.assertTrue(found_hashes.pop('valid'))
|
||||
self.assertEqual(original_hashes, found_hashes)
|
||||
|
||||
# invalidate the hash
|
||||
with mock.patch('swift.obj.diskfile.lock_path') as mock_lock:
|
||||
@ -6289,7 +6382,10 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
self.assertEqual(suffix + "\n", f.read())
|
||||
# hashes file is unchanged
|
||||
with open(hashes_file, 'rb') as f:
|
||||
self.assertEqual(original_hashes, pickle.load(f))
|
||||
found_hashes = pickle.load(f)
|
||||
found_hashes.pop('updated')
|
||||
self.assertTrue(found_hashes.pop('valid'))
|
||||
self.assertEqual(original_hashes, found_hashes)
|
||||
|
||||
# consolidate the hash and the invalidations
|
||||
hashes = assert_consolidation([suffix])
|
||||
@ -6305,7 +6401,9 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
self.assertEqual(suffix2 + "\n", f.read())
|
||||
# hashes file is not yet changed
|
||||
with open(hashes_file, 'rb') as f:
|
||||
self.assertEqual(hashes, pickle.load(f))
|
||||
found_hashes = pickle.load(f)
|
||||
self.assertTrue(hashes['valid'])
|
||||
self.assertEqual(hashes, found_hashes)
|
||||
|
||||
# consolidate hashes
|
||||
hashes = assert_consolidation([suffix, suffix2])
|
||||
@ -6318,10 +6416,43 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
self.assertEqual("%s\n%s\n" % (suffix2, suffix2), f.read())
|
||||
# hashes file is not yet changed
|
||||
with open(hashes_file, 'rb') as f:
|
||||
self.assertEqual(hashes, pickle.load(f))
|
||||
found_hashes = pickle.load(f)
|
||||
self.assertTrue(hashes['valid'])
|
||||
self.assertEqual(hashes, found_hashes)
|
||||
# consolidate hashes
|
||||
assert_consolidation([suffix, suffix2])
|
||||
|
||||
def test_get_hashes_consolidates_suffix_rehash_once(self):
|
||||
for policy in self.iter_policies():
|
||||
df_mgr = self.df_router[policy]
|
||||
df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
|
||||
policy=policy)
|
||||
df.delete(self.ts())
|
||||
suffix_dir = os.path.dirname(df._datadir)
|
||||
|
||||
with mock.patch.object(df_mgr, 'consolidate_hashes',
|
||||
side_effect=df_mgr.consolidate_hashes
|
||||
) as mock_consolidate_hashes, \
|
||||
mock.patch.object(df_mgr, '_hash_suffix',
|
||||
side_effect=df_mgr._hash_suffix
|
||||
) as mock_hash_suffix:
|
||||
# creates pkl file
|
||||
df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
mock_consolidate_hashes.assert_called_once()
|
||||
self.assertEqual([mock.call(suffix_dir)],
|
||||
mock_hash_suffix.call_args_list)
|
||||
# second object in path
|
||||
df2 = self.get_different_suffix_df(df)
|
||||
df2.delete(self.ts())
|
||||
suffix_dir2 = os.path.dirname(df2._datadir)
|
||||
mock_consolidate_hashes.reset_mock()
|
||||
mock_hash_suffix.reset_mock()
|
||||
# updates pkl file
|
||||
df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
mock_consolidate_hashes.assert_called_once()
|
||||
self.assertEqual([mock.call(suffix_dir2)],
|
||||
mock_hash_suffix.call_args_list)
|
||||
|
||||
def test_consolidate_hashes_raises_exception(self):
|
||||
# verify that if consolidate_hashes raises an exception then suffixes
|
||||
# are rehashed and a hashes.pkl is written
|
||||
@ -6348,7 +6479,10 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
hashes_file = os.path.join(part_path, diskfile.HASH_FILE)
|
||||
|
||||
with open(hashes_file, 'rb') as f:
|
||||
self.assertEqual(hashes, pickle.load(f))
|
||||
found_hashes = pickle.load(f)
|
||||
found_hashes.pop('updated')
|
||||
self.assertTrue(found_hashes.pop('valid'))
|
||||
self.assertEqual(hashes, found_hashes)
|
||||
|
||||
# sanity check log warning
|
||||
warnings = self.logger.get_lines_for_level('warning')
|
||||
@ -6367,7 +6501,10 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
diskfile.get_data_dir(policy), '0')
|
||||
hashes_file = os.path.join(part_path, diskfile.HASH_FILE)
|
||||
with open(hashes_file, 'rb') as f:
|
||||
self.assertEqual(hashes, pickle.load(f))
|
||||
found_hashes = pickle.load(f)
|
||||
found_hashes.pop('updated')
|
||||
self.assertTrue(found_hashes.pop('valid'))
|
||||
self.assertEqual(hashes, found_hashes)
|
||||
|
||||
# invalidate_hash tests - error handling
|
||||
|
||||
@ -7437,6 +7574,71 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
policy)
|
||||
self.assertEqual(hashes, {})
|
||||
|
||||
def _test_get_hashes_race(self, hash_breaking_function):
|
||||
for policy in self.iter_policies():
|
||||
df_mgr = self.df_router[policy]
|
||||
|
||||
df = df_mgr.get_diskfile(self.existing_device, '0', 'a', 'c',
|
||||
'o', policy=policy, frag_index=3)
|
||||
suffix = os.path.basename(os.path.dirname(df._datadir))
|
||||
|
||||
df2 = self.get_different_suffix_df(df, frag_index=5)
|
||||
suffix2 = os.path.basename(os.path.dirname(df2._datadir))
|
||||
part_path = os.path.dirname(os.path.dirname(
|
||||
os.path.join(df._datadir)))
|
||||
hashfile_path = os.path.join(part_path, diskfile.HASH_FILE)
|
||||
# create hashes.pkl
|
||||
hashes = df_mgr.get_hashes(self.existing_device, '0', [],
|
||||
policy)
|
||||
self.assertEqual(hashes, {}) # sanity
|
||||
self.assertTrue(os.path.exists(hashfile_path))
|
||||
# and optionally tamper with the hashes.pkl...
|
||||
hash_breaking_function(hashfile_path)
|
||||
non_local = {'called': False}
|
||||
orig_hash_suffix = df_mgr._hash_suffix
|
||||
|
||||
# then create a suffix
|
||||
df.delete(self.ts())
|
||||
|
||||
def mock_hash_suffix(*args, **kwargs):
|
||||
# capture first call to mock_hash
|
||||
if not non_local['called']:
|
||||
non_local['called'] = True
|
||||
df2.delete(self.ts())
|
||||
non_local['other_hashes'] = df_mgr.get_hashes(
|
||||
self.existing_device, '0', [], policy)
|
||||
return orig_hash_suffix(*args, **kwargs)
|
||||
|
||||
with mock.patch.object(df_mgr, '_hash_suffix', mock_hash_suffix):
|
||||
hashes = df_mgr.get_hashes(self.existing_device, '0', [],
|
||||
policy)
|
||||
|
||||
self.assertTrue(non_local['called'])
|
||||
self.assertIn(suffix, hashes)
|
||||
self.assertIn(suffix2, hashes)
|
||||
|
||||
def test_get_hashes_race_invalid_pickle(self):
|
||||
def hash_breaking_function(hashfile_path):
|
||||
# create a garbage invalid zero-byte file which can not unpickle
|
||||
open(hashfile_path, 'w').close()
|
||||
self._test_get_hashes_race(hash_breaking_function)
|
||||
|
||||
def test_get_hashes_race_new_partition(self):
|
||||
def hash_breaking_function(hashfile_path):
|
||||
# simulate rebalanced part doing post-rsync REPLICATE
|
||||
os.unlink(hashfile_path)
|
||||
part_dir = os.path.dirname(hashfile_path)
|
||||
os.unlink(os.path.join(part_dir, '.lock'))
|
||||
# sanity
|
||||
self.assertEqual([], os.listdir(os.path.dirname(hashfile_path)))
|
||||
self._test_get_hashes_race(hash_breaking_function)
|
||||
|
||||
def test_get_hashes_race_existing_partition(self):
|
||||
def hash_breaking_function(hashfile_path):
|
||||
# no-op - simulate ok existing partition
|
||||
self.assertTrue(os.path.exists(hashfile_path))
|
||||
self._test_get_hashes_race(hash_breaking_function)
|
||||
|
||||
def test_get_hashes_hash_suffix_enotdir(self):
|
||||
for policy in self.iter_policies():
|
||||
df_mgr = self.df_router[policy]
|
||||
@ -7490,37 +7692,125 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
df_mgr = self.df_router[policy]
|
||||
# first create an empty pickle
|
||||
df_mgr.get_hashes(self.existing_device, '0', [], policy)
|
||||
hashes_file = os.path.join(
|
||||
self.devices, self.existing_device,
|
||||
diskfile.get_data_dir(policy), '0', diskfile.HASH_FILE)
|
||||
mtime = os.path.getmtime(hashes_file)
|
||||
non_local = {'mtime': mtime}
|
||||
|
||||
non_local = {'suffix_count': 1}
|
||||
calls = []
|
||||
|
||||
def mock_getmtime(filename):
|
||||
t = non_local['mtime']
|
||||
def mock_read_hashes(filename):
|
||||
rv = {'%03x' % i: 'fake'
|
||||
for i in range(non_local['suffix_count'])}
|
||||
if len(calls) <= 3:
|
||||
# this will make the *next* call get a slightly
|
||||
# newer mtime than the last
|
||||
non_local['mtime'] += 1
|
||||
# this will make the *next* call get slightly
|
||||
# different content
|
||||
non_local['suffix_count'] += 1
|
||||
# track exactly the value for every return
|
||||
calls.append(t)
|
||||
return t
|
||||
with mock.patch('swift.obj.diskfile.getmtime',
|
||||
mock_getmtime):
|
||||
calls.append(dict(rv))
|
||||
rv['valid'] = True
|
||||
return rv
|
||||
with mock.patch('swift.obj.diskfile.read_hashes',
|
||||
mock_read_hashes):
|
||||
df_mgr.get_hashes(self.existing_device, '0', ['123'],
|
||||
policy)
|
||||
|
||||
self.assertEqual(calls, [
|
||||
mtime + 0, # read
|
||||
mtime + 1, # modified
|
||||
mtime + 2, # read
|
||||
mtime + 3, # modifed
|
||||
mtime + 4, # read
|
||||
mtime + 4, # not modifed
|
||||
{'000': 'fake'}, # read
|
||||
{'000': 'fake', '001': 'fake'}, # modification
|
||||
{'000': 'fake', '001': 'fake', '002': 'fake'}, # read
|
||||
{'000': 'fake', '001': 'fake', '002': 'fake',
|
||||
'003': 'fake'}, # modifed
|
||||
{'000': 'fake', '001': 'fake', '002': 'fake',
|
||||
'003': 'fake', '004': 'fake'}, # read
|
||||
{'000': 'fake', '001': 'fake', '002': 'fake',
|
||||
'003': 'fake', '004': 'fake'}, # not modifed
|
||||
])
|
||||
|
||||
|
||||
class TestHashesHelpers(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
self.testdir = tempfile.mkdtemp()
|
||||
|
||||
def tearDown(self):
|
||||
rmtree(self.testdir, ignore_errors=1)
|
||||
|
||||
def test_read_legacy_hashes(self):
|
||||
hashes = {'stub': 'fake'}
|
||||
hashes_file = os.path.join(self.testdir, diskfile.HASH_FILE)
|
||||
with open(hashes_file, 'w') as f:
|
||||
pickle.dump(hashes, f)
|
||||
expected = {
|
||||
'stub': 'fake',
|
||||
'updated': -1,
|
||||
'valid': True,
|
||||
}
|
||||
self.assertEqual(expected, diskfile.read_hashes(self.testdir))
|
||||
|
||||
def test_write_hashes_valid_updated(self):
|
||||
hashes = {'stub': 'fake', 'valid': True}
|
||||
now = time()
|
||||
with mock.patch('swift.obj.diskfile.time.time', return_value=now):
|
||||
diskfile.write_hashes(self.testdir, hashes)
|
||||
hashes_file = os.path.join(self.testdir, diskfile.HASH_FILE)
|
||||
with open(hashes_file) as f:
|
||||
data = pickle.load(f)
|
||||
expected = {
|
||||
'stub': 'fake',
|
||||
'updated': now,
|
||||
'valid': True,
|
||||
}
|
||||
self.assertEqual(expected, data)
|
||||
|
||||
def test_write_hashes_invalid_updated(self):
|
||||
hashes = {'valid': False}
|
||||
now = time()
|
||||
with mock.patch('swift.obj.diskfile.time.time', return_value=now):
|
||||
diskfile.write_hashes(self.testdir, hashes)
|
||||
hashes_file = os.path.join(self.testdir, diskfile.HASH_FILE)
|
||||
with open(hashes_file) as f:
|
||||
data = pickle.load(f)
|
||||
expected = {
|
||||
'updated': now,
|
||||
'valid': False,
|
||||
}
|
||||
self.assertEqual(expected, data)
|
||||
|
||||
def test_write_hashes_safe_default(self):
|
||||
hashes = {}
|
||||
now = time()
|
||||
with mock.patch('swift.obj.diskfile.time.time', return_value=now):
|
||||
diskfile.write_hashes(self.testdir, hashes)
|
||||
hashes_file = os.path.join(self.testdir, diskfile.HASH_FILE)
|
||||
with open(hashes_file) as f:
|
||||
data = pickle.load(f)
|
||||
expected = {
|
||||
'updated': now,
|
||||
'valid': False,
|
||||
}
|
||||
self.assertEqual(expected, data)
|
||||
|
||||
def test_read_write_valid_hashes_mutation_and_transative_equality(self):
|
||||
hashes = {'stub': 'fake', 'valid': True}
|
||||
diskfile.write_hashes(self.testdir, hashes)
|
||||
# write_hashes mutates the passed in hashes, it adds the updated key
|
||||
self.assertIn('updated', hashes)
|
||||
self.assertTrue(hashes['valid'])
|
||||
result = diskfile.read_hashes(self.testdir)
|
||||
# unpickling result in a new object
|
||||
self.assertNotEqual(id(hashes), id(result))
|
||||
# with the exactly the same value mutation from write_hashes
|
||||
self.assertEqual(hashes, result)
|
||||
|
||||
def test_read_write_invalid_hashes_mutation_and_transative_equality(self):
|
||||
hashes = {'valid': False}
|
||||
diskfile.write_hashes(self.testdir, hashes)
|
||||
# write_hashes mutates the passed in hashes, it adds the updated key
|
||||
self.assertIn('updated', hashes)
|
||||
self.assertFalse(hashes['valid'])
|
||||
result = diskfile.read_hashes(self.testdir)
|
||||
# unpickling result in a new object
|
||||
self.assertNotEqual(id(hashes), id(result))
|
||||
# with the exactly the same value mutation from write_hashes
|
||||
self.assertEqual(hashes, result)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
Reference in New Issue
Block a user