Merge "mypy: create_volume flows"

This commit is contained in:
Zuul 2021-08-18 22:29:35 +00:00 committed by Gerrit Code Review
commit 6643e3e62c
9 changed files with 227 additions and 80 deletions

View File

@ -11,6 +11,7 @@
# under the License.
import os
from typing import Any, List # noqa: H301
from oslo_log import log as logging
# For more information please visit: https://wiki.openstack.org/wiki/TaskFlow
@ -24,7 +25,7 @@ from cinder import exception
LOG = logging.getLogger(__name__)
def _make_task_name(cls, addons=None):
def _make_task_name(cls, addons: List[str] = None) -> str:
"""Makes a pretty name for a task class."""
base_name = ".".join([cls.__module__, cls.__name__])
extra = ''
@ -40,11 +41,11 @@ class CinderTask(task.Task):
implement the given task as the task name.
"""
def __init__(self, addons=None, **kwargs):
def __init__(self, addons: List[str] = None, **kwargs: Any) -> None:
super(CinderTask, self).__init__(self.make_name(addons), **kwargs)
@classmethod
def make_name(cls, addons=None):
def make_name(cls, addons: List[str] = None) -> str:
return _make_task_name(cls, addons)

View File

@ -133,7 +133,8 @@ def as_int(obj: Union[int, float, str], quiet: bool = True) -> int:
return obj
def check_exclusive_options(**kwargs: dict) -> None:
def check_exclusive_options(
**kwargs: Optional[Union[dict, str, bool]]) -> None:
"""Checks that only one of the provided options is actually not-none.
Iterates over all the kwargs passed in and checks that only one of said

View File

