typing: Annotate various plugin modules

Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
Change-Id: Ieb58e34e68ca6074a337ec8735eb42694f1734a5
This commit is contained in:
Stephen Finucane 2024-08-12 11:02:55 +01:00
parent c45aa48363
commit 8d2e02e9c0
6 changed files with 108 additions and 36 deletions

View File

@ -11,9 +11,13 @@
# under the License. # under the License.
import base64 import base64
import typing as ty
from keystoneauth1 import plugin from keystoneauth1 import plugin
if ty.TYPE_CHECKING:
from keystoneauth1 import session as ks_session
AUTH_HEADER_NAME = 'Authorization' AUTH_HEADER_NAME = 'Authorization'
@ -24,19 +28,28 @@ class HTTPBasicAuth(plugin.FixedEndpointPlugin):
that might be deployed in standalone mode. that might be deployed in standalone mode.
""" """
def __init__(self, endpoint=None, username=None, password=None): def __init__(
self,
endpoint: ty.Optional[str] = None,
username: ty.Optional[str] = None,
password: ty.Optional[str] = None,
):
super().__init__(endpoint) super().__init__(endpoint)
self.username = username self.username = username
self.password = password self.password = password
def get_token(self, session, **kwargs): def get_token(
self, session: 'ks_session.Session', **kwargs: ty.Any
) -> ty.Optional[str]:
if self.username is None or self.password is None: if self.username is None or self.password is None:
return None return None
token = bytes(f'{self.username}:{self.password}', encoding='utf-8') token = bytes(f'{self.username}:{self.password}', encoding='utf-8')
encoded = base64.b64encode(token) encoded = base64.b64encode(token)
return str(encoded, encoding='utf-8') return str(encoded, encoding='utf-8')
def get_headers(self, session, **kwargs): def get_headers(
self, session: 'ks_session.Session', **kwargs: ty.Any
) -> ty.Optional[ty.Dict[str, str]]:
token = self.get_token(session) token = self.get_token(session)
if not token: if not token:
return None return None

View File

@ -10,8 +10,13 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import typing as ty
from keystoneauth1 import plugin from keystoneauth1 import plugin
if ty.TYPE_CHECKING:
from keystoneauth1 import session as ks_session
class NoAuth(plugin.FixedEndpointPlugin): class NoAuth(plugin.FixedEndpointPlugin):
"""A provider that will always use no auth. """A provider that will always use no auth.
@ -20,5 +25,7 @@ class NoAuth(plugin.FixedEndpointPlugin):
that might be deployed in standalone/noauth mode. that might be deployed in standalone/noauth mode.
""" """
def get_token(self, session, **kwargs): def get_token(
self, session: 'ks_session.Session', **kwargs: ty.Any
) -> ty.Optional[str]:
return 'notused' return 'notused'

View File

