Adds support for x509 certificates as keypairs

Adds new parameter --key-type for novaclient for creating x509
certificates as keypairs. If no --key-type is specified, a ssh
keypair is created, for backwards compatibility.
Adds 'Type' column for keypair-list, displaying the keypair type.

This commit will have to merge after:
https://review.openstack.org/#/c/140313

Depends-On: I215662f2f92a01921a866c3218031787a9eaf915

Implements: blueprint keypair-x509-certificates

Co-Authored-By: Andrey Kurilin <andr.kurilin@gmail.com>
Co-Authored-By: Alex Xu <hejie.xu@intel.com>
Change-Id: I12bb13e24b660ffb6da0e5be275acbba7453d011
This commit is contained in:
Claudiu Belu 2014-06-26 09:30:10 -04:00 committed by Andrey Kurilin
parent 02889515a5
commit 4a812d953b
7 changed files with 315 additions and 23 deletions

View File

@ -20,4 +20,4 @@ from novaclient import api_versions
__version__ = pbr.version.VersionInfo('python-novaclient').version_string()
API_MIN_VERSION = api_versions.APIVersion("2.1")
API_MAX_VERSION = api_versions.APIVersion("2.1")
API_MAX_VERSION = api_versions.APIVersion("2.2")

View File

@ -0,0 +1,49 @@
# Copyright 2015 Cloudbase Solutions
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def get_x509_cert_and_fingerprint():
fingerprint = "a1:6f:6d:ea:a6:36:d0:3a:c6:eb:b6:ee:07:94:3e:2a:90:98:2b:c9"
certif = (
"-----BEGIN CERTIFICATE-----\n"
"MIIDIjCCAgqgAwIBAgIJAIE8EtWfZhhFMA0GCSqGSIb3DQEBCwUAMCQxIjAgBgNV\n"
"BAMTGWNsb3VkYmFzZS1pbml0LXVzZXItMTM1NTkwHhcNMTUwMTI5MTgyMzE4WhcN\n"
"MjUwMTI2MTgyMzE4WjAkMSIwIAYDVQQDExljbG91ZGJhc2UtaW5pdC11c2VyLTEz\n"
"NTU5MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEAv4lv95ofkXLIbALU\n"
"UEb1f949TYNMUvMGNnLyLgGOY+D61TNG7RZn85cRg9GVJ7KDjSLN3e3LwH5rgv5q\n"
"pU+nM/idSMhG0CQ1lZeExTsMEJVT3bG7LoU5uJ2fJSf5+hA0oih2M7/Kap5ggHgF\n"
"h+h8MWvDC9Ih8x1aadkk/OEmJsTrziYm0C/V/FXPHEuXfZn8uDNKZ/tbyfI6hwEj\n"
"nLz5Zjgg29n6tIPYMrnLNDHScCwtNZOcnixmWzsxCt1bxsAEA/y9gXUT7xWUf52t\n"
"2+DGQbLYxo0PHjnPf3YnFXNavfTt+4c7ZdHhOQ6ZA8FGQ2LJHDHM1r2/8lK4ld2V\n"
"qgNTcQIDAQABo1cwVTATBgNVHSUEDDAKBggrBgEFBQcDAjA+BgNVHREENzA1oDMG\n"
"CisGAQQBgjcUAgOgJQwjY2xvdWRiYXNlLWluaXQtdXNlci0xMzU1OUBsb2NhbGhv\n"
"c3QwDQYJKoZIhvcNAQELBQADggEBAHHX/ZUOMR0ZggQnfXuXLIHWlffVxxLOV/bE\n"
"7JC/dtedHqi9iw6sRT5R6G1pJo0xKWr2yJVDH6nC7pfxCFkby0WgVuTjiu6iNRg2\n"
"4zNJd8TGrTU+Mst+PPJFgsxrAY6vjwiaUtvZ/k8PsphHXu4ON+oLurtVDVgog7Vm\n"
"fQCShx434OeJj1u8pb7o2WyYS5nDVrHBhlCAqVf2JPKu9zY+i9gOG2kimJwH7fJD\n"
"xXpMIwAQ+flwlHR7OrE0L8TNcWwKPRAY4EPcXrT+cWo1k6aTqZDSK54ygW2iWtni\n"
"ZBcstxwcB4GIwnp1DrPW9L2gw5eLe1Sl6wdz443TW8K/KPV9rWQ=\n"
"-----END CERTIFICATE-----\n")
return certif, fingerprint
def get_ssh_pub_key_and_fingerprint():
fingerprint = "1e:2c:9b:56:79:4b:45:77:f9:ca:7a:98:2c:b0:d5:3c"
public_key = ("ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAAAgQDx8nkQv/zgGg"
"B4rMYmIf+6A4l6Rr+o/6lHBQdW5aYd44bd8JttDCE/F/pNRr0l"
"RE+PiqSPO8nDPHw0010JeMH9gYgnnFlyY3/OcJ02RhIPyyxYpv"
"9FhY+2YiUkpwFOcLImyrxEsYXpD/0d3ac30bNH6Sw9JD9UZHYc"
"pSxsIbECHw== Generated-by-Nova")
return public_key, fingerprint

