mypy: Update format to future __annotations__

Use dict[], list[] etc. instead of Dict[], List[].

This works on all versions of Python that we support.

Change-Id: If9c737277c041ffd2d3bca1863d300ae7d9bd52e
This commit is contained in:
Eric Harney 2022-08-19 11:32:52 -04:00
parent aa97f4455c
commit 6de226ed18
8 changed files with 55 additions and 44 deletions

View File

@ -14,8 +14,10 @@
"""Exceptions for the Brick library."""
from __future__ import annotations
import traceback
from typing import Any, List, Optional # noqa: H301
from typing import Any, Optional # noqa: H301
from oslo_concurrency import processutils as putils
from oslo_log import log as logging
@ -184,7 +186,7 @@ class ExceptionChainer(BrickException):
logged with warning level.
"""
def __init__(self, *args, **kwargs):
self._exceptions: List[tuple] = []
self._exceptions: list[tuple] = []
self._repr: Optional[str] = None
self._exc_msg_args = []
super(ExceptionChainer, self).__init__(*args, **kwargs)

View File

@ -18,8 +18,10 @@
and root_helper settings, so this provides that hook.
"""
from __future__ import annotations
import threading
from typing import Callable, Tuple # noqa: H301
from typing import Callable
from oslo_concurrency import processutils as putils
from oslo_context import context as context_utils
@ -48,7 +50,7 @@ class Executor(object):
if value:
setattr(exc, field, cls.safe_decode(value))
def _execute(self, *args, **kwargs) -> Tuple[str, str]:
def _execute(self, *args, **kwargs) -> tuple[str, str]:
try:
result = self.__execute(*args, **kwargs)
if result:

View File

