From a46f2324ab926ae71d3a4d3214267ffd82fd1ee8 Mon Sep 17 00:00:00 2001
From: Alistair Coles
Date: Mon, 19 Sep 2022 14:33:20 +0100
Subject: [PATCH] sharder: always merge child shard ranges fetched from root

While auditing shard container DBs, the sharder would previously merge
shard ranges fetched from the root into the shard DB only if the shard
was shrinking; shrinking is the only time when a shard normally *must*
receive shard ranges from the root.

With this patch the sharder will also merge shard ranges fetched from
the root if they are known to be the children of the shard, regardless
of the state of the shard. Child shard ranges would previously only
have been merged during replication with peers of the shard; merging
shard ranges from the root during audit potentially speeds their
propagation to peers that have yet to replicate.

Change-Id: I57aafc537ff94b081d0e1ea70e7fb7dd3598c61e
---
 swift/container/sharder.py          |  12 +++
 test/probe/test_sharder.py          |  52 +++-------
 test/unit/container/test_sharder.py | 149 ++++++++++++++++++++++++++++
 3 files changed, 177 insertions(+), 36 deletions(-)

diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index 28b3e3c532..466325a100 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -1201,6 +1201,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
             shard range is found in ``shard_ranges``.
         """
         own_shard_range_from_root = None
+        children_shard_ranges = []
         other_shard_ranges = []
         for shard_range in shard_ranges:
             # look for this shard range in the list of shard ranges received
@@ -1222,9 +1223,20 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
                 self.logger.info(
                     'Updated own shard range from %s to %s',
                     orig_own_shard_range, own_shard_range)
+            elif shard_range.is_child_of(own_shard_range):
+                children_shard_ranges.append(shard_range)
             else:
                 other_shard_ranges.append(shard_range)
 
+        if children_shard_ranges and not broker.is_sharded():
+            # Merging shard ranges from the root is only necessary until
+            # this DB is fully cleaved and reaches the SHARDED DB state;
+            # after that it is useful, for debugging, for the set of
+            # sub-shards to which the shard has sharded to be frozen.
+            self.logger.debug('Updating %d children shard ranges from root',
+                              len(children_shard_ranges))
+            broker.merge_shard_ranges(children_shard_ranges)
+
         if (other_shard_ranges
                 and own_shard_range.state in ShardRange.SHRINKING_STATES):
             # If own_shard_range state is shrinking, save off *all* shards
diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py
index 71fac12336..02f66778b0 100644
--- a/test/probe/test_sharder.py
+++ b/test/probe/test_sharder.py
@@ -1106,60 +1106,40 @@ class TestContainerSharding(BaseAutoContainerSharding):
 
         # but third replica still has no idea it should be sharding
         self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
-        self.assertEqual(
-            ShardRange.ACTIVE,
-            ContainerBroker(
-                found_for_shard['normal_dbs'][2]).get_own_shard_range().state)
+        broker = ContainerBroker(found_for_shard['normal_dbs'][2])
+        self.assertEqual(ShardRange.ACTIVE, broker.get_own_shard_range().state)
 
-        # ...but once sharder runs on third replica it will learn its state;
-        # note that any root replica on the stopped container server also won't
-        # know about the shards being in sharding state, so leave that server
-        # stopped for now so that shard fetches its state from an up-to-date
-        # root replica
+        # ...but once sharder runs on third replica it will learn its state and
+        # fetch its sub-shard ranges during audit; note that any root replica on
+        # the stopped container server also won't know about the shards being
+        # in sharding state, so leave that server stopped for now so that shard
+        # fetches its state from an up-to-date root replica
         self.sharders.once(
             number=shard_1_nodes[2],
             additional_args='--partitions=%s' % shard_1_part)
 
-        # third replica is sharding but has no sub-shard ranges yet...
+        # third replica is sharding and has sub-shard ranges so can start
+        # cleaving...
         found_for_shard = self.categorize_container_dir_content(
             shard_1.account, shard_1.container)
-        self.assertLengthEqual(found_for_shard['shard_dbs'], 2)
+        self.assertLengthEqual(found_for_shard['shard_dbs'], 3)
         self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
-        broker = ContainerBroker(found_for_shard['normal_dbs'][2])
-        self.assertEqual('unsharded', broker.get_db_state())
+        sharding_broker = ContainerBroker(found_for_shard['normal_dbs'][2])
+        self.assertEqual('sharding', sharding_broker.get_db_state())
         self.assertEqual(
-            ShardRange.SHARDING, broker.get_own_shard_range().state)
-        self.assertFalse(broker.get_shard_ranges())
+            ShardRange.SHARDING, sharding_broker.get_own_shard_range().state)
+        self.assertEqual(3, len(sharding_broker.get_shard_ranges()))
 
-        contexts = list(CleavingContext.load_all(broker))
-        self.assertEqual([], contexts)  # length check
-
-        # ...until sub-shard ranges are replicated from another shard replica;
         # there may also be a sub-shard replica missing so run replicators on
         # all nodes to fix that if necessary
         self.brain.servers.start(number=shard_1_nodes[2])
         self.replicators.once()
 
         # Now that the replicators have all run, third replica sees cleaving
-        # contexts for the first two
-        contexts = list(CleavingContext.load_all(broker))
-        self.assertEqual(len(contexts), 2)
-
-        # now run sharder again on third replica
-        self.sharders.once(
-            number=shard_1_nodes[2],
-            additional_args='--partitions=%s' % shard_1_part)
-        sharding_broker = ContainerBroker(found_for_shard['normal_dbs'][2])
-        self.assertEqual('sharding', sharding_broker.get_db_state())
-
-        broker_id = broker.get_info()['id']
-        # Old, unsharded DB doesn't have the context...
-        contexts = list(CleavingContext.load_all(broker))
-        self.assertEqual(len(contexts), 2)
-        self.assertNotIn(broker_id, [ctx[0].ref for ctx in contexts])
-        # ...but the sharding one does
+        # contexts for the first two (plus its own cleaving context)
         contexts = list(CleavingContext.load_all(sharding_broker))
         self.assertEqual(len(contexts), 3)
+        broker_id = broker.get_info()['id']
         self.assertIn(broker_id, [ctx[0].ref for ctx in contexts])
 
         # check original first shard range state and sub-shards - all replicas
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index 67da6c9d6e..7d44c00aa1 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -6026,6 +6026,155 @@ class TestSharder(BaseTestSharder):
             warning_lines[1])
         self.assertFalse(sharder.logger.get_lines_for_level('error'))
 
+    def test_audit_shard_container_children_merged_while_sharding(self):
+        # Verify that a sharding shard will always merge children shard ranges
+        def do_test(child_deleted, child_state):
+            root_sr = ShardRange('a/root', next(self.ts_iter),
+                                 state=ShardRange.SHARDED)
+            parent_path = ShardRange.make_path(
+                '.shards_a', 'root', root_sr.container,
+                next(self.ts_iter), 2)
+            parent_sr = ShardRange(
+                parent_path, next(self.ts_iter), 'a', 'd',
+                state=ShardRange.SHARDING)
+            child_srs = []
+            for i, lower, upper in ((0, 'a', 'b'), (1, 'b', 'd')):
+                child_path = ShardRange.make_path(
+                    '.shards_a', 'root', parent_sr.container,
+                    next(self.ts_iter), i)
+                child_sr = ShardRange(
+                    child_path, next(self.ts_iter), lower, upper,
+                    state=child_state, deleted=child_deleted)
+                self.assertTrue(child_sr.is_child_of(parent_sr))
+                child_srs.append(child_sr)
+            other_path = ShardRange.make_path(
+                '.shards_a', 'root', root_sr.container,
+                next(self.ts_iter), 3)  # different index w.r.t. parent
+            other_sr = ShardRange(
+                other_path, next(self.ts_iter), 'a', 'd',
+                state=ShardRange.ACTIVE)
+            self.assertFalse(other_sr.is_child_of(parent_sr))
+
+            # the parent is sharding...
+            broker = self._make_broker(account=parent_sr.account,
+                                       container=parent_sr.container)
+            broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
+            broker.merge_shard_ranges(parent_sr)
+            self.assertEqual(UNSHARDED, broker.get_db_state())
+            self.assertEqual([parent_sr],
+                             broker.get_shard_ranges(include_own=True))
+
+            ranges_from_root = child_srs + [parent_sr, root_sr, other_sr]
+            sharder, mock_swift = self.call_audit_container(
+                broker, ranges_from_root)
+            expected_headers = {'X-Backend-Record-Type': 'shard',
+                                'X-Newest': 'true',
+                                'X-Backend-Include-Deleted': 'True',
+                                'X-Backend-Override-Deleted': 'true'}
+            params = {'format': 'json', 'marker': 'a', 'end_marker': 'd',
+                      'states': 'auditing'}
+            mock_swift.make_request.assert_called_once_with(
+                'GET', '/v1/a/c', expected_headers,
+                acceptable_statuses=(2,), params=params)
+
+            self._assert_shard_ranges_equal(
+                sorted(child_srs + [parent_sr],
+                       key=ShardRange.sort_key),
+                sorted(broker.get_shard_ranges(
+                    include_own=True, include_deleted=True),
+                    key=ShardRange.sort_key))
+            expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
+            self._assert_stats(expected_stats, sharder, 'audit_shard')
+            self.assertFalse(sharder.logger.get_lines_for_level('warning'))
+            self.assertFalse(sharder.logger.get_lines_for_level('error'))
+
+        for child_deleted in (False, True):
+            for child_state in ShardRange.STATES:
+                with annotate_failure('deleted: %s, state: %s'
+                                      % (child_deleted, child_state)):
+                    do_test(child_deleted, child_state)
+
+    def test_audit_shard_container_children_not_merged_once_sharded(self):
+        # Verify that a shard will no longer merge children shard ranges
+        # once its DB is sharded (but it continues to merge its own shard
+        # range received from root)
+        root_sr = ShardRange('a/root', next(self.ts_iter),
+                             state=ShardRange.SHARDED)
+        ts = next(self.ts_iter)
+        parent_path = ShardRange.make_path(
+            '.shards_a', 'root', root_sr.container, ts, 2)
+        parent_sr = ShardRange(
+            parent_path, ts, 'a', 'b', state=ShardRange.ACTIVE, epoch=ts)
+        child_srs = []
+        for i, lower, upper in ((0, 'a', 'ab'), (1, 'ab', 'b')):
+            child_path = ShardRange.make_path(
+                '.shards_a', 'root', parent_sr.container,
+                next(self.ts_iter), i)
+            child_sr = ShardRange(
+                child_path, next(self.ts_iter), lower, upper,
+                state=ShardRange.CLEAVED)
+            self.assertTrue(child_sr.is_child_of(parent_sr))
+            child_srs.append(child_sr)
+
+        # DB is unsharded...
+        broker = self._make_broker(account=parent_sr.account,
+                                   container=parent_sr.container)
+        broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
+        broker.merge_shard_ranges(parent_sr)
+        self.assertEqual(UNSHARDED, broker.get_db_state())
+
+        self.assertTrue(parent_sr.update_state(
+            ShardRange.SHARDING, state_timestamp=next(self.ts_iter)))
+        ranges_from_root = child_srs + [parent_sr, root_sr]
+        sharder, _ = self.call_audit_container(broker, ranges_from_root)
+
+        # children ranges from root are merged
+        self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges())
+        # own sr from root is merged
+        self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY),
+                         dict(broker.get_own_shard_range()))
+        self.assertFalse(sharder.logger.get_lines_for_level('warning'))
+        self.assertFalse(sharder.logger.get_lines_for_level('error'))
+
+        # DB is sharding...
+ self.assertTrue(broker.set_sharding_state()) + self.assertEqual(SHARDING, broker.get_db_state()) + parent_sr.state_timestamp = next(self.ts_iter) + for child_sr in child_srs: + child_sr.update_state(ShardRange.ACTIVE, + state_timestamp=next(self.ts_iter)) + + sharder, _ = self.call_audit_container(broker, ranges_from_root) + + # children ranges from root are merged + self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges()) + # own sr from root is merged + self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY), + dict(broker.get_own_shard_range())) + self.assertFalse(sharder.logger.get_lines_for_level('warning')) + self.assertFalse(sharder.logger.get_lines_for_level('error')) + + # DB is sharded... + self.assertTrue(broker.set_sharded_state()) + self.assertEqual(SHARDED, broker.get_db_state()) + self.assertTrue(parent_sr.update_state( + ShardRange.SHARDED, state_timestamp=next(self.ts_iter))) + updated_child_srs = [ + child_sr.copy(state=ShardRange.SHARDING, + state_timestamp=next(self.ts_iter)) + for child_sr in child_srs] + + ranges_from_root = updated_child_srs + [parent_sr, root_sr] + sharder, _ = self.call_audit_container(broker, ranges_from_root) + + # children ranges from root are NOT merged + self._assert_shard_ranges_equal(child_srs, broker.get_shard_ranges()) + # own sr from root is merged + self.assertEqual(dict(parent_sr, meta_timestamp=mock.ANY), + dict(broker.get_own_shard_range())) + self.assertFalse(sharder.logger.get_lines_for_level('warning')) + self.assertFalse(sharder.logger.get_lines_for_level('error')) + def test_audit_shard_deleted_range_in_root_container(self): # verify that shard DB is marked deleted when its own shard range is # updated with deleted version from root
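
For a quick, runnable picture of the behaviour this patch adds, below is a
minimal sketch of the audit-time decision in plain Python. It is an
illustration only, not Swift's API: FakeRange, partition_ranges_from_root
and maybe_merge_children are hypothetical stand-ins, parentage is modelled
with an explicit attribute rather than the container-name parsing that
ShardRange.is_child_of() actually performs, and the shrinking path shown in
the diff context is omitted.

    # Hedged sketch of the audit-time merge decision; all names here are
    # illustrative stand-ins, not Swift's actual classes or functions.
    class FakeRange(object):
        def __init__(self, name, parent_name=None):
            self.name = name
            # the real ShardRange derives parentage by parsing the shard's
            # container name (see ShardRange.make_path); an explicit
            # attribute keeps this sketch self-contained
            self.parent_name = parent_name

        def is_child_of(self, other):
            return self.parent_name == other.name

    def partition_ranges_from_root(own_sr, ranges_from_root):
        # split ranges fetched from root into (own, children, others)
        own_from_root, children, others = None, [], []
        for sr in ranges_from_root:
            if sr.name == own_sr.name:
                own_from_root = sr   # may refresh own shard range
            elif sr.is_child_of(own_sr):
                children.append(sr)  # new: merged during audit as well
            else:
                others.append(sr)    # only merged on the shrinking path
        return own_from_root, children, others

    def maybe_merge_children(db_is_sharded, children, merge):
        # children are merged only until the DB reaches SHARDED state,
        # after which the recorded set of sub-shards stays frozen
        if children and not db_is_sharded:
            merge(children)

    own = FakeRange('.shards_a/parent')
    kids = [FakeRange('.shards_a/child-0', parent_name=own.name),
            FakeRange('.shards_a/child-1', parent_name=own.name)]
    sibling = FakeRange('.shards_a/other')
    _, children, others = partition_ranges_from_root(
        own, kids + [sibling, own])
    assert children == kids and others == [sibling]

    merged = []
    maybe_merge_children(False, children, merged.extend)  # not yet sharded
    assert merged == kids
    merged = []
    maybe_merge_children(True, children, merged.extend)   # sharded: frozen
    assert merged == []

Freezing the merge once the DB is SHARDED mirrors the new comment in
sharder.py: after cleaving completes, the set of sub-shards recorded in the
shard's DB stays stable for debugging, while merging children earlier (via
audit as well as replication) speeds their propagation to replicas that
have yet to replicate.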