glusterfs: fix gluster-nfs export for volume mapped layout

The nfs.export-dir option is not suitable for specifying
whole volume exports (ie. exports for root directory).

Instead, we have to have nfs.export-volumes = on (contrary
to all other scenarios), and control the export via
the nfs.rpc-auth-{allow,reject} options.

So we subclassed GlusterNFSHelper to GlusterNFSVolHelper,
a new helper class that operates with a similar logic to
its parent, just works with nfs.rpc-auth-{allow,reject}
instead of nfs.export-dir.

The driver code detects if {allow,deny}-acces is performed
with a whole volume backend and if the helper given in
configuration is GlusterNFSHelper, then for the handling of
this call it switches over to GlusterNFSVolHelper.

NOTE: What we *don't* do: we don't set nfs.export-volumes to "on",
it's expected to be done by the admin, beforehand. The reason
is that nfs.export-volumes is not a per-volume option, but a
per-cluster and we don't want to mess up the access control of
the cluster by chance in an over-permissive way. The per-cluster
scope of nfs.export-volumes also implies that using a GlusterFS
backed with gluster-nfs export mechasim and volume mapped layout
is an exclusive choice: the cluster can't host it along with other
export / layout schemes.

Change-Id: Ie4e4d03608f7a380cae790d429f88a5482d88ac8
Closes-Bug: #1495910
This commit is contained in:
Csaba Henk 2015-09-18 13:48:00 +02:00
parent 5704fa6aa3
commit 2153b5fa47
2 changed files with 316 additions and 24 deletions

View File

