Fix efficient replication handoff delete
Current code might delete local handoff objects incorrectly when remote node requires whole of the objects at poking because empty cand_objs won't be applied to the delete candidate objects list. This patch ensures the delete candidate objects list always will be updated (i.e. it will be empty list when the poke job find whole local objects are required by remote), and then, handle deleting objects correctly according to the delete candidate. This patch includes a test written by Clay Gerrard at [1]. Co-Authored-By: Clay Gerrard <clay.gerrard@gmail.com> 1: https://review.openstack.org/#/c/155542/ Change-Id: Ie8f75ed65c7bfefbb18ddccd9fe0e41b72dca0a4
This commit is contained in:
parent
a6091c0f39
commit
f578a35100
@ -256,7 +256,7 @@ class ObjectReplicator(Daemon):
|
|||||||
node['device'], job['partition'], 'REPLICATE',
|
node['device'], job['partition'], 'REPLICATE',
|
||||||
'/' + '-'.join(suffixes), headers=self.headers)
|
'/' + '-'.join(suffixes), headers=self.headers)
|
||||||
conn.getresponse().read()
|
conn.getresponse().read()
|
||||||
if node['region'] != job['region'] and cand_objs:
|
if node['region'] != job['region']:
|
||||||
synced_remote_regions[node['region']] = cand_objs
|
synced_remote_regions[node['region']] = cand_objs
|
||||||
responses.append(success)
|
responses.append(success)
|
||||||
for region, cand_objs in synced_remote_regions.iteritems():
|
for region, cand_objs in synced_remote_regions.iteritems():
|
||||||
@ -272,21 +272,25 @@ class ObjectReplicator(Daemon):
|
|||||||
# delete handoff if all syncs were successful
|
# delete handoff if all syncs were successful
|
||||||
delete_handoff = len(responses) == len(job['nodes']) and \
|
delete_handoff = len(responses) == len(job['nodes']) and \
|
||||||
all(responses)
|
all(responses)
|
||||||
if not suffixes or delete_handoff:
|
if delete_handoff:
|
||||||
if delete_objs:
|
if delete_objs:
|
||||||
self.logger.info(_("Removing %s objects"),
|
self.logger.info(_("Removing %s objects"),
|
||||||
len(delete_objs))
|
len(delete_objs))
|
||||||
self.delete_handoff_objs(job, delete_objs)
|
self.delete_handoff_objs(job, delete_objs)
|
||||||
else:
|
elif self.conf.get('sync_method') == 'rsync':
|
||||||
self.logger.info(_("Removing partition: %s"), job['path'])
|
self.delete_partition(job['path'])
|
||||||
tpool.execute(
|
elif not suffixes:
|
||||||
shutil.rmtree, job['path'], ignore_errors=True)
|
self.delete_partition(job['path'])
|
||||||
except (Exception, Timeout):
|
except (Exception, Timeout):
|
||||||
self.logger.exception(_("Error syncing handoff partition"))
|
self.logger.exception(_("Error syncing handoff partition"))
|
||||||
finally:
|
finally:
|
||||||
self.partition_times.append(time.time() - begin)
|
self.partition_times.append(time.time() - begin)
|
||||||
self.logger.timing_since('partition.delete.timing', begin)
|
self.logger.timing_since('partition.delete.timing', begin)
|
||||||
|
|
||||||
|
def delete_partition(self, path):
|
||||||
|
self.logger.info(_("Removing partition: %s"), path)
|
||||||
|
tpool.execute(shutil.rmtree, path, ignore_errors=True)
|
||||||
|
|
||||||
def delete_handoff_objs(self, job, delete_objs):
|
def delete_handoff_objs(self, job, delete_objs):
|
||||||
for object_hash in delete_objs:
|
for object_hash in delete_objs:
|
||||||
object_path = storage_directory(job['obj_path'], job['partition'],
|
object_path = storage_directory(job['obj_path'], job['partition'],
|
||||||
|
@ -70,7 +70,7 @@ class Sender(object):
|
|||||||
# other exceptions will be logged with a full stack trace.
|
# other exceptions will be logged with a full stack trace.
|
||||||
self.connect()
|
self.connect()
|
||||||
self.missing_check()
|
self.missing_check()
|
||||||
if not self.remote_check_objs:
|
if self.remote_check_objs is None:
|
||||||
self.updates()
|
self.updates()
|
||||||
can_delete_obj = self.available_set
|
can_delete_obj = self.available_set
|
||||||
else:
|
else:
|
||||||
@ -196,7 +196,7 @@ class Sender(object):
|
|||||||
hash_gen = self.daemon._diskfile_mgr.yield_hashes(
|
hash_gen = self.daemon._diskfile_mgr.yield_hashes(
|
||||||
self.job['device'], self.job['partition'],
|
self.job['device'], self.job['partition'],
|
||||||
self.policy_idx, self.suffixes)
|
self.policy_idx, self.suffixes)
|
||||||
if self.remote_check_objs:
|
if self.remote_check_objs is not None:
|
||||||
hash_gen = ifilter(lambda (path, object_hash, timestamp):
|
hash_gen = ifilter(lambda (path, object_hash, timestamp):
|
||||||
object_hash in self.remote_check_objs, hash_gen)
|
object_hash in self.remote_check_objs, hash_gen)
|
||||||
for path, object_hash, timestamp in hash_gen:
|
for path, object_hash, timestamp in hash_gen:
|
||||||
|
@ -27,7 +27,7 @@ from errno import ENOENT, ENOTEMPTY, ENOTDIR
|
|||||||
from eventlet.green import subprocess
|
from eventlet.green import subprocess
|
||||||
from eventlet import Timeout, tpool
|
from eventlet import Timeout, tpool
|
||||||
|
|
||||||
from test.unit import FakeLogger, patch_policies
|
from test.unit import FakeLogger, debug_logger, patch_policies
|
||||||
from swift.common import utils
|
from swift.common import utils
|
||||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||||
storage_directory
|
storage_directory
|
||||||
@ -186,7 +186,7 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
_create_test_rings(self.testdir)
|
_create_test_rings(self.testdir)
|
||||||
self.conf = dict(
|
self.conf = dict(
|
||||||
swift_dir=self.testdir, devices=self.devices, mount_check='false',
|
swift_dir=self.testdir, devices=self.devices, mount_check='false',
|
||||||
timeout='300', stats_interval='1')
|
timeout='300', stats_interval='1', sync_method='rsync')
|
||||||
self.replicator = object_replicator.ObjectReplicator(self.conf)
|
self.replicator = object_replicator.ObjectReplicator(self.conf)
|
||||||
self.replicator.logger = FakeLogger()
|
self.replicator.logger = FakeLogger()
|
||||||
self.df_mgr = diskfile.DiskFileManager(self.conf,
|
self.df_mgr = diskfile.DiskFileManager(self.conf,
|
||||||
@ -713,6 +713,46 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
self.assertTrue(os.access(part_path, os.F_OK))
|
self.assertTrue(os.access(part_path, os.F_OK))
|
||||||
del self.call_nums
|
del self.call_nums
|
||||||
|
|
||||||
|
def test_delete_objs_ssync_only_when_in_sync(self):
|
||||||
|
self.replicator.logger = debug_logger('test-replicator')
|
||||||
|
with mock.patch('swift.obj.replicator.http_connect',
|
||||||
|
mock_http_connect(200)):
|
||||||
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
|
||||||
|
mkdirs(df._datadir)
|
||||||
|
f = open(os.path.join(df._datadir,
|
||||||
|
normalize_timestamp(time.time()) + '.data'),
|
||||||
|
'wb')
|
||||||
|
f.write('0')
|
||||||
|
f.close()
|
||||||
|
ohash = hash_path('a', 'c', 'o')
|
||||||
|
whole_path_from = storage_directory(self.objects, 1, ohash)
|
||||||
|
suffix_dir_path = os.path.dirname(whole_path_from)
|
||||||
|
part_path = os.path.join(self.objects, '1')
|
||||||
|
self.assertTrue(os.access(part_path, os.F_OK))
|
||||||
|
self.call_nums = 0
|
||||||
|
self.conf['sync_method'] = 'ssync'
|
||||||
|
|
||||||
|
in_sync_objs = []
|
||||||
|
|
||||||
|
def _fake_ssync(node, job, suffixes, remote_check_objs=None):
|
||||||
|
self.call_nums += 1
|
||||||
|
if remote_check_objs is None:
|
||||||
|
# sync job
|
||||||
|
ret_val = [whole_path_from]
|
||||||
|
else:
|
||||||
|
ret_val = in_sync_objs
|
||||||
|
return True, set(ret_val)
|
||||||
|
|
||||||
|
self.replicator.sync_method = _fake_ssync
|
||||||
|
self.replicator.replicate()
|
||||||
|
self.assertEqual(3, self.call_nums)
|
||||||
|
# The file should still exist
|
||||||
|
self.assertTrue(os.access(whole_path_from, os.F_OK))
|
||||||
|
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
|
||||||
|
self.assertTrue(os.access(part_path, os.F_OK))
|
||||||
|
|
||||||
|
del self.call_nums
|
||||||
|
|
||||||
def test_delete_partition_ssync_with_cleanup_failure(self):
|
def test_delete_partition_ssync_with_cleanup_failure(self):
|
||||||
with mock.patch('swift.obj.replicator.http_connect',
|
with mock.patch('swift.obj.replicator.http_connect',
|
||||||
mock_http_connect(200)):
|
mock_http_connect(200)):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user