added tests - list doesn't pass due to unicode issues

This commit is contained in:
Jesse Andrews 2011-08-09 13:00:13 -07:00
parent d5bcc6764b
commit 057b6ad650
3 changed files with 122 additions and 13 deletions

View File

@ -23,19 +23,20 @@ from nova import db
from nova import crypto
from nova import exception
from nova.api.openstack import extensions
import os
import shutil
import tempfile
class KeypairController(object):
""" Keypair API controller for the Openstack API """
# TODO(ja): the keypair crud logic should be in nova.compute.API?
# TODO(ja): both this file and nova.api.ec2.cloud.py have similar logic.
# move the common keypair logic to nova.compute.API?
def _gen_key(self):
"""
Generate a key
"""
# TODO(ja): crypto.generate_key_pair is currently a slow method
# and should probably be moved to a process pool?
private_key, public_key, fingerprint = crypto.generate_key_pair()
return {'private_key': private_key,
'public_key': public_key,
@ -52,7 +53,6 @@ class KeypairController(object):
params: keypair object with:
key_name (required) - string
fingerprint (optional) - string
public_key (optional) - string
"""
@ -72,8 +72,14 @@ class KeypairController(object):
# import if public_key is sent
if 'public_key' in params:
tmpdir = tempfile.mkdtemp()
fn = os.path.join(tmpdir, 'import.pub')
with open(fn, 'w') as pub:
pub.write(params['public_key'])
fingerprint = crypto.generate_fingerprint(fn)
shutil.rmtree(tmpdir)
keypair['public_key'] = params['public_key']
keypair['fingerprint'] = params.get('fingerprint', None)
keypair['fingerprint'] = fingerprint
else:
generated_key = self._gen_key()
keypair['private_key'] = generated_key['private_key']
@ -83,12 +89,12 @@ class KeypairController(object):
db.key_pair_create(context, keypair)
return {'keypair': keypair}
def delete(self, req, name):
def delete(self, req, id):
"""
Delete a keypair with a given name
"""
context = req.environ['nova.context']
db.key_pair_destroy(context, context.user_id, name)
db.key_pair_destroy(context, context.user_id, id)
return exc.HTTPAccepted()
def index(self, req):
@ -99,11 +105,11 @@ class KeypairController(object):
key_pairs = db.key_pair_get_all_by_user(context, context.user_id)
rval = []
for key_pair in key_pairs:
rval.append({
rval.append({'keypair': {
'name': key_pair['name'],
'key_name': key_pair['name'],
'fingerprint': key_pair['fingerprint'],
})
}})
return {'keypairs': rval}

View File

@ -104,6 +104,12 @@ def fetch_ca(project_id=None, chain=True):
return buffer
def generate_fingerprint(public_key):
(out, err) = utils.execute('ssh-keygen', '-q', '-l', '-f', public_key)
fingerprint = out.split(' ')[1]
return fingerprint
def generate_key_pair(bits=1024):
# what is the magic 65537?
@ -111,9 +117,7 @@ def generate_key_pair(bits=1024):
keyfile = os.path.join(tmpdir, 'temp')
utils.execute('ssh-keygen', '-q', '-b', bits, '-N', '',
'-f', keyfile)
(out, err) = utils.execute('ssh-keygen', '-q', '-l', '-f',
'%s.pub' % (keyfile))
fingerprint = out.split(' ')[1]
fingerprint = generate_fingerprint('%s.pub' % (keyfile))
private_key = open(keyfile).read()
public_key = open(keyfile + '.pub').read()

View File

@ -0,0 +1,99 @@
# Copyright 2011 Eldar Nugaev
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import json
import webob
from nova import context
from nova import db
from nova import test
from nova.api.openstack.contrib.keypairs import KeypairController
from nova.tests.api.openstack import fakes
def fake_keypair(name):
return {'public_key': 'FAKE_KEY',
'fingerprint': 'FAKE_FINGERPRINT',
'name': name}
def db_key_pair_get_all_by_user(self, user_id):
return [fake_keypair('FAKE')]
def db_key_pair_create(self, keypair):
pass
def db_key_pair_destroy(context, user_id, key_name):
if not (user_id and key_name):
raise Exception()
class KeypairsTest(test.TestCase):
def setUp(self):
super(KeypairsTest, self).setUp()
self.controller = KeypairController()
fakes.stub_out_networking(self.stubs)
fakes.stub_out_rate_limiting(self.stubs)
self.stubs.Set(db, "key_pair_get_all_by_user",
db_key_pair_get_all_by_user)
self.stubs.Set(db, "key_pair_create",
db_key_pair_create)
self.stubs.Set(db, "key_pair_destroy",
db_key_pair_destroy)
self.context = context.get_admin_context()
def test_keypair_list(self):
req = webob.Request.blank('/v1.1/os-keypairs')
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
res_dict = json.loads(res.body)
response = {'keypairs': [{'keypair': fake_keypair('FAKE')}]}
self.assertEqual(res_dict, response)
def test_keypair_create(self):
body = {'keypair': {'key_name': 'create_test'}}
req = webob.Request.blank('/v1.1/os-keypairs')
req.method = 'POST'
req.body = json.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
res_dict = json.loads(res.body)
self.assertTrue(len(res_dict['keypair']['fingerprint']) > 0)
self.assertTrue(len(res_dict['keypair']['private_key']) > 0)
def test_keypair_import(self):
body = {'keypair': {'key_name': 'create_test',
'public_key': 'ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDBYIznAx9D7118Q1VKGpXy2HDiKyUTM8XcUuhQpo0srqb9rboUp4a9NmCwpWpeElDLuva707GOUnfaBAvHBwsRXyxHJjRaI6YQj2oLJwqvaSaWUbyT1vtryRqy6J3TecN0WINY71f4uymiMZP0wby4bKBcYnac8KiCIlvkEl0ETjkOGUq8OyWRmn7ljj5SESEUdBP0JnuTFKddWTU/wD6wydeJaUhBTqOlHn0kX1GyqoNTE1UEhcM5ZRWgfUZfTjVyDF2kGj3vJLCJtJ8LoGcj7YaN4uPg1rBle+izwE/tLonRrds+cev8p6krSSrxWOwBbHkXa6OciiJDvkRzJXzf'}}
req = webob.Request.blank('/v1.1/os-keypairs')
req.method = 'POST'
req.body = json.dumps(body)
req.headers['Content-Type'] = 'application/json'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 200)
# FIXME(ja): sholud we check that public_key was sent to create?
res_dict = json.loads(res.body)
self.assertTrue(len(res_dict['keypair']['fingerprint']) > 0)
self.assertFalse('private_key' in res_dict['keypair'])
def test_keypair_delete(self):
req = webob.Request.blank('/v1.1/os-keypairs/FAKE')
req.method = 'DELETE'
req.headers['Content-Type'] = 'application/json'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(res.status_int, 202)