diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py
index 2dd743fa9a..d5478a1d5e 100644
--- a/swift/obj/reconstructor.py
+++ b/swift/obj/reconstructor.py
@@ -29,8 +29,8 @@ from eventlet.support.greenlets import GreenletExit
 from swift import gettext_ as _
 from swift.common.utils import (
     whataremyips, unlink_older_than, compute_eta, get_logger,
-    dump_recon_cache, ismount, mkdirs, config_true_value, list_from_csv,
-    get_hub, tpool_reraise, GreenAsyncPile, Timestamp, remove_file)
+    dump_recon_cache, mkdirs, config_true_value, list_from_csv, get_hub,
+    tpool_reraise, GreenAsyncPile, Timestamp, remove_file)
 from swift.common.swob import HeaderKeyDict
 from swift.common.bufferedhttp import http_connect
 from swift.common.daemon import Daemon
@@ -568,9 +568,12 @@ class ObjectReconstructor(Daemon):
             job['sync_to'],
             # I think we could order these based on our index to better
             # protect against a broken chain
-            itertools.ifilter(
-                lambda n: n['id'] not in (n['id'] for n in job['sync_to']),
-                job['policy'].object_ring.get_part_nodes(job['partition'])),
+            [
+                n for n in
+                job['policy'].object_ring.get_part_nodes(job['partition'])
+                if n['id'] != job['local_dev']['id'] and
+                n['id'] not in (m['id'] for m in job['sync_to'])
+            ],
         )
         syncd_with = 0
         for node in dest_nodes:
@@ -776,13 +779,14 @@ class ObjectReconstructor(Daemon):
                 if override_devices and (local_dev['device'] not in
                                          override_devices):
                     continue
-                dev_path = join(self.devices_dir, local_dev['device'])
-                obj_path = join(dev_path, data_dir)
-                tmp_path = join(dev_path, get_tmp_dir(int(policy)))
-                if self.mount_check and not ismount(dev_path):
+                dev_path = self._df_router[policy].get_dev_path(
+                    local_dev['device'])
+                if not dev_path:
                     self.logger.warn(_('%s is not mounted'),
                                      local_dev['device'])
                     continue
+                obj_path = join(dev_path, data_dir)
+                tmp_path = join(dev_path, get_tmp_dir(int(policy)))
                 unlink_older_than(tmp_path, time.time() -
                                   self.reclaim_age)
                 if not os.path.exists(obj_path):
diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py
index b636a16245..aa685211ae 100644
--- a/swift/obj/ssync_receiver.py
+++ b/swift/obj/ssync_receiver.py
@@ -19,7 +19,6 @@ import eventlet
 import eventlet.wsgi
 import eventlet.greenio
 
-from swift.common import constraints
 from swift.common import exceptions
 from swift.common import http
 from swift.common import swob
@@ -176,8 +175,7 @@ class Receiver(object):
         self.frag_index = None
         utils.validate_device_partition(self.device, self.partition)
         self.diskfile_mgr = self.app._diskfile_router[self.policy]
-        if self.diskfile_mgr.mount_check and not constraints.check_mount(
-                self.diskfile_mgr.devices, self.device):
+        if not self.diskfile_mgr.get_dev_path(self.device):
             raise swob.HTTPInsufficientStorage(drive=self.device)
         self.fp = self.request.environ['wsgi.input']
         for data in self._ensure_flush():
diff --git a/test/probe/common.py b/test/probe/common.py
index 7d1e754014..ca1225f9fb 100644
--- a/test/probe/common.py
+++ b/test/probe/common.py
@@ -26,7 +26,7 @@ from swiftclient import get_auth, head_account
 
 from swift.obj.diskfile import get_data_dir
 from swift.common.ring import Ring
-from swift.common.utils import readconf
+from swift.common.utils import readconf, renamer
 from swift.common.manager import Manager
 from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
@@ -314,6 +314,19 @@ class ProbeTest(unittest.TestCase):
         self.updaters.once()
         self.replicators.once()
 
+    def kill_drive(self, device):
+        if os.path.ismount(device):
+            os.system('sudo umount %s' % device)
+        else:
+            renamer(device, device + "X")
+
+    def revive_drive(self, device):
+        disabled_name = device + "X"
+        if os.path.isdir(disabled_name):
+            renamer(device + "X", device)
+        else:
+            os.system('sudo mount %s' % device)
+
 
 class ReplProbeTest(ProbeTest):
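Editor's note: the reconstructor.py and ssync_receiver.py hunks above both replace an explicit mount_check/ismount test with a single call to the diskfile manager's get_dev_path(), relying on it to return None when a device fails its sanity check. As a rough sketch of the behavior being relied on (paraphrased from swift/obj/diskfile.py of this era, not quoted verbatim; check_mount and check_dir live in swift.common.constraints):

    def get_dev_path(self, device, mount_check=None):
        # Return the full path to the device, or None if the device
        # fails the relevant check: a mount check when mount_check is
        # enabled, otherwise a plain directory-existence check.
        if mount_check or self.mount_check:
            check = check_mount
        else:
            check = check_dir
        if not check(self.devices, device):
            return None
        return os.path.join(self.devices, device)

The directory-existence fallback is also what lets the probe tests below fake a dead drive by renaming its directory on an SAIO, where devices are plain directories rather than separate mount points.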
diff --git a/test/probe/test_reconstructor_rebuild.py b/test/probe/test_reconstructor_rebuild.py
index 5edfcc52d1..bf568ccc68 100644
--- a/test/probe/test_reconstructor_rebuild.py
+++ b/test/probe/test_reconstructor_rebuild.py
@@ -19,12 +19,14 @@ import unittest
 import uuid
 import shutil
 import random
+from collections import defaultdict
 
 from test.probe.common import ECProbeTest
 
 from swift.common import direct_client
 from swift.common.storage_policy import EC_POLICY
 from swift.common.manager import Manager
+from swift.obj.reconstructor import _get_partners
 from swiftclient import client
@@ -165,6 +167,61 @@ class TestReconstructorRebuild(ECProbeTest):
                         self._format_node(onode),
                         [self._format_node(n) for n in node_list]))
 
+    def test_rebuild_partner_down(self):
+        # create EC container
+        headers = {'X-Storage-Policy': self.policy.name}
+        client.put_container(self.url, self.token, self.container_name,
+                             headers=headers)
+
+        # PUT object
+        contents = Body()
+        client.put_object(self.url, self.token,
+                          self.container_name,
+                          self.object_name,
+                          contents=contents)
+
+        opart, onodes = self.object_ring.get_nodes(
+            self.account, self.container_name, self.object_name)
+
+        # find a primary server that only has one of its devices in the
+        # primary node list
+        group_nodes_by_config = defaultdict(list)
+        for n in onodes:
+            group_nodes_by_config[self.config_number(n)].append(n)
+        for config_number, node_list in group_nodes_by_config.items():
+            if len(node_list) == 1:
+                break
+        else:
+            self.fail('ring balancing did not use all available nodes')
+        primary_node = node_list[0]
+
+        # pick one of its partners to fail randomly
+        partner_node = random.choice(_get_partners(
+            primary_node['index'], onodes))
+
+        # 507 the partner device
+        device_path = self.device_dir('object', partner_node)
+        self.kill_drive(device_path)
+
+        # select another primary sync_to node to fail
+        failed_primary = [n for n in onodes if n['id'] not in
+                          (primary_node['id'], partner_node['id'])][0]
+        # ... capture its fragment etag
+        failed_primary_etag = self.direct_get(failed_primary, opart)
+        # ... and delete it
+        part_dir = self.storage_dir('object', failed_primary, part=opart)
+        shutil.rmtree(part_dir, True)
+
+        # reconstruct from the primary, while one of its partners is 507'd
+        self.reconstructor.once(number=self.config_number(primary_node))
+
+        # the other failed primary will get its fragment rebuilt instead
+        self.assertEqual(failed_primary_etag,
+                         self.direct_get(failed_primary, opart))
+
+        # just to be nice
+        self.revive_drive(device_path)
+
 
 if __name__ == "__main__":
     unittest.main()
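Editor's note: test_rebuild_partner_down() leans on _get_partners(), newly imported from swift/obj/reconstructor.py at the top of this test module. It returns the neighbors of a fragment index in the partition's primary node list; roughly (a sketch, not necessarily the verbatim source):

    def _get_partners(frag_index, part_nodes):
        # A node's partners are its immediate left and right neighbors
        # in the primary node list, wrapping around at the ends.
        return [
            part_nodes[(frag_index - 1) % len(part_nodes)],
            part_nodes[(frag_index + 1) % len(part_nodes)],
        ]

Killing a randomly chosen partner is the interesting case: the reconstructor syncs each fragment to its partners, so the test verifies that a 507'd partner does not stop the other failed primary from being rebuilt.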
diff --git a/test/probe/test_reconstructor_revert.py b/test/probe/test_reconstructor_revert.py
index 39739b617d..249a6b5d62 100755
--- a/test/probe/test_reconstructor_revert.py
+++ b/test/probe/test_reconstructor_revert.py
@@ -17,7 +17,6 @@ from hashlib import md5
 import unittest
 import uuid
-import os
 import random
 import shutil
 from collections import defaultdict
 
@@ -27,7 +26,6 @@ from test.probe.common import ECProbeTest
 from swift.common import direct_client
 from swift.common.storage_policy import EC_POLICY
 from swift.common.manager import Manager
-from swift.common.utils import renamer
 from swift.obj import reconstructor
 from swiftclient import client
@@ -70,19 +68,6 @@ class TestReconstructorRevert(ECProbeTest):
         self.assertEqual(self.policy.policy_type, EC_POLICY)
         self.reconstructor = Manager(["object-reconstructor"])
 
-    def kill_drive(self, device):
-        if os.path.ismount(device):
-            os.system('sudo umount %s' % device)
-        else:
-            renamer(device, device + "X")
-
-    def revive_drive(self, device):
-        disabled_name = device + "X"
-        if os.path.isdir(disabled_name):
-            renamer(device + "X", device)
-        else:
-            os.system('sudo mount %s' % device)
-
     def proxy_get(self):
         # GET object
         headers, body = client.get_object(self.url, self.token,
@@ -277,6 +262,8 @@ class TestReconstructorRevert(ECProbeTest):
         else:
             self.fail('ring balancing did not use all available nodes')
         primary_node = node_list[0]
+
+        # ... and 507 its device
         primary_device = self.device_dir('object', primary_node)
         self.kill_drive(primary_device)
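Editor's note: with kill_drive()/revive_drive() hoisted into ProbeTest (see the test/probe/common.py hunk earlier), this module and test_reconstructor_rebuild.py now share one failure-injection idiom. Typical usage, taken from the tests themselves (device_dir() resolves a node's on-disk device path):

    device_path = self.device_dir('object', primary_node)
    self.kill_drive(device_path)    # umount, or rename to <path>X
    # ... exercise the reconstructor ...
    self.revive_drive(device_path)  # reverses whichever form applied

Either form makes the device look unavailable to the object server, which then answers 507 Insufficient Storage just as it would for a genuinely failed drive.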
diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py
index 23e70543f7..321ea3751d 100755
--- a/test/unit/obj/test_reconstructor.py
+++ b/test/unit/obj/test_reconstructor.py
@@ -932,7 +932,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
     def test_process_job_all_insufficient_storage(self):
         self.reconstructor._reset_stats()
         with mock_ssync_sender():
-            with mocked_http_conn(*[507] * 10):
+            with mocked_http_conn(*[507] * 8):
                 found_jobs = []
                 for part_info in self.reconstructor.collect_parts():
                     jobs = self.reconstructor.build_reconstruction_jobs(
@@ -954,7 +954,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
     def test_process_job_all_client_error(self):
         self.reconstructor._reset_stats()
         with mock_ssync_sender():
-            with mocked_http_conn(*[400] * 10):
+            with mocked_http_conn(*[400] * 8):
                 found_jobs = []
                 for part_info in self.reconstructor.collect_parts():
                     jobs = self.reconstructor.build_reconstruction_jobs(
@@ -976,7 +976,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
     def test_process_job_all_timeout(self):
         self.reconstructor._reset_stats()
         with mock_ssync_sender():
-            with nested(mocked_http_conn(*[Timeout()] * 10)):
+            with nested(mocked_http_conn(*[Timeout()] * 8)):
                 found_jobs = []
                 for part_info in self.reconstructor.collect_parts():
                     jobs = self.reconstructor.build_reconstruction_jobs(
@@ -1012,6 +1012,13 @@ class TestObjectReconstructor(unittest.TestCase):
             'bind_port': self.port,
         }
         self.logger = debug_logger('object-reconstructor')
+        self._configure_reconstructor()
+        self.policy.object_ring.max_more_nodes = \
+            self.policy.object_ring.replicas
+        self.ts_iter = make_timestamp_iter()
+
+    def _configure_reconstructor(self, **kwargs):
+        self.conf.update(kwargs)
         self.reconstructor = object_reconstructor.ObjectReconstructor(
             self.conf, logger=self.logger)
         self.reconstructor._reset_stats()
@@ -1019,9 +1026,6 @@
         # directly, so you end up with a /0 when you try to show the
         # percentage of complete jobs as ratio of the total job count
         self.reconstructor.job_count = 1
-        self.policy.object_ring.max_more_nodes = \
-            self.policy.object_ring.replicas
-        self.ts_iter = make_timestamp_iter()
 
     def tearDown(self):
         self.reconstructor.stats_line()
@@ -1115,16 +1119,16 @@ class TestObjectReconstructor(unittest.TestCase):
         paths = []
 
-        def fake_ismount(path):
-            paths.append(path)
+        def fake_check_mount(devices, device):
+            paths.append(os.path.join(devices, device))
             return False
 
         with nested(mock.patch('swift.obj.reconstructor.whataremyips',
                                return_value=[self.ip]),
                     mock.patch.object(self.policy.object_ring, '_devs',
                                       new=stub_ring_devs),
-                    mock.patch('swift.obj.reconstructor.ismount',
-                               fake_ismount)):
+                    mock.patch('swift.obj.diskfile.check_mount',
+                               fake_check_mount)):
             part_infos = list(self.reconstructor.collect_parts())
         self.assertEqual(2, len(part_infos))  # sanity, same jobs
         self.assertEqual(set(int(p['partition']) for p in part_infos),
@@ -1134,13 +1138,16 @@ class TestObjectReconstructor(unittest.TestCase):
         self.assertEqual(paths, [])
 
         # ... now with mount check
-        self.reconstructor.mount_check = True
+        self._configure_reconstructor(mount_check=True)
+        self.assertTrue(self.reconstructor.mount_check)
+        for policy in POLICIES:
+            self.assertTrue(self.reconstructor._df_router[policy].mount_check)
         with nested(mock.patch('swift.obj.reconstructor.whataremyips',
                                return_value=[self.ip]),
                     mock.patch.object(self.policy.object_ring, '_devs',
                                       new=stub_ring_devs),
-                    mock.patch('swift.obj.reconstructor.ismount',
-                               fake_ismount)):
+                    mock.patch('swift.obj.diskfile.check_mount',
+                               fake_check_mount)):
             part_infos = list(self.reconstructor.collect_parts())
         self.assertEqual([], part_infos)  # sanity, no jobs
@@ -1148,7 +1155,8 @@
         self.assertEqual(set(paths), set([
             os.path.join(self.devices, dev) for dev in local_devs]))
 
-        def fake_ismount(path):
+        def fake_check_mount(devices, device):
+            path = os.path.join(devices, device)
             if path.endswith('sda'):
                 return True
             else:
         with nested(mock.patch('swift.obj.reconstructor.whataremyips',
                                return_value=[self.ip]),
                     mock.patch.object(self.policy.object_ring, '_devs',
                                       new=stub_ring_devs),
-                    mock.patch('swift.obj.reconstructor.ismount',
-                               fake_ismount)):
+                    mock.patch('swift.obj.diskfile.check_mount',
+                               fake_check_mount)):
             part_infos = list(self.reconstructor.collect_parts())
         self.assertEqual(1, len(part_infos))  # only sda picked up (part 0)
         self.assertEqual(part_infos[0]['partition'], 0)
@@ -1171,6 +1179,8 @@
             'replication_ip': self.ip,
             'replication_port': self.port
         } for dev in local_devs]
+        for device in local_devs:
+            utils.mkdirs(os.path.join(self.devices, device))
         fake_unlink = mock.MagicMock()
         self.reconstructor.reclaim_age = 1000
         now = time.time()
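Editor's note: the mocks above move from swift.obj.reconstructor.ismount to swift.obj.diskfile.check_mount because the mount check now happens inside DiskFileManager.get_dev_path() rather than in the reconstructor module (the 10 -> 8 request counts likewise appear to track the smaller dest_nodes list now that the local device is excluded). The general mock.patch rule applies: patch the name in the namespace where it is looked up, not where it is defined. diskfile.py does `from swift.common.constraints import check_mount`, so that binding is the one that matters:

    # patching the definition site no longer affects diskfile.py:
    with mock.patch('swift.common.constraints.check_mount',
                    return_value=False):
        pass  # diskfile.py still calls its own check_mount binding

    # patching the use site does:
    with mock.patch('swift.obj.diskfile.check_mount', return_value=False):
        part_infos = list(self.reconstructor.collect_parts())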
diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py
index 4a030c821d..9fdfe7d102 100644
--- a/test/unit/obj/test_ssync_receiver.py
+++ b/test/unit/obj/test_ssync_receiver.py
@@ -23,7 +23,6 @@ import unittest
 import eventlet
 import mock
 
-from swift.common import constraints
 from swift.common import exceptions
 from swift.common import swob
 from swift.common import utils
@@ -53,6 +52,7 @@ class TestReceiver(unittest.TestCase):
             'mount_check': 'false',
             'replication_one_per_device': 'false',
             'log_requests': 'false'}
+        utils.mkdirs(os.path.join(self.testdir, 'device', 'partition'))
         self.controller = server.ObjectController(self.conf)
         self.controller.bytes_per_sync = 1
@@ -285,8 +285,8 @@
                 mock.patch.object(
                     self.controller._diskfile_router[POLICIES.legacy],
                     'mount_check', False),
-                mock.patch.object(
-                    constraints, 'check_mount', return_value=False)) as (
+                mock.patch('swift.obj.diskfile.check_mount',
+                           return_value=False)) as (
                     mocked_replication_semaphore,
                     mocked_mount_check,
                     mocked_check_mount):
@@ -305,8 +305,8 @@
                 mock.patch.object(
                     self.controller._diskfile_router[POLICIES.legacy],
                     'mount_check', True),
-                mock.patch.object(
-                    constraints, 'check_mount', return_value=False)) as (
+                mock.patch('swift.obj.diskfile.check_mount',
+                           return_value=False)) as (
                     mocked_replication_semaphore,
                     mocked_mount_check,
                     mocked_check_mount):
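Editor's note: the new _configure_reconstructor() helper in setUp() is what makes the mount_check test above work. The daemon builds its per-policy diskfile managers (self._df_router) from conf at construction time, so the old approach of flipping self.reconstructor.mount_check after the fact never reached them; rebuilding the daemon does:

    self._configure_reconstructor(mount_check=True)
    # the setting propagates to every policy's diskfile manager:
    for policy in POLICIES:
        self.assertTrue(self.reconstructor._df_router[policy].mount_check)

The utils.mkdirs() additions in both unit-test modules exist for the same reason: with the real check code running instead of a blanket ismount mock, the device (and partition) directories have to actually exist on disk for the tests to proceed.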