From 3c3cab264568f8e10ba5b30be739c25e56af53c1 Mon Sep 17 00:00:00 2001
From: Tim Burke
Date: Mon, 2 Nov 2020 11:22:46 -0800
Subject: [PATCH] Stop invalidating suffixes post-SSYNC

We only need the invalidation post-rsync, since rsync was changing data
on disk behind Swift's back. Move the REPLICATE call down into the
rsync() helper function and drop it from the reconstructor entirely.

Change-Id: I576901344f1f3abb33b52b36fde0b25b43e54c8a
Closes-Bug: #1818709
---
 swift/obj/reconstructor.py          |  20 -----
 swift/obj/replicator.py             |  40 +++++----
 test/unit/obj/test_reconstructor.py | 128 ++++------------------------
 test/unit/obj/test_replicator.py    |  18 ++--
 4 files changed, 48 insertions(+), 158 deletions(-)

diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
index cce2719882..a05c08d7eb 100644
--- a/swift/obj/reconstructor.py
+++ b/swift/obj/reconstructor.py
@@ -688,22 +688,6 @@ class ObjectReconstructor(Daemon):
                 suffixes.append(suffix)
         return suffixes
 
-    def rehash_remote(self, node, job, suffixes):
-        headers = self.headers.copy()
-        headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
-        try:
-            with Timeout(self.http_timeout):
-                conn = http_connect(
-                    node['replication_ip'], node['replication_port'],
-                    node['device'], job['partition'], 'REPLICATE',
-                    '/' + '-'.join(sorted(suffixes)),
-                    headers=headers)
-                conn.getresponse().read()
-        except (Exception, Timeout):
-            self.logger.exception(
-                _("Trying to sync suffixes with %s") % _full_path(
-                    node, job['partition'], '', job['policy']))
-
     def _iter_nodes_for_frag(self, policy, partition, node):
         """
         Generate a priority list of nodes that can sync to the given node.
@@ -881,9 +865,6 @@ class ObjectReconstructor(Daemon):
             # ssync any out-of-sync suffixes with the remote node
             success, _ = ssync_sender(
                 self, node, job, suffixes)()
-            # let remote end know to rehash it's suffixes
-            if success:
-                self.rehash_remote(node, job, suffixes)
             # update stats for this attempt
             self.suffix_sync += len(suffixes)
             self.logger.update_stats('suffix.syncs', len(suffixes))
@@ -912,7 +893,6 @@ class ObjectReconstructor(Daemon):
             success, in_sync_objs = ssync_sender(
                 self, node, job, job['suffixes'])()
             if success:
-                self.rehash_remote(node, job, job['suffixes'])
                 syncd_with += 1
                 reverted_objs.update(in_sync_objs)
         if syncd_with >= len(job['sync_to']):
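
[Editorial note, not part of the patch] The reason the SSYNC path can drop
these REPLICATE calls is that an ssync receiver writes every object through
the diskfile layer, which already invalidates the affected suffix hash as
part of each commit. A minimal sketch of that bookkeeping follows, using
hypothetical simplified names; the real logic lives in swift/obj/diskfile.py
and swift/obj/ssync_receiver.py:

    import os


    def invalidate_suffix_hash(partition_dir, suffix):
        # Simplified stand-in for the diskfile layer's invalidate_hash():
        # append the suffix to hashes.invalid so the next rehash pass
        # recomputes only the suffixes that actually changed.
        with open(os.path.join(partition_dir, 'hashes.invalid'), 'a') as fp:
            fp.write(suffix + '\n')


    def ssync_receiver_commit(partition_dir, suffix, write_object):
        # Every object accepted over SSYNC is committed through the diskfile
        # layer, which invalidates the touched suffix as part of the write,
        # so a trailing REPLICATE from the sender adds nothing.
        write_object()
        invalidate_suffix_hash(partition_dir, suffix)

rsync, by contrast, copies files directly into the remote datadir and
bypasses this bookkeeping entirely, which is why the rsync path in the next
diff still needs an explicit REPLICATE afterwards.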
diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index ee93164d4f..8f21373282 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -465,7 +465,21 @@ class ObjectReplicator(Daemon):
             data_dir = get_data_dir(job['policy'])
             args.append(join(rsync_module, node['device'], data_dir,
                              job['partition']))
-        return self._rsync(args) == 0, {}
+        success = (self._rsync(args) == 0)
+
+        # TODO: Catch and swallow (or at least minimize) timeouts when doing
+        # an update job; if we don't manage to notify the remote, we should
+        # catch it on the next pass
+        if success or not job['delete']:
+            headers = dict(self.default_headers)
+            headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
+            with Timeout(self.http_timeout):
+                conn = http_connect(
+                    node['replication_ip'], node['replication_port'],
+                    node['device'], job['partition'], 'REPLICATE',
+                    '/' + '-'.join(suffixes), headers=headers)
+                conn.getresponse().read()
+        return success, {}
 
     def ssync(self, node, job, suffixes, remote_check_objs=None):
         return ssync_sender.Sender(
@@ -529,21 +543,12 @@ class ObjectReplicator(Daemon):
                     # for deletion
                     success, candidates = self.sync(
                         node, job, suffixes, **kwargs)
-                    if success:
-                        with Timeout(self.http_timeout):
-                            conn = http_connect(
-                                node['replication_ip'],
-                                node['replication_port'],
-                                node['device'], job['partition'],
-                                'REPLICATE', '/' + '-'.join(suffixes),
-                                headers=headers)
-                            conn.getresponse().read()
-                        if node['region'] != job['region']:
-                            synced_remote_regions[node['region']] = \
-                                viewkeys(candidates)
-                    else:
+                    if not success:
                         failure_devs_info.add((node['replication_ip'],
                                                node['device']))
+                    if success and node['region'] != job['region']:
+                        synced_remote_regions[node['region']] = viewkeys(
+                            candidates)
                     responses.append(success)
             for cand_objs in synced_remote_regions.values():
                 if delete_objs is None:
@@ -714,13 +719,6 @@ class ObjectReplicator(Daemon):
                     continue
                 stats.rsync += 1
                 success, _junk = self.sync(node, job, suffixes)
-                with Timeout(self.http_timeout):
-                    conn = http_connect(
-                        node['replication_ip'], node['replication_port'],
-                        node['device'], job['partition'], 'REPLICATE',
-                        '/' + '-'.join(suffixes),
-                        headers=headers)
-                    conn.getresponse().read()
                 if not success:
                     failure_devs_info.add((node['replication_ip'],
                                            node['device']))
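
[Editorial note, not part of the patch] In isolation, the notification that
rsync() now owns looks roughly like the sketch below. This is a simplified,
standalone approximation using http.client rather than Swift's
swift.common.bufferedhttp.http_connect, with the eventlet Timeout wrapper
and error handling omitted; the function name is made up:

    from http.client import HTTPConnection


    def notify_remote_rehash(node, partition, suffixes, policy_index):
        # REPLICATE with a path of /<device>/<partition>/<sfx1>-<sfx2>...
        # tells the receiving object server which suffix directories rsync
        # may have changed behind its back, so it rehashes just those.
        conn = HTTPConnection(node['replication_ip'],
                              node['replication_port'])
        try:
            conn.request(
                'REPLICATE',
                '/%s/%s/%s' % (node['device'], partition,
                               '-'.join(suffixes)),
                headers={'X-Backend-Storage-Policy-Index':
                         str(policy_index)})
            conn.getresponse().read()
        finally:
            conn.close()

Note the `if success or not job['delete']` guard in the hunk above: even
when rsync fails on a non-delete job, some suffixes may have been partially
transferred, so the remote is still told to rehash.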
diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py
index 604b5cadca..a350a77681 100644
--- a/test/unit/obj/test_reconstructor.py
+++ b/test/unit/obj/test_reconstructor.py
@@ -1162,7 +1162,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         ssync_calls = []
         with mock.patch('swift.obj.reconstructor.ssync_sender',
                         self._make_fake_ssync(ssync_calls)), \
-                mocked_http_conn(*[200] * 17, body=pickle.dumps({})), \
+                mocked_http_conn(*[200] * 6, body=pickle.dumps({})), \
                 mock.patch.object(
                     self.reconstructor, 'delete_reverted_objs') as mock_delete:
             self.reconstructor.reconstruct()
@@ -1189,7 +1189,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         ssync_calls = []
         with mock.patch('swift.obj.reconstructor.ssync_sender',
                         self._make_fake_ssync(ssync_calls)), \
-                mocked_http_conn(*[200] * 17, body=pickle.dumps({})), \
+                mocked_http_conn(*[200] * 6, body=pickle.dumps({})), \
                 mock.patch('swift.obj.reconstructor.random.shuffle'):
             self.reconstructor.reconstruct()
         for context in ssync_calls:
@@ -1248,28 +1248,10 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         # again with no failures
         captured_ssync = []
         with mock.patch('swift.obj.reconstructor.ssync_sender',
-                        self._make_fake_ssync(captured_ssync)), \
-                mocked_http_conn(
-                    200, 200, body=pickle.dumps({})) as request_log:
+                        self._make_fake_ssync(captured_ssync)):
             self.reconstructor.reconstruct()
-        self.assertFalse(request_log.unexpected_requests)
         # same jobs
         self.assertEqual(len(captured_ssync), 2)
-        # but this time we rehash at the end
-        expected_suffix_calls = []
-        for context in captured_ssync:
-            if not context['success']:
-                # only successful jobs generate suffix rehash calls
-                continue
-            job = context['job']
-            expected_suffix_calls.append(
-                (job['sync_to'][0]['replication_ip'], '/%s/%s/%s' % (
-                    job['sync_to'][0]['device'], job['partition'],
-                    '-'.join(sorted(job['suffixes']))))
-            )
-        self.assertEqual(set(expected_suffix_calls),
-                         set((r['ip'], r['path'])
-                             for r in request_log.requests))
         self.assertFalse(
             self.logger.get_lines_for_level('error'))
         # handoffs are cleaned up
@@ -1310,19 +1292,9 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         self.assertTrue(os.access(part_path, os.F_OK))
 
         ssync_calls = []
-        status = [200] * 2
-        body = pickle.dumps({})
-        with mocked_http_conn(*status, body=body) as request_log:
-            with mock.patch('swift.obj.reconstructor.ssync_sender',
-                            self._make_fake_ssync(ssync_calls)):
-                self.reconstructor.reconstruct(override_partitions=[2])
-        expected_repliate_calls = set([
-            (u'10.0.0.0', '/sda4/2/3c1'),
-            (u'10.0.0.2', '/sda2/2/061'),
-        ])
-        found_calls = set((r['ip'], r['path'])
-                          for r in request_log.requests)
-        self.assertEqual(expected_repliate_calls, found_calls)
+        with mock.patch('swift.obj.reconstructor.ssync_sender',
+                        self._make_fake_ssync(ssync_calls)):
+            self.reconstructor.reconstruct(override_partitions=[2])
 
         expected_ssync_calls = sorted([
             (u'10.0.0.0', REVERT, 2, [u'3c1']),
@@ -1367,10 +1339,11 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
                             self._make_fake_ssync(ssync_calls)):
                 self.reconstructor.reconstruct(override_partitions=[2])
         self.assertEqual([], ssync_calls)
+        self.assertEqual([], request_log.requests)
         self.assertFalse(os.access(part_path, os.F_OK))
 
     def test_process_job_all_success(self):
-        rehash_per_job_type = {SYNC: 2, REVERT: 1}
+        rehash_per_job_type = {SYNC: 1, REVERT: 0}
         self.reconstructor._reset_stats()
         with mock_ssync_sender():
             found_jobs = []
@@ -1413,7 +1386,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
     def test_process_job_all_insufficient_storage(self):
         self.reconstructor._reset_stats()
         with mock_ssync_sender():
-            with mocked_http_conn(*[507] * 15):
+            with mocked_http_conn(*[507] * 10):
                 found_jobs = []
                 for part_info in self.reconstructor.collect_parts():
                     jobs = self.reconstructor.build_reconstruction_jobs(
@@ -1446,7 +1419,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
     def test_process_job_all_client_error(self):
         self.reconstructor._reset_stats()
         with mock_ssync_sender():
-            with mocked_http_conn(*[400] * 11):
+            with mocked_http_conn(*[400] * 6):
                 found_jobs = []
                 for part_info in self.reconstructor.collect_parts():
                     jobs = self.reconstructor.build_reconstruction_jobs(
@@ -1478,7 +1451,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
 
     def test_process_job_all_timeout(self):
         self.reconstructor._reset_stats()
-        with mock_ssync_sender(), mocked_http_conn(*[Timeout()] * 11):
+        with mock_ssync_sender(), mocked_http_conn(*[Timeout()] * 6):
             found_jobs = []
             for part_info in self.reconstructor.collect_parts():
                 jobs = self.reconstructor.build_reconstruction_jobs(
@@ -3605,33 +3578,6 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
                        v, job[k], k)
                 self.assertEqual(v, job[k], msg)
 
-    def test_rehash_remote(self):
-        part_path = os.path.join(self.devices, self.local_dev['device'],
-                                 diskfile.get_data_dir(self.policy), '1')
-        utils.mkdirs(part_path)
-        part_info = {
-            'local_dev': self.local_dev,
-            'policy': self.policy,
-            'partition': 1,
-            'part_path': part_path,
-        }
-        jobs = self.reconstructor.build_reconstruction_jobs(part_info)
-        self.assertEqual(1, len(jobs))
-        job = jobs[0]
-        node = job['sync_to'][0]
-        # process_job used to try and modify the instance base headers
-        self.reconstructor.headers['X-Backend-Storage-Policy-Index'] = \
-            int(POLICIES[1])
-        # ... which doesn't work out under concurrency with multiple policies
-        self.assertNotEqual(
-            self.reconstructor.headers['X-Backend-Storage-Policy-Index'],
-            int(job['policy']))
-        with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
-            self.reconstructor.rehash_remote(node, job, [])
-        self.assertEqual([int(job['policy'])], [
-            r['headers']['X-Backend-Storage-Policy-Index']
-            for r in request_log.requests])
-
     def test_get_suffixes_to_sync(self):
         part_path = os.path.join(self.devices, self.local_dev['device'],
                                  diskfile.get_data_dir(self.policy), '1')
@@ -3833,7 +3779,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
 
         responses = []
         for hashes in (left_hashes, right_hashes, far_hashes):
-            responses.extend([(200, pickle.dumps(hashes))] * 2)
+            responses.append((200, pickle.dumps(hashes)))
         codes, body_iter = zip(*responses)
 
         ssync_calls = []
@@ -3845,11 +3791,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
 
         expected_suffix_calls = [
             (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']),
-            (sync_to[0]['ip'], '/%s/0/123-abc' % sync_to[0]['device']),
             (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']),
-            (sync_to[1]['ip'], '/%s/0/123-abc' % sync_to[1]['device']),
             (sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']),
-            (sync_to[2]['ip'], '/%s/0/123-abc' % sync_to[2]['device']),
         ]
         self.assertEqual(expected_suffix_calls,
                          [(r['ip'], r['path']) for r in request_log.requests])
@@ -3915,7 +3858,6 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         responses = [
             (200, pickle.dumps(left_hashes)),
             (200, pickle.dumps(right_hashes)),
-            (200, pickle.dumps(right_hashes)),
             (200, pickle.dumps(far_hashes)),
         ]
         codes, body_iter = zip(*responses)
@@ -4005,7 +3947,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         }
         responses = [(200, pickle.dumps(hashes))
                      for hashes in (
-                         left_hashes, right_hashes, right_hashes, far_hashes)]
+                         left_hashes, right_hashes, far_hashes)]
         codes, body_iter = zip(*responses)
 
         ssync_calls = []
@@ -4018,7 +3960,6 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         expected_suffix_calls = set([
             (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']),
             (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']),
-            (sync_to[1]['ip'], '/%s/0/abc' % sync_to[1]['device']),
             (sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']),
         ])
         self.assertEqual(expected_suffix_calls,
@@ -4076,7 +4017,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
 
         responses = []
         for hashes in (left_hashes, right_hashes, far_hashes):
-            responses.extend([(200, pickle.dumps(hashes))] * 2)
+            responses.append((200, pickle.dumps(hashes)))
         codes, body_iter = zip(*responses)
 
         ssync_calls = []
@@ -4089,11 +4030,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         expected_suffix_calls = set([
             (sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']),
-            (sync_to[0]['ip'], '/%s/0/123' % sync_to[0]['device']),
             (sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']),
-            (sync_to[1]['ip'], '/%s/0/abc' % sync_to[1]['device']),
             (sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']),
-            (sync_to[2]['ip'], '/%s/0/123-abc' % sync_to[2]['device']),
         ])
         self.assertEqual(expected_suffix_calls,
                          set((r['ip'], r['path'])
                              for r in request_log.requests))
@@ -4149,13 +4087,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         for node in part_nodes[:3]:
             expected_suffix_calls.update([
                 (node['replication_ip'], '/%s/0' % node['device']),
-                (node['replication_ip'], '/%s/0/123-abc' % node['device']),
             ])
-        # the first (primary sync_to) node's rehash_remote will be skipped
-        first_node = part_nodes[0]
-        expected_suffix_calls.remove(
-            (first_node['replication_ip'], '/%s/0/123-abc'
-             % first_node['device']))
 
         ssync_calls = []
         with mock_ssync_sender(ssync_calls,
@@ -4272,10 +4204,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         responses = [
             (200, pickle.dumps(left_hashes)),  # hashes left partner
-            (200, pickle.dumps(left_hashes)),  # hashes post-sync
             (507, ''),  # unmounted right partner
             (200, pickle.dumps({})),  # hashes handoff
-            (200, ''),  # hashes post-sync
             (200, pickle.dumps(far_hashes)),  # hashes far partner
         ]
         codes, body_iter = zip(*responses)
 
@@ -4295,11 +4225,9 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         else:
             self.fail('Unable to find handoff?!')
         expected = collections.Counter([
-            (200, sync_to[0]['ip']),
             (200, sync_to[0]['ip']),
             (507, sync_to[1]['ip']),
             (200, handoff['ip']),
-            (200, handoff['ip']),
             (200, sync_to[2]['ip']),
         ])
         self.assertEqual(expected, collections.Counter(
@@ -4343,17 +4271,9 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         ssync_calls = []
         with mock_ssync_sender(ssync_calls), \
                 mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
-                           return_value=(None, stub_hashes)), \
-                mocked_http_conn(200, body=pickle.dumps({})) as request_log:
+                           return_value=(None, stub_hashes)):
             self.reconstructor.process_job(job)
 
-        expected_suffix_calls = set([
-            (sync_to[0]['ip'], '/%s/0/123-abc' % sync_to[0]['device']),
-        ])
-        found_suffix_calls = set((r['ip'], r['path'])
-                                 for r in request_log.requests)
-        self.assertEqual(expected_suffix_calls, found_suffix_calls)
-
         self.assertEqual(
             sorted(collections.Counter(
                 (c['node']['ip'], c['node']['port'], c['node']['device'],
@@ -4523,15 +4443,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         ssync_calls = []
         with mock_ssync_sender(ssync_calls,
                                response_callback=ssync_response_callback):
-            with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
-                self.reconstructor.process_job(job)
+            self.reconstructor.process_job(job)
 
-        self.assertEqual([
-            (sync_to[0]['replication_ip'], '/%s/0/%s' % (
-                sync_to[0]['device'], suffix)),
-        ], [
-            (r['ip'], r['path']) for r in request_log.requests
-        ])
         # hashpath has been removed
         self.assertFalse(os.path.exists(df._datadir))
 
@@ -4575,15 +4488,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         ssync_calls = []
         with mock_ssync_sender(ssync_calls,
                                response_callback=ssync_response_callback):
-            with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
-                self.reconstructor.process_job(job)
+            self.reconstructor.process_job(job)
 
-        self.assertEqual([
-            (sync_to[0]['replication_ip'], '/%s/0/%s' % (
-                sync_to[0]['device'], suffix)),
-        ], [
-            (r['ip'], r['path']) for r in request_log.requests
-        ])
         # hashpath is still there, but it's empty
         self.assertEqual([], os.listdir(df._datadir))
 
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index ca9c9abf05..dbfd17e04d 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -2011,11 +2011,6 @@ class TestObjectReplicator(unittest.TestCase):
                     node['replication_port'], node['device'],
                     repl_job['partition'], 'REPLICATE',
                     '', headers=self.headers))
-                reqs.append(mock.call(
-                    node['replication_ip'],
-                    node['replication_port'], node['device'],
-                    repl_job['partition'], 'REPLICATE',
-                    '/a83', headers=self.headers))
         mock_http.assert_has_calls(reqs, any_order=True)
 
     @mock.patch('swift.obj.replicator.tpool.execute')
@@ -2056,12 +2051,21 @@ class TestObjectReplicator(unittest.TestCase):
         jobs = self.replicator.collect_jobs()
         _m_rsync = mock.Mock(return_value=0)
         _m_os_path_exists = mock.Mock(return_value=True)
+        expected_reqs = []
         with mock.patch.object(self.replicator, '_rsync', _m_rsync), \
-                mock.patch('os.path.exists', _m_os_path_exists):
+                mock.patch('os.path.exists', _m_os_path_exists), \
+                mocked_http_conn(
+                    *[200] * 2 * sum(len(job['nodes']) for job in jobs),
+                    body=pickle.dumps('{}')) as request_log:
             for job in jobs:
                 self.assertTrue('region' in job)
                 for node in job['nodes']:
                     for rsync_compress in (True, False):
+                        expected_reqs.append((
+                            'REPLICATE', node['ip'],
+                            '/%s/%s/fake_suffix' % (
+                                node['device'], job['partition']),
+                        ))
                         self.replicator.rsync_compress = rsync_compress
                         ret = self.replicator.sync(node, job,
                                                    ['fake_suffix'])
@@ -2086,6 +2090,8 @@ class TestObjectReplicator(unittest.TestCase):
             self.assertEqual(
                 _m_os_path_exists.call_args_list[-2][0][0],
                 os.path.join(job['path']))
+        self.assertEqual(expected_reqs, [
+            (r['method'], r['ip'], r['path']) for r in request_log.requests])
 
     def test_do_listdir(self):
         # Test if do_listdir is enabled for every 10th partition to rehash
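
[Editorial note, not part of the patch] The suffix-path format asserted in
the test expectations above is just the device and partition followed by
the hyphen-joined suffix list, mirroring the '/' + '-'.join(suffixes)
construction in the replicator. A quick illustration with made-up values:

    # REPLICATE path format: '/<device>/<partition>/<sfx1>-<sfx2>...'
    device, partition, suffixes = 'sda1', '0', ['123', 'abc']
    path = '/%s/%s/%s' % (device, partition, '-'.join(suffixes))
    assert path == '/sda1/0/123-abc'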