Sharder: add more logs during container cleaving.
Add more logs for each important cleaving step to help investigate some slow cleaving processes. Also fixed some misleading test names. Co-Authored-By: Alistair Coles <alistairncoles@gmail.com> Change-Id: I4727dcb6268c68c04d86e109aa14f54234a656f9
This commit is contained in:
parent
61cc0573e3
commit
c4ad66b494
|
@ -1777,8 +1777,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||||
self._send_shard_ranges(
|
self._send_shard_ranges(
|
||||||
broker.root_account, broker.root_container, created_ranges)
|
broker.root_account, broker.root_container, created_ranges)
|
||||||
self.logger.info(
|
self.logger.info(
|
||||||
"Completed creating shard range containers: %d created.",
|
"Completed creating shard range containers: %d created, "
|
||||||
len(created_ranges))
|
"from sharding container %s",
|
||||||
|
len(created_ranges), quote(broker.path))
|
||||||
return len(created_ranges)
|
return len(created_ranges)
|
||||||
|
|
||||||
def _cleave_shard_broker(self, broker, cleaving_context, shard_range,
|
def _cleave_shard_broker(self, broker, cleaving_context, shard_range,
|
||||||
|
@ -1967,7 +1968,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||||
own_shard_range = broker.get_own_shard_range()
|
own_shard_range = broker.get_own_shard_range()
|
||||||
cleaving_context.cursor = own_shard_range.lower_str
|
cleaving_context.cursor = own_shard_range.lower_str
|
||||||
cleaving_context.ranges_todo = len(ranges_todo)
|
cleaving_context.ranges_todo = len(ranges_todo)
|
||||||
self.logger.debug('Starting to cleave (%s todo): %s',
|
self.logger.info('Starting to cleave (%s todo): %s',
|
||||||
cleaving_context.ranges_todo, quote(broker.path))
|
cleaving_context.ranges_todo, quote(broker.path))
|
||||||
|
|
||||||
own_shard_range = broker.get_own_shard_range(no_default=True)
|
own_shard_range = broker.get_own_shard_range(no_default=True)
|
||||||
|
@ -2173,9 +2174,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||||
ShardRange.SHRUNK):
|
ShardRange.SHRUNK):
|
||||||
if broker.get_shard_ranges():
|
if broker.get_shard_ranges():
|
||||||
# container has been given shard ranges rather than
|
# container has been given shard ranges rather than
|
||||||
# found them e.g. via replication or a shrink event
|
# found them e.g. via replication or a shrink event,
|
||||||
|
# or manually triggered cleaving.
|
||||||
if broker.set_sharding_state():
|
if broker.set_sharding_state():
|
||||||
state = SHARDING
|
state = SHARDING
|
||||||
|
self.logger.info('Kick off container cleaving on %s, '
|
||||||
|
'own shard range in state %r',
|
||||||
|
quote(broker.path),
|
||||||
|
own_shard_range.state_text)
|
||||||
elif is_leader:
|
elif is_leader:
|
||||||
if broker.set_sharding_state():
|
if broker.set_sharding_state():
|
||||||
state = SHARDING
|
state = SHARDING
|
||||||
|
@ -2202,14 +2208,16 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||||
cleave_complete = self._cleave(broker)
|
cleave_complete = self._cleave(broker)
|
||||||
|
|
||||||
if cleave_complete:
|
if cleave_complete:
|
||||||
self.logger.info('Completed cleaving of %s',
|
|
||||||
quote(broker.path))
|
|
||||||
if self._complete_sharding(broker):
|
if self._complete_sharding(broker):
|
||||||
state = SHARDED
|
state = SHARDED
|
||||||
self._increment_stat('visited', 'completed', statsd=True)
|
self._increment_stat('visited', 'completed', statsd=True)
|
||||||
else:
|
self.logger.info(
|
||||||
self.logger.debug('Remaining in sharding state %s',
|
'Completed cleaving of %s, DB set to sharded state',
|
||||||
quote(broker.path))
|
quote(broker.path))
|
||||||
|
else:
|
||||||
|
self.logger.info(
|
||||||
|
'Completed cleaving of %s, DB remaining in sharding '
|
||||||
|
'state', quote(broker.path))
|
||||||
|
|
||||||
if not broker.is_deleted():
|
if not broker.is_deleted():
|
||||||
if state == SHARDED and broker.is_root_container():
|
if state == SHARDED and broker.is_root_container():
|
||||||
|
|
|
@ -2112,7 +2112,6 @@ class TestSharder(BaseTestSharder):
|
||||||
os.path.join(self.tempdir, 'sda', 'containers', '0',
|
os.path.join(self.tempdir, 'sda', 'containers', '0',
|
||||||
db_hash[-3:], db_hash, db_hash + '.db'))
|
db_hash[-3:], db_hash, db_hash + '.db'))
|
||||||
broker.merge_shard_ranges(shard_ranges)
|
broker.merge_shard_ranges(shard_ranges)
|
||||||
self.assertTrue(broker.set_sharding_state())
|
|
||||||
old_broker = broker.get_brokers()[0]
|
old_broker = broker.get_brokers()[0]
|
||||||
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
|
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
|
||||||
'index': 0}
|
'index': 0}
|
||||||
|
@ -2166,6 +2165,12 @@ class TestSharder(BaseTestSharder):
|
||||||
self.assertEqual('', context.cursor)
|
self.assertEqual('', context.cursor)
|
||||||
self.assertEqual(10, context.cleave_to_row)
|
self.assertEqual(10, context.cleave_to_row)
|
||||||
self.assertEqual(12, context.max_row) # note that max row increased
|
self.assertEqual(12, context.max_row) # note that max row increased
|
||||||
|
lines = sharder.logger.get_lines_for_level('info')
|
||||||
|
self.assertEqual(
|
||||||
|
["Kick off container cleaving on a/c, own shard range in state "
|
||||||
|
"'sharding'", "Starting to cleave (2 todo): a/c"], lines[:2])
|
||||||
|
self.assertIn('Completed cleaving of a/c, DB remaining in '
|
||||||
|
'sharding state', lines[1:])
|
||||||
lines = sharder.logger.get_lines_for_level('warning')
|
lines = sharder.logger.get_lines_for_level('warning')
|
||||||
self.assertIn('Repeat cleaving required', lines[0])
|
self.assertIn('Repeat cleaving required', lines[0])
|
||||||
self.assertFalse(lines[1:])
|
self.assertFalse(lines[1:])
|
||||||
|
@ -2194,6 +2199,10 @@ class TestSharder(BaseTestSharder):
|
||||||
shard_ranges[1].state = ShardRange.ACTIVE
|
shard_ranges[1].state = ShardRange.ACTIVE
|
||||||
self._check_shard_range(shard_ranges[1], updated_shard_ranges[1])
|
self._check_shard_range(shard_ranges[1], updated_shard_ranges[1])
|
||||||
self._check_objects(new_objects[1:], expected_shard_dbs[1])
|
self._check_objects(new_objects[1:], expected_shard_dbs[1])
|
||||||
|
lines = sharder.logger.get_lines_for_level('info')
|
||||||
|
self.assertEqual('Starting to cleave (2 todo): a/c', lines[0])
|
||||||
|
self.assertIn('Completed cleaving of a/c, DB set to sharded state',
|
||||||
|
lines[1:])
|
||||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||||
|
|
||||||
def test_cleave_multiple_storage_policies(self):
|
def test_cleave_multiple_storage_policies(self):
|
||||||
|
@ -5005,7 +5014,7 @@ class TestSharder(BaseTestSharder):
|
||||||
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
||||||
broker.logger.clear()
|
broker.logger.clear()
|
||||||
|
|
||||||
def _check_process_broker_sharding_no_others(self, start_state, deleted):
|
def _check_process_broker_sharding_others(self, start_state, deleted):
|
||||||
# verify that when existing own_shard_range has given state and there
|
# verify that when existing own_shard_range has given state and there
|
||||||
# are other shard ranges then the sharding process will complete
|
# are other shard ranges then the sharding process will complete
|
||||||
broker = self._make_broker(hash_='hash%s%s' % (start_state, deleted))
|
broker = self._make_broker(hash_='hash%s%s' % (start_state, deleted))
|
||||||
|
@ -5021,6 +5030,7 @@ class TestSharder(BaseTestSharder):
|
||||||
broker.delete_db(next(self.ts_iter).internal)
|
broker.delete_db(next(self.ts_iter).internal)
|
||||||
|
|
||||||
with self._mock_sharder() as sharder:
|
with self._mock_sharder() as sharder:
|
||||||
|
# pretend shard containers are created ok so sharding proceeds
|
||||||
with mock.patch.object(
|
with mock.patch.object(
|
||||||
sharder, '_send_shard_ranges', return_value=True):
|
sharder, '_send_shard_ranges', return_value=True):
|
||||||
with mock_timestamp_now_with_iter(self.ts_iter):
|
with mock_timestamp_now_with_iter(self.ts_iter):
|
||||||
|
@ -5030,33 +5040,36 @@ class TestSharder(BaseTestSharder):
|
||||||
final_own_sr = broker.get_own_shard_range(no_default=True)
|
final_own_sr = broker.get_own_shard_range(no_default=True)
|
||||||
self.assertEqual(SHARDED, broker.get_db_state())
|
self.assertEqual(SHARDED, broker.get_db_state())
|
||||||
self.assertEqual(epoch.normal, parse_db_filename(broker.db_file)[1])
|
self.assertEqual(epoch.normal, parse_db_filename(broker.db_file)[1])
|
||||||
|
lines = broker.logger.get_lines_for_level('info')
|
||||||
|
self.assertIn('Completed creating shard range containers: 2 created, '
|
||||||
|
'from sharding container a/c', lines)
|
||||||
self.assertFalse(broker.logger.get_lines_for_level('warning'))
|
self.assertFalse(broker.logger.get_lines_for_level('warning'))
|
||||||
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
||||||
# self.assertEqual(deleted, broker.is_deleted())
|
self.assertEqual(deleted, broker.is_deleted())
|
||||||
return own_sr, final_own_sr
|
return own_sr, final_own_sr
|
||||||
|
|
||||||
def test_process_broker_sharding_with_own_shard_range_no_others(self):
|
def test_process_broker_sharding_completes_with_own_and_other_ranges(self):
|
||||||
own_sr, final_own_sr = self._check_process_broker_sharding_no_others(
|
own_sr, final_own_sr = self._check_process_broker_sharding_others(
|
||||||
ShardRange.SHARDING, False)
|
ShardRange.SHARDING, False)
|
||||||
exp_own_sr = dict(own_sr, state=ShardRange.SHARDED,
|
exp_own_sr = dict(own_sr, state=ShardRange.SHARDED,
|
||||||
meta_timestamp=mock.ANY)
|
meta_timestamp=mock.ANY)
|
||||||
self.assertEqual(exp_own_sr, dict(final_own_sr))
|
self.assertEqual(exp_own_sr, dict(final_own_sr))
|
||||||
|
|
||||||
# verify that deleted DBs will be sharded
|
# verify that deleted DBs will be sharded
|
||||||
own_sr, final_own_sr = self._check_process_broker_sharding_no_others(
|
own_sr, final_own_sr = self._check_process_broker_sharding_others(
|
||||||
ShardRange.SHARDING, True)
|
ShardRange.SHARDING, True)
|
||||||
exp_own_sr = dict(own_sr, state=ShardRange.SHARDED,
|
exp_own_sr = dict(own_sr, state=ShardRange.SHARDED,
|
||||||
meta_timestamp=mock.ANY)
|
meta_timestamp=mock.ANY)
|
||||||
self.assertEqual(exp_own_sr, dict(final_own_sr))
|
self.assertEqual(exp_own_sr, dict(final_own_sr))
|
||||||
|
|
||||||
own_sr, final_own_sr = self._check_process_broker_sharding_no_others(
|
own_sr, final_own_sr = self._check_process_broker_sharding_others(
|
||||||
ShardRange.SHRINKING, False)
|
ShardRange.SHRINKING, False)
|
||||||
exp_own_sr = dict(own_sr, state=ShardRange.SHRUNK,
|
exp_own_sr = dict(own_sr, state=ShardRange.SHRUNK,
|
||||||
meta_timestamp=mock.ANY)
|
meta_timestamp=mock.ANY)
|
||||||
self.assertEqual(exp_own_sr, dict(final_own_sr))
|
self.assertEqual(exp_own_sr, dict(final_own_sr))
|
||||||
|
|
||||||
# verify that deleted DBs will be shrunk
|
# verify that deleted DBs will be shrunk
|
||||||
own_sr, final_own_sr = self._check_process_broker_sharding_no_others(
|
own_sr, final_own_sr = self._check_process_broker_sharding_others(
|
||||||
ShardRange.SHRINKING, True)
|
ShardRange.SHRINKING, True)
|
||||||
exp_own_sr = dict(own_sr, state=ShardRange.SHRUNK,
|
exp_own_sr = dict(own_sr, state=ShardRange.SHRUNK,
|
||||||
meta_timestamp=mock.ANY)
|
meta_timestamp=mock.ANY)
|
||||||
|
@ -5114,9 +5127,10 @@ class TestSharder(BaseTestSharder):
|
||||||
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
||||||
broker.logger.clear()
|
broker.logger.clear()
|
||||||
|
|
||||||
def _check_process_broker_sharding_others(self, state):
|
def _check_process_broker_sharding_stalls_others(self, state):
|
||||||
# verify states in which own_shard_range will cause sharding
|
# verify states in which own_shard_range will cause sharding
|
||||||
# process to start when other shard ranges are in the db
|
# process to start when other shard ranges are in the db, but stop
|
||||||
|
# when shard containers have not been created
|
||||||
broker = self._make_broker(hash_='hash%s' % state)
|
broker = self._make_broker(hash_='hash%s' % state)
|
||||||
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
|
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
|
||||||
'index': 0}
|
'index': 0}
|
||||||
|
@ -5148,10 +5162,10 @@ class TestSharder(BaseTestSharder):
|
||||||
self.assertFalse(broker.logger.get_lines_for_level('warning'))
|
self.assertFalse(broker.logger.get_lines_for_level('warning'))
|
||||||
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
||||||
|
|
||||||
def test_process_broker_sharding_with_own_shard_range_and_others(self):
|
def test_process_broker_sharding_stalls_with_own_and_other_ranges(self):
|
||||||
self._check_process_broker_sharding_others(ShardRange.SHARDING)
|
self._check_process_broker_sharding_stalls_others(ShardRange.SHARDING)
|
||||||
self._check_process_broker_sharding_others(ShardRange.SHRINKING)
|
self._check_process_broker_sharding_stalls_others(ShardRange.SHRINKING)
|
||||||
self._check_process_broker_sharding_others(ShardRange.SHARDED)
|
self._check_process_broker_sharding_stalls_others(ShardRange.SHARDED)
|
||||||
|
|
||||||
def test_process_broker_leader_auto_shard(self):
|
def test_process_broker_leader_auto_shard(self):
|
||||||
# verify conditions for acting as auto-shard leader
|
# verify conditions for acting as auto-shard leader
|
||||||
|
|
Loading…
Reference in New Issue