relinker: refactor to share common code

Refactor the relinker to reduce the amount of code that is
duplicated between the relink and cleanup functions.

Change-Id: Icf30f6799e4e5c7bb5c3d468c0e7e8591334ac1e
Alistair Coles 2021-03-09 16:17:41 +00:00
parent 1fad8c2ed5
commit 0b129509c9
2 changed files with 338 additions and 255 deletions
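
In outline, the refactor turns the two near-duplicate module functions into thin wrappers around a shared Relinker class, with a do_cleanup flag selecting the per-location action. A minimal, heavily simplified sketch of that shape (the stub bodies and the _locations helper are illustrative only; the real class in the diff below does the full audit_location_generator walk):

# Illustrative sketch only: stub bodies, no real disk walking.
class Relinker(object):
    def __init__(self, conf, logger, device, do_cleanup=False):
        self.conf = conf
        self.logger = logger
        self.device = device
        self.do_cleanup = do_cleanup

    def _locations(self):
        # stand-in for the audit_location_generator walk in process_policy()
        return iter([])

    def relink(self, location):
        pass  # link each file into its next-power partition

    def cleanup(self, location):
        pass  # verify the new links, then remove files from the old partition

    def run(self):
        for location in self._locations():
            if self.do_cleanup:
                self.cleanup(location)
            else:
                self.relink(location)
        return 0  # EXIT_SUCCESS in the real module

# Both entry points collapse to one-liners, exactly as in the diff:
def relink(conf, logger, device):
    return Relinker(conf, logger, device, do_cleanup=False).run()

def cleanup(conf, logger, device):
    return Relinker(conf, logger, device, do_cleanup=True).run()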

swift/cli/relinker.py

@@ -220,264 +220,243 @@ def hashes_filter(devices, next_part_power, suff_path, hashes):
return hashes
def determine_exit_code(logger, found_policy, processed, action, action_errors,
error_counter):
if not found_policy:
logger.warning("No policy found to increase the partition power.")
return EXIT_NO_APPLICABLE_POLICY
class Relinker(object):
def __init__(self, conf, logger, device, do_cleanup=False):
self.conf = conf
self.logger = logger
self.device = device
self.do_cleanup = do_cleanup
self.root = self.conf['devices']
if self.device is not None:
self.root = os.path.join(self.root, self.device)
self.part_power = self.next_part_power = None
self.diskfile_mgr = None
self.diskfile_router = diskfile.DiskFileRouter(self.conf, self.logger)
self._zero_stats()
unmounted = error_counter.pop('unmounted', 0)
if unmounted:
logger.warning('%d disks were unmounted', unmounted)
listdir_errors = error_counter.pop('unlistable_partitions', 0)
if listdir_errors:
logger.warning('There were %d errors listing partition directories',
listdir_errors)
if error_counter:
logger.warning(
'There were unexpected errors while enumerating disk files: %r',
error_counter)
def _zero_stats(self):
self.stats = {
'ok': 0,
'errors': 0,
'policies': 0,
}
logger.info('%d diskfiles %s (%d errors)', processed, action,
action_errors + listdir_errors)
if action_errors + listdir_errors + unmounted > 0:
# NB: audit_location_generator logs unmounted disks as warnings,
# but we want to treat them as errors
return EXIT_ERROR
return EXIT_SUCCESS
def relink(self, fname):
newfname = replace_partition_in_path(self.conf['devices'], fname,
self.next_part_power)
try:
self.logger.debug('Relinking %s to %s', fname, newfname)
diskfile.relink_paths(fname, newfname)
self.stats['ok'] += 1
suffix_dir = os.path.dirname(os.path.dirname(newfname))
diskfile.invalidate_hash(suffix_dir)
except OSError as exc:
self.stats['errors'] += 1
self.logger.warning("Relinking %s to %s failed: %s",
fname, newfname, exc)
def cleanup(self, hash_path):
# Compare the contents of each hash dir with the contents of the same
# hash dir in its new partition to verify that the new location has
# the most up-to-date set of files. The new location may have newer
# files if it has been updated since it was relinked.
new_hash_path = replace_partition_in_path(
self.conf['devices'], hash_path, self.part_power)
if new_hash_path == hash_path:
return
# Get on disk data for new and old locations, cleaning up any
# reclaimable or obsolete files in each. The new location is
# cleaned up *before* the old location to prevent false negatives
# where the old still has a file that has been cleaned up in the
# new; cleaning up the new location first ensures that the old will
# always be 'cleaner' than the new.
new_df_data = self.diskfile_mgr.cleanup_ondisk_files(new_hash_path)
old_df_data = self.diskfile_mgr.cleanup_ondisk_files(hash_path)
# Now determine what the most up-to-date set of on-disk files would
# be given the content of the old and new locations...
new_files = set(new_df_data['files'])
old_files = set(old_df_data['files'])
union_files = new_files.union(old_files)
union_data = self.diskfile_mgr.get_ondisk_files(
union_files, '', verify=False)
obsolete_files = set(info['filename']
for info in union_data.get('obsolete', []))
# drop 'obsolete' files but retain 'unexpected' files which might
# be misplaced diskfiles from another policy
required_files = union_files.difference(obsolete_files)
required_links = required_files.intersection(old_files)
missing_links = 0
created_links = 0
for filename in required_links:
# Before removing old files, be sure that the corresponding
# required new files exist by calling relink_paths again. There
# are several possible outcomes:
# - The common case is that the new file exists, in which case
# relink_paths checks that the new file has the same inode
# as the old file. An exception is raised if the inode of
# the new file is not the same as the old file.
# - The new file may not exist because the relinker failed to
# create the link to the old file and has erroneously moved
# on to cleanup. In this case relink_paths will create the
# link now or raise an exception if that fails.
# - The new file may not exist because some other process,
# such as an object server handling a request, has cleaned
# it up since we called cleanup_ondisk_files(new_hash_path).
# In this case a new link will be created to the old file.
# This is unnecessary but simpler than repeating the
# evaluation of what links are now required and safer than
# assuming that a non-existent file that *was* required is
# no longer required. The new file will eventually be
# cleaned up again.
old_file = os.path.join(hash_path, filename)
new_file = os.path.join(new_hash_path, filename)
try:
if diskfile.relink_paths(old_file, new_file):
self.logger.debug(
"Relinking (cleanup) created link: %s to %s",
old_file, new_file)
created_links += 1
except OSError as exc:
self.logger.warning(
"Error relinking (cleanup): failed to relink %s to "
"%s: %s", old_file, new_file, exc)
self.stats['errors'] += 1
missing_links += 1
if created_links:
diskfile.invalidate_hash(os.path.dirname(new_hash_path))
if missing_links:
return
# the new partition hash dir has the most up to date set of on
# disk files so it is safe to delete the old location...
rehash = False
# use the sorted list to help unit testing
for filename in old_df_data['files']:
old_file = os.path.join(hash_path, filename)
try:
os.remove(old_file)
except OSError as exc:
self.logger.warning('Error cleaning up %s: %r', old_file, exc)
self.stats['errors'] += 1
else:
rehash = True
self.stats['ok'] += 1
self.logger.debug("Removed %s", old_file)
if rehash:
try:
diskfile.invalidate_hash(os.path.dirname(hash_path))
except Exception as exc:
# note: not counted as an error
self.logger.warning(
'Error invalidating suffix for %s: %r',
hash_path, exc)
def process_policy(self, policy):
self.logger.info(
'Processing files for policy %s under %s (cleanup=%s)',
policy.name, self.root, self.do_cleanup)
self.part_power = policy.object_ring.part_power
self.next_part_power = policy.object_ring.next_part_power
self.diskfile_mgr = self.diskfile_router[policy]
datadir = diskfile.get_data_dir(policy)
locks = [None]
states = {
"part_power": self.part_power,
"next_part_power": self.next_part_power,
"state": {},
}
run_devices_filter = partial(devices_filter, self.device)
run_hook_pre_device = partial(hook_pre_device, locks, states,
datadir)
run_hook_post_device = partial(hook_post_device, locks)
run_partition_filter = partial(
partitions_filter, states, self.part_power, self.next_part_power)
run_hook_post_partition = partial(
hook_post_partition, self.logger, states,
STEP_CLEANUP if self.do_cleanup else STEP_RELINK,
policy, self.diskfile_mgr)
run_hashes_filter = partial(hashes_filter, self.conf['devices'],
self.next_part_power)
locations = audit_location_generator(
self.conf['devices'],
datadir,
mount_check=self.conf['mount_check'],
devices_filter=run_devices_filter,
hook_pre_device=run_hook_pre_device,
hook_post_device=run_hook_post_device,
partitions_filter=run_partition_filter,
hook_post_partition=run_hook_post_partition,
hashes_filter=run_hashes_filter,
logger=self.logger,
error_counter=self.stats,
yield_hash_dirs=self.do_cleanup
)
if self.conf['files_per_second'] > 0:
locations = RateLimitedIterator(
locations, self.conf['files_per_second'])
for location, device, partition in locations:
if self.do_cleanup:
self.cleanup(location)
else:
self.relink(location)
def run(self):
self._zero_stats()
for policy in POLICIES:
policy.object_ring = None # Ensure it will be reloaded
policy.load_ring(self.conf['swift_dir'])
ring = policy.object_ring
if not ring.next_part_power:
continue
part_power_increased = ring.next_part_power == ring.part_power
if self.do_cleanup != part_power_increased:
continue
self.stats['policies'] += 1
self.process_policy(policy)
policies = self.stats.pop('policies')
if not policies:
self.logger.warning(
"No policy found to increase the partition power.")
return EXIT_NO_APPLICABLE_POLICY
processed = self.stats.pop('ok')
action_errors = self.stats.pop('errors')
unmounted = self.stats.pop('unmounted', 0)
if unmounted:
self.logger.warning('%d disks were unmounted', unmounted)
listdir_errors = self.stats.pop('unlistable_partitions', 0)
if listdir_errors:
self.logger.warning(
'There were %d errors listing partition directories',
listdir_errors)
if self.stats:
self.logger.warning(
'There were unexpected errors while enumerating disk '
'files: %r', self.stats)
self.logger.info(
'%d diskfiles processed (cleanup=%s) (%d errors)',
processed, self.do_cleanup, action_errors + listdir_errors)
if action_errors + listdir_errors + unmounted > 0:
# NB: audit_location_generator logs unmounted disks as warnings,
# but we want to treat them as errors
return EXIT_ERROR
return EXIT_SUCCESS
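The only mode-dependent decision in run() is which ring state a policy must be in: relink wants rings that are prepared for a power increase, cleanup wants rings where the increase has already happened. The same gate, restated as a standalone predicate (policy_applies is an illustrative name, not a function in the module):

# Restatement of the gate in Relinker.run() above; illustrative helper only.
def policy_applies(ring, do_cleanup):
    if not ring.next_part_power:
        # no power increase prepared or in progress
        return False
    part_power_increased = ring.next_part_power == ring.part_power
    # relink (do_cleanup=False) applies while the increase is only prepared;
    # cleanup (do_cleanup=True) applies once the power has been increased.
    return do_cleanup == part_power_increased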
def relink(conf, logger, device):
diskfile_router = diskfile.DiskFileRouter(conf, logger)
found_policy = False
relinked = errors = 0
error_counter = {}
for policy in POLICIES:
diskfile_mgr = diskfile_router[policy]
policy.object_ring = None # Ensure it will be reloaded
policy.load_ring(conf['swift_dir'])
part_power = policy.object_ring.part_power
next_part_power = policy.object_ring.next_part_power
if not next_part_power or next_part_power == part_power:
continue
logger.info('Relinking files for policy %s under %s',
policy.name, conf['devices'])
found_policy = True
datadir = diskfile.get_data_dir(policy)
locks = [None]
states = {
"part_power": part_power,
"next_part_power": next_part_power,
"state": {},
}
relink_devices_filter = partial(devices_filter, device)
relink_hook_pre_device = partial(hook_pre_device, locks, states,
datadir)
relink_hook_post_device = partial(hook_post_device, locks)
relink_partition_filter = partial(partitions_filter,
states, part_power, next_part_power)
relink_hook_post_partition = partial(hook_post_partition, logger,
states, STEP_RELINK, policy,
diskfile_mgr)
relink_hashes_filter = partial(hashes_filter, conf['devices'],
next_part_power)
locations = audit_location_generator(
conf['devices'],
datadir,
mount_check=conf['mount_check'],
devices_filter=relink_devices_filter,
hook_pre_device=relink_hook_pre_device,
hook_post_device=relink_hook_post_device,
partitions_filter=relink_partition_filter,
hook_post_partition=relink_hook_post_partition,
hashes_filter=relink_hashes_filter,
logger=logger, error_counter=error_counter)
if conf['files_per_second'] > 0:
locations = RateLimitedIterator(
locations, conf['files_per_second'])
for fname, _, _ in locations:
newfname = replace_partition_in_path(conf['devices'], fname,
next_part_power)
try:
logger.debug('Relinking %s to %s', fname, newfname)
diskfile.relink_paths(fname, newfname)
relinked += 1
suffix_dir = os.path.dirname(os.path.dirname(newfname))
diskfile.invalidate_hash(suffix_dir)
except OSError as exc:
errors += 1
logger.warning("Relinking %s to %s failed: %s",
fname, newfname, exc)
return determine_exit_code(
logger=logger,
found_policy=found_policy,
processed=relinked, action='relinked',
action_errors=errors,
error_counter=error_counter,
)
return Relinker(conf, logger, device, do_cleanup=False).run()
def cleanup(conf, logger, device):
diskfile_router = diskfile.DiskFileRouter(conf, logger)
errors = cleaned_up = 0
error_counter = {}
found_policy = False
for policy in POLICIES:
diskfile_mgr = diskfile_router[policy]
policy.object_ring = None # Ensure it will be reloaded
policy.load_ring(conf['swift_dir'])
part_power = policy.object_ring.part_power
next_part_power = policy.object_ring.next_part_power
if not next_part_power or next_part_power != part_power:
continue
logger.info('Cleaning up files for policy %s under %s',
policy.name, conf['devices'])
found_policy = True
datadir = diskfile.get_data_dir(policy)
locks = [None]
states = {
"part_power": part_power,
"next_part_power": next_part_power,
"state": {},
}
cleanup_devices_filter = partial(devices_filter, device)
cleanup_hook_pre_device = partial(hook_pre_device, locks, states,
datadir)
cleanup_hook_post_device = partial(hook_post_device, locks)
cleanup_partition_filter = partial(partitions_filter,
states, part_power, next_part_power)
cleanup_hook_post_partition = partial(hook_post_partition, logger,
states, STEP_CLEANUP, policy,
diskfile_mgr)
cleanup_hashes_filter = partial(hashes_filter, conf['devices'],
next_part_power)
locations = audit_location_generator(
conf['devices'],
datadir,
mount_check=conf['mount_check'],
devices_filter=cleanup_devices_filter,
hook_pre_device=cleanup_hook_pre_device,
hook_post_device=cleanup_hook_post_device,
partitions_filter=cleanup_partition_filter,
hook_post_partition=cleanup_hook_post_partition,
hashes_filter=cleanup_hashes_filter,
logger=logger,
error_counter=error_counter,
yield_hash_dirs=True)
if conf['files_per_second'] > 0:
locations = RateLimitedIterator(
locations, conf['files_per_second'])
for hash_path, device, partition in locations:
# Compare the contents of each hash dir with contents of same hash
# dir in its new partition to verify that the new location has the
# most up to date set of files. The new location may have newer
# files if it has been updated since relinked.
new_hash_path = replace_partition_in_path(
conf['devices'], hash_path, part_power)
if new_hash_path == hash_path:
continue
# Get on disk data for new and old locations, cleaning up any
# reclaimable or obsolete files in each. The new location is
# cleaned up *before* the old location to prevent false negatives
# where the old still has a file that has been cleaned up in the
# new; cleaning up the new location first ensures that the old will
# always be 'cleaner' than the new.
new_df_data = diskfile_mgr.cleanup_ondisk_files(new_hash_path)
old_df_data = diskfile_mgr.cleanup_ondisk_files(hash_path)
# Now determine the most up to date set of on disk files would be
# given the content of old and new locations...
new_files = set(new_df_data['files'])
old_files = set(old_df_data['files'])
union_files = new_files.union(old_files)
union_data = diskfile_mgr.get_ondisk_files(
union_files, '', verify=False)
obsolete_files = set(info['filename']
for info in union_data.get('obsolete', []))
# drop 'obsolete' files but retain 'unexpected' files which might
# be misplaced diskfiles from another policy
required_files = union_files.difference(obsolete_files)
required_links = required_files.intersection(old_files)
missing_links = 0
created_links = 0
for filename in required_links:
# Before removing old files, be sure that the corresponding
# required new files exist by calling relink_paths again. There
# are several possible outcomes:
# - The common case is that the new file exists, in which case
# relink_paths checks that the new file has the same inode
# as the old file. An exception is raised if the inode of
# the new file is not the same as the old file.
# - The new file may not exist because the relinker failed to
# create the link to the old file and has erroneously moved
# on to cleanup. In this case the relink_paths will create
# the link now or raise an exception if that fails.
# - The new file may not exist because some other process,
# such as an object server handling a request, has cleaned
# it up since we called cleanup_ondisk_files(new_hash_path).
# In this case a new link will be created to the old file.
# This is unnecessary but simpler than repeating the
# evaluation of what links are now required and safer than
# assuming that a non-existent file that *was* required is
# no longer required. The new file will eventually be
# cleaned up again.
old_file = os.path.join(hash_path, filename)
new_file = os.path.join(new_hash_path, filename)
try:
if diskfile.relink_paths(old_file, new_file):
logger.debug(
"Relinking (cleanup) created link: %s to %s",
old_file, new_file)
created_links += 1
except OSError as exc:
logger.warning(
"Error relinking (cleanup): failed to relink %s to "
"%s: %s", old_file, new_file, exc)
errors += 1
missing_links += 1
if created_links:
diskfile.invalidate_hash(os.path.dirname(new_hash_path))
if missing_links:
continue
# the new partition hash dir has the most up to date set of on
# disk files so it is safe to delete the old location...
rehash = False
# use the sorted list to help unit testing
for filename in old_df_data['files']:
old_file = os.path.join(hash_path, filename)
try:
os.remove(old_file)
except OSError as exc:
logger.warning('Error cleaning up %s: %r', old_file, exc)
errors += 1
else:
rehash = True
cleaned_up += 1
logger.debug("Removed %s", old_file)
if rehash:
try:
diskfile.invalidate_hash(os.path.dirname(hash_path))
except Exception as exc:
# note: not counted as an error
logger.warning(
'Error invalidating suffix for %s: %r',
hash_path, exc)
return determine_exit_code(
logger=logger,
found_policy=found_policy,
processed=cleaned_up, action='cleaned up',
action_errors=errors,
error_counter=error_counter,
)
return Relinker(conf, logger, device, do_cleanup=True).run()
def main(args):

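Before the test changes, a small self-contained illustration of the set arithmetic that Relinker.cleanup() uses to decide which old files still need links before anything is removed. The file names are made up, and the obsolete set is simply asserted here; in the real code it comes from diskfile_mgr.get_ondisk_files(union_files, '', verify=False):

# Toy illustration of the required-links computation in Relinker.cleanup();
# file names and the choice of obsolete file are invented for the example.
new_files = {'1615310000.00000.data'}
old_files = {'1615300000.00000.data', '1615310000.00000.data'}
union_files = new_files | old_files
obsolete_files = {'1615300000.00000.data'}     # pretend this was superseded

required_files = union_files - obsolete_files  # keep everything not obsolete
required_links = required_files & old_files    # old copies that must be linked
print(sorted(required_links))                  # ['1615310000.00000.data']
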
test/unit/cli/test_relinker.py

@@ -67,6 +67,8 @@ class TestRelinker(unittest.TestCase):
os.mkdir(os.path.join(self.devices, self.existing_device))
self.objects = os.path.join(self.devices, self.existing_device,
'objects')
self.policy = StoragePolicy(0, 'platinum', True)
storage_policy._POLICIES = StoragePolicyCollection([self.policy])
self._setup_object()
def _setup_object(self, condition=None):
@@ -103,9 +105,6 @@ class TestRelinker(unittest.TestCase):
write_metadata(dummy,
{'name': self.obj_path, 'Content-Length': '12'})
self.policy = StoragePolicy(0, 'platinum', True)
storage_policy._POLICIES = StoragePolicyCollection([self.policy])
self.part_dir = os.path.join(self.objects, str(self.part))
self.suffix = self._hash[-3:]
self.suffix_dir = os.path.join(self.part_dir, self.suffix)
@@ -114,10 +113,10 @@ class TestRelinker(unittest.TestCase):
self.expected_dir = os.path.join(self.next_suffix_dir, self._hash)
self.expected_file = os.path.join(self.expected_dir, self.object_fname)
def _save_ring(self):
def _save_ring(self, policies=POLICIES):
self.rb._ring = None
rd = self.rb.get_ring()
for policy in POLICIES:
for policy in policies:
rd.save(os.path.join(
self.testdir, '%s.ring.gz' % policy.ring_name))
# Enforce ring reloading in relinker
@@ -551,6 +550,111 @@ class TestRelinker(unittest.TestCase):
self.assertFalse(os.path.isdir(self.expected_dir))
self.assertFalse(os.path.isfile(self.expected_file))
@patch_policies(
[StoragePolicy(0, name='gold', is_default=True),
ECStoragePolicy(1, name='platinum', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2)])
def test_relink_all_policies(self):
# verify that only policies in appropriate state are processed
def do_relink():
with mock.patch(
'swift.cli.relinker.Relinker.process_policy') as mocked:
res = relinker.main([
'relink',
'--swift-dir', self.testdir,
'--skip-mount',
'--devices', self.devices,
'--device', self.existing_device,
])
return res, mocked
self._save_ring(POLICIES) # no ring prepared for increase
res, mocked = do_relink()
self.assertEqual([], mocked.call_args_list)
self.assertEqual(2, res)
self._save_ring([POLICIES[0]]) # not prepared for increase
self.rb.prepare_increase_partition_power()
self._save_ring([POLICIES[1]]) # prepared for increase
res, mocked = do_relink()
self.assertEqual([mock.call(POLICIES[1])], mocked.call_args_list)
self.assertEqual(0, res)
self._save_ring([POLICIES[0]]) # prepared for increase
res, mocked = do_relink()
self.assertEqual([mock.call(POLICIES[0]), mock.call(POLICIES[1])],
mocked.call_args_list)
self.assertEqual(0, res)
self.rb.increase_partition_power()
self._save_ring([POLICIES[0]]) # increased
res, mocked = do_relink()
self.assertEqual([mock.call(POLICIES[1])], mocked.call_args_list)
self.assertEqual(0, res)
self._save_ring([POLICIES[1]]) # increased
res, mocked = do_relink()
self.assertEqual([], mocked.call_args_list)
self.assertEqual(2, res)
self.rb.finish_increase_partition_power()
self._save_ring(POLICIES) # all rings finished
res, mocked = do_relink()
self.assertEqual([], mocked.call_args_list)
self.assertEqual(2, res)
@patch_policies(
[StoragePolicy(0, name='gold', is_default=True),
ECStoragePolicy(1, name='platinum', ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2)])
def test_cleanup_all_policies(self):
# verify that only policies in appropriate state are processed
def do_cleanup():
with mock.patch(
'swift.cli.relinker.Relinker.process_policy') as mocked:
res = relinker.main([
'cleanup',
'--swift-dir', self.testdir,
'--skip-mount',
'--devices', self.devices,
'--device', self.existing_device,
])
return res, mocked
self._save_ring(POLICIES) # no ring prepared for increase
res, mocked = do_cleanup()
self.assertEqual([], mocked.call_args_list)
self.assertEqual(2, res)
self.rb.prepare_increase_partition_power()
self._save_ring(POLICIES) # all rings prepared for increase
res, mocked = do_cleanup()
self.assertEqual([], mocked.call_args_list)
self.assertEqual(2, res)
self.rb.increase_partition_power()
self._save_ring([POLICIES[0]]) # increased
res, mocked = do_cleanup()
self.assertEqual([mock.call(POLICIES[0])], mocked.call_args_list)
self.assertEqual(0, res)
self._save_ring([POLICIES[1]]) # increased
res, mocked = do_cleanup()
self.assertEqual([mock.call(POLICIES[0]), mock.call(POLICIES[1])],
mocked.call_args_list)
self.assertEqual(0, res)
self.rb.finish_increase_partition_power()
self._save_ring([POLICIES[1]]) # finished
res, mocked = do_cleanup()
self.assertEqual([mock.call(POLICIES[0])], mocked.call_args_list)
self.assertEqual(0, res)
self._save_ring([POLICIES[0]]) # finished
res, mocked = do_cleanup()
self.assertEqual([], mocked.call_args_list)
self.assertEqual(2, res)
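For reference when reading the assertions above: main() returns the relinker's exit codes, and the two tests drive the rings through the full power-increase lifecycle. A short summary of both; the numeric value of EXIT_ERROR is an assumption, since it does not appear in this diff:

# Exit codes asserted above: assertEqual(0, res) is EXIT_SUCCESS and
# assertEqual(2, res) is EXIT_NO_APPLICABLE_POLICY; EXIT_ERROR = 1 is assumed.
EXIT_SUCCESS = 0
EXIT_ERROR = 1
EXIT_NO_APPLICABLE_POLICY = 2

# Ring states exercised by the tests, and which step each state applies to
# (p is the original partition power):
#   no increase prepared                part_power=p,     next_part_power unset  -> neither
#   prepare_increase_partition_power()  part_power=p,     next_part_power=p + 1  -> relink
#   increase_partition_power()          part_power=p + 1, next_part_power=p + 1  -> cleanup
#   finish_increase_partition_power()   part_power=p + 1, next_part_power unset  -> neither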
def _common_test_cleanup(self, relink=True):
# Create a ring that has prev_part_power set
self.rb.prepare_increase_partition_power()