Move files out of the namespace package

Move the public API out of oslo.config to oslo_config. Retain the ability
to import from the old namespace package for backwards compatibility
for this release cycle.

bp/drop-namespace-packages

Change-Id: I56274336802036de050efc62eb2ee6b5d4ede77b
This commit is contained in:
Doug Hellmann 2014-12-05 15:33:58 -05:00
parent 063a5ef774
commit 70c5b67df3
43 changed files with 8856 additions and 3790 deletions

1
.gitignore vendored
View File

@ -14,3 +14,4 @@ dist/
.testrepository/
.project
.pydevproject
pbr-*.egg/

View File

@ -2,4 +2,4 @@
The cfg Module
--------------
.. automodule:: oslo.config.cfg
.. automodule:: oslo_config.cfg

View File

@ -2,4 +2,4 @@
The cfgfilter Module
--------------------
.. automodule:: oslo.config.cfgfilter
.. automodule:: oslo_config.cfgfilter

View File

@ -2,7 +2,7 @@
The ConfigOpts Class
--------------------
.. currentmodule:: oslo.config.cfg
.. currentmodule:: oslo_config.cfg
.. autoclass:: ConfigOpts
:members:

View File

@ -2,7 +2,7 @@
Exceptions
----------
.. currentmodule:: oslo.config.cfg
.. currentmodule:: oslo_config.cfg
.. autoexception:: Error
.. autoexception:: ArgsAlreadyParsedError

View File

@ -2,7 +2,7 @@
Test Fixture
------------
.. currentmodule:: oslo.config.fixture
.. currentmodule:: oslo_config.fixture
.. autoclass:: Config
:members:

View File

@ -2,9 +2,9 @@
oslo-config-generator
---------------------
.. automodule:: oslo.config.generator
.. automodule:: oslo_config.generator
.. currentmodule:: oslo.config.generator
.. currentmodule:: oslo_config.generator
.. autofunction:: main
.. autofunction:: generate

View File

@ -2,7 +2,7 @@
Helper Functions
----------------
.. currentmodule:: oslo.config.cfg
.. currentmodule:: oslo_config.cfg
.. autofunction:: find_config_files
.. autofunction:: set_defaults

View File

@ -2,7 +2,7 @@
Option Definitions
------------------
.. currentmodule:: oslo.config.cfg
.. currentmodule:: oslo_config.cfg
.. autoclass:: Opt
.. autoclass:: StrOpt

View File

@ -2,10 +2,10 @@
File Parsing
------------
.. autoclass:: oslo.config.iniparser.BaseParser
.. autoclass:: oslo_config.iniparser.BaseParser
.. autoclass:: oslo.config.cfg.ConfigParser
.. autoclass:: oslo_config.cfg.ConfigParser
:members: parse
.. autoclass:: oslo.config.cfg.MultiConfigParser
.. autoclass:: oslo_config.cfg.MultiConfigParser
:members: read, get

View File

@ -2,5 +2,5 @@
Option Types and Validation
---------------------------
.. automodule:: oslo.config.types
.. automodule:: oslo_config.types
:members:

View File

@ -0,0 +1,28 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import warnings
from oslo_config import * # noqa
def deprecated():
new_name = __name__.replace('.', '_')
warnings.warn(
('The oslo namespace package is deprecated. Please use %s instead.' %
new_name),
DeprecationWarning,
stacklevel=3,
)
deprecated()

File diff suppressed because it is too large Load Diff

View File

@ -1,5 +1,3 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -12,307 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
r"""
There are two use cases for the ConfigFilter class:
1. Help enforce that a given module does not access options registered
by another module, without first declaring those cross-module
dependencies using import_opt().
2. Prevent private configuration opts from being visible to modules
other than the one which registered it.
Cross-Module Option Dependencies
--------------------------------
When using the global cfg.CONF object, it is quite common for a module
to require the existence of configuration options registered by other
modules.
For example, if module 'foo' registers the 'blaa' option and the module
'bar' uses the 'blaa' option then 'bar' might do::
import foo
print(CONF.blaa)
However, it's completely non-obvious why foo is being imported (is it
unused, can we remove the import) and where the 'blaa' option comes from.
The CONF.import_opt() method allows such a dependency to be explicitly
declared::
CONF.import_opt('blaa', 'foo')
print(CONF.blaa)
However, import_opt() has a weakness - if 'bar' imports 'foo' using the
import builtin and doesn't use import_opt() to import 'blaa', then 'blaa'
can still be used without problems. Similarly, where multiple options
are registered a module imported via importopt(), a lazy programmer can
get away with only declaring a dependency on a single option.
The ConfigFilter class provides a way to ensure that options are not
available unless they have been registered in the module or imported using
import_opt() for example with::
CONF = ConfigFilter(cfg.CONF)
CONF.import_opt('blaa', 'foo')
print(CONF.blaa)
no other options other than 'blaa' are available via CONF.
Private Configuration Options
-----------------------------
Libraries which register configuration options typically do not want
users of the library API to access those configuration options. If
API users do access private configuration options, those users will
be disrupted if and when a configuration option is renamed. In other
words, one does not typically wish for the name of the private config
options to be part of the public API.
The ConfigFilter class provides a way for a library to register
options such that they are not visible via the ConfigOpts instance
which the API user supplies to the library. For example::
from __future__ import print_function
from oslo.config.cfg import *
from oslo.config.cfgfilter import *
class Widget(object):
def __init__(self, conf):
self.conf = conf
self._private_conf = ConfigFilter(self.conf)
self._private_conf.register_opt(StrOpt('foo'))
@property
def foo(self):
return self._private_conf.foo
conf = ConfigOpts()
widget = Widget(conf)
print(widget.foo)
print(conf.foo) # raises NoSuchOptError
"""
import collections
import itertools
from oslo.config import cfg
class ConfigFilter(collections.Mapping):
"""A helper class which wraps a ConfigOpts object.
ConfigFilter enforces the explicit declaration of dependencies on external
options and allows private options which are not registered with the
wrapped Configopts object.
"""
def __init__(self, conf):
"""Construct a ConfigFilter object.
:param conf: a ConfigOpts object
"""
self._conf = conf
self._fconf = cfg.ConfigOpts()
self._sync()
self._imported_opts = set()
self._imported_groups = dict()
def _sync(self):
if self._fconf._namespace is not self._conf._namespace:
self._fconf.clear()
self._fconf._namespace = self._conf._namespace
self._fconf._args = self._conf._args
def __getattr__(self, name):
"""Look up an option value.
:param name: the opt name (or 'dest', more precisely)
:returns: the option value (after string subsititution) or a GroupAttr
:raises: NoSuchOptError,ConfigFileValueError,TemplateSubstitutionError
"""
if name in self._imported_groups:
return self._imported_groups[name]
elif name in self._imported_opts:
return getattr(self._conf, name)
else:
self._sync()
return getattr(self._fconf, name)
def __getitem__(self, key):
"""Look up an option value."""
return getattr(self, key)
def __contains__(self, key):
"""Return True if key is the name of a registered opt or group."""
return (key in self._fconf or
key in self._imported_opts or
key in self._imported_groups)
def __iter__(self):
"""Iterate over all registered opt and group names."""
return itertools.chain(self._fconf.keys(),
self._imported_opts,
self._imported_groups.keys())
def __len__(self):
"""Return the number of options and option groups."""
return (len(self._fconf) +
len(self._imported_opts) +
len(self._imported_groups))
@staticmethod
def _already_registered(conf, opt, group=None):
group_name = group.name if isinstance(group, cfg.OptGroup) else group
return ((group_name is None and
opt.dest in conf) or
(group_name is not None and
group_name in conf and
opt.dest in conf[group_name]))
def register_opt(self, opt, group=None):
"""Register an option schema.
:param opt: an instance of an Opt sub-class
:param group: an optional OptGroup object or group name
:return: False if the opt was already registered, True otherwise
:raises: DuplicateOptError
"""
if self._already_registered(self._conf, opt, group):
# Raises DuplicateError if there is another opt with the same name
ret = self._conf.register_opt(opt, group)
self._import_opt(opt.dest, group)
return ret
else:
return self._fconf.register_opt(opt, group)
def register_opts(self, opts, group=None):
"""Register multiple option schemas at once."""
for opt in opts:
self.register_opt(opt, group)
def register_cli_opt(self, opt, group=None):
"""Register a CLI option schema.
:param opt: an instance of an Opt sub-class
:param group: an optional OptGroup object or group name
:return: False if the opt was already register, True otherwise
:raises: DuplicateOptError, ArgsAlreadyParsedError
"""
if self._already_registered(self._conf, opt, group):
# Raises DuplicateError if there is another opt with the same name
ret = self._conf.register_cli_opt(opt, group)
self._import_opt(opt.dest, group)
return ret
else:
return self._fconf.register_cli_opt(opt, group)
def register_cli_opts(self, opts, group=None):
"""Register multiple CLI option schemas at once."""
for opt in opts:
self.register_cli_opt(opt, group)
def register_group(self, group):
"""Register an option group.
:param group: an OptGroup object
"""
self._fconf.register_group(group)
def import_opt(self, opt_name, module_str, group=None):
"""Import an option definition from a module.
:param name: the name/dest of the opt
:param module_str: the name of a module to import
:param group: an option OptGroup object or group name
:raises: NoSuchOptError, NoSuchGroupError
"""
self._conf.import_opt(opt_name, module_str, group)
self._import_opt(opt_name, group)
def import_group(self, group, module_str):
"""Import an option group from a module.
Note that this allows access to all options registered with
the group whether or not those options were registered by
the given module.
:param group: an option OptGroup object or group name
:param module_str: the name of a module to import
:raises: ImportError, NoSuchGroupError
"""
self._conf.import_group(group, module_str)
group = self._import_group(group)
group._all_opts = True
def _import_opt(self, opt_name, group):
if group is None:
self._imported_opts.add(opt_name)
return True
else:
group = self._import_group(group)
return group._import_opt(opt_name)
def _import_group(self, group_or_name):
if isinstance(group_or_name, cfg.OptGroup):
group_name = group_or_name.name
else:
group_name = group_or_name
if group_name in self._imported_groups:
return self._imported_groups[group_name]
else:
group = self.GroupAttr(self._conf, group_name)
self._imported_groups[group_name] = group
return group
class GroupAttr(collections.Mapping):
"""Helper class to wrap a group object.
Represents the option values of a group as a mapping and attributes.
"""
def __init__(self, conf, group):
"""Construct a GroupAttr object.
:param conf: a ConfigOpts object
:param group: an OptGroup object
"""
self._conf = conf
self._group = group
self._imported_opts = set()
self._all_opts = False
def __getattr__(self, name):
"""Look up an option value."""
if not self._all_opts and name not in self._imported_opts:
raise cfg.NoSuchOptError(name)
return getattr(self._conf[self._group], name)
def __getitem__(self, key):
"""Look up an option value."""
return getattr(self, key)
def __contains__(self, key):
"""Return True if key is the name of a registered opt or group."""
return key in self._imported_opts
def __iter__(self):
"""Iterate over all registered opt and group names."""
for key in self._imported_opts:
yield key
def __len__(self):
"""Return the number of options and option groups."""
return len(self._imported_opts)
def _import_opt(self, opt_name):
self._imported_opts.add(opt_name)
from oslo_config.cfgfilter import * # noqa

View File

@ -1,8 +1,3 @@
#
# Copyright 2013 Mirantis, Inc.
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -15,104 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import six
from oslo.config import cfg
class Config(fixtures.Fixture):
"""Allows overriding configuration settings for the test.
`conf` will be reset on cleanup.
"""
def __init__(self, conf=cfg.CONF):
self.conf = conf
def setUp(self):
super(Config, self).setUp()
# NOTE(morganfainberg): unregister must be added to cleanup before
# reset is because cleanup works in reverse order of registered items,
# and a reset must occur before unregistering options can occur.
self.addCleanup(self._unregister_config_opts)
self.addCleanup(self.conf.reset)
self._registered_config_opts = {}
def config(self, **kw):
"""Override configuration values.
The keyword arguments are the names of configuration options to
override and their values.
If a `group` argument is supplied, the overrides are applied to
the specified configuration option group, otherwise the overrides
are applied to the ``default`` group.
"""
group = kw.pop('group', None)
for k, v in six.iteritems(kw):
self.conf.set_override(k, v, group)
def _unregister_config_opts(self):
for group in self._registered_config_opts:
self.conf.unregister_opts(self._registered_config_opts[group],
group=group)
def register_opt(self, opt, group=None):
"""Register a single option for the test run.
Options registered in this manner will automatically be unregistered
during cleanup.
If a `group` argument is supplied, it will register the new option
to that group, otherwise the option is registered to the ``default``
group.
"""
self.conf.register_opt(opt, group=group)
self._registered_config_opts.setdefault(group, set()).add(opt)
def register_opts(self, opts, group=None):
"""Register multiple options for the test run.
This works in the same manner as register_opt() but takes a list of
options as the first argument. All arguments will be registered to the
same group if the ``group`` argument is supplied, otherwise all options
will be registered to the ``default`` group.
"""
for opt in opts:
self.register_opt(opt, group=group)
def register_cli_opt(self, opt, group=None):
"""Register a single CLI option for the test run.
Options registered in this manner will automatically be unregistered
during cleanup.
If a `group` argument is supplied, it will register the new option
to that group, otherwise the option is registered to the ``default``
group.
CLI options must be registered before the command line and config files
are parsed. This is to ensure that all CLI options are shown in --help
and option validation works as expected.
"""
self.conf.register_cli_opt(opt, group=group)
self._registered_config_opts.setdefault(group, set()).add(opt)
def register_cli_opts(self, opts, group=None):
"""Register multiple CLI options for the test run.
This works in the same manner as register_opt() but takes a list of
options as the first argument. All arguments will be registered to the
same group if the ``group`` argument is supplied, otherwise all options
will be registered to the ``default`` group.
CLI options must be registered before the command line and config files
are parsed. This is to ensure that all CLI options are shown in --help
and option validation works as expected.
"""
for opt in opts:
self.register_cli_opt(opt, group=group)
from oslo_config.fixture import * # noqa

View File

