Shared filesystem management project for OpenStack.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 

5814 lines
261 KiB

# Copyright (c) 2015 Clinton Knight. All rights reserved.
# Copyright (c) 2015 Tom Barron. All rights reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Unit tests for the NetApp Data ONTAP cDOT base storage driver library.
"""
import copy
import json
import math
import socket
import time
from unittest import mock
import ddt
from oslo_log import log
from oslo_service import loopingcall
from oslo_utils import timeutils
from oslo_utils import units
from oslo_utils import uuidutils
from manila.common import constants
from manila import exception
from manila.share.drivers.netapp.dataontap.client import api as netapp_api
from manila.share.drivers.netapp.dataontap.client import client_cmode
from manila.share.drivers.netapp.dataontap.cluster_mode import data_motion
from manila.share.drivers.netapp.dataontap.cluster_mode import lib_base
from manila.share.drivers.netapp.dataontap.cluster_mode import performance
from manila.share.drivers.netapp.dataontap.protocols import cifs_cmode
from manila.share.drivers.netapp.dataontap.protocols import nfs_cmode
from manila.share.drivers.netapp import utils as na_utils
from manila.share import share_types
from manila.share import utils as share_utils
from manila import test
from manila.tests import fake_share
from manila.tests.share.drivers.netapp.dataontap import fakes as fake
from manila.tests import utils
def fake_replica(**kwargs):
return fake_share.fake_replica(for_manager=True, **kwargs)
@ddt.ddt
class NetAppFileStorageLibraryTestCase(test.TestCase):
def setUp(self):
super(NetAppFileStorageLibraryTestCase, self).setUp()
self.mock_object(na_utils, 'validate_driver_instantiation')
self.mock_object(na_utils, 'setup_tracing')
# Mock loggers as themselves to allow logger arg validation
mock_logger = log.getLogger('mock_logger')
self.mock_object(lib_base.LOG,
'info',
mock.Mock(side_effect=mock_logger.info))
self.mock_object(lib_base.LOG,
'warning',
mock.Mock(side_effect=mock_logger.warning))
self.mock_object(lib_base.LOG,
'error',
mock.Mock(side_effect=mock_logger.error))
self.mock_object(lib_base.LOG,
'debug',
mock.Mock(side_effect=mock_logger.debug))
kwargs = {
'configuration': fake.get_config_cmode(),
'private_storage': mock.Mock(),
'app_version': fake.APP_VERSION
}
self.library = lib_base.NetAppCmodeFileStorageLibrary(fake.DRIVER_NAME,
**kwargs)
self.library._client = mock.Mock()
self.library._perf_library = mock.Mock()
self.client = self.library._client
self.context = mock.Mock()
self.fake_replica = copy.deepcopy(fake.SHARE)
self.fake_replica_2 = copy.deepcopy(fake.SHARE)
self.fake_replica_2['id'] = fake.SHARE_ID2
self.fake_replica_2['replica_state'] = (
constants.REPLICA_STATE_OUT_OF_SYNC)
self.mock_dm_session = mock.Mock()
self.mock_object(data_motion, "DataMotionSession",
mock.Mock(return_value=self.mock_dm_session))
self.mock_object(data_motion, 'get_client_for_backend')
def _mock_api_error(self, code='fake', message='fake'):
return mock.Mock(side_effect=netapp_api.NaApiError(code=code,
message=message))
def test_init(self):
self.assertEqual(fake.DRIVER_NAME, self.library.driver_name)
self.assertEqual(1, na_utils.validate_driver_instantiation.call_count)
self.assertEqual(1, na_utils.setup_tracing.call_count)
self.assertListEqual([], self.library._licenses)
self.assertDictEqual({}, self.library._clients)
self.assertDictEqual({}, self.library._ssc_stats)
self.assertIsNotNone(self.library._app_version)
def test_do_setup(self):
mock_get_api_client = self.mock_object(self.library, '_get_api_client')
self.mock_object(
performance, 'PerformanceLibrary',
mock.Mock(return_value='fake_perf_library'))
self.mock_object(
self.library._client, 'check_for_cluster_credentials',
mock.Mock(return_value=True))
self.mock_object(
self.library, '_check_snaprestore_license',
mock.Mock(return_value=True))
self.mock_object(
self.library,
'_get_licenses',
mock.Mock(return_value=fake.LICENSES))
self.library.do_setup(self.context)
self.assertEqual(fake.LICENSES, self.library._licenses)
mock_get_api_client.assert_called_once_with()
(self.library._client.check_for_cluster_credentials.
assert_called_once_with())
self.assertEqual('fake_perf_library', self.library._perf_library)
self.mock_object(self.library._client,
'check_for_cluster_credentials',
mock.Mock(return_value=True))
self.mock_object(
self.library, '_check_snaprestore_license',
mock.Mock(return_value=True))
mock_set_cluster_info = self.mock_object(
self.library, '_set_cluster_info')
self.library.do_setup(self.context)
mock_set_cluster_info.assert_called_once()
def test_set_cluster_info(self):
self.library._set_cluster_info()
self.assertTrue(self.library._cluster_info['nve_support'],
fake.CLUSTER_NODES)
def test_check_for_setup_error(self):
mock_start_periodic_tasks = self.mock_object(self.library,
'_start_periodic_tasks')
self.library.check_for_setup_error()
mock_start_periodic_tasks.assert_called_once_with()
def test_get_vserver(self):
self.assertRaises(NotImplementedError, self.library._get_vserver)
def test_get_api_client(self):
client_kwargs = fake.CLIENT_KWARGS.copy()
# First call should proceed normally.
mock_client_constructor = self.mock_object(client_cmode,
'NetAppCmodeClient')
client1 = self.library._get_api_client()
self.assertIsNotNone(client1)
mock_client_constructor.assert_called_once_with(**client_kwargs)
# Second call should yield the same object.
mock_client_constructor = self.mock_object(client_cmode,
'NetAppCmodeClient')
client2 = self.library._get_api_client()
self.assertEqual(client1, client2)
self.assertFalse(mock_client_constructor.called)
def test_get_api_client_with_vserver(self):
client_kwargs = fake.CLIENT_KWARGS.copy()
client_kwargs['vserver'] = fake.VSERVER1
# First call should proceed normally.
mock_client_constructor = self.mock_object(client_cmode,
'NetAppCmodeClient')
client1 = self.library._get_api_client(vserver=fake.VSERVER1)
self.assertIsNotNone(client1)
mock_client_constructor.assert_called_once_with(**client_kwargs)
# Second call should yield the same object.
mock_client_constructor = self.mock_object(client_cmode,
'NetAppCmodeClient')
client2 = self.library._get_api_client(vserver=fake.VSERVER1)
self.assertEqual(client1, client2)
self.assertFalse(mock_client_constructor.called)
# A different vserver should work normally without caching.
mock_client_constructor = self.mock_object(client_cmode,
'NetAppCmodeClient')
client3 = self.library._get_api_client(vserver=fake.VSERVER2)
self.assertNotEqual(client1, client3)
client_kwargs['vserver'] = fake.VSERVER2
mock_client_constructor.assert_called_once_with(**client_kwargs)
def test_get_licenses_both_protocols(self):
self.library._have_cluster_creds = True
self.mock_object(self.client,
'get_licenses',
mock.Mock(return_value=fake.LICENSES))
result = self.library._get_licenses()
self.assertSequenceEqual(fake.LICENSES, result)
self.assertEqual(0, lib_base.LOG.error.call_count)
self.assertEqual(1, lib_base.LOG.info.call_count)
def test_get_licenses_one_protocol(self):
self.library._have_cluster_creds = True
licenses = list(fake.LICENSES)
licenses.remove('nfs')
self.mock_object(self.client,
'get_licenses',
mock.Mock(return_value=licenses))
result = self.library._get_licenses()
self.assertListEqual(licenses, result)
self.assertEqual(0, lib_base.LOG.error.call_count)
self.assertEqual(1, lib_base.LOG.info.call_count)
def test_get_licenses_no_protocols(self):
self.library._have_cluster_creds = True
licenses = list(fake.LICENSES)
licenses.remove('nfs')
licenses.remove('cifs')
self.mock_object(self.client,
'get_licenses',
mock.Mock(return_value=licenses))
result = self.library._get_licenses()
self.assertListEqual(licenses, result)
self.assertEqual(1, lib_base.LOG.error.call_count)
self.assertEqual(1, lib_base.LOG.info.call_count)
def test_get_licenses_no_cluster_creds(self):
self.library._have_cluster_creds = False
result = self.library._get_licenses()
self.assertListEqual([], result)
self.assertEqual(1, lib_base.LOG.debug.call_count)
def test_start_periodic_tasks(self):
mock_update_ssc_info = self.mock_object(self.library,
'_update_ssc_info')
mock_handle_ems_logging = self.mock_object(self.library,
'_handle_ems_logging')
mock_handle_housekeeping_tasks = self.mock_object(
self.library, '_handle_housekeeping_tasks')
mock_ssc_periodic_task = mock.Mock()
mock_ems_periodic_task = mock.Mock()
mock_housekeeping_periodic_task = mock.Mock()
mock_loopingcall = self.mock_object(
loopingcall,
'FixedIntervalLoopingCall',
mock.Mock(side_effect=[mock_ssc_periodic_task,
mock_ems_periodic_task,
mock_housekeeping_periodic_task]))
self.library._start_periodic_tasks()
self.assertTrue(mock_update_ssc_info.called)
self.assertFalse(mock_handle_ems_logging.called)
self.assertFalse(mock_housekeeping_periodic_task.called)
mock_loopingcall.assert_has_calls(
[mock.call(mock_update_ssc_info),
mock.call(mock_handle_ems_logging),
mock.call(mock_handle_housekeeping_tasks)])
self.assertTrue(mock_ssc_periodic_task.start.called)
self.assertTrue(mock_ems_periodic_task.start.called)
self.assertTrue(mock_housekeeping_periodic_task.start.called)
def test_get_backend_share_name(self):
result = self.library._get_backend_share_name(fake.SHARE_ID)
expected = (fake.VOLUME_NAME_TEMPLATE %
{'share_id': fake.SHARE_ID.replace('-', '_')})
self.assertEqual(expected, result)
def test_get_backend_snapshot_name(self):
result = self.library._get_backend_snapshot_name(fake.SNAPSHOT_ID)
expected = 'share_snapshot_' + fake.SNAPSHOT_ID.replace('-', '_')
self.assertEqual(expected, result)
def test_get_backend_cg_snapshot_name(self):
result = self.library._get_backend_cg_snapshot_name(fake.SNAPSHOT_ID)
expected = 'share_cg_snapshot_' + fake.SNAPSHOT_ID.replace('-', '_')
self.assertEqual(expected, result)
def test_get_aggregate_space_cluster_creds(self):
self.library._have_cluster_creds = True
self.mock_object(self.library,
'_find_matching_aggregates',
mock.Mock(return_value=fake.AGGREGATES))
self.mock_object(self.library._client,
'get_cluster_aggregate_capacities',
mock.Mock(return_value=fake.AGGREGATE_CAPACITIES))
result = self.library._get_aggregate_space()
(self.library._client.get_cluster_aggregate_capacities.
assert_called_once_with(fake.AGGREGATES))
self.assertDictEqual(fake.AGGREGATE_CAPACITIES, result)
def test_get_aggregate_space_no_cluster_creds(self):
self.library._have_cluster_creds = False
self.mock_object(self.library,
'_find_matching_aggregates',
mock.Mock(return_value=fake.AGGREGATES))
self.mock_object(self.library._client,
'get_vserver_aggregate_capacities',
mock.Mock(return_value=fake.AGGREGATE_CAPACITIES))
result = self.library._get_aggregate_space()
(self.library._client.get_vserver_aggregate_capacities.
assert_called_once_with(fake.AGGREGATES))
self.assertDictEqual(fake.AGGREGATE_CAPACITIES, result)
def test_check_snaprestore_license_admin_notfound(self):
self.library._have_cluster_creds = True
licenses = list(fake.LICENSES)
licenses.remove('snaprestore')
self.mock_object(self.client,
'get_licenses',
mock.Mock(return_value=licenses))
result = self.library._check_snaprestore_license()
self.assertIs(False, result)
def test_check_snaprestore_license_admin_found(self):
self.library._have_cluster_creds = True
self.library._licenses = fake.LICENSES
result = self.library._check_snaprestore_license()
self.assertIs(True, result)
def test_check_snaprestore_license_svm_scoped_notfound(self):
self.library._have_cluster_creds = False
self.mock_object(self.library._client,
'restore_snapshot',
mock.Mock(side_effect=netapp_api.NaApiError(
code=netapp_api.EAPIERROR,
message=fake.NO_SNAPRESTORE_LICENSE)))
result = self.library._check_snaprestore_license()
self.assertIs(False, result)
def test_check_snaprestore_license_svm_scoped_found(self):
self.library._have_cluster_creds = False
self.mock_object(self.library._client,
'restore_snapshot',
mock.Mock(side_effect=netapp_api.NaApiError(
code=netapp_api.EAPIERROR,
message='Other error')))
result = self.library._check_snaprestore_license()
self.assertIs(True, result)
def test_check_snaprestore_license_svm_scoped_found_exception(self):
self.mock_object(lib_base.LOG, 'exception')
self.library._have_cluster_creds = False
self.mock_object(self.library._client,
'restore_snapshot',
mock.Mock(return_value=None))
self.assertRaises(
exception.NetAppException,
self.library._check_snaprestore_license)
lib_base.LOG.exception.assert_called_once()
def test_get_aggregate_node_cluster_creds(self):
self.library._have_cluster_creds = True
self.mock_object(self.library._client,
'get_node_for_aggregate',
mock.Mock(return_value=fake.CLUSTER_NODE))
result = self.library._get_aggregate_node(fake.AGGREGATE)
(self.library._client.get_node_for_aggregate.
assert_called_once_with(fake.AGGREGATE))
self.assertEqual(fake.CLUSTER_NODE, result)
def test_get_aggregate_node_no_cluster_creds(self):
self.library._have_cluster_creds = False
self.mock_object(self.library._client, 'get_node_for_aggregate')
result = self.library._get_aggregate_node(fake.AGGREGATE)
self.assertFalse(self.library._client.get_node_for_aggregate.called)
self.assertIsNone(result)
def test_get_default_filter_function(self):
result = self.library.get_default_filter_function()
self.assertEqual(self.library.DEFAULT_FILTER_FUNCTION, result)
def test_get_default_goodness_function(self):
result = self.library.get_default_goodness_function()
self.assertEqual(self.library.DEFAULT_GOODNESS_FUNCTION, result)
def test_get_share_stats(self):
mock_get_pools = self.mock_object(
self.library, '_get_pools',
mock.Mock(return_value=fake.POOLS))
result = self.library.get_share_stats(filter_function='filter',
goodness_function='goodness')
expected = {
'share_backend_name': fake.BACKEND_NAME,
'driver_name': fake.DRIVER_NAME,
'vendor_name': 'NetApp',
'driver_version': '1.0',
'netapp_storage_family': 'ontap_cluster',
'storage_protocol': 'NFS_CIFS',
'pools': fake.POOLS,
'share_group_stats': {'consistent_snapshot_support': 'host'},
}
self.assertDictEqual(expected, result)
mock_get_pools.assert_called_once_with(filter_function='filter',
goodness_function='goodness')
def test_get_share_stats_with_replication(self):
self.library.configuration.replication_domain = "fake_domain"
mock_get_pools = self.mock_object(
self.library, '_get_pools',
mock.Mock(return_value=fake.POOLS))
result = self.library.get_share_stats(filter_function='filter',
goodness_function='goodness')
expected = {
'share_backend_name': fake.BACKEND_NAME,
'driver_name': fake.DRIVER_NAME,
'vendor_name': 'NetApp',
'driver_version': '1.0',
'netapp_storage_family': 'ontap_cluster',
'storage_protocol': 'NFS_CIFS',
'replication_type': 'dr',
'replication_domain': 'fake_domain',
'pools': fake.POOLS,
'share_group_stats': {'consistent_snapshot_support': 'host'},
}
self.assertDictEqual(expected, result)
mock_get_pools.assert_called_once_with(filter_function='filter',
goodness_function='goodness')
def test_get_share_server_pools(self):
self.mock_object(self.library,
'_get_pools',
mock.Mock(return_value=fake.POOLS))
result = self.library.get_share_server_pools(fake.SHARE_SERVER)
self.assertListEqual(fake.POOLS, result)
def test_get_pools(self):
self.mock_object(
self.library, '_get_aggregate_space',
mock.Mock(return_value=fake.AGGREGATE_CAPACITIES))
self.library._have_cluster_creds = True
self.library._revert_to_snapshot_support = True
self.library._cluster_info = fake.CLUSTER_INFO
self.library._ssc_stats = fake.SSC_INFO
self.library._perf_library.get_node_utilization_for_pool = (
mock.Mock(side_effect=[30.0, 42.0]))
result = self.library._get_pools(filter_function='filter',
goodness_function='goodness')
self.assertListEqual(fake.POOLS, result)
def test_get_pools_vserver_creds(self):
self.mock_object(
self.library, '_get_aggregate_space',
mock.Mock(return_value=fake.AGGREGATE_CAPACITIES_VSERVER_CREDS))
self.library._have_cluster_creds = False
self.library._revert_to_snapshot_support = True
self.library._cluster_info = fake.CLUSTER_INFO
self.library._ssc_stats = fake.SSC_INFO_VSERVER_CREDS
self.library._perf_library.get_node_utilization_for_pool = (
mock.Mock(side_effect=[50.0, 50.0]))
result = self.library._get_pools()
self.assertListEqual(fake.POOLS_VSERVER_CREDS, result)
def test_handle_ems_logging(self):
self.mock_object(self.library,
'_build_ems_log_message_0',
mock.Mock(return_value=fake.EMS_MESSAGE_0))
self.mock_object(self.library,
'_build_ems_log_message_1',
mock.Mock(return_value=fake.EMS_MESSAGE_1))
self.library._handle_ems_logging()
self.library._client.send_ems_log_message.assert_has_calls([
mock.call(fake.EMS_MESSAGE_0),
mock.call(fake.EMS_MESSAGE_1),
])
def test_build_ems_log_message_0(self):
self.mock_object(socket,
'gethostname',
mock.Mock(return_value=fake.HOST_NAME))
result = self.library._build_ems_log_message_0()
self.assertDictEqual(fake.EMS_MESSAGE_0, result)
def test_build_ems_log_message_1(self):
pool_info = {
'pools': {
'vserver': 'fake_vserver',
'aggregates': ['aggr1', 'aggr2'],
},
}
self.mock_object(socket,
'gethostname',
mock.Mock(return_value=fake.HOST_NAME))
self.mock_object(self.library,
'_get_ems_pool_info',
mock.Mock(return_value=pool_info))
result = self.library._build_ems_log_message_1()
self.assertDictEqual(pool_info,
json.loads(result['event-description']))
result['event-description'] = ''
self.assertDictEqual(fake.EMS_MESSAGE_1, result)
def test_get_ems_pool_info(self):
self.assertRaises(NotImplementedError,
self.library._get_ems_pool_info)
def test_find_matching_aggregates(self):
self.assertRaises(NotImplementedError,
self.library._find_matching_aggregates)
@ddt.data(('NFS', nfs_cmode.NetAppCmodeNFSHelper),
('nfs', nfs_cmode.NetAppCmodeNFSHelper),
('CIFS', cifs_cmode.NetAppCmodeCIFSHelper),
('cifs', cifs_cmode.NetAppCmodeCIFSHelper))
@ddt.unpack
def test_get_helper(self, protocol, helper_type):
fake_share = fake.SHARE.copy()
fake_share['share_proto'] = protocol
mock_check_license_for_protocol = self.mock_object(
self.library, '_check_license_for_protocol')
result = self.library._get_helper(fake_share)
mock_check_license_for_protocol.assert_called_once_with(
protocol.lower())
self.assertEqual(helper_type, type(result))
def test_get_helper_invalid_protocol(self):
fake_share = fake.SHARE.copy()
fake_share['share_proto'] = 'iSCSI'
self.mock_object(self.library, '_check_license_for_protocol')
self.assertRaises(exception.NetAppException,
self.library._get_helper,
fake_share)
def test_check_license_for_protocol_no_cluster_creds(self):
self.library._have_cluster_creds = False
result = self.library._check_license_for_protocol('fake_protocol')
self.assertIsNone(result)
def test_check_license_for_protocol_have_license(self):
self.library._have_cluster_creds = True
self.library._licenses = ['base', 'fake_protocol']
result = self.library._check_license_for_protocol('FAKE_PROTOCOL')
self.assertIsNone(result)
def test_check_license_for_protocol_newly_licensed_protocol(self):
self.library._have_cluster_creds = True
self.mock_object(self.library,
'_get_licenses',
mock.Mock(return_value=['base', 'nfs']))
self.library._licenses = ['base']
result = self.library._check_license_for_protocol('NFS')
self.assertIsNone(result)
self.assertTrue(self.library._get_licenses.called)
def test_check_license_for_protocol_unlicensed_protocol(self):
self.library._have_cluster_creds = True
self.mock_object(self.library,
'_get_licenses',
mock.Mock(return_value=['base']))
self.library._licenses = ['base']
self.assertRaises(exception.NetAppException,
self.library._check_license_for_protocol,
'NFS')
def test_get_pool_has_pool(self):
result = self.library.get_pool(fake.SHARE)
self.assertEqual(fake.POOL_NAME, result)
self.assertFalse(self.client.get_aggregate_for_volume.called)
def test_get_pool_no_pool(self):
fake_share = copy.deepcopy(fake.SHARE)
fake_share['host'] = '%(host)s@%(backend)s' % {
'host': fake.HOST_NAME, 'backend': fake.BACKEND_NAME}
self.client.get_aggregate_for_volume.return_value = fake.POOL_NAME
result = self.library.get_pool(fake_share)
self.assertEqual(fake.POOL_NAME, result)
self.assertTrue(self.client.get_aggregate_for_volume.called)
def test_get_pool_raises(self):
fake_share = copy.deepcopy(fake.SHARE)
fake_share['host'] = '%(host)s@%(backend)s' % {
'host': fake.HOST_NAME, 'backend': fake.BACKEND_NAME}
self.client.get_aggregate_for_volume.side_effect = (
exception.NetAppException)
self.assertRaises(exception.NetAppException,
self.library.get_pool,
fake_share)
def test_create_share(self):
vserver_client = mock.Mock()
self.mock_object(self.library,
'_get_vserver',
mock.Mock(return_value=(fake.VSERVER1,
vserver_client)))
mock_allocate_container = self.mock_object(self.library,
'_allocate_container')
mock_create_export = self.mock_object(
self.library,
'_create_export',
mock.Mock(return_value='fake_export_location'))
result = self.library.create_share(self.context,
fake.SHARE,
share_server=fake.SHARE_SERVER)
mock_allocate_container.assert_called_once_with(fake.SHARE,
fake.VSERVER1,
vserver_client)
mock_create_export.assert_called_once_with(fake.SHARE,
fake.SHARE_SERVER,
fake.VSERVER1,
vserver_client)
self.assertEqual('fake_export_location', result)
def test_create_share_from_snapshot(self):
vserver_client = mock.Mock()
self.mock_object(self.library,
'_get_vserver',
mock.Mock(return_value=(fake.VSERVER1,
vserver_client)))
mock_allocate_container_from_snapshot = self.mock_object(
self.library,
'_allocate_container_from_snapshot')
mock_create_export = self.mock_object(
self.library,
'_create_export',
mock.Mock(return_value='fake_export_location'))
result = self.library.create_share_from_snapshot(
self.context,
fake.SHARE,
fake.SNAPSHOT,
share_server=fake.SHARE_SERVER,
parent_share=fake.SHARE)
mock_allocate_container_from_snapshot.assert_called_once_with(
fake.SHARE,
fake.SNAPSHOT,
fake.VSERVER1,
vserver_client)
mock_create_export.assert_called_once_with(fake.SHARE,
fake.SHARE_SERVER,
fake.VSERVER1,
vserver_client)
self.assertEqual('fake_export_location', result)
def _setup_mocks_for_create_share_from_snapshot(
self, allocate_attr=None, dest_cluster=fake.CLUSTER_NAME):
class FakeDBObj(dict):
def to_dict(self):
return self
if allocate_attr is None:
allocate_attr = mock.Mock()
self.src_vserver_client = mock.Mock()
self.mock_dm_session = mock.Mock()
self.fake_share = FakeDBObj(fake.SHARE)
self.fake_share_server = FakeDBObj(fake.SHARE_SERVER)
self.mock_dm_constr = self.mock_object(
data_motion, "DataMotionSession",
mock.Mock(return_value=self.mock_dm_session))
self.mock_dm_backend = self.mock_object(
self.mock_dm_session, 'get_backend_info_for_share',
mock.Mock(return_value=(None,
fake.VSERVER1, fake.BACKEND_NAME)))
self.mock_dm_get_src_client = self.mock_object(
data_motion, 'get_client_for_backend',
mock.Mock(return_value=self.src_vserver_client))
self.mock_get_src_cluster = self.mock_object(
self.src_vserver_client, 'get_cluster_name',
mock.Mock(return_value=fake.CLUSTER_NAME))
self.dest_vserver_client = mock.Mock()
self.mock_get_vserver = self.mock_object(
self.library, '_get_vserver',
mock.Mock(return_value=(fake.VSERVER2, self.dest_vserver_client)))
self.mock_get_dest_cluster = self.mock_object(
self.dest_vserver_client, 'get_cluster_name',
mock.Mock(return_value=dest_cluster))
self.mock_allocate_container_from_snapshot = self.mock_object(
self.library, '_allocate_container_from_snapshot', allocate_attr)
self.mock_allocate_container = self.mock_object(
self.library, '_allocate_container')
self.mock_dm_create_snapmirror = self.mock_object(
self.mock_dm_session, 'create_snapmirror')
self.mock_storage_update = self.mock_object(
self.library.private_storage, 'update')
self.mock_object(self.library, '_have_cluster_creds',
mock.Mock(return_value=True))
# Parent share on MANILA_HOST_2
self.parent_share = copy.copy(fake.SHARE)
self.parent_share['share_server'] = fake.SHARE_SERVER_2
self.parent_share['host'] = fake.MANILA_HOST_NAME_2
self.parent_share_server = {}
ss_keys = ['id', 'identifier', 'backend_details', 'host']
for key in ss_keys:
self.parent_share_server[key] = (
self.parent_share['share_server'].get(key, None))
self.temp_src_share = {
'id': self.fake_share['id'],
'host': self.parent_share['host'],
'share_server': self.parent_share_server or None
}
@ddt.data(fake.CLUSTER_NAME, fake.CLUSTER_NAME_2)
def test_create_share_from_snapshot_another_host(self, dest_cluster):
self._setup_mocks_for_create_share_from_snapshot(
dest_cluster=dest_cluster)
result = self.library.create_share_from_snapshot(
self.context,
self.fake_share,
fake.SNAPSHOT,
share_server=self.fake_share_server,
parent_share=self.parent_share)
self.fake_share['share_server'] = self.fake_share_server
self.mock_dm_constr.assert_called_once()
self.mock_dm_backend.assert_called_once_with(self.parent_share)
self.mock_dm_get_src_client.assert_called_once_with(
fake.BACKEND_NAME, vserver_name=fake.VSERVER1)
self.mock_get_src_cluster.assert_called_once()
self.mock_get_vserver.assert_called_once_with(self.fake_share_server)
self.mock_get_dest_cluster.assert_called_once()
if dest_cluster != fake.CLUSTER_NAME:
self.mock_allocate_container_from_snapshot.assert_called_once_with(
self.fake_share, fake.SNAPSHOT, fake.VSERVER1,
self.src_vserver_client, split=False)
self.mock_allocate_container.assert_called_once_with(
self.fake_share, fake.VSERVER2,
self.dest_vserver_client, replica=True)
self.mock_dm_create_snapmirror.assert_called_once()
self.temp_src_share['replica_state'] = (
constants.REPLICA_STATE_ACTIVE)
state = self.library.STATE_SNAPMIRROR_DATA_COPYING
else:
self.mock_allocate_container_from_snapshot.assert_called_once_with(
self.fake_share, fake.SNAPSHOT, fake.VSERVER1,
self.src_vserver_client, split=True)
state = self.library.STATE_SPLITTING_VOLUME_CLONE
self.temp_src_share['internal_state'] = state
self.temp_src_share['status'] = constants.STATUS_ACTIVE
str_temp_src_share = json.dumps(self.temp_src_share)
self.mock_storage_update.assert_called_once_with(
self.fake_share['id'], {
'source_share': str_temp_src_share
})
expected_return = {'status': constants.STATUS_CREATING_FROM_SNAPSHOT}
self.assertEqual(expected_return, result)
def test_create_share_from_snapshot_another_host_driver_error(self):
self._setup_mocks_for_create_share_from_snapshot(
allocate_attr=mock.Mock(side_effect=exception.NetAppException))
mock_delete_snapmirror = self.mock_object(
self.mock_dm_session, 'delete_snapmirror')
mock_get_backend_shr_name = self.mock_object(
self.library, '_get_backend_share_name',
mock.Mock(return_value=fake.SHARE_NAME))
mock_share_exits = self.mock_object(
self.library, '_share_exists',
mock.Mock(return_value=True))
mock_deallocate_container = self.mock_object(
self.library, '_deallocate_container')
self.assertRaises(exception.NetAppException,
self.library.create_share_from_snapshot,
self.context,
self.fake_share,
fake.SNAPSHOT,
share_server=self.fake_share_server,
parent_share=self.parent_share)
self.fake_share['share_server'] = self.fake_share_server
self.mock_dm_constr.assert_called_once()
self.mock_dm_backend.assert_called_once_with(self.parent_share)
self.mock_dm_get_src_client.assert_called_once_with(
fake.BACKEND_NAME, vserver_name=fake.VSERVER1)
self.mock_get_src_cluster.assert_called_once()
self.mock_get_vserver.assert_called_once_with(self.fake_share_server)
self.mock_get_dest_cluster.assert_called_once()
self.mock_allocate_container_from_snapshot.assert_called_once_with(
self.fake_share, fake.SNAPSHOT, fake.VSERVER1,
self.src_vserver_client, split=True)
mock_delete_snapmirror.assert_called_once_with(self.temp_src_share,
self.fake_share)
mock_get_backend_shr_name.assert_called_once_with(
self.fake_share['id'])
mock_share_exits.assert_called_once_with(fake.SHARE_NAME,
self.src_vserver_client)
mock_deallocate_container.assert_called_once_with(
fake.SHARE_NAME, self.src_vserver_client)
def test__update_create_from_snapshot_status(self):
fake_result = mock.Mock()
mock_pvt_storage_get = self.mock_object(
self.library.private_storage, 'get',
mock.Mock(return_value=fake.SHARE))
mock__create_continue = self.mock_object(
self.library, '_create_from_snapshot_continue',
mock.Mock(return_value=fake_result))
result = self.library._update_create_from_snapshot_status(
fake.SHARE, fake.SHARE_SERVER)
mock_pvt_storage_get.assert_called_once_with(fake.SHARE['id'],
'source_share')
mock__create_continue.assert_called_once_with(fake.SHARE,
fake.SHARE_SERVER)
self.assertEqual(fake_result, result)
def test__update_create_from_snapshot_status_missing_source_share(self):
mock_pvt_storage_get = self.mock_object(
self.library.private_storage, 'get',
mock.Mock(return_value=None))
expected_result = {'status': constants.STATUS_ERROR}
result = self.library._update_create_from_snapshot_status(
fake.SHARE, fake.SHARE_SERVER)
mock_pvt_storage_get.assert_called_once_with(fake.SHARE['id'],
'source_share')
self.assertEqual(expected_result, result)
def test__update_create_from_snapshot_status_driver_error(self):
fake_src_share = {
'id': fake.SHARE['id'],
'host': fake.SHARE['host'],
'internal_state': 'fake_internal_state',
}
copy_fake_src_share = copy.deepcopy(fake_src_share)
src_vserver_client = mock.Mock()
mock_dm_session = mock.Mock()
mock_pvt_storage_get = self.mock_object(
self.library.private_storage, 'get',
mock.Mock(return_value=json.dumps(copy_fake_src_share)))
mock__create_continue = self.mock_object(
self.library, '_create_from_snapshot_continue',
mock.Mock(side_effect=exception.NetAppException))
mock_dm_constr = self.mock_object(
data_motion, "DataMotionSession",
mock.Mock(return_value=mock_dm_session))
mock_delete_snapmirror = self.mock_object(
mock_dm_session, 'delete_snapmirror')
mock_dm_backend = self.mock_object(
mock_dm_session, 'get_backend_info_for_share',
mock.Mock(return_value=(None,
fake.VSERVER1, fake.BACKEND_NAME)))
mock_dm_get_src_client = self.mock_object(
data_motion, 'get_client_for_backend',
mock.Mock(return_value=src_vserver_client))
mock_get_backend_shr_name = self.mock_object(
self.library, '_get_backend_share_name',
mock.Mock(return_value=fake.SHARE_NAME))
mock_share_exits = self.mock_object(
self.library, '_share_exists',
mock.Mock(return_value=True))
mock_deallocate_container = self.mock_object(
self.library, '_deallocate_container')
mock_pvt_storage_delete = self.mock_object(
self.library.private_storage, 'delete')
result = self.library._update_create_from_snapshot_status(
fake.SHARE, fake.SHARE_SERVER)
expected_result = {'status': constants.STATUS_ERROR}
mock_pvt_storage_get.assert_called_once_with(fake.SHARE['id'],
'source_share')
mock__create_continue.assert_called_once_with(fake.SHARE,
fake.SHARE_SERVER)
mock_dm_constr.assert_called_once()
mock_delete_snapmirror.assert_called_once_with(fake_src_share,
fake.SHARE)
mock_dm_backend.assert_called_once_with(fake_src_share)
mock_dm_get_src_client.assert_called_once_with(
fake.BACKEND_NAME, vserver_name=fake.VSERVER1)
mock_get_backend_shr_name.assert_called_once_with(fake_src_share['id'])
mock_share_exits.assert_called_once_with(fake.SHARE_NAME,
src_vserver_client)
mock_deallocate_container.assert_called_once_with(fake.SHARE_NAME,
src_vserver_client)
mock_pvt_storage_delete.assert_called_once_with(fake.SHARE['id'])
self.assertEqual(expected_result, result)
def _setup_mocks_for_create_from_snapshot_continue(
self, src_host=fake.MANILA_HOST_NAME,
dest_host=fake.MANILA_HOST_NAME, split_completed_result=True,
move_completed_result=True, share_internal_state='fake_state',
replica_state='in_sync'):
self.fake_export_location = 'fake_export_location'
self.fake_src_share = {
'id': fake.SHARE['id'],
'host': src_host,
'internal_state': share_internal_state,
}
self.copy_fake_src_share = copy.deepcopy(self.fake_src_share)
src_pool = src_host.split('#')[1]
dest_pool = dest_host.split('#')[1]
self.src_vserver_client = mock.Mock()
self.dest_vserver_client = mock.Mock()
self.mock_dm_session = mock.Mock()
self.mock_dm_constr = self.mock_object(
data_motion, "DataMotionSession",
mock.Mock(return_value=self.mock_dm_session))
self.mock_pvt_storage_get = self.mock_object(
self.library.private_storage, 'get',
mock.Mock(return_value=json.dumps(self.copy_fake_src_share)))
self.mock_dm_backend = self.mock_object(
self.mock_dm_session, 'get_backend_info_for_share',
mock.Mock(return_value=(None,
fake.VSERVER1, fake.BACKEND_NAME)))
self.mock_extract_host = self.mock_object(
share_utils, 'extract_host',
mock.Mock(side_effect=[src_pool, dest_pool]))
self.mock_dm_get_src_client = self.mock_object(
data_motion, 'get_client_for_backend',
mock.Mock(return_value=self.src_vserver_client))
self.mock_get_vserver = self.mock_object(
self.library, '_get_vserver',
mock.Mock(return_value=(fake.VSERVER2, self.dest_vserver_client)))
self.mock_split_completed = self.mock_object(
self.library, '_check_volume_clone_split_completed',
mock.Mock(return_value=split_completed_result))
self.mock_rehost_vol = self.mock_object(
self.library, '_rehost_and_mount_volume')
self.mock_move_vol = self.mock_object(self.library,
'_move_volume_after_splitting')
self.mock_move_completed = self.mock_object(
self.library, '_check_volume_move_completed',
mock.Mock(return_value=move_completed_result))
self.mock_update_rep_state = self.mock_object(
self.library, 'update_replica_state',
mock.Mock(return_value=replica_state)
)
self.mock_update_snapmirror = self.mock_object(
self.mock_dm_session, 'update_snapmirror')
self.mock_break_snapmirror = self.mock_object(
self.mock_dm_session, 'break_snapmirror')
self.mock_delete_snapmirror = self.mock_object(
self.mock_dm_session, 'delete_snapmirror')
self.mock_get_backend_shr_name = self.mock_object(
self.library, '_get_backend_share_name',
mock.Mock(return_value=fake.SHARE_NAME))
self.mock__delete_share = self.mock_object(self.library,
'_delete_share')
self.mock_set_vol_size_fixes = self.mock_object(
self.dest_vserver_client, 'set_volume_filesys_size_fixed')
self.mock_create_export = self.mock_object(
self.library, '_create_export',
mock.Mock(return_value=self.fake_export_location))
self.mock_pvt_storage_update = self.mock_object(
self.library.private_storage, 'update')
self.mock_pvt_storage_delete = self.mock_object(
self.library.private_storage, 'delete')
self.mock_get_extra_specs_qos = self.mock_object(
share_types, 'get_extra_specs_from_share',
mock.Mock(return_value=fake.EXTRA_SPEC_WITH_QOS))
self.mock__get_provisioning_opts = self.mock_object(
self.library, '_get_provisioning_options',
mock.Mock(return_value=copy.deepcopy(fake.PROVISIONING_OPTIONS))
)
self.mock_modify_create_qos = self.mock_object(
self.library, '_modify_or_create_qos_for_existing_share',
mock.Mock(return_value=fake.QOS_POLICY_GROUP_NAME))
self.mock_modify_vol = self.mock_object(self.dest_vserver_client,
'modify_volume')
self.mock_get_backend_qos_name = self.mock_object(
self.library, '_get_backend_qos_policy_group_name',
mock.Mock(return_value=fake.QOS_POLICY_GROUP_NAME))
self.mock_mark_qos_deletion = self.mock_object(
self.src_vserver_client, 'mark_qos_policy_group_for_deletion')
@ddt.data(fake.MANILA_HOST_NAME, fake.MANILA_HOST_NAME_2)
def test__create_from_snapshot_continue_state_splitting(self, src_host):
self._setup_mocks_for_create_from_snapshot_continue(
src_host=src_host,
share_internal_state=self.library.STATE_SPLITTING_VOLUME_CLONE)
result = self.library._create_from_snapshot_continue(fake.SHARE,
fake.SHARE_SERVER)
fake.SHARE['share_server'] = fake.SHARE_SERVER
self.mock_pvt_storage_get.assert_called_once_with(fake.SHARE['id'],
'source_share')
self.mock_dm_backend.assert_called_once_with(self.fake_src_share)
self.mock_extract_host.assert_has_calls([
mock.call(self.fake_src_share['host'], level='pool'),
mock.call(fake.SHARE['host'], level='pool'),
])
self.mock_dm_get_src_client.assert_called_once_with(
fake.BACKEND_NAME, vserver_name=fake.VSERVER1)
self.mock_get_vserver.assert_called_once_with(fake.SHARE_SERVER)
self.mock_split_completed.assert_called_once_with(
self.fake_src_share, self.src_vserver_client)
self.mock_get_backend_qos_name.assert_called_once_with(fake.SHARE_ID)
self.mock_mark_qos_deletion.assert_called_once_with(
fake.QOS_POLICY_GROUP_NAME)
self.mock_rehost_vol.assert_called_once_with(
fake.SHARE, fake.VSERVER1, self.src_vserver_client,
fake.VSERVER2, self.dest_vserver_client)
if src_host != fake.MANILA_HOST_NAME:
expected_result = {
'status': constants.STATUS_CREATING_FROM_SNAPSHOT
}
self.mock_move_vol.assert_called_once_with(
self.fake_src_share, fake.SHARE, fake.SHARE_SERVER,
cutover_action='defer')
self.fake_src_share['internal_state'] = (
self.library.STATE_MOVING_VOLUME)
self.mock_pvt_storage_update.assert_called_once_with(
fake.SHARE['id'],
{'source_share': json.dumps(self.fake_src_share)}
)
self.assertEqual(expected_result, result)
else:
self.mock_get_extra_specs_qos.assert_called_once_with(fake.SHARE)
self.mock__get_provisioning_opts.assert_called_once_with(
fake.EXTRA_SPEC_WITH_QOS)
self.mock_modify_create_qos.assert_called_once_with(
fake.SHARE, fake.EXTRA_SPEC_WITH_QOS, fake.VSERVER2,
self.dest_vserver_client)
self.mock_get_backend_shr_name.assert_called_once_with(
fake.SHARE_ID)
self.mock_modify_vol.assert_called_once_with(
fake.POOL_NAME, fake.SHARE_NAME,
**fake.PROVISIONING_OPTIONS_WITH_QOS)
self.mock_pvt_storage_delete.assert_called_once_with(
fake.SHARE['id'])
self.mock_create_export.assert_called_once_with(
fake.SHARE, fake.SHARE_SERVER, fake.VSERVER2,
self.dest_vserver_client, clear_current_export_policy=False)
expected_result = {
'status': constants.STATUS_AVAILABLE,
'export_locations': self.fake_export_location,
}
self.assertEqual(expected_result, result)
@ddt.data(True, False)
def test__create_from_snapshot_continue_state_moving(self, move_completed):
self._setup_mocks_for_create_from_snapshot_continue(
share_internal_state=self.library.STATE_MOVING_VOLUME,
move_completed_result=move_completed)
result = self.library._create_from_snapshot_continue(fake.SHARE,
fake.SHARE_SERVER)
expect_result = {
'status': constants.STATUS_CREATING_FROM_SNAPSHOT
}
fake.SHARE['share_server'] = fake.SHARE_SERVER
self.mock_pvt_storage_get.assert_called_once_with(fake.SHARE['id'],
'source_share')
self.mock_dm_backend.assert_called_once_with(self.fake_src_share)
self.mock_extract_host.assert_has_calls([
mock.call(self.fake_src_share['host'], level='pool'),
mock.call(fake.SHARE['host'], level='pool'),
])
self.mock_dm_get_src_client.assert_called_once_with(
fake.BACKEND_NAME, vserver_name=fake.VSERVER1)
self.mock_get_vserver.assert_called_once_with(fake.SHARE_SERVER)
self.mock_move_completed.assert_called_once_with(
fake.SHARE, fake.SHARE_SERVER)
if move_completed:
expect_result['status'] = constants.STATUS_AVAILABLE
self.mock_pvt_storage_delete.assert_called_once_with(
fake.SHARE['id'])
self.mock_create_export.assert_called_once_with(
fake.SHARE, fake.SHARE_SERVER, fake.VSERVER2,
self.dest_vserver_client, clear_current_export_policy=False)
expect_result['export_locations'] = self.fake_export_location
self.assertEqual(expect_result, result)
else:
self.mock_pvt_storage_update.assert_called_once_with(
fake.SHARE['id'],
{'source_share': json.dumps(self.fake_src_share)}
)
self.assertEqual(expect_result, result)
@ddt.data('in_sync', 'out_of_sync')
def test__create_from_snapshot_continue_state_snapmirror(self,
replica_state):
self._setup_mocks_for_create_from_snapshot_continue(
share_internal_state=self.library.STATE_SNAPMIRROR_DATA_COPYING,
replica_state=replica_state)
result = self.library._create_from_snapshot_continue(fake.SHARE,
fake.SHARE_SERVER)
expect_result = {
'status': constants.STATUS_CREATING_FROM_SNAPSHOT
}
fake.SHARE['share_server'] = fake.SHARE_SERVER
self.mock_pvt_storage_get.assert_called_once_with(fake.SHARE['id'],
'source_share')
self.mock_dm_backend.assert_called_once_with(self.fake_src_share)
self.mock_extract_host.assert_has_calls([
mock.call(self.fake_src_share['host'], level='pool'),
mock.call(fake.SHARE['host'], level='pool'),
])
self.mock_dm_get_src_client.assert_called_once_with(
fake.BACKEND_NAME, vserver_name=fake.VSERVER1)
self.mock_get_vserver.assert_called_once_with(fake.SHARE_SERVER)
self.mock_update_rep_state.assert_called_once_with(
None, [self.fake_src_share], fake.SHARE, [], [], fake.SHARE_SERVER)
if replica_state == constants.REPLICA_STATE_IN_SYNC:
self.mock_update_snapmirror.assert_called_once_with(
self.fake_src_share, fake.SHARE)
self.mock_break_snapmirror.assert_called_once_with(
self.fake_src_share, fake.SHARE)
self.mock_delete_snapmirror.assert_called_once_with(
self.fake_src_share, fake.SHARE)
self.mock_get_backend_shr_name.assert_has_calls(
[mock.call(self.fake_src_share['id']),
mock.call(fake.SHARE_ID)])
self.mock__delete_share.assert_called_once_with(
self.fake_src_share, self.src_vserver_client,
remove_export=False)
self.mock_set_vol_size_fixes.assert_called_once_with(
fake.SHARE_NAME, filesys_size_fixed=False)
self.mock_get_extra_specs_qos.assert_called_once_with(fake.SHARE)
self.mock__get_provisioning_opts.assert_called_once_with(
fake.EXTRA_SPEC_WITH_QOS)
self.mock_modify_create_qos.assert_called_once_with(
fake.SHARE, fake.EXTRA_SPEC_WITH_QOS, fake.VSERVER2,
self.dest_vserver_client)
self.mock_modify_vol.assert_called_once_with(
fake.POOL_NAME, fake.SHARE_NAME,
**fake.PROVISIONING_OPTIONS_WITH_QOS)
expect_result['status'] = constants.STATUS_AVAILABLE
self.mock_pvt_storage_delete.assert_called_once_with(
fake.SHARE['id'])
self.mock_create_export.assert_called_once_with(
fake.SHARE, fake.SHARE_SERVER, fake.VSERVER2,
self.dest_vserver_client, clear_current_export_policy=False)
expect_result['export_locations'] = self.fake_export_location
self.assertEqual(expect_result, result)
elif replica_state not in [constants.STATUS_ERROR, None]:
self.mock_pvt_storage_update.assert_called_once_with(
fake.SHARE['id'],
{'source_share': json.dumps(self.fake_src_share)}
)
self.assertEqual(expect_result, result)
def test__create_from_snapshot_continue_state_unknown(self):
self._setup_mocks_for_create_from_snapshot_continue(
share_internal_state='unknown_state')
self.assertRaises(exception.NetAppException,
self.library._create_from_snapshot_continue,
fake.SHARE,
fake.SHARE_SERVER)
self.mock_pvt_storage_delete.assert_called_once_with(fake.SHARE_ID)
@ddt.data(False, True)
def test_allocate_container(self, hide_snapdir):
provisioning_options = copy.deepcopy(fake.PROVISIONING_OPTIONS)
provisioning_options['hide_snapdir'] = hide_snapdir
self.mock_object(self.library, '_get_backend_share_name', mock.Mock(
return_value=fake.SHARE_NAME))
self.mock_object(share_utils, 'extract_host', mock.Mock(
return_value=fake.POOL_NAME))
mock_get_provisioning_opts = self.mock_object(
self.library, '_get_provisioning_options_for_share',
mock.Mock(return_value=provisioning_options))
vserver_client = mock.Mock()
self.library._allocate_container(fake.SHARE_INSTANCE,
fake.VSERVER1,
vserver_client)
mock_get_provisioning_opts.assert_called_once_with(
fake.SHARE_INSTANCE, fake.VSERVER1, vserver_client=vserver_client,
replica=False)
vserver_client.create_volume.assert_called_once_with(
fake.POOL_NAME, fake.SHARE_NAME, fake.SHARE['size'],
thin_provisioned=True, snapshot_policy='default',
language='en-US', dedup_enabled=True, split=True, encrypt=False,
compression_enabled=False, max_files=5000, snapshot_reserve=8)
if hide_snapdir:
vserver_client.set_volume_snapdir_access.assert_called_once_with(
fake.SHARE_NAME, hide_snapdir)
else:
vserver_client.set_volume_snapdir_access.assert_not_called()
def test_remap_standard_boolean_extra_specs(self):
extra_specs = copy.deepcopy(fake.OVERLAPPING_EXTRA_SPEC)
result = self.library._remap_standard_boolean_extra_specs(extra_specs)
self.assertDictEqual(fake.REMAPPED_OVERLAPPING_EXTRA_SPEC, result)
def test_allocate_container_as_replica(self):
self.mock_object(self.library, '_get_backend_share_name', mock.Mock(
return_value=fake.SHARE_NAME))
self.mock_object(share_utils, 'extract_host', mock.Mock(
return_value=fake.POOL_NAME))
mock_get_provisioning_opts = self.mock_object(
self.library, '_get_provisioning_options_for_share',
mock.Mock(return_value=copy.deepcopy(fake.PROVISIONING_OPTIONS)))
vserver_client = mock.Mock()
self.library._allocate_container(fake.SHARE_INSTANCE, fake.VSERVER1,
vserver_client, replica=True)
mock_get_provisioning_opts.assert_called_once_with(
fake.SHARE_INSTANCE, fake.VSERVER1, vserver_client=vserver_client,
replica=True)
vserver_client.create_volume.assert_called_once_with(
fake.POOL_NAME, fake.SHARE_NAME, fake.SHARE['size'],
thin_provisioned=True, snapshot_policy='default',
language='en-US', dedup_enabled=True, split=True,
compression_enabled=False, max_files=5000, encrypt=False,
snapshot_reserve=8, volume_type='dp')
def test_allocate_container_no_pool_name(self):
self.mock_object(self.library, '_get_backend_share_name', mock.Mock(
return_value=fake.SHARE_NAME))
self.mock_object(share_utils, 'extract_host', mock.Mock(
return_value=None))
self.mock_object(self.library, '_check_extra_specs_validity')
self.mock_object(self.library, '_get_provisioning_options')
vserver_client = mock.Mock()
self.assertRaises(exception.InvalidHost,
self.library._allocate_container,
fake.SHARE_INSTANCE, fake.VSERVER1, vserver_client)
self.library._get_backend_share_name.assert_called_once_with(
fake.SHARE_INSTANCE['id'])
share_utils.extract_host.assert_called_once_with(
fake.SHARE_INSTANCE['host'], level='pool')
self.assertEqual(0,
self.library._check_extra_specs_validity.call_count)
self.assertEqual(0, self.library._get_provisioning_options.call_count)
def test_check_extra_specs_validity(self):
boolean_extra_spec_keys = list(
self.library.BOOLEAN_QUALIFIED_EXTRA_SPECS_MAP)
mock_bool_check = self.mock_object(
self.library, '_check_boolean_extra_specs_validity')
mock_string_check = self.mock_object(
self.library, '_check_string_extra_specs_validity')
self.library._check_extra_specs_validity(
fake.SHARE_INSTANCE, fake.EXTRA_SPEC)
mock_bool_check.assert_called_once_with(
fake.SHARE_INSTANCE, fake.EXTRA_SPEC, boolean_extra_spec_keys)
mock_string_check.assert_called_once_with(
fake.SHARE_INSTANCE, fake.EXTRA_SPEC)
def test_check_extra_specs_validity_empty_spec(self):
result = self.library._check_extra_specs_validity(
fake.SHARE_INSTANCE, fake.EMPTY_EXTRA_SPEC)
self.assertIsNone(result)
def test_check_extra_specs_validity_invalid_value(self):
self.assertRaises(
exception.Invalid, self.library._check_extra_specs_validity,
fake.SHARE_INSTANCE, fake.INVALID_EXTRA_SPEC)
def test_check_string_extra_specs_validity(self):
result = self.library._check_string_extra_specs_validity(
fake.SHARE_INSTANCE, fake.EXTRA_SPEC)
self.assertIsNone(result)
def test_check_string_extra_specs_validity_empty_spec(self):
result = self.library._check_string_extra_specs_validity(
fake.SHARE_INSTANCE, fake.EMPTY_EXTRA_SPEC)
self.assertIsNone(result)
def test_check_string_extra_specs_validity_invalid_value(self):
self.assertRaises(
exception.NetAppException,
self.library._check_string_extra_specs_validity,
fake.SHARE_INSTANCE, fake.INVALID_MAX_FILE_EXTRA_SPEC)
def test_check_boolean_extra_specs_validity_invalid_value(self):
self.assertRaises(
exception.Invalid,
self.library._check_boolean_extra_specs_validity,
fake.SHARE_INSTANCE, fake.INVALID_EXTRA_SPEC,
list(self.library.BOOLEAN_QUALIFIED_EXTRA_SPECS_MAP))
def test_check_extra_specs_validity_invalid_combination(self):
self.assertRaises(
exception.Invalid,
self.library._check_boolean_extra_specs_validity,
fake.SHARE_INSTANCE, fake.INVALID_EXTRA_SPEC_COMBO,
list(self.library.BOOLEAN_QUALIFIED_EXTRA_SPECS_MAP))
@ddt.data({'extra_specs': fake.EXTRA_SPEC, 'is_replica': False},
{'extra_specs': fake.EXTRA_SPEC_WITH_QOS, 'is_replica': True},
{'extra_specs': fake.EXTRA_SPEC, 'is_replica': False},
{'extra_specs': fake.EXTRA_SPEC_WITH_QOS, 'is_replica': True})
@ddt.unpack
def test_get_provisioning_options_for_share(self, extra_specs, is_replica):
qos = True if fake.QOS_EXTRA_SPEC in extra_specs else False
vserver_client = mock.Mock()
mock_get_extra_specs_from_share = self.mock_object(
share_types, 'get_extra_specs_from_share',
mock.Mock(return_value=extra_specs))
mock_remap_standard_boolean_extra_specs = self.mock_object(
self.library, '_remap_standard_boolean_extra_specs',
mock.Mock(return_value=extra_specs))
mock_check_extra_specs_validity = self.mock_object(
self.library, '_check_extra_specs_validity')
mock_get_provisioning_options = self.mock_object(
self.library, '_get_provisioning_options',
mock.Mock(return_value=fake.PROVISIONING_OPTIONS))
mock_get_normalized_qos_specs = self.mock_object(
self.library, '_get_normalized_qos_specs',
mock.Mock(return_value={fake.QOS_NORMALIZED_SPEC: 3000}))
mock_create_qos_policy_group = self.mock_object(
self.library, '_create_qos_policy_group', mock.Mock(
return_value=fake.QOS_POLICY_GROUP_NAME))
result = self.library._get_provisioning_options_for_share(
fake.SHARE_INSTANCE, fake.VSERVER1, vserver_client=vserver_client,
replica=is_replica)
if qos and is_replica:
expected_provisioning_opts = fake.PROVISIONING_OPTIONS
self.assertFalse(mock_create_qos_policy_group.called)
else:
expected_provisioning_opts = fake.PROVISIONING_OPTIONS_WITH_QOS
mock_create_qos_policy_group.assert_called_once_with(
fake.SHARE_INSTANCE, fake.VSERVER1,
{fake.QOS_NORMALIZED_SPEC: 3000}, vserver_client)
self.assertEqual(expected_provisioning_opts, result)
mock_get_extra_specs_from_share.assert_called_once_with(
fake.SHARE_INSTANCE)
mock_remap_standard_boolean_extra_specs.assert_called_once_with(
extra_specs)
mock_check_extra_specs_validity.assert_called_once_with(
fake.SHARE_INSTANCE, extra_specs)
mock_get_provisioning_options.assert_called_once_with(extra_specs)
mock_get_normalized_qos_specs.assert_called_once_with(extra_specs)
def test_get_provisioning_options_implicit_false(self):
result = self.library._get_provisioning_options(
fake.EMPTY_EXTRA_SPEC)
expected = {
'language': None,
'max_files': None,
'snapshot_policy': None,
'thin_provisioned': False,
'compression_enabled': False,
'dedup_enabled': False,
'split': False,
'encrypt': False,
'hide_snapdir': False,
}
self.assertEqual(expected, result)
def test_get_boolean_provisioning_options(self):
result = self.library._get_boolean_provisioning_options(
fake.SHORT_BOOLEAN_EXTRA_SPEC,
self.library.BOOLEAN_QUALIFIED_EXTRA_SPECS_MAP)
self.assertEqual(fake.PROVISIONING_OPTIONS_BOOLEAN, result)
def test_get_boolean_provisioning_options_missing_spec(self):
result = self.library._get_boolean_provisioning_options(
fake.SHORT_BOOLEAN_EXTRA_SPEC,
self.library.BOOLEAN_QUALIFIED_EXTRA_SPECS_MAP)
self.assertEqual(fake.PROVISIONING_OPTIONS_BOOLEAN, result)
def test_get_boolean_provisioning_options_implicit_false(self):
expected = {
'thin_provisioned': False,
'dedup_enabled': False,
'compression_enabled': False,
'split': False,
'hide_snapdir': False,
}
result = self.library._get_boolean_provisioning_options(
fake.EMPTY_EXTRA_SPEC,
self.library.BOOLEAN_QUALIFIED_EXTRA_SPECS_MAP)
self.assertEqual(expected, result)
def test_get_string_provisioning_options(self):
result = self.library._get_string_provisioning_options(
fake.STRING_EXTRA_SPEC,
self.library.STRING_QUALIFIED_EXTRA_SPECS_MAP)
self.assertEqual(fake.PROVISIONING_OPTIONS_STRING, result)
def test_get_string_provisioning_options_missing_spec(self):
result = self.library._get_string_provisioning_options(
fake.SHORT_STRING_EXTRA_SPEC,
self.library.STRING_QUALIFIED_EXTRA_SPECS_MAP)
self.assertEqual(fake.PROVISIONING_OPTIONS_STRING_MISSING_SPECS,
result)
def test_get_string_provisioning_options_implicit_false(self):
result = self.library._get_string_provisioning_options(
fake.EMPTY_EXTRA_SPEC,
self.library.STRING_QUALIFIED_EXTRA_SPECS_MAP)
self.assertEqual(fake.PROVISIONING_OPTIONS_STRING_DEFAULT, result)
@ddt.data({}, {'foo': 'bar'}, {'netapp:maxiops': '3000'},
{'qos': True, 'netapp:absiops': '3000'},
{'qos': True, 'netapp:maxiops:': '3000'})
def test_get_normalized_qos_specs_no_qos_specs(self, extra_specs):
if 'qos' in extra_specs:
self.assertRaises(exception.NetAppException,
self.library._get_normalized_qos_specs,
extra_specs)
else:
self.assertDictMatch(
{}, self.library._get_normalized_qos_specs(extra_specs))
@ddt.data({'qos': True, 'netapp:maxiops': '3000', 'netapp:maxbps': '9000'},
{'qos': True, 'netapp:maxiopspergib': '1000',
'netapp:maxiops': '1000'})
def test_get_normalized_qos_specs_multiple_qos_specs(self, extra_specs):
self.assertRaises(exception.NetAppException,
self.library._get_normalized_qos_specs,
extra_specs)
@ddt.data({'qos': True, 'netapp:maxIOPS': '3000'},
{'qos': True, 'netapp:MAxBPs': '3000', 'clem': 'son'},
{'qos': True, 'netapp:maxbps': '3000', 'tig': 'ers'},
{'qos': True, 'netapp:MAXiopSPerGib': '3000', 'kin': 'gsof'},
{'qos': True, 'netapp:maxiopspergib': '3000', 'coll': 'ege'},
{'qos': True, 'netapp:maxBPSperGiB': '3000', 'foot': 'ball'})
def test_get_normalized_qos_specs(self, extra_specs):
expected_normalized_spec = {
key.lower().split('netapp:')[1]: value
for key, value in extra_specs.items() if 'netapp:' in key
}
qos_specs = self.library._get_normalized_qos_specs(extra_specs)
self.assertDictMatch(expected_normalized_spec, qos_specs)
self.assertEqual(1, len(qos_specs))
@ddt.data({'qos': {'maxiops': '3000'}, 'expected': '3000iops'},
{'qos': {'maxbps': '3000'}, 'expected': '3000B/s'},
{'qos': {'maxbpspergib': '3000'}, 'expected': '12000B/s'},
{'qos': {'maxiopspergib': '3000'}, 'expected': '12000iops'})
@ddt.unpack
def test_get_max_throughput(self, qos, expected):
throughput = self.library._get_max_throughput(4, qos)
self.assertEqual(expected, throughput)
def test_create_qos_policy_group(self):
mock_qos_policy_create = self.mock_object(
self.library._client, 'qos_policy_group_create')
self.library._create_qos_policy_group(
fake.SHARE, fake.VSERVER1, {'maxiops': '3000'})
expected_policy_name = 'qos_share_' + fake.SHARE['id'].replace(
'-', '_')
mock_qos_policy_create.assert_called_once_with(
expected_policy_name, fake.VSERVER1, max_throughput='3000iops')
def test_check_if_max_files_is_valid_with_negative_integer(self):
self.assertRaises(exception.NetAppException,
self.library._check_if_max_files_is_valid,
fake.SHARE, -1)
def test_check_if_max_files_is_valid_with_string(self):
self.assertRaises(ValueError,
self.library._check_if_max_files_is_valid,
fake.SHARE, 'abc')
def test_allocate_container_no_pool(self):
vserver_client = mock.Mock()
fake_share_inst = copy.deepcopy(fake.SHARE_INSTANCE)
fake_share_inst['host'] = fake_share_inst['host'].split('#')[0]
self.assertRaises(exception.InvalidHost,
self.library._allocate_container,
fake_share_inst,
fake.VSERVER1,
vserver_client)
def test_check_aggregate_extra_specs_validity(self):
self.library._have_cluster_creds = True
self.library._ssc_stats = fake.SSC_INFO
result = self.library._check_aggregate_extra_specs_validity(
fake.AGGREGATES[0], fake.EXTRA_SPEC)
self.assertIsNone(result)
def test_check_aggregate_extra_specs_validity_no_match(self):
self.library._have_cluster_creds = True
self.library._ssc_stats = fake.SSC_INFO
self.assertRaises(exception.NetAppException,
self.library._check_aggregate_extra_specs_validity,
fake.AGGREGATES[1],
fake.EXTRA_SPEC)
@ddt.data({'provider_location': None, 'size': 50, 'hide_snapdir': True,
'split': None},
{'provider_location': 'fake_location', 'size': 30,
'hide_snapdir': False, 'split': True},
{'provider_location': 'fake_location', 'size': 20,
'hide_snapdir': True, 'split': False})
@ddt.unpack
def test_allocate_container_from_snapshot(
self, provider_location, size, hide_snapdir, split):
provisioning_options = copy.deepcopy(fake.PROVISIONING_OPTIONS)
provisioning_options['hide_snapdir'] = hide_snapdir
mock_get_provisioning_opts = self.mock_object(
self.library, '_get_provisioning_options_for_share',
mock.Mock(return_value=provisioning_options))
vserver = fake.VSERVER1
vserver_client = mock.Mock()
original_snapshot_size = 20
expected_split_op = split or fake.PROVISIONING_OPTIONS['split']
fake_share_inst = copy.deepcopy(fake.SHARE_INSTANCE)
fake_share_inst['size'] = size
fake_snapshot = copy.deepcopy(fake.SNAPSHOT)
fake_snapshot['provider_location'] = provider_location
fake_snapshot['size'] = original_snapshot_size
self.library._allocate_container_from_snapshot(fake_share_inst,
fake_snapshot,
vserver,
vserver_client)
share_name = self.library._get_backend_share_name(
fake_share_inst['id'])
parent_share_name = self.library._get_backend_share_name(
fake_snapshot['share_id'])
parent_snapshot_name = self.library._get_backend_snapshot_name(
fake_snapshot['id']) if not provider_location else 'fake_location'
mock_get_provisioning_opts.assert_called_once_with(
fake_share_inst, fake.VSERVER1, vserver_client=vserver_client)
vserver_client.create_volume_clone.assert_called_once_with(
share_name, parent_share_name, parent_snapshot_name,
thin_provisioned=True, snapshot_policy='default',
language='en-US', dedup_enabled=True, split=expected_split_op,
encrypt=False, compression_enabled=False, max_files=5000)
if size > original_snapshot_size:
vserver_client.set_volume_size.assert_called_once_with(
share_name, size)
else:
vserver_client.set_volume_size.assert_not_called()
if hide_snapdir:
vserver_client.set_volume_snapdir_access.assert_called_once_with(
fake.SHARE_INSTANCE_NAME, hide_snapdir)
else:
vserver_client.set_volume_snapdir_access.assert_not_called()
def test_share_exists(self):
vserver_client = mock.Mock()
vserver_client.volume_exists.return_value = True
result = self.library._share_exists(fake.SHARE_NAME, vserver_client)
self.assertTrue(result)
def test_share_exists_not_found(self):
vserver_client = mock.Mock()
vserver_client.volume_exists.return_value = False
result = self.library._share_exists(fake.SHARE_NAME, vserver_client)
self.assertFalse(result)
def test_delete_share(self):
vserver_client = mock.Mock()
self.mock_object(self.library,
'_get_vserver',
mock.Mock(return_value=(fake.VSERVER1,
vserver_client)))
mock_share_exists = self.mock_object(self.library,
'_share_exists',
mock.Mock(return_value=True))
mock_remove_export = self.mock_object(self.library, '_remove_export')
mock_deallocate_container = self.mock_object(self.library,
'_deallocate_container')
self.library.delete_share(self.context,
fake.SHARE,
share_server=fake.SHARE_SERVER)
share_name = self.library._get_backend_share_name(fake.SHARE['id'])
qos_policy_name = self.library._get_backend_qos_policy_group_name(
fake.SHARE['id'])
mock_share_exists.assert_called_once_with(share_name, vserver_client)
mock_remove_export.assert_called_once_with(fake.SHARE, vserver_client)
mock_deallocate_container.assert_called_once_with(share_name,
vserver_client)
(vserver_client.mark_qos_policy_group_for_deletion
.assert_called_once_with(qos_policy_name))
self.assertEqual(0, lib_base.LOG.info.call_count)
@ddt.data(exception.InvalidInput(reason='fake_reason'),
exception.VserverNotSpecified(),
exception.VserverNotFound(vserver='fake_vserver'))
def test_delete_share_no_share_server(self, get_vserver_exception):
self.mock_object(self.library,
'_get_vserver',
mock.Mock(side_effect=get_vserver_exception))
mock_share_exists = self.mock_object(self.library,
'_share_exists',
mock.Mock(return_value=False))
mock_remove_export = self.mock_object(self.library, '_remove_export')
mock_deallocate_container = self.mock_object(self.library,
'_deallocate_container')
self.library.delete_share(self.context,
fake.SHARE,
share_server=fake.SHARE_SERVER)
self.assertFalse(mock_share_exists.called)
self.assertFalse(mock_remove_export.called)
self.assertFalse(mock_deallocate_container.called)
self.assertFalse(
self.library._client.mark_qos_policy_group_for_deletion.called)
self.assertEqual(1, lib_base.LOG.warning.call_count)
def test_delete_share_not_found(self):
vserver_client = mock.Mock()
self.mock_object(self.library,
'_get_vserver',
mock.Mock(return_value=(fake.VSERVER1,
vserver_client)))
mock_share_exists = self.mock_object(self.library,
'_share_exists',
mock.Mock(return_value=False))
mock_remove_export = self.mock_object(self.library, '_remove_export')
mock_deallocate_container = self.mock_object(self.library,
'_deallocate_container')
self.library.delete_share(self.context,
fake.SHARE,
share_server=fake.SHARE_SERVER)
share_name = self.library._get_backend_share_name(fake.SHARE['id'])
mock_share_exists.assert_called_once_with(share_name, vserver_client)
self.assertFalse(mock_remove_export.called)
self.assertFalse(mock_deallocate_container.called)
self.assertFalse(
self.library._client.mark_qos_policy_group_for_deletion.called)
self.assertEqual(1, lib_base.LOG.info.call_count)
def test_deallocate_container(self):
vserver_client = mock.Mock()
self.library._deallocate_container(fake.SHARE_NAME, vserver_client)
vserver_client.unmount_volume.assert_called_with(fake.SHARE_NAME,
force=True)
vserver_client.offline_volume.assert_called_with(fake.SHARE_NAME)
vserver_client.delete_volume.assert_called_with(fake.SHARE_NAME)
def test_create_export(self):
protocol_helper = mock.Mock()
callback = (lambda export_address, export_path='fake_export_path':
':'.join([export_address, export_path]))
protocol_helper.create_share.return_value = callback
self.mock_object(self.library,
'_get_helper',
mock.Mock(return_value=protocol_helper))
vserver_client = mock.Mock()
vserver_client.get_network_interfaces.return_value = fake.LIFS
fake_interface_addresses_with_metadata = copy.deepcopy(
fake.INTERFACE_ADDRESSES_WITH_METADATA)
mock_get_export_addresses_with_metadata = self.mock_object(
self.library, '_get_export_addresses_with_metadata',
mock.Mock(return_value=fake_interface_addresses_with_metadata))
result = self.library._create_export(fake.SHARE,
fake.SHARE_SERVER,
fake.VSERVER1,
vserver_client)
self.assertEqual(fake.NFS_EXPORTS, result)
mock_get_export_addresses_with_metadata.assert_called_once_with(
fake.SHARE, fake.SHARE_SERVER, fake.LIFS)
protocol_helper.create_share.assert_called_once_with(
fake.SHARE, fake.SHARE_NAME, clear_current_export_policy=True)
def test_create_export_lifs_not_found(self):
self.mock_object(self.library, '_get_helper')
vserver_client = mock.Mock()
vserver_client.get_network_interfaces.return_value = []
self.assertRaises(exception.NetAppException,
self.library._create_export,
fake.SHARE,
fake.SHARE_SERVER,
fake.VSERVER1,
vserver_client)
def test_get_export_addresses_with_metadata(self):
mock_get_aggregate_node = self.mock_object(
self.library, '_get_aggregate_node',
mock.Mock(return_value=fake.CLUSTER_NODES[0]))
mock_get_admin_addresses_for_share_server = self.mock_object(
self.library, '_get_admin_addresses_for_share_server',
mock.Mock(return_value=[fake.LIF_ADDRESSES[1]]))
result = self.library._get_export_addresses_with_metadata(
fake.SHARE, fake.SHARE_SERVER, fake.LIFS)
self.assertEqual(fake.INTERFACE_ADDRESSES_WITH_METADATA, result)
mock_get_aggregate_node.assert_called_once_with(fake.POOL_NAME)
mock_get_admin_addresses_for_share_server.assert_called_once_with(
fake.SHARE_SERVER)
def test_get_export_addresses_with_metadata_node_unknown(self):
mock_get_aggregate_node = self.mock_object(
self.library, '_get_aggregate_node',
mock.Mock(return_value=None))
mock_get_admin_addresses_for_share_server = self.mock_object(
self.library, '_get_admin_addresses_for_share_server',
mock.Mock(return_value=[fake.LIF_ADDRESSES[1]]))
result = self.library._get_export_addresses_with_metadata(
fake.SHARE, fake.SHARE_SERVER, fake.LIFS)
expected = copy.deepcopy(fake.INTERFACE_ADDRESSES_WITH_METADATA)
for key, value in expected.items():
value['preferred'] = False
self.assertEqual(expected, result)
mock_get_aggregate_node.assert_called_once_with(fake.POOL_NAME)
mock_get_admin_addresses_for_share_server.assert_called_once_with(
fake.SHARE_SERVER)
def test_get_admin_addresses_for_share_server(self):
result = self.library._get_admin_addresses_for_share_server(
fake.SHARE_SERVER)
self.assertEqual([fake.ADMIN_NETWORK_ALLOCATIONS[0]['ip_address']],
result)
def test_get_admin_addresses_for_share_server_no_share_server(self):
result = self.library._get_admin_addresses_for_share_server(None)
self.assertEqual([], result)
@ddt.data(True, False)
def test_sort_export_locations_by_preferred_paths(self, reverse):
export_locations = copy.copy(fake.NFS_EXPORTS)
if reverse:
export_locations.reverse()
result = self.library._sort_export_locations_by_preferred_paths(
export_locations)
self.assertEqual(fake.NFS_EXPORTS, result)
def test_remove_export(self):
protocol_helper = mock.Mock()
protocol_helper.get_target.return_value = 'fake_target'
self.mock_object(self.library,
'_get_helper',
mock.Mock(return_value=protocol_helper))
vserver_client = mock.Mock()
self.library._remove_export(fake.SHARE, vserver_client)
protocol_helper.set_client.assert_called_once_with(vserver_client)
protocol_helper.get_target.assert_called_once_with(fake.SHARE)
protocol_helper.delete_share.assert_called_once_with(fake.SHARE,
fake.SHARE_NAME)
def test_remove_export_target_not_found(self):
protocol_helper = mock.Mock()
protocol_helper.get_target.return_value = None
self.mock_object(self.library,
'_get_helper',
mock.Mock(return_value=protocol_helper))
vserver_client = mock.Mock()
self.library._remove_export(fake.SHARE, vserver_client)
protocol_helper.set_client.assert_called_once_with(vserver_client)
protocol_helper.get_target.assert_called_once_with(fake.SHARE)
self.assertFalse(protocol_helper.delete_share.called)
def test_create_snapshot(self):
vserver_client = mock.Mock()
self.mock_object(self.library,
'_get_vserver',
mock.Mock(return_value=(fake.VSERVER1,
vserver_client)))
model_update = self.library.create_snapshot(
self.context, fake.SNAPSHOT, share_server=fake.SHARE_SERVER)
share_name = self.library._get_backend_share_name(
fake.SNAPSHOT['share_id'])
snapshot_name = self.library._get_backend_snapshot_name(
fake.SNAPSHOT['id'])
vserver_client.create_snapshot.assert_called_once_with(share_name,
snapshot_name)
self.assertEqual(snapshot_name, model_update['provider_location'])
@ddt.data(True, False)
def test_revert_to_snapshot(self, use_snap_provider_location):
vserver_client = mock.Mock()
self.mock_object(self.library,
'_get_vserver',
mock.Mock(return_value=(fake.VSERVER1,
vserver_client)))
fake_snapshot = copy.deepcopy(fake.SNAPSHOT)
if use_snap_provider_location:
fake_snapshot['provider_location'] = 'fake-provider-location'
else:
del fake_snapshot['provider_location']
result = self.library.revert_to_snapshot(
self.context, fake_snapshot, share_server=fake.SHARE_SERVER)
self.assertIsNone(result)
share_name = self.library._get_backend_share_name(
fake_snapshot['share_id'])
snapshot_name = (self.library._get_backend_snapshot_name(
fake_snapshot['id']) if not use_snap_provider_location
else 'fake-provider-location')
vserver_client.restore_snapshot.assert_called_once_with(share_name,
snapshot_name)
def test_delete_snapshot(self):
vserver_client = mock.Mock()
self.mock_object(self.library,
'_get_vserver',
mock.Mock(return_value=(fake.VSERVER1,
vserver_client)))
mock_delete_snapshot = self.mock_object(self.library,
'_delete_snapshot')
self.library.delete_snapshot(self.context,
fake.SNAPSHOT,
share_server=fake.SHARE_SERVER)
share_name = self.library._get_backend_share_name(
fake.SNAPSHOT['share_id'])
snapshot_name = self.library._get_backend_snapshot_name(
fake.SNAPSHOT['id'])
mock_delete_snapshot.assert_called_once_with(
vserver_client, share_name, snapshot_name)
def test_delete_snapshot_with_provider_location(self):
vserver_client = mock.Mock()
vserver_client.get_snapshot.return_value = fake.CDOT_SNAPSHOT
self.mock_object(self.library,
'_get_vserver',
mock.Mock(return_value=(fake.VSERVER1,
vserver_client)))
fake_snapshot = copy.deepcopy(fake.SNAPSHOT)
fake_snapshot['provider_location'] = 'fake_provider_location'
self.library.delete_snapshot(self.context,
fake_snapshot,
share_server=fake.SHARE_SERVER)
share_name = self.library._get_backend_share_name(
fake_snapshot['share_id'])
vserver_client.delete_snapshot.assert_called_once_with(
share_name, fake_snapshot['provider_location'])
@ddt.data(exception.InvalidInput(reason='fake_reason'),
exception.VserverNotSpecified(),
exception.VserverNotFound(vserver='fake_vserver'))
def test_delete_snapshot_no_share_server(self, get_vserver_exception):
self.mock_object(self.library,
'_get_vserver',
mock.Mock(side_effect=get_vserver_exception))
mock_delete_snapshot = self.mock_object(self.library,
'_delete_snapshot')
self.library.delete_snapshot(self.context,
fake.SNAPSHOT,
share_server=fake.SHARE_SERVER)
self.assertFalse(mock_delete_snapshot.called)
def test_delete_snapshot_not_found(self):
vserver_client = mock.Mock()
self.mock_object(self.library,
'_get_vserver',
mock.Mock(return_value=(fake.VSERVER1,
vserver_client)))
mock_delete_snapshot = self.mock_object(
self.library, '_delete_snapshot',
mock.Mock(side_effect=exception.SnapshotResourceNotFound(
name=fake.SNAPSHOT_NAME)))
self.library.delete_snapshot(self.context,
fake.SNAPSHOT,
share_server=fake.SHARE_SERVER)
share_name = self.library._get_backend_share_name(
fake.SNAPSHOT['share_id'])
snapshot_name = self.library._get_backend_snapshot_name(
fake.SNAPSHOT['id'])
mock_delete_snapshot.assert_called_once_with(
vserver_client, share_name, snapshot_name)
def test_delete_snapshot_not_unique(self):
vserver_client = mock.Mock()
self.mock_object(self.library,
'_get_vserver',
mock.Mock(return_value=(fake.VSERVER1,
vserver_client)))
mock_delete_snapshot = self.mock_object(
self.library, '_delete_snapshot',
mock.Mock(side_effect=exception.NetAppException()))
self.assertRaises(exception.NetAppException,
self.library.delete_snapshot,
self.context,
fake.SNAPSHOT,
share_server=fake.SHARE_SERVER)
share_name = self.library._get_backend_share_name(
fake.SNAPSHOT['share_id'])
snapshot_name = self.library._get_backend_snapshot_name(
fake.SNAPSHOT['id'])
mock_delete_snapshot.assert_called_once_with(
vserver_client, share_name, snapshot_name)
def test__delete_snapshot(self):
vserver_client = mock.Mock()
vserver_client.get_snapshot.return_value = fake.CDOT_SNAPSHOT
self.library._delete_snapshot(vserver_client,
fake.SHARE_NAME,
fake.SNAPSHOT_NAME)
vserver_client.delete_snapshot.assert_called_once_with(
fake.SHARE_NAME, fake.SNAPSHOT_NAME)
self.assertFalse(vserver_client.get_clone_children_for_snapshot.called)
self.assertFalse(vserver_client.split_volume_clone.called)
self.assertFalse(vserver_client.soft_delete_snapshot.called)
def test__delete_snapshot_busy_volume_clone(self):
vserver_client = mock.Mock()
vserver_client.get_snapshot.return_value = (
fake.CDOT_SNAPSHOT_BUSY_VOLUME_CLONE)
vserver_client.get_clone_children_for_snapshot.return_value = (
fake.CDOT_CLONE_CHILDREN)
self.library._delete_snapshot(vserver_client,
fake.SHARE_NAME,
fake.SNAPSHOT_NAME)
self.assertFalse(vserver_client.delete_snapshot.called)
vserver_client.get_clone_children_for_snapshot.assert_called_once_with(
fake.SHARE_NAME, fake.SNAPSHOT_NAME)
vserver_client.split_volume_clone.assert_has_calls([
mock.call(fake.CDOT_CLONE_CHILD_1),
mock.call(fake.CDOT_CLONE_CHILD_2),
])
vserver_client.soft_delete_snapshot.assert_called_once_with(
fake.SHARE_NAME, fake.SNAPSHOT_NAME)
def test__delete_snapshot_busy_snapmirror(self):
vserver_client = mock.Mock()
vserver_client.get_snapshot.return_value = (
fake.CDOT_SNAPSHOT_BUSY_SNAPMIRROR)
self.assertRaises(exception.ShareSnapshotIsBusy,
self.library._delete_snapshot,
vserver_client,
fake.SHARE_NAME,
fake.SNAPSHOT_NAME)
self.assertFalse(vserver_client.delete_snapshot.called)
self.assertFalse(vserver_client.get_clone_children_for_snapshot.called)
self.assertFalse(vserver_client.split_volume_clone.called)
self.assertFalse(vserver_client.soft_delete_snapshot.called)
@ddt.data(None, fake.VSERVER1)
def test_manage_existing(self, fake_vserver):
vserver_client = mock.Mock()
mock__get_vserver = self.mock_object(
self.library, '_get_vserver',
mock.Mock(return_value=(fake.VSERVER1, vserver_client)))
mock_manage_container = self.mock_object(
self.library,
'_manage_container',
mock.Mock(return_value=fake.SHARE_SIZE))
mock_create_export = self.mock_object(
self.library,
'_create_export',
mock.Mock(return_value=fake.NFS_EXPORTS))
result = self.library.manage_existing(fake.SHARE, {},
share_server=fake_vserver)
expected = {
'size': fake.SHARE_SIZE,
'export_locations': fake.NFS_EXPORTS
}
mock__get_vserver.assert_called_once_with(share_server=fake_vserver)
mock_manage_container.assert_called_once_with(fake.SHARE,
fake.VSERVER1,
vserver_client)
mock_create_export.assert_called_once_with(fake.SHARE,
fake_vserver,
fake.VSERVER1,
vserver_client)
self.assertDictEqual(expected, result)
@ddt.data(None, fake.VSERVER1)
def test_unmanage(self, fake_vserver):
result = self.library.unmanage(fake.SHARE, share_server=fake_vserver)
self.assertIsNone(result)
@ddt.data(True, False)
def test_manage_container_with_qos(self, qos):
vserver_client = mock.Mock()
qos_policy_group_name = fake.QOS_POLICY_GROUP_NAME if qos else None
extra_specs = fake.EXTRA_SPEC_WITH_QOS if qos else fake.EXTRA_SPEC
provisioning_opts = self.library._get_provisioning_options(extra_specs)
if qos:
provisioning_opts['qos_policy_group'] = fake.QOS_POLICY_GROUP_NAME
share_to_manage = copy.deepcopy(fake.SHARE)
share_to_manage['export_location'] = fake.EXPORT_LOCATION
mock_helper = mock.Mock()
mock_helper.get_share_name_for_share.return_value = fake.FLEXVOL_NAME
self.mock_object(self.library,
'_get_helper',
mock.Mock(return_value=mock_helper))
mock_get_volume_to_manage = self.mock_object(
vserver_client,
'get_volume_to_manage',
mock.Mock(return_value=fake.FLEXVOL_TO_MANAGE))
mock_validate_volume_for_manage = self.mock_object(
self.library,
'_validate_volume_for_manage')
self.mock_object(share_types,
'get_extra_specs_from_share',
mock.Mock(return_value=extra_specs))
mock_check_extra_specs_validity = self.mock_object(
self.library,
'_check_extra_specs_validity')
mock_check_aggregate_extra_specs_validity = self.mock_object(
self.library,
'_check_aggregate_extra_specs_validity')
mock_modify_or_create_qos_policy = self.mock_object(
self.library, '_modify_or_create_qos_for_existing_share',
mock.Mock(return_value=qos_policy_group_name))
result = self.library._manage_container(share_to_manage,
fake.VSERVER1,
vserver_client)
mock_get_volume_to_manage.assert_called_once_with(
fake.POOL_NAME, fake.FLEXVOL_NAME)
mock_check_extra_specs_validity.assert_called_once_with(
share_to_manage, extra_specs)
mock_check_aggregate_extra_specs_validity.assert_called_once_with(
fake.POOL_NAME, extra_specs)
vserver_client.unmount_volume.assert_called_once_with(
fake.FLEXVOL_NAME)
vserver_client.set_volume_name.assert_called_once_with(
fake.FLEXVOL_NAME, fake.SHARE_NAME)
vserver_client.mount_volume.assert_called_once_with(
fake.SHARE_NAME)
vserver_client.modify_volume.assert_called_once_with(
fake.POOL_NAME, fake.SHARE_NAME, **provisioning_opts)
mock_modify_or_create_qos_policy.assert_called_once_with(
share_to_manage, extra_specs, fake.VSERVER1, vserver_client)
mock_validate_volume_for_manage.assert_called()
original_data = {
'original_name': fake.FLEXVOL_TO_MANAGE['name'],
'original_junction_path': fake.FLEXVOL_TO_MANAGE['junction-path'],
}
self.library.private_storage.update.assert_called_once_with(
fake.SHARE['id'], original_data)
expected_size = int(
math.ceil(float(fake.FLEXVOL_TO_MANAGE['size']) / units.Gi))
self.assertEqual(expected_size, result)
def test_manage_container_invalid_export_location(self):
vserver_client = mock.Mock()
share_to_manage = copy.deepcopy(fake.SHARE)
share_to_manage['export_location'] = fake.EXPORT_LOCATION
mock_helper = mock.Mock()
mock_helper.get_share_name_for_share.return_value = None
self.mock_object(self.library,
'_get_helper',
mock.Mock(return_value=mock_helper))
self.assertRaises(exception.ManageInvalidShare,
self.library._manage_container,
share_to_manage,
fake.VSERVER1,
vserver_client)
def test_manage_container_not_found(self):
vserver_client = mock.Mock()
share_to_manage = copy.deepcopy(fake.SHARE)
share_to_manage['export_location'] = fake.EXPORT_LOCATION
mock_helper = mock.Mock()
mock_helper.get_share_name_for_share.return_value = fake.FLEXVOL_NAME
self.mock_object(self.library,
'_get_helper',
mock.Mock(return_value=mock_helper))
self.mock_object(vserver_client,
'get_volume_to_manage',
mock.Mock(return_value=None))
self.assertRaises(exception.ManageInvalidShare,
self.library._manage_container,
share_to_manage,
fake.VSERVER1,
vserver_client)
def test_manage_container_invalid_extra_specs(self):
vserver_client = mock.Mock()
share_to_manage = copy.deepcopy(fake.SHARE)
share_to_manage['export_location'] = fake.EXPORT_LOCATION
mock_helper = mock.Mock()
mock_helper.get_share_name_for_share.return_value = fake.FLEXVOL_NAME
self.mock_object(self.library,
'_get_helper',