439 lines
19 KiB
Python
439 lines
19 KiB
Python
# Copyright (c) 2016, MapR Technologies
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""
|
|
Share driver for MapR-FS distributed file system.
|
|
"""
|
|
import math
|
|
import os
|
|
|
|
from oslo_config import cfg
|
|
from oslo_log import log
|
|
from oslo_utils import strutils
|
|
from oslo_utils import units
|
|
|
|
from manila import context
|
|
from manila import exception
|
|
from manila.i18n import _
|
|
from manila.share import api
|
|
from manila.share import driver
|
|
|
|
from manila.share.drivers.maprfs import driver_util as mapru
|
|
|
|
LOG = log.getLogger(__name__)

# Configuration options understood by the MapR-FS native share driver.
# They are registered globally below and appended to the per-backend
# configuration in MapRFSNativeShareDriver.__init__.
maprfs_native_share_opts = [
    cfg.ListOpt('maprfs_clinode_ip',
                help='The list of IPs or hostnames of nodes where mapr-core '
                     'is installed.'),
    cfg.PortOpt('maprfs_ssh_port',
                default=22,
                help='CLDB node SSH port.'),
    cfg.StrOpt('maprfs_ssh_name',
               default="mapr",
               help='Cluster admin user ssh login name.'),
    # secret=True keeps the password from being printed in clear text when
    # oslo.config logs the effective option values.
    cfg.StrOpt('maprfs_ssh_pw',
               secret=True,
               help='Cluster node SSH login password, '
                    'This parameter is not necessary, if '
                    '\'maprfs_ssh_private_key\' is configured.'),
    cfg.StrOpt('maprfs_ssh_private_key',
               help='Path to SSH private '
                    'key for login.'),
    cfg.StrOpt('maprfs_base_volume_dir',
               default='/',
               help='Path in MapRFS where share volumes must be created.'),
    cfg.ListOpt('maprfs_zookeeper_ip',
                help='The list of IPs or hostnames of ZooKeeper nodes.'),
    cfg.ListOpt('maprfs_cldb_ip',
                help='The list of IPs or hostnames of CLDB nodes.'),
    cfg.BoolOpt('maprfs_rename_managed_volume',
                default=True,
                help='Specify whether existing volume should be renamed when'
                     ' start managing.'),
]

CONF = cfg.CONF
CONF.register_opts(maprfs_native_share_opts)
|
|
|
|
|
|
class MapRFSNativeShareDriver(driver.ExecuteMixin, driver.ShareDriver):
    """MapR-FS Share Driver.

    Executes commands relating to shares.
    driver_handles_share_servers must be False because this driver does not
    support creating or managing virtual storage servers (share servers)
    API version history:

        1.0 - Initial Version
    """

    def __init__(self, *args, **kwargs):
        """Read the backend configuration and initialise driver state.

        The first positional argument passed to the parent constructor is
        ``False`` (driver_handles_share_servers). The MapR-FS helper
        (``self._maprfs_util``) is created later, in :meth:`do_setup`.
        """
        super(MapRFSNativeShareDriver, self).__init__(False, *args, **kwargs)
        self.configuration.append_config_values(maprfs_native_share_opts)
        self.backend_name = self.configuration.safe_get(
            'share_backend_name') or 'MapR-FS-Native'
        self._base_volume_dir = self.configuration.safe_get(
            'maprfs_base_volume_dir') or '/'
        self._maprfs_util = None
        # URI scheme prefix put in front of every exported path.
        self._maprfs_base_path = "maprfs://"
        self.cldb_ip = self.configuration.maprfs_cldb_ip or []
        self.zookeeper_ip = self.configuration.maprfs_zookeeper_ip or []
        self.rename_volume = self.configuration.maprfs_rename_managed_volume
        self.api = api.API()

    def do_setup(self, context):
        """Do initialization while the share driver starts."""
        super(MapRFSNativeShareDriver, self).do_setup(context)
        self._maprfs_util = mapru.get_version_handler(self.configuration)

    def _share_dir(self, share_name):
        """Return the default path of a share under the base volume dir."""
        return os.path.join(self._base_volume_dir, share_name)

    def _volume_name(self, share_name):
        """Return the default volume name for a share (the share name)."""
        return share_name

    def _get_share_path(self, share):
        """Return the export location stored on the share."""
        return share['export_location']

    def _get_snapshot_path(self, snapshot):
        """Return the path of a snapshot inside its share's directory.

        The share directory is taken from the first space-separated token
        of the share instance export location with the ``maprfs://`` prefix
        stripped. The snapshot's ``provider_location`` is preferred over
        its ``name`` as the last path component.
        """
        share_dir = snapshot['share_instance']['export_location'].split(
            ' ')[0][len(self._maprfs_base_path):]
        return os.path.join(share_dir, '.snapshot',
                            snapshot['provider_location'] or snapshot['name'])

    def _get_volume_name(self, context, share):
        """Return the volume name backing a share.

        The ``_name`` share metadata entry wins when present; otherwise the
        default derived from the share name is used.
        """
        metadata = self.api.get_share_metadata(context,
                                               {'id': share['share_id']})
        return metadata.get('_name', self._volume_name(share['name']))

    def _get_share_export_locations(self, share, path=None):
        """Return share path on storage provider.

        :param share: share model; only ``share['name']`` is read, and only
            when ``path`` is not given.
        :param path: explicit path of the share; defaults to the share
            directory under the base volume dir.
        :returns: a list with a single export location dict. Its ``path``
            combines the ``maprfs://`` path with ``-C``/``-Z``/``-N``
            arguments for CLDB nodes, ZooKeeper nodes and cluster name.
        """
        cluster_name = self._maprfs_util.get_cluster_name()
        path = '%(path)s -C %(cldb)s -Z %(zookeeper)s -N %(name)s' % {
            'path': self._maprfs_base_path + (
                path or self._share_dir(share['name'])),
            'cldb': ' '.join(self.cldb_ip),
            'zookeeper': ' '.join(self.zookeeper_ip),
            'name': cluster_name
        }
        export_list = [{
            "path": path,
            "is_admin_only": False,
            "metadata": {
                "cldb": ','.join(self.cldb_ip),
                "zookeeper": ','.join(self.zookeeper_ip),
                "cluster-name": cluster_name,
            },
        }]

        return export_list

    def _create_share(self, share, metadata, context):
        """Creates a share.

        Metadata entries whose key starts with ``_`` are treated as volume
        options (with the underscore stripped). ``path`` and ``name`` are
        consumed here; the remaining options are passed to
        ``create_volume`` as keyword arguments.

        :raises MapRFSException: if the share protocol is not MapRFS or the
            backend command fails.
        """
        if share['share_proto'].lower() != 'maprfs':
            msg = _('Only MapRFS protocol supported!')
            LOG.error(msg)
            raise exception.MapRFSException(msg=msg)
        # startswith() is safe for an empty key, unlike indexing k[0].
        options = {k[1:]: v for k, v in metadata.items()
                   if k.startswith('_')}
        share_dir = options.pop('path', self._share_dir(share['name']))
        volume_name = options.pop('name', self._volume_name(share['name']))
        try:
            self._maprfs_util.create_volume(volume_name, share_dir,
                                            share['size'],
                                            **options)
            # posix permissions should be 777, ACEs are used as a restriction
            self._maprfs_util.maprfs_chmod(share_dir, '777')
        except exception.ProcessExecutionError:
            # Mark the share so that delete_share() skips the backend call:
            # it returns early when the volume name is "error".
            self.api.update_share_metadata(context,
                                           {'id': share['share_id']},
                                           {'_name': 'error'})
            msg = (_('Failed to create volume in MapR-FS for the '
                     'share %(share_name)s.') % {'share_name': share['name']})
            LOG.exception(msg)
            raise exception.MapRFSException(msg=msg)

    def _set_share_size(self, share, size):
        """Set the size quota of the volume backing a share.

        When the new size is smaller than the current share size, the
        volume usage is checked first.

        :raises ShareShrinkingPossibleDataLoss: if the used space does not
            fit into the requested size.
        :raises MapRFSException: if the backend command fails.
        """
        volume_name = self._get_volume_name(context.get_admin_context(), share)
        try:
            if share['size'] > size:
                info = self._maprfs_util.get_volume_info(volume_name)
                used = info['totalused']
                # NOTE(review): presumably 'totalused' is in MB and size in
                # GB, hence the units.Ki factor - confirm against the helper.
                if int(used) >= int(size) * units.Ki:
                    raise exception.ShareShrinkingPossibleDataLoss(
                        share_id=share['id'])
            self._maprfs_util.set_volume_size(volume_name, size)
        except exception.ProcessExecutionError:
            msg = (_('Failed to set space quota for the share %(share_name)s.')
                   % {'share_name': share['name']})
            LOG.exception(msg)
            raise exception.MapRFSException(msg=msg)

    def get_network_allocations_number(self):
        """Return the number of network allocations needed (always 0)."""
        return 0

    def create_share(self, context, share, share_server=None):
        """Create a MapRFS volume which acts as a share."""
        metadata = self.api.get_share_metadata(context,
                                               {'id': share['share_id']})
        self._create_share(share, metadata, context)
        return self._get_share_export_locations(share,
                                                path=metadata.get('_path'))

    def ensure_share(self, context, share, share_server=None):
        """Update the export location if it has changed.

        :returns: the new export locations when the volume's mount dir no
            longer matches the first stored export location, otherwise
            ``None``.
        :raises ShareResourceNotFound: if the backing volume does not exist.
        """
        volume_name = self._get_volume_name(context, share)
        if self._maprfs_util.volume_exists(volume_name):
            info = self._maprfs_util.get_volume_info(volume_name)
            path = info['mountdir']
            old_location = share['export_locations'][0]
            new_location = self._get_share_export_locations(
                share, path=path)
            if new_location[0]['path'] != old_location['path']:
                return new_location
        else:
            raise exception.ShareResourceNotFound(share_id=share['share_id'])

    def create_share_from_snapshot(self, context, share, snapshot,
                                   share_server=None, parent_share=None):
        """Creates a share from snapshot.

        A new volume is created for the share and, if the snapshot
        directory is not empty, its content is copied into the new share
        directory. ``share_server`` and ``parent_share`` are not used.

        :raises MapRFSException: if the ``_tenantuser`` metadata of the
            source share is set and differs from the new share's, or if a
            backend command fails.
        """
        metadata = self.api.get_share_metadata(context,
                                               {'id': share['share_id']})
        sn_share_tenant = self.api.get_share_metadata(context, {
            'id': snapshot['share_instance']['share_id']}).get('_tenantuser')
        if sn_share_tenant and sn_share_tenant != metadata.get('_tenantuser'):
            msg = (
                _('Cannot create share from snapshot %(snapshot_name)s '
                  'with name %(share_name)s. Error: Tenant user should not '
                  'differ from tenant of the source snapshot.') %
                {'snapshot_name': snapshot['name'],
                 'share_name': share['name']})
            LOG.error(msg)
            raise exception.MapRFSException(msg=msg)
        share_dir = metadata.get('_path', self._share_dir(share['name']))
        snapshot_path = self._get_snapshot_path(snapshot)
        self._create_share(share, metadata, context)

        try:
            if self._maprfs_util.dir_not_empty(snapshot_path):
                self._maprfs_util.maprfs_cp(snapshot_path + '/*', share_dir)
        except exception.ProcessExecutionError:
            msg = (
                _('Failed to create share from snapshot %(snapshot_name)s '
                  'with name %(share_name)s.') % {
                    'snapshot_name': snapshot['name'],
                    'share_name': share['name']})
            LOG.exception(msg)
            raise exception.MapRFSException(msg=msg)
        return self._get_share_export_locations(share,
                                                path=metadata.get('_path'))

    def create_snapshot(self, context, snapshot, share_server=None):
        """Creates a snapshot.

        :returns: dict with ``provider_location`` set to the snapshot name.
        :raises MapRFSException: if the backend command fails.
        """
        volume_name = self._get_volume_name(context, snapshot['share'])
        snapshot_name = snapshot['name']
        try:
            self._maprfs_util.create_snapshot(snapshot_name, volume_name)
            return {'provider_location': snapshot_name}
        except exception.ProcessExecutionError:
            msg = (
                _('Failed to create snapshot %(snapshot_name)s for the share '
                  '%(share_name)s.') % {'snapshot_name': snapshot_name,
                                        'share_name': snapshot['share_name']})
            LOG.exception(msg)
            raise exception.MapRFSException(msg=msg)

    def delete_share(self, context, share, share_server=None):
        """Deletes share storage.

        Nothing is deleted when the volume name resolves to ``"error"``,
        the marker written to the share metadata when volume creation
        failed.

        :raises MapRFSException: if the backend command fails.
        """
        volume_name = self._get_volume_name(context, share)
        if volume_name == "error":
            LOG.info("Skipping deleting share with name %s, as it does not"
                     " exist on the backend", share['name'])
            return
        try:
            self._maprfs_util.delete_volume(volume_name)
        except exception.ProcessExecutionError:
            msg = (_('Failed to delete share %(share_name)s.') %
                   {'share_name': share['name']})
            LOG.exception(msg)
            raise exception.MapRFSException(msg=msg)

    def delete_snapshot(self, context, snapshot, share_server=None):
        """Deletes a snapshot.

        :raises MapRFSException: if the backend command fails.
        """
        snapshot_name = snapshot['provider_location'] or snapshot['name']
        volume_name = self._get_volume_name(context, snapshot['share'])
        try:
            self._maprfs_util.delete_snapshot(snapshot_name, volume_name)
        except exception.ProcessExecutionError:
            msg = (_('Failed to delete snapshot %(snapshot_name)s.') %
                   {'snapshot_name': snapshot['name']})
            LOG.exception(msg)
            raise exception.MapRFSException(msg=msg)

    def update_access(self, context, share, access_rules, add_rules,
                      delete_rules, share_server=None):
        """Update access rules for given share.

        When ``add_rules`` or ``delete_rules`` is non-empty only those
        deltas are applied; otherwise the full ``access_rules`` list is set
        on the volume.

        :raises InvalidShareAccess: if any rule in ``access_rules`` has an
            access type other than ``user``.
        :raises MapRFSException: if the backend command fails.
        """
        for access in access_rules:
            if access['access_type'].lower() != 'user':
                msg = _("Only 'user' access type allowed!")
                LOG.error(msg)
                raise exception.InvalidShareAccess(reason=msg)
        volume_name = self._get_volume_name(context, share)
        try:
            # 'update_access' is called before share is removed, so this
            #  method shouldn`t raise exception if share does
            #  not exist actually
            if not self._maprfs_util.volume_exists(volume_name):
                LOG.warning('Can not get share %s.', share['name'])
                return
            # check update
            if add_rules or delete_rules:
                self._maprfs_util.remove_volume_ace_rules(volume_name,
                                                          delete_rules)
                self._maprfs_util.add_volume_ace_rules(volume_name, add_rules)
            else:
                self._maprfs_util.set_volume_ace(volume_name, access_rules)
        except exception.ProcessExecutionError:
            msg = (_('Failed to update access for share %(name)s.') %
                   {'name': share['name']})
            LOG.exception(msg)
            raise exception.MapRFSException(msg=msg)

    def extend_share(self, share, new_size, share_server=None):
        """Extend share storage."""
        self._set_share_size(share, new_size)

    def shrink_share(self, share, new_size, share_server=None):
        """Shrink share storage."""
        self._set_share_size(share, new_size)

    def _check_maprfs_state(self):
        """Return the result of the helper's state check.

        :raises MapRFSException: if the backend command fails.
        """
        try:
            return self._maprfs_util.check_state()
        except exception.ProcessExecutionError:
            msg = _('Failed to check MapRFS state.')
            LOG.exception(msg)
            raise exception.MapRFSException(msg=msg)

    def check_for_setup_error(self):
        """Return an error if the prerequisites are not met.

        :raises MapRFSException: if ``maprfs_clinode_ip`` is not set, the
            state check reports a falsy result, or the configured base
            volume directory cannot be listed.
        """
        if not self.configuration.maprfs_clinode_ip:
            msg = _(
                'MapR cluster has not been specified in the configuration. '
                'Add the ip or list of ip of nodes with mapr-core installed '
                'in the "maprfs_clinode_ip" configuration parameter.')
            LOG.error(msg)
            raise exception.MapRFSException(msg=msg)

        # Missing CLDB / ZooKeeper lists are only warned about, not fatal.
        if not self.configuration.maprfs_cldb_ip:
            LOG.warning('CLDB nodes are not specified!')

        if not self.configuration.maprfs_zookeeper_ip:
            LOG.warning('Zookeeper nodes are not specified!')

        if not self._check_maprfs_state():
            msg = _('MapR-FS is not in healthy state.')
            LOG.error(msg)
            raise exception.MapRFSException(msg=msg)
        try:
            self._maprfs_util.maprfs_ls(
                os.path.join(self._base_volume_dir, ''))
        except exception.ProcessExecutionError:
            # Name the option that actually exists (maprfs_base_volume_dir)
            # so operators can find it in their configuration.
            msg = _('Invalid "maprfs_base_volume_dir". No such directory.')
            LOG.exception(msg)
            raise exception.MapRFSException(msg=msg)

    def manage_existing(self, share, driver_options):
        """Bring an existing MapR-FS volume under Manila management.

        The volume is looked up by the path taken from the share's export
        location. Depending on the ``rename`` driver option (falling back
        to ``maprfs_rename_managed_volume``) the volume is either renamed
        to the share name or its current name is stored in the ``_name``
        share metadata.

        :returns: dict with ``size`` and ``export_locations``. When the
            computed quota size is 0 the used space is reported as size.
        :raises ManageInvalidShare: if no volume is found for the path.
        :raises MapRFSException: on ValueError, KeyError or a failed
            backend command.
        """
        try:
            # retrieve share path from export location, maprfs:// prefix and
            # metadata (-C -Z -N) should be casted away
            share_path = share['export_location'].split(
            )[0][len(self._maprfs_base_path):]
            info = self._maprfs_util.get_volume_info_by_path(
                share_path, check_if_exists=True)
            if not info:
                msg = _("Share %s not found") % share[
                    'export_location']
                LOG.error(msg)
                raise exception.ManageInvalidShare(reason=msg)
            size = math.ceil(float(info['quota']) / units.Ki)
            used = math.ceil(float(info['totalused']) / units.Ki)
            volume_name = info['volumename']
            should_rename = self.rename_volume
            rename_option = driver_options.get('rename')
            if rename_option:
                should_rename = strutils.bool_from_string(rename_option)
            if should_rename:
                self._maprfs_util.rename_volume(volume_name, share['name'])
            else:
                self.api.update_share_metadata(context.get_admin_context(),
                                               {'id': share['share_id']},
                                               {'_name': volume_name})
            location = self._get_share_export_locations(share, path=share_path)
            if size == 0:
                size = used
                msg = (
                    'Share %s has no size quota. Total used value will be'
                    ' used as share size')
                LOG.warning(msg, share['name'])
            return {'size': size, 'export_locations': location}
        except (ValueError, KeyError, exception.ProcessExecutionError):
            msg = _('Failed to manage share.')
            LOG.exception(msg)
            raise exception.MapRFSException(msg=msg)

    def manage_existing_snapshot(self, snapshot, driver_options):
        """Bring an existing volume snapshot under Manila management.

        ``driver_options`` is not used.

        :returns: dict with ``size`` computed from the helper's ``du``
            result for the snapshot path, divided by ``units.Gi`` and
            rounded up.
        :raises ManageInvalidShareSnapshot: if ``provider_location`` is not
            among the snapshots of the share's volume.
        :raises MapRFSException: if a backend command fails.
        """
        volume_name = self._get_volume_name(context.get_admin_context(),
                                            snapshot['share'])
        snapshot_path = self._get_snapshot_path(snapshot)
        try:
            snapshot_list = self._maprfs_util.get_snapshot_list(
                volume_name=volume_name)
            snapshot_name = snapshot['provider_location']
            if snapshot_name not in snapshot_list:
                msg = _("Snapshot %s not found") % snapshot_name
                LOG.error(msg)
                raise exception.ManageInvalidShareSnapshot(reason=msg)
            size = math.ceil(float(self._maprfs_util.maprfs_du(
                snapshot_path)) / units.Gi)
            return {'size': size}
        except exception.ProcessExecutionError:
            msg = _("Manage existing share snapshot failed.")
            LOG.exception(msg)
            raise exception.MapRFSException(msg=msg)

    def _update_share_stats(self):
        """Retrieves stats info of share directories group.

        :raises MapRFSException: if the capacity query fails.
        """
        try:
            total, free = self._maprfs_util.fs_capacity()
        except exception.ProcessExecutionError:
            msg = _('Failed to check MapRFS capacity info.')
            LOG.exception(msg)
            raise exception.MapRFSException(msg=msg)
        # Total is rounded up and free is rounded down after dividing by
        # units.Gi.
        total_capacity_gb = int(math.ceil(float(total) / units.Gi))
        free_capacity_gb = int(math.floor(float(free) / units.Gi))
        data = {
            'share_backend_name': self.backend_name,
            'storage_protocol': 'MAPRFS',
            'driver_handles_share_servers': self.driver_handles_share_servers,
            'vendor_name': 'MapR Technologies',
            'driver_version': '1.0',
            'total_capacity_gb': total_capacity_gb,
            'free_capacity_gb': free_capacity_gb,
            'snapshot_support': True,
            'create_share_from_snapshot_support': True,
        }

        super(MapRFSNativeShareDriver, self)._update_share_stats(data)
|