@ -1,8 +1,3 @@
# Copyright 2012 SINA Corporation
# Copyright 2014 Cisco Systems, Inc.
# All Rights Reserved.
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -15,298 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
r"""
A sample configuration file generator.
oslo-config-generator is a utility for generating sample config files. For
example, to generate a sample config file for oslo.messaging you would run::
$> oslo-config-generator --namespace oslo.messaging > oslo.messaging.conf
This generated sample lists all of the available options, along with their help
string, type, deprecated aliases and defaults.
The --namespace option specifies an entry point name registered under the
'oslo.config.opts' entry point namespace. For example, in oslo.messaging's
setup.cfg we have::
[entry_points]
oslo.config.opts =
oslo.messaging = oslo.messaging.opts:list_opts
The callable referenced by the entry point should take no arguments and return
a list of (group_name, [opt_1, opt_2]) tuples. For example::
opts = [
cfg.StrOpt('foo'),
cfg.StrOpt('bar'),
]
cfg.CONF.register_opts(opts, group='blaa')
def list_opts():
return [('blaa', opts)]
You might choose to return a copy of the options so that the return value can't
be modified for nefarious purposes::
def list_opts():
return [('blaa', copy.deepcopy(opts))]
A single codebase might have multiple programs, each of which use a subset of
the total set of options registered by the codebase. In that case, you can
register multiple entry points::
[entry_points]
oslo.config.opts =
nova.common = nova.config:list_common_opts
nova.api = nova.config:list_api_opts
nova.compute = nova.config:list_compute_opts
and generate a config file specific to each program::
$> oslo-config-generator --namespace oslo.messaging \
--namespace nova.common \
--namespace nova.api > nova-api.conf
$> oslo-config-generator --namespace oslo.messaging \
--namespace nova.common \
--namespace nova.compute > nova-compute.conf
To make this more convenient, you can use config files to describe your config
files::
$> cat > config-generator/api.conf <<EOF
[DEFAULT]
output_file = etc/nova/nova-api.conf
namespace = oslo.messaging
namespace = nova.common
namespace = nova.api
EOF
$> cat > config-generator/compute.conf <<EOF
[DEFAULT]
output_file = etc/nova/nova-compute.conf
namespace = oslo.messaging
namespace = nova.common
namespace = nova.compute
EOF
$> oslo-config-generator --config-file config-generator/api.conf
$> oslo-config-generator --config-file config-generator/compute.conf
The default runtime values of configuration options are not always the most
suitable values to include in sample config files - for example, rather than
including the IP address or hostname of the machine where the config file
was generated, you might want to include something like '10.0.0.1'. To
facilitate this, options can be supplied with a 'sample_default' attribute::
cfg.StrOpt('base_dir'
default=os.getcwd(),
sample_default='/usr/lib/myapp')
"""
import logging
import operator
import sys
import textwrap
from oslo.config import cfg
import stevedore.named # noqa
LOG = logging.getLogger(__name__)
_generator_opts = [
cfg.StrOpt('output-file',
help='Path of the file to write to. Defaults to stdout.'),
cfg.IntOpt('wrap-width',
default=70,
help='The maximum length of help lines.'),
cfg.MultiStrOpt('namespace',
help='Option namespace under "oslo.config.opts" in which '
'to query for options.'),
]
def register_cli_opts(conf):
"""Register the formatter's CLI options with a ConfigOpts instance.
Note, this must be done before the ConfigOpts instance is called to parse
the configuration.
:param conf: a ConfigOpts instance
:raises: DuplicateOptError, ArgsAlreadyParsedError
"""
conf.register_cli_opts(_generator_opts)
class _OptFormatter(object):
"""Format configuration option descriptions to a file."""
_TYPE_DESCRIPTIONS = {
cfg.StrOpt: 'string value',
cfg.BoolOpt: 'boolean value',
cfg.IntOpt: 'integer value',
cfg.FloatOpt: 'floating point value',
cfg.ListOpt: 'list value',
cfg.DictOpt: 'dict value',
cfg.MultiStrOpt: 'multi valued',
}
def __init__(self, output_file=None, wrap_width=70):
"""Construct an OptFormatter object.
:param output_file: a writeable file object
:param wrap_width: The maximum length of help lines, 0 to not wrap
"""
self.output_file = output_file or sys.stdout
self.wrap_width = wrap_width
def _format_help(self, help_text):
"""Format the help for a group or option to the output file.
:param help_text: The text of the help string
"""
if self.wrap_width is not None and self.wrap_width > 0:
lines = [textwrap.fill(help_text,
self.wrap_width,
initial_indent='# ',
subsequent_indent='# ') + '\n']
else:
lines = ['# ' + help_text + '\n']
return lines
def format(self, opt):
"""Format a description of an option to the output file.
:param opt: a cfg.Opt instance
"""
if not opt.help:
LOG.warning('"%s" is missing a help string', opt.dest)
opt_type = self._TYPE_DESCRIPTIONS.get(type(opt), 'unknown type')
help_text = u'%s(%s)' % (opt.help + ' ' if opt.help else '', opt_type)
lines = self._format_help(help_text)
for d in opt.deprecated_opts:
lines.append('# Deprecated group/name - [%s]/%s\n' %
(d.group or 'DEFAULT', d.name or opt.dest))
if isinstance(opt, cfg.MultiStrOpt):
if opt.sample_default is not None:
defaults = opt.sample_default
elif not opt.default:
defaults = ['']
else:
defaults = opt.default
else:
if opt.sample_default is not None:
default_str = str(opt.sample_default)
elif opt.default is None:
default_str = '<None>'
elif isinstance(opt, cfg.StrOpt):
default_str = opt.default
elif isinstance(opt, cfg.BoolOpt):
default_str = str(opt.default).lower()
elif (isinstance(opt, cfg.IntOpt) or
isinstance(opt, cfg.FloatOpt)):
default_str = str(opt.default)
elif isinstance(opt, cfg.ListOpt):
default_str = ','.join(opt.default)
elif isinstance(opt, cfg.DictOpt):
sorted_items = sorted(opt.default.items(),
key=operator.itemgetter(0))
default_str = ','.join(['%s:%s' % i for i in sorted_items])
else:
LOG.warning('Unknown option type: %s', repr(opt))
default_str = str(opt.default)
defaults = [default_str]
for default_str in defaults:
if default_str.strip() != default_str:
default_str = '"%s"' % default_str
if default_str:
default_str = ' ' + default_str
lines.append('#%s =%s\n' % (opt.dest, default_str))
self.writelines(lines)
def write(self, s):
"""Write an arbitrary string to the output file.
:param s: an arbitrary string
"""
self.output_file.write(s)
def writelines(self, l):
"""Write an arbitrary sequence of strings to the output file.
:param l: a list of arbitrary strings
"""
self.output_file.writelines(l)
def _list_opts(namespaces):
"""List the options available via the given namespaces.
:param namespaces: a list of namespaces registered under 'oslo.config.opts'
:returns: a list of (namespace, [(group, [opt_1, opt_2])]) tuples
"""
mgr = stevedore.named.NamedExtensionManager('oslo.config.opts',
names=namespaces,
invoke_on_load=True)
return [(ep.name, ep.obj) for ep in mgr]
def generate(conf):
"""Generate a sample config file.
List all of the options available via the namespaces specified in the given
configuration and write a description of them to the specified output file.
:param conf: a ConfigOpts instance containing the generator's configuration
"""
conf.register_opts(_generator_opts)
output_file = (open(conf.output_file, 'w')
if conf.output_file else sys.stdout)
formatter = _OptFormatter(output_file=output_file,
wrap_width=conf.wrap_width)
groups = {'DEFAULT': []}
for namespace, listing in _list_opts(conf.namespace):
for group, opts in listing:
if not opts:
continue
namespaces = groups.setdefault(group or 'DEFAULT', [])
namespaces.append((namespace,
dict((opt.dest, opt) for opt in opts)))
def _output_opts(f, group, namespaces):
f.write('[%s]\n' % group)
for (namespace, opts_by_dest) in sorted(namespaces,
key=operator.itemgetter(0)):
f.write('\n#\n# From %s\n#\n' % namespace)
for opt in sorted(opts_by_dest.values(),
key=operator.attrgetter('dest')):
f.write('\n')
f.format(opt)
_output_opts(formatter, 'DEFAULT', groups.pop('DEFAULT'))
for group, namespaces in sorted(groups.items(),
key=operator.itemgetter(0)):
formatter.write('\n\n')
_output_opts(formatter, group, namespaces)
def main(args=None):
"""The main function of oslo-config-generator."""
logging.basicConfig(level=logging.WARN)
conf = cfg.ConfigOpts()
register_cli_opts(conf)
conf(args)
generate(conf)
if __name__ == '__main__':
main()
from oslo_config.generator import * # noqa

View File

@ -1,5 +1,3 @@
# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -12,116 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
class ParseError(Exception):
def __init__(self, message, lineno, line):
self.msg = message
self.line = line
self.lineno = lineno
def __str__(self):
return 'at line %d, %s: %r' % (self.lineno, self.msg, self.line)
class BaseParser(object):
lineno = 0
parse_exc = ParseError
def _assignment(self, key, value):
self.assignment(key, value)
return None, []
def _get_section(self, line):
if not line.endswith(']'):
return self.error_no_section_end_bracket(line)
if len(line) <= 2:
return self.error_no_section_name(line)
return line[1:-1]
def _split_key_value(self, line):
colon = line.find(':')
equal = line.find('=')
if colon < 0 and equal < 0:
return self.error_invalid_assignment(line)
if colon < 0 or (equal >= 0 and equal < colon):
key, value = line[:equal], line[equal + 1:]
else:
key, value = line[:colon], line[colon + 1:]
value = value.strip()
if value and value[0] == value[-1] and value.startswith(("\"", "'")):
value = value[1:-1]
return key.strip(), [value]
def parse(self, lineiter):
key = None
value = []
for line in lineiter:
self.lineno += 1
line = line.rstrip()
if not line:
# Blank line, ends multi-line values
if key:
key, value = self._assignment(key, value)
continue
elif line.startswith((' ', '\t')):
# Continuation of previous assignment
if key is None:
self.error_unexpected_continuation(line)
else:
value.append(line.lstrip())
continue
if key:
# Flush previous assignment, if any
key, value = self._assignment(key, value)
if line.startswith('['):
# Section start
section = self._get_section(line)
if section:
self.new_section(section)
elif line.startswith(('#', ';')):
self.comment(line[1:].lstrip())
else:
key, value = self._split_key_value(line)
if not key:
return self.error_empty_key(line)
if key:
# Flush previous assignment, if any
self._assignment(key, value)
def assignment(self, key, value):
"""Called when a full assignment is parsed."""
raise NotImplementedError()
def new_section(self, section):
"""Called when a new section is started."""
raise NotImplementedError()
def comment(self, comment):
"""Called when a comment is parsed."""
pass
def error_invalid_assignment(self, line):
raise self.parse_exc("No ':' or '=' found in assignment",
self.lineno, line)
def error_empty_key(self, line):
raise self.parse_exc('Key cannot be empty', self.lineno, line)
def error_unexpected_continuation(self, line):
raise self.parse_exc('Unexpected continuation line',
self.lineno, line)
def error_no_section_end_bracket(self, line):
raise self.parse_exc('Invalid section (must end with ])',
self.lineno, line)
def error_no_section_name(self, line):
raise self.parse_exc('Empty section name', self.lineno, line)
from oslo_config.iniparser import * # noqa

View File

