From 5179e4f6bf49795d2aa4c8d0807a502ee1561f60 Mon Sep 17 00:00:00 2001
From: Eric Harney <eharney@redhat.com>
Date: Fri, 29 Apr 2022 10:14:32 -0400
Subject: [PATCH] Use modern type annotation format for collections

This works in Python 3.7 or greater and is
cleaner looking.

See PEP-585 for more info.
https://peps.python.org/pep-0585/

Change-Id: I4c9da881cea1a3638da504c4b79ca8db13851b06
---
 cinder/backup/api.py                         |  10 +-
 cinder/context.py                            |   6 +-
 cinder/flow_utils.py                         |  10 +-
 cinder/image/glance.py                       |  27 +++--
 cinder/image/image_utils.py                  |  26 ++--
 cinder/rpc.py                                |   6 +-
 cinder/scheduler/base_weight.py              |  14 ++-
 cinder/scheduler/evaluator/evaluator.py      |   6 +-
 cinder/scheduler/filter_scheduler.py         |  14 ++-
 cinder/scheduler/flows/create_volume.py      |   8 +-
 cinder/scheduler/host_manager.py             |  23 ++--
 cinder/utils.py                              |  22 ++--
 cinder/volume/api.py                         |  51 ++++----
 cinder/volume/drivers/rbd.py                 | 120 ++++++++++---------
 cinder/volume/flows/api/create_volume.py     |  25 ++--
 cinder/volume/flows/manager/create_volume.py |  22 ++--
 cinder/volume/manager.py                     |  42 +++----
 cinder/volume/rpcapi.py                      |   6 +-
 cinder/volume/volume_types.py                |  10 +-
 cinder/volume/volume_utils.py                |  28 +++--
 20 files changed, 255 insertions(+), 221 deletions(-)

diff --git a/cinder/backup/api.py b/cinder/backup/api.py
index 52df30c60ca..7325fb60880 100644
--- a/cinder/backup/api.py
+++ b/cinder/backup/api.py
@@ -17,9 +17,11 @@
 
 """Handles all requests relating to the volume backups service."""
 
+from __future__ import annotations
+
 from datetime import datetime
 import random
-from typing import List, Optional  # noqa: H301
+from typing import Optional
 
 from eventlet import greenthread
 from oslo_config import cfg
@@ -123,8 +125,8 @@ class API(base.Base):
                 marker: Optional[str] = None,
                 limit: Optional[int] = None,
                 offset: Optional[int] = None,
-                sort_keys: Optional[List[str]] = None,
-                sort_dirs: Optional[List[str]] = None) -> 'objects.BackupList':
+                sort_keys: Optional[list[str]] = None,
+                sort_dirs: Optional[list[str]] = None) -> 'objects.BackupList':
         context.authorize(policy.GET_ALL_POLICY)
 
         search_opts = search_opts or {}
@@ -200,7 +202,7 @@ class API(base.Base):
             raise exception.ServiceNotFound(service_id='cinder-backup')
         return backup_host
 
-    def _list_backup_services(self) -> List['objects.Service']:
+    def _list_backup_services(self) -> list['objects.Service']:
         """List all enabled backup services.
 
         :returns: list -- hosts for services that are enabled for backup.
diff --git a/cinder/context.py b/cinder/context.py
index 594c8606831..6cb7dda0acd 100644
--- a/cinder/context.py
+++ b/cinder/context.py
@@ -17,8 +17,10 @@
 
 """RequestContext: context for requests that persist through all of cinder."""
 
+from __future__ import annotations
+
 import copy
-from typing import Any, Dict, Optional  # noqa: H301
+from typing import Any, Optional  # noqa: H301
 
 from keystoneauth1.access import service_catalog as ksa_service_catalog
 from keystoneauth1 import plugin
@@ -162,7 +164,7 @@ class RequestContext(context.RequestContext):
     read_deleted = property(_get_read_deleted, _set_read_deleted,
                             _del_read_deleted)
 
-    def to_dict(self) -> Dict[str, Any]:
+    def to_dict(self) -> dict[str, Any]:
         result = super(RequestContext, self).to_dict()
         result['user_id'] = self.user_id
         result['project_id'] = self.project_id
diff --git a/cinder/flow_utils.py b/cinder/flow_utils.py
index 576f8c5d379..17ea75bd389 100644
--- a/cinder/flow_utils.py
+++ b/cinder/flow_utils.py
@@ -10,8 +10,10 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from __future__ import annotations
+
 import os
-from typing import Any, List  # noqa: H301
+from typing import Any
 
 from oslo_log import log as logging
 # For more information please visit: https://wiki.openstack.org/wiki/TaskFlow
@@ -25,7 +27,7 @@ from cinder import exception
 LOG = logging.getLogger(__name__)
 
 
-def _make_task_name(cls, addons: List[str] = None) -> str:
+def _make_task_name(cls, addons: list[str] = None) -> str:
     """Makes a pretty name for a task class."""
     base_name = ".".join([cls.__module__, cls.__name__])
     extra = ''
@@ -41,11 +43,11 @@ class CinderTask(task.Task):
     implement the given task as the task name.
     """
 
-    def __init__(self, addons: List[str] = None, **kwargs: Any) -> None:
+    def __init__(self, addons: list[str] = None, **kwargs: Any) -> None:
         super(CinderTask, self).__init__(self.make_name(addons), **kwargs)
 
     @classmethod
-    def make_name(cls, addons: List[str] = None) -> str:
+    def make_name(cls, addons: list[str] = None) -> str:
         return _make_task_name(cls, addons)
 
 
diff --git a/cinder/image/glance.py b/cinder/image/glance.py
index e762ed11dcb..cf32516f0ca 100644
--- a/cinder/image/glance.py
+++ b/cinder/image/glance.py
@@ -16,6 +16,8 @@
 
 """Implementation of an image service that uses Glance as the backend"""
 
+from __future__ import annotations  # Remove when only supporting python 3.9+
+
 import copy
 import itertools
 import random
@@ -23,8 +25,7 @@ import shutil
 import sys
 import textwrap
 import time
-from typing import (Any, Callable, Dict, Iterable, List,  # noqa: H301
-                    NoReturn, Optional, Tuple)            # noqa: H301
+from typing import (Any, Callable, Iterable, NoReturn, Optional)  # noqa: H301
 import urllib
 import urllib.parse
 
@@ -93,7 +94,7 @@ _SESSION = None
 LOG = logging.getLogger(__name__)
 
 
-def _parse_image_ref(image_href: str) -> Tuple[str, str, bool]:
+def _parse_image_ref(image_href: str) -> tuple[str, str, bool]:
     """Parse an image href into composite parts.
 
     :param image_href: href of an image
@@ -283,7 +284,7 @@ class GlanceImageService(object):
 
     def detail(self,
                context: context.RequestContext,
-               **kwargs: str) -> List[dict]:
+               **kwargs: str) -> list[dict]:
         """Calls out to Glance for a list of detailed image information."""
         params = self._extract_query_params(kwargs)
         try:
@@ -298,7 +299,7 @@ class GlanceImageService(object):
 
         return _images
 
-    def _extract_query_params(self, params: dict) -> Dict[str, Any]:
+    def _extract_query_params(self, params: dict) -> dict[str, Any]:
         _params = {}
         accepted_params = ('filters', 'marker', 'limit',
                            'sort_key', 'sort_dir')
@@ -310,7 +311,7 @@ class GlanceImageService(object):
 
     def list_members(self,
                      context: context.RequestContext,
-                     image_id: str) -> List[dict]:
+                     image_id: str) -> list[dict]:
         """Returns a list of dicts with image member data."""
         try:
             return self._client.call(context,
@@ -330,7 +331,7 @@ class GlanceImageService(object):
 
     def show(self,
              context: context.RequestContext,
-             image_id: str) -> Dict[str, Any]:
+             image_id: str) -> dict[str, Any]:
         """Returns a dict with image data for the given opaque image id."""
         try:
             image = self._client.call(context, 'get', image_id)
@@ -345,7 +346,7 @@ class GlanceImageService(object):
 
     def get_location(self,
                      context: context.RequestContext,
-                     image_id: str) -> Tuple[Optional[str], Any]:
+                     image_id: str) -> tuple[Optional[str], Any]:
         """Get backend storage location url.
 
         Returns a tuple containing the direct url and locations representing
@@ -421,8 +422,8 @@ class GlanceImageService(object):
 
     def create(self,
                context: context.RequestContext,
-               image_meta: Dict[str, Any],
-               data=None) -> Dict[str, Any]:
+               image_meta: dict[str, Any],
+               data=None) -> dict[str, Any]:
         """Store the image data and return the new image object."""
         sent_service_image_meta = self._translate_to_glance(image_meta)
 
@@ -497,7 +498,7 @@ class GlanceImageService(object):
 
     def _translate_from_glance(self,
                                context: context.RequestContext,
-                               image: Dict[str, Any]) -> dict:
+                               image: dict[str, Any]) -> dict:
         """Get image metadata from glance image.
 
         Extract metadata from image and convert it's properties
@@ -536,7 +537,7 @@ class GlanceImageService(object):
         return image_meta
 
     @staticmethod
-    def _translate_to_glance(image_meta: Dict[str, Any]) -> Dict[str, Any]:
+    def _translate_to_glance(image_meta: dict[str, Any]) -> dict[str, Any]:
         image_meta = _convert_to_string(image_meta)
         image_meta = _remove_read_only(image_meta)
 
@@ -684,7 +685,7 @@ def _translate_plain_exception(exc_value: BaseException) -> BaseException:
 
 
 def get_remote_image_service(context: context.RequestContext,
-                             image_href) -> Tuple[GlanceImageService, str]:
+                             image_href) -> tuple[GlanceImageService, str]:
     """Create an image_service and parse the id from the given image_href.
 
     The image_href param can be an href of the form
