From 44c63c69909e532e383017e7c900e4b7d13e0165 Mon Sep 17 00:00:00 2001 From: Clay Gerrard Date: Wed, 7 Jun 2017 11:36:43 -0700 Subject: [PATCH] Don't rehash primaries in reconstructor handoffs_only mode The reconstructor handoffs_only needs to aggressively avoid erroneous I/O related to rehash of primary suffixes. While in handoffs_only mode the reconstructor won't even look at primary partitions. This has a *huge* impact on cycle time once the node has completed processing handoffs; which results in a much faster and stronger signal that that it's either time to rebalance again or turn off handoffs_only. Related-Change-Id: Idde4b6cf92fab6c45f2c0c2733277701eb436898 Change-Id: If4bbb778d511efe13713590639c8b91615556f22 --- swift/obj/reconstructor.py | 16 ++++-- test/unit/obj/test_reconstructor.py | 85 ++++++++++++++++++++++++----- 2 files changed, 81 insertions(+), 20 deletions(-) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index f5e6fe48ed..0f8351ef10 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -888,6 +888,9 @@ class ObjectReconstructor(Daemon): override_partitions=None): """ Helper for getting partitions in the top level reconstructor + + In handoffs_only mode no primary partitions will not be included in the + returned (possibly empty) list. """ override_devices = override_devices or [] override_partitions = override_partitions or [] @@ -958,6 +961,14 @@ class ObjectReconstructor(Daemon): if override_partitions and (partition not in override_partitions): continue + if self.handoffs_only and any( + local_dev['id'] == n['id'] + for n in policy.object_ring.get_part_nodes( + partition)): + self.logger.debug('Skipping %s job for %s ' + 'while in handoffs_only mode.', + SYNC, part_path) + continue part_info = { 'local_dev': local_dev, 'policy': policy, @@ -1031,11 +1042,6 @@ class ObjectReconstructor(Daemon): self.run_pool.spawn(self.delete_partition, part_info['part_path']) for job in jobs: - if (self.handoffs_only and job['job_type'] != REVERT): - self.logger.debug('Skipping %s job for %s ' - 'while in handoffs_only mode.', - job['job_type'], job['path']) - continue self.run_pool.spawn(self.process_job, job) with Timeout(self.lockup_timeout): self.run_pool.waitall() diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index f3d7851bdd..268930ebdd 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -756,8 +756,25 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.reconstructor.process_job = fake_process_job + _orig_build_jobs = self.reconstructor.build_reconstruction_jobs + built_jobs = [] + + def capture_jobs(part_info): + jobs = _orig_build_jobs(part_info) + built_jobs.append((part_info, jobs)) + return jobs + + with mock.patch.object(self.reconstructor, 'build_reconstruction_jobs', + capture_jobs): + self.reconstructor.reconstruct() # only revert jobs - self.reconstructor.reconstruct() + found = [(part_info['partition'], set( + j['job_type'] for j in jobs)) + for part_info, jobs in built_jobs] + self.assertEqual([ + # partition, job_types + (2, {'sync_revert'}), + ], found) self.assertEqual(found_job_types, {object_reconstructor.REVERT}) # but failures keep handoffs remaining msgs = self.reconstructor.logger.get_lines_for_level('info') @@ -1003,18 +1020,16 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): with mock.patch('swift.obj.reconstructor.ssync_sender', self._make_fake_ssync( captured_ssync, fail_jobs=fail_jobs)), \ - mocked_http_conn(*[200, 200], - body=pickle.dumps({})) as request_log: + mocked_http_conn() as request_log: self.reconstructor.reconstruct() + self.assertFalse(request_log.unexpected_requests) - # global setup has four revert jobs - self.assertEqual(len(captured_ssync), 4) + # global setup has two handoff parts + self.assertEqual(len(captured_ssync), 2) expected_ssync_calls = set([ # device, part, frag_index ('sda1', 2, 2), ('sda1', 2, 0), - ('sda1', 0, 2), - ('sda1', 1, 1), ]) self.assertEqual(expected_ssync_calls, set([ (context['job']['device'], @@ -1023,7 +1038,26 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): for context in captured_ssync ])) - self.assertEqual(2, len(request_log.requests)) + # failed jobs don't sync suffixes + self.assertFalse( + self.reconstructor.logger.get_lines_for_level('warning')) + self.assertFalse( + self.reconstructor.logger.get_lines_for_level('error')) + # handoffs remaining and part exists + self.assertEqual(2, self.reconstructor.handoffs_remaining) + self.assertTrue(os.path.exists(self.parts_1['2'])) + + # again with no failures + captured_ssync = [] + with mock.patch('swift.obj.reconstructor.ssync_sender', + self._make_fake_ssync(captured_ssync)), \ + mocked_http_conn( + 200, 200, body=pickle.dumps({})) as request_log: + self.reconstructor.reconstruct() + self.assertFalse(request_log.unexpected_requests) + # same jobs + self.assertEqual(len(captured_ssync), 2) + # but this time we rehash at the end expected_suffix_calls = [] for context in captured_ssync: if not context['success']: @@ -1040,7 +1074,19 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): for r in request_log.requests)) self.assertFalse( self.reconstructor.logger.get_lines_for_level('error')) - self.assertFalse(request_log.unexpected_requests) + # handoffs are cleaned up + self.assertEqual(0, self.reconstructor.handoffs_remaining) + warning_msgs = self.reconstructor.logger.get_lines_for_level('warning') + self.assertEqual(1, len(warning_msgs)) + self.assertIn('no handoffs remaining', warning_msgs[0]) + + # need one more pass to cleanup the part dir + self.assertTrue(os.path.exists(self.parts_1['2'])) + with mock.patch('swift.obj.reconstructor.ssync_sender', + self._make_fake_ssync([])), \ + mocked_http_conn() as request_log: + self.reconstructor.reconstruct() + self.assertFalse(os.path.exists(self.parts_1['2'])) def test_get_part_jobs(self): # yeah, this test code expects a specific setup @@ -1452,11 +1498,13 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): # we're only going to add sda and sdc into the ring local_devs = ('sda', 'sdb', 'sdc') stub_ring_devs = [{ + 'id': i, 'device': dev, 'replication_ip': self.ip, 'replication_port': self.port + 1 if dev == 'sdb' else self.port, - } for dev in local_devs] + } for i, dev in enumerate(local_devs)] stub_ring_devs.append({ + 'id': i + 1, 'device': 'sdd', 'replication_ip': '127.0.0.88', # not local via IP 'replication_port': self.port, @@ -1499,16 +1547,19 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): # we're only going to add sda and sdc into the ring local_devs = ('sda', 'sdc') stub_ring_devs = [{ + 'id': i, 'device': dev, 'replication_ip': self.ip, 'replication_port': self.port, - } for dev in local_devs] + } for i, dev in enumerate(local_devs)] stub_ring_devs.append({ + 'id': i + 1, 'device': 'sdb', 'replication_ip': self.ip, 'replication_port': self.port + 1, # not local via port }) stub_ring_devs.append({ + 'id': i + 2, 'device': 'sdd', 'replication_ip': '127.0.0.88', # not local via IP 'replication_port': self.port, @@ -1549,10 +1600,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): # we're only going to add sda and sdc into the ring local_devs = ('sda', 'sdc') stub_ring_devs = [{ + 'id': i, 'device': dev, 'replication_ip': self.ip, 'replication_port': self.port, - } for dev in local_devs] + } for i, dev in enumerate(local_devs)] self.reconstructor.bind_ip = '0.0.0.0' # use whataremyips with mock.patch('swift.obj.reconstructor.whataremyips', return_value=[self.ip]), \ @@ -1581,10 +1633,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): utils.mkdirs(os.path.join( self.devices, dev, datadir, str(i))) stub_ring_devs = [{ + 'id': i, 'device': dev, 'replication_ip': self.ip, 'replication_port': self.port - } for dev in local_devs] + } for i, dev in enumerate(local_devs)] with mock.patch('swift.obj.reconstructor.whataremyips', return_value=[self.ip]), \ mock.patch.object(self.policy.object_ring, '_devs', @@ -1652,10 +1705,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): def test_collect_parts_cleans_tmp(self): local_devs = ('sda', 'sdc') stub_ring_devs = [{ + 'id': i, 'device': dev, 'replication_ip': self.ip, 'replication_port': self.port - } for dev in local_devs] + } for i, dev in enumerate(local_devs)] for device in local_devs: utils.mkdirs(os.path.join(self.devices, device)) fake_unlink = mock.MagicMock() @@ -1777,10 +1831,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor): # we're only going to add sda and sdc into the ring local_devs = ('sda', 'sdc') stub_ring_devs = [{ + 'id': i, 'device': dev, 'replication_ip': self.ip, 'replication_port': self.port - } for dev in local_devs] + } for i, dev in enumerate(local_devs)] expected = ( ({}, [