OpenStack Storage (Swift)
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

7357 lines
345 KiB

# Copyright (c) 2010-2017 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import json
import random
import eventlet
import os
import shutil
from contextlib import contextmanager
from tempfile import mkdtemp
from uuid import uuid4
import mock
import unittest
from collections import defaultdict
import time
from copy import deepcopy
import six
from swift.common import internal_client
from swift.container import replicator
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \
SHARDED, DATADIR
from swift.container.sharder import ContainerSharder, sharding_enabled, \
CleavingContext, DEFAULT_SHARDER_CONF, finalize_shrinking, \
find_shrinking_candidates, process_compactible_shard_sequences, \
find_compactible_shard_sequences, is_shrinking_candidate, \
is_sharding_candidate, find_paths, rank_paths, ContainerSharderConf
from swift.common.utils import ShardRange, Timestamp, hash_path, \
encode_timestamps, parse_db_filename, quorum_size, Everything, md5
from test import annotate_failure
from test.debug_logger import debug_logger
from test.unit import FakeRing, make_timestamp_iter, unlink_files, \
mocked_http_conn, mock_timestamp_now, mock_timestamp_now_with_iter, \
attach_fake_replication_rpc
class BaseTestSharder(unittest.TestCase):
    """Shared fixtures and broker/shard range helpers for sharder tests.

    Each test gets a fresh temporary directory (removed in tearDown), a
    timestamp iterator and a debug logger.
    """

    def setUp(self):
        self.tempdir = mkdtemp()
        self.ts_iter = make_timestamp_iter()
        self.logger = debug_logger('sharder-test')

    def tearDown(self):
        shutil.rmtree(self.tempdir, ignore_errors=True)

    def _assert_shard_ranges_equal(self, expected, actual):
        """Assert two sequences of ShardRanges are equal.

        Ranges are compared in their dict form so that a failure shows
        which fields differ.
        """
        self.assertEqual([dict(sr) for sr in expected],
                         [dict(sr) for sr in actual])

    def _make_broker(self, account='a', container='c', epoch=None,
                     device='sda', part=0, hash_=None):
        """Create and initialize a ContainerBroker below ``self.tempdir``.

        The db file lives in
        ``<tempdir>/<device>/containers/<part>/<hash_[-3:]>/<hash_>/``.

        :param account: account name for the broker.
        :param container: container name for the broker.
        :param epoch: if truthy, the db filename is ``<hash_>_<epoch>.db``;
            otherwise it is ``<hash_>.db``.
        :param device: device directory name.
        :param part: partition directory name.
        :param hash_: hash used in the path and filename; defaults to the
            md5 hexdigest of ``container``.
        :return: the initialized ContainerBroker.
        """
        hash_ = hash_ or md5(
            container.encode('utf-8'), usedforsecurity=False).hexdigest()
        datadir = os.path.join(
            self.tempdir, device, 'containers', str(part), hash_[-3:], hash_)
        if epoch:
            # use hash_ here - not the builtin hash function
            filename = '%s_%s.db' % (hash_, epoch)
        else:
            filename = hash_ + '.db'
        db_file = os.path.join(datadir, filename)
        broker = ContainerBroker(
            db_file, account=account, container=container,
            logger=self.logger)
        broker.initialize()
        return broker

    def _make_sharding_broker_with_root(self, root_sysmeta_key, account,
                                        container, shard_bounds):
        """Create a broker and move it into the sharding db state.

        The root path ``'a/c'`` is stored as sharding sysmeta under
        ``root_sysmeta_key``. Sharding is enabled, CLEAVED shard ranges
        are merged for ``shard_bounds`` and the db state is set to
        sharding. That transition is asserted to yield a db with a new id.

        :return: a ContainerBroker re-opened on the sharding db file.
        """
        broker = self._make_broker(account=account, container=container)
        broker.set_sharding_sysmeta(root_sysmeta_key, 'a/c')
        old_db_id = broker.get_info()['id']
        broker.enable_sharding(next(self.ts_iter))
        shard_ranges = self._make_shard_ranges(
            shard_bounds, state=ShardRange.CLEAVED)
        broker.merge_shard_ranges(shard_ranges)
        self.assertTrue(broker.set_sharding_state())
        # NOTE(review): the broker is re-opened as a/c, and the root sysmeta
        # is 'a/c', regardless of the account/container arguments
        broker = ContainerBroker(broker.db_file, account='a', container='c')
        self.assertNotEqual(old_db_id, broker.get_info()['id'])  # sanity
        return broker

    def _make_old_style_sharding_broker(self, account='a', container='c',
                                        shard_bounds=(('', 'middle'),
                                                      ('middle', ''))):
        """Make a sharding broker whose root is set via old-style 'Root'
        sysmeta.
        """
        return self._make_sharding_broker_with_root(
            'Root', account, container, shard_bounds)

    def _make_sharding_broker(self, account='a', container='c',
                              shard_bounds=(('', 'middle'), ('middle', ''))):
        """Make a sharding broker whose root is set via 'Quoted-Root'
        sysmeta.
        """
        return self._make_sharding_broker_with_root(
            'Quoted-Root', account, container, shard_bounds)

    def _make_shard_ranges(self, bounds, state=None, object_count=0,
                           timestamp=Timestamp.now(), **kwargs):
        """Make one ShardRange per ``(lower, upper)`` pair in ``bounds``.

        Each range is named ``.shards_a/c_<upper>_<index>``.

        :param bounds: sequence of ``(lower, upper)`` tuples.
        :param state: a single state for every range, or a tuple/list with
            one state per range (at least ``len(bounds)`` items).
        :param object_count: object count given to every range.
        :param timestamp: timestamp given to every range. NB the default is
            evaluated once, when the class is defined, so every call that
            omits it shares the same timestamp.
        :param kwargs: further keyword arguments passed to ShardRange.
        :return: list of ShardRanges.
        """
        if not isinstance(state, (tuple, list)):
            state = [state] * len(bounds)
        state_iter = iter(state)
        return [ShardRange('.shards_a/c_%s_%s' % (upper, index), timestamp,
                           lower, upper, state=next(state_iter),
                           object_count=object_count, **kwargs)
                for index, (lower, upper) in enumerate(bounds)]

    def ts_encoded(self):
        """Return a unique string encoding three timestamps.

        Four timestamps are drawn from ``self.ts_iter``. The third is
        skipped, so the deltas between the encoded components differ.
        """
        # make a unique timestamp string with multiple timestamps encoded;
        # use different deltas between component timestamps
        timestamps = [next(self.ts_iter) for i in range(4)]
        return encode_timestamps(
            timestamps[0], timestamps[1], timestamps[3])
class TestSharder(BaseTestSharder):
def _do_test_init(self, conf, expected, use_logger=True):
    """Build a ContainerSharder with a mocked ring and internal client.

    The ring constructor is asserted to have been called once for the
    container ring in /etc/swift. Every attribute named in ``expected``
    must exist on the sharder with the given value.

    :param conf: config dict handed to ContainerSharder.
    :param expected: mapping of sharder attribute name -> expected value.
    :param use_logger: if True, clear ``self.logger`` and pass it to the
        sharder; otherwise pass ``logger=None``.
    :return: tuple of (sharder, mocked InternalClient class).
    """
    logger = None
    if use_logger:
        logger = self.logger
    if logger:
        logger.clear()
    ic_patcher = mock.patch(
        'swift.container.sharder.internal_client.InternalClient')
    ring_patcher = mock.patch('swift.common.db_replicator.ring.Ring')
    with ic_patcher as mock_ic, ring_patcher as mock_ring:
        fake_ring = mock.MagicMock()
        fake_ring.replica_count = 3
        mock_ring.return_value = fake_ring
        sharder = ContainerSharder(conf, logger=logger)
    mock_ring.assert_called_once_with('/etc/swift', ring_name='container')
    for attr_name, wanted in expected.items():
        self.assertTrue(hasattr(sharder, attr_name),
                        'Missing attr %s' % attr_name)
        actual = getattr(sharder, attr_name)
        self.assertEqual(wanted, actual,
                         'Incorrect value: expected %s=%s but got %s' %
                         (attr_name, wanted, actual))
    return sharder, mock_ic
def test_init(self):
    """Check ContainerSharder configuration parsing.

    Covers default values, non-default values, reduction of replication
    quorums that exceed the mocked replica count, and rejection of
    out-of-range shard shrink points.
    """
    # default values
    expected = {
        'mount_check': True, 'bind_ip': '0.0.0.0', 'port': 6201,
        'per_diff': 1000, 'max_diffs': 100, 'interval': 30,
        'databases_per_second': 50,
        'cleave_row_batch_size': 10000,
        'node_timeout': 10, 'conn_timeout': 5,
        'rsync_compress': False,
        'rsync_module': '{replication_ip}::container',
        'reclaim_age': 86400 * 7,
        'shard_container_threshold': 1000000,
        'rows_per_shard': 500000,
        'shrink_threshold': 100000,
        'expansion_limit': 750000,
        'cleave_batch_size': 2,
        'shard_scanner_batch_size': 10,
        'rcache': '/var/cache/swift/container.recon',
        'shards_account_prefix': '.shards_',
        'auto_shard': False,
        'recon_candidates_limit': 5,
        'recon_sharded_timeout': 43200,
        'shard_replication_quorum': 2,
        'existing_shard_replication_quorum': 2,
        'max_shrinking': 1,
        'max_expanding': -1
    }
    # no logger is passed in, so check the name of the sharder's own logger
    sharder, mock_ic = self._do_test_init({}, expected, use_logger=False)
    self.assertEqual(
        'container-sharder', sharder.logger.logger.name)
    mock_ic.assert_called_once_with(
        '/etc/swift/internal-client.conf', 'Swift Container Sharder', 3,
        allow_modify_pipeline=False,
        use_replication_network=True)

    # non-default values
    conf = {
        'mount_check': False, 'bind_ip': '10.11.12.13', 'bind_port': 62010,
        'per_diff': 2000, 'max_diffs': 200, 'interval': 60,
        'databases_per_second': 5,
        'cleave_row_batch_size': 3000,
        'node_timeout': 20, 'conn_timeout': 1,
        'rsync_compress': True,
        'rsync_module': '{replication_ip}::container_sda/',
        'reclaim_age': 86400 * 14,
        'shrink_threshold': 7000000,
        'expansion_limit': 17000000,
        'shard_container_threshold': 20000000,
        'cleave_batch_size': 4,
        'shard_scanner_batch_size': 8,
        'request_tries': 2,
        'internal_client_conf_path': '/etc/swift/my-sharder-ic.conf',
        'recon_cache_path': '/var/cache/swift-alt',
        'auto_create_account_prefix': '...',
        'auto_shard': 'yes',
        'recon_candidates_limit': 10,
        'recon_sharded_timeout': 7200,
        'shard_replication_quorum': 1,
        'existing_shard_replication_quorum': 0,
        'max_shrinking': 5,
        'max_expanding': 4,
        'rows_per_shard': 13,  # should be ignored - not configurable
    }
    # NB: the trailing '/' of rsync_module is expected to be dropped, and
    # rows_per_shard is expected to follow shard_container_threshold
    expected = {
        'mount_check': False, 'bind_ip': '10.11.12.13', 'port': 62010,
        'per_diff': 2000, 'max_diffs': 200, 'interval': 60,
        'databases_per_second': 5,
        'cleave_row_batch_size': 3000,
        'node_timeout': 20, 'conn_timeout': 1,
        'rsync_compress': True,
        'rsync_module': '{replication_ip}::container_sda',
        'reclaim_age': 86400 * 14,
        'shard_container_threshold': 20000000,
        'rows_per_shard': 10000000,
        'shrink_threshold': 7000000,
        'expansion_limit': 17000000,
        'cleave_batch_size': 4,
        'shard_scanner_batch_size': 8,
        'rcache': '/var/cache/swift-alt/container.recon',
        'shards_account_prefix': '...shards_',
        'auto_shard': True,
        'recon_candidates_limit': 10,
        'recon_sharded_timeout': 7200,
        'shard_replication_quorum': 1,
        'existing_shard_replication_quorum': 0,
        'max_shrinking': 5,
        'max_expanding': 4
    }
    sharder, mock_ic = self._do_test_init(conf, expected)
    # internal_client_conf_path and request_tries reach the internal client
    mock_ic.assert_called_once_with(
        '/etc/swift/my-sharder-ic.conf', 'Swift Container Sharder', 2,
        allow_modify_pipeline=False,
        use_replication_network=True)
    # auto_create_account_prefix in the sharder conf is deprecated
    self.assertEqual(self.logger.get_lines_for_level('warning'), [
        'Option auto_create_account_prefix is deprecated. '
        'Configure auto_create_account_prefix under the '
        'swift-constraints section of swift.conf. This option '
        'will be ignored in a future release.'])

    # quorums greater than the (mocked) replica count of 3 are reduced to
    # the replica count, with a warning for each
    expected.update({'shard_replication_quorum': 3,
                     'existing_shard_replication_quorum': 3})
    conf.update({'shard_replication_quorum': 4,
                 'existing_shard_replication_quorum': 4})
    self._do_test_init(conf, expected)
    warnings = self.logger.get_lines_for_level('warning')
    self.assertEqual(warnings[:1], [
        'Option auto_create_account_prefix is deprecated. '
        'Configure auto_create_account_prefix under the '
        'swift-constraints section of swift.conf. This option '
        'will be ignored in a future release.'])
    self.assertEqual(warnings[1:], [
        'shard_replication_quorum of 4 exceeds replica count 3, '
        'reducing to 3',
        'existing_shard_replication_quorum of 4 exceeds replica count 3, '
        'reducing to 3',
    ])

    # shard shrink points must be percentages strictly between 0 and 100
    with self.assertRaises(ValueError) as cm:
        self._do_test_init({'shard_shrink_point': 101}, {})
    self.assertIn(
        'greater than 0, less than 100, not "101"', str(cm.exception))
    with self.assertRaises(ValueError) as cm:
        self._do_test_init({'shard_shrink_merge_point': 101}, {})
    self.assertIn(
        'greater than 0, less than 100, not "101"', str(cm.exception))
