Merge "sharder: always get ranges from root while shrinking"
This commit is contained in:
commit
3ad39cd0b8
@ -5316,6 +5316,7 @@ class ShardRange(object):
|
||||
SHARDED: 'sharded',
|
||||
SHRUNK: 'shrunk'}
|
||||
STATES_BY_NAME = dict((v, k) for k, v in STATES.items())
|
||||
SHRINKING_STATES = (SHRINKING, SHRUNK)
|
||||
|
||||
@functools.total_ordering
|
||||
class MaxBound(ShardRangeOuterBound):
|
||||
|
@ -1219,15 +1219,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
if (orig_own_shard_range != own_shard_range or
|
||||
orig_own_shard_range.state != own_shard_range.state):
|
||||
self.logger.debug(
|
||||
self.logger.info(
|
||||
'Updated own shard range from %s to %s',
|
||||
orig_own_shard_range, own_shard_range)
|
||||
else:
|
||||
other_shard_ranges.append(shard_range)
|
||||
|
||||
if (other_shard_ranges and own_shard_range_from_root and
|
||||
own_shard_range.state in
|
||||
(ShardRange.SHRINKING, ShardRange.SHRUNK)):
|
||||
if (other_shard_ranges and
|
||||
own_shard_range.state in ShardRange.SHRINKING_STATES):
|
||||
# If own_shard_range state is shrinking, save off *all* shards
|
||||
# returned because these may contain shards into which this
|
||||
# shard is to shrink itself; shrinking is the only case when we
|
||||
@ -1259,7 +1258,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
own_shard_range.timestamp < delete_age and
|
||||
broker.empty()):
|
||||
broker.delete_db(Timestamp.now().internal)
|
||||
self.logger.debug('Deleted shard container %s (%s)',
|
||||
self.logger.debug('Marked shard container as deleted %s (%s)',
|
||||
broker.db_file, quote(broker.path))
|
||||
|
||||
def _do_audit_shard_container(self, broker):
|
||||
@ -1796,7 +1795,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
quote(broker.path), shard_range)
|
||||
|
||||
replication_quorum = self.existing_shard_replication_quorum
|
||||
if own_shard_range.state in (ShardRange.SHRINKING, ShardRange.SHRUNK):
|
||||
if own_shard_range.state in ShardRange.SHRINKING_STATES:
|
||||
if shard_range.includes(own_shard_range):
|
||||
# When shrinking to a single acceptor that completely encloses
|
||||
# this shard's namespace, include deleted own (donor) shard
|
||||
@ -2001,8 +2000,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
quote(broker.path))
|
||||
return False
|
||||
own_shard_range.update_meta(0, 0)
|
||||
if own_shard_range.state in (ShardRange.SHRINKING,
|
||||
ShardRange.SHRUNK):
|
||||
if own_shard_range.state in ShardRange.SHRINKING_STATES:
|
||||
own_shard_range.update_state(ShardRange.SHRUNK)
|
||||
modified_shard_ranges = []
|
||||
else:
|
||||
|
@ -45,7 +45,8 @@ from swift.container.sharder import ContainerSharder, sharding_enabled, \
|
||||
is_sharding_candidate, find_paths, rank_paths, ContainerSharderConf, \
|
||||
find_paths_with_gaps
|
||||
from swift.common.utils import ShardRange, Timestamp, hash_path, \
|
||||
encode_timestamps, parse_db_filename, quorum_size, Everything, md5
|
||||
encode_timestamps, parse_db_filename, quorum_size, Everything, md5, \
|
||||
ShardName
|
||||
from test import annotate_failure
|
||||
|
||||
from test.debug_logger import debug_logger
|
||||
@ -5605,10 +5606,17 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertTrue(shard_ranges[1].update_state(ShardRange.ACTIVE,
|
||||
state_timestamp=root_ts))
|
||||
shard_ranges[1].timestamp = root_ts
|
||||
sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
|
||||
with mock_timestamp_now() as ts_now:
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, shard_ranges)
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
self.assertEqual(['Updating own shard range from root', mock.ANY],
|
||||
self.assertEqual(['Updating own shard range from root'],
|
||||
sharder.logger.get_lines_for_level('debug'))
|
||||
own_shard_range.meta_timestamp = ts_now
|
||||
expected = shard_ranges[1].copy(meta_timestamp=ts_now)
|
||||
self.assertEqual(['Updated own shard range from %s to %s'
|
||||
% (own_shard_range, expected)],
|
||||
sharder.logger.get_lines_for_level('info'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
self.assertFalse(broker.is_deleted())
|
||||
@ -5695,19 +5703,27 @@ class TestSharder(BaseTestSharder):
|
||||
# make own shard range match one in root, but different state
|
||||
own_ts = next(self.ts_iter)
|
||||
shard_ranges[1].timestamp = own_ts
|
||||
broker.merge_shard_ranges([shard_ranges[1]])
|
||||
own_shard_range = shard_ranges[1].copy()
|
||||
broker.merge_shard_ranges([own_shard_range])
|
||||
root_ts = next(self.ts_iter)
|
||||
shard_ranges[1].update_state(ShardRange.SHARDING,
|
||||
state_timestamp=root_ts)
|
||||
sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
|
||||
with mock_timestamp_now() as ts_now:
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, shard_ranges)
|
||||
self.assert_no_audit_messages(sharder, mock_swift)
|
||||
self.assertFalse(broker.is_deleted())
|
||||
self.assertEqual(['Updating own shard range from root'],
|
||||
sharder.logger.get_lines_for_level('debug'))
|
||||
own_shard_range.meta_timestamp = ts_now
|
||||
expected = shard_ranges[1].copy(meta_timestamp=ts_now)
|
||||
self.assertEqual(['Updated own shard range from %s to %s'
|
||||
% (own_shard_range, expected)],
|
||||
sharder.logger.get_lines_for_level('info'))
|
||||
# own shard range state is updated from root version
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
self.assertEqual(ShardRange.SHARDING, own_shard_range.state)
|
||||
self.assertEqual(root_ts, own_shard_range.state_timestamp)
|
||||
self.assertEqual(['Updating own shard range from root', mock.ANY],
|
||||
sharder.logger.get_lines_for_level('debug'))
|
||||
|
||||
own_shard_range.update_state(ShardRange.SHARDED,
|
||||
state_timestamp=next(self.ts_iter))
|
||||
@ -5834,60 +5850,181 @@ class TestSharder(BaseTestSharder):
|
||||
self._do_test_audit_shard_container_merge_other_ranges('Quoted-Root',
|
||||
'a/c')
|
||||
|
||||
def _do_test_audit_shard_container_with_root_ranges(self, *args):
|
||||
# shards may merge acceptors and the root range when shrinking; verify
|
||||
# that shard audit is ok with merged ranges
|
||||
def check_audit(own_state, acceptor_state, root_state):
|
||||
broker = self._make_broker(
|
||||
account='.shards_a',
|
||||
container='shard_c_%s' % next(self.ts_iter).normal)
|
||||
broker.set_sharding_sysmeta(*args)
|
||||
own_sr = broker.get_own_shard_range().copy(
|
||||
state=own_state, state_timestamp=next(self.ts_iter),
|
||||
lower='a', upper='b', timestamp=next(self.ts_iter))
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
def _assert_merge_into_shard(self, own_shard_range, shard_ranges,
|
||||
root_shard_ranges, expected, *args):
|
||||
# create a shard broker, initialise with shard_ranges, run audit on it
|
||||
# supplying given root_shard_ranges and verify that the broker ends up
|
||||
# with expected shard ranges.
|
||||
broker = self._make_broker(account=own_shard_range.account,
|
||||
container=own_shard_range.container)
|
||||
broker.set_sharding_sysmeta(*args)
|
||||
broker.merge_shard_ranges([own_shard_range] + shard_ranges)
|
||||
self.assertFalse(broker.is_root_container())
|
||||
|
||||
# make acceptor and root ranges that overlap with the shard
|
||||
overlaps = self._make_shard_ranges([('a', 'c'), ('', '')],
|
||||
[acceptor_state, root_state])
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, [own_sr] + overlaps)
|
||||
expected_headers = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'a', 'end_marker': 'b',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
if own_state in (ShardRange.SHRINKING, ShardRange.SHRUNK):
|
||||
# check acceptor & root are merged into audited shard
|
||||
self.assertEqual(
|
||||
[dict(sr) for sr in overlaps],
|
||||
[dict(sr) for sr in broker.get_shard_ranges()])
|
||||
return sharder
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, root_shard_ranges)
|
||||
expected_headers = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'a', 'end_marker': 'b',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
|
||||
def assert_ok(own_state, acceptor_state, root_state):
|
||||
sharder = check_audit(own_state, acceptor_state, root_state)
|
||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
|
||||
self._assert_shard_ranges_equal(expected, broker.get_shard_ranges())
|
||||
self.assertEqual(own_shard_range,
|
||||
broker.get_own_shard_range(no_default=True))
|
||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
return sharder
|
||||
|
||||
def _do_test_audit_shard_root_ranges_not_merged(self, *args):
|
||||
# Make root and other ranges that fully contain the shard namespace...
|
||||
root_own_sr = ShardRange('a/c', next(self.ts_iter))
|
||||
acceptor = ShardRange(
|
||||
str(ShardName.create('.shards_a', 'c', 'c',
|
||||
next(self.ts_iter), 1)),
|
||||
next(self.ts_iter), 'a', 'c')
|
||||
|
||||
def do_test(own_state, acceptor_state, root_state):
|
||||
acceptor_from_root = acceptor.copy(
|
||||
timestamp=next(self.ts_iter), state=acceptor_state)
|
||||
root_from_root = root_own_sr.copy(
|
||||
timestamp=next(self.ts_iter), state=root_state)
|
||||
with annotate_failure('with states %s %s %s'
|
||||
% (own_state, acceptor_state, root_state)):
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
own_sr_name = ShardName.create(
|
||||
'.shards_a', 'c', 'c', next(self.ts_iter), 0)
|
||||
own_sr = ShardRange(
|
||||
str(own_sr_name), next(self.ts_iter), state=own_state,
|
||||
state_timestamp=next(self.ts_iter), lower='a', upper='b')
|
||||
expected = existing = []
|
||||
sharder = self._assert_merge_into_shard(
|
||||
own_sr, existing,
|
||||
[own_sr, acceptor_from_root, root_from_root],
|
||||
expected, *args)
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
for own_state in ShardRange.STATES:
|
||||
if own_state in ShardRange.SHRINKING_STATES:
|
||||
# shrinking states are covered by other tests
|
||||
continue
|
||||
for acceptor_state in ShardRange.STATES:
|
||||
for root_state in ShardRange.STATES:
|
||||
assert_ok(own_state, acceptor_state, root_state)
|
||||
do_test(own_state, acceptor_state, root_state)
|
||||
|
||||
def test_audit_old_style_shard_container_with_root_ranges(self):
|
||||
self._do_test_audit_shard_container_with_root_ranges('Root', 'a/c')
|
||||
def test_audit_old_style_shard_root_ranges_not_merged_not_shrinking(self):
|
||||
# verify that other shard ranges from root are NOT merged into shard
|
||||
# when it is NOT in a shrinking state
|
||||
self._do_test_audit_shard_root_ranges_not_merged('Root', 'a/c')
|
||||
|
||||
def test_audit_shard_container_with_root_ranges(self):
|
||||
self._do_test_audit_shard_container_with_root_ranges('Quoted-Root',
|
||||
'a/c')
|
||||
def test_audit_shard_root_ranges_not_merged_not_shrinking(self):
|
||||
# verify that other shard ranges from root are NOT merged into shard
|
||||
# when it is NOT in a shrinking state
|
||||
self._do_test_audit_shard_root_ranges_not_merged('Quoted-Root', 'a/c')
|
||||
|
||||
def test_audit_shard_root_ranges_with_own_merged_while_shrinking(self):
|
||||
# Verify that shrinking shard will merge root and other ranges,
|
||||
# including root range.
|
||||
# Make root and other ranges that fully contain the shard namespace...
|
||||
root_own_sr = ShardRange('a/c', next(self.ts_iter))
|
||||
acceptor = ShardRange(
|
||||
str(ShardName.create('.shards_a', 'c', 'c',
|
||||
next(self.ts_iter), 1)),
|
||||
next(self.ts_iter), 'a', 'c')
|
||||
|
||||
def do_test(own_state, acceptor_state, root_state):
|
||||
acceptor_from_root = acceptor.copy(
|
||||
timestamp=next(self.ts_iter), state=acceptor_state)
|
||||
root_from_root = root_own_sr.copy(
|
||||
timestamp=next(self.ts_iter), state=root_state)
|
||||
ts = next(self.ts_iter)
|
||||
own_sr = ShardRange(
|
||||
str(ShardName.create('.shards_a', 'c', 'c', ts, 0)),
|
||||
ts, lower='a', upper='b', state=own_state, state_timestamp=ts)
|
||||
expected = [acceptor_from_root, root_from_root]
|
||||
with annotate_failure('with states %s %s %s'
|
||||
% (own_state, acceptor_state, root_state)):
|
||||
sharder = self._assert_merge_into_shard(
|
||||
own_sr, [],
|
||||
# own sr is in ranges fetched from root
|
||||
[own_sr, acceptor_from_root, root_from_root],
|
||||
expected, 'Quoted-Root', 'a/c')
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
for own_state in ShardRange.SHRINKING_STATES:
|
||||
for acceptor_state in ShardRange.STATES:
|
||||
for root_state in ShardRange.STATES:
|
||||
do_test(own_state, acceptor_state, root_state)
|
||||
|
||||
def test_audit_shard_root_ranges_missing_own_merged_while_shrinking(self):
|
||||
# Verify that shrinking shard will merge root and other ranges,
|
||||
# including root range.
|
||||
# Make root and other ranges that fully contain the shard namespace...
|
||||
root_own_sr = ShardRange('a/c', next(self.ts_iter))
|
||||
acceptor = ShardRange(
|
||||
str(ShardName.create('.shards_a', 'c', 'c',
|
||||
next(self.ts_iter), 1)),
|
||||
next(self.ts_iter), 'a', 'c')
|
||||
|
||||
def do_test(own_state, acceptor_state, root_state):
|
||||
acceptor_from_root = acceptor.copy(
|
||||
timestamp=next(self.ts_iter), state=acceptor_state)
|
||||
root_from_root = root_own_sr.copy(
|
||||
timestamp=next(self.ts_iter), state=root_state)
|
||||
ts = next(self.ts_iter)
|
||||
own_sr = ShardRange(
|
||||
str(ShardName.create('.shards_a', 'c', 'c', ts, 0)),
|
||||
ts, lower='a', upper='b', state=own_state, state_timestamp=ts)
|
||||
expected = [acceptor_from_root, root_from_root]
|
||||
with annotate_failure('with states %s %s %s'
|
||||
% (own_state, acceptor_state, root_state)):
|
||||
sharder = self._assert_merge_into_shard(
|
||||
own_sr, [],
|
||||
# own sr is NOT in ranges fetched from root
|
||||
[acceptor_from_root, root_from_root],
|
||||
expected, 'Quoted-Root', 'a/c')
|
||||
warning_lines = sharder.logger.get_lines_for_level('warning')
|
||||
self.assertEqual(1, len(warning_lines))
|
||||
self.assertIn('root has no matching shard range',
|
||||
warning_lines[0])
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
for own_state in ShardRange.SHRINKING_STATES:
|
||||
for acceptor_state in ShardRange.STATES:
|
||||
for root_state in ShardRange.STATES:
|
||||
do_test(own_state, acceptor_state, root_state)
|
||||
|
||||
def test_audit_shard_root_ranges_fetch_fails_while_shrinking(self):
|
||||
# check audit copes with failed response while shard is shrinking
|
||||
ts = next(self.ts_iter)
|
||||
own_sr = ShardRange(
|
||||
str(ShardName.create('.shards_a', 'c', 'c', ts, 0)),
|
||||
ts, lower='a', upper='b', state=ShardRange.SHRINKING,
|
||||
state_timestamp=ts)
|
||||
broker = self._make_broker(account=own_sr.account,
|
||||
container=own_sr.container)
|
||||
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||
broker.merge_shard_ranges(own_sr)
|
||||
self.assertFalse(broker.is_root_container())
|
||||
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, [], exc=internal_client.UnexpectedResponse('bad', 'resp'))
|
||||
self.assertEqual([], broker.get_shard_ranges())
|
||||
self.assertEqual(own_sr, broker.get_own_shard_range(no_default=True))
|
||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
warning_lines = sharder.logger.get_lines_for_level('warning')
|
||||
self.assertEqual(2, len(warning_lines))
|
||||
self.assertIn('Failed to get shard ranges from a/c: bad',
|
||||
warning_lines[0])
|
||||
self.assertIn('unable to get shard ranges from root',
|
||||
warning_lines[1])
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
def test_audit_shard_deleted_range_in_root_container(self):
|
||||
# verify that shard DB is marked deleted when its own shard range is
|
||||
|
Loading…
Reference in New Issue
Block a user