Merge "Do not sync suffixes when remote rejects reconstructor revert"
commit d7a6d6e1e9
@@ -730,8 +730,8 @@ class ObjectReconstructor(Daemon):
         for node in job['sync_to']:
             success, in_sync_objs = ssync_sender(
                 self, node, job, job['suffixes'])()
-            self.rehash_remote(node, job, job['suffixes'])
             if success:
+                self.rehash_remote(node, job, job['suffixes'])
                 syncd_with += 1
                 reverted_objs.update(in_sync_objs)
         if syncd_with >= len(job['sync_to']):
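The behavioral change above is small but easy to misread: rehash_remote used to run for every node after the ssync attempt, and now runs only for nodes that accepted the revert, so a rejecting remote is never asked to rehash suffixes it refused. A minimal standalone sketch of the resulting control flow, with hypothetical stand-ins for the sender and rehash calls (not Swift APIs):

    def revert_to_nodes(nodes, send, rehash):
        # Mimic the revert loop: rehash a remote's suffixes only when it
        # accepted the ssync'd objects.
        syncd_with, reverted_objs = 0, {}
        for node in nodes:
            success, in_sync_objs = send(node)
            if success:
                rehash(node)  # skipped entirely for rejecting nodes
                syncd_with += 1
                reverted_objs.update(in_sync_objs)
        return syncd_with, reverted_objs

    # node 'b' rejects the revert; it must receive no rehash call
    results = {'a': (True, {'obj': 't0'}), 'b': (False, {})}
    rehashed = []
    syncd_with, _ = revert_to_nodes(['a', 'b'], results.get, rehashed.append)
    assert rehashed == ['a'] and syncd_with == 1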
@@ -1012,6 +1012,7 @@ def fake_http_connect(*code_iter, **kwargs):
     body_iter = kwargs.get('body_iter', None)
     if body_iter:
         body_iter = iter(body_iter)
+    unexpected_requests = []

     def connect(*args, **ckwargs):
         if kwargs.get('slow_connect', False):
@@ -1021,7 +1022,15 @@ def fake_http_connect(*code_iter, **kwargs):
                 kwargs['give_content_type'](args[6]['Content-Type'])
             else:
                 kwargs['give_content_type']('')
-        i, status = next(conn_id_and_code_iter)
+        try:
+            i, status = next(conn_id_and_code_iter)
+        except StopIteration:
+            # the code under test may swallow the StopIteration, so by logging
+            # unexpected requests here we allow the test framework to check for
+            # them after the connect function has been used.
+            unexpected_requests.append((args, kwargs))
+            raise
+
         if 'give_connect' in kwargs:
             give_conn_fn = kwargs['give_connect']
             argspec = inspect.getargspec(give_conn_fn)
@@ -1044,6 +1053,7 @@ def fake_http_connect(*code_iter, **kwargs):
             connection_id=i, give_send=kwargs.get('give_send'),
             give_expect=kwargs.get('give_expect'))

+    connect.unexpected_requests = unexpected_requests
     connect.code_iter = code_iter

     return connect
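Taken together, the three fake_http_connect hunks give the test helper a side channel: when the code under test makes more requests than the test provisioned status codes for, next() raises StopIteration, but the surplus request is also recorded, so a test can detect it even when the code under test swallows the exception. A hedged sketch of the pattern in isolation (simplified names, not the actual helper):

    def make_fake_connect(codes):
        code_iter = iter(codes)
        unexpected_requests = []

        def connect(*args, **ckwargs):
            try:
                status = next(code_iter)
            except StopIteration:
                # record the surplus request where the test can find it,
                # even if the caller swallows this exception
                unexpected_requests.append((args, ckwargs))
                raise
            return status

        connect.unexpected_requests = unexpected_requests
        return connect

    connect = make_fake_connect([200, 200])
    connect('/a'), connect('/b')
    try:
        connect('/one-too-many')
    except StopIteration:
        pass
    assert connect.unexpected_requests == [(('/one-too-many',), {})]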
@@ -896,7 +896,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         for status_path in status_paths:
             self.assertTrue(os.path.exists(status_path))

-    def _make_fake_ssync(self, ssync_calls):
+    def _make_fake_ssync(self, ssync_calls, fail_jobs=None):
+        """
+        Replace SsyncSender with a thin Fake.
+
+        :param ssync_calls: an empty list, a non_local, all calls to ssync will
+            be captured for assertion in the caller.
+        :param fail_jobs: optional iter of dicts, any job passed into Fake that
+            matches a failure dict will return success == False.
+        """
         class _fake_ssync(object):
             def __init__(self, daemon, node, job, suffixes, **kwargs):
                 # capture context and generate an available_map of objs
@@ -916,9 +924,15 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
                     self.available_map[hash_] = timestamps
                 context['available_map'] = self.available_map
                 ssync_calls.append(context)
+                self.success = True
+                for failure in (fail_jobs or []):
+                    if all(job.get(k) == v for (k, v) in failure.items()):
+                        self.success = False
+                        break
+                context['success'] = self.success

             def __call__(self, *args, **kwargs):
-                return True, self.available_map
+                return self.success, self.available_map if self.success else {}

         return _fake_ssync
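The fail_jobs matching in the fake is a subset test: a job fails if every key/value pair of some failure dict matches the job, so a spec like {'device': 'sda1', 'partition': 2} fails all frag indexes on that partition. Illustrative values, same predicate as the hunk above:

    fail_jobs = [{'device': 'sda1', 'partition': 2}]

    def should_fail(job):
        return any(all(job.get(k) == v for (k, v) in failure.items())
                   for failure in fail_jobs)

    assert should_fail({'device': 'sda1', 'partition': 2, 'frag_index': 0})
    assert not should_fail({'device': 'sda1', 'partition': 1})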
@@ -977,6 +991,57 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
         # sanity check that some files were deleted
         self.assertGreater(n_files, n_files_after)

+    def test_no_delete_failed_revert(self):
+        # test will only process revert jobs
+        self.reconstructor.handoffs_only = True
+
+        captured_ssync = []
+        # fail all jobs on part 2 on sda1
+        fail_jobs = [
+            {'device': 'sda1', 'partition': 2},
+        ]
+        with mock.patch('swift.obj.reconstructor.ssync_sender',
+                        self._make_fake_ssync(
+                            captured_ssync, fail_jobs=fail_jobs)), \
+                mocked_http_conn(*[200, 200],
+                                 body=pickle.dumps({})) as request_log:
+            self.reconstructor.reconstruct()
+
+        # global setup has four revert jobs
+        self.assertEqual(len(captured_ssync), 4)
+        expected_ssync_calls = set([
+            # device, part, frag_index
+            ('sda1', 2, 2),
+            ('sda1', 2, 0),
+            ('sda1', 0, 2),
+            ('sda1', 1, 1),
+        ])
+        self.assertEqual(expected_ssync_calls, set([
+            (context['job']['device'],
+             context['job']['partition'],
+             context['job']['frag_index'])
+            for context in captured_ssync
+        ]))
+
+        self.assertEqual(2, len(request_log.requests))
+        expected_suffix_calls = []
+        for context in captured_ssync:
+            if not context['success']:
+                # only successful jobs generate suffix rehash calls
+                continue
+            job = context['job']
+            expected_suffix_calls.append(
+                (job['sync_to'][0]['replication_ip'], '/%s/%s/%s' % (
+                    job['device'], job['partition'],
+                    '-'.join(sorted(job['suffixes']))))
+            )
+        self.assertEqual(set(expected_suffix_calls),
+                         set((r['ip'], r['path'])
+                             for r in request_log.requests))
+        self.assertFalse(
+            self.reconstructor.logger.get_lines_for_level('error'))
+        self.assertFalse(request_log.unexpected_requests)
+
     def test_get_part_jobs(self):
         # yeah, this test code expects a specific setup
         self.assertEqual(len(self.part_nums), 3)
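The expected rehash requests in the new test are keyed by (ip, path), where the path follows the suffix-rehash convention visible in the test code: device, partition, then the sorted suffixes joined with dashes (matching the literal '/%s/0/123-abc' asserted in the last hunk below). A quick illustration with made-up values:

    job = {'device': 'sda1', 'partition': 2, 'suffixes': ['abc', '123']}
    path = '/%s/%s/%s' % (job['device'], job['partition'],
                          '-'.join(sorted(job['suffixes'])))
    assert path == '/sda1/2/123-abc'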
@@ -2532,15 +2597,11 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
         ssync_calls = []
         with mock_ssync_sender(ssync_calls,
                                response_callback=ssync_response_callback), \
                 mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
                            return_value=(None, stub_hashes)), \
-                mocked_http_conn(*[200] * len(expected_suffix_calls),
-                                 body=pickle.dumps({})) as request_log:
+                mocked_http_conn() as request_log:
             self.reconstructor.process_job(job)

-        found_suffix_calls = set((r['ip'], r['path'])
-                                 for r in request_log.requests)
-        self.assertEqual(expected_suffix_calls, found_suffix_calls)
+        # failed ssync job should not generate a suffix rehash
+        self.assertEqual([], request_log.requests)

         self.assertEqual(len(ssync_calls), len(expected_suffix_calls))
         call = ssync_calls[0]
@@ -2581,23 +2642,14 @@ class TestObjectReconstructor(BaseTestObjectReconstructor):
                 # should increment
                 return False, {}

-        expected_suffix_calls = set([
-            (sync_to[0]['replication_ip'],
-             '/%s/0/123-abc' % sync_to[0]['device'])
-        ])
-
         ssync_calls = []
         with mock_ssync_sender(ssync_calls,
                                response_callback=ssync_response_callback), \
                 mock.patch('swift.obj.diskfile.ECDiskFileManager._get_hashes',
                            return_value=(None, stub_hashes)), \
-                mocked_http_conn(*[200] * len(expected_suffix_calls),
-                                 body=pickle.dumps({})) as request_log:
+                mocked_http_conn() as request_log:
             self.reconstructor.process_job(job)

-        found_suffix_calls = set((r['ip'], r['path'])
-                                 for r in request_log.requests)
-        self.assertEqual(expected_suffix_calls, found_suffix_calls)
+        # failed ssync job should not generate a suffix rehash
+        self.assertEqual([], request_log.requests)

         # this is ssync call to primary (which fails) and nothing else!
         self.assertEqual(len(ssync_calls), 1)