Merge "sharding: Update probe test to verify CleavingContext cleanup"
This commit is contained in:
commit
d059505aba
@ -220,6 +220,10 @@ class CleavingContext(object):
|
|||||||
yield 'ranges_done', self.ranges_done
|
yield 'ranges_done', self.ranges_done
|
||||||
yield 'ranges_todo', self.ranges_todo
|
yield 'ranges_todo', self.ranges_todo
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return '%s(%s)' % (self.__class__.__name__, ', '.join(
|
||||||
|
'%s=%r' % prop for prop in self))
|
||||||
|
|
||||||
def _encode(cls, value):
|
def _encode(cls, value):
|
||||||
if value is not None and six.PY2 and isinstance(value, six.text_type):
|
if value is not None and six.PY2 and isinstance(value, six.text_type):
|
||||||
return value.encode('utf-8')
|
return value.encode('utf-8')
|
||||||
@ -750,8 +754,8 @@ class ContainerSharder(ContainerReplicator):
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
def _audit_cleave_contexts(self, broker):
|
def _audit_cleave_contexts(self, broker):
|
||||||
|
now = Timestamp.now()
|
||||||
for context, last_mod in CleavingContext.load_all(broker):
|
for context, last_mod in CleavingContext.load_all(broker):
|
||||||
now = Timestamp.now()
|
|
||||||
if Timestamp(last_mod).timestamp + self.reclaim_age < \
|
if Timestamp(last_mod).timestamp + self.reclaim_age < \
|
||||||
now.timestamp:
|
now.timestamp:
|
||||||
context.delete(broker)
|
context.delete(broker)
|
||||||
|
@ -28,6 +28,7 @@ from swift.common.direct_client import DirectClientException
|
|||||||
from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
|
from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
|
||||||
quorum_size, config_true_value, Timestamp
|
quorum_size, config_true_value, Timestamp
|
||||||
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING
|
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING
|
||||||
|
from swift.container.sharder import CleavingContext
|
||||||
from swiftclient import client, get_auth, ClientException
|
from swiftclient import client, get_auth, ClientException
|
||||||
|
|
||||||
from swift.proxy.controllers.base import get_cache_key
|
from swift.proxy.controllers.base import get_cache_key
|
||||||
@ -616,6 +617,8 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||||||
self.assert_shard_ranges_contiguous(2, orig_root_shard_ranges)
|
self.assert_shard_ranges_contiguous(2, orig_root_shard_ranges)
|
||||||
self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE],
|
self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE],
|
||||||
[sr['state'] for sr in orig_root_shard_ranges])
|
[sr['state'] for sr in orig_root_shard_ranges])
|
||||||
|
contexts = list(CleavingContext.load_all(broker))
|
||||||
|
self.assertEqual([], contexts) # length check
|
||||||
self.direct_delete_container(expect_failure=True)
|
self.direct_delete_container(expect_failure=True)
|
||||||
|
|
||||||
self.assertLengthEqual(found['normal_dbs'], 2)
|
self.assertLengthEqual(found['normal_dbs'], 2)
|
||||||
@ -633,6 +636,9 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||||||
orig_root_shard_ranges, shard_ranges,
|
orig_root_shard_ranges, shard_ranges,
|
||||||
excludes=['meta_timestamp', 'state', 'state_timestamp'])
|
excludes=['meta_timestamp', 'state', 'state_timestamp'])
|
||||||
|
|
||||||
|
contexts = list(CleavingContext.load_all(broker))
|
||||||
|
self.assertEqual([], contexts) # length check
|
||||||
|
|
||||||
if run_replicators:
|
if run_replicators:
|
||||||
Manager(['container-replicator']).once()
|
Manager(['container-replicator']).once()
|
||||||
# replication doesn't change the db file names
|
# replication doesn't change the db file names
|
||||||
@ -663,6 +669,9 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||||||
self.assertGreaterEqual(updated.meta_timestamp,
|
self.assertGreaterEqual(updated.meta_timestamp,
|
||||||
orig['meta_timestamp'])
|
orig['meta_timestamp'])
|
||||||
|
|
||||||
|
contexts = list(CleavingContext.load_all(broker))
|
||||||
|
self.assertEqual([], contexts) # length check
|
||||||
|
|
||||||
# Check that entire listing is available
|
# Check that entire listing is available
|
||||||
headers, actual_listing = self.assert_container_listing(obj_names)
|
headers, actual_listing = self.assert_container_listing(obj_names)
|
||||||
# ... and check some other container properties
|
# ... and check some other container properties
|
||||||
@ -753,6 +762,16 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||||||
first_lower=orig_root_shard_ranges[0]['lower'],
|
first_lower=orig_root_shard_ranges[0]['lower'],
|
||||||
last_upper=orig_root_shard_ranges[0]['upper'])
|
last_upper=orig_root_shard_ranges[0]['upper'])
|
||||||
|
|
||||||
|
contexts = list(CleavingContext.load_all(broker))
|
||||||
|
self.assertEqual(len(contexts), 1)
|
||||||
|
context, _lm = contexts[0]
|
||||||
|
self.assertIs(context.cleaving_done, False)
|
||||||
|
self.assertIs(context.misplaced_done, True)
|
||||||
|
self.assertEqual(context.ranges_done, 2)
|
||||||
|
self.assertEqual(context.ranges_todo, 1)
|
||||||
|
self.assertEqual(context.max_row,
|
||||||
|
self.max_shard_size * 3 // 2)
|
||||||
|
|
||||||
# but third replica still has no idea it should be sharding
|
# but third replica still has no idea it should be sharding
|
||||||
self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
|
self.assertLengthEqual(found_for_shard['normal_dbs'], 3)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
@ -780,16 +799,36 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||||||
ShardRange.SHARDING, broker.get_own_shard_range().state)
|
ShardRange.SHARDING, broker.get_own_shard_range().state)
|
||||||
self.assertFalse(broker.get_shard_ranges())
|
self.assertFalse(broker.get_shard_ranges())
|
||||||
|
|
||||||
|
contexts = list(CleavingContext.load_all(broker))
|
||||||
|
self.assertEqual([], contexts) # length check
|
||||||
|
|
||||||
# ...until sub-shard ranges are replicated from another shard replica;
|
# ...until sub-shard ranges are replicated from another shard replica;
|
||||||
# there may also be a sub-shard replica missing so run replicators on
|
# there may also be a sub-shard replica missing so run replicators on
|
||||||
# all nodes to fix that if necessary
|
# all nodes to fix that if necessary
|
||||||
self.brain.servers.start(number=shard_1_nodes[2])
|
self.brain.servers.start(number=shard_1_nodes[2])
|
||||||
self.replicators.once()
|
self.replicators.once()
|
||||||
|
|
||||||
|
# Now that the replicators have all run, third replica sees cleaving
|
||||||
|
# contexts for the first two
|
||||||
|
contexts = list(CleavingContext.load_all(broker))
|
||||||
|
self.assertEqual(len(contexts), 2)
|
||||||
|
|
||||||
# now run sharder again on third replica
|
# now run sharder again on third replica
|
||||||
self.sharders.once(
|
self.sharders.once(
|
||||||
number=shard_1_nodes[2],
|
number=shard_1_nodes[2],
|
||||||
additional_args='--partitions=%s' % shard_1_part)
|
additional_args='--partitions=%s' % shard_1_part)
|
||||||
|
sharding_broker = ContainerBroker(found_for_shard['normal_dbs'][2])
|
||||||
|
self.assertEqual('sharding', sharding_broker.get_db_state())
|
||||||
|
|
||||||
|
broker_id = broker.get_info()['id']
|
||||||
|
# Old, unsharded DB doesn't have the context...
|
||||||
|
contexts = list(CleavingContext.load_all(broker))
|
||||||
|
self.assertEqual(len(contexts), 2)
|
||||||
|
self.assertNotIn(broker_id, [ctx[0].ref for ctx in contexts])
|
||||||
|
# ...but the sharding one does
|
||||||
|
contexts = list(CleavingContext.load_all(sharding_broker))
|
||||||
|
self.assertEqual(len(contexts), 3)
|
||||||
|
self.assertIn(broker_id, [ctx[0].ref for ctx in contexts])
|
||||||
|
|
||||||
# check original first shard range state and sub-shards - all replicas
|
# check original first shard range state and sub-shards - all replicas
|
||||||
# should now be in consistent state
|
# should now be in consistent state
|
||||||
@ -865,6 +904,8 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||||||
self.assert_container_listing(['alpha'] + more_obj_names + obj_names)
|
self.assert_container_listing(['alpha'] + more_obj_names + obj_names)
|
||||||
# Run sharders again so things settle.
|
# Run sharders again so things settle.
|
||||||
self.run_sharders(shard_1)
|
self.run_sharders(shard_1)
|
||||||
|
# Also run replicators to settle cleaving contexts
|
||||||
|
self.replicators.once()
|
||||||
|
|
||||||
# check original first shard range shards
|
# check original first shard range shards
|
||||||
for db_file in found_for_shard['shard_dbs']:
|
for db_file in found_for_shard['shard_dbs']:
|
||||||
@ -875,6 +916,11 @@ class TestContainerSharding(BaseTestContainerSharding):
|
|||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
[ShardRange.ACTIVE] * 3,
|
[ShardRange.ACTIVE] * 3,
|
||||||
[sr.state for sr in broker.get_shard_ranges()])
|
[sr.state for sr in broker.get_shard_ranges()])
|
||||||
|
|
||||||
|
# Make sure our cleaving contexts got cleaned up
|
||||||
|
contexts = list(CleavingContext.load_all(broker))
|
||||||
|
self.assertEqual([], contexts)
|
||||||
|
|
||||||
# check root shard ranges
|
# check root shard ranges
|
||||||
root_shard_ranges = self.direct_get_container_shard_ranges()
|
root_shard_ranges = self.direct_get_container_shard_ranges()
|
||||||
for node, (hdrs, root_shards) in root_shard_ranges.items():
|
for node, (hdrs, root_shards) in root_shard_ranges.items():
|
||||||
|
Loading…
x
Reference in New Issue
Block a user