Remove openstack.common.config and extensions.

Remove the config module.  All core openstack projects are using the cfg
module instead so no further development will be done on this module.

Also remove the extensions module.  This code was imported but there has
been no attempt to get any projects to use it.

Change-Id: I4a974ba1ea25e94fd55cad243cde5f1ef6a17289
This commit is contained in:
Russell Bryant 2012-10-23 23:33:10 -04:00 committed by Jason Kölker
parent dfd3ce97cd
commit e0f57e92df
6 changed files with 0 additions and 1711 deletions

View File

@ -1,337 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Routines for configuring Openstack Projects
"""
import logging
import logging.config
import logging.handlers
import optparse
import os
import sys
from paste import deploy
DEFAULT_LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
DEFAULT_LOG_DATE_FORMAT = "%Y-%m-%d %H:%M:%S"
def parse_options(parser, cli_args=None):
"""
Returns the parsed CLI options, command to run and its arguments, merged
with any same-named options found in a configuration file.
The function returns a tuple of (options, args), where options is a
mapping of option key/str(value) pairs, and args is the set of arguments
(not options) supplied on the command-line.
The reason that the option values are returned as strings only is that
ConfigParser and paste.deploy only accept string values...
:param parser: The option parser
:param cli_args: (Optional) Set of arguments to process. If not present,
sys.argv[1:] is used.
:retval tuple of (options, args)
"""
(options, args) = parser.parse_args(cli_args)
return (vars(options), args)
def add_common_options(parser):
"""
Given a supplied optparse.OptionParser, adds an OptionGroup that
represents all common configuration options.
:param parser: optparse.OptionParser
"""
help_text = ("The following configuration options are common to "
"this app's programs.")
group = optparse.OptionGroup(parser, "Common Options", help_text)
group.add_option('-v', '--verbose', default=False, dest="verbose",
action="store_true",
help="Print more verbose output")
group.add_option('-d', '--debug', default=False, dest="debug",
action="store_true",
help="Print debugging output")
group.add_option('--config-file', default=None, metavar="PATH",
help="Path to the config file to use. When not specified "
"(the default), we generally look at the first "
"argument specified to be a config file, and if "
"that is also missing, we search standard "
"directories for a config file.")
parser.add_option_group(group)
def add_log_options(parser):
"""
Given a supplied optparse.OptionParser, adds an OptionGroup that
represents all the configuration options around logging.
:param parser: optparse.OptionParser
"""
help_text = ("The following configuration options are specific to "
"logging functionality for this program.")
group = optparse.OptionGroup(parser, "Logging Options", help_text)
group.add_option('--log-config', default=None, metavar="PATH",
help="If this option is specified, the logging "
"configuration file specified is used and overrides "
"any other logging options specified. Please see "
"the Python logging module documentation for "
"details on logging configuration files.")
group.add_option('--log-date-format', metavar="FORMAT",
default=DEFAULT_LOG_DATE_FORMAT,
help="Format string for %(asctime)s in log records. "
"Default: %default")
group.add_option('--log-file', default=None, metavar="PATH",
help="(Optional) Name of log file to output to. "
"If not set, logging will go to stdout.")
group.add_option("--log-dir", default=None,
help="(Optional) The directory to keep log files in "
"(will be prepended to --logfile)")
group.add_option('--use-syslog', default=False, dest="use_syslog",
action="store_true",
help="Use syslog for logging.")
parser.add_option_group(group)
def setup_logging(options, conf):
"""
Sets up the logging options for a log with supplied name
:param options: Mapping of typed option key/values
:param conf: Mapping of untyped key/values from config file
"""
if options.get('log_config', None):
# Use a logging configuration file for all settings...
if os.path.exists(options['log_config']):
logging.config.fileConfig(options['log_config'])
return
else:
raise RuntimeError("Unable to locate specified logging "
"config file: %s" % options['log_config'])
# If either the CLI option or the conf value
# is True, we set to True
debug = (options.get('debug') or
get_option(conf, 'debug', type='bool', default=False))
verbose = (options.get('verbose') or
get_option(conf, 'verbose', type='bool', default=False))
root_logger = logging.root
if debug:
root_logger.setLevel(logging.DEBUG)
elif verbose:
root_logger.setLevel(logging.INFO)
else:
root_logger.setLevel(logging.WARNING)
# Set log configuration from options...
# Note that we use a hard-coded log format in the options
# because of Paste.Deploy bug #379
# http://trac.pythonpaste.org/pythonpaste/ticket/379
log_format = options.get('log_format', DEFAULT_LOG_FORMAT)
log_date_format = options.get('log_date_format', DEFAULT_LOG_DATE_FORMAT)
formatter = logging.Formatter(log_format, log_date_format)
logfile = options.get('log_file')
if not logfile:
logfile = conf.get('log_file')
use_syslog = (options.get('use_syslog') or
get_option(conf, 'use_syslog', type='bool', default=False))
if use_syslog:
handler = logging.handlers.SysLogHandler(address='/dev/log')
elif logfile:
logdir = options.get('log_dir')
if not logdir:
logdir = conf.get('log_dir')
if logdir:
logfile = os.path.join(logdir, logfile)
handler = logging.FileHandler(logfile)
else:
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(formatter)
root_logger.addHandler(handler)
def fix_path(path):
"""
Return the full absolute path
"""
return os.path.abspath(os.path.expanduser(path))
def find_config_file(app_name, options, args, config_dir=None):
"""
Return the first config file found for an application.
We search for the paste config file in the following order:
* If --config-file option is used, use that
* If args[0] is a file, use that
* Search for $app.conf in standard directories:
* .
* ~.config_dir/
* ~
* /etc/config_dir
* /etc
:retval Full path to config file, or None if no config file found
"""
config_dir = config_dir or app_name
if options.get('config_file'):
if os.path.exists(options['config_file']):
return fix_path(options['config_file'])
elif args:
if os.path.exists(args[0]):
return fix_path(args[0])
# Handle standard directory search for $app_name.conf
config_file_dirs = [fix_path(os.getcwd()),
fix_path(os.path.join('~', '.' + config_dir)),
fix_path('~'),
os.path.join('/etc', config_dir),
'/etc']
for cfg_dir in config_file_dirs:
cfg_file = os.path.join(cfg_dir, '%s.conf' % app_name)
if os.path.exists(cfg_file):
return cfg_file
def load_paste_config(app_name, options, args, config_dir=None):
"""
Looks for a config file to use for an app and returns the
config file path and a configuration mapping from a paste config file.
We search for the paste config file in the following order:
* If --config-file option is used, use that
* If args[0] is a file, use that
* Search for $app_name.conf in standard directories:
* .
* ~.config_dir/
* ~
* /etc/config_dir
* /etc
:param app_name: Name of the application to load config for, or None.
None signifies to only load the [DEFAULT] section of
the config file.
:param options: Set of typed options returned from parse_options()
:param args: Command line arguments from argv[1:]
:retval Tuple of (conf_file, conf)
:raises RuntimeError when config file cannot be located or there was a
problem loading the configuration file.
"""
conf_file = find_config_file(app_name, options, args, config_dir)
if not conf_file:
raise RuntimeError("Unable to locate any configuration file. "
"Cannot load application %s" % app_name)
try:
conf = deploy.appconfig("config:%s" % conf_file, name=app_name)
return conf_file, conf
except Exception, e:
raise RuntimeError("Error trying to load config %s: %s"
% (conf_file, e))
def load_paste_app(app_name, options, args, config_dir=None):
"""
Builds and returns a WSGI app from a paste config file.
We search for the paste config file in the following order:
* If --config-file option is used, use that
* If args[0] is a file, use that
* Search for $app_name.conf in standard directories:
* .
* ~.config_dir/
* ~
* /etc/config_dir
* /etc
:param app_name: Name of the application to load
:param options: Set of typed options returned from parse_options()
:param args: Command line arguments from argv[1:]
:raises RuntimeError when config file cannot be located or application
cannot be loaded from config file
"""
conf_file, conf = load_paste_config(app_name, options,
args, config_dir)
try:
# Setup logging early, supplying both the CLI options and the
# configuration mapping from the config file
setup_logging(options, conf)
# We only update the conf dict for the verbose and debug
# flags. Everything else must be set up in the conf file...
debug = (options.get('debug') or
get_option(conf, 'debug', type='bool', default=False))
verbose = (options.get('verbose') or
get_option(conf, 'verbose', type='bool', default=False))
conf['debug'] = debug
conf['verbose'] = verbose
# Log the options used when starting if we're in debug mode...
if debug:
logger = logging.getLogger(app_name)
logger.debug("*" * 80)
logger.debug("Configuration options gathered from config file:")
logger.debug(conf_file)
logger.debug("================================================")
items = dict([(k, v) for k, v in conf.items()
if k not in ('__file__', 'here')])
for key, value in sorted(items.items()):
logger.debug("%(key)-30s %(value)s" % locals())
logger.debug("*" * 80)
app = deploy.loadapp("config:%s" % conf_file, name=app_name)
except (LookupError, ImportError), e:
raise RuntimeError("Unable to load %(app_name)s from "
"configuration file %(conf_file)s."
"\nGot: %(e)r" % locals())
return conf, app
def get_option(options, option, **kwargs):
if option in options:
value = options[option]
type_ = kwargs.get('type', 'str')
if type_ == 'bool':
if hasattr(value, 'lower'):
return value.lower() == 'true'
else:
return value
elif type_ == 'int':
return int(value)
elif type_ == 'float':
return float(value)
else:
return value
elif 'default' in kwargs:
return kwargs['default']
else:
raise KeyError("option '%s' not found" % option)

View File

@ -1,539 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2011 Justin Santa Barbara
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import imp
import logging
from lxml import etree
import os
import routes
import webob.dec
import webob.exc
from openstack.common import exception
from openstack.common.gettextutils import _
from openstack.common import wsgi
LOG = logging.getLogger('extensions')
DEFAULT_XMLNS = "http://docs.openstack.org/"
XMLNS_ATOM = "http://www.w3.org/2005/Atom"
class ExtensionDescriptor(object):
"""Base class that defines the contract for extensions.
Note that you don't have to derive from this class to have a valid
extension; it is purely a convenience.
"""
def get_name(self):
"""The name of the extension.
e.g. 'Fox In Socks'
"""
raise NotImplementedError()
def get_alias(self):
"""The alias for the extension.
e.g. 'FOXNSOX'
"""
raise NotImplementedError()
def get_description(self):
"""Friendly description for the extension.
e.g. 'The Fox In Socks Extension'
"""
raise NotImplementedError()
def get_namespace(self):
"""The XML namespace for the extension.
e.g. 'http://www.fox.in.socks/api/ext/pie/v1.0'
"""
raise NotImplementedError()
def get_updated(self):
"""The timestamp when the extension was last updated.
e.g. '2011-01-22T13:25:27-06:00'
"""
# NOTE(justinsb): Not sure of the purpose of this is, vs the XML NS
raise NotImplementedError()
def get_resources(self):
"""List of extensions.ResourceExtension extension objects.
Resources define new nouns, and are accessible through URLs.
"""
resources = []
return resources
def get_actions(self):
"""List of extensions.ActionExtension extension objects.
Actions are verbs callable from the API.
"""
actions = []
return actions
def get_request_extensions(self):
"""List of extensions.RequestException extension objects.
Request extensions are used to handle custom request data.
"""
request_exts = []
return request_exts
class ActionExtensionController(object):
def __init__(self, application):
self.application = application
self.action_handlers = {}
def add_action(self, action_name, handler):
self.action_handlers[action_name] = handler
def action(self, req, id, body):
for action_name, handler in self.action_handlers.iteritems():
if action_name in body:
return handler(body, req, id)
# no action handler found (bump to downstream application)
res = self.application
return res
class ActionExtensionResource(wsgi.Resource):
def __init__(self, application):
controller = ActionExtensionController(application)
wsgi.Resource.__init__(self, controller)
def add_action(self, action_name, handler):
self.controller.add_action(action_name, handler)
class RequestExtensionController(object):
def __init__(self, application):
self.application = application
self.handlers = []
def add_handler(self, handler):
self.handlers.append(handler)
def process(self, req, *args, **kwargs):
res = req.get_response(self.application)
# currently request handlers are un-ordered
for handler in self.handlers:
res = handler(req, res)
return res
class RequestExtensionResource(wsgi.Resource):
def __init__(self, application):
controller = RequestExtensionController(application)
wsgi.Resource.__init__(self, controller)
def add_handler(self, handler):
self.controller.add_handler(handler)
class ExtensionsResource(wsgi.Resource):
def __init__(self, extension_manager):
self.extension_manager = extension_manager
body_serializers = {'application/xml': ExtensionsXMLSerializer()}
serializer = wsgi.ResponseSerializer(body_serializers=body_serializers)
super(ExtensionsResource, self).__init__(self, None, serializer)
def _translate(self, ext):
ext_data = {}
ext_data['name'] = ext.get_name()
ext_data['alias'] = ext.get_alias()
ext_data['description'] = ext.get_description()
ext_data['namespace'] = ext.get_namespace()
ext_data['updated'] = ext.get_updated()
ext_data['links'] = [] # TODO(dprince): implement extension links
return ext_data
def index(self, req):
extensions = []
for _alias, ext in self.extension_manager.extensions.iteritems():
extensions.append(self._translate(ext))
return dict(extensions=extensions)
def show(self, req, id):
# NOTE(dprince): the extensions alias is used as the 'id' for show
ext = self.extension_manager.extensions.get(id, None)
if not ext:
raise webob.exc.HTTPNotFound(
_("Extension with alias %s does not exist") % id)
return dict(extension=self._translate(ext))
def delete(self, req, id):
raise webob.exc.HTTPNotFound()
def create(self, req):
raise webob.exc.HTTPNotFound()
class ExtensionMiddleware(wsgi.Middleware):
"""Extensions middleware for WSGI."""
@classmethod
def factory(cls, global_config, **local_config):
"""Paste factory."""
def _factory(app):
return cls(app, global_config, **local_config)
return _factory
def _action_ext_resources(self, application, ext_mgr, mapper):
"""Return a dict of ActionExtensionResource-s by collection."""
action_resources = {}
for action in ext_mgr.get_actions():
if not action.collection in action_resources.keys():
resource = ActionExtensionResource(application)
mapper.connect("/%s/:(id)/action.:(format)" %
action.collection,
action='action',
controller=resource,
conditions=dict(method=['POST']))
mapper.connect("/%s/:(id)/action" %
action.collection,
action='action',
controller=resource,
conditions=dict(method=['POST']))
action_resources[action.collection] = resource
return action_resources
def _request_ext_resources(self, application, ext_mgr, mapper):
"""Returns a dict of RequestExtensionResource-s by collection."""
request_ext_resources = {}
for req_ext in ext_mgr.get_request_extensions():
if not req_ext.key in request_ext_resources.keys():
resource = RequestExtensionResource(application)
mapper.connect(req_ext.url_route + '.:(format)',
action='process',
controller=resource,
conditions=req_ext.conditions)
mapper.connect(req_ext.url_route,
action='process',
controller=resource,
conditions=req_ext.conditions)
request_ext_resources[req_ext.key] = resource
return request_ext_resources
def __init__(self, application, config, ext_mgr=None):
ext_mgr = ext_mgr or ExtensionManager(config['api_extensions_path'])
mapper = routes.Mapper()
# extended resources
for resource_ext in ext_mgr.get_resources():
LOG.debug(_('Extended resource: %s'), resource_ext.collection)
controller_resource = wsgi.Resource(resource_ext.controller,
resource_ext.deserializer,
resource_ext.serializer)
self._map_custom_collection_actions(resource_ext, mapper,
controller_resource)
kargs = dict(controller=controller_resource,
collection=resource_ext.collection_actions,
member=resource_ext.member_actions)
if resource_ext.parent:
kargs['parent_resource'] = resource_ext.parent
mapper.resource(resource_ext.collection,
resource_ext.collection, **kargs)
# extended actions
action_resources = self._action_ext_resources(application, ext_mgr,
mapper)
for action in ext_mgr.get_actions():
LOG.debug(_('Extended action: %s'), action.action_name)
resource = action_resources[action.collection]
resource.add_action(action.action_name, action.handler)
# extended requests
req_controllers = self._request_ext_resources(application, ext_mgr,
mapper)
for request_ext in ext_mgr.get_request_extensions():
LOG.debug(_('Extended request: %s'), request_ext.key)
controller = req_controllers[request_ext.key]
controller.add_handler(request_ext.handler)
self._router = routes.middleware.RoutesMiddleware(self._dispatch,
mapper)
super(ExtensionMiddleware, self).__init__(application)
def _map_custom_collection_actions(self, resource_ext, mapper,
controller_resource):
for action, method in resource_ext.collection_actions.iteritems():
parent = resource_ext.parent
conditions = dict(method=[method])
path = "/%s/%s" % (resource_ext.collection, action)
path_prefix = ""
if parent:
path_prefix = "/%s/{%s_id}" % (parent["collection_name"],
parent["member_name"])
with mapper.submapper(controller=controller_resource,
action=action,
path_prefix=path_prefix,
conditions=conditions) as submap:
submap.connect(path)
submap.connect("%s.:(format)" % path)
@webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req):
"""Route the incoming request with router."""
req.environ['extended.app'] = self.application
return self._router
@staticmethod
@webob.dec.wsgify(RequestClass=wsgi.Request)
def _dispatch(req):
"""Dispatch the request.
Returns the routed WSGI app's response or defers to the extended
application.
"""
match = req.environ['wsgiorg.routing_args'][1]
if not match:
return req.environ['extended.app']
app = match['controller']
return app
class ExtensionManager(object):
"""Load extensions from the configured extension path.
See nova/tests/api/openstack/extensions/foxinsocks/extension.py for an
example extension implementation.
"""
def __init__(self, path):
LOG.debug(_('Initializing extension manager.'))
self.path = path
self.extensions = {}
self._load_all_extensions()
def get_resources(self):
"""Returns a list of ResourceExtension objects."""
resources = []
extension_resource = ExtensionsResource(self)
res_ext = ResourceExtension('extensions',
extension_resource,
serializer=extension_resource.serializer)
resources.append(res_ext)
for alias, ext in self.extensions.iteritems():
try:
resources.extend(ext.get_resources())
except AttributeError:
# NOTE(dprince): Extension aren't required to have resource
# extensions
pass
return resources
def get_actions(self):
"""Returns a list of ActionExtension objects."""
actions = []
for alias, ext in self.extensions.iteritems():
try:
actions.extend(ext.get_actions())
except AttributeError:
# NOTE(dprince): Extension aren't required to have action
# extensions
pass
return actions
def get_request_extensions(self):
"""Returns a list of RequestExtension objects."""
request_exts = []
for alias, ext in self.extensions.iteritems():
try:
request_exts.extend(ext.get_request_extensions())
except AttributeError:
# NOTE(dprince): Extension aren't required to have request
# extensions
pass
return request_exts
def _check_extension(self, extension):
"""Checks for required methods in extension objects."""
try:
LOG.debug(_('Ext name: %s'), extension.get_name())
LOG.debug(_('Ext alias: %s'), extension.get_alias())
LOG.debug(_('Ext description: %s'), extension.get_description())
LOG.debug(_('Ext namespace: %s'), extension.get_namespace())
LOG.debug(_('Ext updated: %s'), extension.get_updated())
except AttributeError as ex:
LOG.exception(_("Exception loading extension: %s"), unicode(ex))
return False
return True
def _load_all_extensions(self):
"""Load extensions from the configured path.
Load extensions from the configured path. The extension name is
constructed from the module_name. If your extension module was named
widgets.py the extension class within that module should be
'Widgets'.
In addition, extensions are loaded from the 'contrib' directory.
See nova/tests/api/openstack/extensions/foxinsocks.py for an example
extension implementation.
"""
if os.path.exists(self.path):
self._load_all_extensions_from_path(self.path)
contrib_path = os.path.join(os.path.dirname(__file__), "contrib")
if os.path.exists(contrib_path):
self._load_all_extensions_from_path(contrib_path)
def _load_all_extensions_from_path(self, path):
for f in os.listdir(path):
LOG.debug(_('Loading extension file: %s'), f)
mod_name, file_ext = os.path.splitext(os.path.split(f)[-1])
ext_path = os.path.join(path, f)
if file_ext.lower() == '.py' and not mod_name.startswith('_'):
mod = imp.load_source(mod_name, ext_path)
ext_name = mod_name[0].upper() + mod_name[1:]
new_ext_class = getattr(mod, ext_name, None)
if not new_ext_class:
LOG.warn(_('Did not find expected name '
'"%(ext_name)s" in %(file)s'),
{'ext_name': ext_name,
'file': ext_path})
continue
new_ext = new_ext_class()
self.add_extension(new_ext)
def add_extension(self, ext):
# Do nothing if the extension doesn't check out
if not self._check_extension(ext):
return
alias = ext.get_alias()
LOG.debug(_('Loaded extension: %s'), alias)
if alias in self.extensions:
raise exception.Error("Found duplicate extension: %s" % alias)
self.extensions[alias] = ext
class RequestExtension(object):
"""Extend requests and responses of core nova OpenStack API resources.
Provide a way to add data to responses and handle custom request data
that is sent to core nova OpenStack API controllers.
"""
def __init__(self, method, url_route, handler):
self.url_route = url_route
self.handler = handler
self.conditions = dict(method=[method])
self.key = "%s-%s" % (method, url_route)
class ActionExtension(object):
"""Add custom actions to core nova OpenStack API resources."""
def __init__(self, collection, action_name, handler):
self.collection = collection
self.action_name = action_name
self.handler = handler
class ResourceExtension(object):
"""Add top level resources to the OpenStack API in nova."""
def __init__(self, collection, controller, parent=None,
collection_actions=None, member_actions=None,
deserializer=None, serializer=None):
if not collection_actions:
collection_actions = {}
if not member_actions:
member_actions = {}
self.collection = collection
self.controller = controller
self.parent = parent
self.collection_actions = collection_actions
self.member_actions = member_actions
self.deserializer = deserializer
self.serializer = serializer
class ExtensionsXMLSerializer(wsgi.XMLDictSerializer):
def __init__(self):
self.nsmap = {None: DEFAULT_XMLNS, 'atom': XMLNS_ATOM}
def show(self, ext_dict):
ext = etree.Element('extension', nsmap=self.nsmap)
self._populate_ext(ext, ext_dict['extension'])
return self._to_xml(ext)
def index(self, exts_dict):
exts = etree.Element('extensions', nsmap=self.nsmap)
for ext_dict in exts_dict['extensions']:
ext = etree.SubElement(exts, 'extension')
self._populate_ext(ext, ext_dict)
return self._to_xml(exts)
def _populate_ext(self, ext_elem, ext_dict):
"""Populate an extension xml element from a dict."""
ext_elem.set('name', ext_dict['name'])
ext_elem.set('namespace', ext_dict['namespace'])
ext_elem.set('alias', ext_dict['alias'])
ext_elem.set('updated', ext_dict['updated'])
desc = etree.Element('description')
desc.text = ext_dict['description']
ext_elem.append(desc)
for link in ext_dict.get('links', []):
elem = etree.SubElement(ext_elem, '{%s}link' % XMLNS_ATOM)
elem.set('rel', link['rel'])
elem.set('href', link['href'])
elem.set('type', link['type'])
return ext_elem
def _to_xml(self, root):
"""Convert the xml object to an xml string."""
return etree.tostring(root, encoding='UTF-8')

View File

@ -1,15 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -1,98 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from openstack.common import extensions
from openstack.common import jsonutils
class FoxInSocksController(object):
def index(self, request):
return "Try to say this Mr. Knox, sir..."
class Foxinsocks(object):
def __init__(self):
pass
def get_name(self):
return "Fox In Socks"
def get_alias(self):
return "FOXNSOX"
def get_description(self):
return "The Fox In Socks Extension"
def get_namespace(self):
return "http://www.fox.in.socks/api/ext/pie/v1.0"
def get_updated(self):
return "2011-01-22T13:25:27-06:00"
def get_resources(self):
resources = []
resource = extensions.ResourceExtension('foxnsocks',
FoxInSocksController())
resources.append(resource)
return resources
def get_actions(self):
return [extensions.ActionExtension('dummy_resources',
'FOXNSOX:add_tweedle',
self._add_tweedle_handler),
extensions.ActionExtension('dummy_resources',
'FOXNSOX:delete_tweedle',
self._delete_tweedle_handler)]
def get_request_extensions(self):
request_exts = []
def _goose_handler(req, res):
#NOTE: This only handles JSON responses.
# You can use content type header to test for XML.
data = jsonutils.loads(res.body)
data['FOXNSOX:googoose'] = req.GET.get('chewing')
res.body = jsonutils.dumps(data)
return res
req_ext1 = extensions.RequestExtension('GET', '/dummy_resources/:(id)',
_goose_handler)
request_exts.append(req_ext1)
def _bands_handler(req, res):
#NOTE: This only handles JSON responses.
# You can use content type header to test for XML.
data = jsonutils.loads(res.body)
data['FOXNSOX:big_bands'] = 'Pig Bands!'
res.body = jsonutils.dumps(data)
return res
req_ext2 = extensions.RequestExtension('GET', '/dummy_resources/:(id)',
_bands_handler)
request_exts.append(req_ext2)
return request_exts
def _add_tweedle_handler(self, input_dict, req, id):
return "Tweedle {0} Added.".format(
input_dict['FOXNSOX:add_tweedle']['name'])
def _delete_tweedle_handler(self, input_dict, req, id):
return "Tweedle {0} Deleted.".format(
input_dict['FOXNSOX:delete_tweedle']['name'])

View File

@ -1,124 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the 'License'); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an 'AS IS' BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import optparse
import unittest
import mock
from openstack.common import config
class TestConfig(unittest.TestCase):
def test_common_options(self):
parser = optparse.OptionParser()
self.assertEquals(0, len(parser.option_groups))
config.add_common_options(parser)
self.assertEquals(1, len(parser.option_groups))
expected_options = ['--verbose', '--debug', '--config-file']
for e in expected_options:
self.assertTrue(parser.option_groups[0].get_option(e),
'Missing required common option: %s' % e)
def test_add_log_options(self):
parser = optparse.OptionParser()
self.assertEquals(0, len(parser.option_groups))
config.add_log_options(parser)
self.assertEquals(1, len(parser.option_groups))
expected_options = ['--log-config', '--log-date-format',
'--log-file', '--log-dir', '--use-syslog']
for e in expected_options:
self.assertTrue(parser.option_groups[0].get_option(e),
'Missing required common option: %s' % e)
def test_parse_options(self):
# test empty args and that parse_options() returns a mapping
# of typed values
parser = optparse.OptionParser()
config.add_common_options(parser)
parsed_options, args = config.parse_options(parser, [])
expected_options = {'verbose': False, 'debug': False,
'config_file': None}
self.assertEquals(expected_options, parsed_options)
# test non-empty args and that parse_options() returns a mapping
# of typed values matching supplied args
parser = optparse.OptionParser()
config.add_common_options(parser)
parsed_options, args = config.parse_options(parser, ['--verbose'])
expected_options = {'verbose': True, 'debug': False,
'config_file': None}
self.assertEquals(expected_options, parsed_options)
# test non-empty args that contain unknown options raises
# a SystemExit exception. Not ideal, but unfortunately optparse
# raises a sys.exit() when it runs into an error instead of raising
# something a bit more useful for libraries. optparse must have been
# written by the same group that wrote unittest ;)
parser = optparse.OptionParser()
config.add_common_options(parser)
self.assertRaises(SystemExit, config.parse_options,
parser, ['--unknown'])
def test_load_paste_config(self):
# Test that config_file cannot by found raises
self.assertRaises(RuntimeError, config.load_paste_config,
'fake_app', {}, [])
# Test that an app cannot be configured
with mock.patch('openstack.common.config.find_config_file',
mock.Mock(return_value=True)):
self.assertRaises(RuntimeError, config.load_paste_config,
'fake_app', {}, [])
def test_get_option_default(self):
default = 'default'
res = config.get_option({}, 'option', default=default)
self.assertEqual(res, default)
def test_get_option_not_found(self):
self.assertRaises(KeyError, config.get_option, {}, 'options')
def test_get_option_bool(self):
options = {'option': False}
res = config.get_option(options, 'option', type='bool')
self.assertEqual(res, False)
def test_get_option_bool_string(self):
options = {'option': 'FALSE'}
res = config.get_option(options, 'option', type='bool')
self.assertEqual(res, False)
def test_get_option_int(self):
options = {'option': '42'}
res = config.get_option(options, 'option', type='int')
self.assertEqual(res, 42)
def test_get_option_float(self):
options = {'option': '2.71828'}
res = config.get_option(options, 'option', type='float')
self.assertEqual(res, 2.71828)
def test_get_option(self):
options = {'option': dict()}
res = config.get_option(options, 'option')
self.assertEqual(res, options['option'])

View File

@ -1,598 +0,0 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
import os.path
import routes
import unittest
import mock
import webob
from lxml import etree
from webtest import TestApp
from openstack.common import config
from openstack.common import extensions
from openstack.common import jsonutils
from openstack.common import wsgi
from tests.unit import extension_stubs
test_conf_file = os.path.join(os.path.dirname(__file__), os.pardir,
os.pardir, 'etc', 'openstack-common.conf.test')
NS = "{http://docs.openstack.org/}"
ATOMNS = "{http://www.w3.org/2005/Atom}"
class ExtensionsTestApp(wsgi.Router):
def __init__(self, options={}):
mapper = routes.Mapper()
controller = extension_stubs.StubBaseAppController()
mapper.resource("dummy_resource", "/dummy_resources",
controller=controller.create_resource())
super(ExtensionsTestApp, self).__init__(mapper)
class ResourceExtensionTest(unittest.TestCase):
class ResourceExtensionController(object):
def index(self, request):
return "resource index"
def show(self, request, id):
return {'data': {'id': id}}
def custom_member_action(self, request, id):
return {'member_action': 'value'}
def custom_collection_action(self, request, **kwargs):
return {'collection': 'value'}
def test_resource_can_be_added_as_extension(self):
res_ext = extensions.ResourceExtension(
'tweedles',
self.ResourceExtensionController())
test_app = setup_extensions_app(SimpleExtensionManager(res_ext))
index_response = test_app.get("/tweedles")
self.assertEqual(200, index_response.status_int)
self.assertEqual("resource index", index_response.json)
show_response = test_app.get("/tweedles/25266")
self.assertEqual({'data': {'id': "25266"}}, show_response.json)
def test_resource_extension_with_custom_member_action(self):
controller = self.ResourceExtensionController()
member = {'custom_member_action': "GET"}
res_ext = extensions.ResourceExtension('tweedles', controller,
member_actions=member)
test_app = setup_extensions_app(SimpleExtensionManager(res_ext))
response = test_app.get("/tweedles/some_id/custom_member_action")
self.assertEqual(200, response.status_int)
self.assertEqual(jsonutils.loads(response.body)['member_action'],
"value")
def test_resource_extension_for_get_custom_collection_action(self):
controller = self.ResourceExtensionController()
collections = {'custom_collection_action': "PUT"}
res_ext = extensions.ResourceExtension('tweedles', controller,
collection_actions=collections)
test_app = setup_extensions_app(SimpleExtensionManager(res_ext))
response = test_app.put("/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
self.assertEqual(jsonutils.loads(response.body)['collection'], "value")
def test_resource_extension_for_put_custom_collection_action(self):
controller = self.ResourceExtensionController()
collections = {'custom_collection_action': "PUT"}
res_ext = extensions.ResourceExtension('tweedles', controller,
collection_actions=collections)
test_app = setup_extensions_app(SimpleExtensionManager(res_ext))
response = test_app.put("/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
self.assertEqual(jsonutils.loads(response.body)['collection'], 'value')
def test_resource_extension_for_post_custom_collection_action(self):
controller = self.ResourceExtensionController()
collections = {'custom_collection_action': "POST"}
res_ext = extensions.ResourceExtension('tweedles', controller,
collection_actions=collections)
test_app = setup_extensions_app(SimpleExtensionManager(res_ext))
response = test_app.post("/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
self.assertEqual(jsonutils.loads(response.body)['collection'], 'value')
def test_resource_extension_for_delete_custom_collection_action(self):
controller = self.ResourceExtensionController()
collections = {'custom_collection_action': "DELETE"}
res_ext = extensions.ResourceExtension('tweedles', controller,
collection_actions=collections)
test_app = setup_extensions_app(SimpleExtensionManager(res_ext))
response = test_app.delete("/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
self.assertEqual(jsonutils.loads(response.body)['collection'], 'value')
def test_resource_ext_for_formatted_req_on_custom_collection_action(self):
controller = self.ResourceExtensionController()
collections = {'custom_collection_action': "GET"}
res_ext = extensions.ResourceExtension('tweedles', controller,
collection_actions=collections)
test_app = setup_extensions_app(SimpleExtensionManager(res_ext))
response = test_app.get("/tweedles/custom_collection_action.json")
self.assertEqual(200, response.status_int)
self.assertEqual(jsonutils.loads(response.body)['collection'], "value")
def test_resource_ext_for_nested_resource_custom_collection_action(self):
controller = self.ResourceExtensionController()
collections = {'custom_collection_action': "GET"}
parent = dict(collection_name='beetles', member_name='beetle')
res_ext = extensions.ResourceExtension('tweedles', controller,
collection_actions=collections,
parent=parent)
test_app = setup_extensions_app(SimpleExtensionManager(res_ext))
response = test_app.get("/beetles/beetle_id"
"/tweedles/custom_collection_action")
self.assertEqual(200, response.status_int)
self.assertEqual(jsonutils.loads(response.body)['collection'], "value")
def test_returns_404_for_non_existant_extension(self):
test_app = setup_extensions_app(SimpleExtensionManager(None))
response = test_app.get("/non_extistant_extension", status='*')
self.assertEqual(404, response.status_int)
class ActionExtensionTest(unittest.TestCase):
def setUp(self):
super(ActionExtensionTest, self).setUp()
self.extension_app = setup_extensions_app()
def test_extended_action_for_adding_extra_data(self):
action_name = 'FOXNSOX:add_tweedle'
action_params = dict(name='Beetle')
req_body = jsonutils.dumps({action_name: action_params})
response = self.extension_app.post(
'/dummy_resources/1/action',
req_body, content_type='application/json')
self.assertEqual("Tweedle Beetle Added.", response.json)
def test_extended_action_for_deleting_extra_data(self):
action_name = 'FOXNSOX:delete_tweedle'
action_params = dict(name='Bailey')
req_body = jsonutils.dumps({action_name: action_params})
response = self.extension_app.post(
"/dummy_resources/1/action",
req_body, content_type='application/json')
self.assertEqual("Tweedle Bailey Deleted.", response.json)
def test_returns_404_for_non_existant_action(self):
non_existant_action = 'blah_action'
action_params = dict(name="test")
req_body = jsonutils.dumps({non_existant_action: action_params})
response = self.extension_app.post(
"/dummy_resources/1/action",
req_body, content_type='application/json',
status='*')
self.assertEqual(404, response.status_int)
def test_returns_404_for_non_existant_resource(self):
action_name = 'add_tweedle'
action_params = dict(name='Beetle')
req_body = jsonutils.dumps({action_name: action_params})
response = self.extension_app.post(
"/asdf/1/action", req_body,
content_type='application/json', status='*')
self.assertEqual(404, response.status_int)
class RequestExtensionTest(unittest.TestCase):
def test_headers_can_be_extended(self):
def extend_headers(req, res):
assert req.headers['X-NEW-REQUEST-HEADER'] == "sox"
res.headers['X-NEW-RESPONSE-HEADER'] = "response_header_data"
return res
app = self._setup_app_with_request_handler(extend_headers, 'GET')
response = app.get("/dummy_resources/1",
headers={'X-NEW-REQUEST-HEADER': "sox"})
self.assertEqual(response.headers['X-NEW-RESPONSE-HEADER'],
"response_header_data")
def test_extend_get_resource_response(self):
def extend_response_data(req, res):
data = jsonutils.loads(res.body)
data['FOXNSOX:extended_key'] = req.GET.get('extended_key')
res.body = jsonutils.dumps(data)
return res
app = self._setup_app_with_request_handler(extend_response_data, 'GET')
response = app.get("/dummy_resources/1?extended_key=extended_data")
self.assertEqual(200, response.status_int)
response_data = jsonutils.loads(response.body)
self.assertEqual('extended_data',
response_data['FOXNSOX:extended_key'])
self.assertEqual('knox', response_data['fort'])
def test_get_resources(self):
app = setup_extensions_app()
response = app.get("/dummy_resources/1?chewing=newblue")
response_data = jsonutils.loads(response.body)
self.assertEqual('newblue', response_data['FOXNSOX:googoose'])
self.assertEqual("Pig Bands!", response_data['FOXNSOX:big_bands'])
def test_edit_previously_uneditable_field(self):
def _update_handler(req, res):
data = jsonutils.loads(res.body)
data['uneditable'] = jsonutils.loads(req.body)['uneditable']
res.body = jsonutils.dumps(data)
return res
base_app = TestApp(setup_base_app())
response = base_app.put("/dummy_resources/1",
jsonutils.dumps({'uneditable': "new_value"}),
headers={'Content-Type': "application/json"})
self.assertEqual(response.json['uneditable'], "original_value")
ext_app = self._setup_app_with_request_handler(_update_handler,
'PUT')
ext_response = ext_app.put(
"/dummy_resources/1",
jsonutils.dumps({'uneditable': "new_value"}),
headers={'Content-Type': "application/json"})
self.assertEqual(ext_response.json['uneditable'], "new_value")
def _setup_app_with_request_handler(self, handler, verb):
req_ext = extensions.RequestExtension(
verb,
'/dummy_resources/:(id)', handler)
manager = SimpleExtensionManager(None, None, req_ext)
return setup_extensions_app(manager)
class ExtensionManagerTest(unittest.TestCase):
def test_invalid_extensions_are_not_registered(self):
class InvalidExtension(object):
"""
This Extension doesn't implement extension methods :
get_name, get_description, get_namespace and get_updated
"""
def get_alias(self):
return "invalid_extension"
ext_mgr = extensions.ExtensionManager('')
ext_mgr.add_extension(InvalidExtension())
ext_mgr.add_extension(extension_stubs.StubExtension("valid_extension"))
self.assertTrue('valid_extension' in ext_mgr.extensions)
self.assertFalse('invalid_extension' in ext_mgr.extensions)
class ExtensionControllerTest(unittest.TestCase):
def setUp(self):
super(ExtensionControllerTest, self).setUp()
self.test_app = setup_extensions_app()
def test_index_gets_all_registerd_extensions(self):
response = self.test_app.get("/extensions")
foxnsox = response.json["extensions"][0]
self.assertEqual(
foxnsox, {
'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
'name': 'Fox In Socks',
'updated': '2011-01-22T13:25:27-06:00',
'description': 'The Fox In Socks Extension',
'alias': 'FOXNSOX',
'links': []
}
)
def test_extension_can_be_accessed_by_alias(self):
json_response = self.test_app.get("/extensions/FOXNSOX").json
foxnsox = json_response['extension']
self.assertEqual(
foxnsox, {
'namespace': 'http://www.fox.in.socks/api/ext/pie/v1.0',
'name': 'Fox In Socks',
'updated': '2011-01-22T13:25:27-06:00',
'description': 'The Fox In Socks Extension',
'alias': 'FOXNSOX',
'links': []
}
)
def test_show_returns_not_found_for_non_existant_extension(self):
response = self.test_app.get("/extensions/non_existant", status="*")
self.assertEqual(response.status_int, 404)
def test_list_extensions_xml(self):
response = self.test_app.get("/extensions.xml")
self.assertEqual(200, response.status_int)
root = etree.XML(response.body)
self.assertEqual(root.tag.split('extensions')[0], NS)
# Make sure that Fox in Sox extension is correct.
exts = root.findall('{0}extension'.format(NS))
fox_ext = exts[0]
self.assertEqual(fox_ext.get('name'), 'Fox In Socks')
self.assertEqual(
fox_ext.get('namespace'),
'http://www.fox.in.socks/api/ext/pie/v1.0')
self.assertEqual(fox_ext.get('updated'), '2011-01-22T13:25:27-06:00')
self.assertEqual(
fox_ext.findtext('{0}description'.format(NS)),
'The Fox In Socks Extension')
def test_get_extension_xml(self):
response = self.test_app.get("/extensions/FOXNSOX.xml")
self.assertEqual(200, response.status_int)
xml = response.body
root = etree.XML(xml)
self.assertEqual(root.tag.split('extension')[0], NS)
self.assertEqual(root.get('alias'), 'FOXNSOX')
self.assertEqual(root.get('name'), 'Fox In Socks')
self.assertEqual(
root.get('namespace'),
'http://www.fox.in.socks/api/ext/pie/v1.0')
self.assertEqual(root.get('updated'), '2011-01-22T13:25:27-06:00')
self.assertEqual(
root.findtext('{0}description'.format(NS)),
'The Fox In Socks Extension')
class DefaultXmlnsTest(unittest.TestCase):
def setUp(self):
super(DefaultXmlnsTest, self).setUp()
self.original_default_xmlns = extensions.DEFAULT_XMLNS
extensions.DEFAULT_XMLNS = "http://blah"
self.test_app = setup_extensions_app()
def test_default_xmlns_can_be_changed(self):
response = self.test_app.get("/extensions/FOXNSOX.xml")
self.assertEqual(200, response.status_int)
xml = response.body
root = etree.XML(xml)
self.assertEqual(root.tag.split('extension')[0], "{http://blah}")
def tearDown(self):
super(DefaultXmlnsTest, self).tearDown()
extensions.DEFAULT_XMLNS = self.original_default_xmlns
class ExtensionsXMLSerializerTest(unittest.TestCase):
def test_serialize_extenstion(self):
serializer = extensions.ExtensionsXMLSerializer()
data = {
'extension': {
'name': 'ext1',
'namespace': 'http://docs.rack.com/servers/api/ext/pie/v1.0',
'alias': 'RS-PIE',
'updated': '2011-01-22T13:25:27-06:00',
'description': 'Adds the capability to share an image.',
'links': [
{'rel': 'describedby',
'type': 'application/pdf',
'href': 'http://docs.rack.com/servers/api/ext/cs.pdf'},
{'rel': 'describedby',
'type': 'application/vnd.sun.wadl+xml',
'href': 'http://docs.rack.com/servers/api/ext/cs.wadl'}]}}
xml = serializer.serialize(data, 'show')
root = etree.XML(xml)
ext_dict = data['extension']
self.assertEqual(root.findtext('{0}description'.format(NS)),
ext_dict['description'])
for key in ['name', 'namespace', 'alias', 'updated']:
self.assertEqual(root.get(key), ext_dict[key])
link_nodes = root.findall('{0}link'.format(ATOMNS))
self.assertEqual(len(link_nodes), 2)
for i, link in enumerate(ext_dict['links']):
for key, value in link.items():
self.assertEqual(link_nodes[i].get(key), value)
def test_serialize_extensions(self):
serializer = extensions.ExtensionsXMLSerializer()
data = {
"extensions": [{
"name": "Public Image Extension",
"namespace": "http://foo.com/api/ext/pie/v1.0",
"alias": "RS-PIE",
"updated": "2011-01-22T13:25:27-06:00",
"description": "Adds the capability to share an image.",
"links": [{"rel": "describedby",
"type": "application/pdf",
"type": "application/vnd.sun.wadl+xml",
"href": "http://foo.com/api/ext/cs-pie.pdf"},
{"rel": "describedby",
"type": "application/vnd.sun.wadl+xml",
"href": "http://foo.com/api/ext/cs-pie.wadl"}]},
{"name": "Cloud Block Storage",
"namespace": "http://foo.com/api/ext/cbs/v1.0",
"alias": "RS-CBS",
"updated": "2011-01-12T11:22:33-06:00",
"description": "Allows mounting cloud block storage.",
"links": [{"rel": "describedby",
"type": "application/pdf",
"href": "http://foo.com/api/ext/cs-cbs.pdf"},
{"rel": "describedby",
"type": "application/vnd.sun.wadl+xml",
"href": "http://foo.com/api/ext/cs-cbs.wadl"}]}]}
xml = serializer.serialize(data, 'index')
root = etree.XML(xml)
ext_elems = root.findall('{0}extension'.format(NS))
self.assertEqual(len(ext_elems), 2)
for i, ext_elem in enumerate(ext_elems):
ext_dict = data['extensions'][i]
self.assertEqual(ext_elem.findtext('{0}description'.format(NS)),
ext_dict['description'])
for key in ['name', 'namespace', 'alias', 'updated']:
self.assertEqual(ext_elem.get(key), ext_dict[key])
link_nodes = ext_elem.findall('{0}link'.format(ATOMNS))
self.assertEqual(len(link_nodes), 2)
for i, link in enumerate(ext_dict['links']):
for key, value in link.items():
self.assertEqual(link_nodes[i].get(key), value)
def app_factory(global_conf, **local_conf):
conf = global_conf.copy()
conf.update(local_conf)
return ExtensionsTestApp(conf)
def setup_base_app():
options = {'config_file': test_conf_file}
conf, app = config.load_paste_app('extensions_test_app', options, None)
return app
def setup_extensions_middleware(extension_manager=None):
options = {'config_file': test_conf_file}
conf, app = config.load_paste_app('extensions_test_app', options, None)
return extensions.ExtensionMiddleware(app, conf, extension_manager)
def setup_extensions_app(extension_manager=None):
return TestApp(setup_extensions_middleware(extension_manager))
class SimpleExtensionManager(object):
def __init__(self, resource_ext=None, action_ext=None, request_ext=None):
self.resource_ext = resource_ext
self.action_ext = action_ext
self.request_ext = request_ext
def get_resources(self):
resource_exts = []
if self.resource_ext:
resource_exts.append(self.resource_ext)
return resource_exts
def get_actions(self):
action_exts = []
if self.action_ext:
action_exts.append(self.action_ext)
return action_exts
def get_request_extensions(self):
request_extensions = []
if self.request_ext:
request_extensions.append(self.request_ext)
return request_extensions
class ExtensionDescriptorInterfaceTest(unittest.TestCase):
def test_interface_functions(self):
# NOTE(jkoelker) This is why we should be using zope.interface for
# our contracts. If you came here because this test
# failed then you need to make sure you update the
# implementors or notify the mailing list that the
# contract has changed.
contract_methods = ['get_name', 'get_alias', 'get_description',
'get_namespace', 'get_updated', 'get_resources',
'get_actions', 'get_request_extensions']
predicate = lambda a: (inspect.ismethod(a) and
not a.__name__.startswith('_'))
for method in inspect.getmembers(extensions.ExtensionDescriptor,
predicate):
self.assertFalse(method[0] not in contract_methods)
contract = extensions.ExtensionDescriptor()
self.assertRaises(NotImplementedError, contract.get_name)
self.assertRaises(NotImplementedError, contract.get_alias)
self.assertRaises(NotImplementedError, contract.get_description)
self.assertRaises(NotImplementedError, contract.get_namespace)
self.assertRaises(NotImplementedError, contract.get_updated)
self.assertFalse(contract.get_resources())
self.assertFalse(contract.get_actions())
self.assertFalse(contract.get_request_extensions())
class ExtensionsResourceTest(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
manager = mock.Mock(spec=extensions.ExtensionManager)
self.extenstions_resource = extensions.ExtensionsResource(manager)
def test_delete_404(self):
self.assertRaises(webob.exc.HTTPNotFound,
self.extenstions_resource.delete, None, None)
def test_create_404(self):
self.assertRaises(webob.exc.HTTPNotFound,
self.extenstions_resource.create, None)
class ExtensionMIddlewareTest(unittest.TestCase):
def test_factory(self):
global_config = mock.sentinel.global_config
app = mock.sentinel.app
with mock.patch.object(extensions.ExtensionMiddleware,
'__init__',
mock.Mock(return_value=None)) as init:
factory = extensions.ExtensionMiddleware.factory(global_config)
factory(app)
init.assert_called_with(app, global_config)