typing: Fix initial typing issues

We bump the versions of cliff and keystoneauth1 to the first versions
with full typing. We also remove an irrelevant note from the
requirements files: dependency order hasn't mattered since pip 20.3.

Change-Id: I7fb94985785323ba91aef973686c89a27c32c368
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane
2025-02-19 20:12:33 +00:00
parent 7eb3f6228f
commit 2fd1e26a7b
10 changed files with 67 additions and 40 deletions

View File

@@ -14,6 +14,7 @@
"""Authentication Library"""
import argparse
import typing as ty
from keystoneauth1.identity.v3 import k2k
from keystoneauth1.loading import base
@@ -27,8 +28,14 @@ from osc_lib import utils
# to get the command-line options
PLUGIN_LIST = None
class _OptionDict(ty.TypedDict):
env: str
help: str
# List of plugin command line options
OPTIONS_LIST = {}
OPTIONS_LIST: dict[str, _OptionDict] = {}
def get_plugin_list():
@@ -87,15 +94,11 @@ def check_valid_authorization_options(options, auth_plugin_name):
def check_valid_authentication_options(options, auth_plugin_name):
"""Validate authentication options, and provide helpful error messages
:param required_scope: indicate whether a scoped token is required
"""
"""Validate authentication options, and provide helpful error messages."""
# Get all the options defined within the plugin.
plugin_opts = base.get_plugin_options(auth_plugin_name)
plugin_opts = {opt.dest: opt for opt in plugin_opts}
plugin_opts = {
opt.dest: opt for opt in base.get_plugin_options(auth_plugin_name)
}
# NOTE(aloga): this is an horrible hack. We need a way to specify the
# required options in the plugins. Using the "required" argument for

View File

@@ -15,12 +15,14 @@
"""Formattable column for specify content type"""
import typing as ty
from cliff import columns
from osc_lib import utils
class DictColumn(columns.FormattableColumn):
class DictColumn(columns.FormattableColumn[dict[str, ty.Any]]):
"""Format column for dict content"""
def human_readable(self):
@@ -30,7 +32,7 @@ class DictColumn(columns.FormattableColumn):
return dict(self._value or {})
class DictListColumn(columns.FormattableColumn):
class DictListColumn(columns.FormattableColumn[dict[str, list[ty.Any]]]):
"""Format column for dict, key is string, value is list"""
def human_readable(self):
@@ -40,7 +42,7 @@ class DictListColumn(columns.FormattableColumn):
return dict(self._value or {})
class ListColumn(columns.FormattableColumn):
class ListColumn(columns.FormattableColumn[list[ty.Any]]):
"""Format column for list content"""
def human_readable(self):
@@ -50,7 +52,7 @@ class ListColumn(columns.FormattableColumn):
return [x for x in self._value or []]
class ListDictColumn(columns.FormattableColumn):
class ListDictColumn(columns.FormattableColumn[list[dict[str, ty.Any]]]):
"""Format column for list of dict content"""
def human_readable(self):
@@ -60,7 +62,7 @@ class ListDictColumn(columns.FormattableColumn):
return [dict(x) for x in self._value or []]
class SizeColumn(columns.FormattableColumn):
class SizeColumn(columns.FormattableColumn[ty.Union[int, float]]):
"""Format column for file size content"""
def human_readable(self):

View File