@ -12,12 +12,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import functools
import glob
import os
import typing
from typing import Optional, Tuple # noqa: H301
from typing import Optional
from oslo_concurrency import lockutils
from oslo_concurrency import processutils as putils
@ -155,7 +157,7 @@ class BaseLinuxConnector(initiator_connector.InitiatorConnector):
def _discover_mpath_device(self,
device_wwn: str,
connection_properties: dict,
device_name: str) -> Tuple[str, str]:
device_name: str) -> tuple[str, str]:
"""This method discovers a multipath device.
Discover a multipath device based on a defined connection_property

View File

@ -15,7 +15,7 @@
from __future__ import annotations
import copy
from typing import Any, Dict, Generator # noqa: H301
from typing import Any, Generator # noqa: H301
from os_brick.initiator import initiator_connector
@ -23,7 +23,7 @@ from os_brick.initiator import initiator_connector
class BaseISCSIConnector(initiator_connector.InitiatorConnector):
def _iterate_all_targets(
self,
connection_properties: dict) -> Generator[Dict[str, Any],
connection_properties: dict) -> Generator[dict[str, Any],
None, None]:
for portal, iqn, lun in self._get_all_targets(connection_properties):
props = copy.deepcopy(connection_properties)

View File

@ -13,14 +13,15 @@
# Look in the NVMeOFConnProps class docstring to see the format of the NVMe-oF
# connection properties
from __future__ import annotations
import errno
import functools
import glob
import json
import os.path
import time
from typing import (Callable, Dict, List, Optional, Sequence, # noqa: H301
Tuple, Type, Union) # noqa: H301
from typing import (Callable, Optional, Sequence, Type, Union) # noqa: H301
import uuid as uuid_lib
@ -206,7 +207,7 @@ class Portal(object):
'"devices_on_start"')
return target.get_device_path_by_initial_devices()
def get_all_namespaces_ctrl_paths(self) -> List[str]:
def get_all_namespaces_ctrl_paths(self) -> list[str]:
"""Return all nvme sysfs control paths for this portal.
The basename of the path can be single volume or a channel to an ANA
@ -295,7 +296,7 @@ class Target(object):
def factory(cls: Type['Target'],
source_conn_props: 'NVMeOFConnProps',
target_nqn: str,
portals: List[str],
portals: list[str],
vol_uuid: Optional[str] = None,
volume_nguid: Optional[str] = None,
ns_id: Optional[str] = None,
@ -314,7 +315,7 @@ class Target(object):
def __init__(self,
source_conn_props: 'NVMeOFConnProps',
nqn: str,
portals: List[str],
portals: list[str],
uuid: Optional[str] = None,
nguid: Optional[str] = None,
ns_id: Optional[str] = None,
@ -347,13 +348,13 @@ class Target(object):
LOG.debug('Devices on start are: %s', self.devices_on_start)
@staticmethod
def _get_nvme_devices() -> List[str]:
def _get_nvme_devices() -> list[str]:
"""Get all NVMe devices present in the system."""
pattern = '/dev/nvme*n*' # e.g. /dev/nvme10n10
return glob.glob(pattern)
@property
def live_portals(self) -> List[Portal]:
def live_portals(self) -> list[Portal]:
"""Get live portals.
Must have called set_portals_controllers first since portals without a
@ -362,7 +363,7 @@ class Target(object):
return [p for p in self.portals if p.is_live]
@property
def present_portals(self) -> List[Portal]:
def present_portals(self) -> list[Portal]:
"""Get present portals.
Must have called set_portals_controllers first since portals without a
@ -383,13 +384,13 @@ class Target(object):
# List of portal addresses and transports for this target
# Unlike "nvme list-subsys -o json" sysfs addr is separated by a comma
sysfs_portals: List[Tuple[Optional[str],
sysfs_portals: list[tuple[Optional[str],
Optional[str],
Optional[Union[str, utils.Anything]]]] = [
(f'traddr={p.address},trsvcid={p.port}', p.transport, hostnqn)
for p in self.portals
]
known_names: List[str] = [p.controller for p in self.portals
known_names: list[str] = [p.controller for p in self.portals
if p.controller]
warned = False
@ -434,7 +435,7 @@ class Target(object):
if len(known_names) == len(sysfs_portals):
return
def get_devices(self, only_live=False, get_one=False) -> List[str]:
def get_devices(self, only_live=False, get_one=False) -> list[str]:
"""Return devices for this target
Optionally only return devices from portals that are live and also
@ -610,7 +611,7 @@ class NVMeOFConnProps(object):
replica_count = None
cinder_volume_id: Optional[str] = None
def __init__(self, conn_props: Dict,
def __init__(self, conn_props: dict,
find_controllers: bool = False) -> None:
# Generic connection properties fields used by Nova
self.qos_specs = conn_props.get('qos_specs')
@ -649,7 +650,7 @@ class NVMeOFConnProps(object):
# Below fields may have been added by nova
self.device_path = conn_props.get('device_path')
def get_devices(self, only_live: bool = False) -> List[str]:
def get_devices(self, only_live: bool = False) -> list[str]:
"""Get all device paths present in the system for all targets."""
result = []
for target in self.targets:
@ -707,7 +708,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
def get_volume_paths(
self,
connection_properties: NVMeOFConnProps,
device_info: Optional[Dict[str, str]] = None) -> List[str]:
device_info: Optional[dict[str, str]] = None) -> list[str]:
"""Return paths where the volume is present."""
# Priority is on the value returned by connect_volume method
if device_info and device_info.get('path'):
@ -759,7 +760,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
return False
@classmethod
def get_connector_properties(cls, root_helper, *args, **kwargs) -> Dict:
def get_connector_properties(cls, root_helper, *args, **kwargs) -> dict:
"""The NVMe-oF connector properties (initiator uuid and nqn.)"""
execute = kwargs.get('execute') or priv_rootwrap.execute
nvmf = NVMeOFConnector(root_helper=root_helper, execute=execute)
@ -843,7 +844,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
@base.synchronized('connect_volume', external=True)
@NVMeOFConnProps.from_dictionary_parameter
def connect_volume(
self, connection_properties: NVMeOFConnProps) -> Dict[str, str]:
self, connection_properties: NVMeOFConnProps) -> dict[str, str]:
"""Attach and discover the volume."""
try:
if connection_properties.is_replicated is False:
@ -978,7 +979,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
return device_path
def _handle_replicated_volume(self,
host_device_paths: List[str],
host_device_paths: list[str],
conn_props: NVMeOFConnProps) -> str:
"""Assemble the raid from found devices."""
path_in_raid = False
@ -1004,7 +1005,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
return device_path
def _handle_single_replica(self,
host_device_paths: List[str],
host_device_paths: list[str],
volume_alias: str) -> str:
"""Assemble the raid from a single device."""
if self._is_raid_device(host_device_paths[0]):
@ -1019,8 +1020,8 @@ class NVMeOFConnector(base.BaseLinuxConnector):
@base.synchronized('connect_volume', external=True)
@utils.connect_volume_undo_prepare_result(unlink_after=True)
def disconnect_volume(self,
connection_properties: Dict,
device_info: Dict[str, str],
connection_properties: dict,
device_info: dict[str, str],
force: bool = False,
ignore_errors: bool = False) -> None:
"""Flush the volume.
@ -1115,7 +1116,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
# ####### Extend methods ########
@staticmethod
def _get_sizes_from_lba(ns_data: Dict) -> Tuple[Optional[int],
def _get_sizes_from_lba(ns_data: dict) -> tuple[Optional[int],
Optional[int]]:
"""Return size in bytes and the nsze of the volume from NVMe NS data.
@ -1143,7 +1144,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
@utils.trace
@base.synchronized('extend_volume', external=True)
@utils.connect_volume_undo_prepare_result
def extend_volume(self, connection_properties: Dict[str, str]) -> int:
def extend_volume(self, connection_properties: dict[str, str]) -> int:
"""Update an attached volume to reflect the current size after extend
The only way to reflect the new size of an NVMe-oF volume on the host
@ -1286,7 +1287,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
return None
def stop_and_assemble_raid(self,
drives: List[str],
drives: list[str],
md_path: str,
read_only: bool) -> None:
md_name = None
@ -1317,7 +1318,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
i += 1
def assemble_raid(self,
drives: List[str],
drives: list[str],
md_path: str,
read_only: bool) -> bool:
cmd = ['mdadm', '--assemble', '--run', md_path]
@ -1337,7 +1338,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
return True
def create_raid(self,
drives: List[str],
drives: list[str],
raid_type: str,
device_name: str,
name: str,
@ -1452,7 +1453,7 @@ class NVMeOFConnector(base.BaseLinuxConnector):
def run_nvme_cli(self,
nvme_command: Sequence[str],
**kwargs) -> Tuple[str, str]:
**kwargs) -> tuple[str, str]:
"""Run an nvme cli command and return stdout and stderr output."""
(out, err) = self._execute('nvme', *nvme_command, run_as_root=True,
root_helper=self._root_helper,

View File

@ -13,14 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import errno
import os
from typing import List
class HostDriver(object):
def get_all_block_devices(self) -> List[str]:
def get_all_block_devices(self) -> list[str]:
"""Get the list of all block devices seen in /dev/disk/by-path/."""
dir = "/dev/disk/by-path/"
try:

View File

@ -18,7 +18,7 @@ from __future__ import annotations
import glob
import os
from typing import Dict, Iterable, List # noqa: H301
from typing import Iterable
from oslo_concurrency import processutils as putils
from oslo_log import log as logging
@ -166,7 +166,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI):
{'hp': hostpath, 'exc': exc})
return hbas
def get_fc_hbas_info(self) -> List[Dict[str, str]]:
def get_fc_hbas_info(self) -> list[dict[str, str]]:
"""Get Fibre Channel WWNs and device paths from the system, if any."""
hbas = self.get_fc_hbas()
@ -182,7 +182,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI):
'device_path': device_path})
return hbas_info
def get_fc_wwpns(self) -> List[str]:
def get_fc_wwpns(self) -> list[str]:
"""Get Fibre Channel WWPNs from the system, if any."""
hbas = self.get_fc_hbas()
@ -194,7 +194,7 @@ class LinuxFibreChannel(linuxscsi.LinuxSCSI):
return wwpns
def get_fc_wwnns(self) -> List[str]:
def get_fc_wwnns(self) -> list[str]:
"""Get Fibre Channel WWNNs from the system, if any."""
hbas = self.get_fc_hbas()

View File

@ -16,11 +16,14 @@
Note, this is not iSCSI.
"""
from __future__ import annotations
import glob
import os
import re
import time
from typing import Dict, List, Optional # noqa: H301
from typing import Optional
from oslo_concurrency import processutils as putils
from oslo_log import log as logging
@ -82,7 +85,7 @@ class LinuxSCSI(executor.Executor):
with exc.context(force, 'Removing %s failed', device):
self.echo_scsi_command(path, "1")
def wait_for_volumes_removal(self, volumes_names: List[str]) -> None:
def wait_for_volumes_removal(self, volumes_names: list[str]) -> None:
"""Wait for device paths to be removed from the system."""
str_names = ', '.join(volumes_names)
LOG.debug('Checking to see if SCSI volumes %s have been removed.',
@ -105,7 +108,7 @@ class LinuxSCSI(executor.Executor):
LOG.debug('%s still exist.', ', '.join(exist))
raise exception.VolumePathNotRemoved(volume_path=exist)
def get_device_info(self, device: str) -> Dict[str, Optional[str]]:
def get_device_info(self, device: str) -> dict[str, Optional[str]]:
dev_info = {'device': device, 'host': None,
'channel': None, 'id': None, 'lun': None}
# The input argument 'device' can be of 2 types:
@ -132,7 +135,7 @@ class LinuxSCSI(executor.Executor):
LOG.debug('dev_info=%s', str(dev_info))
return dev_info
def get_sysfs_wwn(self, device_names: List[str], mpath=None) -> str:
def get_sysfs_wwn(self, device_names: list[str], mpath=None) -> str:
"""Return the wwid from sysfs in any of devices in udev format."""
# If we have a multipath DM we know that it has found the WWN
if mpath: