Merge "Add unit tests for replicator sync_shard_ranges"
This commit is contained in:
@@ -23,6 +23,7 @@ import random
|
||||
import sqlite3
|
||||
|
||||
from swift.common import db_replicator
|
||||
from swift.common.swob import HTTPServerError
|
||||
from swift.container import replicator, backend, server, sync_store
|
||||
from swift.container.reconciler import (
|
||||
MISPLACED_OBJECTS_ACCOUNT, get_reconciler_container_name)
|
||||
@@ -32,7 +33,7 @@ from swift.common.storage_policy import POLICIES
|
||||
|
||||
from test.unit.common import test_db_replicator
|
||||
from test.unit import patch_policies, make_timestamp_iter, mock_check_drive, \
|
||||
debug_logger
|
||||
debug_logger, EMPTY_ETAG, FakeLogger
|
||||
from contextlib import contextmanager
|
||||
|
||||
from test.unit.common.test_db_replicator import attach_fake_replication_rpc
|
||||
@@ -1355,6 +1356,68 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
# reverse replication direction and expect syncs to propagate
|
||||
check_replicate(remote_broker_ranges, remote_broker, broker)
|
||||
|
||||
def test_sync_shard_ranges_error(self):
|
||||
# verify that replication is not considered successful if
|
||||
# merge_shard_ranges fails
|
||||
put_time = Timestamp.now().internal
|
||||
broker = self._get_broker('a', 'c', node_index=0)
|
||||
broker.initialize(put_time, POLICIES.default.idx)
|
||||
remote_broker = self._get_broker('a', 'c', node_index=1)
|
||||
remote_broker.initialize(put_time, POLICIES.default.idx)
|
||||
# put an object into local broker
|
||||
broker.put_object('obj', Timestamp.now().internal, 0, 'text/plain',
|
||||
EMPTY_ETAG)
|
||||
# get an own shard range into local broker
|
||||
broker.enable_sharding(Timestamp.now())
|
||||
self.assertFalse(broker.sharding_initiated())
|
||||
|
||||
replicate_hook = mock.MagicMock()
|
||||
fake_repl_connection = attach_fake_replication_rpc(
|
||||
self.rpc, errors={'merge_shard_ranges': [HTTPServerError()]},
|
||||
replicate_hook=replicate_hook)
|
||||
db_replicator.ReplConnection = fake_repl_connection
|
||||
part, node = self._get_broker_part_node(remote_broker)
|
||||
info = broker.get_replication_info()
|
||||
daemon = replicator.ContainerReplicator({})
|
||||
daemon.logger = FakeLogger()
|
||||
success = daemon._repl_to_node(node, broker, part, info)
|
||||
self.assertFalse(success)
|
||||
# broker only has its own shard range so expect objects to be sync'd
|
||||
self.assertEqual(
|
||||
['sync', 'get_shard_ranges', 'merge_shard_ranges', 'merge_items',
|
||||
'merge_syncs'],
|
||||
[call[0][0] for call in replicate_hook.call_args_list])
|
||||
error_lines = daemon.logger.get_lines_for_level('error')
|
||||
self.assertIn('Bad response 500', error_lines[0])
|
||||
self.assertFalse(error_lines[1:])
|
||||
self.assertEqual(1, daemon.stats['diff'])
|
||||
self.assertEqual(1, daemon.logger.get_increment_counts()['diffs'])
|
||||
|
||||
def test_sync_shard_ranges_none_to_sync(self):
|
||||
# verify that merge_shard_ranges is not sent if there are no shard
|
||||
# ranges to sync
|
||||
put_time = Timestamp.now().internal
|
||||
broker = self._get_broker('a', 'c', node_index=0)
|
||||
broker.initialize(put_time, POLICIES.default.idx)
|
||||
remote_broker = self._get_broker('a', 'c', node_index=1)
|
||||
remote_broker.initialize(put_time, POLICIES.default.idx)
|
||||
# put an object into local broker
|
||||
broker.put_object('obj', Timestamp.now().internal, 0, 'text/plain',
|
||||
EMPTY_ETAG)
|
||||
|
||||
replicate_hook = mock.MagicMock()
|
||||
fake_repl_connection = attach_fake_replication_rpc(
|
||||
self.rpc, replicate_hook=replicate_hook)
|
||||
db_replicator.ReplConnection = fake_repl_connection
|
||||
part, node = self._get_broker_part_node(remote_broker)
|
||||
info = broker.get_replication_info()
|
||||
daemon = replicator.ContainerReplicator({})
|
||||
success = daemon._repl_to_node(node, broker, part, info)
|
||||
self.assertTrue(success)
|
||||
self.assertEqual(
|
||||
['sync', 'get_shard_ranges', 'merge_items', 'merge_syncs'],
|
||||
[call[0][0] for call in replicate_hook.call_args_list])
|
||||
|
||||
def test_sync_shard_ranges_with_rsync(self):
|
||||
broker = self._get_broker('a', 'c', node_index=0)
|
||||
put_timestamp = time.time()
|
||||
|
||||
Reference in New Issue
Block a user