diff --git a/swift/container/replicator.py b/swift/container/replicator.py
index 07c6f8bd36..411d297736 100644
--- a/swift/container/replicator.py
+++ b/swift/container/replicator.py
@@ -20,7 +20,8 @@ from eventlet import Timeout
 from random import choice
 
 from swift.container.sync_store import ContainerSyncStore
-from swift.container.backend import ContainerBroker, DATADIR, SHARDED
+from swift.container.backend import ContainerBroker, DATADIR, SHARDED, \
+    merge_shards
 from swift.container.reconciler import (
     MISPLACED_OBJECTS_ACCOUNT, incorrect_policy_index,
     get_reconciler_container_name, get_row_to_q_entry_translator)
@@ -31,6 +32,35 @@ from swift.common.http import is_success
 from swift.common.utils import Timestamp, majority_size, get_db_files
 
 
+def check_merge_own_shard_range(shards, broker, logger, source):
+    """
+    If broker has own_shard_range *with an epoch* then filter out an
+    own_shard_range *without an epoch*, and log a warning about it.
+
+    :param shards: a list of candidate ShardRanges to merge
+    :param broker: a ContainerBroker
+    :param logger: a logger
+    :param source: string to log as source of shards
+    :return: a list of ShardRanges to actually merge
+    """
+    # work-around for https://bugs.launchpad.net/swift/+bug/1980451
+    own_sr = broker.get_own_shard_range()
+    if own_sr.epoch is None:
+        return shards
+    to_merge = []
+    for shard in shards:
+        if shard['name'] == own_sr.name and not shard['epoch']:
+            shard_copy = dict(shard)
+            new_content = merge_shards(shard_copy, dict(own_sr))
+            if new_content and shard_copy['epoch'] is None:
+                logger.warning(
+                    'Ignoring remote osr w/o epoch, own_sr: %r, remote_sr: %r,'
+                    ' source: %s', dict(own_sr), shard, source)
+                continue
+        to_merge.append(shard)
+    return to_merge
+
+
 class ContainerReplicator(db_replicator.Replicator):
     server_type = 'container'
     brokerclass = ContainerBroker
@@ -138,8 +168,10 @@ class ContainerReplicator(db_replicator.Replicator):
         with Timeout(self.node_timeout):
             response = http.replicate('get_shard_ranges')
         if response and is_success(response.status):
-            broker.merge_shard_ranges(json.loads(
-                response.data.decode('ascii')))
+            shards = json.loads(response.data.decode('ascii'))
+            shards = check_merge_own_shard_range(
+                shards, broker, self.logger, '%s%s' % (http.host, http.path))
+            broker.merge_shard_ranges(shards)
 
     def find_local_handoff_for_part(self, part):
         """
@@ -394,11 +426,15 @@ class ContainerReplicatorRpc(db_replicator.ReplicatorRpc):
     def _post_rsync_then_merge_hook(self, existing_broker, new_broker):
         # Note the following hook will need to change to using a pointer and
         # limit in the future.
-        new_broker.merge_shard_ranges(
-            existing_broker.get_all_shard_range_data())
+        shards = existing_broker.get_all_shard_range_data()
+        shards = check_merge_own_shard_range(
+            shards, new_broker, self.logger, 'rsync')
+        new_broker.merge_shard_ranges(shards)
 
     def merge_shard_ranges(self, broker, args):
-        broker.merge_shard_ranges(args[0])
+        shards = check_merge_own_shard_range(
+            args[0], broker, self.logger, 'repl_req')
+        broker.merge_shard_ranges(shards)
         return HTTPAccepted()
 
     def get_shard_ranges(self, broker, args):
diff --git a/test/probe/test_sharder.py b/test/probe/test_sharder.py
index cb2727e97f..c8aa662955 100644
--- a/test/probe/test_sharder.py
+++ b/test/probe/test_sharder.py
@@ -28,8 +28,8 @@ from swift.common.header_key_dict import HeaderKeyDict
 from swift.common.internal_client import UnexpectedResponse
 from swift.common.manager import Manager
 from swift.common.memcached import MemcacheRing
-from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
-    quorum_size, config_true_value, Timestamp, md5, Namespace
+from swift.common.utils import ShardRange, parse_db_filename, quorum_size, \
+    config_true_value, Timestamp, md5, Namespace
 from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \
     SHARDED
 from swift.container.sharder import CleavingContext, ContainerSharder
@@ -244,9 +244,10 @@ class BaseTestContainerSharding(ReplProbeTest):
     def get_db_file(self, part, node, account=None, container=None):
         container_dir, container_hash = self.get_storage_dir(
             part, node, account=account, container=container)
-        db_file = os.path.join(container_dir, container_hash + '.db')
-        self.assertTrue(get_db_files(db_file))  # sanity check
-        return db_file
+        for f in os.listdir(container_dir):
+            path = os.path.join(container_dir, f)
+            if path.endswith('.db'):
+                return path
 
     def get_broker(self, part, node, account=None, container=None):
         return ContainerBroker(
@@ -259,10 +260,13 @@ class BaseTestContainerSharding(ReplProbeTest):
             shard_part, shard_nodes[node_index],
             shard_range.account, shard_range.container)
 
-    def categorize_container_dir_content(self, account=None, container=None):
+    def categorize_container_dir_content(self, account=None, container=None,
+                                         more_nodes=False):
         account = account or self.brain.account
         container = container or self.container_name
         part, nodes = self.brain.ring.get_nodes(account, container)
+        if more_nodes:
+            nodes.extend(self.brain.ring.get_more_nodes(part))
         storage_dirs = [
             self.get_storage_dir(part, node, account=account,
                                  container=container)[0]
@@ -4047,6 +4051,229 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
                          broker.get_shard_usage()['object_count'])
         self.assertFalse(broker.is_deleted())
 
+    def test_handoff_replication_does_not_cause_reset_epoch(self):
+        obj_names = self._make_object_names(100)
+        self.put_objects(obj_names)
+
+        client.post_container(self.url, self.admin_token, self.container_name,
+                              headers={'X-Container-Sharding': 'on'})
+
+        # run replicators first time to get sync points set
+        self.replicators.once()
+
+        # sanity check: we don't have nearly enough objects for this to shard
+        # automatically
+        self.sharders_once_non_auto(
+            number=self.brain.node_numbers[0],
+            additional_args='--partitions=%s' % self.brain.part)
+        self.assert_container_state(self.brain.nodes[0], 'unsharded', 0)
+
+        self.assert_subprocess_success([
+            'swift-manage-shard-ranges',
+            self.get_db_file(self.brain.part, self.brain.nodes[0]),
+            'find_and_replace', '50', '--enable',
+            '--minimum-shard-size', '40'])
+        self.assert_container_state(self.brain.nodes[0], 'unsharded', 2)
+
+        # "Run container-replicator to replicate them to other nodes."
+        self.replicators.once()
+        # "Run container-sharder on all nodes to shard the container."
+        self.sharders_once_non_auto(
+            additional_args='--partitions=%s' % self.brain.part)
+
+        # Everybody's settled
+        self.assert_container_state(self.brain.nodes[0], 'sharded', 2)
+        self.assert_container_state(self.brain.nodes[1], 'sharded', 2)
+        self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
+        self.assert_container_listing(obj_names)
+
+        # now let's put the container again and make sure it lands on a handoff
+        self.brain.stop_primary_half()
+        self.brain.put_container(policy_index=int(self.policy))
+        self.brain.start_primary_half()
+
+        dir_content = self.categorize_container_dir_content(more_nodes=True)
+        # the handoff node is considered normal because it doesn't have an
+        # epoch
+        self.assertEqual(len(dir_content['normal_dbs']), 1)
+        self.assertEqual(len(dir_content['shard_dbs']), 3)
+
+        # let's replicate
+        self.replicators.once()
+        self.sharders_once_non_auto(
+            additional_args='--partitions=%s' % self.brain.part)
+
+        # let's now check the handoff broker; it should have all the shards
+        handoff_broker = ContainerBroker(dir_content['normal_dbs'][0])
+        self.assertEqual(len(handoff_broker.get_shard_ranges()), 2)
+        handoff_osr = handoff_broker.get_own_shard_range(no_default=True)
+        self.assertIsNotNone(handoff_osr.epoch)
+
+    def test_force_replication_of_a_reset_own_shard_range(self):
+        obj_names = self._make_object_names(100)
+        self.put_objects(obj_names)
+
+        client.post_container(self.url, self.admin_token, self.container_name,
+                              headers={'X-Container-Sharding': 'on'})
+
+        # run replicators first time to get sync points set
+        self.replicators.once()
+
+        # sanity check: we don't have nearly enough objects for this to shard
+        # automatically
+        self.sharders_once_non_auto(
+            number=self.brain.node_numbers[0],
+            additional_args='--partitions=%s' % self.brain.part)
+        self.assert_container_state(self.brain.nodes[0], 'unsharded', 0)
+
+        self.assert_subprocess_success([
+            'swift-manage-shard-ranges',
+            self.get_db_file(self.brain.part, self.brain.nodes[0]),
+            'find_and_replace', '50', '--enable',
+            '--minimum-shard-size', '40'])
+        self.assert_container_state(self.brain.nodes[0], 'unsharded', 2)
+
+        # "Run container-replicator to replicate them to other nodes."
+        self.replicators.once()
+        # "Run container-sharder on all nodes to shard the container."
+        self.sharders_once_non_auto(
+            additional_args='--partitions=%s' % self.brain.part)
+
+        # Everybody's settled
+        self.assert_container_state(self.brain.nodes[0], 'sharded', 2)
+        self.assert_container_state(self.brain.nodes[1], 'sharded', 2)
+        self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
+        self.assert_container_listing(obj_names)
+
+        # Let's delete a primary to simulate a new primary and force an
+        # own_shard_range reset.
+        new_primary = self.brain.nodes[2]
+        db_file = self.get_db_file(self.brain.part, new_primary)
+        os.remove(db_file)
+
+        # issue a new PUT to create the "new" primary container
+        self.brain.put_container(policy_index=int(self.policy))
+
+        # put a bunch of objects that should land in the primary so it'll be
+        # shardable (in case this makes any kind of difference).
+        self.put_objects(obj_names)
+
+        # The new primary isn't considered a shard_db because it hasn't
+        # synced with the other primaries yet.
+        dir_content = self.categorize_container_dir_content()
+        self.assertEqual(len(dir_content['normal_dbs']), 1)
+        self.assertEqual(len(dir_content['shard_dbs']), 2)
+
+        # run the sharders in case this triggers a reset osr
+        self.sharders_once_non_auto(
+            additional_args='--partitions=%s' % self.brain.part)
+        new_primary_broker = self.get_broker(self.brain.part, new_primary)
+        # Nope, still no default/reset osr
+        self.assertIsNone(
+            new_primary_broker.get_own_shard_range(no_default=True))
+
+        # Let's reset the osr by hand.
+        reset_osr = new_primary_broker.get_own_shard_range()
+        self.assertIsNone(reset_osr.epoch)
+        self.assertEqual(reset_osr.state, ShardRange.ACTIVE)
+        new_primary_broker.merge_shard_ranges(reset_osr)
+
+        # now let's replicate with the old primaries
+        self.replicators.once()
+        # Pull an old primary own_shard_range
+        dir_content = self.categorize_container_dir_content()
+        old_broker = ContainerBroker(dir_content['shard_dbs'][0])
+        old_osr = old_broker.get_own_shard_range()
+        new_primary_broker = ContainerBroker(dir_content['normal_dbs'][0])
+        new_osr = new_primary_broker.get_own_shard_range()
+
+        # This version stops replicating a remote non-epoch osr over a local
+        # epoched osr, but not the reverse. So it means the primary with a
+        # non-epoched OSR gets stuck with it if it is newer than the other
+        # epoched versions.
+        self.assertIsNotNone(old_osr.epoch)
+        self.assertEqual(old_osr.state, ShardRange.SHARDED)
+
+        self.assertIsNone(new_osr.epoch)
+        self.assertGreater(new_osr.timestamp, old_osr.timestamp)
+
+    def test_manage_shard_ranges_missing_epoch_no_false_positives(self):
+        # when one replica of a shard is sharding before the others, its epoch
+        # is not None; it is normal for the other replicas to replicate to it,
+        # sending their own shard ranges with epoch=None until they also shard
+        obj_names = self._make_object_names(4)
+        self.put_objects(obj_names)
+        client.post_container(self.url, self.admin_token, self.container_name,
+                              headers={'X-Container-Sharding': 'on'})
+        # run replicators first time to get sync points set, and get container
+        # sharded into 4 shards
+        self.replicators.once()
+        self.assert_subprocess_success([
+            'swift-manage-shard-ranges',
+            self.get_db_file(self.brain.part, self.brain.nodes[0]),
+            'find_and_replace', '2', '--enable'])
+        ranges = self.assert_container_state(
+            self.brain.nodes[0], 'unsharded', 2)
+
+        # "Run container-replicator to replicate them to other nodes."
+        self.replicators.once()
+        # "Run container-sharder on all nodes to shard the container."
+        self.sharders_once_non_auto(
+            additional_args='--partitions=%s' % self.brain.part)
+        # Run them again, just so the shards themselves can pull down the
+        # latest sharded versions of their OSRs.
+        self.sharders_once_non_auto()
+
+        # Everybody's settled
+        self.assert_container_state(self.brain.nodes[0], 'sharded', 2)
+        self.assert_container_state(self.brain.nodes[1], 'sharded', 2)
+        ranges = self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
+        self.assert_container_listing(obj_names)
+
+        # Now we need to shard a shard. A shard's OSR always exists and should
+        # have an epoch of None, so this is where we might see false positives.
+        # we'll shard ranges[1] which has a range of objs-0002 - MAX
+        shard_obj_names = ['objs-0001%d' % i for i in range(2)]
+        self.put_objects(shard_obj_names)
+
+        part, shard_node_numbers = self.get_part_and_node_numbers(ranges[1])
+        shard_nodes = self.brain.ring.get_part_nodes(part)
+        shard_broker = self.get_shard_broker(ranges[1], 0)
+        # set the account, container instance variables
+        shard_broker.get_info()
+        self.replicators.once()
+        self.assert_subprocess_success([
+            'swift-manage-shard-ranges',
+            shard_broker.db_file,
+            'find_and_replace', '2', '--enable'])
+        self.assert_container_state(
+            shard_nodes[0], 'unsharded', 2,
+            shard_broker.account, shard_broker.container, part)
+
+        # index 0 has an epoch now but 1 and 2 don't
+        for idx in 1, 2:
+            sb = self.get_shard_broker(ranges[1], idx)
+            osr = sb.get_own_shard_range(no_default=True)
+            self.assertIsNone(osr.epoch)
+
+        expected_false_positive_line_snippet = 'Ignoring remote osr w/o epoch'
+        # run the replicator on the node with an epoch; a false positive would
+        # complain that the others don't have an epoch and leave it unset.
+        replicator = self.run_custom_daemon(
+            ContainerReplicator, 'container-replicator',
+            shard_node_numbers[0], {})
+        warnings = replicator.logger.get_lines_for_level('warning')
+
+        self.assertFalse([w for w in warnings
+                          if expected_false_positive_line_snippet in w])
+
+        # But it does send the new OSR with an epoch so the others should all
+        # have it now.
+        for idx in 1, 2:
+            sb = self.get_shard_broker(ranges[1], idx)
+            osr = sb.get_own_shard_range(no_default=True)
+            self.assertIsNotNone(osr.epoch)
+
     def test_manage_shard_ranges_deleted_child_and_parent_gap(self):
         # Test to produce a scenario where a parent container is stuck at
         # sharding because of a gap in shard ranges. And the gap is caused by
diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py
index 7a14e88251..097f711dec 100644
--- a/test/unit/container/test_backend.py
+++ b/test/unit/container/test_backend.py
@@ -36,7 +36,8 @@ import six
 from swift.common.exceptions import LockTimeout
 from swift.container.backend import ContainerBroker, \
     update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \
-    COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES, sift_shard_ranges
+    COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES, sift_shard_ranges, \
+    merge_shards
 from swift.common.db import DatabaseAlreadyExists, GreenDBConnection, \
     TombstoneReclaimer, GreenDBCursor
 from swift.common.request_helpers import get_reserved_name
@@ -6976,11 +6977,180 @@ class TestUpdateNewItemFromExisting(unittest.TestCase):
 
 
 class TestModuleFunctions(unittest.TestCase):
+    def setUp(self):
+        super(TestModuleFunctions, self).setUp()
+        self.ts_iter = make_timestamp_iter()
+        self.ts = [next(self.ts_iter).internal for _ in range(10)]
+
+    def test_merge_shards_existing_none(self):
+        data = dict(ShardRange('a/o', self.ts[1]), reported=True)
+        exp_data = dict(data)
+        self.assertTrue(merge_shards(data, None))
+        self.assertEqual(exp_data, data)
+
+    def test_merge_shards_existing_ts_lt(self):
+        existing = dict(ShardRange('a/o', self.ts[0]))
+        data = dict(ShardRange('a/o', self.ts[1]), reported=True)
+        exp_data = dict(data, reported=False)
+        self.assertTrue(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+    def test_merge_shards_existing_ts_gt(self):
+        existing = dict(ShardRange('a/o', self.ts[1]))
+        data = dict(ShardRange('a/o', self.ts[0]), reported=True)
+        exp_data = dict(data)
+        self.assertFalse(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+        # existing timestamp trumps data state_timestamp
+        data = dict(ShardRange('a/o', self.ts[0]), state=ShardRange.ACTIVE,
+                    state_timestamp=self.ts[2])
+        exp_data = dict(data)
+        self.assertFalse(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+        # existing timestamp trumps data meta_timestamp
+        data = dict(ShardRange('a/o', self.ts[0]), state=ShardRange.ACTIVE,
+                    meta_timestamp=self.ts[2])
+        exp_data = dict(data)
+        self.assertFalse(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+    def test_merge_shards_existing_ts_eq_merge_reported(self):
+        existing = dict(ShardRange('a/o', self.ts[0]))
+        data = dict(ShardRange('a/o', self.ts[0]), reported=False)
+        exp_data = dict(data)
+        self.assertFalse(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+        data = dict(ShardRange('a/o', self.ts[0]), reported=True)
+        exp_data = dict(data)
+        self.assertTrue(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+    def test_merge_shards_existing_ts_eq_retain_bounds(self):
+        existing = dict(ShardRange('a/o', self.ts[0]))
+        data = dict(ShardRange('a/o', self.ts[0]), lower='l', upper='u')
+        exp_data = dict(data, lower='', upper='')
+        self.assertFalse(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+    def test_merge_shards_existing_ts_eq_retain_deleted(self):
+        existing = dict(ShardRange('a/o', self.ts[0]))
+        data = dict(ShardRange('a/o', self.ts[0]), deleted=1)
+        exp_data = dict(data, deleted=0)
+        self.assertFalse(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+    def test_merge_shards_existing_ts_eq_meta_ts_gte(self):
+        existing = dict(
+            ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[1],
+                       object_count=1, bytes_used=2, tombstones=3))
+        data = dict(
+            ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[1],
+                       object_count=10, bytes_used=20, tombstones=30))
+        exp_data = dict(data, object_count=1, bytes_used=2, tombstones=3)
+        self.assertFalse(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+        existing = dict(
+            ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[2],
+                       object_count=1, bytes_used=2, tombstones=3))
+        exp_data = dict(data, object_count=1, bytes_used=2, tombstones=3,
+                        meta_timestamp=self.ts[2])
+        self.assertFalse(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+    def test_merge_shards_existing_ts_eq_meta_ts_lt(self):
+        existing = dict(
+            ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[1],
+                       object_count=1, bytes_used=2, tombstones=3,
+                       epoch=self.ts[3]))
+        data = dict(
+            ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[2],
+                       object_count=10, bytes_used=20, tombstones=30,
+                       epoch=None))
+        exp_data = dict(data, epoch=self.ts[3])
+        self.assertTrue(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+    def test_merge_shards_existing_ts_eq_state_ts_eq(self):
+        # data has more advanced state
+        existing = dict(
+            ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
+                       state=ShardRange.CREATED, epoch=self.ts[4]))
+        data = dict(
+            ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
+                       state=ShardRange.ACTIVE, epoch=self.ts[5]))
+        exp_data = dict(data)
+        self.assertTrue(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+        # data has less advanced state
+        existing = dict(
+            ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
+                       state=ShardRange.CREATED, epoch=self.ts[4]))
+        data = dict(
+            ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
+                       state=ShardRange.FOUND, epoch=self.ts[5]))
+        exp_data = dict(data, state=ShardRange.CREATED, epoch=self.ts[4])
+        self.assertFalse(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+    def test_merge_shards_existing_ts_eq_state_ts_gt(self):
+        existing = dict(
+            ShardRange('a/o', self.ts[0], state_timestamp=self.ts[2],
+                       state=ShardRange.CREATED, epoch=self.ts[4]))
+        data = dict(
+            ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
+                       state=ShardRange.ACTIVE, epoch=self.ts[5]))
+        exp_data = dict(data, state_timestamp=self.ts[2],
+                        state=ShardRange.CREATED, epoch=self.ts[4])
+        self.assertFalse(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+    def test_merge_shards_existing_ts_eq_state_ts_lt(self):
+        existing = dict(
+            ShardRange('a/o', self.ts[0], state_timestamp=self.ts[0],
+                       state=ShardRange.CREATED, epoch=self.ts[4]))
+        data = dict(
+            ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
+                       state=ShardRange.ACTIVE, epoch=self.ts[5]))
+        exp_data = dict(data)
+        self.assertTrue(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+
+    def test_merge_shards_epoch_reset(self):
+        # not sure if these scenarios are realistic, but we have seen epoch
+        # resets in prod
+        # same timestamps, data has more advanced state but no epoch
+        existing = dict(
+            ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
+                       state=ShardRange.CREATED, epoch=self.ts[4]))
+        data = dict(
+            ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
+                       state=ShardRange.ACTIVE, epoch=None))
+        exp_data = dict(data)
+        self.assertTrue(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+        self.assertIsNone(exp_data['epoch'])
+
+        # data has more advanced state_timestamp but no epoch
+        existing = dict(
+            ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
+                       state=ShardRange.CREATED, epoch=self.ts[4]))
+        data = dict(
+            ShardRange('a/o', self.ts[0], state_timestamp=self.ts[2],
+                       state=ShardRange.FOUND, epoch=None))
+        exp_data = dict(data)
+        self.assertTrue(merge_shards(data, existing))
+        self.assertEqual(exp_data, data)
+        self.assertIsNone(exp_data['epoch'])
+
     def test_sift_shard_ranges(self):
-        ts_iter = make_timestamp_iter()
         existing_shards = {}
-        sr1 = dict(ShardRange('a/o', next(ts_iter).internal))
-        sr2 = dict(ShardRange('a/o2', next(ts_iter).internal))
+        sr1 = dict(ShardRange('a/o', next(self.ts_iter).internal))
+        sr2 = dict(ShardRange('a/o2', next(self.ts_iter).internal))
         new_shard_ranges = [sr1, sr2]
 
         # first empty existing shards will just add the shards
@@ -6994,7 +7164,7 @@ class TestModuleFunctions(unittest.TestCase):
         # if there is a newer version in the existing shards then it won't be
         # added to to_add
         existing_shards['a/o'] = dict(
-            ShardRange('a/o', next(ts_iter).internal))
+            ShardRange('a/o', next(self.ts_iter).internal))
         to_add, to_delete = sift_shard_ranges(new_shard_ranges,
                                               existing_shards)
         self.assertEqual([sr2], list(to_add))
@@ -7002,7 +7172,7 @@ class TestModuleFunctions(unittest.TestCase):
 
         # But if a newer version is in new_shard_ranges then the old will be
         # added to to_delete and new is added to to_add.
-        sr1['timestamp'] = next(ts_iter).internal
+        sr1['timestamp'] = next(self.ts_iter).internal
         to_add, to_delete = sift_shard_ranges(new_shard_ranges,
                                               existing_shards)
         self.assertEqual(2, len(to_add))
diff --git a/test/unit/container/test_replicator.py b/test/unit/container/test_replicator.py
index 2cb139dc3c..4c84fcb9dd 100644
--- a/test/unit/container/test_replicator.py
+++ b/test/unit/container/test_replicator.py
@@ -32,6 +32,7 @@ from swift.container.reconciler import (
 from swift.common.utils import Timestamp, encode_timestamps, ShardRange, \
     get_db_files, make_db_file_path
 from swift.common.storage_policy import POLICIES
+from test import annotate_failure
 from test.debug_logger import debug_logger
 from test.unit.common import test_db_replicator
 
@@ -1432,6 +1433,130 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
             daemon.logger.get_lines_for_level('debug'))
         daemon.logger.clear()
 
+    def test_sync_shard_ranges_merge_remote_osr(self):
+        def do_test(local_osr, remote_osr, exp_merge, exp_warning,
+                    exp_rpc_warning):
+            put_timestamp = Timestamp.now().internal
+            # create "local" broker
+            broker = self._get_broker('a', 'c', node_index=0)
+            broker.initialize(put_timestamp, POLICIES.default.idx)
+            # create "remote" broker
+            remote_broker = self._get_broker('a', 'c', node_index=1)
+            remote_broker.initialize(put_timestamp, POLICIES.default.idx)
+
+            bounds = (('', 'g'), ('g', 'r'), ('r', ''))
+            shard_ranges = [
+                ShardRange('.shards_a/sr-%s' % upper, Timestamp.now(), lower,
+                           upper, i + 1, 10 * (i + 1))
+                for i, (lower, upper) in enumerate(bounds)
+            ]
+
+            for db in (broker, remote_broker):
+                db.merge_shard_ranges(shard_ranges)
+
+            if local_osr:
+                broker.merge_shard_ranges(ShardRange(**dict(local_osr)))
+            if remote_osr:
+                remote_broker.merge_shard_ranges(
+                    ShardRange(**dict(remote_osr)))
+
+            daemon = replicator.ContainerReplicator({}, logger=debug_logger())
+            part, remote_node = self._get_broker_part_node(remote_broker)
+            part, local_node = self._get_broker_part_node(broker)
+            info = broker.get_replication_info()
+            success = daemon._repl_to_node(remote_node, broker, part, info)
+            self.assertTrue(success)
+            local_info = self._get_broker(
+                'a', 'c', node_index=0).get_info()
+            remote_info = self._get_broker(
+                'a', 'c', node_index=1).get_info()
+            for k, v in local_info.items():
+                if k == 'id':
+                    continue
+                self.assertEqual(remote_info[k], v,
+                                 "mismatch remote %s %r != %r" % (
+                                     k, remote_info[k], v))
+            actual_osr = broker.get_own_shard_range(no_default=True)
+            actual_osr = dict(actual_osr) if actual_osr else actual_osr
+            if exp_merge:
+                exp_osr = (dict(remote_osr, meta_timestamp=mock.ANY)
+                           if remote_osr else remote_osr)
+            else:
+                exp_osr = (dict(local_osr, meta_timestamp=mock.ANY)
+                           if local_osr else local_osr)
+            self.assertEqual(exp_osr, actual_osr)
+            lines = daemon.logger.get_lines_for_level('warning')
+            if exp_warning:
+                self.assertEqual(len(lines), 1, lines)
+                self.assertIn("Ignoring remote osr w/o epoch", lines[0])
+                self.assertIn("own_sr: ", lines[0])
+                self.assertIn("'epoch': '%s'" % local_osr.epoch.normal,
+                              lines[0])
+                self.assertIn("remote_sr: ", lines[0])
+                self.assertIn("'epoch': None", lines[0])
+                hash_ = os.path.splitext(os.path.basename(broker.db_file))[0]
+                url = "%s/%s/%s/%s" % (
+                    remote_node['ip'], remote_node['device'], part, hash_)
+                self.assertIn("source: %s" % url, lines[0])
+            else:
+                self.assertFalse(lines)
+            lines = self.rpc.logger.get_lines_for_level('warning')
+            if exp_rpc_warning:
+                self.assertEqual(len(lines), 1, lines)
+                self.assertIn("Ignoring remote osr w/o epoch", lines[0])
+                self.assertIn("source: repl_req", lines[0])
+            else:
+                self.assertFalse(lines)
+
+            os.remove(broker.db_file)
+            os.remove(remote_broker.db_file)
+            return daemon
+
+        # we'll use another broker as a template to get the "default" osrs
+        other_broker = self._get_broker('a', 'c', node_index=2)
+        other_broker.initialize(Timestamp.now().internal, POLICIES.default.idx)
+        default_osr = other_broker.get_own_shard_range()
+        self.assertIsNone(default_osr.epoch)
+        osr_with_epoch = other_broker.get_own_shard_range()
+        osr_with_epoch.epoch = Timestamp.now()
+        osr_with_different_epoch = other_broker.get_own_shard_range()
+        osr_with_different_epoch.epoch = Timestamp.now()
+        default_osr_newer = ShardRange(**dict(default_osr))
+        default_osr_newer.timestamp = Timestamp.now()
+
+        # local_osr, remote_osr, exp_merge, exp_warning, exp_rpc_warning
+        tests = (
+            # First the None case, i.e. no osrs
+            (None, None, False, False, False),
+            # Default on one side and nothing on the other
+            (None, default_osr, True, False, False),
+            (default_osr, None, False, False, False),
+            (default_osr, default_osr, True, False, False),
+            (default_osr, None, False, False, False),
+            # An epoch on one side and no OSR on the other is also fine
+            (None, osr_with_epoch, True, False, False),
+            (osr_with_epoch, None, False, False, False),
+            # even with the same or different epochs
+            (osr_with_epoch, osr_with_epoch, True, False, False),
+            (osr_with_epoch, osr_with_different_epoch, True, False, False),
+            # But if local has an epoch and the remote doesn't: a potential
+            # false positive, but nothing would merge anyway, so no warning.
+            (osr_with_epoch, default_osr, False, False, False),
+            # It's also OK if the remote has an epoch but not the local;
+            # this also works on the RPC side because merging happens on the
+            # local side, which then sends updated shards to the remote. So if
+            # the OSR on the remote is newer than the default, the RPC side
+            # will actually get a merged OSR, i.e. get the remote one back.
+            (default_osr, osr_with_epoch, True, False, False),
+            # But if the local default is newer than the epoched remote side
+            # we'd get a warning logged on the RPC side and, as the local is
+            # newer, it will fail to merge.
+            (default_osr_newer, osr_with_epoch, False, False, True),
+        )
+        for i, params in enumerate(tests):
+            with annotate_failure((i, params)):
+                do_test(*params)
+
     def test_sync_shard_ranges(self):
         put_timestamp = Timestamp.now().internal
         # create "local" broker
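
The guard that this patch adds is easiest to reason about in isolation, so here is a minimal, self-contained sketch of the same decision the new check_merge_own_shard_range() helper makes. It deliberately uses plain dicts and a trivial newer_wins() stand-in instead of Swift's real ContainerBroker/ShardRange objects and backend.merge_shards(); those simplified names and the single-timestamp merge rule are illustrative assumptions only, not the project's API.

# Illustrative sketch only: plain-dict stand-ins for shard range rows.
# newer_wins() is a deliberately simplified replacement for the real
# merge_shards() comparison, which weighs several timestamps.

def newer_wins(incoming, existing):
    # only the row timestamp is compared in this sketch
    return incoming['timestamp'] > existing['timestamp']


def filter_own_shard_range(incoming_shards, local_own_sr, warnings):
    # mirror of the new check: once the local own shard range has an epoch,
    # refuse to let an epoch-less remote copy of it win a merge
    if local_own_sr.get('epoch') is None:
        return incoming_shards
    kept = []
    for shard in incoming_shards:
        if (shard['name'] == local_own_sr['name']
                and not shard.get('epoch')
                and newer_wins(shard, local_own_sr)):
            warnings.append('ignoring epoch-less own shard range: %r' % shard)
            continue
        kept.append(shard)
    return kept


if __name__ == '__main__':
    local = {'name': 'a/c', 'timestamp': 2, 'epoch': 5}
    remote = [
        {'name': 'a/c', 'timestamp': 3, 'epoch': None},            # dropped
        {'name': '.shards_a/c-1', 'timestamp': 3, 'epoch': None},  # kept
    ]
    warnings = []
    print(filter_own_shard_range(remote, local, warnings))
    print(warnings)

The unit and probe tests above exercise exactly this asymmetry: a remote own shard range without an epoch is ignored (and logged) when the local copy already carries one, while every other combination still merges as before.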