Volume discovery and local storage management lib
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 

251 lines
12 KiB

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
import os
import tempfile
from oslo_concurrency import processutils as putils
import six
from os_brick import exception
from os_brick.privileged import rootwrap as priv_rootwrap
from os_brick.remotefs import remotefs
from os_brick.tests import base
class RemoteFsClientTestCase(base.TestCase):
    """Tests for ``remotefs.RemoteFsClient``.

    ``setUp`` swaps ``priv_rootwrap.execute`` for a mock that returns
    ``None``; the mount tests then inspect the calls recorded on it.
    """

    def setUp(self):
        """Install the ``priv_rootwrap.execute`` mock shared by the tests."""
        super(RemoteFsClientTestCase, self).setUp()
        self.mock_execute = self.mock_object(priv_rootwrap, 'execute',
                                             return_value=None)

    @mock.patch.object(remotefs.RemoteFsClient, '_read_mounts',
                       return_value=[])
    def test_cifs(self, mock_read_mounts):
        """Mounting a CIFS share runs ``mkdir -p`` then ``mount -t cifs``."""
        client = remotefs.RemoteFsClient("cifs", root_helper='true',
                                         smbfs_mount_point_base='/mnt')
        share = '10.0.0.1:/qwe'
        mount_point = client.get_mount_point(share)
        client.mount(share)
        calls = [mock.call('mkdir', '-p', mount_point, check_exit_code=0),
                 mock.call('mount', '-t', 'cifs', share, mount_point,
                           run_as_root=True, root_helper='true',
                           check_exit_code=0)]
        self.mock_execute.assert_has_calls(calls)

    @mock.patch.object(remotefs.RemoteFsClient, '_read_mounts',
                       return_value=[])
    def test_nfs(self, mock_read_mounts):
        """Mounting an NFS share runs ``mkdir -p`` then ``mount -t nfs``.

        The test passes no mount options, yet the expected command carries
        ``-o vers=4,minorversion=1``.
        """
        client = remotefs.RemoteFsClient("nfs", root_helper='true',
                                         nfs_mount_point_base='/mnt')
        share = '10.0.0.1:/qwe'
        mount_point = client.get_mount_point(share)
        client.mount(share)
        calls = [mock.call('mkdir', '-p', mount_point, check_exit_code=0),
                 mock.call('mount', '-t', 'nfs', '-o',
                           'vers=4,minorversion=1',
                           share, mount_point, check_exit_code=0,
                           run_as_root=True, root_helper='true')]
        self.mock_execute.assert_has_calls(calls)

    def test_read_mounts(self):
        """``_read_mounts`` maps each mount point to its device."""
        # Built by implicit concatenation so that source indentation can
        # never leak into the fake ``mount`` output.
        mounts = ("device1 on mnt_point1\n"
                  "device2 on mnt_point2 type ext4 opts")
        with mock.patch.object(priv_rootwrap, 'execute',
                               return_value=[mounts, '']):
            client = remotefs.RemoteFsClient("cifs", root_helper='true',
                                             smbfs_mount_point_base='/mnt')
            ret = client._read_mounts()
        self.assertEqual({'mnt_point1': 'device1',
                          'mnt_point2': 'device2'}, ret)

    @mock.patch.object(priv_rootwrap, 'execute')
    @mock.patch.object(remotefs.RemoteFsClient, '_do_mount')
    def test_mount_already_mounted(self, mock_do_mount, mock_execute):
        """``mount`` does nothing when the mount point is already listed."""
        share = "10.0.0.1:/share"
        client = remotefs.RemoteFsClient("cifs", root_helper='true',
                                         smbfs_mount_point_base='/mnt')
        mounts = {client.get_mount_point(share): 'some_dev'}
        with mock.patch.object(client, '_read_mounts',
                               return_value=mounts):
            client.mount(share)
            self.assertEqual(0, mock_do_mount.call_count)
            self.assertEqual(0, mock_execute.call_count)

    @mock.patch.object(priv_rootwrap, 'execute')
    def test_mount_race(self, mock_execute):
        """An 'already mounted' error is tolerated if the share is listed.

        ``execute`` raises with an "already mounted" message while
        ``_read_mounts`` reports the share; ``_do_mount`` must not
        propagate the error.
        """
        err_msg = 'mount.nfs: /var/asdf is already mounted'
        mock_execute.side_effect = putils.ProcessExecutionError(stderr=err_msg)
        mounts = {'192.0.2.20:/share': '/var/asdf/'}
        client = remotefs.RemoteFsClient("nfs", root_helper='true',
                                         nfs_mount_point_base='/var/asdf')
        with mock.patch.object(client, '_read_mounts',
                               return_value=mounts):
            client._do_mount('nfs', '192.0.2.20:/share', '/var/asdf')
        # Without this check the test would also pass if the mount command
        # were never attempted, which is not the scenario under test.
        self.assertTrue(mock_execute.called)

    @mock.patch.object(priv_rootwrap, 'execute')
    def test_mount_failure(self, mock_execute):
        """A mount error unrelated to 'already mounted' is propagated."""
        err_msg = 'mount.nfs: nfs broke'
        mock_execute.side_effect = putils.ProcessExecutionError(stderr=err_msg)
        client = remotefs.RemoteFsClient("nfs", root_helper='true',
                                         nfs_mount_point_base='/var/asdf')
        self.assertRaises(putils.ProcessExecutionError,
                          client._do_mount,
                          'nfs', '192.0.2.20:/share', '/var/asdf')

    def _test_no_mount_point(self, fs_type):
        """Assert that building a client without a mount base is rejected.

        :param fs_type: mount type passed to ``RemoteFsClient``.
        """
        self.assertRaises(exception.InvalidParameterValue,
                          remotefs.RemoteFsClient,
                          fs_type, root_helper='true')

    def test_no_mount_point_nfs(self):
        self._test_no_mount_point('nfs')

    def test_no_mount_point_cifs(self):
        self._test_no_mount_point('cifs')

    def test_no_mount_point_glusterfs(self):
        self._test_no_mount_point('glusterfs')

    def test_no_mount_point_vzstorage(self):
        self._test_no_mount_point('vzstorage')

    def test_no_mount_point_quobyte(self):
        self._test_no_mount_point('quobyte')

    def test_invalid_fs(self):
        """An unknown mount type raises ``ProtocolNotSupported``."""
        self.assertRaises(exception.ProtocolNotSupported,
                          remotefs.RemoteFsClient,
                          'my_fs', root_helper='true')

    def test_init_sets_mount_base(self):
        """The mount base for "cifs" is read from the ``smbfs_`` option."""
        client = remotefs.RemoteFsClient("cifs", root_helper='true',
                                         smbfs_mount_point_base='/fake',
                                         cifs_mount_point_base='/fake2')
        # Tests that although the FS type is "cifs", the config option
        # starts with "smbfs_"
        self.assertEqual('/fake', client._mount_base)

    @mock.patch('os_brick.remotefs.remotefs.RemoteFsClient._check_nfs_options')
    def test_init_nfs_calls_check_nfs_options(self, mock_check_nfs_options):
        """Building an NFS client calls ``_check_nfs_options`` once."""
        remotefs.RemoteFsClient("nfs", root_helper='true',
                                nfs_mount_point_base='/fake')
        mock_check_nfs_options.assert_called_once_with()
