From 6e9e922cd6d6f7144605c5e959d3ff64f4bbc461 Mon Sep 17 00:00:00 2001 From: Matthew Oliver Date: Fri, 13 Sep 2019 16:16:06 +1000 Subject: [PATCH] Sharding: Clean up old CleaveConext's during audit This is a combination of 4 commits: ========== Sharding: Clean up old CleaveConext's during audit There is a sharding edge case where more CleaveContext are generated and stored in the sharding container DB. If this number get's high enough, like in the linked bug. If enough CleaveContects build up in the DB then this can lead to the 503's when attempting to list the container due to all the `X-Container-Sysmeta-Shard-Context-*` headers. This patch resolves this by tracking the a CleaveContext's last modified. And during the sharding audit, any context's that hasn't been touched after reclaim_age are deleted. This plus the skip empty ranges patches should improve these handoff shards. (cherry picked from commit 81a41da5420313f9cdb9c759bbb0f46c0d20c5af) ---------- Sharding: Use the metadata timestamp as last_modified This is a follow up patch from the cleaning up cleave context's patch (patch 681970). Instead of tracking a last_modified timestamp, and storing it in the context metadata, use the timestamp we use when storing any metadata. Reducing duplication is nice, but there's a more significant reason to do this: affected container DBs can start getting cleaned up as soon as they're running the new code rather than needing to wait for an additional reclaim_age. (cherry picked from commit 370ac4cd70489a49b2b6408638c9b35006f57053) ---------- Make sharding methods with only one job (cherry picked from commit f56071e57392573b7aea014bba6757a01a8a59ad) ---------- sharding: Update probe test to verify CleavingContext cleanup (cherry picked from commit 9495bc0003817805750dd78f3d93dd1a237f1553) ========== Co-Authored-By: Clay Gerrard Co-Authored-By: Tim Burke Change-Id: I1e502c328be16fca5f1cca2186b27a0545fecc16 Closes-Bug: #1843313 --- swift/container/backend.py | 26 ++++-- swift/container/sharder.py | 38 ++++++++ test/probe/test_sharder.py | 46 ++++++++++ test/unit/container/test_sharder.py | 136 +++++++++++++++++++++++++++- 4 files changed, 239 insertions(+), 7 deletions(-) diff --git a/swift/container/backend.py b/swift/container/backend.py index 9407177e7a..8684a75d5f 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -1996,6 +1996,22 @@ class ContainerBroker(DatabaseBroker): self.update_metadata({'X-Container-Sysmeta-Shard-' + key: (value, Timestamp.now().internal)}) + def get_sharding_sysmeta_with_timestamps(self): + """ + Returns sharding specific info from the broker's metadata with + timestamps. + + :param key: if given the value stored under ``key`` in the sharding + info will be returned. + :return: a dict of sharding info with their timestamps. + """ + prefix = 'X-Container-Sysmeta-Shard-' + return { + k[len(prefix):]: v + for k, v in self.metadata.items() + if k.startswith(prefix) + } + def get_sharding_sysmeta(self, key=None): """ Returns sharding specific info from the broker's metadata. @@ -2005,13 +2021,11 @@ class ContainerBroker(DatabaseBroker): :return: either a dict of sharding info or the value stored under ``key`` in that dict. """ - prefix = 'X-Container-Sysmeta-Shard-' - metadata = self.metadata - info = dict((k[len(prefix):], v[0]) for - k, v in metadata.items() if k.startswith(prefix)) + info = self.get_sharding_sysmeta_with_timestamps() if key: - return info.get(key) - return info + return info.get(key, (None, None))[0] + else: + return {k: v[0] for k, v in info.items()} def _load_root_info(self): """ diff --git a/swift/container/sharder.py b/swift/container/sharder.py index d5f125968a..9e6b40a866 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -220,6 +220,10 @@ class CleavingContext(object): yield 'ranges_done', self.ranges_done yield 'ranges_todo', self.ranges_todo + def __repr__(self): + return '%s(%s)' % (self.__class__.__name__, ', '.join( + '%s=%r' % prop for prop in self)) + def _encode(cls, value): if value is not None and six.PY2 and isinstance(value, six.text_type): return value.encode('utf-8') @@ -241,6 +245,26 @@ class CleavingContext(object): def _make_ref(cls, broker): return broker.get_info()['id'] + @classmethod + def load_all(cls, broker): + """ + Returns all cleaving contexts stored in the broker. + + :param broker: + :return: list of tuples of (CleavingContext, timestamp) + """ + brokers = broker.get_brokers() + sysmeta = brokers[-1].get_sharding_sysmeta_with_timestamps() + + for key, (val, timestamp) in sysmeta.items(): + # If the value is of length 0, then the metadata is + # marked for deletion + if key.startswith("Context-") and len(val) > 0: + try: + yield cls(**json.loads(val)), timestamp + except ValueError: + continue + @classmethod def load(cls, broker): """ @@ -287,6 +311,11 @@ class CleavingContext(object): return all((self.misplaced_done, self.cleaving_done, self.max_row == self.cleave_to_row)) + def delete(self, broker): + # These will get reclaimed when `_reclaim_metadata` in + # common/db.py is called. + broker.set_sharding_sysmeta('Context-' + self.ref, '') + DEFAULT_SHARD_CONTAINER_THRESHOLD = 1000000 DEFAULT_SHARD_SHRINK_POINT = 25 @@ -723,12 +752,20 @@ class ContainerSharder(ContainerReplicator): self._increment_stat('audit_shard', 'success', statsd=True) return True + def _audit_cleave_contexts(self, broker): + now = Timestamp.now() + for context, last_mod in CleavingContext.load_all(broker): + if Timestamp(last_mod).timestamp + self.reclaim_age < \ + now.timestamp: + context.delete(broker) + def _audit_container(self, broker): if broker.is_deleted(): # if the container has been marked as deleted, all metadata will # have been erased so no point auditing. But we want it to pass, in # case any objects exist inside it. return True + self._audit_cleave_contexts(broker) if broker.is_root_container(): return self._audit_root_container(broker) return self._audit_shard_container(broker) @@ -1306,6 +1343,7 @@ class ContainerSharder(ContainerReplicator): modified_shard_ranges.append(own_shard_range) broker.merge_shard_ranges(modified_shard_ranges) if broker.set_sharded_state(): + cleaving_context.delete(broker) return True else: self.logger.warning( diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index 0befd604f0..54a87137e0 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -27,6 +27,7 @@ from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \ from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING from swift.common import utils from swift.common.manager import Manager +from swift.container.sharder import CleavingContext from swiftclient import client, get_auth, ClientException from swift.proxy.controllers.obj import num_container_updates @@ -601,6 +602,8 @@ class TestContainerSharding(BaseTestContainerSharding): self.assert_shard_ranges_contiguous(2, orig_root_shard_ranges) self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE], [sr['state'] for sr in orig_root_shard_ranges]) + contexts = list(CleavingContext.load_all(broker)) + self.assertEqual([], contexts) # length check self.direct_delete_container(expect_failure=True) self.assertLengthEqual(found['normal_dbs'], 2) @@ -618,6 +621,9 @@ class TestContainerSharding(BaseTestContainerSharding): orig_root_shard_ranges, shard_ranges, excludes=['meta_timestamp', 'state', 'state_timestamp']) + contexts = list(CleavingContext.load_all(broker)) + self.assertEqual([], contexts) # length check + if run_replicators: Manager(['container-replicator']).once() # replication doesn't change the db file names @@ -648,6 +654,9 @@ class TestContainerSharding(BaseTestContainerSharding): self.assertGreaterEqual(updated.meta_timestamp, orig['meta_timestamp']) + contexts = list(CleavingContext.load_all(broker)) + self.assertEqual([], contexts) # length check + # Check that entire listing is available headers, actual_listing = self.assert_container_listing(obj_names) # ... and check some other container properties @@ -738,6 +747,16 @@ class TestContainerSharding(BaseTestContainerSharding): first_lower=orig_root_shard_ranges[0]['lower'], last_upper=orig_root_shard_ranges[0]['upper']) + contexts = list(CleavingContext.load_all(broker)) + self.assertEqual(len(contexts), 1) + context, _lm = contexts[0] + self.assertIs(context.cleaving_done, False) + self.assertIs(context.misplaced_done, True) + self.assertEqual(context.ranges_done, 2) + self.assertEqual(context.ranges_todo, 1) + self.assertEqual(context.max_row, + self.max_shard_size * 3 // 2) + # but third replica still has no idea it should be sharding self.assertLengthEqual(found_for_shard['normal_dbs'], 3) self.assertEqual( @@ -765,16 +784,36 @@ class TestContainerSharding(BaseTestContainerSharding): ShardRange.SHARDING, broker.get_own_shard_range().state) self.assertFalse(broker.get_shard_ranges()) + contexts = list(CleavingContext.load_all(broker)) + self.assertEqual([], contexts) # length check + # ...until sub-shard ranges are replicated from another shard replica; # there may also be a sub-shard replica missing so run replicators on # all nodes to fix that if necessary self.brain.servers.start(number=shard_1_nodes[2]) self.replicators.once() + # Now that the replicators have all run, third replica sees cleaving + # contexts for the first two + contexts = list(CleavingContext.load_all(broker)) + self.assertEqual(len(contexts), 2) + # now run sharder again on third replica self.sharders.once( number=shard_1_nodes[2], additional_args='--partitions=%s' % shard_1_part) + sharding_broker = ContainerBroker(found_for_shard['normal_dbs'][2]) + self.assertEqual('sharding', sharding_broker.get_db_state()) + + broker_id = broker.get_info()['id'] + # Old, unsharded DB doesn't have the context... + contexts = list(CleavingContext.load_all(broker)) + self.assertEqual(len(contexts), 2) + self.assertNotIn(broker_id, [ctx[0].ref for ctx in contexts]) + # ...but the sharding one does + contexts = list(CleavingContext.load_all(sharding_broker)) + self.assertEqual(len(contexts), 3) + self.assertIn(broker_id, [ctx[0].ref for ctx in contexts]) # check original first shard range state and sub-shards - all replicas # should now be in consistent state @@ -847,6 +886,8 @@ class TestContainerSharding(BaseTestContainerSharding): self.assert_container_listing(['alpha'] + more_obj_names + obj_names) # Run sharders again so things settle. self.run_sharders(shard_1) + # Also run replicators to settle cleaving contexts + self.replicators.once() # check original first shard range shards for db_file in found_for_shard['shard_dbs']: @@ -857,6 +898,11 @@ class TestContainerSharding(BaseTestContainerSharding): self.assertEqual( [ShardRange.ACTIVE] * 3, [sr.state for sr in broker.get_shard_ranges()]) + + # Make sure our cleaving contexts got cleaned up + contexts = list(CleavingContext.load_all(broker)) + self.assertEqual([], contexts) + # check root shard ranges root_shard_ranges = self.direct_get_container_shard_ranges() for node, (hdrs, root_shards) in root_shard_ranges.items(): diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 1f499c4151..e112877525 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -21,6 +21,7 @@ import os import shutil from contextlib import contextmanager from tempfile import mkdtemp +from uuid import uuid4 import mock import unittest @@ -4486,6 +4487,63 @@ class TestSharder(BaseTestSharder): set((call[0][0].path, call[0][1]['id'], call[0][2]) for call in mock_process_broker.call_args_list)) + def test_audit_cleave_contexts(self): + + def add_cleave_context(id, last_modified): + params = {'ref': id, + 'cursor': 'curs', + 'max_row': 2, + 'cleave_to_row': 2, + 'last_cleave_to_row': 1, + 'cleaving_done': False, + 'misplaced_done': True, + 'ranges_done': 2, + 'ranges_todo': 4} + key = 'X-Container-Sysmeta-Shard-Context-%s' % id + with mock_timestamp_now(Timestamp(last_modified)): + broker.update_metadata( + {key: (json.dumps(params), + Timestamp(last_modified).internal)}) + + def get_context(id, broker): + data = broker.get_sharding_sysmeta().get('Context-%s' % id) + if data: + return CleavingContext(**json.loads(data)) + return data + + reclaim_age = 100 + broker = self._make_broker() + + # sanity check + self.assertIsNone(broker.get_own_shard_range(no_default=True)) + self.assertEqual(UNSHARDED, broker.get_db_state()) + + # Setup some cleaving contexts + id_old, id_newish = [str(uuid4()) for _ in range(2)] + contexts = ((id_old, 1), + (id_newish, reclaim_age // 2)) + for id, last_modified in contexts: + add_cleave_context(id, last_modified) + + with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder: + with mock_timestamp_now(Timestamp(reclaim_age + 2)): + sharder._audit_cleave_contexts(broker) + + old_ctx = get_context(id_old, broker) + self.assertEqual(old_ctx, "") + + newish_ctx = get_context(id_newish, broker) + self.assertEqual(newish_ctx.ref, id_newish) + + # If we push time another reclaim age later, and they all be removed + # minus id_missing_lm as it has a later last_modified. + with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder: + with mock_timestamp_now(Timestamp(reclaim_age * 2)): + sharder._audit_cleave_contexts(broker) + + newish_ctx = get_context(id_newish, broker) + self.assertEqual(newish_ctx, "") + class TestCleavingContext(BaseTestSharder): def test_init(self): @@ -4571,11 +4629,85 @@ class TestCleavingContext(BaseTestSharder): self.assertEqual(2, ctx.ranges_done) self.assertEqual(4, ctx.ranges_todo) + def test_load_all(self): + broker = self._make_broker() + last_ctx = None + timestamp = Timestamp.now() + + db_ids = [str(uuid4()) for _ in range(6)] + for db_id in db_ids: + params = {'ref': db_id, + 'cursor': 'curs', + 'max_row': 2, + 'cleave_to_row': 2, + 'last_cleave_to_row': 1, + 'cleaving_done': False, + 'misplaced_done': True, + 'ranges_done': 2, + 'ranges_todo': 4} + key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id + broker.update_metadata( + {key: (json.dumps(params), timestamp.internal)}) + first_ctx = None + for ctx, lm in CleavingContext.load_all(broker): + if not first_ctx: + first_ctx = ctx + last_ctx = ctx + self.assertIn(ctx.ref, db_ids) + self.assertEqual(lm, timestamp.internal) + + # If a context is deleted (metadata is "") then it's skipped + last_ctx.delete(broker) + db_ids.remove(last_ctx.ref) + + # and let's modify the first + with mock_timestamp_now() as new_timestamp: + first_ctx.store(broker) + + for ctx, lm in CleavingContext.load_all(broker): + self.assertIn(ctx.ref, db_ids) + if ctx.ref == first_ctx.ref: + self.assertEqual(lm, new_timestamp.internal) + else: + self.assertEqual(lm, timestamp.internal) + + def test_delete(self): + broker = self._make_broker() + + db_id = broker.get_info()['id'] + params = {'ref': db_id, + 'cursor': 'curs', + 'max_row': 2, + 'cleave_to_row': 2, + 'last_cleave_to_row': 1, + 'cleaving_done': False, + 'misplaced_done': True, + 'ranges_done': 2, + 'ranges_todo': 4} + key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id + broker.update_metadata( + {key: (json.dumps(params), Timestamp.now().internal)}) + ctx = CleavingContext.load(broker) + self.assertEqual(db_id, ctx.ref) + + # Now let's delete it. When deleted the metadata key will exist, but + # the value will be "" as this means it'll be reaped later. + ctx.delete(broker) + sysmeta = broker.get_sharding_sysmeta() + for key, val in sysmeta.items(): + if key == "Context-%s" % db_id: + self.assertEqual(val, "") + break + else: + self.fail("Deleted context 'Context-%s' not found") + def test_store(self): broker = self._make_sharding_broker() old_db_id = broker.get_brokers()[0].get_info()['id'] + last_mod = Timestamp.now() ctx = CleavingContext(old_db_id, 'curs', 12, 11, 2, True, True, 2, 4) - ctx.store(broker) + with mock_timestamp_now(last_mod): + ctx.store(broker) key = 'X-Container-Sysmeta-Shard-Context-%s' % old_db_id data = json.loads(broker.metadata[key][0]) expected = {'ref': old_db_id, @@ -4588,6 +4720,8 @@ class TestCleavingContext(BaseTestSharder): 'ranges_done': 2, 'ranges_todo': 4} self.assertEqual(expected, data) + # last modified is the metadata timestamp + self.assertEqual(broker.metadata[key][1], last_mod.internal) def test_store_add_row_load(self): # adding row to older db changes only max_row in the context