Add a config file option to swift-manage-shard-ranges

While working on the shrinking recon drops, we want to display numbers
that directly relate to how the tool should behave. But currently all
options of the s-m-s-r tool are driven by CLI options.

This creates a disconnect: defining what should be used in the sharder
and in the tool via separate options is bound to fail. It would be much
better to be able to define the required default options for your
environment in one place that both the sharder and the tool could use.

This patch does some refactoring and adds max_shrinking and
max_expanding options to the sharding config, as well as a
--config option to the tool.

The --config option expects a config file with a '[container-sharder]'
section. It only supports the shard options:
 - max_shrinking
 - max_expanding
 - shard_container_threshold
 - shard_shrink_point
 - shard_merge_point

The latter 2 are used to generate the s-m-s-r's:
 - shrink_threshold
 - expansion_limit
 - rows_per_shard

CLI arguments take precedence over values from the config file.

Change-Id: I4d0147ce284a1a318b3cd88975e060956d186aec
This commit is contained in:
Matthew Oliver 2021-02-09 15:30:16 +11:00
parent b8aefd750e
commit fb186f6710
8 changed files with 292 additions and 40 deletions

View File

@ -326,7 +326,7 @@ shard_container_threshold 1000000 When auto-sharding is
shrinking and merging
shard containers.
shard_shrink_point 5 When auto-sharding is
shard_shrink_point 10 When auto-sharding is
enabled this defines the
object count below which
a 'donor' shard container
@ -338,7 +338,7 @@ shard_shrink_point 5 When auto-sharding is
percentage of
shard_container_threshold
e.g. the default value of
5 means 5% of the
10 means 10% of the
shard_container_threshold.
shard_shrink_merge_point 75 When auto-sharding is

View File

@ -377,7 +377,7 @@ use = egg:swift#xprofile
# another 'acceptor' shard container. shard_shrink_point is a percentage of
# shard_container_threshold e.g. the default value of 5 means 5% of the
# shard_container_threshold.
# shard_shrink_point = 5
# shard_shrink_point = 10
#
# When auto-sharding is enabled shard_shrink_merge_point defines the maximum
# allowed size of an acceptor shard container after having a donor merged into
@ -407,6 +407,16 @@ use = egg:swift#xprofile
# sharding container and merged to a shard container during cleaving.
# cleave_row_batch_size = 10000
#
# max_expanding defines the maximum number of shards that could be expanded in a
# single cycle of the sharder. Defaults to unlimited (-1).
# max_expanding = -1
#
# max_shrinking defines the maximum number of shards that should be shrunk into
# each expanding shard. Defaults to 1.
# NOTE: Using values greater than 1 may result in temporary gaps in object listings
# until all selected shards have shrunk.
# max_shrinking = 1
#
# Defines the number of successfully replicated shard dbs required when
# cleaving a previously uncleaved shard range before the sharder will progress
# to the next shard range. The value should be less than or equal to the

View File

