Add typing

Change-Id: I92fce9c3f0cb290937cf213baeb264d891f144ef
Signed-off-by: Stephen Finucane <sfinucan@redhat.com>
This commit is contained in:
Stephen Finucane
2025-09-30 17:23:08 +01:00
parent a1279f7b98
commit 672cce4dc1
19 changed files with 477 additions and 230 deletions

View File

@@ -23,3 +23,20 @@ repos:
hooks:
- id: hacking
additional_dependencies: []
- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.18.2
hooks:
- id: mypy
additional_dependencies: [
'fixtures>=4.2.6',
'oslo.context',
'oslo.i18n',
'oslo.utils',
'types-python-dateutil',
]
# keep this in-sync with '[tool.mypy] exclude' in 'pyproject.toml'
exclude: |
(?x)(
doc/.*
| releasenotes/.*
)

View File

@@ -255,7 +255,7 @@ log_opts = [
]
def list_opts():
def list_opts() -> list[tuple[str | None, list[cfg.Opt]]]:
"""Returns a list of oslo.config options available in the library.
The returned list includes all oslo.config options which may be registered

View File

@@ -13,13 +13,17 @@
import argparse
import collections
from collections.abc import Callable, Iterator, Sequence
import functools
import io
import sys
import time
from typing import cast
from oslo_serialization import jsonutils
from oslo_utils import importutils
from oslo_log.formatters import JSONLogRecord
from oslo_log import log
termcolor = importutils.try_import('termcolor')
@@ -30,7 +34,7 @@ DEFAULT_LEVEL_KEY = 'levelname'
DEFAULT_TRACEBACK_KEY = 'traceback'
def main():
def main() -> None:
global _USE_COLOR
args = parse_args()
_USE_COLOR = args.color
@@ -61,7 +65,7 @@ def main():
sys.exit(0)
def parse_args():
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser()
parser.add_argument(
"file",
@@ -134,7 +138,7 @@ def parse_args():
return args
def colorise(key, text=None):
def colorise(key: str, text: str | None = None) -> str:
if text is None:
text = key
if not _USE_COLOR:
@@ -149,15 +153,19 @@ def colorise(key, text=None):
}
color, attrs = colors.get(key, ('', []))
if color:
return termcolor.colored(text, color=color, attrs=attrs)
return cast(str, termcolor.colored(text, color=color, attrs=attrs))
return text
def warn(prefix, msg):
def warn(prefix: str, msg: object) -> str:
return "{}: {}".format(colorise('exc', prefix), msg)
def reformat_json(fh, formatter, follow=False):
def reformat_json(
fh: io.StringIO,
formatter: Callable[..., Iterator[str]],
follow: bool = False,
) -> Iterator[str]:
# using readline allows interactive stdin to respond to every line
while True:
line = fh.readline()
@@ -179,29 +187,31 @@ def reformat_json(fh, formatter, follow=False):
def console_format(
prefix,
locator,
record,
loggers=[],
levels=[],
level_key=DEFAULT_LEVEL_KEY,
traceback_key=DEFAULT_TRACEBACK_KEY,
):
prefix: str,
locator: str,
record: JSONLogRecord,
loggers: Sequence[str] = [],
levels: Sequence[str] = [],
level_key: str = DEFAULT_LEVEL_KEY,
traceback_key: str = DEFAULT_TRACEBACK_KEY,
) -> Iterator[str]:
# Provide an empty string to format-specifiers the record is
# missing, instead of failing. Doesn't work for non-string
# specifiers.
record = collections.defaultdict(str, record)
record = collections.defaultdict(str, record) # type: ignore
# skip if the record doesn't match a logger we are looking at
if loggers:
name = record.get('name')
name = record['name']
if not any(name.startswith(n) for n in loggers):
return
if levels:
if record.get(level_key) not in levels:
return
levelname = record.get(level_key)
if levelname:
record[level_key] = colorise(levelname)
record[level_key] = colorise(levelname) # type: ignore
try:
prefix = prefix % record
@@ -227,7 +237,7 @@ def console_format(
if tb:
if type(tb) is str:
tb = tb.rstrip().split("\n")
for tb_line in tb:
for tb_line in tb: # type: ignore
yield ' '.join([prefix, tb_line])

View File

@@ -12,10 +12,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
from typing import Any
import fixtures
def get_logging_handle_error_fixture():
def get_logging_handle_error_fixture() -> Any:
"""returns a fixture to make logging raise formatting exceptions.
To use::
@@ -27,7 +30,7 @@ def get_logging_handle_error_fixture():
return fixtures.MonkeyPatch('logging.Handler.handleError', _handleError)
def _handleError(self, record):
def _handleError(self: logging.Handler, record: logging.LogRecord) -> None:
"""Monkey patch for logging.Handler.handleError.
The default handleError just logs the error to stderr but we want

View File

@@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from collections.abc import Sequence
import logging
import fixtures
@@ -35,11 +36,11 @@ class SetLogLevel(fixtures.Fixture):
:type level: int
"""
def __init__(self, logger_names, level):
def __init__(self, logger_names: Sequence[str], level: int):
self.logger_names = logger_names
self.level = level
def setUp(self):
def setUp(self) -> None:
super().setUp()
for name in self.logger_names:
# NOTE(dhellmann): Use the stdlib version of getLogger()

View File

