Merge "Enable shard ranges to be manually shrunk to root container"
This commit is contained in:
commit
1d34f321ac
@ -54,7 +54,13 @@ SHARD_STATS_STATES = [ShardRange.ACTIVE, ShardRange.SHARDING,
|
||||
SHARD_LISTING_STATES = SHARD_STATS_STATES + [ShardRange.CLEAVED]
|
||||
SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
|
||||
ShardRange.ACTIVE, ShardRange.SHARDING]
|
||||
|
||||
# when auditing a shard gets its own shard range, which could be in any state
|
||||
# except FOUND, and any potential acceptors excluding FOUND ranges that may be
|
||||
# unwanted overlaps
|
||||
SHARD_AUDITING_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
|
||||
ShardRange.ACTIVE, ShardRange.SHARDING,
|
||||
ShardRange.SHARDED, ShardRange.SHRINKING,
|
||||
ShardRange.SHRUNK]
|
||||
|
||||
# attribute names in order used when transforming shard ranges from dicts to
|
||||
# tuples and vice-versa
|
||||
@ -1716,7 +1722,10 @@ class ContainerBroker(DatabaseBroker):
|
||||
|
||||
The following alias values are supported: 'listing' maps to all states
|
||||
that are considered valid when listing objects; 'updating' maps to all
|
||||
states that are considered valid for redirecting an object update.
|
||||
states that are considered valid for redirecting an object update;
|
||||
'auditing' maps to all states that are considered valid for a shard
|
||||
container that is updating its own shard range table from a root (this
|
||||
currently maps to all states except FOUND).
|
||||
|
||||
:param states: a list of values each of which may be the name of a
|
||||
state, the number of a state, or an alias
|
||||
@ -1731,6 +1740,8 @@ class ContainerBroker(DatabaseBroker):
|
||||
resolved_states.update(SHARD_LISTING_STATES)
|
||||
elif state == 'updating':
|
||||
resolved_states.update(SHARD_UPDATE_STATES)
|
||||
elif state == 'auditing':
|
||||
resolved_states.update(SHARD_AUDITING_STATES)
|
||||
else:
|
||||
resolved_states.add(ShardRange.resolve_state(state)[0])
|
||||
return resolved_states
|
||||
|
@ -749,10 +749,14 @@ class ContainerController(BaseStorageServer):
|
||||
marker = end_marker = includes = None
|
||||
reverse = False
|
||||
states = params.get('states')
|
||||
fill_gaps = False
|
||||
fill_gaps = include_own = False
|
||||
if states:
|
||||
states = list_from_csv(states)
|
||||
fill_gaps = any(('listing' in states, 'updating' in states))
|
||||
# 'auditing' is used during shard audit; if the shard is
|
||||
# shrinking then it needs to get acceptor shard ranges, which
|
||||
# may be the root container itself, so use include_own
|
||||
include_own = 'auditing' in states
|
||||
try:
|
||||
states = broker.resolve_shard_range_states(states)
|
||||
except ValueError:
|
||||
@ -761,7 +765,8 @@ class ContainerController(BaseStorageServer):
|
||||
req.headers.get('x-backend-include-deleted', False))
|
||||
container_list = broker.get_shard_ranges(
|
||||
marker, end_marker, includes, reverse, states=states,
|
||||
include_deleted=include_deleted, fill_gaps=fill_gaps)
|
||||
include_deleted=include_deleted, fill_gaps=fill_gaps,
|
||||
include_own=include_own)
|
||||
else:
|
||||
resp_headers = gen_resp_headers(info, is_deleted=is_deleted)
|
||||
if is_deleted:
|
||||
|
@ -317,8 +317,7 @@ class CleavingContext(object):
|
||||
def range_done(self, new_cursor):
|
||||
self.ranges_done += 1
|
||||
self.ranges_todo -= 1
|
||||
if new_cursor is not None:
|
||||
self.cursor = new_cursor
|
||||
self.cursor = new_cursor
|
||||
|
||||
def done(self):
|
||||
return all((self.misplaced_done, self.cleaving_done,
|
||||
@ -744,11 +743,18 @@ class ContainerSharder(ContainerReplicator):
|
||||
shard_ranges = own_shard_range_from_root = None
|
||||
if own_shard_range:
|
||||
# Get the root view of the world, at least that part of the world
|
||||
# that overlaps with this shard's namespace
|
||||
# that overlaps with this shard's namespace. The
|
||||
# 'states=auditing' parameter will cause the root to include
|
||||
# its own shard range in the response, which is necessary for the
|
||||
# particular case when this shard should be shrinking to the root
|
||||
# container; when not shrinking to root, but to another acceptor,
|
||||
# the root range should be in sharded state and will not interfere
|
||||
# with cleaving, listing or updating behaviour.
|
||||
shard_ranges = self._fetch_shard_ranges(
|
||||
broker, newest=True,
|
||||
params={'marker': str_to_wsgi(own_shard_range.lower_str),
|
||||
'end_marker': str_to_wsgi(own_shard_range.upper_str)},
|
||||
'end_marker': str_to_wsgi(own_shard_range.upper_str),
|
||||
'states': 'auditing'},
|
||||
include_deleted=True)
|
||||
if shard_ranges:
|
||||
for shard_range in shard_ranges:
|
||||
@ -1394,10 +1400,17 @@ class ContainerSharder(ContainerReplicator):
|
||||
quote(broker.path))
|
||||
return cleaving_context.misplaced_done
|
||||
|
||||
ranges_todo = broker.get_shard_ranges(marker=cleaving_context.marker)
|
||||
shard_ranges = broker.get_shard_ranges(marker=cleaving_context.marker)
|
||||
# Ignore shrinking shard ranges: we never want to cleave objects to a
|
||||
# shrinking shard. Shrinking shard ranges are to be expected in a root;
|
||||
# shrinking shard ranges (other than own shard range) are not normally
|
||||
# expected in a shard but can occur if there is an overlapping shard
|
||||
# range that has been discovered from the root.
|
||||
ranges_todo = [sr for sr in shard_ranges
|
||||
if sr.state != ShardRange.SHRINKING]
|
||||
if cleaving_context.cursor:
|
||||
# always update ranges_todo in case more ranges have been found
|
||||
# since last visit
|
||||
# always update ranges_todo in case shard ranges have changed since
|
||||
# last visit
|
||||
cleaving_context.ranges_todo = len(ranges_todo)
|
||||
self.logger.debug('Continuing to cleave (%s done, %s todo): %s',
|
||||
cleaving_context.ranges_done,
|
||||
@ -1411,36 +1424,36 @@ class ContainerSharder(ContainerReplicator):
|
||||
|
||||
ranges_done = []
|
||||
for shard_range in ranges_todo:
|
||||
if shard_range.state == ShardRange.SHRINKING:
|
||||
# Ignore shrinking shard ranges: we never want to cleave
|
||||
# objects to a shrinking shard. Shrinking shard ranges are to
|
||||
# be expected in a root; shrinking shard ranges (other than own
|
||||
# shard range) are not normally expected in a shard but can
|
||||
# occur if there is an overlapping shard range that has been
|
||||
# discovered from the root.
|
||||
cleaving_context.range_done(None) # don't move the cursor
|
||||
continue
|
||||
elif shard_range.state in (ShardRange.CREATED,
|
||||
ShardRange.CLEAVED,
|
||||
ShardRange.ACTIVE):
|
||||
cleave_result = self._cleave_shard_range(
|
||||
broker, cleaving_context, shard_range)
|
||||
if cleave_result == CLEAVE_SUCCESS:
|
||||
ranges_done.append(shard_range)
|
||||
if len(ranges_done) == self.cleave_batch_size:
|
||||
break
|
||||
elif cleave_result == CLEAVE_FAILED:
|
||||
break
|
||||
# else, no errors, but no rows found either. keep going,
|
||||
# and don't count it against our batch size
|
||||
else:
|
||||
if cleaving_context.cleaving_done:
|
||||
# note: there may still be ranges_todo, for example: if this
|
||||
# shard is shrinking and has merged a root shard range in
|
||||
# sharded state along with an active acceptor shard range, but
|
||||
# the root range is irrelevant
|
||||
break
|
||||
|
||||
if len(ranges_done) == self.cleave_batch_size:
|
||||
break
|
||||
|
||||
if shard_range.state not in (ShardRange.CREATED,
|
||||
ShardRange.CLEAVED,
|
||||
ShardRange.ACTIVE):
|
||||
self.logger.info('Stopped cleave at unready %s', shard_range)
|
||||
break
|
||||
|
||||
if not ranges_done:
|
||||
# _cleave_shard_range always store()s the context on success; make
|
||||
# sure we *also* do that if we hit a failure right off the bat
|
||||
cleaving_context.store(broker)
|
||||
cleave_result = self._cleave_shard_range(
|
||||
broker, cleaving_context, shard_range)
|
||||
|
||||
if cleave_result == CLEAVE_SUCCESS:
|
||||
ranges_done.append(shard_range)
|
||||
elif cleave_result == CLEAVE_FAILED:
|
||||
break
|
||||
# else: CLEAVE_EMPTY: no errors, but no rows found either. keep
|
||||
# going, and don't count it against our batch size
|
||||
|
||||
# _cleave_shard_range always store()s the context on success; *also* do
|
||||
# that here in case we hit a failure right off the bat or ended loop
|
||||
# with skipped ranges
|
||||
cleaving_context.store(broker)
|
||||
self.logger.debug(
|
||||
'Cleaved %s shard ranges for %s',
|
||||
len(ranges_done), quote(broker.path))
|
||||
|
@ -1450,14 +1450,32 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
def test_shrinking(self):
|
||||
int_client = self.make_internal_client()
|
||||
|
||||
def check_node_data(node_data, exp_hdrs, exp_obj_count, exp_shards):
|
||||
def check_node_data(node_data, exp_hdrs, exp_obj_count, exp_shards,
|
||||
exp_sharded_root_range=False):
|
||||
hdrs, range_data = node_data
|
||||
self.assert_dict_contains(exp_hdrs, hdrs)
|
||||
self.assert_shard_ranges_contiguous(exp_shards, range_data)
|
||||
self.assert_total_object_count(exp_obj_count, range_data)
|
||||
sharded_root_range = False
|
||||
other_range_data = []
|
||||
for data in range_data:
|
||||
sr = ShardRange.from_dict(data)
|
||||
if (sr.account == self.account and
|
||||
sr.container == self.container_name and
|
||||
sr.state == ShardRange.SHARDED):
|
||||
# only expect one root range
|
||||
self.assertFalse(sharded_root_range, range_data)
|
||||
sharded_root_range = True
|
||||
self.assertEqual(ShardRange.MIN, sr.lower, sr)
|
||||
self.assertEqual(ShardRange.MAX, sr.upper, sr)
|
||||
else:
|
||||
# include active root range in further assertions
|
||||
other_range_data.append(data)
|
||||
self.assertEqual(exp_sharded_root_range, sharded_root_range)
|
||||
self.assert_shard_ranges_contiguous(exp_shards, other_range_data)
|
||||
self.assert_total_object_count(exp_obj_count, other_range_data)
|
||||
|
||||
def check_shard_nodes_data(node_data, expected_state='unsharded',
|
||||
expected_shards=0, exp_obj_count=0):
|
||||
expected_shards=0, exp_obj_count=0,
|
||||
exp_sharded_root_range=False):
|
||||
# checks that shard range is consistent on all nodes
|
||||
root_path = '%s/%s' % (self.account, self.container_name)
|
||||
exp_shard_hdrs = {
|
||||
@ -1469,7 +1487,7 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
with annotate_failure('Node id %s.' % node_id):
|
||||
check_node_data(
|
||||
node_data, exp_shard_hdrs, exp_obj_count,
|
||||
expected_shards)
|
||||
expected_shards, exp_sharded_root_range)
|
||||
hdrs = node_data[0]
|
||||
object_counts.append(int(hdrs['X-Container-Object-Count']))
|
||||
bytes_used.append(int(hdrs['X-Container-Bytes-Used']))
|
||||
@ -1668,10 +1686,13 @@ class TestContainerSharding(BaseTestContainerSharding):
|
||||
donor = orig_shard_ranges[0]
|
||||
shard_nodes_data = self.direct_get_container_shard_ranges(
|
||||
donor.account, donor.container)
|
||||
# the donor's shard range will have the acceptor's projected stats
|
||||
# the donor's shard range will have the acceptor's projected stats;
|
||||
# donor also has copy of root shard range that will be ignored;
|
||||
# note: expected_shards does not include the sharded root range
|
||||
obj_count, bytes_used = check_shard_nodes_data(
|
||||
shard_nodes_data, expected_state='sharded', expected_shards=1,
|
||||
exp_obj_count=len(second_shard_objects) + 1)
|
||||
exp_obj_count=len(second_shard_objects) + 1,
|
||||
exp_sharded_root_range=True)
|
||||
# but the donor is empty and so reports zero stats
|
||||
self.assertEqual(0, obj_count)
|
||||
self.assertEqual(0, bytes_used)
|
||||
@ -2700,6 +2721,29 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
||||
self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
|
||||
self.assert_container_listing(obj_names)
|
||||
|
||||
# Let's pretend that some actor in the system has determined that all
|
||||
# the shard ranges should shrink back to root
|
||||
# TODO: replace this db manipulation if/when manage_shard_ranges can
|
||||
# manage shrinking...
|
||||
broker = self.get_broker(self.brain.part, self.brain.nodes[0])
|
||||
shard_ranges = broker.get_shard_ranges()
|
||||
self.assertEqual(2, len(shard_ranges))
|
||||
for sr in shard_ranges:
|
||||
self.assertTrue(sr.update_state(ShardRange.SHRINKING))
|
||||
sr.epoch = sr.state_timestamp = Timestamp.now()
|
||||
own_sr = broker.get_own_shard_range()
|
||||
own_sr.update_state(ShardRange.ACTIVE, state_timestamp=Timestamp.now())
|
||||
broker.merge_shard_ranges(shard_ranges + [own_sr])
|
||||
|
||||
# replicate and run sharders
|
||||
self.replicators.once()
|
||||
self.sharders_once()
|
||||
|
||||
self.assert_container_state(self.brain.nodes[0], 'collapsed', 0)
|
||||
self.assert_container_state(self.brain.nodes[1], 'collapsed', 0)
|
||||
self.assert_container_state(self.brain.nodes[2], 'collapsed', 0)
|
||||
self.assert_container_listing(obj_names)
|
||||
|
||||
def test_manage_shard_ranges_used_poorly(self):
|
||||
obj_names = self._make_object_names(8)
|
||||
self.put_objects(obj_names)
|
||||
|
@ -3974,6 +3974,12 @@ class TestContainerBroker(unittest.TestCase):
|
||||
ContainerBroker.resolve_shard_range_states(
|
||||
['updating', 'listing']))
|
||||
|
||||
self.assertEqual(
|
||||
{ShardRange.CREATED, ShardRange.CLEAVED,
|
||||
ShardRange.ACTIVE, ShardRange.SHARDING, ShardRange.SHARDED,
|
||||
ShardRange.SHRINKING, ShardRange.SHRUNK},
|
||||
ContainerBroker.resolve_shard_range_states(['auditing']))
|
||||
|
||||
def check_bad_value(value):
|
||||
with self.assertRaises(ValueError) as cm:
|
||||
ContainerBroker.resolve_shard_range_states(value)
|
||||
@ -4072,7 +4078,7 @@ class TestContainerBroker(unittest.TestCase):
|
||||
self.assertFalse(actual)
|
||||
|
||||
@with_tempdir
|
||||
def test_overloap_shard_range_order(self, tempdir):
|
||||
def test_overlap_shard_range_order(self, tempdir):
|
||||
db_path = os.path.join(tempdir, 'container.db')
|
||||
broker = ContainerBroker(db_path, account='a', container='c')
|
||||
broker.initialize(next(self.ts).internal, 0)
|
||||
|
@ -3178,6 +3178,98 @@ class TestContainerController(unittest.TestCase):
|
||||
|
||||
do_test({'states': 'bad'}, 404)
|
||||
|
||||
def test_GET_shard_ranges_auditing(self):
|
||||
# verify that states=auditing causes own shard range to be included
|
||||
def put_shard_ranges(shard_ranges):
|
||||
headers = {'X-Timestamp': next(self.ts).normal,
|
||||
'X-Backend-Record-Type': 'shard'}
|
||||
body = json.dumps([dict(sr) for sr in shard_ranges])
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c', method='PUT', headers=headers, body=body)
|
||||
self.assertEqual(202, req.get_response(self.controller).status_int)
|
||||
|
||||
def do_test(ts_now, extra_params):
|
||||
headers = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Backend-Include-Deleted': 'True'}
|
||||
params = {'format': 'json'}
|
||||
if extra_params:
|
||||
params.update(extra_params)
|
||||
req = Request.blank('/sda1/p/a/c?format=json', method='GET',
|
||||
headers=headers, params=params)
|
||||
with mock_timestamp_now(ts_now):
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
self.assertEqual(resp.content_type, 'application/json')
|
||||
self.assertIn('X-Backend-Record-Type', resp.headers)
|
||||
self.assertEqual('shard', resp.headers['X-Backend-Record-Type'])
|
||||
return resp
|
||||
|
||||
# initially not all shards are shrinking and root is sharded
|
||||
own_sr = ShardRange('a/c', next(self.ts), '', '',
|
||||
state=ShardRange.SHARDED)
|
||||
shard_bounds = [('', 'f', ShardRange.SHRUNK, True),
|
||||
('f', 't', ShardRange.SHRINKING, False),
|
||||
('t', '', ShardRange.ACTIVE, False)]
|
||||
shard_ranges = [
|
||||
ShardRange('.shards_a/_%s' % upper, next(self.ts),
|
||||
lower, upper, state=state, deleted=deleted)
|
||||
for (lower, upper, state, deleted) in shard_bounds]
|
||||
overlap = ShardRange('.shards_a/c_bad', next(self.ts), '', 'f',
|
||||
state=ShardRange.FOUND)
|
||||
|
||||
# create container and PUT some shard ranges
|
||||
headers = {'X-Timestamp': next(self.ts).normal}
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c', method='PUT', headers=headers)
|
||||
self.assertIn(
|
||||
req.get_response(self.controller).status_int, (201, 202))
|
||||
put_shard_ranges(shard_ranges + [own_sr, overlap])
|
||||
|
||||
# do *not* expect own shard range in default case (no states param)
|
||||
ts_now = next(self.ts)
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in [overlap] + shard_ranges]
|
||||
resp = do_test(ts_now, {})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
# expect own shard range to be included when states=auditing
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in shard_ranges + [own_sr]]
|
||||
resp = do_test(ts_now, {'states': 'auditing'})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
# expect own shard range to be included, marker/end_marker respected
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in shard_ranges[1:2] + [own_sr]]
|
||||
resp = do_test(ts_now, {'marker': 'f', 'end_marker': 't',
|
||||
'states': 'auditing'})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
# update shards to all shrinking and root to active
|
||||
shard_ranges[-1].update_state(ShardRange.SHRINKING, next(self.ts))
|
||||
own_sr.update_state(ShardRange.ACTIVE, next(self.ts))
|
||||
put_shard_ranges(shard_ranges + [own_sr])
|
||||
|
||||
# do *not* expect own shard range in default case (no states param)
|
||||
ts_now = next(self.ts)
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in [overlap] + shard_ranges]
|
||||
resp = do_test(ts_now, {})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
# expect own shard range to be included when states=auditing
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in shard_ranges[:2] + [own_sr] + shard_ranges[2:]]
|
||||
resp = do_test(ts_now, {'states': 'auditing'})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
# expect own shard range to be included, marker/end_marker respected
|
||||
expected = [dict(sr, last_modified=sr.timestamp.isoformat)
|
||||
for sr in shard_ranges[1:2] + [own_sr]]
|
||||
resp = do_test(ts_now, {'marker': 'f', 'end_marker': 't',
|
||||
'states': 'auditing'})
|
||||
self.assertEqual(expected, json.loads(resp.body))
|
||||
|
||||
def test_GET_auto_record_type(self):
|
||||
# make a container
|
||||
ts_iter = make_timestamp_iter()
|
||||
|
@ -2258,14 +2258,14 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
|
||||
# run cleave - first batch is cleaved, shrinking range doesn't count
|
||||
# towards batch size of 2 but does count towards ranges_done
|
||||
# towards batch size of 2 nor towards ranges_done
|
||||
with self._mock_sharder() as sharder:
|
||||
self.assertFalse(sharder._cleave(broker))
|
||||
context = CleavingContext.load(broker)
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertFalse(context.cleaving_done)
|
||||
self.assertEqual(shard_ranges[2].upper_str, context.cursor)
|
||||
self.assertEqual(3, context.ranges_done)
|
||||
self.assertEqual(2, context.ranges_done)
|
||||
self.assertEqual(2, context.ranges_todo)
|
||||
|
||||
# run cleave - stops at shard range in FOUND state
|
||||
@ -2275,7 +2275,7 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertFalse(context.cleaving_done)
|
||||
self.assertEqual(shard_ranges[3].upper_str, context.cursor)
|
||||
self.assertEqual(4, context.ranges_done)
|
||||
self.assertEqual(3, context.ranges_done)
|
||||
self.assertEqual(1, context.ranges_todo)
|
||||
|
||||
# run cleave - final shard range in CREATED state, cleaving proceeds
|
||||
@ -2288,9 +2288,149 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertTrue(context.cleaving_done)
|
||||
self.assertEqual(shard_ranges[4].upper_str, context.cursor)
|
||||
self.assertEqual(5, context.ranges_done)
|
||||
self.assertEqual(4, context.ranges_done)
|
||||
self.assertEqual(0, context.ranges_todo)
|
||||
|
||||
def test_cleave_shrinking_to_active_root_range(self):
|
||||
broker = self._make_broker(account='.shards_a', container='shard_c')
|
||||
broker.put_object(
|
||||
'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
|
||||
# a donor previously shrunk to own...
|
||||
deleted_range = ShardRange(
|
||||
'.shards/other', next(self.ts_iter), 'here', 'there', deleted=True,
|
||||
state=ShardRange.SHRUNK, epoch=next(self.ts_iter))
|
||||
own_shard_range = ShardRange(
|
||||
broker.path, next(self.ts_iter), 'here', '',
|
||||
state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
|
||||
# root is the acceptor...
|
||||
root = ShardRange(
|
||||
'a/c', next(self.ts_iter), '', '',
|
||||
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
|
||||
broker.merge_shard_ranges([deleted_range, own_shard_range, root])
|
||||
broker.set_sharding_sysmeta('Root', 'a/c')
|
||||
self.assertFalse(broker.is_root_container()) # sanity check
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
|
||||
# expect cleave to the root
|
||||
with self._mock_sharder() as sharder:
|
||||
self.assertTrue(sharder._cleave(broker))
|
||||
context = CleavingContext.load(broker)
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertTrue(context.cleaving_done)
|
||||
self.assertEqual(root.upper_str, context.cursor)
|
||||
self.assertEqual(1, context.ranges_done)
|
||||
self.assertEqual(0, context.ranges_todo)
|
||||
|
||||
def test_cleave_shrinking_to_active_acceptor_with_sharded_root_range(self):
|
||||
broker = self._make_broker(account='.shards_a', container='shard_c')
|
||||
broker.put_object(
|
||||
'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
|
||||
own_shard_range = ShardRange(
|
||||
broker.path, next(self.ts_iter), 'here', 'there',
|
||||
state=ShardRange.SHARDING, epoch=next(self.ts_iter))
|
||||
# the intended acceptor...
|
||||
acceptor = ShardRange(
|
||||
'.shards_a/shard_d', next(self.ts_iter), 'here', '',
|
||||
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
|
||||
# root range also gets pulled from root during audit...
|
||||
root = ShardRange(
|
||||
'a/c', next(self.ts_iter), '', '',
|
||||
state=ShardRange.SHARDED, epoch=next(self.ts_iter))
|
||||
broker.merge_shard_ranges([own_shard_range, acceptor, root])
|
||||
broker.set_sharding_sysmeta('Root', 'a/c')
|
||||
self.assertFalse(broker.is_root_container()) # sanity check
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
|
||||
# sharded root range should always sort after an active acceptor so
|
||||
# expect cleave to acceptor first then cleaving completes
|
||||
with self._mock_sharder() as sharder:
|
||||
self.assertTrue(sharder._cleave(broker))
|
||||
context = CleavingContext.load(broker)
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertTrue(context.cleaving_done)
|
||||
self.assertEqual(acceptor.upper_str, context.cursor)
|
||||
self.assertEqual(1, context.ranges_done) # cleaved the acceptor
|
||||
self.assertEqual(1, context.ranges_todo) # never reached sharded root
|
||||
|
||||
def test_cleave_shrinking_to_active_root_range_with_active_acceptor(self):
|
||||
# if shrinking shard has both active root and active other acceptor,
|
||||
# verify that shard only cleaves to one of them;
|
||||
# root will sort before acceptor if acceptor.upper==MAX
|
||||
broker = self._make_broker(account='.shards_a', container='shard_c')
|
||||
broker.put_object(
|
||||
'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
|
||||
own_shard_range = ShardRange(
|
||||
broker.path, next(self.ts_iter), 'here', 'there',
|
||||
state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
|
||||
# active acceptor with upper bound == MAX
|
||||
acceptor = ShardRange(
|
||||
'.shards/other', next(self.ts_iter), 'here', '', deleted=False,
|
||||
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
|
||||
# root is also active
|
||||
root = ShardRange(
|
||||
'a/c', next(self.ts_iter), '', '',
|
||||
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
|
||||
broker.merge_shard_ranges([own_shard_range, acceptor, root])
|
||||
broker.set_sharding_sysmeta('Root', 'a/c')
|
||||
self.assertFalse(broker.is_root_container()) # sanity check
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
|
||||
# expect cleave to the root
|
||||
acceptor.upper = ''
|
||||
acceptor.timestamp = next(self.ts_iter)
|
||||
broker.merge_shard_ranges([acceptor])
|
||||
with self._mock_sharder() as sharder:
|
||||
self.assertTrue(sharder._cleave(broker))
|
||||
context = CleavingContext.load(broker)
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertTrue(context.cleaving_done)
|
||||
self.assertEqual(root.upper_str, context.cursor)
|
||||
self.assertEqual(1, context.ranges_done)
|
||||
self.assertEqual(1, context.ranges_todo)
|
||||
info = [
|
||||
line for line in self.logger.get_lines_for_level('info')
|
||||
if line.startswith('Replicating new shard container a/c')
|
||||
]
|
||||
self.assertEqual(1, len(info))
|
||||
|
||||
def test_cleave_shrinking_to_active_acceptor_with_active_root_range(self):
|
||||
# if shrinking shard has both active root and active other acceptor,
|
||||
# verify that shard only cleaves to one of them;
|
||||
# root will sort after acceptor if acceptor.upper<MAX
|
||||
broker = self._make_broker(account='.shards_a', container='shard_c')
|
||||
broker.put_object(
|
||||
'here_a', next(self.ts_iter), 10, 'text/plain', 'etag_a', 0, 0)
|
||||
own_shard_range = ShardRange(
|
||||
broker.path, next(self.ts_iter), 'here', 'there',
|
||||
state=ShardRange.SHRINKING, epoch=next(self.ts_iter))
|
||||
# active acceptor with upper bound < MAX
|
||||
acceptor = ShardRange(
|
||||
'.shards/other', next(self.ts_iter), 'here', 'where',
|
||||
deleted=False, state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
|
||||
# root is also active
|
||||
root = ShardRange(
|
||||
'a/c', next(self.ts_iter), '', '',
|
||||
state=ShardRange.ACTIVE, epoch=next(self.ts_iter))
|
||||
broker.merge_shard_ranges([own_shard_range, acceptor, root])
|
||||
broker.set_sharding_sysmeta('Root', 'a/c')
|
||||
self.assertFalse(broker.is_root_container()) # sanity check
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
|
||||
# expect cleave to the acceptor
|
||||
with self._mock_sharder() as sharder:
|
||||
self.assertTrue(sharder._cleave(broker))
|
||||
context = CleavingContext.load(broker)
|
||||
self.assertTrue(context.misplaced_done)
|
||||
self.assertTrue(context.cleaving_done)
|
||||
self.assertEqual(acceptor.upper_str, context.cursor)
|
||||
self.assertEqual(1, context.ranges_done)
|
||||
self.assertEqual(1, context.ranges_todo)
|
||||
info = [
|
||||
line for line in self.logger.get_lines_for_level('info')
|
||||
if line.startswith('Replicating new shard container .shards/other')
|
||||
]
|
||||
self.assertEqual(1, len(info))
|
||||
|
||||
def _check_complete_sharding(self, account, container, shard_bounds):
|
||||
broker = self._make_sharding_broker(
|
||||
account=account, container=container, shard_bounds=shard_bounds)
|
||||
@ -4499,6 +4639,7 @@ class TestSharder(BaseTestSharder):
|
||||
mock_response = mock.MagicMock()
|
||||
mock_response.headers = {'x-backend-record-type':
|
||||
'shard'}
|
||||
shard_ranges.sort(key=ShardRange.sort_key)
|
||||
mock_response.body = json.dumps(
|
||||
[dict(sr) for sr in shard_ranges])
|
||||
mock_swift.make_request.return_value = mock_response
|
||||
@ -4520,7 +4661,8 @@ class TestSharder(BaseTestSharder):
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': marker, 'end_marker': end_marker}
|
||||
params = {'format': 'json', 'marker': marker, 'end_marker': end_marker,
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
@ -4591,7 +4733,8 @@ class TestSharder(BaseTestSharder):
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'j', 'end_marker': 'k'}
|
||||
params = {'format': 'json', 'marker': 'j', 'end_marker': 'k',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
@ -4605,7 +4748,7 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertEqual([own_shard_range],
|
||||
broker.get_shard_ranges(include_own=True))
|
||||
|
||||
# move root shard range to shrinking state
|
||||
# move root version of own shard range to shrinking state
|
||||
root_ts = next(self.ts_iter)
|
||||
self.assertTrue(shard_ranges[1].update_state(ShardRange.SHRINKING,
|
||||
state_timestamp=root_ts))
|
||||
@ -4626,7 +4769,8 @@ class TestSharder(BaseTestSharder):
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'k', 'end_marker': 't'}
|
||||
params = {'format': 'json', 'marker': 'k', 'end_marker': 't',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
@ -4659,7 +4803,8 @@ class TestSharder(BaseTestSharder):
|
||||
self.assertFalse(lines[2:])
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
self.assertFalse(broker.is_deleted())
|
||||
params = {'format': 'json', 'marker': 'j', 'end_marker': 'k'}
|
||||
params = {'format': 'json', 'marker': 'j', 'end_marker': 'k',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
@ -4788,7 +4933,8 @@ class TestSharder(BaseTestSharder):
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'k', 'end_marker': 't'}
|
||||
params = {'format': 'json', 'marker': 'k', 'end_marker': 't',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
@ -4845,6 +4991,61 @@ class TestSharder(BaseTestSharder):
|
||||
self._do_test_audit_shard_container_merge_other_ranges('Quoted-Root',
|
||||
'a/c')
|
||||
|
||||
def _do_test_audit_shard_container_with_root_ranges(self, *args):
|
||||
# shards may merge acceptors and the root range when shrinking; verify
|
||||
# that shard audit is ok with merged ranges
|
||||
def check_audit(own_state, acceptor_state, root_state):
|
||||
broker = self._make_broker(
|
||||
account='.shards_a',
|
||||
container='shard_c_%s' % next(self.ts_iter).normal)
|
||||
broker.set_sharding_sysmeta(*args)
|
||||
own_sr = broker.get_own_shard_range().copy(
|
||||
state=own_state, state_timestamp=next(self.ts_iter),
|
||||
lower='a', upper='b', timestamp=next(self.ts_iter))
|
||||
broker.merge_shard_ranges([own_sr])
|
||||
|
||||
# make acceptor and root ranges that overlap with the shard
|
||||
overlaps = self._make_shard_ranges([('a', 'c'), ('', '')],
|
||||
[acceptor_state, root_state])
|
||||
sharder, mock_swift = self.call_audit_container(
|
||||
broker, [own_sr] + overlaps)
|
||||
expected_headers = {'X-Backend-Record-Type': 'shard',
|
||||
'X-Newest': 'true',
|
||||
'X-Backend-Include-Deleted': 'True',
|
||||
'X-Backend-Override-Deleted': 'true'}
|
||||
params = {'format': 'json', 'marker': 'a', 'end_marker': 'b',
|
||||
'states': 'auditing'}
|
||||
mock_swift.make_request.assert_called_once_with(
|
||||
'GET', '/v1/a/c', expected_headers, acceptable_statuses=(2,),
|
||||
params=params)
|
||||
if own_state in (ShardRange.SHRINKING, ShardRange.SHRUNK):
|
||||
# check acceptor & root are merged into audited shard
|
||||
self.assertEqual(
|
||||
[dict(sr) for sr in overlaps],
|
||||
[dict(sr) for sr in broker.get_shard_ranges()])
|
||||
return sharder
|
||||
|
||||
def assert_ok(own_state, acceptor_state, root_state):
|
||||
sharder = check_audit(own_state, acceptor_state, root_state)
|
||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0}
|
||||
with annotate_failure('with states %s %s %s'
|
||||
% (own_state, acceptor_state, root_state)):
|
||||
self._assert_stats(expected_stats, sharder, 'audit_shard')
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('warning'))
|
||||
self.assertFalse(sharder.logger.get_lines_for_level('error'))
|
||||
|
||||
for own_state in ShardRange.STATES:
|
||||
for acceptor_state in ShardRange.STATES:
|
||||
for root_state in ShardRange.STATES:
|
||||
assert_ok(own_state, acceptor_state, root_state)
|
||||
|
||||
def test_audit_old_style_shard_container_with_root_ranges(self):
|
||||
self._do_test_audit_shard_container_with_root_ranges('Root', 'a/c')
|
||||
|
||||
def test_audit_shard_container_with_root_ranges(self):
|
||||
self._do_test_audit_shard_container_with_root_ranges('Quoted-Root',
|
||||
'a/c')
|
||||
|
||||
def test_audit_deleted_range_in_root_container(self):
|
||||
broker = self._make_broker(account='.shards_a', container='shard_c')
|
||||
broker.set_sharding_sysmeta('Quoted-Root', 'a/c')
|
||||
@ -5638,13 +5839,8 @@ class TestCleavingContext(BaseTestSharder):
|
||||
self.assertEqual(4, ctx.ranges_todo)
|
||||
self.assertEqual('b', ctx.cursor)
|
||||
|
||||
ctx.range_done(None)
|
||||
self.assertEqual(2, ctx.ranges_done)
|
||||
self.assertEqual(3, ctx.ranges_todo)
|
||||
self.assertEqual('b', ctx.cursor)
|
||||
|
||||
ctx.ranges_todo = 9
|
||||
ctx.range_done('c')
|
||||
self.assertEqual(3, ctx.ranges_done)
|
||||
self.assertEqual(2, ctx.ranges_done)
|
||||
self.assertEqual(8, ctx.ranges_todo)
|
||||
self.assertEqual('c', ctx.cursor)
|
||||
|
Loading…
Reference in New Issue
Block a user