Refactor iSCSI disconnect

This patch refactors the iSCSI disconnect code, changing the approach to one
that uses only `iscsiadm -m session` and sysfs to get all the required
information: the devices belonging to the connection, the multipath system
device name, the multipath name, the WWN of the block devices, and so on.

By doing so, not only do we fix a good number of bugs, but we also
improve the reliability and speed of the mechanism.

The improvements and benefits achieved by this patch include:

- Common code for multipath and single path disconnects.

- No more querying iSCSI devices for their WWN (page 0x83), which removes
  delays and issues on flaky connections.

- All devices are properly cleaned up even if they are not part of the
  multipath.

- We wait for device removal and do it in parallel when there are
  multiple devices.

- Removed usage of `multipath -l` to find devices, which is really slow
  with flaky connections and doesn't work when called with a device from
  a path that is down.

- Prevent losing data when detaching: currently, if the multipath flush
  fails for any reason other than "in use", we silently continue with the
  removal.  That is the case when all paths are momentarily down.

- Adds a new mechanism for the caller of the disconnect to specify that
  it is acceptable to lose data and that it is more important to leave a
  clean system.  That is the case when we are creating a volume from an
  image, since the volume will just be set to error, but we don't want
  leftovers.  Optionally we can tell os-brick to ignore errors and not
  raise an exception if the flush fails (see the usage sketch after this
  list).

- Add a warning when we could be leaving leftovers behind due to
  disconnect issues.

- Action retries (like the multipath flush) now log only the final
  exception instead of logging all the exceptions.

- Flushes of individual paths now use exponential backoff retries
  instead of random sleeps between 0.2 and 2 seconds (from the oslo
  library).

- We no longer use symlinks from `/dev/disk/by-path`, `/dev/disk/by-id`,
  or `/dev/mapper` to find devices or multipaths, as they could be
  leftovers from previous runs.

- With high failure rates (above 30%) some CLI calls enter a state where
  they wait forever, so we add a timeout mechanism to our `execute`
  method and apply it to those specific calls.
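
A minimal sketch of how a caller could use the new parameters; the factory
call and the `connection_properties`/`device_info` values are illustrative
and assumed to come from the usual attach flow:

```python
from os_brick.initiator import connector

# Illustrative: obtain an iSCSI connector the usual way.
conn = connector.InitiatorConnector.factory('ISCSI', root_helper='sudo',
                                            use_multipath=True)

# Volume created from an image: leftovers matter more than the data, so
# force the disconnect and don't raise even if the flush ultimately fails.
conn.disconnect_volume(connection_properties, device_info,
                       force=True, ignore_errors=True)

# Normal detach of a volume with data: keep the defaults so a failed flush
# raises and no data is silently lost.
conn.disconnect_volume(connection_properties, device_info)
```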

Closes-Bug: #1502534
Change-Id: I058ff0a0e5ad517507dc3cda39087c913558561d
Author: Gorka Eguileor
Date:   2017-04-04 12:36:03 +02:00
Parent: 7fc3bf912a
Commit: 400ca5d6db

29 changed files with 955 additions and 650 deletions


@ -14,7 +14,9 @@
"""Exceptions for the Brick library."""
from oslo_concurrency import processutils as putils
import six
import traceback
from os_brick.i18n import _
from oslo_log import log as logging
@ -167,3 +169,70 @@ class HostChannelsTargetsNotFound(BrickException):
def __init__(self, message=None, iqns=None, found=None):
super(HostChannelsTargetsNotFound, self).__init__(message, iqns=iqns)
self.found = found
class ExceptionChainer(BrickException):
"""A Exception that can contain a group of exceptions.
This exception serves as a container for exceptions, useful when we want to
store all exceptions that happened during a series of steps and then raise
them all together as one.
The representation of the exception will include all exceptions and their
tracebacks.
This class also includes a context manager for convenience, one that will
support both swallowing the exception as if nothing had happened and
raising the exception. In both cases the exception will be stored.
If a message is provided to the context manager it will be formatted and
logged with warning level.
"""
def __init__(self, *args, **kwargs):
self._exceptions = []
self._repr = None
super(ExceptionChainer, self).__init__(*args, **kwargs)
def __repr__(self):
# Since generating the representation can be slow we cache it
if not self._repr:
tracebacks = (
''.join(traceback.format_exception(*e)).replace('\n', '\n\t')
for e in self._exceptions)
self._repr = '\n'.join('\nChained Exception #%s\n\t%s' % (i + 1, t)
for i, t in enumerate(tracebacks))
return self._repr
__str__ = __unicode__ = __repr__
def __nonzero__(self):
# We want to be able to do boolean checks on the exception
return bool(self._exceptions)
__bool__ = __nonzero__ # For Python 3
def add_exception(self, exc_type, exc_val, exc_tb):
# Clear the representation cache
self._repr = None
self._exceptions.append((exc_type, exc_val, exc_tb))
def context(self, catch_exception, msg='', *msg_args):
self._catch_exception = catch_exception
self._exc_msg = msg
self._exc_msg_args = msg_args
return self
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type:
self.add_exception(exc_type, exc_val, exc_tb)
if self._exc_msg:
LOG.warning(self._exc_msg, *self._exc_msg_args)
if self._catch_exception:
return True
class ExecutionTimeout(putils.ProcessExecutionError):
pass
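
A short sketch of the intended context-manager usage of `ExceptionChainer`
(the `cleanup_step` call is a placeholder for any operation that may raise):

```python
exc = ExceptionChainer()
for device in ('sda', 'sdb', 'sdc'):
    # catch_exception=True records the exception, logs the message as a
    # warning and swallows it so the loop can keep going.
    with exc.context(True, 'Cleanup of %s failed', device):
        cleanup_step(device)  # placeholder, may raise

if exc:        # True when at least one exception was chained
    raise exc  # its representation includes every chained traceback
```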


@ -25,7 +25,6 @@ import re
DEVICE_SCAN_ATTEMPTS_DEFAULT = 3
MULTIPATH_ERROR_REGEX = re.compile("\w{3} \d+ \d\d:\d\d:\d\d \|.*$")
MULTIPATH_DEV_CHECK_REGEX = re.compile("\s+dm-\d+\s+")
MULTIPATH_PATH_CHECK_REGEX = re.compile("\s+\d+:\d+:\d+:\d+\s+")
PLATFORM_ALL = 'ALL'


@ -43,7 +43,6 @@ synchronized = lockutils.synchronized_with_prefix('os-brick-')
DEVICE_SCAN_ATTEMPTS_DEFAULT = 3
MULTIPATH_ERROR_REGEX = re.compile("\w{3} \d+ \d\d:\d\d:\d\d \|.*$")
MULTIPATH_DEV_CHECK_REGEX = re.compile("\s+dm-\d+\s+")
MULTIPATH_PATH_CHECK_REGEX = re.compile("\s+\d+:\d+:\d+:\d+\s+")
PLATFORM_ALL = 'ALL'


@ -124,7 +124,8 @@ class AoEConnector(base.BaseLinuxConnector):
@utils.trace
@lockutils.synchronized('aoe_control', 'aoe-')
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Detach and flush the volume.
:param connection_properties: The dictionary that describes all


@ -113,7 +113,8 @@ class DISCOConnector(base.BaseLinuxConnector):
@utils.trace
@synchronized('connect_volume')
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Detach the volume from instance."""
disco_id = connection_properties['disco_id']
disco_dev = '/dev/dms%s' % (disco_id)


@ -90,7 +90,8 @@ class DRBDConnector(base.BaseLinuxConnector):
return device_info
@utils.trace
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Detach the volume."""
self._drbdadm_command("down", connection_properties,


@ -27,7 +27,8 @@ class FakeConnector(base.BaseLinuxConnector):
'path': self.fake_path}
return fake_device_info
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
pass
def get_volume_paths(self, connection_properties):


@ -243,7 +243,8 @@ class FibreChannelConnector(base.BaseLinuxConnector):
@utils.trace
@synchronized('connect_volume')
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Detach the volume from instance_name.
:param connection_properties: The dictionary that describes all


@ -142,7 +142,8 @@ class HGSTConnector(base.BaseLinuxConnector):
return device_info
@utils.trace
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Detach and flush the volume.
:param connection_properties: The dictionary that describes all


@ -117,7 +117,8 @@ class HuaweiStorHyperConnector(base.BaseLinuxConnector):
@utils.trace
@synchronized('connect_volume')
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Disconnect a volume from the local host.
:param connection_properties: The dictionary that describes all


@ -17,7 +17,6 @@ import collections
import copy
import glob
import os
import re
import time
from oslo_concurrency import lockutils
@ -106,27 +105,63 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
return volume_paths
def _get_iscsi_sessions(self):
def _get_iscsi_sessions_full(self):
"""Get iSCSI session information as a list of tuples.
Uses iscsiadm -m session and from a command output like
tcp: [1] 192.168.121.250:3260,1 iqn.2010-10.org.openstack:volume-
This method will drop the node type and return a list like this:
[('tcp:', '1', '192.168.121.250:3260', '1',
'iqn.2010-10.org.openstack:volume-')]
"""
out, err = self._run_iscsi_session()
iscsi_sessions = []
if err:
LOG.warning("Couldn't find iscsi sessions because "
"iscsiadm err: %s",
err)
else:
# parse the output from iscsiadm
# lines are in the format of
# tcp: [1] 192.168.121.250:3260,1 iqn.2010-10.org.openstack:volume-
lines = out.split('\n')
for line in lines:
if line:
entries = line.split()
portal = entries[2].split(',')
iscsi_sessions.append(portal[0])
"iscsiadm err: %s", err)
return []
return iscsi_sessions
# Parse and clean the output from iscsiadm, which is in the form of:
# transport_name: [session_id] ip_address:port,tpgt iqn node_type
lines = []
for line in out.splitlines():
if line:
info = line.split()
sid = info[1][1:-1]
portal, tpgt = info[2].split(',')
lines.append((info[0], sid, portal, tpgt, info[3]))
return lines
def _get_iscsi_nodes(self):
"""Get iSCSI node information (portal, iqn) as a list of tuples.
Uses iscsiadm -m node and from a command output like
192.168.121.250:3260,1 iqn.2010-10.org.openstack:volume
This method will drop the tpgt and return a list like this:
[('192.168.121.250:3260', 'iqn.2010-10.org.openstack:volume')]
"""
out, err = self._execute('iscsiadm', '-m', 'node', run_as_root=True,
root_helper=self._root_helper,
check_exit_code=False)
if err:
LOG.warning("Couldn't find iSCSI nodes because iscsiadm err: %s",
err)
return []
# Parse and clean the output from iscsiadm which is in the form of:
# ip_address:port,tpgt iqn
lines = []
for line in out.splitlines():
if line:
info = line.split()
lines.append((info[0].split(',')[0], info[1]))
return lines
def _get_iscsi_sessions(self):
"""Return portals for all existing sessions."""
# entry: [tcp, [1], 192.168.121.250:3260,1 ...]
return [entry[2] for entry in self._get_iscsi_sessions_full()]
def _get_potential_volume_paths(self, connection_properties,
connect_to_portal=True,
@ -471,85 +506,103 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
LOG.debug("connect_volume returning %s", device_info)
return device_info
def _get_connection_devices(self, connection_properties):
"""Get map of devices by sessions from our connection.
For each of the TCP sessions that correspond to our connection
properties we generate a map of (ip, iqn) to (belong, other) where
belong is a set of devices in that session that populated our system
when we did a connection using connection properties, and other are
any other devices that share that same session but are the result of
connecting with different connection properties.
We also include all nodes from our connection that don't have a
session.
"""
ips_iqns_luns = self._get_all_targets(connection_properties)
nodes = self._get_iscsi_nodes()
sessions = self._get_iscsi_sessions_full()
# Use (portal, iqn) to map the session value
sessions_map = {(s[2], s[4]): s[1] for s in sessions if s[0] == 'tcp:'}
# device_map will keep a tuple with devices from the connection and
# others that don't belong to this connection: (belong, others)
device_map = collections.defaultdict(lambda: (set(), set()))
for ip, iqn, lun in ips_iqns_luns:
session = sessions_map.get((ip, iqn))
# Our nodes that don't have a session will be returned as empty
if not session:
if (ip, iqn) in nodes:
device_map[(ip, iqn)] = (set(), set())
continue
# Get all devices for the session
paths = glob.glob('/sys/class/scsi_host/host*/device/session' +
session + '/target*/*:*:*:*/block/*')
belong, others = device_map[(ip, iqn)]
for path in paths:
__, hctl, __, device = path.rsplit('/', 3)
lun_path = int(hctl.rsplit(':', 1)[-1])
# For partitions turn them into the whole device: sde1 -> sde
device = device.strip('0123456789')
if lun_path == lun:
belong.add(device)
else:
others.add(device)
return device_map
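The shape of the returned map, with illustrative data (compare the unit test
added further down):

```python
import collections

device_map = collections.defaultdict(lambda: (set(), set()))
device_map[('ip1:port1', 'tgt1')] = ({'sda'}, set())    # only our devices
device_map[('ip2:port2', 'tgt2')] = ({'sdb'}, {'sdc'})  # session shared with
                                                        # another target's LUN
device_map[('ip3:port3', 'tgt3')] = (set(), set())      # node without session
# disconnect_volume removes every device in the first ("belong") sets and
# only logs out of connections whose second ("keep") set is empty.
```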
@utils.trace
@synchronized('connect_volume')
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Detach the volume from instance_name.
:param connection_properties: The dictionary that describes all
of the target volume attributes.
:type connection_properties: dict
:type connection_properties: dict that must include:
target_portal(s) - IP and optional port
target_iqn(s) - iSCSI Qualified Name
target_lun(s) - LUN id of the volume
:param device_info: historical difference, but same as connection_props
:type device_info: dict
connection_properties for iSCSI must include:
target_portal(s) - IP and optional port
target_iqn(s) - iSCSI Qualified Name
target_lun(s) - LUN id of the volume
:param force: Whether to forcefully disconnect even if flush fails.
:type force: bool
:param ignore_errors: When force is True, this will decide whether to
ignore errors or raise an exception once the
operation has finished. Default is False.
:type ignore_errors: bool
"""
if self.use_multipath:
host_device = multipath_device = None
host_devices = self._get_device_path(connection_properties)
# Choose an accessible host device
for dev in host_devices:
if os.path.exists(dev):
host_device = dev
device_wwn = self._linuxscsi.get_scsi_wwn(dev)
(multipath_device, multipath_id) = (super(
ISCSIConnector, self)._discover_mpath_device(
device_wwn, connection_properties, dev))
if multipath_device:
break
if not host_device:
LOG.error("No accessible volume device: %(host_devices)s",
{'host_devices': host_devices})
raise exception.VolumeDeviceNotFound(device=host_devices)
exc = exception.ExceptionChainer()
devices_map = self._get_connection_devices(connection_properties)
if multipath_device:
device_realpath = os.path.realpath(host_device)
self._linuxscsi.remove_multipath_device(device_realpath)
return self._disconnect_volume_multipath_iscsi(
connection_properties, multipath_device)
# Remove devices and multipath from this connection
remove_devices = set()
for remove, __ in devices_map.values():
remove_devices.update(remove)
multipath_name = self._linuxscsi.remove_connection(remove_devices,
self.use_multipath,
force, exc)
# When multiple portals/iqns/luns are specified, we need to remove
# unused devices created by logging into other LUNs' session.
for props in self._iterate_all_targets(connection_properties):
self._disconnect_volume_iscsi(props)
# Disconnect sessions and remove nodes that are left without devices
disconnect = [conn for conn, (__, keep) in devices_map.items()
if not keep]
self._disconnect_connection(connection_properties, disconnect, force,
exc)
def _disconnect_volume_iscsi(self, connection_properties):
# remove the device from the scsi subsystem
# this eliminates any stale entries until logout
host_devices = self._get_device_path(connection_properties)
# If flushing the multipath failed before, try now after we have
# removed the devices and we may have even logged off (only reaches
# here with multipath_name if force=True).
if multipath_name:
LOG.debug('Flushing again multipath %s now that we removed the '
'devices.', multipath_name)
self._linuxscsi.flush_multipath_device(multipath_name)
if host_devices:
host_device = host_devices[0]
else:
return
dev_name = self._linuxscsi.get_name_from_path(host_device)
if dev_name:
self._linuxscsi.remove_scsi_device(dev_name)
# NOTE(jdg): On busy systems we can have a race here
# where remove_iscsi_device is called before the device file
# has actually been removed. The result is an orphaned
# iscsi session that never gets logged out. The following
# call to wait addresses that issue.
self._linuxscsi.wait_for_volume_removal(host_device)
# NOTE(vish): Only disconnect from the target if no luns from the
# target are in use.
device_byname = ("ip-%(portal)s-iscsi-%(iqn)s-lun-" %
{'portal': connection_properties['target_portal'],
'iqn': connection_properties['target_iqn']})
devices = self.driver.get_all_block_devices()
devices = [dev for dev in devices if (device_byname in dev
and
dev.startswith(
'/dev/disk/by-path/'))
and os.path.exists(dev)]
if not devices:
self._disconnect_from_iscsi_portal(connection_properties)
if exc:
LOG.warning('There were errors removing %s, leftovers may remain '
'in the system', remove_devices)
if not ignore_errors:
raise exc
def _munge_portal(self, target):
"""Remove brackets from portal.
@ -635,65 +688,6 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
iqns.append(data[1])
return ips, iqns
def _disconnect_volume_multipath_iscsi(self, connection_properties,
multipath_name):
"""This removes a multipath device and it's LUNs."""
LOG.debug("Disconnect multipath device %s", multipath_name)
mpath_map = self._get_multipath_device_map()
block_devices = self.driver.get_all_block_devices()
devices = []
for dev in block_devices:
if os.path.exists(dev):
if "/mapper/" in dev:
devices.append(dev)
else:
mpdev = mpath_map.get(dev)
if mpdev:
devices.append(mpdev)
# Do a discovery to find all targets.
# Targets for multiple paths for the same multipath device
# may not be the same.
all_ips_iqns_luns = self._discover_iscsi_portals(connection_properties)
# As discovery result may contain other targets' iqns, extract targets
# to be disconnected whose block devices are already deleted here.
ips_iqns = []
entries = [device.lstrip('ip-').split('-lun-')[0]
for device in self._get_iscsi_devices()]
for ip, iqn, lun in all_ips_iqns_luns:
ip_iqn = "%s-iscsi-%s" % (ip.split(",")[0], iqn)
if ip_iqn not in entries:
ips_iqns.append([ip, iqn])
if not devices:
# disconnect if no other multipath devices
self._disconnect_mpath(connection_properties, ips_iqns)
return
# Get a target for all other multipath devices
other_iqns = self._get_multipath_iqns(devices, mpath_map)
# Get all the targets for the current multipath device
current_iqns = [iqn for ip, iqn in ips_iqns]
in_use = False
for current in current_iqns:
if current in other_iqns:
in_use = True
break
# If no other multipath device attached has the same iqn
# as the current device
if not in_use:
# disconnect if no other multipath devices with same iqn
self._disconnect_mpath(connection_properties, ips_iqns)
return
# else do not disconnect iscsi portals,
# as they are used for other luns
return
def _connect_to_iscsi_portal(self, connection_properties):
# NOTE(vish): If we are on the same host as nova volume, the
# discovery makes the target so we don't need to
@ -774,54 +768,15 @@ class ISCSIConnector(base.BaseLinuxConnector, base_iscsi.BaseISCSIConnector):
attempts=5,
delay_on_retry=True)
def _get_iscsi_devices(self):
try:
devices = list(os.walk('/dev/disk/by-path'))[0][-1]
except IndexError:
return []
# For iSCSI HBAs, look at an offset of len('pci-0000:00:00.0')
return [entry for entry in devices if (entry.startswith("ip-")
or (entry.startswith("pci-")
and
entry.find("ip-", 16, 21)
>= 16))]
def _disconnect_mpath(self, connection_properties, ips_iqns):
for ip, iqn in ips_iqns:
props = copy.deepcopy(connection_properties)
def _disconnect_connection(self, connection_properties, connections, force,
exc):
LOG.debug('Disconnecting from: %s', connections)
props = connection_properties.copy()
for ip, iqn in connections:
props['target_portal'] = ip
props['target_iqn'] = iqn
self._disconnect_from_iscsi_portal(props)
def _get_multipath_iqns(self, multipath_devices, mpath_map):
entries = self._get_iscsi_devices()
iqns = []
for entry in entries:
entry_real_path = os.path.realpath("/dev/disk/by-path/%s" % entry)
entry_multipath = mpath_map.get(entry_real_path)
if entry_multipath and entry_multipath in multipath_devices:
iqns.append(entry.split("iscsi-")[1].split("-lun")[0])
return iqns
def _get_multipath_device_map(self):
out = self._run_multipath(['-ll'], check_exit_code=[0, 1])[0]
mpath_line = [line for line in out.splitlines()
if not re.match(initiator.MULTIPATH_ERROR_REGEX, line)]
mpath_dev = None
mpath_map = {}
for line in out.splitlines():
m = initiator.MULTIPATH_DEV_CHECK_REGEX.split(line)
if len(m) >= 2:
mpath_dev = '/dev/mapper/' + m[0].split(" ")[0]
continue
m = initiator.MULTIPATH_PATH_CHECK_REGEX.split(line)
if len(m) >= 2:
mpath_map['/dev/' + m[1].split(" ")[0]] = mpath_dev
if mpath_line and not mpath_map:
LOG.warning("Failed to parse the output of multipath -ll. "
"stdout: %s", out)
return mpath_map
with exc.context(force, 'Disconnect from %s %s failed', ip, iqn):
self._disconnect_from_iscsi_portal(props)
def _run_iscsi_session(self):
(out, err) = self._run_iscsiadm_bare(('-m', 'session'),


@ -62,7 +62,8 @@ class LocalConnector(base.BaseLinuxConnector):
return device_info
@utils.trace
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Disconnect a volume from the local host.
:param connection_properties: The dictionary that describes all


@ -192,7 +192,8 @@ class RBDConnector(base.BaseLinuxConnector):
return {'path': rbd_handle}
@utils.trace
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Disconnect a volume.
:param connection_properties: The dictionary that describes all


@ -105,7 +105,8 @@ class RemoteFsConnector(base.BaseLinuxConnector):
return {'path': path}
@utils.trace
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""No need to do anything to disconnect a volume in a filesystem.
:param connection_properties: The dictionary that describes all


@ -410,7 +410,8 @@ class ScaleIOConnector(base.BaseLinuxConnector):
@utils.trace
@lockutils.synchronized('scaleio', 'scaleio-')
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Disconnect the ScaleIO volume.
:param connection_properties: The dictionary that describes all


@ -83,7 +83,8 @@ class SheepdogConnector(base.BaseLinuxConnector):
return {'path': sheepdog_handle}
@utils.trace
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Disconnect a volume.
:param connection_properties: The dictionary that describes all


@ -237,7 +237,8 @@ class VmdkConnector(initiator_connector.InitiatorConnector):
datacenter=dc_ref)
session.wait_for_task(task)
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
tmp_file_path = device_info['path']
if not os.path.exists(tmp_file_path):
msg = _("Vmdk: %s not found.") % tmp_file_path


@ -127,7 +127,8 @@ class HyperScaleConnector(base.BaseLinuxConnector):
@utils.trace
@synchronized('connect_volume')
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Disconnect a volume from an instance."""
volume_name = None


@ -112,7 +112,8 @@ class InitiatorConnector(executor.Executor):
pass
@abc.abstractmethod
def disconnect_volume(self, connection_properties, device_info):
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
"""Disconnect a volume from the local host.
The connection_properties are the same as from connect_volume.
@ -123,6 +124,12 @@ class InitiatorConnector(executor.Executor):
:type connection_properties: dict
:param device_info: historical difference, but same as connection_props
:type device_info: dict
:param force: Whether to forcefully disconnect even if flush fails.
:type force: bool
:param ignore_errors: When force is True, this will decide whether to
ignore errors or raise an exception once the
operation has finished. Default is False.
:type ignore_errors: bool
"""
pass


@ -16,6 +16,7 @@
Note, this is not iSCSI.
"""
import glob
import os
import re
import six
@ -56,30 +57,32 @@ class LinuxSCSI(executor.Executor):
else:
return None
def remove_scsi_device(self, device):
def remove_scsi_device(self, device, force=False, exc=None):
"""Removes a scsi device based upon /dev/sdX name."""
path = "/sys/block/%s/device/delete" % device.replace("/dev/", "")
if os.path.exists(path):
exc = exception.ExceptionChainer() if exc is None else exc
# flush any outstanding IO first
self.flush_device_io(device)
with exc.context(force, 'Flushing %s failed', device):
self.flush_device_io(device)
LOG.debug("Remove SCSI device %(device)s with %(path)s",
{'device': device, 'path': path})
self.echo_scsi_command(path, "1")
with exc.context(force, 'Removing %s failed', device):
self.echo_scsi_command(path, "1")
@utils.retry(exceptions=exception.VolumePathNotRemoved, retries=3,
backoff_rate=2)
def wait_for_volume_removal(self, volume_path):
"""This is used to ensure that volumes are gone."""
LOG.debug("Checking to see if SCSI volume %s has been removed.",
volume_path)
if os.path.exists(volume_path):
LOG.debug("%(path)s still exists.", {'path': volume_path})
raise exception.VolumePathNotRemoved(
volume_path=volume_path)
else:
LOG.debug("SCSI volume %s has been removed.", volume_path)
@utils.retry(exceptions=exception.VolumePathNotRemoved)
def wait_for_volumes_removal(self, volumes_names):
"""Wait for device paths to be removed from the system."""
str_names = ', '.join(volumes_names)
LOG.debug('Checking to see if SCSI volumes %s have been removed.',
str_names)
exist = [volume_name for volume_name in volumes_names
if os.path.exists('/dev/' + volume_name)]
if exist:
LOG.debug('%s still exist.', ', '.join(exist))
raise exception.VolumePathNotRemoved(volume_path=exist)
LOG.debug("SCSI volumes %s have been removed.", str_names)
def get_device_info(self, device):
(out, _err) = self._execute('sg_scan', device, run_as_root=True,
@ -125,52 +128,92 @@ class LinuxSCSI(executor.Executor):
return True
def remove_multipath_device(self, device):
"""Removes related LUNs and multipath device
def get_dm_name(self, dm):
"""Get the Device map name given the device name of the dm on sysfs.
This removes LUNs associated with a multipath device
and the multipath device itself.
:param dm: Device map name as seen in sysfs. ie: 'dm-0'
:returns: String with the name, or empty string if not available.
ie: '36e843b658476b7ed5bc1d4d10d9b1fde'
"""
try:
with open('/sys/block/' + dm + '/dm/name') as f:
return f.read().strip()
except IOError:
return ''
LOG.debug("remove multipath device %s", device)
mpath_dev = self.find_multipath_device(device)
if mpath_dev:
self.flush_multipath_device(mpath_dev['name'])
devices = mpath_dev['devices']
LOG.debug("multipath LUNs to remove %s", devices)
for device in devices:
self.remove_scsi_device(device['device'])
def find_sysfs_multipath_dm(self, device_names):
"""Find the dm device name given a list of device names
:param device_names: Iterable with device names, not paths. ie: ['sda']
:returns: String with the dm name or None if not found. ie: 'dm-0'
"""
glob_str = '/sys/block/%s/holders/dm-*'
for dev_name in device_names:
dms = glob.glob(glob_str % dev_name)
if dms:
__, device_name, __, dm = dms[0].rsplit('/', 3)
return dm
return None
def remove_connection(self, devices_names, is_multipath, force=False,
exc=None):
"""Remove LUNs and multipath associated with devices names.
:param devices_names: Iterable with real device names ('sda', 'sdb')
:param is_multipath: Whether this is a multipath connection or not
:param force: Whether to forcefully disconnect even if flush fails.
:param exc: ExceptionChainer where to add exceptions if forcing
:returns: Multipath device map name if found and not flushed
"""
if not devices_names:
return
multipath_name = None
exc = exception.ExceptionChainer() if exc is None else exc
LOG.debug('Removing %(type)s devices %(devices)s',
{'type': 'multipathed' if is_multipath else 'single pathed',
'devices': ', '.join(devices_names)})
if is_multipath:
multipath_dm = self.find_sysfs_multipath_dm(devices_names)
multipath_name = multipath_dm and self.get_dm_name(multipath_dm)
if multipath_name:
with exc.context(force, 'Flushing %s failed', multipath_name):
self.flush_multipath_device(multipath_name)
multipath_name = None
for device_name in devices_names:
self.remove_scsi_device('/dev/' + device_name, force, exc)
# Wait until the symlinks are removed
with exc.context(force, 'Some devices remain from %s', devices_names):
self.wait_for_volumes_removal(devices_names)
return multipath_name
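For illustration, assume `sda` and `sdb` are paths of a multipath whose
device-mapper device is `dm-0`; the instantiation and values below are made
up:

```python
# Hypothetical sysfs layout backing the lookups above:
#   /sys/block/sda/holders/dm-0
#   /sys/block/sdb/holders/dm-0
#   /sys/block/dm-0/dm/name -> '36e843b658476b7ed5bc1d4d10d9b1fde'
lscsi = LinuxSCSI(root_helper='sudo')

lscsi.find_sysfs_multipath_dm(['sda', 'sdb'])  # -> 'dm-0'
lscsi.get_dm_name('dm-0')                      # -> '36e843b65847...'

# Flush the multipath, remove sda/sdb from the SCSI subsystem and wait for
# their /dev entries to disappear; the map name is only returned when the
# flush failed and force=True swallowed the error.
leftover_map = lscsi.remove_connection({'sda', 'sdb'}, is_multipath=True,
                                       force=True)
```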
def flush_device_io(self, device):
"""This is used to flush any remaining IO in the buffers."""
try:
LOG.debug("Flushing IO for device %s", device)
self._execute('blockdev', '--flushbufs', device, run_as_root=True,
root_helper=self._root_helper)
except putils.ProcessExecutionError as exc:
LOG.warning("Failed to flush IO buffers prior to removing "
"device: %(code)s", {'code': exc.exit_code})
@utils.retry(exceptions=putils.ProcessExecutionError)
def flush_multipath_device(self, device):
try:
LOG.debug("Flush multipath device %s", device)
self._execute('multipath', '-f', device, run_as_root=True,
root_helper=self._root_helper)
except putils.ProcessExecutionError as exc:
if exc.exit_code == 1 and 'map in use' in exc.stdout:
LOG.debug('Multipath is in use, cannot be flushed yet.')
if os.path.exists(device):
try:
# NOTE(geguileo): With 30% connection error rates flush can get
# stuck, set timeout to prevent it from hanging here forever.
# Retry twice after 20 and 40 seconds.
LOG.debug("Flushing IO for device %s", device)
self._execute('blockdev', '--flushbufs', device,
run_as_root=True, attempts=3, timeout=300,
interval=10, root_helper=self._root_helper)
except putils.ProcessExecutionError as exc:
LOG.warning("Failed to flush IO buffers prior to removing "
"device: %(code)s", {'code': exc.exit_code})
raise
LOG.warning("multipath call failed exit %(code)s",
{'code': exc.exit_code})
def flush_multipath_devices(self):
try:
self._execute('multipath', '-F', run_as_root=True,
root_helper=self._root_helper)
except putils.ProcessExecutionError as exc:
LOG.warning("multipath call failed exit %(code)s",
{'code': exc.exit_code})
def flush_multipath_device(self, device_map_name):
LOG.debug("Flush multipath device %s", device_map_name)
# NOTE(geguileo): With 30% connection error rates flush can get stuck,
# set timeout to prevent it from hanging here forever. Retry twice
# after 20 and 40 seconds.
self._execute('multipath', '-f', device_map_name, run_as_root=True,
attempts=3, timeout=300, interval=10,
root_helper=self._root_helper)
@utils.retry(exceptions=exception.VolumeDeviceNotFound)
def wait_for_path(self, volume_path):


@ -126,5 +126,6 @@ class WindowsFCConnector(win_conn_base.BaseWindowsConnector):
return mappings
@utils.trace
def disconnect_volume(self, connection_properties):
def disconnect_volume(self, connection_properties,
force=False, ignore_errors=False):
pass


@ -134,7 +134,8 @@ class WindowsISCSIConnector(win_conn_base.BaseWindowsConnector,
return device_info
@utils.trace
def disconnect_volume(self, connection_properties):
def disconnect_volume(self, connection_properties,
force=False, ignore_errors=False):
# We want to refresh the cached information first.
self._diskutils.rescan_disks()
for (target_portal,


@ -49,7 +49,8 @@ class WindowsSMBFSConnector(win_conn_base.BaseWindowsConnector):
return device_info
@utils.trace
def disconnect_volume(self, connection_properties):
def disconnect_volume(self, connection_properties,
force=False, ignore_errors=False):
export_path = self._get_export_path(connection_properties)
self._remotefsclient.unmount(export_path)


@ -36,14 +36,126 @@ the urgency of (1)), then work on the larger refactor that addresses
"""
import signal
import six
import threading
import time
from oslo_concurrency import processutils as putils
from oslo_log import log as logging
from oslo_utils import strutils
from os_brick import exception
from os_brick import privileged
LOG = logging.getLogger(__name__)
def custom_execute(*cmd, **kwargs):
"""Custom execute with additional functionality on top of Oslo's.
Additional features are timeouts and exponential backoff retries.
The exponential backoff retries replace the standard Oslo random sleep times,
which range from 200ms to 2 seconds when attempts is greater than 1; the
backoff is disabled if delay_on_retry is passed as a parameter.
Exponential backoff is controlled via interval and backoff_rate parameters,
just like the os_brick.utils.retry decorator.
To use the timeout mechanism to stop the subprocess with a specific signal
after a number of seconds we must pass a non-zero timeout value in the
call.
When using multiple attempts and a timeout at the same time, the method will
only raise the timeout exception to the caller if the last try times out.
Timeout mechanism is controlled with timeout, signal, and raise_timeout
parameters.
:param interval: The multiplier
:param backoff_rate: Base used for the exponential backoff
:param timeout: Timeout defined in seconds
:param signal: Signal to use to stop the process on timeout
:param raise_timeout: Raise an exception on timeout or return the error as
stderr. Defaults to raising if check_exit_code is
not False.
:returns: Tuple with stdout and stderr
"""
# Since python 2 doesn't have nonlocal we use a mutable variable to store
# the previous attempt number, the timeout handler, and the process that
# timed out
shared_data = [0, None, None]
def on_timeout(proc):
sanitized_cmd = strutils.mask_password(' '.join(cmd))
LOG.warning('Stopping %(cmd)s with signal %(signal)s after %(time)ss.',
{'signal': sig_end, 'cmd': sanitized_cmd, 'time': timeout})
shared_data[2] = proc
proc.send_signal(sig_end)
def on_execute(proc):
# Call user's on_execute method
if on_execute_call:
on_execute_call(proc)
# Sleep if this is not the first try and we have a timeout interval
if shared_data[0] and interval:
exp = backoff_rate ** shared_data[0]
wait_for = max(0, interval * exp)
LOG.debug('Sleeping for %s seconds', wait_for)
time.sleep(wait_for)
# Increase the number of tries and start the timeout timer
shared_data[0] += 1
if timeout:
shared_data[2] = None
shared_data[1] = threading.Timer(timeout, on_timeout, (proc,))
shared_data[1].start()
def on_completion(proc):
# This is always called regardless of success or failure
# Cancel the timeout timer
if shared_data[1]:
shared_data[1].cancel()
# Call user's on_completion method
if on_completion_call:
on_completion_call(proc)
# We will be doing the wait ourselves in on_execute
if 'delay_on_retry' in kwargs:
interval = None
else:
kwargs['delay_on_retry'] = False
interval = kwargs.pop('interval', 1)
backoff_rate = kwargs.pop('backoff_rate', 2)
timeout = kwargs.pop('timeout', None)
sig_end = kwargs.pop('signal', signal.SIGTERM)
default_raise_timeout = kwargs.get('check_exit_code', True)
raise_timeout = kwargs.pop('raise_timeout', default_raise_timeout)
on_execute_call = kwargs.pop('on_execute', None)
on_completion_call = kwargs.pop('on_completion', None)
try:
return putils.execute(on_execute=on_execute,
on_completion=on_completion, *cmd, **kwargs)
except putils.ProcessExecutionError:
# proc is only stored if a timeout happened
proc = shared_data[2]
if proc:
sanitized_cmd = strutils.mask_password(' '.join(cmd))
msg = ('Time out on proc %(pid)s after waiting %(time)s seconds '
'when running %(cmd)s' %
{'pid': proc.pid, 'time': timeout, 'cmd': sanitized_cmd})
LOG.debug(msg)
if raise_timeout:
raise exception.ExecutionTimeout(stdout='', stderr=msg,
cmd=sanitized_cmd)
return '', msg
raise
# Entrypoint used for rootwrap.py transition code. Don't use this for
# other purposes, since it will be removed when we think the
# transition is finished.
@ -51,12 +163,11 @@ def execute(*cmd, **kwargs):
"""NB: Raises processutils.ProcessExecutionError on failure."""
run_as_root = kwargs.pop('run_as_root', False)
kwargs.pop('root_helper', None)
try:
if run_as_root:
return execute_root(*cmd, **kwargs)
else:
return putils.execute(*cmd, **kwargs)
return custom_execute(*cmd, **kwargs)
except OSError as e:
# Note:
# putils.execute('bogus', run_as_root=True)
@ -79,4 +190,4 @@ def execute(*cmd, **kwargs):
@privileged.default.entrypoint
def execute_root(*cmd, **kwargs):
"""NB: Raises processutils.ProcessExecutionError/OSError on failure."""
return putils.execute(*cmd, shell=False, run_as_root=False, **kwargs)
return custom_execute(*cmd, shell=False, run_as_root=False, **kwargs)
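
A minimal sketch of a call that combines the new options (the command and
values are only examples):

```python
import signal

# Try `multipath -f` up to 3 times, sleeping 20s and then 40s between tries
# (interval * backoff_rate ** attempt_number), and send SIGTERM to any
# attempt that runs for more than 300 seconds.
out, err = custom_execute('multipath', '-f', 'mpatha',
                          attempts=3, interval=10, backoff_rate=2,
                          timeout=300, signal=signal.SIGTERM)
```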


@ -11,12 +11,14 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import collections
import glob
import mock
import os
import testtools
import time
import ddt
from oslo_concurrency import processutils as putils
from os_brick import exception
@ -28,18 +30,117 @@ from os_brick.privileged import rootwrap as priv_rootwrap
from os_brick.tests.initiator import test_connector
@ddt.ddt
class ISCSIConnectorTestCase(test_connector.ConnectorTestCase):
CON_PROPS = {
'volume_id': 'vol_id',
'target_portal': 'ip1:port1',
'target_iqn': 'tgt1',
'target_lun': 4,
'target_portals': ['ip1:port1', 'ip2:port2', 'ip3:port3',
'ip4:port4'],
'target_iqns': ['tgt1', 'tgt2', 'tgt3', 'tgt4'],
'target_luns': [4, 5, 6, 7],
}
def setUp(self):
super(ISCSIConnectorTestCase, self).setUp()
self.connector = iscsi.ISCSIConnector(
None, execute=self.fake_execute, use_multipath=False)
self.connector_with_multipath = iscsi.ISCSIConnector(
None, execute=self.fake_execute, use_multipath=True)
self.mock_object(self.connector._linuxscsi, 'get_name_from_path',
return_value="/dev/sdb")
self._fake_iqn = 'iqn.1234-56.foo.bar:01:23456789abc'
self._name = 'volume-00000001'
self._iqn = 'iqn.2010-10.org.openstack:%s' % self._name
self._location = '10.0.2.15:3260'
self._lun = 1
@mock.patch.object(iscsi.ISCSIConnector, '_run_iscsi_session')
def test_get_iscsi_sessions_full(self, sessions_mock):
iscsiadm_result = ('tcp: [session1] ip1:port1,1 tgt1 (non-flash)\n'
'tcp: [session2] ip2:port2,-1 tgt2 (non-flash)\n'
'tcp: [session3] ip3:port3,1 tgt3\n')
sessions_mock.return_value = (iscsiadm_result, '')
res = self.connector._get_iscsi_sessions_full()
expected = [('tcp:', 'session1', 'ip1:port1', '1', 'tgt1'),
('tcp:', 'session2', 'ip2:port2', '-1', 'tgt2'),
('tcp:', 'session3', 'ip3:port3', '1', 'tgt3')]
self.assertListEqual(expected, res)
@mock.patch.object(iscsi.ISCSIConnector, '_run_iscsi_session',
return_value=(None, 'error'))
def test_get_iscsi_sessions_full_error(self, sessions_mock):
res = self.connector._get_iscsi_sessions_full()
self.assertEqual([], res)
sessions_mock.assert_called()
@mock.patch.object(iscsi.ISCSIConnector, '_get_iscsi_sessions_full')
def test_get_iscsi_sessions(self, sessions_mock):
sessions_mock.return_value = [
('tcp:', 'session1', 'ip1:port1', '1', 'tgt1'),
('tcp:', 'session2', 'ip2:port2', '-1', 'tgt2'),
('tcp:', 'session3', 'ip3:port3', '1', 'tgt3')]
res = self.connector._get_iscsi_sessions()
expected = ['ip1:port1', 'ip2:port2', 'ip3:port3']
self.assertListEqual(expected, res)
@mock.patch.object(iscsi.ISCSIConnector, '_get_iscsi_sessions_full',
return_value=[])
def test_get_iscsi_sessions_no_sessions(self, sessions_mock):
res = self.connector._get_iscsi_sessions()
self.assertListEqual([], res)
sessions_mock.assert_called()
@mock.patch.object(iscsi.ISCSIConnector, '_execute')
def test_get_iscsi_nodes(self, exec_mock):
iscsiadm_result = ('ip1:port1,1 tgt1\nip2:port2,-1 tgt2\n'
'ip3:port3,1 tgt3\n')
exec_mock.return_value = (iscsiadm_result, '')
res = self.connector._get_iscsi_nodes()
expected = [('ip1:port1', 'tgt1'), ('ip2:port2', 'tgt2'),
('ip3:port3', 'tgt3')]
self.assertListEqual(expected, res)
exec_mock.assert_called_once_with(
'iscsiadm', '-m', 'node', run_as_root=True,
root_helper=self.connector._root_helper, check_exit_code=False)
@mock.patch.object(iscsi.ISCSIConnector, '_execute')
def test_get_iscsi_nodes_error(self, exec_mock):
exec_mock.return_value = (None, 'error')
res = self.connector._get_iscsi_nodes()
self.assertEqual([], res)
@mock.patch('glob.glob')
@mock.patch.object(iscsi.ISCSIConnector, '_get_iscsi_sessions_full')
@mock.patch.object(iscsi.ISCSIConnector, '_get_iscsi_nodes')
def test_get_connection_devices(self, nodes_mock, sessions_mock,
glob_mock):
# List sessions from other targets and non tcp sessions
sessions_mock.return_value = [
('non-tcp:', '0', 'ip1:port1', '1', 'tgt1'),
('tcp:', '1', 'ip1:port1', '1', 'tgt1'),
('tcp:', '2', 'ip2:port2', '-1', 'tgt2'),
('tcp:', '3', 'ip1:port1', '1', 'tgt4'),
('tcp:', '4', 'ip2:port2', '-1', 'tgt5')]
# List 1 node without sessions
nodes_mock.return_value = [('ip1:port1', 'tgt1'),
('ip2:port2', 'tgt2'),
('ip3:port3', 'tgt3')]
sys_cls = '/sys/class/scsi_host/host'
glob_mock.side_effect = [
[sys_cls + '1/device/session1/target6/1:2:6:4/block/sda',
sys_cls + '1/device/session1/target6/1:2:6:4/block/sda1'],
[sys_cls + '2/device/session2/target7/2:2:7:5/block/sdb',
sys_cls + '2/device/session2/target7/2:2:7:4/block/sdc'],
]
res = self.connector._get_connection_devices(self.CON_PROPS)
expected = {('ip1:port1', 'tgt1'): ({'sda'}, set()),
('ip2:port2', 'tgt2'): ({'sdb'}, {'sdc'}),
('ip3:port3', 'tgt3'): (set(), set())}
self.assertDictEqual(expected, res)
def generate_device(self, location, iqn, transport=None, lun=1):
dev_format = "ip-%s-iscsi-%s-lun-%s" % (location, iqn, lun)
@ -236,70 +337,45 @@ class ISCSIConnectorTestCase(test_connector.ConnectorTestCase):
self.assertEqual(expected_result, result)
@mock.patch('time.sleep', mock.Mock())
@mock.patch.object(iscsi.ISCSIConnector, 'disconnect_volume')
def _test_connect_volume(self, extra_props, additional_commands,
transport=None, disconnect_mock=None):
disconnect_vol_mock, transport=None):
# for making sure the /dev/disk/by-path is gone
exists_mock = mock.Mock()
exists_mock.return_value = True
os.path.exists = exists_mock
location = '10.0.2.15:3260'
name = 'volume-00000001'
iqn = 'iqn.2010-10.org.openstack:%s' % name
vol = {'id': 1, 'name': name}
connection_info = self.iscsi_connection(vol, location, iqn)
vol = {'id': 1, 'name': self._name}
connection_info = self.iscsi_connection(vol, self._location, self._iqn)
for key, value in extra_props.items():
connection_info['data'][key] = value
if transport is not None:
dev_list = self.generate_device(location, iqn, transport)
dev_list = self.generate_device(self._location, self._iqn,
transport)
with mock.patch.object(glob, 'glob', return_value=[dev_list]):
device = self.connector.connect_volume(connection_info['data'])
else:
device = self.connector.connect_volume(connection_info['data'])
dev_str = self.generate_device(location, iqn, transport)
dev_str = self.generate_device(self._location, self._iqn, transport)
self.assertEqual(device['type'], 'block')
self.assertEqual(device['path'], dev_str)
self.count = 0
def mock_exists_effect(*args, **kwargs):
self.count = self.count + 1
if self.count == 4:
return False
else:
return True
# Disconnect has its own tests, should not be tested here
expected_commands = [
('iscsiadm -m node -T %s -p %s' % (self._iqn, self._location)),
('iscsiadm -m session'),
('iscsiadm -m node -T %s -p %s --login' % (self._iqn,
self._location)),
('iscsiadm -m node -T %s -p %s --op update'
' -n node.startup -v automatic' % (self._iqn,
self._location)),
('/lib/udev/scsi_id --page 0x83 --whitelisted %s' % dev_str),