View File

@ -0,0 +1,125 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import tempfile
import uuid
from tempest_lib import exceptions
from novaclient.tests.functional import base
from novaclient.tests.functional import fake_crypto
class TestKeypairsNovaClient(base.ClientTestBase):
"""Keypairs functional tests.
"""
def _serialize_kwargs(self, kwargs):
kwargs_pairs = ['--%(key)s %(val)s' % {'key': key.replace('_', '-'),
'val': val}
for key, val in kwargs.items()]
return " ".join(kwargs_pairs)
def _create_keypair(self, **kwargs):
key_name = self._raw_create_keypair(**kwargs)
self.addCleanup(self.nova, 'keypair-delete %s' % key_name)
return key_name
def _raw_create_keypair(self, **kwargs):
key_name = 'keypair-' + str(uuid.uuid4())
kwargs_str = self._serialize_kwargs(kwargs)
self.nova('keypair-add %s %s' % (kwargs_str, key_name))
return key_name
def _show_keypair(self, key_name):
return self.nova('keypair-show %s' % key_name)
def _list_keypairs(self):
return self.nova('keypair-list')
def _delete_keypair(self, key_name):
self.nova('keypair-delete %s' % key_name)
def _create_public_key_file(self, public_key):
pubfile = tempfile.mkstemp()[1]
with open(pubfile, 'w') as f:
f.write(public_key)
return pubfile
def test_create_keypair(self):
key_name = self._create_keypair()
keypair = self._show_keypair(key_name)
self.assertIn(key_name, keypair)
return keypair
def _test_import_keypair(self, fingerprint, **create_kwargs):
key_name = self._create_keypair(**create_kwargs)
keypair = self._show_keypair(key_name)
self.assertIn(key_name, keypair)
self.assertIn(fingerprint, keypair)
return keypair
def test_import_keypair(self):
pub_key, fingerprint = fake_crypto.get_ssh_pub_key_and_fingerprint()
pub_key_file = self._create_public_key_file(pub_key)
self._test_import_keypair(fingerprint, pub_key=pub_key_file)
def test_list_keypair(self):
key_name = self._create_keypair()
keypairs = self._list_keypairs()
self.assertIn(key_name, keypairs)
def test_delete_keypair(self):
key_name = self._raw_create_keypair()
keypair = self._show_keypair(key_name)
self.assertIsNotNone(keypair)
self._delete_keypair(key_name)
# keypair-show should fail if no keypair with given name is found.
self.assertRaises(exceptions.CommandFailed,
self._show_keypair, key_name)
class TestKeypairsNovaClientV22(TestKeypairsNovaClient):
"""Keypairs functional tests for v2.2 nova-api microversion.
"""
def nova(self, *args, **kwargs):
return self.cli_clients.nova(flags='--os-compute-api-version 2.2 '
'--service-type computev21',
*args, **kwargs)
def test_create_keypair(self):
keypair = super(TestKeypairsNovaClientV22, self).test_create_keypair()
self.assertIn('ssh', keypair)
def test_create_keypair_x509(self):
key_name = self._create_keypair(key_type='x509')
keypair = self._show_keypair(key_name)
self.assertIn(key_name, keypair)
self.assertIn('x509', keypair)
def test_import_keypair(self):
pub_key, fingerprint = fake_crypto.get_ssh_pub_key_and_fingerprint()
pub_key_file = self._create_public_key_file(pub_key)
keypair = self._test_import_keypair(fingerprint, pub_key=pub_key_file)
self.assertIn('ssh', keypair)
def test_import_keypair_x509(self):
certif, fingerprint = fake_crypto.get_x509_cert_and_fingerprint()
pub_key_file = self._create_public_key_file(certif)
keypair = self._test_import_keypair(fingerprint, key_type='x509',
pub_key=pub_key_file)
self.assertIn('x509', keypair)

View File

