diff --git a/swift/container/backend.py b/swift/container/backend.py
index 9407177e7a..8684a75d5f 100644
--- a/swift/container/backend.py
+++ b/swift/container/backend.py
@@ -1996,6 +1996,22 @@ class ContainerBroker(DatabaseBroker):
         self.update_metadata({'X-Container-Sysmeta-Shard-' + key:
                               (value, Timestamp.now().internal)})
 
+    def get_sharding_sysmeta_with_timestamps(self):
+        """
+        Returns sharding specific info from the broker's metadata with
+        timestamps.
+
+        :return: a dict of sharding info; each value in the dict is a
+            tuple of the metadata value and the timestamp at which it
+            was last updated.
+        """
+        prefix = 'X-Container-Sysmeta-Shard-'
+        return {
+            k[len(prefix):]: v
+            for k, v in self.metadata.items()
+            if k.startswith(prefix)
+        }
+
     def get_sharding_sysmeta(self, key=None):
         """
         Returns sharding specific info from the broker's metadata.
@@ -2005,13 +2021,11 @@
         :return: either a dict of sharding info or the value stored under
             ``key`` in that dict.
         """
-        prefix = 'X-Container-Sysmeta-Shard-'
-        metadata = self.metadata
-        info = dict((k[len(prefix):], v[0]) for
-                    k, v in metadata.items() if k.startswith(prefix))
+        info = self.get_sharding_sysmeta_with_timestamps()
         if key:
-            return info.get(key)
-        return info
+            return info.get(key, (None, None))[0]
+        else:
+            return {k: v[0] for k, v in info.items()}
 
     def _load_root_info(self):
         """
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index d5f125968a..9e6b40a866 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -220,6 +220,10 @@ class CleavingContext(object):
         yield 'ranges_done', self.ranges_done
         yield 'ranges_todo', self.ranges_todo
 
+    def __repr__(self):
+        return '%s(%s)' % (self.__class__.__name__, ', '.join(
+            '%s=%r' % prop for prop in self))
+
     def _encode(cls, value):
         if value is not None and six.PY2 and isinstance(value, six.text_type):
             return value.encode('utf-8')
@@ -241,6 +245,26 @@
     def _make_ref(cls, broker):
         return broker.get_info()['id']
 
+    @classmethod
+    def load_all(cls, broker):
+        """
+        Returns all cleaving contexts stored in the broker.
+
+        :param broker: an instance of :class:`ContainerBroker`.
+        :return: an iterator of (CleavingContext, timestamp) tuples.
+        """
+        brokers = broker.get_brokers()
+        sysmeta = brokers[-1].get_sharding_sysmeta_with_timestamps()
+
+        for key, (val, timestamp) in sysmeta.items():
+            # If the value is of length 0, then the metadata is
+            # marked for deletion
+            if key.startswith("Context-") and len(val) > 0:
+                try:
+                    yield cls(**json.loads(val)), timestamp
+                except ValueError:
+                    continue
+
     @classmethod
     def load(cls, broker):
         """
@@ -287,6 +311,11 @@ class CleavingContext(object):
         return all((self.misplaced_done, self.cleaving_done,
                     self.max_row == self.cleave_to_row))
 
+    def delete(self, broker):
+        # These will get reclaimed when `_reclaim_metadata` in
+        # common/db.py is called.
+        broker.set_sharding_sysmeta('Context-' + self.ref, '')
+
 
 DEFAULT_SHARD_CONTAINER_THRESHOLD = 1000000
 DEFAULT_SHARD_SHRINK_POINT = 25
@@ -723,12 +752,20 @@ class ContainerSharder(ContainerReplicator):
         self._increment_stat('audit_shard', 'success', statsd=True)
         return True
 
+    def _audit_cleave_contexts(self, broker):
+        now = Timestamp.now()
+        for context, last_mod in CleavingContext.load_all(broker):
+            if Timestamp(last_mod).timestamp + self.reclaim_age < \
+                    now.timestamp:
+                context.delete(broker)
+
     def _audit_container(self, broker):
         if broker.is_deleted():
             # if the container has been marked as deleted, all metadata will
             # have been erased so no point auditing. But we want it to pass, in
             # case any objects exist inside it.
             return True
+        self._audit_cleave_contexts(broker)
         if broker.is_root_container():
             return self._audit_root_container(broker)
         return self._audit_shard_container(broker)
@@ -1306,6 +1343,7 @@ class ContainerSharder(ContainerReplicator):
                 modified_shard_ranges.append(own_shard_range)
             broker.merge_shard_ranges(modified_shard_ranges)
             if broker.set_sharded_state():
+                cleaving_context.delete(broker)
                 return True
             else:
                 self.logger.warning(
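[Reviewer note - not part of the patch] Taken together, the backend and
sharder changes above give each cleaving context a visible last-modified
timestamp and a tombstone-style delete. A minimal sketch of the resulting
cleanup flow; `reap_stale_contexts` is a hypothetical helper, and the
one-week default `reclaim_age` is assumed from the replicator configuration:

    from swift.common.utils import Timestamp
    from swift.container.sharder import CleavingContext

    def reap_stale_contexts(broker, reclaim_age=7 * 24 * 60 * 60):
        # Mirrors ContainerSharder._audit_cleave_contexts() above: blank any
        # context whose sysmeta entry is older than reclaim_age seconds; the
        # emptied value is physically removed later when the broker's
        # metadata reclaim (_reclaim_metadata in common/db.py) runs.
        now = Timestamp.now()
        for context, last_mod in CleavingContext.load_all(broker):
            if Timestamp(last_mod).timestamp + reclaim_age < now.timestamp:
                context.delete(broker)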
diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py
index 0befd604f0..54a87137e0 100644
--- a/test/probe/test_sharder.py
+++ b/test/probe/test_sharder.py
@@ -27,6 +27,7 @@ from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
 from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING
 from swift.common import utils
 from swift.common.manager import Manager
+from swift.container.sharder import CleavingContext
 from swiftclient import client, get_auth, ClientException
 
 from swift.proxy.controllers.obj import num_container_updates
@@ -601,6 +602,8 @@ class TestContainerSharding(BaseTestContainerSharding):
         self.assert_shard_ranges_contiguous(2, orig_root_shard_ranges)
         self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE],
                          [sr['state'] for sr in orig_root_shard_ranges])
+        contexts = list(CleavingContext.load_all(broker))
+        self.assertEqual([], contexts)  # length check
         self.direct_delete_container(expect_failure=True)
 
         self.assertLengthEqual(found['normal_dbs'], 2)
@@ -618,6 +621,9 @@ class TestContainerSharding(BaseTestContainerSharding):
                     orig_root_shard_ranges, shard_ranges,
                     excludes=['meta_timestamp', 'state', 'state_timestamp'])
 
+                contexts = list(CleavingContext.load_all(broker))
+                self.assertEqual([], contexts)  # length check
+
             if run_replicators:
                 Manager(['container-replicator']).once()
                 # replication doesn't change the db file names
@@ -648,6 +654,9 @@ class TestContainerSharding(BaseTestContainerSharding):
                     self.assertGreaterEqual(updated.meta_timestamp,
                                             orig['meta_timestamp'])
 
+                contexts = list(CleavingContext.load_all(broker))
+                self.assertEqual([], contexts)  # length check
+
         # Check that entire listing is available
         headers, actual_listing = self.assert_container_listing(obj_names)
         # ... and check some other container properties
@@ -738,6 +747,16 @@ class TestContainerSharding(BaseTestContainerSharding):
             first_lower=orig_root_shard_ranges[0]['lower'],
             last_upper=orig_root_shard_ranges[0]['upper'])
 
+        contexts = list(CleavingContext.load_all(broker))
+        self.assertEqual(len(contexts), 1)
+        context, _lm = contexts[0]
+        self.assertIs(context.cleaving_done, False)
+        self.assertIs(context.misplaced_done, True)
+        self.assertEqual(context.ranges_done, 2)
+        self.assertEqual(context.ranges_todo, 1)
+        self.assertEqual(context.max_row,
+                         self.max_shard_size * 3 // 2)
+
         # but third replica still has no idea it should be sharding
         self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
         self.assertEqual(
@@ -765,16 +784,36 @@
             ShardRange.SHARDING, broker.get_own_shard_range().state)
         self.assertFalse(broker.get_shard_ranges())
 
+        contexts = list(CleavingContext.load_all(broker))
+        self.assertEqual([], contexts)  # length check
+
         # ...until sub-shard ranges are replicated from another shard replica;
         # there may also be a sub-shard replica missing so run replicators on
         # all nodes to fix that if necessary
         self.brain.servers.start(number=shard_1_nodes[2])
         self.replicators.once()
 
+        # Now that the replicators have all run, third replica sees cleaving
+        # contexts for the first two
+        contexts = list(CleavingContext.load_all(broker))
+        self.assertEqual(len(contexts), 2)
+
         # now run sharder again on third replica
         self.sharders.once(
             number=shard_1_nodes[2],
             additional_args='--partitions=%s' % shard_1_part)
+        sharding_broker = ContainerBroker(found_for_shard['normal_dbs'][2])
+        self.assertEqual('sharding', sharding_broker.get_db_state())
+
+        broker_id = broker.get_info()['id']
+        # Old, unsharded DB doesn't have the context...
+        contexts = list(CleavingContext.load_all(broker))
+        self.assertEqual(len(contexts), 2)
+        self.assertNotIn(broker_id, [ctx[0].ref for ctx in contexts])
+        # ...but the sharding one does
+        contexts = list(CleavingContext.load_all(sharding_broker))
+        self.assertEqual(len(contexts), 3)
+        self.assertIn(broker_id, [ctx[0].ref for ctx in contexts])
 
         # check original first shard range state and sub-shards - all replicas
         # should now be in consistent state
@@ -847,6 +886,8 @@ class TestContainerSharding(BaseTestContainerSharding):
         self.assert_container_listing(['alpha'] + more_obj_names + obj_names)
 
         # Run sharders again so things settle.
         self.run_sharders(shard_1)
+        # Also run replicators to settle cleaving contexts
+        self.replicators.once()
         # check original first shard range shards
         for db_file in found_for_shard['shard_dbs']:
@@ -857,6 +898,11 @@
             self.assertEqual(
                 [ShardRange.ACTIVE] * 3,
                 [sr.state for sr in broker.get_shard_ranges()])
+
+            # Make sure our cleaving contexts got cleaned up
+            contexts = list(CleavingContext.load_all(broker))
+            self.assertEqual([], contexts)
+
         # check root shard ranges
         root_shard_ranges = self.direct_get_container_shard_ranges()
         for node, (hdrs, root_shards) in root_shard_ranges.items():
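[Reviewer note - not part of the patch] The probe assertions above lean on
the shape of CleavingContext.load_all(): one (CleavingContext, timestamp)
pair per Context-<db-id> sysmeta entry, where context.ref records which DB
file the context was created against, so a replica can tell its own context
from replicated ones. A hedged sketch of inspecting a container DB the same
way; `summarize_contexts` and the db_file argument are illustrative only:

    from swift.container.backend import ContainerBroker
    from swift.container.sharder import CleavingContext

    def summarize_contexts(db_file):
        # db_file is a path to a container DB, e.g. one of the
        # found_for_shard['normal_dbs'] entries used in the tests above.
        broker = ContainerBroker(db_file)
        for context, last_mod in CleavingContext.load_all(broker):
            print('%s: ranges %s done, %s todo, last modified %s' % (
                context.ref, context.ranges_done, context.ranges_todo,
                last_mod))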
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index 1f499c4151..e112877525 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -21,6 +21,7 @@ import os
 import shutil
 from contextlib import contextmanager
 from tempfile import mkdtemp
+from uuid import uuid4
 
 import mock
 import unittest
@@ -4486,6 +4487,63 @@ class TestSharder(BaseTestSharder):
             set((call[0][0].path, call[0][1]['id'], call[0][2])
                 for call in mock_process_broker.call_args_list))
 
+    def test_audit_cleave_contexts(self):
+
+        def add_cleave_context(id, last_modified):
+            params = {'ref': id,
+                      'cursor': 'curs',
+                      'max_row': 2,
+                      'cleave_to_row': 2,
+                      'last_cleave_to_row': 1,
+                      'cleaving_done': False,
+                      'misplaced_done': True,
+                      'ranges_done': 2,
+                      'ranges_todo': 4}
+            key = 'X-Container-Sysmeta-Shard-Context-%s' % id
+            with mock_timestamp_now(Timestamp(last_modified)):
+                broker.update_metadata(
+                    {key: (json.dumps(params),
+                           Timestamp(last_modified).internal)})
+
+        def get_context(id, broker):
+            data = broker.get_sharding_sysmeta().get('Context-%s' % id)
+            if data:
+                return CleavingContext(**json.loads(data))
+            return data
+
+        reclaim_age = 100
+        broker = self._make_broker()
+
+        # sanity check
+        self.assertIsNone(broker.get_own_shard_range(no_default=True))
+        self.assertEqual(UNSHARDED, broker.get_db_state())
+
+        # Setup some cleaving contexts
+        id_old, id_newish = [str(uuid4()) for _ in range(2)]
+        contexts = ((id_old, 1),
+                    (id_newish, reclaim_age // 2))
+        for id, last_modified in contexts:
+            add_cleave_context(id, last_modified)
+
+        with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
+            with mock_timestamp_now(Timestamp(reclaim_age + 2)):
+                sharder._audit_cleave_contexts(broker)
+
+        old_ctx = get_context(id_old, broker)
+        self.assertEqual(old_ctx, "")
+
+        newish_ctx = get_context(id_newish, broker)
+        self.assertEqual(newish_ctx.ref, id_newish)
+
+        # If we push time another reclaim_age later, the newish context
+        # will be reclaimed as well
+        with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
+            with mock_timestamp_now(Timestamp(reclaim_age * 2)):
+                sharder._audit_cleave_contexts(broker)
+
+        newish_ctx = get_context(id_newish, broker)
+        self.assertEqual(newish_ctx, "")
+
 
 class TestCleavingContext(BaseTestSharder):
 
     def test_init(self):
@@ -4571,11 +4629,85 @@ class TestCleavingContext(BaseTestSharder):
         self.assertEqual(2, ctx.ranges_done)
         self.assertEqual(4, ctx.ranges_todo)
 
+    def test_load_all(self):
+        broker = self._make_broker()
+        last_ctx = None
+        timestamp = Timestamp.now()
+
+        db_ids = [str(uuid4()) for _ in range(6)]
+        for db_id in db_ids:
+            params = {'ref': db_id,
+                      'cursor': 'curs',
+                      'max_row': 2,
+                      'cleave_to_row': 2,
+                      'last_cleave_to_row': 1,
+                      'cleaving_done': False,
+                      'misplaced_done': True,
+                      'ranges_done': 2,
+                      'ranges_todo': 4}
+            key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
+            broker.update_metadata(
+                {key: (json.dumps(params), timestamp.internal)})
+        first_ctx = None
+        for ctx, lm in CleavingContext.load_all(broker):
+            if not first_ctx:
+                first_ctx = ctx
+            last_ctx = ctx
+            self.assertIn(ctx.ref, db_ids)
+            self.assertEqual(lm, timestamp.internal)
+
+        # If a context is deleted (metadata is "") then it's skipped
+        last_ctx.delete(broker)
+        db_ids.remove(last_ctx.ref)
+
+        # and let's modify the first
+        with mock_timestamp_now() as new_timestamp:
+            first_ctx.store(broker)
+
+        for ctx, lm in CleavingContext.load_all(broker):
+            self.assertIn(ctx.ref, db_ids)
+            if ctx.ref == first_ctx.ref:
+                self.assertEqual(lm, new_timestamp.internal)
+            else:
+                self.assertEqual(lm, timestamp.internal)
+
+    def test_delete(self):
+        broker = self._make_broker()
+
+        db_id = broker.get_info()['id']
+        params = {'ref': db_id,
+                  'cursor': 'curs',
+                  'max_row': 2,
+                  'cleave_to_row': 2,
+                  'last_cleave_to_row': 1,
+                  'cleaving_done': False,
+                  'misplaced_done': True,
+                  'ranges_done': 2,
+                  'ranges_todo': 4}
+        key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
+        broker.update_metadata(
+            {key: (json.dumps(params), Timestamp.now().internal)})
+        ctx = CleavingContext.load(broker)
+        self.assertEqual(db_id, ctx.ref)
+
+        # Now let's delete it. When deleted the metadata key will exist, but
+        # the value will be "" as this means it'll be reaped later.
+        ctx.delete(broker)
+        sysmeta = broker.get_sharding_sysmeta()
+        for key, val in sysmeta.items():
+            if key == "Context-%s" % db_id:
+                self.assertEqual(val, "")
+                break
+        else:
+            self.fail("Deleted context 'Context-%s' not found" % db_id)
+
     def test_store(self):
         broker = self._make_sharding_broker()
         old_db_id = broker.get_brokers()[0].get_info()['id']
+        last_mod = Timestamp.now()
         ctx = CleavingContext(old_db_id, 'curs', 12, 11, 2, True, True, 2, 4)
-        ctx.store(broker)
+        with mock_timestamp_now(last_mod):
+            ctx.store(broker)
         key = 'X-Container-Sysmeta-Shard-Context-%s' % old_db_id
         data = json.loads(broker.metadata[key][0])
         expected = {'ref': old_db_id,
@@ -4588,6 +4720,8 @@
                     'ranges_done': 2,
                     'ranges_todo': 4}
         self.assertEqual(expected, data)
+        # last modified is the metadata timestamp
+        self.assertEqual(broker.metadata[key][1], last_mod.internal)
 
     def test_store_add_row_load(self):
         # adding row to older db changes only max_row in the context
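[Reviewer note - not part of the patch] The unit tests above pin down the
tombstone semantics of CleavingContext.delete(): the sysmeta key survives
with an empty value and a fresh timestamp, and only the DB's metadata
reclaim removes it for good. A small sketch of that lifecycle, assuming an
existing `broker` and a stored context `ctx`:

    ctx.delete(broker)

    # The key is still present, now holding '' with a new timestamp...
    val, ts = broker.get_sharding_sysmeta_with_timestamps()[
        'Context-%s' % ctx.ref]
    assert val == ''

    # ...and get_sharding_sysmeta() keeps reporting the empty value until
    # the broker's reclaim runs (DatabaseBroker.reclaim() invokes
    # _reclaim_metadata in common/db.py), after which the key is gone.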