Refactor the Ceph NFS driver to use Cephadm NFS

The NFSClusterProtocolHelper has been added to allow users to consume NFS clusters
deployed using cephadm. This presents many advantages, since the operator
no longer needs to maintain their own instances of NFS Ganesha separate from
the Ceph cluster. For this, we now communicate with ceph mgr using
the nfs plugin. Read more about this plugin in
https://docs.ceph.com/en/latest/cephfs/nfs/

Implements: bp/use-cephadm-nfs-ganesha

DocImpact

Change-Id: I1826f2970528928a31b32a664013380e38bbd7c9
This commit is contained in:
Victoria Martinez de la Cruz 2022-07-07 16:31:52 +00:00
parent 30cc734f0f
commit f32be69fc4
5 changed files with 422 additions and 68 deletions

View File

@ -140,9 +140,15 @@ cephfs_opts = [
"multiple filesystems in the cluster."), "multiple filesystems in the cluster."),
] ]
cephfsnfs_opts = [
cfg.StrOpt('cephfs_nfs_cluster_id',
help="The ID of the NFS cluster to use."),
]
CONF = cfg.CONF CONF = cfg.CONF
CONF.register_opts(cephfs_opts) CONF.register_opts(cephfs_opts)
CONF.register_opts(cephfsnfs_opts)
class RadosError(Exception): class RadosError(Exception):
@ -151,8 +157,8 @@ class RadosError(Exception):
pass pass
def rados_command(rados_client, prefix=None, args=None, json_obj=False, def rados_command(rados_client, prefix=None, args=None,
target=None): json_obj=False, target=None, inbuf=None):
"""Safer wrapper for ceph_argparse.json_command """Safer wrapper for ceph_argparse.json_command
Raises error exception instead of relying on caller to check return Raises error exception instead of relying on caller to check return
@ -177,17 +183,21 @@ def rados_command(rados_client, prefix=None, args=None, json_obj=False,
argdict = args.copy() argdict = args.copy()
argdict['format'] = 'json' argdict['format'] = 'json'
if inbuf is None:
inbuf = b''
LOG.debug("Invoking ceph_argparse.json_command - rados_client=%(cl)s, " LOG.debug("Invoking ceph_argparse.json_command - rados_client=%(cl)s, "
"target=%(tg)s, prefix='%(pf)s', argdict=%(ad)s, " "target=%(tg)s, prefix='%(pf)s', argdict=%(ad)s, inbuf=%(ib)s, "
"timeout=%(to)s.", "timeout=%(to)s.",
{"cl": rados_client, "tg": target, "pf": prefix, "ad": argdict, {"cl": rados_client, "tg": target, "pf": prefix, "ad": argdict,
"to": RADOS_TIMEOUT}) "ib": inbuf, "to": RADOS_TIMEOUT})
try: try:
ret, outbuf, outs = json_command(rados_client, ret, outbuf, outs = json_command(rados_client,
target=target, target=target,
prefix=prefix, prefix=prefix,
argdict=argdict, argdict=argdict,
inbuf=inbuf,
timeout=RADOS_TIMEOUT) timeout=RADOS_TIMEOUT)
if ret != 0: if ret != 0:
raise rados.Error(outs, ret) raise rados.Error(outs, ret)
@ -223,6 +233,7 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
self._volname = None self._volname = None
self._ceph_mon_version = None self._ceph_mon_version = None
self.configuration.append_config_values(cephfs_opts) self.configuration.append_config_values(cephfs_opts)
self.configuration.append_config_values(cephfsnfs_opts)
try: try:
int(self.configuration.cephfs_volume_mode, 8) int(self.configuration.cephfs_volume_mode, 8)
@ -239,8 +250,14 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
protocol_helper_class = getattr( protocol_helper_class = getattr(
sys.modules[__name__], 'NativeProtocolHelper') sys.modules[__name__], 'NativeProtocolHelper')
else: else:
protocol_helper_class = getattr( # FIXME(vkmc) we intent to replace NFSProtocolHelper
sys.modules[__name__], 'NFSProtocolHelper') # with NFSClusterProtocolHelper helper in BB/CC release
if self.configuration.cephfs_nfs_cluster_id is None:
protocol_helper_class = getattr(
sys.modules[__name__], 'NFSProtocolHelper')
else:
protocol_helper_class = getattr(
sys.modules[__name__], 'NFSClusterProtocolHelper')
self.setup_default_ceph_cmd_target() self.setup_default_ceph_cmd_target()
@ -952,7 +969,75 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
return [4] return [4]
class NFSProtocolHelper(ganesha.GaneshaNASHelper2): class NFSProtocolHelperMixin():
def get_export_locations(self, share, subvolume_path):
export_locations = []
if not self.export_ips:
self.export_ips = self._get_export_ips()
for export_ip in self.export_ips:
# Try to escape the export ip. If it fails, means that the
# `cephfs_ganesha_server_ip` wasn't possibly set and the used
# address is the hostname
try:
server_address = driver_helpers.escaped_address(export_ip)
except ValueError:
server_address = export_ip
export_path = "{server_address}:{mount_path}".format(
server_address=server_address, mount_path=subvolume_path)
LOG.info("Calculated export path for share %(id)s: %(epath)s",
{"id": share['id'], "epath": export_path})
export_location = {
'path': export_path,
'is_admin_only': False,
'metadata': {},
}
export_locations.append(export_location)
return export_locations
def _get_export_path(self, share):
"""Callback to provide export path."""
argdict = {
"vol_name": self.volname,
"sub_name": share["id"]
}
if share["share_group_id"] is not None:
argdict.update({"group_name": share["share_group_id"]})
path = rados_command(
self.rados_client, "fs subvolume getpath", argdict)
return path
def _get_export_pseudo_path(self, share):
"""Callback to provide pseudo path."""
return self._get_export_path(share)
def get_configured_ip_versions(self):
if not self.configured_ip_versions:
try:
if not self.export_ips:
self.export_ips = self._get_export_ips()
for export_ip in self.export_ips:
self.configured_ip_versions.add(
ipaddress.ip_address(str(export_ip)).version)
except Exception:
# export_ips contained a hostname, safest thing is to
# claim support for IPv4 and IPv6 address families
LOG.warning("Setting configured IP versions to [4, 6] since "
"a hostname (rather than IP address) was supplied "
"in 'cephfs_ganesha_server_ip' or "
"in 'cephfs_ganesha_export_ips'.")
return [4, 6]
return list(self.configured_ip_versions)
class NFSProtocolHelper(NFSProtocolHelperMixin, ganesha.GaneshaNASHelper2):
shared_data = {} shared_data = {}
supported_protocols = ('NFS',) supported_protocols = ('NFS',)
@ -980,9 +1065,7 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2):
self.rados_client = kwargs.pop('rados_client') self.rados_client = kwargs.pop('rados_client')
if not hasattr(self, 'volname'): if not hasattr(self, 'volname'):
self.volname = kwargs.pop('volname') self.volname = kwargs.pop('volname')
self.export_ips = config_object.cephfs_ganesha_export_ips self.export_ips = None
if not self.export_ips:
self.export_ips = [self.ganesha_host]
self.configured_ip_versions = set() self.configured_ip_versions = set()
self.config = config_object self.config = config_object
@ -998,30 +1081,6 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2):
"hostname.") % export_ip) "hostname.") % export_ip)
raise exception.InvalidParameterValue(err=msg) raise exception.InvalidParameterValue(err=msg)
def get_export_locations(self, share, subvolume_path):
export_locations = []
for export_ip in self.export_ips:
# Try to escape the export ip. If it fails, means that the
# `cephfs_ganesha_server_ip` wasn't possibly set and the used
# address is the hostname
try:
server_address = driver_helpers.escaped_address(export_ip)
except ValueError:
server_address = export_ip
export_path = "{server_address}:{mount_path}".format(
server_address=server_address, mount_path=subvolume_path)
LOG.info("Calculated export path for share %(id)s: %(epath)s",
{"id": share['id'], "epath": export_path})
export_location = {
'path': export_path,
'is_admin_only': False,
'metadata': {},
}
export_locations.append(export_location)
return export_locations
def _default_config_hook(self): def _default_config_hook(self):
"""Callback to provide default export block.""" """Callback to provide default export block."""
dconf = super(NFSProtocolHelper, self)._default_config_hook() dconf = super(NFSProtocolHelper, self)._default_config_hook()
@ -1070,36 +1129,160 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2):
rados_command(self.rados_client, "fs subvolume deauthorize", argdict) rados_command(self.rados_client, "fs subvolume deauthorize", argdict)
def _get_export_path(self, share): def _get_export_ips(self):
"""Callback to provide export path.""" export_ips = self.config.cephfs_ganesha_export_ips
if not export_ips:
export_ips = [self.ganesha_host]
return export_ips
class NFSClusterProtocolHelper(NFSProtocolHelperMixin, ganesha.NASHelperBase):
supported_access_types = ('ip', )
supported_access_levels = (constants.ACCESS_LEVEL_RW,
constants.ACCESS_LEVEL_RO)
def __init__(self, execute, config_object, **kwargs):
self.rados_client = kwargs.pop('rados_client')
self.volname = kwargs.pop('volname')
self.configured_ip_versions = set()
self.configuration = config_object
self._nfs_clusterid = None
self.export_ips = None
super(NFSClusterProtocolHelper, self).__init__(execute,
config_object,
**kwargs)
@property
def nfs_clusterid(self):
# ID of the NFS cluster where the driver exports shares
if self._nfs_clusterid:
return self._nfs_clusterid
self._nfs_clusterid = (
self.configuration.safe_get('cephfs_nfs_cluster_id'))
if not self._nfs_clusterid:
msg = _("The NFS Cluster ID has not been configured"
"Please check cephfs_nfs_cluster_id option "
"has been correctly set in the backend configuration.")
raise exception.ShareBackendException(msg=msg)
return self._nfs_clusterid
def _get_export_ips(self):
"""Get NFS cluster export ips."""
nfs_clusterid = self.nfs_clusterid
export_ips = []
argdict = { argdict = {
"vol_name": self.volname, "nfs_cluster_id": nfs_clusterid,
"sub_name": share["id"]
} }
if share["share_group_id"] is not None:
argdict.update({"group_name": share["share_group_id"]})
path = rados_command( output = rados_command(self.rados_client, "nfs cluster info", argdict)
self.rados_client, "fs subvolume getpath", argdict)
return path nfs_cluster_info = json.loads(output)
def _get_export_pseudo_path(self, share): # NFS has been deployed with an ingress
"""Callback to provide pseudo path.""" # we use the VIP for the export ips
return self._get_export_path(share) vip = nfs_cluster_info[nfs_clusterid]["virtual_ip"]
def get_configured_ip_versions(self): # there is no VIP, we fallback to NFS cluster ips
if not self.configured_ip_versions: if not vip:
hosts = nfs_cluster_info[nfs_clusterid]["backend"]
for host in hosts:
export_ips.append(host["ip"])
else:
export_ips.append(vip)
return export_ips
def check_for_setup_error(self):
"""Returns an error if prerequisites aren't met."""
return
def _allow_access(self, share, access):
"""Allow access to the share."""
export = {
"path": self._get_export_path(share),
"nfs_cluster_id": self.nfs_clusterid,
"pseudo": self._get_export_pseudo_path(share),
"squash": "none",
"security_label": True,
"protocols": [4],
"fsal": {
"name": "CEPH",
"fs_name": self.volname,
},
"clients": access
}
argdict = {
"nfs_cluster_id": self.nfs_clusterid,
}
inbuf = json.dumps(export).encode('utf-8')
rados_command(self.rados_client,
"nfs export apply", argdict, inbuf=inbuf)
def _deny_access(self, share):
"""Deny access to the share."""
argdict = {
"nfs_cluster_id": self.nfs_clusterid,
"pseudo_path": self._get_export_pseudo_path(share)
}
rados_command(self.rados_client, "nfs export rm", argdict)
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
"""Update access rules of share.
Creates an export per share. Modifies access rules of shares by
dynamically updating exports via ceph nfs.
"""
rule_state_map = {}
wanted_rw_clients, wanted_ro_clients = [], []
for rule in access_rules:
try: try:
for export_ip in self.export_ips: ganesha_utils.validate_access_rule(
self.configured_ip_versions.add( self.supported_access_types, self.supported_access_levels,
ipaddress.ip_address(str(export_ip)).version) rule, True)
except Exception: except (exception.InvalidShareAccess,
# export_ips contained a hostname, safest thing is to exception.InvalidShareAccessLevel):
# claim support for IPv4 and IPv6 address families rule_state_map[rule['id']] = {'state': 'error'}
LOG.warning("Setting configured IP versions to [4, 6] since " continue
"a hostname (rather than IP address) was supplied "
"in 'cephfs_ganesha_server_ip' or " rule = ganesha_utils.fixup_access_rule(rule)
"in 'cephfs_ganesha_export_ips'.") if rule['access_level'] == 'rw':
return [4, 6] wanted_rw_clients.append(rule['access_to'])
return list(self.configured_ip_versions) elif rule['access_level'] == 'ro':
wanted_ro_clients.append(rule['access_to'])
if access_rules:
# add or update export
clients = []
if wanted_ro_clients:
clients.append({
'access_type': 'ro',
'addresses': wanted_ro_clients,
'squash': 'none'
})
if wanted_rw_clients:
clients.append({
'access_type': 'rw',
'addresses': wanted_rw_clients,
'squash': 'none'
})
if clients: # empty list if no rules passed validation
self._allow_access(share, clients)
else:
# no clients have access to the share. remove export
self._deny_access(share)
return rule_state_map

View File

@ -215,7 +215,7 @@ def setup_rados():
rados = importutils.import_module('rados') rados = importutils.import_module('rados')
except ImportError: except ImportError:
raise exception.ShareBackendException( raise exception.ShareBackendException(
_("python-rados is not installed")) _("rados python module is not installed"))
class GaneshaManager(object): class GaneshaManager(object):

View File

@ -105,7 +105,11 @@ def validate_access_rule(supported_access_types, supported_access_levels,
errmsg = _("Unsupported access rule of 'type' %(access_type)s, " errmsg = _("Unsupported access rule of 'type' %(access_type)s, "
"'level' %(access_level)s, 'to' %(access_to)s: " "'level' %(access_level)s, 'to' %(access_to)s: "
"%(field)s should be one of %(supported)s.") "%(field)s should be one of %(supported)s.")
access_param = access_rule.to_dict()
if not isinstance(access_rule, dict):
access_param = access_rule.to_dict()
else:
access_param = access_rule
def validate(field, supported_tokens, excinfo): def validate(field, supported_tokens, excinfo):
if access_rule['access_%s' % field] in supported_tokens: if access_rule['access_%s' % field] in supported_tokens:

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import json
from unittest import mock from unittest import mock
import ddt import ddt
@ -88,6 +89,7 @@ class CephFSDriverTestCase(test.TestCase):
self.mock_object(driver, "json_command", MockCephArgparseModule) self.mock_object(driver, "json_command", MockCephArgparseModule)
self.mock_object(driver, 'NativeProtocolHelper') self.mock_object(driver, 'NativeProtocolHelper')
self.mock_object(driver, 'NFSProtocolHelper') self.mock_object(driver, 'NFSProtocolHelper')
self.mock_object(driver, 'NFSClusterProtocolHelper')
driver.ceph_default_target = ('mon-mgr', ) driver.ceph_default_target = ('mon-mgr', )
@ -101,10 +103,17 @@ class CephFSDriverTestCase(test.TestCase):
self.mock_object(share_types, 'get_share_type_extra_specs', self.mock_object(share_types, 'get_share_type_extra_specs',
mock.Mock(return_value={})) mock.Mock(return_value={}))
@ddt.data('cephfs', 'nfs') @ddt.data(
def test_do_setup(self, protocol_helper): ('cephfs', None),
('nfs', None),
('nfs', 'fs-manila')
)
@ddt.unpack
def test_do_setup(self, protocol_helper, cephfs_nfs_cluster_id):
self._driver.configuration.cephfs_protocol_helper_type = ( self._driver.configuration.cephfs_protocol_helper_type = (
protocol_helper) protocol_helper)
self.fake_conf.set_default('cephfs_nfs_cluster_id',
cephfs_nfs_cluster_id)
self._driver.do_setup(self._context) self._driver.do_setup(self._context)
@ -114,10 +123,16 @@ class CephFSDriverTestCase(test.TestCase):
rados_client=self._driver._rados_client, rados_client=self._driver._rados_client,
volname=self._driver.volname) volname=self._driver.volname)
else: else:
driver.NFSProtocolHelper.assert_called_once_with( if self.fake_conf.cephfs_nfs_cluster_id is None:
self._execute, self._driver.configuration, driver.NFSProtocolHelper.assert_called_once_with(
rados_client=self._driver._rados_client, self._execute, self._driver.configuration,
volname=self._driver.volname) rados_client=self._driver._rados_client,
volname=self._driver.volname)
else:
driver.NFSClusterProtocolHelper.assert_called_once_with(
self._execute, self._driver.configuration,
rados_client=self._driver._rados_client,
volname=self._driver.volname)
self._driver.protocol_helper.init_helper.assert_called_once_with() self._driver.protocol_helper.init_helper.assert_called_once_with()
@ -1219,6 +1234,136 @@ class NFSProtocolHelperTestCase(test.TestCase):
self.assertEqual('/foo/bar', ret) self.assertEqual('/foo/bar', ret)
@ddt.ddt
class NFSClusterProtocolHelperTestCase(test.TestCase):
def setUp(self):
super(NFSClusterProtocolHelperTestCase, self).setUp()
self._execute = mock.Mock()
self._context = context.get_admin_context()
self._share = fake_share.fake_share(share_proto='NFS')
self._rados_client = MockRadosModule.Rados()
self._volname = "cephfs"
self.fake_conf = configuration.Configuration(None)
self.mock_object(driver.NFSClusterProtocolHelper,
'_get_export_path',
mock.Mock(return_value="ganesha:/foo/bar"))
self.mock_object(driver.NFSClusterProtocolHelper,
'_get_export_pseudo_path',
mock.Mock(return_value="ganesha:/foo/bar"))
self.mock_object(driver, "rados_command")
driver.ceph_default_target = ('mon-mgr', )
self._nfscluster_protocol_helper = driver.NFSClusterProtocolHelper(
self._execute,
self.fake_conf,
rados_client=self._rados_client,
volname=self._volname)
type(self._nfscluster_protocol_helper).nfs_clusterid = (
mock.PropertyMock(return_value='fs-manila'))
@ddt.data(constants.ACCESS_LEVEL_RW, constants.ACCESS_LEVEL_RO)
def test_allow_access_rw_ro(self, mode):
access_allow_prefix = "nfs export apply"
nfs_clusterid = self._nfscluster_protocol_helper.nfs_clusterid
volname = self._nfscluster_protocol_helper.volname
clients = {
'access_type': mode,
'addresses': ['10.0.0.1'],
'squash': 'none'
}
access_allow_dict = {
"nfs_cluster_id": nfs_clusterid,
}
export = {
"path": "ganesha:/foo/bar",
"nfs_cluster_id": nfs_clusterid,
"pseudo": "ganesha:/foo/bar",
"squash": "none",
"security_label": True,
"protocols": [4],
"fsal": {
"name": "CEPH",
"fs_name": volname,
},
"clients": clients
}
inbuf = json.dumps(export).encode('utf-8')
self._nfscluster_protocol_helper._allow_access(self._share, clients)
driver.rados_command.assert_called_once_with(
self._rados_client,
access_allow_prefix, access_allow_dict, inbuf=inbuf)
def test_deny_access(self):
access_deny_prefix = "nfs export rm"
nfs_clusterid = self._nfscluster_protocol_helper.nfs_clusterid
access_deny_dict = {
"nfs_cluster_id": nfs_clusterid,
"pseudo_path": "ganesha:/foo/bar"
}
self._nfscluster_protocol_helper._deny_access(self._share)
driver.rados_command.assert_called_once_with(
self._rados_client,
access_deny_prefix, access_deny_dict)
def test_get_export_locations(self):
cluster_info_prefix = "nfs cluster info"
nfs_clusterid = self._nfscluster_protocol_helper.nfs_clusterid
cluster_info_dict = {
"nfs_cluster_id": nfs_clusterid,
}
cluster_info = {"fs-manila": {
"virtual_ip": None,
"backend": [
{"hostname": "fake-ceph-node-1",
"ip": "10.0.0.10",
"port": "1010"},
{"hostname": "fake-ceph-node-2",
"ip": "10.0.0.11",
"port": "1011"}
]
}}
driver.rados_command.return_value = json.dumps(cluster_info)
fake_cephfs_subvolume_path = "/foo/bar"
expected_export_locations = [{
'path': '10.0.0.10:/foo/bar',
'is_admin_only': False,
'metadata': {},
}, {
'path': '10.0.0.11:/foo/bar',
'is_admin_only': False,
'metadata': {},
}]
export_locations = (
self._nfscluster_protocol_helper.get_export_locations(
self._share, fake_cephfs_subvolume_path))
driver.rados_command.assert_called_once_with(
self._rados_client,
cluster_info_prefix, cluster_info_dict)
self.assertEqual(expected_export_locations, export_locations)
@ddt.ddt @ddt.ddt
class CephFSDriverAltConfigTestCase(test.TestCase): class CephFSDriverAltConfigTestCase(test.TestCase):
"""Test the CephFS driver with non-default config values.""" """Test the CephFS driver with non-default config values."""

View File

@ -0,0 +1,22 @@
---
features:
- |
    NFSClusterProtocolHelper has been added to allow users to export CephFS shares
over a clustered NFS gateway. This presents many advantages, since the operator no longer
    needs to maintain their own instances of NFS Ganesha separate from the Ceph cluster.
For this, we now communicate with ceph mgr using the nfs plugin. Read more
about this plugin in https://docs.ceph.com/en/latest/cephfs/nfs/
upgrade:
- |
The CephFS driver now supports a new configuration option:
* cephfs_nfs_cluster_id (string option): name of the nfs cluster to use.
This option can be used to specify which NFS cluster to use.
other:
- |
Since the CephFS driver is now capable of using ceph manager commands to manage
NFS exports, we would like to deprecate and remove support for managing exports
    with the help of DBUS in a future release. Please use cephadm-deployed NFS Ganesha
clusters in greenfield deployments with OpenStack Manila and refrain from using
a standalone non-clustered nfs-ganesha service with this driver. As this solution
is hardened for HA within Ceph, we expect to provide code to help migrate existing
nfs-ganesha exports to the nfs-ganesha clusters in a future release.