adding tests and splitting out collect jobs

This commit is contained in:
David Goetz
2010-11-05 09:15:31 -07:00
parent 2cb61902a3
commit a71164995a
3 changed files with 167 additions and 113 deletions

swift/common/utils.py

@@ -29,7 +29,6 @@ from urllib import quote
from contextlib import contextmanager
import ctypes
import ctypes.util
import struct
from ConfigParser import ConfigParser, NoSectionError, NoOptionError
from tempfile import mkstemp
@@ -622,6 +621,7 @@ def write_pickle(obj, dest, tmp):
os.fsync(fd)
renamer(tmppath, dest)
def audit_location_generator(devices, datadir, mount_check=True, logger=None):
'''
Given a devices path and a data directory, yield (path, device,

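The tail of write_pickle shown above is the classic atomic-write idiom: serialize to a temporary file on the same filesystem, fsync, then rename over the destination so a reader never observes a half-written pickle. A minimal sketch of the pattern (illustrative names, not the exact utils.py code):

import os
import pickle
from tempfile import mkstemp

def write_pickle_sketch(obj, dest, tmp_dir):
    # tmp_dir must live on the same filesystem as dest so the final
    # rename is atomic
    fd, tmppath = mkstemp(dir=tmp_dir, suffix='.tmp')
    try:
        with os.fdopen(fd, 'wb') as fp:
            pickle.dump(obj, fp, 2)
            fp.flush()
            os.fsync(fp.fileno())
        os.rename(tmppath, dest)
    except Exception:
        os.unlink(tmppath)
        raise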
swift/obj/replicator.py

@@ -470,6 +470,36 @@ class ObjectReplicator(Daemon):
self.kill_coros()
self.last_replication_count = self.replication_count
def collect_jobs(self):
jobs = []
ips = whataremyips()
for local_dev in [dev for dev in self.object_ring.devs
if dev and dev['ip'] in ips and dev['port'] == self.port]:
dev_path = join(self.devices_dir, local_dev['device'])
obj_path = join(dev_path, 'objects')
tmp_path = join(dev_path, 'tmp')
if self.mount_check and not os.path.ismount(dev_path):
self.logger.warn('%s is not mounted' % local_dev['device'])
continue
unlink_older_than(tmp_path, time.time() - self.reclaim_age)
if not os.path.exists(obj_path):
continue
for partition in os.listdir(obj_path):
try:
nodes = [node for node in
self.object_ring.get_part_nodes(int(partition))
if node['id'] != local_dev['id']]
jobs.append(dict(path=join(obj_path, partition),
nodes=nodes, delete=len(nodes) > 2,
partition=partition))
except ValueError:
continue
random.shuffle(jobs)
# Partitions that need to be deleted take priority
jobs.sort(key=lambda job: not job['delete'])
self.job_count = len(jobs)
return jobs
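Two details in collect_jobs are easy to miss: delete=len(nodes) > 2 marks partitions this device holds but is not assigned to in the ring (with three replicas, excluding the local device leaves only two nodes when the device is a primary), and because Python's sort is stable, sorting the shuffled list on not job['delete'] floats the delete jobs to the front while keeping a random order within each group. A small self-contained demonstration of that ordering:

import random

jobs = [{'partition': str(p), 'delete': p % 3 == 0} for p in range(9)]
random.shuffle(jobs)
# False sorts before True, so jobs with delete=True (key False) come
# first; sort stability preserves the shuffled order within each group
jobs.sort(key=lambda job: not job['delete'])
assert all(job['delete'] for job in jobs[:3])
assert not any(job['delete'] for job in jobs[3:])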
def replicate(self):
"""Run a replication pass"""
self.start = time.time()
@@ -479,38 +509,11 @@ class ObjectReplicator(Daemon):
self.replication_count = 0
self.last_replication_count = -1
self.partition_times = []
stats = eventlet.spawn(self.heartbeat)
lockup_detector = eventlet.spawn(self.detect_lockups)
try:
self.run_pool = GreenPool(size=self.concurrency)
jobs = self.collect_jobs()
for job in jobs:
if not self.check_ring():
self.logger.info(

test/unit/obj/test_replicator.py

@@ -32,11 +32,14 @@ from swift.common import ring
from swift.obj import replicator as object_replicator
from swift.obj.server import DiskFile
def _ips():
return ['127.0.0.0']
object_replicator.whataremyips = _ips
class NullHandler(logging.Handler):
def emit(self, record):
pass
null_logger = logging.getLogger("testing")
@@ -55,7 +58,7 @@ def mock_http_connect(status):
self.method = args[4]
self.path = args[5]
self.with_exc = False
self.headers = kwargs.get('headers', {})
def getresponse(self):
if self.with_exc:
@@ -74,12 +77,14 @@ def mock_http_connect(status):
process_errors = []
class MockProcess(object):
ret_code = None
ret_log = None
check_args = None
class Stream(object):
def read(self):
return MockProcess.ret_log.next()
@@ -94,6 +99,7 @@ class MockProcess(object):
def wait(self):
return self.ret_code.next()
@contextmanager
def _mock_process(ret):
orig_process = subprocess.Popen
@@ -104,6 +110,7 @@ def _mock_process(ret):
yield
object_replicator.subprocess.Popen = orig_process
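_mock_process swaps subprocess.Popen for the MockProcess above inside a with block, feeding it one (returncode, log_output) tuple per spawned process. A typical use, mirroring test_rsync further down (node, job and data_dir come from that test, not from this helper):

# three mocked rsync invocations, all reporting success with empty logs
with _mock_process([(0, ''), (0, ''), (0, '')]):
    self.replicator.rsync(node, job, [data_dir])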
def _create_test_ring(path):
testgz = os.path.join(path, 'object.ring.gz')
intended_replica2part2dev_id = [
@@ -141,7 +148,7 @@ class TestObjectReplicator(unittest.TestCase):
self.objects = os.path.join(self.devices, 'sda', 'objects')
os.mkdir(self.objects)
self.parts = {}
for part in ['0', '1', '2', '3']:
self.parts[part] = os.path.join(self.objects, part)
os.mkdir(os.path.join(self.objects, part))
self.ring = _create_test_ring(self.testdir)
@@ -206,8 +213,8 @@ class TestObjectReplicator(unittest.TestCase):
def test_hash_suffix_multi_file_one(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
mkdirs(df.datadir)
for tdiff in [1, 50, 100, 500]:
for suff in ['.meta', '.data', '.ts']:
f = open(os.path.join(df.datadir,
normalize_timestamp(int(time.time()) - tdiff) + suff),
'wb')
@@ -224,12 +231,11 @@ class TestObjectReplicator(unittest.TestCase):
# only the tombstone should be left
self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
def test_hash_suffix_multi_file_two(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
mkdirs(df.datadir)
for tdiff in [1, 50, 100, 500]:
suffs = ['.meta', '.data']
if tdiff > 50:
suffs.append('.ts')
for suff in suffs:
@@ -249,88 +255,133 @@ class TestObjectReplicator(unittest.TestCase):
# only the meta and data should be left
self.assertEquals(len(os.listdir(whole_hsh_path)), 2)
def test_invalidate_hash(self):
# def test_check_ring(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# self.assertTrue(self.replicator.check_ring())
# orig_check = self.replicator.next_check
# self.replicator.next_check = orig_check - 30
# self.assertTrue(self.replicator.check_ring())
# self.replicator.next_check = orig_check
# orig_ring_time = self.replicator.object_ring._mtime
# self.replicator.object_ring._mtime = orig_ring_time - 30
# self.assertTrue(self.replicator.check_ring())
# self.replicator.next_check = orig_check - 30
# self.assertFalse(self.replicator.check_ring())
#
# def test_collect_jobs(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# self.assertTrue('1' in self.replicator.parts_to_delete)
# self.assertEquals(
# [node['id'] for node in self.replicator.partitions['0']['nodes']],
# [1,2])
# self.assertEquals(
# [node['id'] for node in self.replicator.partitions['1']['nodes']],
# [1,2,3])
# self.assertEquals(
# [node['id'] for node in self.replicator.partitions['2']['nodes']],
# [2,3])
# self.assertEquals(
# [node['id'] for node in self.replicator.partitions['3']['nodes']],
# [3,1])
# for part in ['0', '1', '2', '3']:
# self.assertEquals(self.replicator.partitions[part]['device'], 'sda')
# self.assertEquals(self.replicator.partitions[part]['path'],
# self.objects)
#
# def test_delete_partition(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# part_path = os.path.join(self.objects, '1')
# self.assertTrue(os.access(part_path, os.F_OK))
# self.replicator.delete_partition('1')
# self.assertFalse(os.access(part_path, os.F_OK))
#
# def test_rsync(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# with _mock_process([(0,''), (0,''), (0,'')]):
# self.replicator.rsync('0')
#
# def test_rsync_delete_no(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# with _mock_process([(-1, "stuff in log"), (-1, "stuff in log"),
# (0,''), (0,'')]):
# self.replicator.rsync('1')
# self.assertEquals(self.replicator.parts_to_delete['1'],
# [False, True, True])
#
# def test_rsync_delete_yes(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# with _mock_process([(0,''), (0,''), (0,'')]):
# self.replicator.rsync('1')
# self.assertEquals(self.replicator.parts_to_delete['1'],
# [True, True, True])
#
# def test_rsync_delete_yes_with_failure(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# with _mock_process([(-1, "stuff in log"), (0, ''), (0,''), (0,'')]):
# self.replicator.rsync('1')
# self.assertEquals(self.replicator.parts_to_delete['1'],
# [True, True, True])
#
# def test_rsync_failed_drive(self):
# self.replicator.collect_jobs('sda', 0, self.ring)
# with _mock_process([(12,'There was an error in file IO'),
# (0,''), (0,''), (0,'')]):
# self.replicator.rsync('1')
# self.assertEquals(self.replicator.parts_to_delete['1'],
# [True, True, True])
def assertFileData(file_path, data):
with open(file_path, 'r') as fp:
fdata = fp.read()
self.assertEquals(fdata, data)
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
mkdirs(df.datadir)
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, '0', data_dir)
hashes_file = os.path.join(self.objects, '0',
object_replicator.HASH_FILE)
# test that a nonexistent file exception is caught
self.assertEquals(object_replicator.invalidate_hash(whole_path_from),
None)
# test that hashes get cleared
check_pickle_data = pickle.dumps({data_dir: None},
object_replicator.PICKLE_PROTOCOL)
for data_hash in [{data_dir: None}, {data_dir: 'abcdefg'}]:
with open(hashes_file, 'wb') as fp:
pickle.dump(data_hash, fp, object_replicator.PICKLE_PROTOCOL)
object_replicator.invalidate_hash(whole_path_from)
assertFileData(hashes_file, check_pickle_data)
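test_invalidate_hash pins down the contract of invalidate_hash: a missing hashes file is tolerated, and an existing entry for the suffix directory is reset to None so the next hashing pass recomputes it. A sketch consistent with those assertions (an illustrative assumption, ignoring the locking a real implementation would need):

import os
import pickle

def invalidate_hash_sketch(suffix_dir):
    suffix = os.path.basename(suffix_dir)
    partition_dir = os.path.dirname(suffix_dir)
    hashes_file = os.path.join(partition_dir, 'hashes.pkl')  # HASH_FILE
    if not os.path.exists(hashes_file):
        return  # nothing cached yet; a nonexistent file is not an error
    with open(hashes_file, 'rb') as fp:
        hashes = pickle.load(fp)
    hashes[suffix] = None  # force a recompute on the next hash pass
    with open(hashes_file, 'wb') as fp:
        pickle.dump(hashes, fp, 2)  # PICKLE_PROTOCOL in the module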
def test_check_ring(self):
self.assertTrue(self.replicator.check_ring())
orig_check = self.replicator.next_check
self.replicator.next_check = orig_check - 30
self.assertTrue(self.replicator.check_ring())
self.replicator.next_check = orig_check
orig_ring_time = self.replicator.object_ring._mtime
self.replicator.object_ring._mtime = orig_ring_time - 30
self.assertTrue(self.replicator.check_ring())
self.replicator.next_check = orig_check - 30
self.assertFalse(self.replicator.check_ring())
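The assertions above imply check_ring's contract: the ring file is only re-inspected once next_check has elapsed, and a changed on-disk ring then makes the method return False so the replication pass can bail out and pick up fresh ring data. A sketch matching that behavior (assuming the ring object exposes an mtime-based has_changed() helper; not necessarily the exact implementation):

import time

def check_ring(self):
    # cheap fast path: only stat the ring file once per check interval
    if time.time() > self.next_check:
        self.next_check = time.time() + self.ring_check_interval
        if self.object_ring.has_changed():  # compares on-disk mtime
            return False
    return True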
def test_collect_jobs(self):
jobs = self.replicator.collect_jobs()
jobs_to_delete = [j for j in jobs if j['delete']]
jobs_to_keep = [j for j in jobs if not j['delete']]
jobs_by_part = {}
for job in jobs:
jobs_by_part[job['partition']] = job
self.assertEquals(len(jobs_to_delete), 1)
self.assertEquals('1', jobs_to_delete[0]['partition'])
self.assertEquals(
[node['id'] for node in jobs_by_part['0']['nodes']], [1, 2])
self.assertEquals(
[node['id'] for node in jobs_by_part['1']['nodes']], [1, 2, 3])
self.assertEquals(
[node['id'] for node in jobs_by_part['2']['nodes']], [2, 3])
self.assertEquals(
[node['id'] for node in jobs_by_part['3']['nodes']], [3, 1])
for part in ['0', '1', '2', '3']:
for node in jobs_by_part[part]['nodes']:
self.assertEquals(node['device'], 'sda')
self.assertEquals(jobs_by_part[part]['path'],
os.path.join(self.objects, part))
def test_delete_partition(self):
df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
mkdirs(df.datadir)
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
part_path = os.path.join(self.objects, '1')
self.assertTrue(os.access(part_path, os.F_OK))
self.replicator.replicate()
self.assertFalse(os.access(part_path, os.F_OK))
def test_rsync(self):
jobs = self.replicator.collect_jobs()
job = jobs[0]
node = job['nodes'][0]
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
with _mock_process([(0, ''), (0, ''), (0, '')]):
self.replicator.rsync(node, job, [data_dir])
def test_run_once_recover_from_failure(self):
replicator = object_replicator.ObjectReplicator(
dict(swift_dir=self.testdir, devices=self.devices,
mount_check='false', timeout='300', stats_interval='1'))
was_connector = object_replicator.http_connect
object_replicator.http_connect = mock_http_connect(200)
# Write some files into '1' and run replicate; they should be moved
# to the other partitions and then the node should get deleted.
cur_part = '1'
df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
mkdirs(df.datadir)
f = open(os.path.join(df.datadir,
normalize_timestamp(time.time()) + '.data'),
'wb')
f.write('1234567890')
f.close()
ohash = hash_path('a', 'c', 'o')
data_dir = ohash[-3:]
whole_path_from = os.path.join(self.objects, cur_part, data_dir)
process_arg_checker = []
nodes = [node for node in
self.ring.get_part_nodes(int(cur_part))
if node['ip'] not in _ips()]
for node in nodes:
rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
process_arg_checker.append(
(0, '', ['rsync', whole_path_from, rsync_mod]))
self.assertTrue(os.access(os.path.join(self.objects,
'1', data_dir, ohash),
os.F_OK))
with _mock_process(process_arg_checker):
replicator.run_once()
self.assertFalse(process_errors)
for i, result in [('0', True), ('1', False),
('2', True), ('3', True)]:
self.assertEquals(os.access(
os.path.join(self.objects,
i, object_replicator.HASH_FILE),
os.F_OK), result)
object_replicator.http_connect = was_connector
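The three-element tuples appended to process_arg_checker extend the (returncode, log) pairs used elsewhere: given the check_args and process_errors definitions near the top of the file, the mocked Popen evidently verifies each expected token against the spawned command line. A self-contained sketch of that verification (illustrative paths, not the module's actual MockProcess internals):

def check_call_args(expected_tokens, cmdline, errors):
    # every expected token must appear in the spawned command line;
    # mismatches are recorded rather than raised, so the test can
    # inspect process_errors after run_once() completes
    for token in expected_tokens:
        if token not in cmdline:
            errors.append('%s not found in %s' % (token, cmdline))

errors = []
check_call_args(['rsync', '/srv/node/sda/objects/1/a83'],
                ['rsync', '/srv/node/sda/objects/1/a83',
                 '10.0.0.2::object/sda/objects/1'], errors)
assert errors == []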
def test_run(self):
with _mock_process([(0, '')]*100):
self.replicator.replicate()
def test_run_withlog(self):
with _mock_process([(0,"stuff in log")]*100):
with _mock_process([(0, "stuff in log")]*100):
self.replicator.replicate()
if __name__ == '__main__':