# Copyright (c) 2016 Red Hat, Inc.
# All Rights Reserved.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.


import ipaddress
import socket
import sys

from oslo_config import cfg
from oslo_config import types
from oslo_log import log
from oslo_utils import units
import six

from manila.common import constants
from manila import exception
from manila.i18n import _
from manila.message import api as message_api
from manila.message import message_field
from manila.share import driver
from manila.share.drivers import ganesha
from manila.share.drivers.ganesha import utils as ganesha_utils
from manila.share.drivers import helpers as driver_helpers
from manila.share import share_types

# The Ceph client bindings are optional at import time; their absence is
# reported lazily, the first time a volume client is actually needed.
try:
    import ceph_volume_client
    ceph_module_found = True
except ImportError:
    ceph_volume_client = None
    ceph_module_found = False


CEPHX_ACCESS_TYPE = "cephx"

# The default Ceph administrative identity
CEPH_DEFAULT_AUTH_ID = "admin"

DEFAULT_VOLUME_MODE = '755'


LOG = log.getLogger(__name__)


cephfs_opts = [
    cfg.StrOpt('cephfs_conf_path',
               default="",
               help="Fully qualified path to the ceph.conf file."),
    cfg.StrOpt('cephfs_cluster_name',
               help="The name of the cluster in use, if it is not "
                    "the default ('ceph')."
               ),
    cfg.StrOpt('cephfs_auth_id',
               default="manila",
               help="The name of the ceph auth identity to use."
               ),
    cfg.StrOpt('cephfs_volume_path_prefix',
               default="/volumes",
               help="The prefix of the cephfs volume path."
               ),
    cfg.BoolOpt('cephfs_enable_snapshots',
                default=False,
                help="Whether to enable snapshots in this driver."
                ),
    cfg.StrOpt('cephfs_protocol_helper_type',
               default="CEPHFS",
               choices=['CEPHFS', 'NFS'],
               ignore_case=True,
               help="The type of protocol helper to use. Default is "
                    "CEPHFS."
               ),
    cfg.BoolOpt('cephfs_ganesha_server_is_remote',
                default=False,
                help="Whether the NFS-Ganesha server is remote to the driver."
                ),
    cfg.HostAddressOpt('cephfs_ganesha_server_ip',
                       help="The IP address of the NFS-Ganesha server."),
    cfg.StrOpt('cephfs_ganesha_server_username',
               default='root',
               help="The username to authenticate as in the remote "
                    "NFS-Ganesha server host."),
    cfg.StrOpt('cephfs_ganesha_path_to_private_key',
               help="The path of the driver host's private SSH key file."),
    cfg.StrOpt('cephfs_ganesha_server_password',
               secret=True,
               help="The password to authenticate as the user in the remote "
                    "Ganesha server host. This is not required if "
                    "'cephfs_ganesha_path_to_private_key' is configured."),
    cfg.ListOpt('cephfs_ganesha_export_ips',
                default='',
                help="List of IPs to export shares. If not supplied, "
                     "then the value of 'cephfs_ganesha_server_ip' "
                     "will be used to construct share export locations."),
    cfg.StrOpt('cephfs_volume_mode',
               default=DEFAULT_VOLUME_MODE,
               help="The read/write/execute permissions mode for CephFS "
                    "volumes, snapshots, and snapshot groups expressed in "
                    "Octal as with linux 'chmod' or 'umask' commands."),
]


CONF = cfg.CONF
CONF.register_opts(cephfs_opts)


def cephfs_share_path(share):
    """Get VolumePath from Share.

    :param share: mapping providing the 'share_group_id' and 'id' keys.
    :return: a ``ceph_volume_client.VolumePath`` built from those two values.
    """
    return ceph_volume_client.VolumePath(
        share['share_group_id'], share['id'])


