relinker: Parallelize per disk

Add a new option, workers, that works more or less like the same option
from background daemons. Disks will be distributed across N worker
sub-processes so we can make the best use of the I/O available.

While we're at it, log final stats at warning if there were errors.

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Change-Id: I039d2b8861f69a64bd9d2cdf68f1f534c236b2ba
This commit is contained in:
Tim Burke 2021-01-06 16:18:04 -08:00
parent b79ca57fa9
commit abfa6bee72
3 changed files with 357 additions and 127 deletions

View File

@ -610,7 +610,12 @@ use = egg:swift#xprofile
# log_level = INFO
# log_address = /dev/log
#
# Target this many relinks/cleanups per second, to reduce the
# Start up to this many sub-processes to process disks in parallel. Each disk
# will be handled by at most one child process. By default, one process is
# spawned per disk.
# workers = auto
#
# Target this many relinks/cleanups per second for each worker, to reduce the
# likelihood that the added I/O from a partition-power increase impacts
# client traffic. Use zero for unlimited.
# files_per_second = 0.0
@ -632,4 +637,4 @@ use = egg:swift#xprofile
# useful if previous partition power increases have failed to cleanup
# tombstones from their old locations, causing duplicate tombstones with
# different inodes to be relinked to the next partition power location.
# link_check_limit = 2
# link_check_limit = 2

View File

@ -20,10 +20,12 @@ import fcntl
import json
import logging
import os
import time
from swift.common.storage_policy import POLICIES
from swift.common.utils import replace_partition_in_path, config_true_value, \
audit_location_generator, get_logger, readconf, drop_privileges, \
RateLimitedIterator, lock_path, non_negative_float, non_negative_int
RateLimitedIterator, lock_path, PrefixLoggerAdapter, distribute_evenly, \
non_negative_float, non_negative_int, config_auto_int_value
from swift.obj import diskfile
@ -45,14 +47,14 @@ def policy(policy_name_or_index):
class Relinker(object):
def __init__(self, conf, logger, device, do_cleanup=False):
def __init__(self, conf, logger, device_list=None, do_cleanup=False):
self.conf = conf
self.logger = logger
self.device = device
self.device_list = device_list or []
self.do_cleanup = do_cleanup
self.root = self.conf['devices']
if self.device is not None:
self.root = os.path.join(self.root, self.device)
if len(self.device_list) == 1:
self.root = os.path.join(self.root, list(self.device_list)[0])
self.part_power = self.next_part_power = None
self.diskfile_mgr = None
self.dev_lock = None
@ -70,8 +72,8 @@ class Relinker(object):
}
def devices_filter(self, _, devices):
if self.device:
devices = [d for d in devices if d == self.device]
if self.device_list:
devices = [d for d in devices if d in self.device_list]
return set(devices)
@ -494,23 +496,84 @@ class Relinker(object):
'There were unexpected errors while enumerating disk '
'files: %r', self.stats)
self.logger.info(
if action_errors + listdir_errors + unmounted > 0:
log_method = self.logger.warning
# NB: audit_location_generator logs unmounted disks as warnings,
# but we want to treat them as errors
status = EXIT_ERROR
else:
log_method = self.logger.info
status = EXIT_SUCCESS
log_method(
'%d hash dirs processed (cleanup=%s) (%d files, %d linked, '
'%d removed, %d errors)', hash_dirs, self.do_cleanup, files,
linked, removed, action_errors + listdir_errors)
if action_errors + listdir_errors + unmounted > 0:
# NB: audit_location_generator logs unmounted disks as warnings,
# but we want to treat them as errors
return EXIT_ERROR
return EXIT_SUCCESS
return status
def relink(conf, logger, device):
return Relinker(conf, logger, device, do_cleanup=False).run()
def parallel_process(do_cleanup, conf, logger=None, device_list=None):
logger = logger or logging.getLogger()
device_list = sorted(set(device_list or os.listdir(conf['devices'])))
workers = conf['workers']
if workers == 'auto':
workers = len(device_list)
else:
workers = min(workers, len(device_list))
if workers == 0 or len(device_list) in (0, 1):
return Relinker(
conf, logger, device_list, do_cleanup=do_cleanup).run()
start = time.time()
children = {}
for worker_devs in distribute_evenly(device_list, workers):
pid = os.fork()
if pid == 0:
dev_logger = PrefixLoggerAdapter(logger, {})
dev_logger.set_prefix('[pid=%s, devs=%s] ' % (
os.getpid(), ','.join(worker_devs)))
os._exit(Relinker(
conf, dev_logger, worker_devs, do_cleanup=do_cleanup).run())
else:
children[pid] = worker_devs
def cleanup(conf, logger, device):
return Relinker(conf, logger, device, do_cleanup=True).run()
final_status = EXIT_SUCCESS
final_messages = []
while children:
pid, status = os.wait()
sig = status & 0xff
status = status >> 8
time_delta = time.time() - start
devs = children.pop(pid, ['unknown device'])
worker_desc = '(pid=%s, devs=%s)' % (pid, ','.join(devs))
if sig != 0:
final_status = EXIT_ERROR
final_messages.append(
'Worker %s exited in %.1fs after receiving signal: %s'
% (worker_desc, time_delta, sig))
continue
if status == EXIT_SUCCESS:
continue
if status == EXIT_NO_APPLICABLE_POLICY:
if final_status == EXIT_SUCCESS:
final_status = status
continue
final_status = EXIT_ERROR
if status == EXIT_ERROR:
final_messages.append(
'Worker %s completed in %.1fs with errors'
% (worker_desc, time_delta))
else:
final_messages.append(
'Worker %s exited in %.1fs with unexpected status %s'
% (worker_desc, time_delta, status))
for msg in final_messages:
logger.warning(msg)
return final_status
def main(args):
@ -529,7 +592,8 @@ def main(args):
dest='devices', help='Path to swift device directory')
parser.add_argument('--user', default=None, dest='user',
help='Drop privileges to this user before relinking')
parser.add_argument('--device', default=None, dest='device',
parser.add_argument('--device',
default=[], dest='device_list', action='append',
help='Device name to relink (default: all)')
parser.add_argument('--partition', '-p', default=[], dest='partitions',
type=non_negative_int, action='append',
@ -541,6 +605,10 @@ def main(args):
type=non_negative_float, dest='files_per_second',
help='Used to limit I/O. Zero implies no limit '
'(default: no limit).')
parser.add_argument(
'--workers', default=None, type=non_negative_int, help=(
'Process devices across N workers '
'(default: one worker per device)'))
parser.add_argument('--logfile', default=None, dest='logfile',
help='Set log file name. Ignored if using conf_file.')
parser.add_argument('--debug', default=False, action='store_true',
@ -584,13 +652,12 @@ def main(args):
else non_negative_float(conf.get('files_per_second', '0'))),
'policies': set(args.policies) or POLICIES,
'partitions': set(args.partitions),
'workers': config_auto_int_value(
conf.get('workers') if args.workers is None else args.workers,
'auto'),
'link_check_limit': (
args.link_check_limit if args.link_check_limit is not None
else non_negative_int(conf.get('link_check_limit', 2))),
})
if args.action == 'relink':
return relink(conf, logger, device=args.device)
if args.action == 'cleanup':
return cleanup(conf, logger, device=args.device)
return parallel_process(
args.action == 'cleanup', conf, logger, args.device_list)

