Huawei: Create share from snapshot support in Huawei driver

Once create share from snapshot is in Manila core,
we should support the API in the Huawei drivers.

Implements: blueprint huawei-driver-create-share-from-snapshot
Change-Id: I906916f6ae7f5625863ca9f8e3324480048374de
This commit is contained in:
liucheng 2015-12-21 15:22:53 +08:00
parent ea8276cd10
commit e9ca434486
8 changed files with 625 additions and 9 deletions

View File

@ -51,7 +51,7 @@ Mapping of share drivers and share features support
+----------------------------------------+-----------------------------+-----------------------+--------------+--------------+------------------------+----------------------------+
| HPE 3PAR | DHSS = True (L) & False (K) | \- | \- | \- | K | K |
+----------------------------------------+-----------------------------+-----------------------+--------------+--------------+------------------------+----------------------------+
| Huawei | DHSS = True (M) & False(K) | L | L | L | K | \- |
| Huawei | DHSS = True (M) & False(K) | L | L | L | K | M |
+----------------------------------------+-----------------------------+-----------------------+--------------+--------------+------------------------+----------------------------+
| IBM GPFS | DHSS = False(K) | \- | L | \- | K | K |
+----------------------------------------+-----------------------------+-----------------------+--------------+--------------+------------------------+----------------------------+

View File

@ -706,3 +706,15 @@ class DriverNotInitialized(ManilaException):
class ShareResourceNotFound(StorageResourceNotFound):
message = _("Share id %(share_id)s could not be found "
"in storage backend.")
class ShareUmountException(ManilaException):
message = _("Failed to unmount share: %(reason)s")
class ShareMountException(ManilaException):
message = _("Failed to mount share: %(reason)s")
class ShareCopyDataException(ManilaException):
message = _("Failed to copy data: %(reason)s")

View File

@ -59,6 +59,11 @@ class HuaweiBase(object):
def extend_share(self, share, new_size, share_server):
"""Extends size of existing share."""
@abc.abstractmethod
def create_share_from_snapshot(self, share, snapshot,
share_server=None):
"""Create share from snapshot."""
@abc.abstractmethod
def shrink_share(self, share, new_size, share_server):
"""Shrinks size of existing share."""

View File

@ -33,6 +33,8 @@ LOGIN_SOCKET_TIMEOUT = 4
QOS_NAME_PREFIX = 'OpenStack_'
SYSTEM_NAME_PREFIX = "Array-"
ARRAY_VERSION = 'V300R003C00'
TMP_PATH_SRC_PREFIX = "huawei_manila_tmp_path_src_"
TMP_PATH_DST_PREFIX = "huawei_manila_tmp_path_dst_"
ACCESS_NFS_RW = "1"
ACCESS_NFS_RO = "0"

View File