@@ -21,16 +21,26 @@ import re
import socket
import sys
import traceback
from types import TracebackType
from typing import Any, cast, TypeAlias, TypedDict
from dateutil import tz
from oslo_config import cfg
from oslo_context import context as context_utils
from oslo_serialization import jsonutils
from oslo_utils import encodeutils
_SysExcInfoType: TypeAlias = (
tuple[type[BaseException], BaseException, TracebackType | None]
| tuple[None, None, None]
)
def _dictify_context(context):
if getattr(context, 'get_logging_values', None):
def _dictify_context(
context: context_utils.RequestContext | dict[str, Any],
) -> dict[str, Any]:
if isinstance(context, context_utils.RequestContext):
return context.get_logging_values()
# This dict only style logging format will become deprecated
# when projects using a dictionary object for context are updated
@@ -45,12 +55,14 @@ def _dictify_context(context):
_CONF = None
def _store_global_conf(conf):
def _store_global_conf(conf: cfg.ConfigOpts) -> cfg.ConfigOpts:
global _CONF
_CONF = conf
def _update_record_with_context(record):
def _update_record_with_context(
record: logging.LogRecord,
) -> context_utils.RequestContext | None:
"""Given a log record, update it with context information.
The request context, if there is one, will either be passed with the
@@ -67,7 +79,7 @@ def _update_record_with_context(record):
return context
def _ensure_unicode(msg):
def _ensure_unicode(msg: Any) -> str:
"""Do our best to turn the input argument into a unicode object."""
if isinstance(msg, str):
return msg
@@ -78,7 +90,7 @@ def _ensure_unicode(msg):
)
def _get_error_summary(record):
def _get_error_summary(record: logging.LogRecord) -> str:
"""Return the error summary
If there is no active exception, return the default.
@@ -155,36 +167,70 @@ def _get_error_summary(record):
return error_summary
class _ReplaceFalseValue(dict):
def __getitem__(self, key):
class _ReplaceFalseValue(dict[str, Any]):
def __getitem__(self, key: str) -> Any:
return dict.get(self, key, None) or '-'
_MSG_KEY_REGEX = re.compile(r'(%+)\((\w+)\)')
def _json_dumps_with_fallback(obj):
def _json_dumps_with_fallback(obj: Any) -> str:
# Bug #1593641: If an object cannot be serialized to JSON, convert
# it using repr() to prevent serialization errors. Using repr() is
# not ideal, but serialization errors are unexpected on logs,
# especially when the code using logs is not aware that the
# JSONFormatter will be used.
convert = functools.partial(jsonutils.to_primitive, fallback=repr)
return jsonutils.dumps(obj, default=convert)
return cast(str, jsonutils.dumps(obj, default=convert))
class JSONLogRecord(TypedDict):
message: str
asctime: str
name: str
msg: str
args: Any
levelname: str
levelno: int
pathname: str
filename: str
module: str
lineno: int
funcname: str
created: float
msecs: float
relative_created: float
thread: int | None
thread_name: str | None
process_name: str | None
process: int | None
traceback: str | None
hostname: str | None
error_summary: str
context: dict[str, Any]
extra: dict[str, Any]
class JSONFormatter(logging.Formatter):
def __init__(self, fmt=None, datefmt=None, style='%'):
# NOTE(sfinucan) we ignore the fmt and style arguments, but they're
def __init__(
self,
fmt: str | None = None,
datefmt: str | None = None,
style: str = '%',
):
# NOTE(stephenfin) we ignore the fmt and style arguments, but they're
# still there since logging.config.fileConfig passes the former in
# Python < 3.2 and both in Python >= 3.2
self.datefmt = datefmt
try:
self.hostname = socket.gethostname()
self.hostname: str | None = socket.gethostname()
except OSError:
self.hostname = None
def formatException(self, ei, strip_newlines=True):
def formatException(
self, ei: _SysExcInfoType, *, strip_newlines: bool = True
) -> str:
try:
lines = traceback.format_exception(*ei)
except TypeError as type_error:
@@ -192,14 +238,14 @@ class JSONFormatter(logging.Formatter):
msg = str(type_error)
lines = [f'<Unprintable exception due to {msg}>\n']
if strip_newlines:
lines = [
_lines = [
filter(lambda x: x, line.rstrip().splitlines())
for line in lines
]
lines = list(itertools.chain(*lines))
return lines
lines = list(itertools.chain(*_lines))
return '\n'.join(lines)
def format(self, record):
def format(self, record: logging.LogRecord) -> str:
args = record.args
if isinstance(args, dict):
msg_keys = _MSG_KEY_REGEX.findall(record.msg)
@@ -211,7 +257,8 @@ class JSONFormatter(logging.Formatter):
# the value to be formatted. Don't filter anything.
if msg_keys:
args = {k: v for k, v in args.items() if k in msg_keys}
message = {
message: JSONLogRecord = {
'message': record.getMessage(),
'asctime': self.formatTime(record, self.datefmt),
'name': record.name,
@@ -234,6 +281,8 @@ class JSONFormatter(logging.Formatter):
'traceback': None,
'hostname': self.hostname,
'error_summary': _get_error_summary(record),
'context': {},
'extra': {},
}
# Build the extra values that were given to us, including
@@ -255,8 +304,7 @@ class JSONFormatter(logging.Formatter):
message['context'] = _dictify_context(extra['context'])
elif context:
message['context'] = _dictify_context(context)
else:
message['context'] = {}
extra.pop('context', None)
message['extra'] = extra
@@ -276,12 +324,17 @@ class FluentFormatter(logging.Formatter):
.. versionadded:: 3.17
"""
def __init__(self, fmt=None, datefmt=None, style='%s'):
def __init__(
self,
fmt: str | None = None,
datefmt: str | None = None,
style: str = '%',
):
# NOTE(sfinucan) we ignore the fmt and style arguments for the same
# reason as JSONFormatter.
self.datefmt = datefmt
try:
self.hostname = socket.gethostname()
self.hostname: str | None = socket.gethostname()
except OSError:
self.hostname = None
self.cmdline = " ".join(sys.argv)
@@ -290,13 +343,15 @@ class FluentFormatter(logging.Formatter):
import uwsgi
svc_name = uwsgi.opt.get("name")
self.uwsgi_name = svc_name
self.uwsgi_name: str | None = svc_name
except Exception:
self.uwsgi_name = None
def formatException(self, exc_info, strip_newlines=True):
def formatException(
self, ei: _SysExcInfoType, *, strip_newlines: bool = True
) -> str:
try:
lines = traceback.format_exception(*exc_info)
lines = traceback.format_exception(*ei)
except TypeError as type_error:
# Work around https://bugs.python.org/issue28603
msg = str(type_error)
@@ -305,9 +360,9 @@ class FluentFormatter(logging.Formatter):
lines = functools.reduce(
lambda a, line: a + line.rstrip().splitlines(), lines, []
)
return lines
return '\n'.join(lines)
def format(self, record):
def format(self, record: logging.LogRecord) -> Any:
message = {
'message': record.getMessage(),
'time': self.formatTime(record, self.datefmt),
@@ -382,7 +437,7 @@ class ContextFormatter(logging.Formatter):
the data in a dict representation of the context.
"""
def __init__(self, *args, **kwargs):
def __init__(self, *args: Any, **kwargs: Any) -> None:
"""Initialize ContextFormatter instance
Takes additional keyword arguments which can be used in the message
@@ -401,7 +456,7 @@ class ContextFormatter(logging.Formatter):
logging.Formatter.__init__(self, *args, **kwargs)
def format(self, record):
def format(self, record: logging.LogRecord) -> str:
"""Uses contextstring if request_id is set, otherwise default."""
# store project info
record.project = self.project
@@ -479,15 +534,17 @@ class ContextFormatter(logging.Formatter):
# Cache the formatted traceback on the record, Logger will
# respect our formatted copy
if record.exc_info:
record.exc_text = self.formatException(record.exc_info, record)
record.exc_text = self.formatException(
record.exc_info, record=record
)
record.error_summary = _get_error_summary(record)
if '%(error_summary)s' in fmt:
# If we have been told explicitly how to format the error
# summary, make sure there is always a default value for
# it.
record.error_summary = record.error_summary or '-'
elif record.error_summary:
record.error_summary = getattr(record, 'error_summary') or '-'
elif getattr(record, 'error_summary'):
# If we have not been told how to format the error and
# there is an error to summarize, make sure the format
# string includes the bits we need to include it.
@@ -514,11 +571,13 @@ class ContextFormatter(logging.Formatter):
).replace('%', '*')
return logging.Formatter.format(self, record)
def formatException(self, exc_info, record=None):
def formatException(
self, ei: _SysExcInfoType, *, record: logging.LogRecord | None = None
) -> str:
"""Format exception output with CONF.logging_exception_prefix."""
if not record:
try:
return logging.Formatter.formatException(self, exc_info)
return logging.Formatter.formatException(self, ei)
except TypeError as type_error:
# Work around https://bugs.python.org/issue28603
msg = str(type_error)
@@ -526,9 +585,7 @@ class ContextFormatter(logging.Formatter):
stringbuffer = io.StringIO()
try:
traceback.print_exception(
exc_info[0], exc_info[1], exc_info[2], None, stringbuffer
)
traceback.print_exception(ei[0], ei[1], ei[2], None, stringbuffer)
except TypeError as type_error:
# Work around https://bugs.python.org/issue28603
msg = str(type_error)
@@ -549,7 +606,7 @@ class ContextFormatter(logging.Formatter):
formatted_lines.append(fl)
return '\n'.join(formatted_lines)
def _compute_iso_time(self, record):
def _compute_iso_time(self, record: logging.LogRecord) -> None:
# set iso8601 timestamp
localtz = tz.tzlocal()
record.isotime = (
@@ -565,5 +622,5 @@ class ContextFormatter(logging.Formatter):
# isoformat() looks different. This adds microseconds when
# that happens.
record.isotime = (
f"{record.isotime[:-6]}.000000{record.isotime[-6:]}"
f"{record.isotime[:-6]}.000000{record.isotime[-6:]}" # type: ignore
)

View File

@@ -10,11 +10,14 @@
# License for the specific language governing permissions and limitations
# under the License.
from __future__ import annotations
import inspect
import logging
import logging.config
import logging.handlers
import os
from typing import TYPE_CHECKING
try:
from systemd import journal
@@ -23,16 +26,20 @@ except ImportError:
try:
import syslog
except ImportError:
syslog = None
syslog = None # type: ignore
if TYPE_CHECKING:
# Needed until we bump our minimum to Python 3.11
#
# https://github.com/python/typeshed/issues/7855
from _typeshed import SupportsWrite
_StreamHandler = logging.StreamHandler[SupportsWrite[str]]
else:
_StreamHandler = logging.StreamHandler
NullHandler = logging.NullHandler
def _get_binary_name():
return os.path.basename(inspect.stack()[-1][1])
_AUDIT = logging.INFO + 1
_TRACE = 5
@@ -50,22 +57,24 @@ SYSLOG_MAP = {
}
def _get_binary_name() -> str:
return os.path.basename(inspect.stack()[-1][1])
class OSSysLogHandler(logging.Handler):
"""Syslog based handler. Only available on UNIX-like platforms."""
def __init__(self, facility=None):
def __init__(self, facility: int | None = None) -> None:
# Default values always get evaluated, for which reason we avoid
# using 'syslog' directly, which may not be available.
facility = facility if facility is not None else syslog.LOG_USER
# Do not use super() unless type(logging.Handler) is 'type'
# (i.e. >= Python 2.7).
if not syslog:
raise RuntimeError("Syslog not available on this platform")
logging.Handler.__init__(self)
super().__init__()
binary_name = _get_binary_name()
syslog.openlog(binary_name, 0, facility)
def emit(self, record):
def emit(self, record: logging.LogRecord) -> None:
priority = SYSLOG_MAP.get(record.levelname, 7)
message = self.format(record)
syslog.syslog(priority, message)
@@ -80,7 +89,7 @@ class OSJournalHandler(logging.Handler):
'request_id',
)
def __init__(self, facility=None):
def __init__(self, facility: int | None = None):
if not journal:
raise RuntimeError("Systemd bindings do not exist")
@@ -95,7 +104,7 @@ class OSJournalHandler(logging.Handler):
self.binary_name = _get_binary_name()
self.facility = facility
def emit(self, record):
def emit(self, record: logging.LogRecord) -> None:
priority = SYSLOG_MAP.get(record.levelname, 7)
message = self.format(record)
@@ -115,7 +124,7 @@ class OSJournalHandler(logging.Handler):
if record.exc_info:
# Cache the traceback text to avoid converting it multiple times
# (it's constant anyway)
if not record.exc_text:
if not record.exc_text and self.formatter is not None:
record.exc_text = self.formatter.formatException(
record.exc_info
)
@@ -132,7 +141,7 @@ class OSJournalHandler(logging.Handler):
journal.send(message, **extras)
class ColorHandler(logging.StreamHandler):
class ColorHandler(_StreamHandler):
"""Log handler that sets the 'color' key based on the level
To use, include a '%(color)s' entry in the logging_context_format_string.
@@ -140,7 +149,7 @@ class ColorHandler(logging.StreamHandler):
the color within a log line.
"""
LEVEL_COLORS = {
LEVEL_COLORS: dict[int, str] = {
_TRACE: '\033[00;35m', # MAGENTA
logging.DEBUG: '\033[00;32m', # GREEN
logging.INFO: '\033[00;36m', # CYAN
@@ -150,7 +159,6 @@ class ColorHandler(logging.StreamHandler):
logging.CRITICAL: '\033[01;31m', # BOLD RED
}
def format(self, record):
def format(self, record: logging.LogRecord) -> str:
record.color = self.LEVEL_COLORS[record.levelno]
record.reset_color = '\033[00m'
return logging.StreamHandler.format(self, record) + record.reset_color
return logging.StreamHandler.format(self, record) + '\033[00m'

View File

@@ -14,18 +14,23 @@
"""Log helper functions."""
from collections.abc import Callable
import functools
import inspect
import logging
from typing import Any
from typing import TypeVar
F = TypeVar('F', bound=Callable[..., Any])
def _get_full_class_name(cls):
def _get_full_class_name(cls: type) -> str:
return '{}.{}'.format(
cls.__module__, getattr(cls, '__qualname__', cls.__name__)
)
def _is_method(obj, method):
def _is_method(obj: object, method: Callable[..., Any]) -> bool:
"""Returns True if a given method is obj's method.
You can not simply test a given method like:
@@ -38,7 +43,7 @@ def _is_method(obj, method):
return inspect.ismethod(getattr(obj, method.__name__, None))
def log_method_call(method):
def log_method_call(method: F) -> F:
"""Decorator helping to log method calls.
:param method: Method to decorate to be logged.
@@ -47,7 +52,7 @@ def log_method_call(method):
log = logging.getLogger(method.__module__)
@functools.wraps(method)
def wrapper(*args, **kwargs):
def wrapper(*args: Any, **kwargs: Any) -> Any:
args_start_pos = 0
if args:
first_arg = args[0]
@@ -76,4 +81,4 @@ def log_method_call(method):
)
return method(*args, **kwargs)
return wrapper
return wrapper # type: ignore[return-value]

View File

@@ -24,20 +24,25 @@ may be passed as part of the log message, which is intended to make it easier
for admins to find messages related to a specific instance.
It also allows setting of formatting information through conf.
"""
from collections.abc import Callable
from collections.abc import Generator
from collections.abc import Iterable
from collections.abc import MutableMapping
import configparser
import logging
import logging.config
import logging.handlers
import os
import sys
from types import TracebackType
from typing import Any, cast, TYPE_CHECKING
try:
import syslog
except ImportError:
syslog = None
syslog = None # type: ignore
import debtcollector
from oslo_config import cfg
@@ -50,6 +55,15 @@ from oslo_log import _options
from oslo_log import formatters
from oslo_log import handlers
if TYPE_CHECKING:
# Needed until we bump our minimum to Python 3.11
#
# https://github.com/python/typeshed/issues/7855
_LoggerAdapter = logging.LoggerAdapter[logging.Logger]
else:
_LoggerAdapter = logging.LoggerAdapter
CRITICAL = logging.CRITICAL
FATAL = logging.FATAL
ERROR = logging.ERROR
@@ -74,9 +88,11 @@ LOG_ROTATE_INTERVAL_MAPPING = {
_EVENTLET_FIX_APPLIED = False
def _get_log_file_path(conf, binary=None):
logfile = conf.log_file
logdir = conf.log_dir
def _get_log_file_path(
conf: cfg.ConfigOpts, binary: str | None = None
) -> str | None:
logfile: str | None = conf.log_file
logdir: str | None = conf.log_dir
if logfile and not logdir:
return logfile
@@ -91,7 +107,7 @@ def _get_log_file_path(conf, binary=None):
return None
def _iter_loggers():
def _iter_loggers() -> Generator[logging.Logger, None, None]:
"""Iterate on existing loggers."""
# Sadly, Logger.manager and Manager.loggerDict are not documented,
@@ -107,14 +123,14 @@ def _iter_loggers():
yield logger
class BaseLoggerAdapter(logging.LoggerAdapter):
class BaseLoggerAdapter(_LoggerAdapter):
warn = logging.LoggerAdapter.warning
@property
def handlers(self):
def handlers(self) -> Iterable[logging.Handler] | None:
return self.logger.handlers
def trace(self, msg, *args, **kwargs):
def trace(self, msg: Any, *args: Any, **kwargs: Any) -> None:
self.log(TRACE, msg, *args, **kwargs)
@@ -136,11 +152,13 @@ class KeywordArgumentAdapter(BaseLoggerAdapter):
"""
def process(self, msg, kwargs):
def process(
self, msg: Any, kwargs: MutableMapping[str, Any]
) -> tuple[Any, MutableMapping[str, Any]]:
# Make a new extra dictionary combining the values we were
# given when we were constructed and anything from kwargs.
extra = {}
extra.update(self.extra)
extra: dict[str, Any] = {}
extra.update(self.extra) # type: ignore
if 'extra' in kwargs:
extra.update(kwargs.pop('extra'))
# Move any unknown keyword arguments into the extra
@@ -199,10 +217,21 @@ class KeywordArgumentAdapter(BaseLoggerAdapter):
return msg, kwargs
def _create_logging_excepthook(product_name):
def logging_excepthook(exc_type, value, tb):
extra = {'exc_info': (exc_type, value, tb)}
getLogger(product_name).critical('Unhandled error', **extra)
def _create_logging_excepthook(
product_name: str,
) -> Callable[
[type[BaseException], BaseException, TracebackType | None],
None,
]:
def logging_excepthook(
exc_type: type[BaseException],
value: BaseException,
tb: TracebackType | None,
) -> None:
getLogger(product_name).critical(
'Unhandled error',
exc_info=(exc_type, value, tb),
)
return logging_excepthook
@@ -210,49 +239,50 @@ def _create_logging_excepthook(product_name):
class LogConfigError(Exception):
message = _('Error loading logging config %(log_config)s: %(err_msg)s')
def __init__(self, log_config, err_msg):
def __init__(self, log_config: str, err_msg: str) -> None:
self.log_config = log_config
self.err_msg = err_msg
def __str__(self):
def __str__(self) -> str:
return self.message % dict(
log_config=self.log_config, err_msg=self.err_msg
)
def _load_log_config(log_config_append):
def _load_log_config(log_config_append: str) -> None:
try:
if not hasattr(_load_log_config, "old_time"):
_load_log_config.old_time = 0
if not hasattr(_load_log_config, 'old_time'):
setattr(_load_log_config, 'old_time', 0)
new_time = os.path.getmtime(log_config_append)
if _load_log_config.old_time != new_time:
if getattr(_load_log_config, 'old_time') != new_time:
# Reset all existing loggers before reloading config as fileConfig
# does not reset non-child loggers.
for logger in _iter_loggers():
logger.setLevel(logging.NOTSET)
logger.handlers = []
logger.propagate = 1
logger.propagate = True
logging.config.fileConfig(
log_config_append, disable_existing_loggers=False
)
_load_log_config.old_time = new_time
setattr(_load_log_config, 'old_time', new_time)
except (configparser.Error, KeyError, OSError, RuntimeError) as exc:
raise LogConfigError(log_config_append, str(exc))
def _mutate_hook(conf, fresh):
def _mutate_hook(conf: cfg.ConfigOpts, fresh: cfg.ConfigOpts) -> None:
"""Reconfigures oslo.log according to the mutated options."""
if (None, 'debug') in fresh:
_refresh_root_level(conf.debug)
if (None, 'log-config-append') in fresh:
_load_log_config.old_time = 0
setattr(_load_log_config, 'old_time', 0)
if conf.log_config_append:
_load_log_config(conf.log_config_append)
def register_options(conf):
def register_options(conf: cfg.ConfigOpts) -> None:
"""Register the command line and configuration options used by oslo.log."""
# Sometimes logging occurs before logging is ready (e.g., oslo_config).
@@ -270,7 +300,7 @@ def register_options(conf):
conf.register_mutate_hook(_mutate_hook)
def _fix_eventlet_logging():
def _fix_eventlet_logging() -> None:
"""Properly setup logging with eventlet on native threads.
Workaround for: https://github.com/eventlet/eventlet/issues/432
@@ -288,13 +318,19 @@ def _fix_eventlet_logging():
import eventlet.green.threading
from oslo_log import pipe_mutex
logging.threading = eventlet.green.threading
logging._lock = logging.threading.RLock()
logging.Handler.createLock = pipe_mutex.pipe_createLock
logging.threading = eventlet.green.threading # type: ignore
logging._lock = logging.threading.RLock() # type: ignore
logging.Handler.createLock = pipe_mutex.pipe_createLock # type: ignore
_EVENTLET_FIX_APPLIED = True
def setup(conf, product_name, version='unknown', *, fix_eventlet=True):
def setup(
conf: cfg.ConfigOpts,
product_name: str,
version: str = 'unknown',
*,
fix_eventlet: bool = True,
) -> None:
"""Setup logging for the current application."""
if fix_eventlet:
_fix_eventlet_logging()
@@ -305,7 +341,10 @@ def setup(conf, product_name, version='unknown', *, fix_eventlet=True):
sys.excepthook = _create_logging_excepthook(product_name)
def set_defaults(logging_context_format_string=None, default_log_levels=None):
def set_defaults(
logging_context_format_string: str | None = None,
default_log_levels: Iterable[str] | None = None,
) -> None:
"""Set default values for the configuration options used by oslo.log."""
# Just in case the caller is not setting the
# default_log_level. This is insurance because
@@ -322,7 +361,7 @@ def set_defaults(logging_context_format_string=None, default_log_levels=None):
)
def tempest_set_log_file(filename):
def tempest_set_log_file(filename: str) -> None:
"""Provide an API for tempest to set the logging filename.
.. warning:: Only Tempest should use this function.
@@ -336,7 +375,7 @@ def tempest_set_log_file(filename):
cfg.set_defaults(_options.logging_cli_opts, log_file=filename)
def _find_facility(facility):
def _find_facility(facility: str) -> Any:
# NOTE(jd): Check the validity of facilities at run time as they differ
# depending on the OS and Python version being used.
valid_facilities = [
@@ -380,7 +419,7 @@ def _find_facility(facility):
return getattr(syslog, facility)
def _refresh_root_level(debug):
def _refresh_root_level(debug: bool) -> None:
"""Set the level of the root logger.
:param debug: If 'debug' is True, the level will be DEBUG.
@@ -393,7 +432,9 @@ def _refresh_root_level(debug):
log_root.setLevel(logging.INFO)
def _setup_logging_from_conf(conf, project, version):
def _setup_logging_from_conf(
conf: cfg.ConfigOpts, project: str, version: str
) -> None:
log_root = getLogger(None).logger
# Remove all handlers
@@ -402,6 +443,9 @@ def _setup_logging_from_conf(conf, project, version):
logpath = _get_log_file_path(conf)
if logpath:
file_handler: type[logging.Handler]
filelog: logging.Handler
# On Windows, in-use files cannot be moved or deleted.
if conf.log_rotation_type.lower() == "interval":
file_handler = logging.handlers.TimedRotatingFileHandler
@@ -430,6 +474,7 @@ def _setup_logging_from_conf(conf, project, version):
log_root.addHandler(filelog)
if conf.use_stderr:
streamlog: logging.Handler
if conf.log_color:
streamlog = handlers.ColorHandler()
else:
@@ -508,15 +553,19 @@ def _setup_logging_from_conf(conf, project, version):
)
_loggers = {}
_loggers: dict[str | None, BaseLoggerAdapter] = {}
def get_loggers():
def get_loggers() -> dict[str | None, BaseLoggerAdapter]:
"""Return a copy of the oslo loggers dictionary."""
return _loggers.copy()
def getLogger(name=None, project='unknown', version='unknown'):
def getLogger(
name: str | None = None,
project: str = 'unknown',
version: str = 'unknown',
) -> BaseLoggerAdapter:
"""Build a logger with the given name.
:param name: The name for the logger. This is usually the module
@@ -543,7 +592,7 @@ def getLogger(name=None, project='unknown', version='unknown'):
return _loggers[name]
def get_default_log_levels():
def get_default_log_levels() -> list[str]:
"""Return the Oslo Logging default log levels.
Returns a copy of the list so an application can change the value
@@ -553,6 +602,6 @@ def get_default_log_levels():
return list(_options.DEFAULT_LOG_LEVELS)
def is_debug_enabled(conf):
def is_debug_enabled(conf: cfg.ConfigOpts) -> bool:
"""Determine if debug logging mode is enabled."""
return conf.debug
return cast(bool, conf.debug)

View File

@@ -18,6 +18,9 @@ from asyncio.exceptions import TimeoutError as AsyncioTimeoutError
import errno
import fcntl
import importlib.metadata
import logging
from types import TracebackType
from typing import TYPE_CHECKING
import eventlet
import eventlet.asyncio
@@ -27,6 +30,18 @@ import eventlet.hubs
import eventlet.hubs.asyncio
import eventlet.patcher
if TYPE_CHECKING:
from typing_extensions import Self
# Needed until we bump our minimum to Python 3.11
#
# https://github.com/python/typeshed/issues/7855
from _typeshed import SupportsWrite
_StreamHandler = logging.StreamHandler[SupportsWrite[str]]
else:
_StreamHandler = logging.StreamHandler
# We want the blocking APIs, because we set file descriptors to non-blocking.
os = eventlet.patcher.original("os")
# Used to communicate between real threads:
@@ -38,11 +53,11 @@ threading = eventlet.patcher.original("threading")
class _BaseMutex:
"""Shared code for different mutex implementations."""
def __init__(self):
self.owner = None
def __init__(self) -> None:
self.owner: int | None = None
self.recursion_depth = 0
def acquire(self, blocking=True):
def acquire(self, blocking: bool = True) -> bool:
"""Acquire the mutex.
If called with blocking=False, returns True if the mutex was
@@ -59,7 +74,12 @@ class _BaseMutex:
return self._acquire_eventlet(blocking, current_greenthread_id)
def release(self):
def _acquire_eventlet(
self, blocking: bool, current_greenthread_id: int
) -> bool:
raise NotImplementedError()
def release(self) -> None:
"""Release the mutex."""
current_greenthread_id = id(eventlet.greenthread.getcurrent())
if self.owner != current_greenthread_id:
@@ -72,7 +92,10 @@ class _BaseMutex:
self.owner = None
self._release_eventlet()
def close(self):
def _release_eventlet(self) -> None:
raise NotImplementedError()
def close(self) -> None:
"""Close the mutex.
This releases its file descriptors.
@@ -91,7 +114,7 @@ class _ReallyPipeMutex(_BaseMutex):
Related eventlet bug: https://github.com/eventlet/eventlet/issues/432
"""
def __init__(self):
def __init__(self) -> None:
super().__init__()
self.rfd, self.wfd = os.pipe()
@@ -118,7 +141,9 @@ class _ReallyPipeMutex(_BaseMutex):
# our calls to trampoline(), but eventlet does not support that.
eventlet.debug.hub_prevent_multiple_readers(False)
def _acquire_eventlet(self, blocking, current_greenthread_id):
def _acquire_eventlet(
self, blocking: bool, current_greenthread_id: int
) -> bool:
while True:
try:
# If there is a byte available, this will read it and remove
@@ -139,10 +164,10 @@ class _ReallyPipeMutex(_BaseMutex):
# else writes to self.wfd.
eventlet.hubs.trampoline(self.rfd, read=True)
def _release_eventlet(self):
def _release_eventlet(self) -> None:
os.write(self.wfd, b'X')
def close(self):
def close(self) -> None:
"""Close the mutex.
This releases its file descriptors.
@@ -156,7 +181,7 @@ class _ReallyPipeMutex(_BaseMutex):
self.rfd = None
super().close()
def __del__(self):
def __del__(self) -> None:
# We need this so we don't leak file descriptors. Otherwise, if you
# call get_logger() and don't explicitly dispose of it by calling
# logger.logger.handlers[0].lock.close() [1], the pipe file
@@ -170,11 +195,16 @@ class _ReallyPipeMutex(_BaseMutex):
# do, so nobody does it and that's okay.
self.close()
def __enter__(self):
def __enter__(self) -> 'Self':
self.acquire()
return self
def __exit__(self, exc_type, exc_value, traceback):
def __exit__(
self,
type_: type[BaseException] | None,
value: BaseException | None,
traceback: TracebackType | None,
) -> None:
self.release()
@@ -190,12 +220,14 @@ class _AsyncioMutex(_BaseMutex):
thread (each OS thread running greenlets has its own asyncio loop)
"""
def __init__(self):
def __init__(self) -> None:
super().__init__()
self._asyncio_lock = asyncio.Lock()
self._threading_lock = threading.RLock()
async def _asyncio_acquire(self, blocking, current_greenthread_id):
async def _asyncio_acquire(
self, blocking: bool, current_greenthread_id: int
) -> bool:
if blocking:
timeout = None
else:
@@ -210,12 +242,23 @@ class _AsyncioMutex(_BaseMutex):
self.owner = current_greenthread_id
return True
def _acquire_eventlet(self, blocking, current_greenthread_id):
return eventlet.asyncio.spawn_for_awaitable(
def _acquire_eventlet(
self, blocking: bool, current_greenthread_id: int
) -> bool:
return eventlet.asyncio.spawn_for_awaitable( # type: ignore
self._asyncio_acquire(blocking, current_greenthread_id)
).wait()
def acquire(self, blocking=True):
def acquire(self, blocking: bool = True) -> bool:
"""Acquire the mutex.
If called with blocking=False, returns True if the mutex was
acquired and False if it wasn't. Otherwise, blocks until the mutex
is acquired and returns True.
This lock is recursive; the same greenthread may acquire it as many
times as it wants to, though it must then release it that many times
too.
"""
# First, acquire the RLock:
rlock_acquired = self._threading_lock.acquire(blocking=False)
if not rlock_acquired and not blocking:
@@ -232,20 +275,17 @@ class _AsyncioMutex(_BaseMutex):
# Then, do the eventlet locking:
return super().acquire(blocking=blocking)
# Preserve documentation, without copy/pasting:
acquire.__doc__ = _BaseMutex.acquire.__doc__
def _release_eventlet(self):
def _release_eventlet(self) -> None:
self._asyncio_lock.release()
def release(self):
def release(self) -> None:
"""Release the mutex."""
# We release in reverse order from acquire(), first eventlet and then
# the RLock:
super().release()
self._threading_lock.release()
def close(self):
def close(self) -> None:
"""Close the mutex."""
del self._asyncio_lock
del self._threading_lock
@@ -261,11 +301,11 @@ if isinstance(_HUB, eventlet.hubs.asyncio.Hub):
raise RuntimeError(
"eventlet 0.38.2 or later is required when using asyncio hub"
)
PipeMutex = _AsyncioMutex
PipeMutex: type[_AsyncioMutex | _ReallyPipeMutex] = _AsyncioMutex
else:
PipeMutex = _ReallyPipeMutex
def pipe_createLock(self):
def pipe_createLock(self: _StreamHandler) -> None:
"""Replacement for logging.Handler.createLock method."""
self.lock = PipeMutex()
self.lock = PipeMutex() # type: ignore

0
oslo_log/py.typed Normal file
View File

View File

@@ -12,13 +12,27 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections.abc
import logging
from time import monotonic as monotonic_clock
from typing import Any
class _LogRateLimit(logging.Filter):
def __init__(self, burst, interval, except_level=None):
burst: float
interval: float
except_level: int | None
logger: logging.Logger
counter: int
end_time: float
emit_warn: bool
def __init__(
self,
burst: float,
interval: float,
except_level: int | None = None,
) -> None:
logging.Filter.__init__(self)
self.burst = burst
self.interval = interval
@@ -26,14 +40,14 @@ class _LogRateLimit(logging.Filter):
self.logger = logging.getLogger()
self._reset()
def _reset(self, now=None):
def _reset(self, now: float | None = None) -> None:
if now is None:
now = monotonic_clock()
self.counter = 0
self.end_time = now + self.interval
self.emit_warn = False
def filter(self, record):
def filter(self, record: logging.LogRecord) -> bool:
if (
self.except_level is not None
and record.levelno >= self.except_level
@@ -68,7 +82,7 @@ class _LogRateLimit(logging.Filter):
return False
def _iter_loggers():
def _iter_loggers() -> collections.abc.Iterator[logging.Logger]:
"""Iterate on existing loggers."""
# Sadly, Logger.manager and Manager.loggerDict are not documented,
@@ -92,8 +106,14 @@ _LOG_LEVELS = {
'DEBUG': logging.DEBUG,
}
# Module-level state for the rate limit filter
_log_filter: _LogRateLimit | None = None
_logger_class: type[logging.Logger] | None = None
def install_filter(burst, interval, except_level='CRITICAL'):
def install_filter(
burst: float, interval: float, except_level: str = 'CRITICAL'
) -> None:
"""Install a rate limit filter on existing and future loggers.
Limit logs to *burst* messages every *interval* seconds, except of levels
@@ -105,8 +125,9 @@ def install_filter(burst, interval, except_level='CRITICAL'):
Raise an exception if a rate limit filter is already installed.
"""
global _log_filter, _logger_class
if install_filter.log_filter is not None:
if _log_filter is not None:
raise RuntimeError("rate limit filter already installed")
try:
@@ -116,11 +137,11 @@ def install_filter(burst, interval, except_level='CRITICAL'):
log_filter = _LogRateLimit(burst, interval, except_levelno)
install_filter.log_filter = log_filter
install_filter.logger_class = logging.getLoggerClass()
_log_filter = log_filter
_logger_class = logging.getLoggerClass()
class RateLimitLogger(install_filter.logger_class):
def __init__(self, *args, **kw):
class RateLimitLogger(_logger_class): # type: ignore[misc,valid-type]
def __init__(self, *args: Any, **kw: Any) -> None:
logging.Logger.__init__(self, *args, **kw)
self.addFilter(log_filter)
@@ -133,26 +154,24 @@ def install_filter(burst, interval, except_level='CRITICAL'):
logger.addFilter(log_filter)
install_filter.log_filter = None
install_filter.logger_class = None
def uninstall_filter():
def uninstall_filter() -> None:
"""Uninstall the rate filter installed by install_filter().
Do nothing if the filter was already uninstalled.
"""
global _log_filter, _logger_class
if install_filter.log_filter is None:
if _log_filter is None:
# not installed (or already uninstalled)
return
# Restore the old logger class
logging.setLoggerClass(install_filter.logger_class)
if _logger_class is not None:
logging.setLoggerClass(_logger_class)
# Remove the filter from all existing loggers
for logger in _iter_loggers():
logger.removeFilter(install_filter.log_filter)
logger.removeFilter(_log_filter)
install_filter.logger_class = None
install_filter.log_filter = None
_logger_class = None
_log_filter = None

View File

@@ -46,7 +46,7 @@ class ConvertJsonTestCase(test_base.BaseTestCase):
def _reformat(self, text):
fh = io.StringIO(text)
return list(convert_json.reformat_json(fh, lambda x: [x]))
return list(convert_json.reformat_json(fh, lambda x: iter([x])))
def test_reformat_json_single(self):
text = jsonutils.dumps(TRIVIAL_RECORD)

View File

@@ -53,7 +53,7 @@ class FormatterTest(test_base.BaseTestCase):
self.assertEqual("-", s)
def test_dictify_context_empty(self):
self.assertEqual({}, formatters._dictify_context(None))
self.assertEqual({}, formatters._dictify_context(None)) # type: ignore
def test_dictify_context_with_dict(self):
d = {"user": "user"}
@@ -78,7 +78,7 @@ class FormatUnhashableExceptionTest(test_base.BaseTestCase):
def _unhashable_exception_info(self):
class UnhashableException(Exception):
__hash__ = None
__hash__ = None # type: ignore
try:
raise UnhashableException()
@@ -88,7 +88,7 @@ class FormatUnhashableExceptionTest(test_base.BaseTestCase):
def test_error_summary(self):
exc_info = self._unhashable_exception_info()
record = logging.LogRecord(
'test', logging.ERROR, 'test', 0, 'test message', [], exc_info
'test', logging.ERROR, 'test', 0, 'test message', {}, exc_info
)
err_summary = formatters._get_error_summary(record)
self.assertTrue(err_summary)
@@ -115,7 +115,7 @@ class FormatUnhashableExceptionTest(test_base.BaseTestCase):
exc_info = self._unhashable_exception_info()
formatter = formatters.ContextFormatter(config=self.conf)
record = logging.LogRecord(
'test', logging.ERROR, 'test', 0, 'test message', [], exc_info
'test', logging.ERROR, 'test', 0, 'test message', {}, exc_info
)
tb = formatter.format(record)
self.assertTrue(tb)

View File

@@ -10,6 +10,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from typing import Any
from unittest import mock
from oslotest import base as test_base
@@ -61,7 +62,7 @@ class LogHelpersTestCase(test_base.BaseTestCase):
def test_staticmethod(arg1, arg2, arg3, *args, **kwargs):
pass
data = {
data: dict[str, Any] = {
'caller': 'static',
'method_name': '_static_method',
'args': (),

View File

@@ -26,7 +26,7 @@ import sys
try:
import syslog
except ImportError:
syslog = None
syslog = None # type: ignore
try:
from systemd import journal
except ImportError:
@@ -37,11 +37,12 @@ from unittest import mock
from dateutil import tz
from oslo_config import cfg
from oslo_config import fixture as fixture_config # noqa
from oslo_config import fixture as fixture_config
from oslo_context import context
from oslo_context import fixture as fixture_context
from oslo_i18n import fixture as fixture_trans
from oslo_serialization import jsonutils
from oslo_utils import units
from oslotest import base as test_base
import testtools
@@ -49,7 +50,7 @@ from oslo_log import _options
from oslo_log import formatters
from oslo_log import handlers
from oslo_log import log
from oslo_utils import units
from oslo_log import versionutils
MIN_LOG_INI = b"""[loggers]
@@ -82,11 +83,7 @@ def _fake_context():
return ctxt
class CommonLoggerTestsMixIn:
"""These tests are shared between LoggerTestCase and
LazyLoggerTestCase.
"""
class LoggerTestCase(test_base.BaseTestCase):
def setUp(self):
super().setUp()
# common context has different fields to the defaults in log.py
@@ -102,7 +99,7 @@ class CommonLoggerTestsMixIn:
'%(user)s %(project)s] '
'%(message)s'
)
self.log = None
self.log = log.getLogger(None)
log._setup_logging_from_conf(self.config_fixture.conf, 'test', 'test')
self.log_handlers = log.getLogger(None).logger.handlers
@@ -187,12 +184,6 @@ class CommonLoggerTestsMixIn:
self.assertEqual(self.log_handlers[0], handler_mock.return_value)
class LoggerTestCase(CommonLoggerTestsMixIn, test_base.BaseTestCase):
def setUp(self):
super().setUp()
self.log = log.getLogger(None)
class BaseTestCase(test_base.BaseTestCase):
def setUp(self):
super().setUp()
@@ -629,7 +620,6 @@ class JSONFormatterTestCase(LogTestBase):
self.assertEqual(special_user, data['extra'][extra_keys[1]])
def test_can_process_strings(self):
expected = b'\\u2622'
# see ContextFormatterTestCase.test_can_process_strings
expected = '\\\\xe2\\\\x98\\\\xa2'
self.log.info(b'%s', '\u2622'.encode())
@@ -716,7 +706,7 @@ class JSONFormatterTestCase(LogTestBase):
def get_fake_datetime(retval):
class FakeDateTime(datetime.datetime):
@classmethod
def fromtimestamp(cls, timestamp):
def fromtimestamp(cls, timestamp, /, tzinfo=None):
return retval
return FakeDateTime
@@ -905,14 +895,13 @@ class ContextFormatterTestCase(LogTestBase):
ctxt = _fake_context()
ctxt.request_id = '99'
message = self.trans_fixture.lazy('test ' + chr(128))
ignored_exceptions = [
for ignore in (
ValueError,
TypeError,
KeyError,
AttributeError,
ImportError,
]
for ignore in ignored_exceptions:
):
try:
raise ignore('test_exception_logging')
except ignore as e:
@@ -1055,7 +1044,6 @@ class ContextFormatterTestCase(LogTestBase):
self.assertEqual(expected, self.stream.getvalue())
def test_can_process_strings(self):
expected = b'\xe2\x98\xa2'
# logging format string should be unicode string
# or it will fail and inserting byte string in unicode string
# causes such formatting
@@ -1645,6 +1633,7 @@ keys=
root = logging.getLogger()
self.assertEqual(1, len(root.handlers))
handler = root.handlers[0]
assert isinstance(handler, logging.StreamHandler)
handler.stream = io.StringIO()
return handler.stream
@@ -1788,14 +1777,14 @@ class LogConfigOptsTestCase(BaseTestCase):
def test_handlers_cleanup(self):
"""Test that all old handlers get removed from log_root."""
old_handlers = [
log.handlers.ColorHandler(),
log.handlers.ColorHandler(),
handlers.ColorHandler(),
handlers.ColorHandler(),
]
log._loggers[None].logger.handlers = list(old_handlers)
log._setup_logging_from_conf(self.CONF, 'test', 'test')
handlers = log._loggers[None].logger.handlers
self.assertEqual(1, len(handlers))
self.assertNotIn(handlers[0], old_handlers)
new_handlers = log._loggers[None].logger.handlers
self.assertEqual(1, len(new_handlers))
self.assertNotIn(new_handlers[0], old_handlers)
def test_list_opts(self):
all_options = _options.list_opts()
@@ -1807,7 +1796,7 @@ class LogConfigOptsTestCase(BaseTestCase):
+ _options.logging_cli_opts
+ _options.generic_log_opts
+ _options.log_opts
+ _options.versionutils.deprecated_opts
+ versionutils.deprecated_opts
),
options,
)
@@ -1929,7 +1918,7 @@ class KeywordArgumentAdapterTestCase(BaseTestCase):
a = SavingAdapter(self.mock_log, {})
message = 'message'
exc_message = 'exception'
exc_message = Exception('exception')
val = 'value'
a.log(logging.DEBUG, message, name=val, exc_info=exc_message)
@@ -1948,7 +1937,7 @@ class KeywordArgumentAdapterTestCase(BaseTestCase):
def test_pass_args_via_debug(self):
a = SavingAdapter(self.mock_log, {})
message = 'message'
exc_message = 'exception'
exc_message = Exception('exception')
val = 'value'
a.debug(message, name=val, exc_info=exc_message)

