From 2f0ab78f9ff85483a157c9cbb17b50eeff539ef9 Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Wed, 25 Jan 2017 11:45:55 -0800 Subject: [PATCH] Shuffle disks and parts in reconstructor The main problem with going disk by disk is that it means all of your I/O is only on one spindle at a time and no matter how high you set concurrency it doesn't go any faster. Closes-Bug: #1491605 Change-Id: I69e4c4baee64fd2192cbf5836b0803db1cc71705 --- swift/obj/reconstructor.py | 22 +++++++++++++++++----- test/unit/obj/test_reconstructor.py | 15 +++++++-------- 2 files changed, 24 insertions(+), 13 deletions(-) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index ccc5f137de..b5b1045fe7 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -14,6 +14,7 @@ # limitations under the License. import json +import errno import os from os.path import join import random @@ -697,7 +698,14 @@ class ObjectReconstructor(Daemon): :returns: a list of dicts of job info """ # find all the fi's in the part, and which suffixes have them - hashes = self._get_hashes(policy, part_path, do_listdir=True) + try: + hashes = self._get_hashes(policy, part_path, do_listdir=True) + except OSError as e: + if e.errno != errno.ENOTDIR: + raise + self.logger.warning( + 'Unexpected entity %r is not a directory' % part_path) + return [] non_data_fragment_suffixes = [] data_fi_to_suffixes = defaultdict(list) for suffix, fi_hash in hashes.items(): @@ -820,6 +828,8 @@ class ObjectReconstructor(Daemon): policy2devices[policy] = local_devices self.device_count += len(local_devices) + all_parts = [] + for policy, local_devices in policy2devices.items(): df_mgr = self._df_router[policy] for local_dev in local_devices: @@ -854,8 +864,7 @@ class ObjectReconstructor(Daemon): if partition in ('auditor_status_ALL.json', 'auditor_status_ZBF.json'): continue - if not (partition.isdigit() and - os.path.isdir(part_path)): + if not partition.isdigit(): self.logger.warning( 'Unexpected entity in data dir: %r' % part_path) remove_file(part_path) @@ -871,8 +880,9 @@ class ObjectReconstructor(Daemon): 'partition': partition, 'part_path': part_path, } - yield part_info - self.reconstruction_part_count += 1 + all_parts.append(part_info) + random.shuffle(all_parts) + return all_parts def build_reconstruction_jobs(self, part_info): """ @@ -903,6 +913,7 @@ class ObjectReconstructor(Daemon): def delete_partition(self, path): self.logger.info(_("Removing partition: %s"), path) tpool.execute(shutil.rmtree, path, ignore_errors=True) + remove_file(path) def reconstruct(self, **kwargs): """Run a reconstruction pass""" @@ -920,6 +931,7 @@ class ObjectReconstructor(Daemon): self.logger.info(_("Ring change detected. Aborting " "current reconstruction pass.")) return + self.reconstruction_part_count += 1 jobs = self.build_reconstruction_jobs(part_info) if not jobs: # If this part belongs on this node, _get_part_jobs diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 695670f050..f821d96e9f 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -625,7 +625,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): stat_line = line.split('of', 1)[0].strip() stats_lines.add(stat_line) acceptable = set([ - '0/3 (0.00%) partitions', + '3/8 (37.50%) partitions', + '5/8 (62.50%) partitions', '8/8 (100.00%) partitions', ]) matched = stats_lines & acceptable @@ -804,16 +805,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): pass self.assertTrue(os.path.isfile(pol_1_part_1_path)) # sanity check - # since our collect_parts job is a generator, that yields directly - # into build_jobs and then spawns it's safe to do the remove_files - # without making reconstructor startup slow - self.reconstructor._reset_stats() - for part_info in self.reconstructor.collect_parts(): - self.assertNotEqual(pol_1_part_1_path, part_info['part_path']) + self.reconstructor.process_job = lambda j: None + self.reconstructor.reconstruct() + self.assertFalse(os.path.exists(pol_1_part_1_path)) warnings = self.reconstructor.logger.get_lines_for_level('warning') self.assertEqual(1, len(warnings)) - self.assertIn('Unexpected entity in data dir:', warnings[0]) + self.assertIn(pol_1_part_1_path, warnings[0]) + self.assertIn('not a directory', warnings[0].lower()) def test_ignores_status_file(self): # Following fd86d5a, the auditor will leave status files on each device