relinker: Warn on unmounted disks

...rather than raising EPERM because we can't create the status file.

Still exit non-zero, so ops have a clear indication that something went
wrong and relinking may not be complete.

Drive-by: Also exit non-zero when audit_location_generator has trouble
listing partitions.

Closes-Bug: 1910470
Change-Id: Icb0b2282113dc0bc1cfac711470027ea31e0b022
This commit is contained in:
Tim Burke 2021-01-29 12:43:54 -08:00
parent 53c0fc3403
commit a90515bbbe
4 changed files with 264 additions and 43 deletions

View File

@ -35,6 +35,9 @@ STATE_FILE = 'relink.{datadir}.json'
STATE_TMP_FILE = '.relink.{datadir}.json.tmp'
STEP_RELINK = 'relink'
STEP_CLEANUP = 'cleanup'
EXIT_SUCCESS = 0
EXIT_NO_APPLICABLE_POLICY = 2
EXIT_ERROR = 1
def non_negative_float(value):
@ -161,6 +164,33 @@ def hashes_filter(next_part_power, suff_path, hashes):
return hashes
def determine_exit_code(logger, found_policy, processed, action, action_errors,
error_counter):
if not found_policy:
logger.warning("No policy found to increase the partition power.")
return EXIT_NO_APPLICABLE_POLICY
unmounted = error_counter.pop('unmounted', 0)
if unmounted:
logger.warning('%d disks were unmounted', unmounted)
listdir_errors = error_counter.pop('unlistable_partitions', 0)
if listdir_errors:
logger.warning('There were %d errors listing partition directories',
listdir_errors)
if error_counter:
logger.warning(
'There were unexpected errors while enumerating disk files: %r',
error_counter)
logger.info('%d diskfiles %s (%d errors)', processed, action,
action_errors + listdir_errors)
if action_errors + listdir_errors + unmounted > 0:
# NB: audit_location_generator logs unmounted disks as warnings,
# but we want to treat them as errors
return EXIT_ERROR
return EXIT_SUCCESS
def relink(swift_dir='/etc/swift',
devices='/srv/node',
skip_mount_check=False,
@ -168,8 +198,9 @@ def relink(swift_dir='/etc/swift',
device=None,
files_per_second=0):
mount_check = not skip_mount_check
run = False
found_policy = False
relinked = errors = 0
error_counter = {}
for policy in POLICIES:
policy.object_ring = None # Ensure it will be reloaded
policy.load_ring(swift_dir)
@ -179,7 +210,7 @@ def relink(swift_dir='/etc/swift',
continue
logger.info('Relinking files for policy %s under %s',
policy.name, devices)
run = True
found_policy = True
datadir = diskfile.get_data_dir(policy)
locks = [None]
@ -208,7 +239,7 @@ def relink(swift_dir='/etc/swift',
partitions_filter=relink_partition_filter,
hook_post_partition=relink_hook_post_partition,
hashes_filter=relink_hashes_filter,
logger=logger)
logger=logger, error_counter=error_counter)
if files_per_second > 0:
locations = RateLimitedIterator(locations, files_per_second)
for fname, _, _ in locations:
@ -220,14 +251,13 @@ def relink(swift_dir='/etc/swift',
errors += 1
logger.warning("Relinking %s to %s failed: %s",
fname, newfname, exc)
if not run:
logger.warning("No policy found to increase the partition power.")
return 2
logger.info('Relinked %d diskfiles (%d errors)', relinked, errors)
if errors > 0:
return 1
return 0
return determine_exit_code(
logger=logger,
found_policy=found_policy,
processed=relinked, action='relinked',
action_errors=errors,
error_counter=error_counter,
)
def cleanup(swift_dir='/etc/swift',
@ -240,7 +270,8 @@ def cleanup(swift_dir='/etc/swift',
conf = {'devices': devices, 'mount_check': mount_check}
diskfile_router = diskfile.DiskFileRouter(conf, logger)
errors = cleaned_up = 0
run = False
error_counter = {}
found_policy = False
for policy in POLICIES:
policy.object_ring = None # Ensure it will be reloaded
policy.load_ring(swift_dir)
@ -250,7 +281,7 @@ def cleanup(swift_dir='/etc/swift',
continue
logger.info('Cleaning up files for policy %s under %s',
policy.name, devices)
run = True
found_policy = True
datadir = diskfile.get_data_dir(policy)
locks = [None]
@ -279,7 +310,7 @@ def cleanup(swift_dir='/etc/swift',
partitions_filter=cleanup_partition_filter,
hook_post_partition=cleanup_hook_post_partition,
hashes_filter=cleanup_hashes_filter,
logger=logger)
logger=logger, error_counter=error_counter)
if files_per_second > 0:
locations = RateLimitedIterator(locations, files_per_second)
for fname, device, partition in locations:
@ -327,14 +358,13 @@ def cleanup(swift_dir='/etc/swift',
except OSError as exc:
logger.warning('Error cleaning up %s: %r', fname, exc)
errors += 1
if not run:
logger.warning("No policy found to increase the partition power.")
return 2
logger.info('Cleaned up %d diskfiles (%d errors)', cleaned_up, errors)
if errors > 0:
return 1
return 0
return determine_exit_code(
logger=logger,
found_policy=found_policy,
processed=cleaned_up, action='cleaned up',
action_errors=errors,
error_counter=error_counter,
)
def main(args):

View File

@ -3216,7 +3216,8 @@ def audit_location_generator(devices, datadir, suffix='',
hook_pre_device=None, hook_post_device=None,
hook_pre_partition=None, hook_post_partition=None,
hook_pre_suffix=None, hook_post_suffix=None,
hook_pre_hash=None, hook_post_hash=None):
hook_pre_hash=None, hook_post_hash=None,
error_counter=None):
"""
Given a devices path and a data directory, yield (path, device,
partition) for all files in that directory
@ -3238,22 +3239,24 @@ def audit_location_generator(devices, datadir, suffix='',
:param mount_check: Flag to check if a mount check should be performed
on devices
:param logger: a logger object
:devices_filter: a callable taking (devices, [list of devices]) as
parameters and returning a [list of devices]
:partitions_filter: a callable taking (datadir_path, [list of parts]) as
parameters and returning a [list of parts]
:suffixes_filter: a callable taking (part_path, [list of suffixes]) as
parameters and returning a [list of suffixes]
:hashes_filter: a callable taking (suff_path, [list of hashes]) as
parameters and returning a [list of hashes]
:hook_pre_device: a callable taking device_path as parameter
:hook_post_device: a callable taking device_path as parameter
:hook_pre_partition: a callable taking part_path as parameter
:hook_post_partition: a callable taking part_path as parameter
:hook_pre_suffix: a callable taking suff_path as parameter
:hook_post_suffix: a callable taking suff_path as parameter
:hook_pre_hash: a callable taking hash_path as parameter
:hook_post_hash: a callable taking hash_path as parameter
:param devices_filter: a callable taking (devices, [list of devices]) as
parameters and returning a [list of devices]
:param partitions_filter: a callable taking (datadir_path, [list of parts])
as parameters and returning a [list of parts]
:param suffixes_filter: a callable taking (part_path, [list of suffixes])
as parameters and returning a [list of suffixes]
:param hashes_filter: a callable taking (suff_path, [list of hashes]) as
parameters and returning a [list of hashes]
:param hook_pre_device: a callable taking device_path as parameter
:param hook_post_device: a callable taking device_path as parameter
:param hook_pre_partition: a callable taking part_path as parameter
:param hook_post_partition: a callable taking part_path as parameter
:param hook_pre_suffix: a callable taking suff_path as parameter
:param hook_post_suffix: a callable taking suff_path as parameter
:param hook_pre_hash: a callable taking hash_path as parameter
:param hook_post_hash: a callable taking hash_path as parameter
:param error_counter: a dictionary used to accumulate error counts; may
add keys 'unmounted' and 'unlistable_partitions'
"""
device_dir = listdir(devices)
# randomize devices in case of process restart before sweep completed
@ -3261,17 +3264,24 @@ def audit_location_generator(devices, datadir, suffix='',
if devices_filter:
device_dir = devices_filter(devices, device_dir)
for device in device_dir:
if hook_pre_device:
hook_pre_device(os.path.join(devices, device))
if mount_check and not ismount(os.path.join(devices, device)):
if error_counter is not None:
error_counter.setdefault('unmounted', 0)
error_counter['unmounted'] += 1
if logger:
logger.warning(
_('Skipping %s as it is not mounted'), device)
continue
if hook_pre_device:
hook_pre_device(os.path.join(devices, device))
datadir_path = os.path.join(devices, device, datadir)
try:
partitions = listdir(datadir_path)
except OSError as e:
# NB: listdir ignores non-existent datadir_path
if error_counter is not None:
error_counter.setdefault('unlistable_partitions', 0)
error_counter['unlistable_partitions'] += 1
if logger:
logger.warning(_('Skipping %(datadir)s because %(err)s'),
{'datadir': datadir_path, 'err': e})

View File

@ -77,7 +77,7 @@ class TestRelinker(unittest.TestCase):
dummy.write(b"Hello World!")
write_metadata(dummy, {'name': '/a/c/o', 'Content-Length': '12'})
test_policies = [StoragePolicy(0, 'platin', True)]
test_policies = [StoragePolicy(0, 'platinum', True)]
storage_policy._POLICIES = StoragePolicyCollection(test_policies)
self.expected_dir = os.path.join(
@ -96,6 +96,18 @@ class TestRelinker(unittest.TestCase):
shutil.rmtree(self.testdir, ignore_errors=1)
storage_policy.reload_storage_policies()
@contextmanager
def _mock_listdir(self):
orig_listdir = utils.listdir
def mocked(path):
if path == self.objects:
raise OSError
return orig_listdir(path)
with mock.patch('swift.common.utils.listdir', mocked):
yield
def _do_test_relinker_drop_privileges(self, command):
@contextmanager
def do_mocks():
@ -243,6 +255,49 @@ class TestRelinker(unittest.TestCase):
stat_new = os.stat(self.expected_file)
self.assertEqual(stat_old.st_ino, stat_new.st_ino)
def test_relink_no_applicable_policy(self):
# NB do not prepare part power increase
self._save_ring()
with mock.patch.object(relinker.logging, 'getLogger',
return_value=self.logger):
self.assertEqual(2, relinker.main([
'relink',
'--swift-dir', self.testdir,
'--devices', self.devices,
]))
self.assertEqual(self.logger.get_lines_for_level('warning'),
['No policy found to increase the partition power.'])
def test_relink_not_mounted(self):
self.rb.prepare_increase_partition_power()
self._save_ring()
with mock.patch.object(relinker.logging, 'getLogger',
return_value=self.logger):
self.assertEqual(1, relinker.main([
'relink',
'--swift-dir', self.testdir,
'--devices', self.devices,
]))
self.assertEqual(self.logger.get_lines_for_level('warning'), [
'Skipping sda1 as it is not mounted',
'1 disks were unmounted'])
def test_relink_listdir_error(self):
self.rb.prepare_increase_partition_power()
self._save_ring()
with mock.patch.object(relinker.logging, 'getLogger',
return_value=self.logger):
with self._mock_listdir():
self.assertEqual(1, relinker.main([
'relink',
'--swift-dir', self.testdir,
'--devices', self.devices,
'--skip-mount-check'
]))
self.assertEqual(self.logger.get_lines_for_level('warning'), [
'Skipping %s because ' % self.objects,
'There were 1 errors listing partition directories'])
def test_relink_device_filter(self):
self.rb.prepare_increase_partition_power()
self._save_ring()
@ -304,6 +359,47 @@ class TestRelinker(unittest.TestCase):
self.assertFalse(os.path.isfile(
os.path.join(self.objdir, self.object_fname)))
def test_cleanup_no_applicable_policy(self):
# NB do not prepare part power increase
self._save_ring()
with mock.patch.object(relinker.logging, 'getLogger',
return_value=self.logger):
self.assertEqual(2, relinker.main([
'cleanup',
'--swift-dir', self.testdir,
'--devices', self.devices,
]))
self.assertEqual(self.logger.get_lines_for_level('warning'),
['No policy found to increase the partition power.'])
def test_cleanup_not_mounted(self):
self._common_test_cleanup()
with mock.patch.object(relinker.logging, 'getLogger',
return_value=self.logger):
self.assertEqual(1, relinker.main([
'cleanup',
'--swift-dir', self.testdir,
'--devices', self.devices,
]))
self.assertEqual(self.logger.get_lines_for_level('warning'), [
'Skipping sda1 as it is not mounted',
'1 disks were unmounted'])
def test_cleanup_listdir_error(self):
self._common_test_cleanup()
with mock.patch.object(relinker.logging, 'getLogger',
return_value=self.logger):
with self._mock_listdir():
self.assertEqual(1, relinker.main([
'cleanup',
'--swift-dir', self.testdir,
'--devices', self.devices,
'--skip-mount-check'
]))
self.assertEqual(self.logger.get_lines_for_level('warning'), [
'Skipping %s because ' % self.objects,
'There were 1 errors listing partition directories'])
def test_cleanup_device_filter(self):
self._common_test_cleanup()
self.assertEqual(0, relinker.main([
@ -621,7 +717,7 @@ class TestRelinker(unittest.TestCase):
@patch_policies(
[ECStoragePolicy(
0, name='platin', is_default=True, ec_type=DEFAULT_TEST_EC_TYPE,
0, name='platinum', is_default=True, ec_type=DEFAULT_TEST_EC_TYPE,
ec_ndata=4, ec_nparity=2)])
def test_cleanup_diskfile_error(self):
self._common_test_cleanup()

View File

@ -6405,6 +6405,91 @@ class TestAuditLocationGenerator(unittest.TestCase):
hashes_filter.assert_called_once_with(suffix, ["hash1"])
self.assertNotIn(((hash_path,),), m_listdir.call_args_list)
@with_tempdir
def test_error_counter(self, tmpdir):
def assert_no_errors(devices, mount_check=False):
logger = debug_logger()
error_counter = {}
locations = utils.audit_location_generator(
devices, "data", mount_check=mount_check, logger=logger,
error_counter=error_counter
)
self.assertEqual([], list(locations))
self.assertEqual([], logger.get_lines_for_level('warning'))
self.assertEqual([], logger.get_lines_for_level('error'))
self.assertEqual({}, error_counter)
# no devices, no problem
devices = os.path.join(tmpdir, 'devices1')
os.makedirs(devices)
assert_no_errors(devices)
# empty dir under devices/
devices = os.path.join(tmpdir, 'devices2')
os.makedirs(devices)
dev_dir = os.path.join(devices, 'device_is_empty_dir')
os.makedirs(dev_dir)
def assert_listdir_error(devices):
logger = debug_logger()
error_counter = {}
locations = utils.audit_location_generator(
devices, "data", mount_check=False, logger=logger,
error_counter=error_counter
)
self.assertEqual([], list(locations))
self.assertEqual(1, len(logger.get_lines_for_level('warning')))
self.assertEqual({'unlistable_partitions': 1}, error_counter)
# file under devices/
devices = os.path.join(tmpdir, 'devices3')
os.makedirs(devices)
with open(os.path.join(devices, 'device_is_file'), 'w'):
pass
assert_listdir_error(devices)
# dir under devices/
devices = os.path.join(tmpdir, 'devices4')
device = os.path.join(devices, 'device')
os.makedirs(device)
assert_no_errors(devices)
# error for dir under devices/
orig_listdir = utils.listdir
def mocked(path):
if path.endswith('data'):
raise OSError
return orig_listdir(path)
with mock.patch('swift.common.utils.listdir', mocked):
assert_listdir_error(devices)
# mount check error
devices = os.path.join(tmpdir, 'devices5')
device = os.path.join(devices, 'device')
os.makedirs(device)
# no check
with mock.patch('swift.common.utils.ismount', return_value=False):
assert_no_errors(devices, mount_check=False)
# check passes
with mock.patch('swift.common.utils.ismount', return_value=True):
assert_no_errors(devices, mount_check=True)
# check fails
logger = debug_logger()
error_counter = {}
with mock.patch('swift.common.utils.ismount', return_value=False):
locations = utils.audit_location_generator(
devices, "data", mount_check=True, logger=logger,
error_counter=error_counter
)
self.assertEqual([], list(locations))
self.assertEqual(1, len(logger.get_lines_for_level('warning')))
self.assertEqual({'unmounted': 1}, error_counter)
class TestGreenAsyncPile(unittest.TestCase):