Fix race in new partitions detecting new/invalid suffixes.
The assumption that we don't need to write an entry in the invalidations file when the hashes.pkl does not exist turned out to be a premature optimization and also wrong. Primarily we should recognize the creation of hashes.pkl is the first thing that happens in a part when it lands on a new primary. The code should be optimized toward the assumption of the most common disk state. Also, in this case the extra stat calls to check if the hashes.pkl exists were not only un-optimized - but introducing a race. Consider the common case: proc 1 | proc 2 -------------------------------|--------------------------- a) read then truncate journal | b) do work | c) append to journal d) apply "a" to index | The index written at "d" may not (yet) reflect the entry writen by proc 2 at "c"; however, it's clearly in the journal so it's easy to see we're safe. Adding in the extra stat call for the index existence check increases the state which can effect correctness. proc 1 | proc 2 ------------------------------|--------------------------- a) no index, truncate journal | b) do work | b) iff index exists | c) append to journal d) apply (or create) index | If step "c" doesn't happen because the index does not yet exist - the update is clearly lost. In our case we'd skip marking a suffix as invalid when the hashes.pkl does not exist because we know "the next time we rehash" we'll have to os.listdir suffixes anyway. But if another process is *currently* rehashing (and has already done it's os.listdir) instead we've just dropped an invalidation on the floor. Don't do that. Instead - write down the invalidation. The running rehash is welcome to proceed on outdated information - as long as the next pass will grab and hash the new suffix. Known-Issue(s): If the suffix already exists there's an even chance the running rehash will hash in the very update for which we want to invalidate the suffix, but that's ok it's idempotent. Co-Author: Pavel Kvasnička <pavel.kvasnicka@firma.seznam.cz> Co-Author: Alistair Coles <alistair.coles@hpe.com> Co-Author: Kota Tsuyuzaki <tsuyuzaki.kota@lab.ntt.co.jp> Related-Change-Id: I64cadb1a3feb4d819d545137eecfc295389794f0 Change-Id: I2b48238d9d684e831d9777a7b18f91a3cef57cd1 Closes-Bug: #1651530
This commit is contained in:
parent
ffd099c26a
commit
442cc1d16d
@ -309,10 +309,6 @@ def invalidate_hash(suffix_dir):
|
||||
|
||||
suffix = basename(suffix_dir)
|
||||
partition_dir = dirname(suffix_dir)
|
||||
hashes_file = join(partition_dir, HASH_FILE)
|
||||
if not os.path.exists(hashes_file):
|
||||
return
|
||||
|
||||
invalidations_file = join(partition_dir, HASH_INVALIDATIONS_FILE)
|
||||
with lock_path(partition_dir):
|
||||
with open(invalidations_file, 'ab') as inv_fh:
|
||||
|
@ -830,7 +830,9 @@ class TestAuditor(unittest.TestCase):
|
||||
# create tombstone and hashes.pkl file, ensuring the tombstone is not
|
||||
# reclaimed by mocking time to be the tombstone time
|
||||
with mock.patch('time.time', return_value=float(ts_tomb)):
|
||||
# this delete will create a invalid hashes entry
|
||||
self.disk_file.delete(ts_tomb)
|
||||
# this get_hashes call will truncate the invalid hashes entry
|
||||
self.disk_file.manager.get_hashes(
|
||||
self.devices + '/sda', '0', [], self.disk_file.policy)
|
||||
suffix = basename(dirname(self.disk_file._datadir))
|
||||
@ -839,8 +841,10 @@ class TestAuditor(unittest.TestCase):
|
||||
self.assertEqual(['%s.ts' % ts_tomb.internal],
|
||||
os.listdir(self.disk_file._datadir))
|
||||
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
|
||||
self.assertFalse(os.path.exists(
|
||||
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
|
||||
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
|
||||
self.assertTrue(os.path.exists(hash_invalid))
|
||||
with open(hash_invalid, 'rb') as fp:
|
||||
self.assertEqual('', fp.read().strip('\n'))
|
||||
# Run auditor
|
||||
self.auditor.run_audit(mode='once', zero_byte_fps=zero_byte_fps)
|
||||
# sanity check - auditor should not remove tombstone file
|
||||
@ -853,8 +857,10 @@ class TestAuditor(unittest.TestCase):
|
||||
ts_tomb = Timestamp(time.time() - 55)
|
||||
part_dir, suffix = self._audit_tombstone(self.conf, ts_tomb)
|
||||
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
|
||||
self.assertFalse(os.path.exists(
|
||||
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
|
||||
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
|
||||
self.assertTrue(os.path.exists(hash_invalid))
|
||||
with open(hash_invalid, 'rb') as fp:
|
||||
self.assertEqual('', fp.read().strip('\n'))
|
||||
|
||||
def test_reclaimable_tombstone(self):
|
||||
# audit with a reclaimable tombstone
|
||||
@ -874,8 +880,10 @@ class TestAuditor(unittest.TestCase):
|
||||
conf['reclaim_age'] = 2 * 604800
|
||||
part_dir, suffix = self._audit_tombstone(conf, ts_tomb)
|
||||
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
|
||||
self.assertFalse(os.path.exists(
|
||||
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
|
||||
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
|
||||
self.assertTrue(os.path.exists(hash_invalid))
|
||||
with open(hash_invalid, 'rb') as fp:
|
||||
self.assertEqual('', fp.read().strip('\n'))
|
||||
|
||||
def test_reclaimable_tombstone_with_custom_reclaim_age(self):
|
||||
# audit with a tombstone older than custom reclaim age
|
||||
@ -897,8 +905,10 @@ class TestAuditor(unittest.TestCase):
|
||||
part_dir, suffix = self._audit_tombstone(
|
||||
self.conf, ts_tomb, zero_byte_fps=50)
|
||||
self.assertTrue(os.path.exists(os.path.join(part_dir, HASH_FILE)))
|
||||
self.assertFalse(os.path.exists(
|
||||
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
|
||||
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
|
||||
self.assertTrue(os.path.exists(hash_invalid))
|
||||
with open(hash_invalid, 'rb') as fp:
|
||||
self.assertEqual('', fp.read().strip('\n'))
|
||||
|
||||
def _test_expired_object_is_ignored(self, zero_byte_fps):
|
||||
# verify that an expired object does not get mistaken for a tombstone
|
||||
@ -910,15 +920,41 @@ class TestAuditor(unittest.TestCase):
|
||||
extra_metadata={'X-Delete-At': now - 10})
|
||||
files = os.listdir(self.disk_file._datadir)
|
||||
self.assertTrue([f for f in files if f.endswith('.data')]) # sanity
|
||||
# diskfile write appends to invalid hashes file
|
||||
part_dir = dirname(dirname(self.disk_file._datadir))
|
||||
hash_invalid = os.path.join(part_dir, HASH_INVALIDATIONS_FILE)
|
||||
with open(hash_invalid, 'rb') as fp:
|
||||
self.assertEqual(basename(dirname(self.disk_file._datadir)),
|
||||
fp.read().strip('\n')) # sanity check
|
||||
|
||||
# run the auditor...
|
||||
with mock.patch.object(auditor, 'dump_recon_cache'):
|
||||
audit.run_audit(mode='once', zero_byte_fps=zero_byte_fps)
|
||||
|
||||
# the auditor doesn't touch anything on the invalidation file
|
||||
# (i.e. not truncate and add no entry)
|
||||
with open(hash_invalid, 'rb') as fp:
|
||||
self.assertEqual(basename(dirname(self.disk_file._datadir)),
|
||||
fp.read().strip('\n')) # sanity check
|
||||
|
||||
# this get_hashes call will truncate the invalid hashes entry
|
||||
self.disk_file.manager.get_hashes(
|
||||
self.devices + '/sda', '0', [], self.disk_file.policy)
|
||||
with open(hash_invalid, 'rb') as fp:
|
||||
self.assertEqual('', fp.read().strip('\n')) # sanity check
|
||||
|
||||
# run the auditor, again...
|
||||
with mock.patch.object(auditor, 'dump_recon_cache'):
|
||||
audit.run_audit(mode='once', zero_byte_fps=zero_byte_fps)
|
||||
|
||||
# verify nothing changed
|
||||
self.assertTrue(os.path.exists(self.disk_file._datadir))
|
||||
part_dir = dirname(dirname(self.disk_file._datadir))
|
||||
self.assertFalse(os.path.exists(
|
||||
os.path.join(part_dir, HASH_INVALIDATIONS_FILE)))
|
||||
self.assertEqual(files, os.listdir(self.disk_file._datadir))
|
||||
self.assertFalse(audit.logger.get_lines_for_level('error'))
|
||||
self.assertFalse(audit.logger.get_lines_for_level('warning'))
|
||||
# and there was no hash invalidation
|
||||
with open(hash_invalid, 'rb') as fp:
|
||||
self.assertEqual('', fp.read().strip('\n'))
|
||||
|
||||
def test_expired_object_is_ignored(self):
|
||||
self._test_expired_object_is_ignored(0)
|
||||
|
@ -39,7 +39,8 @@ from gzip import GzipFile
|
||||
import pyeclib.ec_iface
|
||||
|
||||
from eventlet import hubs, timeout, tpool
|
||||
from swift.obj.diskfile import MD5_OF_EMPTY_STRING, update_auditor_status
|
||||
from swift.obj.diskfile import (MD5_OF_EMPTY_STRING, update_auditor_status,
|
||||
write_pickle)
|
||||
from test.unit import (FakeLogger, mock as unit_mock, temptree,
|
||||
patch_policies, debug_logger, EMPTY_ETAG,
|
||||
make_timestamp_iter, DEFAULT_TEST_EC_TYPE,
|
||||
@ -6057,18 +6058,37 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
|
||||
policy=policy)
|
||||
suffix_dir = os.path.dirname(df._datadir)
|
||||
suffix = os.path.basename(suffix_dir)
|
||||
part_path = os.path.join(self.devices, 'sda1',
|
||||
diskfile.get_data_dir(policy), '0')
|
||||
hashes_file = os.path.join(part_path, diskfile.HASH_FILE)
|
||||
inv_file = os.path.join(
|
||||
part_path, diskfile.HASH_INVALIDATIONS_FILE)
|
||||
self.assertFalse(os.path.exists(hashes_file)) # sanity
|
||||
with mock.patch('swift.obj.diskfile.lock_path') as mock_lock:
|
||||
df_mgr.invalidate_hash(suffix_dir)
|
||||
self.assertFalse(mock_lock.called)
|
||||
# does not create files
|
||||
# sanity, new partition has no suffix hashing artifacts
|
||||
self.assertFalse(os.path.exists(hashes_file))
|
||||
self.assertFalse(os.path.exists(inv_file))
|
||||
# invalidating a hash does not create the hashes_file
|
||||
with mock.patch(
|
||||
'swift.obj.diskfile.BaseDiskFileManager.invalidate_hash',
|
||||
side_effect=diskfile.invalidate_hash) \
|
||||
as mock_invalidate_hash:
|
||||
df.delete(self.ts())
|
||||
self.assertFalse(os.path.exists(hashes_file))
|
||||
# ... but does invalidate the suffix
|
||||
self.assertEqual([mock.call(suffix_dir)],
|
||||
mock_invalidate_hash.call_args_list)
|
||||
with open(inv_file) as f:
|
||||
self.assertEqual(suffix, f.read().strip('\n'))
|
||||
# ... and hashing suffixes finds (and hashes) the new suffix
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
self.assertIn(suffix, hashes)
|
||||
self.assertTrue(os.path.exists(hashes_file))
|
||||
self.assertIn(os.path.basename(suffix_dir), hashes)
|
||||
with open(hashes_file) as f:
|
||||
self.assertEqual(hashes, pickle.load(f))
|
||||
# ... and truncates the invalidations file
|
||||
with open(inv_file) as f:
|
||||
self.assertEqual('', f.read().strip('\n'))
|
||||
|
||||
def test_invalidate_hash_empty_file_exists(self):
|
||||
for policy in self.iter_policies():
|
||||
@ -6125,6 +6145,105 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
}
|
||||
self.assertEqual(open_log, expected)
|
||||
|
||||
def test_invalidates_hashes_of_new_partition(self):
|
||||
# a suffix can be changed or created by second process when new pkl
|
||||
# is calculated - that suffix must be correct on next get_hashes call
|
||||
for policy in self.iter_policies():
|
||||
df_mgr = self.df_router[policy]
|
||||
orig_listdir = os.listdir
|
||||
df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
|
||||
policy=policy)
|
||||
suffix = os.path.basename(os.path.dirname(df._datadir))
|
||||
df2 = self.get_different_suffix_df(df)
|
||||
suffix2 = os.path.basename(os.path.dirname(df2._datadir))
|
||||
non_local = {'df2touched': False}
|
||||
df.delete(self.ts())
|
||||
|
||||
def mock_listdir(*args, **kwargs):
|
||||
# simulating an invalidation occuring in another process while
|
||||
# get_hashes is executing
|
||||
result = orig_listdir(*args, **kwargs)
|
||||
if not non_local['df2touched']:
|
||||
non_local['df2touched'] = True
|
||||
# other process creates new suffix
|
||||
df2.delete(self.ts())
|
||||
return result
|
||||
|
||||
with mock.patch('swift.obj.diskfile.os.listdir',
|
||||
mock_listdir):
|
||||
# creates pkl file
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
|
||||
# second suffix added after directory listing, it's added later
|
||||
self.assertIn(suffix, hashes)
|
||||
self.assertNotIn(suffix2, hashes)
|
||||
# updates pkl file
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
self.assertIn(suffix, hashes)
|
||||
self.assertIn(suffix2, hashes)
|
||||
|
||||
@mock.patch('swift.obj.diskfile.getmtime')
|
||||
@mock.patch('swift.obj.diskfile.write_pickle')
|
||||
def test_contains_hashes_of_existing_partition(self, mock_write_pickle,
|
||||
mock_getmtime):
|
||||
# get_hashes must repeat path listing and return all hashes when
|
||||
# another concurrent process created new pkl before hashes are stored
|
||||
# by the first process
|
||||
non_local = {}
|
||||
|
||||
def mock_write_pickle_def(*args, **kwargs):
|
||||
if 'mtime' not in non_local:
|
||||
non_local['mtime'] = time()
|
||||
non_local['mtime'] += 1
|
||||
write_pickle(*args, **kwargs)
|
||||
|
||||
def mock_getmtime_def(filename):
|
||||
if 'mtime' not in non_local:
|
||||
raise OSError(errno.ENOENT, os.strerror(errno.ENOENT))
|
||||
return non_local['mtime']
|
||||
|
||||
mock_write_pickle.side_effect = mock_write_pickle_def
|
||||
mock_getmtime.side_effect = mock_getmtime_def
|
||||
|
||||
for policy in self.iter_policies():
|
||||
df_mgr = self.df_router[policy]
|
||||
# force hashes.pkl to exist; when it does not exist that's fine,
|
||||
# it's just a different race; in that case the invalidation file
|
||||
# gets appended, but we don't restart hashing suffixes (the
|
||||
# invalidation get's squashed in and the suffix gets rehashed on
|
||||
# the next REPLICATE call)
|
||||
df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
orig_listdir = os.listdir
|
||||
df = df_mgr.get_diskfile('sda1', '0', 'a', 'c', 'o',
|
||||
policy=policy)
|
||||
suffix = os.path.basename(os.path.dirname(df._datadir))
|
||||
df2 = self.get_different_suffix_df(df)
|
||||
suffix2 = os.path.basename(os.path.dirname(df2._datadir))
|
||||
non_local['df2touched'] = False
|
||||
|
||||
df.delete(self.ts())
|
||||
|
||||
def mock_listdir(*args, **kwargs):
|
||||
# simulating hashes.pkl modification by another process while
|
||||
# get_hashes is executing
|
||||
# df2 is created to check path hashes recalculation
|
||||
result = orig_listdir(*args, **kwargs)
|
||||
if not non_local['df2touched']:
|
||||
non_local['df2touched'] = True
|
||||
df2.delete(self.ts())
|
||||
# simulate pkl update by other process - mtime is updated
|
||||
self.assertIn('mtime', non_local, "hashes.pkl must exist")
|
||||
non_local['mtime'] += 1
|
||||
return result
|
||||
|
||||
with mock.patch('swift.obj.diskfile.os.listdir',
|
||||
mock_listdir):
|
||||
# creates pkl file and repeats listing when pkl modified
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
|
||||
self.assertIn(suffix, hashes)
|
||||
self.assertIn(suffix2, hashes)
|
||||
|
||||
def test_invalidate_hash_consolidation(self):
|
||||
def assert_consolidation(suffixes):
|
||||
# verify that suffixes are invalidated after consolidation
|
||||
@ -6160,7 +6279,6 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
part_path, diskfile.HASH_INVALIDATIONS_FILE)
|
||||
with open(hashes_file, 'rb') as f:
|
||||
self.assertEqual(original_hashes, pickle.load(f))
|
||||
self.assertFalse(os.path.exists(invalidations_file))
|
||||
|
||||
# invalidate the hash
|
||||
with mock.patch('swift.obj.diskfile.lock_path') as mock_lock:
|
||||
|
@ -6968,9 +6968,12 @@ class TestObjectServer(unittest.TestCase):
|
||||
self.assertIn(' 499 ', line)
|
||||
|
||||
def find_files(self):
|
||||
ignore_files = {'.lock', 'hashes.invalid'}
|
||||
found_files = defaultdict(list)
|
||||
for root, dirs, files in os.walk(self.devices):
|
||||
for filename in files:
|
||||
if filename in ignore_files:
|
||||
continue
|
||||
_name, ext = os.path.splitext(filename)
|
||||
file_path = os.path.join(root, filename)
|
||||
found_files[ext].append(file_path)
|
||||
|
Loading…
Reference in New Issue
Block a user