def test_init_deprecated_options(self):
    """Check the deprecated percentage shrink options.

    The percentages apply when the absolute shrink_threshold and
    expansion_limit options are absent. Otherwise the absolute values win.
    """
    # expectations shared by both configurations; only shrink_threshold and
    # expansion_limit differ between them
    common_expected = {
        'mount_check': True, 'bind_ip': '0.0.0.0', 'port': 6201,
        'per_diff': 1000, 'max_diffs': 100, 'interval': 30,
        'databases_per_second': 50,
        'cleave_row_batch_size': 10000,
        'node_timeout': 10, 'conn_timeout': 5,
        'rsync_compress': False,
        'rsync_module': '{replication_ip}::container',
        'reclaim_age': 86400 * 7,
        'shard_container_threshold': 20000000,
        'rows_per_shard': 10000000,
        'cleave_batch_size': 2,
        'shard_scanner_batch_size': 10,
        'rcache': '/var/cache/swift/container.recon',
        'shards_account_prefix': '.shards_',
        'auto_shard': False,
        'recon_candidates_limit': 5,
        'shard_replication_quorum': 2,
        'existing_shard_replication_quorum': 2,
        'max_shrinking': 1,
        'max_expanding': -1,
    }

    # percent values applied if absolute values not given
    percent_only_conf = {
        'shard_shrink_point': 15,  # trumps shrink_threshold
        'shard_shrink_merge_point': 95,  # trumps expansion_limit
        'shard_container_threshold': 20000000,
    }
    self._do_test_init(percent_only_conf, dict(
        common_expected,
        shrink_threshold=3000000,
        expansion_limit=19000000))

    # absolute values override percent values
    both_conf = {
        'shard_shrink_point': 15,  # ignored: shrink_threshold is given
        'shrink_threshold': 7000000,
        'shard_shrink_merge_point': 95,  # ignored: expansion_limit is given
        'expansion_limit': 17000000,
        'shard_container_threshold': 20000000,
    }
    self._do_test_init(both_conf, dict(
        common_expected,
        shrink_threshold=7000000,
        expansion_limit=17000000))
def test_init_internal_client_conf_loading_error(self):
    """Check failures creating the internal client are not swallowed.

    A missing internal client config surfaces as SystemExit. Any other
    error raised by InternalClient propagates unchanged.
    """
    missing_conf_path = os.path.join(self.tempdir, 'nonexistent')
    with mock.patch('swift.common.db_replicator.ring.Ring') as mock_ring:
        mock_ring.return_value = mock.MagicMock(replica_count=3)
        with self.assertRaises(SystemExit) as cm:
            ContainerSharder({'internal_client_conf_path': missing_conf_path})
    self.assertIn('Unable to load internal client', str(cm.exception))

    with mock.patch('swift.common.db_replicator.ring.Ring') as mock_ring, \
            mock.patch(
                'swift.container.sharder.internal_client.InternalClient',
                side_effect=Exception('kaboom')):
        mock_ring.return_value = mock.MagicMock(replica_count=3)
        with self.assertRaises(Exception) as cm:
            ContainerSharder({})
    self.assertIn('kaboom', str(cm.exception))
def _assert_stats(self, expected, sharder, category):
    """Check the sharder's in-memory stats for ``category``.

    Each key in ``expected`` is compared individually. A single assertEqual
    of the whole mapping is avoided because the stats may be a
    defaultdict.

    :return: the stats mapping for ``category``.
    """
    category_stats = sharder.stats['sharding'][category]
    for key in expected:
        wanted = expected[key]
        found = category_stats[key]
        self.assertEqual(
            wanted, found, 'Expected %s but got %s for %s in %s' %
            (wanted, found, key, category_stats))
    return category_stats
def _assert_recon_stats(self, expected, sharder, category):
    """Check the ``category`` stats that were dumped to the recon cache file.

    Pass ``expected=None`` to assert that the category is absent.
    """
    with open(sharder.rcache, 'rb') as recon_file:
        dumped = json.load(recon_file)
    sharding_stats = dumped['sharding_stats']['sharding']
    self.assertEqual(expected, sharding_stats.get(category))
def test_increment_stats(self):
    """Check _increment_stat accumulates counters per category and key."""
    with self._mock_sharder() as sharder:
        increments = (('visited', 'success'),
                      ('visited', 'success'),
                      ('visited', 'failure'),
                      ('visited', 'completed'),
                      ('cleaved', 'success'))
        for category, key in increments:
            sharder._increment_stat(category, key)
        sharder._increment_stat('scanned', 'found', step=4)
        self._assert_stats({'success': 2, 'failure': 1, 'completed': 1},
                           sharder, 'visited')
        self._assert_stats({'success': 1}, sharder, 'cleaved')
        self._assert_stats({'found': 4}, sharder, 'scanned')
def test_increment_stats_with_statsd(self):
    """Check only increments made with statsd=True are emitted to statsd.

    All increments are still counted in the sharder's own stats.
    """
    with self._mock_sharder() as sharder:
        increments = [
            ('visited', 'success', {'statsd': True}),
            ('visited', 'success', {'statsd': True}),
            ('visited', 'failure', {'statsd': True}),
            ('visited', 'failure', {'statsd': False}),
            ('visited', 'completed', {}),
        ]
        for category, key, kwargs in increments:
            sharder._increment_stat(category, key, **kwargs)
        self._assert_stats({'success': 2, 'failure': 2, 'completed': 1},
                           sharder, 'visited')
        statsd_counts = sharder.logger.get_increment_counts()
        self.assertEqual(2, statsd_counts.get('visited_success'))
        self.assertEqual(1, statsd_counts.get('visited_failure'))
        self.assertIsNone(statsd_counts.get('visited_completed'))
def test_run_forever(self):
    """Drive run_forever through several cycles with a mocked clock.

    Each fake _process_broker call advances the mocked time.time by the
    next value in fake_periods and injects fake_stats. Once the periods are
    exhausted, time.time raises, which ends run_forever. The test then
    checks the sleeps, the per-cycle info log lines and the stats dumped
    to recon.
    """
    conf = {'recon_cache_path': self.tempdir,
            'devices': self.tempdir}
    with self._mock_sharder(conf) as sharder:
        sharder._check_node = lambda node: os.path.join(
            sharder.conf['devices'], node['device'])
        sharder.logger.clear()
        # two sharding-enabled containers on the same device and partition
        brokers = []
        for container in ('c1', 'c2'):
            broker = self._make_broker(
                container=container, hash_=container + 'hash',
                device=sharder.ring.devs[0]['device'], part=0)
            broker.update_metadata({'X-Container-Sysmeta-Sharding':
                                    ('true', next(self.ts_iter).internal)})
            brokers.append(broker)

        fake_stats = {
            'scanned': {'attempted': 1, 'success': 1, 'failure': 0,
                        'found': 2, 'min_time': 99, 'max_time': 123},
            'created': {'attempted': 1, 'success': 1, 'failure': 1},
            'cleaved': {'attempted': 1, 'success': 1, 'failure': 0,
                        'min_time': 0.01, 'max_time': 1.3},
            'misplaced': {'attempted': 1, 'success': 1, 'failure': 0,
                          'found': 1, 'placed': 1, 'unplaced': 0},
            'audit_root': {'attempted': 5, 'success': 4, 'failure': 1},
            'audit_shard': {'attempted': 2, 'success': 2, 'failure': 0},
        }
        # NB these are time increments not absolute times...
        fake_periods = [1, 2, 3, 3600, 4, 15, 15, 0]
        fake_periods_iter = iter(fake_periods)
        recon_data = []
        fake_process_broker_calls = []

        def mock_dump_recon_cache(data, *args):
            # snapshot each recon dump so it can be checked later
            recon_data.append(deepcopy(data))

        with mock.patch('swift.container.sharder.time.time') as fake_time:
            def fake_process_broker(broker, *args, **kwargs):
                # increment time and inject some fake stats
                fake_process_broker_calls.append((broker, args, kwargs))
                try:
                    fake_time.return_value += next(fake_periods_iter)
                except StopIteration:
                    # bail out
                    fake_time.side_effect = Exception('Test over')
                sharder.stats['sharding'].update(fake_stats)

            with mock.patch(
                    'swift.container.sharder.time.sleep') as mock_sleep:
                with mock.patch(
                        'swift.container.sharder.is_sharding_candidate',
                        return_value=True):
                    with mock.patch(
                            'swift.container.sharder.dump_recon_cache',
                            mock_dump_recon_cache):
                        # the first period is the absolute start time
                        fake_time.return_value = next(fake_periods_iter)
                        sharder._is_sharding_candidate = lambda x: True
                        sharder._process_broker = fake_process_broker
                        with self.assertRaises(Exception) as cm:
                            sharder.run_forever()

        self.assertEqual('Test over', str(cm.exception))
        # four cycles are started, two brokers visited per cycle, but
        # fourth never completes
        self.assertEqual(8, len(fake_process_broker_calls))
        # expect initial random sleep then one sleep between first and
        # second pass
        self.assertEqual(2, mock_sleep.call_count)
        self.assertLessEqual(mock_sleep.call_args_list[0][0][0], 30)
        self.assertLessEqual(mock_sleep.call_args_list[1][0][0],
                             30 - fake_periods[0])

        lines = sharder.logger.get_lines_for_level('info')
        categories = ('visited', 'scanned', 'created', 'cleaved',
                      'misplaced', 'audit_root', 'audit_shard')

        def check_categories(start_time):
            # consume one stats line per category from the front of lines
            for category in categories:
                line = lines.pop(0)
                self.assertIn('Since %s' % time.ctime(start_time), line)
                self.assertIn(category, line)
                for k, v in fake_stats.get(category, {}).items():
                    self.assertIn('%s:%s' % (k, v), line)

        def check_logs(cycle_time, start_time,
                       expect_periodic_stats=False):
            # consume the log lines of one complete cycle
            self.assertIn('Container sharder cycle starting', lines.pop(0))
            check_categories(start_time)
            if expect_periodic_stats:
                check_categories(start_time)
            self.assertIn('Container sharder cycle completed: %.02fs' %
                          cycle_time, lines.pop(0))

        check_logs(sum(fake_periods[1:3]), fake_periods[0])
        # the second cycle (which includes the 3600s period) is expected to
        # log stats twice
        check_logs(sum(fake_periods[3:5]), sum(fake_periods[:3]),
                   expect_periodic_stats=True)
        check_logs(sum(fake_periods[5:7]), sum(fake_periods[:5]))
        # final cycle start but then exception pops to terminate test
        self.assertIn('Container sharder cycle starting', lines.pop(0))
        self.assertFalse(lines)
        lines = sharder.logger.get_lines_for_level('error')
        self.assertIn(
            'Unhandled exception while dumping progress', lines[0])
        self.assertIn('Test over', lines[0])

        def check_recon(data, time, last, expected_stats):
            # NB: the ``time`` parameter shadows the time module in here
            self.assertEqual(time, data['sharding_time'])
            self.assertEqual(last, data['sharding_last'])
            self.assertEqual(
                expected_stats, dict(data['sharding_stats']['sharding']))

        def stats_for_candidate(broker):
            # the sharding_candidates 'top' entry expected for broker
            return {'object_count': 0,
                    'account': broker.account,
                    'meta_timestamp': mock.ANY,
                    'container': broker.container,
                    'file_size': os.stat(broker.db_file).st_size,
                    'path': broker.db_file,
                    'root': broker.path,
                    'node_index': 0}

        # three end-of-cycle dumps plus one periodic dump in the second cycle
        self.assertEqual(4, len(recon_data))
        # stats report at end of first cycle
        fake_stats.update({'visited': {'attempted': 2, 'skipped': 0,
                                       'success': 2, 'failure': 0,
                                       'completed': 0}})
        fake_stats.update({
            'sharding_candidates': {
                'found': 2,
                'top': [stats_for_candidate(call[0])
                        for call in fake_process_broker_calls[:2]]
            }
        })
        fake_stats.update({
            'shrinking_candidates': {
                'found': 0,
                'top': []
            }
        })
        check_recon(recon_data[0], sum(fake_periods[1:3]),
                    sum(fake_periods[:3]), fake_stats)
        # periodic stats report after first broker has been visited during
        # second cycle - one candidate identified so far this cycle
        fake_stats.update({'visited': {'attempted': 1, 'skipped': 0,
                                       'success': 1, 'failure': 0,
                                       'completed': 0}})
        fake_stats.update({
            'sharding_candidates': {
                'found': 1,
                'top': [stats_for_candidate(call[0])
                        for call in fake_process_broker_calls[2:3]]
            }
        })
        check_recon(recon_data[1], fake_periods[3],
                    sum(fake_periods[:4]), fake_stats)
        # stats report at end of second cycle - both candidates reported
        fake_stats.update({'visited': {'attempted': 2, 'skipped': 0,
                                       'success': 2, 'failure': 0,
                                       'completed': 0}})
        fake_stats.update({
            'sharding_candidates': {
                'found': 2,
                'top': [stats_for_candidate(call[0])
                        for call in fake_process_broker_calls[2:4]]
            }
        })
        check_recon(recon_data[2], sum(fake_periods[3:5]),
                    sum(fake_periods[:5]), fake_stats)
        # stats report at end of third cycle
        fake_stats.update({'visited': {'attempted': 2, 'skipped': 0,
                                       'success': 2, 'failure': 0,
                                       'completed': 0}})
        fake_stats.update({
            'sharding_candidates': {
                'found': 2,
                'top': [stats_for_candidate(call[0])
                        for call in fake_process_broker_calls[4:6]]
            }
        })
        check_recon(recon_data[3], sum(fake_periods[5:7]),
                    sum(fake_periods[:7]), fake_stats)
