mypy: image_utils

Change-Id: Ib32e58125be3a2b6d05488807c83d48e920cf57d
This commit is contained in:
Eric Harney 2021-02-05 11:07:19 -05:00
parent 77d7628724
commit 334485dc5f
4 changed files with 165 additions and 73 deletions

View File

@ -26,10 +26,13 @@ we should look at maybe pushing this up to Oslo
import contextlib
import errno
import io
import math
import os
import re
import tempfile
from typing import (ContextManager, Dict, Generator, # noqa: H301
List, Optional, Tuple)
import cryptography
from cursive import exception as cursive_exception
@ -44,6 +47,7 @@ from oslo_utils import timeutils
from oslo_utils import units
import psutil
from cinder import context
from cinder import exception
from cinder.i18n import _
from cinder.image import accelerator
@ -94,7 +98,8 @@ QEMU_IMG_MIN_CONVERT_LUKS_VERSION = '2.10'
COMPRESSIBLE_IMAGE_FORMATS = ('qcow2',)
def validate_stores_id(context, image_service_store_id):
def validate_stores_id(context: context.RequestContext,
image_service_store_id: str) -> None:
image_service = glance.get_default_image_service()
stores_info = image_service.get_stores(context)['stores']
for info in stores_info:
@ -107,19 +112,21 @@ def validate_stores_id(context, image_service_store_id):
raise exception.GlanceStoreNotFound(store_id=image_service_store_id)
def fixup_disk_format(disk_format):
def fixup_disk_format(disk_format: str) -> str:
"""Return the format to be provided to qemu-img convert."""
return QEMU_IMG_FORMAT_MAP.get(disk_format, disk_format)
def from_qemu_img_disk_format(disk_format):
def from_qemu_img_disk_format(disk_format: str) -> str:
"""Return the conventional format derived from qemu-img format."""
return QEMU_IMG_FORMAT_MAP_INV.get(disk_format, disk_format)
def qemu_img_info(path, run_as_root=True, force_share=False):
def qemu_img_info(path: str,
run_as_root: bool = True,
force_share: bool = False) -> imageutils.QemuImgInfo:
"""Return an object containing the parsed output from qemu-img info."""
cmd = ['env', 'LC_ALL=C', 'qemu-img', 'info']
if force_share:
@ -145,7 +152,7 @@ def qemu_img_info(path, run_as_root=True, force_share=False):
return info
def get_qemu_img_version():
def get_qemu_img_version() -> Optional[List[int]]:
"""The qemu-img version will be cached until the process is restarted."""
global QEMU_IMG_VERSION
@ -162,15 +169,24 @@ def get_qemu_img_version():
return QEMU_IMG_VERSION
def qemu_img_supports_force_share():
return get_qemu_img_version() >= QEMU_IMG_MIN_FORCE_SHARE_VERSION
def qemu_img_supports_force_share() -> bool:
ver = get_qemu_img_version()
if ver is None:
return False
else:
return ver >= QEMU_IMG_MIN_FORCE_SHARE_VERSION
def _get_qemu_convert_luks_cmd(src, dest, out_format, src_format=None,
out_subformat=None, cache_mode=None,
prefix=None, cipher_spec=None,
passphrase_file=None,
src_passphrase_file=None):
def _get_qemu_convert_luks_cmd(src: str,
dest: str,
out_format: str,
src_format: Optional[str] = None,
out_subformat: Optional[str] = None,
cache_mode: Optional[str] = None,
prefix: tuple = None,
cipher_spec: Optional[dict] = None,
passphrase_file: Optional[str] = None,
src_passphrase_file: str = None) -> List[str]:
cmd = ['qemu-img', 'convert']
@ -195,11 +211,17 @@ def _get_qemu_convert_luks_cmd(src, dest, out_format, src_format=None,
return command
def _get_qemu_convert_cmd(src, dest, out_format, src_format=None,
out_subformat=None, cache_mode=None,
prefix=None, cipher_spec=None,
passphrase_file=None, compress=False,
src_passphrase_file=None):
def _get_qemu_convert_cmd(src: str,
dest: str,
out_format: str,
src_format: str = None,
out_subformat: str = None,
cache_mode: str = None,
prefix: tuple = None,
cipher_spec: dict = None,
passphrase_file: str = None,
compress: bool = False,
src_passphrase_file: str = None) -> List[str]:
if src_passphrase_file is not None:
if passphrase_file is None:
@ -246,7 +268,8 @@ def _get_qemu_convert_cmd(src, dest, out_format, src_format=None,
# fix the encrypted case.
if (src_format or '').lower() not in ('', 'ami'):
cmd += ('-f', src_format) # prevent detection of format
assert src_format is not None
cmd += ['-f', src_format] # prevent detection of format
# NOTE(lyarwood): When converting to LUKS add the cipher spec if present
# and create a secret for the passphrase, written to a temp file
@ -265,14 +288,15 @@ def _get_qemu_convert_cmd(src, dest, out_format, src_format=None,
return cmd
def _get_version_from_string(version_string):
def _get_version_from_string(version_string: str) -> List[int]:
return [int(x) for x in version_string.split('.')]
def check_qemu_img_version(minimum_version):
def check_qemu_img_version(minimum_version: str) -> None:
qemu_version = get_qemu_img_version()
if (qemu_version is None
or qemu_version < _get_version_from_string(minimum_version)):
current_version: Optional[str]
if qemu_version:
current_version = '.'.join((str(element)
for element in qemu_version))
@ -286,11 +310,17 @@ def check_qemu_img_version(minimum_version):
raise exception.VolumeBackendAPIException(data=_msg)
def _convert_image(prefix, source, dest, out_format,
out_subformat=None, src_format=None,
run_as_root=True, cipher_spec=None,
passphrase_file=None, compress=False,
src_passphrase_file=None):
def _convert_image(prefix: tuple,
source: str,
dest: str,
out_format: str,
out_subformat: str = None,
src_format: str = None,
run_as_root: bool = True,
cipher_spec: dict = None,
passphrase_file: str = None,
compress: bool = False,
src_passphrase_file: str = None) -> None:
"""Convert image to other format.
NOTE: If the qemu-img convert command fails and this function raises an
@ -320,6 +350,7 @@ def _convert_image(prefix, source, dest, out_format,
# flush properly and more efficiently than would be done
# setting O_DIRECT, so check for that and skip the
# setting for non BLK devs
cache_mode: Optional[str]
if (utils.is_blk_device(dest) and
volume_utils.check_for_odirect_support(source,
dest,
@ -387,10 +418,17 @@ def _convert_image(prefix, source, dest, out_format,
LOG.info(msg, {"sz": fsz_mb, "mbps": mbps})
def convert_image(source, dest, out_format, out_subformat=None,
src_format=None, run_as_root=True, throttle=None,
cipher_spec=None, passphrase_file=None,
compress=False, src_passphrase_file=None):
def convert_image(source: str,
dest: str,
out_format: str,
out_subformat: str = None,
src_format: str = None,
run_as_root: bool = True,
throttle=None,
cipher_spec: dict = None,
passphrase_file: str = None,
compress: bool = False,
src_passphrase_file: str = None) -> None:
if not throttle:
throttle = throttling.Throttle.get_default()
with throttle.subcommand(source, dest) as throttle_cmd:
@ -406,8 +444,12 @@ def convert_image(source, dest, out_format, out_subformat=None,
src_passphrase_file=src_passphrase_file)
def resize_image(source, size, run_as_root=False, file_format=None):
def resize_image(source: str,
size: int,
run_as_root: bool = False,
file_format: str = None) -> None:
"""Changes the virtual size of the image."""
cmd: Tuple[str, ...]
if file_format:
cmd = ('qemu-img', 'resize', '-f', file_format, source, '%sG' % size)
else:
@ -415,7 +457,7 @@ def resize_image(source, size, run_as_root=False, file_format=None):
utils.execute(*cmd, run_as_root=run_as_root)
def _verify_image(img_file, verifier):
def _verify_image(img_file: io.RawIOBase, verifier) -> None:
# This methods must be called from a native thread, as the file I/O may
# not yield to other greenthread in some cases, and since the update and
# verify operations are CPU bound there would not be any yielding either,
@ -428,7 +470,10 @@ def _verify_image(img_file, verifier):
verifier.verify()
def verify_glance_image_signature(context, image_service, image_id, path):
def verify_glance_image_signature(context: context.RequestContext,
image_service: glance.GlanceImageService,
image_id: str,
path: str) -> bool:
verifier = None
image_meta = image_service.show(context, image_id)
image_properties = image_meta.get('properties', {})
@ -489,7 +534,12 @@ def verify_glance_image_signature(context, image_service, image_id, path):
return False
def fetch(context, image_service, image_id, path, _user_id, _project_id):
def fetch(context: context.RequestContext,
image_service: glance.GlanceImageService,
image_id: str,
path: str,
_user_id,
_project_id) -> None:
# TODO(vish): Improve context handling and add owner and auth data
# when it is added to glance. Right now there is no
# auth checking in glance, so we assume that access was
@ -534,8 +584,12 @@ def fetch(context, image_service, image_id, path, _user_id, _project_id):
LOG.info(msg, {"sz": fsz_mb, "mbps": mbps})
def get_qemu_data(image_id, has_meta, disk_format_raw, dest, run_as_root,
force_share=False):
def get_qemu_data(image_id: str,
has_meta: bool,
disk_format_raw: bool,
dest: str,
run_as_root: bool,
force_share: bool = False) -> imageutils.QemuImgInfo:
# We may be on a system that doesn't have qemu-img installed. That
# is ok if we are working with a RAW image. This logic checks to see
# if qemu-img is installed. If not we make sure the image is RAW and
@ -569,7 +623,10 @@ def get_qemu_data(image_id, has_meta, disk_format_raw, dest, run_as_root,
return data
def fetch_verify_image(context, image_service, image_id, dest):
def fetch_verify_image(context: context.RequestContext,
image_service: glance.GlanceImageService,
image_id: str,
dest: str) -> None:
fetch(context, image_service, image_id, dest,
None, None)
image_meta = image_service.show(context, image_id)
@ -599,27 +656,46 @@ def fetch_verify_image(context, image_service, image_id, dest):
{'fmt': fmt, 'backing_file': backing_file}))
def fetch_to_vhd(context, image_service,
image_id, dest, blocksize, volume_subformat=None,
user_id=None, project_id=None, run_as_root=True):
def fetch_to_vhd(context: context.RequestContext,
image_service: glance.GlanceImageService,
image_id: str,
dest: str,
blocksize: int,
volume_subformat: Optional[str] = None,
user_id: Optional[str] = None,
project_id: Optional[str] = None,
run_as_root: bool = True) -> None:
fetch_to_volume_format(context, image_service, image_id, dest, 'vpc',
blocksize, volume_subformat=volume_subformat,
user_id=user_id, project_id=project_id,
run_as_root=run_as_root)
def fetch_to_raw(context, image_service,
image_id, dest, blocksize,
user_id=None, project_id=None, size=None, run_as_root=True):
def fetch_to_raw(context: context.RequestContext,
image_service: glance.GlanceImageService,
image_id: str,
dest: str,
blocksize: int,
user_id: Optional[str] = None,
project_id: Optional[str] = None,
size: Optional[int] = None,
run_as_root: bool = True) -> None:
fetch_to_volume_format(context, image_service, image_id, dest, 'raw',
blocksize, user_id=user_id, project_id=project_id,
size=size, run_as_root=run_as_root)
def fetch_to_volume_format(context, image_service,
image_id, dest, volume_format, blocksize,
volume_subformat=None, user_id=None,
project_id=None, size=None, run_as_root=True):
def fetch_to_volume_format(context: context.RequestContext,
image_service: glance.GlanceImageService,
image_id: str,
dest: str,
volume_format: str,
blocksize: int,
volume_subformat: Optional[str] = None,
user_id: Optional[str] = None,
project_id: Optional[str] = None,
size: Optional[int] = None,
run_as_root: bool = True) -> None:
qemu_img = True
image_meta = image_service.show(context, image_id)
@ -722,9 +798,15 @@ def fetch_to_volume_format(context, image_service,
run_as_root=run_as_root)
def upload_volume(context, image_service, image_meta, volume_path,
volume_format='raw', run_as_root=True, compress=True,
store_id=None, base_image_ref=None):
def upload_volume(context: context.RequestContext,
image_service: glance.GlanceImageService,
image_meta: dict,
volume_path: str,
volume_format: str = 'raw',
run_as_root: bool = True,
compress: bool = True,
store_id: str = None,
base_image_ref: Optional[str] = None) -> None:
# NOTE: You probably want to use volume_utils.upload_volume(),
# not this function.
image_id = image_meta['id']
@ -791,7 +873,9 @@ def upload_volume(context, image_service, image_meta, volume_path,
base_image_ref=base_image_ref)
def check_virtual_size(virtual_size, volume_size, image_id):
def check_virtual_size(virtual_size: float,
volume_size: int,
image_id: str) -> int:
virtual_size = int(math.ceil(float(virtual_size) / units.Gi))
if virtual_size > volume_size:
@ -805,7 +889,7 @@ def check_virtual_size(virtual_size, volume_size, image_id):
return virtual_size
def check_available_space(dest, image_size, image_id):
def check_available_space(dest: str, image_size: int, image_id: str) -> None:
# TODO(e0ne): replace psutil with shutil.disk_usage when we drop
# Python 2.7 support.
if not os.path.isdir(dest):
@ -821,50 +905,51 @@ def check_available_space(dest, image_size, image_id):
raise exception.ImageTooBig(image_id=image_id, reason=msg)
def is_xenserver_format(image_meta):
def is_xenserver_format(image_meta: dict) -> bool:
return (
image_meta['disk_format'] == 'vhd'
and image_meta['container_format'] == 'ovf'
)
def set_vhd_parent(vhd_path, parentpath):
def set_vhd_parent(vhd_path: str, parentpath: str) -> None:
utils.execute('vhd-util', 'modify', '-n', vhd_path, '-p', parentpath)
def extract_targz(archive_name, target):
def extract_targz(archive_name: str, target: str) -> None:
utils.execute('tar', '-xzf', archive_name, '-C', target)
def fix_vhd_chain(vhd_chain):
def fix_vhd_chain(vhd_chain: List[str]) -> None:
for child, parent in zip(vhd_chain[:-1], vhd_chain[1:]):
set_vhd_parent(child, parent)
def get_vhd_size(vhd_path):
def get_vhd_size(vhd_path: str) -> int:
out, _err = utils.execute('vhd-util', 'query', '-n', vhd_path, '-v')
return int(out)
def resize_vhd(vhd_path, size, journal):
def resize_vhd(vhd_path: str, size: int, journal: str) -> None:
utils.execute(
'vhd-util', 'resize', '-n', vhd_path, '-s', '%d' % size, '-j', journal)
def coalesce_vhd(vhd_path):
def coalesce_vhd(vhd_path: str) -> None:
utils.execute(
'vhd-util', 'coalesce', '-n', vhd_path)
def create_temporary_file(*args, **kwargs):
def create_temporary_file(*args: str, **kwargs: str) -> str:
fileutils.ensure_tree(CONF.image_conversion_dir)
fd, tmp = tempfile.mkstemp(dir=CONF.image_conversion_dir, *args, **kwargs)
fd, tmp = tempfile.mkstemp(dir=CONF.image_conversion_dir,
*args, **kwargs) # type: ignore
os.close(fd)
return tmp
def cleanup_temporary_file(backend_name):
def cleanup_temporary_file(backend_name: str) -> None:
temp_dir = CONF.image_conversion_dir
if (not temp_dir or not os.path.exists(temp_dir)):
LOG.debug("Configuration image_conversion_dir is None or the path "
@ -888,7 +973,7 @@ def cleanup_temporary_file(backend_name):
@contextlib.contextmanager
def temporary_file(*args, **kwargs):
def temporary_file(*args: str, **kwargs) -> Generator[str, None, None]:
tmp = None
try:
tmp = create_temporary_file(*args, **kwargs)
@ -898,13 +983,13 @@ def temporary_file(*args, **kwargs):
fileutils.delete_if_exists(tmp)
def temporary_dir():
def temporary_dir() -> ContextManager[str]:
fileutils.ensure_tree(CONF.image_conversion_dir)
return utils.tempdir(dir=CONF.image_conversion_dir)
def coalesce_chain(vhd_chain):
def coalesce_chain(vhd_chain: List[str]) -> str:
for child, parent in zip(vhd_chain[:-1], vhd_chain[1:]):
with temporary_dir() as directory_for_journal:
size = get_vhd_size(child)
@ -916,7 +1001,7 @@ def coalesce_chain(vhd_chain):
return vhd_chain[-1]
def discover_vhd_chain(directory):
def discover_vhd_chain(directory: str) -> List[str]:
counter = 0
chain = []
@ -931,7 +1016,7 @@ def discover_vhd_chain(directory):
return chain
def replace_xenserver_image_with_coalesced_vhd(image_file):
def replace_xenserver_image_with_coalesced_vhd(image_file: str) -> None:
with temporary_dir() as tempdir:
extract_targz(image_file, tempdir)
chain = discover_vhd_chain(tempdir)
@ -941,7 +1026,7 @@ def replace_xenserver_image_with_coalesced_vhd(image_file):
os.rename(coalesced, image_file)
def decode_cipher(cipher_spec, key_size):
def decode_cipher(cipher_spec: str, key_size: int) -> Dict[str, str]:
"""Decode a dm-crypt style cipher specification string
The assumed format being cipher-chainmode-ivmode, similar to that
@ -967,13 +1052,14 @@ class TemporaryImages(object):
This is useful to inspect image contents before conversion.
"""
def __init__(self, image_service):
self.temporary_images = {}
def __init__(self, image_service: glance.GlanceImageService):
self.temporary_images: Dict[str, dict] = {}
self.image_service = image_service
image_service.temp_images = self
@staticmethod
def for_image_service(image_service):
def for_image_service(
image_service: glance.GlanceImageService) -> 'TemporaryImages':
instance = image_service.temp_images
if instance:
return instance
@ -981,7 +1067,11 @@ class TemporaryImages(object):
@classmethod
@contextlib.contextmanager
def fetch(cls, image_service, context, image_id, suffix=''):
def fetch(cls,
image_service: glance.GlanceImageService,
context: context.RequestContext,
image_id: str,
suffix: Optional[str] = '') -> Generator[str, None, None]:
tmp_images = cls.for_image_service(image_service).temporary_images
with temporary_file(prefix='image_fetch_%s_' % image_id,
suffix=suffix) as tmp:
@ -997,7 +1087,7 @@ class TemporaryImages(object):
LOG.debug("Temporary image %(id)s for user %(user)s is deleted.",
{'id': image_id, 'user': user})
def get(self, context, image_id):
def get(self, context: context.RequestContext, image_id: str):
user = context.user_id
if not self.temporary_images.get(user):
return None

View File

@ -113,7 +113,7 @@ def check_exclusive_options(**kwargs: dict) -> None:
raise exception.InvalidInput(reason=msg)
def execute(*cmd: str, **kwargs) -> Tuple[str, str]:
def execute(*cmd: str, **kwargs: Union[bool, str]) -> Tuple[str, str]:
"""Convenience wrapper around oslo's execute() method."""
if 'run_as_root' in kwargs and 'root_helper' not in kwargs:
kwargs['root_helper'] = get_root_helper()

View File

@ -582,6 +582,7 @@ class VolumeManager(manager.CleanableManager,
# Keep the image tmp file clean when init host.
backend_name = volume_utils.extract_host(self.service_topic_queue)
assert backend_name is not None
image_utils.cleanup_temporary_file(backend_name)
# Migrate any ConfKeyManager keys based on fixed_key to the currently

View File

@ -1,5 +1,6 @@
cinder/context.py
cinder/i18n.py
cinder/image/image_utils.py
cinder/exception.py
cinder/manager.py
cinder/utils.py