EMC Isilon Manila driver

The Isilon Manila driver supports the following functionalities:
* Create/delete NFS/CIFS share
* Create/delete snapshot
* Allow/deny access to share
* Create share from snapshot

implements: blueprint emc-isilon-manila-driver

Change-Id: If5f064751120890afba0571bc2b407d141ba0323
This commit is contained in:
Shaun Edwards 2015-02-10 16:54:01 -08:00
parent 9bcb9ad76c
commit 9cc913ed53
11 changed files with 1641 additions and 0 deletions

View File

@ -51,6 +51,7 @@ import manila.service
import manila.share.api
import manila.share.driver
import manila.share.drivers.emc.driver
import manila.share.drivers.emc.plugins.isilon.isilon
import manila.share.drivers.generic
import manila.share.drivers.glusterfs
import manila.share.drivers.glusterfs_native
@ -111,6 +112,7 @@ _global_opt_lists = [
manila.share.driver.share_opts,
manila.share.driver.ssh_opts,
manila.share.drivers.emc.driver.EMC_NAS_OPTS,
manila.share.drivers.emc.plugins.isilon.isilon.ISILON_OPTS,
manila.share.drivers.generic.share_opts,
manila.share.drivers.glusterfs.GlusterfsManilaShare_opts,
manila.share.drivers.glusterfs_native.glusterfs_native_manila_share_opts,

View File

@ -0,0 +1,334 @@
# Copyright 2015 EMC Corporation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Isilon specific NAS backend plugin.
"""
import os
from oslo_config import cfg
from oslo_log import log
import six
from manila import exception
from manila.i18n import _, _LW
from manila.share.drivers.emc.plugins import base
from manila.share.drivers.emc.plugins.isilon import isilon_api
CONF = cfg.CONF
VERSION = "0.1.0"
ISILON_OPTS = [
cfg.StrOpt('isilon_share_root_dir', default='/ifs/manila-shares',
help='The path on Isilon where the manila shares will be '
'created.')]
CONF.register_opts(ISILON_OPTS)
LOG = log.getLogger(__name__)
class IsilonStorageConnection(base.StorageConnection):
"""Implements Isilon specific functionality for EMC Manila driver."""
def __init__(self, *args, **kwargs):
super(IsilonStorageConnection, self).__init__(*args, **kwargs)
self._server = None
self._port = None
self._username = None
self._password = None
self._server_url = None
self._connect_resp = None
self._root_dir = None
self._verify_ssl_cert = None
self._containers = {}
self._shares = {}
self._snapshots = {}
self._isilon_api = None
self._isilon_api_class = isilon_api.IsilonApi
self.driver_handles_share_servers = False
def _get_container_path(self, share):
"""Return path to a container."""
return os.path.join(self._root_dir, share['name'])
def create_share(self, emc_share_driver, context, share, share_server):
"""Is called to create share."""
if share['share_proto'] == 'NFS':
location = self._create_nfs_share(share)
elif share['share_proto'] == 'CIFS':
location = self._create_cifs_share(share)
else:
message = (_('Unsupported share protocol: %(proto)s.') %
{'proto': share['share_proto']})
LOG.error(message)
raise exception.InvalidShare(message=message)
return location
def create_share_from_snapshot(self, emc_share_driver, context, share,
snapshot, share_server):
"""Creates a share from the snapshot."""
# Create share at new location
location = self.create_share(
emc_share_driver, context, share, share_server)
# Clone snapshot to new location
fq_target_dir = self._get_container_path(share)
self._isilon_api.clone_snapshot(snapshot['name'], fq_target_dir)
return location
def _create_nfs_share(self, share):
"""Is called to create nfs share."""
container_path = self._get_container_path(share)
self._isilon_api.create_directory(container_path)
share_created = self._isilon_api.create_nfs_export(container_path)
if not share_created:
message = (
_('The requested NFS share "%(share)s" was not created.') %
{'share': share['name']})
LOG.error(message)
raise exception.ShareBackendException(message=message)
location = '{0}:{1}'.format(self._server, container_path)
return location
def _create_cifs_share(self, share):
"""Is called to create cifs share."""
# Create the directory
container_path = self._get_container_path(share)
self._isilon_api.create_directory(container_path)
self._isilon_api.create_smb_share(share['name'], container_path)
share_path = '\\\\{0}\\{1}'.format(self._server, share['name'])
return share_path
def create_snapshot(self, emc_share_driver, context,
snapshot, share_server):
"""Is called to create snapshot."""
snapshot_path = os.path.join(self._root_dir, snapshot['share_name'])
self._isilon_api.create_snapshot(snapshot['name'], snapshot_path)
def delete_share(self, emc_share_driver, context, share, share_server):
"""Is called to remove share."""
if share['share_proto'] == 'NFS':
self._delete_nfs_share(share)
elif share['share_proto'] == 'CIFS':
self._delete_cifs_share(share)
else:
message = (_('Unsupported share type: %(type)s.') %
{'type': share['share_proto']})
LOG.error(message)
raise exception.InvalidShare(message=message)
def _delete_nfs_share(self, share):
"""Is called to remove nfs share."""
share_id = self._isilon_api.lookup_nfs_export(
self._root_dir + '/' + share['name'])
if share_id is None:
lw = _LW('Attempted to delete NFS Share "%s", but the share does '
'not appear to exist.')
LOG.warn(lw, share['name'])
else:
# attempt to delete the share
export_deleted = self._isilon_api.delete_nfs_share(share_id)
if not export_deleted:
message = _('Error deleting NFS share: %s') % share['name']
LOG.error(message)
raise exception.ShareBackendException(message=message)
def _delete_cifs_share(self, share):
"""Is called to remove CIFS share."""
smb_share = self._isilon_api.lookup_smb_share(share['name'])
if smb_share is None:
lw = _LW('Attempted to delete CIFS Share "%s", but the share does '
'not appear to exist.')
LOG.warn(lw, share['name'])
else:
share_deleted = self._isilon_api.delete_smb_share(share['name'])
if not share_deleted:
message = _('Error deleting CIFS share: %s') % share['name']
LOG.error(message)
raise exception.ShareBackendException(message=message)
def delete_snapshot(self, emc_share_driver, context,
snapshot, share_server):
"""Is called to remove snapshot."""
self._isilon_api.delete_snapshot(snapshot['name'])
def ensure_share(self, emc_share_driver, context, share, share_server):
"""Invoked to ensure that share is exported."""
def allow_access(self, emc_share_driver, context, share,
access, share_server):
"""Allow access to the share."""
# TODO(sedwards): Look into supporting ro/rw access to shares
if access['access_type'] != 'ip':
message = _('Only ip access type allowed.')
LOG.error(message)
raise exception.ShareBackendException(message=message)
access_ip = access['access_to']
if share['share_proto'] == 'NFS':
export_path = self._get_container_path(share)
self._nfs_allow_access(access_ip, export_path)
elif share['share_proto'] == 'CIFS':
self._cifs_allow_access(access_ip, share)
else:
message = _(
'Unsupported share protocol: %s. Only "NFS" and '
'"CIFS" are currently supported share protocols.') % share[
'share_proto']
LOG.error(message)
raise exception.InvalidShare(message=message)
def _nfs_allow_access(self, access_ip, export_path):
"""Allow access to nfs share."""
share_id = self._isilon_api.lookup_nfs_export(export_path)
# Get current allowed clients
export = self._isilon_api.get_nfs_export(share_id)
current_clients = export['clients']
# Format of ips could be '10.0.0.2', or '10.0.0.2, 10.0.0.0/24'
ips = list()
ips.append(access_ip)
ips.extend(current_clients)
export_params = {"clients": ips}
url = '{0}/platform/1/protocols/nfs/exports/{1}'.format(
self._server_url, share_id)
resp = self._isilon_api.request('PUT', url, data=export_params)
resp.raise_for_status()
def _cifs_allow_access(self, ip, share):
"""Allow access to cifs share."""
allowed_ip = 'allow:' + ip
smb_share = self._isilon_api.lookup_smb_share(share['name'])
host_acl = smb_share['host_acl']
if allowed_ip not in host_acl:
host_acl.append(allowed_ip)
data = {'host_acl': host_acl}
url = ('{0}/platform/1/protocols/smb/shares/{1}'
.format(self._server_url, smb_share['name']))
r = self._isilon_api.request('PUT', url, data=data)
r.raise_for_status()
def deny_access(self, emc_share_driver, context, share,
access, share_server):
"""Deny access to the share."""
if access['access_type'] != 'ip':
return
denied_ip = access['access_to']
if share['share_proto'] == 'NFS':
self._nfs_deny_access(denied_ip, share)
elif share['share_proto'] == 'CIFS':
self._cifs_deny_access(denied_ip, share)
def _nfs_deny_access(self, denied_ip, share):
"""Deny access to nfs share."""
# Get list of currently allowed client ips
export_id = self._isilon_api.lookup_nfs_export(
self._get_container_path(share))
if export_id is None:
message = _('Share %s should have been created, but was not '
'found.') % share['name']
LOG.error(message)
raise exception.ShareBackendException(message=message)
clients = self._get_nfs_ip_access_list(export_id)
allowed_ips = set(clients)
if allowed_ips.__contains__(denied_ip):
allowed_ips.remove(denied_ip)
data = {"clients": list(allowed_ips)}
url = ('{0}/platform/1/protocols/nfs/exports/{1}'
.format(self._server_url, six.text_type(export_id)))
r = self._isilon_api.request('PUT', url, data=data)
r.raise_for_status()
def _get_nfs_ip_access_list(self, export_id):
export = self._isilon_api.get_nfs_export(export_id)
if export is None:
message = _('NFS share with export id %d should have been '
'created, but was not found.') % export_id
LOG.error(message)
raise exception.ShareBackendException(message=message)
return export["clients"]
def _cifs_deny_access(self, denied_ip, share):
"""Deny access to cifs share."""
share_json = self._isilon_api.lookup_smb_share(share['name'])
host_acl_list = share_json['host_acl']
allow_ip = 'allow:' + denied_ip
if allow_ip in host_acl_list:
host_acl_list.remove(allow_ip)
share_params = {"host_acl": host_acl_list}
url = ('{0}/platform/1/protocols/smb/shares/{1}'
.format(self._server_url, share['name']))
resp = self._isilon_api.request('PUT', url, data=share_params)
resp.raise_for_status()
def connect(self, emc_share_driver, context):
"""Connect to an Isilon cluster."""
self._server = emc_share_driver.configuration.safe_get(
"emc_nas_server")
self._port = (
int(emc_share_driver.configuration.safe_get("emc_nas_server_port"))
)
self._server_url = ('https://' + self._server + ':' +
six.text_type(self._port))
self._username = emc_share_driver.configuration.safe_get(
"emc_nas_login")
self._password = emc_share_driver.configuration.safe_get(
"emc_nas_password")
self._root_dir = emc_share_driver.configuration.safe_get(
"isilon_share_root_dir")
# TODO(Shaun Edwards): make verify ssl a config variable?
self._verify_ssl_cert = False
self._isilon_api = self._isilon_api_class(self._server_url, auth=(
self._username, self._password),
verify_ssl_cert=self._verify_ssl_cert)
if not self._isilon_api.is_path_existent(self._root_dir):
self._isilon_api.create_directory(self._root_dir, recursive=True)
def update_share_stats(self, stats_dict):
"""TODO."""
# TODO(Shaun Edwards): query capacity, set storage_protocol,
# QoS support?
stats_dict['driver_version'] = VERSION
def get_network_allocations_number(self):
"""Returns number of network allocations for creating VIFs."""
# TODO(Shaun Edwards)
return 0
def setup_server(self, network_info, metadata=None):
"""Set up and configures share server with given network parameters."""
# TODO(Shaun Edwards): Look into supporting share servers
def teardown_server(self, server_details,
security_services=None):
"""Teardown share server."""
# TODO(Shaun Edwards): Look into supporting share servers

View File

@ -0,0 +1,216 @@
# Copyright (c) 2015 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from oslo_serialization import jsonutils
import requests
import six
LOG = log.getLogger(__name__)
class IsilonApi(object):
def __init__(self, api_url, auth, verify_ssl_cert=True):
self.host_url = api_url
self.session = requests.session()
self.session.auth = auth
self.verify_ssl_cert = verify_ssl_cert
def create_directory(self, container_path, recursive=False):
"""Create a directory."""
headers = {"x-isi-ifs-target-type": "container"}
url = (self.host_url + "/namespace" + container_path + '?recursive='
+ six.text_type(recursive))
r = self.request('PUT', url,
headers=headers)
return r.status_code == 200
def clone_snapshot(self, snapshot_name, fq_target_dir):
self.create_directory(fq_target_dir)
snapshot = self.get_snapshot(snapshot_name)
snapshot_path = snapshot['path']
# remove /ifs from start of path
relative_snapshot_path = snapshot_path[4:]
fq_snapshot_path = ('/ifs/.snapshot/' + snapshot_name +
relative_snapshot_path)
self._clone_directory_contents(fq_snapshot_path, fq_target_dir,
snapshot_name, relative_snapshot_path)
def _clone_directory_contents(self, fq_source_dir, fq_target_dir,
snapshot_name, relative_path):
dir_listing = self.get_directory_listing(fq_source_dir)
for item in dir_listing['children']:
name = item['name']
source_item_path = fq_source_dir + '/' + name
new_relative_path = relative_path + '/' + name
dest_item_path = fq_target_dir + '/' + name
if item['type'] == 'container':
# create the container name in the target dir & clone dir
self.create_directory(dest_item_path)
self._clone_directory_contents(source_item_path,
dest_item_path,
snapshot_name,
new_relative_path)
elif item['type'] == 'object':
self.clone_file_from_snapshot('/ifs' + new_relative_path,
dest_item_path, snapshot_name)
def clone_file_from_snapshot(self, fq_file_path, fq_dest_path,
snapshot_name):
headers = {'x-isi-ifs-copy-source': '/namespace' + fq_file_path}
snapshot_suffix = '&snapshot=' + snapshot_name
url = (self.host_url + '/namespace' + fq_dest_path + '?clone=true' +
snapshot_suffix)
self.request('PUT', url, headers=headers)
def get_directory_listing(self, fq_dir_path):
url = self.host_url + '/namespace' + fq_dir_path + '?detail=default'
r = self.request('GET', url)
r.raise_for_status()
return r.json()
def is_path_existent(self, resource_path):
url = self.host_url + '/namespace' + resource_path
r = self.request('HEAD', url)
if r.status_code == 200:
return True
elif r.status_code == 404:
return False
else:
r.raise_for_status()
def get_snapshot(self, snapshot_name):
r = self.request('GET',
self.host_url + '/platform/1/snapshot/snapshots/' +
snapshot_name)
snapshot_json = r.json()
if r.status_code == 200:
return snapshot_json['snapshots'][0]
elif r.status_code == 404:
return None
else:
r.raise_for_status()
def get_snapshots(self):
r = self.request('GET',
self.host_url + '/platform/1/snapshot/snapshots')
if r.status_code == 200:
return r.json()
else:
r.raise_for_status()
def lookup_nfs_export(self, share_path):
response = self.session.get(
self.host_url + '/platform/1/protocols/nfs/exports',
verify=self.verify_ssl_cert)
nfs_exports_json = response.json()
for export in nfs_exports_json['exports']:
for path in export['paths']:
if path == share_path:
return export['id']
return None
def get_nfs_export(self, export_id):
response = self.request('GET',
self.host_url +
'/platform/1/protocols/nfs/exports/' +
six.text_type(export_id))
if response.status_code == 200:
return response.json()['exports'][0]
else:
return None
def lookup_smb_share(self, share_name):
response = self.session.get(
self.host_url + '/platform/1/protocols/smb/shares/' + share_name)
if response.status_code == 200:
return response.json()['shares'][0]
else:
return None
def create_nfs_export(self, export_path):
"""Creates an NFS export using the Platform API.
:param export_path: a string specifying the desired export path
:return: "True" if created successfully; "False" otherwise
"""
data = {'paths': [export_path]}
url = self.host_url + '/platform/1/protocols/nfs/exports'
response = self.request('POST', url, data=data)
return response.status_code == 201
def create_smb_share(self, share_name, share_path):
"""Creates an SMB/CIFS share.
:param share_name: the name of the CIFS share
:param share_path: the path associated with the CIFS share
:return: "True" if the share created successfully; returns "False"
otherwise
"""
data = {}
data['name'] = share_name
data['path'] = share_path
url = self.host_url + '/platform/1/protocols/smb/shares'
response = self.request('POST', url, data=data)
return response.status_code == 201
def create_snapshot(self, snapshot_name, snapshot_path):
"""Creates a snapshot."""
data = {'name': snapshot_name, 'path': snapshot_path}
r = self.request('POST',
self.host_url + '/platform/1/snapshot/snapshots',
data=data)
if r.status_code == 201:
return True
else:
r.raise_for_status()
def delete(self, fq_resource_path, recursive=False):
"""Deletes a file or folder."""
r = self.request('DELETE',
self.host_url + '/namespace' + fq_resource_path +
'?recursive=' + six.text_type(recursive))
r.raise_for_status()
def delete_nfs_share(self, share_number):
response = self.session.delete(
self.host_url + '/platform/1/protocols/nfs/exports' + '/' +
six.text_type(share_number))
return response.status_code == 204
def delete_smb_share(self, share_name):
url = self.host_url + '/platform/1/protocols/smb/shares/' + share_name
response = self.request('DELETE', url)
return response.status_code == 204
def delete_snapshot(self, snapshot_name):
response = self.request(
'DELETE', '{0}/platform/1/snapshot/snapshots/{1}'
.format(self.host_url, snapshot_name))
response.raise_for_status()
def request(self, method, url, headers=None, data=None):
if data is not None:
data = jsonutils.dumps(data)
r = self.session.request(method, url, headers=headers, data=data,
verify=self.verify_ssl_cert)
return r

View File

@ -0,0 +1,574 @@
# Copyright (c) 2015 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import mock
from oslo_log import log
from manila import exception
from manila.share.drivers.emc.plugins.isilon import isilon
from manila import test
LOG = log.getLogger(__name__)
class IsilonTest(test.TestCase):
"""Integration test for the Isilon Manila driver."""
ISILON_ADDR = '10.0.0.1'
API_URL = 'https://%s:8080' % ISILON_ADDR
AUTH = ('admin', 'admin')
ROOT_DIR = '/ifs/manila-test'
SHARE_NAME = 'share-foo'
SHARE_DIR = ROOT_DIR + '/' + SHARE_NAME
ADMIN_HOME_DIR = '/ifs/home/admin'
CLONE_DIR = ROOT_DIR + '/clone-dir'
class MockConfig(object):
def safe_get(self, value):
if value == 'emc_nas_server':
return '10.0.0.1'
elif value == 'emc_nas_server_port':
return '8080'
elif value == 'emc_nas_login':
return 'admin'
elif value == 'emc_nas_password':
return 'a'
elif value == 'isilon_share_root_dir':
return '/ifs/manila-test'
else:
return None
@mock.patch(
'manila.share.drivers.emc.plugins.isilon.isilon.isilon_api.IsilonApi',
autospec=True)
def setUp(self, mock_isi_api):
super(IsilonTest, self).setUp()
self._mock_isilon_api = mock_isi_api.return_value
self.storage_connection = isilon.IsilonStorageConnection(LOG)
self.mock_context = mock.Mock('Context')
self.mock_emc_driver = mock.Mock('EmcDriver')
self.mock_emc_driver.attach_mock(self.MockConfig(), 'configuration')
self.storage_connection.connect(
self.mock_emc_driver, self.mock_context)
def test_allow_access_single_ip_nfs(self):
# setup
share = {'name': self.SHARE_NAME, 'share_proto': 'NFS'}
access = {'access_type': 'ip', 'access_to': '10.1.1.10'}
share_server = None
fake_export_id = 1
self._mock_isilon_api.lookup_nfs_export.return_value = fake_export_id
self._mock_isilon_api.get_nfs_export.return_value = {'clients': []}
self.assertFalse(self._mock_isilon_api.request.called)
# call method under test
self.storage_connection.allow_access(self.mock_emc_driver,
self.mock_context, share, access,
share_server)
# verify expected REST API call is executed
expected_url = (self.API_URL + '/platform/1/protocols/nfs/exports/' +
str(fake_export_id))
expected_data = {'clients': ['10.1.1.10']}
self._mock_isilon_api.request.assert_called_once_with(
'PUT', expected_url, data=expected_data)
def test_deny_access_ip_nfs(self):
"""Verifies that an IP will be remove from a whitelist."""
fake_export_id = 1
self._mock_isilon_api.lookup_nfs_export.return_value = fake_export_id
# simulate an IP added to the whitelist
ip_addr = '10.0.0.4'
self._mock_isilon_api.get_nfs_export.return_value = {
'clients': [ip_addr]}
share = {'name': self.SHARE_NAME, 'share_proto': 'NFS'}
access = {'access_type': 'ip', 'access_to': ip_addr}
share_server = None
# call method under test
self.assertFalse(self._mock_isilon_api.request.called)
self.storage_connection.deny_access(self.mock_emc_driver,
self.mock_context, share, access,
share_server)
# verify that a call is made to remove an existing IP from the list
expected_url = (self.API_URL + '/platform/1/protocols/nfs/exports/' +
str(fake_export_id))
expected_data = {'clients': []}
self._mock_isilon_api.request.assert_called_once_with(
'PUT', expected_url, data=expected_data
)
def test_deny_access_ip_cifs(self):
"""Verifies that an IP will be remove from a whitelist.
Precondition: the IP to be removed exists in the whitelist. Otherwise,
do nothing.
"""
# setup
ip_addr = '10.1.1.10'
share = {'name': self.SHARE_NAME, 'share_proto': 'CIFS'}
self._mock_isilon_api.lookup_smb_share.return_value = {
'host_acl': ['allow:' + ip_addr]}
self.assertFalse(self._mock_isilon_api.request.called)
# call method under test
access = {'access_type': 'ip', 'access_to': ip_addr}
share_server = None
self.storage_connection.deny_access(self.mock_emc_driver,
self.mock_context, share, access,
share_server)
# verify API call is made to remove IP is removed from whitelist
expected_url = (self.API_URL + '/platform/1/protocols/smb/shares/' +
self.SHARE_NAME)
expected_data = {'host_acl': []}
self._mock_isilon_api.request.assert_called_once_with(
'PUT', expected_url, data=expected_data)
def test_deny_access_invalid_access_type(self):
share = {'name': self.SHARE_NAME, 'share_proto': 'NFS'}
access = {'access_type': 'foo_access_type', 'access_to': '10.0.0.1'}
# This operation should return silently
self.storage_connection.deny_access(
self.mock_emc_driver, self.mock_context, share, access, None)
def test_deny_access_invalid_share_protocol(self):
share = {'name': self.SHARE_NAME, 'share_proto': 'FOO'}
access = {'access_type': 'ip', 'access_to': '10.0.0.1'}
# This operation should return silently
self.storage_connection.deny_access(
self.mock_emc_driver, self.mock_context, share, access, None)
def test_deny_access_nfs_export_does_not_exist(self):
share = {'name': self.SHARE_NAME, 'share_proto': 'NFS'}
access = {'access_type': 'ip', 'access_to': '10.0.0.1'}
self._mock_isilon_api.lookup_nfs_export.return_value = 1
self._mock_isilon_api.get_nfs_export.return_value = None
self.assertRaises(
exception.ShareBackendException,
self.storage_connection.deny_access, self.mock_emc_driver,
self.mock_context, share, access, None
)
def test_deny_access_nfs_share_does_not_exist(self):
share = {'name': self.SHARE_NAME, 'share_proto': 'NFS'}
access = {'access_type': 'ip', 'access_to': '10.0.0.1'}
self._mock_isilon_api.lookup_nfs_export.return_value = None
self.assertRaises(
exception.ShareBackendException,
self.storage_connection.deny_access, self.mock_emc_driver,
self.mock_context, share, access, None)
def test_allow_access_multiple_ip_nfs(self):
"""Verifies adding an IP to a whitelist with pre-existing ips.
Verifies that when adding an additional IP to a whitelist which already
contains IPs, the Isilon driver successfully appends the IP to the
whitelist.
"""
# setup
fake_export_id = 42
new_allowed_ip = '10.7.7.8'
self._mock_isilon_api.lookup_nfs_export.return_value = fake_export_id
existing_ips = ['10.0.0.1', '10.1.1.1', '10.0.0.2']
export_json = {'clients': existing_ips}
self._mock_isilon_api.get_nfs_export.return_value = export_json
self.assertFalse(self._mock_isilon_api.request.called)
# call method under test
share = {'name': self.SHARE_NAME, 'share_proto': 'NFS'}
access = {'access_type': 'ip', 'access_to': new_allowed_ip}
share_server = None
self.storage_connection.allow_access(self.mock_emc_driver,
self.mock_context, share,
access,
share_server)
# verify access rule is applied
expected_url = (self.API_URL + '/platform/1/protocols/nfs/exports/' +
str(fake_export_id))
self.assertTrue(self._mock_isilon_api.request.called)
args, kwargs = self._mock_isilon_api.request.call_args
action, url = args
self.assertEqual('PUT', action)
self.assertEqual(expected_url, url)
self.assertEqual(1, len(kwargs))
self.assertTrue('data' in kwargs)
actual_clients = set(kwargs['data']['clients'])
expected_clients = set(existing_ips)
expected_clients.add(new_allowed_ip)
self.assertEqual(expected_clients, actual_clients)
def test_allow_access_multiple_ip_cifs(self):
"""Verifies adding an IP to a whitelist with pre-existing ips.
Verifies that when adding an additional IP to a whitelist which already
contains IPs, the Isilon driver successfully appends the IP to the
whitelist.
"""
# setup
share_name = self.SHARE_NAME
new_allowed_ip = '10.101.1.1'
existing_ips = ['allow:10.0.0.1', 'allow:10.1.1.1', 'allow:10.0.0.2']
share_json = {'name': share_name, 'host_acl': existing_ips}
self._mock_isilon_api.lookup_smb_share.return_value = share_json
self.assertFalse(self._mock_isilon_api.request.called)
# call method under test
share = {'name': share_name, 'share_proto': 'CIFS'}
access = {'access_type': 'ip', 'access_to': new_allowed_ip}
share_server = None
self.storage_connection.allow_access(self.mock_emc_driver,
self.mock_context, share,
access,
share_server)
# verify access rule is applied
expected_url = (self.API_URL + '/platform/1/protocols/smb/shares/' +
share_name)
self.assertTrue(self._mock_isilon_api.request.called)
args, kwargs = self._mock_isilon_api.request.call_args
action, url = args
self.assertEqual('PUT', action)
self.assertEqual(expected_url, url)
self.assertEqual(1, len(kwargs))
self.assertTrue('data' in kwargs)
actual_clients = set(kwargs['data']['host_acl'])
expected_clients = set(existing_ips)
expected_clients.add('allow:' + new_allowed_ip)
self.assertEqual(expected_clients, actual_clients)
def test_allow_access_single_ip_cifs(self):
# setup
share_name = self.SHARE_NAME
share = {'name': share_name, 'share_proto': 'CIFS'}
allow_ip = '10.1.1.10'
access = {'access_type': 'ip', 'access_to': allow_ip}
share_server = None
self._mock_isilon_api.lookup_smb_share.return_value = {
'name': share_name, 'host_acl': []}
self.assertFalse(self._mock_isilon_api.request.called)
# call method under test
self.storage_connection.allow_access(self.mock_emc_driver,
self.mock_context, share, access,
share_server)
# verify access rule is applied
expected_url = (self.API_URL + '/platform/1/protocols/smb/shares/' +
self.SHARE_NAME)
expected_data = {'host_acl': ['allow:' + allow_ip]}
self._mock_isilon_api.request.assert_called_once_with(
'PUT', expected_url, data=expected_data)
def test_allow_access_invalid_access_type(self):
# setup
share_name = self.SHARE_NAME
share = {'name': share_name, 'share_proto': 'NFS'}
allow_ip = '10.1.1.10'
access = {'access_type': 'foo_access_type', 'access_to': allow_ip}
# verify method under test throws the expected exception
self.assertRaises(
exception.ShareBackendException,
self.storage_connection.allow_access, self.mock_emc_driver,
self.mock_context, share, access, None)
def test_allow_access_invalid_share_protocol(self):
# setup
share_name = self.SHARE_NAME
share = {'name': share_name, 'share_proto': 'FOO_PROTOCOL'}
allow_ip = '10.1.1.10'
access = {'access_type': 'ip', 'access_to': allow_ip}
# verify method under test throws the expected exception
self.assertRaises(
exception.InvalidShare, self.storage_connection.allow_access,
self.mock_emc_driver, self.mock_context, share, access, None)
def test_create_share_nfs(self):
share_path = self.SHARE_DIR
self.assertFalse(self._mock_isilon_api.create_directory.called)
self.assertFalse(self._mock_isilon_api.create_nfs_export.called)
# create the share
share = {"name": self.SHARE_NAME, "share_proto": 'NFS'}
location = self.storage_connection.create_share(self.mock_emc_driver,
self.mock_context,
share, None)
# verify location and API call made
expected_location = '%s:%s' % (self.ISILON_ADDR, self.SHARE_DIR)
self.assertEqual(expected_location, location)
self._mock_isilon_api.create_directory.assert_called_with(share_path)
self._mock_isilon_api.create_nfs_export.assert_called_with(share_path)
def test_create_share_cifs(self):
self.assertFalse(self._mock_isilon_api.create_directory.called)
self.assertFalse(self._mock_isilon_api.create_smb_share.called)
# create the share
share = {"name": self.SHARE_NAME, "share_proto": 'CIFS'}
location = self.storage_connection.create_share(self.mock_emc_driver,
self.mock_context,
share, None)
expected_location = '\\\\{0}\\{1}'.format(
self.ISILON_ADDR, self.SHARE_NAME)
self.assertEqual(expected_location, location)
self._mock_isilon_api.create_directory.assert_called_once_with(
self.SHARE_DIR)
self._mock_isilon_api.create_smb_share.assert_called_once_with(
self.SHARE_NAME, self.SHARE_DIR)
def test_create_share_invalid_share_protocol(self):
share = {"name": self.SHARE_NAME, "share_proto": 'FOO_PROTOCOL'}
self.assertRaises(
exception.InvalidShare, self.storage_connection.create_share,
self.mock_emc_driver, self.mock_context, share, share_server=None)
def test_create_share_nfs_backend_failure(self):
share = {"name": self.SHARE_NAME, "share_proto": 'NFS'}
self._mock_isilon_api.create_nfs_export.return_value = False
self.assertRaises(
exception.ShareBackendException,
self.storage_connection.create_share, self.mock_emc_driver,
self.mock_context, share, share_server=None)
def test_create_snapshot(self):
# create snapshot
snapshot_name = "snapshot01"
snapshot_path = '/ifs/home/admin'
snapshot = {'name': snapshot_name, 'share_name': snapshot_path}
self.storage_connection.create_snapshot(self.mock_emc_driver,
self.mock_context, snapshot,
None)
# verify the create snapshot API call is executed
self._mock_isilon_api.create_snapshot.assert_called_with(snapshot_name,
snapshot_path)
def test_create_share_from_snapshot_nfs(self):
# assertions
self.assertFalse(self._mock_isilon_api.create_nfs_export.called)
self.assertFalse(self._mock_isilon_api.clone_snapshot.called)
snapshot_name = "snapshot01"
snapshot_path = '/ifs/home/admin'
# execute method under test
snapshot = {'name': snapshot_name, 'share_name': snapshot_path}
share = {"name": self.SHARE_NAME, "share_proto": 'NFS'}
location = self.storage_connection.create_share_from_snapshot(
self.mock_emc_driver,
self.mock_context,
share, snapshot,
None)
# verify NFS export created at expected location
self._mock_isilon_api.create_nfs_export.assert_called_with(
self.SHARE_DIR)
# verify clone_directory(container_path) method called
self._mock_isilon_api.clone_snapshot.assert_called_once_with(
snapshot_name, self.SHARE_DIR)
expected_location = '{0}:{1}'.format(
self.ISILON_ADDR, self.SHARE_DIR)
self.assertEqual(expected_location, location)
def test_create_share_from_snapshot_cifs(self):
# assertions
self.assertFalse(self._mock_isilon_api.create_smb_share.called)
self.assertFalse(self._mock_isilon_api.clone_snapshot.called)
# setup
snapshot_name = "snapshot01"
snapshot_path = '/ifs/home/admin'
new_share_name = 'clone-dir'
# execute method under test
snapshot = {'name': snapshot_name, 'share_name': snapshot_path}
share = {"name": new_share_name, "share_proto": 'CIFS'}
location = self.storage_connection.create_share_from_snapshot(
self.mock_emc_driver, self.mock_context, share, snapshot,
None)
# verify call made to create new CIFS share
self._mock_isilon_api.create_smb_share.assert_called_once_with(
new_share_name, self.CLONE_DIR)
self._mock_isilon_api.clone_snapshot.assert_called_once_with(
snapshot_name, self.CLONE_DIR)
expected_location = '\\\\{0}\\{1}'.format(self.ISILON_ADDR,
new_share_name)
self.assertEqual(expected_location, location)
def test_delete_share_nfs(self):
share = {"name": self.SHARE_NAME, "share_proto": 'NFS'}
fake_share_num = 42
self._mock_isilon_api.lookup_nfs_export.return_value = fake_share_num
self.assertFalse(self._mock_isilon_api.delete_nfs_share.called)
# delete the share
self.storage_connection.delete_share(self.mock_emc_driver,
self.mock_context, share, None)
# verify share delete
self._mock_isilon_api.delete_nfs_share.assert_called_with(
fake_share_num)
def test_delete_share_cifs(self):
self.assertFalse(self._mock_isilon_api.delete_smb_share.called)
# delete the share
share = {"name": self.SHARE_NAME, "share_proto": 'CIFS'}
self.storage_connection.delete_share(self.mock_emc_driver,
self.mock_context, share, None)
# verify share deleted
self._mock_isilon_api.delete_smb_share.assert_called_with(
self.SHARE_NAME)
def test_delete_share_invalid_share_proto(self):
share = {"name": self.SHARE_NAME, "share_proto": 'FOO_PROTOCOL'}
self.assertRaises(
exception.InvalidShare, self.storage_connection.delete_share,
self.mock_emc_driver, self.mock_context, share, None
)
def test_delete_nfs_share_backend_failure(self):
share = {"name": self.SHARE_NAME, "share_proto": 'NFS'}
self._mock_isilon_api.delete_nfs_share.return_value = False
self.assertRaises(
exception.ShareBackendException,
self.storage_connection.delete_share,
self.mock_emc_driver, self.mock_context, share, None
)
def test_delete_nfs_share_share_does_not_exist(self):
self._mock_isilon_api.lookup_nfs_export.return_value = None
share = {"name": self.SHARE_NAME, "share_proto": 'NFS'}
# verify the calling delete on a non-existent share returns and does
# not throw exception
self.storage_connection.delete_share(
self.mock_emc_driver, self.mock_context, share, None)
def test_delete_cifs_share_backend_failure(self):
share = {"name": self.SHARE_NAME, "share_proto": 'CIFS'}
self._mock_isilon_api.delete_smb_share.return_value = False
self.assertRaises(
exception.ShareBackendException,
self.storage_connection.delete_share,
self.mock_emc_driver, self.mock_context, share, None
)
def test_delete_cifs_share_share_does_not_exist(self):
share = {"name": self.SHARE_NAME, "share_proto": 'CIFS'}
self._mock_isilon_api.lookup_smb_share.return_value = None
# verify the calling delete on a non-existent share returns and does
# not throw exception
self.storage_connection.delete_share(
self.mock_emc_driver, self.mock_context, share, None)
def test_delete_snapshot(self):
# create a snapshot
snapshot_name = "snapshot01"
snapshot_path = '/ifs/home/admin'
snapshot = {'name': snapshot_name, 'share_name': snapshot_path}
self.assertFalse(self._mock_isilon_api.delete_snapshot.called)
# delete the created snapshot
self.storage_connection.delete_snapshot(self.mock_emc_driver,
self.mock_context, snapshot,
None)
# verify the API call was made to delete the snapshot
self._mock_isilon_api.delete_snapshot.assert_called_once_with(
snapshot_name)
def test_ensure_share(self):
share = {"name": self.SHARE_NAME, "share_proto": 'CIFS'}
self.storage_connection.ensure_share(self.mock_emc_driver,
self.mock_context, share, None)
@mock.patch(
'manila.share.drivers.emc.plugins.isilon.isilon.isilon_api.IsilonApi',
autospec=True)
def test_connect(self, mock_isi_api):
storage_connection = isilon.IsilonStorageConnection(LOG)
# execute method under test
storage_connection.connect(
self.mock_emc_driver, self.mock_context)
# verify connect sets driver params appropriately
mock_config = self.MockConfig()
server_addr = mock_config.safe_get('emc_nas_server')
self.assertEqual(server_addr, storage_connection._server)
expected_port = int(mock_config.safe_get('emc_nas_server_port'))
self.assertEqual(expected_port, storage_connection._port)
self.assertEqual('https://{0}:{1}'.format(server_addr, expected_port),
storage_connection._server_url)
expected_username = mock_config.safe_get('emc_nas_login')
self.assertEqual(expected_username, storage_connection._username)
expected_password = mock_config.safe_get('emc_nas_password')
self.assertEqual(expected_password, storage_connection._password)
self.assertFalse(storage_connection._verify_ssl_cert)
@mock.patch(
'manila.share.drivers.emc.plugins.isilon.isilon.isilon_api.IsilonApi',
autospec=True)
def test_connect_root_dir_does_not_exist(self, mock_isi_api):
mock_isilon_api = mock_isi_api.return_value
mock_isilon_api.is_path_existent.return_value = False
storage_connection = isilon.IsilonStorageConnection(LOG)
# call method under test
storage_connection.connect(self.mock_emc_driver, self.mock_context)
mock_isilon_api.create_directory.assert_called_once_with(
self.ROOT_DIR, recursive=True)
def test_update_share_stats(self):
stats_dict = {}
self.storage_connection.update_share_stats(stats_dict)
expected_version = isilon.VERSION
self.assertEqual({'driver_version': expected_version}, stats_dict)
def test_get_network_allocations_number(self):
# call method under test
num = self.storage_connection.get_network_allocations_number()
self.assertEqual(0, num)

View File

@ -0,0 +1,512 @@
# Copyright (c) 2015 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ddt
from oslo_serialization import jsonutils as json
import requests
import requests_mock
import six
from manila.share.drivers.emc.plugins.isilon import isilon_api
from manila import test
@ddt.ddt
class IsilonApiTest(test.TestCase):
def setUp(self):
super(IsilonApiTest, self).setUp()
self._mock_url = 'https://localhost:8080'
_mock_auth = ('admin', 'admin')
self.isilon_api = isilon_api.IsilonApi(
self._mock_url, _mock_auth
)
@ddt.data(False, True)
def test_create_directory(self, is_recursive):
with requests_mock.Mocker() as m:
path = '/ifs/test'
self.assertEqual(0, len(m.request_history))
self._add_create_directory_response(m, path, is_recursive)
r = self.isilon_api.create_directory(path,
recursive=is_recursive)
self.assertTrue(r)
self.assertEqual(1, len(m.request_history))
request = m.request_history[0]
self._verify_dir_creation_request(request, path, is_recursive)
@requests_mock.mock()
def test_clone_snapshot(self, m):
snapshot_name = 'snapshot01'
fq_target_dir = '/ifs/admin/target'
self.assertEqual(0, len(m.request_history))
self._add_create_directory_response(m, fq_target_dir, False)
snapshots_json = (
'{"snapshots": '
'[{"name": "snapshot01", "path": "/ifs/admin/source"}]'
'}'
)
self._add_get_snapshot_response(m, snapshot_name, snapshots_json)
# In order to test cloning a snapshot, we build out a mock
# source directory tree. After the method under test is called we
# will verify the the necessary calls are made to clone a snapshot.
source_dir_listing_json = (
'{"children": ['
'{"name": "dir1", "type": "container"},'
'{"name": "dir2", "type": "container"},'
'{"name": "file1", "type": "object"},'
'{"name": "file2", "type": "object"}'
']}'
)
self._add_get_directory_listing_response(
m, '/ifs/.snapshot/{0}/admin/source'.format(snapshot_name),
source_dir_listing_json)
# Add request responses for creating directories and cloning files
# to the destination tree
self._add_file_clone_response(m, '/ifs/admin/target/file1',
snapshot_name)
self._add_file_clone_response(m, '/ifs/admin/target/file2',
snapshot_name)
self._add_create_directory_response(m, fq_target_dir + '/dir1', False)
self._add_get_directory_listing_response(
m, '/ifs/.snapshot/{0}/admin/source/dir1'.format(snapshot_name),
'{"children": ['
'{"name": "file11", "type": "object"}, '
'{"name": "file12", "type": "object"}'
']}')
self._add_file_clone_response(m, '/ifs/admin/target/dir1/file11',
snapshot_name)
self._add_file_clone_response(m, '/ifs/admin/target/dir1/file12',
snapshot_name)
self._add_create_directory_response(m, fq_target_dir + '/dir2', False)
self._add_get_directory_listing_response(
m, '/ifs/.snapshot/{0}/admin/source/dir2'.format(snapshot_name),
'{"children": ['
'{"name": "file21", "type": "object"}, '
'{"name": "file22", "type": "object"}'
']}')
self._add_file_clone_response(m, '/ifs/admin/target/dir2/file21',
snapshot_name)
self._add_file_clone_response(m, '/ifs/admin/target/dir2/file22',
snapshot_name)
# Call method under test
self.isilon_api.clone_snapshot(snapshot_name, fq_target_dir)
# Verify calls needed to clone the source snapshot to the target dir
expected_calls = []
clone_path_list = [
'file1', 'file2', 'dir1/file11', 'dir1/file12',
'dir2/file21', 'dir2/file22']
for path in clone_path_list:
expected_call = IsilonApiTest.ExpectedCall(
IsilonApiTest.ExpectedCall.FILE_CLONE,
self._mock_url + '/namespace/ifs/admin/target/' + path,
['/ifs/admin/target/' + path, '/ifs/admin/source/' + path,
snapshot_name])
expected_calls.append(expected_call)
dir_path_list = [
('/dir1?recursive', '/dir1'),
('/dir2?recursive', '/dir2'),
('?recursive=', '')]
for url, path in dir_path_list:
expected_call = IsilonApiTest.ExpectedCall(
IsilonApiTest.ExpectedCall.DIR_CREATION,
self._mock_url + '/namespace/ifs/admin/target' + url,
['/ifs/admin/target' + path, False])
expected_calls.append(expected_call)
self._verify_clone_snapshot_calls(expected_calls, m.request_history)
class ExpectedCall(object):
DIR_CREATION = 'dir_creation'
FILE_CLONE = 'file_clone'
def __init__(self, request_type, match_url, verify_args):
self.request_type = request_type
self.match_url = match_url
self.verify_args = verify_args
def _verify_clone_snapshot_calls(self, expected_calls, response_calls):
actual_calls = []
for call in response_calls:
actual_calls.append(call)
for expected_call in expected_calls:
# Match the expected call to the actual call, then verify
match_found = False
for call in actual_calls:
if call.url.startswith(expected_call.match_url):
match_found = True
if expected_call.request_type is 'dir_creation':
self._verify_dir_creation_request(
call, *expected_call.verify_args)
elif expected_call.request_type is 'file_clone':
pass
else:
self.fail('Invalid request type')
actual_calls.remove(call)
self.assertTrue(match_found)
@requests_mock.mock()
def test_get_directory_listing(self, m):
self.assertEqual(0, len(m.request_history))
fq_dir_path = 'ifs/admin/test'
json_str = '{"my_json": "test123"}'
self._add_get_directory_listing_response(m, fq_dir_path, json_str)
actual_json = self.isilon_api.get_directory_listing(fq_dir_path)
self.assertEqual(1, len(m.request_history))
self.assertEqual(json.loads(json_str), actual_json)
@ddt.data((200, True), (404, False))
def test_is_path_existent(self, data):
status_code, expected_return_value = data
with requests_mock.mock() as m:
self.assertEqual(0, len(m.request_history))
path = '/ifs/home/admin'
m.head('{0}/namespace{1}'.format(self._mock_url, path),
status_code=status_code)
r = self.isilon_api.is_path_existent(path)
self.assertEqual(expected_return_value, r)
self.assertEqual(1, len(m.request_history))
@requests_mock.mock()
def test_is_path_existent_unexpected_error(self, m):
path = '/ifs/home/admin'
m.head('{0}/namespace{1}'.format(self._mock_url, path),
status_code=400)
self.assertRaises(
requests.exceptions.HTTPError, self.isilon_api.is_path_existent,
'/ifs/home/admin')
@ddt.data(
(200, '{"snapshots": [{"path": "/ifs/home/test"}]}',
{'path': '/ifs/home/test'}),
(404, '{"errors": []}', None)
)
def test_get_snapshot(self, data):
status_code, json_body, expected_return_value = data
with requests_mock.mock() as m:
self.assertEqual(0, len(m.request_history))
snapshot_name = 'foo1'
self._add_get_snapshot_response(m, snapshot_name, json_body,
status=status_code)
r = self.isilon_api.get_snapshot(snapshot_name)
self.assertEqual(1, len(m.request_history))
self.assertEqual(expected_return_value, r)
@requests_mock.mock()
def test_get_snapshot_unexpected_error(self, m):
snapshot_name = 'foo1'
json_body = '{"snapshots": [{"path": "/ifs/home/test"}]}'
self._add_get_snapshot_response(
m, snapshot_name, json_body, status=400)
self.assertRaises(
requests.exceptions.HTTPError, self.isilon_api.get_snapshot,
snapshot_name)
@requests_mock.mock()
def test_get_snapshots(self, m):
self.assertEqual(0, len(m.request_history))
snapshot_json = '{"snapshots": [{"path": "/ifs/home/test"}]}'
m.get('{0}/platform/1/snapshot/snapshots'.format(self._mock_url),
status_code=200, json=json.loads(snapshot_json))
r = self.isilon_api.get_snapshots()
self.assertEqual(1, len(m.request_history))
self.assertEqual(json.loads(snapshot_json), r)
@requests_mock.mock()
def test_get_snapshots_error_occurred(self, m):
self.assertEqual(0, len(m.request_history))
m.get('{0}/platform/1/snapshot/snapshots'.format(self._mock_url),
status_code=404)
self.assertRaises(requests.exceptions.HTTPError,
self.isilon_api.get_snapshots)
self.assertEqual(1, len(m.request_history))
@ddt.data(
('/ifs/home/admin',
'{"exports": [{"id": 42, "paths": ["/ifs/home/admin"]}]}', 42),
('/ifs/home/test',
'{"exports": [{"id": 42, "paths": ["/ifs/home/admin"]}]}', None)
)
def test_lookup_nfs_export(self, data):
share_path, response_json, expected_return = data
with requests_mock.mock() as m:
self.assertEqual(0, len(m.request_history))
m.get('{0}/platform/1/protocols/nfs/exports'
.format(self._mock_url), json=json.loads(response_json))
r = self.isilon_api.lookup_nfs_export(share_path)
self.assertEqual(1, len(m.request_history))
self.assertEqual(expected_return, r)
@requests_mock.mock()
def test_get_nfs_export(self, m):
self.assertEqual(0, len(m.request_history))
export_id = 42
response_json = '{"exports": [{"id": 1}]}'
status_code = 200
m.get('{0}/platform/1/protocols/nfs/exports/{1}'
.format(self._mock_url, export_id),
json=json.loads(response_json), status_code=status_code)
r = self.isilon_api.get_nfs_export(export_id)
self.assertEqual(1, len(m.request_history))
self.assertEqual(json.loads('{"id": 1}'), r)
@requests_mock.mock()
def test_get_nfs_export_error(self, m):
self.assertEqual(0, len(m.request_history))
export_id = 3
response_json = '{}'
status_code = 404
m.get('{0}/platform/1/protocols/nfs/exports/{1}'
.format(self._mock_url, export_id),
json=json.loads(response_json), status_code=status_code)
r = self.isilon_api.get_nfs_export(export_id)
self.assertEqual(1, len(m.request_history))
self.assertEqual(None, r)
@requests_mock.mock()
def test_lookup_smb_share(self, m):
self.assertEqual(0, len(m.request_history))
share_name = 'my_smb_share'
share_json = '{"id": "my_smb_share"}'
response_json = '{{"shares": [{0}]}}'.format(share_json)
m.get('{0}/platform/1/protocols/smb/shares/{1}'
.format(self._mock_url, share_name), status_code=200,
json=json.loads(response_json))
r = self.isilon_api.lookup_smb_share(share_name)
self.assertEqual(1, len(m.request_history))
self.assertEqual(json.loads(share_json), r)
@requests_mock.mock()
def test_lookup_smb_share_error(self, m):
self.assertEqual(0, len(m.request_history))
share_name = 'my_smb_share'
m.get('{0}/platform/1/protocols/smb/shares/{1}'.format(
self._mock_url, share_name), status_code=404)
r = self.isilon_api.lookup_smb_share(share_name)
self.assertEqual(1, len(m.request_history))
self.assertEqual(None, r)
@ddt.data((201, True), (404, False))
def test_create_nfs_export(self, data):
status_code, expected_return_value = data
with requests_mock.mock() as m:
self.assertEqual(0, len(m.request_history))
export_path = '/ifs/home/test'
m.post(self._mock_url + '/platform/1/protocols/nfs/exports',
status_code=status_code)
r = self.isilon_api.create_nfs_export(export_path)
self.assertEqual(1, len(m.request_history))
call = m.request_history[0]
expected_request_body = '{"paths": ["/ifs/home/test"]}'
self.assertEqual(json.loads(expected_request_body),
json.loads(call.body))
self.assertEqual(expected_return_value, r)
@ddt.data((201, True), (404, False))
def test_create_smb_share(self, data):
status_code, expected_return_value = data
with requests_mock.mock() as m:
self.assertEqual(0, len(m.request_history))
share_name = 'my_smb_share'
share_path = '/ifs/home/admin/smb_share'
m.post(self._mock_url + '/platform/1/protocols/smb/shares',
status_code=status_code)
r = self.isilon_api.create_smb_share(share_name, share_path)
self.assertEqual(expected_return_value, r)
self.assertEqual(1, len(m.request_history))
expected_request_data = json.loads(
'{{"name": "{0}", "path": "{1}"}}'.format(
share_name, share_path)
)
self.assertEqual(expected_request_data,
json.loads(m.request_history[0].body))
@requests_mock.mock()
def test_create_snapshot(self, m):
self.assertEqual(0, len(m.request_history))
snapshot_name = 'my_snapshot_01'
snapshot_path = '/ifs/home/admin'
m.post(self._mock_url + '/platform/1/snapshot/snapshots',
status_code=201)
r = self.isilon_api.create_snapshot(snapshot_name, snapshot_path)
self.assertEqual(1, len(m.request_history))
self.assertEqual(True, r)
expected_request_body = json.loads(
'{{"name": "{0}", "path": "{1}"}}'
.format(snapshot_name, snapshot_path)
)
self.assertEqual(expected_request_body,
json.loads(m.request_history[0].body))
@requests_mock.mock()
def test_create_snapshot_error_case(self, m):
self.assertEqual(0, len(m.request_history))
snapshot_name = 'my_snapshot_01'
snapshot_path = '/ifs/home/admin'
m.post(self._mock_url + '/platform/1/snapshot/snapshots',
status_code=404)
self.assertRaises(requests.exceptions.HTTPError,
self.isilon_api.create_snapshot,
snapshot_name, snapshot_path)
self.assertEqual(1, len(m.request_history))
@ddt.data(True, False)
def test_delete(self, is_recursive_delete):
with requests_mock.mock() as m:
self.assertEqual(0, len(m.request_history))
fq_path = '/ifs/home/admin/test'
m.delete(self._mock_url + '/namespace' + fq_path + '?recursive='
+ six.text_type(is_recursive_delete), status_code=204)
self.isilon_api.delete(fq_path, recursive=is_recursive_delete)
self.assertEqual(1, len(m.request_history))
@requests_mock.mock()
def test_delete_error_case(self, m):
fq_path = '/ifs/home/admin/test'
m.delete(self._mock_url + '/namespace' + fq_path + '?recursive=False',
status_code=403)
self.assertRaises(requests.exceptions.HTTPError,
self.isilon_api.delete, fq_path, recursive=False)
@ddt.data((204, True), (404, False))
def test_delete_nfs_share(self, data):
status_code, expected_return_value = data
with requests_mock.mock() as m:
self.assertEqual(0, len(m.request_history))
share_number = 42
m.delete('{0}/platform/1/protocols/nfs/exports/{1}'
.format(self._mock_url, share_number),
status_code=status_code)
r = self.isilon_api.delete_nfs_share(share_number)
self.assertEqual(1, len(m.request_history))
self.assertEqual(r, expected_return_value)
@ddt.data((204, True), (404, False))
def test_delete_smb_shares(self, data):
status_code, expected_return_value = data
with requests_mock.mock() as m:
self.assertEqual(0, len(m.request_history))
share_name = 'smb_share_42'
m.delete('{0}/platform/1/protocols/smb/shares/{1}'
.format(self._mock_url, share_name),
status_code=status_code)
r = self.isilon_api.delete_smb_share(share_name)
self.assertEqual(1, len(m.request_history))
self.assertEqual(r, expected_return_value)
@requests_mock.mock()
def test_delete_snapshot(self, m):
self.assertEqual(0, len(m.request_history))
m.delete(self._mock_url + '/platform/1/snapshot/snapshots/my_snapshot',
status_code=204)
self.isilon_api.delete_snapshot("my_snapshot")
self.assertEqual(1, len(m.request_history))
@requests_mock.mock()
def test_delete_snapshot_error_case(self, m):
m.delete(self._mock_url + '/platform/1/snapshot/snapshots/my_snapshot',
status_code=403)
self.assertRaises(requests.exceptions.HTTPError,
self.isilon_api.delete_snapshot, "my_snapshot")
def _add_create_directory_response(self, m, path, is_recursive):
url = '{0}/namespace{1}?recursive={2}'.format(
self._mock_url, path, six.text_type(is_recursive))
m.put(url, status_code=200)
def _add_file_clone_response(self, m, fq_dest_path, snapshot_name):
url = '{0}/namespace{1}?clone=true&snapshot={2}'.format(
self._mock_url, fq_dest_path, snapshot_name)
m.put(url)
def _add_get_directory_listing_response(self, m, fq_dir_path, json_str):
url = '{0}/namespace{1}?detail=default'.format(
self._mock_url, fq_dir_path)
m.get(url, json=json.loads(json_str), status_code=200)
def _add_get_snapshot_response(
self, m, snapshot_name, json_str, status=200):
url = '{0}/platform/1/snapshot/snapshots/{1}'.format(
self._mock_url, snapshot_name
)
m.get(url, status_code=status, json=json.loads(json_str))
def _verify_dir_creation_request(self, request, path, is_recursive):
self.assertEqual('PUT', request.method)
expected_url = '{0}/namespace{1}?recursive={2}'.format(
self._mock_url, path, six.text_type(is_recursive))
self.assertEqual(expected_url, request.url)
self.assertTrue("x-isi-ifs-target-type" in request.headers)
self.assertEqual("container",
request.headers['x-isi-ifs-target-type'])
def _verify_clone_file_from_snapshot(
self, request, fq_file_path, fq_dest_path, snapshot_name):
self.assertEqual('PUT', request.method)
expected_url = '{0}/namespace{1}?clone=true&snapshot={2}'.format(
self._mock_url, fq_dest_path, snapshot_name
)
self.assertEqual(expected_url, request.request.url)
self.assertTrue("x-isi-ifs-copy-source" in request.headers)
self.assertEqual('/namespace' + fq_file_path,
request.headers['x-isi-ifs-copy-source'])

View File

@ -28,6 +28,7 @@ Paste
PasteDeploy>=1.5.0
python-neutronclient>=2.3.11,<3
keystonemiddleware>=1.0.0
requests>=2.2.0,!=2.4.0
Routes>=1.12.3,!=2.0
six>=1.9.0
SQLAlchemy>=0.9.7,<=0.9.99

View File

@ -53,6 +53,7 @@ oslo.config.opts =
manila = manila.opts:list_opts
manila.share.drivers.emc.plugins =
vnx = manila.share.drivers.emc.plugins.vnx.connection:VNXStorageConnection
isilon = manila.share.drivers.emc.plugins.isilon.isilon:IsilonStorageConnection
[build_sphinx]
all_files = 1

View File

@ -15,6 +15,7 @@ oslotest>=1.2.0 # Apache-2.0
oslosphinx>=2.2.0 # Apache-2.0
psycopg2
python-subunit>=0.0.18
requests-mock>=0.5.1
sphinx>=1.1.2,!=1.2.0,!=1.3b1,<1.3
testrepository>=0.0.18
testtools>=0.9.36,!=1.2.0