clean up _one_shard_cycle state machine
* break out per broker handling from _one_shard_cycle to _process_broker
* make _process_broker the one place where sharding states are transitioned

Change-Id: I3b65e8988bcde697cc286e274ccc41c2671f1937
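For orientation, the state machine those two bullets describe can be pictured with a minimal, self-contained sketch. Everything below (FakeBroker, the threshold default, the scan_done/cleave_done flags) is invented for illustration and is not the sharder's real broker API; it only mirrors the transition conditions visible in the diff.

# Illustrative sketch only: stand-in names, not Swift's real ContainerBroker.
DB_STATE_NOTFOUND, DB_STATE_UNSHARDED, DB_STATE_SHARDING, DB_STATE_SHARDED = \
    range(4)


class FakeBroker(object):
    def __init__(self, object_count=0, shard_ranges=None):
        self.state = DB_STATE_UNSHARDED
        self.object_count = object_count
        self.shard_ranges = shard_ranges or []

    def set_sharding_state(self):
        self.state = DB_STATE_SHARDING

    def set_sharded_state(self):
        self.state = DB_STATE_SHARDED


def process_broker(broker, is_leader, threshold=1000000,
                   scan_done=False, cleave_done=False):
    # UNSHARDED -> SHARDING: shard ranges already present (e.g. received via
    # replication), or the leader node sees the container is big enough.
    if broker.state == DB_STATE_UNSHARDED:
        if broker.shard_ranges:
            broker.set_sharding_state()
        elif is_leader and broker.object_count >= threshold:
            broker.set_sharding_state()

    # SHARDING -> SHARDED: only when scanning and cleaving both report done.
    if broker.state == DB_STATE_SHARDING and scan_done and cleave_done:
        broker.set_sharded_state()
    return broker.state


if __name__ == '__main__':
    b = FakeBroker(object_count=2000000)
    assert process_broker(b, is_leader=True) == DB_STATE_SHARDING
    assert process_broker(b, is_leader=True, scan_done=True,
                          cleave_done=True) == DB_STATE_SHARDED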
@@ -479,7 +479,7 @@ class ContainerSharder(ContainerReplicator):
            # overlap with anything
        return continue_with_container

    def _update_shard_range_counts(self, root_account, root_container, broker):
    def _update_shard_range_counts(self, broker, root_account, root_container):
        if broker.container == root_container:
            return
        shard_range = broker.get_own_shard_range()
@@ -495,6 +495,99 @@ class ContainerSharder(ContainerReplicator):
        item = broker.metadata.get(header)
        return None if item is None else item[0]

    def _process_broker(self, broker, node, part):
        root_account, root_container = broker.get_shard_root_path()
        shard_range = broker.get_own_shard_range()

        # Before we do any heavy lifting, lets do an audit on the shard
        # container. We grab the root's view of the shard_points and make
        # sure this container exists in it and in what should be it's
        # parent. If its in both great, If it exists in either but not the
        # other, then this needs to be fixed. If, however, it doesn't
        # exist in either then this container may not exist anymore so
        # quarantine it.
        # if not self._audit_shard_container(broker, shard_range,
        #                                    root_account,
        #                                    root_container):
        #     continue

        # now look and deal with misplaced objects.
        self._misplaced_objects(broker, node, root_account, root_container,
                                shard_range)

        if broker.is_deleted():
            # This container is deleted so we can skip it. We still want
            # deleted containers to go via misplaced items, cause they may
            # have new objects sitting in them that may need to move.
            return

        state = broker.get_db_state()
        if state == DB_STATE_NOTFOUND:
            return

        self.shard_cleanups = dict()
        # TODO: bring back leader election (maybe?); if so make it
        # on-demand since we may not need to know if we are leader for all
        # states
        leader = node['index'] == 0
        try:

            if state == DB_STATE_UNSHARDED:
                if broker.get_shard_ranges():
                    # container may have been given shard ranges rather
                    # than found them e.g. via replication or a shrink event
                    broker.set_sharding_state()
                    state = DB_STATE_SHARDING
                elif leader:
                    object_count = broker.get_info()['object_count']
                    if object_count >= self.shard_container_size:
                        broker.set_sharding_state()
                        state = DB_STATE_SHARDING

                self._update_shard_range_counts(
                    broker, root_account, root_container)

            if state == DB_STATE_SHARDING:
                scan_complete = config_true_value(
                    get_sharding_info(broker, 'Scan-Done'))
                if leader and not scan_complete:
                    scan_complete = self._find_shard_ranges(
                        broker, node, part)

                # always try to cleave any pending shard ranges
                cleave_complete = self._cleave(broker, node, root_account,
                                               root_container)

                if scan_complete and cleave_complete:
                    # we've finished sharding this container.
                    broker.set_sharded_state()
                    state = DB_STATE_SHARDED
                    self.logger.info('Completed sharding of %s/%s',
                                     broker.account, broker.container)
                    self.logger.increment('sharding_complete')

                self._update_shard_range_counts(
                    broker, root_account, root_container)

            if state == DB_STATE_SHARDED:
                if not broker.is_root_container():
                    # sharded shard containers get cleaned up
                    self.logger.info('Deleting sharded shard %s/%s',
                                     broker.account, broker.container)
                    # We aren't in the root container.
                    self._update_shard_ranges(root_account, root_container,
                                              'PUT', broker.get_shard_ranges())
                    timestamp = Timestamp.now().internal
                    shard_range = broker.get_own_shard_range()
                    shard_range.timestamp = timestamp
                    self._update_shard_ranges(root_account, root_container,
                                              'DELETE', [shard_range])
                    broker.delete_db(timestamp)
        finally:
            self.logger.increment('scanned')
            self.stats['containers_scanned'] += 1
            self._periodic_report_stats()

    def _one_shard_cycle(self):
        """
        The main function, everything the sharder does forks from this method.
@@ -545,95 +638,7 @@ class ContainerSharder(ContainerReplicator):
            if not sharded:
                # Not a shard container
                continue
            root_account, root_container = broker.get_shard_root_path()
            shard_range = broker.get_own_shard_range()

            # Before we do any heavy lifting, lets do an audit on the shard
            # container. We grab the root's view of the shard_points and make
            # sure this container exists in it and in what should be it's
            # parent. If its in both great, If it exists in either but not the
            # other, then this needs to be fixed. If, however, it doesn't
            # exist in either then this container may not exist anymore so
            # quarantine it.
            # if not self._audit_shard_container(broker, shard_range,
            #                                    root_account,
            #                                    root_container):
            #     continue

            # now look and deal with misplaced objects.
            self._misplaced_objects(broker, node, root_account, root_container,
                                    shard_range)

            if broker.is_deleted():
                # This container is deleted so we can skip it. We still want
                # deleted containers to go via misplaced items, cause they may
                # have new objects sitting in them that may need to move.
                continue

            state = broker.get_db_state()
            if state in (DB_STATE_SHARDED, DB_STATE_NOTFOUND):
                continue

            self.shard_cleanups = dict()

            try:
                if broker.is_shrinking():
                    # No matter what, we want to finish this shrink stage
                    # before anything else.
                    self._shrink(broker, root_account, root_container)
                    continue

                # The sharding mechanism is rather interesting. If the
                # container requires sharding.. that is big enough and
                # sharding hasn't started yet. Then we need to identify a
                # primary node to scan for shards. The others will wait for
                # shards to appear in the shard_ranges table
                # (via replication). They will start the sharding process

                if state == DB_STATE_UNSHARDED:
                    skip_shrinking = self.get_metadata_item(
                        broker, 'X-Container-Sysmeta-Sharding')
                    obj_count = broker.get_info()['object_count']
                    if not skip_shrinking and obj_count < \
                            (self.shard_container_size *
                             self.shard_shrink_point):
                        # TODO: Shrink
                        # self._shrink(broker, root_account, root_container)
                        continue
                    elif obj_count < self.shard_container_size:
                        self.logger.debug(
                            'Skipping container %s/%s with object count %s' %
                            (broker.account, broker.container, obj_count))
                        continue

                # We are either in the sharding state or we need to start
                # sharding.
                scan_complete = config_true_value(
                    get_sharding_info(broker, 'Scan-Done'))
                # TODO: bring back leader election (maybe?)
                if node['index'] == 0 and not scan_complete:
                    if broker.get_db_state == DB_STATE_UNSHARDED:
                        # TODO (acoles): I *think* this is the right place to
                        # transition to SHARDING state - we are about to scan
                        # for pivots so want the original db closed to new
                        # updates. But, we seem to conditionally call this
                        # method in other places too? Have we done all the
                        # checks needed to know that this container is going to
                        # shard?
                        broker.set_sharding_state()
                    scan_complete = self._find_shard_ranges(broker, node, part)
                    if scan_complete:
                        self._cleave(broker, node, root_account,
                                     root_container)
                else:
                    self._cleave(broker, node, root_account,
                                 root_container)
            finally:
                self._update_shard_range_counts(root_account, root_container,
                                                broker)
                self.logger.increment('scanned')
                self.stats['containers_scanned'] += 1
                self._periodic_report_stats()
            self._process_broker(broker, node, part)

        # wipe out the cache do disable bypass in delete_db
        cleanups = self.shard_cleanups
@@ -774,7 +779,7 @@ class ContainerSharder(ContainerReplicator):
        table.

        :param broker:
        :return:
        :return: True if the last shard range was found, False otherwise
        """
        self.logger.info('Started scan for shard ranges on %s/%s',
                         broker.account, broker.container)
@@ -789,7 +794,7 @@ class ContainerSharder(ContainerReplicator):
            # we didn't find anything
            self.logger.warning("No shard ranges found, something went "
                                "wrong. We will try again next cycle.")
            return
            return last_found

        # TODO: if we bring back leader election, this is about the spot where
        # we should confirm we're still the scanner
@@ -839,9 +844,6 @@ class ContainerSharder(ContainerReplicator):
        self.cpool.spawn(
            self._replicate_object, part, broker.db_file, node['id'])

        if broker.get_db_state == DB_STATE_UNSHARDED:
            broker.set_sharding_state()

        self.logger.info(
            "Completed scan for shard ranges: %d found, %d created.",
            len(found_ranges), len(shard_ranges))
@@ -849,7 +851,7 @@ class ContainerSharder(ContainerReplicator):
            # We've found the last shard range, so mark that in metadata
            update_sharding_info(broker, {'Scan-Done': True})
            self.logger.info("Final shard range reached.")
            return True
        return last_found

    def _shrink(self, broker, root_account, root_container):
        """shrinking is a 2 phase process
@@ -1141,32 +1143,18 @@ class ContainerSharder(ContainerReplicator):
                break
            qry['marker'] = objects[-1]['name']

    def _sharding_complete(self, root_account, root_container, broker):
        broker.set_sharded_state()
        if root_container != broker.container:
            # We aren't in the root container.
            self._update_shard_ranges(root_account, root_container, 'PUT',
                                      broker.get_shard_ranges())
            timestamp = Timestamp.now().internal
            shard_range = broker.get_own_shard_range()
            shard_range.timestamp = timestamp
            self._update_shard_ranges(root_account, root_container, 'DELETE',
                                      [shard_range])
            broker.delete_db(timestamp)

    def _cleave(self, broker, node, root_account, root_container):
        # Returns True if all available shard ranges are successfully cleaved,
        # False otherwise
        shard_ranges = broker.get_shard_ranges()
        if not shard_ranges:
            return
            return True

        state = broker.get_db_state()
        if state == DB_STATE_SHARDED:
            self.logger.info('Passing over already sharded container %s/%s',
                             broker.account, broker.container)
            return

        if state == DB_STATE_UNSHARDED:
            broker.set_sharding_state()
            return True

        sharding_info = get_sharding_info(broker)
        scan_complete = sharding_info.get('Scan-Done')
@@ -1207,13 +1195,9 @@ class ContainerSharder(ContainerReplicator):
            ranges_todo.append(last_range)

        if not ranges_todo:
            if scan_complete:
                self._sharding_complete(root_account, root_container, broker)
            else:
                self.logger.info('No new shard range for %s/%s found, will '
                                 'try again next cycle',
                                 broker.account, broker.container)
            return
            self.logger.info('No new shard ranges to cleave for %s/%s',
                             broker.account, broker.container)
            return True

        ranges_done = []
        policy_index = broker.storage_policy_index
@@ -1229,7 +1213,7 @@ class ContainerSharder(ContainerReplicator):
                self.logger.warning(str(duex))
                self.logger.increment('failure')
                self.stats['containers_failed'] += 1
                return
                return False

            self._add_shard_metadata(new_broker, root_account,
                                     root_container, shard_range)
@@ -1259,9 +1243,9 @@ class ContainerSharder(ContainerReplicator):
                             new_broker.account, new_broker.container)
            self.cpool.spawn(
                self._replicate_object, new_part, new_broker.db_file, node_id)
            self.logger.info('Node %d sharded %s/%s at shard range %s.',
            self.logger.info('Node %d sharded %s/%s for shard range %s.',
                             node['id'], broker.account, broker.container,
                             shard_range.upper)
                             shard_range)
            self.logger.increment('sharded')
            self.stats['containers_sharded'] += 1

@@ -1275,13 +1259,7 @@ class ContainerSharder(ContainerReplicator):
        # all results
        # TODO: why not use waitall? plus a timeout might be a good idea
        any(self.cpool)

        if scan_complete and len(ranges_done) == len(ranges_todo):
            # we've finished sharding this container.
            self.logger.info('Completed sharding of %s/%s',
                             broker.account, broker.container)
            self._sharding_complete(root_account, root_container, broker)
            self.logger.increment('sharding_complete')
        return len(ranges_done) == len(ranges_todo)

    def run_forever(self, *args, **kwargs):
        """Run the container sharder until stopped."""

@@ -22,7 +22,7 @@ import unittest


from swift.container.backend import ContainerBroker, DB_STATE_UNSHARDED, \
    DB_STATE_SHARDING, DB_STATE_SHARDED
    DB_STATE_SHARDING
from swift.container.sharder import ContainerSharder, RangeAnalyser, \
    update_sharding_info
from swift.common.utils import ShardRange, Timestamp, hash_path, \
@@ -537,7 +537,8 @@ class TestSharder(unittest.TestCase):
        # run cleave - no shard ranges, nothing happens
        node = {'id': 2, 'index': 1}
        with self._mock_sharder() as sharder:
            sharder._cleave(broker, node, 'a', 'c')
            self.assertTrue(sharder._cleave(broker, node, 'a', 'c'))

        self.assertEqual(DB_STATE_UNSHARDED, broker.get_db_state())
        sharder._replicate_object.assert_not_called()
        self.assertFalse(os.path.exists(expected_shard_dbs[0]))
@@ -547,10 +548,11 @@ class TestSharder(unittest.TestCase):

        broker.merge_shard_ranges(
            [dict(shard_range) for shard_range in initial_shard_ranges[:3]])
        broker.set_sharding_state()

        # run cleave again now we have shard ranges
        # run cleave again now we have shard ranges - first batch is cleaved
        with self._mock_sharder() as sharder:
            sharder._cleave(broker, node, 'a', 'c')
            self.assertFalse(sharder._cleave(broker, node, 'a', 'c'))

        self.assertEqual(DB_STATE_SHARDING, broker.get_db_state())
        sharder._replicate_object.assert_has_calls(
@@ -589,9 +591,9 @@ class TestSharder(unittest.TestCase):
        self.assertEqual(['there', mock.ANY],
                         metadata['X-Container-Sysmeta-Shard-Last-1'])

        # run cleave - should process the third range
        # run cleave - should process the third range, which is final range
        with self._mock_sharder() as sharder:
            sharder._cleave(broker, node, 'a', 'c')
            self.assertTrue(sharder._cleave(broker, node, 'a', 'c'))

        self.assertEqual(DB_STATE_SHARDING, broker.get_db_state())
        sharder._replicate_object.assert_called_once_with(
@@ -614,7 +616,7 @@ class TestSharder(unittest.TestCase):

        # run cleave - should be a no-op, all existing ranges have been cleaved
        with self._mock_sharder() as sharder:
            sharder._cleave(broker, node, 'a', 'c')
            self.assertTrue(sharder._cleave(broker, node, 'a', 'c'))

        self.assertEqual(DB_STATE_SHARDING, broker.get_db_state())
        sharder._replicate_object.assert_not_called()
@@ -624,9 +626,8 @@ class TestSharder(unittest.TestCase):
        update_sharding_info(broker, {'Scan-Done': True})

        with self._mock_sharder() as sharder:
            sharder._cleave(broker, node, 'a', 'c')
            self.assertTrue(sharder._cleave(broker, node, 'a', 'c'))

        self.assertEqual(DB_STATE_SHARDED, broker.get_db_state())
        sharder._replicate_object.assert_called_once_with(
            0, expected_shard_dbs[3], 0)
        updated_shard_ranges = broker.get_shard_ranges()
@@ -653,11 +654,11 @@ class TestSharder(unittest.TestCase):
        self.assertEqual(initial_root_info['object_count'],
                         sum(info['object_count'] for info in shard_infos))

        broker.set_sharded_state()
        # run cleave - should be a no-op
        with self._mock_sharder() as sharder:
            sharder._cleave(broker, node, 'a', 'c')
            self.assertTrue(sharder._cleave(broker, node, 'a', 'c'))

        self.assertEqual(DB_STATE_SHARDED, broker.get_db_state())
        sharder._replicate_object.assert_not_called()

    def test_misplaced_objects_root_container(self):