diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index e6a3541258..020bfc95cd 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -14,7 +14,8 @@
 # limitations under the License.
 
 import os
-from os.path import isdir, isfile, join
+import errno
+from os.path import isdir, isfile, join, dirname
 import random
 import shutil
 import time
@@ -31,7 +32,7 @@ from swift.common.ring.utils import is_local_device
 from swift.common.utils import whataremyips, unlink_older_than, \
     compute_eta, get_logger, dump_recon_cache, ismount, \
     rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub, \
-    tpool_reraise, config_auto_int_value
+    tpool_reraise, config_auto_int_value, storage_directory
 from swift.common.bufferedhttp import http_connect
 from swift.common.daemon import Daemon
 from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
@@ -95,7 +96,8 @@ class ObjectReplicator(Daemon):
             conf.get('handoff_delete', 'auto'), 0)
         self._diskfile_mgr = DiskFileManager(conf, self.logger)
 
-    def sync(self, node, job, suffixes):  # Just exists for doc anchor point
+    # Just exists for doc anchor point
+    def sync(self, node, job, suffixes, *args, **kwargs):
         """
         Synchronize local suffix directories from a partition with a remote
         node.
@@ -106,7 +108,7 @@ class ObjectReplicator(Daemon):
 
         :returns: boolean indicating success or failure
         """
-        return self.sync_method(node, job, suffixes)
+        return self.sync_method(node, job, suffixes, *args, **kwargs)
 
     def get_object_ring(self, policy_idx):
         """
@@ -168,7 +170,7 @@ class ObjectReplicator(Daemon):
         sync method in Swift.
         """
         if not os.path.exists(job['path']):
-            return False
+            return False, set()
         args = [
             'rsync',
             '--recursive',
@@ -193,14 +195,15 @@ class ObjectReplicator(Daemon):
                 args.append(spath)
                 had_any = True
         if not had_any:
-            return False
+            return False, set()
         data_dir = get_data_dir(job['policy_idx'])
         args.append(join(rsync_module, node['device'],
                     data_dir, job['partition']))
-        return self._rsync(args) == 0
+        return self._rsync(args) == 0, set()
 
-    def ssync(self, node, job, suffixes):
-        return ssync_sender.Sender(self, node, job, suffixes)()
+    def ssync(self, node, job, suffixes, remote_check_objs=None):
+        return ssync_sender.Sender(
+            self, node, job, suffixes, remote_check_objs)()
 
     def check_ring(self, object_ring):
         """
@@ -233,9 +236,18 @@ class ObjectReplicator(Daemon):
         try:
             responses = []
             suffixes = tpool.execute(tpool_get_suffixes, job['path'])
+            synced_remote_regions = {}
+            delete_objs = None
             if suffixes:
                 for node in job['nodes']:
-                    success = self.sync(node, job, suffixes)
+                    kwargs = {}
+                    if node['region'] in synced_remote_regions and \
+                            self.conf.get('sync_method') == 'ssync':
+                        kwargs['remote_check_objs'] = \
+                            synced_remote_regions[node['region']]
+                    # cand_objs is a set of object hashes eligible for deletion
+                    success, cand_objs = self.sync(
+                        node, job, suffixes, **kwargs)
                     if success:
                         with Timeout(self.http_timeout):
                             conn = http_connect(
@@ -244,7 +256,14 @@ class ObjectReplicator(Daemon):
                                 node['device'], job['partition'], 'REPLICATE',
                                 '/' + '-'.join(suffixes), headers=self.headers)
                             conn.getresponse().read()
+                        if node['region'] != job['region'] and cand_objs:
+                            synced_remote_regions[node['region']] = cand_objs
                     responses.append(success)
+            for region, cand_objs in synced_remote_regions.iteritems():
+                if delete_objs is None:
+                    delete_objs = cand_objs
+                else:
+                    delete_objs = delete_objs.intersection(cand_objs)
             if self.handoff_delete:
                 # delete handoff if we have had handoff_delete successes
                delete_handoff = len([resp for resp in responses if resp]) >= \
@@ -254,14 +273,37 @@ class ObjectReplicator(Daemon):
                delete_handoff = len(responses) == len(job['nodes']) and \
                    all(responses)
            if not suffixes or delete_handoff:
-                self.logger.info(_("Removing partition: %s"), job['path'])
-                tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
+                if delete_objs:
+                    self.logger.info(_("Removing %s objects"),
+                                     len(delete_objs))
+                    delete_objs = [
+                        storage_directory(job['obj_path'],
+                                          job['partition'],
+                                          object_hash)
+                        for object_hash in delete_objs]
+                    self.delete_handoff_paths(delete_objs)
+                else:
+                    self.logger.info(_("Removing partition: %s"), job['path'])
+                    tpool.execute(
+                        shutil.rmtree, job['path'], ignore_errors=True)
         except (Exception, Timeout):
             self.logger.exception(_("Error syncing handoff partition"))
         finally:
             self.partition_times.append(time.time() - begin)
             self.logger.timing_since('partition.delete.timing', begin)
 
+    def delete_handoff_paths(self, paths):
+        for object_path in paths:
+            tpool.execute(shutil.rmtree, object_path, ignore_errors=True)
+            suffix_dir = dirname(object_path)
+            try:
+                os.rmdir(suffix_dir)
+            except OSError as e:
+                if e.errno not in (errno.ENOENT, errno.ENOTEMPTY):
+                    self.logger.exception(
+                        "Unexpected error trying to cleanup suffix dir: %r",
+                        suffix_dir)
+
     def update(self, job):
         """
         High-level method that replicates a single partition.
@@ -280,6 +322,8 @@ class ObjectReplicator(Daemon):
                 self.suffix_hash += hashed
                 self.logger.update_stats('suffix.hashes', hashed)
             attempts_left = len(job['nodes'])
+            synced_remote_regions = set()
+            random.shuffle(job['nodes'])
             nodes = itertools.chain(
                 job['nodes'],
                 job['object_ring'].get_more_nodes(int(job['partition'])))
@@ -287,6 +331,10 @@ class ObjectReplicator(Daemon):
                 # If this throws StopIteration it will be caught way below
                 node = next(nodes)
                 attempts_left -= 1
+                # if we have already synced to this remote region,
+                # don't sync again on this replication pass
+                if node['region'] in synced_remote_regions:
+                    continue
                 try:
                     with Timeout(self.http_timeout):
                         resp = http_connect(
@@ -320,7 +368,7 @@ class ObjectReplicator(Daemon):
                     suffixes = [suffix for suffix in local_hash if
                                 local_hash[suffix] !=
                                 remote_hash.get(suffix, -1)]
-                    self.sync(node, job, suffixes)
+                    success, _junk = self.sync(node, job, suffixes)
                     with Timeout(self.http_timeout):
                         conn = http_connect(
                             node['replication_ip'], node['replication_port'],
@@ -328,6 +376,9 @@ class ObjectReplicator(Daemon):
                             '/' + '-'.join(suffixes),
                             headers=self.headers)
                         conn.getresponse().read()
+                    # only add the remote region when the sync succeeded
+                    if success and node['region'] != job['region']:
+                        synced_remote_regions.add(node['region'])
                     self.suffix_sync += len(suffixes)
                     self.logger.update_stats('suffix.syncs', len(suffixes))
             except (Exception, Timeout):
@@ -450,11 +501,13 @@ class ObjectReplicator(Daemon):
                     jobs.append(
                         dict(path=job_path,
                             device=local_dev['device'],
+                             obj_path=obj_path,
                             nodes=nodes,
                             delete=len(nodes) > len(part_nodes) - 1,
                             policy_idx=policy.idx,
                             partition=partition,
-                             object_ring=obj_ring))
+                             object_ring=obj_ring,
+                             region=local_dev['region']))
             except ValueError:
                 continue
         return jobs
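
The replicator hunks above carry the core behavior change of this patch: sync() now returns a (success, candidate_hashes) pair, and when a handoff is safe to reclaim, delete_partition() removes individual objects instead of the whole partition once every remote region has confirmed them. The deletion set is the intersection of what each region reported in sync. A minimal standalone sketch of that step, with made-up region numbers and hash values standing in for the real contents of synced_remote_regions:

    # Sketch only: the region keys and hashes here are illustrative.
    synced_remote_regions = {
        2: set(['9d41d8cd98f00b204e9800998ecf0abc',
                '9d41d8cd98f00b204e9800998ecf0def']),
        3: set(['9d41d8cd98f00b204e9800998ecf0abc']),
    }
    delete_objs = None
    for region, cand_objs in synced_remote_regions.iteritems():
        if delete_objs is None:
            delete_objs = cand_objs
        else:
            # only hashes confirmed by every remote region survive
            delete_objs = delete_objs.intersection(cand_objs)
    # delete_objs is now set(['9d41d8cd98f00b204e9800998ecf0abc'])

Each surviving hash is then expanded to an on-disk path via storage_directory() and removed by the new delete_handoff_paths() helper, which also prunes the parent suffix directory when it becomes empty (ENOENT and ENOTEMPTY from os.rmdir are expected and ignored).
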
""" - def __init__(self, daemon, node, job, suffixes): + def __init__(self, daemon, node, job, suffixes, remote_check_objs=None): self.daemon = daemon self.node = node self.job = job @@ -37,7 +38,11 @@ class Sender(object): self.response = None self.response_buffer = '' self.response_chunk_left = 0 - self.send_list = None + self.available_set = set() + # When remote_check_objs is given in job, ssync_sender trys only to + # make sure those objects exist or not in remote. + self.remote_check_objs = remote_check_objs + self.send_list = [] self.failures = 0 @property @@ -45,8 +50,16 @@ class Sender(object): return int(self.job.get('policy_idx', 0)) def __call__(self): + """ + Perform ssync with remote node. + + :returns: a 2-tuple, in the form (success, can_delete_objs). + + Success is a boolean, and can_delete_objs is an iterable of strings + representing the hashes which are in sync with the remote node. + """ if not self.suffixes: - return True + return True, set() try: # Double try blocks in case our main error handler fails. try: @@ -57,9 +70,20 @@ class Sender(object): # other exceptions will be logged with a full stack trace. self.connect() self.missing_check() - self.updates() + if not self.remote_check_objs: + self.updates() + can_delete_obj = self.available_set + else: + # when we are initialized with remote_check_objs we don't + # *send* any requested updates; instead we only collect + # what's already in sync and safe for deletion + can_delete_obj = self.available_set.difference( + self.send_list) self.disconnect() - return self.failures == 0 + if not self.failures: + return True, can_delete_obj + else: + return False, set() except (exceptions.MessageTimeout, exceptions.ReplicationException) as err: self.daemon.logger.error( @@ -85,7 +109,7 @@ class Sender(object): # would only get called if the above except Exception handler # failed (bad node or job data). 
             self.daemon.logger.exception('EXCEPTION in replication.Sender')
-            return False
+            return False, set()
 
     def connect(self):
         """
@@ -96,7 +120,7 @@ class Sender(object):
                 self.daemon.conn_timeout, 'connect send'):
             self.connection = bufferedhttp.BufferedHTTPConnection(
                 '%s:%s' % (self.node['replication_ip'],
-                            self.node['replication_port']))
+                           self.node['replication_port']))
         self.connection.putrequest('REPLICATION', '/%s/%s' % (
             self.node['device'], self.job['partition']))
         self.connection.putheader('Transfer-Encoding', 'chunked')
@@ -169,10 +193,14 @@ class Sender(object):
                 self.daemon.node_timeout, 'missing_check start'):
             msg = ':MISSING_CHECK: START\r\n'
             self.connection.send('%x\r\n%s\r\n' % (len(msg), msg))
-        for path, object_hash, timestamp in \
-                self.daemon._diskfile_mgr.yield_hashes(
-                    self.job['device'], self.job['partition'],
-                    self.policy_idx, self.suffixes):
+        hash_gen = self.daemon._diskfile_mgr.yield_hashes(
+            self.job['device'], self.job['partition'],
+            self.policy_idx, self.suffixes)
+        if self.remote_check_objs:
+            hash_gen = ifilter(lambda (path, object_hash, timestamp):
+                               object_hash in self.remote_check_objs, hash_gen)
+        for path, object_hash, timestamp in hash_gen:
+            self.available_set.add(object_hash)
             with exceptions.MessageTimeout(
                     self.daemon.node_timeout,
                     'missing_check send line'):
@@ -197,7 +225,6 @@ class Sender(object):
             elif line:
                 raise exceptions.ReplicationException(
                     'Unexpected response: %r' % line[:1024])
-        self.send_list = []
         while True:
             with exceptions.MessageTimeout(
                     self.daemon.http_timeout, 'missing_check line wait'):
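
The Sender changes above give it two modes. Without remote_check_objs it behaves as before, except that it now records every hash it offers during the missing-check exchange in available_set and returns that set on success. With remote_check_objs it sends no updates at all: it offers only the given hashes and reports back which ones the receiver did not ask for, i.e. which are already in sync. A condensed sketch of that bookkeeping, with illustrative hash values:

    # available_set: every local hash offered during missing_check
    # send_list: hashes the receiver asked for because it lacks them
    available_set = set(['hash_a', 'hash_b', 'hash_c'])
    send_list = ['hash_b']

    # remote_check_objs mode skips updates() and reports only what is
    # already safe on the far side
    can_delete_obj = available_set.difference(send_list)
    assert can_delete_obj == set(['hash_a', 'hash_c'])

Note that the ifilter call in missing_check() relies on Python 2 tuple-parameter unpacking in its lambda, which matches the rest of this Python 2 codebase but would be a syntax error on Python 3.
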
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index c937787244..0417194cd4 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -22,13 +22,15 @@ import cPickle as pickle
 import time
 import tempfile
 from contextlib import contextmanager, closing
+from errno import ENOENT, ENOTEMPTY, ENOTDIR
 
 from eventlet.green import subprocess
 from eventlet import Timeout, tpool
 
 from test.unit import FakeLogger, patch_policies
 from swift.common import utils
-from swift.common.utils import hash_path, mkdirs, normalize_timestamp
+from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
+    storage_directory
 from swift.common import ring
 from swift.obj import diskfile, replicator as object_replicator
 from swift.common.storage_policy import StoragePolicy, POLICIES
@@ -84,9 +86,20 @@ class MockProcess(object):
     def __init__(self, *args, **kwargs):
         targs = MockProcess.check_args.next()
         for targ in targs:
-            if targ not in args[0]:
-                process_errors.append("Invalid: %s not in %s" % (targ,
-                                                                 args))
+            # Allow a targ to be a tuple of acceptable alternatives
+            # (e.g. either node may be chosen when the nodes are shuffled)
+            if isinstance(targ, tuple):
+                allowed = False
+                for target in targ:
+                    if target in args[0]:
+                        allowed = True
+                if not allowed:
+                    process_errors.append("Invalid: %s not in %s" % (targ,
+                                                                     args))
+            else:
+                if targ not in args[0]:
+                    process_errors.append("Invalid: %s not in %s" % (targ,
+                                                                     args))
         self.stdout = self.Stream()
 
     def wait(self):
@@ -112,14 +125,19 @@ def _create_test_rings(path):
         [2, 3, 0, 1, 6, 4, 5],
     ]
     intended_devs = [
-        {'id': 0, 'device': 'sda', 'zone': 0, 'ip': '127.0.0.0', 'port': 6000},
-        {'id': 1, 'device': 'sda', 'zone': 1, 'ip': '127.0.0.1', 'port': 6000},
-        {'id': 2, 'device': 'sda', 'zone': 2, 'ip': '127.0.0.2', 'port': 6000},
-        {'id': 3, 'device': 'sda', 'zone': 4, 'ip': '127.0.0.3', 'port': 6000},
-        {'id': 4, 'device': 'sda', 'zone': 5, 'ip': '127.0.0.4', 'port': 6000},
+        {'id': 0, 'device': 'sda', 'zone': 0,
+         'region': 1, 'ip': '127.0.0.0', 'port': 6000},
+        {'id': 1, 'device': 'sda', 'zone': 1,
+         'region': 2, 'ip': '127.0.0.1', 'port': 6000},
+        {'id': 2, 'device': 'sda', 'zone': 2,
+         'region': 1, 'ip': '127.0.0.2', 'port': 6000},
+        {'id': 3, 'device': 'sda', 'zone': 4,
+         'region': 2, 'ip': '127.0.0.3', 'port': 6000},
+        {'id': 4, 'device': 'sda', 'zone': 5,
+         'region': 1, 'ip': '127.0.0.4', 'port': 6000},
         {'id': 5, 'device': 'sda', 'zone': 6,
-         'ip': 'fe80::202:b3ff:fe1e:8329', 'port': 6000},
-        {'id': 6, 'device': 'sda', 'zone': 7,
+         'region': 2, 'ip': 'fe80::202:b3ff:fe1e:8329', 'port': 6000},
+        {'id': 6, 'device': 'sda', 'zone': 7, 'region': 1,
          'ip': '2001:0db8:85a3:0000:0000:8a2e:0370:7334', 'port': 6000},
     ]
     intended_part_shift = 30
@@ -200,10 +218,11 @@ class TestObjectReplicator(unittest.TestCase):
             nodes = [node for node in
                      ring.get_part_nodes(int(cur_part))
                      if node['ip'] not in _ips()]
+            rsync_mods = tuple(['%s::object/sda/objects/%s' %
+                                (node['ip'], cur_part) for node in nodes])
             for node in nodes:
-                rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
                 process_arg_checker.append(
-                    (0, '', ['rsync', whole_path_from, rsync_mod]))
+                    (0, '', ['rsync', whole_path_from, rsync_mods]))
         with _mock_process(process_arg_checker):
             replicator.run_once()
         self.assertFalse(process_errors)
@@ -233,10 +252,11 @@ class TestObjectReplicator(unittest.TestCase):
             nodes = [node for node in
                      ring.get_part_nodes(int(cur_part))
                      if node['ip'] not in _ips()]
+            rsync_mods = tuple(['%s::object/sda/objects-1/%s' %
+                                (node['ip'], cur_part) for node in nodes])
             for node in nodes:
-                rsync_mod = '%s::object/sda/objects-1/%s' % (node['ip'], cur_part)
                 process_arg_checker.append(
-                    (0, '', ['rsync', whole_path_from, rsync_mod]))
+                    (0, '', ['rsync', whole_path_from, rsync_mods]))
         with _mock_process(process_arg_checker):
             replicator.run_once()
         self.assertFalse(process_errors)
@@ -530,6 +550,40 @@ class TestObjectReplicator(unittest.TestCase):
             # The file should still exist
             self.assertTrue(os.access(part_path, os.F_OK))
 
+    def test_delete_partition_with_handoff_delete_fail_in_other_region(self):
+        with mock.patch('swift.obj.replicator.http_connect',
+                        mock_http_connect(200)):
+            df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
+            mkdirs(df._datadir)
+            f = open(os.path.join(df._datadir,
+                                  normalize_timestamp(time.time()) + '.data'),
+                     'wb')
+            f.write('1234567890')
+            f.close()
+            ohash = hash_path('a', 'c', 'o')
+            data_dir = ohash[-3:]
+            whole_path_from = os.path.join(self.objects, '1', data_dir)
+            part_path = os.path.join(self.objects, '1')
+            self.assertTrue(os.access(part_path, os.F_OK))
+            ring = self.replicator.get_object_ring(0)
+            nodes = [node for node in
+                     ring.get_part_nodes(1)
+                     if node['ip'] not in _ips()]
+            process_arg_checker = []
+            for node in nodes:
+                rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
+                if node['region'] != 1:
+                    # make the rsync calls for the other region fail
+                    ret_code = 1
+                else:
+                    ret_code = 0
+                process_arg_checker.append(
+                    (ret_code, '', ['rsync', whole_path_from, rsync_mod]))
+            with _mock_process(process_arg_checker):
+                self.replicator.replicate()
+            # The file should still exist
+            self.assertTrue(os.access(part_path, os.F_OK))
+
     def test_delete_partition_override_params(self):
         df = self.df_mgr.get_diskfile('sda', '0', 'a', 'c', 'o')
         mkdirs(df._datadir)
@@ -564,6 +618,190 @@ class TestObjectReplicator(unittest.TestCase):
         self.assertFalse(os.access(pol1_part_path, os.F_OK))
         self.assertTrue(os.access(pol0_part_path, os.F_OK))
 
+    def test_delete_partition_ssync(self):
+        with mock.patch('swift.obj.replicator.http_connect',
+                        mock_http_connect(200)):
+            df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
+            mkdirs(df._datadir)
+            f = open(os.path.join(df._datadir,
+                                  normalize_timestamp(time.time()) + '.data'),
+                     'wb')
+            f.write('0')
+            f.close()
+            ohash = hash_path('a', 'c', 'o')
+            whole_path_from = storage_directory(self.objects, 1, ohash)
+            suffix_dir_path = os.path.dirname(whole_path_from)
+            part_path = os.path.join(self.objects, '1')
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            self.call_nums = 0
+            self.conf['sync_method'] = 'ssync'
+
+            def _fake_ssync(node, job, suffixes, **kwargs):
+                success = True
+                ret_val = [whole_path_from]
+                if self.call_nums == 2:
+                    # ssync should return (True, []) only when the second
+                    # candidate node does not have the replica yet.
+                    success = False
+                    ret_val = []
+                self.call_nums += 1
+                return success, set(ret_val)
+
+            self.replicator.sync_method = _fake_ssync
+            self.replicator.replicate()
+            # The file should still exist
+            self.assertTrue(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.replicator.replicate()
+            # The file should be deleted at the second replicate call
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertFalse(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.replicator.replicate()
+            # The partition should be deleted at the third replicate call
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertFalse(os.access(suffix_dir_path, os.F_OK))
+            self.assertFalse(os.access(part_path, os.F_OK))
+            del self.call_nums
+
+    def test_delete_partition_ssync_with_sync_failure(self):
+        with mock.patch('swift.obj.replicator.http_connect',
+                        mock_http_connect(200)):
+            df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
+            mkdirs(df._datadir)
+            f = open(os.path.join(df._datadir,
+                                  normalize_timestamp(time.time()) + '.data'),
+                     'wb')
+            f.write('0')
+            f.close()
+            ohash = hash_path('a', 'c', 'o')
+            whole_path_from = storage_directory(self.objects, 1, ohash)
+            suffix_dir_path = os.path.dirname(whole_path_from)
+            part_path = os.path.join(self.objects, '1')
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.call_nums = 0
+            self.conf['sync_method'] = 'ssync'
+
+            def _fake_ssync(node, job, suffixes):
+                success = False
+                ret_val = []
+                if self.call_nums == 2:
+                    # ssync should return (True, []) only when the second
+                    # candidate node does not have the replica yet.
+                    success = True
+                    ret_val = [whole_path_from]
+                self.call_nums += 1
+                return success, set(ret_val)
+
+            self.replicator.sync_method = _fake_ssync
+            self.replicator.replicate()
+            # The file should still exist
+            self.assertTrue(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.replicator.replicate()
+            # The file should still exist
+            self.assertTrue(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.replicator.replicate()
+            # The file should still exist
+            self.assertTrue(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+            del self.call_nums
+
+    def test_delete_partition_ssync_with_cleanup_failure(self):
+        with mock.patch('swift.obj.replicator.http_connect',
+                        mock_http_connect(200)):
+            self.replicator.logger = mock_logger = mock.MagicMock()
+            df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
+            mkdirs(df._datadir)
+            f = open(os.path.join(df._datadir,
+                                  normalize_timestamp(time.time()) + '.data'),
+                     'wb')
+            f.write('0')
+            f.close()
+            ohash = hash_path('a', 'c', 'o')
+            whole_path_from = storage_directory(self.objects, 1, ohash)
+            suffix_dir_path = os.path.dirname(whole_path_from)
+            part_path = os.path.join(self.objects, '1')
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            self.call_nums = 0
+            self.conf['sync_method'] = 'ssync'
+
+            def _fake_ssync(node, job, suffixes, **kwargs):
+                success = True
+                ret_val = [whole_path_from]
+                if self.call_nums == 2:
+                    # ssync should return (True, []) only when the second
+                    # candidate node does not have the replica yet.
+                    success = False
+                    ret_val = []
+                self.call_nums += 1
+                return success, set(ret_val)
+
+            rmdir_func = os.rmdir
+
+            def raise_exception_rmdir(exception_class, error_no):
+                instance = exception_class()
+                instance.errno = error_no
+
+                def func(directory):
+                    if directory == suffix_dir_path:
+                        raise instance
+                    else:
+                        rmdir_func(directory)
+
+                return func
+
+            self.replicator.sync_method = _fake_ssync
+            self.replicator.replicate()
+            # The file should still exist
+            self.assertTrue(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            # Fail with ENOENT
+            with mock.patch('os.rmdir',
+                            raise_exception_rmdir(OSError, ENOENT)):
+                self.replicator.replicate()
+            self.assertEquals(mock_logger.exception.call_count, 0)
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            # Fail with ENOTEMPTY
+            with mock.patch('os.rmdir',
+                            raise_exception_rmdir(OSError, ENOTEMPTY)):
+                self.replicator.replicate()
+            self.assertEquals(mock_logger.exception.call_count, 0)
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            # Fail with ENOTDIR
+            with mock.patch('os.rmdir',
+                            raise_exception_rmdir(OSError, ENOTDIR)):
+                self.replicator.replicate()
+            self.assertEquals(mock_logger.exception.call_count, 1)
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            # Finally we can cleanup everything
+            self.replicator.replicate()
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertFalse(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.replicator.replicate()
+            self.assertFalse(os.access(whole_path_from, os.F_OK))
+            self.assertFalse(os.access(suffix_dir_path, os.F_OK))
+            self.assertFalse(os.access(part_path, os.F_OK))
+
     def test_run_once_recover_from_failure(self):
         conf = dict(swift_dir=self.testdir, devices=self.devices,
                     mount_check='false', timeout='300', stats_interval='1')
@@ -781,7 +1019,8 @@ class TestObjectReplicator(unittest.TestCase):
             resp.read.return_value = pickle.dumps({'a83': 'c130a2c17ed45102a'
                                                           'ada0f4eee69494ff'})
             set_default(self)
-            self.replicator.sync = fake_func = mock.MagicMock()
+            self.replicator.sync = fake_func = \
+                mock.MagicMock(return_value=(True, []))
             self.replicator.update(local_job)
             reqs = []
             for node in local_job['nodes']:
@@ -792,6 +1031,26 @@ class TestObjectReplicator(unittest.TestCase):
             self.assertEquals(self.replicator.suffix_sync, 2)
             self.assertEquals(self.replicator.suffix_hash, 1)
             self.assertEquals(self.replicator.suffix_count, 1)
+
+            # Efficient Replication Case
+            set_default(self)
+            self.replicator.sync = fake_func = \
+                mock.MagicMock(return_value=(True, []))
+            all_jobs = self.replicator.collect_jobs()
+            job = None
+            for tmp in all_jobs:
+                if tmp['partition'] == '3':
+                    job = tmp
+                    break
+            # The candidate nodes to replicate (i.e. dev1 and dev3)
+            # belong to another region
+            self.replicator.update(job)
+            self.assertEquals(fake_func.call_count, 1)
+            self.assertEquals(self.replicator.replication_count, 1)
+            self.assertEquals(self.replicator.suffix_sync, 1)
+            self.assertEquals(self.replicator.suffix_hash, 1)
+            self.assertEquals(self.replicator.suffix_count, 1)
+
             mock_http.reset_mock()
             mock_logger.reset_mock()
 
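One contract these tests depend on is worth calling out: anything assigned to replicator.sync_method must now return the (success, in_sync_hashes) pair, as _fake_ssync does above; a stub returning a bare boolean would break the tuple unpacking in delete_partition() and update(). A minimal conforming stub, sketched here with a hypothetical name, might look like:

    def _noop_sync(node, job, suffixes, **kwargs):
        # Report success but no hashes confirmed in sync, so
        # delete_partition() falls back to removing the whole
        # handoff partition rather than individual objects.
        return True, set()

    replicator.sync_method = _noop_sync
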
diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py
index 4623308a70..d4538b454d 100644
--- a/test/unit/obj/test_ssync_sender.py
+++ b/test/unit/obj/test_ssync_sender.py
@@ -137,7 +137,9 @@ class TestSender(unittest.TestCase):
         job = dict(partition='9')
         self.sender = ssync_sender.Sender(self.replicator, node, job, None)
         self.sender.suffixes = ['abc']
-        self.assertFalse(self.sender())
+        success, candidates = self.sender()
+        self.assertFalse(success)
+        self.assertEquals(candidates, set())
         call = self.replicator.logger.error.mock_calls[0]
         self.assertEqual(
             call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -154,7 +156,9 @@ class TestSender(unittest.TestCase):
         job = dict(partition='9')
         self.sender = ssync_sender.Sender(self.replicator, node, job, None)
         self.sender.suffixes = ['abc']
-        self.assertFalse(self.sender())
+        success, candidates = self.sender()
+        self.assertFalse(success)
+        self.assertEquals(candidates, set())
         call = self.replicator.logger.error.mock_calls[0]
         self.assertEqual(
             call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -167,7 +171,9 @@ class TestSender(unittest.TestCase):
         self.sender = ssync_sender.Sender(self.replicator, node, job, None)
         self.sender.suffixes = ['abc']
         self.sender.connect = 'cause exception'
-        self.assertFalse(self.sender())
+        success, candidates = self.sender()
+        self.assertFalse(success)
+        self.assertEquals(candidates, set())
         call = self.replicator.logger.exception.mock_calls[0]
         self.assertEqual(
             call[1],
@@ -181,7 +187,9 @@ class TestSender(unittest.TestCase):
         self.sender = ssync_sender.Sender(self.replicator, node, job, None)
         self.sender.suffixes = ['abc']
         self.sender.connect = 'cause exception'
-        self.assertFalse(self.sender())
+        success, candidates = self.sender()
+        self.assertFalse(success)
+        self.assertEquals(candidates, set())
         self.replicator.logger.exception.assert_called_once_with(
             'EXCEPTION in replication.Sender')
 
@@ -191,7 +199,9 @@ class TestSender(unittest.TestCase):
         self.sender.missing_check = mock.MagicMock()
         self.sender.updates = mock.MagicMock()
         self.sender.disconnect = mock.MagicMock()
-        self.assertTrue(self.sender())
+        success, candidates = self.sender()
+        self.assertTrue(success)
+        self.assertEquals(candidates, set())
         self.sender.connect.assert_called_once_with()
         self.sender.missing_check.assert_called_once_with()
         self.sender.updates.assert_called_once_with()
@@ -204,7 +214,9 @@ class TestSender(unittest.TestCase):
         self.sender.updates = mock.MagicMock()
         self.sender.disconnect = mock.MagicMock()
         self.sender.failures = 1
-        self.assertFalse(self.sender())
+        success, candidates = self.sender()
+        self.assertFalse(success)
+        self.assertEquals(candidates, set())
         self.sender.connect.assert_called_once_with()
         self.sender.missing_check.assert_called_once_with()
         self.sender.updates.assert_called_once_with()
@@ -243,6 +255,94 @@ class TestSender(unittest.TestCase):
                     method_name, mock_method.mock_calls,
                     expected_calls))
 
+    def test_call_and_missing_check(self):
+        def yield_hashes(device, partition, policy_index, suffixes=None):
+            if device == 'dev' and partition == '9' and suffixes == ['abc'] \
+                    and policy_index == 0:
+                yield (
+                    '/srv/node/dev/objects/9/abc/'
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    '1380144470.00000')
+            else:
+                raise Exception(
+                    'No match for %r %r %r' % (device, partition, suffixes))
+
+        self.sender.connection = FakeConnection()
+        self.sender.job = {'device': 'dev', 'partition': '9'}
+        self.sender.suffixes = ['abc']
+        self.sender.response = FakeResponse(
+            chunk_body=(
+                ':MISSING_CHECK: START\r\n'
+                '9d41d8cd98f00b204e9800998ecf0abc\r\n'
+                ':MISSING_CHECK: END\r\n'))
+        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.connect = mock.MagicMock()
+        self.sender.updates = mock.MagicMock()
+        self.sender.disconnect = mock.MagicMock()
+        success, candidates = self.sender()
+        self.assertTrue(success)
+        self.assertEqual(candidates, set(['9d41d8cd98f00b204e9800998ecf0abc']))
+        self.assertEqual(self.sender.failures, 0)
+
+    def test_call_and_missing_check_with_obj_list(self):
+        def yield_hashes(device, partition, policy_index, suffixes=None):
+            if device == 'dev' and partition == '9' and suffixes == ['abc'] \
+                    and policy_index == 0:
+                yield (
+                    '/srv/node/dev/objects/9/abc/'
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    '1380144470.00000')
+            else:
+                raise Exception(
+                    'No match for %r %r %r' % (device, partition, suffixes))
+        job = {'device': 'dev', 'partition': '9'}
+        self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'],
+                                          ['9d41d8cd98f00b204e9800998ecf0abc'])
+        self.sender.connection = FakeConnection()
+        self.sender.response = FakeResponse(
+            chunk_body=(
+                ':MISSING_CHECK: START\r\n'
+                ':MISSING_CHECK: END\r\n'))
+        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.connect = mock.MagicMock()
+        self.sender.updates = mock.MagicMock()
+        self.sender.disconnect = mock.MagicMock()
+        success, candidates = self.sender()
+        self.assertTrue(success)
+        self.assertEqual(candidates, set(['9d41d8cd98f00b204e9800998ecf0abc']))
+        self.assertEqual(self.sender.failures, 0)
+
+    def test_call_and_missing_check_with_obj_list_but_required(self):
+        def yield_hashes(device, partition, policy_index, suffixes=None):
+            if device == 'dev' and partition == '9' and suffixes == ['abc'] \
+                    and policy_index == 0:
+                yield (
+                    '/srv/node/dev/objects/9/abc/'
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    '9d41d8cd98f00b204e9800998ecf0abc',
+                    '1380144470.00000')
+            else:
+                raise Exception(
+                    'No match for %r %r %r' % (device, partition, suffixes))
+        job = {'device': 'dev', 'partition': '9'}
+        self.sender = ssync_sender.Sender(self.replicator, None, job, ['abc'],
+                                          ['9d41d8cd98f00b204e9800998ecf0abc'])
+        self.sender.connection = FakeConnection()
+        self.sender.response = FakeResponse(
+            chunk_body=(
+                ':MISSING_CHECK: START\r\n'
+                '9d41d8cd98f00b204e9800998ecf0abc\r\n'
+                ':MISSING_CHECK: END\r\n'))
+        self.sender.daemon._diskfile_mgr.yield_hashes = yield_hashes
+        self.sender.connect = mock.MagicMock()
+        self.sender.updates = mock.MagicMock()
+        self.sender.disconnect = mock.MagicMock()
+        success, candidates = self.sender()
+        self.assertTrue(success)
+        self.assertEqual(candidates, set())
+
     def test_connect_send_timeout(self):
         self.replicator.conn_timeout = 0.01
         node = dict(replication_ip='1.2.3.4', replication_port=5678,
@@ -257,7 +357,9 @@ class TestSender(unittest.TestCase):
         with mock.patch.object(
                 ssync_sender.bufferedhttp.BufferedHTTPConnection,
                 'putrequest', putrequest):
-            self.assertFalse(self.sender())
+            success, candidates = self.sender()
+            self.assertFalse(success)
+            self.assertEquals(candidates, set())
         call = self.replicator.logger.error.mock_calls[0]
         self.assertEqual(
             call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -279,7 +381,9 @@ class TestSender(unittest.TestCase):
         with mock.patch.object(
                 ssync_sender.bufferedhttp, 'BufferedHTTPConnection',
                 FakeBufferedHTTPConnection):
-            self.assertFalse(self.sender())
+            success, candidates = self.sender()
+            self.assertFalse(success)
+            self.assertEquals(candidates, set())
         call = self.replicator.logger.error.mock_calls[0]
         self.assertEqual(
             call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -302,7 +406,9 @@ class TestSender(unittest.TestCase):
         with mock.patch.object(
                 ssync_sender.bufferedhttp, 'BufferedHTTPConnection',
                 FakeBufferedHTTPConnection):
-            self.assertFalse(self.sender())
+            success, candidates = self.sender()
+            self.assertFalse(success)
+            self.assertEquals(candidates, set())
         call = self.replicator.logger.error.mock_calls[0]
         self.assertEqual(
             call[1][:-1], ('%s:%s/%s/%s %s', '1.2.3.4', 5678, 'sda1', '9'))
@@ -389,6 +495,7 @@ class TestSender(unittest.TestCase):
             '17\r\n:MISSING_CHECK: START\r\n\r\n'
             '15\r\n:MISSING_CHECK: END\r\n\r\n')
         self.assertEqual(self.sender.send_list, [])
+        self.assertEqual(self.sender.available_set, set())
 
     def test_missing_check_has_suffixes(self):
         def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -431,6 +538,10 @@ class TestSender(unittest.TestCase):
             '33\r\n9d41d8cd98f00b204e9800998ecf1def 1380144474.44444\r\n\r\n'
             '15\r\n:MISSING_CHECK: END\r\n\r\n')
         self.assertEqual(self.sender.send_list, [])
+        candidates = ['9d41d8cd98f00b204e9800998ecf0abc',
+                      '9d41d8cd98f00b204e9800998ecf0def',
+                      '9d41d8cd98f00b204e9800998ecf1def']
+        self.assertEqual(self.sender.available_set, set(candidates))
 
     def test_missing_check_far_end_disconnect(self):
         def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -462,6 +573,8 @@ class TestSender(unittest.TestCase):
             '17\r\n:MISSING_CHECK: START\r\n\r\n'
             '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
             '15\r\n:MISSING_CHECK: END\r\n\r\n')
+        self.assertEqual(self.sender.available_set,
+                         set(['9d41d8cd98f00b204e9800998ecf0abc']))
 
     def test_missing_check_far_end_disconnect2(self):
         def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -494,6 +607,8 @@ class TestSender(unittest.TestCase):
             '17\r\n:MISSING_CHECK: START\r\n\r\n'
             '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
             '15\r\n:MISSING_CHECK: END\r\n\r\n')
+        self.assertEqual(self.sender.available_set,
+                         set(['9d41d8cd98f00b204e9800998ecf0abc']))
 
     def test_missing_check_far_end_unexpected(self):
         def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -525,6 +640,8 @@ class TestSender(unittest.TestCase):
             '17\r\n:MISSING_CHECK: START\r\n\r\n'
             '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
             '15\r\n:MISSING_CHECK: END\r\n\r\n')
+        self.assertEqual(self.sender.available_set,
+                         set(['9d41d8cd98f00b204e9800998ecf0abc']))
 
     def test_missing_check_send_list(self):
         def yield_hashes(device, partition, policy_idx, suffixes=None):
@@ -556,6 +673,8 @@ class TestSender(unittest.TestCase):
             '33\r\n9d41d8cd98f00b204e9800998ecf0abc 1380144470.00000\r\n\r\n'
             '15\r\n:MISSING_CHECK: END\r\n\r\n')
         self.assertEqual(self.sender.send_list, ['0123abc'])
+        self.assertEqual(self.sender.available_set,
+                         set(['9d41d8cd98f00b204e9800998ecf0abc']))
 
     def test_updates_timeout(self):
         self.sender.connection = FakeConnection()
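
Taken together, the patch gives the replicator a cheap second pass for handoffs that point at a remote region: the first pass pushes the data to one node there, and later passes re-run ssync in check-only mode to confirm the remaining replicas before reclaiming local disk. A rough sketch of the calls delete_partition() ends up making for one remote region, with hypothetical node_a/node_b variables standing in for entries of job['nodes']:

    synced_remote_regions = {}

    # First node in the remote region: a full ssync; on success the
    # hashes it confirmed come back as the candidate set for that region.
    success, cand_objs = replicator.sync(node_a, job, suffixes)
    if success and node_a['region'] != job['region'] and cand_objs:
        synced_remote_regions[node_a['region']] = cand_objs

    # Second node in the same region: no data is sent; ssync only runs
    # the missing_check against the hashes already confirmed there.
    success, cand_objs = replicator.sync(
        node_b, job, suffixes,
        remote_check_objs=synced_remote_regions[node_b['region']])

Only hashes confirmed by every remote region are deleted, and only when sync_method is ssync; with rsync the candidate set is always empty, so the original whole-partition deletion behavior is preserved.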