Sharding: Clean up old CleaveConext's during audit

This is a combination of 4 commits:

==========

Sharding: Clean up old CleaveConext's during audit

There is a sharding edge case where more CleaveContext are generated and
stored in the sharding container DB. If this number get's high enough,
like in the linked bug. If enough CleaveContects build up in the DB then
this can lead to the 503's when attempting to list the container due to
all the `X-Container-Sysmeta-Shard-Context-*` headers.

This patch resolves this by tracking the a CleaveContext's last
modified. And during the sharding audit, any context's that hasn't been
touched after reclaim_age are deleted.

This plus the skip empty ranges patches should improve these handoff
shards.

(cherry picked from commit 81a41da542)

----------

Sharding: Use the metadata timestamp as last_modified

This is a follow up patch from the cleaning up cleave context's patch
(patch 681970). Instead of tracking a last_modified timestamp, and storing
it in the context metadata, use the timestamp we use when storing any
metadata.

Reducing duplication is nice, but there's a more significant reason to
do this: affected container DBs can start getting cleaned up as soon as
they're running the new code rather than needing to wait for an
additional reclaim_age.

(cherry picked from commit 370ac4cd70)

----------

Make sharding methods with only one job

(cherry picked from commit f56071e573)

----------

sharding: Update probe test to verify CleavingContext cleanup

(cherry picked from commit 9495bc0003)

==========

Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com>
Co-Authored-By: Tim Burke <tim.burke@gmail.com>
Change-Id: I1e502c328be16fca5f1cca2186b27a0545fecc16
Closes-Bug: #1843313
This commit is contained in:
Matthew Oliver 2019-09-13 16:16:06 +10:00 committed by Tim Burke
parent 3d5f7aa41d
commit 6e9e922cd6
4 changed files with 239 additions and 7 deletions

View File

