Merge "s-m-s-r: read shard_ranges from stdin"
This commit is contained in:
commit
cc0d831eed
|
@ -161,6 +161,7 @@ import json
|
||||||
import os.path
|
import os.path
|
||||||
import sys
|
import sys
|
||||||
import time
|
import time
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
from six.moves import input
|
from six.moves import input
|
||||||
|
|
||||||
|
@ -207,12 +208,22 @@ def _print_shard_range(sr, level=0):
|
||||||
print(indent + ' state: %9s upper: %r' % (sr.state_text, sr.upper_str))
|
print(indent + ' state: %9s upper: %r' % (sr.state_text, sr.upper_str))
|
||||||
|
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def _open_input(args):
|
||||||
|
if args.input == '-':
|
||||||
|
args.input = '<STDIN>'
|
||||||
|
yield sys.stdin
|
||||||
|
else:
|
||||||
|
with open(args.input, 'r') as fd:
|
||||||
|
yield fd
|
||||||
|
|
||||||
|
|
||||||
def _load_and_validate_shard_data(args, require_index=True):
|
def _load_and_validate_shard_data(args, require_index=True):
|
||||||
required_keys = ['lower', 'upper', 'object_count']
|
required_keys = ['lower', 'upper', 'object_count']
|
||||||
if require_index:
|
if require_index:
|
||||||
required_keys.append('index')
|
required_keys.append('index')
|
||||||
try:
|
try:
|
||||||
with open(args.input, 'r') as fd:
|
with _open_input(args) as fd:
|
||||||
try:
|
try:
|
||||||
data = json.load(fd)
|
data = json.load(fd)
|
||||||
if not isinstance(data, list):
|
if not isinstance(data, list):
|
||||||
|
|
|
@ -526,6 +526,77 @@ class TestManageShardRanges(unittest.TestCase):
|
||||||
[(data['lower'], data['upper']) for data in self.shard_data],
|
[(data['lower'], data['upper']) for data in self.shard_data],
|
||||||
[(sr.lower_str, sr.upper_str) for sr in broker.get_shard_ranges()])
|
[(sr.lower_str, sr.upper_str) for sr in broker.get_shard_ranges()])
|
||||||
|
|
||||||
|
def test_analyze_stdin(self):
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
stdin = StringIO()
|
||||||
|
stdin.write(json.dumps([])) # empty but valid json
|
||||||
|
stdin.seek(0)
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err), \
|
||||||
|
mock.patch('sys.stdin', stdin):
|
||||||
|
main(['-', 'analyze'])
|
||||||
|
expected = [
|
||||||
|
'Found no complete sequence of shard ranges.',
|
||||||
|
'Repairs necessary to fill gaps.',
|
||||||
|
'Gap filling not supported by this tool. No repairs performed.',
|
||||||
|
]
|
||||||
|
|
||||||
|
self.assertEqual(expected, out.getvalue().splitlines())
|
||||||
|
broker = self._make_broker()
|
||||||
|
broker.update_metadata({'X-Container-Sysmeta-Sharding':
|
||||||
|
(True, Timestamp.now().internal)})
|
||||||
|
shard_ranges = [
|
||||||
|
dict(sr, state=ShardRange.STATES[sr.state])
|
||||||
|
for sr in make_shard_ranges(broker, self.shard_data, '.shards_')
|
||||||
|
]
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
stdin = StringIO()
|
||||||
|
stdin.write(json.dumps(shard_ranges))
|
||||||
|
stdin.seek(0)
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err), \
|
||||||
|
mock.patch('sys.stdin', stdin):
|
||||||
|
main(['-', 'analyze'])
|
||||||
|
expected = [
|
||||||
|
'Found one complete sequence of 10 shard ranges '
|
||||||
|
'and no overlapping shard ranges.',
|
||||||
|
'No repairs necessary.',
|
||||||
|
]
|
||||||
|
self.assertEqual(expected, out.getvalue().splitlines())
|
||||||
|
|
||||||
|
def test_analyze_stdin_with_overlaps(self):
|
||||||
|
broker = self._make_broker()
|
||||||
|
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||||
|
with mock_timestamp_now(next(self.ts_iter)):
|
||||||
|
shard_ranges = make_shard_ranges(
|
||||||
|
broker, self.shard_data, '.shards_')
|
||||||
|
with mock_timestamp_now(next(self.ts_iter)):
|
||||||
|
overlap_shard_ranges_1 = make_shard_ranges(
|
||||||
|
broker, self.overlap_shard_data_1, '.shards_')
|
||||||
|
broker.merge_shard_ranges(shard_ranges + overlap_shard_ranges_1)
|
||||||
|
shard_ranges = [
|
||||||
|
dict(sr, state=ShardRange.STATES[sr.state])
|
||||||
|
for sr in broker.get_shard_ranges()
|
||||||
|
]
|
||||||
|
out = StringIO()
|
||||||
|
err = StringIO()
|
||||||
|
stdin = StringIO()
|
||||||
|
stdin.write(json.dumps(shard_ranges))
|
||||||
|
stdin.seek(0)
|
||||||
|
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err), \
|
||||||
|
mock.patch('sys.stdin', stdin):
|
||||||
|
main(['-', 'analyze'])
|
||||||
|
expected = [
|
||||||
|
'Repairs necessary to remove overlapping shard ranges.',
|
||||||
|
'Chosen a complete sequence of 10 shard ranges with '
|
||||||
|
'current total of 100 object records to accept object records '
|
||||||
|
'from 10 overlapping donor shard ranges.',
|
||||||
|
'Once applied to the broker these changes will result in:',
|
||||||
|
' 10 shard ranges being removed.',
|
||||||
|
' 10 object records being moved to the chosen shard ranges.',
|
||||||
|
]
|
||||||
|
self.assertEqual(expected, out.getvalue().splitlines())
|
||||||
|
|
||||||
def _assert_enabled(self, broker, epoch):
|
def _assert_enabled(self, broker, epoch):
|
||||||
own_sr = broker.get_own_shard_range()
|
own_sr = broker.get_own_shard_range()
|
||||||
self.assertEqual(ShardRange.SHARDING, own_sr.state)
|
self.assertEqual(ShardRange.SHARDING, own_sr.state)
|
||||||
|
|
Loading…
Reference in New Issue