Merge "Remove windows support"

This commit is contained in:
Zuul
2026-02-16 21:09:28 +00:00
committed by Gerrit Code Review
24 changed files with 2 additions and 2162 deletions
-1
View File
@@ -31,7 +31,6 @@ PLATFORM_S390 = 'S390'
PLATFORM_PPC64 = 'PPC64'
OS_TYPE_ALL = 'ALL'
OS_TYPE_LINUX = 'LINUX'
OS_TYPE_WINDOWS = 'WIN'
S390X = "s390x"
S390 = "s390"
+2 -40
View File
@@ -34,16 +34,6 @@ from os_brick import utils
LOG = logging.getLogger(__name__)
# List of connectors to call when getting
# the connector properties for a host
windows_connector_list = [
'os_brick.initiator.windows.base.BaseWindowsConnector',
'os_brick.initiator.windows.iscsi.WindowsISCSIConnector',
'os_brick.initiator.windows.fibre_channel.WindowsFCConnector',
'os_brick.initiator.windows.rbd.WindowsRBDConnector',
'os_brick.initiator.windows.smbfs.WindowsSMBFSConnector'
]
unix_connector_list = [
'os_brick.initiator.connectors.base.BaseLinuxConnector',
'os_brick.initiator.connectors.iscsi.ISCSIConnector',
@@ -66,10 +56,7 @@ unix_connector_list = [
def _get_connector_list():
if sys.platform != 'win32':
return unix_connector_list
else:
return windows_connector_list
return unix_connector_list
# Mappings used to determine who to construct in the factory
@@ -154,34 +141,12 @@ _connector_mapping_linux_ppc64 = {
'os_brick.initiator.connectors.iscsi.ISCSIConnector',
}
# Mapping for the windows connectors
_connector_mapping_windows = {
initiator.ISCSI:
'os_brick.initiator.windows.iscsi.WindowsISCSIConnector',
initiator.FIBRE_CHANNEL:
'os_brick.initiator.windows.fibre_channel.WindowsFCConnector',
initiator.RBD:
'os_brick.initiator.windows.rbd.WindowsRBDConnector',
initiator.SMBFS:
'os_brick.initiator.windows.smbfs.WindowsSMBFSConnector',
}
# Create aliases to the old names until 2.0.0
# TODO(smcginnis) Remove this lookup once unit test code is updated to
# point to the correct location
def _set_aliases():
conn_list = _get_connector_list()
# TODO(lpetrut): Cinder is explicitly trying to use those two
# connectors. We should drop this once we fix Cinder and
# get passed the backwards compatibility period.
if sys.platform == 'win32':
conn_list += [
'os_brick.initiator.connectors.iscsi.ISCSIConnector',
('os_brick.initiator.connectors.fibre_channel.'
'FibreChannelConnector'),
]
for item in conn_list:
_name = item.split('.')[-1]
globals()[_name] = importutils.import_class(item)
@@ -253,13 +218,10 @@ def get_connector_mapping(arch=None):
arch = platform.machine()
# Set the correct mapping for imports
if sys.platform == 'win32':
return _connector_mapping_windows
elif arch in (initiator.S390, initiator.S390X):
if arch in (initiator.S390, initiator.S390X):
return _connector_mapping_linux_s390x
elif arch in (initiator.PPC64, initiator.PPC64LE):
return _connector_mapping_linux_ppc64
else:
return _connector_mapping_linux
-4
View File
@@ -15,7 +15,6 @@
import contextlib
import os
from typing import Generator
from oslo_concurrency import lockutils
@@ -23,9 +22,6 @@ from oslo_concurrency import processutils as putils
def check_manual_scan() -> bool:
if os.name == 'nt':
return False
try:
putils.execute('grep', '-F', 'node.session.scan', '/sbin/iscsiadm')
except putils.ProcessExecutionError:
-122
View File
@@ -1,122 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import warnings
from os_win import utilsfactory
from oslo_concurrency import processutils as putils
from oslo_log import log as logging
from os_brick import exception
from os_brick.i18n import _
from os_brick import initiator
from os_brick.initiator import initiator_connector
from os_brick import utils
LOG = logging.getLogger(__name__)
class BaseWindowsConnector(initiator_connector.InitiatorConnector):
platform = initiator.PLATFORM_ALL
os_type = initiator.OS_TYPE_WINDOWS
DEFAULT_DEVICE_SCAN_INTERVAL = 2
def __init__(self, root_helper=None, *args, **kwargs):
warnings.warn('Support for Windows OS has been deprecated.',
category=DeprecationWarning, stacklevel=2)
kwargs['executor'] = kwargs.get('executor') or putils.execute
super(BaseWindowsConnector, self).__init__(root_helper,
*args, **kwargs)
self.device_scan_interval = kwargs.pop(
'device_scan_interval', self.DEFAULT_DEVICE_SCAN_INTERVAL)
self._diskutils = utilsfactory.get_diskutils()
@staticmethod
def check_multipath_support(enforce_multipath):
hostutils = utilsfactory.get_hostutils()
mpio_enabled = hostutils.check_server_feature(
hostutils.FEATURE_MPIO)
if not mpio_enabled:
err_msg = _("Using multipath connections for iSCSI and FC disks "
"requires the Multipath IO Windows feature to be "
"enabled. MPIO must be configured to claim such "
"devices.")
LOG.error(err_msg)
if enforce_multipath:
raise exception.BrickException(err_msg)
return False
return True
@staticmethod
def get_connector_properties(*args, **kwargs):
multipath = kwargs['multipath']
enforce_multipath = kwargs['enforce_multipath']
props = {}
props['multipath'] = (
multipath and
BaseWindowsConnector.check_multipath_support(enforce_multipath))
return props
def _get_scsi_wwn(self, device_number):
# NOTE(lpetrut): The Linux connectors use scsi_id to retrieve the
# disk unique id, which prepends the identifier type to the unique id
# retrieved from the page 83 SCSI inquiry data. We'll do the same
# to remain consistent.
disk_uid, uid_type = self._diskutils.get_disk_uid_and_uid_type(
device_number)
scsi_wwn = '%s%s' % (uid_type, disk_uid)
return scsi_wwn
def check_valid_device(self, path, *args, **kwargs):
try:
with open(path, 'r') as dev:
dev.read(1)
except IOError:
LOG.exception(
"Failed to access the device on the path "
"%(path)s", {"path": path})
return False
return True
def get_all_available_volumes(self):
# TODO(lpetrut): query for disks based on the protocol used.
return []
def _check_device_paths(self, device_paths):
if len(device_paths) > 1:
err_msg = _("Multiple volume paths were found: %s. This can "
"occur if multipath is used and MPIO is not "
"properly configured, thus not claiming the device "
"paths. This issue must be addressed urgently as "
"it can lead to data corruption.")
raise exception.BrickException(err_msg % device_paths)
@utils.trace
def extend_volume(self, connection_properties):
volume_paths = self.get_volume_paths(connection_properties)
if not volume_paths:
err_msg = _("Could not find the disk. Extend failed.")
raise exception.NotFound(err_msg)
device_path = volume_paths[0]
device_number = self._diskutils.get_device_number_from_device_name(
device_path)
self._diskutils.refresh_disk(device_number)
def get_search_path(self):
return None
-214
View File
@@ -1,214 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import time
from os_win import exceptions as os_win_exc
from os_win import utilsfactory
from oslo_log import log as logging
from os_brick import exception
from os_brick.i18n import _
from os_brick.initiator.windows import base as win_conn_base
from os_brick import utils
LOG = logging.getLogger(__name__)
class WindowsFCConnector(win_conn_base.BaseWindowsConnector):
def __init__(self, *args, **kwargs):
super(WindowsFCConnector, self).__init__(*args, **kwargs)
self.use_multipath = kwargs.get('use_multipath', False)
self._fc_utils = utilsfactory.get_fc_utils()
@staticmethod
def get_connector_properties(*args, **kwargs):
props = {}
fc_utils = utilsfactory.get_fc_utils()
fc_utils.refresh_hba_configuration()
fc_hba_ports = fc_utils.get_fc_hba_ports()
if fc_hba_ports:
wwnns = []
wwpns = []
for port in fc_hba_ports:
wwnns.append(port['node_name'])
wwpns.append(port['port_name'])
props['wwpns'] = wwpns
props['wwnns'] = list(set(wwnns))
return props
@utils.trace
def connect_volume(self, connection_properties):
volume_paths = self.get_volume_paths(connection_properties)
if not volume_paths:
raise exception.NoFibreChannelVolumeDeviceFound()
device_path = volume_paths[0]
device_number = self._diskutils.get_device_number_from_device_name(
device_path)
scsi_wwn = self._get_scsi_wwn(device_number)
device_info = {'type': 'block',
'path': device_path,
'number': device_number,
'scsi_wwn': scsi_wwn}
return device_info
@utils.trace
def get_volume_paths(self, connection_properties):
# Returns a list containing at most one disk path such as
# \\.\PhysicalDrive4.
#
# If multipath is used and the MPIO service is properly configured
# to claim the disks, we'll still get a single device path, having
# the same format, which will be used for all the IO operations.
for attempt_num in range(self.device_scan_attempts):
disk_paths = set()
if attempt_num:
time.sleep(self.device_scan_interval)
self._diskutils.rescan_disks()
volume_mappings = self._get_fc_volume_mappings(
connection_properties)
LOG.debug("Retrieved volume mappings %(vol_mappings)s "
"for volume %(conn_props)s",
dict(vol_mappings=volume_mappings,
conn_props=connection_properties))
for mapping in volume_mappings:
device_name = mapping['device_name']
if device_name:
disk_paths.add(device_name)
if not disk_paths and volume_mappings:
fcp_lun = volume_mappings[0]['fcp_lun']
try:
disk_paths = self._get_disk_paths_by_scsi_id(
connection_properties, fcp_lun)
disk_paths = set(disk_paths or [])
except os_win_exc.OSWinException as ex:
LOG.debug("Failed to retrieve disk paths by SCSI ID. "
"Exception: %s", ex)
if not disk_paths:
LOG.debug("No disk path retrieved yet.")
continue
if len(disk_paths) > 1:
LOG.debug("Multiple disk paths retrieved: %s This may happen "
"if MPIO did not claim them yet.", disk_paths)
continue
dev_num = self._diskutils.get_device_number_from_device_name(
list(disk_paths)[0])
if self.use_multipath and not self._diskutils.is_mpio_disk(
dev_num):
LOG.debug("Multipath was requested but the disk %s was not "
"claimed yet by the MPIO service.", dev_num)
continue
return list(disk_paths)
return []
def _get_fc_volume_mappings(self, connection_properties):
# Note(lpetrut): All the WWNs returned by os-win are upper case.
target_wwpns = [wwpn.upper()
for wwpn in connection_properties['target_wwn']]
target_lun = connection_properties['target_lun']
volume_mappings = []
hba_mappings = self._get_fc_hba_mappings()
for node_name in hba_mappings:
target_mappings = self._fc_utils.get_fc_target_mappings(node_name)
for mapping in target_mappings:
if (mapping['port_name'] in target_wwpns and
mapping['lun'] == target_lun):
volume_mappings.append(mapping)
return volume_mappings
def _get_fc_hba_mappings(self):
mappings = collections.defaultdict(list)
fc_hba_ports = self._fc_utils.get_fc_hba_ports()
for port in fc_hba_ports:
mappings[port['node_name']].append(port['port_name'])
return mappings
def _get_disk_paths_by_scsi_id(self, connection_properties, fcp_lun):
for local_port_wwn, remote_port_wwns in connection_properties[
'initiator_target_map'].items():
for remote_port_wwn in remote_port_wwns:
try:
dev_nums = self._get_dev_nums_by_scsi_id(
local_port_wwn, remote_port_wwn, fcp_lun)
# This may raise a DiskNotFound exception if the disks
# are meanwhile claimed by the MPIO service.
disk_paths = [
self._diskutils.get_device_name_by_device_number(
dev_num)
for dev_num in dev_nums]
return disk_paths
except os_win_exc.FCException as ex:
LOG.debug("Failed to retrieve volume paths by SCSI id. "
"Exception: %s", ex)
continue
return []
def _get_dev_nums_by_scsi_id(self, local_port_wwn, remote_port_wwn,
fcp_lun):
LOG.debug("Fetching SCSI Unique ID for FCP lun %(fcp_lun)s. "
"Port WWN: %(local_port_wwn)s. "
"Remote port WWN: %(remote_port_wwn)s.",
dict(fcp_lun=fcp_lun,
local_port_wwn=local_port_wwn,
remote_port_wwn=remote_port_wwn))
local_hba_wwn = self._get_fc_hba_wwn_for_port(local_port_wwn)
# This will return the SCSI identifiers in the order of precedence
# used by Windows.
identifiers = self._fc_utils.get_scsi_device_identifiers(
local_hba_wwn, local_port_wwn,
remote_port_wwn, fcp_lun)
if identifiers:
identifier = identifiers[0]
dev_nums = self._diskutils.get_disk_numbers_by_unique_id(
unique_id=identifier['id'],
unique_id_format=identifier['type'])
return dev_nums
return []
def _get_fc_hba_wwn_for_port(self, port_wwn):
fc_hba_ports = self._fc_utils.get_fc_hba_ports()
for port in fc_hba_ports:
if port_wwn.upper() == port['port_name']:
return port['node_name']
err_msg = _("Could not find any FC HBA port "
"having WWN '%s'.") % port_wwn
raise exception.NotFound(err_msg)
@utils.trace
def disconnect_volume(self, connection_properties, device_info=None,
force=False, ignore_errors=False):
pass
-175
View File
@@ -1,175 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from os_win import exceptions as os_win_exc
from os_win import utilsfactory
from oslo_log import log as logging
from os_brick import exception
from os_brick.i18n import _
from os_brick.initiator.connectors import base_iscsi
from os_brick.initiator.windows import base as win_conn_base
from os_brick import utils
LOG = logging.getLogger(__name__)
class WindowsISCSIConnector(win_conn_base.BaseWindowsConnector,
base_iscsi.BaseISCSIConnector):
def __init__(self, *args, **kwargs):
super(WindowsISCSIConnector, self).__init__(*args, **kwargs)
self.use_multipath = kwargs.pop('use_multipath', False)
self.initiator_list = kwargs.pop('initiator_list', [])
self._iscsi_utils = utilsfactory.get_iscsi_initiator_utils()
self.validate_initiators()
def validate_initiators(self):
"""Validates the list of requested initiator HBAs
Validates the list of requested initiator HBAs to be used
when establishing iSCSI sessions.
"""
valid_initiator_list = True
if not self.initiator_list:
LOG.info("No iSCSI initiator was explicitly requested. "
"The Microsoft iSCSI initiator will choose the "
"initiator when establishing sessions.")
else:
available_initiators = self._iscsi_utils.get_iscsi_initiators()
for initiator in self.initiator_list:
if initiator not in available_initiators:
LOG.warning("The requested initiator %(req_initiator)s "
"is not in the list of available initiators: "
"%(avail_initiators)s.",
dict(req_initiator=initiator,
avail_initiators=available_initiators))
valid_initiator_list = False
return valid_initiator_list
def get_initiator(self):
"""Returns the iSCSI initiator node name."""
return self._iscsi_utils.get_iscsi_initiator()
@staticmethod
def get_connector_properties(*args, **kwargs):
iscsi_utils = utilsfactory.get_iscsi_initiator_utils()
initiator = iscsi_utils.get_iscsi_initiator()
return dict(initiator=initiator)
def _get_all_paths(self, connection_properties):
initiator_list = self.initiator_list or [None]
all_targets = self._get_all_targets(connection_properties)
paths = [(initiator_name, target_portal, target_iqn, target_lun)
for target_portal, target_iqn, target_lun in all_targets
for initiator_name in initiator_list]
return paths
@utils.trace
def connect_volume(self, connection_properties):
connected_target_mappings = set()
volume_connected = False
for (initiator_name,
target_portal,
target_iqn,
target_lun) in self._get_all_paths(connection_properties):
try:
LOG.info("Attempting to establish an iSCSI session to "
"target %(target_iqn)s on portal %(target_portal)s "
"accessing LUN %(target_lun)s using initiator "
"%(initiator_name)s.",
dict(target_portal=target_portal,
target_iqn=target_iqn,
target_lun=target_lun,
initiator_name=initiator_name))
self._iscsi_utils.login_storage_target(
target_lun=target_lun,
target_iqn=target_iqn,
target_portal=target_portal,
auth_username=connection_properties.get('auth_username'),
auth_password=connection_properties.get('auth_password'),
mpio_enabled=self.use_multipath,
initiator_name=initiator_name,
ensure_lun_available=False)
connected_target_mappings.add((target_iqn, target_lun))
if not self.use_multipath:
break
except os_win_exc.OSWinException:
LOG.exception("Could not establish the iSCSI session.")
for target_iqn, target_lun in connected_target_mappings:
try:
(device_number,
device_path) = self._iscsi_utils.get_device_number_and_path(
target_iqn, target_lun,
retry_attempts=self.device_scan_attempts,
retry_interval=self.device_scan_interval,
rescan_disks=True,
ensure_mpio_claimed=self.use_multipath)
volume_connected = True
except os_win_exc.OSWinException:
LOG.exception("Could not retrieve device path for target "
"%(target_iqn)s and lun %(target_lun)s.",
dict(target_iqn=target_iqn,
target_lun=target_lun))
if not volume_connected:
raise exception.BrickException(
_("Could not connect volume %s.") % connection_properties)
scsi_wwn = self._get_scsi_wwn(device_number)
device_info = {'type': 'block',
'path': device_path,
'number': device_number,
'scsi_wwn': scsi_wwn}
return device_info
@utils.trace
def disconnect_volume(self, connection_properties, device_info=None,
force=False, ignore_errors=False):
# We want to refresh the cached information first.
self._diskutils.rescan_disks()
for (target_portal,
target_iqn,
target_lun) in self._get_all_targets(connection_properties):
luns = self._iscsi_utils.get_target_luns(target_iqn)
# We disconnect the target only if it does not expose other
# luns which may be in use.
if not luns or luns == [target_lun]:
self._iscsi_utils.logout_storage_target(target_iqn)
@utils.trace
def get_volume_paths(self, connection_properties):
device_paths = set()
for (target_portal,
target_iqn,
target_lun) in self._get_all_targets(connection_properties):
(device_number,
device_path) = self._iscsi_utils.get_device_number_and_path(
target_iqn, target_lun,
ensure_mpio_claimed=self.use_multipath)
if device_path:
device_paths.add(device_path)
self._check_device_paths(device_paths)
return list(device_paths)
-169
View File
@@ -1,169 +0,0 @@
# Copyright 2020 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import ctypes
import errno
import json
from oslo_concurrency import processutils
from oslo_log import log as logging
from oslo_service import loopingcall
from os_brick import exception
from os_brick.i18n import _
from os_brick.initiator.connectors import base_rbd
from os_brick.initiator.windows import base as win_conn_base
from os_brick import utils
LOG = logging.getLogger(__name__)
class WindowsRBDConnector(base_rbd.RBDConnectorMixin,
win_conn_base.BaseWindowsConnector):
"""Connector class to attach/detach RBD volumes.
The Windows RBD connector is very similar to the Linux one.
There are a few main differences though:
* the Ceph python bindings are not available on Windows yet, so we'll
always do a local mount. Besides, Hyper-V cannot use librbd, so
we'll need to do a local mount anyway.
* The device names aren't handled in the same way. On Windows,
disk names such as "\\\\.\\PhysicalDrive1" are provided by the OS and
cannot be explicitly requsted.
"""
def __init__(self, *args, **kwargs):
super(WindowsRBDConnector, self).__init__(*args, **kwargs)
self._ensure_rbd_available()
def _check_rbd(self):
cmd = ['where.exe', 'rbd']
try:
self._execute(*cmd)
return True
except processutils.ProcessExecutionError:
LOG.warning("rbd.exe is not available.")
return False
def _ensure_rbd_available(self):
if not self._check_rbd():
msg = _("rbd.exe is not available.")
LOG.error(msg)
raise exception.BrickException(msg)
def get_volume_paths(self, connection_properties):
return [self.get_device_name(connection_properties)]
def _show_rbd_mapping(self, connection_properties):
# TODO(lpetrut): consider using "rbd device show" if/when
# it becomes available.
cmd = ['rbd-wnbd', 'show', connection_properties['name'],
'--format', 'json']
try:
out, err = self._execute(*cmd)
return json.loads(out)
except processutils.ProcessExecutionError as ex:
if abs(ctypes.c_int32(ex.exit_code).value) == errno.ENOENT:
LOG.debug("Couldn't find RBD mapping: %s",
connection_properties['name'])
return
raise
except json.decoder.JSONDecodeError:
msg = _("Could not get rbd mappping.")
LOG.exception(msg)
raise exception.BrickException(msg)
def get_device_name(self, connection_properties, expect=True):
mapping = self._show_rbd_mapping(connection_properties)
if mapping:
dev_num = mapping['disk_number']
LOG.debug(
"Located RBD mapping: %(image)s. "
"Disk number: %(disk_number)s.",
dict(image=connection_properties['name'],
disk_number=dev_num))
return self._diskutils.get_device_name_by_device_number(dev_num)
elif expect:
msg = _("The specified RBD image is not mounted: %s")
raise exception.VolumeDeviceNotFound(
msg % connection_properties['name'])
def _wait_for_volume(self, connection_properties):
"""Wait for the specified volume to become accessible."""
attempt = 0
dev_path = None
def _check_rbd_device():
rbd_dev_path = self.get_device_name(
connection_properties, expect=False)
if rbd_dev_path:
try:
# Under high load, it can take a second before the disk
# becomes accessible.
with open(rbd_dev_path, 'rb'):
pass
nonlocal dev_path
dev_path = rbd_dev_path
raise loopingcall.LoopingCallDone()
except FileNotFoundError:
LOG.debug("The RBD image %(image)s mapped to local device "
"%(dev)s isn't available yet.",
{'image': connection_properties['name'],
'dev': rbd_dev_path})
nonlocal attempt
attempt += 1
if attempt >= self.device_scan_attempts:
msg = _("The mounted RBD image isn't available: %s")
raise exception.VolumeDeviceNotFound(
msg % connection_properties['name'])
timer = loopingcall.FixedIntervalLoopingCall(_check_rbd_device)
timer.start(interval=self.device_scan_interval).wait()
return dev_path
@utils.trace
def connect_volume(self, connection_properties):
rbd_dev_path = self.get_device_name(connection_properties,
expect=False)
if not rbd_dev_path:
cmd = ['rbd', 'device', 'map', connection_properties['name']]
cmd += self._get_rbd_args(connection_properties)
self._execute(*cmd)
rbd_dev_path = self._wait_for_volume(connection_properties)
else:
LOG.debug('The RBD image %(image)s is already mapped to local '
'device %(dev)s',
{'image': connection_properties['name'],
'dev': rbd_dev_path})
dev_num = self._diskutils.get_device_number_from_device_name(
rbd_dev_path)
# TODO(lpetrut): remove this once wnbd honors the SAN policy setting.
self._diskutils.set_disk_offline(dev_num)
return {'path': rbd_dev_path,
'type': 'block'}
@utils.trace
def disconnect_volume(self, connection_properties, device_info=None,
force=False, ignore_errors=False):
cmd = ['rbd', 'device', 'unmap', connection_properties['name']]
cmd += self._get_rbd_args(connection_properties)
if force:
cmd += ["-o", "hard-disconnect"]
self._execute(*cmd)
-124
View File
@@ -1,124 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from os_win import utilsfactory
from os_brick.initiator.windows import base as win_conn_base
from os_brick.remotefs import windows_remotefs as remotefs
from os_brick import utils
# The Windows SMBFS connector expects to receive VHD/x images stored on SMB
# shares, exposed by the Cinder SMBFS driver.
class WindowsSMBFSConnector(win_conn_base.BaseWindowsConnector):
def __init__(self, *args, **kwargs):
super(WindowsSMBFSConnector, self).__init__(*args, **kwargs)
# If this flag is set, we use the local paths in case of local
# shares. This is in fact mandatory in some cases, for example
# for the Hyper-C scenario.
self._local_path_for_loopback = kwargs.get('local_path_for_loopback',
True)
self._expect_raw_disk = kwargs.get('expect_raw_disk', False)
self._remotefsclient = remotefs.WindowsRemoteFsClient(
mount_type='smbfs',
*args, **kwargs)
self._smbutils = utilsfactory.get_smbutils()
self._vhdutils = utilsfactory.get_vhdutils()
self._diskutils = utilsfactory.get_diskutils()
@staticmethod
def get_connector_properties(*args, **kwargs):
# No connector properties updates in this case.
return {}
@utils.trace
def connect_volume(self, connection_properties):
self.ensure_share_mounted(connection_properties)
# This will be a virtual disk image path.
disk_path = self._get_disk_path(connection_properties)
if self._expect_raw_disk:
# The caller expects a direct accessible raw disk. We'll
# mount the image and bring the new disk offline, which will
# allow direct IO, while ensuring that any partiton residing
# on it will be unmounted.
read_only = connection_properties.get('access_mode') == 'ro'
self._vhdutils.attach_virtual_disk(disk_path, read_only=read_only)
raw_disk_path = self._vhdutils.get_virtual_disk_physical_path(
disk_path)
dev_num = self._diskutils.get_device_number_from_device_name(
raw_disk_path)
self._diskutils.set_disk_offline(dev_num)
else:
raw_disk_path = None
device_info = {'type': 'file',
'path': raw_disk_path if self._expect_raw_disk
else disk_path}
return device_info
@utils.trace
def disconnect_volume(self, connection_properties, device_info=None,
force=False, ignore_errors=False):
export_path = self._get_export_path(connection_properties)
disk_path = self._get_disk_path(connection_properties)
# The detach method will silently continue if the disk is
# not attached.
self._vhdutils.detach_virtual_disk(disk_path)
self._remotefsclient.unmount(export_path)
def _get_export_path(self, connection_properties):
return connection_properties['export'].replace('/', '\\')
def _get_disk_path(self, connection_properties):
# This is expected to be the share address, as an UNC path.
export_path = self._get_export_path(connection_properties)
mount_base = self._remotefsclient.get_mount_base()
use_local_path = (self._local_path_for_loopback and
self._smbutils.is_local_share(export_path))
disk_dir = export_path
if mount_base:
# This will be a symlink pointing to either the share
# path directly or to the local share path, if requested
# and available.
disk_dir = self._remotefsclient.get_mount_point(
export_path)
elif use_local_path:
disk_dir = self._remotefsclient.get_local_share_path(export_path)
disk_name = connection_properties['name']
disk_path = os.path.join(disk_dir, disk_name)
return disk_path
def get_search_path(self):
return self._remotefsclient.get_mount_base()
@utils.trace
def get_volume_paths(self, connection_properties):
return [self._get_disk_path(connection_properties)]
def ensure_share_mounted(self, connection_properties):
export_path = self._get_export_path(connection_properties)
mount_options = connection_properties.get('options')
self._remotefsclient.mount(export_path, mount_options)
def extend_volume(self, connection_properties):
raise NotImplementedError
-147
View File
@@ -1,147 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Windows remote filesystem client utilities."""
import os
import re
import warnings
from os_win import utilsfactory
from oslo_log import log as logging
from os_brick import exception
from os_brick.i18n import _
from os_brick.remotefs import remotefs
LOG = logging.getLogger(__name__)
class WindowsRemoteFsClient(remotefs.RemoteFsClient):
_username_regex = re.compile(r'user(?:name)?=([^, ]+)')
_password_regex = re.compile(r'pass(?:word)?=([^, ]+)')
_loopback_share_map = {}
def __init__(self, mount_type, root_helper=None,
execute=None, *args, **kwargs):
warnings.warn('Support for Windows OS has been deprecated.',
category=DeprecationWarning, stacklevel=2)
mount_type_to_option_prefix = {
'cifs': 'smbfs',
'smbfs': 'smbfs',
}
self._local_path_for_loopback = kwargs.get('local_path_for_loopback',
True)
if mount_type not in mount_type_to_option_prefix:
raise exception.ProtocolNotSupported(protocol=mount_type)
self._mount_type = mount_type
option_prefix = mount_type_to_option_prefix[mount_type]
self._mount_base = kwargs.get(option_prefix + '_mount_point_base')
self._mount_options = kwargs.get(option_prefix + '_mount_options')
self._smbutils = utilsfactory.get_smbutils()
self._pathutils = utilsfactory.get_pathutils()
def get_local_share_path(self, share, expect_existing=True):
share = self._get_share_norm_path(share)
share_name = self.get_share_name(share)
share_subdir = self.get_share_subdir(share)
is_local_share = self._smbutils.is_local_share(share)
if not is_local_share:
LOG.debug("Share '%s' is not exposed by the current host.", share)
local_share_path = None
else:
local_share_path = self._smbutils.get_smb_share_path(share_name)
if not local_share_path and expect_existing:
err_msg = _("Could not find the local "
"share path for %(share)s.")
raise exception.VolumePathsNotFound(err_msg % dict(share=share))
if local_share_path and share_subdir:
local_share_path = os.path.join(local_share_path, share_subdir)
return local_share_path
def _get_share_norm_path(self, share):
return share.replace('/', '\\')
def get_share_name(self, share):
return self._get_share_norm_path(share).lstrip('\\').split('\\')[1]
def get_share_subdir(self, share):
return "\\".join(
self._get_share_norm_path(share).lstrip('\\').split('\\')[2:])
def mount(self, share, flags=None):
share_norm_path = self._get_share_norm_path(share)
use_local_path = (self._local_path_for_loopback and
self._smbutils.is_local_share(share_norm_path))
if use_local_path:
LOG.info("Skipping mounting local share %(share_path)s.",
dict(share_path=share_norm_path))
else:
mount_options = " ".join(
[self._mount_options or '', flags or ''])
username, password = self._parse_credentials(mount_options)
if not self._smbutils.check_smb_mapping(
share_norm_path):
self._smbutils.mount_smb_share(share_norm_path,
username=username,
password=password)
if self._mount_base:
self._create_mount_point(share, use_local_path)
def unmount(self, share):
self._smbutils.unmount_smb_share(self._get_share_norm_path(share))
def _create_mount_point(self, share, use_local_path):
# The mount point will contain a hash of the share so we're
# intentionally preserving the original share path as this is
# what the caller will expect.
mnt_point = self.get_mount_point(share)
share_norm_path = self._get_share_norm_path(share)
symlink_dest = (share_norm_path if not use_local_path
else self.get_local_share_path(share))
if not os.path.isdir(self._mount_base):
os.makedirs(self._mount_base)
if os.path.exists(mnt_point):
if not self._pathutils.is_symlink(mnt_point):
raise exception.BrickException(_("Link path already exists "
"and it's not a symlink"))
else:
self._pathutils.create_sym_link(mnt_point, symlink_dest)
def _parse_credentials(self, opts_str):
if not opts_str:
return None, None
match = self._username_regex.findall(opts_str)
username = match[0] if match and match[0] != 'guest' else None
match = self._password_regex.findall(opts_str)
password = match[0] if match else None
return username, password
@@ -204,13 +204,6 @@ class ConnectorTestCase(test_base.TestCase):
self.assertEqual(expected_props, props)
@mock.patch('sys.platform', 'win32')
def test_get_connector_mapping_win32(self):
mapping_win32 = connector.get_connector_mapping()
self.assertIn('ISCSI', mapping_win32)
self.assertIn('RBD', mapping_win32)
self.assertNotIn('STORPOOL', mapping_win32)
@mock.patch('os_brick.initiator.connector.platform.machine')
def test_get_connector_mapping(self, mock_platform_machine):
mock_platform_machine.return_value = 'x86_64'
-6
View File
@@ -21,18 +21,12 @@ from os_brick.tests import base
class InitiatorUtilsTestCase(base.TestCase):
@mock.patch('os.name', 'nt')
def test_check_manual_scan_windows(self):
self.assertFalse(utils.check_manual_scan())
@mock.patch('os.name', 'posix')
@mock.patch('oslo_concurrency.processutils.execute')
def test_check_manual_scan_supported(self, mock_exec):
self.assertTrue(utils.check_manual_scan())
mock_exec.assert_called_once_with('grep', '-F', 'node.session.scan',
'/sbin/iscsiadm')
@mock.patch('os.name', 'posix')
@mock.patch('oslo_concurrency.processutils.execute',
side_effect=utils.putils.ProcessExecutionError)
def test_check_manual_scan_not_supported(self, mock_exec):
@@ -1,161 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import ddt
from os_brick import exception
from os_brick.remotefs import windows_remotefs
from os_brick.tests import base
@ddt.ddt
class WindowsRemotefsClientTestCase(base.TestCase):
_FAKE_SHARE_NAME = 'fake_share'
_FAKE_SHARE_SERVER = 'fake_share_server'
_FAKE_SHARE = '\\\\%s\\%s' % (_FAKE_SHARE_SERVER,
_FAKE_SHARE_NAME)
@mock.patch.object(windows_remotefs, 'utilsfactory')
def setUp(self, mock_utilsfactory):
super(WindowsRemotefsClientTestCase, self).setUp()
self._remotefs = windows_remotefs.WindowsRemoteFsClient(
mount_type='smbfs')
self._remotefs._mount_base = mock.sentinel.mount_base
self._smbutils = self._remotefs._smbutils
self._pathutils = self._remotefs._pathutils
@ddt.data({'is_local_share': False},
{'expect_existing': False})
@ddt.unpack
def test_get_local_share_path_missing(self, expect_existing=True,
is_local_share=True):
self._smbutils.get_smb_share_path.return_value = None
self._smbutils.is_local_share.return_value = is_local_share
if expect_existing:
self.assertRaises(
exception.VolumePathsNotFound,
self._remotefs.get_local_share_path,
self._FAKE_SHARE,
expect_existing=expect_existing)
else:
share_path = self._remotefs.get_local_share_path(
self._FAKE_SHARE,
expect_existing=expect_existing)
self.assertIsNone(share_path)
self.assertEqual(is_local_share,
self._smbutils.get_smb_share_path.called)
self._smbutils.is_local_share.assert_called_once_with(self._FAKE_SHARE)
@ddt.data({'share': '//addr/share_name/subdir_a/subdir_b',
'exp_path': r'C:\shared_dir\subdir_a\subdir_b'},
{'share': '//addr/share_name',
'exp_path': r'C:\shared_dir'})
@ddt.unpack
@mock.patch('os.path.join', lambda *args: '\\'.join(args))
def test_get_local_share_path(self, share, exp_path):
fake_local_path = 'C:\\shared_dir'
self._smbutils.get_smb_share_path.return_value = fake_local_path
share_path = self._remotefs.get_local_share_path(share)
self.assertEqual(exp_path, share_path)
self._smbutils.get_smb_share_path.assert_called_once_with(
'share_name')
def test_get_share_name(self):
resulted_name = self._remotefs.get_share_name(self._FAKE_SHARE)
self.assertEqual(self._FAKE_SHARE_NAME, resulted_name)
@ddt.data(True, False)
@mock.patch.object(windows_remotefs.WindowsRemoteFsClient,
'_create_mount_point')
def test_mount(self, is_local_share,
mock_create_mount_point):
flags = '-o pass=password'
self._remotefs._mount_options = '-o user=username,randomopt'
self._remotefs._local_path_for_loopback = True
self._smbutils.check_smb_mapping.return_value = False
self._smbutils.is_local_share.return_value = is_local_share
self._remotefs.mount(self._FAKE_SHARE, flags)
if is_local_share:
self.assertFalse(self._smbutils.check_smb_mapping.called)
self.assertFalse(self._smbutils.mount_smb_share.called)
else:
self._smbutils.check_smb_mapping.assert_called_once_with(
self._FAKE_SHARE)
self._smbutils.mount_smb_share.assert_called_once_with(
self._FAKE_SHARE,
username='username',
password='password')
mock_create_mount_point.assert_called_once_with(self._FAKE_SHARE,
is_local_share)
def test_unmount(self):
self._remotefs.unmount(self._FAKE_SHARE)
self._smbutils.unmount_smb_share.assert_called_once_with(
self._FAKE_SHARE)
@ddt.data({'use_local_path': True},
{'path_exists': True, 'is_symlink': True},
{'path_exists': True})
@mock.patch.object(windows_remotefs.WindowsRemoteFsClient,
'get_local_share_path')
@mock.patch.object(windows_remotefs.WindowsRemoteFsClient,
'get_mount_point')
@mock.patch.object(windows_remotefs, 'os')
@ddt.unpack
def test_create_mount_point(self, mock_os, mock_get_mount_point,
mock_get_local_share_path,
path_exists=False, is_symlink=False,
use_local_path=False):
mock_os.path.exists.return_value = path_exists
mock_os.isdir.return_value = False
self._pathutils.is_symlink.return_value = is_symlink
if path_exists and not is_symlink:
self.assertRaises(exception.BrickException,
self._remotefs._create_mount_point,
self._FAKE_SHARE,
use_local_path)
else:
self._remotefs._create_mount_point(self._FAKE_SHARE,
use_local_path)
mock_get_mount_point.assert_called_once_with(self._FAKE_SHARE)
mock_os.path.isdir.assert_called_once_with(mock.sentinel.mount_base)
if use_local_path:
mock_get_local_share_path.assert_called_once_with(
self._FAKE_SHARE)
expected_symlink_target = mock_get_local_share_path.return_value
else:
expected_symlink_target = self._FAKE_SHARE.replace('/', '\\')
if path_exists:
self._pathutils.is_symlink.assert_called_once_with(
mock_get_mount_point.return_value)
else:
self._pathutils.create_sym_link.assert_called_once_with(
mock_get_mount_point.return_value,
expected_symlink_target)
View File
-34
View File
@@ -1,34 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from os_brick.initiator.windows import base as win_conn_base
class FakeWindowsConnector(win_conn_base.BaseWindowsConnector):
def connect_volume(self, connection_properties):
return {}
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
pass
def get_volume_paths(self, connection_properties):
return []
def get_search_path(self):
return None
def get_all_available_volumes(self, connection_properties=None):
return []
-35
View File
@@ -1,35 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
from os_win import utilsfactory
from os_brick.tests import base
class WindowsConnectorTestBase(base.TestCase):
@mock.patch('sys.platform', 'win32')
def setUp(self):
super(WindowsConnectorTestBase, self).setUp()
# All the Windows connectors use os_win.utilsfactory to fetch Windows
# specific utils. During init, those will run methods that will fail
# on other platforms. To make testing easier and avoid checking the
# platform in the code, we can simply mock this factory method.
utilsfactory_patcher = mock.patch.object(
utilsfactory, '_get_class')
utilsfactory_patcher.start()
self.addCleanup(utilsfactory_patcher.stop)
@@ -1,135 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import ddt
from os_brick import exception
from os_brick.initiator.windows import base as base_win_conn
from os_brick.tests.windows import fake_win_conn
from os_brick.tests.windows import test_base
@ddt.ddt
class BaseWindowsConnectorTestCase(test_base.WindowsConnectorTestBase):
def setUp(self):
super(BaseWindowsConnectorTestCase, self).setUp()
self._diskutils = mock.Mock()
self._connector = fake_win_conn.FakeWindowsConnector()
self._connector._diskutils = self._diskutils
@ddt.data({},
{'feature_available': True},
{'feature_available': False, 'enforce_multipath': True})
@ddt.unpack
@mock.patch.object(base_win_conn.utilsfactory, 'get_hostutils')
def test_check_multipath_support(self, mock_get_hostutils,
feature_available=True,
enforce_multipath=False):
mock_hostutils = mock_get_hostutils.return_value
mock_hostutils.check_server_feature.return_value = feature_available
check_mpio = base_win_conn.BaseWindowsConnector.check_multipath_support
if feature_available or not enforce_multipath:
multipath_support = check_mpio(
enforce_multipath=enforce_multipath)
self.assertEqual(feature_available, multipath_support)
else:
self.assertRaises(exception.BrickException,
check_mpio,
enforce_multipath=enforce_multipath)
mock_hostutils.check_server_feature.assert_called_once_with(
mock_hostutils.FEATURE_MPIO)
@ddt.data({}, {'mpio_requested': False}, {'mpio_available': True})
@mock.patch.object(base_win_conn.BaseWindowsConnector,
'check_multipath_support')
@ddt.unpack
def test_get_connector_properties(self, mock_check_mpio,
mpio_requested=True,
mpio_available=True):
mock_check_mpio.return_value = mpio_available
enforce_multipath = False
props = base_win_conn.BaseWindowsConnector.get_connector_properties(
multipath=mpio_requested,
enforce_multipath=enforce_multipath)
self.assertEqual(mpio_requested and mpio_available,
props['multipath'])
if mpio_requested:
mock_check_mpio.assert_called_once_with(enforce_multipath)
def test_get_scsi_wwn(self):
mock_get_uid_and_type = self._diskutils.get_disk_uid_and_uid_type
mock_get_uid_and_type.return_value = (mock.sentinel.disk_uid,
mock.sentinel.uid_type)
scsi_wwn = self._connector._get_scsi_wwn(mock.sentinel.dev_num)
expected_wwn = '%s%s' % (mock.sentinel.uid_type,
mock.sentinel.disk_uid)
self.assertEqual(expected_wwn, scsi_wwn)
mock_get_uid_and_type.assert_called_once_with(mock.sentinel.dev_num)
@ddt.data(None, IOError)
@mock.patch('os_brick.initiator.windows.base.open',
new_callable=mock.mock_open)
def test_check_valid_device(self, exc, mock_open):
mock_open.side_effect = exc
valid_device = self._connector.check_valid_device(
mock.sentinel.dev_path)
self.assertEqual(not exc, valid_device)
mock_open.assert_any_call(mock.sentinel.dev_path, 'r')
mock_read = mock_open.return_value.__enter__.return_value.read
if not exc:
mock_read.assert_called_once_with(1)
def test_check_device_paths(self):
# We expect an exception to be raised if the same volume
# can be accessed through multiple paths.
device_paths = [mock.sentinel.dev_path_0,
mock.sentinel.dev_path_1]
self.assertRaises(exception.BrickException,
self._connector._check_device_paths,
device_paths)
@mock.patch.object(fake_win_conn.FakeWindowsConnector,
'get_volume_paths')
def test_extend_volume(self, mock_get_vol_paths):
mock_vol_paths = [mock.sentinel.dev_path]
mock_get_vol_paths.return_value = mock_vol_paths
self._connector.extend_volume(mock.sentinel.conn_props)
mock_get_vol_paths.assert_called_once_with(mock.sentinel.conn_props)
mock_get_dev_num = self._diskutils.get_device_number_from_device_name
mock_get_dev_num.assert_called_once_with(mock.sentinel.dev_path)
self._diskutils.refresh_disk.assert_called_once_with(
mock_get_dev_num.return_value)
@mock.patch.object(fake_win_conn.FakeWindowsConnector,
'get_volume_paths')
def test_extend_volume_missing_path(self, mock_get_vol_paths):
mock_get_vol_paths.return_value = []
self.assertRaises(exception.NotFound,
self._connector.extend_volume,
mock.sentinel.conn_props)
mock_get_vol_paths.assert_called_once_with(mock.sentinel.conn_props)
-40
View File
@@ -1,40 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import ddt
from os_brick import initiator
from os_brick.initiator import connector
from os_brick.initiator.windows import fibre_channel
from os_brick.initiator.windows import iscsi
from os_brick.initiator.windows import smbfs
from os_brick.tests.windows import test_base
@ddt.ddt
class WindowsConnectorFactoryTestCase(test_base.WindowsConnectorTestBase):
@ddt.data({'proto': initiator.ISCSI,
'expected_cls': iscsi.WindowsISCSIConnector},
{'proto': initiator.FIBRE_CHANNEL,
'expected_cls': fibre_channel.WindowsFCConnector},
{'proto': initiator.SMBFS,
'expected_cls': smbfs.WindowsSMBFSConnector})
@ddt.unpack
@mock.patch('sys.platform', 'win32')
def test_factory(self, proto, expected_cls):
obj = connector.InitiatorConnector.factory(proto, None)
self.assertIsInstance(obj, expected_cls)
@@ -1,244 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import ddt
from os_win import exceptions as os_win_exc
from os_brick import exception
from os_brick.initiator.windows import fibre_channel as fc
from os_brick.tests.windows import test_base
@ddt.ddt
class WindowsFCConnectorTestCase(test_base.WindowsConnectorTestBase):
def setUp(self):
super(WindowsFCConnectorTestCase, self).setUp()
self._connector = fc.WindowsFCConnector(
device_scan_interval=mock.sentinel.rescan_interval)
self._diskutils = self._connector._diskutils
self._fc_utils = self._connector._fc_utils
@ddt.data(True, False)
@mock.patch.object(fc.utilsfactory, 'get_fc_utils')
def test_get_volume_connector_props(self, valid_fc_hba_ports,
mock_get_fc_utils):
fake_fc_hba_ports = [{'node_name': mock.sentinel.node_name,
'port_name': mock.sentinel.port_name},
{'node_name': mock.sentinel.second_node_name,
'port_name': mock.sentinel.second_port_name}]
self._fc_utils = mock_get_fc_utils.return_value
self._fc_utils.get_fc_hba_ports.return_value = (
fake_fc_hba_ports if valid_fc_hba_ports else [])
props = self._connector.get_connector_properties()
self._fc_utils.refresh_hba_configuration.assert_called_once_with()
self._fc_utils.get_fc_hba_ports.assert_called_once_with()
if valid_fc_hba_ports:
expected_props = {
'wwpns': [mock.sentinel.port_name,
mock.sentinel.second_port_name],
'wwnns': [mock.sentinel.node_name,
mock.sentinel.second_node_name]
}
else:
expected_props = {}
self.assertCountEqual(expected_props, props)
@mock.patch.object(fc.WindowsFCConnector, '_get_scsi_wwn')
@mock.patch.object(fc.WindowsFCConnector, 'get_volume_paths')
def test_connect_volume(self, mock_get_vol_paths,
mock_get_scsi_wwn):
mock_get_vol_paths.return_value = [mock.sentinel.dev_name]
mock_get_dev_num = self._diskutils.get_device_number_from_device_name
mock_get_dev_num.return_value = mock.sentinel.dev_num
expected_device_info = dict(type='block',
path=mock.sentinel.dev_name,
number=mock.sentinel.dev_num,
scsi_wwn=mock_get_scsi_wwn.return_value)
device_info = self._connector.connect_volume(mock.sentinel.conn_props)
self.assertEqual(expected_device_info, device_info)
mock_get_vol_paths.assert_called_once_with(mock.sentinel.conn_props)
mock_get_dev_num.assert_called_once_with(mock.sentinel.dev_name)
mock_get_scsi_wwn.assert_called_once_with(mock.sentinel.dev_num)
@mock.patch.object(fc.WindowsFCConnector, 'get_volume_paths')
def test_connect_volume_not_found(self, mock_get_vol_paths):
mock_get_vol_paths.return_value = []
self.assertRaises(exception.NoFibreChannelVolumeDeviceFound,
self._connector.connect_volume,
mock.sentinel.conn_props)
@ddt.data({'volume_mappings': [], 'expected_paths': []},
{'volume_mappings': [dict(device_name='',
fcp_lun=mock.sentinel.fcp_lun)] * 3,
'scsi_id_side_eff': os_win_exc.OSWinException,
'expected_paths': []},
{'volume_mappings': [dict(device_name='',
fcp_lun=mock.sentinel.fcp_lun),
dict(device_name=mock.sentinel.disk_path)],
'expected_paths': [mock.sentinel.disk_path]},
{'volume_mappings': [dict(device_name='',
fcp_lun=mock.sentinel.fcp_lun)],
'scsi_id_side_eff': [[mock.sentinel.disk_path]],
'expected_paths': [mock.sentinel.disk_path]},
{'volume_mappings': [dict(device_name=mock.sentinel.disk_path)],
'use_multipath': True,
'is_mpio_disk': True,
'expected_paths': [mock.sentinel.disk_path]},
{'volume_mappings': [dict(device_name=mock.sentinel.disk_path)],
'use_multipath': True,
'is_mpio_disk': False,
'expected_paths': []})
@ddt.unpack
@mock.patch('time.sleep')
@mock.patch.object(fc.WindowsFCConnector, '_get_fc_volume_mappings')
@mock.patch.object(fc.WindowsFCConnector, '_get_disk_paths_by_scsi_id')
def test_get_volume_paths(self, mock_get_disk_paths_by_scsi_id,
mock_get_fc_mappings,
mock_sleep,
volume_mappings, expected_paths,
scsi_id_side_eff=None,
use_multipath=False,
is_mpio_disk=False):
mock_get_dev_num = self._diskutils.get_device_number_from_device_name
mock_get_fc_mappings.return_value = volume_mappings
mock_get_disk_paths_by_scsi_id.side_effect = scsi_id_side_eff
self._diskutils.is_mpio_disk.return_value = is_mpio_disk
self._connector.use_multipath = use_multipath
vol_paths = self._connector.get_volume_paths(mock.sentinel.conn_props)
self.assertEqual(expected_paths, vol_paths)
# In this test case, either the volume is found after the first
# attempt, either it's not found at all, in which case we'd expect
# the number of retries to be the requested maximum number of rescans.
expected_try_count = (1 if expected_paths
else self._connector.device_scan_attempts)
self._diskutils.rescan_disks.assert_has_calls(
[mock.call()] * expected_try_count)
mock_get_fc_mappings.assert_has_calls(
[mock.call(mock.sentinel.conn_props)] * expected_try_count)
mock_sleep.assert_has_calls(
[mock.call(mock.sentinel.rescan_interval)] *
(expected_try_count - 1))
dev_names = [mapping['device_name']
for mapping in volume_mappings if mapping['device_name']]
if volume_mappings and not dev_names:
mock_get_disk_paths_by_scsi_id.assert_any_call(
mock.sentinel.conn_props,
volume_mappings[0]['fcp_lun'])
if expected_paths and use_multipath:
mock_get_dev_num.assert_called_once_with(expected_paths[0])
self._diskutils.is_mpio_disk.assert_any_call(
mock_get_dev_num.return_value)
@mock.patch.object(fc.WindowsFCConnector, '_get_fc_hba_mappings')
def test_get_fc_volume_mappings(self, mock_get_fc_hba_mappings):
fake_target_wwpn = 'FAKE_TARGET_WWPN'
fake_conn_props = dict(target_lun=mock.sentinel.target_lun,
target_wwn=[fake_target_wwpn])
mock_hba_mappings = {mock.sentinel.node_name: mock.sentinel.hba_ports}
mock_get_fc_hba_mappings.return_value = mock_hba_mappings
all_target_mappings = [{'device_name': mock.sentinel.dev_name,
'port_name': fake_target_wwpn,
'lun': mock.sentinel.target_lun},
{'device_name': mock.sentinel.dev_name_1,
'port_name': mock.sentinel.target_port_name_1,
'lun': mock.sentinel.target_lun},
{'device_name': mock.sentinel.dev_name,
'port_name': mock.sentinel.target_port_name,
'lun': mock.sentinel.target_lun_1}]
expected_mappings = [all_target_mappings[0]]
self._fc_utils.get_fc_target_mappings.return_value = (
all_target_mappings)
volume_mappings = self._connector._get_fc_volume_mappings(
fake_conn_props)
self.assertEqual(expected_mappings, volume_mappings)
def test_get_fc_hba_mappings(self):
fake_fc_hba_ports = [{'node_name': mock.sentinel.node_name,
'port_name': mock.sentinel.port_name}]
self._fc_utils.get_fc_hba_ports.return_value = fake_fc_hba_ports
resulted_mappings = self._connector._get_fc_hba_mappings()
expected_mappings = {
mock.sentinel.node_name: [mock.sentinel.port_name]}
self.assertEqual(expected_mappings, resulted_mappings)
@mock.patch.object(fc.WindowsFCConnector, '_get_dev_nums_by_scsi_id')
def test_get_disk_paths_by_scsi_id(self, mock_get_dev_nums):
remote_wwpns = [mock.sentinel.remote_wwpn_0,
mock.sentinel.remote_wwpn_1]
fake_init_target_map = {mock.sentinel.local_wwpn: remote_wwpns}
conn_props = dict(initiator_target_map=fake_init_target_map)
mock_get_dev_nums.side_effect = [os_win_exc.FCException,
[mock.sentinel.dev_num]]
mock_get_dev_name = self._diskutils.get_device_name_by_device_number
mock_get_dev_name.return_value = mock.sentinel.dev_name
disk_paths = self._connector._get_disk_paths_by_scsi_id(
conn_props, mock.sentinel.fcp_lun)
self.assertEqual([mock.sentinel.dev_name], disk_paths)
mock_get_dev_nums.assert_has_calls([
mock.call(mock.sentinel.local_wwpn,
remote_wwpn,
mock.sentinel.fcp_lun)
for remote_wwpn in remote_wwpns])
mock_get_dev_name.assert_called_once_with(mock.sentinel.dev_num)
@mock.patch.object(fc.WindowsFCConnector, '_get_fc_hba_wwn_for_port')
def test_get_dev_nums_by_scsi_id(self, mock_get_fc_hba_wwn):
fake_identifier = dict(id=mock.sentinel.id,
type=mock.sentinel.type)
mock_get_fc_hba_wwn.return_value = mock.sentinel.local_wwnn
self._fc_utils.get_scsi_device_identifiers.return_value = [
fake_identifier]
self._diskutils.get_disk_numbers_by_unique_id.return_value = (
mock.sentinel.dev_nums)
dev_nums = self._connector._get_dev_nums_by_scsi_id(
mock.sentinel.local_wwpn,
mock.sentinel.remote_wwpn,
mock.sentinel.fcp_lun)
self.assertEqual(mock.sentinel.dev_nums, dev_nums)
mock_get_fc_hba_wwn.assert_called_once_with(mock.sentinel.local_wwpn)
self._fc_utils.get_scsi_device_identifiers.assert_called_once_with(
mock.sentinel.local_wwnn, mock.sentinel.local_wwpn,
mock.sentinel.remote_wwpn, mock.sentinel.fcp_lun)
self._diskutils.get_disk_numbers_by_unique_id.assert_called_once_with(
unique_id=mock.sentinel.id,
unique_id_format=mock.sentinel.type)
-195
View File
@@ -1,195 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import ddt
from os_win import exceptions as os_win_exc
from os_brick import exception
from os_brick.initiator.windows import iscsi
from os_brick.tests.windows import test_base
@ddt.ddt
class WindowsISCSIConnectorTestCase(test_base.WindowsConnectorTestBase):
@mock.patch.object(iscsi.WindowsISCSIConnector, 'validate_initiators')
def setUp(self, mock_validate_connectors):
super(WindowsISCSIConnectorTestCase, self).setUp()
self._diskutils = mock.Mock()
self._iscsi_utils = mock.Mock()
self._connector = iscsi.WindowsISCSIConnector(
device_scan_interval=mock.sentinel.rescan_interval)
self._connector._diskutils = self._diskutils
self._connector._iscsi_utils = self._iscsi_utils
@ddt.data({'requested_initiators': [mock.sentinel.initiator_0],
'available_initiators': [mock.sentinel.initiator_0,
mock.sentinel.initiator_1]},
{'requested_initiators': [mock.sentinel.initiator_0],
'available_initiators': [mock.sentinel.initiator_1]},
{'requested_initiators': [],
'available_initiators': [mock.sentinel.software_initiator]})
@ddt.unpack
def test_validate_initiators(self, requested_initiators,
available_initiators):
self._iscsi_utils.get_iscsi_initiators.return_value = (
available_initiators)
self._connector.initiator_list = requested_initiators
expected_valid_initiator = not (
set(requested_initiators).difference(set(available_initiators)))
valid_initiator = self._connector.validate_initiators()
self.assertEqual(expected_valid_initiator, valid_initiator)
def test_get_initiator(self):
initiator = self._connector.get_initiator()
self.assertEqual(self._iscsi_utils.get_iscsi_initiator.return_value,
initiator)
@mock.patch.object(iscsi, 'utilsfactory')
def test_get_connector_properties(self, mock_utilsfactory):
mock_iscsi_utils = (
mock_utilsfactory.get_iscsi_initiator_utils.return_value)
props = self._connector.get_connector_properties()
expected_props = dict(
initiator=mock_iscsi_utils.get_iscsi_initiator.return_value)
self.assertEqual(expected_props, props)
@mock.patch.object(iscsi.WindowsISCSIConnector, '_get_all_targets')
def test_get_all_paths(self, mock_get_all_targets):
initiators = [mock.sentinel.initiator_0, mock.sentinel.initiator_1]
all_targets = [(mock.sentinel.portal_0, mock.sentinel.target_0,
mock.sentinel.lun_0),
(mock.sentinel.portal_1, mock.sentinel.target_1,
mock.sentinel.lun_1)]
self._connector.initiator_list = initiators
mock_get_all_targets.return_value = all_targets
expected_paths = [
(initiator_name, target_portal, target_iqn, target_lun)
for target_portal, target_iqn, target_lun in all_targets
for initiator_name in initiators]
all_paths = self._connector._get_all_paths(mock.sentinel.conn_props)
self.assertEqual(expected_paths, all_paths)
mock_get_all_targets.assert_called_once_with(mock.sentinel.conn_props)
@ddt.data(True, False)
@mock.patch.object(iscsi.WindowsISCSIConnector, '_get_scsi_wwn')
@mock.patch.object(iscsi.WindowsISCSIConnector, '_get_all_paths')
def test_connect_volume(self, use_multipath,
mock_get_all_paths, mock_get_scsi_wwn):
fake_paths = [(mock.sentinel.initiator_name,
mock.sentinel.target_portal,
mock.sentinel.target_iqn,
mock.sentinel.target_lun)] * 3
fake_conn_props = dict(auth_username=mock.sentinel.auth_username,
auth_password=mock.sentinel.auth_password)
mock_get_all_paths.return_value = fake_paths
self._iscsi_utils.login_storage_target.side_effect = [
os_win_exc.OSWinException, None, None]
self._iscsi_utils.get_device_number_and_path.return_value = (
mock.sentinel.device_number, mock.sentinel.device_path)
self._connector.use_multipath = use_multipath
device_info = self._connector.connect_volume(fake_conn_props)
expected_device_info = dict(type='block',
path=mock.sentinel.device_path,
number=mock.sentinel.device_number,
scsi_wwn=mock_get_scsi_wwn.return_value)
self.assertEqual(expected_device_info, device_info)
mock_get_all_paths.assert_called_once_with(fake_conn_props)
expected_login_attempts = 3 if use_multipath else 2
self._iscsi_utils.login_storage_target.assert_has_calls(
[mock.call(target_lun=mock.sentinel.target_lun,
target_iqn=mock.sentinel.target_iqn,
target_portal=mock.sentinel.target_portal,
auth_username=mock.sentinel.auth_username,
auth_password=mock.sentinel.auth_password,
mpio_enabled=use_multipath,
initiator_name=mock.sentinel.initiator_name,
ensure_lun_available=False)] *
expected_login_attempts)
self._iscsi_utils.get_device_number_and_path.assert_called_once_with(
mock.sentinel.target_iqn, mock.sentinel.target_lun,
retry_attempts=self._connector.device_scan_attempts,
retry_interval=self._connector.device_scan_interval,
rescan_disks=True,
ensure_mpio_claimed=use_multipath)
mock_get_scsi_wwn.assert_called_once_with(mock.sentinel.device_number)
@mock.patch.object(iscsi.WindowsISCSIConnector, '_get_all_paths')
def test_connect_volume_exc(self, mock_get_all_paths):
fake_paths = [(mock.sentinel.initiator_name,
mock.sentinel.target_portal,
mock.sentinel.target_iqn,
mock.sentinel.target_lun)] * 3
mock_get_all_paths.return_value = fake_paths
self._iscsi_utils.login_storage_target.side_effect = (
os_win_exc.OSWinException)
self._connector.use_multipath = True
self.assertRaises(exception.BrickException,
self._connector.connect_volume,
connection_properties={})
@mock.patch.object(iscsi.WindowsISCSIConnector, '_get_all_targets')
def test_disconnect_volume(self, mock_get_all_targets):
targets = [
(mock.sentinel.portal_0, mock.sentinel.tg_0, mock.sentinel.lun_0),
(mock.sentinel.portal_1, mock.sentinel.tg_1, mock.sentinel.lun_1)]
mock_get_all_targets.return_value = targets
self._iscsi_utils.get_target_luns.return_value = [mock.sentinel.lun_0]
self._connector.disconnect_volume(mock.sentinel.conn_props,
mock.sentinel.dev_info)
self._diskutils.rescan_disks.assert_called_once_with()
mock_get_all_targets.assert_called_once_with(mock.sentinel.conn_props)
self._iscsi_utils.logout_storage_target.assert_called_once_with(
mock.sentinel.tg_0)
self._iscsi_utils.get_target_luns.assert_has_calls(
[mock.call(mock.sentinel.tg_0), mock.call(mock.sentinel.tg_1)])
@mock.patch.object(iscsi.WindowsISCSIConnector, '_get_all_targets')
@mock.patch.object(iscsi.WindowsISCSIConnector, '_check_device_paths')
def test_get_volume_paths(self, mock_check_dev_paths,
mock_get_all_targets):
targets = [
(mock.sentinel.portal_0, mock.sentinel.tg_0, mock.sentinel.lun_0),
(mock.sentinel.portal_1, mock.sentinel.tg_1, mock.sentinel.lun_1)]
mock_get_all_targets.return_value = targets
self._iscsi_utils.get_device_number_and_path.return_value = [
mock.sentinel.dev_num, mock.sentinel.dev_path]
volume_paths = self._connector.get_volume_paths(
mock.sentinel.conn_props)
expected_paths = [mock.sentinel.dev_path]
self.assertEqual(expected_paths, volume_paths)
mock_check_dev_paths.assert_called_once_with(set(expected_paths))
-130
View File
@@ -1,130 +0,0 @@
# Copyright 2020 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from unittest import mock
import ddt
from oslo_concurrency import processutils
from os_brick import exception
from os_brick.initiator.windows import rbd
from os_brick.tests.initiator.connectors import test_base_rbd
from os_brick.tests.windows import test_base
@ddt.ddt
class WindowsRBDConnectorTestCase(test_base_rbd.RBDConnectorTestMixin,
test_base.WindowsConnectorTestBase):
def setUp(self):
super(WindowsRBDConnectorTestCase, self).setUp()
self._diskutils = mock.Mock()
self._execute = mock.Mock(return_value=['fake_stdout', 'fake_stderr'])
self._conn = rbd.WindowsRBDConnector(execute=self._execute)
self._conn._diskutils = self._diskutils
self.dev_name = '\\\\.\\PhysicalDrive5'
@ddt.data(True, False)
def test_check_rbd(self, rbd_available):
self._execute.side_effect = (
None if rbd_available
else processutils.ProcessExecutionError)
self.assertEqual(rbd_available, self._conn._check_rbd())
if rbd_available:
self._conn._ensure_rbd_available()
else:
self.assertRaises(exception.BrickException,
self._conn._ensure_rbd_available)
expected_cmd = ['where.exe', 'rbd']
self._execute.assert_any_call(*expected_cmd)
@mock.patch.object(rbd.WindowsRBDConnector, 'get_device_name')
def test_get_volume_paths(self, mock_get_dev_name):
vol_paths = self._conn.get_volume_paths(mock.sentinel.conn_props)
self.assertEqual([mock_get_dev_name.return_value], vol_paths)
mock_get_dev_name.assert_called_once_with(mock.sentinel.conn_props)
@ddt.data(True, False)
@mock.patch.object(rbd.WindowsRBDConnector, 'get_device_name')
@mock.patch('oslo_utils.eventletutils.EventletEvent.wait')
def test_wait_for_volume(self, device_found, mock_wait, mock_get_dev_name):
mock_open = mock.mock_open()
if device_found:
mock_get_dev_name.return_value = mock.sentinel.dev_name
else:
# First call fails to locate the device, the following ones can't
# open it.
mock_get_dev_name.side_effect = (
[None] +
[mock.sentinel.dev_name] * self._conn.device_scan_attempts)
mock_open.side_effect = FileNotFoundError
with mock.patch.object(rbd, 'open', mock_open,
create=True):
if device_found:
dev_name = self._conn._wait_for_volume(
self.connection_properties)
self.assertEqual(mock.sentinel.dev_name, dev_name)
else:
self.assertRaises(exception.VolumeDeviceNotFound,
self._conn._wait_for_volume,
self.connection_properties)
mock_open.assert_any_call(mock.sentinel.dev_name, 'rb')
mock_get_dev_name.assert_any_call(self.connection_properties,
expect=False)
@mock.patch.object(rbd.WindowsRBDConnector, '_wait_for_volume')
@mock.patch.object(rbd.WindowsRBDConnector, 'get_device_name')
def test_connect_volume(self, mock_get_dev_name, mock_wait_vol):
mock_get_dev_name.return_value = None
mock_wait_vol.return_value = self.dev_name
ret_val = self._conn.connect_volume(self.connection_properties)
exp_ret_val = {
'path': self.dev_name,
'type': 'block'
}
self.assertEqual(exp_ret_val, ret_val)
exp_exec_args = ['rbd', 'device', 'map', self.image_name]
exp_exec_args += self._conn._get_rbd_args(self.connection_properties)
self._execute.assert_any_call(*exp_exec_args)
mock_wait_vol.assert_called_once_with(self.connection_properties)
mock_get_dev_num = self._diskutils.get_device_number_from_device_name
mock_get_dev_num.assert_called_once_with(self.dev_name)
self._diskutils.set_disk_offline.assert_called_once_with(
mock_get_dev_num.return_value)
@ddt.data(True, False)
@mock.patch.object(rbd.WindowsRBDConnector, 'get_device_name')
def test_disconnect_volume(self, force, mock_get_dev_name):
mock_get_dev_name.return_value = self.dev_name
self._conn.disconnect_volume(self.connection_properties, force=force)
exp_exec_args = ['rbd', 'device', 'unmap', self.image_name]
exp_exec_args += self._conn._get_rbd_args(self.connection_properties)
if force:
exp_exec_args += ["-o", "hard-disconnect"]
self._execute.assert_any_call(*exp_exec_args)
-177
View File
@@ -1,177 +0,0 @@
# Copyright 2016 Cloudbase Solutions Srl
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import os
from unittest import mock
import ddt
from os_brick.initiator.windows import smbfs
from os_brick.remotefs import windows_remotefs
from os_brick.tests.windows import test_base
@ddt.ddt
class WindowsSMBFSConnectorTestCase(test_base.WindowsConnectorTestBase):
def setUp(self):
super(WindowsSMBFSConnectorTestCase, self).setUp()
self._load_connector()
@mock.patch.object(windows_remotefs, 'WindowsRemoteFsClient')
def _load_connector(self, mock_remotefs_cls, *args, **kwargs):
self._connector = smbfs.WindowsSMBFSConnector(*args, **kwargs)
self._remotefs = mock_remotefs_cls.return_value
self._vhdutils = self._connector._vhdutils
self._diskutils = self._connector._diskutils
@mock.patch.object(smbfs.WindowsSMBFSConnector, '_get_disk_path')
@mock.patch.object(smbfs.WindowsSMBFSConnector, 'ensure_share_mounted')
def test_connect_volume(self, mock_ensure_mounted,
mock_get_disk_path):
device_info = self._connector.connect_volume(mock.sentinel.conn_props)
expected_info = dict(type='file',
path=mock_get_disk_path.return_value)
self.assertEqual(expected_info, device_info)
mock_ensure_mounted.assert_called_once_with(mock.sentinel.conn_props)
mock_get_disk_path.assert_called_once_with(mock.sentinel.conn_props)
@ddt.data(True, False)
@mock.patch.object(smbfs.WindowsSMBFSConnector, '_get_disk_path')
@mock.patch.object(smbfs.WindowsSMBFSConnector, 'ensure_share_mounted')
def test_connect_and_mount_volume(self, read_only,
mock_ensure_mounted,
mock_get_disk_path):
self._load_connector(expect_raw_disk=True)
fake_conn_props = dict(access_mode='ro' if read_only else 'rw')
self._vhdutils.get_virtual_disk_physical_path.return_value = (
mock.sentinel.raw_disk_path)
mock_get_disk_path.return_value = mock.sentinel.image_path
device_info = self._connector.connect_volume(fake_conn_props)
expected_info = dict(type='file',
path=mock.sentinel.raw_disk_path)
self.assertEqual(expected_info, device_info)
self._vhdutils.attach_virtual_disk.assert_called_once_with(
mock.sentinel.image_path,
read_only=read_only)
self._vhdutils.get_virtual_disk_physical_path.assert_called_once_with(
mock.sentinel.image_path)
get_dev_num = self._diskutils.get_device_number_from_device_name
get_dev_num.assert_called_once_with(mock.sentinel.raw_disk_path)
self._diskutils.set_disk_offline.assert_called_once_with(
get_dev_num.return_value)
@mock.patch.object(smbfs.WindowsSMBFSConnector, '_get_disk_path')
@mock.patch.object(smbfs.WindowsSMBFSConnector, '_get_export_path')
def test_disconnect_volume(self, mock_get_export_path,
mock_get_disk_path):
self._connector.disconnect_volume(mock.sentinel.conn_props,
mock.sentinel.dev_info)
mock_get_disk_path.assert_called_once_with(
mock.sentinel.conn_props)
self._vhdutils.detach_virtual_disk.assert_called_once_with(
mock_get_disk_path.return_value)
self._remotefs.unmount.assert_called_once_with(
mock_get_export_path.return_value)
mock_get_export_path.assert_called_once_with(mock.sentinel.conn_props)
def test_get_export_path(self):
fake_export = '//ip/share'
fake_conn_props = dict(export=fake_export)
expected_export = fake_export.replace('/', '\\')
export_path = self._connector._get_export_path(fake_conn_props)
self.assertEqual(expected_export, export_path)
@ddt.data({},
{'mount_base': mock.sentinel.mount_base},
{'is_local_share': True},
{'is_local_share': True,
'local_path_for_loopbk': True})
@ddt.unpack
def test_get_disk_path(self, mount_base=None,
local_path_for_loopbk=False,
is_local_share=False):
fake_mount_point = r'C:\\fake_mount_point'
fake_share_name = 'fake_share'
fake_local_share_path = 'C:\\%s' % fake_share_name
fake_export_path = '\\\\host\\%s' % fake_share_name
fake_disk_name = 'fake_disk.vhdx'
fake_conn_props = dict(name=fake_disk_name,
export=fake_export_path)
self._remotefs.get_mount_base.return_value = mount_base
self._remotefs.get_mount_point.return_value = fake_mount_point
self._remotefs.get_local_share_path.return_value = (
fake_local_share_path)
self._remotefs.get_share_name.return_value = fake_share_name
self._connector._local_path_for_loopback = local_path_for_loopbk
self._connector._smbutils.is_local_share.return_value = is_local_share
expecting_local = local_path_for_loopbk and is_local_share
if mount_base:
expected_export_path = fake_mount_point
elif expecting_local:
# In this case, we expect the local share export path to be
# used directly.
expected_export_path = fake_local_share_path
else:
expected_export_path = fake_export_path
expected_disk_path = os.path.join(expected_export_path,
fake_disk_name)
disk_path = self._connector._get_disk_path(fake_conn_props)
self.assertEqual(expected_disk_path, disk_path)
if mount_base:
self._remotefs.get_mount_point.assert_called_once_with(
fake_export_path)
elif expecting_local:
self._connector._smbutils.is_local_share.assert_called_once_with(
fake_export_path)
self._remotefs.get_local_share_path.assert_called_once_with(
fake_export_path)
def test_get_search_path(self):
search_path = self._connector.get_search_path()
self.assertEqual(search_path,
self._remotefs.get_mount_base.return_value)
@mock.patch.object(smbfs.WindowsSMBFSConnector, '_get_disk_path')
def test_volume_paths(self, mock_get_disk_path):
expected_paths = [mock_get_disk_path.return_value]
volume_paths = self._connector.get_volume_paths(
mock.sentinel.conn_props)
self.assertEqual(expected_paths, volume_paths)
mock_get_disk_path.assert_called_once_with(
mock.sentinel.conn_props)
@mock.patch.object(smbfs.WindowsSMBFSConnector, '_get_export_path')
def test_ensure_share_mounted(self, mock_get_export_path):
fake_conn_props = dict(options=mock.sentinel.mount_opts)
self._connector.ensure_share_mounted(fake_conn_props)
self._remotefs.mount.assert_called_once_with(
mock_get_export_path.return_value,
mock.sentinel.mount_opts)
-1
View File
@@ -14,5 +14,4 @@ oslo.service>=2.8.0 # Apache-2.0
oslo.utils>=7.3.0 # Apache-2.0
requests>=2.25.1 # Apache-2.0
tenacity>=6.3.1 # Apache-2.0
os-win>=5.7.0 # Apache-2.0
psutil>=5.7.2 # BSD
-1
View File
@@ -1,6 +1,5 @@
hacking>=7.0.0,<7.1.0 # Apache-2.0
flake8-import-order # LGPLv3
flake8-logging-format>=0.6.0 # Apache-2.0
coverage>=5.5 # Apache-2.0
ddt>=1.4.1 # MIT
oslotest>=4.5.0 # Apache-2.0