341161eead
Use parenthesis instead of backslashes in tests folder TrivialFix Change-Id: I6c2ea07b0bfc5852b28e44989406cc10eb972e53
474 lines
19 KiB
Python
474 lines
19 KiB
Python
# Copyright (c) 2014 Red Hat, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import copy
|
|
import socket
|
|
|
|
import ddt
|
|
import mock
|
|
from oslo_config import cfg
|
|
|
|
from manila import context
|
|
from manila import exception
|
|
from manila.share import configuration as config
|
|
from manila.share.drivers import ganesha
|
|
from manila.share.drivers import glusterfs
|
|
from manila.share.drivers.glusterfs import layout
|
|
from manila import test
|
|
from manila.tests import fake_share
|
|
from manila.tests import fake_utils
|
|
|
|
|
|
# Shared oslo.config option registry; the test cases in this module call
# CONF.set_default() on it to pin option defaults.
CONF = cfg.CONF
|
|
|
|
|
|
# Attribute values applied to the mock.Mock objects that stand in for a
# gluster manager in the helper test cases of this module.
fake_gluster_manager_attrs = dict(
    export='127.0.0.1:/testvol',
    host='127.0.0.1',
    qualified='testuser@127.0.0.1:/testvol',
    user='testuser',
    volume='testvol',
    path_to_private_key='/fakepath/to/privatekey',
    remote_server_password='fakepassword',
)

fake_share_name = 'fakename'