class VZStorageRemoteFSClientTestVase(RemoteFsClientTestCase):
    """Tests for ``remotefs.VZStorageRemoteFSClient``.

    Command expectations are checked against ``self.mock_execute``, which
    the parent class installs in ``setUp``.
    """
    # NOTE(review): "TestVase" looks like a typo of "TestCase"; the name is
    # kept as-is so nothing referring to this class by name breaks.

    @staticmethod
    def _new_client():
        """Build a vzstorage client rooted at ``/mnt``."""
        return remotefs.VZStorageRemoteFSClient(
            "vzstorage", root_helper='true',
            vzstorage_mount_point_base='/mnt')

    @mock.patch.object(remotefs.RemoteFsClient, '_read_mounts',
                       return_value=[])
    def test_vzstorage_by_cluster_name(self, mock_read_mounts):
        """A bare cluster name is mounted with ``pstorage-mount -c``."""
        vz_client = self._new_client()
        share = 'qwe'
        cluster_name = share
        target_dir = vz_client.get_mount_point(share)

        vz_client.mount(share)

        expected_mkdir = mock.call('mkdir', '-p', target_dir,
                                   check_exit_code=0)
        expected_mount = mock.call('pstorage-mount', '-c', cluster_name,
                                   target_dir,
                                   run_as_root=True, root_helper='true',
                                   check_exit_code=0)
        self.mock_execute.assert_has_calls([expected_mkdir, expected_mount])

    @mock.patch.object(remotefs.RemoteFsClient, '_read_mounts',
                       return_value=[])
    def test_vzstorage_with_auth(self, mock_read_mounts):
        """A ``cluster:password`` share authenticates before mounting."""
        vz_client = self._new_client()
        cluster_name = 'qwe'
        password = '123456'
        share = ':'.join((cluster_name, password))
        target_dir = vz_client.get_mount_point(share)

        vz_client.mount(share)

        expected_mkdir = mock.call('mkdir', '-p', target_dir,
                                   check_exit_code=0)
        # The password is expected on stdin (process_input), not in argv.
        expected_auth = mock.call('pstorage', '-c', cluster_name,
                                  'auth-node', '-P',
                                  run_as_root=True, root_helper='true',
                                  process_input=password)
        expected_mount = mock.call('pstorage-mount', '-c', cluster_name,
                                   target_dir,
                                   run_as_root=True, root_helper='true',
                                   check_exit_code=0)
        self.mock_execute.assert_has_calls(
            [expected_mkdir, expected_auth, expected_mount])

    @mock.patch('os.path.exists', return_value=False)
    @mock.patch.object(remotefs.RemoteFsClient, '_read_mounts',
                       return_value=[])
    def test_vzstorage_with_mds_list(self, mock_read_mounts, mock_exists):
        """An ``mds1,mds2:/cluster`` share writes ``bs_list`` then mounts."""
        vz_client = self._new_client()
        cluster_name = 'qwe'
        mds_list = ['10.0.0.1', '10.0.0.2']
        share = '%s:/%s' % (','.join(mds_list), cluster_name)
        target_dir = vz_client.get_mount_point(share)
        vz_conf_dir = os.path.join('/etc/pstorage/clusters/', cluster_name)
        tmp_dir = '/tmp/fake_dir/'

        fake_open = mock.mock_open()
        with mock.patch.object(tempfile, 'mkdtemp', return_value=tmp_dir), \
                mock.patch.object(six.moves.builtins, "open",
                                  fake_open, create=True):
            vz_client.mount(share)

        # One line per MDS address is expected in <tmp_dir>bs_list.
        bs_list_path = tmp_dir + 'bs_list'
        expected_writes = [mock.call(bs_list_path, 'w'),
                           mock.call().__enter__(),
                           mock.call().write('10.0.0.1\n'),
                           mock.call().write('10.0.0.2\n'),
                           mock.call().__exit__(None, None, None)]
        fake_open.assert_has_calls(expected_writes)

        expected_cmds = [
            mock.call('mkdir', '-p', target_dir, check_exit_code=0),
            mock.call('cp', '-rf', tmp_dir, vz_conf_dir,
                      root_helper='true', run_as_root=True),
            mock.call('chown', '-R', 'root:root', vz_conf_dir,
                      root_helper='true', run_as_root=True),
            mock.call('pstorage-mount', '-c', cluster_name, target_dir,
                      run_as_root=True, root_helper='true',
                      check_exit_code=0),
        ]
        self.mock_execute.assert_has_calls(expected_cmds)

    @mock.patch.object(remotefs.RemoteFsClient, '_read_mounts',
                       return_value=[])
    def test_vzstorage_invalid_share(self, mock_read_mounts):
        """Mounting the share string ``':'`` raises ``BrickException``."""
        vz_client = self._new_client()
        self.assertRaises(exception.BrickException, vz_client.mount, ':')