@ -10,28 +10,39 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import typing as ty
from keystoneauth1 import plugin from keystoneauth1 import plugin
if ty.TYPE_CHECKING:
from keystoneauth1 import session as ks_session
SERVICE_AUTH_HEADER_NAME = 'X-Service-Token' SERVICE_AUTH_HEADER_NAME = 'X-Service-Token'
__all__ = ('ServiceTokenAuthWrapper',) __all__ = ('ServiceTokenAuthWrapper',)
class ServiceTokenAuthWrapper(plugin.BaseAuthPlugin): class ServiceTokenAuthWrapper(plugin.BaseAuthPlugin):
def __init__(self, user_auth, service_auth): def __init__(
self,
user_auth: plugin.BaseAuthPlugin,
service_auth: plugin.BaseAuthPlugin,
):
super().__init__() super().__init__()
self.user_auth = user_auth self.user_auth = user_auth
self.service_auth = service_auth self.service_auth = service_auth
def get_headers(self, session, **kwargs): def get_headers(
headers = self.user_auth.get_headers(session, **kwargs) self, session: 'ks_session.Session', **kwargs: ty.Any
) -> ty.Optional[ty.Dict[str, str]]:
headers = self.user_auth.get_headers(session, **kwargs) or {}
token = self.service_auth.get_token(session, **kwargs) token = self.service_auth.get_token(session, **kwargs)
if token:
headers[SERVICE_AUTH_HEADER_NAME] = token headers[SERVICE_AUTH_HEADER_NAME] = token
return headers return headers
def invalidate(self): def invalidate(self) -> bool:
# NOTE(jamielennox): hmm, what to do here? Should we invalidate both # NOTE(jamielennox): hmm, what to do here? Should we invalidate both
# the service and user auth? Only one? There's no way to know what the # the service and user auth? Only one? There's no way to know what the
# failure was to selectively invalidate. # failure was to selectively invalidate.
@ -39,35 +50,49 @@ class ServiceTokenAuthWrapper(plugin.BaseAuthPlugin):
service = self.service_auth.invalidate() service = self.service_auth.invalidate()
return user or service return user or service
def get_connection_params(self, *args, **kwargs): def get_connection_params(
self, session: 'ks_session.Session', **kwargs: ty.Any
) -> ty.Dict[str, ty.Any]:
# NOTE(jamielennox): This is also a bit of a guess but unlikely to be a # NOTE(jamielennox): This is also a bit of a guess but unlikely to be a
# problem in practice. We don't know how merging connection parameters # problem in practice. We don't know how merging connection parameters
# between these plugins will conflict - but there aren't many plugins # between these plugins will conflict - but there aren't many plugins
# that set this anyway. # that set this anyway.
# Take the service auth params first so that user auth params will be # Take the service auth params first so that user auth params will be
# given priority. # given priority.
params = self.service_auth.get_connection_params(*args, **kwargs) params = self.service_auth.get_connection_params(session, **kwargs)
params.update(self.user_auth.get_connection_params(*args, **kwargs)) params.update(self.user_auth.get_connection_params(session, **kwargs))
return params return params
# TODO(jamielennox): Everything below here is a generic wrapper that could # TODO(jamielennox): Everything below here is a generic wrapper that could
# be extracted into a base wrapper class. We can do this as soon as there # be extracted into a base wrapper class. We can do this as soon as there
# is a need for it, but we may never actually need it. # is a need for it, but we may never actually need it.
def get_token(self, *args, **kwargs): def get_token(
return self.user_auth.get_token(*args, **kwargs) self, session: 'ks_session.Session', **kwargs: ty.Any
) -> ty.Optional[str]:
return self.user_auth.get_token(session, **kwargs)
def get_endpoint(self, *args, **kwargs): def get_endpoint(
return self.user_auth.get_endpoint(*args, **kwargs) self, session: 'ks_session.Session', **kwargs: ty.Any
) -> ty.Optional[str]:
return self.user_auth.get_endpoint(session, **kwargs)
def get_user_id(self, *args, **kwargs): def get_user_id(
return self.user_auth.get_user_id(*args, **kwargs) self, session: 'ks_session.Session', **kwargs: ty.Any
) -> ty.Optional[str]:
return self.user_auth.get_user_id(session, **kwargs)
def get_project_id(self, *args, **kwargs): def get_project_id(
return self.user_auth.get_project_id(*args, **kwargs) self, session: 'ks_session.Session', **kwargs: ty.Any
) -> ty.Optional[str]:
return self.user_auth.get_project_id(session, **kwargs)
def get_sp_auth_url(self, *args, **kwargs): def get_sp_auth_url(
return self.user_auth.get_sp_auth_url(*args, **kwargs) self, session: 'ks_session.Session', sp_id: str, **kwargs: ty.Any
) -> ty.Optional[str]:
return self.user_auth.get_sp_auth_url(session, sp_id, **kwargs)
def get_sp_url(self, *args, **kwargs): def get_sp_url(
return self.user_auth.get_sp_url(*args, **kwargs) self, session: 'ks_session.Session', sp_id: str, **kwargs: ty.Any
) -> ty.Optional[str]:
return self.user_auth.get_sp_url(session, sp_id, **kwargs)

View File

