diff --git a/doc/source/config/container_server_config.rst b/doc/source/config/container_server_config.rst index 36de823834..acbb32310c 100644 --- a/doc/source/config/container_server_config.rst +++ b/doc/source/config/container_server_config.rst @@ -326,7 +326,7 @@ shard_container_threshold 1000000 When auto-sharding is shrinking and merging shard containers. -shard_shrink_point 5 When auto-sharding is +shard_shrink_point 10 When auto-sharding is enabled this defines the object count below which a 'donor' shard container @@ -338,7 +338,7 @@ shard_shrink_point 5 When auto-sharding is percentage of shard_container_threshold e.g. the default value of - 5 means 5% of the + 10 means 10% of the shard_container_threshold. shard_shrink_merge_point 75 When auto-sharding is diff --git a/etc/container-server.conf-sample b/etc/container-server.conf-sample index 09f3bc7e97..d03217cbb8 100644 --- a/etc/container-server.conf-sample +++ b/etc/container-server.conf-sample @@ -377,7 +377,7 @@ use = egg:swift#xprofile # another 'acceptor' shard container. shard_shrink_point is a percentage of # shard_container_threshold e.g. the default value of 5 means 5% of the # shard_container_threshold. -# shard_shrink_point = 5 +# shard_shrink_point = 10 # # When auto-sharding is enabled shard_shrink_merge_point defines the maximum # allowed size of an acceptor shard container after having a donor merged into @@ -407,6 +407,16 @@ use = egg:swift#xprofile # sharding container and merged to a shard container during cleaving. # cleave_row_batch_size = 10000 # +# max_expanding defines the maximum number of shards that could be expanded in a +# single cycle of the sharder. Defaults to unlimited (-1). +# max_expanding = -1 +# +# max_shrinking defines the maximum number of shards that should be shrunk into +# each expanding shard. Defaults to 1. +# NOTE: Using values greater than 1 may result in temporary gaps in object listings +# until all selected shards have shrunk. +# max_shrinking = 1 +# # Defines the number of successfully replicated shard dbs required when # cleaving a previously uncleaved shard range before the sharder will progress # to the next shard range. The value should be less than or equal to the diff --git a/swift/cli/manage_shard_ranges.py b/swift/cli/manage_shard_ranges.py index 787d2c2a29..43efe57fa6 100644 --- a/swift/cli/manage_shard_ranges.py +++ b/swift/cli/manage_shard_ranges.py @@ -164,16 +164,19 @@ import time from six.moves import input -from swift.common.utils import Timestamp, get_logger, ShardRange +from swift.common.utils import Timestamp, get_logger, ShardRange, readconf, \ + config_percent_value, config_positive_int_value from swift.container.backend import ContainerBroker, UNSHARDED from swift.container.sharder import make_shard_ranges, sharding_enabled, \ CleavingContext, process_compactible_shard_sequences, \ - find_compactible_shard_sequences, find_overlapping_ranges + find_compactible_shard_sequences, find_overlapping_ranges, \ + DEFAULT_MAX_SHRINKING, DEFAULT_MAX_EXPANDING, \ + DEFAULT_SHARD_CONTAINER_THRESHOLD, DEFAULT_SHARD_SHRINK_POINT, \ + DEFAULT_SHARD_MERGE_POINT -DEFAULT_ROWS_PER_SHARD = 500000 -DEFAULT_SHRINK_THRESHOLD = 10000 -DEFAULT_MAX_SHRINKING = 1 -DEFAULT_MAX_EXPANDING = -1 +DEFAULT_ROWS_PER_SHARD = DEFAULT_SHARD_CONTAINER_THRESHOLD // 2 +DEFAULT_SHRINK_THRESHOLD = DEFAULT_SHARD_CONTAINER_THRESHOLD * \ + config_percent_value(DEFAULT_SHARD_SHRINK_POINT) def _print_shard_range(sr, level=0): @@ -489,7 +492,7 @@ def _positive_int(arg): def _add_find_args(parser): parser.add_argument('rows_per_shard', nargs='?', type=int, - default=DEFAULT_ROWS_PER_SHARD) + default=None) def _add_replace_args(parser): @@ -516,6 +519,9 @@ def _add_enable_args(parser): def _make_parser(): parser = argparse.ArgumentParser(description='Manage shard ranges') parser.add_argument('container_db') + parser.add_argument('--config', dest='conf_file', required=False, + help='Path to config file with [container-sharder] ' + 'section') parser.add_argument('--verbose', '-v', action='count', default=0, help='Increase output verbosity') subparsers = parser.add_subparsers( @@ -589,13 +595,13 @@ def _make_parser(): help='Apply shard range changes to broker without prompting.') compact_parser.add_argument('--shrink-threshold', nargs='?', type=_positive_int, - default=DEFAULT_SHRINK_THRESHOLD, + default=None, help='The number of rows below which a shard ' 'can qualify for shrinking. Defaults to ' '%d' % DEFAULT_SHRINK_THRESHOLD) compact_parser.add_argument('--expansion-limit', nargs='?', type=_positive_int, - default=DEFAULT_ROWS_PER_SHARD, + default=None, help='Maximum number of rows for an expanding ' 'shard to have after compaction has ' 'completed. Defaults to %d' % @@ -608,7 +614,7 @@ def _make_parser(): # temporary gap(s) in object listings where the shrunk donors are missing. compact_parser.add_argument('--max-shrinking', nargs='?', type=_positive_int, - default=DEFAULT_MAX_SHRINKING, + default=None, help='Maximum number of shards that should be ' 'shrunk into each expanding shard. ' 'Defaults to 1. Using values greater ' @@ -617,7 +623,7 @@ def _make_parser(): 'shards have shrunk.') compact_parser.add_argument('--max-expanding', nargs='?', type=_positive_int, - default=DEFAULT_MAX_EXPANDING, + default=None, help='Maximum number of shards that should be ' 'expanded. Defaults to unlimited.') compact_parser.set_defaults(func=compact_shard_ranges) @@ -637,7 +643,47 @@ def main(args=None): parser.print_help() print('\nA sub-command is required.') return 1 - logger = get_logger({}, name='ContainerBroker', log_to_console=True) + conf = {} + rows_per_shard = DEFAULT_ROWS_PER_SHARD + shrink_threshold = DEFAULT_SHRINK_THRESHOLD + expansion_limit = DEFAULT_ROWS_PER_SHARD + if args.conf_file: + try: + conf = readconf(args.conf_file, 'container-sharder') + shard_container_threshold = config_positive_int_value(conf.get( + 'shard_container_threshold', + DEFAULT_SHARD_CONTAINER_THRESHOLD)) + if shard_container_threshold: + rows_per_shard = shard_container_threshold // 2 + shrink_threshold = int( + shard_container_threshold * config_percent_value( + conf.get('shard_shrink_point', + DEFAULT_SHARD_SHRINK_POINT))) + expansion_limit = int( + shard_container_threshold * config_percent_value( + conf.get('shard_shrink_merge_point', + DEFAULT_SHARD_MERGE_POINT))) + except Exception as exc: + print('Error opening config file %s: %s' % (args.conf_file, exc), + file=sys.stderr) + return 2 + + # seems having sub parsers mean sometimes an arg wont exist in the args + # namespace. But we can check if it is with the 'in' statement. + if "max_shrinking" in args and args.max_shrinking is None: + args.max_shrinking = int(conf.get( + "max_shrinking", DEFAULT_MAX_SHRINKING)) + if "max_expanding" in args and args.max_expanding is None: + args.max_expanding = int(conf.get( + "max_expanding", DEFAULT_MAX_EXPANDING)) + if "shrink_threshold" in args and args.shrink_threshold is None: + args.shrink_threshold = shrink_threshold + if "expansion_limit" in args and args.expansion_limit is None: + args.expansion_limit = expansion_limit + if "rows_per_shard" in args and args.rows_per_shard is None: + args.rows_per_shard = rows_per_shard + + logger = get_logger(conf, name='ContainerBroker', log_to_console=True) broker = ContainerBroker(os.path.realpath(args.container_db), logger=logger, skip_commits=True) try: diff --git a/swift/common/utils.py b/swift/common/utils.py index 56a297839f..a23afffa8f 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -479,6 +479,13 @@ def config_auto_int_value(value, default): return value +def config_percent_value(value): + try: + return config_float_value(value, 0, 100) / 100.0 + except ValueError as err: + raise ValueError("%s: %s" % (str(err), value)) + + def append_underscore(prefix): if prefix and not prefix.endswith('_'): prefix += '_' diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 357792c6ad..9a7b7e90d5 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -35,9 +35,8 @@ from swift.common.ring.utils import is_local_device from swift.common.swob import str_to_wsgi from swift.common.utils import get_logger, config_true_value, \ dump_recon_cache, whataremyips, Timestamp, ShardRange, GreenAsyncPile, \ - config_float_value, config_positive_int_value, \ - quorum_size, parse_override_options, Everything, config_auto_int_value, \ - ShardRangeList + config_positive_int_value, quorum_size, parse_override_options, \ + Everything, config_auto_int_value, ShardRangeList, config_percent_value from swift.container.backend import ContainerBroker, \ RECORD_TYPE_SHARD, UNSHARDED, SHARDING, SHARDED, COLLAPSED, \ SHARD_UPDATE_STATES @@ -469,8 +468,10 @@ class CleavingContext(object): DEFAULT_SHARD_CONTAINER_THRESHOLD = 1000000 -DEFAULT_SHARD_SHRINK_POINT = 25 +DEFAULT_SHARD_SHRINK_POINT = 10 DEFAULT_SHARD_MERGE_POINT = 75 +DEFAULT_MAX_SHRINKING = 1 +DEFAULT_MAX_EXPANDING = -1 class ContainerSharder(ContainerReplicator): @@ -491,18 +492,10 @@ class ContainerSharder(ContainerReplicator): else: auto_create_account_prefix = AUTO_CREATE_ACCOUNT_PREFIX self.shards_account_prefix = (auto_create_account_prefix + 'shards_') - - def percent_value(key, default): - try: - value = conf.get(key, default) - return config_float_value(value, 0, 100) / 100.0 - except ValueError as err: - raise ValueError("%s: %s" % (str(err), key)) - - self.shard_shrink_point = percent_value('shard_shrink_point', - DEFAULT_SHARD_SHRINK_POINT) - self.shrink_merge_point = percent_value('shard_shrink_merge_point', - DEFAULT_SHARD_MERGE_POINT) + self.shard_shrink_point = config_percent_value( + conf.get('shard_shrink_point', DEFAULT_SHARD_SHRINK_POINT)) + self.shrink_merge_point = config_percent_value( + conf.get('shard_shrink_merge_point', DEFAULT_SHARD_MERGE_POINT)) self.shard_container_threshold = config_positive_int_value( conf.get('shard_container_threshold', DEFAULT_SHARD_CONTAINER_THRESHOLD)) @@ -517,6 +510,10 @@ class ContainerSharder(ContainerReplicator): conf.get('cleave_batch_size', 2)) self.cleave_row_batch_size = config_positive_int_value( conf.get('cleave_row_batch_size', 10000)) + self.max_shrinking = int(conf.get('max_shrinking', + DEFAULT_MAX_SHRINKING)) + self.max_expanding = int(conf.get('max_expanding', + DEFAULT_MAX_EXPANDING)) self.auto_shard = config_true_value(conf.get('auto_shard', False)) self.sharding_candidates = [] self.shrinking_candidates = [] @@ -627,7 +624,7 @@ class ContainerSharder(ContainerReplicator): def _identify_shrinking_candidate(self, broker, node): sequences = find_compactible_shard_sequences( broker, self.shrink_size, self.merge_size, - 1, -1) + self.max_shrinking, self.max_expanding) # compactible_ranges are all apart from final acceptor in each sequence compactible_ranges = sum(len(seq) - 1 for seq in sequences) @@ -1694,8 +1691,8 @@ class ContainerSharder(ContainerReplicator): return compactible_sequences = find_compactible_shard_sequences( - broker, self.shrink_size, self.merge_size, 1, -1, - include_shrinking=True) + broker, self.shrink_size, self.merge_size, self.max_shrinking, + self.max_expanding, include_shrinking=True) self.logger.debug('Found %s compactible sequences of length(s) %s' % (len(compactible_sequences), [len(s) for s in compactible_sequences])) diff --git a/test/unit/cli/test_manage_shard_ranges.py b/test/unit/cli/test_manage_shard_ranges.py index e24a471a92..0937c43418 100644 --- a/test/unit/cli/test_manage_shard_ranges.py +++ b/test/unit/cli/test_manage_shard_ranges.py @@ -13,6 +13,8 @@ import json import os import unittest +from argparse import Namespace +from textwrap import dedent import mock from shutil import rmtree @@ -93,6 +95,168 @@ class TestManageShardRanges(unittest.TestCase): broker.merge_shard_ranges([own_sr]) return epoch + def test_conf_file_options(self): + db_file = os.path.join(self.testdir, 'hash.db') + broker = ContainerBroker(db_file, account='a', container='c') + broker.initialize() + + conf = """ + [container-sharder] + shard_shrink_point = 15 + shard_shrink_merge_point = 65 + shard_container_threshold = 1000 + max_shrinking = 33 + max_expanding = 31 + """ + + conf_file = os.path.join(self.testdir, 'sharder.conf') + with open(conf_file, 'w') as fd: + fd.write(dedent(conf)) + + # default values + with mock.patch('swift.cli.manage_shard_ranges.find_ranges') as mocked: + main([db_file, 'find']) + expected = Namespace(conf_file=None, + container_db=mock.ANY, + func=mock.ANY, + rows_per_shard=500000, + subcommand='find', + verbose=0) + mocked.assert_called_once_with(mock.ANY, expected) + + # conf file + with mock.patch('swift.cli.manage_shard_ranges.find_ranges') as mocked: + main([db_file, '--config', conf_file, 'find']) + expected = Namespace(conf_file=conf_file, + container_db=mock.ANY, + func=mock.ANY, + rows_per_shard=500, + subcommand='find', + verbose=0) + mocked.assert_called_once_with(mock.ANY, expected) + + # cli options override conf file + with mock.patch('swift.cli.manage_shard_ranges.find_ranges') as mocked: + main([db_file, '--config', conf_file, 'find', '12345']) + expected = Namespace(conf_file=conf_file, + container_db=mock.ANY, + func=mock.ANY, + rows_per_shard=12345, + subcommand='find', + verbose=0) + mocked.assert_called_once_with(mock.ANY, expected) + + # default values + with mock.patch('swift.cli.manage_shard_ranges.compact_shard_ranges') \ + as mocked: + main([db_file, 'compact']) + expected = Namespace(conf_file=None, + container_db=mock.ANY, + func=mock.ANY, + subcommand='compact', + verbose=0, + max_expanding=-1, + max_shrinking=1, + shrink_threshold=100000, + expansion_limit=500000, + yes=False) + mocked.assert_called_once_with(mock.ANY, expected) + + # conf file + with mock.patch('swift.cli.manage_shard_ranges.compact_shard_ranges') \ + as mocked: + main([db_file, '--config', conf_file, 'compact']) + expected = Namespace(conf_file=conf_file, + container_db=mock.ANY, + func=mock.ANY, + subcommand='compact', + verbose=0, + max_expanding=31, + max_shrinking=33, + shrink_threshold=150, + expansion_limit=650, + yes=False) + mocked.assert_called_once_with(mock.ANY, expected) + + # conf file - small percentages resulting in zero absolute values + # should be respected rather than falling back to defaults, to avoid + # nasty surprises + conf = """ + [container-sharder] + shard_shrink_point = 1 + shard_shrink_merge_point = 2 + shard_container_threshold = 10 + max_shrinking = 33 + max_expanding = 31 + """ + conf_file = os.path.join(self.testdir, 'sharder.conf') + with open(conf_file, 'w') as fd: + fd.write(dedent(conf)) + + with mock.patch('swift.cli.manage_shard_ranges.compact_shard_ranges') \ + as mocked: + main([db_file, '--config', conf_file, 'compact']) + expected = Namespace(conf_file=conf_file, + container_db=mock.ANY, + func=mock.ANY, + subcommand='compact', + verbose=0, + max_expanding=31, + max_shrinking=33, + shrink_threshold=0, + expansion_limit=0, + yes=False) + mocked.assert_called_once_with(mock.ANY, expected) + + # cli options + with mock.patch('swift.cli.manage_shard_ranges.compact_shard_ranges') \ + as mocked: + main([db_file, '--config', conf_file, 'compact', + '--max-shrinking', '22', + '--max-expanding', '11', + '--expansion-limit', '3456', + '--shrink-threshold', '1234']) + expected = Namespace(conf_file=conf_file, + container_db=mock.ANY, + func=mock.ANY, + subcommand='compact', + verbose=0, + max_expanding=11, + max_shrinking=22, + shrink_threshold=1234, + expansion_limit=3456, + yes=False) + mocked.assert_called_once_with(mock.ANY, expected) + + # conf file - invalid value for shard_container_threshold + conf = """ + [container-sharder] + shard_shrink_point = 1 + shard_shrink_merge_point = 2 + shard_container_threshold = 0 + max_shrinking = 33 + max_expanding = 31 + """ + conf_file = os.path.join(self.testdir, 'sharder.conf') + with open(conf_file, 'w') as fd: + fd.write(dedent(conf)) + + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + main([db_file, '--config', conf_file, 'compact']) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Error opening config file') + + # conf file - cannot open conf file + conf_file = os.path.join(self.testdir, 'missing_sharder.conf') + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + main([db_file, '--config', conf_file, 'compact']) + err_lines = err.getvalue().split('\n') + self.assert_starts_with(err_lines[0], 'Error opening config file') + def test_find_shard_ranges(self): db_file = os.path.join(self.testdir, 'hash.db') broker = ContainerBroker(db_file) diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index cc2785d2fe..af4435c0dd 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -3048,6 +3048,30 @@ cluster_dfw1 = http://dfw1.host/v1/ self.assertIn('greater than %s' % minimum, cm.exception.args[0]) self.assertIn('less than %s' % maximum, cm.exception.args[0]) + def test_config_percent_value(self): + for arg, expected in ( + (99, 0.99), + (25.5, 0.255), + ('99', 0.99), + ('25.5', 0.255), + (0, 0.0), + ('0', 0.0), + ('100', 1.0), + (100, 1.0), + (1, 0.01), + ('1', 0.01), + (25, 0.25)): + actual = utils.config_percent_value(arg) + self.assertEqual(expected, actual) + + # bad values + for val in (-1, '-1', 101, '101'): + with self.assertRaises(ValueError) as cm: + utils.config_percent_value(val) + self.assertIn('Config option must be a number, greater than 0, ' + 'less than 100, not "{}"'.format(val), + cm.exception.args[0]) + def test_config_auto_int_value(self): expectations = { # (value, default) : expected, diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 1aab95e032..a8c65c23ef 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -160,7 +160,7 @@ class TestSharder(BaseTestSharder): 'rsync_compress': False, 'rsync_module': '{replication_ip}::container', 'reclaim_age': 86400 * 7, - 'shard_shrink_point': 0.25, + 'shard_shrink_point': 0.10, 'shrink_merge_point': 0.75, 'shard_container_threshold': 1000000, 'split_size': 500000, @@ -172,7 +172,9 @@ class TestSharder(BaseTestSharder): 'recon_candidates_limit': 5, 'recon_sharded_timeout': 43200, 'shard_replication_quorum': 2, - 'existing_shard_replication_quorum': 2 + 'existing_shard_replication_quorum': 2, + 'max_shrinking': 1, + 'max_expanding': -1 } sharder, mock_ic = do_test({}, expected, logger=None) self.assertEqual( @@ -204,7 +206,9 @@ class TestSharder(BaseTestSharder): 'recon_candidates_limit': 10, 'recon_sharded_timeout': 7200, 'shard_replication_quorum': 1, - 'existing_shard_replication_quorum': 0 + 'existing_shard_replication_quorum': 0, + 'max_shrinking': 5, + 'max_expanding': 4 } expected = { 'mount_check': False, 'bind_ip': '10.11.12.13', 'port': 62010, @@ -227,7 +231,9 @@ class TestSharder(BaseTestSharder): 'recon_candidates_limit': 10, 'recon_sharded_timeout': 7200, 'shard_replication_quorum': 1, - 'existing_shard_replication_quorum': 0 + 'existing_shard_replication_quorum': 0, + 'max_shrinking': 5, + 'max_expanding': 4 } sharder, mock_ic = do_test(conf, expected) mock_ic.assert_called_once_with( @@ -262,13 +268,11 @@ class TestSharder(BaseTestSharder): do_test({'shard_shrink_point': 101}, {}) self.assertIn( 'greater than 0, less than 100, not "101"', str(cm.exception)) - self.assertIn('shard_shrink_point', str(cm.exception)) with self.assertRaises(ValueError) as cm: do_test({'shard_shrink_merge_point': 101}, {}) self.assertIn( 'greater than 0, less than 100, not "101"', str(cm.exception)) - self.assertIn('shard_shrink_merge_point', str(cm.exception)) def test_init_internal_client_conf_loading_error(self): with mock.patch('swift.common.db_replicator.ring.Ring') \