Merge "Change v3 keypairs API to v2.1"

This commit is contained in:
Jenkins 2014-08-29 13:27:19 +00:00 committed by Gerrit Code Review
commit 8f49b1ed9f
9 changed files with 119 additions and 543 deletions

View File

@ -2,6 +2,12 @@
"keypair": {
"fingerprint": "44:fe:29:6e:23:14:b9:53:5b:65:82:58:1c:fe:5a:c3",
"name": "keypair-6638abdb-c4e8-407c-ba88-c8dd7cc3c4f1",
"public_key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC1HTrHCbb9NawNLSV8N6tSa8i637+EC2dA+lsdHHfQlT54t+N0nHhJPlKWDLhc579j87vp6RDFriFJ/smsTnDnf64O12z0kBaJpJPH2zXrBkZFK6q2rmxydURzX/z0yLSCP77SFJ0fdXWH2hMsAusflGyryHGX20n+mZK6mDrxVzGxEz228dwQ5G7Az5OoZDWygH2pqPvKjkifRw0jwUKf3BbkP0QvANACOk26cv16mNFpFJfI1N3OC5lUsZQtKGR01ptJoWijYKccqhkAKuo902tg/qup58J5kflNm7I61sy1mJon6SGqNUSfoQagqtBH6vd/tU1jnlwZ03uUroAL Generated-by-Nova\n"
"public_key": "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQC1HTrHCbb9NawNLSV8N6tSa8i637+EC2dA+lsdHHfQlT54t+N0nHhJPlKWDLhc579j87vp6RDFriFJ/smsTnDnf64O12z0kBaJpJPH2zXrBkZFK6q2rmxydURzX/z0yLSCP77SFJ0fdXWH2hMsAusflGyryHGX20n+mZK6mDrxVzGxEz228dwQ5G7Az5OoZDWygH2pqPvKjkifRw0jwUKf3BbkP0QvANACOk26cv16mNFpFJfI1N3OC5lUsZQtKGR01ptJoWijYKccqhkAKuo902tg/qup58J5kflNm7I61sy1mJon6SGqNUSfoQagqtBH6vd/tU1jnlwZ03uUroAL Generated-by-Nova\n",
"user_id": "fake",
"deleted": false,
"created_at": "2014-05-07T12:06:13.681238",
"updated_at": null,
"deleted_at": null,
"id": 1
}
}
}

View File

@ -175,12 +175,12 @@
"compute_extension:keypairs:show": "",
"compute_extension:keypairs:create": "",
"compute_extension:keypairs:delete": "",
"compute_extension:v3:keypairs:discoverable": "",
"compute_extension:v3:keypairs": "",
"compute_extension:v3:keypairs:index": "",
"compute_extension:v3:keypairs:show": "",
"compute_extension:v3:keypairs:create": "",
"compute_extension:v3:keypairs:delete": "",
"compute_extension:v3:os-keypairs:discoverable": "",
"compute_extension:v3:os-keypairs": "",
"compute_extension:v3:os-keypairs:index": "",
"compute_extension:v3:os-keypairs:show": "",
"compute_extension:v3:os-keypairs:create": "",
"compute_extension:v3:os-keypairs:delete": "",
"compute_extension:v3:os-lock-server:discoverable": "",
"compute_extension:v3:os-lock-server:lock": "rule:admin_or_owner",
"compute_extension:v3:os-lock-server:unlock": "rule:admin_or_owner",

View File

@ -69,7 +69,7 @@ API_V3_CORE_EXTENSIONS = set(['consoles',
'flavor-manage',
'flavors',
'ips',
'keypairs',
'os-keypairs',
'server-metadata',
'servers',
'versions'])

View File

