Merge "mypy: image_utils"
This commit is contained in:
commit
735fc313a1
@ -26,10 +26,13 @@ we should look at maybe pushing this up to Oslo
|
||||
|
||||
import contextlib
|
||||
import errno
|
||||
import io
|
||||
import math
|
||||
import os
|
||||
import re
|
||||
import tempfile
|
||||
from typing import (ContextManager, Dict, Generator, # noqa: H301
|
||||
List, Optional, Tuple)
|
||||
|
||||
import cryptography
|
||||
from cursive import exception as cursive_exception
|
||||
@ -44,6 +47,7 @@ from oslo_utils import timeutils
|
||||
from oslo_utils import units
|
||||
import psutil
|
||||
|
||||
from cinder import context
|
||||
from cinder import exception
|
||||
from cinder.i18n import _
|
||||
from cinder.image import accelerator
|
||||
@ -94,7 +98,8 @@ QEMU_IMG_MIN_CONVERT_LUKS_VERSION = '2.10'
|
||||
COMPRESSIBLE_IMAGE_FORMATS = ('qcow2',)
|
||||
|
||||
|
||||
def validate_stores_id(context, image_service_store_id):
|
||||
def validate_stores_id(context: context.RequestContext,
|
||||
image_service_store_id: str) -> None:
|
||||
image_service = glance.get_default_image_service()
|
||||
stores_info = image_service.get_stores(context)['stores']
|
||||
for info in stores_info:
|
||||
@ -107,19 +112,21 @@ def validate_stores_id(context, image_service_store_id):
|
||||
raise exception.GlanceStoreNotFound(store_id=image_service_store_id)
|
||||
|
||||
|
||||
def fixup_disk_format(disk_format):
|
||||
def fixup_disk_format(disk_format: str) -> str:
|
||||
"""Return the format to be provided to qemu-img convert."""
|
||||
|
||||
return QEMU_IMG_FORMAT_MAP.get(disk_format, disk_format)
|
||||
|
||||
|
||||
def from_qemu_img_disk_format(disk_format):
|
||||
def from_qemu_img_disk_format(disk_format: str) -> str:
|
||||
"""Return the conventional format derived from qemu-img format."""
|
||||
|
||||
return QEMU_IMG_FORMAT_MAP_INV.get(disk_format, disk_format)
|
||||
|
||||
|
||||
def qemu_img_info(path, run_as_root=True, force_share=False):
|
||||
def qemu_img_info(path: str,
|
||||
run_as_root: bool = True,
|
||||
force_share: bool = False) -> imageutils.QemuImgInfo:
|
||||
"""Return an object containing the parsed output from qemu-img info."""
|
||||
cmd = ['env', 'LC_ALL=C', 'qemu-img', 'info']
|
||||
if force_share:
|
||||
@ -145,7 +152,7 @@ def qemu_img_info(path, run_as_root=True, force_share=False):
|
||||
return info
|
||||
|
||||
|
||||
def get_qemu_img_version():
|
||||
def get_qemu_img_version() -> Optional[List[int]]:
|
||||
"""The qemu-img version will be cached until the process is restarted."""
|
||||
|
||||
global QEMU_IMG_VERSION
|
||||
@ -162,15 +169,24 @@ def get_qemu_img_version():
|
||||
return QEMU_IMG_VERSION
|
||||
|
||||
|
||||
def qemu_img_supports_force_share():
|
||||
return get_qemu_img_version() >= QEMU_IMG_MIN_FORCE_SHARE_VERSION
|
||||
def qemu_img_supports_force_share() -> bool:
|
||||
ver = get_qemu_img_version()
|
||||
if ver is None:
|
||||
return False
|
||||
else:
|
||||
return ver >= QEMU_IMG_MIN_FORCE_SHARE_VERSION
|
||||
|
||||
|
||||
def _get_qemu_convert_luks_cmd(src, dest, out_format, src_format=None,
|
||||
out_subformat=None, cache_mode=None,
|
||||
prefix=None, cipher_spec=None,
|
||||
passphrase_file=None,
|
||||
src_passphrase_file=None):
|
||||
def _get_qemu_convert_luks_cmd(src: str,
|
||||
dest: str,
|
||||
out_format: str,
|
||||
src_format: Optional[str] = None,
|
||||
out_subformat: Optional[str] = None,
|
||||
cache_mode: Optional[str] = None,
|
||||
prefix: tuple = None,
|
||||
cipher_spec: Optional[dict] = None,
|
||||
passphrase_file: Optional[str] = None,
|
||||
src_passphrase_file: str = None) -> List[str]:
|
||||
|
||||
cmd = ['qemu-img', 'convert']
|
||||
|
||||
@ -195,11 +211,17 @@ def _get_qemu_convert_luks_cmd(src, dest, out_format, src_format=None,
|
||||
return command
|
||||
|
||||
|
||||
def _get_qemu_convert_cmd(src, dest, out_format, src_format=None,
|
||||
out_subformat=None, cache_mode=None,
|
||||
prefix=None, cipher_spec=None,
|
||||
passphrase_file=None, compress=False,
|
||||
src_passphrase_file=None):
|
||||
def _get_qemu_convert_cmd(src: str,
|
||||
dest: str,
|
||||
out_format: str,
|
||||
src_format: str = None,
|
||||
out_subformat: str = None,
|
||||
cache_mode: str = None,
|
||||
prefix: tuple = None,
|
||||
cipher_spec: dict = None,
|
||||
passphrase_file: str = None,
|
||||
compress: bool = False,
|
||||
src_passphrase_file: str = None) -> List[str]:
|
||||
|
||||
if src_passphrase_file is not None:
|
||||
if passphrase_file is None:
|
||||
@ -246,7 +268,8 @@ def _get_qemu_convert_cmd(src, dest, out_format, src_format=None,
|
||||
# fix the encrypted case.
|
||||
|
||||
if (src_format or '').lower() not in ('', 'ami'):
|
||||
cmd += ('-f', src_format) # prevent detection of format
|
||||
assert src_format is not None
|
||||
cmd += ['-f', src_format] # prevent detection of format
|
||||
|
||||
# NOTE(lyarwood): When converting to LUKS add the cipher spec if present
|
||||
# and create a secret for the passphrase, written to a temp file
|
||||
@ -265,14 +288,15 @@ def _get_qemu_convert_cmd(src, dest, out_format, src_format=None,
|
||||
return cmd
|
||||
|
||||
|
||||
def _get_version_from_string(version_string):
|
||||
def _get_version_from_string(version_string: str) -> List[int]:
|
||||
return [int(x) for x in version_string.split('.')]
|
||||
|
||||
|
||||
def check_qemu_img_version(minimum_version):
|
||||
def check_qemu_img_version(minimum_version: str) -> None:
|
||||
qemu_version = get_qemu_img_version()
|
||||
if (qemu_version is None
|
||||
or qemu_version < _get_version_from_string(minimum_version)):
|
||||
current_version: Optional[str]
|
||||
if qemu_version:
|
||||
current_version = '.'.join((str(element)
|
||||
for element in qemu_version))
|
||||
@ -286,11 +310,17 @@ def check_qemu_img_version(minimum_version):
|
||||
raise exception.VolumeBackendAPIException(data=_msg)
|
||||
|
||||
|
||||
def _convert_image(prefix, source, dest, out_format,
|
||||
out_subformat=None, src_format=None,
|
||||
run_as_root=True, cipher_spec=None,
|
||||
passphrase_file=None, compress=False,
|
||||
src_passphrase_file=None):
|
||||
def _convert_image(prefix: tuple,
|
||||
source: str,
|
||||
dest: str,
|
||||
out_format: str,
|
||||
out_subformat: str = None,
|
||||
src_format: str = None,
|
||||
run_as_root: bool = True,
|
||||
cipher_spec: dict = None,
|
||||
passphrase_file: str = None,
|
||||
compress: bool = False,
|
||||
src_passphrase_file: str = None) -> None:
|
||||
"""Convert image to other format.
|
||||
|
||||
NOTE: If the qemu-img convert command fails and this function raises an
|
||||
@ -320,6 +350,7 @@ def _convert_image(prefix, source, dest, out_format,
|
||||
# flush properly and more efficiently than would be done
|
||||
# setting O_DIRECT, so check for that and skip the
|
||||
# setting for non BLK devs
|
||||
cache_mode: Optional[str]
|
||||
if (utils.is_blk_device(dest) and
|
||||
volume_utils.check_for_odirect_support(source,
|
||||
dest,
|
||||
@ -387,10 +418,17 @@ def _convert_image(prefix, source, dest, out_format,
|
||||
LOG.info(msg, {"sz": fsz_mb, "mbps": mbps})
|
||||
|
||||
|
||||
def convert_image(source, dest, out_format, out_subformat=None,
|
||||
src_format=None, run_as_root=True, throttle=None,
|
||||
cipher_spec=None, passphrase_file=None,
|
||||
compress=False, src_passphrase_file=None):
|
||||
def convert_image(source: str,
|
||||
dest: str,
|
||||
out_format: str,
|
||||
out_subformat: str = None,
|
||||
src_format: str = None,
|
||||
run_as_root: bool = True,
|
||||
throttle=None,
|
||||
cipher_spec: dict = None,
|
||||
passphrase_file: str = None,
|
||||
compress: bool = False,
|
||||
src_passphrase_file: str = None) -> None:
|
||||
if not throttle:
|
||||
throttle = throttling.Throttle.get_default()
|
||||
with throttle.subcommand(source, dest) as throttle_cmd:
|
||||
@ -406,8 +444,12 @@ def convert_image(source, dest, out_format, out_subformat=None,
|
||||
src_passphrase_file=src_passphrase_file)
|
||||
|
||||
|
||||
def resize_image(source, size, run_as_root=False, file_format=None):
|
||||
def resize_image(source: str,
|
||||
size: int,
|
||||
run_as_root: bool = False,
|
||||
file_format: str = None) -> None:
|
||||
"""Changes the virtual size of the image."""
|
||||
cmd: Tuple[str, ...]
|
||||
if file_format:
|
||||
cmd = ('qemu-img', 'resize', '-f', file_format, source, '%sG' % size)
|
||||
else:
|
||||
@ -415,7 +457,7 @@ def resize_image(source, size, run_as_root=False, file_format=None):
|
||||
utils.execute(*cmd, run_as_root=run_as_root)
|
||||
|
||||
|
||||
def _verify_image(img_file, verifier):
|
||||
def _verify_image(img_file: io.RawIOBase, verifier) -> None:
|
||||
# This methods must be called from a native thread, as the file I/O may
|
||||
# not yield to other greenthread in some cases, and since the update and
|
||||
# verify operations are CPU bound there would not be any yielding either,
|
||||
@ -428,7 +470,10 @@ def _verify_image(img_file, verifier):
|
||||
verifier.verify()
|
||||
|
||||
|
||||
def verify_glance_image_signature(context, image_service, image_id, path):
|
||||
def verify_glance_image_signature(context: context.RequestContext,
|
||||
image_service: glance.GlanceImageService,
|
||||
image_id: str,
|
||||
path: str) -> bool:
|
||||
verifier = None
|
||||
image_meta = image_service.show(context, image_id)
|
||||
image_properties = image_meta.get('properties', {})
|
||||
@ -489,7 +534,12 @@ def verify_glance_image_signature(context, image_service, image_id, path):
|
||||
return False
|
||||
|
||||
|
||||
def fetch(context, image_service, image_id, path, _user_id, _project_id):
|
||||
def fetch(context: context.RequestContext,
|
||||
image_service: glance.GlanceImageService,
|
||||
image_id: str,
|
||||
path: str,
|
||||
_user_id,
|
||||
_project_id) -> None:
|
||||
# TODO(vish): Improve context handling and add owner and auth data
|
||||
# when it is added to glance. Right now there is no
|
||||
# auth checking in glance, so we assume that access was
|
||||
@ -534,8 +584,12 @@ def fetch(context, image_service, image_id, path, _user_id, _project_id):
|
||||
LOG.info(msg, {"sz": fsz_mb, "mbps": mbps})
|
||||
|
||||
|
||||
def get_qemu_data(image_id, has_meta, disk_format_raw, dest, run_as_root,
|
||||
force_share=False):
|
||||
def get_qemu_data(image_id: str,
|
||||
has_meta: bool,
|
||||
disk_format_raw: bool,
|
||||
dest: str,
|
||||
run_as_root: bool,
|
||||
force_share: bool = False) -> imageutils.QemuImgInfo:
|
||||
# We may be on a system that doesn't have qemu-img installed. That
|
||||
# is ok if we are working with a RAW image. This logic checks to see
|
||||
# if qemu-img is installed. If not we make sure the image is RAW and
|
||||
@ -569,7 +623,10 @@ def get_qemu_data(image_id, has_meta, disk_format_raw, dest, run_as_root,
|
||||
return data
|
||||
|
||||
|
||||
def fetch_verify_image(context, image_service, image_id, dest):
|
||||
def fetch_verify_image(context: context.RequestContext,
|
||||
image_service: glance.GlanceImageService,
|
||||
image_id: str,
|
||||
dest: str) -> None:
|
||||
fetch(context, image_service, image_id, dest,
|
||||
None, None)
|
||||
image_meta = image_service.show(context, image_id)
|
||||
@ -599,27 +656,46 @@ def fetch_verify_image(context, image_service, image_id, dest):
|
||||
{'fmt': fmt, 'backing_file': backing_file}))
|
||||
|
||||
|
||||
def fetch_to_vhd(context, image_service,
|
||||
image_id, dest, blocksize, volume_subformat=None,
|
||||
user_id=None, project_id=None, run_as_root=True):
|
||||
def fetch_to_vhd(context: context.RequestContext,
|
||||
image_service: glance.GlanceImageService,
|
||||
image_id: str,
|
||||
dest: str,
|
||||
blocksize: int,
|
||||
volume_subformat: Optional[str] = None,
|
||||
user_id: Optional[str] = None,
|
||||
project_id: Optional[str] = None,
|
||||
run_as_root: bool = True) -> None:
|
||||
fetch_to_volume_format(context, image_service, image_id, dest, 'vpc',
|
||||
blocksize, volume_subformat=volume_subformat,
|
||||
user_id=user_id, project_id=project_id,
|
||||
run_as_root=run_as_root)
|
||||
|
||||
|
||||
def fetch_to_raw(context, image_service,
|
||||
image_id, dest, blocksize,
|
||||
user_id=None, project_id=None, size=None, run_as_root=True):
|
||||
def fetch_to_raw(context: context.RequestContext,
|
||||
image_service: glance.GlanceImageService,
|
||||
image_id: str,
|
||||
dest: str,
|
||||
blocksize: int,
|
||||
user_id: Optional[str] = None,
|
||||
project_id: Optional[str] = None,
|
||||
size: Optional[int] = None,
|
||||
run_as_root: bool = True) -> None:
|
||||
fetch_to_volume_format(context, image_service, image_id, dest, 'raw',
|
||||
blocksize, user_id=user_id, project_id=project_id,
|
||||
size=size, run_as_root=run_as_root)
|
||||
|
||||
|
||||
def fetch_to_volume_format(context, image_service,
|
||||
image_id, dest, volume_format, blocksize,
|
||||
volume_subformat=None, user_id=None,
|
||||
project_id=None, size=None, run_as_root=True):
|
||||
def fetch_to_volume_format(context: context.RequestContext,
|
||||
image_service: glance.GlanceImageService,
|
||||
image_id: str,
|
||||
dest: str,
|
||||
volume_format: str,
|
||||
blocksize: int,
|
||||
volume_subformat: Optional[str] = None,
|
||||
user_id: Optional[str] = None,
|
||||
project_id: Optional[str] = None,
|
||||
size: Optional[int] = None,
|
||||
run_as_root: bool = True) -> None:
|
||||
qemu_img = True
|
||||
image_meta = image_service.show(context, image_id)
|
||||
|
||||
@ -722,9 +798,15 @@ def fetch_to_volume_format(context, image_service,
|
||||
run_as_root=run_as_root)
|
||||
|
||||
|
||||
def upload_volume(context, image_service, image_meta, volume_path,
|
||||
volume_format='raw', run_as_root=True, compress=True,
|
||||
store_id=None, base_image_ref=None):
|
||||
def upload_volume(context: context.RequestContext,
|
||||
image_service: glance.GlanceImageService,
|
||||
image_meta: dict,
|
||||
volume_path: str,
|
||||
volume_format: str = 'raw',
|
||||
run_as_root: bool = True,
|
||||
compress: bool = True,
|
||||
store_id: str = None,
|
||||
base_image_ref: Optional[str] = None) -> None:
|
||||
# NOTE: You probably want to use volume_utils.upload_volume(),
|
||||
# not this function.
|
||||
image_id = image_meta['id']
|
||||
@ -791,7 +873,9 @@ def upload_volume(context, image_service, image_meta, volume_path,
|
||||
base_image_ref=base_image_ref)
|
||||
|
||||
|
||||
def check_virtual_size(virtual_size, volume_size, image_id):
|
||||
def check_virtual_size(virtual_size: float,
|
||||
volume_size: int,
|
||||
image_id: str) -> int:
|
||||
virtual_size = int(math.ceil(float(virtual_size) / units.Gi))
|
||||
|
||||
if virtual_size > volume_size:
|
||||
@ -805,7 +889,7 @@ def check_virtual_size(virtual_size, volume_size, image_id):
|
||||
return virtual_size
|
||||
|
||||
|
||||
def check_available_space(dest, image_size, image_id):
|
||||
def check_available_space(dest: str, image_size: int, image_id: str) -> None:
|
||||
# TODO(e0ne): replace psutil with shutil.disk_usage when we drop
|
||||
# Python 2.7 support.
|
||||
if not os.path.isdir(dest):
|
||||
@ -821,50 +905,51 @@ def check_available_space(dest, image_size, image_id):
|
||||
raise exception.ImageTooBig(image_id=image_id, reason=msg)
|
||||
|
||||
|
||||
def is_xenserver_format(image_meta):
|
||||
def is_xenserver_format(image_meta: dict) -> bool:
|
||||
return (
|
||||
image_meta['disk_format'] == 'vhd'
|
||||
and image_meta['container_format'] == 'ovf'
|
||||
)
|
||||
|
||||
|
||||
def set_vhd_parent(vhd_path, parentpath):
|
||||
def set_vhd_parent(vhd_path: str, parentpath: str) -> None:
|
||||
utils.execute('vhd-util', 'modify', '-n', vhd_path, '-p', parentpath)
|
||||
|
||||
|
||||
def extract_targz(archive_name, target):
|
||||
def extract_targz(archive_name: str, target: str) -> None:
|
||||
utils.execute('tar', '-xzf', archive_name, '-C', target)
|
||||
|
||||
|
||||
def fix_vhd_chain(vhd_chain):
|
||||
def fix_vhd_chain(vhd_chain: List[str]) -> None:
|
||||
for child, parent in zip(vhd_chain[:-1], vhd_chain[1:]):
|
||||
set_vhd_parent(child, parent)
|
||||
|
||||
|
||||
def get_vhd_size(vhd_path):
|
||||
def get_vhd_size(vhd_path: str) -> int:
|
||||
out, _err = utils.execute('vhd-util', 'query', '-n', vhd_path, '-v')
|
||||
return int(out)
|
||||
|
||||
|
||||
def resize_vhd(vhd_path, size, journal):
|
||||
def resize_vhd(vhd_path: str, size: int, journal: str) -> None:
|
||||
utils.execute(
|
||||
'vhd-util', 'resize', '-n', vhd_path, '-s', '%d' % size, '-j', journal)
|
||||
|
||||
|
||||
def coalesce_vhd(vhd_path):
|
||||
def coalesce_vhd(vhd_path: str) -> None:
|
||||
utils.execute(
|
||||
'vhd-util', 'coalesce', '-n', vhd_path)
|
||||
|
||||
|
||||
def create_temporary_file(*args, **kwargs):
|
||||
def create_temporary_file(*args: str, **kwargs: str) -> str:
|
||||
fileutils.ensure_tree(CONF.image_conversion_dir)
|
||||
|
||||
fd, tmp = tempfile.mkstemp(dir=CONF.image_conversion_dir, *args, **kwargs)
|
||||
fd, tmp = tempfile.mkstemp(dir=CONF.image_conversion_dir,
|
||||
*args, **kwargs) # type: ignore
|
||||
os.close(fd)
|
||||
return tmp
|
||||
|
||||
|
||||
def cleanup_temporary_file(backend_name):
|
||||
def cleanup_temporary_file(backend_name: str) -> None:
|
||||
temp_dir = CONF.image_conversion_dir
|
||||
if (not temp_dir or not os.path.exists(temp_dir)):
|
||||
LOG.debug("Configuration image_conversion_dir is None or the path "
|
||||
@ -888,7 +973,7 @@ def cleanup_temporary_file(backend_name):
|
||||
|
||||
|
||||
@contextlib.contextmanager
|
||||
def temporary_file(*args, **kwargs):
|
||||
def temporary_file(*args: str, **kwargs) -> Generator[str, None, None]:
|
||||
tmp = None
|
||||
try:
|
||||
tmp = create_temporary_file(*args, **kwargs)
|
||||
@ -898,13 +983,13 @@ def temporary_file(*args, **kwargs):
|
||||
fileutils.delete_if_exists(tmp)
|
||||
|
||||
|
||||
def temporary_dir():
|
||||
def temporary_dir() -> ContextManager[str]:
|
||||
fileutils.ensure_tree(CONF.image_conversion_dir)
|
||||
|
||||
return utils.tempdir(dir=CONF.image_conversion_dir)
|
||||
|
||||
|
||||
def coalesce_chain(vhd_chain):
|
||||
def coalesce_chain(vhd_chain: List[str]) -> str:
|
||||
for child, parent in zip(vhd_chain[:-1], vhd_chain[1:]):
|
||||
with temporary_dir() as directory_for_journal:
|
||||
size = get_vhd_size(child)
|
||||
@ -916,7 +1001,7 @@ def coalesce_chain(vhd_chain):
|
||||
return vhd_chain[-1]
|
||||
|
||||
|
||||
def discover_vhd_chain(directory):
|
||||
def discover_vhd_chain(directory: str) -> List[str]:
|
||||
counter = 0
|
||||
chain = []
|
||||
|
||||
@ -931,7 +1016,7 @@ def discover_vhd_chain(directory):
|
||||
return chain
|
||||
|
||||
|
||||
def replace_xenserver_image_with_coalesced_vhd(image_file):
|
||||
def replace_xenserver_image_with_coalesced_vhd(image_file: str) -> None:
|
||||
with temporary_dir() as tempdir:
|
||||
extract_targz(image_file, tempdir)
|
||||
chain = discover_vhd_chain(tempdir)
|
||||
@ -941,7 +1026,7 @@ def replace_xenserver_image_with_coalesced_vhd(image_file):
|
||||
os.rename(coalesced, image_file)
|
||||
|
||||
|
||||
def decode_cipher(cipher_spec, key_size):
|
||||
def decode_cipher(cipher_spec: str, key_size: int) -> Dict[str, str]:
|
||||
"""Decode a dm-crypt style cipher specification string
|
||||
|
||||
The assumed format being cipher-chainmode-ivmode, similar to that
|
||||
@ -967,13 +1052,14 @@ class TemporaryImages(object):
|
||||
This is useful to inspect image contents before conversion.
|
||||
"""
|
||||
|
||||
def __init__(self, image_service):
|
||||
self.temporary_images = {}
|
||||
def __init__(self, image_service: glance.GlanceImageService):
|
||||
self.temporary_images: Dict[str, dict] = {}
|
||||
self.image_service = image_service
|
||||
image_service.temp_images = self
|
||||
|
||||
@staticmethod
|
||||
def for_image_service(image_service):
|
||||
def for_image_service(
|
||||
image_service: glance.GlanceImageService) -> 'TemporaryImages':
|
||||
instance = image_service.temp_images
|
||||
if instance:
|
||||
return instance
|
||||
@ -981,7 +1067,11 @@ class TemporaryImages(object):
|
||||
|
||||
@classmethod
|
||||
@contextlib.contextmanager
|
||||
def fetch(cls, image_service, context, image_id, suffix=''):
|
||||
def fetch(cls,
|
||||
image_service: glance.GlanceImageService,
|
||||
context: context.RequestContext,
|
||||
image_id: str,
|
||||
suffix: Optional[str] = '') -> Generator[str, None, None]:
|
||||
tmp_images = cls.for_image_service(image_service).temporary_images
|
||||
with temporary_file(prefix='image_fetch_%s_' % image_id,
|
||||
suffix=suffix) as tmp:
|
||||
@ -997,7 +1087,7 @@ class TemporaryImages(object):
|
||||
LOG.debug("Temporary image %(id)s for user %(user)s is deleted.",
|
||||
{'id': image_id, 'user': user})
|
||||
|
||||
def get(self, context, image_id):
|
||||
def get(self, context: context.RequestContext, image_id: str):
|
||||
user = context.user_id
|
||||
if not self.temporary_images.get(user):
|
||||
return None
|
||||
|
@ -121,7 +121,7 @@ def check_exclusive_options(**kwargs: dict) -> None:
|
||||
raise exception.InvalidInput(reason=msg)
|
||||
|
||||
|
||||
def execute(*cmd: str, **kwargs) -> Tuple[str, str]:
|
||||
def execute(*cmd: str, **kwargs: Union[bool, str]) -> Tuple[str, str]:
|
||||
"""Convenience wrapper around oslo's execute() method."""
|
||||
if 'run_as_root' in kwargs and 'root_helper' not in kwargs:
|
||||
kwargs['root_helper'] = get_root_helper()
|
||||
|
@ -581,6 +581,7 @@ class VolumeManager(manager.CleanableManager,
|
||||
|
||||
# Keep the image tmp file clean when init host.
|
||||
backend_name = volume_utils.extract_host(self.service_topic_queue)
|
||||
assert backend_name is not None
|
||||
image_utils.cleanup_temporary_file(backend_name)
|
||||
|
||||
# Migrate any ConfKeyManager keys based on fixed_key to the currently
|
||||
|
@ -1,5 +1,6 @@
|
||||
cinder/context.py
|
||||
cinder/i18n.py
|
||||
cinder/image/image_utils.py
|
||||
cinder/exception.py
|
||||
cinder/manager.py
|
||||
cinder/utils.py
|
||||
|
Loading…
Reference in New Issue
Block a user