Merge "Exclude local_dev from sync partners on failure"

Jenkins, 2015-05-27 12:59:20 +00:00, committed by Gerrit Code Review
commit d0a55ee95f
7 changed files with 118 additions and 49 deletions

swift/obj/reconstructor.py

@@ -29,8 +29,8 @@ from eventlet.support.greenlets import GreenletExit
 from swift import gettext_ as _
 from swift.common.utils import (
     whataremyips, unlink_older_than, compute_eta, get_logger,
-    dump_recon_cache, ismount, mkdirs, config_true_value, list_from_csv,
-    get_hub, tpool_reraise, GreenAsyncPile, Timestamp, remove_file)
+    dump_recon_cache, mkdirs, config_true_value, list_from_csv, get_hub,
+    tpool_reraise, GreenAsyncPile, Timestamp, remove_file)
 from swift.common.swob import HeaderKeyDict
 from swift.common.bufferedhttp import http_connect
 from swift.common.daemon import Daemon
@@ -568,9 +568,12 @@ class ObjectReconstructor(Daemon):
             job['sync_to'],
             # I think we could order these based on our index to better
             # protect against a broken chain
-            itertools.ifilter(
-                lambda n: n['id'] not in (n['id'] for n in job['sync_to']),
-                job['policy'].object_ring.get_part_nodes(job['partition'])),
+            [
+                n for n in
+                job['policy'].object_ring.get_part_nodes(job['partition'])
+                if n['id'] != job['local_dev']['id'] and
+                n['id'] not in (m['id'] for m in job['sync_to'])
+            ],
         )
         syncd_with = 0
         for node in dest_nodes:
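This hunk is the heart of the change: a job's fallback destinations are now built with a plain list comprehension that skips both the nodes already in sync_to and, new in this commit, the job's own local_dev, so a reconstructor whose partner is down no longer hands the work back to itself (the old ifilter also shadowed n inside its generator expression, which the new m avoids). Fewer candidate destinations is also what lines up with the unit tests later in this commit mocking 8 responses where they previously mocked 10. A minimal, self-contained sketch of the new filter over made-up node dicts (ids, devices, and the job shape here are illustrative, not real ring output):

    # Sketch only: stub nodes standing in for object_ring.get_part_nodes().
    part_nodes = [{'id': i, 'device': 'sda%d' % i} for i in range(4)]
    job = {
        'local_dev': part_nodes[1],   # the device this daemon pushes from
        'sync_to': [part_nodes[0]],   # partners already chosen as targets
    }
    fallback = [
        n for n in part_nodes
        if n['id'] != job['local_dev']['id'] and
        n['id'] not in (m['id'] for m in job['sync_to'])
    ]
    assert [n['id'] for n in fallback] == [2, 3]  # local_dev is excluded now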
@@ -776,13 +779,14 @@ class ObjectReconstructor(Daemon):
                 if override_devices and (local_dev['device'] not in
                                          override_devices):
                     continue
-                dev_path = join(self.devices_dir, local_dev['device'])
-                obj_path = join(dev_path, data_dir)
-                tmp_path = join(dev_path, get_tmp_dir(int(policy)))
-                if self.mount_check and not ismount(dev_path):
+                dev_path = self._df_router[policy].get_dev_path(
+                    local_dev['device'])
+                if not dev_path:
                     self.logger.warn(_('%s is not mounted'),
                                      local_dev['device'])
                     continue
+                obj_path = join(dev_path, data_dir)
+                tmp_path = join(dev_path, get_tmp_dir(int(policy)))
                 unlink_older_than(tmp_path, time.time() -
                                   self.reclaim_age)
                 if not os.path.exists(obj_path):
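This hunk, and the ssync receiver hunk in the next file, replace caller-side mount checking (ismount, constraints.check_mount) with one question to the per-policy diskfile manager: get_dev_path hands back the device path, or nothing when the device fails its mount check. A simplified sketch of that contract, with an invented class name and without whatever else the real manager does:

    import os

    class DiskFileManagerSketch(object):
        # Illustrative stand-in for the per-policy diskfile manager.
        def __init__(self, devices, mount_check=True):
            self.devices = devices          # root holding one dir per device
            self.mount_check = mount_check

        def get_dev_path(self, device):
            # Return the device path, or None when the device is unavailable;
            # callers turn None into "skip this device" or a 507 response.
            dev_path = os.path.join(self.devices, device)
            if self.mount_check and not os.path.ismount(dev_path):
                return None
            return dev_path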

swift/obj/ssync_receiver.py

@@ -19,7 +19,6 @@ import eventlet
 import eventlet.wsgi
 import eventlet.greenio
-from swift.common import constraints
 from swift.common import exceptions
 from swift.common import http
 from swift.common import swob
@@ -176,8 +175,7 @@ class Receiver(object):
         self.frag_index = None
         utils.validate_device_partition(self.device, self.partition)
         self.diskfile_mgr = self.app._diskfile_router[self.policy]
-        if self.diskfile_mgr.mount_check and not constraints.check_mount(
-                self.diskfile_mgr.devices, self.device):
+        if not self.diskfile_mgr.get_dev_path(self.device):
             raise swob.HTTPInsufficientStorage(drive=self.device)
         self.fp = self.request.environ['wsgi.input']
         for data in self._ensure_flush():
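Continuing the sketch above, a receiver-style caller reduces to a single truthiness test; the manager instance and device name here are invented:

    # Hypothetical usage of DiskFileManagerSketch from the sketch above.
    mgr = DiskFileManagerSketch('/srv/node', mount_check=True)
    if not mgr.get_dev_path('sdb1'):
        # the real receiver raises swob.HTTPInsufficientStorage(drive=...)
        raise RuntimeError('507 Insufficient Storage: sdb1')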

test/probe/common.py

@@ -26,7 +26,7 @@ from swiftclient import get_auth, head_account
 from swift.obj.diskfile import get_data_dir
 from swift.common.ring import Ring
-from swift.common.utils import readconf
+from swift.common.utils import readconf, renamer
 from swift.common.manager import Manager
 from swift.common.storage_policy import POLICIES, EC_POLICY, REPL_POLICY
@@ -314,6 +314,19 @@ class ProbeTest(unittest.TestCase):
         self.updaters.once()
         self.replicators.once()
 
+    def kill_drive(self, device):
+        if os.path.ismount(device):
+            os.system('sudo umount %s' % device)
+        else:
+            renamer(device, device + "X")
+
+    def revive_drive(self, device):
+        disabled_name = device + "X"
+        if os.path.isdir(disabled_name):
+            renamer(device + "X", device)
+        else:
+            os.system('sudo mount %s' % device)
+
 
 class ReplProbeTest(ProbeTest):
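kill_drive and revive_drive move up from TestReconstructorRevert into the shared ProbeTest base class so the new rebuild test can use them too: a mounted drive is unmounted (and later remounted), while on SAIO-style layouts the device directory is renamed aside so the object server can no longer find it. A standalone sketch of the rename-based path against a throwaway temp tree (paths invented for illustration):

    import os
    import shutil
    import tempfile

    devices = tempfile.mkdtemp()                 # throwaway devices root
    device = os.path.join(devices, 'sdb1')
    os.mkdir(device)

    os.rename(device, device + 'X')              # "kill": hide the device dir
    assert not os.path.isdir(device)

    os.rename(device + 'X', device)              # "revive": put it back
    assert os.path.isdir(device)

    shutil.rmtree(devices)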

test/probe/test_reconstructor_rebuild.py

@@ -19,12 +19,14 @@ import unittest
 import uuid
 import shutil
 import random
+from collections import defaultdict
 
 from test.probe.common import ECProbeTest
 from swift.common import direct_client
 from swift.common.storage_policy import EC_POLICY
 from swift.common.manager import Manager
+from swift.obj.reconstructor import _get_partners
 from swiftclient import client
@@ -165,6 +167,61 @@ class TestReconstructorRebuild(ECProbeTest):
                 self._format_node(onode),
                 [self._format_node(n) for n in node_list]))
 
+    def test_rebuild_partner_down(self):
+        # create EC container
+        headers = {'X-Storage-Policy': self.policy.name}
+        client.put_container(self.url, self.token, self.container_name,
+                             headers=headers)
+
+        # PUT object
+        contents = Body()
+        client.put_object(self.url, self.token,
+                          self.container_name,
+                          self.object_name,
+                          contents=contents)
+
+        opart, onodes = self.object_ring.get_nodes(
+            self.account, self.container_name, self.object_name)
+
+        # find a primary server that only has one of its devices in the
+        # primary node list
+        group_nodes_by_config = defaultdict(list)
+        for n in onodes:
+            group_nodes_by_config[self.config_number(n)].append(n)
+        for config_number, node_list in group_nodes_by_config.items():
+            if len(node_list) == 1:
+                break
+        else:
+            self.fail('ring balancing did not use all available nodes')
+        primary_node = node_list[0]
+
+        # pick one of its partners to fail randomly
+        partner_node = random.choice(_get_partners(
+            primary_node['index'], onodes))
+
+        # 507 the partner device
+        device_path = self.device_dir('object', partner_node)
+        self.kill_drive(device_path)
+
+        # select another primary sync_to node to fail
+        failed_primary = [n for n in onodes if n['id'] not in
+                          (primary_node['id'], partner_node['id'])][0]
+        # ... capture its fragment etag
+        failed_primary_etag = self.direct_get(failed_primary, opart)
+        # ... and delete it
+        part_dir = self.storage_dir('object', failed_primary, part=opart)
+        shutil.rmtree(part_dir, True)
+
+        # reconstruct from the primary, while one of its partners is 507'd
+        self.reconstructor.once(number=self.config_number(primary_node))
+
+        # the other failed primary will get its fragment rebuilt instead
+        self.assertEqual(failed_primary_etag,
+                         self.direct_get(failed_primary, opart))
+
+        # just to be nice
+        self.revive_drive(device_path)
+
 
 if __name__ == "__main__":
     unittest.main()
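The new probe test walks the commit message end to end: 507 one of the primary's partners, delete a different primary's fragment, run the reconstructor on just the chosen primary, and assert the deleted fragment is rebuilt rather than the job stalling on the dead partner. It imports _get_partners from the reconstructor; for orientation, partner selection amounts to taking a fragment index's neighbors in the partition's primary node list, roughly:

    # Hedged sketch of partner selection, consistent with how the test uses
    # _get_partners; not guaranteed to match the source line for line.
    def get_partners(frag_index, part_nodes):
        return [
            part_nodes[(frag_index - 1) % len(part_nodes)],
            part_nodes[(frag_index + 1) % len(part_nodes)],
        ]

    nodes = [{'index': i, 'id': i} for i in range(4)]
    assert [n['id'] for n in get_partners(0, nodes)] == [3, 1]  # wraps around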

test/probe/test_reconstructor_revert.py

@@ -17,7 +17,6 @@
 from hashlib import md5
 import unittest
 import uuid
-import os
 import random
 import shutil
 from collections import defaultdict
@@ -27,7 +26,6 @@ from test.probe.common import ECProbeTest
 from swift.common import direct_client
 from swift.common.storage_policy import EC_POLICY
 from swift.common.manager import Manager
-from swift.common.utils import renamer
 from swift.obj import reconstructor
 from swiftclient import client
@@ -70,19 +68,6 @@ class TestReconstructorRevert(ECProbeTest):
         self.assertEqual(self.policy.policy_type, EC_POLICY)
         self.reconstructor = Manager(["object-reconstructor"])
 
-    def kill_drive(self, device):
-        if os.path.ismount(device):
-            os.system('sudo umount %s' % device)
-        else:
-            renamer(device, device + "X")
-
-    def revive_drive(self, device):
-        disabled_name = device + "X"
-        if os.path.isdir(disabled_name):
-            renamer(device + "X", device)
-        else:
-            os.system('sudo mount %s' % device)
-
     def proxy_get(self):
         # GET object
         headers, body = client.get_object(self.url, self.token,
@@ -277,6 +262,8 @@ class TestReconstructorRevert(ECProbeTest):
         else:
             self.fail('ring balancing did not use all available nodes')
         primary_node = node_list[0]
+
+        # ... and 507 its device
         primary_device = self.device_dir('object', primary_node)
         self.kill_drive(primary_device)

test/unit/obj/test_reconstructor.py

@@ -932,7 +932,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
     def test_process_job_all_insufficient_storage(self):
         self.reconstructor._reset_stats()
         with mock_ssync_sender():
-            with mocked_http_conn(*[507] * 10):
+            with mocked_http_conn(*[507] * 8):
                 found_jobs = []
                 for part_info in self.reconstructor.collect_parts():
                     jobs = self.reconstructor.build_reconstruction_jobs(
@@ -954,7 +954,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
     def test_process_job_all_client_error(self):
         self.reconstructor._reset_stats()
         with mock_ssync_sender():
-            with mocked_http_conn(*[400] * 10):
+            with mocked_http_conn(*[400] * 8):
                 found_jobs = []
                 for part_info in self.reconstructor.collect_parts():
                     jobs = self.reconstructor.build_reconstruction_jobs(
@@ -976,7 +976,7 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
     def test_process_job_all_timeout(self):
         self.reconstructor._reset_stats()
         with mock_ssync_sender():
-            with nested(mocked_http_conn(*[Timeout()] * 10)):
+            with nested(mocked_http_conn(*[Timeout()] * 8)):
                 found_jobs = []
                 for part_info in self.reconstructor.collect_parts():
                     jobs = self.reconstructor.build_reconstruction_jobs(
@@ -1012,6 +1012,13 @@ class TestObjectReconstructor(unittest.TestCase):
             'bind_port': self.port,
         }
         self.logger = debug_logger('object-reconstructor')
+        self._configure_reconstructor()
+        self.policy.object_ring.max_more_nodes = \
+            self.policy.object_ring.replicas
+        self.ts_iter = make_timestamp_iter()
+
+    def _configure_reconstructor(self, **kwargs):
+        self.conf.update(kwargs)
         self.reconstructor = object_reconstructor.ObjectReconstructor(
             self.conf, logger=self.logger)
         self.reconstructor._reset_stats()
@@ -1019,9 +1026,6 @@ class TestObjectReconstructor(unittest.TestCase):
         # directly, so you end up with a /0 when you try to show the
         # percentage of complete jobs as ratio of the total job count
         self.reconstructor.job_count = 1
-        self.policy.object_ring.max_more_nodes = \
-            self.policy.object_ring.replicas
-        self.ts_iter = make_timestamp_iter()
 
     def tearDown(self):
         self.reconstructor.stats_line()
@ -1115,16 +1119,16 @@ class TestObjectReconstructor(unittest.TestCase):
paths = [] paths = []
def fake_ismount(path): def fake_check_mount(devices, device):
paths.append(path) paths.append(os.path.join(devices, device))
return False return False
with nested(mock.patch('swift.obj.reconstructor.whataremyips', with nested(mock.patch('swift.obj.reconstructor.whataremyips',
return_value=[self.ip]), return_value=[self.ip]),
mock.patch.object(self.policy.object_ring, '_devs', mock.patch.object(self.policy.object_ring, '_devs',
new=stub_ring_devs), new=stub_ring_devs),
mock.patch('swift.obj.reconstructor.ismount', mock.patch('swift.obj.diskfile.check_mount',
fake_ismount)): fake_check_mount)):
part_infos = list(self.reconstructor.collect_parts()) part_infos = list(self.reconstructor.collect_parts())
self.assertEqual(2, len(part_infos)) # sanity, same jobs self.assertEqual(2, len(part_infos)) # sanity, same jobs
self.assertEqual(set(int(p['partition']) for p in part_infos), self.assertEqual(set(int(p['partition']) for p in part_infos),
@@ -1134,13 +1138,16 @@ class TestObjectReconstructor(unittest.TestCase):
         self.assertEqual(paths, [])
 
         # ... now with mount check
-        self.reconstructor.mount_check = True
+        self._configure_reconstructor(mount_check=True)
+        self.assertTrue(self.reconstructor.mount_check)
+        for policy in POLICIES:
+            self.assertTrue(self.reconstructor._df_router[policy].mount_check)
         with nested(mock.patch('swift.obj.reconstructor.whataremyips',
                                return_value=[self.ip]),
                     mock.patch.object(self.policy.object_ring, '_devs',
                                       new=stub_ring_devs),
-                    mock.patch('swift.obj.reconstructor.ismount',
-                               fake_ismount)):
+                    mock.patch('swift.obj.diskfile.check_mount',
+                               fake_check_mount)):
             part_infos = list(self.reconstructor.collect_parts())
         self.assertEqual([], part_infos)  # sanity, no jobs
@@ -1148,7 +1155,8 @@ class TestObjectReconstructor(unittest.TestCase):
         self.assertEqual(set(paths), set([
             os.path.join(self.devices, dev) for dev in local_devs]))
 
-        def fake_ismount(path):
+        def fake_check_mount(devices, device):
+            path = os.path.join(devices, device)
             if path.endswith('sda'):
                 return True
             else:
@@ -1158,8 +1166,8 @@ class TestObjectReconstructor(unittest.TestCase):
                                return_value=[self.ip]),
                     mock.patch.object(self.policy.object_ring, '_devs',
                                       new=stub_ring_devs),
-                    mock.patch('swift.obj.reconstructor.ismount',
-                               fake_ismount)):
+                    mock.patch('swift.obj.diskfile.check_mount',
+                               fake_check_mount)):
             part_infos = list(self.reconstructor.collect_parts())
         self.assertEqual(1, len(part_infos))  # only sda picked up (part 0)
         self.assertEqual(part_infos[0]['partition'], 0)
@@ -1171,6 +1179,8 @@ class TestObjectReconstructor(unittest.TestCase):
             'replication_ip': self.ip,
             'replication_port': self.port
         } for dev in local_devs]
+        for device in local_devs:
+            utils.mkdirs(os.path.join(self.devices, device))
         fake_unlink = mock.MagicMock()
         self.reconstructor.reclaim_age = 1000
         now = time.time()

test/unit/obj/test_ssync_receiver.py

@@ -23,7 +23,6 @@ import unittest
 import eventlet
 import mock
-from swift.common import constraints
 from swift.common import exceptions
 from swift.common import swob
 from swift.common import utils
@@ -53,6 +52,7 @@ class TestReceiver(unittest.TestCase):
             'mount_check': 'false',
             'replication_one_per_device': 'false',
             'log_requests': 'false'}
+        utils.mkdirs(os.path.join(self.testdir, 'device', 'partition'))
         self.controller = server.ObjectController(self.conf)
         self.controller.bytes_per_sync = 1
@@ -285,8 +285,8 @@ class TestReceiver(unittest.TestCase):
                 mock.patch.object(
                     self.controller._diskfile_router[POLICIES.legacy],
                     'mount_check', False),
-                mock.patch.object(
-                    constraints, 'check_mount', return_value=False)) as (
+                mock.patch('swift.obj.diskfile.check_mount',
+                           return_value=False)) as (
                     mocked_replication_semaphore,
                     mocked_mount_check,
                     mocked_check_mount):
@@ -305,8 +305,8 @@ class TestReceiver(unittest.TestCase):
                 mock.patch.object(
                     self.controller._diskfile_router[POLICIES.legacy],
                     'mount_check', True),
-                mock.patch.object(
-                    constraints, 'check_mount', return_value=False)) as (
+                mock.patch('swift.obj.diskfile.check_mount',
+                           return_value=False)) as (
                     mocked_replication_semaphore,
                     mocked_mount_check,
                     mocked_check_mount):