@ -164,16 +164,19 @@ import time
from six.moves import input
from swift.common.utils import Timestamp, get_logger, ShardRange
from swift.common.utils import Timestamp, get_logger, ShardRange, readconf, \
config_percent_value, config_positive_int_value
from swift.container.backend import ContainerBroker, UNSHARDED
from swift.container.sharder import make_shard_ranges, sharding_enabled, \
CleavingContext, process_compactible_shard_sequences, \
find_compactible_shard_sequences, find_overlapping_ranges
find_compactible_shard_sequences, find_overlapping_ranges, \
DEFAULT_MAX_SHRINKING, DEFAULT_MAX_EXPANDING, \
DEFAULT_SHARD_CONTAINER_THRESHOLD, DEFAULT_SHARD_SHRINK_POINT, \
DEFAULT_SHARD_MERGE_POINT
DEFAULT_ROWS_PER_SHARD = 500000
DEFAULT_SHRINK_THRESHOLD = 10000
DEFAULT_MAX_SHRINKING = 1
DEFAULT_MAX_EXPANDING = -1
DEFAULT_ROWS_PER_SHARD = DEFAULT_SHARD_CONTAINER_THRESHOLD // 2
DEFAULT_SHRINK_THRESHOLD = DEFAULT_SHARD_CONTAINER_THRESHOLD * \
config_percent_value(DEFAULT_SHARD_SHRINK_POINT)
def _print_shard_range(sr, level=0):
@ -489,7 +492,7 @@ def _positive_int(arg):
def _add_find_args(parser):
parser.add_argument('rows_per_shard', nargs='?', type=int,
default=DEFAULT_ROWS_PER_SHARD)
default=None)
def _add_replace_args(parser):
@ -516,6 +519,9 @@ def _add_enable_args(parser):
def _make_parser():
parser = argparse.ArgumentParser(description='Manage shard ranges')
parser.add_argument('container_db')
parser.add_argument('--config', dest='conf_file', required=False,
help='Path to config file with [container-sharder] '
'section')
parser.add_argument('--verbose', '-v', action='count', default=0,
help='Increase output verbosity')
subparsers = parser.add_subparsers(
@ -589,13 +595,13 @@ def _make_parser():
help='Apply shard range changes to broker without prompting.')
compact_parser.add_argument('--shrink-threshold', nargs='?',
type=_positive_int,
default=DEFAULT_SHRINK_THRESHOLD,
default=None,
help='The number of rows below which a shard '
'can qualify for shrinking. Defaults to '
'%d' % DEFAULT_SHRINK_THRESHOLD)
compact_parser.add_argument('--expansion-limit', nargs='?',
type=_positive_int,
default=DEFAULT_ROWS_PER_SHARD,
default=None,
help='Maximum number of rows for an expanding '
'shard to have after compaction has '
'completed. Defaults to %d' %
@ -608,7 +614,7 @@ def _make_parser():
# temporary gap(s) in object listings where the shrunk donors are missing.
compact_parser.add_argument('--max-shrinking', nargs='?',
type=_positive_int,
default=DEFAULT_MAX_SHRINKING,
default=None,
help='Maximum number of shards that should be '
'shrunk into each expanding shard. '
'Defaults to 1. Using values greater '
@ -617,7 +623,7 @@ def _make_parser():
'shards have shrunk.')
compact_parser.add_argument('--max-expanding', nargs='?',
type=_positive_int,
default=DEFAULT_MAX_EXPANDING,
default=None,
help='Maximum number of shards that should be '
'expanded. Defaults to unlimited.')
compact_parser.set_defaults(func=compact_shard_ranges)
@ -637,7 +643,47 @@ def main(args=None):
parser.print_help()
print('\nA sub-command is required.')
return 1
logger = get_logger({}, name='ContainerBroker', log_to_console=True)
conf = {}
rows_per_shard = DEFAULT_ROWS_PER_SHARD
shrink_threshold = DEFAULT_SHRINK_THRESHOLD
expansion_limit = DEFAULT_ROWS_PER_SHARD
if args.conf_file:
try:
conf = readconf(args.conf_file, 'container-sharder')
shard_container_threshold = config_positive_int_value(conf.get(
'shard_container_threshold',
DEFAULT_SHARD_CONTAINER_THRESHOLD))
if shard_container_threshold:
rows_per_shard = shard_container_threshold // 2
shrink_threshold = int(
shard_container_threshold * config_percent_value(
conf.get('shard_shrink_point',
DEFAULT_SHARD_SHRINK_POINT)))
expansion_limit = int(
shard_container_threshold * config_percent_value(
conf.get('shard_shrink_merge_point',
DEFAULT_SHARD_MERGE_POINT)))
except Exception as exc:
print('Error opening config file %s: %s' % (args.conf_file, exc),
file=sys.stderr)
return 2
# seems having sub parsers mean sometimes an arg wont exist in the args
# namespace. But we can check if it is with the 'in' statement.
if "max_shrinking" in args and args.max_shrinking is None:
args.max_shrinking = int(conf.get(
"max_shrinking", DEFAULT_MAX_SHRINKING))
if "max_expanding" in args and args.max_expanding is None:
args.max_expanding = int(conf.get(
"max_expanding", DEFAULT_MAX_EXPANDING))
if "shrink_threshold" in args and args.shrink_threshold is None:
args.shrink_threshold = shrink_threshold
if "expansion_limit" in args and args.expansion_limit is None:
args.expansion_limit = expansion_limit
if "rows_per_shard" in args and args.rows_per_shard is None:
args.rows_per_shard = rows_per_shard
logger = get_logger(conf, name='ContainerBroker', log_to_console=True)
broker = ContainerBroker(os.path.realpath(args.container_db),
logger=logger, skip_commits=True)
try:

View File

@ -479,6 +479,13 @@ def config_auto_int_value(value, default):
return value
def config_percent_value(value):
try:
return config_float_value(value, 0, 100) / 100.0
except ValueError as err:
raise ValueError("%s: %s" % (str(err), value))
def append_underscore(prefix):
if prefix and not prefix.endswith('_'):
prefix += '_'

View File

@ -35,9 +35,8 @@ from swift.common.ring.utils import is_local_device
from swift.common.swob import str_to_wsgi
from swift.common.utils import get_logger, config_true_value, \
dump_recon_cache, whataremyips, Timestamp, ShardRange, GreenAsyncPile, \
config_float_value, config_positive_int_value, \
quorum_size, parse_override_options, Everything, config_auto_int_value, \
ShardRangeList
config_positive_int_value, quorum_size, parse_override_options, \
Everything, config_auto_int_value, ShardRangeList, config_percent_value
from swift.container.backend import ContainerBroker, \
RECORD_TYPE_SHARD, UNSHARDED, SHARDING, SHARDED, COLLAPSED, \
SHARD_UPDATE_STATES
@ -469,8 +468,10 @@ class CleavingContext(object):
DEFAULT_SHARD_CONTAINER_THRESHOLD = 1000000
DEFAULT_SHARD_SHRINK_POINT = 25
DEFAULT_SHARD_SHRINK_POINT = 10
DEFAULT_SHARD_MERGE_POINT = 75
DEFAULT_MAX_SHRINKING = 1
DEFAULT_MAX_EXPANDING = -1
class ContainerSharder(ContainerReplicator):
@ -491,18 +492,10 @@ class ContainerSharder(ContainerReplicator):
else:
auto_create_account_prefix = AUTO_CREATE_ACCOUNT_PREFIX
self.shards_account_prefix = (auto_create_account_prefix + 'shards_')
def percent_value(key, default):
try:
value = conf.get(key, default)
return config_float_value(value, 0, 100) / 100.0
except ValueError as err:
raise ValueError("%s: %s" % (str(err), key))
self.shard_shrink_point = percent_value('shard_shrink_point',
DEFAULT_SHARD_SHRINK_POINT)
self.shrink_merge_point = percent_value('shard_shrink_merge_point',
DEFAULT_SHARD_MERGE_POINT)
self.shard_shrink_point = config_percent_value(
conf.get('shard_shrink_point', DEFAULT_SHARD_SHRINK_POINT))
self.shrink_merge_point = config_percent_value(
conf.get('shard_shrink_merge_point', DEFAULT_SHARD_MERGE_POINT))
self.shard_container_threshold = config_positive_int_value(
conf.get('shard_container_threshold',
DEFAULT_SHARD_CONTAINER_THRESHOLD))
@ -517,6 +510,10 @@ class ContainerSharder(ContainerReplicator):
conf.get('cleave_batch_size', 2))
self.cleave_row_batch_size = config_positive_int_value(
conf.get('cleave_row_batch_size', 10000))
self.max_shrinking = int(conf.get('max_shrinking',
DEFAULT_MAX_SHRINKING))
self.max_expanding = int(conf.get('max_expanding',
DEFAULT_MAX_EXPANDING))
self.auto_shard = config_true_value(conf.get('auto_shard', False))
self.sharding_candidates = []
self.shrinking_candidates = []
@ -627,7 +624,7 @@ class ContainerSharder(ContainerReplicator):
def _identify_shrinking_candidate(self, broker, node):
sequences = find_compactible_shard_sequences(
broker, self.shrink_size, self.merge_size,
1, -1)
self.max_shrinking, self.max_expanding)
# compactible_ranges are all apart from final acceptor in each sequence
compactible_ranges = sum(len(seq) - 1 for seq in sequences)
@ -1694,8 +1691,8 @@ class ContainerSharder(ContainerReplicator):
return
compactible_sequences = find_compactible_shard_sequences(
broker, self.shrink_size, self.merge_size, 1, -1,
include_shrinking=True)
broker, self.shrink_size, self.merge_size, self.max_shrinking,
self.max_expanding, include_shrinking=True)
self.logger.debug('Found %s compactible sequences of length(s) %s' %
(len(compactible_sequences),
[len(s) for s in compactible_sequences]))

