Use modern type annotation format for collections

This works in Python 3.7 or greater and is
cleaner looking.

See PEP-585 for more info.
https://peps.python.org/pep-0585/

Change-Id: I4c9da881cea1a3638da504c4b79ca8db13851b06
This commit is contained in:
Eric Harney 2022-04-29 10:14:32 -04:00
parent 8f25145772
commit 5179e4f6bf
20 changed files with 255 additions and 221 deletions

@ -17,9 +17,11 @@
"""Handles all requests relating to the volume backups service."""
from __future__ import annotations
from datetime import datetime
import random
from typing import List, Optional # noqa: H301
from typing import Optional
from eventlet import greenthread
from oslo_config import cfg
@ -123,8 +125,8 @@ class API(base.Base):
marker: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
sort_keys: Optional[List[str]] = None,
sort_dirs: Optional[List[str]] = None) -> 'objects.BackupList':
sort_keys: Optional[list[str]] = None,
sort_dirs: Optional[list[str]] = None) -> 'objects.BackupList':
context.authorize(policy.GET_ALL_POLICY)
search_opts = search_opts or {}
@ -200,7 +202,7 @@ class API(base.Base):
raise exception.ServiceNotFound(service_id='cinder-backup')
return backup_host
def _list_backup_services(self) -> List['objects.Service']:
def _list_backup_services(self) -> list['objects.Service']:
"""List all enabled backup services.
:returns: list -- hosts for services that are enabled for backup.

@ -17,8 +17,10 @@
"""RequestContext: context for requests that persist through all of cinder."""
from __future__ import annotations
import copy
from typing import Any, Dict, Optional # noqa: H301
from typing import Any, Optional # noqa: H301
from keystoneauth1.access import service_catalog as ksa_service_catalog
from keystoneauth1 import plugin
@ -162,7 +164,7 @@ class RequestContext(context.RequestContext):
read_deleted = property(_get_read_deleted, _set_read_deleted,
_del_read_deleted)
def to_dict(self) -> Dict[str, Any]:
def to_dict(self) -> dict[str, Any]:
result = super(RequestContext, self).to_dict()
result['user_id'] = self.user_id
result['project_id'] = self.project_id

@ -10,8 +10,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import os
from typing import Any, List # noqa: H301
from typing import Any
from oslo_log import log as logging
# For more information please visit: https://wiki.openstack.org/wiki/TaskFlow
@ -25,7 +27,7 @@ from cinder import exception
LOG = logging.getLogger(__name__)
def _make_task_name(cls, addons: List[str] = None) -> str:
def _make_task_name(cls, addons: list[str] = None) -> str:
"""Makes a pretty name for a task class."""
base_name = ".".join([cls.__module__, cls.__name__])
extra = ''
@ -41,11 +43,11 @@ class CinderTask(task.Task):
implement the given task as the task name.
"""
def __init__(self, addons: List[str] = None, **kwargs: Any) -> None:
def __init__(self, addons: list[str] = None, **kwargs: Any) -> None:
super(CinderTask, self).__init__(self.make_name(addons), **kwargs)
@classmethod
def make_name(cls, addons: List[str] = None) -> str:
def make_name(cls, addons: list[str] = None) -> str:
return _make_task_name(cls, addons)

@ -16,6 +16,8 @@
"""Implementation of an image service that uses Glance as the backend"""
from __future__ import annotations # Remove when only supporting python 3.9+
import copy
import itertools
import random
@ -23,8 +25,7 @@ import shutil
import sys
import textwrap
import time
from typing import (Any, Callable, Dict, Iterable, List, # noqa: H301
NoReturn, Optional, Tuple) # noqa: H301
from typing import (Any, Callable, Iterable, NoReturn, Optional) # noqa: H301
import urllib
import urllib.parse
@ -93,7 +94,7 @@ _SESSION = None
LOG = logging.getLogger(__name__)
def _parse_image_ref(image_href: str) -> Tuple[str, str, bool]:
def _parse_image_ref(image_href: str) -> tuple[str, str, bool]:
"""Parse an image href into composite parts.
:param image_href: href of an image
@ -283,7 +284,7 @@ class GlanceImageService(object):
def detail(self,
context: context.RequestContext,
**kwargs: str) -> List[dict]:
**kwargs: str) -> list[dict]:
"""Calls out to Glance for a list of detailed image information."""
params = self._extract_query_params(kwargs)
try:
@ -298,7 +299,7 @@ class GlanceImageService(object):
return _images
def _extract_query_params(self, params: dict) -> Dict[str, Any]:
def _extract_query_params(self, params: dict) -> dict[str, Any]:
_params = {}
accepted_params = ('filters', 'marker', 'limit',
'sort_key', 'sort_dir')
@ -310,7 +311,7 @@ class GlanceImageService(object):
def list_members(self,
context: context.RequestContext,
image_id: str) -> List[dict]:
image_id: str) -> list[dict]:
"""Returns a list of dicts with image member data."""
try:
return self._client.call(context,
@ -330,7 +331,7 @@ class GlanceImageService(object):
def show(self,
context: context.RequestContext,
image_id: str) -> Dict[str, Any]:
image_id: str) -> dict[str, Any]:
"""Returns a dict with image data for the given opaque image id."""
try:
image = self._client.call(context, 'get', image_id)
@ -345,7 +346,7 @@ class GlanceImageService(object):
def get_location(self,
context: context.RequestContext,
image_id: str) -> Tuple[Optional[str], Any]:
image_id: str) -> tuple[Optional[str], Any]:
"""Get backend storage location url.
Returns a tuple containing the direct url and locations representing
@ -421,8 +422,8 @@ class GlanceImageService(object):
def create(self,
context: context.RequestContext,
image_meta: Dict[str, Any],
data=None) -> Dict[str, Any]:
image_meta: dict[str, Any],
data=None) -> dict[str, Any]:
"""Store the image data and return the new image object."""
sent_service_image_meta = self._translate_to_glance(image_meta)
@ -497,7 +498,7 @@ class GlanceImageService(object):
def _translate_from_glance(self,
context: context.RequestContext,
image: Dict[str, Any]) -> dict:
image: dict[str, Any]) -> dict:
"""Get image metadata from glance image.
Extract metadata from image and convert it's properties
@ -536,7 +537,7 @@ class GlanceImageService(object):
return image_meta
@staticmethod
def _translate_to_glance(image_meta: Dict[str, Any]) -> Dict[str, Any]:
def _translate_to_glance(image_meta: dict[str, Any]) -> dict[str, Any]:
image_meta = _convert_to_string(image_meta)
image_meta = _remove_read_only(image_meta)
@ -684,7 +685,7 @@ def _translate_plain_exception(exc_value: BaseException) -> BaseException:
def get_remote_image_service(context: context.RequestContext,
image_href) -> Tuple[GlanceImageService, str]:
image_href) -> tuple[GlanceImageService, str]:
"""Create an image_service and parse the id from the given image_href.
The image_href param can be an href of the form

