mypy: initiator/linuxrbd

Add type coverage for initiator/linuxrbd.

Change-Id: Id9a3bf8b1e8aea14cd2cf1eae6907c9719aad3b0
This commit is contained in:
Eric Harney 2022-08-17 14:02:46 -04:00
parent 0e3ab3bd74
commit f5d5371cbc
2 changed files with 41 additions and 27 deletions

View File

@ -17,8 +17,8 @@ os_brick/initiator/connectors/rbd.py
os_brick/initiator/connectors/remotefs.py
os_brick/initiator/host_driver.py
os_brick/initiator/linuxfc.py
os_brick/initiator/linuxrbd.py
os_brick/initiator/utils.py
os_brick/privileged/nvmeof.py
os_brick/privileged/rbd.py
os_brick/remotefs/remotefs.py
os_brick/initiator/linuxrbd.py

View File

@ -12,7 +12,10 @@
"""Generic RBD connection utilities."""
from __future__ import annotations
import io
from typing import NoReturn, Optional # noqa: H301
from oslo_log import log as logging
@ -33,10 +36,12 @@ LOG = logging.getLogger(__name__)
class RBDClient(object):
def __init__(self, user, pool, *args, **kwargs):
def __init__(self, user: str, pool: str, *args, **kwargs):
self.rbd_user = user
self.rbd_pool = pool
self.rados: 'rados.Rados'
self.rbd: 'rbd.RBD'
for attr in ['rbd_user', 'rbd_pool']:
val = getattr(self, attr)
@ -54,9 +59,10 @@ class RBDClient(object):
raise exception.InvalidParameterValue(
err=_('rbd module required'))
self.rbd_conf = kwargs.get('conffile', '/etc/ceph/ceph.conf')
self.rbd_cluster_name = kwargs.get('rbd_cluster_name', 'ceph')
self.rados_connect_timeout = kwargs.get('rados_connect_timeout', -1)
self.rbd_conf: str = kwargs.get('conffile', '/etc/ceph/ceph.conf')
self.rbd_cluster_name: str = kwargs.get('rbd_cluster_name', 'ceph')
self.rados_connect_timeout: int = kwargs.get('rados_connect_timeout',
-1)
self.client, self.ioctx = self.connect()
@ -66,7 +72,7 @@ class RBDClient(object):
def __exit__(self, type_, value, traceback):
self.disconnect()
def connect(self):
def connect(self) -> tuple['rados.Rados', 'rados.Ioctx']:
LOG.debug("opening connection to ceph cluster (timeout=%s).",
self.rados_connect_timeout)
client = self.rados.Rados(rados_id=self.rbd_user,
@ -88,7 +94,7 @@ class RBDClient(object):
client.shutdown()
raise exception.BrickException(message=msg)
def disconnect(self):
def disconnect(self) -> None:
# closing an ioctx cannot raise an exception
self.ioctx.close()
self.client.shutdown()
@ -97,7 +103,11 @@ class RBDClient(object):
class RBDVolume(object):
"""Context manager for dealing with an existing rbd volume."""
def __init__(self, client, name, snapshot=None, read_only=False):
def __init__(self,
client: RBDClient,
name: str,
snapshot: Optional[str] = None,
read_only: bool = False):
if snapshot is not None:
snapshot = utils.convert_str(snapshot)
@ -118,16 +128,16 @@ class RBDVolume(object):
self.name = name
self.client = client
def close(self):
def close(self) -> None:
try:
self.image.close()
finally:
self.client.disconnect()
def __enter__(self):
def __enter__(self) -> 'RBDVolume':
return self
def __exit__(self, type_, value, traceback):
def __exit__(self, type_, value, traceback) -> None:
self.close()
def __getattr__(self, attrib):
@ -136,7 +146,11 @@ class RBDVolume(object):
class RBDImageMetadata(object):
"""RBD image metadata to be used with RBDVolumeIOWrapper."""
def __init__(self, image, pool, user, conf):
def __init__(self,
image: 'rbd.Image',
pool: Optional[str],
user: Optional[str],
conf: Optional[str]):
self.image = image
self.pool = utils.convert_str(pool or '')
self.user = utils.convert_str(user or '')
@ -149,33 +163,33 @@ class RBDVolumeIOWrapper(io.RawIOBase):
Calling unimplemented interfaces will raise IOError.
"""
def __init__(self, rbd_volume):
def __init__(self, rbd_volume: RBDImageMetadata):
super(RBDVolumeIOWrapper, self).__init__()
self._rbd_volume = rbd_volume
self._offset = 0
def _inc_offset(self, length):
def _inc_offset(self, length: int) -> None:
self._offset += length
@property
def rbd_image(self):
def rbd_image(self) -> 'rbd.Image':
return self._rbd_volume.image
@property
def rbd_user(self):
def rbd_user(self) -> str:
return self._rbd_volume.user
@property
def rbd_pool(self):
def rbd_pool(self) -> str:
return self._rbd_volume.pool
@property
def rbd_conf(self):
def rbd_conf(self) -> str:
return self._rbd_volume.conf
def read(self, length=None):
def read(self, length: Optional[int] = None) -> bytes:
offset = self._offset
total = self._rbd_volume.image.size()
total = int(self._rbd_volume.image.size())
# NOTE(dosaboy): posix files do not barf if you read beyond their
# length (they just return nothing) but rbd images do so we need to
@ -198,14 +212,14 @@ class RBDVolumeIOWrapper(io.RawIOBase):
self._inc_offset(length)
return data
def write(self, data):
def write(self, data) -> None:
self._rbd_volume.image.write(data, self._offset)
self._inc_offset(len(data))
def seekable(self):
def seekable(self) -> bool:
return True
def seek(self, offset, whence=0):
def seek(self, offset: int, whence: int = 0):
if whence == 0:
new_offset = offset
elif whence == 1:
@ -222,10 +236,10 @@ class RBDVolumeIOWrapper(io.RawIOBase):
self._offset = new_offset
def tell(self):
def tell(self) -> int:
return self._offset
def flush(self):
def flush(self) -> None:
# Raise ValueError if already closed
super().flush()
try:
@ -233,7 +247,7 @@ class RBDVolumeIOWrapper(io.RawIOBase):
except AttributeError:
LOG.warning("flush() not supported in this version of librbd")
def fileno(self):
def fileno(self) -> NoReturn:
"""RBD does not have support for fileno() so we raise IOError.
Raising IOError is recommended way to notify caller that interface is
@ -241,7 +255,7 @@ class RBDVolumeIOWrapper(io.RawIOBase):
"""
raise IOError(_("fileno() not supported by RBD()"))
def close(self):
def close(self) -> None:
if not self.closed:
# Can't set closed attribute ourselves, call parent to flush and
# change it.