diff --git a/cinder/image/image_utils.py b/cinder/image/image_utils.py
index 6a2ea0704e2..b2ae795b02e 100644
--- a/cinder/image/image_utils.py
+++ b/cinder/image/image_utils.py
@@ -23,6 +23,7 @@ Some slight modifications, but at some point
 we should look at maybe pushing this up to Oslo
 """
 
+from __future__ import annotations  # Remove when only supporting python 3.9+
 
 import contextlib
 import errno
@@ -31,8 +32,7 @@ import math
 import os
 import re
 import tempfile
-from typing import (ContextManager, Dict, Generator,  # noqa: H301
-                    List, Optional, Tuple)
+from typing import ContextManager, Generator, Optional  # noqa: H301
 
 import cryptography
 from cursive import exception as cursive_exception
@@ -152,7 +152,7 @@ def qemu_img_info(path: str,
     return info
 
 
-def get_qemu_img_version() -> Optional[List[int]]:
+def get_qemu_img_version() -> Optional[list[int]]:
     """The qemu-img version will be cached until the process is restarted."""
 
     global QEMU_IMG_VERSION
@@ -187,8 +187,7 @@ def _get_qemu_convert_luks_cmd(src: str,
                                cipher_spec: Optional[dict] = None,
                                passphrase_file: Optional[str] = None,
                                src_passphrase_file: Optional[str] = None) \
-        -> List[str]:
-
+        -> list[str]:
     cmd = ['qemu-img', 'convert']
 
     if prefix:
@@ -223,8 +222,7 @@ def _get_qemu_convert_cmd(src: str,
                           passphrase_file: Optional[str] = None,
                           compress: bool = False,
                           src_passphrase_file: Optional[str] = None) \
-        -> List[str]:
-
+        -> list[str]:
     if src_passphrase_file is not None:
         if passphrase_file is None:
             message = _("Can't create unencrypted volume %(format)s "
@@ -290,7 +288,7 @@ def _get_qemu_convert_cmd(src: str,
     return cmd
 
 
-def _get_version_from_string(version_string: str) -> List[int]:
+def _get_version_from_string(version_string: str) -> list[int]:
     return [int(x) for x in version_string.split('.')]
 
 
@@ -451,7 +449,7 @@ def resize_image(source: str,
                  run_as_root: bool = False,
                  file_format: Optional[str] = None) -> None:
     """Changes the virtual size of the image."""
-    cmd: Tuple[str, ...]
+    cmd: tuple[str, ...]
     if file_format:
         cmd = ('qemu-img', 'resize', '-f', file_format, source, '%sG' % size)
     else:
@@ -922,7 +920,7 @@ def extract_targz(archive_name: str, target: str) -> None:
     utils.execute('tar', '-xzf', archive_name, '-C', target)
 
 
-def fix_vhd_chain(vhd_chain: List[str]) -> None:
+def fix_vhd_chain(vhd_chain: list[str]) -> None:
     for child, parent in zip(vhd_chain[:-1], vhd_chain[1:]):
         set_vhd_parent(child, parent)
 
@@ -991,7 +989,7 @@ def temporary_dir() -> ContextManager[str]:
     return utils.tempdir(dir=CONF.image_conversion_dir)
 
 
-def coalesce_chain(vhd_chain: List[str]) -> str:
+def coalesce_chain(vhd_chain: list[str]) -> str:
     for child, parent in zip(vhd_chain[:-1], vhd_chain[1:]):
         with temporary_dir() as directory_for_journal:
             size = get_vhd_size(child)
@@ -1003,7 +1001,7 @@ def coalesce_chain(vhd_chain: List[str]) -> str:
     return vhd_chain[-1]
 
 
-def discover_vhd_chain(directory: str) -> List[str]:
+def discover_vhd_chain(directory: str) -> list[str]:
     counter = 0
     chain = []
 
@@ -1028,7 +1026,7 @@ def replace_xenserver_image_with_coalesced_vhd(image_file: str) -> None:
         os.rename(coalesced, image_file)
 
 
-def decode_cipher(cipher_spec: str, key_size: int) -> Dict[str, str]:
+def decode_cipher(cipher_spec: str, key_size: int) -> dict[str, str]:
     """Decode a dm-crypt style cipher specification string
 
        The assumed format being cipher-chainmode-ivmode, similar to that
@@ -1060,7 +1058,7 @@ class TemporaryImages(object):
     """
 
     def __init__(self, image_service: glance.GlanceImageService):
-        self.temporary_images: Dict[str, dict] = {}
+        self.temporary_images: dict[str, dict] = {}
         self.image_service = image_service
         image_service.temp_images = self
 
diff --git a/cinder/rpc.py b/cinder/rpc.py
index f499f9d5089..d5db4675eb8 100644
--- a/cinder/rpc.py
+++ b/cinder/rpc.py
@@ -12,6 +12,8 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from __future__ import annotations
+
 __all__ = [
     'init',
     'cleanup',
@@ -26,7 +28,7 @@ __all__ = [
 ]
 
 import functools
-from typing import Tuple, Union  # noqa: H301
+from typing import Union
 
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -230,7 +232,7 @@ class RPCAPI(object):
         return versions[-1]
 
     def _get_cctxt(self,
-                   version: Union[str, Tuple[str, ...]] = None,
+                   version: Union[str, tuple[str, ...]] = None,
                    **kwargs):
         """Prepare client context
 
diff --git a/cinder/scheduler/base_weight.py b/cinder/scheduler/base_weight.py
index 519cff0e089..7ae44c2eec5 100644
--- a/cinder/scheduler/base_weight.py
+++ b/cinder/scheduler/base_weight.py
@@ -17,8 +17,10 @@
 Pluggable Weighing support
 """
 
+from __future__ import annotations
+
 import abc
-from typing import Iterable, List, Optional  # noqa: H301
+from typing import Iterable, Optional  # noqa: H301
 
 from oslo_log import log as logging
 
@@ -28,7 +30,7 @@ from cinder.scheduler import base_handler
 LOG = logging.getLogger(__name__)
 
 
-def normalize(weight_list: List[float],
+def normalize(weight_list: list[float],
               minval: Optional[float] = None,
               maxval: Optional[float] = None) -> Iterable[float]:
     """Normalize the values in a list between 0 and 1.0.
@@ -95,8 +97,8 @@ class BaseWeigher(object, metaclass=abc.ABCMeta):
         """Override in a subclass to specify a weight for a specific object."""
 
     def weigh_objects(self,
-                      weighed_obj_list: List[WeighedObject],
-                      weight_properties: dict) -> List[float]:
+                      weighed_obj_list: list[WeighedObject],
+                      weight_properties: dict) -> list[float]:
         """Weigh multiple objects.
 
         Override in a subclass if you need access to all objects in order
@@ -130,8 +132,8 @@ class BaseWeightHandler(base_handler.BaseHandler):
 
     def get_weighed_objects(self,
                             weigher_classes: list,
-                            obj_list: List[WeighedObject],
-                            weighing_properties: dict) -> List[WeighedObject]:
+                            obj_list: list[WeighedObject],
+                            weighing_properties: dict) -> list[WeighedObject]:
         """Return a sorted (descending), normalized list of WeighedObjects."""
 
         if not obj_list:
diff --git a/cinder/scheduler/evaluator/evaluator.py b/cinder/scheduler/evaluator/evaluator.py
index 39c8cad0aa2..47db14e1715 100644
--- a/cinder/scheduler/evaluator/evaluator.py
+++ b/cinder/scheduler/evaluator/evaluator.py
@@ -13,9 +13,11 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from __future__ import annotations  # Remove when only supporting python 3.9+
+
 import operator
 import re
-from typing import Callable, Dict  # noqa: H301
+from typing import Callable
 
 import pyparsing
 
@@ -168,7 +170,7 @@ class EvalTernaryOp(object):
 
 
 class EvalFunction(object):
-    functions: Dict[str, Callable] = {
+    functions: dict[str, Callable] = {
         "abs": abs,
         "max": max,
         "min": min,
diff --git a/cinder/scheduler/filter_scheduler.py b/cinder/scheduler/filter_scheduler.py
index 57acac874e4..32be969f8d8 100644
--- a/cinder/scheduler/filter_scheduler.py
+++ b/cinder/scheduler/filter_scheduler.py
@@ -20,7 +20,9 @@ You can customize this scheduler by specifying your own volume Filters and
 Weighing Functions.
 """
 
-from typing import List, Optional  # noqa: H301
+from __future__ import annotations
+
+from typing import Optional
 
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -375,9 +377,9 @@ class FilterScheduler(driver.Scheduler):
 
     def _get_weighted_candidates_generic_group(
             self, context: context.RequestContext,
-            group_spec: dict, request_spec_list: List[dict],
+            group_spec: dict, request_spec_list: list[dict],
             group_filter_properties: Optional[dict] = None,
-            filter_properties_list: Optional[List[dict]] = None) -> list:
+            filter_properties_list: Optional[list[dict]] = None) -> list:
         """Finds backends that supports the group.
 
         Returns a list of backends that meet the required specs,
@@ -486,7 +488,7 @@ class FilterScheduler(driver.Scheduler):
     def _get_weighted_candidates_by_group_type(
             self, context: context.RequestContext, group_spec: dict,
             group_filter_properties: Optional[dict] = None) \
-            -> List[WeighedHost]:
+            -> list[WeighedHost]:
         """Finds backends that supports the group type.
 
         Returns a list of backends that meet the required specs,
