Add support for CIFS shares in HNAS driver

Updating Manila Hitachi HNAS driver to support shares using CIFS protocol.
It accepts 'user' as access type and both rw and ro as access level.

Change-Id: I18fd5afcea6f91d870bbfc256c71a92aad014c91
Implements: blueprint hnas-driver-cifs-support
This commit is contained in:
Alyson Rosa 2016-07-15 10:23:01 -03:00
parent 2c1cfc3a07
commit dfb9e587b1
6 changed files with 1030 additions and 239 deletions

View File

@ -94,7 +94,7 @@ Mapping of share drivers and share access rules support
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
| HDFS | \- | HDFS(K) | \- | \- | \- | HDFS(K) | \- | \- |
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
| Hitachi HNAS | NFS (L) | \- | \- | \- | NFS (L) | \- | \- | \- |
| Hitachi HNAS | NFS (L) | CIFS (N) | \- | \- | NFS (L) | CIFS (N) | \- | \- |
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
| HPE 3PAR | NFS,CIFS (K) | CIFS (K) | \- | \- | \- | \- | \- | \- |
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+

View File

@ -61,6 +61,13 @@ hds_hnas_opts = [
cfg.StrOpt('hds_hnas_driver_helper',
default='manila.share.drivers.hitachi.ssh.HNASSSHBackend',
help="Python class to be used for driver helper."),
cfg.BoolOpt('hds_hnas_allow_cifs_snapshot_while_mounted',
default=False,
help="By default, CIFS snapshots are not allowed to be taken "
"when the share has clients connected because consistent "
"point-in-time replica cannot be guaranteed for all "
"files. Enabling this might cause inconsistent snapshots "
"on CIFS shares."),
]
CONF = cfg.CONF
@ -72,6 +79,7 @@ class HDSHNASDriver(driver.ShareDriver):
1.0.0 - Initial Version.
2.0.0 - Refactoring, bugfixes, implemented Share Shrink and Update Access.
3.0.0 - Implemented support for CIFS protocol.
"""
def __init__(self, *args, **kwargs):
@ -92,6 +100,8 @@ class HDSHNASDriver(driver.ShareDriver):
hnas_evs_id = self.configuration.safe_get('hds_hnas_evs_id')
self.hnas_evs_ip = self.configuration.safe_get('hds_hnas_evs_ip')
self.fs_name = self.configuration.safe_get('hds_hnas_file_system_name')
self.cifs_snapshot = self.configuration.safe_get(
'hds_hnas_allow_cifs_snapshot_while_mounted')
ssh_private_key = self.configuration.safe_get(
'hds_hnas_ssh_private_key')
cluster_admin_ip0 = self.configuration.safe_get(
@ -141,14 +151,11 @@ class HDSHNASDriver(driver.ShareDriver):
:param context: The `context.RequestContext` object for the request
:param share: Share that will have its access rules updated.
:param access_rules: All access rules for given share. This list
is enough to update the access rules for given share.
:param access_rules: All access rules for given share.
:param add_rules: Empty List or List of access rules which should be
added. access_rules already contains these rules. Not used by this
driver.
added. access_rules already contains these rules.
:param delete_rules: Empty List or List of access rules which should be
removed. access_rules doesn't contain these rules. Not used by
this driver.
removed. access_rules doesn't contain these rules.
:param share_server: Data structure with share server information.
Not used by this driver.
"""
@ -156,15 +163,32 @@ class HDSHNASDriver(driver.ShareDriver):
hnas_share_id = self._get_hnas_share_id(share['id'])
try:
self._ensure_share(hnas_share_id)
self._ensure_share(share, hnas_share_id)
except exception.HNASItemNotFoundException:
raise exception.ShareResourceNotFound(share_id=share['id'])
self._check_protocol(share['id'], share['share_proto'])
if share['share_proto'].lower() == 'nfs':
self._nfs_update_access(share, hnas_share_id, access_rules)
else:
if not (add_rules or delete_rules):
# recovery mode
self._clean_cifs_access_list(hnas_share_id)
self._cifs_allow_access(share, hnas_share_id, access_rules)
else:
self._cifs_deny_access(share, hnas_share_id, delete_rules)
self._cifs_allow_access(share, hnas_share_id, add_rules)
def _nfs_update_access(self, share, hnas_share_id, access_rules):
host_list = []
for rule in access_rules:
if rule['access_type'].lower() != 'ip':
msg = _("Only IP access type currently supported.")
msg = _("Only IP access type currently supported for NFS. "
"Share provided %(share)s with rule type "
"%(type)s.") % {'share': share['id'],
'type': rule['access_type']}
raise exception.InvalidShareAccess(reason=msg)
if rule['access_level'] == constants.ACCESS_LEVEL_RW:
@ -175,7 +199,7 @@ class HDSHNASDriver(driver.ShareDriver):
host_list.append(rule['access_to'] + '(' +
rule['access_level'] + ')')
self.hnas.update_access_rule(hnas_share_id, host_list)
self.hnas.update_nfs_access_rule(hnas_share_id, host_list)
if host_list:
LOG.debug("Share %(share)s has the rules: %(rules)s",
@ -183,6 +207,58 @@ class HDSHNASDriver(driver.ShareDriver):
else:
LOG.debug("Share %(share)s has no rules.", {'share': share['id']})
def _cifs_allow_access(self, share, hnas_share_id, add_rules):
for rule in add_rules:
if rule['access_type'].lower() != 'user':
msg = _("Only USER access type currently supported for CIFS. "
"Share provided %(share)s with rule %(r_id)s type "
"%(type)s allowing permission to %(to)s.") % {
'share': share['id'], 'type': rule['access_type'],
'r_id': rule['id'], 'to': rule['access_to']}
raise exception.InvalidShareAccess(reason=msg)
if rule['access_level'] == constants.ACCESS_LEVEL_RW:
# Adding permission acr = Allow Change&Read
permission = 'acr'
else:
# Adding permission ar = Allow Read
permission = 'ar'
formatted_user = rule['access_to'].replace('\\', '\\\\')
self.hnas.cifs_allow_access(hnas_share_id, formatted_user,
permission)
LOG.debug("Added %(rule)s rule for user/group %(user)s to share "
"%(share)s.", {'rule': rule['access_level'],
'user': rule['access_to'],
'share': share['id']})
def _cifs_deny_access(self, share, hnas_share_id, delete_rules):
for rule in delete_rules:
if rule['access_type'].lower() != 'user':
LOG.warning(_LW('Only USER access type is allowed for '
'CIFS shares. Share provided %(share)s with '
'protocol %(proto)s.'),
{'share': share['id'],
'proto': share['share_proto']})
continue
formatted_user = rule['access_to'].replace('\\', '\\\\')
self.hnas.cifs_deny_access(hnas_share_id, formatted_user)
LOG.debug("Access denied for user/group %(user)s to share "
"%(share)s.", {'user': rule['access_to'],
'share': share['id']})
def _clean_cifs_access_list(self, hnas_share_id):
permission_list = self.hnas.list_cifs_permissions(hnas_share_id)
for permission in permission_list:
formatted_user = r'"\{1}{0}\{1}"'.format(permission[0], '"')
self.hnas.cifs_deny_access(hnas_share_id, formatted_user)
def create_share(self, context, share, share_server=None):
"""Creates share.
@ -191,17 +267,16 @@ class HDSHNASDriver(driver.ShareDriver):
:param share_server: Data structure with share server information.
Not used by this driver.
:returns: Returns a path of EVS IP concatenate with the path
of share in the filesystem (e.g. ['172.24.44.10:/shares/id']).
of share in the filesystem (e.g. ['172.24.44.10:/shares/id'] for
NFS and ['\\172.24.44.10\id'] for CIFS).
"""
LOG.debug("Creating share in HNAS: %(shr)s.",
{'shr': share['id']})
if share['share_proto'].lower() != 'nfs':
msg = _("Only NFS protocol is currently supported.")
raise exception.ShareBackendException(msg=msg)
self._check_protocol(share['id'], share['share_proto'])
path = self._create_share(share['id'], share['size'])
uri = self.hnas_evs_ip + ":" + path
uri = self._create_share(share['id'], share['size'],
share['share_proto'])
LOG.debug("Share created successfully on path: %(uri)s.",
{'uri': uri})
@ -220,7 +295,7 @@ class HDSHNASDriver(driver.ShareDriver):
LOG.debug("Deleting share in HNAS: %(shr)s.",
{'shr': share['id']})
self._delete_share(hnas_share_id)
self._delete_share(hnas_share_id, share['share_proto'])
LOG.debug("Export and share successfully deleted: %(shr)s.",
{'shr': share['id']})
@ -239,7 +314,7 @@ class HDSHNASDriver(driver.ShareDriver):
"id %(ss_id)s.", {'ss_sid': snapshot['share_id'],
'ss_id': snapshot['id']})
self._create_snapshot(hnas_share_id, snapshot['id'])
self._create_snapshot(hnas_share_id, snapshot)
LOG.info(_LI("Snapshot %(id)s successfully created."),
{'id': snapshot['id']})
@ -279,9 +354,8 @@ class HDSHNASDriver(driver.ShareDriver):
hnas_src_share_id = self._get_hnas_share_id(snapshot['share_id'])
path = self._create_share_from_snapshot(share, hnas_src_share_id,
snapshot)
uri = self.hnas_evs_ip + ":" + path
uri = self._create_share_from_snapshot(share, hnas_src_share_id,
snapshot)
LOG.debug("Share %(share)s created successfully on path: %(uri)s.",
{'uri': uri,
@ -296,20 +370,19 @@ class HDSHNASDriver(driver.ShareDriver):
:param share_server: Data structure with share server information.
Not used by this driver.
:returns: Returns a list of EVS IP concatenated with the path
of share in the filesystem (e.g. ['172.24.44.10:/shares/id']).
of share in the filesystem (e.g. ['172.24.44.10:/shares/id'] for
NFS and ['\\172.24.44.10\id'] for CIFS).
"""
LOG.debug("Ensuring share in HNAS: %(shr)s.",
{'shr': share['id']})
LOG.debug("Ensuring share in HNAS: %(shr)s.", {'shr': share['id']})
hnas_share_id = self._get_hnas_share_id(share['id'])
path = self._ensure_share(hnas_share_id)
export = self._ensure_share(share, hnas_share_id)
export = self.hnas_evs_ip + ":" + path
export_list = [export]
LOG.debug("Share ensured in HNAS: %(shr)s.",
{'shr': share['id']})
LOG.debug("Share ensured in HNAS: %(shr)s, protocol %(proto)s.",
{'shr': share['id'], 'proto': share['share_proto']})
return export_list
def extend_share(self, share, new_size, share_server=None):
@ -355,8 +428,8 @@ class HDSHNASDriver(driver.ShareDriver):
'share_backend_name': self.backend_name,
'driver_handles_share_servers': self.driver_handles_share_servers,
'vendor_name': 'HDS',
'driver_version': '2.0.0',
'storage_protocol': 'NFS',
'driver_version': '3.0.0',
'storage_protocol': 'NFS_CIFS',
'total_capacity_gb': total_space,
'free_capacity_gb': free_space,
'reserved_percentage': reserved,
@ -375,7 +448,7 @@ class HDSHNASDriver(driver.ShareDriver):
:param share: Share that will be managed.
:param driver_options: Empty dict or dict with 'volume_id' option.
:returns: Returns a dict with size of share managed
and its location (your path in file-system).
and its export location.
"""
hnas_share_id = self._get_hnas_share_id(share['id'])
@ -385,20 +458,36 @@ class HDSHNASDriver(driver.ShareDriver):
msg = _("Share ID %s already exists, cannot manage.") % share['id']
raise exception.HNASBackendException(msg=msg)
LOG.info(_LI("Share %(shr_path)s will be managed with ID %(shr_id)s."),
{'shr_path': share['export_locations'][0]['path'],
'shr_id': share['id']})
self._check_protocol(share['id'], share['share_proto'])
old_path_info = share['export_locations'][0]['path'].split(':')
old_path = old_path_info[1].split('/')
if share['share_proto'].lower() == 'nfs':
# 10.0.0.1:/shares/example
LOG.info(_LI("Share %(shr_path)s will be managed with ID "
"%(shr_id)s."),
{'shr_path': share['export_locations'][0]['path'],
'shr_id': share['id']})
if len(old_path) == 3:
evs_ip = old_path_info[0]
hnas_share_id = old_path[2]
else:
msg = _("Incorrect path. It should have the following format: "
"IP:/shares/share_id.")
raise exception.ShareBackendException(msg=msg)
old_path_info = share['export_locations'][0]['path'].split(':')
old_path = old_path_info[1].split('/')
if len(old_path) == 3:
evs_ip = old_path_info[0]
hnas_share_id = old_path[2]
else:
msg = _("Incorrect path. It should have the following format: "
"IP:/shares/share_id.")
raise exception.ShareBackendException(msg=msg)
else: # then its CIFS
# \\10.0.0.1\example
old_path = share['export_locations'][0]['path'].split('\\')
if len(old_path) == 4:
evs_ip = old_path[2]
hnas_share_id = old_path[3]
else:
msg = _("Incorrect path. It should have the following format: "
"\\\\IP\\share_id.")
raise exception.ShareBackendException(msg=msg)
if evs_ip != self.hnas_evs_ip:
msg = _("The EVS IP %(evs)s is not "
@ -410,7 +499,7 @@ class HDSHNASDriver(driver.ShareDriver):
"not configured.") % {'shr': share['host']}
raise exception.ShareBackendException(msg=msg)
output = self._manage_existing(share['id'], hnas_share_id)
output = self._manage_existing(share, hnas_share_id)
self.private_storage.update(
share['id'], {'hnas_id': hnas_share_id})
@ -472,17 +561,17 @@ class HDSHNASDriver(driver.ShareDriver):
return hnas_id
def _create_share(self, share_id, share_size):
def _create_share(self, share_id, share_size, share_proto):
"""Creates share.
Creates a virtual-volume, adds a quota limit and exports it.
:param share_id: manila's database ID of share that will be created.
:param share_size: Size limit of share.
:returns: Returns a path of /shares/share_id if the export was
created successfully.
:param share_proto: Protocol of share that will be created
(NFS or CIFS)
:returns: Returns a path IP:/shares/share_id for NFS or \\IP\share_id
for CIFS if the export was created successfully.
"""
path = os.path.join('/shares', share_id)
self._check_fs_mounted()
self.hnas.vvol_create(share_id)
@ -493,11 +582,19 @@ class HDSHNASDriver(driver.ShareDriver):
{'shr': share_id, 'size': share_size})
try:
# Create NFS export
self.hnas.nfs_export_add(share_id)
LOG.debug("NFS Export created to %(shr)s.",
{'shr': share_id})
return path
if share_proto.lower() == 'nfs':
# Create NFS export
self.hnas.nfs_export_add(share_id)
LOG.debug("NFS Export created to %(shr)s.",
{'shr': share_id})
uri = self.hnas_evs_ip + ":/shares/" + share_id
else:
# Create CIFS share with vvol path
self.hnas.cifs_share_add(share_id)
LOG.debug("CIFS share created to %(shr)s.",
{'shr': share_id})
uri = r'\\%s\%s' % (self.hnas_evs_ip, share_id)
return uri
except exception.HNASBackendException as e:
with excutils.save_and_reraise_exception():
self.hnas.vvol_delete(share_id)
@ -510,20 +607,29 @@ class HDSHNASDriver(driver.ShareDriver):
msg = _("Filesystem %s is not mounted.") % self.fs_name
raise exception.HNASBackendException(msg=msg)
def _ensure_share(self, hnas_share_id):
def _ensure_share(self, share, hnas_share_id):
"""Ensure that share is exported.
:param share: Share that will be checked.
:param hnas_share_id: HNAS ID of share that will be checked.
:returns: Returns a path of /shares/share_id if the export is ok.
:returns: Returns a path IP:/shares/share_id for NFS or \\IP\share_id
for CIFS if the export is ok.
"""
self._check_protocol(share['id'], share['share_proto'])
path = os.path.join('/shares', hnas_share_id)
self._check_fs_mounted()
self.hnas.check_vvol(hnas_share_id)
self.hnas.check_quota(hnas_share_id)
self.hnas.check_export(hnas_share_id)
return path
if share['share_proto'].lower() == 'nfs':
self.hnas.check_export(hnas_share_id)
export = self.hnas_evs_ip + ":" + path
else:
self.hnas.check_cifs(hnas_share_id)
export = r'\\%s\%s' % (self.hnas_evs_ip, hnas_share_id)
return export
def _shrink_share(self, hnas_share_id, share, new_size):
"""Shrinks a share to new size.
@ -532,7 +638,7 @@ class HDSHNASDriver(driver.ShareDriver):
:param share: model of share that will be shrunk.
:param new_size: New size of share after shrink operation.
"""
self._ensure_share(hnas_share_id)
self._ensure_share(share, hnas_share_id)
usage = self.hnas.get_share_usage(hnas_share_id)
@ -552,7 +658,7 @@ class HDSHNASDriver(driver.ShareDriver):
:param share: model of share that will be extended.
:param new_size: New size of share after extend operation.
"""
self._ensure_share(hnas_share_id)
self._ensure_share(share, hnas_share_id)
old_size = share['size']
total, available_space = self.hnas.get_stats()
@ -567,56 +673,76 @@ class HDSHNASDriver(driver.ShareDriver):
% share['id'])
raise exception.HNASBackendException(msg=msg)
def _delete_share(self, hnas_share_id):
def _delete_share(self, hnas_share_id, share_proto):
"""Deletes share.
It uses tree-delete-job-submit to format and delete virtual-volumes.
Quota is deleted with virtual-volume.
:param hnas_share_id: HNAS ID of share that will be deleted.
:param share_proto: Protocol of share that will be deleted.
"""
self._check_fs_mounted()
self.hnas.nfs_export_del(hnas_share_id)
if share_proto.lower() == 'nfs':
self.hnas.nfs_export_del(hnas_share_id)
elif share_proto.lower() == 'cifs':
self.hnas.cifs_share_del(hnas_share_id)
self.hnas.vvol_delete(hnas_share_id)
def _manage_existing(self, share_id, hnas_share_id):
def _manage_existing(self, share, hnas_share_id):
"""Manages a share that exists on backend.
:param share_id: manila's database ID of share that will be managed.
:param share: share that will be managed.
:param hnas_share_id: HNAS ID of share that will be managed.
:returns: Returns a dict with size of share managed
and its location (your path in file-system).
and its export location.
"""
self._ensure_share(hnas_share_id)
self._ensure_share(share, hnas_share_id)
share_size = self.hnas.get_share_quota(hnas_share_id)
if share_size is None:
msg = (_("The share %s trying to be managed does not have a "
"quota limit, please set it before manage.") % share_id)
"quota limit, please set it before manage.")
% share['id'])
raise exception.ManageInvalidShare(reason=msg)
path = self.hnas_evs_ip + os.path.join(':/shares', hnas_share_id)
if share['share_proto'].lower() == 'nfs':
path = self.hnas_evs_ip + os.path.join(':/shares', hnas_share_id)
else:
path = r'\\%s\%s' % (self.hnas_evs_ip, hnas_share_id)
return {'size': share_size, 'export_locations': [path]}
def _create_snapshot(self, hnas_share_id, snapshot_id):
def _create_snapshot(self, hnas_share_id, snapshot):
"""Creates a snapshot of share.
It copies the directory and all files to a new directory inside
/snapshots/share_id/.
:param hnas_share_id: HNAS ID of share for snapshot.
:param snapshot_id: ID of new snapshot.
:param snapshot: Snapshot that will be created.
"""
self._ensure_share(hnas_share_id)
self._ensure_share(snapshot['share'], hnas_share_id)
saved_list = []
saved_list = self.hnas.get_host_list(hnas_share_id)
new_list = []
for access in saved_list:
new_list.append(access.replace('(rw)', '(ro)'))
self.hnas.update_access_rule(hnas_share_id, new_list)
self._check_protocol(snapshot['share_id'],
snapshot['share']['share_proto'])
if snapshot['share']['share_proto'].lower() == 'nfs':
saved_list = self.hnas.get_nfs_host_list(hnas_share_id)
new_list = []
for access in saved_list:
new_list.append(access.replace('(rw)', '(ro)'))
self.hnas.update_nfs_access_rule(hnas_share_id, new_list)
else: # CIFS
if (self.hnas.is_cifs_in_use(hnas_share_id) and
not self.cifs_snapshot):
msg = _("CIFS snapshot when share is mounted is disabled. "
"Set hds_hnas_allow_cifs_snapshot_while_mounted to "
"True or unmount the share to take a snapshot.")
raise exception.ShareBackendException(msg=msg)
src_path = os.path.join('/shares', hnas_share_id)
dest_path = os.path.join('/snapshots', hnas_share_id, snapshot_id)
dest_path = os.path.join('/snapshots', hnas_share_id, snapshot['id'])
try:
self.hnas.tree_clone(src_path, dest_path)
except exception.HNASNothingToCloneException:
@ -624,7 +750,8 @@ class HDSHNASDriver(driver.ShareDriver):
"directory."))
self.hnas.create_directory(dest_path)
finally:
self.hnas.update_access_rule(hnas_share_id, saved_list)
if snapshot['share']['share_proto'].lower() == 'nfs':
self.hnas.update_nfs_access_rule(hnas_share_id, saved_list)
def _delete_snapshot(self, hnas_share_id, snapshot_id):
"""Deletes snapshot.
@ -670,5 +797,21 @@ class HDSHNASDriver(driver.ShareDriver):
except exception.HNASNothingToCloneException:
LOG.warning(_LW("Source directory is empty, exporting "
"directory."))
self.hnas.nfs_export_add(share['id'])
return dest_path
self._check_protocol(share['id'], share['share_proto'])
if share['share_proto'].lower() == 'nfs':
self.hnas.nfs_export_add(share['id'])
uri = self.hnas_evs_ip + ":" + dest_path
else:
self.hnas.cifs_share_add(share['id'])
uri = r'\\%s\%s' % (self.hnas_evs_ip, share['id'])
return uri
def _check_protocol(self, share_id, protocol):
if protocol.lower() not in ('nfs', 'cifs'):
msg = _("Only NFS or CIFS protocol are currently supported. "
"Share provided %(share)s with protocol "
"%(proto)s.") % {'share': share_id,
'proto': protocol}
raise exception.ShareBackendException(msg=msg)

View File

@ -82,11 +82,31 @@ class HNASSSHBackend(object):
LOG.exception(msg)
raise exception.HNASBackendException(msg=msg)
def get_host_list(self, share_id):
def cifs_share_add(self, share_id):
path = r'\\shares\\' + share_id
command = ['cifs-share', 'add', '-S', 'disable', '--enable-abe',
'--nodefaultsaa', share_id, self.fs_name, path]
self._execute(command)
def cifs_share_del(self, share_id):
command = ['cifs-share', 'del', '--target-label', self.fs_name,
share_id]
try:
self._execute(command)
except processutils.ProcessExecutionError as e:
if e.exit_code == 1:
LOG.warning(_LW("CIFS share %s does not exist on "
"backend anymore."), share_id)
else:
msg = six.text_type(e)
LOG.exception(msg)
raise exception.HNASBackendException(msg=msg)
def get_nfs_host_list(self, share_id):
export = self._get_share_export(share_id)
return export[0].export_configuration
def update_access_rule(self, share_id, host_list):
def update_nfs_access_rule(self, share_id, host_list):
command = ['nfs-export', 'mod', '-c']
if len(host_list) == 0:
@ -103,6 +123,55 @@ class HNASSSHBackend(object):
command.append(path)
self._execute(command)
def cifs_allow_access(self, hnas_share_id, user, permission):
command = ['cifs-saa', 'add', '--target-label', self.fs_name,
hnas_share_id, user, permission]
try:
self._execute(command)
except processutils.ProcessExecutionError as e:
if 'already listed as a user' in e.stderr:
LOG.debug('User %(user)s already allowed to access share '
'%(share)s.', {'user': user, 'share': hnas_share_id})
else:
msg = six.text_type(e)
LOG.exception(msg)
raise exception.InvalidShareAccess(reason=msg)
def cifs_deny_access(self, hnas_share_id, user):
command = ['cifs-saa', 'delete', '--target-label', self.fs_name,
hnas_share_id, user]
try:
self._execute(command)
except processutils.ProcessExecutionError as e:
if ('not listed as a user' in e.stderr or
'Could not delete user/group' in e.stderr):
LOG.warning(_LW('User %(user)s already not allowed to access '
'share %(share)s.'),
{'user': user, 'share': hnas_share_id})
else:
msg = six.text_type(e)
LOG.exception(msg)
raise exception.HNASBackendException(msg=msg)
def list_cifs_permissions(self, hnas_share_id):
command = ['cifs-saa', 'list', '--target-label', self.fs_name,
hnas_share_id]
try:
output, err = self._execute(command)
except processutils.ProcessExecutionError as e:
if 'No entries for this share' in e.stderr:
LOG.debug('Share %(share)s does not have any permission '
'added.', {'share': hnas_share_id})
return []
else:
msg = six.text_type(e)
LOG.exception(msg)
raise exception.HNASBackendException(msg=msg)
permissions = CIFSPermissions(output)
return permissions.permission_list
def tree_clone(self, src_path, dest_path):
command = ['tree-clone-job-submit', '-e', '-f', self.fs_name,
src_path, dest_path]
@ -278,6 +347,40 @@ class HNASSSHBackend(object):
msg = _("Export %s does not exist.") % export[0].export_name
raise exception.HNASItemNotFoundException(msg=msg)
def check_cifs(self, vvol_name):
output = self._cifs_list(vvol_name)
cifs_share = CIFSShare(output)
if self.fs_name != cifs_share.fs:
msg = _("CIFS share %(share)s is not located in "
"configured filesystem "
"%(fs)s.") % {'share': vvol_name,
'fs': self.fs_name}
raise exception.HNASItemNotFoundException(msg=msg)
def is_cifs_in_use(self, vvol_name):
output = self._cifs_list(vvol_name)
cifs_share = CIFSShare(output)
return cifs_share.is_mounted
def _cifs_list(self, vvol_name):
command = ['cifs-share', 'list', vvol_name]
try:
output, err = self._execute(command)
except processutils.ProcessExecutionError as e:
if 'does not exist' in e.stderr:
msg = _("CIFS share %(share)s was not found in EVS "
"%(evs_id)s") % {'share': vvol_name,
'evs_id': self.evs_id}
raise exception.HNASItemNotFoundException(msg=msg)
else:
raise
return output
def get_share_quota(self, share_id):
command = ['quota', 'list', self.fs_name, share_id]
output, err = self._execute(command)
@ -532,3 +635,37 @@ class Quota(object):
else:
self.limit = float(items[13])
self.limit_unit = items[14]
class CIFSPermissions(object):
def __init__(self, data):
self.permission_list = []
hnas_cifs_permissions = [('Allow Read', 'ar'),
('Allow Change & Read', 'acr'),
('Allow Full Control', 'af'),
('Deny Read', 'dr'),
('Deny Change & Read', 'dcr'),
('Deny Full Control', 'df')]
lines = data.split('\n')
for line in lines:
filtered = list(filter(lambda x: x[0] in line,
hnas_cifs_permissions))
if len(filtered) == 1:
token, permission = filtered[0]
user = line.split(token)[1:][0].strip()
self.permission_list.append((user, permission))
class CIFSShare(object):
def __init__(self, data):
lines = data.split('\n')
for line in lines:
if 'File system label' in line:
self.fs = line.split(': ')[1]
elif 'Share users' in line:
users = line.split(': ')
self.is_mounted = users[1] != '0'

View File

@ -26,7 +26,7 @@ from manila import test
CONF = cfg.CONF
share = {
share_nfs = {
'id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
'name': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
'size': 50,
@ -39,6 +39,19 @@ share = {
'aa4a7710-f326-41fb-ad18-b4ad587fc87a'}],
}
share_cifs = {
'id': 'f5cadaf2-afbe-4cc4-9021-85491b6b76f7',
'name': 'f5cadaf2-afbe-4cc4-9021-85491b6b76f7',
'size': 50,
'host': 'hnas',
'share_proto': 'CIFS',
'share_type_id': 1,
'share_network_id': 'bb329e24-3bdb-491d-acfd-dfe70c09b98d',
'share_server_id': 'cc345a53-491d-acfd-3bdb-dfe70c09b98d',
'export_locations': [{'path': '\\\\172.24.44.10\\'
'f5cadaf2-afbe-4cc4-9021-85491b6b76f7'}],
}
share_invalid_host = {
'id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
'name': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
@ -52,7 +65,7 @@ share_invalid_host = {
'aa4a7710-f326-41fb-ad18-b4ad587fc87a'}],
}
access = {
access_nfs_rw = {
'id': 'acdc7172b-fe07-46c4-b78f-df3e0324ccd0',
'access_type': 'ip',
'access_to': '172.24.44.200',
@ -60,9 +73,32 @@ access = {
'state': 'active',
}
snapshot = {
access_cifs_rw = {
'id': '43167594-40e9-b899-1f4f-b9c2176b7564',
'access_type': 'user',
'access_to': 'fake_user',
'access_level': 'rw',
'state': 'active',
}
access_cifs_ro = {
'id': '32407088-1f4f-40e9-b899-b9a4176b574d',
'access_type': 'user',
'access_to': 'fake_user',
'access_level': 'ro',
'state': 'active',
}
snapshot_nfs = {
'id': 'abba6d9b-f29c-4bf7-aac1-618cda7aaf0f',
'share_id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
'share': share_nfs,
}
snapshot_cifs = {
'id': '91bc6e1b-1ba5-f29c-abc1-da7618cabf0a',
'share_id': 'f5cadaf2-afbe-4cc4-9021-85491b6b76f7',
'share': share_cifs,
}
invalid_share = {
@ -70,12 +106,18 @@ invalid_share = {
'name': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
'size': 100,
'host': 'hnas',
'share_proto': 'CIFS',
'share_proto': 'HDFS',
}
invalid_snapshot = {
'id': '24dcdcb5-a582-4bcc-b462-641da143afee',
'share_id': 'aa4a7710-f326-41fb-ad18-b4ad587fc87a',
'share': invalid_share,
}
invalid_access_type = {
'id': 'acdc7172b-fe07-46c4-b78f-df3e0324ccd0',
'access_type': 'user',
'access_type': 'cert',
'access_to': 'manila_user',
'access_level': 'rw',
'state': 'active',
@ -89,6 +131,12 @@ invalid_access_level = {
'state': 'active',
}
invalid_protocol_msg = ("Share backend error: Only NFS or CIFS protocol are "
"currently supported. Share provided %(id)s with "
"protocol %(proto)s." %
{'id': invalid_share['id'],
'proto': invalid_share['share_proto']})
@ddt.ddt
class HDSHNASTestCase(test.TestCase):
@ -121,11 +169,18 @@ class HDSHNASTestCase(test.TestCase):
self._driver.backend_name = "hnas"
self.mock_log = self.mock_object(hds_hnas, 'LOG')
# mocking common backend calls
self.mock_object(ssh.HNASSSHBackend, "check_fs_mounted", mock.Mock(
return_value=True))
self.mock_object(ssh.HNASSSHBackend, "check_vvol", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "check_quota", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "check_cifs", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "check_export", mock.Mock())
@ddt.data('hds_hnas_driver_helper', 'hds_hnas_evs_id', 'hds_hnas_evs_ip',
'hds_hnas_ip', 'hds_hnas_user')
def test_init_invalid_conf_parameters(self, attr_name):
self.mock_object(manila.share.driver.ShareDriver,
'__init__')
self.mock_object(manila.share.driver.ShareDriver, '__init__')
setattr(CONF, attr_name, None)
self.assertRaises(exception.InvalidParameterValue,
@ -140,7 +195,7 @@ class HDSHNASTestCase(test.TestCase):
self.assertRaises(exception.InvalidParameterValue,
self._driver.__init__)
def test_update_access(self):
def test_update_access_nfs(self):
access1 = {
'access_type': 'ip',
'access_to': '172.24.10.10',
@ -153,18 +208,15 @@ class HDSHNASTestCase(test.TestCase):
}
access_list = [access1, access2]
self.mock_object(self._driver, '_get_hnas_share_id',
mock.Mock(return_value='hnas_id'))
self.mock_object(self._driver, '_ensure_share')
self.mock_object(ssh.HNASSSHBackend, "update_access_rule",
self.mock_object(ssh.HNASSSHBackend, "update_nfs_access_rule",
mock.Mock())
self._driver.update_access('context', share, access_list, [], [])
self._driver.update_access('context', share_nfs, access_list, [], [])
ssh.HNASSSHBackend.update_access_rule.assert_called_once_with(
'hnas_id', [access1['access_to'] + '('
+ access1['access_level'] + ',norootsquash)',
access2['access_to'] + '('
+ access2['access_level'] + ')'])
ssh.HNASSSHBackend.update_nfs_access_rule.assert_called_once_with(
share_nfs['id'], [access1['access_to'] + '('
+ access1['access_level'] + ',norootsquash)',
access2['access_to'] + '('
+ access2['access_level'] + ')'])
self.assertTrue(self.mock_log.debug.called)
def test_update_access_ip_exception(self):
@ -180,10 +232,8 @@ class HDSHNASTestCase(test.TestCase):
}
access_list = [access1, access2]
self.mock_object(self._driver, '_ensure_share')
self.assertRaises(exception.InvalidShareAccess,
self._driver.update_access, 'context', share,
self._driver.update_access, 'context', share_nfs,
access_list, [], [])
def test_update_access_not_found_exception(self):
@ -203,25 +253,115 @@ class HDSHNASTestCase(test.TestCase):
side_effect=exception.HNASItemNotFoundException(msg='fake')))
self.assertRaises(exception.ShareResourceNotFound,
self._driver.update_access, 'context', share,
self._driver.update_access, 'context', share_nfs,
access_list, add_rules=[], delete_rules=[])
def test_create_share(self):
@ddt.data([access_cifs_rw, 'acr'], [access_cifs_ro, 'ar'])
@ddt.unpack
def test_allow_access_cifs(self, access_cifs, permission):
access_list_allow = [access_cifs]
self.mock_object(ssh.HNASSSHBackend, 'cifs_allow_access', mock.Mock())
self._driver.update_access('context', share_cifs, [],
access_list_allow, [])
ssh.HNASSSHBackend.cifs_allow_access.assert_called_once_with(
share_cifs['id'], 'fake_user', permission)
self.assertTrue(self.mock_log.debug.called)
def test_allow_access_cifs_invalid_type(self):
access_cifs_type_ip = {
'id': '43167594-40e9-b899-1f4f-b9c2176b7564',
'access_type': 'ip',
'access_to': 'fake_user',
'access_level': 'rw',
'state': 'active',
}
access_list_allow = [access_cifs_type_ip]
self.assertRaises(exception.InvalidShareAccess,
self._driver.update_access, 'context', share_cifs,
[], access_list_allow, [])
def test_deny_access_cifs(self):
access_list_deny = [access_cifs_rw]
self.mock_object(ssh.HNASSSHBackend, 'cifs_deny_access', mock.Mock())
self._driver.update_access('context', share_cifs, [], [],
access_list_deny)
ssh.HNASSSHBackend.cifs_deny_access.assert_called_once_with(
share_cifs['id'], 'fake_user')
self.assertTrue(self.mock_log.debug.called)
def test_deny_access_cifs_unsupported_type(self):
access_cifs_type_ip = {
'id': '43167594-40e9-b899-1f4f-b9c2176b7564',
'access_type': 'ip',
'access_to': 'fake_user',
'access_level': 'rw',
'state': 'active',
}
access_list_deny = [access_cifs_type_ip]
self.mock_object(ssh.HNASSSHBackend, 'cifs_deny_access', mock.Mock())
self._driver.update_access('context', share_cifs, [], [],
access_list_deny)
self.assertTrue(self.mock_log.warning.called)
def test_update_access_invalid_share_protocol(self):
self.mock_object(self._driver, '_ensure_share', mock.Mock())
ex = self.assertRaises(exception.ShareBackendException,
self._driver.update_access, 'context',
invalid_share, [], [], [])
self.assertEqual(invalid_protocol_msg, ex.msg)
def test_update_access_cifs_recovery_mode(self):
access_list = [access_cifs_rw, access_cifs_ro]
permission_list = [('fake_user1', 'acr'), ('fake_user2', 'ar')]
self.mock_object(ssh.HNASSSHBackend, 'list_cifs_permissions',
mock.Mock(return_value=permission_list))
self.mock_object(ssh.HNASSSHBackend, 'cifs_deny_access', mock.Mock())
self.mock_object(ssh.HNASSSHBackend, 'cifs_allow_access', mock.Mock())
self._driver.update_access('context', share_cifs, access_list, [], [])
ssh.HNASSSHBackend.list_cifs_permissions.assert_called_once_with(
share_cifs['id'])
self.assertTrue(self.mock_log.debug.called)
@ddt.data(share_nfs, share_cifs)
def test_create_share(self, share):
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "vvol_create", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "quota_add", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "nfs_export_add", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "nfs_export_add", mock.Mock(
return_value='/shares/' + share['id']))
self.mock_object(ssh.HNASSSHBackend, "cifs_share_add", mock.Mock())
result = self._driver.create_share('context', share)
self.assertEqual(self._driver.hnas_evs_ip + ":/shares/" + share['id'],
result)
self.assertTrue(self.mock_log.debug.called)
ssh.HNASSSHBackend.vvol_create.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.quota_add.assert_called_once_with(share['id'],
share['size'])
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(share['id'])
if share['share_proto'].lower() == 'nfs':
self.assertEqual(self._driver.hnas_evs_ip + ":/shares/" +
share_nfs['id'], result)
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(
share_nfs['id'])
self.assertFalse(ssh.HNASSSHBackend.cifs_share_add.called)
else:
self.assertEqual("\\\\" + self._driver.hnas_evs_ip + "\\" +
share_cifs['id'], result)
ssh.HNASSSHBackend.cifs_share_add.assert_called_once_with(
share_cifs['id'])
self.assertFalse(ssh.HNASSSHBackend.nfs_export_add.called)
def test_create_share_export_error(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
@ -233,202 +373,244 @@ class HDSHNASTestCase(test.TestCase):
self.mock_object(ssh.HNASSSHBackend, "vvol_delete", mock.Mock())
self.assertRaises(exception.HNASBackendException,
self._driver.create_share, 'context', share)
self._driver.create_share, 'context', share_nfs)
self.assertTrue(self.mock_log.debug.called)
self.assertTrue(self.mock_log.exception.called)
ssh.HNASSSHBackend.vvol_create.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.quota_add.assert_called_once_with(share['id'],
share['size'])
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.vvol_delete.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.vvol_create.assert_called_once_with(share_nfs['id'])
ssh.HNASSSHBackend.quota_add.assert_called_once_with(share_nfs['id'],
share_nfs['size'])
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(
share_nfs['id'])
ssh.HNASSSHBackend.vvol_delete.assert_called_once_with(share_nfs['id'])
def test_create_share_invalid_share_protocol(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_create_share",
mock.Mock(return_value="path"))
self.assertRaises(exception.ShareBackendException,
self._driver.create_share, 'context', invalid_share)
ex = self.assertRaises(exception.ShareBackendException,
self._driver.create_share, 'context',
invalid_share)
self.assertEqual(invalid_protocol_msg, ex.msg)
def test_delete_share(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
@ddt.data(share_nfs, share_cifs)
def test_delete_share(self, share):
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "nfs_export_del", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "cifs_share_del", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "vvol_delete", mock.Mock())
self._driver.delete_share('context', share)
self.assertTrue(self.mock_log.debug.called)
ssh.HNASSSHBackend.nfs_export_del.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.vvol_delete.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.vvol_delete.assert_called_once_with(share['id'])
def test_create_snapshot(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share")
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(ssh.HNASSSHBackend, "get_host_list", mock.Mock(
if share['share_proto'].lower() == 'nfs':
ssh.HNASSSHBackend.nfs_export_del.assert_called_once_with(
share['id'])
self.assertFalse(ssh.HNASSSHBackend.cifs_share_del.called)
else:
ssh.HNASSSHBackend.cifs_share_del.assert_called_once_with(
share['id'])
self.assertFalse(ssh.HNASSSHBackend.nfs_export_del.called)
@ddt.data(snapshot_nfs, snapshot_cifs)
def test_create_snapshot(self, snapshot):
hnas_id = snapshot['share_id']
self.mock_object(ssh.HNASSSHBackend, "get_nfs_host_list", mock.Mock(
return_value=['172.24.44.200(rw)']))
self.mock_object(ssh.HNASSSHBackend, "update_access_rule", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "update_nfs_access_rule",
mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "is_cifs_in_use", mock.Mock(
return_value=False))
self.mock_object(ssh.HNASSSHBackend, "tree_clone", mock.Mock())
self._driver.create_snapshot('context', snapshot)
ssh.HNASSSHBackend.get_host_list.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.update_access_rule.assert_any_call(
'hnas_id', ['172.24.44.200(ro)'])
ssh.HNASSSHBackend.update_access_rule.assert_any_call(
'hnas_id', ['172.24.44.200(rw)'])
ssh.HNASSSHBackend.tree_clone.assert_called_once_with(
'/shares/' + 'hnas_id', '/snapshots/' + 'hnas_id' + '/' +
'/shares/' + hnas_id, '/snapshots/' + hnas_id + '/' +
snapshot['id'])
if snapshot['share']['share_proto'].lower() == 'nfs':
ssh.HNASSSHBackend.get_nfs_host_list.assert_called_once_with(
hnas_id)
ssh.HNASSSHBackend.update_nfs_access_rule.assert_any_call(
hnas_id, ['172.24.44.200(ro)'])
ssh.HNASSSHBackend.update_nfs_access_rule.assert_any_call(
hnas_id, ['172.24.44.200(rw)'])
else:
ssh.HNASSSHBackend.is_cifs_in_use.assert_called_once_with(
hnas_id)
def test_create_snapshot_invalid_protocol(self):
self.mock_object(self._driver, '_ensure_share', mock.Mock())
ex = self.assertRaises(exception.ShareBackendException,
self._driver.create_snapshot, 'context',
invalid_snapshot)
self.assertEqual(invalid_protocol_msg, ex.msg)
def test_create_snapshot_cifs_exception(self):
cifs_excep_msg = ("Share backend error: CIFS snapshot when share is "
"mounted is disabled. Set "
"hds_hnas_allow_cifs_snapshot_while_mounted to True "
"or unmount the share to take a snapshot.")
self.mock_object(ssh.HNASSSHBackend, "is_cifs_in_use", mock.Mock(
return_value=True))
ex = self.assertRaises(exception.ShareBackendException,
self._driver.create_snapshot, 'context',
snapshot_cifs)
self.assertEqual(cifs_excep_msg, ex.msg)
def test_create_snapshot_first_snapshot(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share")
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(ssh.HNASSSHBackend, "get_host_list", mock.Mock(
hnas_id = snapshot_nfs['share_id']
self.mock_object(ssh.HNASSSHBackend, "get_nfs_host_list", mock.Mock(
return_value=['172.24.44.200(rw)']))
self.mock_object(ssh.HNASSSHBackend, "update_access_rule", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "update_nfs_access_rule",
mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "tree_clone", mock.Mock(
side_effect=exception.HNASNothingToCloneException('msg')))
self.mock_object(ssh.HNASSSHBackend, "create_directory", mock.Mock())
self._driver.create_snapshot('context', snapshot)
self._driver.create_snapshot('context', snapshot_nfs)
self.assertTrue(self.mock_log.warning.called)
ssh.HNASSSHBackend.get_host_list.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.update_access_rule.assert_any_call(
'hnas_id', ['172.24.44.200(ro)'])
ssh.HNASSSHBackend.update_access_rule.assert_any_call(
'hnas_id', ['172.24.44.200(rw)'])
ssh.HNASSSHBackend.get_nfs_host_list.assert_called_once_with(hnas_id)
ssh.HNASSSHBackend.update_nfs_access_rule.assert_any_call(
hnas_id, ['172.24.44.200(ro)'])
ssh.HNASSSHBackend.update_nfs_access_rule.assert_any_call(
hnas_id, ['172.24.44.200(rw)'])
ssh.HNASSSHBackend.create_directory.assert_called_once_with(
'/snapshots/' + 'hnas_id' + '/' + snapshot['id'])
'/snapshots/' + hnas_id + '/' + snapshot_nfs['id'])
def test_delete_snapshot(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
hnas_id = snapshot_nfs['share_id']
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted")
self.mock_object(ssh.HNASSSHBackend, "tree_delete", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "delete_directory", mock.Mock())
self._driver.delete_snapshot('context', snapshot)
self._driver.delete_snapshot('context', snapshot_nfs)
self.assertTrue(self.mock_log.debug.called)
self.assertTrue(self.mock_log.info.called)
hds_hnas.HDSHNASDriver._check_fs_mounted.assert_called_once_with()
ssh.HNASSSHBackend.tree_delete.assert_called_once_with(
'/snapshots/' + 'hnas_id' + '/' + snapshot['id'])
'/snapshots/' + hnas_id + '/' + snapshot_nfs['id'])
ssh.HNASSSHBackend.delete_directory.assert_called_once_with(
'/snapshots/' + 'hnas_id')
def test_ensure_share(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "check_vvol", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "check_quota", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "check_export", mock.Mock())
'/snapshots/' + hnas_id)
@ddt.data(share_nfs, share_cifs)
def test_ensure_share(self, share):
result = self._driver.ensure_share('context', share)
self.assertEqual(['172.24.44.10:/shares/' + 'hnas_id'], result)
ssh.HNASSSHBackend.check_vvol.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.check_quota.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.check_export.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.check_vvol.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.check_quota.assert_called_once_with(share['id'])
if share['share_proto'].lower() == 'nfs':
self.assertEqual(['172.24.44.10:/shares/' + share['id']], result)
ssh.HNASSSHBackend.check_export.assert_called_once_with(
share['id'])
self.assertFalse(ssh.HNASSSHBackend.check_cifs.called)
else:
self.assertEqual(['\\\\172.24.44.10\\' + share['id']], result)
ssh.HNASSSHBackend.check_cifs.assert_called_once_with(share['id'])
self.assertFalse(ssh.HNASSSHBackend.check_export.called)
def test_ensure_share_invalid_protocol(self):
ex = self.assertRaises(exception.ShareBackendException,
self._driver.ensure_share, 'context',
invalid_share)
self.assertEqual(invalid_protocol_msg, ex.msg)
def test_shrink_share(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "get_share_usage", mock.Mock(
return_value=10))
self.mock_object(ssh.HNASSSHBackend, "modify_quota", mock.Mock())
self._driver.shrink_share(share, 11)
self._driver.shrink_share(share_nfs, 11)
ssh.HNASSSHBackend.get_share_usage.assert_called_once_with('hnas_id')
ssh.HNASSSHBackend.modify_quota.assert_called_once_with('hnas_id', 11)
ssh.HNASSSHBackend.get_share_usage.assert_called_once_with(
share_nfs['id'])
ssh.HNASSSHBackend.modify_quota.assert_called_once_with(
share_nfs['id'], 11)
def test_shrink_share_new_size_lower_than_usage(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "get_share_usage", mock.Mock(
return_value=10))
self.assertRaises(exception.ShareShrinkingPossibleDataLoss,
self._driver.shrink_share, share, 9)
ssh.HNASSSHBackend.get_share_usage.assert_called_once_with('hnas_id')
self._driver.shrink_share, share_nfs, 9)
ssh.HNASSSHBackend.get_share_usage.assert_called_once_with(
share_nfs['id'])
def test_extend_share(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "get_stats", mock.Mock(
return_value=(500, 200)))
self.mock_object(ssh.HNASSSHBackend, "modify_quota", mock.Mock())
self._driver.extend_share(share, 150)
self._driver.extend_share(share_nfs, 150)
ssh.HNASSSHBackend.get_stats.assert_called_once_with()
ssh.HNASSSHBackend.modify_quota.assert_called_once_with('hnas_id', 150)
ssh.HNASSSHBackend.modify_quota.assert_called_once_with(
share_nfs['id'], 150)
def test_extend_share_with_no_available_space_in_fs(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "get_stats", mock.Mock(
return_value=(500, 200)))
self.mock_object(ssh.HNASSSHBackend, "modify_quota", mock.Mock())
self.assertRaises(exception.HNASBackendException,
self._driver.extend_share, share, 1000)
self._driver.extend_share, share_nfs, 1000)
ssh.HNASSSHBackend.get_stats.assert_called_once_with()
def test_manage_existing(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value=share['id']))
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share", mock.Mock())
@ddt.data(share_nfs, share_cifs)
def test_manage_existing(self, share):
expected_out = {'size': share['size'],
'export_locations':
[share['export_locations'][0]['path']]}
self.mock_object(ssh.HNASSSHBackend, "get_share_quota", mock.Mock(
return_value=1))
return_value=share['size']))
self._driver.manage_existing(share, 'option')
out = self._driver.manage_existing(share, 'option')
ssh.HNASSSHBackend.get_share_quota.assert_called_once_with(share['id'])
self.assertEqual(expected_out, out)
ssh.HNASSSHBackend.get_share_quota.assert_called_once_with(
share['id'])
def test_manage_existing_no_quota(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value=share['id']))
self.mock_object(hds_hnas.HDSHNASDriver, "_ensure_share", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "get_share_quota", mock.Mock(
return_value=None))
self.assertRaises(exception.ManageInvalidShare,
self._driver.manage_existing, share, 'option')
ssh.HNASSSHBackend.get_share_quota.assert_called_once_with(share['id'])
self._driver.manage_existing, share_nfs, 'option')
ssh.HNASSSHBackend.get_share_quota.assert_called_once_with(
share_nfs['id'])
def test_manage_existing_wrong_share_id(self):
self.mock_object(self.fake_private_storage, 'get',
mock.Mock(return_value='Wrong_share_id'))
self.assertRaises(exception.HNASBackendException,
self._driver.manage_existing, share, 'option')
self._driver.manage_existing, share_nfs, 'option')
def test_manage_existing_wrong_path_format(self):
share['export_locations'] = [{'path': ':/'}]
@ddt.data(share_nfs, share_cifs)
def test_manage_existing_wrong_path_format(self, share):
share_copy = share.copy()
share_copy['export_locations'] = [{'path': ':/'}]
self.assertRaises(exception.ShareBackendException,
self._driver.manage_existing, share,
self._driver.manage_existing, share_copy,
'option')
def test_manage_existing_wrong_evs_ip(self):
share['export_locations'] = [{'path': '172.24.44.189:/shares/'
'aa4a7710-f326-41fb-ad18-'}]
share_nfs['export_locations'] = [{'path': '172.24.44.189:/shares/'
'aa4a7710-f326-41fb-ad18-'}]
self.assertRaises(exception.ShareBackendException,
self._driver.manage_existing, share,
self._driver.manage_existing, share_nfs,
'option')
def test_manage_existing_invalid_host(self):
@ -436,8 +618,13 @@ class HDSHNASTestCase(test.TestCase):
self._driver.manage_existing, share_invalid_host,
'option')
def test_manage_existing_invalid_protocol(self):
self.assertRaises(exception.ShareBackendException,
self._driver.manage_existing, invalid_share,
'option')
def test_unmanage(self):
self._driver.unmanage(share)
self._driver.unmanage(share_nfs)
self.assertTrue(self.fake_private_storage.delete.called)
self.assertTrue(self.mock_log.info.called)
@ -447,56 +634,75 @@ class HDSHNASTestCase(test.TestCase):
self.assertEqual(0, result)
def test_create_share_from_snapshot(self):
@ddt.data([share_nfs, snapshot_nfs], [share_cifs, snapshot_cifs])
@ddt.unpack
def test_create_share_from_snapshot(self, share, snapshot):
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
mock.Mock())
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(ssh.HNASSSHBackend, "vvol_create", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "quota_add", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "tree_clone", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "cifs_share_add", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "nfs_export_add", mock.Mock())
result = self._driver.create_share_from_snapshot('context',
share, snapshot)
share,
snapshot)
self.assertEqual('172.24.44.10:/shares/' + share['id'], result)
ssh.HNASSSHBackend.vvol_create.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.quota_add.assert_called_once_with(share['id'],
share['size'])
ssh.HNASSSHBackend.tree_clone.assert_called_once_with(
'/snapshots/' + 'hnas_id' + '/' + snapshot['id'],
'/snapshots/' + share['id'] + '/' + snapshot['id'],
'/shares/' + share['id'])
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(share['id'])
if share['share_proto'].lower() == 'nfs':
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(
share['id'])
self.assertEqual('172.24.44.10:/shares/' + share_nfs['id'], result)
self.assertFalse(ssh.HNASSSHBackend.cifs_share_add.called)
else:
ssh.HNASSSHBackend.cifs_share_add.assert_called_once_with(
share['id'])
self.assertEqual('\\\\172.24.44.10\\' + share['id'], result)
self.assertFalse(ssh.HNASSSHBackend.nfs_export_add.called)
def test_create_share_from_snapshot_empty_snapshot(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
mock.Mock())
self.mock_object(hds_hnas.HDSHNASDriver, "_get_hnas_share_id",
mock.Mock(return_value='hnas_id'))
self.mock_object(ssh.HNASSSHBackend, "vvol_create", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "quota_add", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "tree_clone", mock.Mock(
side_effect=exception.HNASNothingToCloneException('msg')))
self.mock_object(ssh.HNASSSHBackend, "nfs_export_add", mock.Mock())
result = self._driver.create_share_from_snapshot('context', share,
snapshot)
result = self._driver.create_share_from_snapshot('context', share_nfs,
snapshot_nfs)
self.assertEqual('172.24.44.10:/shares/' + share['id'], result)
self.assertEqual('172.24.44.10:/shares/' + share_nfs['id'], result)
self.assertTrue(self.mock_log.warning.called)
ssh.HNASSSHBackend.vvol_create.assert_called_once_with(share['id'])
ssh.HNASSSHBackend.quota_add.assert_called_once_with(share['id'],
share['size'])
ssh.HNASSSHBackend.vvol_create.assert_called_once_with(share_nfs['id'])
ssh.HNASSSHBackend.quota_add.assert_called_once_with(share_nfs['id'],
share_nfs['size'])
ssh.HNASSSHBackend.tree_clone.assert_called_once_with(
'/snapshots/' + 'hnas_id' + '/' + snapshot['id'],
'/shares/' + share['id'])
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(share['id'])
'/snapshots/' + share_nfs['id'] + '/' + snapshot_nfs['id'],
'/shares/' + share_nfs['id'])
ssh.HNASSSHBackend.nfs_export_add.assert_called_once_with(
share_nfs['id'])
def test_create_share_from_snapshot_invalid_protocol(self):
self.mock_object(hds_hnas.HDSHNASDriver, "_check_fs_mounted",
mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "vvol_create", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "quota_add", mock.Mock())
self.mock_object(ssh.HNASSSHBackend, "tree_clone", mock.Mock())
ex = self.assertRaises(exception.ShareBackendException,
self._driver.create_share_from_snapshot,
'context', invalid_share, snapshot_nfs)
self.assertEqual(invalid_protocol_msg, ex.msg)
def test__check_fs_mounted(self):
self.mock_object(ssh.HNASSSHBackend, 'check_fs_mounted', mock.Mock(
return_value=True))
self._driver._check_fs_mounted()
ssh.HNASSSHBackend.check_fs_mounted.assert_called_once_with()
@ -505,8 +711,8 @@ class HDSHNASTestCase(test.TestCase):
self.mock_object(ssh.HNASSSHBackend, 'check_fs_mounted', mock.Mock(
return_value=False))
self.assertRaises(
exception.HNASBackendException, self._driver._check_fs_mounted)
self.assertRaises(exception.HNASBackendException,
self._driver._check_fs_mounted)
ssh.HNASSSHBackend.check_fs_mounted.assert_called_once_with()
@ -516,8 +722,8 @@ class HDSHNASTestCase(test.TestCase):
'driver_handles_share_servers':
self._driver.driver_handles_share_servers,
'vendor_name': 'HDS',
'driver_version': '2.0.0',
'storage_protocol': 'NFS',
'driver_version': '3.0.0',
'storage_protocol': 'NFS_CIFS',
'total_capacity_gb': 1000,
'free_capacity_gb': 200,
'reserved_percentage': hds_hnas.CONF.reserved_share_percentage,

