ddb2b028f1
These top-level functions were not called from anywhere but tests. - fetch_ca - ensure_ca_filesystem - fetch_crl - decrypt_text - revoke_certs_by_user - revoke_certs_by_project - revoke_certs_by_user_and_project - generate_x509_cert - generate_vpn_files These other functions are used by the above and are no longer used anywhere. - ca_folder - ca_path - key_path - crl_path - revoke_cert - _project_cert_subject - _user_cert_subject - _ensure_project_folder - sign_csr - _sign_csr Tests for these are removed as are a number of scripts found in 'nova/CA', which were only used by the aforementioned functions. Change-Id: Ie1dadc6bf935f777e0cd0c54a0a21b79545714c5
152 lines
5.4 KiB
Python
152 lines
5.4 KiB
Python
# Copyright 2010 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""Wrappers around standard crypto data elements.
|
|
|
|
Includes root and intermediate CAs, SSH key_pairs and x509 certificates.
|
|
"""
|
|
|
|
from __future__ import absolute_import
|
|
|
|
import base64
|
|
import binascii
|
|
import os
|
|
|
|
from cryptography.hazmat import backends
|
|
from cryptography.hazmat.primitives.asymmetric import padding
|
|
from cryptography.hazmat.primitives import hashes
|
|
from cryptography.hazmat.primitives import serialization
|
|
from cryptography import x509
|
|
from oslo_log import log as logging
|
|
import paramiko
|
|
import six
|
|
|
|
import nova.conf
|
|
from nova import exception
|
|
from nova.i18n import _
|
|
from nova import utils
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
CONF = nova.conf.CONF
|
|
|
|
|
|
def generate_fingerprint(public_key):
|
|
try:
|
|
pub_bytes = public_key.encode('utf-8')
|
|
# Test that the given public_key string is a proper ssh key. The
|
|
# returned object is unused since pyca/cryptography does not have a
|
|
# fingerprint method.
|
|
serialization.load_ssh_public_key(
|
|
pub_bytes, backends.default_backend())
|
|
pub_data = base64.b64decode(public_key.split(' ')[1])
|
|
digest = hashes.Hash(hashes.MD5(), backends.default_backend())
|
|
digest.update(pub_data)
|
|
md5hash = digest.finalize()
|
|
raw_fp = binascii.hexlify(md5hash)
|
|
if six.PY3:
|
|
raw_fp = raw_fp.decode('ascii')
|
|
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
|
|
except Exception:
|
|
raise exception.InvalidKeypair(
|
|
reason=_('failed to generate fingerprint'))
|
|
|
|
|
|
def generate_x509_fingerprint(pem_key):
|
|
try:
|
|
if isinstance(pem_key, six.text_type):
|
|
pem_key = pem_key.encode('utf-8')
|
|
cert = x509.load_pem_x509_certificate(
|
|
pem_key, backends.default_backend())
|
|
raw_fp = binascii.hexlify(cert.fingerprint(hashes.SHA1()))
|
|
if six.PY3:
|
|
raw_fp = raw_fp.decode('ascii')
|
|
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
|
|
except (ValueError, TypeError, binascii.Error) as ex:
|
|
raise exception.InvalidKeypair(
|
|
reason=_('failed to generate X509 fingerprint. '
|
|
'Error message: %s') % ex)
|
|
|
|
|
|
def generate_key_pair(bits=2048):
|
|
key = paramiko.RSAKey.generate(bits)
|
|
keyout = six.StringIO()
|
|
key.write_private_key(keyout)
|
|
private_key = keyout.getvalue()
|
|
public_key = '%s %s Generated-by-Nova' % (key.get_name(), key.get_base64())
|
|
fingerprint = generate_fingerprint(public_key)
|
|
return (private_key, public_key, fingerprint)
|
|
|
|
|
|
def ssh_encrypt_text(ssh_public_key, text):
|
|
"""Encrypt text with an ssh public key.
|
|
|
|
If text is a Unicode string, encode it to UTF-8.
|
|
"""
|
|
if isinstance(text, six.text_type):
|
|
text = text.encode('utf-8')
|
|
try:
|
|
pub_bytes = ssh_public_key.encode('utf-8')
|
|
pub_key = serialization.load_ssh_public_key(
|
|
pub_bytes, backends.default_backend())
|
|
return pub_key.encrypt(text, padding.PKCS1v15())
|
|
except Exception as exc:
|
|
raise exception.EncryptionFailure(reason=six.text_type(exc))
|
|
|
|
|
|
def generate_winrm_x509_cert(user_id, bits=2048):
|
|
"""Generate a cert for passwordless auth for user in project."""
|
|
subject = '/CN=%s' % user_id
|
|
upn = '%s@localhost' % user_id
|
|
|
|
with utils.tempdir() as tmpdir:
|
|
keyfile = os.path.abspath(os.path.join(tmpdir, 'temp.key'))
|
|
conffile = os.path.abspath(os.path.join(tmpdir, 'temp.conf'))
|
|
|
|
_create_x509_openssl_config(conffile, upn)
|
|
|
|
(certificate, _err) = utils.execute(
|
|
'openssl', 'req', '-x509', '-nodes', '-days', '3650',
|
|
'-config', conffile, '-newkey', 'rsa:%s' % bits,
|
|
'-outform', 'PEM', '-keyout', keyfile, '-subj', subject,
|
|
'-extensions', 'v3_req_client',
|
|
binary=True)
|
|
|
|
(out, _err) = utils.execute('openssl', 'pkcs12', '-export',
|
|
'-inkey', keyfile, '-password', 'pass:',
|
|
process_input=certificate,
|
|
binary=True)
|
|
|
|
private_key = base64.b64encode(out)
|
|
fingerprint = generate_x509_fingerprint(certificate)
|
|
if six.PY3:
|
|
private_key = private_key.decode('ascii')
|
|
certificate = certificate.decode('utf-8')
|
|
|
|
return (private_key, certificate, fingerprint)
|
|
|
|
|
|
def _create_x509_openssl_config(conffile, upn):
|
|
content = ("distinguished_name = req_distinguished_name\n"
|
|
"[req_distinguished_name]\n"
|
|
"[v3_req_client]\n"
|
|
"extendedKeyUsage = clientAuth\n"
|
|
"subjectAltName = otherName:""1.3.6.1.4.1.311.20.2.3;UTF8:%s\n")
|
|
|
|
with open(conffile, 'w') as file:
|
|
file.write(content % upn)
|