Merge "crypto: Add type hints"
This commit is contained in:
commit
e6a015e6e9
@ -1,3 +1,4 @@
|
||||
nova/crypto.py
|
||||
nova/virt/driver.py
|
||||
nova/virt/hardware.py
|
||||
nova/virt/libvirt/__init__.py
|
||||
|
@ -21,7 +21,9 @@ Includes root and intermediate CAs, SSH key_pairs and x509 certificates.
|
||||
|
||||
import base64
|
||||
import binascii
|
||||
import io
|
||||
import os
|
||||
import typing as ty
|
||||
|
||||
from cryptography.hazmat import backends
|
||||
from cryptography.hazmat.primitives.asymmetric import padding
|
||||
@ -31,7 +33,6 @@ from cryptography import x509
|
||||
from oslo_concurrency import processutils
|
||||
from oslo_log import log as logging
|
||||
import paramiko
|
||||
import six
|
||||
|
||||
import nova.conf
|
||||
from nova import exception
|
||||
@ -44,7 +45,7 @@ LOG = logging.getLogger(__name__)
|
||||
CONF = nova.conf.CONF
|
||||
|
||||
|
||||
def generate_fingerprint(public_key):
|
||||
def generate_fingerprint(public_key: str) -> str:
|
||||
try:
|
||||
pub_bytes = public_key.encode('utf-8')
|
||||
# Test that the given public_key string is a proper ssh key. The
|
||||
@ -56,24 +57,22 @@ def generate_fingerprint(public_key):
|
||||
digest = hashes.Hash(hashes.MD5(), backends.default_backend())
|
||||
digest.update(pub_data)
|
||||
md5hash = digest.finalize()
|
||||
raw_fp = binascii.hexlify(md5hash)
|
||||
if six.PY3:
|
||||
raw_fp = raw_fp.decode('ascii')
|
||||
raw_fp = binascii.hexlify(md5hash).decode('ascii')
|
||||
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
|
||||
except Exception:
|
||||
raise exception.InvalidKeypair(
|
||||
reason=_('failed to generate fingerprint'))
|
||||
|
||||
|
||||
def generate_x509_fingerprint(pem_key):
|
||||
def generate_x509_fingerprint(pem_key: ty.Union[bytes, str]) -> str:
|
||||
try:
|
||||
if isinstance(pem_key, six.text_type):
|
||||
if isinstance(pem_key, str):
|
||||
pem_key = pem_key.encode('utf-8')
|
||||
cert = x509.load_pem_x509_certificate(
|
||||
pem_key, backends.default_backend())
|
||||
raw_fp = binascii.hexlify(cert.fingerprint(hashes.SHA1()))
|
||||
if six.PY3:
|
||||
raw_fp = raw_fp.decode('ascii')
|
||||
raw_fp = binascii.hexlify(
|
||||
cert.fingerprint(hashes.SHA1())
|
||||
).decode('ascii')
|
||||
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
|
||||
except (ValueError, TypeError, binascii.Error) as ex:
|
||||
raise exception.InvalidKeypair(
|
||||
@ -81,9 +80,9 @@ def generate_x509_fingerprint(pem_key):
|
||||
'Error message: %s') % ex)
|
||||
|
||||
|
||||
def generate_key_pair(bits=2048):
|
||||
def generate_key_pair(bits: int = 2048) -> ty.Tuple[str, str, str]:
|
||||
key = paramiko.RSAKey.generate(bits)
|
||||
keyout = six.StringIO()
|
||||
keyout = io.StringIO()
|
||||
key.write_private_key(keyout)
|
||||
private_key = keyout.getvalue()
|
||||
public_key = '%s %s Generated-by-Nova' % (key.get_name(), key.get_base64())
|
||||
@ -91,12 +90,12 @@ def generate_key_pair(bits=2048):
|
||||
return (private_key, public_key, fingerprint)
|
||||
|
||||
|
||||
def ssh_encrypt_text(ssh_public_key, text):
|
||||
def ssh_encrypt_text(ssh_public_key: str, text: ty.Union[str, bytes]) -> bytes:
|
||||
"""Encrypt text with an ssh public key.
|
||||
|
||||
If text is a Unicode string, encode it to UTF-8.
|
||||
"""
|
||||
if isinstance(text, six.text_type):
|
||||
if isinstance(text, str):
|
||||
text = text.encode('utf-8')
|
||||
try:
|
||||
pub_bytes = ssh_public_key.encode('utf-8')
|
||||
@ -104,10 +103,13 @@ def ssh_encrypt_text(ssh_public_key, text):
|
||||
pub_bytes, backends.default_backend())
|
||||
return pub_key.encrypt(text, padding.PKCS1v15())
|
||||
except Exception as exc:
|
||||
raise exception.EncryptionFailure(reason=six.text_type(exc))
|
||||
raise exception.EncryptionFailure(reason=str(exc))
|
||||
|
||||
|
||||
def generate_winrm_x509_cert(user_id, bits=2048):
|
||||
def generate_winrm_x509_cert(
|
||||
user_id: str,
|
||||
bits: int = 2048
|
||||
) -> ty.Tuple[str, str, str]:
|
||||
"""Generate a cert for passwordless auth for user in project."""
|
||||
subject = '/CN=%s' % user_id
|
||||
upn = '%s@localhost' % user_id
|
||||
@ -118,28 +120,26 @@ def generate_winrm_x509_cert(user_id, bits=2048):
|
||||
|
||||
_create_x509_openssl_config(conffile, upn)
|
||||
|
||||
(certificate, _err) = processutils.execute(
|
||||
'openssl', 'req', '-x509', '-nodes', '-days', '3650',
|
||||
'-config', conffile, '-newkey', 'rsa:%s' % bits,
|
||||
'-outform', 'PEM', '-keyout', keyfile, '-subj', subject,
|
||||
'-extensions', 'v3_req_client',
|
||||
binary=True)
|
||||
out, _ = processutils.execute(
|
||||
'openssl', 'req', '-x509', '-nodes', '-days', '3650',
|
||||
'-config', conffile, '-newkey', 'rsa:%s' % bits,
|
||||
'-outform', 'PEM', '-keyout', keyfile, '-subj', subject,
|
||||
'-extensions', 'v3_req_client',
|
||||
binary=True)
|
||||
|
||||
(out, _err) = processutils.execute('openssl', 'pkcs12', '-export',
|
||||
'-inkey', keyfile, '-password', 'pass:',
|
||||
process_input=certificate,
|
||||
binary=True)
|
||||
certificate = out.decode('utf-8')
|
||||
|
||||
private_key = base64.b64encode(out)
|
||||
out, _ = processutils.execute(
|
||||
'openssl', 'pkcs12', '-export', '-inkey', keyfile, '-password',
|
||||
'pass:', process_input=out, binary=True)
|
||||
|
||||
private_key = base64.b64encode(out).decode('ascii')
|
||||
fingerprint = generate_x509_fingerprint(certificate)
|
||||
if six.PY3:
|
||||
private_key = private_key.decode('ascii')
|
||||
certificate = certificate.decode('utf-8')
|
||||
|
||||
return (private_key, certificate, fingerprint)
|
||||
|
||||
|
||||
def _create_x509_openssl_config(conffile, upn):
|
||||
def _create_x509_openssl_config(conffile: str, upn: str):
|
||||
content = ("distinguished_name = req_distinguished_name\n"
|
||||
"[req_distinguished_name]\n"
|
||||
"[v3_req_client]\n"
|
||||
|
Loading…
Reference in New Issue
Block a user