From 6ac228782651bc319fa149749e0d229f90f47adc Mon Sep 17 00:00:00 2001 From: Stephen Finucane Date: Fri, 3 Jul 2020 11:06:11 +0100 Subject: [PATCH] crypto: Add support for creating, destroying vTPM secrets Provide a method to communicate with the configured key manager service using Castellan, a generic key manager interface that supports multiple backends including - but not limited to - Barbican. Once again, there's nothing using this yet but tests, though this will change shortly. Part of blueprint add-emulated-virtual-tpm Change-Id: Iff6195d252b018f008bb9d137e4d80c54b70b2d1 Signed-off-by: Stephen Finucane --- nova/crypto.py | 106 ++++++++++++++++++++++++++ nova/tests/unit/test_crypto.py | 131 +++++++++++++++++++++++++++++++++ 2 files changed, 237 insertions(+) diff --git a/nova/crypto.py b/nova/crypto.py index b927f0c255f5..05d6196c4114 100644 --- a/nova/crypto.py +++ b/nova/crypto.py @@ -25,6 +25,9 @@ import io import os import typing as ty +from castellan.common import exception as castellan_exception +from castellan.common.objects import passphrase +from castellan import key_manager from cryptography.hazmat import backends from cryptography.hazmat.primitives.asymmetric import padding from cryptography.hazmat.primitives import hashes @@ -35,8 +38,10 @@ from oslo_log import log as logging import paramiko import nova.conf +from nova import context as nova_context from nova import exception from nova.i18n import _ +from nova import objects from nova import utils @@ -44,6 +49,17 @@ LOG = logging.getLogger(__name__) CONF = nova.conf.CONF +_KEYMGR = None + +_VTPM_SECRET_BYTE_LENGTH = 384 + + +def _get_key_manager(): + global _KEYMGR + if _KEYMGR is None: + _KEYMGR = key_manager.API(configuration=CONF) + return _KEYMGR + def generate_fingerprint(public_key: str) -> str: try: @@ -148,3 +164,93 @@ def _create_x509_openssl_config(conffile: str, upn: str): with open(conffile, 'w') as file: file.write(content % upn) + + +def ensure_vtpm_secret( + context: nova_context.RequestContext, + instance: 'objects.Instance', +) -> ty.Tuple[str, str]: + """Communicates with the key manager service to retrieve or create a secret + for an instance's emulated TPM. + + When creating a secret, its UUID is saved to the instance's system_metadata + as ``vtpm_secret_uuid``. + + :param context: Nova auth context. + :param instance: Instance object. + :return: A tuple comprising (secret_uuid, passphrase). + :raise: castellan_exception.ManagedObjectNotFoundError if communication + with the key manager API fails, or if a vtpm_secret_uuid was present in + the instance's system metadata but could not be found in the key + manager service. + """ + key_mgr = _get_key_manager() + + secret_uuid = instance.system_metadata.get('vtpm_secret_uuid') + if secret_uuid is not None: + # Try to retrieve the secret from the key manager + try: + secret = key_mgr.get(context, secret_uuid) + # assert secret_uuid == secret.id ? + LOG.debug( + "Found existing vTPM secret with UUID %s.", + secret_uuid, instance=instance) + return secret.id, secret.get_encoded() + except castellan_exception.ManagedObjectNotFoundError: + LOG.warning( + "Despite being set on the instance, failed to find a vTPM " + "secret with UUID %s. This should only happen if the secret " + "was manually deleted from the key manager service. Your vTPM " + "is likely to be unrecoverable.", + secret_uuid, instance=instance) + raise + + # If we get here, the instance has no vtpm_secret_uuid. Create a new one + # and register it with the key manager. + secret = base64.b64encode(os.urandom(_VTPM_SECRET_BYTE_LENGTH)) + # Castellan ManagedObject + cmo = passphrase.Passphrase( + secret, name="vTPM secret for instance %s" % instance.uuid) + secret_uuid = key_mgr.store(context, cmo) + LOG.debug("Created vTPM secret with UUID %s", + secret_uuid, instance=instance) + + instance.system_metadata['vtpm_secret_uuid'] = secret_uuid + instance.save() + return secret_uuid, secret + + +def delete_vtpm_secret( + context: nova_context.RequestContext, + instance: 'objects.Instance', +): + """Communicates with the key manager service to destroy the secret for an + instance's emulated TPM. + + This operation is idempotent: if the instance never had a vTPM secret, OR + if the secret has already been deleted, it is a no-op. + + The ``vtpm_secret_uuid`` member of the instance's system_metadata is + cleared as a side effect of this method. + + :param context: Nova auth context. + :param instance: Instance object. + :return: None + :raise: castellan_exception.ManagedObjectNotFoundError if communication + with the key manager API. + """ + secret_uuid = instance.system_metadata.get('vtpm_secret_uuid') + if not secret_uuid: + return + + key_mgr = _get_key_manager() + try: + key_mgr.delete(context, secret_uuid) + LOG.debug("Deleted vTPM secret with UUID %s", + secret_uuid, instance=instance) + except castellan_exception.ManagedObjectNotFoundError: + LOG.debug("vTPM secret with UUID %s already deleted or never existed.", + secret_uuid, instance=instance) + + del instance.system_metadata['vtpm_secret_uuid'] + instance.save() diff --git a/nova/tests/unit/test_crypto.py b/nova/tests/unit/test_crypto.py index 0929f70881b6..c352756d272c 100644 --- a/nova/tests/unit/test_crypto.py +++ b/nova/tests/unit/test_crypto.py @@ -18,15 +18,19 @@ Tests for Crypto module. import os +from castellan.common import exception as castellan_exception from cryptography.hazmat import backends from cryptography.hazmat.primitives import serialization import mock from oslo_concurrency import processutils +from oslo_utils.fixture import uuidsentinel as uuids import paramiko import six +from nova import context as nova_context from nova import crypto from nova import exception +from nova import objects from nova import test from nova import utils @@ -219,3 +223,130 @@ class KeyPairTest(test.NoDBTestCase): (private_key, public_key, fingerprint) = crypto.generate_key_pair() self.assertEqual(self.rsa_pub, public_key) self.assertEqual(self.rsa_fp, fingerprint) + + +class FakePassphrase(): + """A fake castellan ManagedObject.""" + + def __init__(self): + self.id = 1 + + def get_encoded(self): + return b'foo' + + +class VTPMTest(test.NoDBTestCase): + + def setUp(self): + super().setUp() + self.ctxt = nova_context.get_admin_context() + + @mock.patch.object(crypto, '_get_key_manager') + def test_ensure_vtpm_secret(self, mock_get_manager): + """Check behavior when instance already has an associated secret. + + We should attempt to retrieve the details via castellan. + """ + instance = objects.Instance() + instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm} + passphrase = FakePassphrase() + mock_get_manager.return_value.get.return_value = passphrase + + s_id, s_encoded = crypto.ensure_vtpm_secret(self.ctxt, instance) + + mock_get_manager.return_value.get.assert_called_once_with( + self.ctxt, uuids.vtpm) + self.assertEqual(passphrase.id, s_id) + self.assertEqual(passphrase.get_encoded(), s_encoded) + + @mock.patch.object(crypto, '_get_key_manager') + def test_ensure_vtpm_secret_error(self, mock_get_manager): + """Check behavior when we fail to retrieve a secret via castellan. + + We should bubble up the error. + """ + instance = objects.Instance() + instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm} + mock_get_manager.return_value.get.side_effect = ( + castellan_exception.ManagedObjectNotFoundError(uuid=uuids.vtpm)) + + self.assertRaises( + castellan_exception.ManagedObjectNotFoundError, + crypto.ensure_vtpm_secret, + self.ctxt, instance) + + @mock.patch('castellan.common.objects.passphrase.Passphrase') + @mock.patch.object(crypto, '_get_key_manager') + def test_ensure_vtpm_secret_no_secret(self, mock_get_manager, mock_pass): + """Check behavior when instance has no associated vTPM secret. + + We should create a new one. + """ + instance = objects.Instance() + instance.uuid = uuids.instance + instance.system_metadata = {} + mock_get_manager.return_value.store.return_value = uuids.secret + passphrase = FakePassphrase() + mock_pass.return_value = passphrase + + with mock.patch.object(instance, 'save') as mock_save: + secret_uuid, _ = crypto.ensure_vtpm_secret(self.ctxt, instance) + + mock_pass.assert_called_once_with(mock.ANY, name=mock.ANY) + mock_get_manager.return_value.store.assert_called_once_with( + self.ctxt, passphrase) + mock_save.assert_called_once() + self.assertEqual(uuids.secret, secret_uuid) + + @mock.patch.object(crypto, '_get_key_manager') + def test_delete_vtpm_secret(self, mock_get_manager): + """Check behavior when instance has an associated vTPM secret. + + We should delete it. + """ + instance = objects.Instance() + instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm} + + with mock.patch.object(instance, 'save') as mock_save: + crypto.delete_vtpm_secret(self.ctxt, instance) + + self.assertNotIn('vtpm_secret_uuid', instance.system_metadata) + mock_save.assert_called_once() + mock_get_manager.assert_called_once() + mock_get_manager.return_value.delete.assert_called_once_with( + self.ctxt, uuids.vtpm, + ) + + @mock.patch.object(crypto, '_get_key_manager') + def test_delete_vtpm_secret_error(self, mock_get_manager): + """Check behavior when we fail to retrieve the secret via castellan. + + We should carry on and delete the reference from the instance. + """ + instance = objects.Instance() + instance.system_metadata = {'vtpm_secret_uuid': uuids.vtpm} + mock_get_manager.return_value.delete.side_effect = ( + castellan_exception.ManagedObjectNotFoundError(uuid=uuids.vtpm)) + + with mock.patch.object(instance, 'save') as mock_save: + crypto.delete_vtpm_secret(self.ctxt, instance) + + self.assertNotIn('vtpm_secret_uuid', instance.system_metadata) + mock_save.assert_called_once() + mock_get_manager.assert_called_once() + mock_get_manager.return_value.delete.assert_called_once_with( + self.ctxt, uuids.vtpm, + ) + + @mock.patch.object(crypto, '_get_key_manager') + def test_delete_vtpm_secret_no_secret(self, mock_get_manager): + """Check behavior when instance has no associated vTPM secret. + + This should be effectively a no-op. + """ + instance = objects.Instance() + instance.system_metadata = {} + + crypto.delete_vtpm_secret(self.ctxt, instance) + + mock_get_manager.assert_not_called()