Add a filter to rate limit logs

* Add configuration options to enable rate limiting:

  - rate_limit_interval
  - rate_limit_burst
  - rate_limit_except_level

* Add oslo_log.rate_limit submodule
* Add public functins:

  - install_filter(burst, interval, except_level)
  - uninstall_filter()

* Add unit tests
* Add a new dependency, monotonic, to get a monotonic clock

Default: rate limiting is disabled and logs at CRITICAL level are not
rate limited.

DocImpact
Change-Id: Ic58dafceefde1b109721a58631c223522bf4cc9c
This commit is contained in:
Victor Stinner 2016-05-27 17:17:59 +02:00
parent 62ba713e3b
commit 7d1ef90316
5 changed files with 289 additions and 0 deletions

View File

@ -146,6 +146,21 @@ log_opts = [
default='[instance: %(uuid)s] ',
help='The format for an instance UUID that is passed with the '
'log message.'),
cfg.IntOpt('rate_limit_interval',
default=0,
help='Interval, number of seconds, of log rate limiting.'),
cfg.IntOpt('rate_limit_burst',
default=0,
help='Maximum number of logged messages per '
'rate_limit_interval.'),
cfg.StrOpt('rate_limit_except_level',
default='CRITICAL',
help='Log level name used by rate limiting: CRITICAL, ERROR, '
'INFO, WARNING, DEBUG or empty string. Logs with level '
'greater or equal to rate_limit_except_level are not '
'filtered. An empty string means that all levels are '
'filtered.'),
]

View File

@ -415,6 +415,12 @@ def _setup_logging_from_conf(conf, project, version):
else:
logger.setLevel(level_name)
if conf.rate_limit_burst >= 1 and conf.rate_limit_interval >= 1:
from oslo_log import rate_limit
rate_limit.install_filter(conf.rate_limit_burst,
conf.rate_limit_interval,
conf.rate_limit_except)
_loggers = {}

157
oslo_log/rate_limit.py Normal file
View File

@ -0,0 +1,157 @@
# Copyright 2016 Red Hat, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
try:
from time import monotonic as monotonic_clock # noqa
except ImportError:
from monotonic import monotonic as monotonic_clock # noqa
class _LogRateLimit(logging.Filter):
def __init__(self, burst, interval, except_level=None):
logging.Filter.__init__(self)
self.burst = burst
self.interval = interval
self.except_level = except_level
self.logger = logging.getLogger()
self._reset()
def _reset(self, now=None):
if now is None:
now = monotonic_clock()
self.counter = 0
self.end_time = now + self.interval
self.emit_warn = False
def filter(self, record):
if (self.except_level is not None
and record.levelno >= self.except_level):
# don't limit levels >= except_level
return True
timestamp = monotonic_clock()
if timestamp >= self.end_time:
self._reset(timestamp)
self.counter += 1
return True
self.counter += 1
if self.counter <= self.burst:
return True
if self.emit_warn:
# Allow to log our own warning: self.logger is also filtered by
# rate limiting
return True
if self.counter == self.burst + 1:
self.emit_warn = True
self.logger.error("Logging rate limit: "
"drop after %s records/%s sec",
self.burst, self.interval)
self.emit_warn = False
# Drop the log
return False
def _iter_loggers():
"""Iterate on existing loggers."""
# Sadly, Logger.manager and Manager.loggerDict are not documented,
# but there is no logging public function to iterate on all loggers.
# The root logger is not part of loggerDict.
yield logging.getLogger()
manager = logging.Logger.manager
for logger in manager.loggerDict.values():
if isinstance(logger, logging.PlaceHolder):
continue
yield logger
_LOG_LEVELS = {
'CRITICAL': logging.CRITICAL,
'ERROR': logging.ERROR,
'INFO': logging.INFO,
'WARNING': logging.WARNING,
'DEBUG': logging.DEBUG,
}
def install_filter(burst, interval, except_level='CRITICAL'):
"""Install a rate limit filter on existing and future loggers.
Limit logs to *burst* messages every *interval* seconds, except of levels
>= *except_level*. *except_level* is a log level name like 'CRITICAL'. If
*except_level* is an empty string, all levels are filtered.
The filter uses a monotonic clock, the timestamp of log records is not
used.
Raise an exception if a rate limit filter is already installed.
"""
if install_filter.log_filter is not None:
raise RuntimeError("rate limit filter already installed")
try:
except_levelno = _LOG_LEVELS[except_level]
except KeyError:
raise ValueError("invalid log level name: %r" % except_level)
log_filter = _LogRateLimit(burst, interval, except_levelno)
install_filter.log_filter = log_filter
install_filter.logger_class = logging.getLoggerClass()
class RateLimitLogger(install_filter.logger_class):
def __init__(self, *args, **kw):
logging.Logger.__init__(self, *args, **kw)
self.addFilter(log_filter)
# Setup our own logger class to automatically add the filter
# to new loggers.
logging.setLoggerClass(RateLimitLogger)
# Add the filter to all existing loggers
for logger in _iter_loggers():
logger.addFilter(log_filter)
install_filter.log_filter = None
install_filter.logger_class = None
def uninstall_filter():
"""Uninstall the rate filter installed by install_filter().
Do nothing if the filter was already uninstalled.
"""
if install_filter.log_filter is None:
# not installed (or already uninstalled)
return
# Restore the old logger class
logging.setLoggerClass(install_filter.logger_class)
# Remove the filter from all existing loggers
for logger in _iter_loggers():
logger.removeFilter(install_filter.log_filter)
install_filter.logger_class = None
install_filter.log_filter = None

