mypy: RBD driver

Change-Id: I4f8e4c13e678912ce5a3105dc21ea81d1b53bf9a
This commit is contained in:
Eric Harney 2021-01-27 10:36:05 -05:00
parent 1075d1d304
commit efd476c74e
2 changed files with 284 additions and 120 deletions

View File

@ -19,6 +19,8 @@ import json
import math
import os
import tempfile
import typing
from typing import Any, Dict, List, Optional, Tuple, Union # noqa: H301
from castellan import key_manager
from eventlet import tpool
@ -36,15 +38,19 @@ try:
except ImportError:
rados = None
rbd = None
import six
from six.moves import urllib
from cinder import context
from cinder import exception
from cinder.i18n import _
from cinder.image import image_utils
from cinder import interface
from cinder import objects
from cinder.objects.backup import Backup
from cinder.objects import fields
from cinder.objects.snapshot import Snapshot
from cinder.objects.volume import Volume
from cinder.objects.volume_type import VolumeType
from cinder import utils
from cinder.volume import configuration
from cinder.volume import driver
@ -150,9 +156,16 @@ class RBDVolumeProxy(object):
The underlying librados client and ioctx can be accessed as the attributes
'client' and 'ioctx'.
"""
def __init__(self, driver, name, pool=None, snapshot=None,
read_only=False, remote=None, timeout=None,
client=None, ioctx=None):
def __init__(self,
driver: 'RBDDriver',
name: str,
pool: str = None,
snapshot: str = None,
read_only: bool = False,
remote: Optional[Dict[str, str]] = None,
timeout: int = None,
client: 'rados.Rados' = None,
ioctx: 'rados.Ioctx' = None):
self._close_conn = not (client and ioctx)
rados_client, rados_ioctx = driver._connect_to_rados(
pool, remote, timeout) if self._close_conn else (client, ioctx)
@ -173,34 +186,37 @@ class RBDVolumeProxy(object):
self.client = rados_client
self.ioctx = rados_ioctx
def __enter__(self):
def __enter__(self) -> 'RBDVolumeProxy':
return self
def __exit__(self, type_, value, traceback):
def __exit__(self,
type_: Optional[Any],
value: Optional[Any],
traceback: Optional[Any]) -> None:
try:
self.volume.close()
finally:
if self._close_conn:
self.driver._disconnect_from_rados(self.client, self.ioctx)
def __getattr__(self, attrib):
def __getattr__(self, attrib: str):
return getattr(self.volume, attrib)
class RADOSClient(object):
"""Context manager to simplify error handling for connecting to ceph."""
def __init__(self, driver, pool=None):
def __init__(self, driver: 'RBDDriver', pool: str = None) -> None:
self.driver = driver
self.cluster, self.ioctx = driver._connect_to_rados(pool)
def __enter__(self):
def __enter__(self) -> 'RADOSClient':
return self
def __exit__(self, type_, value, traceback):
def __exit__(self, type_, value, traceback) -> None:
self.driver._disconnect_from_rados(self.cluster, self.ioctx)
@property
def features(self):
def features(self) -> int:
features = self.cluster.conf_get('rbd_default_features')
if ((features is None) or (int(features) == 0)):
features = self.driver.RBD_FEATURE_LAYERING
@ -229,10 +245,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
RBD_FEATURE_JOURNALING = 64
STORAGE_PROTOCOL = 'ceph'
def __init__(self, active_backend_id=None, *args, **kwargs):
def __init__(self,
active_backend_id: str = None,
*args,
**kwargs) -> None:
super(RBDDriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(RBD_OPTS)
self._stats = {}
self._stats: Dict[str, Union[str, bool]] = {}
# allow overrides for testing
self.rados = kwargs.get('rados', rados)
self.rbd = kwargs.get('rbd', rbd)
@ -247,12 +266,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
self._backend_name = (self.configuration.volume_backend_name or
self.__class__.__name__)
self._active_backend_id = active_backend_id
self._active_config = {}
self._active_backend_id: Optional[str] = active_backend_id
self._active_config: Dict[str, str] = {}
self._is_replication_enabled = False
self._replication_targets = []
self._target_names = []
self._clone_v2_api_checked = False
self._replication_targets: list = []
self._target_names: List[str] = []
self._clone_v2_api_checked: bool = False
if self.rbd is not None:
self.RBD_FEATURE_LAYERING = self.rbd.RBD_FEATURE_LAYERING
@ -268,9 +287,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
self.RBD_FEATURE_OBJECT_MAP |
self.RBD_FEATURE_EXCLUSIVE_LOCK)
self.keyring_data: Optional[str] = None
self._set_keyring_attributes()
def _set_keyring_attributes(self):
def _set_keyring_attributes(self) -> None:
# The rbd_keyring_conf option is not available for OpenStack usage
# for security reasons (OSSN-0085) and in OpenStack we use
# rbd_secret_uuid or make sure that the keyring files are present on
@ -281,8 +301,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
# file even if it's there (because we have removed the conf option
# definition), but cinderlib will find it because it sets the option
# directly as an attribute.
self.keyring_file = getattr(self.configuration, 'rbd_keyring_conf',
None)
self.keyring_file: Optional[str] = getattr(self.configuration,
'rbd_keyring_conf',
None)
self.keyring_data = None
try:
@ -293,13 +314,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
LOG.debug('Cannot read RBD keyring file: %s.', self.keyring_file)
@classmethod
def get_driver_options(cls):
def get_driver_options(cls) -> list:
additional_opts = cls._get_oslo_driver_opts(
'replication_device', 'reserved_percentage',
'max_over_subscription_ratio', 'volume_dd_blocksize')
return RBD_OPTS + additional_opts
def _show_msg_check_clone_v2_api(self, volume_name):
def _show_msg_check_clone_v2_api(self, volume_name: str) -> None:
if not self._clone_v2_api_checked:
self._clone_v2_api_checked = True
with RBDVolumeProxy(self, volume_name, read_only=True) as volume:
@ -315,7 +336,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
' compat version to mimic for better'
' performance, fewer deletion issues')
def _get_target_config(self, target_id):
def _get_target_config(self,
target_id: Optional[str]) -> Dict[str,
str]:
"""Get a replication target from known replication targets."""
for target in self._replication_targets:
if target['name'] == target_id:
@ -330,12 +353,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
raise exception.InvalidReplicationTarget(
reason=_('RBD: Unknown failover target host %s.') % target_id)
def do_setup(self, context):
def do_setup(self, context: context.RequestContext) -> None:
"""Performs initialization steps that could raise exceptions."""
self._do_setup_replication()
self._active_config = self._get_target_config(self._active_backend_id)
def _do_setup_replication(self):
def _do_setup_replication(self) -> None:
replication_devices = self.configuration.safe_get(
'replication_device')
if replication_devices:
@ -343,7 +366,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
self._is_replication_enabled = True
self._target_names.append('default')
def _parse_replication_configs(self, replication_devices):
def _parse_replication_configs(self,
replication_devices: List[dict]) -> None:
for replication_device in replication_devices:
if 'backend_id' not in replication_device:
msg = _('Missing backend_id in replication_device '
@ -366,13 +390,17 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
self._replication_targets.append(replication_target)
self._target_names.append(name)
def _get_config_tuple(self, remote=None):
def _get_config_tuple(
self,
remote: Optional[Dict[str, str]] = None) \
-> Tuple[Optional[str], Optional[str],
Optional[str], Optional[str]]:
if not remote:
remote = self._active_config
return (remote.get('name'), remote.get('conf'), remote.get('user'),
remote.get('secret_uuid', None))
def _trash_purge(self):
def _trash_purge(self) -> None:
LOG.info("Purging trash for backend '%s'", self._backend_name)
def _err(vol_name: str, backend_name: str) -> None:
@ -405,7 +433,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
vol.get('name'),
self._backend_name)
def _start_periodic_tasks(self):
def _start_periodic_tasks(self) -> None:
if self.configuration.enable_deferred_deletion:
LOG.info("Starting periodic trash purge for backend '%s'",
self._backend_name)
@ -414,7 +442,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
deferred_deletion_ptask.start(
interval=self.configuration.deferred_deletion_purge_interval)
def check_for_setup_error(self):
def check_for_setup_error(self) -> None:
"""Returns an error if prerequisites aren't met."""
if rados is None:
msg = _('rados and rbd python libraries not found')
@ -451,10 +479,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
self._start_periodic_tasks()
def RBDProxy(self):
def RBDProxy(self) -> tpool.Proxy:
return tpool.Proxy(self.rbd.RBD())
def _ceph_args(self):
def _ceph_args(self) -> List[str]:
args = []
name, conf, user, secret_uuid = self._get_config_tuple()
@ -468,11 +496,18 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return args
def _connect_to_rados(self, pool=None, remote=None, timeout=None):
def _connect_to_rados(self,
pool: Optional[str] = None,
remote: Optional[dict] = None,
timeout: Optional[int] = None) -> \
Tuple['rados.Rados', 'rados.Ioctx']:
@utils.retry(exception.VolumeBackendAPIException,
self.configuration.rados_connection_interval,
self.configuration.rados_connection_retries)
def _do_conn(pool, remote, timeout):
def _do_conn(pool: Optional[str],
remote: Optional[dict],
timeout: Optional[int]) -> Tuple['rados.Rados',
'rados.Ioctx']:
name, conf, user, secret_uuid = self._get_config_tuple(remote)
if pool is not None:
@ -494,10 +529,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
try:
if timeout >= 0:
timeout = six.text_type(timeout)
client.conf_set('rados_osd_op_timeout', timeout)
client.conf_set('rados_mon_op_timeout', timeout)
client.conf_set('client_mount_timeout', timeout)
t = str(timeout)
client.conf_set('rados_osd_op_timeout', t)
client.conf_set('rados_mon_op_timeout', t)
client.conf_set('client_mount_timeout', t)
client.connect()
ioctx = client.open_ioctx(pool)
@ -510,12 +545,14 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return _do_conn(pool, remote, timeout)
def _disconnect_from_rados(self, client, ioctx):
def _disconnect_from_rados(self,
client: 'rados.Rados',
ioctx: 'rados.Ioctx') -> None:
# closing an ioctx cannot raise an exception
ioctx.close()
client.shutdown()
def _get_backup_snaps(self, rbd_image):
def _get_backup_snaps(self, rbd_image) -> List:
"""Get list of any backup snapshots that exist on this volume.
There should only ever be one but accept all since they need to be
@ -528,7 +565,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
from cinder.backup.drivers import ceph
return ceph.CephBackupDriver.get_backup_snaps(rbd_image)
def _get_mon_addrs(self):
def _get_mon_addrs(self) -> Tuple[List[str], List[str]]:
args = ['ceph', 'mon', 'dump', '--format=json']
args.extend(self._ceph_args())
out, _ = self._execute(*args)
@ -536,7 +573,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
if lines[0].startswith('dumped monmap epoch'):
lines = lines[1:]
monmap = json.loads('\n'.join(lines))
addrs = [mon['addr'] for mon in monmap['mons']]
addrs: List[str] = [mon['addr'] for mon in monmap['mons']]
hosts = []
ports = []
for addr in addrs:
@ -546,7 +583,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
ports.append(port)
return hosts, ports
def _get_usage_info(self):
def _get_usage_info(self) -> int:
"""Calculate provisioned volume space in GiB.
Stats report should send provisioned size of volumes (snapshot must not
@ -572,7 +609,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
total_provisioned = math.ceil(float(total_provisioned) / units.Gi)
return total_provisioned
def _get_pool_stats(self):
def _get_pool_stats(self) -> Union[Tuple[str, str],
Tuple[float, float]]:
"""Gets pool free and total capacity in GiB.
Calculate free and total capacity of the pool based on the pool's
@ -602,6 +640,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
pool_stats = [pool for pool in df_data['pools']
if pool['name'] == pool_name][0]['stats']
total_capacity: float
free_capacity: float
quota_outbuf = encodeutils.safe_decode(quota_outbuf)
bytes_quota = json.loads(quota_outbuf)['quota_max_bytes']
# With quota the total is the quota limit and free is quota - used
@ -624,7 +664,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return free_capacity, total_capacity
def _update_volume_stats(self):
def _update_volume_stats(self) -> None:
location_info = '%s:%s:%s:%s:%s' % (
self.configuration.rbd_cluster_name,
self.configuration.rbd_ceph_conf,
@ -673,7 +713,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
LOG.exception('error refreshing volume stats')
self._stats = stats
def _get_clone_depth(self, client, volume_name, depth=0):
def _get_clone_depth(self,
client: 'rados.Rados',
volume_name: str,
depth: int = 0) -> int:
"""Returns the number of ancestral clones of the given volume."""
parent_volume = self.rbd.Image(client.ioctx,
volume_name,
@ -689,7 +732,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return self._get_clone_depth(client, parent, depth + 1)
def _extend_if_required(self, volume, src_vref):
def _extend_if_required(self, volume: Volume, src_vref: Volume) -> None:
"""Extends a volume if required
In case src_vref size is smaller than the size if the requested
@ -702,7 +745,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
'dst_size': volume.size})
self._resize(volume)
def create_cloned_volume(self, volume, src_vref):
def create_cloned_volume(
self,
volume: Volume,
src_vref: Volume) -> Optional[Dict[str, Optional[str]]]:
"""Create a cloned volume from another volume.
Since we are cloning from a volume and not a snapshot, we must first
@ -722,7 +768,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
with RBDVolumeProxy(self, src_name, read_only=True) as vol:
vol.copy(vol.ioctx, dest_name)
self._extend_if_required(volume, src_vref)
return
return None
# Otherwise do COW clone.
with RADOSClient(self) as client:
@ -808,7 +854,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
LOG.debug("clone created successfully")
return volume_update
def _enable_replication(self, volume):
def _enable_replication(self, volume: Volume) -> Dict[str, str]:
"""Enable replication for a volume.
Returns required volume update.
@ -832,7 +878,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return {'replication_status': fields.ReplicationStatus.ENABLED,
'replication_driver_data': driver_data}
def _enable_multiattach(self, volume):
def _enable_multiattach(self, volume: Volume) -> Dict[str, str]:
vol_name = utils.convert_str(volume.name)
with RBDVolumeProxy(self, vol_name) as image:
image_features = image.features()
@ -842,7 +888,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return {'provider_location':
self._dumps({'saved_features': image_features})}
def _disable_multiattach(self, volume):
def _disable_multiattach(self, volume: Volume) -> Dict[str, None]:
vol_name = utils.convert_str(volume.name)
with RBDVolumeProxy(self, vol_name) as image:
try:
@ -859,7 +905,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return {'provider_location': None}
def _is_replicated_type(self, volume_type):
def _is_replicated_type(self, volume_type: VolumeType) -> bool:
try:
extra_specs = volume_type.extra_specs
LOG.debug('extra_specs: %s', extra_specs)
@ -868,7 +914,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
LOG.debug('Unable to retrieve extra specs info')
return False
def _is_multiattach_type(self, volume_type):
def _is_multiattach_type(self, volume_type: VolumeType) -> bool:
try:
extra_specs = volume_type.extra_specs
LOG.debug('extra_specs: %s', extra_specs)
@ -877,7 +923,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
LOG.debug('Unable to retrieve extra specs info')
return False
def _setup_volume(self, volume, volume_type=None):
def _setup_volume(
self,
volume: Volume,
volume_type: Optional[VolumeType] = None) -> Dict[str,
Optional[str]]:
if volume_type:
had_replication = self._is_replicated_type(volume.volume_type)
@ -894,7 +944,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
msg = _('Replication and Multiattach are mutually exclusive.')
raise RBDDriverException(reason=msg)
volume_update = dict()
volume_update: dict = dict()
if want_replication:
if had_multiattach:
@ -924,7 +974,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return volume_update
def _create_encrypted_volume(self, volume, context):
def _create_encrypted_volume(self,
volume: Volume,
context: context.RequestContext) -> None:
"""Create an encrypted volume.
This works by creating an encrypted image locally,
@ -972,11 +1024,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
cmd.extend(self._ceph_args())
self._execute(*cmd)
def create_volume(self, volume):
def create_volume(self, volume: Volume) -> Dict[str, Any]:
"""Creates a logical volume."""
if volume.encryption_key_id:
return self._create_encrypted_volume(volume, volume.obj_context)
self._create_encrypted_volume(volume, volume.obj_context)
return {}
size = int(volume.size) * units.Gi
@ -1004,13 +1057,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return volume_update
def _flatten(self, pool, volume_name):
def _flatten(self, pool: str, volume_name: str) -> None:
LOG.debug('flattening %(pool)s/%(img)s',
dict(pool=pool, img=volume_name))
with RBDVolumeProxy(self, volume_name, pool) as vol:
vol.flatten()
def _get_stripe_unit(self, ioctx, volume_name):
def _get_stripe_unit(self, ioctx: 'rados.Ioctx', volume_name: str) -> int:
"""Return the correct stripe unit for a cloned volume.
A cloned volume must be created with a stripe unit at least as large
@ -1029,7 +1082,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return max(image_stripe_unit, default_stripe_unit)
def _clone(self, volume, src_pool, src_image, src_snap):
def _clone(self,
volume: Volume,
src_pool: str,
src_image: str,
src_snap: str) -> Dict[str, Optional[str]]:
LOG.debug('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s',
dict(pool=src_pool, img=src_image, snap=src_snap,
dst=volume.name))
@ -1056,7 +1113,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
volume_id=volume.id)
return volume_update or {}
def _resize(self, volume, **kwargs):
def _resize(self, volume: Volume, **kwargs: Any) -> None:
size = kwargs.get('size', None)
if not size:
size = int(volume.size) * units.Gi
@ -1064,14 +1121,17 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
with RBDVolumeProxy(self, volume.name) as vol:
vol.resize(size)
def _calculate_new_size(self, size_diff, volume_name):
def _calculate_new_size(self, size_diff: int, volume_name: str) -> int:
with RBDVolumeProxy(self, volume_name) as vol:
current_size_bytes = vol.volume.size()
size_diff_bytes = size_diff * units.Gi
new_size_bytes = current_size_bytes + size_diff_bytes
return new_size_bytes
def create_volume_from_snapshot(self, volume, snapshot):
def create_volume_from_snapshot(
self,
volume: Volume,
snapshot: Snapshot) -> dict:
"""Creates a volume from a snapshot."""
volume_update = self._clone(volume, self.configuration.rbd_pool,
snapshot.volume_name, snapshot.name)
@ -1098,7 +1158,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
self._show_msg_check_clone_v2_api(snapshot.volume_name)
return volume_update
def _delete_backup_snaps(self, rbd_image):
def _delete_backup_snaps(self, rbd_image: 'rbd.Image') -> None:
backup_snaps = self._get_backup_snaps(rbd_image)
if backup_snaps:
for snap in backup_snaps:
@ -1106,7 +1166,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
else:
LOG.debug("volume has no backup snaps")
def _get_clone_info(self, volume, volume_name, snap=None):
def _get_clone_info(
self,
volume: 'rbd.Image',
volume_name: str,
snap: Optional[str] = None) -> Union[Tuple[str, str, str],
Tuple[None, None, None]]:
"""If volume is a clone, return its parent info.
Returns a tuple of (pool, parent, snap). A snapshot may optionally be
@ -1132,7 +1197,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return (None, None, None)
def _get_children_info(self, volume, snap):
def _get_children_info(self,
volume: 'rbd.Image',
snap: Optional[str]) -> List[tuple]:
"""List children for the given snapshot of a volume(image).
Returns a list of (pool, image).
@ -1147,7 +1214,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return children_list
def _delete_clone_parent_refs(self, client, parent_name, parent_snap):
def _delete_clone_parent_refs(self,
client: RADOSClient,
parent_name: str,
parent_snap: str) -> None:
"""Walk back up the clone chain and delete references.
Deletes references i.e. deleted parent volumes and snapshots.
@ -1183,9 +1253,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
# Now move up to grandparent if there is one
if g_parent:
g_parent_snap = typing.cast(str, g_parent_snap)
self._delete_clone_parent_refs(client, g_parent, g_parent_snap)
def delete_volume(self, volume):
def delete_volume(self, volume: Volume) -> None:
"""Deletes a logical volume."""
# NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are
# utf-8 otherwise librbd will barf.
@ -1226,7 +1297,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
@utils.retry(self.rbd.ImageBusy,
self.configuration.rados_connection_interval,
self.configuration.rados_connection_retries)
def _try_remove_volume(client, volume_name):
def _try_remove_volume(client: Any, volume_name: str) -> None:
if self.configuration.enable_deferred_deletion:
delay = self.configuration.deferred_deletion_delay
else:
@ -1269,6 +1340,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
# references.
if parent:
LOG.debug("volume is a clone so cleaning references")
parent_snap = typing.cast(str, parent_snap)
self._delete_clone_parent_refs(client, parent, parent_snap)
else:
# If the volume has copy-on-write clones we will not be able to
@ -1277,14 +1349,14 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
new_name = "%s.deleted" % (volume_name)
self.RBDProxy().rename(client.ioctx, volume_name, new_name)
def create_snapshot(self, snapshot):
def create_snapshot(self, snapshot: Snapshot) -> None:
"""Creates an rbd snapshot."""
with RBDVolumeProxy(self, snapshot.volume_name) as volume:
snap = utils.convert_str(snapshot.name)
volume.create_snap(snap)
volume.protect_snap(snap)
def delete_snapshot(self, snapshot):
def delete_snapshot(self, snapshot: Snapshot) -> None:
"""Deletes an rbd snapshot."""
# NOTE(dosaboy): this was broken by commit cbe1d5f. Ensure names are
# utf-8 otherwise librbd will barf.
@ -1320,11 +1392,14 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
LOG.info("Snapshot %s does not exist in backend.",
snap_name)
def snapshot_revert_use_temp_snapshot(self):
def snapshot_revert_use_temp_snapshot(self) -> bool:
"""Disable the use of a temporary snapshot on revert."""
return False
def revert_to_snapshot(self, context, volume, snapshot):
def revert_to_snapshot(self,
context: context.RequestContext,
volume: Volume,
snapshot: Snapshot) -> None:
"""Revert a volume to a given snapshot."""
# NOTE(rosmaita): The Ceph documentation notes that this operation is
# inefficient on the backend for large volumes, and that the preferred
@ -1346,7 +1421,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
with RBDVolumeProxy(self, volume.name) as image:
image.rollback_to_snap(snapshot.name)
def _disable_replication(self, volume):
def _disable_replication(self, volume: Volume) -> Dict[str, Optional[str]]:
"""Disable replication on the given volume."""
vol_name = utils.convert_str(volume.name)
with RBDVolumeProxy(self, vol_name) as image:
@ -1363,14 +1438,24 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return {'replication_status': fields.ReplicationStatus.DISABLED,
'replication_driver_data': None}
def retype(self, context, volume, new_type, diff, host):
def retype(self,
context: context.RequestContext,
volume: Volume,
new_type: VolumeType,
diff: Union[Dict[str, Dict[str, str]], Dict[str, Dict], None],
host: Optional[Dict[str, str]]) -> Tuple[bool, dict]:
"""Retype from one volume type to another on the same backend."""
return True, self._setup_volume(volume, new_type)
def _dumps(self, obj):
def _dumps(self, obj: Dict[str, Union[bool, int]]) -> str:
return json.dumps(obj, separators=(',', ':'), sort_keys=True)
def _exec_on_volume(self, volume_name, remote, operation, *args, **kwargs):
def _exec_on_volume(self,
volume_name: str,
remote: Dict[str, str],
operation: str,
*args: Any,
**kwargs: Any):
@utils.retry(rbd.ImageBusy,
self.configuration.rados_connection_interval,
self.configuration.rados_connection_retries)
@ -1381,7 +1466,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return getattr(rbd_image, operation)(*args, **kwargs)
return _do_exec()
def _failover_volume(self, volume, remote, is_demoted, replication_status):
def _failover_volume(self,
volume: Volume,
remote: Dict[str, str],
is_demoted: bool,
replication_status: str) -> Dict[str, Any]:
"""Process failover for a volume.
There are 2 different cases that will return different update values
@ -1420,7 +1509,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return error_result
def _demote_volumes(self, volumes, until_failure=True):
def _demote_volumes(self,
volumes: List[Volume],
until_failure: bool = True) -> List[bool]:
"""Try to demote volumes on the current primary cluster."""
result = []
try_demoting = True
@ -1440,7 +1531,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
result.append(demoted)
return result
def _get_failover_target_config(self, secondary_id=None):
def _get_failover_target_config(self,
secondary_id: str = None) -> Tuple[str,
dict]:
if not secondary_id:
# In auto mode exclude failback and active
candidates = set(self._target_names).difference(
@ -1451,7 +1544,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
secondary_id = candidates.pop()
return secondary_id, self._get_target_config(secondary_id)
def failover(self, context, volumes, secondary_id=None, groups=None):
def failover(self,
context: context.RequestContext,
volumes: list,
secondary_id: Optional[str] = None,
groups=None) -> Tuple[str, list, list]:
"""Failover replicated volumes."""
LOG.info('RBD driver failover started.')
if not self._is_replication_enabled:
@ -1476,7 +1573,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
LOG.info('RBD driver failover completed.')
return secondary_id, updates, []
def failover_completed(self, context, secondary_id=None):
def failover_completed(self,
context: context.RequestContext,
secondary_id: Optional[str] = None) -> None:
"""Failover to replication target."""
LOG.info('RBD driver failover completion started.')
secondary_id, remote = self._get_failover_target_config(secondary_id)
@ -1485,7 +1584,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
self._active_config = remote
LOG.info('RBD driver failover completion completed.')
def failover_host(self, context, volumes, secondary_id=None, groups=None):
def failover_host(self,
context: context.RequestContext,
volumes: List[Volume],
secondary_id: str = None,
groups: Optional[List] = None) -> Tuple[str,
List[Volume],
List]:
"""Failover to replication target.
This function combines calls to failover() and failover_completed() to
@ -1496,19 +1601,28 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
self.failover_completed(context, secondary_id)
return active_backend_id, volume_update_list, group_update_list
def ensure_export(self, context, volume):
def ensure_export(self,
context: context.RequestContext,
volume: Volume):
"""Synchronously recreates an export for a logical volume."""
pass
def create_export(self, context, volume, connector):
def create_export(self,
context: context.RequestContext,
volume: Volume,
connector: dict):
"""Exports the volume."""
pass
def remove_export(self, context, volume):
def remove_export(self,
context: context.RequestContext,
volume: Volume):
"""Removes an export for a logical volume."""
pass
def initialize_connection(self, volume, connector):
def initialize_connection(self,
volume: Volume,
connector: dict) -> Dict[str, Any]:
hosts, ports = self._get_mon_addrs()
name, conf, user, secret_uuid = self._get_config_tuple()
data = {
@ -1528,14 +1642,17 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
}
}
if self.keyring_data:
data['data']['keyring'] = self.keyring_data
data['data']['keyring'] = self.keyring_data # type: ignore
LOG.debug('connection data: %s', data)
return data
def terminate_connection(self, volume, connector, **kwargs):
def terminate_connection(self,
volume: Volume,
connector: dict,
**kwargs) -> None:
pass
def _parse_location(self, location):
def _parse_location(self, location: str) -> List[str]:
prefix = 'rbd://'
if not location.startswith(prefix):
reason = _('Not stored in rbd')
@ -1550,7 +1667,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
raise exception.ImageUnacceptable(image_id=location, reason=reason)
return pieces
def _get_fsid(self):
def _get_fsid(self) -> str:
with RADOSClient(self) as client:
# Librados's get_fsid is represented as binary
# in py3 instead of str as it is in py2.
@ -1567,7 +1684,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
# https://tracker.ceph.com/issues/38381
return encodeutils.safe_decode(client.cluster.get_fsid())
def _is_cloneable(self, image_location, image_meta):
def _is_cloneable(self, image_location: str, image_meta: dict) -> bool:
try:
fsid, pool, image, snapshot = self._parse_location(image_location)
except exception.ImageUnacceptable as e:
@ -1597,9 +1714,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
dict(loc=image_location, err=e))
return False
def clone_image(self, context, volume,
image_location, image_meta,
image_service):
def clone_image(self,
context: context.RequestContext,
volume: Volume,
image_location: Optional[list],
image_meta: dict,
image_service) -> Tuple[dict, bool]:
if image_location:
# Note: image_location[0] is glance image direct_url.
# image_location[1] contains the list of all locations (including
@ -1623,16 +1743,29 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return volume_update, True
return ({}, False)
def copy_image_to_encrypted_volume(self, context, volume, image_service,
image_id):
def copy_image_to_encrypted_volume(self,
context: context.RequestContext,
volume: Volume,
image_service,
image_id: str) -> None:
self._copy_image_to_volume(context, volume, image_service, image_id,
encrypted=True)
def copy_image_to_volume(self, context, volume, image_service, image_id):
def copy_image_to_volume(self,
context: context.RequestContext,
volume: Volume,
image_service,
image_id: str) -> None:
self._copy_image_to_volume(context, volume, image_service, image_id)
def _encrypt_image(self, context, volume, tmp_dir, src_image_path):
encryption = volume_utils.check_encryption_provider(volume, context)
def _encrypt_image(self,
context: context.RequestContext,
volume: Volume,
tmp_dir: str,
src_image_path: Any) -> None:
encryption = volume_utils.check_encryption_provider(
volume,
context)
# Fetch the key associated with the volume and decode the passphrase
keymgr = key_manager.API(CONF)
@ -1663,8 +1796,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
finally:
fileutils.delete_if_exists(dest_image_path)
def _copy_image_to_volume(self, context, volume, image_service, image_id,
encrypted=False):
def _copy_image_to_volume(self,
context: context.RequestContext,
volume: Volume,
image_service: Any,
image_id: str,
encrypted: bool = False) -> None:
tmp_dir = volume_utils.image_conversion_dir()
@ -1680,7 +1817,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
@utils.retry(exception.VolumeIsBusy,
self.configuration.rados_connection_interval,
self.configuration.rados_connection_retries)
def _delete_volume(volume):
def _delete_volume(volume: Volume) -> None:
self.delete_volume(volume)
_delete_volume(volume)
@ -1721,7 +1858,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
volume)
os.unlink(tmp_file)
def extend_volume(self, volume, new_size):
def extend_volume(self, volume: Volume, new_size: str) -> None:
"""Extend an existing volume."""
old_size = volume.size
@ -1737,7 +1874,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
LOG.debug("Extend volume from %(old_size)s GB to %(new_size)s GB.",
{'old_size': old_size, 'new_size': new_size})
def manage_existing(self, volume, existing_ref):
def manage_existing(self,
volume: Volume, existing_ref: Dict[str, str]) -> None:
"""Manages an existing image.
Renames the image name to match the expected name for the volume.
@ -1756,7 +1894,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
utils.convert_str(rbd_name),
utils.convert_str(volume.name))
def manage_existing_get_size(self, volume, existing_ref):
def manage_existing_get_size(self,
volume: Volume,
existing_ref: Dict[str, str]) -> int:
"""Return size of an existing image for manage_existing.
:param volume:
@ -1812,8 +1952,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
out, _ = self._execute(*args)
return json.loads(out)
def get_manageable_volumes(self, cinder_volumes, marker, limit, offset,
sort_keys, sort_dirs):
def get_manageable_volumes(self,
cinder_volumes: List[Dict[str, str]],
marker: Optional[Any],
limit: int,
offset: int,
sort_keys: List[str],
sort_dirs: List[str]) -> List[Dict[str, Any]]:
manageable_volumes = []
cinder_ids = [resource['id'] for resource in cinder_volumes]
@ -1860,8 +2005,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def unmanage(self, volume):
pass
def update_migrated_volume(self, ctxt, volume, new_volume,
original_volume_status):
def update_migrated_volume(self,
ctxt: Dict,
volume: Volume,
new_volume: Volume,
original_volume_status: str) -> \
Union[Dict[str, None], Dict[str, Optional[str]]]:
"""Return model update from RBD for migrated volume.
This method should rename the back-end volume name(id) on the
@ -1902,7 +2051,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return {'_name_id': name_id,
'provider_location': provider_location}
def migrate_volume(self, context, volume, host):
def migrate_volume(self,
context: context.RequestContext,
volume: Volume,
host: Dict[str, Dict[str, str]]) -> Tuple[bool, None]:
refuse_to_migrate = (False, None)
@ -1982,7 +2134,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return (True, None)
def manage_existing_snapshot_get_size(self, snapshot, existing_ref):
def manage_existing_snapshot_get_size(self,
snapshot: Snapshot,
existing_ref: Dict[str, Any]) -> int:
"""Return size of an existing image for manage_existing.
:param snapshot:
@ -2031,7 +2185,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
raise exception.VolumeBackendAPIException(
data=exception_message)
def manage_existing_snapshot(self, snapshot, existing_ref):
def manage_existing_snapshot(self,
snapshot: Snapshot,
existing_ref: Dict[str, Any]) -> None:
"""Manages an existing snapshot.
Renames the snapshot name to match the expected name for the snapshot.
@ -2053,8 +2209,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
if not volume.is_protected_snap(snapshot.name):
volume.protect_snap(snapshot.name)
def get_manageable_snapshots(self, cinder_snapshots, marker, limit, offset,
sort_keys, sort_dirs):
def get_manageable_snapshots(self,
cinder_snapshots: List[Dict[str, str]],
marker: Optional[Any],
limit: int,
offset: int,
sort_keys: List[str],
sort_dirs: List[str]) -> List[Dict[str, Any]]:
"""List manageable snapshots on RBD backend."""
manageable_snapshots = []
cinder_snapshot_ids = [resource['id'] for resource in cinder_snapshots]
@ -2104,7 +2265,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return volume_utils.paginate_entries_list(
manageable_snapshots, marker, limit, offset, sort_keys, sort_dirs)
def unmanage_snapshot(self, snapshot):
def unmanage_snapshot(self, snapshot: Snapshot) -> None:
"""Removes the specified snapshot from Cinder management."""
with RBDVolumeProxy(self, snapshot.volume_name) as volume:
volume.set_snap(snapshot.name)
@ -2113,7 +2274,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
if not children and volume.is_protected_snap(snapshot.name):
volume.unprotect_snap(snapshot.name)
def get_backup_device(self, context, backup):
def get_backup_device(self,
context: context.RequestContext,
backup: Backup) -> Tuple[Volume, bool]:
"""Get a backup device from an existing volume.
To support incremental backups on Ceph to Ceph we don't clone

View File

@ -28,6 +28,7 @@ cinder/scheduler/weights/volume_number.py
cinder/utils.py
cinder/volume/__init__.py
cinder/volume/api.py
cinder/volume/drivers/rbd.py
cinder/volume/flows/api/create_volume.py
cinder/volume/flows/manager/create_volume.py
cinder/volume/manager.py