# Copyright (c) 2014 Alex Meade. All rights reserved. # Copyright (c) 2015 Dustin Schoenbrun. All rights reserved. # Copyright (c) 2015 Tom Barron. All rights reserved. # Copyright (c) 2016 Mike Rooney. All rights reserved. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import copy import time from unittest import mock import uuid import ddt from lxml import etree import paramiko import six from cinder import exception from cinder import ssh_utils from cinder import test from cinder.tests.unit.volume.drivers.netapp.dataontap.client import ( fakes as fake_client) from cinder.tests.unit.volume.drivers.netapp.dataontap import fakes as fake from cinder.volume.drivers.netapp.dataontap.client import api as netapp_api from cinder.volume.drivers.netapp.dataontap.client import client_base from cinder.volume.drivers.netapp.dataontap.client import client_cmode from cinder.volume.drivers.netapp import utils as netapp_utils CONNECTION_INFO = {'hostname': 'hostname', 'transport_type': 'https', 'port': 443, 'username': 'admin', 'password': 'passw0rd', 'vserver': 'fake_vserver', 'api_trace_pattern': 'fake_regex'} @ddt.ddt class NetAppCmodeClientTestCase(test.TestCase): def setUp(self): super(NetAppCmodeClientTestCase, self).setUp() self.mock_object(client_cmode.Client, '_init_ssh_client') self.mock_object(client_cmode.Client, 'get_ontap_version', return_value='9.6') with mock.patch.object(client_cmode.Client, 'get_ontapi_version', return_value=(1, 20)): self.client = client_cmode.Client(**CONNECTION_INFO) self.client.ssh_client = mock.MagicMock() self.client.connection = mock.MagicMock() self.connection = self.client.connection self.vserver = CONNECTION_INFO['vserver'] self.fake_volume = six.text_type(uuid.uuid4()) self.fake_lun = six.text_type(uuid.uuid4()) self.mock_send_request = self.mock_object( self.client.connection, 'send_request') def _mock_api_error(self, code='fake'): return mock.Mock(side_effect=netapp_api.NaApiError(code=code)) def test_has_records(self): result = self.client._has_records(netapp_api.NaElement( fake_client.QOS_POLICY_GROUP_GET_ITER_RESPONSE)) self.assertTrue(result) def test_has_records_not_found(self): result = self.client._has_records( netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE)) self.assertFalse(result) @ddt.data((fake_client.AGGR_GET_ITER_RESPONSE, 2), (fake_client.NO_RECORDS_RESPONSE, 0)) @ddt.unpack def test_get_record_count(self, response, expected): api_response = netapp_api.NaElement(response) result = self.client._get_record_count(api_response) self.assertEqual(expected, result) def test_get_records_count_invalid(self): api_response = netapp_api.NaElement( fake_client.INVALID_GET_ITER_RESPONSE_NO_RECORDS) self.assertRaises(netapp_utils.NetAppDriverException, self.client._get_record_count, api_response) @ddt.data(True, False) def test_send_iter_request(self, enable_tunneling): api_responses = [ netapp_api.NaElement( fake_client.STORAGE_DISK_GET_ITER_RESPONSE_PAGE_1), netapp_api.NaElement( fake_client.STORAGE_DISK_GET_ITER_RESPONSE_PAGE_2), netapp_api.NaElement( fake_client.STORAGE_DISK_GET_ITER_RESPONSE_PAGE_3), ] mock_send_request = self.mock_object( self.client.connection, 'send_request', side_effect=copy.deepcopy(api_responses)) storage_disk_get_iter_args = { 'desired-attributes': { 'storage-disk-info': { 'disk-name': None, } } } result = self.client.send_iter_request( 'storage-disk-get-iter', api_args=storage_disk_get_iter_args, enable_tunneling=enable_tunneling, max_page_length=10) num_records = result.get_child_content('num-records') self.assertEqual('28', num_records) next_tag = result.get_child_content('next-tag') self.assertEqual('', next_tag) args1 = copy.deepcopy(storage_disk_get_iter_args) args1['max-records'] = 10 args2 = copy.deepcopy(storage_disk_get_iter_args) args2['max-records'] = 10 args2['tag'] = 'next_tag_1' args3 = copy.deepcopy(storage_disk_get_iter_args) args3['max-records'] = 10 args3['tag'] = 'next_tag_2' mock_send_request.assert_has_calls([ mock.call('storage-disk-get-iter', args1, enable_tunneling=enable_tunneling), mock.call('storage-disk-get-iter', args2, enable_tunneling=enable_tunneling), mock.call('storage-disk-get-iter', args3, enable_tunneling=enable_tunneling), ]) def test_send_iter_request_single_page(self): api_response = netapp_api.NaElement( fake_client.STORAGE_DISK_GET_ITER_RESPONSE) mock_send_request = self.mock_object(self.client.connection, 'send_request', return_value=api_response) storage_disk_get_iter_args = { 'desired-attributes': { 'storage-disk-info': { 'disk-name': None, } } } result = self.client.send_iter_request( 'storage-disk-get-iter', api_args=storage_disk_get_iter_args, max_page_length=10) num_records = result.get_child_content('num-records') self.assertEqual('4', num_records) args = copy.deepcopy(storage_disk_get_iter_args) args['max-records'] = 10 mock_send_request.assert_has_calls([ mock.call('storage-disk-get-iter', args, enable_tunneling=True), ]) def test_send_iter_request_not_found(self): api_response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) mock_send_request = self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client.send_iter_request('storage-disk-get-iter') num_records = result.get_child_content('num-records') self.assertEqual('0', num_records) args = {'max-records': client_cmode.DEFAULT_MAX_PAGE_LENGTH} mock_send_request.assert_has_calls([ mock.call('storage-disk-get-iter', args, enable_tunneling=True), ]) @ddt.data(fake_client.INVALID_GET_ITER_RESPONSE_NO_ATTRIBUTES, fake_client.INVALID_GET_ITER_RESPONSE_NO_RECORDS) def test_send_iter_request_invalid(self, fake_response): api_response = netapp_api.NaElement(fake_response) self.mock_object(self.client.connection, 'send_request', return_value=api_response) self.assertRaises(netapp_utils.NetAppDriverException, self.client.send_iter_request, 'storage-disk-get-iter') def test_list_vservers(self): api_response = netapp_api.NaElement( fake_client.VSERVER_DATA_LIST_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.list_vservers() vserver_get_iter_args = { 'query': { 'vserver-info': { 'vserver-type': 'data' } }, 'desired-attributes': { 'vserver-info': { 'vserver-name': None } } } self.client.send_iter_request.assert_has_calls([ mock.call('vserver-get-iter', vserver_get_iter_args, enable_tunneling=False)]) self.assertListEqual([fake_client.VSERVER_NAME], result) def test_list_vservers_node_type(self): api_response = netapp_api.NaElement( fake_client.VSERVER_DATA_LIST_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.list_vservers(vserver_type='node') vserver_get_iter_args = { 'query': { 'vserver-info': { 'vserver-type': 'node' } }, 'desired-attributes': { 'vserver-info': { 'vserver-name': None } } } self.client.send_iter_request.assert_has_calls([ mock.call('vserver-get-iter', vserver_get_iter_args, enable_tunneling=False)]) self.assertListEqual([fake_client.VSERVER_NAME], result) def test_list_vservers_not_found(self): api_response = netapp_api.NaElement( fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client.list_vservers(vserver_type='data') self.assertListEqual([], result) @ddt.data((1, 21), (1, 100), (2, 0)) def test_get_ems_log_destination_vserver(self, ontapi_version): self.mock_object(self.client, 'get_ontapi_version', return_value=ontapi_version) mock_list_vservers = self.mock_object( self.client, 'list_vservers', return_value=[fake_client.ADMIN_VSERVER_NAME]) result = self.client._get_ems_log_destination_vserver() mock_list_vservers.assert_called_once_with(vserver_type='admin') self.assertEqual(fake_client.ADMIN_VSERVER_NAME, result) def test_get_ems_log_destination_vserver_legacy(self): self.mock_object(self.client, 'get_ontapi_version', return_value=(1, 15)) mock_list_vservers = self.mock_object( self.client, 'list_vservers', return_value=[fake_client.NODE_VSERVER_NAME]) result = self.client._get_ems_log_destination_vserver() mock_list_vservers.assert_called_once_with(vserver_type='node') self.assertEqual(fake_client.NODE_VSERVER_NAME, result) def test_get_ems_log_destination_no_cluster_creds(self): self.mock_object(self.client, 'get_ontapi_version', return_value=(1, 21)) mock_list_vservers = self.mock_object( self.client, 'list_vservers', side_effect=[[], [fake_client.VSERVER_NAME]]) result = self.client._get_ems_log_destination_vserver() mock_list_vservers.assert_has_calls([ mock.call(vserver_type='admin'), mock.call(vserver_type='data')]) self.assertEqual(fake_client.VSERVER_NAME, result) def test_get_ems_log_destination_vserver_not_found(self): self.mock_object(self.client, 'get_ontapi_version', return_value=(1, 21)) mock_list_vservers = self.mock_object( self.client, 'list_vservers', return_value=[]) self.assertRaises(exception.NotFound, self.client._get_ems_log_destination_vserver) mock_list_vservers.assert_has_calls([ mock.call(vserver_type='admin'), mock.call(vserver_type='data'), mock.call(vserver_type='node')]) def test_get_iscsi_target_details_no_targets(self): response = netapp_api.NaElement( etree.XML(""" 1 """)) self.connection.invoke_successfully.return_value = response target_list = self.client.get_iscsi_target_details() self.assertEqual([], target_list) def test_get_iscsi_target_details(self): expected_target = { "address": "127.0.0.1", "port": "1337", "interface-enabled": "true", "tpgroup-tag": "7777", } response = netapp_api.NaElement( etree.XML(""" 1 %(address)s %(port)s %(interface-enabled)s %(tpgroup-tag)s """ % expected_target)) self.connection.invoke_successfully.return_value = response target_list = self.client.get_iscsi_target_details() self.assertEqual([expected_target], target_list) def test_get_iscsi_service_details_with_no_iscsi_service(self): response = netapp_api.NaElement( etree.XML(""" 0 """)) self.connection.invoke_successfully.return_value = response iqn = self.client.get_iscsi_service_details() self.assertIsNone(iqn) def test_get_iscsi_service_details(self): expected_iqn = 'iqn.1998-01.org.openstack.iscsi:name1' response = netapp_api.NaElement( etree.XML(""" 1 %s """ % expected_iqn)) self.connection.invoke_successfully.return_value = response iqn = self.client.get_iscsi_service_details() self.assertEqual(expected_iqn, iqn) def test_get_lun_list(self): response = netapp_api.NaElement( etree.XML(""" 2 """)) self.connection.invoke_successfully.return_value = response luns = self.client.get_lun_list() self.assertEqual(2, len(luns)) def test_get_lun_list_with_multiple_pages(self): response = netapp_api.NaElement( etree.XML(""" 2 fake-next """)) response_2 = netapp_api.NaElement( etree.XML(""" 2 """)) self.connection.invoke_successfully.side_effect = [response, response_2] luns = self.client.get_lun_list() self.assertEqual(4, len(luns)) def test_get_lun_map_no_luns_mapped(self): path = '/vol/%s/%s' % (self.fake_volume, self.fake_lun) response = netapp_api.NaElement( etree.XML(""" 0 """)) self.connection.invoke_successfully.return_value = response lun_map = self.client.get_lun_map(path) self.assertEqual([], lun_map) def test_get_lun_map(self): path = '/vol/%s/%s' % (self.fake_volume, self.fake_lun) expected_lun_map = { "initiator-group": "igroup", "lun-id": "1337", "vserver": "vserver", } response = netapp_api.NaElement( etree.XML(""" 1 %(lun-id)s %(initiator-group)s %(vserver)s """ % expected_lun_map)) self.connection.invoke_successfully.return_value = response lun_map = self.client.get_lun_map(path) self.assertEqual([expected_lun_map], lun_map) def test_get_lun_map_multiple_pages(self): path = '/vol/%s/%s' % (self.fake_volume, self.fake_lun) expected_lun_map = { "initiator-group": "igroup", "lun-id": "1337", "vserver": "vserver", } response = netapp_api.NaElement( etree.XML(""" 1 %(lun-id)s %(initiator-group)s %(vserver)s blah """ % expected_lun_map)) response_2 = netapp_api.NaElement( etree.XML(""" 1 %(lun-id)s %(initiator-group)s %(vserver)s """ % expected_lun_map)) self.connection.invoke_successfully.side_effect = [response, response_2] lun_map = self.client.get_lun_map(path) self.assertEqual([expected_lun_map, expected_lun_map], lun_map) def test_get_igroup_by_initiator_none_found(self): initiator = 'initiator' response = netapp_api.NaElement( etree.XML(""" 0 """)) self.connection.invoke_successfully.return_value = response igroup = self.client.get_igroup_by_initiators([initiator]) self.assertEqual([], igroup) def test_get_igroup_by_initiators(self): initiators = ['11:22:33:44:55:66:77:88'] expected_igroup = { 'initiator-group-os-type': 'default', 'initiator-group-type': 'fcp', 'initiator-group-name': 'openstack-igroup1', } response = netapp_api.NaElement( etree.XML(""" true %(initiator-group-name)s default false 0 %(initiator-group-type)s true f8aa707a-57fa-11e4-ad08-123478563412 false 11:22:33:44:55:66:77:88 cinder-iscsi 1 """ % expected_igroup)) self.connection.invoke_successfully.return_value = response igroups = self.client.get_igroup_by_initiators(initiators) # make these lists of dicts comparable using hashable dictionaries igroups = set( [netapp_utils.hashabledict(igroup) for igroup in igroups]) expected = set([netapp_utils.hashabledict(expected_igroup)]) self.assertSetEqual(igroups, expected) def test_get_igroup_by_initiators_multiple(self): initiators = ['11:22:33:44:55:66:77:88', '88:77:66:55:44:33:22:11'] expected_igroup = { 'initiator-group-os-type': 'default', 'initiator-group-type': 'fcp', 'initiator-group-name': 'openstack-igroup1', } response = netapp_api.NaElement( etree.XML(""" true %(initiator-group-name)s default false 0 %(initiator-group-type)s true f8aa707a-57fa-11e4-ad08-123478563412 false 11:22:33:44:55:66:77:88 88:77:66:55:44:33:22:11 cinder-iscsi 1 """ % expected_igroup)) self.connection.invoke_successfully.return_value = response igroups = self.client.get_igroup_by_initiators(initiators) # make these lists of dicts comparable using hashable dictionaries igroups = set( [netapp_utils.hashabledict(igroup) for igroup in igroups]) expected = set([netapp_utils.hashabledict(expected_igroup)]) self.assertSetEqual(igroups, expected) def test_get_igroup_by_initiators_multiple_pages(self): initiator = '11:22:33:44:55:66:77:88' expected_igroup1 = { 'initiator-group-os-type': 'default', 'initiator-group-type': 'fcp', 'initiator-group-name': 'openstack-igroup1', } expected_igroup2 = { 'initiator-group-os-type': 'default', 'initiator-group-type': 'fcp', 'initiator-group-name': 'openstack-igroup2', } response_1 = netapp_api.NaElement( etree.XML(""" true %(initiator-group-name)s default false 0 %(initiator-group-type)s true f8aa707a-57fa-11e4-ad08-123478563412 false 11:22:33:44:55:66:77:88 cinder-iscsi 12345 1 """ % expected_igroup1)) response_2 = netapp_api.NaElement( etree.XML(""" true %(initiator-group-name)s default false 0 %(initiator-group-type)s true f8aa707a-57fa-11e4-ad08-123478563412 false 11:22:33:44:55:66:77:88 cinder-iscsi 1 """ % expected_igroup2)) self.connection.invoke_successfully.side_effect = [response_1, response_2] igroups = self.client.get_igroup_by_initiators([initiator]) # make these lists of dicts comparable using hashable dictionaries igroups = set( [netapp_utils.hashabledict(igroup) for igroup in igroups]) expected = set([netapp_utils.hashabledict(expected_igroup1), netapp_utils.hashabledict(expected_igroup2)]) self.assertSetEqual(igroups, expected) def test_clone_lun(self): self.client.clone_lun( 'volume', 'fakeLUN', 'newFakeLUN', qos_policy_group_name=fake.QOS_POLICY_GROUP_NAME) self.assertEqual(1, self.connection.invoke_successfully.call_count) @ddt.data({'supports_is_backup': True, 'is_snapshot': True}, {'supports_is_backup': True, 'is_snapshot': False}, {'supports_is_backup': False, 'is_snapshot': True}, {'supports_is_backup': False, 'is_snapshot': False}) @ddt.unpack def test_clone_lun_is_snapshot(self, supports_is_backup, is_snapshot): self.client.features.add_feature('BACKUP_CLONE_PARAM', supported=supports_is_backup) self.client.clone_lun( 'volume', 'fakeLUN', 'newFakeLUN', is_snapshot=is_snapshot) clone_create_args = { 'volume': 'volume', 'source-path': 'fakeLUN', 'destination-path': 'newFakeLUN', 'space-reserve': 'true', } if is_snapshot and supports_is_backup: clone_create_args['is-backup'] = 'true' self.connection.invoke_successfully.assert_called_once_with( netapp_api.NaElement.create_node_with_children( 'clone-create', **clone_create_args), True) def test_clone_lun_multiple_zapi_calls(self): """Test for when lun clone requires more than one zapi call.""" # Max clone size per call = 2^18 blocks * 512 bytes/block = 128 MB # Force 2 calls bc = 2 ** 18 * 2 self.client.clone_lun('volume', 'fakeLUN', 'newFakeLUN', block_count=bc) self.assertEqual(2, self.connection.invoke_successfully.call_count) def test_get_lun_by_args(self): response = netapp_api.NaElement( etree.XML(""" 2 """)) self.connection.invoke_successfully.return_value = response lun = self.client.get_lun_by_args() self.assertEqual(1, len(lun)) def test_get_lun_by_args_no_lun_found(self): response = netapp_api.NaElement( etree.XML(""" 2 """)) self.connection.invoke_successfully.return_value = response lun = self.client.get_lun_by_args() self.assertEqual(0, len(lun)) def test_get_lun_by_args_with_args_specified(self): path = '/vol/%s/%s' % (self.fake_volume, self.fake_lun) response = netapp_api.NaElement( etree.XML(""" 2 """)) self.connection.invoke_successfully.return_value = response lun = self.client.get_lun_by_args(path=path) __, _args, __ = self.connection.invoke_successfully.mock_calls[0] actual_request = _args[0] query = actual_request.get_child_by_name('query') lun_info_args = query.get_child_by_name('lun-info').get_children() # Assert request is made with correct arguments self.assertEqual('path', lun_info_args[0].get_name()) self.assertEqual(path, lun_info_args[0].get_content()) self.assertEqual(1, len(lun)) def test_file_assign_qos(self): api_args = { 'volume': fake.FLEXVOL, 'qos-policy-group-name': fake.QOS_POLICY_GROUP_NAME, 'file': fake.NFS_FILE_PATH, 'vserver': self.vserver } self.client.file_assign_qos( fake.FLEXVOL, fake.QOS_POLICY_GROUP_NAME, fake.NFS_FILE_PATH) self.mock_send_request.assert_has_calls([ mock.call('file-assign-qos', api_args, False)]) def test_set_lun_qos_policy_group(self): api_args = { 'path': fake.LUN_PATH, 'qos-policy-group': fake.QOS_POLICY_GROUP_NAME, } self.client.set_lun_qos_policy_group( fake.LUN_PATH, fake.QOS_POLICY_GROUP_NAME) self.mock_send_request.assert_has_calls([ mock.call('lun-set-qos-policy-group', api_args)]) def test_provision_qos_policy_group_no_qos_policy_group_info(self): self.client.provision_qos_policy_group(qos_policy_group_info=None) self.assertEqual(0, self.connection.qos_policy_group_create.call_count) def test_provision_qos_policy_group_legacy_qos_policy_group_info(self): self.client.provision_qos_policy_group( qos_policy_group_info=fake.QOS_POLICY_GROUP_INFO_LEGACY) self.assertEqual(0, self.connection.qos_policy_group_create.call_count) def test_provision_qos_policy_group_with_qos_spec_create(self): self.mock_object(self.client, 'qos_policy_group_exists', return_value=False) self.mock_object(self.client, 'qos_policy_group_create') self.mock_object(self.client, 'qos_policy_group_modify') self.client.provision_qos_policy_group(fake.QOS_POLICY_GROUP_INFO) self.client.qos_policy_group_create.assert_has_calls([ mock.call(fake.QOS_POLICY_GROUP_NAME, fake.MAX_THROUGHPUT)]) self.assertFalse(self.client.qos_policy_group_modify.called) def test_provision_qos_policy_group_with_qos_spec_modify(self): self.mock_object(self.client, 'qos_policy_group_exists', return_value=True) self.mock_object(self.client, 'qos_policy_group_create') self.mock_object(self.client, 'qos_policy_group_modify') self.client.provision_qos_policy_group(fake.QOS_POLICY_GROUP_INFO) self.assertFalse(self.client.qos_policy_group_create.called) self.client.qos_policy_group_modify.assert_has_calls([ mock.call(fake.QOS_POLICY_GROUP_NAME, fake.MAX_THROUGHPUT)]) def test_qos_policy_group_exists(self): self.mock_send_request.return_value = netapp_api.NaElement( fake_client.QOS_POLICY_GROUP_GET_ITER_RESPONSE) result = self.client.qos_policy_group_exists( fake.QOS_POLICY_GROUP_NAME) api_args = { 'query': { 'qos-policy-group-info': { 'policy-group': fake.QOS_POLICY_GROUP_NAME, }, }, 'desired-attributes': { 'qos-policy-group-info': { 'policy-group': None, }, }, } self.mock_send_request.assert_has_calls([ mock.call('qos-policy-group-get-iter', api_args, False)]) self.assertTrue(result) def test_qos_policy_group_exists_not_found(self): self.mock_send_request.return_value = netapp_api.NaElement( fake_client.NO_RECORDS_RESPONSE) result = self.client.qos_policy_group_exists( fake.QOS_POLICY_GROUP_NAME) self.assertFalse(result) def test_qos_policy_group_create(self): api_args = { 'policy-group': fake.QOS_POLICY_GROUP_NAME, 'max-throughput': fake.MAX_THROUGHPUT, 'vserver': self.vserver, } self.client.qos_policy_group_create( fake.QOS_POLICY_GROUP_NAME, fake.MAX_THROUGHPUT) self.mock_send_request.assert_has_calls([ mock.call('qos-policy-group-create', api_args, False)]) def test_qos_policy_group_modify(self): api_args = { 'policy-group': fake.QOS_POLICY_GROUP_NAME, 'max-throughput': fake.MAX_THROUGHPUT, } self.client.qos_policy_group_modify( fake.QOS_POLICY_GROUP_NAME, fake.MAX_THROUGHPUT) self.mock_send_request.assert_has_calls([ mock.call('qos-policy-group-modify', api_args, False)]) def test_qos_policy_group_delete(self): api_args = { 'policy-group': fake.QOS_POLICY_GROUP_NAME } self.client.qos_policy_group_delete( fake.QOS_POLICY_GROUP_NAME) self.mock_send_request.assert_has_calls([ mock.call('qos-policy-group-delete', api_args, False)]) def test_qos_policy_group_rename(self): new_name = 'new-' + fake.QOS_POLICY_GROUP_NAME api_args = { 'policy-group-name': fake.QOS_POLICY_GROUP_NAME, 'new-name': new_name, } self.client.qos_policy_group_rename( fake.QOS_POLICY_GROUP_NAME, new_name) self.mock_send_request.assert_has_calls([ mock.call('qos-policy-group-rename', api_args, False)]) def test_mark_qos_policy_group_for_deletion_no_qos_policy_group_info(self): mock_rename = self.mock_object(self.client, 'qos_policy_group_rename') mock_remove = self.mock_object(self.client, 'remove_unused_qos_policy_groups') self.client.mark_qos_policy_group_for_deletion( qos_policy_group_info=None) self.assertEqual(0, mock_rename.call_count) self.assertEqual(0, mock_remove.call_count) def test_mark_qos_policy_group_for_deletion_legacy_qos_policy(self): mock_rename = self.mock_object(self.client, 'qos_policy_group_rename') mock_remove = self.mock_object(self.client, 'remove_unused_qos_policy_groups') self.client.mark_qos_policy_group_for_deletion( qos_policy_group_info=fake.QOS_POLICY_GROUP_INFO_LEGACY) self.assertEqual(0, mock_rename.call_count) self.assertEqual(1, mock_remove.call_count) def test_mark_qos_policy_group_for_deletion_w_qos_spec(self): mock_rename = self.mock_object(self.client, 'qos_policy_group_rename') mock_remove = self.mock_object(self.client, 'remove_unused_qos_policy_groups') mock_log = self.mock_object(client_cmode.LOG, 'warning') new_name = 'deleted_cinder_%s' % fake.QOS_POLICY_GROUP_NAME self.client.mark_qos_policy_group_for_deletion( qos_policy_group_info=fake.QOS_POLICY_GROUP_INFO) mock_rename.assert_has_calls([ mock.call(fake.QOS_POLICY_GROUP_NAME, new_name)]) self.assertEqual(0, mock_log.call_count) self.assertEqual(1, mock_remove.call_count) def test_mark_qos_policy_group_for_deletion_exception_path(self): mock_rename = self.mock_object(self.client, 'qos_policy_group_rename') mock_rename.side_effect = netapp_api.NaApiError mock_remove = self.mock_object(self.client, 'remove_unused_qos_policy_groups') mock_log = self.mock_object(client_cmode.LOG, 'warning') new_name = 'deleted_cinder_%s' % fake.QOS_POLICY_GROUP_NAME self.client.mark_qos_policy_group_for_deletion( qos_policy_group_info=fake.QOS_POLICY_GROUP_INFO) mock_rename.assert_has_calls([ mock.call(fake.QOS_POLICY_GROUP_NAME, new_name)]) self.assertEqual(1, mock_log.call_count) self.assertEqual(1, mock_remove.call_count) def test_remove_unused_qos_policy_groups(self): mock_log = self.mock_object(client_cmode.LOG, 'debug') api_args = { 'query': { 'qos-policy-group-info': { 'policy-group': 'deleted_cinder_*', 'vserver': self.vserver, } }, 'max-records': 3500, 'continue-on-failure': 'true', 'return-success-list': 'false', 'return-failure-list': 'false', } self.client.remove_unused_qos_policy_groups() self.mock_send_request.assert_has_calls([ mock.call('qos-policy-group-delete-iter', api_args, False)]) self.assertEqual(0, mock_log.call_count) def test_remove_unused_qos_policy_groups_api_error(self): mock_log = self.mock_object(client_cmode.LOG, 'debug') api_args = { 'query': { 'qos-policy-group-info': { 'policy-group': 'deleted_cinder_*', 'vserver': self.vserver, } }, 'max-records': 3500, 'continue-on-failure': 'true', 'return-success-list': 'false', 'return-failure-list': 'false', } self.mock_send_request.side_effect = netapp_api.NaApiError self.client.remove_unused_qos_policy_groups() self.mock_send_request.assert_has_calls([ mock.call('qos-policy-group-delete-iter', api_args, False)]) self.assertEqual(1, mock_log.call_count) @mock.patch('cinder.volume.volume_utils.resolve_hostname', return_value='192.168.1.101') def test_get_if_info_by_ip_not_found(self, mock_resolve_hostname): fake_ip = '192.168.1.101' response = netapp_api.NaElement( etree.XML(""" 0 """)) self.connection.invoke_successfully.return_value = response self.assertRaises(exception.NotFound, self.client.get_if_info_by_ip, fake_ip) @mock.patch('cinder.volume.volume_utils.resolve_hostname', return_value='192.168.1.101') def test_get_if_info_by_ip(self, mock_resolve_hostname): fake_ip = '192.168.1.101' response = netapp_api.NaElement( etree.XML(""" 1 """)) self.connection.invoke_successfully.return_value = response results = self.client.get_if_info_by_ip(fake_ip) self.assertEqual(1, len(results)) def test_get_vol_by_junc_vserver_not_found(self): fake_vserver = 'fake_vserver' fake_junc = 'fake_junction_path' response = netapp_api.NaElement( etree.XML(""" 0 """)) self.connection.invoke_successfully.return_value = response self.assertRaises(exception.NotFound, self.client.get_vol_by_junc_vserver, fake_vserver, fake_junc) def test_get_vol_by_junc_vserver(self): fake_vserver = 'fake_vserver' fake_junc = 'fake_junction_path' expected_flex_vol = 'fake_flex_vol' response = netapp_api.NaElement( etree.XML(""" 1 %(flex_vol)s """ % {'flex_vol': expected_flex_vol})) self.connection.invoke_successfully.return_value = response actual_flex_vol = self.client.get_vol_by_junc_vserver(fake_vserver, fake_junc) self.assertEqual(expected_flex_vol, actual_flex_vol) def test_clone_file(self): expected_flex_vol = "fake_flex_vol" expected_src_path = "fake_src_path" expected_dest_path = "fake_dest_path" self.connection.get_api_version.return_value = (1, 20) self.client.clone_file(expected_flex_vol, expected_src_path, expected_dest_path, self.vserver, source_snapshot=fake.CG_SNAPSHOT_ID) __, _args, __ = self.connection.invoke_successfully.mock_calls[0] actual_request = _args[0] actual_flex_vol = actual_request.get_child_by_name('volume') \ .get_content() actual_src_path = actual_request \ .get_child_by_name('source-path').get_content() actual_dest_path = actual_request.get_child_by_name( 'destination-path').get_content() self.assertEqual(expected_flex_vol, actual_flex_vol) self.assertEqual(expected_src_path, actual_src_path) self.assertEqual(expected_dest_path, actual_dest_path) req_snapshot_child = actual_request.get_child_by_name('snapshot-name') self.assertEqual(fake.CG_SNAPSHOT_ID, req_snapshot_child.get_content()) self.assertIsNone(actual_request.get_child_by_name( 'destination-exists')) def test_clone_file_when_destination_exists(self): expected_flex_vol = "fake_flex_vol" expected_src_path = "fake_src_path" expected_dest_path = "fake_dest_path" self.connection.get_api_version.return_value = (1, 20) self.client.clone_file(expected_flex_vol, expected_src_path, expected_dest_path, self.vserver, dest_exists=True) __, _args, __ = self.connection.invoke_successfully.mock_calls[0] actual_request = _args[0] actual_flex_vol = actual_request.get_child_by_name('volume') \ .get_content() actual_src_path = actual_request \ .get_child_by_name('source-path').get_content() actual_dest_path = actual_request.get_child_by_name( 'destination-path').get_content() self.assertEqual(expected_flex_vol, actual_flex_vol) self.assertEqual(expected_src_path, actual_src_path) self.assertEqual(expected_dest_path, actual_dest_path) self.assertEqual('true', actual_request.get_child_by_name( 'destination-exists').get_content()) def test_clone_file_when_destination_exists_and_version_less_than_1_20( self): expected_flex_vol = "fake_flex_vol" expected_src_path = "fake_src_path" expected_dest_path = "fake_dest_path" self.connection.get_api_version.return_value = (1, 19) self.client.clone_file(expected_flex_vol, expected_src_path, expected_dest_path, self.vserver, dest_exists=True) __, _args, __ = self.connection.invoke_successfully.mock_calls[0] actual_request = _args[0] actual_flex_vol = actual_request.get_child_by_name('volume') \ .get_content() actual_src_path = actual_request \ .get_child_by_name('source-path').get_content() actual_dest_path = actual_request.get_child_by_name( 'destination-path').get_content() self.assertEqual(expected_flex_vol, actual_flex_vol) self.assertEqual(expected_src_path, actual_src_path) self.assertEqual(expected_dest_path, actual_dest_path) self.assertIsNone(actual_request.get_child_by_name( 'destination-exists')) @ddt.data({'supports_is_backup': True, 'is_snapshot': True}, {'supports_is_backup': True, 'is_snapshot': False}, {'supports_is_backup': False, 'is_snapshot': True}, {'supports_is_backup': False, 'is_snapshot': False}) @ddt.unpack def test_clone_file_is_snapshot(self, supports_is_backup, is_snapshot): self.connection.get_api_version.return_value = (1, 20) self.client.features.add_feature('BACKUP_CLONE_PARAM', supported=supports_is_backup) self.client.clone_file( 'volume', 'fake_source', 'fake_destination', 'fake_vserver', is_snapshot=is_snapshot) clone_create_args = { 'volume': 'volume', 'source-path': 'fake_source', 'destination-path': 'fake_destination', } if is_snapshot and supports_is_backup: clone_create_args['is-backup'] = 'true' self.connection.invoke_successfully.assert_called_once_with( netapp_api.NaElement.create_node_with_children( 'clone-create', **clone_create_args), True) def test_get_file_usage(self): expected_bytes = "2048" fake_vserver = 'fake_vserver' fake_path = 'fake_path' response = netapp_api.NaElement( etree.XML(""" %(unique-bytes)s """ % {'unique-bytes': expected_bytes})) self.connection.invoke_successfully.return_value = response actual_bytes = self.client.get_file_usage(fake_vserver, fake_path) self.assertEqual(expected_bytes, actual_bytes) def test_check_cluster_api(self): self.client.features.USER_CAPABILITY_LIST = True mock_check_cluster_api_legacy = self.mock_object( self.client, '_check_cluster_api_legacy') mock_check_cluster_api = self.mock_object( self.client, '_check_cluster_api', return_value=True) result = self.client.check_cluster_api('object', 'operation', 'api') self.assertTrue(result) self.assertFalse(mock_check_cluster_api_legacy.called) mock_check_cluster_api.assert_called_once_with( 'object', 'operation', 'api') def test_check_cluster_api_legacy(self): self.client.features.USER_CAPABILITY_LIST = False mock_check_cluster_api_legacy = self.mock_object( self.client, '_check_cluster_api_legacy', return_value=True) mock_check_cluster_api = self.mock_object( self.client, '_check_cluster_api') result = self.client.check_cluster_api('object', 'operation', 'api') self.assertTrue(result) self.assertFalse(mock_check_cluster_api.called) mock_check_cluster_api_legacy.assert_called_once_with('api') def test__check_cluster_api(self): api_response = netapp_api.NaElement( fake_client.SYSTEM_USER_CAPABILITY_GET_ITER_RESPONSE) self.mock_send_request.return_value = api_response result = self.client._check_cluster_api('object', 'operation', 'api') system_user_capability_get_iter_args = { 'query': { 'capability-info': { 'object-name': 'object', 'operation-list': { 'operation-info': { 'name': 'operation', }, }, }, }, 'desired-attributes': { 'capability-info': { 'operation-list': { 'operation-info': { 'api-name': None, }, }, }, }, } self.mock_send_request.assert_called_once_with( 'system-user-capability-get-iter', system_user_capability_get_iter_args, False) self.assertTrue(result) @ddt.data(fake_client.SYSTEM_USER_CAPABILITY_GET_ITER_RESPONSE, fake_client.NO_RECORDS_RESPONSE) def test__check_cluster_api_not_found(self, response): api_response = netapp_api.NaElement(response) self.mock_send_request.return_value = api_response result = self.client._check_cluster_api('object', 'operation', 'api4') self.assertFalse(result) @ddt.data('volume-get-iter', 'volume-get', 'aggr-options-list-info') def test__check_cluster_api_legacy(self, api): api_response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) self.mock_send_request.return_value = api_response result = self.client._check_cluster_api_legacy(api) self.assertTrue(result) self.mock_send_request.assert_called_once_with(api, enable_tunneling=False) @ddt.data(netapp_api.EAPIPRIVILEGE, netapp_api.EAPINOTFOUND) def test__check_cluster_api_legacy_insufficient_privileges(self, code): self.mock_send_request.side_effect = netapp_api.NaApiError(code=code) result = self.client._check_cluster_api_legacy('volume-get-iter') self.assertFalse(result) self.mock_send_request.assert_called_once_with('volume-get-iter', enable_tunneling=False) def test__check_cluster_api_legacy_api_error(self): self.mock_send_request.side_effect = netapp_api.NaApiError() result = self.client._check_cluster_api_legacy('volume-get-iter') self.assertTrue(result) self.mock_send_request.assert_called_once_with('volume-get-iter', enable_tunneling=False) def test__check_cluster_api_legacy_invalid_api(self): self.assertRaises(ValueError, self.client._check_cluster_api_legacy, 'fake_api') def test_get_operational_lif_addresses(self): expected_result = ['1.2.3.4', '99.98.97.96'] api_response = netapp_api.NaElement( fake_client.GET_OPERATIONAL_LIF_ADDRESSES_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) address_list = self.client.get_operational_lif_addresses() net_interface_get_iter_args = { 'query': { 'net-interface-info': { 'operational-status': 'up' } }, 'desired-attributes': { 'net-interface-info': { 'address': None, } } } self.client.send_iter_request.assert_called_once_with( 'net-interface-get-iter', net_interface_get_iter_args) self.assertEqual(expected_result, address_list) @ddt.data({'flexvol_path': '/fake/vol'}, {'flexvol_name': 'fake_volume'}, {'flexvol_path': '/fake/vol', 'flexvol_name': 'fake_volume'}) def test_get_flexvol_capacity(self, kwargs): api_response = netapp_api.NaElement( fake_client.VOLUME_GET_ITER_CAPACITY_RESPONSE) mock_send_iter_request = self.mock_object( self.client, 'send_iter_request', return_value=api_response) capacity = self.client.get_flexvol_capacity(**kwargs) volume_id_attributes = {} if 'flexvol_path' in kwargs: volume_id_attributes['junction-path'] = kwargs['flexvol_path'] if 'flexvol_name' in kwargs: volume_id_attributes['name'] = kwargs['flexvol_name'] volume_get_iter_args = { 'query': { 'volume-attributes': { 'volume-id-attributes': volume_id_attributes, } }, 'desired-attributes': { 'volume-attributes': { 'volume-space-attributes': { 'size-available': None, 'size-total': None, } } }, } mock_send_iter_request.assert_called_once_with( 'volume-get-iter', volume_get_iter_args) self.assertEqual(fake_client.VOLUME_SIZE_TOTAL, capacity['size-total']) self.assertEqual(fake_client.VOLUME_SIZE_AVAILABLE, capacity['size-available']) def test_get_flexvol_capacity_not_found(self): self.mock_send_request.return_value = netapp_api.NaElement( fake_client.NO_RECORDS_RESPONSE) self.assertRaises(netapp_utils.NetAppDriverException, self.client.get_flexvol_capacity, flexvol_path='fake_path') def test_list_flexvols(self): api_response = netapp_api.NaElement( fake_client.VOLUME_GET_ITER_LIST_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.list_flexvols() volume_get_iter_args = { 'query': { 'volume-attributes': { 'volume-id-attributes': { 'type': 'rw', 'style': 'flex', }, 'volume-state-attributes': { 'is-vserver-root': 'false', 'is-inconsistent': 'false', 'is-invalid': 'false', 'state': 'online', }, }, }, 'desired-attributes': { 'volume-attributes': { 'volume-id-attributes': { 'name': None, }, }, }, } self.client.send_iter_request.assert_called_once_with( 'volume-get-iter', volume_get_iter_args) self.assertEqual(list(fake_client.VOLUME_NAMES), result) def test_list_flexvols_not_found(self): api_response = netapp_api.NaElement( fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.list_flexvols() self.assertEqual([], result) def test_get_flexvol(self): api_response = netapp_api.NaElement( fake_client.VOLUME_GET_ITER_SSC_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.get_flexvol( flexvol_name=fake_client.VOLUME_NAMES[0], flexvol_path='/%s' % fake_client.VOLUME_NAMES[0]) volume_get_iter_args = { 'query': { 'volume-attributes': { 'volume-id-attributes': { 'name': fake_client.VOLUME_NAMES[0], 'junction-path': '/' + fake_client.VOLUME_NAMES[0], 'type': 'rw', 'style': 'flex', }, 'volume-state-attributes': { 'is-vserver-root': 'false', 'is-inconsistent': 'false', 'is-invalid': 'false', 'state': 'online', }, }, }, 'desired-attributes': { 'volume-attributes': { 'volume-id-attributes': { 'name': None, 'owning-vserver-name': None, 'junction-path': None, 'type': None, 'containing-aggregate-name': None, }, 'volume-mirror-attributes': { 'is-data-protection-mirror': None, 'is-replica-volume': None, }, 'volume-space-attributes': { 'is-space-guarantee-enabled': None, 'space-guarantee': None, 'percentage-snapshot-reserve': None, 'size': None, }, 'volume-qos-attributes': { 'policy-group-name': None, }, 'volume-snapshot-attributes': { 'snapshot-policy': None, }, 'volume-language-attributes': { 'language-code': None, } }, }, } self.client.send_iter_request.assert_called_once_with( 'volume-get-iter', volume_get_iter_args) self.assertEqual(fake_client.VOLUME_INFO_SSC, result) def test_get_flexvol_not_found(self): api_response = netapp_api.NaElement( fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) self.assertRaises(exception.VolumeBackendAPIException, self.client.get_flexvol, flexvol_name=fake_client.VOLUME_NAMES[0]) def test_create_flexvol(self): self.mock_object(self.client.connection, 'send_request') self.client.create_flexvol( fake_client.VOLUME_NAME, fake_client.VOLUME_AGGREGATE_NAME, 100) volume_create_args = { 'containing-aggr-name': fake_client.VOLUME_AGGREGATE_NAME, 'size': '100g', 'volume': fake_client.VOLUME_NAME, 'volume-type': 'rw', 'junction-path': '/%s' % fake_client.VOLUME_NAME, } self.client.connection.send_request.assert_called_once_with( 'volume-create', volume_create_args) @ddt.data('dp', 'rw', None) def test_create_volume_with_extra_specs(self, volume_type): self.mock_object(self.client, 'enable_flexvol_dedupe') self.mock_object(self.client, 'enable_flexvol_compression') self.mock_object(self.client.connection, 'send_request') self.client.create_flexvol( fake_client.VOLUME_NAME, fake_client.VOLUME_AGGREGATE_NAME, 100, space_guarantee_type='volume', language='en-US', snapshot_policy='default', dedupe_enabled=True, compression_enabled=True, snapshot_reserve=15, volume_type=volume_type) volume_create_args = { 'containing-aggr-name': fake_client.VOLUME_AGGREGATE_NAME, 'size': '100g', 'volume': fake_client.VOLUME_NAME, 'space-reserve': 'volume', 'language-code': 'en-US', 'volume-type': volume_type, 'percentage-snapshot-reserve': '15', } if volume_type != 'dp': volume_create_args['snapshot-policy'] = 'default' volume_create_args['junction-path'] = ('/%s' % fake_client.VOLUME_NAME) self.client.connection.send_request.assert_called_with( 'volume-create', volume_create_args) self.client.enable_flexvol_dedupe.assert_called_once_with( fake_client.VOLUME_NAME) self.client.enable_flexvol_compression.assert_called_once_with( fake_client.VOLUME_NAME) def test_flexvol_exists(self): api_response = netapp_api.NaElement( fake_client.VOLUME_GET_NAME_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.flexvol_exists(fake_client.VOLUME_NAME) volume_get_iter_args = { 'query': { 'volume-attributes': { 'volume-id-attributes': { 'name': fake_client.VOLUME_NAME } } }, 'desired-attributes': { 'volume-attributes': { 'volume-id-attributes': { 'name': None } } } } self.client.send_iter_request.assert_has_calls([ mock.call('volume-get-iter', volume_get_iter_args)]) self.assertTrue(result) def test_flexvol_exists_not_found(self): api_response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) self.assertFalse(self.client.flexvol_exists(fake_client.VOLUME_NAME)) def test_rename_flexvol(self): self.mock_object(self.client.connection, 'send_request') self.client.rename_flexvol(fake_client.VOLUME_NAME, 'new_name') volume_rename_api_args = { 'volume': fake_client.VOLUME_NAME, 'new-volume-name': 'new_name', } self.client.connection.send_request.assert_called_once_with( 'volume-rename', volume_rename_api_args) def test_mount_flexvol_default_junction_path(self): self.mock_object(self.client.connection, 'send_request') self.client.mount_flexvol(fake_client.VOLUME_NAME) volume_mount_args = { 'volume-name': fake_client.VOLUME_NAME, 'junction-path': '/%s' % fake_client.VOLUME_NAME, } self.client.connection.send_request.assert_has_calls([ mock.call('volume-mount', volume_mount_args)]) def test_mount_flexvol(self): self.mock_object(self.client.connection, 'send_request') fake_path = '/fake_path' self.client.mount_flexvol(fake_client.VOLUME_NAME, junction_path=fake_path) volume_mount_args = { 'volume-name': fake_client.VOLUME_NAME, 'junction-path': fake_path, } self.client.connection.send_request.assert_has_calls([ mock.call('volume-mount', volume_mount_args)]) def test_enable_flexvol_dedupe(self): self.mock_object(self.client.connection, 'send_request') self.client.enable_flexvol_dedupe(fake_client.VOLUME_NAME) sis_enable_args = {'path': '/vol/%s' % fake_client.VOLUME_NAME} self.client.connection.send_request.assert_called_once_with( 'sis-enable', sis_enable_args) def test_disable_flexvol_dedupe(self): self.mock_object(self.client.connection, 'send_request') self.client.disable_flexvol_dedupe(fake_client.VOLUME_NAME) sis_disable_args = {'path': '/vol/%s' % fake_client.VOLUME_NAME} self.client.connection.send_request.assert_called_once_with( 'sis-disable', sis_disable_args) def test_enable_flexvol_compression(self): self.mock_object(self.client.connection, 'send_request') self.client.enable_flexvol_compression(fake_client.VOLUME_NAME) sis_set_config_args = { 'path': '/vol/%s' % fake_client.VOLUME_NAME, 'enable-compression': 'true' } self.client.connection.send_request.assert_called_once_with( 'sis-set-config', sis_set_config_args) def test_disable_flexvol_compression(self): self.mock_object(self.client.connection, 'send_request') self.client.disable_flexvol_compression(fake_client.VOLUME_NAME) sis_set_config_args = { 'path': '/vol/%s' % fake_client.VOLUME_NAME, 'enable-compression': 'false' } self.client.connection.send_request.assert_called_once_with( 'sis-set-config', sis_set_config_args) def test_get_flexvol_dedupe_info(self): api_response = netapp_api.NaElement( fake_client.SIS_GET_ITER_SSC_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.get_flexvol_dedupe_info( fake_client.VOLUME_NAMES[0]) sis_get_iter_args = { 'query': { 'sis-status-info': { 'path': '/vol/%s' % fake_client.VOLUME_NAMES[0], }, }, 'desired-attributes': { 'sis-status-info': { 'state': None, 'is-compression-enabled': None, 'logical-data-size': None, 'logical-data-limit': None, }, }, } self.client.send_iter_request.assert_called_once_with( 'sis-get-iter', sis_get_iter_args) self.assertEqual(fake_client.VOLUME_DEDUPE_INFO_SSC, result) def test_get_flexvol_dedupe_info_no_logical_data_values(self): api_response = netapp_api.NaElement( fake_client.SIS_GET_ITER_SSC_NO_LOGICAL_DATA_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.get_flexvol_dedupe_info( fake_client.VOLUME_NAMES[0]) self.assertEqual(fake_client.VOLUME_DEDUPE_INFO_SSC_NO_LOGICAL_DATA, result) def test_get_flexvol_dedupe_info_not_found(self): api_response = netapp_api.NaElement( fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.get_flexvol_dedupe_info( fake_client.VOLUME_NAMES[0]) self.assertEqual(fake_client.VOLUME_DEDUPE_INFO_SSC_NO_LOGICAL_DATA, result) def test_get_flexvol_dedupe_info_api_error(self): self.mock_object(self.client, 'send_iter_request', side_effect=self._mock_api_error()) result = self.client.get_flexvol_dedupe_info( fake_client.VOLUME_NAMES[0]) self.assertEqual(fake_client.VOLUME_DEDUPE_INFO_SSC_NO_LOGICAL_DATA, result) def test_get_flexvol_dedupe_info_api_insufficient_privileges(self): api_error = netapp_api.NaApiError(code=netapp_api.EAPIPRIVILEGE) self.mock_object(self.client, 'send_iter_request', side_effect=api_error) result = self.client.get_flexvol_dedupe_info( fake_client.VOLUME_NAMES[0]) self.assertEqual(fake_client.VOLUME_DEDUPE_INFO_SSC_NO_LOGICAL_DATA, result) def test_get_flexvol_dedupe_used_percent(self): self.client.features.add_feature('CLONE_SPLIT_STATUS') mock_get_flexvol_dedupe_info = self.mock_object( self.client, 'get_flexvol_dedupe_info', return_value=fake_client.VOLUME_DEDUPE_INFO_SSC) mock_get_clone_split_info = self.mock_object( self.client, 'get_clone_split_info', return_value=fake_client.VOLUME_CLONE_SPLIT_STATUS) result = self.client.get_flexvol_dedupe_used_percent( fake_client.VOLUME_NAMES[0]) self.assertEqual(75.0, result) mock_get_flexvol_dedupe_info.assert_called_once_with( fake_client.VOLUME_NAMES[0]) mock_get_clone_split_info.assert_called_once_with( fake_client.VOLUME_NAMES[0]) def test_get_flexvol_dedupe_used_percent_not_supported(self): self.client.features.add_feature('CLONE_SPLIT_STATUS', supported=False) mock_get_flexvol_dedupe_info = self.mock_object( self.client, 'get_flexvol_dedupe_info', return_value=fake_client.VOLUME_DEDUPE_INFO_SSC) mock_get_clone_split_info = self.mock_object( self.client, 'get_clone_split_info', return_value=fake_client.VOLUME_CLONE_SPLIT_STATUS) result = self.client.get_flexvol_dedupe_used_percent( fake_client.VOLUME_NAMES[0]) self.assertEqual(0.0, result) self.assertFalse(mock_get_flexvol_dedupe_info.called) self.assertFalse(mock_get_clone_split_info.called) def test_get_clone_split_info(self): api_response = netapp_api.NaElement( fake_client.CLONE_SPLIT_STATUS_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client.get_clone_split_info(fake_client.VOLUME_NAMES[0]) self.assertEqual(fake_client.VOLUME_CLONE_SPLIT_STATUS, result) self.client.connection.send_request.assert_called_once_with( 'clone-split-status', {'volume-name': fake_client.VOLUME_NAMES[0]}) def test_get_clone_split_info_api_error(self): self.mock_object(self.client.connection, 'send_request', side_effect=self._mock_api_error()) result = self.client.get_clone_split_info(fake_client.VOLUME_NAMES[0]) expected = {'unsplit-size': 0, 'unsplit-clone-count': 0} self.assertEqual(expected, result) def test_get_clone_split_info_no_data(self): api_response = netapp_api.NaElement( fake_client.CLONE_SPLIT_STATUS_NO_DATA_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client.get_clone_split_info(fake_client.VOLUME_NAMES[0]) expected = {'unsplit-size': 0, 'unsplit-clone-count': 0} self.assertEqual(expected, result) def test_is_flexvol_mirrored(self): api_response = netapp_api.NaElement( fake_client.SNAPMIRROR_GET_ITER_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.is_flexvol_mirrored( fake_client.VOLUME_NAMES[0], fake_client.VOLUME_VSERVER_NAME) snapmirror_get_iter_args = { 'query': { 'snapmirror-info': { 'source-vserver': fake_client.VOLUME_VSERVER_NAME, 'source-volume': fake_client.VOLUME_NAMES[0], 'mirror-state': 'snapmirrored', 'relationship-type': 'data_protection', }, }, 'desired-attributes': { 'snapmirror-info': None, }, } self.client.send_iter_request.assert_called_once_with( 'snapmirror-get-iter', snapmirror_get_iter_args) self.assertTrue(result) def test_is_flexvol_mirrored_not_mirrored(self): api_response = netapp_api.NaElement( fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client.is_flexvol_mirrored( fake_client.VOLUME_NAMES[0], fake_client.VOLUME_VSERVER_NAME) self.assertFalse(result) def test_is_flexvol_mirrored_api_error(self): self.mock_object(self.client.connection, 'send_request', side_effect=self._mock_api_error()) result = self.client.is_flexvol_mirrored( fake_client.VOLUME_NAMES[0], fake_client.VOLUME_VSERVER_NAME) self.assertFalse(result) def test_is_flexvol_encrypted(self): api_response = netapp_api.NaElement( fake_client.VOLUME_GET_ITER_ENCRYPTION_SSC_RESPONSE) self.client.features.add_feature('FLEXVOL_ENCRYPTION') self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.is_flexvol_encrypted( fake_client.VOLUME_NAMES[0], fake_client.VOLUME_VSERVER_NAME) volume_get_iter_args = { 'query': { 'volume-attributes': { 'encrypt': 'true', 'volume-id-attributes': { 'name': fake_client.VOLUME_NAME, 'owning-vserver-name': fake_client.VOLUME_VSERVER_NAME, } } }, 'desired-attributes': { 'volume-attributes': { 'encrypt': None, } } } self.client.send_iter_request.assert_called_once_with( 'volume-get-iter', volume_get_iter_args) self.assertTrue(result) def test_is_flexvol_encrypted_unsupported_version(self): self.client.features.add_feature('FLEXVOL_ENCRYPTION', supported=False) result = self.client.is_flexvol_encrypted( fake_client.VOLUME_NAMES[0], fake_client.VOLUME_VSERVER_NAME) self.assertFalse(result) def test_is_flexvol_encrypted_no_records_found(self): api_response = netapp_api.NaElement( fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client.is_flexvol_encrypted( fake_client.VOLUME_NAMES[0], fake_client.VOLUME_VSERVER_NAME) self.assertFalse(result) def test_is_flexvol_encrypted_api_error(self): self.mock_object(self.client.connection, 'send_request', side_effect=self._mock_api_error()) result = self.client.is_flexvol_encrypted( fake_client.VOLUME_NAMES[0], fake_client.VOLUME_VSERVER_NAME) self.assertFalse(result) def test_get_aggregates(self): api_response = netapp_api.NaElement( fake_client.AGGR_GET_ITER_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client._get_aggregates() self.client.connection.send_request.assert_has_calls([ mock.call('aggr-get-iter', {}, enable_tunneling=False)]) self.assertListEqual( [aggr.to_string() for aggr in api_response.get_child_by_name( 'attributes-list').get_children()], [aggr.to_string() for aggr in result]) def test_get_aggregates_with_filters(self): api_response = netapp_api.NaElement( fake_client.AGGR_GET_SPACE_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) desired_attributes = { 'aggr-attributes': { 'aggregate-name': None, 'aggr-space-attributes': { 'size-total': None, 'size-available': None, } } } result = self.client._get_aggregates( aggregate_names=fake_client.VOLUME_AGGREGATE_NAMES, desired_attributes=desired_attributes) aggr_get_iter_args = { 'query': { 'aggr-attributes': { 'aggregate-name': '|'.join( fake_client.VOLUME_AGGREGATE_NAMES), } }, 'desired-attributes': desired_attributes } self.client.connection.send_request.assert_has_calls([ mock.call('aggr-get-iter', aggr_get_iter_args, enable_tunneling=False)]) self.assertListEqual( [aggr.to_string() for aggr in api_response.get_child_by_name( 'attributes-list').get_children()], [aggr.to_string() for aggr in result]) def test_get_aggregates_not_found(self): api_response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client._get_aggregates() self.client.connection.send_request.assert_has_calls([ mock.call('aggr-get-iter', {}, enable_tunneling=False)]) self.assertListEqual([], result) def test_get_node_for_aggregate(self): api_response = netapp_api.NaElement( fake_client.AGGR_GET_NODE_RESPONSE).get_child_by_name( 'attributes-list').get_children() self.mock_object(self.client, '_get_aggregates', return_value=api_response) result = self.client.get_node_for_aggregate( fake_client.VOLUME_AGGREGATE_NAME) desired_attributes = { 'aggr-attributes': { 'aggregate-name': None, 'aggr-ownership-attributes': { 'home-name': None, }, }, } self.client._get_aggregates.assert_has_calls([ mock.call( aggregate_names=[fake_client.VOLUME_AGGREGATE_NAME], desired_attributes=desired_attributes)]) self.assertEqual(fake_client.NODE_NAME, result) def test_get_node_for_aggregate_none_requested(self): result = self.client.get_node_for_aggregate(None) self.assertIsNone(result) def test_get_node_for_aggregate_api_not_found(self): api_error = self._mock_api_error(netapp_api.EAPINOTFOUND) self.mock_object(self.client.connection, 'send_request', side_effect=api_error) result = self.client.get_node_for_aggregate( fake_client.VOLUME_AGGREGATE_NAME) self.assertIsNone(result) def test_get_node_for_aggregate_api_error(self): self.mock_object(self.client.connection, 'send_request', self._mock_api_error()) self.assertRaises(netapp_api.NaApiError, self.client.get_node_for_aggregate, fake_client.VOLUME_AGGREGATE_NAME) def test_get_node_for_aggregate_not_found(self): api_response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client.get_node_for_aggregate( fake_client.VOLUME_AGGREGATE_NAME) self.assertIsNone(result) def test_get_aggregate_none_specified(self): result = self.client.get_aggregate('') self.assertEqual({}, result) def test_get_aggregate(self): api_response = netapp_api.NaElement( fake_client.AGGR_GET_ITER_SSC_RESPONSE).get_child_by_name( 'attributes-list').get_children() self.mock_object(self.client, '_get_aggregates', return_value=api_response) result = self.client.get_aggregate(fake_client.VOLUME_AGGREGATE_NAME) desired_attributes = { 'aggr-attributes': { 'aggregate-name': None, 'aggr-raid-attributes': { 'raid-type': None, 'is-hybrid': None, }, }, } self.client._get_aggregates.assert_has_calls([ mock.call( aggregate_names=[fake_client.VOLUME_AGGREGATE_NAME], desired_attributes=desired_attributes)]) expected = { 'name': fake_client.VOLUME_AGGREGATE_NAME, 'raid-type': 'raid_dp', 'is-hybrid': True, } self.assertEqual(expected, result) def test_get_aggregate_not_found(self): api_response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client.get_aggregate(fake_client.VOLUME_AGGREGATE_NAME) self.assertEqual({}, result) def test_get_aggregate_api_error(self): self.mock_object(self.client.connection, 'send_request', side_effect=self._mock_api_error()) result = self.client.get_aggregate(fake_client.VOLUME_AGGREGATE_NAME) self.assertEqual({}, result) def test_get_aggregate_api_not_found(self): api_error = netapp_api.NaApiError(code=netapp_api.EAPINOTFOUND) self.mock_object(self.client.connection, 'send_iter_request', side_effect=api_error) result = self.client.get_aggregate(fake_client.VOLUME_AGGREGATE_NAME) self.assertEqual({}, result) def test_list_cluster_nodes(self): api_response = netapp_api.NaElement( fake_client.SYSTEM_NODE_GET_ITER_RESPONSE) self.mock_object(self.client.connection, 'send_request', mock.Mock(return_value=api_response)) result = self.client.list_cluster_nodes() self.assertListEqual([fake_client.NODE_NAME], result) def test_list_cluster_nodes_not_found(self): api_response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client.connection, 'send_request', mock.Mock(return_value=api_response)) result = self.client.list_cluster_nodes() self.assertListEqual([], result) @ddt.data({'types': {'FCAL'}, 'expected': ['FCAL']}, {'types': {'SATA', 'SSD'}, 'expected': ['SATA', 'SSD']},) @ddt.unpack def test_get_aggregate_disk_types(self, types, expected): mock_get_aggregate_disk_types = self.mock_object( self.client, '_get_aggregate_disk_types', return_value=types) result = self.client.get_aggregate_disk_types( fake_client.VOLUME_AGGREGATE_NAME) self.assertItemsEqual(expected, result) mock_get_aggregate_disk_types.assert_called_once_with( fake_client.VOLUME_AGGREGATE_NAME) def test_get_aggregate_disk_types_not_found(self): mock_get_aggregate_disk_types = self.mock_object( self.client, '_get_aggregate_disk_types', return_value=set()) result = self.client.get_aggregate_disk_types( fake_client.VOLUME_AGGREGATE_NAME) self.assertIsNone(result) mock_get_aggregate_disk_types.assert_called_once_with( fake_client.VOLUME_AGGREGATE_NAME) def test_get_aggregate_disk_types_api_not_found(self): api_error = netapp_api.NaApiError(code=netapp_api.EAPINOTFOUND) self.mock_object(self.client, 'send_iter_request', side_effect=api_error) result = self.client.get_aggregate_disk_types( fake_client.VOLUME_AGGREGATE_NAME) self.assertIsNone(result) def test_get_aggregate_disk_types_shared(self): self.client.features.add_feature('ADVANCED_DISK_PARTITIONING') mock_get_aggregate_disk_types = self.mock_object( self.client, '_get_aggregate_disk_types', side_effect=[set(['SSD']), set(['SATA'])]) result = self.client.get_aggregate_disk_types( fake_client.VOLUME_AGGREGATE_NAME) self.assertIsInstance(result, list) self.assertItemsEqual(['SATA', 'SSD'], result) mock_get_aggregate_disk_types.assert_has_calls([ mock.call(fake_client.VOLUME_AGGREGATE_NAME), mock.call(fake_client.VOLUME_AGGREGATE_NAME, shared=True), ]) def test__get_aggregate_disk_types(self): api_response = netapp_api.NaElement( fake_client.STORAGE_DISK_GET_ITER_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client._get_aggregate_disk_types( fake_client.VOLUME_AGGREGATE_NAME) storage_disk_get_iter_args = { 'query': { 'storage-disk-info': { 'disk-raid-info': { 'disk-aggregate-info': { 'aggregate-name': fake_client.VOLUME_AGGREGATE_NAME, }, }, }, }, 'desired-attributes': { 'storage-disk-info': { 'disk-raid-info': { 'effective-disk-type': None, }, }, }, } self.client.send_iter_request.assert_called_once_with( 'storage-disk-get-iter', storage_disk_get_iter_args, enable_tunneling=False) expected = set(fake_client.AGGREGATE_DISK_TYPES) self.assertEqual(expected, result) def test__get_aggregate_disk_types_shared(self): api_response = netapp_api.NaElement( fake_client.STORAGE_DISK_GET_ITER_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client._get_aggregate_disk_types( fake_client.VOLUME_AGGREGATE_NAME, shared=True) storage_disk_get_iter_args = { 'query': { 'storage-disk-info': { 'disk-raid-info': { 'disk-shared-info': { 'aggregate-list': { 'shared-aggregate-info': { 'aggregate-name': fake_client.VOLUME_AGGREGATE_NAME, }, }, }, }, }, }, 'desired-attributes': { 'storage-disk-info': { 'disk-raid-info': { 'effective-disk-type': None, }, }, }, } self.client.send_iter_request.assert_called_once_with( 'storage-disk-get-iter', storage_disk_get_iter_args, enable_tunneling=False) expected = set(fake_client.AGGREGATE_DISK_TYPES) self.assertEqual(expected, result) def test__get_aggregate_disk_types_not_found(self): api_response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client._get_aggregate_disk_types( fake_client.VOLUME_AGGREGATE_NAME) self.assertEqual(set(), result) def test__get_aggregate_disk_types_api_error(self): self.mock_object(self.client, 'send_iter_request', side_effect=self._mock_api_error()) result = self.client._get_aggregate_disk_types( fake_client.VOLUME_AGGREGATE_NAME) self.assertEqual(set([]), result) def test_get_aggregate_capacities(self): aggr1_capacities = { 'percent-used': 50, 'size-available': 100.0, 'size-total': 200.0, } aggr2_capacities = { 'percent-used': 75, 'size-available': 125.0, 'size-total': 500.0, } mock_get_aggregate_capacity = self.mock_object( self.client, 'get_aggregate_capacity', side_effect=[aggr1_capacities, aggr2_capacities]) result = self.client.get_aggregate_capacities(['aggr1', 'aggr2']) expected = { 'aggr1': aggr1_capacities, 'aggr2': aggr2_capacities, } self.assertEqual(expected, result) mock_get_aggregate_capacity.assert_has_calls([ mock.call('aggr1'), mock.call('aggr2'), ]) def test_get_aggregate_capacities_not_found(self): mock_get_aggregate_capacity = self.mock_object( self.client, 'get_aggregate_capacity', side_effect=[{}, {}]) result = self.client.get_aggregate_capacities(['aggr1', 'aggr2']) expected = { 'aggr1': {}, 'aggr2': {}, } self.assertEqual(expected, result) mock_get_aggregate_capacity.assert_has_calls([ mock.call('aggr1'), mock.call('aggr2'), ]) def test_get_aggregate_capacities_not_list(self): result = self.client.get_aggregate_capacities('aggr1') self.assertEqual({}, result) def test_get_aggregate_capacity(self): api_response = netapp_api.NaElement( fake_client.AGGR_GET_ITER_CAPACITY_RESPONSE).get_child_by_name( 'attributes-list').get_children() self.mock_object(self.client, '_get_aggregates', return_value=api_response) result = self.client.get_aggregate_capacity( fake_client.VOLUME_AGGREGATE_NAME) desired_attributes = { 'aggr-attributes': { 'aggr-space-attributes': { 'percent-used-capacity': None, 'size-available': None, 'size-total': None, }, }, } self.client._get_aggregates.assert_has_calls([ mock.call( aggregate_names=[fake_client.VOLUME_AGGREGATE_NAME], desired_attributes=desired_attributes)]) expected = { 'percent-used': float(fake_client.AGGR_USED_PERCENT), 'size-available': float(fake_client.AGGR_SIZE_AVAILABLE), 'size-total': float(fake_client.AGGR_SIZE_TOTAL), } self.assertEqual(expected, result) def test_get_aggregate_capacity_not_found(self): api_response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client.get_aggregate_capacity( fake_client.VOLUME_AGGREGATE_NAME) self.assertEqual({}, result) def test_get_aggregate_capacity_api_error(self): self.mock_object(self.client.connection, 'send_request', side_effect=self._mock_api_error()) result = self.client.get_aggregate_capacity( fake_client.VOLUME_AGGREGATE_NAME) self.assertEqual({}, result) def test_get_aggregate_capacity_api_not_found(self): api_error = netapp_api.NaApiError(code=netapp_api.EAPINOTFOUND) self.mock_object( self.client.connection, 'send_request', side_effect=api_error) result = self.client.get_aggregate_capacity( fake_client.VOLUME_AGGREGATE_NAME) self.assertEqual({}, result) def test_get_performance_instance_uuids(self): self.mock_send_request.return_value = netapp_api.NaElement( fake_client.PERF_OBJECT_INSTANCE_LIST_INFO_ITER_RESPONSE) result = self.client.get_performance_instance_uuids( 'system', fake_client.NODE_NAME) expected = [fake_client.NODE_NAME + ':kernel:system'] self.assertEqual(expected, result) perf_object_instance_list_info_iter_args = { 'objectname': 'system', 'query': { 'instance-info': { 'uuid': fake_client.NODE_NAME + ':*', } } } self.mock_send_request.assert_called_once_with( 'perf-object-instance-list-info-iter', perf_object_instance_list_info_iter_args, enable_tunneling=False) def test_get_performance_counters(self): self.mock_send_request.return_value = netapp_api.NaElement( fake_client.PERF_OBJECT_GET_INSTANCES_SYSTEM_RESPONSE_CMODE) instance_uuids = [ fake_client.NODE_NAMES[0] + ':kernel:system', fake_client.NODE_NAMES[1] + ':kernel:system', ] counter_names = ['avg_processor_busy'] result = self.client.get_performance_counters('system', instance_uuids, counter_names) expected = [ { 'avg_processor_busy': '5674745133134', 'instance-name': 'system', 'instance-uuid': instance_uuids[0], 'node-name': fake_client.NODE_NAMES[0], 'timestamp': '1453412013', }, { 'avg_processor_busy': '4077649009234', 'instance-name': 'system', 'instance-uuid': instance_uuids[1], 'node-name': fake_client.NODE_NAMES[1], 'timestamp': '1453412013' }, ] self.assertEqual(expected, result) perf_object_get_instances_args = { 'objectname': 'system', 'instance-uuids': [ {'instance-uuid': instance_uuid} for instance_uuid in instance_uuids ], 'counters': [ {'counter': counter} for counter in counter_names ], } self.mock_send_request.assert_called_once_with( 'perf-object-get-instances', perf_object_get_instances_args, enable_tunneling=False) def test_check_iscsi_initiator_exists_when_no_initiator_exists(self): self.connection.invoke_successfully = mock.Mock( side_effect=netapp_api.NaApiError) initiator = fake_client.INITIATOR_IQN initiator_exists = self.client.check_iscsi_initiator_exists(initiator) self.assertFalse(initiator_exists) def test_check_iscsi_initiator_exists_when_initiator_exists(self): self.connection.invoke_successfully = mock.Mock() initiator = fake_client.INITIATOR_IQN initiator_exists = self.client.check_iscsi_initiator_exists(initiator) self.assertTrue(initiator_exists) def test_set_iscsi_chap_authentication_no_previous_initiator(self): self.connection.invoke_successfully = mock.Mock() self.mock_object(self.client, 'check_iscsi_initiator_exists', return_value=False) ssh = mock.Mock(paramiko.SSHClient) sshpool = mock.Mock(ssh_utils.SSHPool) self.client.ssh_client.ssh_pool = sshpool self.mock_object(self.client.ssh_client, 'execute_command_with_prompt') sshpool.item().__enter__ = mock.Mock(return_value=ssh) sshpool.item().__exit__ = mock.Mock(return_value=False) self.client.set_iscsi_chap_authentication(fake_client.INITIATOR_IQN, fake_client.USER_NAME, fake_client.PASSWORD) command = ('iscsi security create -vserver fake_vserver ' '-initiator-name iqn.2015-06.com.netapp:fake_iqn ' '-auth-type CHAP -user-name fake_user') self.client.ssh_client.execute_command_with_prompt.assert_has_calls( [mock.call(ssh, command, 'Password:', fake_client.PASSWORD)] ) def test_set_iscsi_chap_authentication_with_preexisting_initiator(self): self.connection.invoke_successfully = mock.Mock() self.mock_object(self.client, 'check_iscsi_initiator_exists', return_value=True) ssh = mock.Mock(paramiko.SSHClient) sshpool = mock.Mock(ssh_utils.SSHPool) self.client.ssh_client.ssh_pool = sshpool self.mock_object(self.client.ssh_client, 'execute_command_with_prompt') sshpool.item().__enter__ = mock.Mock(return_value=ssh) sshpool.item().__exit__ = mock.Mock(return_value=False) self.client.set_iscsi_chap_authentication(fake_client.INITIATOR_IQN, fake_client.USER_NAME, fake_client.PASSWORD) command = ('iscsi security modify -vserver fake_vserver ' '-initiator-name iqn.2015-06.com.netapp:fake_iqn ' '-auth-type CHAP -user-name fake_user') self.client.ssh_client.execute_command_with_prompt.assert_has_calls( [mock.call(ssh, command, 'Password:', fake_client.PASSWORD)] ) def test_set_iscsi_chap_authentication_with_ssh_exception(self): self.connection.invoke_successfully = mock.Mock() self.mock_object(self.client, 'check_iscsi_initiator_exists', return_value=True) ssh = mock.Mock(paramiko.SSHClient) sshpool = mock.Mock(ssh_utils.SSHPool) self.client.ssh_client.ssh_pool = sshpool sshpool.item().__enter__ = mock.Mock(return_value=ssh) sshpool.item().__enter__.side_effect = paramiko.SSHException( 'Connection Failure') sshpool.item().__exit__ = mock.Mock(return_value=False) self.assertRaises(exception.VolumeBackendAPIException, self.client.set_iscsi_chap_authentication, fake_client.INITIATOR_IQN, fake_client.USER_NAME, fake_client.PASSWORD) def test_get_snapshot_if_snapshot_present_not_busy(self): expected_vol_name = fake.SNAPSHOT['volume_id'] expected_snapshot_name = fake.SNAPSHOT['name'] response = netapp_api.NaElement( fake_client.SNAPSHOT_INFO_FOR_PRESENT_NOT_BUSY_SNAPSHOT_CMODE) self.mock_send_request.return_value = response snapshot = self.client.get_snapshot(expected_vol_name, expected_snapshot_name) self.assertEqual(expected_vol_name, snapshot['volume']) self.assertEqual(expected_snapshot_name, snapshot['name']) self.assertEqual(set([]), snapshot['owners']) self.assertFalse(snapshot['busy']) def test_get_snapshot_if_snapshot_present_busy(self): expected_vol_name = fake.SNAPSHOT['volume_id'] expected_snapshot_name = fake.SNAPSHOT['name'] response = netapp_api.NaElement( fake_client.SNAPSHOT_INFO_FOR_PRESENT_BUSY_SNAPSHOT_CMODE) self.mock_send_request.return_value = response snapshot = self.client.get_snapshot(expected_vol_name, expected_snapshot_name) self.assertEqual(expected_vol_name, snapshot['volume']) self.assertEqual(expected_snapshot_name, snapshot['name']) self.assertEqual(set([]), snapshot['owners']) self.assertTrue(snapshot['busy']) def test_get_snapshot_if_snapshot_not_present(self): expected_vol_name = fake.SNAPSHOT['volume_id'] expected_snapshot_name = fake.SNAPSHOT['name'] response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) self.mock_send_request.return_value = response self.assertRaises(exception.SnapshotNotFound, self.client.get_snapshot, expected_vol_name, expected_snapshot_name) def test_create_cluster_peer(self): self.mock_object(self.client.connection, 'send_request') self.client.create_cluster_peer(['fake_address_1', 'fake_address_2'], 'fake_user', 'fake_password', 'fake_passphrase') cluster_peer_create_args = { 'peer-addresses': [ {'remote-inet-address': 'fake_address_1'}, {'remote-inet-address': 'fake_address_2'}, ], 'user-name': 'fake_user', 'password': 'fake_password', 'passphrase': 'fake_passphrase', } self.client.connection.send_request.assert_has_calls([ mock.call('cluster-peer-create', cluster_peer_create_args)]) def test_get_cluster_peers(self): api_response = netapp_api.NaElement( fake_client.CLUSTER_PEER_GET_ITER_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.get_cluster_peers() cluster_peer_get_iter_args = {} self.client.send_iter_request.assert_has_calls([ mock.call('cluster-peer-get-iter', cluster_peer_get_iter_args)]) expected = [{ 'active-addresses': [ fake_client.CLUSTER_ADDRESS_1, fake_client.CLUSTER_ADDRESS_2 ], 'availability': 'available', 'cluster-name': fake_client.CLUSTER_NAME, 'cluster-uuid': 'fake_uuid', 'peer-addresses': [fake_client.CLUSTER_ADDRESS_1], 'remote-cluster-name': fake_client.REMOTE_CLUSTER_NAME, 'serial-number': 'fake_serial_number', 'timeout': '60', }] self.assertEqual(expected, result) def test_get_cluster_peers_single(self): api_response = netapp_api.NaElement( fake_client.CLUSTER_PEER_GET_ITER_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) self.client.get_cluster_peers( remote_cluster_name=fake_client.CLUSTER_NAME) cluster_peer_get_iter_args = { 'query': { 'cluster-peer-info': { 'remote-cluster-name': fake_client.CLUSTER_NAME, } }, } self.client.send_iter_request.assert_has_calls([ mock.call('cluster-peer-get-iter', cluster_peer_get_iter_args)]) def test_get_cluster_peers_not_found(self): api_response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.get_cluster_peers( remote_cluster_name=fake_client.CLUSTER_NAME) self.assertEqual([], result) self.assertTrue(self.client.send_iter_request.called) def test_delete_cluster_peer(self): self.mock_object(self.client.connection, 'send_request') self.client.delete_cluster_peer(fake_client.CLUSTER_NAME) cluster_peer_delete_args = {'cluster-name': fake_client.CLUSTER_NAME} self.client.connection.send_request.assert_has_calls([ mock.call('cluster-peer-delete', cluster_peer_delete_args)]) def test_get_cluster_peer_policy(self): self.client.features.add_feature('CLUSTER_PEER_POLICY') api_response = netapp_api.NaElement( fake_client.CLUSTER_PEER_POLICY_GET_RESPONSE) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client.get_cluster_peer_policy() expected = { 'is-unauthenticated-access-permitted': False, 'passphrase-minimum-length': 8, } self.assertEqual(expected, result) self.assertTrue(self.client.connection.send_request.called) def test_get_cluster_peer_policy_not_supported(self): result = self.client.get_cluster_peer_policy() self.assertEqual({}, result) def test_set_cluster_peer_policy_not_supported(self): self.mock_object(self.client.connection, 'send_request') self.client.set_cluster_peer_policy() self.assertFalse(self.client.connection.send_request.called) def test_set_cluster_peer_policy_no_arguments(self): self.client.features.add_feature('CLUSTER_PEER_POLICY') self.mock_object(self.client.connection, 'send_request') self.client.set_cluster_peer_policy() self.assertFalse(self.client.connection.send_request.called) def test_set_cluster_peer_policy(self): self.client.features.add_feature('CLUSTER_PEER_POLICY') self.mock_object(self.client.connection, 'send_request') self.client.set_cluster_peer_policy( is_unauthenticated_access_permitted=True, passphrase_minimum_length=12) cluster_peer_policy_modify_args = { 'is-unauthenticated-access-permitted': 'true', 'passphrase-minlength': '12', } self.client.connection.send_request.assert_has_calls([ mock.call('cluster-peer-policy-modify', cluster_peer_policy_modify_args)]) def test_create_vserver_peer(self): self.mock_object(self.client.connection, 'send_request') self.client.create_vserver_peer('fake_vserver', 'fake_vserver_peer') vserver_peer_create_args = { 'vserver': 'fake_vserver', 'peer-vserver': 'fake_vserver_peer', 'applications': [ {'vserver-peer-application': 'snapmirror'}, ], } self.client.connection.send_request.assert_has_calls([ mock.call('vserver-peer-create', vserver_peer_create_args)]) def test_delete_vserver_peer(self): self.mock_object(self.client.connection, 'send_request') self.client.delete_vserver_peer('fake_vserver', 'fake_vserver_peer') vserver_peer_delete_args = { 'vserver': 'fake_vserver', 'peer-vserver': 'fake_vserver_peer', } self.client.connection.send_request.assert_has_calls([ mock.call('vserver-peer-delete', vserver_peer_delete_args)]) def test_accept_vserver_peer(self): self.mock_object(self.client.connection, 'send_request') self.client.accept_vserver_peer('fake_vserver', 'fake_vserver_peer') vserver_peer_accept_args = { 'vserver': 'fake_vserver', 'peer-vserver': 'fake_vserver_peer', } self.client.connection.send_request.assert_has_calls([ mock.call('vserver-peer-accept', vserver_peer_accept_args)]) def test_get_vserver_peers(self): api_response = netapp_api.NaElement( fake_client.VSERVER_PEER_GET_ITER_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.get_vserver_peers( vserver_name=fake_client.VSERVER_NAME, peer_vserver_name=fake_client.VSERVER_NAME_2) vserver_peer_get_iter_args = { 'query': { 'vserver-peer-info': { 'vserver': fake_client.VSERVER_NAME, 'peer-vserver': fake_client.VSERVER_NAME_2, } }, } self.client.send_iter_request.assert_has_calls([ mock.call('vserver-peer-get-iter', vserver_peer_get_iter_args)]) expected = [{ 'vserver': 'fake_vserver', 'peer-vserver': 'fake_vserver_2', 'peer-state': 'peered', 'peer-cluster': 'fake_cluster' }] self.assertEqual(expected, result) def test_get_vserver_peers_not_found(self): api_response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client.get_vserver_peers( vserver_name=fake_client.VSERVER_NAME, peer_vserver_name=fake_client.VSERVER_NAME_2) self.assertEqual([], result) self.assertTrue(self.client.send_iter_request.called) def test_ensure_snapmirror_v2(self): self.assertIsNone(self.client._ensure_snapmirror_v2()) def test_ensure_snapmirror_v2_not_supported(self): self.client.features.add_feature('SNAPMIRROR_V2', supported=False) self.assertRaises(netapp_utils.NetAppDriverException, self.client._ensure_snapmirror_v2) @ddt.data({'schedule': 'fake_schedule', 'policy': 'fake_policy'}, {'schedule': None, 'policy': None}) @ddt.unpack def test_create_snapmirror(self, schedule, policy): self.mock_object(self.client.connection, 'send_request') self.client.create_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME, schedule=schedule, policy=policy) snapmirror_create_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, 'relationship-type': 'data_protection', } if schedule: snapmirror_create_args['schedule'] = schedule if policy: snapmirror_create_args['policy'] = policy self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-create', snapmirror_create_args)]) def test_create_snapmirror_already_exists(self): api_error = netapp_api.NaApiError(code=netapp_api.ERELATION_EXISTS) self.mock_object( self.client.connection, 'send_request', side_effect=api_error) self.client.create_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) snapmirror_create_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, 'relationship-type': 'data_protection', } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-create', snapmirror_create_args)]) def test_create_snapmirror_error(self): api_error = netapp_api.NaApiError(code=0) self.mock_object( self.client.connection, 'send_request', side_effect=api_error) self.assertRaises(netapp_api.NaApiError, self.client.create_snapmirror, fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) self.assertTrue(self.client.connection.send_request.called) @ddt.data( { 'source_snapshot': 'fake_snapshot', 'transfer_priority': 'fake_priority' }, { 'source_snapshot': None, 'transfer_priority': None } ) @ddt.unpack def test_initialize_snapmirror(self, source_snapshot, transfer_priority): api_response = netapp_api.NaElement( fake_client.SNAPMIRROR_INITIALIZE_RESULT) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client.initialize_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME, source_snapshot=source_snapshot, transfer_priority=transfer_priority) snapmirror_initialize_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, } if source_snapshot: snapmirror_initialize_args['source-snapshot'] = source_snapshot if transfer_priority: snapmirror_initialize_args['transfer-priority'] = transfer_priority self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-initialize', snapmirror_initialize_args)]) expected = { 'operation-id': None, 'status': 'succeeded', 'jobid': None, 'error-code': None, 'error-message': None } self.assertEqual(expected, result) @ddt.data(True, False) def test_release_snapmirror(self, relationship_info_only): self.mock_object(self.client.connection, 'send_request') self.client.release_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME, relationship_info_only=relationship_info_only) snapmirror_release_args = { 'query': { 'snapmirror-destination-info': { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, 'relationship-info-only': ('true' if relationship_info_only else 'false'), } } } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-release-iter', snapmirror_release_args)]) def test_quiesce_snapmirror(self): self.mock_object(self.client.connection, 'send_request') self.client.quiesce_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) snapmirror_quiesce_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-quiesce', snapmirror_quiesce_args)]) @ddt.data(True, False) def test_abort_snapmirror(self, clear_checkpoint): self.mock_object(self.client.connection, 'send_request') self.client.abort_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME, clear_checkpoint=clear_checkpoint) snapmirror_abort_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, 'clear-checkpoint': 'true' if clear_checkpoint else 'false', } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-abort', snapmirror_abort_args)]) def test_abort_snapmirror_no_transfer_in_progress(self): api_error = netapp_api.NaApiError( code=netapp_api.ENOTRANSFER_IN_PROGRESS) self.mock_object( self.client.connection, 'send_request', side_effect=api_error) self.client.abort_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) snapmirror_abort_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, 'clear-checkpoint': 'false', } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-abort', snapmirror_abort_args)]) def test_abort_snapmirror_error(self): api_error = netapp_api.NaApiError(code=0) self.mock_object( self.client.connection, 'send_request', side_effect=api_error) self.assertRaises(netapp_api.NaApiError, self.client.abort_snapmirror, fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) def test_break_snapmirror(self): self.mock_object(self.client.connection, 'send_request') self.client.break_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) snapmirror_break_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-break', snapmirror_break_args)]) @ddt.data( { 'schedule': 'fake_schedule', 'policy': 'fake_policy', 'tries': 5, 'max_transfer_rate': 1024, }, { 'schedule': None, 'policy': None, 'tries': None, 'max_transfer_rate': None, } ) @ddt.unpack def test_modify_snapmirror(self, schedule, policy, tries, max_transfer_rate): self.mock_object(self.client.connection, 'send_request') self.client.modify_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME, schedule=schedule, policy=policy, tries=tries, max_transfer_rate=max_transfer_rate) snapmirror_modify_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, } if schedule: snapmirror_modify_args['schedule'] = schedule if policy: snapmirror_modify_args['policy'] = policy if tries: snapmirror_modify_args['tries'] = tries if max_transfer_rate: snapmirror_modify_args['max-transfer-rate'] = max_transfer_rate self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-modify', snapmirror_modify_args)]) def test_delete_snapmirror(self): self.mock_object(self.client.connection, 'send_request') self.client.delete_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) snapmirror_delete_args = { 'query': { 'snapmirror-info': { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, } } } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-destroy-iter', snapmirror_delete_args)]) def test_update_snapmirror(self): self.mock_object(self.client.connection, 'send_request') self.client.update_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) snapmirror_update_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-update', snapmirror_update_args)]) def test_update_snapmirror_already_transferring(self): api_error = netapp_api.NaApiError( code=netapp_api.ETRANSFER_IN_PROGRESS) self.mock_object( self.client.connection, 'send_request', side_effect=api_error) self.client.update_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) snapmirror_update_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-update', snapmirror_update_args)]) def test_update_snapmirror_already_transferring_two(self): api_error = netapp_api.NaApiError(code=netapp_api.EANOTHER_OP_ACTIVE) self.mock_object( self.client.connection, 'send_request', side_effect=api_error) self.client.update_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) snapmirror_update_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-update', snapmirror_update_args)]) def test_update_snapmirror_error(self): api_error = netapp_api.NaApiError(code=0) self.mock_object( self.client.connection, 'send_request', side_effect=api_error) self.assertRaises(netapp_api.NaApiError, self.client.update_snapmirror, fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) def test_resume_snapmirror(self): self.mock_object(self.client.connection, 'send_request') self.client.resume_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) snapmirror_resume_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-resume', snapmirror_resume_args)]) def test_resume_snapmirror_not_quiesed(self): api_error = netapp_api.NaApiError( code=netapp_api.ERELATION_NOT_QUIESCED) self.mock_object( self.client.connection, 'send_request', side_effect=api_error) self.client.resume_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) snapmirror_resume_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-resume', snapmirror_resume_args)]) def test_resume_snapmirror_error(self): api_error = netapp_api.NaApiError(code=0) self.mock_object( self.client.connection, 'send_request', side_effect=api_error) self.assertRaises(netapp_api.NaApiError, self.client.resume_snapmirror, fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) def test_resync_snapmirror(self): self.mock_object(self.client.connection, 'send_request') self.client.resync_snapmirror( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME) snapmirror_resync_args = { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, } self.client.connection.send_request.assert_has_calls([ mock.call('snapmirror-resync', snapmirror_resync_args)]) def test__get_snapmirrors(self): api_response = netapp_api.NaElement( fake_client.SNAPMIRROR_GET_ITER_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) desired_attributes = { 'snapmirror-info': { 'source-vserver': None, 'source-volume': None, 'destination-vserver': None, 'destination-volume': None, 'is-healthy': None, } } result = self.client._get_snapmirrors( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME, desired_attributes=desired_attributes) snapmirror_get_iter_args = { 'query': { 'snapmirror-info': { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, }, }, 'desired-attributes': { 'snapmirror-info': { 'source-vserver': None, 'source-volume': None, 'destination-vserver': None, 'destination-volume': None, 'is-healthy': None, }, }, } self.client.send_iter_request.assert_has_calls([ mock.call('snapmirror-get-iter', snapmirror_get_iter_args)]) self.assertEqual(1, len(result)) def test__get_snapmirrors_not_found(self): api_response = netapp_api.NaElement(fake_client.NO_RECORDS_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) result = self.client._get_snapmirrors() self.client.send_iter_request.assert_has_calls([ mock.call('snapmirror-get-iter', {})]) self.assertEqual([], result) def test_get_snapmirrors(self): api_response = netapp_api.NaElement( fake_client.SNAPMIRROR_GET_ITER_FILTERED_RESPONSE) self.mock_object(self.client, 'send_iter_request', return_value=api_response) desired_attributes = ['source-vserver', 'source-volume', 'destination-vserver', 'destination-volume', 'is-healthy', 'mirror-state', 'schedule'] result = self.client.get_snapmirrors( fake_client.SM_SOURCE_VSERVER, fake_client.SM_SOURCE_VOLUME, fake_client.SM_DEST_VSERVER, fake_client.SM_DEST_VOLUME, desired_attributes=desired_attributes) snapmirror_get_iter_args = { 'query': { 'snapmirror-info': { 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, }, }, 'desired-attributes': { 'snapmirror-info': { 'source-vserver': None, 'source-volume': None, 'destination-vserver': None, 'destination-volume': None, 'is-healthy': None, 'mirror-state': None, 'schedule': None, }, }, } expected = [{ 'source-vserver': fake_client.SM_SOURCE_VSERVER, 'source-volume': fake_client.SM_SOURCE_VOLUME, 'destination-vserver': fake_client.SM_DEST_VSERVER, 'destination-volume': fake_client.SM_DEST_VOLUME, 'is-healthy': 'true', 'mirror-state': 'snapmirrored', 'schedule': 'daily', }] self.client.send_iter_request.assert_has_calls([ mock.call('snapmirror-get-iter', snapmirror_get_iter_args)]) self.assertEqual(expected, result) def test_get_provisioning_options_from_flexvol(self): self.mock_object(self.client, 'get_flexvol', return_value=fake_client.VOLUME_INFO_SSC) self.mock_object(self.client, 'get_flexvol_dedupe_info', return_value=fake_client.VOLUME_DEDUPE_INFO_SSC) expected_prov_opts = { 'aggregate': 'fake_aggr1', 'compression_enabled': False, 'dedupe_enabled': True, 'language': 'en_US', 'size': 1, 'snapshot_policy': 'default', 'snapshot_reserve': '5', 'space_guarantee_type': 'none', 'volume_type': 'rw' } actual_prov_opts = self.client.get_provisioning_options_from_flexvol( fake_client.VOLUME_NAME) self.assertEqual(expected_prov_opts, actual_prov_opts) def test_wait_for_busy_snapshot(self): # Need to mock sleep as it is called by @utils.retry self.mock_object(time, 'sleep') mock_get_snapshot = self.mock_object( self.client, 'get_snapshot', return_value=fake.SNAPSHOT ) self.client.wait_for_busy_snapshot(fake.FLEXVOL, fake.SNAPSHOT_NAME) mock_get_snapshot.assert_called_once_with(fake.FLEXVOL, fake.SNAPSHOT_NAME) def test_wait_for_busy_snapshot_raise_exception(self): # Need to mock sleep as it is called by @utils.retry self.mock_object(time, 'sleep') BUSY_SNAPSHOT = dict(fake.SNAPSHOT) BUSY_SNAPSHOT['busy'] = True mock_get_snapshot = self.mock_object( self.client, 'get_snapshot', return_value=BUSY_SNAPSHOT ) self.assertRaises(exception.SnapshotIsBusy, self.client.wait_for_busy_snapshot, fake.FLEXVOL, fake.SNAPSHOT_NAME) calls = [ mock.call(fake.FLEXVOL, fake.SNAPSHOT_NAME), mock.call(fake.FLEXVOL, fake.SNAPSHOT_NAME), mock.call(fake.FLEXVOL, fake.SNAPSHOT_NAME), ] mock_get_snapshot.assert_has_calls(calls) @ddt.data({ 'mock_return': fake_client.SNAPSHOT_INFO_FOR_PRESENT_NOT_BUSY_SNAPSHOT_CMODE, 'expected': [{ 'name': fake.SNAPSHOT_NAME, 'instance_id': 'abcd-ef01-2345-6789', 'volume_name': fake.SNAPSHOT['volume_id'], }] }, { 'mock_return': fake_client.NO_RECORDS_RESPONSE, 'expected': [], }) @ddt.unpack def test_get_snapshots_marked_for_deletion(self, mock_return, expected): api_response = netapp_api.NaElement(mock_return) self.mock_object(self.client.connection, 'send_request', return_value=api_response) result = self.client.get_snapshots_marked_for_deletion() api_args = { 'query': { 'snapshot-info': { 'name': client_base.DELETED_PREFIX + '*', 'vserver': self.vserver, 'busy': 'false' }, }, 'desired-attributes': { 'snapshot-info': { 'name': None, 'volume': None, 'snapshot-instance-uuid': None, } }, } self.client.connection.send_request.assert_called_once_with( 'snapshot-get-iter', api_args) self.assertListEqual(expected, result)