# Names of the GlusterFS volume options asserted on by the tests.
NFS_EXPORT_DIR = 'nfs.export-dir'
NFS_EXPORT_VOL = 'nfs.export-volumes'
NFS_RPC_AUTH_ALLOW = 'nfs.rpc-auth-allow'
NFS_RPC_AUTH_REJECT = 'nfs.rpc-auth-reject'
|
|
|
|
|
|
@ddt.ddt
class GlusterfsShareDriverTestCase(test.TestCase):
    """Unit tests for glusterfs.GlusterfsShareDriver.

    Each test runs against a driver that setUp builds from the fake
    execute function and a bare Configuration object.
    """

    def setUp(self):
        """Stub out command execution and build the driver under test."""
        super(GlusterfsShareDriverTestCase, self).setUp()
        fake_utils.stub_out_utils_execute(self)
        self._execute = fake_utils.fake_execute
        self._context = context.get_admin_context()
        # Registered cleanups: set the fake executor's repliers to [] and
        # clear its log once the test is over.
        self.addCleanup(fake_utils.fake_execute_set_repliers, [])
        self.addCleanup(fake_utils.fake_execute_clear_log)

        for option, default in (('reserved_share_percentage', 50),
                                ('driver_handles_share_servers', False)):
            CONF.set_default(option, default)

        self.fake_conf = config.Configuration(None)
        self._driver = glusterfs.GlusterfsShareDriver(
            execute=self._execute, configuration=self.fake_conf)
        self.share = fake_share.fake_share(share_proto='NFS')

    def test_do_setup(self):
        """do_setup() gets a helper and calls the layout base do_setup."""
        base_cls = layout.GlusterfsShareDriverBase
        self.mock_object(self._driver, '_get_helper')
        self.mock_object(base_cls, 'do_setup')
        fake_ctx = mock.Mock()

        self._driver.do_setup(fake_ctx)

        self._driver._get_helper.assert_called_once_with()
        base_cls.do_setup.assert_called_once_with(fake_ctx)

    @ddt.data(True, False)
    def test_setup_via_manager(self, has_parent):
        """_setup_via_manager() returns what the helper's get_export gives.

        Exercised both with and without a parent share manager.
        """
        export_location = 'host:/vol'
        helper = mock.Mock(
            get_export=mock.Mock(return_value=export_location))
        self._driver.nfs_helper = mock.Mock(return_value=helper)
        manager = mock.Mock()
        parent = None
        if has_parent:
            parent = mock.Mock()

        result = self._driver._setup_via_manager(
            {'manager': manager, 'share': self.share},
            share_manager_parent=parent)

        manager.set_vol_option.assert_called_once_with(
            'nfs.export-volumes', False)
        self._driver.nfs_helper.assert_called_once_with(
            self._execute, self.fake_conf, gluster_manager=manager)
        helper.get_export.assert_called_once_with(self.share)
        self.assertEqual(export_location, result)

    @ddt.data({'helpercls': None, 'path': '/fakepath'},
              {'helpercls': None, 'path': None},
              {'helpercls': glusterfs.GlusterNFSHelper, 'path': '/fakepath'},
              {'helpercls': glusterfs.GlusterNFSHelper, 'path': None})
    @ddt.unpack
    def test_setup_via_manager_path(self, helpercls, path):
        """Check which volume option _setup_via_manager() sets.

        With GlusterNFSHelper and no path on the manager, the export-volumes
        option is read and rpc-auth-reject is set to '*'; in every other
        combination export-volumes is set to False.
        """
        manager = mock.Mock(path=path)
        if not helpercls:
            # No helper class given: substitute a mock factory.
            stub_helper = mock.Mock(
                get_export=mock.Mock(return_value='host:/vol'))
            helpercls = mock.Mock(return_value=stub_helper)
        self._driver.nfs_helper = helpercls
        whole_volume = (helpercls == glusterfs.GlusterNFSHelper and
                        path is None)
        if whole_volume:
            manager.get_vol_option = mock.Mock(return_value=True)

        self._driver._setup_via_manager(
            {'manager': manager, 'share': self.share})

        if whole_volume:
            manager.get_vol_option.assert_called_once_with(
                NFS_EXPORT_VOL, boolean=True)
            expected_args = (NFS_RPC_AUTH_REJECT, '*')
        else:
            expected_args = (NFS_EXPORT_VOL, False)
        manager.set_vol_option.assert_called_once_with(*expected_args)

    def test_setup_via_manager_export_volumes_off(self):
        """A False export-volumes option raises GlusterfsException."""
        manager = mock.Mock(
            path=None,
            get_vol_option=mock.Mock(return_value=False))
        self._driver.nfs_helper = glusterfs.GlusterNFSHelper
        gluster_info = {'manager': manager, 'share': self.share}

        self.assertRaises(exception.GlusterfsException,
                          self._driver._setup_via_manager,
                          gluster_info)

        manager.get_vol_option.assert_called_once_with(
            NFS_EXPORT_VOL, boolean=True)

    def test_check_for_setup_error(self):
        """check_for_setup_error() runs without raising."""
        self._driver.check_for_setup_error()

    def test_update_share_stats(self):
        """_update_share_stats() hands the expected dict to the base."""
        base_cls = layout.GlusterfsShareDriverBase
        self.mock_object(base_cls, '_update_share_stats')
        expected_stats = {
            'storage_protocol': 'NFS',
            'vendor_name': 'Red Hat',
            'share_backend_name': 'GlusterFS',
            # Matches the reserved_share_percentage default from setUp.
            'reserved_percentage': 50,
        }

        self._driver._update_share_stats()

        base_cls._update_share_stats.assert_called_once_with(expected_stats)

    def test_get_network_allocations_number(self):
        """get_network_allocations_number() returns 0."""
        allocations = self._driver.get_network_allocations_number()

        self.assertEqual(0, allocations)

    def test_get_helper(self):
        """_get_helper() returns an instance of the driver's nfs_helper."""
        helper = self._driver._get_helper()

        self.assertIsInstance(helper, self._driver.nfs_helper)

    @ddt.data({'path': '/fakepath', 'helper': glusterfs.GlusterNFSHelper},
              {'path': None, 'helper': glusterfs.GlusterNFSVolHelper})
    @ddt.unpack
    def test_get_helper_vol(self, path, helper):
        """_get_helper(manager) picks the helper class by manager.path."""
        self._driver.nfs_helper = glusterfs.GlusterNFSHelper
        manager = mock.Mock()
        manager.path = path

        chosen = self._driver._get_helper(manager)

        self.assertIsInstance(chosen, helper)

    @ddt.data('type', 'level')
    def test_supported_access_features(self, feature):
        """supported_access_<feature>s on the driver mirrors the helper."""
        attr_name = 'supported_access_%ss' % feature
        helper_value = mock.Mock()
        stub_helper = mock.Mock(**{attr_name: helper_value})
        self.mock_object(self._driver, 'nfs_helper', stub_helper)

        driver_value = getattr(self._driver, attr_name)

        self.assertEqual(helper_value, driver_value)

    def test_update_access_via_manager(self):
        """_update_access_via_manager() forwards rules to the helper."""
        self.mock_object(self._driver, '_get_helper')
        manager = mock.Mock()
        to_add = mock.Mock()
        to_delete = mock.Mock()

        self._driver._update_access_via_manager(
            manager, self._context, self.share,
            to_add, to_delete, recovery=True)

        self._driver._get_helper.assert_called_once_with(manager)
        # return_value is the object the mocked _get_helper handed out.
        helper = self._driver._get_helper.return_value
        helper.update_access.assert_called_once_with(
            '/', self.share, to_add, to_delete, recovery=True)
