manila/manila/tests/share/drivers/test_glusterfs.py

474 lines
19 KiB
Python

# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
import socket
from unittest import mock
import ddt
from oslo_config import cfg
from manila import context
from manila import exception
from manila.share import configuration as config
from manila.share.drivers import ganesha
from manila.share.drivers import glusterfs
from manila.share.drivers.glusterfs import layout
from manila import test
from manila.tests import fake_share
from manila.tests import fake_utils
# Shared oslo.config options object; the test cases below call
# CONF.set_default() on it.
CONF = cfg.CONF

# Attribute values used to build mock gluster managers, passed as
# mock.Mock(**fake_gluster_manager_attrs) in the setUp methods below.
fake_gluster_manager_attrs = {
    'export': '127.0.0.1:/testvol',
    'host': '127.0.0.1',
    'qualified': 'testuser@127.0.0.1:/testvol',
    'user': 'testuser',
    'volume': 'testvol',
    'path_to_private_key': '/fakepath/to/privatekey',
    'remote_server_password': 'fakepassword',
}

fake_share_name = 'fakename'

# Names of the GlusterFS volume options that the tests expect the driver
# helpers to read (get_vol_option) or write (set_vol_option).
NFS_EXPORT_DIR = 'nfs.export-dir'
NFS_EXPORT_VOL = 'nfs.export-volumes'
NFS_RPC_AUTH_ALLOW = 'nfs.rpc-auth-allow'
NFS_RPC_AUTH_REJECT = 'nfs.rpc-auth-reject'
@ddt.ddt
class GlusterfsShareDriverTestCase(test.TestCase):
    """Unit tests for glusterfs.GlusterfsShareDriver."""

    def setUp(self):
        """Create a driver wired to the fake executor plus an NFS share."""
        super().setUp()
        fake_utils.stub_out_utils_execute(self)
        self._execute = fake_utils.fake_execute
        self._context = context.get_admin_context()
        self.addCleanup(fake_utils.fake_execute_set_repliers, [])
        self.addCleanup(fake_utils.fake_execute_clear_log)

        for option, value in (('reserved_share_percentage', 50),
                              ('driver_handles_share_servers', False)):
            CONF.set_default(option, value)

        self.fake_conf = config.Configuration(None)
        self._driver = glusterfs.GlusterfsShareDriver(
            execute=self._execute, configuration=self.fake_conf)
        self.share = fake_share.fake_share(share_proto='NFS')

    def test_do_setup(self):
        """do_setup obtains a helper and delegates to the layout base."""
        fake_context = mock.Mock()
        self.mock_object(self._driver, '_get_helper')
        self.mock_object(layout.GlusterfsShareDriverBase, 'do_setup')

        self._driver.do_setup(fake_context)

        self._driver._get_helper.assert_called_once_with()
        base_do_setup = layout.GlusterfsShareDriverBase.do_setup
        base_do_setup.assert_called_once_with(fake_context)

    @ddt.data(True, False)
    def test_setup_via_manager(self, has_parent):
        """_setup_via_manager turns off volume export, returns the export."""
        if has_parent:
            parent_manager = mock.Mock()
        else:
            parent_manager = None
        manager = mock.Mock()
        helper = mock.Mock()
        helper.get_export = mock.Mock(return_value='host:/vol')
        self._driver.nfs_helper = mock.Mock(return_value=helper)
        share_info = {'manager': manager, 'share': self.share}

        export = self._driver._setup_via_manager(
            share_info, share_manager_parent=parent_manager)

        manager.set_vol_option.assert_called_once_with(
            'nfs.export-volumes', False)
        self._driver.nfs_helper.assert_called_once_with(
            self._execute, self.fake_conf, gluster_manager=manager)
        helper.get_export.assert_called_once_with(self.share)
        self.assertEqual('host:/vol', export)

    @ddt.data({'helpercls': None, 'path': '/fakepath'},
              {'helpercls': None, 'path': None},
              {'helpercls': glusterfs.GlusterNFSHelper, 'path': '/fakepath'},
              {'helpercls': glusterfs.GlusterNFSHelper, 'path': None})
    @ddt.unpack
    def test_setup_via_manager_path(self, helpercls, path):
        """Check the volume option set for each helper class/path combo.

        With no helper class given, a mock helper factory is substituted.
        Only the real GlusterNFSHelper combined with a manager path of None
        is expected to query NFS_EXPORT_VOL and reject all clients.
        """
        manager = mock.Mock()
        manager.path = path
        if not helpercls:
            stub_helper = mock.Mock()
            stub_helper.get_export = mock.Mock(return_value='host:/vol')
            helpercls = mock.Mock(return_value=stub_helper)
        self._driver.nfs_helper = helpercls

        # Evaluated once, after the possible substitution above, and reused
        # for both the arrangement and the expectations.
        whole_volume = (
            helpercls == glusterfs.GlusterNFSHelper and path is None)
        if whole_volume:
            manager.get_vol_option = mock.Mock(return_value=True)

        self._driver._setup_via_manager(
            {'manager': manager, 'share': self.share})

        if whole_volume:
            manager.get_vol_option.assert_called_once_with(
                NFS_EXPORT_VOL, boolean=True)
            expected_option = (NFS_RPC_AUTH_REJECT, '*')
        else:
            expected_option = (NFS_EXPORT_VOL, False)
        manager.set_vol_option.assert_called_once_with(*expected_option)

    def test_setup_via_manager_export_volumes_off(self):
        """A falsy NFS_EXPORT_VOL with no path raises GlusterfsException."""
        manager = mock.Mock()
        manager.path = None
        manager.get_vol_option = mock.Mock(return_value=False)
        self._driver.nfs_helper = glusterfs.GlusterNFSHelper
        share_info = {'manager': manager, 'share': self.share}

        self.assertRaises(exception.GlusterfsException,
                          self._driver._setup_via_manager,
                          share_info)

        manager.get_vol_option.assert_called_once_with(
            NFS_EXPORT_VOL, boolean=True)

    def test_check_for_setup_error(self):
        """check_for_setup_error completes without raising."""
        self._driver.check_for_setup_error()

    def test_update_share_stats(self):
        """_update_share_stats forwards the GlusterFS stats to the base."""
        self.mock_object(layout.GlusterfsShareDriverBase,
                         '_update_share_stats')
        expected_stats = {
            'storage_protocol': 'NFS',
            'vendor_name': 'Red Hat',
            'share_backend_name': 'GlusterFS',
            'reserved_percentage': 50,
        }

        self._driver._update_share_stats()

        base_update = layout.GlusterfsShareDriverBase._update_share_stats
        base_update.assert_called_once_with(expected_stats)

    def test_get_network_allocations_number(self):
        """The driver reports zero network allocations."""
        allocations = self._driver.get_network_allocations_number()
        self.assertEqual(0, allocations)

    def test_get_helper(self):
        """_get_helper() returns an instance of the driver's nfs_helper."""
        helper = self._driver._get_helper()
        self.assertIsInstance(helper, self._driver.nfs_helper)

    @ddt.data({'path': '/fakepath', 'helper': glusterfs.GlusterNFSHelper},
              {'path': None, 'helper': glusterfs.GlusterNFSVolHelper})
    @ddt.unpack
    def test_get_helper_vol(self, path, helper):
        """_get_helper(manager) picks the helper class from manager.path."""
        self._driver.nfs_helper = glusterfs.GlusterNFSHelper
        manager = mock.Mock(path=path)

        chosen = self._driver._get_helper(manager)

        self.assertIsInstance(chosen, helper)

    @ddt.data('type', 'level')
    def test_supported_access_features(self, feature):
        """supported_access_{types,levels} are read from nfs_helper."""
        attribute = 'supported_access_%ss' % feature
        helper = mock.Mock()
        expected = mock.Mock()
        setattr(helper, attribute, expected)
        self.mock_object(self._driver, 'nfs_helper', helper)

        actual = getattr(self._driver, attribute)

        self.assertEqual(expected, actual)

    def test_update_access_via_manager(self):
        """_update_access_via_manager delegates to the helper for '/'."""
        self.mock_object(self._driver, '_get_helper')
        manager = mock.Mock()
        rules_to_add = mock.Mock()
        rules_to_delete = mock.Mock()

        self._driver._update_access_via_manager(
            manager, self._context, self.share,
            rules_to_add, rules_to_delete, recovery=True)

        self._driver._get_helper.assert_called_once_with(manager)
        helper = self._driver._get_helper()
        helper.update_access.assert_called_once_with(
            '/', self.share, rules_to_add, rules_to_delete, recovery=True)
