From 6de226ed18b0f4e704dc12eefdf2adf6aecadae5 Mon Sep 17 00:00:00 2001 From: Eric Harney Date: Fri, 19 Aug 2022 11:32:52 -0400 Subject: [PATCH] mypy: Update format to future __annotations__ Use dict[], list[] etc. instead of Dict[], List[]. This works on all versions of Python that we support. Change-Id: If9c737277c041ffd2d3bca1863d300ae7d9bd52e --- os_brick/exception.py | 6 ++- os_brick/executor.py | 6 ++- os_brick/initiator/connectors/base.py | 6 ++- os_brick/initiator/connectors/base_iscsi.py | 4 +- os_brick/initiator/connectors/nvmeof.py | 53 +++++++++++---------- os_brick/initiator/host_driver.py | 5 +- os_brick/initiator/linuxfc.py | 8 ++-- os_brick/initiator/linuxscsi.py | 11 +++-- 8 files changed, 55 insertions(+), 44 deletions(-) diff --git a/os_brick/exception.py b/os_brick/exception.py index 3bcb0db30..87367e423 100644 --- a/os_brick/exception.py +++ b/os_brick/exception.py @@ -14,8 +14,10 @@ """Exceptions for the Brick library.""" +from __future__ import annotations + import traceback -from typing import Any, List, Optional # noqa: H301 +from typing import Any, Optional # noqa: H301 from oslo_concurrency import processutils as putils from oslo_log import log as logging @@ -184,7 +186,7 @@ class ExceptionChainer(BrickException): logged with warning level. """ def __init__(self, *args, **kwargs): - self._exceptions: List[tuple] = [] + self._exceptions: list[tuple] = [] self._repr: Optional[str] = None self._exc_msg_args = [] super(ExceptionChainer, self).__init__(*args, **kwargs) diff --git a/os_brick/executor.py b/os_brick/executor.py index 7a003cada..bfaab5234 100644 --- a/os_brick/executor.py +++ b/os_brick/executor.py @@ -18,8 +18,10 @@ and root_helper settings, so this provides that hook. """ +from __future__ import annotations + import threading -from typing import Callable, Tuple # noqa: H301 +from typing import Callable from oslo_concurrency import processutils as putils from oslo_context import context as context_utils @@ -48,7 +50,7 @@ class Executor(object): if value: setattr(exc, field, cls.safe_decode(value)) - def _execute(self, *args, **kwargs) -> Tuple[str, str]: + def _execute(self, *args, **kwargs) -> tuple[str, str]: try: result = self.__execute(*args, **kwargs) if result: diff --git a/os_brick/initiator/connectors/base.py b/os_brick/initiator/connectors/base.py index 09e89dca8..1c856db6e 100644 --- a/os_brick/initiator/connectors/base.py +++ b/os_brick/initiator/connectors/base.py @@ -12,12 +12,14 @@ # License for the specific language governing permissions and limitations # under the License. +from __future__ import annotations + import functools import glob import os import typing -from typing import Optional, Tuple # noqa: H301 +from typing import Optional from oslo_concurrency import lockutils from oslo_concurrency import processutils as putils @@ -155,7 +157,7 @@ class BaseLinuxConnector(initiator_connector.InitiatorConnector): def _discover_mpath_device(self, device_wwn: str, connection_properties: dict, - device_name: str) -> Tuple[str, str]: + device_name: str) -> tuple[str, str]: """This method discovers a multipath device. Discover a multipath device based on a defined connection_property diff --git a/os_brick/initiator/connectors/base_iscsi.py b/os_brick/initiator/connectors/base_iscsi.py index 733e6ab45..f20dcdf37 100644 --- a/os_brick/initiator/connectors/base_iscsi.py +++ b/os_brick/initiator/connectors/base_iscsi.py @@ -15,7 +15,7 @@ from __future__ import annotations import copy -from typing import Any, Dict, Generator # noqa: H301 +from typing import Any, Generator # noqa: H301 from os_brick.initiator import initiator_connector @@ -23,7 +23,7 @@ from os_brick.initiator import initiator_connector class BaseISCSIConnector(initiator_connector.InitiatorConnector): def _iterate_all_targets( self, - connection_properties: dict) -> Generator[Dict[str, Any], + connection_properties: dict) -> Generator[dict[str, Any], None, None]: for portal, iqn, lun in self._get_all_targets(connection_properties): props = copy.deepcopy(connection_properties) diff --git a/os_brick/initiator/connectors/nvmeof.py b/os_brick/initiator/connectors/nvmeof.py index 3234e9cbc..d10c548c1 100644 --- a/os_brick/initiator/connectors/nvmeof.py +++ b/os_brick/initiator/connectors/nvmeof.py @@ -13,14 +13,15 @@ # Look in the NVMeOFConnProps class docstring to see the format of the NVMe-oF # connection properties +from __future__ import annotations + import errno import functools import glob import json import os.path import time -from typing import (Callable, Dict, List, Optional, Sequence, # noqa: H301 - Tuple, Type, Union) # noqa: H301 +from typing import (Callable, Optional, Sequence, Type, Union) # noqa: H301 import uuid as uuid_lib @@ -206,7 +207,7 @@ class Portal(object): '"devices_on_start"') return target.get_device_path_by_initial_devices() - def get_all_namespaces_ctrl_paths(self) -> List[str]: + def get_all_namespaces_ctrl_paths(self) -> list[str]: """Return all nvme sysfs control paths for this portal. The basename of the path can be single volume or a channel to an ANA @@ -295,7 +296,7 @@ class Target(object): def factory(cls: Type['Target'], source_conn_props: 'NVMeOFConnProps', target_nqn: str, - portals: List[str], + portals: list[str], vol_uuid: Optional[str] = None, volume_nguid: Optional[str] = None, ns_id: Optional[str] = None, @@ -314,7 +315,7 @@ class Target(object): def __init__(self, source_conn_props: 'NVMeOFConnProps', nqn: str, - portals: List[str], + portals: list[str], uuid: Optional[str] = None, nguid: Optional[str] = None, ns_id: Optional[str] = None, @@ -347,13 +348,13 @@ class Target(object): LOG.debug('Devices on start are: %s', self.devices_on_start) @staticmethod - def _get_nvme_devices() -> List[str]: + def _get_nvme_devices() -> list[str]: """Get all NVMe devices present in the system.""" pattern = '/dev/nvme*n*' # e.g. /dev/nvme10n10 return glob.glob(pattern) @property - def live_portals(self) -> List[Portal]: + def live_portals(self) -> list[Portal]: """Get live portals. Must have called set_portals_controllers first since portals without a @@ -362,7 +363,7 @@ class Target(object): return [p for p in self.portals if p.is_live] @property - def present_portals(self) -> List[Portal]: + def present_portals(self) -> list[Portal]: """Get present portals. Must have called set_portals_controllers first since portals without a @@ -383,13 +384,13 @@ class Target(object): # List of portal addresses and transports for this target # Unlike "nvme list-subsys -o json" sysfs addr is separated by a comma - sysfs_portals: List[Tuple[Optional[str], + sysfs_portals: list[tuple[Optional[str], Optional[str], Optional[Union[str, utils.Anything]]]] = [ (f'traddr={p.address},trsvcid={p.port}', p.transport, hostnqn) for p in self.portals ] - known_names: List[str] = [p.controller for p in self.portals + known_names: list[str] = [p.controller for p in self.portals if p.controller] warned = False @@ -434,7 +435,7 @@ class Target(object): if len(known_names) == len(sysfs_portals): return - def get_devices(self, only_live=False, get_one=False) -> List[str]: + def get_devices(self, only_live=False, get_one=False) -> list[str]: """Return devices for this target Optionally only return devices from portals that are live and also @@ -610,7 +611,7 @@ class NVMeOFConnProps(object): replica_count = None cinder_volume_id: Optional[str] = None - def __init__(self, conn_props: Dict, + def __init__(self, conn_props: dict, find_controllers: bool = False) -> None: # Generic connection properties fields used by Nova self.qos_specs = conn_props.get('qos_specs') @@ -649,7 +650,7 @@ class NVMeOFConnProps(object): # Below fields may have been added by nova self.device_path = conn_props.get('device_path') - def get_devices(self, only_live: bool = False) -> List[str]: + def get_devices(self, only_live: bool = False) -> list[str]: """Get all device paths present in the system for all targets.""" result = [] for target in self.targets: @@ -707,7 +708,7 @@ class NVMeOFConnector(base.BaseLinuxConnector): def get_volume_paths( self, connection_properties: NVMeOFConnProps, - device_info: Optional[Dict[str, str]] = None) -> List[str]: + device_info: Optional[dict[str, str]] = None) -> list[str]: """Return paths where the volume is present.""" # Priority is on the value returned by connect_volume method if device_info and device_info.get('path'): @@ -759,7 +760,7 @@ class NVMeOFConnector(base.BaseLinuxConnector): return False @classmethod - def get_connector_properties(cls, root_helper, *args, **kwargs) -> Dict: + def get_connector_properties(cls, root_helper, *args, **kwargs) -> dict: """The NVMe-oF connector properties (initiator uuid and nqn.)""" execute = kwargs.get('execute') or priv_rootwrap.execute nvmf = NVMeOFConnector(root_helper=root_helper, execute=execute) @@ -843,7 +844,7 @@ class NVMeOFConnector(base.BaseLinuxConnector): @base.synchronized('connect_volume', external=True) @NVMeOFConnProps.from_dictionary_parameter def connect_volume( - self, connection_properties: NVMeOFConnProps) -> Dict[str, str]: + self, connection_properties: NVMeOFConnProps) -> dict[str, str]: """Attach and discover the volume.""" try: if connection_properties.is_replicated is False: @@ -978,7 +979,7 @@ class NVMeOFConnector(base.BaseLinuxConnector): return device_path def _handle_replicated_volume(self, - host_device_paths: List[str], + host_device_paths: list[str], conn_props: NVMeOFConnProps) -> str: """Assemble the raid from found devices.""" path_in_raid = False @@ -1004,7 +1005,7 @@ class NVMeOFConnector(base.BaseLinuxConnector): return device_path def _handle_single_replica(self, - host_device_paths: List[str], + host_device_paths: list[str], volume_alias: str) -> str: """Assemble the raid from a single device.""" if self._is_raid_device(host_device_paths[0]): @@ -1019,8 +1020,8 @@ class NVMeOFConnector(base.BaseLinuxConnector): @base.synchronized('connect_volume', external=True) @utils.connect_volume_undo_prepare_result(unlink_after=True) def disconnect_volume(self, - connection_properties: Dict, - device_info: Dict[str, str], + connection_properties: dict, + device_info: dict[str, str], force: bool = False, ignore_errors: bool = False) -> None: """Flush the volume. @@ -1115,7 +1116,7 @@ class NVMeOFConnector(base.BaseLinuxConnector): # ####### Extend methods ######## @staticmethod - def _get_sizes_from_lba(ns_data: Dict) -> Tuple[Optional[int], + def _get_sizes_from_lba(ns_data: dict) -> tuple[Optional[int], Optional[int]]: """Return size in bytes and the nsze of the volume from NVMe NS data. @@ -1143,7 +1144,7 @@ class NVMeOFConnector(base.BaseLinuxConnector): @utils.trace @base.synchronized('extend_volume', external=True) @utils.connect_volume_undo_prepare_result - def extend_volume(self, connection_properties: Dict[str, str]) -> int: + def extend_volume(self, connection_properties: dict[str, str]) -> int: """Update an attached volume to reflect the current size after extend The only way to reflect the new size of an NVMe-oF volume on the host @@ -1286,7 +1287,7 @@ class NVMeOFConnector(base.BaseLinuxConnector): return None def stop_and_assemble_raid(self, - drives: List[str], + drives: list[str], md_path: str, read_only: bool) -> None: md_name = None @@ -1317,7 +1318,7 @@ class NVMeOFConnector(base.BaseLinuxConnector): i += 1 def assemble_raid(self, - drives: List[str], + drives: list[str], md_path: str, read_only: bool) -> bool: cmd = ['mdadm', '--assemble', '--run', md_path] @@ -1337,7 +1338,7 @@ class NVMeOFConnector(base.BaseLinuxConnector): return True def create_raid(self, - drives: List[str], + drives: list[str], raid_type: str, device_name: str, name: str, @@ -1452,7 +1453,7 @@ class NVMeOFConnector(base.BaseLinuxConnector): def run_nvme_cli(self, nvme_command: Sequence[str], - **kwargs) -> Tuple[str, str]: + **kwargs) -> tuple[str, str]: """Run an nvme cli command and return stdout and stderr output.""" (out, err) = self._execute('nvme', *nvme_command, run_as_root=True, root_helper=self._root_helper, diff --git a/os_brick/initiator/host_driver.py b/os_brick/initiator/host_driver.py index 102885810..ef3df3fa9 100644 --- a/os_brick/initiator/host_driver.py +++ b/os_brick/initiator/host_driver.py @@ -13,14 +13,15 @@ # License for the specific language governing permissions and limitations # under the License. +from __future__ import annotations + import errno import os -from typing import List class HostDriver(object): - def get_all_block_devices(self) -> List[str]: + def get_all_block_devices(self) -> list[str]: """Get the list of all block devices seen in /dev/disk/by-path/.""" dir = "/dev/disk/by-path/" try: diff --git a/os_brick/initiator/linuxfc.py b/os_brick/initiator/linuxfc.py index 85a2f515c..2063a6485 100644 --- a/os_brick/initiator/linuxfc.py +++ b/os_brick/initiator/linuxfc.py @@ -18,7 +18,7 @@ from __future__ import annotations import glob import os -from typing import Dict, Iterable, List # noqa: H301 +from typing import Iterable from oslo_concurrency import processutils as putils from oslo_log import log as logging @@ -166,7 +166,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI): {'hp': hostpath, 'exc': exc}) return hbas - def get_fc_hbas_info(self) -> List[Dict[str, str]]: + def get_fc_hbas_info(self) -> list[dict[str, str]]: """Get Fibre Channel WWNs and device paths from the system, if any.""" hbas = self.get_fc_hbas() @@ -182,7 +182,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI): 'device_path': device_path}) return hbas_info - def get_fc_wwpns(self) -> List[str]: + def get_fc_wwpns(self) -> list[str]: """Get Fibre Channel WWPNs from the system, if any.""" hbas = self.get_fc_hbas() @@ -194,7 +194,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI): return wwpns - def get_fc_wwnns(self) -> List[str]: + def get_fc_wwnns(self) -> list[str]: """Get Fibre Channel WWNNs from the system, if any.""" hbas = self.get_fc_hbas() diff --git a/os_brick/initiator/linuxscsi.py b/os_brick/initiator/linuxscsi.py index a57cfb3ce..298f0324a 100644 --- a/os_brick/initiator/linuxscsi.py +++ b/os_brick/initiator/linuxscsi.py @@ -16,11 +16,14 @@ Note, this is not iSCSI. """ + +from __future__ import annotations + import glob import os import re import time -from typing import Dict, List, Optional # noqa: H301 +from typing import Optional from oslo_concurrency import processutils as putils from oslo_log import log as logging @@ -82,7 +85,7 @@ class LinuxSCSI(executor.Executor): with exc.context(force, 'Removing %s failed', device): self.echo_scsi_command(path, "1") - def wait_for_volumes_removal(self, volumes_names: List[str]) -> None: + def wait_for_volumes_removal(self, volumes_names: list[str]) -> None: """Wait for device paths to be removed from the system.""" str_names = ', '.join(volumes_names) LOG.debug('Checking to see if SCSI volumes %s have been removed.', @@ -105,7 +108,7 @@ class LinuxSCSI(executor.Executor): LOG.debug('%s still exist.', ', '.join(exist)) raise exception.VolumePathNotRemoved(volume_path=exist) - def get_device_info(self, device: str) -> Dict[str, Optional[str]]: + def get_device_info(self, device: str) -> dict[str, Optional[str]]: dev_info = {'device': device, 'host': None, 'channel': None, 'id': None, 'lun': None} # The input argument 'device' can be of 2 types: @@ -132,7 +135,7 @@ class LinuxSCSI(executor.Executor): LOG.debug('dev_info=%s', str(dev_info)) return dev_info - def get_sysfs_wwn(self, device_names: List[str], mpath=None) -> str: + def get_sysfs_wwn(self, device_names: list[str], mpath=None) -> str: """Return the wwid from sysfs in any of devices in udev format.""" # If we have a multipath DM we know that it has found the WWN if mpath: