manila/manila/tests/share/drivers/dell_emc/plugins/vnx/test_connection.py

2370 lines
96 KiB
Python

# Copyright (c) 2015 EMC Corporation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import copy
from unittest import mock
import ddt
from oslo_log import log
from manila import exception
from manila.share.drivers.dell_emc.common.enas import connector
from manila.share.drivers.dell_emc.common.enas import utils as enas_utils
from manila.share.drivers.dell_emc.plugins.vnx import connection
from manila.share.drivers.dell_emc.plugins.vnx import object_manager
from manila import test
from manila.tests import fake_share
from manila.tests.share.drivers.dell_emc.common.enas import fakes
from manila.tests.share.drivers.dell_emc.common.enas import utils
LOG = log.getLogger(__name__)
@ddt.ddt
class StorageConnectionTestCase(test.TestCase):
@mock.patch.object(connector.XMLAPIConnector, "_do_setup", mock.Mock())
def setUp(self):
super(StorageConnectionTestCase, self).setUp()
self.emc_share_driver = fakes.FakeEMCShareDriver()
self.connection = connection.VNXStorageConnection(LOG)
self.pool = fakes.PoolTestData()
self.vdm = fakes.VDMTestData()
self.mover = fakes.MoverTestData()
self.fs = fakes.FileSystemTestData()
self.mount = fakes.MountPointTestData()
self.snap = fakes.SnapshotTestData()
self.cifs_share = fakes.CIFSShareTestData()
self.nfs_share = fakes.NFSShareTestData()
self.cifs_server = fakes.CIFSServerTestData()
self.dns = fakes.DNSDomainTestData()
with mock.patch.object(connector.XMLAPIConnector, 'request',
mock.Mock()):
self.connection.connect(self.emc_share_driver, None)
def test_check_for_setup_error(self):
hook = utils.RequestSideEffect()
hook.append(self.mover.resp_get_ref_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
with mock.patch.object(connection.VNXStorageConnection,
'_get_managed_storage_pools',
mock.Mock()):
self.connection.check_for_setup_error()
expected_calls = [mock.call(self.mover.req_get_ref())]
xml_req_mock.assert_has_calls(expected_calls)
def test_check_for_setup_error_with_invalid_mover_name(self):
hook = utils.RequestSideEffect()
hook.append(self.mover.resp_get_error())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.assertRaises(exception.InvalidParameterValue,
self.connection.check_for_setup_error)
expected_calls = [mock.call(self.mover.req_get_ref())]
xml_req_mock.assert_has_calls(expected_calls)
@ddt.data({'pool_conf': None,
'real_pools': ['fake_pool', 'nas_pool'],
'matched_pool': set()},
{'pool_conf': [],
'real_pools': ['fake_pool', 'nas_pool'],
'matched_pool': set()},
{'pool_conf': ['*'],
'real_pools': ['fake_pool', 'nas_pool'],
'matched_pool': {'fake_pool', 'nas_pool'}},
{'pool_conf': ['fake_*'],
'real_pools': ['fake_pool', 'nas_pool', 'Perf_Pool'],
'matched_pool': {'fake_pool'}},
{'pool_conf': ['*pool'],
'real_pools': ['fake_pool', 'NAS_Pool', 'Perf_POOL'],
'matched_pool': {'fake_pool'}},
{'pool_conf': ['nas_pool'],
'real_pools': ['fake_pool', 'nas_pool', 'perf_pool'],
'matched_pool': {'nas_pool'}})
@ddt.unpack
def test__get_managed_storage_pools(self, pool_conf, real_pools,
matched_pool):
with mock.patch.object(object_manager.StoragePool,
'get_all',
mock.Mock(return_value=('ok', real_pools))):
pool = self.connection._get_managed_storage_pools(pool_conf)
self.assertEqual(matched_pool, pool)
def test__get_managed_storage_pools_failed_to_get_pool_info(self):
hook = utils.RequestSideEffect()
hook.append(self.pool.resp_get_error())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
pool_conf = fakes.FakeData.pool_name
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection._get_managed_storage_pools,
pool_conf)
expected_calls = [mock.call(self.pool.req_get())]
xml_req_mock.assert_has_calls(expected_calls)
@ddt.data(
{'pool_conf': ['fake_*'],
'real_pools': ['nas_pool', 'Perf_Pool']},
{'pool_conf': ['*pool'],
'real_pools': ['NAS_Pool', 'Perf_POOL']},
{'pool_conf': ['nas_pool'],
'real_pools': ['fake_pool', 'perf_pool']},
)
@ddt.unpack
def test__get_managed_storage_pools_without_matched_pool(self, pool_conf,
real_pools):
with mock.patch.object(object_manager.StoragePool,
'get_all',
mock.Mock(return_value=('ok', real_pools))):
self.assertRaises(exception.InvalidParameterValue,
self.connection._get_managed_storage_pools,
pool_conf)
def test_create_cifs_share(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
hook.append(self.pool.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
hook.append(self.cifs_share.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share(None, share, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.pool.req_get()),
mock.call(self.fs.req_create_on_vdm()),
mock.call(self.cifs_share.req_create(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [mock.call(self.cifs_share.cmd_disable_access(), True)]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location,
[{'path': r'\\%s\%s' % (
fakes.FakeData.network_allocations_ip1,
share['name'])}],
'CIFS export path is incorrect')
def test_create_cifs_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
hook.append(self.pool.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
hook.append(self.cifs_share.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share(None, share, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.pool.req_get()),
mock.call(self.fs.req_create_on_vdm()),
mock.call(self.cifs_share.req_create(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [mock.call(self.cifs_share.cmd_disable_access(), True)]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(
location,
[{'path': r'\\%s.ipv6-literal.net\%s' % (
fakes.FakeData.network_allocations_ip3.replace(':', '-'),
share['name'])}],
'CIFS export path is incorrect'
)
def test_create_nfs_share(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.pool.resp_get_succeed())
hook.append(self.vdm.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_create())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share(None, share, share_server)
expected_calls = [
mock.call(self.pool.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.fs.req_create_on_vdm()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [mock.call(self.nfs_share.cmd_create(), True)]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location,
[{'path': '192.168.1.2:/%s' % share['name']}],
'NFS export path is incorrect')
def test_create_nfs_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.pool.resp_get_succeed())
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_create())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share(None, share, share_server)
expected_calls = [
mock.call(self.pool.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.fs.req_create_on_vdm()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [mock.call(self.nfs_share.cmd_create(), True)]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location,
[{'path': '[%s]:/%s' % (
fakes.FakeData.network_allocations_ip4,
share['name'])}],
'NFS export path is incorrect')
def test_create_cifs_share_without_share_server(self):
share = fakes.CIFS_SHARE
self.assertRaises(exception.InvalidInput,
self.connection.create_share,
None, share, None)
def test_create_cifs_share_without_share_server_name(self):
share = fakes.CIFS_SHARE
share_server = copy.deepcopy(fakes.SHARE_SERVER)
share_server['backend_details']['share_server_name'] = None
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.create_share,
None, share, share_server)
def test_create_cifs_share_with_invalide_cifs_server_name(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_error())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.create_share,
None, share, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_create_cifs_share_without_interface_in_cifs_server(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_without_interface(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
hook.append(self.pool.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.create_share,
None, share, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.pool.req_get()),
mock.call(self.fs.req_create_on_vdm()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_create_cifs_share_without_pool_name(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(host='HostA@BackendB',
share_proto='CIFS')
self.assertRaises(exception.InvalidHost,
self.connection.create_share,
None, share, share_server)
def test_create_cifs_share_from_snapshot(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
snapshot = fake_share.fake_snapshot(
name=fakes.FakeData.src_snap_name,
share_name=fakes.FakeData.src_share_name,
share_id=fakes.FakeData.src_share_name,
id=fakes.FakeData.src_snap_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
hook.append(self.cifs_share.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.mover.output_get_interconnect_id())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append(self.fs.output_copy_ckpt)
ssh_hook.append(self.fs.output_info())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share_from_snapshot(
None, share, snapshot, share_server)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.cifs_share.req_create(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.mover.cmd_get_interconnect_id(), False),
mock.call(self.fs.cmd_create_from_ckpt(), False),
mock.call(self.mount.cmd_server_mount('ro'), False),
mock.call(self.fs.cmd_copy_ckpt(), True),
mock.call(self.fs.cmd_nas_fs_info(), False),
mock.call(self.mount.cmd_server_umount(), False),
mock.call(self.fs.cmd_delete(), False),
mock.call(self.mount.cmd_server_mount('rw'), False),
mock.call(self.cifs_share.cmd_disable_access(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location,
[{'path': r'\\192.168.1.1\%s' % share['name']}],
'CIFS export path is incorrect')
def test_create_cifs_share_from_snapshot_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
snapshot = fake_share.fake_snapshot(
name=fakes.FakeData.src_snap_name,
share_name=fakes.FakeData.src_share_name,
share_id=fakes.FakeData.src_share_name,
id=fakes.FakeData.src_snap_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
hook.append(self.cifs_share.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.mover.output_get_interconnect_id())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append(self.fs.output_copy_ckpt)
ssh_hook.append(self.fs.output_info())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share_from_snapshot(
None, share, snapshot, share_server)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.cifs_share.req_create(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.mover.cmd_get_interconnect_id(), False),
mock.call(self.fs.cmd_create_from_ckpt(), False),
mock.call(self.mount.cmd_server_mount('ro'), False),
mock.call(self.fs.cmd_copy_ckpt(), True),
mock.call(self.fs.cmd_nas_fs_info(), False),
mock.call(self.mount.cmd_server_umount(), False),
mock.call(self.fs.cmd_delete(), False),
mock.call(self.mount.cmd_server_mount('rw'), False),
mock.call(self.cifs_share.cmd_disable_access(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(
location,
[{'path': r'\\%s.ipv6-literal.net\%s' % (
fakes.FakeData.network_allocations_ip3.replace(':', '-'),
share['name'])}],
'CIFS export path is incorrect')
def test_create_nfs_share_from_snapshot(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
snapshot = fake_share.fake_snapshot(
name=fakes.FakeData.src_snap_name,
share_name=fakes.FakeData.src_share_name,
share_id=fakes.FakeData.src_share_name,
id=fakes.FakeData.src_snap_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.mover.output_get_interconnect_id())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append(self.fs.output_copy_ckpt)
ssh_hook.append(self.fs.output_info())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append()
ssh_hook.append(self.nfs_share.output_create())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share_from_snapshot(
None, share, snapshot, share_server)
expected_calls = [mock.call(self.fs.req_get())]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.mover.cmd_get_interconnect_id(), False),
mock.call(self.fs.cmd_create_from_ckpt(), False),
mock.call(self.mount.cmd_server_mount('ro'), False),
mock.call(self.fs.cmd_copy_ckpt(), True),
mock.call(self.fs.cmd_nas_fs_info(), False),
mock.call(self.mount.cmd_server_umount(), False),
mock.call(self.fs.cmd_delete(), False),
mock.call(self.mount.cmd_server_mount('rw'), False),
mock.call(self.nfs_share.cmd_create(), True)
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location,
[{'path': '192.168.1.2:/%s' % share['name']}],
'NFS export path is incorrect')
def test_create_nfs_share_from_snapshot_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
snapshot = fake_share.fake_snapshot(
name=fakes.FakeData.src_snap_name,
share_name=fakes.FakeData.src_share_name,
share_id=fakes.FakeData.src_share_name,
id=fakes.FakeData.src_snap_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.mover.output_get_interconnect_id())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append(self.fs.output_copy_ckpt)
ssh_hook.append(self.fs.output_info())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append()
ssh_hook.append(self.nfs_share.output_create())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share_from_snapshot(
None, share, snapshot, share_server)
expected_calls = [mock.call(self.fs.req_get())]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.mover.cmd_get_interconnect_id(), False),
mock.call(self.fs.cmd_create_from_ckpt(), False),
mock.call(self.mount.cmd_server_mount('ro'), False),
mock.call(self.fs.cmd_copy_ckpt(), True),
mock.call(self.fs.cmd_nas_fs_info(), False),
mock.call(self.mount.cmd_server_umount(), False),
mock.call(self.fs.cmd_delete(), False),
mock.call(self.mount.cmd_server_mount('rw'), False),
mock.call(self.nfs_share.cmd_create(), True)
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location,
[{'path': '[%s]:/%s' % (
fakes.FakeData.network_allocations_ip4,
share['name'])}],
'NFS export path is incorrect')
def test_create_share_with_incorrect_proto(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(share_proto='FAKE_PROTO')
self.assertRaises(exception.InvalidShare,
self.connection.create_share,
context=None,
share=share,
share_server=share_server)
def test_create_share_from_snapshot_with_incorrect_proto(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(share_proto='FAKE_PROTO')
snapshot = fake_share.fake_snapshot()
self.assertRaises(exception.InvalidShare,
self.connection.create_share_from_snapshot,
None, share, snapshot, share_server)
def test_create_share_from_snapshot_without_pool_name(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(host='HostA@BackendB',
share_proto='CIFS')
snapshot = fake_share.fake_snapshot()
self.assertRaises(exception.InvalidHost,
self.connection.create_share_from_snapshot,
None, share, snapshot, share_server)
def test_delete_cifs_share(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.cifs_share.resp_get_succeed(self.vdm.vdm_id))
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_share.resp_task_succeed())
hook.append(self.mount.resp_task_succeed())
hook.append(self.fs.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.delete_share(None, share, share_server)
expected_calls = [
mock.call(self.cifs_share.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_share.req_delete(self.vdm.vdm_id)),
mock.call(self.mount.req_delete(self.vdm.vdm_id)),
mock.call(self.fs.req_get()),
mock.call(self.fs.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_delete_cifs_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.cifs_share.resp_get_succeed(self.vdm.vdm_id))
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_share.resp_task_succeed())
hook.append(self.mount.resp_task_succeed())
hook.append(self.fs.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.delete_share(None, share, share_server)
expected_calls = [
mock.call(self.cifs_share.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_share.req_delete(self.vdm.vdm_id)),
mock.call(self.mount.req_delete(self.vdm.vdm_id)),
mock.call(self.fs.req_get()),
mock.call(self.fs.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_delete_nfs_share(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.mount.resp_task_succeed())
hook.append(self.fs.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=self.nfs_share.rw_hosts,
ro_hosts=self.nfs_share.ro_hosts))
ssh_hook.append(self.nfs_share.output_delete_succeed())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.delete_share(None, share, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.mount.req_delete(self.vdm.vdm_id)),
mock.call(self.fs.req_get()),
mock.call(self.fs.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), False),
mock.call(self.nfs_share.cmd_delete(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_delete_nfs_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.mount.resp_task_succeed())
hook.append(self.fs.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=self.nfs_share.rw_hosts,
ro_hosts=self.nfs_share.ro_hosts))
ssh_hook.append(self.nfs_share.output_delete_succeed())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.delete_share(None, share, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.mount.req_delete(self.vdm.vdm_id)),
mock.call(self.fs.req_get()),
mock.call(self.fs.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), False),
mock.call(self.nfs_share.cmd_delete(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_delete_share_without_share_server(self):
share = fakes.CIFS_SHARE
self.connection.delete_share(None, share)
def test_delete_share_with_incorrect_proto(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(share_proto='FAKE_PROTO')
self.assertRaises(exception.InvalidShare,
self.connection.delete_share,
context=None,
share=share,
share_server=share_server)
def test_delete_cifs_share_with_nonexistent_mount_and_filesystem(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.cifs_share.resp_get_succeed(self.vdm.vdm_id))
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_share.resp_task_succeed())
hook.append(self.mount.resp_task_error())
hook.append(self.fs.resp_get_succeed())
hook.append(self.fs.resp_task_error())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.delete_share(None, share, share_server)
expected_calls = [
mock.call(self.cifs_share.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_share.req_delete(self.vdm.vdm_id)),
mock.call(self.mount.req_delete(self.vdm.vdm_id)),
mock.call(self.fs.req_get()),
mock.call(self.fs.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_extend_share(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
new_size = fakes.FakeData.new_size
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.pool.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.extend_share(share, new_size, share_server)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.pool.req_get()),
mock.call(self.fs.req_extend()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_extend_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
new_size = fakes.FakeData.new_size
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.pool.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.extend_share(share, new_size, share_server)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.pool.req_get()),
mock.call(self.fs.req_extend()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_extend_share_without_pool_name(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(host='HostA@BackendB',
share_proto='CIFS')
new_size = fakes.FakeData.new_size
self.assertRaises(exception.InvalidHost,
self.connection.extend_share,
share, new_size, share_server)
def test_create_snapshot(self):
share_server = fakes.SHARE_SERVER
snapshot = fake_share.fake_snapshot(
id=fakes.FakeData.snapshot_name,
share_id=fakes.FakeData.filesystem_name,
share_name=fakes.FakeData.share_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.snap.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.create_snapshot(None, snapshot, share_server)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.snap.req_create()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_create_snapshot_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
snapshot = fake_share.fake_snapshot(
id=fakes.FakeData.snapshot_name,
share_id=fakes.FakeData.filesystem_name,
share_name=fakes.FakeData.share_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.snap.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.create_snapshot(None, snapshot, share_server)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.snap.req_create()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_create_snapshot_with_incorrect_share_info(self):
share_server = fakes.SHARE_SERVER
snapshot = fake_share.fake_snapshot(
id=fakes.FakeData.snapshot_name,
share_id=fakes.FakeData.filesystem_name,
share_name=fakes.FakeData.share_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_but_not_found())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.create_snapshot,
None, snapshot, share_server)
expected_calls = [mock.call(self.fs.req_get())]
xml_req_mock.assert_has_calls(expected_calls)
def test_delete_snapshot(self):
share_server = fakes.SHARE_SERVER
snapshot = fake_share.fake_snapshot(
id=fakes.FakeData.snapshot_name,
share_id=fakes.FakeData.filesystem_name,
share_name=fakes.FakeData.share_name)
hook = utils.RequestSideEffect()
hook.append(self.snap.resp_get_succeed())
hook.append(self.snap.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.delete_snapshot(None, snapshot, share_server)
expected_calls = [
mock.call(self.snap.req_get()),
mock.call(self.snap.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_delete_snapshot_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
snapshot = fake_share.fake_snapshot(
id=fakes.FakeData.snapshot_name,
share_id=fakes.FakeData.filesystem_name,
share_name=fakes.FakeData.share_name)
hook = utils.RequestSideEffect()
hook.append(self.snap.resp_get_succeed())
hook.append(self.snap.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.delete_snapshot(None, snapshot, share_server)
expected_calls = [
mock.call(self.snap.req_get()),
mock.call(self.snap.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
@utils.patch_get_managed_ports_vnx(return_value=['cge-1-0'])
def test_setup_server(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_but_not_found())
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.vdm.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.dns.resp_task_succeed())
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.setup_server(fakes.NETWORK_INFO, None)
if_name_1 = fakes.FakeData.interface_name1
if_name_2 = fakes.FakeData.interface_name2
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.mover.req_get_ref()),
mock.call(self.vdm.req_create()),
mock.call(self.mover.req_create_interface(
if_name=if_name_1,
ip=fakes.FakeData.network_allocations_ip1)),
mock.call(self.mover.req_create_interface(
if_name=if_name_2,
ip=fakes.FakeData.network_allocations_ip2)),
mock.call(self.dns.req_create()),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_create(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_attach_nfs_interface(), False),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
@utils.patch_get_managed_ports_vnx(return_value=['cge-1-0'])
def test_setup_server_with_ipv6(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_but_not_found())
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.vdm.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.dns.resp_task_succeed())
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.setup_server(fakes.NETWORK_INFO_IPV6, None)
if_name_1 = fakes.FakeData.interface_name3
if_name_2 = fakes.FakeData.interface_name4
expect_ip_1 = fakes.FakeData.network_allocations_ip3
expect_ip_2 = fakes.FakeData.network_allocations_ip4
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.mover.req_get_ref()),
mock.call(self.vdm.req_create()),
mock.call(self.mover.req_create_interface_with_ipv6(
if_name=if_name_1,
ip=expect_ip_1)),
mock.call(self.mover.req_create_interface_with_ipv6(
if_name=if_name_2,
ip=expect_ip_2)),
mock.call(self.dns.req_create(
ip_addr=fakes.FakeData.dns_ipv6_address)),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_create(
self.vdm.vdm_id,
ip_addr=fakes.FakeData.network_allocations_ip3)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_attach_nfs_interface(
interface=fakes.FakeData.interface_name4), False),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
@utils.patch_get_managed_ports_vnx(return_value=['cge-1-0'])
def test_setup_server_with_existing_vdm(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.dns.resp_task_succeed())
hook.append(self.cifs_server.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.setup_server(fakes.NETWORK_INFO, None)
if_name_1 = fakes.FakeData.network_allocations_id1[-12:]
if_name_2 = fakes.FakeData.network_allocations_id2[-12:]
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.mover.req_get_ref()),
mock.call(self.mover.req_create_interface(
if_name=if_name_1,
ip=fakes.FakeData.network_allocations_ip1)),
mock.call(self.mover.req_create_interface(
if_name=if_name_2,
ip=fakes.FakeData.network_allocations_ip2)),
mock.call(self.dns.req_create()),
mock.call(self.cifs_server.req_create(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_attach_nfs_interface(), False),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_setup_server_with_invalid_security_service(self):
network_info = copy.deepcopy(fakes.NETWORK_INFO)
network_info['security_services'][0]['type'] = 'fake_type'
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.setup_server,
network_info, None)
@utils.patch_get_managed_ports_vnx(
side_effect=exception.EMCVnxXMLAPIError(
err="Get managed ports fail."))
def test_setup_server_without_valid_physical_device(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_but_not_found())
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.vdm.resp_task_succeed())
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_without_value())
hook.append(self.vdm.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.vdm.output_get_interfaces_vdm(nfs_interface=''))
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.setup_server,
fakes.NETWORK_INFO, None)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.mover.req_get_ref()),
mock.call(self.vdm.req_create()),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.vdm.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_get_interfaces(), False),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
@utils.patch_get_managed_ports_vnx(return_value=['cge-1-0'])
def test_setup_server_with_exception(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_but_not_found())
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.vdm.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_error())
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_without_value())
hook.append(self.mover.resp_task_succeed())
hook.append(self.vdm.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.vdm.output_get_interfaces_vdm(nfs_interface=''))
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.setup_server,
fakes.NETWORK_INFO, None)
if_name_1 = fakes.FakeData.network_allocations_id1[-12:]
if_name_2 = fakes.FakeData.network_allocations_id2[-12:]
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.mover.req_get_ref()),
mock.call(self.vdm.req_create()),
mock.call(self.mover.req_create_interface(
if_name=if_name_1,
ip=fakes.FakeData.network_allocations_ip1)),
mock.call(self.mover.req_create_interface(
if_name=if_name_2,
ip=fakes.FakeData.network_allocations_ip2)),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip1)),
mock.call(self.vdm.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_get_interfaces(), False),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_teardown_server(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
hook.append(self.cifs_server.resp_task_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=False))
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.vdm.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.vdm.output_get_interfaces_vdm())
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.teardown_server(fakes.SERVER_DETAIL,
fakes.SECURITY_SERVICE)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.cifs_server.req_modify(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=False)),
mock.call(self.cifs_server.req_delete(self.vdm.vdm_id)),
mock.call(self.mover.req_get_ref()),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip1)),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip2)),
mock.call(self.vdm.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_get_interfaces(), False),
mock.call(self.vdm.cmd_detach_nfs_interface(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_teardown_server_with_ipv6(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
hook.append(self.cifs_server.resp_task_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=False))
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.vdm.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.vdm.output_get_interfaces_vdm())
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.teardown_server(fakes.SERVER_DETAIL_IPV6,
fakes.SECURITY_SERVICE_IPV6)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.cifs_server.req_modify(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=False)),
mock.call(self.cifs_server.req_delete(self.vdm.vdm_id)),
mock.call(self.mover.req_get_ref()),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip3)),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip4)),
mock.call(self.vdm.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_get_interfaces(), False),
mock.call(self.vdm.cmd_detach_nfs_interface(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_teardown_server_without_server_detail(self):
self.connection.teardown_server(None, fakes.SECURITY_SERVICE)
def test_teardown_server_without_security_services(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.vdm.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.vdm.output_get_interfaces_vdm())
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.teardown_server(fakes.SERVER_DETAIL, [])
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.mover.req_get_ref()),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip1)),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip2)),
mock.call(self.vdm.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_get_interfaces(), False),
mock.call(self.vdm.cmd_detach_nfs_interface(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_teardown_server_without_share_server_name_in_server_detail(self):
server_detail = {
'cifs_if': fakes.FakeData.network_allocations_ip1,
'nfs_if': fakes.FakeData.network_allocations_ip2,
}
self.connection.teardown_server(server_detail, fakes.SECURITY_SERVICE)
def test_teardown_server_with_invalid_server_name(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_error())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.teardown_server(fakes.SERVER_DETAIL,
fakes.SECURITY_SERVICE)
expected_calls = [mock.call(self.vdm.req_get())]
xml_req_mock.assert_has_calls(expected_calls)
def test_teardown_server_without_cifs_server(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_error())
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.cifs_server.resp_task_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=False))
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.vdm.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.vdm.output_get_interfaces_vdm())
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.teardown_server(fakes.SERVER_DETAIL,
fakes.SECURITY_SERVICE)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.mover.req_get_ref()),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip1)),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip2)),
mock.call(self.vdm.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_get_interfaces(), False),
mock.call(self.vdm.cmd_detach_nfs_interface(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_teardown_server_with_invalid_cifs_server_modification(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
hook.append(self.cifs_server.resp_task_error())
hook.append(self.cifs_server.resp_task_succeed())
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.vdm.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.vdm.output_get_interfaces_vdm())
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.teardown_server(fakes.SERVER_DETAIL,
fakes.SECURITY_SERVICE)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.cifs_server.req_modify(self.vdm.vdm_id)),
mock.call(self.cifs_server.req_delete(self.vdm.vdm_id)),
mock.call(self.mover.req_get_ref()),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip1)),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip2)),
mock.call(self.vdm.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_get_interfaces(), False),
mock.call(self.vdm.cmd_detach_nfs_interface(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_add_cifs_rw(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [], [access], [],
share_server=share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_add_cifs_rw_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [], [access], [],
share_server=share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_deny_nfs(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=fakes.FakeData.rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [], [], [access],
share_server=share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=self.nfs_share.rw_hosts,
ro_hosts=self.nfs_share.ro_hosts), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_deny_nfs_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts_ipv6)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=fakes.FakeData.rw_hosts_ipv6,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [], [], [access],
share_server=share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=self.nfs_share.rw_hosts_ipv6,
ro_hosts=self.nfs_share.ro_hosts_ipv6), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_recover_nfs_rule(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS
hosts = ['192.168.1.5']
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=hosts,
ro_hosts=[]))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [access], [], [],
share_server=share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=hosts,
ro_hosts=[]), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_recover_nfs_rule_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS_IPV6
hosts = ['fdf8:f53b:82e1::5']
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts_ipv6)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=hosts,
ro_hosts=[]))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [access], [], [],
share_server=share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=hosts,
ro_hosts=[]), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_recover_cifs_rule(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_hook.append(fakes.FakeData.cifs_access)
ssh_hook.append('Command succeeded')
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [access], [], [],
share_server=share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(), True),
mock.call(self.cifs_share.cmd_get_access(), True),
mock.call(self.cifs_share.cmd_change_access(
action='revoke', user='guest'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_recover_cifs_rule_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_hook.append(fakes.FakeData.cifs_access)
ssh_hook.append('Command succeeded')
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [access], [], [],
share_server=share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(), True),
mock.call(self.cifs_share.cmd_get_access(), True),
mock.call(self.cifs_share.cmd_change_access(
action='revoke', user='guest'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_cifs_clear_access_server_not_found(self):
server = fakes.SHARE_SERVER
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
cifs_server_name='cifs_server_name'))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection._cifs_clear_access,
'share_name', server, None)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_allow_cifs_rw_access(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_cifs_rw_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_cifs_ro_access(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
access = fakes.CIFS_RO_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access('ro'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_cifs_ro_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RO_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access('ro'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_ro_access_without_share_server_name(self):
share = fakes.CIFS_SHARE
share_server = copy.deepcopy(fakes.SHARE_SERVER)
share_server['backend_details'].pop('share_server_name')
access = fakes.CIFS_RO_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access('ro'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_access_with_invalid_access_level(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
access = fake_share.fake_access(access_level='fake_level')
self.assertRaises(exception.InvalidShareAccessLevel,
self.connection.allow_access,
None, share, access, share_server)
def test_allow_access_with_invalid_share_server_name(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_error())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.allow_access,
None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_allow_nfs_access(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=fakes.FakeData.rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=rw_hosts, ro_hosts=self.nfs_share.ro_hosts), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_nfs_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS_IPV6
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts_ipv6)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=fakes.FakeData.rw_hosts_ipv6,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=rw_hosts,
ro_hosts=self.nfs_share.ro_hosts_ipv6),
True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_cifs_access_with_incorrect_access_type(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
access = fake_share.fake_access(access_type='fake_type')
self.assertRaises(exception.InvalidShareAccess,
self.connection.allow_access,
None, share, access, share_server)
def test_allow_nfs_access_with_incorrect_access_type(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
access = fake_share.fake_access(access_type='fake_type')
self.assertRaises(exception.InvalidShareAccess,
self.connection.allow_access,
None, share, access, share_server)
def test_allow_access_with_incorrect_proto(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(share_proto='FAKE_PROTO')
access = fake_share.fake_access()
self.assertRaises(exception.InvalidShare,
self.connection.allow_access,
None, share, access, share_server)
def test_deny_cifs_rw_access(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.deny_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(action='revoke'),
True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_cifs_rw_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.deny_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(action='revoke'),
True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_cifs_ro_access(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
access = fakes.CIFS_RO_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.deny_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access('ro', 'revoke'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_cifs_ro_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RO_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.deny_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access('ro', 'revoke'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_cifs_access_with_invliad_share_server_name(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_error())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.deny_access,
None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_deny_nfs_access(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=fakes.FakeData.rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.deny_access(None, share, access, share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=self.nfs_share.rw_hosts,
ro_hosts=self.nfs_share.ro_hosts), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_nfs_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS_IPV6
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts_ipv6)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=fakes.FakeData.rw_hosts_ipv6,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.deny_access(None, share, access, share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=self.nfs_share.rw_hosts_ipv6,
ro_hosts=self.nfs_share.ro_hosts_ipv6), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_access_with_incorrect_proto(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(share_proto='FAKE_PROTO')
access = fakes.CIFS_RW_ACCESS
self.assertRaises(exception.InvalidShare,
self.connection.deny_access,
None, share, access, share_server)
def test_deny_cifs_access_with_incorrect_access_type(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
access = fake_share.fake_access(access_type='fake_type')
self.assertRaises(exception.InvalidShareAccess,
self.connection.deny_access,
None, share, access, share_server)
def test_deny_nfs_access_with_incorrect_access_type(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
access = fake_share.fake_access(access_type='fake_type')
self.assertRaises(exception.InvalidShareAccess,
self.connection.deny_access,
None, share, access, share_server)
def test_update_share_stats(self):
hook = utils.RequestSideEffect()
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.pool.resp_get_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.update_share_stats(fakes.STATS)
expected_calls = [
mock.call(self.mover.req_get_ref()),
mock.call(self.pool.req_get()),
]
xml_req_mock.assert_has_calls(expected_calls)
for pool in fakes.STATS['pools']:
if pool['pool_name'] == fakes.FakeData.pool_name:
self.assertEqual(
enas_utils.mb_to_gb(fakes.FakeData.pool_total_size),
pool['total_capacity_gb'])
free_size = (fakes.FakeData.pool_total_size -
fakes.FakeData.pool_used_size)
self.assertEqual(
enas_utils.mb_to_gb(free_size), pool['free_capacity_gb'])
def test_update_share_stats_without_matched_config_pools(self):
self.connection.pools = set('fake_pool')
hook = utils.RequestSideEffect()
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.pool.resp_get_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.update_share_stats,
fakes.STATS)
expected_calls = [
mock.call(self.mover.req_get_ref()),
mock.call(self.pool.req_get()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_get_pool(self):
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.pool.resp_get_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
pool_name = self.connection.get_pool(share)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.pool.req_get()),
]
xml_req_mock.assert_has_calls(expected_calls)
self.assertEqual(fakes.FakeData.pool_name, pool_name)
def test_get_pool_failed_to_get_filesystem_info(self):
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_error())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.get_pool,
share)
expected_calls = [mock.call(self.fs.req_get())]
xml_req_mock.assert_has_calls(expected_calls)
def test_get_pool_failed_to_get_pool_info(self):
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.pool.resp_get_error())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.get_pool,
share)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.pool.req_get()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_get_pool_failed_to_find_matched_pool_name(self):
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.pool.resp_get_succeed(name='unmatch_pool_name',
id='unmatch_pool_id'))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.get_pool,
share)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.pool.req_get()),
]
xml_req_mock.assert_has_calls(expected_calls)
@ddt.data({'port_conf': None,
'managed_ports': ['cge-1-0', 'cge-1-3']},
{'port_conf': '*',
'managed_ports': ['cge-1-0', 'cge-1-3']},
{'port_conf': ['cge-1-*'],
'managed_ports': ['cge-1-0', 'cge-1-3']},
{'port_conf': ['cge-1-3'],
'managed_ports': ['cge-1-3']})
@ddt.unpack
def test_get_managed_ports_one_port(self, port_conf, managed_ports):
hook = utils.SSHSideEffect()
hook.append(self.mover.output_get_physical_devices())
ssh_cmd_mock = mock.Mock(side_effect=hook)
expected_calls = [
mock.call(self.mover.cmd_get_physical_devices(), False),
]
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.port_conf = port_conf
ports = self.connection.get_managed_ports()
self.assertIsInstance(ports, list)
self.assertEqual(sorted(managed_ports), sorted(ports))
ssh_cmd_mock.assert_has_calls(expected_calls)
def test_get_managed_ports_no_valid_port(self):
hook = utils.SSHSideEffect()
hook.append(self.mover.output_get_physical_devices())
ssh_cmd_mock = mock.Mock(side_effect=hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.port_conf = ['cge-2-0']
self.assertRaises(exception.BadConfigurationException,
self.connection.get_managed_ports)
def test_get_managed_ports_query_devices_failed(self):
hook = utils.SSHSideEffect()
hook.append(self.mover.fake_output)
ssh_cmd_mock = mock.Mock(side_effect=hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.port_conf = ['cge-2-0']
self.assertRaises(exception.EMCVnxXMLAPIError,
self.connection.get_managed_ports)