diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 3912fb35d6..007d511e87 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -40,6 +40,11 @@ from swift.container.backend import ContainerBroker, \ from swift.container.replicator import ContainerReplicator +CLEAVE_SUCCESS = 0 +CLEAVE_FAILED = 1 +CLEAVE_EMPTY = 2 + + def sharding_enabled(broker): # NB all shards will by default have been created with # X-Container-Sysmeta-Sharding set and will therefore be candidates for @@ -636,6 +641,7 @@ class ContainerSharder(ContainerReplicator): """ part = self.ring.get_part(shard_range.account, shard_range.container) node = self.find_local_handoff_for_part(part) + put_timestamp = Timestamp.now().internal if not node: raise DeviceUnavailable( 'No mounted devices found suitable for creating shard broker ' @@ -644,7 +650,7 @@ class ContainerSharder(ContainerReplicator): shard_broker = ContainerBroker.create_broker( os.path.join(self.root, node['device']), part, shard_range.account, shard_range.container, epoch=shard_range.epoch, - storage_policy_index=policy_index) + storage_policy_index=policy_index, put_timestamp=put_timestamp) # Get the valid info into the broker.container, etc shard_broker.get_info() @@ -654,7 +660,7 @@ class ContainerSharder(ContainerReplicator): 'X-Container-Sysmeta-Sharding': ('True', Timestamp.now().internal)}) - return part, shard_broker, node['id'] + return part, shard_broker, node['id'], put_timestamp def _audit_root_container(self, broker): # This is the root container, and therefore the tome of knowledge, @@ -844,7 +850,7 @@ class ContainerSharder(ContainerReplicator): last_index = next_index = 0 for obj in objs: if dest_shard_range is None: - # no more destinations: yield remainder of batch and return + # no more destinations: yield remainder of batch and bail # NB there may be more batches of objects but none of them # will be placed so no point fetching them yield objs[last_index:], None, info @@ -933,7 +939,7 @@ class ContainerSharder(ContainerReplicator): continue if dest_shard_range not in dest_brokers: - part, dest_broker, node_id = self._get_shard_broker( + part, dest_broker, node_id, _junk = self._get_shard_broker( dest_shard_range, src_broker.root_path, policy_index) # save the broker info that was sampled prior to the *first* # yielded objects for this destination @@ -1162,12 +1168,15 @@ class ContainerSharder(ContainerReplicator): start = time.time() policy_index = broker.storage_policy_index try: - shard_part, shard_broker, node_id = self._get_shard_broker( - shard_range, broker.root_path, policy_index) + shard_part, shard_broker, node_id, put_timestamp = \ + self._get_shard_broker(shard_range, broker.root_path, + policy_index) except DeviceUnavailable as duex: self.logger.warning(str(duex)) self._increment_stat('cleaved', 'failure', statsd=True) - return False + return CLEAVE_FAILED + + own_shard_range = broker.get_own_shard_range() # only cleave from the retiring db - misplaced objects handler will # deal with any objects in the fresh db @@ -1178,13 +1187,36 @@ class ContainerSharder(ContainerReplicator): source_db_id = source_broker.get_info()['id'] source_max_row = source_broker.get_max_row() sync_point = shard_broker.get_sync(source_db_id) - if sync_point < source_max_row: + if sync_point < source_max_row or source_max_row == -1: sync_from_row = max(cleaving_context.last_cleave_to_row or -1, sync_point) + objects = None for objects, info in self.yield_objects( source_broker, shard_range, since_row=sync_from_row): shard_broker.merge_items(objects) + if objects is None: + self.logger.info("Cleaving '%s': %r - zero objects found", + broker.path, shard_range) + if shard_broker.get_info()['put_timestamp'] == put_timestamp: + # This was just created; don't need to replicate this + # SR because there was nothing there. So cleanup and + # remove the shard_broker from its hand off location. + self.delete_db(shard_broker) + cleaving_context.cursor = shard_range.upper_str + cleaving_context.ranges_done += 1 + cleaving_context.ranges_todo -= 1 + if shard_range.upper >= own_shard_range.upper: + # cleaving complete + cleaving_context.cleaving_done = True + cleaving_context.store(broker) + # Because nothing was here we wont count it in the shard + # batch count. + return CLEAVE_EMPTY + # Else, it wasn't newly created by us, and + # we don't know what's in it or why. Let it get + # replicated and counted in the batch count. + # Note: the max row stored as a sync point is sampled *before* # objects are yielded to ensure that is less than or equal to # the last yielded row. Other sync points are also copied from the @@ -1199,8 +1231,6 @@ class ContainerSharder(ContainerReplicator): self.logger.debug("Cleaving '%s': %r - shard db already in sync", broker.path, shard_range) - own_shard_range = broker.get_own_shard_range() - replication_quorum = self.existing_shard_replication_quorum if shard_range.includes(own_shard_range): # When shrinking, include deleted own (donor) shard range in @@ -1242,7 +1272,7 @@ class ContainerSharder(ContainerReplicator): '%s successes, %s required.', shard_range, broker.path, replication_successes, replication_quorum) self._increment_stat('cleaved', 'failure', statsd=True) - return False + return CLEAVE_FAILED elapsed = round(time.time() - start, 3) self._min_stat('cleaved', 'min_time', elapsed) @@ -1259,7 +1289,7 @@ class ContainerSharder(ContainerReplicator): 'Cleaved %s for shard range %s in %gs.', broker.path, shard_range, elapsed) self._increment_stat('cleaved', 'success', statsd=True) - return True + return CLEAVE_SUCCESS def _cleave(self, broker): # Returns True if misplaced objects have been moved and the entire @@ -1304,23 +1334,30 @@ class ContainerSharder(ContainerReplicator): cleaving_context.ranges_todo, broker.path) ranges_done = [] - for shard_range in ranges_todo[:self.cleave_batch_size]: + for shard_range in ranges_todo: if shard_range.state == ShardRange.FOUND: break elif shard_range.state in (ShardRange.CREATED, ShardRange.CLEAVED, ShardRange.ACTIVE): - if self._cleave_shard_range( - broker, cleaving_context, shard_range): + cleave_result = self._cleave_shard_range( + broker, cleaving_context, shard_range) + if cleave_result == CLEAVE_SUCCESS: ranges_done.append(shard_range) - else: + if len(ranges_done) == self.cleave_batch_size: + break + elif cleave_result == CLEAVE_FAILED: break + # else, no errors, but no rows found either. keep going, + # and don't count it against our batch size else: self.logger.warning('Unexpected shard range state for cleave', shard_range.state) break if not ranges_done: + # _cleave_shard_range always store()s the context on success; make + # sure we *also* do that if we hit a failure right off the bat cleaving_context.store(broker) self.logger.debug( 'Cleaved %s shard ranges for %s', len(ranges_done), broker.path) diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index a563671802..641d885a12 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -1321,6 +1321,65 @@ class TestSharder(BaseTestSharder): self.assertEqual(8, context.cleave_to_row) self.assertEqual(8, context.max_row) + def test_cleave_root_empty_db_with_ranges(self): + broker = self._make_broker() + broker.enable_sharding(Timestamp.now()) + + shard_bounds = (('', 'd'), ('d', 'x'), ('x', '')) + shard_ranges = self._make_shard_ranges( + shard_bounds, state=ShardRange.CREATED) + + broker.merge_shard_ranges(shard_ranges) + self.assertTrue(broker.set_sharding_state()) + + sharder_conf = {'cleave_batch_size': 1} + with self._mock_sharder(sharder_conf) as sharder: + self.assertTrue(sharder._cleave(broker)) + + info_lines = sharder.logger.get_lines_for_level('info') + expected_zero_obj = [line for line in info_lines + if " - zero objects found" in line] + self.assertEqual(len(expected_zero_obj), len(shard_bounds)) + + cleaving_context = CleavingContext.load(broker) + # even though there is a cleave_batch_size of 1, we don't count empty + # ranges when cleaving seeing as they aren't replicated + self.assertEqual(cleaving_context.ranges_done, 3) + self.assertEqual(cleaving_context.ranges_todo, 0) + self.assertTrue(cleaving_context.cleaving_done) + + def test_cleave_root_empty_db_with_pre_existing_shard_db_handoff(self): + broker = self._make_broker() + broker.enable_sharding(Timestamp.now()) + + shard_bounds = (('', 'd'), ('d', 'x'), ('x', '')) + shard_ranges = self._make_shard_ranges( + shard_bounds, state=ShardRange.CREATED) + + broker.merge_shard_ranges(shard_ranges) + self.assertTrue(broker.set_sharding_state()) + + sharder_conf = {'cleave_batch_size': 1} + with self._mock_sharder(sharder_conf) as sharder: + # pre-create a shard broker on a handoff location. This will force + # the sharder to not skip it but instead force to replicate it and + # use up a cleave_batch_size count. + sharder._get_shard_broker(shard_ranges[0], broker.root_path, + 0) + self.assertFalse(sharder._cleave(broker)) + + info_lines = sharder.logger.get_lines_for_level('info') + expected_zero_obj = [line for line in info_lines + if " - zero objects found" in line] + self.assertEqual(len(expected_zero_obj), 1) + + cleaving_context = CleavingContext.load(broker) + # even though there is a cleave_batch_size of 1, we don't count empty + # ranges when cleaving seeing as they aren't replicated + self.assertEqual(cleaving_context.ranges_done, 1) + self.assertEqual(cleaving_context.ranges_todo, 2) + self.assertFalse(cleaving_context.cleaving_done) + def test_cleave_shard(self): broker = self._make_broker(account='.shards_a', container='shard_c') own_shard_range = ShardRange(