API Extensions framework for v3 API Part 2

This is the second patch for the new extension framework
which is only to be used by the Nova v3 API.

- Adds tracking of extensions loaded and allows
  extensions access to this information
- Adds core API functionality as extensions
  - 'server'
    - Adds an entry point that other extensions can
      use to modify the server create arguments without
      having to modify the server extension itself
    - TODO: Will have to add more entry points as other
      extensions are ported. Delaying adding entry points
      now so they can be tested as they are added.
- Adds port of os-keypairs extension
  - This is an example of a controller extension in the new
    framework
  - This is an example of using the server extension entry
    point to add functionality without modify the core API code
  - Ports tests for the os-keypairs extensions
    - Adds v3 API fake specific code for tests

This completes the bulk of the new extension framework. Porting
of the server tests will be done in future changesets as more
of the core API is ported across as the tests are dependent
on multiple core APIs.

Partially implements blueprint v3-api-extension-framework

Change-Id: Ibadb5bbe808c27d2f4afebe65c06a92576397085
This commit is contained in:
Chris Yeoh 2013-05-06 22:39:13 +09:30
parent e06ab58774
commit d7da449eef
12 changed files with 2360 additions and 11 deletions

View File

@ -74,6 +74,7 @@
"compute_extension:instance_actions:events": "rule:admin_api",
"compute_extension:instance_usage_audit_log": "rule:admin_api",
"compute_extension:keypairs": "",
"compute_extension:v3:os-keypairs": "",
"compute_extension:multinic": "",
"compute_extension:networks": "rule:admin_api",
"compute_extension:networks:view": "",

View File

