From 8d2e02e9c01fe1df649bc39ac5fbf950338db053 Mon Sep 17 00:00:00 2001 From: Stephen Finucane Date: Mon, 12 Aug 2024 11:02:55 +0100 Subject: [PATCH] typing: Annotate various plugin modules Signed-off-by: Stephen Finucane Change-Id: Ieb58e34e68ca6074a337ec8735eb42694f1734a5 --- keystoneauth1/http_basic.py | 19 +++++- keystoneauth1/noauth.py | 9 ++- keystoneauth1/service_token.py | 67 +++++++++++++------ .../tests/unit/test_service_token.py | 6 +- keystoneauth1/token_endpoint.py | 31 ++++++--- setup.cfg | 12 ++++ 6 files changed, 108 insertions(+), 36 deletions(-) diff --git a/keystoneauth1/http_basic.py b/keystoneauth1/http_basic.py index e4c8eed0..5fe2ae70 100644 --- a/keystoneauth1/http_basic.py +++ b/keystoneauth1/http_basic.py @@ -11,9 +11,13 @@ # under the License. import base64 +import typing as ty from keystoneauth1 import plugin +if ty.TYPE_CHECKING: + from keystoneauth1 import session as ks_session + AUTH_HEADER_NAME = 'Authorization' @@ -24,19 +28,28 @@ class HTTPBasicAuth(plugin.FixedEndpointPlugin): that might be deployed in standalone mode. """ - def __init__(self, endpoint=None, username=None, password=None): + def __init__( + self, + endpoint: ty.Optional[str] = None, + username: ty.Optional[str] = None, + password: ty.Optional[str] = None, + ): super().__init__(endpoint) self.username = username self.password = password - def get_token(self, session, **kwargs): + def get_token( + self, session: 'ks_session.Session', **kwargs: ty.Any + ) -> ty.Optional[str]: if self.username is None or self.password is None: return None token = bytes(f'{self.username}:{self.password}', encoding='utf-8') encoded = base64.b64encode(token) return str(encoded, encoding='utf-8') - def get_headers(self, session, **kwargs): + def get_headers( + self, session: 'ks_session.Session', **kwargs: ty.Any + ) -> ty.Optional[ty.Dict[str, str]]: token = self.get_token(session) if not token: return None diff --git a/keystoneauth1/noauth.py b/keystoneauth1/noauth.py index c9ff51b1..bdc4bee9 100644 --- a/keystoneauth1/noauth.py +++ b/keystoneauth1/noauth.py @@ -10,8 +10,13 @@ # License for the specific language governing permissions and limitations # under the License. +import typing as ty + from keystoneauth1 import plugin +if ty.TYPE_CHECKING: + from keystoneauth1 import session as ks_session + class NoAuth(plugin.FixedEndpointPlugin): """A provider that will always use no auth. @@ -20,5 +25,7 @@ class NoAuth(plugin.FixedEndpointPlugin): that might be deployed in standalone/noauth mode. """ - def get_token(self, session, **kwargs): + def get_token( + self, session: 'ks_session.Session', **kwargs: ty.Any + ) -> ty.Optional[str]: return 'notused' diff --git a/keystoneauth1/service_token.py b/keystoneauth1/service_token.py index fb79e2b3..a967d616 100644 --- a/keystoneauth1/service_token.py +++ b/keystoneauth1/service_token.py @@ -10,28 +10,39 @@ # License for the specific language governing permissions and limitations # under the License. +import typing as ty + from keystoneauth1 import plugin +if ty.TYPE_CHECKING: + from keystoneauth1 import session as ks_session + SERVICE_AUTH_HEADER_NAME = 'X-Service-Token' __all__ = ('ServiceTokenAuthWrapper',) class ServiceTokenAuthWrapper(plugin.BaseAuthPlugin): - def __init__(self, user_auth, service_auth): + def __init__( + self, + user_auth: plugin.BaseAuthPlugin, + service_auth: plugin.BaseAuthPlugin, + ): super().__init__() self.user_auth = user_auth self.service_auth = service_auth - def get_headers(self, session, **kwargs): - headers = self.user_auth.get_headers(session, **kwargs) - + def get_headers( + self, session: 'ks_session.Session', **kwargs: ty.Any + ) -> ty.Optional[ty.Dict[str, str]]: + headers = self.user_auth.get_headers(session, **kwargs) or {} token = self.service_auth.get_token(session, **kwargs) - headers[SERVICE_AUTH_HEADER_NAME] = token + if token: + headers[SERVICE_AUTH_HEADER_NAME] = token return headers - def invalidate(self): + def invalidate(self) -> bool: # NOTE(jamielennox): hmm, what to do here? Should we invalidate both # the service and user auth? Only one? There's no way to know what the # failure was to selectively invalidate. @@ -39,35 +50,49 @@ class ServiceTokenAuthWrapper(plugin.BaseAuthPlugin): service = self.service_auth.invalidate() return user or service - def get_connection_params(self, *args, **kwargs): + def get_connection_params( + self, session: 'ks_session.Session', **kwargs: ty.Any + ) -> ty.Dict[str, ty.Any]: # NOTE(jamielennox): This is also a bit of a guess but unlikely to be a # problem in practice. We don't know how merging connection parameters # between these plugins will conflict - but there aren't many plugins # that set this anyway. # Take the service auth params first so that user auth params will be # given priority. - params = self.service_auth.get_connection_params(*args, **kwargs) - params.update(self.user_auth.get_connection_params(*args, **kwargs)) + params = self.service_auth.get_connection_params(session, **kwargs) + params.update(self.user_auth.get_connection_params(session, **kwargs)) return params # TODO(jamielennox): Everything below here is a generic wrapper that could # be extracted into a base wrapper class. We can do this as soon as there # is a need for it, but we may never actually need it. - def get_token(self, *args, **kwargs): - return self.user_auth.get_token(*args, **kwargs) + def get_token( + self, session: 'ks_session.Session', **kwargs: ty.Any + ) -> ty.Optional[str]: + return self.user_auth.get_token(session, **kwargs) - def get_endpoint(self, *args, **kwargs): - return self.user_auth.get_endpoint(*args, **kwargs) + def get_endpoint( + self, session: 'ks_session.Session', **kwargs: ty.Any + ) -> ty.Optional[str]: + return self.user_auth.get_endpoint(session, **kwargs) - def get_user_id(self, *args, **kwargs): - return self.user_auth.get_user_id(*args, **kwargs) + def get_user_id( + self, session: 'ks_session.Session', **kwargs: ty.Any + ) -> ty.Optional[str]: + return self.user_auth.get_user_id(session, **kwargs) - def get_project_id(self, *args, **kwargs): - return self.user_auth.get_project_id(*args, **kwargs) + def get_project_id( + self, session: 'ks_session.Session', **kwargs: ty.Any + ) -> ty.Optional[str]: + return self.user_auth.get_project_id(session, **kwargs) - def get_sp_auth_url(self, *args, **kwargs): - return self.user_auth.get_sp_auth_url(*args, **kwargs) + def get_sp_auth_url( + self, session: 'ks_session.Session', sp_id: str, **kwargs: ty.Any + ) -> ty.Optional[str]: + return self.user_auth.get_sp_auth_url(session, sp_id, **kwargs) - def get_sp_url(self, *args, **kwargs): - return self.user_auth.get_sp_url(*args, **kwargs) + def get_sp_url( + self, session: 'ks_session.Session', sp_id: str, **kwargs: ty.Any + ) -> ty.Optional[str]: + return self.user_auth.get_sp_url(session, sp_id, **kwargs) diff --git a/keystoneauth1/tests/unit/test_service_token.py b/keystoneauth1/tests/unit/test_service_token.py index f202cd0f..6f9c60be 100644 --- a/keystoneauth1/tests/unit/test_service_token.py +++ b/keystoneauth1/tests/unit/test_service_token.py @@ -109,8 +109,10 @@ class ServiceTokenTests(utils.TestCase): ) self.assertEqual( - self.user_auth.get_endpoint(self.session, 'identity'), - self.combined_auth.get_endpoint(self.session, 'identity'), + self.user_auth.get_endpoint(self.session, service_type='identity'), + self.combined_auth.get_endpoint( + self.session, service_type='identity' + ), ) self.assertEqual( diff --git a/keystoneauth1/token_endpoint.py b/keystoneauth1/token_endpoint.py index 9f814e71..2f93118c 100644 --- a/keystoneauth1/token_endpoint.py +++ b/keystoneauth1/token_endpoint.py @@ -10,8 +10,15 @@ # License for the specific language governing permissions and limitations # under the License. +import typing as ty + +from keystoneauth1 import discover from keystoneauth1 import plugin +if ty.TYPE_CHECKING: + from keystoneauth1.access import access + from keystoneauth1 import session as ks_session + class Token(plugin.BaseAuthPlugin): """A provider that will always use the given token and endpoint. @@ -20,24 +27,26 @@ class Token(plugin.BaseAuthPlugin): have a known endpoint and admin token that you want to use. """ - def __init__(self, endpoint, token): + def __init__(self, endpoint: ty.Optional[str], token: ty.Optional[str]): super().__init__() # NOTE(jamielennox): endpoint is reserved for when plugins # can be used to provide that information self.endpoint = endpoint self.token = token - def get_token(self, session, **kwargs): + def get_token( + self, session: 'ks_session.Session', **kwargs: ty.Any + ) -> ty.Optional[str]: return self.token def get_endpoint_data( self, - session, + session: 'ks_session.Session', *, - endpoint_override=None, - discover_versions=True, - **kwargs, - ): + endpoint_override: ty.Optional[str] = None, + discover_versions: bool = True, + **kwargs: ty.Any, + ) -> ty.Optional[discover.EndpointData]: """Return a valid endpoint data for a the service. :param session: A session object that can be used for communication. @@ -65,7 +74,9 @@ class Token(plugin.BaseAuthPlugin): **kwargs, ) - def get_endpoint(self, session, **kwargs): + def get_endpoint( + self, session: 'ks_session.Session', **kwargs: ty.Any + ) -> ty.Optional[str]: """Return the supplied endpoint. Using this plugin the same endpoint is returned regardless of the @@ -73,7 +84,9 @@ class Token(plugin.BaseAuthPlugin): """ return self.endpoint - def get_auth_ref(self, session, **kwargs): + def get_auth_ref( + self, session: 'ks_session.Session', **kwargs: ty.Any + ) -> ty.Optional['access.AccessInfo']: """Return the authentication reference of an auth plugin. :param session: A session object to be used for communication diff --git a/setup.cfg b/setup.cfg index f869b058..5a5dbb46 100644 --- a/setup.cfg +++ b/setup.cfg @@ -108,11 +108,23 @@ disallow_untyped_defs = true [mypy-keystoneauth1.discover] disallow_untyped_defs = true +[mypy-keystoneauth1.http_basic] +disallow_untyped_defs = true + +[mypy-keystoneauth1.noauth] +disallow_untyped_defs = true + [mypy-keystoneauth1.plugin] disallow_untyped_defs = true +[mypy-keystoneauth1.service_token] +disallow_untyped_defs = true + [mypy-keystoneauth1.session] disallow_untyped_defs = true +[mypy-keystoneauth1.token_endpoint] +disallow_untyped_defs = true + [mypy-keystoneauth1._fair_semaphore] disallow_untyped_defs = true