Merge "replicator: Ensure handoffs can clear with large handoff_delete"

Zuul 2024-05-20 21:41:28 +00:00 committed by Gerrit Code Review
commit 337079f21f
4 changed files with 140 additions and 78 deletions

View File

@@ -240,6 +240,12 @@ class Replicator(Daemon):
self.handoffs_only = config_true_value(conf.get('handoffs_only', 'no'))
self.handoff_delete = config_auto_int_value(
conf.get('handoff_delete', 'auto'), 0)
if self.handoff_delete >= self.ring.replica_count:
self.logger.warning(
'handoff_delete=%d is too high to have an effect on a ring '
'with replica count %d. Disabling.',
self.handoff_delete, self.ring.replica_count)
self.handoff_delete = 0
def _zero_stats(self):
"""Zero out the stats."""

View File

@@ -195,6 +195,12 @@ class ObjectReplicator(Daemon):
'operation, please disable handoffs_first and '
'handoff_delete before the next '
'normal rebalance')
if all(self.load_object_ring(p).replica_count <= self.handoff_delete
for p in self.policies):
self.logger.warning('No storage policies found for which '
'handoff_delete=%d would have an effect. '
'Disabling.', self.handoff_delete)
self.handoff_delete = 0
self.is_multiprocess_worker = None
self._df_router = DiskFileRouter(conf, self.logger)
self._child_process_reaper_queue = queue.LightQueue()
@@ -554,7 +560,8 @@ class ObjectReplicator(Daemon):
if self.handoff_delete:
# delete handoff if we have had handoff_delete successes
successes_count = len([resp for resp in responses if resp])
delete_handoff = successes_count >= self.handoff_delete
delete_handoff = successes_count >= min(
self.handoff_delete, len(job['nodes']))
else:
# delete handoff if all syncs were successful
delete_handoff = len(responses) == len(job['nodes']) and \
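
Together, the two object-replicator hunks above make an oversized handoff_delete harmless: at startup it is disabled (with a warning) when no policy's ring has more replicas than the configured value, and per job the required number of successful syncs is capped at the number of nodes the job actually syncs to, so handoffs can still clear. A minimal sketch of the per-job decision as a standalone function (should_delete_handoff is illustrative, not the ObjectReplicator method itself):

# Sketch only: the delete-handoff decision with the min() cap added above.
def should_delete_handoff(responses, num_nodes, handoff_delete):
    if handoff_delete:
        # Count successful syncs, but never require more successes than
        # there are nodes this job could have synced to.
        successes = sum(1 for resp in responses if resp)
        return successes >= min(handoff_delete, num_nodes)
    # handoff_delete unset/disabled: every sync must have succeeded.
    return len(responses) == num_nodes and all(responses)


# With 3 target nodes, handoff_delete=1000 previously made deletion
# impossible; the cap lets a fully synced handoff clear, while partial
# syncs still keep it in place.
assert should_delete_handoff([True, True, True], 3, 1000)
assert not should_delete_handoff([True, False, True], 3, 1000)
assert should_delete_handoff([True, False, False], 3, 1)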

View File

@@ -62,6 +62,7 @@ def lock_parent_directory(filename):
class FakeRing(object):
class Ring(object):
devs = []
replica_count = 3
def __init__(self, path, reload_time=15, ring_name=None):
pass
@@ -82,6 +83,7 @@ class FakeRingWithSingleNode(object):
id=1, weight=10.0, zone=1, ip='1.1.1.1', port=6200, device='sdb',
meta='', replication_ip='1.1.1.1', replication_port=6200, region=1
)]
replica_count = 3
def __init__(self, path, reload_time=15, ring_name=None):
pass
@@ -117,6 +119,7 @@ class FakeRingWithNodes(object):
id=6, weight=10.0, zone=6, ip='1.1.1.6', port=6200, device='sdb',
meta='', replication_ip='1.1.1.6', replication_port=6200, region=2
)]
replica_count = 3
def __init__(self, path, reload_time=15, ring_name=None):
pass
@@ -890,7 +893,10 @@ class TestDBReplicator(unittest.TestCase):
({'handoff_delete': 2}, [True, False, False], False),
({'handoff_delete': 1}, [True] * 3, True),
({'handoff_delete': 1}, [True, True, False], True),
({'handoff_delete': 1}, [True, False, False], True)):
({'handoff_delete': 1}, [True, False, False], True),
# if we configure it too high, handle it gracefully
({'handoff_delete': 5}, [True] * 3, True),
):
do_test(cfg, repl_results, expected_delete)
def test_replicate_object_delete_delegated_to_cleanup_post_replicate(self):

View File

@@ -320,6 +320,19 @@ class TestObjectReplicator(unittest.TestCase):
config,
))
def test_massive_handoff_delete_setting_warnings(self):
replicator = object_replicator.ObjectReplicator(
{'swift_dir': self.testdir, 'handoff_delete': '1000'},
logger=self.logger)
self.assertEqual(self.logger.get_lines_for_level('warning'), [
'Handoff only mode is not intended for normal operation, '
'please disable handoffs_first and handoff_delete before '
'the next normal rebalance',
'No storage policies found for which handoff_delete=1000 '
'would have an effect. Disabling.',
])
self.assertEqual(replicator.handoff_delete, 0)
def _write_disk_data(self, disk_name, with_json=False):
os.mkdir(os.path.join(self.devices, disk_name))
objects = os.path.join(self.devices, disk_name,
@@ -1188,7 +1201,7 @@ class TestObjectReplicator(unittest.TestCase):
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
@@ -1200,7 +1213,7 @@ class TestObjectReplicator(unittest.TestCase):
(0, '', ['rsync', whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
self.replicator.replicate()
self.assertFalse(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(part_path))
def test_delete_partition_default_sync_method(self):
self.replicator.conf.pop('sync_method')
@@ -1218,7 +1231,7 @@ class TestObjectReplicator(unittest.TestCase):
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
@@ -1230,7 +1243,7 @@ class TestObjectReplicator(unittest.TestCase):
(0, '', ['rsync', whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
self.replicator.replicate()
self.assertFalse(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(part_path))
def test_delete_partition_ssync_single_region(self):
devs = [
@@ -1269,16 +1282,16 @@ class TestObjectReplicator(unittest.TestCase):
whole_path_from = storage_directory(self.objects, 1, ohash)
suffix_dir_path = os.path.dirname(whole_path_from)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
def _fake_ssync(node, job, suffixes, **kwargs):
return True, {ohash: ts}
self.replicator.sync_method = _fake_ssync
self.replicator.replicate()
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
self.assertFalse(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(whole_path_from))
self.assertFalse(os.path.exists(suffix_dir_path))
self.assertFalse(os.path.exists(part_path))
def test_delete_partition_1(self):
with mock.patch('swift.obj.replicator.http_connect',
@@ -1295,7 +1308,7 @@ class TestObjectReplicator(unittest.TestCase):
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects_1, '1', data_dir)
part_path = os.path.join(self.objects_1, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
ring = self.replicator.load_object_ring(POLICIES[1])
nodes = [node for node in
ring.get_part_nodes(1)
@@ -1307,7 +1320,7 @@ class TestObjectReplicator(unittest.TestCase):
(0, '', ['rsync', whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
self.replicator.replicate()
self.assertFalse(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(part_path))
def test_delete_partition_with_failures(self):
with mock.patch('swift.obj.replicator.http_connect',
@@ -1324,7 +1337,7 @@ class TestObjectReplicator(unittest.TestCase):
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
@@ -1342,7 +1355,7 @@ class TestObjectReplicator(unittest.TestCase):
with _mock_process(process_arg_checker):
self.replicator.replicate()
# The path should still exist
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
def test_delete_partition_with_handoff_delete(self):
with mock.patch('swift.obj.replicator.http_connect',
@@ -1360,7 +1373,7 @@ class TestObjectReplicator(unittest.TestCase):
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
@@ -1377,7 +1390,40 @@ class TestObjectReplicator(unittest.TestCase):
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
self.replicator.replicate()
self.assertFalse(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(part_path))
def test_delete_partition_with_too_high_handoff_delete(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
self.replicator.handoff_delete = 5
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
f = open(os.path.join(df._datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write(b'1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.path.exists(part_path))
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
if node['ip'] not in _ips()]
self.assertLess(len(nodes), self.replicator.handoff_delete)
process_arg_checker = []
for i, node in enumerate(nodes):
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
# everybody succeeds
ret_code = 0
process_arg_checker.append(
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
self.replicator.replicate()
self.assertFalse(os.path.exists(part_path))
def test_delete_partition_with_handoff_delete_failures(self):
with mock.patch('swift.obj.replicator.http_connect',
@@ -1395,7 +1441,7 @@ class TestObjectReplicator(unittest.TestCase):
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
@@ -1413,7 +1459,7 @@ class TestObjectReplicator(unittest.TestCase):
with _mock_process(process_arg_checker):
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
def test_delete_partition_with_handoff_delete_fail_in_other_region(self):
with mock.patch('swift.obj.replicator.http_connect',
@@ -1430,7 +1476,7 @@ class TestObjectReplicator(unittest.TestCase):
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
ring = self.replicator.load_object_ring(POLICIES[0])
nodes = [node for node in
ring.get_part_nodes(1)
@@ -1448,21 +1494,21 @@ class TestObjectReplicator(unittest.TestCase):
with _mock_process(process_arg_checker):
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
def test_delete_partition_override_params(self):
df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o',
policy=POLICIES.legacy)
mkdirs(df._datadir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
self.replicator.replicate(override_devices=['sdb'])
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
self.replicator.replicate(override_partitions=[9])
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
self.replicator.replicate(override_devices=['sda'],
override_partitions=[1])
self.assertFalse(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(part_path))
def _make_OSError(self, err):
return OSError(err, os.strerror(err))
@@ -1515,15 +1561,15 @@ class TestObjectReplicator(unittest.TestCase):
pol1_part_path = os.path.join(self.objects_1, '99')
# sanity checks
self.assertTrue(os.access(pol0_part_path, os.F_OK))
self.assertTrue(os.access(pol1_part_path, os.F_OK))
self.assertTrue(os.path.exists(pol0_part_path))
self.assertTrue(os.path.exists(pol1_part_path))
# a bogus policy index doesn't bother the replicator any more than a
# bogus device or partition does
self.replicator.run_once(policies='1,2,5')
self.assertFalse(os.access(pol1_part_path, os.F_OK))
self.assertTrue(os.access(pol0_part_path, os.F_OK))
self.assertFalse(os.path.exists(pol1_part_path))
self.assertTrue(os.path.exists(pol0_part_path))
# since we weren't operating on everything, but only a subset of
# storage policies, we didn't dump any recon stats.
@@ -1545,7 +1591,7 @@ class TestObjectReplicator(unittest.TestCase):
whole_path_from = storage_directory(self.objects, 1, ohash)
suffix_dir_path = os.path.dirname(whole_path_from)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
self.call_nums = 0
self.conf['sync_method'] = 'ssync'
@@ -1564,19 +1610,19 @@ class TestObjectReplicator(unittest.TestCase):
self.replicator.sync_method = _fake_ssync
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(whole_path_from))
self.assertTrue(os.path.exists(suffix_dir_path))
self.assertTrue(os.path.exists(part_path))
self.replicator.replicate()
# The file should be deleted at the second replicate call
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(whole_path_from))
self.assertFalse(os.path.exists(suffix_dir_path))
self.assertTrue(os.path.exists(part_path))
self.replicator.replicate()
# The partition should be deleted at the third replicate call
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
self.assertFalse(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(whole_path_from))
self.assertFalse(os.path.exists(suffix_dir_path))
self.assertFalse(os.path.exists(part_path))
del self.call_nums
def test_delete_partition_ssync_with_sync_failure(self):
@@ -1593,7 +1639,7 @@ class TestObjectReplicator(unittest.TestCase):
whole_path_from = storage_directory(self.objects, 1, ohash)
suffix_dir_path = os.path.dirname(whole_path_from)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
self.call_nums = 0
self.conf['sync_method'] = 'ssync'
@@ -1611,19 +1657,19 @@ class TestObjectReplicator(unittest.TestCase):
self.replicator.sync_method = _fake_ssync
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(whole_path_from))
self.assertTrue(os.path.exists(suffix_dir_path))
self.assertTrue(os.path.exists(part_path))
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(whole_path_from))
self.assertTrue(os.path.exists(suffix_dir_path))
self.assertTrue(os.path.exists(part_path))
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(whole_path_from))
self.assertTrue(os.path.exists(suffix_dir_path))
self.assertTrue(os.path.exists(part_path))
del self.call_nums
def test_delete_objs_ssync_only_when_in_sync(self):
@@ -1641,7 +1687,7 @@ class TestObjectReplicator(unittest.TestCase):
whole_path_from = storage_directory(self.objects, 1, ohash)
suffix_dir_path = os.path.dirname(whole_path_from)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
self.call_nums = 0
self.conf['sync_method'] = 'ssync'
@@ -1660,9 +1706,9 @@ class TestObjectReplicator(unittest.TestCase):
self.replicator.replicate()
self.assertEqual(3, self.call_nums)
# The file should still exist
self.assertTrue(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(whole_path_from))
self.assertTrue(os.path.exists(suffix_dir_path))
self.assertTrue(os.path.exists(part_path))
del self.call_nums
@@ -1682,7 +1728,7 @@ class TestObjectReplicator(unittest.TestCase):
whole_path_from = storage_directory(self.objects, 1, ohash)
suffix_dir_path = os.path.dirname(whole_path_from)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(part_path))
self.call_nums = 0
self.conf['sync_method'] = 'ssync'
@@ -1716,27 +1762,27 @@ class TestObjectReplicator(unittest.TestCase):
self.replicator.sync_method = _fake_ssync
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.assertTrue(os.path.exists(whole_path_from))
self.assertTrue(os.path.exists(suffix_dir_path))
self.assertTrue(os.path.exists(part_path))
# Fail with ENOENT
with mock.patch('os.rmdir',
raise_exception_rmdir(OSError, ENOENT)):
self.replicator.replicate()
self.assertFalse(mock_logger.get_lines_for_level('error'))
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(whole_path_from))
self.assertTrue(os.path.exists(suffix_dir_path))
self.assertTrue(os.path.exists(part_path))
# Fail with ENOTEMPTY
with mock.patch('os.rmdir',
raise_exception_rmdir(OSError, ENOTEMPTY)):
self.replicator.replicate()
self.assertFalse(mock_logger.get_lines_for_level('error'))
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(whole_path_from))
self.assertTrue(os.path.exists(suffix_dir_path))
self.assertTrue(os.path.exists(part_path))
# Fail with ENOTDIR
with mock.patch('os.rmdir',
@@ -1746,19 +1792,19 @@ class TestObjectReplicator(unittest.TestCase):
'Unexpected error trying to cleanup suffix dir %r: ' %
os.path.dirname(df._datadir),
])
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(whole_path_from))
self.assertTrue(os.path.exists(suffix_dir_path))
self.assertTrue(os.path.exists(part_path))
# Finally we can cleanup everything
self.replicator.replicate()
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
self.assertTrue(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(whole_path_from))
self.assertFalse(os.path.exists(suffix_dir_path))
self.assertTrue(os.path.exists(part_path))
self.replicator.replicate()
self.assertFalse(os.access(whole_path_from, os.F_OK))
self.assertFalse(os.access(suffix_dir_path, os.F_OK))
self.assertFalse(os.access(part_path, os.F_OK))
self.assertFalse(os.path.exists(whole_path_from))
self.assertFalse(os.path.exists(suffix_dir_path))
self.assertFalse(os.path.exists(part_path))
def test_run_once_recover_from_failure(self):
conf = dict(swift_dir=self.testdir, devices=self.devices,
@@ -1792,18 +1838,15 @@ class TestObjectReplicator(unittest.TestCase):
cur_part)
process_arg_checker.append(
(0, '', ['rsync', whole_path_from, rsync_mod]))
self.assertTrue(os.access(os.path.join(self.objects,
'1', data_dir, ohash),
os.F_OK))
self.assertTrue(os.path.exists(os.path.join(
self.objects, '1', data_dir, ohash)))
with _mock_process(process_arg_checker):
replicator.run_once()
self.assertFalse(process_errors)
for i, result in [('0', True), ('1', False),
('2', True), ('3', True)]:
self.assertEqual(os.access(
os.path.join(self.objects,
i, diskfile.HASH_FILE),
os.F_OK), result)
hashes_pkl = os.path.join(self.objects, i, diskfile.HASH_FILE)
self.assertEqual(os.path.exists(hashes_pkl), result)
finally:
object_replicator.http_connect = was_connector