From 41f85f3969d854dceb1a0f6f22c377d62e7e6477 Mon Sep 17 00:00:00 2001 From: Alistair Coles Date: Fri, 26 Feb 2021 19:21:59 +0000 Subject: [PATCH] sharder: fix loop in find_compactible_shard_sequences The sharder find_compactible_shard_sequences function was vulnerable to looping with some combinations of shrink_threshold and merge_size parameters. The inner loop might not consume a shard range, resulting in the same shard range being submitted to the inner loop again. This patch simplifies the function in an attempt to make it more obvious that the loops are always making progress towards termination by consuming shard ranges from the list. Change-Id: Ia87ab6feaf5172d91f1c60c2e0f72e03182e3c9b --- swift/container/sharder.py | 82 +++++++++++---------- test/unit/cli/test_manage_shard_ranges.py | 54 ++++++++++++++ test/unit/container/test_sharder.py | 86 +++++++++++++++++++---- 3 files changed, 171 insertions(+), 51 deletions(-) diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 0bfa42a231..2616cf9e49 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -125,10 +125,13 @@ def is_sharding_candidate(shard_range, threshold): shard_range.object_count >= threshold) -def is_shrinking_candidate(shard_range, shrink_threshold, states=None): +def is_shrinking_candidate(shard_range, shrink_threshold, merge_size, + states=None): + # typically shrink_threshold < merge_size but check both just in case states = states or (ShardRange.ACTIVE,) return (shard_range.state in states and - shard_range.object_count < shrink_threshold) + shard_range.object_count < shrink_threshold and + shard_range.object_count <= merge_size) def find_sharding_candidates(broker, threshold, shard_ranges=None): @@ -211,53 +214,53 @@ def find_compactible_shard_sequences(broker, # merge_size if (sequence and (not is_shrinking_candidate( - sequence[-1], shrink_threshold, + sequence[-1], shrink_threshold, merge_size, states=(ShardRange.ACTIVE, ShardRange.SHRINKING)) or 0 < max_shrinking < len(sequence) or sequence.object_count >= merge_size)): return True return False - def sequence_shrinking(sequence): - # check for sequence that ia already shrinking - return (sequence and any([sr.state == ShardRange.SHRINKING - for sr in sequence])) - - def find_compactible_sequence(shard_ranges_todo): - compactible_sequence = ShardRangeList() - object_count = 0 - consumed = 0 - for shard_range in shard_ranges_todo: - if (compactible_sequence and - compactible_sequence.upper < shard_range.lower): - # found a gap! break before consuming this range because it - # could become the first in the next sequence - break - if (shard_range.name != own_shard_range.name and - shard_range.state not in (ShardRange.ACTIVE, - ShardRange.SHRINKING)): - # found? created? sharded? don't touch it - consumed += 1 - break - proposed_object_count = object_count + shard_range.object_count - if (shard_range.state == ShardRange.SHRINKING or - proposed_object_count <= merge_size): - consumed += 1 - compactible_sequence.append(shard_range) - object_count = proposed_object_count - if shard_range.state == ShardRange.SHRINKING: - continue - if sequence_complete(compactible_sequence): - break - return compactible_sequence, consumed - compactible_sequences = [] index = 0 expanding = 0 while ((max_expanding < 0 or expanding < max_expanding) and index < len(shard_ranges)): - sequence, consumed = find_compactible_sequence(shard_ranges[index:]) - index += consumed + if not is_shrinking_candidate( + shard_ranges[index], shrink_threshold, merge_size, + states=(ShardRange.ACTIVE, ShardRange.SHRINKING)): + # this shard range cannot be the start of a new or existing + # compactible sequence, move on + index += 1 + continue + + # start of a *possible* sequence + sequence = ShardRangeList([shard_ranges[index]]) + for shard_range in shard_ranges[index + 1:]: + # attempt to add contiguous shard ranges to the sequence + if sequence.upper < shard_range.lower: + # found a gap! break before consuming this range because it + # could become the first in the next sequence + break + + if shard_range.state not in (ShardRange.ACTIVE, + ShardRange.SHRINKING): + # found? created? sharded? don't touch it + break + + if shard_range.state == ShardRange.SHRINKING: + # already shrinking: add to sequence unconditionally + sequence.append(shard_range) + elif (sequence.object_count + shard_range.object_count + <= merge_size): + # add to sequence: could be a donor or acceptor + sequence.append(shard_range) + if sequence_complete(sequence): + break + else: + break + + index += len(sequence) if (index == len(shard_ranges) and not compactible_sequences and not sequence_complete(sequence) and @@ -279,7 +282,8 @@ def find_compactible_shard_sequences(broker, # all valid sequences are counted against the max_expanding allowance # even if the sequence is already shrinking expanding += 1 - if not sequence_shrinking(sequence) or include_shrinking: + if (all([sr.state != ShardRange.SHRINKING for sr in sequence]) or + include_shrinking): compactible_sequences.append(sequence) return compactible_sequences diff --git a/test/unit/cli/test_manage_shard_ranges.py b/test/unit/cli/test_manage_shard_ranges.py index 644d35bbb3..e24a471a92 100644 --- a/test/unit/cli/test_manage_shard_ranges.py +++ b/test/unit/cli/test_manage_shard_ranges.py @@ -901,6 +901,60 @@ class TestManageShardRanges(unittest.TestCase): [ShardRange.SHRINKING] + [ShardRange.ACTIVE], [sr.state for sr in updated_ranges]) + def test_compact_expansion_limit_less_than_shrink_threshold(self): + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, sr in enumerate(shard_ranges): + if i % 2: + sr.object_count = 25 + else: + sr.object_count = 3 + sr.update_state(ShardRange.ACTIVE) + broker.merge_shard_ranges(shard_ranges) + self._move_broker_to_sharded_state(broker) + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--shrink-threshold', '10', + '--expansion-limit', '5']) + self.assertEqual(0, ret) + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['No shards identified for compaction.'], + out_lines[:1]) + + def test_compact_nothing_to_do(self): + broker = self._make_broker() + shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_') + for i, sr in enumerate(shard_ranges): + sr.update_state(ShardRange.ACTIVE) + broker.merge_shard_ranges(shard_ranges) + self._move_broker_to_sharded_state(broker) + out = StringIO() + err = StringIO() + # all shards are too big to shrink + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--shrink-threshold', '5', + '--expansion-limit', '8']) + self.assertEqual(0, ret) + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['No shards identified for compaction.'], + out_lines[:1]) + + # all shards could shrink but acceptors would be too large + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'compact', '--yes', + '--shrink-threshold', '11', + '--expansion-limit', '12']) + self.assertEqual(0, ret) + out_lines = out.getvalue().split('\n') + self.assertEqual( + ['No shards identified for compaction.'], + out_lines[:1]) + def test_compact_shrink_threshold(self): # verify option to set the shrink threshold for compaction; broker = self._make_broker() diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index b7cd4e2a14..7853699645 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -6329,6 +6329,36 @@ class TestSharderFunctions(BaseTestSharder): sequences = find_compactible_shard_sequences(broker, 10, 999, -1, -1) self.assertEqual([], sequences) + def test_find_compactible_no_donors(self): + broker = self._make_broker() + shard_ranges = self._make_shard_ranges( + (('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'), + ('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')), + state=ShardRange.ACTIVE, object_count=10) + broker.merge_shard_ranges(shard_ranges) + # shards exceed shrink threshold + sequences = find_compactible_shard_sequences(broker, 10, 999, -1, -1) + self.assertEqual([], sequences) + # compacted shards would exceed merge size + sequences = find_compactible_shard_sequences(broker, 11, 19, -1, -1) + self.assertEqual([], sequences) + # shards exceed merge size + sequences = find_compactible_shard_sequences(broker, 11, 9, -1, -1) + self.assertEqual([], sequences) + # shards exceed merge size and shrink threshold + sequences = find_compactible_shard_sequences(broker, 10, 9, -1, -1) + self.assertEqual([], sequences) + # shards exceed *zero'd* merge size and shrink threshold + sequences = find_compactible_shard_sequences(broker, 0, 0, -1, -1) + self.assertEqual([], sequences) + # shards exceed *negative* merge size and shrink threshold + sequences = find_compactible_shard_sequences(broker, -1, -2, -1, -1) + self.assertEqual([], sequences) + # weird case: shards object count less than threshold but compacted + # shards would exceed merge size + sequences = find_compactible_shard_sequences(broker, 20, 19, -1, -1) + self.assertEqual([], sequences) + def test_find_compactible_four_donors_two_acceptors(self): small_ranges = (2, 3, 4, 7) broker = self._make_broker() @@ -6412,6 +6442,28 @@ class TestSharderFunctions(BaseTestSharder): sequences = find_compactible_shard_sequences(broker, 10, 999, -1, -1) self.assertEqual([shard_ranges[:3], shard_ranges[3:]], sequences) + def test_find_compactible_eligible_states(self): + # verify that compactible sequences only include shards in valid states + broker = self._make_broker() + shard_ranges = self._make_shard_ranges( + (('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'), + ('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')), + state=[ShardRange.SHRINKING, ShardRange.ACTIVE, # ok, shrinking + ShardRange.CREATED, # ineligible state + ShardRange.ACTIVE, ShardRange.ACTIVE, # ok + ShardRange.FOUND, # ineligible state + ShardRange.SHARDED, # ineligible state + ShardRange.ACTIVE, ShardRange.SHRINKING, # ineligible state + ShardRange.SHARDING, # ineligible state + ]) + broker.merge_shard_ranges(shard_ranges) + own_sr = broker.get_own_shard_range() + own_sr.update_state(ShardRange.SHARDED) + broker.merge_shard_ranges(own_sr) + sequences = find_compactible_shard_sequences(broker, 10, 999, -1, -1, + include_shrinking=True) + self.assertEqual([shard_ranges[:2], shard_ranges[3:5], ], sequences) + def test_find_compactible_max_shrinking(self): # verify option to limit the number of shrinking shards per acceptor broker = self._make_broker() @@ -6508,21 +6560,31 @@ class TestSharderFunctions(BaseTestSharder): self.assertFalse(is_sharding_candidate(sr, 10)) def test_is_shrinking_candidate(self): - states = (ShardRange.ACTIVE, ShardRange.SHRINKING) + def do_check_true(state, ok_states): + # shard range has 9 objects + sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '', + state=state, object_count=9) + self.assertTrue(is_shrinking_candidate(sr, 10, 9, ok_states)) - def do_check(state, object_count): + do_check_true(ShardRange.ACTIVE, (ShardRange.ACTIVE,)) + do_check_true(ShardRange.ACTIVE, + (ShardRange.ACTIVE, ShardRange.SHRINKING)) + do_check_true(ShardRange.SHRINKING, + (ShardRange.ACTIVE, ShardRange.SHRINKING)) + + def do_check_false(state, object_count): + states = (ShardRange.ACTIVE, ShardRange.SHRINKING) + # shard range has 10 objects sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '', state=state, object_count=object_count) - if object_count < 10: - if state == ShardRange.ACTIVE and object_count < 10: - self.assertTrue(is_shrinking_candidate(sr, 10)) - if state in states: - self.assertTrue(is_shrinking_candidate(sr, 10, states)) - else: - self.assertFalse(is_shrinking_candidate(sr, 10)) - self.assertFalse(is_shrinking_candidate(sr, 10, states)) + self.assertFalse(is_shrinking_candidate(sr, 10, 20)) + self.assertFalse(is_shrinking_candidate(sr, 10, 20, states)) + self.assertFalse(is_shrinking_candidate(sr, 10, 9)) + self.assertFalse(is_shrinking_candidate(sr, 10, 9, states)) + self.assertFalse(is_shrinking_candidate(sr, 20, 9)) + self.assertFalse(is_shrinking_candidate(sr, 20, 9, states)) for state in ShardRange.STATES: - for object_count in (9, 10, 11): + for object_count in (10, 11): with annotate_failure('%s %s' % (state, object_count)): - do_check(state, object_count) + do_check_false(state, object_count)