Merge "mypy: Update format to future __annotations__"

This commit is contained in:
Zuul 2022-08-23 19:15:18 +00:00 committed by Gerrit Code Review
commit 4acfd6bc7e
8 changed files with 55 additions and 44 deletions

View File

@ -14,8 +14,10 @@
"""Exceptions for the Brick library.""" """Exceptions for the Brick library."""
from __future__ import annotations
import traceback import traceback
from typing import Any, List, Optional # noqa: H301 from typing import Any, Optional # noqa: H301
from oslo_concurrency import processutils as putils from oslo_concurrency import processutils as putils
from oslo_log import log as logging from oslo_log import log as logging
@ -184,7 +186,7 @@ class ExceptionChainer(BrickException):
logged with warning level. logged with warning level.
""" """
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._exceptions: List[tuple] = [] self._exceptions: list[tuple] = []
self._repr: Optional[str] = None self._repr: Optional[str] = None
self._exc_msg_args = [] self._exc_msg_args = []
super(ExceptionChainer, self).__init__(*args, **kwargs) super(ExceptionChainer, self).__init__(*args, **kwargs)

View File

@ -18,8 +18,10 @@
and root_helper settings, so this provides that hook. and root_helper settings, so this provides that hook.
""" """
from __future__ import annotations
import threading import threading
from typing import Callable, Tuple # noqa: H301 from typing import Callable
from oslo_concurrency import processutils as putils from oslo_concurrency import processutils as putils
from oslo_context import context as context_utils from oslo_context import context as context_utils
@ -48,7 +50,7 @@ class Executor(object):
if value: if value:
setattr(exc, field, cls.safe_decode(value)) setattr(exc, field, cls.safe_decode(value))
def _execute(self, *args, **kwargs) -> Tuple[str, str]: def _execute(self, *args, **kwargs) -> tuple[str, str]:
try: try:
result = self.__execute(*args, **kwargs) result = self.__execute(*args, **kwargs)
if result: if result:

View File

@ -12,12 +12,14 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from __future__ import annotations
import functools import functools
import glob import glob
import os import os
import typing import typing
from typing import Optional, Tuple # noqa: H301 from typing import Optional
from oslo_concurrency import lockutils from oslo_concurrency import lockutils
from oslo_concurrency import processutils as putils from oslo_concurrency import processutils as putils
@ -155,7 +157,7 @@ class BaseLinuxConnector(initiator_connector.InitiatorConnector):
def _discover_mpath_device(self, def _discover_mpath_device(self,
device_wwn: str, device_wwn: str,
connection_properties: dict, connection_properties: dict,
device_name: str) -> Tuple[str, str]: device_name: str) -> tuple[str, str]:
"""This method discovers a multipath device. """This method discovers a multipath device.
Discover a multipath device based on a defined connection_property Discover a multipath device based on a defined connection_property

View File

@ -15,7 +15,7 @@
from __future__ import annotations from __future__ import annotations
import copy import copy
from typing import Any, Dict, Generator # noqa: H301 from typing import Any, Generator # noqa: H301
from os_brick.initiator import initiator_connector from os_brick.initiator import initiator_connector
@ -23,7 +23,7 @@ from os_brick.initiator import initiator_connector
class BaseISCSIConnector(initiator_connector.InitiatorConnector): class BaseISCSIConnector(initiator_connector.InitiatorConnector):
def _iterate_all_targets( def _iterate_all_targets(
self, self,
connection_properties: dict) -> Generator[Dict[str, Any], connection_properties: dict) -> Generator[dict[str, Any],
None, None]: None, None]:
for portal, iqn, lun in self._get_all_targets(connection_properties): for portal, iqn, lun in self._get_all_targets(connection_properties):
props = copy.deepcopy(connection_properties) props = copy.deepcopy(connection_properties)

View File

