Merge "mypy: set no_implicit_optional"

This commit is contained in:
Zuul 2022-05-13 03:12:13 +00:00 committed by Gerrit Code Review
commit 693f25fc23
14 changed files with 94 additions and 83 deletions

View File

@ -119,12 +119,12 @@ class API(base.Base):
def get_all(self,
context: context.RequestContext,
search_opts: dict = None,
marker: str = None,
limit: int = None,
offset: int = None,
sort_keys: List[str] = None,
sort_dirs: List[str] = None) -> 'objects.BackupList':
search_opts: Optional[dict] = None,
marker: Optional[str] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
sort_keys: Optional[List[str]] = None,
sort_dirs: Optional[List[str]] = None) -> 'objects.BackupList':
context.authorize(policy.GET_ALL_POLICY)
search_opts = search_opts or {}
@ -223,10 +223,10 @@ class API(base.Base):
volume_id: str,
container: str,
incremental: bool = False,
availability_zone: str = None,
availability_zone: Optional[str] = None,
force: bool = False,
snapshot_id: Optional[str] = None,
metadata: dict = None) -> 'objects.Backup':
metadata: Optional[dict] = None) -> 'objects.Backup':
"""Make the RPC call to create a volume backup."""
volume = self.volume_api.get(context, volume_id)
context.authorize(policy.CREATE_POLICY, target_obj=volume)
@ -365,8 +365,8 @@ class API(base.Base):
def restore(self,
context: context.RequestContext,
backup_id: str,
volume_id: str = None,
name: str = None) -> dict:
volume_id: Optional[str] = None,
name: Optional[str] = None) -> dict:
"""Make the RPC call to restore a volume backup."""
backup = self.get(context, backup_id)
context.authorize(policy.RESTORE_POLICY, target_obj=backup)

View File

@ -21,7 +21,7 @@ import inspect
import os
import re
import sys
from typing import Callable
from typing import Callable, Optional # noqa: H301
import uuid
import decorator
@ -56,7 +56,7 @@ class Coordinator(object):
meaningful prefix.
"""
def __init__(self, agent_id: str = None, prefix: str = ''):
def __init__(self, agent_id: Optional[str] = None, prefix: str = ''):
self.coordinator = None
self.agent_id = agent_id or str(uuid.uuid4())
self.started = False

View File

@ -22,7 +22,7 @@ SHOULD include dedicated exception logging.
"""
from typing import Union
from typing import Optional, Union # noqa: H301
from oslo_log import log as logging
from oslo_versionedobjects import exception as obj_exc
@ -72,7 +72,7 @@ class CinderException(Exception):
headers: dict = {}
safe = False
def __init__(self, message: Union[str, tuple] = None, **kwargs):
def __init__(self, message: Optional[Union[str, tuple]] = None, **kwargs):
self.kwargs = kwargs
self.kwargs['message'] = message

View File

