Providing a fallback `dictconfig` implementation for Python < 2.7

This commit is contained in:
Ryan Petrello
2012-03-24 01:42:10 -04:00
parent ccd177d774
commit 13afd3bfbb
5 changed files with 1287 additions and 1 deletions

View File

@@ -11,7 +11,11 @@ from middleware.static import StaticFileMiddleware
from configuration import set_config, Config
from configuration import _runtime_conf as conf
from logging.config import dictConfig as load_logging_config
try:
from logging.config import dictConfig as load_logging_config
except ImportError:
from .compat.dictconfig import dictConfig as load_logging_config
__all__ = [

549
pecan/compat/dictconfig.py Normal file
View File

@@ -0,0 +1,549 @@
# Copyright 2009-2010 by Vinay Sajip. All Rights Reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
# provided that the above copyright notice appear in all copies and that
# both that copyright notice and this permission notice appear in
# supporting documentation, and that the name of Vinay Sajip
# not be used in advertising or publicity pertaining to distribution
# of the software without specific, written prior permission.
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import logging.handlers
import re
import sys
import types
IDENTIFIER = re.compile('^[a-z_][a-z0-9_]*$', re.I)
def valid_ident(s):
m = IDENTIFIER.match(s)
if not m:
raise ValueError('Not a valid Python identifier: %r' % s)
return True
#
# This function is defined in logging only in recent versions of Python
#
try:
from logging import _checkLevel
except ImportError:
def _checkLevel(level):
if isinstance(level, int):
rv = level
elif str(level) == level:
if level not in logging._levelNames:
raise ValueError('Unknown level: %r' % level)
rv = logging._levelNames[level]
else:
raise TypeError('Level not an integer or a '
'valid string: %r' % level)
return rv
# The ConvertingXXX classes are wrappers around standard Python containers,
# and they serve to convert any suitable values in the container. The
# conversion converts base dicts, lists and tuples to their wrapped
# equivalents, whereas strings which match a conversion format are converted
# appropriately.
#
# Each wrapper should have a configurator attribute holding the actual
# configurator to use for conversion.
class ConvertingDict(dict):
"""A converting dictionary wrapper."""
def __getitem__(self, key):
value = dict.__getitem__(self, key)
result = self.configurator.convert(value)
#If the converted value is different, save for next time
if value is not result:
self[key] = result
if type(result) in (ConvertingDict, ConvertingList,
ConvertingTuple):
result.parent = self
result.key = key
return result
def get(self, key, default=None):
value = dict.get(self, key, default)
result = self.configurator.convert(value)
#If the converted value is different, save for next time
if value is not result:
self[key] = result
if type(result) in (ConvertingDict, ConvertingList,
ConvertingTuple):
result.parent = self
result.key = key
return result
def pop(self, key, default=None):
value = dict.pop(self, key, default)
result = self.configurator.convert(value)
if value is not result:
if type(result) in (ConvertingDict, ConvertingList,
ConvertingTuple):
result.parent = self
result.key = key
return result
class ConvertingList(list):
"""A converting list wrapper."""
def __getitem__(self, key):
value = list.__getitem__(self, key)
result = self.configurator.convert(value)
#If the converted value is different, save for next time
if value is not result:
self[key] = result
if type(result) in (ConvertingDict, ConvertingList,
ConvertingTuple):
result.parent = self
result.key = key
return result
def pop(self, idx=-1):
value = list.pop(self, idx)
result = self.configurator.convert(value)
if value is not result:
if type(result) in (ConvertingDict, ConvertingList,
ConvertingTuple):
result.parent = self
return result
class ConvertingTuple(tuple):
"""A converting tuple wrapper."""
def __getitem__(self, key):
value = tuple.__getitem__(self, key)
result = self.configurator.convert(value)
if value is not result:
if type(result) in (ConvertingDict, ConvertingList,
ConvertingTuple):
result.parent = self
result.key = key
return result
class BaseConfigurator(object):
"""
The configurator base class which defines some useful defaults.
"""
CONVERT_PATTERN = re.compile(r'^(?P<prefix>[a-z]+)://(?P<suffix>.*)$')
WORD_PATTERN = re.compile(r'^\s*(\w+)\s*')
DOT_PATTERN = re.compile(r'^\.\s*(\w+)\s*')
INDEX_PATTERN = re.compile(r'^\[\s*(\w+)\s*\]\s*')
DIGIT_PATTERN = re.compile(r'^\d+$')
value_converters = {
'ext' : 'ext_convert',
'cfg' : 'cfg_convert',
}
# We might want to use a different one, e.g. importlib
importer = __import__
def __init__(self, config):
self.config = ConvertingDict(config)
self.config.configurator = self
def resolve(self, s):
"""
Resolve strings to objects using standard import and attribute
syntax.
"""
name = s.split('.')
used = name.pop(0)
try:
found = self.importer(used)
for frag in name:
used += '.' + frag
try:
found = getattr(found, frag)
except AttributeError:
self.importer(used)
found = getattr(found, frag)
return found
except ImportError:
e, tb = sys.exc_info()[1:]
v = ValueError('Cannot resolve %r: %s' % (s, e))
v.__cause__, v.__traceback__ = e, tb
raise v
def ext_convert(self, value):
"""Default converter for the ext:// protocol."""
return self.resolve(value)
def cfg_convert(self, value):
"""Default converter for the cfg:// protocol."""
rest = value
m = self.WORD_PATTERN.match(rest)
if m is None:
raise ValueError("Unable to convert %r" % value)
else:
rest = rest[m.end():]
d = self.config[m.groups()[0]]
#print d, rest
while rest:
m = self.DOT_PATTERN.match(rest)
if m:
d = d[m.groups()[0]]
else:
m = self.INDEX_PATTERN.match(rest)
if m:
idx = m.groups()[0]
if not self.DIGIT_PATTERN.match(idx):
d = d[idx]
else:
try:
n = int(idx) # try as number first (most likely)
d = d[n]
except TypeError:
d = d[idx]
if m:
rest = rest[m.end():]
else:
raise ValueError('Unable to convert '
'%r at %r' % (value, rest))
#rest should be empty
return d
def convert(self, value):
"""
Convert values to an appropriate type. dicts, lists and tuples are
replaced by their converting alternatives. Strings are checked to
see if they have a conversion format and are converted if they do.
"""
if not isinstance(value, ConvertingDict) and isinstance(value, dict):
value = ConvertingDict(value)
value.configurator = self
elif not isinstance(value, ConvertingList) and isinstance(value, list):
value = ConvertingList(value)
value.configurator = self
elif not isinstance(value, ConvertingTuple) and\
isinstance(value, tuple):
value = ConvertingTuple(value)
value.configurator = self
elif isinstance(value, basestring): # str for py3k
m = self.CONVERT_PATTERN.match(value)
if m:
d = m.groupdict()
prefix = d['prefix']
converter = self.value_converters.get(prefix, None)
if converter:
suffix = d['suffix']
converter = getattr(self, converter)
value = converter(suffix)
return value
def configure_custom(self, config):
"""Configure an object with a user-supplied factory."""
c = config.pop('()')
if not hasattr(c, '__call__') and hasattr(types, 'ClassType') and type(c) != types.ClassType:
c = self.resolve(c)
props = config.pop('.', None)
# Check for valid identifiers
kwargs = dict([(k, config[k]) for k in config if valid_ident(k)])
result = c(**kwargs)
if props:
for name, value in props.items():
setattr(result, name, value)
return result
def as_tuple(self, value):
"""Utility function which converts lists to tuples."""
if isinstance(value, list):
value = tuple(value)
return value
class DictConfigurator(BaseConfigurator):
"""
Configure logging using a dictionary-like object to describe the
configuration.
"""
def configure(self):
"""Do the configuration."""
config = self.config
if 'version' not in config:
raise ValueError("dictionary doesn't specify a version")
if config['version'] != 1:
raise ValueError("Unsupported version: %s" % config['version'])
incremental = config.pop('incremental', False)
EMPTY_DICT = {}
logging._acquireLock()
try:
if incremental:
handlers = config.get('handlers', EMPTY_DICT)
# incremental handler config only if handler name
# ties in to logging._handlers (Python 2.7)
if sys.version_info[:2] == (2, 7):
for name in handlers:
if name not in logging._handlers:
raise ValueError('No handler found with '
'name %r' % name)
else:
try:
handler = logging._handlers[name]
handler_config = handlers[name]
level = handler_config.get('level', None)
if level:
handler.setLevel(_checkLevel(level))
except StandardError, e:
raise ValueError('Unable to configure handler '
'%r: %s' % (name, e))
loggers = config.get('loggers', EMPTY_DICT)
for name in loggers:
try:
self.configure_logger(name, loggers[name], True)
except StandardError, e:
raise ValueError('Unable to configure logger '
'%r: %s' % (name, e))
root = config.get('root', None)
if root:
try:
self.configure_root(root, True)
except StandardError, e:
raise ValueError('Unable to configure root '
'logger: %s' % e)
else:
disable_existing = config.pop('disable_existing_loggers', True)
logging._handlers.clear()
del logging._handlerList[:]
# Do formatters first - they don't refer to anything else
formatters = config.get('formatters', EMPTY_DICT)
for name in formatters:
try:
formatters[name] = self.configure_formatter(
formatters[name])
except StandardError, e:
raise ValueError('Unable to configure '
'formatter %r: %s' % (name, e))
# Next, do filters - they don't refer to anything else, either
filters = config.get('filters', EMPTY_DICT)
for name in filters:
try:
filters[name] = self.configure_filter(filters[name])
except StandardError, e:
raise ValueError('Unable to configure '
'filter %r: %s' % (name, e))
# Next, do handlers - they refer to formatters and filters
# As handlers can refer to other handlers, sort the keys
# to allow a deterministic order of configuration
handlers = config.get('handlers', EMPTY_DICT)
for name in sorted(handlers):
try:
handler = self.configure_handler(handlers[name])
handler.name = name
handlers[name] = handler
except StandardError, e:
raise ValueError('Unable to configure handler '
'%r: %s' % (name, e))
# Next, do loggers - they refer to handlers and filters
#we don't want to lose the existing loggers,
#since other threads may have pointers to them.
#existing is set to contain all existing loggers,
#and as we go through the new configuration we
#remove any which are configured. At the end,
#what's left in existing is the set of loggers
#which were in the previous configuration but
#which are not in the new configuration.
root = logging.root
existing = root.manager.loggerDict.keys()
#The list needs to be sorted so that we can
#avoid disabling child loggers of explicitly
#named loggers. With a sorted list it is easier
#to find the child loggers.
existing.sort()
#We'll keep the list of existing loggers
#which are children of named loggers here...
child_loggers = []
#now set up the new ones...
loggers = config.get('loggers', EMPTY_DICT)
for name in loggers:
if name in existing:
i = existing.index(name)
prefixed = name + "."
pflen = len(prefixed)
num_existing = len(existing)
i = i + 1 # look at the entry after name
while (i < num_existing) and\
(existing[i][:pflen] == prefixed):
child_loggers.append(existing[i])
i = i + 1
existing.remove(name)
try:
self.configure_logger(name, loggers[name])
except StandardError, e:
raise ValueError('Unable to configure logger '
'%r: %s' % (name, e))
#Disable any old loggers. There's no point deleting
#them as other threads may continue to hold references
#and by disabling them, you stop them doing any logging.
#However, don't disable children of named loggers, as that's
#probably not what was intended by the user.
for log in existing:
logger = root.manager.loggerDict[log]
if log in child_loggers:
logger.level = logging.NOTSET
logger.handlers = []
logger.propagate = True
elif disable_existing:
logger.disabled = True
# And finally, do the root logger
root = config.get('root', None)
if root:
try:
self.configure_root(root)
except StandardError, e:
raise ValueError('Unable to configure root '
'logger: %s' % e)
finally:
logging._releaseLock()
def configure_formatter(self, config):
"""Configure a formatter from a dictionary."""
if '()' in config:
factory = config['()'] # for use in exception handler
try:
result = self.configure_custom(config)
except TypeError, te:
if "'format'" not in str(te):
raise
#Name of parameter changed from fmt to format.
#Retry with old name.
#This is so that code can be used with older Python versions
#(e.g. by Django)
config['fmt'] = config.pop('format')
config['()'] = factory
result = self.configure_custom(config)
else:
fmt = config.get('format', None)
dfmt = config.get('datefmt', None)
result = logging.Formatter(fmt, dfmt)
return result
def configure_filter(self, config):
"""Configure a filter from a dictionary."""
if '()' in config:
result = self.configure_custom(config)
else:
name = config.get('name', '')
result = logging.Filter(name)
return result
def add_filters(self, filterer, filters):
"""Add filters to a filterer from a list of names."""
for f in filters:
try:
filterer.addFilter(self.config['filters'][f])
except StandardError, e:
raise ValueError('Unable to add filter %r: %s' % (f, e))
def configure_handler(self, config):
"""Configure a handler from a dictionary."""
formatter = config.pop('formatter', None)
if formatter:
try:
formatter = self.config['formatters'][formatter]
except StandardError, e:
raise ValueError('Unable to set formatter '
'%r: %s' % (formatter, e))
level = config.pop('level', None)
filters = config.pop('filters', None)
if '()' in config:
c = config.pop('()')
if not hasattr(c, '__call__') and hasattr(types, 'ClassType') and type(c) != types.ClassType:
c = self.resolve(c)
factory = c
else:
klass = self.resolve(config.pop('class'))
#Special case for handler which refers to another handler
if issubclass(klass, logging.handlers.MemoryHandler) and\
'target' in config:
try:
config['target'] = self.config['handlers'][config['target']]
except StandardError, e:
raise ValueError('Unable to set target handler '
'%r: %s' % (config['target'], e))
elif issubclass(klass, logging.handlers.SMTPHandler) and\
'mailhost' in config:
config['mailhost'] = self.as_tuple(config['mailhost'])
elif issubclass(klass, logging.handlers.SysLogHandler) and\
'address' in config:
config['address'] = self.as_tuple(config['address'])
factory = klass
kwargs = dict([(k, config[k]) for k in config if valid_ident(k)])
try:
result = factory(**kwargs)
except TypeError, te:
if "'stream'" not in str(te):
raise
#The argument name changed from strm to stream
#Retry with old name.
#This is so that code can be used with older Python versions
#(e.g. by Django)
kwargs['strm'] = kwargs.pop('stream')
result = factory(**kwargs)
if formatter:
result.setFormatter(formatter)
if level is not None:
result.setLevel(_checkLevel(level))
if filters:
self.add_filters(result, filters)
return result
def add_handlers(self, logger, handlers):
"""Add handlers to a logger from a list of names."""
for h in handlers:
try:
logger.addHandler(self.config['handlers'][h])
except StandardError, e:
raise ValueError('Unable to add handler %r: %s' % (h, e))
def common_logger_config(self, logger, config, incremental=False):
"""
Perform configuration which is common to root and non-root loggers.
"""
level = config.get('level', None)
if level is not None:
logger.setLevel(_checkLevel(level))
if not incremental:
#Remove any existing handlers
for h in logger.handlers[:]:
logger.removeHandler(h)
handlers = config.get('handlers', None)
if handlers:
self.add_handlers(logger, handlers)
filters = config.get('filters', None)
if filters:
self.add_filters(logger, filters)
def configure_logger(self, name, config, incremental=False):
"""Configure a non-root logger from a dictionary."""
logger = logging.getLogger(name)
self.common_logger_config(logger, config, incremental)
propagate = config.get('propagate', None)
if propagate is not None:
logger.propagate = propagate
def configure_root(self, config, incremental=False):
"""Configure a root logger from a dictionary."""
root = logging.getLogger()
self.common_logger_config(root, config, incremental)
dictConfigClass = DictConfigurator
def dictConfig(config):
"""Configure logging using a dictionary."""
dictConfigClass(config).configure()

View File

View File

@@ -0,0 +1,733 @@
# Copyright 2009-2010 by Vinay Sajip. All Rights Reserved.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose and without fee is hereby granted,
# provided that the above copyright notice appear in all copies and that
# both that copyright notice and this permission notice appear in
# supporting documentation, and that the name of Vinay Sajip
# not be used in advertising or publicity pertaining to distribution
# of the software without specific, written prior permission.
# VINAY SAJIP DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS SOFTWARE, INCLUDING
# ALL IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL
# VINAY SAJIP BE LIABLE FOR ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR
# ANY DAMAGES WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER
# IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
import sys
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO # noqa
from pecan.compat.dictconfig import dictConfig
import logging
import re
from test.test_support import captured_stdout, run_unittest
import unittest
class BaseTest(unittest.TestCase):
"""Base class for logging tests."""
log_format = "%(name)s -> %(levelname)s: %(message)s"
expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$"
message_num = 0
def setUp(self):
"""Setup the default logging stream to an internal StringIO instance,
so that we can examine log output as we want."""
logger_dict = logging.getLogger().manager.loggerDict
logging._acquireLock()
try:
self.saved_handlers = logging._handlers.copy()
self.saved_handler_list = logging._handlerList[:]
self.saved_loggers = logger_dict.copy()
self.saved_level_names = logging._levelNames.copy()
finally:
logging._releaseLock()
self.root_logger = logging.getLogger("")
self.original_logging_level = self.root_logger.getEffectiveLevel()
self.stream = StringIO()
self.root_logger.setLevel(logging.DEBUG)
self.root_hdlr = logging.StreamHandler(self.stream)
self.root_formatter = logging.Formatter(self.log_format)
self.root_hdlr.setFormatter(self.root_formatter)
self.root_logger.addHandler(self.root_hdlr)
def tearDown(self):
"""Remove our logging stream, and restore the original logging
level."""
self.stream.close()
self.root_logger.removeHandler(self.root_hdlr)
self.root_logger.setLevel(self.original_logging_level)
logging._acquireLock()
try:
logging._levelNames.clear()
logging._levelNames.update(self.saved_level_names)
logging._handlers.clear()
logging._handlers.update(self.saved_handlers)
logging._handlerList[:] = self.saved_handler_list
loggerDict = logging.getLogger().manager.loggerDict
loggerDict.clear()
loggerDict.update(self.saved_loggers)
finally:
logging._releaseLock()
def assert_log_lines(self, expected_values, stream=None):
"""Match the collected log lines against the regular expression
self.expected_log_pat, and compare the extracted group values to
the expected_values list of tuples."""
stream = stream or self.stream
pat = re.compile(self.expected_log_pat)
try:
stream.reset()
actual_lines = stream.readlines()
except AttributeError:
# StringIO.StringIO lacks a reset() method.
actual_lines = stream.getvalue().splitlines()
self.assertEquals(len(actual_lines), len(expected_values))
for actual, expected in zip(actual_lines, expected_values):
match = pat.search(actual)
if not match:
self.fail("Log line does not match expected pattern:\n" +
actual)
self.assertEquals(tuple(match.groups()), expected)
s = stream.read()
if s:
self.fail("Remaining output at end of log stream:\n" + s)
def next_message(self):
"""Generate a message consisting solely of an auto-incrementing
integer."""
self.message_num += 1
return "%d" % self.message_num
class ExceptionFormatter(logging.Formatter):
"""A special exception formatter."""
def formatException(self, ei):
return "Got a [%s]" % ei[0].__name__
def formatFunc(format, datefmt=None):
return logging.Formatter(format, datefmt)
def handlerFunc():
return logging.StreamHandler()
class CustomHandler(logging.StreamHandler):
pass
class ConfigDictTest(BaseTest):
"""Reading logging config from a dictionary."""
expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$"
# config0 is a standard configuration.
config0 = {
'version': 1,
'formatters': {
'form1' : {
'format' : '%(levelname)s ++ %(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : 'logging.StreamHandler',
'formatter' : 'form1',
'level' : 'NOTSET',
'stream' : 'ext://sys.stdout',
},
},
'root' : {
'level' : 'WARNING',
'handlers' : ['hand1'],
},
}
# config1 adds a little to the standard configuration.
config1 = {
'version': 1,
'formatters': {
'form1' : {
'format' : '%(levelname)s ++ %(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : 'logging.StreamHandler',
'formatter' : 'form1',
'level' : 'NOTSET',
'stream' : 'ext://sys.stdout',
},
},
'loggers' : {
'compiler.parser' : {
'level' : 'DEBUG',
'handlers' : ['hand1'],
},
},
'root' : {
'level' : 'WARNING',
},
}
# config2 has a subtle configuration error that should be reported
config2 = {
'formatters': {
'form1' : {
'format' : '%(levelname)s ++ %(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : 'logging.StreamHandler',
'formatter' : 'form1',
'level' : 'NOTSET',
'stream' : 'ext://sys.stdbout',
},
},
'loggers' : {
'compiler.parser' : {
'level' : 'DEBUG',
'handlers' : ['hand1'],
},
},
'root' : {
'level' : 'WARNING',
},
}
#As config1 but with a misspelt level on a handler
config2a = {
'formatters': {
'form1' : {
'format' : '%(levelname)s ++ %(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : 'logging.StreamHandler',
'formatter' : 'form1',
'level' : 'NTOSET',
'stream' : 'ext://sys.stdout',
},
},
'loggers' : {
'compiler.parser' : {
'level' : 'DEBUG',
'handlers' : ['hand1'],
},
},
'root' : {
'level' : 'WARNING',
},
}
#As config1 but with a misspelt level on a logger
config2b = {
'formatters': {
'form1' : {
'format' : '%(levelname)s ++ %(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : 'logging.StreamHandler',
'formatter' : 'form1',
'level' : 'NOTSET',
'stream' : 'ext://sys.stdout',
},
},
'loggers' : {
'compiler.parser' : {
'level' : 'DEBUG',
'handlers' : ['hand1'],
},
},
'root' : {
'level' : 'WRANING',
},
}
# config3 has a less subtle configuration error
config3 = {
'formatters': {
'form1' : {
'format' : '%(levelname)s ++ %(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : 'logging.StreamHandler',
'formatter' : 'misspelled_name',
'level' : 'NOTSET',
'stream' : 'ext://sys.stdout',
},
},
'loggers' : {
'compiler.parser' : {
'level' : 'DEBUG',
'handlers' : ['hand1'],
},
},
'root' : {
'level' : 'WARNING',
},
}
# config4 specifies a custom formatter class to be loaded
config4 = {
'version': 1,
'formatters': {
'form1' : {
'()' : __name__ + '.ExceptionFormatter',
'format' : '%(levelname)s:%(name)s:%(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : 'logging.StreamHandler',
'formatter' : 'form1',
'level' : 'NOTSET',
'stream' : 'ext://sys.stdout',
},
},
'root' : {
'level' : 'NOTSET',
'handlers' : ['hand1'],
},
}
# As config4 but using an actual callable rather than a string
config4a = {
'version': 1,
'formatters': {
'form1' : {
'()' : ExceptionFormatter,
'format' : '%(levelname)s:%(name)s:%(message)s',
},
'form2' : {
'()' : __name__ + '.formatFunc',
'format' : '%(levelname)s:%(name)s:%(message)s',
},
'form3' : {
'()' : formatFunc,
'format' : '%(levelname)s:%(name)s:%(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : 'logging.StreamHandler',
'formatter' : 'form1',
'level' : 'NOTSET',
'stream' : 'ext://sys.stdout',
},
'hand2' : {
'()' : handlerFunc,
},
},
'root' : {
'level' : 'NOTSET',
'handlers' : ['hand1'],
},
}
# config5 specifies a custom handler class to be loaded
config5 = {
'version': 1,
'formatters': {
'form1' : {
'format' : '%(levelname)s ++ %(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : __name__ + '.CustomHandler',
'formatter' : 'form1',
'level' : 'NOTSET',
'stream' : 'ext://sys.stdout',
},
},
'loggers' : {
'compiler.parser' : {
'level' : 'DEBUG',
'handlers' : ['hand1'],
},
},
'root' : {
'level' : 'WARNING',
},
}
# config6 specifies a custom handler class to be loaded
# but has bad arguments
config6 = {
'formatters': {
'form1' : {
'format' : '%(levelname)s ++ %(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : __name__ + '.CustomHandler',
'formatter' : 'form1',
'level' : 'NOTSET',
'stream' : 'ext://sys.stdout',
'9' : 'invalid parameter name',
},
},
'loggers' : {
'compiler.parser' : {
'level' : 'DEBUG',
'handlers' : ['hand1'],
},
},
'root' : {
'level' : 'WARNING',
},
}
#config 7 does not define compiler.parser but defines compiler.lexer
#so compiler.parser should be disabled after applying it
config7 = {
'version': 1,
'formatters': {
'form1' : {
'format' : '%(levelname)s ++ %(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : 'logging.StreamHandler',
'formatter' : 'form1',
'level' : 'NOTSET',
'stream' : 'ext://sys.stdout',
},
},
'loggers' : {
'compiler.lexer' : {
'level' : 'DEBUG',
'handlers' : ['hand1'],
},
},
'root' : {
'level' : 'WARNING',
},
}
config8 = {
'version': 1,
'disable_existing_loggers' : False,
'formatters': {
'form1' : {
'format' : '%(levelname)s ++ %(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : 'logging.StreamHandler',
'formatter' : 'form1',
'level' : 'NOTSET',
'stream' : 'ext://sys.stdout',
},
},
'loggers' : {
'compiler' : {
'level' : 'DEBUG',
'handlers' : ['hand1'],
},
'compiler.lexer' : {
},
},
'root' : {
'level' : 'WARNING',
},
}
config9 = {
'version': 1,
'formatters': {
'form1' : {
'format' : '%(levelname)s ++ %(message)s',
},
},
'handlers' : {
'hand1' : {
'class' : 'logging.StreamHandler',
'formatter' : 'form1',
'level' : 'WARNING',
'stream' : 'ext://sys.stdout',
},
},
'loggers' : {
'compiler.parser' : {
'level' : 'WARNING',
'handlers' : ['hand1'],
},
},
'root' : {
'level' : 'NOTSET',
},
}
config9a = {
'version': 1,
'incremental' : True,
'handlers' : {
'hand1' : {
'level' : 'WARNING',
},
},
'loggers' : {
'compiler.parser' : {
'level' : 'INFO',
},
},
}
config9b = {
'version': 1,
'incremental' : True,
'handlers' : {
'hand1' : {
'level' : 'INFO',
},
},
'loggers' : {
'compiler.parser' : {
'level' : 'INFO',
},
},
}
#As config1 but with a filter added
config10 = {
'version': 1,
'formatters': {
'form1' : {
'format' : '%(levelname)s ++ %(message)s',
},
},
'filters' : {
'filt1' : {
'name' : 'compiler.parser',
},
},
'handlers' : {
'hand1' : {
'class' : 'logging.StreamHandler',
'formatter' : 'form1',
'level' : 'NOTSET',
'stream' : 'ext://sys.stdout',
'filters' : ['filt1'],
},
},
'loggers' : {
'compiler.parser' : {
'level' : 'DEBUG',
'filters' : ['filt1'],
},
},
'root' : {
'level' : 'WARNING',
'handlers' : ['hand1'],
},
}
def apply_config(self, conf):
dictConfig(conf)
def test_config0_ok(self):
# A simple config which overrides the default settings.
with captured_stdout() as output:
self.apply_config(self.config0)
logger = logging.getLogger()
# Won't output anything
logger.info(self.next_message())
# Outputs a message
logger.error(self.next_message())
self.assert_log_lines([
('ERROR', '2'),
], stream=output)
# Original logger output is empty.
self.assert_log_lines([])
def test_config1_ok(self, config=config1):
# A config defining a sub-parser as well.
with captured_stdout() as output:
self.apply_config(config)
logger = logging.getLogger("compiler.parser")
# Both will output a message
logger.info(self.next_message())
logger.error(self.next_message())
self.assert_log_lines([
('INFO', '1'),
('ERROR', '2'),
], stream=output)
# Original logger output is empty.
self.assert_log_lines([])
def test_config2_failure(self):
# A simple config which overrides the default settings.
self.assertRaises(StandardError, self.apply_config, self.config2)
def test_config2a_failure(self):
# A simple config which overrides the default settings.
self.assertRaises(StandardError, self.apply_config, self.config2a)
def test_config2b_failure(self):
# A simple config which overrides the default settings.
self.assertRaises(StandardError, self.apply_config, self.config2b)
def test_config3_failure(self):
# A simple config which overrides the default settings.
self.assertRaises(StandardError, self.apply_config, self.config3)
def test_config4_ok(self):
# A config specifying a custom formatter class.
with captured_stdout() as output:
self.apply_config(self.config4)
#logger = logging.getLogger()
try:
raise RuntimeError()
except RuntimeError:
logging.exception("just testing")
sys.stdout.seek(0)
self.assertEquals(output.getvalue(),
"ERROR:root:just testing\nGot a [RuntimeError]\n")
# Original logger output is empty
self.assert_log_lines([])
def test_config4a_ok(self):
# A config specifying a custom formatter class.
with captured_stdout() as output:
self.apply_config(self.config4a)
#logger = logging.getLogger()
try:
raise RuntimeError()
except RuntimeError:
logging.exception("just testing")
sys.stdout.seek(0)
self.assertEquals(output.getvalue(),
"ERROR:root:just testing\nGot a [RuntimeError]\n")
# Original logger output is empty
self.assert_log_lines([])
def test_config5_ok(self):
self.test_config1_ok(config=self.config5)
def test_config6_failure(self):
self.assertRaises(StandardError, self.apply_config, self.config6)
def test_config7_ok(self):
with captured_stdout() as output:
self.apply_config(self.config1)
logger = logging.getLogger("compiler.parser")
# Both will output a message
logger.info(self.next_message())
logger.error(self.next_message())
self.assert_log_lines([
('INFO', '1'),
('ERROR', '2'),
], stream=output)
# Original logger output is empty.
self.assert_log_lines([])
with captured_stdout() as output:
self.apply_config(self.config7)
logger = logging.getLogger("compiler.parser")
self.assertTrue(logger.disabled)
logger = logging.getLogger("compiler.lexer")
# Both will output a message
logger.info(self.next_message())
logger.error(self.next_message())
self.assert_log_lines([
('INFO', '3'),
('ERROR', '4'),
], stream=output)
# Original logger output is empty.
self.assert_log_lines([])
#Same as test_config_7_ok but don't disable old loggers.
def test_config_8_ok(self):
with captured_stdout() as output:
self.apply_config(self.config1)
logger = logging.getLogger("compiler.parser")
# Both will output a message
logger.info(self.next_message())
logger.error(self.next_message())
self.assert_log_lines([
('INFO', '1'),
('ERROR', '2'),
], stream=output)
# Original logger output is empty.
self.assert_log_lines([])
with captured_stdout() as output:
self.apply_config(self.config8)
logger = logging.getLogger("compiler.parser")
self.assertFalse(logger.disabled)
# Both will output a message
logger.info(self.next_message())
logger.error(self.next_message())
logger = logging.getLogger("compiler.lexer")
# Both will output a message
logger.info(self.next_message())
logger.error(self.next_message())
self.assert_log_lines([
('INFO', '3'),
('ERROR', '4'),
('INFO', '5'),
('ERROR', '6'),
], stream=output)
# Original logger output is empty.
self.assert_log_lines([])
def test_config_9_ok(self):
with captured_stdout() as output:
self.apply_config(self.config9)
logger = logging.getLogger("compiler.parser")
#Nothing will be output since both handler and logger are set to WARNING
logger.info(self.next_message())
self.assert_log_lines([], stream=output)
self.apply_config(self.config9a)
#Nothing will be output since both handler is still set to WARNING
logger.info(self.next_message())
self.assert_log_lines([], stream=output)
self.apply_config(self.config9b)
#Message should now be output
logger.info(self.next_message())
if sys.version_info[:2] == (2, 7):
self.assert_log_lines([
('INFO', '3'),
], stream=output)
else:
self.assert_log_lines([], stream=output)
def test_config_10_ok(self):
with captured_stdout() as output:
self.apply_config(self.config10)
logger = logging.getLogger("compiler.parser")
logger.warning(self.next_message())
logger = logging.getLogger('compiler')
#Not output, because filtered
logger.warning(self.next_message())
logger = logging.getLogger('compiler.lexer')
#Not output, because filtered
logger.warning(self.next_message())
logger = logging.getLogger("compiler.parser.codegen")
#Output, as not filtered
logger.error(self.next_message())
self.assert_log_lines([
('WARNING', '1'),
('ERROR', '4'),
], stream=output)
def test_main():
run_unittest(ConfigDictTest)
if __name__ == "__main__":
test_main()