Merge "Only try to fetch or sync shard ranges if the remote supports sharding"
commit cdb1736e01
@@ -78,8 +78,9 @@ class ContainerReplicator(db_replicator.Replicator):
                 broker.merge_timestamps(*(remote_info[key] for key in
                                           sync_timestamps))
 
-            # Grab remote's shard ranges, too
-            self._fetch_and_merge_shard_ranges(http, broker)
+            if 'shard_max_row' in remote_info:
+                # Grab remote's shard ranges, too
+                self._fetch_and_merge_shard_ranges(http, broker)
 
         return super(ContainerReplicator, self)._handle_sync_response(
             node, response, info, broker, http, different_region)
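The guard above relies on feature detection: a sharding-aware container server includes a 'shard_max_row' key in the sync response it returns, while an older server omits it, so remote shard ranges are only fetched from peers that can actually have any. A minimal standalone sketch of that check (the dict contents here are illustrative, not the exact fields of a real sync response):

# Illustrative values only; a real sync response carries more fields.
old_peer_info = {'max_row': 1028, 'hash': '00000000000000000000000000000000'}
new_peer_info = dict(old_peer_info, shard_max_row=-1)

def peer_supports_sharding(remote_info):
    # the patch keys off the presence of 'shard_max_row', not its value
    return 'shard_max_row' in remote_info

assert not peer_supports_sharding(old_peer_info)
assert peer_supports_sharding(new_peer_info)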
@@ -101,8 +102,16 @@ class ContainerReplicator(db_replicator.Replicator):
 
     def _choose_replication_mode(self, node, rinfo, info, local_sync, broker,
                                  http, different_region):
-        # Always replicate shard ranges
-        shard_range_success = self._sync_shard_ranges(broker, http, info['id'])
+        if 'shard_max_row' in rinfo:
+            # Always replicate shard ranges to new-enough swift
+            shard_range_success = self._sync_shard_ranges(
+                broker, http, info['id'])
+        else:
+            shard_range_success = False
+            self.logger.warning(
+                '%s is unable to replicate shard ranges to peer %s; '
+                'peer may need upgrading', broker.db_file,
+                '%(ip)s:%(port)s/%(device)s' % node)
         if broker.sharding_initiated():
             self.logger.warning(
                 '%s is able to shard -- refusing to replicate objects to peer '
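When the peer does not advertise 'shard_max_row', the replicator now records the shard-range sync as failed and logs a warning rather than sending a request the peer cannot handle. A hedged, self-contained sketch of that fallback (function and parameter names are illustrative, not Swift's API):

import logging

logger = logging.getLogger('container-replicator')

def sync_shard_ranges_if_supported(rinfo, db_file, node, sync_fn):
    # Only attempt shard-range replication when the peer's replication
    # info advertises 'shard_max_row'; otherwise warn and report failure
    # so the caller can decide whether to defer object replication.
    if 'shard_max_row' in rinfo:
        return sync_fn()
    logger.warning(
        '%s is unable to replicate shard ranges to peer %s; '
        'peer may need upgrading', db_file,
        '%(ip)s:%(port)s/%(device)s' % node)
    return False

The new tests below assert exactly this pairing of warnings: "unable to replicate shard ranges" followed by "refusing to replicate objects", with object replication deferred.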
@@ -1860,6 +1860,77 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         self._check_replication_local_sharding_remote_unsharded(
             {'per_diff': 1})
 
+    def _check_only_sync(self, local_broker, remote_node_index, repl_conf):
+        daemon, repl_calls, rsync_calls = self.check_replicate(
+            local_broker, remote_node_index, repl_conf,
+            expect_success=False)
+
+        # When talking to an old (pre-2.18.0) container server, abort
+        # replication when we're sharding or sharded. Wait for the
+        # rolling upgrade that's presumably in-progress to finish instead.
+        self.assertEqual(1, daemon.stats['deferred'])
+        self.assertEqual(0, daemon.stats['diff'])
+        self.assertEqual(0, daemon.stats['rsync'])
+        self.assertEqual(['sync'],
+                         [call[0] for call in repl_calls])
+        self.assertFalse(rsync_calls)
+        lines = daemon.logger.get_lines_for_level('warning')
+        self.assertIn('unable to replicate shard ranges', lines[0])
+        self.assertIn('refusing to replicate objects', lines[1])
+        self.assertFalse(lines[2:])
+        # sync
+        local_id = local_broker.get_info()['id']
+        self.assertEqual(local_id, repl_calls[0][1][2])
+        remote_broker = self._get_broker(
+            local_broker.account, local_broker.container, node_index=1)
+        self.assertNotEqual(local_id, remote_broker.get_info()['id'])
+        self.assertEqual([], remote_broker.get_shard_ranges())
+
+    def _check_replication_local_sharding_remote_presharding(self, repl_conf):
+        local_context = self._setup_replication_test(0)
+        self._merge_object(index=slice(0, 3), **local_context)
+        local_broker = local_context['broker']
+        epoch = Timestamp.now()
+        self._goto_sharding_state(local_broker, epoch)
+        self._merge_shard_range(index=0, **local_context)
+        self._merge_object(index=slice(3, 11), **local_context)
+
+        remote_context = self._setup_replication_test(1)
+        self._merge_object(index=11, **remote_context)
+
+        orig_get_remote_info = \
+            replicator.ContainerReplicatorRpc._get_synced_replication_info
+
+        def presharding_get_remote_info(*args):
+            rinfo = orig_get_remote_info(*args)
+            del rinfo['shard_max_row']
+            return rinfo
+
+        with mock.patch('swift.container.replicator.'
+                        'ContainerReplicatorRpc._get_synced_replication_info',
+                        presharding_get_remote_info):
+            self._check_only_sync(local_broker, 1, repl_conf)
+
+        remote_broker = self._get_broker('a', 'c', node_index=1)
+        self.assertEqual(
+            [remote_broker._db_file], get_db_files(remote_broker.db_file))
+        self.assertEqual(remote_context['objects'][11:12],
+                         remote_broker.get_objects())
+
+        self.assert_info_synced(
+            local_broker, 1,
+            mismatches=['db_state', 'object_count', 'bytes_used',
+                        'status_changed_at', 'hash'])
+
+        self._check_only_sync(local_broker, 1, repl_conf)
+
+    def test_replication_local_sharding_remote_presharding(self):
+        self._check_replication_local_sharding_remote_presharding({})
+
+    def test_replication_local_sharding_remote_presharding_large_diff(self):
+        self._check_replication_local_sharding_remote_presharding(
+            {'per_diff': 1})
+
     def _check_replication_local_sharding_remote_sharding(self, repl_conf):
         local_context = self._setup_replication_test(0)
         self._merge_object(index=slice(0, 5), **local_context)
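The new test emulates a pre-sharding peer by wrapping the real replication-info call and deleting 'shard_max_row' from its result. The same trick in isolation, as a hedged standalone snippet (FakeRpc is a stand-in, not a Swift class):

from unittest import mock

class FakeRpc(object):
    def get_info(self):
        # a new-enough peer advertises shard_max_row alongside its row count
        return {'max_row': 42, 'shard_max_row': 7}

orig_get_info = FakeRpc.get_info

def presharding_get_info(*args):
    info = orig_get_info(*args)
    del info['shard_max_row']  # make the peer look older than 2.18.0
    return info

with mock.patch.object(FakeRpc, 'get_info', presharding_get_info):
    assert 'shard_max_row' not in FakeRpc().get_info()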