diff --git a/swift/cli/manage_shard_ranges.py b/swift/cli/manage_shard_ranges.py index 9bfd89e543..33e870d455 100644 --- a/swift/cli/manage_shard_ranges.py +++ b/swift/cli/manage_shard_ranges.py @@ -168,7 +168,8 @@ from six.moves import input from swift.common.utils import Timestamp, get_logger, ShardRange, readconf, \ ShardRangeList, non_negative_int, config_positive_int_value -from swift.container.backend import ContainerBroker, UNSHARDED +from swift.container.backend import ContainerBroker, UNSHARDED, \ + sift_shard_ranges from swift.container.sharder import make_shard_ranges, sharding_enabled, \ CleavingContext, process_compactible_shard_sequences, \ find_compactible_shard_sequences, find_overlapping_ranges, \ @@ -427,6 +428,61 @@ def delete_shard_ranges(broker, args): return EXIT_SUCCESS +def combine_shard_ranges(new_shard_ranges, existing_shard_ranges): + """ + Combines new and existing shard ranges based on most recent state. + + :param new_shard_ranges: a list of ShardRange instances. + :param existing_shard_ranges: a list of ShardRange instances. + :return: a list of ShardRange instances. + """ + new_shard_ranges = [dict(sr) for sr in new_shard_ranges] + existing_shard_ranges = [dict(sr) for sr in existing_shard_ranges] + to_add, to_delete = sift_shard_ranges( + new_shard_ranges, + dict((sr['name'], sr) for sr in existing_shard_ranges)) + result = [ShardRange.from_dict(existing) + for existing in existing_shard_ranges + if existing['name'] not in to_delete] + result.extend([ShardRange.from_dict(sr) for sr in to_add]) + return sorted([sr for sr in result if not sr.deleted], + key=ShardRange.sort_key) + + +def merge_shard_ranges(broker, args): + _check_own_shard_range(broker, args) + shard_data = _load_and_validate_shard_data(args, require_index=False) + new_shard_ranges = ShardRangeList([ShardRange.from_dict(sr) + for sr in shard_data]) + new_shard_ranges.sort(key=ShardRange.sort_key) + + # do some checks before merging... + existing_shard_ranges = ShardRangeList( + broker.get_shard_ranges(include_deleted=True)) + outcome = combine_shard_ranges(new_shard_ranges, existing_shard_ranges) + if args.verbose: + print('This change will result in the following shard ranges in the ' + 'affected namespace:') + print(json.dumps([dict(sr) for sr in outcome], indent=2)) + overlaps = find_overlapping_ranges(outcome) + if overlaps: + print('WARNING: this change will result in shard ranges overlaps!') + paths_with_gaps = find_paths_with_gaps(outcome) + gaps = [gap for start_path, gap, end_path in paths_with_gaps + if existing_shard_ranges.includes(gap)] + if gaps: + print('WARNING: this change will result in shard ranges gaps!') + + if not _proceed(args): + return EXIT_USER_QUIT + + with broker.updated_timeout(args.replace_timeout): + broker.merge_shard_ranges(new_shard_ranges) + print('Injected %d shard ranges.' % len(new_shard_ranges)) + print('Run container-replicator to replicate them to other nodes.') + return EXIT_SUCCESS + + def _replace_shard_ranges(broker, args, shard_data, timeout=0): own_shard_range = _check_own_shard_range(broker, args) shard_ranges = make_shard_ranges( @@ -957,6 +1013,22 @@ def _make_parser(): 'info', help='Print container db info') info_parser.set_defaults(func=db_info) + # merge + merge_parser = subparsers.add_parser( + 'merge', + help='Merge shard range(s) from file with existing shard ranges. This ' + 'subcommand should only be used if you are confident that you ' + 'know what you are doing. Shard ranges should not typically be ' + 'modified in this way.') + merge_parser.add_argument('input', metavar='input_file', + type=str, help='Name of file') + merge_parser.add_argument( + '--replace-timeout', type=int, default=600, + help='Minimum DB timeout to use when merging shard ranges.') + _add_account_prefix_arg(merge_parser) + _add_prompt_args(merge_parser) + merge_parser.set_defaults(func=merge_shard_ranges) + # replace replace_parser = subparsers.add_parser( 'replace', diff --git a/swift/container/backend.py b/swift/container/backend.py index eaabc2c52b..9806161930 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -315,6 +315,38 @@ def merge_shards(shard_data, existing): return new_content +def sift_shard_ranges(new_shard_ranges, existing_shard_ranges): + """ + Compares new and existing shard ranges, updating the new shard ranges with + any more recent state from the existing, and returns shard ranges sorted + into those that need adding because they contain new or updated state and + those that need deleting because their state has been superseded. + + :param new_shard_ranges: a list of dicts, each of which represents a shard + range. + :param existing_shard_ranges: a dict mapping shard range names to dicts + representing a shard range. + :return: a tuple (to_add, to_delete); to_add is a list of dicts, each of + which represents a shard range that is to be added to the existing + shard ranges; to_delete is a set of shard range names that are to be + deleted. + """ + to_delete = set() + to_add = {} + for item in new_shard_ranges: + item_ident = item['name'] + existing = existing_shard_ranges.get(item_ident) + if merge_shards(item, existing): + # exists with older timestamp + if item_ident in existing_shard_ranges: + to_delete.add(item_ident) + # duplicate entries in item_list + if (item_ident not in to_add or + merge_shards(item, to_add[item_ident])): + to_add[item_ident] = item + return to_add.values(), to_delete + + class ContainerBroker(DatabaseBroker): """ Encapsulates working with a container database. @@ -1421,28 +1453,14 @@ class ContainerBroker(DatabaseBroker): chunk = [record['name'] for record in item_list[offset:offset + SQLITE_ARG_LIMIT]] records.update( - (rec[0], rec) for rec in curs.execute( + (rec[0], dict(zip(SHARD_RANGE_KEYS, rec))) + for rec in curs.execute( 'SELECT %s FROM %s ' 'WHERE deleted IN (0, 1) AND name IN (%s)' % (', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE, ','.join('?' * len(chunk))), chunk)) - # Sort item_list into things that need adding and deleting - to_delete = set() - to_add = {} - for item in item_list: - item_ident = item['name'] - existing = records.get(item_ident) - if existing: - existing = dict(zip(SHARD_RANGE_KEYS, existing)) - if merge_shards(item, existing): - # exists with older timestamp - if item_ident in records: - to_delete.add(item_ident) - # duplicate entries in item_list - if (item_ident not in to_add or - merge_shards(item, to_add[item_ident])): - to_add[item_ident] = item + to_add, to_delete = sift_shard_ranges(item_list, records) if to_delete: curs.executemany( @@ -1455,7 +1473,7 @@ class ContainerBroker(DatabaseBroker): 'INSERT INTO %s (%s) VALUES (%s)' % (SHARD_RANGE_TABLE, ','.join(SHARD_RANGE_KEYS), vals), tuple([item[k] for k in SHARD_RANGE_KEYS] - for item in to_add.values())) + for item in to_add)) conn.commit() migrations = { diff --git a/test/unit/cli/test_manage_shard_ranges.py b/test/unit/cli/test_manage_shard_ranges.py index 469b8da59d..1071bfa448 100644 --- a/test/unit/cli/test_manage_shard_ranges.py +++ b/test/unit/cli/test_manage_shard_ranges.py @@ -24,7 +24,7 @@ from tempfile import mkdtemp import six from six.moves import cStringIO as StringIO -from swift.cli.manage_shard_ranges import main +from swift.cli.manage_shard_ranges import main, combine_shard_ranges from swift.common import utils from swift.common.utils import Timestamp, ShardRange from swift.container.backend import ContainerBroker @@ -730,6 +730,230 @@ class TestManageShardRanges(unittest.TestCase): self.assertEqual(expected, err.getvalue().splitlines()) self.assertEqual(expected_shard_ranges[:1], json.loads(out.getvalue())) + def test_merge(self): + broker = self._make_broker() + broker.update_metadata({'X-Container-Sysmeta-Sharding': + (True, Timestamp.now().internal)}) + good_shard_ranges = [] + for shard in self.shard_data[:3]: + good_shard_ranges.append(ShardRange(name='a/c_' + shard['lower'], + timestamp=next(self.ts_iter), + state=ShardRange.ACTIVE, + lower=shard['lower'], + upper=shard['upper'])) + # insert an overlap.. + bad_shard_range = ShardRange( + name='a/c_bad_' + self.shard_data[1]['lower'], + timestamp=next(self.ts_iter), + state=ShardRange.ACTIVE, + lower=self.shard_data[1]['lower'], + upper=self.shard_data[2]['upper']) + broker.merge_shard_ranges(good_shard_ranges + [bad_shard_range]) + self.assertEqual( + [('', 'obj09'), + ('obj09', 'obj19'), + ('obj09', 'obj29'), + ('obj19', 'obj29')], + [(sr.lower_str, sr.upper_str) for sr in broker.get_shard_ranges()]) + + # use command to merge in a deleted version of the bad shard range + bad_shard_range.update_state(ShardRange.SHRUNK, + state_timestamp=next(self.ts_iter)) + bad_shard_range.set_deleted(next(self.ts_iter)) + bad_shard_range.update_meta(0, 0, next(self.ts_iter)) + input_file = os.path.join(self.testdir, 'shards') + with open(input_file, 'w') as fd: + json.dump([dict(bad_shard_range)], fd) + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, '-v', 'merge', input_file, + '--replace-timeout', '1', '--yes']) + self.assertEqual(0, ret) + affected_shard_ranges = [dict(sr) for sr in good_shard_ranges] + expected_msg = [ + 'This change will result in the following shard ranges in the ' + 'affected namespace:'] + expected_msg.extend( + json.dumps(affected_shard_ranges, indent=2).splitlines()) + expected_msg.extend( + ['Injected 1 shard ranges.', + 'Run container-replicator to replicate them to other nodes.']) + self.assertEqual(expected_msg, out.getvalue().splitlines()) + self.assertEqual(['Loaded db broker for a/c'], + err.getvalue().splitlines()) + self.assertEqual( + [dict(sr) for sr in good_shard_ranges], + [dict(sr) for sr in broker.get_shard_ranges()]) + self.assertEqual( + dict(bad_shard_range), + dict(broker.get_shard_ranges(include_deleted=True)[3])) + + def test_merge_fills_gap(self): + broker = self._make_broker() + broker.update_metadata({'X-Container-Sysmeta-Sharding': + (True, Timestamp.now().internal)}) + old_shard_ranges = [] + for shard in self.shard_data[:1]: + old_shard_ranges.append(ShardRange(name='a/c_' + shard['lower'], + timestamp=next(self.ts_iter), + state=ShardRange.ACTIVE, + lower=shard['lower'], + upper=shard['upper'])) + + # use command to merge in a deleted version of the existing and two + # new ranges + new_shard_ranges = [ + old_shard_ranges[0].copy(deleted=True, + timestamp=next(self.ts_iter)), + ShardRange( + name='a/c_1_' + self.shard_data[0]['lower'], + timestamp=next(self.ts_iter), + state=ShardRange.ACTIVE, + lower=self.shard_data[0]['lower'], + upper=self.shard_data[0]['upper'] + 'a'), + ShardRange( + name='a/c_1_' + self.shard_data[0]['upper'] + 'a', + timestamp=next(self.ts_iter), + state=ShardRange.ACTIVE, + lower=self.shard_data[0]['upper'] + 'a', + upper=self.shard_data[1]['upper'] + 'a'), + ] + + input_file = os.path.join(self.testdir, 'shards') + with open(input_file, 'w') as fd: + json.dump([dict(sr) for sr in new_shard_ranges], fd) + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, '-v', 'merge', input_file, + '--replace-timeout', '1', '--yes']) + self.assertEqual(0, ret) + affected_shard_ranges = [dict(sr) for sr in new_shard_ranges[1:]] + expected_msg = [ + 'This change will result in the following shard ranges in the ' + 'affected namespace:'] + expected_msg.extend( + json.dumps(affected_shard_ranges, indent=2).splitlines()) + expected_msg.extend( + ['Injected 3 shard ranges.', + 'Run container-replicator to replicate them to other nodes.']) + self.assertEqual(expected_msg, out.getvalue().splitlines()) + self.assertEqual(['Loaded db broker for a/c'], + err.getvalue().splitlines()) + self.assertEqual( + [dict(sr) for sr in new_shard_ranges[1:]], + [dict(sr) for sr in broker.get_shard_ranges()]) + self.assertEqual( + [dict(sr) for sr in new_shard_ranges], + [dict(sr) for sr in broker.get_shard_ranges(include_deleted=True)]) + + def test_merge_warns_of_overlap(self): + broker = self._make_broker() + broker.update_metadata({'X-Container-Sysmeta-Sharding': + (True, Timestamp.now().internal)}) + old_shard_ranges = [] + for shard in self.shard_data[:3]: + old_shard_ranges.append(ShardRange(name='a/c_' + shard['lower'], + timestamp=next(self.ts_iter), + state=ShardRange.ACTIVE, + lower=shard['lower'], + upper=shard['upper'])) + broker.merge_shard_ranges(old_shard_ranges) + + # use command to merge in a new range that overlaps... + new_shard_range = ShardRange( + name='a/c_bad_' + self.shard_data[1]['lower'], + timestamp=next(self.ts_iter), + state=ShardRange.ACTIVE, + lower=self.shard_data[1]['lower'] + 'a', + upper=self.shard_data[1]['upper']) + input_file = os.path.join(self.testdir, 'shards') + with open(input_file, 'w') as fd: + json.dump([dict(new_shard_range)], fd) + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, '-v', 'merge', input_file, + '--replace-timeout', '1', '-n']) + self.assertEqual(3, ret) + affected_shard_ranges = [ + dict(sr) for sr in [old_shard_ranges[0], old_shard_ranges[1], + new_shard_range, old_shard_ranges[2]]] + expected_msg = [ + 'This change will result in the following shard ranges in the ' + 'affected namespace:'] + expected_msg.extend( + json.dumps(affected_shard_ranges, indent=2).splitlines()) + expected_msg.extend( + ['WARNING: this change will result in shard ranges overlaps!', + 'No changes applied']) + self.assertEqual(expected_msg, out.getvalue().splitlines()) + self.assertEqual(['Loaded db broker for a/c'], + err.getvalue().splitlines()) + self.assertEqual( + [dict(sr) for sr in old_shard_ranges], + [dict(sr) for sr in broker.get_shard_ranges()]) + + # repeat without -v flag + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, 'merge', input_file, + '--replace-timeout', '1', '-n']) + self.assertEqual(3, ret) + expected_msg = [ + 'WARNING: this change will result in shard ranges overlaps!', + 'No changes applied'] + self.assertEqual(expected_msg, out.getvalue().splitlines()) + self.assertEqual(['Loaded db broker for a/c'], + err.getvalue().splitlines()) + self.assertEqual( + [dict(sr) for sr in old_shard_ranges], + [dict(sr) for sr in broker.get_shard_ranges()]) + + def test_merge_warns_of_gap(self): + broker = self._make_broker() + broker.update_metadata({'X-Container-Sysmeta-Sharding': + (True, Timestamp.now().internal)}) + old_shard_ranges = [] + for shard in self.shard_data[:3]: + old_shard_ranges.append(ShardRange(name='a/c_' + shard['lower'], + timestamp=next(self.ts_iter), + state=ShardRange.ACTIVE, + lower=shard['lower'], + upper=shard['upper'])) + broker.merge_shard_ranges(old_shard_ranges) + + # use command to merge in a deleted range that creates a gap... + new_shard_range = old_shard_ranges[1].copy( + timestamp=next(self.ts_iter), deleted=True) + input_file = os.path.join(self.testdir, 'shards') + with open(input_file, 'w') as fd: + json.dump([dict(new_shard_range)], fd) + out = StringIO() + err = StringIO() + with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err): + ret = main([broker.db_file, '-v', 'merge', input_file, + '--replace-timeout', '1', '-n']) + self.assertEqual(3, ret) + affected_shard_ranges = [ + dict(sr) for sr in [old_shard_ranges[0], old_shard_ranges[2]]] + expected_msg = [ + 'This change will result in the following shard ranges in the ' + 'affected namespace:'] + expected_msg.extend( + json.dumps(affected_shard_ranges, indent=2).splitlines()) + expected_msg.extend( + ['WARNING: this change will result in shard ranges gaps!', + 'No changes applied']) + self.assertEqual(expected_msg, out.getvalue().splitlines()) + self.assertEqual(['Loaded db broker for a/c'], + err.getvalue().splitlines()) + self.assertEqual( + [dict(sr) for sr in old_shard_ranges], + [dict(sr) for sr in broker.get_shard_ranges()]) + def test_replace(self): broker = self._make_broker() broker.update_metadata({'X-Container-Sysmeta-Sharding': @@ -2523,3 +2747,46 @@ class TestManageShardRanges(unittest.TestCase): self.assertIn( "argument --yes/-y: not allowed with argument --dry-run/-n", err_lines[-2], err_lines) + + def test_combine_shard_ranges(self): + ts_iter = make_timestamp_iter() + this = ShardRange('a/o', next(ts_iter).internal) + that = ShardRange('a/o', next(ts_iter).internal) + actual = combine_shard_ranges([dict(this)], [dict(that)]) + self.assertEqual([dict(that)], [dict(sr) for sr in actual]) + actual = combine_shard_ranges([dict(that)], [dict(this)]) + self.assertEqual([dict(that)], [dict(sr) for sr in actual]) + + ts = next(ts_iter).internal + this = ShardRange('a/o', ts, state=ShardRange.ACTIVE, + state_timestamp=next(ts_iter)) + that = ShardRange('a/o', ts, state=ShardRange.CREATED, + state_timestamp=next(ts_iter)) + actual = combine_shard_ranges([dict(this)], [dict(that)]) + self.assertEqual([dict(that)], [dict(sr) for sr in actual]) + actual = combine_shard_ranges([dict(that)], [dict(this)]) + self.assertEqual([dict(that)], [dict(sr) for sr in actual]) + + that.update_meta(1, 2, meta_timestamp=next(ts_iter)) + this.update_meta(3, 4, meta_timestamp=next(ts_iter)) + expected = that.copy(object_count=this.object_count, + bytes_used=this.bytes_used, + meta_timestamp=this.meta_timestamp) + actual = combine_shard_ranges([dict(this)], [dict(that)]) + self.assertEqual([dict(expected)], [dict(sr) for sr in actual]) + actual = combine_shard_ranges([dict(that)], [dict(this)]) + self.assertEqual([dict(expected)], [dict(sr) for sr in actual]) + + this = ShardRange('a/o', next(ts_iter).internal) + that = ShardRange('a/o', next(ts_iter).internal, deleted=True) + actual = combine_shard_ranges([dict(this)], [dict(that)]) + self.assertFalse(actual, [dict(sr) for sr in actual]) + actual = combine_shard_ranges([dict(that)], [dict(this)]) + self.assertFalse(actual, [dict(sr) for sr in actual]) + + this = ShardRange('a/o', next(ts_iter).internal, deleted=True) + that = ShardRange('a/o', next(ts_iter).internal) + actual = combine_shard_ranges([dict(this)], [dict(that)]) + self.assertEqual([dict(that)], [dict(sr) for sr in actual]) + actual = combine_shard_ranges([dict(that)], [dict(this)]) + self.assertEqual([dict(that)], [dict(sr) for sr in actual]) diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py index 206f3f0fa8..526193f20f 100644 --- a/test/unit/container/test_backend.py +++ b/test/unit/container/test_backend.py @@ -36,7 +36,7 @@ import six from swift.common.exceptions import LockTimeout from swift.container.backend import ContainerBroker, \ update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \ - COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES + COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES, sift_shard_ranges from swift.common.db import DatabaseAlreadyExists, GreenDBConnection, \ TombstoneReclaimer from swift.common.request_helpers import get_reserved_name @@ -6484,3 +6484,39 @@ class TestUpdateNewItemFromExisting(unittest.TestCase): for scenario in self.scenarios_when_some_new_item_wins: self._test_scenario(scenario, True) + + +class TestModuleFunctions(unittest.TestCase): + def test_sift_shard_ranges(self): + ts_iter = make_timestamp_iter() + existing_shards = {} + sr1 = dict(ShardRange('a/o', next(ts_iter).internal)) + sr2 = dict(ShardRange('a/o2', next(ts_iter).internal)) + new_shard_ranges = [sr1, sr2] + + # first empty existing shards will just add the shards + to_add, to_delete = sift_shard_ranges(new_shard_ranges, + existing_shards) + self.assertEqual(2, len(to_add)) + self.assertIn(sr1, to_add) + self.assertIn(sr2, to_add) + self.assertFalse(to_delete) + + # if there is a newer version in the existing shards then it won't be + # added to to_add + existing_shards['a/o'] = dict( + ShardRange('a/o', next(ts_iter).internal)) + to_add, to_delete = sift_shard_ranges(new_shard_ranges, + existing_shards) + self.assertEqual([sr2], list(to_add)) + self.assertFalse(to_delete) + + # But if a newer version is in new_shard_ranges then the old will be + # added to to_delete and new is added to to_add. + sr1['timestamp'] = next(ts_iter).internal + to_add, to_delete = sift_shard_ranges(new_shard_ranges, + existing_shards) + self.assertEqual(2, len(to_add)) + self.assertIn(sr1, to_add) + self.assertIn(sr2, to_add) + self.assertEqual({'a/o'}, to_delete)