crypto: Add support for creating, destroying vTPM secrets

Provide a method to communicate with the configured key manager service
using Castellan, a generic key manager interface that supports multiple
backends including - but not limited to - Barbican. Once again, there's
nothing using this yet but tests, though this will change shortly.

Part of blueprint add-emulated-virtual-tpm

Change-Id: Iff6195d252b018f008bb9d137e4d80c54b70b2d1
Signed-off-by: Stephen Finucane <stephenfin@redhat.com>
This commit is contained in:
Stephen Finucane 2020-07-03 11:06:11 +01:00
parent 5550f86623
commit 6ac2287826
2 changed files with 237 additions and 0 deletions

View File

@ -25,6 +25,9 @@ import io
import os
import typing as ty
from castellan.common import exception as castellan_exception
from castellan.common.objects import passphrase
from castellan import key_manager
from cryptography.hazmat import backends
from cryptography.hazmat.primitives.asymmetric import padding
from cryptography.hazmat.primitives import hashes
@ -35,8 +38,10 @@ from oslo_log import log as logging
import paramiko
import nova.conf
from nova import context as nova_context
from nova import exception
from nova.i18n import _
from nova import objects
from nova import utils
@ -44,6 +49,17 @@ LOG = logging.getLogger(__name__)
CONF = nova.conf.CONF
_KEYMGR = None
_VTPM_SECRET_BYTE_LENGTH = 384
def _get_key_manager():
global _KEYMGR
if _KEYMGR is None:
_KEYMGR = key_manager.API(configuration=CONF)
return _KEYMGR
def generate_fingerprint(public_key: str) -> str:
try:
@ -148,3 +164,93 @@ def _create_x509_openssl_config(conffile: str, upn: str):
with open(conffile, 'w') as file:
file.write(content % upn)
def ensure_vtpm_secret(
context: nova_context.RequestContext,
instance: 'objects.Instance',
) -> ty.Tuple[str, str]:
"""Communicates with the key manager service to retrieve or create a secret
for an instance's emulated TPM.
When creating a secret, its UUID is saved to the instance's system_metadata
as ``vtpm_secret_uuid``.
:param context: Nova auth context.
:param instance: Instance object.
:return: A tuple comprising (secret_uuid, passphrase).
:raise: castellan_exception.ManagedObjectNotFoundError if communication
with the key manager API fails, or if a vtpm_secret_uuid was present in
the instance's system metadata but could not be found in the key
manager service.
"""
key_mgr = _get_key_manager()
secret_uuid = instance.system_metadata.get('vtpm_secret_uuid')
if secret_uuid is not None:
# Try to retrieve the secret from the key manager
try:
secret = key_mgr.get(context, secret_uuid)
# assert secret_uuid == secret.id ?
LOG.debug(
"Found existing vTPM secret with UUID %s.",
secret_uuid, instance=instance)
return secret.id, secret.get_encoded()
except castellan_exception.ManagedObjectNotFoundError:
LOG.warning(
"Despite being set on the instance, failed to find a vTPM "
"secret with UUID %s. This should only happen if the secret "
"was manually deleted from the key manager service. Your vTPM "
"is likely to be unrecoverable.",
secret_uuid, instance=instance)
raise
# If we get here, the instance has no vtpm_secret_uuid. Create a new one
# and register it with the key manager.
secret = base64.b64encode(os.urandom(_VTPM_SECRET_BYTE_LENGTH))
# Castellan ManagedObject
cmo = passphrase.Passphrase(
secret, name="vTPM secret for instance %s" % instance.uuid)
secret_uuid = key_mgr.store(context, cmo)
LOG.debug("Created vTPM secret with UUID %s",
secret_uuid, instance=instance)
instance.system_metadata['vtpm_secret_uuid'] = secret_uuid
instance.save()
return secret_uuid, secret
def delete_vtpm_secret(
context: nova_context.RequestContext,
instance: 'objects.Instance',
):
"""Communicates with the key manager service to destroy the secret for an
instance's emulated TPM.
This operation is idempotent: if the instance never had a vTPM secret, OR
if the secret has already been deleted, it is a no-op.
The ``vtpm_secret_uuid`` member of the instance's system_metadata is
cleared as a side effect of this method.
:param context: Nova auth context.
:param instance: Instance object.
:return: None
:raise: castellan_exception.ManagedObjectNotFoundError if communication
with the key manager API.
"""
secret_uuid = instance.system_metadata.get('vtpm_secret_uuid')
if not secret_uuid:
return
key_mgr = _get_key_manager()
try:
key_mgr.delete(context, secret_uuid)
LOG.debug("Deleted vTPM secret with UUID %s",
secret_uuid, instance=instance)
except castellan_exception.ManagedObjectNotFoundError:
LOG.debug("vTPM secret with UUID %s already deleted or never existed.",
secret_uuid, instance=instance)
del instance.system_metadata['vtpm_secret_uuid']
instance.save()

View File