@ -27,7 +27,7 @@ from nova import exception
from nova.i18n import _
ALIAS = 'keypairs'
ALIAS = 'os-keypairs'
authorize = extensions.extension_authorizer('compute', 'v3:' + ALIAS)
soft_authorize = extensions.soft_extension_authorizer('compute', 'v3:' + ALIAS)
@ -48,8 +48,10 @@ class KeypairController(object):
clean[attr] = keypair[attr]
return clean
# TODO(oomichi): Here should be 201(Created) instead of 200 by v2.1
# +microversions because the keypair creation finishes when returning
# a response.
@extensions.expected_errors((400, 403, 409))
@wsgi.response(201)
@validation.schema(keypairs.create)
def create(self, req, body):
"""Create or import keypair.
@ -92,7 +94,10 @@ class KeypairController(object):
except exception.KeyPairExists as exc:
raise webob.exc.HTTPConflict(explanation=exc.format_message())
@wsgi.response(204)
# TODO(oomichi): Here should be 204(No Content) instead of 202 by v2.1
# +microversions because the resource keypair has been deleted completely
# when returning a response.
@wsgi.response(202)
@extensions.expected_errors(404)
def delete(self, req, id):
"""Delete a keypair with a given name."""
@ -113,7 +118,10 @@ class KeypairController(object):
keypair = self.api.get_key_pair(context, context.user_id, id)
except exception.KeypairNotFound as exc:
raise webob.exc.HTTPNotFound(explanation=exc.format_message())
return {'keypair': self._filter_keypair(keypair)}
# TODO(oomichi): It is necessary to filter a response of keypair with
# _filter_keypair() when v2.1+microversions for implementing consistent
# behaviors in this keypair resource.
return {'keypair': keypair}
@extensions.expected_errors(())
def index(self, req):
@ -165,7 +173,7 @@ class Keypairs(extensions.V3APIExtensionBase):
def get_resources(self):
resources = [
extensions.ResourceExtension('keypairs',
extensions.ResourceExtension(ALIAS,
KeypairController())]
return resources

View File

@ -16,7 +16,8 @@
from lxml import etree
import webob
from nova.api.openstack.compute.contrib import keypairs
from nova.api.openstack.compute.contrib import keypairs as keypairs_v2
from nova.api.openstack.compute.plugins.v3 import keypairs as keypairs_v21
from nova.api.openstack import wsgi
from nova import db
from nova import exception
@ -60,11 +61,15 @@ def db_key_pair_create_duplicate(context, keypair):
raise exception.KeyPairExists(key_name=keypair.get('name', ''))
class KeypairsTest(test.TestCase):
class KeypairsTestV21(test.TestCase):
base_url = '/v3'
def _setup_app(self):
self.app = fakes.wsgi_app_v3(init_only=('os-keypairs', 'servers'))
self.app_server = self.app
def setUp(self):
super(KeypairsTest, self).setUp()
self.Controller = keypairs.Controller()
super(KeypairsTestV21, self).setUp()
fakes.stub_out_networking(self.stubs)
fakes.stub_out_rate_limiting(self.stubs)
@ -78,10 +83,10 @@ class KeypairsTest(test.TestCase):
osapi_compute_extension=[
'nova.api.openstack.compute.contrib.select_extensions'],
osapi_compute_ext_list=['Keypairs'])
self.app = fakes.wsgi_app(init_only=('os-keypairs',))
self._setup_app()
def test_keypair_list(self):
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 200)
res_dict = jsonutils.loads(res.body)
@ -90,7 +95,7 @@ class KeypairsTest(test.TestCase):
def test_keypair_create(self):
body = {'keypair': {'name': 'create_test'}}
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
@ -101,7 +106,7 @@ class KeypairsTest(test.TestCase):
self.assertTrue(len(res_dict['keypair']['private_key']) > 0)
def _test_keypair_create_bad_request_case(self, body):
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
@ -157,7 +162,7 @@ class KeypairsTest(test.TestCase):
},
}
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
@ -190,7 +195,7 @@ class KeypairsTest(test.TestCase):
},
}
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
@ -214,7 +219,7 @@ class KeypairsTest(test.TestCase):
},
}
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
@ -228,7 +233,7 @@ class KeypairsTest(test.TestCase):
def test_keypair_create_duplicate(self):
self.stubs.Set(db, "key_pair_create", db_key_pair_create_duplicate)
body = {'keypair': {'name': 'create_duplicate'}}
req = webob.Request.blank('/v2/fake/os-keypairs')
req = webob.Request.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
@ -240,14 +245,14 @@ class KeypairsTest(test.TestCase):
res_dict['conflictingRequest']['message'])
def test_keypair_delete(self):
req = webob.Request.blank('/v2/fake/os-keypairs/FAKE')
req = webob.Request.blank(self.base_url + '/os-keypairs/FAKE')
req.method = 'DELETE'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 202)
def test_keypair_get_keypair_not_found(self):
req = webob.Request.blank('/v2/fake/os-keypairs/DOESNOTEXIST')
req = webob.Request.blank(self.base_url + '/os-keypairs/DOESNOTEXIST')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
@ -258,7 +263,7 @@ class KeypairsTest(test.TestCase):
self.stubs.Set(db, "key_pair_get",
db_key_pair_get_not_found)
req = webob.Request.blank('/v2/fake/os-keypairs/WHAT')
req = webob.Request.blank(self.base_url + '/os-keypairs/WHAT')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
@ -270,7 +275,7 @@ class KeypairsTest(test.TestCase):
self.stubs.Set(db, "key_pair_get", _db_key_pair_get)
req = webob.Request.blank('/v2/fake/os-keypairs/FAKE')
req = webob.Request.blank(self.base_url + '/os-keypairs/FAKE')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
@ -287,7 +292,7 @@ class KeypairsTest(test.TestCase):
self.stubs.Set(db, "key_pair_get", _db_key_pair_get)
req = webob.Request.blank('/v2/fake/os-keypairs/FAKE')
req = webob.Request.blank(self.base_url + '/os-keypairs/FAKE')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
@ -298,9 +303,9 @@ class KeypairsTest(test.TestCase):
fakes.fake_instance_get())
self.stubs.Set(db, 'instance_get_by_uuid',
fakes.fake_instance_get())
req = webob.Request.blank('/v2/fake/servers/1')
req = webob.Request.blank(self.base_url + '/servers/1')
req.headers['Content-Type'] = 'application/json'
response = req.get_response(fakes.wsgi_app(init_only=('servers',)))
response = req.get_response(self.app_server)
self.assertEqual(response.status_int, 200)
res_dict = jsonutils.loads(response.body)
self.assertIn('key_name', res_dict['server'])
@ -309,8 +314,8 @@ class KeypairsTest(test.TestCase):
def test_detail_servers(self):
self.stubs.Set(db, 'instance_get_all_by_filters',
fakes.fake_instance_get_all_by_filters())
req = fakes.HTTPRequest.blank('/v2/fake/servers/detail')
res = req.get_response(fakes.wsgi_app(init_only=('servers',)))
req = fakes.HTTPRequest.blank(self.base_url + '/servers/detail')
res = req.get_response(self.app_server)
server_dicts = jsonutils.loads(res.body)['servers']
self.assertEqual(len(server_dicts), 5)
@ -319,11 +324,13 @@ class KeypairsTest(test.TestCase):
self.assertEqual(server_dict['key_name'], '')
class KeypairPolicyTest(test.TestCase):
class KeypairPolicyTestV21(test.TestCase):
KeyPairController = keypairs_v21.KeypairController()
policy_path = 'compute_extension:v3:os-keypairs'
base_url = '/v3'
def setUp(self):
super(KeypairPolicyTest, self).setUp()
self.KeyPairController = keypairs.KeypairController()
super(KeypairPolicyTestV21, self).setUp()
def _db_key_pair_get(context, user_id, name):
return dict(test_keypair.fake_keypair,
@ -339,78 +346,85 @@ class KeypairPolicyTest(test.TestCase):
db_key_pair_destroy)
def test_keypair_list_fail_policy(self):
rules = {'compute_extension:keypairs:index':
rules = {self.policy_path + ':index':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs')
self.assertRaises(exception.Forbidden,
self.KeyPairController.index,
req)
def test_keypair_list_pass_policy(self):
rules = {'compute_extension:keypairs:index':
rules = {self.policy_path + ':index':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs')
res = self.KeyPairController.index(req)
self.assertIn('keypairs', res)
def test_keypair_show_fail_policy(self):
rules = {'compute_extension:keypairs:show':
rules = {self.policy_path + ':show':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs/FAKE')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs/FAKE')
self.assertRaises(exception.Forbidden,
self.KeyPairController.show,
req, 'FAKE')
def test_keypair_show_pass_policy(self):
rules = {'compute_extension:keypairs:show':
rules = {self.policy_path + ':show':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs/FAKE')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs/FAKE')
res = self.KeyPairController.show(req, 'FAKE')
self.assertIn('keypair', res)
def test_keypair_create_fail_policy(self):
body = {'keypair': {'name': 'create_test'}}
rules = {'compute_extension:keypairs:create':
rules = {self.policy_path + ':create':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
self.assertRaises(exception.Forbidden,
self.KeyPairController.create,
req, body)
req, body=body)
def test_keypair_create_pass_policy(self):
body = {'keypair': {'name': 'create_test'}}
rules = {'compute_extension:keypairs:create':
rules = {self.policy_path + ':create':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs')
req.method = 'POST'
res = self.KeyPairController.create(req, body)
res = self.KeyPairController.create(req, body=body)
self.assertIn('keypair', res)
def test_keypair_delete_fail_policy(self):
rules = {'compute_extension:keypairs:delete':
rules = {self.policy_path + ':delete':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs/FAKE')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs/FAKE')
req.method = 'DELETE'
self.assertRaises(exception.Forbidden,
self.KeyPairController.delete,
req, 'FAKE')
def test_keypair_delete_pass_policy(self):
rules = {'compute_extension:keypairs:delete':
rules = {self.policy_path + ':delete':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequest.blank('/v2/fake/os-keypairs/FAKE')
req = fakes.HTTPRequest.blank(self.base_url + '/os-keypairs/FAKE')
req.method = 'DELETE'
res = self.KeyPairController.delete(req, 'FAKE')
self.assertEqual(res.status_int, 202)
# NOTE: on v2.1, http status code is set as wsgi_code of API
# method instead of status_int in a response object.
if isinstance(self.KeyPairController, keypairs_v21.KeypairController):
status_int = self.KeyPairController.delete.wsgi_code
else:
status_int = res.status_int
self.assertEqual(202, status_int)
class KeypairsXMLSerializerTest(test.TestCase):
@ -425,7 +439,7 @@ class KeypairsXMLSerializerTest(test.TestCase):
fingerprint='fake_fingerprint',
user_id='fake_user_id',
name='fake_key_name'))
serializer = keypairs.KeypairTemplate()
serializer = keypairs_v2.KeypairTemplate()
text = serializer.serialize(exemplar)
tree = etree.fromstring(text)
@ -445,7 +459,7 @@ class KeypairsXMLSerializerTest(test.TestCase):
name='key2_name',
public_key='key2_key',
fingerprint='key2_fingerprint'))])
serializer = keypairs.KeypairsTemplate()
serializer = keypairs_v2.KeypairsTemplate()
text = serializer.serialize(exemplar)
tree = etree.fromstring(text)
@ -469,3 +483,17 @@ class KeypairsXMLSerializerTest(test.TestCase):
result = self.deserializer.deserialize(intext)['body']
self.assertEqual(result, exemplar)
class KeypairsTestV2(KeypairsTestV21):
base_url = '/v2/fake'
def _setup_app(self):
self.app = fakes.wsgi_app(init_only=('os-keypairs',))
self.app_server = fakes.wsgi_app(init_only=('servers',))
class KeypairPolicyTestV2(KeypairPolicyTestV21):
KeyPairController = keypairs_v2.KeypairController()
policy_path = 'compute_extension:keypairs'
base_url = '/v2/fake'

View File

@ -1,472 +0,0 @@
# Copyright 2011 Eldar Nugaev
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import webob
from nova.api.openstack.compute.plugins.v3 import keypairs
from nova import db
from nova import exception
from nova.openstack.common import jsonutils
from nova.openstack.common import policy as common_policy
from nova import policy
from nova import quota
from nova import test
from nova.tests.api.openstack import fakes
from nova.tests.objects import test_keypair
QUOTAS = quota.QUOTAS
keypair_data = {
'public_key': 'FAKE_KEY',
'fingerprint': 'FAKE_FINGERPRINT',
}
def fake_keypair(name):
return dict(test_keypair.fake_keypair,
name=name, **keypair_data)
def db_key_pair_get_all_by_user(self, user_id):
return [fake_keypair('FAKE')]
def db_key_pair_create(self, keypair):
return fake_keypair(name=keypair['name'])
def db_key_pair_destroy(context, user_id, name):
if not (user_id and name):
raise Exception()
def db_key_pair_create_duplicate(context, keypair):
raise exception.KeyPairExists(key_name=keypair.get('name', ''))
class KeypairsTest(test.TestCase):
def setUp(self):
super(KeypairsTest, self).setUp()
self.Controller = keypairs.Controller()
fakes.stub_out_networking(self.stubs)
fakes.stub_out_rate_limiting(self.stubs)
self.stubs.Set(db, "key_pair_get_all_by_user",
db_key_pair_get_all_by_user)
self.stubs.Set(db, "key_pair_create",
db_key_pair_create)
self.stubs.Set(db, "key_pair_destroy",
db_key_pair_destroy)
self.flags(
osapi_compute_extension=[
'nova.api.openstack.compute.contrib.select_extensions'],
osapi_compute_ext_list=['Keypairs'])
self.app = fakes.wsgi_app_v3(init_only=('keypairs', 'servers'))
def test_keypair_list(self):
req = webob.Request.blank('/v3/keypairs')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 200)
res_dict = jsonutils.loads(res.body)
response = {'keypairs': [{'keypair': dict(keypair_data, name='FAKE')}]}
self.assertEqual(res_dict, response)
def test_keypair_create(self):
body = {'keypair': {'name': 'create_test'}}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 201)
res_dict = jsonutils.loads(res.body)
self.assertTrue(len(res_dict['keypair']['fingerprint']) > 0)
self.assertTrue(len(res_dict['keypair']['private_key']) > 0)
def test_keypair_create_without_keypair(self):
body = {'foo': None}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
jsonutils.loads(res.body)
def test_keypair_create_without_name(self):
body = {'keypair': {'public_key': 'public key'}}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
res_dict = jsonutils.loads(res.body)
self.assertEqual("Invalid input for field/attribute keypair. "
"Value: {u'public_key': u'public key'}. "
"'name' is a required property",
res_dict['badRequest']['message'])
def test_keypair_create_with_empty_name(self):
body = {'keypair': {'name': ''}}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
res_dict = jsonutils.loads(res.body)
self.assertEqual("Invalid input for field/attribute name. "
"Value: . u'' is too short",
res_dict['badRequest']['message'])
def test_keypair_create_with_name_too_long(self):
name = 'a' * 256
body = {
'keypair': {
'name': name
}
}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
res_dict = jsonutils.loads(res.body)
expected_message = "Invalid input for field/attribute name. "\
"Value: %s. u'%s' is too long" % (name, name)
self.assertEqual(expected_message, res_dict['badRequest']['message'])
def test_keypair_create_with_non_alphanumeric_name(self):
body = {
'keypair': {
'name': 'test/keypair'
}
}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
res_dict = jsonutils.loads(res.body)
self.assertEqual(res.status_int, 400)
res_dict = jsonutils.loads(res.body)
self.assertEqual(
"Invalid input for field/attribute name. Value: test/keypair. "
"u'test/keypair' does not match '^(?! )[a-zA-Z0-9. _-]*(?<! )$'",
res_dict['badRequest']['message'])
def test_keypair_import(self):
body = {
'keypair': {
'name': 'create_test',
'public_key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDBYIznA'
'x9D7118Q1VKGpXy2HDiKyUTM8XcUuhQpo0srqb9rboUp4'
'a9NmCwpWpeElDLuva707GOUnfaBAvHBwsRXyxHJjRaI6Y'
'Qj2oLJwqvaSaWUbyT1vtryRqy6J3TecN0WINY71f4uymi'
'MZP0wby4bKBcYnac8KiCIlvkEl0ETjkOGUq8OyWRmn7lj'
'j5SESEUdBP0JnuTFKddWTU/wD6wydeJaUhBTqOlHn0kX1'
'GyqoNTE1UEhcM5ZRWgfUZfTjVyDF2kGj3vJLCJtJ8LoGc'
'j7YaN4uPg1rBle+izwE/tLonRrds+cev8p6krSSrxWOwB'
'bHkXa6OciiJDvkRzJXzf',
},
}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 201)
# FIXME(ja): sholud we check that public_key was sent to create?
res_dict = jsonutils.loads(res.body)
self.assertTrue(len(res_dict['keypair']['fingerprint']) > 0)
self.assertNotIn('private_key', res_dict['keypair'])
def test_keypair_import_quota_limit(self):
def fake_quotas_count(self, context, resource, *args, **kwargs):
return 100
self.stubs.Set(QUOTAS, "count", fake_quotas_count)
body = {
'keypair': {
'name': 'create_test',
'public_key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDBYIznA'
'x9D7118Q1VKGpXy2HDiKyUTM8XcUuhQpo0srqb9rboUp4'
'a9NmCwpWpeElDLuva707GOUnfaBAvHBwsRXyxHJjRaI6Y'
'Qj2oLJwqvaSaWUbyT1vtryRqy6J3TecN0WINY71f4uymi'
'MZP0wby4bKBcYnac8KiCIlvkEl0ETjkOGUq8OyWRmn7lj'
'j5SESEUdBP0JnuTFKddWTU/wD6wydeJaUhBTqOlHn0kX1'
'GyqoNTE1UEhcM5ZRWgfUZfTjVyDF2kGj3vJLCJtJ8LoGc'
'j7YaN4uPg1rBle+izwE/tLonRrds+cev8p6krSSrxWOwB'
'bHkXa6OciiJDvkRzJXzf',
},
}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 403)
res_dict = jsonutils.loads(res.body)
self.assertEqual(
"Quota exceeded, too many key pairs.",
res_dict['forbidden']['message'])
def test_keypair_create_quota_limit(self):
def fake_quotas_count(self, context, resource, *args, **kwargs):
return 100
self.stubs.Set(QUOTAS, "count", fake_quotas_count)
body = {
'keypair': {
'name': 'create_test',
},
}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 403)
res_dict = jsonutils.loads(res.body)
self.assertEqual(
"Quota exceeded, too many key pairs.",
res_dict['forbidden']['message'])
def test_keypair_create_duplicate(self):
self.stubs.Set(db, "key_pair_create", db_key_pair_create_duplicate)
body = {'keypair': {'name': 'create_duplicate'}}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 409)
res_dict = jsonutils.loads(res.body)
self.assertEqual(
"Key pair 'create_duplicate' already exists.",
res_dict['conflictingRequest']['message'])
def test_keypair_import_bad_key(self):
body = {
'keypair': {
'name': 'create_test',
'public_key': 'ssh-what negative',
},
}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 400)
res_dict = jsonutils.loads(res.body)
self.assertEqual(
'Keypair data is invalid: failed to generate fingerprint',
res_dict['badRequest']['message'])
def test_keypair_delete(self):
req = webob.Request.blank('/v3/keypairs/FAKE')
req.method = 'DELETE'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 204)
def test_keypair_get_keypair_not_found(self):
req = webob.Request.blank('/v3/keypairs/DOESNOTEXIST')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
def test_keypair_delete_not_found(self):
def db_key_pair_get_not_found(context, user_id, name):
raise exception.KeypairNotFound(user_id=user_id, name=name)
self.stubs.Set(db, "key_pair_get",
db_key_pair_get_not_found)
req = webob.Request.blank('/v3/keypairs/WHAT')
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
def test_keypair_show(self):
def _db_key_pair_get(context, user_id, name):
return dict(test_keypair.fake_keypair,
name='foo', public_key='XXX', fingerprint='YYY')
self.stubs.Set(db, "key_pair_get", _db_key_pair_get)
req = webob.Request.blank('/v3/keypairs/FAKE')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
res_dict = jsonutils.loads(res.body)
self.assertEqual(res.status_int, 200)
self.assertEqual('foo', res_dict['keypair']['name'])
self.assertEqual('XXX', res_dict['keypair']['public_key'])
self.assertEqual('YYY', res_dict['keypair']['fingerprint'])
def test_keypair_show_not_found(self):
def _db_key_pair_get(context, user_id, name):
raise exception.KeypairNotFound(user_id=user_id, name=name)
self.stubs.Set(db, "key_pair_get", _db_key_pair_get)
req = webob.Request.blank('/v3/keypairs/FAKE')
req.method = 'GET'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
self.assertEqual(res.status_int, 404)
def test_show_server(self):
self.stubs.Set(db, 'instance_get',
fakes.fake_instance_get())
self.stubs.Set(db, 'instance_get_by_uuid',
fakes.fake_instance_get())
req = webob.Request.blank('/v3/servers/1')
req.headers['Content-Type'] = 'application/json'
response = req.get_response(self.app)
self.assertEqual(response.status_int, 200)
res_dict = jsonutils.loads(response.body)
self.assertIn('key_name', res_dict['server'])
self.assertEqual(res_dict['server']['key_name'], '')
def test_detail_servers(self):
self.stubs.Set(db, 'instance_get_all_by_filters',
fakes.fake_instance_get_all_by_filters())
self.stubs.Set(db, 'instance_get_by_uuid', fakes.fake_instance_get())
req = webob.Request.blank('/v3/servers/detail')
res = req.get_response(self.app)
server_dicts = jsonutils.loads(res.body)['servers']
self.assertEqual(len(server_dicts), 5)
for server_dict in server_dicts:
self.assertIn('key_name', server_dict)
self.assertEqual(server_dict['key_name'], '')
def test_keypair_create_with_invalid_keypair_body(self):
body = {'alpha': {'name': 'create_test'}}
req = webob.Request.blank('/v3/keypairs')
req.method = 'POST'
req.body = jsonutils.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(self.app)
jsonutils.loads(res.body)
self.assertEqual(res.status_int, 400)
class KeypairPolicyTest(test.TestCase):
def setUp(self):
super(KeypairPolicyTest, self).setUp()
self.KeyPairController = keypairs.KeypairController()
def _db_key_pair_get(context, user_id, name):
return dict(test_keypair.fake_keypair,
name='foo', public_key='XXX', fingerprint='YYY')
self.stubs.Set(db, "key_pair_get",
_db_key_pair_get)
self.stubs.Set(db, "key_pair_get_all_by_user",
db_key_pair_get_all_by_user)
self.stubs.Set(db, "key_pair_create",
db_key_pair_create)
self.stubs.Set(db, "key_pair_destroy",
db_key_pair_destroy)
def test_keypair_list_fail_policy(self):
rules = {'compute_extension:v3:keypairs:index':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs')
self.assertRaises(exception.Forbidden,
self.KeyPairController.index,
req)
def test_keypair_list_pass_policy(self):
rules = {'compute_extension:v3:keypairs:index':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs')
res = self.KeyPairController.index(req)
self.assertIn('keypairs', res)
def test_keypair_show_fail_policy(self):
rules = {'compute_extension:v3:keypairs:show':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs/FAKE')
self.assertRaises(exception.Forbidden,
self.KeyPairController.show,
req, 'FAKE')
def test_keypair_show_pass_policy(self):
rules = {'compute_extension:v3:keypairs:show':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs/FAKE')
res = self.KeyPairController.show(req, 'FAKE')
self.assertIn('keypair', res)
def test_keypair_create_fail_policy(self):
rules = {'compute_extension:v3:keypairs:create':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs')
req.method = 'POST'
self.assertRaises(exception.Forbidden,
self.KeyPairController.create,
req, body={'keypair': {'name': 'create_test'}})
def test_keypair_create_pass_policy(self):
body = {'keypair': {'name': 'create_test'}}
rules = {'compute_extension:v3:keypairs:create':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs')
req.method = 'POST'
res = self.KeyPairController.create(req, body=body)
self.assertIn('keypair', res)
def test_keypair_delete_fail_policy(self):
rules = {'compute_extension:v3:keypairs:delete':
common_policy.parse_rule('role:admin')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs/FAKE')
req.method = 'DELETE'
self.assertRaises(exception.Forbidden,
self.KeyPairController.delete,
req, 'FAKE')
def test_keypair_delete_pass_policy(self):
rules = {'compute_extension:v3:keypairs:delete':
common_policy.parse_rule('')}
policy.set_rules(rules)
req = fakes.HTTPRequestV3.blank('/keypairs/FAKE')
req.method = 'DELETE'
self.assertIsNone(self.KeyPairController.delete(req, 'FAKE'))

View File

@ -232,11 +232,11 @@ policy_data = """
"compute_extension:keypairs:create": "",
"compute_extension:keypairs:delete": "",
"compute_extension:v3:keypairs": "",
"compute_extension:v3:keypairs:index": "",
"compute_extension:v3:keypairs:show": "",
"compute_extension:v3:keypairs:create": "",
"compute_extension:v3:keypairs:delete": "",
"compute_extension:v3:os-keypairs": "",
"compute_extension:v3:os-keypairs:index": "",
"compute_extension:v3:os-keypairs:show": "",
"compute_extension:v3:os-keypairs:create": "",
"compute_extension:v3:os-keypairs:delete": "",
"compute_extension:v3:os-lock-server:lock": "",
"compute_extension:v3:os-lock-server:unlock": "",
"compute_extension:v3:os-migrate-server:migrate": "",

