NetApp cDOT driver uses deprecated APIs for NFS exports

The nfs-exportfs-* APIs that the NetApp cDOT driver currently uses
for NFS export management are 7-mode APIs which were left in for
backwards compatibility. It's not the recommended/supported way
to do NFS export management. We need to modify the driver to use
the supported set of export-policy-* and export-rule-* APIs.

Closes-Bug: #1370761
Closes-Bug: #1437509
Change-Id: I91347d27ba69d4a20fe73ce7e75f58c9818ea6d8
This commit is contained in:
Clinton Knight 2015-03-23 17:36:04 -04:00
parent 52750a0824
commit 9b6771a580
17 changed files with 1035 additions and 464 deletions

View File

@ -32,6 +32,7 @@ from manila.i18n import _
LOG = log.getLogger(__name__)
EONTAPI_EINVAL = '22'
EAPIERROR = '13001'
EAPINOTFOUND = '13005'
EVOLUMEOFFLINE = '13042'
EINTERNALERROR = '13114'

View File

@ -30,6 +30,7 @@ from manila.share.drivers.netapp import utils as na_utils
LOG = log.getLogger(__name__)
DELETED_PREFIX = 'deleted_manila_'
class NetAppCmodeClient(client_base.NetAppBaseClient):
@ -44,13 +45,6 @@ class NetAppCmodeClient(client_base.NetAppBaseClient):
(major, minor) = self.get_ontapi_version(cached=False)
self.connection.set_api_version(major, minor)
# NOTE(vponomaryov): Different versions of Data ONTAP API has
# different behavior for API call "nfs-exportfs-append-rules-2", that
# can require prefix "/vol" or not for path to apply rules for.
# Following attr used by "add_rules" method to handle setting up nfs
# exports properly in long term.
self.nfs_exports_with_prefix = False
def _invoke_vserver_api(self, na_element, vserver):
server = copy.copy(self.connection)
server.set_vserver(vserver)
@ -641,7 +635,7 @@ class NetAppCmodeClient(client_base.NetAppBaseClient):
'security-flavor': 'any',
},
'rw-rule': {
'security-flavor': 'any',
'security-flavor': 'never',
},
}
self.send_request('export-rule-create', api_args)
@ -1002,143 +996,236 @@ class NetAppCmodeClient(client_base.NetAppBaseClient):
self.send_request('cifs-share-delete', {'share-name': share_name})
@na_utils.trace
def add_nfs_export_rules(self, export_path, rules):
def add_nfs_export_rule(self, policy_name, rule, readonly):
rule_indices = self._get_nfs_export_rule_indices(policy_name, rule)
if not rule_indices:
self._add_nfs_export_rule(policy_name, rule, readonly)
else:
# Update first rule and delete the rest
self._update_nfs_export_rule(
policy_name, rule, readonly, rule_indices.pop(0))
self._remove_nfs_export_rules(policy_name, rule_indices)
# This method builds up a complicated structure needed by the
# nfs-exportfs-append-rules-2 ZAPI. Here is how the end result
# should appear:
#
# {
# 'rules': {
# 'exports-rule-info-2': {
# 'pathname': <share pathname>,
# 'security-rules': {
# 'security-rule-info': {
# 'read-write': [
# {
# 'exports-hostname-info': {
# 'name': <ip address 1>,
# }
# },
# {
# 'exports-hostname-info': {
# 'name': <ip address 2>,
# }
# }
# ],
# 'root': [
# {
# 'exports-hostname-info': {
# 'name': <ip address 1>,
# }
# },
# {
# 'exports-hostname-info': {
# 'name': <ip address 2>,
# }
# }
# ]
# }
# }
# }
# }
# }
@na_utils.trace
def _add_nfs_export_rule(self, policy_name, rule, readonly):
api_args = {
'policy-name': policy_name,
'client-match': rule,
'ro-rule': {
'security-flavor': 'sys',
},
'rw-rule': {
'security-flavor': 'sys' if not readonly else 'never',
},
'super-user-security': {
'security-flavor': 'sys',
},
}
self.send_request('export-rule-create', api_args)
# Default API request, some of which is overwritten below.
request = {
'rules': {
'exports-rule-info-2': {
'pathname': export_path,
'security-rules': {},
@na_utils.trace
def _update_nfs_export_rule(self, policy_name, rule, readonly, rule_index):
api_args = {
'policy-name': policy_name,
'rule-index': rule_index,
'client-match': rule,
'ro-rule': {
'security-flavor': 'sys'
},
'rw-rule': {
'security-flavor': 'sys' if not readonly else 'never'
},
'super-user-security': {
'security-flavor': 'sys'
},
}
self.send_request('export-rule-modify', api_args)
@na_utils.trace
def _get_nfs_export_rule_indices(self, policy_name, rule):
api_args = {
'query': {
'export-rule-info': {
'policy-name': policy_name,
'client-match': rule,
},
},
'desired-attributes': {
'export-rule-info': {
'vserver-name': None,
'policy-name': None,
'client-match': None,
'rule-index': None,
},
},
}
result = self.send_request('export-rule-get-iter', api_args)
allowed_hosts_xml = []
for ip in rules:
allowed_hosts_xml.append({'exports-hostname-info': {'name': ip}})
attributes_list = result.get_child_by_name(
'attributes-list') or netapp_api.NaElement('none')
export_rule_info_list = attributes_list.get_children()
# Build security rules to be grafted into request.
security_rule = {
'security-rule-info': {
'read-write': allowed_hosts_xml,
'root': allowed_hosts_xml,
rule_indices = [int(export_rule_info.get_child_content('rule-index'))
for export_rule_info in export_rule_info_list]
rule_indices.sort()
return [six.text_type(rule_index) for rule_index in rule_indices]
@na_utils.trace
def remove_nfs_export_rule(self, policy_name, rule):
rule_indices = self._get_nfs_export_rule_indices(policy_name, rule)
self._remove_nfs_export_rules(policy_name, rule_indices)
@na_utils.trace
def _remove_nfs_export_rules(self, policy_name, rule_indices):
for rule_index in rule_indices:
api_args = {
'policy-name': policy_name,
'rule-index': rule_index
}
try:
self.send_request('export-rule-destroy', api_args)
except netapp_api.NaApiError as e:
if e.code != netapp_api.EOBJECTNOTFOUND:
raise
@na_utils.trace
def clear_nfs_export_policy_for_volume(self, volume_name):
self.set_nfs_export_policy_for_volume(volume_name, 'default')
@na_utils.trace
def set_nfs_export_policy_for_volume(self, volume_name, policy_name):
api_args = {
'query': {
'volume-attributes': {
'volume-id-attributes': {
'name': volume_name,
},
},
},
'attributes': {
'volume-attributes': {
'volume-export-attributes': {
'policy': policy_name,
},
},
},
}
self.send_request('volume-modify-iter', api_args)
# Insert security rules section into request.
request['rules']['exports-rule-info-2']['security-rules'] = (
security_rule)
@na_utils.trace
def get_nfs_export_policy_for_volume(self, volume_name):
"""Get the name of the export policy for a volume."""
# Make a second copy of the request with /vol prefix on export path.
request_with_prefix = copy.deepcopy(request)
request_with_prefix['rules']['exports-rule-info-2']['pathname'] = (
'/vol' + export_path)
api_args = {
'query': {
'volume-attributes': {
'volume-id-attributes': {
'name': volume_name,
},
},
},
'desired-attributes': {
'volume-attributes': {
'volume-export-attributes': {
'policy': None,
},
},
},
}
result = self.send_request('volume-get-iter', api_args)
LOG.debug('Appending NFS rules %r', rules)
attributes_list = result.get_child_by_name(
'attributes-list') or netapp_api.NaElement('none')
volume_attributes = attributes_list.get_child_by_name(
'volume-attributes') or netapp_api.NaElement('none')
volume_export_attributes = volume_attributes.get_child_by_name(
'volume-export-attributes') or netapp_api.NaElement('none')
export_policy = volume_export_attributes.get_child_content('policy')
if not export_policy:
msg = _('Could not find export policy for volume %s.')
raise exception.NetAppException(msg % volume_name)
return export_policy
@na_utils.trace
def create_nfs_export_policy(self, policy_name):
api_args = {'policy-name': policy_name}
try:
if self.nfs_exports_with_prefix:
self.send_request('nfs-exportfs-append-rules-2',
request_with_prefix)
else:
self.send_request('nfs-exportfs-append-rules-2', request)
self.send_request('export-policy-create', api_args)
except netapp_api.NaApiError as e:
if e.code == netapp_api.EINTERNALERROR:
# We expect getting here only in one case - when received first
# call of this method per backend, that is not covered by
# default value. Change its value, to send proper requests from
# first time.
self.nfs_exports_with_prefix = not self.nfs_exports_with_prefix
LOG.warning(_LW("Data ONTAP API 'nfs-exportfs-append-rules-2' "
"compatibility action: remember behavior to "
"send proper values with first attempt next "
"time. Now trying send another request with "
"changed value for 'pathname'."))
if self.nfs_exports_with_prefix:
self.send_request('nfs-exportfs-append-rules-2',
request_with_prefix)
else:
self.send_request('nfs-exportfs-append-rules-2', request)
else:
if e.code != netapp_api.EDUPLICATEENTRY:
raise
@na_utils.trace
def get_nfs_export_rules(self, export_path):
"""Returns available access rules for a given NFS share."""
api_args = {'pathname': export_path}
response = self.send_request('nfs-exportfs-list-rules-2', api_args)
rules = response.get_child_by_name('rules')
allowed_hosts = []
if rules and rules.get_child_by_name('exports-rule-info-2'):
security_rule = rules.get_child_by_name(
'exports-rule-info-2').get_child_by_name('security-rules')
security_info = security_rule.get_child_by_name(
'security-rule-info')
if security_info:
root_rules = security_info.get_child_by_name('root')
if root_rules:
allowed_hosts = root_rules.get_children()
existing_rules = []
for allowed_host in allowed_hosts:
if 'exports-hostname-info' in allowed_host.get_name():
existing_rules.append(allowed_host.get_child_content('name'))
return existing_rules
def soft_delete_nfs_export_policy(self, policy_name):
try:
self.delete_nfs_export_policy(policy_name)
except netapp_api.NaApiError:
# NOTE(cknight): Policy deletion can fail if called too soon after
# removing from a flexvol. So rename for later harvesting.
self.rename_nfs_export_policy(policy_name,
DELETED_PREFIX + policy_name)
@na_utils.trace
def remove_nfs_export_rules(self, export_path):
def delete_nfs_export_policy(self, policy_name):
api_args = {'policy-name': policy_name}
try:
self.send_request('export-policy-destroy', api_args)
except netapp_api.NaApiError as e:
if e.code == netapp_api.EOBJECTNOTFOUND:
return
raise
@na_utils.trace
def rename_nfs_export_policy(self, policy_name, new_policy_name):
api_args = {
'pathnames': {
'pathname-info': {
'name': export_path,
'policy-name': policy_name,
'new-policy-name': new_policy_name
}
self.send_request('export-policy-rename', api_args)
@na_utils.trace
def prune_deleted_nfs_export_policies(self):
deleted_policy_map = self._get_deleted_nfs_export_policies()
for vserver in deleted_policy_map:
client = copy.deepcopy(self)
client.set_vserver(vserver)
for policy in deleted_policy_map[vserver]:
try:
client.delete_nfs_export_policy(policy)
except netapp_api.NaApiError:
LOG.debug('Could not delete export policy %s.' % policy)
@na_utils.trace
def _get_deleted_nfs_export_policies(self):
api_args = {
'query': {
'export-policy-info': {
'policy-name': DELETED_PREFIX + '*',
},
},
'desired-attributes': {
'export-policy-info': {
'policy-name': None,
'vserver': None,
},
},
}
self.send_request('nfs-exportfs-delete-rules', api_args)
result = self.send_request('export-policy-get-iter', api_args)
attributes_list = result.get_child_by_name(
'attributes-list') or netapp_api.NaElement('none')
policy_map = {}
for export_info in attributes_list.get_children():
vserver = export_info.get_child_content('vserver')
policies = policy_map.get(vserver, [])
policies.append(export_info.get_child_content('policy-name'))
policy_map[vserver] = policies
return policy_map
@na_utils.trace
def _get_ems_log_destination_vserver(self):

View File

@ -45,6 +45,7 @@ class NetAppCmodeFileStorageLibrary(object):
AUTOSUPPORT_INTERVAL_SECONDS = 3600 # hourly
SSC_UPDATE_INTERVAL_SECONDS = 3600 # hourly
HOUSEKEEPING_INTERVAL_SECONDS = 600 # ten minutes
# Maps NetApp qualified extra specs keys to corresponding backend API
# client library argument keywords. When we expose more backend
@ -160,6 +161,13 @@ class NetAppCmodeFileStorageLibrary(object):
ems_periodic_task.start(interval=self.AUTOSUPPORT_INTERVAL_SECONDS,
initial_delay=0)
# Start the task that runs other housekeeping tasks, such as deletion
# of previously soft-deleted storage artifacts.
housekeeping_periodic_task = loopingcall.FixedIntervalLoopingCall(
self._handle_housekeeping_tasks)
housekeeping_periodic_task.start(
interval=self.HOUSEKEEPING_INTERVAL_SECONDS, initial_delay=0)
@na_utils.trace
def _get_valid_share_name(self, share_id):
"""Get share name according to share name template."""
@ -246,6 +254,10 @@ class NetAppCmodeFileStorageLibrary(object):
}
return ems_log
@na_utils.trace
def _handle_housekeeping_tasks(self):
"""Handle various cleanup activities."""
def _find_matching_aggregates(self):
"""Find all aggregates match pattern."""
raise NotImplementedError()
@ -491,7 +503,8 @@ class NetAppCmodeFileStorageLibrary(object):
raise exception.NetAppException(msg % msg_args)
export_addresses = [interface['address'] for interface in interfaces]
export_locations = helper.create_share(share_name, export_addresses)
export_locations = helper.create_share(
share, share_name, export_addresses)
return export_locations
@na_utils.trace
@ -499,10 +512,11 @@ class NetAppCmodeFileStorageLibrary(object):
"""Deletes NAS storage."""
helper = self._get_helper(share)
helper.set_client(vserver_client)
share_name = self._get_valid_share_name(share['id'])
target = helper.get_target(share)
# Share may be in error state, so there's no share and target.
if target:
helper.delete_share(share)
helper.delete_share(share, share_name)
@na_utils.trace
def create_snapshot(self, context, snapshot, share_server=None):
@ -562,17 +576,19 @@ class NetAppCmodeFileStorageLibrary(object):
def allow_access(self, context, share, access, share_server=None):
"""Allows access to a given NAS storage."""
vserver, vserver_client = self._get_vserver(share_server=share_server)
share_name = self._get_valid_share_name(share['id'])
helper = self._get_helper(share)
helper.set_client(vserver_client)
helper.allow_access(context, share, access)
helper.allow_access(context, share, share_name, access)
@na_utils.trace
def deny_access(self, context, share, access, share_server=None):
"""Denies access to a given NAS storage."""
vserver, vserver_client = self._get_vserver(share_server=share_server)
share_name = self._get_valid_share_name(share['id'])
helper = self._get_helper(share)
helper.set_client(vserver_client)
helper.deny_access(context, share, access)
helper.deny_access(context, share, share_name, access)
def setup_server(self, network_info, metadata=None):
raise NotImplementedError()

View File

@ -97,6 +97,14 @@ class NetAppCmodeMultiSVMFileStorageLibrary(
vserver_client = self._get_api_client(vserver)
return vserver, vserver_client
@na_utils.trace
def _handle_housekeeping_tasks(self):
"""Handle various cleanup activities."""
self._client.prune_deleted_nfs_export_policies()
super(NetAppCmodeMultiSVMFileStorageLibrary, self).\
_handle_housekeeping_tasks()
@na_utils.trace
def _find_matching_aggregates(self):
"""Find all aggregates match pattern."""

View File

@ -100,6 +100,15 @@ class NetAppCmodeSingleSVMFileStorageLibrary(
vserver_client = self._get_api_client(self._vserver)
return self._vserver, vserver_client
@na_utils.trace
def _handle_housekeeping_tasks(self):
"""Handle various cleanup activities."""
vserver_client = self._get_api_client(vserver=self._vserver)
vserver_client.prune_deleted_nfs_export_policies()
super(NetAppCmodeSingleSVMFileStorageLibrary, self).\
_handle_housekeeping_tasks()
@na_utils.trace
def _find_matching_aggregates(self):
"""Find all aggregates match pattern."""

View File

@ -29,19 +29,19 @@ class NetAppBaseHelper(object):
self._client = client
@abc.abstractmethod
def create_share(self, share, export_addresses):
def create_share(self, share, share_name, export_addresses):
"""Creates NAS share."""
@abc.abstractmethod
def delete_share(self, share):
def delete_share(self, share, share_name):
"""Deletes NAS share."""
@abc.abstractmethod
def allow_access(self, context, share, access):
def allow_access(self, context, share, share_name, access):
"""Allows new_rules to a given NAS storage in new_rules."""
@abc.abstractmethod
def deny_access(self, context, share, access):
def deny_access(self, context, share, share_name, access):
"""Denies new_rules to a given NAS storage in new_rules."""
@abc.abstractmethod

View File

@ -33,7 +33,7 @@ class NetAppCmodeCIFSHelper(base.NetAppBaseHelper):
"""Netapp specific cluster-mode CIFS sharing driver."""
@na_utils.trace
def create_share(self, share_name, export_addresses):
def create_share(self, share, share_name, export_addresses):
"""Creates CIFS share on Data ONTAP Vserver."""
self._client.create_cifs_share(share_name)
self._client.remove_cifs_share_access(share_name, 'Everyone')
@ -41,13 +41,13 @@ class NetAppCmodeCIFSHelper(base.NetAppBaseHelper):
for export_address in export_addresses]
@na_utils.trace
def delete_share(self, share):
def delete_share(self, share, share_name):
"""Deletes CIFS share on Data ONTAP Vserver."""
host_ip, share_name = self._get_export_location(share)
self._client.remove_cifs_share(share_name)
@na_utils.trace
def allow_access(self, context, share, access):
def allow_access(self, context, share, share_name, access):
"""Allows access to the CIFS share for a given user."""
if access['access_type'] != 'user':
msg = _("Cluster Mode supports only 'user' type for share access"
@ -65,7 +65,7 @@ class NetAppCmodeCIFSHelper(base.NetAppBaseHelper):
raise e
@na_utils.trace
def deny_access(self, context, share, access):
def deny_access(self, context, share, share_name, access):
"""Denies access to the CIFS share for a given user."""
host_ip, share_name = self._get_export_location(share)
user_name = access['access_to']

View File

@ -17,7 +17,9 @@ NetApp NFS protocol helper class.
from oslo_log import log
from manila.share.drivers.netapp.dataontap.client import api as netapp_api
from manila.common import constants
from manila import exception
from manila.i18n import _
from manila.share.drivers.netapp.dataontap.protocols import base
from manila.share.drivers.netapp import utils as na_utils
@ -29,73 +31,87 @@ class NetAppCmodeNFSHelper(base.NetAppBaseHelper):
"""Netapp specific cluster-mode NFS sharing driver."""
@na_utils.trace
def create_share(self, share_name, export_addresses):
def create_share(self, share, share_name, export_addresses):
"""Creates NFS share."""
self._ensure_export_policy(share, share_name)
export_path = self._client.get_volume_junction_path(share_name)
return [':'.join([export_address, export_path])
for export_address in export_addresses]
@na_utils.trace
def delete_share(self, share):
def delete_share(self, share, share_name):
"""Deletes NFS share."""
target, export_path = self._get_export_location(share)
LOG.debug('Deleting NFS rules for share %s', share['id'])
self._client.remove_nfs_export_rules(export_path)
LOG.debug('Deleting NFS export policy for share %s', share['id'])
export_policy_name = self._get_export_policy_name(share)
self._client.clear_nfs_export_policy_for_volume(share_name)
self._client.soft_delete_nfs_export_policy(export_policy_name)
@na_utils.trace
def allow_access(self, context, share, access):
"""Allows access to a given NFS storage."""
new_rules = access['access_to']
existing_rules = self._get_existing_rules(share)
def allow_access(self, context, share, share_name, access):
"""Allows access to a given NFS share."""
if access['access_type'] != 'ip':
reason = _('Only ip access type allowed.')
raise exception.InvalidShareAccess(reason)
if not isinstance(new_rules, list):
new_rules = [new_rules]
self._ensure_export_policy(share, share_name)
export_policy_name = self._get_export_policy_name(share)
rule = access['access_to']
rules = existing_rules + new_rules
try:
self._modify_rule(share, rules)
except netapp_api.NaApiError:
self._modify_rule(share, existing_rules)
if access['access_level'] == constants.ACCESS_LEVEL_RW:
readonly = False
elif access['access_level'] == constants.ACCESS_LEVEL_RO:
readonly = True
else:
raise exception.InvalidShareAccessLevel(
level=access['access_level'])
self._client.add_nfs_export_rule(export_policy_name, rule, readonly)
@na_utils.trace
def deny_access(self, context, share, access):
"""Denies access to a given NFS storage."""
access_to = access['access_to']
existing_rules = self._get_existing_rules(share)
def deny_access(self, context, share, share_name, access):
"""Denies access to a given NFS share."""
if access['access_type'] != 'ip':
return
if not isinstance(access_to, list):
access_to = [access_to]
for deny_rule in access_to:
if deny_rule in existing_rules:
existing_rules.remove(deny_rule)
self._modify_rule(share, existing_rules)
self._ensure_export_policy(share, share_name)
export_policy_name = self._get_export_policy_name(share)
rule = access['access_to']
self._client.remove_nfs_export_rule(export_policy_name, rule)
@na_utils.trace
def get_target(self, share):
"""Returns ID of target OnTap device based on export location."""
return self._get_export_location(share)[0]
@na_utils.trace
def _modify_rule(self, share, rules):
"""Modifies access rule for a given NFS share."""
target, export_path = self._get_export_location(share)
self._client.add_nfs_export_rules(export_path, rules)
@na_utils.trace
def _get_existing_rules(self, share):
"""Returns available access rules for a given NFS share."""
target, export_path = self._get_export_location(share)
existing_rules = self._client.get_nfs_export_rules(export_path)
LOG.debug('Found existing rules %(rules)r for share %(share)s',
{'rules': existing_rules, 'share': share['id']})
return existing_rules
@staticmethod
def _get_export_location(share):
"""Returns IP address and export location of a NFS share."""
"""Returns IP address and export location of an NFS share."""
export_location = share['export_location'] or ':'
return export_location.split(':')
@staticmethod
def _get_export_policy_name(share):
"""Builds export policy name for an NFS share."""
return 'policy_' + share['id'].replace('-', '_')
@na_utils.trace
def _ensure_export_policy(self, share, share_name):
"""Ensures a flexvol/share has an export policy.
This method ensures a flexvol has an export policy with a name
containing the share ID. For legacy reasons, this may not
always be the case.
"""
expected_export_policy = self._get_export_policy_name(share)
actual_export_policy = self._client.get_nfs_export_policy_for_volume(
share_name)
if actual_export_policy == expected_export_policy:
return
elif actual_export_policy == 'default':
self._client.create_nfs_export_policy(expected_export_policy)
self._client.set_nfs_export_policy_for_volume(
share_name, expected_export_policy)
else:
self._client.rename_nfs_export_policy(actual_export_policy,
expected_export_policy)

View File

@ -25,6 +25,7 @@ CONNECTION_INFO = {
NODE_NAME = 'fake_node'
VSERVER_NAME = 'fake_vserver'
VSERVER_NAME_2 = 'fake_vserver_2'
ADMIN_VSERVER_NAME = 'fake_admin_vserver'
NODE_VSERVER_NAME = 'fake_node_vserver'
ROOT_VOLUME_AGGREGATE_NAME = 'fake_root_aggr'
@ -38,6 +39,16 @@ SNAPSHOT_NAME = 'fake_snapshot'
PARENT_SHARE_NAME = 'fake_parent_share'
PARENT_SNAPSHOT_NAME = 'fake_parent_snapshot'
MAX_FILES = 5000
EXPORT_POLICY_NAME = 'fake_export_policy'
DELETED_EXPORT_POLICIES = {
VSERVER_NAME: [
'deleted_manila_fake_policy_1',
'deleted_manila_fake_policy_2',
],
VSERVER_NAME_2: [
'deleted_manila_fake_policy_3',
],
}
USER_NAME = 'fake_user'
@ -1155,3 +1166,66 @@ GET_AGGREGATE_FOR_VOLUME_RESPONSE = etree.XML("""
'aggr': SHARE_AGGREGATE_NAME,
'share': SHARE_NAME
})
EXPORT_RULE_GET_ITER_RESPONSE = etree.XML("""
<results status="passed">
<attributes-list>
<export-rule-info>
<client-match>%(rule)s</client-match>
<policy-name>%(policy)s</policy-name>
<rule-index>3</rule-index>
<vserver-name>manila_svm</vserver-name>
</export-rule-info>
<export-rule-info>
<client-match>%(rule)s</client-match>
<policy-name>%(policy)s</policy-name>
<rule-index>1</rule-index>
<vserver-name>manila_svm</vserver-name>
</export-rule-info>
</attributes-list>
<num-records>2</num-records>
</results>
""" % {'policy': EXPORT_POLICY_NAME, 'rule': IP_ADDRESS})
VOLUME_GET_EXPORT_POLICY_RESPONSE = etree.XML("""
<results status="passed">
<attributes-list>
<volume-attributes>
<volume-export-attributes>
<policy>%(policy)s</policy>
</volume-export-attributes>
<volume-id-attributes>
<name>%(volume)s</name>
<owning-vserver-name>manila_svm</owning-vserver-name>
</volume-id-attributes>
</volume-attributes>
</attributes-list>
<num-records>1</num-records>
</results>
""" % {'policy': EXPORT_POLICY_NAME, 'volume': SHARE_NAME})
DELETED_EXPORT_POLICY_GET_ITER_RESPONSE = etree.XML("""
<results status="passed">
<attributes-list>
<export-policy-info>
<policy-name>%(policy1)s</policy-name>
<vserver>%(vserver)s</vserver>
</export-policy-info>
<export-policy-info>
<policy-name>%(policy2)s</policy-name>
<vserver>%(vserver)s</vserver>
</export-policy-info>
<export-policy-info>
<policy-name>%(policy3)s</policy-name>
<vserver>%(vserver2)s</vserver>
</export-policy-info>
</attributes-list>
<num-records>2</num-records>
</results>
""" % {
'vserver': VSERVER_NAME,
'vserver2': VSERVER_NAME_2,
'policy1': DELETED_EXPORT_POLICIES[VSERVER_NAME][0],
'policy2': DELETED_EXPORT_POLICIES[VSERVER_NAME][1],
'policy3': DELETED_EXPORT_POLICIES[VSERVER_NAME_2][0],
})

View File

@ -656,9 +656,7 @@ class NetAppClientCmodeTestCase(test.TestCase):
def test_create_vlan_api_error(self):
self.mock_object(self.client,
'send_request',
self._mock_api_error())
self.mock_object(self.client, 'send_request', self._mock_api_error())
self.assertRaises(exception.NetAppException,
self.client._create_vlan,
@ -1112,8 +1110,13 @@ class NetAppClientCmodeTestCase(test.TestCase):
export_rule_create_args = {
'client-match': '0.0.0.0/0',
'policy-name': 'default',
'ro-rule': {'security-flavor': 'any'},
'rw-rule': {'security-flavor': 'any'}}
'ro-rule': {
'security-flavor': 'any'
},
'rw-rule': {
'security-flavor': 'never'
}
}
self.client.send_request.assert_has_calls([
mock.call('nfs-enable'),
@ -1170,9 +1173,7 @@ class NetAppClientCmodeTestCase(test.TestCase):
def test_configure_active_directory_api_error(self):
self.mock_object(self.client,
'send_request',
self._mock_api_error())
self.mock_object(self.client, 'send_request', self._mock_api_error())
self.mock_object(self.client, 'configure_dns')
self.assertRaises(exception.NetAppException,
@ -1232,9 +1233,7 @@ class NetAppClientCmodeTestCase(test.TestCase):
def test_create_kerberos_realm_api_error(self):
self.mock_object(self.client,
'send_request',
self._mock_api_error())
self.mock_object(self.client, 'send_request', self._mock_api_error())
self.assertRaises(exception.NetAppException,
self.client.create_kerberos_realm,
@ -1357,9 +1356,7 @@ class NetAppClientCmodeTestCase(test.TestCase):
def test_configure_dns_api_error(self):
self.mock_object(self.client,
'send_request',
self._mock_api_error())
self.mock_object(self.client, 'send_request', self._mock_api_error())
self.assertRaises(exception.NetAppException,
self.client.configure_dns,
@ -1780,189 +1777,451 @@ class NetAppClientCmodeTestCase(test.TestCase):
self.client.send_request.assert_has_calls([
mock.call('cifs-share-delete', cifs_share_delete_args)])
def _get_add_nfs_export_rules_request(self, export_path, rules):
def test_add_nfs_export_rule(self):
return {
'rules': {
'exports-rule-info-2': {
'pathname': fake.VOLUME_JUNCTION_PATH,
'security-rules': {
'security-rule-info': {
'read-write': [
{
'exports-hostname-info': {
'name': fake.NFS_EXPORT_RULES[0],
}
},
{
'exports-hostname-info': {
'name': fake.NFS_EXPORT_RULES[1],
}
}
],
'root': [
{
'exports-hostname-info': {
'name': fake.NFS_EXPORT_RULES[0],
}
},
{
'exports-hostname-info': {
'name': fake.NFS_EXPORT_RULES[1],
}
}
]
}
}
}
}
mock_get_nfs_export_rule_indices = self.mock_object(
self.client, '_get_nfs_export_rule_indices',
mock.Mock(return_value=[]))
mock_add_nfs_export_rule = self.mock_object(
self.client, '_add_nfs_export_rule')
mock_update_nfs_export_rule = self.mock_object(
self.client, '_update_nfs_export_rule')
self.client.add_nfs_export_rule(fake.EXPORT_POLICY_NAME,
fake.IP_ADDRESS,
False)
mock_get_nfs_export_rule_indices.assert_called_once_with(
fake.EXPORT_POLICY_NAME, fake.IP_ADDRESS)
mock_add_nfs_export_rule.assert_called_once_with(
fake.EXPORT_POLICY_NAME, fake.IP_ADDRESS, False)
self.assertFalse(mock_update_nfs_export_rule.called)
def test_add_nfs_export_rule_single_existing(self):
mock_get_nfs_export_rule_indices = self.mock_object(
self.client, '_get_nfs_export_rule_indices',
mock.Mock(return_value=['1']))
mock_add_nfs_export_rule = self.mock_object(
self.client, '_add_nfs_export_rule')
mock_update_nfs_export_rule = self.mock_object(
self.client, '_update_nfs_export_rule')
mock_remove_nfs_export_rules = self.mock_object(
self.client, '_remove_nfs_export_rules')
self.client.add_nfs_export_rule(fake.EXPORT_POLICY_NAME,
fake.IP_ADDRESS,
False)
mock_get_nfs_export_rule_indices.assert_called_once_with(
fake.EXPORT_POLICY_NAME, fake.IP_ADDRESS)
self.assertFalse(mock_add_nfs_export_rule.called)
mock_update_nfs_export_rule.assert_called_once_with(
fake.EXPORT_POLICY_NAME, fake.IP_ADDRESS, False, '1')
mock_remove_nfs_export_rules.assert_called_once_with(
fake.EXPORT_POLICY_NAME, [])
def test_add_nfs_export_rule_multiple_existing(self):
mock_get_nfs_export_rule_indices = self.mock_object(
self.client, '_get_nfs_export_rule_indices',
mock.Mock(return_value=['2', '4', '6']))
mock_add_nfs_export_rule = self.mock_object(
self.client, '_add_nfs_export_rule')
mock_update_nfs_export_rule = self.mock_object(
self.client, '_update_nfs_export_rule')
mock_remove_nfs_export_rules = self.mock_object(
self.client, '_remove_nfs_export_rules')
self.client.add_nfs_export_rule(fake.EXPORT_POLICY_NAME,
fake.IP_ADDRESS,
False)
mock_get_nfs_export_rule_indices.assert_called_once_with(
fake.EXPORT_POLICY_NAME, fake.IP_ADDRESS)
self.assertFalse(mock_add_nfs_export_rule.called)
mock_update_nfs_export_rule.assert_called_once_with(
fake.EXPORT_POLICY_NAME, fake.IP_ADDRESS, False, '2')
mock_remove_nfs_export_rules.assert_called_once_with(
fake.EXPORT_POLICY_NAME, ['4', '6'])
@ddt.data({'readonly': False, 'rw_security_flavor': 'sys'},
{'readonly': True, 'rw_security_flavor': 'never'})
@ddt.unpack
def test__add_nfs_export_rule(self, readonly, rw_security_flavor):
self.mock_object(self.client, 'send_request')
self.client._add_nfs_export_rule(fake.EXPORT_POLICY_NAME,
fake.IP_ADDRESS,
readonly)
export_rule_create_args = {
'policy-name': fake.EXPORT_POLICY_NAME,
'client-match': fake.IP_ADDRESS,
'ro-rule': {
'security-flavor': 'sys',
},
'rw-rule': {
'security-flavor': rw_security_flavor,
},
'super-user-security': {
'security-flavor': 'sys',
},
}
self.client.send_request.assert_has_calls(
[mock.call('export-rule-create', export_rule_create_args)])
@ddt.data({'readonly': False, 'rw_security_flavor': 'sys', 'index': '2'},
{'readonly': True, 'rw_security_flavor': 'never', 'index': '4'})
@ddt.unpack
def test_update_nfs_export_rule(self, readonly, rw_security_flavor, index):
self.mock_object(self.client, 'send_request')
self.client._update_nfs_export_rule(fake.EXPORT_POLICY_NAME,
fake.IP_ADDRESS,
readonly,
index)
export_rule_modify_args = {
'policy-name': fake.EXPORT_POLICY_NAME,
'rule-index': index,
'client-match': fake.IP_ADDRESS,
'ro-rule': {
'security-flavor': 'sys',
},
'rw-rule': {
'security-flavor': rw_security_flavor,
},
'super-user-security': {
'security-flavor': 'sys',
},
}
def test_add_nfs_export_rules(self):
self.client.send_request.assert_has_calls(
[mock.call('export-rule-modify', export_rule_modify_args)])
self.mock_object(self.client, 'send_request')
self.client.nfs_exports_with_prefix = False
def test_get_nfs_export_rule_indices(self):
self.client.add_nfs_export_rules(fake.VOLUME_JUNCTION_PATH,
fake.NFS_EXPORT_RULES)
api_args = self._get_add_nfs_export_rules_request(
fake.VOLUME_JUNCTION_PATH, fake.NFS_EXPORT_RULES)
self.client.send_request.assert_has_calls([
mock.call('nfs-exportfs-append-rules-2', api_args)])
def test_add_nfs_export_rules_with_vol_prefix(self):
self.mock_object(self.client, 'send_request')
self.client.nfs_exports_with_prefix = True
self.client.add_nfs_export_rules(fake.VOLUME_JUNCTION_PATH,
fake.NFS_EXPORT_RULES)
api_args = self._get_add_nfs_export_rules_request(
fake.VOLUME_JUNCTION_PATH, fake.NFS_EXPORT_RULES)
api_args['rules']['exports-rule-info-2']['pathname'] = (
'/vol' + fake.VOLUME_JUNCTION_PATH)
self.client.send_request.assert_has_calls([
mock.call('nfs-exportfs-append-rules-2', api_args)])
def test_add_nfs_export_rules_retry_without_vol_prefix(self):
side_effects = [netapp_api.NaApiError(code=netapp_api.EINTERNALERROR),
None]
self.mock_object(self.client,
'send_request',
mock.Mock(side_effect=side_effects))
self.client.nfs_exports_with_prefix = True
self.client.add_nfs_export_rules(fake.VOLUME_JUNCTION_PATH,
fake.NFS_EXPORT_RULES)
args_without_prefix = self._get_add_nfs_export_rules_request(
fake.VOLUME_JUNCTION_PATH, fake.NFS_EXPORT_RULES)
args_with_prefix = self._get_add_nfs_export_rules_request(
fake.VOLUME_JUNCTION_PATH, fake.NFS_EXPORT_RULES)
args_with_prefix['rules']['exports-rule-info-2']['pathname'] = (
'/vol' + fake.VOLUME_JUNCTION_PATH)
self.client.send_request.assert_has_calls([
mock.call('nfs-exportfs-append-rules-2', args_with_prefix),
mock.call('nfs-exportfs-append-rules-2', args_without_prefix)])
self.assertEqual(1, client_cmode.LOG.warning.call_count)
# Test side effect of setting the prefix flag to false.
self.assertFalse(self.client.nfs_exports_with_prefix)
def test_add_nfs_export_rules_retry_with_vol_prefix(self):
side_effects = [netapp_api.NaApiError(code=netapp_api.EINTERNALERROR),
None]
self.mock_object(self.client,
'send_request',
mock.Mock(side_effect=side_effects))
self.client.nfs_exports_with_prefix = False
self.client.add_nfs_export_rules(fake.VOLUME_JUNCTION_PATH,
fake.NFS_EXPORT_RULES)
args_without_prefix = self._get_add_nfs_export_rules_request(
fake.VOLUME_JUNCTION_PATH, fake.NFS_EXPORT_RULES)
args_with_prefix = self._get_add_nfs_export_rules_request(
fake.VOLUME_JUNCTION_PATH, fake.NFS_EXPORT_RULES)
args_with_prefix['rules']['exports-rule-info-2']['pathname'] = (
'/vol' + fake.VOLUME_JUNCTION_PATH)
self.client.send_request.assert_has_calls([
mock.call('nfs-exportfs-append-rules-2', args_without_prefix),
mock.call('nfs-exportfs-append-rules-2', args_with_prefix)])
self.assertEqual(1, client_cmode.LOG.warning.call_count)
# Test side effect of setting the prefix flag to false.
self.assertTrue(self.client.nfs_exports_with_prefix)
def test_add_nfs_export_rules_api_error(self):
self.mock_object(self.client,
'send_request',
self._mock_api_error())
self.assertRaises(netapp_api.NaApiError,
self.client.add_nfs_export_rules,
fake.VOLUME_JUNCTION_PATH,
fake.NFS_EXPORT_RULES)
def test_get_nfs_export_rules(self):
api_response = netapp_api.NaElement(
fake.NFS_EXPORTFS_LIST_RULES_2_RESPONSE)
api_response = netapp_api.NaElement(fake.EXPORT_RULE_GET_ITER_RESPONSE)
self.mock_object(self.client,
'send_request',
mock.Mock(return_value=api_response))
result = self.client.get_nfs_export_rules(fake.VOLUME_JUNCTION_PATH)
result = self.client._get_nfs_export_rule_indices(
fake.EXPORT_POLICY_NAME, fake.IP_ADDRESS)
nfs_exportfs_list_rules_2_args = {
'pathname': fake.VOLUME_JUNCTION_PATH
export_rule_get_iter_args = {
'query': {
'export-rule-info': {
'policy-name': fake.EXPORT_POLICY_NAME,
'client-match': fake.IP_ADDRESS,
},
},
'desired-attributes': {
'export-rule-info': {
'vserver-name': None,
'policy-name': None,
'client-match': None,
'rule-index': None,
},
},
}
self.assertListEqual(['1', '3'], result)
self.client.send_request.assert_has_calls([
mock.call('nfs-exportfs-list-rules-2',
nfs_exportfs_list_rules_2_args)])
self.assertSequenceEqual(fake.NFS_EXPORT_RULES, result)
mock.call('export-rule-get-iter', export_rule_get_iter_args)])
def test_get_nfs_export_rules_not_found(self):
def test_remove_nfs_export_rule(self):
api_response = netapp_api.NaElement(
fake.NFS_EXPORTFS_LIST_RULES_2_NO_RULES_RESPONSE)
self.mock_object(self.client,
'send_request',
mock.Mock(return_value=api_response))
fake_indices = ['1', '3', '4']
mock_get_nfs_export_rule_indices = self.mock_object(
self.client, '_get_nfs_export_rule_indices',
mock.Mock(return_value=fake_indices))
mock_remove_nfs_export_rules = self.mock_object(
self.client, '_remove_nfs_export_rules')
result = self.client.get_nfs_export_rules(fake.VOLUME_JUNCTION_PATH)
self.client.remove_nfs_export_rule(fake.EXPORT_POLICY_NAME,
fake.IP_ADDRESS)
self.assertListEqual([], result)
mock_get_nfs_export_rule_indices.assert_called_once_with(
fake.EXPORT_POLICY_NAME, fake.IP_ADDRESS)
mock_remove_nfs_export_rules.assert_called_once_with(
fake.EXPORT_POLICY_NAME, fake_indices)
def test_remove_nfs_export_rules(self):
fake_indices = ['1', '3']
self.mock_object(self.client, 'send_request')
self.client.remove_nfs_export_rules(fake.VOLUME_JUNCTION_PATH)
nfs_exportfs_delete_rules_args = {
'pathnames': {
'pathname-info': {
'name': fake.VOLUME_JUNCTION_PATH,
}
}
}
self.client._remove_nfs_export_rules(fake.EXPORT_POLICY_NAME,
fake_indices)
self.client.send_request.assert_has_calls([
mock.call('nfs-exportfs-delete-rules',
nfs_exportfs_delete_rules_args)])
mock.call(
'export-rule-destroy',
{'policy-name': fake.EXPORT_POLICY_NAME, 'rule-index': '1'}),
mock.call(
'export-rule-destroy',
{'policy-name': fake.EXPORT_POLICY_NAME, 'rule-index': '3'})])
def test_remove_nfs_export_rules_not_found(self):
self.mock_object(self.client,
'send_request',
self._mock_api_error(code=netapp_api.EOBJECTNOTFOUND))
self.client._remove_nfs_export_rules(fake.EXPORT_POLICY_NAME, ['1'])
self.client.send_request.assert_has_calls([
mock.call(
'export-rule-destroy',
{'policy-name': fake.EXPORT_POLICY_NAME, 'rule-index': '1'})])
def test_remove_nfs_export_rules_api_error(self):
self.mock_object(self.client, 'send_request', self._mock_api_error())
self.assertRaises(netapp_api.NaApiError,
self.client._remove_nfs_export_rules,
fake.EXPORT_POLICY_NAME,
['1'])
def test_clear_nfs_export_policy_for_volume(self):
mock_set_nfs_export_policy_for_volume = self.mock_object(
self.client, 'set_nfs_export_policy_for_volume')
self.client.clear_nfs_export_policy_for_volume(fake.SHARE_NAME)
mock_set_nfs_export_policy_for_volume.assert_called_once_with(
fake.SHARE_NAME, 'default')
def test_set_nfs_export_policy_for_volume(self):
self.mock_object(self.client, 'send_request')
self.client.set_nfs_export_policy_for_volume(fake.SHARE_NAME,
fake.EXPORT_POLICY_NAME)
volume_modify_iter_args = {
'query': {
'volume-attributes': {
'volume-id-attributes': {
'name': fake.SHARE_NAME,
},
},
},
'attributes': {
'volume-attributes': {
'volume-export-attributes': {
'policy': fake.EXPORT_POLICY_NAME,
},
},
},
}
self.client.send_request.assert_has_calls([
mock.call('volume-modify-iter', volume_modify_iter_args)])
def test_get_nfs_export_policy_for_volume(self):
api_response = netapp_api.NaElement(
fake.VOLUME_GET_EXPORT_POLICY_RESPONSE)
self.mock_object(self.client,
'send_request',
mock.Mock(return_value=api_response))
result = self.client.get_nfs_export_policy_for_volume(fake.SHARE_NAME)
volume_get_iter_args = {
'query': {
'volume-attributes': {
'volume-id-attributes': {
'name': fake.SHARE_NAME,
},
},
},
'desired-attributes': {
'volume-attributes': {
'volume-export-attributes': {
'policy': None,
},
},
},
}
self.assertEqual(fake.EXPORT_POLICY_NAME, result)
self.client.send_request.assert_has_calls([
mock.call('volume-get-iter', volume_get_iter_args)])
def test_get_nfs_export_policy_for_volume_not_found(self):
api_response = netapp_api.NaElement(fake.NO_RECORDS_RESPONSE)
self.mock_object(self.client,
'send_request',
mock.Mock(return_value=api_response))
self.assertRaises(exception.NetAppException,
self.client.get_nfs_export_policy_for_volume,
fake.SHARE_NAME)
def test_create_nfs_export_policy(self):
self.mock_object(self.client, 'send_request')
self.client.create_nfs_export_policy(fake.EXPORT_POLICY_NAME)
export_policy_create_args = {'policy-name': fake.EXPORT_POLICY_NAME}
self.client.send_request.assert_has_calls([
mock.call('export-policy-create', export_policy_create_args)])
def test_create_nfs_export_policy_already_present(self):
self.mock_object(self.client,
'send_request',
self._mock_api_error(code=netapp_api.EDUPLICATEENTRY))
self.client.create_nfs_export_policy(fake.EXPORT_POLICY_NAME)
export_policy_create_args = {'policy-name': fake.EXPORT_POLICY_NAME}
self.client.send_request.assert_has_calls([
mock.call('export-policy-create', export_policy_create_args)])
def test_create_nfs_export_policy_api_error(self):
self.mock_object(self.client, 'send_request', self._mock_api_error())
self.assertRaises(netapp_api.NaApiError,
self.client.create_nfs_export_policy,
fake.EXPORT_POLICY_NAME)
def test_soft_delete_nfs_export_policy(self):
self.mock_object(self.client, 'delete_nfs_export_policy')
self.mock_object(self.client, 'rename_nfs_export_policy')
self.client.soft_delete_nfs_export_policy(fake.EXPORT_POLICY_NAME)
self.client.delete_nfs_export_policy.assert_has_calls([
mock.call(fake.EXPORT_POLICY_NAME)])
self.assertFalse(self.client.rename_nfs_export_policy.called)
def test_soft_delete_nfs_export_policy_api_error(self):
self.mock_object(self.client,
'delete_nfs_export_policy',
self._mock_api_error())
self.mock_object(self.client, 'rename_nfs_export_policy')
self.client.soft_delete_nfs_export_policy(fake.EXPORT_POLICY_NAME)
self.client.delete_nfs_export_policy.assert_has_calls([
mock.call(fake.EXPORT_POLICY_NAME)])
self.assertTrue(self.client.rename_nfs_export_policy.called)
def test_delete_nfs_export_policy(self):
self.mock_object(self.client, 'send_request')
self.client.delete_nfs_export_policy(fake.EXPORT_POLICY_NAME)
export_policy_destroy_args = {'policy-name': fake.EXPORT_POLICY_NAME}
self.client.send_request.assert_has_calls([
mock.call('export-policy-destroy', export_policy_destroy_args)])
def test_delete_nfs_export_policy_not_found(self):
self.mock_object(self.client,
'send_request',
self._mock_api_error(code=netapp_api.EOBJECTNOTFOUND))
self.client.delete_nfs_export_policy(fake.EXPORT_POLICY_NAME)
export_policy_destroy_args = {'policy-name': fake.EXPORT_POLICY_NAME}
self.client.send_request.assert_has_calls([
mock.call('export-policy-destroy', export_policy_destroy_args)])
def test_delete_nfs_export_policy_api_error(self):
self.mock_object(self.client, 'send_request', self._mock_api_error())
self.assertRaises(netapp_api.NaApiError,
self.client.delete_nfs_export_policy,
fake.EXPORT_POLICY_NAME)
def test_rename_nfs_export_policy(self):
self.mock_object(self.client, 'send_request')
self.client.rename_nfs_export_policy(fake.EXPORT_POLICY_NAME,
'new_policy_name')
export_policy_rename_args = {
'policy-name': fake.EXPORT_POLICY_NAME,
'new-policy-name': 'new_policy_name'
}
self.client.send_request.assert_has_calls([
mock.call('export-policy-rename', export_policy_rename_args)])
def test_prune_deleted_nfs_export_policies(self):
# Mock client lest we not be able to see calls on its copy.
self.mock_object(copy,
'deepcopy',
mock.Mock(return_value=self.client))
self.mock_object(self.client,
'_get_deleted_nfs_export_policies',
mock.Mock(return_value=fake.DELETED_EXPORT_POLICIES))
self.mock_object(self.client, 'delete_nfs_export_policy')
self.client.prune_deleted_nfs_export_policies()
self.assertTrue(self.client.delete_nfs_export_policy.called)
self.client.delete_nfs_export_policy.assert_has_calls(
[mock.call(policy) for policy in
fake.DELETED_EXPORT_POLICIES[fake.VSERVER_NAME]])
def test_prune_deleted_nfs_export_policies_api_error(self):
self.mock_object(copy,
'deepcopy',
mock.Mock(return_value=self.client))
self.mock_object(self.client,
'_get_deleted_nfs_export_policies',
mock.Mock(return_value=fake.DELETED_EXPORT_POLICIES))
self.mock_object(self.client,
'delete_nfs_export_policy',
self._mock_api_error())
self.client.prune_deleted_nfs_export_policies()
self.assertTrue(self.client.delete_nfs_export_policy.called)
self.client.delete_nfs_export_policy.assert_has_calls(
[mock.call(policy) for policy in
fake.DELETED_EXPORT_POLICIES[fake.VSERVER_NAME]])
def test_get_deleted_nfs_export_policies(self):
api_response = netapp_api.NaElement(
fake.DELETED_EXPORT_POLICY_GET_ITER_RESPONSE)
self.mock_object(self.client,
'send_request',
mock.Mock(return_value=api_response))
result = self.client._get_deleted_nfs_export_policies()
export_policy_get_iter_args = {
'query': {
'export-policy-info': {
'policy-name': 'deleted_manila_*',
},
},
'desired-attributes': {
'export-policy-info': {
'policy-name': None,
'vserver': None,
},
},
}
self.assertSequenceEqual(fake.DELETED_EXPORT_POLICIES, result)
self.client.send_request.assert_has_calls([
mock.call('export-policy-get-iter', export_policy_get_iter_args)])
def test_get_ems_log_destination_vserver(self):
@ -2070,9 +2329,7 @@ class NetAppClientCmodeTestCase(test.TestCase):
self.mock_object(self.client,
'_get_ems_log_destination_vserver',
mock.Mock(return_value=fake.ADMIN_VSERVER_NAME))
self.mock_object(self.client,
'send_request',
self._mock_api_error())
self.mock_object(self.client, 'send_request', self._mock_api_error())
self.client.send_ems_log_message(fake.EMS_MESSAGE)
@ -2189,9 +2446,7 @@ class NetAppClientCmodeTestCase(test.TestCase):
def test_check_for_cluster_credentials_api_error(self):
self.mock_object(self.client,
'send_request',
mock.Mock(side_effect=self._mock_api_error()))
self.mock_object(self.client, 'send_request', self._mock_api_error())
self.assertRaises(netapp_api.NaApiError,
self.client.check_for_cluster_credentials)

View File

@ -207,22 +207,30 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
'_update_ssc_info')
mock_handle_ems_logging = self.mock_object(self.library,
'_handle_ems_logging')
mock_handle_housekeeping_tasks = self.mock_object(
self.library, '_handle_housekeeping_tasks')
mock_ssc_periodic_task = mock.Mock()
mock_ems_periodic_task = mock.Mock()
mock_housekeeping_periodic_task = mock.Mock()
mock_loopingcall = self.mock_object(
loopingcall,
'FixedIntervalLoopingCall',
mock.Mock(side_effect=[mock_ssc_periodic_task,
mock_ems_periodic_task]))
mock_ems_periodic_task,
mock_housekeeping_periodic_task]))
self.library._start_periodic_tasks()
self.assertTrue(mock_update_ssc_info.called)
self.assertFalse(mock_handle_ems_logging.called)
mock_loopingcall.assert_has_calls([mock.call(mock_update_ssc_info),
mock.call(mock_handle_ems_logging)])
self.assertFalse(mock_housekeeping_periodic_task.called)
mock_loopingcall.assert_has_calls(
[mock.call(mock_update_ssc_info),
mock.call(mock_handle_ems_logging),
mock.call(mock_handle_housekeeping_tasks)])
self.assertTrue(mock_ssc_periodic_task.start.called)
self.assertTrue(mock_ems_periodic_task.start.called)
self.assertTrue(mock_housekeeping_periodic_task.start.called)
def test_get_valid_share_name(self):
@ -803,10 +811,9 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
fake.VSERVER1,
vserver_client)
share_name = self.library._get_valid_share_name(fake.SHARE['id'])
self.assertEqual(fake.NFS_EXPORTS, result)
protocol_helper.create_share.assert_called_once_with(
share_name, fake.LIF_ADDRESSES)
fake.SHARE, fake.SHARE_NAME, fake.LIF_ADDRESSES)
def test_create_export_lifs_not_found(self):
@ -833,7 +840,8 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
protocol_helper.set_client.assert_called_once_with(vserver_client)
protocol_helper.get_target.assert_called_once_with(fake.SHARE)
protocol_helper.delete_share.assert_called_once_with(fake.SHARE)
protocol_helper.delete_share.assert_called_once_with(fake.SHARE,
fake.SHARE_NAME)
def test_remove_export_target_not_found(self):
@ -976,6 +984,7 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
protocol_helper.allow_access.assert_called_once_with(
self.context,
fake.SHARE,
fake.SHARE_NAME,
fake.SHARE_ACCESS)
def test_deny_access(self):
@ -1000,6 +1009,7 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
protocol_helper.deny_access.assert_called_once_with(
self.context,
fake.SHARE,
fake.SHARE_NAME,
fake.SHARE_ACCESS)
def test_setup_server(self):

View File

@ -184,6 +184,17 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
self.assertTupleEqual((fake.VSERVER1, 'fake_client'), result)
def test_handle_housekeeping_tasks(self):
self.mock_object(self.client, 'prune_deleted_nfs_export_policies')
mock_super = self.mock_object(lib_base.NetAppCmodeFileStorageLibrary,
'_handle_housekeeping_tasks')
self.library._handle_housekeeping_tasks()
self.assertTrue(self.client.prune_deleted_nfs_export_policies.called)
self.assertTrue(mock_super.called)
def test_find_matching_aggregates(self):
self.mock_object(self.client,

View File

@ -148,6 +148,21 @@ class NetAppFileStorageLibraryTestCase(test.TestCase):
self.assertRaises(exception.VserverUnavailable,
self.library._get_vserver)
def test_handle_housekeeping_tasks(self):
mock_vserver_client = mock.Mock()
self.mock_object(self.library,
'_get_api_client',
mock.Mock(return_value=mock_vserver_client))
mock_super = self.mock_object(lib_base.NetAppCmodeFileStorageLibrary,
'_handle_housekeeping_tasks')
self.library._handle_housekeeping_tasks()
self.assertTrue(
mock_vserver_client.prune_deleted_nfs_export_policies.called)
self.assertTrue(mock_super.called)
def test_find_matching_aggregates(self):
mock_vserver_client = mock.Mock()

View File

@ -30,7 +30,7 @@ LICENSES = ('base', 'cifs', 'fcp', 'flexclone', 'iscsi', 'nfs', 'snapmirror',
VOLUME_NAME_TEMPLATE = 'share_%(share_id)s'
VSERVER_NAME_TEMPLATE = 'os_%s'
AGGREGATE_NAME_SEARCH_PATTERN = '(.*)'
SHARE_NAME = 'fake_share'
SHARE_NAME = 'share_7cf7c200_d3af_4e05_b87e_9167c95dfcad'
SNAPSHOT_NAME = 'fake_snapshot'
SHARE_SIZE = 10
TENANT_ID = '24cb2448-13d8-4f41-afd9-eff5c4fd2a57'

View File

@ -12,9 +12,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from manila.common import constants
SHARE_NAME = 'fake_share'
SHARE_ID = '9dba208c-9aa7-11e4-89d3-123b93f75cba'
EXPORT_POLICY_NAME = 'policy_9dba208c_9aa7_11e4_89d3_123b93f75cba'
SHARE_ADDRESS_1 = '10.10.10.10'
SHARE_ADDRESS_2 = '10.10.10.20'
CLIENT_ADDRESS_1 = '20.20.20.10'
@ -31,9 +34,14 @@ NFS_SHARE = {
'id': SHARE_ID
}
NFS_ACCESS_HOSTS = [CLIENT_ADDRESS_1]
ACCESS = {
'access_type': 'user',
'access_to': NFS_ACCESS_HOSTS
IP_ACCESS = {
'access_type': 'ip',
'access_to': CLIENT_ADDRESS_1,
'access_level': constants.ACCESS_LEVEL_RW,
}
USER_ACCESS = {
'access_type': 'user',
'access_to': 'fake_user',
'access_level': constants.ACCESS_LEVEL_RW,
}

View File

@ -47,7 +47,8 @@ class NetAppClusteredCIFSHelperTestCase(test.TestCase):
def test_create_share(self):
result = self.helper.create_share(fake.SHARE_NAME,
result = self.helper.create_share(fake.CIFS_SHARE,
fake.SHARE_NAME,
[fake.SHARE_ADDRESS_1])
expected = [r'\\%s\%s' % (fake.SHARE_ADDRESS_1, fake.SHARE_NAME)]
@ -59,7 +60,8 @@ class NetAppClusteredCIFSHelperTestCase(test.TestCase):
def test_create_share_multiple(self):
result = self.helper.create_share(fake.SHARE_NAME,
result = self.helper.create_share(fake.CIFS_SHARE,
fake.SHARE_NAME,
[fake.SHARE_ADDRESS_1,
fake.SHARE_ADDRESS_2])
@ -73,18 +75,20 @@ class NetAppClusteredCIFSHelperTestCase(test.TestCase):
def test_delete_share(self):
self.helper.delete_share(fake.CIFS_SHARE)
self.helper.delete_share(fake.CIFS_SHARE, fake.SHARE_NAME)
self.mock_client.remove_cifs_share.assert_called_once_with(
fake.SHARE_NAME)
def test_allow_access(self):
self.helper.allow_access(self.mock_context, fake.CIFS_SHARE,
fake.ACCESS)
self.helper.allow_access(self.mock_context,
fake.CIFS_SHARE,
fake.SHARE_NAME,
fake.USER_ACCESS)
self.mock_client.add_cifs_share_access.assert_called_once_with(
fake.SHARE_NAME, fake.ACCESS['access_to'])
fake.SHARE_NAME, fake.USER_ACCESS['access_to'])
def test_allow_access_preexisting(self):
@ -95,7 +99,8 @@ class NetAppClusteredCIFSHelperTestCase(test.TestCase):
self.helper.allow_access,
self.mock_context,
fake.CIFS_SHARE,
fake.ACCESS)
fake.SHARE_NAME,
fake.USER_ACCESS)
def test_allow_access_api_error(self):
@ -106,36 +111,42 @@ class NetAppClusteredCIFSHelperTestCase(test.TestCase):
self.helper.allow_access,
self.mock_context,
fake.CIFS_SHARE,
fake.ACCESS)
fake.SHARE_NAME,
fake.USER_ACCESS)
def test_allow_access_invalid_type(self):
fake_access = fake.ACCESS.copy()
fake_access = fake.USER_ACCESS.copy()
fake_access['access_type'] = 'group'
self.assertRaises(exception.NetAppException,
self.helper.allow_access,
self.mock_context,
fake.CIFS_SHARE,
fake.SHARE_NAME,
fake_access)
def test_deny_access(self):
self.helper.deny_access(self.mock_context, fake.CIFS_SHARE,
fake.ACCESS)
self.helper.deny_access(self.mock_context,
fake.CIFS_SHARE,
fake.SHARE_NAME,
fake.USER_ACCESS)
self.mock_client.remove_cifs_share_access.assert_called_once_with(
fake.SHARE_NAME, fake.ACCESS['access_to'])
fake.SHARE_NAME, fake.USER_ACCESS['access_to'])
def test_deny_access_nonexistent_user(self):
self.mock_client.remove_cifs_share_access.side_effect = (
netapp_api.NaApiError(code=netapp_api.EONTAPI_EINVAL))
self.helper.deny_access(self.mock_context, fake.CIFS_SHARE,
fake.ACCESS)
self.helper.deny_access(self.mock_context,
fake.CIFS_SHARE,
fake.SHARE_NAME,
fake.USER_ACCESS)
self.mock_client.remove_cifs_share_access.assert_called_once_with(
fake.SHARE_NAME, fake.ACCESS['access_to'])
fake.SHARE_NAME, fake.USER_ACCESS['access_to'])
self.assertEqual(1, cifs_cmode.LOG.error.call_count)
def test_deny_access_nonexistent_rule(self):
@ -143,11 +154,13 @@ class NetAppClusteredCIFSHelperTestCase(test.TestCase):
self.mock_client.remove_cifs_share_access.side_effect = (
netapp_api.NaApiError(code=netapp_api.EOBJECTNOTFOUND))
self.helper.deny_access(self.mock_context, fake.CIFS_SHARE,
fake.ACCESS)
self.helper.deny_access(self.mock_context,
fake.CIFS_SHARE,
fake.SHARE_NAME,
fake.USER_ACCESS)
self.mock_client.remove_cifs_share_access.assert_called_once_with(
fake.SHARE_NAME, fake.ACCESS['access_to'])
fake.SHARE_NAME, fake.USER_ACCESS['access_to'])
self.assertEqual(1, cifs_cmode.LOG.error.call_count)
def test_deny_access_api_error(self):
@ -159,7 +172,8 @@ class NetAppClusteredCIFSHelperTestCase(test.TestCase):
self.helper.deny_access,
self.mock_context,
fake.CIFS_SHARE,
fake.ACCESS)
fake.SHARE_NAME,
fake.USER_ACCESS)
def test_get_target(self):

View File

@ -19,7 +19,8 @@ import copy
import mock
from manila.share.drivers.netapp.dataontap.client import api as netapp_api
from manila.common import constants
from manila import exception
from manila.share.drivers.netapp.dataontap.protocols import nfs_cmode
from manila import test
from manila.tests.share.drivers.netapp.dataontap.protocols \
@ -38,101 +39,129 @@ class NetAppClusteredNFSHelperTestCase(test.TestCase):
def test_create_share(self):
mock_ensure_export_policy = self.mock_object(self.helper,
'_ensure_export_policy')
self.mock_client.get_volume_junction_path.return_value = (
fake.NFS_SHARE_PATH)
result = self.helper.create_share(fake.SHARE_NAME,
result = self.helper.create_share(fake.NFS_SHARE,
fake.SHARE_NAME,
[fake.SHARE_ADDRESS_1])
expected = [':'.join([fake.SHARE_ADDRESS_1, fake.NFS_SHARE_PATH])]
self.assertEqual(expected, result)
self.assertTrue(mock_ensure_export_policy.called)
def test_create_share_multiple(self):
mock_ensure_export_policy = self.mock_object(self.helper,
'_ensure_export_policy')
self.mock_client.get_volume_junction_path.return_value = (
fake.NFS_SHARE_PATH)
result = self.helper.create_share(fake.SHARE_NAME,
result = self.helper.create_share(fake.NFS_SHARE,
fake.SHARE_NAME,
[fake.SHARE_ADDRESS_1,
fake.SHARE_ADDRESS_2])
expected = [':'.join([fake.SHARE_ADDRESS_1, fake.NFS_SHARE_PATH]),
':'.join([fake.SHARE_ADDRESS_2, fake.NFS_SHARE_PATH])]
self.assertEqual(expected, result)
self.assertTrue(mock_ensure_export_policy.called)
def test_delete_share(self):
self.helper.delete_share(fake.NFS_SHARE)
self.helper.delete_share(fake.NFS_SHARE, fake.SHARE_NAME)
self.mock_client.remove_nfs_export_rules.assert_called_once_with(
fake.NFS_SHARE_PATH)
self.mock_client.clear_nfs_export_policy_for_volume.\
assert_called_once_with(fake.SHARE_NAME)
self.mock_client.soft_delete_nfs_export_policy.assert_called_once_with(
fake.EXPORT_POLICY_NAME)
def test_allow_access(self):
mock_modify_rule = self.mock_object(self.helper, '_modify_rule')
self.mock_client.get_nfs_export_rules.return_value = ['localhost']
mock_ensure_export_policy = self.mock_object(self.helper,
'_ensure_export_policy')
self.helper.allow_access(
self.mock_context, fake.NFS_SHARE, fake.ACCESS)
self.helper.allow_access(self.mock_context,
fake.NFS_SHARE,
fake.SHARE_NAME,
fake.IP_ACCESS)
mock_modify_rule.assert_called_once_with(
fake.NFS_SHARE, ['localhost'] + fake.ACCESS['access_to'])
self.assertTrue(mock_ensure_export_policy.called)
self.mock_client.add_nfs_export_rule.assert_called_once_with(
fake.EXPORT_POLICY_NAME, fake.CLIENT_ADDRESS_1, False)
def test_allow_access_single_host(self):
def test_allow_access_readonly(self):
mock_modify_rule = self.mock_object(self.helper, '_modify_rule')
self.mock_client.get_nfs_export_rules.return_value = ['localhost']
fake_access = copy.deepcopy(fake.ACCESS)
fake_access['access_to'] = fake.CLIENT_ADDRESS_1
ip_access = copy.deepcopy(fake.IP_ACCESS)
ip_access['access_level'] = constants.ACCESS_LEVEL_RO
self.helper.allow_access(
self.mock_context, fake.NFS_SHARE, fake_access)
mock_ensure_export_policy = self.mock_object(self.helper,
'_ensure_export_policy')
mock_modify_rule.assert_called_once_with(
fake.NFS_SHARE, ['localhost'] + fake.ACCESS['access_to'])
self.helper.allow_access(self.mock_context,
fake.NFS_SHARE,
fake.SHARE_NAME,
ip_access)
def test_allow_access_api_error(self):
self.assertTrue(mock_ensure_export_policy.called)
self.mock_client.add_nfs_export_rule.assert_called_once_with(
fake.EXPORT_POLICY_NAME, fake.CLIENT_ADDRESS_1, True)
mock_modify_rule = self.mock_object(self.helper, '_modify_rule')
mock_modify_rule.side_effect = [netapp_api.NaApiError, None]
self.mock_client.get_nfs_export_rules.return_value = ['localhost']
def test_allow_access_invalid_level(self):
self.helper.allow_access(
self.mock_context, fake.NFS_SHARE, fake.ACCESS)
ip_access = copy.deepcopy(fake.IP_ACCESS)
ip_access['access_level'] = 'fake_level'
mock_modify_rule.assert_has_calls([
mock.call(
fake.NFS_SHARE, ['localhost'] + fake.ACCESS['access_to']),
mock.call(fake.NFS_SHARE, ['localhost'])
])
self.assertRaises(exception.InvalidShareAccessLevel,
self.helper.allow_access,
self.mock_context,
fake.NFS_SHARE,
fake.SHARE_NAME,
ip_access)
def test_allow_access_invalid_type(self):
ip_access = copy.deepcopy(fake.IP_ACCESS)
ip_access['access_type'] = 'user'
self.assertRaises(exception.InvalidShareAccess,
self.helper.allow_access,
self.mock_context,
fake.NFS_SHARE,
fake.SHARE_NAME,
ip_access)
def test_deny_access(self):
mock_modify_rule = self.mock_object(self.helper, '_modify_rule')
existing_hosts = [fake.CLIENT_ADDRESS_1, fake.CLIENT_ADDRESS_2]
self.mock_client.get_nfs_export_rules.return_value = existing_hosts
mock_ensure_export_policy = self.mock_object(self.helper,
'_ensure_export_policy')
fake_access = fake.ACCESS.copy()
fake_access['access_to'] = [fake.CLIENT_ADDRESS_2]
self.helper.deny_access(
self.mock_context, fake.NFS_SHARE, fake_access)
self.helper.deny_access(self.mock_context,
fake.NFS_SHARE,
fake.SHARE_NAME,
fake.IP_ACCESS)
mock_modify_rule.assert_called_once_with(
fake.NFS_SHARE, [fake.CLIENT_ADDRESS_1])
self.assertTrue(mock_ensure_export_policy.called)
self.mock_client.remove_nfs_export_rule.assert_called_once_with(
fake.EXPORT_POLICY_NAME, fake.CLIENT_ADDRESS_1)
def test_deny_access_single_host(self):
def test_deny_access_invalid_type(self):
mock_modify_rule = self.mock_object(self.helper, '_modify_rule')
existing_hosts = [fake.CLIENT_ADDRESS_1, fake.CLIENT_ADDRESS_2]
self.mock_client.get_nfs_export_rules.return_value = existing_hosts
ip_access = copy.deepcopy(fake.IP_ACCESS)
ip_access['access_type'] = 'user'
fake_access = fake.ACCESS.copy()
fake_access['access_to'] = fake.CLIENT_ADDRESS_2
self.helper.deny_access(
self.mock_context, fake.NFS_SHARE, fake_access)
mock_ensure_export_policy = self.mock_object(self.helper,
'_ensure_export_policy')
mock_modify_rule.assert_called_once_with(
fake.NFS_SHARE, [fake.CLIENT_ADDRESS_1])
self.helper.deny_access(self.mock_context,
fake.NFS_SHARE,
fake.SHARE_NAME,
ip_access)
self.assertFalse(mock_ensure_export_policy.called)
self.assertFalse(self.mock_client.remove_nfs_export_rule.called)
def test_get_target(self):
@ -144,26 +173,6 @@ class NetAppClusteredNFSHelperTestCase(test.TestCase):
target = self.helper.get_target({'export_location': ''})
self.assertEqual('', target)
def test_modify_rule(self):
access_rules = [fake.CLIENT_ADDRESS_1, fake.CLIENT_ADDRESS_2]
self.helper._modify_rule(fake.NFS_SHARE, access_rules)
self.mock_client.add_nfs_export_rules.assert_called_once_with(
fake.NFS_SHARE_PATH, access_rules)
def test_get_existing_rules(self):
self.mock_client.get_nfs_export_rules.return_value = (
fake.NFS_ACCESS_HOSTS)
result = self.helper._get_existing_rules(fake.NFS_SHARE)
self.mock_client.get_nfs_export_rules.assert_called_once_with(
fake.NFS_SHARE_PATH)
self.assertEqual(fake.NFS_ACCESS_HOSTS, result)
def test_get_export_location(self):
host_ip, export_path = self.helper._get_export_location(
@ -179,4 +188,42 @@ class NetAppClusteredNFSHelperTestCase(test.TestCase):
host_ip, export_path = self.helper._get_export_location(fake_share)
self.assertEqual('', host_ip)
self.assertEqual('', export_path)
self.assertEqual('', export_path)
def test_get_export_policy_name(self):
result = self.helper._get_export_policy_name(fake.NFS_SHARE)
self.assertEqual(fake.EXPORT_POLICY_NAME, result)
def test_ensure_export_policy_equal(self):
self.mock_client.get_nfs_export_policy_for_volume.return_value = (
fake.EXPORT_POLICY_NAME)
self.helper._ensure_export_policy(fake.NFS_SHARE, fake.SHARE_NAME)
self.assertFalse(self.mock_client.create_nfs_export_policy.called)
self.assertFalse(self.mock_client.rename_nfs_export_policy.called)
def test_ensure_export_policy_default(self):
self.mock_client.get_nfs_export_policy_for_volume.return_value = (
'default')
self.helper._ensure_export_policy(fake.NFS_SHARE, fake.SHARE_NAME)
self.mock_client.create_nfs_export_policy.assert_called_once_with(
fake.EXPORT_POLICY_NAME)
self.mock_client.set_nfs_export_policy_for_volume.\
assert_called_once_with(fake.SHARE_NAME, fake.EXPORT_POLICY_NAME)
self.assertFalse(self.mock_client.rename_nfs_export_policy.called)
def test_ensure_export_policy_rename(self):
self.mock_client.get_nfs_export_policy_for_volume.return_value = 'fake'
self.helper._ensure_export_policy(fake.NFS_SHARE, fake.SHARE_NAME)
self.assertFalse(self.mock_client.create_nfs_export_policy.called)
self.mock_client.rename_nfs_export_policy.assert_called_once_with(
'fake', fake.EXPORT_POLICY_NAME)