View File

@ -0,0 +1,110 @@
# Copyright 2016 Red Hat, Inc. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import logging
import mock
from oslotest import base as test_base
import six
from oslo_log import rate_limit
class LogRateLimitTestCase(test_base.BaseTestCase):
def tearDown(self):
super(LogRateLimitTestCase, self).tearDown()
rate_limit.uninstall_filter()
def install_filter(self, *args):
rate_limit.install_filter(*args)
logger = logging.getLogger()
# remove handlers to not pollute stdout
def restore_handlers(logger, handlers):
for handler in handlers:
logger.addHandler(handler)
self.addCleanup(restore_handlers, logger, list(logger.handlers))
for handler in list(logger.handlers):
logger.removeHandler(handler)
# install our handler writing logs into a StringIO
stream = six.StringIO()
handler = logging.StreamHandler(stream)
logger.addHandler(handler)
return (logger, stream)
@mock.patch('oslo_log.rate_limit.monotonic_clock')
def test_rate_limit(self, mock_clock):
mock_clock.return_value = 1
logger, stream = self.install_filter(2, 1)
# first burst
logger.error("message 1")
logger.error("message 2")
logger.error("message 3")
self.assertEqual(stream.getvalue(),
'message 1\n'
'message 2\n'
'Logging rate limit: drop after 2 records/1 sec\n')
# second burst (clock changed)
stream.seek(0)
stream.truncate()
mock_clock.return_value = 2
logger.error("message 4")
logger.error("message 5")
logger.error("message 6")
self.assertEqual(stream.getvalue(),
'message 4\n'
'message 5\n'
'Logging rate limit: drop after 2 records/1 sec\n')
@mock.patch('oslo_log.rate_limit.monotonic_clock')
def test_rate_limit_except_level(self, mock_clock):
mock_clock.return_value = 1
logger, stream = self.install_filter(1, 1, 'CRITICAL')
# first burst
logger.error("error 1")
logger.error("error 2")
logger.critical("critical 3")
logger.critical("critical 4")
self.assertEqual(stream.getvalue(),
'error 1\n'
'Logging rate limit: drop after 1 records/1 sec\n'
'critical 3\n'
'critical 4\n')
def test_install_twice(self):
rate_limit.install_filter(100, 1)
self.assertRaises(RuntimeError, rate_limit.install_filter, 100, 1)
@mock.patch('oslo_log.rate_limit.monotonic_clock')
def test_uninstall(self, mock_clock):
mock_clock.return_value = 1
logger, stream = self.install_filter(1, 1)
rate_limit.uninstall_filter()
# not limited
logger.error("message 1")
logger.error("message 2")
logger.error("message 3")
self.assertEqual(stream.getvalue(),
'message 1\n'
'message 2\n'
'message 3\n')

View File

@ -12,3 +12,4 @@ oslo.serialization>=1.10.0 # Apache-2.0
debtcollector>=1.2.0 # Apache-2.0
pyinotify>=0.9.6;sys_platform!='win32' and sys_platform!='darwin' and sys_platform!='sunos5' # MIT
python-dateutil>=2.4.2 # BSD
monotonic>=0.6 # Apache-2.0