@ -109,8 +109,10 @@ class ServiceTokenTests(utils.TestCase):
) )
self.assertEqual( self.assertEqual(
self.user_auth.get_endpoint(self.session, 'identity'), self.user_auth.get_endpoint(self.session, service_type='identity'),
self.combined_auth.get_endpoint(self.session, 'identity'), self.combined_auth.get_endpoint(
self.session, service_type='identity'
),
) )
self.assertEqual( self.assertEqual(

View File

@ -10,8 +10,15 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import typing as ty
from keystoneauth1 import discover
from keystoneauth1 import plugin from keystoneauth1 import plugin
if ty.TYPE_CHECKING:
from keystoneauth1.access import access
from keystoneauth1 import session as ks_session
class Token(plugin.BaseAuthPlugin): class Token(plugin.BaseAuthPlugin):
"""A provider that will always use the given token and endpoint. """A provider that will always use the given token and endpoint.
@ -20,24 +27,26 @@ class Token(plugin.BaseAuthPlugin):
have a known endpoint and admin token that you want to use. have a known endpoint and admin token that you want to use.
""" """
def __init__(self, endpoint, token): def __init__(self, endpoint: ty.Optional[str], token: ty.Optional[str]):
super().__init__() super().__init__()
# NOTE(jamielennox): endpoint is reserved for when plugins # NOTE(jamielennox): endpoint is reserved for when plugins
# can be used to provide that information # can be used to provide that information
self.endpoint = endpoint self.endpoint = endpoint
self.token = token self.token = token
def get_token(self, session, **kwargs): def get_token(
self, session: 'ks_session.Session', **kwargs: ty.Any
) -> ty.Optional[str]:
return self.token return self.token
def get_endpoint_data( def get_endpoint_data(
self, self,
session, session: 'ks_session.Session',
*, *,
endpoint_override=None, endpoint_override: ty.Optional[str] = None,
discover_versions=True, discover_versions: bool = True,
**kwargs, **kwargs: ty.Any,
): ) -> ty.Optional[discover.EndpointData]:
"""Return a valid endpoint data for a the service. """Return a valid endpoint data for a the service.
:param session: A session object that can be used for communication. :param session: A session object that can be used for communication.
@ -65,7 +74,9 @@ class Token(plugin.BaseAuthPlugin):
**kwargs, **kwargs,
) )
def get_endpoint(self, session, **kwargs): def get_endpoint(
self, session: 'ks_session.Session', **kwargs: ty.Any
) -> ty.Optional[str]:
"""Return the supplied endpoint. """Return the supplied endpoint.
Using this plugin the same endpoint is returned regardless of the Using this plugin the same endpoint is returned regardless of the
@ -73,7 +84,9 @@ class Token(plugin.BaseAuthPlugin):
""" """
return self.endpoint return self.endpoint
def get_auth_ref(self, session, **kwargs): def get_auth_ref(
self, session: 'ks_session.Session', **kwargs: ty.Any
) -> ty.Optional['access.AccessInfo']:
"""Return the authentication reference of an auth plugin. """Return the authentication reference of an auth plugin.
:param session: A session object to be used for communication :param session: A session object to be used for communication

View File

@ -108,11 +108,23 @@ disallow_untyped_defs = true
[mypy-keystoneauth1.discover] [mypy-keystoneauth1.discover]
disallow_untyped_defs = true disallow_untyped_defs = true
[mypy-keystoneauth1.http_basic]
disallow_untyped_defs = true
[mypy-keystoneauth1.noauth]
disallow_untyped_defs = true
[mypy-keystoneauth1.plugin] [mypy-keystoneauth1.plugin]
disallow_untyped_defs = true disallow_untyped_defs = true
[mypy-keystoneauth1.service_token]
disallow_untyped_defs = true
[mypy-keystoneauth1.session] [mypy-keystoneauth1.session]
disallow_untyped_defs = true disallow_untyped_defs = true
[mypy-keystoneauth1.token_endpoint]
disallow_untyped_defs = true
[mypy-keystoneauth1._fair_semaphore] [mypy-keystoneauth1._fair_semaphore]
disallow_untyped_defs = true disallow_untyped_defs = true