View File

@ -161,29 +161,175 @@ class TestRelinker(unittest.TestCase):
with mock.patch('swift.common.utils.listdir', mocked):
yield
def test_workers_parent(self):
os.mkdir(os.path.join(self.devices, 'sda2'))
self.rb.prepare_increase_partition_power()
self.rb.increase_partition_power()
self._save_ring()
pids = {
2: 0,
3: 0,
}
def mock_wait():
return pids.popitem()
with mock.patch('os.fork', side_effect=list(pids.keys())), \
mock.patch('os.wait', mock_wait):
self.assertEqual(0, relinker.main([
'cleanup',
'--swift-dir', self.testdir,
'--devices', self.devices,
'--workers', '2',
'--skip-mount',
]))
self.assertEqual(pids, {})
def test_workers_parent_bubbles_up_errors(self):
def do_test(wait_result, msg):
pids = {
2: 0,
3: 0,
4: 0,
5: wait_result,
6: 0,
}
with mock.patch('os.fork', side_effect=list(pids.keys())), \
mock.patch('os.wait', lambda: pids.popitem()), \
mock.patch.object(relinker.logging, 'getLogger',
return_value=self.logger):
self.assertEqual(1, relinker.main([
'cleanup',
'--swift-dir', self.testdir,
'--devices', self.devices,
'--skip-mount',
]))
self.assertEqual(pids, {})
warning_lines = self.logger.get_lines_for_level('warning')
self.assertTrue(
warning_lines[0].startswith('Worker (pid=5, devs='))
self.assertTrue(
warning_lines[0].endswith(msg),
'Expected log line to end with %r; got %r'
% (msg, warning_lines[0]))
self.assertFalse(warning_lines[1:])
self.logger.clear()
os.mkdir(os.path.join(self.devices, 'sda2'))
os.mkdir(os.path.join(self.devices, 'sda3'))
os.mkdir(os.path.join(self.devices, 'sda4'))
os.mkdir(os.path.join(self.devices, 'sda5'))
self.rb.prepare_increase_partition_power()
self.rb.increase_partition_power()
self._save_ring()
# signals get the low bits
do_test(9, 'exited in 0.0s after receiving signal: 9')
# exit codes get the high
do_test(1 << 8, 'completed in 0.0s with errors')
do_test(42 << 8, 'exited in 0.0s with unexpected status 42')
def test_workers_children(self):
os.mkdir(os.path.join(self.devices, 'sda2'))
os.mkdir(os.path.join(self.devices, 'sda3'))
os.mkdir(os.path.join(self.devices, 'sda4'))
os.mkdir(os.path.join(self.devices, 'sda5'))
self.rb.prepare_increase_partition_power()
self.rb.increase_partition_power()
self._save_ring()
calls = []
def fake_fork():
calls.append('fork')
return 0
def fake_run(self):
calls.append(('run', self.device_list))
return 0
def fake_exit(status):
calls.append(('exit', status))
with mock.patch('os.fork', fake_fork), \
mock.patch('os._exit', fake_exit), \
mock.patch('swift.cli.relinker.Relinker.run', fake_run):
self.assertEqual(0, relinker.main([
'cleanup',
'--swift-dir', self.testdir,
'--devices', self.devices,
'--workers', '2',
'--skip-mount',
]))
self.assertEqual([
'fork',
('run', ['sda1', 'sda3', 'sda5']),
('exit', 0),
'fork',
('run', ['sda2', 'sda4']),
('exit', 0),
], calls)
# test too many workers
calls = []
with mock.patch('os.fork', fake_fork), \
mock.patch('os._exit', fake_exit), \
mock.patch('swift.cli.relinker.Relinker.run', fake_run):
self.assertEqual(0, relinker.main([
'cleanup',
'--swift-dir', self.testdir,
'--devices', self.devices,
'--workers', '6',
'--skip-mount',
]))
self.assertEqual([
'fork',
('run', ['sda1']),
('exit', 0),
'fork',
('run', ['sda2']),
('exit', 0),
'fork',
('run', ['sda3']),
('exit', 0),
'fork',
('run', ['sda4']),
('exit', 0),
'fork',
('run', ['sda5']),
('exit', 0),
], calls)
def _do_test_relinker_drop_privileges(self, command):
@contextmanager
def do_mocks():
# attach mocks to call_capture so that call order can be asserted
call_capture = mock.Mock()
with mock.patch('swift.cli.relinker.drop_privileges') as mock_dp:
with mock.patch('swift.cli.relinker.' + command,
return_value=0) as mock_command:
call_capture.attach_mock(mock_dp, 'drop_privileges')
call_capture.attach_mock(mock_command, command)
yield call_capture
mod = 'swift.cli.relinker.'
with mock.patch(mod + 'Relinker') as mock_relinker, \
mock.patch(mod + 'drop_privileges') as mock_dp, \
mock.patch(mod + 'os.listdir',
return_value=['sda', 'sdb']):
mock_relinker.return_value.run.return_value = 0
call_capture.attach_mock(mock_dp, 'drop_privileges')
call_capture.attach_mock(mock_relinker, 'run')
yield call_capture
# no user option
with do_mocks() as capture:
self.assertEqual(0, relinker.main([command]))
self.assertEqual([(command, mock.ANY, mock.ANY)],
self.assertEqual(0, relinker.main([command, '--workers', '0']))
self.assertEqual([mock.call.run(mock.ANY, mock.ANY, ['sda', 'sdb'],
do_cleanup=(command == 'cleanup'))],
capture.method_calls)
# cli option --user
with do_mocks() as capture:
self.assertEqual(0, relinker.main([command, '--user', 'cli_user']))
self.assertEqual(0, relinker.main([command, '--user', 'cli_user',
'--workers', '0']))
self.assertEqual([('drop_privileges', ('cli_user',), {}),
(command, mock.ANY, mock.ANY)],
mock.call.run(mock.ANY, mock.ANY, ['sda', 'sdb'],
do_cleanup=(command == 'cleanup'))],
capture.method_calls)
# cli option --user takes precedence over conf file user
@ -191,18 +337,22 @@ class TestRelinker(unittest.TestCase):
with mock.patch('swift.cli.relinker.readconf',
return_value={'user': 'conf_user'}):
self.assertEqual(0, relinker.main([command, 'conf_file',
'--user', 'cli_user']))
'--user', 'cli_user',
'--workers', '0']))
self.assertEqual([('drop_privileges', ('cli_user',), {}),
(command, mock.ANY, mock.ANY)],
mock.call.run(mock.ANY, mock.ANY, ['sda', 'sdb'],
do_cleanup=(command == 'cleanup'))],
capture.method_calls)
# conf file user
with do_mocks() as capture:
with mock.patch('swift.cli.relinker.readconf',
return_value={'user': 'conf_user'}):
return_value={'user': 'conf_user',
'workers': '0'}):
self.assertEqual(0, relinker.main([command, 'conf_file']))
self.assertEqual([('drop_privileges', ('conf_user',), {}),
(command, mock.ANY, mock.ANY)],
mock.call.run(mock.ANY, mock.ANY, ['sda', 'sdb'],
do_cleanup=(command == 'cleanup'))],
capture.method_calls)
def test_relinker_drop_privileges(self):
@ -290,7 +440,7 @@ class TestRelinker(unittest.TestCase):
f.write(dedent(config))
# cite conf file on command line
with mock.patch('swift.cli.relinker.relink') as mock_relink:
with mock.patch('swift.cli.relinker.Relinker') as mock_relinker:
relinker.main(['relink', conf_file, '--device', 'sdx', '--debug'])
exp_conf = {
'__file__': mock.ANY,
@ -302,11 +452,13 @@ class TestRelinker(unittest.TestCase):
'log_name': 'test-relinker',
'log_level': 'DEBUG',
'policies': POLICIES,
'workers': 'auto',
'partitions': set(),
'link_check_limit': 2,
}
mock_relink.assert_called_once_with(exp_conf, mock.ANY, device='sdx')
logger = mock_relink.call_args[0][1]
mock_relinker.assert_called_once_with(
exp_conf, mock.ANY, ['sdx'], do_cleanup=False)
logger = mock_relinker.call_args[0][1]
# --debug overrides conf file
self.assertEqual(logging.DEBUG, logger.getEffectiveLevel())
self.assertEqual('test-relinker', logger.logger.name)
@ -333,9 +485,9 @@ class TestRelinker(unittest.TestCase):
"""
with open(conf_file, 'w') as f:
f.write(dedent(config))
with mock.patch('swift.cli.relinker.relink') as mock_relink:
with mock.patch('swift.cli.relinker.Relinker') as mock_relinker:
relinker.main(['relink', conf_file, '--device', 'sdx'])
mock_relink.assert_called_once_with({
mock_relinker.assert_called_once_with({
'__file__': mock.ANY,
'swift_dir': 'test/swift/dir',
'devices': '/test/node',
@ -345,14 +497,15 @@ class TestRelinker(unittest.TestCase):
'log_level': 'WARNING',
'policies': POLICIES,
'partitions': set(),
'workers': 'auto',
'link_check_limit': 1,
}, mock.ANY, device='sdx')
logger = mock_relink.call_args[0][1]
}, mock.ANY, ['sdx'], do_cleanup=False)
logger = mock_relinker.call_args[0][1]
self.assertEqual(logging.WARNING, logger.getEffectiveLevel())
self.assertEqual('test-relinker', logger.logger.name)
# override with cli options...
with mock.patch('swift.cli.relinker.relink') as mock_relink:
with mock.patch('swift.cli.relinker.Relinker') as mock_relinker:
relinker.main([
'relink', conf_file, '--device', 'sdx', '--debug',
'--swift-dir', 'cli-dir', '--devices', 'cli-devs',
@ -361,7 +514,7 @@ class TestRelinker(unittest.TestCase):
'--partition', '123', '--partition', '456',
'--link-check-limit', '3',
])
mock_relink.assert_called_once_with({
mock_relinker.assert_called_once_with({
'__file__': mock.ANY,
'swift_dir': 'cli-dir',
'devices': 'cli-devs',
@ -371,15 +524,16 @@ class TestRelinker(unittest.TestCase):
'log_name': 'test-relinker',
'policies': {POLICIES[1]},
'partitions': {123, 456},
'workers': 'auto',
'link_check_limit': 3,
}, mock.ANY, device='sdx')
}, mock.ANY, ['sdx'], do_cleanup=False)
with mock.patch('swift.cli.relinker.relink') as mock_relink, \
with mock.patch('swift.cli.relinker.Relinker') as mock_relinker, \
mock.patch('logging.basicConfig') as mock_logging_config:
relinker.main(['relink', '--device', 'sdx',
'--swift-dir', 'cli-dir', '--devices', 'cli-devs',
'--skip-mount-check'])
mock_relink.assert_called_once_with({
mock_relinker.assert_called_once_with({
'swift_dir': 'cli-dir',
'devices': 'cli-devs',
'mount_check': False,
@ -387,12 +541,13 @@ class TestRelinker(unittest.TestCase):
'log_level': 'INFO',
'policies': POLICIES,
'partitions': set(),
'workers': 'auto',
'link_check_limit': 2,
}, mock.ANY, device='sdx')
}, mock.ANY, ['sdx'], do_cleanup=False)
mock_logging_config.assert_called_once_with(
format='%(message)s', level=logging.INFO, filename=None)
with mock.patch('swift.cli.relinker.relink') as mock_relink, \
with mock.patch('swift.cli.relinker.Relinker') as mock_relinker, \
mock.patch('logging.basicConfig') as mock_logging_config:
relinker.main([
'relink', '--debug',
@ -404,7 +559,7 @@ class TestRelinker(unittest.TestCase):
'--policy', '1',
'--policy', '0',
])
mock_relink.assert_called_once_with({
mock_relinker.assert_called_once_with({
'swift_dir': 'cli-dir',
'devices': 'cli-devs',
'mount_check': False,
@ -412,8 +567,9 @@ class TestRelinker(unittest.TestCase):
'log_level': 'DEBUG',
'policies': set(POLICIES),
'partitions': set(),
'workers': 'auto',
'link_check_limit': 2
}, mock.ANY, device='sdx')
}, mock.ANY, ['sdx'], do_cleanup=False)
# --debug is now effective
mock_logging_config.assert_called_once_with(
format='%(message)s', level=logging.DEBUG, filename=None)
@ -740,10 +896,10 @@ class TestRelinker(unittest.TestCase):
(('ts', 0),),
(('ts', 0),),
exp_ret_code=1)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('1 hash dirs processed (cleanup=True) '
'(1 files, 0 linked, 0 removed, 1 errors)',
info_lines)
warning_lines)
def test_relink_link_already_exists_but_different_inode(self):
self.rb.prepare_increase_partition_power()
@ -769,9 +925,9 @@ class TestRelinker(unittest.TestCase):
'[Errno 17] File exists'
% (self.objname, self.expected_file),
warning_lines[0])
info_lines = self.logger.get_lines_for_level('info')
self.assertIn('1 hash dirs processed (cleanup=False) '
'(1 files, 0 linked, 0 removed, 1 errors)', info_lines)
'(1 files, 0 linked, 0 removed, 1 errors)',
warning_lines)
def test_relink_link_already_exists(self):
self.rb.prepare_increase_partition_power()
@ -862,7 +1018,10 @@ class TestRelinker(unittest.TestCase):
]))
self.assertEqual(self.logger.get_lines_for_level('warning'), [
'Skipping sda1 as it is not mounted',
'1 disks were unmounted'])
'1 disks were unmounted',
'0 hash dirs processed (cleanup=False) '
'(0 files, 0 linked, 0 removed, 0 errors)',
])
def test_relink_listdir_error(self):
self.rb.prepare_increase_partition_power()
@ -878,10 +1037,10 @@ class TestRelinker(unittest.TestCase):
]))
self.assertEqual(self.logger.get_lines_for_level('warning'), [
'Skipping %s because ' % self.objects,
'There were 1 errors listing partition directories'])
info_lines = self.logger.get_lines_for_level('info')
self.assertIn('0 hash dirs processed (cleanup=False) '
'(0 files, 0 linked, 0 removed, 1 errors)', info_lines)
'There were 1 errors listing partition directories',
'0 hash dirs processed (cleanup=False) '
'(0 files, 0 linked, 0 removed, 1 errors)',
])
def test_relink_device_filter(self):
self.rb.prepare_increase_partition_power()
@ -946,7 +1105,7 @@ class TestRelinker(unittest.TestCase):
'--swift-dir', self.testdir,
'--devices', self.devices,
'--skip-mount',
'--partition', '-1'
'--partition', '-1',
]))
self.assertEqual(2, cm.exception.code)
@ -957,7 +1116,7 @@ class TestRelinker(unittest.TestCase):
'--swift-dir', self.testdir,
'--devices', self.devices,
'--skip-mount',
'--partition', 'abc'
'--partition', 'abc',
]))
self.assertEqual(2, cm.exception.code)
@ -970,12 +1129,12 @@ class TestRelinker(unittest.TestCase):
'--swift-dir', self.testdir,
'--devices', self.devices,
'--skip-mount',
'--partition', str(self.part + 1)
'--partition', str(self.part + 1),
]))
self.assertFalse(os.path.isdir(self.expected_dir))
self.assertEqual(
['Processing files for policy platinum under %s (cleanup=False)'
% self.devices,
% os.path.join(self.devices, 'sda1'),
'0 hash dirs processed (cleanup=False) (0 files, 0 linked, '
'0 removed, 0 errors)'],
self.logger.get_lines_for_level('info')
@ -990,7 +1149,7 @@ class TestRelinker(unittest.TestCase):
'--swift-dir', self.testdir,
'--devices', self.devices,
'--skip-mount',
'--partition', str(self.part)
'--partition', str(self.part),
]))
self.assertTrue(os.path.isdir(self.expected_dir))
self.assertTrue(os.path.isfile(self.expected_file))
@ -999,7 +1158,7 @@ class TestRelinker(unittest.TestCase):
self.assertEqual(stat_old.st_ino, stat_new.st_ino)
self.assertEqual(
['Processing files for policy platinum under %s (cleanup=False)'
% self.devices,
% os.path.join(self.devices, 'sda1'),
'Step: relink Device: sda1 Policy: platinum Partitions: 1/3',
'1 hash dirs processed (cleanup=False) (1 files, 1 linked, '
'0 removed, 0 errors)'],
@ -1017,7 +1176,7 @@ class TestRelinker(unittest.TestCase):
'--skip-mount',
'--partition', str(other_objs[0][0]),
'-p', str(other_objs[0][0]), # duplicates should be ignored
'-p', str(other_objs[1][0])
'-p', str(other_objs[1][0]),
]))
expected_file = utils.replace_partition_in_path(
self.devices, other_objs[0][1], PART_POWER + 1)
@ -1033,7 +1192,7 @@ class TestRelinker(unittest.TestCase):
self.assertEqual(stat_old.st_ino, stat_new.st_ino)
self.assertEqual(
['Processing files for policy platinum under %s (cleanup=False)'
% self.devices,
% os.path.join(self.devices, 'sda1'),
'Step: relink Device: sda1 Policy: platinum Partitions: 2/3',
'Step: relink Device: sda1 Policy: platinum Partitions: 3/3',
'2 hash dirs processed (cleanup=False) (2 files, 2 linked, '
@ -1398,11 +1557,10 @@ class TestRelinker(unittest.TestCase):
(('ts', 0),), # retained
(('ts', 0),),
exp_ret_code=1)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('2 hash dirs processed (cleanup=False) '
'(2 files, 0 linked, 0 removed, 1 errors)',
info_lines)
warning_lines = self.logger.get_lines_for_level('warning')
warning_lines)
self.assertIn('Error relinking: failed to relink', warning_lines[0])
with open(new_filepath, 'r') as fd:
self.assertEqual(older_filepath, fd.read())
@ -1507,10 +1665,10 @@ class TestRelinker(unittest.TestCase):
extra_options=['--link-check-limit', '0'],
mock_relink_paths=mock_relink_paths,
)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('1 hash dirs processed (cleanup=False) '
'(1 files, 0 linked, 0 removed, 1 errors)',
info_lines)
warning_lines)
# one attempt to link from current plus a check/retry from each
# possible partition power, stopping at part power 1
@ -1544,10 +1702,10 @@ class TestRelinker(unittest.TestCase):
extra_options=['--link-check-limit', str(PART_POWER + 99)],
mock_relink_paths=mock_relink_paths,
)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('1 hash dirs processed (cleanup=False) '
'(1 files, 0 linked, 0 removed, 1 errors)',
info_lines)
warning_lines)
# one attempt to link from current plus a check/retry from each
# possible partition power, stopping at part power 1
@ -1629,9 +1787,11 @@ class TestRelinker(unittest.TestCase):
'devices': self.devices,
'mount_check': False,
'files_per_second': 0,
'policies': POLICIES}
self.assertEqual(0, relinker.relink(
conf, logger=self.logger, device=self.existing_device))
'policies': POLICIES,
'workers': 0}
self.assertEqual(0, relinker.Relinker(
conf, logger=self.logger, device_list=[self.existing_device],
do_cleanup=False).run())
self.rb.increase_partition_power()
self._save_ring()
@ -1845,10 +2005,10 @@ class TestRelinker(unittest.TestCase):
exp_ret_code=1,
relink_errors={'data': OSError(errno.EPERM, 'oops')}
)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('1 hash dirs processed (cleanup=True) '
'(2 files, 0 linked, 0 removed, 1 errors)',
info_lines)
warning_lines)
def test_cleanup_missing_meta_file_relink_fails(self):
self._cleanup_test((('data', 0), ('meta', 1)),
@ -1859,10 +2019,10 @@ class TestRelinker(unittest.TestCase):
exp_ret_code=1,
relink_errors={'meta': OSError(errno.EPERM, 'oops')}
)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('1 hash dirs processed (cleanup=True) '
'(2 files, 0 linked, 0 removed, 1 errors)',
info_lines)
warning_lines)
def test_cleanup_missing_data_and_meta_file_one_relink_fails(self):
self._cleanup_test((('data', 0), ('meta', 1)),
@ -1873,10 +2033,10 @@ class TestRelinker(unittest.TestCase):
exp_ret_code=1,
relink_errors={'meta': OSError(errno.EPERM, 'oops')}
)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('1 hash dirs processed (cleanup=True) '
'(2 files, 1 linked, 0 removed, 1 errors)',
info_lines)
warning_lines)
def test_cleanup_missing_data_and_meta_file_both_relinks_fails(self):
self._cleanup_test((('data', 0), ('meta', 1)),
@ -1888,10 +2048,10 @@ class TestRelinker(unittest.TestCase):
relink_errors={'data': OSError(errno.EPERM, 'oops'),
'meta': OSError(errno.EPERM, 'oops')}
)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('1 hash dirs processed (cleanup=True) '
'(2 files, 0 linked, 0 removed, 2 errors)',
info_lines)
warning_lines)
def test_cleanup_conflicting_data_file(self):
self._cleanup_test((('data', 0),),
@ -1900,10 +2060,10 @@ class TestRelinker(unittest.TestCase):
(('data', 0),),
(('data', 0),),
exp_ret_code=1)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('1 hash dirs processed (cleanup=True) '
'(1 files, 0 linked, 0 removed, 1 errors)',
info_lines)
warning_lines)
def test_cleanup_conflicting_ts_file(self):
self._cleanup_test((('ts', 0),),
@ -1912,10 +2072,10 @@ class TestRelinker(unittest.TestCase):
(('ts', 0),),
(('ts', 0),),
exp_ret_code=1)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('1 hash dirs processed (cleanup=True) '
'(1 files, 0 linked, 0 removed, 1 errors)',
info_lines)
warning_lines)
def test_cleanup_conflicting_ts_is_linked_to_part_power_minus_1(self):
self._setup_object(lambda part: part >= 2 ** (PART_POWER - 1))
@ -1965,11 +2125,10 @@ class TestRelinker(unittest.TestCase):
(('ts', 0),), # retained
(('ts', 0),),
exp_ret_code=1)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('2 hash dirs processed (cleanup=True) '
'(2 files, 0 linked, 1 removed, 1 errors)',
info_lines)
warning_lines = self.logger.get_lines_for_level('warning')
warning_lines)
self.assertIn('Error relinking (cleanup): failed to relink',
warning_lines[0])
with open(new_filepath, 'r') as fd:
@ -2031,10 +2190,10 @@ class TestRelinker(unittest.TestCase):
(('data', 0), ('meta', 1)),
(('data', 0), ('meta', 1)),
exp_ret_code=1)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('1 hash dirs processed (cleanup=True) '
'(2 files, 0 linked, 0 removed, 2 errors)',
info_lines)
warning_lines)
def test_cleanup_conflicting_data_file_existing_meta_file(self):
# if just one link fails to be created then *nothing* is removed from
@ -2045,10 +2204,10 @@ class TestRelinker(unittest.TestCase):
(('data', 0), ('meta', 1)),
(('data', 0), ('meta', 1)),
exp_ret_code=1)
info_lines = self.logger.get_lines_for_level('info')
warning_lines = self.logger.get_lines_for_level('warning')
self.assertIn('1 hash dirs processed (cleanup=True) '
'(2 files, 0 linked, 0 removed, 1 errors)',
info_lines)
warning_lines)
def test_cleanup_first_quartile_does_rehash(self):
# we need object name in lower half of current part
@ -2160,7 +2319,10 @@ class TestRelinker(unittest.TestCase):
]))
self.assertEqual(self.logger.get_lines_for_level('warning'), [
'Skipping sda1 as it is not mounted',
'1 disks were unmounted'])
'1 disks were unmounted',
'0 hash dirs processed (cleanup=True) '
'(0 files, 0 linked, 0 removed, 0 errors)',
])
def test_cleanup_listdir_error(self):
self._common_test_cleanup()
@ -2175,10 +2337,10 @@ class TestRelinker(unittest.TestCase):
]))
self.assertEqual(self.logger.get_lines_for_level('warning'), [
'Skipping %s because ' % self.objects,
'There were 1 errors listing partition directories'])
info_lines = self.logger.get_lines_for_level('info')
self.assertIn('0 hash dirs processed (cleanup=True) '
'(0 files, 0 linked, 0 removed, 1 errors)', info_lines)
'There were 1 errors listing partition directories',
'0 hash dirs processed (cleanup=True) '
'(0 files, 0 linked, 0 removed, 1 errors)',
])
def test_cleanup_device_filter(self):
self._common_test_cleanup()
@ -2269,7 +2431,7 @@ class TestRelinker(unittest.TestCase):
self.assertEqual(set([self.existing_device]), devices)
# With a non matching filter, returns nothing
r.device = 'none'
r.device_list = ['none']
devices = r.devices_filter("", [self.existing_device])
self.assertEqual(set(), devices)
@ -2539,15 +2701,15 @@ class TestRelinker(unittest.TestCase):
with open(self.expected_file, 'r') as fd:
self.assertEqual('same but different', fd.read())
warning_lines = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_lines), warning_lines)
self.assertEqual(2, len(warning_lines), warning_lines)
self.assertIn('Error relinking (cleanup): failed to relink %s to %s'
% (self.objname, self.expected_file), warning_lines[0])
# suffix should not be invalidated in new partition
hashes_invalid = os.path.join(self.next_part_dir, 'hashes.invalid')
self.assertFalse(os.path.exists(hashes_invalid))
info_lines = self.logger.get_lines_for_level('info')
self.assertIn('1 hash dirs processed (cleanup=True) '
'(1 files, 0 linked, 0 removed, 1 errors)', info_lines)
self.assertEqual('1 hash dirs processed (cleanup=True) '
'(1 files, 0 linked, 0 removed, 1 errors)',
warning_lines[1])
def test_cleanup_older_object_in_new_partition(self):
# relink of the current object failed, but there is an older version of
@ -2749,14 +2911,12 @@ class TestRelinker(unittest.TestCase):
]))
self.assertFalse(os.path.isfile(self.expected_file))
self.assertTrue(os.path.isfile(self.objname)) # old file intact
self.assertEqual(
['Error relinking (cleanup): failed to relink %s to %s: '
% (self.objname, self.expected_file)],
self.logger.get_lines_for_level('warning'),
)
info_lines = self.logger.get_lines_for_level('info')
self.assertIn('1 hash dirs processed (cleanup=True) '
'(1 files, 0 linked, 0 removed, 1 errors)', info_lines)
self.assertEqual(self.logger.get_lines_for_level('warning'), [
'Error relinking (cleanup): failed to relink %s to %s: '
% (self.objname, self.expected_file),
'1 hash dirs processed (cleanup=True) '
'(1 files, 0 linked, 0 removed, 1 errors)',
])
# suffix should not be invalidated in new partition
self.assertTrue(os.path.exists(hashes_invalid))
with open(hashes_invalid, 'r') as fd:
@ -2797,13 +2957,11 @@ class TestRelinker(unittest.TestCase):
self.assertTrue(os.path.isfile(new_meta_path)) # new file intact
self.assertFalse(os.path.isfile(self.objname)) # old file removed
self.assertTrue(os.path.isfile(old_meta_path)) # meta file remove fail
self.assertEqual(
['Error cleaning up %s: OSError()' % old_meta_path],
self.logger.get_lines_for_level('warning'),
)
info_lines = self.logger.get_lines_for_level('info')
self.assertIn('1 hash dirs processed (cleanup=True) '
'(2 files, 0 linked, 1 removed, 1 errors)', info_lines)
self.assertEqual(self.logger.get_lines_for_level('warning'), [
'Error cleaning up %s: OSError()' % old_meta_path,
'1 hash dirs processed (cleanup=True) '
'(2 files, 0 linked, 1 removed, 1 errors)',
])
def test_cleanup_two_files_need_linking(self):
meta_file = utils.Timestamp(int(self.obj_ts) + 1).internal + '.meta'