Browse Source

Add QNAP Manila Driver

This driver supports below features:
 - Create NFS Share
 - Delete NFS Share
 - Allow NFS Share access (IP access type)
 - Deny NFS Share access
 - Create snapshot
 - Delete snapshot
 - Create share from snapshot
 - Extend share
 - Manage share
 - Unmanage share
 - Manage snapshot
 - Unmanage snapshot

DocImpact
Implements: blueprint qnap-manila-driver

Change-Id: I4e4278a870af7be1c026385b85ea309b2d1773a9
changes/03/394703/42
Pony Chou 5 years ago
parent
commit
cf182947b7
  1. 10
      doc/source/devref/share_back_ends_feature_support_mapping.rst
  2. 2
      manila/opts.py
  3. 0
      manila/share/drivers/qnap/__init__.py
  4. 646
      manila/share/drivers/qnap/api.py
  5. 707
      manila/share/drivers/qnap/qnap.py
  6. 6
      manila/tests/conf_fixture.py
  7. 0
      manila/tests/share/drivers/qnap/__init__.py
  8. 527
      manila/tests/share/drivers/qnap/fakes.py
  9. 788
      manila/tests/share/drivers/qnap/test_api.py
  10. 1111
      manila/tests/share/drivers/qnap/test_qnap.py
  11. 4
      releasenotes/notes/qnap-manila-driver-a30fe4011cb90801.yaml

10
doc/source/devref/share_back_ends_feature_support_mapping.rst

@ -81,7 +81,8 @@ Mapping of share drivers and share features support
+----------------------------------------+-----------------------+-----------------------+--------------+--------------+------------------------+----------------------------+--------------------------+--------------------+
| MapRFS | O | O | O | O | O | O | O | \- |
+----------------------------------------+-----------------------+-----------------------+--------------+--------------+------------------------+----------------------------+--------------------------+--------------------+
| QNAP | O | O | O | \- | O | O | O | \- |
+----------------------------------------+-----------------------+-----------------------+--------------+--------------+------------------------+----------------------------+--------------------------+--------------------+
Mapping of share drivers and share access rules support
-------------------------------------------------------
@ -139,6 +140,8 @@ Mapping of share drivers and share access rules support
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
| MapRFS | \- | MapRFS(O) | \- | \- | \- | MapRFS(O) | \- | \- |
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
| QNAP | NFS (O) | \- | \- | \- | NFS (O) | \- | \- | \- |
+----------------------------------------+--------------+----------------+------------+--------------+--------------+----------------+------------+------------+
Mapping of share drivers and security services support
------------------------------------------------------
@ -194,7 +197,8 @@ Mapping of share drivers and security services support
+----------------------------------------+------------------+-----------------+------------------+
| MapRFS | \- | \- | \- |
+----------------------------------------+------------------+-----------------+------------------+
| QNAP | \- | \- | \- |
+----------------------------------------+------------------+-----------------+------------------+
Mapping of share drivers and common capabilities
------------------------------------------------
@ -252,6 +256,8 @@ More information: :ref:`capabilities_and_extra_specs`
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+
| MapRFS | \- | N | \- | \- | \- | N | \- | O | \- |
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+
| QNAP | \- | O | \- | \- | O | \- | \- | O | \- |
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+
.. note::

2
manila/opts.py