@ -11,6 +11,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from novaclient import api_versions
from novaclient.tests.unit.fixture_data import client
from novaclient.tests.unit.fixture_data import keypairs as data
from novaclient.tests.unit import utils
@ -54,12 +55,49 @@ class KeypairsTest(utils.FixturedTestCase):
self.cs.keypairs.delete(kp)
self.assert_called('DELETE', '/%s/test' % self.keypair_prefix)
class KeypairsV2TestCase(KeypairsTest):
def setUp(self):
super(KeypairsV2TestCase, self).setUp()
self.cs.api_version = api_versions.APIVersion("2.0")
def test_create_keypair(self):
kp = self.cs.keypairs.create("foo")
self.assert_called('POST', '/%s' % self.keypair_prefix)
name = "foo"
kp = self.cs.keypairs.create(name)
self.assert_called('POST', '/%s' % self.keypair_prefix,
body={'keypair': {'name': name}})
self.assertIsInstance(kp, keypairs.Keypair)
def test_import_keypair(self):
kp = self.cs.keypairs.create("foo", "fake-public-key")
self.assert_called('POST', '/%s' % self.keypair_prefix)
name = "foo"
pub_key = "fake-public-key"
kp = self.cs.keypairs.create(name, pub_key)
self.assert_called('POST', '/%s' % self.keypair_prefix,
body={'keypair': {'name': name,
'public_key': pub_key}})
self.assertIsInstance(kp, keypairs.Keypair)
class KeypairsV22TestCase(KeypairsTest):
def setUp(self):
super(KeypairsV22TestCase, self).setUp()
self.cs.api_version = api_versions.APIVersion("2.2")
def test_create_keypair(self):
name = "foo"
key_type = "some_type"
kp = self.cs.keypairs.create(name, key_type=key_type)
self.assert_called('POST', '/%s' % self.keypair_prefix,
body={'keypair': {'name': name,
'type': key_type}})
self.assertIsInstance(kp, keypairs.Keypair)
def test_import_keypair(self):
name = "foo"
pub_key = "fake-public-key"
kp = self.cs.keypairs.create(name, pub_key)
self.assert_called('POST', '/%s' % self.keypair_prefix,
body={'keypair': {'name': name,
'public_key': pub_key,
'type': 'ssh'}})
self.assertIsInstance(kp, keypairs.Keypair)

View File

@ -75,11 +75,15 @@ class ShellTest(utils.TestCase):
@mock.patch('sys.stdout', new_callable=six.StringIO)
@mock.patch('sys.stderr', new_callable=six.StringIO)
def run_command(self, cmd, mock_stderr, mock_stdout):
def run_command(self, cmd, mock_stderr, mock_stdout, api_version=None):
version_options = []
if api_version:
version_options.extend(["--os-compute-api-version", api_version,
"--service-type", "computev21"])
if isinstance(cmd, list):
self.shell.main(cmd)
self.shell.main(version_options + cmd)
else:
self.shell.main(cmd.split())
self.shell.main(version_options + cmd.split())
return mock_stdout.getvalue(), mock_stderr.getvalue()
def assert_called(self, method, url, body=None, **kwargs):
@ -2361,28 +2365,61 @@ class ShellTest(utils.TestCase):
self.run_command,
"ssh --ipv6 --network nonexistent server")
def test_keypair_add(self):
self.run_command('keypair-add test')
self.assert_called('POST', '/os-keypairs',
{'keypair':
{'name': 'test'}})
def _check_keypair_add(self, expected_key_type=None, extra_args='',
api_version=None):
self.run_command("keypair-add %s test" % extra_args,
api_version=api_version)
expected_body = {"keypair": {"name": "test"}}
if expected_key_type:
expected_body["keypair"]["type"] = expected_key_type
self.assert_called("POST", "/os-keypairs", expected_body)
def test_keypair_import(self):
def test_keypair_add_v20(self):
self._check_keypair_add(api_version="2.0")
def test_keypair_add_v22(self):
self._check_keypair_add('ssh', api_version="2.2")
def test_keypair_add_ssh(self):
self._check_keypair_add('ssh', '--key-type ssh', api_version="2.2")
def test_keypair_add_ssh_x509(self):
self._check_keypair_add('x509', '--key-type x509', api_version="2.2")
def _check_keypair_import(self, expected_key_type=None, extra_args='',
api_version=None):
with mock.patch.object(builtins, 'open',
mock.mock_open(read_data='FAKE_PUBLIC_KEY')):
self.run_command('keypair-add --pub-key test.pub test')
self.run_command('keypair-add --pub-key test.pub %s test' %
extra_args, api_version=api_version)
expected_body = {"keypair": {'public_key': 'FAKE_PUBLIC_KEY',
'name': 'test'}}
if expected_key_type:
expected_body["keypair"]["type"] = expected_key_type
self.assert_called(
'POST', '/os-keypairs', {
'keypair': {'public_key': 'FAKE_PUBLIC_KEY',
'name': 'test'}})
'POST', '/os-keypairs', expected_body)
def test_keypair_import_v20(self):
self._check_keypair_import(api_version="2.0")
def test_keypair_import_v22(self):
self._check_keypair_import('ssh', api_version="2.2")
def test_keypair_import_ssh(self):
self._check_keypair_import('ssh', '--key-type ssh', api_version="2.2")
def test_keypair_import_x509(self):
self._check_keypair_import('x509', '--key-type x509',
api_version="2.2")
def test_keypair_stdin(self):
with mock.patch('sys.stdin', six.StringIO('FAKE_PUBLIC_KEY')):
self.run_command('keypair-add --pub-key - test')
self.run_command('keypair-add --pub-key - test', api_version="2.2")
self.assert_called(
'POST', '/os-keypairs', {
'keypair':
{'public_key': 'FAKE_PUBLIC_KEY', 'name': 'test'}})
{'public_key': 'FAKE_PUBLIC_KEY', 'name': 'test',
'type': 'ssh'}})
def test_keypair_list(self):
self.run_command('keypair-list')

