Implement IPv6 support for Dell EMC VNX driver

Major changes:
  * Support to create/delete/extend/access NFS and CIFS share/snapshot
    in the IPv6 network which created by Neutron
  * Support to connect VNX management interface using IPv6 address

Change-Id: Ibe7620f9548d5f57780e49c08214dc627b91a945
Implements: blueprint vnx-manila-ipv6-support
This commit is contained in:
Yong Huang 2017-12-13 15:07:19 -05:00
parent c8cceebf8e
commit e691879fcf
13 changed files with 1230 additions and 117 deletions

View File

@ -189,6 +189,7 @@ for the VNX driver:
emc_nas_pool_name = <pool name>
emc_interface_ports = <Comma separated ports list>
share_driver = manila.share.drivers.dell_emc.driver.EMCShareDriver
driver_handles_share_servers = True
- `emc_share_backend` is the plugin name. Set it to `vnx` for the VNX driver.
- `emc_nas_server` is the control station IP address of the VNX system to be
@ -204,10 +205,36 @@ for the VNX driver:
Members of the list can be Unix-style glob expressions (supports Unix shell-style
wildcards). This list is optional. In the absence of this option, any of the ports
on the Data Mover can be used.
- `driver_handles_share_servers` must be True, the driver will choose a port
from port list which configured in emc_interface_ports.
Restart of :term:`manila-share` service is needed for the configuration changes to take
effect.
IPv6 support
------------
IPv6 support for VNX driver is introduced in Queens release. The feature is divided
into two parts:
1. The driver is able to manage share or snapshot in the Neutron IPv6 network.
2. The driver is able to connect VNX management interface using its IPv6 address.
Pre-Configurations for IPv6 support
===================================
The following parameters need to be configured in `/etc/manila/manila.conf`
for the VNX driver:
network_plugin_ipv6_enabled = True
- `network_plugin_ipv6_enabled` indicates IPv6 is enabled.
If you want to connect VNX using IPv6 address, you should configure IPv6 address
by `nas_cs` command for VNX and specify the address in `/etc/manila/manila.conf`:
emc_nas_server = <IPv6 address>
Restrictions
------------

View File

@ -104,7 +104,7 @@ Mapping of share drivers and share access rules support
+----------------------------------------+--------------+--------------+----------------+------------+--------------+--------------+--------------+----------------+------------+------------+
| EMC VMAX | NFS (O) | \- | CIFS (O) | \- | \- | NFS (O) | \- | CIFS (O) | \- | \- |
+----------------------------------------+--------------+--------------+----------------+------------+--------------+--------------+--------------+----------------+------------+------------+
| EMC VNX | NFS (J) | \- | CIFS (J) | \- | \- | NFS (L) | \- | CIFS (L) | \- | \- |
| EMC VNX | NFS (J) | NFS (Q) | CIFS (J) | \- | \- | NFS (L) | NFS (Q) | CIFS (L) | \- | \- |
+----------------------------------------+--------------+--------------+----------------+------------+--------------+--------------+--------------+----------------+------------+------------+
| EMC Unity | NFS (N) | \- | CIFS (N) | \- | \- | NFS (N) | \- | CIFS (N) | \- | \- |
+----------------------------------------+--------------+--------------+----------------+------------+--------------+--------------+--------------+----------------+------------+------------+
@ -224,7 +224,7 @@ More information: :ref:`capabilities_and_extra_specs`
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+--------------------+--------------+--------------+
| EMC VMAX | O | \- | \- | \- | \- | O | \- | O | \- | \- | P | \- |
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+--------------------+--------------+--------------+
| EMC VNX | J | \- | \- | \- | \- | L | \- | J | \- | \- | P | \- |
| EMC VNX | J | \- | \- | \- | \- | L | \- | J | \- | \- | P | Q |
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+--------------------+--------------+--------------+
| EMC Unity | N | \- | \- | \- | N | \- | \- | N | \- | \- | P | \- |
+----------------------------------------+-----------+------------+--------+-------------+-------------------+--------------------+-----+----------------------------+--------------------+--------------------+--------------+--------------+

View File

@ -35,7 +35,8 @@ LOG = log.getLogger(__name__)
class XMLAPIConnector(object):
def __init__(self, configuration, debug=True):
super(XMLAPIConnector, self).__init__()
self.storage_ip = configuration.emc_nas_server
self.storage_ip = enas_utils.convert_ipv6_format_if_needed(
configuration.emc_nas_server)
self.username = configuration.emc_nas_login
self.password = configuration.emc_nas_password
self.debug = debug

View File

@ -18,6 +18,7 @@ import types
from oslo_config import cfg
from oslo_log import log
from oslo_utils import fnmatch
from oslo_utils import netutils
from oslo_utils import timeutils
import ssl
@ -103,3 +104,78 @@ def create_ssl_context(configuration):
'version of Python, ssl verification is disabled.')
context = None
return context
def parse_ipaddr(text):
"""Parse the output of VNX server_export command, get IPv4/IPv6 addresses.
Example:
input: 192.168.100.102:[fdf8:f53b:82e4::57]:[fdf8:f53b:82e4::54]
output: ['192.168.100.102', '[fdf8:f53b:82e4::57]', '[fdf8:f53b:82e4::54]']
:param text: The output of VNX server_export command.
:return: The list of IPv4/IPv6 addresses. The IPv6 address enclosed by [].
"""
rst = []
stk = []
ipaddr = ''
it = iter(text)
try:
while True:
i = next(it)
if i == ':' and not stk and ipaddr:
rst.append(ipaddr)
ipaddr = ''
elif i == ':' and not ipaddr:
continue
elif i == '[':
stk.append(i)
elif i == ']':
rst.append('[%s]' % ipaddr)
stk.pop()
ipaddr = ''
else:
ipaddr += i
except StopIteration:
if ipaddr:
rst.append(ipaddr)
return rst
def convert_ipv6_format_if_needed(ip_addr):
"""Convert IPv6 address format if needed. The IPv6 address enclosed by [].
For the invalid IPv6 cidr, its format will not be changed.
:param ip_addr: IPv6 address.
:return: Converted IPv6 address.
"""
if netutils.is_valid_ipv6_cidr(ip_addr):
ip_addr = '[%s]' % ip_addr
return ip_addr
def export_unc_path(ip_addr):
"""Convert IPv6 address to valid UNC path.
In Microsoft Windows OS, UNC (Uniform Naming Convention) specifies a
common syntax to describe the location of a network resource.
The colon which used by IPv6 is an illegal character in a UNC path name.
So the IPv6 address need to be converted to valid UNC path.
References:
- https://en.wikipedia.org/wiki/IPv6_address
#Literal_IPv6_addresses_in_UNC_path_names
- https://en.wikipedia.org/wiki/Path_(computing)#Uniform_Naming_Convention
:param ip_addr: IPv6 address.
:return: UNC path.
"""
unc_suffix = '.ipv6-literal.net'
if netutils.is_valid_ipv6(ip_addr):
ip_addr = ip_addr.replace(':', '-') + unc_suffix
return ip_addr

View File

