Merge "Sharding: Clean up old CleaveContexts during audit" into stable/stein
This commit is contained in:
commit
12fd239274
|
@ -1996,6 +1996,22 @@ class ContainerBroker(DatabaseBroker):
|
|||
self.update_metadata({'X-Container-Sysmeta-Shard-' + key:
|
||||
(value, Timestamp.now().internal)})
|
||||
|
||||
def get_sharding_sysmeta_with_timestamps(self):
    """
    Returns sharding specific info from the broker's metadata with
    timestamps.

    :return: a dict mapping each 'X-Container-Sysmeta-Shard-'-prefixed
        metadata key (with the prefix stripped) to its stored
        (value, timestamp) pair.
    """
    prefix = 'X-Container-Sysmeta-Shard-'
    return {
        k[len(prefix):]: v
        for k, v in self.metadata.items()
        if k.startswith(prefix)
    }
|
||||
|
||||
def get_sharding_sysmeta(self, key=None):
    """
    Returns sharding specific info from the broker's metadata.

    :param key: if given the value stored under ``key`` in the sharding
        info will be returned.
    :return: either a dict of sharding info or the value stored under
        ``key`` in that dict.
    """
    # Delegate to the timestamped variant so there is a single place
    # that knows about the sysmeta prefix; strip the timestamps here.
    info = self.get_sharding_sysmeta_with_timestamps()
    if key:
        # metadata values are (value, timestamp) pairs; return only the
        # value, or None when the key is absent
        return info.get(key, (None, None))[0]
    else:
        return {k: v[0] for k, v in info.items()}
|
||||
|
||||
def _load_root_info(self):
|
||||
"""
|
||||
|
|
|
@ -220,6 +220,10 @@ class CleavingContext(object):
|
|||
yield 'ranges_done', self.ranges_done
|
||||
yield 'ranges_todo', self.ranges_todo
|
||||
|
||||
def __repr__(self):
    """Return a debug representation listing every (name, value) prop."""
    props = ', '.join('%s=%r' % (name, value) for name, value in self)
    return '%s(%s)' % (type(self).__name__, props)
|
||||
|
||||
def _encode(cls, value):
|
||||
if value is not None and six.PY2 and isinstance(value, six.text_type):
|
||||
return value.encode('utf-8')
|
||||
|
@ -241,6 +245,26 @@ class CleavingContext(object):
|
|||
def _make_ref(cls, broker):
    """Derive this context's reference string from the broker's db id."""
    info = broker.get_info()
    return info['id']
|
||||
|
||||
@classmethod
def load_all(cls, broker):
    """
    Returns all cleaving contexts stored in the broker.

    :param broker: the container broker whose most recent db's sharding
        sysmeta is scanned for stored contexts
    :return: a generator yielding tuples of
        (:class:`CleavingContext`, timestamp)
    """
    brokers = broker.get_brokers()
    sysmeta = brokers[-1].get_sharding_sysmeta_with_timestamps()

    for key, (val, timestamp) in sysmeta.items():
        # If the value is of length 0, then the metadata is
        # marked for deletion
        if key.startswith("Context-") and len(val) > 0:
            try:
                yield cls(**json.loads(val)), timestamp
            except ValueError:
                # skip contexts whose stored JSON is corrupt rather
                # than failing the whole scan
                continue
|
||||
|
||||
@classmethod
|
||||
def load(cls, broker):
|
||||
"""
|
||||
|
@ -287,6 +311,11 @@ class CleavingContext(object):
|
|||
return all((self.misplaced_done, self.cleaving_done,
|
||||
self.max_row == self.cleave_to_row))
|
||||
|
||||
def delete(self, broker):
    """
    Mark this context's sysmeta entry for deletion by blanking its value.

    The emptied entry is reclaimed later when `_reclaim_metadata` in
    common/db.py is called.
    """
    key = 'Context-%s' % self.ref
    broker.set_sharding_sysmeta(key, '')
|
||||
|
||||
|
||||
DEFAULT_SHARD_CONTAINER_THRESHOLD = 1000000
|
||||
DEFAULT_SHARD_SHRINK_POINT = 25
|
||||
|
@ -723,12 +752,20 @@ class ContainerSharder(ContainerReplicator):
|
|||
self._increment_stat('audit_shard', 'success', statsd=True)
|
||||
return True
|
||||
|
||||
def _audit_cleave_contexts(self, broker):
    """Delete any stored cleaving contexts older than reclaim_age."""
    now = Timestamp.now().timestamp
    for context, last_mod in CleavingContext.load_all(broker):
        expired = Timestamp(last_mod).timestamp + self.reclaim_age < now
        if expired:
            context.delete(broker)
