NexentaStor 5 NFS backend driver.

This patch implements a driver for NexentaStor 5 NFS backend.

DocImpact
Change-Id: I424970b20925bf3d2eec8d6e8f633d79e07a93cb
Implements: blueprint nexentastor-5-cinder-nfs-driver
This commit is contained in:
Alexey Khodos 2015-06-10 20:29:07 +03:00
parent 75401951df
commit b022418c4a
6 changed files with 741 additions and 0 deletions

View File

@ -0,0 +1,174 @@
# Copyright 2016 Nexenta Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for OpenStack Cinder volume driver
"""
import mock
from mock import patch
from cinder import context
from cinder import db
from cinder import test
from cinder.volume import configuration as conf
from cinder.volume.drivers.nexenta.ns5 import jsonrpc
from cinder.volume.drivers.nexenta.ns5 import nfs
class TestNexentaNfsDriver(test.TestCase):
TEST_SHARE = 'host1:/pool/share'
TEST_SHARE2_OPTIONS = '-o intr'
TEST_FILE_NAME = 'test.txt'
TEST_SHARES_CONFIG_FILE = '/etc/cinder/nexenta-shares.conf'
TEST_SNAPSHOT_NAME = 'snapshot1'
TEST_VOLUME_NAME = 'volume1'
TEST_VOLUME_NAME2 = 'volume2'
TEST_VOLUME = {
'name': TEST_VOLUME_NAME,
'id': '1',
'size': 1,
'status': 'available',
'provider_location': TEST_SHARE
}
TEST_VOLUME2 = {
'name': TEST_VOLUME_NAME2,
'size': 1,
'id': '2',
'status': 'in-use'
}
TEST_SNAPSHOT = {
'name': TEST_SNAPSHOT_NAME,
'volume_name': TEST_VOLUME_NAME,
'volume_id': '1'
}
TEST_SHARE_SVC = 'svc:/network/nfs/server:default'
def setUp(self):
super(TestNexentaNfsDriver, self).setUp()
self.ctxt = context.get_admin_context()
self.cfg = mock.Mock(spec=conf.Configuration)
self.cfg.nexenta_dataset_description = ''
self.cfg.nexenta_mount_point_base = '$state_path/mnt'
self.cfg.nexenta_sparsed_volumes = True
self.cfg.nexenta_dataset_compression = 'on'
self.cfg.nexenta_dataset_dedup = 'off'
self.cfg.nfs_mount_point_base = '/mnt/test'
self.cfg.nfs_mount_attempts = 3
self.cfg.nas_mount_options = 'vers=4'
self.cfg.nfs_used_ratio = 0.5
self.cfg.reserved_percentage = 20
self.cfg.nfs_oversub_ratio = 1.0
self.cfg.nexenta_rest_protocol = 'http'
self.cfg.nexenta_rest_port = 8080
self.cfg.nexenta_user = 'user'
self.cfg.nexenta_password = 'pass'
self.cfg.max_over_subscription_ratio = 20.0
self.cfg.nas_ip = '1.1.1.1'
self.cfg.nas_share_path = 'pool/share'
self.nef_mock = mock.Mock()
self.stubs.Set(jsonrpc, 'NexentaJSONProxy',
lambda *_, **__: self.nef_mock)
self.drv = nfs.NexentaNfsDriver(configuration=self.cfg)
self.drv.db = db
self.drv.do_setup(self.ctxt)
def _create_volume_db_entry(self):
vol = {
'id': '1',
'size': 1,
'status': 'available',
'provider_location': self.TEST_SHARE
}
return db.volume_create(self.ctxt, vol)['id']
def test_check_for_setup_error(self):
self.nef_mock.get.return_value = {'data': []}
self.assertRaises(
LookupError, lambda: self.drv.check_for_setup_error())
def test_initialize_connection(self):
data = {
'export': self.TEST_VOLUME['provider_location'], 'name': 'volume'}
self.assertEqual({
'driver_volume_type': self.drv.driver_volume_type,
'data': data
}, self.drv.initialize_connection(self.TEST_VOLUME, None))
@patch('cinder.volume.drivers.nexenta.ns5.nfs.'
'NexentaNfsDriver._create_regular_file')
@patch('cinder.volume.drivers.nexenta.ns5.nfs.'
'NexentaNfsDriver._create_sparsed_file')
@patch('cinder.volume.drivers.nexenta.ns5.nfs.'
'NexentaNfsDriver._ensure_share_mounted')
@patch('cinder.volume.drivers.nexenta.ns5.nfs.'
'NexentaNfsDriver._share_folder')
def test_do_create_volume(self, share, ensure, sparsed, regular):
ensure.return_value = True
share.return_value = True
self.nef_mock.get.return_value = 'on'
self.drv._do_create_volume(self.TEST_VOLUME)
url = 'storage/pools/pool/filesystems'
data = {
'name': 'share/volume1',
'compressionMode': 'on',
'dedupMode': 'off',
}
self.nef_mock.post.assert_called_with(url, data)
@patch('cinder.volume.drivers.nexenta.ns5.nfs.'
'NexentaNfsDriver._ensure_share_mounted')
def test_delete_volume(self, ensure):
self._create_volume_db_entry()
self.nef_mock.get.return_value = {}
self.drv.delete_volume(self.TEST_VOLUME)
self.nef_mock.delete.assert_called_with(
'storage/pools/pool/filesystems/share%2Fvolume1?snapshots=true')
def test_create_snapshot(self):
self._create_volume_db_entry()
self.drv.create_snapshot(self.TEST_SNAPSHOT)
url = 'storage/pools/pool/filesystems/share%2Fvolume-1/snapshots'
data = {'name': self.TEST_SNAPSHOT['name']}
self.nef_mock.post.assert_called_with(url, data)
def test_delete_snapshot(self):
self._create_volume_db_entry()
self.drv.delete_snapshot(self.TEST_SNAPSHOT)
url = ('storage/pools/pool/filesystems/share%2Fvolume-1/'
'snapshots/snapshot1')
self.drv.delete_snapshot(self.TEST_SNAPSHOT)
self.nef_mock.delete.assert_called_with(url)
@patch('cinder.volume.drivers.nexenta.ns5.nfs.'
'NexentaNfsDriver._share_folder')
def test_create_volume_from_snapshot(self, share):
self._create_volume_db_entry()
url = ('storage/filesystems/pool%2Fshare%2Fvolume2/promote')
self.drv.create_volume_from_snapshot(
self.TEST_VOLUME2, self.TEST_SNAPSHOT)
self.nef_mock.post.assert_called_with(url)
def test_get_capacity_info(self):
self.nef_mock.get.return_value = {
'bytesAvailable': 1000,
'bytesUsed': 100}
self.assertEqual(
(1000, 900, 100), self.drv._get_capacity_info('pool/share'))

View File

@ -0,0 +1,105 @@
# Copyright 2011 Nexenta Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
:mod:`nexenta.jsonrpc` -- Nexenta-specific JSON RPC client
=====================================================================
.. automodule:: nexenta.jsonrpc
"""
import time
from oslo_log import log as logging
from oslo_serialization import jsonutils
import requests
from cinder import exception
LOG = logging.getLogger(__name__)
class NexentaJSONProxy(object):
def __init__(self, scheme, host, port, user,
password, auto=False, method=None):
self.scheme = scheme
self.host = host
self.port = port
self.user = user
self.password = password
self.auto = True
self.method = method
@property
def url(self):
return '%s://%s:%s/' % (self.scheme, self.host, self.port)
def __getattr__(self, method=None):
if method:
return NexentaJSONProxy(
self.scheme, self.host, self.port,
self.user, self.password, self.auto, method)
def __hash__(self):
return self.url.__hash__()
def __repr__(self):
return 'NEF proxy: %s' % self.url
def __call__(self, path, data=None):
auth = ('%s:%s' % (self.user, self.password)).encode('base64')[:-1]
headers = {
'Content-Type': 'application/json',
'Authorization': 'Basic %s' % auth
}
url = self.url + path
if data:
data = jsonutils.dumps(data)
LOG.debug('Sending JSON to url: %s, data: %s, method: %s',
path, data, self.method)
if self.method == 'get':
resp = requests.get(url, headers=headers)
if self.method == 'post':
resp = requests.post(url, data=data, headers=headers)
if self.method == 'put':
resp = requests.put(url, data=data, headers=headers)
if self.method == 'delete':
resp = requests.delete(url, data=data, headers=headers)
if resp.status_code == 201 or (
resp.status_code == 200 and not resp.content):
LOG.debug('Got response: Success')
return 'Success'
response = resp.json()
resp.close()
if response and resp.status_code == 202:
url = self.url + response['links'][0]['href']
while resp.status_code == 202:
time.sleep(1)
resp = requests.get(url)
if resp.status_code == 201 or (
resp.status_code == 200 and not resp.content):
LOG.debug('Got response: Success')
return 'Success'
else:
response = resp.json()
resp.close()
if response.get('code'):
raise exception.NexentaException(response)
LOG.debug('Got response: %s', response)
return response

