Merge "sharder: always set state to CLEAVED after cleaving"
This commit is contained in:
@@ -1660,6 +1660,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
def _cleave_shard_broker(self, broker, cleaving_context, shard_range,
|
def _cleave_shard_broker(self, broker, cleaving_context, shard_range,
|
||||||
own_shard_range, shard_broker, put_timestamp,
|
own_shard_range, shard_broker, put_timestamp,
|
||||||
shard_part, node_id):
|
shard_part, node_id):
|
||||||
|
result = CLEAVE_SUCCESS
|
||||||
start = time.time()
|
start = time.time()
|
||||||
# only cleave from the retiring db - misplaced objects handler will
|
# only cleave from the retiring db - misplaced objects handler will
|
||||||
# deal with any objects in the fresh db
|
# deal with any objects in the fresh db
|
||||||
@@ -1685,15 +1686,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
# This was just created; don't need to replicate this
|
# This was just created; don't need to replicate this
|
||||||
# SR because there was nothing there. So cleanup and
|
# SR because there was nothing there. So cleanup and
|
||||||
# remove the shard_broker from its hand off location.
|
# remove the shard_broker from its hand off location.
|
||||||
self.delete_db(shard_broker)
|
|
||||||
cleaving_context.range_done(shard_range.upper_str)
|
|
||||||
if shard_range.upper >= own_shard_range.upper:
|
|
||||||
# cleaving complete
|
|
||||||
cleaving_context.cleaving_done = True
|
|
||||||
cleaving_context.store(broker)
|
|
||||||
# Because nothing was here we won't count it in the shard
|
# Because nothing was here we won't count it in the shard
|
||||||
# batch count.
|
# batch count.
|
||||||
return CLEAVE_EMPTY
|
result = CLEAVE_EMPTY
|
||||||
# Else, it wasn't newly created by us, and
|
# Else, it wasn't newly created by us, and
|
||||||
# we don't know what's in it or why. Let it get
|
# we don't know what's in it or why. Let it get
|
||||||
# replicated and counted in the batch count.
|
# replicated and counted in the batch count.
|
||||||
@@ -1742,40 +1737,46 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
shard_broker.merge_shard_ranges(shard_range)
|
shard_broker.merge_shard_ranges(shard_range)
|
||||||
replication_quorum = self.shard_replication_quorum
|
replication_quorum = self.shard_replication_quorum
|
||||||
|
|
||||||
self.logger.info(
|
if result == CLEAVE_EMPTY:
|
||||||
'Replicating new shard container %s for %s',
|
self.delete_db(shard_broker)
|
||||||
quote(shard_broker.path), own_shard_range)
|
else: # result == CLEAVE_SUCCESS:
|
||||||
|
self.logger.info(
|
||||||
|
'Replicating new shard container %s for %s',
|
||||||
|
quote(shard_broker.path), own_shard_range)
|
||||||
|
|
||||||
success, responses = self._replicate_object(
|
success, responses = self._replicate_object(
|
||||||
shard_part, shard_broker.db_file, node_id)
|
shard_part, shard_broker.db_file, node_id)
|
||||||
|
|
||||||
replication_successes = responses.count(True)
|
replication_successes = responses.count(True)
|
||||||
if (not success and (not responses or
|
if (not success and (not responses or
|
||||||
replication_successes < replication_quorum)):
|
replication_successes < replication_quorum)):
|
||||||
# insufficient replication or replication not even attempted;
|
# insufficient replication or replication not even attempted;
|
||||||
# break because we don't want to progress the cleave cursor
|
# break because we don't want to progress the cleave cursor
|
||||||
# until each shard range has been successfully cleaved
|
# until each shard range has been successfully cleaved
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Failed to sufficiently replicate cleaved shard %s for %s: '
|
'Failed to sufficiently replicate cleaved shard %s for %s:'
|
||||||
'%s successes, %s required.', shard_range, quote(broker.path),
|
' %s successes, %s required.', shard_range,
|
||||||
replication_successes, replication_quorum)
|
quote(broker.path),
|
||||||
self._increment_stat('cleaved', 'failure', statsd=True)
|
replication_successes, replication_quorum)
|
||||||
return CLEAVE_FAILED
|
self._increment_stat('cleaved', 'failure', statsd=True)
|
||||||
|
result = CLEAVE_FAILED
|
||||||
|
else:
|
||||||
|
elapsed = round(time.time() - start, 3)
|
||||||
|
self._min_stat('cleaved', 'min_time', elapsed)
|
||||||
|
self._max_stat('cleaved', 'max_time', elapsed)
|
||||||
|
self.logger.info(
|
||||||
|
'Cleaved %s for shard range %s in %gs.',
|
||||||
|
quote(broker.path), shard_range, elapsed)
|
||||||
|
self._increment_stat('cleaved', 'success', statsd=True)
|
||||||
|
|
||||||
elapsed = round(time.time() - start, 3)
|
if result in (CLEAVE_SUCCESS, CLEAVE_EMPTY):
|
||||||
self._min_stat('cleaved', 'min_time', elapsed)
|
broker.merge_shard_ranges(shard_range)
|
||||||
self._max_stat('cleaved', 'max_time', elapsed)
|
cleaving_context.range_done(shard_range.upper_str)
|
||||||
broker.merge_shard_ranges(shard_range)
|
if shard_range.upper >= own_shard_range.upper:
|
||||||
cleaving_context.range_done(shard_range.upper_str)
|
# cleaving complete
|
||||||
if shard_range.upper >= own_shard_range.upper:
|
cleaving_context.cleaving_done = True
|
||||||
# cleaving complete
|
cleaving_context.store(broker)
|
||||||
cleaving_context.cleaving_done = True
|
return result
|
||||||
cleaving_context.store(broker)
|
|
||||||
self.logger.info(
|
|
||||||
'Cleaved %s for shard range %s in %gs.',
|
|
||||||
quote(broker.path), shard_range, elapsed)
|
|
||||||
self._increment_stat('cleaved', 'success', statsd=True)
|
|
||||||
return CLEAVE_SUCCESS
|
|
||||||
|
|
||||||
def _cleave_shard_range(self, broker, cleaving_context, shard_range,
|
def _cleave_shard_range(self, broker, cleaving_context, shard_range,
|
||||||
own_shard_range):
|
own_shard_range):
|
||||||
|
@@ -1332,6 +1332,81 @@ class TestContainerSharding(BaseAutoContainerSharding):
|
|||||||
def test_sharded_listing_with_replicators(self):
|
def test_sharded_listing_with_replicators(self):
|
||||||
self._test_sharded_listing(run_replicators=True)
|
self._test_sharded_listing(run_replicators=True)
|
||||||
|
|
||||||
|
def test_listing_under_populated_replica(self):
|
||||||
|
# the leader node and one other primary have all the objects and will
|
||||||
|
# cleave to 4 shard ranges, but the third primary only has 1 object in
|
||||||
|
# the final shard range
|
||||||
|
obj_names = self._make_object_names(2 * self.max_shard_size)
|
||||||
|
self.brain.servers.stop(number=self.brain.node_numbers[2])
|
||||||
|
self.put_objects(obj_names)
|
||||||
|
self.brain.servers.start(number=self.brain.node_numbers[2])
|
||||||
|
subset_obj_names = [obj_names[-1]]
|
||||||
|
self.put_objects(subset_obj_names)
|
||||||
|
self.brain.servers.stop(number=self.brain.node_numbers[2])
|
||||||
|
|
||||||
|
# sanity check: the first 2 primaries will list all objects
|
||||||
|
self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'})
|
||||||
|
|
||||||
|
# Run sharder on the fully populated nodes, starting with the leader
|
||||||
|
client.post_container(self.url, self.admin_token, self.container_name,
|
||||||
|
headers={'X-Container-Sharding': 'on'})
|
||||||
|
self.sharders.once(number=self.brain.node_numbers[0],
|
||||||
|
additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
self.sharders.once(number=self.brain.node_numbers[1],
|
||||||
|
additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
|
||||||
|
# Verify that the first 2 primary nodes have cleaved the first batch of
|
||||||
|
# 2 shard ranges
|
||||||
|
broker = self.get_broker(self.brain.part, self.brain.nodes[0])
|
||||||
|
self.assertEqual('sharding', broker.get_db_state())
|
||||||
|
shard_ranges = [dict(sr) for sr in broker.get_shard_ranges()]
|
||||||
|
self.assertLengthEqual(shard_ranges, 4)
|
||||||
|
self.assertEqual([ShardRange.CLEAVED, ShardRange.CLEAVED,
|
||||||
|
ShardRange.CREATED, ShardRange.CREATED],
|
||||||
|
[sr['state'] for sr in shard_ranges])
|
||||||
|
self.assertEqual(
|
||||||
|
{False},
|
||||||
|
set([ctx.done() for ctx, _ in CleavingContext.load_all(broker)]))
|
||||||
|
|
||||||
|
# listing is complete (from the fully populated primaries at least);
|
||||||
|
# the root serves the listing parts for the last 2 shard ranges which
|
||||||
|
# are not yet cleaved
|
||||||
|
self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'})
|
||||||
|
|
||||||
|
# Run the sharder on the under-populated node to get it fully
|
||||||
|
# cleaved.
|
||||||
|
self.brain.servers.start(number=self.brain.node_numbers[2])
|
||||||
|
Manager(['container-replicator']).once(
|
||||||
|
number=self.brain.node_numbers[2])
|
||||||
|
self.sharders.once(number=self.brain.node_numbers[2],
|
||||||
|
additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
|
||||||
|
broker = self.get_broker(self.brain.part, self.brain.nodes[2])
|
||||||
|
self.assertEqual('sharded', broker.get_db_state())
|
||||||
|
shard_ranges = [dict(sr) for sr in broker.get_shard_ranges()]
|
||||||
|
self.assertLengthEqual(shard_ranges, 4)
|
||||||
|
self.assertEqual([ShardRange.ACTIVE, ShardRange.ACTIVE,
|
||||||
|
ShardRange.ACTIVE, ShardRange.ACTIVE],
|
||||||
|
[sr['state'] for sr in shard_ranges])
|
||||||
|
self.assertEqual(
|
||||||
|
{True, False},
|
||||||
|
set([ctx.done() for ctx, _ in CleavingContext.load_all(broker)]))
|
||||||
|
|
||||||
|
# Get a consistent view of shard range states then check listing
|
||||||
|
Manager(['container-replicator']).once(
|
||||||
|
number=self.brain.node_numbers[2])
|
||||||
|
# oops, the listing is incomplete because the last 2 listing parts are
|
||||||
|
# now served by the under-populated shard ranges.
|
||||||
|
self.assert_container_listing(
|
||||||
|
obj_names[:self.max_shard_size] + subset_obj_names,
|
||||||
|
req_hdrs={'x-newest': 'true'})
|
||||||
|
|
||||||
|
# but once another replica has completed cleaving the listing is
|
||||||
|
# complete again
|
||||||
|
self.sharders.once(number=self.brain.node_numbers[1],
|
||||||
|
additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
self.assert_container_listing(obj_names, req_hdrs={'x-newest': 'true'})
|
||||||
|
|
||||||
def test_async_pendings(self):
|
def test_async_pendings(self):
|
||||||
obj_names = self._make_object_names(self.max_shard_size * 2)
|
obj_names = self._make_object_names(self.max_shard_size * 2)
|
||||||
|
|
||||||
|
@@ -1663,6 +1663,9 @@ class TestSharder(BaseTestSharder):
|
|||||||
self.assertEqual(cleaving_context.ranges_todo, 0)
|
self.assertEqual(cleaving_context.ranges_todo, 0)
|
||||||
self.assertTrue(cleaving_context.cleaving_done)
|
self.assertTrue(cleaving_context.cleaving_done)
|
||||||
|
|
||||||
|
self.assertEqual([ShardRange.CLEAVED] * 3,
|
||||||
|
[sr.state for sr in broker.get_shard_ranges()])
|
||||||
|
|
||||||
def test_cleave_root_empty_db_with_pre_existing_shard_db_handoff(self):
|
def test_cleave_root_empty_db_with_pre_existing_shard_db_handoff(self):
|
||||||
broker = self._make_broker()
|
broker = self._make_broker()
|
||||||
broker.enable_sharding(Timestamp.now())
|
broker.enable_sharding(Timestamp.now())
|
||||||
@@ -1695,6 +1698,10 @@ class TestSharder(BaseTestSharder):
|
|||||||
self.assertEqual(cleaving_context.ranges_todo, 2)
|
self.assertEqual(cleaving_context.ranges_todo, 2)
|
||||||
self.assertFalse(cleaving_context.cleaving_done)
|
self.assertFalse(cleaving_context.cleaving_done)
|
||||||
|
|
||||||
|
self.assertEqual(
|
||||||
|
[ShardRange.CLEAVED, ShardRange.CREATED, ShardRange.CREATED],
|
||||||
|
[sr.state for sr in broker.get_shard_ranges()])
|
||||||
|
|
||||||
def test_cleave_shard_range_no_own_shard_range(self):
|
def test_cleave_shard_range_no_own_shard_range(self):
|
||||||
# create an unsharded broker that has shard ranges but no
|
# create an unsharded broker that has shard ranges but no
|
||||||
# own_shard_range, verify that it does not cleave...
|
# own_shard_range, verify that it does not cleave...
|
||||||
|
Reference in New Issue
Block a user