diff --git a/mypy-files.txt b/mypy-files.txt index 181e207bdcfc..a2085f8f4a26 100644 --- a/mypy-files.txt +++ b/mypy-files.txt @@ -1,3 +1,4 @@ +nova/crypto.py nova/virt/driver.py nova/virt/hardware.py nova/virt/libvirt/__init__.py diff --git a/nova/crypto.py b/nova/crypto.py index f195bce4fa52..b927f0c255f5 100644 --- a/nova/crypto.py +++ b/nova/crypto.py @@ -21,7 +21,9 @@ Includes root and intermediate CAs, SSH key_pairs and x509 certificates. import base64 import binascii +import io import os +import typing as ty from cryptography.hazmat import backends from cryptography.hazmat.primitives.asymmetric import padding @@ -31,7 +33,6 @@ from cryptography import x509 from oslo_concurrency import processutils from oslo_log import log as logging import paramiko -import six import nova.conf from nova import exception @@ -44,7 +45,7 @@ LOG = logging.getLogger(__name__) CONF = nova.conf.CONF -def generate_fingerprint(public_key): +def generate_fingerprint(public_key: str) -> str: try: pub_bytes = public_key.encode('utf-8') # Test that the given public_key string is a proper ssh key. The @@ -56,24 +57,22 @@ def generate_fingerprint(public_key): digest = hashes.Hash(hashes.MD5(), backends.default_backend()) digest.update(pub_data) md5hash = digest.finalize() - raw_fp = binascii.hexlify(md5hash) - if six.PY3: - raw_fp = raw_fp.decode('ascii') + raw_fp = binascii.hexlify(md5hash).decode('ascii') return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2])) except Exception: raise exception.InvalidKeypair( reason=_('failed to generate fingerprint')) -def generate_x509_fingerprint(pem_key): +def generate_x509_fingerprint(pem_key: ty.Union[bytes, str]) -> str: try: - if isinstance(pem_key, six.text_type): + if isinstance(pem_key, str): pem_key = pem_key.encode('utf-8') cert = x509.load_pem_x509_certificate( pem_key, backends.default_backend()) - raw_fp = binascii.hexlify(cert.fingerprint(hashes.SHA1())) - if six.PY3: - raw_fp = raw_fp.decode('ascii') + raw_fp = binascii.hexlify( + cert.fingerprint(hashes.SHA1()) + ).decode('ascii') return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2])) except (ValueError, TypeError, binascii.Error) as ex: raise exception.InvalidKeypair( @@ -81,9 +80,9 @@ def generate_x509_fingerprint(pem_key): 'Error message: %s') % ex) -def generate_key_pair(bits=2048): +def generate_key_pair(bits: int = 2048) -> ty.Tuple[str, str, str]: key = paramiko.RSAKey.generate(bits) - keyout = six.StringIO() + keyout = io.StringIO() key.write_private_key(keyout) private_key = keyout.getvalue() public_key = '%s %s Generated-by-Nova' % (key.get_name(), key.get_base64()) @@ -91,12 +90,12 @@ def generate_key_pair(bits=2048): return (private_key, public_key, fingerprint) -def ssh_encrypt_text(ssh_public_key, text): +def ssh_encrypt_text(ssh_public_key: str, text: ty.Union[str, bytes]) -> bytes: """Encrypt text with an ssh public key. If text is a Unicode string, encode it to UTF-8. """ - if isinstance(text, six.text_type): + if isinstance(text, str): text = text.encode('utf-8') try: pub_bytes = ssh_public_key.encode('utf-8') @@ -104,10 +103,13 @@ def ssh_encrypt_text(ssh_public_key, text): pub_bytes, backends.default_backend()) return pub_key.encrypt(text, padding.PKCS1v15()) except Exception as exc: - raise exception.EncryptionFailure(reason=six.text_type(exc)) + raise exception.EncryptionFailure(reason=str(exc)) -def generate_winrm_x509_cert(user_id, bits=2048): +def generate_winrm_x509_cert( + user_id: str, + bits: int = 2048 +) -> ty.Tuple[str, str, str]: """Generate a cert for passwordless auth for user in project.""" subject = '/CN=%s' % user_id upn = '%s@localhost' % user_id @@ -118,28 +120,26 @@ def generate_winrm_x509_cert(user_id, bits=2048): _create_x509_openssl_config(conffile, upn) - (certificate, _err) = processutils.execute( - 'openssl', 'req', '-x509', '-nodes', '-days', '3650', - '-config', conffile, '-newkey', 'rsa:%s' % bits, - '-outform', 'PEM', '-keyout', keyfile, '-subj', subject, - '-extensions', 'v3_req_client', - binary=True) + out, _ = processutils.execute( + 'openssl', 'req', '-x509', '-nodes', '-days', '3650', + '-config', conffile, '-newkey', 'rsa:%s' % bits, + '-outform', 'PEM', '-keyout', keyfile, '-subj', subject, + '-extensions', 'v3_req_client', + binary=True) - (out, _err) = processutils.execute('openssl', 'pkcs12', '-export', - '-inkey', keyfile, '-password', 'pass:', - process_input=certificate, - binary=True) + certificate = out.decode('utf-8') - private_key = base64.b64encode(out) + out, _ = processutils.execute( + 'openssl', 'pkcs12', '-export', '-inkey', keyfile, '-password', + 'pass:', process_input=out, binary=True) + + private_key = base64.b64encode(out).decode('ascii') fingerprint = generate_x509_fingerprint(certificate) - if six.PY3: - private_key = private_key.decode('ascii') - certificate = certificate.decode('utf-8') return (private_key, certificate, fingerprint) -def _create_x509_openssl_config(conffile, upn): +def _create_x509_openssl_config(conffile: str, upn: str): content = ("distinguished_name = req_distinguished_name\n" "[req_distinguished_name]\n" "[v3_req_client]\n"