@@ -598,7 +600,7 @@ class FilterScheduler(driver.Scheduler):
         return self._choose_top_backend_generic_group(weighed_backends)
 
     def _choose_top_backend(self,
-                            weighed_backends: List[WeighedHost],
+                            weighed_backends: list[WeighedHost],
                             request_spec: dict):
         top_backend = weighed_backends[0]
         backend_state = top_backend.obj
@@ -609,7 +611,7 @@ class FilterScheduler(driver.Scheduler):
 
     def _choose_top_backend_generic_group(
             self,
-            weighed_backends: List[WeighedHost]) -> WeighedHost:
+            weighed_backends: list[WeighedHost]) -> WeighedHost:
         top_backend = weighed_backends[0]
         backend_state = top_backend.obj
         LOG.debug("Choosing %s", backend_state.backend_id)
diff --git a/cinder/scheduler/flows/create_volume.py b/cinder/scheduler/flows/create_volume.py
index 54eaa61a57f..a9f967a69b7 100644
--- a/cinder/scheduler/flows/create_volume.py
+++ b/cinder/scheduler/flows/create_volume.py
@@ -10,7 +10,9 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from typing import Any, Dict, Optional  # noqa: H301
+from __future__ import annotations  # Remove when only supporting python 3.9+
+
+from typing import Any, Optional  # noqa: H301
 
 from oslo_log import log as logging
 from oslo_utils import excutils
@@ -49,7 +51,7 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask):
                                volume: objects.Volume,
                                snapshot_id: Optional[str],
                                image_id: Optional[str],
-                               backup_id: Optional[str]) -> Dict[str, Any]:
+                               backup_id: Optional[str]) -> dict[str, Any]:
         # Create the full request spec using the volume object.
         #
         # NOTE(dulek): At this point, a volume can be deleted before it gets
@@ -77,7 +79,7 @@ class ExtractSchedulerSpecTask(flow_utils.CinderTask):
                 volume: objects.Volume,
                 snapshot_id: Optional[str],
                 image_id: Optional[str],
-                backup_id: Optional[str]) -> Dict[str, Any]:
+                backup_id: Optional[str]) -> dict[str, Any]:
         # For RPC version < 1.2 backward compatibility
         if request_spec is None:
             request_spec = self._populate_request_spec(volume,
diff --git a/cinder/scheduler/host_manager.py b/cinder/scheduler/host_manager.py
index 52979010ffb..021181286eb 100644
--- a/cinder/scheduler/host_manager.py
+++ b/cinder/scheduler/host_manager.py
@@ -15,11 +15,12 @@
 
 """Manage backends in the current zone."""
 
+from __future__ import annotations
+
 from collections import abc
 import random
 import typing
-from typing import (Any, Dict, Iterable, List,  # noqa: H301
-                    Optional, Type, Union)
+from typing import (Any, Iterable, Optional, Type, Union)  # noqa: H301
 
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -163,7 +164,7 @@ class BackendState(object):
         self.service = ReadOnlyDict(service)
 
     def update_from_volume_capability(self,
-                                      capability: Dict[str, Any],
+                                      capability: dict[str, Any],
                                       service=None) -> None:
         """Update information about a host from its volume_node info.
 
@@ -289,7 +290,7 @@ class BackendState(object):
                                                 'host': self.host})
             del self.pools[pool]
 
-    def _append_backend_info(self, pool_cap: Dict[str, Any]) -> None:
+    def _append_backend_info(self, pool_cap: dict[str, Any]) -> None:
         # Fill backend level info to pool if needed.
         if not pool_cap.get('volume_backend_name', None):
             pool_cap['volume_backend_name'] = self.volume_backend_name
@@ -407,7 +408,7 @@ class PoolState(BackendState):
         self.pools: dict = {}
 
     def update_from_volume_capability(self,
-                                      capability: Dict[str, Any],
+                                      capability: dict[str, Any],
                                       service=None) -> None:
         """Update information about a pool from its volume_node info."""
         LOG.debug("Updating capabilities for %s: %s", self.host, capability)
@@ -470,7 +471,7 @@ class HostManager(object):
 
     def __init__(self):
         self.service_states = {}  # { <host|cluster>: {<service>: {cap k : v}}}
-        self.backend_state_map: Dict[str, BackendState] = {}
+        self.backend_state_map: dict[str, BackendState] = {}
         self.backup_service_states = {}
         self.filter_handler = filters.BackendFilterHandler('cinder.scheduler.'
                                                            'filters')
@@ -513,7 +514,7 @@ class HostManager(object):
 
     def _choose_backend_weighers(
             self,
-            weight_cls_names: Optional[List[str]]) -> list:
+            weight_cls_names: Optional[list[str]]) -> list:
         """Return a list of available weigher names.
 
         This function checks input weigher names against a predefined set
@@ -795,7 +796,7 @@ class HostManager(object):
 
     def get_pools(self,
                   context: cinder_context.RequestContext,
-                  filters: Optional[dict] = None) -> List[dict]:
+                  filters: Optional[dict] = None) -> list[dict]:
         """Returns a dict of all pools on all hosts HostManager knows about."""
 
         self._update_backend_state_map(context)
@@ -858,7 +859,7 @@ class HostManager(object):
                    capa_new: dict,
                    updated_pools: Iterable[dict],
                    host: str,
-                   timestamp) -> List[dict]:
+                   timestamp) -> list[dict]:
         pools = capa_new.get('pools')
         usage = []
         if pools and isinstance(pools, list):
@@ -888,7 +889,7 @@ class HostManager(object):
 
     def _get_pool_usage(self,
                         pool: dict,
-                        host: str, timestamp) -> Dict[str, Any]:
+                        host: str, timestamp) -> dict[str, Any]:
         total = pool["total_capacity_gb"]
         free = pool["free_capacity_gb"]
 
@@ -967,7 +968,7 @@ class HostManager(object):
 
     def _notify_capacity_usage(self,
                                context: cinder_context.RequestContext,
-                               usage: List[dict]) -> None:
+                               usage: list[dict]) -> None:
         if usage:
             for u in usage:
                 volume_utils.notify_about_capacity_usage(
diff --git a/cinder/utils.py b/cinder/utils.py
index 4b2182d5ef0..fbc41e27b6c 100644
--- a/cinder/utils.py
+++ b/cinder/utils.py
@@ -25,6 +25,8 @@
    should be placed in volume_utils instead.
 """
 
+from __future__ import annotations  # Remove when only supporting python 3.9+
+
 from collections import OrderedDict
 import contextlib
 import datetime
@@ -42,8 +44,8 @@ import stat
 import sys
 import tempfile
 import typing
-from typing import Callable, Dict, Iterable, Iterator, List  # noqa: H301
-from typing import Optional, Tuple, Type, Union  # noqa: H301
+from typing import Callable, Iterable, Iterator  # noqa: H301
+from typing import Optional, Type, Union  # noqa: H301
 
 import eventlet
 from eventlet import tpool
@@ -165,15 +167,15 @@ def check_exclusive_options(
         raise exception.InvalidInput(reason=msg)
 
 
-def execute(*cmd: str, **kwargs: Union[bool, str]) -> Tuple[str, str]:
+def execute(*cmd: str, **kwargs: Union[bool, str]) -> tuple[str, str]:
     """Convenience wrapper around oslo's execute() method."""
     if 'run_as_root' in kwargs and 'root_helper' not in kwargs:
         kwargs['root_helper'] = get_root_helper()
     return processutils.execute(*cmd, **kwargs)
 
 
-def check_ssh_injection(cmd_list: List[str]) -> None:
-    ssh_injection_pattern: Tuple[str, ...] = ('`', '$', '|', '||', ';', '&',
+def check_ssh_injection(cmd_list: list[str]) -> None:
+    ssh_injection_pattern: tuple[str, ...] = ('`', '$', '|', '||', ';', '&',
                                               '&&', '>', '>>', '<')
 
     # Check whether injection attacks exist
@@ -208,7 +210,7 @@ def check_ssh_injection(cmd_list: List[str]) -> None:
 
 
 def check_metadata_properties(
-        metadata: Optional[Dict[str, str]]) -> None:
+        metadata: Optional[dict[str, str]]) -> None:
     """Checks that the volume metadata properties are valid."""
 
     if not metadata:
@@ -235,7 +237,7 @@ def check_metadata_properties(
 
 
 def last_completed_audit_period(unit: Optional[str] = None) -> \
-        Tuple[Union[datetime.datetime, datetime.timedelta],
+        tuple[Union[datetime.datetime, datetime.timedelta],
               Union[datetime.datetime, datetime.timedelta]]:
     """This method gives you the most recently *completed* audit period.
 
@@ -496,7 +498,7 @@ def get_file_size(path: str) -> int:
 
 def _get_disk_of_partition(
         devpath: str,
-        st: Optional[os.stat_result] = None) -> Tuple[str, os.stat_result]:
+        st: Optional[os.stat_result] = None) -> tuple[str, os.stat_result]:
     """Gets a disk device path and status from partition path.
 
     Returns a disk device path from a partition device path, and stat for
@@ -633,9 +635,9 @@ class retry_if_exit_code(tenacity.retry_if_exception):
 
 def retry(retry_param: Union[None,
                              Type[Exception],
-                             Tuple[Type[Exception], ...],
+                             tuple[Type[Exception], ...],
                              int,
-                             Tuple[int, ...]],
+                             tuple[int, ...]],
           interval: float = 1,
           retries: int = 3,
           backoff_rate: float = 2,
diff --git a/cinder/volume/api.py b/cinder/volume/api.py
index 98b5c8d899d..b18e8a110cf 100644
--- a/cinder/volume/api.py
+++ b/cinder/volume/api.py
@@ -16,11 +16,12 @@
 
 """Handles all requests relating to volumes."""
 
+from __future__ import annotations
+
 import ast
 import collections
 import datetime
-from typing import (Any, DefaultDict, Dict, Iterable, List,   # noqa: H301
-                    Optional, Tuple, Union)
+from typing import (Any, DefaultDict, Iterable, Optional, Union)  # noqa: H301
 
 from castellan import key_manager
 from oslo_config import cfg
@@ -139,7 +140,7 @@ class API(base.Base):
             services = objects.ServiceList.get_all_by_topic(ctxt, topic)
             az_data = [(s.availability_zone, s.disabled)
                        for s in services]
-            disabled_map: Dict[str, bool] = {}
+            disabled_map: dict[str, bool] = {}
             for (az_name, disabled) in az_data:
                 tracked_disabled = disabled_map.get(az_name, True)
                 disabled_map[az_name] = tracked_disabled and disabled
@@ -725,8 +726,8 @@ class API(base.Base):
             search_opts: Optional[dict] = None,
             marker: Optional[str] = None,
             limit: Optional[int] = None,
-            sort_keys: Optional[List[str]] = None,
-            sort_dirs: Optional[List[str]] = None,
+            sort_keys: Optional[list[str]] = None,
+            sort_dirs: Optional[list[str]] = None,
             offset: Optional[int] = None) -> objects.SnapshotList:
         context.authorize(snapshot_policy.GET_ALL_POLICY)
 
@@ -971,7 +972,7 @@ class API(base.Base):
 
         utils.check_metadata_properties(metadata)
 
-        valid_status: Tuple[str, ...]
+        valid_status: tuple[str, ...]
         valid_status = ('available',)
         if force or allow_in_use:
             valid_status = ('available', 'in-use')
@@ -1118,7 +1119,7 @@ class API(base.Base):
             context: context.RequestContext,
             volume_list: objects.VolumeList) -> list:
         reserve_opts_list = []
-        total_reserve_opts: Dict[str, int] = {}
+        total_reserve_opts: dict[str, int] = {}
         try:
             for volume in volume_list:
                 if CONF.no_snapshot_gb_quota:
@@ -1155,7 +1156,7 @@ class API(base.Base):
             name: str,
             description: str,
             cgsnapshot_id: str,
-            group_snapshot_id: Optional[str] = None) -> Dict[str, Any]:
+            group_snapshot_id: Optional[str] = None) -> dict[str, Any]:
         options = {'volume_id': volume['id'],
                    'cgsnapshot_id': cgsnapshot_id,
                    'group_snapshot_id': group_snapshot_id,
@@ -1176,7 +1177,7 @@ class API(base.Base):
             volume: objects.Volume,
             name: str,
             description: str,
-            metadata: Optional[Dict[str, Any]] = None,
+            metadata: Optional[dict[str, Any]] = None,
             cgsnapshot_id: Optional[str] = None,
             group_snapshot_id: Optional[str] = None,
             allow_in_use: bool = False) -> objects.Snapshot:
