Merge "Revert "Use Python 3.8-style type hints""
This commit is contained in:
@@ -13,6 +13,7 @@
|
||||
|
||||
"""Base API Library"""
|
||||
|
||||
import builtins
|
||||
import typing as ty
|
||||
import warnings
|
||||
|
||||
@@ -203,7 +204,7 @@ class BaseAPI:
|
||||
session: ty.Optional[ksa_session.Session] = None,
|
||||
body: ty.Any = None,
|
||||
detailed: bool = False,
|
||||
headers: ty.Optional[ty.Dict[str, str]] = None,
|
||||
headers: ty.Optional[dict[str, str]] = None,
|
||||
**params: ty.Any,
|
||||
) -> ty.Union[requests.Response, ty.Any]:
|
||||
"""Return a list of resources
|
||||
@@ -290,7 +291,7 @@ class BaseAPI:
|
||||
if resource is None:
|
||||
resource = path
|
||||
|
||||
def getlist(kw: ty.Dict[str, ty.Any]) -> ty.Any:
|
||||
def getlist(kw: dict[str, ty.Any]) -> ty.Any:
|
||||
"""Do list call, unwrap resource dict if present"""
|
||||
ret = self.list(path, **kw)
|
||||
if isinstance(ret, dict) and resource in ret:
|
||||
@@ -323,9 +324,9 @@ class BaseAPI:
|
||||
def find_bulk(
|
||||
self,
|
||||
path: str,
|
||||
headers: ty.Optional[ty.Dict[str, str]] = None,
|
||||
headers: ty.Optional[dict[str, str]] = None,
|
||||
**kwargs: ty.Any,
|
||||
) -> ty.List[ty.Any]:
|
||||
) -> builtins.list[ty.Any]:
|
||||
"""Bulk load and filter locally
|
||||
|
||||
:param string path:
|
||||
@@ -377,7 +378,7 @@ class BaseAPI:
|
||||
path: str,
|
||||
value: ty.Optional[str] = None,
|
||||
attr: ty.Optional[str] = None,
|
||||
headers: ty.Optional[ty.Dict[str, str]] = None,
|
||||
headers: ty.Optional[dict[str, str]] = None,
|
||||
) -> ty.Any:
|
||||
"""Find a single resource by name or ID
|
||||
|
||||
|
@@ -39,10 +39,10 @@ class _OptionDict(ty.TypedDict):
|
||||
|
||||
|
||||
# List of plugin command line options
|
||||
OPTIONS_LIST: ty.Dict[str, _OptionDict] = {}
|
||||
OPTIONS_LIST: dict[str, _OptionDict] = {}
|
||||
|
||||
|
||||
def get_plugin_list() -> ty.FrozenSet[str]:
|
||||
def get_plugin_list() -> frozenset[str]:
|
||||
"""Gather plugin list and cache it"""
|
||||
|
||||
global PLUGIN_LIST
|
||||
@@ -52,7 +52,7 @@ def get_plugin_list() -> ty.FrozenSet[str]:
|
||||
return PLUGIN_LIST
|
||||
|
||||
|
||||
def get_options_list() -> ty.Dict[str, _OptionDict]:
|
||||
def get_options_list() -> dict[str, _OptionDict]:
|
||||
"""Gather plugin options so the help action has them available"""
|
||||
|
||||
global OPTIONS_LIST
|
||||
|
@@ -16,7 +16,7 @@
|
||||
import typing as ty
|
||||
|
||||
|
||||
_T = ty.TypeVar('_T', bound=ty.List[ty.Any])
|
||||
_T = ty.TypeVar('_T', bound=list[ty.Any])
|
||||
|
||||
|
||||
def simple_filter(
|
||||
|
@@ -29,8 +29,8 @@ LOG = logging.getLogger(__name__)
|
||||
# before auth plugins are loaded
|
||||
class OSC_Config(config.OpenStackConfig): # type: ignore
|
||||
def _auth_select_default_plugin(
|
||||
self, config: ty.Dict[str, ty.Any]
|
||||
) -> ty.Dict[str, ty.Any]:
|
||||
self, config: dict[str, ty.Any]
|
||||
) -> dict[str, ty.Any]:
|
||||
"""Select a default plugin based on supplied arguments
|
||||
|
||||
Migrated from auth.select_auth_plugin()
|
||||
@@ -64,8 +64,8 @@ class OSC_Config(config.OpenStackConfig): # type: ignore
|
||||
return config
|
||||
|
||||
def _auth_v2_arguments(
|
||||
self, config: ty.Dict[str, ty.Any]
|
||||
) -> ty.Dict[str, ty.Any]:
|
||||
self, config: dict[str, ty.Any]
|
||||
) -> dict[str, ty.Any]:
|
||||
"""Set up v2-required arguments from v3 info
|
||||
|
||||
Migrated from auth.build_auth_params()
|
||||
@@ -79,8 +79,8 @@ class OSC_Config(config.OpenStackConfig): # type: ignore
|
||||
return config
|
||||
|
||||
def _auth_v2_ignore_v3(
|
||||
self, config: ty.Dict[str, ty.Any]
|
||||
) -> ty.Dict[str, ty.Any]:
|
||||
self, config: dict[str, ty.Any]
|
||||
) -> dict[str, ty.Any]:
|
||||
"""Remove v3 arguments if present for v2 plugin
|
||||
|
||||
Migrated from clientmanager.setup_auth()
|
||||
@@ -119,8 +119,8 @@ class OSC_Config(config.OpenStackConfig): # type: ignore
|
||||
return config
|
||||
|
||||
def _auth_default_domain(
|
||||
self, config: ty.Dict[str, ty.Any]
|
||||
) -> ty.Dict[str, ty.Any]:
|
||||
self, config: dict[str, ty.Any]
|
||||
) -> dict[str, ty.Any]:
|
||||
"""Set a default domain from available arguments
|
||||
|
||||
Migrated from clientmanager.setup_auth()
|
||||
@@ -161,9 +161,7 @@ class OSC_Config(config.OpenStackConfig): # type: ignore
|
||||
config['auth']['user_domain_id'] = default_domain
|
||||
return config
|
||||
|
||||
def auth_config_hook(
|
||||
self, config: ty.Dict[str, ty.Any]
|
||||
) -> ty.Dict[str, ty.Any]:
|
||||
def auth_config_hook(self, config: dict[str, ty.Any]) -> dict[str, ty.Any]:
|
||||
"""Allow examination of config values before loading auth plugin
|
||||
|
||||
OpenStackClient will override this to perform additional checks
|
||||
@@ -183,9 +181,9 @@ class OSC_Config(config.OpenStackConfig): # type: ignore
|
||||
|
||||
def _validate_auth(
|
||||
self,
|
||||
config: ty.Dict[str, ty.Any],
|
||||
loader: ksa_loading.BaseIdentityLoader, # type: ignore
|
||||
) -> ty.Dict[str, ty.Any]:
|
||||
config: dict[str, ty.Any],
|
||||
loader: ksa_loading.BaseIdentityLoader[ty.Any],
|
||||
) -> dict[str, ty.Any]:
|
||||
"""Validate auth plugin arguments"""
|
||||
# May throw a keystoneauth1.exceptions.NoMatchingPlugin
|
||||
|
||||
@@ -250,7 +248,7 @@ class OSC_Config(config.OpenStackConfig): # type: ignore
|
||||
return config
|
||||
|
||||
# TODO(stephenfin): Add type once we have typing for SDK
|
||||
def load_auth_plugin(self, config: ty.Dict[str, ty.Any]) -> ty.Any:
|
||||
def load_auth_plugin(self, config: dict[str, ty.Any]) -> ty.Any:
|
||||
"""Get auth plugin and validate args"""
|
||||
|
||||
loader = self._get_auth_loader(config)
|
||||
|
@@ -22,47 +22,47 @@ from cliff import columns
|
||||
from osc_lib import utils
|
||||
|
||||
|
||||
class DictColumn(columns.FormattableColumn): # type: ignore
|
||||
class DictColumn(columns.FormattableColumn[dict[str, ty.Any]]):
|
||||
"""Format column for dict content"""
|
||||
|
||||
def human_readable(self) -> str:
|
||||
return utils.format_dict(self._value)
|
||||
|
||||
def machine_readable(self) -> ty.Dict[str, ty.Any]:
|
||||
def machine_readable(self) -> dict[str, ty.Any]:
|
||||
return dict(self._value or {})
|
||||
|
||||
|
||||
class DictListColumn(columns.FormattableColumn): # type: ignore
|
||||
class DictListColumn(columns.FormattableColumn[dict[str, list[ty.Any]]]):
|
||||
"""Format column for dict, key is string, value is list"""
|
||||
|
||||
def human_readable(self) -> str:
|
||||
return utils.format_dict_of_list(self._value) or ''
|
||||
|
||||
def machine_readable(self) -> ty.Dict[str, ty.List[ty.Any]]:
|
||||
def machine_readable(self) -> dict[str, list[ty.Any]]:
|
||||
return dict(self._value or {})
|
||||
|
||||
|
||||
class ListColumn(columns.FormattableColumn): # type: ignore
|
||||
class ListColumn(columns.FormattableColumn[list[ty.Any]]):
|
||||
"""Format column for list content"""
|
||||
|
||||
def human_readable(self) -> str:
|
||||
return utils.format_list(self._value) or ''
|
||||
|
||||
def machine_readable(self) -> ty.List[ty.Any]:
|
||||
def machine_readable(self) -> list[ty.Any]:
|
||||
return [x for x in self._value or []]
|
||||
|
||||
|
||||
class ListDictColumn(columns.FormattableColumn): # type: ignore
|
||||
class ListDictColumn(columns.FormattableColumn[list[dict[str, ty.Any]]]):
|
||||
"""Format column for list of dict content"""
|
||||
|
||||
def human_readable(self) -> str:
|
||||
return utils.format_list_of_dicts(self._value) or ''
|
||||
|
||||
def machine_readable(self) -> ty.List[ty.Dict[str, ty.Any]]:
|
||||
def machine_readable(self) -> list[dict[str, ty.Any]]:
|
||||
return [dict(x) for x in self._value or []]
|
||||
|
||||
|
||||
class SizeColumn(columns.FormattableColumn): # type: ignore
|
||||
class SizeColumn(columns.FormattableColumn[ty.Union[int, float]]):
|
||||
"""Format column for file size content"""
|
||||
|
||||
def human_readable(self) -> str:
|
||||
|
@@ -16,6 +16,7 @@
|
||||
"""argparse Custom Actions"""
|
||||
|
||||
import argparse
|
||||
import collections.abc
|
||||
import typing as ty
|
||||
|
||||
from osc_lib.i18n import _
|
||||
@@ -114,11 +115,11 @@ class MultiKeyValueAction(argparse.Action):
|
||||
optional_keys: ty.Optional[ty.Sequence[str]] = None,
|
||||
const: ty.Optional[_T] = None,
|
||||
default: ty.Union[_T, str, None] = None,
|
||||
type: ty.Optional[ty.Callable[[str], _T]] = None,
|
||||
choices: ty.Optional[ty.Iterable[_T]] = None,
|
||||
type: ty.Optional[collections.abc.Callable[[str], _T]] = None,
|
||||
choices: ty.Optional[collections.abc.Iterable[_T]] = None,
|
||||
required: bool = False,
|
||||
help: ty.Optional[str] = None,
|
||||
metavar: ty.Union[str, ty.Tuple[str, ...], None] = None,
|
||||
metavar: ty.Union[str, tuple[str, ...], None] = None,
|
||||
) -> None:
|
||||
"""Initialize the action object, and parse customized options
|
||||
|
||||
@@ -212,7 +213,7 @@ class MultiKeyValueAction(argparse.Action):
|
||||
if getattr(namespace, self.dest, None) is None:
|
||||
setattr(namespace, self.dest, [])
|
||||
|
||||
params: ty.Dict[str, str] = {}
|
||||
params: dict[str, str] = {}
|
||||
for kv in values.split(','):
|
||||
# Add value if an assignment else raise ArgumentError
|
||||
if '=' in kv:
|
||||
@@ -264,7 +265,7 @@ class MultiKeyValueCommaAction(MultiKeyValueAction):
|
||||
if getattr(namespace, self.dest, None) is None:
|
||||
setattr(namespace, self.dest, [])
|
||||
|
||||
params: ty.Dict[str, str] = {}
|
||||
params: dict[str, str] = {}
|
||||
key = ''
|
||||
for kv in values.split(','):
|
||||
# Add value if an assignment else raise ArgumentError
|
||||
|
@@ -75,7 +75,7 @@ class ClientManager:
|
||||
def __init__(
|
||||
self,
|
||||
cli_options: cloud_region.CloudRegion,
|
||||
api_version: ty.Optional[ty.Dict[str, str]],
|
||||
api_version: ty.Optional[dict[str, str]],
|
||||
pw_func: ty.Optional[_PasswordHelper] = None,
|
||||
app_name: ty.Optional[str] = None,
|
||||
app_version: ty.Optional[str] = None,
|
||||
@@ -283,5 +283,5 @@ class ClientManager:
|
||||
)
|
||||
return endpoint
|
||||
|
||||
def get_configuration(self) -> ty.Dict[str, ty.Any]:
|
||||
def get_configuration(self) -> dict[str, ty.Any]:
|
||||
return copy.deepcopy(self._cli_options.config)
|
||||
|
@@ -27,10 +27,10 @@ from osc_lib.i18n import _
|
||||
|
||||
class CommandMeta(abc.ABCMeta):
|
||||
def __new__(
|
||||
mcs: ty.Type['CommandMeta'],
|
||||
mcs: type['CommandMeta'],
|
||||
name: str,
|
||||
bases: ty.Tuple[ty.Type[ty.Any], ...],
|
||||
namespace: ty.Dict[str, ty.Any],
|
||||
bases: tuple[type[ty.Any], ...],
|
||||
namespace: dict[str, ty.Any],
|
||||
) -> 'CommandMeta':
|
||||
if 'log' not in namespace:
|
||||
namespace['log'] = logging.getLogger(
|
||||
|
@@ -29,7 +29,7 @@ class Timing(command.Lister):
|
||||
|
||||
def take_action(
|
||||
self, parsed_args: argparse.Namespace
|
||||
) -> ty.Tuple[ty.Tuple[str, ...], ty.List[ty.Any]]:
|
||||
) -> tuple[tuple[str, ...], list[ty.Any]]:
|
||||
column_headers = (
|
||||
'URL',
|
||||
'Seconds',
|
||||
|
@@ -14,6 +14,7 @@
|
||||
"""Application logging"""
|
||||
|
||||
import argparse
|
||||
import collections.abc
|
||||
import logging
|
||||
import sys
|
||||
import typing as ty
|
||||
@@ -22,7 +23,7 @@ import warnings
|
||||
from openstack.config import cloud_config
|
||||
|
||||
|
||||
def get_loggers() -> ty.Dict[str, str]:
|
||||
def get_loggers() -> dict[str, str]:
|
||||
loggers = {}
|
||||
for logkey in logging.Logger.manager.loggerDict.keys():
|
||||
loggers[logkey] = logging.getLevelName(logging.getLogger(logkey).level)
|
||||
@@ -56,7 +57,7 @@ def log_level_from_string(level_string: str) -> int:
|
||||
return log_level
|
||||
|
||||
|
||||
def log_level_from_config(config: ty.Mapping[str, ty.Any]) -> int:
|
||||
def log_level_from_config(config: collections.abc.Mapping[str, ty.Any]) -> int:
|
||||
# Check the command line option
|
||||
verbose_level = config.get('verbose_level')
|
||||
if config.get('debug', False):
|
||||
|
@@ -82,7 +82,7 @@ class OpenStackShell(app.App):
|
||||
client_manager: clientmanager.ClientManager
|
||||
|
||||
log = logging.getLogger(__name__)
|
||||
timing_data: ty.List[ty.Any] = []
|
||||
timing_data: list[ty.Any] = []
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
@@ -93,7 +93,7 @@ class OpenStackShell(app.App):
|
||||
stdout: ty.Optional[ty.TextIO] = None,
|
||||
stderr: ty.Optional[ty.TextIO] = None,
|
||||
interactive_app_factory: ty.Optional[
|
||||
ty.Type['interactive.InteractiveApp']
|
||||
type['interactive.InteractiveApp']
|
||||
] = None,
|
||||
deferred_help: bool = False,
|
||||
) -> None:
|
||||
@@ -127,7 +127,7 @@ class OpenStackShell(app.App):
|
||||
# Set in subclasses
|
||||
self.api_version = None
|
||||
|
||||
self.command_options: ty.List[str] = []
|
||||
self.command_options: list[str] = []
|
||||
|
||||
self.do_profile = False
|
||||
|
||||
@@ -136,7 +136,7 @@ class OpenStackShell(app.App):
|
||||
self.log_configurator = logs.LogConfigurator(self.options)
|
||||
self.dump_stack_trace = self.log_configurator.dump_trace
|
||||
|
||||
def run(self, argv: ty.List[str]) -> int:
|
||||
def run(self, argv: list[str]) -> int:
|
||||
ret_val = 1
|
||||
self.command_options = argv
|
||||
try:
|
||||
@@ -180,7 +180,7 @@ class OpenStackShell(app.App):
|
||||
f"osprofiler trace show --html {trace_id} "
|
||||
)
|
||||
|
||||
def run_subcommand(self, argv: ty.List[str]) -> int:
|
||||
def run_subcommand(self, argv: list[str]) -> int:
|
||||
self.init_profile()
|
||||
try:
|
||||
ret_value = super().run_subcommand(argv)
|
||||
@@ -200,7 +200,7 @@ class OpenStackShell(app.App):
|
||||
self,
|
||||
description: ty.Optional[str],
|
||||
version: ty.Optional[str],
|
||||
argparse_kwargs: ty.Optional[ty.Dict[str, ty.Any]] = None,
|
||||
argparse_kwargs: ty.Optional[dict[str, ty.Any]] = None,
|
||||
) -> _argparse.ArgumentParser:
|
||||
parser = super().build_option_parser(
|
||||
description,
|
||||
@@ -416,7 +416,7 @@ class OpenStackShell(app.App):
|
||||
"""
|
||||
pass
|
||||
|
||||
def initialize_app(self, argv: ty.List[str]) -> None:
|
||||
def initialize_app(self, argv: list[str]) -> None:
|
||||
"""Global app init bits:
|
||||
|
||||
* set up API versions
|
||||
@@ -570,7 +570,7 @@ class OpenStackShell(app.App):
|
||||
tcmd.run(targs)
|
||||
|
||||
|
||||
def main(argv: ty.Optional[ty.List[str]] = None) -> int:
|
||||
def main(argv: ty.Optional[list[str]] = None) -> int:
|
||||
if argv is None:
|
||||
argv = sys.argv[1:]
|
||||
return OpenStackShell().run(argv)
|
||||
|
@@ -16,6 +16,7 @@
|
||||
"""Common client utilities"""
|
||||
|
||||
import argparse
|
||||
import collections.abc
|
||||
import copy
|
||||
import functools
|
||||
import getpass
|
||||
@@ -38,10 +39,10 @@ _T = ty.TypeVar('_T')
|
||||
|
||||
|
||||
def backward_compat_col_lister(
|
||||
column_headers: ty.List[str],
|
||||
columns: ty.List[str],
|
||||
column_map: ty.Dict[str, str],
|
||||
) -> ty.List[str]:
|
||||
column_headers: list[str],
|
||||
columns: list[str],
|
||||
column_map: dict[str, str],
|
||||
) -> list[str]:
|
||||
"""Convert the column headers to keep column backward compatibility.
|
||||
|
||||
Replace the new column name of column headers by old name, so that
|
||||
@@ -74,10 +75,10 @@ def backward_compat_col_lister(
|
||||
|
||||
|
||||
def backward_compat_col_showone(
|
||||
show_object: ty.MutableMapping[str, _T],
|
||||
columns: ty.List[str],
|
||||
column_map: ty.Dict[str, str],
|
||||
) -> ty.MutableMapping[str, _T]:
|
||||
show_object: collections.abc.MutableMapping[str, _T],
|
||||
columns: list[str],
|
||||
column_map: dict[str, str],
|
||||
) -> collections.abc.MutableMapping[str, _T]:
|
||||
"""Convert the output object to keep column backward compatibility.
|
||||
|
||||
Replace the new column name of output object by old name, so that
|
||||
@@ -107,7 +108,7 @@ def backward_compat_col_showone(
|
||||
return show_object
|
||||
|
||||
|
||||
def build_kwargs_dict(arg_name: str, value: _T) -> ty.Dict[str, _T]:
|
||||
def build_kwargs_dict(arg_name: str, value: _T) -> dict[str, _T]:
|
||||
"""Return a dictionary containing `arg_name` if `value` is set."""
|
||||
kwargs = {}
|
||||
if value:
|
||||
@@ -116,10 +117,10 @@ def build_kwargs_dict(arg_name: str, value: _T) -> ty.Dict[str, _T]:
|
||||
|
||||
|
||||
def calculate_header_and_attrs(
|
||||
column_headers: ty.Sequence[str],
|
||||
attrs: ty.Sequence[str],
|
||||
column_headers: collections.abc.Sequence[str],
|
||||
attrs: collections.abc.Sequence[str],
|
||||
parsed_args: argparse.Namespace,
|
||||
) -> ty.Tuple[ty.Sequence[str], ty.Sequence[str]]:
|
||||
) -> tuple[collections.abc.Sequence[str], collections.abc.Sequence[str]]:
|
||||
"""Calculate headers and attribute names based on parsed_args.column.
|
||||
|
||||
When --column (-c) option is specified, this function calculates
|
||||
@@ -172,10 +173,10 @@ def env(*vars: str, **kwargs: ty.Any) -> ty.Optional[str]:
|
||||
|
||||
|
||||
def find_min_match(
|
||||
items: ty.Sequence[_T],
|
||||
items: collections.abc.Sequence[_T],
|
||||
sort_attr: str,
|
||||
**kwargs: ty.Any,
|
||||
) -> ty.Sequence[_T]:
|
||||
) -> collections.abc.Sequence[_T]:
|
||||
"""Find all resources meeting the given minimum constraints
|
||||
|
||||
:param items: A List of objects to consider
|
||||
@@ -327,7 +328,7 @@ def find_resource(
|
||||
|
||||
|
||||
def format_dict(
|
||||
data: ty.Dict[str, ty.Any], prefix: ty.Optional[str] = None
|
||||
data: dict[str, ty.Any], prefix: ty.Optional[str] = None
|
||||
) -> str:
|
||||
"""Return a formatted string of key value pairs
|
||||
|
||||
@@ -357,7 +358,7 @@ def format_dict(
|
||||
|
||||
|
||||
def format_dict_of_list(
|
||||
data: ty.Optional[ty.Dict[str, ty.List[ty.Any]]], separator: str = '; '
|
||||
data: ty.Optional[dict[str, list[ty.Any]]], separator: str = '; '
|
||||
) -> ty.Optional[str]:
|
||||
"""Return a formatted string of key value pair
|
||||
|
||||
@@ -384,7 +385,7 @@ def format_dict_of_list(
|
||||
|
||||
|
||||
def format_list(
|
||||
data: ty.Optional[ty.List[ty.Any]], separator: str = ', '
|
||||
data: ty.Optional[list[ty.Any]], separator: str = ', '
|
||||
) -> ty.Optional[str]:
|
||||
"""Return a formatted strings
|
||||
|
||||
@@ -399,7 +400,7 @@ def format_list(
|
||||
|
||||
|
||||
def format_list_of_dicts(
|
||||
data: ty.Optional[ty.List[ty.Dict[str, ty.Any]]],
|
||||
data: ty.Optional[list[dict[str, ty.Any]]],
|
||||
) -> ty.Optional[str]:
|
||||
"""Return a formatted string of key value pairs for each dict
|
||||
|
||||
@@ -444,7 +445,7 @@ def format_size(size: ty.Union[int, float, None]) -> str:
|
||||
def get_client_class(
|
||||
api_name: str,
|
||||
version: ty.Union[str, int, float],
|
||||
version_map: ty.Dict[str, ty.Type[_T]],
|
||||
version_map: dict[str, type[_T]],
|
||||
) -> ty.Any:
|
||||
"""Returns the client class for the requested API version
|
||||
|
||||
@@ -482,13 +483,13 @@ def get_client_class(
|
||||
|
||||
|
||||
def get_dict_properties(
|
||||
item: ty.Dict[str, _T],
|
||||
fields: ty.Sequence[str],
|
||||
mixed_case_fields: ty.Optional[ty.Sequence[str]] = None,
|
||||
formatters: ty.Optional[ # type: ignore
|
||||
ty.Dict[str, ty.Type[cliff_columns.FormattableColumn]]
|
||||
item: dict[str, _T],
|
||||
fields: collections.abc.Sequence[str],
|
||||
mixed_case_fields: ty.Optional[collections.abc.Sequence[str]] = None,
|
||||
formatters: ty.Optional[
|
||||
dict[str, type[cliff_columns.FormattableColumn[ty.Any]]]
|
||||
] = None,
|
||||
) -> ty.Tuple[ty.Any, ...]:
|
||||
) -> tuple[ty.Any, ...]:
|
||||
"""Return a tuple containing the item properties.
|
||||
|
||||
:param item: a single dict resource
|
||||
@@ -534,13 +535,13 @@ def get_dict_properties(
|
||||
|
||||
|
||||
def get_item_properties(
|
||||
item: ty.Dict[str, _T],
|
||||
fields: ty.Sequence[str],
|
||||
mixed_case_fields: ty.Optional[ty.Sequence[str]] = None,
|
||||
formatters: ty.Optional[ # type: ignore
|
||||
ty.Dict[str, ty.Type[cliff_columns.FormattableColumn]]
|
||||
item: dict[str, _T],
|
||||
fields: collections.abc.Sequence[str],
|
||||
mixed_case_fields: ty.Optional[collections.abc.Sequence[str]] = None,
|
||||
formatters: ty.Optional[
|
||||
dict[str, type[cliff_columns.FormattableColumn[ty.Any]]]
|
||||
] = None,
|
||||
) -> ty.Tuple[ty.Any, ...]:
|
||||
) -> tuple[ty.Any, ...]:
|
||||
"""Return a tuple containing the item properties.
|
||||
|
||||
:param item: a single item resource (e.g. Server, Project, etc)
|
||||
@@ -655,10 +656,10 @@ def read_blob_file_contents(blob_file: str) -> str:
|
||||
|
||||
|
||||
def sort_items(
|
||||
items: ty.Sequence[_T],
|
||||
items: collections.abc.Sequence[_T],
|
||||
sort_str: str,
|
||||
sort_type: ty.Optional[ty.Type[ty.Any]] = None,
|
||||
) -> ty.Sequence[_T]:
|
||||
sort_type: ty.Optional[type[ty.Any]] = None,
|
||||
) -> collections.abc.Sequence[_T]:
|
||||
"""Sort items based on sort keys and sort directions given by sort_str.
|
||||
|
||||
:param items: a list or generator object of items
|
||||
@@ -719,11 +720,11 @@ def wait_for_delete(
|
||||
manager: ty.Any,
|
||||
res_id: str,
|
||||
status_field: str = 'status',
|
||||
error_status: ty.Sequence[str] = ['error'],
|
||||
exception_name: ty.Sequence[str] = ['NotFound'],
|
||||
error_status: collections.abc.Sequence[str] = ['error'],
|
||||
exception_name: collections.abc.Sequence[str] = ['NotFound'],
|
||||
sleep_time: int = 5,
|
||||
timeout: int = 300,
|
||||
callback: ty.Optional[ty.Callable[[int], None]] = None,
|
||||
callback: ty.Optional[collections.abc.Callable[[int], None]] = None,
|
||||
) -> bool:
|
||||
"""Wait for resource deletion
|
||||
|
||||
@@ -775,13 +776,13 @@ def wait_for_delete(
|
||||
|
||||
|
||||
def wait_for_status(
|
||||
status_f: ty.Callable[[str], object],
|
||||
status_f: collections.abc.Callable[[str], object],
|
||||
res_id: str,
|
||||
status_field: str = 'status',
|
||||
success_status: ty.Sequence[str] = ['active'],
|
||||
error_status: ty.Sequence[str] = ['error'],
|
||||
success_status: collections.abc.Sequence[str] = ['active'],
|
||||
error_status: collections.abc.Sequence[str] = ['error'],
|
||||
sleep_time: int = 5,
|
||||
callback: ty.Optional[ty.Callable[[int], None]] = None,
|
||||
callback: ty.Optional[collections.abc.Callable[[int], None]] = None,
|
||||
) -> bool:
|
||||
"""Wait for status change on a resource during a long-running operation
|
||||
|
||||
@@ -818,9 +819,9 @@ def wait_for_status(
|
||||
|
||||
def get_osc_show_columns_for_sdk_resource(
|
||||
sdk_resource: resource.Resource,
|
||||
osc_column_map: ty.Dict[str, str],
|
||||
invisible_columns: ty.Optional[ty.Sequence[str]] = None,
|
||||
) -> ty.Tuple[ty.Tuple[str, ...], ty.Tuple[str, ...]]:
|
||||
osc_column_map: dict[str, str],
|
||||
invisible_columns: ty.Optional[collections.abc.Sequence[str]] = None,
|
||||
) -> tuple[tuple[str, ...], tuple[str, ...]]:
|
||||
"""Get and filter the display and attribute columns for an SDK resource.
|
||||
|
||||
Common utility function for preparing the output of an OSC show command.
|
||||
|
@@ -19,8 +19,8 @@ LIST_LONG_ONLY = 'long_only'
|
||||
|
||||
|
||||
def get_column_definitions(
|
||||
attr_map: ty.List[ty.Tuple[str, str, str]], long_listing: bool
|
||||
) -> ty.Tuple[ty.List[str], ty.List[str]]:
|
||||
attr_map: list[tuple[str, str, str]], long_listing: bool
|
||||
) -> tuple[list[str], list[str]]:
|
||||
"""Return table headers and column names for a listing table.
|
||||
|
||||
An attribute map (attr_map) is a list of table entry definitions
|
||||
@@ -77,9 +77,9 @@ def get_column_definitions(
|
||||
|
||||
|
||||
def get_columns(
|
||||
item: ty.Dict[str, ty.Any],
|
||||
attr_map: ty.Optional[ty.List[ty.Tuple[str, str, str]]] = None,
|
||||
) -> ty.Tuple[ty.Tuple[str, ...], ty.Tuple[str, ...]]:
|
||||
item: dict[str, ty.Any],
|
||||
attr_map: ty.Optional[list[tuple[str, str, str]]] = None,
|
||||
) -> tuple[tuple[str, ...], tuple[str, ...]]:
|
||||
"""Return pair of resource attributes and corresponding display names.
|
||||
|
||||
:param item: a dictionary which represents a resource.
|
||||
|
@@ -11,6 +11,7 @@
|
||||
# under the License.
|
||||
|
||||
import argparse
|
||||
import collections.abc
|
||||
import typing as ty
|
||||
|
||||
from osc_lib.i18n import _
|
||||
@@ -32,7 +33,7 @@ class _CommaListAction(argparse.Action):
|
||||
def add_tag_filtering_option_to_parser(
|
||||
parser: argparse.ArgumentParser,
|
||||
resource_name: str,
|
||||
enhance_help: ty.Callable[[str], str] = lambda _h: _h,
|
||||
enhance_help: collections.abc.Callable[[str], str] = lambda _h: _h,
|
||||
) -> None:
|
||||
"""Add tag filtering options to a parser.
|
||||
|
||||
@@ -96,7 +97,7 @@ def add_tag_filtering_option_to_parser(
|
||||
|
||||
def get_tag_filtering_args(
|
||||
parsed_args: argparse.Namespace,
|
||||
args: ty.Dict[str, ty.Any],
|
||||
args: dict[str, ty.Any],
|
||||
) -> None:
|
||||
"""Adds the tag arguments to an args list.
|
||||
|
||||
@@ -119,7 +120,7 @@ def get_tag_filtering_args(
|
||||
def add_tag_option_to_parser_for_create(
|
||||
parser: argparse.ArgumentParser,
|
||||
resource_name: str,
|
||||
enhance_help: ty.Callable[[str], str] = lambda _h: _h,
|
||||
enhance_help: collections.abc.Callable[[str], str] = lambda _h: _h,
|
||||
) -> None:
|
||||
"""Add tag options to a parser for create commands.
|
||||
|
||||
@@ -152,7 +153,7 @@ def add_tag_option_to_parser_for_create(
|
||||
def add_tag_option_to_parser_for_set(
|
||||
parser: argparse.ArgumentParser,
|
||||
resource_name: str,
|
||||
enhance_help: ty.Callable[[str], str] = lambda _h: _h,
|
||||
enhance_help: collections.abc.Callable[[str], str] = lambda _h: _h,
|
||||
) -> None:
|
||||
"""Add tag options to a parser for set commands.
|
||||
|
||||
@@ -190,7 +191,7 @@ def add_tag_option_to_parser_for_set(
|
||||
def add_tag_option_to_parser_for_unset(
|
||||
parser: argparse.ArgumentParser,
|
||||
resource_name: str,
|
||||
enhance_help: ty.Callable[[str], str] = lambda _h: _h,
|
||||
enhance_help: collections.abc.Callable[[str], str] = lambda _h: _h,
|
||||
) -> None:
|
||||
"""Add tag options to a parser for set commands.
|
||||
|
||||
|
@@ -1,5 +1,5 @@
|
||||
[tool.mypy]
|
||||
python_version = "3.8"
|
||||
python_version = "3.9"
|
||||
show_column_numbers = true
|
||||
show_error_context = true
|
||||
ignore_missing_imports = true
|
||||
@@ -33,7 +33,6 @@ ignore_errors = true
|
||||
|
||||
[tool.ruff]
|
||||
line-length = 79
|
||||
target-version = "py38"
|
||||
|
||||
[tool.ruff.format]
|
||||
quote-style = "preserve"
|
||||
|
Reference in New Issue
Block a user