@ -65,6 +65,8 @@ CONF.register_opts(GlusterfsManilaShare_opts)
NFS_EXPORT_DIR = 'nfs.export-dir'
NFS_EXPORT_VOL = 'nfs.export-volumes'
NFS_RPC_AUTH_ALLOW = 'nfs.rpc-auth-allow'
NFS_RPC_AUTH_REJECT = 'nfs.rpc-auth-reject'
class GlusterfsShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
@ -94,10 +96,14 @@ class GlusterfsShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
def _setup_via_manager(self, share_manager, share_manager_parent=None):
gluster_manager = share_manager['manager']
# exporting the whole volume must be prohibited
# to not to defeat access control
args = ('volume', 'set', gluster_manager.volume, NFS_EXPORT_VOL,
'off')
# TODO(csaba): This should be refactored into proper dispatch to helper
if self.nfs_helper == GlusterNFSHelper and not gluster_manager.path:
setting = [NFS_RPC_AUTH_REJECT, '*']
else:
# gluster-nfs export of the whole volume must be prohibited
# to not to defeat access control
setting = [NFS_EXPORT_VOL, 'off']
args = ['volume', 'set', gluster_manager.volume] + setting
try:
gluster_manager.gluster_call(*args)
except exception.ProcessExecutionError as exc:
@ -127,8 +133,12 @@ class GlusterfsShareDriver(driver.ExecuteMixin, driver.GaneshaMixin,
def _get_helper(self, gluster_mgr=None):
"""Choose a protocol specific helper class."""
helper = self.nfs_helper(self._execute, self.configuration,
gluster_manager=gluster_mgr)
helper_class = self.nfs_helper
if (self.nfs_helper == GlusterNFSHelper and gluster_mgr and
not gluster_mgr.path):
helper_class = GlusterNFSVolHelper
helper = helper_class(self._execute, self.configuration,
gluster_manager=gluster_mgr)
helper.init_helper()
return helper
@ -226,8 +236,6 @@ class GlusterNFSHelper(ganesha.NASHelperBase):
return True
ddict[edir].append(host)
path = self.gluster_manager.path
if not path:
path = "/"
self._manage_access(path[1:], access['access_type'],
access['access_to'], cbk)
@ -240,12 +248,83 @@ class GlusterNFSHelper(ganesha.NASHelperBase):
if not ddict[edir]:
ddict.pop(edir)
path = self.gluster_manager.path
if not path:
path = "/"
self._manage_access(path[1:], access['access_type'],
access['access_to'], cbk)
class GlusterNFSVolHelper(GlusterNFSHelper):
"""Manage shares with Gluster-NFS server, volume mapped variant."""
def __init__(self, execute, config_object, **kwargs):
self.gluster_manager = kwargs.pop('gluster_manager')
super(GlusterNFSHelper, self).__init__(execute, config_object,
**kwargs)
def _get_vol_exports(self):
export_vol = self.gluster_manager.get_gluster_vol_option(
NFS_RPC_AUTH_ALLOW)
return export_vol.split(',')
def _manage_access(self, access_type, access_to, cbk):
"""Manage share access with cbk.
Adjust the exports of the Gluster-NFS server using cbk.
:param access_type: type of access allowed in Manila
:type access_type: string
:param access_to: ip of the guest whose share access is managed
:type access_to: string
:param cbk: callback to adjust the exports of NFS server
Following is the description of cbk(explist, host).
:param explist: list of hosts that have access to the share
:type explist: list
:param host: ip address derived from the access object
:type host: string
:returns: bool (cbk leaves ddict intact) or None (cbk modifies ddict)
"""
if access_type != 'ip':
raise exception.InvalidShareAccess('only ip access type allowed')
export_vol_list = self._get_vol_exports()
if cbk(export_vol_list, access_to):
return
if export_vol_list:
argseq = (('volume', 'set', self.gluster_manager.volume,
NFS_RPC_AUTH_ALLOW, ','.join(export_vol_list)),
('volume', 'reset', self.gluster_manager.volume,
NFS_RPC_AUTH_REJECT))
else:
argseq = (('volume', 'reset', self.gluster_manager.volume,
NFS_RPC_AUTH_ALLOW),
('volume', 'set', self.gluster_manager.volume,
NFS_RPC_AUTH_REJECT, '*'))
try:
for args in argseq:
self.gluster_manager.gluster_call(*args)
except exception.ProcessExecutionError as exc:
LOG.error(_LE("Error in gluster volume set: %s"), exc.stderr)
raise
def allow_access(self, base, share, access):
"""Allow access to a share."""
def cbk(explist, host):
if host in explist:
return True
explist.append(host)
self._manage_access(access['access_type'], access['access_to'], cbk)
def deny_access(self, base, share, access):
"""Deny access to a share."""
def cbk(explist, host):
if host not in explist:
return True
explist.remove(host)
self._manage_access(access['access_type'], access['access_to'], cbk)
class GaneshaNFSHelper(ganesha.GaneshaNASHelper):
shared_data = {}

View File

@ -47,6 +47,8 @@ fake_gluster_manager_attrs = {
fake_share_name = 'fakename'
NFS_EXPORT_DIR = 'nfs.export-dir'
NFS_EXPORT_VOL = 'nfs.export-volumes'
NFS_RPC_AUTH_ALLOW = 'nfs.rpc-auth-allow'
NFS_RPC_AUTH_REJECT = 'nfs.rpc-auth-reject'
@ddt.ddt
@ -68,7 +70,6 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self._driver = glusterfs.GlusterfsShareDriver(
execute=self._execute,
configuration=self.fake_conf)
self._helper_nfs = mock.Mock()
self.share = fake_share.fake_share(share_proto='NFS')
def test_do_setup(self):
@ -102,6 +103,31 @@ class GlusterfsShareDriverTestCase(test.TestCase):
nfs_helper.get_export.assert_called_once_with(self.share)
self.assertEqual('host:/vol', ret)
@ddt.data({'helpercls': None, 'path': '/fakepath'},
{'helpercls': None, 'path': None},
{'helpercls': glusterfs.GlusterNFSHelper, 'path': '/fakepath'},
{'helpercls': glusterfs.GlusterNFSHelper, 'path': None})
@ddt.unpack
def test_setup_via_manager_path(self, helpercls, path):
gmgr = mock.Mock()
gmgr.gluster_call = mock.Mock()
gmgr.path = path
if not helpercls:
helper = mock.Mock()
helper.get_export = mock.Mock(return_value='host:/vol')
helpercls = mock.Mock(return_value=helper)
self._driver.nfs_helper = helpercls
self._driver._setup_via_manager(
{'manager': gmgr, 'share': self.share})
if helpercls == glusterfs.GlusterNFSHelper and not path:
args = (NFS_RPC_AUTH_REJECT, '*')
else:
args = (NFS_EXPORT_VOL, 'off')
gmgr.gluster_call.assert_called_once_with(
'volume', 'set', gmgr.volume, *args)
@ddt.data(exception.ProcessExecutionError, RuntimeError)
def test_setup_via_manager_exception(self, _exception):
gmgr = mock.Mock()
@ -136,6 +162,17 @@ class GlusterfsShareDriverTestCase(test.TestCase):
ret = self._driver._get_helper()
self.assertIsInstance(ret, self._driver.nfs_helper)
@ddt.data({'path': '/fakepath', 'helper': glusterfs.GlusterNFSHelper},
{'path': None, 'helper': glusterfs.GlusterNFSVolHelper})
@ddt.unpack
def test_get_helper_vol(self, path, helper):
self._driver.nfs_helper = glusterfs.GlusterNFSHelper
gmgr = mock.Mock(path=path)
ret = self._driver._get_helper(gmgr)
self.assertIsInstance(ret, helper)
@ddt.data({'op': 'allow', 'kwargs': {}},
{'op': 'allow', 'kwargs': {'share_server': None}},
{'op': 'deny', 'kwargs': {}},
@ -157,7 +194,6 @@ class GlusterfsShareDriverTestCase(test.TestCase):
self.assertIsNone(ret)
@ddt.ddt
class GlusterNFSHelperTestCase(test.TestCase):
"""Tests GlusterNFSHelper."""
@ -305,16 +341,13 @@ class GlusterNFSHelperTestCase(test.TestCase):
'volume', 'set', self._helper.gluster_manager.volume,
NFS_EXPORT_DIR, export_str)
@ddt.data({'share_key': 'fakename', 'gmgr_path': '/fakename'},
{'share_key': '', 'gmgr_path': None})
@ddt.unpack
def test_allow_access_with_share_having_access(self, share_key, gmgr_path):
def test_allow_access_with_share_having_access(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_dir_dict = {share_key: ['10.0.0.1']}
export_dir_dict = {'fakename': ['10.0.0.1']}
self.mock_object(self._helper, '_get_export_dir_dict',
mock.Mock(return_value=export_dir_dict))
self._helper.gluster_manager.path = gmgr_path
self._helper.gluster_manager.path = '/fakename'
self._helper.allow_access(None, share, access)
@ -334,22 +367,19 @@ class GlusterNFSHelperTestCase(test.TestCase):
self._helper._get_export_dir_dict.assert_called_once_with()
self.assertFalse(self._helper.gluster_manager.gluster_call.called)
@ddt.data({'share_key': 'fakename', 'gmgr_path': '/fakename'},
{'share_key': '', 'gmgr_path': None})
@ddt.unpack
def test_deny_access_with_share_having_access(self, share_key, gmgr_path):
def test_deny_access_with_share_having_access(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_dir_dict = {
'example.com': ['10.0.0.1'],
share_key: ['10.0.0.1'],
'fakename': ['10.0.0.1'],
}
export_str = '/example.com(10.0.0.1)'
args = ('volume', 'set', self._helper.gluster_manager.volume,
NFS_EXPORT_DIR, export_str)
self.mock_object(self._helper, '_get_export_dir_dict',
mock.Mock(return_value=export_dir_dict))
self._helper.gluster_manager.path = gmgr_path
self._helper.gluster_manager.path = '/fakename'
self._helper.deny_access(None, share, access)
@ -358,6 +388,189 @@ class GlusterNFSHelperTestCase(test.TestCase):
*args)
@ddt.ddt
class GlusterNFSVolHelperTestCase(test.TestCase):
"""Tests GlusterNFSVolHelper."""
def setUp(self):
super(GlusterNFSVolHelperTestCase, self).setUp()
fake_utils.stub_out_utils_execute(self)
gluster_manager = mock.Mock(**fake_gluster_manager_attrs)
self._execute = mock.Mock(return_value=('', ''))
self.fake_conf = config.Configuration(None)
self._helper = glusterfs.GlusterNFSVolHelper(
self._execute, self.fake_conf, gluster_manager=gluster_manager)
def test_get_vol_exports(self):
output_str = '10.0.0.1,10.0.0.2'
self.mock_object(self._helper.gluster_manager,
'get_gluster_vol_option',
mock.Mock(return_value=output_str))
ret = self._helper._get_vol_exports()
self.assertEqual(['10.0.0.1', '10.0.0.2'], ret)
(self._helper.gluster_manager.get_gluster_vol_option.
assert_called_once_with(NFS_RPC_AUTH_ALLOW))
def test_manage_access_bad_access_type(self):
cbk = None
access = {'access_type': 'bad', 'access_to': None}
self.assertRaises(exception.InvalidShareAccess,
self._helper._manage_access,
access['access_type'], access['access_to'], cbk)
def test_manage_access_noop(self):
cbk = mock.Mock(return_value=True)
access = fake_share.fake_access()
export_list = mock.Mock()
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
ret = self._helper._manage_access(access['access_type'],
access['access_to'], cbk)
self._helper._get_vol_exports.assert_called_once_with()
cbk.assert_called_once_with(export_list, access['access_to'])
self.assertIsNone(ret)
def test_manage_access_adding_entry(self):
def cbk(li, v):
li.append(v)
access = fake_share.fake_access()
export_list = ['10.0.0.2']
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
ret = self._helper._manage_access(access['access_type'],
access['access_to'], cbk)
self.assertIsNone(ret)
self._helper._get_vol_exports.assert_called_once_with()
export_str = '10.0.0.2,10.0.0.1'
argseq = (('volume', 'set', self._helper.gluster_manager.volume,
NFS_RPC_AUTH_ALLOW, export_str),
('volume', 'reset', self._helper.gluster_manager.volume,
NFS_RPC_AUTH_REJECT))
self.assertEqual(
[mock.call(*a) for a in argseq],
self._helper.gluster_manager.gluster_call.call_args_list)
def test_manage_access_adding_entry_cmd_fail(self):
def cbk(li, v):
li.append(v)
def raise_exception(*args, **kwargs):
raise exception.ProcessExecutionError()
access = fake_share.fake_access()
export_list = ['10.0.0.2']
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
self.mock_object(self._helper.gluster_manager, 'gluster_call',
mock.Mock(side_effect=raise_exception))
self.assertRaises(exception.ProcessExecutionError,
self._helper._manage_access,
access['access_type'],
access['access_to'], cbk)
self._helper._get_vol_exports.assert_called_once_with()
export_str = '10.0.0.2,10.0.0.1'
args = ('volume', 'set', self._helper.gluster_manager.volume,
NFS_RPC_AUTH_ALLOW, export_str)
self._helper.gluster_manager.gluster_call.assert_called_once_with(
*args)
def test_manage_access_removing_last_entry(self):
def cbk(li, v):
li.remove(v)
access = fake_share.fake_access()
export_list = ['10.0.0.1']
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
ret = self._helper._manage_access(access['access_type'],
access['access_to'], cbk)
self.assertIsNone(ret)
self._helper._get_vol_exports.assert_called_once_with()
argseq = (('volume', 'reset', self._helper.gluster_manager.volume,
NFS_RPC_AUTH_ALLOW),
('volume', 'set', self._helper.gluster_manager.volume,
NFS_RPC_AUTH_REJECT, '*'))
self.assertEqual(
[mock.call(*a) for a in argseq],
self._helper.gluster_manager.gluster_call.call_args_list)
def test_allow_access_with_share_having_noaccess(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_list = ['10.0.0.2']
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
self._helper.allow_access(None, share, access)
self._helper._get_vol_exports.assert_called_once_with()
export_str = '10.0.0.2,10.0.0.1'
argseq = (('volume', 'set', self._helper.gluster_manager.volume,
NFS_RPC_AUTH_ALLOW, export_str),
('volume', 'reset', self._helper.gluster_manager.volume,
NFS_RPC_AUTH_REJECT))
self.assertEqual(
[mock.call(*a) for a in argseq],
self._helper.gluster_manager.gluster_call.call_args_list)
def test_allow_access_with_share_having_access(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_list = ['10.0.0.1']
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
self._helper.allow_access(None, share, access)
self._helper._get_vol_exports.assert_called_once_with()
self.assertFalse(self._helper.gluster_manager.gluster_call.called)
def test_deny_access_with_share_having_noaccess(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_list = []
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
self._helper.deny_access(None, share, access)
self._helper._get_vol_exports.assert_called_once_with()
self.assertFalse(self._helper.gluster_manager.gluster_call.called)
def test_deny_access_with_share_having_access(self):
access = fake_share.fake_access()
share = fake_share.fake_share()
export_list = ['10.0.0.1', '10.0.0.2']
self.mock_object(self._helper, '_get_vol_exports',
mock.Mock(return_value=export_list))
self._helper.deny_access(None, share, access)
self._helper._get_vol_exports.assert_called_once_with()
export_str = '10.0.0.2'
argseq = (('volume', 'set', self._helper.gluster_manager.volume,
NFS_RPC_AUTH_ALLOW, export_str),
('volume', 'reset', self._helper.gluster_manager.volume,
NFS_RPC_AUTH_REJECT))
self.assertEqual(
[mock.call(*a) for a in argseq],
self._helper.gluster_manager.gluster_call.call_args_list)
class GaneshaNFSHelperTestCase(test.TestCase):
"""Tests GaneshaNFSHelper."""