@ -18,15 +18,19 @@ Tests for Crypto module.
import os
from castellan.common import exception as castellan_exception
from cryptography.hazmat import backends
from cryptography.hazmat.primitives import serialization
import mock
from oslo_concurrency import processutils
from oslo_utils.fixture import uuidsentinel as uuids
import paramiko
import six
from nova import context as nova_context
from nova import crypto
from nova import exception
from nova import objects
from nova import test
from nova import utils
@ -219,3 +223,130 @@ class KeyPairTest(test.NoDBTestCase):
(private_key, public_key, fingerprint) = crypto.generate_key_pair()
self.assertEqual(self.rsa_pub, public_key)
self.assertEqual(self.rsa_fp, fingerprint)
class FakePassphrase():
"""A fake castellan ManagedObject."""
def __init__(self):
self.id = 1
def get_encoded(self):
return b'foo'
class VTPMTest(test.NoDBTestCase):
def setUp(self):
super().setUp()
self.ctxt = nova_context.get_admin_context()
@mock.patch.object(crypto, '_get_key_manager')
def test_ensure_vtpm_secret(self, mock_get_manager):
"""Check behavior when instance already has an associated secret.
We should attempt to retrieve the details via castellan.
"""
instance = objects.Instance()
instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm}
passphrase = FakePassphrase()
mock_get_manager.return_value.get.return_value = passphrase
s_id, s_encoded = crypto.ensure_vtpm_secret(self.ctxt, instance)
mock_get_manager.return_value.get.assert_called_once_with(
self.ctxt, uuids.vtpm)
self.assertEqual(passphrase.id, s_id)
self.assertEqual(passphrase.get_encoded(), s_encoded)
@mock.patch.object(crypto, '_get_key_manager')
def test_ensure_vtpm_secret_error(self, mock_get_manager):
"""Check behavior when we fail to retrieve a secret via castellan.
We should bubble up the error.
"""
instance = objects.Instance()
instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm}
mock_get_manager.return_value.get.side_effect = (
castellan_exception.ManagedObjectNotFoundError(uuid=uuids.vtpm))
self.assertRaises(
castellan_exception.ManagedObjectNotFoundError,
crypto.ensure_vtpm_secret,
self.ctxt, instance)
@mock.patch('castellan.common.objects.passphrase.Passphrase')
@mock.patch.object(crypto, '_get_key_manager')
def test_ensure_vtpm_secret_no_secret(self, mock_get_manager, mock_pass):
"""Check behavior when instance has no associated vTPM secret.
We should create a new one.
"""
instance = objects.Instance()
instance.uuid = uuids.instance
instance.system_metadata = {}
mock_get_manager.return_value.store.return_value = uuids.secret
passphrase = FakePassphrase()
mock_pass.return_value = passphrase
with mock.patch.object(instance, 'save') as mock_save:
secret_uuid, _ = crypto.ensure_vtpm_secret(self.ctxt, instance)
mock_pass.assert_called_once_with(mock.ANY, name=mock.ANY)
mock_get_manager.return_value.store.assert_called_once_with(
self.ctxt, passphrase)
mock_save.assert_called_once()
self.assertEqual(uuids.secret, secret_uuid)
@mock.patch.object(crypto, '_get_key_manager')
def test_delete_vtpm_secret(self, mock_get_manager):
"""Check behavior when instance has an associated vTPM secret.
We should delete it.
"""
instance = objects.Instance()
instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm}
with mock.patch.object(instance, 'save') as mock_save:
crypto.delete_vtpm_secret(self.ctxt, instance)
self.assertNotIn('vtpm_secret_uuid', instance.system_metadata)
mock_save.assert_called_once()
mock_get_manager.assert_called_once()
mock_get_manager.return_value.delete.assert_called_once_with(
self.ctxt, uuids.vtpm,
)
@mock.patch.object(crypto, '_get_key_manager')
def test_delete_vtpm_secret_error(self, mock_get_manager):
"""Check behavior when we fail to retrieve the secret via castellan.
We should carry on and delete the reference from the instance.
"""
instance = objects.Instance()
instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm}
mock_get_manager.return_value.delete.side_effect = (
castellan_exception.ManagedObjectNotFoundError(uuid=uuids.vtpm))
with mock.patch.object(instance, 'save') as mock_save:
crypto.delete_vtpm_secret(self.ctxt, instance)
self.assertNotIn('vtpm_secret_uuid', instance.system_metadata)
mock_save.assert_called_once()
mock_get_manager.assert_called_once()
mock_get_manager.return_value.delete.assert_called_once_with(
self.ctxt, uuids.vtpm,
)
@mock.patch.object(crypto, '_get_key_manager')
def test_delete_vtpm_secret_no_secret(self, mock_get_manager):
"""Check behavior when instance has no associated vTPM secret.
This should be effectively a no-op.
"""
instance = objects.Instance()
instance.system_metadata = {}
crypto.delete_vtpm_secret(self.ctxt, instance)
mock_get_manager.assert_not_called()