Change v3 keypairs API to v2.1

This patch changes v3 keypairs API to v2.1 and makes v2 unit tests
share between v2 and v2.1.

The differences between v2 and v3 are described on the wiki page
https://wiki.openstack.org/wiki/NovaAPIv2tov3 .

Partially implements blueprint v2-on-v3-api

Change-Id: I1fcb93a00816eab2d87f251b7cf8a8992e920308
This commit is contained in:
Ken'ichi Ohmichi 2014-08-27 04:59:13 +00:00
parent 509ed96c6f
commit 590e8abce4
9 changed files with 119 additions and 543 deletions

View File

@ -2,6 +2,12 @@
"keypair": {
"fingerprint": "44:fe:29:6e:23:14:b9:53:5b:65:82:58:1c:fe:5a:c3",
"name": "keypair-6638abdb-c4e8-407c-ba88-c8dd7cc3c4f1",
"public_key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC1HTrHCbb9NawNLSV8N6tSa8i637+EC2dA+lsdHHfQlT54t+N0nHhJPlKWDLhc579j87vp6RDFriFJ/smsTnDnf64O12z0kBaJpJPH2zXrBkZFK6q2rmxydURzX/z0yLSCP77SFJ0fdXWH2hMsAusflGyryHGX20n+mZK6mDrxVzGxEz228dwQ5G7Az5OoZDWygH2pqPvKjkifRw0jwUKf3BbkP0QvANACOk26cv16mNFpFJfI1N3OC5lUsZQtKGR01ptJoWijYKccqhkAKuo902tg/qup58J5kflNm7I61sy1mJon6SGqNUSfoQagqtBH6vd/tU1jnlwZ03uUroAL Generated-by-Nova\n"
"public_key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC1HTrHCbb9NawNLSV8N6tSa8i637+EC2dA+lsdHHfQlT54t+N0nHhJPlKWDLhc579j87vp6RDFriFJ/smsTnDnf64O12z0kBaJpJPH2zXrBkZFK6q2rmxydURzX/z0yLSCP77SFJ0fdXWH2hMsAusflGyryHGX20n+mZK6mDrxVzGxEz228dwQ5G7Az5OoZDWygH2pqPvKjkifRw0jwUKf3BbkP0QvANACOk26cv16mNFpFJfI1N3OC5lUsZQtKGR01ptJoWijYKccqhkAKuo902tg/qup58J5kflNm7I61sy1mJon6SGqNUSfoQagqtBH6vd/tU1jnlwZ03uUroAL Generated-by-Nova\n",
"user_id": "fake",
"deleted": false,
"created_at": "2014-05-07T12:06:13.681238",
"updated_at": null,
"deleted_at": null,
"id": 1
}
}
}

View File

@ -175,12 +175,12 @@
"compute_extension:keypairs:show": "",
"compute_extension:keypairs:create": "",
"compute_extension:keypairs:delete": "",
"compute_extension:v3:keypairs:discoverable": "",
"compute_extension:v3:keypairs": "",
"compute_extension:v3:keypairs:index": "",
"compute_extension:v3:keypairs:show": "",
"compute_extension:v3:keypairs:create": "",
"compute_extension:v3:keypairs:delete": "",
"compute_extension:v3:os-keypairs:discoverable": "",
"compute_extension:v3:os-keypairs": "",
"compute_extension:v3:os-keypairs:index": "",
"compute_extension:v3:os-keypairs:show": "",
"compute_extension:v3:os-keypairs:create": "",
"compute_extension:v3:os-keypairs:delete": "",
"compute_extension:v3:os-lock-server:discoverable": "",
"compute_extension:v3:os-lock-server:lock": "rule:admin_or_owner",
"compute_extension:v3:os-lock-server:unlock": "rule:admin_or_owner",

View File

@ -69,7 +69,7 @@ API_V3_CORE_EXTENSIONS = set(['consoles',
'flavor-manage',
'flavors',
'ips',
'keypairs',
'os-keypairs',
'server-metadata',
'servers',
'versions'])

View File