@ -222,17 +222,24 @@ class APIRouterV3(base_wsgi.Router):
"""Simple paste factory, :class:`nova.wsgi.Router` doesn't have one."""
return cls()
def __init__(self):
def __init__(self, init_only=None):
# TODO(cyeoh): bp v3-api-extension-framework. Currently load
# all extensions but eventually should be able to exclude
# based on a config file
def _check_load_extension(ext):
return isinstance(ext.obj, extensions.V3APIExtensionBase)
if (self.init_only is None or ext.obj.alias in
self.init_only) and isinstance(ext.obj,
extensions.V3APIExtensionBase):
return self._register_extension(ext)
else:
return False
self.init_only = init_only
self.api_extension_manager = stevedore.enabled.EnabledExtensionManager(
namespace=self.API_EXTENSION_NAMESPACE,
check_func=_check_load_extension,
invoke_on_load=True)
invoke_on_load=True,
invoke_kwds={"extension_info": self.loaded_extension_info})
mapper = PlainMapper()
self.resources = {}
@ -242,14 +249,17 @@ class APIRouterV3(base_wsgi.Router):
if list(self.api_extension_manager):
# NOTE(cyeoh): Stevedore raises an exception if there are
# no plugins detected. I wonder if this is a bug.
self.api_extension_manager.map(self._register_extensions)
self.api_extension_manager.map(self._register_resources,
mapper=mapper)
self.api_extension_manager.map(self._register_controllers)
super(APIRouterV3, self).__init__(mapper)
def _register_extensions(self, ext):
@property
def loaded_extension_info(self):
raise NotImplementedError
def _register_extension(self, ext):
raise NotImplementedError()
def _register_resources(self, ext, mapper):
@ -281,7 +291,14 @@ class APIRouterV3(base_wsgi.Router):
if resource.parent:
kargs['parent_resource'] = resource.parent
mapper.resource(resource.collection, resource.collection,
# non core-API plugins use the collection name as the
# member name, but the core-API plugins use the
# singular/plural convention for member/collection names
if resource.member_name:
member_name = resource.member_name
else:
member_name = resource.collection
mapper.resource(member_name, resource.collection,
**kargs)
if resource.custom_routes_fn:

View File

@ -30,6 +30,7 @@ from nova.api.openstack.compute import image_metadata
from nova.api.openstack.compute import images
from nova.api.openstack.compute import ips
from nova.api.openstack.compute import limits
from nova.api.openstack.compute import plugins
from nova.api.openstack.compute import server_metadata
from nova.api.openstack.compute import servers
from nova.api.openstack.compute import versions
@ -135,8 +136,13 @@ class APIRouterV3(nova.api.openstack.APIRouterV3):
Routes requests on the OpenStack API to the appropriate controller
and method.
"""
def __init__(self, init_only=None):
self._loaded_extension_info = plugins.LoadedExtensionInfo()
super(APIRouterV3, self).__init__(init_only)
def _register_extensions(self, ext):
pass
# TODO(cyeoh): bp v3-api-extension-framework - Register extension
# information
def _register_extension(self, ext):
return self.loaded_extension_info.register_extension(ext.obj)
@property
def loaded_extension_info(self):
return self._loaded_extension_info

View File

@ -0,0 +1,59 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import exception
from nova.openstack.common import log as logging
LOG = logging.getLogger(__name__)
class LoadedExtensionInfo(object):
"""Keep track of all loaded API extensions."""
def __init__(self):
self.extensions = {}
def register_extension(self, ext):
if not self._check_extension(ext):
return False
alias = ext.alias
LOG.audit(_("Loaded extension %s"), alias)
if alias in self.extensions:
raise exception.NovaException("Found duplicate extension: %s"
% alias)
self.extensions[alias] = ext
return True
def _check_extension(self, extension):
"""Checks for required methods in extension objects."""
try:
LOG.debug(_('Ext name: %s'), extension.name)
LOG.debug(_('Ext alias: %s'), extension.alias)
LOG.debug(_('Ext description: %s'),
' '.join(extension.__doc__.strip().split()))
LOG.debug(_('Ext namespace: %s'), extension.namespace)
LOG.debug(_('Ext version: %i'), extension.version)
except AttributeError as ex:
LOG.exception(_("Exception loading extension: %s"), unicode(ex))
return False
return True
def get_extensions(self):
return self.extensions

View File

@ -0,0 +1,105 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2013 IBM Corp.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob.exc
from nova.api.openstack import extensions
from nova.api.openstack import wsgi
from nova.api.openstack import xmlutil
def make_ext(elem):
elem.set('name')
elem.set('namespace')
elem.set('alias')
elem.set('version')
desc = xmlutil.SubTemplateElement(elem, 'description')
desc.text = 'description'
ext_nsmap = {None: xmlutil.XMLNS_COMMON_V10, 'atom': xmlutil.XMLNS_ATOM}
class ExtensionTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('extension', selector='extension')
make_ext(root)
return xmlutil.MasterTemplate(root, 1, nsmap=ext_nsmap)
class ExtensionsTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('extensions')
elem = xmlutil.SubTemplateElement(root, 'extension',
selector='extensions')
make_ext(elem)
return xmlutil.MasterTemplate(root, 1, nsmap=ext_nsmap)
class ExtensionInfoController(object):
def __init__(self, extension_info):
self.extension_info = extension_info
def _translate(self, ext):
ext_data = {}
ext_data['name'] = ext.name
ext_data['alias'] = ext.alias
ext_data['description'] = ext.__doc__
ext_data['namespace'] = ext.namespace
ext_data['version'] = ext.version
return ext_data
@wsgi.serializers(xml=ExtensionsTemplate)
def index(self, req):
sorted_ext_list = sorted(
self.extension_info.get_extensions().iteritems())
extensions = []
for _alias, ext in sorted_ext_list:
extensions.append(self._translate(ext))
return dict(extensions=extensions)
@wsgi.serializers(xml=ExtensionTemplate)
def show(self, req, id):
try:
# NOTE(dprince): the extensions alias is used as the 'id' for show
ext = self.extension_info.get_extensions()[id]
except KeyError:
raise webob.exc.HTTPNotFound()
return dict(extension=self._translate(ext))
class ExtensionInfo(extensions.V3APIExtensionBase):
"""Extension information."""
name = "extensions"
alias = "extensions"
namespace = "http://docs.openstack.org/compute/core/extension_info/api/v3"
version = 1
def get_resources(self):
resources = [
extensions.ResourceExtension(
'extensions', ExtensionInfoController(self.extension_info),
member_name='extension')]
return resources
def get_controller_extensions(self):
return []

View File

@ -0,0 +1,215 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Keypair management extension."""
import webob
import webob.exc
from nova.api.openstack.compute import servers
from nova.api.openstack import extensions
from nova.api.openstack import wsgi
from nova.api.openstack import xmlutil
from nova.compute import api as compute_api
from nova import exception
ALIAS = 'os-keypairs'
authorize = extensions.extension_authorizer('compute', 'v3:' + ALIAS)
soft_authorize = extensions.soft_extension_authorizer('compute', 'v3:' + ALIAS)
class KeypairTemplate(xmlutil.TemplateBuilder):
def construct(self):
return xmlutil.MasterTemplate(xmlutil.make_flat_dict('keypair'), 1)
class KeypairsTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('keypairs')
elem = xmlutil.make_flat_dict('keypair', selector='keypairs',
subselector='keypair')
root.append(elem)
return xmlutil.MasterTemplate(root, 1)
class KeypairController(object):
"""Keypair API controller for the OpenStack API."""
def __init__(self):
self.api = compute_api.KeypairAPI()
@wsgi.serializers(xml=KeypairTemplate)
def create(self, req, body):
"""
Create or import keypair.
Sending name will generate a key and return private_key
and fingerprint.
You can send a public_key to add an existing ssh key
params: keypair object with:
name (required) - string
public_key (optional) - string
"""
context = req.environ['nova.context']
authorize(context)
try:
params = body['keypair']
name = params['name']
except KeyError:
msg = _("Invalid request body")
raise webob.exc.HTTPBadRequest(explanation=msg)
try:
if 'public_key' in params:
keypair = self.api.import_key_pair(context,
context.user_id, name,
params['public_key'])
else:
keypair = self.api.create_key_pair(context, context.user_id,
name)
return {'keypair': keypair}
except exception.KeypairLimitExceeded:
msg = _("Quota exceeded, too many key pairs.")
raise webob.exc.HTTPRequestEntityTooLarge(
explanation=msg,
headers={'Retry-After': 0})
except exception.InvalidKeypair:
msg = _("Keypair data is invalid")
raise webob.exc.HTTPBadRequest(explanation=msg)
except exception.KeyPairExists:
msg = _("Key pair '%s' already exists.") % name
raise webob.exc.HTTPConflict(explanation=msg)
def delete(self, req, id):
"""
Delete a keypair with a given name
"""
context = req.environ['nova.context']
authorize(context)
try:
self.api.delete_key_pair(context, context.user_id, id)
except exception.KeypairNotFound:
raise webob.exc.HTTPNotFound()
return webob.Response(status_int=202)
@wsgi.serializers(xml=KeypairTemplate)
def show(self, req, id):
"""Return data for the given key name."""
context = req.environ['nova.context']
authorize(context)
try:
keypair = self.api.get_key_pair(context, context.user_id, id)
except exception.KeypairNotFound:
raise webob.exc.HTTPNotFound()
return {'keypair': keypair}
@wsgi.serializers(xml=KeypairsTemplate)
def index(self, req):
"""
List of keypairs for a user
"""
context = req.environ['nova.context']
authorize(context)
key_pairs = self.api.get_key_pairs(context, context.user_id)
rval = []
for key_pair in key_pairs:
rval.append({'keypair': {
'name': key_pair['name'],
'public_key': key_pair['public_key'],
'fingerprint': key_pair['fingerprint'],
}})
return {'keypairs': rval}
class ServerKeyNameTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('server')
root.set('key_name', 'key_name')
return xmlutil.SlaveTemplate(root, 1)
class ServersKeyNameTemplate(xmlutil.TemplateBuilder):
def construct(self):
root = xmlutil.TemplateElement('servers')
elem = xmlutil.SubTemplateElement(root, 'server', selector='servers')
elem.set('key_name', 'key_name')
return xmlutil.SlaveTemplate(root, 1)
class Controller(servers.Controller):
def _add_key_name(self, req, servers):
for server in servers:
db_server = req.get_db_instance(server['id'])
# server['id'] is guaranteed to be in the cache due to
# the core API adding it in its 'show'/'detail' methods.
server['key_name'] = db_server['key_name']
def _show(self, req, resp_obj):
if 'server' in resp_obj.obj:
resp_obj.attach(xml=ServerKeyNameTemplate())
server = resp_obj.obj['server']
self._add_key_name(req, [server])
@wsgi.extends
def show(self, req, resp_obj, id):
context = req.environ['nova.context']
if soft_authorize(context):
self._show(req, resp_obj)
@wsgi.extends
def detail(self, req, resp_obj):
context = req.environ['nova.context']
if 'servers' in resp_obj.obj and soft_authorize(context):
resp_obj.attach(xml=ServersKeyNameTemplate())
servers = resp_obj.obj['servers']
self._add_key_name(req, servers)
class Keypairs(extensions.V3APIExtensionBase):
"""Keypair Support."""
name = "Keypairs"
alias = ALIAS
namespace = "http://docs.openstack.org/compute/ext/keypairs/api/v3"
version = 1
def get_resources(self):
resources = [
extensions.ResourceExtension('os-keypairs',
KeypairController())]
return resources
def get_controller_extensions(self):
controller = Controller()
extension = extensions.ControllerExtension(self, 'servers', controller)
return [extension]
# use nova.api.extensions.server.extensions entry point to modify
# server create kwargs
def server_create(self, server_dict, create_kwargs):
create_kwargs['key_name'] = server_dict.get('key_name')

File diff suppressed because it is too large Load Diff

View File

@ -297,7 +297,7 @@ class ResourceExtension(object):
def __init__(self, collection, controller=None, parent=None,
collection_actions=None, member_actions=None,
custom_routes_fn=None, inherits=None):
custom_routes_fn=None, inherits=None, member_name=None):
if not collection_actions:
collection_actions = {}
if not member_actions:
@ -309,6 +309,7 @@ class ResourceExtension(object):
self.member_actions = member_actions
self.custom_routes_fn = custom_routes_fn
self.inherits = inherits
self.member_name = member_name
def load_standard_extensions(ext_mgr, logger, path, package, ext_list=None):
@ -410,6 +411,9 @@ class V3APIExtensionBase(object):
"""
__metaclass__ = abc.ABCMeta
def __init__(self, extension_info):
self.extension_info = extension_info
@abc.abstractmethod
def get_resources(self):
"""Return a list of resources extensions.

