Eliminate Scan-Done sysmeta

The scan-done sysmeta is not required since the same info can be
inferred from the shard range table. Also refactor _cleave and change
return value to avoid confusion i.e. return indicates when *all*
cleaving is complete.

Store cleaving_context progress as each shard range is cleaved rather
than at the end of a batch.

Change-Id: I90be4ada8fefbf3dda12fdf5dca46fb405a95a6a
This commit is contained in:
Alistair Coles 2018-04-16 08:42:08 +01:00
parent ff5b0b44b5
commit 066f4465a7
5 changed files with 139 additions and 165 deletions

View File

@ -733,14 +733,6 @@ majority quorum again to make sure it is still the scanner, and if so will write
the found ranges to the shard_ranges table. The other nodes will get these the found ranges to the shard_ranges table. The other nodes will get these
ranges via container replication. ranges via container replication.
Once the scanner node has found all ranges, it'll set some metadata to say it
has::
X-Container-Sysmeta-Shard-Scan-Done
This way the other nodes will know when they've finished sharding. And then
it'll start sharding itself.
**Comments/Discussion:** **Comments/Discussion:**
- Currently there is no check to see if the current scanner has stalled, died - Currently there is no check to see if the current scanner has stalled, died

View File

@ -189,7 +189,6 @@ def _replace_shard_ranges(broker, args, shard_data, timeout=None):
broker.merge_shard_ranges(shard_ranges) broker.merge_shard_ranges(shard_ranges)
# Update metadata *after* merge, just like in the sharder # Update metadata *after* merge, just like in the sharder
broker.update_sharding_info({'Scan-Done': 'True'})
print('Injected %d shard ranges.' % len(shard_ranges)) print('Injected %d shard ranges.' % len(shard_ranges))
print('Run container-replicator to replicate them to other nodes.') print('Run container-replicator to replicate them to other nodes.')

View File

