Add override_devices/override_partitions options

Change-Id: Id9a0bee6faf95af52d4c281dea5b896179e71a35
Author: Tim Burke 2018-02-27 11:25:11 +00:00
parent 37df7edec4
commit 45bf19d870
4 changed files with 91 additions and 39 deletions

bin/swift-container-sharder

@@ -17,7 +17,17 @@
 from swift.container.sharder import ContainerSharder
 from swift.common.utils import parse_options
 from swift.common.daemon import run_daemon
+from optparse import OptionParser
 
 if __name__ == '__main__':
-    conf_file, options = parse_options(once=True)
+    parser = OptionParser("%prog CONFIG [options]")
+    parser.add_option('-d', '--devices',
+                      help='Shard containers only on given devices. '
+                           'Comma-separated list. '
+                           'Only has effect if --once is used.')
+    parser.add_option('-p', '--partitions',
+                      help='Shard containers only in given partitions. '
+                           'Comma-separated list. '
+                           'Only has effect if --once is used.')
+    conf_file, options = parse_options(parser=parser, once=True)
     run_daemon(ContainerSharder, conf_file, **options)
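
With the parser in place, a single pass can be limited to chosen drives or partitions from the command line. A hypothetical invocation (the config path is illustrative; the --once flag is the standard one added by parse_options(once=True)):

    swift-container-sharder /etc/swift/container-server.conf --once \
        --devices=sdb1,sdb2 --partitions=100,200

As the help text says, the overrides only take effect together with --once; in daemon mode _one_shard_cycle() is still called with its default None overrides, so nothing is filtered.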

swift/common/manager.py

@@ -637,13 +637,16 @@ class Server(object):
                           {'server': self.server, 'pid': pid, 'conf': conf_file})
         return 0
 
-    def spawn(self, conf_file, once=False, wait=True, daemon=True, **kwargs):
+    def spawn(self, conf_file, once=False, wait=True, daemon=True,
+              additional_args=None, **kwargs):
         """Launch a subprocess for this server.
 
         :param conf_file: path to conf_file to use as first arg
         :param once: boolean, add once argument to command
         :param wait: boolean, if true capture stdout with a pipe
         :param daemon: boolean, if false ask server to log to console
+        :param additional_args: list of additional arguments to pass
+                                on the command line
 
         :returns: the pid of the spawned process
         """
@@ -653,6 +656,10 @@ class Server(object):
         if not daemon:
             # ask the server to log to console
             args.append('verbose')
+        if additional_args:
+            if isinstance(additional_args, str):
+                additional_args = [additional_args]
+            args.extend(additional_args)
 
         # figure out what we're going to do with stdio
         if not daemon:
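
The isinstance() check lets callers pass either a bare string or a list. A minimal sketch of the two equivalent call styles (the server object and conf path are illustrative):

    # each call appends exactly one extra token to the spawned command line
    server.spawn('/etc/swift/container-server/1.conf', once=True,
                 additional_args='--partitions=100')
    server.spawn('/etc/swift/container-server/1.conf', once=True,
                 additional_args=['--partitions=100'])

Without that guard, args.extend() on a bare string would splice it into the command one character at a time.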

swift/container/sharder.py

@@ -33,7 +33,7 @@ from swift.common.constraints import check_drive, CONTAINER_LISTING_LIMIT
 from swift.common.ring.utils import is_local_device
 from swift.common.utils import get_logger, config_true_value, \
     dump_recon_cache, whataremyips, Timestamp, ShardRange, GreenAsyncPile, \
-    config_float_value, config_positive_int_value, FileLikeIter
+    config_float_value, config_positive_int_value, FileLikeIter, list_from_csv
 from swift.common.storage_policy import POLICIES
 
@@ -704,7 +704,8 @@ class ContainerSharder(ContainerReplicator):
             self.stats['containers_scanned'] += 1
             self._periodic_report_stats()
 
-    def _one_shard_cycle(self):
+    def _one_shard_cycle(self, override_devices=None,
+                         override_partitions=None):
         """
         The main function, everything the sharder does forks from this method.
 
@@ -719,26 +720,39 @@ class ContainerSharder(ContainerReplicator):
         - Shrinking (check to see if we need to shrink this container).
         """
         self.logger.info('Starting container sharding cycle')
+        if override_devices:
+            self.logger.info('(Override devices: %s)',
+                             ', '.join(override_devices))
+        if override_partitions:
+            self.logger.info('(Override partitions: %s)',
+                             ', '.join(override_partitions))
         dirs = []
         self.shard_cleanups = dict()
         self.ips = whataremyips()
         for node in self.ring.devs:
-            if node and is_local_device(self.ips, self.port,
-                                        node['replication_ip'],
-                                        node['replication_port']):
-                if not check_drive(self.root, node['device'],
-                                   self.mount_check):
-                    self.logger.warn(
-                        'Skipping %(device)s as it is not mounted' % node)
-                    continue
-                datadir = os.path.join(self.root, node['device'], self.datadir)
-                if os.path.isdir(datadir):
-                    # Populate self._local_device_ids so we can find
-                    # handoffs for shards later
-                    self._local_device_ids.add(node['id'])
-                    dirs.append((datadir, node['id']))
+            if not node:
+                continue
+            if override_devices and node['device'] not in override_devices:
+                continue
+            if not is_local_device(self.ips, self.port,
+                                   node['replication_ip'],
+                                   node['replication_port']):
+                continue
+            if not check_drive(self.root, node['device'],
+                               self.mount_check):
+                self.logger.warning(
+                    'Skipping %(device)s as it is not mounted' % node)
+                continue
+            datadir = os.path.join(self.root, node['device'], self.datadir)
+            if os.path.isdir(datadir):
+                # Populate self._local_device_ids so we can find
+                # handoffs for shards later
+                self._local_device_ids.add(node['id'])
+                dirs.append((datadir, node['id']))
         for part, path, node_id in db_replicator.roundrobin_datadirs(dirs):
             # NB: get_part_nodes always provides an 'index' key
+            if override_partitions and part not in override_partitions:
+                continue
             for node in self.ring.get_part_nodes(int(part)):
                 if node['id'] == node_id:
                     break
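
Note that part is a string here (a partition directory name yielded by roundrobin_datadirs) and override_partitions holds strings parsed from the CSV option, so the membership test compares like with like; int(part) is only needed for the ring lookup. A quick illustration (values made up):

    >>> override_partitions = ['100', '200']  # from list_from_csv('100,200')
    >>> '100' in override_partitions
    True
    >>> 100 in override_partitions  # an int would never match
    False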
@@ -1212,8 +1226,11 @@ class ContainerSharder(ContainerReplicator):
         """Run the container sharder once."""
         self.logger.info('Begin container sharder "once" mode')
         self._zero_stats()
+        override_devices = list_from_csv(kwargs.get('devices'))
+        override_partitions = list_from_csv(kwargs.get('partitions'))
         begin = self.reported = time.time()
-        self._one_shard_cycle()
+        self._one_shard_cycle(override_devices=override_devices,
+                              override_partitions=override_partitions)
         elapsed = time.time() - begin
         self.logger.info(
             'Container sharder "once" mode completed: %.02fs', elapsed)
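
list_from_csv() turns the raw option values into lists and, as implemented in swift.common.utils, returns an empty list for None, so omitting both flags leaves the overrides falsy and the cycle unrestricted:

    >>> from swift.common.utils import list_from_csv
    >>> list_from_csv('sdb1, sdb2')  # whitespace around items is stripped
    ['sdb1', 'sdb2']
    >>> list_from_csv(None)
    []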

test/probe/test_sharding.py

@@ -264,9 +264,9 @@ class TestContainerSharding(ReplProbeTest):
         self.assertEqual(
             str(expected_state), headers['X-Backend-Sharding-State'])
 
-    def get_node_numbers(self, account, container):
+    def get_part_and_node_numbers(self, account, container):
         part, nodes = self.brain.ring.get_nodes(account, container)
-        return (n['id'] + 1 for n in nodes)
+        return part, (n['id'] + 1 for n in nodes)
 
     def test_sharding_listing(self):
         # verify parameterised listing of a container during sharding
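
One caveat worth noting: the second element of the returned tuple is still a generator expression, so it can only be iterated once; the tests below bind it to a name and loop over it a single time.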
@@ -324,11 +324,13 @@ class TestContainerSharding(ReplProbeTest):
                                  headers={'X-Container-Sharding': 'on'})
 
         # First run the 'leader' in charge of scanning, which finds all shard
         # ranges and cleaves first two
-        self.sharders.once(number=self.brain.node_numbers[0])
+        self.sharders.once(number=self.brain.node_numbers[0],
+                           additional_args='--partitions=%s' % self.brain.part)
 
         # Then run sharder on other nodes which will also cleave first two
         # shard ranges
         for n in self.brain.node_numbers[1:]:
-            self.sharders.once(number=n)
+            self.sharders.once(
+                number=n, additional_args='--partitions=%s' % self.brain.part)
 
         # sanity check shard range states
         shard_ranges = self.get_container_shard_ranges()
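
Given the spawn() changes above, each of these once() calls should end up executing something like the following (conf path and partition number illustrative):

    swift-container-sharder /etc/swift/container-server/1.conf once --partitions=100

so only the root container's own partition is processed and the probe runs stay fast and predictable.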
@@ -356,7 +358,7 @@ class TestContainerSharding(ReplProbeTest):
         do_listing_checks(exp_obj_names)
 
         # run all the sharders again and the last two shard ranges get cleaved
-        self.sharders.once()
+        self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
 
         shard_ranges = self.get_container_shard_ranges()
         shard_ranges = [ShardRange.from_dict(d) for d in shard_ranges]
         for shard_range in shard_ranges:
@@ -394,7 +396,8 @@ class TestContainerSharding(ReplProbeTest):
             pre_sharding_headers.get('x-container-sharding'))
 
         # Only run the one in charge of scanning
-        self.sharders.once(number=self.brain.node_numbers[0])
+        self.sharders.once(number=self.brain.node_numbers[0],
+                           additional_args='--partitions=%s' % self.brain.part)
 
         # Verify that we have one sharded db -- though the other normal DBs
         # received the shard ranges that got defined
@@ -430,7 +433,7 @@ class TestContainerSharding(ReplProbeTest):
         self.assertLengthEqual(found['normal_dbs'], 2)
 
         # Now that everyone has shard ranges, run *everyone*
-        self.sharders.once()
+        self.sharders.once(additional_args='--partitions=%s' % self.brain.part)
 
         # Verify that we only have shard dbs now
         found = self.categorize_container_dir_content()
@@ -448,9 +451,9 @@ class TestContainerSharding(ReplProbeTest):
         for orig, updated in zip(orig_root_shard_ranges,
                                  broker.get_shard_ranges()):
             self.assertGreaterEqual(updated.state_timestamp,
-                                    orig['meta_timestamp'])
-            self.assertGreaterEqual(updated.meta_timestamp,
-                                    orig['state_timestamp'])
+                                    orig['state_timestamp'])
+            self.assertGreaterEqual(updated.meta_timestamp,
+                                    orig['meta_timestamp'])
 
         # Check that entire listing is available
         headers, actual_listing = self.assert_container_listing(obj_names)
@@ -486,11 +489,14 @@ class TestContainerSharding(ReplProbeTest):
         # ... but, we've added enough that we need to shard *again* into three
         # new shards which takes two sharder cycles to cleave in batches of 2
         shard = ShardRange.from_dict(orig_root_shard_ranges[0])
-        node_nums = self.get_node_numbers(shard.account, shard.container)
 
         # run first cycle of sharders in order, leader first, to get to
         # predictable state where all nodes have cleaved 2 out of 3 ranges
-        for node_number in node_nums:
-            self.sharders.once(number=node_number)
+        shard_part, shard_nodes = self.get_part_and_node_numbers(
+            shard.account, shard.container)
+        for node_number in shard_nodes:
+            self.sharders.once(
+                number=node_number,
+                additional_args='--partitions=%s' % shard_part)
         self.assert_container_listing(more_obj_names + obj_names)
 
         # add another object that lands in the first of the new sub-shards
         self.put_objects(['alpha'])
@@ -498,7 +504,7 @@ class TestContainerSharding(ReplProbeTest):
         # ... and this should appear in container listing
         self.assert_container_listing(['alpha'] + more_obj_names + obj_names)
 
         # Run sharders again so things settle.
-        self.sharders.once()
+        self.sharders.once(additional_args='--partitions=%s' % shard_part)
 
         # TODO: assert that 3 new shards are now in ACTIVE state
         headers, final_listing = self.assert_container_listing(
             ['alpha'] + more_obj_names + obj_names)
@@ -607,7 +613,8 @@ class TestContainerSharding(ReplProbeTest):
         #
         # Note that during this shard cycle the leader replicates to other
         # nodes so they will end up with ~2 * max * 4/5 objects.
-        self.sharders.once(number=self.brain.node_numbers[0])
+        self.sharders.once(number=self.brain.node_numbers[0],
+                           additional_args='--partitions=%s' % self.brain.part)
 
         # Verify that we have one shard db -- though the other normal DBs
         # received the shard ranges that got defined
@@ -648,7 +655,9 @@ class TestContainerSharding(ReplProbeTest):
         # Run the other sharders so we're all in (roughly) the same state
         for n in self.brain.node_numbers[1:]:
-            self.sharders.once(number=n)
+            self.sharders.once(
+                number=n,
+                additional_args='--partitions=%s' % self.brain.part)
 
         found = self.categorize_container_dir_content()
         self.assertLengthEqual(found['shard_dbs'], 3)
         self.assertLengthEqual(found['normal_dbs'], 3)
@@ -771,7 +780,9 @@ class TestContainerSharding(ReplProbeTest):
         self.assert_container_listing(obj_names)
 
         # Only run the one in charge of scanning
-        self.sharders.once(number=self.brain.node_numbers[0])
+        self.sharders.once(
+            number=self.brain.node_numbers[0],
+            additional_args='--partitions=%s' % self.brain.part)
 
         # check root container
         root_nodes_data = self.direct_get_container_shard_ranges()
@@ -814,7 +825,8 @@ class TestContainerSharding(ReplProbeTest):
         self.assertEqual(exp_obj_count, total_shard_object_count)
 
         # Now that everyone has shard ranges, run *everyone*
-        self.sharders.once()
+        self.sharders.once(
+            additional_args='--partitions=%s' % self.brain.part)
 
         # all root container nodes should now be in sharded state
         root_nodes_data = self.direct_get_container_shard_ranges()
@@ -1004,7 +1016,9 @@ class TestContainerSharding(ReplProbeTest):
         # shard the container - first two shard ranges are cleaved
         for number in node_numbers[:2]:
-            self.sharders.once(number=number)
+            self.sharders.once(
+                number=number,
+                additional_args='--partitions=%s' % self.brain.part)
 
         self.assert_container_listing(obj_names)  # sanity check
         return obj_names
@@ -1042,7 +1056,9 @@ class TestContainerSharding(ReplProbeTest):
         # complete cleaving third shard range...
         for number in node_numbers[:2]:
-            self.sharders.once(number=number)
+            self.sharders.once(
+                number=number,
+                additional_args='--partitions=%s' % self.brain.part)
 
         # ...and now in sharded state
         self.assert_container_state(node_numbers[0], SHARDED, 3)
         self.assert_container_state(node_numbers[1], SHARDED, 3)
@@ -1098,5 +1114,7 @@ class TestContainerSharding(ReplProbeTest):
         # misplaced objects get moved on next sharder cycle...
         for number in node_numbers[:2]:
-            self.sharders.once(number=number)
+            self.sharders.once(
+                number=number,
+                additional_args='--partitions=%s' % self.brain.part)
 
         self.assert_container_listing(['alpha'] + obj_names)