310 lines
11 KiB

# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Wrappers around standard crypto data elements.
Includes root and intermediate CAs, SSH key_pairs and x509 certificates.
import base64
import binascii
import io
import os
import typing as ty
from castellan.common import exception as castellan_exception
from castellan.common.objects import passphrase
from castellan import key_manager
from cryptography.hazmat import backends
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives import serialization
from cryptography import x509
from oslo_concurrency import processutils
from oslo_log import log as logging
from oslo_serialization import base64 as oslo_base64
from oslo_utils.secretutils import md5
import paramiko
import nova.conf
from nova import context as nova_context
from nova import exception
from nova.i18n import _
from nova import objects
from nova import utils
from nova.virt import block_device as driver_block_device
LOG = logging.getLogger(__name__)
CONF = nova.conf.CONF
_KEYMGR = None
def _get_key_manager():
global _KEYMGR
if _KEYMGR is None:
_KEYMGR = key_manager.API(configuration=CONF)
return _KEYMGR
def generate_fingerprint(public_key: str) -> str:
pub_bytes = public_key.encode('utf-8')
# Test that the given public_key string is a proper ssh key. The
# returned object is unused since pyca/cryptography does not have a
# fingerprint method.
pub_bytes, backends.default_backend())
pub_data = base64.b64decode(public_key.split(' ')[1])
raw_fp = md5(pub_data, usedforsecurity=False).hexdigest()
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
except Exception:
raise exception.InvalidKeypair(
reason=_('failed to generate fingerprint'))
def generate_x509_fingerprint(pem_key: ty.Union[bytes, str]) -> str:
if isinstance(pem_key, str):
pem_key = pem_key.encode('utf-8')
cert = x509.load_pem_x509_certificate(
pem_key, backends.default_backend())
raw_fp = binascii.hexlify(
return ':'.join(a + b for a, b in zip(raw_fp[::2], raw_fp[1::2]))
except (ValueError, TypeError, binascii.Error) as ex:
raise exception.InvalidKeypair(
reason=_('failed to generate X509 fingerprint. '
'Error message: %s') % ex)
def generate_key_pair(bits: int = 2048) -> ty.Tuple[str, str, str]:
key = paramiko.RSAKey.generate(bits)
keyout = io.StringIO()
private_key = keyout.getvalue()
public_key = '%s %s Generated-by-Nova' % (key.get_name(), key.get_base64())
fingerprint = generate_fingerprint(public_key)
return (private_key, public_key, fingerprint)
def ssh_encrypt_text(ssh_public_key: str, text: ty.Union[str, bytes]) -> bytes:
"""Encrypt text with an ssh public key.
If text is a Unicode string, encode it to UTF-8.
if isinstance(text, str):
text = text.encode('utf-8')
pub_bytes = ssh_public_key.encode('utf-8')
pub_key = serialization.load_ssh_public_key(
pub_bytes, backends.default_backend())
return pub_key.encrypt(text, padding.PKCS1v15())
except Exception as exc:
raise exception.EncryptionFailure(reason=str(exc))
def generate_winrm_x509_cert(
user_id: str,
bits: int = 2048
) -> ty.Tuple[str, str, str]:
"""Generate a cert for passwordless auth for user in project."""
subject = '/CN=%s' % user_id
upn = '%s@localhost' % user_id
with utils.tempdir() as tmpdir:
keyfile = os.path.abspath(os.path.join(tmpdir, 'temp.key'))
conffile = os.path.abspath(os.path.join(tmpdir, 'temp.conf'))
_create_x509_openssl_config(conffile, upn)
out, _ = processutils.execute(
'openssl', 'req', '-x509', '-nodes', '-days', '3650',
'-config', conffile, '-newkey', 'rsa:%s' % bits,
'-outform', 'PEM', '-keyout', keyfile, '-subj', subject,
'-extensions', 'v3_req_client',
certificate = out.decode('utf-8')
out, _ = processutils.execute(
'openssl', 'pkcs12', '-export', '-inkey', keyfile, '-password',
'pass:', process_input=out, binary=True)
private_key = base64.b64encode(out).decode('ascii')
fingerprint = generate_x509_fingerprint(certificate)
return (private_key, certificate, fingerprint)
def _create_x509_openssl_config(conffile: str, upn: str):
content = ("distinguished_name = req_distinguished_name\n"
"extendedKeyUsage = clientAuth\n"
"subjectAltName = otherName:"";UTF8:%s\n")
with open(conffile, 'w') as file:
file.write(content % upn)
def ensure_vtpm_secret(
context: nova_context.RequestContext,
instance: 'objects.Instance',
) -> ty.Tuple[str, str]:
"""Communicates with the key manager service to retrieve or create a secret
for an instance's emulated TPM.
When creating a secret, its UUID is saved to the instance's system_metadata
as ``vtpm_secret_uuid``.
:param context: Nova auth context.
:param instance: Instance object.
:return: A tuple comprising (secret_uuid, passphrase).
:raise: castellan_exception.ManagedObjectNotFoundError if communication
with the key manager API fails, or if a vtpm_secret_uuid was present in
the instance's system metadata but could not be found in the key
manager service.
key_mgr = _get_key_manager()
secret_uuid = instance.system_metadata.get('vtpm_secret_uuid')
if secret_uuid is not None:
# Try to retrieve the secret from the key manager
secret = key_mgr.get(context, secret_uuid)
# assert secret_uuid == ?
"Found existing vTPM secret with UUID %s.",
secret_uuid, instance=instance)
return, secret.get_encoded()
except castellan_exception.ManagedObjectNotFoundError:
"Despite being set on the instance, failed to find a vTPM "
"secret with UUID %s. This should only happen if the secret "
"was manually deleted from the key manager service. Your vTPM "
"is likely to be unrecoverable.",
secret_uuid, instance=instance)
# If we get here, the instance has no vtpm_secret_uuid. Create a new one
# and register it with the key manager.
secret = base64.b64encode(os.urandom(_VTPM_SECRET_BYTE_LENGTH))
# Castellan ManagedObject
cmo = passphrase.Passphrase(
secret, name="vTPM secret for instance %s" % instance.uuid)
secret_uuid =, cmo)
LOG.debug("Created vTPM secret with UUID %s",
secret_uuid, instance=instance)
instance.system_metadata['vtpm_secret_uuid'] = secret_uuid
return secret_uuid, secret
def delete_vtpm_secret(
context: nova_context.RequestContext,
instance: 'objects.Instance',
"""Communicates with the key manager service to destroy the secret for an
instance's emulated TPM.
This operation is idempotent: if the instance never had a vTPM secret, OR
if the secret has already been deleted, it is a no-op.
The ``vtpm_secret_uuid`` member of the instance's system_metadata is
cleared as a side effect of this method.
:param context: Nova auth context.
:param instance: Instance object.
:return: None
:raise: castellan_exception.ManagedObjectNotFoundError if communication
with the key manager API.
secret_uuid = instance.system_metadata.get('vtpm_secret_uuid')
if not secret_uuid:
key_mgr = _get_key_manager()
key_mgr.delete(context, secret_uuid)
LOG.debug("Deleted vTPM secret with UUID %s",
secret_uuid, instance=instance)
except castellan_exception.ManagedObjectNotFoundError:
LOG.debug("vTPM secret with UUID %s already deleted or never existed.",
secret_uuid, instance=instance)
del instance.system_metadata['vtpm_secret_uuid']
def create_encryption_secret(
context: nova_context.RequestContext,
instance: 'objects.Instance',
driver_bdm: 'driver_block_device.DriverBlockDevice',
for_detail: ty.Optional[str] = None,
# Use oslo.serialization to encode some random data as passphrase
secret = oslo_base64.encode_as_text(
if for_detail is None:
for_detail = f"instance {instance.uuid} BDM {driver_bdm['uuid']}"
secret_name = f'Ephemeral encryption secret for {for_detail}'
cmo = passphrase.Passphrase(secret, name=secret_name)
key_mgr = _get_key_manager()
secret_uuid =, cmo)
f'Created "{secret_name}" with UUID {secret_uuid}',
return secret_uuid, secret
def get_encryption_secret(
context: nova_context.RequestContext,
secret_uuid: str,
) -> ty.Optional[str]:
key_mgr = _get_key_manager()
key = key_mgr.get(context, secret_uuid)
LOG.debug(f"Retrieved secret with UUID {secret_uuid}")
return key.get_encoded()
except castellan_exception.ManagedObjectNotFoundError:
LOG.debug(f"Encryption secret with UUID {secret_uuid} was not found.")
return None
def delete_encryption_secret(
context: nova_context.RequestContext,
instance_uuid: str,
secret_uuid: str,
key_mgr = _get_key_manager()
key_mgr.delete(context, secret_uuid)
LOG.debug(f"Deleted secret with UUID {secret_uuid}",
except castellan_exception.ManagedObjectNotFoundError:
LOG.debug(f"Encryption secret with UUID {secret_uuid} already deleted "
"or never existed.", instance_uuid=instance_uuid)