@ -1,5 +1,3 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
@ -12,402 +10,4 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Type conversion and validation classes for configuration options.
Use these classes as values for the `type` argument to
:class:`oslo.config.cfg.Opt` and its subclasses.
"""
import netaddr
import six
class ConfigType(object):
BASE_TYPES = (None,)
def is_base_type(self, other):
return isinstance(other, self.BASE_TYPES)
class String(ConfigType):
"""String type.
String values do not get transformed and are returned as str objects.
:param choices: Optional sequence of valid values.
:param quotes: If True and string is enclosed with single or double
quotes, will strip those quotes. Will signal error if
string have quote at the beginning and no quote at
the end. Turned off by default. Useful if used with
container types like List.
"""
BASE_TYPES = six.string_types
def __init__(self, choices=None, quotes=False):
super(String, self).__init__()
self.choices = choices
self.quotes = quotes
def __call__(self, value):
value = str(value)
if self.quotes and value:
if value[0] in "\"'":
if value[-1] != value[0]:
raise ValueError('Non-closed quote: %s' % value)
value = value[1:-1]
if self.choices is None or value in self.choices:
return value
raise ValueError(
'Valid values are [%s], but found %s' % (
', '.join([str(v) for v in self.choices]),
repr(value)))
def __repr__(self):
if self.choices:
return 'String(choices=%s)' % repr(self.choices)
return 'String'
def __eq__(self, other):
return (
(self.__class__ == other.__class__) and
(self.choices == other.choices) and
(self.quotes == other.quotes)
)
class MultiString(String):
BASE_TYPES = six.string_types + (list,)
class Boolean(ConfigType):
"""Boolean type.
Values are case insensitive and can be set using
1/0, yes/no, true/false or on/off.
"""
TRUE_VALUES = ['true', '1', 'on', 'yes']
FALSE_VALUES = ['false', '0', 'off', 'no']
BASE_TYPES = (bool,)
def __call__(self, value):
if isinstance(value, bool):
return value
s = value.lower()
if s in self.TRUE_VALUES:
return True
elif s in self.FALSE_VALUES:
return False
else:
raise ValueError('Unexpected boolean value %r' % value)
def __repr__(self):
return 'Boolean'
def __eq__(self, other):
return self.__class__ == other.__class__
class Integer(ConfigType):
"""Integer type.
Converts value to an integer optionally doing range checking.
If value is whitespace or empty string will return None.
:param min: Optional check that value is greater than or equal to min
:param max: Optional check that value is less than or equal to max
"""
BASE_TYPES = six.integer_types
def __init__(self, min=None, max=None):
super(Integer, self).__init__()
self.min = min
self.max = max
if min and max and max < min:
raise ValueError('Max value is less than min value')
def __call__(self, value):
if not isinstance(value, int):
s = str(value).strip()
if s == '':
value = None
else:
value = int(value)
if value:
self._check_range(value)
return value
def _check_range(self, value):
if self.min and value < self.min:
raise ValueError('Should be greater than or equal to %d' %
self.min)
if self.max and value > self.max:
raise ValueError('Should be less than or equal to %d' % self.max)
def __repr__(self):
props = []
if self.min:
props.append('min=%d' % self.min)
if self.max:
props.append('max=%d' % self.max)
if props:
return 'Integer(%s)' % ', '.join(props)
return 'Integer'
def __eq__(self, other):
return (
(self.__class__ == other.__class__) and
(self.min == other.min) and
(self.max == other.max)
)
class Float(ConfigType):
"""Float type."""
# allow float to be set from int
BASE_TYPES = six.integer_types + (float,)
def __call__(self, value):
if isinstance(value, float):
return value
return float(value)
def __repr__(self):
return 'Float'
def __eq__(self, other):
return self.__class__ == other.__class__
class List(ConfigType):
"""List type.
Represent values of other (item) type, separated by commas.
The resulting value is a list containing those values.
List doesn't know if item type can also contain commas. To workaround this
it tries the following: if the next part fails item validation, it appends
comma and next item until validation succeeds or there is no parts left.
In the later case it will signal validation error.
:param item_type: type of list items
:param bounds: if True, value should be inside "[" and "]" pair
"""
BASE_TYPES = (list,)
def __init__(self, item_type=None, bounds=False):
super(List, self).__init__()
if item_type is None:
item_type = String()
if not callable(item_type):
raise TypeError('item_type must be callable')
self.item_type = item_type
self.bounds = bounds
def __call__(self, value):
if isinstance(value, list):
return value
result = []
s = value.strip()
if self.bounds:
if not s.startswith('['):
raise ValueError('Value should start with "["')
if not s.endswith(']'):
raise ValueError('Value should end with "]"')
s = s[1:-1]
if s == '':
return result
values = s.split(',')
while values:
value = values.pop(0)
while True:
first_error = None
try:
validated_value = self.item_type(value.strip())
break
except ValueError as e:
if not first_error:
first_error = e
if len(values) == 0:
raise first_error
value += ',' + values.pop(0)
result.append(validated_value)
return result
def __repr__(self):
return 'List of %s' % repr(self.item_type)
def __eq__(self, other):
return (
(self.__class__ == other.__class__) and
(self.item_type == other.item_type)
)
class Dict(ConfigType):
"""Dictionary type.
Dictionary type values are key:value pairs separated by commas.
The resulting value is a dictionary of these key/value pairs.
Type of dictionary key is always string, but dictionary value
type can be customized.
:param value_type: type of values in dictionary
:param bounds: if True, value should be inside "{" and "}" pair
"""
BASE_TYPES = (dict,)
def __init__(self, value_type=None, bounds=False):
super(Dict, self).__init__()
if value_type is None:
value_type = String()
if not callable(value_type):
raise TypeError('value_type must be callable')
self.value_type = value_type
self.bounds = bounds
def __call__(self, value):
if isinstance(value, dict):
return value
result = {}
s = value.strip()
if self.bounds:
if not s.startswith('{'):
raise ValueError('Value should start with "{"')
if not s.endswith('}'):
raise ValueError('Value should end with "}"')
s = s[1:-1]
if s == '':
return result
pairs = s.split(',')
while pairs:
pair = pairs.pop(0)
while True:
first_error = None
try:
key_value = pair.split(':', 1)
if len(key_value) < 2:
raise ValueError('Value should be NAME:VALUE pairs '
'separated by ","')
key, value = key_value
key = key.strip()
value = value.strip()
value = self.value_type(value)
break
except ValueError as e:
if not first_error:
first_error = e
if not pairs:
raise first_error
pair += ',' + pairs.pop(0)
if key == '':
raise ValueError('Key name should not be empty')
if key in result:
raise ValueError('Duplicate key %s' % key)
result[key] = value
return result
def __repr__(self):
return 'Dict of %s' % repr(self.value_type)
def __eq__(self, other):
return (
(self.__class__ == other.__class__) and
(self.value_type == other.value_type)
)
class IPAddress(ConfigType):
"""IP address type
Represents either ipv4 or ipv6. Without specifying version parameter both
versions are checked
:param version: defines which version should be explicitly checked (4 or 6)
"""
BASE_TYPES = six.string_types
def __init__(self, version=None):
super(IPAddress, self).__init__()
version_checkers = {
None: self._check_both_versions,
4: self._check_ipv4,
6: self._check_ipv6
}
self.version_checker = version_checkers.get(version)
if self.version_checker is None:
raise TypeError("%s is not a valid IP version." % version)
def __call__(self, value):
value = str(value)
if not value:
raise ValueError("IP address cannot be an empty string")
self.version_checker(value)
return value
def __repr__(self):
return "IPAddress"
def __eq__(self, other):
return self.__class__ == other.__class__
def _check_ipv4(self, address):
if not netaddr.valid_ipv4(address, netaddr.core.INET_PTON):
raise ValueError("%s is not an IPv4 address" % address)
def _check_ipv6(self, address):
if not netaddr.valid_ipv6(address, netaddr.core.INET_PTON):
raise ValueError("%s is not an IPv6 address" % address)
def _check_both_versions(self, address):
if not (netaddr.valid_ipv4(address, netaddr.core.INET_PTON) or
netaddr.valid_ipv6(address, netaddr.core.INET_PTON)):
raise ValueError("%s is not IPv4 or IPv6 address" % address)
from oslo_config.types import * # noqa

0
oslo_config/__init__.py Normal file
View File

2471
oslo_config/cfg.py Normal file

File diff suppressed because it is too large Load Diff

318
oslo_config/cfgfilter.py Normal file
View File

@ -0,0 +1,318 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
r"""
There are two use cases for the ConfigFilter class:
1. Help enforce that a given module does not access options registered
by another module, without first declaring those cross-module
dependencies using import_opt().
2. Prevent private configuration opts from being visible to modules
other than the one which registered it.
Cross-Module Option Dependencies
--------------------------------
When using the global cfg.CONF object, it is quite common for a module
to require the existence of configuration options registered by other
modules.
For example, if module 'foo' registers the 'blaa' option and the module
'bar' uses the 'blaa' option then 'bar' might do::
import foo
print(CONF.blaa)
However, it's completely non-obvious why foo is being imported (is it
unused, can we remove the import) and where the 'blaa' option comes from.
The CONF.import_opt() method allows such a dependency to be explicitly
declared::
CONF.import_opt('blaa', 'foo')
print(CONF.blaa)
However, import_opt() has a weakness - if 'bar' imports 'foo' using the
import builtin and doesn't use import_opt() to import 'blaa', then 'blaa'
can still be used without problems. Similarly, where multiple options
are registered a module imported via importopt(), a lazy programmer can
get away with only declaring a dependency on a single option.
The ConfigFilter class provides a way to ensure that options are not
available unless they have been registered in the module or imported using
import_opt() for example with::
CONF = ConfigFilter(cfg.CONF)
CONF.import_opt('blaa', 'foo')
print(CONF.blaa)
no other options other than 'blaa' are available via CONF.
Private Configuration Options
-----------------------------
Libraries which register configuration options typically do not want
users of the library API to access those configuration options. If
API users do access private configuration options, those users will
be disrupted if and when a configuration option is renamed. In other
words, one does not typically wish for the name of the private config
options to be part of the public API.
The ConfigFilter class provides a way for a library to register
options such that they are not visible via the ConfigOpts instance
which the API user supplies to the library. For example::
from __future__ import print_function
from oslo.config.cfg import *
from oslo.config.cfgfilter import *
class Widget(object):
def __init__(self, conf):
self.conf = conf
self._private_conf = ConfigFilter(self.conf)
self._private_conf.register_opt(StrOpt('foo'))
@property
def foo(self):
return self._private_conf.foo
conf = ConfigOpts()
widget = Widget(conf)
print(widget.foo)
print(conf.foo) # raises NoSuchOptError
"""
import collections
import itertools
from oslo.config import cfg
class ConfigFilter(collections.Mapping):
"""A helper class which wraps a ConfigOpts object.
ConfigFilter enforces the explicit declaration of dependencies on external
options and allows private options which are not registered with the
wrapped Configopts object.
"""
def __init__(self, conf):
"""Construct a ConfigFilter object.
:param conf: a ConfigOpts object
"""
self._conf = conf
self._fconf = cfg.ConfigOpts()
self._sync()
self._imported_opts = set()
self._imported_groups = dict()
def _sync(self):
if self._fconf._namespace is not self._conf._namespace:
self._fconf.clear()
self._fconf._namespace = self._conf._namespace
self._fconf._args = self._conf._args
def __getattr__(self, name):
"""Look up an option value.
:param name: the opt name (or 'dest', more precisely)
:returns: the option value (after string subsititution) or a GroupAttr
:raises: NoSuchOptError,ConfigFileValueError,TemplateSubstitutionError
"""
if name in self._imported_groups:
return self._imported_groups[name]
elif name in self._imported_opts:
return getattr(self._conf, name)
else:
self._sync()
return getattr(self._fconf, name)
def __getitem__(self, key):
"""Look up an option value."""
return getattr(self, key)
def __contains__(self, key):
"""Return True if key is the name of a registered opt or group."""
return (key in self._fconf or
key in self._imported_opts or
key in self._imported_groups)
def __iter__(self):
"""Iterate over all registered opt and group names."""
return itertools.chain(self._fconf.keys(),
self._imported_opts,
self._imported_groups.keys())
def __len__(self):
"""Return the number of options and option groups."""
return (len(self._fconf) +
len(self._imported_opts) +
len(self._imported_groups))
@staticmethod
def _already_registered(conf, opt, group=None):
group_name = group.name if isinstance(group, cfg.OptGroup) else group
return ((group_name is None and
opt.dest in conf) or
(group_name is not None and
group_name in conf and
opt.dest in conf[group_name]))
def register_opt(self, opt, group=None):
"""Register an option schema.
:param opt: an instance of an Opt sub-class
:param group: an optional OptGroup object or group name
:return: False if the opt was already registered, True otherwise
:raises: DuplicateOptError
"""
if self._already_registered(self._conf, opt, group):
# Raises DuplicateError if there is another opt with the same name
ret = self._conf.register_opt(opt, group)
self._import_opt(opt.dest, group)
return ret
else:
return self._fconf.register_opt(opt, group)
def register_opts(self, opts, group=None):
"""Register multiple option schemas at once."""
for opt in opts:
self.register_opt(opt, group)
def register_cli_opt(self, opt, group=None):
"""Register a CLI option schema.
:param opt: an instance of an Opt sub-class
:param group: an optional OptGroup object or group name
:return: False if the opt was already register, True otherwise
:raises: DuplicateOptError, ArgsAlreadyParsedError
"""
if self._already_registered(self._conf, opt, group):
# Raises DuplicateError if there is another opt with the same name
ret = self._conf.register_cli_opt(opt, group)
self._import_opt(opt.dest, group)
return ret
else:
return self._fconf.register_cli_opt(opt, group)
def register_cli_opts(self, opts, group=None):
"""Register multiple CLI option schemas at once."""
for opt in opts:
self.register_cli_opt(opt, group)
def register_group(self, group):
"""Register an option group.
:param group: an OptGroup object
"""
self._fconf.register_group(group)
def import_opt(self, opt_name, module_str, group=None):
"""Import an option definition from a module.
:param name: the name/dest of the opt
:param module_str: the name of a module to import
:param group: an option OptGroup object or group name
:raises: NoSuchOptError, NoSuchGroupError
"""
self._conf.import_opt(opt_name, module_str, group)
self._import_opt(opt_name, group)
def import_group(self, group, module_str):
"""Import an option group from a module.
Note that this allows access to all options registered with
the group whether or not those options were registered by
the given module.
:param group: an option OptGroup object or group name
:param module_str: the name of a module to import
:raises: ImportError, NoSuchGroupError
"""
self._conf.import_group(group, module_str)
group = self._import_group(group)
group._all_opts = True
def _import_opt(self, opt_name, group):
if group is None:
self._imported_opts.add(opt_name)
return True
else:
group = self._import_group(group)
return group._import_opt(opt_name)
def _import_group(self, group_or_name):
if isinstance(group_or_name, cfg.OptGroup):
group_name = group_or_name.name
else:
group_name = group_or_name
if group_name in self._imported_groups:
return self._imported_groups[group_name]
else:
group = self.GroupAttr(self._conf, group_name)
self._imported_groups[group_name] = group
return group
class GroupAttr(collections.Mapping):
"""Helper class to wrap a group object.
Represents the option values of a group as a mapping and attributes.
"""
def __init__(self, conf, group):
"""Construct a GroupAttr object.
:param conf: a ConfigOpts object
:param group: an OptGroup object
"""
self._conf = conf
self._group = group
self._imported_opts = set()
self._all_opts = False
def __getattr__(self, name):
"""Look up an option value."""
if not self._all_opts and name not in self._imported_opts:
raise cfg.NoSuchOptError(name)
return getattr(self._conf[self._group], name)
def __getitem__(self, key):
"""Look up an option value."""
return getattr(self, key)
def __contains__(self, key):
"""Return True if key is the name of a registered opt or group."""
return key in self._imported_opts
def __iter__(self):
"""Iterate over all registered opt and group names."""
for key in self._imported_opts:
yield key
def __len__(self):
"""Return the number of options and option groups."""
return len(self._imported_opts)
def _import_opt(self, opt_name):
self._imported_opts.add(opt_name)

118
oslo_config/fixture.py Normal file
View File

@ -0,0 +1,118 @@
#
# Copyright 2013 Mirantis, Inc.
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import fixtures
import six
from oslo.config import cfg
class Config(fixtures.Fixture):
"""Allows overriding configuration settings for the test.
`conf` will be reset on cleanup.
"""
def __init__(self, conf=cfg.CONF):
self.conf = conf
def setUp(self):
super(Config, self).setUp()
# NOTE(morganfainberg): unregister must be added to cleanup before
# reset is because cleanup works in reverse order of registered items,
# and a reset must occur before unregistering options can occur.
self.addCleanup(self._unregister_config_opts)
self.addCleanup(self.conf.reset)
self._registered_config_opts = {}
def config(self, **kw):
"""Override configuration values.
The keyword arguments are the names of configuration options to
override and their values.
If a `group` argument is supplied, the overrides are applied to
the specified configuration option group, otherwise the overrides
are applied to the ``default`` group.
"""
group = kw.pop('group', None)
for k, v in six.iteritems(kw):
self.conf.set_override(k, v, group)
def _unregister_config_opts(self):
for group in self._registered_config_opts:
self.conf.unregister_opts(self._registered_config_opts[group],
group=group)
def register_opt(self, opt, group=None):
"""Register a single option for the test run.
Options registered in this manner will automatically be unregistered
during cleanup.
If a `group` argument is supplied, it will register the new option
to that group, otherwise the option is registered to the ``default``
group.
"""
self.conf.register_opt(opt, group=group)
self._registered_config_opts.setdefault(group, set()).add(opt)
def register_opts(self, opts, group=None):
"""Register multiple options for the test run.
This works in the same manner as register_opt() but takes a list of
options as the first argument. All arguments will be registered to the
same group if the ``group`` argument is supplied, otherwise all options
will be registered to the ``default`` group.
"""
for opt in opts:
self.register_opt(opt, group=group)
def register_cli_opt(self, opt, group=None):
"""Register a single CLI option for the test run.
Options registered in this manner will automatically be unregistered
during cleanup.
If a `group` argument is supplied, it will register the new option
to that group, otherwise the option is registered to the ``default``
group.
CLI options must be registered before the command line and config files
are parsed. This is to ensure that all CLI options are shown in --help
and option validation works as expected.
"""
self.conf.register_cli_opt(opt, group=group)
self._registered_config_opts.setdefault(group, set()).add(opt)
def register_cli_opts(self, opts, group=None):
"""Register multiple CLI options for the test run.
This works in the same manner as register_opt() but takes a list of
options as the first argument. All arguments will be registered to the
same group if the ``group`` argument is supplied, otherwise all options
will be registered to the ``default`` group.
CLI options must be registered before the command line and config files
are parsed. This is to ensure that all CLI options are shown in --help
and option validation works as expected.
"""
for opt in opts:
self.register_cli_opt(opt, group=group)

312
oslo_config/generator.py Normal file
View File

@ -0,0 +1,312 @@
# Copyright 2012 SINA Corporation
# Copyright 2014 Cisco Systems, Inc.
# All Rights Reserved.
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
r"""
A sample configuration file generator.
oslo-config-generator is a utility for generating sample config files. For
example, to generate a sample config file for oslo.messaging you would run::
$> oslo-config-generator --namespace oslo.messaging > oslo.messaging.conf
This generated sample lists all of the available options, along with their help
string, type, deprecated aliases and defaults.
The --namespace option specifies an entry point name registered under the
'oslo.config.opts' entry point namespace. For example, in oslo.messaging's
setup.cfg we have::
[entry_points]
oslo.config.opts =
oslo.messaging = oslo.messaging.opts:list_opts
The callable referenced by the entry point should take no arguments and return
a list of (group_name, [opt_1, opt_2]) tuples. For example::
opts = [
cfg.StrOpt('foo'),
cfg.StrOpt('bar'),
]
cfg.CONF.register_opts(opts, group='blaa')
def list_opts():
return [('blaa', opts)]
You might choose to return a copy of the options so that the return value can't
be modified for nefarious purposes::
def list_opts():
return [('blaa', copy.deepcopy(opts))]
A single codebase might have multiple programs, each of which use a subset of
the total set of options registered by the codebase. In that case, you can
register multiple entry points::
[entry_points]
oslo.config.opts =
nova.common = nova.config:list_common_opts
nova.api = nova.config:list_api_opts
nova.compute = nova.config:list_compute_opts
and generate a config file specific to each program::
$> oslo-config-generator --namespace oslo.messaging \
--namespace nova.common \
--namespace nova.api > nova-api.conf
$> oslo-config-generator --namespace oslo.messaging \
--namespace nova.common \
--namespace nova.compute > nova-compute.conf
To make this more convenient, you can use config files to describe your config
files::
$> cat > config-generator/api.conf <<EOF
[DEFAULT]
output_file = etc/nova/nova-api.conf
namespace = oslo.messaging
namespace = nova.common
namespace = nova.api
EOF
$> cat > config-generator/compute.conf <<EOF
[DEFAULT]
output_file = etc/nova/nova-compute.conf
namespace = oslo.messaging
namespace = nova.common
namespace = nova.compute
EOF
$> oslo-config-generator --config-file config-generator/api.conf
$> oslo-config-generator --config-file config-generator/compute.conf
The default runtime values of configuration options are not always the most
suitable values to include in sample config files - for example, rather than
including the IP address or hostname of the machine where the config file
was generated, you might want to include something like '10.0.0.1'. To
facilitate this, options can be supplied with a 'sample_default' attribute::
cfg.StrOpt('base_dir'
default=os.getcwd(),
sample_default='/usr/lib/myapp')
"""
import logging
import operator
import sys
import textwrap
from oslo.config import cfg
import stevedore.named # noqa
LOG = logging.getLogger(__name__)
_generator_opts = [
cfg.StrOpt('output-file',
help='Path of the file to write to. Defaults to stdout.'),
cfg.IntOpt('wrap-width',
default=70,
help='The maximum length of help lines.'),
cfg.MultiStrOpt('namespace',
help='Option namespace under "oslo.config.opts" in which '
'to query for options.'),
]
def register_cli_opts(conf):
"""Register the formatter's CLI options with a ConfigOpts instance.
Note, this must be done before the ConfigOpts instance is called to parse
the configuration.
:param conf: a ConfigOpts instance
:raises: DuplicateOptError, ArgsAlreadyParsedError
"""
conf.register_cli_opts(_generator_opts)
class _OptFormatter(object):
"""Format configuration option descriptions to a file."""
_TYPE_DESCRIPTIONS = {
cfg.StrOpt: 'string value',
cfg.BoolOpt: 'boolean value',
cfg.IntOpt: 'integer value',
cfg.FloatOpt: 'floating point value',
cfg.ListOpt: 'list value',
cfg.DictOpt: 'dict value',
cfg.MultiStrOpt: 'multi valued',
}
def __init__(self, output_file=None, wrap_width=70):
"""Construct an OptFormatter object.
:param output_file: a writeable file object
:param wrap_width: The maximum length of help lines, 0 to not wrap
"""
self.output_file = output_file or sys.stdout
self.wrap_width = wrap_width
def _format_help(self, help_text):
"""Format the help for a group or option to the output file.
:param help_text: The text of the help string
"""
if self.wrap_width is not None and self.wrap_width > 0:
lines = [textwrap.fill(help_text,
self.wrap_width,
initial_indent='# ',
subsequent_indent='# ') + '\n']
else:
lines = ['# ' + help_text + '\n']
return lines
def format(self, opt):
"""Format a description of an option to the output file.
:param opt: a cfg.Opt instance
"""
if not opt.help:
LOG.warning('"%s" is missing a help string', opt.dest)
opt_type = self._TYPE_DESCRIPTIONS.get(type(opt), 'unknown type')
help_text = u'%s(%s)' % (opt.help + ' ' if opt.help else '', opt_type)
lines = self._format_help(help_text)
for d in opt.deprecated_opts:
lines.append('# Deprecated group/name - [%s]/%s\n' %
(d.group or 'DEFAULT', d.name or opt.dest))
if isinstance(opt, cfg.MultiStrOpt):
if opt.sample_default is not None:
defaults = opt.sample_default
elif not opt.default:
defaults = ['']
else:
defaults = opt.default
else:
if opt.sample_default is not None:
default_str = str(opt.sample_default)
elif opt.default is None:
default_str = '<None>'
elif isinstance(opt, cfg.StrOpt):
default_str = opt.default
elif isinstance(opt, cfg.BoolOpt):
default_str = str(opt.default).lower()
elif (isinstance(opt, cfg.IntOpt) or
isinstance(opt, cfg.FloatOpt)):
default_str = str(opt.default)
elif isinstance(opt, cfg.ListOpt):
default_str = ','.join(opt.default)
elif isinstance(opt, cfg.DictOpt):
sorted_items = sorted(opt.default.items(),
key=operator.itemgetter(0))
default_str = ','.join(['%s:%s' % i for i in sorted_items])
else:
LOG.warning('Unknown option type: %s', repr(opt))
default_str = str(opt.default)
defaults = [default_str]
for default_str in defaults:
if default_str.strip() != default_str:
default_str = '"%s"' % default_str
if default_str:
default_str = ' ' + default_str
lines.append('#%s =%s\n' % (opt.dest, default_str))
self.writelines(lines)
def write(self, s):
"""Write an arbitrary string to the output file.
:param s: an arbitrary string
"""
self.output_file.write(s)
def writelines(self, l):
"""Write an arbitrary sequence of strings to the output file.
:param l: a list of arbitrary strings
"""
self.output_file.writelines(l)
def _list_opts(namespaces):
"""List the options available via the given namespaces.
:param namespaces: a list of namespaces registered under 'oslo.config.opts'
:returns: a list of (namespace, [(group, [opt_1, opt_2])]) tuples
"""
mgr = stevedore.named.NamedExtensionManager('oslo.config.opts',
names=namespaces,
invoke_on_load=True)
return [(ep.name, ep.obj) for ep in mgr]
def generate(conf):
"""Generate a sample config file.
List all of the options available via the namespaces specified in the given
configuration and write a description of them to the specified output file.
:param conf: a ConfigOpts instance containing the generator's configuration
"""
conf.register_opts(_generator_opts)
output_file = (open(conf.output_file, 'w')
if conf.output_file else sys.stdout)
formatter = _OptFormatter(output_file=output_file,
wrap_width=conf.wrap_width)
groups = {'DEFAULT': []}
for namespace, listing in _list_opts(conf.namespace):
for group, opts in listing:
if not opts:
continue
namespaces = groups.setdefault(group or 'DEFAULT', [])
namespaces.append((namespace,
dict((opt.dest, opt) for opt in opts)))
def _output_opts(f, group, namespaces):
f.write('[%s]\n' % group)
for (namespace, opts_by_dest) in sorted(namespaces,
key=operator.itemgetter(0)):
f.write('\n#\n# From %s\n#\n' % namespace)
for opt in sorted(opts_by_dest.values(),
key=operator.attrgetter('dest')):
f.write('\n')
f.format(opt)
_output_opts(formatter, 'DEFAULT', groups.pop('DEFAULT'))
for group, namespaces in sorted(groups.items(),
key=operator.itemgetter(0)):
formatter.write('\n\n')
_output_opts(formatter, group, namespaces)
def main(args=None):
"""The main function of oslo-config-generator."""
logging.basicConfig(level=logging.WARN)
conf = cfg.ConfigOpts()
register_cli_opts(conf)
conf(args)
generate(conf)
if __name__ == '__main__':
main()

127
oslo_config/iniparser.py Normal file
View File

@ -0,0 +1,127 @@
# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class ParseError(Exception):
def __init__(self, message, lineno, line):
self.msg = message
self.line = line
self.lineno = lineno
def __str__(self):
return 'at line %d, %s: %r' % (self.lineno, self.msg, self.line)
class BaseParser(object):
lineno = 0
parse_exc = ParseError
def _assignment(self, key, value):
self.assignment(key, value)
return None, []
def _get_section(self, line):
if not line.endswith(']'):
return self.error_no_section_end_bracket(line)
if len(line) <= 2:
return self.error_no_section_name(line)
return line[1:-1]
def _split_key_value(self, line):
colon = line.find(':')
equal = line.find('=')
if colon < 0 and equal < 0:
return self.error_invalid_assignment(line)
if colon < 0 or (equal >= 0 and equal < colon):
key, value = line[:equal], line[equal + 1:]
else:
key, value = line[:colon], line[colon + 1:]
value = value.strip()
if value and value[0] == value[-1] and value.startswith(("\"", "'")):
value = value[1:-1]
return key.strip(), [value]
def parse(self, lineiter):
key = None
value = []
for line in lineiter:
self.lineno += 1
line = line.rstrip()
if not line:
# Blank line, ends multi-line values
if key:
key, value = self._assignment(key, value)
continue
elif line.startswith((' ', '\t')):
# Continuation of previous assignment
if key is None:
self.error_unexpected_continuation(line)
else:
value.append(line.lstrip())
continue
if key:
# Flush previous assignment, if any
key, value = self._assignment(key, value)
if line.startswith('['):
# Section start
section = self._get_section(line)
if section:
self.new_section(section)
elif line.startswith(('#', ';')):
self.comment(line[1:].lstrip())
else:
key, value = self._split_key_value(line)
if not key:
return self.error_empty_key(line)
if key:
# Flush previous assignment, if any
self._assignment(key, value)
def assignment(self, key, value):
"""Called when a full assignment is parsed."""
raise NotImplementedError()
def new_section(self, section):
"""Called when a new section is started."""
raise NotImplementedError()
def comment(self, comment):
"""Called when a comment is parsed."""
pass
def error_invalid_assignment(self, line):
raise self.parse_exc("No ':' or '=' found in assignment",
self.lineno, line)
def error_empty_key(self, line):
raise self.parse_exc('Key cannot be empty', self.lineno, line)
def error_unexpected_continuation(self, line):
raise self.parse_exc('Unexpected continuation line',
self.lineno, line)
def error_no_section_end_bracket(self, line):
raise self.parse_exc('Invalid section (must end with ])',
self.lineno, line)
def error_no_section_name(self, line):
raise self.parse_exc('Empty section name', self.lineno, line)

View File

File diff suppressed because it is too large Load Diff

View File

@ -0,0 +1,280 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import base as test_base
from oslo_config import cfg
from oslo_config import cfgfilter
class BaseTestCase(test_base.BaseTestCase):
def setUp(self, conf=None):
super(BaseTestCase, self).setUp()
if conf is None:
self.conf = cfg.ConfigOpts()
else:
self.conf = conf
self.fconf = cfgfilter.ConfigFilter(self.conf)
class RegisterTestCase(BaseTestCase):
def test_register_opt_default(self):
self.fconf.register_opt(cfg.StrOpt('foo', default='bar'))
self.assertEqual('bar', self.fconf.foo)
self.assertEqual('bar', self.fconf['foo'])
self.assertIn('foo', self.fconf)
self.assertEqual(['foo'], list(self.fconf))
self.assertEqual(1, len(self.fconf))
self.assertNotIn('foo', self.conf)
self.assertEqual(0, len(self.conf))
self.assertRaises(cfg.NoSuchOptError, getattr, self.conf, 'foo')
def test_register_opt_none_default(self):
self.fconf.register_opt(cfg.StrOpt('foo'))
self.assertIsNone(self.fconf.foo)
self.assertIsNone(self.fconf['foo'])
self.assertIn('foo', self.fconf)
self.assertEqual(['foo'], list(self.fconf))
self.assertEqual(1, len(self.fconf))
self.assertNotIn('foo', self.conf)
self.assertEqual(0, len(self.conf))
self.assertRaises(cfg.NoSuchOptError, getattr, self.conf, 'foo')
def test_register_grouped_opt_default(self):
self.fconf.register_opt(cfg.StrOpt('foo', default='bar'),
group='blaa')
self.assertEqual('bar', self.fconf.blaa.foo)
self.assertEqual('bar', self.fconf['blaa']['foo'])
self.assertIn('blaa', self.fconf)
self.assertIn('foo', self.fconf.blaa)
self.assertEqual(['blaa'], list(self.fconf))
self.assertEqual(['foo'], list(self.fconf.blaa))
self.assertEqual(1, len(self.fconf))
self.assertEqual(1, len(self.fconf.blaa))
self.assertNotIn('blaa', self.conf)
self.assertEqual(0, len(self.conf))
self.assertRaises(cfg.NoSuchOptError, getattr, self.conf, 'blaa')
def test_register_grouped_opt_none_default(self):
self.fconf.register_opt(cfg.StrOpt('foo'), group='blaa')
self.assertIsNone(self.fconf.blaa.foo)
self.assertIsNone(self.fconf['blaa']['foo'])
self.assertIn('blaa', self.fconf)
self.assertIn('foo', self.fconf.blaa)
self.assertEqual(['blaa'], list(self.fconf))
self.assertEqual(['foo'], list(self.fconf.blaa))
self.assertEqual(1, len(self.fconf))
self.assertEqual(1, len(self.fconf.blaa))
self.assertNotIn('blaa', self.conf)
self.assertEqual(0, len(self.conf))
self.assertRaises(cfg.NoSuchOptError, getattr, self.conf, 'blaa')
def test_register_group(self):
group = cfg.OptGroup('blaa')
self.fconf.register_group(group)
self.fconf.register_opt(cfg.StrOpt('foo'), group=group)
self.assertIsNone(self.fconf.blaa.foo)
self.assertIsNone(self.fconf['blaa']['foo'])
self.assertIn('blaa', self.fconf)
self.assertIn('foo', self.fconf.blaa)
self.assertEqual(['blaa'], list(self.fconf))
self.assertEqual(['foo'], list(self.fconf.blaa))
self.assertEqual(1, len(self.fconf))
self.assertEqual(1, len(self.fconf.blaa))
self.assertNotIn('blaa', self.conf)
self.assertEqual(0, len(self.conf))
self.assertRaises(cfg.NoSuchOptError, getattr, self.conf, 'blaa')
def test_register_opts(self):
self.fconf.register_opts([cfg.StrOpt('foo'),
cfg.StrOpt('bar')])
self.assertIn('foo', self.fconf)
self.assertIn('bar', self.fconf)
self.assertNotIn('foo', self.conf)
self.assertNotIn('bar', self.conf)
def test_register_cli_opt(self):
self.fconf.register_cli_opt(cfg.StrOpt('foo'))
self.assertIn('foo', self.fconf)
self.assertNotIn('foo', self.conf)
def test_register_cli_opts(self):
self.fconf.register_cli_opts([cfg.StrOpt('foo'), cfg.StrOpt('bar')])
self.assertIn('foo', self.fconf)
self.assertIn('bar', self.fconf)
self.assertNotIn('foo', self.conf)
self.assertNotIn('bar', self.conf)
def test_register_opts_grouped(self):
self.fconf.register_opts([cfg.StrOpt('foo'), cfg.StrOpt('bar')],
group='blaa')
self.assertIn('foo', self.fconf.blaa)
self.assertIn('bar', self.fconf.blaa)
self.assertNotIn('blaa', self.conf)
def test_register_cli_opt_grouped(self):
self.fconf.register_cli_opt(cfg.StrOpt('foo'), group='blaa')
self.assertIn('foo', self.fconf.blaa)
self.assertNotIn('blaa', self.conf)
def test_register_cli_opts_grouped(self):
self.fconf.register_cli_opts([cfg.StrOpt('foo'), cfg.StrOpt('bar')],
group='blaa')
self.assertIn('foo', self.fconf.blaa)
self.assertIn('bar', self.fconf.blaa)
self.assertNotIn('blaa', self.conf)
def test_unknown_opt(self):
self.assertNotIn('foo', self.fconf)
self.assertEqual(0, len(self.fconf))
self.assertRaises(cfg.NoSuchOptError, getattr, self.fconf, 'foo')
self.assertNotIn('blaa', self.conf)
def test_blocked_opt(self):
self.conf.register_opt(cfg.StrOpt('foo'))
self.assertIn('foo', self.conf)
self.assertEqual(1, len(self.conf))
self.assertIsNone(self.conf.foo)
self.assertNotIn('foo', self.fconf)
self.assertEqual(0, len(self.fconf))
self.assertRaises(cfg.NoSuchOptError, getattr, self.fconf, 'foo')
def test_already_registered_opt(self):
self.conf.register_opt(cfg.StrOpt('foo'))
self.fconf.register_opt(cfg.StrOpt('foo'))
self.assertIn('foo', self.conf)
self.assertEqual(1, len(self.conf))
self.assertIsNone(self.conf.foo)
self.assertIn('foo', self.fconf)
self.assertEqual(1, len(self.fconf))
self.assertIsNone(self.fconf.foo)
self.conf.set_override('foo', 'bar')
self.assertEqual('bar', self.conf.foo)
self.assertEqual('bar', self.fconf.foo)
def test_already_registered_opts(self):
self.conf.register_opts([cfg.StrOpt('foo'),
cfg.StrOpt('fu')])
self.fconf.register_opts([cfg.StrOpt('foo'),
cfg.StrOpt('bu')])
self.assertIn('foo', self.conf)
self.assertIn('fu', self.conf)
self.assertNotIn('bu', self.conf)
self.assertEqual(2, len(self.conf))
self.assertIsNone(self.conf.foo)
self.assertIsNone(self.conf.fu)
self.assertIn('foo', self.fconf)
self.assertIn('bu', self.fconf)
self.assertNotIn('fu', self.fconf)
self.assertEqual(2, len(self.fconf))
self.assertIsNone(self.fconf.foo)
self.assertIsNone(self.fconf.bu)
self.conf.set_override('foo', 'bar')
self.assertEqual('bar', self.conf.foo)
self.assertEqual('bar', self.fconf.foo)
def test_already_registered_cli_opt(self):
self.conf.register_cli_opt(cfg.StrOpt('foo'))
self.fconf.register_cli_opt(cfg.StrOpt('foo'))
self.assertIn('foo', self.conf)
self.assertEqual(1, len(self.conf))
self.assertIsNone(self.conf.foo)
self.assertIn('foo', self.fconf)
self.assertEqual(1, len(self.fconf))
self.assertIsNone(self.fconf.foo)
self.conf.set_override('foo', 'bar')
self.assertEqual('bar', self.conf.foo)
self.assertEqual('bar', self.fconf.foo)
def test_already_registered_cli_opts(self):
self.conf.register_cli_opts([cfg.StrOpt('foo'),
cfg.StrOpt('fu')])
self.fconf.register_cli_opts([cfg.StrOpt('foo'),
cfg.StrOpt('bu')])
self.assertIn('foo', self.conf)
self.assertIn('fu', self.conf)
self.assertNotIn('bu', self.conf)
self.assertEqual(2, len(self.conf))
self.assertIsNone(self.conf.foo)
self.assertIsNone(self.conf.fu)
self.assertIn('foo', self.fconf)
self.assertIn('bu', self.fconf)
self.assertNotIn('fu', self.fconf)
self.assertEqual(2, len(self.fconf))
self.assertIsNone(self.fconf.foo)
self.assertIsNone(self.fconf.bu)
self.conf.set_override('foo', 'bar')
self.assertEqual('bar', self.conf.foo)
self.assertEqual('bar', self.fconf.foo)
class ImportTestCase(BaseTestCase):
def setUp(self):
super(ImportTestCase, self).setUp(cfg.CONF)
def test_import_opt(self):
self.assertFalse(hasattr(self.conf, 'fblaa'))
self.conf.import_opt('fblaa', 'tests.testmods.fblaa_opt')
self.assertTrue(hasattr(self.conf, 'fblaa'))
self.assertFalse(hasattr(self.fconf, 'fblaa'))
self.fconf.import_opt('fblaa', 'tests.testmods.fblaa_opt')
self.assertTrue(hasattr(self.fconf, 'fblaa'))
def test_import_opt_in_group(self):
self.assertFalse(hasattr(self.conf, 'fbar'))
self.conf.import_opt('foo', 'tests.testmods.fbar_foo_opt',
group='fbar')
self.assertTrue(hasattr(self.conf, 'fbar'))
self.assertTrue(hasattr(self.conf.fbar, 'foo'))
self.assertFalse(hasattr(self.fconf, 'fbar'))
self.fconf.import_opt('foo', 'tests.testmods.fbar_foo_opt',
group='fbar')
self.assertTrue(hasattr(self.fconf, 'fbar'))
self.assertTrue(hasattr(self.fconf.fbar, 'foo'))
def test_import_group(self):
self.assertFalse(hasattr(self.conf, 'fbaar'))
self.conf.import_group('fbaar', 'tests.testmods.fbaar_baa_opt')
self.assertTrue(hasattr(self.conf, 'fbaar'))
self.assertTrue(hasattr(self.conf.fbaar, 'baa'))
self.assertFalse(hasattr(self.fconf, 'fbaar'))
self.fconf.import_group('fbaar', 'tests.testmods.fbaar_baa_opt')
self.assertTrue(hasattr(self.fconf, 'fbaar'))
self.assertTrue(hasattr(self.fconf.fbaar, 'baa'))

View File

@ -0,0 +1,87 @@
#
# Copyright 2013 Mirantis, Inc.
# Copyright 2013 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslotest import base
from oslo_config import cfg
from oslo_config import fixture as config
conf = cfg.CONF
class ConfigTestCase(base.BaseTestCase):
def setUp(self):
super(ConfigTestCase, self).setUp()
self.config_fixture = self.useFixture(config.Config(conf))
self.config = self.config_fixture.config
self.config_fixture.register_opt(cfg.StrOpt(
'testing_option', default='initial_value'))
def test_overridden_value(self):
self.assertEqual(conf.get('testing_option'), 'initial_value')
self.config(testing_option='changed_value')
self.assertEqual(conf.get('testing_option'),
self.config_fixture.conf.get('testing_option'))
def test_cleanup(self):
self.config(testing_option='changed_value')
self.assertEqual(self.config_fixture.conf.get('testing_option'),
'changed_value')
self.config_fixture.conf.reset()
self.assertEqual(conf.get('testing_option'), 'initial_value')
def test_register_option(self):
opt = cfg.StrOpt('new_test_opt', default='initial_value')
self.config_fixture.register_opt(opt)
self.assertEqual(conf.get('new_test_opt'),
opt.default)
def test_register_options(self):
opt1 = cfg.StrOpt('first_test_opt', default='initial_value_1')
opt2 = cfg.StrOpt('second_test_opt', default='initial_value_2')
self.config_fixture.register_opts([opt1, opt2])
self.assertEqual(conf.get('first_test_opt'), opt1.default)
self.assertEqual(conf.get('second_test_opt'), opt2.default)
def test_cleanup_unregister_option(self):
opt = cfg.StrOpt('new_test_opt', default='initial_value')
self.config_fixture.register_opt(opt)
self.assertEqual(conf.get('new_test_opt'),
opt.default)
self.config_fixture.cleanUp()
self.assertRaises(cfg.NoSuchOptError, conf.get, 'new_test_opt')
def test_register_cli_option(self):
opt = cfg.StrOpt('new_test_opt', default='initial_value')
self.config_fixture.register_cli_opt(opt)
self.assertEqual(conf.get('new_test_opt'),
opt.default)
def test_register_cli_options(self):
opt1 = cfg.StrOpt('first_test_opt', default='initial_value_1')
opt2 = cfg.StrOpt('second_test_opt', default='initial_value_2')
self.config_fixture.register_cli_opts([opt1, opt2])
self.assertEqual(conf.get('first_test_opt'), opt1.default)
self.assertEqual(conf.get('second_test_opt'), opt2.default)
def test_cleanup_unregister_cli_option(self):
opt = cfg.StrOpt('new_test_opt', default='initial_value')
self.config_fixture.register_cli_opt(opt)
self.assertEqual(conf.get('new_test_opt'),
opt.default)
self.config_fixture.cleanUp()
self.assertRaises(cfg.NoSuchOptError, conf.get, 'new_test_opt')

View File

@ -0,0 +1,539 @@
# Copyright 2014 Red Hat, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sys
import fixtures
import mock
from oslotest import base
from six import moves
import testscenarios
from oslo_config import cfg
from oslo_config import fixture as config_fixture
from oslo_config import generator
load_tests = testscenarios.load_tests_apply_scenarios
class GeneratorTestCase(base.BaseTestCase):
opts = {
'foo': cfg.StrOpt('foo', help='foo option'),
'bar': cfg.StrOpt('bar', help='bar option'),
'foo-bar': cfg.StrOpt('foo-bar', help='foobar'),
'no_help': cfg.StrOpt('no_help'),
'long_help': cfg.StrOpt('long_help',
help='Lorem ipsum dolor sit amet, consectetur '
'adipisicing elit, sed do eiusmod tempor '
'incididunt ut labore et dolore magna '
'aliqua. Ut enim ad minim veniam, quis '
'nostrud exercitation ullamco laboris '
'nisi ut aliquip ex ea commodo '
'consequat. Duis aute irure dolor in '
'reprehenderit in voluptate velit esse '
'cillum dolore eu fugiat nulla '
'pariatur. Excepteur sint occaecat '
'cupidatat non proident, sunt in culpa '
'qui officia deserunt mollit anim id est '
'laborum.'),
'deprecated_opt': cfg.StrOpt('bar',
deprecated_name='foobar',
help='deprecated'),
'deprecated_group': cfg.StrOpt('bar',
deprecated_group='group1',
deprecated_name='foobar',
help='deprecated'),
# Unknown Opt default must be a string
'unknown_type': cfg.Opt('unknown_opt',
default='123',
help='unknown'),
'str_opt': cfg.StrOpt('str_opt',
default='foo bar',
help='a string'),
'str_opt_sample_default': cfg.StrOpt('str_opt',
default='fooishbar',
help='a string'),
'str_opt_with_space': cfg.StrOpt('str_opt',
default=' foo bar ',
help='a string with spaces'),
'bool_opt': cfg.BoolOpt('bool_opt',
default=False,
help='a boolean'),
'int_opt': cfg.IntOpt('int_opt',
default=10,
help='an integer'),
'float_opt': cfg.FloatOpt('float_opt',
default=0.1,
help='a float'),
'list_opt': cfg.ListOpt('list_opt',
default=['1', '2', '3'],
help='a list'),
'dict_opt': cfg.DictOpt('dict_opt',
default={'1': 'yes', '2': 'no'},
help='a dict'),
'multi_opt': cfg.MultiStrOpt('multi_opt',
default=['1', '2', '3'],
help='multiple strings'),
'multi_opt_none': cfg.MultiStrOpt('multi_opt_none',
help='multiple strings'),
'multi_opt_empty': cfg.MultiStrOpt('multi_opt_empty',
default=[],
help='multiple strings'),
'multi_opt_sample_default': cfg.MultiStrOpt('multi_opt',
default=['1', '2', '3'],
sample_default=['5', '6'],
help='multiple strings'),
}
content_scenarios = [
('empty',
dict(opts=[], expected='''[DEFAULT]
''')),
('single_namespace',
dict(opts=[('test', [(None, [opts['foo']])])],
expected='''[DEFAULT]
#
# From test
#
# foo option (string value)
#foo = <None>
''')),
('multiple_namespaces',
dict(opts=[('test', [(None, [opts['foo']])]),
('other', [(None, [opts['bar']])])],
expected='''[DEFAULT]
#
# From other
#
# bar option (string value)
#bar = <None>
#
# From test
#
# foo option (string value)
#foo = <None>
''')),
('group',
dict(opts=[('test', [('group1', [opts['foo']])])],
expected='''[DEFAULT]
[group1]
#
# From test
#
# foo option (string value)
#foo = <None>
''')),
('empty_group',
dict(opts=[('test', [('group1', [])])],
expected='''[DEFAULT]
''')),
('multiple_groups',
dict(opts=[('test', [('group1', [opts['foo']]),
('group2', [opts['bar']])])],
expected='''[DEFAULT]
[group1]
#
# From test
#
# foo option (string value)
#foo = <None>
[group2]
#
# From test
#
# bar option (string value)
#bar = <None>
''')),
('group_in_multiple_namespaces',
dict(opts=[('test', [('group1', [opts['foo']])]),
('other', [('group1', [opts['bar']])])],
expected='''[DEFAULT]
[group1]
#
# From other
#
# bar option (string value)
#bar = <None>
#
# From test
#
# foo option (string value)
#foo = <None>
''')),
('hyphenated_name',
dict(opts=[('test', [(None, [opts['foo-bar']])])],
expected='''[DEFAULT]
#
# From test
#
# foobar (string value)
#foo_bar = <None>
''')),
('no_help',
dict(opts=[('test', [(None, [opts['no_help']])])],
log_warning=('"%s" is missing a help string', 'no_help'),
expected='''[DEFAULT]
#
# From test
#
# (string value)
#no_help = <None>
''')),
('long_help',
dict(opts=[('test', [(None, [opts['long_help']])])],
expected='''[DEFAULT]
#
# From test
#
# Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do
# eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim
# ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut
# aliquip ex ea commodo consequat. Duis aute irure dolor in
# reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla
# pariatur. Excepteur sint occaecat cupidatat non proident, sunt in
# culpa qui officia deserunt mollit anim id est laborum. (string
# value)
#long_help = <None>
''')),
('long_help_wrap_at_40',
dict(opts=[('test', [(None, [opts['long_help']])])],
wrap_width=40,
expected='''[DEFAULT]
#
# From test
#
# Lorem ipsum dolor sit amet,
# consectetur adipisicing elit, sed do
# eiusmod tempor incididunt ut labore et
# dolore magna aliqua. Ut enim ad minim
# veniam, quis nostrud exercitation
# ullamco laboris nisi ut aliquip ex ea
# commodo consequat. Duis aute irure
# dolor in reprehenderit in voluptate
# velit esse cillum dolore eu fugiat
# nulla pariatur. Excepteur sint
# occaecat cupidatat non proident, sunt
# in culpa qui officia deserunt mollit
# anim id est laborum. (string value)
#long_help = <None>
''')),
('long_help_no_wrapping',
dict(opts=[('test', [(None, [opts['long_help']])])],
wrap_width=0,
expected='''[DEFAULT]
#
# From test
#
''' # noqa
'# Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod '
'tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, '
'quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo '
'consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse '
'cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat '
'non proident, sunt in culpa qui officia deserunt mollit anim id est laborum. '
'(string value)'
'''
#long_help = <None>
''')),
('deprecated',
dict(opts=[('test', [('foo', [opts['deprecated_opt']])])],
expected='''[DEFAULT]
[foo]
#
# From test
#
# deprecated (string value)
# Deprecated group/name - [DEFAULT]/foobar
#bar = <None>
''')),
('deprecated_group',
dict(opts=[('test', [('foo', [opts['deprecated_group']])])],
expected='''[DEFAULT]
[foo]
#
# From test
#
# deprecated (string value)
# Deprecated group/name - [group1]/foobar
#bar = <None>
''')),
('unknown_type',
dict(opts=[('test', [(None, [opts['unknown_type']])])],
log_warning=('Unknown option type: %s',
repr(opts['unknown_type'])),
expected='''[DEFAULT]
#
# From test
#
# unknown (unknown type)
#unknown_opt = 123
''')),
('str_opt',
dict(opts=[('test', [(None, [opts['str_opt']])])],
expected='''[DEFAULT]
#
# From test
#
# a string (string value)
#str_opt = foo bar
''')),
('str_opt_with_space',
dict(opts=[('test', [(None, [opts['str_opt_with_space']])])],
expected='''[DEFAULT]
#
# From test
#
# a string with spaces (string value)
#str_opt = " foo bar "
''')),
('bool_opt',
dict(opts=[('test', [(None, [opts['bool_opt']])])],
expected='''[DEFAULT]
#
# From test
#
# a boolean (boolean value)
#bool_opt = false
''')),
('int_opt',
dict(opts=[('test', [(None, [opts['int_opt']])])],
expected='''[DEFAULT]
#
# From test
#
# an integer (integer value)
#int_opt = 10
''')),
('float_opt',
dict(opts=[('test', [(None, [opts['float_opt']])])],
expected='''[DEFAULT]
#
# From test
#
# a float (floating point value)
#float_opt = 0.1
''')),
('list_opt',
dict(opts=[('test', [(None, [opts['list_opt']])])],
expected='''[DEFAULT]
#
# From test
#
# a list (list value)
#list_opt = 1,2,3
''')),
('dict_opt',
dict(opts=[('test', [(None, [opts['dict_opt']])])],
expected='''[DEFAULT]
#
# From test
#
# a dict (dict value)
#dict_opt = 1:yes,2:no
''')),
('multi_opt',
dict(opts=[('test', [(None, [opts['multi_opt']])])],
expected='''[DEFAULT]
#
# From test
#
# multiple strings (multi valued)
#multi_opt = 1
#multi_opt = 2
#multi_opt = 3
''')),
('multi_opt_none',
dict(opts=[('test', [(None, [opts['multi_opt_none']])])],
expected='''[DEFAULT]
#
# From test
#
# multiple strings (multi valued)
#multi_opt_none =
''')),
('multi_opt_empty',
dict(opts=[('test', [(None, [opts['multi_opt_empty']])])],
expected='''[DEFAULT]
#
# From test
#
# multiple strings (multi valued)
#multi_opt_empty =
''')),
('str_opt_sample_default',
dict(opts=[('test', [(None, [opts['str_opt_sample_default']])])],
expected='''[DEFAULT]
#
# From test
#
# a string (string value)
#str_opt = fooishbar
''')),
('multi_opt_sample_default',
dict(opts=[('test', [(None, [opts['multi_opt_sample_default']])])],
expected='''[DEFAULT]
#
# From test
#
# multiple strings (multi valued)
#multi_opt = 5
#multi_opt = 6
''')),
]
output_file_scenarios = [
('stdout',
dict(stdout=True, output_file=None)),
('output_file',
dict(output_file='sample.conf', stdout=False)),
]
@classmethod
def generate_scenarios(cls):
cls.scenarios = testscenarios.multiply_scenarios(
cls.content_scenarios,
cls.output_file_scenarios)
def setUp(self):
super(GeneratorTestCase, self).setUp()
self.conf = cfg.ConfigOpts()
self.config_fixture = config_fixture.Config(self.conf)
self.config = self.config_fixture.config
self.useFixture(self.config_fixture)
self.tempdir = self.useFixture(fixtures.TempDir())
def _capture_stream(self, stream_name):
self.useFixture(fixtures.MonkeyPatch("sys.%s" % stream_name,
moves.StringIO()))
return getattr(sys, stream_name)
def _capture_stdout(self):
return self._capture_stream('stdout')
@mock.patch('stevedore.named.NamedExtensionManager')
@mock.patch.object(generator, 'LOG')
def test_generate(self, mock_log, named_mgr):
generator.register_cli_opts(self.conf)
namespaces = [i[0] for i in self.opts]
self.config(namespace=namespaces)
wrap_width = getattr(self, 'wrap_width', None)
if wrap_width is not None:
self.config(wrap_width=wrap_width)
if self.stdout:
stdout = self._capture_stdout()
else:
output_file = self.tempdir.join(self.output_file)
self.config(output_file=output_file)
mock_eps = []
for name, opts in self.opts:
mock_ep = mock.Mock()
mock_ep.configure_mock(name=name, obj=opts)
mock_eps.append(mock_ep)
named_mgr.return_value = mock_eps
generator.generate(self.conf)
if self.stdout:
self.assertEqual(self.expected, stdout.getvalue())
else:
content = open(output_file).read()
self.assertEqual(self.expected, content)
named_mgr.assert_called_once_with('oslo.config.opts',
names=namespaces,
invoke_on_load=True)
log_warning = getattr(self, 'log_warning', None)
if log_warning is not None:
mock_log.warning.assert_called_once_with(*log_warning)
else:
self.assertFalse(mock_log.warning.called)
GeneratorTestCase.generate_scenarios()

View File

@ -0,0 +1,124 @@
# Copyright 2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from oslo_config import iniparser
class TestParser(iniparser.BaseParser):
comment_called = False
values = None
section = ''
def __init__(self):
self.values = {}
def assignment(self, key, value):
self.values.setdefault(self.section, {})
self.values[self.section][key] = value
def new_section(self, section):
self.section = section
def comment(self, section):
self.comment_called = True
class BaseParserTestCase(unittest.TestCase):
def setUp(self):
self.parser = iniparser.BaseParser()
def _assertParseError(self, *lines):
self.assertRaises(iniparser.ParseError, self.parser.parse, lines)
def test_invalid_assignment(self):
self._assertParseError("foo - bar")
def test_empty_key(self):
self._assertParseError(": bar")
def test_unexpected_continuation(self):
self._assertParseError(" baz")
def test_invalid_section(self):
self._assertParseError("[section")
def test_no_section_name(self):
self._assertParseError("[]")
class ParserTestCase(unittest.TestCase):
def setUp(self):
self.parser = TestParser()
def test_blank_line(self):
lines = [""]
self.parser.parse(lines)
self.assertEqual(self.parser.values, {})
def test_assignment_equal(self):
lines = ["foo = bar"]
self.parser.parse(lines)
self.assertEqual(self.parser.values, {'': {'foo': ['bar']}})
def test_assignment_colon(self):
lines = ["foo: bar"]
self.parser.parse(lines)
self.assertEqual(self.parser.values, {'': {'foo': ['bar']}})
def test_assignment_multiline(self):
lines = ["foo = bar0", " bar1"]
self.parser.parse(lines)
self.assertEqual(self.parser.values, {'': {'foo': ['bar0', 'bar1']}})
def test_assignment_multline_empty(self):
lines = ["foo = bar0", "", " bar1"]
self.assertRaises(iniparser.ParseError, self.parser.parse, lines)
def test_section_assignment(self):
lines = ["[test]", "foo = bar"]
self.parser.parse(lines)
self.assertEqual(self.parser.values, {'test': {'foo': ['bar']}})
def test_new_section(self):
lines = ["[foo]"]
self.parser.parse(lines)
self.assertEqual(self.parser.section, 'foo')
def test_comment(self):
lines = ["# foobar"]
self.parser.parse(lines)
self.assertTrue(self.parser.comment_called)
def test_empty_assignment(self):
lines = ["foo = "]
self.parser.parse(lines)
self.assertEqual(self.parser.values, {'': {'foo': ['']}})
def test_assignment_space_single_quote(self):
lines = ["foo = ' bar '"]
self.parser.parse(lines)
self.assertEqual(self.parser.values, {'': {'foo': [' bar ']}})
def test_assignment_space_double_quote(self):
lines = ["foo = \" bar \""]
self.parser.parse(lines)
self.assertEqual(self.parser.values, {'': {'foo': [' bar ']}})
class ExceptionTestCase(unittest.TestCase):
def test_parseerror(self):
exc = iniparser.ParseError('test', 42, 'example')
self.assertEqual(str(exc), "at line 42, test: 'example'")

View File

@ -0,0 +1,411 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import unittest
from oslo_config import types
class TypeTestHelper(object):
def setUp(self):
super(TypeTestHelper, self).setUp()
self.type_instance = self.type
def assertConvertedValue(self, s, expected):
self.assertEqual(expected, self.type_instance(s))
def assertInvalid(self, value):
self.assertRaises(ValueError, self.type_instance, value)
class StringTypeTests(TypeTestHelper, unittest.TestCase):
type = types.String()
def test_empty_string_passes(self):
self.assertConvertedValue('', '')
def test_should_return_same_string_if_valid(self):
self.assertConvertedValue('foo bar', 'foo bar')
def test_listed_value(self):
self.type_instance = types.String(choices=['foo', 'bar'])
self.assertConvertedValue('foo', 'foo')
def test_unlisted_value(self):
self.type_instance = types.String(choices=['foo', 'bar'])
self.assertInvalid('baz')
def test_with_no_values_returns_error(self):
self.type_instance = types.String(choices=[])
self.assertInvalid('foo')
def test_string_with_non_closed_quote_is_invalid(self):
self.type_instance = types.String(quotes=True)
self.assertInvalid('"foo bar')
self.assertInvalid("'bar baz")
def test_quotes_are_stripped(self):
self.type_instance = types.String(quotes=True)
self.assertConvertedValue('"foo bar"', 'foo bar')
def test_trailing_quote_is_ok(self):
self.type_instance = types.String(quotes=True)
self.assertConvertedValue('foo bar"', 'foo bar"')
def test_repr(self):
t = types.String()
self.assertEqual('String', repr(t))
def test_repr_with_choices(self):
t = types.String(choices=['foo', 'bar'])
self.assertEqual('String(choices=[\'foo\', \'bar\'])', repr(t))
def test_equal(self):
self.assertTrue(types.String() == types.String())
def test_equal_with_same_choices(self):
t1 = types.String(choices=['foo', 'bar'])
t2 = types.String(choices=['foo', 'bar'])
self.assertTrue(t1 == t2)
def test_not_equal_with_different_choices(self):
t1 = types.String(choices=['foo', 'bar'])
t2 = types.String(choices=['foo', 'baz'])
self.assertFalse(t1 == t2)
def test_equal_with_equal_quote_falgs(self):
t1 = types.String(quotes=True)
t2 = types.String(quotes=True)
self.assertTrue(t1 == t2)
def test_not_equal_with_different_quote_falgs(self):
t1 = types.String(quotes=False)
t2 = types.String(quotes=True)
self.assertFalse(t1 == t2)
def test_not_equal_to_other_class(self):
self.assertFalse(types.String() == types.Integer())
class BooleanTypeTests(TypeTestHelper, unittest.TestCase):
type = types.Boolean()
def test_True(self):
self.assertConvertedValue('True', True)
def test_yes(self):
self.assertConvertedValue('yes', True)
def test_on(self):
self.assertConvertedValue('on', True)
def test_1(self):
self.assertConvertedValue('1', True)
def test_False(self):
self.assertConvertedValue('False', False)
def test_no(self):
self.assertConvertedValue('no', False)
def test_off(self):
self.assertConvertedValue('off', False)
def test_0(self):
self.assertConvertedValue('0', False)
def test_other_values_produce_error(self):
self.assertInvalid('foo')
def test_repr(self):
self.assertEqual('Boolean', repr(types.Boolean()))
def test_equal(self):
self.assertEqual(types.Boolean(), types.Boolean())
def test_not_equal_to_other_class(self):
self.assertFalse(types.Boolean() == types.String())
class IntegerTypeTests(TypeTestHelper, unittest.TestCase):
type = types.Integer()
def test_empty_string(self):
self.assertConvertedValue('', None)
def test_whitespace_string(self):
self.assertConvertedValue(" \t\t\t\t", None)
def test_positive_values_are_valid(self):
self.assertConvertedValue('123', 123)
def test_zero_is_valid(self):
self.assertConvertedValue('0', 0)
def test_negative_values_are_valid(self):
self.assertConvertedValue('-123', -123)
def test_leading_whitespace_is_ignored(self):
self.assertConvertedValue(' 5', 5)
def test_trailing_whitespace_is_ignored(self):
self.assertConvertedValue('7 ', 7)
def test_non_digits_are_invalid(self):
self.assertInvalid('12a45')
def test_repr(self):
t = types.Integer()
self.assertEqual('Integer', repr(t))
def test_repr_with_min(self):
t = types.Integer(min=123)
self.assertEqual('Integer(min=123)', repr(t))
def test_repr_with_max(self):
t = types.Integer(max=456)
self.assertEqual('Integer(max=456)', repr(t))
def test_repr_with_min_and_max(self):
t = types.Integer(min=123, max=456)
self.assertEqual('Integer(min=123, max=456)', repr(t))
def test_equal(self):
self.assertTrue(types.Integer() == types.Integer())
def test_equal_with_same_min_and_no_max(self):
self.assertTrue(types.Integer(min=123) == types.Integer(min=123))
def test_equal_with_same_max_and_no_min(self):
self.assertTrue(types.Integer(max=123) == types.Integer(max=123))
def test_equal_with_same_min_and_max(self):
t1 = types.Integer(min=1, max=123)
t2 = types.Integer(min=1, max=123)
self.assertTrue(t1 == t2)
def test_not_equal(self):
self.assertFalse(types.Integer(min=123) == types.Integer(min=456))
def test_not_equal_to_other_class(self):
self.assertFalse(types.Integer() == types.String())
def test_with_max_and_min(self):
t = types.Integer(min=123, max=456)
self.assertRaises(ValueError, t, 122)
t(123)
t(300)
t(456)
self.assertRaises(ValueError, t, 457)
class FloatTypeTests(TypeTestHelper, unittest.TestCase):
type = types.Float()
def test_decimal_format(self):
v = self.type_instance('123.456')
self.assertAlmostEqual(v, 123.456)
def test_decimal_format_negative_float(self):
v = self.type_instance('-123.456')
self.assertAlmostEqual(v, -123.456)
def test_exponential_format(self):
v = self.type_instance('123e-2')
self.assertAlmostEqual(v, 1.23)
def test_non_float_is_invalid(self):
self.assertInvalid('123,345')
self.assertInvalid('foo')
def test_repr(self):
self.assertEqual('Float', repr(types.Float()))
def test_equal(self):
self.assertTrue(types.Float() == types.Float())
def test_not_equal_to_other_class(self):
self.assertFalse(types.Float() == types.Integer())
class ListTypeTests(TypeTestHelper, unittest.TestCase):
type = types.List()
def test_empty_value(self):
self.assertConvertedValue('', [])
def test_single_value(self):
self.assertConvertedValue(' foo bar ',
['foo bar'])
def test_list_of_values(self):
self.assertConvertedValue(' foo bar, baz ',
['foo bar',
'baz'])
def test_list_of_values_containing_commas(self):
self.type_instance = types.List(types.String(quotes=True))
self.assertConvertedValue('foo,"bar, baz",bam',
['foo',
'bar, baz',
'bam'])
def test_list_of_lists(self):
self.type_instance = types.List(
types.List(types.String(), bounds=True)
)
self.assertConvertedValue('[foo],[bar, baz],[bam]',
[['foo'], ['bar', 'baz'], ['bam']])
def test_list_of_custom_type(self):
self.type_instance = types.List(types.Integer())
self.assertConvertedValue('1,2,3,5',
[1, 2, 3, 5])
def test_bounds_parsing(self):
self.type_instance = types.List(types.Integer(), bounds=True)
self.assertConvertedValue('[1,2,3]', [1, 2, 3])
def test_bounds_required(self):
self.type_instance = types.List(types.Integer(), bounds=True)
self.assertInvalid('1,2,3')
self.assertInvalid('[1,2,3')
self.assertInvalid('1,2,3]')
def test_repr(self):
t = types.List(types.Integer())
self.assertEqual('List of Integer', repr(t))
def test_equal(self):
self.assertTrue(types.List() == types.List())
def test_equal_with_equal_custom_item_types(self):
it1 = types.Integer()
it2 = types.Integer()
self.assertTrue(types.List(it1) == types.List(it2))
def test_not_equal_with_non_equal_custom_item_types(self):
it1 = types.Integer()
it2 = types.String()
self.assertFalse(it1 == it2)
self.assertFalse(types.List(it1) == types.List(it2))
def test_not_equal_to_other_class(self):
self.assertFalse(types.List() == types.Integer())
class DictTypeTests(TypeTestHelper, unittest.TestCase):
type = types.Dict()
def test_empty_value(self):
self.assertConvertedValue('', {})
def test_single_value(self):
self.assertConvertedValue(' foo: bar ',
{'foo': 'bar'})
def test_dict_of_values(self):
self.assertConvertedValue(' foo: bar, baz: 123 ',
{'foo': 'bar',
'baz': '123'})
def test_custom_value_type(self):
self.type_instance = types.Dict(types.Integer())
self.assertConvertedValue('foo:123, bar: 456',
{'foo': 123,
'bar': 456})
def test_dict_of_values_containing_commas(self):
self.type_instance = types.Dict(types.String(quotes=True))
self.assertConvertedValue('foo:"bar, baz",bam:quux',
{'foo': 'bar, baz',
'bam': 'quux'})
def test_dict_of_dicts(self):
self.type_instance = types.Dict(
types.Dict(types.String(), bounds=True)
)
self.assertConvertedValue('k1:{k1:v1,k2:v2},k2:{k3:v3}',
{'k1': {'k1': 'v1', 'k2': 'v2'},
'k2': {'k3': 'v3'}})
def test_bounds_parsing(self):
self.type_instance = types.Dict(types.String(), bounds=True)
self.assertConvertedValue('{foo:bar,baz:123}',
{'foo': 'bar',
'baz': '123'})
def test_bounds_required(self):
self.type_instance = types.Dict(types.String(), bounds=True)
self.assertInvalid('foo:bar,baz:123')
self.assertInvalid('{foo:bar,baz:123')
self.assertInvalid('foo:bar,baz:123}')
def test_no_mapping_produces_error(self):
self.assertInvalid('foo,bar')
def test_repr(self):
t = types.Dict(types.Integer())
self.assertEqual('Dict of Integer', repr(t))
def test_equal(self):
self.assertTrue(types.Dict() == types.Dict())
def test_equal_with_equal_custom_item_types(self):
it1 = types.Integer()
it2 = types.Integer()
self.assertTrue(types.Dict(it1) == types.Dict(it2))
def test_not_equal_with_non_equal_custom_item_types(self):
it1 = types.Integer()
it2 = types.String()
self.assertFalse(it1 == it2)
self.assertFalse(types.Dict(it1) == types.Dict(it2))
def test_not_equal_to_other_class(self):
self.assertFalse(types.Dict() == types.Integer())
class IPAddressTypeTests(TypeTestHelper, unittest.TestCase):
type = types.IPAddress()
def test_ipv4_address(self):
self.assertConvertedValue('192.168.0.1', '192.168.0.1')
def test_ipv6_address(self):
self.assertConvertedValue('abcd:ef::1', 'abcd:ef::1')
def test_strings(self):
self.assertInvalid('')
self.assertInvalid('foo')
def test_numbers(self):
self.assertInvalid(1)
self.assertInvalid(-1)
self.assertInvalid(3.14)
class IPv4AddressTypeTests(IPAddressTypeTests):
type = types.IPAddress(4)
def test_ipv6_address(self):
self.assertInvalid('abcd:ef::1')
class IPv6AddressTypeTests(IPAddressTypeTests):
type = types.IPAddress(6)
def test_ipv4_address(self):
self.assertInvalid('192.168.0.1')

413
oslo_config/types.py Normal file
View File

@ -0,0 +1,413 @@
# Copyright 2013 Mirantis, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Type conversion and validation classes for configuration options.
Use these classes as values for the `type` argument to
:class:`oslo.config.cfg.Opt` and its subclasses.
"""
import netaddr
import six
class ConfigType(object):
BASE_TYPES = (None,)
def is_base_type(self, other):
return isinstance(other, self.BASE_TYPES)
class String(ConfigType):
"""String type.
String values do not get transformed and are returned as str objects.
:param choices: Optional sequence of valid values.
:param quotes: If True and string is enclosed with single or double
quotes, will strip those quotes. Will signal error if
string have quote at the beginning and no quote at
the end. Turned off by default. Useful if used with
container types like List.
"""
BASE_TYPES = six.string_types
def __init__(self, choices=None, quotes=False):
super(String, self).__init__()
self.choices = choices
self.quotes = quotes
def __call__(self, value):
value = str(value)
if self.quotes and value:
if value[0] in "\"'":
if value[-1] != value[0]:
raise ValueError('Non-closed quote: %s' % value)
value = value[1:-1]
if self.choices is None or value in self.choices:
return value
raise ValueError(
'Valid values are [%s], but found %s' % (
', '.join([str(v) for v in self.choices]),
repr(value)))
def __repr__(self):
if self.choices:
return 'String(choices=%s)' % repr(self.choices)
return 'String'
def __eq__(self, other):
return (
(self.__class__ == other.__class__) and
(self.choices == other.choices) and
(self.quotes == other.quotes)
)
class MultiString(String):
BASE_TYPES = six.string_types + (list,)
class Boolean(ConfigType):
"""Boolean type.
Values are case insensitive and can be set using
1/0, yes/no, true/false or on/off.
"""
TRUE_VALUES = ['true', '1', 'on', 'yes']
FALSE_VALUES = ['false', '0', 'off', 'no']
BASE_TYPES = (bool,)
def __call__(self, value):
if isinstance(value, bool):
return value
s = value.lower()
if s in self.TRUE_VALUES:
return True
elif s in self.FALSE_VALUES:
return False
else:
raise ValueError('Unexpected boolean value %r' % value)
def __repr__(self):
return 'Boolean'
def __eq__(self, other):
return self.__class__ == other.__class__
class Integer(ConfigType):
"""Integer type.
Converts value to an integer optionally doing range checking.
If value is whitespace or empty string will return None.
:param min: Optional check that value is greater than or equal to min
:param max: Optional check that value is less than or equal to max
"""
BASE_TYPES = six.integer_types
def __init__(self, min=None, max=None):
super(Integer, self).__init__()
self.min = min
self.max = max
if min and max and max < min:
raise ValueError('Max value is less than min value')
def __call__(self, value):
if not isinstance(value, int):
s = str(value).strip()
if s == '':
value = None
else:
value = int(value)
if value:
self._check_range(value)
return value
def _check_range(self, value):
if self.min and value < self.min:
raise ValueError('Should be greater than or equal to %d' %
self.min)
if self.max and value > self.max:
raise ValueError('Should be less than or equal to %d' % self.max)
def __repr__(self):
props = []
if self.min:
props.append('min=%d' % self.min)
if self.max:
props.append('max=%d' % self.max)
if props:
return 'Integer(%s)' % ', '.join(props)
return 'Integer'
def __eq__(self, other):
return (
(self.__class__ == other.__class__) and
(self.min == other.min) and
(self.max == other.max)
)
class Float(ConfigType):
"""Float type."""
# allow float to be set from int
BASE_TYPES = six.integer_types + (float,)
def __call__(self, value):
if isinstance(value, float):
return value
return float(value)
def __repr__(self):
return 'Float'
def __eq__(self, other):
return self.__class__ == other.__class__
class List(ConfigType):
"""List type.
Represent values of other (item) type, separated by commas.
The resulting value is a list containing those values.
List doesn't know if item type can also contain commas. To workaround this
it tries the following: if the next part fails item validation, it appends
comma and next item until validation succeeds or there is no parts left.
In the later case it will signal validation error.
:param item_type: type of list items
:param bounds: if True, value should be inside "[" and "]" pair
"""
BASE_TYPES = (list,)
def __init__(self, item_type=None, bounds=False):
super(List, self).__init__()
if item_type is None:
item_type = String()
if not callable(item_type):
raise TypeError('item_type must be callable')
self.item_type = item_type
self.bounds = bounds
def __call__(self, value):
if isinstance(value, list):
return value
result = []
s = value.strip()
if self.bounds:
if not s.startswith('['):
raise ValueError('Value should start with "["')
if not s.endswith(']'):
raise ValueError('Value should end with "]"')
s = s[1:-1]
if s == '':
return result
values = s.split(',')
while values:
value = values.pop(0)
while True:
first_error = None
try:
validated_value = self.item_type(value.strip())
break
except ValueError as e:
if not first_error:
first_error = e
if len(values) == 0:
raise first_error
value += ',' + values.pop(0)
result.append(validated_value)
return result
def __repr__(self):
return 'List of %s' % repr(self.item_type)
def __eq__(self, other):
return (
(self.__class__ == other.__class__) and
(self.item_type == other.item_type)
)
class Dict(ConfigType):
"""Dictionary type.
Dictionary type values are key:value pairs separated by commas.
The resulting value is a dictionary of these key/value pairs.
Type of dictionary key is always string, but dictionary value
type can be customized.
:param value_type: type of values in dictionary
:param bounds: if True, value should be inside "{" and "}" pair
"""
BASE_TYPES = (dict,)
def __init__(self, value_type=None, bounds=False):
super(Dict, self).__init__()
if value_type is None:
value_type = String()
if not callable(value_type):
raise TypeError('value_type must be callable')
self.value_type = value_type
self.bounds = bounds
def __call__(self, value):
if isinstance(value, dict):
return value
result = {}
s = value.strip()
if self.bounds:
if not s.startswith('{'):
raise ValueError('Value should start with "{"')
if not s.endswith('}'):
raise ValueError('Value should end with "}"')
s = s[1:-1]
if s == '':
return result
pairs = s.split(',')
while pairs:
pair = pairs.pop(0)
while True:
first_error = None
try:
key_value = pair.split(':', 1)
if len(key_value) < 2:
raise ValueError('Value should be NAME:VALUE pairs '
'separated by ","')
key, value = key_value
key = key.strip()
value = value.strip()
value = self.value_type(value)
break
except ValueError as e:
if not first_error:
first_error = e
if not pairs:
raise first_error
pair += ',' + pairs.pop(0)
if key == '':
raise ValueError('Key name should not be empty')
if key in result:
raise ValueError('Duplicate key %s' % key)
result[key] = value
return result
def __repr__(self):
return 'Dict of %s' % repr(self.value_type)
def __eq__(self, other):
return (
(self.__class__ == other.__class__) and
(self.value_type == other.value_type)
)
class IPAddress(ConfigType):
"""IP address type
Represents either ipv4 or ipv6. Without specifying version parameter both
versions are checked
:param version: defines which version should be explicitly checked (4 or 6)
"""
BASE_TYPES = six.string_types
def __init__(self, version=None):
super(IPAddress, self).__init__()
version_checkers = {
None: self._check_both_versions,
4: self._check_ipv4,
6: self._check_ipv6
}
self.version_checker = version_checkers.get(version)
if self.version_checker is None:
raise TypeError("%s is not a valid IP version." % version)
def __call__(self, value):
value = str(value)
if not value:
raise ValueError("IP address cannot be an empty string")
self.version_checker(value)
return value
def __repr__(self):
return "IPAddress"
def __eq__(self, other):
return self.__class__ == other.__class__
def _check_ipv4(self, address):
if not netaddr.valid_ipv4(address, netaddr.core.INET_PTON):
raise ValueError("%s is not an IPv4 address" % address)
def _check_ipv6(self, address):
if not netaddr.valid_ipv6(address, netaddr.core.INET_PTON):
raise ValueError("%s is not an IPv6 address" % address)
def _check_both_versions(self, address):
if not (netaddr.valid_ipv4(address, netaddr.core.INET_PTON) or
netaddr.valid_ipv6(address, netaddr.core.INET_PTON)):
raise ValueError("%s is not IPv4 or IPv6 address" % address)