@ -78,6 +78,9 @@ class EMCShareDriver(driver.ShareDriver):
super(EMCShareDriver, self).__init__(
self.plugin.driver_handles_share_servers, *args, **kwargs)
if hasattr(self.plugin, 'ipv6_implemented'):
self.ipv6_implemented = self.plugin.ipv6_implemented
def create_share(self, context, share, share_server=None):
"""Is called to create share."""
location = self.plugin.create_share(context, share, share_server)
@ -159,3 +162,9 @@ class EMCShareDriver(driver.ShareDriver):
def _teardown_server(self, server_details, security_services=None):
"""Teardown share server."""
return self.plugin.teardown_server(server_details, security_services)
def get_configured_ip_versions(self):
if self.ipv6_implemented:
return [4, 6]
else:
return [4]

View File

@ -16,6 +16,7 @@
import copy
import random
import six
from oslo_config import cfg
from oslo_log import log
@ -37,8 +38,9 @@ from manila import utils
2.0.0 - Bumped the version for Mitaka
3.0.0 - Bumped the version for Ocata
4.0.0 - Bumped the version for Pike
5.0.0 - Bumped the version for Queens
"""
VERSION = "4.0.0"
VERSION = "5.0.0"
LOG = log.getLogger(__name__)
@ -79,6 +81,7 @@ class VNXStorageConnection(driver.StorageConnection):
self.reserved_percentage = None
self.driver_handles_share_servers = True
self.port_conf = None
self.ipv6_implemented = True
def create_share(self, context, share, share_server=None):
"""Create a share and export it based on protocol used."""
@ -179,7 +182,7 @@ class VNXStorageConnection(driver.StorageConnection):
LOG.error(message)
raise exception.EMCVnxXMLAPIError(err=message)
interface = server['interfaces'][0]
interface = enas_utils.export_unc_path(server['interfaces'][0])
self._get_context('CIFSShare').create(share_name, server['name'],
vdm_name)
@ -199,8 +202,11 @@ class VNXStorageConnection(driver.StorageConnection):
self._get_context('NFSShare').create(share_name, vdm_name)
nfs_if = enas_utils.convert_ipv6_format_if_needed(
share_server['backend_details']['nfs_if'])
return ('%(nfs_if)s:/%(share_name)s'
% {'nfs_if': share_server['backend_details']['nfs_if'],
% {'nfs_if': nfs_if,
'share_name': share_name})
def create_share_from_snapshot(self, context, share, snapshot,
@ -228,10 +234,13 @@ class VNXStorageConnection(driver.StorageConnection):
self._allocate_container_from_snapshot(
share, snapshot, share_server, pool_name)
nfs_if = enas_utils.convert_ipv6_format_if_needed(
share_server['backend_details']['nfs_if'])
if share_proto == 'NFS':
self._create_nfs_share(share_name, share_server)
location = ('%(nfs_if)s:/%(share_name)s'
% {'nfs_if': share_server['backend_details']['nfs_if'],
% {'nfs_if': nfs_if,
'share_name': share_name})
elif share_proto == 'CIFS':
location = self._create_cifs_share(share_name, share_server)
@ -243,9 +252,9 @@ class VNXStorageConnection(driver.StorageConnection):
share_name = snapshot['share_id']
status, filesystem = self._get_context('FileSystem').get(share_name)
if status != constants.STATUS_OK:
message = (_("File System %s not found.") % share_name)
LOG.error(message)
raise exception.EMCVnxXMLAPIError(err=message)
message = (_("File System %s not found.") % share_name)
LOG.error(message)
raise exception.EMCVnxXMLAPIError(err=message)
pool_id = filesystem['pools_id'][0]
@ -371,9 +380,9 @@ class VNXStorageConnection(driver.StorageConnection):
status, server = self._get_context('CIFSServer').get(server_name,
vdm_name)
if status != constants.STATUS_OK:
message = (_("CIFS server %s not found.") % server_name)
LOG.error(message)
raise exception.EMCVnxXMLAPIError(err=message)
message = (_("CIFS server %s not found.") % server_name)
LOG.error(message)
raise exception.EMCVnxXMLAPIError(err=message)
self._get_context('CIFSShare').allow_share_access(
vdm_name,
@ -413,7 +422,9 @@ class VNXStorageConnection(driver.StorageConnection):
white_list = []
for rule in access_rules:
self.allow_access(context, share, rule, share_server)
white_list.append(rule['access_to'])
white_list.append(
enas_utils.convert_ipv6_format_if_needed(
rule['access_to']))
self.clear_access(share, share_server, white_list)
def clear_access(self, share, share_server, white_list):
@ -488,9 +499,9 @@ class VNXStorageConnection(driver.StorageConnection):
status, server = self._get_context('CIFSServer').get(server_name,
vdm_name)
if status != constants.STATUS_OK:
message = (_("CIFS server %s not found.") % server_name)
LOG.error(message)
raise exception.EMCVnxXMLAPIError(err=message)
message = (_("CIFS server %s not found.") % server_name)
LOG.error(message)
raise exception.EMCVnxXMLAPIError(err=message)
self._get_context('CIFSShare').deny_share_access(
vdm_name,
@ -509,7 +520,7 @@ class VNXStorageConnection(driver.StorageConnection):
reason = _('Only ip access type allowed.')
raise exception.InvalidShareAccess(reason=reason)
host_ip = access['access_to']
host_ip = enas_utils.convert_ipv6_format_if_needed(access['access_to'])
self._get_context('NFSShare').deny_share_access(share['id'], host_ip,
vdm_name)
@ -681,21 +692,29 @@ class VNXStorageConnection(driver.StorageConnection):
'share server...', vdm_name)
self._get_context('VDM').create(vdm_name, self.mover_name)
netmask = utils.cidr_to_netmask(network_info['cidr'])
devices = self.get_managed_ports()
for net_info in network_info['network_allocations']:
random.shuffle(devices)
ip_version = net_info['ip_version']
interface = {
'name': net_info['id'][-12:],
'device_name': devices[0],
'ip': net_info['ip_address'],
'mover_name': self.mover_name,
'net_mask': netmask,
'vlan_id': vlan_id if vlan_id else -1,
}
if ip_version == 6:
interface['ip_version'] = ip_version
interface['net_mask'] = six.text_type(
utils.cidr_to_prefixlen(network_info['cidr']))
else:
interface['net_mask'] = utils.cidr_to_netmask(
network_info['cidr'])
self._get_context('MoverInterface').create(interface)
allocated_interfaces.append(interface)

View File

@ -27,15 +27,15 @@ from manila import exception
from manila.i18n import _
from manila.share.drivers.dell_emc.common.enas import connector
from manila.share.drivers.dell_emc.common.enas import constants
from manila.share.drivers.dell_emc.common.enas import utils as vnx_utils
from manila.share.drivers.dell_emc.common.enas import utils as enas_utils
from manila.share.drivers.dell_emc.common.enas import xml_api_parser as parser
from manila import utils
LOG = log.getLogger(__name__)
@vnx_utils.decorate_all_methods(vnx_utils.log_enter_exit,
debug_only=True)
@enas_utils.decorate_all_methods(enas_utils.log_enter_exit,
debug_only=True)
class StorageObjectManager(object):
def __init__(self, configuration):
self.context = dict()
@ -211,8 +211,8 @@ class StorageObject(object):
return self.manager.getStorageContext(type)
@vnx_utils.decorate_all_methods(vnx_utils.log_enter_exit,
debug_only=True)
@enas_utils.decorate_all_methods(enas_utils.log_enter_exit,
debug_only=True)
class FileSystem(StorageObject):
def __init__(self, conn, elt_maker, xml_parser, manager):
super(FileSystem, self).__init__(conn, elt_maker, xml_parser, manager)
@ -479,8 +479,8 @@ class FileSystem(StorageObject):
self._execute_cmd(rw_mount_cmd)
@vnx_utils.decorate_all_methods(vnx_utils.log_enter_exit,
debug_only=True)
@enas_utils.decorate_all_methods(enas_utils.log_enter_exit,
debug_only=True)
class StoragePool(StorageObject):
def __init__(self, conn, elt_maker, xml_parser, manager):
super(StoragePool, self).__init__(conn, elt_maker, xml_parser, manager)
@ -541,8 +541,8 @@ class StoragePool(StorageObject):
return out['id']
@vnx_utils.decorate_all_methods(vnx_utils.log_enter_exit,
debug_only=True)
@enas_utils.decorate_all_methods(enas_utils.log_enter_exit,
debug_only=True)
class MountPoint(StorageObject):
def __init__(self, conn, elt_maker, xml_parser, manager):
super(MountPoint, self).__init__(conn, elt_maker, xml_parser, manager)
@ -682,8 +682,8 @@ class MountPoint(StorageObject):
return False
@vnx_utils.decorate_all_methods(vnx_utils.log_enter_exit,
debug_only=True)
@enas_utils.decorate_all_methods(enas_utils.log_enter_exit,
debug_only=True)
class Mover(StorageObject):
def __init__(self, conn, elt_maker, xml_parser, manager):
super(Mover, self).__init__(conn, elt_maker, xml_parser, manager)
@ -847,8 +847,8 @@ class Mover(StorageObject):
return physical_network_devices
@vnx_utils.decorate_all_methods(vnx_utils.log_enter_exit,
debug_only=True)
@enas_utils.decorate_all_methods(enas_utils.log_enter_exit,
debug_only=True)
class VDM(StorageObject):
def __init__(self, conn, elt_maker, xml_parser, manager):
super(VDM, self).__init__(conn, elt_maker, xml_parser, manager)
@ -1022,8 +1022,8 @@ class VDM(StorageObject):
return interfaces
@vnx_utils.decorate_all_methods(vnx_utils.log_enter_exit,
debug_only=True)
@enas_utils.decorate_all_methods(enas_utils.log_enter_exit,
debug_only=True)
class Snapshot(StorageObject):
def __init__(self, conn, elt_maker, xml_parser, manager):
super(Snapshot, self).__init__(conn, elt_maker, xml_parser, manager)
@ -1137,8 +1137,8 @@ class Snapshot(StorageObject):
return self.snap_map[name]['id']
@vnx_utils.decorate_all_methods(vnx_utils.log_enter_exit,
debug_only=True)
@enas_utils.decorate_all_methods(enas_utils.log_enter_exit,
debug_only=True)
class MoverInterface(StorageObject):
def __init__(self, conn, elt_maker, xml_parser, manager):
super(MoverInterface, self).__init__(conn, elt_maker, xml_parser,
@ -1159,18 +1159,21 @@ class MoverInterface(StorageObject):
mover_id = self._get_mover_id(mover_name, False)
params = dict(device=device_name,
ipAddress=six.text_type(ip_addr),
mover=mover_id,
name=name,
netMask=net_mask,
vlanid=six.text_type(vlan_id))
if interface.get('ip_version') == 6:
params['ipVersion'] = 'IPv6'
if self.xml_retry:
self.xml_retry = False
request = self._build_task_package(
self.elt_maker.NewMoverInterface(
device=device_name,
ipAddress=six.text_type(ip_addr),
mover=mover_id,
name=name,
netMask=net_mask,
vlanid=six.text_type(vlan_id)
)
self.elt_maker.NewMoverInterface(**params)
)
response = self._send_request(request)
@ -1261,8 +1264,8 @@ class MoverInterface(StorageObject):
raise exception.EMCVnxXMLAPIError(err=message)
@vnx_utils.decorate_all_methods(vnx_utils.log_enter_exit,
debug_only=True)
@enas_utils.decorate_all_methods(enas_utils.log_enter_exit,
debug_only=True)
class DNSDomain(StorageObject):
def __init__(self, conn, elt_maker, xml_parser, manager):
super(DNSDomain, self).__init__(conn, elt_maker, xml_parser, manager)
@ -1323,8 +1326,8 @@ class DNSDomain(StorageObject):
{'name': name, 'err': response['problems']})
@vnx_utils.decorate_all_methods(vnx_utils.log_enter_exit,
debug_only=True)
@enas_utils.decorate_all_methods(enas_utils.log_enter_exit,
debug_only=True)
class CIFSServer(StorageObject):
def __init__(self, conn, elt_maker, xml_parser, manager):
super(CIFSServer, self).__init__(conn, elt_maker, xml_parser, manager)
@ -1544,8 +1547,8 @@ class CIFSServer(StorageObject):
self.cifs_server_map[mover_name].pop(computer_name)
@vnx_utils.decorate_all_methods(vnx_utils.log_enter_exit,
debug_only=True)
@enas_utils.decorate_all_methods(enas_utils.log_enter_exit,
debug_only=True)
class CIFSShare(StorageObject):
def __init__(self, conn, elt_maker, xml_parser, manager):
super(CIFSShare, self).__init__(conn, elt_maker, xml_parser, manager)
@ -1771,8 +1774,8 @@ class CIFSShare(StorageObject):
return users_to_remove
@vnx_utils.decorate_all_methods(vnx_utils.log_enter_exit,
debug_only=True)
@enas_utils.decorate_all_methods(enas_utils.log_enter_exit,
debug_only=True)
class NFSShare(StorageObject):
def __init__(self, conn, elt_maker, xml_parser, manager):
super(NFSShare, self).__init__(conn, elt_maker, xml_parser, manager)
@ -1872,13 +1875,14 @@ class NFSShare(StorageObject):
for field in fields:
field = field.strip()
if field.startswith('rw='):
nfs_share['RwHosts'] = field[3:].split(":")
nfs_share['RwHosts'] = enas_utils.parse_ipaddr(field[3:])
elif field.startswith('access='):
nfs_share['AccessHosts'] = field[7:].split(":")
nfs_share['AccessHosts'] = enas_utils.parse_ipaddr(
field[7:])
elif field.startswith('root='):
nfs_share['RootHosts'] = field[5:].split(":")
nfs_share['RootHosts'] = enas_utils.parse_ipaddr(field[5:])
elif field.startswith('ro='):
nfs_share['RoHosts'] = field[3:].split(":")
nfs_share['RoHosts'] = enas_utils.parse_ipaddr(field[3:])
self.nfs_share_map[name] = nfs_share
else:
@ -1899,6 +1903,9 @@ class NFSShare(StorageObject):
changed = False
rwhosts = share['RwHosts']
rohosts = share['RoHosts']
host_ip = enas_utils.convert_ipv6_format_if_needed(host_ip)
if access_level == const.ACCESS_LEVEL_RW:
if host_ip not in rwhosts:
rwhosts.append(host_ip)
@ -1942,7 +1949,6 @@ class NFSShare(StorageObject):
do_allow_access(share_name, host_ip, mover_name, access_level)
def deny_share_access(self, share_name, host_ip, mover_name):
@utils.synchronized('emc-shareaccess-' + share_name)
def do_deny_access(share_name, host_ip, mover_name):
status, share = self.get(share_name, mover_name)

View File

@ -18,6 +18,7 @@ from oslo_utils import units
from manila.common import constants as const
from manila.share import configuration as conf
from manila.share.drivers.dell_emc.common.enas import utils
from manila.tests import fake_share
@ -77,15 +78,26 @@ class FakeData(object):
# Share network information
share_network_id = 'c5b3a865-56d0-4d88-abe5-879965e099c9'
cidr = '192.168.1.0/24'
cidr_v6 = 'fdf8:f53b:82e1::/64'
segmentation_id = 100
network_allocations_id1 = '132dbb10-9a36-46f2-8d89-3d909830c356'
network_allocations_id2 = '7eabdeed-bad2-46ea-bd0f-a33884c869e0'
network_allocations_id3 = '98c9e490-a842-4e59-b59a-a6042069d35b'
network_allocations_id4 = '6319a917-ab95-4b65-a498-773ae33c5550'
network_allocations_ip1 = '192.168.1.1'
network_allocations_ip2 = '192.168.1.2'
network_allocations_ip3 = 'fdf8:f53b:82e1::1'
network_allocations_ip4 = 'fdf8:f53b:82e1::2'
network_allocations_ip_version1 = 4
network_allocations_ip_version2 = 4
network_allocations_ip_version3 = 6
network_allocations_ip_version4 = 6
domain_name = 'fake_domain'
domain_user = 'administrator'
domain_password = 'password'
dns_ip_address = '192.168.1.200'
dns_ipv6_address = 'fdf8:f53b:82e1::f'
# Share server information
share_server_id = '56aafd02-4d44-43d7-b784-57fc88167224'
@ -104,8 +116,11 @@ class FakeData(object):
mover_id = 'fake_mover_id'
interface_name1 = network_allocations_id1[-12:]
interface_name2 = network_allocations_id2[-12:]
interface_name3 = network_allocations_id3[-12:]
interface_name4 = network_allocations_id4[-12:]
long_interface_name = network_allocations_id1
net_mask = '255.255.255.0'
net_mask_v6 = 64
device_name = 'cge-1-0'
interconnect_id = '2001'
@ -123,6 +138,9 @@ class FakeData(object):
rw_hosts = ['192.168.1.1', '192.168.1.2']
ro_hosts = ['192.168.1.3', '192.168.1.4']
nfs_host_ip = '192.168.1.5'
rw_hosts_ipv6 = ['fdf8:f53b:82e1::1', 'fdf8:f53b:82e1::2']
ro_hosts_ipv6 = ['fdf8:f53b:82e1::3', 'fdf8:f53b:82e1::4']
nfs_host_ipv6 = 'fdf8:f53b:82e1::5'
fake_output = ''
@ -175,10 +193,15 @@ class StorageObjectTestData(object):
self.interface_name1 = FakeData.interface_name1
self.interface_name2 = FakeData.interface_name2
self.interface_name3 = FakeData.interface_name3
self.interface_name4 = FakeData.interface_name4
self.long_interface_name = FakeData.long_interface_name
self.ip_address1 = FakeData.network_allocations_ip1
self.ip_address2 = FakeData.network_allocations_ip2
self.ip_address3 = FakeData.network_allocations_ip3
self.ip_address4 = FakeData.network_allocations_ip4
self.net_mask = FakeData.net_mask
self.net_mask_v6 = FakeData.net_mask_v6
self.vlan_id = FakeData.segmentation_id
self.cifs_server_name = FakeData.vdm_name
@ -196,6 +219,10 @@ class StorageObjectTestData(object):
self.ro_hosts = FakeData.ro_hosts
self.nfs_host_ip = FakeData.nfs_host_ip
self.rw_hosts_ipv6 = FakeData.rw_hosts_ipv6
self.ro_hosts_ipv6 = FakeData.ro_hosts_ipv6
self.nfs_host_ipv6 = FakeData.nfs_host_ipv6
self.fake_output = FakeData.fake_output
@response
@ -710,10 +737,16 @@ class VDMTestData(StorageObjectTestData):
return '<VdmQueryParams/>'
@response
def resp_get_succeed(self, name=None):
if not name:
def resp_get_succeed(self, name=None, interface1=None, interface2=None):
if name is None:
name = self.vdm_name
if interface1 is None:
interface1 = self.interface_name1
if interface2 is None:
interface2 = self.interface_name2
return (
'<QueryStatus maxSeverity="ok"/>'
'<Vdm name="%(vdm_name)s" state="loaded" mover="%(mover_id)s" '
@ -724,8 +757,8 @@ class VDMTestData(StorageObjectTestData):
{'vdm_name': name,
'vdm_id': self.vdm_id,
'mover_id': self.mover_id,
'interface1': self.interface_name1,
'interface2': self.interface_name2}
'interface1': interface1,
'interface2': interface2}
)
@response
@ -738,11 +771,14 @@ class VDMTestData(StorageObjectTestData):
def req_delete(self):
return '<DeleteVdm vdm="%(vdmid)s"/>' % {'vdmid': self.vdm_id}
def cmd_attach_nfs_interface(self):
def cmd_attach_nfs_interface(self, interface=None):
if interface is None:
interface = self.interface_name2
return [
'env', 'NAS_DB=/nas', '/nas/bin/nas_server',
'-vdm', self.vdm_name,
'-attach', self.interface_name2,
'-attach', interface,
]
def cmd_detach_nfs_interface(self):
@ -967,6 +1003,23 @@ class MoverTestData(StorageObjectTestData):
'net_mask': self.net_mask}
)
@start_task
def req_create_interface_with_ipv6(self,
if_name=FakeData.interface_name3,
ip=FakeData.network_allocations_ip3):
return (
'<NewMoverInterface name="%(if_name)s" vlanid="%(vlan)s" '
'ipVersion="IPv6" netMask="%(net_mask)s" '
'device="%(device_name)s" '
'mover="%(mover_id)s" ipAddress="%(ip)s"/>'
% {'if_name': if_name,
'vlan': self.vlan_id,
'ip': ip,
'mover_id': self.mover_id,
'device_name': self.device_name,
'net_mask': self.net_mask_v6}
)
@response
def resp_create_interface_but_name_already_exist(self):
return (
@ -1087,13 +1140,16 @@ class DNSDomainTestData(StorageObjectTestData):
super(DNSDomainTestData, self).__init__()
@start_task
def req_create(self):
def req_create(self, ip_addr=None):
if ip_addr is None:
ip_addr = self.dns_ip_address
return (
'<NewMoverDnsDomain mover="%(mover_id)s" protocol="udp" '
'name="%(domain_name)s" servers="%(server_ips)s"/>' %
{'mover_id': self.mover_id,
'domain_name': self.domain_name,
'server_ips': self.dns_ip_address}
'server_ips': ip_addr}
)
@start_task
@ -1111,7 +1167,10 @@ class CIFSServerTestData(StorageObjectTestData):
super(CIFSServerTestData, self).__init__()
@start_task
def req_create(self, mover_id, is_vdm=True):
def req_create(self, mover_id, is_vdm=True, ip_addr=None):
if ip_addr is None:
ip_addr = self.ip_address1
return (
'<NewW2KCifsServer interfaces="%(ip)s" compName="%(comp_name)s" '
'name="%(name)s" domain="%(domain)s">'
@ -1120,7 +1179,7 @@ class CIFSServerTestData(StorageObjectTestData):
'<JoinDomain userName="%(domain_user)s" '
'password="%(domain_password)s"/>'
'</NewW2KCifsServer>'
% {'ip': self.ip_address1,
% {'ip': ip_addr,
'comp_name': self.cifs_server_name,
'name': self.cifs_server_name[-14:],
'mover_id': mover_id,
@ -1143,9 +1202,14 @@ class CIFSServerTestData(StorageObjectTestData):
@response
def resp_get_succeed(self, mover_id, is_vdm, join_domain,
cifs_server_name=None):
cifs_server_name=None,
ip_addr=None):
if cifs_server_name is None:
cifs_server_name = self.cifs_server_name
if ip_addr is None:
ip_addr = self.ip_address1
return (
'<QueryStatus maxSeverity="ok"/>'
'<CifsServer interfaces="%(ip)s" type="W2K" '
@ -1156,7 +1220,7 @@ class CIFSServerTestData(StorageObjectTestData):
'domainJoined="%(join_domain)s"/></CifsServer>'
% {'mover_id': mover_id,
'cifsserver': self.cifs_server_name[-14:],
'ip': self.ip_address1,
'ip': ip_addr,
'is_vdm': 'true' if is_vdm else 'false',
'alias': self.cifs_server_name[-12:],
'domain': self.domain_name,
@ -1405,6 +1469,11 @@ class NFSShareTestData(StorageObjectTestData):
]
def output_get_succeed(self, rw_hosts, ro_hosts):
rw_hosts = [utils.convert_ipv6_format_if_needed(ip_addr) for ip_addr in
rw_hosts]
ro_hosts = [utils.convert_ipv6_format_if_needed(ip_addr) for ip_addr in
ro_hosts]
if rw_hosts and ro_hosts:
return (
'%(mover_name)s :\nexport "%(path)s" '
@ -1466,6 +1535,11 @@ class NFSShareTestData(StorageObjectTestData):
% self.vdm_name)
def cmd_set_access(self, rw_hosts, ro_hosts):
rw_hosts = [utils.convert_ipv6_format_if_needed(ip_addr) for ip_addr in
rw_hosts]
ro_hosts = [utils.convert_ipv6_format_if_needed(ip_addr) for ip_addr in
ro_hosts]
access_str = ("access=-0.0.0.0/0.0.0.0:%(access_hosts)s,"
"root=%(root_hosts)s,rw=%(rw_hosts)s,ro=%(ro_hosts)s" %
{'rw_hosts': ":".join(rw_hosts),
@ -1531,11 +1605,21 @@ NFS_RW_ACCESS = fake_share.fake_access(
access_to=FakeData.nfs_host_ip,
access_level='rw')
NFS_RW_ACCESS_IPV6 = fake_share.fake_access(
access_type='ip',
access_to=FakeData.nfs_host_ipv6,
access_level='rw')
NFS_RO_ACCESS = fake_share.fake_access(
access_type='ip',
access_to=FakeData.nfs_host_ip,
access_level='ro')
NFS_RO_ACCESS_IPV6 = fake_share.fake_access(
access_type='ip',
access_to=FakeData.nfs_host_ipv6,
access_level='ro')
SHARE_SERVER = {
'id': FakeData.share_server_id,
'share_network': {
@ -1550,12 +1634,32 @@ SHARE_SERVER = {
}
}
SHARE_SERVER_IPV6 = {
'id': FakeData.share_server_id,
'share_network': {
'name': 'fake_share_network',
'id': FakeData.share_network_id
},
'share_network_id': FakeData.share_network_id,
'backend_details': {
'share_server_name': FakeData.vdm_name,
'cifs_if': FakeData.network_allocations_ip3,
'nfs_if': FakeData.network_allocations_ip4,
}
}
SERVER_DETAIL = {
'share_server_name': FakeData.vdm_name,
'cifs_if': FakeData.network_allocations_ip1,
'nfs_if': FakeData.network_allocations_ip2,
}
SERVER_DETAIL_IPV6 = {
'share_server_name': FakeData.vdm_name,
'cifs_if': FakeData.network_allocations_ip3,
'nfs_if': FakeData.network_allocations_ip4,
}
SECURITY_SERVICE = [
{
'type': 'active_directory',
@ -1566,6 +1670,16 @@ SECURITY_SERVICE = [
},
]
SECURITY_SERVICE_IPV6 = [
{
'type': 'active_directory',
'domain': FakeData.domain_name,
'dns_ip': FakeData.dns_ipv6_address,
'user': FakeData.domain_user,
'password': FakeData.domain_password
},
]
NETWORK_INFO = {
'server_id': FakeData.share_server_id,
'cidr': FakeData.cidr,
@ -1580,9 +1694,33 @@ NETWORK_INFO = {
'network_type': 'vlan',
'network_allocations': [
{'id': FakeData.network_allocations_id1,
'ip_address': FakeData.network_allocations_ip1},
'ip_address': FakeData.network_allocations_ip1,
'ip_version': FakeData.network_allocations_ip_version1},
{'id': FakeData.network_allocations_id2,
'ip_address': FakeData.network_allocations_ip2}
'ip_address': FakeData.network_allocations_ip2,
'ip_version': FakeData.network_allocations_ip_version2}
]
}
NETWORK_INFO_IPV6 = {
'server_id': FakeData.share_server_id,
'cidr': FakeData.cidr_v6,
'security_services': [
{'type': 'active_directory',
'domain': FakeData.domain_name,
'dns_ip': FakeData.dns_ipv6_address,
'user': FakeData.domain_user,
'password': FakeData.domain_password},
],
'segmentation_id': FakeData.segmentation_id,
'network_type': 'vlan',
'network_allocations': [
{'id': FakeData.network_allocations_id3,
'ip_address': FakeData.network_allocations_ip3,
'ip_version': FakeData.network_allocations_ip_version3},
{'id': FakeData.network_allocations_id4,
'ip_address': FakeData.network_allocations_ip4,
'ip_version': FakeData.network_allocations_ip_version4}
]
}

View File

@ -71,3 +71,71 @@ class SslContextTestCase(test.TestCase):
mock.Mock(side_effect=AttributeError))
context = utils.create_ssl_context(configuration)
self.assertIsNone(context)
@ddt.ddt
class ParseIpaddrTestCase(test.TestCase):
@ddt.data({'lst_ipaddr': ['192.168.100.101',
'192.168.100.102',
'192.168.100.103']},
{'lst_ipaddr': ['[fdf8:f53b:82e4::57]',
'[fdf8:f53b:82e4::54]',
'[fdf8:f53b:82e4::55]']},
{'lst_ipaddr': ['[fdf8:f53b:82e4::57]',
'[fdf8:f53b:82e4::54]',
'192.168.100.103',
'[fdf8:f53b:82e4::55]']},
{'lst_ipaddr': ['192.168.100.101',
'[fdf8:f53b:82e4::57]',
'[fdf8:f53b:82e4::54]',
'192.168.100.101',
'[fdf8:f53b:82e4::55]',
'192.168.100.102']},)
@ddt.unpack
def test_parse_ipv4_addr(self, lst_ipaddr):
self.assertEqual(lst_ipaddr, utils.parse_ipaddr(':'.join(lst_ipaddr)))
@ddt.ddt
class ConvertIPv6FormatTestCase(test.TestCase):
@ddt.data({'ip_addr': 'fdf8:f53b:82e4::55'},
{'ip_addr': 'fdf8:f53b:82e4::55/64'},
{'ip_addr': 'fdf8:f53b:82e4::55/128'})
@ddt.unpack
def test_ipv6_addr(self, ip_addr):
expected_ip_addr = '[%s]' % ip_addr
self.assertEqual(expected_ip_addr,
utils.convert_ipv6_format_if_needed(ip_addr))
@ddt.data({'ip_addr': '192.168.1.100'},
{'ip_addr': '192.168.1.100/24'},
{'ip_addr': '192.168.1.100/32'},
{'ip_addr': '[fdf8:f53b:82e4::55]'})
@ddt.unpack
def test_invalid_ipv6_addr(self, ip_addr):
self.assertEqual(ip_addr, utils.convert_ipv6_format_if_needed(ip_addr))
@ddt.ddt
class ExportUncPathTestCase(test.TestCase):
@ddt.data({'ip_addr': 'fdf8:f53b:82e4::55'},
{'ip_addr': 'fdf8:f53b:82e4::'},
{'ip_addr': '2018::'})
@ddt.unpack
def test_ipv6_addr(self, ip_addr):
expected_ip_addr = '%s.ipv6-literal.net' % ip_addr.replace(':', '-')
self.assertEqual(expected_ip_addr,
utils.export_unc_path(ip_addr))
@ddt.data({'ip_addr': '192.168.1.100'},
{'ip_addr': '192.168.1.100/24'},
{'ip_addr': '192.168.1.100/32'},
{'ip_addr': 'fdf8:f53b:82e4::55/64'},
{'ip_addr': 'fdf8:f53b:82e4::55/128'},
{'ip_addr': '[fdf8:f53b:82e4::55]'})
@ddt.unpack
def test_invalid_ipv6_addr(self, ip_addr):
self.assertEqual(ip_addr, utils.export_unc_path(ip_addr))

View File

@ -173,7 +173,51 @@ class StorageConnectionTestCase(test.TestCase):
ssh_calls = [mock.call(self.cifs_share.cmd_disable_access(), True)]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location, r'\\192.168.1.1\%s' % share['name'],
self.assertEqual(location, r'\\%s\%s' %
(fakes.FakeData.network_allocations_ip1,
share['name']),
'CIFS export path is incorrect')
def test_create_cifs_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
hook.append(self.pool.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
hook.append(self.cifs_share.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share(None, share, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.pool.req_get()),
mock.call(self.fs.req_create_on_vdm()),
mock.call(self.cifs_share.req_create(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [mock.call(self.cifs_share.cmd_disable_access(), True)]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location, r'\\%s.ipv6-literal.net\%s' %
(fakes.FakeData.network_allocations_ip3.replace(':',
'-'),
share['name']),
'CIFS export path is incorrect')
def test_create_nfs_share(self):
@ -207,6 +251,41 @@ class StorageConnectionTestCase(test.TestCase):
self.assertEqual(location, '192.168.1.2:/%s' % share['name'],
'NFS export path is incorrect')
def test_create_nfs_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.pool.resp_get_succeed())
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_create())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share(None, share, share_server)
expected_calls = [
mock.call(self.pool.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.fs.req_create_on_vdm()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [mock.call(self.nfs_share.cmd_create(), True)]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location, '[%s]:/%s' %
(fakes.FakeData.network_allocations_ip4,
share['name']),
'NFS export path is incorrect')
def test_create_cifs_share_without_share_server(self):
share = fakes.CIFS_SHARE
@ -336,6 +415,70 @@ class StorageConnectionTestCase(test.TestCase):
self.assertEqual(location, r'\\192.168.1.1\%s' % share['name'],
'CIFS export path is incorrect')
def test_create_cifs_share_from_snapshot_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
snapshot = fake_share.fake_snapshot(
name=fakes.FakeData.src_snap_name,
share_name=fakes.FakeData.src_share_name,
share_id=fakes.FakeData.src_share_name,
id=fakes.FakeData.src_snap_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
hook.append(self.cifs_share.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.mover.output_get_interconnect_id())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append(self.fs.output_copy_ckpt)
ssh_hook.append(self.fs.output_info())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share_from_snapshot(
None, share, snapshot, share_server)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.cifs_share.req_create(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.mover.cmd_get_interconnect_id(), False),
mock.call(self.fs.cmd_create_from_ckpt(), False),
mock.call(self.mount.cmd_server_mount('ro'), False),
mock.call(self.fs.cmd_copy_ckpt(), True),
mock.call(self.fs.cmd_nas_fs_info(), False),
mock.call(self.mount.cmd_server_umount(), False),
mock.call(self.fs.cmd_delete(), False),
mock.call(self.mount.cmd_server_mount('rw'), False),
mock.call(self.cifs_share.cmd_disable_access(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location, r'\\%s.ipv6-literal.net\%s' %
(fakes.FakeData.network_allocations_ip3.replace(':',
'-'),
share['name']),
'CIFS export path is incorrect')
def test_create_nfs_share_from_snapshot(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
@ -385,6 +528,57 @@ class StorageConnectionTestCase(test.TestCase):
self.assertEqual(location, '192.168.1.2:/%s' % share['name'],
'NFS export path is incorrect')
def test_create_nfs_share_from_snapshot_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
snapshot = fake_share.fake_snapshot(
name=fakes.FakeData.src_snap_name,
share_name=fakes.FakeData.src_share_name,
share_id=fakes.FakeData.src_share_name,
id=fakes.FakeData.src_snap_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.mover.output_get_interconnect_id())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append(self.fs.output_copy_ckpt)
ssh_hook.append(self.fs.output_info())
ssh_hook.append()
ssh_hook.append()
ssh_hook.append()
ssh_hook.append(self.nfs_share.output_create())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
location = self.connection.create_share_from_snapshot(
None, share, snapshot, share_server)
expected_calls = [mock.call(self.fs.req_get())]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.mover.cmd_get_interconnect_id(), False),
mock.call(self.fs.cmd_create_from_ckpt(), False),
mock.call(self.mount.cmd_server_mount('ro'), False),
mock.call(self.fs.cmd_copy_ckpt(), True),
mock.call(self.fs.cmd_nas_fs_info(), False),
mock.call(self.mount.cmd_server_umount(), False),
mock.call(self.fs.cmd_delete(), False),
mock.call(self.mount.cmd_server_mount('rw'), False),
mock.call(self.nfs_share.cmd_create(), True)
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
self.assertEqual(location, '[%s]:/%s' %
(fakes.FakeData.network_allocations_ip4,
share['name']),
'NFS export path is incorrect')
def test_create_share_with_incorrect_proto(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(share_proto='FAKE_PROTO')
@ -440,6 +634,34 @@ class StorageConnectionTestCase(test.TestCase):
]
xml_req_mock.assert_has_calls(expected_calls)
def test_delete_cifs_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.cifs_share.resp_get_succeed(self.vdm.vdm_id))
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_share.resp_task_succeed())
hook.append(self.mount.resp_task_succeed())
hook.append(self.fs.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.delete_share(None, share, share_server)
expected_calls = [
mock.call(self.cifs_share.req_get()),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_share.req_delete(self.vdm.vdm_id)),
mock.call(self.mount.req_delete(self.vdm.vdm_id)),
mock.call(self.fs.req_get()),
mock.call(self.fs.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_delete_nfs_share(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
@ -476,6 +698,44 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_delete_nfs_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.mount.resp_task_succeed())
hook.append(self.fs.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=self.nfs_share.rw_hosts,
ro_hosts=self.nfs_share.ro_hosts))
ssh_hook.append(self.nfs_share.output_delete_succeed())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.delete_share(None, share, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.mount.req_delete(self.vdm.vdm_id)),
mock.call(self.fs.req_get()),
mock.call(self.fs.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), False),
mock.call(self.nfs_share.cmd_delete(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_delete_share_without_share_server(self):
share = fakes.CIFS_SHARE
@ -538,6 +798,27 @@ class StorageConnectionTestCase(test.TestCase):
]
xml_req_mock.assert_has_calls(expected_calls)
def test_extend_share_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
new_size = fakes.FakeData.new_size
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.pool.resp_get_succeed())
hook.append(self.fs.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.extend_share(share, new_size, share_server)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.pool.req_get()),
mock.call(self.fs.req_extend()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_extend_share_without_pool_name(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(host='HostA@BackendB',
@ -569,6 +850,27 @@ class StorageConnectionTestCase(test.TestCase):
]
xml_req_mock.assert_has_calls(expected_calls)
def test_create_snapshot_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
snapshot = fake_share.fake_snapshot(
id=fakes.FakeData.snapshot_name,
share_id=fakes.FakeData.filesystem_name,
share_name=fakes.FakeData.share_name)
hook = utils.RequestSideEffect()
hook.append(self.fs.resp_get_succeed())
hook.append(self.snap.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.create_snapshot(None, snapshot, share_server)
expected_calls = [
mock.call(self.fs.req_get()),
mock.call(self.snap.req_create()),
]
xml_req_mock.assert_has_calls(expected_calls)
def test_create_snapshot_with_incorrect_share_info(self):
share_server = fakes.SHARE_SERVER
snapshot = fake_share.fake_snapshot(
@ -609,6 +911,27 @@ class StorageConnectionTestCase(test.TestCase):
]
xml_req_mock.assert_has_calls(expected_calls)
def test_delete_snapshot_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
snapshot = fake_share.fake_snapshot(
id=fakes.FakeData.snapshot_name,
share_id=fakes.FakeData.filesystem_name,
share_name=fakes.FakeData.share_name)
hook = utils.RequestSideEffect()
hook.append(self.snap.resp_get_succeed())
hook.append(self.snap.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
self.connection.delete_snapshot(None, snapshot, share_server)
expected_calls = [
mock.call(self.snap.req_get()),
mock.call(self.snap.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
@utils.patch_get_managed_ports_vnx(return_value=['cge-1-0'])
def test_setup_server(self):
hook = utils.RequestSideEffect()
@ -630,8 +953,8 @@ class StorageConnectionTestCase(test.TestCase):
self.connection.setup_server(fakes.NETWORK_INFO, None)
if_name_1 = fakes.FakeData.network_allocations_id1[-12:]
if_name_2 = fakes.FakeData.network_allocations_id2[-12:]
if_name_1 = fakes.FakeData.interface_name1
if_name_2 = fakes.FakeData.interface_name2
expected_calls = [
mock.call(self.vdm.req_get()),
@ -654,6 +977,59 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
@utils.patch_get_managed_ports_vnx(return_value=['cge-1-0'])
def test_setup_server_with_ipv6(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_but_not_found())
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.vdm.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.dns.resp_task_succeed())
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.setup_server(fakes.NETWORK_INFO_IPV6, None)
if_name_1 = fakes.FakeData.interface_name3
if_name_2 = fakes.FakeData.interface_name4
expect_ip_1 = fakes.FakeData.network_allocations_ip3
expect_ip_2 = fakes.FakeData.network_allocations_ip4
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.mover.req_get_ref()),
mock.call(self.vdm.req_create()),
mock.call(self.mover.req_create_interface_with_ipv6(
if_name=if_name_1,
ip=expect_ip_1)),
mock.call(self.mover.req_create_interface_with_ipv6(
if_name=if_name_2,
ip=expect_ip_2)),
mock.call(self.dns.req_create(
ip_addr=fakes.FakeData.dns_ipv6_address)),
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_create(
self.vdm.vdm_id,
ip_addr=fakes.FakeData.network_allocations_ip3)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_attach_nfs_interface(
interface=fakes.FakeData.interface_name4), False),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
@utils.patch_get_managed_ports_vnx(return_value=['cge-1-0'])
def test_setup_server_with_existing_vdm(self):
hook = utils.RequestSideEffect()
@ -834,6 +1210,51 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_teardown_server_with_ipv6(self):
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True))
hook.append(self.cifs_server.resp_task_succeed())
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=False))
hook.append(self.mover.resp_get_ref_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.mover.resp_task_succeed())
hook.append(self.vdm.resp_task_succeed())
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.vdm.output_get_interfaces_vdm())
ssh_hook.append()
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.teardown_server(fakes.SERVER_DETAIL_IPV6,
fakes.SECURITY_SERVICE_IPV6)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
mock.call(self.cifs_server.req_modify(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=False)),
mock.call(self.cifs_server.req_delete(self.vdm.vdm_id)),
mock.call(self.mover.req_get_ref()),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip3)),
mock.call(self.mover.req_delete_interface(
fakes.FakeData.network_allocations_ip4)),
mock.call(self.vdm.req_delete()),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.vdm.cmd_get_interfaces(), False),
mock.call(self.vdm.cmd_detach_nfs_interface(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_teardown_server_without_server_detail(self):
self.connection.teardown_server(None, fakes.SECURITY_SERVICE)
@ -1006,6 +1427,40 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_add_cifs_rw_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [], [access], [],
share_server=share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_deny_nfs(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
@ -1037,6 +1492,37 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_deny_nfs_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts_ipv6)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=fakes.FakeData.rw_hosts_ipv6,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [], [], [access],
share_server=share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=self.nfs_share.rw_hosts_ipv6,
ro_hosts=self.nfs_share.ro_hosts_ipv6), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_recover_nfs_rule(self):
share_server = fakes.SHARE_SERVER
share = fakes.NFS_SHARE
@ -1069,6 +1555,38 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_recover_nfs_rule_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS_IPV6
hosts = ['fdf8:f53b:82e1::5']
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts_ipv6)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=hosts,
ro_hosts=[]))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [access], [], [],
share_server=share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=hosts,
ro_hosts=[]), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_recover_cifs_rule(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
@ -1106,6 +1624,46 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_update_access_recover_cifs_rule_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_hook.append(fakes.FakeData.cifs_access)
ssh_hook.append('Command succeeded')
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.update_access(None, share, [access], [], [],
share_server=share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(), True),
mock.call(self.cifs_share.cmd_get_access(), True),
mock.call(self.cifs_share.cmd_change_access(
action='revoke', user='guest'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_cifs_clear_access_server_not_found(self):
server = fakes.SHARE_SERVER
@ -1157,6 +1715,39 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_cifs_rw_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_cifs_ro_access(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
@ -1187,6 +1778,39 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_cifs_ro_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RO_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access('ro'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_ro_access_without_share_server_name(self):
share = fakes.CIFS_SHARE
share_server = copy.deepcopy(fakes.SHARE_SERVER)
@ -1277,6 +1901,37 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_nfs_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS_IPV6
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts_ipv6)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=fakes.FakeData.rw_hosts_ipv6,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.allow_access(None, share, access, share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=rw_hosts,
ro_hosts=self.nfs_share.ro_hosts_ipv6),
True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_allow_cifs_access_with_incorrect_access_type(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
@ -1335,6 +1990,40 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_cifs_rw_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RW_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.deny_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access(action='revoke'),
True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_cifs_ro_access(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
@ -1365,6 +2054,39 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_cifs_ro_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.CIFS_SHARE
access = fakes.CIFS_RO_ACCESS
hook = utils.RequestSideEffect()
hook.append(self.vdm.resp_get_succeed(
interface1=fakes.FakeData.interface_name3,
interface2=fakes.FakeData.interface_name4))
hook.append(self.cifs_server.resp_get_succeed(
mover_id=self.vdm.vdm_id, is_vdm=True, join_domain=True,
ip_addr=fakes.FakeData.network_allocations_ip3))
xml_req_mock = utils.EMCMock(side_effect=hook)
self.connection.manager.connectors['XML'].request = xml_req_mock
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.cifs_share.output_allow_access())
ssh_cmd_mock = mock.Mock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.deny_access(None, share, access, share_server)
expected_calls = [
mock.call(self.vdm.req_get()),
mock.call(self.cifs_server.req_get(self.vdm.vdm_id)),
]
xml_req_mock.assert_has_calls(expected_calls)
ssh_calls = [
mock.call(self.cifs_share.cmd_change_access('ro', 'revoke'), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_cifs_access_with_invliad_share_server_name(self):
share_server = fakes.SHARE_SERVER
share = fakes.CIFS_SHARE
@ -1416,6 +2138,36 @@ class StorageConnectionTestCase(test.TestCase):
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_nfs_access_with_ipv6(self):
share_server = fakes.SHARE_SERVER_IPV6
share = fakes.NFS_SHARE
access = fakes.NFS_RW_ACCESS_IPV6
rw_hosts = copy.deepcopy(fakes.FakeData.rw_hosts_ipv6)
rw_hosts.append(access['access_to'])
ssh_hook = utils.SSHSideEffect()
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=rw_hosts,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_hook.append(self.nfs_share.output_set_access_success())
ssh_hook.append(self.nfs_share.output_get_succeed(
rw_hosts=fakes.FakeData.rw_hosts_ipv6,
ro_hosts=fakes.FakeData.ro_hosts_ipv6))
ssh_cmd_mock = utils.EMCNFSShareMock(side_effect=ssh_hook)
self.connection.manager.connectors['SSH'].run_ssh = ssh_cmd_mock
self.connection.deny_access(None, share, access, share_server)
ssh_calls = [
mock.call(self.nfs_share.cmd_get(), True),
mock.call(self.nfs_share.cmd_set_access(
rw_hosts=self.nfs_share.rw_hosts_ipv6,
ro_hosts=self.nfs_share.ro_hosts_ipv6), True),
mock.call(self.nfs_share.cmd_get(), True),
]
ssh_cmd_mock.assert_has_calls(ssh_calls)
def test_deny_access_with_incorrect_proto(self):
share_server = fakes.SHARE_SERVER
share = fake_share.fake_share(share_proto='FAKE_PROTO')

View File

@ -345,54 +345,58 @@ class SSHPoolTestCase(test.TestCase):
paramiko.SSHClient.assert_called_once_with()
@ddt.ddt
class CidrToNetmaskTestCase(test.TestCase):
"""Unit test for cidr to netmask."""
def test_cidr_to_netmask_01(self):
cidr = '10.0.0.0/0'
expected_netmask = '0.0.0.0'
@ddt.data(
('10.0.0.0/0', '0.0.0.0'),
('10.0.0.0/24', '255.255.255.0'),
('10.0.0.0/5', '248.0.0.0'),
('10.0.0.0/32', '255.255.255.255'),
('10.0.0.1', '255.255.255.255'),
)
@ddt.unpack
def test_cidr_to_netmask(self, cidr, expected_netmask):
result = utils.cidr_to_netmask(cidr)
self.assertEqual(expected_netmask, result)
def test_cidr_to_netmask_02(self):
cidr = '10.0.0.0/24'
expected_netmask = '255.255.255.0'
result = utils.cidr_to_netmask(cidr)
self.assertEqual(expected_netmask, result)
def test_cidr_to_netmask_03(self):
cidr = '10.0.0.0/5'
expected_netmask = '248.0.0.0'
result = utils.cidr_to_netmask(cidr)
self.assertEqual(expected_netmask, result)
def test_cidr_to_netmask_04(self):
cidr = '10.0.0.0/32'
expected_netmask = '255.255.255.255'
result = utils.cidr_to_netmask(cidr)
self.assertEqual(expected_netmask, result)
def test_cidr_to_netmask_05(self):
cidr = '10.0.0.1'
expected_netmask = '255.255.255.255'
result = utils.cidr_to_netmask(cidr)
self.assertEqual(expected_netmask, result)
def test_cidr_to_netmask_invalid_01(self):
cidr = '10.0.0.0/33'
@ddt.data(
'10.0.0.0/33',
'',
'10.0.0.555/33'
)
def test_cidr_to_netmask_invalid(self, cidr):
self.assertRaises(exception.InvalidInput, utils.cidr_to_netmask, cidr)
def test_cidr_to_netmask_invalid_02(self):
cidr = ''
self.assertRaises(exception.InvalidInput, utils.cidr_to_netmask, cidr)
def test_cidr_to_netmask_invalid_03(self):
cidr = '10.0.0.0/33'
self.assertRaises(exception.InvalidInput, utils.cidr_to_netmask, cidr)
@ddt.ddt
class CidrToPrefixLenTestCase(test.TestCase):
"""Unit test for cidr to prefix length."""
def test_cidr_to_netmask_invalid_04(self):
cidr = '10.0.0.555/33'
self.assertRaises(exception.InvalidInput, utils.cidr_to_netmask, cidr)
@ddt.data(
('10.0.0.0/0', 0),
('10.0.0.0/24', 24),
('10.0.0.1', 32),
('fdf8:f53b:82e1::1/0', 0),
('fdf8:f53b:82e1::1/64', 64),
('fdf8:f53b:82e1::1', 128),
)
@ddt.unpack
def test_cidr_to_prefixlen(self, cidr, expected_prefixlen):
result = utils.cidr_to_prefixlen(cidr)
self.assertEqual(expected_prefixlen, result)
@ddt.data(
'10.0.0.0/33',
'',
'10.0.0.555/33',
'fdf8:f53b:82e1::1/129',
'fdf8:f53b:82e1::fffff'
)
def test_cidr_to_prefixlen_invalid(self, cidr):
self.assertRaises(exception.InvalidInput,
utils.cidr_to_prefixlen, cidr)
@ddt.ddt

View File

@ -376,15 +376,25 @@ def walk_class_hierarchy(clazz, encountered=None):
yield subclass
def cidr_to_netmask(cidr):
"""Convert cidr to netmask."""
def cidr_to_network(cidr):
"""Convert cidr to network."""
try:
network = netaddr.IPNetwork(cidr)
return str(network.netmask)
return network
except netaddr.AddrFormatError:
raise exception.InvalidInput(_("Invalid cidr supplied %s") % cidr)
def cidr_to_netmask(cidr):
"""Convert cidr to netmask."""
return six.text_type(cidr_to_network(cidr).netmask)
def cidr_to_prefixlen(cidr):
"""Convert cidr to prefix length."""
return cidr_to_network(cidr).prefixlen
def is_valid_ip_address(ip_address, ip_version):
ip_version = ([int(ip_version)] if not isinstance(ip_version, list)
else ip_version)

View File

@ -0,0 +1,3 @@
---
features:
- IPv6 support for Dell EMC VNX Manila driver.