From f578a35100f5dcd0046c79e810441633d28f55ff Mon Sep 17 00:00:00 2001
From: Kota Tsuyuzaki
Date: Thu, 12 Feb 2015 16:18:54 -0800
Subject: [PATCH] Fix efficient replication handoff delete

The current code might delete local handoff objects incorrectly when the
remote node requires all of the objects during the poke, because an empty
cand_objs never gets applied to the delete candidate objects list.

This patch ensures the delete candidate objects list is always updated
(i.e. it will be an empty list when the poke job finds that all local
objects are required by the remote node), and then deletes objects
correctly according to those candidates.

This patch includes a test written by Clay Gerrard at [1].

Co-Authored-By: Clay Gerrard

1: https://review.openstack.org/#/c/155542/

Change-Id: Ie8f75ed65c7bfefbb18ddccd9fe0e41b72dca0a4
---
 swift/obj/replicator.py          | 16 +++++++-----
 swift/obj/ssync_sender.py        |  4 +--
 test/unit/obj/test_replicator.py | 44 ++++++++++++++++++++++++++++++--
 3 files changed, 54 insertions(+), 10 deletions(-)

diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index c06586c079..b3df0ce28f 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -256,7 +256,7 @@ class ObjectReplicator(Daemon):
                                 node['device'], job['partition'], 'REPLICATE',
                                 '/' + '-'.join(suffixes), headers=self.headers)
                             conn.getresponse().read()
-                        if node['region'] != job['region'] and cand_objs:
+                        if node['region'] != job['region']:
                             synced_remote_regions[node['region']] = cand_objs
                     responses.append(success)
             for region, cand_objs in synced_remote_regions.iteritems():
@@ -272,21 +272,25 @@ class ObjectReplicator(Daemon):
             # delete handoff if all syncs were successful
             delete_handoff = len(responses) == len(job['nodes']) and \
                 all(responses)
-            if not suffixes or delete_handoff:
+            if delete_handoff:
                 if delete_objs:
                     self.logger.info(_("Removing %s objects"),
                                      len(delete_objs))
                     self.delete_handoff_objs(job, delete_objs)
-                else:
-                    self.logger.info(_("Removing partition: %s"), job['path'])
-                    tpool.execute(
-                        shutil.rmtree, job['path'], ignore_errors=True)
+                elif self.conf.get('sync_method') == 'rsync':
+                    self.delete_partition(job['path'])
+            elif not suffixes:
+                self.delete_partition(job['path'])
         except (Exception, Timeout):
             self.logger.exception(_("Error syncing handoff partition"))
         finally:
             self.partition_times.append(time.time() - begin)
             self.logger.timing_since('partition.delete.timing', begin)
 
+    def delete_partition(self, path):
+        self.logger.info(_("Removing partition: %s"), path)
+        tpool.execute(shutil.rmtree, path, ignore_errors=True)
+
     def delete_handoff_objs(self, job, delete_objs):
         for object_hash in delete_objs:
             object_path = storage_directory(job['obj_path'], job['partition'],
diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py
index dc288c0113..1058ab262d 100644
--- a/swift/obj/ssync_sender.py
+++ b/swift/obj/ssync_sender.py
@@ -70,7 +70,7 @@ class Sender(object):
                 # other exceptions will be logged with a full stack trace.
                 self.connect()
                 self.missing_check()
-                if not self.remote_check_objs:
+                if self.remote_check_objs is None:
                     self.updates()
                     can_delete_obj = self.available_set
                 else:
@@ -196,7 +196,7 @@ class Sender(object):
         hash_gen = self.daemon._diskfile_mgr.yield_hashes(
             self.job['device'], self.job['partition'],
             self.policy_idx, self.suffixes)
-        if self.remote_check_objs:
+        if self.remote_check_objs is not None:
             hash_gen = ifilter(lambda (path, object_hash, timestamp):
                                object_hash in self.remote_check_objs, hash_gen)
         for path, object_hash, timestamp in hash_gen:
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index 47288b9ed2..bf1c5bcb52 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -27,7 +27,7 @@ from errno import ENOENT, ENOTEMPTY, ENOTDIR
 from eventlet.green import subprocess
 from eventlet import Timeout, tpool
 
-from test.unit import FakeLogger, patch_policies
+from test.unit import FakeLogger, debug_logger, patch_policies
 from swift.common import utils
 from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
     storage_directory
@@ -186,7 +186,7 @@ class TestObjectReplicator(unittest.TestCase):
         _create_test_rings(self.testdir)
         self.conf = dict(
             swift_dir=self.testdir, devices=self.devices, mount_check='false',
-            timeout='300', stats_interval='1')
+            timeout='300', stats_interval='1', sync_method='rsync')
         self.replicator = object_replicator.ObjectReplicator(self.conf)
         self.replicator.logger = FakeLogger()
         self.df_mgr = diskfile.DiskFileManager(self.conf,
@@ -713,6 +713,46 @@ class TestObjectReplicator(unittest.TestCase):
             self.assertTrue(os.access(part_path, os.F_OK))
         del self.call_nums
 
+    def test_delete_objs_ssync_only_when_in_sync(self):
+        self.replicator.logger = debug_logger('test-replicator')
+        with mock.patch('swift.obj.replicator.http_connect',
+                        mock_http_connect(200)):
+            df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
+            mkdirs(df._datadir)
+            f = open(os.path.join(df._datadir,
+                                  normalize_timestamp(time.time()) + '.data'),
+                     'wb')
+            f.write('0')
+            f.close()
+            ohash = hash_path('a', 'c', 'o')
+            whole_path_from = storage_directory(self.objects, 1, ohash)
+            suffix_dir_path = os.path.dirname(whole_path_from)
+            part_path = os.path.join(self.objects, '1')
+            self.assertTrue(os.access(part_path, os.F_OK))
+            self.call_nums = 0
+            self.conf['sync_method'] = 'ssync'
+
+            in_sync_objs = []
+
+            def _fake_ssync(node, job, suffixes, remote_check_objs=None):
+                self.call_nums += 1
+                if remote_check_objs is None:
+                    # sync job
+                    ret_val = [whole_path_from]
+                else:
+                    ret_val = in_sync_objs
+                return True, set(ret_val)
+
+            self.replicator.sync_method = _fake_ssync
+            self.replicator.replicate()
+            self.assertEqual(3, self.call_nums)
+            # The file should still exist
+            self.assertTrue(os.access(whole_path_from, os.F_OK))
+            self.assertTrue(os.access(suffix_dir_path, os.F_OK))
+            self.assertTrue(os.access(part_path, os.F_OK))
+
+            del self.call_nums
+
     def test_delete_partition_ssync_with_cleanup_failure(self):
         with mock.patch('swift.obj.replicator.http_connect',
                         mock_http_connect(200)):
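
Note on the fix: the core of the bug is that an empty poke result and "no poke"
were treated the same way. The truthiness checks on cand_objs and
remote_check_objs dropped an empty result, so stale delete candidates (or the
whole partition) could still be removed even though the remote node needs every
object. Below is a minimal, hypothetical Python sketch of the corrected
decision; the names objects_to_delete and poke_result are illustrative only and
are not part of Swift:

    def objects_to_delete(candidates, poke_result):
        """Decide which local handoff object hashes may be deleted.

        poke_result is None when no poke ran; otherwise it is the set of
        candidates the remote node confirmed it already holds in sync.
        """
        if poke_result is None:
            # No poke: everything that was synced successfully is a candidate.
            return set(candidates)
        # Even an empty poke_result must replace the candidates: an empty set
        # means the remote still needs everything, so nothing may be deleted.
        return set(poke_result)

    candidates = {'hash-a', 'hash-b'}
    assert objects_to_delete(candidates, None) == candidates
    assert objects_to_delete(candidates, set()) == set()
    # A truthiness check ("poke_result or candidates") would have kept the
    # stale candidates in the second case and deleted objects the remote
    # still needs.

This mirrors what the new test asserts: _fake_ssync returns an empty in-sync
set for the poke, and the handoff object, suffix dir, and partition must all
remain on disk.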