Merge "sharder: Keep cleaving on empty shard ranges"
This commit is contained in:
commit
2d87ad6333
@ -40,6 +40,11 @@ from swift.container.backend import ContainerBroker, \
|
||||
from swift.container.replicator import ContainerReplicator
|
||||
|
||||
|
||||
CLEAVE_SUCCESS = 0
|
||||
CLEAVE_FAILED = 1
|
||||
CLEAVE_EMPTY = 2
|
||||
|
||||
|
||||
def sharding_enabled(broker):
|
||||
# NB all shards will by default have been created with
|
||||
# X-Container-Sysmeta-Sharding set and will therefore be candidates for
|
||||
@ -636,6 +641,7 @@ class ContainerSharder(ContainerReplicator):
|
||||
"""
|
||||
part = self.ring.get_part(shard_range.account, shard_range.container)
|
||||
node = self.find_local_handoff_for_part(part)
|
||||
put_timestamp = Timestamp.now().internal
|
||||
if not node:
|
||||
raise DeviceUnavailable(
|
||||
'No mounted devices found suitable for creating shard broker '
|
||||
@ -644,7 +650,7 @@ class ContainerSharder(ContainerReplicator):
|
||||
shard_broker = ContainerBroker.create_broker(
|
||||
os.path.join(self.root, node['device']), part, shard_range.account,
|
||||
shard_range.container, epoch=shard_range.epoch,
|
||||
storage_policy_index=policy_index)
|
||||
storage_policy_index=policy_index, put_timestamp=put_timestamp)
|
||||
|
||||
# Get the valid info into the broker.container, etc
|
||||
shard_broker.get_info()
|
||||
@ -654,7 +660,7 @@ class ContainerSharder(ContainerReplicator):
|
||||
'X-Container-Sysmeta-Sharding':
|
||||
('True', Timestamp.now().internal)})
|
||||
|
||||
return part, shard_broker, node['id']
|
||||
return part, shard_broker, node['id'], put_timestamp
|
||||
|
||||
def _audit_root_container(self, broker):
|
||||
# This is the root container, and therefore the tome of knowledge,
|
||||
@ -841,7 +847,7 @@ class ContainerSharder(ContainerReplicator):
|
||||
last_index = next_index = 0
|
||||
for obj in objs:
|
||||
if dest_shard_range is None:
|
||||
# no more destinations: yield remainder of batch and return
|
||||
# no more destinations: yield remainder of batch and bail
|
||||
# NB there may be more batches of objects but none of them
|
||||
# will be placed so no point fetching them
|
||||
yield objs[last_index:], None, info
|
||||
@ -930,7 +936,7 @@ class ContainerSharder(ContainerReplicator):
|
||||
continue
|
||||
|
||||
if dest_shard_range not in dest_brokers:
|
||||
part, dest_broker, node_id = self._get_shard_broker(
|
||||
part, dest_broker, node_id, _junk = self._get_shard_broker(
|
||||
dest_shard_range, src_broker.root_path, policy_index)
|
||||
# save the broker info that was sampled prior to the *first*
|
||||
# yielded objects for this destination
|
||||
@ -1159,12 +1165,15 @@ class ContainerSharder(ContainerReplicator):
|
||||
start = time.time()
|
||||
policy_index = broker.storage_policy_index
|
||||
try:
|
||||
shard_part, shard_broker, node_id = self._get_shard_broker(
|
||||
shard_range, broker.root_path, policy_index)
|
||||
shard_part, shard_broker, node_id, put_timestamp = \
|
||||
self._get_shard_broker(shard_range, broker.root_path,
|
||||
policy_index)
|
||||
except DeviceUnavailable as duex:
|
||||
self.logger.warning(str(duex))
|
||||
self._increment_stat('cleaved', 'failure', statsd=True)
|
||||
return False
|
||||
return CLEAVE_FAILED
|
||||
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
|
||||
# only cleave from the retiring db - misplaced objects handler will
|
||||
# deal with any objects in the fresh db
|
||||
@ -1175,13 +1184,36 @@ class ContainerSharder(ContainerReplicator):
|
||||
source_db_id = source_broker.get_info()['id']
|
||||
source_max_row = source_broker.get_max_row()
|
||||
sync_point = shard_broker.get_sync(source_db_id)
|
||||
if sync_point < source_max_row:
|
||||
if sync_point < source_max_row or source_max_row == -1:
|
||||
sync_from_row = max(cleaving_context.last_cleave_to_row or -1,
|
||||
sync_point)
|
||||
objects = None
|
||||
for objects, info in self.yield_objects(
|
||||
source_broker, shard_range,
|
||||
since_row=sync_from_row):
|
||||
shard_broker.merge_items(objects)
|
||||
if objects is None:
|
||||
self.logger.info("Cleaving '%s': %r - zero objects found",
|
||||
broker.path, shard_range)
|
||||
if shard_broker.get_info()['put_timestamp'] == put_timestamp:
|
||||
# This was just created; don't need to replicate this
|
||||
# SR because there was nothing there. So cleanup and
|
||||
# remove the shard_broker from its hand off location.
|
||||
self.delete_db(shard_broker)
|
||||
cleaving_context.cursor = shard_range.upper_str
|
||||
cleaving_context.ranges_done += 1
|
||||
cleaving_context.ranges_todo -= 1
|
||||
if shard_range.upper >= own_shard_range.upper:
|
||||
# cleaving complete
|
||||
cleaving_context.cleaving_done = True
|
||||
cleaving_context.store(broker)
|
||||
# Because nothing was here we wont count it in the shard
|
||||
# batch count.
|
||||
return CLEAVE_EMPTY
|
||||
# Else, it wasn't newly created by us, and
|
||||
# we don't know what's in it or why. Let it get
|
||||
# replicated and counted in the batch count.
|
||||
|
||||
# Note: the max row stored as a sync point is sampled *before*
|
||||
# objects are yielded to ensure that is less than or equal to
|
||||
# the last yielded row. Other sync points are also copied from the
|
||||
@ -1196,8 +1228,6 @@ class ContainerSharder(ContainerReplicator):
|
||||
self.logger.debug("Cleaving '%s': %r - shard db already in sync",
|
||||
broker.path, shard_range)
|
||||
|
||||
own_shard_range = broker.get_own_shard_range()
|
||||
|
||||
replication_quorum = self.existing_shard_replication_quorum
|
||||
if shard_range.includes(own_shard_range):
|
||||
# When shrinking, include deleted own (donor) shard range in
|
||||
@ -1239,7 +1269,7 @@ class ContainerSharder(ContainerReplicator):
|
||||
'%s successes, %s required.', shard_range, broker.path,
|
||||
replication_successes, replication_quorum)
|
||||
self._increment_stat('cleaved', 'failure', statsd=True)
|
||||
return False
|
||||
return CLEAVE_FAILED
|
||||
|
||||
elapsed = round(time.time() - start, 3)
|
||||
self._min_stat('cleaved', 'min_time', elapsed)
|
||||
@ -1256,7 +1286,7 @@ class ContainerSharder(ContainerReplicator):
|
||||
'Cleaved %s for shard range %s in %gs.',
|
||||
broker.path, shard_range, elapsed)
|
||||
self._increment_stat('cleaved', 'success', statsd=True)
|
||||
return True
|
||||
return CLEAVE_SUCCESS
|
||||
|
||||
def _cleave(self, broker):
|
||||
# Returns True if misplaced objects have been moved and the entire
|
||||
@ -1301,23 +1331,30 @@ class ContainerSharder(ContainerReplicator):
|
||||
cleaving_context.ranges_todo, broker.path)
|
||||
|
||||
ranges_done = []
|
||||
for shard_range in ranges_todo[:self.cleave_batch_size]:
|
||||
for shard_range in ranges_todo:
|
||||
if shard_range.state == ShardRange.FOUND:
|
||||
break
|
||||
elif shard_range.state in (ShardRange.CREATED,
|
||||
ShardRange.CLEAVED,
|
||||
ShardRange.ACTIVE):
|
||||
if self._cleave_shard_range(
|
||||
broker, cleaving_context, shard_range):
|
||||
cleave_result = self._cleave_shard_range(
|
||||
broker, cleaving_context, shard_range)
|
||||
if cleave_result == CLEAVE_SUCCESS:
|
||||
ranges_done.append(shard_range)
|
||||
else:
|
||||
if len(ranges_done) == self.cleave_batch_size:
|
||||
break
|
||||
elif cleave_result == CLEAVE_FAILED:
|
||||
break
|
||||
# else, no errors, but no rows found either. keep going,
|
||||
# and don't count it against our batch size
|
||||
else:
|
||||
self.logger.warning('Unexpected shard range state for cleave',
|
||||
shard_range.state)
|
||||
break
|
||||
|
||||
if not ranges_done:
|
||||
# _cleave_shard_range always store()s the context on success; make
|
||||
# sure we *also* do that if we hit a failure right off the bat
|
||||
cleaving_context.store(broker)
|
||||
self.logger.debug(
|
||||
'Cleaved %s shard ranges for %s', len(ranges_done), broker.path)
|
||||
|
@ -1321,6 +1321,65 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertEqual(8, context.cleave_to_row)
|
||||
self.assertEqual(8, context.max_row)
|
||||
|
||||
def test_cleave_root_empty_db_with_ranges(self):
|
||||
broker = self._make_broker()
|
||||
broker.enable_sharding(Timestamp.now())
|
||||
|
||||
shard_bounds = (('', 'd'), ('d', 'x'), ('x', ''))
|
||||
shard_ranges = self._make_shard_ranges(
|
||||
shard_bounds, state=ShardRange.CREATED)
|
||||
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
|
||||
sharder_conf = {'cleave_batch_size': 1}
|
||||
with self._mock_sharder(sharder_conf) as sharder:
|
||||
self.assertTrue(sharder._cleave(broker))
|
||||
|
||||
info_lines = sharder.logger.get_lines_for_level('info')
|
||||
expected_zero_obj = [line for line in info_lines
|
||||
if " - zero objects found" in line]
|
||||
self.assertEqual(len(expected_zero_obj), len(shard_bounds))
|
||||
|
||||
cleaving_context = CleavingContext.load(broker)
|
||||
# even though there is a cleave_batch_size of 1, we don't count empty
|
||||
# ranges when cleaving seeing as they aren't replicated
|
||||
self.assertEqual(cleaving_context.ranges_done, 3)
|
||||
self.assertEqual(cleaving_context.ranges_todo, 0)
|
||||
self.assertTrue(cleaving_context.cleaving_done)
|
||||
|
||||
def test_cleave_root_empty_db_with_pre_existing_shard_db_handoff(self):
|
||||
broker = self._make_broker()
|
||||
broker.enable_sharding(Timestamp.now())
|
||||
|
||||
shard_bounds = (('', 'd'), ('d', 'x'), ('x', ''))
|
||||
shard_ranges = self._make_shard_ranges(
|
||||
shard_bounds, state=ShardRange.CREATED)
|
||||
|
||||
broker.merge_shard_ranges(shard_ranges)
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
|
||||
sharder_conf = {'cleave_batch_size': 1}
|
||||
with self._mock_sharder(sharder_conf) as sharder:
|
||||
# pre-create a shard broker on a handoff location. This will force
|
||||
# the sharder to not skip it but instead force to replicate it and
|
||||
# use up a cleave_batch_size count.
|
||||
sharder._get_shard_broker(shard_ranges[0], broker.root_path,
|
||||
0)
|
||||
self.assertFalse(sharder._cleave(broker))
|
||||
|
||||
info_lines = sharder.logger.get_lines_for_level('info')
|
||||
expected_zero_obj = [line for line in info_lines
|
||||
if " - zero objects found" in line]
|
||||
self.assertEqual(len(expected_zero_obj), 1)
|
||||
|
||||
cleaving_context = CleavingContext.load(broker)
|
||||
# even though there is a cleave_batch_size of 1, we don't count empty
|
||||
# ranges when cleaving seeing as they aren't replicated
|
||||
self.assertEqual(cleaving_context.ranges_done, 1)
|
||||
self.assertEqual(cleaving_context.ranges_todo, 2)
|
||||
self.assertFalse(cleaving_context.cleaving_done)
|
||||
|
||||
def test_cleave_shard(self):
|
||||
broker = self._make_broker(account='.shards_a', container='shard_c')
|
||||
own_shard_range = ShardRange(
|
||||
|
Loading…
Reference in New Issue
Block a user