Merge "Make multiprocess reconstructor's logs more readable."

Zuul 2018-06-04 16:19:24 +00:00 committed by Gerrit Code Review
commit 84f2bfcb2e
3 changed files with 148 additions and 71 deletions
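The diff below routes all of the reconstructor's logging through swift.common.utils.PrefixLoggerAdapter and tags each multiprocess worker's output with a "[worker N/M pid=...]" prefix, so interleaved log lines can be traced back to the worker that emitted them. As a rough illustration of the idea only (a minimal sketch built on the standard library's logging.LoggerAdapter, not Swift's actual PrefixLoggerAdapter implementation; the worker numbers and pid shown are invented):

import logging
import os

class PrefixLoggerAdapter(logging.LoggerAdapter):
    # Illustrative stand-in: prepend a settable prefix to every message.
    def set_prefix(self, prefix):
        self.extra['prefix'] = prefix

    def process(self, msg, kwargs):
        return self.extra.get('prefix', '') + msg, kwargs

logging.basicConfig(level=logging.INFO, format='%(message)s')
logger = PrefixLoggerAdapter(logging.getLogger('object-reconstructor'), {})
# worker indexes are 0-based internally; logs use 1-based numbering ("worker 2/4")
logger.set_prefix('[worker %d/%d pid=%s] ' % (1 + 1, 4, os.getpid()))
logger.info('Starting object reconstructor in daemon mode.')
# -> [worker 2/4 pid=20641] Starting object reconstructor in daemon mode.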

swift/obj/reconstructor.py

@@ -33,7 +33,8 @@ from swift.common.utils import (
whataremyips, unlink_older_than, compute_eta, get_logger,
dump_recon_cache, mkdirs, config_true_value,
tpool_reraise, GreenAsyncPile, Timestamp, remove_file,
load_recon_cache, parse_override_options, distribute_evenly)
load_recon_cache, parse_override_options, distribute_evenly,
PrefixLoggerAdapter)
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
@@ -142,8 +143,8 @@ class ObjectReconstructor(Daemon):
:param logger: logging object
"""
self.conf = conf
self.logger = logger or get_logger(
conf, log_route='object-reconstructor')
self.logger = PrefixLoggerAdapter(
logger or get_logger(conf, log_route='object-reconstructor'), {})
self.devices_dir = conf.get('devices', '/srv/node')
self.mount_check = config_true_value(conf.get('mount_check', 'true'))
self.swift_dir = conf.get('swift_dir', '/etc/swift')
@@ -225,16 +226,21 @@ class ObjectReconstructor(Daemon):
if not devices:
# we only need a single worker to do nothing until a ring change
yield dict(override_devices=override_opts.devices,
override_partitions=override_opts.partitions)
override_partitions=override_opts.partitions,
multiprocess_worker_index=0)
return
# for somewhat uniform load per worker use same
# max_devices_per_worker when handling all devices or just override
# devices, but only use enough workers for the actual devices being
# handled
n_workers = min(self.reconstructor_workers, len(devices))
for ods in distribute_evenly(devices, n_workers):
self.reconstructor_workers = min(self.reconstructor_workers,
len(devices))
for index, ods in enumerate(distribute_evenly(
devices, self.reconstructor_workers)):
yield dict(override_partitions=override_opts.partitions,
override_devices=ods)
override_devices=ods,
multiprocess_worker_index=index)
def is_healthy(self):
"""
@@ -571,6 +577,12 @@ class ObjectReconstructor(Daemon):
_("Nothing reconstructed for %s seconds."),
(time.time() - self.start))
def _emplace_log_prefix(self, worker_index):
self.logger.set_prefix("[worker %d/%d pid=%s] " % (
worker_index + 1, # use 1-based indexing for more readable logs
self.reconstructor_workers,
os.getpid()))
def kill_coros(self):
"""Utility function that kills all coroutines currently running."""
for coro in list(self.run_pool.coroutines_running):
@@ -1213,7 +1225,9 @@ class ObjectReconstructor(Daemon):
recon_update['object_reconstruction_per_disk'] = {}
dump_recon_cache(recon_update, self.rcache, self.logger)
def run_once(self, *args, **kwargs):
def run_once(self, multiprocess_worker_index=None, *args, **kwargs):
if multiprocess_worker_index is not None:
self._emplace_log_prefix(multiprocess_worker_index)
start = time.time()
self.logger.info(_("Running object reconstructor in script mode."))
override_opts = parse_override_options(once=True, **kwargs)
@@ -1231,7 +1245,9 @@ class ObjectReconstructor(Daemon):
total, override_devices=override_opts.devices,
override_partitions=override_opts.partitions)
def run_forever(self, *args, **kwargs):
def run_forever(self, multiprocess_worker_index=None, *args, **kwargs):
if multiprocess_worker_index is not None:
self._emplace_log_prefix(multiprocess_worker_index)
self.logger.info(_("Starting object reconstructor in daemon mode."))
# Run the reconstructor continually
while True:

test/unit/obj/test_reconstructor.py

@@ -755,7 +755,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
with mock.patch('swift.obj.reconstructor.ObjectReconstructor.'
'check_ring', return_value=False):
self.reconstructor.reconstruct()
msgs = self.reconstructor.logger.get_lines_for_level('info')
msgs = self.logger.get_lines_for_level('info')
self.assertIn('Ring change detected. Aborting'
' current reconstruction pass.', msgs[0])
self.assertEqual(self.reconstructor.reconstruction_count, 0)
@@ -802,7 +802,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
], found)
self.assertEqual(found_job_types, {object_reconstructor.REVERT})
# but failures keep handoffs remaining
msgs = self.reconstructor.logger.get_lines_for_level('info')
msgs = self.logger.get_lines_for_level('info')
self.assertIn('Next pass will continue to revert handoffs', msgs[-1])
self.logger._clear()
@@ -818,7 +818,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.reconstructor.reconstruct()
self.assertEqual(found_job_types, {object_reconstructor.REVERT})
# it's time to turn off handoffs_only
msgs = self.reconstructor.logger.get_lines_for_level('warning')
msgs = self.logger.get_lines_for_level('warning')
self.assertIn('You should disable handoffs_only', msgs[-1])
def test_get_partners(self):
@@ -895,7 +895,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.reconstructor.reconstruct()
self.assertFalse(os.path.exists(pol_1_part_1_path))
warnings = self.reconstructor.logger.get_lines_for_level('warning')
warnings = self.logger.get_lines_for_level('warning')
self.assertEqual(2, len(warnings))
# first warning is due to get_hashes failing to take lock on non-dir
self.assertIn(pol_1_part_1_path + '/hashes.pkl', warnings[0])
@@ -934,7 +934,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.reconstructor._reset_stats()
for part_info in self.reconstructor.collect_parts():
self.assertNotIn(part_info['part_path'], status_paths)
warnings = self.reconstructor.logger.get_lines_for_level('warning')
warnings = self.logger.get_lines_for_level('warning')
self.assertEqual(0, len(warnings))
for status_path in status_paths:
self.assertTrue(os.path.exists(status_path))
@@ -1066,9 +1066,9 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
# failed jobs don't sync suffixes
self.assertFalse(
self.reconstructor.logger.get_lines_for_level('warning'))
self.logger.get_lines_for_level('warning'))
self.assertFalse(
self.reconstructor.logger.get_lines_for_level('error'))
self.logger.get_lines_for_level('error'))
# handoffs remaining and part exists
self.assertEqual(2, self.reconstructor.handoffs_remaining)
self.assertTrue(os.path.exists(self.parts_1['2']))
@@ -1099,10 +1099,10 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
set((r['ip'], r['path'])
for r in request_log.requests))
self.assertFalse(
self.reconstructor.logger.get_lines_for_level('error'))
self.logger.get_lines_for_level('error'))
# handoffs are cleaned up
self.assertEqual(0, self.reconstructor.handoffs_remaining)
warning_msgs = self.reconstructor.logger.get_lines_for_level('warning')
warning_msgs = self.logger.get_lines_for_level('warning')
self.assertEqual(1, len(warning_msgs))
self.assertIn('no handoffs remaining', warning_msgs[0])
@@ -1335,7 +1335,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.reconstructor.reconstruct()
self.assertEqual(0, self.reconstructor.reconstruction_count)
warnings = self.reconstructor.logger.get_lines_for_level('warning')
warnings = self.logger.get_lines_for_level('warning')
self.assertIn(
"next_part_power set in policy 'one'. Skipping", warnings)
@@ -1383,7 +1383,8 @@ class TestWorkerReconstructor(unittest.TestCase):
self.assertEqual(num_workers, reconstructor.reconstructor_workers)
self.assertEqual(1, len(list(reconstructor.get_worker_args())))
self.assertEqual([
{'override_partitions': [], 'override_devices': []},
{'override_partitions': [], 'override_devices': [],
'multiprocess_worker_index': 0},
], list(reconstructor.get_worker_args()))
do_test(1)
do_test(10)
@@ -1399,15 +1400,18 @@ class TestWorkerReconstructor(unittest.TestCase):
once=True, devices='sdz'))
self.assertEqual(1, len(worker_args))
self.assertEqual([{'override_partitions': [],
'override_devices': ['sdz']}],
'override_devices': ['sdz'],
'multiprocess_worker_index': 0}],
worker_args)
# overrides are ignored in forever mode
worker_args = list(reconstructor.get_worker_args(
once=False, devices='sdz'))
self.assertEqual(2, len(worker_args))
self.assertEqual([
{'override_partitions': [], 'override_devices': ['sdb']},
{'override_partitions': [], 'override_devices': ['sdc']}
{'override_partitions': [], 'override_devices': ['sdb'],
'multiprocess_worker_index': 0},
{'override_partitions': [], 'override_devices': ['sdc'],
'multiprocess_worker_index': 1},
], worker_args)
def test_workers_with_devices(self):
@@ -1417,8 +1421,10 @@ class TestWorkerReconstructor(unittest.TestCase):
self.assertEqual(2, reconstructor.reconstructor_workers)
self.assertEqual(2, len(list(reconstructor.get_worker_args())))
expected = [
{'override_partitions': [], 'override_devices': ['sdb']},
{'override_partitions': [], 'override_devices': ['sdc']},
{'override_partitions': [], 'override_devices': ['sdb'],
'multiprocess_worker_index': 0},
{'override_partitions': [], 'override_devices': ['sdc'],
'multiprocess_worker_index': 1},
]
worker_args = list(reconstructor.get_worker_args(once=False))
self.assertEqual(2, len(worker_args))
@@ -1439,15 +1445,21 @@ class TestWorkerReconstructor(unittest.TestCase):
once=True, devices='sdb,sdz', partitions='99,333'))
self.assertEqual(1, len(worker_args))
self.assertEqual(
[{'override_partitions': [99, 333], 'override_devices': ['sdb']}],
[{'override_partitions': [99, 333], 'override_devices': ['sdb'],
'multiprocess_worker_index': 0}],
worker_args)
# overrides are ignored in forever mode
reconstructor = object_reconstructor.ObjectReconstructor(
{'reconstructor_workers': '2'}, logger=self.logger)
reconstructor.get_local_devices = lambda: ['sdb', 'sdc']
worker_args = list(reconstructor.get_worker_args(
once=False, devices='sdb,sdz', partitions='99,333'))
self.assertEqual(2, len(worker_args))
self.assertEqual([
{'override_partitions': [], 'override_devices': ['sdb']},
{'override_partitions': [], 'override_devices': ['sdc']}
{'override_partitions': [], 'override_devices': ['sdb'],
'multiprocess_worker_index': 0},
{'override_partitions': [], 'override_devices': ['sdc'],
'multiprocess_worker_index': 1}
], worker_args)
def test_workers_with_lots_of_devices(self):
@@ -1458,10 +1470,12 @@ class TestWorkerReconstructor(unittest.TestCase):
self.assertEqual(2, reconstructor.reconstructor_workers)
self.assertEqual(2, len(list(reconstructor.get_worker_args())))
self.assertEqual([
{'override_partitions': [], 'override_devices': [
'sdb', 'sdd', 'sdf']},
{'override_partitions': [], 'override_devices': [
'sdc', 'sde']},
{'override_partitions': [],
'override_devices': ['sdb', 'sdd', 'sdf'],
'multiprocess_worker_index': 0},
{'override_partitions': [],
'override_devices': ['sdc', 'sde'],
'multiprocess_worker_index': 1},
], list(reconstructor.get_worker_args()))
def test_workers_with_lots_of_devices_and_overrides(self):
@@ -1479,9 +1493,11 @@ class TestWorkerReconstructor(unittest.TestCase):
self.assertEqual([{
'override_partitions': [99, 333],
'override_devices': ['sdb', 'sdf'],
'multiprocess_worker_index': 0,
}, {
'override_partitions': [99, 333],
'override_devices': ['sdd'],
'multiprocess_worker_index': 1,
}], worker_args)
# with 4 override devices, expect 2 per worker
@@ -1489,10 +1505,12 @@ class TestWorkerReconstructor(unittest.TestCase):
once=True, devices='sdb,sdc,sdd,sdf', partitions='99,333'))
self.assertEqual(2, len(worker_args))
self.assertEqual([
{'override_partitions': [99, 333], 'override_devices': [
'sdb', 'sdd']},
{'override_partitions': [99, 333], 'override_devices': [
'sdc', 'sdf']},
{'override_partitions': [99, 333],
'override_devices': ['sdb', 'sdd'],
'multiprocess_worker_index': 0},
{'override_partitions': [99, 333],
'override_devices': ['sdc', 'sdf'],
'multiprocess_worker_index': 1},
], worker_args)
def test_workers_with_lots_of_workers(self):
@@ -1502,8 +1520,10 @@ class TestWorkerReconstructor(unittest.TestCase):
self.assertEqual(10, reconstructor.reconstructor_workers)
self.assertEqual(2, len(list(reconstructor.get_worker_args())))
self.assertEqual([
{'override_partitions': [], 'override_devices': ['sdb']},
{'override_partitions': [], 'override_devices': ['sdc']},
{'override_partitions': [], 'override_devices': ['sdb'],
'multiprocess_worker_index': 0},
{'override_partitions': [], 'override_devices': ['sdc'],
'multiprocess_worker_index': 1},
], list(reconstructor.get_worker_args()))
def test_workers_with_lots_of_workers_and_devices(self):
@@ -1514,11 +1534,16 @@ class TestWorkerReconstructor(unittest.TestCase):
self.assertEqual(10, reconstructor.reconstructor_workers)
self.assertEqual(5, len(list(reconstructor.get_worker_args())))
self.assertEqual([
{'override_partitions': [], 'override_devices': ['sdb']},
{'override_partitions': [], 'override_devices': ['sdc']},
{'override_partitions': [], 'override_devices': ['sdd']},
{'override_partitions': [], 'override_devices': ['sde']},
{'override_partitions': [], 'override_devices': ['sdf']},
{'override_partitions': [], 'override_devices': ['sdb'],
'multiprocess_worker_index': 0},
{'override_partitions': [], 'override_devices': ['sdc'],
'multiprocess_worker_index': 1},
{'override_partitions': [], 'override_devices': ['sdd'],
'multiprocess_worker_index': 2},
{'override_partitions': [], 'override_devices': ['sde'],
'multiprocess_worker_index': 3},
{'override_partitions': [], 'override_devices': ['sdf'],
'multiprocess_worker_index': 4},
], list(reconstructor.get_worker_args()))
def test_workers_with_some_workers_and_devices(self):
@@ -1562,17 +1587,28 @@ class TestWorkerReconstructor(unittest.TestCase):
# Spot check one full result for sanity's sake
reconstructor.reconstructor_workers = 11
self.assertEqual([
{'override_partitions': [], 'override_devices': ['d1', 'd12']},
{'override_partitions': [], 'override_devices': ['d2', 'd13']},
{'override_partitions': [], 'override_devices': ['d3', 'd14']},
{'override_partitions': [], 'override_devices': ['d4', 'd15']},
{'override_partitions': [], 'override_devices': ['d5', 'd16']},
{'override_partitions': [], 'override_devices': ['d6', 'd17']},
{'override_partitions': [], 'override_devices': ['d7', 'd18']},
{'override_partitions': [], 'override_devices': ['d8', 'd19']},
{'override_partitions': [], 'override_devices': ['d9', 'd20']},
{'override_partitions': [], 'override_devices': ['d10', 'd21']},
{'override_partitions': [], 'override_devices': ['d11']},
{'override_partitions': [], 'override_devices': ['d1', 'd12'],
'multiprocess_worker_index': 0},
{'override_partitions': [], 'override_devices': ['d2', 'd13'],
'multiprocess_worker_index': 1},
{'override_partitions': [], 'override_devices': ['d3', 'd14'],
'multiprocess_worker_index': 2},
{'override_partitions': [], 'override_devices': ['d4', 'd15'],
'multiprocess_worker_index': 3},
{'override_partitions': [], 'override_devices': ['d5', 'd16'],
'multiprocess_worker_index': 4},
{'override_partitions': [], 'override_devices': ['d6', 'd17'],
'multiprocess_worker_index': 5},
{'override_partitions': [], 'override_devices': ['d7', 'd18'],
'multiprocess_worker_index': 6},
{'override_partitions': [], 'override_devices': ['d8', 'd19'],
'multiprocess_worker_index': 7},
{'override_partitions': [], 'override_devices': ['d9', 'd20'],
'multiprocess_worker_index': 8},
{'override_partitions': [], 'override_devices': ['d10', 'd21'],
'multiprocess_worker_index': 9},
{'override_partitions': [], 'override_devices': ['d11'],
'multiprocess_worker_index': 10},
], list(reconstructor.get_worker_args()))
def test_next_rcache_update_configured_with_stats_interval(self):
@@ -2395,6 +2431,33 @@ class TestWorkerReconstructor(unittest.TestCase):
}
}, data)
def test_worker_logging(self):
reconstructor = object_reconstructor.ObjectReconstructor({
'reconstructor_workers': 4,
'recon_cache_path': self.recon_cache_path
}, logger=self.logger)
def log_some_stuff(*a, **kw):
reconstructor.logger.debug("debug message")
reconstructor.logger.info("info message")
reconstructor.logger.warning("warning message")
reconstructor.logger.error("error message")
with mock.patch.object(reconstructor, 'reconstruct',
log_some_stuff), \
mock.patch("os.getpid", lambda: 20641):
reconstructor.get_worker_args()
reconstructor.run_once(multiprocess_worker_index=1,
override_devices=['sda', 'sdb'])
prefix = "[worker 2/4 pid=20641] "
for level, lines in self.logger.logger.all_log_lines().items():
for line in lines:
self.assertTrue(
line.startswith(prefix),
"%r doesn't start with %r (level %s)" % (
line, prefix, level))
@patch_policies(with_ec_default=True)
class BaseTestObjectReconstructor(unittest.TestCase):

test/unit/obj/test_ssync.py

@@ -309,8 +309,8 @@ class TestBaseSsyncEC(TestBaseSsync):
def setUp(self):
super(TestBaseSsyncEC, self).setUp()
self.policy = POLICIES.default
self.daemon = ObjectReconstructor(self.daemon_conf,
debug_logger('test-ssync-sender'))
self.logger = debug_logger('test-ssync-sender')
self.daemon = ObjectReconstructor(self.daemon_conf, self.logger)
def _get_object_data(self, path, frag_index=None, **kwargs):
# return a frag archive for given object name and frag index.
@@ -674,7 +674,7 @@ class TestSsyncEC(TestBaseSsyncEC):
self.daemon, self.rx_node, job, ['abc'])
success, _ = sender()
self.assertFalse(success)
error_log_lines = self.daemon.logger.get_lines_for_level('error')
error_log_lines = self.logger.get_lines_for_level('error')
self.assertEqual(1, len(error_log_lines))
error_msg = error_log_lines[0]
self.assertIn("Expected status 200; got 400", error_msg)
@@ -857,7 +857,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
self.policy.object_ring, 'get_part_nodes',
fake_get_part_nodes):
self.reconstructor = ObjectReconstructor(
{}, logger=debug_logger('test_reconstructor'))
{}, logger=self.logger)
job = {
'device': self.device,
'partition': self.partition,
@@ -892,7 +892,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
pass # expected outcome
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
log_lines = self.daemon.logger.get_lines_for_level('error')
log_lines = self.logger.get_lines_for_level('error')
self.assertIn('Sent data length does not match content-length',
log_lines[0])
self.assertFalse(log_lines[1:])
@@ -926,7 +926,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
pass # expected outcome
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
log_lines = self.daemon.logger.get_lines_for_level('error')
log_lines = self.logger.get_lines_for_level('error')
self.assertIn('Sent data length does not match content-length',
log_lines[0])
self.assertFalse(log_lines[1:])
@@ -969,12 +969,11 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
log_lines = self.reconstructor.logger.get_lines_for_level('error')
log_lines = self.logger.get_lines_for_level('error')
self.assertIn('Error trying to rebuild', log_lines[0])
log_lines = self.daemon.logger.get_lines_for_level('error')
self.assertIn('Sent data length does not match content-length',
log_lines[0])
self.assertFalse(log_lines[1:])
log_lines[1])
self.assertFalse(log_lines[2:])
# trampoline for the receiver to write a log
eventlet.sleep(0)
log_lines = self.rx_logger.get_lines_for_level('warning')
@@ -1027,8 +1026,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
pass # expected outcome
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
self.assertFalse(self.daemon.logger.get_lines_for_level('error'))
log_lines = self.reconstructor.logger.get_lines_for_level('error')
log_lines = self.logger.get_lines_for_level('error')
self.assertIn('Unable to get enough responses', log_lines[0])
# trampoline for the receiver to write a log
eventlet.sleep(0)
@@ -1063,9 +1061,9 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
msgs.append('Missing rx diskfile for %r' % obj_name)
if msgs:
self.fail('Failed with:\n%s' % '\n'.join(msgs))
self.assertFalse(self.daemon.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(
self.reconstructor.logger.get_lines_for_level('error'))
self.logger.get_lines_for_level('error'))
# trampoline for the receiver to write a log
eventlet.sleep(0)
self.assertFalse(self.rx_logger.get_lines_for_level('warning'))
@@ -1076,8 +1074,8 @@ class TestSsyncReplication(TestBaseSsync):
class TestSsyncReplication(TestBaseSsync):
def setUp(self):
super(TestSsyncReplication, self).setUp()
self.daemon = ObjectReplicator(self.daemon_conf,
debug_logger('test-ssync-sender'))
self.logger = debug_logger('test-ssync-sender')
self.daemon = ObjectReplicator(self.daemon_conf, self.logger)
def test_sync(self):
policy = POLICIES.default