Sharding: Clean up old CleaveConext's during audit
There is a sharding edge case where more CleaveContext are generated and stored in the sharding container DB. If this number get's high enough, like in the linked bug. If enough CleaveContects build up in the DB then this can lead to the 503's when attempting to list the container due to all the `X-Container-Sysmeta-Shard-Context-*` headers. This patch resolves this by tracking the a CleaveContext's last modified. And during the sharding audit, any context's that hasn't been touched after reclaim_age are deleted. This plus the skip empty ranges patches should improve these handoff shards. Change-Id: I1e502c328be16fca5f1cca2186b27a0545fecc16 Closes-Bug: #1843313
This commit is contained in:
parent
5cb53838a6
commit
81a41da542
@ -197,7 +197,8 @@ def find_shrinking_candidates(broker, shrink_threshold, merge_size):
|
||||
class CleavingContext(object):
|
||||
def __init__(self, ref, cursor='', max_row=None, cleave_to_row=None,
|
||||
last_cleave_to_row=None, cleaving_done=False,
|
||||
misplaced_done=False, ranges_done=0, ranges_todo=0):
|
||||
misplaced_done=False, ranges_done=0, ranges_todo=0,
|
||||
last_modified=None):
|
||||
self.ref = ref
|
||||
self._cursor = None
|
||||
self.cursor = cursor
|
||||
@ -208,6 +209,7 @@ class CleavingContext(object):
|
||||
self.misplaced_done = misplaced_done
|
||||
self.ranges_done = ranges_done
|
||||
self.ranges_todo = ranges_todo
|
||||
self.last_modified = last_modified
|
||||
|
||||
def __iter__(self):
|
||||
yield 'ref', self.ref
|
||||
@ -219,6 +221,7 @@ class CleavingContext(object):
|
||||
yield 'misplaced_done', self.misplaced_done
|
||||
yield 'ranges_done', self.ranges_done
|
||||
yield 'ranges_todo', self.ranges_todo
|
||||
yield 'last_modified', self.last_modified
|
||||
|
||||
def _encode(cls, value):
|
||||
if value is not None and six.PY2 and isinstance(value, six.text_type):
|
||||
@ -241,6 +244,26 @@ class CleavingContext(object):
|
||||
def _make_ref(cls, broker):
|
||||
return broker.get_info()['id']
|
||||
|
||||
@classmethod
|
||||
def load_all(cls, broker):
|
||||
"""
|
||||
Returns all cleaving contexts stored in the broker.
|
||||
|
||||
:param broker:
|
||||
:return: list of CleavingContexts
|
||||
"""
|
||||
brokers = broker.get_brokers()
|
||||
sysmeta = brokers[-1].get_sharding_sysmeta()
|
||||
|
||||
for key, val in sysmeta.items():
|
||||
# If the value is of length 0, then the metadata is
|
||||
# marked for deletion
|
||||
if key.startswith("Context-") and len(val) > 0:
|
||||
try:
|
||||
yield cls(**json.loads(val))
|
||||
except ValueError:
|
||||
continue
|
||||
|
||||
@classmethod
|
||||
def load(cls, broker):
|
||||
"""
|
||||
@ -265,6 +288,7 @@ class CleavingContext(object):
|
||||
return cls(**data)
|
||||
|
||||
def store(self, broker):
|
||||
self.last_modified = Timestamp.now().internal
|
||||
broker.set_sharding_sysmeta('Context-' + self.ref,
|
||||
json.dumps(dict(self)))
|
||||
|
||||
@ -287,6 +311,11 @@ class CleavingContext(object):
|
||||
return all((self.misplaced_done, self.cleaving_done,
|
||||
self.max_row == self.cleave_to_row))
|
||||
|
||||
def delete(self, broker):
|
||||
# These will get reclaimed when `_reclaim_metadata` in
|
||||
# common/db.py is called.
|
||||
broker.set_sharding_sysmeta('Context-' + self.ref, '')
|
||||
|
||||
|
||||
DEFAULT_SHARD_CONTAINER_THRESHOLD = 1000000
|
||||
DEFAULT_SHARD_SHRINK_POINT = 25
|
||||
@ -724,12 +753,23 @@ class ContainerSharder(ContainerReplicator):
|
||||
self._increment_stat('audit_shard', 'success', statsd=True)
|
||||
return True
|
||||
|
||||
def _audit_cleave_contexts(self, broker):
|
||||
for context in CleavingContext.load_all(broker):
|
||||
now = Timestamp.now()
|
||||
last_mod = context.last_modified
|
||||
if not last_mod:
|
||||
context.store(broker)
|
||||
elif Timestamp(last_mod).timestamp + self.reclaim_age < \
|
||||
now.timestamp:
|
||||
context.delete(broker)
|
||||
|
||||
def _audit_container(self, broker):
|
||||
if broker.is_deleted():
|
||||
# if the container has been marked as deleted, all metadata will
|
||||
# have been erased so no point auditing. But we want it to pass, in
|
||||
# case any objects exist inside it.
|
||||
return True
|
||||
self._audit_cleave_contexts(broker)
|
||||
if broker.is_root_container():
|
||||
return self._audit_root_container(broker)
|
||||
return self._audit_shard_container(broker)
|
||||
@ -1307,6 +1347,7 @@ class ContainerSharder(ContainerReplicator):
|
||||
modified_shard_ranges.append(own_shard_range)
|
||||
broker.merge_shard_ranges(modified_shard_ranges)
|
||||
if broker.set_sharded_state():
|
||||
cleaving_context.delete(broker)
|
||||
return True
|
||||
else:
|
||||
self.logger.warning(
|
||||
|
@ -201,6 +201,7 @@ class TestManageShardRanges(unittest.TestCase):
|
||||
' "cleaving_done": false,',
|
||||
' "cursor": "",',
|
||||
' "last_cleave_to_row": null,',
|
||||
' "last_modified": null,',
|
||||
' "max_row": -1,',
|
||||
' "misplaced_done": false,',
|
||||
' "ranges_done": 0,',
|
||||
|
@ -21,6 +21,7 @@ import os
|
||||
import shutil
|
||||
from contextlib import contextmanager
|
||||
from tempfile import mkdtemp
|
||||
from uuid import uuid4
|
||||
|
||||
import mock
|
||||
import unittest
|
||||
@ -4486,6 +4487,83 @@ class TestSharder(BaseTestSharder):
|
||||
set((call[0][0].path, call[0][1]['id'], call[0][2])
|
||||
for call in mock_process_broker.call_args_list))
|
||||
|
||||
def test_audit_cleave_contexts(self):
|
||||
|
||||
def add_cleave_context(id, last_modified):
|
||||
params = {'ref': id,
|
||||
'cursor': 'curs',
|
||||
'max_row': 2,
|
||||
'cleave_to_row': 2,
|
||||
'last_cleave_to_row': 1,
|
||||
'cleaving_done': False,
|
||||
'misplaced_done': True,
|
||||
'ranges_done': 2,
|
||||
'ranges_todo': 4}
|
||||
if last_modified is not None:
|
||||
params['last_modified'] = last_modified
|
||||
key = 'X-Container-Sysmeta-Shard-Context-%s' % id
|
||||
with mock_timestamp_now(Timestamp(0)):
|
||||
broker.update_metadata(
|
||||
{key: (json.dumps(params), Timestamp.now().internal)})
|
||||
|
||||
def get_context(id, broker):
|
||||
data = broker.get_sharding_sysmeta().get('Context-%s' % id)
|
||||
if data:
|
||||
return CleavingContext(**json.loads(data))
|
||||
return data
|
||||
|
||||
reclaim_age = 100
|
||||
broker = self._make_broker()
|
||||
|
||||
# sanity check
|
||||
self.assertIsNone(broker.get_own_shard_range(no_default=True))
|
||||
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||
|
||||
# Setup some cleaving contexts
|
||||
id_missing_lm, id_old, id_newish = [str(uuid4()) for _ in range(3)]
|
||||
contexts = ((id_missing_lm, None),
|
||||
(id_old, 1),
|
||||
(id_newish, reclaim_age // 2))
|
||||
for id, last_modified in contexts:
|
||||
add_cleave_context(id, last_modified)
|
||||
|
||||
with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
|
||||
with mock_timestamp_now(Timestamp(reclaim_age + 2)):
|
||||
sharder._audit_cleave_contexts(broker)
|
||||
|
||||
# now is reclaim_age + 1, so the old should've been removed and the
|
||||
# context with the missing last modified should now be reclaim + 1
|
||||
missing_lm_ctx = get_context(id_missing_lm, broker)
|
||||
self.assertEqual(missing_lm_ctx.last_modified,
|
||||
Timestamp(reclaim_age + 2).internal)
|
||||
|
||||
old_ctx = get_context(id_old, broker)
|
||||
self.assertEqual(old_ctx, "")
|
||||
|
||||
newish_ctx = get_context(id_newish, broker)
|
||||
self.assertEqual(newish_ctx.ref, id_newish)
|
||||
|
||||
# If we push time another reclaim age later, and they all be removed
|
||||
# minus id_missing_lm as it has a later last_modified.
|
||||
with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
|
||||
with mock_timestamp_now(Timestamp(reclaim_age * 2)):
|
||||
sharder._audit_cleave_contexts(broker)
|
||||
|
||||
missing_lm_ctx = get_context(id_missing_lm, broker)
|
||||
self.assertEqual(missing_lm_ctx.last_modified,
|
||||
Timestamp(reclaim_age + 2).internal)
|
||||
|
||||
newish_ctx = get_context(id_newish, broker)
|
||||
self.assertEqual(newish_ctx, "")
|
||||
|
||||
# Fast forward again and they're all cleaned up
|
||||
with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
|
||||
with mock_timestamp_now(Timestamp(reclaim_age * 3)):
|
||||
sharder._audit_cleave_contexts(broker)
|
||||
|
||||
missing_lm_ctx = get_context(id_missing_lm, broker)
|
||||
self.assertEqual(missing_lm_ctx, "")
|
||||
|
||||
|
||||
class TestCleavingContext(BaseTestSharder):
|
||||
def test_init(self):
|
||||
@ -4505,6 +4583,7 @@ class TestCleavingContext(BaseTestSharder):
|
||||
'max_row': 12,
|
||||
'cleave_to_row': 11,
|
||||
'last_cleave_to_row': 10,
|
||||
'last_modified': None,
|
||||
'cleaving_done': False,
|
||||
'misplaced_done': True,
|
||||
'ranges_done': 0,
|
||||
@ -4524,6 +4603,7 @@ class TestCleavingContext(BaseTestSharder):
|
||||
'max_row': 12,
|
||||
'cleave_to_row': 11,
|
||||
'last_cleave_to_row': 10,
|
||||
'last_modified': None,
|
||||
'cleaving_done': False,
|
||||
'misplaced_done': True,
|
||||
'ranges_done': 0,
|
||||
@ -4571,11 +4651,102 @@ class TestCleavingContext(BaseTestSharder):
|
||||
self.assertEqual(2, ctx.ranges_done)
|
||||
self.assertEqual(4, ctx.ranges_todo)
|
||||
|
||||
def test_load_all(self):
|
||||
broker = self._make_broker()
|
||||
last_ctx = None
|
||||
|
||||
db_ids = [str(uuid4()) for _ in range(6)]
|
||||
for db_id in db_ids:
|
||||
params = {'ref': db_id,
|
||||
'cursor': 'curs',
|
||||
'max_row': 2,
|
||||
'cleave_to_row': 2,
|
||||
'last_cleave_to_row': 1,
|
||||
'cleaving_done': False,
|
||||
'misplaced_done': True,
|
||||
'ranges_done': 2,
|
||||
'ranges_todo': 4}
|
||||
key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
|
||||
broker.update_metadata(
|
||||
{key: (json.dumps(params), Timestamp.now().internal)})
|
||||
for ctx in CleavingContext.load_all(broker):
|
||||
last_ctx = ctx
|
||||
self.assertIn(ctx.ref, db_ids)
|
||||
|
||||
# If a context is deleted (metadata is "") then it's skipped
|
||||
last_ctx.delete(broker)
|
||||
db_ids.remove(last_ctx.ref)
|
||||
|
||||
for ctx in CleavingContext.load_all(broker):
|
||||
self.assertIn(ctx.ref, db_ids)
|
||||
|
||||
def test_delete(self):
|
||||
broker = self._make_broker()
|
||||
|
||||
db_id = broker.get_info()['id']
|
||||
params = {'ref': db_id,
|
||||
'cursor': 'curs',
|
||||
'max_row': 2,
|
||||
'cleave_to_row': 2,
|
||||
'last_cleave_to_row': 1,
|
||||
'cleaving_done': False,
|
||||
'misplaced_done': True,
|
||||
'ranges_done': 2,
|
||||
'ranges_todo': 4}
|
||||
key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
|
||||
broker.update_metadata(
|
||||
{key: (json.dumps(params), Timestamp.now().internal)})
|
||||
ctx = CleavingContext.load(broker)
|
||||
self.assertEqual(db_id, ctx.ref)
|
||||
|
||||
# Now let's delete it. When deleted the metadata key will exist, but
|
||||
# the value will be "" as this means it'll be reaped later.
|
||||
ctx.delete(broker)
|
||||
sysmeta = broker.get_sharding_sysmeta()
|
||||
for key, val in sysmeta.items():
|
||||
if key == "Context-%s" % db_id:
|
||||
self.assertEqual(val, "")
|
||||
break
|
||||
else:
|
||||
self.fail("Deleted context 'Context-%s' not found")
|
||||
|
||||
def test_last_modified(self):
|
||||
broker = self._make_broker()
|
||||
|
||||
db_id = broker.get_info()['id']
|
||||
params = {'ref': db_id,
|
||||
'cursor': 'curs',
|
||||
'max_row': 2,
|
||||
'cleave_to_row': 2,
|
||||
'last_cleave_to_row': 1,
|
||||
'cleaving_done': False,
|
||||
'misplaced_done': True,
|
||||
'ranges_done': 2,
|
||||
'ranges_todo': 4}
|
||||
key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
|
||||
broker.update_metadata(
|
||||
{key: (json.dumps(params), Timestamp.now().internal)})
|
||||
ctx = CleavingContext.load(broker)
|
||||
self.assertIsNone(ctx.last_modified)
|
||||
|
||||
# after a store/save the last_modified will be updated
|
||||
ctx.store(broker)
|
||||
ctx = CleavingContext.load(broker)
|
||||
self.assertIsNotNone(ctx.last_modified)
|
||||
last_modified = ctx.last_modified
|
||||
|
||||
# Store again it'll be updated again
|
||||
ctx.store(broker)
|
||||
ctx = CleavingContext.load(broker)
|
||||
self.assertGreater(ctx.last_modified, last_modified)
|
||||
|
||||
def test_store(self):
|
||||
broker = self._make_sharding_broker()
|
||||
old_db_id = broker.get_brokers()[0].get_info()['id']
|
||||
last_mod = Timestamp.now()
|
||||
ctx = CleavingContext(old_db_id, 'curs', 12, 11, 2, True, True, 2, 4)
|
||||
ctx.store(broker)
|
||||
with mock_timestamp_now(last_mod):
|
||||
ctx.store(broker)
|
||||
key = 'X-Container-Sysmeta-Shard-Context-%s' % old_db_id
|
||||
data = json.loads(broker.metadata[key][0])
|
||||
expected = {'ref': old_db_id,
|
||||
@ -4583,6 +4754,7 @@ class TestCleavingContext(BaseTestSharder):
|
||||
'max_row': 12,
|
||||
'cleave_to_row': 11,
|
||||
'last_cleave_to_row': 2,
|
||||
'last_modified': last_mod.internal,
|
||||
'cleaving_done': True,
|
||||
'misplaced_done': True,
|
||||
'ranges_done': 2,
|
||||
|
Loading…
Reference in New Issue
Block a user