def test_one_shard_cycle(self):
    """Exercise _one_shard_cycle over three containers on three devices.

    Checks:
    - which brokers are processed;
    - that processing exceptions are logged and counted;
    - the visited, sharding_candidates and sharding_in_progress stats;
    - that a sharded broker drops out of the in-progress report once
      more than recon_sharded_timeout seconds have passed since its
      cleaving context was stored.
    """
    conf = {'recon_cache_path': self.tempdir,
            'devices': self.tempdir,
            'shard_container_threshold': 9}

    def fake_ismount(path):
        # unmounted_dev is defined from .get_more_nodes() below
        unmounted_path = os.path.join(conf['devices'],
                                      unmounted_dev['device'])
        if path == unmounted_path:
            return False
        else:
            return True

    with self._mock_sharder(conf) as sharder, \
            mock.patch('swift.common.utils.ismount', fake_ismount), \
            mock.patch('swift.container.sharder.is_local_device',
                       return_value=True):
        sharder.reported = time.time()
        brokers = []
        device_ids = set(d['id'] for d in sharder.ring.devs)
        sharder.ring.max_more_nodes = 1
        unmounted_dev = next(sharder.ring.get_more_nodes(1))
        unmounted_dev['device'] = 'xxxx'
        sharder.ring.add_node(unmounted_dev)
        # one broker per device; brokers[i] is container a/c<i> (the
        # expected stats below rely on this)
        for device_id in device_ids:
            brokers.append(self._make_broker(
                container='c%s' % device_id, hash_='c%shash' % device_id,
                device=sharder.ring.devs[device_id]['device'], part=0))
        # enable a/c1 and a/c2 for sharding
        for broker in brokers[1:]:
            broker.update_metadata({'X-Container-Sysmeta-Sharding':
                                    ('true', next(self.ts_iter).internal)})
        # make a/c1 a candidate for sharding: 10 objects exceeds the
        # shard_container_threshold of 9
        for i in range(10):
            brokers[1].put_object('o%s' % i, next(self.ts_iter).internal,
                                  0, 'text/plain', 'etag', 0)

        # check only sharding enabled containers are processed
        with mock.patch('eventlet.sleep'), mock.patch.object(
                sharder, '_process_broker'
        ) as mock_process_broker:
            sharder._local_device_ids = {'stale_node_id'}
            sharder._one_shard_cycle(Everything(), Everything())

        lines = sharder.logger.get_lines_for_level('warning')
        expected = 'Skipping %s as it is not mounted' % \
            unmounted_dev['device']
        self.assertIn(expected, lines[0])
        self.assertEqual(device_ids, sharder._local_device_ids)
        self.assertEqual(2, mock_process_broker.call_count)
        processed_paths = [call[0][0].path
                           for call in mock_process_broker.call_args_list]
        self.assertEqual({'a/c1', 'a/c2'}, set(processed_paths))
        self.assertFalse(sharder.logger.get_lines_for_level('error'))
        expected_stats = {'attempted': 2, 'success': 2, 'failure': 0,
                          'skipped': 1, 'completed': 0}
        self._assert_recon_stats(expected_stats, sharder, 'visited')
        expected_candidate_stats = {
            'found': 1,
            'top': [{'object_count': 10, 'account': 'a', 'container': 'c1',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[1].db_file).st_size,
                     'path': brokers[1].db_file, 'root': 'a/c1',
                     'node_index': 1}]}
        self._assert_recon_stats(
            expected_candidate_stats, sharder, 'sharding_candidates')
        self._assert_recon_stats(None, sharder, 'sharding_progress')

        # give a/c0 and a/c1 shard ranges, with their own shard ranges in
        # the SHARDING state, so that both have sharding in progress
        now = next(self.ts_iter)
        brokers[0].merge_shard_ranges(
            [ShardRange('a/c0', now, '', '', state=ShardRange.SHARDING),
             ShardRange('.s_a/1', now, '', 'b', state=ShardRange.ACTIVE),
             ShardRange('.s_a/2', now, 'b', 'c', state=ShardRange.CLEAVED),
             ShardRange('.s_a/3', now, 'c', 'd', state=ShardRange.CREATED),
             ShardRange('.s_a/4', now, 'd', 'e', state=ShardRange.CREATED),
             ShardRange('.s_a/5', now, 'e', '', state=ShardRange.FOUND)])
        brokers[1].merge_shard_ranges(
            [ShardRange('a/c1', now, '', '', state=ShardRange.SHARDING),
             ShardRange('.s_a/6', now, '', 'b', state=ShardRange.ACTIVE),
             ShardRange('.s_a/7', now, 'b', 'c', state=ShardRange.ACTIVE),
             ShardRange('.s_a/8', now, 'c', 'd', state=ShardRange.CLEAVED),
             ShardRange('.s_a/9', now, 'd', 'e', state=ShardRange.CREATED),
             ShardRange('.s_a/0', now, 'e', '', state=ShardRange.CREATED)])
        # make a/c2 the sharding candidate
        for i in range(11):
            brokers[2].put_object('o%s' % i, next(self.ts_iter).internal,
                                  0, 'text/plain', 'etag', 0)

        def mock_processing(broker, node, part):
            if broker.path == 'a/c1':
                raise Exception('kapow!')
            elif broker.path not in ('a/c0', 'a/c2'):
                raise BaseException("I don't know how to handle a broker "
                                    "for %s" % broker.path)

        # check exceptions are handled
        sharder.logger.clear()
        with mock.patch('eventlet.sleep'), mock.patch.object(
                sharder, '_process_broker', side_effect=mock_processing
        ) as mock_process_broker:
            sharder._local_device_ids = {'stale_node_id'}
            sharder._one_shard_cycle(Everything(), Everything())

        lines = sharder.logger.get_lines_for_level('warning')
        expected = 'Skipping %s as it is not mounted' % \
            unmounted_dev['device']
        self.assertIn(expected, lines[0])
        self.assertEqual(device_ids, sharder._local_device_ids)
        self.assertEqual(3, mock_process_broker.call_count)
        processed_paths = [call[0][0].path
                           for call in mock_process_broker.call_args_list]
        self.assertEqual({'a/c0', 'a/c1', 'a/c2'}, set(processed_paths))
        lines = sharder.logger.get_lines_for_level('error')
        self.assertIn('Unhandled exception while processing', lines[0])
        self.assertFalse(lines[1:])
        sharder.logger.clear()
        expected_stats = {'attempted': 3, 'success': 2, 'failure': 1,
                          'skipped': 0, 'completed': 0}
        self._assert_recon_stats(expected_stats, sharder, 'visited')
        # the candidate is a/c2, so its stats come from brokers[2]
        expected_candidate_stats = {
            'found': 1,
            'top': [{'object_count': 11, 'account': 'a', 'container': 'c2',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[2].db_file).st_size,
                     'path': brokers[2].db_file, 'root': 'a/c2',
                     'node_index': 2}]}
        self._assert_recon_stats(
            expected_candidate_stats, sharder, 'sharding_candidates')
        expected_in_progress_stats = {
            'all': [{'object_count': 0, 'account': 'a', 'container': 'c0',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[0].db_file).st_size,
                     'path': brokers[0].db_file, 'root': 'a/c0',
                     'node_index': 0,
                     'found': 1, 'created': 2, 'cleaved': 1, 'active': 1,
                     'state': 'sharding', 'db_state': 'unsharded',
                     'error': None},
                    {'object_count': 10, 'account': 'a', 'container': 'c1',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[1].db_file).st_size,
                     'path': brokers[1].db_file, 'root': 'a/c1',
                     'node_index': 1,
                     'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
                     'state': 'sharding', 'db_state': 'unsharded',
                     'error': 'kapow!'}]}
        self._assert_stats(
            expected_in_progress_stats, sharder, 'sharding_in_progress')

        # check that candidates and in progress stats don't stick in recon
        own_shard_range = brokers[0].get_own_shard_range()
        own_shard_range.state = ShardRange.ACTIVE
        brokers[0].merge_shard_ranges([own_shard_range])
        for i in range(10):
            brokers[1].delete_object(
                'o%s' % i, next(self.ts_iter).internal)
        with mock.patch('eventlet.sleep'), mock.patch.object(
                sharder, '_process_broker'
        ) as mock_process_broker:
            sharder._local_device_ids = {999}
            sharder._one_shard_cycle(Everything(), Everything())

        self.assertEqual(device_ids, sharder._local_device_ids)
        self.assertEqual(3, mock_process_broker.call_count)
        processed_paths = [call[0][0].path
                           for call in mock_process_broker.call_args_list]
        self.assertEqual({'a/c0', 'a/c1', 'a/c2'}, set(processed_paths))
        self.assertFalse(sharder.logger.get_lines_for_level('error'))
        expected_stats = {'attempted': 3, 'success': 3, 'failure': 0,
                          'skipped': 0, 'completed': 0}
        self._assert_recon_stats(expected_stats, sharder, 'visited')
        self._assert_recon_stats(
            expected_candidate_stats, sharder, 'sharding_candidates')
        self._assert_recon_stats(None, sharder, 'sharding_progress')

        # let's progress broker 1 (broker[0])
        brokers[0].enable_sharding(next(self.ts_iter))
        brokers[0].set_sharding_state()
        shard_ranges = brokers[0].get_shard_ranges()
        for sr in shard_ranges[:-1]:
            sr.update_state(ShardRange.CLEAVED)
        brokers[0].merge_shard_ranges(shard_ranges)

        with mock.patch('eventlet.sleep'), mock.patch.object(
                sharder, '_process_broker'
        ) as mock_process_broker:
            sharder._local_device_ids = {999}
            sharder._one_shard_cycle(Everything(), Everything())

        expected_in_progress_stats = {
            'all': [{'object_count': 0, 'account': 'a', 'container': 'c0',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[0].db_file).st_size,
                     'path': brokers[0].db_file, 'root': 'a/c0',
                     'node_index': 0,
                     'found': 1, 'created': 0, 'cleaved': 3, 'active': 1,
                     'state': 'sharding', 'db_state': 'sharding',
                     'error': None},
                    {'object_count': 0, 'account': 'a', 'container': 'c1',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[1].db_file).st_size,
                     'path': brokers[1].db_file, 'root': 'a/c1',
                     'node_index': 1,
                     'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
                     'state': 'sharding', 'db_state': 'unsharded',
                     'error': None}]}
        self._assert_stats(
            expected_in_progress_stats, sharder, 'sharding_in_progress')

        # Now complete sharding broker 1.
        shard_ranges[-1].update_state(ShardRange.CLEAVED)
        own_sr = brokers[0].get_own_shard_range()
        own_sr.update_state(ShardRange.SHARDED)
        brokers[0].merge_shard_ranges(shard_ranges + [own_sr])
        # make and complete a cleave context, this is used for the
        # recon_sharded_timeout timer.
        cxt = CleavingContext.load(brokers[0])
        cxt.misplaced_done = cxt.cleaving_done = True
        ts_now = next(self.ts_iter)
        with mock_timestamp_now(ts_now):
            cxt.store(brokers[0])
        self.assertTrue(brokers[0].set_sharded_state())

        with mock.patch('eventlet.sleep'), \
                mock.patch.object(sharder, '_process_broker') \
                as mock_process_broker, mock_timestamp_now(ts_now):
            sharder._local_device_ids = {999}
            sharder._one_shard_cycle(Everything(), Everything())

        expected_in_progress_stats = {
            'all': [{'object_count': 0, 'account': 'a', 'container': 'c0',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[0].db_file).st_size,
                     'path': brokers[0].db_file, 'root': 'a/c0',
                     'node_index': 0,
                     'found': 0, 'created': 0, 'cleaved': 4, 'active': 1,
                     'state': 'sharded', 'db_state': 'sharded',
                     'error': None},
                    {'object_count': 0, 'account': 'a', 'container': 'c1',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[1].db_file).st_size,
                     'path': brokers[1].db_file, 'root': 'a/c1',
                     'node_index': 1,
                     'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
                     'state': 'sharding', 'db_state': 'unsharded',
                     'error': None}]}
        self._assert_stats(
            expected_in_progress_stats, sharder, 'sharding_in_progress')

        # one more cycle at exactly recon_sharded_timeout seconds after the
        # cleaving context was stored: the completed broker is still
        # reported
        ts_now = Timestamp(ts_now.timestamp +
                           sharder.recon_sharded_timeout)
        with mock.patch('eventlet.sleep'), \
                mock.patch.object(sharder, '_process_broker') \
                as mock_process_broker, mock_timestamp_now(ts_now):
            sharder._local_device_ids = {999}
            sharder._one_shard_cycle(Everything(), Everything())

        self._assert_stats(
            expected_in_progress_stats, sharder, 'sharding_in_progress')

        # one second later, i.e. recon_sharded_timeout + 1 seconds after the
        # context was stored, broker 1 is removed from the progress report.
        # NB: ts_now has already been advanced by recon_sharded_timeout, so
        # add just one more second here.
        ts_now = Timestamp(ts_now.timestamp + 1)
        with mock.patch('eventlet.sleep'), \
                mock.patch.object(sharder, '_process_broker') \
                as mock_process_broker, mock_timestamp_now(ts_now):
            sharder._local_device_ids = {999}
            sharder._one_shard_cycle(Everything(), Everything())

        expected_in_progress_stats = {
            'all': [{'object_count': 0, 'account': 'a', 'container': 'c1',
                     'meta_timestamp': mock.ANY,
                     'file_size': os.stat(brokers[1].db_file).st_size,
                     'path': brokers[1].db_file, 'root': 'a/c1',
                     'node_index': 1,
                     'found': 0, 'created': 2, 'cleaved': 1, 'active': 2,
                     'state': 'sharding', 'db_state': 'unsharded',
                     'error': None}]}
        self._assert_stats(
            expected_in_progress_stats, sharder, 'sharding_in_progress')