@ -10,15 +10,19 @@
# License for the specific language governing permissions and limitations
# under the License.
from typing import Any, Dict, List, Optional, Tuple, Type, Union # noqa: H301
from oslo_config import cfg
from oslo_log import log as logging
import taskflow.engines
from taskflow.patterns import linear_flow
from taskflow.types import failure as ft
from cinder import context
from cinder import exception
from cinder import flow_utils
from cinder.i18n import _
from cinder.image import glance
from cinder import objects
from cinder.objects import fields
from cinder.policies import volumes as policy
@ -68,15 +72,20 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
'refresh_az', 'backup_id', 'availability_zones',
'multiattach'])
def __init__(self, image_service, availability_zones, **kwargs):
def __init__(self,
image_service: glance.GlanceImageService,
availability_zones, **kwargs) -> None:
super(ExtractVolumeRequestTask, self).__init__(addons=[ACTION],
**kwargs)
self.image_service = image_service
self.availability_zones = availability_zones
@staticmethod
def _extract_resource(resource, allowed_vals, exc, resource_name,
props=('status',)):
def _extract_resource(resource: Optional[dict],
allowed_vals: Tuple[Tuple[str, ...]],
exc: Type[exception.CinderException],
resource_name: str,
props: Tuple[str] = ('status',)) -> Optional[str]:
"""Extracts the resource id from the provided resource.
This method validates the input resource dict and checks that the
@ -109,36 +118,51 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
resource_id = resource['id']
return resource_id
def _extract_consistencygroup(self, consistencygroup):
def _extract_consistencygroup(
self,
consistencygroup: Optional[dict]) -> Optional[str]:
return self._extract_resource(consistencygroup, (CG_PROCEED_STATUS,),
exception.InvalidConsistencyGroup,
'consistencygroup')
def _extract_group(self, group):
def _extract_group(
self,
group: Optional[dict]) -> Optional[str]:
return self._extract_resource(group, (GROUP_PROCEED_STATUS,),
exception.InvalidGroup,
'group')
def _extract_cgsnapshot(self, cgsnapshot):
def _extract_cgsnapshot(
self,
cgsnapshot: Optional[dict]) -> Optional[str]:
return self._extract_resource(cgsnapshot, (CGSNAPSHOT_PROCEED_STATUS,),
exception.InvalidCgSnapshot,
'CGSNAPSHOT')
def _extract_snapshot(self, snapshot):
def _extract_snapshot(
self,
snapshot: Optional[dict]) -> Optional[str]:
return self._extract_resource(snapshot, (SNAPSHOT_PROCEED_STATUS,),
exception.InvalidSnapshot, 'snapshot')
def _extract_source_volume(self, source_volume):
def _extract_source_volume(
self,
source_volume: Optional[dict]) -> Optional[str]:
return self._extract_resource(source_volume, (SRC_VOL_PROCEED_STATUS,),
exception.InvalidVolume, 'source volume')
def _extract_backup(self, backup):
def _extract_backup(
self,
backup: Optional[dict]) -> Optional[str]:
return self._extract_resource(backup, (BACKUP_PROCEED_STATUS,),
exception.InvalidBackup,
'backup')
@staticmethod
def _extract_size(size, source_volume, snapshot, backup):
def _extract_size(size: int,
source_volume: Optional[objects.Volume],
snapshot: Optional[objects.Snapshot],
backup: Optional[objects.Backup]) -> int:
"""Extracts and validates the volume size.
This function will validate or when not provided fill in the provided
@ -146,7 +170,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
validation on the size that is found and returns said validated size.
"""
def validate_snap_size(size):
def validate_snap_size(size: int) -> None:
if snapshot and size < snapshot.volume_size:
msg = _("Volume size '%(size)s'GB cannot be smaller than"
" the snapshot size %(snap_size)sGB. "
@ -155,7 +179,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
'snap_size': snapshot.volume_size}
raise exception.InvalidInput(reason=msg)
def validate_source_size(size):
def validate_source_size(size: int) -> None:
if source_volume and size < source_volume['size']:
msg = _("Volume size '%(size)s'GB cannot be smaller than "
"original volume size %(source_size)sGB. "
@ -164,7 +188,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
'source_size': source_volume['size']}
raise exception.InvalidInput(reason=msg)
def validate_backup_size(size):
def validate_backup_size(size: int) -> None:
if backup and size < backup['size']:
msg = _("Volume size %(size)sGB cannot be smaller than "
"the backup size %(backup_size)sGB. "
@ -173,7 +197,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
'backup_size': backup['size']}
raise exception.InvalidInput(reason=msg)
def validate_int(size):
def validate_int(size: int) -> None:
if not isinstance(size, int) or size <= 0:
msg = _("Volume size '%(size)s' must be an integer and"
" greater than 0") % {'size': size}
@ -206,7 +230,10 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
func(size)
return size
def _get_image_metadata(self, context, image_id, size):
def _get_image_metadata(self,
context: context.RequestContext,
image_id: Optional[str],
size: int) -> Optional[Dict[str, Any]]:
"""Checks image existence and validates the image metadata.
Returns: image metadata or None
@ -224,8 +251,13 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
return image_meta
def _extract_availability_zones(self, availability_zone, snapshot,
source_volume, group, volume_type=None):
def _extract_availability_zones(
self,
availability_zone: Optional[str],
snapshot,
source_volume,
group: Optional[dict],
volume_type: Dict[str, Any] = None) -> Tuple[List[str], bool]:
"""Extracts and returns a validated availability zone list.
This function will extract the availability zone (if not provided) from
@ -238,6 +270,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
volume_type)
type_az_configured = type_azs is not None
if type_az_configured:
assert type_azs is not None
safe_azs = list(
set(type_azs).intersection(self.availability_zones))
if not safe_azs:
@ -317,11 +350,17 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
else:
return safe_azs, refresh_az
def _get_encryption_key_id(self, key_manager, context, volume_type_id,
snapshot, source_volume,
image_metadata):
encryption_key_id = None
def _get_encryption_key_id(
self,
key_manager,
context: context.RequestContext,
volume_type_id: str,
snapshot: Optional[objects.Snapshot],
source_volume: Optional[objects.Volume],
image_metadata: Optional[Dict[str, Any]]) -> Optional[str]:
if volume_types.is_encrypted(context, volume_type_id):
encryption_key_id = None
if snapshot is not None: # creating from snapshot
encryption_key_id = snapshot['encryption_key_id']
elif source_volume is not None: # cloning volume
@ -335,22 +374,29 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
# be copied because the key is deleted when the volume is deleted.
# Clone the existing key and associate a separate -- but
# identical -- key with each volume.
new_encryption_key_id: Optional[str]
if encryption_key_id is not None:
encryption_key_id = volume_utils.clone_encryption_key(
new_encryption_key_id = volume_utils.clone_encryption_key(
context,
key_manager,
encryption_key_id)
else:
encryption_key_id = volume_utils.create_encryption_key(
new_encryption_key_id = volume_utils.create_encryption_key(
context,
key_manager,
volume_type_id)
return encryption_key_id
return new_encryption_key_id
else:
return None
@staticmethod
def _get_volume_type(context, volume_type,
source_volume, snapshot, image_volume_type_id):
def _get_volume_type(
context: context.RequestContext,
volume_type: Optional[Any],
source_volume: Optional[objects.Volume],
snapshot: Optional[objects.Snapshot],
image_volume_type_id: Optional[str]) -> objects.VolumeType:
"""Returns a volume_type object or raises. Never returns None."""
if volume_type:
return volume_type
@ -380,10 +426,22 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
# otherwise, use the default volume type
return volume_types.get_default_volume_type(context)
def execute(self, context, size, snapshot, image_id, source_volume,
availability_zone, volume_type, metadata, key_manager,
consistencygroup, cgsnapshot, group, group_snapshot, backup,
multiattach=False):
def execute(self,
context: context.RequestContext,
size: int,
snapshot: Optional[dict],
image_id: Optional[str],
source_volume: Optional[dict],
availability_zone: Optional[str],
volume_type,
metadata,
key_manager,
consistencygroup,
cgsnapshot,
group,
group_snapshot,
backup: Optional[dict],
multiattach: bool = False) -> Dict[str, Any]:
utils.check_exclusive_options(snapshot=snapshot,
imageRef=image_id,
@ -443,7 +501,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
if multiattach:
context.authorize(policy.MULTIATTACH_POLICY)
specs = {}
specs: Optional[Dict] = {}
if volume_type_id:
qos_specs = volume_types.get_volume_type_qos_specs(volume_type_id)
if qos_specs['qos_specs']:
@ -489,7 +547,7 @@ class EntryCreateTask(flow_utils.CinderTask):
default_provides = set(['volume_properties', 'volume_id', 'volume'])
def __init__(self):
def __init__(self) -> None:
requires = ['description', 'metadata',
'name', 'reservations', 'size', 'snapshot_id',
'source_volid', 'volume_type_id', 'encryption_key_id',
@ -498,7 +556,10 @@ class EntryCreateTask(flow_utils.CinderTask):
super(EntryCreateTask, self).__init__(addons=[ACTION],
requires=requires)
def execute(self, context, optional_args, **kwargs):
def execute(self,
context: context.RequestContext,
optional_args: dict,
**kwargs) -> Dict[str, Any]:
"""Creates a database entry for the given inputs and returns details.
Accesses the database and creates a new entry for the to be created
@ -569,7 +630,11 @@ class EntryCreateTask(flow_utils.CinderTask):
'volume': volume,
}
def revert(self, context, result, optional_args, **kwargs):
def revert(self,
context: context.RequestContext,
result: Union[dict, ft.Failure],
optional_args: dict,
**kwargs) -> None:
if isinstance(result, ft.Failure):
# We never produced a result and therefore can't destroy anything.
return
@ -610,8 +675,12 @@ class QuotaReserveTask(flow_utils.CinderTask):
def __init__(self):
super(QuotaReserveTask, self).__init__(addons=[ACTION])
def execute(self, context, size, volume_type_id, group_snapshot,
optional_args):
def execute(self,
context: context.RequestContext,
size: int,
volume_type_id,
group_snapshot: Optional[objects.Snapshot],
optional_args: dict) -> Optional[dict]:
try:
values = {'per_volume_gigabytes': size}
QUOTAS.limit_check(context, project_id=context.project_id,
@ -638,8 +707,12 @@ class QuotaReserveTask(flow_utils.CinderTask):
quota_utils.process_reserve_over_quota(context, e,
resource='volumes',
size=size)
return None # TODO: is this correct?
def revert(self, context, result, optional_args, **kwargs):
def revert(self,
context: context.RequestContext,
result: Union[dict, ft.Failure],
optional_args: dict, **kwargs) -> None:
# We never produced a result and therefore can't destroy anything.
if isinstance(result, ft.Failure):
return
@ -678,14 +751,18 @@ class QuotaCommitTask(flow_utils.CinderTask):
def __init__(self):
super(QuotaCommitTask, self).__init__(addons=[ACTION])
def execute(self, context, reservations, volume_properties,
optional_args):
def execute(self, context: context.RequestContext,
reservations, volume_properties,
optional_args: dict) -> dict:
QUOTAS.commit(context, reservations)
# updating is_quota_committed attribute of optional_args dictionary
optional_args['is_quota_committed'] = True
return {'volume_properties': volume_properties}
def revert(self, context, result, **kwargs):
def revert(self,
context: context.RequestContext,
result: Union[dict, ft.Failure],
**kwargs) -> None:
# We never produced a result and therefore can't destroy anything.
if isinstance(result, ft.Failure):
return
@ -717,7 +794,7 @@ class VolumeCastTask(flow_utils.CinderTask):
created volume.
"""
def __init__(self, scheduler_rpcapi, volume_rpcapi, db):
def __init__(self, scheduler_rpcapi, volume_rpcapi, db) -> None:
requires = ['image_id', 'scheduler_hints', 'snapshot_id',
'source_volid', 'volume_id', 'volume', 'volume_type',
'volume_properties', 'consistencygroup_id',
@ -729,7 +806,10 @@ class VolumeCastTask(flow_utils.CinderTask):
self.scheduler_rpcapi = scheduler_rpcapi
self.db = db
def _cast_create_volume(self, context, request_spec, filter_properties):
def _cast_create_volume(self,
context: context.RequestContext,
request_spec: Dict[str, Any],
filter_properties: Dict) -> None:
source_volid = request_spec['source_volid']
volume = request_spec['volume']
snapshot_id = request_spec['snapshot_id']
@ -775,7 +855,7 @@ class VolumeCastTask(flow_utils.CinderTask):
filter_properties=filter_properties,
backup_id=backup_id)
def execute(self, context, **kwargs):
def execute(self, context: context.RequestContext, **kwargs) -> None:
scheduler_hints = kwargs.pop('scheduler_hints', None)
db_vt = kwargs.pop('volume_type')
kwargs['volume_type'] = None
@ -789,7 +869,12 @@ class VolumeCastTask(flow_utils.CinderTask):
filter_properties['scheduler_hints'] = scheduler_hints
self._cast_create_volume(context, request_spec, filter_properties)
def revert(self, context, result, flow_failures, volume, **kwargs):
def revert(self,
context: context.RequestContext,
result: Union[dict, ft.Failure],
flow_failures,
volume: objects.Volume,
**kwargs) -> None:
if isinstance(result, ft.Failure):
return

View File

@ -16,6 +16,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from typing import Callable
from oslo_log import log as logging
from cinder import exception
@ -28,7 +30,7 @@ LOG = logging.getLogger(__name__)
REASON_LENGTH = 128
def make_pretty_name(method):
def make_pretty_name(method: Callable) -> str:
"""Makes a pretty name for a function/method."""
meth_pieces = [method.__name__]
# If its an instance method attempt to tack on the class name

View File

@ -12,6 +12,8 @@
import binascii
import traceback
import typing
from typing import Any, Dict, Optional, Tuple # noqa: H301
from castellan import key_manager
import os_brick.initiator.connectors
@ -381,7 +383,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
default_provides = 'volume_spec'
def __init__(self, manager, db, driver, image_volume_cache=None):
def __init__(self, manager, db, driver, image_volume_cache=None) -> None:
super(CreateVolumeFromSpecTask, self).__init__(addons=[ACTION])
self.manager = manager
self.db = db
@ -450,8 +452,11 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
'vol_id': volume.id})
raise exception.MetadataCopyFailure(reason=ex)
def _create_from_snapshot(self, context, volume, snapshot_id,
**kwargs):
def _create_from_snapshot(self,
context: cinder_context.RequestContext,
volume: objects.Volume,
snapshot_id: str,
**kwargs: Any) -> dict:
snapshot = objects.Snapshot.get_by_id(context, snapshot_id)
try:
model_update = self.driver.create_volume_from_snapshot(volume,
@ -618,7 +623,10 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
return model_update
def _create_from_source_volume(self, context, volume, source_volid,
def _create_from_source_volume(self,
context: cinder_context.RequestContext,
volume: objects.Volume,
source_volid: str,
**kwargs):
# NOTE(harlowja): if the source volume has disappeared this will be our
# detection of that since this database call should fail.
@ -645,8 +653,11 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
context, volume, source_volid=srcvol_ref.id)
return model_update
def _capture_volume_image_metadata(self, context, volume_id,
image_id, image_meta):
def _capture_volume_image_metadata(self,
context: cinder_context.RequestContext,
volume_id: str,
image_id: str,
image_meta: dict) -> None:
volume_metadata = volume_utils.get_volume_image_metadata(
image_id, image_meta)
LOG.debug("Creating volume glance metadata for volume %(volume_id)s"
@ -656,7 +667,11 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
self.db.volume_glance_metadata_bulk_create(context, volume_id,
volume_metadata)
def _clone_image_volume(self, context, volume, image_location, image_meta):
def _clone_image_volume(self,
context: cinder_context.RequestContext,
volume: objects.Volume,
image_location,
image_meta: Dict[str, Any]) -> Tuple[None, bool]:
"""Create a volume efficiently from an existing image.
Returns a dict of volume properties eg. provider_location,
@ -713,8 +728,12 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
{'id': image_volume['id']})
return None, False
def _create_from_image_download(self, context, volume, image_location,
image_meta, image_service):
def _create_from_image_download(self,
context: cinder_context.RequestContext,
volume: objects.Volume,
image_location,
image_meta: Dict[str, Any],
image_service) -> dict:
# TODO(harlowja): what needs to be rolled back in the clone if this
# volume create fails?? Likely this should be a subflow or broken
# out task in the future. That will bring up the question of how
@ -743,14 +762,20 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
{'volume_id': volume.id})
return model_update
def _create_from_image_cache(self, context, internal_context, volume,
image_id, image_meta):
def _create_from_image_cache(
self,
context: cinder_context.RequestContext,
internal_context: cinder_context.RequestContext,
volume: objects.Volume,
image_id: str,
image_meta: Dict[str, Any]) -> Tuple[None, bool]:
"""Attempt to create the volume using the image cache.
Best case this will simply clone the existing volume in the cache.
Worst case the image is out of date and will be evicted. In that case
a clone will not be created and the image must be downloaded again.
"""
assert self.image_volume_cache is not None
LOG.debug('Attempting to retrieve cache entry for image = '
'%(image_id)s on host %(host)s.',
{'image_id': image_id, 'host': volume.host})
@ -787,9 +812,15 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
return None, False
@coordination.synchronized('{image_id}')
def _prepare_image_cache_entry(self, context, volume,
image_location, image_id,
image_meta, image_service):
def _prepare_image_cache_entry(self,
context: cinder_context.RequestContext,
volume: objects.Volume,
image_location: str,
image_id: str,
image_meta: Dict[str, Any],
image_service) -> Tuple[Optional[dict],
bool]:
assert self.image_volume_cache is not None
internal_context = cinder_context.get_internal_tenant_context()
if not internal_context:
return None, False
@ -822,10 +853,15 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
update_cache=True)
return model_update, True
def _create_from_image_cache_or_download(self, context, volume,
image_location, image_id,
image_meta, image_service,
update_cache=False):
def _create_from_image_cache_or_download(
self,
context: cinder_context.RequestContext,
volume: objects.Volume,
image_location,
image_id: str,
image_meta: Dict[str, Any],
image_service,
update_cache: bool = False) -> Optional[dict]:
# NOTE(e0ne): check for free space in image_conversion_dir before
# image downloading.
# NOTE(mnaser): This check *only* happens if the backend is not able
@ -850,7 +886,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
should_create_cache_entry = False
cloned = False
model_update = None
if self.image_volume_cache:
if self.image_volume_cache is not None:
internal_context = cinder_context.get_internal_tenant_context()
if not internal_context:
LOG.info('Unable to get Cinder internal context, will '
@ -968,9 +1004,14 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
return model_update
@utils.retry(exception.SnapshotLimitReached, retries=1)
def _create_from_image(self, context, volume,
image_location, image_id, image_meta,
image_service, **kwargs):
def _create_from_image(self,
context: cinder_context.RequestContext,
volume: objects.Volume,
image_location,
image_id: str,
image_meta: Dict[str, Any],
image_service,
**kwargs: Any) -> Optional[dict]:
LOG.debug("Cloning %(volume_id)s from image %(image_id)s "
" at location %(image_location)s.",
{'volume_id': volume.id,
@ -1036,9 +1077,14 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
self._handle_bootable_volume_glance_meta(context, volume,
image_id=image_id,
image_meta=image_meta)
typing.cast(dict, model_update)
return model_update
def _create_from_backup(self, context, volume, backup_id, **kwargs):
def _create_from_backup(self,
context: cinder_context.RequestContext,
volume: objects.Volume,
backup_id: str,
**kwargs) -> Tuple[Dict, bool]:
LOG.info("Creating volume %(volume_id)s from backup %(backup_id)s.",
{'volume_id': volume.id,
'backup_id': backup_id})
@ -1077,7 +1123,10 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
'backup_id': backup_id})
return ret, need_update_volume
def _create_raw_volume(self, context, volume, **kwargs):
def _create_raw_volume(self,
context: cinder_context.RequestContext,
volume: objects.Volume,
**kwargs: Any):
try:
ret = self.driver.create_volume(volume)
except Exception as ex:
@ -1092,7 +1141,10 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
self._cleanup_cg_in_volume(volume)
return ret
def execute(self, context, volume, volume_spec):
def execute(self,
context: cinder_context.RequestContext,
volume: objects.Volume,
volume_spec) -> dict:
volume_spec = dict(volume_spec)
volume_id = volume_spec.pop('volume_id', None)
if not volume_id:
@ -1119,6 +1171,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
"with specification: %(volume_spec)s",
{'volume_spec': volume_spec, 'volume_id': volume_id,
'create_type': create_type})
model_update: dict
if create_type == 'raw':
model_update = self._create_raw_volume(
context, volume, **volume_spec)
@ -1155,7 +1208,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
raise
return volume_spec
def _cleanup_cg_in_volume(self, volume):
def _cleanup_cg_in_volume(self, volume: objects.Volume) -> None:
# NOTE(xyang): Cannot have both group_id and consistencygroup_id.
# consistencygroup_id needs to be removed to avoid DB reference
# error because there isn't an entry in the consistencygroups table.