@ddt.ddt
class GlusterNFSHelperTestCase(test.TestCase):
    """Unit tests for glusterfs.GlusterNFSHelper."""

    def setUp(self):
        """Create a helper bound to a mock gluster manager."""
        super().setUp()
        fake_utils.stub_out_utils_execute(self)
        manager = mock.Mock(**fake_gluster_manager_attrs)
        self._execute = mock.Mock(return_value=('', ''))
        self.fake_conf = config.Configuration(None)
        self._helper = glusterfs.GlusterNFSHelper(
            self._execute, self.fake_conf, gluster_manager=manager)

    def _helper_with_path(self):
        """Return (helper, manager) where manager.path is '/fakename'."""
        attrs = {'path': '/fakename', **fake_gluster_manager_attrs}
        manager = mock.Mock(**attrs)
        helper = glusterfs.GlusterNFSHelper(
            self._execute, self.fake_conf, gluster_manager=manager)
        return helper, manager

    def test_get_export(self):
        """get_export returns the gluster manager's export attribute."""
        export = self._helper.get_export(mock.Mock())
        self.assertEqual(fake_gluster_manager_attrs['export'], export)

    @ddt.data({'output_str': '/foo(10.0.0.1|10.0.0.2),/bar(10.0.0.1)',
               'expected': {'foo': ['10.0.0.1', '10.0.0.2'],
                            'bar': ['10.0.0.1']}},
              {'output_str': None, 'expected': {}})
    @ddt.unpack
    def test_get_export_dir_dict(self, output_str, expected):
        """_get_export_dir_dict parses the NFS_EXPORT_DIR option value."""
        manager = self._helper.gluster_manager
        self.mock_object(manager, 'get_vol_option',
                         mock.Mock(return_value=output_str))

        parsed = self._helper._get_export_dir_dict()

        self.assertEqual(expected, parsed)
        manager.get_vol_option.assert_called_once_with(NFS_EXPORT_DIR)

    @ddt.data({'delta': (['10.0.0.2'], []), 'extra_exports': {},
               'new_exports': '/fakename(10.0.0.1|10.0.0.2)'},
              {'delta': (['10.0.0.1'], []), 'extra_exports': {},
               'new_exports': '/fakename(10.0.0.1)'},
              {'delta': ([], ['10.0.0.2']), 'extra_exports': {},
               'new_exports': '/fakename(10.0.0.1)'},
              {'delta': ([], ['10.0.0.1']), 'extra_exports': {},
               'new_exports': None},
              {'delta': ([], ['10.0.0.1']),
               'extra_exports': {'elsewhere': ['10.0.1.3']},
               'new_exports': '/elsewhere(10.0.1.3)'})
    @ddt.unpack
    def test_update_access(self, delta, extra_exports, new_exports):
        """update_access writes the expected NFS_EXPORT_DIR value.

        delta is an (addresses to add, addresses to delete) pair applied on
        top of an existing export of 'fakename' to 10.0.0.1, optionally
        alongside extra_exports.
        """

        def as_rule(address):
            return {'access_to': address}

        helper, manager = self._helper_with_path()
        current_exports = {'fakename': ['10.0.0.1'], **extra_exports}
        helper._get_export_dir_dict = mock.Mock(
            return_value=current_exports)
        share = mock.Mock()
        # Rules are handed over as lazy map objects, one per delta entry.
        add_rules, delete_rules = (
            map(as_rule, addresses) for addresses in delta)

        helper.update_access('/', share, add_rules, delete_rules)

        helper._get_export_dir_dict.assert_called_once_with()
        manager.set_vol_option.assert_called_once_with(
            NFS_EXPORT_DIR, new_exports)

    @ddt.data({}, {'elsewhere': '10.0.1.3'})
    def test_update_access_disjoint(self, export_dir_dict):
        """Deleting a rule for a share with no export sets no option."""
        helper, manager = self._helper_with_path()
        helper._get_export_dir_dict = mock.Mock(
            return_value=export_dir_dict)
        share = mock.Mock()
        rules_to_delete = [{'access_to': '10.0.0.2'}]

        helper.update_access('/', share, [], rules_to_delete)

        helper._get_export_dir_dict.assert_called_once_with()
        manager.set_vol_option.assert_not_called()