@ -70,6 +70,7 @@ import manila.share.drivers.lvm
import manila.share.drivers.maprfs.maprfs_native
import manila.share.drivers.netapp.options
import manila.share.drivers.nexenta.options
import manila.share.drivers.qnap.qnap
import manila.share.drivers.quobyte.quobyte
import manila.share.drivers.service_instance
import manila.share.drivers.tegile.tegile
@ -147,6 +148,7 @@ _global_opt_lists = [
manila.share.drivers.nexenta.options.nexenta_connection_opts,
manila.share.drivers.nexenta.options.nexenta_dataset_opts,
manila.share.drivers.nexenta.options.nexenta_nfs_opts,
manila.share.drivers.qnap.qnap.qnap_manila_opts,
manila.share.drivers.quobyte.quobyte.quobyte_manila_share_opts,
manila.share.drivers.service_instance.common_opts,
manila.share.drivers.service_instance.no_share_servers_handling_mode_opts,

0
manila/share/drivers/qnap/__init__.py

646
manila/share/drivers/qnap/api.py

@ -0,0 +1,646 @@
# Copyright (c) 2016 QNAP Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
API for QNAP Storage.
"""
import base64
import functools
import re
import ssl
try:
import xml.etree.cElementTree as ET
except ImportError:
import xml.etree.ElementTree as ET
from oslo_log import log as logging
import six
from six.moves import http_client
from six.moves import urllib
from manila import exception
from manila.i18n import _
from manila import utils
LOG = logging.getLogger(__name__)
MSG_SESSION_EXPIRED = _("Session ID expired")
MSG_UNEXPECT_RESP = _("Unexpected response from QNAP API")
@utils.retry(exception=exception.ShareBackendException,
retries=5)
def _connection_checker(func):
"""Decorator to check session has expired or not."""
@functools.wraps(func)
def inner_connection_checker(self, *args, **kwargs):
LOG.debug('in _connection_checker')
pattern = re.compile(r".*Session ID expired.$")
try:
return func(self, *args, **kwargs)
except exception.ShareBackendException as e:
matches = pattern.match(six.text_type(e))
if matches:
LOG.debug('Session might have expired.'
' Trying to relogin')
self._login()
raise
return inner_connection_checker
class QnapAPIExecutor(object):
"""Makes QNAP API calls for ES NAS."""
def __init__(self, *args, **kwargs):
self.sid = None
self.username = kwargs['username']
self.password = kwargs['password']
self.ip, self.port, self.ssl = (
self._parse_management_url(kwargs['management_url']))
self._login()
def _parse_management_url(self, management_url):
pattern = re.compile(r"(http|https)\:\/\/(\S+)\:(\d+)")
matches = pattern.match(management_url)
if matches.group(1) == 'http':
management_ssl = False
else:
management_ssl = True
management_ip = matches.group(2)
management_port = matches.group(3)
return management_ip, management_port, management_ssl
def _prepare_connection(self, isSSL, ip, port):
if isSSL:
if hasattr(ssl, '_create_unverified_context'):
context = ssl._create_unverified_context()
connection = http_client.HTTPSConnection(ip,
port=port,
context=context)
else:
connection = http_client.HTTPSConnection(ip,
port=port)
else:
connection = http_client.HTTPConnection(ip, port)
return connection
def get_basic_info(self, management_url):
"""Get the basic information of NAS."""
LOG.debug('in get_basic_info')
management_ip, management_port, management_ssl = (
self._parse_management_url(management_url))
connection = self._prepare_connection(management_ssl,
management_ip,
management_port)
connection.request('GET', '/cgi-bin/authLogin.cgi')
response = connection.getresponse()
data = response.read()
LOG.debug('response data: %s', data)
root = ET.fromstring(data)
display_model_name = root.find('model/displayModelName').text
internal_model_name = root.find('model/internalModelName').text
fw_version = root.find('firmware/version').text
connection.close()
return display_model_name, internal_model_name, fw_version
def _execute_and_get_response_details(self, nas_ip, url):
"""Will prepare response after executing a http request."""
LOG.debug('port: %(port)s, ssl: %(ssl)s',
{'port': self.port, 'ssl': self.ssl})
res_details = {}
# Prepare the connection
connection = self._prepare_connection(self.ssl,
nas_ip,
self.port)
# Make the connection
LOG.debug('url : %s', url)
connection.request('GET', url)
# Extract the response as the connection was successful
response = connection.getresponse()
# Read the response
data = response.read()
LOG.debug('response data: %s', data)
res_details['data'] = data
res_details['error'] = None
res_details['http_status'] = response.status
connection.close()
return res_details
def execute_login(self):
"""Login and return sid."""
params = {
'user': self.username,
'pwd': base64.b64encode(self.password.encode("utf-8")),
'serviceKey': '1',
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/authLogin.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
session_id = root.find('authSid').text
return session_id
def _login(self):
"""Execute Https Login API."""
self.sid = self.execute_login()
LOG.debug('sid: %s', self.sid)
def _sanitize_params(self, params):
sanitized_params = {}
for key in params:
value = params[key]
if value is not None:
sanitized_params[key] = six.text_type(value)
return sanitized_params
@_connection_checker
def create_share(self, share, pool_name, create_share_name, share_proto):
"""Create share."""
LOG.debug('create_share_name: %s', create_share_name)
params = {
'wiz_func': 'share_create',
'action': 'add_share',
'vol_name': create_share_name,
'vol_size': six.text_type(share['size']) + 'GB',
'threshold': '80',
'dedup': 'off',
'compression': '1',
'thin_pro': '0',
'cache': '0',
'cifs_enable': '0' if share_proto == 'NFS' else '1',
'nfs_enable': '0' if share_proto == 'CIFS' else '1',
'afp_enable': '0',
'ftp_enable': '0',
'encryption': '0',
'hidden': '0',
'oplocks': '1',
'sync': 'always',
'userrw0': 'admin',
'userrd_len': '0',
'userrw_len': '1',
'userno_len': '0',
'access_r': 'setup_users',
'path_type': 'auto',
'recycle_bin': '1',
'recycle_bin_administrators_only': '0',
'pool_name': pool_name,
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/wizReq.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
if root.find('ES_RET_CODE').text < '0':
msg = _('Create share %s failed') % share['display_name']
raise exception.ShareBackendException(msg=msg)
vol_list = root.find('func').find('ownContent').find('volumeList')
vol_info_tree = vol_list.findall('volume')
for vol in vol_info_tree:
LOG.debug('Iterating vol name: %(name)s, index: %(id)s',
{'name': vol.find('volumeLabel').text,
'id': vol.find('volumeValue').text})
if (create_share_name == vol.find('volumeLabel').text):
LOG.debug('volumeLabel:%s', vol.find('volumeLabel').text)
return vol.find('volumeValue').text
return res_details['data']
@_connection_checker
def delete_share(self, vol_id, *args, **kwargs):
"""Execute delete share API."""
params = {
'func': 'volume_mgmt',
'vol_remove': '1',
'volumeID': vol_id,
'stop_service': 'no',
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
if root.find('result').text < '0':
msg = _('Delete share id: %s failed') % vol_id
raise exception.ShareBackendException(msg=msg)
@_connection_checker
def get_specific_poolinfo(self, pool_id):
"""Execute get_specific_poolinfo API."""
params = {
'store': 'poolInfo',
'func': 'extra_get',
'poolID': pool_id,
'Pool_Info': '1',
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
if root.find('result').text < '0':
msg = _('get_specific_poolinfo failed')
raise exception.ShareBackendException(msg=msg)
pool_list = root.find('Pool_Index')
pool_info_tree = pool_list.findall('row')
for pool in pool_info_tree:
if pool_id == pool.find('poolID').text:
LOG.debug('poolID: %s', pool.find('poolID').text)
return pool
@_connection_checker
def get_share_info(self, pool_id, **kwargs):
"""Execute get_share_info API."""
for key, value in six.iteritems(kwargs):
LOG.debug('%(key)s = %(val)s',
{'key': key, 'val': value})
params = {
'store': 'poolVolumeList',
'poolID': pool_id,
'func': 'extra_get',
'Pool_Vol_Info': '1',
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
if ('vol_no' in kwargs) or ('vol_label' in kwargs):
vol_list = root.find('Volume_Info')
vol_info_tree = vol_list.findall('row')
for vol in vol_info_tree:
LOG.debug('Iterating vol name: %(name)s, index: %(id)s',
{'name': vol.find('vol_label').text,
'id': vol.find('vol_no').text})
if 'vol_no' in kwargs:
if kwargs['vol_no'] == vol.find('vol_no').text:
LOG.debug('vol_no:%s',
vol.find('vol_no').text)
return vol
elif 'vol_label' in kwargs:
if kwargs['vol_label'] == vol.find('vol_label').text:
LOG.debug('vol_label:%s', vol.find('vol_label').text)
return vol
if vol is vol_info_tree[-1]:
return None
else:
return res_details['data']
@_connection_checker
def get_specific_volinfo(self, vol_id, **kwargs):
"""Execute get_specific_volinfo API."""
params = {
'store': 'volumeInfo',
'volumeID': vol_id,
'func': 'extra_get',
'Volume_Info': '1',
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/disk/disk_manage.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
vol_list = root.find('Volume_Info')
vol_info_tree = vol_list.findall('row')
for vol in vol_info_tree:
if vol_id == vol.find('vol_no').text:
LOG.debug('vol_no: %s', vol.find('vol_no').text)
return vol
@_connection_checker
def get_snapshot_info(self, **kwargs):
"""Execute get_snapshot_info API."""
params = {
'func': 'extra_get',
'volumeID': kwargs['volID'],
'snapshot_list': '1',
'snap_start': '0',
'snap_count': '100',
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
if root.find('result').text < '0':
raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
snapshot_list = root.find('SnapshotList')
# if snapshot_list is None:
if not snapshot_list:
return None
if ('snapshot_name' in kwargs):
snapshot_tree = snapshot_list.findall('row')
for snapshot in snapshot_tree:
if (kwargs['snapshot_name'] ==
snapshot.find('snapshot_name').text):
LOG.debug('snapshot_name:%s', kwargs['snapshot_name'])
return snapshot
if (snapshot is snapshot_tree[-1]):
return None
return res_details['data']
@_connection_checker
def create_snapshot_api(self, volumeID, snapshot_name):
"""Execute CGI to create snapshot from source share."""
LOG.debug('volumeID: %s', volumeID)
LOG.debug('snapshot_name: %s', snapshot_name)
params = {
'func': 'create_snapshot',
'volumeID': volumeID,
'snapshot_name': snapshot_name,
'expire_min': '0',
'vital': '1',
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
if root.find('ES_RET_CODE').text < '0':
msg = _('Create snapshot failed')
raise exception.ShareBackendException(msg=msg)
@_connection_checker
def delete_snapshot_api(self, snapshot_id):
"""Execute CGI to delete snapshot from snapshot_id."""
params = {
'func': 'del_snapshots',
'snapshotID': snapshot_id,
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
# snapshot not exist
if root.find('result').text == '-206021':
return
# lun not exist
if root.find('result').text == '-200005':
return
if root.find('result').text < '0':
msg = _('Failed to delete snapshot.')
raise exception.ShareBackendException(msg=msg)
@_connection_checker
def clone_snapshot(self, snapshot_id, new_sharename):
"""Execute CGI to clone snapshot as share."""
params = {
'func': 'clone_qsnapshot',
'by_vol': '1',
'snapshotID': snapshot_id,
'new_name': new_sharename,
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
if root.find('result').text < '0':
msg = _('Failed to clone snapshot.')
raise exception.ShareBackendException(msg=msg)
@_connection_checker
def edit_share(self, share_dict):
"""Edit share properties."""
LOG.debug('share_dict[sharename]: %s', share_dict['sharename'])
params = {
'wiz_func': 'share_property',
'action': 'share_property',
'sharename': share_dict['sharename'],
'old_sharename': share_dict['old_sharename'],
'vol_size': six.text_type(share_dict['new_size']) + 'GB',
'dedup': '0',
'compression': '1',
'thin_pro': '0',
'cache': '0',
'afp_enable': '0',
'ftp_enable': '1',
'hidden': '0',
'oplocks': '1',
'sync': 'always',
'recycle_bin': '1',
'recycle_bin_administrators_only': '0',
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/priv/privWizard.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
if root.find('ES_RET_CODE').text < '0':
msg = _('Edit sharename %s failed') % share_dict['sharename']
raise exception.ShareBackendException(msg=msg)
@_connection_checker
def get_host_list(self, **kwargs):
"""Execute get_host_list API."""
params = {
'module': 'hosts',
'func': 'get_hostlist',
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s' %
sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
if root.find('result').text < '0':
raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
host_list = root.find('content').find('host_list')
# if host_list is None:
if not host_list:
return None
return_hosts = []
host_tree = host_list.findall('host')
for host in host_tree:
LOG.debug('host:%s', host)
return_hosts.append(host)
return return_hosts
@_connection_checker
def add_host(self, hostname, ipv4):
"""Execute add_host API."""
params = {
'module': 'hosts',
'func': 'apply_addhost',
'name': hostname,
'ipaddr_v4': ipv4,
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/accessrights/accessrightsRequest.cgi?%s' %
sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
if root.find('result').text < '0':
raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
@_connection_checker
def set_nfs_access(self, sharename, access, host_name):
"""Execute set_nfs_access API."""
params = {
'wiz_func': 'share_nfs_control',
'action': 'share_nfs_control',
'sharename': sharename,
'access': access,
'host_name': host_name,
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/priv/privWizard.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
if root.find('result').text < '0':
raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
class QnapAPIExecutorTS(QnapAPIExecutor):
"""Makes QNAP API calls for TS NAS."""
@_connection_checker
def get_snapshot_info(self, **kwargs):
"""Execute get_snapshot_info API."""
for key, value in six.iteritems(kwargs):
LOG.debug('%(key)s = %(val)s',
{'key': key, 'val': value})
params = {
'func': 'extra_get',
'LUNIndex': kwargs['lun_index'],
'smb_snapshot_list': '1',
'smb_snapshot': '1',
'snapshot_list': '1',
'sid': self.sid,
}
sanitized_params = self._sanitize_params(params)
sanitized_params = urllib.parse.urlencode(sanitized_params)
url = ('/cgi-bin/disk/snapshot.cgi?%s' % sanitized_params)
res_details = self._execute_and_get_response_details(self.ip, url)
root = ET.fromstring(res_details['data'])
if root.find('authPassed').text == '0':
raise exception.ShareBackendException(msg=MSG_SESSION_EXPIRED)
if root.find('result').text < '0':
raise exception.ShareBackendException(msg=MSG_UNEXPECT_RESP)
snapshot_list = root.find('SnapshotList')
if snapshot_list is None:
return None
snapshot_tree = snapshot_list.findall('row')
for snapshot in snapshot_tree:
if (kwargs['snapshot_name'] ==
snapshot.find('snapshot_name').text):
LOG.debug('snapshot_name:%s', kwargs['snapshot_name'])
return snapshot
return None

707
manila/share/drivers/qnap/qnap.py

@ -0,0 +1,707 @@
# Copyright (c) 2016 QNAP Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Share driver for QNAP Storage.
This driver supports QNAP Storage for NFS.
"""
import re
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from oslo_utils import units
from manila.common import constants
from manila import exception
from manila import share
from manila.i18n import _, _LE, _LI, _LW
from manila.share import driver
from manila.share.drivers.qnap import api
from manila import utils
LOG = logging.getLogger(__name__)
qnap_manila_opts = [
cfg.StrOpt('qnap_management_url',
required=True,
help='The URL to manage QNAP Storage.'),
cfg.StrOpt('qnap_share_ip',
required=True,
help='NAS share IP for mounting shares.'),
cfg.StrOpt('qnap_nas_login',
required=True,
help='Username for QNAP storage.'),
cfg.StrOpt('qnap_nas_password',
required=True,
secret=True,
help='Password for QNAP storage.'),
cfg.StrOpt('qnap_poolname',
required=True,
help='Pool within which QNAP shares must be created.'),
]
CONF = cfg.CONF
CONF.register_opts(qnap_manila_opts)
class QnapShareDriver(driver.ShareDriver):
"""OpenStack driver to enable QNAP Storage.
Version history:
1.0.0 - Initial driver (Only NFS)
"""
DRIVER_VERSION = '1.0.0'
def __init__(self, *args, **kwargs):
"""Initialize QnapShareDriver."""
super(QnapShareDriver, self).__init__(False, *args, **kwargs)
self.private_storage = kwargs.get('private_storage')
self.api_executor = None
self.group_stats = {}
self.configuration.append_config_values(qnap_manila_opts)
self.share_api = share.API()
def do_setup(self, context):
"""Setup the QNAP Manila share driver."""
self.ctxt = context
LOG.debug('context: %s', context)
# Setup API Executor
try:
self.api_executor = self._create_api_executor()
except Exception:
LOG.exception(_LE('Failed to create HTTP client. Check IP '
'address, port, username, password and make '
'sure the array version is compatible.'))
raise
def check_for_setup_error(self):
"""Check the status of setup."""
if self.api_executor is None:
msg = _("Failed to instantiate API client to communicate with "
"QNAP storage systems.")
raise exception.ShareBackendException(msg=msg)
def _create_api_executor(self):
"""Create API executor by NAS model."""
"""LOG.debug('CONF.qnap_nas_login=%(conf)s',
{'conf': CONF.qnap_nas_login})
LOG.debug('self.configuration.qnap_nas_login=%(conf)s',
{'conf': self.configuration.qnap_nas_login})"""
self.api_executor = api.QnapAPIExecutor(
username=self.configuration.qnap_nas_login,
password=self.configuration.qnap_nas_password,
management_url=self.configuration.qnap_management_url)
display_model_name, internal_model_name, fw_version = (
self.api_executor.get_basic_info(
self.configuration.qnap_management_url))
pattern = re.compile(r"^([A-Z]+)-?[A-Z]{0,2}(\d+)\d{2}(U|[a-z]*)")
matches = pattern.match(display_model_name)
if not matches:
return None
model_type = matches.group(1)
ts_model_types = (
"TS", "SS", "IS", "TVS", "TDS", "TBS"
)
tes_model_types = (
"TES",
)
es_model_types = (
"ES",
)
if model_type in ts_model_types:
if (fw_version.startswith("4.2") or fw_version.startswith("4.3")):
LOG.debug('Create TS API Executor')
# modify the pool name to pool index
self.configuration.qnap_poolname = (
self._get_ts_model_pool_id(
self.configuration.qnap_poolname))
return api.QnapAPIExecutorTS(
username=self.configuration.qnap_nas_login,
password=self.configuration.qnap_nas_password,
management_url=self.configuration.qnap_management_url)
elif model_type in tes_model_types:
if 'TS' in internal_model_name:
if (fw_version.startswith("4.2") or
fw_version.startswith("4.3")):
LOG.debug('Create TS API Executor')
# modify the pool name to pool index
self.configuration.qnap_poolname = (
self._get_ts_model_pool_id(
self.configuration.qnap_poolname))
return api.QnapAPIExecutorTS(
username=self.configuration.qnap_nas_login,
password=self.configuration.qnap_nas_password,
management_url=self.configuration.qnap_management_url)
if (fw_version.startswith("1.1.2") or
fw_version.startswith("1.1.3")):
LOG.debug('Create ES API Executor')
return api.QnapAPIExecutor(
username=self.configuration.qnap_nas_login,
password=self.configuration.qnap_nas_password,
management_url=self.configuration.qnap_management_url)
elif model_type in es_model_types:
if (fw_version.startswith("1.1.2") or
fw_version.startswith("1.1.3")):
LOG.debug('Create ES API Executor')
return api.QnapAPIExecutor(
username=self.configuration.qnap_nas_login,
password=self.configuration.qnap_nas_password,
management_url=self.configuration.qnap_management_url)
msg = _('QNAP Storage model is not supported by this driver.')
raise exception.ShareBackendException(msg=msg)
def _get_ts_model_pool_id(self, pool_name):
"""Modify the pool name to pool index."""
pattern = re.compile(r"^(\d+)+|^Storage Pool (\d+)+")
matches = pattern.match(pool_name)
if matches.group(1):
return matches.group(1)
else:
return matches.group(2)
@utils.synchronized('qnap-gen_name')
def _gen_random_name(self, type):
if type == 'share':
infix = "shr-"
elif type == 'snapshot':
infix = "snp-"
elif type == 'host':
infix = "hst-"
else:
infix = ""
return ("manila-%(ifx)s%(time)s" %
{'ifx': infix,
'time': timeutils.utcnow().strftime('%Y%m%d%H%M%S%f')})
def _get_location_path(self, share_name, share_proto, ip):
if share_proto == 'NFS':
created_share = self.api_executor.get_share_info(
self.configuration.qnap_poolname,
vol_label=share_name)
vol_no = created_share.find('vol_no').text
vol = self.api_executor.get_specific_volinfo(vol_no)
vol_mount_path = vol.find('vol_mount_path').text
location = '%s:%s' % (ip, vol_mount_path)
else:
msg = _('Invalid NAS protocol: %s') % share_proto
raise exception.InvalidInput(reason=msg)
export_location = {
'path': location,
'is_admin_only': False,
}
return export_location
def _update_share_stats(self):
"""Get latest share stats."""
backend_name = (self.configuration.safe_get(
'share_backend_name') or
self.__class__.__name__)
LOG.debug('backend_name=%(backend_name)s',
{'backend_name': backend_name})
selected_pool = self.api_executor.get_specific_poolinfo(
self.configuration.qnap_poolname)
total_capacity_gb = (int(selected_pool.find('capacity_bytes').text) /
units.Gi)
LOG.debug('total_capacity_gb: %s GB', total_capacity_gb)
free_capacity_gb = (int(selected_pool.find('freesize_bytes').text) /
units.Gi)
LOG.debug('free_capacity_gb: %s GB', free_capacity_gb)
alloc_capacity_gb = (int(selected_pool.find('allocated_bytes').text) /
units.Gi)
LOG.debug('allocated_capacity_gb: %s GB', alloc_capacity_gb)
reserved_percentage = self.configuration.safe_get(
'reserved_share_percentage')
# single pool now, need support multiple pools in the future
single_pool = {
"pool_name": self.configuration.qnap_poolname,
"total_capacity_gb": total_capacity_gb,
"free_capacity_gb": free_capacity_gb,
"allocated_capacity_gb": alloc_capacity_gb,
"reserved_percentage": reserved_percentage,
"qos": False,
}
data = {
"share_backend_name": backend_name,
"vendor_name": "QNAP",
"driver_version": self.DRIVER_VERSION,
"storage_protocol": "NFS",
"snapshot_support": True,
"create_share_from_snapshot_support": True,
"driver_handles_share_servers": self.configuration.safe_get(
'driver_handles_share_servers'),
'pools': [single_pool],
}
super(self.__class__, self)._update_share_stats(data)
@utils.retry(exception=exception.ShareBackendException,
interval=3,
retries=200)
def create_share(self, context, share, share_server=None):
"""Create a new share."""
LOG.debug('share: %s', share.__dict__)
share_proto = share['share_proto']
# User could create two shares with the same name on horizon.
# Therefore, we should not use displayname to create shares on NAS.
create_share_name = self._gen_random_name("share")
# If share name exists, need to change to another name.
created_share = self.api_executor.get_share_info(
self.configuration.qnap_poolname,
vol_label=create_share_name)
if created_share is not None:
msg = _("Failed to create an unused share name.")
raise exception.ShareBackendException(msg=msg)
create_volID = self.api_executor.create_share(
share,
self.configuration.qnap_poolname,
create_share_name,
share_proto)
# Use private_storage to record volume ID and Name created in the NAS.
_metadata = {'volID': create_volID, 'volName': create_share_name}
self.private_storage.update(share['id'], _metadata)
return self._get_location_path(create_share_name,
share['share_proto'],
self.configuration.qnap_share_ip)
def delete_share(self, context, share, share_server=None):
"""Delete the specified share."""
# Use private_storage to retreive volume ID created in the NAS.
volID = self.private_storage.get(share['id'], 'volID')
if not volID:
LOG.warning(_LW('volID for Share %s does not exist'), share['id'])
return
LOG.debug('volID: %s', volID)
del_share = self.api_executor.get_share_info(
self.configuration.qnap_poolname,
vol_no=volID)
if del_share is None:
LOG.warning(_LW('Share %s does not exist'), share['id'])
return
vol_no = del_share.find('vol_no').text
self.api_executor.delete_share(vol_no)
self.private_storage.delete(share['id'])
def extend_share(self, share, new_size, share_server=None):
"""Extend an existing share."""
LOG.debug('Entering extend_share share=%(share)s '
'new_size=%(size)s',
{'share': share['display_name'], 'size': new_size})
# Use private_storage to retrieve volume Name created in the NAS.
volName = self.private_storage.get(share['id'], 'volName')
if not volName:
LOG.debug('Share %s does not exist', share['id'])
raise exception.ShareResourceNotFound(share_id=share['id'])
LOG.debug('volName: %s', volName)
share_dict = {
"sharename": volName,
"old_sharename": volName,
"new_size": new_size,
}
self.api_executor.edit_share(share_dict)
@utils.retry(exception=exception.ShareBackendException,
interval=3,
retries=200)
def create_snapshot(self, context, snapshot, share_server=None):
"""Create a snapshot."""
LOG.debug('snapshot[share][share_id]: %s',
snapshot['share']['share_id'])
LOG.debug('snapshot id: %s', snapshot['id'])
# Use private_storage to retrieve volume ID created in the NAS.
volID = self.private_storage.get(snapshot['share']['id'], 'volID')
if not volID:
LOG.warning(
_LW('volID for Share %s does not exist'),
snapshot['share']['id'])
raise exception.ShareResourceNotFound(
share_id=snapshot['share']['id'])
LOG.debug('volID: %s', volID)
# User could create two snapshot with the same name on horizon.
# Therefore, we should not use displayname to create snapshot on NAS.
# if snapshot exist, need to change another
create_snapshot_name = self._gen_random_name("snapshot")
LOG.debug('create_snapshot_name: %s', create_snapshot_name)
check_snapshot = self.api_executor.get_snapshot_info(
volID=volID, snapshot_name=create_snapshot_name)
if check_snapshot is not None:
msg = _("Failed to create an unused snapshot name.")
raise exception.ShareBackendException(msg=msg)
LOG.debug('create_snapshot_name: %s', create_snapshot_name)
self.api_executor.create_snapshot_api(volID, create_snapshot_name)
snapshot_id = ""
created_snapshot = self.api_executor.get_snapshot_info(
volID=volID, snapshot_name=create_snapshot_name)
if created_snapshot is not None:
snapshot_id = created_snapshot.find('snapshot_id').text
else:
msg = _("Failed to get snapshot information.")
raise exception.ShareBackendException(msg=msg)
LOG.debug('created_snapshot: %s', created_snapshot)
LOG.debug('snapshot_id: %s', snapshot_id)
# Use private_storage to record data instead of metadata.
_metadata = {'snapshot_id': snapshot_id}
self.private_storage.update(snapshot['id'], _metadata)
# Test to get value from private_storage.
snapshot_id = self.private_storage.get(snapshot['id'], 'snapshot_id')
LOG.debug('snapshot_id: %s', snapshot_id)
return {'provider_location': snapshot_id}
def delete_snapshot(self, context, snapshot, share_server=None):
"""Delete a snapshot."""
LOG.debug('Entering delete_snapshot. The deleted snapshot=%(snap)s',
{'snap': snapshot['id']})
snapshot_id = (snapshot.get('provider_location') or
self.private_storage.get(snapshot['id'], 'snapshot_id'))
if not snapshot_id:
LOG.warning(_LW('Snapshot %s does not exist'), snapshot['id'])
return
LOG.debug('snapshot_id: %s', snapshot_id)
self.api_executor.delete_snapshot_api(snapshot_id)
self.private_storage.delete(snapshot['id'])
@utils.retry(exception=exception.ShareBackendException,
interval=3,
retries=200)
def create_share_from_snapshot(self, context, share, snapshot,
share_server=None):
"""Create a share from a snapshot."""
LOG.debug('Entering create_share_from_snapshot. The source '
'snapshot=%(snap)s. The created share=%(share)s',
{'snap': snapshot['id'], 'share': share['id']})
snapshot_id = (snapshot.get('provider_location') or
self.private_storage.get(snapshot['id'], 'snapshot_id'))
if not snapshot_id:
LOG.warning(_LW('Snapshot %s does not exist'), snapshot['id'])
raise exception.SnapshotResourceNotFound(name=snapshot['id'])
LOG.debug('snapshot_id: %s', snapshot_id)
create_share_name = self._gen_random_name("share")
# if sharename exist, need to change another
created_share = self.api_executor.get_share_info(
self.configuration.qnap_poolname,
vol_label=create_share_name)
if created_share is not None:
msg = _("Failed to create an unused share name.")
raise exception.ShareBackendException(msg=msg)
self.api_executor.clone_snapshot(snapshot_id, create_share_name)
create_volID = ""
created_share = self.api_executor.get_share_info(
self.configuration.qnap_poolname,
vol_label=create_share_name)
if created_share.find('vol_no') is not None:
create_volID = created_share.find('vol_no').text
else:
msg = _("Failed to clone a snapshot in time.")
raise exception.ShareBackendException(msg=msg)
snap_share = self.share_api.get(context,
snapshot['share_instance']['share_id'])
LOG.debug('snap_share[size]: %s', snap_share['size'])
if (share['size'] > snap_share['size']):
share_dict = {'sharename': create_share_name,
'old_sharename': create_share_name,
'new_size': share['size']}
self.api_executor.edit_share(share_dict)
# Use private_storage to record volume ID and Name created in the NAS.
_metadata = {
'volID': create_volID,
'volName': create_share_name,
}
self.private_storage.update(share['id'], _metadata)
# Test to get value from private_storage.
volName = self.private_storage.get(share['id'], 'volName')
LOG.debug('volName: %s', volName)
return self._get_location_path(create_share_name,
share['share_proto'],
self.configuration.qnap_share_ip)
def _get_manila_hostIPv4s(self, hostlist):
host_dict_IPs = []
if hostlist is None:
return host_dict_IPs
for host in hostlist:
# Check host alias name with prefix "manila-hst-" to verify this
# host is created/managed by Manila or not.
if (re.match("^manila-hst-[0-9]+$", host.find('name').text)
is not None):
LOG.debug('host netaddrs text: %s', host.find('netaddrs').text)
if host.find('netaddrs').text is not None:
# Because Manila supports only IPv4 now, check "netaddrs"
# have "ipv4" tag to verify this host is created/managed
# by Manila or not.
if host.find('netaddrs/ipv4').text is not None:
host_dict = {
'index': host.find('index').text,
'hostid': host.find('hostid').text,
'name': host.find('name').text,
'netaddrs': host.find('netaddrs').find('ipv4').text
}
host_dict_IPs.append(host_dict)
return host_dict_IPs
def update_access(self, context, share, access_rules, add_rules,
delete_rules, share_server=None):
if not (add_rules or delete_rules):
volName = self.private_storage.get(share['id'], 'volName')
LOG.debug('volName: %s', volName)
if volName is None:
LOG.debug('Share %s does not exist', share['id'])
raise exception.ShareResourceNotFound(share_id=share['id'])
# Clear all current ACLs
self.api_executor.set_nfs_access(volName, 2, "all")
# Add each one through all rules.
for access in access_rules:
self._allow_access(context, share, access, share_server)
else:
# Adding/Deleting specific rules
for access in delete_rules:
self._deny_access(context, share, access, share_server)
for access in add_rules:
self._allow_access(context, share, access, share_server)
def _allow_access(self, context, share, access, share_server=None):
"""Allow access to the share."""
share_proto = share['share_proto']
access_type = access['access_type']
access_level = access['access_level']
access_to = access['access_to']
self._check_share_access(share_proto, access_type)
hostlist = self.api_executor.get_host_list()
host_dict_IPs = self._get_manila_hostIPv4s(hostlist)
LOG.debug('host_dict_IPs: %s', host_dict_IPs)
if len(host_dict_IPs) == 0:
host_name = self._gen_random_name("host")
self.api_executor.add_host(host_name, access_to)
else:
for host in host_dict_IPs:
LOG.debug('host[netaddrs]: %s', host['netaddrs'])
LOG.debug('access_to: %s', access_to)
if host['netaddrs'] == access_to:
LOG.debug('in match ip')
host_name = host['name']
break
if host is host_dict_IPs[-1]:
host_name = self._gen_random_name("host")
self.api_executor.add_host(host_name, access_to)
volName = self.private_storage.get(share['id'], 'volName')
LOG.debug('volName: %(volName)s for share: %(share)s',
{'volName': volName, 'share': share['id']})
LOG.debug('access_level: %(access)s for share: %(share)s',
{'access': access_level, 'share': share['id']})
LOG.debug('host_name: %(host)s for share: %(share)s',
{'host': host_name, 'share': share['id']})
if access_level == constants.ACCESS_LEVEL_RO:
self.api_executor.set_nfs_access(volName, 1, host_name)
elif access_level == constants.ACCESS_LEVEL_RW:
self.api_executor.set_nfs_access(volName, 0, host_name)
def _deny_access(self, context, share, access, share_server=None):
"""Deny access to the share."""
share_proto = share['share_proto']
access_type = access['access_type']
access_to = access['access_to']
try:
self._check_share_access(share_proto, access_type)
except exception.InvalidShareAccess:
LOG.warning(_LW('The denied rule is invalid and does not exist.'))
return
hostlist = self.api_executor.get_host_list()
host_dict_IPs = self._get_manila_hostIPv4s(hostlist)
LOG.debug('host_dict_IPs: %s', host_dict_IPs)
if len(host_dict_IPs) == 0:
return
else:
for host in host_dict_IPs:
if (host['netaddrs'] == access_to):
host_name = host['name']
break
if (host is host_dict_IPs[-1]):
return
volName = self.private_storage.get(share['id'], 'volName')
LOG.debug('volName: %s', volName)
self.api_executor.set_nfs_access(volName, 2, host_name)
def _check_share_access(self, share_proto, access_type):
if share_proto == 'NFS' and access_type != 'ip':
reason = _('Only "ip" access type is allowed for '
'NFS shares.')
LOG.warning(reason)
raise exception.InvalidShareAccess(reason=reason)
elif share_proto != 'NFS':
reason = _('Invalid NAS protocol: %s') % share_proto
raise exception.InvalidShareAccess(reason=reason)
def manage_existing(self, share, driver_options):
"""Manages a share that exists on backend."""
if share['share_proto'].lower() == 'nfs':
# 10.0.0.1:/share/example
LOG.info(_LI("Share %(shr_path)s will be managed with ID "
"%(shr_id)s."),
{'shr_path': share['export_locations'][0]['path'],
'shr_id': share['id']})
old_path_info = share['export_locations'][0]['path'].split(
':/share/')
if len(old_path_info) == 2:
ip = old_path_info[0]
share_name = old_path_info[1]
else:
msg = _("Incorrect path. It should have the following format: "
"IP:/share/share_name.")
raise exception.ShareBackendException(msg=msg)
else:
msg = _('Invalid NAS protocol: %s') % share['share_proto']
raise exception.InvalidInput(reason=msg)
if ip != self.configuration.qnap_share_ip:
msg = _("The NAS IP %(ip)s is not configured.") % {'ip': ip}
raise exception.ShareBackendException(msg=msg)
existing_share = self.api_executor.get_share_info(
self.configuration.qnap_poolname,
vol_label=share_name)
if existing_share is None:
msg = _("The share %s trying to be managed was not found on "
"backend.") % share['id']
raise exception.ManageInvalidShare(reason=msg)
_metadata = {}
vol_no = existing_share.find('vol_no').text
_metadata['volID'] = vol_no
_metadata['volName'] = share_name
self.private_storage.update(share['id'], _metadata)
# Test to get value from private_storage.
volID = self.private_storage.get(share['id'], 'volID')
LOG.debug('volID: %s', volID)
volName = self.private_storage.get(share['id'], 'volName')
LOG.debug('volName: %s', volName)
LOG.info(_LI("Share %(shr_path)s was successfully managed with ID "
"%(shr_id)s."),
{'shr_path': share['export_locations'][0]['path'],
'shr_id': share['id']})
vol = self.api_executor.get_specific_volinfo(vol_no)
vol_size_gb = int(vol.find('size').text) / units.Gi
export_locations = self._get_location_path(
share_name,
share['share_proto'],
self.configuration.qnap_share_ip)
return {'size': vol_size_gb, 'export_locations': export_locations}
def unmanage(self, share):
"""Remove the specified share from Manila management."""
self.private_storage.delete(share['id'])
def manage_existing_snapshot(self, snapshot, driver_options):
"""Manage existing share snapshot with manila."""
volID = self.private_storage.get(snapshot['share']['id'], 'volID')
LOG.debug('volID: %s', volID)
existing_share = self.api_executor.get_share_info(
self.configuration.qnap_poolname,
vol_no=volID)
if existing_share is None:
msg = _("The share id %s was not found on backend.") % volID
LOG.error(msg)
raise exception.ShareNotFound(reason=msg)
snapshot_id = snapshot.get('provider_location')
snapshot_id_info = snapshot_id.split('@')
if len(snapshot_id_info) == 2:
share_name = snapshot_id_info[0]
else:
msg = _("Incorrect provider_location format. It should have the "
"following format: share_name@snapshot_name.")
LOG.error(msg)
raise exception.InvalidParameterValue(reason=msg)
if share_name != existing_share.find('vol_label').text:
msg = (_("The assigned share %(share_name)s was not matched "
"%(vol_label)s on backend.") %
{'share_name': share_name,
'vol_label': existing_share.find('vol_label').text})
LOG.error(msg)
raise exception.ShareNotFound(reason=msg)
_metadata = {
'snapshot_id': snapshot_id,
}
self.private_storage.update(snapshot['id'], _metadata)
def unmanage_snapshot(self, snapshot):
"""Remove the specified snapshot from Manila management."""
self.private_storage.delete(snapshot['id'])

6
manila/tests/conf_fixture.py

@ -53,6 +53,12 @@ def set_defaults(conf):
_safe_set_of_opts(conf, 'hitachi_hsp_username', 'hsp_user')
_safe_set_of_opts(conf, 'hitachi_hsp_password', 'hsp_password')
_safe_set_of_opts(conf, 'qnap_management_url', 'http://1.2.3.4:8080')
_safe_set_of_opts(conf, 'qnap_share_ip', '1.2.3.4')
_safe_set_of_opts(conf, 'qnap_nas_login', 'admin')
_safe_set_of_opts(conf, 'qnap_nas_password', 'qnapadmin')
_safe_set_of_opts(conf, 'qnap_poolname', 'Storage Pool 1')
def _safe_set_of_opts(conf, *args, **kwargs):
try:

0
manila/tests/share/drivers/qnap/__init__.py

527
manila/tests/share/drivers/qnap/fakes.py

@ -0,0 +1,527 @@
# Copyright (c) 2016 QNAP Systems, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
FAKE_RES_DETAIL_DATA_LOGIN = """
<QDocRoot version="1.0">
<authPassed><![CDATA[1]]></authPassed>
<authSid><![CDATA[fakeSid]]></authSid>
</QDocRoot>"""
FAKE_RES_DETAIL_DATA_GETBASIC_INFO_ES = """
<QDocRoot version="1.0">
<model>
<displayModelName><![CDATA[ES1640dc]]></displayModelName>