diff --git a/nova/tests/functional/integrated_helpers.py b/nova/tests/functional/integrated_helpers.py index 50899e45d543..dc07a468b640 100644 --- a/nova/tests/functional/integrated_helpers.py +++ b/nova/tests/functional/integrated_helpers.py @@ -600,11 +600,13 @@ class InstanceHelperMixin: self.notifier.wait_for_versioned_notifications('instance.rebuild.end') return self._wait_for_state_change(server, expected_state) - def _migrate_server(self, server, host=None): + def _migrate_server(self, server, host=None, + expected_state='VERIFY_RESIZE', api=None): """Cold migrate a server.""" body = {'host': host} if host else None - self.api.post_server_action(server['id'], {'migrate': body}) - return self._wait_for_state_change(server, 'VERIFY_RESIZE') + api = api or self.api + api.post_server_action(server['id'], {'migrate': body}) + return self._wait_for_state_change(server, expected_state) def _resize_server(self, server, flavor_id): self.api.post_server_action( diff --git a/nova/tests/functional/libvirt/test_vtpm.py b/nova/tests/functional/libvirt/test_vtpm.py index 6fb36dc404b2..c9ba535e0039 100644 --- a/nova/tests/functional/libvirt/test_vtpm.py +++ b/nova/tests/functional/libvirt/test_vtpm.py @@ -15,6 +15,7 @@ from unittest import mock +from castellan.common import exception as castellan_exc from castellan.common.objects import passphrase from castellan.key_manager import key_manager import fixtures @@ -28,6 +29,7 @@ from nova import exception from nova import objects from nova.tests.functional.api import client from nova.tests.functional.libvirt import base +from nova import utils CONF = nova.conf.CONF LOG = logging.getLogger(__name__) @@ -48,6 +50,9 @@ class FakeKeyManager(key_manager.KeyManager): #: A mapping of UUIDs to passphrases. self._passphrases = {} + #: A mapping of UUIDs to RequestContext objects. + self._contexts = {} + def create_key(self, context, algorithm, length, **kwargs): """Creates a symmetric key. @@ -78,6 +83,7 @@ class FakeKeyManager(key_manager.KeyManager): uuid = uuidutils.generate_uuid() managed_object._id = uuid # set the id to simulate persistence self._passphrases[uuid] = managed_object + self._contexts[uuid] = context return uuid @@ -91,6 +97,12 @@ class FakeKeyManager(key_manager.KeyManager): if context is None: raise exception.Forbidden() + if context.user_id != self._contexts[managed_object_id].user_id: + raise castellan_exc.KeyManagerError( + 'Key manager error: Forbidden: Secret payload retrieval ' + 'attempt not allowed - please review your user/project ' + 'privileges') + if managed_object_id not in self._passphrases: raise KeyError('cannot retrieve non-existent secret') @@ -104,11 +116,18 @@ class FakeKeyManager(key_manager.KeyManager): if context is None: raise exception.Forbidden() + if context.user_id != self._contexts[managed_object_id].user_id: + raise castellan_exc.KeyManagerError( + 'Key manager error: Forbidden: Secret payload retrieval ' + 'attempt not allowed - please review your user/project ' + 'privileges') + if managed_object_id not in self._passphrases: raise exception.KeyManagerError( reason="cannot delete non-existent secret") del self._passphrases[managed_object_id] + del self._contexts[managed_object_id] def add_consumer(self, context, managed_object_id, consumer_data): raise NotImplementedError( @@ -123,8 +142,11 @@ class FakeKeyManager(key_manager.KeyManager): class VTPMServersTest(base.ServersTestBase): - # many move operations are admin-only - ADMIN_API = True + # NOTE: ADMIN_API is intentionally not set to True in order to catch key + # manager service secret ownership issues. + + # Reflect reality more for async API requests like migration + CAST_AS_CALL = False def setUp(self): # enable vTPM and use our own fake key service @@ -372,6 +394,16 @@ class VTPMServersTest(base.ServersTestBase): self.assertInstanceHasNoSecret(server) def test_migrate_server(self): + """Test cold migrate as a non-admin user. + + Cold migrate policy defaults to admin-only but this will not currently + work as admin due to key manager service secret ownership. + """ + # Allow non-admin to cold migrate a server. + rules = { + 'os_compute_api:os-migrate-server:migrate': 'rule:admin_or_owner'} + self.policy.set_rules(rules, overwrite=False) + for host in ('test_compute0', 'test_compute1'): self.start_compute(host) @@ -394,6 +426,39 @@ class VTPMServersTest(base.ServersTestBase): # ensure nothing has changed self.assertInstanceHasSecret(server) + def test_migrate_server_as_admin(self): + """Test cold migrate as an admin user. + + Cold migrate policy defaults to admin-only but this will not currently + work as admin due to key manager service secret ownership. + """ + for host in ('test_compute0', 'test_compute1'): + self.start_compute(host) + + # create a server with vTPM + server = self._create_server_with_vtpm() + + # ensure our instance's system_metadata field is correct + self.assertInstanceHasSecret(server) + + with mock.patch( + 'nova.virt.libvirt.driver.LibvirtDriver' + '.migrate_disk_and_power_off', return_value='{}', + ): + # cold migrate the server + self._migrate_server( + server, expected_state='ERROR', api=self.admin_api) + + # Migration should have failed due to permissions error. + # Need microversion 2.84 to get events.details field. + with utils.temporary_mutation(self.admin_api, microversion='2.84'): + event = self._wait_for_instance_action_event( + server, 'migrate', 'compute_finish_resize', 'Error') + msg = ('Key manager error: Forbidden: Secret payload retrieval ' + 'attempt not allowed - please review your user/project ' + 'privileges') + self.assertIn(msg, event['details']) + def test_live_migrate_server(self): for host in ('test_compute0', 'test_compute1'): self.start_compute(host)