typing: Make loaders broadly generic
Yet more fanciness. A Loader class always has a 1:1 relationship with a given Plugin class. The way to indicate this in the world of typing is via a generic. Start doing this, allowing us to remove a whole load of casts (including subclass methods that exist purely to allow us to cast the return type) and generally tidy everything up a little. Signed-off-by: Stephen Finucane <stephenfin@redhat.com> Change-Id: Ic77def3da3453614944589d59a86ec9fef44c955
This commit is contained in:
parent
e26894c708
commit
92746dfde9
@ -17,7 +17,7 @@ from keystoneauth1 import loading
|
||||
from keystoneauth1.loading import opts
|
||||
|
||||
|
||||
class Saml2Password(loading.BaseFederationLoader):
|
||||
class Saml2Password(loading.BaseFederationLoader[_saml2.V3Saml2Password]):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[_saml2.V3Saml2Password]:
|
||||
return _saml2.V3Saml2Password
|
||||
@ -49,7 +49,7 @@ class Saml2Password(loading.BaseFederationLoader):
|
||||
return options
|
||||
|
||||
|
||||
class ADFSPassword(loading.BaseFederationLoader):
|
||||
class ADFSPassword(loading.BaseFederationLoader[_saml2.V3ADFSPassword]):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[_saml2.V3ADFSPassword]:
|
||||
return _saml2.V3ADFSPassword
|
||||
|
@ -18,7 +18,7 @@ from keystoneauth1 import loading
|
||||
from keystoneauth1.loading import opts
|
||||
|
||||
|
||||
class Kerberos(loading.BaseV3Loader):
|
||||
class Kerberos(loading.BaseV3Loader[kerberos.Kerberos]):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[kerberos.Kerberos]:
|
||||
return kerberos.Kerberos
|
||||
@ -54,12 +54,10 @@ class Kerberos(loading.BaseV3Loader):
|
||||
)
|
||||
raise exceptions.OptionError(m)
|
||||
|
||||
# NOTE(stephenfin): Cast is here because I can't figure out how to make
|
||||
# ensure a loader always maps 1:1 with a plugin
|
||||
return ty.cast(kerberos.Kerberos, super().load_from_options(**kwargs))
|
||||
return super().load_from_options(**kwargs)
|
||||
|
||||
|
||||
class MappedKerberos(loading.BaseFederationLoader):
|
||||
class MappedKerberos(loading.BaseFederationLoader[kerberos.MappedKerberos]):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[kerberos.MappedKerberos]:
|
||||
return kerberos.MappedKerberos
|
||||
@ -95,6 +93,4 @@ class MappedKerberos(loading.BaseFederationLoader):
|
||||
)
|
||||
raise exceptions.OptionError(m)
|
||||
|
||||
return ty.cast(
|
||||
kerberos.MappedKerberos, super().load_from_options(**kwargs)
|
||||
)
|
||||
return super().load_from_options(**kwargs)
|
||||
|
@ -19,7 +19,7 @@ from keystoneauth1.loading import opts
|
||||
|
||||
# NOTE(jamielennox): This is not a BaseV3Loader because we don't want to
|
||||
# include the scoping options like project-id in the option list
|
||||
class V3OAuth1(loading.BaseIdentityLoader):
|
||||
class V3OAuth1(loading.BaseIdentityLoader[v3.OAuth1]):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[v3.OAuth1]:
|
||||
return v3.OAuth1
|
||||
|
@ -92,7 +92,7 @@ class TestPlugin(plugin.BaseAuthPlugin):
|
||||
# functions, everything else is on a best effort basis.
|
||||
|
||||
|
||||
class _TestPluginLoader(loading.BaseLoader):
|
||||
class _TestPluginLoader(loading.BaseLoader[TestPlugin]):
|
||||
def __init__(self, plugin):
|
||||
super().__init__()
|
||||
self._plugin = plugin
|
||||
|
@ -14,7 +14,7 @@ import typing as ty
|
||||
|
||||
from keystoneauth1.identity.v3 import base
|
||||
from keystoneauth1 import loading
|
||||
|
||||
from keystoneauth1 import plugin
|
||||
|
||||
__all__ = ('MultiFactor',)
|
||||
|
||||
@ -60,7 +60,10 @@ class MultiFactor(base.Auth):
|
||||
method_keys: ty.Set[str] = set()
|
||||
for method in auth_methods:
|
||||
# Using the loaders we pull the related auth method class
|
||||
plugin_class = loading.get_plugin_loader(method).plugin_class
|
||||
loader: loading.BaseLoader[plugin.BaseAuthPlugin] = (
|
||||
loading.get_plugin_loader(method)
|
||||
)
|
||||
plugin_class = loader.plugin_class
|
||||
if not issubclass(plugin_class, base.AuthConstructor):
|
||||
raise TypeError(
|
||||
'The multifactor auth method can only be used with v3 '
|
||||
|
@ -40,6 +40,9 @@ __all__ = (
|
||||
SENSITIVE_KEYS = ("password", "code", "token", "secret")
|
||||
|
||||
|
||||
_OidcBaseT = ty.TypeVar('_OidcBaseT', bound='_OidcBase')
|
||||
|
||||
|
||||
class _OidcBase(federation.FederationBaseAuth, metaclass=abc.ABCMeta):
|
||||
"""Base class for different OpenID Connect based flows.
|
||||
|
||||
|
@ -17,7 +17,7 @@ from keystoneauth1.loading import opts
|
||||
from keystoneauth1 import token_endpoint
|
||||
|
||||
|
||||
class AdminToken(loading.BaseLoader):
|
||||
class AdminToken(loading.BaseLoader[token_endpoint.Token]):
|
||||
"""Use an existing token and a known endpoint to perform requests.
|
||||
|
||||
This plugin is primarily useful for development or for use with identity
|
||||
|
@ -17,7 +17,7 @@ from keystoneauth1 import loading
|
||||
from keystoneauth1.loading import opts
|
||||
|
||||
|
||||
class HTTPBasicAuth(loading.BaseLoader):
|
||||
class HTTPBasicAuth(loading.BaseLoader[http_basic.HTTPBasicAuth]):
|
||||
"""Use HTTP Basic authentication to perform requests.
|
||||
|
||||
This can be used to instantiate clients for services deployed in
|
||||
|
@ -17,7 +17,7 @@ from keystoneauth1 import loading
|
||||
from keystoneauth1.loading import opts
|
||||
|
||||
|
||||
class Token(loading.BaseGenericLoader):
|
||||
class Token(loading.BaseGenericLoader[identity.Token]):
|
||||
"""Given an existing token rescope it to another target.
|
||||
|
||||
This plugin uses the Identity service's rescope mechanism to get a new
|
||||
@ -49,7 +49,7 @@ class Token(loading.BaseGenericLoader):
|
||||
return options
|
||||
|
||||
|
||||
class Password(loading.BaseGenericLoader):
|
||||
class Password(loading.BaseGenericLoader[identity.Password]):
|
||||
"""Authenticate via a username and password.
|
||||
|
||||
Authenticate to the identity service using an inbuilt username and
|
||||
|
@ -17,7 +17,7 @@ from keystoneauth1 import loading
|
||||
from keystoneauth1.loading import opts
|
||||
|
||||
|
||||
class Token(loading.BaseV2Loader):
|
||||
class Token(loading.BaseV2Loader[identity.V2Token]):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V2Token]:
|
||||
return identity.V2Token
|
||||
@ -30,7 +30,7 @@ class Token(loading.BaseV2Loader):
|
||||
return options
|
||||
|
||||
|
||||
class Password(loading.BaseV2Loader):
|
||||
class Password(loading.BaseV2Loader[identity.V2Password]):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V2Password]:
|
||||
return identity.V2Password
|
||||
|
@ -14,12 +14,10 @@ import typing as ty
|
||||
|
||||
from keystoneauth1 import exceptions
|
||||
from keystoneauth1 import identity
|
||||
from keystoneauth1.identity.v3 import oidc
|
||||
from keystoneauth1 import loading
|
||||
from keystoneauth1.loading import opts
|
||||
|
||||
if ty.TYPE_CHECKING:
|
||||
from keystoneauth1.identity.v3 import oidc
|
||||
|
||||
|
||||
def _add_common_identity_options(options: ty.List[opts.Opt]) -> None:
|
||||
options.extend(
|
||||
@ -48,7 +46,7 @@ def _assert_identity_options(options: ty.Dict[str, ty.Any]) -> None:
|
||||
raise exceptions.OptionError(m)
|
||||
|
||||
|
||||
class Password(loading.BaseV3Loader):
|
||||
class Password(loading.BaseV3Loader[identity.V3Password]):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V3Password]:
|
||||
return identity.V3Password
|
||||
@ -72,13 +70,11 @@ class Password(loading.BaseV3Loader):
|
||||
|
||||
def load_from_options(self, **kwargs: ty.Any) -> identity.V3Password:
|
||||
_assert_identity_options(kwargs)
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast(
|
||||
identity.V3Password, super().load_from_options(**kwargs)
|
||||
)
|
||||
|
||||
return super().load_from_options(**kwargs)
|
||||
|
||||
|
||||
class Token(loading.BaseV3Loader):
|
||||
class Token(loading.BaseV3Loader[identity.V3Token]):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V3Token]:
|
||||
return identity.V3Token
|
||||
@ -96,13 +92,9 @@ class Token(loading.BaseV3Loader):
|
||||
|
||||
return options
|
||||
|
||||
def load_from_options(self, **kwargs: ty.Any) -> identity.V3Token:
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast(identity.V3Token, super().load_from_options(**kwargs))
|
||||
|
||||
|
||||
class _OpenIDConnectBase(loading.BaseFederationLoader):
|
||||
def load_from_options(self, **kwargs: ty.Any) -> 'oidc._OidcBase':
|
||||
class _OpenIDConnectBase(loading.BaseFederationLoader[oidc._OidcBaseT]):
|
||||
def load_from_options(self, **kwargs: ty.Any) -> oidc._OidcBaseT:
|
||||
if not (
|
||||
kwargs.get('access_token_endpoint')
|
||||
or kwargs.get('discovery_endpoint')
|
||||
@ -113,8 +105,7 @@ class _OpenIDConnectBase(loading.BaseFederationLoader):
|
||||
)
|
||||
raise exceptions.OptionError(m)
|
||||
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast('oidc._OidcBase', super().load_from_options(**kwargs))
|
||||
return super().load_from_options(**kwargs)
|
||||
|
||||
def get_options(self) -> ty.List[opts.Opt]:
|
||||
options = super().get_options()
|
||||
@ -166,7 +157,9 @@ class _OpenIDConnectBase(loading.BaseFederationLoader):
|
||||
return options
|
||||
|
||||
|
||||
class OpenIDConnectClientCredentials(_OpenIDConnectBase):
|
||||
class OpenIDConnectClientCredentials(
|
||||
_OpenIDConnectBase[identity.V3OidcClientCredentials]
|
||||
):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V3OidcClientCredentials]:
|
||||
return identity.V3OidcClientCredentials
|
||||
@ -176,17 +169,8 @@ class OpenIDConnectClientCredentials(_OpenIDConnectBase):
|
||||
|
||||
return options
|
||||
|
||||
def load_from_options(
|
||||
self, **kwargs: ty.Any
|
||||
) -> identity.V3OidcClientCredentials:
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast(
|
||||
identity.V3OidcClientCredentials,
|
||||
super().load_from_options(**kwargs),
|
||||
)
|
||||
|
||||
|
||||
class OpenIDConnectPassword(_OpenIDConnectBase):
|
||||
class OpenIDConnectPassword(_OpenIDConnectBase[identity.V3OidcPassword]):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V3OidcPassword]:
|
||||
return identity.V3OidcPassword
|
||||
@ -211,14 +195,10 @@ class OpenIDConnectPassword(_OpenIDConnectBase):
|
||||
|
||||
return options
|
||||
|
||||
def load_from_options(self, **kwargs: ty.Any) -> identity.V3OidcPassword:
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast(
|
||||
identity.V3OidcPassword, super().load_from_options(**kwargs)
|
||||
)
|
||||
|
||||
|
||||
class OpenIDConnectAuthorizationCode(_OpenIDConnectBase):
|
||||
class OpenIDConnectAuthorizationCode(
|
||||
_OpenIDConnectBase[identity.V3OidcAuthorizationCode]
|
||||
):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V3OidcAuthorizationCode]:
|
||||
return identity.V3OidcAuthorizationCode
|
||||
@ -243,17 +223,10 @@ class OpenIDConnectAuthorizationCode(_OpenIDConnectBase):
|
||||
|
||||
return options
|
||||
|
||||
def load_from_options(
|
||||
self, **kwargs: ty.Any
|
||||
) -> identity.V3OidcAuthorizationCode:
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast(
|
||||
identity.V3OidcAuthorizationCode,
|
||||
super().load_from_options(**kwargs),
|
||||
)
|
||||
|
||||
|
||||
class OpenIDConnectAccessToken(loading.BaseFederationLoader):
|
||||
class OpenIDConnectAccessToken(
|
||||
loading.BaseFederationLoader[identity.V3OidcAccessToken]
|
||||
):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V3OidcAccessToken]:
|
||||
return identity.V3OidcAccessToken
|
||||
@ -273,16 +246,10 @@ class OpenIDConnectAccessToken(loading.BaseFederationLoader):
|
||||
)
|
||||
return options
|
||||
|
||||
def load_from_options(
|
||||
self, **kwargs: ty.Any
|
||||
) -> identity.V3OidcAccessToken:
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast(
|
||||
identity.V3OidcAccessToken, super().load_from_options(**kwargs)
|
||||
)
|
||||
|
||||
|
||||
class OpenIDConnectDeviceAuthorization(_OpenIDConnectBase):
|
||||
class OpenIDConnectDeviceAuthorization(
|
||||
_OpenIDConnectBase[identity.V3OidcDeviceAuthorization]
|
||||
):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V3OidcDeviceAuthorization]:
|
||||
return identity.V3OidcDeviceAuthorization
|
||||
@ -311,17 +278,8 @@ class OpenIDConnectDeviceAuthorization(_OpenIDConnectBase):
|
||||
|
||||
return options
|
||||
|
||||
def load_from_options(
|
||||
self, **kwargs: ty.Any
|
||||
) -> identity.V3OidcDeviceAuthorization:
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast(
|
||||
identity.V3OidcDeviceAuthorization,
|
||||
super().load_from_options(**kwargs),
|
||||
)
|
||||
|
||||
|
||||
class TOTP(loading.BaseV3Loader):
|
||||
class TOTP(loading.BaseV3Loader[identity.V3TOTP]):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V3TOTP]:
|
||||
return identity.V3TOTP
|
||||
@ -345,11 +303,11 @@ class TOTP(loading.BaseV3Loader):
|
||||
|
||||
def load_from_options(self, **kwargs: ty.Any) -> identity.V3TOTP:
|
||||
_assert_identity_options(kwargs)
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast(identity.V3TOTP, super().load_from_options(**kwargs))
|
||||
|
||||
return super().load_from_options(**kwargs)
|
||||
|
||||
|
||||
class TokenlessAuth(loading.BaseLoader):
|
||||
class TokenlessAuth(loading.BaseLoader[identity.V3TokenlessAuth]):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V3TokenlessAuth]:
|
||||
return identity.V3TokenlessAuth
|
||||
@ -402,14 +360,12 @@ class TokenlessAuth(loading.BaseLoader):
|
||||
)
|
||||
raise exceptions.OptionError(m)
|
||||
|
||||
return ty.cast(
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
identity.V3TokenlessAuth,
|
||||
super().load_from_options(**kwargs),
|
||||
)
|
||||
return super().load_from_options(**kwargs)
|
||||
|
||||
|
||||
class ApplicationCredential(loading.BaseV3Loader):
|
||||
class ApplicationCredential(
|
||||
loading.BaseV3Loader[identity.V3ApplicationCredential]
|
||||
):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V3ApplicationCredential]:
|
||||
return identity.V3ApplicationCredential
|
||||
@ -455,14 +411,10 @@ class ApplicationCredential(loading.BaseV3Loader):
|
||||
m = 'You must provide an auth secret.'
|
||||
raise exceptions.OptionError(m)
|
||||
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast(
|
||||
identity.V3ApplicationCredential,
|
||||
super().load_from_options(**kwargs),
|
||||
)
|
||||
return super().load_from_options(**kwargs)
|
||||
|
||||
|
||||
class MultiFactor(loading.BaseV3Loader):
|
||||
class MultiFactor(loading.BaseV3Loader[identity.V3MultiFactor]):
|
||||
def __init__(self) -> None:
|
||||
super().__init__()
|
||||
self._methods = None
|
||||
@ -501,13 +453,12 @@ class MultiFactor(loading.BaseV3Loader):
|
||||
|
||||
self._methods = kwargs['auth_methods']
|
||||
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast(
|
||||
identity.V3MultiFactor, super().load_from_options(**kwargs)
|
||||
)
|
||||
return super().load_from_options(**kwargs)
|
||||
|
||||
|
||||
class OAuth2ClientCredential(loading.BaseV3Loader):
|
||||
class OAuth2ClientCredential(
|
||||
loading.BaseV3Loader[identity.V3OAuth2ClientCredential]
|
||||
):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V3OAuth2ClientCredential]:
|
||||
return identity.V3OAuth2ClientCredential
|
||||
@ -550,14 +501,12 @@ class OAuth2ClientCredential(loading.BaseV3Loader):
|
||||
m = 'You must provide an OAuth2.0 client credential auth secret.'
|
||||
raise exceptions.OptionError(m)
|
||||
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast(
|
||||
identity.V3OAuth2ClientCredential,
|
||||
super().load_from_options(**kwargs),
|
||||
)
|
||||
return super().load_from_options(**kwargs)
|
||||
|
||||
|
||||
class OAuth2mTlsClientCredential(loading.BaseV3Loader):
|
||||
class OAuth2mTlsClientCredential(
|
||||
loading.BaseV3Loader[identity.V3OAuth2mTlsClientCredential]
|
||||
):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[identity.V3OAuth2mTlsClientCredential]:
|
||||
return identity.V3OAuth2mTlsClientCredential
|
||||
@ -593,8 +542,4 @@ class OAuth2mTlsClientCredential(loading.BaseV3Loader):
|
||||
'OAuth2.0 Mutual-TLS Authorization.'
|
||||
)
|
||||
raise exceptions.OptionError(m)
|
||||
# TODO(stephenfin): Avoid the need for this cast with generics?
|
||||
return ty.cast(
|
||||
identity.V3OAuth2mTlsClientCredential,
|
||||
super().load_from_options(**kwargs),
|
||||
)
|
||||
return super().load_from_options(**kwargs)
|
||||
|
@ -17,7 +17,7 @@ from keystoneauth1.loading import opts
|
||||
from keystoneauth1 import noauth
|
||||
|
||||
|
||||
class NoAuth(loading.BaseLoader):
|
||||
class NoAuth(loading.BaseLoader[noauth.NoAuth]):
|
||||
"""Use no tokens to perform requests.
|
||||
|
||||
This can be used to instantiate clients for services deployed in
|
||||
|
@ -17,10 +17,10 @@ import stevedore
|
||||
from stevedore import extension
|
||||
|
||||
from keystoneauth1 import exceptions
|
||||
from keystoneauth1 import plugin
|
||||
|
||||
if ty.TYPE_CHECKING:
|
||||
from keystoneauth1.loading import opts
|
||||
from keystoneauth1 import plugin # noqa: F401
|
||||
|
||||
PLUGIN_NAMESPACE = 'keystoneauth1.plugin'
|
||||
|
||||
@ -34,6 +34,8 @@ __all__ = (
|
||||
'PLUGIN_NAMESPACE',
|
||||
)
|
||||
|
||||
T = ty.TypeVar('T', covariant=True)
|
||||
|
||||
|
||||
def _auth_plugin_available(ext: extension.Extension) -> bool:
|
||||
"""Read the value of available for whether to load this plugin."""
|
||||
@ -58,7 +60,9 @@ def get_available_plugin_names() -> ty.FrozenSet[str]:
|
||||
return frozenset(mgr.names())
|
||||
|
||||
|
||||
def get_available_plugin_loaders() -> ty.Dict[str, 'BaseLoader']:
|
||||
def get_available_plugin_loaders() -> (
|
||||
ty.Dict[str, 'BaseLoader[plugin.BaseAuthPluginT]']
|
||||
):
|
||||
"""Retrieve all the plugin classes available on the system.
|
||||
|
||||
:returns: A dict with plugin entrypoint name as the key and the plugin
|
||||
@ -75,7 +79,7 @@ def get_available_plugin_loaders() -> ty.Dict[str, 'BaseLoader']:
|
||||
return dict(mgr.map(lambda ext: (ext.entry_point.name, ext.obj)))
|
||||
|
||||
|
||||
def get_plugin_loader(name: str) -> 'BaseLoader':
|
||||
def get_plugin_loader(name: str) -> 'BaseLoader[plugin.BaseAuthPluginT]':
|
||||
"""Retrieve a plugin class by its entrypoint name.
|
||||
|
||||
:param str name: The name of the object to get.
|
||||
@ -93,7 +97,7 @@ def get_plugin_loader(name: str) -> 'BaseLoader':
|
||||
except RuntimeError:
|
||||
raise exceptions.NoMatchingPlugin(name)
|
||||
|
||||
return ty.cast('BaseLoader', mgr.driver)
|
||||
return ty.cast('BaseLoader[plugin.BaseAuthPluginT]', mgr.driver)
|
||||
|
||||
|
||||
def get_plugin_options(name: str) -> ty.List['opts.Opt']:
|
||||
@ -110,9 +114,6 @@ def get_plugin_options(name: str) -> ty.List['opts.Opt']:
|
||||
return get_plugin_loader(name).get_options()
|
||||
|
||||
|
||||
T = ty.TypeVar('T')
|
||||
|
||||
|
||||
class _BaseLoader(ty.Generic[T], metaclass=abc.ABCMeta):
|
||||
@property
|
||||
def plugin_class(self) -> ty.Type[T]:
|
||||
@ -204,4 +205,4 @@ class _BaseLoader(ty.Generic[T], metaclass=abc.ABCMeta):
|
||||
return self.load_from_options(**kwargs)
|
||||
|
||||
|
||||
class BaseLoader(_BaseLoader['plugin.BaseAuthPlugin']): ...
|
||||
class BaseLoader(_BaseLoader[plugin.BaseAuthPluginT]): ...
|
||||
|
@ -26,7 +26,7 @@ __all__ = ('register_argparse_arguments', 'load_from_argparse_arguments')
|
||||
|
||||
def _register_plugin_argparse_arguments(
|
||||
parser: ty.Union[argparse.ArgumentParser, argparse._ArgumentGroup],
|
||||
plugin: base.BaseLoader,
|
||||
plugin: base.BaseLoader['plugin.BaseAuthPluginT'],
|
||||
) -> None:
|
||||
for opt in plugin.get_options():
|
||||
parser.add_argument(
|
||||
@ -40,7 +40,7 @@ def _register_plugin_argparse_arguments(
|
||||
|
||||
def register_argparse_arguments(
|
||||
parser: argparse.ArgumentParser, argv: ty.List[str], default: ty.Any = None
|
||||
) -> ty.Optional[base.BaseLoader]:
|
||||
) -> ty.Optional[base.BaseLoader['plugin.BaseAuthPluginT']]:
|
||||
"""Register CLI options needed to create a plugin.
|
||||
|
||||
The function inspects the provided arguments so that it can also register
|
||||
@ -90,7 +90,7 @@ def register_argparse_arguments(
|
||||
|
||||
def load_from_argparse_arguments(
|
||||
namespace: argparse.Namespace, **kwargs: ty.Any
|
||||
) -> ty.Optional['plugin.BaseAuthPlugin']:
|
||||
) -> ty.Optional['plugin.BaseAuthPluginT']:
|
||||
"""Retrieve the created plugin from the completed argparse results.
|
||||
|
||||
Loads and creates the auth plugin from the information parsed from the
|
||||
@ -104,15 +104,18 @@ def load_from_argparse_arguments(
|
||||
:raises keystoneauth1.exceptions.auth_plugins.NoMatchingPlugin:
|
||||
if a plugin cannot be created.
|
||||
"""
|
||||
if not namespace.os_auth_type:
|
||||
os_auth_type = namespace.os_auth_type
|
||||
|
||||
if not os_auth_type:
|
||||
return None
|
||||
|
||||
if isinstance(namespace.os_auth_type, base.BaseLoader):
|
||||
plugin = namespace.os_auth_type
|
||||
loader: base.BaseLoader[plugin.BaseAuthPluginT]
|
||||
if isinstance(os_auth_type, base.BaseLoader):
|
||||
loader = os_auth_type
|
||||
else:
|
||||
plugin = base.get_plugin_loader(namespace.os_auth_type)
|
||||
loader = base.get_plugin_loader(os_auth_type)
|
||||
|
||||
def _getter(opt: 'opts.Opt') -> ty.Any:
|
||||
return getattr(namespace, f'os_{opt.dest}')
|
||||
|
||||
return plugin.load_from_options_getter(_getter, **kwargs)
|
||||
return loader.load_from_options_getter(_getter, **kwargs)
|
||||
|
@ -54,7 +54,9 @@ def get_common_conf_options() -> ty.List['cfg.Opt']:
|
||||
|
||||
|
||||
def get_plugin_conf_options(
|
||||
plugin: ty.Union[base.BaseLoader, str],
|
||||
plugin: ty.Union[
|
||||
base.BaseLoader[keystoneauth1.plugin.BaseAuthPluginT], str
|
||||
],
|
||||
) -> ty.List['cfg.Opt']:
|
||||
"""Get the oslo_config options for a specific plugin.
|
||||
|
||||
@ -62,7 +64,7 @@ def get_plugin_conf_options(
|
||||
the specified plugin.
|
||||
|
||||
:param plugin: The name of the plugin loader or a plugin loader object
|
||||
:type plugin: str or keystoneauth1._loading.BaseLoader
|
||||
:type plugin: str or keystoneauth1.loading.BaseLoader
|
||||
|
||||
:returns: A list of oslo_config options.
|
||||
"""
|
||||
@ -136,7 +138,9 @@ def load_from_conf_options(
|
||||
if not name:
|
||||
return None
|
||||
|
||||
loader = base.get_plugin_loader(name)
|
||||
loader: base.BaseLoader[keystoneauth1.plugin.BaseAuthPlugin] = (
|
||||
base.get_plugin_loader(name)
|
||||
)
|
||||
loader_opts = loader.get_options()
|
||||
oslo_opts = [o._to_oslo_opt() for o in loader_opts]
|
||||
|
||||
|
@ -17,7 +17,10 @@ from keystoneauth1.loading import base
|
||||
from keystoneauth1.loading import opts
|
||||
|
||||
if ty.TYPE_CHECKING:
|
||||
from keystoneauth1 import plugin
|
||||
from keystoneauth1.identity import base as base_plugins
|
||||
from keystoneauth1.identity import generic as generic_plugins
|
||||
from keystoneauth1.identity import v2 as v2_plugins
|
||||
from keystoneauth1.identity import v3 as v3_plugins
|
||||
|
||||
__all__ = (
|
||||
'BaseIdentityLoader',
|
||||
@ -28,7 +31,18 @@ __all__ = (
|
||||
)
|
||||
|
||||
|
||||
class BaseIdentityLoader(base.BaseLoader):
|
||||
PluginT = ty.TypeVar('PluginT', bound='base_plugins.BaseIdentityPlugin')
|
||||
V2PluginT = ty.TypeVar('V2PluginT', bound='v2_plugins.Auth')
|
||||
V3PluginT = ty.TypeVar('V3PluginT', bound='v3_plugins.BaseAuth')
|
||||
V3FederationPluginT = ty.TypeVar(
|
||||
'V3FederationPluginT', bound='v3_plugins.FederationBaseAuth'
|
||||
)
|
||||
GenericPluginT = ty.TypeVar(
|
||||
'GenericPluginT', bound='generic_plugins.BaseGenericPlugin'
|
||||
)
|
||||
|
||||
|
||||
class BaseIdentityLoader(base.BaseLoader[PluginT]):
|
||||
"""Base Option handling for identity plugins.
|
||||
|
||||
This class defines options and handling that should be common across all
|
||||
@ -47,7 +61,7 @@ class BaseIdentityLoader(base.BaseLoader):
|
||||
return options
|
||||
|
||||
|
||||
class BaseV2Loader(BaseIdentityLoader):
|
||||
class BaseV2Loader(BaseIdentityLoader[V2PluginT]):
|
||||
"""Base Option handling for identity plugins.
|
||||
|
||||
This class defines options and handling that should be common to the V2
|
||||
@ -71,7 +85,7 @@ class BaseV2Loader(BaseIdentityLoader):
|
||||
return options
|
||||
|
||||
|
||||
class BaseV3Loader(BaseIdentityLoader):
|
||||
class BaseV3Loader(BaseIdentityLoader[V3PluginT]):
|
||||
"""Base Option handling for identity plugins.
|
||||
|
||||
This class defines options and handling that should be common to the V3
|
||||
@ -104,7 +118,7 @@ class BaseV3Loader(BaseIdentityLoader):
|
||||
|
||||
return options
|
||||
|
||||
def load_from_options(self, **kwargs: ty.Any) -> 'plugin.BaseAuthPlugin':
|
||||
def load_from_options(self, **kwargs: ty.Any) -> V3PluginT:
|
||||
if kwargs.get('project_name') and not (
|
||||
kwargs.get('project_domain_name')
|
||||
or kwargs.get('project_domain_id')
|
||||
@ -120,7 +134,7 @@ class BaseV3Loader(BaseIdentityLoader):
|
||||
return super().load_from_options(**kwargs)
|
||||
|
||||
|
||||
class BaseFederationLoader(BaseV3Loader):
|
||||
class BaseFederationLoader(BaseV3Loader[V3FederationPluginT]):
|
||||
"""Base Option handling for federation plugins.
|
||||
|
||||
This class defines options and handling that should be common to the V3
|
||||
@ -149,7 +163,7 @@ class BaseFederationLoader(BaseV3Loader):
|
||||
return options
|
||||
|
||||
|
||||
class BaseGenericLoader(BaseIdentityLoader):
|
||||
class BaseGenericLoader(BaseIdentityLoader[GenericPluginT]):
|
||||
"""Base Option handling for generic plugins.
|
||||
|
||||
This class defines options and handling that should be common to generic
|
||||
|
@ -26,6 +26,11 @@ AUTH_INTERFACE = object()
|
||||
IDENTITY_AUTH_HEADER_NAME = 'X-Auth-Token'
|
||||
|
||||
|
||||
BaseAuthPluginT = ty.TypeVar(
|
||||
'BaseAuthPluginT', bound='BaseAuthPlugin', covariant=True
|
||||
)
|
||||
|
||||
|
||||
class BaseAuthPlugin:
|
||||
"""The basic structure of an authentication plugin.
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user