View File

@ -0,0 +1,378 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 Eldar Nugaev
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from lxml import etree
import webob
from nova.api.openstack.compute.plugins.v3 import keypairs
from nova.api.openstack import wsgi
from nova import db
from nova import exception
from nova.openstack.common import jsonutils
from nova import quota
from nova import test
from nova.tests.api.openstack import fakes
QUOTAS = quota.QUOTAS
def fake_keypair(name):
return {'public_key': 'FAKE_KEY',
'fingerprint': 'FAKE_FINGERPRINT',
'name': name}
def db_key_pair_get_all_by_user(self, user_id):
return [fake_keypair('FAKE')]
def db_key_pair_create(self, keypair):
return keypair
def db_key_pair_destroy(context, user_id, name):
if not (user_id and name):
raise Exception()
def db_key_pair_create_duplicate(context, keypair):
raise exception.KeyPairExists(key_name=keypair.get('name', ''))
class KeypairsTest(test.TestCase):
def setUp(self):
super(KeypairsTest, self).setUp()
self.Controller = keypairs.Controller()
fakes.stub_out_networking(self.stubs)
fakes.stub_out_rate_limiting(self.stubs)
self.stubs.Set(db, "key_pair_get_all_by_user",
db_key_pair_get_all_by_user)
self.stubs.Set(db, "key_pair_create",
db_key_pair_create)
self.stubs.Set(db, "key_pair_destroy",
db_key_pair_destroy)
self.flags(
osapi_compute_extension=[
'nova.api.openstack.compute.contrib.select_extensions'],
osapi_compute_ext_list=['Keypairs'])
self.app = fakes.wsgi_app_v3(init_only=('os-keypairs', 'servers'))
def test_keypair_list(self):
req = webob.Request.blank('/v3/os-keypairs')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 200)
res_dict = jsonutils.loads(res.body)
response = {'keypairs': [{'keypair': fake_keypair('FAKE')}]}
self.assertEqual(res_dict, response)
def test_keypair_create(self):
body = {'keypair': {'name': 'create_test'}}
req = webob.Request.blank('/v3/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 200)
res_dict = jsonutils.loads(res.body)
self.assertTrue(len(res_dict['keypair']['fingerprint']) > 0)
self.assertTrue(len(res_dict['keypair']['private_key']) > 0)
def test_keypair_create_with_empty_name(self):
body = {'keypair': {'name': ''}}
req = webob.Request.blank('/v3/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
def test_keypair_create_with_invalid_name(self):
body = {
'keypair': {
'name': 'a' * 256
}
}
req = webob.Request.blank('/v3/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
def test_keypair_create_with_non_alphanumeric_name(self):
body = {
'keypair': {
'name': 'test/keypair'
}
}
req = webob.Request.blank('/v3/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
res_dict = jsonutils.loads(res.body)
self.assertEqual(res.status_int, 400)
def test_keypair_import(self):
body = {
'keypair': {
'name': 'create_test',
'public_key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDBYIznA'
'x9D7118Q1VKGpXy2HDiKyUTM8XcUuhQpo0srqb9rboUp4'
'a9NmCwpWpeElDLuva707GOUnfaBAvHBwsRXyxHJjRaI6Y'
'Qj2oLJwqvaSaWUbyT1vtryRqy6J3TecN0WINY71f4uymi'
'MZP0wby4bKBcYnac8KiCIlvkEl0ETjkOGUq8OyWRmn7lj'
'j5SESEUdBP0JnuTFKddWTU/wD6wydeJaUhBTqOlHn0kX1'
'GyqoNTE1UEhcM5ZRWgfUZfTjVyDF2kGj3vJLCJtJ8LoGc'
'j7YaN4uPg1rBle+izwE/tLonRrds+cev8p6krSSrxWOwB'
'bHkXa6OciiJDvkRzJXzf',
},
}
req = webob.Request.blank('/v3/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 200)
# FIXME(ja): sholud we check that public_key was sent to create?
res_dict = jsonutils.loads(res.body)
self.assertTrue(len(res_dict['keypair']['fingerprint']) > 0)
self.assertFalse('private_key' in res_dict['keypair'])
def test_keypair_import_quota_limit(self):
def fake_quotas_count(self, context, resource, *args, **kwargs):
return 100
self.stubs.Set(QUOTAS, "count", fake_quotas_count)
body = {
'keypair': {
'name': 'create_test',
'public_key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDBYIznA'
'x9D7118Q1VKGpXy2HDiKyUTM8XcUuhQpo0srqb9rboUp4'
'a9NmCwpWpeElDLuva707GOUnfaBAvHBwsRXyxHJjRaI6Y'
'Qj2oLJwqvaSaWUbyT1vtryRqy6J3TecN0WINY71f4uymi'
'MZP0wby4bKBcYnac8KiCIlvkEl0ETjkOGUq8OyWRmn7lj'
'j5SESEUdBP0JnuTFKddWTU/wD6wydeJaUhBTqOlHn0kX1'
'GyqoNTE1UEhcM5ZRWgfUZfTjVyDF2kGj3vJLCJtJ8LoGc'
'j7YaN4uPg1rBle+izwE/tLonRrds+cev8p6krSSrxWOwB'
'bHkXa6OciiJDvkRzJXzf',
},
}
req = webob.Request.blank('/v3/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 413)
def test_keypair_create_quota_limit(self):
def fake_quotas_count(self, context, resource, *args, **kwargs):
return 100
self.stubs.Set(QUOTAS, "count", fake_quotas_count)
body = {
'keypair': {
'name': 'create_test',
},
}
req = webob.Request.blank('/v3/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 413)
def test_keypair_create_duplicate(self):
self.stubs.Set(db, "key_pair_create", db_key_pair_create_duplicate)
body = {'keypair': {'name': 'create_duplicate'}}
req = webob.Request.blank('/v3/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 409)
def test_keypair_import_bad_key(self):
body = {
'keypair': {
'name': 'create_test',
'public_key': 'ssh-what negative',
},
}
req = webob.Request.blank('/v3/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
def test_keypair_delete(self):
req = webob.Request.blank('/v3/os-keypairs/FAKE')
req.method = 'DELETE'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 202)
def test_keypair_get_keypair_not_found(self):
req = webob.Request.blank('/v3/os-keypairs/DOESNOTEXIST')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
def test_keypair_delete_not_found(self):
def db_key_pair_get_not_found(context, user_id, name):
raise exception.KeypairNotFound(user_id=user_id, name=name)
self.stubs.Set(db, "key_pair_get",
db_key_pair_get_not_found)
req = webob.Request.blank('/v3/os-keypairs/WHAT')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
def test_keypair_show(self):
def _db_key_pair_get(context, user_id, name):
return {'name': 'foo', 'public_key': 'XXX', 'fingerprint': 'YYY'}
self.stubs.Set(db, "key_pair_get", _db_key_pair_get)
req = webob.Request.blank('/v3/os-keypairs/FAKE')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
res_dict = jsonutils.loads(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual('foo', res_dict['keypair']['name'])
self.assertEqual('XXX', res_dict['keypair']['public_key'])
self.assertEqual('YYY', res_dict['keypair']['fingerprint'])
def test_keypair_show_not_found(self):
def _db_key_pair_get(context, user_id, name):
raise exception.KeypairNotFound(user_id=user_id, name=name)
self.stubs.Set(db, "key_pair_get", _db_key_pair_get)
req = webob.Request.blank('/v3/os-keypairs/FAKE')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
def test_show_server(self):
self.stubs.Set(db, 'instance_get',
fakes.fake_instance_get())
req = webob.Request.blank('/v3/servers/1')
req.headers['Content-Type'] = 'application/json'
response = req.get_response(self.app)
self.assertEquals(response.status_int, 200)
res_dict = jsonutils.loads(response.body)
self.assertTrue('key_name' in res_dict['server'])
self.assertEquals(res_dict['server']['key_name'], '')
def test_detail_servers(self):
self.stubs.Set(db, 'instance_get_all_by_filters',
fakes.fake_instance_get_all_by_filters())
req = fakes.HTTPRequest.blank('/v3/servers/detail')
res = req.get_response(self.app)
server_dicts = jsonutils.loads(res.body)['servers']
self.assertEquals(len(server_dicts), 5)
for server_dict in server_dicts:
self.assertTrue('key_name' in server_dict)
self.assertEquals(server_dict['key_name'], '')
def test_keypair_create_with_invalid_keypairBody(self):
body = {'alpha': {'name': 'create_test'}}
req = webob.Request.blank('/v3/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
res_dict = jsonutils.loads(res.body)
self.assertEqual(res.status_int, 400)
self.assertEqual(res_dict['badRequest']['message'],
"Invalid request body")
class KeypairsXMLSerializerTest(test.TestCase):
def setUp(self):
super(KeypairsXMLSerializerTest, self).setUp()
self.deserializer = wsgi.XMLDeserializer()
def test_default_serializer(self):
exemplar = dict(keypair=dict(
public_key='fake_public_key',
private_key='fake_private_key',
fingerprint='fake_fingerprint',
user_id='fake_user_id',
name='fake_key_name'))
serializer = keypairs.KeypairTemplate()
text = serializer.serialize(exemplar)
tree = etree.fromstring(text)
self.assertEqual('keypair', tree.tag)
for child in tree:
self.assertTrue(child.tag in exemplar['keypair'])
self.assertEqual(child.text, exemplar['keypair'][child.tag])
def test_index_serializer(self):
exemplar = dict(keypairs=[
dict(keypair=dict(
name='key1_name',
public_key='key1_key',
fingerprint='key1_fingerprint')),
dict(keypair=dict(
name='key2_name',
public_key='key2_key',
fingerprint='key2_fingerprint'))])
serializer = keypairs.KeypairsTemplate()
text = serializer.serialize(exemplar)
tree = etree.fromstring(text)
self.assertEqual('keypairs', tree.tag)
self.assertEqual(len(exemplar['keypairs']), len(tree))
for idx, keypair in enumerate(tree):
self.assertEqual('keypair', keypair.tag)
kp_data = exemplar['keypairs'][idx]['keypair']
for child in keypair:
self.assertTrue(child.tag in kp_data)
self.assertEqual(child.text, kp_data[child.tag])
def test_deserializer(self):
exemplar = dict(keypair=dict(
name='key_name',
public_key='public_key'))
intext = ("<?xml version='1.0' encoding='UTF-8'?>\n"
'<keypair><name>key_name</name>'
'<public_key>public_key</public_key></keypair>')
result = self.deserializer.deserialize(intext)['body']
self.assertEqual(result, exemplar)

View File

@ -100,6 +100,30 @@ def wsgi_app(inner_app_v2=None, fake_auth_context=None,
return mapper
def wsgi_app_v3(inner_app_v3=None, fake_auth_context=None,
use_no_auth=False, ext_mgr=None, init_only=None):
if not inner_app_v3:
inner_app_v3 = compute.APIRouterV3(init_only)
if use_no_auth:
api_v3 = openstack_api.FaultWrapper(auth.NoAuthMiddleware(
limits.RateLimitingMiddleware(inner_app_v3)))
else:
if fake_auth_context is not None:
ctxt = fake_auth_context
else:
ctxt = context.RequestContext('fake', 'fake', auth_token=True)
api_v3 = openstack_api.FaultWrapper(api_auth.InjectContext(ctxt,
limits.RateLimitingMiddleware(inner_app_v3)))
mapper = urlmap.URLMap()
mapper['/v3'] = api_v3
# TODO(cyeoh): bp nova-api-core-as-extensions
# Still need to implement versions for v3 API
# mapper['/'] = openstack_api.FaultWrapper(versions.Versions())
return mapper
def stub_out_key_pair_funcs(stubs, have_key_pair=True):
def key_pair(context, user_id):
return [dict(name='key', public_key='public_key')]

View File

@ -152,6 +152,7 @@ policy_data = """
"compute_extension:instance_actions:events": "is_admin:True",
"compute_extension:instance_usage_audit_log": "",
"compute_extension:keypairs": "",
"compute_extension:v3:os-keypairs": "",
"compute_extension:multinic": "",
"compute_extension:networks": "",
"compute_extension:networks:view": "",

View File

@ -55,6 +55,12 @@ console_scripts =
nova.api.v3.extensions =
fixed_ips = nova.api.openstack.compute.plugins.v3.fixed_ips:FixedIPs
extension_info = nova.api.openstack.compute.plugins.v3.extension_info:ExtensionInfo
servers = nova.api.openstack.compute.plugins.v3.servers:Servers
keypairs = nova.api.openstack.compute.plugins.v3.keypairs:Keypairs
nova.api.v3.extensions.server.create =
keypairs_create = nova.api.openstack.compute.plugins.v3.keypairs:Keypairs
[build_sphinx]
all_files = 1