def test_one_shard_cycle_no_containers(self):
    """Check the info message logged when no containers dirs exist.

    The message is logged while no device has a 'containers' dir. Creating
    one on a single device suppresses it.
    """
    conf = {'recon_cache_path': self.tempdir,
            'devices': self.tempdir,
            'mount_check': False}
    no_containers_msg = 'Found no containers directories'

    # device dirs exist, but none has a containers dir
    with self._mock_sharder(conf) as sharder:
        device_names = [dev['device'] for dev in sharder.ring.devs]
        for device_name in device_names:
            os.mkdir(os.path.join(self.tempdir, device_name))
        with mock.patch('swift.container.sharder.is_local_device',
                        return_value=True):
            sharder._one_shard_cycle(Everything(), Everything())
    self.assertEqual([], sharder.logger.get_lines_for_level('warning'))
    self.assertIn(no_containers_msg,
                  sharder.logger.get_lines_for_level('info'))

    # add a containers dir to just one (the last) device
    with self._mock_sharder(conf) as sharder:
        os.mkdir(os.path.join(self.tempdir, device_names[-1], 'containers'))
        with mock.patch('swift.container.sharder.is_local_device',
                        return_value=True):
            sharder._one_shard_cycle(Everything(), Everything())
    self.assertEqual([], sharder.logger.get_lines_for_level('warning'))
    self.assertNotIn(no_containers_msg,
                     sharder.logger.get_lines_for_level('info'))
def test_ratelimited_roundrobin(self):
    """Check roundrobin_datadirs is throttled by databases_per_second.

    ``time.time`` and ``eventlet.sleep`` are faked with a shared clock, so
    the total requested sleep time shows the effect of the rate limit.
    """
    state = {'n_databases': 100}
    clock = {'sleeps': [], 'now': time.time()}

    def stub_iter(dirs):
        for idx in range(state['n_databases']):
            yield idx, '/srv/node/sda/path/to/container.db', {}

    def fake_sleep(duration):
        clock['sleeps'].append(duration)
        clock['now'] += duration

    def fake_time():
        return clock['now']

    def total_sleep(per_second):
        # drain the rate-limited iterator and report the faked sleep time
        with self._mock_sharder(
                {'databases_per_second': per_second}) as sharder, \
                mock.patch('swift.common.db_replicator.roundrobin_datadirs',
                           stub_iter), \
                mock.patch('time.time', fake_time), \
                mock.patch('eventlet.sleep', fake_sleep):
            list(sharder.roundrobin_datadirs(None))
        return sum(clock['sleeps'])

    # 100 db at 1/s should take ~100s
    run_time = total_sleep(1)
    self.assertTrue(97 <= run_time < 100, 'took %s' % run_time)

    # 1000 db at 50/s should take ~20s; start from a fresh clock
    state['n_databases'] = 1000
    clock['sleeps'] = []
    clock['now'] = time.time()
    run_time = total_sleep(50)
    self.assertTrue(18 <= run_time < 20, 'took %s' % run_time)
@contextmanager
def _mock_sharder(self, conf=None, replicas=3):
    """Yield a ContainerSharder backed by a FakeRing and mocked clients.

    The test logger is cleared first. ``InternalClient`` and the
    replicator's ``Ring`` are patched for the lifetime of the context, and
    ``_replicate_object`` is replaced with a mock that reports success on
    every replica.

    :param conf: optional sharder config dict. It is copied, so the
        caller's dict is not modified; ``devices`` is always set to
        ``self.tempdir`` in the copy.
    :param replicas: number of replicas in the FakeRing.
    """
    self.logger.clear()
    # Copy so that 'devices' is not injected into the caller's dict;
    # several tests pass the same conf dict to multiple sharders.
    conf = dict(conf or {})
    conf['devices'] = self.tempdir
    fake_ring = FakeRing(replicas=replicas, separate_replication=True)
    with mock.patch(
            'swift.container.sharder.internal_client.InternalClient'):
        with mock.patch(
                'swift.common.db_replicator.ring.Ring',
                return_value=fake_ring):
            sharder = ContainerSharder(conf, logger=self.logger)
            sharder._local_device_ids = {0, 1, 2}
            sharder._replicate_object = mock.MagicMock(
                return_value=(True, [True] * sharder.ring.replica_count))
            yield sharder
def _get_raw_object_records(self, broker):
    """Return every object row in ``broker`` as a list of raw values.

    An identity ``transform_func`` is handed to ``list_objects_iter`` so
    the rows come back untransformed, i.e. still carrying encoded
    timestamps.
    """
    def identity(record):
        return record

    rows = broker.list_objects_iter(
        10, '', '', '', '', include_deleted=None, all_policies=True,
        transform_func=identity)
    return list(map(list, rows))
def _check_objects(self, expected_objs, shard_dbs):
    """Assert the given db(s) hold exactly ``expected_objs``, in order.

    :param expected_objs: iterable of object rows; each is compared as a
        list against the raw rows read back from the db(s).
    :param shard_dbs: a single db file path, or a list of db file paths
        whose rows are concatenated in list order.
    """
    if not isinstance(shard_dbs, list):
        shard_dbs = [shard_dbs]
    actual = []
    for db_file in shard_dbs:
        actual += self._get_raw_object_records(ContainerBroker(db_file))
    self.assertEqual([list(row) for row in expected_objs], actual)
def _check_shard_range(self, expected, actual):
    """Assert two shard ranges agree apart from their meta_timestamp.

    ``actual`` must have a strictly greater ``meta_timestamp`` than
    ``expected``; every other attribute must be equal. Neither argument is
    modified.
    """
    want = dict(expected)
    got = dict(actual)
    expected_meta_ts = want.pop('meta_timestamp')
    actual_meta_ts = got.pop('meta_timestamp')
    self.assertGreater(actual_meta_ts, expected_meta_ts)
    self.assertEqual(want, got)
def test_check_node(self):
    """_check_node gives the device path, or False when not mounted."""
    node = {
        'replication_ip': '127.0.0.1',
        'replication_port': 5000,
        'device': 'd100',
    }
    with self._mock_sharder() as sharder:
        sharder.mount_check = True
        sharder.ips = ['127.0.0.1']
        sharder.port = 5000

        # mounted drive: the path below the configured devices dir
        with mock.patch('swift.common.utils.ismount',
                        lambda *args: True):
            result = sharder._check_node(node)
        device_path = os.path.join(sharder.conf['devices'],
                                   node['device'])
        self.assertEqual(result, device_path)

        # unmounted drive: False, plus a warning naming the device
        with mock.patch('swift.common.utils.ismount',
                        lambda *args: False):
            result = sharder._check_node(node)
        self.assertEqual(result, False)
        warnings = sharder.logger.get_lines_for_level('warning')
        self.assertIn('Skipping %s as it is not mounted' % node['device'],
                      warnings[0])