class CephFSDriver(driver.ExecuteMixin, driver.GaneshaMixin,
                   driver.ShareDriver):
    """Driver for the Ceph Filesystem."""

    def __init__(self, *args, **kwargs):
        """Read the backend configuration and validate the volume mode.

        :raises BadConfigurationException: if 'cephfs_volume_mode' is not a
            valid octal number.
        """
        # NOTE(review): the leading ``False`` is presumably the
        # "driver handles share servers" flag of ShareDriver -- confirm
        # against the base class.
        super(CephFSDriver, self).__init__(False, *args, **kwargs)
        self.backend_name = self.configuration.safe_get(
            'share_backend_name') or 'CephFS'

        # Created lazily by the ``volume_client`` property.
        self._volume_client = None

        self.configuration.append_config_values(cephfs_opts)

        try:
            self._cephfs_volume_mode = int(
                self.configuration.cephfs_volume_mode, 8)
        except ValueError:
            msg = _("Invalid CephFS volume mode %s")
            raise exception.BadConfigurationException(
                msg % self.configuration.cephfs_volume_mode)

        self.ipv6_implemented = True

    def do_setup(self, context):
        """Instantiate and initialize the configured protocol helper."""
        # The helper classes are looked up on the module object at call
        # time rather than being bound when this class is defined.
        if self.configuration.cephfs_protocol_helper_type.upper() == "CEPHFS":
            protocol_helper_class = getattr(
                sys.modules[__name__], 'NativeProtocolHelper')
        else:
            protocol_helper_class = getattr(
                sys.modules[__name__], 'NFSProtocolHelper')

        self.protocol_helper = protocol_helper_class(
            self._execute,
            self.configuration,
            ceph_vol_client=self.volume_client)

        self.protocol_helper.init_helper()

    def check_for_setup_error(self):
        """Returns an error if prerequisites aren't met."""
        self.protocol_helper.check_for_setup_error()

    def _update_share_stats(self):
        """Collect cluster capacity and publish the backend statistics."""
        stats = self.volume_client.rados.get_cluster_stats()

        # 'kb' and 'kb_avail' are expressed in kilobytes, so dividing by
        # units.Mi (2**20) yields gibibytes. ``float`` keeps the division
        # exact on Python 2 as well.
        total_capacity_gb = round(float(stats['kb']) / units.Mi, 2)
        free_capacity_gb = round(float(stats['kb_avail']) / units.Mi, 2)

        data = {
            'vendor_name': 'Ceph',
            'driver_version': '1.0',
            'share_backend_name': self.backend_name,
            'storage_protocol': self.configuration.safe_get(
                'cephfs_protocol_helper_type'),
            'pools': [
                {
                    'pool_name': 'cephfs',
                    'total_capacity_gb': total_capacity_gb,
                    'free_capacity_gb': free_capacity_gb,
                    'qos': 'False',
                    'reserved_percentage': 0,
                    'dedupe': [False],
                    'compression': [False],
                    'thin_provisioning': [False]
                }
            ],
            'total_capacity_gb': total_capacity_gb,
            'free_capacity_gb': free_capacity_gb,
            'snapshot_support': self.configuration.safe_get(
                'cephfs_enable_snapshots'),
        }
        super(  # pylint: disable=no-member
            CephFSDriver, self)._update_share_stats(data)

    def _to_bytes(self, gigs):
        """Convert a Manila size into bytes.

        Manila uses gibibytes everywhere.

        :param gigs: integer number of gibibytes.
        :return: integer number of bytes.
        """
        return gigs * units.Gi

    @property
    def volume_client(self):
        """Return the connected Ceph volume client, creating it on first use.

        :raises ManilaException: if the Ceph client libraries could not be
            imported.
        """
        if self._volume_client:
            return self._volume_client

        if not ceph_module_found:
            raise exception.ManilaException(
                _("Ceph client libraries not found.")
            )

        conf_path = self.configuration.safe_get('cephfs_conf_path')
        cluster_name = self.configuration.safe_get('cephfs_cluster_name')
        auth_id = self.configuration.safe_get('cephfs_auth_id')
        volume_prefix = self.configuration.safe_get(
            'cephfs_volume_path_prefix')
        self._volume_client = ceph_volume_client.CephFSVolumeClient(
            auth_id, conf_path, cluster_name, volume_prefix=volume_prefix)
        LOG.info("[%(be)s] Ceph client found, connecting...",
                 {"be": self.backend_name})
        if auth_id != CEPH_DEFAULT_AUTH_ID:
            # Evict any other manila sessions.  Only do this if we're
            # using a client ID that isn't the default admin ID, to avoid
            # rudely disrupting anyone else.
            premount_evict = auth_id
        else:
            premount_evict = None
        try:
            self._volume_client.connect(premount_evict=premount_evict)
        except Exception:
            # Drop the half-initialized client so the next access retries.
            self._volume_client = None
            raise
        else:
            LOG.info("[%(be)s] Ceph client connection complete.",
                     {"be": self.backend_name})

        return self._volume_client

    def create_share(self, context, share, share_server=None):
        """Create a CephFS volume.

        :param context: A RequestContext.
        :param share: A Share.
        :param share_server: Always None for CephFS native.
        :return: The export locations dictionary.
        :raises ShareBackendException: if the share's protocol differs from
            the configured protocol helper type.
        """
        requested_proto = share['share_proto'].upper()
        supported_proto = (
            self.configuration.cephfs_protocol_helper_type.upper())
        if (requested_proto != supported_proto):
            msg = _("Share protocol %s is not supported.") % requested_proto
            raise exception.ShareBackendException(msg=msg)

        # `share` is a Share
        msg = _("create_share {be} name={id} size={size}"
                " share_group_id={group}")
        LOG.debug(msg.format(
            be=self.backend_name, id=share['id'], size=share['size'],
            group=share['share_group_id']))

        extra_specs = share_types.get_extra_specs_from_share(share)
        data_isolated = extra_specs.get("cephfs:data_isolated", False)

        size = self._to_bytes(share['size'])

        # Create the CephFS volume
        cephfs_volume = self.volume_client.create_volume(
            cephfs_share_path(share), size=size, data_isolated=data_isolated,
            mode=self._cephfs_volume_mode)

        return self.protocol_helper.get_export_locations(share, cephfs_volume)

    def delete_share(self, context, share, share_server=None):
        """Delete, then purge, the CephFS volume backing ``share``."""
        extra_specs = share_types.get_extra_specs_from_share(share)
        data_isolated = extra_specs.get("cephfs:data_isolated", False)

        self.volume_client.delete_volume(cephfs_share_path(share),
                                         data_isolated=data_isolated)
        self.volume_client.purge_volume(cephfs_share_path(share),
                                        data_isolated=data_isolated)

    def update_access(self, context, share, access_rules, add_rules,
                      delete_rules, share_server=None):
        """Delegate access rule updates to the protocol helper."""
        return self.protocol_helper.update_access(
            context, share, access_rules, add_rules, delete_rules,
            share_server=share_server)

    def ensure_share(self, context, share, share_server=None):
        """Make sure the share exists by re-running its creation."""
        # Creation is idempotent
        return self.create_share(context, share, share_server)

    def extend_share(self, share, new_size, share_server=None):
        """Set the volume's maximum size to ``new_size`` gibibytes."""
        LOG.debug("extend_share {id} {size}".format(
            id=share['id'], size=new_size))
        self.volume_client.set_max_bytes(cephfs_share_path(share),
                                         self._to_bytes(new_size))

    def shrink_share(self, share, new_size, share_server=None):
        """Lower the volume's maximum size to ``new_size`` gibibytes.

        :raises ShareShrinkingPossibleDataLoss: if the volume currently
            uses more bytes than the requested size.
        """
        LOG.debug("shrink_share {id} {size}".format(
            id=share['id'], size=new_size))
        new_bytes = self._to_bytes(new_size)
        used = self.volume_client.get_used_bytes(cephfs_share_path(share))
        if used > new_bytes:
            # While in fact we can "shrink" our volumes to less than their
            # used bytes (it's just a quota), raise error anyway to avoid
            # confusing API consumers that might depend on typical shrink
            # behaviour.
            raise exception.ShareShrinkingPossibleDataLoss(
                share_id=share['id'])

        self.volume_client.set_max_bytes(cephfs_share_path(share), new_bytes)

    def create_snapshot(self, context, snapshot, share_server=None):
        """Create a snapshot named '<snapshot_id>_<id>' on the volume."""
        self.volume_client.create_snapshot_volume(
            cephfs_share_path(snapshot['share']),
            '_'.join([snapshot['snapshot_id'], snapshot['id']]),
            mode=self._cephfs_volume_mode)

    def delete_snapshot(self, context, snapshot, share_server=None):
        """Destroy the snapshot named '<snapshot_id>_<id>' on the volume."""
        self.volume_client.destroy_snapshot_volume(
            cephfs_share_path(snapshot['share']),
            '_'.join([snapshot['snapshot_id'], snapshot['id']]))

    def create_share_group(self, context, sg_dict, share_server=None):
        """Create the volume group identified by ``sg_dict['id']``."""
        self.volume_client.create_group(sg_dict['id'],
                                        mode=self._cephfs_volume_mode)

    def delete_share_group(self, context, sg_dict, share_server=None):
        """Destroy the volume group identified by ``sg_dict['id']``."""
        self.volume_client.destroy_group(sg_dict['id'])

    def delete_share_group_snapshot(self, context, snap_dict,
                                    share_server=None):
        """Destroy a group snapshot.

        :return: the tuple ``(None, [])``.
        """
        self.volume_client.destroy_snapshot_group(
            snap_dict['share_group_id'],
            snap_dict['id'])

        return None, []

    def create_share_group_snapshot(self, context, snap_dict,
                                    share_server=None):
        """Create a group snapshot.

        :return: the tuple ``(None, [])``.
        """
        self.volume_client.create_snapshot_group(
            snap_dict['share_group_id'],
            snap_dict['id'],
            mode=self._cephfs_volume_mode)

        return None, []

    def __del__(self):
        """Disconnect the volume client, if one was ever connected."""
        # ``getattr`` guards against __init__ having failed before the
        # attribute was assigned; __del__ still runs in that case.
        client = getattr(self, '_volume_client', None)
        if client:
            client.disconnect()
            self._volume_client = None

    def get_configured_ip_versions(self):
        """Return the IP versions reported by the protocol helper."""
        return self.protocol_helper.get_configured_ip_versions()