@@ -164,7 +164,7 @@ class MultiKeyValueAction(argparse.Action):
if getattr(namespace, self.dest, None) is None:
setattr(namespace, self.dest, [])
params = {}
params: dict[str, str] = {}
for kv in values.split(','):
# Add value if an assignment else raise ArgumentError
if '=' in kv:
@@ -174,7 +174,7 @@ class MultiKeyValueAction(argparse.Action):
msg = _("Each property key must be specified: %s")
raise argparse.ArgumentError(self, msg % str(kv))
else:
params.update([kv_list])
params.update(dict([kv_list]))
else:
msg = _(
"Expected comma separated 'key=value' pairs, but got: %s"
@@ -206,7 +206,7 @@ class MultiKeyValueCommaAction(MultiKeyValueAction):
if getattr(namespace, self.dest, None) is None:
setattr(namespace, self.dest, [])
params = {}
params: dict[str, str] = {}
key = ''
for kv in values.split(','):
# Add value if an assignment else raise ArgumentError
@@ -217,7 +217,7 @@ class MultiKeyValueCommaAction(MultiKeyValueAction):
msg = _("A key must be specified before '=': %s")
raise argparse.ArgumentError(self, msg % str(kv))
else:
params.update([kv_list])
params.update(dict([kv_list]))
key = kv_list[0]
else:
# If the ',' split does not have key=value pair, then it

View File

@@ -25,11 +25,8 @@ from oslo_utils import strutils
from osc_lib.api import auth
from osc_lib import exceptions
LOG = logging.getLogger(__name__)
PLUGIN_MODULES = []
class ClientCache:
"""Descriptor class for caching created client handles."""
@@ -178,6 +175,9 @@ class ClientManager:
self._auth_setup_completed = True
def validate_scope(self):
if not self._auth_ref:
raise Exception('no authentication information')
if self._auth_ref.project_id is not None:
# We already have a project scope.
return

View File

@@ -33,6 +33,8 @@ class CommandMeta(abc.ABCMeta):
class Command(command.Command, metaclass=CommandMeta):
log: logging.Logger
def run(self, parsed_args):
self.log.debug('run(%s)', parsed_args)
return super().run(parsed_args)

View File

@@ -13,12 +13,19 @@
"""Timing Implementation"""
import typing as ty
from osc_lib.command import command
if ty.TYPE_CHECKING:
from osc_lib import shell
class Timing(command.Lister):
"""Show timing data"""
app: 'shell.OpenStackShell'
def take_action(self, parsed_args):
column_headers = (
'URL',

View File

@@ -59,6 +59,9 @@ class InvalidValue(Exception):
class ClientException(Exception):
"""The base exception class for all exceptions this library raises."""
http_status: int
message: str
def __init__(self, code, message=None, details=None):
if not isinstance(code, int) and message is None:
message = code

View File

@@ -20,7 +20,9 @@ import getpass
import logging
import sys
import traceback
import typing as ty
from cliff import _argparse
from cliff import app
from cliff import command
from cliff import commandmanager
@@ -77,7 +79,7 @@ class OpenStackShell(app.App):
CONSOLE_MESSAGE_FORMAT = '%(levelname)s: %(name)s %(message)s'
log = logging.getLogger(__name__)
timing_data = []
timing_data: list[ty.Any] = []
def __init__(
self,
@@ -91,11 +93,11 @@ class OpenStackShell(app.App):
deferred_help=False,
):
# Patch command.Command to add a default auth_required = True
command.Command.auth_required = True
setattr(command.Command, 'auth_required', True)
# Some commands do not need authentication
help.HelpCommand.auth_required = False
complete.CompleteCommand.auth_required = False
setattr(help.HelpCommand, 'auth_required', False)
setattr(complete.CompleteCommand, 'auth_required', False)
# Slight change to the meaning of --debug
self.DEFAULT_DEBUG_VALUE = None
@@ -190,8 +192,17 @@ class OpenStackShell(app.App):
self.close_profile()
return ret_value
def build_option_parser(self, description, version):
parser = super().build_option_parser(description, version)
def build_option_parser(
self,
description: ty.Optional[str],
version: ty.Optional[str],
argparse_kwargs: ty.Optional[dict[str, ty.Any]] = None,
) -> _argparse.ArgumentParser:
parser = super().build_option_parser(
description,
version,
argparse_kwargs,
)
# service token auth argument
parser.add_argument(
@@ -345,7 +356,6 @@ class OpenStackShell(app.App):
)
return parser
# return clientmanager.build_plugin_option_parser(parser)
"""
Break up initialize_app() so that overriding it in a subclass does not
@@ -414,7 +424,9 @@ class OpenStackShell(app.App):
super().initialize_app(argv)
self.log.info(
"START with options: %s",
strutils.mask_password(" ".join(self.command_options)),
strutils.mask_password(" ".join(self.command_options))
if self.command_options
else "",
)
self.log.debug("options: %s", strutils.mask_password(self.options))
@@ -481,6 +493,9 @@ class OpenStackShell(app.App):
cmd.auth_required,
)
if not self.client_manager:
raise RuntimeWarning("can't get here")
# NOTE(dtroyer): If auth is not required for a command, skip
# get_one()'s validation to avoid loading plugins
validate = cmd.auth_required
@@ -515,6 +530,9 @@ class OpenStackShell(app.App):
def clean_up(self, cmd, result, err):
self.log.debug('clean_up %s: %s', cmd.__class__.__name__, err or '')
if not self.client_manager:
raise RuntimeWarning("can't get here")
# Close SDK connection if available to have proper cleanup there
if hasattr(self.client_manager, "sdk_connection"):
self.client_manager.sdk_connection.close()

View File

@@ -1,12 +1,8 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
pbr!=2.1.0,>=2.0.0 # Apache-2.0
cliff>=3.2.0 # Apache-2.0
keystoneauth1>=3.14.0 # Apache-2.0
cliff>=4.9.0 # Apache-2.0
keystoneauth1>=5.10.0 # Apache-2.0
openstacksdk>=0.15.0 # Apache-2.0
oslo.i18n>=3.15.3 # Apache-2.0
oslo.utils>=3.33.0 # Apache-2.0
pbr!=2.1.0,>=2.0.0 # Apache-2.0
requests>=2.14.2 # Apache-2.0
stevedore>=1.20.0 # Apache-2.0

View File

@@ -1,7 +1,3 @@
# The order of packages is significant, because pip processes them in the order
# of appearance. Changing the order has an impact on the overall integration
# process, which may cause wedges in the gate later.
coverage!=4.4,>=4.0 # Apache-2.0
fixtures>=3.0.0 # Apache-2.0/BSD
oslotest>=3.2.0 # Apache-2.0