@ -889,7 +889,6 @@ class ContainerSharder(ContainerReplicator):
if broker.set_sharding_state(): if broker.set_sharding_state():
state = SHARDING state = SHARDING
elif is_leader: elif is_leader:
broker.update_sharding_info({'Scan-Done': 'False'})
if broker.set_sharding_state(): if broker.set_sharding_state():
state = SHARDING state = SHARDING
else: else:
@ -899,11 +898,10 @@ class ContainerSharder(ContainerReplicator):
% (own_shard_range.state_text, broker.path)) % (own_shard_range.state_text, broker.path))
if state == SHARDING: if state == SHARDING:
num_found = 0 if is_leader:
scan_complete = config_true_value( num_found = self._find_shard_ranges(broker)
broker.get_sharding_info('Scan-Done')) else:
if is_leader and not scan_complete: num_found = 0
scan_complete, num_found = self._find_shard_ranges(broker)
# create shard containers for newly found ranges # create shard containers for newly found ranges
num_created = self._create_shard_containers(broker) num_created = self._create_shard_containers(broker)
@ -916,7 +914,7 @@ class ContainerSharder(ContainerReplicator):
# always try to cleave any pending shard ranges # always try to cleave any pending shard ranges
cleave_complete = self._cleave(broker) cleave_complete = self._cleave(broker)
if scan_complete and cleave_complete: if cleave_complete:
self.logger.debug('Completed cleaving of %s', broker.path) self.logger.debug('Completed cleaving of %s', broker.path)
if self._complete_sharding(broker): if self._complete_sharding(broker):
state = SHARDED state = SHARDED
@ -1063,27 +1061,30 @@ class ContainerSharder(ContainerReplicator):
:return: a tuple of (success, num of shard ranges found) where success :return: a tuple of (success, num of shard ranges found) where success
is True if the last shard range has been found, False otherwise. is True if the last shard range has been found, False otherwise.
""" """
self.logger.info('Started scan for shard ranges on %s/%s', own_shard_range = broker.get_own_shard_range()
broker.account, broker.container) shard_ranges = broker.get_shard_ranges()
if shard_ranges and shard_ranges[-1].upper >= own_shard_range.upper:
self.logger.debug('Scan already completed for %s', broker.path)
return 0
self.logger.info('Starting scan for shard ranges on %s', broker.path)
self._increment_stat('scanned', 'attempted') self._increment_stat('scanned', 'attempted')
start = time.time() start = time.time()
shard_data, last_found = broker.find_shard_ranges( shard_data, last_found = broker.find_shard_ranges(
self.split_size, limit=self.scanner_batch_size, self.split_size, limit=self.scanner_batch_size,
existing_ranges=broker.get_shard_ranges()) existing_ranges=shard_ranges)
elapsed = time.time() - start elapsed = time.time() - start
if not shard_data: if not shard_data:
if last_found: if last_found:
self.logger.info("Already found all shard ranges") self.logger.info("Already found all shard ranges")
# set scan done in case it's missing
broker.update_sharding_info({'Scan-Done': 'True'})
self._increment_stat('scanned', 'success', statsd=True) self._increment_stat('scanned', 'success', statsd=True)
else: else:
# we didn't find anything # we didn't find anything
self.logger.warning("No shard ranges found") self.logger.warning("No shard ranges found")
self._increment_stat('scanned', 'failure', statsd=True) self._increment_stat('scanned', 'failure', statsd=True)
return last_found, 0 return 0
# TODO: if we bring back leader election, this is about the spot where # TODO: if we bring back leader election, this is about the spot where
# we should confirm we're still the scanner # we should confirm we're still the scanner
@ -1098,11 +1099,9 @@ class ContainerSharder(ContainerReplicator):
self._max_stat('scanned', 'max_time', round(elapsed / num_found, 3)) self._max_stat('scanned', 'max_time', round(elapsed / num_found, 3))
if last_found: if last_found:
# We've found the last shard range, so mark that in metadata
broker.update_sharding_info({'Scan-Done': 'True'})
self.logger.info("Final shard range reached.") self.logger.info("Final shard range reached.")
self._increment_stat('scanned', 'success', statsd=True) self._increment_stat('scanned', 'success', statsd=True)
return last_found, num_found return num_found
def _create_shard_containers(self, broker): def _create_shard_containers(self, broker):
# Create shard containers that are ready to receive redirected object # Create shard containers that are ready to receive redirected object
@ -1256,32 +1255,97 @@ class ContainerSharder(ContainerReplicator):
# no need to change namespace or stats # no need to change namespace or stats
acceptor.update_state(ShardRange.ACTIVE, acceptor.update_state(ShardRange.ACTIVE,
state_timestamp=Timestamp.now()) state_timestamp=Timestamp.now())
# Set Scan-Done so that the donor will not scan itself and will
# transition to SHARDED state once it has cleaved to the acceptor;
# TODO: if the PUT request successfully write the Scan-Done sysmeta
# but fails to write the acceptor shard range, then the shard is
# left with Scan-Done set, and if it was to then grow in size to
# eventually need sharding, we don't want Scan-Done prematurely
# set. This is an argument for basing 'scan done condition' on the
# existence of a shard range whose upper >= shard own range, rather
# than using sysmeta Scan-Done.
headers = {'X-Container-Sysmeta-Shard-Scan-Done': True}
# Now send a copy of the expanded acceptor, with an updated # Now send a copy of the expanded acceptor, with an updated
# timestamp, to the donor container. This forces the donor to # timestamp, to the donor container. This forces the donor to
# asynchronously cleave its entire contents to the acceptor. The # asynchronously cleave its entire contents to the acceptor and
# donor will pass a deleted copy of its own shard range and the # delete itself. The donor will pass its own deleted shard range to
# newer expended acceptor shard range to the acceptor when # the acceptor when cleaving. Subsequent updates from the donor or
# cleaving. Subsequent updates from the acceptor will then update # the acceptor will then update the root to have the deleted donor
# the root to have the expanded acceptor namespace and deleted # shard range.
# donor shard range. Once cleaved, the donor will also update the self._send_shard_ranges(
# root directly with its deleted own shard range and the expanded donor.account, donor.container, [donor, acceptor])
# acceptor shard range.
self._send_shard_ranges(donor.account, donor.container, def _cleave_shard_range(self, broker, cleaving_context, shard_range):
[donor, acceptor], headers=headers) self.logger.info("Cleaving '%s': %r from row %s",
broker.path, shard_range,
cleaving_context.last_cleave_to_row)
self._increment_stat('cleaved', 'attempted')
start = time.time()
policy_index = broker.storage_policy_index
try:
# use force here because we may want to update existing shard
# metadata timestamps
shard_part, shard_broker, node_id = self._get_shard_broker(
shard_range, broker.root_path, policy_index, force=True)
except DeviceUnavailable as duex:
self.logger.warning(str(duex))
self._increment_stat('cleaved', 'failure', statsd=True)
return False
brokers = broker.get_brokers()
with shard_broker.sharding_lock():
for source_broker in brokers:
for objects, info in self.yield_objects(
source_broker, shard_range, policy_index,
since_row=cleaving_context.last_cleave_to_row):
shard_broker.merge_items(objects)
own_shard_range = broker.get_own_shard_range()
if shard_range.includes(own_shard_range):
# When shrinking, include deleted own (donor) shard range in
# the replicated db so that when acceptor next updates root it
# will atomically update its namespace *and* delete the donor.
# Don't do this when sharding a shard because the donor
# namespace should not be deleted until all shards are cleaved.
if own_shard_range.update_state(ShardRange.SHARDED):
own_shard_range.set_deleted()
broker.merge_shard_ranges([own_shard_range])
shard_broker.merge_shard_ranges([own_shard_range])
elif shard_range.state == ShardRange.CREATED:
# The shard range object stats may have changed since the shard
# range was found, so update with stats of objects actually
# copied to the shard broker. Only do this the first time each
# shard range is cleaved.
info = shard_broker.get_info()
shard_range.update_meta(
info['object_count'], info['bytes_used'])
shard_range.update_state(ShardRange.CLEAVED)
shard_broker.merge_shard_ranges([shard_range])
self.logger.info(
'Replicating new shard container %s for %s',
shard_broker.path, shard_broker.get_own_shard_range())
success, responses = self._replicate_object(
shard_part, shard_broker.db_file, node_id)
quorum = quorum_size(self.ring.replica_count)
if not success and responses.count(True) < quorum:
# break because we don't want to progress the cleave cursor
# until each shard range has been successfully cleaved
self.logger.warning(
'Failed to sufficiently replicate cleaved shard %s for %s',
shard_range, broker.path)
self._increment_stat('cleaved', 'failure', statsd=True)
return False
elapsed = round(time.time() - start, 3)
self._min_stat('cleaved', 'min_time', elapsed)
self._max_stat('cleaved', 'max_time', elapsed)
broker.merge_shard_ranges([shard_range])
cleaving_context.cursor = str(shard_range.upper)
if shard_range.upper >= own_shard_range.upper:
# cleaving complete
cleaving_context.cleaving_done = True
cleaving_context.store(broker)
self.logger.info(
'Cleaved %s for shard range %s.', broker.path, shard_range)
self._increment_stat('cleaved', 'success', statsd=True)
return True
def _cleave(self, broker): def _cleave(self, broker):
# Returns True if misplaced objects have been moved and all *available* # Returns True if misplaced objects have been moved and the entire
# shard ranges have been successfully cleaved, False otherwise # container namespace has been successfully cleaved, False otherwise
state = broker.get_db_state() state = broker.get_db_state()
if state == SHARDED: if state == SHARDED:
self.logger.debug('Passing over already sharded container %s/%s', self.logger.debug('Passing over already sharded container %s/%s',
@ -1307,104 +1371,33 @@ class ContainerSharder(ContainerReplicator):
broker.path) broker.path)
return cleaving_context.misplaced_done return cleaving_context.misplaced_done
ranges_todo = broker.get_shard_ranges(
marker=cleaving_context.marker,
states=[ShardRange.CREATED, ShardRange.CLEAVED, ShardRange.ACTIVE])
if not ranges_todo:
self.logger.debug('No shard ranges to cleave for %s', broker.path)
return cleaving_context.misplaced_done
if cleaving_context.cursor: if cleaving_context.cursor:
self.logger.debug('Continuing to cleave %s', broker.path) self.logger.debug('Continuing to cleave %s', broker.path)
else: else:
self.logger.debug('Starting to cleave %s', broker.path) self.logger.debug('Starting to cleave %s', broker.path)
cleaving_context.start() cleaving_context.start()
own_shard_range = broker.get_own_shard_range() ranges_todo = broker.get_shard_ranges(marker=cleaving_context.marker)
ranges_done = [] ranges_done = []
policy_index = broker.storage_policy_index
brokers = broker.get_brokers()
for shard_range in ranges_todo[:self.shard_batch_size]: for shard_range in ranges_todo[:self.shard_batch_size]:
self.logger.info("Cleaving '%s': %r from row %s", if shard_range.state == ShardRange.FOUND:
broker.path, shard_range,
cleaving_context.last_cleave_to_row)
self._increment_stat('cleaved', 'attempted')
start = time.time()
try:
# use force here because we may want to update existing shard
# metadata timestamps
shard_part, shard_broker, node_id = self._get_shard_broker(
shard_range, broker.root_path, policy_index, force=True)
except DeviceUnavailable as duex:
self.logger.warning(str(duex))
self._increment_stat('cleaved', 'failure', statsd=True)
return False
with shard_broker.sharding_lock():
for source_broker in brokers:
for objects, info in self.yield_objects(
source_broker, shard_range, policy_index,
since_row=cleaving_context.last_cleave_to_row):
shard_broker.merge_items(objects)
if shard_range.includes(own_shard_range):
# When shrinking, include deleted own (donor) shard range in
# the replicated db so that when acceptor next updates root it
# will atomically update its namespace *and* delete the donor.
# Don't do this when sharding a shard because the donor
# namespace should not be deleted until all shards are cleaved.
if own_shard_range.update_state(ShardRange.SHARDED):
own_shard_range.set_deleted()
broker.merge_shard_ranges([own_shard_range])
shard_broker.merge_shard_ranges([own_shard_range])
elif shard_range.state == ShardRange.CREATED:
# The shard range object stats may have changed since the shard
# range was found, so update with stats of objects actually
# copied to the shard broker. Only do this the first time each
# shard range is cleaved.
info = shard_broker.get_info()
shard_range.update_meta(
info['object_count'], info['bytes_used'])
shard_range.update_state(ShardRange.CLEAVED)
shard_broker.merge_shard_ranges([shard_range])
self.logger.info(
'Replicating new shard container %s for %s',
shard_broker.path, shard_broker.get_own_shard_range())
success, responses = self._replicate_object(
shard_part, shard_broker.db_file, node_id)
quorum = quorum_size(self.ring.replica_count)
if not success and responses.count(True) < quorum:
# break because we don't want to progress the cleave cursor
# until each shard range has been successfully cleaved
self.logger.warning(
'Failed to sufficiently replicate cleaved shard %s for %s',
shard_range, broker.path)
break break
elif shard_range.state in (ShardRange.CREATED,
ranges_done.append(shard_range) ShardRange.CLEAVED,
ShardRange.ACTIVE):
self.logger.info( if self._cleave_shard_range(
'Cleaved %s for shard range %s.', broker.path, shard_range) broker, cleaving_context, shard_range):
self._increment_stat('cleaved', 'success', statsd=True) ranges_done.append(shard_range)
elapsed = round(time.time() - start, 3) else:
self._min_stat('cleaved', 'min_time', elapsed) break
self._max_stat('cleaved', 'max_time', elapsed) else:
self.logger.warning('Unexpected shard range state for cleave',
if ranges_done: shard_range.state)
broker.merge_shard_ranges(ranges_done) break
cleaving_context.cursor = str(ranges_done[-1].upper) self.logger.debug(
if ranges_done[-1].upper >= own_shard_range.upper: 'Cleaved %s shard ranges for %s', len(ranges_done), broker.path)
# cleaving complete
cleaving_context.cleaving_done = True
cleaving_context.store(broker)
else:
self.logger.warning('No cleaving progress made: %s', broker.path)
return (cleaving_context.misplaced_done and return (cleaving_context.misplaced_done and
(len(ranges_done) == len(ranges_todo))) cleaving_context.cleaving_done)
def _complete_sharding(self, broker): def _complete_sharding(self, broker):
# TODO: wrap this in a lock # TODO: wrap this in a lock

