Merge "Fix efficient replication handoff delete"
This commit is contained in:
commit
6a6f7d5c9f
@ -256,7 +256,7 @@ class ObjectReplicator(Daemon):
|
|||||||
node['device'], job['partition'], 'REPLICATE',
|
node['device'], job['partition'], 'REPLICATE',
|
||||||
'/' + '-'.join(suffixes), headers=self.headers)
|
'/' + '-'.join(suffixes), headers=self.headers)
|
||||||
conn.getresponse().read()
|
conn.getresponse().read()
|
||||||
if node['region'] != job['region'] and cand_objs:
|
if node['region'] != job['region']:
|
||||||
synced_remote_regions[node['region']] = cand_objs
|
synced_remote_regions[node['region']] = cand_objs
|
||||||
responses.append(success)
|
responses.append(success)
|
||||||
for region, cand_objs in synced_remote_regions.iteritems():
|
for region, cand_objs in synced_remote_regions.iteritems():
|
||||||
@ -272,21 +272,25 @@ class ObjectReplicator(Daemon):
|
|||||||
# delete handoff if all syncs were successful
|
# delete handoff if all syncs were successful
|
||||||
delete_handoff = len(responses) == len(job['nodes']) and \
|
delete_handoff = len(responses) == len(job['nodes']) and \
|
||||||
all(responses)
|
all(responses)
|
||||||
if not suffixes or delete_handoff:
|
if delete_handoff:
|
||||||
if delete_objs:
|
if delete_objs:
|
||||||
self.logger.info(_("Removing %s objects"),
|
self.logger.info(_("Removing %s objects"),
|
||||||
len(delete_objs))
|
len(delete_objs))
|
||||||
self.delete_handoff_objs(job, delete_objs)
|
self.delete_handoff_objs(job, delete_objs)
|
||||||
else:
|
elif self.conf.get('sync_method') == 'rsync':
|
||||||
self.logger.info(_("Removing partition: %s"), job['path'])
|
self.delete_partition(job['path'])
|
||||||
tpool.execute(
|
elif not suffixes:
|
||||||
shutil.rmtree, job['path'], ignore_errors=True)
|
self.delete_partition(job['path'])
|
||||||
except (Exception, Timeout):
|
except (Exception, Timeout):
|
||||||
self.logger.exception(_("Error syncing handoff partition"))
|
self.logger.exception(_("Error syncing handoff partition"))
|
||||||
finally:
|
finally:
|
||||||
self.partition_times.append(time.time() - begin)
|
self.partition_times.append(time.time() - begin)
|
||||||
self.logger.timing_since('partition.delete.timing', begin)
|
self.logger.timing_since('partition.delete.timing', begin)
|
||||||
|
|
||||||
|
def delete_partition(self, path):
|
||||||
|
self.logger.info(_("Removing partition: %s"), path)
|
||||||
|
tpool.execute(shutil.rmtree, path, ignore_errors=True)
|
||||||
|
|
||||||
def delete_handoff_objs(self, job, delete_objs):
|
def delete_handoff_objs(self, job, delete_objs):
|
||||||
for object_hash in delete_objs:
|
for object_hash in delete_objs:
|
||||||
object_path = storage_directory(job['obj_path'], job['partition'],
|
object_path = storage_directory(job['obj_path'], job['partition'],
|
||||||
|
@ -70,7 +70,7 @@ class Sender(object):
|
|||||||
# other exceptions will be logged with a full stack trace.
|
# other exceptions will be logged with a full stack trace.
|
||||||
self.connect()
|
self.connect()
|
||||||
self.missing_check()
|
self.missing_check()
|
||||||
if not self.remote_check_objs:
|
if self.remote_check_objs is None:
|
||||||
self.updates()
|
self.updates()
|
||||||
can_delete_obj = self.available_set
|
can_delete_obj = self.available_set
|
||||||
else:
|
else:
|
||||||
@ -196,7 +196,7 @@ class Sender(object):
|
|||||||
hash_gen = self.daemon._diskfile_mgr.yield_hashes(
|
hash_gen = self.daemon._diskfile_mgr.yield_hashes(
|
||||||
self.job['device'], self.job['partition'],
|
self.job['device'], self.job['partition'],
|
||||||
self.policy_idx, self.suffixes)
|
self.policy_idx, self.suffixes)
|
||||||
if self.remote_check_objs:
|
if self.remote_check_objs is not None:
|
||||||
hash_gen = ifilter(lambda (path, object_hash, timestamp):
|
hash_gen = ifilter(lambda (path, object_hash, timestamp):
|
||||||
object_hash in self.remote_check_objs, hash_gen)
|
object_hash in self.remote_check_objs, hash_gen)
|
||||||
for path, object_hash, timestamp in hash_gen:
|
for path, object_hash, timestamp in hash_gen:
|
||||||
|
@ -27,7 +27,7 @@ from errno import ENOENT, ENOTEMPTY, ENOTDIR
|
|||||||
from eventlet.green import subprocess
|
from eventlet.green import subprocess
|
||||||
from eventlet import Timeout, tpool
|
from eventlet import Timeout, tpool
|
||||||
|
|
||||||
from test.unit import FakeLogger, patch_policies
|
from test.unit import FakeLogger, debug_logger, patch_policies
|
||||||
from swift.common import utils
|
from swift.common import utils
|
||||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||||
storage_directory
|
storage_directory
|
||||||
@ -186,7 +186,7 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
_create_test_rings(self.testdir)
|
_create_test_rings(self.testdir)
|
||||||
self.conf = dict(
|
self.conf = dict(
|
||||||
swift_dir=self.testdir, devices=self.devices, mount_check='false',
|
swift_dir=self.testdir, devices=self.devices, mount_check='false',
|
||||||
timeout='300', stats_interval='1')
|
timeout='300', stats_interval='1', sync_method='rsync')
|
||||||
self.replicator = object_replicator.ObjectReplicator(self.conf)
|
self.replicator = object_replicator.ObjectReplicator(self.conf)
|
||||||
self.replicator.logger = FakeLogger()
|
self.replicator.logger = FakeLogger()
|
||||||
self.df_mgr = diskfile.DiskFileManager(self.conf,
|
self.df_mgr = diskfile.DiskFileManager(self.conf,
|
||||||
@ -713,6 +713,46 @@ class TestObjectReplicator(unittest.TestCase):
|
|||||||
self.assertTrue(os.access(part_path, os.F_OK))
|
self.assertTrue(os.access(part_path, os.F_OK))
|
||||||
del self.call_nums
|
del self.call_nums
|
||||||
|
|
||||||
|
def test_delete_objs_ssync_only_when_in_sync(self):
|
||||||
|
self.replicator.logger = debug_logger('test-replicator')
|
||||||
|
with mock.patch('swift.obj.replicator.http_connect',
|
||||||
|
mock_http_connect(200)):
|
||||||
|
df = self.df_mgr.get_diskfile('sda', '1', 'a', 'c', 'o')
|
||||||
|
mkdirs(df._datadir)
|
||||||
|
f = open(os.path.join(df._datadir,
|
||||||
|
normalize_timestamp(time.time()) + '.data'),
|
||||||
|
'wb')
|
||||||
|
f.write('0')
|
||||||
|
f.close()
|
||||||
|
ohash = hash_path('a', 'c', 'o')
|
||||||
|
whole_path_from = storage_directory(self.objects, 1, ohash)
|
||||||
|
suffix_dir_path = os.path.dirname(whole_path_from)
|
||||||
|
part_path = os.path.join(self.objects, '1')
|
||||||
|
self.assertTrue(os.access(part_path, os.F_OK))
|
||||||
|
self.call_nums = 0
|
||||||
|
self.conf['sync_method'] = 'ssync'
|
||||||
|
|
||||||
|
in_sync_objs = []
|
||||||
|
|
||||||
|
def _fake_ssync(node, job, suffixes, remote_check_objs=None):
|
||||||
|
self.call_nums += 1
|
||||||
|
if remote_check_objs is None:
|
||||||
|
# sync job
|
||||||
|
ret_val = [whole_path_from]
|
||||||
|
else:
|
||||||
|
ret_val = in_sync_objs
|
||||||
|
return True, set(ret_val)
|
||||||
|
|
||||||
|
self.replicator.sync_method = _fake_ssync
|
||||||
|
self.replicator.replicate()
|
||||||
|
self.assertEqual(3, self.call_nums)
|
||||||
|
# The file should still exist
|
||||||
|
self.assertTrue(os.access(whole_path_from, os.F_OK))
|
||||||
|
self.assertTrue(os.access(suffix_dir_path, os.F_OK))
|
||||||
|
self.assertTrue(os.access(part_path, os.F_OK))
|
||||||
|
|
||||||
|
del self.call_nums
|
||||||
|
|
||||||
def test_delete_partition_ssync_with_cleanup_failure(self):
|
def test_delete_partition_ssync_with_cleanup_failure(self):
|
||||||
with mock.patch('swift.obj.replicator.http_connect',
|
with mock.patch('swift.obj.replicator.http_connect',
|
||||||
mock_http_connect(200)):
|
mock_http_connect(200)):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user