Merge "Add support for CIFS shares in HNAS driver"

This commit is contained in:
Jenkins 2016-08-10 03:30:51 +00:00 committed by Gerrit Code Review
commit bbe36a9ec1
6 changed files with 1030 additions and 239 deletions

View File

@ -98,7 +98,7 @@ Mapping of share drivers and share access rules support
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
| HDFS | \- | HDFS(K) | \- | \- | \- | HDFS(K) | \- | \- |
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
| Hitachi HNAS | NFS (L) | \- | \- | \- | NFS (L) | \- | \- | \- |
| Hitachi HNAS | NFS (L) | CIFS (N) | \- | \- | NFS (L) | CIFS (N) | \- | \- |
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
| HPE 3PAR | NFS,CIFS (K) | CIFS (K) | \- | \- | \- | \- | \- | \- |
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+

View File

@ -61,6 +61,13 @@ hds_hnas_opts = [
cfg.StrOpt('hds_hnas_driver_helper',
default='manila.share.drivers.hitachi.ssh.HNASSSHBackend',
help="Python class to be used for driver helper."),
cfg.BoolOpt('hds_hnas_allow_cifs_snapshot_while_mounted',
default=False,
help="By default, CIFS snapshots are not allowed to be taken "
"when the share has clients connected because consistent "
"point-in-time replica cannot be guaranteed for all "
"files. Enabling this might cause inconsistent snapshots "
"on CIFS shares."),
]
CONF = cfg.CONF
@ -72,6 +79,7 @@ class HDSHNASDriver(driver.ShareDriver):
1.0.0 - Initial Version.
2.0.0 - Refactoring, bugfixes, implemented Share Shrink and Update Access.
3.0.0 - Implemented support for CIFS protocol.
"""
def __init__(self, *args, **kwargs):
@ -92,6 +100,8 @@ class HDSHNASDriver(driver.ShareDriver):
hnas_evs_id = self.configuration.safe_get('hds_hnas_evs_id')
self.hnas_evs_ip = self.configuration.safe_get('hds_hnas_evs_ip')
self.fs_name = self.configuration.safe_get('hds_hnas_file_system_name')
self.cifs_snapshot = self.configuration.safe_get(
'hds_hnas_allow_cifs_snapshot_while_mounted')
ssh_private_key = self.configuration.safe_get(
'hds_hnas_ssh_private_key')
cluster_admin_ip0 = self.configuration.safe_get(
@ -141,14 +151,11 @@ class HDSHNASDriver(driver.ShareDriver):
:param context: The `context.RequestContext` object for the request
:param share: Share that will have its access rules updated.
:param access_rules: All access rules for given share. This list
is enough to update the access rules for given share.
:param access_rules: All access rules for given share.
:param add_rules: Empty List or List of access rules which should be
added. access_rules already contains these rules. Not used by this
driver.
added. access_rules already contains these rules.
:param delete_rules: Empty List or List of access rules which should be
removed. access_rules doesn't contain these rules. Not used by
this driver.
removed. access_rules doesn't contain these rules.
:param share_server: Data structure with share server information.
Not used by this driver.
"""
@ -156,15 +163,32 @@ class HDSHNASDriver(driver.ShareDriver):
hnas_share_id = self._get_hnas_share_id(share['id'])
try:
self._ensure_share(hnas_share_id)
self._ensure_share(share, hnas_share_id)
except exception.HNASItemNotFoundException:
raise exception.ShareResourceNotFound(share_id=share['id'])
self._check_protocol(share['id'], share['share_proto'])
if share['share_proto'].lower() == 'nfs':
self._nfs_update_access(share, hnas_share_id, access_rules)
else:
if not (add_rules or delete_rules):
# recovery mode
self._clean_cifs_access_list(hnas_share_id)
self._cifs_allow_access(share, hnas_share_id, access_rules)
else:
self._cifs_deny_access(share, hnas_share_id, delete_rules)
self._cifs_allow_access(share, hnas_share_id, add_rules)
def _nfs_update_access(self, share, hnas_share_id, access_rules):
host_list = []
for rule in access_rules:
if rule['access_type'].lower() != 'ip':
msg = _("Only IP access type currently supported.")
msg = _("Only IP access type currently supported for NFS. "
"Share provided %(share)s with rule type "
"%(type)s.") % {'share': share['id'],
'type': rule['access_type']}
raise exception.InvalidShareAccess(reason=msg)
if rule['access_level'] == constants.ACCESS_LEVEL_RW:
@ -175,7 +199,7 @@ class HDSHNASDriver(driver.ShareDriver):
host_list.append(rule['access_to'] + '(' +
rule['access_level'] + ')')
self.hnas.update_access_rule(hnas_share_id, host_list)
self.hnas.update_nfs_access_rule(hnas_share_id, host_list)
if host_list:
LOG.debug("Share %(share)s has the rules: %(rules)s",
@ -183,6 +207,58 @@ class HDSHNASDriver(driver.ShareDriver):
else:
LOG.debug("Share %(share)s has no rules.", {'share': share['id']})
def _cifs_allow_access(self, share, hnas_share_id, add_rules):
for rule in add_rules:
if rule['access_type'].lower() != 'user':
msg = _("Only USER access type currently supported for CIFS. "
"Share provided %(share)s with rule %(r_id)s type "
"%(type)s allowing permission to %(to)s.") % {
'share': share['id'], 'type': rule['access_type'],
'r_id': rule['id'], 'to': rule['access_to']}
raise exception.InvalidShareAccess(reason=msg)
if rule['access_level'] == constants.ACCESS_LEVEL_RW:
# Adding permission acr = Allow Change&Read
permission = 'acr'
else:
# Adding permission ar = Allow Read
permission = 'ar'
formatted_user = rule['access_to'].replace('\\', '\\\\')
self.hnas.cifs_allow_access(hnas_share_id, formatted_user,
permission)
LOG.debug("Added %(rule)s rule for user/group %(user)s to share "
"%(share)s.", {'rule': rule['access_level'],
'user': rule['access_to'],
'share': share['id']})
def _cifs_deny_access(self, share, hnas_share_id, delete_rules):
for rule in delete_rules:
if rule['access_type'].lower() != 'user':
LOG.warning(_LW('Only USER access type is allowed for '
'CIFS shares. Share provided %(share)s with '
'protocol %(proto)s.'),
{'share': share['id'],
'proto': share['share_proto']})
continue
formatted_user = rule['access_to'].replace('\\', '\\\\')
self.hnas.cifs_deny_access(hnas_share_id, formatted_user)
LOG.debug("Access denied for user/group %(user)s to share "
"%(share)s.", {'user': rule['access_to'],
'share': share['id']})
def _clean_cifs_access_list(self, hnas_share_id):
permission_list = self.hnas.list_cifs_permissions(hnas_share_id)
for permission in permission_list:
formatted_user = r'"\{1}{0}\{1}"'.format(permission[0], '"')
self.hnas.cifs_deny_access(hnas_share_id, formatted_user)
def create_share(self, context, share, share_server=None):
"""Creates share.
@ -191,17 +267,16 @@ class HDSHNASDriver(driver.ShareDriver):
:param share_server: Data structure with share server information.
Not used by this driver.
:returns: Returns a path of EVS IP concatenate with the path
of share in the filesystem (e.g. ['172.24.44.10:/shares/id']).
of share in the filesystem (e.g. ['172.24.44.10:/shares/id'] for
NFS and ['\\172.24.44.10\id'] for CIFS).
"""
LOG.debug("Creating share in HNAS: %(shr)s.",
{'shr': share['id']})
if share['share_proto'].lower() != 'nfs':
msg = _("Only NFS protocol is currently supported.")
raise exception.ShareBackendException(msg=msg)
self._check_protocol(share['id'], share['share_proto'])
path = self._create_share(share['id'], share['size'])
uri = self.hnas_evs_ip + ":" + path
uri = self._create_share(share['id'], share['size'],
share['share_proto'])
LOG.debug("Share created successfully on path: %(uri)s.",
{'uri': uri})
@ -220,7 +295,7 @@ class HDSHNASDriver(driver.ShareDriver):
LOG.debug("Deleting share in HNAS: %(shr)s.",
{'shr': share['id']})
self._delete_share(hnas_share_id)
self._delete_share(hnas_share_id, share['share_proto'])
LOG.debug("Export and share successfully deleted: %(shr)s.",
{'shr': share['id']})
@ -239,7 +314,7 @@ class HDSHNASDriver(driver.ShareDriver):
"id %(ss_id)s.", {'ss_sid': snapshot['share_id'],
'ss_id': snapshot['id']})
self._create_snapshot(hnas_share_id, snapshot['id'])
self._create_snapshot(hnas_share_id, snapshot)
LOG.info(_LI("Snapshot %(id)s successfully created."),
{'id': snapshot['id']})
@ -279,9 +354,8 @@ class HDSHNASDriver(driver.ShareDriver):
hnas_src_share_id = self._get_hnas_share_id(snapshot['share_id'])
path = self._create_share_from_snapshot(share, hnas_src_share_id,
snapshot)
uri = self.hnas_evs_ip + ":" + path
uri = self._create_share_from_snapshot(share, hnas_src_share_id,
snapshot)
LOG.debug("Share %(share)s created successfully on path: %(uri)s.",
{'uri': uri,
@ -296,20 +370,19 @@ class HDSHNASDriver(driver.ShareDriver):
:param share_server: Data structure with share server information.
Not used by this driver.
:returns: Returns a list of EVS IP concatenated with the path
of share in the filesystem (e.g. ['172.24.44.10:/shares/id']).
of share in the filesystem (e.g. ['172.24.44.10:/shares/id'] for
NFS and ['\\172.24.44.10\id'] for CIFS).
"""
LOG.debug("Ensuring share in HNAS: %(shr)s.",
{'shr': share['id']})
LOG.debug("Ensuring share in HNAS: %(shr)s.", {'shr': share['id']})
hnas_share_id = self._get_hnas_share_id(share['id'])
path = self._ensure_share(hnas_share_id)
export = self._ensure_share(share, hnas_share_id)
export = self.hnas_evs_ip + ":" + path
export_list = [export]
LOG.debug("Share ensured in HNAS: %(shr)s.",
{'shr': share['id']})
LOG.debug("Share ensured in HNAS: %(shr)s, protocol %(proto)s.",
{'shr': share['id'], 'proto': share['share_proto']})
return export_list
def extend_share(self, share, new_size, share_server=None):
@ -355,8 +428,8 @@ class HDSHNASDriver(driver.ShareDriver):
'share_backend_name': self.backend_name,
'driver_handles_share_servers': self.driver_handles_share_servers,
'vendor_name': 'HDS',
'driver_version': '2.0.0',
'storage_protocol': 'NFS',
'driver_version': '3.0.0',
'storage_protocol': 'NFS_CIFS',
'total_capacity_gb': total_space,
'free_capacity_gb': free_space,
'reserved_percentage': reserved,
@ -375,7 +448,7 @@ class HDSHNASDriver(driver.ShareDriver):
:param share: Share that will be managed.
:param driver_options: Empty dict or dict with 'volume_id' option.
:returns: Returns a dict with size of share managed
and its location (your path in file-system).
and its export location.
"""
hnas_share_id = self._get_hnas_share_id(share['id'])
@ -385,20 +458,36 @@ class HDSHNASDriver(driver.ShareDriver):
msg = _("Share ID %s already exists, cannot manage.") % share['id']
raise exception.HNASBackendException(msg=msg)
LOG.info(_LI("Share %(shr_path)s will be managed with ID %(shr_id)s."),
{'shr_path': share['export_locations'][0]['path'],
'shr_id': share['id']})
self._check_protocol(share['id'], share['share_proto'])
old_path_info = share['export_locations'][0]['path'].split(':')
old_path = old_path_info[1].split('/')
if share['share_proto'].lower() == 'nfs':
# 10.0.0.1:/shares/example
LOG.info(_LI("Share %(shr_path)s will be managed with ID "
"%(shr_id)s."),
{'shr_path': share['export_locations'][0]['path'],
'shr_id': share['id']})
if len(old_path) == 3:
evs_ip = old_path_info[0]
hnas_share_id = old_path[2]
else:
msg = _("Incorrect path. It should have the following format: "
"IP:/shares/share_id.")
raise exception.ShareBackendException(msg=msg)
old_path_info = share['export_locations'][0]['path'].split(':')
old_path = old_path_info[1].split('/')
if len(old_path) == 3:
evs_ip = old_path_info[0]
hnas_share_id = old_path[2]
else:
msg = _("Incorrect path. It should have the following format: "
"IP:/shares/share_id.")
raise exception.ShareBackendException(msg=msg)
else: # then its CIFS
# \\10.0.0.1\example
old_path = share['export_locations'][0]['path'].split('\\')
if len(old_path) == 4:
evs_ip = old_path[2]
hnas_share_id = old_path[3]
else:
msg = _("Incorrect path. It should have the following format: "
"\\\\IP\\share_id.")
raise exception.ShareBackendException(msg=msg)
if evs_ip != self.hnas_evs_ip:
msg = _("The EVS IP %(evs)s is not "
@ -410,7 +499,7 @@ class HDSHNASDriver(driver.ShareDriver):
"not configured.") % {'shr': share['host']}
raise exception.ShareBackendException(msg=msg)
output = self._manage_existing(share['id'], hnas_share_id)
output = self._manage_existing(share, hnas_share_id)
self.private_storage.update(
share['id'], {'hnas_id': hnas_share_id})
@ -472,17 +561,17 @@ class HDSHNASDriver(driver.ShareDriver):
return hnas_id
def _create_share(self, share_id, share_size):
def _create_share(self, share_id, share_size, share_proto):
"""Creates share.
Creates a virtual-volume, adds a quota limit and exports it.
:param share_id: manila's database ID of share that will be created.
:param share_size: Size limit of share.
:returns: Returns a path of /shares/share_id if the export was
created successfully.
:param share_proto: Protocol of share that will be created
(NFS or CIFS)
:returns: Returns a path IP:/shares/share_id for NFS or \\IP\share_id
for CIFS if the export was created successfully.
"""
path = os.path.join('/shares', share_id)
self._check_fs_mounted()
self.hnas.vvol_create(share_id)
@ -493,11 +582,19 @@ class HDSHNASDriver(driver.ShareDriver):
{'shr': share_id, 'size': share_size})
try:
# Create NFS export
self.hnas.nfs_export_add(share_id)
LOG.debug("NFS Export created to %(shr)s.",
{'shr': share_id})
return path
if share_proto.lower() == 'nfs':
# Create NFS export
self.hnas.nfs_export_add(share_id)
LOG.debug("NFS Export created to %(shr)s.",
{'shr': share_id})
uri = self.hnas_evs_ip + ":/shares/" + share_id
else:
# Create CIFS share with vvol path
self.hnas.cifs_share_add(share_id)
LOG.debug("CIFS share created to %(shr)s.",
{'shr': share_id})
uri = r'\\%s\%s' % (self.hnas_evs_ip, share_id)
return uri
except exception.HNASBackendException as e:
with excutils.save_and_reraise_exception():
self.hnas.vvol_delete(share_id)
@ -510,20 +607,29 @@ class HDSHNASDriver(driver.ShareDriver):
msg = _("Filesystem %s is not mounted.") % self.fs_name
raise exception.HNASBackendException(msg=msg)
def _ensure_share(self, hnas_share_id):
def _ensure_share(self, share, hnas_share_id):
"""Ensure that share is exported.
:param share: Share that will be checked.
:param hnas_share_id: HNAS ID of share that will be checked.
:returns: Returns a path of /shares/share_id if the export is ok.
:returns: Returns a path IP:/shares/share_id for NFS or \\IP\share_id
for CIFS if the export is ok.
"""
self._check_protocol(share['id'], share['share_proto'])
path = os.path.join('/shares', hnas_share_id)
self._check_fs_mounted()
self.hnas.check_vvol(hnas_share_id)
self.hnas.check_quota(hnas_share_id)
self.hnas.check_export(hnas_share_id)
return path
if share['share_proto'].lower() == 'nfs':
self.hnas.check_export(hnas_share_id)
export = self.hnas_evs_ip + ":" + path
else:
self.hnas.check_cifs(hnas_share_id)
export = r'\\%s\%s' % (self.hnas_evs_ip, hnas_share_id)
return export
def _shrink_share(self, hnas_share_id, share, new_size):
"""Shrinks a share to new size.
@ -532,7 +638,7 @@ class HDSHNASDriver(driver.ShareDriver):
:param share: model of share that will be shrunk.
:param new_size: New size of share after shrink operation.
"""
self._ensure_share(hnas_share_id)
self._ensure_share(share, hnas_share_id)
usage = self.hnas.get_share_usage(hnas_share_id)
@ -552,7 +658,7 @@ class HDSHNASDriver(driver.ShareDriver):
:param share: model of share that will be extended.
:param new_size: New size of share after extend operation.
"""
self._ensure_share(hnas_share_id)
self._ensure_share(share, hnas_share_id)
old_size = share['size']
total, available_space = self.hnas.get_stats()
@ -567,56 +673,76 @@ class HDSHNASDriver(driver.ShareDriver):
% share['id'])
raise exception.HNASBackendException(msg=msg)
def _delete_share(self, hnas_share_id):
def _delete_share(self, hnas_share_id, share_proto):
"""Deletes share.
It uses tree-delete-job-submit to format and delete virtual-volumes.
Quota is deleted with virtual-volume.
:param hnas_share_id: HNAS ID of share that will be deleted.
:param share_proto: Protocol of share that will be deleted.
"""
self._check_fs_mounted()
self.hnas.nfs_export_del(hnas_share_id)
if share_proto.lower() == 'nfs':
self.hnas.nfs_export_del(hnas_share_id)
elif share_proto.lower() == 'cifs':
self.hnas.cifs_share_del(hnas_share_id)
self.hnas.vvol_delete(hnas_share_id)
def _manage_existing(self, share_id, hnas_share_id):
def _manage_existing(self, share, hnas_share_id):
"""Manages a share that exists on backend.
:param share_id: manila's database ID of share that will be managed.
:param share: share that will be managed.
:param hnas_share_id: HNAS ID of share that will be managed.
:returns: Returns a dict with size of share managed
and its location (your path in file-system).
and its export location.
"""
self._ensure_share(hnas_share_id)
self._ensure_share(share, hnas_share_id)
share_size = self.hnas.get_share_quota(hnas_share_id)
if share_size is None:
msg = (_("The share %s trying to be managed does not have a "
"quota limit, please set it before manage.") % share_id)
"quota limit, please set it before manage.")
% share['id'])
raise exception.ManageInvalidShare(reason=msg)
path = self.hnas_evs_ip + os.path.join(':/shares', hnas_share_id)
if share['share_proto'].lower() == 'nfs':
path = self.hnas_evs_ip + os.path.join(':/shares', hnas_share_id)
else:
path = r'\\%s\%s' % (self.hnas_evs_ip, hnas_share_id)
return {'size': share_size, 'export_locations': [path]}
def _create_snapshot(self, hnas_share_id, snapshot_id):
def _create_snapshot(self, hnas_share_id, snapshot):
"""Creates a snapshot of share.
It copies the directory and all files to a new directory inside
/snapshots/share_id/.
:param hnas_share_id: HNAS ID of share for snapshot.
:param snapshot_id: ID of new snapshot.
:param snapshot: Snapshot that will be created.
"""
self._ensure_share(hnas_share_id)
self._ensure_share(snapshot['share'], hnas_share_id)
saved_list = []
saved_list = self.hnas.get_host_list(hnas_share_id)
new_list = []
for access in saved_list:
new_list.append(access.replace('(rw)', '(ro)'))
self.hnas.update_access_rule(hnas_share_id, new_list)
self._check_protocol(snapshot['share_id'],
snapshot['share']['share_proto'])
if snapshot['share']['share_proto'].lower() == 'nfs':
saved_list = self.hnas.get_nfs_host_list(hnas_share_id)
new_list = []
for access in saved_list:
new_list.append(access.replace('(rw)', '(ro)'))
self.hnas.update_nfs_access_rule(hnas_share_id, new_list)
else: # CIFS
if (self.hnas.is_cifs_in_use(hnas_share_id) and
not self.cifs_snapshot):
msg = _("CIFS snapshot when share is mounted is disabled. "
"Set hds_hnas_allow_cifs_snapshot_while_mounted to "
"True or unmount the share to take a snapshot.")
raise exception.ShareBackendException(msg=msg)
src_path = os.path.join('/shares', hnas_share_id)
dest_path = os.path.join('/snapshots', hnas_share_id, snapshot_id)
dest_path = os.path.join('/snapshots', hnas_share_id, snapshot['id'])
try:
self.hnas.tree_clone(src_path, dest_path)
except exception.HNASNothingToCloneException:
@ -624,7 +750,8 @@ class HDSHNASDriver(driver.ShareDriver):
"directory."))
self.hnas.create_directory(dest_path)
finally:
self.hnas.update_access_rule(hnas_share_id, saved_list)
if snapshot['share']['share_proto'].lower() == 'nfs':
self.hnas.update_nfs_access_rule(hnas_share_id, saved_list)
def _delete_snapshot(self, hnas_share_id, snapshot_id):
"""Deletes snapshot.
@ -670,5 +797,21 @@ class HDSHNASDriver(driver.ShareDriver):
except exception.HNASNothingToCloneException:
LOG.warning(_LW("Source directory is empty, exporting "
"directory."))
self.hnas.nfs_export_add(share['id'])
return dest_path
self._check_protocol(share['id'], share['share_proto'])
if share['share_proto'].lower() == 'nfs':
self.hnas.nfs_export_add(share['id'])
uri = self.hnas_evs_ip + ":" + dest_path
else:
self.hnas.cifs_share_add(share['id'])
uri = r'\\%s\%s' % (self.hnas_evs_ip, share['id'])
return uri
def _check_protocol(self, share_id, protocol):
if protocol.lower() not in ('nfs', 'cifs'):
msg = _("Only NFS or CIFS protocol are currently supported. "
"Share provided %(share)s with protocol "
"%(proto)s.") % {'share': share_id,
'proto': protocol}
raise exception.ShareBackendException(msg=msg)

View File

@ -82,11 +82,31 @@ class HNASSSHBackend(object):
LOG.exception(msg)
raise exception.HNASBackendException(msg=msg)
def get_host_list(self, share_id):
def cifs_share_add(self, share_id):
path = r'\\shares\\' + share_id
command = ['cifs-share', 'add', '-S', 'disable', '--enable-abe',
'--nodefaultsaa', share_id, self.fs_name, path]
self._execute(command)
def cifs_share_del(self, share_id):
command = ['cifs-share', 'del', '--target-label', self.fs_name,
share_id]
try:
self._execute(command)
except processutils.ProcessExecutionError as e:
if e.exit_code == 1:
LOG.warning(_LW("CIFS share %s does not exist on "
"backend anymore."), share_id)
else:
msg = six.text_type(e)
LOG.exception(msg)
raise exception.HNASBackendException(msg=msg)
def get_nfs_host_list(self, share_id):
export = self._get_share_export(share_id)
return export[0].export_configuration
def update_access_rule(self, share_id, host_list):
def update_nfs_access_rule(self, share_id, host_list):
command = ['nfs-export', 'mod', '-c']
if len(host_list) == 0:
@ -103,6 +123,55 @@ class HNASSSHBackend(object):
command.append(path)
self._execute(command)
def cifs_allow_access(self, hnas_share_id, user, permission):
command = ['cifs-saa', 'add', '--target-label', self.fs_name,
hnas_share_id, user, permission]
try:
self._execute(command)
except processutils.ProcessExecutionError as e:
if 'already listed as a user' in e.stderr:
LOG.debug('User %(user)s already allowed to access share '
'%(share)s.', {'user': user, 'share': hnas_share_id})
else:
msg = six.text_type(e)
LOG.exception(msg)
raise exception.InvalidShareAccess(reason=msg)
def cifs_deny_access(self, hnas_share_id, user):
command = ['cifs-saa', 'delete', '--target-label', self.fs_name,
hnas_share_id, user]
try:
self._execute(command)
except processutils.ProcessExecutionError as e:
if ('not listed as a user' in e.stderr or
'Could not delete user/group' in e.stderr):
LOG.warning(_LW('User %(user)s already not allowed to access '
'share %(share)s.'),
{'user': user, 'share': hnas_share_id})
else:
msg = six.text_type(e)
LOG.exception(msg)
raise exception.HNASBackendException(msg=msg)
def list_cifs_permissions(self, hnas_share_id):
command = ['cifs-saa', 'list', '--target-label', self.fs_name,
hnas_share_id]
try:
output, err = self._execute(command)
except processutils.ProcessExecutionError as e:
if 'No entries for this share' in e.stderr:
LOG.debug('Share %(share)s does not have any permission '
'added.', {'share': hnas_share_id})
return []
else:
msg = six.text_type(e)
LOG.exception(msg)
raise exception.HNASBackendException(msg=msg)
permissions = CIFSPermissions(output)
return permissions.permission_list
def tree_clone(self, src_path, dest_path):
command = ['tree-clone-job-submit', '-e', '-f', self.fs_name,
src_path, dest_path]
@ -278,6 +347,40 @@ class HNASSSHBackend(object):
msg = _("Export %s does not exist.") % export[0].export_name
raise exception.HNASItemNotFoundException(msg=msg)
def check_cifs(self, vvol_name):
output = self._cifs_list(vvol_name)
cifs_share = CIFSShare(output)
if self.fs_name != cifs_share.fs:
msg = _("CIFS share %(share)s is not located in "
"configured filesystem "
"%(fs)s.") % {'share': vvol_name,
'fs': self.fs_name}
raise exception.HNASItemNotFoundException(msg=msg)
def is_cifs_in_use(self, vvol_name):
output = self._cifs_list(vvol_name)
cifs_share = CIFSShare(output)
return cifs_share.is_mounted
def _cifs_list(self, vvol_name):
command = ['cifs-share', 'list', vvol_name]
try:
output, err = self._execute(command)
except processutils.ProcessExecutionError as e:
if 'does not exist' in e.stderr:
msg = _("CIFS share %(share)s was not found in EVS "
"%(evs_id)s") % {'share': vvol_name,
'evs_id': self.evs_id}
raise exception.HNASItemNotFoundException(msg=msg)
else:
raise
return output
def get_share_quota(self, share_id):
command = ['quota', 'list', self.fs_name, share_id]
output, err = self._execute(command)
@ -532,3 +635,37 @@ class Quota(object):
else:
self.limit = float(items[13])
self.limit_unit = items[14]
class CIFSPermissions(object):
def __init__(self, data):
self.permission_list = []
hnas_cifs_permissions = [('Allow Read', 'ar'),
('Allow Change & Read', 'acr'),
('Allow Full Control', 'af'),
('Deny Read', 'dr'),
('Deny Change & Read', 'dcr'),
('Deny Full Control', 'df')]
lines = data.split('\n')
for line in lines:
filtered = list(filter(lambda x: x[0] in line,
hnas_cifs_permissions))
if len(filtered) == 1:
token, permission = filtered[0]
user = line.split(token)[1:][0].strip()
self.permission_list.append((user, permission))
class CIFSShare(object):
def __init__(self, data):
lines = data.split('\n')
for line in lines:
if 'File system label' in line:
self.fs = line.split(': ')[1]
elif 'Share users' in line:
users = line.split(': ')
self.is_mounted = users[1] != '0'

View File

@ -26,7 +26,7 @@ from manila import test
CONF = cfg.CONF
share = {
share_nfs = {
'id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
'name': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
'size': 50,
@ -39,6 +39,19 @@ share = {
'aa4a7710-f326-41fb-ad18-b4ad587fc87a'}],
}
share_cifs = {
'id': 'f5cadaf2-afbe-4cc4-9021-85491b6b76f7',
'name': 'f5cadaf2-afbe-4cc4-9021-85491b6b76f7',
'size': 50,
'host': 'hnas',
'share_proto': 'CIFS',
'share_type_id': 1,
'share_network_id': 'bb329e24-3bdb-491d-acfd-dfe70c09b98d',
'share_server_id': 'cc345a53-491d-acfd-3bdb-dfe70c09b98d',
'export_locations': [{'path': '\\\\172.24.44.10\\'
'f5cadaf2-afbe-4cc4-9021-85491b6b76f7'}],
}
share_invalid_host = {
'id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
'name': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
@ -52,7 +65,7 @@ share_invalid_host = {
'aa4a7710-f326-41fb-ad18-b4ad587fc87a'}],
}
access = {
access_nfs_rw = {
'id': 'acdc7172b-fe07-46c4-b78f-df3e0324ccd0',
'access_type': 'ip',
'access_to': '172.24.44.200',
@ -60,9 +73,32 @@ access = {
'state': 'active',
}
snapshot = {
access_cifs_rw = {
'id': '43167594-40e9-b899-1f4f-b9c2176b7564',
'access_type': 'user',
'access_to': 'fake_user',
'access_level': 'rw',
'state': 'active',
}
access_cifs_ro = {
'id': '32407088-1f4f-40e9-b899-b9a4176b574d',
'access_type': 'user',
'access_to': 'fake_user',
'access_level': 'ro',
'state': 'active',
}
snapshot_nfs = {
'id': 'abba6d9b-f29c-4bf7-aac1-618cda7aaf0f',
'share_id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
'share': share_nfs,
}
snapshot_cifs = {
'id': '91bc6e1b-1ba5-f29c-abc1-da7618cabf0a',
'share_id': 'f5cadaf2-afbe-4cc4-9021-85491b6b76f7',
'share': share_cifs,
}
invalid_share = {
@ -70,12 +106,18 @@ invalid_share = {
'name': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
'size': 100,
'host': 'hnas',
'share_proto': 'CIFS',
'share_proto': 'HDFS',
}
invalid_snapshot = {
'id': '24dcdcb5-a582-4bcc-b462-641da143afee',
'share_id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
'share': invalid_share,
}
invalid_access_type = {
'id': 'acdc7172b-fe07-46c4-b78f-df3e0324ccd0',
'access_type': 'user',
'access_type': 'cert',
'access_to': 'manila_user',
'access_level': 'rw',
'state': 'active',
@ -89,6 +131,12 @@ invalid_access_level = {
'state': 'active',
}
invalid_protocol_msg = ("Share backend error: Only NFS or CIFS protocol are "
"currently supported. Share provided %(id)s with "
"protocol %(proto)s." %
{'id': invalid_share['id'],
'proto': invalid_share['share_proto']})
@ddt.ddt
class HDSHNASTestCase(test.TestCase):
@ -121,11 +169,18 @@ class HDSHNASTestCase(test.TestCase):
self._driver.backend_name = "hnas"
self.mock_log = self.mock_object(hds_hnas, 'LOG')
# mocking common backend calls
self.mock_object(ssh.HNASSSHBackend, "check_fs_mounted", mock.Mock(
return_value=True))
self.mock_object(ssh.HNASSSHBackend, "check_vvol", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "check_quota", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "check_cifs", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "check_export", mock.Mock())
@ddt.data('hds_hnas_driver_helper', 'hds_hnas_evs_id', 'hds_hnas_evs_ip',
'hds_hnas_ip', 'hds_hnas_user')
def test_init_invalid_conf_parameters(self, attr_name):
self.mock_object(manila.share.driver.ShareDriver,
'__init__')
self.mock_object(manila.share.driver.ShareDriver, '__init__')
setattr(CONF, attr_name, None)
self.assertRaises(exception.InvalidParameterValue,
@ -140,7 +195,7 @@ class HDSHNASTestCase(test.TestCase):
self.assertRaises(exception.InvalidParameterValue,
self._driver.__init__)
def test_update_access(self):
def test_update_access_nfs(self):
access1 = {
'access_type': 'ip',
'access_to': '172.24.10.10',
@ -153,18 +208,15 @@ class HDSHNASTestCase(test.TestCase):
}
access_list = [access1, access2]
self.mock_object(self._driver, '_get_hnas_share_id',
mock.Mock(return_value='hnas_id'))
self.mock_object(self._driver, '_ensure_share')
self.mock_object(ssh.HNASSSHBackend, "update_access_rule",
self.mock_object(ssh.HNASSSHBackend, "update_nfs_access_rule",
mock.Mock())
self._driver.update_access('context', share, access_list, [], [])
self._driver.update_access('context', share_nfs, access_list, [], [])
ssh.HNASSSHBackend.update_access_rule.assert_called_once_with(
'hnas_id', [access1['access_to'] + '('
+ access1['access_level'] + ',norootsquash)',
access2['access_to'] + '('
+ access2['access_level'] + ')'])
ssh.HNASSSHBackend.update_nfs_access_rule.assert_called_once_with(
share_nfs['id'], [access1['access_to'] + '('
+ access1['access_level'] + ',norootsquash)',
access2['access_to'] + '('
+ access2['access_level'] + ')'])
self.assertTrue(self.mock_log.debug.called)
def test_update_access_ip_exception(self):
@ -180,10 +232,8 @@ class HDSHNASTestCase(test.TestCase):
}
access_list = [access1, access2]
self.mock_object(self._driver, '_ensure_share')
self.assertRaises(exception.InvalidShareAccess,
self._driver.update_access, 'context', share,
self._driver.update_access, 'context', share_nfs,
access_list, [], [])
def test_update_access_not_found_exception(self):
@ -203,25 +253,115 @@ class HDSHNASTestCase(test.TestCase):
side_effect=exception.HNASItemNotFoundException(msg='fake')))
self.assertRaises(exception.ShareResourceNotFound,
self._driver.update_access, 'context', share,
self._driver.update_access, 'context', share_nfs,
access_list, add_rules=[], delete_rules=[])
def test_create_share(self):
@ddt.data([access_cifs_rw, 'acr'], [access_cifs_ro, 'ar'])
@ddt.unpack
def test_allow_access_cifs(self, access_cifs, permission):
access_list_allow = [access_cifs]
self.mock_object(ssh.HNASSSHBackend, 'cifs_allow_access', mock.Mock())
self._driver.update_access('context', share_cifs, [],
access_list_allow, [])
ssh.HNASSSHBackend.cifs_allow_access.assert_called_once_with(
share_cifs['id'], 'fake_user', permission)
self.assertTrue(self.mock_log.debug.called)
def test_allow_access_cifs_invalid_type(self):
access_cifs_type_ip = {
'id': '43167594-40e9-b899-1f4f-b9c2176b7564',
'access_type': 'ip',
'access_to': 'fake_user',
'access_level': 'rw',
'state': 'active',
}
access_list_allow = [access_cifs_type_ip]
self.assertRaises(exception.InvalidShareAccess,
self._driver.update_access, 'context', share_cifs,
[], access_list_allow, [])
def test_deny_access_cifs(self):
access_list_deny = [access_cifs_rw]
self.mock_object(ssh.HNASSSHBackend, 'cifs_deny_access', mock.Mock())
self._driver.update_access('context', share_cifs, [], [],
access_list_deny)
ssh.HNASSSHBackend.cifs_deny_access.assert_called_once_with(
share_cifs['id'], 'fake_user')
self.assertTrue(self.mock_log.debug.called)
def test_deny_access_cifs_unsupported_type(self):
access_cifs_type_ip = {
'id': '43167594-40e9-b899-1f4f-b9c2176b7564',
'access_type': 'ip',
'access_to': 'fake_user',
'access_level': 'rw',
'state': 'active',
}
access_list_deny = [access_cifs_type_ip]
self.mock_object(ssh.HNASSSHBackend, 'cifs_deny_access', mock.Mock())
self._driver.update_access('context', share_cifs, [], [],
access_list_deny)
self.assertTrue(self.mock_log.warning.called)
def test_update_access_invalid_share_protocol(self):
self.mock_object(self._driver, '_ensure_share', mock.Mock())
ex = self.assertRaises(exception.ShareBackendException,
self._driver.update_access, 'context',
invalid_share, [], [], [])
self.assertEqual(invalid_protocol_msg, ex.msg)
def test_update_access_cifs_recovery_mode(self):
access_list = [access_cifs_rw, access_cifs_ro]
permission_list = [('fake_user1', 'acr'), ('fake_user2', 'ar')]
self.mock_object(ssh.HNASSSHBackend, 'list_cifs_permissions',
mock.Mock(return_value=permission_list))
self.mock_object(ssh.HNASSSHBackend, 'cifs_deny_access', mock.Mock())
self.mock_object(ssh.HNASSSHBackend, 'cifs_allow_access', mock.Mock())
self._driver.update_access('context', share_cifs, access_list, [], [])
ssh.HNASSSHBackend.list_cifs_permissions.assert_called_once_with(
share_cifs['id'])
self.assertTrue(self.mock_log.debug.called)
@ddt.data(share_nfs, share_cifs)
def test_create_share(self, share):
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "vvol_create", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "quota_add", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "nfs_export_add", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "nfs_export_add", mock.Mock(
return_value='/shares/' + share['id']))
self.mock_object(ssh.HNASSSHBackend, "cifs_share_add", mock.Mock())
result = self._driver.create_share('context', share)
self.assertEqual(self._driver.hnas_evs_ip + ":/shares/" + share['id'],
result)
self.assertTrue(self.mock_log.debug.called)
ssh.HNASSSHBackend.vvol_create.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.quota_add.assert_called_once_with(share['id'],
share['size'])
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(share['id'])
if share['share_proto'].lower() == 'nfs':
self.assertEqual(self._driver.hnas_evs_ip + ":/shares/" +
share_nfs['id'], result)
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(
share_nfs['id'])
self.assertFalse(ssh.HNASSSHBackend.cifs_share_add.called)
else:
self.assertEqual("\\\\" + self._driver.hnas_evs_ip + "\\" +
share_cifs['id'], result)
ssh.HNASSSHBackend.cifs_share_add.assert_called_once_with(
share_cifs['id'])
self.assertFalse(ssh.HNASSSHBackend.nfs_export_add.called)
def test_create_share_export_error(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
@ -233,202 +373,244 @@ class HDSHNASTestCase(test.TestCase):
self.mock_object(ssh.HNASSSHBackend, "vvol_delete", mock.Mock())
self.assertRaises(exception.HNASBackendException,
self._driver.create_share, 'context', share)
self._driver.create_share, 'context', share_nfs)
self.assertTrue(self.mock_log.debug.called)
self.assertTrue(self.mock_log.exception.called)
ssh.HNASSSHBackend.vvol_create.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.quota_add.assert_called_once_with(share['id'],
share['size'])
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.vvol_delete.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.vvol_create.assert_called_once_with(share_nfs['id'])
ssh.HNASSSHBackend.quota_add.assert_called_once_with(share_nfs['id'],
share_nfs['size'])
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(
share_nfs['id'])
ssh.HNASSSHBackend.vvol_delete.assert_called_once_with(share_nfs['id'])
def test_create_share_invalid_share_protocol(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_create_share",
mock.Mock(return_value="path"))
self.assertRaises(exception.ShareBackendException,
self._driver.create_share, 'context', invalid_share)
ex = self.assertRaises(exception.ShareBackendException,
self._driver.create_share, 'context',
invalid_share)
self.assertEqual(invalid_protocol_msg, ex.msg)
def test_delete_share(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
@ddt.data(share_nfs, share_cifs)
def test_delete_share(self, share):
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "nfs_export_del", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "cifs_share_del", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "vvol_delete", mock.Mock())
self._driver.delete_share('context', share)
self.assertTrue(self.mock_log.debug.called)
ssh.HNASSSHBackend.nfs_export_del.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.vvol_delete.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.vvol_delete.assert_called_once_with(share['id'])
def test_create_snapshot(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share")
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(ssh.HNASSSHBackend, "get_host_list", mock.Mock(
if share['share_proto'].lower() == 'nfs':
ssh.HNASSSHBackend.nfs_export_del.assert_called_once_with(
share['id'])
self.assertFalse(ssh.HNASSSHBackend.cifs_share_del.called)
else:
ssh.HNASSSHBackend.cifs_share_del.assert_called_once_with(
share['id'])
self.assertFalse(ssh.HNASSSHBackend.nfs_export_del.called)
@ddt.data(snapshot_nfs, snapshot_cifs)
def test_create_snapshot(self, snapshot):
hnas_id = snapshot['share_id']
self.mock_object(ssh.HNASSSHBackend, "get_nfs_host_list", mock.Mock(
return_value=['172.24.44.200(rw)']))
self.mock_object(ssh.HNASSSHBackend, "update_access_rule", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "update_nfs_access_rule",
mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "is_cifs_in_use", mock.Mock(
return_value=False))
self.mock_object(ssh.HNASSSHBackend, "tree_clone", mock.Mock())
self._driver.create_snapshot('context', snapshot)
ssh.HNASSSHBackend.get_host_list.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.update_access_rule.assert_any_call(
'hnas_id', ['172.24.44.200(ro)'])
ssh.HNASSSHBackend.update_access_rule.assert_any_call(
'hnas_id', ['172.24.44.200(rw)'])
ssh.HNASSSHBackend.tree_clone.assert_called_once_with(
'/shares/' + 'hnas_id', '/snapshots/' + 'hnas_id' + '/' +
'/shares/' + hnas_id, '/snapshots/' + hnas_id + '/' +
snapshot['id'])
if snapshot['share']['share_proto'].lower() == 'nfs':
ssh.HNASSSHBackend.get_nfs_host_list.assert_called_once_with(
hnas_id)
ssh.HNASSSHBackend.update_nfs_access_rule.assert_any_call(
hnas_id, ['172.24.44.200(ro)'])
ssh.HNASSSHBackend.update_nfs_access_rule.assert_any_call(
hnas_id, ['172.24.44.200(rw)'])
else:
ssh.HNASSSHBackend.is_cifs_in_use.assert_called_once_with(
hnas_id)
def test_create_snapshot_invalid_protocol(self):
self.mock_object(self._driver, '_ensure_share', mock.Mock())
ex = self.assertRaises(exception.ShareBackendException,
self._driver.create_snapshot, 'context',
invalid_snapshot)
self.assertEqual(invalid_protocol_msg, ex.msg)
def test_create_snapshot_cifs_exception(self):
cifs_excep_msg = ("Share backend error: CIFS snapshot when share is "
"mounted is disabled. Set "
"hds_hnas_allow_cifs_snapshot_while_mounted to True "
"or unmount the share to take a snapshot.")
self.mock_object(ssh.HNASSSHBackend, "is_cifs_in_use", mock.Mock(
return_value=True))
ex = self.assertRaises(exception.ShareBackendException,
self._driver.create_snapshot, 'context',
snapshot_cifs)
self.assertEqual(cifs_excep_msg, ex.msg)
def test_create_snapshot_first_snapshot(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share")
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(ssh.HNASSSHBackend, "get_host_list", mock.Mock(
hnas_id = snapshot_nfs['share_id']
self.mock_object(ssh.HNASSSHBackend, "get_nfs_host_list", mock.Mock(
return_value=['172.24.44.200(rw)']))
self.mock_object(ssh.HNASSSHBackend, "update_access_rule", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "update_nfs_access_rule",
mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "tree_clone", mock.Mock(
side_effect=exception.HNASNothingToCloneException('msg')))
self.mock_object(ssh.HNASSSHBackend, "create_directory", mock.Mock())
self._driver.create_snapshot('context', snapshot)
self._driver.create_snapshot('context', snapshot_nfs)
self.assertTrue(self.mock_log.warning.called)
ssh.HNASSSHBackend.get_host_list.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.update_access_rule.assert_any_call(
'hnas_id', ['172.24.44.200(ro)'])
ssh.HNASSSHBackend.update_access_rule.assert_any_call(
'hnas_id', ['172.24.44.200(rw)'])
ssh.HNASSSHBackend.get_nfs_host_list.assert_called_once_with(hnas_id)
ssh.HNASSSHBackend.update_nfs_access_rule.assert_any_call(
hnas_id, ['172.24.44.200(ro)'])
ssh.HNASSSHBackend.update_nfs_access_rule.assert_any_call(
hnas_id, ['172.24.44.200(rw)'])
ssh.HNASSSHBackend.create_directory.assert_called_once_with(
'/snapshots/' + 'hnas_id' + '/' + snapshot['id'])
'/snapshots/' + hnas_id + '/' + snapshot_nfs['id'])
def test_delete_snapshot(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
hnas_id = snapshot_nfs['share_id']
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted")
self.mock_object(ssh.HNASSSHBackend, "tree_delete", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "delete_directory", mock.Mock())
self._driver.delete_snapshot('context', snapshot)
self._driver.delete_snapshot('context', snapshot_nfs)
self.assertTrue(self.mock_log.debug.called)
self.assertTrue(self.mock_log.info.called)
hds_hnas.HDSHNASDriver._check_fs_mounted.assert_called_once_with()
ssh.HNASSSHBackend.tree_delete.assert_called_once_with(
'/snapshots/' + 'hnas_id' + '/' + snapshot['id'])
'/snapshots/' + hnas_id + '/' + snapshot_nfs['id'])
ssh.HNASSSHBackend.delete_directory.assert_called_once_with(
'/snapshots/' + 'hnas_id')
def test_ensure_share(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "check_vvol", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "check_quota", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "check_export", mock.Mock())
'/snapshots/' + hnas_id)
@ddt.data(share_nfs, share_cifs)
def test_ensure_share(self, share):
result = self._driver.ensure_share('context', share)
self.assertEqual(['172.24.44.10:/shares/' + 'hnas_id'], result)
ssh.HNASSSHBackend.check_vvol.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.check_quota.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.check_export.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.check_vvol.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.check_quota.assert_called_once_with(share['id'])
if share['share_proto'].lower() == 'nfs':
self.assertEqual(['172.24.44.10:/shares/' + share['id']], result)
ssh.HNASSSHBackend.check_export.assert_called_once_with(
share['id'])
self.assertFalse(ssh.HNASSSHBackend.check_cifs.called)
else:
self.assertEqual(['\\\\172.24.44.10\\' + share['id']], result)
ssh.HNASSSHBackend.check_cifs.assert_called_once_with(share['id'])
self.assertFalse(ssh.HNASSSHBackend.check_export.called)
def test_ensure_share_invalid_protocol(self):
ex = self.assertRaises(exception.ShareBackendException,
self._driver.ensure_share, 'context',
invalid_share)
self.assertEqual(invalid_protocol_msg, ex.msg)
def test_shrink_share(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "get_share_usage", mock.Mock(
return_value=10))
self.mock_object(ssh.HNASSSHBackend, "modify_quota", mock.Mock())
self._driver.shrink_share(share, 11)
self._driver.shrink_share(share_nfs, 11)
ssh.HNASSSHBackend.get_share_usage.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.modify_quota.assert_called_once_with('hnas_id', 11)
ssh.HNASSSHBackend.get_share_usage.assert_called_once_with(
share_nfs['id'])
ssh.HNASSSHBackend.modify_quota.assert_called_once_with(
share_nfs['id'], 11)
def test_shrink_share_new_size_lower_than_usage(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "get_share_usage", mock.Mock(
return_value=10))
self.assertRaises(exception.ShareShrinkingPossibleDataLoss,
self._driver.shrink_share, share, 9)
ssh.HNASSSHBackend.get_share_usage.assert_called_once_with('hnas_id')
self._driver.shrink_share, share_nfs, 9)
ssh.HNASSSHBackend.get_share_usage.assert_called_once_with(
share_nfs['id'])
def test_extend_share(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "get_stats", mock.Mock(
return_value=(500, 200)))
self.mock_object(ssh.HNASSSHBackend, "modify_quota", mock.Mock())
self._driver.extend_share(share, 150)
self._driver.extend_share(share_nfs, 150)
ssh.HNASSSHBackend.get_stats.assert_called_once_with()
ssh.HNASSSHBackend.modify_quota.assert_called_once_with('hnas_id', 150)
ssh.HNASSSHBackend.modify_quota.assert_called_once_with(
share_nfs['id'], 150)
def test_extend_share_with_no_available_space_in_fs(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "get_stats", mock.Mock(
return_value=(500, 200)))
self.mock_object(ssh.HNASSSHBackend, "modify_quota", mock.Mock())
self.assertRaises(exception.HNASBackendException,
self._driver.extend_share, share, 1000)
self._driver.extend_share, share_nfs, 1000)
ssh.HNASSSHBackend.get_stats.assert_called_once_with()
def test_manage_existing(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value=share['id']))
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share", mock.Mock())
@ddt.data(share_nfs, share_cifs)
def test_manage_existing(self, share):
expected_out = {'size': share['size'],
'export_locations':
[share['export_locations'][0]['path']]}
self.mock_object(ssh.HNASSSHBackend, "get_share_quota", mock.Mock(
return_value=1))
return_value=share['size']))
self._driver.manage_existing(share, 'option')
out = self._driver.manage_existing(share, 'option')
ssh.HNASSSHBackend.get_share_quota.assert_called_once_with(share['id'])
self.assertEqual(expected_out, out)
ssh.HNASSSHBackend.get_share_quota.assert_called_once_with(
share['id'])
def test_manage_existing_no_quota(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value=share['id']))
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "get_share_quota", mock.Mock(
return_value=None))
self.assertRaises(exception.ManageInvalidShare,
self._driver.manage_existing, share, 'option')
ssh.HNASSSHBackend.get_share_quota.assert_called_once_with(share['id'])
self._driver.manage_existing, share_nfs, 'option')
ssh.HNASSSHBackend.get_share_quota.assert_called_once_with(
share_nfs['id'])
def test_manage_existing_wrong_share_id(self):
self.mock_object(self.fake_private_storage, 'get',
mock.Mock(return_value='Wrong_share_id'))
self.assertRaises(exception.HNASBackendException,
self._driver.manage_existing, share, 'option')
self._driver.manage_existing, share_nfs, 'option')
def test_manage_existing_wrong_path_format(self):
share['export_locations'] = [{'path': ':/'}]
@ddt.data(share_nfs, share_cifs)
def test_manage_existing_wrong_path_format(self, share):
share_copy = share.copy()
share_copy['export_locations'] = [{'path': ':/'}]
self.assertRaises(exception.ShareBackendException,
self._driver.manage_existing, share,
self._driver.manage_existing, share_copy,
'option')
def test_manage_existing_wrong_evs_ip(self):
share['export_locations'] = [{'path': '172.24.44.189:/shares/'
'aa4a7710-f326-41fb-ad18-'}]
share_nfs['export_locations'] = [{'path': '172.24.44.189:/shares/'
'aa4a7710-f326-41fb-ad18-'}]
self.assertRaises(exception.ShareBackendException,
self._driver.manage_existing, share,
self._driver.manage_existing, share_nfs,
'option')
def test_manage_existing_invalid_host(self):
@ -436,8 +618,13 @@ class HDSHNASTestCase(test.TestCase):
self._driver.manage_existing, share_invalid_host,
'option')
def test_manage_existing_invalid_protocol(self):
self.assertRaises(exception.ShareBackendException,
self._driver.manage_existing, invalid_share,
'option')
def test_unmanage(self):
self._driver.unmanage(share)
self._driver.unmanage(share_nfs)
self.assertTrue(self.fake_private_storage.delete.called)
self.assertTrue(self.mock_log.info.called)
@ -447,56 +634,75 @@ class HDSHNASTestCase(test.TestCase):
self.assertEqual(0, result)
def test_create_share_from_snapshot(self):
@ddt.data([share_nfs, snapshot_nfs], [share_cifs, snapshot_cifs])
@ddt.unpack
def test_create_share_from_snapshot(self, share, snapshot):
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
mock.Mock())
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(ssh.HNASSSHBackend, "vvol_create", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "quota_add", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "tree_clone", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "cifs_share_add", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "nfs_export_add", mock.Mock())
result = self._driver.create_share_from_snapshot('context',
share, snapshot)
share,
snapshot)
self.assertEqual('172.24.44.10:/shares/' + share['id'], result)
ssh.HNASSSHBackend.vvol_create.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.quota_add.assert_called_once_with(share['id'],
share['size'])
ssh.HNASSSHBackend.tree_clone.assert_called_once_with(
'/snapshots/' + 'hnas_id' + '/' + snapshot['id'],
'/snapshots/' + share['id'] + '/' + snapshot['id'],
'/shares/' + share['id'])
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(share['id'])
if share['share_proto'].lower() == 'nfs':
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(
share['id'])
self.assertEqual('172.24.44.10:/shares/' + share_nfs['id'], result)
self.assertFalse(ssh.HNASSSHBackend.cifs_share_add.called)
else:
ssh.HNASSSHBackend.cifs_share_add.assert_called_once_with(
share['id'])
self.assertEqual('\\\\172.24.44.10\\' + share['id'], result)
self.assertFalse(ssh.HNASSSHBackend.nfs_export_add.called)
def test_create_share_from_snapshot_empty_snapshot(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
mock.Mock())
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(ssh.HNASSSHBackend, "vvol_create", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "quota_add", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "tree_clone", mock.Mock(
side_effect=exception.HNASNothingToCloneException('msg')))
self.mock_object(ssh.HNASSSHBackend, "nfs_export_add", mock.Mock())
result = self._driver.create_share_from_snapshot('context', share,
snapshot)
result = self._driver.create_share_from_snapshot('context', share_nfs,
snapshot_nfs)
self.assertEqual('172.24.44.10:/shares/' + share['id'], result)
self.assertEqual('172.24.44.10:/shares/' + share_nfs['id'], result)
self.assertTrue(self.mock_log.warning.called)
ssh.HNASSSHBackend.vvol_create.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.quota_add.assert_called_once_with(share['id'],
share['size'])
ssh.HNASSSHBackend.vvol_create.assert_called_once_with(share_nfs['id'])
ssh.HNASSSHBackend.quota_add.assert_called_once_with(share_nfs['id'],
share_nfs['size'])
ssh.HNASSSHBackend.tree_clone.assert_called_once_with(
'/snapshots/' + 'hnas_id' + '/' + snapshot['id'],
'/shares/' + share['id'])
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(share['id'])
'/snapshots/' + share_nfs['id'] + '/' + snapshot_nfs['id'],
'/shares/' + share_nfs['id'])
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(
share_nfs['id'])
def test_create_share_from_snapshot_invalid_protocol(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "vvol_create", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "quota_add", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "tree_clone", mock.Mock())
ex = self.assertRaises(exception.ShareBackendException,
self._driver.create_share_from_snapshot,
'context', invalid_share, snapshot_nfs)
self.assertEqual(invalid_protocol_msg, ex.msg)
def test__check_fs_mounted(self):
self.mock_object(ssh.HNASSSHBackend, 'check_fs_mounted', mock.Mock(
return_value=True))
self._driver._check_fs_mounted()
ssh.HNASSSHBackend.check_fs_mounted.assert_called_once_with()
@ -505,8 +711,8 @@ class HDSHNASTestCase(test.TestCase):
self.mock_object(ssh.HNASSSHBackend, 'check_fs_mounted', mock.Mock(
return_value=False))
self.assertRaises(
exception.HNASBackendException, self._driver._check_fs_mounted)
self.assertRaises(exception.HNASBackendException,
self._driver._check_fs_mounted)
ssh.HNASSSHBackend.check_fs_mounted.assert_called_once_with()
@ -516,8 +722,8 @@ class HDSHNASTestCase(test.TestCase):
'driver_handles_share_servers':
self._driver.driver_handles_share_servers,
'vendor_name': 'HDS',
'driver_version': '2.0.0',
'storage_protocol': 'NFS',
'driver_version': '3.0.0',
'storage_protocol': 'NFS_CIFS',
'total_capacity_gb': 1000,
'free_capacity_gb': 200,
'reserved_percentage': hds_hnas.CONF.reserved_share_percentage,