@ -27,7 +27,7 @@ from nova import exception
from nova.i18n import _
ALIAS = 'keypairs'
ALIAS = 'os-keypairs'
authorize = extensions.extension_authorizer('compute', 'v3:' + ALIAS)
soft_authorize = extensions.soft_extension_authorizer('compute', 'v3:' + ALIAS)
@ -48,8 +48,10 @@ class KeypairController(object):
clean[attr] = keypair[attr]
return clean
# TODO(oomichi): Here should be 201(Created) instead of 200 by v2.1
# +microversions because the keypair creation finishes when returning
# a response.
@extensions.expected_errors((400, 403, 409))
@wsgi.response(201)
@validation.schema(keypairs.create)
def create(self, req, body):
"""Create or import keypair.
@ -92,7 +94,10 @@ class KeypairController(object):
except exception.KeyPairExists as exc:
raise webob.exc.HTTPConflict(explanation=exc.format_message())
@wsgi.response(204)
# TODO(oomichi): Here should be 204(No Content) instead of 202 by v2.1
# +microversions because the resource keypair has been deleted completely
# when returning a response.
@wsgi.response(202)
@extensions.expected_errors(404)
def delete(self, req, id):
"""Delete a keypair with a given name."""
@ -113,7 +118,10 @@ class KeypairController(object):
keypair = self.api.get_key_pair(context, context.user_id, id)
except exception.KeypairNotFound as exc:
raise webob.exc.HTTPNotFound(explanation=exc.format_message())
return {'keypair': self._filter_keypair(keypair)}
# TODO(oomichi): It is necessary to filter a response of keypair with
# _filter_keypair() when v2.1+microversions for implementing consistent
# behaviors in this keypair resource.
return {'keypair': keypair}
@extensions.expected_errors(())
def index(self, req):
@ -165,7 +173,7 @@ class Keypairs(extensions.V3APIExtensionBase):
def get_resources(self):
resources = [
extensions.ResourceExtension('keypairs',
extensions.ResourceExtension(ALIAS,
KeypairController())]
return resources

View File

@ -16,7 +16,8 @@
from lxml import etree
import webob
from nova.api.openstack.compute.contrib import keypairs
from nova.api.openstack.compute.contrib import keypairs as keypairs_v2
from nova.api.openstack.compute.plugins.v3 import keypairs as keypairs_v21
from nova.api.openstack import wsgi
from nova import db
from nova import exception
@ -60,11 +61,15 @@ def db_key_pair_create_duplicate(context, keypair):
raise exception.KeyPairExists(key_name=keypair.get('name', ''))
class KeypairsTest(test.TestCase):
class KeypairsTestV21(test.TestCase):
base_url = '/v3'
def _setup_app(self):
self.app = fakes.wsgi_app_v3(init_only=('os-keypairs', 'servers'))
self.app_server = self.app
def setUp(self):
super(KeypairsTest, self).setUp()
self.Controller = keypairs.Controller()
super(KeypairsTestV21, self).setUp()
fakes.stub_out_networking(self.stubs)
fakes.stub_out_rate_limiting(self.stubs)
@ -78,10 +83,10 @@ class KeypairsTest(test.TestCase):
osapi_compute_extension=[
'nova.api.openstack.compute.contrib.select_extensions'],
osapi_compute_ext_list=['Keypairs'])
self.app = fakes.wsgi_app(init_only=('os-keypairs',))
self._setup_app()
def test_keypair_list(self):
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 200)
res_dict = jsonutils.loads(res.body)
@ -90,7 +95,7 @@ class KeypairsTest(test.TestCase):
def test_keypair_create(self):
body = {'keypair': {'name': 'create_test'}}
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
@ -101,7 +106,7 @@ class KeypairsTest(test.TestCase):
self.assertTrue(len(res_dict['keypair']['private_key']) > 0)
def _test_keypair_create_bad_request_case(self, body):
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
@ -157,7 +162,7 @@ class KeypairsTest(test.TestCase):
},
}
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
@ -190,7 +195,7 @@ class KeypairsTest(test.TestCase):
},
}
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
@ -214,7 +219,7 @@ class KeypairsTest(test.TestCase):
},
}
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
@ -228,7 +233,7 @@ class KeypairsTest(test.TestCase):
def test_keypair_create_duplicate(self):
self.stubs.Set(db, "key_pair_create", db_key_pair_create_duplicate)
body = {'keypair': {'name': 'create_duplicate'}}
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
@ -240,14 +245,14 @@ class KeypairsTest(test.TestCase):
res_dict['conflictingRequest']['message'])
def test_keypair_delete(self):
req = webob.Request.blank('/v2/fake/os-keypairs/FAKE')
req = webob.Request.blank(self.base_url + '/os-keypairs/FAKE')
req.method = 'DELETE'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 202)
def test_keypair_get_keypair_not_found(self):
req = webob.Request.blank('/v2/fake/os-keypairs/DOESNOTEXIST')
req = webob.Request.blank(self.base_url + '/os-keypairs/DOESNOTEXIST')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
@ -258,7 +263,7 @@ class KeypairsTest(test.TestCase):
self.stubs.Set(db, "key_pair_get",
db_key_pair_get_not_found)
req = webob.Request.blank('/v2/fake/os-keypairs/WHAT')
req = webob.Request.blank(self.base_url + '/os-keypairs/WHAT')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
@ -270,7 +275,7 @@ class KeypairsTest(test.TestCase):
self.stubs.Set(db, "key_pair_get", _db_key_pair_get)
req = webob.Request.blank('/v2/fake/os-keypairs/FAKE')
req = webob.Request.blank(self.base_url + '/os-keypairs/FAKE')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
@ -287,7 +292,7 @@ class KeypairsTest(test.TestCase):
self.stubs.Set(db, "key_pair_get", _db_key_pair_get)
req = webob.Request.blank('/v2/fake/os-keypairs/FAKE')
req = webob.Request.blank(self.base_url + '/os-keypairs/FAKE')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
@ -298,9 +303,9 @@ class KeypairsTest(test.TestCase):
fakes.fake_instance_get())
self.stubs.Set(db, 'instance_get_by_uuid',
fakes.fake_instance_get())
req = webob.Request.blank('/v2/fake/servers/1')
req = webob.Request.blank(self.base_url + '/servers/1')
req.headers['Content-Type'] = 'application/json'
response = req.get_response(fakes.wsgi_app(init_only=('servers',)))
response = req.get_response(self.app_server)
self.assertEqual(response.status_int, 200)
res_dict = jsonutils.loads(response.body)
self.assertIn('key_name', res_dict['server'])
@ -309,8 +314,8 @@ class KeypairsTest(test.TestCase):
def test_detail_servers(self):
self.stubs.Set(db, 'instance_get_all_by_filters',
fakes.fake_instance_get_all_by_filters())
req = fakes.HTTPRequest.blank('/v2/fake/servers/detail')
res = req.get_response(fakes.wsgi_app(init_only=('servers',)))
req = fakes.HTTPRequest.blank(self.base_url + '/servers/detail')
res = req.get_response(self.app_server)
server_dicts = jsonutils.loads(res.body)['servers']
self.assertEqual(len(server_dicts), 5)
@ -319,11 +324,13 @@ class KeypairsTest(test.TestCase):
self.assertEqual(server_dict['key_name'], '')
class KeypairPolicyTest(test.TestCase):
class KeypairPolicyTestV21(test.TestCase):
KeyPairController = keypairs_v21.KeypairController()
policy_path = 'compute_extension:v3:os-keypairs'
base_url = '/v3'
def setUp(self):
super(KeypairPolicyTest, self).setUp()
self.KeyPairController = keypairs.KeypairController()
super(KeypairPolicyTestV21, self).setUp()
def _db_key_pair_get(context, user_id, name):
return dict(test_keypair.fake_keypair,
@ -339,78 +346,85 @@ class KeypairPolicyTest(test.TestCase):
db_key_pair_destroy)
def test_keypair_list_fail_policy(self):
rules = {'compute_extension:keypairs:index':
rules = {self.policy_path + ':index':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs')
self.assertRaises(exception.Forbidden,
self.KeyPairController.index,
req)
def test_keypair_list_pass_policy(self):
rules = {'compute_extension:keypairs:index':
rules = {self.policy_path + ':index':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs')
res = self.KeyPairController.index(req)
self.assertIn('keypairs', res)
def test_keypair_show_fail_policy(self):
rules = {'compute_extension:keypairs:show':
rules = {self.policy_path + ':show':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs/FAKE')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs/FAKE')
self.assertRaises(exception.Forbidden,
self.KeyPairController.show,
req, 'FAKE')
def test_keypair_show_pass_policy(self):
rules = {'compute_extension:keypairs:show':
rules = {self.policy_path + ':show':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs/FAKE')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs/FAKE')
res = self.KeyPairController.show(req, 'FAKE')
self.assertIn('keypair', res)
def test_keypair_create_fail_policy(self):
body = {'keypair': {'name': 'create_test'}}
rules = {'compute_extension:keypairs:create':
rules = {self.policy_path + ':create':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
self.assertRaises(exception.Forbidden,
self.KeyPairController.create,
req, body)
req, body=body)
def test_keypair_create_pass_policy(self):
body = {'keypair': {'name': 'create_test'}}
rules = {'compute_extension:keypairs:create':
rules = {self.policy_path + ':create':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
res = self.KeyPairController.create(req, body)
res = self.KeyPairController.create(req, body=body)
self.assertIn('keypair', res)
def test_keypair_delete_fail_policy(self):
rules = {'compute_extension:keypairs:delete':
rules = {self.policy_path + ':delete':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs/FAKE')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs/FAKE')
req.method = 'DELETE'
self.assertRaises(exception.Forbidden,
self.KeyPairController.delete,
req, 'FAKE')
def test_keypair_delete_pass_policy(self):
rules = {'compute_extension:keypairs:delete':
rules = {self.policy_path + ':delete':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs/FAKE')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs/FAKE')
req.method = 'DELETE'
res = self.KeyPairController.delete(req, 'FAKE')
self.assertEqual(res.status_int, 202)
# NOTE: on v2.1, http status code is set as wsgi_code of API
# method instead of status_int in a response object.
if isinstance(self.KeyPairController, keypairs_v21.KeypairController):
status_int = self.KeyPairController.delete.wsgi_code
else:
status_int = res.status_int
self.assertEqual(202, status_int)
class KeypairsXMLSerializerTest(test.TestCase):
@ -425,7 +439,7 @@ class KeypairsXMLSerializerTest(test.TestCase):
fingerprint='fake_fingerprint',
user_id='fake_user_id',
name='fake_key_name'))
serializer = keypairs.KeypairTemplate()
serializer = keypairs_v2.KeypairTemplate()
text = serializer.serialize(exemplar)
tree = etree.fromstring(text)
@ -445,7 +459,7 @@ class KeypairsXMLSerializerTest(test.TestCase):
name='key2_name',
public_key='key2_key',
fingerprint='key2_fingerprint'))])
serializer = keypairs.KeypairsTemplate()
serializer = keypairs_v2.KeypairsTemplate()
text = serializer.serialize(exemplar)
tree = etree.fromstring(text)
@ -469,3 +483,17 @@ class KeypairsXMLSerializerTest(test.TestCase):
result = self.deserializer.deserialize(intext)['body']
self.assertEqual(result, exemplar)
class KeypairsTestV2(KeypairsTestV21):
base_url = '/v2/fake'
def _setup_app(self):
self.app = fakes.wsgi_app(init_only=('os-keypairs',))
self.app_server = fakes.wsgi_app(init_only=('servers',))
class KeypairPolicyTestV2(KeypairPolicyTestV21):
KeyPairController = keypairs_v2.KeypairController()
policy_path = 'compute_extension:keypairs'
base_url = '/v2/fake'

View File

@ -1,472 +0,0 @@
# Copyright 2011 Eldar Nugaev
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from nova.api.openstack.compute.plugins.v3 import keypairs
from nova import db
from nova import exception
from nova.openstack.common import jsonutils
from nova.openstack.common import policy as common_policy
from nova import policy
from nova import quota
from nova import test
from nova.tests.api.openstack import fakes
from nova.tests.objects import test_keypair
QUOTAS = quota.QUOTAS
keypair_data = {
'public_key': 'FAKE_KEY',
'fingerprint': 'FAKE_FINGERPRINT',
}
def fake_keypair(name):
return dict(test_keypair.fake_keypair,
name=name, **keypair_data)
def db_key_pair_get_all_by_user(self, user_id):
return [fake_keypair('FAKE')]
def db_key_pair_create(self, keypair):
return fake_keypair(name=keypair['name'])
def db_key_pair_destroy(context, user_id, name):
if not (user_id and name):
raise Exception()
def db_key_pair_create_duplicate(context, keypair):
raise exception.KeyPairExists(key_name=keypair.get('name', ''))
class KeypairsTest(test.TestCase):
def setUp(self):
super(KeypairsTest, self).setUp()
self.Controller = keypairs.Controller()
fakes.stub_out_networking(self.stubs)
fakes.stub_out_rate_limiting(self.stubs)
self.stubs.Set(db, "key_pair_get_all_by_user",
db_key_pair_get_all_by_user)
self.stubs.Set(db, "key_pair_create",
db_key_pair_create)
self.stubs.Set(db, "key_pair_destroy",
db_key_pair_destroy)
self.flags(
osapi_compute_extension=[
'nova.api.openstack.compute.contrib.select_extensions'],
osapi_compute_ext_list=['Keypairs'])
self.app = fakes.wsgi_app_v3(init_only=('keypairs', 'servers'))
def test_keypair_list(self):
req = webob.Request.blank('/v3/keypairs')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 200)
res_dict = jsonutils.loads(res.body)
response = {'keypairs': [{'keypair': dict(keypair_data, name='FAKE')}]}
self.assertEqual(res_dict, response)
def test_keypair_create(self):
body = {'keypair': {'name': 'create_test'}}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 201)
res_dict = jsonutils.loads(res.body)
self.assertTrue(len(res_dict['keypair']['fingerprint']) > 0)
self.assertTrue(len(res_dict['keypair']['private_key']) > 0)
def test_keypair_create_without_keypair(self):
body = {'foo': None}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
jsonutils.loads(res.body)
def test_keypair_create_without_name(self):
body = {'keypair': {'public_key': 'public key'}}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
res_dict = jsonutils.loads(res.body)
self.assertEqual("Invalid input for field/attribute keypair. "
"Value: {u'public_key': u'public key'}. "
"'name' is a required property",
res_dict['badRequest']['message'])
def test_keypair_create_with_empty_name(self):
body = {'keypair': {'name': ''}}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
res_dict = jsonutils.loads(res.body)
self.assertEqual("Invalid input for field/attribute name. "
"Value: . u'' is too short",
res_dict['badRequest']['message'])
def test_keypair_create_with_name_too_long(self):
name = 'a' * 256
body = {
'keypair': {
'name': name
}
}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
res_dict = jsonutils.loads(res.body)
expected_message = "Invalid input for field/attribute name. "\
"Value: %s. u'%s' is too long" % (name, name)
self.assertEqual(expected_message, res_dict['badRequest']['message'])
def test_keypair_create_with_non_alphanumeric_name(self):
body = {
'keypair': {
'name': 'test/keypair'
}
}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
res_dict = jsonutils.loads(res.body)
self.assertEqual(res.status_int, 400)
res_dict = jsonutils.loads(res.body)
self.assertEqual(
"Invalid input for field/attribute name. Value: test/keypair. "
"u'test/keypair' does not match '^(?! )[a-zA-Z0-9. _-]*(?<! )$'",
res_dict['badRequest']['message'])
def test_keypair_import(self):
body = {
'keypair': {
'name': 'create_test',
'public_key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDBYIznA'
'x9D7118Q1VKGpXy2HDiKyUTM8XcUuhQpo0srqb9rboUp4'
'a9NmCwpWpeElDLuva707GOUnfaBAvHBwsRXyxHJjRaI6Y'
'Qj2oLJwqvaSaWUbyT1vtryRqy6J3TecN0WINY71f4uymi'
'MZP0wby4bKBcYnac8KiCIlvkEl0ETjkOGUq8OyWRmn7lj'
'j5SESEUdBP0JnuTFKddWTU/wD6wydeJaUhBTqOlHn0kX1'
'GyqoNTE1UEhcM5ZRWgfUZfTjVyDF2kGj3vJLCJtJ8LoGc'
'j7YaN4uPg1rBle+izwE/tLonRrds+cev8p6krSSrxWOwB'
'bHkXa6OciiJDvkRzJXzf',
},
}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 201)
# FIXME(ja): sholud we check that public_key was sent to create?
res_dict = jsonutils.loads(res.body)
self.assertTrue(len(res_dict['keypair']['fingerprint']) > 0)
self.assertNotIn('private_key', res_dict['keypair'])
def test_keypair_import_quota_limit(self):
def fake_quotas_count(self, context, resource, *args, **kwargs):
return 100
self.stubs.Set(QUOTAS, "count", fake_quotas_count)
body = {
'keypair': {
'name': 'create_test',
'public_key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDBYIznA'
'x9D7118Q1VKGpXy2HDiKyUTM8XcUuhQpo0srqb9rboUp4'
'a9NmCwpWpeElDLuva707GOUnfaBAvHBwsRXyxHJjRaI6Y'
'Qj2oLJwqvaSaWUbyT1vtryRqy6J3TecN0WINY71f4uymi'
'MZP0wby4bKBcYnac8KiCIlvkEl0ETjkOGUq8OyWRmn7lj'
'j5SESEUdBP0JnuTFKddWTU/wD6wydeJaUhBTqOlHn0kX1'
'GyqoNTE1UEhcM5ZRWgfUZfTjVyDF2kGj3vJLCJtJ8LoGc'
'j7YaN4uPg1rBle+izwE/tLonRrds+cev8p6krSSrxWOwB'
'bHkXa6OciiJDvkRzJXzf',
},
}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 403)
res_dict = jsonutils.loads(res.body)
self.assertEqual(
"Quota exceeded, too many key pairs.",
res_dict['forbidden']['message'])
def test_keypair_create_quota_limit(self):
def fake_quotas_count(self, context, resource, *args, **kwargs):
return 100
self.stubs.Set(QUOTAS, "count", fake_quotas_count)
body = {
'keypair': {
'name': 'create_test',
},
}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 403)
res_dict = jsonutils.loads(res.body)
self.assertEqual(
"Quota exceeded, too many key pairs.",
res_dict['forbidden']['message'])
def test_keypair_create_duplicate(self):
self.stubs.Set(db, "key_pair_create", db_key_pair_create_duplicate)
body = {'keypair': {'name': 'create_duplicate'}}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 409)
res_dict = jsonutils.loads(res.body)
self.assertEqual(
"Key pair 'create_duplicate' already exists.",
res_dict['conflictingRequest']['message'])
def test_keypair_import_bad_key(self):
body = {
'keypair': {
'name': 'create_test',
'public_key': 'ssh-what negative',
},
}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
res_dict = jsonutils.loads(res.body)
self.assertEqual(
'Keypair data is invalid: failed to generate fingerprint',
res_dict['badRequest']['message'])
def test_keypair_delete(self):
req = webob.Request.blank('/v3/keypairs/FAKE')
req.method = 'DELETE'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 204)
def test_keypair_get_keypair_not_found(self):
req = webob.Request.blank('/v3/keypairs/DOESNOTEXIST')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
def test_keypair_delete_not_found(self):
def db_key_pair_get_not_found(context, user_id, name):
raise exception.KeypairNotFound(user_id=user_id, name=name)
self.stubs.Set(db, "key_pair_get",
db_key_pair_get_not_found)
req = webob.Request.blank('/v3/keypairs/WHAT')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
def test_keypair_show(self):
def _db_key_pair_get(context, user_id, name):
return dict(test_keypair.fake_keypair,
name='foo', public_key='XXX', fingerprint='YYY')
self.stubs.Set(db, "key_pair_get", _db_key_pair_get)
req = webob.Request.blank('/v3/keypairs/FAKE')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
res_dict = jsonutils.loads(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual('foo', res_dict['keypair']['name'])
self.assertEqual('XXX', res_dict['keypair']['public_key'])
self.assertEqual('YYY', res_dict['keypair']['fingerprint'])
def test_keypair_show_not_found(self):
def _db_key_pair_get(context, user_id, name):
raise exception.KeypairNotFound(user_id=user_id, name=name)
self.stubs.Set(db, "key_pair_get", _db_key_pair_get)
req = webob.Request.blank('/v3/keypairs/FAKE')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
def test_show_server(self):
self.stubs.Set(db, 'instance_get',
fakes.fake_instance_get())
self.stubs.Set(db, 'instance_get_by_uuid',
fakes.fake_instance_get())
req = webob.Request.blank('/v3/servers/1')
req.headers['Content-Type'] = 'application/json'
response = req.get_response(self.app)
self.assertEqual(response.status_int, 200)
res_dict = jsonutils.loads(response.body)
self.assertIn('key_name', res_dict['server'])
self.assertEqual(res_dict['server']['key_name'], '')
def test_detail_servers(self):
self.stubs.Set(db, 'instance_get_all_by_filters',
fakes.fake_instance_get_all_by_filters())
self.stubs.Set(db, 'instance_get_by_uuid', fakes.fake_instance_get())
req = webob.Request.blank('/v3/servers/detail')
res = req.get_response(self.app)
server_dicts = jsonutils.loads(res.body)['servers']
self.assertEqual(len(server_dicts), 5)
for server_dict in server_dicts:
self.assertIn('key_name', server_dict)
self.assertEqual(server_dict['key_name'], '')
def test_keypair_create_with_invalid_keypair_body(self):
body = {'alpha': {'name': 'create_test'}}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
jsonutils.loads(res.body)
self.assertEqual(res.status_int, 400)
class KeypairPolicyTest(test.TestCase):
def setUp(self):
super(KeypairPolicyTest, self).setUp()
self.KeyPairController = keypairs.KeypairController()
def _db_key_pair_get(context, user_id, name):
return dict(test_keypair.fake_keypair,
name='foo', public_key='XXX', fingerprint='YYY')
self.stubs.Set(db, "key_pair_get",
_db_key_pair_get)
self.stubs.Set(db, "key_pair_get_all_by_user",
db_key_pair_get_all_by_user)
self.stubs.Set(db, "key_pair_create",
db_key_pair_create)
self.stubs.Set(db, "key_pair_destroy",
db_key_pair_destroy)
def test_keypair_list_fail_policy(self):
rules = {'compute_extension:v3:keypairs:index':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs')
self.assertRaises(exception.Forbidden,
self.KeyPairController.index,
req)
def test_keypair_list_pass_policy(self):
rules = {'compute_extension:v3:keypairs:index':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs')
res = self.KeyPairController.index(req)
self.assertIn('keypairs', res)
def test_keypair_show_fail_policy(self):
rules = {'compute_extension:v3:keypairs:show':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs/FAKE')
self.assertRaises(exception.Forbidden,
self.KeyPairController.show,
req, 'FAKE')
def test_keypair_show_pass_policy(self):
rules = {'compute_extension:v3:keypairs:show':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs/FAKE')
res = self.KeyPairController.show(req, 'FAKE')
self.assertIn('keypair', res)
def test_keypair_create_fail_policy(self):
rules = {'compute_extension:v3:keypairs:create':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs')
req.method = 'POST'
self.assertRaises(exception.Forbidden,
self.KeyPairController.create,
req, body={'keypair': {'name': 'create_test'}})
def test_keypair_create_pass_policy(self):
body = {'keypair': {'name': 'create_test'}}
rules = {'compute_extension:v3:keypairs:create':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs')
req.method = 'POST'
res = self.KeyPairController.create(req, body=body)
self.assertIn('keypair', res)
def test_keypair_delete_fail_policy(self):
rules = {'compute_extension:v3:keypairs:delete':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs/FAKE')
req.method = 'DELETE'
self.assertRaises(exception.Forbidden,
self.KeyPairController.delete,
req, 'FAKE')
def test_keypair_delete_pass_policy(self):
rules = {'compute_extension:v3:keypairs:delete':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs/FAKE')
req.method = 'DELETE'
self.assertIsNone(self.KeyPairController.delete(req, 'FAKE'))

View File

@ -232,11 +232,11 @@ policy_data = """
"compute_extension:keypairs:create": "",
"compute_extension:keypairs:delete": "",
"compute_extension:v3:keypairs": "",
"compute_extension:v3:keypairs:index": "",
"compute_extension:v3:keypairs:show": "",
"compute_extension:v3:keypairs:create": "",
"compute_extension:v3:keypairs:delete": "",
"compute_extension:v3:os-keypairs": "",
"compute_extension:v3:os-keypairs:index": "",
"compute_extension:v3:os-keypairs:show": "",
"compute_extension:v3:os-keypairs:create": "",
"compute_extension:v3:os-keypairs:delete": "",
"compute_extension:v3:os-lock-server:lock": "",
"compute_extension:v3:os-lock-server:unlock": "",
"compute_extension:v3:os-migrate-server:migrate": "",