@ddt.ddt
class GlusterNFSVolHelperTestCase(test.TestCase):
    """Unit tests for glusterfs.GlusterNFSVolHelper."""

    def setUp(self):
        """Create a volume-level helper bound to a mock gluster manager."""
        super().setUp()
        fake_utils.stub_out_utils_execute(self)
        manager = mock.Mock(**fake_gluster_manager_attrs)
        self._execute = mock.Mock(return_value=('', ''))
        self.fake_conf = config.Configuration(None)
        self._helper = glusterfs.GlusterNFSVolHelper(
            self._execute, self.fake_conf, gluster_manager=manager)

    @ddt.data({'output_str': '10.0.0.1,10.0.0.2',
               'expected': ['10.0.0.1', '10.0.0.2']},
              {'output_str': None, 'expected': []})
    @ddt.unpack
    def test_get_vol_exports(self, output_str, expected):
        """_get_vol_exports splits the NFS_RPC_AUTH_ALLOW option value."""
        manager = self._helper.gluster_manager
        self.mock_object(manager, 'get_vol_option',
                         mock.Mock(return_value=output_str))

        exports = self._helper._get_vol_exports()

        self.assertEqual(expected, exports)
        manager.get_vol_option.assert_called_once_with(NFS_RPC_AUTH_ALLOW)

    @ddt.data({'delta': (["10.0.0.1"], []), 'expected': "10.0.0.1,10.0.0.3"},
              {'delta': (["10.0.0.2"], []),
               'expected': "10.0.0.1,10.0.0.2,10.0.0.3"},
              {'delta': ([], ["10.0.0.1"]), 'expected': "10.0.0.3"},
              {'delta': ([], ["10.0.0.2"]), 'expected': "10.0.0.1,10.0.0.3"})
    @ddt.unpack
    def test_update_access(self, delta, expected):
        """update_access sets the allow list and unsets the reject list.

        delta is an (addresses to add, addresses to delete) pair applied on
        top of existing exports to 10.0.0.1 and 10.0.0.3.
        """

        def as_rule(address):
            return {'access_to': address}

        self.mock_object(
            self._helper, '_get_vol_exports',
            mock.Mock(return_value=["10.0.0.1", "10.0.0.3"]))
        share = mock.Mock()
        # Rules are handed over as lazy map objects, one per delta entry.
        add_rules, delete_rules = (
            map(as_rule, addresses) for addresses in delta)

        self._helper.update_access("/", share, add_rules, delete_rules)

        self._helper._get_vol_exports.assert_called_once_with()
        expected_calls = [
            mock.call(NFS_RPC_AUTH_ALLOW, expected),
            mock.call(NFS_RPC_AUTH_REJECT, None),
        ]
        set_vol_option = self._helper.gluster_manager.set_vol_option
        self.assertEqual(expected_calls, set_vol_option.call_args_list)

    def test_update_access_empty(self):
        """Removing the last allowed client rejects all clients ('*')."""
        self.mock_object(
            self._helper, '_get_vol_exports',
            mock.Mock(return_value=["10.0.0.1"]))
        share = mock.Mock()
        rules_to_delete = [{'access_to': "10.0.0.1"}]

        self._helper.update_access("/", share, [], rules_to_delete)

        self._helper._get_vol_exports.assert_called_once_with()
        expected_calls = [
            mock.call(NFS_RPC_AUTH_ALLOW, None),
            mock.call(NFS_RPC_AUTH_REJECT, "*"),
        ]
        set_vol_option = self._helper.gluster_manager.set_vol_option
        self.assertEqual(expected_calls, set_vol_option.call_args_list)