@ -23,6 +23,7 @@ Some slight modifications, but at some point
we should look at maybe pushing this up to Oslo
"""
from __future__ import annotations # Remove when only supporting python 3.9+
import contextlib
import errno
@ -31,8 +32,7 @@ import math
import os
import re
import tempfile
from typing import (ContextManager, Dict, Generator, # noqa: H301
List, Optional, Tuple)
from typing import ContextManager, Generator, Optional # noqa: H301
import cryptography
from cursive import exception as cursive_exception
@ -152,7 +152,7 @@ def qemu_img_info(path: str,
return info
def get_qemu_img_version() -> Optional[List[int]]:
def get_qemu_img_version() -> Optional[list[int]]:
"""The qemu-img version will be cached until the process is restarted."""
global QEMU_IMG_VERSION
@ -187,8 +187,7 @@ def _get_qemu_convert_luks_cmd(src: str,
cipher_spec: Optional[dict] = None,
passphrase_file: Optional[str] = None,
src_passphrase_file: Optional[str] = None) \
-> List[str]:
-> list[str]:
cmd = ['qemu-img', 'convert']
if prefix:
@ -223,8 +222,7 @@ def _get_qemu_convert_cmd(src: str,
passphrase_file: Optional[str] = None,
compress: bool = False,
src_passphrase_file: Optional[str] = None) \
-> List[str]:
-> list[str]:
if src_passphrase_file is not None:
if passphrase_file is None:
message = _("Can't create unencrypted volume %(format)s "
@ -290,7 +288,7 @@ def _get_qemu_convert_cmd(src: str,
return cmd
def _get_version_from_string(version_string: str) -> List[int]:
def _get_version_from_string(version_string: str) -> list[int]:
return [int(x) for x in version_string.split('.')]
@ -451,7 +449,7 @@ def resize_image(source: str,
run_as_root: bool = False,
file_format: Optional[str] = None) -> None:
"""Changes the virtual size of the image."""
cmd: Tuple[str, ...]
cmd: tuple[str, ...]
if file_format:
cmd = ('qemu-img', 'resize', '-f', file_format, source, '%sG' % size)
else:
@ -922,7 +920,7 @@ def extract_targz(archive_name: str, target: str) -> None:
utils.execute('tar', '-xzf', archive_name, '-C', target)
def fix_vhd_chain(vhd_chain: List[str]) -> None:
def fix_vhd_chain(vhd_chain: list[str]) -> None:
for child, parent in zip(vhd_chain[:-1], vhd_chain[1:]):
set_vhd_parent(child, parent)
@ -991,7 +989,7 @@ def temporary_dir() -> ContextManager[str]:
return utils.tempdir(dir=CONF.image_conversion_dir)
def coalesce_chain(vhd_chain: List[str]) -> str:
def coalesce_chain(vhd_chain: list[str]) -> str:
for child, parent in zip(vhd_chain[:-1], vhd_chain[1:]):
with temporary_dir() as directory_for_journal:
size = get_vhd_size(child)
@ -1003,7 +1001,7 @@ def coalesce_chain(vhd_chain: List[str]) -> str:
return vhd_chain[-1]
def discover_vhd_chain(directory: str) -> List[str]:
def discover_vhd_chain(directory: str) -> list[str]:
counter = 0
chain = []
@ -1028,7 +1026,7 @@ def replace_xenserver_image_with_coalesced_vhd(image_file: str) -> None:
os.rename(coalesced, image_file)
def decode_cipher(cipher_spec: str, key_size: int) -> Dict[str, str]:
def decode_cipher(cipher_spec: str, key_size: int) -> dict[str, str]:
"""Decode a dm-crypt style cipher specification string
The assumed format being cipher-chainmode-ivmode, similar to that
@ -1060,7 +1058,7 @@ class TemporaryImages(object):
"""
def __init__(self, image_service: glance.GlanceImageService):
self.temporary_images: Dict[str, dict] = {}
self.temporary_images: dict[str, dict] = {}
self.image_service = image_service
image_service.temp_images = self

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
__all__ = [
'init',
'cleanup',
@ -26,7 +28,7 @@ __all__ = [
]
import functools
from typing import Tuple, Union # noqa: H301
from typing import Union
from oslo_config import cfg
from oslo_log import log as logging
@ -230,7 +232,7 @@ class RPCAPI(object):
return versions[-1]
def _get_cctxt(self,
version: Union[str, Tuple[str, ...]] = None,
version: Union[str, tuple[str, ...]] = None,
**kwargs):
"""Prepare client context

@ -17,8 +17,10 @@
Pluggable Weighing support
"""
from __future__ import annotations
import abc
from typing import Iterable, List, Optional # noqa: H301
from typing import Iterable, Optional # noqa: H301
from oslo_log import log as logging
@ -28,7 +30,7 @@ from cinder.scheduler import base_handler
LOG = logging.getLogger(__name__)
def normalize(weight_list: List[float],
def normalize(weight_list: list[float],
minval: Optional[float] = None,
maxval: Optional[float] = None) -> Iterable[float]:
"""Normalize the values in a list between 0 and 1.0.
@ -95,8 +97,8 @@ class BaseWeigher(object, metaclass=abc.ABCMeta):
"""Override in a subclass to specify a weight for a specific object."""
def weigh_objects(self,
weighed_obj_list: List[WeighedObject],
weight_properties: dict) -> List[float]:
weighed_obj_list: list[WeighedObject],
weight_properties: dict) -> list[float]:
"""Weigh multiple objects.
Override in a subclass if you need access to all objects in order
@ -130,8 +132,8 @@ class BaseWeightHandler(base_handler.BaseHandler):
def get_weighed_objects(self,
weigher_classes: list,
obj_list: List[WeighedObject],
weighing_properties: dict) -> List[WeighedObject]:
obj_list: list[WeighedObject],
weighing_properties: dict) -> list[WeighedObject]:
"""Return a sorted (descending), normalized list of WeighedObjects."""
if not obj_list:

@ -13,9 +13,11 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations # Remove when only supporting python 3.9+
import operator
import re
from typing import Callable, Dict # noqa: H301
from typing import Callable
import pyparsing
@ -168,7 +170,7 @@ class EvalTernaryOp(object):
class EvalFunction(object):
functions: Dict[str, Callable] = {
functions: dict[str, Callable] = {
"abs": abs,
"max": max,
"min": min,

@ -20,7 +20,9 @@ You can customize this scheduler by specifying your own volume Filters and
Weighing Functions.
"""
from typing import List, Optional # noqa: H301
from __future__ import annotations
from typing import Optional
from oslo_config import cfg
from oslo_log import log as logging
@ -375,9 +377,9 @@ class FilterScheduler(driver.Scheduler):
def _get_weighted_candidates_generic_group(
self, context: context.RequestContext,
group_spec: dict, request_spec_list: List[dict],
group_spec: dict, request_spec_list: list[dict],
group_filter_properties: Optional[dict] = None,
filter_properties_list: Optional[List[dict]] = None) -> list:
filter_properties_list: Optional[list[dict]] = None) -> list:
"""Finds backends that supports the group.
Returns a list of backends that meet the required specs,
@ -486,7 +488,7 @@ class FilterScheduler(driver.Scheduler):
def _get_weighted_candidates_by_group_type(
self, context: context.RequestContext, group_spec: dict,
group_filter_properties: Optional[dict] = None) \
-> List[WeighedHost]:
-> list[WeighedHost]:
"""Finds backends that supports the group type.
Returns a list of backends that meet the required specs,
@ -598,7 +600,7 @@ class FilterScheduler(driver.Scheduler):
return self._choose_top_backend_generic_group(weighed_backends)
def _choose_top_backend(self,
weighed_backends: List[WeighedHost],
weighed_backends: list[WeighedHost],
request_spec: dict):
top_backend = weighed_backends[0]
backend_state = top_backend.obj
@ -609,7 +611,7 @@ class FilterScheduler(driver.Scheduler):
def _choose_top_backend_generic_group(
self,
weighed_backends: List[WeighedHost]) -> WeighedHost:
weighed_backends: list[WeighedHost]) -> WeighedHost:
top_backend = weighed_backends[0]
backend_state = top_backend.obj
LOG.debug("Choosing %s", backend_state.backend_id)