class NativeProtocolHelper(ganesha.NASHelperBase):
    """Helper class for native CephFS protocol"""

    supported_access_types = (CEPHX_ACCESS_TYPE, )
    supported_access_levels = (constants.ACCESS_LEVEL_RW,
                               constants.ACCESS_LEVEL_RO)

    def __init__(self, execute, config, **kwargs):
        """Store the Ceph volume client and set up user messaging.

        :param execute: passed through to the base helper.
        :param config: passed through to the base helper.
        :param kwargs: must contain 'ceph_vol_client'; the remaining items
            are passed through to the base helper.
        """
        self.volume_client = kwargs.pop('ceph_vol_client')
        self.message_api = message_api.API()
        super(NativeProtocolHelper, self).__init__(execute, config,
                                                   **kwargs)

    def _init_helper(self):
        """Nothing to initialize for the native protocol."""
        pass

    def check_for_setup_error(self):
        """Returns an error if prerequisites aren't met."""
        return

    def get_export_locations(self, share, cephfs_volume):
        """Build the '<mon addrs>:<mount path>' export location.

        :param share: mapping providing the 'id' key (used for logging).
        :param cephfs_volume: mapping providing the 'mount_path' key.
        :return: a single export location dictionary.
        """
        # To mount this you need to know the mon IPs and the path to the volume
        mon_addrs = self.volume_client.get_mon_addrs()

        export_location = "{addrs}:{path}".format(
            addrs=",".join(mon_addrs),
            path=cephfs_volume['mount_path'])

        LOG.info("Calculated export location for share %(id)s: %(loc)s",
                 {"id": share['id'], "loc": export_location})

        return {
            'path': export_location,
            'is_admin_only': False,
            'metadata': {},
        }

    def _allow_access(self, context, share, access, share_server=None):
        """Authorize a Ceph auth ID on the share and return its key.

        :param access: access rule mapping providing 'access_type',
            'access_to' and 'access_level'.
        :return: the 'auth_key' entry of the authorization result.
        :raises InvalidShareAccessType: for any access type but cephx.
        :raises InvalidShareAccess: if the auth ID equals the one Manila
            uses, or the backend reports the client as not allowed.
        :raises InvalidShareAccessLevel: if read-only access is requested
            with an unversioned volume client.
        """
        if access['access_type'] != CEPHX_ACCESS_TYPE:
            raise exception.InvalidShareAccessType(type=access['access_type'])

        ceph_auth_id = access['access_to']

        # We need to check here rather than the API or Manila Client to see
        # if the ceph_auth_id is the same as the one specified for Manila's
        # usage. This is due to the fact that the API and the Manila client
        # cannot read the contents of the Manila configuration file. If it
        # is the same, we need to error out.
        if ceph_auth_id == CONF.cephfs_auth_id:
            error_message = (_('Ceph authentication ID %s must be different '
                               'than the one the Manila service uses.') %
                             ceph_auth_id)
            raise exception.InvalidShareAccess(reason=error_message)

        if not getattr(self.volume_client, 'version', None):
            if access['access_level'] == constants.ACCESS_LEVEL_RO:
                LOG.error("Need python-cephfs package version 10.2.3 or "
                          "greater to enable read-only access.")
                raise exception.InvalidShareAccessLevel(
                    level=constants.ACCESS_LEVEL_RO)

            auth_result = self.volume_client.authorize(
                cephfs_share_path(share), ceph_auth_id)
        else:
            readonly = access['access_level'] == constants.ACCESS_LEVEL_RO
            try:
                auth_result = self.volume_client.authorize(
                    cephfs_share_path(share), ceph_auth_id,
                    readonly=readonly, tenant_id=share['project_id'])
            except Exception as e:
                # Only a "not allowed" failure is translated into a rule
                # level error; anything else propagates unchanged.
                if 'not allowed' in six.text_type(e).lower():
                    msg = ("Access to client %(client)s is not allowed. "
                           "Reason: %(reason)s")
                    msg_payload = {'client': ceph_auth_id, 'reason': e}
                    raise exception.InvalidShareAccess(
                        reason=msg % msg_payload)
                raise

        return auth_result['auth_key']

    def _deny_access(self, context, share, access, share_server=None):
        """Deauthorize and evict a Ceph auth ID from the share.

        Rules whose access type is not cephx are logged and ignored.
        """
        if access['access_type'] != CEPHX_ACCESS_TYPE:
            LOG.warning("Invalid access type '%(type)s', "
                        "ignoring in deny.",
                        {"type": access['access_type']})
            return

        self.volume_client.deauthorize(cephfs_share_path(share),
                                       access['access_to'])
        self.volume_client.evict(
            access['access_to'],
            volume_path=cephfs_share_path(share))

    def _flag_rule_error(self, context, share, rule, detail,
                         access_updates):
        """Report a failed rule to the user and mark it as 'error'.

        Must be called from inside an ``except`` block, because it logs
        the exception currently being handled.
        """
        self.message_api.create(
            context,
            message_field.Action.UPDATE_ACCESS_RULES,
            share['project_id'],
            resource_type=message_field.Resource.SHARE,
            resource_id=share['share_id'],
            detail=detail)
        log_args = {'id': rule['access_id'],
                    'access_level': rule['access_level'],
                    'access_to': rule['access_to']}
        LOG.exception("Failed to provide %(access_level)s access to "
                      "%(access_to)s (Rule ID: %(id)s). Setting rule "
                      "to 'error' state.", log_args)
        access_updates.update({rule['access_id']: {'state': 'error'}})

    def update_access(self, context, share, access_rules, add_rules,
                      delete_rules, share_server=None):
        """Apply access rule changes to the share.

        When neither ``add_rules`` nor ``delete_rules`` is given, every
        rule in ``access_rules`` is (re)applied, and auth IDs known to the
        backend but absent from ``access_rules`` are revoked.

        :return: dict mapping a rule's 'access_id' to either
            ``{'access_key': <key>}`` or ``{'state': 'error'}``.
        """
        access_updates = {}

        # Work on copies: rules are appended to ``delete_rules`` below and
        # the caller's lists must not change under its feet. This also
        # tolerates ``None`` for either argument.
        add_rules = list(add_rules or [])
        delete_rules = list(delete_rules or [])

        if not (add_rules or delete_rules):  # recovery/maintenance mode
            add_rules = list(access_rules or [])

            existing_auths = None
            # The unversioned volume client cannot fetch from the Ceph backend,
            # the list of auth IDs that have share access.
            if getattr(self.volume_client, 'version', None):
                existing_auths = self.volume_client.get_authorized_ids(
                    cephfs_share_path(share))

            if existing_auths:
                existing_auth_ids = {auth[0] for auth in existing_auths}
                want_auth_ids = {rule['access_to'] for rule in add_rules}
                delete_auth_ids = existing_auth_ids.difference(
                    want_auth_ids)
                for delete_auth_id in delete_auth_ids:
                    delete_rules.append(
                        {
                            'access_to': delete_auth_id,
                            'access_type': CEPHX_ACCESS_TYPE,
                        })

        # During recovery mode, re-authorize share access for auth IDs that
        # were already granted access by the backend. Do this to fetch their
        # access keys and ensure that after recovery, manila and the Ceph
        # backend are in sync.
        for rule in add_rules:
            try:
                access_key = self._allow_access(context, share, rule)
            except (exception.InvalidShareAccessLevel,
                    exception.InvalidShareAccessType):
                self._flag_rule_error(
                    context, share, rule,
                    message_field.Detail.UNSUPPORTED_CLIENT_ACCESS,
                    access_updates)
            except exception.InvalidShareAccess:
                self._flag_rule_error(
                    context, share, rule,
                    message_field.Detail.FORBIDDEN_CLIENT_ACCESS,
                    access_updates)
            else:
                access_updates.update({
                    rule['access_id']: {'access_key': access_key},
                })

        for rule in delete_rules:
            self._deny_access(context, share, rule)

        return access_updates

    def get_configured_ip_versions(self):
        """Return the supported IP versions; always ``[4]`` here."""
        return [4]


