typing: Annotate keystoneauth1.exceptions

Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
Change-Id: Iffe298b5d327eaf7269f21019ee8b73fd573e104
This commit is contained in:
Stephen Finucane 2024-08-12 11:49:24 +01:00
parent f117b0d31b
commit dfd52e045e
11 changed files with 95 additions and 32 deletions

View File

@ -301,6 +301,7 @@ def _normalize_version_args(
normalized_min_version = normalize_version_number(version)
normalized_max_version = (normalized_min_version[0], LATEST)
if implied_version:
assert service_type is not None # nosec B101
if normalized_min_version[0] != implied_version[0]:
raise exceptions.ImpliedVersionMismatch(
service_type=service_type,
@ -344,6 +345,7 @@ def _normalize_version_args(
raise ValueError("min_version cannot be greater than max_version")
if implied_version:
assert service_type is not None # nosec B101
if normalized_min_version:
if normalized_min_version[0] != implied_version[0]:
raise exceptions.ImpliedMinVersionMismatch(

View File

@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import requests
from keystoneauth1 import _utils as utils
from keystoneauth1.exceptions import base
@ -21,7 +23,7 @@ class AuthorizationFailure(base.ClientException):
class MissingAuthMethods(base.ClientException):
message = "Not all required auth rules were satisfied"
def __init__(self, response):
def __init__(self, response: requests.Response):
self.response = response
self.receipt = response.headers.get("Openstack-Auth-Receipt")
body = response.json()

View File

@ -10,8 +10,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
from keystoneauth1.exceptions import base
if ty.TYPE_CHECKING:
from keystoneauth1 import loading
__all__ = (
'AuthPluginException',
@ -41,7 +46,7 @@ class NoMatchingPlugin(AuthPluginException):
The name of the plugin that was attempted to load.
"""
def __init__(self, name):
def __init__(self, name: str):
self.name = name
msg = f'The plugin {name} could not be found'
super().__init__(msg)
@ -57,7 +62,7 @@ class UnsupportedParameters(AuthPluginException):
Names of the unsupported parameters.
"""
def __init__(self, names):
def __init__(self, names: ty.Sequence[str]):
self.names = names
m = 'The following parameters were given that are unsupported: %s'
@ -87,7 +92,7 @@ class MissingRequiredOptions(OptionError):
List of the missing options.
"""
def __init__(self, options):
def __init__(self, options: ty.Sequence['loading.Opt']):
self.options = options
names = ", ".join(o.dest for o in options)

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
__all__ = ('ClientException',)
@ -19,6 +20,6 @@ class ClientException(Exception):
message = "ClientException"
def __init__(self, message=None):
def __init__(self, message: ty.Optional[str] = None):
self.message = message or self.message
super().__init__(self.message)

View File

@ -48,6 +48,6 @@ class SSLError(ConnectionError):
class UnknownConnectionError(ConnectionError):
"""An error was encountered but we don't know what it is."""
def __init__(self, msg, original):
def __init__(self, msg: str, original: Exception):
super().__init__(msg)
self.original = original

View File

@ -10,6 +10,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import typing as ty
import os_service_types
from keystoneauth1.exceptions import base
@ -25,6 +27,8 @@ __all__ = (
'VersionNotAvailable',
)
_PARSED_VERSION_T = ty.Tuple[ty.Union[int, float], ...]
class DiscoveryFailure(base.ClientException):
message = "Discovery of client versions failed."
@ -37,12 +41,14 @@ class VersionNotAvailable(DiscoveryFailure):
class ImpliedVersionMismatch(ValueError):
label = 'version'
def __init__(self, service_type, implied, given):
def __init__(
self, service_type: str, implied: _PARSED_VERSION_T, given: str
):
super().__init__(
f"service_type {service_type} was given which implies"
f" major API version {str(implied[0])} but {self.label} of"
f" {given} was also given. Please update your code"
f" to use the official service_type {_SERVICE_TYPES.get_service_type(service_type)}."
f"service_type {service_type} was given which implies major API "
f"version {str(implied[0])} but {self.label} of {given} was also "
f"given. Please update your code to use the official service_type "
f"{_SERVICE_TYPES.get_service_type(service_type)}."
)

View File

@ -18,8 +18,9 @@
"""HTTP Exceptions used by keystoneauth1."""
import inspect
import sys
import typing as ty
import requests
from keystoneauth1.exceptions import auth
from keystoneauth1.exceptions import base
@ -66,14 +67,14 @@ class HttpError(base.ClientException):
def __init__(
self,
message=None,
details=None,
response=None,
request_id=None,
url=None,
method=None,
http_status=None,
retry_after=0,
message: ty.Optional[str] = None,
details: ty.Optional[str] = None,
response: ty.Optional[requests.Response] = None,
request_id: ty.Optional[str] = None,
url: ty.Optional[str] = None,
method: ty.Optional[str] = None,
http_status: ty.Optional[int] = None,
retry_after: int = 0,
):
self.http_status = http_status or self.http_status
self.message = message or self.message
@ -256,13 +257,31 @@ class RequestEntityTooLarge(HTTPClientError):
http_status = 413
message = "Request Entity Too Large"
def __init__(self, *args, **kwargs):
def __init__(
self,
message: ty.Optional[str] = None,
details: ty.Optional[str] = None,
response: ty.Optional[requests.Response] = None,
request_id: ty.Optional[str] = None,
url: ty.Optional[str] = None,
method: ty.Optional[str] = None,
http_status: ty.Optional[int] = None,
retry_after: int = 0,
):
try:
self.retry_after = int(kwargs.pop('retry_after'))
self.retry_after = int(retry_after)
except (KeyError, ValueError):
self.retry_after = 0
super().__init__(*args, **kwargs)
super().__init__(
message=message,
details=details,
response=response,
request_id=request_id,
url=url,
method=method,
http_status=http_status,
)
class RequestUriTooLong(HTTPClientError):
@ -384,13 +403,37 @@ class HttpVersionNotSupported(HttpServerError):
# _code_map contains all the classes that have http_status attribute.
_code_map = {
getattr(obj, 'http_status', None): obj
for name, obj in vars(sys.modules[__name__]).items()
if inspect.isclass(obj) and getattr(obj, 'http_status', False)
400: BadRequest,
401: Unauthorized,
402: PaymentRequired,
403: Forbidden,
404: NotFound,
405: MethodNotAllowed,
406: NotAcceptable,
407: ProxyAuthenticationRequired,
408: RequestTimeout,
409: Conflict,
410: Gone,
411: LengthRequired,
412: PreconditionFailed,
413: RequestEntityTooLarge,
414: RequestUriTooLong,
415: UnsupportedMediaType,
416: RequestedRangeNotSatisfiable,
417: ExpectationFailed,
422: UnprocessableEntity,
500: InternalServerError,
501: HttpNotImplemented,
502: BadGateway,
503: ServiceUnavailable,
504: GatewayTimeout,
505: HttpVersionNotSupported,
}
def from_response(response, method, url):
def from_response(
response: requests.Response, method: str, url: str
) -> ty.Union[HttpError, auth.MissingAuthMethods]:
"""Return an instance of :class:`HttpError` or subclass based on response.
:param response: instance of `requests.Response` class
@ -468,4 +511,4 @@ def from_response(response, method, url):
cls = HTTPClientError
else:
cls = HttpError
return cls(**kwargs)
return cls(**kwargs) # type: ignore

View File

@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
import requests
from keystoneauth1.exceptions import base
@ -20,6 +21,6 @@ __all__ = ('InvalidResponse',)
class InvalidResponse(base.ClientException):
message = "Invalid response from server."
def __init__(self, response):
def __init__(self, response: requests.Response):
super().__init__()
self.response = response

View File

@ -18,7 +18,7 @@ __all__ = ('ServiceProviderNotFound',)
class ServiceProviderNotFound(base.ClientException):
"""A Service Provider cannot be found."""
def __init__(self, sp_id):
def __init__(self, sp_id: str):
self.sp_id = sp_id
msg = f'The Service Provider {sp_id} could not be found'
super().__init__(msg)

View File

@ -812,7 +812,7 @@ class OidcDeviceAuthorization(_OidcBase):
sanitized_response,
)
except exceptions.http.BadRequest as exc:
error = exc.response.json().get("error")
error = exc.response and exc.response.json().get("error")
if error != "authorization_pending":
raise
time.sleep(self.interval)

View File

@ -108,6 +108,9 @@ disallow_untyped_defs = true
[mypy-keystoneauth1.discover]
disallow_untyped_defs = true
[mypy-keystoneauth1.exceptions.*]
disallow_untyped_defs = true
[mypy-keystoneauth1.http_basic]
disallow_untyped_defs = true