sharder: Ignore already shrinking sequences when compacting
If a sequence of shard ranges is already shrinking, then in some circumstances we do not want to report it as a candidate for shrinking. For backwards compatibility, already shrinking sequences can optionally be included in the return value of find_compactible_shard_sequences.

Also refactor to add an is_shrinking_candidate() function.

Change-Id: Ifa20b7c08aba7254185918dfcee69e8206f51cea
parent 21a01e1c05 · commit e8df26a2b5
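As a quick orientation before the diff, here is a minimal sketch of the behaviour this change introduces. The helper body is copied from the diff below; the commented call to find_compactible_shard_sequences() only illustrates the new keyword and assumes a `broker` and threshold values set up elsewhere.

# Illustrative sketch based on the functions changed in this commit.
from swift.common.utils import ShardRange


def is_shrinking_candidate(shard_range, shrink_threshold, states=None):
    # a shard range may shrink if it is in an eligible state (ACTIVE by
    # default) and holds fewer objects than the shrink threshold
    states = states or (ShardRange.ACTIVE,)
    return (shard_range.state in states and
            shard_range.object_count < shrink_threshold)


# By default find_compactible_shard_sequences() now skips sequences that are
# already shrinking; callers that still want them (e.g.
# find_shrinking_candidates and the sharder's own compaction pass) opt in:
#
#   sequences = find_compactible_shard_sequences(
#       broker, shrink_threshold, merge_size, max_shrinking, max_expanding,
#       include_shrinking=True)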
@@ -124,6 +124,12 @@ def is_sharding_candidate(shard_range, threshold):
            shard_range.object_count >= threshold)


def is_shrinking_candidate(shard_range, shrink_threshold, states=None):
    states = states or (ShardRange.ACTIVE,)
    return (shard_range.state in states and
            shard_range.object_count < shrink_threshold)


def find_sharding_candidates(broker, threshold, shard_ranges=None):
    # this should only execute on root containers; the goal is to find
    # large shard containers that should be sharded.
@@ -148,7 +154,8 @@ def find_shrinking_candidates(broker, shrink_threshold, merge_size):
    merge_pairs = {}
    # restrict search to sequences with one donor
    results = find_compactible_shard_sequences(broker, shrink_threshold,
                                               merge_size, 1, -1)
                                               merge_size, 1, -1,
                                               include_shrinking=True)
    for sequence in results:
        # map acceptor -> donor list
        merge_pairs[sequence[-1]] = sequence[-2]
@@ -159,7 +166,8 @@ def find_compactible_shard_sequences(broker,
                                     shrink_threshold,
                                     merge_size,
                                     max_shrinking,
                                     max_expanding):
                                     max_expanding,
                                     include_shrinking=False):
    """
    Find sequences of shard ranges that could be compacted into a single
    acceptor shard range.
@@ -201,12 +209,19 @@ def find_compactible_shard_sequences(broker,
        #  - the total number of objects in the sequence has reached the
        #    merge_size
        if (sequence and
                (sequence[-1].object_count >= shrink_threshold or
                (not is_shrinking_candidate(
                    sequence[-1], shrink_threshold,
                    states=(ShardRange.ACTIVE, ShardRange.SHRINKING)) or
                 0 < max_shrinking < len(sequence) or
                 sequence.object_count >= merge_size)):
            return True
        return False

    def sequence_shrinking(sequence):
        # check for a sequence that is already shrinking
        return (sequence and any([sr.state == ShardRange.SHRINKING
                                  for sr in sequence]))

    def find_compactible_sequence(shard_ranges_todo):
        compactible_sequence = ShardRangeList()
        object_count = 0
@@ -237,8 +252,8 @@ def find_compactible_shard_sequences(broker,

    compactible_sequences = []
    index = 0
    while ((max_expanding < 0 or
            len(compactible_sequences) < max_expanding) and
    expanding = 0
    while ((max_expanding < 0 or expanding < max_expanding) and
           index < len(shard_ranges)):
        sequence, consumed = find_compactible_sequence(shard_ranges[index:])
        index += consumed
@@ -254,10 +269,17 @@ def find_compactible_shard_sequences(broker,
            # when *all* the remaining shard ranges can be simultaneously
            # shrunk to the root.
            sequence.append(own_shard_range)

        if len(sequence) < 2 or sequence[-1].state not in (ShardRange.ACTIVE,
                                                           ShardRange.SHARDED):
            # this sequence doesn't end with a suitable acceptor shard range
            continue

        # all valid sequences are counted against the max_expanding allowance
        # even if the sequence is already shrinking
        expanding += 1
        if not sequence_shrinking(sequence) or include_shrinking:
            compactible_sequences.append(sequence)
        elif len(sequence) > 1 and sequence[-1].state == ShardRange.ACTIVE:
            compactible_sequences.append(sequence)
        # else: this sequence doesn't end with a suitable acceptor shard range

    return compactible_sequences
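The hunk above makes every valid sequence count against the max_expanding allowance, while already-shrinking sequences are only returned when include_shrinking is set. Below is a self-contained, simplified sketch of that selection rule; plain strings stand in for ShardRange states and select() is a hypothetical name for the loop, not the sharder's actual implementation.

# Simplified illustration only.
SHRINKING, ACTIVE = 'shrinking', 'active'


def sequence_shrinking(sequence):
    # True if any shard range in the sequence is already shrinking
    return bool(sequence) and any(state == SHRINKING for state in sequence)


def select(sequences, max_expanding, include_shrinking=False):
    selected = []
    expanding = 0
    for seq in sequences:
        if 0 <= max_expanding <= expanding:
            break  # max_expanding < 0 means unlimited
        # every valid sequence consumes the allowance...
        expanding += 1
        # ...but already-shrinking ones are only reported on request
        if not sequence_shrinking(seq) or include_shrinking:
            selected.append(seq)
    return selected


# the first sequence is already shrinking, so with max_expanding=1 the
# allowance is used up and nothing is returned...
print(select([[SHRINKING, ACTIVE], [ACTIVE, ACTIVE]], max_expanding=1))  # []
# ...unless shrinking sequences are explicitly requested
print(select([[SHRINKING, ACTIVE], [ACTIVE, ACTIVE]],
             max_expanding=1, include_shrinking=True))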
@@ -1627,7 +1649,8 @@ class ContainerSharder(ContainerReplicator):
            return

        compactible_sequences = find_compactible_shard_sequences(
            broker, self.shrink_size, self.merge_size, 1, -1)
            broker, self.shrink_size, self.merge_size, 1, -1,
            include_shrinking=True)
        self.logger.debug('Found %s compactible sequences of length(s) %s' %
                          (len(compactible_sequences),
                           [len(s) for s in compactible_sequences]))
@@ -759,7 +759,7 @@ class TestManageShardRanges(unittest.TestCase):
        broker.merge_shard_ranges(shard_ranges)
        self._move_broker_to_sharded_state(broker)

        def do_compact():
        def do_compact(expect_msg):
            out = StringIO()
            err = StringIO()
            with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
@@ -768,12 +768,11 @@ class TestManageShardRanges(unittest.TestCase):
            err_lines = err.getvalue().split('\n')
            self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
            out_lines = out.getvalue().split('\n')
            self.assertEqual(
                ['Updated 5 shard sequences for compaction.'],
                out_lines[:1])
            self.assertEqual([expect_msg], out_lines[:1])
            return broker.get_shard_ranges()

        updated_ranges = do_compact()
        updated_ranges = do_compact(
            'Updated 5 shard sequences for compaction.')
        for acceptor in (1, 3, 5, 7, 9):
            shard_ranges[acceptor].lower = shard_ranges[acceptor - 1].lower
        self.assertEqual(shard_ranges, updated_ranges)
@@ -781,7 +780,7 @@ class TestManageShardRanges(unittest.TestCase):
                         [sr.state for sr in updated_ranges])

        # check idempotency
        updated_ranges = do_compact()
        updated_ranges = do_compact('No shards identified for compaction.')
        self.assertEqual(shard_ranges, updated_ranges)
        self.assertEqual([ShardRange.SHRINKING, ShardRange.ACTIVE] * 5,
                         [sr.state for sr in updated_ranges])
@@ -795,7 +794,7 @@ class TestManageShardRanges(unittest.TestCase):
        broker.merge_shard_ranges(shard_ranges)
        self._move_broker_to_sharded_state(broker)

        def do_compact():
        def do_compact(expect_msg):
            out = StringIO()
            err = StringIO()
            with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
@@ -805,12 +804,11 @@ class TestManageShardRanges(unittest.TestCase):
            err_lines = err.getvalue().split('\n')
            self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
            out_lines = out.getvalue().split('\n')
            self.assertEqual(
                ['Updated 2 shard sequences for compaction.'],
                out_lines[:1])
            self.assertEqual([expect_msg], out_lines[:1])
            return broker.get_shard_ranges()

        updated_ranges = do_compact()
        updated_ranges = do_compact(
            'Updated 2 shard sequences for compaction.')
        shard_ranges[7].lower = shard_ranges[0].lower
        shard_ranges[9].lower = shard_ranges[8].lower
        self.assertEqual(shard_ranges, updated_ranges)
@@ -819,7 +817,7 @@ class TestManageShardRanges(unittest.TestCase):
                         [sr.state for sr in updated_ranges])

        # check idempotency
        updated_ranges = do_compact()
        updated_ranges = do_compact('No shards identified for compaction.')
        self.assertEqual(shard_ranges, updated_ranges)
        self.assertEqual([ShardRange.SHRINKING] * 7 + [ShardRange.ACTIVE] +
                         [ShardRange.SHRINKING] + [ShardRange.ACTIVE],
@@ -833,21 +831,24 @@ class TestManageShardRanges(unittest.TestCase):
            sr.update_state(ShardRange.ACTIVE)
        broker.merge_shard_ranges(shard_ranges)
        self._move_broker_to_sharded_state(broker)
        out = StringIO()
        err = StringIO()
        # note: max_shrinking is set to 3 so that there is opportunity for more
        # than 2 acceptors
        with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
            ret = main([broker.db_file, 'compact', '--yes',
                        '--max-shrinking', '3', '--max-expanding', '2'])
        self.assertEqual(0, ret)
        err_lines = err.getvalue().split('\n')
        self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
        out_lines = out.getvalue().split('\n')
        self.assertEqual(
            ['Updated 2 shard sequences for compaction.'],
            out_lines[:1])
        updated_ranges = broker.get_shard_ranges()

        def do_compact(expect_msg):
            out = StringIO()
            err = StringIO()
            # note: max_shrinking is set to 3 so that there is opportunity for
            # more than 2 acceptors
            with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
                ret = main([broker.db_file, 'compact', '--yes',
                            '--max-shrinking', '3', '--max-expanding', '2'])
            self.assertEqual(0, ret)
            err_lines = err.getvalue().split('\n')
            self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
            out_lines = out.getvalue().split('\n')
            self.assertEqual([expect_msg], out_lines[:1])
            return broker.get_shard_ranges()

        updated_ranges = do_compact(
            'Updated 2 shard sequences for compaction.')
        shard_ranges[3].lower = shard_ranges[0].lower
        shard_ranges[7].lower = shard_ranges[4].lower
        self.assertEqual(shard_ranges, updated_ranges)
@@ -855,6 +856,14 @@ class TestManageShardRanges(unittest.TestCase):
            [ShardRange.SHRINKING] * 3 + [ShardRange.ACTIVE] * 3,
            [sr.state for sr in updated_ranges])

        # check idempotency - no more sequences found while existing sequences
        # are shrinking
        updated_ranges = do_compact('No shards identified for compaction.')
        self.assertEqual(shard_ranges, updated_ranges)
        self.assertEqual([ShardRange.SHRINKING] * 3 + [ShardRange.ACTIVE] +
                         [ShardRange.SHRINKING] * 3 + [ShardRange.ACTIVE] * 3,
                         [sr.state for sr in updated_ranges])

    def test_compact_expansion_limit(self):
        # verify option to limit the size of each acceptor after compaction
        broker = self._make_broker()
@@ -41,7 +41,8 @@ from swift.container.sharder import ContainerSharder, sharding_enabled, \
    CleavingContext, DEFAULT_SHARD_SHRINK_POINT, \
    DEFAULT_SHARD_CONTAINER_THRESHOLD, finalize_shrinking, \
    find_shrinking_candidates, process_compactible_shard_sequences, \
    find_compactible_shard_sequences
    find_compactible_shard_sequences, is_shrinking_candidate, \
    is_sharding_candidate
from swift.common.utils import ShardRange, Timestamp, hash_path, \
    encode_timestamps, parse_db_filename, quorum_size, Everything, md5
from test import annotate_failure
@@ -5904,13 +5905,15 @@ class TestSharderFunctions(BaseTestSharder):
        threshold = (DEFAULT_SHARD_SHRINK_POINT *
                     DEFAULT_SHARD_CONTAINER_THRESHOLD / 100)
        shard_ranges = self._make_shard_ranges(
            shard_bounds, state=ShardRange.ACTIVE, object_count=threshold)
            shard_bounds, state=ShardRange.ACTIVE, object_count=threshold,
            timestamp=next(self.ts_iter))
        broker.merge_shard_ranges(shard_ranges)
        pairs = find_shrinking_candidates(broker, threshold, threshold * 4)
        self.assertEqual({}, pairs)

        # one range just below threshold
        shard_ranges[0].update_meta(threshold - 1, 0)
        shard_ranges[0].update_meta(threshold - 1, 0,
                                    meta_timestamp=next(self.ts_iter))
        broker.merge_shard_ranges(shard_ranges[0])
        pairs = find_shrinking_candidates(broker, threshold, threshold * 4)
        self.assertEqual(1, len(pairs), pairs)
@@ -5919,19 +5922,35 @@ class TestSharderFunctions(BaseTestSharder):
        self.assertEqual(shard_ranges[0], donor)

        # two ranges just below threshold
        shard_ranges[2].update_meta(threshold - 1, 0)
        shard_ranges[2].update_meta(threshold - 1, 0,
                                    meta_timestamp=next(self.ts_iter))
        broker.merge_shard_ranges(shard_ranges[2])
        pairs = find_shrinking_candidates(broker, threshold, threshold * 4)

        # shenanigans to work around dicts with ShardRanges keys not comparing
        acceptors = []
        donors = []
        for acceptor, donor in pairs.items():
            acceptors.append(acceptor)
            donors.append(donor)
        acceptors.sort(key=ShardRange.sort_key)
        donors.sort(key=ShardRange.sort_key)
        self.assertEqual([shard_ranges[1], shard_ranges[3]], acceptors)
        self.assertEqual([shard_ranges[0], shard_ranges[2]], donors)
        def check_pairs(pairs):
            acceptors = []
            donors = []
            for acceptor, donor in pairs.items():
                acceptors.append(acceptor)
                donors.append(donor)
            acceptors.sort(key=ShardRange.sort_key)
            donors.sort(key=ShardRange.sort_key)
            self.assertEqual([shard_ranges[1], shard_ranges[3]], acceptors)
            self.assertEqual([shard_ranges[0], shard_ranges[2]], donors)

        check_pairs(pairs)

        # repeat call after broker is updated and expect same pairs
        shard_ranges[0].update_state(ShardRange.SHRINKING, next(self.ts_iter))
        shard_ranges[2].update_state(ShardRange.SHRINKING, next(self.ts_iter))
        shard_ranges[1].lower = shard_ranges[0].lower
        shard_ranges[1].timestamp = next(self.ts_iter)
        shard_ranges[3].lower = shard_ranges[2].lower
        shard_ranges[3].timestamp = next(self.ts_iter)
        broker.merge_shard_ranges(shard_ranges)
        pairs = find_shrinking_candidates(broker, threshold, threshold * 4)
        check_pairs(pairs)

    def test_finalize_shrinking(self):
        broker = self._make_broker()
@@ -6072,15 +6091,26 @@ class TestSharderFunctions(BaseTestSharder):
        # acceptor
        broker = self._make_broker()
        shard_ranges = self._make_shard_ranges(
            (('', ''),),
            state=ShardRange.ACTIVE)
            (('', ''),), state=ShardRange.ACTIVE, timestamp=next(self.ts_iter))
        broker.merge_shard_ranges(shard_ranges)
        own_sr = broker.get_own_shard_range()
        own_sr.update_state(ShardRange.SHARDED)
        own_sr.update_state(ShardRange.SHARDED, next(self.ts_iter))
        broker.merge_shard_ranges(own_sr)
        sequences = find_compactible_shard_sequences(broker, 10, 999, -1, -1)
        self.assertEqual([shard_ranges + [own_sr]], sequences)

        # update broker with donor/acceptor
        shard_ranges[0].update_state(ShardRange.SHRINKING, next(self.ts_iter))
        own_sr.update_state(ShardRange.ACTIVE, next(self.ts_iter))
        broker.merge_shard_ranges([shard_ranges[0], own_sr])
        # we don't find the same sequence again...
        sequences = find_compactible_shard_sequences(broker, 10, 999, -1, -1)
        self.assertEqual([], sequences)
        # ...unless explicitly requesting it
        sequences = find_compactible_shard_sequences(broker, 10, 999, -1, -1,
                                                     include_shrinking=True)
        self.assertEqual([shard_ranges + [own_sr]], sequences)

    def test_find_compactible_donors_but_no_suitable_acceptor(self):
        # if shard ranges are already shrinking, check that the final one is
        # not made into an acceptor if a suitable adjacent acceptor is not
@@ -6140,6 +6170,26 @@ class TestSharderFunctions(BaseTestSharder):
        self.assertEqual(
            [shard_ranges[:4], shard_ranges[4:8], shard_ranges[8:]], sequences)

        # commit the first two sequences to the broker
        for sr in shard_ranges[:3] + shard_ranges[4:7]:
            sr.update_state(ShardRange.SHRINKING,
                            state_timestamp=next(self.ts_iter))
        shard_ranges[3].lower = shard_ranges[0].lower
        shard_ranges[3].timestamp = next(self.ts_iter)
        shard_ranges[7].lower = shard_ranges[4].lower
        shard_ranges[7].timestamp = next(self.ts_iter)
        broker.merge_shard_ranges(shard_ranges)
        # we don't find them again...
        sequences = find_compactible_shard_sequences(broker, 10, 999, 3, 2)
        self.assertEqual([], sequences)
        # ...unless requested explicitly
        sequences = find_compactible_shard_sequences(broker, 10, 999, 3, 2,
                                                     include_shrinking=True)
        self.assertEqual([shard_ranges[:4], shard_ranges[4:8]], sequences)
        # we could find another if max_expanding is increased
        sequences = find_compactible_shard_sequences(broker, 10, 999, 3, 3)
        self.assertEqual([shard_ranges[8:]], sequences)

    def test_find_compactible_shrink_threshold(self):
        # verify option to set the shrink threshold for compaction;
        broker = self._make_broker()
@@ -6173,3 +6223,34 @@ class TestSharderFunctions(BaseTestSharder):
        broker.merge_shard_ranges(shard_ranges)
        sequences = find_compactible_shard_sequences(broker, 10, 33, -1, -1)
        self.assertEqual([shard_ranges[:4], shard_ranges[7:]], sequences)

    def test_is_sharding_candidate(self):
        for state in ShardRange.STATES:
            for object_count in (9, 10, 11):
                sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
                                state=state, object_count=object_count)
                with annotate_failure('%s %s' % (state, object_count)):
                    if state == ShardRange.ACTIVE and object_count >= 10:
                        self.assertTrue(is_sharding_candidate(sr, 10))
                    else:
                        self.assertFalse(is_sharding_candidate(sr, 10))

    def test_is_shrinking_candidate(self):
        states = (ShardRange.ACTIVE, ShardRange.SHRINKING)

        def do_check(state, object_count):
            sr = ShardRange('.shards_a/c', next(self.ts_iter), '', '',
                            state=state, object_count=object_count)
            if object_count < 10:
                if state == ShardRange.ACTIVE and object_count < 10:
                    self.assertTrue(is_shrinking_candidate(sr, 10))
                if state in states:
                    self.assertTrue(is_shrinking_candidate(sr, 10, states))
            else:
                self.assertFalse(is_shrinking_candidate(sr, 10))
                self.assertFalse(is_shrinking_candidate(sr, 10, states))

        for state in ShardRange.STATES:
            for object_count in (9, 10, 11):
                with annotate_failure('%s %s' % (state, object_count)):
                    do_check(state, object_count)