View File

@ -17,6 +17,7 @@
Keypair interface (1.1 extension).
"""
from novaclient import api_versions
from novaclient import base
@ -65,6 +66,7 @@ class KeypairManager(base.ManagerWithFind):
return self._get("/%s/%s" % (self.keypair_prefix, base.getid(keypair)),
"keypair")
@api_versions.wraps("2.0", "2.1")
def create(self, name, public_key=None):
"""
Create a keypair
@ -77,6 +79,21 @@ class KeypairManager(base.ManagerWithFind):
body['keypair']['public_key'] = public_key
return self._create('/%s' % self.keypair_prefix, body, 'keypair')
@api_versions.wraps("2.2")
def create(self, name, public_key=None, key_type="ssh"):
"""
Create a keypair
:param name: name for the keypair to create
:param public_key: existing public key to import
:param key_type: keypair type to create
"""
body = {'keypair': {'name': name,
'type': key_type}}
if public_key:
body['keypair']['public_key'] = public_key
return self._create('/%s' % self.keypair_prefix, body, 'keypair')
def delete(self, key):
"""
Delete a keypair

View File

@ -34,6 +34,7 @@ from oslo_utils import timeutils
from oslo_utils import uuidutils
import six
from novaclient import api_versions
from novaclient import client
from novaclient import exceptions
from novaclient.i18n import _
@ -2874,6 +2875,16 @@ def do_secgroup_delete_group_rule(cs, args):
raise exceptions.CommandError(_("Rule not found"))
@api_versions.wraps("2.0", "2.1")
def _keypair_create(cs, args, name, pub_key):
return cs.keypairs.create(name, pub_key)
@api_versions.wraps("2.2")
def _keypair_create(cs, args, name, pub_key):
return cs.keypairs.create(name, pub_key, key_type=args.key_type)
@cliutils.arg('name', metavar='<name>', help=_('Name of key.'))
@cliutils.arg(
'--pub-key',
@ -2883,11 +2894,16 @@ def do_secgroup_delete_group_rule(cs, args):
@cliutils.arg(
'--pub_key',
help=argparse.SUPPRESS)
@cliutils.arg(
'--key-type',
metavar='<key-type>',
default='ssh',
help=_('Keypair type. Can be ssh or x509.'),
start_version="2.2")
def do_keypair_add(cs, args):
"""Create a new key pair for use with servers."""
name = args.name
pub_key = args.pub_key
if pub_key:
if pub_key == '-':
pub_key = sys.stdin.read()
@ -2901,7 +2917,7 @@ def do_keypair_add(cs, args):
% {'key': pub_key, 'exc': e}
)
keypair = cs.keypairs.create(name, pub_key)
keypair = _keypair_create(cs, args, name, pub_key)
if not pub_key:
private_key = keypair.private_key
@ -2915,10 +2931,20 @@ def do_keypair_delete(cs, args):
cs.keypairs.delete(name)
@api_versions.wraps("2.0", "2.1")
def _get_keypairs_list_columns(cs, args):
return ['Name', 'Fingerprint']
@api_versions.wraps("2.2")
def _get_keypairs_list_columns(cs, args):
return ['Name', 'Type', 'Fingerprint']
def do_keypair_list(cs, args):
"""Print a list of keypairs for a user"""
keypairs = cs.keypairs.list()
columns = ['Name', 'Fingerprint']
columns = _get_keypairs_list_columns(cs, args)
utils.print_list(keypairs, columns)