View File

@ -382,6 +382,81 @@ HNAS_RESULT_unmounted_filesystem = """
file_system 1055 fake_span Umount 2 4 5 1
"""
HNAS_RESULT_cifs_list = """
Share name: vvol_test
Share path: \\\\shares\\vvol_test
Share users: 2
Share online: Yes
Share comment:
Cache options: Manual local caching for documents
ABE enabled: Yes
Continuous Availability: No
Access snapshots: No
Display snapshots: No
ShadowCopy enabled: Yes
Lower case on create: No
Follow symlinks: Yes
Follow global symlinks: No
Scan for viruses: Yes
File system label: file_system
File system size: 9.938 GB
File system free space: 6.763 GB
File system state:
formatted = Yes
mounted = Yes
failed = No
thin provisioned = No
Disaster recovery setting:
Recovered = No
Transfer setting = Use file system default
Home directories: Off
Mount point options:
"""
HNAS_RESULT_different_fs_cifs_list = """
Share name: vvol_test
Share path: \\\\shares\\vvol_test
Share users: 0
Share online: Yes
Share comment:
Cache options: Manual local caching for documents
ABE enabled: Yes
Continuous Availability: No
Access snapshots: No
Display snapshots: No
ShadowCopy enabled: Yes
Lower case on create: No
Follow symlinks: Yes
Follow global symlinks: No
Scan for viruses: Yes
File system label: different_filesystem
File system size: 9.938 GB
File system free space: 6.763 GB
File system state:
formatted = Yes
mounted = Yes
failed = No
thin provisioned = No
Disaster recovery setting:
Recovered = No
Transfer setting = Use file system default
Home directories: Off
Mount point options:
"""
HNAS_RESULT_list_cifs_permissions = """ \
Displaying the details of the share 'vvol_test' on file system 'filesystem' ...
Maximum user count is unlimited
Type Permission User/Group
U Deny Read NFSv4 user\\user1@domain.com
G Deny Change & Read Unix user\\1087
U Allow Full Control Unix user\\1088
U Allow Read Unix user\\1089
? Deny Full Control NFSv4 user\\user2@company.com
X Allow Change & Read Unix user\\1090
"""
@ddt.ddt
class HNASSSHTestCase(test.TestCase):
@ -425,7 +500,7 @@ class HNASSSHTestCase(test.TestCase):
}
def test_get_stats(self):
fake_list_command = ['df', '-a', '-f', 'file_system']
fake_list_command = ['df', '-a', '-f', self.fs_name]
self.mock_object(ssh.HNASSSHBackend, '_execute',
mock.Mock(return_value=(HNAS_RESULT_df_tb, "")))
@ -471,33 +546,192 @@ class HNASSSHTestCase(test.TestCase):
self._driver_ssh.nfs_export_del, 'vvol_test')
self.assertTrue(self.mock_log.exception.called)
def test_get_host_list(self):
def test_cifs_share_add(self):
fake_cifs_add_command = ['cifs-share', 'add', '-S', 'disable',
'--enable-abe', '--nodefaultsaa',
'vvol_test', self.fs_name,
r'\\shares\\vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock())
self._driver_ssh.cifs_share_add('vvol_test')
self._driver_ssh._execute.assert_called_with(fake_cifs_add_command)
def test_cifs_share_del(self):
fake_cifs_del_command = ['cifs-share', 'del', '--target-label',
self.fs_name, 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock())
self._driver_ssh.cifs_share_del('vvol_test')
self._driver_ssh._execute.assert_called_with(fake_cifs_del_command)
def test_cifs_share_del_inexistent_share(self):
fake_cifs_del_command = ['cifs-share', 'del', '--target-label',
self.fs_name, 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute',
mock.Mock(side_effect=putils.ProcessExecutionError(
exit_code=1)))
self._driver_ssh.cifs_share_del('vvol_test')
self._driver_ssh._execute.assert_called_with(fake_cifs_del_command)
self.assertTrue(self.mock_log.warning.called)
def test_cifs_share_del_exception(self):
fake_cifs_del_command = ['cifs-share', 'del', '--target-label',
self.fs_name, 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute',
mock.Mock(side_effect=putils.ProcessExecutionError))
self.assertRaises(exception.HNASBackendException,
self._driver_ssh.cifs_share_del, 'vvol_test')
self._driver_ssh._execute.assert_called_with(fake_cifs_del_command)
def test_get_nfs_host_list(self):
self.mock_object(ssh.HNASSSHBackend, "_get_share_export", mock.Mock(
return_value=[ssh.Export(HNAS_RESULT_export)]))
host_list = self._driver_ssh.get_host_list('fake_id')
host_list = self._driver_ssh.get_nfs_host_list('fake_id')
self.assertEqual(['127.0.0.2'], host_list)
def test_update_access_rule_empty_host_list(self):
def test_update_nfs_access_rule_empty_host_list(self):
fake_export_command = ['nfs-export', 'mod', '-c', '127.0.0.1',
'/shares/fake_id']
self.mock_object(ssh.HNASSSHBackend, "_execute", mock.Mock())
self._driver_ssh.update_access_rule("fake_id", [])
self._driver_ssh.update_nfs_access_rule("fake_id", [])
self._driver_ssh._execute.assert_called_with(fake_export_command)
def test_update_access_rule(self):
def test_update_nfs_access_rule(self):
fake_export_command = ['nfs-export', 'mod', '-c',
u'"127.0.0.1,127.0.0.2"', '/shares/fake_id']
self.mock_object(ssh.HNASSSHBackend, "_execute", mock.Mock())
self._driver_ssh.update_access_rule("fake_id", ['127.0.0.1',
'127.0.0.2'])
self._driver_ssh.update_nfs_access_rule("fake_id", ['127.0.0.1',
'127.0.0.2'])
self._driver_ssh._execute.assert_called_with(fake_export_command)
def test_cifs_allow_access(self):
fake_cifs_allow_command = ['cifs-saa', 'add', '--target-label',
self.fs_name, 'vvol_test',
'fake_user', 'ar']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock())
self._driver_ssh.cifs_allow_access('vvol_test', 'fake_user', 'ar')
self._driver_ssh._execute.assert_called_with(fake_cifs_allow_command)
def test_cifs_allow_access_already_allowed_user(self):
fake_cifs_allow_command = ['cifs-saa', 'add', '--target-label',
self.fs_name, 'vvol_test',
'fake_user', 'acr']
self.mock_object(ssh.HNASSSHBackend, '_execute',
mock.Mock(side_effect=[putils.ProcessExecutionError(
stderr='already listed as a user')]))
self._driver_ssh.cifs_allow_access('vvol_test', 'fake_user', 'acr')
self._driver_ssh._execute.assert_called_with(fake_cifs_allow_command)
self.assertTrue(self.mock_log.debug.called)
def test_cifs_allow_access_exception(self):
fake_cifs_allow_command = ['cifs-saa', 'add', '--target-label',
self.fs_name, 'vvol_test',
'fake_user', 'acr']
self.mock_object(ssh.HNASSSHBackend, '_execute',
mock.Mock(side_effect=[putils.ProcessExecutionError(
stderr='Could not add user/group fake_user to '
'share \'vvol_test\'')]))
self.assertRaises(exception.InvalidShareAccess,
self._driver_ssh.cifs_allow_access, 'vvol_test',
'fake_user', 'acr')
self._driver_ssh._execute.assert_called_with(fake_cifs_allow_command)
def test_cifs_deny_access(self):
fake_cifs_deny_command = ['cifs-saa', 'delete', '--target-label',
self.fs_name, 'vvol_test', 'fake_user']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock())
self._driver_ssh.cifs_deny_access('vvol_test', 'fake_user')
self._driver_ssh._execute.assert_called_with(fake_cifs_deny_command)
def test_cifs_deny_access_already_deleted_user(self):
fake_cifs_deny_command = ['cifs-saa', 'delete', '--target-label',
self.fs_name, 'vvol_test', 'fake_user']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
side_effect=[putils.ProcessExecutionError(
stderr='not listed as a user')]))
self._driver_ssh.cifs_deny_access('vvol_test', 'fake_user')
self._driver_ssh._execute.assert_called_with(fake_cifs_deny_command)
self.assertTrue(self.mock_log.debug.called)
def test_cifs_deny_access_backend_exception(self):
fake_cifs_deny_command = ['cifs-saa', 'delete', '--target-label',
self.fs_name, 'vvol_test', 'fake_user']
self.mock_object(ssh.HNASSSHBackend, '_execute',
mock.Mock(side_effect=[putils.ProcessExecutionError(
stderr='Unexpected error')]))
self.assertRaises(exception.HNASBackendException,
self._driver_ssh.cifs_deny_access, 'vvol_test',
'fake_user')
self._driver_ssh._execute.assert_called_with(fake_cifs_deny_command)
def test_list_cifs_permission(self):
fake_cifs_list_command = ['cifs-saa', 'list', '--target-label',
self.fs_name, 'vvol_test']
expected_out = ssh.CIFSPermissions(HNAS_RESULT_list_cifs_permissions)
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
return_value=(HNAS_RESULT_list_cifs_permissions, '')))
out = self._driver_ssh.list_cifs_permissions('vvol_test')
for i in range(len(expected_out.permission_list)):
self.assertEqual(expected_out.permission_list[i], out[i])
self._driver_ssh._execute.assert_called_with(fake_cifs_list_command)
def test_list_cifs_no_permissions_added(self):
fake_cifs_list_command = ['cifs-saa', 'list', '--target-label',
self.fs_name, 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
side_effect=[putils.ProcessExecutionError(
stderr='No entries for this share')]))
out = self._driver_ssh.list_cifs_permissions('vvol_test')
self.assertEqual([], out)
self._driver_ssh._execute.assert_called_with(fake_cifs_list_command)
self.assertTrue(self.mock_log.debug.called)
def test_list_cifs_exception(self):
fake_cifs_list_command = ['cifs-saa', 'list', '--target-label',
self.fs_name, 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
side_effect=[putils.ProcessExecutionError(
stderr='Error.')]))
self.assertRaises(exception.HNASBackendException,
self._driver_ssh.list_cifs_permissions,
"vvol_test")
self._driver_ssh._execute.assert_called_with(fake_cifs_list_command)
self.assertTrue(self.mock_log.exception.called)
def test_tree_clone_nothing_to_clone(self):
fake_tree_clone_command = ['tree-clone-job-submit', '-e', '-f',
self.fs_name, '/src', '/dst']
@ -722,6 +956,70 @@ class HNASSSHTestCase(test.TestCase):
self.assertRaises(exception.HNASItemNotFoundException,
self._driver_ssh.check_export, "vvol_test")
def test_check_cifs(self):
check_cifs_share_command = ['cifs-share', 'list', 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
return_value=[HNAS_RESULT_cifs_list, '']))
self._driver_ssh.check_cifs('vvol_test')
self._driver_ssh._execute.assert_called_with(check_cifs_share_command)
def test_check_cifs_inexistent_share(self):
check_cifs_share_command = ['cifs-share', 'list', 'wrong_vvol']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
side_effect=[putils.ProcessExecutionError(
stderr='Export wrong_vvol does not exist on backend '
'anymore.')]))
self.assertRaises(exception.HNASItemNotFoundException,
self._driver_ssh.check_cifs, 'wrong_vvol')
self._driver_ssh._execute.assert_called_with(check_cifs_share_command)
def test_check_cifs_exception(self):
check_cifs_share_command = ['cifs-share', 'list', 'wrong_vvol']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
side_effect=[putils.ProcessExecutionError(stderr='Error.')]))
self.assertRaises(putils.ProcessExecutionError,
self._driver_ssh.check_cifs, 'wrong_vvol')
self._driver_ssh._execute.assert_called_with(check_cifs_share_command)
def test_check_cifs_different_fs_exception(self):
check_cifs_share_command = ['cifs-share', 'list', 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
return_value=[HNAS_RESULT_different_fs_cifs_list, '']))
self.assertRaises(exception.HNASItemNotFoundException,
self._driver_ssh.check_cifs, 'vvol_test')
self._driver_ssh._execute.assert_called_with(check_cifs_share_command)
def test_is_cifs_in_use(self):
check_cifs_share_command = ['cifs-share', 'list', 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
return_value=[HNAS_RESULT_cifs_list, '']))
out = self._driver_ssh.is_cifs_in_use('vvol_test')
self.assertTrue(out)
self._driver_ssh._execute.assert_called_with(check_cifs_share_command)
def test_is_cifs_without_use(self):
check_cifs_share_command = ['cifs-share', 'list', 'vvol_test']
self.mock_object(ssh.HNASSSHBackend, '_execute', mock.Mock(
return_value=[HNAS_RESULT_different_fs_cifs_list, '']))
out = self._driver_ssh.is_cifs_in_use('vvol_test')
self.assertFalse(out)
self._driver_ssh._execute.assert_called_with(check_cifs_share_command)
def test_get_share_quota(self):
self.mock_object(ssh.HNASSSHBackend, "_execute", mock.Mock(
return_value=(HNAS_RESULT_quota, '')))
@ -795,8 +1093,7 @@ class HNASSSHTestCase(test.TestCase):
self.mock_object(ssh.HNASSSHBackend, "_execute", mock.Mock(
side_effect=putils.ProcessExecutionError(
stderr="NFS Export List: Export 'id' does not exist.")
))
stderr="NFS Export List: Export 'id' does not exist.")))
self.assertRaises(exception.HNASItemNotFoundException,
self._driver_ssh._get_share_export, 'fake_id')

View File

@ -0,0 +1,8 @@
---
prelude: >
Add support for CIFS protocol in Manila HNAS driver.
features:
- Added support for CIFS shares in Hitachi HNAS driver. It supports
user access type, where a permission for a user or a group can be
added/removed. Also, accepts 'read write' and 'read only' as access
level.