TPM: add RequestContext checks to functional tests

Key manager service secret ownership can be a challenge when dealing
vTPM instances. Some instance actions require access to the secret and
will fail if there is a mismatch.

In preparation for vTPM live migration changes which will involve
different users accessing secrets (user|admin|Nova service user), this
removes ADMIN_ONLY from the functional tests class and adds checking of
RequestContext user_id in the FakeKeyManager.

Change-Id: I2790cd274a4776ab306b39df1e591e8304b63f96
Signed-off-by: melanie witt <melwittt@gmail.com>
This commit is contained in:
melanie witt
2025-11-07 21:42:21 +00:00
parent cf930034f2
commit 0f82c2953e
2 changed files with 72 additions and 5 deletions

View File

@@ -600,11 +600,13 @@ class InstanceHelperMixin:
self.notifier.wait_for_versioned_notifications('instance.rebuild.end')
return self._wait_for_state_change(server, expected_state)
def _migrate_server(self, server, host=None):
def _migrate_server(self, server, host=None,
expected_state='VERIFY_RESIZE', api=None):
"""Cold migrate a server."""
body = {'host': host} if host else None
self.api.post_server_action(server['id'], {'migrate': body})
return self._wait_for_state_change(server, 'VERIFY_RESIZE')
api = api or self.api
api.post_server_action(server['id'], {'migrate': body})
return self._wait_for_state_change(server, expected_state)
def _resize_server(self, server, flavor_id):
self.api.post_server_action(

View File

@@ -15,6 +15,7 @@
from unittest import mock
from castellan.common import exception as castellan_exc
from castellan.common.objects import passphrase
from castellan.key_manager import key_manager
import fixtures
@@ -28,6 +29,7 @@ from nova import exception
from nova import objects
from nova.tests.functional.api import client
from nova.tests.functional.libvirt import base
from nova import utils
CONF = nova.conf.CONF
LOG = logging.getLogger(__name__)
@@ -48,6 +50,9 @@ class FakeKeyManager(key_manager.KeyManager):
#: A mapping of UUIDs to passphrases.
self._passphrases = {}
#: A mapping of UUIDs to RequestContext objects.
self._contexts = {}
def create_key(self, context, algorithm, length, **kwargs):
"""Creates a symmetric key.
@@ -78,6 +83,7 @@ class FakeKeyManager(key_manager.KeyManager):
uuid = uuidutils.generate_uuid()
managed_object._id = uuid # set the id to simulate persistence
self._passphrases[uuid] = managed_object
self._contexts[uuid] = context
return uuid
@@ -91,6 +97,12 @@ class FakeKeyManager(key_manager.KeyManager):
if context is None:
raise exception.Forbidden()
if context.user_id != self._contexts[managed_object_id].user_id:
raise castellan_exc.KeyManagerError(
'Key manager error: Forbidden: Secret payload retrieval '
'attempt not allowed - please review your user/project '
'privileges')
if managed_object_id not in self._passphrases:
raise KeyError('cannot retrieve non-existent secret')
@@ -104,11 +116,18 @@ class FakeKeyManager(key_manager.KeyManager):
if context is None:
raise exception.Forbidden()
if context.user_id != self._contexts[managed_object_id].user_id:
raise castellan_exc.KeyManagerError(
'Key manager error: Forbidden: Secret payload retrieval '
'attempt not allowed - please review your user/project '
'privileges')
if managed_object_id not in self._passphrases:
raise exception.KeyManagerError(
reason="cannot delete non-existent secret")
del self._passphrases[managed_object_id]
del self._contexts[managed_object_id]
def add_consumer(self, context, managed_object_id, consumer_data):
raise NotImplementedError(
@@ -123,8 +142,11 @@ class FakeKeyManager(key_manager.KeyManager):
class VTPMServersTest(base.ServersTestBase):
# many move operations are admin-only
ADMIN_API = True
# NOTE: ADMIN_API is intentionally not set to True in order to catch key
# manager service secret ownership issues.
# Reflect reality more for async API requests like migration
CAST_AS_CALL = False
def setUp(self):
# enable vTPM and use our own fake key service
@@ -372,6 +394,16 @@ class VTPMServersTest(base.ServersTestBase):
self.assertInstanceHasNoSecret(server)
def test_migrate_server(self):
"""Test cold migrate as a non-admin user.
Cold migrate policy defaults to admin-only but this will not currently
work as admin due to key manager service secret ownership.
"""
# Allow non-admin to cold migrate a server.
rules = {
'os_compute_api:os-migrate-server:migrate': 'rule:admin_or_owner'}
self.policy.set_rules(rules, overwrite=False)
for host in ('test_compute0', 'test_compute1'):
self.start_compute(host)
@@ -394,6 +426,39 @@ class VTPMServersTest(base.ServersTestBase):
# ensure nothing has changed
self.assertInstanceHasSecret(server)
def test_migrate_server_as_admin(self):
"""Test cold migrate as an admin user.
Cold migrate policy defaults to admin-only but this will not currently
work as admin due to key manager service secret ownership.
"""
for host in ('test_compute0', 'test_compute1'):
self.start_compute(host)
# create a server with vTPM
server = self._create_server_with_vtpm()
# ensure our instance's system_metadata field is correct
self.assertInstanceHasSecret(server)
with mock.patch(
'nova.virt.libvirt.driver.LibvirtDriver'
'.migrate_disk_and_power_off', return_value='{}',
):
# cold migrate the server
self._migrate_server(
server, expected_state='ERROR', api=self.admin_api)
# Migration should have failed due to permissions error.
# Need microversion 2.84 to get events.details field.
with utils.temporary_mutation(self.admin_api, microversion='2.84'):
event = self._wait_for_instance_action_event(
server, 'migrate', 'compute_finish_resize', 'Error')
msg = ('Key manager error: Forbidden: Secret payload retrieval '
'attempt not allowed - please review your user/project '
'privileges')
self.assertIn(msg, event['details'])
def test_live_migrate_server(self):
for host in ('test_compute0', 'test_compute1'):
self.start_compute(host)