@ -13,14 +13,15 @@
# Look in the NVMeOFConnProps class docstring to see the format of the NVMe-oF # Look in the NVMeOFConnProps class docstring to see the format of the NVMe-oF
# connection properties # connection properties
from __future__ import annotations
import errno import errno
import functools import functools
import glob import glob
import json import json
import os.path import os.path
import time import time
from typing import (Callable, Dict, List, Optional, Sequence, # noqa: H301 from typing import (Callable, Optional, Sequence, Type, Union) # noqa: H301
Tuple, Type, Union) # noqa: H301
import uuid as uuid_lib import uuid as uuid_lib
@ -206,7 +207,7 @@ class Portal(object):
'"devices_on_start"') '"devices_on_start"')
return target.get_device_path_by_initial_devices() return target.get_device_path_by_initial_devices()
def get_all_namespaces_ctrl_paths(self) -> List[str]: def get_all_namespaces_ctrl_paths(self) -> list[str]:
"""Return all nvme sysfs control paths for this portal. """Return all nvme sysfs control paths for this portal.
The basename of the path can be single volume or a channel to an ANA The basename of the path can be single volume or a channel to an ANA
@ -295,7 +296,7 @@ class Target(object):
def factory(cls: Type['Target'], def factory(cls: Type['Target'],
source_conn_props: 'NVMeOFConnProps', source_conn_props: 'NVMeOFConnProps',
target_nqn: str, target_nqn: str,
portals: List[str], portals: list[str],
vol_uuid: Optional[str] = None, vol_uuid: Optional[str] = None,
volume_nguid: Optional[str] = None, volume_nguid: Optional[str] = None,
ns_id: Optional[str] = None, ns_id: Optional[str] = None,
@ -314,7 +315,7 @@ class Target(object):
def __init__(self, def __init__(self,
source_conn_props: 'NVMeOFConnProps', source_conn_props: 'NVMeOFConnProps',
nqn: str, nqn: str,
portals: List[str], portals: list[str],
uuid: Optional[str] = None, uuid: Optional[str] = None,
nguid: Optional[str] = None, nguid: Optional[str] = None,
ns_id: Optional[str] = None, ns_id: Optional[str] = None,
@ -347,13 +348,13 @@ class Target(object):
LOG.debug('Devices on start are: %s', self.devices_on_start) LOG.debug('Devices on start are: %s', self.devices_on_start)
@staticmethod @staticmethod
def _get_nvme_devices() -> List[str]: def _get_nvme_devices() -> list[str]:
"""Get all NVMe devices present in the system.""" """Get all NVMe devices present in the system."""
pattern = '/dev/nvme*n*' # e.g. /dev/nvme10n10 pattern = '/dev/nvme*n*' # e.g. /dev/nvme10n10
return glob.glob(pattern) return glob.glob(pattern)
@property @property
def live_portals(self) -> List[Portal]: def live_portals(self) -> list[Portal]:
"""Get live portals. """Get live portals.
Must have called set_portals_controllers first since portals without a Must have called set_portals_controllers first since portals without a
@ -362,7 +363,7 @@ class Target(object):
return [p for p in self.portals if p.is_live] return [p for p in self.portals if p.is_live]
@property @property
def present_portals(self) -> List[Portal]: def present_portals(self) -> list[Portal]:
"""Get present portals. """Get present portals.
Must have called set_portals_controllers first since portals without a Must have called set_portals_controllers first since portals without a
@ -383,13 +384,13 @@ class Target(object):
# List of portal addresses and transports for this target # List of portal addresses and transports for this target
# Unlike "nvme list-subsys -o json" sysfs addr is separated by a comma # Unlike "nvme list-subsys -o json" sysfs addr is separated by a comma
sysfs_portals: List[Tuple[Optional[str], sysfs_portals: list[tuple[Optional[str],
Optional[str], Optional[str],
Optional[Union[str, utils.Anything]]]] = [ Optional[Union[str, utils.Anything]]]] = [
(f'traddr={p.address},trsvcid={p.port}', p.transport, hostnqn) (f'traddr={p.address},trsvcid={p.port}', p.transport, hostnqn)
for p in self.portals for p in self.portals
] ]
known_names: List[str] = [p.controller for p in self.portals known_names: list[str] = [p.controller for p in self.portals
if p.controller] if p.controller]
warned = False warned = False
@ -434,7 +435,7 @@ class Target(object):
if len(known_names) == len(sysfs_portals): if len(known_names) == len(sysfs_portals):
return return
def get_devices(self, only_live=False, get_one=False) -> List[str]: def get_devices(self, only_live=False, get_one=False) -> list[str]:
"""Return devices for this target """Return devices for this target
Optionally only return devices from portals that are live and also Optionally only return devices from portals that are live and also
@ -610,7 +611,7 @@ class NVMeOFConnProps(object):
replica_count = None replica_count = None
cinder_volume_id: Optional[str] = None cinder_volume_id: Optional[str] = None
def __init__(self, conn_props: Dict, def __init__(self, conn_props: dict,
find_controllers: bool = False) -> None: find_controllers: bool = False) -> None:
# Generic connection properties fields used by Nova # Generic connection properties fields used by Nova
self.qos_specs = conn_props.get('qos_specs') self.qos_specs = conn_props.get('qos_specs')
@ -649,7 +650,7 @@ class NVMeOFConnProps(object):
# Below fields may have been added by nova # Below fields may have been added by nova
self.device_path = conn_props.get('device_path') self.device_path = conn_props.get('device_path')
def get_devices(self, only_live: bool = False) -> List[str]: def get_devices(self, only_live: bool = False) -> list[str]:
"""Get all device paths present in the system for all targets.""" """Get all device paths present in the system for all targets."""
result = [] result = []
for target in self.targets: for target in self.targets:
@ -707,7 +708,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
def get_volume_paths( def get_volume_paths(
self, self,
connection_properties: NVMeOFConnProps, connection_properties: NVMeOFConnProps,
device_info: Optional[Dict[str, str]] = None) -> List[str]: device_info: Optional[dict[str, str]] = None) -> list[str]:
"""Return paths where the volume is present.""" """Return paths where the volume is present."""
# Priority is on the value returned by connect_volume method # Priority is on the value returned by connect_volume method
if device_info and device_info.get('path'): if device_info and device_info.get('path'):
@ -759,7 +760,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
return False return False
@classmethod @classmethod
def get_connector_properties(cls, root_helper, *args, **kwargs) -> Dict: def get_connector_properties(cls, root_helper, *args, **kwargs) -> dict:
"""The NVMe-oF connector properties (initiator uuid and nqn.)""" """The NVMe-oF connector properties (initiator uuid and nqn.)"""
execute = kwargs.get('execute') or priv_rootwrap.execute execute = kwargs.get('execute') or priv_rootwrap.execute
nvmf = NVMeOFConnector(root_helper=root_helper, execute=execute) nvmf = NVMeOFConnector(root_helper=root_helper, execute=execute)
@ -843,7 +844,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
@base.synchronized('connect_volume', external=True) @base.synchronized('connect_volume', external=True)
@NVMeOFConnProps.from_dictionary_parameter @NVMeOFConnProps.from_dictionary_parameter
def connect_volume( def connect_volume(
self, connection_properties: NVMeOFConnProps) -> Dict[str, str]: self, connection_properties: NVMeOFConnProps) -> dict[str, str]:
"""Attach and discover the volume.""" """Attach and discover the volume."""
try: try:
if connection_properties.is_replicated is False: if connection_properties.is_replicated is False:
@ -978,7 +979,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
return device_path return device_path
def _handle_replicated_volume(self, def _handle_replicated_volume(self,
host_device_paths: List[str], host_device_paths: list[str],
conn_props: NVMeOFConnProps) -> str: conn_props: NVMeOFConnProps) -> str:
"""Assemble the raid from found devices.""" """Assemble the raid from found devices."""
path_in_raid = False path_in_raid = False
@ -1004,7 +1005,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
return device_path return device_path
def _handle_single_replica(self, def _handle_single_replica(self,
host_device_paths: List[str], host_device_paths: list[str],
volume_alias: str) -> str: volume_alias: str) -> str:
"""Assemble the raid from a single device.""" """Assemble the raid from a single device."""
if self._is_raid_device(host_device_paths[0]): if self._is_raid_device(host_device_paths[0]):
@ -1019,8 +1020,8 @@ class NVMeOFConnector(base.BaseLinuxConnector):
@base.synchronized('connect_volume', external=True) @base.synchronized('connect_volume', external=True)
@utils.connect_volume_undo_prepare_result(unlink_after=True) @utils.connect_volume_undo_prepare_result(unlink_after=True)
def disconnect_volume(self, def disconnect_volume(self,
connection_properties: Dict, connection_properties: dict,
device_info: Dict[str, str], device_info: dict[str, str],
force: bool = False, force: bool = False,
ignore_errors: bool = False) -> None: ignore_errors: bool = False) -> None:
"""Flush the volume. """Flush the volume.
@ -1115,7 +1116,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
# ####### Extend methods ######## # ####### Extend methods ########
@staticmethod @staticmethod
def _get_sizes_from_lba(ns_data: Dict) -> Tuple[Optional[int], def _get_sizes_from_lba(ns_data: dict) -> tuple[Optional[int],
Optional[int]]: Optional[int]]:
"""Return size in bytes and the nsze of the volume from NVMe NS data. """Return size in bytes and the nsze of the volume from NVMe NS data.
@ -1143,7 +1144,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
@utils.trace @utils.trace
@base.synchronized('extend_volume', external=True) @base.synchronized('extend_volume', external=True)
@utils.connect_volume_undo_prepare_result @utils.connect_volume_undo_prepare_result
def extend_volume(self, connection_properties: Dict[str, str]) -> int: def extend_volume(self, connection_properties: dict[str, str]) -> int:
"""Update an attached volume to reflect the current size after extend """Update an attached volume to reflect the current size after extend
The only way to reflect the new size of an NVMe-oF volume on the host The only way to reflect the new size of an NVMe-oF volume on the host
@ -1286,7 +1287,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
return None return None
def stop_and_assemble_raid(self, def stop_and_assemble_raid(self,
drives: List[str], drives: list[str],
md_path: str, md_path: str,
read_only: bool) -> None: read_only: bool) -> None:
md_name = None md_name = None
@ -1317,7 +1318,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
i += 1 i += 1
def assemble_raid(self, def assemble_raid(self,
drives: List[str], drives: list[str],
md_path: str, md_path: str,
read_only: bool) -> bool: read_only: bool) -> bool:
cmd = ['mdadm', '--assemble', '--run', md_path] cmd = ['mdadm', '--assemble', '--run', md_path]
@ -1337,7 +1338,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
return True return True
def create_raid(self, def create_raid(self,
drives: List[str], drives: list[str],
raid_type: str, raid_type: str,
device_name: str, device_name: str,
name: str, name: str,
@ -1452,7 +1453,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
def run_nvme_cli(self, def run_nvme_cli(self,
nvme_command: Sequence[str], nvme_command: Sequence[str],
**kwargs) -> Tuple[str, str]: **kwargs) -> tuple[str, str]:
"""Run an nvme cli command and return stdout and stderr output.""" """Run an nvme cli command and return stdout and stderr output."""
(out, err) = self._execute('nvme', *nvme_command, run_as_root=True, (out, err) = self._execute('nvme', *nvme_command, run_as_root=True,
root_helper=self._root_helper, root_helper=self._root_helper,

View File

@ -13,14 +13,15 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from __future__ import annotations
import errno import errno
import os import os
from typing import List
class HostDriver(object): class HostDriver(object):
def get_all_block_devices(self) -> List[str]: def get_all_block_devices(self) -> list[str]:
"""Get the list of all block devices seen in /dev/disk/by-path/.""" """Get the list of all block devices seen in /dev/disk/by-path/."""
dir = "/dev/disk/by-path/" dir = "/dev/disk/by-path/"
try: try:

View File

@ -18,7 +18,7 @@ from __future__ import annotations
import glob import glob
import os import os
from typing import Dict, Iterable, List # noqa: H301 from typing import Iterable
from oslo_concurrency import processutils as putils from oslo_concurrency import processutils as putils
from oslo_log import log as logging from oslo_log import log as logging
@ -166,7 +166,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI):
{'hp': hostpath, 'exc': exc}) {'hp': hostpath, 'exc': exc})
return hbas return hbas
def get_fc_hbas_info(self) -> List[Dict[str, str]]: def get_fc_hbas_info(self) -> list[dict[str, str]]:
"""Get Fibre Channel WWNs and device paths from the system, if any.""" """Get Fibre Channel WWNs and device paths from the system, if any."""
hbas = self.get_fc_hbas() hbas = self.get_fc_hbas()
@ -182,7 +182,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI):
'device_path': device_path}) 'device_path': device_path})
return hbas_info return hbas_info
def get_fc_wwpns(self) -> List[str]: def get_fc_wwpns(self) -> list[str]:
"""Get Fibre Channel WWPNs from the system, if any.""" """Get Fibre Channel WWPNs from the system, if any."""
hbas = self.get_fc_hbas() hbas = self.get_fc_hbas()
@ -194,7 +194,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI):
return wwpns return wwpns
def get_fc_wwnns(self) -> List[str]: def get_fc_wwnns(self) -> list[str]:
"""Get Fibre Channel WWNNs from the system, if any.""" """Get Fibre Channel WWNNs from the system, if any."""
hbas = self.get_fc_hbas() hbas = self.get_fc_hbas()