@@ -1193,7 +1194,7 @@ class API(base.Base):
             volume: objects.Volume,
             name: str,
             description: str,
-            metadata: Optional[Dict[str, Any]] = None) -> objects.Snapshot:
+            metadata: Optional[dict[str, Any]] = None) -> objects.Snapshot:
         result = self._create_snapshot(context, volume, name, description,
                                        True, metadata)
         LOG.info("Snapshot force create request issued successfully.",
@@ -1211,7 +1212,7 @@ class API(base.Base):
             snapshot.assert_not_frozen()
 
         # Build required conditions for conditional update
-        expected: Dict[str, Any] = {'cgsnapshot_id': None,
+        expected: dict[str, Any] = {'cgsnapshot_id': None,
                                     'group_snapshot_id': None}
         # If not force deleting we have status conditions
         if not force:
@@ -1237,7 +1238,7 @@ class API(base.Base):
     def update_snapshot(self,
                         context: context.RequestContext,
                         snapshot: objects.Snapshot,
-                        fields: Dict[str, Any]) -> None:
+                        fields: dict[str, Any]) -> None:
         context.authorize(snapshot_policy.UPDATE_POLICY,
                           target_obj=snapshot)
         snapshot.update(fields)
@@ -1256,7 +1257,7 @@ class API(base.Base):
     def create_volume_metadata(self,
                                context: context.RequestContext,
                                volume: objects.Volume,
-                               metadata: Dict[str, Any]) -> dict:
+                               metadata: dict[str, Any]) -> dict:
         """Creates volume metadata."""
         context.authorize(vol_meta_policy.CREATE_POLICY, target_obj=volume)
         db_meta = self._update_volume_metadata(context, volume, metadata)
@@ -1284,7 +1285,7 @@ class API(base.Base):
     def _update_volume_metadata(self,
                                 context: context.RequestContext,
                                 volume: objects.Volume,
-                                metadata: Dict[str, Any],
+                                metadata: dict[str, Any],
                                 delete: bool = False,
                                 meta_type=common.METADATA_TYPES.user) -> dict:
         if volume['status'] in ('maintenance', 'uploading'):
@@ -1298,7 +1299,7 @@ class API(base.Base):
     def update_volume_metadata(self,
                                context: context.RequestContext,
                                volume: objects.Volume,
-                               metadata: Dict[str, Any],
+                               metadata: dict[str, Any],
                                delete: bool = False,
                                meta_type=common.METADATA_TYPES.user) -> dict:
         """Updates volume metadata.
@@ -1320,7 +1321,7 @@ class API(base.Base):
     def update_volume_admin_metadata(self,
                                      context: context.RequestContext,
                                      volume: objects.Volume,
-                                     metadata: Dict[str, Any],
+                                     metadata: dict[str, Any],
                                      delete: Optional[bool] = False,
                                      add: Optional[bool] = True,
                                      update: Optional[bool] = True) -> dict:
@@ -1369,7 +1370,7 @@ class API(base.Base):
     def update_snapshot_metadata(self,
                                  context: context.RequestContext,
                                  snapshot: objects.Snapshot,
-                                 metadata: Dict[str, Any],
+                                 metadata: dict[str, Any],
                                  delete: bool = False) -> dict:
         """Updates or creates snapshot metadata.
 
@@ -1410,7 +1411,7 @@ class API(base.Base):
 
     def get_volume_image_metadata(self,
                                   context: context.RequestContext,
-                                  volume: objects.Volume) -> Dict[str, str]:
+                                  volume: objects.Volume) -> dict[str, str]:
         context.authorize(vol_meta_policy.GET_POLICY, target_obj=volume)
         db_data = self.db.volume_glance_metadata_get(context, volume['id'])
         LOG.info("Get volume image-metadata completed successfully.",
@@ -1420,7 +1421,7 @@ class API(base.Base):
     def get_list_volumes_image_metadata(
             self,
             context: context.RequestContext,
-            volume_id_list: List[str]) -> DefaultDict[str, str]:
+            volume_id_list: list[str]) -> DefaultDict[str, str]:
         db_data = self.db.volume_glance_metadata_list_get(context,
                                                           volume_id_list)
         results: collections.defaultdict = collections.defaultdict(dict)
@@ -1458,8 +1459,8 @@ class API(base.Base):
     def copy_volume_to_image(self,
                              context: context.RequestContext,
                              volume: objects.Volume,
-                             metadata: Dict[str, str],
-                             force: bool) -> Dict[str, Optional[str]]:
+                             metadata: dict[str, str],
+                             force: bool) -> dict[str, Optional[str]]:
         """Create a new image from the specified volume."""
         if not CONF.enable_force_upload and force:
             LOG.info("Force upload to image is disabled, "
@@ -2069,8 +2070,8 @@ class API(base.Base):
                                marker: Optional[str] = None,
                                limit: Optional[int] = None,
                                offset: Optional[int] = None,
-                               sort_keys: Optional[List[str]] = None,
-                               sort_dirs: Optional[List[str]] = None):
+                               sort_keys: Optional[list[str]] = None,
+                               sort_dirs: Optional[list[str]] = None):
         svc = self._get_service_by_host_cluster(context, host, cluster_name)
         return self.volume_rpcapi.get_manageable_volumes(context, svc,
                                                          marker, limit,
@@ -2110,8 +2111,8 @@ class API(base.Base):
             marker: Optional[str] = None,
             limit: Optional[int] = None,
             offset: Optional[int] = None,
-            sort_keys: Optional[List[str]] = None,
-            sort_dirs: Optional[List[str]] = None) -> List[dict]:
+            sort_keys: Optional[list[str]] = None,
+            sort_dirs: Optional[list[str]] = None) -> list[dict]:
         svc = self._get_service_by_host_cluster(context, host, cluster_name,
                                                 'snapshot')
         return self.volume_rpcapi.get_manageable_snapshots(context, svc,
diff --git a/cinder/volume/drivers/rbd.py b/cinder/volume/drivers/rbd.py
index 601b416ba4a..71777f6aa3e 100644
--- a/cinder/volume/drivers/rbd.py
+++ b/cinder/volume/drivers/rbd.py
@@ -13,6 +13,8 @@
 #    under the License.
 """RADOS Block Device Driver"""
 
+from __future__ import annotations
+
 import binascii
 import errno
 import json
@@ -20,7 +22,7 @@ import math
 import os
 import tempfile
 import typing
-from typing import Any, Dict, List, Optional, Tuple, Union  # noqa: H301
+from typing import Any, Optional, Union  # noqa: H301
 import urllib.parse
 
 from castellan import key_manager
@@ -163,7 +165,7 @@ class RBDVolumeProxy(object):
                  pool: Optional[str] = None,
                  snapshot: Optional[str] = None,
                  read_only: bool = False,
-                 remote: Optional[Dict[str, str]] = None,
+                 remote: Optional[dict[str, str]] = None,
                  timeout: Optional[int] = None,
                  client: 'rados.Rados' = None,
                  ioctx: 'rados.Ioctx' = None):
