Shuffle disks and parts in reconstructor

The main problem with going disk by disk is that all of your I/O
hits only one spindle at a time, so no matter how high you set
concurrency it doesn't go any faster.

Closes-Bug: #1491605

Change-Id: I69e4c4baee64fd2192cbf5836b0803db1cc71705
Clay Gerrard 2017-01-25 11:45:55 -08:00
parent 2914e04493
commit 2f0ab78f9f
2 changed files with 24 additions and 13 deletions
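
To illustrate the behaviour change the commit message describes, here is a
minimal standalone sketch (hypothetical device/partition data, not Swift's
actual data structures or API) contrasting a per-disk generator with
collecting everything and shuffling:

    import random

    # Hypothetical example data: device name -> partition directories.
    DEVICES = {
        'sda': ['objects/100', 'objects/101', 'objects/102'],
        'sdb': ['objects/200', 'objects/201'],
        'sdc': ['objects/300'],
    }

    def collect_disk_by_disk(devices):
        # Old behaviour: every partition on 'sda' is handed out before any
        # partition on 'sdb', so concurrent workers all hit one spindle.
        for dev, parts in devices.items():
            for part in parts:
                yield (dev, part)

    def collect_shuffled(devices):
        # New behaviour: gather everything first, then shuffle, so workers
        # spread their I/O across all devices from the start of the pass.
        all_parts = [(dev, part)
                     for dev, parts in devices.items()
                     for part in parts]
        random.shuffle(all_parts)
        return all_parts

    print(list(collect_disk_by_disk(DEVICES)))
    print(collect_shuffled(DEVICES))

The actual change below keeps collect_parts' per-device walk but appends each
part_info to a list and returns it shuffled, instead of yielding in device
order.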

swift/obj/reconstructor.py

@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import json
+import errno
 import os
 from os.path import join
 import random
@@ -697,7 +698,14 @@ class ObjectReconstructor(Daemon):
         :returns: a list of dicts of job info
         """
         # find all the fi's in the part, and which suffixes have them
-        hashes = self._get_hashes(policy, part_path, do_listdir=True)
+        try:
+            hashes = self._get_hashes(policy, part_path, do_listdir=True)
+        except OSError as e:
+            if e.errno != errno.ENOTDIR:
+                raise
+            self.logger.warning(
+                'Unexpected entity %r is not a directory' % part_path)
+            return []
         non_data_fragment_suffixes = []
         data_fi_to_suffixes = defaultdict(list)
         for suffix, fi_hash in hashes.items():
@@ -820,6 +828,8 @@ class ObjectReconstructor(Daemon):
             policy2devices[policy] = local_devices
             self.device_count += len(local_devices)
 
+        all_parts = []
+
         for policy, local_devices in policy2devices.items():
             df_mgr = self._df_router[policy]
             for local_dev in local_devices:
@@ -854,8 +864,7 @@ class ObjectReconstructor(Daemon):
                     if partition in ('auditor_status_ALL.json',
                                      'auditor_status_ZBF.json'):
                         continue
-                    if not (partition.isdigit() and
-                            os.path.isdir(part_path)):
+                    if not partition.isdigit():
                         self.logger.warning(
                             'Unexpected entity in data dir: %r' % part_path)
                         remove_file(part_path)
@@ -871,8 +880,9 @@ class ObjectReconstructor(Daemon):
                         'partition': partition,
                         'part_path': part_path,
                     }
-                    yield part_info
-                    self.reconstruction_part_count += 1
+                    all_parts.append(part_info)
+        random.shuffle(all_parts)
+        return all_parts
 
     def build_reconstruction_jobs(self, part_info):
         """
@@ -903,6 +913,7 @@ class ObjectReconstructor(Daemon):
     def delete_partition(self, path):
         self.logger.info(_("Removing partition: %s"), path)
         tpool.execute(shutil.rmtree, path, ignore_errors=True)
+        remove_file(path)
 
     def reconstruct(self, **kwargs):
         """Run a reconstruction pass"""
@@ -920,6 +931,7 @@ class ObjectReconstructor(Daemon):
                 self.logger.info(_("Ring change detected. Aborting "
                                    "current reconstruction pass."))
                 return
+            self.reconstruction_part_count += 1
             jobs = self.build_reconstruction_jobs(part_info)
             if not jobs:
                 # If this part belongs on this node, _get_part_jobs
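
The try/except added in the second hunk above is the usual EAFP pattern for
tolerating a stray file where a partition directory is expected, now that the
os.path.isdir() check has been dropped from the partition walk. A standalone
sketch of the same idea (os.listdir stands in for the real suffix-hash load):

    import errno
    import os

    def load_partition(part_path):
        # Treat the entry as a partition directory; if it is actually a plain
        # file, translate ENOTDIR into "nothing to do" so the caller can log a
        # warning and clean it up instead of aborting the reconstruction pass.
        try:
            return os.listdir(part_path)  # stand-in for the real hash load
        except OSError as e:
            if e.errno != errno.ENOTDIR:
                raise
            return []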

test/unit/obj/test_reconstructor.py

@@ -625,7 +625,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             stat_line = line.split('of', 1)[0].strip()
             stats_lines.add(stat_line)
         acceptable = set([
-            '0/3 (0.00%) partitions',
+            '3/8 (37.50%) partitions',
+            '5/8 (62.50%) partitions',
             '8/8 (100.00%) partitions',
         ])
         matched = stats_lines & acceptable
@@ -804,16 +805,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             pass
         self.assertTrue(os.path.isfile(pol_1_part_1_path))  # sanity check
-        # since our collect_parts job is a generator, that yields directly
-        # into build_jobs and then spawns it's safe to do the remove_files
-        # without making reconstructor startup slow
-        self.reconstructor._reset_stats()
-        for part_info in self.reconstructor.collect_parts():
-            self.assertNotEqual(pol_1_part_1_path, part_info['part_path'])
+        self.reconstructor.process_job = lambda j: None
+        self.reconstructor.reconstruct()
+
         self.assertFalse(os.path.exists(pol_1_part_1_path))
         warnings = self.reconstructor.logger.get_lines_for_level('warning')
         self.assertEqual(1, len(warnings))
-        self.assertIn('Unexpected entity in data dir:', warnings[0])
+        self.assertIn(pol_1_part_1_path, warnings[0])
+        self.assertIn('not a directory', warnings[0].lower())
 
     def test_ignores_status_file(self):
         # Following fd86d5a, the auditor will leave status files on each device