swift-manage-shard-ranges: add repair and analyze commands

Adds a repair command that reads shard ranges from the container DB,
identifies overlapping shard ranges, and recommends how the overlaps
can be repaired by shrinking them into a single chosen path.  Prompted
user input, or the '-y' option, will then cause the appropriately
modified shard ranges to be merged into the container DB.

The repair command does not fix gaps in shard range paths and can
therefore only succeed if there is at least one unbroken path of
shards through the entire namespace.
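
At its core the repair strategy uses the new find_paths and
rank_paths helpers; the following is a minimal sketch of the
selection logic (the repair_sketch helper name is illustrative only;
the gap, SHARDING/SHRINKING and sanity checks, and the confirmation
prompt, are omitted):

  from swift.common.utils import ShardRangeList, Timestamp
  from swift.container.sharder import (
      find_paths, rank_paths, finalize_shrinking)

  def repair_sketch(broker):
      shard_ranges = broker.get_shard_ranges()
      own_sr = broker.get_own_shard_range()
      # rank all gap-free paths through the shard ranges; the best
      # ranked path becomes the acceptor path
      acceptor_path = rank_paths(find_paths(shard_ranges), own_sr)[0]
      acceptor_names = set(sr.name for sr in acceptor_path)
      # every other shard range is an overlapping donor that will be
      # shrunk into the acceptor path
      donors = ShardRangeList([sr for sr in shard_ranges
                               if sr.name not in acceptor_names])
      # acceptors need no modification: they already span the range
      finalize_shrinking(broker, [], donors, Timestamp.now())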

Also adds an analyze command that loads shard data from a file
rather than a container DB, reports overlapping shard ranges in the
same way as the repair command, but takes no action and makes no
changes to the DB.

e.g.:
  swift-manage-shard-ranges <db-file-name> repair
  swift-manage-shard-ranges <shard-data.json> analyze

to see more detail:
  swift-manage-shard-ranges -v <shard-data.json> analyze
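
The shard data file for analyze should contain a JSON list of shard
range dicts; one way to produce such a file (as the new unit test
does) is to save the JSON that the show sub-command writes to stdout:
  swift-manage-shard-ranges <db-file-name> show > <shard-data.json>
  swift-manage-shard-ranges <shard-data.json> analyze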

For consistency with the new repair command, and to be more cautious,
this patch changes the user input required to apply the compact
command's changes to the DB from 'y' to 'yes'.
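
With this change, both compact and repair abort unless the prompt
receives exactly 'yes', as in the updated compact_shard_ranges:

  choice = input('Do you want to apply these changes? [yes/N]')
  if choice != 'yes':
      print('No changes applied')
      return 0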

Change-Id: I9ec411462e4aaf9f21aba6c5fd7698ff75a07de3

Author: Alistair Coles
Date:   2020-12-04 20:49:59 +00:00
Parent: b17dd7ec75
Commit: c9c42c07c9
9 changed files with 1255 additions and 83 deletions


@ -165,11 +165,12 @@ import time
from six.moves import input
from swift.common.utils import Timestamp, get_logger, ShardRange, readconf, \
config_percent_value, config_positive_int_value
config_percent_value, config_positive_int_value, ShardRangeList
from swift.container.backend import ContainerBroker, UNSHARDED
from swift.container.sharder import make_shard_ranges, sharding_enabled, \
CleavingContext, process_compactible_shard_sequences, \
find_compactible_shard_sequences, find_overlapping_ranges, \
find_paths, rank_paths, finalize_shrinking, \
DEFAULT_MAX_SHRINKING, DEFAULT_MAX_EXPANDING, \
DEFAULT_SHARD_CONTAINER_THRESHOLD, DEFAULT_SHARD_SHRINK_POINT, \
DEFAULT_SHARD_MERGE_POINT
@ -179,6 +180,25 @@ DEFAULT_SHRINK_THRESHOLD = DEFAULT_SHARD_CONTAINER_THRESHOLD * \
config_percent_value(DEFAULT_SHARD_SHRINK_POINT)
class ManageShardRangesException(Exception):
pass
class GapsFoundException(ManageShardRangesException):
pass
class InvalidStateException(ManageShardRangesException):
pass
class InvalidSolutionException(ManageShardRangesException):
def __init__(self, msg, acceptor_path, overlapping_donors):
super(InvalidSolutionException, self).__init__(msg)
self.acceptor_path = acceptor_path
self.overlapping_donors = overlapping_donors
def _print_shard_range(sr, level=0):
indent = ' ' * level
print(indent + '%r' % sr.name)
@ -187,16 +207,19 @@ def _print_shard_range(sr, level=0):
print(indent + ' state: %9s upper: %r' % (sr.state_text, sr.upper_str))
def _load_and_validate_shard_data(args):
def _load_and_validate_shard_data(args, require_index=True):
required_keys = ['lower', 'upper', 'object_count']
if require_index:
required_keys.append('index')
try:
with open(args.input, 'r') as fd:
try:
data = json.load(fd)
if not isinstance(data, list):
raise ValueError('Shard data must be a list of dicts')
for k in ('lower', 'upper', 'index', 'object_count'):
for k in required_keys:
for shard in data:
shard[k]
shard[k] # trigger KeyError for missing required key
return data
except (TypeError, ValueError, KeyError) as err:
print('Failed to load valid shard range data: %r' % err,
@ -473,8 +496,8 @@ def compact_shard_ranges(broker, args):
_print_shard_range(acceptor, level=1)
print('Once applied to the broker these changes will result in shard '
'range compaction the next time the sharder runs.')
choice = input('Do you want to apply these changes? [y/N]')
if choice != 'y':
choice = input('Do you want to apply these changes? [yes/N]')
if choice != 'yes':
print('No changes applied')
return 0
@ -486,6 +509,160 @@ def compact_shard_ranges(broker, args):
return 0
def _find_overlapping_donors(shard_ranges, own_sr, args):
shard_ranges = ShardRangeList(shard_ranges)
if ShardRange.SHARDING in shard_ranges.states:
# This may be over-cautious, but for now we'll avoid dealing with
# SHARDING shards (which by design will temporarily overlap with their
# sub-shards) and require repair to be re-tried once sharding has
# completed. Note that once a shard range moves from SHARDING to
# SHARDED state and is deleted, some replicas of the shard may still be
# in the process of sharding but we cannot detect that at the root.
raise InvalidStateException('Found shard ranges in sharding state')
if ShardRange.SHRINKING in shard_ranges.states:
# Also stop now if there are SHRINKING shard ranges: we would need to
# ensure that these were not chosen as acceptors, but for now it is
# simpler to require repair to be re-tried once shrinking has
# completed.
raise InvalidStateException('Found shard ranges in shrinking state')
paths = find_paths(shard_ranges)
ranked_paths = rank_paths(paths, own_sr)
if not (ranked_paths and ranked_paths[0].includes(own_sr)):
# individual paths do not have gaps within them; if no path spans the
# entire namespace then there must be a gap in the shard_ranges
raise GapsFoundException
# simple repair strategy: choose the highest ranked complete sequence and
# shrink all other shard ranges into it
acceptor_path = ranked_paths[0]
acceptor_names = set(sr.name for sr in acceptor_path)
overlapping_donors = ShardRangeList([sr for sr in shard_ranges
if sr.name not in acceptor_names])
# check that the solution makes sense: if the acceptor path has the most
# progressed continuous cleaving, which has reached cleaved_upper, then we
# don't expect any shard ranges beyond cleaved_upper to be in states
# CLEAVED or ACTIVE, otherwise there should have been a better acceptor
# path that reached them.
cleaved_states = {ShardRange.CLEAVED, ShardRange.ACTIVE}
cleaved_upper = acceptor_path.find_lower(
lambda sr: sr.state not in cleaved_states)
beyond_cleaved = acceptor_path.filter(marker=cleaved_upper)
if beyond_cleaved.states.intersection(cleaved_states):
raise InvalidSolutionException(
'Isolated cleaved and/or active shard ranges in acceptor path',
acceptor_path, overlapping_donors)
beyond_cleaved = overlapping_donors.filter(marker=cleaved_upper)
if beyond_cleaved.states.intersection(cleaved_states):
raise InvalidSolutionException(
'Isolated cleaved and/or active shard ranges in donor ranges',
acceptor_path, overlapping_donors)
return acceptor_path, overlapping_donors
def print_repair_solution(acceptor_path, overlapping_donors):
print('Donors:')
for donor in sorted(overlapping_donors):
_print_shard_range(donor, level=1)
print('Acceptors:')
for acceptor in acceptor_path:
_print_shard_range(acceptor, level=1)
def find_repair_solution(shard_ranges, own_sr, args):
try:
acceptor_path, overlapping_donors = _find_overlapping_donors(
shard_ranges, own_sr, args)
except GapsFoundException:
print('Found no complete sequence of shard ranges.')
print('Repairs necessary to fill gaps.')
print('Gap filling not supported by this tool. No repairs performed.')
raise
except InvalidStateException as exc:
print('WARNING: %s' % exc)
print('No repairs performed.')
raise
except InvalidSolutionException as exc:
print('ERROR: %s' % exc)
print_repair_solution(exc.acceptor_path, exc.overlapping_donors)
print('No repairs performed.')
raise
if not overlapping_donors:
print('Found one complete sequence of %d shard ranges and no '
'overlapping shard ranges.' % len(acceptor_path))
print('No repairs necessary.')
return None, None
print('Repairs necessary to remove overlapping shard ranges.')
print('Chosen a complete sequence of %d shard ranges with current total '
'of %d object records to accept object records from %d overlapping '
'donor shard ranges.' %
(len(acceptor_path), acceptor_path.object_count,
len(overlapping_donors)))
if args.verbose:
print_repair_solution(acceptor_path, overlapping_donors)
print('Once applied to the broker these changes will result in:')
print(' %d shard ranges being removed.' % len(overlapping_donors))
print(' %d object records being moved to the chosen shard ranges.'
% overlapping_donors.object_count)
return acceptor_path, overlapping_donors
def repair_shard_ranges(broker, args):
if not broker.is_root_container():
print('WARNING: Shard containers cannot be repaired.')
print('This command should be used on a root container.')
return 2
shard_ranges = broker.get_shard_ranges()
if not shard_ranges:
print('No shards found, nothing to do.')
return 0
own_sr = broker.get_own_shard_range()
try:
acceptor_path, overlapping_donors = find_repair_solution(
shard_ranges, own_sr, args)
except ManageShardRangesException:
return 1
if not acceptor_path:
return 0
if not args.yes:
choice = input('Do you want to apply these changes to the container '
'DB? [yes/N]')
if choice != 'yes':
print('No changes applied')
return 0
# merge changes to the broker...
# note: acceptors do not need to be modified since they already span the
# complete range
ts_now = Timestamp.now()
finalize_shrinking(broker, [], overlapping_donors, ts_now)
print('Updated %s donor shard ranges.' % len(overlapping_donors))
print('Run container-replicator to replicate the changes to other nodes.')
print('Run container-sharder on all nodes to repair shards.')
return 0
def analyze_shard_ranges(args):
shard_data = _load_and_validate_shard_data(args, require_index=False)
shard_ranges = [ShardRange.from_dict(data) for data in shard_data]
whole_sr = ShardRange('whole/namespace', 0)
try:
find_repair_solution(shard_ranges, whole_sr, args)
except ManageShardRangesException:
return 1
return 0
def _positive_int(arg):
val = int(arg)
if val <= 0:
@ -519,14 +696,29 @@ def _add_enable_args(parser):
help='DB timeout to use when enabling sharding.')
def _add_yes_arg(parser):
parser.add_argument(
'--yes', '-y', action='store_true', default=False,
help='Apply shard range changes to broker without prompting.')
def _make_parser():
parser = argparse.ArgumentParser(description='Manage shard ranges')
parser.add_argument('container_db')
parser.add_argument('path_to_file',
help='Path to a container DB file or, for the analyze '
'subcommand, a shard data file.')
parser.add_argument('--config', dest='conf_file', required=False,
help='Path to config file with [container-sharder] '
'section')
parser.add_argument('--verbose', '-v', action='count', default=0,
help='Increase output verbosity')
# this is useful for probe tests that shard containers with unrealistically
# low numbers of objects, of which a significant proportion may still be in
# the pending file
parser.add_argument(
'--force-commits', action='store_true', default=False,
help='Force broker to commit pending object updates before finding '
'shard ranges. By default the broker will skip commits.')
subparsers = parser.add_subparsers(
dest='subcommand', help='Sub-command help', title='Sub-commands')
@ -595,9 +787,7 @@ def _make_parser():
'compact',
help='Compact shard ranges with less than the shrink-threshold number '
'of rows. This command only works on root containers.')
compact_parser.add_argument(
'--yes', '-y', action='store_true', default=False,
help='Apply shard range changes to broker without prompting.')
_add_yes_arg(compact_parser)
compact_parser.add_argument('--shrink-threshold', nargs='?',
type=_positive_int,
default=None,
@ -633,6 +823,21 @@ def _make_parser():
'expanded. Defaults to unlimited.')
compact_parser.set_defaults(func=compact_shard_ranges)
# repair
repair_parser = subparsers.add_parser(
'repair',
help='Repair overlapping shard ranges. No action will be taken '
'without user confirmation unless the -y option is used.')
_add_yes_arg(repair_parser)
repair_parser.set_defaults(func=repair_shard_ranges)
# analyze
analyze_parser = subparsers.add_parser(
'analyze',
help='Analyze shard range json data read from file. Use -v to see '
'more detailed analysis.')
analyze_parser.set_defaults(func=analyze_shard_ranges)
return parser
@ -648,6 +853,7 @@ def main(args=None):
parser.print_help()
print('\nA sub-command is required.')
return 1
conf = {}
rows_per_shard = DEFAULT_ROWS_PER_SHARD
shrink_threshold = DEFAULT_SHRINK_THRESHOLD
@ -688,16 +894,21 @@ def main(args=None):
if "rows_per_shard" in args and args.rows_per_shard is None:
args.rows_per_shard = rows_per_shard
logger = get_logger(conf, name='ContainerBroker', log_to_console=True)
broker = ContainerBroker(os.path.realpath(args.container_db),
logger=logger, skip_commits=True)
if args.func in (analyze_shard_ranges,):
args.input = args.path_to_file
return args.func(args) or 0
logger = get_logger({}, name='ContainerBroker', log_to_console=True)
broker = ContainerBroker(os.path.realpath(args.path_to_file),
logger=logger,
skip_commits=not args.force_commits)
try:
broker.get_info()
except Exception as exc:
print('Error opening container DB %s: %s' % (args.container_db, exc),
print('Error opening container DB %s: %s' % (args.path_to_file, exc),
file=sys.stderr)
return 2
print('Loaded db broker for %s.' % broker.path, file=sys.stderr)
print('Loaded db broker for %s' % broker.path, file=sys.stderr)
return args.func(broker, args)


@ -5321,17 +5321,19 @@ class ShardRange(object):
valid state number.
"""
try:
state = state.lower()
state_num = cls.STATES_BY_NAME[state]
except (KeyError, AttributeError):
try:
state_name = cls.STATES[state]
except KeyError:
raise ValueError('Invalid state %r' % state)
else:
state_num = state
else:
state_name = state
# maybe it's a number
float_state = float(state)
state_num = int(float_state)
if state_num != float_state:
raise ValueError('Invalid state %r' % state)
state_name = cls.STATES[state_num]
except (ValueError, TypeError):
# maybe it's a state name
state_name = state.lower()
state_num = cls.STATES_BY_NAME[state_name]
except (KeyError, AttributeError):
raise ValueError('Invalid state %r' % state)
return state_num, state_name
@property
@ -5340,14 +5342,7 @@ class ShardRange(object):
@state.setter
def state(self, state):
try:
float_state = float(state)
int_state = int(float_state)
except (ValueError, TypeError):
raise ValueError('Invalid state %r' % state)
if int_state != float_state or int_state not in self.STATES:
raise ValueError('Invalid state %r' % state)
self._state = int_state
self._state = self.resolve_state(state)[0]
@property
def state_text(self):
@ -5635,6 +5630,14 @@ class ShardRangeList(UserList):
"""
return sum(sr.bytes_used for sr in self)
@property
def timestamps(self):
return set(sr.timestamp for sr in self)
@property
def states(self):
return set(sr.state for sr in self)
def includes(self, other):
"""
Check if another ShardRange namespace is enclosed between the list's
@ -5652,6 +5655,44 @@ class ShardRangeList(UserList):
"""
return self.lower <= other.lower and self.upper >= other.upper
def filter(self, includes=None, marker=None, end_marker=None):
"""
Filter the list for those shard ranges whose namespace includes the
``includes`` name or any part of the namespace between ``marker`` and
``end_marker``. If none of ``includes``, ``marker`` or ``end_marker``
are specified then all shard ranges will be returned.
:param includes: a string; if not empty then only the shard range, if
any, whose namespace includes this string will be returned, and
``marker`` and ``end_marker`` will be ignored.
:param marker: if specified then only shard ranges whose upper bound is
greater than this value will be returned.
:param end_marker: if specified then only shard ranges whose lower
bound is less than this value will be returned.
:return: A new instance of :class:`~swift.common.utils.ShardRangeList`
containing the filtered shard ranges.
"""
return ShardRangeList(
filter_shard_ranges(self, includes, marker, end_marker))
def find_lower(self, condition):
"""
Finds the first shard range that satisfies the given condition and returns
its lower bound.
:param condition: A function that must accept a single argument of type
:class:`~swift.common.utils.ShardRange` and return True if the
shard range satisfies the condition or False otherwise.
:return: The lower bound of the first shard range to satisfy the
condition, or the ``upper`` value of this list if no such shard
range is found.
"""
for sr in self:
if condition(sr):
return sr.lower
return self.upper
def find_shard_range(item, ranges):
"""
@ -5670,6 +5711,22 @@ def find_shard_range(item, ranges):
def filter_shard_ranges(shard_ranges, includes, marker, end_marker):
"""
Filter the given shard ranges to those whose namespace includes the
``includes`` name or any part of the namespace between ``marker`` and
``end_marker``. If none of ``includes``, ``marker`` or ``end_marker`` are
specified then all shard ranges will be returned.
:param shard_ranges: A list of :class:`~swift.common.utils.ShardRange`.
:param includes: a string; if not empty then only the shard range, if any,
whose namespace includes this string will be returned, and ``marker``
and ``end_marker`` will be ignored.
:param marker: if specified then only shard ranges whose upper bound is
greater than this value will be returned.
:param end_marker: if specified then only shard ranges whose lower bound is
less than this value will be returned.
:return: A filtered list of :class:`~swift.common.utils.ShardRange`.
"""
if includes:
shard_range = find_shard_range(includes, shard_ranges)
return [shard_range] if shard_range else []
@ -5685,6 +5742,10 @@ def filter_shard_ranges(shard_ranges, includes, marker, end_marker):
if marker or end_marker:
return list(filter(shard_range_filter, shard_ranges))
if marker == ShardRange.MAX or end_marker == ShardRange.MIN:
# MIN and MAX are both Falsy so not handled by shard_range_filter
return []
return shard_ranges


@ -32,7 +32,7 @@ from swift.common.utils import Timestamp, encode_timestamps, \
decode_timestamps, extract_swift_bytes, storage_directory, hash_path, \
ShardRange, renamer, MD5_OF_EMPTY_STRING, mkdirs, get_db_files, \
parse_db_filename, make_db_file_path, split_path, RESERVED_BYTE, \
filter_shard_ranges
filter_shard_ranges, ShardRangeList
from swift.common.db import DatabaseBroker, utf8encode, BROKER_TIMEOUT, \
zero_like, DatabaseAlreadyExists, SQLITE_ARG_LIMIT
@ -1388,7 +1388,7 @@ class ContainerBroker(DatabaseBroker):
"""
if not shard_ranges:
return
if not isinstance(shard_ranges, list):
if not isinstance(shard_ranges, (list, ShardRangeList)):
shard_ranges = [shard_ranges]
item_list = []


@ -12,7 +12,7 @@
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import collections
import errno
import json
import time
@ -339,6 +339,106 @@ def process_compactible_shard_sequences(broker, sequences):
finalize_shrinking(broker, acceptor_ranges, shrinking_ranges, timestamp)
def find_paths(shard_ranges):
"""
Returns a list of all continuous paths through the shard ranges. An
individual path may not necessarily span the entire namespace, but it will
span a continuous namespace without gaps.
:param shard_ranges: A list of :class:`~swift.common.utils.ShardRange`.
:return: A list of :class:`~swift.common.utils.ShardRangeList`.
"""
# A node is a point in the namespace that is used as a bound of any shard
# range. Shard ranges form the edges between nodes.
# First build a dict mapping nodes to a list of edges that leave that node
# (in other words, shard ranges whose lower bound equals the node)
node_successors = collections.defaultdict(list)
for shard_range in shard_ranges:
if shard_range.state == ShardRange.SHRINKING:
# shrinking shards are not a viable edge in any path
continue
node_successors[shard_range.lower].append(shard_range)
paths = []
def clone_path(other=None):
# create a new path, possibly cloning another path, and add it to the
# list of all paths through the shards
path = ShardRangeList() if other is None else ShardRangeList(other)
paths.append(path)
return path
# we need to keep track of every path that ends at each node so that when
# we visit the node we can extend those paths, or clones of them, with the
# edges that leave the node
paths_to_node = collections.defaultdict(list)
# visit the nodes in ascending order by name...
for node, edges in sorted(node_successors.items()):
if not edges:
# this node is a dead-end, so there are no path updates to make
continue
if not paths_to_node[node]:
# this is either the first node to be visited, or it has no paths
# leading to it, so we need to start a new path here
paths_to_node[node].append(clone_path([]))
for path_to_node in paths_to_node[node]:
# extend each path that arrives at this node with all of the
# possible edges that leave the node; if more than one edge leaves the
# node then we will make clones of the path to the node and extend
# those clones, adding to the collection of all paths through the
# shards
for i, edge in enumerate(edges):
if i == len(edges) - 1:
# the last edge is used to extend the original path to the
# node; there is nothing special about the last edge, but
# doing this last means the original path to the node can
# be cloned for all other edges before being modified here
path = path_to_node
else:
# for all but one of the edges leaving the node we need to
# make a clone of the original path
path = clone_path(path_to_node)
# extend the path with the edge
path.append(edge)
# keep track of which node this path now arrives at
paths_to_node[edge.upper].append(path)
return paths
def rank_paths(paths, shard_range_to_span):
"""
Sorts the given list of paths such that the most preferred path is the
first item in the list.
:param paths: A list of :class:`~swift.common.utils.ShardRangeList`.
:param shard_range_to_span: An instance of
:class:`~swift.common.utils.ShardRange` that describes the namespace
that would ideally be spanned by a path. Paths that include this
namespace will be preferred over those that do not.
:return: A sorted list of :class:`~swift.common.utils.ShardRangeList`.
"""
def sort_key(path):
# defines the order of preference for paths through shards
return (
# complete path for the namespace
path.includes(shard_range_to_span),
# most cleaving progress
path.find_lower(lambda sr: sr.state not in (
ShardRange.CLEAVED, ShardRange.ACTIVE)),
# largest object count
path.object_count,
# fewest timestamps
-1 * len(path.timestamps),
# newest timestamp
sorted(path.timestamps)[-1]
)
paths.sort(key=sort_key, reverse=True)
return paths
class CleavingContext(object):
def __init__(self, ref, cursor='', max_row=None, cleave_to_row=None,
last_cleave_to_row=None, cleaving_done=False,


@ -169,13 +169,16 @@ class BaseTestContainerSharding(ReplProbeTest):
else:
conn.delete_object(self.container_name, obj)
def get_container_shard_ranges(self, account=None, container=None):
def get_container_shard_ranges(self, account=None, container=None,
include_deleted=False):
account = account if account else self.account
container = container if container else self.container_to_shard
path = self.internal_client.make_path(account, container)
headers = {'X-Backend-Record-Type': 'shard'}
if include_deleted:
headers['X-Backend-Include-Deleted'] = 'true'
resp = self.internal_client.make_request(
'GET', path + '?format=json', {'X-Backend-Record-Type': 'shard'},
[200])
'GET', path + '?format=json', headers, [200])
return [ShardRange.from_dict(sr) for sr in json.loads(resp.body)]
def direct_get_container_shard_ranges(self, account=None, container=None,
@ -371,6 +374,21 @@ class BaseTestContainerSharding(ReplProbeTest):
expected_state, headers['X-Backend-Sharding-State'])
return [ShardRange.from_dict(sr) for sr in shard_ranges]
def assert_subprocess_success(self, cmd_args):
try:
subprocess.check_output(cmd_args, stderr=subprocess.STDOUT)
except Exception as exc:
# why not 'except CalledProcessError'? because in my py3.6 tests
# the CalledProcessError wasn't caught by that! despite type(exc)
# being a CalledProcessError, isinstance(exc, CalledProcessError)
# is False and the type has a different hash - could be
# related to https://github.com/eventlet/eventlet/issues/413
try:
# assume this is a CalledProcessError
self.fail('%s with output:\n%s' % (exc, exc.output))
except AttributeError:
raise exc
def get_part_and_node_numbers(self, shard_range):
"""Return the partition and node numbers for a shard range."""
part, nodes = self.brain.ring.get_nodes(
@ -2841,7 +2859,8 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
self.assertEqual(ShardRange.ACTIVE,
broker.get_own_shard_range().state)
def test_manage_shard_ranges_used_poorly(self):
def test_manage_shard_ranges_repair_root(self):
# provoke overlaps in root container and repair
obj_names = self._make_object_names(8)
self.put_objects(obj_names)
@ -2917,18 +2936,12 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
# horribly out of date as more objects are added
self.assert_container_listing(obj_names)
# Let's pretend that some actor in the system has determined that the
# second set of 3 shard ranges (1.*) are correct and the first set of 4
# (0.*) are not desired, so shrink shard ranges 0.*. We've already
# checked they are in cleaved or created state so it's ok to move them
# to shrinking.
# TODO: replace this db manipulation if/when manage_shard_ranges can
# manage shrinking...
for sr in shard_ranges_0:
self.assertTrue(sr.update_state(ShardRange.SHRINKING))
sr.epoch = sr.state_timestamp = Timestamp.now()
broker = self.get_broker(self.brain.part, self.brain.nodes[0])
broker.merge_shard_ranges(shard_ranges_0)
# 'swift-manage-shard-ranges repair' will choose the second set of 3
# shard ranges (1.*) with newer timestamp over the first set of 4
# (0.*), and shrink shard ranges 0.*.
db_file = self.get_db_file(self.brain.part, self.brain.nodes[0])
self.assert_subprocess_success(
['swift-manage-shard-ranges', db_file, 'repair', '--yes'])
# make sure all root replicas now sync their shard ranges
self.replicators.once()
@ -3057,3 +3070,139 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
# Finally, with all root replicas in a consistent state, the listing
# will be be predictably correct
self.assert_container_listing(obj_names)
def test_manage_shard_ranges_repair_shard(self):
# provoke overlaps in a shard container and repair them
obj_names = self._make_object_names(24)
initial_obj_names = obj_names[::2]
# put 12 objects in container
self.put_objects(initial_obj_names)
client.post_container(self.url, self.admin_token, self.container_name,
headers={'X-Container-Sharding': 'on'})
# run replicators first time to get sync points set
self.replicators.once()
# find 3 shard ranges on root nodes[0] and get the root sharded
subprocess.check_output([
'swift-manage-shard-ranges',
self.get_db_file(self.brain.part, self.brain.nodes[0]),
'find_and_replace', '4', '--enable'], stderr=subprocess.STDOUT)
self.replicators.once()
# cleave first two shards
self.sharders_once(additional_args='--partitions=%s' % self.brain.part)
# cleave third shard
self.sharders_once(additional_args='--partitions=%s' % self.brain.part)
# ensure all shards learn their ACTIVE state from root
self.sharders_once()
for node in (0, 1, 2):
with annotate_failure('node %d' % node):
shard_ranges = self.assert_container_state(
self.brain.nodes[node], 'sharded', 3)
for sr in shard_ranges:
self.assertEqual(ShardRange.ACTIVE, sr.state)
self.assert_container_listing(initial_obj_names)
# add objects to second shard range so it has 8 objects; this range
# has bounds (obj-0006,obj-0014]
root_shard_ranges = self.get_container_shard_ranges()
self.assertEqual(3, len(root_shard_ranges))
shard_1 = root_shard_ranges[1]
self.assertEqual(obj_names[6], shard_1.lower)
self.assertEqual(obj_names[14], shard_1.upper)
more_obj_names = obj_names[7:15:2]
self.put_objects(more_obj_names)
expected_obj_names = sorted(initial_obj_names + more_obj_names)
self.assert_container_listing(expected_obj_names)
shard_1_part, shard_1_nodes = self.brain.ring.get_nodes(
shard_1.account, shard_1.container)
# find 3 sub-shards on one shard node; use --force-commits to ensure
# the recently PUT objects are included when finding the shard range
# pivot points
subprocess.check_output([
'swift-manage-shard-ranges', '--force-commits',
self.get_db_file(shard_1_part, shard_1_nodes[1], shard_1.account,
shard_1.container),
'find_and_replace', '3', '--enable'],
stderr=subprocess.STDOUT)
# ... and mistakenly find 4 shard ranges on a different shard node :(
subprocess.check_output([
'swift-manage-shard-ranges', '--force-commits',
self.get_db_file(shard_1_part, shard_1_nodes[2], shard_1.account,
shard_1.container),
'find_and_replace', '2', '--enable'],
stderr=subprocess.STDOUT)
# replicate the muddle of shard ranges between shard replicas, merged
# result is:
# '' - 6 shard ACTIVE
# 6 - 8 sub-shard FOUND
# 6 - 9 sub-shard FOUND
# 8 - 10 sub-shard FOUND
# 9 - 12 sub-shard FOUND
# 10 - 12 sub-shard FOUND
# 12 - 14 sub-shard FOUND
# 12 - 14 sub-shard FOUND
# 6 - 14 shard SHARDING
# 14 - '' shard ACTIVE
self.replicators.once()
# try hard to shard the shard...
self.sharders_once(additional_args='--partitions=%s' % shard_1_part)
self.sharders_once(additional_args='--partitions=%s' % shard_1_part)
self.sharders_once(additional_args='--partitions=%s' % shard_1_part)
# sharding hasn't completed and there's overlaps in the shard and root:
# the sub-shards will have been cleaved in the order listed above, but
# sub-shards (10 - 12) and one of (12 - 14) will be overlooked because
# the cleave cursor will have moved past their namespace before they
# were yielded by the shard range iterator, so we now have:
# '' - 6 shard ACTIVE
# 6 - 8 sub-shard ACTIVE
# 6 - 9 sub-shard ACTIVE
# 8 - 10 sub-shard ACTIVE
# 10 - 12 sub-shard CREATED
# 9 - 12 sub-shard ACTIVE
# 12 - 14 sub-shard CREATED
# 12 - 14 sub-shard ACTIVE
# 14 - '' shard ACTIVE
sub_shard_ranges = self.get_container_shard_ranges(
shard_1.account, shard_1.container)
self.assertEqual(7, len(sub_shard_ranges), sub_shard_ranges)
root_shard_ranges = self.get_container_shard_ranges()
self.assertEqual(9, len(root_shard_ranges), root_shard_ranges)
self.assertEqual([ShardRange.ACTIVE] * 4 +
[ShardRange.CREATED, ShardRange.ACTIVE] * 2 +
[ShardRange.ACTIVE],
[sr.state for sr in root_shard_ranges])
# fix the overlaps - a set of 3 ACTIVE sub-shards will be chosen and 4
# other sub-shards will be shrunk away; apply the fix at the root
# container
db_file = self.get_db_file(self.brain.part, self.brain.nodes[0])
self.assert_subprocess_success(
['swift-manage-shard-ranges', db_file, 'repair', '--yes'])
self.replicators.once()
self.sharders_once()
self.sharders_once()
# check root now has just 5 shard ranges
root_shard_ranges = self.get_container_shard_ranges()
self.assertEqual(5, len(root_shard_ranges), root_shard_ranges)
self.assertEqual([ShardRange.ACTIVE] * 5,
[sr.state for sr in root_shard_ranges])
# check there are 1 sharded shard and 4 shrunk sub-shard ranges in the
# root (note, shard_1's shard ranges aren't updated once it has sharded
# because the sub-shards report their state to the root; we cannot make
# assertions about shrunk states in shard_1's shard range table)
root_shard_ranges = self.get_container_shard_ranges(
include_deleted=True)
self.assertEqual(10, len(root_shard_ranges), root_shard_ranges)
shrunk_shard_ranges = [sr for sr in root_shard_ranges
if sr.state == ShardRange.SHRUNK]
self.assertEqual(4, len(shrunk_shard_ranges), root_shard_ranges)
self.assertEqual([True] * 4,
[sr.deleted for sr in shrunk_shard_ranges])
sharded_shard_ranges = [sr for sr in root_shard_ranges
if sr.state == ShardRange.SHARDED]
self.assertEqual(1, len(sharded_shard_ranges), root_shard_ranges)
self.assert_container_listing(expected_obj_names)


@ -27,11 +27,12 @@ from swift.common import utils
from swift.common.utils import Timestamp, ShardRange
from swift.container.backend import ContainerBroker
from swift.container.sharder import make_shard_ranges
from test.unit import mock_timestamp_now
from test.unit import mock_timestamp_now, make_timestamp_iter, with_tempdir
class TestManageShardRanges(unittest.TestCase):
def setUp(self):
self.ts_iter = make_timestamp_iter()
self.testdir = os.path.join(mkdtemp(), 'tmp_test_cli_find_shards')
utils.mkdirs(self.testdir)
rmtree(self.testdir)
@ -58,9 +59,40 @@ class TestManageShardRanges(unittest.TestCase):
'object_count': 10},
]
self.overlap_shard_data_1 = [
{'index': 0, 'lower': '', 'upper': 'obj10', 'object_count': 1},
{'index': 1, 'lower': 'obj10', 'upper': 'obj20',
'object_count': 1},
{'index': 2, 'lower': 'obj20', 'upper': 'obj30',
'object_count': 1},
{'index': 3, 'lower': 'obj30', 'upper': 'obj39',
'object_count': 1},
{'index': 4, 'lower': 'obj39', 'upper': 'obj49',
'object_count': 1},
{'index': 5, 'lower': 'obj49', 'upper': 'obj58',
'object_count': 1},
{'index': 6, 'lower': 'obj58', 'upper': 'obj68',
'object_count': 1},
{'index': 7, 'lower': 'obj68', 'upper': 'obj78',
'object_count': 1},
{'index': 8, 'lower': 'obj78', 'upper': 'obj88',
'object_count': 1},
{'index': 9, 'lower': 'obj88', 'upper': '', 'object_count': 1},
]
self.overlap_shard_data_2 = [
{'index': 0, 'lower': '', 'upper': 'obj11', 'object_count': 1},
{'index': 1, 'lower': 'obj11', 'upper': 'obj21',
'object_count': 1},
]
def tearDown(self):
rmtree(os.path.dirname(self.testdir))
def assert_shard_ranges_equal(self, expected, actual):
self.assertEqual([dict(sr) for sr in expected],
[dict(sr) for sr in actual])
def assert_starts_with(self, value, prefix):
self.assertTrue(value.startswith(prefix),
"%r does not start with %r" % (value, prefix))
@ -117,10 +149,11 @@ class TestManageShardRanges(unittest.TestCase):
with mock.patch('swift.cli.manage_shard_ranges.find_ranges') as mocked:
main([db_file, 'find'])
expected = Namespace(conf_file=None,
container_db=mock.ANY,
path_to_file=mock.ANY,
func=mock.ANY,
rows_per_shard=500000,
subcommand='find',
force_commits=False,
verbose=0)
mocked.assert_called_once_with(mock.ANY, expected)
@ -128,10 +161,11 @@ class TestManageShardRanges(unittest.TestCase):
with mock.patch('swift.cli.manage_shard_ranges.find_ranges') as mocked:
main([db_file, '--config', conf_file, 'find'])
expected = Namespace(conf_file=conf_file,
container_db=mock.ANY,
path_to_file=mock.ANY,
func=mock.ANY,
rows_per_shard=500,
subcommand='find',
force_commits=False,
verbose=0)
mocked.assert_called_once_with(mock.ANY, expected)
@ -139,10 +173,11 @@ class TestManageShardRanges(unittest.TestCase):
with mock.patch('swift.cli.manage_shard_ranges.find_ranges') as mocked:
main([db_file, '--config', conf_file, 'find', '12345'])
expected = Namespace(conf_file=conf_file,
container_db=mock.ANY,
path_to_file=mock.ANY,
func=mock.ANY,
rows_per_shard=12345,
subcommand='find',
force_commits=False,
verbose=0)
mocked.assert_called_once_with(mock.ANY, expected)
@ -151,9 +186,10 @@ class TestManageShardRanges(unittest.TestCase):
as mocked:
main([db_file, 'compact'])
expected = Namespace(conf_file=None,
container_db=mock.ANY,
path_to_file=mock.ANY,
func=mock.ANY,
subcommand='compact',
force_commits=False,
verbose=0,
max_expanding=-1,
max_shrinking=1,
@ -167,9 +203,10 @@ class TestManageShardRanges(unittest.TestCase):
as mocked:
main([db_file, '--config', conf_file, 'compact'])
expected = Namespace(conf_file=conf_file,
container_db=mock.ANY,
path_to_file=mock.ANY,
func=mock.ANY,
subcommand='compact',
force_commits=False,
verbose=0,
max_expanding=31,
max_shrinking=33,
@ -197,9 +234,10 @@ class TestManageShardRanges(unittest.TestCase):
as mocked:
main([db_file, '--config', conf_file, 'compact'])
expected = Namespace(conf_file=conf_file,
container_db=mock.ANY,
path_to_file=mock.ANY,
func=mock.ANY,
subcommand='compact',
force_commits=False,
verbose=0,
max_expanding=31,
max_shrinking=33,
@ -217,9 +255,10 @@ class TestManageShardRanges(unittest.TestCase):
'--expansion-limit', '3456',
'--shrink-threshold', '1234'])
expected = Namespace(conf_file=conf_file,
container_db=mock.ANY,
path_to_file=mock.ANY,
func=mock.ANY,
subcommand='compact',
force_commits=False,
verbose=0,
max_expanding=11,
max_shrinking=22,
@ -344,7 +383,7 @@ class TestManageShardRanges(unittest.TestCase):
'Metadata:',
' X-Container-Sysmeta-Sharding = True']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
retiring_db_id = broker.get_info()['id']
@ -391,7 +430,7 @@ class TestManageShardRanges(unittest.TestCase):
# The json.dumps() in py2 produces trailing space, not in py3.
result = [x.rstrip() for x in out.getvalue().splitlines()]
self.assertEqual(expected, result)
self.assertEqual(['Loaded db broker for a/c.'],
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
self.assertTrue(broker.set_sharded_state())
@ -420,7 +459,7 @@ class TestManageShardRanges(unittest.TestCase):
' X-Container-Sysmeta-Sharding = True']
self.assertEqual(expected,
[x.rstrip() for x in out.getvalue().splitlines()])
self.assertEqual(['Loaded db broker for a/c.'],
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
def test_show(self):
@ -430,7 +469,7 @@ class TestManageShardRanges(unittest.TestCase):
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([broker.db_file, 'show'])
expected = [
'Loaded db broker for a/c.',
'Loaded db broker for a/c',
'No shard data found.',
]
self.assertEqual(expected, err.getvalue().splitlines())
@ -447,7 +486,7 @@ class TestManageShardRanges(unittest.TestCase):
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([broker.db_file, 'show'])
expected = [
'Loaded db broker for a/c.',
'Loaded db broker for a/c',
'Existing shard ranges:',
]
self.assertEqual(expected, err.getvalue().splitlines())
@ -458,7 +497,7 @@ class TestManageShardRanges(unittest.TestCase):
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
main([broker.db_file, 'show', '--includes', 'foo'])
expected = [
'Loaded db broker for a/c.',
'Loaded db broker for a/c',
'Existing shard ranges:',
]
self.assertEqual(expected, err.getvalue().splitlines())
@ -481,7 +520,7 @@ class TestManageShardRanges(unittest.TestCase):
'Run container-replicator to replicate them to other nodes.',
'Use the enable sub-command to enable sharding.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
self.assertEqual(
[(data['lower'], data['upper']) for data in self.shard_data],
@ -509,7 +548,7 @@ class TestManageShardRanges(unittest.TestCase):
expected = ["WARNING: invalid shard ranges: ['No shard ranges.'].",
'Aborting.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
# success
@ -531,7 +570,7 @@ class TestManageShardRanges(unittest.TestCase):
now.internal,
'Run container-sharder on all nodes to shard the container.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
self._assert_enabled(broker, now)
@ -546,7 +585,7 @@ class TestManageShardRanges(unittest.TestCase):
'No action required.',
'Run container-sharder on all nodes to shard the container.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
self._assert_enabled(broker, now)
@ -576,7 +615,7 @@ class TestManageShardRanges(unittest.TestCase):
now.internal,
'Run container-sharder on all nodes to shard the container.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
self._assert_enabled(broker, now)
found_shard_ranges = broker.get_shard_ranges()
@ -597,7 +636,7 @@ class TestManageShardRanges(unittest.TestCase):
self.assertEqual(found_shard_ranges, broker.get_shard_ranges())
expected = ['This will delete existing 10 shard ranges.']
self.assertEqual(expected, out.getvalue().splitlines())
self.assertEqual(['Loaded db broker for a/c.'],
self.assertEqual(['Loaded db broker for a/c'],
err.getvalue().splitlines())
def test_compact_bad_args(self):
@ -710,7 +749,7 @@ class TestManageShardRanges(unittest.TestCase):
[sr.state for sr in updated_ranges])
def test_compact_user_input(self):
# verify user input 'y' or 'n' is respected
# verify user input 'yes' or 'n' is respected
small_ranges = (3, 4, 7)
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
@ -757,7 +796,7 @@ class TestManageShardRanges(unittest.TestCase):
'Once applied to the broker these changes will result in '
'shard range compaction the next time the sharder runs.',
]
if user_input == 'y':
if user_input == 'yes':
expected.extend([
'Updated 2 shard sequences for compaction.',
'Run container-replicator to replicate the changes to '
@ -779,7 +818,7 @@ class TestManageShardRanges(unittest.TestCase):
for i, sr in enumerate(broker_ranges):
self.assertEqual(ShardRange.ACTIVE, sr.state)
broker_ranges = do_compact('y')
broker_ranges = do_compact('yes')
# expect updated shard ranges
shard_ranges[5].lower = shard_ranges[3].lower
shard_ranges[8].lower = shard_ranges[7].lower
@ -1237,3 +1276,329 @@ class TestManageShardRanges(unittest.TestCase):
self.assertEqual(shard_ranges, updated_ranges)
self.assertEqual([ShardRange.SHRINKING] * 8 + [ShardRange.ACTIVE] * 2,
[sr.state for sr in updated_ranges])
def test_repair_not_root(self):
broker = self._make_broker()
shard_ranges = make_shard_ranges(broker, self.shard_data, '.shards_')
broker.merge_shard_ranges(shard_ranges)
# make broker appear to not be a root container
out = StringIO()
err = StringIO()
broker.set_sharding_sysmeta('Quoted-Root', 'not_a/c')
self.assertFalse(broker.is_root_container())
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'repair'])
self.assertEqual(2, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['WARNING: Shard containers cannot be repaired.',
'This command should be used on a root container.'],
out_lines[:2]
)
updated_ranges = broker.get_shard_ranges()
self.assert_shard_ranges_equal(shard_ranges, updated_ranges)
def test_repair_no_shard_ranges(self):
broker = self._make_broker()
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
self.assertTrue(broker.is_root_container())
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'repair'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['No shards found, nothing to do.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
self.assert_shard_ranges_equal([], updated_ranges)
def test_repair_gaps_one_incomplete_sequence(self):
broker = self._make_broker()
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
with mock_timestamp_now(next(self.ts_iter)):
shard_ranges = make_shard_ranges(
broker, self.shard_data[:-1], '.shards_')
broker.merge_shard_ranges(shard_ranges)
self.assertTrue(broker.is_root_container())
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'repair'])
self.assertEqual(1, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Found no complete sequence of shard ranges.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
self.assert_shard_ranges_equal(shard_ranges, updated_ranges)
def test_repair_gaps_overlapping_incomplete_sequences(self):
broker = self._make_broker()
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
with mock_timestamp_now(next(self.ts_iter)):
shard_ranges = make_shard_ranges(
broker, self.shard_data[:-1], '.shards_')
with mock_timestamp_now(next(self.ts_iter)):
# use new time to get distinct shard names
overlap_shard_ranges = make_shard_ranges(
broker,
self.overlap_shard_data_1[:2] + self.overlap_shard_data_1[6:],
'.shards_')
broker.merge_shard_ranges(shard_ranges + overlap_shard_ranges)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'repair'])
self.assertEqual(1, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Found no complete sequence of shard ranges.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
expected = sorted(shard_ranges + overlap_shard_ranges,
key=ShardRange.sort_key)
self.assert_shard_ranges_equal(expected, updated_ranges)
def test_repair_not_needed(self):
broker = self._make_broker()
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
shard_ranges = make_shard_ranges(
broker, self.shard_data, '.shards_')
broker.merge_shard_ranges(shard_ranges)
self.assertTrue(broker.is_root_container())
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'repair'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Found one complete sequence of 10 shard ranges and no '
'overlapping shard ranges.',
'No repairs necessary.'],
out_lines[:2])
updated_ranges = broker.get_shard_ranges()
self.assert_shard_ranges_equal(shard_ranges, updated_ranges)
def _do_test_repair_exits_if_undesirable_state(self, undesirable_state):
broker = self._make_broker()
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
with mock_timestamp_now(next(self.ts_iter)):
shard_ranges = make_shard_ranges(
broker, self.shard_data, '.shards_')
# make one shard be in an undesirable state
shard_ranges[2].update_state(undesirable_state)
with mock_timestamp_now(next(self.ts_iter)):
overlap_shard_ranges_2 = make_shard_ranges(
broker, self.overlap_shard_data_2, '.shards_')
broker.merge_shard_ranges(shard_ranges + overlap_shard_ranges_2)
self.assertTrue(broker.is_root_container())
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), \
mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'repair'])
self.assertEqual(1, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['WARNING: Found shard ranges in %s state'
% ShardRange.STATES[undesirable_state]], out_lines[:1])
# nothing changed in DB
self.assert_shard_ranges_equal(
sorted(shard_ranges + overlap_shard_ranges_2,
key=ShardRange.sort_key),
broker.get_shard_ranges())
def test_repair_exits_if_sharding_state(self):
self._do_test_repair_exits_if_undesirable_state(ShardRange.SHARDING)
def test_repair_exits_if_shrinking_state(self):
self._do_test_repair_exits_if_undesirable_state(ShardRange.SHRINKING)
def test_repair_one_complete_sequences_one_incomplete(self):
broker = self._make_broker()
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
with mock_timestamp_now(next(self.ts_iter)):
shard_ranges = make_shard_ranges(
broker, self.shard_data, '.shards_')
with mock_timestamp_now(next(self.ts_iter)):
overlap_shard_ranges_2 = make_shard_ranges(
broker, self.overlap_shard_data_2, '.shards_')
broker.merge_shard_ranges(shard_ranges + overlap_shard_ranges_2)
self.assertTrue(broker.is_root_container())
def do_repair(user_input, ts_now):
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), \
mock.patch('sys.stderr', err), \
mock_timestamp_now(ts_now), \
mock.patch('swift.cli.manage_shard_ranges.input',
return_value=user_input):
ret = main([broker.db_file, 'repair'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Repairs necessary to remove overlapping shard ranges.'],
out_lines[:1])
# user input 'n'
ts_now = next(self.ts_iter)
do_repair('n', ts_now)
updated_ranges = broker.get_shard_ranges()
expected = sorted(
shard_ranges + overlap_shard_ranges_2,
key=ShardRange.sort_key)
self.assert_shard_ranges_equal(expected, updated_ranges)
# user input 'yes'
ts_now = next(self.ts_iter)
do_repair('yes', ts_now)
updated_ranges = broker.get_shard_ranges()
for sr in overlap_shard_ranges_2:
sr.update_state(ShardRange.SHRINKING, ts_now)
sr.epoch = ts_now
expected = sorted(
shard_ranges + overlap_shard_ranges_2,
key=ShardRange.sort_key)
self.assert_shard_ranges_equal(expected, updated_ranges)
def test_repair_two_complete_sequences_one_incomplete(self):
broker = self._make_broker()
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
with mock_timestamp_now(next(self.ts_iter)):
shard_ranges = make_shard_ranges(
broker, self.shard_data, '.shards_')
with mock_timestamp_now(next(self.ts_iter)):
overlap_shard_ranges_1 = make_shard_ranges(
broker, self.overlap_shard_data_1, '.shards_')
with mock_timestamp_now(next(self.ts_iter)):
overlap_shard_ranges_2 = make_shard_ranges(
broker, self.overlap_shard_data_2, '.shards_')
broker.merge_shard_ranges(shard_ranges + overlap_shard_ranges_1 +
overlap_shard_ranges_2)
self.assertTrue(broker.is_root_container())
out = StringIO()
err = StringIO()
ts_now = next(self.ts_iter)
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err), \
mock_timestamp_now(ts_now):
ret = main([broker.db_file, 'repair', '--yes'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Repairs necessary to remove overlapping shard ranges.'],
out_lines[:1])
updated_ranges = broker.get_shard_ranges()
for sr in overlap_shard_ranges_1 + overlap_shard_ranges_2:
sr.update_state(ShardRange.SHRINKING, ts_now)
sr.epoch = ts_now
expected = sorted(
shard_ranges + overlap_shard_ranges_1 + overlap_shard_ranges_2,
key=ShardRange.sort_key)
self.assert_shard_ranges_equal(expected, updated_ranges)
@with_tempdir
def test_show_and_analyze(self, tempdir):
broker = self._make_broker()
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
with mock_timestamp_now(next(self.ts_iter)): # t1
shard_ranges = make_shard_ranges(
broker, self.shard_data, '.shards_')
with mock_timestamp_now(next(self.ts_iter)):
overlap_shard_ranges_1 = make_shard_ranges(
broker, self.overlap_shard_data_1, '.shards_')
with mock_timestamp_now(next(self.ts_iter)):
overlap_shard_ranges_2 = make_shard_ranges(
broker, self.overlap_shard_data_2, '.shards_')
broker.merge_shard_ranges(shard_ranges + overlap_shard_ranges_1 +
overlap_shard_ranges_2)
self.assertTrue(broker.is_root_container())
# run show command
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'show'])
self.assertEqual(0, ret)
err_lines = err.getvalue().split('\n')
self.assert_starts_with(err_lines[0], 'Loaded db broker for ')
shard_json = json.loads(out.getvalue())
expected = sorted(
shard_ranges + overlap_shard_ranges_1 + overlap_shard_ranges_2,
key=ShardRange.sort_key)
self.assert_shard_ranges_equal(
expected, [ShardRange.from_dict(data) for data in shard_json])
# dump data to a file and then run analyze subcommand
shard_file = os.path.join(tempdir, 'shards.json')
with open(shard_file, 'w') as fd:
json.dump(shard_json, fd)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([shard_file, 'analyze'])
self.assertEqual(0, ret)
self.assertEqual('', err.getvalue())
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Repairs necessary to remove overlapping shard ranges.'],
out_lines[:1])
# no changes made to broker
updated_ranges = broker.get_shard_ranges()
expected = sorted(
shard_ranges + overlap_shard_ranges_1 + overlap_shard_ranges_2,
key=ShardRange.sort_key)
self.assert_shard_ranges_equal(expected, updated_ranges)
# tweak timestamps to make the preferred path include shards from two
# sets, so that shards to remove have name-timestamps that are also in
# shards to keep
t4 = next(self.ts_iter)
for sr in shard_ranges[:5] + overlap_shard_ranges_1[5:]:
sr.timestamp = t4
broker.merge_shard_ranges(shard_ranges + overlap_shard_ranges_1 +
overlap_shard_ranges_2)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([broker.db_file, 'show'])
self.assertEqual(0, ret)
shard_json = json.loads(out.getvalue())
expected = sorted(
shard_ranges + overlap_shard_ranges_1 + overlap_shard_ranges_2,
key=ShardRange.sort_key)
self.assert_shard_ranges_equal(
expected, [ShardRange.from_dict(data) for data in shard_json])
with open(shard_file, 'w') as fd:
json.dump(shard_json, fd)
out = StringIO()
err = StringIO()
with mock.patch('sys.stdout', out), mock.patch('sys.stderr', err):
ret = main([shard_file, 'analyze'])
self.assertEqual(0, ret)
self.assertEqual('', err.getvalue())
out_lines = out.getvalue().split('\n')
self.assertEqual(
['Repairs necessary to remove overlapping shard ranges.'],
out_lines[:1])


@ -8074,8 +8074,9 @@ class TestShardRange(unittest.TestCase):
self.assertEqual(utils.Timestamp(0), sr.state_timestamp)
def test_state_setter(self):
for state in utils.ShardRange.STATES:
for test_value in (state, str(state)):
for state, state_name in utils.ShardRange.STATES.items():
for test_value in (
state, str(state), state_name, state_name.upper()):
sr = utils.ShardRange('a/test', next(self.ts_iter), 'l', 'u')
sr.state = test_value
actual = sr.state
@ -8124,6 +8125,8 @@ class TestShardRange(unittest.TestCase):
(number, name), utils.ShardRange.resolve_state(name.title()))
self.assertEqual(
(number, name), utils.ShardRange.resolve_state(number))
self.assertEqual(
(number, name), utils.ShardRange.resolve_state(str(number)))
def check_bad_value(value):
with self.assertRaises(ValueError) as cm:
@ -8644,12 +8647,16 @@ class TestShardRange(unittest.TestCase):
class TestShardRangeList(unittest.TestCase):
def setUp(self):
self.ts_iter = make_timestamp_iter()
self.t1 = next(self.ts_iter)
self.t2 = next(self.ts_iter)
self.ts_iter = make_timestamp_iter()
self.shard_ranges = [
utils.ShardRange('a/b', utils.Timestamp.now(), 'a', 'b',
utils.ShardRange('a/b', self.t1, 'a', 'b',
object_count=2, bytes_used=22),
utils.ShardRange('b/c', utils.Timestamp.now(), 'b', 'c',
utils.ShardRange('b/c', self.t2, 'b', 'c',
object_count=4, bytes_used=44),
utils.ShardRange('x/y', utils.Timestamp.now(), 'x', 'y',
utils.ShardRange('c/y', self.t1, 'c', 'y',
object_count=6, bytes_used=66),
]
@ -8729,6 +8736,86 @@ class TestShardRangeList(unittest.TestCase):
sr = utils.ShardRange('a/entire', utils.Timestamp.now(), '', '')
self.assertTrue(srl_entire.includes(sr))
def test_timestamps(self):
srl = ShardRangeList(self.shard_ranges)
self.assertEqual({self.t1, self.t2}, srl.timestamps)
t3 = next(self.ts_iter)
self.shard_ranges[2].timestamp = t3
self.assertEqual({self.t1, self.t2, t3}, srl.timestamps)
srl.pop(0)
self.assertEqual({self.t2, t3}, srl.timestamps)
def test_states(self):
srl = ShardRangeList()
self.assertEqual(set(), srl.states)
srl = ShardRangeList(self.shard_ranges)
self.shard_ranges[0].update_state(
utils.ShardRange.CREATED, next(self.ts_iter))
self.shard_ranges[1].update_state(
utils.ShardRange.CLEAVED, next(self.ts_iter))
self.shard_ranges[2].update_state(
utils.ShardRange.ACTIVE, next(self.ts_iter))
self.assertEqual({utils.ShardRange.CREATED,
utils.ShardRange.CLEAVED,
utils.ShardRange.ACTIVE},
srl.states)
def test_filter(self):
srl = ShardRangeList(self.shard_ranges)
self.assertEqual(self.shard_ranges, srl.filter())
self.assertEqual(self.shard_ranges,
srl.filter(marker='', end_marker=''))
self.assertEqual(self.shard_ranges,
srl.filter(marker=utils.ShardRange.MIN,
end_marker=utils.ShardRange.MAX))
self.assertEqual([], srl.filter(marker=utils.ShardRange.MAX,
end_marker=utils.ShardRange.MIN))
self.assertEqual([], srl.filter(marker=utils.ShardRange.MIN,
end_marker=utils.ShardRange.MIN))
self.assertEqual([], srl.filter(marker=utils.ShardRange.MAX,
end_marker=utils.ShardRange.MAX))
self.assertEqual(self.shard_ranges[:1],
srl.filter(marker='', end_marker='b'))
self.assertEqual(self.shard_ranges[1:3],
srl.filter(marker='b', end_marker='y'))
self.assertEqual([],
srl.filter(marker='y', end_marker='y'))
self.assertEqual([],
srl.filter(marker='y', end_marker='x'))
# includes trumps marker & end_marker
self.assertEqual(self.shard_ranges[0:1],
srl.filter(includes='b', marker='c', end_marker='y'))
self.assertEqual(self.shard_ranges[0:1],
srl.filter(includes='b', marker='', end_marker=''))
self.assertEqual([], srl.filter(includes='z'))
def test_find_lower(self):
srl = ShardRangeList(self.shard_ranges)
self.shard_ranges[0].update_state(
utils.ShardRange.CREATED, next(self.ts_iter))
self.shard_ranges[1].update_state(
utils.ShardRange.CLEAVED, next(self.ts_iter))
self.shard_ranges[2].update_state(
utils.ShardRange.ACTIVE, next(self.ts_iter))
def do_test(states):
return srl.find_lower(lambda sr: sr.state in states)
self.assertEqual(srl.upper,
do_test([utils.ShardRange.FOUND]))
self.assertEqual(self.shard_ranges[0].lower,
do_test([utils.ShardRange.CREATED]))
self.assertEqual(self.shard_ranges[0].lower,
do_test((utils.ShardRange.CREATED,
utils.ShardRange.CLEAVED)))
self.assertEqual(self.shard_ranges[1].lower,
do_test((utils.ShardRange.ACTIVE,
utils.ShardRange.CLEAVED)))
self.assertEqual(self.shard_ranges[2].lower,
do_test([utils.ShardRange.ACTIVE]))
@patch('ctypes.get_errno')
@patch.object(utils, '_sys_posix_fallocate')
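For reference, a minimal sketch (not part of this patch) of the ShardRangeList helpers exercised by the new tests above; the names 'a/x' and 'a/y' are arbitrary examples:

from swift.common.utils import ShardRange, ShardRangeList, Timestamp

srl = ShardRangeList([
    ShardRange('a/x', Timestamp.now(), '', 'm', state=ShardRange.CLEAVED),
    ShardRange('a/y', Timestamp.now(), 'm', '', state=ShardRange.CREATED),
])
srl.timestamps   # set of the ranges' timestamps
srl.states       # set of the ranges' states
srl.filter(marker='', end_marker='m')   # sub-list bounded by the markers
# lower bound of the first range matching the condition (here, 'm')
srl.find_lower(lambda sr: sr.state == ShardRange.CREATED)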


@ -39,7 +39,7 @@ from swift.container.backend import ContainerBroker, \
from swift.common.db import DatabaseAlreadyExists, GreenDBConnection
from swift.common.request_helpers import get_reserved_name
from swift.common.utils import Timestamp, encode_timestamps, hash_path, \
ShardRange, make_db_file_path, md5
ShardRange, make_db_file_path, md5, ShardRangeList
from swift.common.storage_policy import POLICIES
import mock
@ -4811,7 +4811,7 @@ class TestContainerBroker(unittest.TestCase):
@with_tempdir
def test_merge_shard_ranges(self, tempdir):
ts = [next(self.ts) for _ in range(13)]
ts = [next(self.ts) for _ in range(14)]
db_path = os.path.join(
tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
broker = ContainerBroker(
@ -4907,6 +4907,15 @@ class TestContainerBroker(unittest.TestCase):
self._assert_shard_ranges(
broker, [sr_b_2_2_deleted, sr_c_10_10_deleted])
# merge a ShardRangeList
sr_b_13 = ShardRange('a/c_b', ts[13], lower='a', upper='b',
object_count=10, meta_timestamp=ts[13])
sr_c_13 = ShardRange('a/c_c', ts[13], lower='b', upper='c',
object_count=10, meta_timestamp=ts[13])
broker.merge_shard_ranges(ShardRangeList([sr_c_13, sr_b_13]))
self._assert_shard_ranges(
broker, [sr_b_13, sr_c_13])
@with_tempdir
def test_merge_shard_ranges_state(self, tempdir):
db_path = os.path.join(
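For reference, a minimal sketch (not part of this patch) showing merge_shard_ranges being fed a ShardRangeList, as the new assertion above exercises; an in-memory broker is used purely for brevity:

from swift.common.utils import ShardRange, ShardRangeList, Timestamp
from swift.container.backend import ContainerBroker

broker = ContainerBroker(':memory:', account='a', container='c')
broker.initialize(Timestamp.now().internal, 0)
now = Timestamp.now()
broker.merge_shard_ranges(ShardRangeList([
    ShardRange('a/c_b', now, lower='a', upper='b'),
    ShardRange('a/c_c', now, lower='b', upper='c'),
]))
# both ranges are now persisted in the broker's shard_range table
merged = broker.get_shard_ranges()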


@ -42,7 +42,7 @@ from swift.container.sharder import ContainerSharder, sharding_enabled, \
DEFAULT_SHARD_CONTAINER_THRESHOLD, finalize_shrinking, \
find_shrinking_candidates, process_compactible_shard_sequences, \
find_compactible_shard_sequences, is_shrinking_candidate, \
is_sharding_candidate
is_sharding_candidate, find_paths, rank_paths
from swift.common.utils import ShardRange, Timestamp, hash_path, \
encode_timestamps, parse_db_filename, quorum_size, Everything, md5
from test import annotate_failure
@ -6783,3 +6783,193 @@ class TestSharderFunctions(BaseTestSharder):
for object_count in (10, 11):
with annotate_failure('%s %s' % (state, object_count)):
do_check_false(state, object_count)
def test_find_and_rank_whole_path_split(self):
ts_0 = next(self.ts_iter)
ts_1 = next(self.ts_iter)
bounds_0 = (
('', 'f'),
('f', 'k'),
('k', 's'),
('s', 'x'),
('x', ''),
)
bounds_1 = (
('', 'g'),
('g', 'l'),
('l', 't'),
('t', 'y'),
('y', ''),
)
# path with newer timestamp wins
ranges_0 = self._make_shard_ranges(bounds_0, ShardRange.ACTIVE,
timestamp=ts_0)
ranges_1 = self._make_shard_ranges(bounds_1, ShardRange.ACTIVE,
timestamp=ts_1)
paths = find_paths(ranges_0 + ranges_1)
self.assertEqual(2, len(paths))
self.assertIn(ranges_0, paths)
self.assertIn(ranges_1, paths)
own_sr = ShardRange('a/c', Timestamp.now())
self.assertEqual(
[
ranges_1, # complete and newer timestamp
ranges_0, # complete
],
rank_paths(paths, own_sr))
# but object_count trumps matching timestamp
ranges_0 = self._make_shard_ranges(bounds_0, ShardRange.ACTIVE,
timestamp=ts_1, object_count=1)
paths = find_paths(ranges_0 + ranges_1)
self.assertEqual(2, len(paths))
self.assertIn(ranges_0, paths)
self.assertIn(ranges_1, paths)
self.assertEqual(
[
ranges_0, # complete with more objects
ranges_1, # complete
],
rank_paths(paths, own_sr))
def test_find_and_rank_two_sub_path_splits(self):
ts_0 = next(self.ts_iter)
ts_1 = next(self.ts_iter)
ts_2 = next(self.ts_iter)
bounds_0 = (
('', 'a'),
('a', 'm'),
('m', 'p'),
('p', 't'),
('t', 'x'),
('x', 'y'),
('y', ''),
)
bounds_1 = (
('a', 'g'), # split at 'a'
('g', 'l'),
('l', 'm'), # rejoin at 'm'
)
bounds_2 = (
('t', 'y'), # split at 't', rejoin at 'y'
)
ranges_0 = self._make_shard_ranges(bounds_0, ShardRange.ACTIVE,
timestamp=ts_0)
ranges_1 = self._make_shard_ranges(bounds_1, ShardRange.ACTIVE,
timestamp=ts_1, object_count=1)
ranges_2 = self._make_shard_ranges(bounds_2, ShardRange.ACTIVE,
timestamp=ts_2, object_count=1)
# all paths are complete
mix_path_0 = ranges_0[:1] + ranges_1 + ranges_0[2:] # 3 objects
mix_path_1 = ranges_0[:4] + ranges_2 + ranges_0[6:] # 1 object
mix_path_2 = (ranges_0[:1] + ranges_1 + ranges_0[2:4] + ranges_2 +
ranges_0[6:]) # 4 objects
paths = find_paths(ranges_0 + ranges_1 + ranges_2)
self.assertEqual(4, len(paths))
self.assertIn(ranges_0, paths)
self.assertIn(mix_path_0, paths)
self.assertIn(mix_path_1, paths)
self.assertIn(mix_path_2, paths)
own_sr = ShardRange('a/c', Timestamp.now())
self.assertEqual(
[
mix_path_2, # has 4 objects, 3 different timestamps
mix_path_0, # has 3 objects, 2 different timestamps
mix_path_1, # has 1 object, 2 different timestamps
ranges_0, # has 0 objects, 1 timestamp
],
rank_paths(paths, own_sr)
)
def test_find_and_rank_most_cleave_progress(self):
ts_0 = next(self.ts_iter)
ts_1 = next(self.ts_iter)
ts_2 = next(self.ts_iter)
bounds_0 = (
('', 'f'),
('f', 'k'),
('k', 'p'),
('p', '')
)
bounds_1 = (
('', 'g'),
('g', 'l'),
('l', 'q'),
('q', '')
)
bounds_2 = (
('', 'r'),
('r', '')
)
ranges_0 = self._make_shard_ranges(
bounds_0, [ShardRange.CLEAVED] * 3 + [ShardRange.CREATED],
timestamp=ts_1, object_count=1)
ranges_1 = self._make_shard_ranges(
bounds_1, [ShardRange.CLEAVED] * 4,
timestamp=ts_0)
ranges_2 = self._make_shard_ranges(
bounds_2, [ShardRange.CLEAVED, ShardRange.CREATED],
timestamp=ts_2, object_count=1)
paths = find_paths(ranges_0 + ranges_1 + ranges_2)
self.assertEqual(3, len(paths))
own_sr = ShardRange('a/c', Timestamp.now())
self.assertEqual(
[
ranges_1, # cleaved to end
ranges_2, # cleaved to r
ranges_0, # cleaved to p
],
rank_paths(paths, own_sr)
)
ranges_2 = self._make_shard_ranges(
bounds_2, [ShardRange.ACTIVE] * 2,
timestamp=ts_2, object_count=1)
paths = find_paths(ranges_0 + ranges_1 + ranges_2)
self.assertEqual(
[
ranges_2, # active to end, newer timestamp
ranges_1, # cleaved to end
ranges_0, # cleaved to p
],
rank_paths(paths, own_sr)
)
def test_find_and_rank_no_complete_path(self):
ts_0 = next(self.ts_iter)
ts_1 = next(self.ts_iter)
ts_2 = next(self.ts_iter)
bounds_0 = (
('', 'f'),
('f', 'k'),
('k', 'm'),
)
bounds_1 = (
('', 'g'),
('g', 'l'),
('l', 'n'),
)
bounds_2 = (
('', 'l'),
)
ranges_0 = self._make_shard_ranges(bounds_0, ShardRange.ACTIVE,
timestamp=ts_0)
ranges_1 = self._make_shard_ranges(bounds_1, ShardRange.ACTIVE,
timestamp=ts_1, object_count=1)
ranges_2 = self._make_shard_ranges(bounds_2, ShardRange.ACTIVE,
timestamp=ts_2, object_count=1)
mix_path_0 = ranges_2 + ranges_1[2:]
paths = find_paths(ranges_0 + ranges_1 + ranges_2)
self.assertEqual(3, len(paths))
self.assertIn(ranges_0, paths)
self.assertIn(ranges_1, paths)
self.assertIn(mix_path_0, paths)
own_sr = ShardRange('a/c', Timestamp.now())
self.assertEqual(
[
ranges_1, # cleaved to n, one timestamp
mix_path_0, # cleaved to n, has two different timestamps
ranges_0, # cleaved to m
],
rank_paths(paths, own_sr)
)
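Finally, a minimal sketch (not part of this patch) of how find_paths and rank_paths compose; the ranking notes are inferred from the expectations in the tests above rather than restated from the implementation:

from swift.common.utils import ShardRange, Timestamp
from swift.container.sharder import find_paths, rank_paths

now = Timestamp.now()
shard_ranges = [
    ShardRange('.shards_a/c-0', now, lower='', upper='m',
               state=ShardRange.ACTIVE, object_count=1),
    ShardRange('.shards_a/c-1', now, lower='m', upper='',
               state=ShardRange.ACTIVE, object_count=1),
    ShardRange('.shards_a/c-2', now, lower='', upper='',
               state=ShardRange.CREATED),
]
own_sr = ShardRange('a/c', now)
paths = find_paths(shard_ranges)     # every continuous path through ''..''
ranked = rank_paths(paths, own_sr)   # most preferred path comes first
# preference seen in the tests above: further cleaving/active progress,
# then more objects, then newer timestamps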