View File

@ -0,0 +1,457 @@
# Copyright 2016 Nexenta Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
:mod:`nexenta.nfs` -- Driver to store volumes on NexentaStor Appliance.
=======================================================================
.. automodule:: nexenta.nfs
"""
import hashlib
import os
from oslo_log import log as logging
from cinder import context
from cinder import db
from cinder import exception
from cinder.i18n import _, _LE, _LI, _LW
from cinder.volume.drivers.nexenta.ns5 import jsonrpc
from cinder.volume.drivers.nexenta import options
from cinder.volume.drivers.nexenta import utils
from cinder.volume.drivers import nfs
VERSION = '1.0.0'
LOG = logging.getLogger(__name__)
class NexentaNfsDriver(nfs.NfsDriver): # pylint: disable=R0921
"""Executes volume driver commands on Nexenta Appliance.
Version history:
1.0.0 - Initial driver version.
"""
driver_prefix = 'nexenta'
volume_backend_name = 'NexentaNfsDriver'
VERSION = VERSION
def __init__(self, *args, **kwargs):
super(NexentaNfsDriver, self).__init__(*args, **kwargs)
if self.configuration:
self.configuration.append_config_values(
options.NEXENTA_CONNECTION_OPTS)
self.configuration.append_config_values(
options.NEXENTA_NFS_OPTS)
self.configuration.append_config_values(
options.NEXENTA_DATASET_OPTS)
self.nfs_mount_point_base = self.configuration.nexenta_mount_point_base
self.dataset_compression = (
self.configuration.nexenta_dataset_compression)
self.dataset_deduplication = self.configuration.nexenta_dataset_dedup
self.dataset_description = (
self.configuration.nexenta_dataset_description)
self.sparsed_volumes = self.configuration.nexenta_sparsed_volumes
self.nef = None
self.nef_protocol = self.configuration.nexenta_rest_protocol
self.nef_host = self.configuration.nas_ip
self.share = self.configuration.nas_share_path
self.nef_port = self.configuration.nexenta_rest_port
self.nef_user = self.configuration.nexenta_user
self.nef_password = self.configuration.nexenta_password
@property
def backend_name(self):
backend_name = None
if self.configuration:
backend_name = self.configuration.safe_get('volume_backend_name')
if not backend_name:
backend_name = self.__class__.__name__
return backend_name
def do_setup(self, context):
if self.nef_protocol == 'auto':
protocol, auto = 'http', True
else:
protocol, auto = self.nef_protocol, False
self.nef = jsonrpc.NexentaJSONProxy(
protocol, self.nef_host, self.nef_port, self.nef_user,
self.nef_password, auto=auto)
def check_for_setup_error(self):
"""Verify that the volume for our folder exists.
:raise: :py:exc:`LookupError`
"""
pool_name, fs = self._get_share_datasets(self.share)
url = 'storage/pools/%s' % (pool_name)
if not self.nef.get(url):
raise LookupError(_("Pool %s does not exist in Nexenta "
"Store appliance") % pool_name)
url = 'storage/pools/%s/filesystems/%s' % (
pool_name, fs)
if not self.nef.get(url):
raise LookupError(_("filesystem %s does not exist in "
"Nexenta Store appliance") % fs)
path = '/'.join([pool_name, fs])
shared = False
response = self.nef.get('nas/nfs')
for share in response['data']:
if share.get('filesystem') == path:
shared = True
break
if not shared:
raise LookupError(_("Dataset %s is not shared in Nexenta "
"Store appliance") % path)
def initialize_connection(self, volume, connector):
"""Allow connection to connector and return connection info.
:param volume: volume reference
:param connector: connector reference
"""
data = {'export': volume['provider_location'], 'name': 'volume'}
if volume['provider_location'] in self.shares:
data['options'] = self.shares[volume['provider_location']]
return {
'driver_volume_type': self.driver_volume_type,
'data': data
}
def create_volume(self, volume):
"""Creates a volume.
:param volume: volume reference
:returns: provider_location update dict for database
"""
self._do_create_volume(volume)
return {'provider_location': volume['provider_location']}
def _do_create_volume(self, volume):
pool, fs = self._get_share_datasets(self.share)
filesystem = '%s/%s/%s' % (pool, fs, volume['name'])
LOG.debug('Creating filesystem on NexentaStor %s', filesystem)
url = 'storage/pools/%s/filesystems' % pool
data = {
'name': '/'.join([fs, volume['name']]),
'compressionMode': self.dataset_compression,
'dedupMode': self.dataset_deduplication,
}
self.nef.post(url, data)
volume['provider_location'] = '%s:/%s/%s' % (
self.nef_host, self.share, volume['name'])
try:
self._share_folder(fs, volume['name'])
self._ensure_share_mounted('%s:/%s/%s' % (
self.nef_host, self.share, volume['name']))
volume_size = volume['size']
if getattr(self.configuration,
self.driver_prefix + '_sparsed_volumes'):
self._create_sparsed_file(self.local_path(volume), volume_size)
else:
url = 'storage/pools/%s/filesystems/%s' % (
pool, '%2F'.join([fs, volume['name']]))
compression = self.nef.get(url).get('compressionMode')
if compression != 'off':
# Disable compression, because otherwise will not use space
# on disk.
self.nef.put(url, {'compressionMode': 'off'})
try:
self._create_regular_file(
self.local_path(volume), volume_size)
finally:
if compression != 'off':
# Backup default compression value if it was changed.
self.nef.put(url, {'compressionMode': compression})
except exception.NexentaException:
try:
url = 'storage/pools/%s/filesystems/%s/%s' % (
pool, '%2F'.join([fs, volume['name']]))
self.nef.delete(url)
except exception.NexentaException:
LOG.warning(_LW("Cannot destroy created folder: "
"%(vol)s/%(folder)s"),
{'vol': pool, 'folder': '/'.join(
[fs, volume['name']])})
raise
def delete_volume(self, volume):
"""Deletes a logical volume.
:param volume: volume reference
"""
pool, fs = self._get_share_datasets(self.share)
url = ('storage/pools/%(pool)s/filesystems/%(fs)s') % {
'pool': pool,
'fs': '%2F'.join([fs, volume['name']])
}
origin = self.nef.get(url).get('originalSnapshot')
url = ('storage/pools/%(pool)s/filesystems/'
'%(fs)s?snapshots=true') % {
'pool': pool,
'fs': '%2F'.join([fs, volume['name']])
}
try:
self.nef.delete(url)
except exception.NexentaException as exc:
if 'Failed to destroy snapshot' in exc.args[0]:
LOG.debug('Snapshot has dependent clones, skipping')
else:
raise
try:
if origin and self._is_clone_snapshot_name(origin):
path, snap = origin.split('@')
pool, fs = path.split('/', 1)
snap_url = ('storage/pools/%(pool)s/'
'filesystems/%(fs)s/snapshots/%(snap)s') % {
'pool': pool,
'fs': fs,
'snap': snap
}
self.nef.delete(snap_url)
except exception.NexentaException as exc:
if 'does not exist' in exc.args[0]:
LOG.debug(
'Volume %s does not exist on appliance', '/'.join(
[pool, fs]))
def create_snapshot(self, snapshot):
"""Creates a snapshot.
:param snapshot: snapshot reference
"""
volume = self._get_snapshot_volume(snapshot)
pool, fs = self._get_share_datasets(self.share)
url = 'storage/pools/%(pool)s/filesystems/%(fs)s/snapshots' % {
'pool': pool,
'fs': '%2F'.join([fs, volume['name']]),
}
data = {'name': snapshot['name']}
self.nef.post(url, data)
def delete_snapshot(self, snapshot):
"""Deletes a snapshot.
:param snapshot: snapshot reference
"""
volume = self._get_snapshot_volume(snapshot)
pool, fs = self._get_share_datasets(self.share)
url = ('storage/pools/%(pool)s/'
'filesystems/%(fs)s/snapshots/%(snap)s') % {
'pool': pool,
'fs': '%2F'.join([fs, volume['name']]),
'snap': snapshot['name']
}
try:
self.nef.delete(url)
except exception.NexentaException as exc:
if 'EBUSY' is exc:
LOG.warning(_LW(
'Could not delete snapshot %s - it has dependencies'),
snapshot['name'])
def create_volume_from_snapshot(self, volume, snapshot):
"""Create new volume from other's snapshot on appliance.
:param volume: reference of volume to be created
:param snapshot: reference of source snapshot
"""
snapshot_vol = self._get_snapshot_volume(snapshot)
volume['provider_location'] = snapshot_vol['provider_location']
pool, fs = self._get_share_datasets(self.share)
dataset_path = '%s/%s' % (pool, fs)
url = ('storage/pools/%(pool)s/'
'filesystems/%(fs)s/snapshots/%(snap)s/clone') % {
'pool': pool,
'fs': '%2F'.join([fs, snapshot_vol['name']]),
'snap': snapshot['name']
}
path = '/'.join([pool, fs, volume['name']])
data = {'targetPath': path}
self.nef.post(url, data)
path = '%2F'.join([pool, fs, volume['name']])
url = 'storage/filesystems/%s/promote' % path
self.nef.post(url)
try:
self._share_folder(fs, volume['name'])
except exception.NexentaException:
try:
url = ('storage/pools/%(pool)s/'
'filesystems/%(fs)s') % {
'pool': pool,
'fs': volume['name']
}
self.nef.delete(url)
except exception.NexentaException:
LOG.warning(_LW("Cannot destroy cloned filesystem: "
"%(vol)s/%(filesystem)s"),
{'vol': dataset_path,
'filesystem': volume['name']})
raise
return {'provider_location': volume['provider_location']}
def create_cloned_volume(self, volume, src_vref):
"""Creates a clone of the specified volume.
:param volume: new volume reference
:param src_vref: source volume reference
"""
LOG.info(_LI('Creating clone of volume: %s'), src_vref['id'])
snapshot = {'volume_name': src_vref['name'],
'volume_id': src_vref['id'],
'name': self._get_clone_snapshot_name(volume)}
self.create_snapshot(snapshot)
try:
return self.create_volume_from_snapshot(volume, snapshot)
except exception.NexentaException:
LOG.error(_LE('Volume creation failed, deleting created snapshot '
'%(volume_name)s@%(name)s'), snapshot)
try:
self.delete_snapshot(snapshot)
except (exception.NexentaException, exception.SnapshotIsBusy):
LOG.warning(_LW('Failed to delete zfs snapshot '
'%(volume_name)s@%(name)s'), snapshot)
raise
self.delete_snapshot(snapshot)
def local_path(self, volume):
"""Get volume path (mounted locally fs path) for given volume.
:param volume: volume reference
"""
nfs_share = volume['provider_location']
return os.path.join(self._get_mount_point_for_share(nfs_share),
'volume')
def _get_mount_point_for_share(self, nfs_share):
"""Returns path to mount point NFS share.
:param nfs_share: example 172.18.194.100:/var/nfs
"""
nfs_share = nfs_share.encode('utf-8')
return os.path.join(self.configuration.nexenta_mount_point_base,
hashlib.md5(nfs_share).hexdigest())
def _share_folder(self, path, filesystem):
"""Share NFS filesystem on NexentaStor Appliance.
:param nef: nef object
:param path: path to parent filesystem
:param filesystem: filesystem that needs to be shared
"""
pool = self.share.split('/')[0]
LOG.debug(
'Creating ACL for filesystem %s on Nexenta Store', filesystem)
url = 'storage/pools/%s/filesystems/%s/acl' % (
pool, '%2F'.join([path.replace('/', '%2F'), filesystem]))
data = {
"type": "allow",
"principal": "everyone@",
"permissions": [
"list_directory",
"read_data",
"add_file",
"write_data",
"add_subdirectory",
"append_data",
"read_xattr",
"write_xattr",
"execute",
"delete_child",
"read_attributes",
"write_attributes",
"delete",
"read_acl",
"write_acl",
"write_owner",
"synchronize"
],
"flags": [
"file_inherit",
"dir_inherit"
]
}
self.nef.post(url, data)
LOG.debug(
'Successfully shared filesystem %s', '/'.join(
[path, filesystem]))
def _get_capacity_info(self, path):
"""Calculate available space on the NFS share.
:param path: example pool/nfs
"""
pool, fs = self._get_share_datasets(path)
url = 'storage/pools/%s/filesystems/%s' % (
pool, fs)
data = self.nef.get(url)
total = utils.str2size(data['bytesAvailable'])
allocated = utils.str2size(data['bytesUsed'])
free = total - allocated
return total, free, allocated
def _get_snapshot_volume(self, snapshot):
ctxt = context.get_admin_context()
return db.volume_get(ctxt, snapshot['volume_id'])
def _get_share_datasets(self, nfs_share):
pool_name, fs = nfs_share.split('/', 1)
return pool_name, fs
def _get_clone_snapshot_name(self, volume):
"""Return name for snapshot that will be used to clone the volume."""
return 'cinder-clone-snapshot-%(id)s' % volume
def _is_clone_snapshot_name(self, snapshot):
"""Check if snapshot is created for cloning."""
name = snapshot.split('@')[-1]
return name.startswith('cinder-clone-snapshot-')
def _update_volume_stats(self):
"""Retrieve stats info for NexentaStor appliance."""
LOG.debug('Updating volume stats')
share = ':/'.join([self.nef_host, self.share])
total, free, allocated = self._get_capacity_info(self.share)
total_space = utils.str2gib_size(total)
free_space = utils.str2gib_size(free)
location_info = '%(driver)s:%(share)s' % {
'driver': self.__class__.__name__,
'share': share
}
self._stats = {
'vendor_name': 'Nexenta',
'dedup': self.dataset_deduplication,
'compression': self.dataset_compression,
'description': self.dataset_description,
'nef_url': self.nef_host,
'driver_version': self.VERSION,
'storage_protocol': 'NFS',
'total_capacity_gb': total_space,
'free_capacity_gb': free_space,
'reserved_percentage': self.configuration.reserved_percentage,
'QoS_support': False,
'location_info': location_info,
'volume_backend_name': self.backend_name,
'nfs_mount_point_base': self.nfs_mount_point_base
}

View File

@ -81,6 +81,9 @@ NEXENTA_ISCSI_OPTS = [
cfg.StrOpt('nexenta_target_group_prefix',
default='cinder/',
help='Prefix for iSCSI target groups on SA'),
cfg.StrOpt('nexenta_volume_group',
default='iscsi',
help='Volume group for ns5'),
]
NEXENTA_NFS_OPTS = [

View File

@ -0,0 +1,2 @@
features:
- Added backend driver for NexentaStor5 NFS storage.