typing: Add typing to osc_lib.api

While here, we also address a minor behavior issue and add a release
note for same.

Change-Id: I1d26133c9d9ed299d1035f207059aa8fe463a001
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
Depends-on: https://review.opendev.org/c/openstack/python-openstackclient/+/946034
This commit is contained in:
Stephen Finucane
2025-02-19 20:12:33 +00:00
parent 0c63b24525
commit 3d221e5992
7 changed files with 107 additions and 50 deletions

View File

@@ -13,6 +13,9 @@
"""Base API Library"""
import builtins
import typing as ty
from keystoneauth1 import exceptions as ksa_exceptions
from keystoneauth1 import session as ksa_session
import requests
@@ -42,8 +45,12 @@ class BaseAPI:
HEADER_NAME = "OpenStack-API-Version"
def __init__(
self, session=None, service_type=None, endpoint=None, **kwargs
):
self,
session: ty.Optional[ksa_session.Session] = None,
service_type: ty.Optional[str] = None,
endpoint: ty.Optional[str] = None,
**kwargs: ty.Any,
) -> None:
"""Base object that contains some common API objects and methods
:param keystoneauth1.session.Session session:
@@ -69,7 +76,7 @@ class BaseAPI:
self.service_type = service_type
self.endpoint = self._munge_endpoint(endpoint)
def _munge_endpoint(self, endpoint):
def _munge_endpoint(self, endpoint: ty.Optional[str]) -> ty.Optional[str]:
"""Hook to allow subclasses to massage the passed-in endpoint
Hook to massage passed-in endpoints from arbitrary sources,
@@ -90,7 +97,13 @@ class BaseAPI:
else:
return endpoint
def _request(self, method, url, session=None, **kwargs):
def _request(
self,
method: str,
url: str,
session: ty.Optional[ksa_session.Session] = None,
**kwargs: ty.Any,
) -> requests.Response:
"""Perform call into session
All API calls are funneled through this method to provide a common
@@ -136,7 +149,13 @@ class BaseAPI:
# The basic action methods all take a Session and return dict/lists
def create(self, url, session=None, method=None, **params):
def create(
self,
url: str,
session: ty.Optional[ksa_session.Session] = None,
method: ty.Optional[str] = None,
**params: ty.Any,
) -> ty.Union[requests.Response, ty.Any]:
"""Create a new resource
:param string url:
@@ -156,7 +175,12 @@ class BaseAPI:
except requests.JSONDecodeError:
return ret
def delete(self, url, session=None, **params):
def delete(
self,
url: str,
session: ty.Optional[ksa_session.Session] = None,
**params: ty.Any,
) -> requests.Response:
"""Delete a resource
:param string url:
@@ -169,13 +193,13 @@ class BaseAPI:
def list(
self,
path,
session=None,
body=None,
detailed=False,
headers=None,
**params,
):
path: str,
session: ty.Optional[ksa_session.Session] = None,
body: ty.Any = None,
detailed: bool = False,
headers: ty.Optional[dict[str, str]] = None,
**params: ty.Any,
) -> ty.Union[requests.Response, ty.Any]:
"""Return a list of resources
GET ${ENDPOINT}/${PATH}?${PARAMS}
@@ -226,11 +250,11 @@ class BaseAPI:
def find_attr(
self,
path,
value=None,
attr=None,
resource=None,
):
path: str,
value: ty.Optional[str] = None,
attr: ty.Optional[str] = None,
resource: ty.Optional[str] = None,
) -> ty.Any:
"""Find a resource via attribute or ID
Most APIs return a list wrapped by a dict with the resource
@@ -260,7 +284,7 @@ class BaseAPI:
if resource is None:
resource = path
def getlist(kw):
def getlist(kw: dict[str, ty.Any]) -> ty.Any:
"""Do list call, unwrap resource dict if present"""
ret = self.list(path, **kw)
if isinstance(ret, dict) and resource in ret:
@@ -290,7 +314,12 @@ class BaseAPI:
msg % {'resource': resource, 'attr': attr, 'value': value}
)
def find_bulk(self, path, headers=None, **kwargs):
def find_bulk(
self,
path: str,
headers: ty.Optional[dict[str, str]] = None,
**kwargs: ty.Any,
) -> builtins.list[ty.Any]:
"""Bulk load and filter locally
:param string path:
@@ -318,7 +347,7 @@ class BaseAPI:
return ret
def find_one(self, path, **kwargs):
def find_one(self, path: str, **kwargs: ty.Any) -> ty.Any:
"""Find a resource by name or ID
:param string path:
@@ -339,11 +368,11 @@ class BaseAPI:
def find(
self,
path,
value=None,
attr=None,
headers=None,
):
path: str,
value: ty.Optional[str] = None,
attr: ty.Optional[str] = None,
headers: ty.Optional[dict[str, str]] = None,
) -> ty.Any:
"""Find a single resource by name or ID
:param string path:
@@ -356,7 +385,7 @@ class BaseAPI:
Headers dictionary to pass to requests
"""
def raise_not_found():
def raise_not_found() -> ty.NoReturn:
msg = _("%s not found") % value
raise exceptions.NotFound(404, msg)

View File

@@ -16,6 +16,7 @@
import argparse
import typing as ty
from keystoneauth1.identity import base as identity_base
from keystoneauth1.identity.v3 import k2k
from keystoneauth1.loading import base
@@ -23,6 +24,9 @@ from osc_lib import exceptions as exc
from osc_lib.i18n import _
from osc_lib import utils
if ty.TYPE_CHECKING:
from openstack import connection
# Initialize the list of Authentication plugins early in order
# to get the command-line options
@@ -38,7 +42,7 @@ class _OptionDict(ty.TypedDict):
OPTIONS_LIST: dict[str, _OptionDict] = {}
def get_plugin_list():
def get_plugin_list() -> frozenset[str]:
"""Gather plugin list and cache it"""
global PLUGIN_LIST
@@ -48,7 +52,7 @@ def get_plugin_list():
return PLUGIN_LIST
def get_options_list():
def get_options_list() -> dict[str, _OptionDict]:
"""Gather plugin options so the help action has them available"""
global OPTIONS_LIST
@@ -72,7 +76,10 @@ def get_options_list():
return OPTIONS_LIST
def check_valid_authorization_options(options, auth_plugin_name):
def check_valid_authorization_options(
options: 'connection.Connection',
auth_plugin_name: str,
) -> None:
"""Validate authorization options, and provide helpful error messages."""
if (
options.auth.get('project_id')
@@ -93,7 +100,10 @@ def check_valid_authorization_options(options, auth_plugin_name):
)
def check_valid_authentication_options(options, auth_plugin_name):
def check_valid_authentication_options(
options: 'connection.Connection',
auth_plugin_name: str,
) -> None:
"""Validate authentication options, and provide helpful error messages."""
# Get all the options defined within the plugin.
plugin_opts = {
@@ -144,7 +154,9 @@ def check_valid_authentication_options(options, auth_plugin_name):
)
def build_auth_plugins_option_parser(parser):
def build_auth_plugins_option_parser(
parser: argparse.ArgumentParser,
) -> argparse.ArgumentParser:
"""Auth plugins options builder
Builds dynamically the list of options expected by each available
@@ -209,13 +221,13 @@ def build_auth_plugins_option_parser(parser):
def get_keystone2keystone_auth(
local_auth,
service_provider,
project_id=None,
project_name=None,
project_domain_id=None,
project_domain_name=None,
):
local_auth: identity_base.BaseIdentityPlugin,
service_provider: str,
project_id: ty.Optional[str] = None,
project_name: ty.Optional[str] = None,
project_domain_id: ty.Optional[str] = None,
project_domain_name: ty.Optional[str] = None,
) -> k2k.Keystone2Keystone:
"""Return Keystone 2 Keystone authentication for service provider.
:param local_auth: authentication to use with the local Keystone

View File

@@ -13,13 +13,18 @@
"""API Utilities Library"""
import typing as ty
_T = ty.TypeVar('_T', bound=list[ty.Any])
def simple_filter(
data=None,
attr=None,
value=None,
property_field=None,
):
data: ty.Optional[_T] = None,
attr: ty.Optional[str] = None,
value: ty.Optional[str] = None,
property_field: ty.Optional[str] = None,
) -> ty.Optional[_T]:
"""Filter a list of dicts
:param list data:

View File

@@ -288,19 +288,19 @@ class TestShellCli(utils.TestShell):
# Default
utils.fake_execute(_shell, "module list")
self.assertEqual('', _shell.options.cert)
self.assertEqual('', _shell.options.key)
self.assertIsNone(_shell.options.cert)
self.assertIsNone(_shell.options.key)
self.assertIsNone(_shell.client_manager.cert)
# --os-cert
utils.fake_execute(_shell, "--os-cert mycert module list")
self.assertEqual('mycert', _shell.options.cert)
self.assertEqual('', _shell.options.key)
self.assertIsNone(_shell.options.key)
self.assertEqual('mycert', _shell.client_manager.cert)
# --os-key
utils.fake_execute(_shell, "--os-key mickey module list")
self.assertEqual('', _shell.options.cert)
self.assertIsNone(_shell.options.cert)
self.assertEqual('mickey', _shell.options.key)
self.assertIsNone(_shell.client_manager.cert)

View File

@@ -21,6 +21,7 @@ import getpass
import logging
import os
import time
import typing as ty
import warnings
from cliff import columns as cliff_columns
@@ -138,7 +139,7 @@ def calculate_header_and_attrs(column_headers, attrs, parsed_args):
return column_headers, attrs
def env(*vars, **kwargs):
def env(*vars: str, **kwargs: ty.Any) -> ty.Optional[str]:
"""Search for the first defined of possibly many env vars
Returns the first environment variable defined in vars, or
@@ -148,7 +149,11 @@ def env(*vars, **kwargs):
value = os.environ.get(v, None)
if value:
return value
return kwargs.get('default', '')
if 'default' in kwargs and kwargs['default'] is not None:
return str(kwargs['default'])
return None
def find_min_match(items, sort_attr, **kwargs):

View File

@@ -34,6 +34,7 @@ ignore_errors = true
[[tool.mypy.overrides]]
module = [
"osc_lib.api.*",
"osc_lib.exceptions",
]
disallow_untyped_calls = true

View File

@@ -0,0 +1,5 @@
---
fixes:
- |
The ``osc_lib.utils.env`` helper will now consistently return a string
or ``None``.