sharder: fix loop in find_compactible_shard_sequences
The sharder's find_compactible_shard_sequences function was vulnerable to looping indefinitely with some combinations of the shrink_threshold and merge_size parameters. The inner loop might not consume a shard range, resulting in the same shard range being submitted to the inner loop again.

This patch simplifies the function in an attempt to make it more obvious that the loops are always making progress towards termination by consuming shard ranges from the list.

Change-Id: Ia87ab6feaf5172d91f1c60c2e0f72e03182e3c9b
This commit is contained in:
parent
e2f5249271
commit
41f85f3969
@ -125,10 +125,13 @@ def is_sharding_candidate(shard_range, threshold):
|
||||
shard_range.object_count >= threshold)
|
||||
|
||||
|
||||
def is_shrinking_candidate(shard_range, shrink_threshold, states=None):
|
||||
def is_shrinking_candidate(shard_range, shrink_threshold, merge_size,
|
||||
states=None):
|
||||
# typically shrink_threshold < merge_size but check both just in case
|
||||
states = states or (ShardRange.ACTIVE,)
|
||||
return (shard_range.state in states and
|
||||
shard_range.object_count < shrink_threshold)
|
||||
shard_range.object_count < shrink_threshold and
|
||||
shard_range.object_count <= merge_size)
|
||||
|
||||
|
||||
def find_sharding_candidates(broker, threshold, shard_ranges=None):
|
||||
@ -211,53 +214,53 @@ def find_compactible_shard_sequences(broker,
|
||||
# merge_size
|
||||
if (sequence and
|
||||
(not is_shrinking_candidate(
|
||||
sequence[-1], shrink_threshold,
|
||||
sequence[-1], shrink_threshold, merge_size,
|
||||
states=(ShardRange.ACTIVE, ShardRange.SHRINKING)) or
|
||||
0 < max_shrinking < len(sequence) or
|
||||
sequence.object_count >= merge_size)):
|
||||
return True
|
||||
return False
|
||||
|
||||
def sequence_shrinking(sequence):
|
||||
# check for sequence that is already shrinking
|
||||
return (sequence and any([sr.state == ShardRange.SHRINKING
|
||||
for sr in sequence]))
|
||||
|
||||
def find_compactible_sequence(shard_ranges_todo):
|
||||
compactible_sequence = ShardRangeList()
|
||||
object_count = 0
|
||||
consumed = 0
|
||||
for shard_range in shard_ranges_todo:
|
||||
if (compactible_sequence and
|
||||
compactible_sequence.upper < shard_range.lower):
|
||||
# found a gap! break before consuming this range because it
|
||||
# could become the first in the next sequence
|
||||
break
|
||||
if (shard_range.name != own_shard_range.name and
|
||||
shard_range.state not in (ShardRange.ACTIVE,
|
||||
ShardRange.SHRINKING)):
|
||||
# found? created? sharded? don't touch it
|
||||
consumed += 1
|
||||
break
|
||||
proposed_object_count = object_count + shard_range.object_count
|
||||
if (shard_range.state == ShardRange.SHRINKING or
|
||||
proposed_object_count <= merge_size):
|
||||
consumed += 1
|
||||
compactible_sequence.append(shard_range)
|
||||
object_count = proposed_object_count
|
||||
if shard_range.state == ShardRange.SHRINKING:
|
||||
continue
|
||||
if sequence_complete(compactible_sequence):
|
||||
break
|
||||
return compactible_sequence, consumed
|
||||
|
||||
compactible_sequences = []
|
||||
index = 0
|
||||
expanding = 0
|
||||
while ((max_expanding < 0 or expanding < max_expanding) and
|
||||
index < len(shard_ranges)):
|
||||
sequence, consumed = find_compactible_sequence(shard_ranges[index:])
|
||||
index += consumed
|
||||
if not is_shrinking_candidate(
|
||||
shard_ranges[index], shrink_threshold, merge_size,
|
||||
states=(ShardRange.ACTIVE, ShardRange.SHRINKING)):
|
||||
# this shard range cannot be the start of a new or existing
|
||||
# compactible sequence, move on
|
||||
index += 1
|
||||
continue
|
||||
|
||||
# start of a *possible* sequence
|
||||
sequence = ShardRangeList([shard_ranges[index]])
|
||||
for shard_range in shard_ranges[index + 1:]:
|
||||
# attempt to add contiguous shard ranges to the sequence
|
||||
if sequence.upper < shard_range.lower:
|
||||
# found a gap! break before consuming this range because it
|
||||
# could become the first in the next sequence
|
||||
break
|
||||
|
||||
if shard_range.state not in (ShardRange.ACTIVE,
|
||||
ShardRange.SHRINKING):
|
||||
# found? created? sharded? don't touch it
|
||||
break
|
||||
|
||||
if shard_range.state == ShardRange.SHRINKING:
|
||||
# already shrinking: add to sequence unconditionally
|
||||
sequence.append(shard_range)
|
||||
elif (sequence.object_count + shard_range.object_count
|
||||
<= merge_size):
|
||||
# add to sequence: could be a donor or acceptor
|
||||
sequence.append(shard_range)
|
||||
if sequence_complete(sequence):
|
||||
break
|
||||
else:
|
||||
break
|
||||
|
||||
index += len(sequence)
|
||||
if (index == len(shard_ranges) and
|
||||
not compactible_sequences and
|
||||
not sequence_complete(sequence) and
|
||||
@ -279,7 +282,8 @@ def find_compactible_shard_sequences(broker,
|
||||
# all valid sequences are counted against the max_expanding allowance
|
||||
# even if the sequence is already shrinking
|
||||
expanding += 1
|
||||
if not sequence_shrinking(sequence) or include_shrinking:
|
||||
if (all([sr.state != ShardRange.SHRINKING for sr in sequence]) or
|
||||
include_shrinking):
|
||||
compactible_sequences.append(sequence)
|
||||
|
||||
return compactible_sequences
|
||||
|
@ -901,6 +901,60 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
[ShardRange.SHRINKING] + [ShardRange.ACTIVE],
|
||||
[sr.state for sr in updated_ranges])
|
||||
|
||||
def test_compact_expansion_limit_less_than_shrink_threshold(self):
|
||||
broker = self._make_broker()
|
||||
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||
for i, sr in enumerate(shard_ranges):
|
||||
if i % 2:
|
||||
sr.object_count = 25
|
||||
else:
|
||||
sr.object_count = 3
|
||||
sr.update_state(ShardRange.ACTIVE)
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
self._move_broker_to_sharded_state(broker)
|
||||
out = StringIO()
|
||||
err = StringIO()
|
||||
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||
ret = main([broker.db_file, 'compact', '--yes',
|
||||
'--shrink-threshold', '10',
|
||||
'--expansion-limit', '5'])
|
||||
self.assertEqual(0, ret)
|
||||
out_lines = out.getvalue().split('\n')
|
||||
self.assertEqual(
|
||||
['No shards identified for compaction.'],
|
||||
out_lines[:1])
|
||||
|
||||
def test_compact_nothing_to_do(self):
|
||||
broker = self._make_broker()
|
||||
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||
for i, sr in enumerate(shard_ranges):
|
||||
sr.update_state(ShardRange.ACTIVE)
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
self._move_broker_to_sharded_state(broker)
|
||||
out = StringIO()
|
||||
err = StringIO()
|
||||
# all shards are too big to shrink
|
||||
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||
ret = main([broker.db_file, 'compact', '--yes',
|
||||
'--shrink-threshold', '5',
|
||||
'--expansion-limit', '8'])
|
||||
self.assertEqual(0, ret)
|
||||
out_lines = out.getvalue().split('\n')
|
||||
self.assertEqual(
|
||||
['No shards identified for compaction.'],
|
||||
out_lines[:1])
|
||||
|
||||
# all shards could shrink but acceptors would be too large
|
||||
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||
ret = main([broker.db_file, 'compact', '--yes',
|
||||
'--shrink-threshold', '11',
|
||||
'--expansion-limit', '12'])
|
||||
self.assertEqual(0, ret)
|
||||
out_lines = out.getvalue().split('\n')
|
||||
self.assertEqual(
|
||||
['No shards identified for compaction.'],
|
||||
out_lines[:1])
|
||||
|
||||
def test_compact_shrink_threshold(self):
|
||||
# verify option to set the shrink threshold for compaction;
|
||||
broker = self._make_broker()
|
||||
|
@ -6329,6 +6329,36 @@ class TestSharderFunctions(BaseTestSharder):
|
||||
sequences = find_compactible_shard_sequences(broker, 10, 999, -1, -1)
|
||||
self.assertEqual([], sequences)
|
||||
|
||||
def test_find_compactible_no_donors(self):
|
||||
broker = self._make_broker()
|
||||
shard_ranges = self._make_shard_ranges(
|
||||
(('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
|
||||
('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
|
||||
state=ShardRange.ACTIVE, object_count=10)
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
# shards exceed shrink threshold
|
||||
sequences = find_compactible_shard_sequences(broker, 10, 999, -1, -1)
|
||||
self.assertEqual([], sequences)
|
||||
# compacted shards would exceed merge size
|
||||
sequences = find_compactible_shard_sequences(broker, 11, 19, -1, -1)
|
||||
self.assertEqual([], sequences)
|
||||
# shards exceed merge size
|
||||
sequences = find_compactible_shard_sequences(broker, 11, 9, -1, -1)
|
||||
self.assertEqual([], sequences)
|
||||
# shards exceed merge size and shrink threshold
|
||||
sequences = find_compactible_shard_sequences(broker, 10, 9, -1, -1)
|
||||
self.assertEqual([], sequences)
|
||||
# shards exceed *zero'd* merge size and shrink threshold
|
||||
sequences = find_compactible_shard_sequences(broker, 0, 0, -1, -1)
|
||||
self.assertEqual([], sequences)
|
||||
# shards exceed *negative* merge size and shrink threshold
|
||||
sequences = find_compactible_shard_sequences(broker, -1, -2, -1, -1)
|
||||
self.assertEqual([], sequences)
|
||||
# weird case: shards object count less than threshold but compacted
|
||||
# shards would exceed merge size
|
||||
sequences = find_compactible_shard_sequences(broker, 20, 19, -1, -1)
|
||||
self.assertEqual([], sequences)
|
||||
|
||||
def test_find_compactible_four_donors_two_acceptors(self):
|
||||
small_ranges = (2, 3, 4, 7)
|
||||
broker = self._make_broker()
|
||||
@ -6412,6 +6442,28 @@ class TestSharderFunctions(BaseTestSharder):
|
||||
sequences = find_compactible_shard_sequences(broker, 10, 999, -1, -1)
|
||||
self.assertEqual([shard_ranges[:3], shard_ranges[3:]], sequences)
|
||||
|
||||
def test_find_compactible_eligible_states(self):
|
||||
# verify that compactible sequences only include shards in valid states
|
||||
broker = self._make_broker()
|
||||
shard_ranges = self._make_shard_ranges(
|
||||
(('', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'),
|
||||
('f', 'g'), ('g', 'h'), ('h', 'i'), ('i', 'j'), ('j', '')),
|
||||
state=[ShardRange.SHRINKING, ShardRange.ACTIVE, # ok, shrinking
|
||||
ShardRange.CREATED, # ineligible state
|
||||
ShardRange.ACTIVE, ShardRange.ACTIVE, # ok
|
||||
ShardRange.FOUND, # ineligible state
|
||||
ShardRange.SHARDED, # ineligible state
|
||||
ShardRange.ACTIVE, ShardRange.SHRINKING, # ineligible state
|
||||
ShardRange.SHARDING, # ineligible state
|
||||
])
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
own_sr = broker.get_own_shard_range()
|
||||
own_sr.update_state(ShardRange.SHARDED)
|
||||
broker.merge_shard_ranges(own_sr)
|
||||
sequences = find_compactible_shard_sequences(broker, 10, 999, -1, -1,
|
||||
include_shrinking=True)
|
||||
self.assertEqual([shard_ranges[:2], shard_ranges[3:5], ], sequences)
|
||||
|
||||
def test_find_compactible_max_shrinking(self):
|
||||
# verify option to limit the number of shrinking shards per acceptor
|
||||
broker = self._make_broker()
|
||||
@ -6508,21 +6560,31 @@ class TestSharderFunctions(BaseTestSharder):
|
||||
self.assertFalse(is_sharding_candidate(sr, 10))
|
||||
|
||||
def test_is_shrinking_candidate(self):
|
||||
states = (ShardRange.ACTIVE, ShardRange.SHRINKING)
|
||||
def do_check_true(state, ok_states):
|
||||
# shard range has 9 objects
|
||||
sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
|
||||
state=state, object_count=9)
|
||||
self.assertTrue(is_shrinking_candidate(sr, 10, 9, ok_states))
|
||||
|
||||
def do_check(state, object_count):
|
||||
do_check_true(ShardRange.ACTIVE, (ShardRange.ACTIVE,))
|
||||
do_check_true(ShardRange.ACTIVE,
|
||||
(ShardRange.ACTIVE, ShardRange.SHRINKING))
|
||||
do_check_true(ShardRange.SHRINKING,
|
||||
(ShardRange.ACTIVE, ShardRange.SHRINKING))
|
||||
|
||||
def do_check_false(state, object_count):
|
||||
states = (ShardRange.ACTIVE, ShardRange.SHRINKING)
|
||||
# shard range has 10 objects
|
||||
sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
|
||||
state=state, object_count=object_count)
|
||||
if object_count < 10:
|
||||
if state == ShardRange.ACTIVE and object_count < 10:
|
||||
self.assertTrue(is_shrinking_candidate(sr, 10))
|
||||
if state in states:
|
||||
self.assertTrue(is_shrinking_candidate(sr, 10, states))
|
||||
else:
|
||||
self.assertFalse(is_shrinking_candidate(sr, 10))
|
||||
self.assertFalse(is_shrinking_candidate(sr, 10, states))
|
||||
self.assertFalse(is_shrinking_candidate(sr, 10, 20))
|
||||
self.assertFalse(is_shrinking_candidate(sr, 10, 20, states))
|
||||
self.assertFalse(is_shrinking_candidate(sr, 10, 9))
|
||||
self.assertFalse(is_shrinking_candidate(sr, 10, 9, states))
|
||||
self.assertFalse(is_shrinking_candidate(sr, 20, 9))
|
||||
self.assertFalse(is_shrinking_candidate(sr, 20, 9, states))
|
||||
|
||||
for state in ShardRange.STATES:
|
||||
for object_count in (9, 10, 11):
|
||||
for object_count in (10, 11):
|
||||
with annotate_failure('%s %s' % (state, object_count)):
|
||||
do_check(state, object_count)
|
||||
do_check_false(state, object_count)
|
||||
|
Loading…
Reference in New Issue
Block a user