@ -183,10 +183,11 @@ def _get_qemu_convert_luks_cmd(src: str,
src_format: Optional[str] = None,
out_subformat: Optional[str] = None,
cache_mode: Optional[str] = None,
prefix: tuple = None,
prefix: Optional[tuple] = None,
cipher_spec: Optional[dict] = None,
passphrase_file: Optional[str] = None,
src_passphrase_file: str = None) -> List[str]:
src_passphrase_file: Optional[str] = None) \
-> List[str]:
cmd = ['qemu-img', 'convert']
@ -214,14 +215,15 @@ def _get_qemu_convert_luks_cmd(src: str,
def _get_qemu_convert_cmd(src: str,
dest: str,
out_format: str,
src_format: str = None,
out_subformat: str = None,
cache_mode: str = None,
prefix: tuple = None,
cipher_spec: dict = None,
passphrase_file: str = None,
src_format: Optional[str] = None,
out_subformat: Optional[str] = None,
cache_mode: Optional[str] = None,
prefix: Optional[tuple] = None,
cipher_spec: Optional[dict] = None,
passphrase_file: Optional[str] = None,
compress: bool = False,
src_passphrase_file: str = None) -> List[str]:
src_passphrase_file: Optional[str] = None) \
-> List[str]:
if src_passphrase_file is not None:
if passphrase_file is None:
@ -314,13 +316,13 @@ def _convert_image(prefix: tuple,
source: str,
dest: str,
out_format: str,
out_subformat: str = None,
src_format: str = None,
out_subformat: Optional[str] = None,
src_format: Optional[str] = None,
run_as_root: bool = True,
cipher_spec: dict = None,
passphrase_file: str = None,
cipher_spec: Optional[dict] = None,
passphrase_file: Optional[str] = None,
compress: bool = False,
src_passphrase_file: str = None) -> None:
src_passphrase_file: Optional[str] = None) -> None:
"""Convert image to other format.
NOTE: If the qemu-img convert command fails and this function raises an
@ -421,14 +423,14 @@ def _convert_image(prefix: tuple,
def convert_image(source: str,
dest: str,
out_format: str,
out_subformat: str = None,
src_format: str = None,
out_subformat: Optional[str] = None,
src_format: Optional[str] = None,
run_as_root: bool = True,
throttle=None,
cipher_spec: dict = None,
passphrase_file: str = None,
cipher_spec: Optional[dict] = None,
passphrase_file: Optional[str] = None,
compress: bool = False,
src_passphrase_file: str = None) -> None:
src_passphrase_file: Optional[str] = None) -> None:
if not throttle:
throttle = throttling.Throttle.get_default()
with throttle.subcommand(source, dest) as throttle_cmd:
@ -447,7 +449,7 @@ def convert_image(source: str,
def resize_image(source: str,
size: int,
run_as_root: bool = False,
file_format: str = None) -> None:
file_format: Optional[str] = None) -> None:
"""Changes the virtual size of the image."""
cmd: Tuple[str, ...]
if file_format:
@ -805,7 +807,7 @@ def upload_volume(context: context.RequestContext,
volume_format: str = 'raw',
run_as_root: bool = True,
compress: bool = True,
store_id: str = None,
store_id: Optional[str] = None,
base_image_ref: Optional[str] = None) -> None:
# NOTE: You probably want to use volume_utils.upload_volume(),
# not this function.

View File

@ -29,8 +29,8 @@ LOG = logging.getLogger(__name__)
def normalize(weight_list: List[float],
minval: float = None,
maxval: float = None) -> Iterable[float]:
minval: Optional[float] = None,
maxval: Optional[float] = None) -> Iterable[float]:
"""Normalize the values in a list between 0 and 1.0.
The normalization is made regarding the lower and upper values present in

View File