class ScalityRemoteFsClientTestCase(base.TestCase):
    """Tests for ``remotefs.ScalityRemoteFsClient``."""

    def test_no_mount_point_scality(self):
        """Omitting the mount point base raises ``InvalidParameterValue``."""
        self.assertRaises(exception.InvalidParameterValue,
                          remotefs.ScalityRemoteFsClient,
                          'scality', root_helper='true')

    def test_get_mount_point(self):
        """``get_mount_point('path')`` yields ``<base>/path/00``."""
        scality_client = remotefs.ScalityRemoteFsClient(
            'scality', root_helper='true',
            scality_mount_point_base='/fake')
        mount_point = scality_client.get_mount_point('path')
        self.assertEqual('/fake/path/00', mount_point)

    @mock.patch('oslo_concurrency.processutils.execute', return_value=None)
    @mock.patch('os_brick.remotefs.remotefs.RemoteFsClient._do_mount')
    def test_mount(self, mock_do_mount, mock_execute):
        """``mount`` creates the base dir and mounts sofs exactly once."""
        # putils.execute is already patched here, so the client is handed
        # the mock as its executor.
        scality_client = remotefs.ScalityRemoteFsClient(
            'scality', root_helper='true',
            scality_mount_point_base='/fake',
            execute=putils.execute)

        no_mounts = {}
        with mock.patch.object(scality_client, '_read_mounts',
                               return_value=no_mounts):
            scality_client.mount('fake')

        mock_execute.assert_called_once_with(
            'mkdir', '-p', '/fake', check_exit_code=0)
        mock_do_mount.assert_called_once_with(
            'sofs', '/etc/sfused.conf', '/fake')