Merge "sharder: always merge child shard ranges fetched from root"
This commit is contained in:
@@ -1201,6 +1201,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
shard range is found in ``shard_ranges``.
|
||||
"""
|
||||
own_shard_range_from_root = None
|
||||
children_shard_ranges = []
|
||||
other_shard_ranges = []
|
||||
for shard_range in shard_ranges:
|
||||
# look for this shard range in the list of shard ranges received
|
||||
@@ -1222,9 +1223,20 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
self.logger.info(
|
||||
'Updated own shard range from %s to %s',
|
||||
orig_own_shard_range, own_shard_range)
|
||||
elif shard_range.is_child_of(own_shard_range):
|
||||
children_shard_ranges.append(shard_range)
|
||||
else:
|
||||
other_shard_ranges.append(shard_range)
|
||||
|
||||
if children_shard_ranges and not broker.is_sharded():
|
||||
# Merging shard ranges from the root is only necessary until this
|
||||
# DB is fully cleaved and reaches SHARDED DB state, after which it
|
||||
# is useful for debugging for the set of sub-shards to which a
|
||||
# shards has sharded to be frozen.
|
||||
self.logger.debug('Updating %d children shard ranges from root',
|
||||
len(children_shard_ranges))
|
||||
broker.merge_shard_ranges(children_shard_ranges)
|
||||
|
||||
if (other_shard_ranges and
|
||||
own_shard_range.state in ShardRange.SHRINKING_STATES):
|
||||
# If own_shard_range state is shrinking, save off *all* shards
|
||||
|
||||
@@ -1106,60 +1106,40 @@ class TestContainerSharding(BaseAutoContainerSharding):
|
||||
|
||||
# but third replica still has no idea it should be sharding
|
||||
self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
|
||||
self.assertEqual(
|
||||
ShardRange.ACTIVE,
|
||||
ContainerBroker(
|
||||
found_for_shard['normal_dbs'][2]).get_own_shard_range().state)
|
||||
broker = ContainerBroker(found_for_shard['normal_dbs'][2])
|
||||
self.assertEqual(ShardRange.ACTIVE, broker.get_own_shard_range().state)
|
||||
|
||||
# ...but once sharder runs on third replica it will learn its state;
|
||||
# note that any root replica on the stopped container server also won't
|
||||
# know about the shards being in sharding state, so leave that server
|
||||
# stopped for now so that shard fetches its state from an up-to-date
|
||||
# root replica
|
||||
# ...but once sharder runs on third replica it will learn its state and
|
||||
# fetch its sub-shard ranges durng audit; note that any root replica on
|
||||
# the stopped container server also won't know about the shards being
|
||||
# in sharding state, so leave that server stopped for now so that shard
|
||||
# fetches its state from an up-to-date root replica
|
||||
self.sharders.once(
|
||||
number=shard_1_nodes[2],
|
||||
additional_args='--partitions=%s' % shard_1_part)
|
||||
|
||||
# third replica is sharding but has no sub-shard ranges yet...
|
||||
# third replica is sharding and has sub-shard ranges so can start
|
||||
# cleaving...
|
||||
found_for_shard = self.categorize_container_dir_content(
|
||||
shard_1.account, shard_1.container)
|
||||
self.assertLengthEqual(found_for_shard['shard_dbs'], 2)
|
||||
self.assertLengthEqual(found_for_shard['shard_dbs'], 3)
|
||||
self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
|
||||
broker = ContainerBroker(found_for_shard['normal_dbs'][2])
|
||||
self.assertEqual('unsharded', broker.get_db_state())
|
||||
sharding_broker = ContainerBroker(found_for_shard['normal_dbs'][2])
|
||||
self.assertEqual('sharding', sharding_broker.get_db_state())
|
||||
self.assertEqual(
|
||||
ShardRange.SHARDING, broker.get_own_shard_range().state)
|
||||
self.assertFalse(broker.get_shard_ranges())
|
||||
ShardRange.SHARDING, sharding_broker.get_own_shard_range().state)
|
||||
self.assertEqual(3, len(sharding_broker.get_shard_ranges()))
|
||||
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual([], contexts) # length check
|
||||
|
||||
# ...until sub-shard ranges are replicated from another shard replica;
|
||||
# there may also be a sub-shard replica missing so run replicators on
|
||||
# all nodes to fix that if necessary
|
||||
self.brain.servers.start(number=shard_1_nodes[2])
|
||||
self.replicators.once()
|
||||
|
||||
# Now that the replicators have all run, third replica sees cleaving
|
||||
# contexts for the first two
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual(len(contexts), 2)
|
||||
|
||||
# now run sharder again on third replica
|
||||
self.sharders.once(
|
||||
number=shard_1_nodes[2],
|
||||
additional_args='--partitions=%s' % shard_1_part)
|
||||
sharding_broker = ContainerBroker(found_for_shard['normal_dbs'][2])
|
||||
self.assertEqual('sharding', sharding_broker.get_db_state())
|
||||
|
||||
broker_id = broker.get_info()['id']
|
||||
# Old, unsharded DB doesn't have the context...
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual(len(contexts), 2)
|
||||
self.assertNotIn(broker_id, [ctx[0].ref for ctx in contexts])
|
||||
# ...but the sharding one does
|
||||
# contexts for the first two (plus its own cleaving context)
|
||||
contexts = list(CleavingContext.load_all(sharding_broker))
|
||||
self.assertEqual(len(contexts), 3)
|
||||
broker_id = broker.get_info()['id']
|
||||
self.assertIn(broker_id, [ctx[0].ref for ctx in contexts])
|
||||
|
||||
# check original first shard range state and sub-shards - all replicas
|
||||
|
||||
@@ -6026,6 +6026,155 @@ class TestSharder(BaseTestSharder):
|
||||
warning_lines[1])
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
def test_audit_shard_container_children_merged_while_sharding(self):
|
||||
# Verify that sharding shard will always merge children shard ranges
|
||||
def do_test(child_deleted, child_state):
|
||||
root_sr = ShardRange('a/root', next(self.ts_iter),
|
||||
state=ShardRange.SHARDED)
|
||||
parent_path = ShardRange.make_path(
|
||||
'.shards_a', 'root', root_sr.container,
|
||||
next(self.ts_iter), 2)
|
||||
parent_sr = ShardRange(
|
||||
parent_path, next(self.ts_iter), 'a', 'd',
|
||||
state=ShardRange.SHARDING)
|
||||
child_srs = []
|
||||
for i, lower, upper in ((0, 'a', 'b'), (0, 'b', 'd')):
|
||||
child_path = ShardRange.make_path(
|
||||
'.shards_a', 'root', parent_sr.container,
|
||||
next(self.ts_iter), i)
|
||||
child_sr = ShardRange(
|
||||
child_path, next(self.ts_iter), lower, upper,
|
||||
state=child_state, deleted=child_deleted)
|
||||
self.assertTrue(child_sr.is_child_of(parent_sr))
|
||||
child_srs.append(child_sr)
|
||||
other_path = ShardRange.make_path(
|
||||
'.shards_a', 'root', root_sr.container,
|
||||
next(self.ts_iter), 3) # different index w.r.t. parent
|
||||
other_sr = ShardRange(
|
||||
other_path, next(self.ts_iter), 'a', 'd',
|
||||
state=ShardRange.ACTIVE)
|
||||
self.assertFalse(other_sr.is_child_of(parent_sr))
|
||||
|
||||
# the parent is sharding...
|
||||
broker = self._make_broker(account=parent_sr.account,
|
||||
container=parent_sr.container)
|
||||
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||
broker.merge_shard_ranges(parent_sr)
|
||||
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||
self.assertEqual([parent_sr],
|
||||
broker.get_shard_ranges(include_own=True))
|
||||
|
||||
ranges_from_root = child_srs + [parent_sr, root_sr, other_sr]
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, ranges_from_root)
|
||||
expected_headers = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'a', 'end_marker': 'd',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers,
|
||||
acceptable_statuses=(2,), params=params)
|
||||
|
||||
self._assert_shard_ranges_equal(
|
||||
sorted(child_srs + [parent_sr],
|
||||
key=ShardRange.sort_key),
|
||||
sorted(broker.get_shard_ranges(
|
||||
include_own=True, include_deleted=True),
|
||||
key=ShardRange.sort_key))
|
||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
for child_deleted in (False, True):
|
||||
for child_state in ShardRange.STATES:
|
||||
with annotate_failure('deleted: %s, state: %s'
|
||||
% (child_deleted, child_state)):
|
||||
do_test(child_deleted, child_state)
|
||||
|
||||
def test_audit_shard_container_children_not_merged_once_sharded(self):
|
||||
# Verify that sharding shard will not merge children shard ranges
|
||||
# once the DB is sharded (but continues to merge own shard range
|
||||
# received from root)
|
||||
root_sr = ShardRange('a/root', next(self.ts_iter),
|
||||
state=ShardRange.SHARDED)
|
||||
ts = next(self.ts_iter)
|
||||
parent_path = ShardRange.make_path(
|
||||
'.shards_a', 'root', root_sr.container, ts, 2)
|
||||
parent_sr = ShardRange(
|
||||
parent_path, ts, 'a', 'b', state=ShardRange.ACTIVE, epoch=ts)
|
||||
child_srs = []
|
||||
for i, lower, upper in ((0, 'a', 'ab'), (0, 'ab', 'b')):
|
||||
child_path = ShardRange.make_path(
|
||||
'.shards_a', 'root', parent_sr.container,
|
||||
next(self.ts_iter), i)
|
||||
child_sr = ShardRange(
|
||||
child_path, next(self.ts_iter), lower, upper,
|
||||
state=ShardRange.CLEAVED)
|
||||
self.assertTrue(child_sr.is_child_of(parent_sr))
|
||||
child_srs.append(child_sr)
|
||||
|
||||
# DB is unsharded...
|
||||
broker = self._make_broker(account=parent_sr.account,
|
||||
container=parent_sr.container)
|
||||
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||
broker.merge_shard_ranges(parent_sr)
|
||||
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||
|
||||
self.assertTrue(parent_sr.update_state(
|
||||
ShardRange.SHARDING, state_timestamp=next(self.ts_iter)))
|
||||
ranges_from_root = child_srs + [parent_sr, root_sr]
|
||||
sharder, _ = self.call_audit_container(broker, ranges_from_root)
|
||||
|
||||
# children ranges from root are merged
|
||||
self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
|
||||
# own sr from root is merged
|
||||
self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
|
||||
dict(broker.get_own_shard_range()))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
# DB is sharding...
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
self.assertEqual(SHARDING, broker.get_db_state())
|
||||
parent_sr.state_timestamp = next(self.ts_iter)
|
||||
for child_sr in child_srs:
|
||||
child_sr.update_state(ShardRange.ACTIVE,
|
||||
state_timestamp=next(self.ts_iter))
|
||||
|
||||
sharder, _ = self.call_audit_container(broker, ranges_from_root)
|
||||
|
||||
# children ranges from root are merged
|
||||
self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
|
||||
# own sr from root is merged
|
||||
self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
|
||||
dict(broker.get_own_shard_range()))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
# DB is sharded...
|
||||
self.assertTrue(broker.set_sharded_state())
|
||||
self.assertEqual(SHARDED, broker.get_db_state())
|
||||
self.assertTrue(parent_sr.update_state(
|
||||
ShardRange.SHARDED, state_timestamp=next(self.ts_iter)))
|
||||
updated_child_srs = [
|
||||
child_sr.copy(state=ShardRange.SHARDING,
|
||||
state_timestamp=next(self.ts_iter))
|
||||
for child_sr in child_srs]
|
||||
|
||||
ranges_from_root = updated_child_srs + [parent_sr, root_sr]
|
||||
sharder, _ = self.call_audit_container(broker, ranges_from_root)
|
||||
|
||||
# children ranges from root are NOT merged
|
||||
self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
|
||||
# own sr from root is merged
|
||||
self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
|
||||
dict(broker.get_own_shard_range()))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
def test_audit_shard_deleted_range_in_root_container(self):
|
||||
# verify that shard DB is marked deleted when its own shard range is
|
||||
# updated with deleted version from root
|
||||
|
||||
Reference in New Issue
Block a user