# vim: tabstop=4 shiftwidth=4 softtabstop=4

#    Copyright 2012 Cloudbase Solutions Srl
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

"""
Test suite for the Hyper-V driver and related APIs.
"""

import json
import os
import platform
import shutil
import sys
import uuid

from nova.compute import power_state
from nova.compute import task_states
from nova import context
from nova import db
from nova.image import glance
from nova.openstack.common import cfg
from nova.tests import fake_network
from nova.tests.hyperv import basetestcase
from nova.tests.hyperv import db_fakes
from nova.tests.hyperv import hypervutils
from nova.tests.hyperv import mockproxy
import nova.tests.image.fake as fake_image
from nova.tests import matchers
from nova.virt.hyperv import constants
from nova.virt.hyperv import driver as driver_hyperv
from nova.virt.hyperv import vmutils
from nova.virt import images

CONF = cfg.CONF


class HyperVAPITestCase(basetestcase.BaseTestCase):
    """Unit tests for Hyper-V driver calls."""

    def setUp(self):
        """Reset per-test state, install the stubs and build the driver."""
        super(HyperVAPITestCase, self).setUp()

        self._user_id = 'fake'
        self._project_id = 'fake'
        self._instance_data = None
        self._image_metadata = None
        self._dest_server = None
        self._fetched_image = None
        self._update_image_raise_exception = False
        self._post_method_called = False
        self._recover_method_called = False
        self._volume_target_portal = 'testtargetportal:3260'
        self._volume_id = '8957e088-dbee-4216-8056-978353a3e737'
        self._context = context.RequestContext(self._user_id,
                                               self._project_id)

        self._setup_stubs()

        self.flags(instances_path=r'C:\Hyper-V\test\instances',
                   vswitch_name='external',
                   network_api_class='nova.network.quantumv2.api.API')

        self._hypervutils = hypervutils.HyperVUtils()
        self._conn = driver_hyperv.HyperVDriver(None)

    def _setup_stubs(self):
        """Stub out DB, image and network services and inject module mocks."""
        db_fakes.stub_out_db_instance_api(self.stubs)
        fake_image.stub_out_image_service(self.stubs)
        fake_network.stub_out_nw_api_get_instance_nw_info(self.stubs)

        def fake_dumps(msg, default=None, **kwargs):
            return '""'
        self.stubs.Set(json, 'dumps', fake_dumps)

        def fake_fetch(context, image_id, target, user, project):
            # Record the requested target so tests can assert on it, and
            # make sure a VHD exists at that path.
            self._fetched_image = target
            if not os.path.exists(target):
                self._hypervutils.create_vhd(target)
        self.stubs.Set(images, 'fetch', fake_fetch)

        def fake_get_remote_image_service(context, name):
            class FakeGlanceImageService(object):
                def update(self_fake, context, image_id, image_metadata, f):
                    if self._update_image_raise_exception:
                        raise vmutils.HyperVException(
                            "Simulated update failure")
                    self._image_metadata = image_metadata
            return (FakeGlanceImageService(), 1)
        self.stubs.Set(glance, 'get_remote_image_service',
                       fake_get_remote_image_service)

        # Modules to mock
        modules_to_mock = [
            'wmi',
            'os',
            'shutil',
            'uuid',
            'time',
            'multiprocessing',
            '_winreg',
            'nova.virt.configdrive',
            'nova.utils',
            'ctypes'
        ]

        # Modules in which the mocks are going to be injected
        from nova.virt.hyperv import baseops
        from nova.virt.hyperv import basevolumeutils
        from nova.virt.hyperv import hostops
        from nova.virt.hyperv import livemigrationops
        from nova.virt.hyperv import snapshotops
        from nova.virt.hyperv import vif
        from nova.virt.hyperv import vmops
        from nova.virt.hyperv import volumeops
        from nova.virt.hyperv import volumeutils
        from nova.virt.hyperv import volumeutilsV2

        modules_to_test = [
            driver_hyperv,
            basevolumeutils,
            baseops,
            hostops,
            vif,
            vmops,
            vmutils,
            volumeops,
            volumeutils,
            volumeutilsV2,
            snapshotops,
            livemigrationops,
            hypervutils,
            db_fakes,
            sys.modules[__name__]
        ]

        self._inject_mocks_in_modules(modules_to_mock, modules_to_test)

        # Only replace ioutils.open when wmi has been replaced by a mock.
        if isinstance(snapshotops.wmi, mockproxy.Mock):
            from nova.virt.hyperv import ioutils
            import StringIO

            def fake_open(name, mode):
                return StringIO.StringIO("fake file content")
            self.stubs.Set(ioutils, 'open', fake_open)

    def tearDown(self):
        """Remove VMs, iSCSI sessions and files created by the test."""
        try:
            if self._instance_data and self._hypervutils.vm_exists(
                    self._instance_data["name"]):
                self._hypervutils.remove_vm(self._instance_data["name"])

            # Guard on _instance_data as well: indexing None here would
            # raise and skip the remaining cleanup steps below.
            if (self._dest_server and self._instance_data and
                    self._hypervutils.remote_vm_exists(
                        self._dest_server, self._instance_data["name"])):
                self._hypervutils.remove_remote_vm(
                    self._dest_server, self._instance_data["name"])

            self._hypervutils.logout_iscsi_volume_sessions(self._volume_id)

            shutil.rmtree(CONF.instances_path, True)

            fake_image.FakeImageService_reset()
        finally:
            super(HyperVAPITestCase, self).tearDown()

    def test_get_available_resource(self):
        """The reported hypervisor hostname matches platform.node()."""
        dic = self._conn.get_available_resource(None)

        self.assertEquals(dic['hypervisor_hostname'], platform.node())

    def test_get_host_stats(self):
        """Disk and memory totals equal the sum of their reported parts."""
        dic = self._conn.get_host_stats(True)

        self.assertEquals(dic['disk_total'],
                          dic['disk_used'] + dic['disk_available'])
        self.assertEquals(dic['host_memory_total'],
                          dic['host_memory_overhead'] +
                          dic['host_memory_free'])

    def test_list_instances(self):
        """list_instances() returns as many entries as there are VMs."""
        num_vms = self._hypervutils.get_vm_count()
        instances = self._conn.list_instances()

        self.assertEquals(len(instances), num_vms)

    def test_get_info(self):
        """A freshly spawned instance reports the RUNNING power state."""
        self._spawn_instance(True)
        info = self._conn.get_info(self._instance_data)

        self.assertEquals(info["state"], power_state.RUNNING)

    def test_spawn_cow_image(self):
        """Spawn with use_cow_images enabled."""
        self._test_spawn_instance(True)

    def test_spawn_no_cow_image(self):
        """Spawn with use_cow_images disabled."""
        self._test_spawn_instance(False)

    def test_spawn_config_drive(self):
        """Spawn with a forced config drive (currently skipped)."""
        self.skip('broken by move to contextlib for configdrive')

        self.flags(force_config_drive=True)
        self.flags(mkisofs_cmd='mkisofs.exe')

        self._spawn_instance(True)

        (vhd_paths, _, dvd_paths) = self._hypervutils.get_vm_disks(
            self._instance_data["name"])
        self.assertEquals(len(dvd_paths), 0)
        self.assertEquals(len(vhd_paths), 2)

    def test_spawn_config_drive_cdrom(self):
        """Spawn with a forced CD-ROM config drive (currently skipped)."""
        self.skip('broken by move to contextlib for configdrive')

        self.flags(force_config_drive=True)
        self.flags(config_drive_cdrom=True)
        self.flags(mkisofs_cmd='mkisofs.exe')

        self._spawn_instance(True)

        (vhd_paths, _, dvd_paths) = self._hypervutils.get_vm_disks(
            self._instance_data["name"])
        self.assertEquals(len(dvd_paths), 1)
        self.assertEquals(len(vhd_paths), 1)
        self.assertTrue(os.path.exists(dvd_paths[0]))

    def test_spawn_no_config_drive(self):
        """Without force_config_drive no DVD drive is attached."""
        self.flags(force_config_drive=False)

        self._spawn_instance(True)

        (_, _, dvd_paths) = self._hypervutils.get_vm_disks(
            self._instance_data["name"])
        self.assertEquals(len(dvd_paths), 0)

    def test_spawn_no_vswitch_exception(self):
        """Spawning against a non-existing vswitch raises and leaves no VM."""
        self.flags(network_api_class='nova.network.api.API')
        # Reinstantiate driver, as the VIF plugin is loaded during __init__
        self._conn = driver_hyperv.HyperVDriver(None)
        # Set flag to a non existing vswitch
        self.flags(vswitch_name=str(uuid.uuid4()))
        self.assertRaises(vmutils.HyperVException, self._spawn_instance, True)

        self.assertFalse(self._hypervutils.vm_exists(
            self._instance_data["name"]))

    def _test_vm_state_change(self, action, from_state, to_state):
        """Spawn a VM, optionally force a state, run action, check state.

        :param action: callable invoked with the instance data dict.
        :param from_state: Hyper-V state to set before calling ``action``;
            a false value leaves the VM in its post-spawn state.
        :param to_state: Hyper-V state expected after ``action`` returns.
        """
        self._spawn_instance(True)
        if from_state:
            self._hypervutils.set_vm_state(self._instance_data["name"],
                                           from_state)
        action(self._instance_data)

        vmstate = self._hypervutils.get_vm_state(self._instance_data["name"])
        self.assertEquals(vmstate, to_state)

    def test_pause(self):
        """pause() moves a spawned VM to the PAUSED state."""
        self._test_vm_state_change(self._conn.pause, None,
                                   constants.HYPERV_VM_STATE_PAUSED)

    def test_pause_already_paused(self):
        """pause() on a PAUSED VM leaves it PAUSED."""
        self._test_vm_state_change(self._conn.pause,
                                   constants.HYPERV_VM_STATE_PAUSED,
                                   constants.HYPERV_VM_STATE_PAUSED)

    def test_unpause(self):
        """unpause() moves a PAUSED VM to the ENABLED state."""
        self._test_vm_state_change(self._conn.unpause,
                                   constants.HYPERV_VM_STATE_PAUSED,
                                   constants.HYPERV_VM_STATE_ENABLED)

    def test_unpause_already_running(self):
        """unpause() on a spawned VM leaves it ENABLED."""
        self._test_vm_state_change(self._conn.unpause, None,
                                   constants.HYPERV_VM_STATE_ENABLED)

    def test_suspend(self):
        """suspend() moves a spawned VM to the SUSPENDED state."""
        self._test_vm_state_change(self._conn.suspend, None,
                                   constants.HYPERV_VM_STATE_SUSPENDED)

    def test_suspend_already_suspended(self):
        """suspend() on a SUSPENDED VM leaves it SUSPENDED."""
        self._test_vm_state_change(self._conn.suspend,
                                   constants.HYPERV_VM_STATE_SUSPENDED,
                                   constants.HYPERV_VM_STATE_SUSPENDED)

    def test_resume(self):
        """resume() moves a SUSPENDED VM to the ENABLED state."""
        self._test_vm_state_change(lambda i: self._conn.resume(i, None),
                                   constants.HYPERV_VM_STATE_SUSPENDED,
                                   constants.HYPERV_VM_STATE_ENABLED)

    def test_resume_already_running(self):
        """resume() on a spawned VM leaves it ENABLED."""
        self._test_vm_state_change(lambda i: self._conn.resume(i, None),
                                   None,
                                   constants.HYPERV_VM_STATE_ENABLED)

    def test_power_off(self):
        """power_off() moves a spawned VM to the DISABLED state."""
        self._test_vm_state_change(self._conn.power_off, None,
                                   constants.HYPERV_VM_STATE_DISABLED)

    def test_power_off_already_powered_off(self):
        """power_off() on a DISABLED VM leaves it DISABLED."""
        # Exercise power_off here; this test previously called suspend,
        # so the already-powered-off path of power_off was never covered.
        self._test_vm_state_change(self._conn.power_off,
                                   constants.HYPERV_VM_STATE_DISABLED,
                                   constants.HYPERV_VM_STATE_DISABLED)

    def test_power_on(self):
        """power_on() moves a DISABLED VM to the ENABLED state."""
        self._test_vm_state_change(self._conn.power_on,
                                   constants.HYPERV_VM_STATE_DISABLED,
                                   constants.HYPERV_VM_STATE_ENABLED)

    def test_power_on_already_running(self):
        """power_on() on a spawned VM leaves it ENABLED."""
        self._test_vm_state_change(self._conn.power_on, None,
                                   constants.HYPERV_VM_STATE_ENABLED)

    def test_reboot(self):
        """After reboot() the VM is in the ENABLED state."""
        self._spawn_instance(True)

        network_info = fake_network.fake_get_instance_nw_info(
            self.stubs, spectacular=True)
        self._conn.reboot(self._instance_data, network_info, None)

        vmstate = self._hypervutils.get_vm_state(self._instance_data["name"])
        self.assertEquals(vmstate, constants.HYPERV_VM_STATE_ENABLED)

    def test_destroy(self):
        """destroy() removes the VM and its VHD files."""
        self._spawn_instance(True)
        (vhd_paths, _, _) = self._hypervutils.get_vm_disks(
            self._instance_data["name"])

        self._conn.destroy(self._instance_data)

        self.assertFalse(self._hypervutils.vm_exists(
            self._instance_data["name"]))
        # The VM is gone, so tearDown has nothing left to remove.
        self._instance_data = None

        for vhd_path in vhd_paths:
            self.assertFalse(os.path.exists(vhd_path))

    def test_live_migration(self):
        """Live migration moves the VM and calls only the post method."""
        self.flags(limit_cpu_features=True)
        self._spawn_instance(False)

        # Existing server
        self._dest_server = "HV12OSDEMO2"

        self._live_migration(self._dest_server)

        instance_name = self._instance_data["name"]
        self.assertFalse(self._hypervutils.vm_exists(instance_name))
        self.assertTrue(self._hypervutils.remote_vm_exists(self._dest_server,
                                                           instance_name))

        self.assertTrue(self._post_method_called)
        self.assertFalse(self._recover_method_called)

    def test_live_migration_with_target_failure(self):
        """A failed migration keeps the VM and calls only the recover method.
        """
        self.flags(limit_cpu_features=True)
        self._spawn_instance(False)

        dest_server = "nonexistingserver"

        exception_raised = False
        try:
            self._live_migration(dest_server)
        except Exception:
            exception_raised = True

        # Cannot use assertRaises with pythoncom.com_error on Linux
        self.assertTrue(exception_raised)

        instance_name = self._instance_data["name"]
        self.assertTrue(self._hypervutils.vm_exists(instance_name))

        self.assertFalse(self._post_method_called)
        self.assertTrue(self._recover_method_called)

    def _live_migration(self, dest_server):
        """Run live_migration to dest_server, recording which hook fires.

        :param dest_server: name of the destination host.
        """
        def fake_post_method(context, instance_ref, dest, block_migration):
            self._post_method_called = True

        def fake_recover_method(context, instance_ref, dest, block_migration):
            self._recover_method_called = True

        self._conn.live_migration(self._context, self._instance_data,
                                  dest_server, fake_post_method,
                                  fake_recover_method)

    def test_pre_live_migration_cow_image(self):
        """pre_live_migration with use_cow_images enabled."""
        self._test_pre_live_migration(True)

    def test_pre_live_migration_no_cow_image(self):
        """pre_live_migration with use_cow_images disabled."""
        self._test_pre_live_migration(False)

    def _test_pre_live_migration(self, cow):
        """Check that pre_live_migration fetches the image only for CoW.

        :param cow: value to set for the use_cow_images flag.
        """
        self.flags(use_cow_images=cow)

        instance_name = 'openstack_unit_test_vm_' + str(uuid.uuid4())

        network_info = fake_network.fake_get_instance_nw_info(
            self.stubs, spectacular=True)
        instance_data = db_fakes.get_fake_instance_data(instance_name,
                                                        self._project_id,
                                                        self._user_id)
        block_device_info = None

        self._conn.pre_live_migration(self._context, instance_data,
                                      block_device_info, network_info)

        if cow:
            self.assertIsNotNone(self._fetched_image)
        else:
            self.assertIsNone(self._fetched_image)

    def test_snapshot_with_update_failure(self):
        """A failing image update raises and leaves no VM snapshots."""
        expected_calls = [
            {'args': (),
             'kwargs':
                 {'task_state': task_states.IMAGE_PENDING_UPLOAD}},
            {'args': (),
             'kwargs':
                 {'task_state': task_states.IMAGE_UPLOADING,
                  'expected_state': task_states.IMAGE_PENDING_UPLOAD}}]
        func_call_matcher = matchers.FunctionCallMatcher(expected_calls)

        self._spawn_instance(True)

        self._update_image_raise_exception = True
        snapshot_name = 'test_snapshot_' + str(uuid.uuid4())
        self.assertRaises(vmutils.HyperVException, self._conn.snapshot,
                          self._context, self._instance_data, snapshot_name,
                          func_call_matcher.call)

        # assert states changed in correct order
        self.assertIsNone(func_call_matcher.match())

        # assert VM snapshots have been removed
        self.assertEquals(self._hypervutils.get_vm_snapshots_count(
            self._instance_data["name"]), 0)

    def test_snapshot(self):
        """snapshot() uploads VHD metadata and leaves no VM snapshots."""
        expected_calls = [
            {'args': (),
             'kwargs':
                 {'task_state': task_states.IMAGE_PENDING_UPLOAD}},
            {'args': (),
             'kwargs':
                 {'task_state': task_states.IMAGE_UPLOADING,
                  'expected_state': task_states.IMAGE_PENDING_UPLOAD}}]
        func_call_matcher = matchers.FunctionCallMatcher(expected_calls)

        self._spawn_instance(True)

        snapshot_name = 'test_snapshot_' + str(uuid.uuid4())
        self._conn.snapshot(self._context, self._instance_data, snapshot_name,
                            func_call_matcher.call)

        self.assertTrue(self._image_metadata and
                        "disk_format" in self._image_metadata and
                        self._image_metadata["disk_format"] == "vhd")

        # assert states changed in correct order
        self.assertIsNone(func_call_matcher.match())

        # assert VM snapshots have been removed
        self.assertEquals(self._hypervutils.get_vm_snapshots_count(
            self._instance_data["name"]), 0)

    def _spawn_instance(self, cow, block_device_info=None):
        """Create a fake instance record and spawn it through the driver.

        The instance data is stored in ``self._instance_data`` before the
        spawn call, so it is available even when spawning raises.

        :param cow: value to set for the use_cow_images flag.
        :param block_device_info: passed through to the driver's spawn().
        """
        self.flags(use_cow_images=cow)

        instance_name = 'openstack_unit_test_vm_' + str(uuid.uuid4())

        self._instance_data = db_fakes.get_fake_instance_data(
            instance_name, self._project_id, self._user_id)
        instance = db.instance_create(self._context, self._instance_data)

        image = db_fakes.get_fake_image_data(self._project_id, self._user_id)

        network_info = fake_network.fake_get_instance_nw_info(
            self.stubs, spectacular=True)

        self._conn.spawn(self._context, instance, image,
                         injected_files=[], admin_password=None,
                         network_info=network_info,
                         block_device_info=block_device_info)

    def _test_spawn_instance(self, cow):
        """Spawn an instance and verify its state and disk layout.

        :param cow: value to set for the use_cow_images flag; when true the
            VM disk is expected to have the fetched image as its parent,
            otherwise the VM disk is expected to be the fetched image.
        """
        self._spawn_instance(cow)
        self.assertTrue(self._hypervutils.vm_exists(
            self._instance_data["name"]))

        vmstate = self._hypervutils.get_vm_state(self._instance_data["name"])
        self.assertEquals(vmstate, constants.HYPERV_VM_STATE_ENABLED)

        (vhd_paths, _, _) = self._hypervutils.get_vm_disks(
            self._instance_data["name"])
        self.assertEquals(len(vhd_paths), 1)

        parent_path = self._hypervutils.get_vhd_parent_path(vhd_paths[0])
        if cow:
            self.assertIsNotNone(parent_path)
            self.assertEquals(self._fetched_image, parent_path)
        else:
            self.assertIsNone(parent_path)
            self.assertEquals(self._fetched_image, vhd_paths[0])

    def _attach_volume(self):
        """Spawn an instance and attach the fake volume at /dev/sdc."""
        self._spawn_instance(True)
        connection_info = db_fakes.get_fake_volume_info_data(
            self._volume_target_portal, self._volume_id)

        self._conn.attach_volume(connection_info,
                                 self._instance_data, '/dev/sdc')

    def test_attach_volume(self):
        """Attaching adds one volume disk and opens an iSCSI session."""
        self._attach_volume()

        (_, volumes_paths, _) = self._hypervutils.get_vm_disks(
            self._instance_data["name"])
        self.assertEquals(len(volumes_paths), 1)

        sessions_exist = self._hypervutils.iscsi_volume_sessions_exist(
            self._volume_id)
        self.assertTrue(sessions_exist)

    def test_detach_volume(self):
        """Detaching removes the volume disk and the iSCSI session."""
        self._attach_volume()
        connection_info = db_fakes.get_fake_volume_info_data(
            self._volume_target_portal, self._volume_id)

        self._conn.detach_volume(connection_info,
                                 self._instance_data, '/dev/sdc')

        (_, volumes_paths, _) = self._hypervutils.get_vm_disks(
            self._instance_data["name"])
        self.assertEquals(len(volumes_paths), 0)

        sessions_exist = self._hypervutils.iscsi_volume_sessions_exist(
            self._volume_id)
        self.assertFalse(sessions_exist)

    def test_boot_from_volume(self):
        """Booting from a volume attaches it and opens an iSCSI session."""
        block_device_info = db_fakes.get_fake_block_device_info(
            self._volume_target_portal, self._volume_id)

        self._spawn_instance(False, block_device_info)

        (_, volumes_paths, _) = self._hypervutils.get_vm_disks(
            self._instance_data["name"])

        self.assertEquals(len(volumes_paths), 1)

        sessions_exist = self._hypervutils.iscsi_volume_sessions_exist(
            self._volume_id)
        self.assertTrue(sessions_exist)