View File

@ -24,6 +24,7 @@ classifier =
packages =
oslo
oslo.config
oslo_config
namespace_packages =
oslo
@ -33,7 +34,7 @@ setup-hooks =
[entry_points]
console_scripts =
oslo-config-generator = oslo.config.generator:main
oslo-config-generator = oslo_config.generator:main
[build_sphinx]
source-dir = doc/source

View File

@ -225,7 +225,7 @@ class DefaultConfigFilesTestCase(BaseTestCase):
paths = self.create_tempfiles([('def', '[DEFAULT]')])
self.useFixture(fixtures.MonkeyPatch(
'oslo.config.cfg.find_config_files',
'oslo_config.cfg.find_config_files',
lambda project, prog: paths))
self.conf(args=[], default_config_files=None)
@ -912,7 +912,7 @@ class ConfigFileOptsTestCase(BaseTestCase):
self.assertTrue(hasattr(self.conf, 'foo'))
self.assertEqual(self.conf.foo, 666)
@mock.patch.object(cfg, 'LOG')
@mock.patch('oslo_config.cfg.LOG')
def test_conf_file_int_wrong_default(self, mock_log):
cfg.IntOpt('foo', default='666')
mock_log.debug.assert_call_count(1)
@ -1044,7 +1044,7 @@ class ConfigFileOptsTestCase(BaseTestCase):
self.assertTrue(hasattr(self.conf, 'foo'))
self.assertEqual(self.conf.foo, ['bar'])
@mock.patch.object(cfg, 'LOG')
@mock.patch('oslo_config.cfg.LOG')
def test_conf_file_list_default_wrong_type(self, mock_log):
cfg.ListOpt('foo', default=25)
mock_log.debug.assert_called_once_with(
@ -2537,49 +2537,6 @@ class UnregisterOptTestCase(BaseTestCase):
self.assertFalse(hasattr(self.conf.blaa, 'foo'))
class ImportOptTestCase(BaseTestCase):
def test_import_opt(self):
self.assertFalse(hasattr(cfg.CONF, 'blaa'))
cfg.CONF.import_opt('blaa', 'tests.testmods.blaa_opt')
self.assertTrue(hasattr(cfg.CONF, 'blaa'))
def test_import_opt_in_group(self):
self.assertFalse(hasattr(cfg.CONF, 'bar'))
cfg.CONF.import_opt('foo', 'tests.testmods.bar_foo_opt', group='bar')
self.assertTrue(hasattr(cfg.CONF, 'bar'))
self.assertTrue(hasattr(cfg.CONF.bar, 'foo'))
def test_import_opt_import_errror(self):
self.assertRaises(ImportError, cfg.CONF.import_opt,
'blaa', 'tests.testmods.blaablaa_opt')
def test_import_opt_no_such_opt(self):
self.assertRaises(cfg.NoSuchOptError, cfg.CONF.import_opt,
'blaablaa', 'tests.testmods.blaa_opt')
def test_import_opt_no_such_group(self):
self.assertRaises(cfg.NoSuchGroupError, cfg.CONF.import_opt,
'blaa', 'tests.testmods.blaa_opt', group='blaa')
class ImportGroupTestCase(BaseTestCase):
def test_import_group(self):
self.assertFalse(hasattr(cfg.CONF, 'qux'))
cfg.CONF.import_group('qux', 'tests.testmods.baz_qux_opt')
self.assertTrue(hasattr(cfg.CONF, 'qux'))
self.assertTrue(hasattr(cfg.CONF.qux, 'baz'))
def test_import_group_import_error(self):
self.assertRaises(ImportError, cfg.CONF.import_group,
'qux', 'tests.testmods.bazzz_quxxx_opt')
def test_import_group_no_such_group(self):
self.assertRaises(cfg.NoSuchGroupError, cfg.CONF.import_group,
'quxxx', 'tests.testmods.baz_qux_opt')
class RequiredOptsTestCase(BaseTestCase):
def setUp(self):

View File

@ -242,39 +242,3 @@ class RegisterTestCase(BaseTestCase):
self.assertEqual('bar', self.conf.foo)
self.assertEqual('bar', self.fconf.foo)
class ImportTestCase(BaseTestCase):
def setUp(self):
super(ImportTestCase, self).setUp(cfg.CONF)
def test_import_opt(self):
self.assertFalse(hasattr(self.conf, 'fblaa'))
self.conf.import_opt('fblaa', 'tests.testmods.fblaa_opt')
self.assertTrue(hasattr(self.conf, 'fblaa'))
self.assertFalse(hasattr(self.fconf, 'fblaa'))
self.fconf.import_opt('fblaa', 'tests.testmods.fblaa_opt')
self.assertTrue(hasattr(self.fconf, 'fblaa'))
def test_import_opt_in_group(self):
self.assertFalse(hasattr(self.conf, 'fbar'))
self.conf.import_opt('foo', 'tests.testmods.fbar_foo_opt',
group='fbar')
self.assertTrue(hasattr(self.conf, 'fbar'))
self.assertTrue(hasattr(self.conf.fbar, 'foo'))
self.assertFalse(hasattr(self.fconf, 'fbar'))
self.fconf.import_opt('foo', 'tests.testmods.fbar_foo_opt',
group='fbar')
self.assertTrue(hasattr(self.fconf, 'fbar'))
self.assertTrue(hasattr(self.fconf.fbar, 'foo'))
def test_import_group(self):
self.assertFalse(hasattr(self.conf, 'fbaar'))
self.conf.import_group('fbaar', 'tests.testmods.fbaar_baa_opt')
self.assertTrue(hasattr(self.conf, 'fbaar'))
self.assertTrue(hasattr(self.conf.fbaar, 'baa'))
self.assertFalse(hasattr(self.fconf, 'fbaar'))
self.fconf.import_group('fbaar', 'tests.testmods.fbaar_baa_opt')
self.assertTrue(hasattr(self.fconf, 'fbaar'))
self.assertTrue(hasattr(self.fconf.fbaar, 'baa'))

View File

@ -493,7 +493,7 @@ class GeneratorTestCase(base.BaseTestCase):
return self._capture_stream('stdout')
@mock.patch('stevedore.named.NamedExtensionManager')
@mock.patch.object(generator, 'LOG')
@mock.patch('oslo_config.generator.LOG')
def test_generate(self, mock_log, named_mgr):
generator.register_cli_opts(self.conf)

61
tests/test_warning.py Normal file
View File

@ -0,0 +1,61 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import imp
import os
import warnings
import mock
from oslotest import base as test_base
import six
class DeprecationWarningTest(test_base.BaseTestCase):
@mock.patch('warnings.warn')
def test_warning(self, mock_warn):
import oslo.config
imp.reload(oslo.config)
self.assertTrue(mock_warn.called)
args = mock_warn.call_args
self.assertIn('oslo_config', args[0][0])
self.assertIn('deprecated', args[0][0])
self.assertTrue(issubclass(args[0][1], DeprecationWarning))
def test_real_warning(self):
with warnings.catch_warnings(record=True) as warning_msgs:
warnings.resetwarnings()
warnings.simplefilter('always', DeprecationWarning)
import oslo.config
# Use a separate function to get the stack level correct
# so we know the message points back to this file. This
# corresponds to an import or reload, which isn't working
# inside the test under Python 3.3. That may be due to a
# difference in the import implementation not triggering
# warnings properly when the module is reloaded, or
# because the warnings module is mostly implemented in C
# and something isn't cleanly resetting the global state
# used to track whether a warning needs to be
# emitted. Whatever the cause, we definitely see the
# warnings.warn() being invoked on a reload (see the test
# above) and warnings are reported on the console when we
# run the tests. A simpler test script run outside of
# testr does correctly report the warnings.
def foo():
oslo.config.deprecated()
foo()
self.assertEqual(1, len(warning_msgs))
msg = warning_msgs[0]
self.assertIn('oslo_config', six.text_type(msg.message))
self.assertEqual('test_warning.py', os.path.basename(msg.filename))

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo_config import cfg
CONF = cfg.CONF

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo_config import cfg
CONF = cfg.CONF

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo_config import cfg
CONF = cfg.CONF

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo_config import cfg
CONF = cfg.CONF

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo_config import cfg
CONF = cfg.CONF

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from oslo.config import cfg
from oslo_config import cfg
CONF = cfg.CONF