Merge "Shuffle disks and parts in reconstructor"
This commit is contained in:
commit
65744c8448
@ -14,6 +14,7 @@
|
||||
# limitations under the License.
|
||||
|
||||
import json
|
||||
import errno
|
||||
import os
|
||||
from os.path import join
|
||||
import random
|
||||
@ -685,7 +686,14 @@ class ObjectReconstructor(Daemon):
|
||||
:returns: a list of dicts of job info
|
||||
"""
|
||||
# find all the fi's in the part, and which suffixes have them
|
||||
hashes = self._get_hashes(policy, part_path, do_listdir=True)
|
||||
try:
|
||||
hashes = self._get_hashes(policy, part_path, do_listdir=True)
|
||||
except OSError as e:
|
||||
if e.errno != errno.ENOTDIR:
|
||||
raise
|
||||
self.logger.warning(
|
||||
'Unexpected entity %r is not a directory' % part_path)
|
||||
return []
|
||||
non_data_fragment_suffixes = []
|
||||
data_fi_to_suffixes = defaultdict(list)
|
||||
for suffix, fi_hash in hashes.items():
|
||||
@ -808,6 +816,8 @@ class ObjectReconstructor(Daemon):
|
||||
policy2devices[policy] = local_devices
|
||||
self.device_count += len(local_devices)
|
||||
|
||||
all_parts = []
|
||||
|
||||
for policy, local_devices in policy2devices.items():
|
||||
df_mgr = self._df_router[policy]
|
||||
for local_dev in local_devices:
|
||||
@ -842,8 +852,7 @@ class ObjectReconstructor(Daemon):
|
||||
if partition in ('auditor_status_ALL.json',
|
||||
'auditor_status_ZBF.json'):
|
||||
continue
|
||||
if not (partition.isdigit() and
|
||||
os.path.isdir(part_path)):
|
||||
if not partition.isdigit():
|
||||
self.logger.warning(
|
||||
'Unexpected entity in data dir: %r' % part_path)
|
||||
remove_file(part_path)
|
||||
@ -859,8 +868,9 @@ class ObjectReconstructor(Daemon):
|
||||
'partition': partition,
|
||||
'part_path': part_path,
|
||||
}
|
||||
yield part_info
|
||||
self.reconstruction_part_count += 1
|
||||
all_parts.append(part_info)
|
||||
random.shuffle(all_parts)
|
||||
return all_parts
|
||||
|
||||
def build_reconstruction_jobs(self, part_info):
|
||||
"""
|
||||
@ -891,6 +901,7 @@ class ObjectReconstructor(Daemon):
|
||||
def delete_partition(self, path):
|
||||
self.logger.info(_("Removing partition: %s"), path)
|
||||
tpool.execute(shutil.rmtree, path, ignore_errors=True)
|
||||
remove_file(path)
|
||||
|
||||
def reconstruct(self, **kwargs):
|
||||
"""Run a reconstruction pass"""
|
||||
@ -908,6 +919,7 @@ class ObjectReconstructor(Daemon):
|
||||
self.logger.info(_("Ring change detected. Aborting "
|
||||
"current reconstruction pass."))
|
||||
return
|
||||
self.reconstruction_part_count += 1
|
||||
jobs = self.build_reconstruction_jobs(part_info)
|
||||
if not jobs:
|
||||
# If this part belongs on this node, _get_part_jobs
|
||||
|
@ -625,7 +625,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
stat_line = line.split('of', 1)[0].strip()
|
||||
stats_lines.add(stat_line)
|
||||
acceptable = set([
|
||||
'0/3 (0.00%) partitions',
|
||||
'3/8 (37.50%) partitions',
|
||||
'5/8 (62.50%) partitions',
|
||||
'8/8 (100.00%) partitions',
|
||||
])
|
||||
matched = stats_lines & acceptable
|
||||
@ -804,16 +805,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
pass
|
||||
self.assertTrue(os.path.isfile(pol_1_part_1_path)) # sanity check
|
||||
|
||||
# since our collect_parts job is a generator, that yields directly
|
||||
# into build_jobs and then spawns it's safe to do the remove_files
|
||||
# without making reconstructor startup slow
|
||||
self.reconstructor._reset_stats()
|
||||
for part_info in self.reconstructor.collect_parts():
|
||||
self.assertNotEqual(pol_1_part_1_path, part_info['part_path'])
|
||||
self.reconstructor.process_job = lambda j: None
|
||||
self.reconstructor.reconstruct()
|
||||
|
||||
self.assertFalse(os.path.exists(pol_1_part_1_path))
|
||||
warnings = self.reconstructor.logger.get_lines_for_level('warning')
|
||||
self.assertEqual(1, len(warnings))
|
||||
self.assertIn('Unexpected entity in data dir:', warnings[0])
|
||||
self.assertIn(pol_1_part_1_path, warnings[0])
|
||||
self.assertIn('not a directory', warnings[0].lower())
|
||||
|
||||
def test_ignores_status_file(self):
|
||||
# Following fd86d5a, the auditor will leave status files on each device
|
||||
|
Loading…
Reference in New Issue
Block a user