@ -55,6 +55,7 @@ class HuaweiNasDriver(driver.ShareDriver):
1.2 - Add share server support.
Add ensure share.
Add QoS support.
Add create share from snapshot.
"""
def __init__(self, *args, **kwargs):
@ -112,6 +113,13 @@ class HuaweiNasDriver(driver.ShareDriver):
LOG.debug("Extend a share.")
self.plugin.extend_share(share, new_size, share_server)
def create_share_from_snapshot(self, context, share, snapshot,
share_server=None):
"""Create a share from snapshot."""
LOG.debug("Create a share from snapshot %s.", snapshot['snapshot_id'])
location = self.plugin.create_share_from_snapshot(share, snapshot)
return location
def shrink_share(self, share, new_size, share_server=None):
"""Shrinks size of existing share."""
LOG.debug("Shrink a share.")

View File

@ -13,18 +13,23 @@
# License for the specific language governing permissions and limitations
# under the License.
import os
import random
import string
import tempfile
import time
from oslo_log import log
from oslo_serialization import jsonutils
from oslo_utils import excutils
from oslo_utils import strutils
from oslo_utils import units
import six
from manila.common import constants as common_constants
from manila import exception
from manila.i18n import _
from manila.i18n import _LE
from manila.i18n import _LI
from manila.i18n import _LW
from manila.share.drivers.huawei import base as driver
@ -328,6 +333,241 @@ class V3StorageConnection(driver.HuaweiBase):
return share
def create_share_from_snapshot(self, share, snapshot,
share_server=None):
"""Create a share from snapshot."""
share_fs_id = self.helper._get_fsid_by_name(snapshot['share_name'])
if not share_fs_id:
err_msg = (_("The source filesystem of snapshot %s "
"does not exist.")
% snapshot['snapshot_id'])
LOG.error(err_msg)
raise exception.StorageResourceNotFound(
name=snapshot['share_name'])
snapshot_id = self.helper._get_snapshot_id(share_fs_id, snapshot['id'])
snapshot_flag = self.helper._check_snapshot_id_exist(snapshot_id)
if not snapshot_flag:
err_msg = (_("Cannot find snapshot %s on array.")
% snapshot['snapshot_id'])
LOG.error(err_msg)
raise exception.ShareSnapshotNotFound(
snapshot_id=snapshot['snapshot_id'])
self.assert_filesystem(share_fs_id)
old_share_name = self.helper.get_share_name_by_id(
snapshot['share_id'])
old_share_proto = self._get_share_proto(old_share_name)
if not old_share_proto:
err_msg = (_("Cannot find source share %(share)s of "
"snapshot %(snapshot)s on array.")
% {'share': snapshot['share_id'],
'snapshot': snapshot['snapshot_id']})
LOG.error(err_msg)
raise exception.ShareResourceNotFound(
share_id=snapshot['share_id'])
new_share_path = self.create_share(share)
new_share = {
"share_proto": share['share_proto'],
"size": share['size'],
"name": share['name'],
"mount_path": new_share_path.replace("\\", "/"),
"mount_src":
tempfile.mkdtemp(prefix=constants.TMP_PATH_DST_PREFIX),
}
old_share_path = self._get_location_path(old_share_name,
old_share_proto)
old_share = {
"share_proto": old_share_proto,
"name": old_share_name,
"mount_path": old_share_path.replace("\\", "/"),
"mount_src":
tempfile.mkdtemp(prefix=constants.TMP_PATH_SRC_PREFIX),
"snapshot_name": ("share_snapshot_" +
snapshot['id'].replace("-", "_"))
}
try:
self.copy_data_from_parent_share(old_share, new_share)
except Exception:
with excutils.save_and_reraise_exception():
self.delete_share(new_share)
finally:
for item in (new_share, old_share):
try:
os.rmdir(item['mount_src'])
except Exception as err:
err_msg = (_('Failed to remove temp file. '
'File path: %(file_path)s. Reason: %(err)s.')
% {'file_path': item['mount_src'],
'err': six.text_type(err)})
LOG.warning(err_msg)
return new_share_path
def copy_data_from_parent_share(self, old_share, new_share):
old_access = self.get_access(old_share)
old_access_id = self._get_access_id(old_share, old_access)
if not old_access_id:
try:
self.allow_access(old_share, old_access)
except exception.ManilaException as err:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Failed to add access to share %(name)s. '
'Reason: %(err)s.')
% {'name': old_share['name'],
'err': six.text_type(err)})
new_access = self.get_access(new_share)
try:
try:
self.mount_share_to_host(old_share, old_access)
except exception.ShareMountException as err:
with excutils.save_and_reraise_exception():
LOG.error(_LE('Failed to mount old share %(name)s. '
'Reason: %(err)s.')
% {'name': old_share['name'],
'err': six.text_type(err)})
try:
self.allow_access(new_share, new_access)
self.mount_share_to_host(new_share, new_access)
except Exception as err:
with excutils.save_and_reraise_exception():
self.umount_share_from_host(old_share)
LOG.error(_LE('Failed to mount new share %(name)s. '
'Reason: %(err)s.')
% {'name': new_share['name'],
'err': six.text_type(err)})
copied = self.copy_snapshot_data(old_share, new_share)
for item in (new_share, old_share):
try:
self.umount_share_from_host(item)
except exception.ShareUmountException as err:
err_msg = (_('Failed to unmount share %(name)s. '
'Reason: %(err)s.')
% {'name': item['name'],
'err': six.text_type(err)})
LOG.warning(err_msg)
self.deny_access(new_share, new_access)
if copied:
LOG.debug("Created share from snapshot successfully, "
"new_share: %s, old_share: %s.",
new_share, old_share)
else:
message = (_('Failed to copy data from share %(old_share)s '
'to share %(new_share)s.')
% {'old_share': old_share['name'],
'new_share': new_share['name']})
raise exception.ShareCopyDataException(reason=message)
finally:
if not old_access_id:
self.deny_access(old_share, old_access)
def get_access(self, share):
share_proto = share['share_proto']
access = {}
root = self.helper._read_xml()
if share_proto == 'NFS':
access['access_to'] = root.findtext('Filesystem/NFSClient/IP')
access['access_level'] = common_constants.ACCESS_LEVEL_RW
access['access_type'] = 'ip'
elif share_proto == 'CIFS':
access['access_to'] = root.findtext(
'Filesystem/CIFSClient/UserName')
access['access_password'] = root.findtext(
'Filesystem/CIFSClient/UserPassword')
access['access_level'] = common_constants.ACCESS_LEVEL_RW
access['access_type'] = 'user'
LOG.debug("Get access for share: %s, access_type: %s, access_to: %s, "
"access_level: %s", share['name'], access['access_type'],
access['access_to'], access['access_level'])
return access
def _get_access_id(self, share, access):
"""Get access id of the share."""
access_id = None
share_name = share['name']
share_url_type = self.helper._get_share_url_type(share['share_proto'])
share_client_type = self.helper._get_share_client_type(
share['share_proto'])
access_to = access['access_to']
share = self.helper._get_share_by_name(share_name, share_url_type)
access_id = self.helper._get_access_from_share(share['ID'], access_to,
share_client_type)
if access_id is None:
LOG.debug('Cannot get access ID from share. '
'share_name: %s', share_name)
return access_id
def copy_snapshot_data(self, old_share, new_share):
src_path = '/'.join((old_share['mount_src'], '.snapshot',
old_share['snapshot_name']))
dst_path = new_share['mount_src']
copy_finish = False
LOG.debug("Copy data from src_path: %s to dst_path: %s.",
src_path, dst_path)
try:
ignore_list = ''
copy = share_utils.Copy(src_path,
dst_path,
ignore_list)
copy.run()
if copy.get_progress()['total_progress'] == 100:
copy_finish = True
except Exception as err:
err_msg = (_("Failed to copy data, reason: %s.")
% six.text_type(err))
LOG.error(err_msg)
return copy_finish
def umount_share_from_host(self, share):
try:
utils.execute('umount', share['mount_path'],
run_as_root=True)
except Exception as err:
message = (_("Failed to unmount share %(share)s. "
"Reason: %(reason)s.")
% {'share': share['name'],
'reason': six.text_type(err)})
raise exception.ShareUmountException(reason=message)
def mount_share_to_host(self, share, access):
LOG.debug("Mounting share: %s to host, mount_src: %s",
share['name'], share['mount_src'])
try:
if share['share_proto'] == 'NFS':
utils.execute('mount', '-t', 'nfs',
share['mount_path'], share['mount_src'],
run_as_root=True)
LOG.debug("Execute mount. mount_src: %s",
share['mount_src'])
elif share['share_proto'] == 'CIFS':
user = ('user=' + access['access_to'] + ',' +
'password=' + access['access_password'])
utils.execute('mount', '-t', 'cifs',
share['mount_path'], share['mount_src'],
'-o', user, run_as_root=True)
except Exception as err:
message = (_('Bad response from mount share: %(share)s. '
'Reason: %(reason)s.')
% {'share': share['name'],
'reason': six.text_type(err)})
raise exception.ShareMountException(reason=message)
def get_network_allocations_number(self):
"""Get number of network interfaces to be created."""
if self.configuration.driver_handles_share_servers:
@ -799,6 +1039,16 @@ class V3StorageConnection(driver.HuaweiBase):
return location
def _get_share_proto(self, share_name):
share_proto = None
for proto in ('NFS', 'CIFS'):
share_url_type = self.helper._get_share_url_type(proto)
share = self.helper._get_share_by_name(share_name, share_url_type)
if share:
share_proto = proto
break
return share_proto
def _get_wait_interval(self):
"""Get wait interval from huawei conf file."""
root = self.helper._read_xml()

View File

@ -660,6 +660,10 @@ class RestHelper(object):
share_path = "/" + share_name.replace("-", "_") + "/"
return share_path
def get_share_name_by_id(self, share_id):
share_name = "share_" + share_id
return share_name
def _get_share_name_by_export_location(self, export_location, share_proto):
export_location_split = None
share_name = None

View File

@ -34,7 +34,9 @@ from manila.share.drivers.huawei import huawei_nas
from manila.share.drivers.huawei.v3 import connection
from manila.share.drivers.huawei.v3 import helper
from manila.share.drivers.huawei.v3 import smartx
from manila.share import utils as share_utils
from manila import test
from manila import utils
def fake_sleep(time):
@ -283,8 +285,8 @@ def dec_driver_handles_share_servers(func):
def QoS_response(method):
if method == "GET":
data = """{"error":{"code":0},
"data":[{"NAME": "OpenStack_Fake_QoS", "MAXIOPS": "100",
"FSLIST": 4, "LUNLIST": ""}]}"""
"data":{"NAME": "OpenStack_Fake_QoS", "MAXIOPS": "100",
"FSLIST": "4", "LUNLIST": "", "RUNNINGSTATUS": "2"}}"""
elif method == "PUT":
data = """{"error":{"code":0}}"""
else:
@ -515,7 +517,7 @@ class FakeHuaweiNasHelper(helper.RestHelper):
"RUNNINGSTATUS":"2"}}"""
else:
data = """{"error":{"code":0},"data":{
"RUNNINGSTATUS":"1"}}"""
"RUNNINGSTATUS":"1"}}"""
if url == "/NFSSERVICE":
if self.service_nfs_status_flag:
@ -732,6 +734,7 @@ class HuaweiShareDriverTestCase(test.TestCase):
self.share_nfs = {
'id': 'fake_uuid',
'share_id': 'fake_uuid',
'project_id': 'fake_tenant_id',
'display_name': 'fake',
'name': 'share-fake-uuid',
@ -857,6 +860,7 @@ class HuaweiShareDriverTestCase(test.TestCase):
self.share_cifs = {
'id': 'fake_uuid',
'share_id': 'fake_uuid',
'project_id': 'fake_tenant_id',
'display_name': 'fake',
'name': 'share-fake-uuid',
@ -889,10 +893,12 @@ class HuaweiShareDriverTestCase(test.TestCase):
self.nfs_snapshot = {
'id': 'fake_snapshot_uuid',
'snapshot_id': 'fake_snapshot_uuid',
'display_name': 'snapshot',
'name': 'fake_snapshot_name',
'size': 1,
'share_name': 'share_fake_uuid',
'share_id': 'fake_uuid',
'share': {
'share_name': 'share_fake_uuid',
'share_id': 'fake_uuid',
@ -903,10 +909,12 @@ class HuaweiShareDriverTestCase(test.TestCase):
self.cifs_snapshot = {
'id': 'fake_snapshot_uuid',
'snapshot_id': 'fake_snapshot_uuid',
'display_name': 'snapshot',
'name': 'fake_snapshot_name',
'size': 1,
'share_name': 'share_fake_uuid',
'share_id': 'fake_uuid',
'share': {
'share_name': 'share_fake_uuid',
'share_id': 'fake_uuid',
@ -1665,11 +1673,318 @@ class HuaweiShareDriverTestCase(test.TestCase):
number = self.driver.get_network_allocations_number()
self.assertEqual(0, number)
def test_create_share_from_snapshot(self):
self.assertRaises(NotImplementedError,
def test_create_nfsshare_from_nfssnapshot_success(self):
share_type = self.fake_type_not_extra['test_with_extra']
self.mock_object(db,
'share_type_get',
mock.Mock(return_value=share_type))
self.mock_object(self.driver.plugin,
'mount_share_to_host',
mock.Mock(return_value={}))
self.mock_object(self.driver.plugin,
'copy_snapshot_data',
mock.Mock(return_value=True))
self.mock_object(self.driver.plugin,
'umount_share_from_host',
mock.Mock(return_value={}))
self.driver.plugin.helper.login()
self.driver.plugin.helper.snapshot_flag = True
location = self.driver.create_share_from_snapshot(self._context,
self.share_nfs,
self.nfs_snapshot,
self.share_server)
self.assertTrue(db.share_type_get.called)
self.assertEqual(2, self.driver.plugin.
mount_share_to_host.call_count)
self.assertTrue(self.driver.plugin.
copy_snapshot_data.called)
self.assertEqual(2, self.driver.plugin.
umount_share_from_host.call_count)
self.assertEqual("100.115.10.68:/share_fake_uuid", location)
def test_create_cifsshare_from_cifssnapshot_success(self):
share_type = self.fake_type_not_extra['test_with_extra']
self.mock_object(db,
'share_type_get',
mock.Mock(return_value=share_type))
self.mock_object(self.driver.plugin,
'mount_share_to_host',
mock.Mock(return_value={}))
self.mock_object(self.driver.plugin,
'copy_snapshot_data',
mock.Mock(return_value=True))
self.mock_object(self.driver.plugin,
'umount_share_from_host',
mock.Mock(return_value={}))
self.driver.plugin.helper.login()
self.driver.plugin.helper.snapshot_flag = True
location = self.driver.create_share_from_snapshot(self._context,
self.share_cifs,
self.cifs_snapshot,
self.share_server)
self.assertTrue(db.share_type_get.called)
self.assertEqual(2, self.driver.plugin.
mount_share_to_host.call_count)
self.assertTrue(self.driver.plugin.
copy_snapshot_data.called)
self.assertEqual(2, self.driver.plugin.
umount_share_from_host.call_count)
self.assertEqual("\\\\100.115.10.68\\share_fake_uuid", location)
def test_create_nfsshare_from_cifssnapshot_success(self):
share_type = self.fake_type_not_extra['test_with_extra']
self.mock_object(db,
'share_type_get',
mock.Mock(return_value=share_type))
self.mock_object(self.driver.plugin,
'_get_access_id',
mock.Mock(return_value={}))
self.mock_object(self.driver.plugin,
'mount_share_to_host',
mock.Mock(return_value={}))
self.mock_object(self.driver.plugin,
'copy_snapshot_data',
mock.Mock(return_value=True))
self.mock_object(self.driver.plugin,
'umount_share_from_host',
mock.Mock(return_value={}))
self.driver.plugin.helper.login()
self.driver.plugin.helper.access_id = None
self.driver.plugin.helper.snapshot_flag = True
location = self.driver.create_share_from_snapshot(self._context,
self.share_nfs,
self.cifs_snapshot,
self.share_server)
self.assertTrue(db.share_type_get.called)
self.assertTrue(self.driver.plugin.
_get_access_id.called)
self.assertEqual(2, self.driver.plugin.
mount_share_to_host.call_count)
self.assertTrue(self.driver.plugin.
copy_snapshot_data.called)
self.assertEqual(2, self.driver.plugin.
umount_share_from_host.call_count)
self.assertEqual("100.115.10.68:/share_fake_uuid", location)
def test_create_cifsshare_from_nfssnapshot_success(self):
share_type = self.fake_type_not_extra['test_with_extra']
self.mock_object(db,
'share_type_get',
mock.Mock(return_value=share_type))
self.mock_object(self.driver.plugin,
'_get_access_id',
mock.Mock(return_value={}))
self.mock_object(utils,
'execute',
mock.Mock(return_value={}))
self.driver.plugin.helper.login()
self.driver.plugin.helper.snapshot_flag = True
location = self.driver.create_share_from_snapshot(self._context,
self.share_cifs,
self.nfs_snapshot,
self.share_server)
self.assertTrue(db.share_type_get.called)
self.assertTrue(self.driver.plugin.
_get_access_id.called)
self.assertEqual(4, utils.execute.call_count)
self.assertEqual("\\\\100.115.10.68\\share_fake_uuid", location)
def test_create_share_from_snapshot_nonefs(self):
self.driver.plugin.helper.login()
self.mock_object(self.driver.plugin.helper,
'_get_fsid_by_name',
mock.Mock(return_value={}))
self.assertRaises(exception.StorageResourceNotFound,
self.driver.create_share_from_snapshot,
self._context, self.share_nfs, self.nfs_snapshot,
self.share_server)
self._context, self.share_nfs,
self.nfs_snapshot, self.share_server)
self.assertTrue(self.driver.plugin.helper.
_get_fsid_by_name.called)
def test_create_share_from_notexistingsnapshot_fail(self):
self.driver.plugin.helper.login()
self.driver.plugin.helper.snapshot_flag = False
self.assertRaises(exception.ShareSnapshotNotFound,
self.driver.create_share_from_snapshot,
self._context, self.share_nfs,
self.nfs_snapshot, self.share_server)
def test_create_share_from_share_fail(self):
self.driver.plugin.helper.login()
self.driver.plugin.helper.snapshot_flag = True
self.mock_object(self.driver.plugin,
'check_fs_status',
mock.Mock(return_value={}))
self.assertRaises(exception.StorageResourceException,
self.driver.create_share_from_snapshot,
self._context, self.share_nfs,
self.nfs_snapshot, self.share_server)
self.assertTrue(self.driver.plugin.check_fs_status.called)
def test_create_share_from_snapshot_share_error(self):
self.mock_object(self.driver.plugin,
'_get_share_proto',
mock.Mock(return_value={}))
self.driver.plugin.helper.login()
self.driver.plugin.helper.snapshot_flag = True
self.assertRaises(exception.ShareResourceNotFound,
self.driver.create_share_from_snapshot,
self._context, self.share_nfs,
self.nfs_snapshot, self.share_server)
self.assertTrue(self.driver.plugin.
_get_share_proto.called)
def test_create_share_from_snapshot_allow_oldaccess_fail(self):
share_type = self.fake_type_not_extra['test_with_extra']
self.mock_object(db,
'share_type_get',
mock.Mock(return_value=share_type))
self.mock_object(self.driver.plugin,
'_get_share_proto',
mock.Mock(return_value='NFS'))
self.mock_object(self.driver.plugin,
'_get_access_id',
mock.Mock(return_value={}))
self.mock_object(self.driver.plugin.helper,
'_get_share_by_name',
mock.Mock(return_value={}))
self.driver.plugin.helper.login()
self.driver.plugin.helper.snapshot_flag = True
self.assertRaises(exception.InvalidShareAccess,
self.driver.create_share_from_snapshot,
self._context, self.share_nfs,
self.nfs_snapshot, self.share_server)
self.assertTrue(db.share_type_get.called)
self.assertTrue(self.driver.plugin._get_share_proto.called)
self.assertTrue(self.driver.plugin._get_access_id.called)
self.assertTrue(self.driver.plugin.helper._get_share_by_name.called)
def test_create_share_from_snapshot_mountshare_fail(self):
share_type = self.fake_type_not_extra['test_with_extra']
self.mock_object(db,
'share_type_get',
mock.Mock(return_value=share_type))
self.mock_object(self.driver.plugin,
'mount_share_to_host',
mock.Mock(side_effect=exception.
ShareMountException('err')))
self.driver.plugin.helper.login()
self.driver.plugin.helper.snapshot_flag = True
self.assertRaises(exception.ShareMountException,
self.driver.create_share_from_snapshot,
self._context, self.share_nfs,
self.nfs_snapshot, self.share_server)
self.assertTrue(db.share_type_get.called)
self.assertEqual(1, self.driver.plugin.
mount_share_to_host.call_count)
def test_create_share_from_snapshot_allow_newaccess_fail(self):
share_type = self.fake_type_not_extra['test_with_extra']
self.mock_object(db,
'share_type_get',
mock.Mock(return_value=share_type))
self.mock_object(self.driver.plugin,
'_get_share_proto',
mock.Mock(return_value='NFS'))
self.mock_object(self.driver.plugin,
'_get_access_id',
mock.Mock(return_value='5'))
self.mock_object(self.driver.plugin,
'mount_share_to_host',
mock.Mock(return_value={}))
self.mock_object(self.driver.plugin.helper,
'_get_share_by_name',
mock.Mock(return_value={}))
self.mock_object(self.driver.plugin,
'umount_share_from_host',
mock.Mock(return_value={}))
self.driver.plugin.helper.login()
self.driver.plugin.helper.snapshot_flag = True
self.assertRaises(exception.InvalidShareAccess,
self.driver.create_share_from_snapshot,
self._context, self.share_nfs,
self.nfs_snapshot, self.share_server)
self.assertTrue(db.share_type_get.called)
self.assertTrue(self.driver.plugin._get_share_proto.called)
self.assertTrue(self.driver.plugin._get_access_id.called)
self.assertEqual(1, self.driver.plugin.
mount_share_to_host.call_count)
self.assertTrue(self.driver.plugin.helper.
_get_share_by_name.called)
self.assertEqual(1, self.driver.plugin.
umount_share_from_host.call_count)
def test_create_nfsshare_from_nfssnapshot_copydata_fail(self):
share_type = self.fake_type_not_extra['test_with_extra']
self.mock_object(db,
'share_type_get',
mock.Mock(return_value=share_type))
self.mock_object(self.driver.plugin,
'mount_share_to_host',
mock.Mock(return_value={}))
self.mock_object(share_utils,
'Copy',
mock.Mock(side_effect=Exception('err')))
self.mock_object(utils,
'execute',
mock.Mock(return_value={}))
self.driver.plugin.helper.login()
self.driver.plugin.helper.snapshot_flag = True
self.assertRaises(exception.ShareCopyDataException,
self.driver.create_share_from_snapshot,
self._context, self.share_nfs,
self.nfs_snapshot, self.share_server)
self.assertTrue(db.share_type_get.called)
self.assertEqual(2, self.driver.plugin.
mount_share_to_host.call_count)
self.assertTrue(share_utils.Copy.called)
self.assertEqual(2, utils.execute.call_count)
def test_create_nfsshare_from_nfssnapshot_umountshare_fail(self):
share_type = self.fake_type_not_extra['test_with_extra']
self.mock_object(db,
'share_type_get',
mock.Mock(return_value=share_type))
self.mock_object(self.driver.plugin,
'mount_share_to_host',
mock.Mock(return_value={}))
self.mock_object(self.driver.plugin,
'copy_snapshot_data',
mock.Mock(return_value=True))
self.mock_object(self.driver.plugin,
'umount_share_from_host',
mock.Mock(side_effect=exception.
ShareUmountException('err')))
self.mock_object(os, 'rmdir',
mock.Mock(side_effect=Exception('err')))
self.driver.plugin.helper.login()
self.driver.plugin.helper.snapshot_flag = True
location = self.driver.create_share_from_snapshot(self._context,
self.share_nfs,
self.cifs_snapshot,
self.share_server)
self.assertTrue(db.share_type_get.called)
self.assertEqual(2, self.driver.plugin.
mount_share_to_host.call_count)
self.assertTrue(self.driver.plugin.copy_snapshot_data.called)
self.assertEqual(2, self.driver.plugin.
umount_share_from_host.call_count)
self.assertTrue(os.rmdir.called)
self.assertEqual("100.115.10.68:/share_fake_uuid", location)
def test_get_share_stats_refresh_pool_not_exist(self):
self.driver.plugin.helper.login()
@ -1693,7 +2008,7 @@ class HuaweiShareDriverTestCase(test.TestCase):
expected['total_capacity_gb'] = 0.0
expected['free_capacity_gb'] = 0.0
expected['qos'] = True
expected["snapshot_support"] = False
expected["snapshot_support"] = True
expected["pools"] = []
pool = dict(
pool_name='OpenStack_Pool',
@ -3018,6 +3333,26 @@ class HuaweiShareDriverTestCase(test.TestCase):
alloctype_text = doc.createTextNode(alloctype_value)
alloctype.appendChild(alloctype_text)
NFSClient = doc.createElement('NFSClient')
virtualip = doc.createElement('IP')
virtualip_text = doc.createTextNode('100.112.0.1')
virtualip.appendChild(virtualip_text)
NFSClient.appendChild(virtualip)
CIFSClient = doc.createElement('CIFSClient')
username = doc.createElement('UserName')
username_text = doc.createTextNode('user_name')
username.appendChild(username_text)
CIFSClient.appendChild(username)
userpassword = doc.createElement('UserPassword')
userpassword_text = doc.createTextNode('user_password')
userpassword.appendChild(userpassword_text)
CIFSClient.appendChild(userpassword)
lun.appendChild(NFSClient)
lun.appendChild(CIFSClient)
lun.appendChild(timeout)
lun.appendChild(alloctype)
lun.appendChild(waitinterval)