swift/test/unit/cli/test_manage_shard_ranges.py
Alistair Coles 12bb4839f0 swift-manage-shard-ranges: add 'compact' command
This patch adds a 'compact' command to swift-manage-shard-ranges that
enables sequences of contiguous shards with low object counts to be
compacted into another existing shard, or into the root container.

Change-Id: Ia8f3297d610b5a5cf5598d076fdaf30211832366
2021-02-05 17:18:29 +00:00

941 lines
43 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may not
# use this file except in compliance with the License. You may obtain a copy
# of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import os
import unittest
import mock
from shutil import rmtree
from tempfile import mkdtemp
from six.moves import cStringIO as StringIO
from swift.cli.manage_shard_ranges import main
from swift.common import utils
from swift.common.utils import Timestamp, ShardRange
from swift.container.backend import ContainerBroker
from swift.container.sharder import make_shard_ranges
from test.unit import mock_timestamp_now
class TestManageShardRanges(unittest.TestCase):
def setUp(self):
self.testdir = os.path.join(mkdtemp(), 'tmp_test_cli_find_shards')
utils.mkdirs(self.testdir)
rmtree(self.testdir)
self.shard_data = [
{'index': 0, 'lower': '', 'upper': 'obj09',
'object_count': 10},
{'index': 1, 'lower': 'obj09', 'upper': 'obj19',
'object_count': 10},
{'index': 2, 'lower': 'obj19', 'upper': 'obj29',
'object_count': 10},
{'index': 3, 'lower': 'obj29', 'upper': 'obj39',
'object_count': 10},
{'index': 4, 'lower': 'obj39', 'upper': 'obj49',
'object_count': 10},
{'index': 5, 'lower': 'obj49', 'upper': 'obj59',
'object_count': 10},
{'index': 6, 'lower': 'obj59', 'upper': 'obj69',
'object_count': 10},
{'index': 7, 'lower': 'obj69', 'upper': 'obj79',
'object_count': 10},
{'index': 8, 'lower': 'obj79', 'upper': 'obj89',
'object_count': 10},
{'index': 9, 'lower': 'obj89', 'upper': '',
'object_count': 10},
]
def tearDown(self):
rmtree(os.path.dirname(self.testdir))
def assert_starts_with(self, value, prefix):
self.assertTrue(value.startswith(prefix),
"%r does not start with %r" % (value, prefix))
def assert_formatted_json(self, output, expected):
try:
loaded = json.loads(output)
except ValueError as err:
self.fail('Invalid JSON: %s\n%r' % (err, output))
# Check this one first, for a prettier diff
self.assertEqual(loaded, expected)
formatted = json.dumps(expected, sort_keys=True, indent=2) + '\n'
self.assertEqual(output, formatted)
def _make_broker(self, account='a', container='c',
device='sda', part=0):
datadir = os.path.join(
self.testdir, device, 'containers', str(part), 'ash', 'hash')
db_file = os.path.join(datadir, 'hash.db')
broker = ContainerBroker(
db_file, account=account, container=container)
broker.initialize()
return broker
def _move_broker_to_sharded_state(self, broker):
epoch = Timestamp.now()
broker.enable_sharding(epoch)
self.assertTrue(broker.set_sharding_state())
self.assertTrue(broker.set_sharded_state())
own_sr = broker.get_own_shard_range()
own_sr.update_state(ShardRange.SHARDED, epoch)
broker.merge_shard_ranges([own_sr])
return epoch
def test_find_shard_ranges(self):
db_file = os.path.join(self.testdir, 'hash.db')
broker = ContainerBroker(db_file)
broker.account = 'a'
broker.container = 'c'
broker.initialize()
ts = utils.Timestamp.now()
broker.merge_items([
{'name': 'obj%02d' % i, 'created_at': ts.internal, 'size': 0,
'content_type': 'application/octet-stream', 'etag': 'not-really',
'deleted': 0, 'storage_policy_index': 0,
'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
for i in range(100)])
# Default uses a large enough value that sharding isn't required
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, 'find'])
self.assert_formatted_json(out.getvalue(), [])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 0 ranges in ')
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, 'find', '100'])
self.assert_formatted_json(out.getvalue(), [])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 0 ranges in ')
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, 'find', '99'])
self.assert_formatted_json(out.getvalue(), [
{'index': 0, 'lower': '', 'upper': 'obj98', 'object_count': 99},
{'index': 1, 'lower': 'obj98', 'upper': '', 'object_count': 1},
])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 2 ranges in ')
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([db_file, 'find', '10'])
self.assert_formatted_json(out.getvalue(), [
{'index': 0, 'lower': '', 'upper': 'obj09', 'object_count': 10},
{'index': 1, 'lower': 'obj09', 'upper': 'obj19',
'object_count': 10},
{'index': 2, 'lower': 'obj19', 'upper': 'obj29',
'object_count': 10},
{'index': 3, 'lower': 'obj29', 'upper': 'obj39',
'object_count': 10},
{'index': 4, 'lower': 'obj39', 'upper': 'obj49',
'object_count': 10},
{'index': 5, 'lower': 'obj49', 'upper': 'obj59',
'object_count': 10},
{'index': 6, 'lower': 'obj59', 'upper': 'obj69',
'object_count': 10},
{'index': 7, 'lower': 'obj69', 'upper': 'obj79',
'object_count': 10},
{'index': 8, 'lower': 'obj79', 'upper': 'obj89',
'object_count': 10},
{'index': 9, 'lower': 'obj89', 'upper': '', 'object_count': 10},
])
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
self.assert_starts_with(err_lines[1], 'Found 10 ranges in ')
def test_info(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':
(True, Timestamp.now().internal)})
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([broker.db_file, 'info'])
expected = ['Sharding enabled = True',
'Own shard range: None',
'db_state = unsharded',
'Metadata:',
' X-Container-Sysmeta-Sharding = True']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
retiring_db_id = broker.get_info()['id']
broker.merge_shard_ranges(ShardRange('.shards/cc', Timestamp.now()))
epoch = Timestamp.now()
with mock_timestamp_now(epoch) as now:
broker.enable_sharding(epoch)
self.assertTrue(broker.set_sharding_state())
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
with mock_timestamp_now(now):
main([broker.db_file, 'info'])
expected = ['Sharding enabled = True',
'Own shard range: {',
' "bytes_used": 0,',
' "deleted": 0,',
' "epoch": "%s",' % epoch.internal,
' "lower": "",',
' "meta_timestamp": "%s",' % now.internal,
' "name": "a/c",',
' "object_count": 0,',
' "reported": 0,',
' "state": "sharding",',
' "state_timestamp": "%s",' % now.internal,
' "timestamp": "%s",' % now.internal,
' "upper": ""',
'}',
'db_state = sharding',
'Retiring db id: %s' % retiring_db_id,
'Cleaving context: {',
' "cleave_to_row": null,',
' "cleaving_done": false,',
' "cursor": "",',
' "last_cleave_to_row": null,',
' "max_row": -1,',
' "misplaced_done": false,',
' "ranges_done": 0,',
' "ranges_todo": 0,',
' "ref": "%s"' % retiring_db_id,
'}',
'Metadata:',
' X-Container-Sysmeta-Sharding = True']
# The json.dumps() in py2 produces trailing space, not in py3.
result = [x.rstrip() for x in out.getvalue().splitlines()]
self.assertEqual(expected, result)
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
self.assertTrue(broker.set_sharded_state())
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
with mock_timestamp_now(now):
main([broker.db_file, 'info'])
expected = ['Sharding enabled = True',
'Own shard range: {',
' "bytes_used": 0,',
' "deleted": 0,',
' "epoch": "%s",' % epoch.internal,
' "lower": "",',
' "meta_timestamp": "%s",' % now.internal,
' "name": "a/c",',
' "object_count": 0,',
' "reported": 0,',
' "state": "sharding",',
' "state_timestamp": "%s",' % now.internal,
' "timestamp": "%s",' % now.internal,
' "upper": ""',
'}',
'db_state = sharded',
'Metadata:',
' X-Container-Sysmeta-Sharding = True']
self.assertEqual(expected,
[x.rstrip() for x in out.getvalue().splitlines()])
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
def test_replace(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':
(True, Timestamp.now().internal)})
input_file = os.path.join(self.testdir, 'shards')
with open(input_file, 'w') as fd:
json.dump(self.shard_data, fd)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([broker.db_file, 'replace', input_file])
expected = [
'No shard ranges found to delete.',
'Injected 10 shard ranges.',
'Run container-replicator to replicate them to other nodes.',
'Use the enable sub-command to enable sharding.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
self.assertEqual(
[(data['lower'], data['upper']) for data in self.shard_data],
[(sr.lower_str, sr.upper_str) for sr in broker.get_shard_ranges()])
def _assert_enabled(self, broker, epoch):
own_sr = broker.get_own_shard_range()
self.assertEqual(ShardRange.SHARDING, own_sr.state)
self.assertEqual(epoch, own_sr.epoch)
self.assertEqual(ShardRange.MIN, own_sr.lower)
self.assertEqual(ShardRange.MAX, own_sr.upper)
self.assertEqual(
'True', broker.metadata['X-Container-Sysmeta-Sharding'][0])
def test_enable(self):
broker = self._make_broker()
broker.update_metadata({'X-Container-Sysmeta-Sharding':
(True, Timestamp.now().internal)})
# no shard ranges
out = StringIO()
err = StringIO()
with self.assertRaises(SystemExit):
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([broker.db_file, 'enable'])
expected = ["WARNING: invalid shard ranges: ['No shard ranges.'].",
'Aborting.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
# success
shard_ranges = []
for data in self.shard_data:
path = ShardRange.make_path(
'.shards_a', 'c', 'c', Timestamp.now(), data['index'])
shard_ranges.append(
ShardRange(path, Timestamp.now(), data['lower'],
data['upper'], data['object_count']))
broker.merge_shard_ranges(shard_ranges)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
with mock_timestamp_now() as now:
main([broker.db_file, 'enable'])
expected = [
"Container moved to state 'sharding' with epoch %s." %
now.internal,
'Run container-sharder on all nodes to shard the container.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
self._assert_enabled(broker, now)
# already enabled
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([broker.db_file, 'enable'])
expected = [
"Container already in state 'sharding' with epoch %s." %
now.internal,
'No action required.',
'Run container-sharder on all nodes to shard the container.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
self._assert_enabled(broker, now)
def test_find_replace_enable(self):
db_file = os.path.join(self.testdir, 'hash.db')
broker = ContainerBroker(db_file)
broker.account = 'a'
broker.container = 'c'
broker.initialize()
ts = utils.Timestamp.now()
broker.merge_items([
{'name': 'obj%02d' % i, 'created_at': ts.internal, 'size': 0,
'content_type': 'application/octet-stream', 'etag': 'not-really',
'deleted': 0, 'storage_policy_index': 0,
'ctype_timestamp': ts.internal, 'meta_timestamp': ts.internal}
for i in range(100)])
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
with mock_timestamp_now() as now:
main([broker.db_file, 'find_and_replace', '10', '--enable'])
expected = [
'No shard ranges found to delete.',
'Injected 10 shard ranges.',
'Run container-replicator to replicate them to other nodes.',
"Container moved to state 'sharding' with epoch %s." %
now.internal,
'Run container-sharder on all nodes to shard the container.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
self._assert_enabled(broker, now)
found_shard_ranges = broker.get_shard_ranges()
self.assertEqual(
[(data['lower'], data['upper']) for data in self.shard_data],
[(sr.lower_str, sr.upper_str) for sr in found_shard_ranges])
# Do another find & replace but quit when prompted about existing
# shard ranges
out = StringIO()
err = StringIO()
to_patch = 'swift.cli.manage_shard_ranges.input'
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err), \
mock_timestamp_now() as now, \
mock.patch(to_patch, return_value='q'):
main([broker.db_file, 'find_and_replace', '10'])
# Shard ranges haven't changed at all
self.assertEqual(found_shard_ranges, broker.get_shard_ranges())
expected = ['This will delete existing 10 shard ranges.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
err.getvalue().splitlines())
def test_compact_bad_args(self):
broker = self._make_broker()
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
with self.assertRaises(SystemExit):
main([broker.db_file, 'compact', '--shrink-threshold', '0'])
with self.assertRaises(SystemExit):
main([broker.db_file, 'compact', '--expansion-limit', '0'])
with self.assertRaises(SystemExit):
main([broker.db_file, 'compact', '--max-shrinking', '0'])
with self.assertRaises(SystemExit):
main([broker.db_file, 'compact', '--max-expanding', '0'])
def test_compact_not_root(self):
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
broker.merge_shard_ranges(shard_ranges)
# make broker appear to not be a root container
out = StringIO()
err = StringIO()
broker.set_sharding_sysmeta('Quoted-Root', 'not_a/c')
self.assertFalse(broker.is_root_container())
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact'])
self.assertEqual(2, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['WARNING: Shard containers cannot be compacted.',
'This command should be used on a root container.'],
out_lines[:2]
)
updated_ranges = broker.get_shard_ranges()
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.FOUND] * 10,
[sr.state for sr in updated_ranges])
def test_compact_not_sharded(self):
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
broker.merge_shard_ranges(shard_ranges)
# make broker appear to be a root container but it isn't sharded
out = StringIO()
err = StringIO()
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
self.assertTrue(broker.is_root_container())
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact'])
self.assertEqual(2, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['WARNING: Container is not yet sharded so cannot be compacted.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.FOUND] * 10,
[sr.state for sr in updated_ranges])
def test_compact_overlapping_shard_ranges(self):
# verify that containers with overlaps will not be compacted
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
sr.update_state(ShardRange.ACTIVE)
shard_ranges[3].upper = shard_ranges[4].upper
broker.merge_shard_ranges(shard_ranges)
self._move_broker_to_sharded_state(broker)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file,
'compact', '--yes', '--max-expanding', '10'])
self.assertEqual(2, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['WARNING: Container has overlapping shard ranges so cannot be '
'compacted.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.ACTIVE] * 10,
[sr.state for sr in updated_ranges])
def test_compact_shard_ranges_in_found_state(self):
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
broker.merge_shard_ranges(shard_ranges)
self._move_broker_to_sharded_state(broker)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['No shards identified for compaction.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
self.assertEqual([ShardRange.FOUND] * 10,
[sr.state for sr in updated_ranges])
def test_compact_user_input(self):
# verify user input 'y' or 'n' is respected
small_ranges = (3, 4, 7)
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
if i not in small_ranges:
sr.object_count = 100001
sr.update_state(ShardRange.ACTIVE)
broker.merge_shard_ranges(shard_ranges)
self._move_broker_to_sharded_state(broker)
def do_compact(user_input):
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out),\
mock.patch('sys.stderr', err), \
mock.patch('swift.cli.manage_shard_ranges.input',
return_value=user_input):
ret = main([broker.db_file, 'compact',
'--max-shrinking', '99'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertIn('can expand to accept 20 objects', out_lines[0])
self.assertIn('(object count 10)', out_lines[1])
self.assertIn('(object count 10)', out_lines[2])
self.assertIn('can expand to accept 10 objects', out_lines[3])
self.assertIn('(object count 10)', out_lines[4])
broker_ranges = broker.get_shard_ranges()
return broker_ranges
broker_ranges = do_compact('n')
# expect no changes to shard ranges
self.assertEqual(shard_ranges, broker_ranges)
for i, sr in enumerate(broker_ranges):
self.assertEqual(ShardRange.ACTIVE, sr.state)
broker_ranges = do_compact('y')
# expect updated shard ranges
shard_ranges[5].lower = shard_ranges[3].lower
shard_ranges[8].lower = shard_ranges[7].lower
self.assertEqual(shard_ranges, broker_ranges)
for i, sr in enumerate(broker_ranges):
if i in small_ranges:
self.assertEqual(ShardRange.SHRINKING, sr.state)
else:
self.assertEqual(ShardRange.ACTIVE, sr.state)
def test_compact_three_donors_two_acceptors(self):
small_ranges = (2, 3, 4, 7)
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
if i not in small_ranges:
sr.object_count = 100001
sr.update_state(ShardRange.ACTIVE)
broker.merge_shard_ranges(shard_ranges)
self._move_broker_to_sharded_state(broker)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes',
'--max-shrinking', '99'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Updated 2 shard sequences for compaction.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
for i, sr in enumerate(updated_ranges):
if i in small_ranges:
self.assertEqual(ShardRange.SHRINKING, sr.state)
else:
self.assertEqual(ShardRange.ACTIVE, sr.state)
shard_ranges[5].lower = shard_ranges[2].lower
shard_ranges[8].lower = shard_ranges[7].lower
self.assertEqual(shard_ranges, updated_ranges)
for i in (5, 8):
# acceptors should have updated timestamp
self.assertLess(shard_ranges[i].timestamp,
updated_ranges[i].timestamp)
# check idempotency
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes',
'--max-shrinking', '99'])
self.assertEqual(0, ret)
updated_ranges = broker.get_shard_ranges()
self.assertEqual(shard_ranges, updated_ranges)
for i, sr in enumerate(updated_ranges):
if i in small_ranges:
self.assertEqual(ShardRange.SHRINKING, sr.state)
else:
self.assertEqual(ShardRange.ACTIVE, sr.state)
def test_compact_all_donors_shrink_to_root(self):
# by default all shard ranges are small enough to shrink so the root
# becomes the acceptor
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
sr.update_state(ShardRange.ACTIVE)
broker.merge_shard_ranges(shard_ranges)
epoch = self._move_broker_to_sharded_state(broker)
own_sr = broker.get_own_shard_range(no_default=True)
self.assertEqual(epoch, own_sr.state_timestamp) # sanity check
self.assertEqual(ShardRange.SHARDED, own_sr.state) # sanity check
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes',
'--max-shrinking', '99'])
self.assertEqual(0, ret, 'stdout:\n%s\nstderr\n%s' %
(out.getvalue(), err.getvalue()))
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Updated 1 shard sequences for compaction.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.SHRINKING] * 10,
[sr.state for sr in updated_ranges])
updated_own_sr = broker.get_own_shard_range(no_default=True)
self.assertEqual(own_sr.timestamp, updated_own_sr.timestamp)
self.assertEqual(own_sr.epoch, updated_own_sr.epoch)
self.assertLess(own_sr.state_timestamp,
updated_own_sr.state_timestamp)
self.assertEqual(ShardRange.ACTIVE, updated_own_sr.state)
def test_compact_single_donor_shrink_to_root(self):
# single shard range small enough to shrink so the root becomes the
# acceptor
broker = self._make_broker()
shard_data = [
{'index': 0, 'lower': '', 'upper': '', 'object_count': 10}
]
shard_ranges = make_shard_ranges(broker, shard_data, '.shards_')
shard_ranges[0].update_state(ShardRange.ACTIVE)
broker.merge_shard_ranges(shard_ranges)
epoch = self._move_broker_to_sharded_state(broker)
own_sr = broker.get_own_shard_range(no_default=True)
self.assertEqual(epoch, own_sr.state_timestamp) # sanity check
self.assertEqual(ShardRange.SHARDED, own_sr.state) # sanity check
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes'])
self.assertEqual(0, ret, 'stdout:\n%s\nstderr\n%s' %
(out.getvalue(), err.getvalue()))
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Updated 1 shard sequences for compaction.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.SHRINKING],
[sr.state for sr in updated_ranges])
updated_own_sr = broker.get_own_shard_range(no_default=True)
self.assertEqual(own_sr.timestamp, updated_own_sr.timestamp)
self.assertEqual(own_sr.epoch, updated_own_sr.epoch)
self.assertLess(own_sr.state_timestamp,
updated_own_sr.state_timestamp)
self.assertEqual(ShardRange.ACTIVE, updated_own_sr.state)
def test_compact_donors_but_no_suitable_acceptor(self):
# if shard ranges are already shrinking, check that the final one is
# not made into an acceptor if a suitable adjacent acceptor is not
# found (unexpected scenario but possible in an overlap situation)
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, state in enumerate([ShardRange.SHRINKING] * 3 +
[ShardRange.SHARDING] +
[ShardRange.ACTIVE] * 6):
shard_ranges[i].update_state(state)
broker.merge_shard_ranges(shard_ranges)
epoch = self._move_broker_to_sharded_state(broker)
with mock_timestamp_now(epoch):
own_sr = broker.get_own_shard_range(no_default=True)
self.assertEqual(epoch, own_sr.state_timestamp) # sanity check
self.assertEqual(ShardRange.SHARDED, own_sr.state) # sanity check
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes',
'--max-shrinking', '99'])
self.assertEqual(0, ret, 'stdout:\n%s\nstderr\n%s' %
(out.getvalue(), err.getvalue()))
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Updated 1 shard sequences for compaction.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
shard_ranges[9].lower = shard_ranges[4].lower # expanded acceptor
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.SHRINKING] * 3 + # unchanged
[ShardRange.SHARDING] + # unchanged
[ShardRange.SHRINKING] * 5 + # moved to shrinking
[ShardRange.ACTIVE], # unchanged
[sr.state for sr in updated_ranges])
with mock_timestamp_now(epoch): # force equal meta-timestamp
updated_own_sr = broker.get_own_shard_range(no_default=True)
self.assertEqual(dict(own_sr), dict(updated_own_sr))
def test_compact_no_gaps(self):
# verify that compactable sequences do not include gaps
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
sr.update_state(ShardRange.ACTIVE)
gapped_ranges = shard_ranges[:3] + shard_ranges[4:]
broker.merge_shard_ranges(gapped_ranges)
self._move_broker_to_sharded_state(broker)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes',
'--max-shrinking', '99'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Updated 2 shard sequences for compaction.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
gapped_ranges[2].lower = gapped_ranges[0].lower
gapped_ranges[8].lower = gapped_ranges[3].lower
self.assertEqual(gapped_ranges, updated_ranges)
self.assertEqual([ShardRange.SHRINKING] * 2 + [ShardRange.ACTIVE] +
[ShardRange.SHRINKING] * 5 + [ShardRange.ACTIVE],
[sr.state for sr in updated_ranges])
def test_compact_max_shrinking_default(self):
# verify default limit on number of shrinking shards per acceptor
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
sr.update_state(ShardRange.ACTIVE)
broker.merge_shard_ranges(shard_ranges)
self._move_broker_to_sharded_state(broker)
def do_compact():
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Updated 5 shard sequences for compaction.'],
out_lines[:1])
return broker.get_shard_ranges()
updated_ranges = do_compact()
for acceptor in (1, 3, 5, 7, 9):
shard_ranges[acceptor].lower = shard_ranges[acceptor - 1].lower
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.SHRINKING, ShardRange.ACTIVE] * 5,
[sr.state for sr in updated_ranges])
# check idempotency
updated_ranges = do_compact()
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.SHRINKING, ShardRange.ACTIVE] * 5,
[sr.state for sr in updated_ranges])
def test_compact_max_shrinking(self):
# verify option to limit the number of shrinking shards per acceptor
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
sr.update_state(ShardRange.ACTIVE)
broker.merge_shard_ranges(shard_ranges)
self._move_broker_to_sharded_state(broker)
def do_compact():
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes',
'--max-shrinking', '7'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Updated 2 shard sequences for compaction.'],
out_lines[:1])
return broker.get_shard_ranges()
updated_ranges = do_compact()
shard_ranges[7].lower = shard_ranges[0].lower
shard_ranges[9].lower = shard_ranges[8].lower
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.SHRINKING] * 7 + [ShardRange.ACTIVE] +
[ShardRange.SHRINKING] + [ShardRange.ACTIVE],
[sr.state for sr in updated_ranges])
# check idempotency
updated_ranges = do_compact()
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.SHRINKING] * 7 + [ShardRange.ACTIVE] +
[ShardRange.SHRINKING] + [ShardRange.ACTIVE],
[sr.state for sr in updated_ranges])
def test_compact_max_expanding(self):
# verify option to limit the number of expanding shards per acceptor
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
sr.update_state(ShardRange.ACTIVE)
broker.merge_shard_ranges(shard_ranges)
self._move_broker_to_sharded_state(broker)
out = StringIO()
err = StringIO()
# note: max_shrinking is set to 3 so that there is opportunity for more
# than 2 acceptors
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes',
'--max-shrinking', '3', '--max-expanding', '2'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Updated 2 shard sequences for compaction.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
shard_ranges[3].lower = shard_ranges[0].lower
shard_ranges[7].lower = shard_ranges[4].lower
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.SHRINKING] * 3 + [ShardRange.ACTIVE] +
[ShardRange.SHRINKING] * 3 + [ShardRange.ACTIVE] * 3,
[sr.state for sr in updated_ranges])
def test_compact_expansion_limit(self):
# verify option to limit the size of each acceptor after compaction
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
sr.update_state(ShardRange.ACTIVE)
broker.merge_shard_ranges(shard_ranges)
self._move_broker_to_sharded_state(broker)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes',
'--expansion-limit', '20'])
self.assertEqual(0, ret, out.getvalue())
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Updated 5 shard sequences for compaction.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
shard_ranges[1].lower = shard_ranges[0].lower
shard_ranges[3].lower = shard_ranges[2].lower
shard_ranges[5].lower = shard_ranges[4].lower
shard_ranges[7].lower = shard_ranges[6].lower
shard_ranges[9].lower = shard_ranges[8].lower
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.SHRINKING] + [ShardRange.ACTIVE] +
[ShardRange.SHRINKING] + [ShardRange.ACTIVE] +
[ShardRange.SHRINKING] + [ShardRange.ACTIVE] +
[ShardRange.SHRINKING] + [ShardRange.ACTIVE] +
[ShardRange.SHRINKING] + [ShardRange.ACTIVE],
[sr.state for sr in updated_ranges])
def test_compact_shrink_threshold(self):
# verify option to set the shrink threshold for compaction;
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
for i, sr in enumerate(shard_ranges):
sr.update_state(ShardRange.ACTIVE)
# (n-2)th shard range has one extra object
shard_ranges[-2].object_count = 11
broker.merge_shard_ranges(shard_ranges)
self._move_broker_to_sharded_state(broker)
# with threshold set to 10 no shard ranges can be shrunk
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes',
'--max-shrinking', '99',
'--shrink-threshold', '10'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['No shards identified for compaction.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.ACTIVE] * 10,
[sr.state for sr in updated_ranges])
# with threshold == 11 all but the final 2 shard ranges can be shrunk;
# note: the (n-1)th shard range is NOT shrunk to root
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'compact', '--yes',
'--max-shrinking', '99',
'--shrink-threshold', '11'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Updated 1 shard sequences for compaction.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
shard_ranges[8].lower = shard_ranges[0].lower
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.SHRINKING] * 8 + [ShardRange.ACTIVE] * 2,
[sr.state for sr in updated_ranges])