View File

@@ -266,8 +266,8 @@ class DeprecatedTestCase(test_base.BaseTestCase):
obj = OutdatedClass(*args, **kwargs)
self.assertIsInstance(obj, OutdatedClass)
self.assertEqual('__init__', obj.__init__.__name__)
self.assertEqual('It is __init__ method.', obj.__init__.__doc__)
self.assertEqual('__init__', obj.__init__.__name__) # type: ignore
self.assertEqual('It is __init__ method.', obj.__init__.__doc__) # type: ignore
self.assertEqual(args, mock_arguments.args)
self.assertEqual(kwargs, mock_arguments.kwargs)
self.assert_deprecated(

View File

@@ -20,12 +20,17 @@ Helpers for comparing version strings.
import functools
import inspect
import logging
from typing import Any, TypeVar, cast, overload
from collections.abc import Callable
from oslo_config import cfg
from oslo_log._i18n import _
_F = TypeVar('_F', bound=Callable[..., Any])
_C = TypeVar('_C', bound=type[Any])
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
_DEPRECATED_EXCEPTIONS = set()
@@ -87,7 +92,7 @@ _RELEASES = {
}
def register_options():
def register_options() -> None:
"""Register configuration options used by this library.
.. note: This is optional since the options are also registered
@@ -174,7 +179,13 @@ class deprecated:
YOGA = 'Y'
ZED = 'Z'
def __init__(self, as_of, in_favor_of=None, remove_in=2, what=None):
def __init__(
self,
as_of: str,
in_favor_of: str | None = None,
remove_in: int | None = 2,
what: str | None = None,
) -> None:
"""Initialize decorator
:param as_of: the release deprecating the callable. Constants
@@ -191,7 +202,13 @@ class deprecated:
self.remove_in = remove_in
self.what = what
def __call__(self, func_or_cls):
@overload
def __call__(self, func_or_cls: _C) -> _C: ...
@overload
def __call__(self, func_or_cls: _F) -> _F: ...
def __call__(self, func_or_cls: _F | _C) -> _F | _C:
report_deprecated = functools.partial(
deprecation_warning,
what=self.what or func_or_cls.__name__ + '()',
@@ -203,16 +220,16 @@ class deprecated:
if inspect.isfunction(func_or_cls):
@functools.wraps(func_or_cls)
def wrapped(*args, **kwargs):
def wrapped(*args: Any, **kwargs: Any) -> Any:
report_deprecated()
return func_or_cls(*args, **kwargs)
return wrapped
return cast(_F, wrapped)
elif inspect.isclass(func_or_cls):
orig_init = func_or_cls.__init__
@functools.wraps(orig_init, assigned=('__name__', '__doc__'))
def new_init(self, *args, **kwargs):
def new_init(self: Any, *args: Any, **kwargs: Any) -> None:
if self.__class__ in _DEPRECATED_EXCEPTIONS:
report_deprecated()
orig_init(self, *args, **kwargs)
@@ -232,7 +249,7 @@ class deprecated:
# PyObject_IsSubclass in cpython/Objects/abstract.c
# for the short-cut.)
class ExceptionMeta(type):
def __subclasscheck__(self, subclass):
def __subclasscheck__(self, subclass: type) -> bool:
if self in _DEPRECATED_EXCEPTIONS:
report_deprecated()
return super().__subclasscheck__(subclass)
@@ -240,14 +257,14 @@ class deprecated:
func_or_cls.__meta__ = ExceptionMeta
_DEPRECATED_EXCEPTIONS.add(func_or_cls)
return func_or_cls
return cast(_C, func_or_cls)
else:
raise TypeError(
'deprecated can be used only with functions or classes'
)
def _get_safe_to_remove_release(release, remove_in):
def _get_safe_to_remove_release(release: str, remove_in: int | None) -> str:
# TODO(dstanek): this method will have to be reimplemented once
# when we get to the X release because once we get to the Y
# release, what is Y+2?
@@ -261,8 +278,12 @@ def _get_safe_to_remove_release(release, remove_in):
def deprecation_warning(
what, as_of, in_favor_of=None, remove_in=2, logger=LOG
):
what: str,
as_of: str,
in_favor_of: str | None = None,
remove_in: int | None = 2,
logger: logging.Logger = LOG,
) -> None:
"""Warn about the deprecation of a feature.
:param what: name of the thing being deprecated.
@@ -299,10 +320,15 @@ def deprecation_warning(
# Track the messages we have sent already. See
# report_deprecated_feature().
_deprecated_messages_sent = {}
_deprecated_messages_sent: dict[str, list[Any]] = {}
def report_deprecated_feature(logger, msg, *args, **kwargs):
def report_deprecated_feature(
logger: logging.Logger,
msg: str,
*args: Any,
**kwargs: Any,
) -> None:
"""Call this function when a deprecated feature is used.
If the system is configured for fatal deprecations then the message
@@ -334,5 +360,5 @@ def report_deprecated_feature(logger, msg, *args, **kwargs):
class DeprecatedConfig(Exception):
message = _("Fatal call to deprecated config: %(msg)s")
def __init__(self, msg):
def __init__(self, msg: str) -> None:
super(Exception, self).__init__(self.message % dict(msg=msg))

View File

@@ -48,6 +48,28 @@ convert-json = "oslo_log.cmds.convert_json:main"
[tool.setuptools]
packages = ["oslo_log"]
[tool.mypy]
python_version = "3.10"
show_column_numbers = true
show_error_context = true
ignore_missing_imports = true
strict = true
# keep this in-sync with 'mypy.exclude' in '.pre-commit-config.yaml'
exclude = '''
(?x)(
doc
| releasenotes
)
'''
[[tool.mypy.overrides]]
module = ["oslo_log.tests.*"]
warn_return_any = false
disallow_untyped_calls = false
disallow_untyped_defs = false
disallow_subclassing_any = false
disallow_any_generics = false
[tool.ruff]
line-length = 79
@@ -58,8 +80,8 @@ docstring-code-format = true
[tool.ruff.lint]
select = ["E4", "E5", "E7", "E9", "F", "S", "UP"]
ignore = [
# we want to test printf-style formatting
"UP031"
"UP031", # we want to test printf-style formatting
"S101", # assertions are only used for type narrowing
]
[tool.ruff.lint.per-file-ignores]