Merge "Fix stats calculation in object-reconstructor"
This commit is contained in:
commit
517083e999
@ -800,12 +800,13 @@ class ObjectReconstructor(Daemon):
|
||||
override_devices = override_devices or []
|
||||
override_partitions = override_partitions or []
|
||||
ips = whataremyips(self.bind_ip)
|
||||
for policy in POLICIES:
|
||||
if policy.policy_type != EC_POLICY:
|
||||
continue
|
||||
self._diskfile_mgr = self._df_router[policy]
|
||||
ec_policies = (policy for policy in POLICIES
|
||||
if policy.policy_type == EC_POLICY)
|
||||
|
||||
policy2devices = {}
|
||||
|
||||
for policy in ec_policies:
|
||||
self.load_object_ring(policy)
|
||||
data_dir = get_data_dir(policy)
|
||||
local_devices = list(six.moves.filter(
|
||||
lambda dev: dev and is_local_device(
|
||||
ips, self.port,
|
||||
@ -813,21 +814,23 @@ class ObjectReconstructor(Daemon):
|
||||
policy.object_ring.devs))
|
||||
|
||||
if override_devices:
|
||||
self.device_count = len(override_devices)
|
||||
else:
|
||||
self.device_count = len(local_devices)
|
||||
local_devices = list(six.moves.filter(
|
||||
lambda dev_info: dev_info['device'] in override_devices,
|
||||
local_devices))
|
||||
|
||||
policy2devices[policy] = local_devices
|
||||
self.device_count += len(local_devices)
|
||||
|
||||
for policy, local_devices in policy2devices.items():
|
||||
df_mgr = self._df_router[policy]
|
||||
for local_dev in local_devices:
|
||||
if override_devices and (local_dev['device'] not in
|
||||
override_devices):
|
||||
continue
|
||||
self.reconstruction_device_count += 1
|
||||
dev_path = self._df_router[policy].get_dev_path(
|
||||
local_dev['device'])
|
||||
dev_path = df_mgr.get_dev_path(local_dev['device'])
|
||||
if not dev_path:
|
||||
self.logger.warning(_('%s is not mounted'),
|
||||
local_dev['device'])
|
||||
continue
|
||||
data_dir = get_data_dir(policy)
|
||||
obj_path = join(dev_path, data_dir)
|
||||
tmp_path = join(dev_path, get_tmp_dir(int(policy)))
|
||||
unlink_older_than(tmp_path, time.time() -
|
||||
|
@ -38,7 +38,7 @@ from swift.common.bufferedhttp import http_connect
|
||||
from swift.common.daemon import Daemon
|
||||
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
|
||||
from swift.obj import ssync_sender
|
||||
from swift.obj.diskfile import DiskFileManager, get_data_dir, get_tmp_dir
|
||||
from swift.obj.diskfile import get_data_dir, get_tmp_dir, DiskFileRouter
|
||||
from swift.common.storage_policy import POLICIES, REPL_POLICY
|
||||
|
||||
DEFAULT_RSYNC_TIMEOUT = 900
|
||||
@ -121,7 +121,7 @@ class ObjectReplicator(Daemon):
|
||||
'operation, please disable handoffs_first and '
|
||||
'handoff_delete before the next '
|
||||
'normal rebalance')
|
||||
self._diskfile_mgr = DiskFileManager(conf, self.logger)
|
||||
self._df_router = DiskFileRouter(conf, self.logger)
|
||||
|
||||
def _zero_stats(self):
|
||||
"""Zero out the stats."""
|
||||
@ -406,9 +406,10 @@ class ObjectReplicator(Daemon):
|
||||
target_devs_info = set()
|
||||
failure_devs_info = set()
|
||||
begin = time.time()
|
||||
df_mgr = self._df_router[job['policy']]
|
||||
try:
|
||||
hashed, local_hash = tpool_reraise(
|
||||
self._diskfile_mgr._get_hashes, job['path'],
|
||||
df_mgr._get_hashes, job['path'],
|
||||
do_listdir=_do_listdir(
|
||||
int(job['partition']),
|
||||
self.replication_cycle),
|
||||
@ -462,7 +463,7 @@ class ObjectReplicator(Daemon):
|
||||
self.stats['hashmatch'] += 1
|
||||
continue
|
||||
hashed, recalc_hash = tpool_reraise(
|
||||
self._diskfile_mgr._get_hashes,
|
||||
df_mgr._get_hashes,
|
||||
job['path'], recalculate=suffixes,
|
||||
reclaim_age=self.reclaim_age)
|
||||
self.logger.update_stats('suffix.hashes', hashed)
|
||||
|
@ -80,7 +80,7 @@ class Sender(object):
|
||||
|
||||
def __init__(self, daemon, node, job, suffixes, remote_check_objs=None):
|
||||
self.daemon = daemon
|
||||
self.df_mgr = self.daemon._diskfile_mgr
|
||||
self.df_mgr = self.daemon._df_router[job['policy']]
|
||||
self.node = node
|
||||
self.job = job
|
||||
self.suffixes = suffixes
|
||||
|
@ -19,28 +19,9 @@ import tempfile
|
||||
import unittest
|
||||
import time
|
||||
|
||||
from swift.common import utils
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.obj import diskfile
|
||||
|
||||
from test.unit import debug_logger
|
||||
|
||||
|
||||
class FakeReplicator(object):
|
||||
def __init__(self, testdir, policy=None):
|
||||
self.logger = debug_logger('test-ssync-sender')
|
||||
self.conn_timeout = 1
|
||||
self.node_timeout = 2
|
||||
self.http_timeout = 3
|
||||
self.network_chunk_size = 65536
|
||||
self.disk_chunk_size = 4096
|
||||
conf = {
|
||||
'devices': testdir,
|
||||
'mount_check': 'false',
|
||||
}
|
||||
policy = POLICIES.default if policy is None else policy
|
||||
self._diskfile_router = diskfile.DiskFileRouter(conf, self.logger)
|
||||
self._diskfile_mgr = self._diskfile_router[policy]
|
||||
|
||||
|
||||
def write_diskfile(df, timestamp, data='test data', frag_index=None,
|
||||
@ -74,9 +55,18 @@ def write_diskfile(df, timestamp, data='test data', frag_index=None,
|
||||
|
||||
class BaseTest(unittest.TestCase):
|
||||
def setUp(self):
|
||||
self.device = 'dev'
|
||||
self.partition = '9'
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
# sender side setup
|
||||
self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
|
||||
utils.mkdirs(os.path.join(self.tx_testdir, self.device))
|
||||
self.daemon_conf = {
|
||||
'devices': self.tx_testdir,
|
||||
'mount_check': 'false',
|
||||
}
|
||||
# daemon will be set in subclass setUp
|
||||
self.daemon = None
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmpdir, ignore_errors=True)
|
||||
@ -90,7 +80,7 @@ class BaseTest(unittest.TestCase):
|
||||
object_parts = account, container, obj
|
||||
timestamp = Timestamp(time.time()) if timestamp is None else timestamp
|
||||
if df_mgr is None:
|
||||
df_mgr = self.daemon._diskfile_router[policy]
|
||||
df_mgr = self.daemon._df_router[policy]
|
||||
df = df_mgr.get_diskfile(
|
||||
device, partition, *object_parts, policy=policy,
|
||||
frag_index=frag_index)
|
||||
|
@ -862,7 +862,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
|
||||
self.suffixes = suffixes
|
||||
self.daemon = daemon
|
||||
self.job = job
|
||||
hash_gen = self.daemon._diskfile_mgr.yield_hashes(
|
||||
hash_gen = self.daemon._df_router[job['policy']].yield_hashes(
|
||||
self.job['device'], self.job['partition'],
|
||||
self.job['policy'], self.suffixes,
|
||||
frag_index=self.job.get('frag_index'))
|
||||
|
@ -12,7 +12,7 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import collections
|
||||
import unittest
|
||||
import os
|
||||
import mock
|
||||
@ -26,10 +26,10 @@ from collections import defaultdict
|
||||
from errno import ENOENT, ENOTEMPTY, ENOTDIR
|
||||
|
||||
from eventlet.green import subprocess
|
||||
from eventlet import Timeout, tpool
|
||||
from eventlet import Timeout
|
||||
|
||||
from test.unit import (debug_logger, patch_policies, make_timestamp_iter,
|
||||
mocked_http_conn)
|
||||
mocked_http_conn, FakeLogger)
|
||||
from swift.common import utils
|
||||
from swift.common.utils import (hash_path, mkdirs, normalize_timestamp,
|
||||
storage_directory)
|
||||
@ -1623,68 +1623,80 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
object_replicator.http_connect = was_connector
|
||||
|
||||
def test_run_once_recover_from_timeout(self):
|
||||
# verify that replicator will pass over all policies' partitions even
|
||||
# if a timeout occurs while replicating one partition to one node.
|
||||
timeouts = [Timeout()]
|
||||
|
||||
def fake_get_hashes(df_mgr, part_path, **kwargs):
|
||||
self.get_hash_count += 1
|
||||
# Simulate a REPLICATE timeout by raising Timeout for second call
|
||||
# to get_hashes (with recalculate suffixes) for a specific
|
||||
# partition
|
||||
if (timeouts and '/objects/' in part_path and
|
||||
part_path.endswith('0') and 'recalculate' in kwargs):
|
||||
raise timeouts.pop(0)
|
||||
return 1, {'abc': 'def'}
|
||||
|
||||
# map partition_path -> [nodes]
|
||||
sync_paths = collections.defaultdict(list)
|
||||
|
||||
def fake_sync(node, job, suffixes, *args, **kwargs):
|
||||
sync_paths[job['path']].append(node)
|
||||
return True, {}
|
||||
|
||||
conf = dict(swift_dir=self.testdir, devices=self.devices,
|
||||
bind_ips=_ips()[0],
|
||||
bind_ip=_ips()[0], # local dev has id=0
|
||||
mount_check='false', timeout='300', stats_interval='1')
|
||||
replicator = object_replicator.ObjectReplicator(conf)
|
||||
was_connector = object_replicator.http_connect
|
||||
was_get_hashes = object_replicator.DiskFileManager._get_hashes
|
||||
was_execute = tpool.execute
|
||||
self.get_hash_count = 0
|
||||
try:
|
||||
with mock.patch('swift.obj.diskfile.DiskFileManager._get_hashes',
|
||||
fake_get_hashes):
|
||||
with mock.patch('swift.obj.replicator.http_connect',
|
||||
mock_http_connect(200)):
|
||||
with mock.patch('swift.obj.replicator.dump_recon_cache'):
|
||||
replicator = object_replicator.ObjectReplicator(
|
||||
conf, logger=FakeLogger())
|
||||
|
||||
def fake_get_hashes(*args, **kwargs):
|
||||
self.get_hash_count += 1
|
||||
if self.get_hash_count == 3:
|
||||
# raise timeout on last call to get hashes
|
||||
raise Timeout()
|
||||
return 2, {'abc': 'def'}
|
||||
self.get_hash_count = 0
|
||||
with mock.patch.object(replicator, 'sync', fake_sync):
|
||||
replicator.run_once()
|
||||
|
||||
def fake_exc(tester, *args, **kwargs):
|
||||
if 'Error syncing partition timeout' in args[0]:
|
||||
tester.i_failed = True
|
||||
log_lines = replicator.logger.get_lines_for_level('error')
|
||||
self.assertIn("Error syncing with node:", log_lines[0])
|
||||
self.assertFalse(log_lines[1:])
|
||||
# setup creates 4 partitions; partition 1 does not map to local dev id
|
||||
# 0 so will be handled by update_delete(); partitions 0, 2, 3 are
|
||||
# handled by update() for each of two policies, so expect 6 paths to be
|
||||
# sync'd
|
||||
self.assertEqual(6, len(sync_paths))
|
||||
# partition 3 has 2 nodes in remote region, only first node is sync'd.
|
||||
# partition 0 in policy 0 has fake_get_hashes timeout before first
|
||||
# sync, so only second node is sync'd.
|
||||
# other partitions are sync'd to 2 nodes in same region.
|
||||
expected_node_count = { # map path_end -> expected sync node count
|
||||
'/objects/0': 1,
|
||||
'/objects/1': 2,
|
||||
'/objects/2': 2,
|
||||
'/objects/3': 1,
|
||||
'/objects-1/0': 2,
|
||||
'/objects-1/1': 2,
|
||||
'/objects-1/2': 2,
|
||||
'/objects-1/3': 1
|
||||
}
|
||||
for path, nodes in sync_paths.items():
|
||||
path_end = path[path.index('/objects'):]
|
||||
self.assertEqual(expected_node_count[path_end], len(nodes),
|
||||
'Expected %s but got %s for path %s' %
|
||||
(expected_node_count[path_end], len(nodes), path))
|
||||
# partitions 0 and 2 attempt 3 calls each per policy to get_hashes = 12
|
||||
# partitions 3 attempts 2 calls per policy to get_hashes = 4
|
||||
# partitions 1 dosn't get_hashes because of update_deleted
|
||||
self.assertEqual(16, self.get_hash_count)
|
||||
|
||||
self.i_failed = False
|
||||
object_replicator.http_connect = mock_http_connect(200)
|
||||
object_replicator.DiskFileManager._get_hashes = fake_get_hashes
|
||||
replicator.logger.exception = \
|
||||
lambda *args, **kwargs: fake_exc(self, *args, **kwargs)
|
||||
# Write some files into '1' and run replicate- they should be moved
|
||||
# to the other partitions and then node should get deleted.
|
||||
cur_part = '1'
|
||||
df = self.df_mgr.get_diskfile('sda', cur_part, 'a', 'c', 'o',
|
||||
policy=POLICIES.legacy)
|
||||
mkdirs(df._datadir)
|
||||
f = open(os.path.join(df._datadir,
|
||||
normalize_timestamp(time.time()) + '.data'),
|
||||
'wb')
|
||||
f.write('1234567890')
|
||||
f.close()
|
||||
ohash = hash_path('a', 'c', 'o')
|
||||
data_dir = ohash[-3:]
|
||||
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
|
||||
process_arg_checker = []
|
||||
ring = replicator.load_object_ring(POLICIES[0])
|
||||
nodes = [node for node in
|
||||
ring.get_part_nodes(int(cur_part))
|
||||
if node['ip'] not in _ips()]
|
||||
|
||||
for node in nodes:
|
||||
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'],
|
||||
cur_part)
|
||||
process_arg_checker.append(
|
||||
(0, '', ['rsync', whole_path_from, rsync_mod]))
|
||||
self.assertTrue(os.access(os.path.join(self.objects,
|
||||
'1', data_dir, ohash),
|
||||
os.F_OK))
|
||||
with _mock_process(process_arg_checker):
|
||||
replicator.run_once()
|
||||
self.assertFalse(process_errors)
|
||||
self.assertFalse(self.i_failed)
|
||||
finally:
|
||||
object_replicator.http_connect = was_connector
|
||||
object_replicator.DiskFileManager._get_hashes = was_get_hashes
|
||||
tpool.execute = was_execute
|
||||
# attempt to 16 times but succeeded only 15 times due to Timeout
|
||||
suffix_hashes = sum(
|
||||
count for (metric, count), _junk in
|
||||
replicator.logger.log_dict['update_stats']
|
||||
if metric == 'suffix.hashes')
|
||||
self.assertEqual(15, suffix_hashes)
|
||||
|
||||
def test_run(self):
|
||||
with _mock_process([(0, '')] * 100):
|
||||
@ -1737,7 +1749,8 @@ class TestObjectReplicator(unittest.TestCase):
|
||||
do_listdir_results = [False, False, True, False, True, False]
|
||||
mock_do_listdir.side_effect = do_listdir_results
|
||||
expected_tpool_calls = [
|
||||
mock.call(self.replicator._diskfile_mgr._get_hashes, job['path'],
|
||||
mock.call(self.replicator._df_router[job['policy']]._get_hashes,
|
||||
job['path'],
|
||||
do_listdir=do_listdir,
|
||||
reclaim_age=self.replicator.reclaim_age)
|
||||
for job, do_listdir in zip(jobs, do_listdir_results)
|
||||
|
@ -31,9 +31,10 @@ from swift.common.utils import Timestamp
|
||||
from swift.obj import ssync_sender, server
|
||||
from swift.obj.reconstructor import RebuildingECDiskFileStream, \
|
||||
ObjectReconstructor
|
||||
from swift.obj.replicator import ObjectReplicator
|
||||
|
||||
from test.unit import patch_policies, debug_logger, encode_frag_archive_bodies
|
||||
from test.unit.obj.common import BaseTest, FakeReplicator
|
||||
from test.unit.obj.common import BaseTest
|
||||
|
||||
|
||||
class TestBaseSsync(BaseTest):
|
||||
@ -46,13 +47,6 @@ class TestBaseSsync(BaseTest):
|
||||
"""
|
||||
def setUp(self):
|
||||
super(TestBaseSsync, self).setUp()
|
||||
self.device = 'dev'
|
||||
self.partition = '9'
|
||||
# sender side setup
|
||||
self.tx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
|
||||
utils.mkdirs(os.path.join(self.tx_testdir, self.device))
|
||||
self.daemon = FakeReplicator(self.tx_testdir)
|
||||
|
||||
# rx side setup
|
||||
self.rx_testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_receiver')
|
||||
utils.mkdirs(os.path.join(self.rx_testdir, self.device))
|
||||
@ -142,7 +136,7 @@ class TestBaseSsync(BaseTest):
|
||||
return diskfiles
|
||||
|
||||
def _open_tx_diskfile(self, obj_name, policy, frag_index=None):
|
||||
df_mgr = self.daemon._diskfile_router[policy]
|
||||
df_mgr = self.daemon._df_router[policy]
|
||||
df = df_mgr.get_diskfile(
|
||||
self.device, self.partition, account='a', container='c',
|
||||
obj=obj_name, policy=policy, frag_index=frag_index)
|
||||
@ -310,6 +304,8 @@ class TestBaseSsyncEC(TestBaseSsync):
|
||||
def setUp(self):
|
||||
super(TestBaseSsyncEC, self).setUp()
|
||||
self.policy = POLICIES.default
|
||||
self.daemon = ObjectReconstructor(self.daemon_conf,
|
||||
debug_logger('test-ssync-sender'))
|
||||
|
||||
def _get_object_data(self, path, frag_index=None, **kwargs):
|
||||
# return a frag archive for given object name and frag index.
|
||||
@ -337,7 +333,7 @@ class TestSsyncEC(TestBaseSsyncEC):
|
||||
tx_objs = {}
|
||||
rx_objs = {}
|
||||
tx_tombstones = {}
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
tx_df_mgr = self.daemon._df_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
# o1 has primary and handoff fragment archives
|
||||
t1 = next(self.ts_iter)
|
||||
@ -421,7 +417,7 @@ class TestSsyncEC(TestBaseSsyncEC):
|
||||
# create sender side diskfiles...
|
||||
tx_objs = {}
|
||||
rx_objs = {}
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
tx_df_mgr = self.daemon._df_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
|
||||
expected_subreqs = defaultdict(list)
|
||||
@ -531,7 +527,7 @@ class TestSsyncEC(TestBaseSsyncEC):
|
||||
tx_objs = {}
|
||||
tx_tombstones = {}
|
||||
rx_objs = {}
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
tx_df_mgr = self.daemon._df_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
# o1 only has primary
|
||||
t1 = next(self.ts_iter)
|
||||
@ -631,7 +627,7 @@ class TestSsyncEC(TestBaseSsyncEC):
|
||||
|
||||
def test_send_with_frag_index_none(self):
|
||||
policy = POLICIES.default
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
tx_df_mgr = self.daemon._df_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
# create an ec fragment on the remote node
|
||||
ts1 = next(self.ts_iter)
|
||||
@ -692,7 +688,7 @@ class TestSsyncEC(TestBaseSsyncEC):
|
||||
# create non durable tx obj by not committing, then create a legacy
|
||||
# .durable file
|
||||
tx_objs = {}
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
tx_df_mgr = self.daemon._df_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
t1 = next(self.ts_iter)
|
||||
tx_objs['o1'] = self._create_ondisk_files(
|
||||
@ -791,7 +787,7 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
|
||||
|
||||
# create sender side diskfiles...
|
||||
self.tx_objs = {}
|
||||
tx_df_mgr = self.daemon._diskfile_router[self.policy]
|
||||
tx_df_mgr = self.daemon._df_router[self.policy]
|
||||
t1 = next(self.ts_iter)
|
||||
self.tx_objs['o1'] = self._create_ondisk_files(
|
||||
tx_df_mgr, 'o1', self.policy, t1, (self.tx_node_index,))
|
||||
@ -1073,6 +1069,11 @@ class TestSsyncECReconstructorSyncJob(TestBaseSsyncEC):
|
||||
|
||||
@patch_policies
|
||||
class TestSsyncReplication(TestBaseSsync):
|
||||
def setUp(self):
|
||||
super(TestSsyncReplication, self).setUp()
|
||||
self.daemon = ObjectReplicator(self.daemon_conf,
|
||||
debug_logger('test-ssync-sender'))
|
||||
|
||||
def test_sync(self):
|
||||
policy = POLICIES.default
|
||||
rx_node_index = 0
|
||||
@ -1082,7 +1083,7 @@ class TestSsyncReplication(TestBaseSsync):
|
||||
rx_objs = {}
|
||||
tx_tombstones = {}
|
||||
rx_tombstones = {}
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
tx_df_mgr = self.daemon._df_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
# o1 and o2 are on tx only
|
||||
t1 = next(self.ts_iter)
|
||||
@ -1204,7 +1205,7 @@ class TestSsyncReplication(TestBaseSsync):
|
||||
rx_objs = {}
|
||||
tx_tombstones = {}
|
||||
rx_tombstones = {}
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
tx_df_mgr = self.daemon._df_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
|
||||
expected_subreqs = defaultdict(list)
|
||||
@ -1349,7 +1350,7 @@ class TestSsyncReplication(TestBaseSsync):
|
||||
rx_node_index = 0
|
||||
|
||||
# create diskfiles...
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
tx_df_mgr = self.daemon._df_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
|
||||
# rx has data at t1 but no meta
|
||||
@ -1434,7 +1435,7 @@ class TestSsyncReplication(TestBaseSsync):
|
||||
# create diskfiles...
|
||||
tx_objs = {}
|
||||
rx_objs = {}
|
||||
tx_df_mgr = self.daemon._diskfile_router[policy]
|
||||
tx_df_mgr = self.daemon._df_router[policy]
|
||||
rx_df_mgr = self.rx_controller._diskfile_router[policy]
|
||||
|
||||
expected_subreqs = defaultdict(list)
|
||||
|
@ -24,9 +24,10 @@ from swift.common import exceptions, utils
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.utils import Timestamp
|
||||
from swift.obj import ssync_sender, diskfile, ssync_receiver
|
||||
from swift.obj.replicator import ObjectReplicator
|
||||
|
||||
from test.unit import patch_policies, make_timestamp_iter
|
||||
from test.unit.obj.common import FakeReplicator, BaseTest
|
||||
from test.unit import patch_policies, make_timestamp_iter, debug_logger
|
||||
from test.unit.obj.common import BaseTest
|
||||
|
||||
|
||||
class NullBufferedHTTPConnection(object):
|
||||
@ -84,10 +85,10 @@ class TestSender(BaseTest):
|
||||
|
||||
def setUp(self):
|
||||
super(TestSender, self).setUp()
|
||||
self.testdir = os.path.join(self.tmpdir, 'tmp_test_ssync_sender')
|
||||
utils.mkdirs(os.path.join(self.testdir, 'dev'))
|
||||
self.daemon = FakeReplicator(self.testdir)
|
||||
self.sender = ssync_sender.Sender(self.daemon, None, None, None)
|
||||
self.daemon = ObjectReplicator(self.daemon_conf,
|
||||
debug_logger('test-ssync-sender'))
|
||||
job = {'policy': POLICIES.legacy} # sufficient for Sender.__init__
|
||||
self.sender = ssync_sender.Sender(self.daemon, None, job, None)
|
||||
|
||||
def test_call_catches_MessageTimeout(self):
|
||||
|
||||
@ -146,8 +147,7 @@ class TestSender(BaseTest):
|
||||
'1.2.3.4:5678/sda1/9 EXCEPTION in ssync.Sender:'))
|
||||
|
||||
def test_call_catches_exception_handling_exception(self):
|
||||
job = node = None # Will cause inside exception handler to fail
|
||||
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
|
||||
self.sender.node = None # Will cause inside exception handler to fail
|
||||
self.sender.suffixes = ['abc']
|
||||
self.sender.connect = 'cause exception'
|
||||
success, candidates = self.sender()
|
||||
@ -459,7 +459,7 @@ class TestSender(BaseTest):
|
||||
':UPDATES: START\r\n'
|
||||
':UPDATES: END\r\n'
|
||||
))
|
||||
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
self.sender.connect = mock.MagicMock()
|
||||
df = mock.MagicMock()
|
||||
df.content_length = 0
|
||||
@ -505,7 +505,7 @@ class TestSender(BaseTest):
|
||||
':MISSING_CHECK: START\r\n'
|
||||
'9d41d8cd98f00b204e9800998ecf0abc d\r\n'
|
||||
':MISSING_CHECK: END\r\n'))
|
||||
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
self.sender.connect = mock.MagicMock()
|
||||
self.sender.updates = mock.MagicMock()
|
||||
self.sender.disconnect = mock.MagicMock()
|
||||
@ -541,7 +541,7 @@ class TestSender(BaseTest):
|
||||
chunk_body=(
|
||||
':MISSING_CHECK: START\r\n'
|
||||
':MISSING_CHECK: END\r\n'))
|
||||
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
self.sender.connect = mock.MagicMock()
|
||||
self.sender.updates = mock.MagicMock()
|
||||
self.sender.disconnect = mock.MagicMock()
|
||||
@ -578,7 +578,7 @@ class TestSender(BaseTest):
|
||||
':MISSING_CHECK: START\r\n'
|
||||
'9d41d8cd98f00b204e9800998ecf0abc d\r\n'
|
||||
':MISSING_CHECK: END\r\n'))
|
||||
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
self.sender.connect = mock.MagicMock()
|
||||
self.sender.updates = mock.MagicMock()
|
||||
self.sender.disconnect = mock.MagicMock()
|
||||
@ -743,7 +743,7 @@ class TestSender(BaseTest):
|
||||
chunk_body=(
|
||||
':MISSING_CHECK: START\r\n'
|
||||
':MISSING_CHECK: END\r\n'))
|
||||
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
self.sender.missing_check()
|
||||
self.assertEqual(
|
||||
''.join(self.sender.connection.sent),
|
||||
@ -791,7 +791,7 @@ class TestSender(BaseTest):
|
||||
chunk_body=(
|
||||
':MISSING_CHECK: START\r\n'
|
||||
':MISSING_CHECK: END\r\n'))
|
||||
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
self.sender.missing_check()
|
||||
self.assertEqual(
|
||||
''.join(self.sender.connection.sent),
|
||||
@ -836,7 +836,7 @@ class TestSender(BaseTest):
|
||||
'policy': POLICIES.legacy,
|
||||
}
|
||||
self.sender.suffixes = ['abc']
|
||||
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
self.sender.response = FakeResponse(chunk_body='\r\n')
|
||||
exc = None
|
||||
try:
|
||||
@ -875,7 +875,7 @@ class TestSender(BaseTest):
|
||||
'policy': POLICIES.legacy,
|
||||
}
|
||||
self.sender.suffixes = ['abc']
|
||||
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
self.sender.response = FakeResponse(
|
||||
chunk_body=':MISSING_CHECK: START\r\n')
|
||||
exc = None
|
||||
@ -915,7 +915,7 @@ class TestSender(BaseTest):
|
||||
'policy': POLICIES.legacy,
|
||||
}
|
||||
self.sender.suffixes = ['abc']
|
||||
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
self.sender.response = FakeResponse(chunk_body='OH HAI\r\n')
|
||||
exc = None
|
||||
try:
|
||||
@ -959,7 +959,7 @@ class TestSender(BaseTest):
|
||||
':MISSING_CHECK: START\r\n'
|
||||
'0123abc dm\r\n'
|
||||
':MISSING_CHECK: END\r\n'))
|
||||
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
self.sender.missing_check()
|
||||
self.assertEqual(
|
||||
''.join(self.sender.connection.sent),
|
||||
@ -1001,7 +1001,7 @@ class TestSender(BaseTest):
|
||||
':MISSING_CHECK: START\r\n'
|
||||
'0123abc d extra response parts\r\n'
|
||||
':MISSING_CHECK: END\r\n'))
|
||||
self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
|
||||
self.sender.df_mgr.yield_hashes = yield_hashes
|
||||
self.sender.missing_check()
|
||||
self.assertEqual(self.sender.send_map,
|
||||
{'0123abc': {'data': True}})
|
||||
@ -1307,7 +1307,7 @@ class TestSender(BaseTest):
|
||||
self.assertEqual(path, '/a/c/o')
|
||||
self.assertTrue(isinstance(df, diskfile.DiskFile))
|
||||
self.assertEqual(expected, df.get_metadata())
|
||||
self.assertEqual(os.path.join(self.testdir, 'dev/objects/9/',
|
||||
self.assertEqual(os.path.join(self.tx_testdir, 'dev/objects/9/',
|
||||
object_hash[-3:], object_hash),
|
||||
df._datadir)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user