Sharder: warn when sharding appears to have stalled.

This patch add a configurable timeout after which the sharder
will warn if a container DB has not completed sharding.

The new config is container_sharding_timeout with a default of
172800 seconds (2 days).

Drive-by fix: recording sharding progress will cover the case
of shard range shrinking too.

Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Change-Id: I6ce299b5232a8f394e35f148317f9e08208a0c0f
This commit is contained in:
Jianjian Huo 2022-09-26 14:58:43 -07:00
parent 65a1f4a2ff
commit 4ed2b89cb7
3 changed files with 204 additions and 104 deletions

View File

@ -507,6 +507,12 @@ use = egg:swift#xprofile
# The default is 12 hours (12 x 60 x 60)
# recon_sharded_timeout = 43200
#
# Maximum amount of time in seconds after sharding has been started on a shard
# container and before it's considered as timeout. After this amount of time,
# sharder will warn that a container DB has not completed sharding.
# The default is 48 hours (48 x 60 x 60)
# container_sharding_timeout = 172800
#
# Large databases tend to take a while to work with, but we want to make sure
# we write down our progress. Use a larger-than-normal broker timeout to make
# us less likely to bomb out on a LockTimeout.

View File

@ -709,6 +709,8 @@ class ContainerSharderConf(object):
'recon_candidates_limit', int, 5)
self.recon_sharded_timeout = get_val(
'recon_sharded_timeout', int, 43200)
self.container_sharding_timeout = get_val(
'container_sharding_timeout', int, 172800)
self.conn_timeout = get_val(
'conn_timeout', float, 5)
self.auto_shard = get_val(
@ -910,36 +912,59 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
category['top'] = candidates
def _record_sharding_progress(self, broker, node, error):
own_shard_range = broker.get_own_shard_range()
db_state = broker.get_db_state()
if (db_state in (UNSHARDED, SHARDING, SHARDED)
and own_shard_range.state in (ShardRange.SHARDING,
ShardRange.SHARDED)):
if db_state == SHARDED:
contexts = CleavingContext.load_all(broker)
if not contexts:
return
context_ts = max(float(ts) for c, ts in contexts)
if context_ts + self.recon_sharded_timeout \
< Timestamp.now().timestamp:
# last context timestamp too old for the
# broker to be recorded
return
if db_state not in (UNSHARDED, SHARDING, SHARDED):
return
own_shard_range = broker.get_own_shard_range()
if own_shard_range.state not in (
ShardRange.SHARDING, ShardRange.SHARDED,
ShardRange.SHRINKING, ShardRange.SHRUNK):
return
info = self._make_stats_info(broker, node, own_shard_range)
info['state'] = own_shard_range.state_text
info['db_state'] = broker.get_db_state()
states = [ShardRange.FOUND, ShardRange.CREATED,
ShardRange.CLEAVED, ShardRange.ACTIVE]
shard_ranges = broker.get_shard_ranges(states=states)
state_count = {}
for state in states:
state_count[ShardRange.STATES[state]] = 0
for shard_range in shard_ranges:
state_count[shard_range.state_text] += 1
info.update(state_count)
info['error'] = error and str(error)
self._append_stat('sharding_in_progress', 'all', info)
if db_state == SHARDED:
contexts = CleavingContext.load_all(broker)
if not contexts:
return
context_ts = max(float(ts) for c, ts in contexts)
if context_ts + self.recon_sharded_timeout \
< float(Timestamp.now()):
# last context timestamp too old for the
# broker to be recorded
return
info = self._make_stats_info(broker, node, own_shard_range)
info['state'] = own_shard_range.state_text
info['db_state'] = broker.get_db_state()
states = [ShardRange.FOUND, ShardRange.CREATED,
ShardRange.CLEAVED, ShardRange.ACTIVE]
shard_ranges = broker.get_shard_ranges(states=states)
state_count = {}
for state in states:
state_count[ShardRange.STATES[state]] = 0
for shard_range in shard_ranges:
state_count[shard_range.state_text] += 1
info.update(state_count)
info['error'] = error and str(error)
self._append_stat('sharding_in_progress', 'all', info)
if broker.sharding_required() and (
own_shard_range.epoch is not None) and (
float(own_shard_range.epoch) +
self.container_sharding_timeout <
time.time()):
# Note: There is no requirement that own_shard_range.epoch equals
# the time at which the own_shard_range was merged into the
# container DB, which predicates sharding starting. But s-m-s-r and
# auto-sharding do set epoch and then merge, so we use it to tell
# whether sharding has been taking too long or not.
self.logger.warning(
'Cleaving has not completed in %.2f seconds since %s.'
' Container DB file and path: %s (%s), DB state: %s,'
' own_shard_range state: %s, state count of shard ranges: %s' %
(time.time() - float(own_shard_range.epoch),
own_shard_range.epoch.isoformat, broker.db_file,
quote(broker.path), db_state,
own_shard_range.state_text, str(state_count)))
def _report_stats(self):
# report accumulated stats since start of one sharder cycle

View File

@ -114,6 +114,21 @@ class BaseTestSharder(unittest.TestCase):
self.assertNotEqual(old_db_id, broker.get_info()['id']) # sanity check
return broker
def _make_shrinking_broker(self, account='.shards_a', container='shard_c',
lower='here', upper='there', objects=None):
# caller should merge any acceptor range(s) into returned broker
broker = self._make_broker(account=account, container=container)
for obj in objects or []:
broker.put_object(*obj)
own_shard_range = ShardRange(
broker.path, next(self.ts_iter), lower, upper,
state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
broker.merge_shard_ranges([own_shard_range])
broker.set_sharding_sysmeta('Root', 'a/c')
self.assertFalse(broker.is_root_container()) # sanity check
self.assertTrue(broker.set_sharding_state())
return broker
def _make_shard_ranges(self, bounds, state=None, object_count=0,
timestamp=Timestamp.now(), **kwargs):
if not isinstance(state, (tuple, list)):
@ -1922,22 +1937,14 @@ class TestSharder(BaseTestSharder):
# 'unique' ensures fresh dbs on each test iteration
unique[0] += 1
broker = self._make_broker(account='.shards_a',
container='donor_%s' % unique[0])
own_shard_range = ShardRange(
broker.path, next(self.ts_iter), 'h', 'w',
state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
broker.merge_shard_ranges([own_shard_range])
broker.set_sharding_sysmeta('Root', 'a/c')
self.assertFalse(broker.is_root_container()) # sanity check
objects = [
('i', self.ts_encoded(), 3, 'text/plain', 'etag_t', 0, 0),
('m', self.ts_encoded(), 33, 'text/plain', 'etag_m', 0, 0),
('w', self.ts_encoded(), 100, 'text/plain', 'etag_w', 0, 0),
]
for obj in objects:
broker.put_object(*obj)
broker = self._make_shrinking_broker(
container='donor_%s' % unique[0], lower='h', upper='w',
objects=objects)
acceptor_epoch = next(self.ts_iter)
acceptors = [
ShardRange('.shards_a/acceptor_%s_%s' % (unique[0], bounds[1]),
@ -1966,7 +1973,6 @@ class TestSharder(BaseTestSharder):
db_hash[-3:], db_hash, db_name))
broker.merge_shard_ranges(acceptors)
broker.set_sharding_state()
# run cleave
with mock_timestamp_now_with_iter(self.ts_iter):
@ -1978,15 +1984,15 @@ class TestSharder(BaseTestSharder):
context = CleavingContext.load(broker)
self.assertTrue(context.misplaced_done)
self.assertEqual(expect_delete, context.cleaving_done)
own_sr = broker.get_own_shard_range()
if exp_progress_acceptors:
expected_cursor = exp_progress_acceptors[-1].upper_str
else:
expected_cursor = own_shard_range.lower_str
expected_cursor = own_sr.lower_str
self.assertEqual(expected_cursor, context.cursor)
self.assertEqual(3, context.cleave_to_row)
self.assertEqual(3, context.max_row)
self.assertEqual(SHARDING, broker.get_db_state())
own_sr = broker.get_own_shard_range()
if expect_delete and len(acceptor_bounds) == 1:
self.assertTrue(own_sr.deleted)
self.assertEqual(ShardRange.SHRUNK, own_sr.state)
@ -2754,24 +2760,18 @@ class TestSharder(BaseTestSharder):
self.assertEqual(0, context.ranges_todo)
def test_cleave_shrinking_to_active_root_range(self):
broker = self._make_broker(account='.shards_a', container='shard_c')
broker.put_object(
'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
# a donor previously shrunk to own...
broker = self._make_shrinking_broker(account='.shards_a',
container='shard_c')
deleted_range = ShardRange(
'.shards/other', next(self.ts_iter), 'here', 'there', deleted=True,
state=ShardRange.SHRUNK, epoch=next(self.ts_iter))
own_shard_range = ShardRange(
broker.path, next(self.ts_iter), 'here', '',
state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
# root is the acceptor...
root = ShardRange(
'a/c', next(self.ts_iter), '', '',
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
broker.merge_shard_ranges([deleted_range, own_shard_range, root])
broker.merge_shard_ranges([deleted_range, root])
broker.set_sharding_sysmeta('Root', 'a/c')
self.assertFalse(broker.is_root_container()) # sanity check
self.assertTrue(broker.set_sharding_state())
# expect cleave to the root
with self._mock_sharder() as sharder:
@ -2818,12 +2818,9 @@ class TestSharder(BaseTestSharder):
# if shrinking shard has both active root and active other acceptor,
# verify that shard only cleaves to one of them;
# root will sort before acceptor if acceptor.upper==MAX
broker = self._make_broker(account='.shards_a', container='shard_c')
broker.put_object(
'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
own_shard_range = ShardRange(
broker.path, next(self.ts_iter), 'here', 'there',
state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
objects = (
('here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0),)
broker = self._make_shrinking_broker(objects=objects)
# active acceptor with upper bound == MAX
acceptor = ShardRange(
'.shards/other', next(self.ts_iter), 'here', '', deleted=False,
@ -2832,10 +2829,9 @@ class TestSharder(BaseTestSharder):
root = ShardRange(
'a/c', next(self.ts_iter), '', '',
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
broker.merge_shard_ranges([own_shard_range, acceptor, root])
broker.merge_shard_ranges([acceptor, root])
broker.set_sharding_sysmeta('Root', 'a/c')
self.assertFalse(broker.is_root_container()) # sanity check
self.assertTrue(broker.set_sharding_state())
# expect cleave to the root
acceptor.upper = ''
@ -2859,12 +2855,9 @@ class TestSharder(BaseTestSharder):
# if shrinking shard has both active root and active other acceptor,
# verify that shard only cleaves to one of them;
# root will sort after acceptor if acceptor.upper<MAX
broker = self._make_broker(account='.shards_a', container='shard_c')
broker.put_object(
'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
own_shard_range = ShardRange(
broker.path, next(self.ts_iter), 'here', 'there',
state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
objects = (
('here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0),)
broker = self._make_shrinking_broker(objects=objects)
# active acceptor with upper bound < MAX
acceptor = ShardRange(
'.shards/other', next(self.ts_iter), 'here', 'where',
@ -2873,10 +2866,7 @@ class TestSharder(BaseTestSharder):
root = ShardRange(
'a/c', next(self.ts_iter), '', '',
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
broker.merge_shard_ranges([own_shard_range, acceptor, root])
broker.set_sharding_sysmeta('Root', 'a/c')
self.assertFalse(broker.is_root_container()) # sanity check
self.assertTrue(broker.set_sharding_state())
broker.merge_shard_ranges([acceptor, root])
# expect cleave to the acceptor
with self._mock_sharder() as sharder:
@ -2893,6 +2883,25 @@ class TestSharder(BaseTestSharder):
]
self.assertEqual(1, len(info))
def _check_not_complete_sharding(self, broker):
with self._mock_sharder() as sharder:
self.assertFalse(sharder._complete_sharding(broker))
warning_lines = sharder.logger.get_lines_for_level('warning')
self.assertIn(
'Repeat cleaving required for %r' % broker.db_files[0],
warning_lines[0])
self.assertFalse(warning_lines[1:])
sharder.logger.clear()
context = CleavingContext.load(broker)
self.assertFalse(context.cleaving_done)
self.assertFalse(context.misplaced_done)
self.assertEqual('', context.cursor)
self.assertEqual(ShardRange.SHARDING,
broker.get_own_shard_range().state)
for shard_range in broker.get_shard_ranges():
self.assertEqual(ShardRange.CLEAVED, shard_range.state)
self.assertEqual(SHARDING, broker.get_db_state())
def _check_complete_sharding(self, account, container, shard_bounds):
broker = self._make_sharding_broker(
account=account, container=container, shard_bounds=shard_bounds)
@ -2902,27 +2911,8 @@ class TestSharder(BaseTestSharder):
broker.get_brokers()[0].merge_items([obj])
self.assertEqual(2, len(broker.db_files)) # sanity check
def check_not_complete():
with self._mock_sharder() as sharder:
self.assertFalse(sharder._complete_sharding(broker))
warning_lines = sharder.logger.get_lines_for_level('warning')
self.assertIn(
'Repeat cleaving required for %r' % broker.db_files[0],
warning_lines[0])
self.assertFalse(warning_lines[1:])
sharder.logger.clear()
context = CleavingContext.load(broker)
self.assertFalse(context.cleaving_done)
self.assertFalse(context.misplaced_done)
self.assertEqual('', context.cursor)
self.assertEqual(ShardRange.SHARDING,
broker.get_own_shard_range().state)
for shard_range in broker.get_shard_ranges():
self.assertEqual(ShardRange.CLEAVED, shard_range.state)
self.assertEqual(SHARDING, broker.get_db_state())
# no cleave context progress
check_not_complete()
self._check_not_complete_sharding(broker)
# cleaving_done is False
context = CleavingContext.load(broker)
@ -2931,13 +2921,13 @@ class TestSharder(BaseTestSharder):
context.cleaving_done = False
context.misplaced_done = True
context.store(broker)
check_not_complete()
self._check_not_complete_sharding(broker)
# misplaced_done is False
context.misplaced_done = False
context.cleaving_done = True
context.store(broker)
check_not_complete()
self._check_not_complete_sharding(broker)
# modified db max row
old_broker = broker.get_brokers()[0]
@ -2949,13 +2939,13 @@ class TestSharder(BaseTestSharder):
context.misplaced_done = True
context.cleaving_done = True
context.store(broker)
check_not_complete()
self._check_not_complete_sharding(broker)
# db id changes
broker.get_brokers()[0].newid('fake_remote_id')
context.cleave_to_row = 2 # pretend all rows have been cleaved, again
context.store(broker)
check_not_complete()
self._check_not_complete_sharding(broker)
# context ok
context = CleavingContext.load(broker)
@ -3028,6 +3018,91 @@ class TestSharder(BaseTestSharder):
sharder._record_sharding_progress(broker, {}, None)
mocked.assert_not_called()
def test_incomplete_sharding_progress_warning_log(self):
# test to verify sharder will print warning logs if sharding has been
# taking too long.
broker = self._make_sharding_broker(
'a', 'c', (('', 'mid'), ('mid', '')))
obj = {'name': 'obj', 'created_at': next(self.ts_iter).internal,
'size': 14, 'content_type': 'text/plain', 'etag': 'an etag',
'deleted': 0}
broker.get_brokers()[0].merge_items([obj])
self.assertEqual(2, len(broker.db_files))
# sharding is not complete due to no cleave context progress.
self._check_not_complete_sharding(broker)
own_shard_range = broker.get_own_shard_range()
# advance time but still within 'container_sharding_timeout'.
future_time = 10000 + float(own_shard_range.epoch)
with mock.patch(
'swift.container.sharder.time.time',
return_value=future_time), self._mock_sharder() as sharder:
sharder._record_sharding_progress(broker, {}, None)
self.assertEqual([], self.logger.get_lines_for_level('warning'))
future_time = 172800 + float(own_shard_range.epoch)
with mock.patch(
'swift.container.sharder.time.time',
return_value=future_time), self._mock_sharder() as sharder:
sharder._record_sharding_progress(broker, {}, None)
self.assertEqual([], self.logger.get_lines_for_level('warning'))
# advance time beyond 'container_sharding_timeout'.
future_time = 172800 + float(own_shard_range.epoch) + 1
with mock.patch(
'swift.container.sharder.time.time',
return_value=future_time), self._mock_sharder() as sharder:
sharder._record_sharding_progress(broker, {}, None)
warning_lines = sharder.logger.get_lines_for_level('warning')
self.assertIn(
'Cleaving has not completed in %.2f seconds since %s.' %
(future_time - float(own_shard_range.epoch),
own_shard_range.epoch.isoformat),
warning_lines[0])
def test_incomplete_shrinking_progress_warning_log(self):
# test to verify sharder will print warning logs if shrinking has been
# taking too long.
broker = self._make_shrinking_broker()
obj = {'name': 'obj', 'created_at': next(self.ts_iter).internal,
'size': 14, 'content_type': 'text/plain', 'etag': 'an etag',
'deleted': 0}
broker.get_brokers()[0].merge_items([obj])
# active acceptor with upper bound < MAX
acceptor = ShardRange(
'.shards/other', next(self.ts_iter), 'here', 'where',
deleted=False, state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
broker.merge_shard_ranges([acceptor])
context = CleavingContext.load(broker)
self.assertFalse(context.cleaving_done)
own_shard_range = broker.get_own_shard_range()
# advance time but still within 'container_sharding_timeout'.
future_time = 10000 + float(own_shard_range.epoch)
with mock.patch(
'swift.container.sharder.time.time',
return_value=future_time), self._mock_sharder() as sharder:
sharder._record_sharding_progress(broker, {}, None)
self.assertEqual([], self.logger.get_lines_for_level('warning'))
future_time = 172800 + float(own_shard_range.epoch)
with mock.patch(
'swift.container.sharder.time.time',
return_value=future_time), self._mock_sharder() as sharder:
sharder._record_sharding_progress(broker, {}, None)
self.assertEqual([], self.logger.get_lines_for_level('warning'))
# advance time beyond 'container_sharding_timeout'.
future_time = 172800 + float(own_shard_range.epoch) + 1
with mock.patch(
'swift.container.sharder.time.time',
return_value=future_time), self._mock_sharder() as sharder:
sharder._record_sharding_progress(broker, {}, None)
warning_lines = sharder.logger.get_lines_for_level('warning')
self.assertIn(
'Cleaving has not completed in %.2f seconds since %s.' %
(future_time - float(own_shard_range.epoch),
own_shard_range.epoch.isoformat),
warning_lines[0])
def test_identify_sharding_old_style_candidate(self):
brokers = [self._make_broker(container='c%03d' % i) for i in range(6)]
for broker in brokers:
@ -6001,17 +6076,8 @@ class TestSharder(BaseTestSharder):
def test_audit_shard_root_ranges_fetch_fails_while_shrinking(self):
# check audit copes with failed response while shard is shrinking
ts = next(self.ts_iter)
own_sr = ShardRange(
str(ShardName.create('.shards_a', 'c', 'c', ts, 0)),
ts, lower='a', upper='b', state=ShardRange.SHRINKING,
state_timestamp=ts)
broker = self._make_broker(account=own_sr.account,
container=own_sr.container)
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
broker.merge_shard_ranges(own_sr)
self.assertFalse(broker.is_root_container())
broker = self._make_shrinking_broker(lower='a', upper='b')
own_sr = broker.get_own_shard_range()
sharder, mock_swift = self.call_audit_container(
broker, [], exc=internal_client.UnexpectedResponse('bad', 'resp'))
self.assertEqual([], broker.get_shard_ranges())
@ -8093,6 +8159,7 @@ class TestContainerSharderConf(unittest.TestCase):
'broker_timeout': 60,
'recon_candidates_limit': 5,
'recon_sharded_timeout': 43200,
'container_sharding_timeout': 172800,
'conn_timeout': 5.0,
'auto_shard': False,
'shrink_threshold': 100000,
@ -8113,6 +8180,7 @@ class TestContainerSharderConf(unittest.TestCase):
'broker_timeout': 61,
'recon_candidates_limit': 6,
'recon_sharded_timeout': 43201,
'container_sharding_timeout': 172801,
'conn_timeout': 5.1,
'auto_shard': True,
'shrink_threshold': 100001,
@ -8133,6 +8201,7 @@ class TestContainerSharderConf(unittest.TestCase):
'broker_timeout': 61,
'recon_candidates_limit': 6,
'recon_sharded_timeout': 43201,
'container_sharding_timeout': 172801,
'conn_timeout': 5.1,
'auto_shard': True,
'minimum_shard_size': 1}