View File

@ -1073,8 +1073,7 @@ class TestContainerSharding(ReplProbeTest):
# nodes on which sharder has not run are still in unsharded state # nodes on which sharder has not run are still in unsharded state
# but have had shard ranges replicated to them # but have had shard ranges replicated to them
exp_obj_count = len(obj_names) exp_obj_count = len(obj_names)
exp_hdrs = {'X-Container-Sysmeta-Shard-Scan-Done': 'True', exp_hdrs = {'X-Backend-Sharding-State': str(UNSHARDED),
'X-Backend-Sharding-State': str(UNSHARDED),
'X-Container-Object-Count': str(exp_obj_count)} 'X-Container-Object-Count': str(exp_obj_count)}
node_id = self.brain.node_numbers[1] - 1 node_id = self.brain.node_numbers[1] - 1
check_node_data( check_node_data(
@ -1260,8 +1259,7 @@ class TestContainerSharding(ReplProbeTest):
# check root container # check root container
root_nodes_data = self.direct_get_container_shard_ranges() root_nodes_data = self.direct_get_container_shard_ranges()
self.assertEqual(3, len(root_nodes_data)) self.assertEqual(3, len(root_nodes_data))
exp_hdrs = {'X-Container-Sysmeta-Shard-Scan-Done': 'True', exp_hdrs = {'X-Backend-Sharding-State': str(COLLAPSED),
'X-Backend-Sharding-State': str(COLLAPSED),
# just the alpha object # just the alpha object
'X-Container-Object-Count': '1'} 'X-Container-Object-Count': '1'}
for node_id, node_data in root_nodes_data.items(): for node_id, node_data in root_nodes_data.items():

View File

@ -574,7 +574,7 @@ class TestSharder(BaseTestSharder):
total_shard_stats = {'object_count': 0, 'bytes_used': 0} total_shard_stats = {'object_count': 0, 'bytes_used': 0}
# run cleave - no shard ranges, nothing happens # run cleave - no shard ranges, nothing happens
with self._mock_sharder() as sharder: with self._mock_sharder() as sharder:
self.assertTrue(sharder._cleave(broker)) self.assertFalse(sharder._cleave(broker))
context = CleavingContext.load(broker) context = CleavingContext.load(broker)
self.assertTrue(context.misplaced_done) self.assertTrue(context.misplaced_done)
@ -594,7 +594,7 @@ class TestSharder(BaseTestSharder):
self.assertTrue(broker.set_sharding_state()) self.assertTrue(broker.set_sharding_state())
with self._mock_sharder() as sharder: with self._mock_sharder() as sharder:
self.assertTrue(sharder._cleave(broker)) self.assertFalse(sharder._cleave(broker))
context = CleavingContext.load(broker) context = CleavingContext.load(broker)
self.assertTrue(context.misplaced_done) self.assertTrue(context.misplaced_done)
@ -616,7 +616,7 @@ class TestSharder(BaseTestSharder):
shard_ranges[0].update_state(ShardRange.CREATED) shard_ranges[0].update_state(ShardRange.CREATED)
broker.merge_shard_ranges(shard_ranges[:1]) broker.merge_shard_ranges(shard_ranges[:1])
with self._mock_sharder() as sharder: with self._mock_sharder() as sharder:
self.assertTrue(sharder._cleave(broker)) self.assertFalse(sharder._cleave(broker))
expected = {'attempted': 1, 'success': 1, 'failure': 0, expected = {'attempted': 1, 'success': 1, 'failure': 0,
'min_time': mock.ANY, 'max_time': mock.ANY} 'min_time': mock.ANY, 'max_time': mock.ANY}
@ -758,7 +758,7 @@ class TestSharder(BaseTestSharder):
# run cleave again - should process the fourth range # run cleave again - should process the fourth range
with self._mock_sharder() as sharder: with self._mock_sharder() as sharder:
self.assertTrue(sharder._cleave(broker)) self.assertFalse(sharder._cleave(broker))
expected = {'attempted': 1, 'success': 1, 'failure': 0, expected = {'attempted': 1, 'success': 1, 'failure': 0,
'min_time': mock.ANY, 'max_time': mock.ANY} 'min_time': mock.ANY, 'max_time': mock.ANY}
@ -811,7 +811,7 @@ class TestSharder(BaseTestSharder):
# run cleave - should be a no-op, all existing ranges have been cleaved # run cleave - should be a no-op, all existing ranges have been cleaved
with self._mock_sharder() as sharder: with self._mock_sharder() as sharder:
self.assertTrue(sharder._cleave(broker)) self.assertFalse(sharder._cleave(broker))
self.assertEqual(SHARDING, broker.get_db_state()) self.assertEqual(SHARDING, broker.get_db_state())
sharder._replicate_object.assert_not_called() sharder._replicate_object.assert_not_called()
@ -821,7 +821,6 @@ class TestSharder(BaseTestSharder):
shard_ranges[4].update_state(ShardRange.ACTIVE) shard_ranges[4].update_state(ShardRange.ACTIVE)
shard_ranges[4].update_meta(2, 15) shard_ranges[4].update_meta(2, 15)
broker.merge_shard_ranges(shard_ranges[4:]) broker.merge_shard_ranges(shard_ranges[4:])
broker.update_sharding_info({'Scan-Done': 'True'})
with self._mock_sharder() as sharder: with self._mock_sharder() as sharder:
self.assertTrue(sharder._cleave(broker)) self.assertTrue(sharder._cleave(broker))
@ -2569,10 +2568,9 @@ class TestSharder(BaseTestSharder):
def _check_find_shard_ranges_none_found(self, broker, objects): def _check_find_shard_ranges_none_found(self, broker, objects):
with self._mock_sharder() as sharder: with self._mock_sharder() as sharder:
last_found, num_found = sharder._find_shard_ranges(broker) num_found = sharder._find_shard_ranges(broker)
self.assertGreater(sharder.split_size, len(objects)) self.assertGreater(sharder.split_size, len(objects))
self.assertEqual(0, num_found) self.assertEqual(0, num_found)
self.assertFalse(last_found)
self.assertFalse(broker.get_shard_ranges()) self.assertFalse(broker.get_shard_ranges())
expected_stats = {'attempted': 1, 'success': 0, 'failure': 1, expected_stats = {'attempted': 1, 'success': 0, 'failure': 1,
'found': 0, 'min_time': mock.ANY, 'found': 0, 'min_time': mock.ANY,
@ -2581,10 +2579,9 @@ class TestSharder(BaseTestSharder):
self.assertGreaterEqual(stats['max_time'], stats['min_time']) self.assertGreaterEqual(stats['max_time'], stats['min_time'])
with self._mock_sharder(conf={'shard_container_size': 200}) as sharder: with self._mock_sharder(conf={'shard_container_size': 200}) as sharder:
last_found, num_found = sharder._find_shard_ranges(broker) num_found = sharder._find_shard_ranges(broker)
self.assertEqual(sharder.split_size, len(objects)) self.assertEqual(sharder.split_size, len(objects))
self.assertEqual(0, num_found) self.assertEqual(0, num_found)
self.assertFalse(last_found)
self.assertFalse(broker.get_shard_ranges()) self.assertFalse(broker.get_shard_ranges())
expected_stats = {'attempted': 1, 'success': 0, 'failure': 1, expected_stats = {'attempted': 1, 'success': 0, 'failure': 1,
'found': 0, 'min_time': mock.ANY, 'found': 0, 'min_time': mock.ANY,
@ -2614,18 +2611,17 @@ class TestSharder(BaseTestSharder):
] ]
self._assert_shard_ranges_equal(expected_ranges, self._assert_shard_ranges_equal(expected_ranges,
broker.get_shard_ranges()) broker.get_shard_ranges())
self.assertEqual('True', broker.get_sharding_info('Scan-Done'))
# first invocation finds both ranges
broker, objects = self._setup_find_ranges( broker, objects = self._setup_find_ranges(
account, cont, lower, upper) account, cont, lower, upper)
with self._mock_sharder(conf={'shard_container_size': 199, with self._mock_sharder(conf={'shard_container_size': 199,
'auto_create_account_prefix': '.int_'} 'auto_create_account_prefix': '.int_'}
) as sharder: ) as sharder:
with mock_timestamp_now() as now: with mock_timestamp_now() as now:
last_found, num_found = sharder._find_shard_ranges(broker) num_found = sharder._find_shard_ranges(broker)
self.assertEqual(99, sharder.split_size) self.assertEqual(99, sharder.split_size)
self.assertEqual(2, num_found) self.assertEqual(2, num_found)
self.assertTrue(last_found)
check_ranges() check_ranges()
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0, expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
'found': 2, 'min_time': mock.ANY, 'found': 2, 'min_time': mock.ANY,
@ -2633,15 +2629,15 @@ class TestSharder(BaseTestSharder):
stats = self._assert_stats(expected_stats, sharder, 'scanned') stats = self._assert_stats(expected_stats, sharder, 'scanned')
self.assertGreaterEqual(stats['max_time'], stats['min_time']) self.assertGreaterEqual(stats['max_time'], stats['min_time'])
# second invocation finds none
with self._mock_sharder(conf={'shard_container_size': 199, with self._mock_sharder(conf={'shard_container_size': 199,
'auto_create_account_prefix': '.int_'} 'auto_create_account_prefix': '.int_'}
) as sharder: ) as sharder:
last_found, num_found = sharder._find_shard_ranges(broker) num_found = sharder._find_shard_ranges(broker)
self.assertEqual(0, num_found) self.assertEqual(0, num_found)
self.assertTrue(last_found)
self.assertEqual(2, len(broker.get_shard_ranges())) self.assertEqual(2, len(broker.get_shard_ranges()))
check_ranges() check_ranges()
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0, expected_stats = {'attempted': 0, 'success': 0, 'failure': 0,
'found': 0, 'min_time': mock.ANY, 'found': 0, 'min_time': mock.ANY,
'max_time': mock.ANY} 'max_time': mock.ANY}
stats = self._assert_stats(expected_stats, sharder, 'scanned') stats = self._assert_stats(expected_stats, sharder, 'scanned')
@ -2674,14 +2670,12 @@ class TestSharder(BaseTestSharder):
conf={'shard_container_size': 90, conf={'shard_container_size': 90,
'shard_scanner_batch_size': 2}) as sharder: 'shard_scanner_batch_size': 2}) as sharder:
with mock_timestamp_now(now): with mock_timestamp_now(now):
last_found, num_found = sharder._find_shard_ranges(broker) num_found = sharder._find_shard_ranges(broker)
self.assertEqual(45, sharder.split_size) self.assertEqual(45, sharder.split_size)
self.assertEqual(2, num_found) self.assertEqual(2, num_found)
self.assertFalse(last_found)
self.assertEqual(2, len(broker.get_shard_ranges())) self.assertEqual(2, len(broker.get_shard_ranges()))
self._assert_shard_ranges_equal(expected_ranges[:2], self._assert_shard_ranges_equal(expected_ranges[:2],
broker.get_shard_ranges()) broker.get_shard_ranges())
self.assertIsNone(broker.get_sharding_info('Scan-Done'))
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0, expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
'found': 2, 'min_time': mock.ANY, 'found': 2, 'min_time': mock.ANY,
'max_time': mock.ANY} 'max_time': mock.ANY}
@ -2693,13 +2687,11 @@ class TestSharder(BaseTestSharder):
'shard_scanner_batch_size': 2} 'shard_scanner_batch_size': 2}
) as sharder: ) as sharder:
with mock_timestamp_now(now): with mock_timestamp_now(now):
last_found, num_found = sharder._find_shard_ranges(broker) num_found = sharder._find_shard_ranges(broker)
self.assertEqual(1, num_found) self.assertEqual(1, num_found)
self.assertTrue(last_found)
self.assertEqual(3, len(broker.get_shard_ranges())) self.assertEqual(3, len(broker.get_shard_ranges()))
self._assert_shard_ranges_equal(expected_ranges, self._assert_shard_ranges_equal(expected_ranges,
broker.get_shard_ranges()) broker.get_shard_ranges())
self.assertEqual('True', broker.get_sharding_info('Scan-Done'))
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0, expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
'found': 1, 'min_time': mock.ANY, 'found': 1, 'min_time': mock.ANY,
'max_time': mock.ANY} 'max_time': mock.ANY}
@ -2711,14 +2703,12 @@ class TestSharder(BaseTestSharder):
'shard_scanner_batch_size': 2} 'shard_scanner_batch_size': 2}
) as sharder: ) as sharder:
sharder._send_shard_ranges = mock.MagicMock(return_value=True) sharder._send_shard_ranges = mock.MagicMock(return_value=True)
last_found, num_found = sharder._find_shard_ranges(broker) num_found = sharder._find_shard_ranges(broker)
self.assertEqual(0, num_found) self.assertEqual(0, num_found)
self.assertTrue(last_found)
self.assertEqual(3, len(broker.get_shard_ranges())) self.assertEqual(3, len(broker.get_shard_ranges()))
self._assert_shard_ranges_equal(expected_ranges, self._assert_shard_ranges_equal(expected_ranges,
broker.get_shard_ranges()) broker.get_shard_ranges())
self.assertEqual('True', broker.get_sharding_info('Scan-Done')) expected_stats = {'attempted': 0, 'success': 0, 'failure': 0,
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
'found': 0, 'min_time': mock.ANY, 'found': 0, 'min_time': mock.ANY,
'max_time': mock.ANY} 'max_time': mock.ANY}
stats = self._assert_stats(expected_stats, sharder, 'scanned') stats = self._assert_stats(expected_stats, sharder, 'scanned')
@ -3050,6 +3040,8 @@ class TestSharder(BaseTestSharder):
own_sr.epoch = epoch own_sr.epoch = epoch
broker.merge_shard_ranges([own_sr]) broker.merge_shard_ranges([own_sr])
with self._mock_sharder() as sharder: with self._mock_sharder() as sharder:
sharder.logger = debug_logger()
with mock_timestamp_now() as now: with mock_timestamp_now() as now:
# we're not testing rest of the process here so prevent any # we're not testing rest of the process here so prevent any
# attempt to progress shard range states # attempt to progress shard range states