reconstructor: Abort just the changed policies
We've already walked the disks looking for work, may as well continue with the work that's definitely still valid. Change-Id: I4c33ed5f5a66d89d259761b5ce12fb6652b28c40
This commit is contained in:
parent
1b8708d9c5
commit
1907594bd8
@ -1434,15 +1434,21 @@ class ObjectReconstructor(Daemon):
|
|||||||
|
|
||||||
stats = spawn(self.heartbeat)
|
stats = spawn(self.heartbeat)
|
||||||
lockup_detector = spawn(self.detect_lockups)
|
lockup_detector = spawn(self.detect_lockups)
|
||||||
|
changed_rings = set()
|
||||||
|
|
||||||
try:
|
try:
|
||||||
self.run_pool = GreenPool(size=self.concurrency)
|
self.run_pool = GreenPool(size=self.concurrency)
|
||||||
for part_info in self.collect_parts(**kwargs):
|
for part_info in self.collect_parts(**kwargs):
|
||||||
sleep() # Give spawns a cycle
|
sleep() # Give spawns a cycle
|
||||||
|
if part_info['policy'] in changed_rings:
|
||||||
|
continue
|
||||||
if not self.check_ring(part_info['policy'].object_ring):
|
if not self.check_ring(part_info['policy'].object_ring):
|
||||||
self.logger.info("Ring change detected. Aborting "
|
changed_rings.add(part_info['policy'])
|
||||||
"current reconstruction pass.")
|
self.logger.info(
|
||||||
return
|
"Ring change detected for policy %d (%s). Aborting "
|
||||||
|
"current reconstruction pass for this policy.",
|
||||||
|
part_info['policy'].idx, part_info['policy'].name)
|
||||||
|
continue
|
||||||
|
|
||||||
self.reconstruction_part_count += 1
|
self.reconstruction_part_count += 1
|
||||||
jobs = self.build_reconstruction_jobs(part_info)
|
jobs = self.build_reconstruction_jobs(part_info)
|
||||||
|
@ -158,6 +158,9 @@ def get_header_frag_index(self, body):
|
|||||||
|
|
||||||
@patch_policies([StoragePolicy(0, name='zero', is_default=True),
|
@patch_policies([StoragePolicy(0, name='zero', is_default=True),
|
||||||
ECStoragePolicy(1, name='one',
|
ECStoragePolicy(1, name='one',
|
||||||
|
ec_type=DEFAULT_TEST_EC_TYPE,
|
||||||
|
ec_ndata=3, ec_nparity=2),
|
||||||
|
ECStoragePolicy(2, name='two',
|
||||||
ec_type=DEFAULT_TEST_EC_TYPE,
|
ec_type=DEFAULT_TEST_EC_TYPE,
|
||||||
ec_ndata=3, ec_nparity=2)])
|
ec_ndata=3, ec_nparity=2)])
|
||||||
class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||||
@ -169,6 +172,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||||||
self.testdir = tempfile.mkdtemp()
|
self.testdir = tempfile.mkdtemp()
|
||||||
POLICIES[0].object_ring = FabricatedRing(3)
|
POLICIES[0].object_ring = FabricatedRing(3)
|
||||||
POLICIES[1].object_ring = FabricatedRing(5)
|
POLICIES[1].object_ring = FabricatedRing(5)
|
||||||
|
POLICIES[2].object_ring = FabricatedRing(5)
|
||||||
utils.HASH_PATH_SUFFIX = b'endcap'
|
utils.HASH_PATH_SUFFIX = b'endcap'
|
||||||
utils.HASH_PATH_PREFIX = b''
|
utils.HASH_PATH_PREFIX = b''
|
||||||
self.devices = os.path.join(self.testdir, 'node')
|
self.devices = os.path.join(self.testdir, 'node')
|
||||||
@ -868,13 +872,32 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||||||
def test_reconstruct_check_ring(self):
|
def test_reconstruct_check_ring(self):
|
||||||
# test reconstruct logs info when check_ring is false and that
|
# test reconstruct logs info when check_ring is false and that
|
||||||
# there are no jobs built
|
# there are no jobs built
|
||||||
with mock.patch('swift.obj.reconstructor.ObjectReconstructor.'
|
objects_2 = os.path.join(self.devices, 'sda1',
|
||||||
'check_ring', return_value=False):
|
diskfile.get_data_dir(POLICIES[2]))
|
||||||
|
os.mkdir(objects_2)
|
||||||
|
for part in ['0', '1', '2']:
|
||||||
|
os.mkdir(os.path.join(objects_2, part))
|
||||||
|
|
||||||
|
with mock.patch.object(self.reconstructor, 'process_job') as mock_pj, \
|
||||||
|
mock.patch(
|
||||||
|
'swift.obj.reconstructor.ObjectReconstructor.check_ring',
|
||||||
|
side_effect=lambda ring: ring is not POLICIES[1].object_ring):
|
||||||
self.reconstructor.reconstruct()
|
self.reconstructor.reconstruct()
|
||||||
msgs = self.logger.get_lines_for_level('info')
|
msgs = self.logger.get_lines_for_level('info')
|
||||||
self.assertIn('Ring change detected. Aborting'
|
self.assertEqual(1, msgs.count(
|
||||||
' current reconstruction pass.', msgs[0])
|
'Ring change detected for policy 1 (one). Aborting '
|
||||||
self.assertEqual(self.reconstructor.reconstruction_count, 0)
|
'current reconstruction pass for this policy.'), msgs)
|
||||||
|
self.assertEqual(
|
||||||
|
[call[1][0]['job_type'] for call in mock_pj.mock_calls],
|
||||||
|
['sync_only'] * 2)
|
||||||
|
self.assertEqual(
|
||||||
|
[call[1][0]['policy'] for call in mock_pj.mock_calls],
|
||||||
|
[POLICIES[2]] * 2)
|
||||||
|
# partition 2 doesn't belong here and doesn't have data,
|
||||||
|
# so it just gets cleaned up
|
||||||
|
self.assertEqual(
|
||||||
|
{call[1][0]['partition'] for call in mock_pj.mock_calls},
|
||||||
|
{0, 1})
|
||||||
|
|
||||||
def test_build_reconstruction_jobs(self):
|
def test_build_reconstruction_jobs(self):
|
||||||
self.reconstructor._reset_stats()
|
self.reconstructor._reset_stats()
|
||||||
@ -1039,10 +1062,13 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
|||||||
for part_info in self.reconstructor.collect_parts():
|
for part_info in self.reconstructor.collect_parts():
|
||||||
parts.append(part_info['partition'])
|
parts.append(part_info['partition'])
|
||||||
error_lines = self.logger.get_lines_for_level('error')
|
error_lines = self.logger.get_lines_for_level('error')
|
||||||
self.assertEqual(len(error_lines), 1,
|
self.assertEqual(
|
||||||
'Expected only one error, got %r' % error_lines)
|
len(error_lines), 2,
|
||||||
|
'Expected exactly two errors, got %r' % error_lines)
|
||||||
log_args, log_kwargs = self.logger.log_dict['error'][0]
|
log_args, log_kwargs = self.logger.log_dict['error'][0]
|
||||||
self.assertEqual(str(log_kwargs['exc_info'][1]), 'Ow!')
|
self.assertEqual(str(log_kwargs['exc_info'][1]), 'Ow!')
|
||||||
|
log_args, log_kwargs = self.logger.log_dict['error'][1]
|
||||||
|
self.assertEqual(str(log_kwargs['exc_info'][1]), 'Ow!')
|
||||||
|
|
||||||
def test_removes_zbf(self):
|
def test_removes_zbf(self):
|
||||||
# suppress unmount warning
|
# suppress unmount warning
|
||||||
|
Loading…
x
Reference in New Issue
Block a user