|
|
|
|
|
|
@ddt.ddt
class GlusterNFSHelperTestCase(test.TestCase):
    """Tests GlusterNFSHelper."""

    def setUp(self):
        """Build a GlusterNFSHelper around a mocked gluster manager."""
        super(GlusterNFSHelperTestCase, self).setUp()
        fake_utils.stub_out_utils_execute(self)
        gluster_manager = mock.Mock(**fake_gluster_manager_attrs)
        self._execute = mock.Mock(return_value=('', ''))
        self.fake_conf = config.Configuration(None)
        self._helper = glusterfs.GlusterNFSHelper(
            self._execute, self.fake_conf, gluster_manager=gluster_manager)

    def _new_helper_with_path(self, export_dir_dict, path='/fakename'):
        """Return (helper, manager) for a manager whose path is `path`.

        The helper's _get_export_dir_dict is replaced by a mock returning
        `export_dir_dict`.
        """
        gluster_manager_attrs = {'path': path}
        gluster_manager_attrs.update(fake_gluster_manager_attrs)
        gluster_mgr = mock.Mock(**gluster_manager_attrs)
        helper = glusterfs.GlusterNFSHelper(
            self._execute, self.fake_conf, gluster_manager=gluster_mgr)
        helper._get_export_dir_dict = mock.Mock(return_value=export_dir_dict)
        return helper, gluster_mgr

    def test_get_export(self):
        """get_export() returns the manager's 'export' attribute."""
        ret = self._helper.get_export(mock.Mock())

        self.assertEqual(fake_gluster_manager_attrs['export'], ret)

    @ddt.data({'output_str': '/foo(10.0.0.1|10.0.0.2),/bar(10.0.0.1)',
               'expected': {'foo': ['10.0.0.1', '10.0.0.2'],
                            'bar': ['10.0.0.1']}},
              {'output_str': None, 'expected': {}})
    @ddt.unpack
    def test_get_export_dir_dict(self, output_str, expected):
        """_get_export_dir_dict() parses the nfs.export-dir option value."""
        self.mock_object(self._helper.gluster_manager,
                         'get_vol_option',
                         mock.Mock(return_value=output_str))

        ret = self._helper._get_export_dir_dict()

        self.assertEqual(expected, ret)
        (self._helper.gluster_manager.get_vol_option.
         assert_called_once_with(NFS_EXPORT_DIR))

    @ddt.data({'delta': (['10.0.0.2'], []), 'extra_exports': {},
               'new_exports': '/fakename(10.0.0.1|10.0.0.2)'},
              {'delta': (['10.0.0.1'], []), 'extra_exports': {},
               'new_exports': '/fakename(10.0.0.1)'},
              {'delta': ([], ['10.0.0.2']), 'extra_exports': {},
               'new_exports': '/fakename(10.0.0.1)'},
              {'delta': ([], ['10.0.0.1']), 'extra_exports': {},
               'new_exports': None},
              {'delta': ([], ['10.0.0.1']),
               'extra_exports': {'elsewhere': ['10.0.1.3']},
               'new_exports': '/elsewhere(10.0.1.3)'})
    @ddt.unpack
    def test_update_access(self, delta, extra_exports, new_exports):
        """update_access() writes the recomputed nfs.export-dir value.

        `delta` is an (addresses to add, addresses to delete) pair applied
        on top of an existing export of 'fakename' to 10.0.0.1.
        """
        export_dir_dict = {'fakename': ['10.0.0.1']}
        export_dir_dict.update(extra_exports)
        helper, gluster_mgr = self._new_helper_with_path(export_dir_dict)
        _share = mock.Mock()

        # Build real lists rather than map() objects: on Python 3 map()
        # yields one-shot iterators, unlike the rule lists used elsewhere
        # in this class.
        add_rules, delete_rules = (
            [{'access_to': address} for address in addresses]
            for addresses in delta)
        helper.update_access('/', _share, add_rules, delete_rules)

        helper._get_export_dir_dict.assert_called_once_with()
        gluster_mgr.set_vol_option.assert_called_once_with(NFS_EXPORT_DIR,
                                                           new_exports)

    # Export-dir dict values are address lists, matching the shape asserted
    # in test_get_export_dir_dict.
    @ddt.data({}, {'elsewhere': ['10.0.1.3']})
    def test_update_access_disjoint(self, export_dir_dict):
        """Deleting a rule for a share with no export sets no option."""
        helper, gluster_mgr = self._new_helper_with_path(export_dir_dict)
        _share = mock.Mock()

        helper.update_access('/', _share, [], [{'access_to': '10.0.0.2'}])

        helper._get_export_dir_dict.assert_called_once_with()
        self.assertFalse(gluster_mgr.set_vol_option.called)