class GaneshaNFSHelperTestCase(test.TestCase):
    """Unit tests for glusterfs.GaneshaNFSHelper."""

    def setUp(self):
        """Create a GaneshaNFSHelper with its collaborators mocked out.

        RootExecutor, GaneshaNASHelper.__init__ and socket.gethostname are
        all replaced through mock_object so that every patch is undone
        when the test finishes.
        """
        super(GaneshaNFSHelperTestCase, self).setUp()
        self.gluster_manager = mock.Mock(**fake_gluster_manager_attrs)
        self._execute = mock.Mock(return_value=('', ''))
        self._root_execute = mock.Mock(return_value=('', ''))
        self.access = fake_share.fake_access()
        self.fake_conf = config.Configuration(None)
        self.fake_template = {'key': 'value'}
        self.share = fake_share.fake_share()
        self.mock_object(glusterfs.ganesha_utils, 'RootExecutor',
                         mock.Mock(return_value=self._root_execute))
        self.mock_object(glusterfs.ganesha.GaneshaNASHelper, '__init__',
                         mock.Mock())
        # Patch via mock_object rather than assigning to the socket module
        # directly: a bare assignment is never restored and would leak the
        # fake hostname into every test that runs afterwards.
        self.mock_object(socket, 'gethostname',
                         mock.Mock(return_value='example.com'))
        self._helper = glusterfs.GaneshaNFSHelper(
            self._execute, self.fake_conf,
            gluster_manager=self.gluster_manager)
        self._helper.tag = 'GLUSTER-Ganesha-localhost'

    def test_init_local_ganesha_server(self):
        """Construction in setUp used a root executor and hostname tag."""
        glusterfs.ganesha_utils.RootExecutor.assert_called_once_with(
            self._execute)
        socket.gethostname.assert_has_calls([mock.call()])
        glusterfs.ganesha.GaneshaNASHelper.__init__.assert_has_calls(
            [mock.call(self._root_execute, self.fake_conf,
                       tag='GLUSTER-Ganesha-example.com')])

    def test_get_export(self):
        """get_export combines the host name with the share name."""
        ret = self._helper.get_export(self.share)
        self.assertEqual('example.com:/fakename--<access-id>', ret)

    def test_init_remote_ganesha_server(self):
        """With a server IP configured, an SSH executor is used instead."""
        ssh_execute = mock.Mock(return_value=('', ''))
        # NOTE(review): this default is not explicitly restored here;
        # presumably the base TestCase resets CONF between tests -- confirm.
        CONF.set_default('glusterfs_ganesha_server_ip', 'fakeip')
        self.mock_object(glusterfs.ganesha_utils, 'SSHExecutor',
                         mock.Mock(return_value=ssh_execute))
        glusterfs.GaneshaNFSHelper(
            self._execute, self.fake_conf,
            gluster_manager=self.gluster_manager)
        glusterfs.ganesha_utils.SSHExecutor.assert_called_once_with(
            'fakeip', 22, None, 'root', password=None, privatekey=None)
        glusterfs.ganesha.GaneshaNASHelper.__init__.assert_has_calls(
            [mock.call(ssh_execute, self.fake_conf,
                       tag='GLUSTER-Ganesha-fakeip')])

    def test_init_helper(self):
        """init_helper publishes its state in shared_data, keyed by tag.

        A second helper carrying the same tag is expected to pick up the
        'ganesha' and 'export_template' objects stored by the first one.
        """
        ganeshelper = mock.Mock()
        exptemp = mock.Mock()

        def set_attributes(*a, **kw):
            # Stand-in for the parent init_helper: sets the attributes
            # that the test later expects to find in shared_data.
            self._helper.ganesha = ganeshelper
            self._helper.export_template = exptemp

        self.mock_object(ganesha.GaneshaNASHelper, 'init_helper',
                         mock.Mock(side_effect=set_attributes))
        self.assertEqual({}, glusterfs.GaneshaNFSHelper.shared_data)
        # shared_data is a class-level dict that this test populates; empty
        # it afterwards so the "starts empty" precondition above keeps
        # holding for later runs in the same process.
        self.addCleanup(glusterfs.GaneshaNFSHelper.shared_data.clear)
        self._helper.init_helper()
        ganesha.GaneshaNASHelper.init_helper.assert_called_once_with()
        self.assertEqual(ganeshelper, self._helper.ganesha)
        self.assertEqual(exptemp, self._helper.export_template)
        self.assertEqual({
            'GLUSTER-Ganesha-localhost': {
                'ganesha': ganeshelper,
                'export_template': exptemp}},
            glusterfs.GaneshaNFSHelper.shared_data)
        other_helper = glusterfs.GaneshaNFSHelper(
            self._execute, self.fake_conf,
            gluster_manager=self.gluster_manager)
        other_helper.tag = 'GLUSTER-Ganesha-localhost'
        other_helper.init_helper()
        self.assertEqual(ganeshelper, other_helper.ganesha)
        self.assertEqual(exptemp, other_helper.export_template)

    def test_default_config_hook(self):
        """_default_config_hook patches the base template with conf dir data.

        The fake patch function updates the template in place, so after
        the call the template compares equal to fake_conf_dict.
        """
        fake_conf_dict = {'key': 'value1'}
        mock_ganesha_utils_patch = mock.Mock()

        def fake_patch_run(tmpl1, tmpl2):
            # Record a snapshot of the template before it is mutated.
            mock_ganesha_utils_patch(
                copy.deepcopy(tmpl1), tmpl2)
            tmpl1.update(tmpl2)

        self.mock_object(glusterfs.ganesha.GaneshaNASHelper,
                         '_default_config_hook',
                         mock.Mock(return_value=self.fake_template))
        self.mock_object(glusterfs.ganesha_utils, 'path_from',
                         mock.Mock(return_value='/fakedir/glusterfs/conf'))
        self.mock_object(self._helper, '_load_conf_dir',
                         mock.Mock(return_value=fake_conf_dict))
        self.mock_object(glusterfs.ganesha_utils, 'patch',
                         mock.Mock(side_effect=fake_patch_run))
        ret = self._helper._default_config_hook()
        (glusterfs.ganesha.GaneshaNASHelper._default_config_hook.
         assert_called_once_with())
        glusterfs.ganesha_utils.path_from.assert_called_once_with(
            glusterfs.__file__, 'conf')
        self._helper._load_conf_dir.assert_called_once_with(
            '/fakedir/glusterfs/conf')
        glusterfs.ganesha_utils.patch.assert_called_once_with(
            self.fake_template, fake_conf_dict)
        self.assertEqual(fake_conf_dict, ret)

    def test_fsal_hook(self):
        """_fsal_hook returns host, volume and path of the gluster manager."""
        self._helper.gluster_manager.path = '/fakename'
        output = {
            'Hostname': '127.0.0.1',
            'Volume': 'testvol',
            'Volpath': '/fakename'
        }
        ret = self._helper._fsal_hook('/fakepath', self.share, self.access)
        self.assertEqual(output, ret)