Merge "Enable mypy on nova/utils.py"
This commit is contained in:
@@ -28,6 +28,7 @@ import re
|
||||
import shutil
|
||||
import tempfile
|
||||
import time
|
||||
import typing as ty
|
||||
|
||||
from eventlet import tpool
|
||||
import futurist
|
||||
@@ -75,11 +76,16 @@ SM_SKIP_KEYS = (
|
||||
'img_mappings', 'img_block_device_mapping',
|
||||
)
|
||||
|
||||
_FILE_CACHE = {}
|
||||
_FILE_CACHE: dict[str, dict] = {}
|
||||
|
||||
_SERVICE_TYPES = service_types.ServiceTypes()
|
||||
|
||||
DEFAULT_EXECUTOR = None
|
||||
# NOTE(gibi): futurist does not expose the common based class.
|
||||
# NOTE(gibi): we can simplify this when eventlet is removed.
|
||||
Executor: ty.TypeAlias = (
|
||||
futurist.GreenThreadPoolExecutor | futurist.ThreadPoolExecutor)
|
||||
|
||||
DEFAULT_EXECUTOR: Executor | None = None
|
||||
|
||||
|
||||
def cooperative_yield():
|
||||
@@ -105,7 +111,7 @@ def destroy_default_executor():
|
||||
DEFAULT_EXECUTOR = None
|
||||
|
||||
|
||||
def create_executor(max_workers):
|
||||
def create_executor(max_workers) -> Executor:
|
||||
if concurrency_mode_threading():
|
||||
executor = futurist.ThreadPoolExecutor(max_workers)
|
||||
else:
|
||||
@@ -113,7 +119,7 @@ def create_executor(max_workers):
|
||||
return executor
|
||||
|
||||
|
||||
def _get_default_executor():
|
||||
def _get_default_executor() -> Executor:
|
||||
global DEFAULT_EXECUTOR
|
||||
|
||||
if not DEFAULT_EXECUTOR:
|
||||
@@ -563,7 +569,9 @@ def _serialize_profile_info():
|
||||
return trace_info
|
||||
|
||||
|
||||
def spawn(func, *args, **kwargs) -> futurist.Future:
|
||||
def spawn(
|
||||
func: ty.Callable[..., ty.Any], *args: ty.Any, **kwargs: ty.Any
|
||||
) -> futurist.Future:
|
||||
"""Passthrough method for eventlet.spawn.
|
||||
|
||||
This utility exists so that it can be stubbed for testing without
|
||||
@@ -577,7 +585,11 @@ def spawn(func, *args, **kwargs) -> futurist.Future:
|
||||
return spawn_on(_get_default_executor(), func, *args, **kwargs)
|
||||
|
||||
|
||||
def spawn_after(seconds, func, *args, **kwargs) -> futurist.Future:
|
||||
def spawn_after(
|
||||
seconds: float,
|
||||
func: ty.Callable[..., ty.Any],
|
||||
*args: ty.Any, **kwargs: ty.Any
|
||||
) -> futurist.Future:
|
||||
"""Executing the function asynchronously after the given time."""
|
||||
|
||||
def delayed(*args, **kwargs):
|
||||
@@ -587,7 +599,7 @@ def spawn_after(seconds, func, *args, **kwargs) -> futurist.Future:
|
||||
return spawn(delayed, *args, **kwargs)
|
||||
|
||||
|
||||
def _executor_is_full(executor):
|
||||
def _executor_is_full(executor: Executor) -> bool:
|
||||
if concurrency_mode_threading():
|
||||
# TODO(gibi): Move this whole logic to futurist ThreadPoolExecutor
|
||||
# so that we can avoid accessing the internals of the executor
|
||||
@@ -599,7 +611,11 @@ def _executor_is_full(executor):
|
||||
return False
|
||||
|
||||
|
||||
def spawn_on(executor, func, *args, **kwargs) -> futurist.Future:
|
||||
def spawn_on(
|
||||
executor: Executor,
|
||||
func: ty.Callable[..., ty.Any],
|
||||
*args: ty.Any, **kwargs: ty.Any,
|
||||
) -> futurist.Future:
|
||||
"""Passthrough method to run func on a thread in a given executor.
|
||||
|
||||
It will also grab the context from the threadlocal store and add it to
|
||||
@@ -1089,6 +1105,7 @@ def run_once(message, logger, cleanup=None):
|
||||
logger and cleanup function will be propagated to the caller.
|
||||
"""
|
||||
def outer_wrapper(func):
|
||||
@ty.no_type_check
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
if not wrapper.called:
|
||||
@@ -1138,6 +1155,7 @@ def latch_error_on_raise(retryable=(_SentinelException,)):
|
||||
"""
|
||||
|
||||
def outer_wrapper(func):
|
||||
@ty.no_type_check
|
||||
@functools.wraps(func)
|
||||
def wrapper(*args, **kwargs):
|
||||
if wrapper.error:
|
||||
@@ -1272,10 +1290,10 @@ def concurrency_mode_threading():
|
||||
return not monkey_patch.is_patched()
|
||||
|
||||
|
||||
SCATTER_GATHER_EXECUTOR = None
|
||||
SCATTER_GATHER_EXECUTOR: Executor | None = None
|
||||
|
||||
|
||||
def get_scatter_gather_executor():
|
||||
def get_scatter_gather_executor() -> Executor:
|
||||
"""Returns the executor used for scatter/gather operations."""
|
||||
global SCATTER_GATHER_EXECUTOR
|
||||
|
||||
@@ -1313,10 +1331,10 @@ def destroy_scatter_gather_executor():
|
||||
SCATTER_GATHER_EXECUTOR = None
|
||||
|
||||
|
||||
CACHE_IMAGES_EXECUTOR = None
|
||||
CACHE_IMAGES_EXECUTOR: Executor | None = None
|
||||
|
||||
|
||||
def get_cache_images_executor():
|
||||
def get_cache_images_executor() -> Executor:
|
||||
"""Returns the executor used for cache images operations."""
|
||||
global CACHE_IMAGES_EXECUTOR
|
||||
|
||||
@@ -1350,7 +1368,7 @@ def destroy_cache_images_executor():
|
||||
CACHE_IMAGES_EXECUTOR = None
|
||||
|
||||
|
||||
def _log_executor_stats(executor):
|
||||
def _log_executor_stats(executor: Executor) -> None:
|
||||
if CONF.thread_pool_statistic_period < 0:
|
||||
return
|
||||
|
||||
|
||||
@@ -137,6 +137,7 @@ files = [
|
||||
"nova/scheduler/client/report.py",
|
||||
"nova/scheduler/request_filter.py",
|
||||
"nova/scheduler/utils.py",
|
||||
"nova/utils.py",
|
||||
"nova/virt/driver.py",
|
||||
"nova/virt/hardware.py",
|
||||
"nova/virt/libvirt/machine_type_utils.py",
|
||||
|
||||
Reference in New Issue
Block a user