|
||||
|
||||
def _audit_container(self, broker):
    """Audit a container broker; returns True when the audit passes."""
    # A deleted container has had all its metadata erased, so there is
    # nothing left to audit; report success in case objects still exist
    # inside it.
    if broker.is_deleted():
        return True
    self._audit_cleave_contexts(broker)
    if broker.is_root_container():
        return self._audit_root_container(broker)
    else:
        return self._audit_shard_container(broker)
|
||||
|
@ -1306,6 +1343,7 @@ class ContainerSharder(ContainerReplicator):
|
|||
modified_shard_ranges.append(own_shard_range)
|
||||
broker.merge_shard_ranges(modified_shard_ranges)
|
||||
if broker.set_sharded_state():
|
||||
cleaving_context.delete(broker)
|
||||
return True
|
||||
else:
|
||||
self.logger.warning(
|
||||
|
|
|
@ -27,6 +27,7 @@ from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
|
|||
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING
|
||||
from swift.common import utils
|
||||
from swift.common.manager import Manager
|
||||
from swift.container.sharder import CleavingContext
|
||||
from swiftclient import client, get_auth, ClientException
|
||||
|
||||
from swift.proxy.controllers.obj import num_container_updates
|
||||
|
@ -601,6 +602,8 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||
self.assert_shard_ranges_contiguous(2, orig_root_shard_ranges)
|
||||
self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE],
|
||||
[sr['state'] for sr in orig_root_shard_ranges])
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual([], contexts) # length check
|
||||
self.direct_delete_container(expect_failure=True)
|
||||
|
||||
self.assertLengthEqual(found['normal_dbs'], 2)
|
||||
|
@ -618,6 +621,9 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||
orig_root_shard_ranges, shard_ranges,
|
||||
excludes=['meta_timestamp', 'state', 'state_timestamp'])
|
||||
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual([], contexts) # length check
|
||||
|
||||
if run_replicators:
|
||||
Manager(['container-replicator']).once()
|
||||
# replication doesn't change the db file names
|
||||
|
@ -648,6 +654,9 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||
self.assertGreaterEqual(updated.meta_timestamp,
|
||||
orig['meta_timestamp'])
|
||||
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual([], contexts) # length check
|
||||
|
||||
# Check that entire listing is available
|
||||
headers, actual_listing = self.assert_container_listing(obj_names)
|
||||
# ... and check some other container properties
|
||||
|
@ -738,6 +747,16 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||
first_lower=orig_root_shard_ranges[0]['lower'],
|
||||
last_upper=orig_root_shard_ranges[0]['upper'])
|
||||
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual(len(contexts), 1)
|
||||
context, _lm = contexts[0]
|
||||
self.assertIs(context.cleaving_done, False)
|
||||
self.assertIs(context.misplaced_done, True)
|
||||
self.assertEqual(context.ranges_done, 2)
|
||||
self.assertEqual(context.ranges_todo, 1)
|
||||
self.assertEqual(context.max_row,
|
||||
self.max_shard_size * 3 // 2)
|
||||
|
||||
# but third replica still has no idea it should be sharding
|
||||
self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
|
||||
self.assertEqual(
|
||||
|
@ -765,16 +784,36 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||
ShardRange.SHARDING, broker.get_own_shard_range().state)
|
||||
self.assertFalse(broker.get_shard_ranges())
|
||||
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual([], contexts) # length check
|
||||
|
||||
# ...until sub-shard ranges are replicated from another shard replica;
|
||||
# there may also be a sub-shard replica missing so run replicators on
|
||||
# all nodes to fix that if necessary
|
||||
self.brain.servers.start(number=shard_1_nodes[2])
|
||||
self.replicators.once()
|
||||
|
||||
# Now that the replicators have all run, third replica sees cleaving
|
||||
# contexts for the first two
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual(len(contexts), 2)
|
||||
|
||||
# now run sharder again on third replica
|
||||
self.sharders.once(
|
||||
number=shard_1_nodes[2],
|
||||
additional_args='--partitions=%s' % shard_1_part)
|
||||
sharding_broker = ContainerBroker(found_for_shard['normal_dbs'][2])
|
||||
self.assertEqual('sharding', sharding_broker.get_db_state())
|
||||
|
||||
broker_id = broker.get_info()['id']
|
||||
# Old, unsharded DB doesn't have the context...
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual(len(contexts), 2)
|
||||
self.assertNotIn(broker_id, [ctx[0].ref for ctx in contexts])
|
||||
# ...but the sharding one does
|
||||
contexts = list(CleavingContext.load_all(sharding_broker))
|
||||
self.assertEqual(len(contexts), 3)
|
||||
self.assertIn(broker_id, [ctx[0].ref for ctx in contexts])
|
||||
|
||||
# check original first shard range state and sub-shards - all replicas
|
||||
# should now be in consistent state
|
||||
|
@ -847,6 +886,8 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||
self.assert_container_listing(['alpha'] + more_obj_names + obj_names)
|
||||
# Run sharders again so things settle.
|
||||
self.run_sharders(shard_1)
|
||||
# Also run replicators to settle cleaving contexts
|
||||
self.replicators.once()
|
||||
|
||||
# check original first shard range shards
|
||||
for db_file in found_for_shard['shard_dbs']:
|
||||
|
@ -857,6 +898,11 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||
self.assertEqual(
|
||||
[ShardRange.ACTIVE] * 3,
|
||||
[sr.state for sr in broker.get_shard_ranges()])
|
||||
|
||||
# Make sure our cleaving contexts got cleaned up
|
||||
contexts = list(CleavingContext.load_all(broker))
|
||||
self.assertEqual([], contexts)
|
||||
|
||||
# check root shard ranges
|
||||
root_shard_ranges = self.direct_get_container_shard_ranges()
|
||||
for node, (hdrs, root_shards) in root_shard_ranges.items():
|
||||
|
|
|
@ -21,6 +21,7 @@ import os
|
|||
import shutil
|
||||
from contextlib import contextmanager
|
||||
from tempfile import mkdtemp
|
||||
from uuid import uuid4
|
||||
|
||||
import mock
|
||||
import unittest
|
||||
|
@ -4486,6 +4487,63 @@ class TestSharder(BaseTestSharder):
|
|||
set((call[0][0].path, call[0][1]['id'], call[0][2])
|
||||
for call in mock_process_broker.call_args_list))
|
||||
|
||||
def test_audit_cleave_contexts(self):
    # Exercises ContainerSharder._audit_cleave_contexts: contexts whose
    # metadata timestamp is older than reclaim_age must be deleted
    # (value blanked), newer ones must survive.

    def add_cleave_context(id, last_modified):
        # store a well-formed context under the sharding sysmeta key
        # with a controlled last-modified timestamp
        params = {'ref': id,
                  'cursor': 'curs',
                  'max_row': 2,
                  'cleave_to_row': 2,
                  'last_cleave_to_row': 1,
                  'cleaving_done': False,
                  'misplaced_done': True,
                  'ranges_done': 2,
                  'ranges_todo': 4}
        key = 'X-Container-Sysmeta-Shard-Context-%s' % id
        with mock_timestamp_now(Timestamp(last_modified)):
            broker.update_metadata(
                {key: (json.dumps(params),
                       Timestamp(last_modified).internal)})

    def get_context(id, broker):
        # returns the stored CleavingContext, or "" once it has been
        # marked deleted (blank value)
        data = broker.get_sharding_sysmeta().get('Context-%s' % id)
        if data:
            return CleavingContext(**json.loads(data))
        return data

    reclaim_age = 100
    broker = self._make_broker()

    # sanity check
    self.assertIsNone(broker.get_own_shard_range(no_default=True))
    self.assertEqual(UNSHARDED, broker.get_db_state())

    # Setup some cleaving contexts
    id_old, id_newish = [str(uuid4()) for _ in range(2)]
    contexts = ((id_old, 1),
                (id_newish, reclaim_age // 2))
    for id, last_modified in contexts:
        add_cleave_context(id, last_modified)

    with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
        with mock_timestamp_now(Timestamp(reclaim_age + 2)):
            sharder._audit_cleave_contexts(broker)

    old_ctx = get_context(id_old, broker)
    self.assertEqual(old_ctx, "")

    newish_ctx = get_context(id_newish, broker)
    self.assertEqual(newish_ctx.ref, id_newish)

    # If we push time another reclaim age later, the remaining (newish)
    # context also ages past reclaim_age and is removed.
    with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
        with mock_timestamp_now(Timestamp(reclaim_age * 2)):
            sharder._audit_cleave_contexts(broker)

    newish_ctx = get_context(id_newish, broker)
    self.assertEqual(newish_ctx, "")
|
||||
|
||||
|
||||
class TestCleavingContext(BaseTestSharder):
|
||||
def test_init(self):
|
||||
|
@ -4571,11 +4629,85 @@ class TestCleavingContext(BaseTestSharder):
|
|||
self.assertEqual(2, ctx.ranges_done)
|
||||
self.assertEqual(4, ctx.ranges_todo)
|
||||
|
||||
def test_load_all(self):
    """CleavingContext.load_all yields every stored (context, timestamp),
    skips deleted (blank) entries, and reports updated timestamps."""
    broker = self._make_broker()
    last_ctx = None
    timestamp = Timestamp.now()

    # store six distinct contexts, all with the same timestamp
    db_ids = [str(uuid4()) for _ in range(6)]
    for db_id in db_ids:
        params = {'ref': db_id,
                  'cursor': 'curs',
                  'max_row': 2,
                  'cleave_to_row': 2,
                  'last_cleave_to_row': 1,
                  'cleaving_done': False,
                  'misplaced_done': True,
                  'ranges_done': 2,
                  'ranges_todo': 4}
        key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
        broker.update_metadata(
            {key: (json.dumps(params), timestamp.internal)})
    first_ctx = None
    for ctx, lm in CleavingContext.load_all(broker):
        if not first_ctx:
            first_ctx = ctx
        last_ctx = ctx
        self.assertIn(ctx.ref, db_ids)
        self.assertEqual(lm, timestamp.internal)

    # If a context is deleted (metadata is "") then it's skipped
    last_ctx.delete(broker)
    db_ids.remove(last_ctx.ref)

    # and let's modify the first
    with mock_timestamp_now() as new_timestamp:
        first_ctx.store(broker)

    for ctx, lm in CleavingContext.load_all(broker):
        self.assertIn(ctx.ref, db_ids)
        if ctx.ref == first_ctx.ref:
            # re-stored context must carry the newer timestamp
            self.assertEqual(lm, new_timestamp.internal)
        else:
            self.assertEqual(lm, timestamp.internal)
|
||||
|
||||
def test_delete(self):
    """CleavingContext.delete blanks (rather than removes) the sysmeta
    entry so that it can be reaped later by metadata reclamation."""
    broker = self._make_broker()

    db_id = broker.get_info()['id']
    params = {'ref': db_id,
              'cursor': 'curs',
              'max_row': 2,
              'cleave_to_row': 2,
              'last_cleave_to_row': 1,
              'cleaving_done': False,
              'misplaced_done': True,
              'ranges_done': 2,
              'ranges_todo': 4}
    key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
    broker.update_metadata(
        {key: (json.dumps(params), Timestamp.now().internal)})
    ctx = CleavingContext.load(broker)
    self.assertEqual(db_id, ctx.ref)

    # Now let's delete it. When deleted the metadata key will exist, but
    # the value will be "" as this means it'll be reaped later.
    ctx.delete(broker)
    sysmeta = broker.get_sharding_sysmeta()
    for key, val in sysmeta.items():
        if key == "Context-%s" % db_id:
            self.assertEqual(val, "")
            break
    else:
        # bug fix: original message lacked the % argument, so the
        # failure would print a literal '%s'
        self.fail("Deleted context 'Context-%s' not found" % db_id)
|
||||
|
||||
def test_store(self):
|
||||
broker = self._make_sharding_broker()
|
||||
old_db_id = broker.get_brokers()[0].get_info()['id']
|
||||
last_mod = Timestamp.now()
|
||||
ctx = CleavingContext(old_db_id, 'curs', 12, 11, 2, True, True, 2, 4)
|
||||
ctx.store(broker)
|
||||
with mock_timestamp_now(last_mod):
|
||||
ctx.store(broker)
|
||||
key = 'X-Container-Sysmeta-Shard-Context-%s' % old_db_id
|
||||
data = json.loads(broker.metadata[key][0])
|
||||
expected = {'ref': old_db_id,
|
||||
|
@ -4588,6 +4720,8 @@ class TestCleavingContext(BaseTestSharder):
|
|||
'ranges_done': 2,
|
||||
'ranges_todo': 4}
|
||||
self.assertEqual(expected, data)
|
||||
# last modified is the metadata timestamp
|
||||
self.assertEqual(broker.metadata[key][1], last_mod.internal)
|
||||
|
||||
def test_store_add_row_load(self):
|
||||
# adding row to older db changes only max_row in the context
|
||||
|
|
Loading…
Reference in New Issue