Merge "sharding: don't replace own_shard_range without an epoch"

Zuul, 2024-02-08 01:04:58 +00:00 (committed by Gerrit Code Review)
commit 4d3f9fe952
4 changed files with 576 additions and 18 deletions

swift/container/replicator.py

@@ -20,7 +20,8 @@ from eventlet import Timeout
from random import choice
from swift.container.sync_store import ContainerSyncStore
-from swift.container.backend import ContainerBroker, DATADIR, SHARDED
+from swift.container.backend import ContainerBroker, DATADIR, SHARDED, \
+    merge_shards
from swift.container.reconciler import (
MISPLACED_OBJECTS_ACCOUNT, incorrect_policy_index,
get_reconciler_container_name, get_row_to_q_entry_translator)
@@ -31,6 +32,35 @@ from swift.common.http import is_success
from swift.common.utils import Timestamp, majority_size, get_db_files
def check_merge_own_shard_range(shards, broker, logger, source):
"""
If broker has own_shard_range *with an epoch* then filter out an
own_shard_range *without an epoch*, and log a warning about it.
:param shards: a list of candidate ShardRanges to merge
:param broker: a ContainerBroker
:param logger: a logger
:param source: string to log as source of shards
:return: a list of ShardRanges to actually merge
"""
# work-around for https://bugs.launchpad.net/swift/+bug/1980451
own_sr = broker.get_own_shard_range()
if own_sr.epoch is None:
return shards
to_merge = []
for shard in shards:
if shard['name'] == own_sr.name and not shard['epoch']:
shard_copy = dict(shard)
new_content = merge_shards(shard_copy, dict(own_sr))
if new_content and shard_copy['epoch'] is None:
logger.warning(
'Ignoring remote osr w/o epoch, own_sr: %r, remote_sr: %r,'
' source: %s', dict(own_sr), shard, source)
continue
to_merge.append(shard)
return to_merge
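# A minimal sketch (hypothetical helper, not part of the change) of the
# bug 1980451 scenario the filter above guards against: a local OSR that
# has an epoch versus a newer remote copy whose epoch was reset to None.
def _demo_epoch_filter():
    from swift.common.utils import ShardRange, Timestamp
    from swift.container.backend import merge_shards

    own = dict(ShardRange('a/c', Timestamp(1), epoch=Timestamp(1)))
    remote = dict(ShardRange('a/c', Timestamp(2)))  # newer, but epoch=None

    candidate = dict(remote)
    # merge_shards() mutates its first argument and returns True when the
    # remote row would be stored; epoch stays None, so the filter skips it
    assert merge_shards(candidate, own)
    assert candidate['epoch'] is None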
class ContainerReplicator(db_replicator.Replicator):
server_type = 'container'
brokerclass = ContainerBroker
@@ -138,8 +168,10 @@ class ContainerReplicator(db_replicator.Replicator):
with Timeout(self.node_timeout):
response = http.replicate('get_shard_ranges')
if response and is_success(response.status):
-            broker.merge_shard_ranges(json.loads(
-                response.data.decode('ascii')))
+            shards = json.loads(response.data.decode('ascii'))
+            shards = check_merge_own_shard_range(
+                shards, broker, self.logger, '%s%s' % (http.host, http.path))
+            broker.merge_shard_ranges(shards)
def find_local_handoff_for_part(self, part):
"""
@@ -394,11 +426,15 @@ class ContainerReplicatorRpc(db_replicator.ReplicatorRpc):
def _post_rsync_then_merge_hook(self, existing_broker, new_broker):
# Note the following hook will need to change to using a pointer and
# limit in the future.
-        new_broker.merge_shard_ranges(
-            existing_broker.get_all_shard_range_data())
+        shards = existing_broker.get_all_shard_range_data()
+        shards = check_merge_own_shard_range(
+            shards, new_broker, self.logger, 'rsync')
+        new_broker.merge_shard_ranges(shards)
def merge_shard_ranges(self, broker, args):
def merge_shard_ranges(self, broker, args):
-        broker.merge_shard_ranges(args[0])
+        shards = check_merge_own_shard_range(
+            args[0], broker, self.logger, 'repl_req')
+        broker.merge_shard_ranges(shards)
return HTTPAccepted()
def get_shard_ranges(self, broker, args):

test/probe/test_sharder.py

@@ -28,8 +28,8 @@ from swift.common.header_key_dict import HeaderKeyDict
from swift.common.internal_client import UnexpectedResponse
from swift.common.manager import Manager
from swift.common.memcached import MemcacheRing
-from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
-    quorum_size, config_true_value, Timestamp, md5, Namespace
+from swift.common.utils import ShardRange, parse_db_filename, quorum_size, \
+    config_true_value, Timestamp, md5, Namespace
from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \
SHARDED
from swift.container.sharder import CleavingContext, ContainerSharder
@@ -244,9 +244,10 @@ class BaseTestContainerSharding(ReplProbeTest):
def get_db_file(self, part, node, account=None, container=None):
container_dir, container_hash = self.get_storage_dir(
part, node, account=account, container=container)
-        db_file = os.path.join(container_dir, container_hash + '.db')
-        self.assertTrue(get_db_files(db_file))  # sanity check
-        return db_file
+        for f in os.listdir(container_dir):
+            path = os.path.join(container_dir, f)
+            if path.endswith('.db'):
+                return path
def get_broker(self, part, node, account=None, container=None):
return ContainerBroker(
@@ -259,10 +260,13 @@
shard_part, shard_nodes[node_index], shard_range.account,
shard_range.container)
-    def categorize_container_dir_content(self, account=None, container=None):
+    def categorize_container_dir_content(self, account=None, container=None,
+                                         more_nodes=False):
account = account or self.brain.account
container = container or self.container_name
part, nodes = self.brain.ring.get_nodes(account, container)
if more_nodes:
nodes.extend(self.brain.ring.get_more_nodes(part))
storage_dirs = [
self.get_storage_dir(part, node, account=account,
container=container)[0]
@@ -4047,6 +4051,229 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
broker.get_shard_usage()['object_count'])
self.assertFalse(broker.is_deleted())
def test_handoff_replication_does_not_cause_reset_epoch(self):
obj_names = self._make_object_names(100)
self.put_objects(obj_names)
client.post_container(self.url, self.admin_token, self.container_name,
headers={'X-Container-Sharding': 'on'})
# run replicators first time to get sync points set
self.replicators.once()
# sanity check: we don't have nearly enough objects for this to shard
# automatically
self.sharders_once_non_auto(
number=self.brain.node_numbers[0],
additional_args='--partitions=%s' % self.brain.part)
self.assert_container_state(self.brain.nodes[0], 'unsharded', 0)
self.assert_subprocess_success([
'swift-manage-shard-ranges',
self.get_db_file(self.brain.part, self.brain.nodes[0]),
'find_and_replace', '50', '--enable',
'--minimum-shard-size', '40'])
self.assert_container_state(self.brain.nodes[0], 'unsharded', 2)
# "Run container-replicator to replicate them to other nodes."
self.replicators.once()
# "Run container-sharder on all nodes to shard the container."
self.sharders_once_non_auto(
additional_args='--partitions=%s' % self.brain.part)
# Everybody's settled
self.assert_container_state(self.brain.nodes[0], 'sharded', 2)
self.assert_container_state(self.brain.nodes[1], 'sharded', 2)
self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
self.assert_container_listing(obj_names)
# now let's PUT the container again and make sure it lands on a handoff
self.brain.stop_primary_half()
self.brain.put_container(policy_index=int(self.policy))
self.brain.start_primary_half()
dir_content = self.categorize_container_dir_content(more_nodes=True)
# the handoff node is considered normal because it doesn't have an
# epoch
self.assertEqual(len(dir_content['normal_dbs']), 1)
self.assertEqual(len(dir_content['shard_dbs']), 3)
# let's replicate
self.replicators.once()
self.sharders_once_non_auto(
additional_args='--partitions=%s' % self.brain.part)
# let's now check the handoff broker; it should have all the shards
handoff_broker = ContainerBroker(dir_content['normal_dbs'][0])
self.assertEqual(len(handoff_broker.get_shard_ranges()), 2)
handoff_osr = handoff_broker.get_own_shard_range(no_default=True)
self.assertIsNotNone(handoff_osr.epoch)
def test_force_replication_of_a_reset_own_shard_range(self):
obj_names = self._make_object_names(100)
self.put_objects(obj_names)
client.post_container(self.url, self.admin_token, self.container_name,
headers={'X-Container-Sharding': 'on'})
# run replicators first time to get sync points set
self.replicators.once()
# sanity check: we don't have nearly enough objects for this to shard
# automatically
self.sharders_once_non_auto(
number=self.brain.node_numbers[0],
additional_args='--partitions=%s' % self.brain.part)
self.assert_container_state(self.brain.nodes[0], 'unsharded', 0)
self.assert_subprocess_success([
'swift-manage-shard-ranges',
self.get_db_file(self.brain.part, self.brain.nodes[0]),
'find_and_replace', '50', '--enable',
'--minimum-shard-size', '40'])
self.assert_container_state(self.brain.nodes[0], 'unsharded', 2)
# "Run container-replicator to replicate them to other nodes."
self.replicators.once()
# "Run container-sharder on all nodes to shard the container."
self.sharders_once_non_auto(
additional_args='--partitions=%s' % self.brain.part)
# Everybody's settled
self.assert_container_state(self.brain.nodes[0], 'sharded', 2)
self.assert_container_state(self.brain.nodes[1], 'sharded', 2)
self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
self.assert_container_listing(obj_names)
# Let's delete a primary to simulate a new primary and force an
# own_shard_range reset.
new_primary = self.brain.nodes[2]
db_file = self.get_db_file(self.brain.part, new_primary)
os.remove(db_file)
# issue a new PUT to create the "new" primary container
self.brain.put_container(policy_index=int(self.policy))
# put a bunch of objects that should land in the primary so it'll be
# shardable (in case this makes any kind of difference).
self.put_objects(obj_names)
# The new primary isn't considered a shard_db because it hasn't
# synced with the other primaries yet.
dir_content = self.categorize_container_dir_content()
self.assertEqual(len(dir_content['normal_dbs']), 1)
self.assertEqual(len(dir_content['shard_dbs']), 2)
# run the sharders in case this triggers a reset OSR
self.sharders_once_non_auto(
additional_args='--partitions=%s' % self.brain.part)
new_primary_broker = self.get_broker(self.brain.part, new_primary)
# Nope, still no default/reset osr
self.assertIsNone(
new_primary_broker.get_own_shard_range(no_default=True))
# Let's reset the osr by hand.
reset_osr = new_primary_broker.get_own_shard_range()
self.assertIsNone(reset_osr.epoch)
self.assertEqual(reset_osr.state, ShardRange.ACTIVE)
new_primary_broker.merge_shard_ranges(reset_osr)
# now let's replicate with the old primaries
self.replicators.once()
# Pull an old primary own_shard_range
dir_content = self.categorize_container_dir_content()
old_broker = ContainerBroker(dir_content['shard_dbs'][0])
old_osr = old_broker.get_own_shard_range()
new_primary_broker = ContainerBroker(dir_content['normal_dbs'][0])
new_osr = new_primary_broker.get_own_shard_range()
# This version stops a remote non-epoched OSR replicating over a local
# epoched OSR, but it doesn't work the other way. That means the
# primary with the non-epoched OSR gets stuck with it if it is newer
# than the other epoched versions (see the sketch below).
self.assertIsNotNone(old_osr.epoch)
self.assertEqual(old_osr.state, ShardRange.SHARDED)
self.assertIsNone(new_osr.epoch)
self.assertGreater(new_osr.timestamp, old_osr.timestamp)
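    # A hedged sketch of the 'stuck' case above (hypothetical helper, not
    # part of the change): the newer reset OSR out-ranks the older epoched
    # OSR, so the epoched row is rejected on merge and the new primary
    # keeps epoch=None.
    def _demo_stuck_reset_osr(self):
        from swift.container.backend import merge_shards
        old_osr = dict(ShardRange('a/c', Timestamp(1), epoch=Timestamp(1),
                                  state=ShardRange.SHARDED))
        reset_osr = dict(ShardRange('a/c', Timestamp(2)))  # newer, epoch=None
        incoming = dict(old_osr)
        self.assertFalse(merge_shards(incoming, reset_osr))  # older row loses
        self.assertIsNone(reset_osr['epoch'])  # still nothing to converge on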
def test_manage_shard_ranges_missing_epoch_no_false_positives(self):
# when one replica of a shard starts sharding before the others, its
# epoch is not None, but it is normal for the other replicas to
# replicate to it, sending their own shard ranges with epoch=None,
# until they also shard
obj_names = self._make_object_names(4)
self.put_objects(obj_names)
client.post_container(self.url, self.admin_token, self.container_name,
headers={'X-Container-Sharding': 'on'})
# run replicators first time to get sync points set, and get container
# sharded into 4 shards
self.replicators.once()
self.assert_subprocess_success([
'swift-manage-shard-ranges',
self.get_db_file(self.brain.part, self.brain.nodes[0]),
'find_and_replace', '2', '--enable'])
ranges = self.assert_container_state(
self.brain.nodes[0], 'unsharded', 2)
# "Run container-replicator to replicate them to other nodes."
self.replicators.once()
# "Run container-sharder on all nodes to shard the container."
self.sharders_once_non_auto(
additional_args='--partitions=%s' % self.brain.part)
# Run them again, just so the shards themselves can pull down the
# latest sharded versions of their OSRs.
self.sharders_once_non_auto()
# Everybody's settled
self.assert_container_state(self.brain.nodes[0], 'sharded', 2)
self.assert_container_state(self.brain.nodes[1], 'sharded', 2)
ranges = self.assert_container_state(self.brain.nodes[2], 'sharded', 2)
self.assert_container_listing(obj_names)
# Now we need to shard a shard. A shard's OSR always exists and
# initially has an epoch of None, so this is where we could see false
# positives. We'll shard ranges[1], which has a range of
# objs-0002 - MAX.
shard_obj_names = ['objs-0001%d' % i for i in range(2)]
self.put_objects(shard_obj_names)
part, shard_node_numbers = self.get_part_and_node_numbers(ranges[1])
shard_nodes = self.brain.ring.get_part_nodes(part)
shard_broker = self.get_shard_broker(ranges[1], 0)
# set the account, container instance variables
shard_broker.get_info()
self.replicators.once()
self.assert_subprocess_success([
'swift-manage-shard-ranges',
shard_broker.db_file,
'find_and_replace', '2', '--enable'])
self.assert_container_state(
shard_nodes[0], 'unsharded', 2,
shard_broker.account, shard_broker.container, part)
# index 0 has an epoch now but 1 and 2 don't
for idx in 1, 2:
sb = self.get_shard_broker(ranges[1], idx)
osr = sb.get_own_shard_range(no_default=True)
self.assertIsNone(osr.epoch)
expected_false_positive_line_snippet = 'Ignoring remote osr w/o epoch'
# run the replicator on the node with an epoch and check that it does
# not complain that the others don't have an epoch.
replicator = self.run_custom_daemon(
ContainerReplicator, 'container-replicator',
shard_node_numbers[0], {})
warnings = replicator.logger.get_lines_for_level('warning')
self.assertFalse([w for w in warnings
if expected_false_positive_line_snippet in w])
# But it does send the new OSR with an epoch so the others should all
# have it now.
for idx in 1, 2:
sb = self.get_shard_broker(ranges[1], idx)
osr = sb.get_own_shard_range(no_default=True)
self.assertIsNotNone(osr.epoch)
def test_manage_shard_ranges_deleted_child_and_parent_gap(self):
# Test to produce a scenario where a parent container is stuck at
# sharding because of a gap in shard ranges, and the gap is caused by

test/unit/container/test_backend.py

@@ -36,7 +36,8 @@ import six
from swift.common.exceptions import LockTimeout
from swift.container.backend import ContainerBroker, \
update_new_item_from_existing, UNSHARDED, SHARDING, SHARDED, \
-    COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES, sift_shard_ranges
+    COLLAPSED, SHARD_LISTING_STATES, SHARD_UPDATE_STATES, sift_shard_ranges, \
+    merge_shards
from swift.common.db import DatabaseAlreadyExists, GreenDBConnection, \
TombstoneReclaimer, GreenDBCursor
from swift.common.request_helpers import get_reserved_name
@@ -6976,11 +6977,180 @@ class TestUpdateNewItemFromExisting(unittest.TestCase):
class TestModuleFunctions(unittest.TestCase):
def setUp(self):
super(TestModuleFunctions, self).setUp()
self.ts_iter = make_timestamp_iter()
self.ts = [next(self.ts_iter).internal for _ in range(10)]
def test_merge_shards_existing_none(self):
data = dict(ShardRange('a/o', self.ts[1]), reported=True)
exp_data = dict(data)
self.assertTrue(merge_shards(data, None))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_lt(self):
existing = dict(ShardRange('a/o', self.ts[0]))
data = dict(ShardRange('a/o', self.ts[1]), reported=True)
exp_data = dict(data, reported=False)
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_gt(self):
existing = dict(ShardRange('a/o', self.ts[1]))
data = dict(ShardRange('a/o', self.ts[0]), reported=True)
exp_data = dict(data)
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
# existing timestamp trumps data state_timestamp
data = dict(ShardRange('a/o', self.ts[0]), state=ShardRange.ACTIVE,
state_timestamp=self.ts[2])
exp_data = dict(data)
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
# existing timestamp trumps data meta_timestamp
data = dict(ShardRange('a/o', self.ts[0]), state=ShardRange.ACTIVE,
meta_timestamp=self.ts[2])
exp_data = dict(data)
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_merge_reported(self):
existing = dict(ShardRange('a/o', self.ts[0]))
data = dict(ShardRange('a/o', self.ts[0]), reported=False)
exp_data = dict(data)
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
data = dict(ShardRange('a/o', self.ts[0]), reported=True)
exp_data = dict(data)
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_retain_bounds(self):
existing = dict(ShardRange('a/o', self.ts[0]))
data = dict(ShardRange('a/o', self.ts[0]), lower='l', upper='u')
exp_data = dict(data, lower='', upper='')
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_retain_deleted(self):
existing = dict(ShardRange('a/o', self.ts[0]))
data = dict(ShardRange('a/o', self.ts[0]), deleted=1)
exp_data = dict(data, deleted=0)
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_meta_ts_gte(self):
existing = dict(
ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[1],
object_count=1, bytes_used=2, tombstones=3))
data = dict(
ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[1],
object_count=10, bytes_used=20, tombstones=30))
exp_data = dict(data, object_count=1, bytes_used=2, tombstones=3)
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
existing = dict(
ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[2],
object_count=1, bytes_used=2, tombstones=3))
exp_data = dict(data, object_count=1, bytes_used=2, tombstones=3,
meta_timestamp=self.ts[2])
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_meta_ts_lt(self):
existing = dict(
ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[1],
object_count=1, bytes_used=2, tombstones=3,
epoch=self.ts[3]))
data = dict(
ShardRange('a/o', self.ts[0], meta_timestamp=self.ts[2],
object_count=10, bytes_used=20, tombstones=30,
epoch=None))
exp_data = dict(data, epoch=self.ts[3])
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_state_ts_eq(self):
# data has more advanced state
existing = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.CREATED, epoch=self.ts[4]))
data = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.ACTIVE, epoch=self.ts[5]))
exp_data = dict(data)
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
# data has less advanced state
existing = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.CREATED, epoch=self.ts[4]))
data = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.FOUND, epoch=self.ts[5]))
exp_data = dict(data, state=ShardRange.CREATED, epoch=self.ts[4])
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_state_ts_gt(self):
existing = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[2],
state=ShardRange.CREATED, epoch=self.ts[4]))
data = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.ACTIVE, epoch=self.ts[5]))
exp_data = dict(data, state_timestamp=self.ts[2],
state=ShardRange.CREATED, epoch=self.ts[4])
self.assertFalse(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_existing_ts_eq_state_ts_lt(self):
existing = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[0],
state=ShardRange.CREATED, epoch=self.ts[4]))
data = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.ACTIVE, epoch=self.ts[5]))
exp_data = dict(data)
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
def test_merge_shards_epoch_reset(self):
# not sure if these scenarios are realistic, but we have seen epoch
# resets in prod
# same timestamps, data has more advanced state but no epoch
existing = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.CREATED, epoch=self.ts[4]))
data = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.ACTIVE, epoch=None))
exp_data = dict(data)
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
self.assertIsNone(exp_data['epoch'])
# data has more advanced state_timestamp but no epoch
existing = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[1],
state=ShardRange.CREATED, epoch=self.ts[4]))
data = dict(
ShardRange('a/o', self.ts[0], state_timestamp=self.ts[2],
state=ShardRange.FOUND, epoch=None))
exp_data = dict(data)
self.assertTrue(merge_shards(data, existing))
self.assertEqual(exp_data, data)
self.assertIsNone(exp_data['epoch'])
def test_sift_shard_ranges(self):
-        ts_iter = make_timestamp_iter()
existing_shards = {}
-        sr1 = dict(ShardRange('a/o', next(ts_iter).internal))
-        sr2 = dict(ShardRange('a/o2', next(ts_iter).internal))
+        sr1 = dict(ShardRange('a/o', next(self.ts_iter).internal))
+        sr2 = dict(ShardRange('a/o2', next(self.ts_iter).internal))
new_shard_ranges = [sr1, sr2]
# first, with empty existing_shards, the new shards will just be added
@@ -6994,7 +7164,7 @@ class TestModuleFunctions(unittest.TestCase):
# if there is a newer version in the existing shards then it won't be
# added to to_add
existing_shards['a/o'] = dict(
-            ShardRange('a/o', next(ts_iter).internal))
+            ShardRange('a/o', next(self.ts_iter).internal))
to_add, to_delete = sift_shard_ranges(new_shard_ranges,
existing_shards)
self.assertEqual([sr2], list(to_add))
@@ -7002,7 +7172,7 @@ class TestModuleFunctions(unittest.TestCase):
# But if a newer version is in new_shard_ranges then the old will be
# added to to_delete and the new is added to to_add.
-        sr1['timestamp'] = next(ts_iter).internal
+        sr1['timestamp'] = next(self.ts_iter).internal
to_add, to_delete = sift_shard_ranges(new_shard_ranges,
existing_shards)
self.assertEqual(2, len(to_add))

test/unit/container/test_replicator.py

@@ -32,6 +32,7 @@ from swift.container.reconciler import (
from swift.common.utils import Timestamp, encode_timestamps, ShardRange, \
get_db_files, make_db_file_path
from swift.common.storage_policy import POLICIES
from test import annotate_failure
from test.debug_logger import debug_logger
from test.unit.common import test_db_replicator
@@ -1432,6 +1433,130 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
daemon.logger.get_lines_for_level('debug'))
daemon.logger.clear()
def test_sync_shard_ranges_merge_remote_osr(self):
def do_test(local_osr, remote_osr, exp_merge, exp_warning,
exp_rpc_warning):
put_timestamp = Timestamp.now().internal
# create "local" broker
broker = self._get_broker('a', 'c', node_index=0)
broker.initialize(put_timestamp, POLICIES.default.idx)
# create "remote" broker
remote_broker = self._get_broker('a', 'c', node_index=1)
remote_broker.initialize(put_timestamp, POLICIES.default.idx)
bounds = (('', 'g'), ('g', 'r'), ('r', ''))
shard_ranges = [
ShardRange('.shards_a/sr-%s' % upper, Timestamp.now(), lower,
upper, i + 1, 10 * (i + 1))
for i, (lower, upper) in enumerate(bounds)
]
for db in (broker, remote_broker):
db.merge_shard_ranges(shard_ranges)
if local_osr:
broker.merge_shard_ranges(ShardRange(**dict(local_osr)))
if remote_osr:
remote_broker.merge_shard_ranges(
ShardRange(**dict(remote_osr)))
daemon = replicator.ContainerReplicator({}, logger=debug_logger())
part, remote_node = self._get_broker_part_node(remote_broker)
part, local_node = self._get_broker_part_node(broker)
info = broker.get_replication_info()
success = daemon._repl_to_node(remote_node, broker, part, info)
self.assertTrue(success)
local_info = self._get_broker(
'a', 'c', node_index=0).get_info()
remote_info = self._get_broker(
'a', 'c', node_index=1).get_info()
for k, v in local_info.items():
if k == 'id':
continue
self.assertEqual(remote_info[k], v,
"mismatch remote %s %r != %r" % (
k, remote_info[k], v))
actual_osr = broker.get_own_shard_range(no_default=True)
actual_osr = dict(actual_osr) if actual_osr else actual_osr
if exp_merge:
exp_osr = (dict(remote_osr, meta_timestamp=mock.ANY)
if remote_osr else remote_osr)
else:
exp_osr = (dict(local_osr, meta_timestamp=mock.ANY)
if local_osr else local_osr)
self.assertEqual(exp_osr, actual_osr)
lines = daemon.logger.get_lines_for_level('warning')
if exp_warning:
self.assertEqual(len(lines), 1, lines)
self.assertIn("Ignoring remote osr w/o epoch", lines[0])
self.assertIn("own_sr: ", lines[0])
self.assertIn("'epoch': '%s'" % local_osr.epoch.normal,
lines[0])
self.assertIn("remote_sr: ", lines[0])
self.assertIn("'epoch': None", lines[0])
hash_ = os.path.splitext(os.path.basename(broker.db_file))[0]
url = "%s/%s/%s/%s" % (
remote_node['ip'], remote_node['device'], part, hash_)
self.assertIn("source: %s" % url, lines[0])
else:
self.assertFalse(lines)
lines = self.rpc.logger.get_lines_for_level('warning')
if exp_rpc_warning:
self.assertEqual(len(lines), 1, lines)
self.assertIn("Ignoring remote osr w/o epoch", lines[0])
self.assertIn("source: repl_req", lines[0])
else:
self.assertFalse(lines)
os.remove(broker.db_file)
os.remove(remote_broker.db_file)
return daemon
# we'll use another broker as a template to get the "default" OSRs
other_broker = self._get_broker('a', 'c', node_index=2)
other_broker.initialize(Timestamp.now().internal, POLICIES.default.idx)
default_osr = other_broker.get_own_shard_range()
self.assertIsNone(default_osr.epoch)
osr_with_epoch = other_broker.get_own_shard_range()
osr_with_epoch.epoch = Timestamp.now()
osr_with_different_epoch = other_broker.get_own_shard_range()
osr_with_different_epoch.epoch = Timestamp.now()
default_osr_newer = ShardRange(**dict(default_osr))
default_osr_newer.timestamp = Timestamp.now()
# local_osr, remote_osr, exp_merge, exp_warning, exp_rpc_warning
tests = (
# First the None case, i.e. no OSRs
(None, None, False, False, False),
# Default and not the other
(None, default_osr, True, False, False),
(default_osr, None, False, False, False),
(default_osr, default_osr, True, False, False),
(default_osr, None, False, False, False),
# With an epoch and no OSR is also fine
(None, osr_with_epoch, True, False, False),
(osr_with_epoch, None, False, False, False),
# even with the same or different epochs
(osr_with_epoch, osr_with_epoch, True, False, False),
(osr_with_epoch, osr_with_different_epoch, True, False, False),
# But if the local has an epoch but the remote doesn't: a false
# positive; nothing would merge anyway, so no warning.
(osr_with_epoch, default_osr, False, False, False),
# It's also OK if the remote has an epoch but not the local. This
# also works on the RPC side, because merging happens locally first
# and the updated shards are then sent to the remote. So if the
# remote OSR is newer than the default one, the RPC side will
# actually get a merged OSR, i.e. get the remote one back.
(default_osr, osr_with_epoch, True, False, False),
# But if the local default is newer than the epoched remote one, the
# RPC side logs a warning and, the local being newer, it will fail
# to merge.
(default_osr_newer, osr_with_epoch, False, False, True),
)
for i, params in enumerate(tests):
with annotate_failure((i, params)):
do_test(*params)
def test_sync_shard_ranges(self):
put_timestamp = Timestamp.now().internal
# create "local" broker