typing: Resolve remaining initial issues

Just enough to get mypy passing with a minimal config.

Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
Change-Id: I40594b518fab8d72dabeffe3ab3d46fa605ff930
This commit is contained in:
Stephen Finucane 2024-07-30 14:20:04 +01:00
parent 9181ef3474
commit b2dcd89546
27 changed files with 96 additions and 59 deletions

View File

@ -34,7 +34,7 @@ class FairSemaphore:
self._concurrency = concurrency
if concurrency:
self._count = 0
self._queue = queue.Queue()
self._queue: queue.Queue[threading.Event] = queue.Queue()
self._rate_delay = rate_delay
self._rate_last_ts = time.time()

View File

@ -15,6 +15,7 @@
# limitations under the License.
import functools
import typing as ty
from keystoneauth1 import _utils as utils
from keystoneauth1.access import service_catalog
@ -61,7 +62,7 @@ class AccessInfo:
"""
_service_catalog_class = None
_service_catalog_class: ty.Type[service_catalog.ServiceCatalog]
def __init__(self, body, auth_token=None):
self._data = body

View File

@ -18,6 +18,7 @@
import abc
import copy
import typing as ty
from keystoneauth1 import discover
from keystoneauth1 import exceptions
@ -29,6 +30,15 @@ class ServiceCatalog(metaclass=abc.ABCMeta):
def __init__(self, catalog):
self._catalog = catalog
@classmethod
@abc.abstractmethod
def from_token(cls, token):
"""Retrieve the service catalog from a token.
:param token:
:returns: A service catalog.
"""
def _get_endpoint_region(self, endpoint):
return endpoint.get('region_id') or endpoint.get('region')
@ -51,7 +61,7 @@ class ServiceCatalog(metaclass=abc.ABCMeta):
"""
@staticmethod
def normalize_interface(self, interface):
def normalize_interface(interface):
"""Handle differences in the way v2 and v3 catalogs specify endpoint.
Both v2 and v3 must be able to handle the endpoint style of the other.
@ -168,7 +178,7 @@ class ServiceCatalog(metaclass=abc.ABCMeta):
"""
interfaces = self._get_interface_list(interface)
matching_endpoints = {}
matching_endpoints: ty.Dict[str, ty.List[discover.EndpointData]] = {}
for service in self.normalize_catalog():
if service_type and not discover._SERVICE_TYPES.is_match(
@ -214,15 +224,19 @@ class ServiceCatalog(metaclass=abc.ABCMeta):
if not interfaces:
return self._endpoints_by_type(service_type, matching_endpoints)
ret = {}
ret: ty.Dict[str, ty.List[discover.EndpointData]] = {}
for matched_service_type, endpoints in matching_endpoints.items():
if not endpoints:
ret[matched_service_type] = []
continue
matches_by_interface = {}
matches_by_interface: ty.Dict[
str, ty.List[discover.EndpointData]
] = {}
for endpoint in endpoints:
matches_by_interface.setdefault(endpoint.interface, [])
matches_by_interface[endpoint.interface].append(endpoint)
best_interface = [
i for i in interfaces if i in matches_by_interface.keys()
][0]

View File

@ -11,6 +11,7 @@
# under the License.
import os
import typing as ty
import warnings
import requests
@ -329,7 +330,7 @@ class Adapter:
:returns: Endpoint data if available or None.
:rtype: keystoneauth1.discover.EndpointData
"""
kwargs = {}
kwargs: ty.Dict[str, ty.Any] = {}
self._set_endpoint_filter_kwargs(kwargs)
if self.endpoint_override:
kwargs['endpoint_override'] = self.endpoint_override

View File

@ -23,6 +23,7 @@ raw data specified in version discovery responses.
import copy
import re
import typing as ty
import urllib
import os_service_types
@ -834,7 +835,7 @@ class Discover:
return data['url'] if data else None
class VersionData(dict):
class VersionData(ty.Dict[str, ty.Any]):
"""Normalized Version Data about an endpoint."""
def __init__(

View File

@ -15,7 +15,9 @@ import urllib
import uuid
try:
from lxml import etree
# explicitly re-export symbol
# https://mypy.readthedocs.io/en/stable/command_line.html#cmdoption-mypy-no-implicit-reexport
from lxml import etree as etree
except ImportError:
etree = None

View File

@ -11,7 +11,9 @@
# under the License.
try:
from lxml import etree
# explicitly re-export symbol
# https://mypy.readthedocs.io/en/stable/command_line.html#cmdoption-mypy-no-implicit-reexport
from lxml import etree as etree
except ImportError:
etree = None
@ -24,7 +26,7 @@ class _Saml2TokenAuthMethod(v3.AuthMethod):
# TODO(stephenfin): Deprecate and remove unused kwargs
def get_auth_data(self, session, auth, headers, request_kwargs, **kwargs):
raise exceptions.MethodNotImplemented(
raise exceptions.HttpNotImplemented(
'This method should never be called'
)

View File

@ -13,7 +13,9 @@
import abc
try:
from lxml import etree
# explicitly re-export symbol
# https://mypy.readthedocs.io/en/stable/command_line.html#cmdoption-mypy-no-implicit-reexport
from lxml import etree as etree
except ImportError:
etree = None

View File

@ -22,7 +22,9 @@
"""
try:
import requests_kerberos
# explicitly re-export symbol
# https://mypy.readthedocs.io/en/stable/command_line.html#cmdoption-mypy-no-implicit-reexport
import requests_kerberos as requests_kerberos
except ImportError:
requests_kerberos = None
@ -58,6 +60,8 @@ packages. These can be installed with::
class KerberosMethod(v3.AuthMethod):
mutual_auth: str
_method_parameters = ['mutual_auth']
def __init__(self, *args, **kwargs):

View File

@ -42,7 +42,7 @@ class Kerberos(loading.BaseV3Loader):
def load_from_options(self, **kwargs):
if kwargs.get('mutual_auth'):
value = kwargs.get('mutual_auth')
value = kwargs['mutual_auth']
if not (value.lower() in ['required', 'optional', 'disabled']):
m = (
'You need to provide a valid value for kerberos mutual '
@ -81,7 +81,7 @@ class MappedKerberos(loading.BaseFederationLoader):
def load_from_options(self, **kwargs):
if kwargs.get('mutual_auth'):
value = kwargs.get('mutual_auth')
value = kwargs['mutual_auth']
if not (value.lower() in ['required', 'optional', 'disabled']):
m = (
'You need to provide a valid value for kerberos mutual '

View File

@ -24,7 +24,9 @@
import logging
try:
from oauthlib import oauth1
# explicitly re-export symbol
# https://mypy.readthedocs.io/en/stable/command_line.html#cmdoption-mypy-no-implicit-reexport
from oauthlib import oauth1 as oauth1
except ImportError:
oauth1 = None

View File

@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from keystoneauth1 import _utils as utils
__all__ = ('DiscoveryList', 'V2Discovery', 'V3Discovery', 'VersionDiscovery')
@ -17,7 +19,7 @@ __all__ = ('DiscoveryList', 'V2Discovery', 'V3Discovery', 'VersionDiscovery')
_DEFAULT_DAYS_AGO = 30
class DiscoveryBase(dict):
class DiscoveryBase(ty.Dict[str, ty.Any]):
"""The basic version discovery structure.
All version discovery elements should have access to these values.
@ -281,7 +283,7 @@ class V3Discovery(DiscoveryBase):
)
class DiscoveryList(dict):
class DiscoveryList(ty.Dict[str, ty.Any]):
"""A List of version elements.
Creates a correctly structured list of identity service endpoints for

View File

@ -81,7 +81,7 @@ class YamlJsonSerializer(betamax.serializers.base.BaseSerializer):
class MyDumper(yaml.Dumper):
"""Specialized Dumper which does nice blocks and unicode."""
yaml.representer.BaseRepresenter.represent_scalar = _represent_scalar
yaml.representer.BaseRepresenter.represent_scalar = _represent_scalar # type: ignore[method-assign]
MyDumper.add_representer(str, _unicode_representer)

View File

@ -11,13 +11,14 @@
# under the License.
import datetime
import typing as ty
import uuid
from keystoneauth1 import _utils
from keystoneauth1.fixture import exception
class _Service(dict):
class _Service(ty.Dict[str, ty.Any]):
def add_endpoint(
self,
public,
@ -40,7 +41,7 @@ class _Service(dict):
return data
class Token(dict):
class Token(ty.Dict[str, ty.Any]):
"""A V2 Keystone token that can be used for testing.
This object is designed to allow clients to generate a correct V2 token for
@ -229,14 +230,14 @@ class Token(dict):
self._token['audit_ids'] = [self.audit_id, value]
def validate(self):
scoped = 'tenant' in self.token
scoped = 'tenant' in self._token
catalog = self.root.get('serviceCatalog')
if catalog and not scoped:
msg = 'You cannot have a service catalog on an unscoped token'
raise exception.FixtureValidationError(msg)
if scoped and not self.user.get('roles'):
if scoped and not self._user.get('roles'):
msg = 'You must have roles on a token to scope it'
raise exception.FixtureValidationError(msg)

View File

@ -11,13 +11,14 @@
# under the License.
import datetime
import typing as ty
import uuid
from keystoneauth1 import _utils
from keystoneauth1.fixture import exception
class _Service(dict):
class _Service(ty.Dict[str, ty.Any]):
"""One of the services that exist in the catalog.
You use this by adding a service to a token which returns an instance of
@ -50,7 +51,7 @@ class _Service(dict):
return ret
class Token(dict):
class Token(ty.Dict[str, ty.Any]):
"""A V3 Keystone token that can be used for testing.
This object is designed to allow clients to generate a correct V3 token for
@ -466,7 +467,7 @@ class Token(dict):
msg = 'You cannot have a service catalog on an unscoped token'
raise exception.FixtureValidationError(msg)
if scoped and not self.user.get('roles'):
if scoped and not self._user.get('roles'):
msg = 'You must have roles on a token to scope it'
raise exception.FixtureValidationError(msg)

View File

@ -16,6 +16,7 @@ import functools
import hashlib
import json
import threading
import typing as ty
from keystoneauth1 import _utils as utils
from keystoneauth1 import access
@ -594,7 +595,9 @@ class BaseIdentityPlugin(plugin.BaseAuthPlugin, metaclass=abc.ABCMeta):
"""
service_types = discover._SERVICE_TYPES
catalog = self.get_access(session).service_catalog
version_data = {}
version_data: ty.Dict[
str, ty.Dict[str, ty.Dict[str, ty.List[discover.VersionData]]]
] = {}
endpoints_data = catalog.get_endpoints_data(
interface=interface,
region_name=region_name,

View File

@ -12,6 +12,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from keystoneauth1.identity.v3 import base
@ -57,7 +59,9 @@ class ApplicationCredentialMethod(base.AuthMethod):
# TODO(stephenfin): Deprecate and remove unused kwargs
def get_auth_data(self, session, auth, headers, request_kwargs, **kwargs):
auth_data = {'secret': self.application_credential_secret}
auth_data: ty.Dict[str, ty.Any] = {
'secret': self.application_credential_secret
}
if self.application_credential_id:
auth_data['id'] = self.application_credential_id

View File

@ -12,6 +12,9 @@
import abc
import json
import typing as ty
import typing_extensions as ty_ext
from keystoneauth1 import _utils as utils
from keystoneauth1 import access
@ -88,6 +91,15 @@ class BaseAuth(base.BaseIdentityPlugin, metaclass=abc.ABCMeta):
)
class _AuthIdentity(ty.TypedDict):
identity: ty.Dict[str, ty.Any]
scope: ty_ext.NotRequired[ty.Union[ty.Dict[str, ty.Any], str]]
class _AuthBody(ty.TypedDict):
auth: _AuthIdentity
class Auth(BaseAuth):
"""Identity V3 Authentication Plugin.
@ -120,9 +132,10 @@ class Auth(BaseAuth):
def get_auth_ref(self, session, **kwargs):
headers = {'Accept': 'application/json'}
body = {'auth': {'identity': {}}}
body: _AuthBody = {'auth': {'identity': {}}}
ident = body['auth']['identity']
rkwargs = {}
# this is passed around for its side-effects
rkwargs: ty.Dict[str, ty.Any] = {}
for method in self.auth_methods:
name, auth_data = method.get_auth_data(
@ -254,7 +267,7 @@ class AuthMethod(metaclass=abc.ABCMeta):
the factory method and don't work as well with AuthConstructors.
"""
_method_parameters = []
_method_parameters: ty.List[str] = []
def __init__(self, **kwargs):
for param in self._method_parameters:
@ -312,7 +325,7 @@ class AuthConstructor(Auth, metaclass=abc.ABCMeta):
creates the auth plugin with only that authentication method.
"""
_auth_method_class = None
_auth_method_class: ty.Type[AuthMethod]
def __init__(self, auth_url, *args, **kwargs):
method_kwargs = self._auth_method_class._extract_kwargs(kwargs)

View File

@ -44,7 +44,7 @@ class _OidcBase(federation.FederationBaseAuth, metaclass=abc.ABCMeta):
``http://openid.net/specs/openid-connect-core-1_0.html``
"""
grant_type = None
grant_type: str
def __init__(
self,

View File

@ -10,8 +10,9 @@
# License for the specific language governing permissions and limitations
# under the License.
from keystoneauth1.identity.v3 import base
import typing as ty
from keystoneauth1.identity.v3 import base
__all__ = ('PasswordMethod', 'Password')
@ -42,7 +43,7 @@ class PasswordMethod(base.AuthMethod):
# TODO(stephenfin): Deprecate and remove unused kwargs
def get_auth_data(self, session, auth, headers, request_kwargs, **kwargs):
user = {'password': self.password}
user: ty.Dict[str, ty.Any] = {'password': self.password}
if self.user_id:
user['id'] = self.user_id

View File

@ -11,6 +11,7 @@
# under the License.
import copy
import typing as ty
from keystoneauth1.identity.v3 import base
@ -44,7 +45,7 @@ class TOTPMethod(base.AuthMethod):
# TODO(stephenfin): Deprecate and remove unused kwargs
def get_auth_data(self, session, auth, headers, request_kwargs, **kwargs):
user = {'passcode': self.passcode}
user: ty.Dict[str, ty.Any] = {'passcode': self.passcode}
if self.user_id:
user['id'] = self.user_id

View File

@ -95,7 +95,7 @@ def load_from_argparse_arguments(namespace, **kwargs):
if not namespace.os_auth_type:
return None
if isinstance(namespace.os_auth_type, type):
if isinstance(namespace.os_auth_type, base.BaseLoader):
plugin = namespace.os_auth_type
else:
plugin = base.get_plugin_loader(namespace.os_auth_type)

View File

@ -129,12 +129,6 @@ class Opt:
and self.metavar == other.metavar
)
# NOTE: This function is only needed by Python 2. If we get to point where
# we don't support Python 2 anymore, this function should be removed.
def __ne__(self, other):
"""Define inequality operator on option parameters."""
return not self.__eq__(other)
@property
def _all_opts(self):
return itertools.chain([self], self.deprecated)

View File

@ -244,6 +244,8 @@ class RequestTiming:
class _Retries:
__slots__ = ('_fixed_delay', '_current')
_fixed_delay: float
_current: float
def __init__(self, fixed_delay=None):
self._fixed_delay = fixed_delay
@ -261,9 +263,6 @@ class _Retries:
else:
self._current = _EXPONENTIAL_DELAY_START
# Python 2 compatibility
next = __next__
class Session:
"""Maintains client communication state and common functionality.

View File

@ -68,12 +68,6 @@ class BoolType:
# hack around oslo.config equality comparison
return type(self) is type(other)
# NOTE: This function is only needed by Python 2. If we get to point where
# we don't support Python 2 anymore, this function should be removed.
def __ne__(self, other):
"""Define inequiality for many bool types."""
return not self.__eq__(other)
def __call__(self, value):
return str(value).lower() in ('1', 'true', 't', 'yes', 'y')

View File

@ -144,12 +144,6 @@ class TestResponse(requests.Response):
"""Define equiality behavior of request and response."""
return self.__dict__ == other.__dict__
# NOTE: This function is only needed by Python 2. If we get to point where
# we don't support Python 2 anymore, this function should be removed.
def __ne__(self, other):
"""Define inequiality behavior of request and response."""
return not self.__eq__(other)
@property
def text(self):
return self.content

View File

@ -15,4 +15,5 @@ pbr>=2.0.0 # Apache-2.0
iso8601>=0.1.11 # MIT
requests>=2.14.2 # Apache-2.0
stevedore>=1.20.0 # Apache-2.0
os-service-types>=1.2.0 # Apache-2.0
os-service-types>=1.2.0 # Apache-2.0
typing-extensions>=4.12 # PSF