@@ -254,7 +256,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                  **kwargs) -> None:
         super(RBDDriver, self).__init__(*args, **kwargs)
         self.configuration.append_config_values(RBD_OPTS)
-        self._stats: Dict[str, Union[str, bool]] = {}
+        self._stats: dict[str, Union[str, bool]] = {}
         # allow overrides for testing
         self.rados = kwargs.get('rados', rados)
         self.rbd = kwargs.get('rbd', rbd)
@@ -270,10 +272,10 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         self._backend_name = (self.configuration.volume_backend_name or
                               self.__class__.__name__)
         self._active_backend_id: Optional[str] = active_backend_id
-        self._active_config: Dict[str, str] = {}
+        self._active_config: dict[str, str] = {}
         self._is_replication_enabled = False
         self._replication_targets: list = []
-        self._target_names: List[str] = []
+        self._target_names: list[str] = []
         self._clone_v2_api_checked: bool = False
 
         if self.rbd is not None:
@@ -341,7 +343,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                                 ' performance, fewer deletion issues')
 
     def _get_target_config(self,
-                           target_id: Optional[str]) -> Dict[str,
+                           target_id: Optional[str]) -> dict[str,
                                                              str]:
         """Get a replication target from known replication targets."""
         for target in self._replication_targets:
@@ -371,7 +373,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
             self._target_names.append('default')
 
     def _parse_replication_configs(self,
-                                   replication_devices: List[dict]) -> None:
+                                   replication_devices: list[dict]) -> None:
         for replication_device in replication_devices:
             if 'backend_id' not in replication_device:
                 msg = _('Missing backend_id in replication_device '
@@ -396,8 +398,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
 
     def _get_config_tuple(
             self,
-            remote: Optional[Dict[str, str]] = None) \
-            -> Tuple[Optional[str], Optional[str],
+            remote: Optional[dict[str, str]] = None) \
+            -> tuple[Optional[str], Optional[str],
                      Optional[str], Optional[str]]:
         if not remote:
             remote = self._active_config
@@ -486,7 +488,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
     def RBDProxy(self) -> tpool.Proxy:
         return tpool.Proxy(self.rbd.RBD())
 
-    def _ceph_args(self) -> List[str]:
+    def _ceph_args(self) -> list[str]:
         args = []
 
         name, conf, user, secret_uuid = self._get_config_tuple()
@@ -504,13 +506,13 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                           pool: Optional[str] = None,
                           remote: Optional[dict] = None,
                           timeout: Optional[int] = None) -> \
-            Tuple['rados.Rados', 'rados.Ioctx']:
+            tuple['rados.Rados', 'rados.Ioctx']:
         @utils.retry(exception.VolumeBackendAPIException,
                      self.configuration.rados_connection_interval,
                      self.configuration.rados_connection_retries)
         def _do_conn(pool: Optional[str],
                      remote: Optional[dict],
-                     timeout: Optional[int]) -> Tuple['rados.Rados',
+                     timeout: Optional[int]) -> tuple['rados.Rados',
                                                       'rados.Ioctx']:
             name, conf, user, secret_uuid = self._get_config_tuple(remote)
 
@@ -557,7 +559,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         client.shutdown()
 
     @staticmethod
-    def _get_backup_snaps(rbd_image) -> List:
+    def _get_backup_snaps(rbd_image) -> list:
         """Get list of any backup snapshots that exist on this volume.
 
         There should only ever be one but accept all since they need to be
@@ -570,7 +572,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         from cinder.backup.drivers import ceph
         return ceph.CephBackupDriver.get_backup_snaps(rbd_image)
 
-    def _get_mon_addrs(self) -> Tuple[List[str], List[str]]:
+    def _get_mon_addrs(self) -> tuple[list[str], list[str]]:
         args = ['ceph', 'mon', 'dump', '--format=json']
         args.extend(self._ceph_args())
         out, _ = self._execute(*args)
@@ -578,7 +580,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         if lines[0].startswith('dumped monmap epoch'):
             lines = lines[1:]
         monmap = json.loads('\n'.join(lines))
-        addrs: List[str] = [mon['addr'] for mon in monmap['mons']]
+        addrs: list[str] = [mon['addr'] for mon in monmap['mons']]
         hosts = []
         ports = []
         for addr in addrs:
@@ -614,8 +616,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         total_provisioned = math.ceil(float(total_provisioned) / units.Gi)
         return total_provisioned
 
-    def _get_pool_stats(self) -> Union[Tuple[str, str],
-                                       Tuple[float, float]]:
+    def _get_pool_stats(self) -> Union[tuple[str, str],
+                                       tuple[float, float]]:
         """Gets pool free and total capacity in GiB.
 
         Calculate free and total capacity of the pool based on the pool's
@@ -753,7 +755,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
     def create_cloned_volume(
             self,
             volume: Volume,
-            src_vref: Volume) -> Optional[Dict[str, Optional[str]]]:
+            src_vref: Volume) -> Optional[dict[str, Optional[str]]]:
         """Create a cloned volume from another volume.
 
         Since we are cloning from a volume and not a snapshot, we must first
@@ -860,7 +862,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         LOG.debug("clone created successfully")
         return volume_update
 
-    def _enable_replication(self, volume: Volume) -> Dict[str, str]:
+    def _enable_replication(self, volume: Volume) -> dict[str, str]:
         """Enable replication for a volume.
 
         Returns required volume update.
@@ -884,7 +886,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         return {'replication_status': fields.ReplicationStatus.ENABLED,
                 'replication_driver_data': driver_data}
 
-    def _enable_multiattach(self, volume: Volume) -> Dict[str, str]:
+    def _enable_multiattach(self, volume: Volume) -> dict[str, str]:
         vol_name = utils.convert_str(volume.name)
         with RBDVolumeProxy(self, vol_name) as image:
             image_features = image.features()
@@ -894,7 +896,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         return {'provider_location':
                 self._dumps({'saved_features': image_features})}
 
-    def _disable_multiattach(self, volume: Volume) -> Dict[str, None]:
+    def _disable_multiattach(self, volume: Volume) -> dict[str, None]:
         vol_name = utils.convert_str(volume.name)
         with RBDVolumeProxy(self, vol_name) as image:
             try:
@@ -932,7 +934,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
     def _setup_volume(
             self,
             volume: Volume,
-            volume_type: Optional[VolumeType] = None) -> Dict[str,
+            volume_type: Optional[VolumeType] = None) -> dict[str,
                                                               Optional[str]]:
 
         if volume_type:
@@ -1030,7 +1032,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
             cmd.extend(self._ceph_args())
             self._execute(*cmd)
 
-    def create_volume(self, volume: Volume) -> Dict[str, Any]:
+    def create_volume(self, volume: Volume) -> dict[str, Any]:
         """Creates a logical volume."""
 
         if volume.encryption_key_id:
@@ -1092,7 +1094,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                volume: Volume,
                src_pool: str,
                src_image: str,
-               src_snap: str) -> Dict[str, Optional[str]]:
+               src_snap: str) -> dict[str, Optional[str]]:
         LOG.debug('cloning %(pool)s/%(img)s@%(snap)s to %(dst)s',
                   dict(pool=src_pool, img=src_image, snap=src_snap,
                        dst=volume.name))
@@ -1178,8 +1180,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
             self,
             volume: 'rbd.Image',
             volume_name: str,
-            snap: Optional[str] = None) -> Union[Tuple[str, str, str],
-                                                 Tuple[None, None, None]]:
+            snap: Optional[str] = None) -> Union[tuple[str, str, str],
+                                                 tuple[None, None, None]]:
         """If volume is a clone, return its parent info.
 
         Returns a tuple of (pool, parent, snap). A snapshot may optionally be
@@ -1207,7 +1209,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
 
     def _get_children_info(self,
                            volume: 'rbd.Image',
-                           snap: Optional[str]) -> List[tuple]:
+                           snap: Optional[str]) -> list[tuple]:
         """List children for the given snapshot of a volume(image).
 
         Returns a list of (pool, image).
@@ -1429,7 +1431,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         with RBDVolumeProxy(self, volume.name) as image:
             image.rollback_to_snap(snapshot.name)
 
-    def _disable_replication(self, volume: Volume) -> Dict[str, Optional[str]]:
+    def _disable_replication(self, volume: Volume) -> dict[str, Optional[str]]:
         """Disable replication on the given volume."""
         vol_name = utils.convert_str(volume.name)
         with RBDVolumeProxy(self, vol_name) as image:
@@ -1450,18 +1452,18 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                context: context.RequestContext,
                volume: Volume,
                new_type: VolumeType,
-               diff: Union[Dict[str, Dict[str, str]], Dict[str, Dict], None],
-               host: Optional[Dict[str, str]]) -> Tuple[bool, dict]:
+               diff: Union[dict[str, dict[str, str]], dict[str, dict], None],
+               host: Optional[dict[str, str]]) -> tuple[bool, dict]:
         """Retype from one volume type to another on the same backend."""
         return True, self._setup_volume(volume, new_type)
 
     @staticmethod
