From 45bf19d870dd196f5f2ac59cac4813e24bc01dbb Mon Sep 17 00:00:00 2001 From: Tim Burke Date: Tue, 27 Feb 2018 11:25:11 +0000 Subject: [PATCH] Add override_devices/override_partitions options Change-Id: Id9a0bee6faf95af52d4c281dea5b896179e71a35 --- bin/swift-container-sharder | 12 +++++++- swift/common/manager.py | 9 +++++- swift/container/sharder.py | 51 +++++++++++++++++++++----------- test/probe/test_sharder.py | 58 ++++++++++++++++++++++++------------- 4 files changed, 91 insertions(+), 39 deletions(-) diff --git a/bin/swift-container-sharder b/bin/swift-container-sharder index 37604bdc49..3e6551319b 100755 --- a/bin/swift-container-sharder +++ b/bin/swift-container-sharder @@ -17,7 +17,17 @@ from swift.container.sharder import ContainerSharder from swift.common.utils import parse_options from swift.common.daemon import run_daemon +from optparse import OptionParser if __name__ == '__main__': - conf_file, options = parse_options(once=True) + parser = OptionParser("%prog CONFIG [options]") + parser.add_option('-d', '--devices', + help='Shard containers only on given devices. ' + 'Comma-separated list. ' + 'Only has effect if --once is used.') + parser.add_option('-p', '--partitions', + help='Shard containers only in given partitions. ' + 'Comma-separated list. ' + 'Only has effect if --once is used.') + conf_file, options = parse_options(parser=parser, once=True) run_daemon(ContainerSharder, conf_file, **options) diff --git a/swift/common/manager.py b/swift/common/manager.py index 9a42e5a6fb..71f9e689b3 100644 --- a/swift/common/manager.py +++ b/swift/common/manager.py @@ -637,13 +637,16 @@ class Server(object): {'server': self.server, 'pid': pid, 'conf': conf_file}) return 0 - def spawn(self, conf_file, once=False, wait=True, daemon=True, **kwargs): + def spawn(self, conf_file, once=False, wait=True, daemon=True, + additional_args=None, **kwargs): """Launch a subprocess for this server. :param conf_file: path to conf_file to use as first arg :param once: boolean, add once argument to command :param wait: boolean, if true capture stdout with a pipe :param daemon: boolean, if false ask server to log to console + :param additional_args: list of additional arguments to pass + on the command line :returns: the pid of the spawned process """ @@ -653,6 +656,10 @@ class Server(object): if not daemon: # ask the server to log to console args.append('verbose') + if additional_args: + if isinstance(additional_args, str): + additional_args = [additional_args] + args.extend(additional_args) # figure out what we're going to do with stdio if not daemon: diff --git a/swift/container/sharder.py b/swift/container/sharder.py index 6aec6ea9ae..bb2ee6daad 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -33,7 +33,7 @@ from swift.common.constraints import check_drive, CONTAINER_LISTING_LIMIT from swift.common.ring.utils import is_local_device from swift.common.utils import get_logger, config_true_value, \ dump_recon_cache, whataremyips, Timestamp, ShardRange, GreenAsyncPile, \ - config_float_value, config_positive_int_value, FileLikeIter + config_float_value, config_positive_int_value, FileLikeIter, list_from_csv from swift.common.storage_policy import POLICIES @@ -704,7 +704,8 @@ class ContainerSharder(ContainerReplicator): self.stats['containers_scanned'] += 1 self._periodic_report_stats() - def _one_shard_cycle(self): + def _one_shard_cycle(self, override_devices=None, + override_partitions=None): """ The main function, everything the sharder does forks from this method. 
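
For context on the list_from_csv import added above: it is the existing swift.common.utils helper that turns the raw --devices/--partitions option strings into lists, and a falsy value yields an empty list, so the override checks added in the following hunks stay inactive when the options are not supplied. A minimal interactive sketch of the expected behaviour:

    >>> from swift.common.utils import list_from_csv
    >>> list_from_csv('sdb1, sdb2,sdb3')
    ['sdb1', 'sdb2', 'sdb3']
    >>> list_from_csv(None)
    []
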
@@ -719,26 +720,39 @@ class ContainerSharder(ContainerReplicator): - Shrinking (check to see if we need to shrink this container). """ self.logger.info('Starting container sharding cycle') + if override_devices: + self.logger.info('(Override devices: %s)', + ', '.join(override_devices)) + if override_partitions: + self.logger.info('(Override partitions: %s)', + ', '.join(override_partitions)) dirs = [] self.shard_cleanups = dict() self.ips = whataremyips() for node in self.ring.devs: - if node and is_local_device(self.ips, self.port, - node['replication_ip'], - node['replication_port']): - if not check_drive(self.root, node['device'], - self.mount_check): - self.logger.warn( - 'Skipping %(device)s as it is not mounted' % node) - continue - datadir = os.path.join(self.root, node['device'], self.datadir) - if os.path.isdir(datadir): - # Populate self._local_device_ids so we can find - # handoffs for shards later - self._local_device_ids.add(node['id']) - dirs.append((datadir, node['id'])) + if not node: + continue + if override_devices and node['device'] not in override_devices: + continue + if not is_local_device(self.ips, self.port, + node['replication_ip'], + node['replication_port']): + continue + if not check_drive(self.root, node['device'], + self.mount_check): + self.logger.warning( + 'Skipping %(device)s as it is not mounted' % node) + continue + datadir = os.path.join(self.root, node['device'], self.datadir) + if os.path.isdir(datadir): + # Populate self._local_device_ids so we can find + # handoffs for shards later + self._local_device_ids.add(node['id']) + dirs.append((datadir, node['id'])) for part, path, node_id in db_replicator.roundrobin_datadirs(dirs): # NB: get_part_nodes always provides an 'index' key + if override_partitions and part not in override_partitions: + continue for node in self.ring.get_part_nodes(int(part)): if node['id'] == node_id: break @@ -1212,8 +1226,11 @@ class ContainerSharder(ContainerReplicator): """Run the container sharder once.""" self.logger.info('Begin container sharder "once" mode') self._zero_stats() + override_devices = list_from_csv(kwargs.get('devices')) + override_partitions = list_from_csv(kwargs.get('partitions')) begin = self.reported = time.time() - self._one_shard_cycle() + self._one_shard_cycle(override_devices=override_devices, + override_partitions=override_partitions) elapsed = time.time() - begin self.logger.info( 'Container sharder "once" mode completed: %.02fs', elapsed) diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py index bd9a5ce430..301c392fae 100644 --- a/test/probe/test_sharder.py +++ b/test/probe/test_sharder.py @@ -264,9 +264,9 @@ class TestContainerSharding(ReplProbeTest): self.assertEqual( str(expected_state), headers['X-Backend-Sharding-State']) - def get_node_numbers(self, account, container): + def get_part_and_node_numbers(self, account, container): part, nodes = self.brain.ring.get_nodes(account, container) - return (n['id'] + 1 for n in nodes) + return part, (n['id'] + 1 for n in nodes) def test_sharding_listing(self): # verify parameterised listing of a container during sharding @@ -324,11 +324,13 @@ class TestContainerSharding(ReplProbeTest): headers={'X-Container-Sharding': 'on'}) # First run the 'leader' in charge of scanning, which finds all shard # ranges and cleaves first two - self.sharders.once(number=self.brain.node_numbers[0]) + self.sharders.once(number=self.brain.node_numbers[0], + additional_args='--partitions=%s' % self.brain.part) # Then run sharder on other nodes which will 
also cleave first two # shard ranges for n in self.brain.node_numbers[1:]: - self.sharders.once(number=n) + self.sharders.once( + number=n, additional_args='--partitions=%s' % self.brain.part) # sanity check shard range states shard_ranges = self.get_container_shard_ranges() @@ -356,7 +358,7 @@ class TestContainerSharding(ReplProbeTest): do_listing_checks(exp_obj_names) # run all the sharders again and the last two shard ranges get cleaved - self.sharders.once() + self.sharders.once(additional_args='--partitions=%s' % self.brain.part) shard_ranges = self.get_container_shard_ranges() shard_ranges = [ShardRange.from_dict(d) for d in shard_ranges] for shard_range in shard_ranges: @@ -394,7 +396,8 @@ class TestContainerSharding(ReplProbeTest): pre_sharding_headers.get('x-container-sharding')) # Only run the one in charge of scanning - self.sharders.once(number=self.brain.node_numbers[0]) + self.sharders.once(number=self.brain.node_numbers[0], + additional_args='--partitions=%s' % self.brain.part) # Verify that we have one sharded db -- though the other normal DBs # received the shard ranges that got defined @@ -430,7 +433,7 @@ class TestContainerSharding(ReplProbeTest): self.assertLengthEqual(found['normal_dbs'], 2) # Now that everyone has shard ranges, run *everyone* - self.sharders.once() + self.sharders.once(additional_args='--partitions=%s' % self.brain.part) # Verify that we only have shard dbs now found = self.categorize_container_dir_content() @@ -448,9 +451,9 @@ class TestContainerSharding(ReplProbeTest): for orig, updated in zip(orig_root_shard_ranges, broker.get_shard_ranges()): self.assertGreaterEqual(updated.state_timestamp, - orig['meta_timestamp']) - self.assertGreaterEqual(updated.meta_timestamp, orig['state_timestamp']) + self.assertGreaterEqual(updated.meta_timestamp, + orig['meta_timestamp']) # Check that entire listing is available headers, actual_listing = self.assert_container_listing(obj_names) @@ -486,11 +489,14 @@ class TestContainerSharding(ReplProbeTest): # ... but, we've added enough that we need to shard *again* into three # new shards which takes two sharder cycles to cleave in batches of 2 shard = ShardRange.from_dict(orig_root_shard_ranges[0]) - node_nums = self.get_node_numbers(shard.account, shard.container) # run first cycle of sharders in order, leader first, to get to # predictable state where all nodes have cleaved 2 out of 3 ranges - for node_number in node_nums: - self.sharders.once(number=node_number) + shard_part, shard_nodes = self.get_part_and_node_numbers( + shard.account, shard.container) + for node_number in shard_nodes: + self.sharders.once( + number=node_number, + additional_args='--partitions=%s' % shard_part) self.assert_container_listing(more_obj_names + obj_names) # add another object that lands in the first of the new sub-shards self.put_objects(['alpha']) @@ -498,7 +504,7 @@ class TestContainerSharding(ReplProbeTest): # ... and this should appear in container listing self.assert_container_listing(['alpha'] + more_obj_names + obj_names) # Run sharders again so things settle. - self.sharders.once() + self.sharders.once(additional_args='--partitions=%s' % shard_part) # TODO: assert that 3 new shards are now in ACTIVE state headers, final_listing = self.assert_container_listing( ['alpha'] + more_obj_names + obj_names) @@ -607,7 +613,8 @@ class TestContainerSharding(ReplProbeTest): # # Note that during this shard cycle the leader replicates to other # nodes so they will end up with ~2 * max * 4/5 objects. 
- self.sharders.once(number=self.brain.node_numbers[0]) + self.sharders.once(number=self.brain.node_numbers[0], + additional_args='--partitions=%s' % self.brain.part) # Verify that we have one shard db -- though the other normal DBs # received the shard ranges that got defined @@ -648,7 +655,9 @@ class TestContainerSharding(ReplProbeTest): # Run the other sharders so we're all in (roughly) the same state for n in self.brain.node_numbers[1:]: - self.sharders.once(number=n) + self.sharders.once( + number=n, + additional_args='--partitions=%s' % self.brain.part) found = self.categorize_container_dir_content() self.assertLengthEqual(found['shard_dbs'], 3) self.assertLengthEqual(found['normal_dbs'], 3) @@ -771,7 +780,9 @@ class TestContainerSharding(ReplProbeTest): self.assert_container_listing(obj_names) # Only run the one in charge of scanning - self.sharders.once(number=self.brain.node_numbers[0]) + self.sharders.once( + number=self.brain.node_numbers[0], + additional_args='--partitions=%s' % self.brain.part) # check root container root_nodes_data = self.direct_get_container_shard_ranges() @@ -814,7 +825,8 @@ class TestContainerSharding(ReplProbeTest): self.assertEqual(exp_obj_count, total_shard_object_count) # Now that everyone has shard ranges, run *everyone* - self.sharders.once() + self.sharders.once( + additional_args='--partitions=%s' % self.brain.part) # all root container nodes should now be in sharded state root_nodes_data = self.direct_get_container_shard_ranges() @@ -1004,7 +1016,9 @@ class TestContainerSharding(ReplProbeTest): # shard the container - first two shard ranges are cleaved for number in node_numbers[:2]: - self.sharders.once(number=number) + self.sharders.once( + number=number, + additional_args='--partitions=%s' % self.brain.part) self.assert_container_listing(obj_names) # sanity check return obj_names @@ -1042,7 +1056,9 @@ class TestContainerSharding(ReplProbeTest): # complete cleaving third shard range... for number in node_numbers[:2]: - self.sharders.once(number=number) + self.sharders.once( + number=number, + additional_args='--partitions=%s' % self.brain.part) # ...and now in sharded state self.assert_container_state(node_numbers[0], SHARDED, 3) self.assert_container_state(node_numbers[1], SHARDED, 3) @@ -1098,5 +1114,7 @@ class TestContainerSharding(ReplProbeTest): # misplaced objects get moved on next sharder cycle... for number in node_numbers[:2]: - self.sharders.once(number=number) + self.sharders.once( + number=number, + additional_args='--partitions=%s' % self.brain.part) self.assert_container_listing(['alpha'] + obj_names)
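
A usage sketch, for illustration only: the probe tests drive the new options through swift.common.manager via the additional_args parameter that Server.spawn() now accepts, so a single restricted sharder pass is expected to look roughly like the following. The device and partition values are placeholders, and the Manager call simply mirrors what the tests above do rather than defining any new API.

    # Minimal sketch, assuming a standard SAIO-style multi-server layout;
    # 'sdb1' and '1234' are example values, not real devices/partitions.
    from swift.common.manager import Manager

    sharders = Manager(['container-sharder'])
    # 'number' selects the Nth configured container-sharder instance;
    # additional_args is appended verbatim to the command line by
    # Server.spawn(), per the manager.py hunk in this patch.
    sharders.once(number=1,
                  additional_args=['--devices=sdb1', '--partitions=1234'])

Run by hand, the equivalent would be something like
swift-container-sharder /etc/swift/container-server/1.conf --once --devices=sdb1 --partitions=1234
(the conf path is an example); parse_options() hands the option values to run_once() as the 'devices'/'partitions' keyword arguments, which the sharder.py hunk above splits with list_from_csv().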