Merge "Don't rehash primaries in reconstructor handoffs_only mode"
This commit is contained in:
commit
ded0de7aa5
@ -892,6 +892,9 @@ class ObjectReconstructor(Daemon):
|
||||
override_partitions=None):
|
||||
"""
|
||||
Helper for getting partitions in the top level reconstructor
|
||||
|
||||
In handoffs_only mode no primary partitions will not be included in the
|
||||
returned (possibly empty) list.
|
||||
"""
|
||||
override_devices = override_devices or []
|
||||
override_partitions = override_partitions or []
|
||||
@ -975,6 +978,14 @@ class ObjectReconstructor(Daemon):
|
||||
if override_partitions and (partition not in
|
||||
override_partitions):
|
||||
continue
|
||||
if self.handoffs_only and any(
|
||||
local_dev['id'] == n['id']
|
||||
for n in policy.object_ring.get_part_nodes(
|
||||
partition)):
|
||||
self.logger.debug('Skipping %s job for %s '
|
||||
'while in handoffs_only mode.',
|
||||
SYNC, part_path)
|
||||
continue
|
||||
part_info = {
|
||||
'local_dev': local_dev,
|
||||
'policy': policy,
|
||||
@ -1049,11 +1060,6 @@ class ObjectReconstructor(Daemon):
|
||||
self.run_pool.spawn(self.delete_partition,
|
||||
part_info['part_path'])
|
||||
for job in jobs:
|
||||
if (self.handoffs_only and job['job_type'] != REVERT):
|
||||
self.logger.debug('Skipping %s job for %s '
|
||||
'while in handoffs_only mode.',
|
||||
job['job_type'], job['path'])
|
||||
continue
|
||||
self.run_pool.spawn(self.process_job, job)
|
||||
with Timeout(self.lockup_timeout):
|
||||
self.run_pool.waitall()
|
||||
|
@ -777,8 +777,25 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
|
||||
self.reconstructor.process_job = fake_process_job
|
||||
|
||||
_orig_build_jobs = self.reconstructor.build_reconstruction_jobs
|
||||
built_jobs = []
|
||||
|
||||
def capture_jobs(part_info):
|
||||
jobs = _orig_build_jobs(part_info)
|
||||
built_jobs.append((part_info, jobs))
|
||||
return jobs
|
||||
|
||||
with mock.patch.object(self.reconstructor, 'build_reconstruction_jobs',
|
||||
capture_jobs):
|
||||
self.reconstructor.reconstruct()
|
||||
# only revert jobs
|
||||
self.reconstructor.reconstruct()
|
||||
found = [(part_info['partition'], set(
|
||||
j['job_type'] for j in jobs))
|
||||
for part_info, jobs in built_jobs]
|
||||
self.assertEqual([
|
||||
# partition, job_types
|
||||
(2, {'sync_revert'}),
|
||||
], found)
|
||||
self.assertEqual(found_job_types, {object_reconstructor.REVERT})
|
||||
# but failures keep handoffs remaining
|
||||
msgs = self.reconstructor.logger.get_lines_for_level('info')
|
||||
@ -1024,18 +1041,16 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
with mock.patch('swift.obj.reconstructor.ssync_sender',
|
||||
self._make_fake_ssync(
|
||||
captured_ssync, fail_jobs=fail_jobs)), \
|
||||
mocked_http_conn(*[200, 200],
|
||||
body=pickle.dumps({})) as request_log:
|
||||
mocked_http_conn() as request_log:
|
||||
self.reconstructor.reconstruct()
|
||||
self.assertFalse(request_log.unexpected_requests)
|
||||
|
||||
# global setup has four revert jobs
|
||||
self.assertEqual(len(captured_ssync), 4)
|
||||
# global setup has two handoff parts
|
||||
self.assertEqual(len(captured_ssync), 2)
|
||||
expected_ssync_calls = set([
|
||||
# device, part, frag_index
|
||||
('sda1', 2, 2),
|
||||
('sda1', 2, 0),
|
||||
('sda1', 0, 2),
|
||||
('sda1', 1, 1),
|
||||
])
|
||||
self.assertEqual(expected_ssync_calls, set([
|
||||
(context['job']['device'],
|
||||
@ -1044,7 +1059,26 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
for context in captured_ssync
|
||||
]))
|
||||
|
||||
self.assertEqual(2, len(request_log.requests))
|
||||
# failed jobs don't sync suffixes
|
||||
self.assertFalse(
|
||||
self.reconstructor.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(
|
||||
self.reconstructor.logger.get_lines_for_level('error'))
|
||||
# handoffs remaining and part exists
|
||||
self.assertEqual(2, self.reconstructor.handoffs_remaining)
|
||||
self.assertTrue(os.path.exists(self.parts_1['2']))
|
||||
|
||||
# again with no failures
|
||||
captured_ssync = []
|
||||
with mock.patch('swift.obj.reconstructor.ssync_sender',
|
||||
self._make_fake_ssync(captured_ssync)), \
|
||||
mocked_http_conn(
|
||||
200, 200, body=pickle.dumps({})) as request_log:
|
||||
self.reconstructor.reconstruct()
|
||||
self.assertFalse(request_log.unexpected_requests)
|
||||
# same jobs
|
||||
self.assertEqual(len(captured_ssync), 2)
|
||||
# but this time we rehash at the end
|
||||
expected_suffix_calls = []
|
||||
for context in captured_ssync:
|
||||
if not context['success']:
|
||||
@ -1061,7 +1095,19 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
for r in request_log.requests))
|
||||
self.assertFalse(
|
||||
self.reconstructor.logger.get_lines_for_level('error'))
|
||||
self.assertFalse(request_log.unexpected_requests)
|
||||
# handoffs are cleaned up
|
||||
self.assertEqual(0, self.reconstructor.handoffs_remaining)
|
||||
warning_msgs = self.reconstructor.logger.get_lines_for_level('warning')
|
||||
self.assertEqual(1, len(warning_msgs))
|
||||
self.assertIn('no handoffs remaining', warning_msgs[0])
|
||||
|
||||
# need one more pass to cleanup the part dir
|
||||
self.assertTrue(os.path.exists(self.parts_1['2']))
|
||||
with mock.patch('swift.obj.reconstructor.ssync_sender',
|
||||
self._make_fake_ssync([])), \
|
||||
mocked_http_conn() as request_log:
|
||||
self.reconstructor.reconstruct()
|
||||
self.assertFalse(os.path.exists(self.parts_1['2']))
|
||||
|
||||
def test_get_part_jobs(self):
|
||||
# yeah, this test code expects a specific setup
|
||||
@ -1486,11 +1532,13 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
|
||||
# we're only going to add sda and sdc into the ring
|
||||
local_devs = ('sda', 'sdb', 'sdc')
|
||||
stub_ring_devs = [{
|
||||
'id': i,
|
||||
'device': dev,
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port + 1 if dev == 'sdb' else self.port,
|
||||
} for dev in local_devs]
|
||||
} for i, dev in enumerate(local_devs)]
|
||||
stub_ring_devs.append({
|
||||
'id': i + 1,
|
||||
'device': 'sdd',
|
||||
'replication_ip': '127.0.0.88', # not local via IP
|
||||
'replication_port': self.port,
|
||||
@ -1533,16 +1581,19 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
|
||||
# we're only going to add sda and sdc into the ring
|
||||
local_devs = ('sda', 'sdc')
|
||||
stub_ring_devs = [{
|
||||
'id': i,
|
||||
'device': dev,
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port,
|
||||
} for dev in local_devs]
|
||||
} for i, dev in enumerate(local_devs)]
|
||||
stub_ring_devs.append({
|
||||
'id': i + 1,
|
||||
'device': 'sdb',
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port + 1, # not local via port
|
||||
})
|
||||
stub_ring_devs.append({
|
||||
'id': i + 2,
|
||||
'device': 'sdd',
|
||||
'replication_ip': '127.0.0.88', # not local via IP
|
||||
'replication_port': self.port,
|
||||
@ -1583,10 +1634,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
|
||||
# we're only going to add sda and sdc into the ring
|
||||
local_devs = ('sda', 'sdc')
|
||||
stub_ring_devs = [{
|
||||
'id': i,
|
||||
'device': dev,
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port,
|
||||
} for dev in local_devs]
|
||||
} for i, dev in enumerate(local_devs)]
|
||||
self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips
|
||||
with mock.patch('swift.obj.reconstructor.whataremyips',
|
||||
return_value=[self.ip]), \
|
||||
@ -1615,10 +1667,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
|
||||
utils.mkdirs(os.path.join(
|
||||
self.devices, dev, datadir, str(i)))
|
||||
stub_ring_devs = [{
|
||||
'id': i,
|
||||
'device': dev,
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port
|
||||
} for dev in local_devs]
|
||||
} for i, dev in enumerate(local_devs)]
|
||||
with mock.patch('swift.obj.reconstructor.whataremyips',
|
||||
return_value=[self.ip]), \
|
||||
mock.patch.object(self.policy.object_ring, '_devs',
|
||||
@ -1686,10 +1739,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
|
||||
def test_collect_parts_cleans_tmp(self):
|
||||
local_devs = ('sda', 'sdc')
|
||||
stub_ring_devs = [{
|
||||
'id': i,
|
||||
'device': dev,
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port
|
||||
} for dev in local_devs]
|
||||
} for i, dev in enumerate(local_devs)]
|
||||
for device in local_devs:
|
||||
utils.mkdirs(os.path.join(self.devices, device))
|
||||
fake_unlink = mock.MagicMock()
|
||||
@ -1811,10 +1865,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
|
||||
# we're only going to add sda and sdc into the ring
|
||||
local_devs = ('sda', 'sdc')
|
||||
stub_ring_devs = [{
|
||||
'id': i,
|
||||
'device': dev,
|
||||
'replication_ip': self.ip,
|
||||
'replication_port': self.port
|
||||
} for dev in local_devs]
|
||||
} for i, dev in enumerate(local_devs)]
|
||||
|
||||
expected = (
|
||||
({}, [
|
||||
|
Loading…
Reference in New Issue
Block a user