class NFSProtocolHelper(ganesha.GaneshaNASHelper2):
    """Helper class for exporting CephFS volumes over NFS via Ganesha."""

    shared_data = {}
    supported_protocols = ('NFS',)

    def __init__(self, execute, config_object, **kwargs):
        """Pick the executor and work out the export addresses.

        :param execute: local execute callable; wrapped in a RootExecutor
            when the Ganesha server is local, unused when it is remote.
        :param config_object: configuration providing the
            'cephfs_ganesha_*' options.
        :param kwargs: passed through to the base helper;
            'ceph_vol_client' is read from it if the base helper did not
            already set ``self.ceph_vol_client``.
        """
        if config_object.cephfs_ganesha_server_is_remote:
            execute = ganesha_utils.SSHExecutor(
                config_object.cephfs_ganesha_server_ip, 22, None,
                config_object.cephfs_ganesha_server_username,
                password=config_object.cephfs_ganesha_server_password,
                privatekey=config_object.cephfs_ganesha_path_to_private_key)
        else:
            execute = ganesha_utils.RootExecutor(execute)

        self.ganesha_host = config_object.cephfs_ganesha_server_ip
        if not self.ganesha_host:
            self.ganesha_host = socket.gethostname()
            LOG.info("NFS-Ganesha server's location defaulted to driver's "
                     "hostname: %s", self.ganesha_host)

        super(NFSProtocolHelper, self).__init__(execute, config_object,
                                                **kwargs)

        if not hasattr(self, 'ceph_vol_client'):
            self.ceph_vol_client = kwargs.pop('ceph_vol_client')
        self.export_ips = config_object.cephfs_ganesha_export_ips
        if not self.export_ips:
            self.export_ips = [self.ganesha_host]
        # Filled in lazily by get_configured_ip_versions().
        self.configured_ip_versions = set()
        self.config = config_object

    def check_for_setup_error(self):
        """Returns an error if prerequisites aren't met.

        :raises InvalidParameterValue: if a member of
            'cephfs_ganesha_export_ips' is rejected by the HostAddress type.
        """
        host_address_obj = types.HostAddress()
        for export_ip in self.config.cephfs_ganesha_export_ips:
            try:
                host_address_obj(export_ip)
            except ValueError:
                msg = (_("Invalid list member of 'cephfs_ganesha_export_ips' "
                         "option supplied %s -- not a valid IP address or "
                         "hostname.") % export_ip)
                raise exception.InvalidParameterValue(err=msg)

    def get_export_locations(self, share, cephfs_volume):
        """Build one '<address>:<mount path>' location per export IP.

        :param share: mapping providing the 'id' key (used for logging).
        :param cephfs_volume: mapping providing the 'mount_path' key.
        :return: list of export location dictionaries.
        """
        export_locations = []
        for export_ip in self.export_ips:
            export_path = "{server_address}:{mount_path}".format(
                server_address=driver_helpers.escaped_address(export_ip),
                mount_path=cephfs_volume['mount_path'])

            LOG.info("Calculated export path for share %(id)s: %(epath)s",
                     {"id": share['id'], "epath": export_path})
            export_location = {
                'path': export_path,
                'is_admin_only': False,
                'metadata': {},
            }
            export_locations.append(export_location)
        return export_locations

    def _default_config_hook(self):
        """Callback to provide default export block."""
        dconf = super(NFSProtocolHelper, self)._default_config_hook()
        conf_dir = ganesha_utils.path_from(__file__, "conf")
        ganesha_utils.patch(dconf, self._load_conf_dir(conf_dir))
        return dconf

    def _fsal_hook(self, base, share, access):
        """Callback to create FSAL subblock."""
        ceph_auth_id = ''.join(['ganesha-', share['id']])
        # Restrict Ganesha server's access to only the CephFS subtree or path,
        # corresponding to the manila share, that is to be exported by making
        # Ganesha use Ceph auth IDs with path restricted capabilities to
        # communicate with CephFS.
        auth_result = self.ceph_vol_client.authorize(
            cephfs_share_path(share), ceph_auth_id, readonly=False,
            tenant_id=share['project_id'])
        return {
            'Name': 'Ceph',
            'User_Id': ceph_auth_id,
            'Secret_Access_Key': auth_result['auth_key']
        }

    def _cleanup_fsal_hook(self, base, share, access):
        """Callback for FSAL specific cleanup after removing an export."""
        ceph_auth_id = ''.join(['ganesha-', share['id']])
        self.ceph_vol_client.deauthorize(cephfs_share_path(share),
                                         ceph_auth_id)

    def _get_export_path(self, share):
        """Callback to provide export path."""
        volume_path = cephfs_share_path(share)
        return self.ceph_vol_client._get_path(volume_path)

    def _get_export_pseudo_path(self, share):
        """Callback to provide pseudo path."""
        volume_path = cephfs_share_path(share)
        return self.ceph_vol_client._get_path(volume_path)

    def get_configured_ip_versions(self):
        """Return the IP versions of the configured export addresses.

        The result is cached once every export address has parsed as an IP
        address. If any of them does not, ``[4, 6]`` is returned and
        nothing is cached.
        """
        if not self.configured_ip_versions:
            # Collect into a local set so that a failure part-way through
            # cannot leave a partial result cached for later calls.
            ip_versions = set()
            try:
                for export_ip in self.export_ips:
                    ip_versions.add(
                        ipaddress.ip_address(
                            six.text_type(export_ip)).version)
            except ValueError:
                # export_ips contained a hostname, safest thing is to
                # claim support for IPv4 and IPv6 address families
                LOG.warning("Setting configured IP versions to [4, 6] since "
                            "a hostname (rather than IP address) was supplied "
                            "in 'cephfs_ganesha_server_ip' or "
                            "in 'cephfs_ganesha_export_ips'.")
                return [4, 6]
            self.configured_ip_versions = ip_versions
        return list(self.configured_ip_versions)