diff --git a/swift/common/utils.py b/swift/common/utils.py
index 5a8bf0e1be..9df54f64db 100644
--- a/swift/common/utils.py
+++ b/swift/common/utils.py
@@ -29,7 +29,6 @@ from urllib import quote
 from contextlib import contextmanager
 import ctypes
 import ctypes.util
-import fcntl
 import struct
 from ConfigParser import ConfigParser, NoSectionError, NoOptionError
 from tempfile import mkstemp
@@ -622,6 +621,7 @@ def write_pickle(obj, dest, tmp):
         os.fsync(fd)
     renamer(tmppath, dest)
 
+
 def audit_location_generator(devices, datadir, mount_check=True, logger=None):
     '''
     Given a devices path and a data directory, yield (path, device,
diff --git a/swift/obj/replicator.py b/swift/obj/replicator.py
index 0ff90c92e1..0eec1920c8 100644
--- a/swift/obj/replicator.py
+++ b/swift/obj/replicator.py
@@ -470,6 +470,36 @@ class ObjectReplicator(Daemon):
             self.kill_coros()
         self.last_replication_count = self.replication_count
 
+    def collect_jobs(self):
+        jobs = []
+        ips = whataremyips()
+        for local_dev in [dev for dev in self.object_ring.devs
+                if dev and dev['ip'] in ips and dev['port'] == self.port]:
+            dev_path = join(self.devices_dir, local_dev['device'])
+            obj_path = join(dev_path, 'objects')
+            tmp_path = join(dev_path, 'tmp')
+            if self.mount_check and not os.path.ismount(dev_path):
+                self.logger.warn('%s is not mounted' % local_dev['device'])
+                continue
+            unlink_older_than(tmp_path, time.time() - self.reclaim_age)
+            if not os.path.exists(obj_path):
+                continue
+            for partition in os.listdir(obj_path):
+                try:
+                    nodes = [node for node in
+                             self.object_ring.get_part_nodes(int(partition))
+                             if node['id'] != local_dev['id']]
+                    jobs.append(dict(path=join(obj_path, partition),
+                                     nodes=nodes, delete=len(nodes) > 2,
+                                     partition=partition))
+                except ValueError:
+                    continue
+        random.shuffle(jobs)
+        # Partitions that need to be deleted take priority
+        jobs.sort(key=lambda job: not job['delete'])
+        self.job_count = len(jobs)
+        return jobs
+
     def replicate(self):
         """Run a replication pass"""
         self.start = time.time()
@@ -479,38 +509,11 @@ class ObjectReplicator(Daemon):
         self.replication_count = 0
         self.last_replication_count = -1
         self.partition_times = []
-        jobs = []
         stats = eventlet.spawn(self.heartbeat)
         lockup_detector = eventlet.spawn(self.detect_lockups)
         try:
-            ips = whataremyips()
             self.run_pool = GreenPool(size=self.concurrency)
-            for local_dev in [
-                dev for dev in self.object_ring.devs
-                if dev and dev['ip'] in ips and dev['port'] == self.port]:
-                dev_path = join(self.devices_dir, local_dev['device'])
-                obj_path = join(dev_path, 'objects')
-                tmp_path = join(dev_path, 'tmp')
-                if self.mount_check and not os.path.ismount(dev_path):
-                    self.logger.warn('%s is not mounted' % local_dev['device'])
-                    continue
-                unlink_older_than(tmp_path, time.time() - self.reclaim_age)
-                if not os.path.exists(obj_path):
-                    continue
-                for partition in os.listdir(obj_path):
-                    try:
-                        nodes = [node for node in
-                            self.object_ring.get_part_nodes(int(partition))
-                            if node['id'] != local_dev['id']]
-                        jobs.append(dict(path=join(obj_path, partition),
-                            nodes=nodes, delete=len(nodes) > 2,
-                            partition=partition))
-                    except ValueError:
-                        continue
-            random.shuffle(jobs)
-            # Partititons that need to be deleted take priority
-            jobs.sort(key=lambda job: not job['delete'])
-            self.job_count = len(jobs)
+            jobs = self.collect_jobs()
             for job in jobs:
                 if not self.check_ring():
                     self.logger.info(
diff --git a/test/unit/obj/test_replicator.py b/test/unit/obj/test_replicator.py
index a5e8cb3fe2..58b804460b 100644
--- a/test/unit/obj/test_replicator.py
+++ b/test/unit/obj/test_replicator.py
@@ -32,11 +32,14 @@ from swift.common import ring
 from swift.obj import replicator as object_replicator
 from swift.obj.server import DiskFile
 
+
 def _ips():
-    return ['127.0.0.0',]
+    return ['127.0.0.0']
 object_replicator.whataremyips = _ips
 
+
 class NullHandler(logging.Handler):
+
     def emit(self, record):
         pass
 null_logger = logging.getLogger("testing")
@@ -55,7 +58,7 @@ def mock_http_connect(status):
             self.method = args[4]
             self.path = args[5]
             self.with_exc = False
-            self.headers = kwargs.get('headers',{})
+            self.headers = kwargs.get('headers', {})
 
         def getresponse(self):
             if self.with_exc:
@@ -74,12 +77,14 @@ def mock_http_connect(status):
 
 process_errors = []
 
+
 class MockProcess(object):
     ret_code = None
     ret_log = None
     check_args = None
 
     class Stream(object):
+
         def read(self):
             return MockProcess.ret_log.next()
 
@@ -94,6 +99,7 @@ class MockProcess(object):
     def wait(self):
         return self.ret_code.next()
 
+
@contextmanager
 def _mock_process(ret):
     orig_process = subprocess.Popen
@@ -104,6 +110,7 @@ def _mock_process(ret):
     yield
     object_replicator.subprocess.Popen = orig_process
 
+
 def _create_test_ring(path):
     testgz = os.path.join(path, 'object.ring.gz')
     intended_replica2part2dev_id = [
@@ -141,7 +148,7 @@ class TestObjectReplicator(unittest.TestCase):
         self.objects = os.path.join(self.devices, 'sda', 'objects')
         os.mkdir(self.objects)
         self.parts = {}
-        for part in ['0','1','2', '3']:
+        for part in ['0', '1', '2', '3']:
             self.parts[part] = os.path.join(self.objects, part)
             os.mkdir(os.path.join(self.objects, part))
         self.ring = _create_test_ring(self.testdir)
@@ -206,8 +213,8 @@ class TestObjectReplicator(unittest.TestCase):
     def test_hash_suffix_multi_file_one(self):
         df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
         mkdirs(df.datadir)
-        for tdiff in [1,50,100,500]:
-            for suff in ['.meta','.data','.ts']:
+        for tdiff in [1, 50, 100, 500]:
+            for suff in ['.meta', '.data', '.ts']:
                 f = open(os.path.join(df.datadir,
                     normalize_timestamp(int(time.time()) - tdiff) + suff),
                          'wb')
@@ -224,12 +231,11 @@ class TestObjectReplicator(unittest.TestCase):
         # only the tombstone should be left
         self.assertEquals(len(os.listdir(whole_hsh_path)), 1)
 
-
     def test_hash_suffix_multi_file_two(self):
         df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
         mkdirs(df.datadir)
-        for tdiff in [1,50,100,500]:
-            suffs = ['.meta','.data']
+        for tdiff in [1, 50, 100, 500]:
+            suffs = ['.meta', '.data']
             if tdiff > 50:
                 suffs.append('.ts')
             for suff in suffs:
@@ -249,88 +255,133 @@ class TestObjectReplicator(unittest.TestCase):
         # only the meta and data should be left
         self.assertEquals(len(os.listdir(whole_hsh_path)), 2)
 
+    def test_invalidate_hash(self):
-#    def test_check_ring(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        self.assertTrue(self.replicator.check_ring())
-#        orig_check = self.replicator.next_check
-#        self.replicator.next_check = orig_check - 30
-#        self.assertTrue(self.replicator.check_ring())
-#        self.replicator.next_check = orig_check
-#        orig_ring_time = self.replicator.object_ring._mtime
-#        self.replicator.object_ring._mtime = orig_ring_time - 30
-#        self.assertTrue(self.replicator.check_ring())
-#        self.replicator.next_check = orig_check - 30
-#        self.assertFalse(self.replicator.check_ring())
-#
-#    def test_collect_jobs(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        self.assertTrue('1' in self.replicator.parts_to_delete)
-#        self.assertEquals(
-#            [node['id'] for node in self.replicator.partitions['0']['nodes']],
-#            [1,2])
-#        self.assertEquals(
-#            [node['id'] for node in self.replicator.partitions['1']['nodes']],
-#            [1,2,3])
-#        self.assertEquals(
-#            [node['id'] for node in self.replicator.partitions['2']['nodes']],
-#            [2,3])
-#        self.assertEquals(
-#            [node['id'] for node in self.replicator.partitions['3']['nodes']],
-#            [3,1])
-#        for part in ['0', '1', '2', '3']:
-#            self.assertEquals(self.replicator.partitions[part]['device'], 'sda')
-#            self.assertEquals(self.replicator.partitions[part]['path'],
-#                              self.objects)
-#
-#    def test_delete_partition(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        part_path = os.path.join(self.objects, '1')
-#        self.assertTrue(os.access(part_path, os.F_OK))
-#        self.replicator.delete_partition('1')
-#        self.assertFalse(os.access(part_path, os.F_OK))
-#
-#    def test_rsync(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(0,''), (0,''), (0,'')]):
-#            self.replicator.rsync('0')
-#
-#    def test_rsync_delete_no(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(-1, "stuff in log"), (-1, "stuff in log"),
-#                            (0,''), (0,'')]):
-#            self.replicator.rsync('1')
-#        self.assertEquals(self.replicator.parts_to_delete['1'],
-#                          [False, True, True])
-#
-#    def test_rsync_delete_yes(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(0,''), (0,''), (0,'')]):
-#            self.replicator.rsync('1')
-#        self.assertEquals(self.replicator.parts_to_delete['1'],
-#                          [True, True, True])
-#
-#    def test_rsync_delete_yes_with_failure(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(-1, "stuff in log"), (0, ''), (0,''), (0,'')]):
-#            self.replicator.rsync('1')
-#        self.assertEquals(self.replicator.parts_to_delete['1'],
-#                          [True, True, True])
-#
-#    def test_rsync_failed_drive(self):
-#        self.replicator.collect_jobs('sda', 0, self.ring)
-#        with _mock_process([(12,'There was an error in file IO'),
-#                            (0,''), (0,''), (0,'')]):
-#            self.replicator.rsync('1')
-#        self.assertEquals(self.replicator.parts_to_delete['1'],
-#                          [True, True, True])
+        def assertFileData(file_path, data):
+            with open(file_path, 'r') as fp:
+                fdata = fp.read()
+                self.assertEquals(fdata, data)
+
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, '0', data_dir)
+        hashes_file = os.path.join(self.objects, '0',
+                                   object_replicator.HASH_FILE)
+        # test that the exception raised for a nonexistent file is caught
+        self.assertEquals(object_replicator.invalidate_hash(whole_path_from),
+                          None)
+        # test that hashes get cleared
+        check_pickle_data = pickle.dumps({data_dir: None},
+                                         object_replicator.PICKLE_PROTOCOL)
+        for data_hash in [{data_dir: None}, {data_dir: 'abcdefg'}]:
+            with open(hashes_file, 'wb') as fp:
+                pickle.dump(data_hash, fp, object_replicator.PICKLE_PROTOCOL)
+            object_replicator.invalidate_hash(whole_path_from)
+            assertFileData(hashes_file, check_pickle_data)
+
+    def test_check_ring(self):
+        self.assertTrue(self.replicator.check_ring())
+        orig_check = self.replicator.next_check
+        self.replicator.next_check = orig_check - 30
+        self.assertTrue(self.replicator.check_ring())
+        self.replicator.next_check = orig_check
+        orig_ring_time = self.replicator.object_ring._mtime
+        self.replicator.object_ring._mtime = orig_ring_time - 30
+        self.assertTrue(self.replicator.check_ring())
+        self.replicator.next_check = orig_check - 30
+        self.assertFalse(self.replicator.check_ring())
+
+    def test_collect_jobs(self):
+        jobs = self.replicator.collect_jobs()
+        jobs_to_delete = [j for j in jobs if j['delete']]
+        jobs_to_keep = [j for j in jobs if not j['delete']]
+        jobs_by_part = {}
+        for job in jobs:
+            jobs_by_part[job['partition']] = job
+        self.assertEquals(len(jobs_to_delete), 1)
+        self.assertEquals('1', jobs_to_delete[0]['partition'])
+        self.assertEquals(
+            [node['id'] for node in jobs_by_part['0']['nodes']], [1, 2])
+        self.assertEquals(
+            [node['id'] for node in jobs_by_part['1']['nodes']], [1, 2, 3])
+        self.assertEquals(
+            [node['id'] for node in jobs_by_part['2']['nodes']], [2, 3])
+        self.assertEquals(
+            [node['id'] for node in jobs_by_part['3']['nodes']], [3, 1])
+        for part in ['0', '1', '2', '3']:
+            for node in jobs_by_part[part]['nodes']:
+                self.assertEquals(node['device'], 'sda')
+            self.assertEquals(jobs_by_part[part]['path'],
+                              os.path.join(self.objects, part))
+
+    def test_delete_partition(self):
+        df = DiskFile(self.devices, 'sda', '0', 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        part_path = os.path.join(self.objects, '1')
+        self.assertTrue(os.access(part_path, os.F_OK))
+        self.replicator.replicate()
+        self.assertFalse(os.access(part_path, os.F_OK))
+
+    def test_rsync(self):
+        jobs = self.replicator.collect_jobs()
+        job = jobs[0]
+        node = job['nodes'][0]
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        with _mock_process([(0, ''), (0, ''), (0, '')]):
+            self.replicator.rsync(node, job, [data_dir])
+
+    def test_run_once_recover_from_failure(self):
+        replicator = object_replicator.ObjectReplicator(
+            dict(swift_dir=self.testdir, devices=self.devices,
+                 mount_check='false', timeout='300', stats_interval='1'))
+        was_connector = object_replicator.http_connect
+        object_replicator.http_connect = mock_http_connect(200)
+        # Write some files into '1' and run replicate; they should be moved
+        # to the other partitions and then the node should get deleted.
+        cur_part = '1'
+        df = DiskFile(self.devices, 'sda', cur_part, 'a', 'c', 'o')
+        mkdirs(df.datadir)
+        f = open(os.path.join(df.datadir,
+                              normalize_timestamp(time.time()) + '.data'),
+                 'wb')
+        f.write('1234567890')
+        f.close()
+        ohash = hash_path('a', 'c', 'o')
+        data_dir = ohash[-3:]
+        whole_path_from = os.path.join(self.objects, cur_part, data_dir)
+        process_arg_checker = []
+        nodes = [node for node in
+                 self.ring.get_part_nodes(int(cur_part))
+                 if node['ip'] not in _ips()]
+        for node in nodes:
+            rsync_mod = '%s::object/sda/objects/%s' % (node['ip'], cur_part)
+            process_arg_checker.append(
+                (0, '', ['rsync', whole_path_from, rsync_mod]))
+        self.assertTrue(os.access(os.path.join(self.objects,
+                                               '1', data_dir, ohash),
+                                  os.F_OK))
+        with _mock_process(process_arg_checker):
+            replicator.run_once()
+        self.assertFalse(process_errors)
+        for i, result in [('0', True), ('1', False),
+                          ('2', True), ('3', True)]:
+            self.assertEquals(os.access(
+                os.path.join(self.objects,
+                             i, object_replicator.HASH_FILE),
+                os.F_OK), result)
+        object_replicator.http_connect = was_connector
 
     def test_run(self):
-        with _mock_process([(0,'')]*100):
+        with _mock_process([(0, '')]*100):
             self.replicator.replicate()
 
     def test_run_withlog(self):
-        with _mock_process([(0,"stuff in log")]*100):
+        with _mock_process([(0, "stuff in log")]*100):
             self.replicator.replicate()
 
 
 if __name__ == '__main__':
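
Note on the prioritization in collect_jobs() above: because Python's list.sort() is stable, shuffling first and then sorting on not job['delete'] moves delete jobs to the front while keeping a random order within each priority group. A minimal standalone sketch of the idiom, using hypothetical job dicts rather than Swift's real ring data:

    import random

    # Hypothetical stand-ins for the replicator's job dicts.
    jobs = [{'partition': str(p), 'delete': p in (1, 4)} for p in range(6)]

    random.shuffle(jobs)  # randomize so no partition is always handled first
    # Stable sort: False sorts before True, so jobs with delete=True come
    # first, and the shuffled order is preserved within each group.
    jobs.sort(key=lambda job: not job['delete'])

    assert [job['delete'] for job in jobs[:2]] == [True, True]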