-    def _dumps(obj: Dict[str, Union[bool, int]]) -> str:
+    def _dumps(obj: dict[str, Union[bool, int]]) -> str:
         return json.dumps(obj, separators=(',', ':'), sort_keys=True)
 
     def _exec_on_volume(self,
                         volume_name: str,
-                        remote: Dict[str, str],
+                        remote: dict[str, str],
                         operation: str,
                         *args: Any,
                         **kwargs: Any):
@@ -1477,9 +1479,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
 
     def _failover_volume(self,
                          volume: Volume,
-                         remote: Dict[str, str],
+                         remote: dict[str, str],
                          is_demoted: bool,
-                         replication_status: str) -> Dict[str, Any]:
+                         replication_status: str) -> dict[str, Any]:
         """Process failover for a volume.
 
         There are 2 different cases that will return different update values
@@ -1519,8 +1521,8 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         return error_result
 
     def _demote_volumes(self,
-                        volumes: List[Volume],
-                        until_failure: bool = True) -> List[bool]:
+                        volumes: list[Volume],
+                        until_failure: bool = True) -> list[bool]:
         """Try to demote volumes on the current primary cluster."""
         result = []
         try_demoting = True
@@ -1542,7 +1544,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
 
     def _get_failover_target_config(
             self,
-            secondary_id: Optional[str] = None) -> Tuple[str, dict]:
+            secondary_id: Optional[str] = None) -> tuple[str, dict]:
         if not secondary_id:
             # In auto mode exclude failback and active
             candidates = set(self._target_names).difference(
@@ -1557,7 +1559,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                  context: context.RequestContext,
                  volumes: list,
                  secondary_id: Optional[str] = None,
-                 groups=None) -> Tuple[str, list, list]:
+                 groups=None) -> tuple[str, list, list]:
         """Failover replicated volumes."""
         LOG.info('RBD driver failover started.')
         if not self._is_replication_enabled:
@@ -1595,11 +1597,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
 
     def failover_host(self,
                       context: context.RequestContext,
-                      volumes: List[Volume],
+                      volumes: list[Volume],
                       secondary_id: Optional[str] = None,
-                      groups: Optional[List] = None) -> Tuple[str,
-                                                              List[Volume],
-                                                              List]:
+                      groups: Optional[list] = None) -> tuple[str,
+                                                              list[Volume],
+                                                              list]:
         """Failover to replication target.
 
         This function combines calls to failover() and failover_completed() to
@@ -1631,7 +1633,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
 
     def initialize_connection(self,
                               volume: Volume,
-                              connector: dict) -> Dict[str, Any]:
+                              connector: dict) -> dict[str, Any]:
         hosts, ports = self._get_mon_addrs()
         name, conf, user, secret_uuid = self._get_config_tuple()
         data = {
@@ -1662,7 +1664,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         pass
 
     @staticmethod
-    def _parse_location(location: str) -> List[str]:
+    def _parse_location(location: str) -> list[str]:
         prefix = 'rbd://'
         if not location.startswith(prefix):
             reason = _('Not stored in rbd')
@@ -1729,7 +1731,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                     volume: Volume,
                     image_location: Optional[list],
                     image_meta: dict,
-                    image_service) -> Tuple[dict, bool]:
+                    image_service) -> tuple[dict, bool]:
         if image_location:
             # Note: image_location[0] is glance image direct_url.
             # image_location[1] contains the list of all locations (including
@@ -1885,7 +1887,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                   {'old_size': old_size, 'new_size': new_size})
 
     def manage_existing(self,
-                        volume: Volume, existing_ref: Dict[str, str]) -> None:
+                        volume: Volume, existing_ref: dict[str, str]) -> None:
         """Manages an existing image.
 
         Renames the image name to match the expected name for the volume.
@@ -1906,7 +1908,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
 
     def manage_existing_get_size(self,
                                  volume: Volume,
-                                 existing_ref: Dict[str, str]) -> int:
+                                 existing_ref: dict[str, str]) -> int:
         """Return size of an existing image for manage_existing.
 
         :param volume:
@@ -1963,12 +1965,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         return json.loads(out)
 
     def get_manageable_volumes(self,
-                               cinder_volumes: List[Dict[str, str]],
+                               cinder_volumes: list[dict[str, str]],
                                marker: Optional[Any],
                                limit: int,
                                offset: int,
-                               sort_keys: List[str],
-                               sort_dirs: List[str]) -> List[Dict[str, Any]]:
+                               sort_keys: list[str],
+                               sort_dirs: list[str]) -> list[dict[str, Any]]:
         manageable_volumes = []
         cinder_ids = [resource['id'] for resource in cinder_volumes]
 
@@ -2016,11 +2018,11 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
         pass
 
     def update_migrated_volume(self,
-                               ctxt: Dict,
+                               ctxt: dict,
                                volume: Volume,
                                new_volume: Volume,
                                original_volume_status: str) -> \
-            Union[Dict[str, None], Dict[str, Optional[str]]]:
+            Union[dict[str, None], dict[str, Optional[str]]]:
         """Return model update from RBD for migrated volume.
 
         This method should rename the back-end volume name(id) on the
@@ -2064,7 +2066,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
     def migrate_volume(self,
                        context: context.RequestContext,
                        volume: Volume,
-                       host: Dict[str, Dict[str, str]]) -> Tuple[bool, None]:
+                       host: dict[str, dict[str, str]]) -> tuple[bool, None]:
 
         refuse_to_migrate = (False, None)
 
@@ -2146,7 +2148,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
 
     def manage_existing_snapshot_get_size(self,
                                           snapshot: Snapshot,
-                                          existing_ref: Dict[str, Any]) -> int:
+                                          existing_ref: dict[str, Any]) -> int:
         """Return size of an existing image for manage_existing.
 
         :param snapshot:
@@ -2197,7 +2199,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
 
     def manage_existing_snapshot(self,
                                  snapshot: Snapshot,
-                                 existing_ref: Dict[str, Any]) -> None:
+                                 existing_ref: dict[str, Any]) -> None:
         """Manages an existing snapshot.
 
         Renames the snapshot name to match the expected name for the snapshot.
@@ -2220,12 +2222,12 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
                 volume.protect_snap(snapshot.name)
 
     def get_manageable_snapshots(self,
-                                 cinder_snapshots: List[Dict[str, str]],
+                                 cinder_snapshots: list[dict[str, str]],
                                  marker: Optional[Any],
                                  limit: int,
                                  offset: int,
-                                 sort_keys: List[str],
-                                 sort_dirs: List[str]) -> List[Dict[str, Any]]:
+                                 sort_keys: list[str],
+                                 sort_dirs: list[str]) -> list[dict[str, Any]]:
         """List manageable snapshots on RBD backend."""
         manageable_snapshots = []
         cinder_snapshot_ids = [resource['id'] for resource in cinder_snapshots]
@@ -2286,7 +2288,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
 
     def get_backup_device(self,
                           context: context.RequestContext,
-                          backup: Backup) -> Tuple[Volume, bool]:
+                          backup: Backup) -> tuple[Volume, bool]:
         """Get a backup device from an existing volume.
 
         To support incremental backups on Ceph to Ceph we don't clone
diff --git a/cinder/volume/flows/api/create_volume.py b/cinder/volume/flows/api/create_volume.py
index 40f861fa1eb..c96841f7c0e 100644
--- a/cinder/volume/flows/api/create_volume.py
+++ b/cinder/volume/flows/api/create_volume.py
@@ -10,7 +10,10 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from typing import Any, Dict, List, Optional, Tuple, Type, Union  # noqa: H301
+
+from __future__ import annotations  # Remove when only supporting python 3.9+
+
+from typing import Any, Optional, Type, Union  # noqa: H301
 
 from oslo_config import cfg
 from oslo_log import log as logging
@@ -82,10 +85,10 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
 
     @staticmethod
     def _extract_resource(resource: Optional[dict],
-                          allowed_vals: Tuple[Tuple[str, ...]],
+                          allowed_vals: tuple[tuple[str, ...]],
                           exc: Type[exception.CinderException],
                           resource_name: str,
-                          props: Tuple[str] = ('status',)) -> Optional[str]:
+                          props: tuple[str] = ('status',)) -> Optional[str]:
         """Extracts the resource id from the provided resource.
 
         This method validates the input resource dict and checks that the
@@ -233,7 +236,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
     def _get_image_metadata(self,
                             context: context.RequestContext,
                             image_id: Optional[str],
-                            size: int) -> Optional[Dict[str, Any]]:
+                            size: int) -> Optional[dict[str, Any]]:
         """Checks image existence and validates the image metadata.
 
         Returns: image metadata or None
@@ -257,7 +260,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
             snapshot,
             source_volume,
             group: Optional[dict],
-            volume_type: Optional[Dict[str, Any]] = None) -> Tuple[List[str],
+            volume_type: Optional[dict[str, Any]] = None) -> tuple[list[str],
                                                                    bool]:
         """Extracts and returns a validated availability zone list.
 
@@ -358,7 +361,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
             volume_type_id: str,
             snapshot: Optional[objects.Snapshot],
             source_volume: Optional[objects.Volume],
-            image_metadata: Optional[Dict[str, Any]]) -> Optional[str]:
+            image_metadata: Optional[dict[str, Any]]) -> Optional[str]:
         if volume_types.is_encrypted(context, volume_type_id):
             encryption_key_id = None
 
@@ -442,7 +445,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
                 group,
                 group_snapshot,
                 backup: Optional[dict],
-                multiattach: bool = False) -> Dict[str, Any]:
+                multiattach: bool = False) -> dict[str, Any]:
 
         utils.check_exclusive_options(snapshot=snapshot,
                                       imageRef=image_id,
@@ -502,7 +505,7 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
         if multiattach:
             context.authorize(policy.MULTIATTACH_POLICY)
 
-        specs: Optional[Dict] = {}
+        specs: Optional[dict] = {}
         if volume_type_id:
             qos_specs = volume_types.get_volume_type_qos_specs(volume_type_id)
             if qos_specs['qos_specs']:
@@ -560,7 +563,7 @@ class EntryCreateTask(flow_utils.CinderTask):
     def execute(self,
                 context: context.RequestContext,
                 optional_args: dict,
-                **kwargs) -> Dict[str, Any]:
+                **kwargs) -> dict[str, Any]:
         """Creates a database entry for the given inputs and returns details.
 
         Accesses the database and creates a new entry for the to be created