def test_fetch_shard_ranges_unexpected_response(self):
    """An UnexpectedResponse yields None and exactly one warning."""
    broker = self._make_broker()
    with self._mock_sharder() as sharder:
        sharder.int_client.make_request.side_effect = \
            internal_client.UnexpectedResponse(
                'Unexpected response: 404', None)
        self.assertIsNone(sharder._fetch_shard_ranges(broker))
        warnings = sharder.logger.get_lines_for_level('warning')
    self.assertIn('Unexpected response: 404', warnings[0])
    self.assertFalse(warnings[1:])
def test_fetch_shard_ranges_bad_record_type(self):
    """Responses whose record type is missing or not 'shard' are rejected.

    Each yields None and a single 'unexpected record type' error.
    """
    broker = self._make_broker()

    def check(resp_headers):
        with self._mock_sharder() as sharder:
            sharder.int_client.make_request = mock.MagicMock(
                return_value=mock.MagicMock(headers=resp_headers))
            self.assertIsNone(sharder._fetch_shard_ranges(broker))
            errors = sharder.logger.get_lines_for_level('error')
        self.assertIn('unexpected record type', errors[0])
        self.assertFalse(errors[1:])

    for headers in ({},
                    {'x-backend-record-type': 'object'},
                    {'x-backend-record-type': 'disco'}):
        check(headers)
def test_fetch_shard_ranges_bad_data(self):
    """Unusable 'shard' response bodies are rejected.

    Each yields None and a single 'invalid data' error.
    """
    broker = self._make_broker()

    def check(resp_body):
        headers = {'x-backend-record-type': 'shard'}
        with self._mock_sharder() as sharder:
            sharder.int_client.make_request = mock.MagicMock(
                return_value=mock.MagicMock(headers=headers,
                                            body=resp_body))
            self.assertIsNone(sharder._fetch_shard_ranges(broker))
            errors = sharder.logger.get_lines_for_level('error')
        self.assertIn('invalid data', errors[0])
        self.assertFalse(errors[1:])

    bad_bodies = (
        {},
        '',
        json.dumps({}),
        json.dumps([{'account': 'a', 'container': 'c'}]),
    )
    for body in bad_bodies:
        check(body)
def test_fetch_shard_ranges_ok(self):
    """Valid shard listings are parsed and the GET is built correctly."""
    def fetch(resp_body, params):
        resp_headers = {'x-backend-record-type': 'shard'}
        with self._mock_sharder() as sharder:
            request = mock.MagicMock(
                return_value=mock.MagicMock(headers=resp_headers,
                                            body=resp_body))
            sharder.int_client.make_request = request
            sharder.int_client.make_path = mock.MagicMock(
                return_value='/v1/a/c')
            result = sharder._fetch_shard_ranges(broker, params=params)
            sharder.int_client.make_path.assert_called_once_with('a', 'c')
            self.assertFalse(sharder.logger.get_lines_for_level('error'))
        return result, request

    expected_headers = {'X-Backend-Record-Type': 'shard',
                        'X-Backend-Include-Deleted': 'False',
                        'X-Backend-Override-Deleted': 'true'}

    def check_request(request, expected_params):
        request.assert_called_once_with(
            'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
            params=expected_params)

    broker = self._make_broker()
    shard_ranges = self._make_shard_ranges((('', 'm'), ('m', '')))

    # empty params: the request is still expected to ask for json
    result, request = fetch(json.dumps([dict(shard_ranges[0])]), params={})
    check_request(request, {'format': 'json'})
    self._assert_shard_ranges_equal([shard_ranges[0]], result)

    # caller-supplied params are passed through
    params = {'format': 'json', 'includes': 'thing'}
    result, request = fetch(
        json.dumps([dict(sr) for sr in shard_ranges]), params=params)
    self._assert_shard_ranges_equal(shard_ranges, result)
    check_request(request, params)

    # an empty listing
    params = {'format': 'json',
              'end_marker': 'there', 'marker': 'here'}
    result, request = fetch(json.dumps([]), params=params)
    self._assert_shard_ranges_equal([], result)
    check_request(request, params)