|
|
|
|
|
|
@ddt.ddt
class GlusterNFSVolHelperTestCase(test.TestCase):
    """Tests GlusterNFSVolHelper."""

    def setUp(self):
        """Build a GlusterNFSVolHelper around a mocked gluster manager."""
        super(GlusterNFSVolHelperTestCase, self).setUp()
        fake_utils.stub_out_utils_execute(self)
        gluster_manager = mock.Mock(**fake_gluster_manager_attrs)
        self._execute = mock.Mock(return_value=('', ''))
        self.fake_conf = config.Configuration(None)
        self._helper = glusterfs.GlusterNFSVolHelper(
            self._execute, self.fake_conf, gluster_manager=gluster_manager)

    @ddt.data({'output_str': '10.0.0.1,10.0.0.2',
               'expected': ['10.0.0.1', '10.0.0.2']},
              {'output_str': None, 'expected': []})
    @ddt.unpack
    def test_get_vol_exports(self, output_str, expected):
        """_get_vol_exports() splits the nfs.rpc-auth-allow option value."""
        self.mock_object(self._helper.gluster_manager,
                         'get_vol_option',
                         mock.Mock(return_value=output_str))

        ret = self._helper._get_vol_exports()

        self.assertEqual(expected, ret)
        (self._helper.gluster_manager.get_vol_option.
         assert_called_once_with(NFS_RPC_AUTH_ALLOW))

    @ddt.data({'delta': (["10.0.0.1"], []), 'expected': "10.0.0.1,10.0.0.3"},
              {'delta': (["10.0.0.2"], []),
               'expected': "10.0.0.1,10.0.0.2,10.0.0.3"},
              {'delta': ([], ["10.0.0.1"]), 'expected': "10.0.0.3"},
              {'delta': ([], ["10.0.0.2"]), 'expected': "10.0.0.1,10.0.0.3"})
    @ddt.unpack
    def test_update_access(self, delta, expected):
        """update_access() sets the allow list, then unsets the reject one.

        `delta` is an (addresses to add, addresses to delete) pair applied
        on top of existing exports to 10.0.0.1 and 10.0.0.3.
        """
        self.mock_object(self._helper, '_get_vol_exports', mock.Mock(
            return_value=["10.0.0.1", "10.0.0.3"]))
        _share = mock.Mock()

        # Build real lists rather than map() objects: on Python 3 map()
        # yields one-shot iterators, unlike the rule lists passed in
        # test_update_access_empty.
        add_rules, delete_rules = (
            [{'access_to': address} for address in addresses]
            for addresses in delta)
        self._helper.update_access("/", _share, add_rules, delete_rules)

        self._helper._get_vol_exports.assert_called_once_with()
        argseq = [(NFS_RPC_AUTH_ALLOW, expected), (NFS_RPC_AUTH_REJECT, None)]
        self.assertEqual(
            [mock.call(*a) for a in argseq],
            self._helper.gluster_manager.set_vol_option.call_args_list)

    def test_update_access_empty(self):
        """Removing the last address unsets allow and rejects everyone."""
        self.mock_object(self._helper, '_get_vol_exports', mock.Mock(
            return_value=["10.0.0.1"]))
        _share = mock.Mock()

        self._helper.update_access("/", _share, [],
                                   [{'access_to': "10.0.0.1"}])

        self._helper._get_vol_exports.assert_called_once_with()
        argseq = [(NFS_RPC_AUTH_ALLOW, None), (NFS_RPC_AUTH_REJECT, "*")]
        self.assertEqual(
            [mock.call(*a) for a in argseq],
            self._helper.gluster_manager.set_vol_option.call_args_list)
