Add handoffs_first and handoff_delete to obj-repl

If handoffs_first is True, then the object replicator will give
partitions that are not supposed to be on the node priority.

If handoff_delete is set to a number (n), then it will delete a handoff
partition if at least n replicas were successfully replicated

Also fixed a couple of things in the object replicator unit tests and
added some more

DocImpact

Change-Id: Icb9968953cf467be2a52046fb16f4b84eb5604e4
This commit is contained in:
Chuck Thier 2013-08-22 19:23:29 +00:00
parent cb114e5ecf
commit a30a7ced9c
3 changed files with 163 additions and 6 deletions

View File

@ -414,6 +414,19 @@ stats_interval 3600 Interval in seconds between logging
replication statistics
reclaim_age 604800 Time elapsed in seconds before an
object can be reclaimed
handoffs_first false If set to True, partitions that are
not supposed to be on the node will be
replicated first. The default setting
should not be changed, except for
extreme situations.
handoff_delete auto By default handoff partitions will be
removed when it has successfully
replicated to all the cannonical nodes.
If set to an integer n, it will remove
the partition if it is successfully
replicated to n nodes. The default
setting should not be changed, except
for extremem situations.
================== ================= =======================================
[object-updater]

View File

@ -31,7 +31,7 @@ from swift.common.ring import Ring
from swift.common.utils import whataremyips, unlink_older_than, \
compute_eta, get_logger, dump_recon_cache, \
rsync_ip, mkdirs, config_true_value, list_from_csv, get_hub, \
tpool_reraise
tpool_reraise, config_auto_int_value
from swift.common.bufferedhttp import http_connect
from swift.common.daemon import Daemon
from swift.common.http import HTTP_OK, HTTP_INSUFFICIENT_STORAGE
@ -83,6 +83,10 @@ class ObjectReplicator(Daemon):
'user-agent': 'obj-replicator %s' % os.getpid()}
self.rsync_error_log_line_length = \
int(conf.get('rsync_error_log_line_length', 0))
self.handoffs_first = config_true_value(conf.get('handoffs_first',
False))
self.handoff_delete = config_auto_int_value(
conf.get('handoff_delete', 'auto'), 0)
def _rsync(self, args):
"""
@ -212,8 +216,15 @@ class ObjectReplicator(Daemon):
'/' + '-'.join(suffixes), headers=self.headers)
conn.getresponse().read()
responses.append(success)
if not suffixes or (len(responses) ==
len(job['nodes']) and all(responses)):
if self.handoff_delete:
# delete handoff if we have had handoff_delete successes
delete_handoff = len([resp for resp in responses if resp]) >= \
self.handoff_delete
else:
# delete handoff if all syncs were successful
delete_handoff = len(responses) == len(job['nodes']) and \
all(responses)
if not suffixes or delete_handoff:
self.logger.info(_("Removing partition: %s"), job['path'])
tpool.execute(shutil.rmtree, job['path'], ignore_errors=True)
except (Exception, Timeout):
@ -412,6 +423,9 @@ class ObjectReplicator(Daemon):
except (ValueError, OSError):
continue
random.shuffle(jobs)
if self.handoffs_first:
# Move the handoff parts to the front of the list
jobs.sort(key=lambda job: not job['delete'])
self.job_count = len(jobs)
return jobs

View File

@ -236,7 +236,7 @@ class TestObjectReplicator(unittest.TestCase):
for job in jobs:
jobs_by_part[job['partition']] = job
self.assertEquals(len(jobs_to_delete), 1)
self.assertTrue('1', jobs_to_delete[0]['partition'])
self.assertEquals('1', jobs_to_delete[0]['partition'])
self.assertEquals(
[node['id'] for node in jobs_by_part['0']['nodes']], [1, 2])
self.assertEquals(
@ -251,6 +251,12 @@ class TestObjectReplicator(unittest.TestCase):
self.assertEquals(jobs_by_part[part]['path'],
os.path.join(self.objects, part))
def test_collect_jobs_handoffs_first(self):
self.replicator.handoffs_first = True
jobs = self.replicator.collect_jobs()
self.assertTrue(jobs[0]['delete'])
self.assertEquals('1', jobs[0]['partition'])
def test_collect_jobs_removes_zbf(self):
"""
After running xfs_repair, a partition directory could become a
@ -292,13 +298,137 @@ class TestObjectReplicator(unittest.TestCase):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = diskfile.DiskFile(self.devices,
'sda', '0', 'a', 'c', 'o', FakeLogger())
'sda', '1', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir)
print df.datadir
f = open(os.path.join(df.datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.replicator.replicate()
nodes = [node for node in
self.ring.get_part_nodes(1)
if node['ip'] not in _ips()]
process_arg_checker = []
for node in nodes:
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
process_arg_checker.append(
(0, '', ['rsync', whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
self.replicator.replicate()
self.assertFalse(os.access(part_path, os.F_OK))
def test_delete_partition_with_failures(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
df = diskfile.DiskFile(self.devices,
'sda', '1', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir)
print df.datadir
f = open(os.path.join(df.datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
nodes = [node for node in
self.ring.get_part_nodes(1)
if node['ip'] not in _ips()]
process_arg_checker = []
for i, node in enumerate(nodes):
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
if i == 0:
# force one of the rsync calls to fail
ret_code = 1
else:
ret_code = 0
process_arg_checker.append(
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
self.replicator.replicate()
# The path should still exist
self.assertTrue(os.access(part_path, os.F_OK))
def test_delete_partition_with_handoff_delete(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
self.replicator.handoff_delete = 2
df = diskfile.DiskFile(self.devices,
'sda', '1', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir)
print df.datadir
f = open(os.path.join(df.datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
nodes = [node for node in
self.ring.get_part_nodes(1)
if node['ip'] not in _ips()]
process_arg_checker = []
for i, node in enumerate(nodes):
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
if i == 0:
# force one of the rsync calls to fail
ret_code = 1
else:
ret_code = 0
process_arg_checker.append(
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
self.replicator.replicate()
self.assertFalse(os.access(part_path, os.F_OK))
def test_delete_partition_with_handoff_delete_failures(self):
with mock.patch('swift.obj.replicator.http_connect',
mock_http_connect(200)):
self.replicator.handoff_delete = 2
df = diskfile.DiskFile(self.devices,
'sda', '1', 'a', 'c', 'o', FakeLogger())
mkdirs(df.datadir)
print df.datadir
f = open(os.path.join(df.datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '1', data_dir)
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
nodes = [node for node in
self.ring.get_part_nodes(1)
if node['ip'] not in _ips()]
process_arg_checker = []
for i, node in enumerate(nodes):
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], 1)
if i in (0, 1):
# force two of the rsync calls to fail
ret_code = 1
else:
ret_code = 0
process_arg_checker.append(
(ret_code, '', ['rsync', whole_path_from, rsync_mod]))
with _mock_process(process_arg_checker):
self.replicator.replicate()
# The file should still exist
self.assertTrue(os.access(part_path, os.F_OK))
def test_delete_partition_override_params(self):
df = diskfile.DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o',
FakeLogger())