@ -1996,6 +1996,22 @@ class ContainerBroker(DatabaseBroker):
self.update_metadata({'X-Container-Sysmeta-Shard-' + key:
(value, Timestamp.now().internal)})
def get_sharding_sysmeta_with_timestamps(self):
"""
Returns sharding specific info from the broker's metadata with
timestamps.
:param key: if given the value stored under ``key`` in the sharding
info will be returned.
:return: a dict of sharding info with their timestamps.
"""
prefix = 'X-Container-Sysmeta-Shard-'
return {
k[len(prefix):]: v
for k, v in self.metadata.items()
if k.startswith(prefix)
}
def get_sharding_sysmeta(self, key=None):
"""
Returns sharding specific info from the broker's metadata.
@ -2005,13 +2021,11 @@ class ContainerBroker(DatabaseBroker):
:return: either a dict of sharding info or the value stored under
``key`` in that dict.
"""
prefix = 'X-Container-Sysmeta-Shard-'
metadata = self.metadata
info = dict((k[len(prefix):], v[0]) for
k, v in metadata.items() if k.startswith(prefix))
info = self.get_sharding_sysmeta_with_timestamps()
if key:
return info.get(key)
return info
return info.get(key, (None, None))[0]
else:
return {k: v[0] for k, v in info.items()}
def _load_root_info(self):
"""

View File

@ -220,6 +220,10 @@ class CleavingContext(object):
yield 'ranges_done', self.ranges_done
yield 'ranges_todo', self.ranges_todo
def __repr__(self):
return '%s(%s)' % (self.__class__.__name__, ', '.join(
'%s=%r' % prop for prop in self))
def _encode(cls, value):
if value is not None and six.PY2 and isinstance(value, six.text_type):
return value.encode('utf-8')
@ -241,6 +245,26 @@ class CleavingContext(object):
def _make_ref(cls, broker):
return broker.get_info()['id']
@classmethod
def load_all(cls, broker):
"""
Returns all cleaving contexts stored in the broker.
:param broker:
:return: list of tuples of (CleavingContext, timestamp)
"""
brokers = broker.get_brokers()
sysmeta = brokers[-1].get_sharding_sysmeta_with_timestamps()
for key, (val, timestamp) in sysmeta.items():
# If the value is of length 0, then the metadata is
# marked for deletion
if key.startswith("Context-") and len(val) > 0:
try:
yield cls(**json.loads(val)), timestamp
except ValueError:
continue
@classmethod
def load(cls, broker):
"""
@ -287,6 +311,11 @@ class CleavingContext(object):
return all((self.misplaced_done, self.cleaving_done,
self.max_row == self.cleave_to_row))
def delete(self, broker):
# These will get reclaimed when `_reclaim_metadata` in
# common/db.py is called.
broker.set_sharding_sysmeta('Context-' + self.ref, '')
DEFAULT_SHARD_CONTAINER_THRESHOLD = 1000000
DEFAULT_SHARD_SHRINK_POINT = 25
@ -723,12 +752,20 @@ class ContainerSharder(ContainerReplicator):
self._increment_stat('audit_shard', 'success', statsd=True)
return True
def _audit_cleave_contexts(self, broker):
now = Timestamp.now()
for context, last_mod in CleavingContext.load_all(broker):
if Timestamp(last_mod).timestamp + self.reclaim_age < \
now.timestamp:
context.delete(broker)
def _audit_container(self, broker):
if broker.is_deleted():
# if the container has been marked as deleted, all metadata will
# have been erased so no point auditing. But we want it to pass, in
# case any objects exist inside it.
return True
self._audit_cleave_contexts(broker)
if broker.is_root_container():
return self._audit_root_container(broker)
return self._audit_shard_container(broker)
@ -1306,6 +1343,7 @@ class ContainerSharder(ContainerReplicator):
modified_shard_ranges.append(own_shard_range)
broker.merge_shard_ranges(modified_shard_ranges)
if broker.set_sharded_state():
cleaving_context.delete(broker)
return True
else:
self.logger.warning(

View File

@ -27,6 +27,7 @@ from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING
from swift.common import utils
from swift.common.manager import Manager
from swift.container.sharder import CleavingContext
from swiftclient import client, get_auth, ClientException
from swift.proxy.controllers.obj import num_container_updates
@ -601,6 +602,8 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assert_shard_ranges_contiguous(2, orig_root_shard_ranges)
self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE],
[sr['state'] for sr in orig_root_shard_ranges])
contexts = list(CleavingContext.load_all(broker))
self.assertEqual([], contexts) # length check
self.direct_delete_container(expect_failure=True)
self.assertLengthEqual(found['normal_dbs'], 2)
@ -618,6 +621,9 @@ class TestContainerSharding(BaseTestContainerSharding):
orig_root_shard_ranges, shard_ranges,
excludes=['meta_timestamp', 'state', 'state_timestamp'])
contexts = list(CleavingContext.load_all(broker))
self.assertEqual([], contexts) # length check
if run_replicators:
Manager(['container-replicator']).once()
# replication doesn't change the db file names
@ -648,6 +654,9 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assertGreaterEqual(updated.meta_timestamp,
orig['meta_timestamp'])
contexts = list(CleavingContext.load_all(broker))
self.assertEqual([], contexts) # length check
# Check that entire listing is available
headers, actual_listing = self.assert_container_listing(obj_names)
# ... and check some other container properties
@ -738,6 +747,16 @@ class TestContainerSharding(BaseTestContainerSharding):
first_lower=orig_root_shard_ranges[0]['lower'],
last_upper=orig_root_shard_ranges[0]['upper'])
contexts = list(CleavingContext.load_all(broker))
self.assertEqual(len(contexts), 1)
context, _lm = contexts[0]
self.assertIs(context.cleaving_done, False)
self.assertIs(context.misplaced_done, True)
self.assertEqual(context.ranges_done, 2)
self.assertEqual(context.ranges_todo, 1)
self.assertEqual(context.max_row,
self.max_shard_size * 3 // 2)
# but third replica still has no idea it should be sharding
self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
self.assertEqual(
@ -765,16 +784,36 @@ class TestContainerSharding(BaseTestContainerSharding):
ShardRange.SHARDING, broker.get_own_shard_range().state)
self.assertFalse(broker.get_shard_ranges())
contexts = list(CleavingContext.load_all(broker))
self.assertEqual([], contexts) # length check
# ...until sub-shard ranges are replicated from another shard replica;
# there may also be a sub-shard replica missing so run replicators on
# all nodes to fix that if necessary
self.brain.servers.start(number=shard_1_nodes[2])
self.replicators.once()
# Now that the replicators have all run, third replica sees cleaving
# contexts for the first two
contexts = list(CleavingContext.load_all(broker))
self.assertEqual(len(contexts), 2)
# now run sharder again on third replica
self.sharders.once(
number=shard_1_nodes[2],
additional_args='--partitions=%s' % shard_1_part)
sharding_broker = ContainerBroker(found_for_shard['normal_dbs'][2])
self.assertEqual('sharding', sharding_broker.get_db_state())
broker_id = broker.get_info()['id']
# Old, unsharded DB doesn't have the context...
contexts = list(CleavingContext.load_all(broker))
self.assertEqual(len(contexts), 2)
self.assertNotIn(broker_id, [ctx[0].ref for ctx in contexts])
# ...but the sharding one does
contexts = list(CleavingContext.load_all(sharding_broker))
self.assertEqual(len(contexts), 3)
self.assertIn(broker_id, [ctx[0].ref for ctx in contexts])
# check original first shard range state and sub-shards - all replicas
# should now be in consistent state
@ -847,6 +886,8 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assert_container_listing(['alpha'] + more_obj_names + obj_names)
# Run sharders again so things settle.
self.run_sharders(shard_1)
# Also run replicators to settle cleaving contexts
self.replicators.once()
# check original first shard range shards
for db_file in found_for_shard['shard_dbs']:
@ -857,6 +898,11 @@ class TestContainerSharding(BaseTestContainerSharding):
self.assertEqual(
[ShardRange.ACTIVE] * 3,
[sr.state for sr in broker.get_shard_ranges()])
# Make sure our cleaving contexts got cleaned up
contexts = list(CleavingContext.load_all(broker))
self.assertEqual([], contexts)
# check root shard ranges
root_shard_ranges = self.direct_get_container_shard_ranges()
for node, (hdrs, root_shards) in root_shard_ranges.items():

View File

@ -21,6 +21,7 @@ import os
import shutil
from contextlib import contextmanager
from tempfile import mkdtemp
from uuid import uuid4
import mock
import unittest
@ -4486,6 +4487,63 @@ class TestSharder(BaseTestSharder):
set((call[0][0].path, call[0][1]['id'], call[0][2])
for call in mock_process_broker.call_args_list))
def test_audit_cleave_contexts(self):
def add_cleave_context(id, last_modified):
params = {'ref': id,
'cursor': 'curs',
'max_row': 2,
'cleave_to_row': 2,
'last_cleave_to_row': 1,
'cleaving_done': False,
'misplaced_done': True,
'ranges_done': 2,
'ranges_todo': 4}
key = 'X-Container-Sysmeta-Shard-Context-%s' % id
with mock_timestamp_now(Timestamp(last_modified)):
broker.update_metadata(
{key: (json.dumps(params),
Timestamp(last_modified).internal)})
def get_context(id, broker):
data = broker.get_sharding_sysmeta().get('Context-%s' % id)
if data:
return CleavingContext(**json.loads(data))
return data
reclaim_age = 100
broker = self._make_broker()
# sanity check
self.assertIsNone(broker.get_own_shard_range(no_default=True))
self.assertEqual(UNSHARDED, broker.get_db_state())
# Setup some cleaving contexts
id_old, id_newish = [str(uuid4()) for _ in range(2)]
contexts = ((id_old, 1),
(id_newish, reclaim_age // 2))
for id, last_modified in contexts:
add_cleave_context(id, last_modified)
with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
with mock_timestamp_now(Timestamp(reclaim_age + 2)):
sharder._audit_cleave_contexts(broker)
old_ctx = get_context(id_old, broker)
self.assertEqual(old_ctx, "")
newish_ctx = get_context(id_newish, broker)
self.assertEqual(newish_ctx.ref, id_newish)
# If we push time another reclaim age later, and they all be removed
# minus id_missing_lm as it has a later last_modified.
with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder:
with mock_timestamp_now(Timestamp(reclaim_age * 2)):
sharder._audit_cleave_contexts(broker)
newish_ctx = get_context(id_newish, broker)
self.assertEqual(newish_ctx, "")
class TestCleavingContext(BaseTestSharder):
def test_init(self):
@ -4571,10 +4629,84 @@ class TestCleavingContext(BaseTestSharder):
self.assertEqual(2, ctx.ranges_done)
self.assertEqual(4, ctx.ranges_todo)
def test_load_all(self):
broker = self._make_broker()
last_ctx = None
timestamp = Timestamp.now()
db_ids = [str(uuid4()) for _ in range(6)]
for db_id in db_ids:
params = {'ref': db_id,
'cursor': 'curs',
'max_row': 2,
'cleave_to_row': 2,
'last_cleave_to_row': 1,
'cleaving_done': False,
'misplaced_done': True,
'ranges_done': 2,
'ranges_todo': 4}
key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
broker.update_metadata(
{key: (json.dumps(params), timestamp.internal)})
first_ctx = None
for ctx, lm in CleavingContext.load_all(broker):
if not first_ctx:
first_ctx = ctx
last_ctx = ctx
self.assertIn(ctx.ref, db_ids)
self.assertEqual(lm, timestamp.internal)
# If a context is deleted (metadata is "") then it's skipped
last_ctx.delete(broker)
db_ids.remove(last_ctx.ref)
# and let's modify the first
with mock_timestamp_now() as new_timestamp:
first_ctx.store(broker)
for ctx, lm in CleavingContext.load_all(broker):
self.assertIn(ctx.ref, db_ids)
if ctx.ref == first_ctx.ref:
self.assertEqual(lm, new_timestamp.internal)
else:
self.assertEqual(lm, timestamp.internal)
def test_delete(self):
broker = self._make_broker()
db_id = broker.get_info()['id']
params = {'ref': db_id,
'cursor': 'curs',
'max_row': 2,
'cleave_to_row': 2,
'last_cleave_to_row': 1,
'cleaving_done': False,
'misplaced_done': True,
'ranges_done': 2,
'ranges_todo': 4}
key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id
broker.update_metadata(
{key: (json.dumps(params), Timestamp.now().internal)})
ctx = CleavingContext.load(broker)
self.assertEqual(db_id, ctx.ref)
# Now let's delete it. When deleted the metadata key will exist, but
# the value will be "" as this means it'll be reaped later.
ctx.delete(broker)
sysmeta = broker.get_sharding_sysmeta()
for key, val in sysmeta.items():
if key == "Context-%s" % db_id:
self.assertEqual(val, "")
break
else:
self.fail("Deleted context 'Context-%s' not found")
def test_store(self):
broker = self._make_sharding_broker()
old_db_id = broker.get_brokers()[0].get_info()['id']
last_mod = Timestamp.now()
ctx = CleavingContext(old_db_id, 'curs', 12, 11, 2, True, True, 2, 4)
with mock_timestamp_now(last_mod):
ctx.store(broker)
key = 'X-Container-Sysmeta-Shard-Context-%s' % old_db_id
data = json.loads(broker.metadata[key][0])
@ -4588,6 +4720,8 @@ class TestCleavingContext(BaseTestSharder):
'ranges_done': 2,
'ranges_todo': 4}
self.assertEqual(expected, data)
# last modified is the metadata timestamp
self.assertEqual(broker.metadata[key][1], last_mod.internal)
def test_store_add_row_load(self):
# adding row to older db changes only max_row in the context