@ -485,7 +485,8 @@ class FilterScheduler(driver.Scheduler):
def _get_weighted_candidates_by_group_type(
self, context: context.RequestContext, group_spec: dict,
group_filter_properties: dict = None) -> List[WeighedHost]:
group_filter_properties: Optional[dict] = None) \
-> List[WeighedHost]:
"""Finds backends that supports the group type.
Returns a list of backends that meet the required specs,

View File

@ -72,7 +72,7 @@ LOG = logging.getLogger(__name__)
class ReadOnlyDict(abc.Mapping):
"""A read-only dict."""
def __init__(self, source: Union[dict, 'ReadOnlyDict'] = None):
def __init__(self, source: Optional[Union[dict, 'ReadOnlyDict']] = None):
self.data: dict
if source is not None:
self.data = dict(source)

View File

@ -234,7 +234,7 @@ def check_metadata_properties(
raise exception.InvalidVolumeMetadataSize(reason=msg)
def last_completed_audit_period(unit: str = None) -> \
def last_completed_audit_period(unit: Optional[str] = None) -> \
Tuple[Union[datetime.datetime, datetime.timedelta],
Union[datetime.datetime, datetime.timedelta]]:
"""This method gives you the most recently *completed* audit period.
@ -374,7 +374,9 @@ def monkey_patch() -> None:
decorator("%s.%s" % (module, key), func))
def make_dev_path(dev: str, partition: str = None, base: str = '/dev') -> str:
def make_dev_path(dev: str,
partition: Optional[str] = None,
base: str = '/dev') -> str:
"""Return a path to a particular device.
>>> make_dev_path('xvdc')
@ -434,7 +436,8 @@ def robust_file_write(directory: str, filename: str, data: str) -> None:
@contextlib.contextmanager
def temporary_chown(path: str, owner_uid: int = None) -> Iterator[None]:
def temporary_chown(path: str,
owner_uid: Optional[int] = None) -> Iterator[None]:
"""Temporarily chown a path.
:params owner_uid: UID of temporary owner (defaults to current user)
@ -493,7 +496,7 @@ def get_file_size(path: str) -> int:
def _get_disk_of_partition(
devpath: str,
st: os.stat_result = None) -> Tuple[str, os.stat_result]:
st: Optional[os.stat_result] = None) -> Tuple[str, os.stat_result]:
"""Gets a disk device path and status from partition path.
Returns a disk device path from a partition device path, and stat for
@ -558,7 +561,7 @@ def get_blkdev_major_minor(path: str,
def check_string_length(value: str, name: str, min_length: int = 0,
max_length: int = None,
max_length: Optional[int] = None,
allow_all_spaces: bool = True) -> None:
"""Check the length of specified string.
@ -684,7 +687,7 @@ def convert_str(text: Union[str, bytes]) -> str:
def build_or_str(elements: Union[None, str, Iterable[str]],
str_format: str = None) -> str:
str_format: Optional[str] = None) -> str:
"""Builds a string of elements joined by 'or'.
Will join strings with the 'or' word and if a str_format is provided it

View File

@ -219,10 +219,10 @@ class API(base.Base):
name: Optional[str],
description: Optional[str],
snapshot: Optional[objects.Snapshot] = None,
image_id: str = None,
image_id: Optional[str] = None,
volume_type: Optional[objects.VolumeType] = None,
metadata: Optional[dict] = None,
availability_zone: str = None,
availability_zone: Optional[str] = None,
source_volume: Optional[objects.Volume] = None,
scheduler_hints=None,
source_replica=None,
@ -1155,7 +1155,7 @@ class API(base.Base):
name: str,
description: str,
cgsnapshot_id: str,
group_snapshot_id: str = None) -> Dict[str, Any]:
group_snapshot_id: Optional[str] = None) -> Dict[str, Any]:
options = {'volume_id': volume['id'],
'cgsnapshot_id': cgsnapshot_id,
'group_snapshot_id': group_snapshot_id,
@ -1176,7 +1176,7 @@ class API(base.Base):
volume: objects.Volume,
name: str,
description: str,
metadata: Dict[str, Any] = None,
metadata: Optional[Dict[str, Any]] = None,
cgsnapshot_id: Optional[str] = None,
group_snapshot_id: Optional[str] = None,
allow_in_use: bool = False) -> objects.Snapshot:
@ -1193,7 +1193,7 @@ class API(base.Base):
volume: objects.Volume,
name: str,
description: str,
metadata: Dict[str, Any] = None) -> objects.Snapshot:
metadata: Optional[Dict[str, Any]] = None) -> objects.Snapshot:
result = self._create_snapshot(context, volume, name, description,
True, metadata)
LOG.info("Snapshot force create request issued successfully.",

View File

@ -160,11 +160,11 @@ class RBDVolumeProxy(object):
def __init__(self,
driver: 'RBDDriver',
name: str,
pool: str = None,
snapshot: str = None,
pool: Optional[str] = None,
snapshot: Optional[str] = None,
read_only: bool = False,
remote: Optional[Dict[str, str]] = None,
timeout: int = None,
timeout: Optional[int] = None,
client: 'rados.Rados' = None,
ioctx: 'rados.Ioctx' = None):
self._close_conn = not (client and ioctx)
@ -206,7 +206,9 @@ class RBDVolumeProxy(object):
class RADOSClient(object):
"""Context manager to simplify error handling for connecting to ceph."""
def __init__(self, driver: 'RBDDriver', pool: str = None) -> None:
def __init__(self,
driver: 'RBDDriver',
pool: Optional[str] = None) -> None:
self.driver = driver
self.cluster, self.ioctx = driver._connect_to_rados(pool)
@ -247,7 +249,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
STORAGE_PROTOCOL = constants.CEPH
def __init__(self,
active_backend_id: str = None,
active_backend_id: Optional[str] = None,
*args,
**kwargs) -> None:
super(RBDDriver, self).__init__(*args, **kwargs)
@ -1535,9 +1537,9 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
result.append(demoted)
return result
def _get_failover_target_config(self,
secondary_id: str = None) -> Tuple[str,
dict]:
def _get_failover_target_config(
self,
secondary_id: Optional[str] = None) -> Tuple[str, dict]:
if not secondary_id:
# In auto mode exclude failback and active
candidates = set(self._target_names).difference(
@ -1591,7 +1593,7 @@ class RBDDriver(driver.CloneableImageVD, driver.MigrateVD,
def failover_host(self,
context: context.RequestContext,
volumes: List[Volume],
secondary_id: str = None,
secondary_id: Optional[str] = None,
groups: Optional[List] = None) -> Tuple[str,
List[Volume],
List]:

View File

@ -257,7 +257,8 @@ class ExtractVolumeRequestTask(flow_utils.CinderTask):
snapshot,
source_volume,
group: Optional[dict],
volume_type: Dict[str, Any] = None) -> Tuple[List[str], bool]:
volume_type: Optional[Dict[str, Any]] = None) -> Tuple[List[str],
bool]:
"""Extracts and returns a validated availability zone list.
This function will extract the availability zone (if not provided) from