@@ -809,8 +812,8 @@ class VolumeCastTask(flow_utils.CinderTask):
 
     def _cast_create_volume(self,
                             context: context.RequestContext,
-                            request_spec: Dict[str, Any],
-                            filter_properties: Dict) -> None:
+                            request_spec: dict[str, Any],
+                            filter_properties: dict) -> None:
         source_volid = request_spec['source_volid']
         volume = request_spec['volume']
         snapshot_id = request_spec['snapshot_id']
diff --git a/cinder/volume/flows/manager/create_volume.py b/cinder/volume/flows/manager/create_volume.py
index 0c3a53d7fc8..3f251289055 100644
--- a/cinder/volume/flows/manager/create_volume.py
+++ b/cinder/volume/flows/manager/create_volume.py
@@ -10,10 +10,12 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
+from __future__ import annotations
+
 import binascii
 import traceback
 import typing
-from typing import Any, Dict, List, Optional, Tuple  # noqa: H301
+from typing import Any, Optional  # noqa: H301
 
 from castellan import key_manager
 import os_brick.initiator.connectors
@@ -676,7 +678,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
                                                    volume_metadata)
 
     @staticmethod
-    def _extract_cinder_ids(urls: List[str]) -> List[str]:
+    def _extract_cinder_ids(urls: list[str]) -> list[str]:
         """Process a list of location URIs from glance
 
         :param urls: list of glance location URIs
@@ -707,7 +709,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
                             context: cinder_context.RequestContext,
                             volume: objects.Volume,
                             image_location,
-                            image_meta: Dict[str, Any]) -> Tuple[None, bool]:
+                            image_meta: dict[str, Any]) -> tuple[None, bool]:
         """Create a volume efficiently from an existing image.
 
         Returns a dict of volume properties eg. provider_location,
@@ -768,7 +770,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
                                     context: cinder_context.RequestContext,
                                     volume: objects.Volume,
                                     image_location,
-                                    image_meta: Dict[str, Any],
+                                    image_meta: dict[str, Any],
                                     image_service) -> dict:
         # TODO(harlowja): what needs to be rolled back in the clone if this
         # volume create fails?? Likely this should be a subflow or broken
@@ -804,7 +806,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
             internal_context: cinder_context.RequestContext,
             volume: objects.Volume,
             image_id: str,
-            image_meta: Dict[str, Any]) -> Tuple[None, bool]:
+            image_meta: dict[str, Any]) -> tuple[None, bool]:
         """Attempt to create the volume using the image cache.
 
         Best case this will simply clone the existing volume in the cache.
@@ -853,8 +855,8 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
                                    volume: objects.Volume,
                                    image_location: str,
                                    image_id: str,
-                                   image_meta: Dict[str, Any],
-                                   image_service) -> Tuple[Optional[dict],
+                                   image_meta: dict[str, Any],
+                                   image_service) -> tuple[Optional[dict],
                                                            bool]:
         assert self.image_volume_cache is not None
         internal_context = cinder_context.get_internal_tenant_context()
@@ -895,7 +897,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
             volume: objects.Volume,
             image_location,
             image_id: str,
-            image_meta: Dict[str, Any],
+            image_meta: dict[str, Any],
             image_service,
             update_cache: bool = False) -> Optional[dict]:
         # NOTE(e0ne): check for free space in image_conversion_dir before
@@ -1045,7 +1047,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
                            volume: objects.Volume,
                            image_location,
                            image_id: str,
-                           image_meta: Dict[str, Any],
+                           image_meta: dict[str, Any],
                            image_service,
                            **kwargs: Any) -> Optional[dict]:
         LOG.debug("Cloning %(volume_id)s from image %(image_id)s "
@@ -1120,7 +1122,7 @@ class CreateVolumeFromSpecTask(flow_utils.CinderTask):
                             context: cinder_context.RequestContext,
                             volume: objects.Volume,
                             backup_id: str,
-                            **kwargs) -> Tuple[Dict, bool]:
+                            **kwargs) -> tuple[dict, bool]:
         LOG.info("Creating volume %(volume_id)s from backup %(backup_id)s.",
                  {'volume_id': volume.id,
                   'backup_id': backup_id})
diff --git a/cinder/volume/manager.py b/cinder/volume/manager.py
index 88d325d2916..ded7428b0f9 100644
--- a/cinder/volume/manager.py
+++ b/cinder/volume/manager.py
@@ -35,10 +35,12 @@ intact.
 
 """
 
+from __future__ import annotations  # Remove when only supporting Python 3.9+
+
 import functools
 import time
 import typing
-from typing import Any, Dict, List, Optional, Set, Tuple, Union  # noqa: H301
+from typing import Any, Optional, Union  # noqa: H301
 
 from castellan import key_manager
 from oslo_config import cfg
@@ -539,7 +541,7 @@ class VolumeManager(manager.CleanableManager,
         num_vols: int = 0
         num_snaps: int = 0
         max_objs_num: int = 0
-        req_range: Union[List[int], range] = [0]
+        req_range: Union[list[int], range] = [0]
         req_limit = CONF.init_host_max_objects_retrieval or 0
         use_batch_objects_retrieval: bool = req_limit > 0
 
@@ -1601,7 +1603,7 @@ class VolumeManager(manager.CleanableManager,
         reservations = QUOTAS.reserve(ctx, **reserve_opts)
         # NOTE(yikun): Skip 'snapshot_id', 'source_volid' keys to avoid
         # creating tmp img vol from wrong snapshot or wrong source vol.
-        skip: Set[str] = {'snapshot_id', 'source_volid'}
+        skip: set[str] = {'snapshot_id', 'source_volid'}
         skip.update(self._VOLUME_CLONE_SKIP_PROPERTIES)
         try:
             new_vol_values = {k: volume[k] for k in set(volume.keys()) - skip}
@@ -3199,7 +3201,7 @@ class VolumeManager(manager.CleanableManager,
 
         return vol_ref
 
-    def _get_cluster_or_host_filters(self) -> Dict[str, Any]:
+    def _get_cluster_or_host_filters(self) -> dict[str, Any]:
         if self.cluster:
             filters = {'cluster_name': self.cluster}
         else:
@@ -3500,12 +3502,12 @@ class VolumeManager(manager.CleanableManager,
             self,
             context: context.RequestContext,
             group: objects.Group,
-            volumes: List[objects.Volume],
+            volumes: list[objects.Volume],
             group_snapshot: Optional[objects.GroupSnapshot] = None,
-            snapshots: Optional[List[objects.Snapshot]] = None,
+            snapshots: Optional[list[objects.Snapshot]] = None,
             source_group: Optional[objects.Group] = None,
-            source_vols: Optional[List[objects.Volume]] = None) \
-            -> Tuple[Dict[str, str], List[Dict[str, str]]]:
+            source_vols: Optional[list[objects.Volume]] = None) \
+            -> tuple[dict[str, str], list[dict[str, str]]]:
         """Creates a group from source.
 
         :param context: the context of the caller.
@@ -3518,7 +3520,7 @@ class VolumeManager(manager.CleanableManager,
         :returns: model_update, volumes_model_update
         """
         model_update = {'status': 'available'}
-        volumes_model_update: List[dict] = []
+        volumes_model_update: list[dict] = []
         for vol in volumes:
             if snapshots:
                 for snapshot in snapshots:
@@ -3819,7 +3821,7 @@ class VolumeManager(manager.CleanableManager,
     def _convert_group_to_cg(
             self,
             group: objects.Group,
-            volumes: objects.VolumeList) -> Tuple[objects.Group,
+            volumes: objects.VolumeList) -> tuple[objects.Group,
                                                   objects.VolumeList]:
         if not group:
             return None, None
@@ -3832,7 +3834,7 @@ class VolumeManager(manager.CleanableManager,
         return cg, volumes
 
     def _remove_consistencygroup_id_from_volumes(
-            self, volumes: Optional[List[objects.Volume]]) -> None:
+            self, volumes: Optional[list[objects.Volume]]) -> None:
         if not volumes:
             return
         for vol in volumes:
@@ -3843,7 +3845,7 @@ class VolumeManager(manager.CleanableManager,
             self,
             group_snapshot: objects.GroupSnapshot,
             snapshots: objects.SnapshotList,
-            ctxt) -> Tuple[objects.CGSnapshot,
+            ctxt) -> tuple[objects.CGSnapshot,
                            objects.SnapshotList]:
         if not group_snapshot:
             return None, None
@@ -3881,7 +3883,7 @@ class VolumeManager(manager.CleanableManager,
     def _delete_group_generic(self,
                               context: context.RequestContext,
                               group: objects.Group,
-                              volumes) -> Tuple:
+                              volumes) -> tuple:
         """Deletes a group and volumes in the group."""
         model_update = {'status': group.status}
         volume_model_updates = []
@@ -3903,7 +3905,7 @@ class VolumeManager(manager.CleanableManager,
     def _update_group_generic(
             self, context: context.RequestContext, group,
             add_volumes=None,
-            remove_volumes=None) -> Tuple[None, None, None]:
+            remove_volumes=None) -> tuple[None, None, None]:
         """Updates a group."""
         # NOTE(xyang): The volume manager adds/removes the volume to/from the
         # group in the database. This default implementation does not do
@@ -3916,7 +3918,7 @@ class VolumeManager(manager.CleanableManager,
             group,
             volumes: Optional[str],
             add: bool = True) -> list:
-        valid_status: Tuple[str, ...]
+        valid_status: tuple[str, ...]
         if add:
             valid_status = VALID_ADD_VOL_TO_GROUP_STATUS
         else:
@@ -4182,7 +4184,7 @@ class VolumeManager(manager.CleanableManager,
             self,
             context: context.RequestContext,
             group_snapshot: objects.GroupSnapshot,
-            snapshots: list) -> Tuple[dict, List[dict]]:
+            snapshots: list) -> tuple[dict, list[dict]]:
         """Creates a group_snapshot."""
         model_update = {'status': 'available'}
         snapshot_model_updates = []
@@ -4208,7 +4210,7 @@ class VolumeManager(manager.CleanableManager,
             self,
             context: context.RequestContext,
             group_snapshot: objects.GroupSnapshot,
-            snapshots: list) -> Tuple[dict, List[dict]]:
+            snapshots: list) -> tuple[dict, list[dict]]:
         """Deletes a group_snapshot."""
         model_update = {'status': group_snapshot.status}
         snapshot_model_updates = []
@@ -4772,7 +4774,7 @@ class VolumeManager(manager.CleanableManager,
                            ctxt: context.RequestContext,
                            volume: objects.Volume,
                            attachment: objects.VolumeAttachment,
-                           connector: dict) -> Dict[str, Any]:
+                           connector: dict) -> dict[str, Any]:
         try:
             self.driver.validate_connector(connector)
         except exception.InvalidConnectorException as err:
