mypy: lvm.py
Change-Id: I27a565b8ca250f6790061932ab000946dedd97c5
This commit is contained in:
parent
f5d5371cbc
commit
47dda4b487
@ -19,6 +19,7 @@ os_brick/initiator/host_driver.py
|
||||
os_brick/initiator/linuxfc.py
|
||||
os_brick/initiator/linuxrbd.py
|
||||
os_brick/initiator/utils.py
|
||||
os_brick/local_dev/lvm.py
|
||||
os_brick/privileged/nvmeof.py
|
||||
os_brick/privileged/rbd.py
|
||||
os_brick/remotefs/remotefs.py
|
||||
|
@ -15,9 +15,12 @@
|
||||
|
||||
"""LVM class for performing LVM operations."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import math
|
||||
import os
|
||||
import re
|
||||
from typing import Any, Callable, Optional # noqa: H301
|
||||
|
||||
from oslo_concurrency import processutils as putils
|
||||
from oslo_log import log as logging
|
||||
@ -36,10 +39,12 @@ class LVM(executor.Executor):
|
||||
|
||||
LVM_CMD_PREFIX = ['env', 'LC_ALL=C']
|
||||
|
||||
def __init__(self, vg_name, root_helper, create_vg=False,
|
||||
physical_volumes=None, lvm_type='default',
|
||||
executor=None, lvm_conf=None,
|
||||
suppress_fd_warn=False):
|
||||
def __init__(self, vg_name: str, root_helper: str, create_vg: bool = False,
|
||||
physical_volumes: Optional[list] = None,
|
||||
lvm_type: Optional[str] = 'default',
|
||||
executor: Optional[Callable] = None,
|
||||
lvm_conf=None,
|
||||
suppress_fd_warn: Optional[bool] = False):
|
||||
|
||||
"""Initialize the LVM object.
|
||||
|
||||
@ -66,8 +71,8 @@ class LVM(executor.Executor):
|
||||
self.vg_thin_pool = None
|
||||
self.vg_thin_pool_size = 0.0
|
||||
self.vg_thin_pool_free_space = 0.0
|
||||
self._supports_snapshot_lv_activation = None
|
||||
self._supports_lvchange_ignoreskipactivation = None
|
||||
self._supports_snapshot_lv_activation: Optional[bool] = None
|
||||
self._supports_lvchange_ignoreskipactivation: Optional[bool] = None
|
||||
self.vg_provisioned_capacity = 0.0
|
||||
|
||||
# Ensure LVM_SYSTEM_DIR has been added to LVM.LVM_CMD_PREFIX
|
||||
@ -119,7 +124,7 @@ class LVM(executor.Executor):
|
||||
self.activate_lv(self.vg_thin_pool)
|
||||
self.pv_list = self.get_all_physical_volumes(root_helper, vg_name)
|
||||
|
||||
def _vg_exists(self):
|
||||
def _vg_exists(self) -> bool:
|
||||
"""Simple check to see if VG exists.
|
||||
|
||||
:returns: True if vg specified in object exists, else False
|
||||
@ -139,16 +144,16 @@ class LVM(executor.Executor):
|
||||
|
||||
return exists
|
||||
|
||||
def _create_vg(self, pv_list):
|
||||
def _create_vg(self, pv_list: list[str]) -> None:
|
||||
cmd = ['vgcreate', self.vg_name, ','.join(pv_list)]
|
||||
self._execute(*cmd, root_helper=self._root_helper, run_as_root=True)
|
||||
|
||||
@utils.retry(retry=utils.retry_if_exit_code, retry_param=139, interval=0.5,
|
||||
backoff_rate=0.5)
|
||||
def _run_lvm_command(self,
|
||||
cmd_arg_list: list,
|
||||
root_helper: str = None,
|
||||
run_as_root: bool = True) -> tuple:
|
||||
cmd_arg_list: list[str],
|
||||
root_helper: Optional[str] = None,
|
||||
run_as_root: bool = True) -> tuple[str, str]:
|
||||
"""Run LVM commands with a retry on code 139 to work around LVM bugs.
|
||||
|
||||
Refer to LP bug 1901783, LP bug 1932188.
|
||||
@ -161,7 +166,7 @@ class LVM(executor.Executor):
|
||||
run_as_root=run_as_root)
|
||||
return (out, err)
|
||||
|
||||
def _get_vg_uuid(self):
|
||||
def _get_vg_uuid(self) -> list:
|
||||
cmd = LVM.LVM_CMD_PREFIX + ['vgs', '--noheadings',
|
||||
'-o', 'uuid', self.vg_name]
|
||||
(out, _err) = self._execute(*cmd,
|
||||
@ -172,7 +177,8 @@ class LVM(executor.Executor):
|
||||
else:
|
||||
return []
|
||||
|
||||
def _get_thin_pool_free_space(self, vg_name, thin_pool_name):
|
||||
def _get_thin_pool_free_space(self,
|
||||
vg_name: str, thin_pool_name: str) -> float:
|
||||
"""Returns available thin pool free space.
|
||||
|
||||
:param vg_name: the vg where the pool is placed
|
||||
@ -212,7 +218,7 @@ class LVM(executor.Executor):
|
||||
return free_space
|
||||
|
||||
@staticmethod
|
||||
def get_lvm_version(root_helper):
|
||||
def get_lvm_version(root_helper: str) -> tuple:
|
||||
"""Static method to get LVM version from system.
|
||||
|
||||
:param root_helper: root_helper to use for execute
|
||||
@ -234,11 +240,15 @@ class LVM(executor.Executor):
|
||||
version = version_list[2]
|
||||
version_filter = r"(\d+)\.(\d+)\.(\d+).*"
|
||||
r = re.search(version_filter, version)
|
||||
if r is None:
|
||||
raise exception.BrickException(
|
||||
message='Cannot parse LVM version')
|
||||
version_tuple = tuple(map(int, r.group(1, 2, 3)))
|
||||
return version_tuple
|
||||
raise
|
||||
|
||||
@staticmethod
|
||||
def supports_thin_provisioning(root_helper):
|
||||
def supports_thin_provisioning(root_helper: str) -> bool:
|
||||
"""Static method to check for thin LVM support on a system.
|
||||
|
||||
:param root_helper: root_helper to use for execute
|
||||
@ -249,7 +259,7 @@ class LVM(executor.Executor):
|
||||
return LVM.get_lvm_version(root_helper) >= (2, 2, 95)
|
||||
|
||||
@property
|
||||
def supports_snapshot_lv_activation(self):
|
||||
def supports_snapshot_lv_activation(self) -> bool:
|
||||
"""Property indicating whether snap activation changes are supported.
|
||||
|
||||
Check for LVM version >= 2.02.91.
|
||||
@ -267,7 +277,7 @@ class LVM(executor.Executor):
|
||||
return self._supports_snapshot_lv_activation
|
||||
|
||||
@property
|
||||
def supports_lvchange_ignoreskipactivation(self):
|
||||
def supports_lvchange_ignoreskipactivation(self) -> bool:
|
||||
"""Property indicating whether lvchange can ignore skip activation.
|
||||
|
||||
Check for LVM version >= 2.02.99.
|
||||
@ -283,7 +293,7 @@ class LVM(executor.Executor):
|
||||
return self._supports_lvchange_ignoreskipactivation
|
||||
|
||||
@property
|
||||
def supports_full_pool_create(self):
|
||||
def supports_full_pool_create(self) -> bool:
|
||||
"""Property indicating whether 100% pool creation is supported.
|
||||
|
||||
Check for LVM version >= 2.02.115.
|
||||
@ -298,7 +308,9 @@ class LVM(executor.Executor):
|
||||
@staticmethod
|
||||
@utils.retry(retry=utils.retry_if_exit_code, retry_param=139, interval=0.5,
|
||||
backoff_rate=0.5) # Bug#1901783
|
||||
def get_lv_info(root_helper, vg_name=None, lv_name=None):
|
||||
def get_lv_info(root_helper: str,
|
||||
vg_name: Optional[str] = None,
|
||||
lv_name: Optional[str] = None) -> list[dict[str, Any]]:
|
||||
"""Retrieve info about LVs (all, in a VG, or a single LV).
|
||||
|
||||
:param root_helper: root_helper to use for execute
|
||||
@ -337,7 +349,7 @@ class LVM(executor.Executor):
|
||||
|
||||
return lv_list
|
||||
|
||||
def get_volumes(self, lv_name=None):
|
||||
def get_volumes(self, lv_name: Optional[str] = None) -> list[dict]:
|
||||
"""Get all LV's associated with this instantiation (VG).
|
||||
|
||||
:returns: List of Dictionaries with LV info
|
||||
@ -347,7 +359,7 @@ class LVM(executor.Executor):
|
||||
self.vg_name,
|
||||
lv_name)
|
||||
|
||||
def get_volume(self, name):
|
||||
def get_volume(self, name: str) -> Optional[dict]:
|
||||
"""Get reference object of volume specified by name.
|
||||
|
||||
:returns: dict representation of Logical Volume if exists
|
||||
@ -360,7 +372,9 @@ class LVM(executor.Executor):
|
||||
return None
|
||||
|
||||
@staticmethod
|
||||
def get_all_physical_volumes(root_helper, vg_name=None):
|
||||
def get_all_physical_volumes(
|
||||
root_helper: str,
|
||||
vg_name: Optional[str] = None) -> list[dict[str, Any]]:
|
||||
"""Static method to get all PVs on a system.
|
||||
|
||||
:param root_helper: root_helper to use for execute
|
||||
@ -391,7 +405,7 @@ class LVM(executor.Executor):
|
||||
'available': float(fields[3])})
|
||||
return pv_list
|
||||
|
||||
def get_physical_volumes(self):
|
||||
def get_physical_volumes(self) -> list[dict[str, Any]]:
|
||||
"""Get all PVs associated with this instantiation (VG).
|
||||
|
||||
:returns: List of Dictionaries with PV info
|
||||
@ -402,7 +416,9 @@ class LVM(executor.Executor):
|
||||
return self.pv_list
|
||||
|
||||
@staticmethod
|
||||
def get_all_volume_groups(root_helper, vg_name=None):
|
||||
def get_all_volume_groups(
|
||||
root_helper: str,
|
||||
vg_name: Optional[str] = None) -> list[dict[str, Any]]:
|
||||
"""Static method to get all VGs on a system.
|
||||
|
||||
:param root_helper: root_helper to use for execute
|
||||
@ -434,7 +450,7 @@ class LVM(executor.Executor):
|
||||
|
||||
return vg_list
|
||||
|
||||
def update_volume_group_info(self):
|
||||
def update_volume_group_info(self) -> None:
|
||||
"""Update VG info for this instantiation.
|
||||
|
||||
Used to update member fields of object and
|
||||
@ -499,7 +515,7 @@ class LVM(executor.Executor):
|
||||
|
||||
self.vg_provisioned_capacity = total_vols_size
|
||||
|
||||
def _calculate_thin_pool_size(self):
|
||||
def _calculate_thin_pool_size(self) -> list[str]:
|
||||
"""Calculates the correct size for a thin pool.
|
||||
|
||||
Ideally we would use 100% of the containing volume group and be done.
|
||||
@ -522,7 +538,7 @@ class LVM(executor.Executor):
|
||||
# leave 5% free for metadata
|
||||
return ["-L", "%sg" % (self.vg_free_space * 0.95)]
|
||||
|
||||
def create_thin_pool(self, name=None):
|
||||
def create_thin_pool(self, name: Optional[str] = None) -> None:
|
||||
"""Creates a thin provisioning pool for this VG.
|
||||
|
||||
The syntax here is slightly different than the default
|
||||
@ -562,7 +578,11 @@ class LVM(executor.Executor):
|
||||
|
||||
return
|
||||
|
||||
def create_volume(self, name, size_str, lv_type='default', mirror_count=0):
|
||||
def create_volume(self,
|
||||
name: str,
|
||||
size_str: str,
|
||||
lv_type: str = 'default',
|
||||
mirror_count: int = 0) -> None:
|
||||
"""Creates a logical volume on the object's VG.
|
||||
|
||||
:param name: Name to use when creating Logical Volume
|
||||
@ -581,7 +601,7 @@ class LVM(executor.Executor):
|
||||
'-L', size_str]
|
||||
|
||||
if mirror_count > 0:
|
||||
cmd.extend(['-m', mirror_count, '--nosync',
|
||||
cmd.extend(['-m', str(mirror_count), '--nosync',
|
||||
'--mirrorlog', 'mirrored'])
|
||||
terras = int(size_str[:-1]) / 1024.0
|
||||
if terras >= 1.5:
|
||||
@ -600,7 +620,10 @@ class LVM(executor.Executor):
|
||||
raise
|
||||
|
||||
@utils.retry(putils.ProcessExecutionError)
|
||||
def create_lv_snapshot(self, name, source_lv_name, lv_type='default'):
|
||||
def create_lv_snapshot(self,
|
||||
name: str,
|
||||
source_lv_name: str,
|
||||
lv_type: str = 'default') -> None:
|
||||
"""Creates a snapshot of a logical volume.
|
||||
|
||||
:param name: Name to assign to new snapshot
|
||||
@ -629,14 +652,14 @@ class LVM(executor.Executor):
|
||||
LOG.error('StdErr :%s', err.stderr)
|
||||
raise
|
||||
|
||||
def _mangle_lv_name(self, name):
|
||||
def _mangle_lv_name(self, name: str) -> str:
|
||||
# Linux LVM reserves name that starts with snapshot, so that
|
||||
# such volume name can't be created. Mangle it.
|
||||
if not name.startswith('snapshot'):
|
||||
return name
|
||||
return '_' + name
|
||||
|
||||
def _lv_is_active(self, name):
|
||||
def _lv_is_active(self, name: str) -> bool:
|
||||
cmd = LVM.LVM_CMD_PREFIX + ['lvdisplay', '--noheading', '-C', '-o',
|
||||
'Attr', '%s/%s' % (self.vg_name, name)]
|
||||
out, _err = self._run_lvm_command(cmd)
|
||||
@ -648,7 +671,7 @@ class LVM(executor.Executor):
|
||||
return True
|
||||
return False
|
||||
|
||||
def deactivate_lv(self, name):
|
||||
def deactivate_lv(self, name: str) -> None:
|
||||
lv_path = self.vg_name + '/' + self._mangle_lv_name(name)
|
||||
cmd = ['lvchange', '-a', 'n']
|
||||
cmd.append(lv_path)
|
||||
@ -669,7 +692,7 @@ class LVM(executor.Executor):
|
||||
|
||||
@utils.retry(retry_param=exception.VolumeNotDeactivated, retries=5,
|
||||
backoff_rate=2)
|
||||
def _wait_for_volume_deactivation(self, name):
|
||||
def _wait_for_volume_deactivation(self, name: str) -> None:
|
||||
LOG.debug("Checking to see if volume %s has been deactivated.",
|
||||
name)
|
||||
if self._lv_is_active(name):
|
||||
@ -678,7 +701,10 @@ class LVM(executor.Executor):
|
||||
else:
|
||||
LOG.debug("Volume %s has been deactivated.", name)
|
||||
|
||||
def activate_lv(self, name, is_snapshot=False, permanent=False):
|
||||
def activate_lv(self,
|
||||
name: str,
|
||||
is_snapshot: bool = False,
|
||||
permanent: bool = False) -> None:
|
||||
"""Ensure that logical volume/snapshot logical volume is activated.
|
||||
|
||||
:param name: Name of LV to activate
|
||||
@ -721,7 +747,7 @@ class LVM(executor.Executor):
|
||||
raise
|
||||
|
||||
@utils.retry(putils.ProcessExecutionError)
|
||||
def delete(self, name):
|
||||
def delete(self, name: str) -> None:
|
||||
"""Delete logical volume or snapshot.
|
||||
|
||||
:param name: Name of LV to delete
|
||||
@ -767,7 +793,7 @@ class LVM(executor.Executor):
|
||||
LOG.debug('Successfully deleted volume: %s after '
|
||||
'udev settle.', name)
|
||||
|
||||
def revert(self, snapshot_name):
|
||||
def revert(self, snapshot_name: str) -> None:
|
||||
"""Revert an LV from snapshot.
|
||||
|
||||
:param snapshot_name: Name of snapshot to revert
|
||||
@ -777,7 +803,7 @@ class LVM(executor.Executor):
|
||||
snapshot_name, root_helper=self._root_helper,
|
||||
run_as_root=True)
|
||||
|
||||
def lv_has_snapshot(self, name):
|
||||
def lv_has_snapshot(self, name: str) -> bool:
|
||||
cmd = LVM.LVM_CMD_PREFIX + ['lvdisplay', '--noheading', '-C', '-o',
|
||||
'Attr', '--readonly',
|
||||
'%s/%s' % (self.vg_name, name)]
|
||||
@ -788,7 +814,7 @@ class LVM(executor.Executor):
|
||||
return True
|
||||
return False
|
||||
|
||||
def extend_volume(self, lv_name, new_size):
|
||||
def extend_volume(self, lv_name: str, new_size) -> None:
|
||||
"""Extend the size of an existing volume."""
|
||||
# Volumes with snaps have attributes 'o' or 'O' and will be
|
||||
# deactivated, but Thin Volumes with snaps have attribute 'V'
|
||||
@ -808,7 +834,7 @@ class LVM(executor.Executor):
|
||||
LOG.error('StdErr :%s', err.stderr)
|
||||
raise
|
||||
|
||||
def vg_mirror_free_space(self, mirror_count):
|
||||
def vg_mirror_free_space(self, mirror_count: int) -> float:
|
||||
free_capacity = 0.0
|
||||
|
||||
disks = []
|
||||
@ -829,10 +855,10 @@ class LVM(executor.Executor):
|
||||
|
||||
return free_capacity
|
||||
|
||||
def vg_mirror_size(self, mirror_count):
|
||||
def vg_mirror_size(self, mirror_count: int) -> float:
|
||||
return (self.vg_free_space / (mirror_count + 1))
|
||||
|
||||
def rename_volume(self, lv_name, new_name):
|
||||
def rename_volume(self, lv_name: str, new_name: str) -> None:
|
||||
"""Change the name of an existing volume."""
|
||||
|
||||
try:
|
||||
|
Loading…
Reference in New Issue
Block a user