diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py index c06586c079..b3df0ce28f 100644 --- a/swift/obj/replicator.py +++ b/swift/obj/replicator.py @@ -256,7 +256,7 @@ class ObjectReplicator(Daemon): node['device'], job['partition'], 'REPLICATE', '/' + '-'.join(suffixes), headers=self.headers) conn.getresponse().read() - if node['region'] != job['region'] and cand_objs: + if node['region'] != job['region']: synced_remote_regions[node['region']] = cand_objs responses.append(success) for region, cand_objs in synced_remote_regions.iteritems(): @@ -272,21 +272,25 @@ class ObjectReplicator(Daemon): # delete handoff if all syncs were successful delete_handoff = len(responses) == len(job['nodes']) and \ all(responses) - if not suffixes or delete_handoff: + if delete_handoff: if delete_objs: self.logger.info(_("Removing %s objects"), len(delete_objs)) self.delete_handoff_objs(job, delete_objs) - else: - self.logger.info(_("Removing partition: %s"), job['path']) - tpool.execute( - shutil.rmtree, job['path'], ignore_errors=True) + elif self.conf.get('sync_method') == 'rsync': + self.delete_partition(job['path']) + elif not suffixes: + self.delete_partition(job['path']) except (Exception, Timeout): self.logger.exception(_("Error syncing handoff partition")) finally: self.partition_times.append(time.time() - begin) self.logger.timing_since('partition.delete.timing', begin) + def delete_partition(self, path): + self.logger.info(_("Removing partition: %s"), path) + tpool.execute(shutil.rmtree, path, ignore_errors=True) + def delete_handoff_objs(self, job, delete_objs): for object_hash in delete_objs: object_path = storage_directory(job['obj_path'], job['partition'], diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index dc288c0113..1058ab262d 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -70,7 +70,7 @@ class Sender(object): # other exceptions will be logged with a full stack trace. self.connect() self.missing_check() - if not self.remote_check_objs: + if self.remote_check_objs is None: self.updates() can_delete_obj = self.available_set else: @@ -196,7 +196,7 @@ class Sender(object): hash_gen = self.daemon._diskfile_mgr.yield_hashes( self.job['device'], self.job['partition'], self.policy_idx, self.suffixes) - if self.remote_check_objs: + if self.remote_check_objs is not None: hash_gen = ifilter(lambda (path, object_hash, timestamp): object_hash in self.remote_check_objs, hash_gen) for path, object_hash, timestamp in hash_gen: diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py index 47288b9ed2..bf1c5bcb52 100644 --- a/test/unit/obj/test_replicator.py +++ b/test/unit/obj/test_replicator.py @@ -27,7 +27,7 @@ from errno import ENOENT, ENOTEMPTY, ENOTDIR from eventlet.green import subprocess from eventlet import Timeout, tpool -from test.unit import FakeLogger, patch_policies +from test.unit import FakeLogger, debug_logger, patch_policies from swift.common import utils from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \ storage_directory @@ -186,7 +186,7 @@ class TestObjectReplicator(unittest.TestCase): _create_test_rings(self.testdir) self.conf = dict( swift_dir=self.testdir, devices=self.devices, mount_check='false', - timeout='300', stats_interval='1') + timeout='300', stats_interval='1', sync_method='rsync') self.replicator = object_replicator.ObjectReplicator(self.conf) self.replicator.logger = FakeLogger() self.df_mgr = diskfile.DiskFileManager(self.conf, @@ -713,6 +713,46 @@ class TestObjectReplicator(unittest.TestCase): self.assertTrue(os.access(part_path, os.F_OK)) del self.call_nums + def test_delete_objs_ssync_only_when_in_sync(self): + self.replicator.logger = debug_logger('test-replicator') + with mock.patch('swift.obj.replicator.http_connect', + mock_http_connect(200)): + df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o') + mkdirs(df._datadir) + f = open(os.path.join(df._datadir, + normalize_timestamp(time.time()) + '.data'), + 'wb') + f.write('0') + f.close() + ohash = hash_path('a', 'c', 'o') + whole_path_from = storage_directory(self.objects, 1, ohash) + suffix_dir_path = os.path.dirname(whole_path_from) + part_path = os.path.join(self.objects, '1') + self.assertTrue(os.access(part_path, os.F_OK)) + self.call_nums = 0 + self.conf['sync_method'] = 'ssync' + + in_sync_objs = [] + + def _fake_ssync(node, job, suffixes, remote_check_objs=None): + self.call_nums += 1 + if remote_check_objs is None: + # sync job + ret_val = [whole_path_from] + else: + ret_val = in_sync_objs + return True, set(ret_val) + + self.replicator.sync_method = _fake_ssync + self.replicator.replicate() + self.assertEqual(3, self.call_nums) + # The file should still exist + self.assertTrue(os.access(whole_path_from, os.F_OK)) + self.assertTrue(os.access(suffix_dir_path, os.F_OK)) + self.assertTrue(os.access(part_path, os.F_OK)) + + del self.call_nums + def test_delete_partition_ssync_with_cleanup_failure(self): with mock.patch('swift.obj.replicator.http_connect', mock_http_connect(200)):