View File

@ -2,6 +2,12 @@
"keypair": {
"public_key": "%(public_key)s",
"name": "%(keypair_name)s",
"fingerprint": "%(fingerprint)s"
"fingerprint": "%(fingerprint)s",
"user_id": "fake",
"deleted": false,
"created_at": "%(strtime)s",
"updated_at": null,
"deleted_at": null,
"id": 1
}
}

View File

@ -28,11 +28,11 @@ class KeyPairsSampleJsonTest(api_sample_base.ApiSampleTestBaseV3):
def test_keypairs_post(self, public_key=None):
"""Get api sample of key pairs post request."""
key_name = 'keypair-' + str(uuid.uuid4())
response = self._do_post('keypairs', 'keypairs-post-req',
response = self._do_post('os-keypairs', 'keypairs-post-req',
{'keypair_name': key_name})
subs = self._get_regexes()
subs['keypair_name'] = '(%s)' % key_name
self._verify_response('keypairs-post-resp', subs, response, 201)
self._verify_response('keypairs-post-resp', subs, response, 200)
# NOTE(maurosr): return the key_name is necessary cause the
# verification returns the label of the last compared information in
# the response, not necessarily the key name.
@ -49,16 +49,16 @@ class KeyPairsSampleJsonTest(api_sample_base.ApiSampleTestBaseV3):
"9FhY+2YiUkpwFOcLImyrxEsYXpD/0d3ac30bNH6Sw9JD9UZHYc"
"pSxsIbECHw== Generated-by-Nova"
}
response = self._do_post('keypairs', 'keypairs-import-post-req',
response = self._do_post('os-keypairs', 'keypairs-import-post-req',
subs)
subs = self._get_regexes()
subs['keypair_name'] = '(%s)' % key_name
self._verify_response('keypairs-import-post-resp', subs, response, 201)
self._verify_response('keypairs-import-post-resp', subs, response, 200)
def test_keypairs_list(self):
# Get api sample of key pairs list request.
key_name = self.test_keypairs_post()
response = self._do_get('keypairs')
response = self._do_get('os-keypairs')
subs = self._get_regexes()
subs['keypair_name'] = '(%s)' % key_name
self._verify_response('keypairs-list-resp', subs, response, 200)
@ -66,7 +66,7 @@ class KeyPairsSampleJsonTest(api_sample_base.ApiSampleTestBaseV3):
def test_keypairs_get(self):
# Get api sample of key pairs get request.
key_name = self.test_keypairs_post()
response = self._do_get('keypairs/%s' % key_name)
response = self._do_get('os-keypairs/%s' % key_name)
subs = self._get_regexes()
subs['keypair_name'] = '(%s)' % key_name
self._verify_response('keypairs-get-resp', subs, response, 200)