View File

@ -13,6 +13,8 @@
import json
import os
import unittest
from argparse import Namespace
from textwrap import dedent
import mock
from shutil import rmtree
@ -93,6 +95,168 @@ class TestManageShardRanges(unittest.TestCase):
broker.merge_shard_ranges([own_sr])
return epoch
def test_conf_file_options(self):
db_file = os.path.join(self.testdir, 'hash.db')
broker = ContainerBroker(db_file, account='a', container='c')
broker.initialize()
conf = """
[container-sharder]
shard_shrink_point = 15
shard_shrink_merge_point = 65
shard_container_threshold = 1000
max_shrinking = 33
max_expanding = 31
"""
conf_file = os.path.join(self.testdir, 'sharder.conf')
with open(conf_file, 'w') as fd:
fd.write(dedent(conf))
# default values
with mock.patch('swift.cli.manage_shard_ranges.find_ranges') as mocked:
main([db_file, 'find'])
expected = Namespace(conf_file=None,
container_db=mock.ANY,
func=mock.ANY,
rows_per_shard=500000,
subcommand='find',
verbose=0)
mocked.assert_called_once_with(mock.ANY, expected)
# conf file
with mock.patch('swift.cli.manage_shard_ranges.find_ranges') as mocked:
main([db_file, '--config', conf_file, 'find'])
expected = Namespace(conf_file=conf_file,
container_db=mock.ANY,
func=mock.ANY,
rows_per_shard=500,
subcommand='find',
verbose=0)
mocked.assert_called_once_with(mock.ANY, expected)
# cli options override conf file
with mock.patch('swift.cli.manage_shard_ranges.find_ranges') as mocked:
main([db_file, '--config', conf_file, 'find', '12345'])
expected = Namespace(conf_file=conf_file,
container_db=mock.ANY,
func=mock.ANY,
rows_per_shard=12345,
subcommand='find',
verbose=0)
mocked.assert_called_once_with(mock.ANY, expected)
# default values
with mock.patch('swift.cli.manage_shard_ranges.compact_shard_ranges') \
as mocked:
main([db_file, 'compact'])
expected = Namespace(conf_file=None,
container_db=mock.ANY,
func=mock.ANY,
subcommand='compact',
verbose=0,
max_expanding=-1,
max_shrinking=1,
shrink_threshold=100000,
expansion_limit=500000,
yes=False)
mocked.assert_called_once_with(mock.ANY, expected)
# conf file
with mock.patch('swift.cli.manage_shard_ranges.compact_shard_ranges') \
as mocked:
main([db_file, '--config', conf_file, 'compact'])
expected = Namespace(conf_file=conf_file,
container_db=mock.ANY,
func=mock.ANY,
subcommand='compact',
verbose=0,
max_expanding=31,
max_shrinking=33,
shrink_threshold=150,
expansion_limit=650,
yes=False)
mocked.assert_called_once_with(mock.ANY, expected)
# conf file - small percentages resulting in zero absolute values
# should be respected rather than falling back to defaults, to avoid
# nasty surprises
conf = """
[container-sharder]
shard_shrink_point = 1
shard_shrink_merge_point = 2
shard_container_threshold = 10
max_shrinking = 33
max_expanding = 31
"""
conf_file = os.path.join(self.testdir, 'sharder.conf')
with open(conf_file, 'w') as fd:
fd.write(dedent(conf))
with mock.patch('swift.cli.manage_shard_ranges.compact_shard_ranges') \
as mocked:
main([db_file, '--config', conf_file, 'compact'])
expected = Namespace(conf_file=conf_file,
container_db=mock.ANY,
func=mock.ANY,
subcommand='compact',
verbose=0,
max_expanding=31,
max_shrinking=33,
shrink_threshold=0,
expansion_limit=0,
yes=False)
mocked.assert_called_once_with(mock.ANY, expected)
# cli options
with mock.patch('swift.cli.manage_shard_ranges.compact_shard_ranges') \
as mocked:
main([db_file, '--config', conf_file, 'compact',
'--max-shrinking', '22',
'--max-expanding', '11',
'--expansion-limit', '3456',
'--shrink-threshold', '1234'])
expected = Namespace(conf_file=conf_file,
container_db=mock.ANY,
func=mock.ANY,
subcommand='compact',
verbose=0,
max_expanding=11,
max_shrinking=22,
shrink_threshold=1234,
expansion_limit=3456,
yes=False)
mocked.assert_called_once_with(mock.ANY, expected)
# conf file - invalid value for shard_container_threshold
conf = """
[container-sharder]
shard_shrink_point = 1
shard_shrink_merge_point = 2
shard_container_threshold = 0
max_shrinking = 33
max_expanding = 31
"""
conf_file = os.path.join(self.testdir, 'sharder.conf')
with open(conf_file, 'w') as fd:
fd.write(dedent(conf))
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, '--config', conf_file, 'compact'])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Error opening config file')
# conf file - cannot open conf file
conf_file = os.path.join(self.testdir, 'missing_sharder.conf')
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, '--config', conf_file, 'compact'])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Error opening config file')
def test_find_shard_ranges(self):
db_file = os.path.join(self.testdir, 'hash.db')
broker = ContainerBroker(db_file)

