mypy: initiator/connectors/rbd.py

Change-Id: I55a8baba7e40dc0fe9427231e293a3273936b4be
This commit is contained in:
Eric Harney 2022-07-08 09:51:43 -04:00
parent b3f89a304f
commit dfa1d167b9
2 changed files with 52 additions and 20 deletions

View File

@ -11,6 +11,7 @@ os_brick/initiator/connectors/fibre_channel.py
os_brick/initiator/connectors/iscsi.py
os_brick/initiator/connectors/nvmeof.py
os_brick/initiator/connectors/local.py
os_brick/initiator/connectors/rbd.py
os_brick/initiator/connectors/remotefs.py
os_brick/initiator/host_driver.py
os_brick/initiator/linuxfc.py

View File

@ -1,5 +1,3 @@
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -12,9 +10,15 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
import tempfile
import typing
from typing import Any, Optional, Union # noqa: H301
if typing.TYPE_CHECKING:
import io
from oslo_concurrency import processutils as putils
from oslo_log import log as logging
@ -41,7 +45,8 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
device_scan_attempts=initiator.DEVICE_SCAN_ATTEMPTS_DEFAULT,
*args, **kwargs):
super(RBDConnector, self).__init__(root_helper, driver=driver,
super(RBDConnector, self).__init__(root_helper,
driver=driver, # type: ignore
device_scan_attempts=
device_scan_attempts,
*args, **kwargs)
@ -52,21 +57,27 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
"""The RBD connector properties."""
return {'do_local_attach': kwargs.get('do_local_attach', False)}
def get_volume_paths(self, connection_properties):
def get_volume_paths(self,
connection_properties: dict[str, Any]) -> list[str]:
# TODO(e0ne): Implement this for local volume.
return []
def get_search_path(self):
def get_search_path(self) -> None:
# TODO(walter-boring): don't know where the connector
# looks for RBD volumes.
return None
def get_all_available_volumes(self, connection_properties=None):
def get_all_available_volumes(
self,
connection_properties: Optional[dict[str, Any]] = None) -> \
list[str]:
# TODO(e0ne): Implement this for local volume.
return []
@staticmethod
def _check_or_get_keyring_contents(keyring, cluster_name, user):
def _check_or_get_keyring_contents(keyring: Optional[str],
cluster_name: str,
user: str) -> str:
try:
if keyring is None:
if user:
@ -82,8 +93,12 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
raise exception.BrickException(msg=msg)
@classmethod
def _create_ceph_conf(cls, monitor_ips, monitor_ports,
cluster_name, user, keyring):
def _create_ceph_conf(cls,
monitor_ips: list[str],
monitor_ports: list[str],
cluster_name: str,
user: str,
keyring) -> str:
monitors = ["%s:%s" % (ip, port) for ip, port in
zip(cls._sanitize_mon_hosts(monitor_ips), monitor_ports)]
mon_hosts = "mon_host = %s" % (','.join(monitors))
@ -105,7 +120,8 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
msg = (_("Failed to write data to %s.") % (ceph_conf_path))
raise exception.BrickException(msg=msg)
def _get_rbd_handle(self, connection_properties):
def _get_rbd_handle(self, connection_properties: dict[str, Any]) -> \
linuxrbd.RBDVolumeIOWrapper:
try:
user = connection_properties['auth_username']
pool, volume = connection_properties['name'].split('/')
@ -136,7 +152,7 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
return rbd_handle
@staticmethod
def get_rbd_device_name(pool, volume):
def get_rbd_device_name(pool: str, volume: str) -> str:
"""Return device name which will be generated by RBD kernel module.
:param pool: RBD pool name.
@ -147,7 +163,9 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
return '/dev/rbd/{pool}/{volume}'.format(pool=pool, volume=volume)
@classmethod
def create_non_openstack_config(cls, connection_properties):
def create_non_openstack_config(
cls,
connection_properties: dict[str, Any]):
"""Get root owned Ceph's .conf file for non OpenStack usage."""
# If keyring info is missing then we are in OpenStack, nothing to do
keyring = connection_properties.get('keyring')
@ -170,7 +188,10 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
keyring)
return conf
def _local_attach_volume(self, connection_properties):
def _local_attach_volume(
self,
connection_properties: dict[str, Any]) -> \
dict[str, Union[str, linuxrbd.RBDVolumeIOWrapper]]:
# NOTE(e0ne): sanity check if ceph-common is installed.
try:
self._execute('which', 'rbd')
@ -220,6 +241,7 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
if conf:
rbd_privsep.delete_if_exists(conf)
res: dict[str, Union[str, linuxrbd.RBDVolumeIOWrapper]]
res = {'path': rbd_dev_path,
'type': 'block'}
if conf:
@ -228,7 +250,9 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
@utils.trace
@utils.connect_volume_prepare_result
def connect_volume(self, connection_properties):
def connect_volume(self,
connection_properties: dict[str, Any]) -> \
dict[str, Union[linuxrbd.RBDVolumeIOWrapper, str]]:
"""Connect to a volume.
:param connection_properties: The dictionary that describes all
@ -245,7 +269,9 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
rbd_handle = self._get_rbd_handle(connection_properties)
return {'path': rbd_handle}
def _find_root_device(self, connection_properties, conf):
def _find_root_device(self,
connection_properties: dict[str, Any],
conf) -> Optional[str]:
"""Find the underlying /dev/rbd* device for a mapping.
Use the showmapped command to list all acive mappings and find the
@ -307,8 +333,11 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
@utils.trace
@utils.connect_volume_undo_prepare_result(unlink_after=True)
def disconnect_volume(self, connection_properties, device_info,
force=False, ignore_errors=False):
def disconnect_volume(self,
connection_properties: dict[str, Any],
device_info: dict,
force: bool = False,
ignore_errors: bool = False) -> None:
"""Disconnect a volume.
:param connection_properties: The dictionary that describes all
@ -340,7 +369,7 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
rbd_handle.close()
@staticmethod
def _check_valid_device(rbd_handle):
def _check_valid_device(rbd_handle: 'io.BufferedReader') -> bool:
original_offset = rbd_handle.tell()
try:
@ -354,7 +383,9 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
return True
def check_valid_device(self, path, run_as_root=True):
def check_valid_device(self,
path: Optional[str],
run_as_root: bool = True) -> bool:
"""Verify an existing RBD handle is connected and valid."""
if not path:
return False
@ -371,7 +402,7 @@ class RBDConnector(base_rbd.RBDConnectorMixin, base.BaseLinuxConnector):
return self._check_valid_device(path)
@utils.connect_volume_undo_prepare_result
def extend_volume(self, connection_properties):
def extend_volume(self, connection_properties: dict[str, Any]) -> int:
"""Refresh local volume view and return current size in bytes."""
# Nothing to do, RBD attached volumes are automatically refreshed, but
# we need to return the new size for compatibility