def _check_cleave_root(self, conf=None):
    """Drive ``_cleave()`` of a root container through to completion.

    Nine objects are put into a root db, which is then cleaved into five
    shard ranges over several passes. After each pass the test checks the
    persisted CleavingContext, the shard range states and stats, which
    shard dbs were replicated, and the objects in each shard db.

    :param conf: optional sharder conf passed to every ``_mock_sharder()``.
    """
    broker = self._make_broker()
    objects = [
        # shard 0
        ('a', self.ts_encoded(), 10, 'text/plain', 'etag_a', 0, 0),
        ('here', self.ts_encoded(), 10, 'text/plain', 'etag_here', 0, 0),
        # shard 1
        ('m', self.ts_encoded(), 1, 'text/plain', 'etag_m', 0, 0),
        ('n', self.ts_encoded(), 2, 'text/plain', 'etag_n', 0, 0),
        ('there', self.ts_encoded(), 3, 'text/plain', 'etag_there', 0, 0),
        # shard 2
        ('where', self.ts_encoded(), 100, 'text/plain', 'etag_where', 0,
         0),
        # shard 3
        ('x', self.ts_encoded(), 0, '', '', 1, 0),  # deleted
        ('y', self.ts_encoded(), 1000, 'text/plain', 'etag_y', 0, 0),
        # shard 4
        ('yyyy', self.ts_encoded(), 14, 'text/plain', 'etag_yyyy', 0, 0),
    ]
    for obj in objects:
        broker.put_object(*obj)
    # snapshot root stats; compared at the end with the summed stats of
    # all the cleaved shard dbs
    initial_root_info = broker.get_info()
    broker.enable_sharding(Timestamp.now())

    shard_bounds = (('', 'here'), ('here', 'there'),
                    ('there', 'where'), ('where', 'yonder'),
                    ('yonder', ''))
    shard_ranges = self._make_shard_ranges(shard_bounds)
    # paths at which each shard range's db is expected to be created
    expected_shard_dbs = []
    for shard_range in shard_ranges:
        db_hash = hash_path(shard_range.account, shard_range.container)
        expected_shard_dbs.append(
            os.path.join(self.tempdir, 'sda', 'containers', '0',
                         db_hash[-3:], db_hash, db_hash + '.db'))

    # used to accumulate stats from sharded dbs
    total_shard_stats = {'object_count': 0, 'bytes_used': 0}
    # run cleave - no shard ranges, nothing happens
    with self._mock_sharder(conf=conf) as sharder:
        self.assertFalse(sharder._cleave(broker))

    context = CleavingContext.load(broker)
    self.assertTrue(context.misplaced_done)
    self.assertFalse(context.cleaving_done)
    self.assertFalse(context.done())
    self.assertEqual('', context.cursor)
    self.assertEqual(9, context.cleave_to_row)
    self.assertEqual(9, context.max_row)
    self.assertEqual(0, context.ranges_done)
    self.assertEqual(0, context.ranges_todo)

    self.assertEqual(UNSHARDED, broker.get_db_state())
    sharder._replicate_object.assert_not_called()
    for db in expected_shard_dbs:
        with annotate_failure(db):
            self.assertFalse(os.path.exists(db))

    # run cleave - all shard ranges in found state, nothing happens
    broker.merge_shard_ranges(shard_ranges[:4])
    self.assertTrue(broker.set_sharding_state())

    with self._mock_sharder(conf=conf) as sharder:
        self.assertFalse(sharder._cleave(broker))

    context = CleavingContext.load(broker)
    self.assertTrue(context.misplaced_done)
    self.assertFalse(context.cleaving_done)
    self.assertFalse(context.done())
    self.assertEqual('', context.cursor)
    self.assertEqual(9, context.cleave_to_row)
    self.assertEqual(9, context.max_row)
    self.assertEqual(0, context.ranges_done)
    self.assertEqual(4, context.ranges_todo)

    self.assertEqual(SHARDING, broker.get_db_state())
    sharder._replicate_object.assert_not_called()
    for db in expected_shard_dbs:
        with annotate_failure(db):
            self.assertFalse(os.path.exists(db))
    for shard_range in broker.get_shard_ranges():
        with annotate_failure(shard_range):
            self.assertEqual(ShardRange.FOUND, shard_range.state)

    # move first shard range to created state, first shard range is cleaved
    shard_ranges[0].update_state(ShardRange.CREATED)
    broker.merge_shard_ranges(shard_ranges[:1])
    with self._mock_sharder(conf=conf) as sharder:
        self.assertFalse(sharder._cleave(broker))

    expected = {'attempted': 1, 'success': 1, 'failure': 0,
                'min_time': mock.ANY, 'max_time': mock.ANY}
    stats = self._assert_stats(expected, sharder, 'cleaved')
    self.assertIsInstance(stats['min_time'], float)
    self.assertIsInstance(stats['max_time'], float)
    self.assertLessEqual(stats['min_time'], stats['max_time'])
    self.assertEqual(SHARDING, broker.get_db_state())
    sharder._replicate_object.assert_called_once_with(
        0, expected_shard_dbs[0], 0)
    shard_broker = ContainerBroker(expected_shard_dbs[0])
    shard_own_sr = shard_broker.get_own_shard_range()
    self.assertEqual(ShardRange.CLEAVED, shard_own_sr.state)
    shard_info = shard_broker.get_info()
    total_shard_stats['object_count'] += shard_info['object_count']
    total_shard_stats['bytes_used'] += shard_info['bytes_used']

    updated_shard_ranges = broker.get_shard_ranges()
    self.assertEqual(4, len(updated_shard_ranges))
    # update expected state and metadata, check cleaved shard range
    # ('a' 10 bytes + 'here' 10 bytes)
    shard_ranges[0].bytes_used = 20
    shard_ranges[0].object_count = 2
    shard_ranges[0].state = ShardRange.CLEAVED
    self._check_shard_range(shard_ranges[0], updated_shard_ranges[0])
    self._check_objects(objects[:2], expected_shard_dbs[0])
    # other shard ranges should be unchanged
    for i in range(1, len(shard_ranges)):
        with annotate_failure(i):
            self.assertFalse(os.path.exists(expected_shard_dbs[i]))
    for i in range(1, len(updated_shard_ranges)):
        with annotate_failure(i):
            self.assertEqual(dict(shard_ranges[i]),
                             dict(updated_shard_ranges[i]))

    context = CleavingContext.load(broker)
    self.assertTrue(context.misplaced_done)
    self.assertFalse(context.cleaving_done)
    self.assertFalse(context.done())
    self.assertEqual('here', context.cursor)
    self.assertEqual(9, context.cleave_to_row)
    self.assertEqual(9, context.max_row)
    self.assertEqual(1, context.ranges_done)
    self.assertEqual(3, context.ranges_todo)

    # remove the shard dbs so that later existence checks only see dbs
    # created by subsequent cleave passes
    unlink_files(expected_shard_dbs)

    # move more shard ranges to created state
    for i in range(1, 4):
        shard_ranges[i].update_state(ShardRange.CREATED)
    broker.merge_shard_ranges(shard_ranges[1:4])

    # replication of next shard range is not sufficiently successful
    with self._mock_sharder(conf=conf) as sharder:
        # one fewer success than a quorum, in random replica order
        quorum = quorum_size(sharder.ring.replica_count)
        successes = [True] * (quorum - 1)
        fails = [False] * (sharder.ring.replica_count - len(successes))
        responses = successes + fails
        random.shuffle(responses)
        sharder._replicate_object = mock.MagicMock(
            side_effect=((False, responses),))
        self.assertFalse(sharder._cleave(broker))
    sharder._replicate_object.assert_called_once_with(
        0, expected_shard_dbs[1], 0)

    # cleaving state is unchanged
    updated_shard_ranges = broker.get_shard_ranges()
    self.assertEqual(4, len(updated_shard_ranges))
    for i in range(1, len(updated_shard_ranges)):
        with annotate_failure(i):
            self.assertEqual(dict(shard_ranges[i]),
                             dict(updated_shard_ranges[i]))
    context = CleavingContext.load(broker)
    self.assertTrue(context.misplaced_done)
    self.assertFalse(context.cleaving_done)
    self.assertFalse(context.done())
    self.assertEqual('here', context.cursor)
    self.assertEqual(9, context.cleave_to_row)
    self.assertEqual(9, context.max_row)
    self.assertEqual(1, context.ranges_done)
    self.assertEqual(3, context.ranges_todo)

    # try again, this time replication is sufficiently successful
    with self._mock_sharder(conf=conf) as sharder:
        # exactly a quorum of successes for each of the two cleaved ranges
        successes = [True] * quorum
        fails = [False] * (sharder.ring.replica_count - len(successes))
        responses1 = successes + fails
        responses2 = fails + successes
        sharder._replicate_object = mock.MagicMock(
            side_effect=((False, responses1), (False, responses2)))
        self.assertFalse(sharder._cleave(broker))

    expected = {'attempted': 2, 'success': 2, 'failure': 0,
                'min_time': mock.ANY, 'max_time': mock.ANY}
    stats = self._assert_stats(expected, sharder, 'cleaved')
    self.assertIsInstance(stats['min_time'], float)
    self.assertIsInstance(stats['max_time'], float)
    self.assertLessEqual(stats['min_time'], stats['max_time'])
    self.assertEqual(SHARDING, broker.get_db_state())
    sharder._replicate_object.assert_has_calls(
        [mock.call(0, db, 0) for db in expected_shard_dbs[1:3]]
    )
    for db in expected_shard_dbs[1:3]:
        shard_broker = ContainerBroker(db)
        shard_own_sr = shard_broker.get_own_shard_range()
        self.assertEqual(ShardRange.CLEAVED, shard_own_sr.state)
        shard_info = shard_broker.get_info()
        total_shard_stats['object_count'] += shard_info['object_count']
        total_shard_stats['bytes_used'] += shard_info['bytes_used']

    updated_shard_ranges = broker.get_shard_ranges()
    self.assertEqual(4, len(updated_shard_ranges))

    # only 2 are cleaved per batch
    # update expected state and metadata, check cleaved shard ranges
    shard_ranges[1].bytes_used = 6
    shard_ranges[1].object_count = 3
    shard_ranges[1].state = ShardRange.CLEAVED
    shard_ranges[2].bytes_used = 100
    shard_ranges[2].object_count = 1
    shard_ranges[2].state = ShardRange.CLEAVED
    for i in range(0, 3):
        with annotate_failure(i):
            self._check_shard_range(
                shard_ranges[i], updated_shard_ranges[i])
    self._check_objects(objects[2:5], expected_shard_dbs[1])
    self._check_objects(objects[5:6], expected_shard_dbs[2])
    # other shard ranges should be unchanged
    self.assertFalse(os.path.exists(expected_shard_dbs[0]))
    for i, db in enumerate(expected_shard_dbs[3:], 3):
        with annotate_failure(i):
            self.assertFalse(os.path.exists(db))
    for i, updated_shard_range in enumerate(updated_shard_ranges[3:], 3):
        with annotate_failure(i):
            self.assertEqual(dict(shard_ranges[i]),
                             dict(updated_shard_range))
    context = CleavingContext.load(broker)
    self.assertTrue(context.misplaced_done)
    self.assertFalse(context.cleaving_done)
    self.assertFalse(context.done())
    self.assertEqual('where', context.cursor)
    self.assertEqual(9, context.cleave_to_row)
    self.assertEqual(9, context.max_row)
    self.assertEqual(3, context.ranges_done)
    self.assertEqual(1, context.ranges_todo)

    unlink_files(expected_shard_dbs)

    # run cleave again - should process the fourth range
    with self._mock_sharder(conf=conf) as sharder:
        self.assertFalse(sharder._cleave(broker))

    expected = {'attempted': 1, 'success': 1, 'failure': 0,
                'min_time': mock.ANY, 'max_time': mock.ANY}
    stats = self._assert_stats(expected, sharder, 'cleaved')
    self.assertIsInstance(stats['min_time'], float)
    self.assertIsInstance(stats['max_time'], float)
    self.assertLessEqual(stats['min_time'], stats['max_time'])
    self.assertEqual(SHARDING, broker.get_db_state())
    sharder._replicate_object.assert_called_once_with(
        0, expected_shard_dbs[3], 0)
    shard_broker = ContainerBroker(expected_shard_dbs[3])
    shard_own_sr = shard_broker.get_own_shard_range()
    self.assertEqual(ShardRange.CLEAVED, shard_own_sr.state)
    shard_info = shard_broker.get_info()
    total_shard_stats['object_count'] += shard_info['object_count']
    total_shard_stats['bytes_used'] += shard_info['bytes_used']

    updated_shard_ranges = broker.get_shard_ranges()
    self.assertEqual(4, len(updated_shard_ranges))

    shard_ranges[3].bytes_used = 1000
    shard_ranges[3].object_count = 1
    shard_ranges[3].state = ShardRange.CLEAVED
    for i in range(0, 4):
        with annotate_failure(i):
            self._check_shard_range(
                shard_ranges[i], updated_shard_ranges[i])
    # NB includes the deleted object
    self._check_objects(objects[6:8], expected_shard_dbs[3])
    # other shard ranges should be unchanged
    for i, db in enumerate(expected_shard_dbs[:3]):
        with annotate_failure(i):
            self.assertFalse(os.path.exists(db))
    self.assertFalse(os.path.exists(expected_shard_dbs[4]))
    # NB(review): updated_shard_ranges has only 4 items here (asserted
    # above), so this loop body never executes
    for i, updated_shard_range in enumerate(updated_shard_ranges[4:], 4):
        with annotate_failure(i):
            self.assertEqual(dict(shard_ranges[i]),
                             dict(updated_shard_range))

    # NB(review): repeats the existence check made a few lines above
    self.assertFalse(os.path.exists(expected_shard_dbs[4]))
    context = CleavingContext.load(broker)
    self.assertTrue(context.misplaced_done)
    self.assertFalse(context.cleaving_done)
    self.assertFalse(context.done())
    self.assertEqual('yonder', context.cursor)
    self.assertEqual(9, context.cleave_to_row)
    self.assertEqual(9, context.max_row)
    self.assertEqual(4, context.ranges_done)
    self.assertEqual(0, context.ranges_todo)

    unlink_files(expected_shard_dbs)

    # run cleave - should be a no-op, all existing ranges have been cleaved
    with self._mock_sharder(conf=conf) as sharder:
        self.assertFalse(sharder._cleave(broker))

    self.assertEqual(SHARDING, broker.get_db_state())
    sharder._replicate_object.assert_not_called()

    # add final shard range - move this to ACTIVE state and update stats to
    # simulate another replica having cleaved it and replicated its state
    shard_ranges[4].update_state(ShardRange.ACTIVE)
    shard_ranges[4].update_meta(2, 15)
    broker.merge_shard_ranges(shard_ranges[4:])

    with self._mock_sharder(conf=conf) as sharder:
        self.assertTrue(sharder._cleave(broker))

    expected = {'attempted': 1, 'success': 1, 'failure': 0,
                'min_time': mock.ANY, 'max_time': mock.ANY}
    stats = self._assert_stats(expected, sharder, 'cleaved')
    self.assertIsInstance(stats['min_time'], float)
    self.assertIsInstance(stats['max_time'], float)
    self.assertLessEqual(stats['min_time'], stats['max_time'])

    sharder._replicate_object.assert_called_once_with(
        0, expected_shard_dbs[4], 0)
    shard_broker = ContainerBroker(expected_shard_dbs[4])
    shard_own_sr = shard_broker.get_own_shard_range()
    self.assertEqual(ShardRange.ACTIVE, shard_own_sr.state)
    shard_info = shard_broker.get_info()
    total_shard_stats['object_count'] += shard_info['object_count']
    total_shard_stats['bytes_used'] += shard_info['bytes_used']

    updated_shard_ranges = broker.get_shard_ranges()
    self.assertEqual(5, len(updated_shard_ranges))
    # NB stats of the ACTIVE shard range should not be reset by cleaving
    for i in range(0, 4):
        with annotate_failure(i):
            self._check_shard_range(
                shard_ranges[i], updated_shard_ranges[i])
    self.assertEqual(dict(shard_ranges[4]), dict(updated_shard_ranges[4]))

    # object copied to shard
    self._check_objects(objects[8:], expected_shard_dbs[4])
    # other shard ranges should be unchanged
    for i, db in enumerate(expected_shard_dbs[:4]):
        with annotate_failure(i):
            self.assertFalse(os.path.exists(db))

    # every object and byte in the root ended up in exactly one shard db
    self.assertEqual(initial_root_info['object_count'],
                     total_shard_stats['object_count'])
    self.assertEqual(initial_root_info['bytes_used'],
                     total_shard_stats['bytes_used'])

    context = CleavingContext.load(broker)
    self.assertTrue(context.misplaced_done)
    self.assertTrue(context.cleaving_done)
    self.assertTrue(context.done())
    self.assertEqual('', context.cursor)
    self.assertEqual(9, context.cleave_to_row)
    self.assertEqual(9, context.max_row)
    self.assertEqual(5, context.ranges_done)
    self.assertEqual(0, context.ranges_todo)

    # cleaving is complete, so another pass replicates nothing
    with self._mock_sharder(conf=conf) as sharder:
        self.assertTrue(sharder._cleave(broker))
        sharder._replicate_object.assert_not_called()

    self.assertTrue(broker.set_sharded_state())
    # run cleave - should be a no-op
    with self._mock_sharder(conf=conf) as sharder:
        self.assertTrue(sharder._cleave(broker))

    sharder._replicate_object.assert_not_called()
def test_cleave_root(self):
    # full root cleaving scenario using the default sharder config
    self._check_cleave_root(conf=None)
def test_cleave_root_listing_limit_one(self):
    """Run the root cleaving scenario with a row batch size of one.

    This forces yield_objects to update its marker and call the broker's
    get_objects() for each shard range, checking that the marker moves on.
    """
    one_row_per_batch = {'cleave_row_batch_size': 1}
    self._check_cleave_root(conf=one_row_per_batch)
def test_cleave_root_ranges_change(self):
    """Objects are not missed if shard ranges change between batches.

    After the first two of three ranges are cleaved, the second range is
    deleted and the third is extended down to cover it. The final pass must
    then cleave the extended third range from the cleaving cursor onward.
    """
    broker = self._make_broker()
    objects = [
        ('a', self.ts_encoded(), 10, 'text/plain', 'etag_a', 0, 0),
        ('b', self.ts_encoded(), 10, 'text/plain', 'etag_b', 0, 0),
        ('c', self.ts_encoded(), 1, 'text/plain', 'etag_c', 0, 0),
        ('d', self.ts_encoded(), 2, 'text/plain', 'etag_d', 0, 0),
        ('e', self.ts_encoded(), 3, 'text/plain', 'etag_e', 0, 0),
        ('f', self.ts_encoded(), 100, 'text/plain', 'etag_f', 0, 0),
        ('x', self.ts_encoded(), 0, '', '', 1, 0),  # deleted
        ('z', self.ts_encoded(), 1000, 'text/plain', 'etag_z', 0, 0)
    ]
    for obj in objects:
        broker.put_object(*obj)
    broker.enable_sharding(Timestamp.now())

    shard_bounds = (('', 'd'), ('d', 'x'), ('x', ''))
    shard_ranges = self._make_shard_ranges(
        shard_bounds, state=ShardRange.CREATED)
    # paths at which each shard range's db is expected to be created
    expected_shard_dbs = []
    for shard_range in shard_ranges:
        db_hash = hash_path(shard_range.account, shard_range.container)
        expected_shard_dbs.append(
            os.path.join(self.tempdir, 'sda', 'containers', '0',
                         db_hash[-3:], db_hash, db_hash + '.db'))

    broker.merge_shard_ranges(shard_ranges[:3])
    self.assertTrue(broker.set_sharding_state())

    # run cleave - first batch is cleaved
    with self._mock_sharder() as sharder:
        self.assertFalse(sharder._cleave(broker))
    context = CleavingContext.load(broker)
    self.assertTrue(context.misplaced_done)
    self.assertFalse(context.cleaving_done)
    self.assertFalse(context.done())
    self.assertEqual(shard_ranges[1].upper_str, context.cursor)
    self.assertEqual(8, context.cleave_to_row)
    self.assertEqual(8, context.max_row)

    self.assertEqual(SHARDING, broker.get_db_state())
    sharder._replicate_object.assert_has_calls(
        [mock.call(0, db, 0) for db in expected_shard_dbs[:2]]
    )

    updated_shard_ranges = broker.get_shard_ranges()
    self.assertEqual(3, len(updated_shard_ranges))

    # first 2 shard ranges should have updated object count, bytes used and
    # meta_timestamp
    # range 0 holds a, b, c, d: 10 + 10 + 1 + 2 bytes
    shard_ranges[0].bytes_used = 23
    shard_ranges[0].object_count = 4
    shard_ranges[0].state = ShardRange.CLEAVED
    self._check_shard_range(shard_ranges[0], updated_shard_ranges[0])
    # range 1 holds e, f and the deleted x: 3 + 100 bytes, 2 live objects
    shard_ranges[1].bytes_used = 103
    shard_ranges[1].object_count = 2
    shard_ranges[1].state = ShardRange.CLEAVED
    self._check_shard_range(shard_ranges[1], updated_shard_ranges[1])
    self._check_objects(objects[:4], expected_shard_dbs[0])
    self._check_objects(objects[4:7], expected_shard_dbs[1])
    self.assertFalse(os.path.exists(expected_shard_dbs[2]))

    # third shard range should be unchanged - not yet cleaved
    self.assertEqual(dict(shard_ranges[2]),
                     dict(updated_shard_ranges[2]))

    context = CleavingContext.load(broker)
    self.assertTrue(context.misplaced_done)
    self.assertFalse(context.cleaving_done)
    self.assertFalse(context.done())
    self.assertEqual(shard_ranges[1].upper_str, context.cursor)
    self.assertEqual(8, context.cleave_to_row)
    self.assertEqual(8, context.max_row)

    # now change the shard ranges so that third consumes second
    shard_ranges[1].set_deleted()
    shard_ranges[2].lower = 'd'
    shard_ranges[2].timestamp = Timestamp.now()

    broker.merge_shard_ranges(shard_ranges[1:3])

    # run cleave - should process the extended third (final) range
    with self._mock_sharder() as sharder:
        self.assertTrue(sharder._cleave(broker))

    self.assertEqual(SHARDING, broker.get_db_state())
    sharder._replicate_object.assert_called_once_with(
        0, expected_shard_dbs[2], 0)
    # the deleted second range is no longer listed
    updated_shard_ranges = broker.get_shard_ranges()
    self.assertEqual(2, len(updated_shard_ranges))
    self._check_shard_range(shard_ranges[0], updated_shard_ranges[0])
    # third shard range should now have updated object count, bytes used,
    # including objects previously in the second shard range
    # (e, f, deleted x and z: 3 + 100 + 1000 bytes, 3 live objects)
    shard_ranges[2].bytes_used = 1103
    shard_ranges[2].object_count = 3
    shard_ranges[2].state = ShardRange.CLEAVED
    self._check_shard_range(shard_ranges[2], updated_shard_ranges[1])
    self._check_objects(objects[4:8], expected_shard_dbs[2])

    context = CleavingContext.load(broker)
    self.assertTrue(context.misplaced_done)
    self.assertTrue(context.cleaving_done)
    self.assertTrue(context.done())
    self.assertEqual(shard_ranges[2].upper_str, context.cursor)
    self.assertEqual(8, context.cleave_to_row)
    self.assertEqual(8, context.max_row)
def test_cleave_root_empty_db_with_ranges(self):
    """An empty root cleaves all its ranges in one pass.

    This holds even with a cleave_batch_size of 1.
    """
    broker = self._make_broker()
    broker.enable_sharding(Timestamp.now())
    shard_bounds = (('', 'd'), ('d', 'x'), ('x', ''))
    shard_ranges = self._make_shard_ranges(
        shard_bounds, state=ShardRange.CREATED)
    broker.merge_shard_ranges(shard_ranges)
    self.assertTrue(broker.set_sharding_state())

    with self._mock_sharder({'cleave_batch_size': 1}) as sharder:
        self.assertTrue(sharder._cleave(broker))
        zero_obj_lines = [
            line for line in sharder.logger.get_lines_for_level('info')
            if " - zero objects found" in line]
    # one 'zero objects' message per shard range
    self.assertEqual(len(shard_bounds), len(zero_obj_lines))

    context = CleavingContext.load(broker)
    # despite cleave_batch_size being 1, empty ranges do not count towards
    # the batch since they are not replicated
    self.assertEqual(3, context.ranges_done)
    self.assertEqual(0, context.ranges_todo)
    self.assertTrue(context.cleaving_done)
def test_cleave_root_empty_db_with_pre_existing_shard_db_handoff(self):
    """An empty range whose shard db already exists is replicated.

    With a cleave_batch_size of 1, replicating that pre-existing shard db
    uses up the batch, so this pass cleaves only the first range.
    """
    broker = self._make_broker()
    broker.enable_sharding(Timestamp.now())
    shard_bounds = (('', 'd'), ('d', 'x'), ('x', ''))
    shard_ranges = self._make_shard_ranges(
        shard_bounds, state=ShardRange.CREATED)
    broker.merge_shard_ranges(shard_ranges)
    self.assertTrue(broker.set_sharding_state())

    sharder_conf = {'cleave_batch_size': 1}
    with self._mock_sharder(sharder_conf) as sharder:
        # pre-create a shard broker on a handoff location. This will force
        # the sharder to not skip it but instead force to replicate it and
        # use up a cleave_batch_size count.
        sharder._get_shard_broker(shard_ranges[0], broker.root_path, 0)
        self.assertFalse(sharder._cleave(broker))
        info_lines = sharder.logger.get_lines_for_level('info')
    expected_zero_obj = [line for line in info_lines
                         if " - zero objects found" in line]
    self.assertEqual(1, len(expected_zero_obj))

    cleaving_context = CleavingContext.load(broker)
    # Unlike a wholly empty db, the first range's pre-existing shard db is
    # replicated, which consumes the single cleave_batch_size slot:
    # cleaving stops after that range and the other two are still to do.
    self.assertEqual(1, cleaving_context.ranges_done)
    self.assertEqual(2, cleaving_context.ranges_todo)
    self.assertFalse(cleaving_context.cleaving_done)
def test_cleave_shard(self):
    """Cleave a (non-root) shard container that has misplaced objects.

    The shard owns the range 'here' to 'where'. Objects outside it ('a' and
    'z') are moved to the misplaced ranges, and the objects within it are
    cleaved into two new shard ranges over two passes. In the first pass
    the replication of one misplaced db is made to fail.
    """
    broker = self._make_broker(account='.shards_a', container='shard_c')
    own_shard_range = ShardRange(
        broker.path, Timestamp.now(), 'here', 'where',
        state=ShardRange.SHARDING, epoch=Timestamp.now())
    broker.merge_shard_ranges([own_shard_range])
    broker.set_sharding_sysmeta('Root', 'a/c')
    self.assertFalse(broker.is_root_container())  # sanity check

    objects = [
        ('m', self.ts_encoded(), 1, 'text/plain', 'etag_m', 0, 0),
        ('n', self.ts_encoded(), 2, 'text/plain', 'etag_n', 0, 0),
        ('there', self.ts_encoded(), 3, 'text/plain', 'etag_there', 0, 0),
        ('where', self.ts_encoded(), 100, 'text/plain', 'etag_where', 0,
         0),
    ]
    # objects outside the shard's own ('here', 'where') namespace
    misplaced_objects = [
        ('a', self.ts_encoded(), 1, 'text/plain', 'etag_a', 0, 0),
        ('z', self.ts_encoded(), 100, 'text/plain', 'etag_z', 1, 0),
    ]
    for obj in objects + misplaced_objects:
        broker.put_object(*obj)

    shard_bounds = (('here', 'there'),
                    ('there', 'where'))
    shard_ranges = self._make_shard_ranges(
        shard_bounds, state=ShardRange.CREATED)
    expected_shard_dbs = []
    for shard_range in shard_ranges:
        db_hash = hash_path(shard_range.account, shard_range.container)
        expected_shard_dbs.append(
            os.path.join(self.tempdir, 'sda', 'containers', '0',
                         db_hash[-3:], db_hash, db_hash + '.db'))

    # ranges that will receive the misplaced objects, and their db paths
    misplaced_bounds = (('', 'here'),
                        ('where', ''))
    misplaced_ranges = self._make_shard_ranges(
        misplaced_bounds, state=ShardRange.ACTIVE)
    misplaced_dbs = []
    for shard_range in misplaced_ranges:
        db_hash = hash_path(shard_range.account, shard_range.container)
        misplaced_dbs.append(
            os.path.join(self.tempdir, 'sda', 'containers', '0',
                         db_hash[-3:], db_hash, db_hash + '.db'))

    broker.merge_shard_ranges(shard_ranges)
    self.assertTrue(broker.set_sharding_state())

    # run cleave - first range is cleaved but move of misplaced objects is
    # not successful
    sharder_conf = {'cleave_batch_size': 1}
    with self._mock_sharder(sharder_conf) as sharder:
        with mock.patch.object(
                sharder, '_make_shard_range_fetcher',
                return_value=lambda: iter(misplaced_ranges)):
            # cause misplaced objects replication to not succeed
            quorum = quorum_size(sharder.ring.replica_count)
            successes = [True] * (quorum - 1)
            fails = [False] * (sharder.ring.replica_count - len(successes))
            responses = successes + fails
            random.shuffle(responses)
            bad_result = (False, responses)
            ok_result = (True, [True] * sharder.ring.replica_count)
            sharder._replicate_object = mock.MagicMock(
                # result for misplaced, misplaced, cleave
                side_effect=(bad_result, ok_result, ok_result))
            self.assertFalse(sharder._cleave(broker))

    context = CleavingContext.load(broker)
    self.assertFalse(context.misplaced_done)
    self.assertFalse(context.cleaving_done)
    self.assertEqual(shard_ranges[0].upper_str, context.cursor)
    self.assertEqual(6, context.cleave_to_row)
    self.assertEqual(6, context.max_row)

    self.assertEqual(SHARDING, broker.get_db_state())
    sharder._replicate_object.assert_has_calls(
        [mock.call(0, misplaced_dbs[0], 0),
         mock.call(0, misplaced_dbs[1], 0),
         mock.call(0, expected_shard_dbs[0], 0)])
    shard_broker = ContainerBroker(expected_shard_dbs[0])
    # NB cleaving a shard, state goes to CLEAVED not ACTIVE
    shard_own_sr = shard_broker.get_own_shard_range()
    self.assertEqual(ShardRange.CLEAVED, shard_own_sr.state)

    updated_shard_ranges = broker.get_shard_ranges()
    self.assertEqual(2, len(updated_shard_ranges))

    # first shard range should have updated object count, bytes used and
    # meta_timestamp
    shard_ranges[0].bytes_used = 6
    shard_ranges[0].object_count = 3
    shard_ranges[0].state = ShardRange.CLEAVED
    self._check_shard_range(shard_ranges[0], updated_shard_ranges[0])
    self._check_objects(objects[:3], expected_shard_dbs[0])
    self.assertFalse(os.path.exists(expected_shard_dbs[1]))
    self._check_objects(misplaced_objects[:1], misplaced_dbs[0])
    self._check_objects(misplaced_objects[1:], misplaced_dbs[1])
    unlink_files(expected_shard_dbs)
    unlink_files(misplaced_dbs)

    # run cleave - second (final) range is cleaved; move this range to
    # CLEAVED state and update stats to simulate another replica having
    # cleaved it and replicated its state
    shard_ranges[1].update_state(ShardRange.CLEAVED)
    shard_ranges[1].update_meta(2, 15)
    broker.merge_shard_ranges(shard_ranges[1:2])
    with self._mock_sharder(sharder_conf) as sharder:
        with mock.patch.object(
                sharder, '_make_shard_range_fetcher',
                return_value=lambda: iter(misplaced_ranges)):
            self.assertTrue(sharder._cleave(broker))

    context = CleavingContext.load(broker)
    self.assertTrue(context.misplaced_done)
    self.assertTrue(context.cleaving_done)
    self.assertEqual(shard_ranges[1].upper_str, context.cursor)
    self.assertEqual(6, context.cleave_to_row)
    self.assertEqual(6, context.max_row)

    self.assertEqual(SHARDING, broker.get_db_state())
    # the misplaced object 'a', whose first move was not sufficiently
    # replicated, is moved again; 'z' is not (its db is not re-created)
    sharder._replicate_object.assert_has_calls(
        [mock.call(0, misplaced_dbs[0], 0),
         mock.call(0, expected_shard_dbs[1], 0)])
    shard_broker = ContainerBroker(expected_shard_dbs[1])
    shard_own_sr = shard_broker.get_own_shard_range()
    self.assertEqual(ShardRange.CLEAVED, shard_own_sr.state)

    updated_shard_ranges = broker.get_shard_ranges()
    self.assertEqual(2, len(updated_shard_ranges))

    # second shard range keeps the state and stats merged above (as if
    # from another replica); cleaving should not have changed them
    self.assertEqual(dict(shard_ranges[1]), dict(updated_shard_ranges[1]))
    self._check_objects(objects[3:], expected_shard_dbs[1])
    self.assertFalse(os.path.exists(expected_shard_dbs[0]))
    self._check_objects(misplaced_objects[:1], misplaced_dbs[0])
    self.assertFalse(os.path.exists(misplaced_dbs[1]))
def test_cleave_shard_shrinking(self):
    # Incremented once per case so that donor and acceptor container names
    # (and therefore their db files) are fresh for every case; a one-element
    # list lets the nested function mutate it.
    iteration = [0]

    def check_shrink(acceptor_state, acceptor_bounds, expect_delete,
                     exp_progress_bounds=None):
        """
        Cleave a SHRINKING donor shard with bounds ('h', 'w') into the
        given acceptor shard ranges, then verify the cleave context, the
        donor's own shard range, the acceptor dbs and the donor's copies of
        the acceptor shard ranges.

        :param acceptor_state: state assigned to every acceptor shard range.
        :param acceptor_bounds: sequence of (lower, upper) acceptor bounds.
        :param expect_delete: expected return value of ``_cleave``, expected
            ``cleaving_done`` and expected donor ``deleted`` flag after
            ``_complete_sharding``.
        :param exp_progress_bounds: bounds of the acceptors that cleaving is
            expected to reach; ``None`` means every acceptor is reached.
        """
        iteration[0] += 1
        tag = iteration[0]
        # special case: a deleted shrinking shard range is forwarded to a
        # single enclosing acceptor
        forwarded_to_single = expect_delete and len(acceptor_bounds) == 1

        broker = self._make_broker(account='.shards_a',
                                   container='donor_%s' % tag)
        own_shard_range = ShardRange(
            broker.path, next(self.ts_iter), 'h', 'w',
            state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
        broker.merge_shard_ranges([own_shard_range])
        broker.set_sharding_sysmeta('Root', 'a/c')
        self.assertFalse(broker.is_root_container())  # sanity check

        objects = [
            ('i', self.ts_encoded(), 3, 'text/plain', 'etag_t', 0, 0),
            ('m', self.ts_encoded(), 33, 'text/plain', 'etag_m', 0, 0),
            ('w', self.ts_encoded(), 100, 'text/plain', 'etag_w', 0, 0),
        ]
        for obj in objects:
            broker.put_object(*obj)
        acceptor_epoch = next(self.ts_iter)

        def build_acceptors(bounds_seq):
            # every acceptor shares one epoch and is named after the case
            # tag and its upper bound
            return [
                ShardRange('.shards_a/acceptor_%s_%s' % (tag, upper),
                           Timestamp.now(), lower, upper,
                           '1000', '11111',
                           state=acceptor_state, epoch=acceptor_epoch)
                for lower, upper in bounds_seq]

        def expected_db_path(acceptor):
            db_hash = hash_path(acceptor.account, acceptor.container)
            # NB expected cleaved db name includes acceptor epoch
            db_name = '%s_%s.db' % (db_hash, acceptor_epoch.internal)
            return os.path.join(self.tempdir, 'sda', 'containers', '0',
                                db_hash[-3:], db_hash, db_name)

        acceptors = build_acceptors(acceptor_bounds)
        # by default expect cleaving to progress through all acceptors
        progress_acceptors = (acceptors if exp_progress_bounds is None
                              else build_acceptors(exp_progress_bounds))
        expected_acceptor_dbs = [expected_db_path(acceptor)
                                 for acceptor in progress_acceptors]
        broker.merge_shard_ranges(acceptors)
        broker.set_sharding_state()

        # run cleave
        with mock_timestamp_now_with_iter(self.ts_iter):
            with self._mock_sharder() as sharder:
                sharder.cleave_batch_size = 3
                self.assertEqual(expect_delete, sharder._cleave(broker))

        # check the cleave context and source broker
        context = CleavingContext.load(broker)
        self.assertTrue(context.misplaced_done)
        self.assertEqual(expect_delete, context.cleaving_done)
        expected_cursor = (progress_acceptors[-1].upper_str
                           if progress_acceptors
                           else own_shard_range.lower_str)
        self.assertEqual(expected_cursor, context.cursor)
        self.assertEqual(3, context.cleave_to_row)
        self.assertEqual(3, context.max_row)
        self.assertEqual(SHARDING, broker.get_db_state())
        own_sr = broker.get_own_shard_range()
        if forwarded_to_single:
            self.assertTrue(own_sr.deleted)
            self.assertEqual(ShardRange.SHRUNK, own_sr.state)
        else:
            self.assertFalse(own_sr.deleted)
            self.assertEqual(ShardRange.SHRINKING, own_sr.state)

        # check the acceptor db's
        sharder._replicate_object.assert_has_calls(
            [mock.call(0, db_path, 0) for db_path in expected_acceptor_dbs])
        for db_path in expected_acceptor_dbs:
            self.assertTrue(os.path.exists(db_path))
            # NB when *shrinking* a shard container then expect the
            # acceptor broker's own shard range state to remain in the
            # original state of the acceptor shard range rather than being
            # set to CLEAVED as it would when *sharding*.
            acceptor_broker = ContainerBroker(db_path)
            self.assertEqual(acceptor_state,
                             acceptor_broker.get_own_shard_range().state)
            acceptor_ranges = acceptor_broker.get_shard_ranges(
                include_deleted=True)
            if not forwarded_to_single:
                self.assertEqual([], acceptor_ranges)
                continue
            # the deleted donor range was forwarded to the single
            # enclosing acceptor
            self.assertEqual([own_sr], acceptor_ranges)
            self.assertTrue(acceptor_ranges[0].deleted)
            self.assertEqual(ShardRange.SHRUNK, acceptor_ranges[0].state)

        def reached(name):
            # True if some acceptor that cleaving reached covers this name
            return any(acceptor.lower < name <= acceptor.upper
                       for acceptor in progress_acceptors)

        expected_objects = [obj for obj in objects if reached(obj[0])]
        self._check_objects(expected_objects, expected_acceptor_dbs)

        def assert_acceptor_copies_unchanged():
            current = broker.get_shard_ranges()
            self.assertEqual([dict(sr) for sr in acceptors],
                             [dict(sr) for sr in current])

        # check that *shrinking* shard's copies of acceptor ranges are not
        # updated as they would be if *sharding*
        assert_acceptor_copies_unchanged()

        # ...and are still not updated when completing sharding, as they
        # would be if *sharding*
        with mock_timestamp_now_with_iter(self.ts_iter):
            sharder._complete_sharding(broker)
        assert_acceptor_copies_unchanged()

        own_sr = broker.get_own_shard_range()
        self.assertEqual(expect_delete, own_sr.deleted)
        self.assertEqual(
            ShardRange.SHRUNK if expect_delete else ShardRange.SHRINKING,
            own_sr.state)

    # note: shrinking shard bounds are (h, w)
    single = (('h', ''),)
    pair = (('d', 'k'), ('k', ''))
    triple = (('d', 'k'), ('k', 't'), ('t', ''))
    cases = [
        # shrinking to a single acceptor with enclosing namespace
        (ShardRange.CREATED, single, True, None),
        (ShardRange.CLEAVED, single, True, None),
        (ShardRange.ACTIVE, single, True, None),
        # shrinking to multiple acceptors that enclose namespace
        (ShardRange.CREATED, pair, True, None),
        (ShardRange.CLEAVED, pair, True, None),
        (ShardRange.ACTIVE, pair, True, None),
        (ShardRange.CLEAVED, triple, True, None),
        (ShardRange.CREATED, triple, True, None),
        (ShardRange.ACTIVE, triple, True, None),
        # shrinking to incomplete acceptors, gap at end of namespace
        (ShardRange.CREATED, (('d', 'k'),), False, None),
        (ShardRange.CLEAVED, (('d', 'k'), ('k', 't')), False, None),
        # shrinking to incomplete acceptors, gap at start and end of
        # namespace
        (ShardRange.CREATED, (('k', 't'),), False, ()),
        # shrinking to incomplete acceptors, gap at start of namespace
        (ShardRange.CLEAVED, (('k', 't'), ('t', '')), False, ()),
        # shrinking to incomplete acceptors, gap in middle - some progress
        (ShardRange.CLEAVED, (('d', 'k'), ('t', '')), False,
         (('d', 'k'),)),
    ]
    for state, bounds, delete, progress_bounds in cases:
        check_shrink(state, bounds, delete,
                     exp_progress_bounds=progress_bounds)
def test_cleave_repeated(self):
# verify that if new objects are merged into retiring db after cleaving
# started then cleaving will repeat but only new objects are cleaved
# in the repeated cleaving pass
broker = self._make_broker()
objects = [
('obj%03d' % i, next(self.ts_iter), 1, 'text/plain', 'etag', 0, 0)
for i in range(10)
]
new_objects = [
(name, next(self.ts_iter), 1, 'text/plain', 'etag', 0, 0)
for name in ('alpha', 'zeta')
]
for obj in objects:
broker.put_object(*obj)
broker._commit_puts()
broker.enable_sharding(Timestamp.now())
shard_bounds = (('', 'obj004'), ('obj004', ''))
shard_ranges = self._make_shard_ranges(
shard_bounds, state=ShardRange.CREATED)
expected_shard_dbs = []
for shard_range in shard_ranges:
db_hash = hash_path(shard_range.account, shard_range.container)
expected_shard_dbs.append(
os.path.join(self.tempdir, 'sda', 'containers', '0',
db_hash[-3:], db_hash, db_hash + '.db'))
broker.merge_shard_ranges(shard_ranges)
self.assertTrue(broker.set_sharding_state())
old_broker = broker.get_brokers()[0]
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
'index': 0}
calls = []
key = ('name', 'created_at', 'size', 'content_type', 'etag', 'deleted')
def mock_replicate_object(part, db, node_id):
# merge new objects between cleave of first and second shard ranges
if not calls:
old_broker.merge_items(
[dict(zip(key, obj)) for obj in new_objects])
calls.append((part, db, node_id))
return True, [True, True, True]
with self._mock_sharder() as sharder:
sharder._audit_container = mock.MagicMock()
sharder._replicate_object = mock_replicate_object