View File

@ -16,11 +16,14 @@
Note, this is not iSCSI. Note, this is not iSCSI.
""" """
from __future__ import annotations
import glob import glob
import os import os
import re import re
import time import time
from typing import Dict, List, Optional # noqa: H301 from typing import Optional
from oslo_concurrency import processutils as putils from oslo_concurrency import processutils as putils
from oslo_log import log as logging from oslo_log import log as logging
@ -82,7 +85,7 @@ class LinuxSCSI(executor.Executor):
with exc.context(force, 'Removing %s failed', device): with exc.context(force, 'Removing %s failed', device):
self.echo_scsi_command(path, "1") self.echo_scsi_command(path, "1")
def wait_for_volumes_removal(self, volumes_names: List[str]) -> None: def wait_for_volumes_removal(self, volumes_names: list[str]) -> None:
"""Wait for device paths to be removed from the system.""" """Wait for device paths to be removed from the system."""
str_names = ', '.join(volumes_names) str_names = ', '.join(volumes_names)
LOG.debug('Checking to see if SCSI volumes %s have been removed.', LOG.debug('Checking to see if SCSI volumes %s have been removed.',
@ -105,7 +108,7 @@ class LinuxSCSI(executor.Executor):
LOG.debug('%s still exist.', ', '.join(exist)) LOG.debug('%s still exist.', ', '.join(exist))
raise exception.VolumePathNotRemoved(volume_path=exist) raise exception.VolumePathNotRemoved(volume_path=exist)
def get_device_info(self, device: str) -> Dict[str, Optional[str]]: def get_device_info(self, device: str) -> dict[str, Optional[str]]:
dev_info = {'device': device, 'host': None, dev_info = {'device': device, 'host': None,
'channel': None, 'id': None, 'lun': None} 'channel': None, 'id': None, 'lun': None}
# The input argument 'device' can be of 2 types: # The input argument 'device' can be of 2 types:
@ -132,7 +135,7 @@ class LinuxSCSI(executor.Executor):
LOG.debug('dev_info=%s', str(dev_info)) LOG.debug('dev_info=%s', str(dev_info))
return dev_info return dev_info
def get_sysfs_wwn(self, device_names: List[str], mpath=None) -> str: def get_sysfs_wwn(self, device_names: list[str], mpath=None) -> str:
"""Return the wwid from sysfs in any of devices in udev format.""" """Return the wwid from sysfs in any of devices in udev format."""
# If we have a multipath DM we know that it has found the WWN # If we have a multipath DM we know that it has found the WWN
if mpath: if mpath: