Merge "relinker: Rehash the parts actually touched when relinking"

Zuul 2021-05-12 11:14:09 +00:00 committed by Gerrit Code Review
commit 6167dbf7ba
4 changed files with 105 additions and 32 deletions

View File

@@ -29,7 +29,7 @@ from swift.common.utils import replace_partition_in_path, config_true_value, \
     audit_location_generator, get_logger, readconf, drop_privileges, \
     RateLimitedIterator, lock_path, PrefixLoggerAdapter, distribute_evenly, \
     non_negative_float, non_negative_int, config_auto_int_value, \
-    dump_recon_cache
+    dump_recon_cache, get_partition_from_path
 from swift.obj import diskfile
@@ -118,6 +118,7 @@ class Relinker(object):
         self.devices_data = recursive_defaultdict()
         self.policy_count = 0
         self.pid = os.getpid()
+        self.linked_into_partitions = set()

     def _aggregate_dev_policy_stats(self):
         for dev_data in self.devices_data.values():
@@ -283,9 +284,11 @@ class Relinker(object):
     def hook_pre_partition(self, partition_path):
         self.pre_partition_errors = self.total_errors
+        self.linked_into_partitions = set()

     def hook_post_partition(self, partition_path):
-        datadir_path, part = os.path.split(os.path.abspath(partition_path))
+        datadir_path, partition = os.path.split(
+            os.path.abspath(partition_path))
         device_path, datadir_name = os.path.split(datadir_path)
         device = os.path.basename(device_path)
         state_tmp_file = os.path.join(
@@ -315,15 +318,15 @@ class Relinker(object):
         # shift to the new partition space and rehash
         # |0                             2N|
         # |                IIJJKKLLMMNNOOPP|
-        partition = int(part)
-        if not self.do_cleanup and partition >= 2 ** (
-                self.states['part_power'] - 1):
-            for new_part in (2 * partition, 2 * partition + 1):
-                self.diskfile_mgr.get_hashes(
-                    device, new_part, [], self.policy)
-        elif self.do_cleanup:
+        for dirty_partition in self.linked_into_partitions:
+            if self.do_cleanup or \
+                    dirty_partition >= 2 ** self.states['part_power']:
+                self.diskfile_mgr.get_hashes(
+                    device, dirty_partition, [], self.policy)
+
+        if self.do_cleanup:
             hashes = self.diskfile_mgr.get_hashes(
-                device, partition, [], self.policy)
+                device, int(partition), [], self.policy)
             # In any reasonably-large cluster, we'd expect all old
             # partitions P to be empty after cleanup (i.e., it's unlikely
             # that there's another partition Q := P//2 that also has data
@@ -359,7 +362,7 @@ class Relinker(object):
         # in case the process is interrupted and needs to resume, or there
         # were errors and the relinker needs to run again.
         if self.pre_partition_errors == self.total_errors:
-            self.states["state"][part] = True
+            self.states["state"][partition] = True
         with open(state_tmp_file, 'wt') as f:
             json.dump(self.states, f)
             os.fsync(f.fileno())
@@ -507,6 +510,8 @@ class Relinker(object):
                 self.stats['errors'] += 1
                 missing_links += 1
         if created_links:
+            self.linked_into_partitions.add(get_partition_from_path(
+                self.conf['devices'], new_hash_path))
             diskfile.invalidate_hash(os.path.dirname(new_hash_path))

         if self.do_cleanup and not missing_links:
@@ -529,6 +534,9 @@ class Relinker(object):
                     self.logger.debug("Removed %s", old_file)

         if rehash:
+            # Even though we're invalidating the suffix, don't update
+            # self.linked_into_partitions -- we only care about them for
+            # relinking into the new part-power space
             try:
                 diskfile.invalidate_hash(os.path.dirname(hash_path))
             except Exception as exc:
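The gist of the relinker change above: hook_post_partition used to blindly rehash both candidate children (2 * partition and 2 * partition + 1) of every old partition in the upper half of the partition space, whether or not anything was linked into them; now the relinker records exactly which new partitions received links (self.linked_into_partitions) and rehashes only those. A minimal sketch of the partition arithmetic behind this, with fabricated values (not Swift's implementation):

    # When the part power grows from P to P+1, an object keeps the high bits
    # of its hash, so an object in old partition `part` can only land in new
    # partition 2*part or 2*part + 1.

    def children_of(part):
        """Both candidate new-partition homes for an old partition."""
        return (2 * part, 2 * part + 1)

    def partition_for(hash32, part_power):
        """Partition = top `part_power` bits of a 32-bit hash prefix."""
        return hash32 >> (32 - part_power)

    old_part_power = 10
    hash32 = 0x2BC00000  # fabricated 32-bit hash prefix
    old_part = partition_for(hash32, old_part_power)       # 175
    new_part = partition_for(hash32, old_part_power + 1)   # 350
    assert new_part in children_of(old_part)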

View File

@@ -5982,6 +5982,22 @@ def get_partition_for_hash(hex_hash, part_power):
     return struct.unpack_from('>I', raw_hash)[0] >> part_shift


+def get_partition_from_path(devices, path):
+    """
+    :param devices: directory where devices are mounted (e.g. /srv/node)
+    :param path: full path to a object file or hashdir
+    :returns: the (integer) partition from the path
+    """
+    offset_parts = devices.rstrip(os.sep).split(os.sep)
+    path_components = path.split(os.sep)
+    if offset_parts == path_components[:len(offset_parts)]:
+        offset = len(offset_parts)
+    else:
+        raise ValueError('Path %r is not under device dir %r' % (
+            path, devices))
+    return int(path_components[offset + 2])
+
+
 def replace_partition_in_path(devices, path, part_power):
     """
     Takes a path and a partition power and returns the same path, but with the
@@ -5990,8 +6006,6 @@ def replace_partition_in_path(devices, path, part_power):
     :param devices: directory where devices are mounted (e.g. /srv/node)
     :param path: full path to a object file or hashdir
     :param part_power: partition power to compute correct partition number
-    :param is_hash_dir: if True then ``path`` is the path to a hash dir,
-        otherwise ``path`` is the path to a file in a hash dir.
     :returns: Path with re-computed partition power
     """
     offset_parts = devices.rstrip(os.sep).split(os.sep)
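The new get_partition_from_path helper just plucks the partition out of the path: it is always the third component after the devices root (<devices>/<device>/<datadir>/<partition>/...), which is why the same call works for an object file, a hash dir, a suffix dir, or the partition dir itself. A quick usage sketch, with a fabricated object path and file name:

    from swift.common.utils import get_partition_from_path

    path = ('/srv/node/sda1/objects/70/c77/'
            'af088baea4806dcaba30bf07d9e64c77/1618998873.10917.data')
    print(get_partition_from_path('/srv/node', path))  # 70

    # paths outside the devices root are rejected
    try:
        get_partition_from_path('/srv/node', '/tmp/objects/70/c77')
    except ValueError as err:
        print(err)  # Path '/tmp/objects/70/c77' is not under device dir '/srv/node'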

View File

@@ -2120,7 +2120,7 @@ class TestRelinker(unittest.TestCase):
             (('meta', 1),),
             None,
             None,
-            (('data', 0), ('meta', 1), ('meta', 2)))
+            (('data', 0), ('meta', 2)))
         info_lines = self.logger.get_lines_for_level('info')
         self.assertIn('1 hash dirs processed (cleanup=True) '
                       '(2 files, 2 linked, 2 removed, 0 errors)',
@@ -2131,7 +2131,7 @@ class TestRelinker(unittest.TestCase):
             (('data', 0),),
             None,
             None,
-            (('data', 0), ('ts', 2),))
+            (('ts', 2),))
         info_lines = self.logger.get_lines_for_level('info')
         self.assertIn('1 hash dirs processed (cleanup=True) '
                       '(1 files, 1 linked, 1 removed, 0 errors)',
@@ -2142,7 +2142,7 @@ class TestRelinker(unittest.TestCase):
             (('ts', 0),),
             None,
             None,
-            (('ts', 0), ('data', 1), ('meta', 2)))
+            (('data', 1), ('meta', 2)))
         info_lines = self.logger.get_lines_for_level('info')
         self.assertIn('1 hash dirs processed (cleanup=True) '
                       '(2 files, 2 linked, 2 removed, 0 errors)',
@@ -3212,7 +3212,8 @@ class TestRelinker(unittest.TestCase):
         self.assertEqual([], self.logger.get_lines_for_level('error'))

     def test_cleanup_not_yet_relinked(self):
-        # force rehash of new partition to not happen during cleanup
+        # force new partition to be above range of partitions visited during
+        # cleanup
         self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
         self._common_test_cleanup(relink=False)
         with self._mock_relinker():
@@ -3234,11 +3235,48 @@ class TestRelinker(unittest.TestCase):
         info_lines = self.logger.get_lines_for_level('info')
         self.assertIn('1 hash dirs processed (cleanup=True) '
                       '(1 files, 1 linked, 1 removed, 0 errors)', info_lines)
-        # suffix should be invalidated in new partition
+        # suffix should be invalidated and rehashed in new partition
         hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
         self.assertTrue(os.path.exists(hashes_invalid))
         with open(hashes_invalid, 'r') as fd:
-            self.assertEqual(str(self.suffix), fd.read().strip())
+            self.assertEqual('', fd.read().strip())
+        self.assertEqual([], self.logger.get_lines_for_level('error'))
+
+    def test_cleanup_not_yet_relinked_low(self):
+        # force new partition to be in the range of partitions visited during
+        # cleanup, but not exist until after cleanup would have visited it
+        self._setup_object(lambda part: part < 2 ** (PART_POWER - 1))
+        self._common_test_cleanup(relink=False)
+        self.assertFalse(os.path.isfile(self.expected_file))
+        self.assertFalse(os.path.exists(self.next_part_dir))
+        # Relinker processes partitions in reverse order; as a result, the
+        # "normal" rehash during cleanup won't hit this, since it doesn't
+        # exist yet -- but when we finish processing the old partition,
+        # we'll loop back around.
+        with self._mock_relinker():
+            self.assertEqual(0, relinker.main([
+                'cleanup',
+                '--swift-dir', self.testdir,
+                '--devices', self.devices,
+                '--skip-mount',
+            ]))
+        self.assertTrue(os.path.isfile(self.expected_file))  # link created
+        # old partition should be cleaned up
+        self.assertFalse(os.path.exists(self.part_dir))
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+        self.assertIn(
+            'Relinking (cleanup) created link: %s to %s'
+            % (self.objname, self.expected_file),
+            self.logger.get_lines_for_level('debug'))
+        info_lines = self.logger.get_lines_for_level('info')
+        self.assertIn('1 hash dirs processed (cleanup=True) '
+                      '(1 files, 1 linked, 1 removed, 0 errors)', info_lines)
+        # suffix should be invalidated and rehashed in new partition
+        hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
+        self.assertTrue(os.path.exists(hashes_invalid))
+        with open(hashes_invalid, 'r') as fd:
+            self.assertEqual('', fd.read().strip())
         self.assertEqual([], self.logger.get_lines_for_level('error'))

     def test_cleanup_same_object_different_inode_in_new_partition(self):
@@ -3302,7 +3340,8 @@ class TestRelinker(unittest.TestCase):
             self.assertEqual(0, res)
         # old partition should be cleaned up
         self.assertFalse(os.path.exists(self.part_dir))
-        self.assertTrue(os.path.isfile(older_obj_file))  # older file intact
+        # which is also going to clean up the older file
+        self.assertFalse(os.path.isfile(older_obj_file))
         self.assertTrue(os.path.isfile(self.expected_file))  # link created
         self.assertIn(
             'Relinking (cleanup) created link: %s to %s'
@@ -3312,11 +3351,11 @@ class TestRelinker(unittest.TestCase):
         info_lines = self.logger.get_lines_for_level('info')
         self.assertIn('1 hash dirs processed (cleanup=True) '
                       '(1 files, 1 linked, 1 removed, 0 errors)', info_lines)
-        # suffix should be invalidated in new partition
+        # suffix should be invalidated and rehashed in new partition
         hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
         self.assertTrue(os.path.exists(hashes_invalid))
         with open(hashes_invalid, 'r') as fd:
-            self.assertEqual(str(self.suffix), fd.read().strip())
+            self.assertEqual('', fd.read().strip())
         self.assertEqual([], self.logger.get_lines_for_level('error'))

     def test_cleanup_deleted(self):
@@ -3613,11 +3652,11 @@ class TestRelinker(unittest.TestCase):
                 '--skip-mount',
             ]))
         warning_lines = self.logger.get_lines_for_level('warning')
-        # once for cleanup_ondisk_files in old and once for the
-        # get_ondisk_files of union of files; the new partition did not exist
-        # at start of cleanup so is not rehashed
-        self.assertEqual(2, len(warning_lines),
-                         'Expected 2 log lines, got %r' % warning_lines)
+        # once for cleanup_ondisk_files in old, again for the get_ondisk_files
+        # of union of files, and one last time when the new partition gets
+        # rehashed at the end of processing the old one
+        self.assertEqual(3, len(warning_lines),
+                         'Expected 3 log lines, got %r' % warning_lines)
         for line in warning_lines:
             self.assertIn('Bad fragment index: None', line, warning_lines)
         self.assertIn(
@@ -3666,12 +3705,8 @@ class TestRelinker(unittest.TestCase):
             ]))
         expected = [('invalidate', self.next_suffix_dir)]
         if self.part >= 2 ** (PART_POWER - 1):
-            expected.extend([
-                ('get_hashes', self.existing_device, self.next_part & ~1,
-                 [], POLICIES[0]),
-                ('get_hashes', self.existing_device, self.next_part | 1,
-                 [], POLICIES[0]),
-            ])
+            expected.append(('get_hashes', self.existing_device,
+                             self.next_part, [], POLICIES[0]))

         self.assertEqual(calls, expected)
         # Depending on partition, there may or may not be a get_hashes here
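The new test_cleanup_not_yet_relinked_low pins down the subtle case this commit has to handle: the cleanup pass walks partition dirs in reverse numeric order, so a new partition with a lower number than those already visited only springs into existence after the walker has passed the spot where it would have looked. A toy walk, using an illustrative part power of 3 (fabricated numbers, not the tests' fixtures):

    PART_POWER = 3
    old_part = 2                                  # part < 2 ** (PART_POWER - 1)
    new_parts = (2 * old_part, 2 * old_part + 1)  # (4, 5)

    # cleanup sweeps the existing (old) partition dirs from high to low
    for part in sorted(range(2 ** PART_POWER), reverse=True):  # 7, 6, 5, ...
        if part in new_parts:
            print('would rehash', part, 'but the dir does not exist yet')
        if part == old_part:
            print('links created into', new_parts)
            # only now do partitions 4 and 5 hold data; the walker will not
            # come back, so hook_post_partition must rehash them explicitly
            # via self.linked_into_partitions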

View File

@@ -4468,6 +4468,22 @@ cluster_dfw1 = http://dfw1.host/v1/
         self.assertEqual(0, utils.get_partition_for_hash(hex_hash, 0))
         self.assertEqual(0, utils.get_partition_for_hash(hex_hash, -1))

+    def test_get_partition_from_path(self):
+        def do_test(path):
+            self.assertEqual(utils.get_partition_from_path('/s/n', path), 70)
+            self.assertEqual(utils.get_partition_from_path('/s/n/', path), 70)
+            path += '/'
+            self.assertEqual(utils.get_partition_from_path('/s/n', path), 70)
+            self.assertEqual(utils.get_partition_from_path('/s/n/', path), 70)
+
+        do_test('/s/n/d/o/70/c77/af088baea4806dcaba30bf07d9e64c77/f')
+        # also works with a hashdir
+        do_test('/s/n/d/o/70/c77/af088baea4806dcaba30bf07d9e64c77')
+        # or suffix dir
+        do_test('/s/n/d/o/70/c77')
+        # or even the part dir itself
+        do_test('/s/n/d/o/70')
+
     def test_replace_partition_in_path(self):
         # Check for new part = part * 2
         old = '/s/n/d/o/700/c77/af088baea4806dcaba30bf07d9e64c77/f'
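For context on this last (truncated) test: a partition number is just the top part_power bits of the object's hash, so raising the power by one maps old partition X to 2*X or 2*X + 1 depending on the next hash bit. For the fixture hash above that bit is 0, giving the "new part = part * 2" case; a quick self-contained check, assuming the part powers implied by the paths (10 old, 11 new):

    hash32 = int('af088baea4806dcaba30bf07d9e64c77', 16) >> (128 - 32)
    assert hash32 >> (32 - 10) == 700   # old partition at part power 10
    assert hash32 >> (32 - 11) == 1400  # new partition at part power 11 (700 * 2)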