@ -10,7 +10,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from typing import Any, Dict, Optional # noqa: H301
from __future__ import annotations # Remove when only supporting python 3.9+
from typing import Any, Optional # noqa: H301
from oslo_log import log as logging
from oslo_utils import excutils
@ -49,7 +51,7 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask):
volume: objects.Volume,
snapshot_id: Optional[str],
image_id: Optional[str],
backup_id: Optional[str]) -> Dict[str, Any]:
backup_id: Optional[str]) -> dict[str, Any]:
# Create the full request spec using the volume object.
#
# NOTE(dulek): At this point, a volume can be deleted before it gets
@ -77,7 +79,7 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask):
volume: objects.Volume,
snapshot_id: Optional[str],
image_id: Optional[str],
backup_id: Optional[str]) -> Dict[str, Any]:
backup_id: Optional[str]) -> dict[str, Any]:
# For RPC version < 1.2 backward compatibility
if request_spec is None:
request_spec = self._populate_request_spec(volume,

@ -15,11 +15,12 @@
"""Manage backends in the current zone."""
from __future__ import annotations
from collections import abc
import random
import typing
from typing import (Any, Dict, Iterable, List, # noqa: H301
Optional, Type, Union)
from typing import (Any, Iterable, Optional, Type, Union) # noqa: H301
from oslo_config import cfg
from oslo_log import log as logging
@ -163,7 +164,7 @@ class BackendState(object):
self.service = ReadOnlyDict(service)
def update_from_volume_capability(self,
capability: Dict[str, Any],
capability: dict[str, Any],
service=None) -> None:
"""Update information about a host from its volume_node info.
@ -289,7 +290,7 @@ class BackendState(object):
'host': self.host})
del self.pools[pool]
def _append_backend_info(self, pool_cap: Dict[str, Any]) -> None:
def _append_backend_info(self, pool_cap: dict[str, Any]) -> None:
# Fill backend level info to pool if needed.
if not pool_cap.get('volume_backend_name', None):
pool_cap['volume_backend_name'] = self.volume_backend_name
@ -407,7 +408,7 @@ class PoolState(BackendState):
self.pools: dict = {}
def update_from_volume_capability(self,
capability: Dict[str, Any],
capability: dict[str, Any],
service=None) -> None:
"""Update information about a pool from its volume_node info."""
LOG.debug("Updating capabilities for %s: %s", self.host, capability)
@ -470,7 +471,7 @@ class HostManager(object):
def __init__(self):
self.service_states = {} # { <host|cluster>: {<service>: {cap k : v}}}
self.backend_state_map: Dict[str, BackendState] = {}
self.backend_state_map: dict[str, BackendState] = {}
self.backup_service_states = {}
self.filter_handler = filters.BackendFilterHandler('cinder.scheduler.'
'filters')
@ -513,7 +514,7 @@ class HostManager(object):
def _choose_backend_weighers(
self,
weight_cls_names: Optional[List[str]]) -> list:
weight_cls_names: Optional[list[str]]) -> list:
"""Return a list of available weigher names.
This function checks input weigher names against a predefined set
@ -795,7 +796,7 @@ class HostManager(object):
def get_pools(self,
context: cinder_context.RequestContext,
filters: Optional[dict] = None) -> List[dict]:
filters: Optional[dict] = None) -> list[dict]:
"""Returns a dict of all pools on all hosts HostManager knows about."""
self._update_backend_state_map(context)
@ -858,7 +859,7 @@ class HostManager(object):
capa_new: dict,
updated_pools: Iterable[dict],
host: str,
timestamp) -> List[dict]:
timestamp) -> list[dict]:
pools = capa_new.get('pools')
usage = []
if pools and isinstance(pools, list):
@ -888,7 +889,7 @@ class HostManager(object):
def _get_pool_usage(self,
pool: dict,
host: str, timestamp) -> Dict[str, Any]:
host: str, timestamp) -> dict[str, Any]:
total = pool["total_capacity_gb"]
free = pool["free_capacity_gb"]
@ -967,7 +968,7 @@ class HostManager(object):
def _notify_capacity_usage(self,
context: cinder_context.RequestContext,
usage: List[dict]) -> None:
usage: list[dict]) -> None:
if usage:
for u in usage:
volume_utils.notify_about_capacity_usage(

@ -25,6 +25,8 @@
should be placed in volume_utils instead.
"""
from __future__ import annotations # Remove when only supporting python 3.9+
from collections import OrderedDict
import contextlib
import datetime
@ -42,8 +44,8 @@ import stat
import sys
import tempfile
import typing
from typing import Callable, Dict, Iterable, Iterator, List # noqa: H301
from typing import Optional, Tuple, Type, Union # noqa: H301
from typing import Callable, Iterable, Iterator # noqa: H301
from typing import Optional, Type, Union # noqa: H301
import eventlet
from eventlet import tpool
@ -165,15 +167,15 @@ def check_exclusive_options(
raise exception.InvalidInput(reason=msg)
def execute(*cmd: str, **kwargs: Union[bool, str]) -> Tuple[str, str]:
def execute(*cmd: str, **kwargs: Union[bool, str]) -> tuple[str, str]:
"""Convenience wrapper around oslo's execute() method."""
if 'run_as_root' in kwargs and 'root_helper' not in kwargs:
kwargs['root_helper'] = get_root_helper()
return processutils.execute(*cmd, **kwargs)
def check_ssh_injection(cmd_list: List[str]) -> None:
ssh_injection_pattern: Tuple[str, ...] = ('`', '$', '|', '||', ';', '&',
def check_ssh_injection(cmd_list: list[str]) -> None:
ssh_injection_pattern: tuple[str, ...] = ('`', '$', '|', '||', ';', '&',
'&&', '>', '>>', '<')
# Check whether injection attacks exist
@ -208,7 +210,7 @@ def check_ssh_injection(cmd_list: List[str]) -> None:
def check_metadata_properties(
metadata: Optional[Dict[str, str]]) -> None:
metadata: Optional[dict[str, str]]) -> None:
"""Checks that the volume metadata properties are valid."""
if not metadata:
@ -235,7 +237,7 @@ def check_metadata_properties(
def last_completed_audit_period(unit: Optional[str] = None) -> \
Tuple[Union[datetime.datetime, datetime.timedelta],
tuple[Union[datetime.datetime, datetime.timedelta],
Union[datetime.datetime, datetime.timedelta]]:
"""This method gives you the most recently *completed* audit period.
@ -496,7 +498,7 @@ def get_file_size(path: str) -> int:
def _get_disk_of_partition(
devpath: str,
st: Optional[os.stat_result] = None) -> Tuple[str, os.stat_result]:
st: Optional[os.stat_result] = None) -> tuple[str, os.stat_result]:
"""Gets a disk device path and status from partition path.
Returns a disk device path from a partition device path, and stat for
@ -633,9 +635,9 @@ class retry_if_exit_code(tenacity.retry_if_exception):
def retry(retry_param: Union[None,
Type[Exception],
Tuple[Type[Exception], ...],
tuple[Type[Exception], ...],
int,
Tuple[int, ...]],
tuple[int, ...]],
interval: float = 1,
retries: int = 3,
backoff_rate: float = 2,

@ -16,11 +16,12 @@
"""Handles all requests relating to volumes."""
from __future__ import annotations
import ast
import collections
import datetime
from typing import (Any, DefaultDict, Dict, Iterable, List, # noqa: H301
Optional, Tuple, Union)
from typing import (Any, DefaultDict, Iterable, Optional, Union) # noqa: H301
from castellan import key_manager
from oslo_config import cfg
@ -139,7 +140,7 @@ class API(base.Base):
services = objects.ServiceList.get_all_by_topic(ctxt, topic)
az_data = [(s.availability_zone, s.disabled)
for s in services]
disabled_map: Dict[str, bool] = {}
disabled_map: dict[str, bool] = {}
for (az_name, disabled) in az_data:
tracked_disabled = disabled_map.get(az_name, True)
disabled_map[az_name] = tracked_disabled and disabled
@ -725,8 +726,8 @@ class API(base.Base):
search_opts: Optional[dict] = None,
marker: Optional[str] = None,
limit: Optional[int] = None,
sort_keys: Optional[List[str]] = None,
sort_dirs: Optional[List[str]] = None,
sort_keys: Optional[list[str]] = None,
sort_dirs: Optional[list[str]] = None,
offset: Optional[int] = None) -> objects.SnapshotList:
context.authorize(snapshot_policy.GET_ALL_POLICY)
@ -971,7 +972,7 @@ class API(base.Base):
utils.check_metadata_properties(metadata)
valid_status: Tuple[str, ...]
valid_status: tuple[str, ...]
valid_status = ('available',)
if force or allow_in_use:
valid_status = ('available', 'in-use')
@ -1118,7 +1119,7 @@ class API(base.Base):
context: context.RequestContext,
volume_list: objects.VolumeList) -> list:
reserve_opts_list = []
total_reserve_opts: Dict[str, int] = {}
total_reserve_opts: dict[str, int] = {}
try:
for volume in volume_list:
if CONF.no_snapshot_gb_quota:
@ -1155,7 +1156,7 @@ class API(base.Base):
name: str,
description: str,
cgsnapshot_id: str,
group_snapshot_id: Optional[str] = None) -> Dict[str, Any]:
group_snapshot_id: Optional[str] = None) -> dict[str, Any]:
options = {'volume_id': volume['id'],
'cgsnapshot_id': cgsnapshot_id,
'group_snapshot_id': group_snapshot_id,
@ -1176,7 +1177,7 @@ class API(base.Base):
volume: objects.Volume,
name: str,
description: str,
metadata: Optional[Dict[str, Any]] = None,
metadata: Optional[dict[str, Any]] = None,
cgsnapshot_id: Optional[str] = None,
group_snapshot_id: Optional[str] = None,
allow_in_use: bool = False) -> objects.Snapshot:
@ -1193,7 +1194,7 @@ class API(base.Base):
volume: objects.Volume,
name: str,
description: str,
metadata: Optional[Dict[str, Any]] = None) -> objects.Snapshot:
metadata: Optional[dict[str, Any]] = None) -> objects.Snapshot:
result = self._create_snapshot(context, volume, name, description,
True, metadata)
LOG.info("Snapshot force create request issued successfully.",
@ -1211,7 +1212,7 @@ class API(base.Base):
snapshot.assert_not_frozen()
# Build required conditions for conditional update
expected: Dict[str, Any] = {'cgsnapshot_id': None,
expected: dict[str, Any] = {'cgsnapshot_id': None,
'group_snapshot_id': None}
# If not force deleting we have status conditions
if not force:
@ -1237,7 +1238,7 @@ class API(base.Base):
def update_snapshot(self,
context: context.RequestContext,
snapshot: objects.Snapshot,
fields: Dict[str, Any]) -> None:
fields: dict[str, Any]) -> None:
context.authorize(snapshot_policy.UPDATE_POLICY,
target_obj=snapshot)
snapshot.update(fields)
@ -1256,7 +1257,7 @@ class API(base.Base):
def create_volume_metadata(self,
context: context.RequestContext,
volume: objects.Volume,
metadata: Dict[str, Any]) -> dict:
metadata: dict[str, Any]) -> dict:
"""Creates volume metadata."""
context.authorize(vol_meta_policy.CREATE_POLICY, target_obj=volume)
db_meta = self._update_volume_metadata(context, volume, metadata)
@ -1284,7 +1285,7 @@ class API(base.Base):
def _update_volume_metadata(self,
context: context.RequestContext,
volume: objects.Volume,
metadata: Dict[str, Any],
metadata: dict[str, Any],
delete: bool = False,
meta_type=common.METADATA_TYPES.user) -> dict:
if volume['status'] in ('maintenance', 'uploading'):
@ -1298,7 +1299,7 @@ class API(base.Base):
def update_volume_metadata(self,
context: context.RequestContext,
volume: objects.Volume,
metadata: Dict[str, Any],
metadata: dict[str, Any],
delete: bool = False,
meta_type=common.METADATA_TYPES.user) -> dict:
"""Updates volume metadata.
@ -1320,7 +1321,7 @@ class API(base.Base):
def update_volume_admin_metadata(self,
context: context.RequestContext,
volume: objects.Volume,
metadata: Dict[str, Any],
metadata: dict[str, Any],
delete: Optional[bool] = False,
add: Optional[bool] = True,
update: Optional[bool] = True) -> dict:
@ -1369,7 +1370,7 @@ class API(base.Base):
def update_snapshot_metadata(self,
context: context.RequestContext,
snapshot: objects.Snapshot,
metadata: Dict[str, Any],
metadata: dict[str, Any],
delete: bool = False) -> dict:
"""Updates or creates snapshot metadata.
@ -1410,7 +1411,7 @@ class API(base.Base):
def get_volume_image_metadata(self,
context: context.RequestContext,
volume: objects.Volume) -> Dict[str, str]:
volume: objects.Volume) -> dict[str, str]:
context.authorize(vol_meta_policy.GET_POLICY, target_obj=volume)
db_data = self.db.volume_glance_metadata_get(context, volume['id'])
LOG.info("Get volume image-metadata completed successfully.",
@ -1420,7 +1421,7 @@ class API(base.Base):
def get_list_volumes_image_metadata(
self,
context: context.RequestContext,
volume_id_list: List[str]) -> DefaultDict[str, str]:
volume_id_list: list[str]) -> DefaultDict[str, str]:
db_data = self.db.volume_glance_metadata_list_get(context,
volume_id_list)
results: collections.defaultdict = collections.defaultdict(dict)
@ -1458,8 +1459,8 @@ class API(base.Base):
def copy_volume_to_image(self,
context: context.RequestContext,
volume: objects.Volume,
metadata: Dict[str, str],
force: bool) -> Dict[str, Optional[str]]:
metadata: dict[str, str],
force: bool) -> dict[str, Optional[str]]:
"""Create a new image from the specified volume."""
if not CONF.enable_force_upload and force:
LOG.info("Force upload to image is disabled, "
@ -2069,8 +2070,8 @@ class API(base.Base):
marker: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
sort_keys: Optional[List[str]] = None,
sort_dirs: Optional[List[str]] = None):
sort_keys: Optional[list[str]] = None,
sort_dirs: Optional[list[str]] = None):
svc = self._get_service_by_host_cluster(context, host, cluster_name)
return self.volume_rpcapi.get_manageable_volumes(context, svc,
marker, limit,
@ -2110,8 +2111,8 @@ class API(base.Base):
marker: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
sort_keys: Optional[List[str]] = None,
sort_dirs: Optional[List[str]] = None) -> List[dict]:
sort_keys: Optional[list[str]] = None,
sort_dirs: Optional[list[str]] = None) -> list[dict]:
svc = self._get_service_by_host_cluster(context, host, cluster_name,
'snapshot')
return self.volume_rpcapi.get_manageable_snapshots(context, svc,

@ -13,6 +13,8 @@
# under the License.
"""RADOS Block Device Driver"""
from __future__ import annotations
import binascii
import errno
import json
@ -20,7 +22,7 @@ import math
import os
import tempfile
import typing
from typing import Any, Dict, List, Optional, Tuple, Union # noqa: H301
from typing import Any, Optional, Union # noqa: H301
import urllib.parse
from castellan import key_manager
@ -163,7 +165,7 @@ class RBDVolumeProxy(object):
pool: Optional[str] = None,
snapshot: Optional[str] = None,
read_only: bool = False,
remote: Optional[Dict[str, str]] = None,
remote: Optional[dict[str, str]] = None,
timeout: Optional[int] = None,
client: 'rados.Rados' = None,
ioctx: 'rados.Ioctx' = None):
@ -254,7 +256,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
**kwargs) -> None:
super(RBDDriver, self).__init__(*args, **kwargs)
self.configuration.append_config_values(RBD_OPTS)
self._stats: Dict[str, Union[str, bool]] = {}
self._stats: dict[str, Union[str, bool]] = {}
# allow overrides for testing
self.rados = kwargs.get('rados', rados)
self.rbd = kwargs.get('rbd', rbd)
@ -270,10 +272,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
self._backend_name = (self.configuration.volume_backend_name or
self.__class__.__name__)
self._active_backend_id: Optional[str] = active_backend_id
self._active_config: Dict[str, str] = {}
self._active_config: dict[str, str] = {}
self._is_replication_enabled = False
self._replication_targets: list = []
self._target_names: List[str] = []
self._target_names: list[str] = []
self._clone_v2_api_checked: bool = False
if self.rbd is not None:
@ -341,7 +343,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
' performance, fewer deletion issues')
def _get_target_config(self,
target_id: Optional[str]) -> Dict[str,
target_id: Optional[str]) -> dict[str,
str]:
"""Get a replication target from known replication targets."""
for target in self._replication_targets:
@ -371,7 +373,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
self._target_names.append('default')
def _parse_replication_configs(self,
replication_devices: List[dict]) -> None:
replication_devices: list[dict]) -> None:
for replication_device in replication_devices:
if 'backend_id' not in replication_device:
msg = _('Missing backend_id in replication_device '
@ -396,8 +398,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def _get_config_tuple(
self,
remote: Optional[Dict[str, str]] = None) \
-> Tuple[Optional[str], Optional[str],
remote: Optional[dict[str, str]] = None) \
-> tuple[Optional[str], Optional[str],
Optional[str], Optional[str]]:
if not remote:
remote = self._active_config
@ -486,7 +488,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def RBDProxy(self) -> tpool.Proxy:
return tpool.Proxy(self.rbd.RBD())
def _ceph_args(self) -> List[str]:
def _ceph_args(self) -> list[str]:
args = []
name, conf, user, secret_uuid = self._get_config_tuple()
@ -504,13 +506,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
pool: Optional[str] = None,
remote: Optional[dict] = None,
timeout: Optional[int] = None) -> \
Tuple['rados.Rados', 'rados.Ioctx']:
tuple['rados.Rados', 'rados.Ioctx']:
@utils.retry(exception.VolumeBackendAPIException,
self.configuration.rados_connection_interval,
self.configuration.rados_connection_retries)
def _do_conn(pool: Optional[str],
remote: Optional[dict],
timeout: Optional[int]) -> Tuple['rados.Rados',
timeout: Optional[int]) -> tuple['rados.Rados',
'rados.Ioctx']:
name, conf, user, secret_uuid = self._get_config_tuple(remote)
@ -557,7 +559,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
client.shutdown()
@staticmethod
def _get_backup_snaps(rbd_image) -> List:
def _get_backup_snaps(rbd_image) -> list:
"""Get list of any backup snapshots that exist on this volume.
There should only ever be one but accept all since they need to be
@ -570,7 +572,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
from cinder.backup.drivers import ceph
return ceph.CephBackupDriver.get_backup_snaps(rbd_image)
def _get_mon_addrs(self) -> Tuple[List[str], List[str]]:
def _get_mon_addrs(self) -> tuple[list[str], list[str]]:
args = ['ceph', 'mon', 'dump', '--format=json']
args.extend(self._ceph_args())
out, _ = self._execute(*args)
@ -578,7 +580,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
if lines[0].startswith('dumped monmap epoch'):
lines = lines[1:]
monmap = json.loads('\n'.join(lines))
addrs: List[str] = [mon['addr'] for mon in monmap['mons']]
addrs: list[str] = [mon['addr'] for mon in monmap['mons']]
hosts = []
ports = []
for addr in addrs:
@ -614,8 +616,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
total_provisioned = math.ceil(float(total_provisioned) / units.Gi)
return total_provisioned
def _get_pool_stats(self) -> Union[Tuple[str, str],
Tuple[float, float]]:
def _get_pool_stats(self) -> Union[tuple[str, str],
tuple[float, float]]:
"""Gets pool free and total capacity in GiB.
Calculate free and total capacity of the pool based on the pool's
@ -753,7 +755,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def create_cloned_volume(
self,
volume: Volume,
src_vref: Volume) -> Optional[Dict[str, Optional[str]]]:
src_vref: Volume) -> Optional[dict[str, Optional[str]]]:
"""Create a cloned volume from another volume.
Since we are cloning from a volume and not a snapshot, we must first
@ -860,7 +862,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
LOG.debug("clone created successfully")
return volume_update
def _enable_replication(self, volume: Volume) -> Dict[str, str]:
def _enable_replication(self, volume: Volume) -> dict[str, str]:
"""Enable replication for a volume.
Returns required volume update.
@ -884,7 +886,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return {'replication_status': fields.ReplicationStatus.ENABLED,
'replication_driver_data': driver_data}
def _enable_multiattach(self, volume: Volume) -> Dict[str, str]:
def _enable_multiattach(self, volume: Volume) -> dict[str, str]:
vol_name = utils.convert_str(volume.name)
with RBDVolumeProxy(self, vol_name) as image:
image_features = image.features()
@ -894,7 +896,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return {'provider_location':
self._dumps({'saved_features': image_features})}
def _disable_multiattach(self, volume: Volume) -> Dict[str, None]:
def _disable_multiattach(self, volume: Volume) -> dict[str, None]:
vol_name = utils.convert_str(volume.name)
with RBDVolumeProxy(self, vol_name) as image:
try:
@ -932,7 +934,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def _setup_volume(
self,
volume: Volume,
volume_type: Optional[VolumeType] = None) -> Dict[str,
volume_type: Optional[VolumeType] = None) -> dict[str,
Optional[str]]:
if volume_type:
@ -1030,7 +1032,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
cmd.extend(self._ceph_args())
self._execute(*cmd)
def create_volume(self, volume: Volume) -> Dict[str, Any]:
def create_volume(self, volume: Volume) -> dict[str, Any]:
"""Creates a logical volume."""
if volume.encryption_key_id:
@ -1092,7 +1094,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
volume: Volume,
src_pool: str,
src_image: str,
src_snap: str) -> Dict[str, Optional[str]]:
src_snap: str) -> dict[str, Optional[str]]:
LOG.debug('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s',
dict(pool=src_pool, img=src_image, snap=src_snap,
dst=volume.name))
@ -1178,8 +1180,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
self,
volume: 'rbd.Image',
volume_name: str,
snap: Optional[str] = None) -> Union[Tuple[str, str, str],
Tuple[None, None, None]]:
snap: Optional[str] = None) -> Union[tuple[str, str, str],
tuple[None, None, None]]:
"""If volume is a clone, return its parent info.
Returns a tuple of (pool, parent, snap). A snapshot may optionally be
@ -1207,7 +1209,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def _get_children_info(self,
volume: 'rbd.Image',
snap: Optional[str]) -> List[tuple]:
snap: Optional[str]) -> list[tuple]:
"""List children for the given snapshot of a volume(image).
Returns a list of (pool, image).
@ -1429,7 +1431,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
with RBDVolumeProxy(self, volume.name) as image:
image.rollback_to_snap(snapshot.name)
def _disable_replication(self, volume: Volume) -> Dict[str, Optional[str]]:
def _disable_replication(self, volume: Volume) -> dict[str, Optional[str]]:
"""Disable replication on the given volume."""
vol_name = utils.convert_str(volume.name)
with RBDVolumeProxy(self, vol_name) as image:
@ -1450,18 +1452,18 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
context: context.RequestContext,
volume: Volume,
new_type: VolumeType,
diff: Union[Dict[str, Dict[str, str]], Dict[str, Dict], None],
host: Optional[Dict[str, str]]) -> Tuple[bool, dict]:
diff: Union[dict[str, dict[str, str]], dict[str, dict], None],
host: Optional[dict[str, str]]) -> tuple[bool, dict]:
"""Retype from one volume type to another on the same backend."""
return True, self._setup_volume(volume, new_type)
@staticmethod
def _dumps(obj: Dict[str, Union[bool, int]]) -> str:
def _dumps(obj: dict[str, Union[bool, int]]) -> str:
return json.dumps(obj, separators=(',', ':'), sort_keys=True)
def _exec_on_volume(self,
volume_name: str,
remote: Dict[str, str],
remote: dict[str, str],
operation: str,
*args: Any,
**kwargs: Any):
@ -1477,9 +1479,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def _failover_volume(self,
volume: Volume,
remote: Dict[str, str],
remote: dict[str, str],
is_demoted: bool,
replication_status: str) -> Dict[str, Any]:
replication_status: str) -> dict[str, Any]:
"""Process failover for a volume.
There are 2 different cases that will return different update values
@ -1519,8 +1521,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return error_result
def _demote_volumes(self,
volumes: List[Volume],
until_failure: bool = True) -> List[bool]:
volumes: list[Volume],
until_failure: bool = True) -> list[bool]:
"""Try to demote volumes on the current primary cluster."""
result = []
try_demoting = True
@ -1542,7 +1544,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def _get_failover_target_config(
self,
secondary_id: Optional[str] = None) -> Tuple[str, dict]:
secondary_id: Optional[str] = None) -> tuple[str, dict]:
if not secondary_id:
# In auto mode exclude failback and active
candidates = set(self._target_names).difference(
@ -1557,7 +1559,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
context: context.RequestContext,
volumes: list,
secondary_id: Optional[str] = None,
groups=None) -> Tuple[str, list, list]:
groups=None) -> tuple[str, list, list]:
"""Failover replicated volumes."""
LOG.info('RBD driver failover started.')
if not self._is_replication_enabled:
@ -1595,11 +1597,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def failover_host(self,
context: context.RequestContext,
volumes: List[Volume],
volumes: list[Volume],
secondary_id: Optional[str] = None,
groups: Optional[List] = None) -> Tuple[str,
List[Volume],
List]:
groups: Optional[list] = None) -> tuple[str,
list[Volume],
list]:
"""Failover to replication target.
This function combines calls to failover() and failover_completed() to
@ -1631,7 +1633,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def initialize_connection(self,
volume: Volume,
connector: dict) -> Dict[str, Any]:
connector: dict) -> dict[str, Any]:
hosts, ports = self._get_mon_addrs()
name, conf, user, secret_uuid = self._get_config_tuple()
data = {
@ -1662,7 +1664,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
pass
@staticmethod
def _parse_location(location: str) -> List[str]:
def _parse_location(location: str) -> list[str]:
prefix = 'rbd://'
if not location.startswith(prefix):
reason = _('Not stored in rbd')
@ -1729,7 +1731,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
volume: Volume,
image_location: Optional[list],
image_meta: dict,
image_service) -> Tuple[dict, bool]:
image_service) -> tuple[dict, bool]:
if image_location:
# Note: image_location[0] is glance image direct_url.
# image_location[1] contains the list of all locations (including
@ -1885,7 +1887,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
{'old_size': old_size, 'new_size': new_size})
def manage_existing(self,
volume: Volume, existing_ref: Dict[str, str]) -> None:
volume: Volume, existing_ref: dict[str, str]) -> None:
"""Manages an existing image.
Renames the image name to match the expected name for the volume.
@ -1906,7 +1908,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def manage_existing_get_size(self,
volume: Volume,
existing_ref: Dict[str, str]) -> int:
existing_ref: dict[str, str]) -> int:
"""Return size of an existing image for manage_existing.
:param volume:
@ -1963,12 +1965,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
return json.loads(out)
def get_manageable_volumes(self,
cinder_volumes: List[Dict[str, str]],
cinder_volumes: list[dict[str, str]],
marker: Optional[Any],
limit: int,
offset: int,
sort_keys: List[str],
sort_dirs: List[str]) -> List[Dict[str, Any]]:
sort_keys: list[str],
sort_dirs: list[str]) -> list[dict[str, Any]]:
manageable_volumes = []
cinder_ids = [resource['id'] for resource in cinder_volumes]
@ -2016,11 +2018,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
pass
def update_migrated_volume(self,
ctxt: Dict,
ctxt: dict,
volume: Volume,
new_volume: Volume,
original_volume_status: str) -> \
Union[Dict[str, None], Dict[str, Optional[str]]]:
Union[dict[str, None], dict[str, Optional[str]]]:
"""Return model update from RBD for migrated volume.
This method should rename the back-end volume name(id) on the
@ -2064,7 +2066,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def migrate_volume(self,
context: context.RequestContext,
volume: Volume,
host: Dict[str, Dict[str, str]]) -> Tuple[bool, None]:
host: dict[str, dict[str, str]]) -> tuple[bool, None]:
refuse_to_migrate = (False, None)
@ -2146,7 +2148,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def manage_existing_snapshot_get_size(self,
snapshot: Snapshot,
existing_ref: Dict[str, Any]) -> int:
existing_ref: dict[str, Any]) -> int:
"""Return size of an existing image for manage_existing.
:param snapshot:
@ -2197,7 +2199,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def manage_existing_snapshot(self,
snapshot: Snapshot,
existing_ref: Dict[str, Any]) -> None:
existing_ref: dict[str, Any]) -> None:
"""Manages an existing snapshot.
Renames the snapshot name to match the expected name for the snapshot.
@ -2220,12 +2222,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
volume.protect_snap(snapshot.name)
def get_manageable_snapshots(self,
cinder_snapshots: List[Dict[str, str]],
cinder_snapshots: list[dict[str, str]],
marker: Optional[Any],
limit: int,
offset: int,
sort_keys: List[str],
sort_dirs: List[str]) -> List[Dict[str, Any]]:
sort_keys: list[str],
sort_dirs: list[str]) -> list[dict[str, Any]]:
"""List manageable snapshots on RBD backend."""
manageable_snapshots = []
cinder_snapshot_ids = [resource['id'] for resource in cinder_snapshots]
@ -2286,7 +2288,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def get_backup_device(self,
context: context.RequestContext,
backup: Backup) -> Tuple[Volume, bool]:
backup: Backup) -> tuple[Volume, bool]:
"""Get a backup device from an existing volume.
To support incremental backups on Ceph to Ceph we don't clone

