Merge "mypy: annotate remotefs"

This commit is contained in:
Zuul 2022-05-12 15:49:45 +00:00 committed by Gerrit Code Review
commit eb6ba62473
2 changed files with 229 additions and 121 deletions

View File

@ -26,16 +26,21 @@ import shutil
import string import string
import tempfile import tempfile
import time import time
import typing
from typing import Callable, List, Optional, Tuple, Union # noqa: H301
from castellan import key_manager from castellan import key_manager
from os_brick.remotefs import remotefs as remotefs_brick
from oslo_config import cfg from oslo_config import cfg
from oslo_log import log as logging from oslo_log import log as logging
from oslo_serialization import jsonutils from oslo_serialization import jsonutils
from oslo_utils import imageutils
from oslo_utils.secretutils import md5 from oslo_utils.secretutils import md5
from oslo_utils import units from oslo_utils import units
import six import six
from cinder import compute from cinder import compute
from cinder import context
from cinder import coordination from cinder import coordination
from cinder import db from cinder import db
from cinder import exception from cinder import exception
@ -110,7 +115,7 @@ CONF.register_opts(nas_opts, group=configuration.SHARED_CONF_GROUP)
CONF.register_opts(volume_opts, group=configuration.SHARED_CONF_GROUP) CONF.register_opts(volume_opts, group=configuration.SHARED_CONF_GROUP)
def locked_volume_id_operation(f): def locked_volume_id_operation(f: Callable) -> Callable:
"""Lock decorator for volume operations. """Lock decorator for volume operations.
Takes a named lock prior to executing the operation. The lock is named Takes a named lock prior to executing the operation. The lock is named
@ -154,9 +159,9 @@ class BackingFileTemplate(string.Template):
class RemoteFSDriver(driver.BaseVD): class RemoteFSDriver(driver.BaseVD):
"""Common base for drivers that work like NFS.""" """Common base for drivers that work like NFS."""
driver_volume_type = None driver_volume_type: Optional[str] = None
driver_prefix = 'remotefs' driver_prefix = 'remotefs'
volume_backend_name = None volume_backend_name: Optional[str] = None
vendor_name = 'Open Source' vendor_name = 'Open Source'
SHARE_FORMAT_REGEX = r'.+:/.+' SHARE_FORMAT_REGEX = r'.+:/.+'
@ -168,7 +173,7 @@ class RemoteFSDriver(driver.BaseVD):
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
super(RemoteFSDriver, self).__init__(*args, **kwargs) super(RemoteFSDriver, self).__init__(*args, **kwargs)
self.shares = {} self.shares = {}
self._mounted_shares = [] self._mounted_shares: List[str] = []
self._execute_as_root = True self._execute_as_root = True
self._is_voldb_empty_at_startup = kwargs.pop('is_vol_db_empty', None) self._is_voldb_empty_at_startup = kwargs.pop('is_vol_db_empty', None)
self._supports_encryption = False self._supports_encryption = False
@ -178,12 +183,14 @@ class RemoteFSDriver(driver.BaseVD):
self.configuration.append_config_values(nas_opts) self.configuration.append_config_values(nas_opts)
self.configuration.append_config_values(volume_opts) self.configuration.append_config_values(volume_opts)
def check_for_setup_error(self): def check_for_setup_error(self) -> None:
"""Just to override parent behavior.""" """Just to override parent behavior."""
pass pass
@volume_utils.trace @volume_utils.trace
def initialize_connection(self, volume, connector): def initialize_connection(self,
volume: objects.Volume,
connector: dict) -> dict:
"""Allow connection to connector and return connection info. """Allow connection to connector and return connection info.
:param volume: volume reference :param volume: volume reference
@ -199,7 +206,7 @@ class RemoteFSDriver(driver.BaseVD):
'mount_point_base': self._get_mount_point_base() 'mount_point_base': self._get_mount_point_base()
} }
def do_setup(self, context): def do_setup(self, context: context.RequestContext) -> None:
"""Any initialization the volume driver does while starting.""" """Any initialization the volume driver does while starting."""
super(RemoteFSDriver, self).do_setup(context) super(RemoteFSDriver, self).do_setup(context)
@ -223,7 +230,7 @@ class RemoteFSDriver(driver.BaseVD):
LOG.error(msg) LOG.error(msg)
raise exception.InvalidConfigurationValue(msg) raise exception.InvalidConfigurationValue(msg)
def _get_provisioned_capacity(self): def _get_provisioned_capacity(self) -> float:
"""Returns the provisioned capacity. """Returns the provisioned capacity.
Get the sum of sizes of volumes, snapshots and any other Get the sum of sizes of volumes, snapshots and any other
@ -237,7 +244,7 @@ class RemoteFSDriver(driver.BaseVD):
provisioned_size += int(out.split()[0]) provisioned_size += int(out.split()[0])
return round(provisioned_size / units.Gi, 2) return round(provisioned_size / units.Gi, 2)
def _get_mount_point_base(self): def _get_mount_point_base(self) -> Optional[str]:
"""Returns the mount point base for the remote fs. """Returns the mount point base for the remote fs.
This method facilitates returning mount point base This method facilitates returning mount point base
@ -253,10 +260,10 @@ class RemoteFSDriver(driver.BaseVD):
return None return None
@staticmethod @staticmethod
def _validate_state(current_state, def _validate_state(current_state: str,
acceptable_states, acceptable_states: Union[tuple, list],
obj_description='volume', obj_description='volume',
invalid_exc=exception.InvalidVolume): invalid_exc=exception.InvalidVolume) -> None:
if current_state not in acceptable_states: if current_state not in acceptable_states:
message = _('Invalid %(obj_description)s state. ' message = _('Invalid %(obj_description)s state. '
'Acceptable states for this operation: ' 'Acceptable states for this operation: '
@ -270,7 +277,7 @@ class RemoteFSDriver(driver.BaseVD):
current_state=current_state)) current_state=current_state))
@volume_utils.trace @volume_utils.trace
def create_volume(self, volume): def create_volume(self, volume: objects.Volume) -> dict:
"""Creates a volume. """Creates a volume.
:param volume: volume reference :param volume: volume reference
@ -292,7 +299,7 @@ class RemoteFSDriver(driver.BaseVD):
return {'provider_location': volume.provider_location} return {'provider_location': volume.provider_location}
def _do_create_volume(self, volume): def _do_create_volume(self, volume: objects.Volume) -> None:
"""Create a volume on given remote share. """Create a volume on given remote share.
:param volume: volume reference :param volume: volume reference
@ -331,9 +338,9 @@ class RemoteFSDriver(driver.BaseVD):
with volume.obj_as_admin(): with volume.obj_as_admin():
volume.save() volume.save()
def _ensure_shares_mounted(self): def _ensure_shares_mounted(self) -> None:
"""Look for remote shares in the flags and mount them locally.""" """Look for remote shares in the flags and mount them locally."""
mounted_shares = [] mounted_shares: List[str] = []
self._load_shares_config(getattr(self.configuration, self._load_shares_config(getattr(self.configuration,
self.driver_prefix + self.driver_prefix +
@ -351,7 +358,7 @@ class RemoteFSDriver(driver.BaseVD):
LOG.debug('Available shares %s', self._mounted_shares) LOG.debug('Available shares %s', self._mounted_shares)
@volume_utils.trace @volume_utils.trace
def delete_volume(self, volume): def delete_volume(self, volume: objects.Volume) -> None:
"""Deletes a logical volume. """Deletes a logical volume.
:param volume: volume reference :param volume: volume reference
@ -371,11 +378,16 @@ class RemoteFSDriver(driver.BaseVD):
self._delete(mounted_path) self._delete(mounted_path)
def ensure_export(self, ctx, volume): def ensure_export(self,
ctx: context.RequestContext,
volume: objects.Volume) -> None:
"""Synchronously recreates an export for a logical volume.""" """Synchronously recreates an export for a logical volume."""
self._ensure_share_mounted(volume.provider_location) self._ensure_share_mounted(volume.provider_location)
def create_export(self, ctx, volume, connector): def create_export(self,
ctx: context.RequestContext,
volume: objects.Volume,
connector: dict) -> None:
"""Exports the volume. """Exports the volume.
Can optionally return a dictionary of changes Can optionally return a dictionary of changes
@ -383,11 +395,13 @@ class RemoteFSDriver(driver.BaseVD):
""" """
pass pass
def remove_export(self, ctx, volume): def remove_export(self,
ctx: context.RequestContext,
volume: objects.Volume) -> None:
"""Removes an export for a logical volume.""" """Removes an export for a logical volume."""
pass pass
def delete_snapshot(self, snapshot): def delete_snapshot(self, snapshot: objects.Snapshot) -> None:
"""Delete snapshot. """Delete snapshot.
Do nothing for this driver, but allow manager to handle deletion Do nothing for this driver, but allow manager to handle deletion
@ -395,17 +409,17 @@ class RemoteFSDriver(driver.BaseVD):
""" """
pass pass
def _delete(self, path): def _delete(self, path: str) -> None:
# Note(lpetrut): this method is needed in order to provide # Note(lpetrut): this method is needed in order to provide
# interoperability with Windows as it will be overridden. # interoperability with Windows as it will be overridden.
self._execute('rm', '-f', path, run_as_root=self._execute_as_root) self._execute('rm', '-f', path, run_as_root=self._execute_as_root)
def _create_sparsed_file(self, path, size): def _create_sparsed_file(self, path: str, size: int) -> None:
"""Creates a sparse file of a given size in GiB.""" """Creates a sparse file of a given size in GiB."""
self._execute('truncate', '-s', '%sG' % size, self._execute('truncate', '-s', '%sG' % size,
path, run_as_root=self._execute_as_root) path, run_as_root=self._execute_as_root)
def _create_regular_file(self, path, size): def _create_regular_file(self, path: str, size: int) -> None:
"""Creates a regular file of given size in GiB.""" """Creates a regular file of given size in GiB."""
block_size_mb = 1 block_size_mb = 1
@ -416,7 +430,7 @@ class RemoteFSDriver(driver.BaseVD):
'count=%d' % block_count, 'count=%d' % block_count,
run_as_root=self._execute_as_root) run_as_root=self._execute_as_root)
def _create_qcow2_file(self, path, size_gb): def _create_qcow2_file(self, path: str, size_gb: int) -> None:
"""Creates a QCOW2 file of a given size in GiB.""" """Creates a QCOW2 file of a given size in GiB."""
self._execute('qemu-img', 'create', '-f', 'qcow2', self._execute('qemu-img', 'create', '-f', 'qcow2',
@ -425,10 +439,10 @@ class RemoteFSDriver(driver.BaseVD):
run_as_root=self._execute_as_root) run_as_root=self._execute_as_root)
def _create_encrypted_volume_file(self, def _create_encrypted_volume_file(self,
path, path: str,
size_gb, size_gb: int,
encryption, encryption: dict,
context): context: context.RequestContext) -> None:
"""Create an encrypted volume. """Create an encrypted volume.
This works by creating an encrypted image locally, This works by creating an encrypted image locally,
@ -465,7 +479,7 @@ class RemoteFSDriver(driver.BaseVD):
path, str(size_gb * units.Gi), path, str(size_gb * units.Gi),
run_as_root=self._execute_as_root) run_as_root=self._execute_as_root)
def _set_rw_permissions(self, path): def _set_rw_permissions(self, path: str) -> None:
"""Sets access permissions for given NFS path. """Sets access permissions for given NFS path.
Volume file permissions are set based upon the value of Volume file permissions are set based upon the value of
@ -487,17 +501,17 @@ class RemoteFSDriver(driver.BaseVD):
self._execute('chmod', permissions, path, self._execute('chmod', permissions, path,
run_as_root=self._execute_as_root) run_as_root=self._execute_as_root)
def _set_rw_permissions_for_all(self, path): def _set_rw_permissions_for_all(self, path: str) -> None:
"""Sets 666 permissions for the path.""" """Sets 666 permissions for the path."""
self._execute('chmod', 'ugo+rw', path, self._execute('chmod', 'ugo+rw', path,
run_as_root=self._execute_as_root) run_as_root=self._execute_as_root)
def _set_rw_permissions_for_owner(self, path): def _set_rw_permissions_for_owner(self, path: str) -> None:
"""Sets read-write permissions to the owner for the path.""" """Sets read-write permissions to the owner for the path."""
self._execute('chmod', 'u+rw', path, self._execute('chmod', 'u+rw', path,
run_as_root=self._execute_as_root) run_as_root=self._execute_as_root)
def local_path(self, volume): def local_path(self, volume: objects.Volume) -> str:
"""Get volume path (mounted locally fs path) for given volume. """Get volume path (mounted locally fs path) for given volume.
:param volume: volume reference :param volume: volume reference
@ -506,7 +520,11 @@ class RemoteFSDriver(driver.BaseVD):
return os.path.join(self._get_mount_point_for_share(remotefs_share), return os.path.join(self._get_mount_point_for_share(remotefs_share),
volume.name) volume.name)
def copy_image_to_volume(self, context, volume, image_service, image_id): def copy_image_to_volume(self,
context: context.RequestContext,
volume: objects.Volume,
image_service,
image_id: str) -> None:
"""Fetch the image from image_service and write it to the volume.""" """Fetch the image from image_service and write it to the volume."""
image_utils.fetch_to_raw(context, image_utils.fetch_to_raw(context,
@ -536,7 +554,11 @@ class RemoteFSDriver(driver.BaseVD):
reason=(_("Expected volume size was %d") % volume.size) reason=(_("Expected volume size was %d") % volume.size)
+ (_(" but size is now %d") % virt_size)) + (_(" but size is now %d") % virt_size))
def copy_volume_to_image(self, context, volume, image_service, image_meta): def copy_volume_to_image(self,
context: context.RequestContext,
volume: objects.Volume,
image_service,
image_meta: dict) -> None:
"""Copy the volume to the specified image.""" """Copy the volume to the specified image."""
volume_utils.upload_volume(context, volume_utils.upload_volume(context,
image_service, image_service,
@ -545,12 +567,12 @@ class RemoteFSDriver(driver.BaseVD):
volume, volume,
run_as_root=self._execute_as_root) run_as_root=self._execute_as_root)
def _read_config_file(self, config_file): def _read_config_file(self, config_file: str) -> List[str]:
# Returns list of lines in file # Returns list of lines in file
with open(config_file) as f: with open(config_file) as f:
return f.readlines() return f.readlines()
def _load_shares_config(self, share_file=None): def _load_shares_config(self, share_file: Optional[str] = None) -> None:
self.shares = {} self.shares = {}
if all((self.configuration.nas_host, if all((self.configuration.nas_host,
@ -606,14 +628,17 @@ class RemoteFSDriver(driver.BaseVD):
LOG.debug("shares loaded: %s", self.shares) LOG.debug("shares loaded: %s", self.shares)
def _get_mount_point_for_share(self, path): def _get_mount_point_for_share(self, path: str) -> str:
raise NotImplementedError() raise NotImplementedError()
def terminate_connection(self, volume, connector, **kwargs): def terminate_connection(self,
volume: objects.Volume,
connector: dict,
**kwargs) -> None:
"""Disallow connection from connector.""" """Disallow connection from connector."""
pass pass
def _update_volume_stats(self): def _update_volume_stats(self) -> None:
"""Retrieve stats info from volume group.""" """Retrieve stats info from volume group."""
data = {} data = {}
@ -638,16 +663,16 @@ class RemoteFSDriver(driver.BaseVD):
data['QoS_support'] = False data['QoS_support'] = False
self._stats = data self._stats = data
def _get_capacity_info(self, share): def _get_capacity_info(self, share: str):
raise NotImplementedError() raise NotImplementedError()
def _find_share(self, volume): def _find_share(self, volume: objects.Volume):
raise NotImplementedError() raise NotImplementedError()
def _ensure_share_mounted(self, share): def _ensure_share_mounted(self, share: str):
raise NotImplementedError() raise NotImplementedError()
def secure_file_operations_enabled(self): def secure_file_operations_enabled(self) -> bool:
"""Determine if driver is operating in Secure File Operations mode. """Determine if driver is operating in Secure File Operations mode.
The Cinder Volume driver needs to query if this driver is operating The Cinder Volume driver needs to query if this driver is operating
@ -657,7 +682,7 @@ class RemoteFSDriver(driver.BaseVD):
return True return True
return False return False
def set_nas_security_options(self, is_new_cinder_install): def set_nas_security_options(self, is_new_cinder_install: bool) -> None:
"""Determine the setting to use for Secure NAS options. """Determine the setting to use for Secure NAS options.
This method must be overridden by child wishing to use secure This method must be overridden by child wishing to use secure
@ -680,8 +705,11 @@ class RemoteFSDriver(driver.BaseVD):
"information on a secure NFS configuration.", "information on a secure NFS configuration.",
doc_html) doc_html)
def _determine_nas_security_option_setting(self, nas_option, mount_point, def _determine_nas_security_option_setting(
is_new_cinder_install): self,
nas_option: str,
mount_point: str,
is_new_cinder_install: bool) -> str:
"""Determine NAS security option setting when 'auto' is assigned. """Determine NAS security option setting when 'auto' is assigned.
This method determines the final 'true'/'false' setting of an NAS This method determines the final 'true'/'false' setting of an NAS
@ -746,7 +774,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
_local_volume_dir(self, volume) _local_volume_dir(self, volume)
""" """
_VALID_IMAGE_EXTENSIONS = [] _VALID_IMAGE_EXTENSIONS: List[str] = []
# The following flag may be overridden by the concrete drivers in order # The following flag may be overridden by the concrete drivers in order
# to avoid using temporary volume snapshots when creating volume clones, # to avoid using temporary volume snapshots when creating volume clones,
# when possible. # when possible.
@ -754,44 +782,44 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
_always_use_temp_snap_when_cloning = True _always_use_temp_snap_when_cloning = True
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._remotefsclient = None self._remotefsclient: remotefs_brick.RemoteFsClient = None
self.base = None self.base: Optional[str] = None
self._nova = None self._nova: Optional[db.base.Base] = None
super(RemoteFSSnapDriverBase, self).__init__(*args, **kwargs) super(RemoteFSSnapDriverBase, self).__init__(*args, **kwargs)
def do_setup(self, context): def do_setup(self, context: context.RequestContext) -> None:
super(RemoteFSSnapDriverBase, self).do_setup(context) super(RemoteFSSnapDriverBase, self).do_setup(context)
self._nova = compute.API() self._nova = compute.API()
def snapshot_revert_use_temp_snapshot(self): def snapshot_revert_use_temp_snapshot(self) -> bool:
# Considering that RemoteFS based drivers use COW images # Considering that RemoteFS based drivers use COW images
# for storing snapshots, having chains of such images, # for storing snapshots, having chains of such images,
# creating a backup snapshot when reverting one is not # creating a backup snapshot when reverting one is not
# actutally helpful. # actutally helpful.
return False return False
def _local_volume_dir(self, volume): def _local_volume_dir(self, volume: objects.Volume) -> str:
share = volume.provider_location share = volume.provider_location
local_dir = self._get_mount_point_for_share(share) local_dir = self._get_mount_point_for_share(share)
return local_dir return local_dir
def _local_path_volume(self, volume): def _local_path_volume(self, volume: objects.Volume) -> str:
path_to_disk = os.path.join( path_to_disk = os.path.join(
self._local_volume_dir(volume), self._local_volume_dir(volume),
volume.name) volume.name)
return path_to_disk return path_to_disk
def _get_new_snap_path(self, snapshot): def _get_new_snap_path(self, snapshot: objects.Snapshot) -> str:
vol_path = self.local_path(snapshot.volume) vol_path = self.local_path(snapshot.volume)
snap_path = '%s.%s' % (vol_path, snapshot.id) snap_path = '%s.%s' % (vol_path, snapshot.id)
return snap_path return snap_path
def _local_path_volume_info(self, volume): def _local_path_volume_info(self, volume: objects.Volume) -> str:
return '%s%s' % (self.local_path(volume), '.info') return '%s%s' % (self.local_path(volume), '.info')
def _read_file(self, filename): def _read_file(self, filename: str) -> str:
"""This method is to make it easier to stub out code for testing. """This method is to make it easier to stub out code for testing.
Returns a string representing the contents of the file. Returns a string representing the contents of the file.
@ -800,7 +828,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
with open(filename, 'r') as f: with open(filename, 'r') as f:
return f.read() return f.read()
def _write_info_file(self, info_path, snap_info): def _write_info_file(self, info_path: str, snap_info: dict) -> None:
if 'active' not in snap_info.keys(): if 'active' not in snap_info.keys():
msg = _("'active' must be present when writing snap_info.") msg = _("'active' must be present when writing snap_info.")
raise exception.RemoteFSException(msg) raise exception.RemoteFSException(msg)
@ -815,10 +843,13 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
with open(info_path, 'w') as f: with open(info_path, 'w') as f:
json.dump(snap_info, f, indent=1, sort_keys=True) json.dump(snap_info, f, indent=1, sort_keys=True)
def _qemu_img_info_base(self, path, volume_name, basedir, def _qemu_img_info_base(self,
path: str,
volume_name: str,
basedir: str,
ext_bf_template=None, ext_bf_template=None,
force_share=False, force_share=False,
run_as_root=False): run_as_root=False) -> imageutils.QemuImgInfo:
"""Sanitize image_utils' qemu_img_info. """Sanitize image_utils' qemu_img_info.
This code expects to deal only with relative filenames. This code expects to deal only with relative filenames.
@ -871,10 +902,13 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
return info return info
def _qemu_img_info(self, path, volume_name): def _qemu_img_info(self, path: str, volume_name: str):
raise NotImplementedError() raise NotImplementedError()
def _img_commit(self, path, passphrase_file=None, backing_file=None): def _img_commit(self,
path: str,
passphrase_file: Optional[str] = None,
backing_file: Optional[str] = None) -> None:
# TODO(eharney): this is not using the correct permissions for # TODO(eharney): this is not using the correct permissions for
# NFS snapshots # NFS snapshots
# It needs to run as root for volumes attached to instances, but # It needs to run as root for volumes attached to instances, but
@ -901,8 +935,11 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
self._execute(*cmd, run_as_root=self._execute_as_root) self._execute(*cmd, run_as_root=self._execute_as_root)
self._delete(path) self._delete(path)
def _rebase_img(self, image, backing_file, volume_format, def _rebase_img(self,
passphrase_file=None): image: str,
backing_file: str,
volume_format: str,
passphrase_file: Optional[str] = None) -> None:
# qemu-img create must run as root, because it reads from the # qemu-img create must run as root, because it reads from the
# backing file, which will be owned by qemu:qemu if attached to an # backing file, which will be owned by qemu:qemu if attached to an
# instance. # instance.
@ -922,7 +959,9 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
self._execute(*command, run_as_root=self._execute_as_root) self._execute(*command, run_as_root=self._execute_as_root)
def _read_info_file(self, info_path, empty_if_missing=False): def _read_info_file(self,
info_path: str,
empty_if_missing: bool = False) -> dict:
"""Return dict of snapshot information. """Return dict of snapshot information.
:param info_path: path to file :param info_path: path to file
@ -935,7 +974,8 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
return json.loads(self._read_file(info_path)) return json.loads(self._read_file(info_path))
def _get_higher_image_path(self, snapshot): def _get_higher_image_path(self,
snapshot: objects.Snapshot) -> Optional[str]:
volume = snapshot.volume volume = snapshot.volume
info_path = self._local_path_volume_info(volume) info_path = self._local_path_volume_info(volume)
snap_info = self._read_info_file(info_path) snap_info = self._read_info_file(info_path)
@ -954,7 +994,9 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
None) None)
return higher_file return higher_file
def _get_backing_chain_for_path(self, volume, path): def _get_backing_chain_for_path(self,
volume: objects.Volume,
path: str) -> List[dict]:
"""Returns list of dicts containing backing-chain information. """Returns list of dicts containing backing-chain information.
Includes 'filename', and 'backing-filename' for each Includes 'filename', and 'backing-filename' for each
@ -990,7 +1032,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
return output return output
def _get_hash_str(self, base_str): def _get_hash_str(self, base_str) -> str:
"""Return a string that represents hash of base_str. """Return a string that represents hash of base_str.
Returns string in a hex format. Returns string in a hex format.
@ -999,14 +1041,14 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
base_str = base_str.encode('utf-8') base_str = base_str.encode('utf-8')
return md5(base_str, usedforsecurity=False).hexdigest() return md5(base_str, usedforsecurity=False).hexdigest()
def _get_mount_point_for_share(self, share): def _get_mount_point_for_share(self, share: str) -> str:
"""Return mount point for share. """Return mount point for share.
:param share: example 172.18.194.100:/var/fs :param share: example 172.18.194.100:/var/fs
""" """
return self._remotefsclient.get_mount_point(share) return self._remotefsclient.get_mount_point(share)
def _get_available_capacity(self, share): def _get_available_capacity(self, share: str) -> Tuple[int, int]:
"""Calculate available space on the share. """Calculate available space on the share.
:param share: example 172.18.194.100:/var/fs :param share: example 172.18.194.100:/var/fs
@ -1023,15 +1065,20 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
return available, size return available, size
def _get_capacity_info(self, remotefs_share): def _get_capacity_info(self,
remotefs_share: str) -> Tuple[int, int, int]:
available, size = self._get_available_capacity(remotefs_share) available, size = self._get_available_capacity(remotefs_share)
return size, available, size - available return size, available, size - available
def _get_mount_point_base(self): def _get_mount_point_base(self) -> Optional[str]:
return self.base return self.base
def _copy_volume_to_image(self, context, volume, image_service, def _copy_volume_to_image(self,
image_meta, store_id=None): context: context.RequestContext,
volume: objects.Volume,
image_service,
image_meta: dict,
store_id: Optional[str] = None) -> None:
"""Copy the volume to the specified image.""" """Copy the volume to the specified image."""
# If snapshots exist, flatten to a temporary image, and upload it # If snapshots exist, flatten to a temporary image, and upload it
@ -1066,7 +1113,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
volume, volume,
run_as_root=self._execute_as_root) run_as_root=self._execute_as_root)
def get_active_image_from_info(self, volume): def get_active_image_from_info(self, volume: objects.Volume) -> str:
"""Returns filename of the active image from the info file.""" """Returns filename of the active image from the info file."""
info_file = self._local_path_volume_info(volume) info_file = self._local_path_volume_info(volume)
@ -1080,14 +1127,14 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
return snap_info['active'] return snap_info['active']
def _local_path_active_image(self, volume): def _local_path_active_image(self, volume: objects.Volume) -> str:
active_fname = self.get_active_image_from_info(volume) active_fname = self.get_active_image_from_info(volume)
vol_dir = self._local_volume_dir(volume) vol_dir = self._local_volume_dir(volume)
active_fpath = os.path.join(vol_dir, active_fname) active_fpath = os.path.join(vol_dir, active_fname)
return active_fpath return active_fpath
def _get_snapshot_backing_file(self, snapshot): def _get_snapshot_backing_file(self, snapshot: objects.Snapshot) -> str:
info_path = self._local_path_volume_info(snapshot.volume) info_path = self._local_path_volume_info(snapshot.volume)
snap_info = self._read_info_file(info_path) snap_info = self._read_info_file(info_path)
vol_dir = self._local_volume_dir(snapshot.volume) vol_dir = self._local_volume_dir(snapshot.volume)
@ -1097,10 +1144,11 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
# Find the file which backs this file, which represents the point # Find the file which backs this file, which represents the point
# in which this snapshot was created. # in which this snapshot was created.
img_info = self._qemu_img_info(forward_path) # TODO: something is wrong here
img_info = self._qemu_img_info(forward_path) # type: ignore
return img_info.backing_file return img_info.backing_file
def _snapshots_exist(self, volume): def _snapshots_exist(self, volume: objects.Volume) -> bool:
if not volume.provider_location: if not volume.provider_location:
return False return False
@ -1109,10 +1157,13 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
return not utils.paths_normcase_equal(active_fpath, base_vol_path) return not utils.paths_normcase_equal(active_fpath, base_vol_path)
def _is_volume_attached(self, volume): def _is_volume_attached(self, volume: objects.Volume) -> bool:
return volume.attach_status == fields.VolumeAttachStatus.ATTACHED return volume.attach_status == fields.VolumeAttachStatus.ATTACHED
def _create_cloned_volume(self, volume, src_vref, context): def _create_cloned_volume(self,
volume: objects.Volume,
src_vref: objects.Volume,
context: context.RequestContext) -> dict:
LOG.info('Cloning volume %(src)s to volume %(dst)s', LOG.info('Cloning volume %(src)s to volume %(dst)s',
{'src': src_vref.id, {'src': src_vref.id,
'dst': volume.id}) 'dst': volume.id})
@ -1125,9 +1176,12 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
volume_name = CONF.volume_name_template % volume.id volume_name = CONF.volume_name_template % volume.id
# Create fake volume and snapshot objects # Create fake volume and snapshot objects
vol_attrs = ['provider_location', 'size', 'id', 'name', 'status', Volume = collections.namedtuple('Volume',
'volume_type', 'metadata', 'obj_context'] ('provider_location', 'size', 'id',
Volume = collections.namedtuple('Volume', vol_attrs) 'name', 'status',
'volume_type', 'metadata',
'obj_context'))
volume_info = Volume(provider_location=src_vref.provider_location, volume_info = Volume(provider_location=src_vref.provider_location,
size=src_vref.size, size=src_vref.size,
id=volume.id, id=volume.id,
@ -1185,11 +1239,11 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
volume.save() volume.save()
return {'provider_location': src_vref.provider_location} return {'provider_location': src_vref.provider_location}
def _copy_volume_image(self, src_path, dest_path): def _copy_volume_image(self, src_path: str, dest_path: str) -> None:
shutil.copyfile(src_path, dest_path) shutil.copyfile(src_path, dest_path)
self._set_rw_permissions(dest_path) self._set_rw_permissions(dest_path)
def _delete_stale_snapshot(self, snapshot): def _delete_stale_snapshot(self, snapshot: objects.Snapshot) -> None:
info_path = self._local_path_volume_info(snapshot.volume) info_path = self._local_path_volume_info(snapshot.volume)
snap_info = self._read_info_file(info_path) snap_info = self._read_info_file(info_path)
@ -1205,7 +1259,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
del(snap_info[snapshot.id]) del(snap_info[snapshot.id])
self._write_info_file(info_path, snap_info) self._write_info_file(info_path, snap_info)
def _delete_snapshot(self, snapshot): def _delete_snapshot(self, snapshot: objects.Snapshot) -> None:
"""Delete a snapshot. """Delete a snapshot.
If volume status is 'available', delete snapshot here in Cinder If volume status is 'available', delete snapshot here in Cinder
@ -1382,7 +1436,9 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
del(snap_info[snapshot.id]) del(snap_info[snapshot.id])
self._write_info_file(info_path, snap_info) self._write_info_file(info_path, snap_info)
def _create_volume_from_snapshot(self, volume, snapshot): def _create_volume_from_snapshot(self,
volume: objects.Volume,
snapshot: objects.Snapshot) -> dict:
"""Creates a volume from a snapshot. """Creates a volume from a snapshot.
Snapshot must not be the active snapshot. (offline) Snapshot must not be the active snapshot. (offline)
@ -1411,13 +1467,19 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
return {'provider_location': volume.provider_location} return {'provider_location': volume.provider_location}
def _copy_volume_from_snapshot(self, snapshot, volume, volume_size, def _copy_volume_from_snapshot(
src_encryption_key_id=None, self,
new_encryption_key_id=None): snapshot: objects.Snapshot,
volume: objects.Volume,
volume_size: int,
src_encryption_key_id: Optional[str] = None,
new_encryption_key_id: Optional[str] = None):
raise NotImplementedError() raise NotImplementedError()
def _do_create_snapshot(self, snapshot, backing_filename, def _do_create_snapshot(self,
new_snap_path): snapshot: objects.Snapshot,
backing_filename: str,
new_snap_path: str) -> None:
"""Create a QCOW2 file backed by another file. """Create a QCOW2 file backed by another file.
:param snapshot: snapshot reference :param snapshot: snapshot reference
@ -1485,7 +1547,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
cipher_spec = image_utils.decode_cipher(encryption['cipher'], cipher_spec = image_utils.decode_cipher(encryption['cipher'],
encryption['key_size']) encryption['key_size'])
command = ('qemu-img', 'create', '-f' 'qcow2', command = ['qemu-img', 'create', '-f' 'qcow2',
'-o', 'encrypt.format=luks,encrypt.key-secret=s1,' '-o', 'encrypt.format=luks,encrypt.key-secret=s1,'
'encrypt.cipher-alg=%(cipher_alg)s,' 'encrypt.cipher-alg=%(cipher_alg)s,'
'encrypt.cipher-mode=%(cipher_mode)s,' 'encrypt.cipher-mode=%(cipher_mode)s,'
@ -1493,7 +1555,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
'-b', 'json:' + file_json, '-b', 'json:' + file_json,
'--object', 'secret,id=s0,file=' + tmp_key.name, '--object', 'secret,id=s0,file=' + tmp_key.name,
'--object', 'secret,id=s1,file=' + tmp_key.name, '--object', 'secret,id=s1,file=' + tmp_key.name,
new_snap_path) new_snap_path]
self._execute(*command, run_as_root=self._execute_as_root) self._execute(*command, run_as_root=self._execute_as_root)
command_path = 'encrypt.key-secret=s0,file.filename=' command_path = 'encrypt.key-secret=s0,file.filename='
@ -1523,7 +1585,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
new_snap_path] new_snap_path]
self._execute(*command, run_as_root=self._execute_as_root) self._execute(*command, run_as_root=self._execute_as_root)
def _create_snapshot(self, snapshot): def _create_snapshot(self, snapshot: objects.Snapshot) -> None:
"""Create a snapshot. """Create a snapshot.
If volume is attached, call to Nova to create snapshot, providing a If volume is attached, call to Nova to create snapshot, providing a
@ -1671,8 +1733,10 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
snap_info[snapshot.id] = active snap_info[snapshot.id] = active
self._write_info_file(info_path, snap_info) self._write_info_file(info_path, snap_info)
def _create_snapshot_online(self, snapshot, backing_filename, def _create_snapshot_online(self,
new_snap_path): snapshot: objects.Snapshot,
backing_filename: str,
new_snap_path: str) -> None:
# Perform online snapshot via Nova # Perform online snapshot via Nova
self._do_create_snapshot(snapshot, self._do_create_snapshot(snapshot,
backing_filename, backing_filename,
@ -1685,6 +1749,7 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
} }
try: try:
assert self._nova is not None
result = self._nova.create_volume_snapshot( result = self._nova.create_volume_snapshot(
snapshot.obj_context, snapshot.obj_context,
snapshot.volume_id, snapshot.volume_id,
@ -1740,7 +1805,10 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
'for creation of snapshot %s.') % snapshot.id 'for creation of snapshot %s.') % snapshot.id
raise exception.RemoteFSException(msg) raise exception.RemoteFSException(msg)
def _delete_snapshot_online(self, context, snapshot, info): def _delete_snapshot_online(self,
context: context.RequestContext,
snapshot: objects.Snapshot,
info: dict) -> None:
# Update info over the course of this method # Update info over the course of this method
# active file never changes # active file never changes
info_path = self._local_path_volume_info(snapshot.volume) info_path = self._local_path_volume_info(snapshot.volume)
@ -1796,8 +1864,12 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
self._local_volume_dir(snapshot.volume), file_to_delete) self._local_volume_dir(snapshot.volume), file_to_delete)
self._delete(path_to_delete) self._delete(path_to_delete)
def _nova_assisted_vol_snap_delete(self, context, snapshot, delete_info): def _nova_assisted_vol_snap_delete(self,
context: context.RequestContext,
snapshot: objects.Snapshot,
delete_info: dict) -> None:
try: try:
assert self._nova is not None
self._nova.delete_volume_snapshot( self._nova.delete_volume_snapshot(
context, context,
snapshot.id, snapshot.id,
@ -1843,51 +1915,65 @@ class RemoteFSSnapDriverBase(RemoteFSDriver):
{'id': snapshot.id} {'id': snapshot.id}
raise exception.RemoteFSException(msg) raise exception.RemoteFSException(msg)
def _extend_volume(self, volume, size_gb): def _extend_volume(self, volume: objects.Volume, size_gb: int):
raise NotImplementedError() raise NotImplementedError()
def _revert_to_snapshot(self, context, volume, snapshot): def _revert_to_snapshot(self,
context: context.RequestContext,
volume: objects.Volume,
snapshot: objects.Snapshot):
raise NotImplementedError() raise NotImplementedError()
class RemoteFSSnapDriver(RemoteFSSnapDriverBase): class RemoteFSSnapDriver(RemoteFSSnapDriverBase):
@locked_volume_id_operation @locked_volume_id_operation
def create_snapshot(self, snapshot): def create_snapshot(self, snapshot: objects.Snapshot) -> None:
"""Apply locking to the create snapshot operation.""" """Apply locking to the create snapshot operation."""
return self._create_snapshot(snapshot) return self._create_snapshot(snapshot)
@locked_volume_id_operation @locked_volume_id_operation
def delete_snapshot(self, snapshot): def delete_snapshot(self, snapshot: objects.Snapshot) -> None:
"""Apply locking to the delete snapshot operation.""" """Apply locking to the delete snapshot operation."""
return self._delete_snapshot(snapshot) return self._delete_snapshot(snapshot)
@locked_volume_id_operation @locked_volume_id_operation
def create_volume_from_snapshot(self, volume, snapshot): def create_volume_from_snapshot(self,
volume: objects.Volume,
snapshot: objects.Snapshot) -> dict:
return self._create_volume_from_snapshot(volume, snapshot) return self._create_volume_from_snapshot(volume, snapshot)
# TODO: should be locking on src_vref id -- bug #1852449 # TODO: should be locking on src_vref id -- bug #1852449
@locked_volume_id_operation @locked_volume_id_operation
def create_cloned_volume(self, volume, src_vref): def create_cloned_volume(self,
volume: objects.Volume,
src_vref: objects.Volume) -> dict:
"""Creates a clone of the specified volume.""" """Creates a clone of the specified volume."""
return self._create_cloned_volume(volume, src_vref, return self._create_cloned_volume(volume, src_vref,
src_vref.obj_context) src_vref.obj_context)
@locked_volume_id_operation @locked_volume_id_operation
def copy_volume_to_image(self, context, volume, image_service, image_meta): def copy_volume_to_image(self,
context: context.RequestContext,
volume: objects.Volume,
image_service,
image_meta: dict) -> None:
"""Copy the volume to the specified image.""" """Copy the volume to the specified image."""
return self._copy_volume_to_image(context, volume, image_service, return self._copy_volume_to_image(context, volume, image_service,
image_meta) image_meta)
@locked_volume_id_operation @locked_volume_id_operation
def extend_volume(self, volume, size_gb): def extend_volume(self, volume: objects.Volume, size_gb: int) -> None:
return self._extend_volume(volume, size_gb) return self._extend_volume(volume, size_gb)
@locked_volume_id_operation @locked_volume_id_operation
def revert_to_snapshot(self, context, volume, snapshot): def revert_to_snapshot(self,
context: context.RequestContext,
volume: objects.Volume,
snapshot: objects.Snapshot) -> None:
"""Revert to specified snapshot.""" """Revert to specified snapshot."""
return self._revert_to_snapshot(context, volume, snapshot) return self._revert_to_snapshot(context, volume, snapshot)
@ -1895,43 +1981,54 @@ class RemoteFSSnapDriver(RemoteFSSnapDriverBase):
class RemoteFSSnapDriverDistributed(RemoteFSSnapDriverBase): class RemoteFSSnapDriverDistributed(RemoteFSSnapDriverBase):
@coordination.synchronized('{self.driver_prefix}-{snapshot.volume.id}') @coordination.synchronized('{self.driver_prefix}-{snapshot.volume.id}')
def create_snapshot(self, snapshot): def create_snapshot(self, snapshot: objects.Snapshot) -> None:
"""Apply locking to the create snapshot operation.""" """Apply locking to the create snapshot operation."""
return self._create_snapshot(snapshot) return self._create_snapshot(snapshot)
@coordination.synchronized('{self.driver_prefix}-{snapshot.volume.id}') @coordination.synchronized('{self.driver_prefix}-{snapshot.volume.id}')
def delete_snapshot(self, snapshot): def delete_snapshot(self, snapshot: objects.Snapshot) -> None:
"""Apply locking to the delete snapshot operation.""" """Apply locking to the delete snapshot operation."""
return self._delete_snapshot(snapshot) return self._delete_snapshot(snapshot)
@coordination.synchronized('{self.driver_prefix}-{volume.id}') @coordination.synchronized('{self.driver_prefix}-{volume.id}')
def create_volume_from_snapshot(self, volume, snapshot): def create_volume_from_snapshot(self,
volume: objects.Volume,
snapshot: objects.Snapshot) -> dict:
return self._create_volume_from_snapshot(volume, snapshot) return self._create_volume_from_snapshot(volume, snapshot)
# lock the source volume id first # lock the source volume id first
@coordination.synchronized('{self.driver_prefix}-{src_vref.id}') @coordination.synchronized('{self.driver_prefix}-{src_vref.id}')
@coordination.synchronized('{self.driver_prefix}-{volume.id}') @coordination.synchronized('{self.driver_prefix}-{volume.id}')
def create_cloned_volume(self, volume, src_vref): def create_cloned_volume(self,
volume: objects.Volume,
src_vref: objects.Volume) -> dict:
"""Creates a clone of the specified volume.""" """Creates a clone of the specified volume."""
return self._create_cloned_volume(volume, src_vref, return self._create_cloned_volume(volume, src_vref,
src_vref.obj_context) src_vref.obj_context)
@coordination.synchronized('{self.driver_prefix}-{volume.id}') @coordination.synchronized('{self.driver_prefix}-{volume.id}')
def copy_volume_to_image(self, context, volume, image_service, image_meta): def copy_volume_to_image(self,
context: context.RequestContext,
volume: objects.Volume,
image_service,
image_meta: dict) -> None:
"""Copy the volume to the specified image.""" """Copy the volume to the specified image."""
return self._copy_volume_to_image(context, volume, image_service, return self._copy_volume_to_image(context, volume, image_service,
image_meta) image_meta)
@coordination.synchronized('{self.driver_prefix}-{volume.id}') @coordination.synchronized('{self.driver_prefix}-{volume.id}')
def extend_volume(self, volume, size_gb): def extend_volume(self, volume: objects.Volume, size_gb: int) -> None:
return self._extend_volume(volume, size_gb) return self._extend_volume(volume, size_gb)
@coordination.synchronized('{self.driver_prefix}-{volume.id}') @coordination.synchronized('{self.driver_prefix}-{volume.id}')
def revert_to_snapshot(self, context, volume, snapshot): def revert_to_snapshot(self,
context: context.RequestContext,
volume: objects.Volume,
snapshot: objects.Snapshot) -> None:
"""Revert to specified snapshot.""" """Revert to specified snapshot."""
return self._revert_to_snapshot(context, volume, snapshot) return self._revert_to_snapshot(context, volume, snapshot)
@ -1940,24 +2037,26 @@ class RemoteFSSnapDriverDistributed(RemoteFSSnapDriverBase):
class RemoteFSPoolMixin(object): class RemoteFSPoolMixin(object):
"""Drivers inheriting this will report each share as a pool.""" """Drivers inheriting this will report each share as a pool."""
def _find_share(self, volume): def _find_share(self, volume: objects.Volume) -> Optional[str]:
# We let the scheduler choose a pool for us. # We let the scheduler choose a pool for us.
pool_name = self._get_pool_name_from_volume(volume) pool_name = self._get_pool_name_from_volume(volume)
share = self._get_share_from_pool_name(pool_name) share = self._get_share_from_pool_name(pool_name)
return share return share
def _get_pool_name_from_volume(self, volume): def _get_pool_name_from_volume(self,
volume: objects.Volume) -> Optional[str]:
pool_name = volume_utils.extract_host(volume['host'], pool_name = volume_utils.extract_host(volume['host'],
level='pool') level='pool')
return pool_name return pool_name
def _get_pool_name_from_share(self, share): def _get_pool_name_from_share(self, share: str):
raise NotImplementedError() raise NotImplementedError()
def _get_share_from_pool_name(self, pool_name): def _get_share_from_pool_name(self, pool_name: Optional[str]):
# To be implemented by drivers using pools. # To be implemented by drivers using pools.
raise NotImplementedError() raise NotImplementedError()
@typing.no_type_check
def _update_volume_stats(self): def _update_volume_stats(self):
data = {} data = {}
pools = [] pools = []
@ -2001,6 +2100,7 @@ class RemoteFSPoolMixin(object):
class RevertToSnapshotMixin(object): class RevertToSnapshotMixin(object):
@typing.no_type_check
def _revert_to_snapshot(self, context, volume, snapshot): def _revert_to_snapshot(self, context, volume, snapshot):
"""Revert a volume to specified snapshot """Revert a volume to specified snapshot
@ -2048,6 +2148,7 @@ class RemoteFSManageableVolumesMixin(object):
_SUPPORTED_IMAGE_FORMATS = ['raw', 'qcow2'] _SUPPORTED_IMAGE_FORMATS = ['raw', 'qcow2']
_MANAGEABLE_IMAGE_RE = None _MANAGEABLE_IMAGE_RE = None
@typing.no_type_check
def _get_manageable_vol_location(self, existing_ref): def _get_manageable_vol_location(self, existing_ref):
if 'source-name' not in existing_ref: if 'source-name' not in existing_ref:
reason = _('The existing volume reference ' reason = _('The existing volume reference '
@ -2095,6 +2196,7 @@ class RemoteFSManageableVolumesMixin(object):
return os.path.join(volume_location['mountpoint'], return os.path.join(volume_location['mountpoint'],
volume.name) volume.name)
@typing.no_type_check
def _is_volume_manageable(self, volume_path, already_managed=False): def _is_volume_manageable(self, volume_path, already_managed=False):
unmanageable_reason = None unmanageable_reason = None
@ -2120,6 +2222,7 @@ class RemoteFSManageableVolumesMixin(object):
return True, None return True, None
@typing.no_type_check
def manage_existing(self, volume, existing_ref): def manage_existing(self, volume, existing_ref):
LOG.info('Managing volume %(volume_id)s with ref %(ref)s', LOG.info('Managing volume %(volume_id)s with ref %(ref)s',
{'volume_id': volume.id, 'ref': existing_ref}) {'volume_id': volume.id, 'ref': existing_ref})
@ -2149,6 +2252,7 @@ class RemoteFSManageableVolumesMixin(object):
return {'provider_location': vol_location['share']} return {'provider_location': vol_location['share']}
@typing.no_type_check
def _get_rounded_manageable_image_size(self, image_path): def _get_rounded_manageable_image_size(self, image_path):
image_size = image_utils.qemu_img_info( image_size = image_utils.qemu_img_info(
image_path, run_as_root=self._execute_as_root).virtual_size image_path, run_as_root=self._execute_as_root).virtual_size
@ -2162,6 +2266,7 @@ class RemoteFSManageableVolumesMixin(object):
def unmanage(self, volume): def unmanage(self, volume):
pass pass
@typing.no_type_check
def _get_manageable_volume(self, share, volume_path, managed_volume=None): def _get_manageable_volume(self, share, volume_path, managed_volume=None):
manageable, unmanageable_reason = self._is_volume_manageable( manageable, unmanageable_reason = self._is_volume_manageable(
volume_path, already_managed=managed_volume is not None) volume_path, already_managed=managed_volume is not None)
@ -2192,6 +2297,7 @@ class RemoteFSManageableVolumesMixin(object):
} }
return manageable_volume return manageable_volume
@typing.no_type_check
def _get_share_manageable_volumes(self, share, managed_volumes): def _get_share_manageable_volumes(self, share, managed_volumes):
manageable_volumes = [] manageable_volumes = []
mount_path = self._get_mount_point_for_share(share) mount_path = self._get_mount_point_for_share(share)
@ -2217,6 +2323,7 @@ class RemoteFSManageableVolumesMixin(object):
dict(image_path=img_path, exc=exc)) dict(image_path=img_path, exc=exc))
return manageable_volumes return manageable_volumes
@typing.no_type_check
def get_manageable_volumes(self, cinder_volumes, marker, limit, offset, def get_manageable_volumes(self, cinder_volumes, marker, limit, offset,
sort_keys, sort_dirs): sort_keys, sort_dirs):
manageable_volumes = [] manageable_volumes = []

View File

@ -31,6 +31,7 @@ cinder/utils.py
cinder/volume/__init__.py cinder/volume/__init__.py
cinder/volume/api.py cinder/volume/api.py
cinder/volume/drivers/rbd.py cinder/volume/drivers/rbd.py
cinder/volume/drivers/remotefs.py
cinder/volume/flows/api/create_volume.py cinder/volume/flows/api/create_volume.py
cinder/volume/flows/manager/create_volume.py cinder/volume/flows/manager/create_volume.py
cinder/volume/manager.py cinder/volume/manager.py