View File

@ -146,8 +146,8 @@ class VolumeAPI(rpc.RPCAPI):
BINARY = constants.VOLUME_BINARY
def _get_cctxt(self,
host: str = None,
version: Union[str, Tuple[str, ...]] = None,
host: Optional[str] = None,
version: Optional[Union[str, Tuple[str, ...]]] = None,
**kwargs) -> rpc.RPCAPI:
if host:
server = volume_utils.extract_host(host)

View File

@ -164,8 +164,8 @@ def _usage_from_backup(backup: 'objects.Backup', **kw) -> dict:
def notify_about_volume_usage(context: context.RequestContext,
volume: 'objects.Volume',
event_suffix: str,
extra_usage_info: dict = None,
host: str = None) -> None:
extra_usage_info: Optional[dict] = None,
host: Optional[str] = None) -> None:
if not host:
host = CONF.host
@ -182,8 +182,8 @@ def notify_about_volume_usage(context: context.RequestContext,
def notify_about_backup_usage(context: context.RequestContext,
backup: 'objects.Backup',
event_suffix: str,
extra_usage_info: dict = None,
host: str = None) -> None:
extra_usage_info: Optional[dict] = None,
host: Optional[str] = None) -> None:
if not host:
host = CONF.host
@ -227,8 +227,8 @@ def _usage_from_snapshot(snapshot: 'objects.Snapshot',
def notify_about_snapshot_usage(context: context.RequestContext,
snapshot: 'objects.Snapshot',
event_suffix: str,
extra_usage_info: dict = None,
host: str = None) -> None:
extra_usage_info: Optional[dict] = None,
host: Optional[str] = None) -> None:
if not host:
host = CONF.host
@ -263,8 +263,8 @@ def _usage_from_capacity(capacity: Dict[str, Any],
def notify_about_capacity_usage(context: context.RequestContext,
capacity: dict,
suffix: str,
extra_usage_info: dict = None,
host: str = None) -> None:
extra_usage_info: Optional[dict] = None,
host: Optional[str] = None) -> None:
if not host:
host = CONF.host
@ -292,11 +292,12 @@ def _usage_from_consistencygroup(group_ref: 'objects.Group', **kw) -> dict:
@utils.if_notifications_enabled
def notify_about_consistencygroup_usage(context: context.RequestContext,
group: 'objects.Group',
event_suffix: str,
extra_usage_info: dict = None,
host: str = None) -> None:
def notify_about_consistencygroup_usage(
context: context.RequestContext,
group: 'objects.Group',
event_suffix: str,
extra_usage_info: Optional[dict] = None,
host: Optional[str] = None) -> None:
if not host:
host = CONF.host
@ -330,8 +331,8 @@ def _usage_from_group(group_ref: 'objects.Group', **kw) -> dict:
def notify_about_group_usage(context: context.RequestContext,
group: 'objects.Group',
event_suffix: str,
extra_usage_info: dict = None,
host: str = None) -> None:
extra_usage_info: Optional[dict] = None,
host: Optional[str] = None) -> None:
if not host:
host = CONF.host
@ -381,8 +382,8 @@ def _usage_from_group_snapshot(group_snapshot: 'objects.GroupSnapshot',
def notify_about_cgsnapshot_usage(context: context.RequestContext,
cgsnapshot: 'objects.CGSnapshot',
event_suffix: str,
extra_usage_info: dict = None,
host: str = None) -> None:
extra_usage_info: Optional[dict] = None,
host: Optional[str] = None) -> None:
if not host:
host = CONF.host
@ -403,7 +404,7 @@ def notify_about_group_snapshot_usage(context: context.RequestContext,
group_snapshot: 'objects.GroupSnapshot',
event_suffix: str,
extra_usage_info=None,
host: str = None) -> None:
host: Optional[str] = None) -> None:
if not host:
host = CONF.host
@ -633,9 +634,9 @@ def copy_volume(src: Union[str, BinaryIO],
def clear_volume(volume_size: int,
volume_path: str,
volume_clear: str = None,
volume_clear_size: int = None,
volume_clear_ionice: str = None,
volume_clear: Optional[str] = None,
volume_clear_size: Optional[int] = None,
volume_clear_ionice: Optional[str] = None,
throttle=None) -> None:
"""Unprovision old volumes to prevent data leaking between users."""
if volume_clear is None:

View File

@ -132,3 +132,4 @@ warn_unused_ignores = true
show_error_codes = true
pretty = true
html_report = mypy-report
no_implicit_optional = true