Add type hints to 'nova.virt.libvirt.utils'
The ROI for these util modules is pretty high, given how they are used. Part of blueprint add-emulated-virtual-tpm Change-Id: I46846627311beb21946fd4d24048e0e3ea7ac942 Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
parent
49817d31b1
commit
6f1f71620a
@ -6,3 +6,4 @@ nova/virt/hardware.py
|
||||
nova/virt/libvirt/__init__.py
|
||||
nova/virt/libvirt/driver.py
|
||||
nova/virt/libvirt/host.py
|
||||
nova/virt/libvirt/utils.py
|
||||
|
@ -4157,7 +4157,9 @@ class LibvirtDriver(driver.ComputeDriver):
|
||||
if instance.task_state == task_states.RESIZE_FINISH:
|
||||
backend.create_snap(libvirt_utils.RESIZE_SNAPSHOT_NAME)
|
||||
if backend.SUPPORTS_CLONE:
|
||||
def clone_fallback_to_fetch(*args, **kwargs):
|
||||
def clone_fallback_to_fetch(
|
||||
context, target, image_id, trusted_certs=None,
|
||||
):
|
||||
refuse_fetch = (
|
||||
CONF.libvirt.images_type == 'rbd' and
|
||||
CONF.workarounds.never_download_image_if_on_rbd)
|
||||
@ -4176,10 +4178,13 @@ class LibvirtDriver(driver.ComputeDriver):
|
||||
'never_download_image_if_on_rbd=True;'
|
||||
' refusing to fetch and upload.',
|
||||
disk_images['image_id'])
|
||||
libvirt_utils.fetch_image(*args, **kwargs)
|
||||
libvirt_utils.fetch_image(
|
||||
context, target, image_id, trusted_certs,
|
||||
)
|
||||
fetch_func = clone_fallback_to_fetch
|
||||
else:
|
||||
fetch_func = libvirt_utils.fetch_image
|
||||
|
||||
self._try_fetch_image_cache(backend, fetch_func, context,
|
||||
root_fname, disk_images['image_id'],
|
||||
instance, size, fallback_from_host)
|
||||
@ -11117,8 +11122,9 @@ class LibvirtDriver(driver.ComputeDriver):
|
||||
caps = deepcopy(self._host.get_capabilities())
|
||||
if cpu.mode in ('host-model', 'host-passthrough'):
|
||||
# Account for features in cpu_model_extra_flags conf
|
||||
host_features = [f.name for f in
|
||||
caps.host.cpu.features | cpu.features]
|
||||
host_features: ty.Set[str] = {
|
||||
f.name for f in caps.host.cpu.features | cpu.features
|
||||
}
|
||||
return libvirt_utils.cpu_features_to_traits(host_features)
|
||||
|
||||
def _resolve_features(cpu):
|
||||
|
@ -21,6 +21,7 @@
|
||||
import errno
|
||||
import os
|
||||
import re
|
||||
import typing as ty
|
||||
import uuid
|
||||
|
||||
import os_traits
|
||||
@ -29,6 +30,7 @@ from oslo_log import log as logging
|
||||
from oslo_utils import fileutils
|
||||
|
||||
import nova.conf
|
||||
from nova import context as nova_context
|
||||
from nova.i18n import _
|
||||
from nova import objects
|
||||
from nova.objects import fields as obj_fields
|
||||
@ -39,6 +41,7 @@ from nova.scheduler import utils as scheduler_utils
|
||||
from nova import utils
|
||||
from nova.virt import images
|
||||
from nova.virt.libvirt import config as vconfig
|
||||
from nova.virt.libvirt import guest as libvirt_guest
|
||||
from nova.virt.libvirt.volume import remotefs
|
||||
|
||||
CONF = nova.conf.CONF
|
||||
@ -96,7 +99,9 @@ CPU_TRAITS_MAPPING = {
|
||||
TRAITS_CPU_MAPPING = {v: k for k, v in CPU_TRAITS_MAPPING.items()}
|
||||
|
||||
|
||||
def create_image(disk_format, path, size):
|
||||
def create_image(
|
||||
disk_format: str, path: str, size: ty.Union[str, int],
|
||||
) -> None:
|
||||
"""Create a disk image
|
||||
|
||||
:param disk_format: Disk image format (as known by qemu-img)
|
||||
@ -111,7 +116,9 @@ def create_image(disk_format, path, size):
|
||||
processutils.execute('qemu-img', 'create', '-f', disk_format, path, size)
|
||||
|
||||
|
||||
def create_cow_image(backing_file, path, size=None):
|
||||
def create_cow_image(
|
||||
backing_file: ty.Optional[str], path: str, size: ty.Optional[int] = None,
|
||||
) -> None:
|
||||
"""Create COW image
|
||||
|
||||
Creates a COW image with the given backing file
|
||||
@ -144,7 +151,9 @@ def create_cow_image(backing_file, path, size=None):
|
||||
processutils.execute(*cmd)
|
||||
|
||||
|
||||
def create_ploop_image(disk_format, path, size, fs_type):
|
||||
def create_ploop_image(
|
||||
disk_format: str, path: str, size: ty.Union[int, str], fs_type: str,
|
||||
) -> None:
|
||||
"""Create ploop image
|
||||
|
||||
:param disk_format: Disk image format (as known by ploop)
|
||||
@ -165,7 +174,9 @@ def create_ploop_image(disk_format, path, size, fs_type):
|
||||
nova.privsep.libvirt.ploop_init(size, disk_format, fs_type, disk_path)
|
||||
|
||||
|
||||
def pick_disk_driver_name(hypervisor_version, is_block_dev=False):
|
||||
def pick_disk_driver_name(
|
||||
hypervisor_version: int, is_block_dev: bool = False,
|
||||
) -> ty.Optional[str]:
|
||||
"""Pick the libvirt primary backend driver name
|
||||
|
||||
If the hypervisor supports multiple backend drivers we have to tell libvirt
|
||||
@ -221,7 +232,7 @@ def pick_disk_driver_name(hypervisor_version, is_block_dev=False):
|
||||
return None
|
||||
|
||||
|
||||
def get_disk_size(path, format=None):
|
||||
def get_disk_size(path: str, format: ty.Optional[str] = None) -> int:
|
||||
"""Get the (virtual) size of a disk image
|
||||
|
||||
:param path: Path to the disk image
|
||||
@ -233,7 +244,9 @@ def get_disk_size(path, format=None):
|
||||
return int(size)
|
||||
|
||||
|
||||
def get_disk_backing_file(path, basename=True, format=None):
|
||||
def get_disk_backing_file(
|
||||
path: str, basename: bool = True, format: ty.Optional[str] = None,
|
||||
) -> ty.Optional[str]:
|
||||
"""Get the backing file of a disk image
|
||||
|
||||
:param path: Path to the disk image
|
||||
@ -246,9 +259,15 @@ def get_disk_backing_file(path, basename=True, format=None):
|
||||
return backing_file
|
||||
|
||||
|
||||
def copy_image(src, dest, host=None, receive=False,
|
||||
on_execute=None, on_completion=None,
|
||||
compression=True):
|
||||
def copy_image(
|
||||
src: str,
|
||||
dest: str,
|
||||
host: ty.Optional[str] = None,
|
||||
receive: bool = False,
|
||||
on_execute: ty.Callable = None,
|
||||
on_completion: ty.Callable = None,
|
||||
compression: bool = True,
|
||||
) -> None:
|
||||
"""Copy a disk image to an existing directory
|
||||
|
||||
:param src: Source image
|
||||
@ -280,7 +299,8 @@ def copy_image(src, dest, host=None, receive=False,
|
||||
compression=compression)
|
||||
|
||||
|
||||
def write_to_file(path, contents):
|
||||
# TODO(stephenfin): This is dumb; remove it.
|
||||
def write_to_file(path: str, contents: str) -> None:
|
||||
"""Write the given contents to a file
|
||||
|
||||
:param path: Destination file
|
||||
@ -290,7 +310,9 @@ def write_to_file(path, contents):
|
||||
f.write(contents)
|
||||
|
||||
|
||||
def chown_for_id_maps(path, id_maps):
|
||||
def chown_for_id_maps(
|
||||
path: str, id_maps: ty.List[vconfig.LibvirtConfigGuestIDMap],
|
||||
) -> None:
|
||||
"""Change ownership of file or directory for an id mapped
|
||||
environment
|
||||
|
||||
@ -304,7 +326,9 @@ def chown_for_id_maps(path, id_maps):
|
||||
nova.privsep.idmapshift.shift(path, uid_maps, gid_maps)
|
||||
|
||||
|
||||
def extract_snapshot(disk_path, source_fmt, out_path, dest_fmt):
|
||||
def extract_snapshot(
|
||||
disk_path: str, source_fmt: str, out_path: str, dest_fmt: str,
|
||||
) -> None:
|
||||
"""Extract a snapshot from a disk image.
|
||||
Note that nobody should write to the disk image during this operation.
|
||||
|
||||
@ -322,7 +346,8 @@ def extract_snapshot(disk_path, source_fmt, out_path, dest_fmt):
|
||||
compress=compress)
|
||||
|
||||
|
||||
def load_file(path):
|
||||
# TODO(stephenfin): This is dumb; remove it.
|
||||
def load_file(path: str) -> str:
|
||||
"""Read contents of file
|
||||
|
||||
:param path: File to read
|
||||
@ -331,6 +356,8 @@ def load_file(path):
|
||||
return fp.read()
|
||||
|
||||
|
||||
# TODO(stephenfin): Remove this; we have suitably powerful mocking abilities
|
||||
# nowadays
|
||||
def file_open(*args, **kwargs):
|
||||
"""Open file
|
||||
|
||||
@ -343,7 +370,7 @@ def file_open(*args, **kwargs):
|
||||
return open(*args, **kwargs)
|
||||
|
||||
|
||||
def find_disk(guest):
|
||||
def find_disk(guest: libvirt_guest.Guest) -> ty.Tuple[str, ty.Optional[str]]:
|
||||
"""Find root device path for instance
|
||||
|
||||
May be file or device
|
||||
@ -384,7 +411,7 @@ def find_disk(guest):
|
||||
return (disk_path, disk_format)
|
||||
|
||||
|
||||
def get_disk_type_from_path(path):
|
||||
def get_disk_type_from_path(path: str) -> ty.Optional[str]:
|
||||
"""Retrieve disk type (raw, qcow2, lvm, ploop) for given file."""
|
||||
if path.startswith('/dev'):
|
||||
return 'lvm'
|
||||
@ -398,7 +425,7 @@ def get_disk_type_from_path(path):
|
||||
return None
|
||||
|
||||
|
||||
def get_fs_info(path):
|
||||
def get_fs_info(path: str) -> ty.Dict[str, int]:
|
||||
"""Get free/used/total space info for a filesystem
|
||||
|
||||
:param path: Any dirent on the filesystem
|
||||
@ -412,12 +439,15 @@ def get_fs_info(path):
|
||||
total = hddinfo.f_frsize * hddinfo.f_blocks
|
||||
free = hddinfo.f_frsize * hddinfo.f_bavail
|
||||
used = hddinfo.f_frsize * (hddinfo.f_blocks - hddinfo.f_bfree)
|
||||
return {'total': total,
|
||||
'free': free,
|
||||
'used': used}
|
||||
return {'total': total, 'free': free, 'used': used}
|
||||
|
||||
|
||||
def fetch_image(context, target, image_id, trusted_certs=None):
|
||||
def fetch_image(
|
||||
context: nova_context.RequestContext,
|
||||
target: str,
|
||||
image_id: str,
|
||||
trusted_certs: ty.Optional['objects.TrustedCerts'] = None,
|
||||
) -> None:
|
||||
"""Grab image.
|
||||
|
||||
:param context: nova.context.RequestContext auth request context
|
||||
@ -428,7 +458,12 @@ def fetch_image(context, target, image_id, trusted_certs=None):
|
||||
images.fetch_to_raw(context, image_id, target, trusted_certs)
|
||||
|
||||
|
||||
def fetch_raw_image(context, target, image_id, trusted_certs=None):
|
||||
def fetch_raw_image(
|
||||
context: nova_context.RequestContext,
|
||||
target: str,
|
||||
image_id: str,
|
||||
trusted_certs: ty.Optional['objects.TrustedCerts'] = None,
|
||||
) -> None:
|
||||
"""Grab initrd or kernel image.
|
||||
|
||||
This function does not attempt raw conversion, as these images will
|
||||
@ -442,7 +477,9 @@ def fetch_raw_image(context, target, image_id, trusted_certs=None):
|
||||
images.fetch(context, image_id, target, trusted_certs)
|
||||
|
||||
|
||||
def get_instance_path(instance, relative=False):
|
||||
def get_instance_path(
|
||||
instance: 'objects.Instance', relative: bool = False,
|
||||
) -> str:
|
||||
"""Determine the correct path for instance storage.
|
||||
|
||||
This method determines the directory name for instance storage.
|
||||
@ -457,7 +494,10 @@ def get_instance_path(instance, relative=False):
|
||||
return os.path.join(CONF.instances_path, instance.uuid)
|
||||
|
||||
|
||||
def get_instance_path_at_destination(instance, migrate_data=None):
|
||||
def get_instance_path_at_destination(
|
||||
instance: 'objects.Instance',
|
||||
migrate_data: ty.Optional['objects.LibvirtLiveMigrateData'] = None,
|
||||
) -> str:
|
||||
"""Get the instance path on destination node while live migration.
|
||||
|
||||
This method determines the directory name for instance storage on
|
||||
@ -484,7 +524,7 @@ def get_instance_path_at_destination(instance, migrate_data=None):
|
||||
return instance_dir
|
||||
|
||||
|
||||
def get_arch(image_meta):
|
||||
def get_arch(image_meta: 'objects.ImageMeta') -> str:
|
||||
"""Determine the architecture of the guest (or host).
|
||||
|
||||
This method determines the CPU architecture that must be supported by
|
||||
@ -504,7 +544,7 @@ def get_arch(image_meta):
|
||||
return obj_fields.Architecture.from_host()
|
||||
|
||||
|
||||
def is_mounted(mount_path, source=None):
|
||||
def is_mounted(mount_path: str, source: ty.Optional[str] = None) -> bool:
|
||||
"""Check if the given source is mounted at given destination point."""
|
||||
if not os.path.ismount(mount_path):
|
||||
return False
|
||||
@ -517,16 +557,16 @@ def is_mounted(mount_path, source=None):
|
||||
return any(mnt[0] == source and mnt[1] == mount_path for mnt in mounts)
|
||||
|
||||
|
||||
def is_valid_hostname(hostname):
|
||||
return re.match(r"^[\w\-\.:]+$", hostname)
|
||||
def is_valid_hostname(hostname: str) -> bool:
|
||||
return bool(re.match(r"^[\w\-\.:]+$", hostname))
|
||||
|
||||
|
||||
def version_to_string(version):
|
||||
def version_to_string(version: ty.Tuple[int, int, int]) -> str:
|
||||
"""Returns string version based on tuple"""
|
||||
return '.'.join([str(x) for x in version])
|
||||
|
||||
|
||||
def cpu_features_to_traits(features):
|
||||
def cpu_features_to_traits(features: ty.Set[str]) -> ty.Dict[str, bool]:
|
||||
"""Returns this driver's CPU traits dict where keys are trait names from
|
||||
CPU_TRAITS_MAPPING, values are boolean indicates whether the trait should
|
||||
be set in the provider tree.
|
||||
@ -539,7 +579,7 @@ def cpu_features_to_traits(features):
|
||||
return traits
|
||||
|
||||
|
||||
def get_cpu_model_from_arch(arch):
|
||||
def get_cpu_model_from_arch(arch: str) -> str:
|
||||
mode = 'qemu64'
|
||||
if arch == obj_fields.Architecture.I686:
|
||||
mode = 'qemu32'
|
||||
@ -552,7 +592,7 @@ def get_cpu_model_from_arch(arch):
|
||||
return mode
|
||||
|
||||
|
||||
def get_machine_type(image_meta):
|
||||
def get_machine_type(image_meta: 'objects.ImageMeta') -> ty.Optional[str]:
|
||||
"""The guest machine type can be set as an image metadata property, or
|
||||
otherwise based on architecture-specific defaults. If no defaults are
|
||||
found then None will be returned. This will ultimately lead to QEMU using
|
||||
@ -565,7 +605,7 @@ def get_machine_type(image_meta):
|
||||
return get_default_machine_type(get_arch(image_meta))
|
||||
|
||||
|
||||
def get_default_machine_type(arch):
|
||||
def get_default_machine_type(arch: str) -> ty.Optional[str]:
|
||||
# NOTE(lyarwood): Values defined in [libvirt]/hw_machine_type take
|
||||
# precedence here if available for the provided arch.
|
||||
for mapping in CONF.libvirt.hw_machine_type or {}:
|
||||
@ -595,21 +635,21 @@ def get_default_machine_type(arch):
|
||||
return default_mtypes.get(arch)
|
||||
|
||||
|
||||
def mdev_name2uuid(mdev_name):
|
||||
def mdev_name2uuid(mdev_name: str) -> str:
|
||||
"""Convert an mdev name (of the form mdev_<uuid_with_underscores>) to a
|
||||
uuid (of the form 8-4-4-4-12).
|
||||
"""
|
||||
return str(uuid.UUID(mdev_name[5:].replace('_', '-')))
|
||||
|
||||
|
||||
def mdev_uuid2name(mdev_uuid):
|
||||
def mdev_uuid2name(mdev_uuid: str) -> str:
|
||||
"""Convert an mdev uuid (of the form 8-4-4-4-12) to a name (of the form
|
||||
mdev_<uuid_with_underscores>).
|
||||
"""
|
||||
return "mdev_" + mdev_uuid.replace('-', '_')
|
||||
|
||||
|
||||
def get_flags_by_flavor_specs(flavor):
|
||||
def get_flags_by_flavor_specs(flavor: 'objects.Flavor') -> ty.Set[str]:
|
||||
req_spec = objects.RequestSpec(flavor=flavor)
|
||||
resource_request = scheduler_utils.ResourceRequest(req_spec)
|
||||
required_traits = resource_request.all_required_traits
|
||||
|
Loading…
Reference in New Issue
Block a user