View File

@ -875,7 +875,7 @@ class VolumeManager(manager.CleanableManager,
context: context.RequestContext,
volume: objects.volume.Volume,
unmanage_only=False,
cascade=False) -> None:
cascade=False):
"""Deletes and unexports volume.
1. Delete a volume(normal case)
@ -1277,7 +1277,7 @@ class VolumeManager(manager.CleanableManager,
context: context.RequestContext,
snapshot: objects.Snapshot,
unmanage_only: bool = False,
handle_quota: bool = True) -> None:
handle_quota: bool = True):
"""Deletes and unexports snapshot."""
context = context.elevated()
snapshot._context = context

View File

@ -312,7 +312,8 @@ def remove_volume_type_access(context, volume_type_id, project_id):
'access.remove')
def is_encrypted(context, volume_type_id):
def is_encrypted(context: context.RequestContext,
volume_type_id: str) -> bool:
return get_volume_type_encryption(context, volume_type_id) is not None

View File

@ -723,8 +723,9 @@ def get_all_volume_groups(vg_name=None) -> list:
vg_name)
def extract_availability_zones_from_volume_type(volume_type) \
-> Optional[list]:
def extract_availability_zones_from_volume_type(
volume_type: Union['objects.VolumeType', dict]) \
-> Optional[List[str]]:
if not volume_type:
return None
extra_specs = volume_type.get('extra_specs', {})
@ -1026,6 +1027,7 @@ def create_encryption_key(context: context.RequestContext,
raise exception.Invalid(message="Key manager error")
typing.cast(str, encryption_key_id)
return encryption_key_id

View File

@ -5,6 +5,8 @@ cinder/exception.py
cinder/manager.py
cinder/utils.py
cinder/volume/__init__.py
cinder/volume/flows/api/create_volume.py
cinder/volume/flows/manager/create_volume.py
cinder/volume/manager.py
cinder/volume/volume_types.py
cinder/volume/volume_utils.py