Update cephfs drivers to use ceph-mgr client

Use a python rados client to talk to the ceph-mgr service.

The driver creates a python rados client that persists for the
driver's lifecycle.

The drivers can now work with multiple filesystem clusters.
The filesystem to be used by manila can be specified by the
driver option 'cephfs_filesystem_name'.
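
For illustration only, a minimal backend stanza in manila.conf might
look like the following sketch (the section name and all values are
hypothetical; only 'cephfs_filesystem_name' is new with this change):

    [cephfsnative]
    share_backend_name = CEPHFSNATIVE
    share_driver = manila.share.drivers.cephfs.driver.CephFSDriver
    driver_handles_share_servers = False
    cephfs_conf_path = /etc/ceph/ceph.conf
    cephfs_auth_id = manila
    cephfs_filesystem_name = cephfs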

The removal of a share will be quicker for the manila user.
The ceph-mgr volumes module moves the share's contents to
a trash folder and purges the trash's contents
(`rm -rf` of the backend CephFS subvolume/subtree) asynchronously,
whereas the ceph_volume_client library moved the share's contents
and purged them synchronously.

Implements: bp update-cephfs-drivers

Co-Authored-By: Victoria Martinez de la Cruz <victoria@redhat.com>
Co-Authored-By: Ramana Raja <rraja@redhat.com>
Co-Authored-By: Tom Barron <tpb@dyncloud.net>

DocImpact

Change-Id: I1f81db1ba7724c0784d87f9cb92bb696f6778806

@@ -15,14 +15,15 @@
import ipaddress
import json
import socket
import sys
from oslo_config import cfg
from oslo_config import types
from oslo_log import log
from oslo_utils import importutils
from oslo_utils import units
import six
from manila.common import constants
from manila import exception
@@ -33,14 +34,30 @@ from manila.share import driver
from manila.share.drivers import ganesha
from manila.share.drivers.ganesha import utils as ganesha_utils
from manila.share.drivers import helpers as driver_helpers
from manila.share import share_types
try:
import ceph_volume_client
ceph_module_found = True
except ImportError:
ceph_volume_client = None
ceph_module_found = False
rados = None
json_command = None
def setup_rados():
global rados
if not rados:
try:
rados = importutils.import_module('rados')
except ImportError:
raise exception.ShareBackendException(
_("rados python module is not installed"))
def setup_json_command():
global json_command
if not json_command:
try:
json_command = importutils.import_class(
'ceph_argparse.json_command')
except ImportError:
raise exception.ShareBackendException(
_("ceph_argparse python module is not installed"))
CEPHX_ACCESS_TYPE = "cephx"
@@ -50,6 +67,7 @@ CEPH_DEFAULT_AUTH_ID = "admin"
DEFAULT_VOLUME_MODE = '755'
RADOS_TIMEOUT = 10
LOG = log.getLogger(__name__)
@@ -66,6 +84,10 @@ cephfs_opts = [
help="The name of the ceph auth identity to use."
),
cfg.StrOpt('cephfs_volume_path_prefix',
deprecated_for_removal=True,
deprecated_since='Wallaby',
deprecated_reason='This option is not used starting with '
'the Nautilus release of Ceph.',
default="/volumes",
help="The prefix of the cephfs volume path."
),
@@ -111,6 +133,9 @@ cephfs_opts = [
help="The read/write/execute permissions mode for CephFS "
"volumes, snapshots, and snapshot groups expressed in "
"Octal as with linux 'chmod' or 'umask' commands."),
cfg.StrOpt('cephfs_filesystem_name',
help="The name of the filesystem to use, if there are "
"multiple filesystems in the cluster."),
]
@@ -118,10 +143,60 @@ CONF = cfg.CONF
CONF.register_opts(cephfs_opts)
def cephfs_share_path(share):
"""Get VolumePath from Share."""
return ceph_volume_client.VolumePath(
share['share_group_id'], share['id'])
class RadosError(Exception):
"""Something went wrong talking to Ceph with librados"""
pass
def rados_command(rados_client, prefix=None, args=None, json_obj=False):
"""Safer wrapper for ceph_argparse.json_command
Raises error exception instead of relying on caller to check return
codes.
Error exception can result from:
* Timeout
* Actual legitimate errors
* Malformed JSON output
:return: If json_obj is True, return the decoded JSON object from ceph,
or None if an empty string was returned.
If json_obj is False, return a decoded string (the data returned
by the ceph command).
"""
if args is None:
args = {}
argdict = args.copy()
argdict['format'] = 'json'
LOG.debug("Invoking ceph_argparse.json_command - rados_client=%(cl)s, "
"prefix='%(pf)s', argdict=%(ad)s, timeout=%(to)s.",
{"cl": rados_client, "pf": prefix, "ad": argdict,
"to": RADOS_TIMEOUT})
try:
ret, outbuf, outs = json_command(rados_client,
prefix=prefix,
argdict=argdict,
timeout=RADOS_TIMEOUT)
if ret != 0:
raise rados.Error(outs, ret)
if not json_obj:
result = outbuf.decode().strip()
else:
if outbuf:
result = json.loads(outbuf.decode().strip())
else:
result = None
except Exception as e:
msg = _("json_command failed - prefix=%(pfx)s, argdict=%(ad)s - "
"exception message: %(ex)s." %
{"pfx": prefix, "ad": argdict, "ex": e})
raise exception.ShareBackendException(msg)
return result
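
As a hedged usage sketch (the command and filesystem name here are
illustrative), a caller passes a connected rados client, a command
prefix, and an argument dict, and gets back either a decoded string
or a parsed JSON object:

    # Hypothetical call: list subvolumes of filesystem "cephfs" as JSON.
    argdict = {"vol_name": "cephfs"}
    subvolumes = rados_command(
        rados_client, "fs subvolume ls", argdict, json_obj=True)
    # With json_obj=False (the default), the decoded, stripped output
    # string is returned instead.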
class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
@@ -133,18 +208,22 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
self.backend_name = self.configuration.safe_get(
'share_backend_name') or 'CephFS'
self._volume_client = None
setup_rados()
setup_json_command()
self._rados_client = None
# name of the filesystem/volume used by the driver
self._volname = None
self.configuration.append_config_values(cephfs_opts)
try:
self._cephfs_volume_mode = int(
self.configuration.cephfs_volume_mode, 8)
int(self.configuration.cephfs_volume_mode, 8)
except ValueError:
msg = _("Invalid CephFS volume mode %s")
raise exception.BadConfigurationException(
msg % self.configuration.cephfs_volume_mode)
self._cephfs_volume_mode = self.configuration.cephfs_volume_mode
self.ipv6_implemented = True
def do_setup(self, context):
@@ -158,7 +237,8 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
self.protocol_helper = protocol_helper_class(
self._execute,
self.configuration,
ceph_vol_client=self.volume_client)
rados_client=self.rados_client,
volname=self.volname)
self.protocol_helper.init_helper()
@@ -167,7 +247,7 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
self.protocol_helper.check_for_setup_error()
def _update_share_stats(self):
stats = self.volume_client.rados.get_cluster_stats()
stats = self.rados_client.get_cluster_stats()
total_capacity_gb = round(stats['kb'] / units.Mi, 2)
free_capacity_gb = round(stats['kb_avail'] / units.Mi, 2)
@@ -210,41 +290,58 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
return gigs * units.Gi
@property
def volume_client(self):
if self._volume_client:
return self._volume_client
if not ceph_module_found:
raise exception.ManilaException(
_("Ceph client libraries not found.")
)
def rados_client(self):
if self._rados_client:
return self._rados_client
conf_path = self.configuration.safe_get('cephfs_conf_path')
cluster_name = self.configuration.safe_get('cephfs_cluster_name')
auth_id = self.configuration.safe_get('cephfs_auth_id')
volume_prefix = self.configuration.safe_get(
'cephfs_volume_path_prefix')
self._volume_client = ceph_volume_client.CephFSVolumeClient(
auth_id, conf_path, cluster_name, volume_prefix=volume_prefix)
LOG.info("[%(be)s}] Ceph client found, connecting...",
self._rados_client = rados.Rados(
name="client.{0}".format(auth_id),
clustername=cluster_name,
conffile=conf_path,
conf={}
)
LOG.info("[%(be)s] Ceph client found, connecting...",
{"be": self.backend_name})
if auth_id != CEPH_DEFAULT_AUTH_ID:
# Evict any other manila sessions. Only do this if we're
# using a client ID that isn't the default admin ID, to avoid
# rudely disrupting anyone else.
premount_evict = auth_id
else:
premount_evict = None
try:
self._volume_client.connect(premount_evict=premount_evict)
if self._rados_client.state != "connected":
self._rados_client.connect()
except Exception:
self._volume_client = None
raise
self._rados_client = None
raise exception.ShareBackendException(
"[%(be)s] Ceph client failed to connect.",
{"be": self.backend_name})
else:
LOG.info("[%(be)s] Ceph client connection complete.",
{"be": self.backend_name})
return self._volume_client
return self._rados_client
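
For reference, a minimal sketch of the connection this property
establishes, assuming a conf path and auth ID like those read from the
driver options above (all values illustrative):

    import rados

    # Connect as client.manila against cluster "ceph".
    client = rados.Rados(name="client.manila", clustername="ceph",
                         conffile="/etc/ceph/ceph.conf", conf={})
    try:
        client.connect()   # state becomes "connected" on success
    finally:
        client.shutdown()  # mirrors the driver's __del__ cleanup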
@property
def volname(self):
# Name of the CephFS volume/filesystem where the driver creates
# manila entities such as shares, sharegroups, snapshots, etc.
if self._volname:
return self._volname
self._volname = self.configuration.safe_get('cephfs_filesystem_name')
if not self._volname:
out = rados_command(
self.rados_client, "fs volume ls", json_obj=True)
if len(out) == 1:
self._volname = out[0]['name']
else:
if len(out) > 1:
msg = _("Specify Ceph filesystem name using "
"'cephfs_filesystem_name' driver option.")
else:
msg = _("No Ceph filesystem found.")
raise exception.ShareBackendException(msg=msg)
return self._volname
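
For reference, "fs volume ls" returns a JSON list of objects each
carrying a "name" key, so on a single-filesystem cluster the decoded
output would look something like this (filesystem name illustrative):

    out = [{"name": "cephfs"}]
    volname = out[0]["name"]    # -> "cephfs"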
def create_share(self, context, share, share_server=None):
"""Create a CephFS volume.
@@ -260,34 +357,55 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
if (requested_proto != supported_proto):
msg = _("Share protocol %s is not supported.") % requested_proto
raise exception.ShareBackendException(msg=msg)
# `share` is a Share
msg = _("create_share {be} name={id} size={size}"
" share_group_id={group}")
LOG.debug(msg.format(
be=self.backend_name, id=share['id'], size=share['size'],
group=share['share_group_id']))
extra_specs = share_types.get_extra_specs_from_share(share)
data_isolated = extra_specs.get("cephfs:data_isolated", False)
size = self._to_bytes(share['size'])
# Create the CephFS volume
cephfs_volume = self.volume_client.create_volume(
cephfs_share_path(share), size=size, data_isolated=data_isolated,
mode=self._cephfs_volume_mode)
LOG.debug("[%(be)s]: create_share: id=%(id)s, size=%(sz)s, "
"group=%(gr)s.",
{"be": self.backend_name, "id": share['id'],
"sz": share['size'], "gr": share['share_group_id']})
return self.protocol_helper.get_export_locations(share, cephfs_volume)
# create FS subvolume/share
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"size": size,
"namespace_isolated": True,
"mode": self._cephfs_volume_mode,
}
if share['share_group_id'] is not None:
argdict.update({"group_name": share["share_group_id"]})
rados_command(self.rados_client, "fs subvolume create", argdict)
# get path of FS subvolume/share
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
}
if share['share_group_id'] is not None:
argdict.update({"group_name": share["share_group_id"]})
subvolume_path = rados_command(
self.rados_client, "fs subvolume getpath", argdict)
return self.protocol_helper.get_export_locations(share, subvolume_path)
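
The two mgr calls above correspond roughly to the following ceph CLI
invocations (subvolume name, size, and group are illustrative, and the
--group_name flag applies only when the share belongs to a group;
consult the ceph-mgr volumes documentation for authoritative syntax):

    ceph fs subvolume create cephfs <share_id> --size <size_in_bytes> \
        --namespace-isolated --mode 755 --group_name <share_group_id>
    ceph fs subvolume getpath cephfs <share_id> --group_name <share_group_id>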
def delete_share(self, context, share, share_server=None):
extra_specs = share_types.get_extra_specs_from_share(share)
data_isolated = extra_specs.get("cephfs:data_isolated", False)
# remove FS subvolume/share
self.volume_client.delete_volume(cephfs_share_path(share),
data_isolated=data_isolated)
self.volume_client.purge_volume(cephfs_share_path(share),
data_isolated=data_isolated)
LOG.debug("[%(be)s]: delete_share: id=%(id)s, group=%(gr)s.",
{"be": self.backend_name, "id": share['id'],
"gr": share['share_group_id']})
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"force": True,
}
if share['share_group_id'] is not None:
argdict.update({"group_name": share["share_group_id"]})
rados_command(self.rados_client, "fs subvolume rm", argdict)
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
@@ -300,65 +418,151 @@ class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
return self.create_share(context, share, share_server)
def extend_share(self, share, new_size, share_server=None):
# resize FS subvolume/share
LOG.debug("[%(be)s]: extend_share: share=%(id)s, size=%(sz)s.",
{"be": self.backend_name, "id": share['id'],
"sz": new_size})
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"new_size": self._to_bytes(new_size),
}
if share['share_group_id'] is not None:
argdict.update({"group_name": share["share_group_id"]})
LOG.debug("extend_share {id} {size}".format(
id=share['id'], size=new_size))
self.volume_client.set_max_bytes(cephfs_share_path(share),
self._to_bytes(new_size))
rados_command(self.rados_client, "fs subvolume resize", argdict)
def shrink_share(self, share, new_size, share_server=None):
LOG.debug("shrink_share {id} {size}".format(
id=share['id'], size=new_size))
new_bytes = self._to_bytes(new_size)
used = self.volume_client.get_used_bytes(cephfs_share_path(share))
if used > new_bytes:
# While in fact we can "shrink" our volumes to less than their
# used bytes (it's just a quota), raise error anyway to avoid
# confusing API consumers that might depend on typical shrink
# behaviour.
raise exception.ShareShrinkingPossibleDataLoss(
share_id=share['id'])
# resize FS subvolume/share
LOG.debug("[%(be)s]: shrink_share: share=%(id)s, size=%(sz)s.",
{"be": self.backend_name, "id": share['id'],
"sz": new_size})
self.volume_client.set_max_bytes(cephfs_share_path(share), new_bytes)
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"new_size": self._to_bytes(new_size),
"no_shrink": True,
}
if share["share_group_id"] is not None:
argdict.update({"group_name": share["share_group_id"]})
try:
rados_command(self.rados_client, "fs subvolume resize", argdict)
except exception.ShareBackendException as e:
if 'would be lesser than' in str(e).lower():
raise exception.ShareShrinkingPossibleDataLoss(
share_id=share['id'])
raise
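
A hedged note on the failure path: with no_shrink set, the mgr refuses
a resize below the space the subvolume already consumes, roughly
equivalent to the CLI sketch below (names and size illustrative), and
the driver maps that backend error, matched on its message text, to
ShareShrinkingPossibleDataLoss:

    ceph fs subvolume resize cephfs <share_id> <new_size_in_bytes> --no_shrink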
def create_snapshot(self, context, snapshot, share_server=None):
self.volume_client.create_snapshot_volume(
cephfs_share_path(snapshot['share']),
'_'.join([snapshot['snapshot_id'], snapshot['id']]),
mode=self._cephfs_volume_mode)
# create a FS snapshot
LOG.debug("[%(be)s]: create_snapshot: original share=%(id)s, "
"snapshot=%(sn)s.",
{"be": self.backend_name, "id": snapshot['share_id'],
"sn": snapshot['id']})
argdict = {
"vol_name": self.volname,
"sub_name": snapshot["share_id"],
"snap_name": "_".join([snapshot["snapshot_id"], snapshot["id"]]),
}
rados_command(
self.rados_client, "fs subvolume snapshot create", argdict)
def delete_snapshot(self, context, snapshot, share_server=None):
self.volume_client.destroy_snapshot_volume(
cephfs_share_path(snapshot['share']),
'_'.join([snapshot['snapshot_id'], snapshot['id']]))
# delete a FS snapshot
LOG.debug("[%(be)s]: delete_snapshot: snapshot=%(id)s.",
{"be": self.backend_name, "id": snapshot['id']})
argdict = {
"vol_name": self.volname,
"sub_name": snapshot["share_id"],
"snap_name": '_'.join([snapshot['snapshot_id'], snapshot['id']]),
"force": True,
}
rados_command(self.rados_client, "fs subvolume snapshot rm", argdict)
def create_share_group(self, context, sg_dict, share_server=None):
self.volume_client.create_group(sg_dict['id'],
mode=self._cephfs_volume_mode)
# create a FS group
LOG.debug("[%(be)s]: create_share_group: share_group=%(id)s.",
{"be": self.backend_name, "id": sg_dict['id']})
argdict = {
"vol_name": self.volname,
"group_name": sg_dict['id'],
"mode": self._cephfs_volume_mode,
}
rados_command(self.rados_client, "fs subvolumegroup create", argdict)
def delete_share_group(self, context, sg_dict, share_server=None):
self.volume_client.destroy_group(sg_dict['id'])
# delete a FS group
LOG.debug("[%(be)s]: delete_share_group: share_group=%(id)s.",
{"be": self.backend_name, "id": sg_dict['id']})
argdict = {
"vol_name": self.volname,
"group_name": sg_dict['id'],
"force": True,
}
rados_command(self.rados_client, "fs subvolumegroup rm", argdict)
def delete_share_group_snapshot(self, context, snap_dict,
share_server=None):
self.volume_client.destroy_snapshot_group(
snap_dict['share_group_id'],
snap_dict['id'])
# delete a FS group snapshot
LOG.debug("[%(be)s]: delete_share_group_snapshot: "
"share_group=%(sg_id)s, snapshot=%(sn)s.",
{"be": self.backend_name, "sg_id": snap_dict['id'],
"sn": snap_dict["share_group_id"]})
argdict = {
"vol_name": self.volname,
"group_name": snap_dict["share_group_id"],
"snap_name": snap_dict["id"],
"force": True,
}
rados_command(
self.rados_client, "fs subvolumegroup snapshot rm", argdict)
return None, []
def create_share_group_snapshot(self, context, snap_dict,
share_server=None):
self.volume_client.create_snapshot_group(
snap_dict['share_group_id'],
snap_dict['id'],
mode=self._cephfs_volume_mode)
# create a FS group snapshot
LOG.debug("[%(be)s]: create_share_group_snapshot: share_group=%(id)s, "
"snapshot=%(sn)s.",
{"be": self.backend_name, "id": snap_dict['share_group_id'],
"sn": snap_dict["id"]})
argdict = {
"vol_name": self.volname,
"group_name": snap_dict["share_group_id"],
"snap_name": snap_dict["id"]
}
rados_command(
self.rados_client, "fs subvolumegroup snapshot create", argdict)
return None, []
def __del__(self):
if self._volume_client:
self._volume_client.disconnect()
self._volume_client = None
if self._rados_client:
LOG.info("[%(be)s] Ceph client disconnecting...",
{"be": self.backend_name})
self._rados_client.shutdown()
self._rados_client = None
LOG.info("[%(be)s] Ceph client disconnected",
{"be": self.backend_name})
def get_configured_ip_versions(self):
return self.protocol_helper.get_configured_ip_versions()
@@ -372,7 +576,8 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
constants.ACCESS_LEVEL_RO)
def __init__(self, execute, config, **kwargs):
self.volume_client = kwargs.pop('ceph_vol_client')
self.rados_client = kwargs.pop('rados_client')
self.volname = kwargs.pop('volname')
self.message_api = message_api.API()
super(NativeProtocolHelper, self).__init__(execute, config,
**kwargs)
@@ -384,13 +589,22 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
"""Returns an error if prerequisites aren't met."""
return
def get_export_locations(self, share, cephfs_volume):
def get_mon_addrs(self):
result = []
mon_map = rados_command(self.rados_client, "mon dump", json_obj=True)
for mon in mon_map['mons']:
ip_port = mon['addr'].split("/")[0]
result.append(ip_port)
return result
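
For reference, each mon entry's 'addr' field combines an ip:port with
a trailing nonce, so the split keeps only the address part (values
illustrative):

    addr = "192.0.2.10:6789/0"      # as reported by "mon dump"
    ip_port = addr.split("/")[0]    # -> "192.0.2.10:6789"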
def get_export_locations(self, share, subvolume_path):
# To mount this you need to know the mon IPs and the path to the volume
mon_addrs = self.volume_client.get_mon_addrs()
mon_addrs = self.get_mon_addrs()
export_location = "{addrs}:{path}".format(
addrs=",".join(mon_addrs),
path=cephfs_volume['mount_path'])
path=subvolume_path)
LOG.info("Calculated export location for share %(id)s: %(loc)s",
{"id": share['id'], "loc": export_location})
@@ -418,31 +632,36 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
ceph_auth_id)
raise exception.InvalidShareAccess(reason=error_message)
if not getattr(self.volume_client, 'version', None):
if access['access_level'] == constants.ACCESS_LEVEL_RO:
LOG.error("Need python-cephfs package version 10.2.3 or "
"greater to enable read-only access.")
raise exception.InvalidShareAccessLevel(
level=constants.ACCESS_LEVEL_RO)
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"auth_id": ceph_auth_id,
"tenant_id": share["project_id"],
}
auth_result = self.volume_client.authorize(
cephfs_share_path(share), ceph_auth_id)
if share["share_group_id"] is not None:
argdict.update({"group_name": share["share_group_id"]})
readonly = access['access_level'] == constants.ACCESS_LEVEL_RO
if readonly:
argdict.update({"access_level": "r"})
else:
readonly = access['access_level'] == constants.ACCESS_LEVEL_RO
try:
auth_result = self.volume_client.authorize(
cephfs_share_path(share), ceph_auth_id, readonly=readonly,
tenant_id=share['project_id'])
except Exception as e:
if 'not allowed' in str(e).lower():
msg = ("Access to client %(client)s is not allowed. "
"Reason: %(reason)s")
msg_payload = {'client': ceph_auth_id, 'reason': e}
raise exception.InvalidShareAccess(
reason=msg % msg_payload)
raise
argdict.update({"access_level": "rw"})
return auth_result['auth_key']
try:
auth_result = rados_command(
self.rados_client, "fs subvolume authorize", argdict)
except exception.ShareBackendException as e:
if 'not allowed' in str(e).lower():
msg = ("Access to client %(client)s is not allowed. "
"Reason: %(reason)s")
msg_payload = {'client': ceph_auth_id, 'reason': e}
raise exception.InvalidShareAccess(
reason=msg % msg_payload)
raise
return auth_result
def _deny_access(self, context, share, access, share_server=None):
if access['access_type'] != CEPHX_ACCESS_TYPE:
@@ -451,35 +670,50 @@ class NativeProtocolHelper(ganesha.NASHelperBase):
{"type": access['access_type']})
return
self.volume_client.deauthorize(cephfs_share_path(share),
access['access_to'])
self.volume_client.evict(
access['access_to'],
volume_path=cephfs_share_path(share))
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"auth_id": access['access_to']
}
if share["share_group_id"] is not None:
argdict.update({"group_name": share["share_group_id"]})
rados_command(self.rados_client, "fs subvolume deauthorize", argdict)
rados_command(self.rados_client, "fs subvolume evict", argdict)
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
access_updates = {}
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
}
if share["share_group_id"] is not None:
argdict.update({"group_name": share["share_group_id"]})
if not (add_rules or delete_rules): # recovery/maintenance mode
add_rules = access_rules
existing_auths = None
# The unversioned volume client cannot fetch from the Ceph backend,
# the list of auth IDs that have share access.
if getattr(self.volume_client, 'version', None):
existing_auths = self.volume_client.get_authorized_ids(
cephfs_share_path(share))
existing_auths = rados_command(
self.rados_client, "fs subvolume authorized_list",
argdict, json_obj=True)
if existing_auths:
existing_auth_ids = set(
[auth[0] for auth in existing_auths])
existing_auth_ids = set()
for rule in range(len(existing_auths)):
for cephx_id in existing_auths[rule]:
existing_auth_ids.add(cephx_id)
want_auth_ids = set(
[rule['access_to'] for rule in add_rules])
delete_auth_ids = existing_auth_ids.difference(
want_auth_ids)
for delete_auth_id in delete_auth_ids:
delete_auth_ids_list = delete_auth_ids
for delete_auth_id in delete_auth_ids_list:
delete_rules.append(
{
'access_to': delete_auth_id,
@@ -562,8 +796,10 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2):
super(NFSProtocolHelper, self).__init__(execute, config_object,
**kwargs)
if not hasattr(self, 'ceph_vol_client'):
self.ceph_vol_client = kwargs.pop('ceph_vol_client')
if not hasattr(self, 'rados_client'):
self.rados_client = kwargs.pop('rados_client')
if not hasattr(self, 'volname'):
self.volname = kwargs.pop('volname')
self.export_ips = config_object.cephfs_ganesha_export_ips
if not self.export_ips:
self.export_ips = [self.ganesha_host]
@@ -582,12 +818,12 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2):
"hostname.") % export_ip)
raise exception.InvalidParameterValue(err=msg)
def get_export_locations(self, share, cephfs_volume):
def get_export_locations(self, share, subvolume_path):
export_locations = []
for export_ip in self.export_ips:
export_path = "{server_address}:{mount_path}".format(
server_address=driver_helpers.escaped_address(export_ip),
mount_path=cephfs_volume['mount_path'])
mount_path=subvolume_path)
LOG.info("Calculated export path for share %(id)s: %(epath)s",
{"id": share['id'], "epath": export_path})
@@ -609,9 +845,21 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2):
def _fsal_hook(self, base, share, access):
"""Callback to create FSAL subblock."""
ceph_auth_id = ''.join(['ganesha-', share['id']])
auth_result = self.ceph_vol_client.authorize(
cephfs_share_path(share), ceph_auth_id, readonly=False,
tenant_id=share['project_id'])
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"auth_id": ceph_auth_id,
"access_level": "rw",
"tenant_id": share["project_id"],
}
if share["share_group_id"] is not None:
argdict.update({"group_name": share["share_group_id"]})
auth_result = rados_command(
self.rados_client, "fs subvolume authorize", argdict)
# Restrict Ganesha server's access to only the CephFS subtree or path,
# corresponding to the manila share, that is to be exported by making
# Ganesha use Ceph auth IDs with path restricted capabilities to
@@ -619,31 +867,49 @@ class NFSProtocolHelper(ganesha.GaneshaNASHelper2):
return {
'Name': 'Ceph',
'User_Id': ceph_auth_id,
'Secret_Access_Key': auth_result['auth_key']
'Secret_Access_Key': auth_result
}
def _cleanup_fsal_hook(self, base, share, access):
"""Callback for FSAL specific cleanup after removing an export."""
ceph_auth_id = ''.join(['ganesha-', share['id']])
self.ceph_vol_client.deauthorize(cephfs_share_path(share),
ceph_auth_id)
argdict = {
"vol_name": self.volname,
"sub_name": share["id"],
"auth_id": ceph_auth_id,
}
if share["share_group_id"] is not None:
argdict.update({"group_name": share["share_group_id"]})
rados_command(self.rados_client, "fs subvolume deauthorize", argdict)
def _get_export_path(self, share):
"""Callback to provide export path."""
volume_path = cephfs_share_path(share)
return self.ceph_vol_client._get_path(volume_path)
argdict = {
"vol_name": self.volname,
"sub_name": share["id"]
}
if share["share_group_id"] is not None:
argdict.update({"group_name": share["share_group_id"]})
path = rados_command(
self.rados_client, "fs subvolume getpath", argdict)
return path
def _get_export_pseudo_path(self, share):
"""Callback to provide pseudo path."""
volume_path = cephfs_share_path(share)
return self.ceph_vol_client._get_path(volume_path)
return self._get_export_path(share)
def get_configured_ip_versions(self):
if not self.configured_ip_versions:
try:
for export_ip in self.export_ips:
self.configured_ip_versions.add(
ipaddress.ip_address(six.text_type(export_ip)).version)
ipaddress.ip_address(str(export_ip)).version)
except Exception:
# export_ips contained a hostname, safest thing is to
# claim support for IPv4 and IPv6 address families


@@ -20,7 +20,6 @@ import re
from oslo_config import cfg
from oslo_log import log
import six
from manila.common import constants
from manila import exception
@@ -32,8 +31,7 @@ CONF = cfg.CONF
LOG = log.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class NASHelperBase(object):
class NASHelperBase(object, metaclass=abc.ABCMeta):
"""Interface to work with share."""
# drivers that use a helper derived from this class
@@ -184,7 +182,7 @@ class GaneshaNASHelper2(GaneshaNASHelper):
def __init__(self, execute, config, tag='<no name>', **kwargs):
super(GaneshaNASHelper2, self).__init__(execute, config, **kwargs)
if self.configuration.ganesha_rados_store_enable:
self.ceph_vol_client = kwargs.pop('ceph_vol_client')
self.rados_client = kwargs.pop('rados_client')
def init_helper(self):
"""Initializes protocol-specific NAS drivers."""
@@ -206,8 +204,7 @@ class GaneshaNASHelper2(GaneshaNASHelper):
self.configuration.ganesha_rados_export_index)
kwargs['ganesha_rados_export_counter'] = (
self.configuration.ganesha_rados_export_counter)
kwargs['ceph_vol_client'] = (
self.ceph_vol_client)
kwargs['rados_client'] = self.rados_client
else:
kwargs['ganesha_db_path'] = self.configuration.ganesha_db_path
self.ganesha = ganesha_manager.GaneshaManager(


@@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import io
import os
import pipes
import re
@@ -21,7 +22,6 @@ import sys
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import importutils
import six
from manila import exception
from manila.i18n import _
@@ -36,7 +36,7 @@ def _conf2json(conf):
"""Convert Ganesha config to JSON."""
# tokenize config string
token_list = [six.StringIO()]
token_list = [io.StringIO()]
state = {
'in_quote': False,
'in_comment': False,
@@ -49,7 +49,7 @@
if not state['escape']:
if char == '"':
state['in_quote'] = False
cbk.append(lambda: token_list.append(six.StringIO()))
cbk.append(lambda: token_list.append(io.StringIO()))
elif char == '\\':
cbk.append(lambda: state.update({'escape': True}))
else:
@@ -60,7 +60,7 @@
state['in_comment'] = False
else:
if char == '"':
token_list.append(six.StringIO())
token_list.append(io.StringIO())
state['in_quote'] = True
state['escape'] = False
if not state['in_comment']:
@@ -200,7 +200,7 @@
def mkconf(confdict):
"""Create Ganesha config string from confdict."""
s = six.StringIO()
s = io.StringIO()
_dump_to_conf(confdict, s)
return s.getvalue()
@@ -255,13 +255,12 @@
kwargs['ganesha_rados_export_counter'])
self.ganesha_rados_export_index = (
kwargs['ganesha_rados_export_index'])
self.ceph_vol_client = (
kwargs['ceph_vol_client'])
self.rados_client = kwargs['rados_client']
try:
self._get_rados_object(self.ganesha_rados_export_counter)
except rados.ObjectNotFound:
self._put_rados_object(self.ganesha_rados_export_counter,
six.text_type(1000))
str(1000))
else:
self.ganesha_db_path = kwargs['ganesha_db_path']
self.execute('mkdir', '-p', os.path.dirname(self.ganesha_db_path))
@@ -385,7 +384,7 @@
for k, v in ganesha_utils.walk(confdict):
# values in the export block template that need to be
# filled in by Manila are pre-fixed by '@'
if isinstance(v, six.string_types) and v[0] == '@':
if isinstance(v, str) and v[0] == '@':
msg = _("Incomplete export block: value %(val)s of attribute "
"%(key)s is a stub.") % {'key': k, 'val': v}
raise exception.InvalidParameterValue(err=msg)
@@ -524,22 +523,78 @@
self._rm_export_file(name)
self._mkindex()
def _get_rados_object(self, obj_name):
"""Get data stored in Ceph RADOS object as a text string."""
return self.ceph_vol_client.get_object(
self.ganesha_rados_store_pool_name, obj_name).decode('utf-8')
def _get_rados_object(self, object_name):
"""Synchronously read data from Ceph RADOS object as a text string.
def _put_rados_object(self, obj_name, data):
"""Put data as a byte string in a Ceph RADOS object."""
return self.ceph_vol_client.put_object(
self.ganesha_rados_store_pool_name,
obj_name,
data.encode('utf-8'))
:param object_name: name of the object
:type object_name: str
:returns: data stored in the object as a text string
"""
def _delete_rados_object(self, obj_name):
return self.ceph_vol_client.delete_object(
self.ganesha_rados_store_pool_name,
obj_name)
pool_name = self.ganesha_rados_store_pool_name
ioctx = self.rados_client.open_ioctx(pool_name)
osd_max_write_size = self.rados_client.conf_get('osd_max_write_size')
max_size = int(osd_max_write_size) * 1024 * 1024
try:
bytes_read = ioctx.read(object_name, max_size)
if ((len(bytes_read) == max_size) and
(ioctx.read(object_name, 1, offset=max_size))):
LOG.warning("Size of object {0} exceeds '{1}' bytes "
"read".format(object_name, max_size))
finally:
ioctx.close()
bytes_read_decoded = bytes_read.decode('utf-8')
return bytes_read_decoded
def _put_rados_object(self, object_name, data):
"""Synchronously write data as a byte string in a Ceph RADOS object.
:param object_name: name of the object
:type object_name: str
:param data: text data to write; encoded to UTF-8 bytes before storage
:type data: str
"""
pool_name = self.ganesha_rados_store_pool_name
encoded_data = data.encode('utf-8')
ioctx = self.rados_client.open_ioctx(pool_name)
max_size = int(
self.rados_client.conf_get('osd_max_write_size')) * 1024 * 1024
if len(encoded_data) > max_size:
msg = ("Data to be written to object '{0}' exceeds "
"{1} bytes".format(object_name, max_size))
LOG.error(msg)
raise exception.ShareBackendException(msg)
try:
with rados.WriteOpCtx() as wop:
wop.write_full(encoded_data)
ioctx.operate_write_op(wop, object_name)
except rados.OSError as e:
LOG.error(e)
raise e
finally:
ioctx.close()
def _delete_rados_object(self, object_name):
pool_name = self.ganesha_rados_store_pool_name
ioctx = self.rados_client.open_ioctx(pool_name)
try:
ioctx.remove_object(object_name)
except rados.ObjectNotFound:
LOG.warning("Object '{0}' was already removed".format(object_name))
finally:
ioctx.close()
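
A hedged sketch of how these helpers fit together, using the
export-counter object this module stores (the pool and object names
mirror the tests below and are illustrative):

    # mgr is a GaneshaManager whose ganesha_rados_store_pool_name is set.
    mgr._put_rados_object('fakecounter', str(1000))
    value = mgr._get_rados_object('fakecounter')    # -> "1000"
    mgr._delete_rados_object('fakecounter')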
def get_export_id(self, bump=True):
"""Get a new export id."""

File diff suppressed because it is too large.


@@ -14,12 +14,12 @@
# under the License.
import copy
import io
import re
from unittest import mock
import ddt
from oslo_serialization import jsonutils
import six
from manila import exception
from manila.share.drivers.ganesha import manager
@@ -67,12 +67,26 @@ manager_fake_kwargs = {
}
class MockRadosClientModule(object):
"""Mocked up version of Ceph's RADOS client interface."""
class MockRadosModule(object):
"""Mocked up version of Ceph's RADOS module."""
class ObjectNotFound(Exception):
pass
class OSError(Exception):
pass
class WriteOpCtx():
def __enter__(self):
return self
def __exit__(self, type, msg, traceback):
pass
def write_full(self, bytes_to_write):
pass
@ddt.ddt
class MiscTests(test.TestCase):
@@ -167,7 +181,7 @@ class GaneshaConfigTests(test.TestCase):
self.assertEqual(test_dict_unicode, ret)
def test_dump_to_conf(self):
ganesha_cnf = six.StringIO()
ganesha_cnf = io.StringIO()
manager._dump_to_conf(test_dict_str, ganesha_cnf)
self.assertEqual(*self.conf_mangle(self.ref_ganesha_cnf,
ganesha_cnf.getvalue()))
@@ -200,12 +214,14 @@ class GaneshaManagerTestCase(test.TestCase):
def setUp(self):
super(GaneshaManagerTestCase, self).setUp()
self._execute = mock.Mock(return_value=('', ''))
self._rados_client = mock.Mock()
self._manager = self.instantiate_ganesha_manager(
self._execute, 'faketag', **manager_fake_kwargs)
self._ceph_vol_client = mock.Mock()
self._execute, 'faketag',
rados_client=self._rados_client,
**manager_fake_kwargs)
self._setup_rados = mock.Mock()
self._execute2 = mock.Mock(return_value=('', ''))
self.mock_object(manager, 'rados', MockRadosClientModule)
self.mock_object(manager, 'rados', MockRadosModule)
self.mock_object(manager, 'setup_rados', self._setup_rados)
fake_kwargs = copy.copy(manager_fake_kwargs)
fake_kwargs.update(
@@ -213,7 +229,7 @@ class GaneshaManagerTestCase(test.TestCase):
ganesha_rados_store_pool_name='fakepool',
ganesha_rados_export_counter='fakecounter',
ganesha_rados_export_index='fakeindex',
ceph_vol_client=self._ceph_vol_client
rados_client=self._rados_client
)
self._manager_with_rados_store = self.instantiate_ganesha_manager(
self._execute2, 'faketag', **fake_kwargs)
@@ -285,7 +301,7 @@ class GaneshaManagerTestCase(test.TestCase):
ganesha_rados_store_pool_name='fakepool',
ganesha_rados_export_counter='fakecounter',
ganesha_rados_export_index='fakeindex',
ceph_vol_client=self._ceph_vol_client
rados_client=self._rados_client
)
if counter_exists:
self.mock_object(
@@ -293,7 +309,7 @@ class GaneshaManagerTestCase(test.TestCase):
else:
self.mock_object(
manager.GaneshaManager, '_get_rados_object',
mock.Mock(side_effect=MockRadosClientModule.ObjectNotFound))
mock.Mock(side_effect=MockRadosModule.ObjectNotFound))
self.mock_object(manager.GaneshaManager, '_put_rados_object')
test_mgr = manager.GaneshaManager(
@@ -309,14 +325,14 @@ class GaneshaManagerTestCase(test.TestCase):
self.assertEqual('fakepool', test_mgr.ganesha_rados_store_pool_name)
self.assertEqual('fakecounter', test_mgr.ganesha_rados_export_counter)
self.assertEqual('fakeindex', test_mgr.ganesha_rados_export_index)
self.assertEqual(self._ceph_vol_client, test_mgr.ceph_vol_client)
self.assertEqual(self._rados_client, test_mgr.rados_client)
self._setup_rados.assert_called_with()
test_mgr._get_rados_object.assert_called_once_with('fakecounter')
if counter_exists:
self.assertFalse(test_mgr._put_rados_object.called)
else:
test_mgr._put_rados_object.assert_called_once_with(
'fakecounter', six.text_type(1000))
'fakecounter', str(1000))
def test_ganesha_export_dir(self):
self.assertEqual(
@@ -478,7 +494,7 @@ class GaneshaManagerTestCase(test.TestCase):
else:
self.mock_object(
self._manager_with_rados_store, '_get_rados_object',
mock.Mock(side_effect=MockRadosClientModule.ObjectNotFound))
mock.Mock(side_effect=MockRadosModule.ObjectNotFound))
ret = self._manager_with_rados_store._check_export_rados_object_exists(
test_name)
@@ -1021,36 +1037,58 @@ class GaneshaManagerTestCase(test.TestCase):
self._manager._remove_rados_object_url_from_index.called)
def test_get_rados_object(self):
fakebin = six.unichr(246).encode('utf-8')
self.mock_object(self._ceph_vol_client, 'get_object',
mock.Mock(return_value=fakebin))
fakebin = chr(246).encode('utf-8')
ioctx = mock.Mock()
ioctx.read.side_effect = [fakebin, fakebin]
self._rados_client.open_ioctx = mock.Mock(return_value=ioctx)
self._rados_client.conf_get = mock.Mock(return_value=256)
max_size = 256 * 1024 * 1024
ret = self._manager_with_rados_store._get_rados_object('fakeobj')
self._ceph_vol_client.get_object.assert_called_once_with(
'fakepool', 'fakeobj')
self._rados_client.open_ioctx.assert_called_once_with('fakepool')
self._rados_client.conf_get.assert_called_once_with(
'osd_max_write_size')
ioctx.read.assert_called_once_with('fakeobj', max_size)
ioctx.close.assert_called_once()
self.assertEqual(fakebin.decode('utf-8'), ret)
def test_put_rados_object(self):
faketext = six.unichr(246)
self.mock_object(self._ceph_vol_client, 'put_object',
mock.Mock(return_value=None))
faketext = chr(246)
ioctx = mock.Mock()
manager.rados.WriteOpCtx.write_full = mock.Mock()
self._rados_client.open_ioctx = mock.Mock(return_value=ioctx)
self._rados_client.conf_get = mock.Mock(return_value=256)
ret = self._manager_with_rados_store._put_rados_object(
'fakeobj', faketext)
self._ceph_vol_client.put_object.assert_called_once_with(
'fakepool', 'fakeobj', faketext.encode('utf-8'))
self._rados_client.open_ioctx.assert_called_once_with('fakepool')
self._rados_client.conf_get.assert_called_once_with(
'osd_max_write_size')
manager.rados.WriteOpCtx.write_full.assert_called_once_with(
faketext.encode('utf-8'))
ioctx.operate_write_op.assert_called_once_with(mock.ANY, 'fakeobj')
self.assertIsNone(ret)
def test_delete_rados_object(self):
self.mock_object(self._ceph_vol_client, 'delete_object',
mock.Mock(return_value=None))
ioctx = mock.Mock()
self._rados_client.open_ioctx = mock.Mock(return_value=ioctx)
ret = self._manager_with_rados_store._delete_rados_object('fakeobj')
self._ceph_vol_client.delete_object.assert_called_once_with(
'fakepool', 'fakeobj')
self._rados_client.open_ioctx.assert_called_once_with('fakepool')
ioctx.remove_object.assert_called_once_with('fakeobj')
ioctx.close.assert_called_once()
self.assertIsNone(ret)
def test_get_export_id(self):


@@ -351,12 +351,12 @@ class GaneshaNASHelper2TestCase(test.TestCase):
self._context = context.get_admin_context()
self._execute = mock.Mock(return_value=('', ''))
self.ceph_vol_client = mock.Mock()
self.rados_client = mock.Mock()
self.fake_conf = config.Configuration(None)
self.fake_conf_dir_path = '/fakedir0/exports.d'
self._helper = ganesha.GaneshaNASHelper2(
self._execute, self.fake_conf, tag='faketag',
ceph_vol_client=self.ceph_vol_client)
rados_client=self.rados_client)
self._helper.ganesha = mock.Mock()
self._helper.export_template = {}
self.share = fake_share.fake_share()
@@ -387,7 +387,7 @@ class GaneshaNASHelper2TestCase(test.TestCase):
'ganesha_rados_store_pool_name': 'ceph_pool',
'ganesha_rados_export_index': 'fake_index',
'ganesha_rados_export_counter': 'fake_counter',
'ceph_vol_client': self.ceph_vol_client
'rados_client': self.rados_client
}
else:
kwargs = {
@@ -431,7 +431,7 @@ class GaneshaNASHelper2TestCase(test.TestCase):
ganesha_rados_store_pool_name='ceph_pool',
ganesha_rados_export_index='fake_index',
ganesha_rados_export_counter='fake_counter',
ceph_vol_client=self.ceph_vol_client)
rados_client=self.rados_client)
self._helper._load_conf_dir.assert_called_once_with(
'/fakedir2/faketempl.d', must_exist=False)
self.assertEqual(mock_ganesha_manager, self._helper.ganesha)


@@ -0,0 +1,36 @@
---
deprecations:
- |
As of the Wallaby release the CephFS driver no longer recognizes
the scoped extra-spec ``cephfs:data_isolated`` because it is no
longer supported by the Ceph community. This style of data isolation
required dedicating a Ceph pool for each share, and it scaled and
performed poorly.
- |
The ``ceph_volume_client`` library is deprecated by the CephFS driver in
favor of a python rados client that connects to the Ceph manager daemon
to interact with the Ceph cluster. This new connection method enables
functionality not available with the older client, which has been
deprecated by the Ceph community and will be removed in the Quincy
release.
upgrade:
- |
Manila's CephFS drivers now **require** the "python3-ceph-argparse" and
"python3-rados" packages. Do not upgrade without adding these packages
to the environment where the ``manila-share`` service runs since
without them the driver will refuse to start up. This breaking change
is necessary because the old ``ceph_volume_client`` has been deprecated
by the Ceph community.
features:
- |
The Ceph backend can now work with multiple filesystem clusters.
The filesystem to be used by manila can be specified by the
driver option ``cephfs_filesystem_name``. If this option is not
specified, the driver will assume that a single filesystem is present
in the Ceph cluster and will attempt to use it.
- |
Deletion of shares offered by the CephFS driver (CephFS and NFS) is
now faster. The Ceph manager moves a deleted share's contents to a
trash folder and purges them asynchronously rather than handling this
as part of the synchronous delete operation. The purge can take
considerable time if a share contains a significant amount of data.