Stop invalidating suffixes post-SSYNC

We only need the invalidation post-rsync, since rsync was changing data
on disk behind Swift's back. Move the REPLICATE call down into the
rsync() helper function and drop it from the reconstructor entirely.

Change-Id: I576901344f1f3abb33b52b36fde0b25b43e54c8a
Closes-Bug: #1818709
This commit is contained in:
Tim Burke
2020-11-02 11:22:46 -08:00
committed by Clay Gerrard
parent 5eaf15486e
commit 3c3cab2645
4 changed files with 48 additions and 158 deletions

View File

@@ -688,22 +688,6 @@ class ObjectReconstructor(Daemon):
suffixes.append(suffix)
return suffixes
def rehash_remote(self, node, job, suffixes):
headers = self.headers.copy()
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
try:
with Timeout(self.http_timeout):
conn = http_connect(
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'/' + '-'.join(sorted(suffixes)),
headers=headers)
conn.getresponse().read()
except (Exception, Timeout):
self.logger.exception(
_("Trying to sync suffixes with %s") % _full_path(
node, job['partition'], '', job['policy']))
def _iter_nodes_for_frag(self, policy, partition, node):
"""
Generate a priority list of nodes that can sync to the given node.
@@ -881,9 +865,6 @@ class ObjectReconstructor(Daemon):
# ssync any out-of-sync suffixes with the remote node
success, _ = ssync_sender(
self, node, job, suffixes)()
# let remote end know to rehash its suffixes
if success:
self.rehash_remote(node, job, suffixes)
# update stats for this attempt
self.suffix_sync += len(suffixes)
self.logger.update_stats('suffix.syncs', len(suffixes))
@@ -912,7 +893,6 @@ class ObjectReconstructor(Daemon):
success, in_sync_objs = ssync_sender(
self, node, job, job['suffixes'])()
if success:
self.rehash_remote(node, job, job['suffixes'])
syncd_with += 1
reverted_objs.update(in_sync_objs)
if syncd_with >= len(job['sync_to']):

View File

@@ -465,7 +465,21 @@ class ObjectReplicator(Daemon):
data_dir = get_data_dir(job['policy'])
args.append(join(rsync_module, node['device'],
data_dir, job['partition']))
return self._rsync(args) == 0, {}
success = (self._rsync(args) == 0)
# TODO: Catch and swallow (or at least minimize) timeouts when doing
# an update job; if we don't manage to notify the remote, we should
# catch it on the next pass
if success or not job['delete']:
headers = dict(self.default_headers)
headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
with Timeout(self.http_timeout):
conn = http_connect(
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'/' + '-'.join(suffixes), headers=headers)
conn.getresponse().read()
return success, {}
def ssync(self, node, job, suffixes, remote_check_objs=None):
return ssync_sender.Sender(
@@ -529,21 +543,12 @@ class ObjectReplicator(Daemon):
# for deletion
success, candidates = self.sync(
node, job, suffixes, **kwargs)
if success:
with Timeout(self.http_timeout):
conn = http_connect(
node['replication_ip'],
node['replication_port'],
node['device'], job['partition'],
'REPLICATE', '/' + '-'.join(suffixes),
headers=headers)
conn.getresponse().read()
if node['region'] != job['region']:
synced_remote_regions[node['region']] = \
viewkeys(candidates)
else:
if not success:
failure_devs_info.add((node['replication_ip'],
node['device']))
if success and node['region'] != job['region']:
synced_remote_regions[node['region']] = viewkeys(
candidates)
responses.append(success)
for cand_objs in synced_remote_regions.values():
if delete_objs is None:
@@ -714,13 +719,6 @@ class ObjectReplicator(Daemon):
continue
stats.rsync += 1
success, _junk = self.sync(node, job, suffixes)
with Timeout(self.http_timeout):
conn = http_connect(
node['replication_ip'], node['replication_port'],
node['device'], job['partition'], 'REPLICATE',
'/' + '-'.join(suffixes),
headers=headers)
conn.getresponse().read()
if not success:
failure_devs_info.add((node['replication_ip'],
node['device']))

View File

@@ -1162,7 +1162,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
ssync_calls = []
with mock.patch('swift.obj.reconstructor.ssync_sender',
self._make_fake_ssync(ssync_calls)), \
mocked_http_conn(*[200] * 17, body=pickle.dumps({})), \
mocked_http_conn(*[200] * 6, body=pickle.dumps({})), \
mock.patch.object(
self.reconstructor, 'delete_reverted_objs') as mock_delete:
self.reconstructor.reconstruct()
@@ -1189,7 +1189,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
ssync_calls = []
with mock.patch('swift.obj.reconstructor.ssync_sender',
self._make_fake_ssync(ssync_calls)), \
mocked_http_conn(*[200] * 17, body=pickle.dumps({})), \
mocked_http_conn(*[200] * 6, body=pickle.dumps({})), \
mock.patch('swift.obj.reconstructor.random.shuffle'):
self.reconstructor.reconstruct()
for context in ssync_calls:
@@ -1248,28 +1248,10 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
# again with no failures
captured_ssync = []
with mock.patch('swift.obj.reconstructor.ssync_sender',
self._make_fake_ssync(captured_ssync)), \
mocked_http_conn(
200, 200, body=pickle.dumps({})) as request_log:
self._make_fake_ssync(captured_ssync)):
self.reconstructor.reconstruct()
self.assertFalse(request_log.unexpected_requests)
# same jobs
self.assertEqual(len(captured_ssync), 2)
# but this time we rehash at the end
expected_suffix_calls = []
for context in captured_ssync:
if not context['success']:
# only successful jobs generate suffix rehash calls
continue
job = context['job']
expected_suffix_calls.append(
(job['sync_to'][0]['replication_ip'], '/%s/%s/%s' % (
job['sync_to'][0]['device'], job['partition'],
'-'.join(sorted(job['suffixes']))))
)
self.assertEqual(set(expected_suffix_calls),
set((r['ip'], r['path'])
for r in request_log.requests))
self.assertFalse(
self.logger.get_lines_for_level('error'))
# handoffs are cleaned up
@@ -1310,19 +1292,9 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self.assertTrue(os.access(part_path, os.F_OK))
ssync_calls = []
status = [200] * 2
body = pickle.dumps({})
with mocked_http_conn(*status, body=body) as request_log:
with mock.patch('swift.obj.reconstructor.ssync_sender',
self._make_fake_ssync(ssync_calls)):
self.reconstructor.reconstruct(override_partitions=[2])
expected_repliate_calls = set([
(u'10.0.0.0', '/sda4/2/3c1'),
(u'10.0.0.2', '/sda2/2/061'),
])
found_calls = set((r['ip'], r['path'])
for r in request_log.requests)
self.assertEqual(expected_repliate_calls, found_calls)
with mock.patch('swift.obj.reconstructor.ssync_sender',
self._make_fake_ssync(ssync_calls)):
self.reconstructor.reconstruct(override_partitions=[2])
expected_ssync_calls = sorted([
(u'10.0.0.0', REVERT, 2, [u'3c1']),
@@ -1367,10 +1339,11 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
self._make_fake_ssync(ssync_calls)):
self.reconstructor.reconstruct(override_partitions=[2])
self.assertEqual([], ssync_calls)
self.assertEqual([], request_log.requests)
self.assertFalse(os.access(part_path, os.F_OK))
def test_process_job_all_success(self):
rehash_per_job_type = {SYNC: 2, REVERT: 1}
rehash_per_job_type = {SYNC: 1, REVERT: 0}
self.reconstructor._reset_stats()
with mock_ssync_sender():
found_jobs = []
@@ -1413,7 +1386,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def test_process_job_all_insufficient_storage(self):
self.reconstructor._reset_stats()
with mock_ssync_sender():
with mocked_http_conn(*[507] * 15):
with mocked_http_conn(*[507] * 10):
found_jobs = []
for part_info in self.reconstructor.collect_parts():
jobs = self.reconstructor.build_reconstruction_jobs(
@@ -1446,7 +1419,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def test_process_job_all_client_error(self):
self.reconstructor._reset_stats()
with mock_ssync_sender():
with mocked_http_conn(*[400] * 11):
with mocked_http_conn(*[400] * 6):
found_jobs = []
for part_info in self.reconstructor.collect_parts():
jobs = self.reconstructor.build_reconstruction_jobs(
@@ -1478,7 +1451,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def test_process_job_all_timeout(self):
self.reconstructor._reset_stats()
with mock_ssync_sender(), mocked_http_conn(*[Timeout()] * 11):
with mock_ssync_sender(), mocked_http_conn(*[Timeout()] * 6):
found_jobs = []
for part_info in self.reconstructor.collect_parts():
jobs = self.reconstructor.build_reconstruction_jobs(
@@ -3605,33 +3578,6 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
v, job[k], k)
self.assertEqual(v, job[k], msg)
def test_rehash_remote(self):
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy), '1')
utils.mkdirs(part_path)
part_info = {
'local_dev': self.local_dev,
'policy': self.policy,
'partition': 1,
'part_path': part_path,
}
jobs = self.reconstructor.build_reconstruction_jobs(part_info)
self.assertEqual(1, len(jobs))
job = jobs[0]
node = job['sync_to'][0]
# process_job used to try and modify the instance base headers
self.reconstructor.headers['X-Backend-Storage-Policy-Index'] = \
int(POLICIES[1])
# ... which doesn't work out under concurrency with multiple policies
self.assertNotEqual(
self.reconstructor.headers['X-Backend-Storage-Policy-Index'],
int(job['policy']))
with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
self.reconstructor.rehash_remote(node, job, [])
self.assertEqual([int(job['policy'])], [
r['headers']['X-Backend-Storage-Policy-Index']
for r in request_log.requests])
def test_get_suffixes_to_sync(self):
part_path = os.path.join(self.devices, self.local_dev['device'],
diskfile.get_data_dir(self.policy), '1')
@@ -3833,7 +3779,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
responses = []
for hashes in (left_hashes, right_hashes, far_hashes):
responses.extend([(200, pickle.dumps(hashes))] * 2)
responses.append((200, pickle.dumps(hashes)))
codes, body_iter = zip(*responses)
ssync_calls = []
@@ -3845,11 +3791,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
expected_suffix_calls = [
(sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']),
(sync_to[0]['ip'], '/%s/0/123-abc' % sync_to[0]['device']),
(sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']),
(sync_to[1]['ip'], '/%s/0/123-abc' % sync_to[1]['device']),
(sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']),
(sync_to[2]['ip'], '/%s/0/123-abc' % sync_to[2]['device']),
]
self.assertEqual(expected_suffix_calls,
[(r['ip'], r['path']) for r in request_log.requests])
@@ -3915,7 +3858,6 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
responses = [
(200, pickle.dumps(left_hashes)),
(200, pickle.dumps(right_hashes)),
(200, pickle.dumps(right_hashes)),
(200, pickle.dumps(far_hashes)),
]
codes, body_iter = zip(*responses)
@@ -4005,7 +3947,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
}
responses = [(200, pickle.dumps(hashes)) for hashes in (
left_hashes, right_hashes, right_hashes, far_hashes)]
left_hashes, right_hashes, far_hashes)]
codes, body_iter = zip(*responses)
ssync_calls = []
@@ -4018,7 +3960,6 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
expected_suffix_calls = set([
(sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']),
(sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']),
(sync_to[1]['ip'], '/%s/0/abc' % sync_to[1]['device']),
(sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']),
])
self.assertEqual(expected_suffix_calls,
@@ -4076,7 +4017,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
responses = []
for hashes in (left_hashes, right_hashes, far_hashes):
responses.extend([(200, pickle.dumps(hashes))] * 2)
responses.append((200, pickle.dumps(hashes)))
codes, body_iter = zip(*responses)
ssync_calls = []
@@ -4089,11 +4030,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
expected_suffix_calls = set([
(sync_to[0]['ip'], '/%s/0' % sync_to[0]['device']),
(sync_to[0]['ip'], '/%s/0/123' % sync_to[0]['device']),
(sync_to[1]['ip'], '/%s/0' % sync_to[1]['device']),
(sync_to[1]['ip'], '/%s/0/abc' % sync_to[1]['device']),
(sync_to[2]['ip'], '/%s/0' % sync_to[2]['device']),
(sync_to[2]['ip'], '/%s/0/123-abc' % sync_to[2]['device']),
])
self.assertEqual(expected_suffix_calls,
set((r['ip'], r['path'])
@@ -4149,13 +4087,7 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
for node in part_nodes[:3]:
expected_suffix_calls.update([
(node['replication_ip'], '/%s/0' % node['device']),
(node['replication_ip'], '/%s/0/123-abc' % node['device']),
])
# the first (primary sync_to) node's rehash_remote will be skipped
first_node = part_nodes[0]
expected_suffix_calls.remove(
(first_node['replication_ip'], '/%s/0/123-abc'
% first_node['device']))
ssync_calls = []
with mock_ssync_sender(ssync_calls,
@@ -4272,10 +4204,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
responses = [
(200, pickle.dumps(left_hashes)), # hashes left partner
(200, pickle.dumps(left_hashes)), # hashes post-sync
(507, ''), # unmounted right partner
(200, pickle.dumps({})), # hashes handoff
(200, ''), # hashes post-sync
(200, pickle.dumps(far_hashes)), # hashes far partner
]
codes, body_iter = zip(*responses)
@@ -4295,11 +4225,9 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
else:
self.fail('Unable to find handoff?!')
expected = collections.Counter([
(200, sync_to[0]['ip']),
(200, sync_to[0]['ip']),
(507, sync_to[1]['ip']),
(200, handoff['ip']),
(200, handoff['ip']),
(200, sync_to[2]['ip']),
])
self.assertEqual(expected, collections.Counter(
@@ -4343,17 +4271,9 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
ssync_calls = []
with mock_ssync_sender(ssync_calls), \
mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
return_value=(None, stub_hashes)), \
mocked_http_conn(200, body=pickle.dumps({})) as request_log:
return_value=(None, stub_hashes)):
self.reconstructor.process_job(job)
expected_suffix_calls = set([
(sync_to[0]['ip'], '/%s/0/123-abc' % sync_to[0]['device']),
])
found_suffix_calls = set((r['ip'], r['path'])
for r in request_log.requests)
self.assertEqual(expected_suffix_calls, found_suffix_calls)
self.assertEqual(
sorted(collections.Counter(
(c['node']['ip'], c['node']['port'], c['node']['device'],
@@ -4523,15 +4443,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
ssync_calls = []
with mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback):
with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
self.reconstructor.process_job(job)
self.assertEqual([
(sync_to[0]['replication_ip'], '/%s/0/%s' % (
sync_to[0]['device'], suffix)),
], [
(r['ip'], r['path']) for r in request_log.requests
])
# hashpath has been removed
self.assertFalse(os.path.exists(df._datadir))
@@ -4575,15 +4488,8 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
ssync_calls = []
with mock_ssync_sender(ssync_calls,
response_callback=ssync_response_callback):
with mocked_http_conn(200, body=pickle.dumps({})) as request_log:
self.reconstructor.process_job(job)
self.reconstructor.process_job(job)
self.assertEqual([
(sync_to[0]['replication_ip'], '/%s/0/%s' % (
sync_to[0]['device'], suffix)),
], [
(r['ip'], r['path']) for r in request_log.requests
])
# hashpath is still there, but it's empty
self.assertEqual([], os.listdir(df._datadir))

View File

@@ -2011,11 +2011,6 @@ class TestObjectReplicator(unittest.TestCase):
node['replication_port'], node['device'],
repl_job['partition'], 'REPLICATE',
'', headers=self.headers))
reqs.append(mock.call(
node['replication_ip'],
node['replication_port'], node['device'],
repl_job['partition'], 'REPLICATE',
'/a83', headers=self.headers))
mock_http.assert_has_calls(reqs, any_order=True)
@mock.patch('swift.obj.replicator.tpool.execute')
@@ -2056,12 +2051,21 @@ class TestObjectReplicator(unittest.TestCase):
jobs = self.replicator.collect_jobs()
_m_rsync = mock.Mock(return_value=0)
_m_os_path_exists = mock.Mock(return_value=True)
expected_reqs = []
with mock.patch.object(self.replicator, '_rsync', _m_rsync), \
mock.patch('os.path.exists', _m_os_path_exists):
mock.patch('os.path.exists', _m_os_path_exists), \
mocked_http_conn(
*[200] * 2 * sum(len(job['nodes']) for job in jobs),
body=pickle.dumps('{}')) as request_log:
for job in jobs:
self.assertTrue('region' in job)
for node in job['nodes']:
for rsync_compress in (True, False):
expected_reqs.append((
'REPLICATE', node['ip'],
'/%s/%s/fake_suffix' % (
node['device'], job['partition']),
))
self.replicator.rsync_compress = rsync_compress
ret = self.replicator.sync(node, job,
['fake_suffix'])
@@ -2086,6 +2090,8 @@ class TestObjectReplicator(unittest.TestCase):
self.assertEqual(
_m_os_path_exists.call_args_list[-2][0][0],
os.path.join(job['path']))
self.assertEqual(expected_reqs, [
(r['method'], r['ip'], r['path']) for r in request_log.requests])
def test_do_listdir(self):
# Test if do_listdir is enabled for every 10th partition to rehash