f8cf050a13
In py310 unittest.mock does not allow to mock the same function twice as the second mocking will fail to autospec the Mock object created by the first mocking. This patch manually fixes the double mocking. Fixed cases: 1) one of the mock was totally unnecessary so it was removed 2) the second mock specialized the behavior of the first generic mock. In this case the second mock is replaced with the configuration of the first mock 3) a test case with two test steps mocked the same function for each step with overlapping mocks. Here the overlap was removed to have the two mock exists independently The get_connection injection in the libvirt functional test needed a further tweak (yeah I know it has many already) to act like a single mock (basically case #2) instead of a temporary re-mocking. Still the globalness of the get_connection mocking warrant the special set / reset logic there. Change-Id: I3998d0d49583806ac1c3ae64f1b1fe343cefd20d
372 lines
13 KiB
Python
372 lines
13 KiB
Python
# Copyright (C) 2020 Red Hat, Inc.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
from unittest import mock
|
|
|
|
from castellan.common.objects import passphrase
|
|
from castellan.key_manager import key_manager
|
|
from oslo_log import log as logging
|
|
from oslo_utils import uuidutils
|
|
|
|
import nova.conf
|
|
from nova import context as nova_context
|
|
from nova import crypto
|
|
from nova import exception
|
|
from nova import objects
|
|
from nova.tests.functional.api import client
|
|
from nova.tests.functional.libvirt import base
|
|
|
|
CONF = nova.conf.CONF
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class FakeKeyManager(key_manager.KeyManager):
|
|
"""A fake key manager.
|
|
|
|
This key manager implementation supports a minimum subset of methods
|
|
specified by the key manager interface that are required for vTPM. Side
|
|
effects (e.g., raising exceptions) for each method are handled as specified
|
|
by the key manager interface.
|
|
"""
|
|
|
|
def __init__(self, configuration):
|
|
super().__init__(configuration)
|
|
|
|
#: A mapping of UUIDs to passphrases.
|
|
self._passphrases = {}
|
|
|
|
def create_key(self, context, algorithm, length, **kwargs):
|
|
"""Creates a symmetric key.
|
|
|
|
This is not implemented as it's unnecessary here.
|
|
"""
|
|
raise NotImplementedError(
|
|
"FakeKeyManager does not support symmetric keys"
|
|
)
|
|
|
|
def create_key_pair(self, context, **kwargs):
|
|
"""Creates an asymmetric keypair.
|
|
|
|
This is not implemented as it's unnecessary here.
|
|
"""
|
|
raise NotImplementedError(
|
|
"FakeKeyManager does not support asymmetric keys"
|
|
)
|
|
|
|
def store(self, context, managed_object, **kwargs):
|
|
"""Stores (i.e., registers) a passphrase with the key manager."""
|
|
if context is None:
|
|
raise exception.Forbidden()
|
|
|
|
if not isinstance(managed_object, passphrase.Passphrase):
|
|
raise exception.KeyManagerError(
|
|
reason='cannot store anything except passphrases')
|
|
|
|
uuid = uuidutils.generate_uuid()
|
|
managed_object._id = uuid # set the id to simulate persistence
|
|
self._passphrases[uuid] = managed_object
|
|
|
|
return uuid
|
|
|
|
def get(self, context, managed_object_id):
|
|
"""Retrieves the key identified by the specified id.
|
|
|
|
This implementation returns the key that is associated with the
|
|
specified UUID. A Forbidden exception is raised if the specified
|
|
context is None; a KeyError is raised if the UUID is invalid.
|
|
"""
|
|
if context is None:
|
|
raise exception.Forbidden()
|
|
|
|
if managed_object_id not in self._passphrases:
|
|
raise KeyError('cannot retrieve non-existent secret')
|
|
|
|
return self._passphrases[managed_object_id]
|
|
|
|
def delete(self, context, managed_object_id):
|
|
"""Represents deleting the key.
|
|
|
|
Simply delete the key from our list of keys.
|
|
"""
|
|
if context is None:
|
|
raise exception.Forbidden()
|
|
|
|
if managed_object_id not in self._passphrases:
|
|
raise exception.KeyManagerError(
|
|
reason="cannot delete non-existent secret")
|
|
|
|
del self._passphrases[managed_object_id]
|
|
|
|
|
|
class VTPMServersTest(base.ServersTestBase):
|
|
|
|
# many move operations are admin-only
|
|
ADMIN_API = True
|
|
|
|
def setUp(self):
|
|
# enable vTPM and use our own fake key service
|
|
self.flags(swtpm_enabled=True, group='libvirt')
|
|
self.flags(
|
|
backend='nova.tests.functional.libvirt.test_vtpm.FakeKeyManager',
|
|
group='key_manager')
|
|
|
|
super().setUp()
|
|
|
|
# mock the '_check_vtpm_support' function which validates things like
|
|
# the presence of users on the host, none of which makes sense here
|
|
_p = mock.patch(
|
|
'nova.virt.libvirt.driver.LibvirtDriver._check_vtpm_support')
|
|
_p.start()
|
|
self.addCleanup(_p.stop)
|
|
|
|
self.key_mgr = crypto._get_key_manager()
|
|
|
|
def _create_server_with_vtpm(self):
|
|
extra_specs = {'hw:tpm_model': 'tpm-tis', 'hw:tpm_version': '1.2'}
|
|
flavor_id = self._create_flavor(extra_spec=extra_specs)
|
|
server = self._create_server(flavor_id=flavor_id)
|
|
|
|
return server
|
|
|
|
def _create_server_without_vtpm(self):
|
|
# use the default flavor (i.e. one without vTPM extra specs)
|
|
return self._create_server()
|
|
|
|
def assertInstanceHasSecret(self, server):
|
|
ctx = nova_context.get_admin_context()
|
|
instance = objects.Instance.get_by_uuid(ctx, server['id'])
|
|
self.assertIn('vtpm_secret_uuid', instance.system_metadata)
|
|
self.assertEqual(1, len(self.key_mgr._passphrases))
|
|
self.assertIn(
|
|
instance.system_metadata['vtpm_secret_uuid'],
|
|
self.key_mgr._passphrases)
|
|
|
|
def assertInstanceHasNoSecret(self, server):
|
|
ctx = nova_context.get_admin_context()
|
|
instance = objects.Instance.get_by_uuid(ctx, server['id'])
|
|
self.assertNotIn('vtpm_secret_uuid', instance.system_metadata)
|
|
self.assertEqual(0, len(self.key_mgr._passphrases))
|
|
|
|
def test_create_server(self):
|
|
compute = self.start_compute()
|
|
|
|
# ensure we are reporting the correct traits
|
|
traits = self._get_provider_traits(self.compute_rp_uuids[compute])
|
|
for trait in ('COMPUTE_SECURITY_TPM_1_2', 'COMPUTE_SECURITY_TPM_2_0'):
|
|
self.assertIn(trait, traits)
|
|
|
|
# create a server with vTPM
|
|
server = self._create_server_with_vtpm()
|
|
|
|
# ensure our instance's system_metadata field and key manager inventory
|
|
# is correct
|
|
self.assertInstanceHasSecret(server)
|
|
|
|
# now delete the server
|
|
self._delete_server(server)
|
|
|
|
# ensure we deleted the key now that we no longer need it
|
|
self.assertEqual(0, len(self.key_mgr._passphrases))
|
|
|
|
def test_suspend_resume_server(self):
|
|
self.start_compute()
|
|
|
|
# create a server with vTPM
|
|
server = self._create_server_with_vtpm()
|
|
self.addCleanup(self._delete_server, server)
|
|
|
|
# suspend the server
|
|
server = self._suspend_server(server)
|
|
|
|
# ensure our instance's system_metadata field and key manager inventory
|
|
# is correct
|
|
self.assertInstanceHasSecret(server)
|
|
|
|
# resume the server
|
|
server = self._resume_server(server)
|
|
|
|
# ensure our instance's system_metadata field and key manager inventory
|
|
# is still correct
|
|
self.assertInstanceHasSecret(server)
|
|
|
|
def test_soft_reboot_server(self):
|
|
self.start_compute()
|
|
|
|
# create a server with vTPM
|
|
server = self._create_server_with_vtpm()
|
|
|
|
# soft reboot the server
|
|
server = self._reboot_server(server, hard=False)
|
|
self.addCleanup(self._delete_server, server)
|
|
|
|
# ensure our instance's system_metadata field and key manager inventory
|
|
# is still correct
|
|
self.assertInstanceHasSecret(server)
|
|
|
|
def test_hard_reboot_server(self):
|
|
self.start_compute()
|
|
|
|
# create a server with vTPM
|
|
server = self._create_server_with_vtpm()
|
|
self.addCleanup(self._delete_server, server)
|
|
|
|
# hard reboot the server
|
|
server = self._reboot_server(server, hard=True)
|
|
|
|
# ensure our instance's system_metadata field and key manager inventory
|
|
# is still correct
|
|
self.assertInstanceHasSecret(server)
|
|
|
|
def test_resize_server__no_vtpm_to_vtpm(self):
|
|
for host in ('test_compute0', 'test_compute1'):
|
|
self.start_compute(host)
|
|
|
|
# create a server without vTPM
|
|
server = self._create_server_without_vtpm()
|
|
self.addCleanup(self._delete_server, server)
|
|
|
|
# ensure our instance's system_metadata field and key manager inventory
|
|
# is correct
|
|
self.assertInstanceHasNoSecret(server)
|
|
|
|
# create a flavor with vTPM
|
|
extra_specs = {'hw:tpm_model': 'tpm-tis', 'hw:tpm_version': '1.2'}
|
|
flavor_id = self._create_flavor(extra_spec=extra_specs)
|
|
|
|
# TODO(stephenfin): The mock of 'migrate_disk_and_power_off' should
|
|
# probably be less...dumb
|
|
with mock.patch(
|
|
'nova.virt.libvirt.driver.LibvirtDriver'
|
|
'.migrate_disk_and_power_off', return_value='{}',
|
|
):
|
|
# resize the server to a new flavor *with* vTPM
|
|
server = self._resize_server(server, flavor_id=flavor_id)
|
|
|
|
# ensure our instance's system_metadata field and key manager inventory
|
|
# is updated to reflect the new vTPM requirement
|
|
self.assertInstanceHasSecret(server)
|
|
|
|
# revert the instance rather than confirming it, and ensure the secret
|
|
# is correctly cleaned up
|
|
|
|
# TODO(stephenfin): The mock of 'migrate_disk_and_power_off' should
|
|
# probably be less...dumb
|
|
with mock.patch(
|
|
'nova.virt.libvirt.driver.LibvirtDriver'
|
|
'.migrate_disk_and_power_off', return_value='{}',
|
|
):
|
|
# revert back to the old flavor *without* vTPM
|
|
server = self._revert_resize(server)
|
|
|
|
# ensure we delete the new key since we no longer need it
|
|
self.assertInstanceHasNoSecret(server)
|
|
|
|
def test_resize_server__vtpm_to_no_vtpm(self):
|
|
for host in ('test_compute0', 'test_compute1'):
|
|
self.start_compute(host)
|
|
|
|
# create a server with vTPM
|
|
server = self._create_server_with_vtpm()
|
|
self.addCleanup(self._delete_server, server)
|
|
|
|
# ensure our instance's system_metadata field is correct
|
|
self.assertInstanceHasSecret(server)
|
|
|
|
# create a flavor without vTPM
|
|
flavor_id = self._create_flavor()
|
|
|
|
# TODO(stephenfin): The mock of 'migrate_disk_and_power_off' should
|
|
# probably be less...dumb
|
|
with mock.patch(
|
|
'nova.virt.libvirt.driver.LibvirtDriver'
|
|
'.migrate_disk_and_power_off', return_value='{}',
|
|
):
|
|
# resize the server to a new flavor *without* vTPM
|
|
server = self._resize_server(server, flavor_id=flavor_id)
|
|
|
|
# ensure we still have the key for the vTPM device in storage in case
|
|
# we revert
|
|
self.assertInstanceHasSecret(server)
|
|
|
|
# confirm the instance and ensure the secret is correctly cleaned up
|
|
|
|
# TODO(stephenfin): The mock of 'migrate_disk_and_power_off' should
|
|
# probably be less...dumb
|
|
with mock.patch(
|
|
'nova.virt.libvirt.driver.LibvirtDriver'
|
|
'.migrate_disk_and_power_off', return_value='{}',
|
|
):
|
|
# revert back to the old flavor *with* vTPM
|
|
server = self._confirm_resize(server)
|
|
|
|
# ensure we have finally deleted the key for the vTPM device since
|
|
# there is no going back now
|
|
self.assertInstanceHasNoSecret(server)
|
|
|
|
def test_migrate_server(self):
|
|
for host in ('test_compute0', 'test_compute1'):
|
|
self.start_compute(host)
|
|
|
|
# create a server with vTPM
|
|
server = self._create_server_with_vtpm()
|
|
self.addCleanup(self._delete_server, server)
|
|
|
|
# ensure our instance's system_metadata field is correct
|
|
self.assertInstanceHasSecret(server)
|
|
|
|
# TODO(stephenfin): The mock of 'migrate_disk_and_power_off' should
|
|
# probably be less...dumb
|
|
with mock.patch(
|
|
'nova.virt.libvirt.driver.LibvirtDriver'
|
|
'.migrate_disk_and_power_off', return_value='{}',
|
|
):
|
|
# cold migrate the server
|
|
self._migrate_server(server)
|
|
|
|
# ensure nothing has changed
|
|
self.assertInstanceHasSecret(server)
|
|
|
|
def test_live_migrate_server(self):
|
|
for host in ('test_compute0', 'test_compute1'):
|
|
self.start_compute(host)
|
|
|
|
# create a server with vTPM
|
|
server = self._create_server_with_vtpm()
|
|
self.addCleanup(self._delete_server, server)
|
|
|
|
# ensure our instance's system_metadata field is correct
|
|
self.assertInstanceHasSecret(server)
|
|
|
|
# live migrate the server
|
|
self.assertRaises(
|
|
client.OpenStackApiException,
|
|
self._live_migrate_server, server)
|
|
|
|
def test_shelve_server(self):
|
|
for host in ('test_compute0', 'test_compute1'):
|
|
self.start_compute(host)
|
|
|
|
# create a server with vTPM
|
|
server = self._create_server_with_vtpm()
|
|
self.addCleanup(self._delete_server, server)
|
|
|
|
# ensure our instance's system_metadata field is correct
|
|
self.assertInstanceHasSecret(server)
|
|
|
|
# attempt to shelve the server
|
|
self.assertRaises(
|
|
client.OpenStackApiException,
|
|
self._shelve_server, server)
|