@ -10,7 +10,10 @@
# License for the specific language governing permissions and limitations
# under the License.
from typing import Any, Dict, List, Optional, Tuple, Type, Union # noqa: H301
from __future__ import annotations # Remove when only supporting python 3.9+
from typing import Any, Optional, Type, Union # noqa: H301
from oslo_config import cfg
from oslo_log import log as logging
@ -82,10 +85,10 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
@staticmethod
def _extract_resource(resource: Optional[dict],
allowed_vals: Tuple[Tuple[str, ...]],
allowed_vals: tuple[tuple[str, ...]],
exc: Type[exception.CinderException],
resource_name: str,
props: Tuple[str] = ('status',)) -> Optional[str]:
props: tuple[str] = ('status',)) -> Optional[str]:
"""Extracts the resource id from the provided resource.
This method validates the input resource dict and checks that the
@ -233,7 +236,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
def _get_image_metadata(self,
context: context.RequestContext,
image_id: Optional[str],
size: int) -> Optional[Dict[str, Any]]:
size: int) -> Optional[dict[str, Any]]:
"""Checks image existence and validates the image metadata.
Returns: image metadata or None
@ -257,7 +260,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
snapshot,
source_volume,
group: Optional[dict],
volume_type: Optional[Dict[str, Any]] = None) -> Tuple[List[str],
volume_type: Optional[dict[str, Any]] = None) -> tuple[list[str],
bool]:
"""Extracts and returns a validated availability zone list.
@ -358,7 +361,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
volume_type_id: str,
snapshot: Optional[objects.Snapshot],
source_volume: Optional[objects.Volume],
image_metadata: Optional[Dict[str, Any]]) -> Optional[str]:
image_metadata: Optional[dict[str, Any]]) -> Optional[str]:
if volume_types.is_encrypted(context, volume_type_id):
encryption_key_id = None
@ -442,7 +445,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
group,
group_snapshot,
backup: Optional[dict],
multiattach: bool = False) -> Dict[str, Any]:
multiattach: bool = False) -> dict[str, Any]:
utils.check_exclusive_options(snapshot=snapshot,
imageRef=image_id,
@ -502,7 +505,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
if multiattach:
context.authorize(policy.MULTIATTACH_POLICY)
specs: Optional[Dict] = {}
specs: Optional[dict] = {}
if volume_type_id:
qos_specs = volume_types.get_volume_type_qos_specs(volume_type_id)
if qos_specs['qos_specs']:
@ -560,7 +563,7 @@ class EntryCreateTask(flow_utils.CinderTask):
def execute(self,
context: context.RequestContext,
optional_args: dict,
**kwargs) -> Dict[str, Any]:
**kwargs) -> dict[str, Any]:
"""Creates a database entry for the given inputs and returns details.
Accesses the database and creates a new entry for the to be created
@ -809,8 +812,8 @@ class VolumeCastTask(flow_utils.CinderTask):
def _cast_create_volume(self,
context: context.RequestContext,
request_spec: Dict[str, Any],
filter_properties: Dict) -> None:
request_spec: dict[str, Any],
filter_properties: dict) -> None:
source_volid = request_spec['source_volid']
volume = request_spec['volume']
snapshot_id = request_spec['snapshot_id']

