Add shrink candidates to recon dump
This patch adds shrinking candidates to the sharder's recon dump.
Shrinking candidates will always be SHARDED root containers, and they
get added to the candidate list if they have any compactible ranges,
that is to say, ranges that can be compacted into an upper neighbour.
The shrinking_candidates data comes out something like:

    {'found': 1,
     'top': [
        {'object_count': <some number>,
         'account': 'a',
         'meta_timestamp': <ts1>,
         'container': 'c',
         'file_size': <something>,
         'path': <something>,
         'root': <something>,
         'node_index': 0,
         'compactible_ranges': 2}]}

In this case 'compactible_ranges' is the number of donors that can be
shrunk in a single command.

Change-Id: I63fc9ae39e164c2ce82865d055527b52c86b5b2a
commit 13b17af45e
parent e8df26a2b5
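
As a usage sketch (not part of this patch), the dump can be read back
out of the container recon cache. The 'sharding_stats'/'sharding' keys
follow the dump_recon_cache() call in the diff below; the cache path
and the 'container.recon' filename are assumptions about a default
deployment:

    # Hypothetical reader for the shrinking candidate stats; the recon
    # cache location is an assumption, not something this patch defines.
    import json

    with open('/var/cache/swift/container.recon') as fp:
        recon = json.load(fp)

    candidates = recon['sharding_stats']['sharding']['shrinking_candidates']
    print('found: %d' % candidates['found'])
    for cand in candidates['top']:
        # each entry names a SHARDED root container and how many donor
        # ranges could be compacted in one pass
        print('%s/%s: %d compactible ranges' %
              (cand['account'], cand['container'],
               cand['compactible_ranges']))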
@@ -17,6 +17,7 @@ import errno
 import json
 import time
 from collections import defaultdict
+from operator import itemgetter
 from random import random
 
 import os
@@ -514,6 +515,7 @@ class ContainerSharder(ContainerReplicator):
             conf.get('cleave_row_batch_size', 10000))
         self.auto_shard = config_true_value(conf.get('auto_shard', False))
         self.sharding_candidates = []
+        self.shrinking_candidates = []
         self.recon_candidates_limit = int(
             conf.get('recon_candidates_limit', 5))
         self.broker_timeout = config_positive_int_value(
@@ -567,6 +569,7 @@ class ContainerSharder(ContainerReplicator):
         # stats are maintained under the 'sharding' key in self.stats
         self.stats['sharding'] = defaultdict(lambda: defaultdict(int))
         self.sharding_candidates = []
+        self.shrinking_candidates = []
 
     def _append_stat(self, category, key, value):
         if not self.stats['sharding'][category][key]:
@@ -615,11 +618,26 @@ class ContainerSharder(ContainerReplicator):
         self.sharding_candidates.append(
             self._make_stats_info(broker, node, own_shard_range))
 
-    def _transform_sharding_candidate_stats(self):
-        category = self.stats['sharding']['sharding_candidates']
-        candidates = self.sharding_candidates
+    def _identify_shrinking_candidate(self, broker, node):
+        sequences = find_compactible_shard_sequences(
+            broker, self.shrink_size, self.merge_size,
+            1, -1)
+        _, compactible_ranges = process_compactible_shard_sequences(
+            sequences, Timestamp.now())
+
+        if compactible_ranges:
+            own_shard_range = broker.get_own_shard_range()
+            shrink_candidate = self._make_stats_info(
+                broker, node, own_shard_range)
+            # The number of ranges/donors that can be shrunk if the
+            # tool is used with the current max_shrinking, max_expanding
+            # settings.
+            shrink_candidate['compactible_ranges'] = len(compactible_ranges)
+            self.shrinking_candidates.append(shrink_candidate)
+
+    def _transform_candidate_stats(self, category, candidates, sort_keys):
         category['found'] = len(candidates)
-        candidates.sort(key=lambda c: c['object_count'], reverse=True)
+        candidates.sort(key=itemgetter(*sort_keys), reverse=True)
         if self.recon_candidates_limit >= 0:
             category['top'] = candidates[:self.recon_candidates_limit]
         else:
@@ -667,7 +685,16 @@ class ContainerSharder(ContainerReplicator):
             msg = ' '.join(['%s:%s' % (k, str(stats[k])) for k in keys])
             self.logger.info('Since %s %s - %s', last_report, category, msg)
 
-        self._transform_sharding_candidate_stats()
+        # transform the sharding and shrinking candidate stats
+        # first sharding
+        category = self.stats['sharding']['sharding_candidates']
+        self._transform_candidate_stats(category, self.sharding_candidates,
+                                        sort_keys=('object_count',))
+
+        # next shrinking
+        category = self.stats['sharding']['shrinking_candidates']
+        self._transform_candidate_stats(category, self.shrinking_candidates,
+                                        sort_keys=('compactible_ranges',))
 
         dump_recon_cache(
             {'sharding_stats': self.stats,
@@ -1770,6 +1797,8 @@ class ContainerSharder(ContainerReplicator):
                       quote(broker.path))
 
         if state == SHARDED and broker.is_root_container():
+            # look for shrink stats
+            self._identify_shrinking_candidate(broker, node)
             if is_leader:
                 self._find_and_enable_shrinking_candidates(broker)
                 self._find_and_enable_sharding_candidates(broker)
@@ -469,6 +469,12 @@ class TestSharder(BaseTestSharder):
                     for call in fake_process_broker_calls[:2]]
             }
         })
+        fake_stats.update({
+            'shrinking_candidates': {
+                'found': 0,
+                'top': []
+            }
+        })
        check_recon(recon_data[0], sum(fake_periods[1:3]),
                    sum(fake_periods[:3]), fake_stats)
        # periodic stats report after first broker has been visited during
@@ -5464,6 +5470,231 @@ class TestSharder(BaseTestSharder):
         newish_ctx = get_context(id_newish, broker)
         self.assertEqual(newish_ctx, "")
 
+    def test_shrinking_candidate_recon_dump(self):
+        conf = {'recon_cache_path': self.tempdir,
+                'devices': self.tempdir}
+
+        shard_bounds = (
+            ('', 'd'), ('d', 'g'), ('g', 'l'), ('l', 'o'), ('o', 't'),
+            ('t', 'x'), ('x', ''))
+
+        with self._mock_sharder(conf) as sharder:
+            brokers = []
+            shard_ranges = []
+            C1, C2, C3 = 0, 1, 2
+
+            for container in ('c1', 'c2', 'c3'):
+                broker = self._make_broker(
+                    container=container, hash_=container + 'hash',
+                    device=sharder.ring.devs[0]['device'], part=0)
+                broker.update_metadata({'X-Container-Sysmeta-Sharding':
+                                        ('true', next(self.ts_iter).internal)})
+                my_sr = broker.get_own_shard_range()
+                my_sr.epoch = Timestamp.now()
+                broker.merge_shard_ranges([my_sr])
+                brokers.append(broker)
+                shard_ranges.append(self._make_shard_ranges(
+                    shard_bounds, state=ShardRange.ACTIVE,
+                    object_count=(DEFAULT_SHARD_CONTAINER_THRESHOLD / 2),
+                    timestamp=next(self.ts_iter)))
+
+            # we want c2 to have 2 shrink pairs
+            shard_ranges[C2][1].object_count = 0
+            shard_ranges[C2][3].object_count = 0
+            brokers[C2].merge_shard_ranges(shard_ranges[C2])
+            brokers[C2].set_sharding_state()
+            brokers[C2].set_sharded_state()
+
+            # we want c1 to have the same, but one can't be shrunk
+            shard_ranges[C1][1].object_count = 0
+            shard_ranges[C1][2].object_count = \
+                DEFAULT_SHARD_CONTAINER_THRESHOLD - 1
+            shard_ranges[C1][3].object_count = 0
+            brokers[C1].merge_shard_ranges(shard_ranges[C1])
+            brokers[C1].set_sharding_state()
+            brokers[C1].set_sharded_state()
+
+            # we want c3 to have more total shrinking donors than can be
+            # shrunk in one go.
+            shard_ranges[C3][0].object_count = 0
+            shard_ranges[C3][1].object_count = 0
+            shard_ranges[C3][2].object_count = 0
+            shard_ranges[C3][3].object_count = 0
+            shard_ranges[C3][4].object_count = 0
+            shard_ranges[C3][5].object_count = 0
+            brokers[C3].merge_shard_ranges(shard_ranges[C3])
+            brokers[C3].set_sharding_state()
+            brokers[C3].set_sharded_state()
+
+            node = {'ip': '10.0.0.0', 'replication_ip': '10.0.1.0',
+                    'port': 1000, 'replication_port': 1100,
+                    'device': 'sda', 'zone': 0, 'region': 0, 'id': 1,
+                    'index': 0}
+
+            for broker in brokers:
+                sharder._identify_shrinking_candidate(broker, node)
+
+            sharder._report_stats()
+            expected_shrinking_candidates_data = {
+                'found': 3,
+                'top': [
+                    {
+                        'object_count': mock.ANY,
+                        'account': brokers[C3].account,
+                        'meta_timestamp': mock.ANY,
+                        'container': brokers[C3].container,
+                        'file_size': os.stat(brokers[C3].db_file).st_size,
+                        'path': brokers[C3].db_file,
+                        'root': brokers[C3].path,
+                        'node_index': 0,
+                        'compactible_ranges': 3
+                    }, {
+                        'object_count': mock.ANY,
+                        'account': brokers[C2].account,
+                        'meta_timestamp': mock.ANY,
+                        'container': brokers[C2].container,
+                        'file_size': os.stat(brokers[C2].db_file).st_size,
+                        'path': brokers[C2].db_file,
+                        'root': brokers[C2].path,
+                        'node_index': 0,
+                        'compactible_ranges': 2
+                    }, {
+                        'object_count': mock.ANY,
+                        'account': brokers[C1].account,
+                        'meta_timestamp': mock.ANY,
+                        'container': brokers[C1].container,
+                        'file_size': os.stat(brokers[C1].db_file).st_size,
+                        'path': brokers[C1].db_file,
+                        'root': brokers[C1].path,
+                        'node_index': 0,
+                        'compactible_ranges': 1
+                    }
+                ]}
+            self._assert_recon_stats(expected_shrinking_candidates_data,
+                                     sharder, 'shrinking_candidates')
+
+            # check shrinking stats are reset
+            sharder._zero_stats()
+            for broker in brokers:
+                sharder._identify_shrinking_candidate(broker, node)
+            sharder._report_stats()
+            self._assert_recon_stats(expected_shrinking_candidates_data,
+                                     sharder, 'shrinking_candidates')
+
+            # set some ranges to shrinking and check that stats are updated;
+            # in this case the container C2 no longer has any shrinkable
+            # ranges and no longer appears in stats
+            def shrink_actionable_ranges(broker):
+                compactible = find_compactible_shard_sequences(
+                    broker, sharder.shrink_size, sharder.merge_size, 1, -1)
+                self.assertNotEqual([], compactible)
+                timestamp = next(self.ts_iter)
+                acceptors, donors = process_compactible_shard_sequences(
+                    compactible, timestamp)
+                finalize_shrinking(broker, acceptors, donors, timestamp)
+
+            shrink_actionable_ranges(brokers[C2])
+            sharder._zero_stats()
+            for broker in brokers:
+                sharder._identify_shrinking_candidate(broker, node)
+            sharder._report_stats()
+            expected_shrinking_candidates_data = {
+                'found': 2,
+                'top': [
+                    {
+                        'object_count': mock.ANY,
+                        'account': brokers[C3].account,
+                        'meta_timestamp': mock.ANY,
+                        'container': brokers[C3].container,
+                        'file_size': os.stat(brokers[C3].db_file).st_size,
+                        'path': brokers[C3].db_file,
+                        'root': brokers[C3].path,
+                        'node_index': 0,
+                        'compactible_ranges': 3
+                    }, {
+                        'object_count': mock.ANY,
+                        'account': brokers[C1].account,
+                        'meta_timestamp': mock.ANY,
+                        'container': brokers[C1].container,
+                        'file_size': os.stat(brokers[C1].db_file).st_size,
+                        'path': brokers[C1].db_file,
+                        'root': brokers[C1].path,
+                        'node_index': 0,
+                        'compactible_ranges': 1
+                    }
+                ]}
+            self._assert_recon_stats(expected_shrinking_candidates_data,
+                                     sharder, 'shrinking_candidates')
+
+            # set some ranges to shrinking and check that stats are updated;
+            # in this case the container C3 no longer has any actionable
+            # ranges and no longer appears in stats
+            shrink_actionable_ranges(brokers[C3])
+            sharder._zero_stats()
+            for broker in brokers:
+                sharder._identify_shrinking_candidate(broker, node)
+            sharder._report_stats()
+            expected_shrinking_candidates_data = {
+                'found': 1,
+                'top': [
+                    {
+                        'object_count': mock.ANY,
+                        'account': brokers[C1].account,
+                        'meta_timestamp': mock.ANY,
+                        'container': brokers[C1].container,
+                        'file_size': os.stat(brokers[C1].db_file).st_size,
+                        'path': brokers[C1].db_file,
+                        'root': brokers[C1].path,
+                        'node_index': 0,
+                        'compactible_ranges': 1
+                    }
+                ]}
+            self._assert_recon_stats(expected_shrinking_candidates_data,
+                                     sharder, 'shrinking_candidates')
+
+            # set some ranges to shrunk in C3 so that other sequences become
+            # compactible
+            now = next(self.ts_iter)
+            shard_ranges = brokers[C3].get_shard_ranges()
+            for (donor, acceptor) in zip(shard_ranges, shard_ranges[1:]):
+                if donor.state == ShardRange.SHRINKING:
+                    donor.update_state(ShardRange.SHRUNK, state_timestamp=now)
+                    donor.set_deleted(timestamp=now)
+                    acceptor.lower = donor.lower
+                    acceptor.timestamp = now
+            brokers[C3].merge_shard_ranges(shard_ranges)
+            sharder._zero_stats()
+            for broker in brokers:
+                sharder._identify_shrinking_candidate(broker, node)
+            sharder._report_stats()
+            expected_shrinking_candidates_data = {
+                'found': 2,
+                'top': [
+                    {
+                        'object_count': mock.ANY,
+                        'account': brokers[C3].account,
+                        'meta_timestamp': mock.ANY,
+                        'container': brokers[C3].container,
+                        'file_size': os.stat(brokers[C3].db_file).st_size,
+                        'path': brokers[C3].db_file,
+                        'root': brokers[C3].path,
+                        'node_index': 0,
+                        'compactible_ranges': 2
+                    }, {
+                        'object_count': mock.ANY,
+                        'account': brokers[C1].account,
+                        'meta_timestamp': mock.ANY,
+                        'container': brokers[C1].container,
+                        'file_size': os.stat(brokers[C1].db_file).st_size,
+                        'path': brokers[C1].db_file,
+                        'root': brokers[C1].path,
+                        'node_index': 0,
+                        'compactible_ranges': 1
+                    }
+                ]}
+            self._assert_recon_stats(expected_shrinking_candidates_data,
+                                     sharder, 'shrinking_candidates')
 
 
 class TestCleavingContext(BaseTestSharder):
     def test_init(self):