@@ -4830,7 +4832,7 @@ class VolumeManager(manager.CleanableManager,
                           context: context.RequestContext,
                           vref: objects.Volume,
                           connector: dict,
-                          attachment_id: str) -> Dict[str, Any]:
+                          attachment_id: str) -> dict[str, Any]:
         """Update/Finalize an attachment.
 
         This call updates a valid attachment record to associate with a volume
@@ -5236,7 +5238,7 @@ class VolumeManager(manager.CleanableManager,
 
     def list_replication_targets(self,
                                  ctxt: context.RequestContext,
-                                 group: objects.Group) -> Dict[str, list]:
+                                 group: objects.Group) -> dict[str, list]:
         """Provide a means to obtain replication targets for a group.
 
         This method is used to find the replication_device config
diff --git a/cinder/volume/rpcapi.py b/cinder/volume/rpcapi.py
index 7cdeccc4430..097dc16acdb 100644
--- a/cinder/volume/rpcapi.py
+++ b/cinder/volume/rpcapi.py
@@ -12,7 +12,9 @@
 #    License for the specific language governing permissions and limitations
 #    under the License.
 
-from typing import Optional, Tuple, Union  # noqa: H301
+from __future__ import annotations
+
+from typing import Optional, Union  # noqa: H301
 
 from cinder.common import constants
 from cinder import context
@@ -147,7 +149,7 @@ class VolumeAPI(rpc.RPCAPI):
 
     def _get_cctxt(self,
                    host: Optional[str] = None,
-                   version: Optional[Union[str, Tuple[str, ...]]] = None,
+                   version: Optional[Union[str, tuple[str, ...]]] = None,
                    **kwargs) -> rpc.RPCAPI:
         if host:
             server = volume_utils.extract_host(host)
diff --git a/cinder/volume/volume_types.py b/cinder/volume/volume_types.py
index aeec7a47a4d..2f5993d2adc 100644
--- a/cinder/volume/volume_types.py
+++ b/cinder/volume/volume_types.py
@@ -19,7 +19,9 @@
 
 """Built-in volume type properties."""
 
-import typing as ty
+from __future__ import annotations
+
+from typing import Optional
 
 from oslo_config import cfg
 from oslo_db import exception as db_exc
@@ -330,7 +332,7 @@ def get_volume_type_qos_specs(volume_type_id):
 
 def volume_types_diff(context: context.RequestContext,
                       vol_type_id1,
-                      vol_type_id2) -> ty.Tuple[dict, bool]:
+                      vol_type_id2) -> tuple[dict, bool]:
     """Returns a 'diff' of two volume types and whether they are equal.
 
     Returns a tuple of (diff, equal), where 'equal' is a boolean indicating
@@ -370,8 +372,8 @@ def volume_types_diff(context: context.RequestContext,
                 encryption.pop(param, None)
         return encryption
 
-    def _dict_diff(dict1: ty.Optional[dict],
-                   dict2: ty.Optional[dict]) -> ty.Tuple[dict, bool]:
+    def _dict_diff(dict1: Optional[dict],
+                   dict2: Optional[dict]) -> tuple[dict, bool]:
         res = {}
         equal = True
         if dict1 is None:
diff --git a/cinder/volume/volume_utils.py b/cinder/volume/volume_utils.py
index 1384fe455b9..01c138206ae 100644
--- a/cinder/volume/volume_utils.py
+++ b/cinder/volume/volume_utils.py
@@ -14,6 +14,8 @@
 
 """Volume-related Utilities and helpers."""
 
+from __future__ import annotations  # Remove when only supporting python 3.9+
+
 import abc
 import ast
 import functools
@@ -31,8 +33,8 @@ import tempfile
 import time
 import types
 import typing
-from typing import Any, BinaryIO, Callable, Dict, IO  # noqa: H301
-from typing import List, Optional, Tuple, Union  # noqa: H301
+from typing import Any, BinaryIO, Callable, IO  # noqa: H301
+from typing import Optional, Union  # noqa: H301
 import uuid
 
 from castellan.common.credentials import keystone_password
@@ -242,8 +244,8 @@ def notify_about_snapshot_usage(context: context.RequestContext,
                                             usage_info)
 
 
-def _usage_from_capacity(capacity: Dict[str, Any],
-                         **extra_usage_info) -> Dict[str, Any]:
+def _usage_from_capacity(capacity: dict[str, Any],
+                         **extra_usage_info) -> dict[str, Any]:
 
     capacity_info = {
         'name_to_id': capacity['name_to_id'],
@@ -686,7 +688,7 @@ def get_all_volume_groups(vg_name=None) -> list:
 
 def extract_availability_zones_from_volume_type(
         volume_type: Union['objects.VolumeType', dict]) \
-        -> Optional[List[str]]:
+        -> Optional[list[str]]:
     if not volume_type:
         return None
     extra_specs = volume_type.get('extra_specs', {})
@@ -705,7 +707,7 @@ DEFAULT_PASSWORD_SYMBOLS = ('23456789',  # Removed: 0,1
 
 def generate_password(
         length: int = 16,
-        symbolgroups: Tuple[str, ...] = DEFAULT_PASSWORD_SYMBOLS) -> str:
+        symbolgroups: tuple[str, ...] = DEFAULT_PASSWORD_SYMBOLS) -> str:
     """Generate a random password from the supplied symbol groups.
 
     At least one symbol from each group will be included. Unpredictable
@@ -744,7 +746,7 @@ def generate_password(
 
 def generate_username(
         length: int = 20,
-        symbolgroups: Tuple[str, ...] = DEFAULT_PASSWORD_SYMBOLS) -> str:
+        symbolgroups: tuple[str, ...] = DEFAULT_PASSWORD_SYMBOLS) -> str:
     # Use the same implementation as the password generation.
     return generate_password(length, symbolgroups)
 
@@ -864,12 +866,12 @@ def extract_id_from_snapshot_name(snap_name: str) -> Optional[str]:
     return match.group('uuid') if match else None
 
 
-def paginate_entries_list(entries: List[Dict],
+def paginate_entries_list(entries: list[dict],
                           marker: Optional[Union[dict, str]],
                           limit: int,
                           offset: Optional[int],
-                          sort_keys: List[str],
-                          sort_dirs: List[str]) -> list:
+                          sort_keys: list[str],
+                          sort_dirs: list[str]) -> list:
     """Paginate a list of entries.
 
     :param entries: list of dictionaries
@@ -1096,7 +1098,7 @@ def get_max_over_subscription_ratio(
     return mosr
 
 
-def check_image_metadata(image_meta: Dict[str, Union[str, int]],
+def check_image_metadata(image_meta: dict[str, Union[str, int]],
                          vol_size: int) -> None:
     """Validates the image metadata."""
     # Check whether image is active
@@ -1136,7 +1138,7 @@ def enable_bootable_flag(volume: 'objects.Volume') -> None:
 
 
 def get_volume_image_metadata(image_id: str,
-                              image_meta: Dict[str, Any]) -> dict:
+                              image_meta: dict[str, Any]) -> dict:
 
     # Save some base attributes into the volume metadata
     base_metadata = {
@@ -1172,7 +1174,7 @@ def copy_image_to_volume(driver,
                          context: context.RequestContext,
                          volume: 'objects.Volume',
                          image_meta: dict,
-                         image_location: Union[str, Tuple[Optional[str], Any]],
+                         image_location: Union[str, tuple[Optional[str], Any]],
                          image_service) -> None:
     """Downloads Glance image to the specified volume."""
     image_id = image_meta['id']