diff --git a/bin/swift-container-sharder b/bin/swift-container-sharder
index 3e6551319b..0374b8d91b 100755
--- a/bin/swift-container-sharder
+++ b/bin/swift-container-sharder
@@ -29,5 +29,9 @@ if __name__ == '__main__':
                       help='Shard containers only in given partitions. '
                            'Comma-separated list. '
                            'Only has effect if --once is used.')
+    parser.add_option('--no-auto-shard', action='store_false',
+                      dest='auto_shard', default=None,
+                      help='Disable auto-sharding. Overrides the auto_shard '
+                           'value in the config file.')
     conf_file, options = parse_options(parser=parser, once=True)
     run_daemon(ContainerSharder, conf_file, **options)
diff --git a/swift/common/utils.py b/swift/common/utils.py
index dbb95cb90e..c22cf9c1e6 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -4848,13 +4848,15 @@ class ShardRange(object):
     SHRINKING = 50
     SHARDING = 60
     SHARDED = 70
+    SHRUNK = 80
     STATES = {FOUND: 'found',
               CREATED: 'created',
               CLEAVED: 'cleaved',
               ACTIVE: 'active',
               SHRINKING: 'shrinking',
               SHARDING: 'sharding',
-              SHARDED: 'sharded'}
+              SHARDED: 'sharded',
+              SHRUNK: 'shrunk'}
     STATES_BY_NAME = dict((v, k) for k, v in STATES.items())

     class OuterBound(object):
@@ -4912,6 +4914,13 @@ class ShardRange(object):
         self.epoch = epoch
         self.reported = reported

+    @classmethod
+    def sort_key(cls, sr):
+        # defines the sort order for shard ranges
+        # note if this ever changes to *not* sort by upper first then it breaks
+        # a key assumption for bisect, which is used by utils.find_shard_range
+        return sr.upper, sr.state, sr.lower, sr.name
+
     @classmethod
     def _encode(cls, value):
         if six.PY2 and isinstance(value, six.text_type):
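Aside: a minimal, self-contained sketch (not Swift code; the names below are invented stand-ins for `ShardRange` and `utils.find_shard_range`) of the bisect assumption that the new `sort_key` comment refers to — because contiguous shard ranges are ordered by `upper` first, the first range whose upper bound is >= an item is the only candidate that can contain it:

```python
from bisect import bisect_left
from collections import namedtuple

# Stand-in for ShardRange with just the fields the sort key uses.
Range = namedtuple('Range', 'lower upper state name')

ranges = sorted([
    Range('m', 't', 10, 'b'),
    Range('', 'm', 10, 'a'),
    Range('t', 'z', 10, 'c'),
], key=lambda r: (r.upper, r.state, r.lower, r.name))  # mirrors sort_key


def find_range(item, ranges):
    # Because the list is ordered by upper bound, bisect finds the one
    # candidate range in O(log n); bounds are (lower, upper] as in Swift.
    uppers = [r.upper for r in ranges]
    i = bisect_left(uppers, item)
    if i < len(ranges) and ranges[i].lower < item <= ranges[i].upper:
        return ranges[i]
    return None


assert find_range('p', ranges).name == 'b'
assert find_range('m', ranges).name == 'a'  # upper bounds are inclusive
```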
diff --git a/swift/container/backend.py b/swift/container/backend.py
index 01e854c3f8..0ce77702d5 100644
--- a/swift/container/backend.py
+++ b/swift/container/backend.py
@@ -409,7 +409,8 @@ class ContainerBroker(DatabaseBroker):
         own_shard_range = self.get_own_shard_range()
         if own_shard_range.state in (ShardRange.SHARDING,
                                      ShardRange.SHRINKING,
-                                     ShardRange.SHARDED):
+                                     ShardRange.SHARDED,
+                                     ShardRange.SHRUNK):
             return bool(self.get_shard_ranges())
         return False

@@ -1775,10 +1776,7 @@ class ContainerBroker(DatabaseBroker):
                 include_deleted=include_deleted, states=states,
                 include_own=include_own,
                 exclude_others=exclude_others)]
-        # note if this ever changes to *not* sort by upper first then it breaks
-        # a key assumption for bisect, which is used by utils.find_shard_ranges
-        shard_ranges.sort(key=lambda sr: (
-            sr.upper, sr.state, sr.lower, sr.name))
+        shard_ranges.sort(key=ShardRange.sort_key)
         if includes:
             shard_range = find_shard_range(includes, shard_ranges)
             return [shard_range] if shard_range else []
diff --git a/swift/container/sharder.py b/swift/container/sharder.py
index a942992029..dfa7ebe413 100644
--- a/swift/container/sharder.py
+++ b/swift/container/sharder.py
@@ -313,6 +313,12 @@ class CleavingContext(object):
         self.cleaving_done = False
         self.cleave_to_row = self.max_row

+    def range_done(self, new_cursor):
+        self.ranges_done += 1
+        self.ranges_todo -= 1
+        if new_cursor is not None:
+            self.cursor = new_cursor
+
     def done(self):
         return all((self.misplaced_done, self.cleaving_done,
                     self.max_row == self.cleave_to_row))
@@ -689,7 +695,8 @@ class ContainerSharder(ContainerReplicator):
         own_shard_range = broker.get_own_shard_range()
         if own_shard_range.state in (ShardRange.SHARDING,
                                      ShardRange.SHARDED):
-            shard_ranges = broker.get_shard_ranges()
+            shard_ranges = [sr for sr in broker.get_shard_ranges()
+                            if sr.state != ShardRange.SHRINKING]
             missing_ranges = find_missing_ranges(shard_ranges)
             if missing_ranges:
                 warnings.append(
@@ -698,6 +705,10 @@ class ContainerSharder(ContainerReplicator):
                               for lower, upper in missing_ranges]))

         for state in ShardRange.STATES:
+            if state == ShardRange.SHRINKING:
+                # Shrinking is how we resolve overlaps; we've got to
+                # allow multiple shards in that state
+                continue
             shard_ranges = broker.get_shard_ranges(states=state)
             overlaps = find_overlapping_ranges(shard_ranges)
             for overlapping_ranges in overlaps:
@@ -718,7 +729,6 @@ class ContainerSharder(ContainerReplicator):
         return True

     def _audit_shard_container(self, broker):
-        # Get the root view of the world.
         self._increment_stat('audit_shard', 'attempted')
         warnings = []
         errors = []
@@ -728,8 +738,10 @@ class ContainerSharder(ContainerReplicator):

         own_shard_range = broker.get_own_shard_range(no_default=True)

-        shard_range = None
+        shard_ranges = own_shard_range_from_root = None
         if own_shard_range:
+            # Get the root view of the world, at least that part of the world
+            # that overlaps with this shard's namespace
             shard_ranges = self._fetch_shard_ranges(
                 broker, newest=True,
                 params={'marker': str_to_wsgi(own_shard_range.lower_str),
@@ -737,15 +749,18 @@ class ContainerSharder(ContainerReplicator):
                 include_deleted=True)
             if shard_ranges:
                 for shard_range in shard_ranges:
-                    if (shard_range.lower == own_shard_range.lower and
-                            shard_range.upper == own_shard_range.upper and
-                            shard_range.name == own_shard_range.name):
+                    # look for this shard range in the list of shard ranges
+                    # received from root; the root may have different lower and
+                    # upper bounds for this shard (e.g. if this shard has been
+                    # expanded in the root to accept a shrinking shard) so we
+                    # only match on name.
+                    if shard_range.name == own_shard_range.name:
+                        own_shard_range_from_root = shard_range
                         break
                 else:
                     # this is not necessarily an error - some replicas of the
                     # root may not yet know about this shard container
                     warnings.append('root has no matching shard range')
-                    shard_range = None
             elif not own_shard_range.deleted:
                 warnings.append('unable to get shard ranges from root')
             # else, our shard range is deleted, so root may have reclaimed it
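Aside: a toy illustration (invented data, plain dicts instead of `ShardRange`) of why the audit above matches the root's copy of the shard's own range by name only — the root may hold the same shard with wider bounds, so matching on bounds would miss it:

```python
# Local view of this shard's own range.
own = {'name': '.shards_a/c_1', 'lower': 'k', 'upper': 't'}

# Ranges returned by the root for the overlapping namespace; the root has
# expanded this shard's bounds, e.g. to accept a shrinking neighbour.
from_root = [
    {'name': '.shards_a/c_0', 'lower': '', 'upper': 'k'},
    {'name': '.shards_a/c_1', 'lower': 'f', 'upper': 't'},
]

# Matching on name still finds it; matching on (lower, upper) would not.
own_from_root = next(
    (sr for sr in from_root if sr['name'] == own['name']), None)
assert own_from_root is not None and own_from_root['lower'] == 'f'
```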
@@ -764,13 +779,39 @@ class ContainerSharder(ContainerReplicator):
             self._increment_stat('audit_shard', 'failure', statsd=True)
             return False

-        if shard_range:
-            self.logger.debug('Updating shard from root %s', dict(shard_range))
-            broker.merge_shard_ranges(shard_range)
+        if own_shard_range_from_root:
+            # iff we find our own shard range in the root response, merge it
+            # and reload own shard range (note: own_shard_range_from_root may
+            # not necessarily be 'newer' than the own shard range we already
+            # have, but merging will get us to the 'newest' state)
+            self.logger.debug('Updating own shard range from root')
+            broker.merge_shard_ranges(own_shard_range_from_root)
+            orig_own_shard_range = own_shard_range
             own_shard_range = broker.get_own_shard_range()
+            if (orig_own_shard_range != own_shard_range or
+                    orig_own_shard_range.state != own_shard_range.state):
+                self.logger.debug(
+                    'Updated own shard range from %s to %s',
+                    orig_own_shard_range, own_shard_range)
+            if own_shard_range.state in (ShardRange.SHRINKING,
+                                         ShardRange.SHRUNK):
+                # If the up-to-date state is shrinking, save off *all* shards
+                # returned because these may contain shards into which this
+                # shard is to shrink itself; shrinking is the only case when we
+                # want to learn about *other* shard ranges from the root.
+                # We need to include shrunk state too, because one replica of a
+                # shard may already have moved the own_shard_range state to
+                # shrunk while another replica may still be in the process of
+                # shrinking.
+                other_shard_ranges = [sr for sr in shard_ranges
+                                      if sr is not own_shard_range_from_root]
+                self.logger.debug('Updating %s other shard range(s) from root',
+                                  len(other_shard_ranges))
+                broker.merge_shard_ranges(other_shard_ranges)

         delete_age = time.time() - self.reclaim_age
-        if (own_shard_range.state == ShardRange.SHARDED and
+        deletable_states = (ShardRange.SHARDED, ShardRange.SHRUNK)
+        if (own_shard_range.state in deletable_states and
                 own_shard_range.deleted and
                 own_shard_range.timestamp < delete_age and
                 broker.empty()):
@@ -1100,7 +1141,7 @@ class ContainerSharder(ContainerReplicator):
         own_shard_range = broker.get_own_shard_range()
         shard_ranges = broker.get_shard_ranges()
         if shard_ranges and shard_ranges[-1].upper >= own_shard_range.upper:
-            self.logger.debug('Scan already completed for %s',
+            self.logger.debug('Scan for shard ranges already completed for %s',
                               quote(broker.path))
             return 0
@@ -1234,9 +1275,7 @@ class ContainerSharder(ContainerReplicator):
                 # SR because there was nothing there. So cleanup and
                 # remove the shard_broker from its hand off location.
                 self.delete_db(shard_broker)
-                cleaving_context.cursor = shard_range.upper_str
-                cleaving_context.ranges_done += 1
-                cleaving_context.ranges_todo -= 1
+                cleaving_context.range_done(shard_range.upper_str)
                 if shard_range.upper >= own_shard_range.upper:
                     # cleaving complete
                     cleaving_context.cleaving_done = True
@@ -1269,7 +1308,7 @@ class ContainerSharder(ContainerReplicator):
             # will atomically update its namespace *and* delete the donor.
             # Don't do this when sharding a shard because the donor
             # namespace should not be deleted until all shards are cleaved.
-            if own_shard_range.update_state(ShardRange.SHARDED):
+            if own_shard_range.update_state(ShardRange.SHRUNK):
                 own_shard_range.set_deleted()
                 broker.merge_shard_ranges(own_shard_range)
                 shard_broker.merge_shard_ranges(own_shard_range)
@@ -1309,9 +1348,7 @@ class ContainerSharder(ContainerReplicator):
             self._min_stat('cleaved', 'min_time', elapsed)
             self._max_stat('cleaved', 'max_time', elapsed)
             broker.merge_shard_ranges(shard_range)
-            cleaving_context.cursor = shard_range.upper_str
-            cleaving_context.ranges_done += 1
-            cleaving_context.ranges_todo -= 1
+            cleaving_context.range_done(shard_range.upper_str)
             if shard_range.upper >= own_shard_range.upper:
                 # cleaving complete
                 cleaving_context.cleaving_done = True
@@ -1366,8 +1403,15 @@

         ranges_done = []
         for shard_range in ranges_todo:
-            if shard_range.state == ShardRange.FOUND:
-                break
+            if shard_range.state == ShardRange.SHRINKING:
+                # Ignore shrinking shard ranges: we never want to cleave
+                # objects to a shrinking shard. Shrinking shard ranges are to
+                # be expected in a root; shrinking shard ranges (other than own
+                # shard range) are not normally expected in a shard but can
+                # occur if there is an overlapping shard range that has been
+                # discovered from the root.
+                cleaving_context.range_done(None)  # don't move the cursor
+                continue
             elif shard_range.state in (ShardRange.CREATED,
                                        ShardRange.CLEAVED,
                                        ShardRange.ACTIVE):
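Aside: a minimal sketch (invented `Context` class; the real one is `CleavingContext`) of the cleave-loop bookkeeping above — a shrinking range is counted as done without advancing the cursor (`range_done(None)`), while a successfully cleaved range advances the cursor to its upper bound:

```python
class Context:
    def __init__(self, todo):
        self.cursor = ''
        self.ranges_done = 0
        self.ranges_todo = todo

    def range_done(self, new_cursor):
        # same semantics as the new CleavingContext.range_done above
        self.ranges_done += 1
        self.ranges_todo -= 1
        if new_cursor is not None:
            self.cursor = new_cursor


SHRINKING, CREATED = 'shrinking', 'created'
ranges = [('b', CREATED), ('c', SHRINKING), ('d', CREATED)]
ctx = Context(len(ranges))
for upper, state in ranges:
    if state == SHRINKING:
        ctx.range_done(None)  # skip: never cleave into a shrinking shard
        continue
    ctx.range_done(upper)     # pretend the cleave succeeded

# the shrinking range was counted but did not move the cursor
assert (ctx.cursor, ctx.ranges_done, ctx.ranges_todo) == ('d', 3, 0)
```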
@@ -1382,8 +1426,7 @@
                 # else, no errors, but no rows found either. keep going,
                 # and don't count it against our batch size
             else:
-                self.logger.warning('Unexpected shard range state for cleave',
-                                    shard_range.state)
+                self.logger.info('Stopped cleave at unready %s', shard_range)
                 break

         if not ranges_done:
@@ -1407,7 +1450,12 @@
         for sr in modified_shard_ranges:
             sr.update_state(ShardRange.ACTIVE)
         own_shard_range = broker.get_own_shard_range()
-        own_shard_range.update_state(ShardRange.SHARDED)
+        if own_shard_range.state in (ShardRange.SHRINKING,
+                                     ShardRange.SHRUNK):
+            next_state = ShardRange.SHRUNK
+        else:
+            next_state = ShardRange.SHARDED
+        own_shard_range.update_state(next_state)
         own_shard_range.update_meta(0, 0)
         if (not broker.is_root_container() and
                 not own_shard_range.deleted):
@@ -1518,7 +1566,8 @@
             own_shard_range = broker.get_own_shard_range()
             if own_shard_range.state in (ShardRange.SHARDING,
                                          ShardRange.SHRINKING,
-                                         ShardRange.SHARDED):
+                                         ShardRange.SHARDED,
+                                         ShardRange.SHRUNK):
                 if broker.get_shard_ranges():
                     # container has been given shard ranges rather than
                     # found them e.g. via replication or a shrink event
@@ -1593,6 +1642,7 @@
         - if not a root container, reports shard range stats to the root
           container
         """
+        self.logger.info('Container sharder cycle starting, auto-sharding %s', self.auto_shard)
         if isinstance(devices_to_shard, (list, tuple)):
@@ -1659,8 +1709,14 @@

         self._report_stats()

+    def _set_auto_shard_from_command_line(self, **kwargs):
+        auto_shard = kwargs.get('auto_shard', None)
+        if auto_shard is not None:
+            self.auto_shard = config_true_value(auto_shard)
+
     def run_forever(self, *args, **kwargs):
         """Run the container sharder until stopped."""
+        self._set_auto_shard_from_command_line(**kwargs)
         self.reported = time.time()
         time.sleep(random() * self.interval)
         while True:
@@ -1683,6 +1739,7 @@
         override_options = parse_override_options(once=True, **kwargs)
         devices_to_shard = override_options.devices or Everything()
         partitions_to_shard = override_options.partitions or Everything()
+        self._set_auto_shard_from_command_line(**kwargs)
         begin = self.reported = time.time()
         self._one_shard_cycle(devices_to_shard=devices_to_shard,
                               partitions_to_shard=partitions_to_shard)
diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py
index 118d1ebc64..2fad4b6581 100644
--- a/test/probe/test_sharder.py
+++ b/test/probe/test_sharder.py
@@ -28,7 +28,8 @@ from swift.common.manager import Manager
 from swift.common.memcached import MemcacheRing
 from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
     quorum_size, config_true_value, Timestamp
-from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING
+from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \
+    SHARDED
 from swift.container.sharder import CleavingContext
 from swiftclient import client, get_auth, ClientException
@@ -1611,7 +1612,7 @@
             broker = self.get_broker(
                 part, node, donor.account, donor.container)
             own_sr = broker.get_own_shard_range()
-            self.assertEqual(ShardRange.SHARDED, own_sr.state)
+            self.assertEqual(ShardRange.SHRUNK, own_sr.state)
             self.assertTrue(own_sr.deleted)

         # delete all the second shard's object apart from 'alpha'
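Aside: a hedged sketch of the command-line override plumbing added above. The real daemon uses `swift.common.utils.config_true_value`, which is approximated here so the snippet is self-contained; the `Sharder` class below is an invented stand-in:

```python
def config_true_value(value):
    # rough approximation of Swift's helper for this sketch
    return value is True or str(value).lower() in ('true', '1', 'yes', 'on')


class Sharder:
    def __init__(self, conf):
        # the config file value is the default
        self.auto_shard = config_true_value(conf.get('auto_shard', 'false'))

    def _set_auto_shard_from_command_line(self, **kwargs):
        # None means "no override given on the command line"; store_false
        # with default=None is what makes --no-auto-shard distinguishable
        # from "flag not passed"
        auto_shard = kwargs.get('auto_shard', None)
        if auto_shard is not None:
            self.auto_shard = config_true_value(auto_shard)


sharder = Sharder({'auto_shard': 'true'})
sharder._set_auto_shard_from_command_line(auto_shard=False)  # --no-auto-shard
assert sharder.auto_shard is False
```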
@@ -2417,6 +2418,16 @@

 class TestManagedContainerSharding(BaseTestContainerSharding):
     '''Test sharding using swift-manage-shard-ranges'''
+
+    def sharders_once(self, **kwargs):
+        # inhibit auto_sharding regardless of the config setting
+        additional_args = kwargs.get('additional_args', [])
+        if not isinstance(additional_args, list):
+            additional_args = [additional_args]
+        additional_args.append('--no-auto-shard')
+        kwargs['additional_args'] = additional_args
+        self.sharders.once(**kwargs)
+
     def test_manage_shard_ranges(self):
         obj_names = self._make_object_names(4)
         self.put_objects(obj_names)
@@ -2429,7 +2440,7 @@

         # sanity check: we don't have nearly enough objects for this to shard
         # automatically
-        self.sharders.once(number=self.brain.node_numbers[0],
+        self.sharders_once(number=self.brain.node_numbers[0],
                            additional_args='--partitions=%s' % self.brain.part)
         self.assert_container_state(self.brain.nodes[0], 'unsharded', 0)
@@ -2442,7 +2453,7 @@
         # "Run container-replicator to replicate them to other nodes."
         self.replicators.once()
         # "Run container-sharder on all nodes to shard the container."
-        self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
+        self.sharders_once(additional_args='--partitions=%s' % self.brain.part)

         # Everybody's settled
         self.assert_container_state(self.brain.nodes[0], 'sharded', 2)
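Aside: a trivial, standalone sketch of the argument normalization used by the `sharders_once` helper above (the helper name below is invented) — a single string is wrapped into a list before `--no-auto-shard` is appended, so callers may pass either form:

```python
def with_no_auto_shard(additional_args=None):
    # mirrors the normalization in sharders_once above
    additional_args = additional_args or []
    if not isinstance(additional_args, list):
        additional_args = [additional_args]
    additional_args.append('--no-auto-shard')
    return additional_args


assert with_no_auto_shard('--partitions=1') == ['--partitions=1',
                                                '--no-auto-shard']
assert with_no_auto_shard() == ['--no-auto-shard']
```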
@@ -2460,23 +2471,42 @@
         # run replicators first time to get sync points set
         self.replicators.once()

+        # find 4 shard ranges on nodes[0] - let's denote these ranges 0.0, 0.1,
+        # 0.2 and 0.3 that are installed with epoch_0
         subprocess.check_output([
             'swift-manage-shard-ranges',
             self.get_db_file(self.brain.part, self.brain.nodes[0]),
             'find_and_replace', '2', '--enable'], stderr=subprocess.STDOUT)
-        self.assert_container_state(self.brain.nodes[0], 'unsharded', 4)
+        shard_ranges_0 = self.assert_container_state(self.brain.nodes[0],
+                                                     'unsharded', 4)

-        # *Also* go find shard ranges on *another node*, like a dumb-dumb
+        # *Also* go find 3 shard ranges on *another node*, like a dumb-dumb -
+        # let's denote these ranges 1.0, 1.1 and 1.2 that are installed with
+        # epoch_1
         subprocess.check_output([
             'swift-manage-shard-ranges',
             self.get_db_file(self.brain.part, self.brain.nodes[1]),
             'find_and_replace', '3', '--enable'], stderr=subprocess.STDOUT)
-        self.assert_container_state(self.brain.nodes[1], 'unsharded', 3)
+        shard_ranges_1 = self.assert_container_state(self.brain.nodes[1],
+                                                     'unsharded', 3)

-        # Run things out of order (they were likely running as daemons anyway)
-        self.sharders.once(number=self.brain.node_numbers[0],
+        # Run sharder in specific order so that the replica with the older
+        # epoch_0 starts sharding first - this will prove problematic later!
+        # On first pass the first replica passes audit, creates shards and then
+        # syncs shard ranges with the other replicas. It proceeds to cleave
+        # shard 0.0, but after 0.0 cleaving stalls because it will now have
+        # shard range 1.0 in 'found' state from the other replica that it
+        # cannot yet cleave.
+        self.sharders_once(number=self.brain.node_numbers[0],
                            additional_args='--partitions=%s' % self.brain.part)
-        self.sharders.once(number=self.brain.node_numbers[1],
+
+        # On first pass the second replica passes audit (it has its own found
+        # ranges and the first replica's created shard ranges but none in the
+        # same state overlap), creates its shards and then syncs shard ranges
+        # with the other replicas. All of the 7 shard ranges on this replica
+        # are now in created state so it proceeds to cleave the first two shard
+        # ranges, 0.1 and 1.0.
+        self.sharders_once(number=self.brain.node_numbers[1],
                            additional_args='--partitions=%s' % self.brain.part)

         self.replicators.once()
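Aside: an illustrative helper (invented and much simplified; the sharder's real check is `find_overlapping_ranges`) showing why running `find_and_replace` independently on two nodes, as above, necessarily produces the overlaps the audit then complains about — two half-open `(lower, upper]` ranges overlap when each starts below the other's end:

```python
def overlaps(a, b):
    # a and b are (lower, upper] pairs; '' plays the unbounded lower bound
    # and '\xff' stands in for an unbounded upper bound in this sketch
    return a[0] < b[1] and b[0] < a[1]


ranges_0 = [('', 'd'), ('d', 'p'), ('p', '\xff')]   # found on node 0
ranges_1 = [('', 'i'), ('i', '\xff')]               # found on node 1

clashes = [(r0, r1) for r0 in ranges_0 for r1 in ranges_1
           if overlaps(r0, r1)]
# the two independently-found sets cannot avoid overlapping each other
assert clashes
```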
@@ -2486,14 +2516,164 @@
         # There's a race: the third replica may be sharding, may be unsharded
         # Try it again a few times
-        self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
+        self.sharders_once(additional_args='--partitions=%s' % self.brain.part)
         self.replicators.once()
-        self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
+        self.sharders_once(additional_args='--partitions=%s' % self.brain.part)

-        # It's not really fixing itself...
-        self.assert_container_state(self.brain.nodes[0], 'sharding', 7)
-        self.assert_container_state(self.brain.nodes[1], 'sharding', 7)
+        # It's not really fixing itself... the sharder audit will detect
+        # overlapping ranges which prevents cleaving proceeding; expect the
+        # shard ranges to be mostly still in created state, with one or two
+        # possibly cleaved during first pass before the sharding got stalled
+        shard_ranges = self.assert_container_state(self.brain.nodes[0],
+                                                   'sharding', 7)
+        for sr in shard_ranges:
+            self.assertIn(sr.state, (ShardRange.CREATED, ShardRange.CLEAVED))
+        shard_ranges = self.assert_container_state(self.brain.nodes[1],
+                                                   'sharding', 7)
+        for sr in shard_ranges:
+            self.assertIn(sr.state, (ShardRange.CREATED, ShardRange.CLEAVED))

         # But hey, at least listings still work! They're just going to get
         # horribly out of date as more objects are added
         self.assert_container_listing(obj_names)
+
+        # Let's pretend that some actor in the system has determined that the
+        # second set of 3 shard ranges (1.*) are correct and the first set of 4
+        # (0.*) are not desired, so shrink shard ranges 0.*. We've already
+        # checked they are in cleaved or created state so it's ok to move them
+        # to shrinking.
+        # TODO: replace this db manipulation if/when manage_shard_ranges can
+        # manage shrinking...
+        for sr in shard_ranges_0:
+            self.assertTrue(sr.update_state(ShardRange.SHRINKING))
+            sr.epoch = sr.state_timestamp = Timestamp.now()
+        broker = self.get_broker(self.brain.part, self.brain.nodes[0])
+        broker.merge_shard_ranges(shard_ranges_0)
+
+        # make sure all root replicas now sync their shard ranges
+        self.replicators.once()
+
+        # At this point one of the first two replicas may have done some useful
+        # cleaving of 1.* shards, the other may have only cleaved 0.* shards,
+        # and the third replica may have cleaved no shards. We therefore need
+        # two more passes of the sharder to get to a predictable state where
+        # all replicas have cleaved all three 1.* shards.
+        self.sharders_once()
+        self.sharders_once()
+
+        # now we expect all replicas to have just the three 1.* shards, with
+        # the 0.* shards all deleted
+        brokers = {}
+        orig_shard_ranges = sorted(shard_ranges_0 + shard_ranges_1,
+                                   key=ShardRange.sort_key)
+        for node in (0, 1, 2):
+            with annotate_failure('node %s' % node):
+                broker = self.get_broker(self.brain.part,
+                                         self.brain.nodes[node])
+                brokers[node] = broker
+                shard_ranges = broker.get_shard_ranges()
+                self.assertEqual(shard_ranges_1, shard_ranges)
+                shard_ranges = broker.get_shard_ranges(include_deleted=True)
+                self.assertLengthEqual(shard_ranges, len(orig_shard_ranges))
+                self.assertEqual(orig_shard_ranges, shard_ranges)
+                self.assertEqual(ShardRange.SHARDED,
+                                 broker._own_shard_range().state)
+
+        # Sadly, the first replica to start sharding is still reporting its db
+        # state to be 'unsharded' because, although it has sharded, its shard
+        # db epoch (epoch_0) does not match its own shard range epoch
+        # (epoch_1), and that is because the second replica (with epoch_1)
+        # updated the own shard range and replicated it to all other replicas.
+        # If we had run the sharder on the second replica before the first
+        # replica, then by the time the first replica started sharding it would
+        # have learnt the newer epoch_1 and we wouldn't see this inconsistency.
+        self.assertEqual(UNSHARDED, brokers[0].get_db_state())
+        self.assertEqual(SHARDED, brokers[1].get_db_state())
+        self.assertEqual(SHARDED, brokers[2].get_db_state())
+        epoch_1 = brokers[1].db_epoch
+        self.assertEqual(epoch_1, brokers[2].db_epoch)
+        self.assertLess(brokers[0].db_epoch, epoch_1)
+        # the root replica that thinks it is unsharded is problematic - it will
+        # not return shard ranges for listings, but has no objects, so it's
+        # luck of the draw whether we get a listing or not at this point :(
+
+        # check the unwanted shards did shrink away...
+        for shard_range in shard_ranges_0:
+            with annotate_failure(shard_range):
+                found_for_shard = self.categorize_container_dir_content(
+                    shard_range.account, shard_range.container)
+                self.assertLengthEqual(found_for_shard['shard_dbs'], 3)
+                actual = []
+                for shard_db in found_for_shard['shard_dbs']:
+                    broker = ContainerBroker(shard_db)
+                    own_sr = broker.get_own_shard_range()
+                    actual.append(
+                        (broker.get_db_state(), own_sr.state, own_sr.deleted))
+                self.assertEqual([(SHARDED, ShardRange.SHRUNK, True)] * 3,
+                                 actual)
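Aside: a toy model (invented function, not Swift's real db-state logic) of the epoch reasoning in the comments above — a replica only reports SHARDED once the epoch embedded in its db filename matches the epoch of its own shard range:

```python
UNSHARDED, SHARDED = 'unsharded', 'sharded'


def db_state(db_epoch, own_range_epoch, has_sharded):
    # crude sketch: sharding is only "visible" when the epochs agree
    if has_sharded and db_epoch == own_range_epoch:
        return SHARDED
    return UNSHARDED


epoch_0, epoch_1 = 1000.0, 1001.0
# the first replica sharded under epoch_0 but now owns a range stamped
# epoch_1, so it still reports unsharded
assert db_state(epoch_0, epoch_1, has_sharded=True) == UNSHARDED
assert db_state(epoch_1, epoch_1, has_sharded=True) == SHARDED
```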
+
+        # Run the sharders again: the first replica that is still 'unsharded'
+        # because of the older epoch_0 in its db filename will now start to
+        # shard again with a newer epoch_1 db, and will start to re-cleave the
+        # 3 active shards, albeit with zero objects to cleave.
+        self.sharders_once()
+        for node in (0, 1, 2):
+            with annotate_failure('node %s' % node):
+                broker = self.get_broker(self.brain.part,
+                                         self.brain.nodes[node])
+                brokers[node] = broker
+                shard_ranges = broker.get_shard_ranges()
+                self.assertEqual(shard_ranges_1, shard_ranges)
+                shard_ranges = broker.get_shard_ranges(include_deleted=True)
+                self.assertLengthEqual(shard_ranges, len(orig_shard_ranges))
+                self.assertEqual(orig_shard_ranges, shard_ranges)
+                self.assertEqual(ShardRange.SHARDED,
+                                 broker._own_shard_range().state)
+                self.assertEqual(epoch_1, broker.db_epoch)
+        self.assertIn(brokers[0].get_db_state(), (SHARDING, SHARDED))
+        self.assertEqual(SHARDED, brokers[1].get_db_state())
+        self.assertEqual(SHARDED, brokers[2].get_db_state())
+
+        # This cycle of the sharders also guarantees that all shards have had
+        # their state updated to ACTIVE from the root; this was not necessarily
+        # true at end of the previous sharder pass because a shard audit (when
+        # the shard is updated from a root) may have happened before all root
+        # replicas have had their shard ranges transitioned to ACTIVE.
+        for shard_range in shard_ranges_1:
+            with annotate_failure(shard_range):
+                found_for_shard = self.categorize_container_dir_content(
+                    shard_range.account, shard_range.container)
+                self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
+                actual = []
+                for shard_db in found_for_shard['normal_dbs']:
+                    broker = ContainerBroker(shard_db)
+                    own_sr = broker.get_own_shard_range()
+                    actual.append(
+                        (broker.get_db_state(), own_sr.state, own_sr.deleted))
+                self.assertEqual([(UNSHARDED, ShardRange.ACTIVE, False)] * 3,
+                                 actual)
+
+        # We may need one more pass of the sharder before all three shard
+        # ranges are cleaved (2 per pass) and all the root replicas are
+        # predictably in sharded state. Note: the accelerated cleaving of >2
+        # zero-object shard ranges per cycle is defeated if a shard happens
+        # to exist on the same node as the root because the root's cleaving
+        # process doesn't think that it created the shard db and will therefore
+        # replicate it as per a normal cleave.
+        self.sharders_once()
+        for node in (0, 1, 2):
+            with annotate_failure('node %s' % node):
+                broker = self.get_broker(self.brain.part,
+                                         self.brain.nodes[node])
+                brokers[node] = broker
+                shard_ranges = broker.get_shard_ranges()
+                self.assertEqual(shard_ranges_1, shard_ranges)
+                shard_ranges = broker.get_shard_ranges(include_deleted=True)
+                self.assertLengthEqual(shard_ranges, len(orig_shard_ranges))
+                self.assertEqual(orig_shard_ranges, shard_ranges)
+                self.assertEqual(ShardRange.SHARDED,
+                                 broker._own_shard_range().state)
+                self.assertEqual(epoch_1, broker.db_epoch)
+                self.assertEqual(SHARDED, broker.get_db_state())
+
+        # Finally, with all root replicas in a consistent state, the listing
+        # will be predictably correct
+        self.assert_container_listing(obj_names)
diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py
index 9555bdc073..45087ec8a5 100644
--- a/test/unit/container/test_sharder.py
+++ b/test/unit/container/test_sharder.py
@@ -109,8 +109,11 @@ class BaseTestSharder(unittest.TestCase):
         return broker

     def _make_shard_ranges(self, bounds, state=None, object_count=0):
+        if not isinstance(state, (tuple, list)):
+            state = [state] * len(bounds)
+        state_iter = iter(state)
         return [ShardRange('.shards_a/c_%s' % upper, Timestamp.now(),
-                           lower, upper, state=state,
+                           lower, upper, state=next(state_iter),
                            object_count=object_count)
                 for lower, upper in bounds]
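Aside: a sketch of the test-helper pattern above (names invented; the real helper builds `ShardRange` objects) — accept either a single state or a per-range list of states when building test ranges:

```python
def make_ranges(bounds, state=None):
    # a scalar state is broadcast to every range, as in _make_shard_ranges
    if not isinstance(state, (tuple, list)):
        state = [state] * len(bounds)
    state_iter = iter(state)
    return [(lower, upper, next(state_iter)) for lower, upper in bounds]


assert make_ranges([('', 'm'), ('m', '')], state=10) == [
    ('', 'm', 10), ('m', '', 10)]
assert make_ranges([('', 'm'), ('m', '')], state=[10, 20]) == [
    ('', 'm', 10), ('m', '', 20)]
```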
@@ -2219,6 +2222,71 @@
                          shard_broker.get_syncs())
         self.assertEqual(objects[5:], shard_broker.get_objects())

+    def test_cleave_skips_shrinking_and_stops_at_found(self):
+        broker = self._make_broker()
+        broker.enable_sharding(Timestamp.now())
+        shard_bounds = (('', 'b'),
+                        ('b', 'c'),
+                        ('b', 'd'),
+                        ('d', 'f'),
+                        ('f', ''))
+        # make sure there is an object in every shard range so cleaving will
+        # occur in batches of 2
+        objects = [
+            ('a', self.ts_encoded(), 10, 'text/plain', 'etag_a', 0, 0),
+            ('b', self.ts_encoded(), 10, 'text/plain', 'etag_b', 0, 0),
+            ('c', self.ts_encoded(), 1, 'text/plain', 'etag_c', 0, 0),
+            ('d', self.ts_encoded(), 2, 'text/plain', 'etag_d', 0, 0),
+            ('e', self.ts_encoded(), 3, 'text/plain', 'etag_e', 0, 0),
+            ('f', self.ts_encoded(), 100, 'text/plain', 'etag_f', 0, 0),
+            ('x', self.ts_encoded(), 0, '', '', 1, 0),  # deleted
+            ('z', self.ts_encoded(), 1000, 'text/plain', 'etag_z', 0, 0)
+        ]
+        for obj in objects:
+            broker.put_object(*obj)
+        shard_ranges = self._make_shard_ranges(
+            shard_bounds, state=[ShardRange.CREATED,
+                                 ShardRange.SHRINKING,
+                                 ShardRange.CREATED,
+                                 ShardRange.CREATED,
+                                 ShardRange.FOUND])
+        broker.merge_shard_ranges(shard_ranges)
+        self.assertTrue(broker.set_sharding_state())
+
+        # run cleave - first batch is cleaved, shrinking range doesn't count
+        # towards batch size of 2 but does count towards ranges_done
+        with self._mock_sharder() as sharder:
+            self.assertFalse(sharder._cleave(broker))
+        context = CleavingContext.load(broker)
+        self.assertTrue(context.misplaced_done)
+        self.assertFalse(context.cleaving_done)
+        self.assertEqual(shard_ranges[2].upper_str, context.cursor)
+        self.assertEqual(3, context.ranges_done)
+        self.assertEqual(2, context.ranges_todo)
+
+        # run cleave - stops at shard range in FOUND state
+        with self._mock_sharder() as sharder:
+            self.assertFalse(sharder._cleave(broker))
+        context = CleavingContext.load(broker)
+        self.assertTrue(context.misplaced_done)
+        self.assertFalse(context.cleaving_done)
+        self.assertEqual(shard_ranges[3].upper_str, context.cursor)
+        self.assertEqual(4, context.ranges_done)
+        self.assertEqual(1, context.ranges_todo)
+
+        # run cleave - final shard range in CREATED state, cleaving proceeds
+        shard_ranges[4].update_state(ShardRange.CREATED,
+                                     state_timestamp=Timestamp.now())
+        broker.merge_shard_ranges(shard_ranges[4:])
+        with self._mock_sharder() as sharder:
+            self.assertTrue(sharder._cleave(broker))
+        context = CleavingContext.load(broker)
+        self.assertTrue(context.misplaced_done)
+        self.assertTrue(context.cleaving_done)
+        self.assertEqual(shard_ranges[4].upper_str, context.cursor)
+        self.assertEqual(5, context.ranges_done)
+        self.assertEqual(0, context.ranges_todo)
+
     def _check_complete_sharding(self, account, container, shard_bounds):
         broker = self._make_sharding_broker(
             account=account, container=container, shard_bounds=shard_bounds)
@@ -4119,7 +4187,8 @@
         for state in sorted(ShardRange.STATES):
             if state in (ShardRange.SHARDING,
                          ShardRange.SHRINKING,
-                         ShardRange.SHARDED):
+                         ShardRange.SHARDED,
+                         ShardRange.SHRUNK):
                 epoch = None
             else:
                 epoch = Timestamp.now()
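Aside: a rough sketch (invented helper, plain dicts) of the shard-deletion condition that the audit change earlier in this patch extends to SHRUNK — a shard db may be reclaimed once its own range is SHARDED or SHRUNK, marked deleted, older than reclaim_age, and the db holds no objects:

```python
import time

SHARDED, SHRUNK = 70, 80
DELETABLE_STATES = (SHARDED, SHRUNK)


def can_delete(own, reclaim_age, empty, now=None):
    # mirrors the deletable_states check added to _audit_shard_container
    now = now or time.time()
    delete_age = now - reclaim_age
    return (own['state'] in DELETABLE_STATES and own['deleted']
            and own['timestamp'] < delete_age and empty)


own = {'state': SHRUNK, 'deleted': True, 'timestamp': 0.0}
assert can_delete(own, reclaim_age=604800, empty=True)
assert not can_delete(own, reclaim_age=604800, empty=False)
```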
@@ -4318,6 +4387,10 @@
         expected_stats = {'attempted': 1, 'success': 0, 'failure': 1}
         shard_bounds = (('a', 'j'), ('k', 't'), ('s', 'z'))
         for state, state_text in ShardRange.STATES.items():
+            if state in (ShardRange.SHRINKING,
+                         ShardRange.SHARDED,
+                         ShardRange.SHRUNK):
+                continue  # tested separately below
             shard_ranges = self._make_shard_ranges(shard_bounds, state)
             broker.merge_shard_ranges(shard_ranges)
             with self._mock_sharder() as sharder:
@@ -4331,28 +4404,72 @@
                 self._assert_stats(expected_stats, sharder, 'audit_root')
                 mocked.assert_not_called()

+        shard_ranges = self._make_shard_ranges(shard_bounds,
+                                               ShardRange.SHRINKING)
+        broker.merge_shard_ranges(shard_ranges)
+        with self._mock_sharder() as sharder:
+            with mock.patch.object(
+                    sharder, '_audit_shard_container') as mocked:
+                sharder._audit_container(broker)
+        self.assertFalse(sharder.logger.get_lines_for_level('warning'))
+        self.assertFalse(sharder.logger.get_lines_for_level('error'))
+        self._assert_stats({'attempted': 1, 'success': 1, 'failure': 0},
+                           sharder, 'audit_root')
+        mocked.assert_not_called()
+
+        for state in (ShardRange.SHRUNK, ShardRange.SHARDED):
+            shard_ranges = self._make_shard_ranges(shard_bounds, state)
+            for sr in shard_ranges:
+                sr.set_deleted(Timestamp.now())
+            broker.merge_shard_ranges(shard_ranges)
+            with self._mock_sharder() as sharder:
+                with mock.patch.object(
+                        sharder, '_audit_shard_container') as mocked:
+                    sharder._audit_container(broker)
+            self.assertFalse(sharder.logger.get_lines_for_level('warning'))
+            self.assertFalse(sharder.logger.get_lines_for_level('error'))
+            self._assert_stats({'attempted': 1, 'success': 1, 'failure': 0},
+                               sharder, 'audit_root')
+            mocked.assert_not_called()
+
+        # Put the shards back to a "useful" state
+        shard_ranges = self._make_shard_ranges(shard_bounds,
+                                               ShardRange.ACTIVE)
+        broker.merge_shard_ranges(shard_ranges)
+
         def assert_missing_warning(line):
             self.assertIn(
                 'Audit failed for root %s' % broker.db_file, line)
             self.assertIn('missing range(s): -a j-k z-', line)

-        own_shard_range = broker.get_own_shard_range()
-        states = (ShardRange.SHARDING, ShardRange.SHARDED)
-        for state in states:
-            own_shard_range.update_state(
-                state, state_timestamp=next(self.ts_iter))
-            broker.merge_shard_ranges([own_shard_range])
-            with self._mock_sharder() as sharder:
-                with mock.patch.object(
-                        sharder, '_audit_shard_container') as mocked:
-                    sharder._audit_container(broker)
-            lines = sharder.logger.get_lines_for_level('warning')
-            assert_missing_warning(lines[0])
-            assert_overlap_warning(lines[0], state_text)
-            self.assertFalse(lines[1:])
-            self.assertFalse(sharder.logger.get_lines_for_level('error'))
-            self._assert_stats(expected_stats, sharder, 'audit_root')
-            mocked.assert_not_called()
+        def check_missing():
+            own_shard_range = broker.get_own_shard_range()
+            states = (ShardRange.SHARDING, ShardRange.SHARDED)
+            for state in states:
+                own_shard_range.update_state(
+                    state, state_timestamp=next(self.ts_iter))
+                broker.merge_shard_ranges([own_shard_range])
+                with self._mock_sharder() as sharder:
+                    with mock.patch.object(
+                            sharder, '_audit_shard_container') as mocked:
+                        sharder._audit_container(broker)
+                lines = sharder.logger.get_lines_for_level('warning')
+                assert_missing_warning(lines[0])
+                assert_overlap_warning(lines[0], 'active')
+                self.assertFalse(lines[1:])
+                self.assertFalse(sharder.logger.get_lines_for_level('error'))
+                self._assert_stats(expected_stats, sharder, 'audit_root')
+                mocked.assert_not_called()
+
+        check_missing()
+
+        # fill the gaps with shrinking shards and check that these are still
+        # reported as 'missing'
+        missing_shard_bounds = (('', 'a'), ('j', 'k'), ('z', ''))
+        shrinking_shard_ranges = self._make_shard_ranges(missing_shard_bounds,
+                                                         ShardRange.SHRINKING)
+        broker.merge_shard_ranges(shrinking_shard_ranges)
+        check_missing()

     def call_audit_container(self, broker, shard_ranges, exc=None):
         with self._mock_sharder() as sharder:
@@ -4388,13 +4505,17 @@
             'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
             params=params)

-    def test_audit_old_style_shard_container(self):
-        broker = self._make_broker(account='.shards_a', container='shard_c')
-        broker.set_sharding_sysmeta('Root', 'a/c')
+    def _do_test_audit_shard_container(self, *args):
         # include overlaps to verify correct match for updating own shard range
+        broker = self._make_broker(account='.shards_a', container='shard_c')
+        broker.set_sharding_sysmeta(*args)
         shard_bounds = (
-            ('a', 'j'), ('k', 't'), ('k', 's'), ('l', 's'), ('s', 'z'))
-        shard_ranges = self._make_shard_ranges(shard_bounds, ShardRange.ACTIVE)
+            ('a', 'j'), ('k', 't'), ('k', 'u'), ('l', 'v'), ('s', 'z'))
+        shard_states = (
+            ShardRange.ACTIVE, ShardRange.ACTIVE, ShardRange.ACTIVE,
+            ShardRange.FOUND, ShardRange.CREATED
+        )
+        shard_ranges = self._make_shard_ranges(shard_bounds, shard_states)
         shard_ranges[1].name = broker.path
         expected_stats = {'attempted': 1, 'success': 0, 'failure': 1}
@@ -4424,21 +4545,24 @@
         self.assertFalse(sharder.logger.get_lines_for_level('error'))
         self.assertFalse(broker.is_deleted())

-        # create own shard range, no match in root
+        # own shard range bounds don't match what's in root (e.g. this shard is
+        # expanding to be an acceptor)
         expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
         own_shard_range = broker.get_own_shard_range()  # get the default
         own_shard_range.lower = 'j'
         own_shard_range.upper = 'k'
+        own_shard_range.name = broker.path
         broker.merge_shard_ranges([own_shard_range])
+        # bump timestamp of root shard range to be newer than own
+        now = Timestamp.now()
+        self.assertTrue(shard_ranges[1].update_state(ShardRange.ACTIVE,
+                                                     state_timestamp=now))
+        shard_ranges[1].timestamp = now
         sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
-        lines = sharder.logger.get_lines_for_level('warning')
-        self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[0])
-        self.assertNotIn('account not in shards namespace', lines[0])
-        self.assertNotIn('missing own shard range', lines[0])
-        self.assertIn('root has no matching shard range', lines[0])
-        self.assertNotIn('unable to get shard ranges from root', lines[0])
         self._assert_stats(expected_stats, sharder, 'audit_shard')
-        self.assertFalse(lines[1:])
+        self.assertEqual(['Updating own shard range from root', mock.ANY],
+                         sharder.logger.get_lines_for_level('debug'))
+        self.assertFalse(sharder.logger.get_lines_for_level('warning'))
         self.assertFalse(sharder.logger.get_lines_for_level('error'))
         self.assertFalse(broker.is_deleted())
         expected_headers = {'X-Backend-Record-Type': 'shard',
                             'X-Newest': 'true',
                             'X-Backend-Include-Deleted': 'True',
                             'X-Backend-Override-Deleted': 'true'}
         params = {'format': 'json', 'marker': 'k', 'end_marker': 't'}
         mock_swift.make_request.assert_called_once_with(
             'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
             params=params)
@@ -4449,12 +4573,55 @@
+        # own shard range bounds are updated from root version
+        own_shard_range = broker.get_own_shard_range()
+        self.assertEqual(ShardRange.ACTIVE, own_shard_range.state)
+        self.assertEqual(now, own_shard_range.state_timestamp)
+        self.assertEqual('k', own_shard_range.lower)
+        self.assertEqual('t', own_shard_range.upper)
+        # check other shard ranges from root are not merged (not shrinking)
+        self.assertEqual([own_shard_range],
+                         broker.get_shard_ranges(include_own=True))

-        # create own shard range, failed response from root
+        # move root shard range to shrinking state
+        now = Timestamp.now()
+        self.assertTrue(shard_ranges[1].update_state(ShardRange.SHRINKING,
+                                                     state_timestamp=now))
+        # bump own shard range state timestamp so it is newer than root
+        now = Timestamp.now()
+        own_shard_range = broker.get_own_shard_range()
+        own_shard_range.update_state(ShardRange.ACTIVE, state_timestamp=now)
+        broker.merge_shard_ranges([own_shard_range])
+
+        sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
+        self._assert_stats(expected_stats, sharder, 'audit_shard')
+        self.assertEqual(['Updating own shard range from root'],
+                         sharder.logger.get_lines_for_level('debug'))
+        self.assertFalse(sharder.logger.get_lines_for_level('warning'))
+        self.assertFalse(sharder.logger.get_lines_for_level('error'))
+        self.assertFalse(broker.is_deleted())
+        expected_headers = {'X-Backend-Record-Type': 'shard',
+                            'X-Newest': 'true',
+                            'X-Backend-Include-Deleted': 'True',
+                            'X-Backend-Override-Deleted': 'true'}
+        params = {'format': 'json', 'marker': 'k', 'end_marker': 't'}
+        mock_swift.make_request.assert_called_once_with(
+            'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
+            params=params)
+        # check own shard range bounds
+        own_shard_range = broker.get_own_shard_range()
+        # own shard range state has not changed (root is older)
+        self.assertEqual(ShardRange.ACTIVE, own_shard_range.state)
+        self.assertEqual(now, own_shard_range.state_timestamp)
+        self.assertEqual('k', own_shard_range.lower)
+        self.assertEqual('t', own_shard_range.upper)
+
+        # reset own shard range bounds, failed response from root
         expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
         own_shard_range = broker.get_own_shard_range()  # get the default
         own_shard_range.lower = 'j'
         own_shard_range.upper = 'k'
+        own_shard_range.timestamp = Timestamp.now()
         broker.merge_shard_ranges([own_shard_range])
         sharder, mock_swift = self.call_audit_container(
             broker, shard_ranges,
@@ -4470,6 +4637,7 @@
         self.assertFalse(lines[2:])
         self.assertFalse(sharder.logger.get_lines_for_level('error'))
         self.assertFalse(broker.is_deleted())
+        params = {'format': 'json', 'marker': 'j', 'end_marker': 'k'}
         mock_swift.make_request.assert_called_once_with(
             'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
             params=params)
@@ -4486,6 +4654,8 @@
         own_shard_range = broker.get_own_shard_range()
         self.assertEqual(ShardRange.SHARDING, own_shard_range.state)
         self.assertEqual(now, own_shard_range.state_timestamp)
+        self.assertEqual(['Updating own shard range from root', mock.ANY],
+                         sharder.logger.get_lines_for_level('debug'))

         own_shard_range.update_state(ShardRange.SHARDED,
                                      state_timestamp=Timestamp.now())
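Aside: a deliberately simplified model (not `ShardRange`'s real attribute-wise merge, which tracks separate timestamps for data, state and metadata) of the timestamp semantics the tests above and below rely on — merging keeps whichever copy of a shard range is newer, so merging the root's copy is safe whether or not it is newer than the local copy:

```python
def merge(local, incoming):
    # newest timestamp wins; equal timestamps keep the local copy
    return incoming if incoming['timestamp'] > local['timestamp'] else local


local = {'name': 'sr', 'state': 'active', 'timestamp': 2.0}
older = {'name': 'sr', 'state': 'sharding', 'timestamp': 1.0}
newer = {'name': 'sr', 'state': 'sharding', 'timestamp': 3.0}

assert merge(local, older)['state'] == 'active'    # older root copy ignored
assert merge(local, newer)['state'] == 'sharding'  # newer root copy wins
```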
@@ -4500,110 +4670,111 @@
         self.assert_no_audit_messages(sharder, mock_swift)
         self.assertTrue(broker.is_deleted())

+    def test_audit_old_style_shard_container(self):
+        self._do_test_audit_shard_container('Root', 'a/c')
+
     def test_audit_shard_container(self):
-        broker = self._make_broker(account='.shards_a', container='shard_c')
-        broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
-        # include overlaps to verify correct match for updating own shard range
+        self._do_test_audit_shard_container('Quoted-Root', 'a/c')
+
+    def _do_test_audit_shard_container_merge_other_ranges(self, *args):
+        # verify that shard only merges other ranges from root when it is
+        # shrinking or shrunk
         shard_bounds = (
-            ('a', 'j'), ('k', 't'), ('k', 's'), ('l', 's'), ('s', 'z'))
-        shard_ranges = self._make_shard_ranges(shard_bounds, ShardRange.ACTIVE)
+            ('a', 'p'), ('k', 't'), ('p', 'u'))
+        shard_states = (
+            ShardRange.ACTIVE, ShardRange.ACTIVE, ShardRange.FOUND,
+        )
+        shard_ranges = self._make_shard_ranges(shard_bounds, shard_states)
-        shard_ranges[1].name = broker.path
-        expected_stats = {'attempted': 1, 'success': 0, 'failure': 1}
-
-        # bad account name
-        broker.account = 'bad_account'
-        sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
-        lines = sharder.logger.get_lines_for_level('warning')
-        self._assert_stats(expected_stats, sharder, 'audit_shard')
-        self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[0])
-        self.assertIn('account not in shards namespace', lines[0])
-        self.assertNotIn('root has no matching shard range', lines[0])
-        self.assertNotIn('unable to get shard ranges from root', lines[0])
-        self.assertIn('Audit failed for shard %s' % broker.db_file, lines[1])
-        self.assertIn('missing own shard range', lines[1])
-        self.assertFalse(lines[2:])
-        self.assertFalse(broker.is_deleted())
+        def check_audit(own_state, root_state):
+            broker = self._make_broker(
+                account='.shards_a',
+                container='shard_c_%s' % root_ts.normal)
+            broker.set_sharding_sysmeta(*args)
+            shard_ranges[1].name = broker.path
-        # missing own shard range
-        broker.get_info()
-        sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
-        lines = sharder.logger.get_lines_for_level('warning')
-        self._assert_stats(expected_stats, sharder, 'audit_shard')
-        self.assertIn('Audit failed for shard %s' % broker.db_file, lines[0])
-        self.assertIn('missing own shard range', lines[0])
-        self.assertNotIn('unable to get shard ranges from root', lines[0])
-        self.assertFalse(lines[1:])
-        self.assertFalse(sharder.logger.get_lines_for_level('error'))
-        self.assertFalse(broker.is_deleted())
+            # make own shard range match shard_ranges[1]
+            own_sr = shard_ranges[1]
+            expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
+            self.assertTrue(own_sr.update_state(own_state,
+                                                state_timestamp=own_ts))
+            own_sr.timestamp = own_ts
+            broker.merge_shard_ranges([shard_ranges[1]])
-        # create own shard range, no match in root
-        expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
-        own_shard_range = broker.get_own_shard_range()  # get the default
-        own_shard_range.lower = 'j'
-        own_shard_range.upper = 'k'
-        broker.merge_shard_ranges([own_shard_range])
-        sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
-        lines = sharder.logger.get_lines_for_level('warning')
-        self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[0])
-        self.assertNotIn('account not in shards namespace', lines[0])
-        self.assertNotIn('missing own shard range', lines[0])
-        self.assertIn('root has no matching shard range', lines[0])
-        self.assertNotIn('unable to get shard ranges from root', lines[0])
-        self._assert_stats(expected_stats, sharder, 'audit_shard')
-        self.assertFalse(lines[1:])
-        self.assertFalse(sharder.logger.get_lines_for_level('error'))
-        self.assertFalse(broker.is_deleted())
-        expected_headers = {'X-Backend-Record-Type': 'shard',
-                            'X-Newest': 'true',
-                            'X-Backend-Include-Deleted': 'True',
-                            'X-Backend-Override-Deleted': 'true'}
-        params = {'format': 'json', 'marker': 'j', 'end_marker': 'k'}
-        mock_swift.make_request.assert_called_once_with(
-            'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
-            params=params)
+            # bump state and timestamp of root shard_ranges[1] to be newer
+            self.assertTrue(shard_ranges[1].update_state(
+                root_state, state_timestamp=root_ts))
+            shard_ranges[1].timestamp = root_ts
+            sharder, mock_swift = self.call_audit_container(broker,
+                                                            shard_ranges)
+            self._assert_stats(expected_stats, sharder, 'audit_shard')
+            debug_lines = sharder.logger.get_lines_for_level('debug')
+            self.assertGreater(len(debug_lines), 0)
+            self.assertEqual('Updating own shard range from root',
+                             debug_lines[0])
+            self.assertFalse(sharder.logger.get_lines_for_level('warning'))
+            self.assertFalse(sharder.logger.get_lines_for_level('error'))
+            self.assertFalse(broker.is_deleted())
+            expected_headers = {'X-Backend-Record-Type': 'shard',
+                                'X-Newest': 'true',
+                                'X-Backend-Include-Deleted': 'True',
+                                'X-Backend-Override-Deleted': 'true'}
+            params = {'format': 'json', 'marker': 'k', 'end_marker': 't'}
+            mock_swift.make_request.assert_called_once_with(
+                'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
+                params=params)
+            return broker, shard_ranges
-        # create own shard range, failed response from root
-        expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
-        own_shard_range = broker.get_own_shard_range()  # get the default
-        own_shard_range.lower = 'j'
-        own_shard_range.upper = 'k'
-        broker.merge_shard_ranges([own_shard_range])
-        sharder, mock_swift = self.call_audit_container(
-            broker, shard_ranges,
-            exc=internal_client.UnexpectedResponse('bad', 'resp'))
-        lines = sharder.logger.get_lines_for_level('warning')
-        self.assertIn('Failed to get shard ranges', lines[0])
-        self.assertIn('Audit warnings for shard %s' % broker.db_file, lines[1])
-        self.assertNotIn('account not in shards namespace', lines[1])
-        self.assertNotIn('missing own shard range', lines[1])
-        self.assertNotIn('root has no matching shard range', lines[1])
-        self.assertIn('unable to get shard ranges from root', lines[1])
-        self._assert_stats(expected_stats, sharder, 'audit_shard')
-        self.assertFalse(lines[2:])
-        self.assertFalse(sharder.logger.get_lines_for_level('error'))
-        self.assertFalse(broker.is_deleted())
-        mock_swift.make_request.assert_called_once_with(
-            'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
-            params=params)
+        # make root's copy of shard range newer than shard's local copy, so
+        # shard will always update its own shard range from root, and may merge
+        # other shard ranges
+        for own_state in ShardRange.STATES:
+            for root_state in ShardRange.STATES:
+                with annotate_failure('own_state=%s, root_state=%s' %
+                                      (own_state, root_state)):
+                    own_ts = Timestamp.now()
+                    root_ts = Timestamp(float(own_ts) + 1)
+                    broker, shard_ranges = check_audit(own_state, root_state)
+                    # own shard range is updated from newer root version
+                    own_shard_range = broker.get_own_shard_range()
+                    self.assertEqual(root_state, own_shard_range.state)
+                    self.assertEqual(root_ts, own_shard_range.state_timestamp)
+                    updated_ranges = broker.get_shard_ranges(include_own=True)
+                    if root_state in (ShardRange.SHRINKING, ShardRange.SHRUNK):
+                        # check other shard ranges from root are merged
+                        self.assertEqual(shard_ranges, updated_ranges)
+                    else:
+                        # check other shard ranges from root are not merged
+                        self.assertEqual(shard_ranges[1:2], updated_ranges)
-        # make own shard range match one in root, but different state
-        shard_ranges[1].timestamp = Timestamp.now()
-        broker.merge_shard_ranges([shard_ranges[1]])
-        now = Timestamp.now()
-        shard_ranges[1].update_state(ShardRange.SHARDING, state_timestamp=now)
-        sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
-        self.assert_no_audit_messages(sharder, mock_swift)
-        self.assertFalse(broker.is_deleted())
-        # own shard range state is updated from root version
-        own_shard_range = broker.get_own_shard_range()
-        self.assertEqual(ShardRange.SHARDING, own_shard_range.state)
-        self.assertEqual(now, own_shard_range.state_timestamp)
+        # make root's copy of shard range older than shard's local copy, so
+        # shard will never update its own shard range from root, but may merge
+        # other shard ranges
+        for own_state in ShardRange.STATES:
+            for root_state in ShardRange.STATES:
+                with annotate_failure('own_state=%s, root_state=%s' %
+                                      (own_state, root_state)):
+                    root_ts = Timestamp.now()
+                    own_ts = Timestamp(float(root_ts) + 1)
+                    broker, shard_ranges = check_audit(own_state, root_state)
+                    # own shard range is not updated from older root version
+                    own_shard_range = broker.get_own_shard_range()
+                    self.assertEqual(own_state, own_shard_range.state)
+                    self.assertEqual(own_ts, own_shard_range.state_timestamp)
+                    updated_ranges = broker.get_shard_ranges(include_own=True)
+                    if own_state in (ShardRange.SHRINKING, ShardRange.SHRUNK):
+                        # check other shard ranges from root are merged
+                        self.assertEqual(shard_ranges, updated_ranges)
+                    else:
+                        # check other shard ranges from root are not merged
+                        self.assertEqual(shard_ranges[1:2], updated_ranges)
-        own_shard_range.update_state(ShardRange.SHARDED,
-                                     state_timestamp=Timestamp.now())
-        broker.merge_shard_ranges([own_shard_range])
-        sharder, mock_swift = self.call_audit_container(broker, shard_ranges)
-        self.assert_no_audit_messages(sharder, mock_swift)
+    def test_audit_old_style_shard_container_merge_other_ranges(self):
+        self._do_test_audit_shard_container_merge_other_ranges('Root', 'a/c')
+
+    def test_audit_shard_container_merge_other_ranges(self):
+        self._do_test_audit_shard_container_merge_other_ranges('Quoted-Root',
+                                                               'a/c')

     def test_audit_deleted_range_in_root_container(self):
         broker = self._make_broker(account='.shards_a', container='shard_c')
@@ -5354,8 +5525,10 @@
         self.assertEqual(0, ctx.ranges_done)
         self.assertEqual(0, ctx.ranges_todo)
         ctx.reset()
+        check_context()
         # check idempotency
         ctx.reset()
+        check_context()

     def test_start(self):
         ctx = CleavingContext('test', 'curs', 12, 11, 2, True, True)
@@ -5371,5 +5544,30 @@
         self.assertEqual(0, ctx.ranges_done)
         self.assertEqual(0, ctx.ranges_todo)
         ctx.start()
+        check_context()
         # check idempotency
         ctx.start()
+        check_context()
+
+    def test_range_done(self):
+        ctx = CleavingContext('test', '', 12, 11, 2, True, True)
+        self.assertEqual(0, ctx.ranges_done)
+        self.assertEqual(0, ctx.ranges_todo)
+        self.assertEqual('', ctx.cursor)
+
+        ctx.ranges_todo = 5
+        ctx.range_done('b')
+        self.assertEqual(1, ctx.ranges_done)
+        self.assertEqual(4, ctx.ranges_todo)
+        self.assertEqual('b', ctx.cursor)
+
+        ctx.range_done(None)
+        self.assertEqual(2, ctx.ranges_done)
+        self.assertEqual(3, ctx.ranges_todo)
+        self.assertEqual('b', ctx.cursor)
+
+        ctx.ranges_todo = 9
+        ctx.range_done('c')
+        self.assertEqual(3, ctx.ranges_done)
+        self.assertEqual(8, ctx.ranges_todo)
+        self.assertEqual('c', ctx.cursor)
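Closing aside: a quick standalone recap of the state table after this patch. The numeric values for the pre-existing states below are taken from the Swift source as it stands (FOUND=10 through SHARDED=70; SHRUNK=80 is added by this patch); the point is that the forward and reverse lookups stay in sync because one is derived from the other, exactly as `ShardRange` does with `STATES` and `STATES_BY_NAME`:

```python
STATES = {10: 'found', 20: 'created', 30: 'cleaved', 40: 'active',
          50: 'shrinking', 60: 'sharding', 70: 'sharded', 80: 'shrunk'}
# reverse mapping derived from the forward one, so they cannot drift apart
STATES_BY_NAME = dict((v, k) for k, v in STATES.items())

assert STATES_BY_NAME['shrunk'] == 80
assert STATES[80] == 'shrunk'
```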