From 1b3879e0da4def66b98ff5735331ab1cef4d365f Mon Sep 17 00:00:00 2001 From: Alistair Coles Date: Fri, 3 Dec 2021 14:35:09 +0000 Subject: [PATCH] reconstructor: include partially reverted handoffs in handoffs_remaining For a reconstructor revert job, if sync'd to sufficient other nodes, the handoff partition is considered done and handoffs_remaining is not incremented. With the new max_objects_per_revert option [1], a ssync job may appear to be complete but not all objects have yet been reverted, so handoffs remaining should be incremented. [1] Related-Change: If81760c80a4692212e3774e73af5ce37c02e8aff Change-Id: I59572f75b9b0ba331369eb7358932943b7935ff0 --- swift/obj/reconstructor.py | 9 +++-- swift/obj/ssync_sender.py | 22 +++++++----- test/unit/obj/test_reconstructor.py | 52 ++++++++++++++++++++--------- test/unit/obj/test_ssync_sender.py | 22 ++++++++++++ 4 files changed, 78 insertions(+), 27 deletions(-) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index c1c3636492..d8fcf6977f 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -1088,19 +1088,22 @@ class ObjectReconstructor(Daemon): with df_mgr.partition_lock(job['device'], job['policy'], job['partition'], name='replication', timeout=0.2): + limited_by_max_objects = False for node in job['sync_to']: node['backend_index'] = job['policy'].get_backend_index( node['index']) - success, in_sync_objs = ssync_sender( + sender = ssync_sender( self, node, job, job['suffixes'], include_non_durable=True, - max_objects=self.max_objects_per_revert)() + max_objects=self.max_objects_per_revert) + success, in_sync_objs = sender() + limited_by_max_objects |= sender.limited_by_max_objects if success: syncd_with += 1 reverted_objs.update(in_sync_objs) if syncd_with >= len(job['sync_to']): self.delete_reverted_objs(job, reverted_objs) - else: + if syncd_with < len(job['sync_to']) or limited_by_max_objects: self.handoffs_remaining += 1 except PartitionLockTimeout: self.logger.info("Unable to lock handoff partition %d for revert " diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index 309bcc1b61..efcef6948f 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -155,6 +155,7 @@ class Sender(object): self.remote_check_objs = remote_check_objs self.include_non_durable = include_non_durable self.max_objects = max_objects + self.limited_by_max_objects = False def __call__(self): """ @@ -285,6 +286,7 @@ class Sender(object): Full documentation of this can be found at :py:meth:`.Receiver.missing_check`. """ + self.limited_by_max_objects = False available_map = {} send_map = {} # First, send our list. @@ -307,6 +309,7 @@ class Sender(object): self.remote_check_objs, hash_gen) nlines = 0 nbytes = 0 + object_hash = None for object_hash, timestamps in hash_gen: available_map[object_hash] = timestamps with exceptions.MessageTimeout( @@ -321,16 +324,17 @@ class Sender(object): nlines += 1 nbytes += len(msg) if 0 < self.max_objects <= nlines: - for _ in hash_gen: - # only log truncation if there were more hashes to come... - self.daemon.logger.info( - 'ssync missing_check truncated after %d objects: ' - 'device: %s, part: %s, policy: %s, last object hash: ' - '%s', nlines, self.job['device'], - self.job['partition'], int(self.job['policy']), - object_hash) - break break + for _ in hash_gen: + # only log truncation if there were more hashes to come... + self.limited_by_max_objects = True + self.daemon.logger.info( + 'ssync missing_check truncated after %d objects: ' + 'device: %s, part: %s, policy: %s, last object hash: ' + '%s', nlines, self.job['device'], + self.job['partition'], int(self.job['policy']), + object_hash) + break with exceptions.MessageTimeout( self.daemon.node_timeout, 'missing_check end'): msg = b':MISSING_CHECK: END\r\n' diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index f8f71e3581..922419ac78 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -52,21 +52,33 @@ from test.unit import (patch_policies, mocked_http_conn, FabricatedRing, from test.unit.obj.common import write_diskfile -@contextmanager -def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs): - def fake_ssync(daemon, node, job, suffixes, **kwargs): +class FakeSsyncSender(object): + def __init__(self, daemon, node, job, suffixes, ssync_calls=None, + response_callback=None, **kwargs): if ssync_calls is not None: call_args = {'node': node, 'job': job, 'suffixes': suffixes} call_args.update(kwargs) ssync_calls.append(call_args) + self.response_callback = response_callback + self.node = node + self.job = job + self.suffixes = suffixes + self.limited_by_max_objects = False - def fake_call(): - if response_callback: - response = response_callback(node, job, suffixes) - else: - response = True, {} - return response - return fake_call + def __call__(self): + if self.response_callback: + response = self.response_callback( + self.node, self.job, self.suffixes) + else: + response = True, {} + return response + + +@contextmanager +def mock_ssync_sender(ssync_calls=None, response_callback=None, **kwargs): + def fake_ssync(daemon, node, job, suffixes, **kwargs): + return FakeSsyncSender(daemon, node, job, suffixes, ssync_calls, + response_callback, **kwargs) with mock.patch('swift.obj.reconstructor.ssync_sender', fake_ssync): yield fake_ssync @@ -1126,13 +1138,16 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): frag_index=self.job.get('frag_index'), frag_prefs=frag_prefs) self.available_map = {} + self.limited_by_max_objects = False nlines = 0 for hash_, timestamps in hash_gen: self.available_map[hash_] = timestamps nlines += 1 if 0 < max_objects <= nlines: break - + for _ in hash_gen: + self.limited_by_max_objects = True + break context['available_map'] = self.available_map ssync_calls.append(context) self.success = True @@ -1459,7 +1474,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): dev['replication_port'] == self.reconstructor.port] partition = (local_devs[0]['id'] + 1) % 3 - # 2 durable objects + # three durable objects df_0 = self._create_diskfile( object_name='zero', part=partition) datafile_0 = df_0.manager.cleanup_ondisk_files( @@ -1480,7 +1495,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): actual_datafiles = [df for df in datafiles if os.path.exists(df)] self.assertEqual(datafiles, actual_datafiles) - # only two object will be sync'd and purged... + # only two objects will be sync'd and purged... ssync_calls = [] conf = dict(self.conf, max_objects_per_revert=2, handoffs_only=True) self.reconstructor = object_reconstructor.ObjectReconstructor( @@ -1493,18 +1508,25 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase): self.assertEqual(2, context.get('max_objects')) actual_datafiles = [df for df in datafiles if os.path.exists(df)] self.assertEqual(1, len(actual_datafiles), actual_datafiles) + # handoff still reported as remaining + self.assertEqual(1, self.reconstructor.handoffs_remaining) # ...until next reconstructor run which will sync and purge the last - # object + # object; max_objects_per_revert == actual number of objects ssync_calls = [] + conf = dict(self.conf, max_objects_per_revert=1, handoffs_only=True) + self.reconstructor = object_reconstructor.ObjectReconstructor( + conf, logger=self.logger) with mock.patch('swift.obj.reconstructor.ssync_sender', self._make_fake_ssync(ssync_calls)): self.reconstructor.reconstruct() for context in ssync_calls: self.assertEqual(REVERT, context['job']['job_type']) - self.assertEqual(2, context.get('max_objects')) + self.assertEqual(1, context.get('max_objects')) actual_datafiles = [df for df in datafiles if os.path.exists(df)] self.assertEqual([], actual_datafiles) + # handoff is no longer remaining + self.assertEqual(0, self.reconstructor.handoffs_remaining) def test_no_delete_failed_revert(self): # test will only process revert jobs diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index cd7999e41e..a5eb203c2a 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -794,11 +794,13 @@ class TestSender(BaseTest): connection = FakeConnection() response = FakeResponse() self.sender.daemon.node_timeout = 0.01 + self.assertFalse(self.sender.limited_by_max_objects) with mock.patch.object(connection, 'send', side_effect=lambda *args: eventlet.sleep(1)): with self.assertRaises(exceptions.MessageTimeout) as cm: self.sender.missing_check(connection, response) self.assertIn('0.01 seconds: missing_check start', str(cm.exception)) + self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_timeout_send_line(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -815,6 +817,7 @@ class TestSender(BaseTest): max_objects=0) self.sender.daemon.node_timeout = 0.01 self.sender.df_mgr.yield_hashes = yield_hashes + self.assertFalse(self.sender.limited_by_max_objects) sleeps = [0, 0, 1] with mock.patch.object( connection, 'send', @@ -823,6 +826,7 @@ class TestSender(BaseTest): self.sender.missing_check(connection, response) self.assertIn('0.01 seconds: missing_check send line: ' '1 lines (57 bytes) sent', str(cm.exception)) + self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_has_empty_suffixes(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -846,6 +850,7 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes + self.assertFalse(self.sender.limited_by_max_objects) available_map, send_map = self.sender.missing_check(connection, response) self.assertEqual( @@ -854,6 +859,7 @@ class TestSender(BaseTest): b'15\r\n:MISSING_CHECK: END\r\n\r\n') self.assertEqual(send_map, {}) self.assertEqual(available_map, {}) + self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_has_suffixes(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -893,6 +899,7 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes + self.assertFalse(self.sender.limited_by_max_objects) available_map, send_map = self.sender.missing_check(connection, response) self.assertEqual( @@ -916,6 +923,7 @@ class TestSender(BaseTest): ts_ctype=Timestamp(1380144474.44448)))] self.assertEqual(available_map, dict(candidates)) self.assertEqual([], self.daemon_logger.get_lines_for_level('info')) + self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_max_objects_less_than_actual_objects(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -957,6 +965,7 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes + self.assertFalse(self.sender.limited_by_max_objects) available_map, send_map = self.sender.missing_check(connection, response) self.assertEqual( @@ -978,6 +987,7 @@ class TestSender(BaseTest): 'part: 9, policy: 0, last object hash: ' '9d41d8cd98f00b204e9800998ecf0def'], self.daemon_logger.get_lines_for_level('info')) + self.assertTrue(self.sender.limited_by_max_objects) def test_missing_check_max_objects_exactly_actual_objects(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -1012,6 +1022,7 @@ class TestSender(BaseTest): ':MISSING_CHECK: START\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes + self.assertFalse(self.sender.limited_by_max_objects) available_map, send_map = self.sender.missing_check(connection, response) self.assertEqual( @@ -1030,6 +1041,7 @@ class TestSender(BaseTest): self.assertEqual(available_map, dict(candidates)) # nothing logged re: truncation self.assertEqual([], self.daemon_logger.get_lines_for_level('info')) + self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_far_end_disconnect(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -1052,6 +1064,7 @@ class TestSender(BaseTest): } self.sender.suffixes = ['abc'] self.sender.df_mgr.yield_hashes = yield_hashes + self.assertFalse(self.sender.limited_by_max_objects) response = FakeResponse(chunk_body='\r\n') exc = None try: @@ -1064,6 +1077,7 @@ class TestSender(BaseTest): b'17\r\n:MISSING_CHECK: START\r\n\r\n' b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' b'15\r\n:MISSING_CHECK: END\r\n\r\n') + self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_far_end_disconnect2(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -1086,6 +1100,7 @@ class TestSender(BaseTest): } self.sender.suffixes = ['abc'] self.sender.df_mgr.yield_hashes = yield_hashes + self.assertFalse(self.sender.limited_by_max_objects) response = FakeResponse( chunk_body=':MISSING_CHECK: START\r\n') exc = None @@ -1099,6 +1114,7 @@ class TestSender(BaseTest): b'17\r\n:MISSING_CHECK: START\r\n\r\n' b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' b'15\r\n:MISSING_CHECK: END\r\n\r\n') + self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_far_end_unexpected(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -1121,6 +1137,7 @@ class TestSender(BaseTest): } self.sender.suffixes = ['abc'] self.sender.df_mgr.yield_hashes = yield_hashes + self.assertFalse(self.sender.limited_by_max_objects) response = FakeResponse(chunk_body='OH HAI\r\n') exc = None try: @@ -1133,6 +1150,7 @@ class TestSender(BaseTest): b'17\r\n:MISSING_CHECK: START\r\n\r\n' b'33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n' b'15\r\n:MISSING_CHECK: END\r\n\r\n') + self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_send_map(self): def yield_hashes(device, partition, policy, suffixes=None, **kwargs): @@ -1160,6 +1178,7 @@ class TestSender(BaseTest): '0123abc dm\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes + self.assertFalse(self.sender.limited_by_max_objects) available_map, send_map = self.sender.missing_check(connection, response) self.assertEqual( @@ -1171,6 +1190,7 @@ class TestSender(BaseTest): self.assertEqual(available_map, dict([('9d41d8cd98f00b204e9800998ecf0abc', {'ts_data': Timestamp(1380144470.00000)})])) + self.assertFalse(self.sender.limited_by_max_objects) def test_missing_check_extra_line_parts(self): # check that sender tolerates extra parts in missing check @@ -1200,12 +1220,14 @@ class TestSender(BaseTest): '0123abc d extra response parts\r\n' ':MISSING_CHECK: END\r\n')) self.sender.df_mgr.yield_hashes = yield_hashes + self.assertFalse(self.sender.limited_by_max_objects) available_map, send_map = self.sender.missing_check(connection, response) self.assertEqual(send_map, {'0123abc': {'data': True}}) self.assertEqual(available_map, dict([('9d41d8cd98f00b204e9800998ecf0abc', {'ts_data': Timestamp(1380144470.00000)})])) + self.assertFalse(self.sender.limited_by_max_objects) def test_updates_timeout(self): connection = FakeConnection()