@ -10,10 +10,12 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import binascii
import traceback
import typing
from typing import Any, Dict, List, Optional, Tuple # noqa: H301
from typing import Any, Optional # noqa: H301
from castellan import key_manager
import os_brick.initiator.connectors
@ -676,7 +678,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
volume_metadata)
@staticmethod
def _extract_cinder_ids(urls: List[str]) -> List[str]:
def _extract_cinder_ids(urls: list[str]) -> list[str]:
"""Process a list of location URIs from glance
:param urls: list of glance location URIs
@ -707,7 +709,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
context: cinder_context.RequestContext,
volume: objects.Volume,
image_location,
image_meta: Dict[str, Any]) -> Tuple[None, bool]:
image_meta: dict[str, Any]) -> tuple[None, bool]:
"""Create a volume efficiently from an existing image.
Returns a dict of volume properties eg. provider_location,
@ -768,7 +770,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
context: cinder_context.RequestContext,
volume: objects.Volume,
image_location,
image_meta: Dict[str, Any],
image_meta: dict[str, Any],
image_service) -> dict:
# TODO(harlowja): what needs to be rolled back in the clone if this
# volume create fails?? Likely this should be a subflow or broken
@ -804,7 +806,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
internal_context: cinder_context.RequestContext,
volume: objects.Volume,
image_id: str,
image_meta: Dict[str, Any]) -> Tuple[None, bool]:
image_meta: dict[str, Any]) -> tuple[None, bool]:
"""Attempt to create the volume using the image cache.
Best case this will simply clone the existing volume in the cache.
@ -853,8 +855,8 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
volume: objects.Volume,
image_location: str,
image_id: str,
image_meta: Dict[str, Any],
image_service) -> Tuple[Optional[dict],
image_meta: dict[str, Any],
image_service) -> tuple[Optional[dict],
bool]:
assert self.image_volume_cache is not None
internal_context = cinder_context.get_internal_tenant_context()
@ -895,7 +897,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
volume: objects.Volume,
image_location,
image_id: str,
image_meta: Dict[str, Any],
image_meta: dict[str, Any],
image_service,
update_cache: bool = False) -> Optional[dict]:
# NOTE(e0ne): check for free space in image_conversion_dir before
@ -1045,7 +1047,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
volume: objects.Volume,
image_location,
image_id: str,
image_meta: Dict[str, Any],
image_meta: dict[str, Any],
image_service,
**kwargs: Any) -> Optional[dict]:
LOG.debug("Cloning %(volume_id)s from image %(image_id)s "
@ -1120,7 +1122,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
context: cinder_context.RequestContext,
volume: objects.Volume,
backup_id: str,
**kwargs) -> Tuple[Dict, bool]:
**kwargs) -> tuple[dict, bool]:
LOG.info("Creating volume %(volume_id)s from backup %(backup_id)s.",
{'volume_id': volume.id,
'backup_id': backup_id})

@ -35,10 +35,12 @@ intact.
"""
from __future__ import annotations # Remove when only supporting Python 3.9+
import functools
import time
import typing
from typing import Any, Dict, List, Optional, Set, Tuple, Union # noqa: H301
from typing import Any, Optional, Union # noqa: H301
from castellan import key_manager
from oslo_config import cfg
@ -539,7 +541,7 @@ class VolumeManager(manager.CleanableManager,
num_vols: int = 0
num_snaps: int = 0
max_objs_num: int = 0
req_range: Union[List[int], range] = [0]
req_range: Union[list[int], range] = [0]
req_limit = CONF.init_host_max_objects_retrieval or 0
use_batch_objects_retrieval: bool = req_limit > 0
@ -1601,7 +1603,7 @@ class VolumeManager(manager.CleanableManager,
reservations = QUOTAS.reserve(ctx, **reserve_opts)
# NOTE(yikun): Skip 'snapshot_id', 'source_volid' keys to avoid
# creating tmp img vol from wrong snapshot or wrong source vol.
skip: Set[str] = {'snapshot_id', 'source_volid'}
skip: set[str] = {'snapshot_id', 'source_volid'}
skip.update(self._VOLUME_CLONE_SKIP_PROPERTIES)
try:
new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip}
@ -3199,7 +3201,7 @@ class VolumeManager(manager.CleanableManager,
return vol_ref
def _get_cluster_or_host_filters(self) -> Dict[str, Any]:
def _get_cluster_or_host_filters(self) -> dict[str, Any]:
if self.cluster:
filters = {'cluster_name': self.cluster}
else:
@ -3500,12 +3502,12 @@ class VolumeManager(manager.CleanableManager,
self,
context: context.RequestContext,
group: objects.Group,
volumes: List[objects.Volume],
volumes: list[objects.Volume],
group_snapshot: Optional[objects.GroupSnapshot] = None,
snapshots: Optional[List[objects.Snapshot]] = None,
snapshots: Optional[list[objects.Snapshot]] = None,
source_group: Optional[objects.Group] = None,
source_vols: Optional[List[objects.Volume]] = None) \
-> Tuple[Dict[str, str], List[Dict[str, str]]]:
source_vols: Optional[list[objects.Volume]] = None) \
-> tuple[dict[str, str], list[dict[str, str]]]:
"""Creates a group from source.
:param context: the context of the caller.
@ -3518,7 +3520,7 @@ class VolumeManager(manager.CleanableManager,
:returns: model_update, volumes_model_update
"""
model_update = {'status': 'available'}
volumes_model_update: List[dict] = []
volumes_model_update: list[dict] = []
for vol in volumes:
if snapshots:
for snapshot in snapshots:
@ -3819,7 +3821,7 @@ class VolumeManager(manager.CleanableManager,
def _convert_group_to_cg(
self,
group: objects.Group,
volumes: objects.VolumeList) -> Tuple[objects.Group,
volumes: objects.VolumeList) -> tuple[objects.Group,
objects.VolumeList]:
if not group:
return None, None
@ -3832,7 +3834,7 @@ class VolumeManager(manager.CleanableManager,
return cg, volumes
def _remove_consistencygroup_id_from_volumes(
self, volumes: Optional[List[objects.Volume]]) -> None:
self, volumes: Optional[list[objects.Volume]]) -> None:
if not volumes:
return
for vol in volumes:
@ -3843,7 +3845,7 @@ class VolumeManager(manager.CleanableManager,
self,
group_snapshot: objects.GroupSnapshot,
snapshots: objects.SnapshotList,
ctxt) -> Tuple[objects.CGSnapshot,
ctxt) -> tuple[objects.CGSnapshot,
objects.SnapshotList]:
if not group_snapshot:
return None, None
@ -3881,7 +3883,7 @@ class VolumeManager(manager.CleanableManager,
def _delete_group_generic(self,
context: context.RequestContext,
group: objects.Group,
volumes) -> Tuple:
volumes) -> tuple:
"""Deletes a group and volumes in the group."""
model_update = {'status': group.status}
volume_model_updates = []
@ -3903,7 +3905,7 @@ class VolumeManager(manager.CleanableManager,
def _update_group_generic(
self, context: context.RequestContext, group,
add_volumes=None,
remove_volumes=None) -> Tuple[None, None, None]:
remove_volumes=None) -> tuple[None, None, None]:
"""Updates a group."""
# NOTE(xyang): The volume manager adds/removes the volume to/from the
# group in the database. This default implementation does not do
@ -3916,7 +3918,7 @@ class VolumeManager(manager.CleanableManager,
group,
volumes: Optional[str],
add: bool = True) -> list:
valid_status: Tuple[str, ...]
valid_status: tuple[str, ...]
if add:
valid_status = VALID_ADD_VOL_TO_GROUP_STATUS
else:
@ -4182,7 +4184,7 @@ class VolumeManager(manager.CleanableManager,
self,
context: context.RequestContext,
group_snapshot: objects.GroupSnapshot,
snapshots: list) -> Tuple[dict, List[dict]]:
snapshots: list) -> tuple[dict, list[dict]]:
"""Creates a group_snapshot."""
model_update = {'status': 'available'}
snapshot_model_updates = []
@ -4208,7 +4210,7 @@ class VolumeManager(manager.CleanableManager,
self,
context: context.RequestContext,
group_snapshot: objects.GroupSnapshot,
snapshots: list) -> Tuple[dict, List[dict]]:
snapshots: list) -> tuple[dict, list[dict]]:
"""Deletes a group_snapshot."""
model_update = {'status': group_snapshot.status}
snapshot_model_updates = []
@ -4772,7 +4774,7 @@ class VolumeManager(manager.CleanableManager,
ctxt: context.RequestContext,
volume: objects.Volume,
attachment: objects.VolumeAttachment,
connector: dict) -> Dict[str, Any]:
connector: dict) -> dict[str, Any]:
try:
self.driver.validate_connector(connector)
except exception.InvalidConnectorException as err:
@ -4830,7 +4832,7 @@ class VolumeManager(manager.CleanableManager,
context: context.RequestContext,
vref: objects.Volume,
connector: dict,
attachment_id: str) -> Dict[str, Any]:
attachment_id: str) -> dict[str, Any]:
"""Update/Finalize an attachment.
This call updates a valid attachment record to associate with a volume
@ -5236,7 +5238,7 @@ class VolumeManager(manager.CleanableManager,
def list_replication_targets(self,
ctxt: context.RequestContext,
group: objects.Group) -> Dict[str, list]:
group: objects.Group) -> dict[str, list]:
"""Provide a means to obtain replication targets for a group.
This method is used to find the replication_device config

