Merge "TPM: add RequestContext checks to functional tests"

This commit is contained in:
Zuul
2025-11-19 15:51:15 +00:00
committed by Gerrit Code Review
2 changed files with 72 additions and 5 deletions

View File

@@ -600,11 +600,13 @@ class InstanceHelperMixin:
self.notifier.wait_for_versioned_notifications('instance.rebuild.end') self.notifier.wait_for_versioned_notifications('instance.rebuild.end')
return self._wait_for_state_change(server, expected_state) return self._wait_for_state_change(server, expected_state)
def _migrate_server(self, server, host=None): def _migrate_server(self, server, host=None,
expected_state='VERIFY_RESIZE', api=None):
"""Cold migrate a server.""" """Cold migrate a server."""
body = {'host': host} if host else None body = {'host': host} if host else None
self.api.post_server_action(server['id'], {'migrate': body}) api = api or self.api
return self._wait_for_state_change(server, 'VERIFY_RESIZE') api.post_server_action(server['id'], {'migrate': body})
return self._wait_for_state_change(server, expected_state)
def _resize_server(self, server, flavor_id): def _resize_server(self, server, flavor_id):
self.api.post_server_action( self.api.post_server_action(

View File

@@ -15,6 +15,7 @@
from unittest import mock from unittest import mock
from castellan.common import exception as castellan_exc
from castellan.common.objects import passphrase from castellan.common.objects import passphrase
from castellan.key_manager import key_manager from castellan.key_manager import key_manager
import fixtures import fixtures
@@ -28,6 +29,7 @@ from nova import exception
from nova import objects from nova import objects
from nova.tests.functional.api import client from nova.tests.functional.api import client
from nova.tests.functional.libvirt import base from nova.tests.functional.libvirt import base
from nova import utils
CONF = nova.conf.CONF CONF = nova.conf.CONF
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@@ -48,6 +50,9 @@ class FakeKeyManager(key_manager.KeyManager):
#: A mapping of UUIDs to passphrases. #: A mapping of UUIDs to passphrases.
self._passphrases = {} self._passphrases = {}
#: A mapping of UUIDs to RequestContext objects.
self._contexts = {}
def create_key(self, context, algorithm, length, **kwargs): def create_key(self, context, algorithm, length, **kwargs):
"""Creates a symmetric key. """Creates a symmetric key.
@@ -78,6 +83,7 @@ class FakeKeyManager(key_manager.KeyManager):
uuid = uuidutils.generate_uuid() uuid = uuidutils.generate_uuid()
managed_object._id = uuid # set the id to simulate persistence managed_object._id = uuid # set the id to simulate persistence
self._passphrases[uuid] = managed_object self._passphrases[uuid] = managed_object
self._contexts[uuid] = context
return uuid return uuid
@@ -91,6 +97,12 @@ class FakeKeyManager(key_manager.KeyManager):
if context is None: if context is None:
raise exception.Forbidden() raise exception.Forbidden()
if context.user_id != self._contexts[managed_object_id].user_id:
raise castellan_exc.KeyManagerError(
'Key manager error: Forbidden: Secret payload retrieval '
'attempt not allowed - please review your user/project '
'privileges')
if managed_object_id not in self._passphrases: if managed_object_id not in self._passphrases:
raise KeyError('cannot retrieve non-existent secret') raise KeyError('cannot retrieve non-existent secret')
@@ -104,11 +116,18 @@ class FakeKeyManager(key_manager.KeyManager):
if context is None: if context is None:
raise exception.Forbidden() raise exception.Forbidden()
if context.user_id != self._contexts[managed_object_id].user_id:
raise castellan_exc.KeyManagerError(
'Key manager error: Forbidden: Secret payload retrieval '
'attempt not allowed - please review your user/project '
'privileges')
if managed_object_id not in self._passphrases: if managed_object_id not in self._passphrases:
raise exception.KeyManagerError( raise exception.KeyManagerError(
reason="cannot delete non-existent secret") reason="cannot delete non-existent secret")
del self._passphrases[managed_object_id] del self._passphrases[managed_object_id]
del self._contexts[managed_object_id]
def add_consumer(self, context, managed_object_id, consumer_data): def add_consumer(self, context, managed_object_id, consumer_data):
raise NotImplementedError( raise NotImplementedError(
@@ -123,8 +142,11 @@ class FakeKeyManager(key_manager.KeyManager):
class VTPMServersTest(base.ServersTestBase): class VTPMServersTest(base.ServersTestBase):
# many move operations are admin-only # NOTE: ADMIN_API is intentionally not set to True in order to catch key
ADMIN_API = True # manager service secret ownership issues.
# Reflect reality more for async API requests like migration
CAST_AS_CALL = False
def setUp(self): def setUp(self):
# enable vTPM and use our own fake key service # enable vTPM and use our own fake key service
@@ -372,6 +394,16 @@ class VTPMServersTest(base.ServersTestBase):
self.assertInstanceHasNoSecret(server) self.assertInstanceHasNoSecret(server)
def test_migrate_server(self): def test_migrate_server(self):
"""Test cold migrate as a non-admin user.
Cold migrate policy defaults to admin-only but this will not currently
work as admin due to key manager service secret ownership.
"""
# Allow non-admin to cold migrate a server.
rules = {
'os_compute_api:os-migrate-server:migrate': 'rule:admin_or_owner'}
self.policy.set_rules(rules, overwrite=False)
for host in ('test_compute0', 'test_compute1'): for host in ('test_compute0', 'test_compute1'):
self.start_compute(host) self.start_compute(host)
@@ -394,6 +426,39 @@ class VTPMServersTest(base.ServersTestBase):
# ensure nothing has changed # ensure nothing has changed
self.assertInstanceHasSecret(server) self.assertInstanceHasSecret(server)
def test_migrate_server_as_admin(self):
"""Test cold migrate as an admin user.
Cold migrate policy defaults to admin-only but this will not currently
work as admin due to key manager service secret ownership.
"""
for host in ('test_compute0', 'test_compute1'):
self.start_compute(host)
# create a server with vTPM
server = self._create_server_with_vtpm()
# ensure our instance's system_metadata field is correct
self.assertInstanceHasSecret(server)
with mock.patch(
'nova.virt.libvirt.driver.LibvirtDriver'
'.migrate_disk_and_power_off', return_value='{}',
):
# cold migrate the server
self._migrate_server(
server, expected_state='ERROR', api=self.admin_api)
# Migration should have failed due to permissions error.
# Need microversion 2.84 to get events.details field.
with utils.temporary_mutation(self.admin_api, microversion='2.84'):
event = self._wait_for_instance_action_event(
server, 'migrate', 'compute_finish_resize', 'Error')
msg = ('Key manager error: Forbidden: Secret payload retrieval '
'attempt not allowed - please review your user/project '
'privileges')
self.assertIn(msg, event['details'])
def test_live_migrate_server(self): def test_live_migrate_server(self):
for host in ('test_compute0', 'test_compute1'): for host in ('test_compute0', 'test_compute1'):
self.start_compute(host) self.start_compute(host)