View File

@ -2,6 +2,12 @@
"keypair": {
"public_key": "%(public_key)s",
"name": "%(keypair_name)s",
"fingerprint": "%(fingerprint)s"
"fingerprint": "%(fingerprint)s",
"user_id": "fake",
"deleted": false,
"created_at": "%(strtime)s",
"updated_at": null,
"deleted_at": null,
"id": 1
}
}

View File

@ -28,11 +28,11 @@ class KeyPairsSampleJsonTest(api_sample_base.ApiSampleTestBaseV3):
def test_keypairs_post(self, public_key=None):
"""Get api sample of key pairs post request."""
key_name = 'keypair-' + str(uuid.uuid4())
response = self._do_post('keypairs', 'keypairs-post-req',
response = self._do_post('os-keypairs', 'keypairs-post-req',
{'keypair_name': key_name})
subs = self._get_regexes()
subs['keypair_name'] = '(%s)' % key_name
self._verify_response('keypairs-post-resp', subs, response, 201)
self._verify_response('keypairs-post-resp', subs, response, 200)
# NOTE(maurosr): return the key_name is necessary cause the
# verification returns the label of the last compared information in
# the response, not necessarily the key name.
@ -49,16 +49,16 @@ class KeyPairsSampleJsonTest(api_sample_base.ApiSampleTestBaseV3):
"9FhY+2YiUkpwFOcLImyrxEsYXpD/0d3ac30bNH6Sw9JD9UZHYc"
"pSxsIbECHw== Generated-by-Nova"
}
response = self._do_post('keypairs', 'keypairs-import-post-req',
response = self._do_post('os-keypairs', 'keypairs-import-post-req',
subs)
subs = self._get_regexes()
subs['keypair_name'] = '(%s)' % key_name
self._verify_response('keypairs-import-post-resp', subs, response, 201)
self._verify_response('keypairs-import-post-resp', subs, response, 200)
def test_keypairs_list(self):
# Get api sample of key pairs list request.
key_name = self.test_keypairs_post()
response = self._do_get('keypairs')
response = self._do_get('os-keypairs')
subs = self._get_regexes()
subs['keypair_name'] = '(%s)' % key_name
self._verify_response('keypairs-list-resp', subs, response, 200)
@ -66,7 +66,7 @@ class KeyPairsSampleJsonTest(api_sample_base.ApiSampleTestBaseV3):
def test_keypairs_get(self):
# Get api sample of key pairs get request.
key_name = self.test_keypairs_post()
response = self._do_get('keypairs/%s' % key_name)
response = self._do_get('os-keypairs/%s' % key_name)
subs = self._get_regexes()
subs['keypair_name'] = '(%s)' % key_name
self._verify_response('keypairs-get-resp', subs, response, 200)