@ -12,7 +12,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from typing import Optional, Tuple, Union # noqa: H301
from __future__ import annotations
from typing import Optional, Union # noqa: H301
from cinder.common import constants
from cinder import context
@ -147,7 +149,7 @@ class VolumeAPI(rpc.RPCAPI):
def _get_cctxt(self,
host: Optional[str] = None,
version: Optional[Union[str, Tuple[str, ...]]] = None,
version: Optional[Union[str, tuple[str, ...]]] = None,
**kwargs) -> rpc.RPCAPI:
if host:
server = volume_utils.extract_host(host)

@ -19,7 +19,9 @@
"""Built-in volume type properties."""
import typing as ty
from __future__ import annotations
from typing import Optional
from oslo_config import cfg
from oslo_db import exception as db_exc
@ -330,7 +332,7 @@ def get_volume_type_qos_specs(volume_type_id):
def volume_types_diff(context: context.RequestContext,
vol_type_id1,
vol_type_id2) -> ty.Tuple[dict, bool]:
vol_type_id2) -> tuple[dict, bool]:
"""Returns a 'diff' of two volume types and whether they are equal.
Returns a tuple of (diff, equal), where 'equal' is a boolean indicating
@ -370,8 +372,8 @@ def volume_types_diff(context: context.RequestContext,
encryption.pop(param, None)
return encryption
def _dict_diff(dict1: ty.Optional[dict],
dict2: ty.Optional[dict]) -> ty.Tuple[dict, bool]:
def _dict_diff(dict1: Optional[dict],
dict2: Optional[dict]) -> tuple[dict, bool]:
res = {}
equal = True
if dict1 is None:

@ -14,6 +14,8 @@
"""Volume-related Utilities and helpers."""
from __future__ import annotations # Remove when only supporting python 3.9+
import abc
import ast
import functools
@ -31,8 +33,8 @@ import tempfile
import time
import types
import typing
from typing import Any, BinaryIO, Callable, Dict, IO # noqa: H301
from typing import List, Optional, Tuple, Union # noqa: H301
from typing import Any, BinaryIO, Callable, IO # noqa: H301
from typing import Optional, Union # noqa: H301
import uuid
from castellan.common.credentials import keystone_password
@ -242,8 +244,8 @@ def notify_about_snapshot_usage(context: context.RequestContext,
usage_info)
def _usage_from_capacity(capacity: Dict[str, Any],
**extra_usage_info) -> Dict[str, Any]:
def _usage_from_capacity(capacity: dict[str, Any],
**extra_usage_info) -> dict[str, Any]:
capacity_info = {
'name_to_id': capacity['name_to_id'],
@ -686,7 +688,7 @@ def get_all_volume_groups(vg_name=None) -> list:
def extract_availability_zones_from_volume_type(
volume_type: Union['objects.VolumeType', dict]) \
-> Optional[List[str]]:
-> Optional[list[str]]:
if not volume_type:
return None
extra_specs = volume_type.get('extra_specs', {})
@ -705,7 +707,7 @@ DEFAULT_PASSWORD_SYMBOLS = ('23456789', # Removed: 0,1
def generate_password(
length: int = 16,
symbolgroups: Tuple[str, ...] = DEFAULT_PASSWORD_SYMBOLS) -> str:
symbolgroups: tuple[str, ...] = DEFAULT_PASSWORD_SYMBOLS) -> str:
"""Generate a random password from the supplied symbol groups.
At least one symbol from each group will be included. Unpredictable
@ -744,7 +746,7 @@ def generate_password(
def generate_username(
length: int = 20,
symbolgroups: Tuple[str, ...] = DEFAULT_PASSWORD_SYMBOLS) -> str:
symbolgroups: tuple[str, ...] = DEFAULT_PASSWORD_SYMBOLS) -> str:
# Use the same implementation as the password generation.
return generate_password(length, symbolgroups)
@ -864,12 +866,12 @@ def extract_id_from_snapshot_name(snap_name: str) -> Optional[str]:
return match.group('uuid') if match else None
def paginate_entries_list(entries: List[Dict],
def paginate_entries_list(entries: list[dict],
marker: Optional[Union[dict, str]],
limit: int,
offset: Optional[int],
sort_keys: List[str],
sort_dirs: List[str]) -> list:
sort_keys: list[str],
sort_dirs: list[str]) -> list:
"""Paginate a list of entries.
:param entries: list of dictionaries
@ -1096,7 +1098,7 @@ def get_max_over_subscription_ratio(
return mosr
def check_image_metadata(image_meta: Dict[str, Union[str, int]],
def check_image_metadata(image_meta: dict[str, Union[str, int]],
vol_size: int) -> None:
"""Validates the image metadata."""
# Check whether image is active
@ -1136,7 +1138,7 @@ def enable_bootable_flag(volume: 'objects.Volume') -> None:
def get_volume_image_metadata(image_id: str,
image_meta: Dict[str, Any]) -> dict:
image_meta: dict[str, Any]) -> dict:
# Save some base attributes into the volume metadata
base_metadata = {
@ -1172,7 +1174,7 @@ def copy_image_to_volume(driver,
context: context.RequestContext,
volume: 'objects.Volume',
image_meta: dict,
image_location: Union[str, Tuple[Optional[str], Any]],
image_location: Union[str, tuple[Optional[str], Any]],
image_service) -> None:
"""Downloads Glance image to the specified volume."""
image_id = image_meta['id']