crypto: Add type hints

Nothing to complicated here, other than working around mypy's dislike of
changing variable types and inability to process 'if six.PY3'
conditional blocks because it doesn't run the code.

Part of blueprint add-emulated-virtual-tpm

Change-Id: I805eaa8b6fb55ce9cbc8f6b8b777af48302ba2ba
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane 2020-07-02 14:12:47 +01:00
parent 7e4d8afb95
commit 9ce6c0d8e9
2 changed files with 32 additions and 31 deletions

View File

@ -1,3 +1,4 @@
nova/crypto.py
nova/virt/driver.py nova/virt/driver.py
nova/virt/hardware.py nova/virt/hardware.py
nova/virt/libvirt/__init__.py nova/virt/libvirt/__init__.py

View File

@ -21,7 +21,9 @@ Includes root and intermediate CAs, SSH key_pairs and x509 certificates.
import base64 import base64
import binascii import binascii
import io
import os import os
import typing as ty
from cryptography.hazmat import backends from cryptography.hazmat import backends
from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives.asymmetric import padding
@ -31,7 +33,6 @@ from cryptography import x509
from oslo_concurrency import processutils from oslo_concurrency import processutils
from oslo_log import log as logging from oslo_log import log as logging
import paramiko import paramiko
import six
import nova.conf import nova.conf
from nova import exception from nova import exception
@ -44,7 +45,7 @@ LOG = logging.getLogger(__name__)
CONF = nova.conf.CONF CONF = nova.conf.CONF
def generate_fingerprint(public_key): def generate_fingerprint(public_key: str) -> str:
try: try:
pub_bytes = public_key.encode('utf-8') pub_bytes = public_key.encode('utf-8')
# Test that the given public_key string is a proper ssh key. The # Test that the given public_key string is a proper ssh key. The
@ -56,24 +57,22 @@ def generate_fingerprint(public_key):
digest = hashes.Hash(hashes.MD5(), backends.default_backend()) digest = hashes.Hash(hashes.MD5(), backends.default_backend())
digest.update(pub_data) digest.update(pub_data)
md5hash = digest.finalize() md5hash = digest.finalize()
raw_fp = binascii.hexlify(md5hash) raw_fp = binascii.hexlify(md5hash).decode('ascii')
if six.PY3:
raw_fp = raw_fp.decode('ascii')
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2])) return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
except Exception: except Exception:
raise exception.InvalidKeypair( raise exception.InvalidKeypair(
reason=_('failed to generate fingerprint')) reason=_('failed to generate fingerprint'))
def generate_x509_fingerprint(pem_key): def generate_x509_fingerprint(pem_key: ty.Union[bytes, str]) -> str:
try: try:
if isinstance(pem_key, six.text_type): if isinstance(pem_key, str):
pem_key = pem_key.encode('utf-8') pem_key = pem_key.encode('utf-8')
cert = x509.load_pem_x509_certificate( cert = x509.load_pem_x509_certificate(
pem_key, backends.default_backend()) pem_key, backends.default_backend())
raw_fp = binascii.hexlify(cert.fingerprint(hashes.SHA1())) raw_fp = binascii.hexlify(
if six.PY3: cert.fingerprint(hashes.SHA1())
raw_fp = raw_fp.decode('ascii') ).decode('ascii')
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2])) return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
except (ValueError, TypeError, binascii.Error) as ex: except (ValueError, TypeError, binascii.Error) as ex:
raise exception.InvalidKeypair( raise exception.InvalidKeypair(
@ -81,9 +80,9 @@ def generate_x509_fingerprint(pem_key):
'Error message: %s') % ex) 'Error message: %s') % ex)
def generate_key_pair(bits=2048): def generate_key_pair(bits: int = 2048) -> ty.Tuple[str, str, str]:
key = paramiko.RSAKey.generate(bits) key = paramiko.RSAKey.generate(bits)
keyout = six.StringIO() keyout = io.StringIO()
key.write_private_key(keyout) key.write_private_key(keyout)
private_key = keyout.getvalue() private_key = keyout.getvalue()
public_key = '%s %s Generated-by-Nova' % (key.get_name(), key.get_base64()) public_key = '%s %s Generated-by-Nova' % (key.get_name(), key.get_base64())
@ -91,12 +90,12 @@ def generate_key_pair(bits=2048):
return (private_key, public_key, fingerprint) return (private_key, public_key, fingerprint)
def ssh_encrypt_text(ssh_public_key, text): def ssh_encrypt_text(ssh_public_key: str, text: ty.Union[str, bytes]) -> bytes:
"""Encrypt text with an ssh public key. """Encrypt text with an ssh public key.
If text is a Unicode string, encode it to UTF-8. If text is a Unicode string, encode it to UTF-8.
""" """
if isinstance(text, six.text_type): if isinstance(text, str):
text = text.encode('utf-8') text = text.encode('utf-8')
try: try:
pub_bytes = ssh_public_key.encode('utf-8') pub_bytes = ssh_public_key.encode('utf-8')
@ -104,10 +103,13 @@ def ssh_encrypt_text(ssh_public_key, text):
pub_bytes, backends.default_backend()) pub_bytes, backends.default_backend())
return pub_key.encrypt(text, padding.PKCS1v15()) return pub_key.encrypt(text, padding.PKCS1v15())
except Exception as exc: except Exception as exc:
raise exception.EncryptionFailure(reason=six.text_type(exc)) raise exception.EncryptionFailure(reason=str(exc))
def generate_winrm_x509_cert(user_id, bits=2048): def generate_winrm_x509_cert(
user_id: str,
bits: int = 2048
) -> ty.Tuple[str, str, str]:
"""Generate a cert for passwordless auth for user in project.""" """Generate a cert for passwordless auth for user in project."""
subject = '/CN=%s' % user_id subject = '/CN=%s' % user_id
upn = '%s@localhost' % user_id upn = '%s@localhost' % user_id
@ -118,28 +120,26 @@ def generate_winrm_x509_cert(user_id, bits=2048):
_create_x509_openssl_config(conffile, upn) _create_x509_openssl_config(conffile, upn)
(certificate, _err) = processutils.execute( out, _ = processutils.execute(
'openssl', 'req', '-x509', '-nodes', '-days', '3650', 'openssl', 'req', '-x509', '-nodes', '-days', '3650',
'-config', conffile, '-newkey', 'rsa:%s' % bits, '-config', conffile, '-newkey', 'rsa:%s' % bits,
'-outform', 'PEM', '-keyout', keyfile, '-subj', subject, '-outform', 'PEM', '-keyout', keyfile, '-subj', subject,
'-extensions', 'v3_req_client', '-extensions', 'v3_req_client',
binary=True) binary=True)
(out, _err) = processutils.execute('openssl', 'pkcs12', '-export', certificate = out.decode('utf-8')
'-inkey', keyfile, '-password', 'pass:',
process_input=certificate,
binary=True)
private_key = base64.b64encode(out) out, _ = processutils.execute(
'openssl', 'pkcs12', '-export', '-inkey', keyfile, '-password',
'pass:', process_input=out, binary=True)
private_key = base64.b64encode(out).decode('ascii')
fingerprint = generate_x509_fingerprint(certificate) fingerprint = generate_x509_fingerprint(certificate)
if six.PY3:
private_key = private_key.decode('ascii')
certificate = certificate.decode('utf-8')
return (private_key, certificate, fingerprint) return (private_key, certificate, fingerprint)
def _create_x509_openssl_config(conffile, upn): def _create_x509_openssl_config(conffile: str, upn: str):
content = ("distinguished_name = req_distinguished_name\n" content = ("distinguished_name = req_distinguished_name\n"
"[req_distinguished_name]\n" "[req_distinguished_name]\n"
"[v3_req_client]\n" "[v3_req_client]\n"