View File

@ -3048,6 +3048,30 @@ cluster_dfw1 = http://dfw1.host/v1/
self.assertIn('greater than %s' % minimum, cm.exception.args[0])
self.assertIn('less than %s' % maximum, cm.exception.args[0])
def test_config_percent_value(self):
for arg, expected in (
(99, 0.99),
(25.5, 0.255),
('99', 0.99),
('25.5', 0.255),
(0, 0.0),
('0', 0.0),
('100', 1.0),
(100, 1.0),
(1, 0.01),
('1', 0.01),
(25, 0.25)):
actual = utils.config_percent_value(arg)
self.assertEqual(expected, actual)
# bad values
for val in (-1, '-1', 101, '101'):
with self.assertRaises(ValueError) as cm:
utils.config_percent_value(val)
self.assertIn('Config option must be a number, greater than 0, '
'less than 100, not "{}"'.format(val),
cm.exception.args[0])
def test_config_auto_int_value(self):
expectations = {
# (value, default) : expected,

View File

@ -160,7 +160,7 @@ class TestSharder(BaseTestSharder):
'rsync_compress': False,
'rsync_module': '{replication_ip}::container',
'reclaim_age': 86400 * 7,
'shard_shrink_point': 0.25,
'shard_shrink_point': 0.10,
'shrink_merge_point': 0.75,
'shard_container_threshold': 1000000,
'split_size': 500000,
@ -172,7 +172,9 @@ class TestSharder(BaseTestSharder):
'recon_candidates_limit': 5,
'recon_sharded_timeout': 43200,
'shard_replication_quorum': 2,
'existing_shard_replication_quorum': 2
'existing_shard_replication_quorum': 2,
'max_shrinking': 1,
'max_expanding': -1
}
sharder, mock_ic = do_test({}, expected, logger=None)
self.assertEqual(
@ -204,7 +206,9 @@ class TestSharder(BaseTestSharder):
'recon_candidates_limit': 10,
'recon_sharded_timeout': 7200,
'shard_replication_quorum': 1,
'existing_shard_replication_quorum': 0
'existing_shard_replication_quorum': 0,
'max_shrinking': 5,
'max_expanding': 4
}
expected = {
'mount_check': False, 'bind_ip': '10.11.12.13', 'port': 62010,
@ -227,7 +231,9 @@ class TestSharder(BaseTestSharder):
'recon_candidates_limit': 10,
'recon_sharded_timeout': 7200,
'shard_replication_quorum': 1,
'existing_shard_replication_quorum': 0
'existing_shard_replication_quorum': 0,
'max_shrinking': 5,
'max_expanding': 4
}
sharder, mock_ic = do_test(conf, expected)
mock_ic.assert_called_once_with(
@ -262,13 +268,11 @@ class TestSharder(BaseTestSharder):
do_test({'shard_shrink_point': 101}, {})
self.assertIn(
'greater than 0, less than 100, not "101"', str(cm.exception))
self.assertIn('shard_shrink_point', str(cm.exception))
with self.assertRaises(ValueError) as cm:
do_test({'shard_shrink_merge_point': 101}, {})
self.assertIn(
'greater than 0, less than 100, not "101"', str(cm.exception))
self.assertIn('shard_shrink_merge_point', str(cm.exception))
def test_init_internal_client_conf_loading_error(self):
with mock.patch('swift.common.db_replicator.ring.Ring') \