View File

@ -382,6 +382,81 @@ HNAS_RESULT_unmounted_filesystem = """
file_system 1055 fake_span Umount 2 4 5 1
"""
HNAS_RESULT_cifs_list = """
Share name: vvol_test
Share path: \\\\shares\\vvol_test
Share users: 2
Share online: Yes
Share comment:
Cache options: Manual local caching for documents
ABE enabled: Yes
Continuous Availability: No
Access snapshots: No
Display snapshots: No
ShadowCopy enabled: Yes
Lower case on create: No
Follow symlinks: Yes
Follow global symlinks: No
Scan for viruses: Yes
File system label: file_system
File system size: 9.938 GB
File system free space: 6.763 GB
File system state:
formatted = Yes
mounted = Yes
failed = No
thin provisioned = No
Disaster recovery setting:
Recovered = No
Transfer setting = Use file system default
Home directories: Off
Mount point options:
"""
HNAS_RESULT_different_fs_cifs_list = """
Share name: vvol_test
Share path: \\\\shares\\vvol_test
Share users: 0
Share online: Yes
Share comment:
Cache options: Manual local caching for documents
ABE enabled: Yes
Continuous Availability: No
Access snapshots: No
Display snapshots: No
ShadowCopy enabled: Yes
Lower case on create: No
Follow symlinks: Yes
Follow global symlinks: No
Scan for viruses: Yes
File system label: different_filesystem
File system size: 9.938 GB
File system free space: 6.763 GB
File system state:
formatted = Yes
mounted = Yes
failed = No
thin provisioned = No
Disaster recovery setting:
Recovered = No
Transfer setting = Use file system default
Home directories: Off
Mount point options:
"""
HNAS_RESULT_list_cifs_permissions = """ \
Displaying the details of the share 'vvol_test' on file system 'filesystem' ...
Maximum user count is unlimited
Type Permission User/Group
U Deny Read NFSv4 user\\user1@domain.com
G Deny Change & Read Unix user\\1087
U Allow Full Control Unix user\\1088
U Allow Read Unix user\\1089
? Deny Full Control NFSv4 user\\user2@company.com
X Allow Change & Read Unix user\\1090
"""
@ddt.ddt
class HNASSSHTestCase(test.TestCase):
@ -425,7 +500,7 @@ class HNASSSHTestCase(test.TestCase):
}
def test_get_stats(self):
fake_list_command = ['df', '-a', '-f', 'file_system']
fake_list_command = ['df', '-a', '-f', self.fs_name]
self.mock_object(ssh.HNASSSHBackend, '_execute',
mock.Mock(return_value=(HNAS_RESULT_df_tb, "")))
@ -471,33 +546,192 @@ class HNASSSHTestCase(test.TestCase):
self._driver_ssh.nfs_export_del, 'vvol_test')
self.assertTrue(self.mock_log.exception.called)
def test_get_host_list(self):
def test_cifs_share_add(self):
fake_cifs_add_command = ['cifs-share', 'add', '-S', 'disable',
'--enable-abe', '--nodefaultsaa',
'vvol_test', self.fs_name,
r'\\shares\\vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock())
self._driver_ssh.cifs_share_add('vvol_test')
self._driver_ssh._execute.assert_called_with(fake_cifs_add_command)
def test_cifs_share_del(self):
fake_cifs_del_command = ['cifs-share', 'del', '--target-label',
self.fs_name, 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock())
self._driver_ssh.cifs_share_del('vvol_test')
self._driver_ssh._execute.assert_called_with(fake_cifs_del_command)
def test_cifs_share_del_inexistent_share(self):
fake_cifs_del_command = ['cifs-share', 'del', '--target-label',
self.fs_name, 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute',
mock.Mock(side_effect=putils.ProcessExecutionError(
exit_code=1)))
self._driver_ssh.cifs_share_del('vvol_test')
self._driver_ssh._execute.assert_called_with(fake_cifs_del_command)
self.assertTrue(self.mock_log.warning.called)
def test_cifs_share_del_exception(self):
fake_cifs_del_command = ['cifs-share', 'del', '--target-label',
self.fs_name, 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute',
mock.Mock(side_effect=putils.ProcessExecutionError))
self.assertRaises(exception.HNASBackendException,
self._driver_ssh.cifs_share_del, 'vvol_test')
self._driver_ssh._execute.assert_called_with(fake_cifs_del_command)
def test_get_nfs_host_list(self):
self.mock_object(ssh.HNASSSHBackend, "_get_share_export", mock.Mock(
return_value=[ssh.Export(HNAS_RESULT_export)]))
host_list = self._driver_ssh.get_host_list('fake_id')
host_list = self._driver_ssh.get_nfs_host_list('fake_id')
self.assertEqual(['127.0.0.2'], host_list)
def test_update_access_rule_empty_host_list(self):
def test_update_nfs_access_rule_empty_host_list(self):
fake_export_command = ['nfs-export', 'mod', '-c', '127.0.0.1',
'/shares/fake_id']
self.mock_object(ssh.HNASSSHBackend, "_execute", mock.Mock())
self._driver_ssh.update_access_rule("fake_id", [])
self._driver_ssh.update_nfs_access_rule("fake_id", [])
self._driver_ssh._execute.assert_called_with(fake_export_command)
def test_update_access_rule(self):
def test_update_nfs_access_rule(self):
fake_export_command = ['nfs-export', 'mod', '-c',
u'"127.0.0.1,127.0.0.2"', '/shares/fake_id']
self.mock_object(ssh.HNASSSHBackend, "_execute", mock.Mock())
self._driver_ssh.update_access_rule("fake_id", ['127.0.0.1',
'127.0.0.2'])
self._driver_ssh.update_nfs_access_rule("fake_id", ['127.0.0.1',
'127.0.0.2'])
self._driver_ssh._execute.assert_called_with(fake_export_command)
def test_cifs_allow_access(self):
fake_cifs_allow_command = ['cifs-saa', 'add', '--target-label',
self.fs_name, 'vvol_test',
'fake_user', 'ar']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock())
self._driver_ssh.cifs_allow_access('vvol_test', 'fake_user', 'ar')
self._driver_ssh._execute.assert_called_with(fake_cifs_allow_command)
def test_cifs_allow_access_already_allowed_user(self):
fake_cifs_allow_command = ['cifs-saa', 'add', '--target-label',
self.fs_name, 'vvol_test',
'fake_user', 'acr']
self.mock_object(ssh.HNASSSHBackend, '_execute',
mock.Mock(side_effect=[putils.ProcessExecutionError(
stderr='already listed as a user')]))
self._driver_ssh.cifs_allow_access('vvol_test', 'fake_user', 'acr')
self._driver_ssh._execute.assert_called_with(fake_cifs_allow_command)
self.assertTrue(self.mock_log.debug.called)
def test_cifs_allow_access_exception(self):
fake_cifs_allow_command = ['cifs-saa', 'add', '--target-label',
self.fs_name, 'vvol_test',
'fake_user', 'acr']
self.mock_object(ssh.HNASSSHBackend, '_execute',
mock.Mock(side_effect=[putils.ProcessExecutionError(
stderr='Could not add user/group fake_user to '
'share \'vvol_test\'')]))
self.assertRaises(exception.InvalidShareAccess,
self._driver_ssh.cifs_allow_access, 'vvol_test',
'fake_user', 'acr')
self._driver_ssh._execute.assert_called_with(fake_cifs_allow_command)
def test_cifs_deny_access(self):
fake_cifs_deny_command = ['cifs-saa', 'delete', '--target-label',
self.fs_name, 'vvol_test', 'fake_user']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock())
self._driver_ssh.cifs_deny_access('vvol_test', 'fake_user')
self._driver_ssh._execute.assert_called_with(fake_cifs_deny_command)
def test_cifs_deny_access_already_deleted_user(self):
fake_cifs_deny_command = ['cifs-saa', 'delete', '--target-label',
self.fs_name, 'vvol_test', 'fake_user']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
side_effect=[putils.ProcessExecutionError(
stderr='not listed as a user')]))
self._driver_ssh.cifs_deny_access('vvol_test', 'fake_user')
self._driver_ssh._execute.assert_called_with(fake_cifs_deny_command)
self.assertTrue(self.mock_log.debug.called)
def test_cifs_deny_access_backend_exception(self):
fake_cifs_deny_command = ['cifs-saa', 'delete', '--target-label',
self.fs_name, 'vvol_test', 'fake_user']
self.mock_object(ssh.HNASSSHBackend, '_execute',
mock.Mock(side_effect=[putils.ProcessExecutionError(
stderr='Unexpected error')]))
self.assertRaises(exception.HNASBackendException,
self._driver_ssh.cifs_deny_access, 'vvol_test',
'fake_user')
self._driver_ssh._execute.assert_called_with(fake_cifs_deny_command)
def test_list_cifs_permission(self):
fake_cifs_list_command = ['cifs-saa', 'list', '--target-label',
self.fs_name, 'vvol_test']
expected_out = ssh.CIFSPermissions(HNAS_RESULT_list_cifs_permissions)
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
return_value=(HNAS_RESULT_list_cifs_permissions, '')))
out = self._driver_ssh.list_cifs_permissions('vvol_test')
for i in range(len(expected_out.permission_list)):
self.assertEqual(expected_out.permission_list[i], out[i])
self._driver_ssh._execute.assert_called_with(fake_cifs_list_command)
def test_list_cifs_no_permissions_added(self):
fake_cifs_list_command = ['cifs-saa', 'list', '--target-label',
self.fs_name, 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
side_effect=[putils.ProcessExecutionError(
stderr='No entries for this share')]))
out = self._driver_ssh.list_cifs_permissions('vvol_test')
self.assertEqual([], out)
self._driver_ssh._execute.assert_called_with(fake_cifs_list_command)
self.assertTrue(self.mock_log.debug.called)
def test_list_cifs_exception(self):
fake_cifs_list_command = ['cifs-saa', 'list', '--target-label',
self.fs_name, 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
side_effect=[putils.ProcessExecutionError(
stderr='Error.')]))
self.assertRaises(exception.HNASBackendException,
self._driver_ssh.list_cifs_permissions,
"vvol_test")
self._driver_ssh._execute.assert_called_with(fake_cifs_list_command)
self.assertTrue(self.mock_log.exception.called)
def test_tree_clone_nothing_to_clone(self):
fake_tree_clone_command = ['tree-clone-job-submit', '-e', '-f',
self.fs_name, '/src', '/dst']
@ -722,6 +956,70 @@ class HNASSSHTestCase(test.TestCase):
self.assertRaises(exception.HNASItemNotFoundException,
self._driver_ssh.check_export, "vvol_test")
def test_check_cifs(self):
check_cifs_share_command = ['cifs-share', 'list', 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
return_value=[HNAS_RESULT_cifs_list, '']))
self._driver_ssh.check_cifs('vvol_test')
self._driver_ssh._execute.assert_called_with(check_cifs_share_command)
def test_check_cifs_inexistent_share(self):
check_cifs_share_command = ['cifs-share', 'list', 'wrong_vvol']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
side_effect=[putils.ProcessExecutionError(
stderr='Export wrong_vvol does not exist on backend '
'anymore.')]))
self.assertRaises(exception.HNASItemNotFoundException,
self._driver_ssh.check_cifs, 'wrong_vvol')
self._driver_ssh._execute.assert_called_with(check_cifs_share_command)
def test_check_cifs_exception(self):
check_cifs_share_command = ['cifs-share', 'list', 'wrong_vvol']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
side_effect=[putils.ProcessExecutionError(stderr='Error.')]))
self.assertRaises(putils.ProcessExecutionError,
self._driver_ssh.check_cifs, 'wrong_vvol')
self._driver_ssh._execute.assert_called_with(check_cifs_share_command)
def test_check_cifs_different_fs_exception(self):
check_cifs_share_command = ['cifs-share', 'list', 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
return_value=[HNAS_RESULT_different_fs_cifs_list, '']))
self.assertRaises(exception.HNASItemNotFoundException,
self._driver_ssh.check_cifs, 'vvol_test')
self._driver_ssh._execute.assert_called_with(check_cifs_share_command)
def test_is_cifs_in_use(self):
check_cifs_share_command = ['cifs-share', 'list', 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
return_value=[HNAS_RESULT_cifs_list, '']))
out = self._driver_ssh.is_cifs_in_use('vvol_test')
self.assertTrue(out)
self._driver_ssh._execute.assert_called_with(check_cifs_share_command)
def test_is_cifs_without_use(self):
check_cifs_share_command = ['cifs-share', 'list', 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
return_value=[HNAS_RESULT_different_fs_cifs_list, '']))
out = self._driver_ssh.is_cifs_in_use('vvol_test')
self.assertFalse(out)
self._driver_ssh._execute.assert_called_with(check_cifs_share_command)
def test_get_share_quota(self):
self.mock_object(ssh.HNASSSHBackend, "_execute", mock.Mock(
return_value=(HNAS_RESULT_quota, '')))
@ -795,8 +1093,7 @@ class HNASSSHTestCase(test.TestCase):
self.mock_object(ssh.HNASSSHBackend, "_execute", mock.Mock(
side_effect=putils.ProcessExecutionError(
stderr="NFS Export List: Export 'id' does not exist.")
))
stderr="NFS Export List: Export 'id' does not exist.")))
self.assertRaises(exception.HNASItemNotFoundException,
self._driver_ssh._get_share_export, 'fake_id')

View File

@ -0,0 +1,8 @@
---
prelude: >
Add support for CIFS protocol in Manila HNAS driver.
features:
- Added support for CIFS shares in Hitachi HNAS driver. It supports
user access type, where a permission for a user or a group can be
added/removed. Also, accepts 'read write' and 'read only' as access
level.