swift-manage-shard-ranges: add 'merge' subcommand
Adds a subcommand to swift-manage-shard-ranges that allows arbitrary shard ranges to be read from a file and merged into a container DB. The file should be a JSON serialized list of dicts. The merge subcommand is only recommended for emergency shard range manipulation by expert users. Change-Id: Ic9ffcc042399f3834027a7935b64292d1fffe8d5
This commit is contained in:
parent
6504a5f59e
commit
d2edf6646d
|
@ -168,7 +168,8 @@ from six.moves import input
|
||||||
|
|
||||||
from swift.common.utils import Timestamp, get_logger, ShardRange, readconf, \
|
from swift.common.utils import Timestamp, get_logger, ShardRange, readconf, \
|
||||||
ShardRangeList, non_negative_int, config_positive_int_value
|
ShardRangeList, non_negative_int, config_positive_int_value
|
||||||
from swift.container.backend import ContainerBroker, UNSHARDED
|
from swift.container.backend import ContainerBroker, UNSHARDED, \
|
||||||
|
sift_shard_ranges
|
||||||
from swift.container.sharder import make_shard_ranges, sharding_enabled, \
|
from swift.container.sharder import make_shard_ranges, sharding_enabled, \
|
||||||
CleavingContext, process_compactible_shard_sequences, \
|
CleavingContext, process_compactible_shard_sequences, \
|
||||||
find_compactible_shard_sequences, find_overlapping_ranges, \
|
find_compactible_shard_sequences, find_overlapping_ranges, \
|
||||||
|
@ -427,6 +428,61 @@ def delete_shard_ranges(broker, args):
|
||||||
return EXIT_SUCCESS
|
return EXIT_SUCCESS
|
||||||
|
|
||||||
|
|
||||||
|
def combine_shard_ranges(new_shard_ranges, existing_shard_ranges):
|
||||||
|
"""
|
||||||
|
Combines new and existing shard ranges based on most recent state.
|
||||||
|
|
||||||
|
:param new_shard_ranges: a list of ShardRange instances.
|
||||||
|
:param existing_shard_ranges: a list of ShardRange instances.
|
||||||
|
:return: a list of ShardRange instances.
|
||||||
|
"""
|
||||||
|
new_shard_ranges = [dict(sr) for sr in new_shard_ranges]
|
||||||
|
existing_shard_ranges = [dict(sr) for sr in existing_shard_ranges]
|
||||||
|
to_add, to_delete = sift_shard_ranges(
|
||||||
|
new_shard_ranges,
|
||||||
|
dict((sr['name'], sr) for sr in existing_shard_ranges))
|
||||||
|
result = [ShardRange.from_dict(existing)
|
||||||
|
for existing in existing_shard_ranges
|
||||||
|
if existing['name'] not in to_delete]
|
||||||
|
result.extend([ShardRange.from_dict(sr) for sr in to_add])
|
||||||
|
return sorted([sr for sr in result if not sr.deleted],
|
||||||
|
key=ShardRange.sort_key)
|
||||||
|
|
||||||
|
|
||||||
|
def merge_shard_ranges(broker, args):
|
||||||
|
_check_own_shard_range(broker, args)
|
||||||
|
shard_data = _load_and_validate_shard_data(args, require_index=False)
|
||||||
|
new_shard_ranges = ShardRangeList([ShardRange.from_dict(sr)
|
||||||
|
for sr in shard_data])
|
||||||
|
new_shard_ranges.sort(key=ShardRange.sort_key)
|
||||||
|
|
||||||
|
# do some checks before merging...
|
||||||
|
existing_shard_ranges = ShardRangeList(
|
||||||
|
broker.get_shard_ranges(include_deleted=True))
|
||||||
|
outcome = combine_shard_ranges(new_shard_ranges, existing_shard_ranges)
|
||||||
|
if args.verbose:
|
||||||
|
print('This change will result in the following shard ranges in the '
|
||||||
|
'affected namespace:')
|
||||||
|
print(json.dumps([dict(sr) for sr in outcome], indent=2))
|
||||||
|
overlaps = find_overlapping_ranges(outcome)
|
||||||
|
if overlaps:
|
||||||
|
print('WARNING: this change will result in shard ranges overlaps!')
|
||||||
|
paths_with_gaps = find_paths_with_gaps(outcome)
|
||||||
|
gaps = [gap for start_path, gap, end_path in paths_with_gaps
|
||||||
|
if existing_shard_ranges.includes(gap)]
|
||||||
|
if gaps:
|
||||||
|
print('WARNING: this change will result in shard ranges gaps!')
|
||||||
|
|
||||||
|
if not _proceed(args):
|
||||||
|
return EXIT_USER_QUIT
|
||||||
|
|
||||||
|
with broker.updated_timeout(args.replace_timeout):
|
||||||
|
broker.merge_shard_ranges(new_shard_ranges)
|
||||||
|
print('Injected %d shard ranges.' % len(new_shard_ranges))
|
||||||
|
print('Run container-replicator to replicate them to other nodes.')
|
||||||
|
return EXIT_SUCCESS
|
||||||
|
|
||||||
|
|
||||||
def _replace_shard_ranges(broker, args, shard_data, timeout=0):
|
def _replace_shard_ranges(broker, args, shard_data, timeout=0):
|
||||||
own_shard_range = _check_own_shard_range(broker, args)
|
own_shard_range = _check_own_shard_range(broker, args)
|
||||||
shard_ranges = make_shard_ranges(
|
shard_ranges = make_shard_ranges(
|
||||||
|
@ -957,6 +1013,22 @@ def _make_parser():
|
||||||
'info', help='Print container db info')
|
'info', help='Print container db info')
|
||||||
info_parser.set_defaults(func=db_info)
|
info_parser.set_defaults(func=db_info)
|
||||||
|
|
||||||
|
# merge
|
||||||
|
merge_parser = subparsers.add_parser(
|
||||||
|
'merge',
|
||||||
|
help='Merge shard range(s) from file with existing shard ranges. This '
|
||||||
|
'subcommand should only be used if you are confident that you '
|
||||||
|
'know what you are doing. Shard ranges should not typically be '
|
||||||
|
'modified in this way.')
|
||||||
|
merge_parser.add_argument('input', metavar='input_file',
|
||||||
|
type=str, help='Name of file')
|
||||||
|
merge_parser.add_argument(
|
||||||
|
'--replace-timeout', type=int, default=600,
|
||||||
|
help='Minimum DB timeout to use when merging shard ranges.')
|
||||||
|
_add_account_prefix_arg(merge_parser)
|
||||||
|
_add_prompt_args(merge_parser)
|
||||||
|
merge_parser.set_defaults(func=merge_shard_ranges)
|
||||||
|
|
||||||
# replace
|
# replace
|
||||||
replace_parser = subparsers.add_parser(
|
replace_parser = subparsers.add_parser(
|
||||||
'replace',
|
'replace',
|
||||||
|
|
|
@ -315,6 +315,38 @@ def merge_shards(shard_data, existing):
|
||||||
return new_content
|
return new_content
|
||||||
|
|
||||||
|
|
||||||
|
def sift_shard_ranges(new_shard_ranges, existing_shard_ranges):
|
||||||
|
"""
|
||||||
|
Compares new and existing shard ranges, updating the new shard ranges with
|
||||||
|
any more recent state from the existing, and returns shard ranges sorted
|
||||||
|
into those that need adding because they contain new or updated state and
|
||||||
|
those that need deleting because their state has been superseded.
|
||||||
|
|
||||||
|
:param new_shard_ranges: a list of dicts, each of which represents a shard
|
||||||
|
range.
|
||||||
|
:param existing_shard_ranges: a dict mapping shard range names to dicts
|
||||||
|
representing a shard range.
|
||||||
|
:return: a tuple (to_add, to_delete); to_add is a list of dicts, each of
|
||||||
|
which represents a shard range that is to be added to the existing
|
||||||
|
shard ranges; to_delete is a set of shard range names that are to be
|
||||||
|
deleted.
|
||||||
|
"""
|
||||||
|
to_delete = set()
|
||||||
|
to_add = {}
|
||||||
|
for item in new_shard_ranges:
|
||||||
|
item_ident = item['name']
|
||||||
|
existing = existing_shard_ranges.get(item_ident)
|
||||||
|
if merge_shards(item, existing):
|
||||||
|
# exists with older timestamp
|
||||||
|
if item_ident in existing_shard_ranges:
|
||||||
|
to_delete.add(item_ident)
|
||||||
|
# duplicate entries in item_list
|
||||||
|
if (item_ident not in to_add or
|
||||||
|
merge_shards(item, to_add[item_ident])):
|
||||||
|
to_add[item_ident] = item
|
||||||
|
return to_add.values(), to_delete
|
||||||
|
|
||||||
|
|
||||||
class ContainerBroker(DatabaseBroker):
|
class ContainerBroker(DatabaseBroker):
|
||||||
"""
|
"""
|
||||||
Encapsulates working with a container database.
|
Encapsulates working with a container database.
|
||||||
|
@ -1421,28 +1453,14 @@ class ContainerBroker(DatabaseBroker):
|
||||||
chunk = [record['name'] for record
|
chunk = [record['name'] for record
|
||||||
in item_list[offset:offset + SQLITE_ARG_LIMIT]]
|
in item_list[offset:offset + SQLITE_ARG_LIMIT]]
|
||||||
records.update(
|
records.update(
|
||||||
(rec[0], rec) for rec in curs.execute(
|
(rec[0], dict(zip(SHARD_RANGE_KEYS, rec)))
|
||||||
|
for rec in curs.execute(
|
||||||
'SELECT %s FROM %s '
|
'SELECT %s FROM %s '
|
||||||
'WHERE deleted IN (0, 1) AND name IN (%s)' %
|
'WHERE deleted IN (0, 1) AND name IN (%s)' %
|
||||||
(', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE,
|
(', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE,
|
||||||
','.join('?' * len(chunk))), chunk))
|
','.join('?' * len(chunk))), chunk))
|
||||||
|
|
||||||
# Sort item_list into things that need adding and deleting
|
to_add, to_delete = sift_shard_ranges(item_list, records)
|
||||||
to_delete = set()
|
|
||||||
to_add = {}
|
|
||||||
for item in item_list:
|
|
||||||
item_ident = item['name']
|
|
||||||
existing = records.get(item_ident)
|
|
||||||
if existing:
|
|
||||||
existing = dict(zip(SHARD_RANGE_KEYS, existing))
|
|
||||||
if merge_shards(item, existing):
|
|
||||||
# exists with older timestamp
|
|
||||||
if item_ident in records:
|
|
||||||
to_delete.add(item_ident)
|
|
||||||
# duplicate entries in item_list
|
|
||||||
if (item_ident not in to_add or
|
|
||||||
merge_shards(item, to_add[item_ident])):
|
|
||||||
to_add[item_ident] = item
|
|
||||||
|
|
||||||
if to_delete:
|
if to_delete:
|
||||||
curs.executemany(
|
curs.executemany(
|
||||||
|
@ -1455,7 +1473,7 @@ class ContainerBroker(DatabaseBroker):
|
||||||
'INSERT INTO %s (%s) VALUES (%s)' %
|
'INSERT INTO %s (%s) VALUES (%s)' %
|
||||||
(SHARD_RANGE_TABLE, ','.join(SHARD_RANGE_KEYS), vals),
|
(SHARD_RANGE_TABLE, ','.join(SHARD_RANGE_KEYS), vals),
|
||||||
tuple([item[k] for k in SHARD_RANGE_KEYS]
|
tuple([item[k] for k in SHARD_RANGE_KEYS]
|
||||||
for item in to_add.values()))
|
for item in to_add))
|
||||||
conn.commit()
|
conn.commit()
|
||||||
|
|
||||||
migrations = {
|
migrations = {
|
||||||
|
|
|
@ -24,7 +24,7 @@ from tempfile import mkdtemp
|
||||||
import six
|
import six
|
||||||
from six.moves import cStringIO as StringIO
|
from six.moves import cStringIO as StringIO
|
||||||
|
|
||||||
from swift.cli.manage_shard_ranges import main
|
from swift.cli.manage_shard_ranges import main, combine_shard_ranges
|
||||||
from swift.common import utils
|
from swift.common import utils
|
||||||
from swift.common.utils import Timestamp, ShardRange
|
from swift.common.utils import Timestamp, ShardRange
|
||||||
from swift.container.backend import ContainerBroker
|
from swift.container.backend import ContainerBroker
|
||||||
|
@ -730,6 +730,230 @@ class TestManageShardRanges(unittest.TestCase):
|
||||||
self.assertEqual(expected, err.getvalue().splitlines())
|
self.assertEqual(expected, err.getvalue().splitlines())
|
||||||
self.assertEqual(expected_shard_ranges[:1], json.loads(out.getvalue()))
|
self.assertEqual(expected_shard_ranges[:1], json.loads(out.getvalue()))
|
||||||
|
|
||||||
|
def test_merge(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
||||||
|
(True, Timestamp.now().internal)})
|
||||||
|
good_shard_ranges = []
|
||||||
|
for shard in self.shard_data[:3]:
|
||||||
|
good_shard_ranges.append(ShardRange(name='a/c_' + shard['lower'],
|
||||||
|
timestamp=next(self.ts_iter),
|
||||||
|
state=ShardRange.ACTIVE,
|
||||||
|
lower=shard['lower'],
|
||||||
|
upper=shard['upper']))
|
||||||
|
# insert an overlap..
|
||||||
|
bad_shard_range = ShardRange(
|
||||||
|
name='a/c_bad_' + self.shard_data[1]['lower'],
|
||||||
|
timestamp=next(self.ts_iter),
|
||||||
|
state=ShardRange.ACTIVE,
|
||||||
|
lower=self.shard_data[1]['lower'],
|
||||||
|
upper=self.shard_data[2]['upper'])
|
||||||
|
broker.merge_shard_ranges(good_shard_ranges + [bad_shard_range])
|
||||||
|
self.assertEqual(
|
||||||
|
[('', 'obj09'),
|
||||||
|
('obj09', 'obj19'),
|
||||||
|
('obj09', 'obj29'),
|
||||||
|
('obj19', 'obj29')],
|
||||||
|
[(sr.lower_str, sr.upper_str) for sr in broker.get_shard_ranges()])
|
||||||
|
|
||||||
|
# use command to merge in a deleted version of the bad shard range
|
||||||
|
bad_shard_range.update_state(ShardRange.SHRUNK,
|
||||||
|
state_timestamp=next(self.ts_iter))
|
||||||
|
bad_shard_range.set_deleted(next(self.ts_iter))
|
||||||
|
bad_shard_range.update_meta(0, 0, next(self.ts_iter))
|
||||||
|
input_file = os.path.join(self.testdir, 'shards')
|
||||||
|
with open(input_file, 'w') as fd:
|
||||||
|
json.dump([dict(bad_shard_range)], fd)
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, '-v', 'merge', input_file,
|
||||||
|
'--replace-timeout', '1', '--yes'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
affected_shard_ranges = [dict(sr) for sr in good_shard_ranges]
|
||||||
|
expected_msg = [
|
||||||
|
'This change will result in the following shard ranges in the '
|
||||||
|
'affected namespace:']
|
||||||
|
expected_msg.extend(
|
||||||
|
json.dumps(affected_shard_ranges, indent=2).splitlines())
|
||||||
|
expected_msg.extend(
|
||||||
|
['Injected 1 shard ranges.',
|
||||||
|
'Run container-replicator to replicate them to other nodes.'])
|
||||||
|
self.assertEqual(expected_msg, out.getvalue().splitlines())
|
||||||
|
self.assertEqual(['Loaded db broker for a/c'],
|
||||||
|
err.getvalue().splitlines())
|
||||||
|
self.assertEqual(
|
||||||
|
[dict(sr) for sr in good_shard_ranges],
|
||||||
|
[dict(sr) for sr in broker.get_shard_ranges()])
|
||||||
|
self.assertEqual(
|
||||||
|
dict(bad_shard_range),
|
||||||
|
dict(broker.get_shard_ranges(include_deleted=True)[3]))
|
||||||
|
|
||||||
|
def test_merge_fills_gap(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
||||||
|
(True, Timestamp.now().internal)})
|
||||||
|
old_shard_ranges = []
|
||||||
|
for shard in self.shard_data[:1]:
|
||||||
|
old_shard_ranges.append(ShardRange(name='a/c_' + shard['lower'],
|
||||||
|
timestamp=next(self.ts_iter),
|
||||||
|
state=ShardRange.ACTIVE,
|
||||||
|
lower=shard['lower'],
|
||||||
|
upper=shard['upper']))
|
||||||
|
|
||||||
|
# use command to merge in a deleted version of the existing and two
|
||||||
|
# new ranges
|
||||||
|
new_shard_ranges = [
|
||||||
|
old_shard_ranges[0].copy(deleted=True,
|
||||||
|
timestamp=next(self.ts_iter)),
|
||||||
|
ShardRange(
|
||||||
|
name='a/c_1_' + self.shard_data[0]['lower'],
|
||||||
|
timestamp=next(self.ts_iter),
|
||||||
|
state=ShardRange.ACTIVE,
|
||||||
|
lower=self.shard_data[0]['lower'],
|
||||||
|
upper=self.shard_data[0]['upper'] + 'a'),
|
||||||
|
ShardRange(
|
||||||
|
name='a/c_1_' + self.shard_data[0]['upper'] + 'a',
|
||||||
|
timestamp=next(self.ts_iter),
|
||||||
|
state=ShardRange.ACTIVE,
|
||||||
|
lower=self.shard_data[0]['upper'] + 'a',
|
||||||
|
upper=self.shard_data[1]['upper'] + 'a'),
|
||||||
|
]
|
||||||
|
|
||||||
|
input_file = os.path.join(self.testdir, 'shards')
|
||||||
|
with open(input_file, 'w') as fd:
|
||||||
|
json.dump([dict(sr) for sr in new_shard_ranges], fd)
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, '-v', 'merge', input_file,
|
||||||
|
'--replace-timeout', '1', '--yes'])
|
||||||
|
self.assertEqual(0, ret)
|
||||||
|
affected_shard_ranges = [dict(sr) for sr in new_shard_ranges[1:]]
|
||||||
|
expected_msg = [
|
||||||
|
'This change will result in the following shard ranges in the '
|
||||||
|
'affected namespace:']
|
||||||
|
expected_msg.extend(
|
||||||
|
json.dumps(affected_shard_ranges, indent=2).splitlines())
|
||||||
|
expected_msg.extend(
|
||||||
|
['Injected 3 shard ranges.',
|
||||||
|
'Run container-replicator to replicate them to other nodes.'])
|
||||||
|
self.assertEqual(expected_msg, out.getvalue().splitlines())
|
||||||
|
self.assertEqual(['Loaded db broker for a/c'],
|
||||||
|
err.getvalue().splitlines())
|
||||||
|
self.assertEqual(
|
||||||
|
[dict(sr) for sr in new_shard_ranges[1:]],
|
||||||
|
[dict(sr) for sr in broker.get_shard_ranges()])
|
||||||
|
self.assertEqual(
|
||||||
|
[dict(sr) for sr in new_shard_ranges],
|
||||||
|
[dict(sr) for sr in broker.get_shard_ranges(include_deleted=True)])
|
||||||
|
|
||||||
|
def test_merge_warns_of_overlap(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
||||||
|
(True, Timestamp.now().internal)})
|
||||||
|
old_shard_ranges = []
|
||||||
|
for shard in self.shard_data[:3]:
|
||||||
|
old_shard_ranges.append(ShardRange(name='a/c_' + shard['lower'],
|
||||||
|
timestamp=next(self.ts_iter),
|
||||||
|
state=ShardRange.ACTIVE,
|
||||||
|
lower=shard['lower'],
|
||||||
|
upper=shard['upper']))
|
||||||
|
broker.merge_shard_ranges(old_shard_ranges)
|
||||||
|
|
||||||
|
# use command to merge in a new range that overlaps...
|
||||||
|
new_shard_range = ShardRange(
|
||||||
|
name='a/c_bad_' + self.shard_data[1]['lower'],
|
||||||
|
timestamp=next(self.ts_iter),
|
||||||
|
state=ShardRange.ACTIVE,
|
||||||
|
lower=self.shard_data[1]['lower'] + 'a',
|
||||||
|
upper=self.shard_data[1]['upper'])
|
||||||
|
input_file = os.path.join(self.testdir, 'shards')
|
||||||
|
with open(input_file, 'w') as fd:
|
||||||
|
json.dump([dict(new_shard_range)], fd)
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, '-v', 'merge', input_file,
|
||||||
|
'--replace-timeout', '1', '-n'])
|
||||||
|
self.assertEqual(3, ret)
|
||||||
|
affected_shard_ranges = [
|
||||||
|
dict(sr) for sr in [old_shard_ranges[0], old_shard_ranges[1],
|
||||||
|
new_shard_range, old_shard_ranges[2]]]
|
||||||
|
expected_msg = [
|
||||||
|
'This change will result in the following shard ranges in the '
|
||||||
|
'affected namespace:']
|
||||||
|
expected_msg.extend(
|
||||||
|
json.dumps(affected_shard_ranges, indent=2).splitlines())
|
||||||
|
expected_msg.extend(
|
||||||
|
['WARNING: this change will result in shard ranges overlaps!',
|
||||||
|
'No changes applied'])
|
||||||
|
self.assertEqual(expected_msg, out.getvalue().splitlines())
|
||||||
|
self.assertEqual(['Loaded db broker for a/c'],
|
||||||
|
err.getvalue().splitlines())
|
||||||
|
self.assertEqual(
|
||||||
|
[dict(sr) for sr in old_shard_ranges],
|
||||||
|
[dict(sr) for sr in broker.get_shard_ranges()])
|
||||||
|
|
||||||
|
# repeat without -v flag
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, 'merge', input_file,
|
||||||
|
'--replace-timeout', '1', '-n'])
|
||||||
|
self.assertEqual(3, ret)
|
||||||
|
expected_msg = [
|
||||||
|
'WARNING: this change will result in shard ranges overlaps!',
|
||||||
|
'No changes applied']
|
||||||
|
self.assertEqual(expected_msg, out.getvalue().splitlines())
|
||||||
|
self.assertEqual(['Loaded db broker for a/c'],
|
||||||
|
err.getvalue().splitlines())
|
||||||
|
self.assertEqual(
|
||||||
|
[dict(sr) for sr in old_shard_ranges],
|
||||||
|
[dict(sr) for sr in broker.get_shard_ranges()])
|
||||||
|
|
||||||
|
def test_merge_warns_of_gap(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
||||||
|
(True, Timestamp.now().internal)})
|
||||||
|
old_shard_ranges = []
|
||||||
|
for shard in self.shard_data[:3]:
|
||||||
|
old_shard_ranges.append(ShardRange(name='a/c_' + shard['lower'],
|
||||||
|
timestamp=next(self.ts_iter),
|
||||||
|
state=ShardRange.ACTIVE,
|
||||||
|
lower=shard['lower'],
|
||||||
|
upper=shard['upper']))
|
||||||
|
broker.merge_shard_ranges(old_shard_ranges)
|
||||||
|
|
||||||
|
# use command to merge in a deleted range that creates a gap...
|
||||||
|
new_shard_range = old_shard_ranges[1].copy(
|
||||||
|
timestamp=next(self.ts_iter), deleted=True)
|
||||||
|
input_file = os.path.join(self.testdir, 'shards')
|
||||||
|
with open(input_file, 'w') as fd:
|
||||||
|
json.dump([dict(new_shard_range)], fd)
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
|
||||||
|
ret = main([broker.db_file, '-v', 'merge', input_file,
|
||||||
|
'--replace-timeout', '1', '-n'])
|
||||||
|
self.assertEqual(3, ret)
|
||||||
|
affected_shard_ranges = [
|
||||||
|
dict(sr) for sr in [old_shard_ranges[0], old_shard_ranges[2]]]
|
||||||
|
expected_msg = [
|
||||||
|
'This change will result in the following shard ranges in the '
|
||||||
|
'affected namespace:']
|
||||||
|
expected_msg.extend(
|
||||||
|
json.dumps(affected_shard_ranges, indent=2).splitlines())
|
||||||
|
expected_msg.extend(
|
||||||
|
['WARNING: this change will result in shard ranges gaps!',
|
||||||
|
'No changes applied'])
|
||||||
|
self.assertEqual(expected_msg, out.getvalue().splitlines())
|
||||||
|
self.assertEqual(['Loaded db broker for a/c'],
|
||||||
|
err.getvalue().splitlines())
|
||||||
|
self.assertEqual(
|
||||||
|
[dict(sr) for sr in old_shard_ranges],
|
||||||
|
[dict(sr) for sr in broker.get_shard_ranges()])
|
||||||
|
|
||||||
def test_replace(self):
|
def test_replace(self):
|
||||||
broker = self._make_broker()
|
broker = self._make_broker()
|
||||||
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
||||||
|
@ -2523,3 +2747,46 @@ class TestManageShardRanges(unittest.TestCase):
|
||||||
self.assertIn(
|
self.assertIn(
|
||||||
"argument --yes/-y: not allowed with argument --dry-run/-n",
|
"argument --yes/-y: not allowed with argument --dry-run/-n",
|
||||||
err_lines[-2], err_lines)
|
err_lines[-2], err_lines)
|
||||||
|
|
||||||
|
def test_combine_shard_ranges(self):
|
||||||
|
ts_iter = make_timestamp_iter()
|
||||||
|
this = ShardRange('a/o', next(ts_iter).internal)
|
||||||
|
that = ShardRange('a/o', next(ts_iter).internal)
|
||||||
|
actual = combine_shard_ranges([dict(this)], [dict(that)])
|
||||||
|
self.assertEqual([dict(that)], [dict(sr) for sr in actual])
|
||||||
|
actual = combine_shard_ranges([dict(that)], [dict(this)])
|
||||||
|
self.assertEqual([dict(that)], [dict(sr) for sr in actual])
|
||||||
|
|
||||||
|
ts = next(ts_iter).internal
|
||||||
|
this = ShardRange('a/o', ts, state=ShardRange.ACTIVE,
|
||||||
|
state_timestamp=next(ts_iter))
|
||||||
|
that = ShardRange('a/o', ts, state=ShardRange.CREATED,
|
||||||
|
state_timestamp=next(ts_iter))
|
||||||
|
actual = combine_shard_ranges([dict(this)], [dict(that)])
|
||||||
|
self.assertEqual([dict(that)], [dict(sr) for sr in actual])
|
||||||
|
actual = combine_shard_ranges([dict(that)], [dict(this)])
|
||||||
|
self.assertEqual([dict(that)], [dict(sr) for sr in actual])
|
||||||
|
|
||||||
|
that.update_meta(1, 2, meta_timestamp=next(ts_iter))
|
||||||
|
this.update_meta(3, 4, meta_timestamp=next(ts_iter))
|
||||||
|
expected = that.copy(object_count=this.object_count,
|
||||||
|
bytes_used=this.bytes_used,
|
||||||
|
meta_timestamp=this.meta_timestamp)
|
||||||
|
actual = combine_shard_ranges([dict(this)], [dict(that)])
|
||||||
|
self.assertEqual([dict(expected)], [dict(sr) for sr in actual])
|
||||||
|
actual = combine_shard_ranges([dict(that)], [dict(this)])
|
||||||
|
self.assertEqual([dict(expected)], [dict(sr) for sr in actual])
|
||||||
|
|
||||||
|
this = ShardRange('a/o', next(ts_iter).internal)
|
||||||
|
that = ShardRange('a/o', next(ts_iter).internal, deleted=True)
|
||||||
|
actual = combine_shard_ranges([dict(this)], [dict(that)])
|
||||||
|
self.assertFalse(actual, [dict(sr) for sr in actual])
|
||||||
|
actual = combine_shard_ranges([dict(that)], [dict(this)])
|
||||||
|
self.assertFalse(actual, [dict(sr) for sr in actual])
|
||||||
|
|
||||||
|
this = ShardRange('a/o', next(ts_iter).internal, deleted=True)
|
||||||
|
that = ShardRange('a/o', next(ts_iter).internal)
|
||||||
|
actual = combine_shard_ranges([dict(this)], [dict(that)])
|
||||||
|
self.assertEqual([dict(that)], [dict(sr) for sr in actual])
|
||||||
|
actual = combine_shard_ranges([dict(that)], [dict(this)])
|
||||||
|
self.assertEqual([dict(that)], [dict(sr) for sr in actual])
|
||||||
|
|
|
@ -36,7 +36,7 @@ import six
|
||||||
from swift.common.exceptions import LockTimeout
|
from swift.common.exceptions import LockTimeout
|
||||||
from swift.container.backend import ContainerBroker, \
|
from swift.container.backend import ContainerBroker, \
|
||||||
update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \
|
update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \
|
||||||
COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES
|
COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES, sift_shard_ranges
|
||||||
from swift.common.db import DatabaseAlreadyExists, GreenDBConnection, \
|
from swift.common.db import DatabaseAlreadyExists, GreenDBConnection, \
|
||||||
TombstoneReclaimer
|
TombstoneReclaimer
|
||||||
from swift.common.request_helpers import get_reserved_name
|
from swift.common.request_helpers import get_reserved_name
|
||||||
|
@ -6484,3 +6484,39 @@ class TestUpdateNewItemFromExisting(unittest.TestCase):
|
||||||
|
|
||||||
for scenario in self.scenarios_when_some_new_item_wins:
|
for scenario in self.scenarios_when_some_new_item_wins:
|
||||||
self._test_scenario(scenario, True)
|
self._test_scenario(scenario, True)
|
||||||
|
|
||||||
|
|
||||||
|
class TestModuleFunctions(unittest.TestCase):
|
||||||
|
def test_sift_shard_ranges(self):
|
||||||
|
ts_iter = make_timestamp_iter()
|
||||||
|
existing_shards = {}
|
||||||
|
sr1 = dict(ShardRange('a/o', next(ts_iter).internal))
|
||||||
|
sr2 = dict(ShardRange('a/o2', next(ts_iter).internal))
|
||||||
|
new_shard_ranges = [sr1, sr2]
|
||||||
|
|
||||||
|
# first empty existing shards will just add the shards
|
||||||
|
to_add, to_delete = sift_shard_ranges(new_shard_ranges,
|
||||||
|
existing_shards)
|
||||||
|
self.assertEqual(2, len(to_add))
|
||||||
|
self.assertIn(sr1, to_add)
|
||||||
|
self.assertIn(sr2, to_add)
|
||||||
|
self.assertFalse(to_delete)
|
||||||
|
|
||||||
|
# if there is a newer version in the existing shards then it won't be
|
||||||
|
# added to to_add
|
||||||
|
existing_shards['a/o'] = dict(
|
||||||
|
ShardRange('a/o', next(ts_iter).internal))
|
||||||
|
to_add, to_delete = sift_shard_ranges(new_shard_ranges,
|
||||||
|
existing_shards)
|
||||||
|
self.assertEqual([sr2], list(to_add))
|
||||||
|
self.assertFalse(to_delete)
|
||||||
|
|
||||||
|
# But if a newer version is in new_shard_ranges then the old will be
|
||||||
|
# added to to_delete and new is added to to_add.
|
||||||
|
sr1['timestamp'] = next(ts_iter).internal
|
||||||
|
to_add, to_delete = sift_shard_ranges(new_shard_ranges,
|
||||||
|
existing_shards)
|
||||||
|
self.assertEqual(2, len(to_add))
|
||||||
|
self.assertIn(sr1, to_add)
|
||||||
|
self.assertIn(sr2, to_add)
|
||||||
|
self.assertEqual({'a/o'}, to_delete)
|
||||||
|
|
Loading…
Reference in New Issue