|
|
|
|
|
|
class GaneshaNFSHelperTestCase(test.TestCase):
    """Tests GaneshaNFSHelper."""

    def setUp(self):
        """Build a GaneshaNFSHelper with its collaborators mocked out."""
        super(GaneshaNFSHelperTestCase, self).setUp()
        self.gluster_manager = mock.Mock(**fake_gluster_manager_attrs)
        self._execute = mock.Mock(return_value=('', ''))
        self._root_execute = mock.Mock(return_value=('', ''))
        self.access = fake_share.fake_access()
        self.fake_conf = config.Configuration(None)
        self.fake_template = {'key': 'value'}
        self.share = fake_share.fake_share()
        self.mock_object(glusterfs.ganesha_utils, 'RootExecutor',
                         mock.Mock(return_value=self._root_execute))
        self.mock_object(glusterfs.ganesha.GaneshaNASHelper, '__init__',
                         mock.Mock())
        # Patch through mock_object instead of assigning to the socket
        # module directly: a plain assignment is never reverted and would
        # leave a Mock as socket.gethostname for every later test.
        # NOTE(review): relies on mock_object undoing the patch at cleanup,
        # as it is used for the other patches above -- confirm.
        self.mock_object(socket, 'gethostname',
                         mock.Mock(return_value='example.com'))
        self._helper = glusterfs.GaneshaNFSHelper(
            self._execute, self.fake_conf,
            gluster_manager=self.gluster_manager)
        self._helper.tag = 'GLUSTER-Ganesha-localhost'

    def test_init_local_ganesha_server(self):
        """Default init wraps execute in RootExecutor and tags by hostname."""
        glusterfs.ganesha_utils.RootExecutor.assert_called_once_with(
            self._execute)
        socket.gethostname.assert_has_calls([mock.call()])
        glusterfs.ganesha.GaneshaNASHelper.__init__.assert_has_calls(
            [mock.call(self._root_execute, self.fake_conf,
                       tag='GLUSTER-Ganesha-example.com')])

    def test_get_export(self):
        """get_export() returns the expected export string for the share."""
        ret = self._helper.get_export(self.share)

        self.assertEqual('example.com:/fakename--<access-id>', ret)

    def test_init_remote_ganesha_server(self):
        """With a server IP configured, init uses SSHExecutor instead."""
        ssh_execute = mock.Mock(return_value=('', ''))
        CONF.set_default('glusterfs_ganesha_server_ip', 'fakeip')
        self.mock_object(glusterfs.ganesha_utils, 'SSHExecutor',
                         mock.Mock(return_value=ssh_execute))
        glusterfs.GaneshaNFSHelper(
            self._execute, self.fake_conf,
            gluster_manager=self.gluster_manager)
        glusterfs.ganesha_utils.SSHExecutor.assert_called_once_with(
            'fakeip', 22, None, 'root', password=None, privatekey=None)
        glusterfs.ganesha.GaneshaNASHelper.__init__.assert_has_calls(
            [mock.call(ssh_execute, self.fake_conf,
                       tag='GLUSTER-Ganesha-fakeip')])

    def test_init_helper(self):
        """init_helper() publishes its state in shared_data, keyed by tag.

        A second helper with the same tag must end up with the same
        ganesha and export_template objects.
        """
        ganeshelper = mock.Mock()
        exptemp = mock.Mock()

        def set_attributes(*a, **kw):
            # Stands in for the parent init_helper, which is mocked below.
            self._helper.ganesha = ganeshelper
            self._helper.export_template = exptemp

        self.mock_object(ganesha.GaneshaNASHelper, 'init_helper',
                         mock.Mock(side_effect=set_attributes))
        # shared_data lives on the class, so entries added here would
        # outlive the test; empty it afterwards so the {} precondition
        # below holds regardless of test order.
        self.addCleanup(
            lambda: glusterfs.GaneshaNFSHelper.shared_data.clear())
        self.assertEqual({}, glusterfs.GaneshaNFSHelper.shared_data)

        self._helper.init_helper()

        ganesha.GaneshaNASHelper.init_helper.assert_called_once_with()
        self.assertEqual(ganeshelper, self._helper.ganesha)
        self.assertEqual(exptemp, self._helper.export_template)
        self.assertEqual({
            'GLUSTER-Ganesha-localhost': {
                'ganesha': ganeshelper,
                'export_template': exptemp}},
            glusterfs.GaneshaNFSHelper.shared_data)

        other_helper = glusterfs.GaneshaNFSHelper(
            self._execute, self.fake_conf,
            gluster_manager=self.gluster_manager)
        other_helper.tag = 'GLUSTER-Ganesha-localhost'

        other_helper.init_helper()

        self.assertEqual(ganeshelper, other_helper.ganesha)
        self.assertEqual(exptemp, other_helper.export_template)

    def test_default_config_hook(self):
        """_default_config_hook() patches the parent template with conf."""
        fake_conf_dict = {'key': 'value1'}
        mock_ganesha_utils_patch = mock.Mock()

        def fake_patch_run(tmpl1, tmpl2):
            # Records a snapshot of tmpl1 taken before the update; no
            # assertion in this test currently reads this recorder.
            mock_ganesha_utils_patch(
                copy.deepcopy(tmpl1), tmpl2)
            tmpl1.update(tmpl2)

        self.mock_object(glusterfs.ganesha.GaneshaNASHelper,
                         '_default_config_hook',
                         mock.Mock(return_value=self.fake_template))
        self.mock_object(glusterfs.ganesha_utils, 'path_from',
                         mock.Mock(return_value='/fakedir/glusterfs/conf'))
        self.mock_object(self._helper, '_load_conf_dir',
                         mock.Mock(return_value=fake_conf_dict))
        self.mock_object(glusterfs.ganesha_utils, 'patch',
                         mock.Mock(side_effect=fake_patch_run))

        ret = self._helper._default_config_hook()

        (glusterfs.ganesha.GaneshaNASHelper._default_config_hook.
         assert_called_once_with())
        glusterfs.ganesha_utils.path_from.assert_called_once_with(
            glusterfs.__file__, 'conf')
        self._helper._load_conf_dir.assert_called_once_with(
            '/fakedir/glusterfs/conf')
        glusterfs.ganesha_utils.patch.assert_called_once_with(
            self.fake_template, fake_conf_dict)
        self.assertEqual(fake_conf_dict, ret)

    def test_fsal_hook(self):
        """_fsal_hook() reports the manager's host, volume and path."""
        self._helper.gluster_manager.path = '/fakename'
        output = {
            'Hostname': '127.0.0.1',
            'Volume': 'testvol',
            'Volpath': '/fakename'
        }

        ret = self._helper._fsal_hook('/fakepath', self.share, self.access)

        self.assertEqual(output, ret)
|