520 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			520 lines
		
	
	
		
			18 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
# vim: tabstop=4 shiftwidth=4 softtabstop=4

#  Copyright 2012 Cloudbase Solutions Srl
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.
 | 
						|
 | 
						|
"""
Test suite for the Hyper-V driver and related APIs.
"""
 | 
						|
 | 
						|
import json
 | 
						|
import os
 | 
						|
import platform
 | 
						|
import shutil
 | 
						|
import sys
 | 
						|
import uuid
 | 
						|
 | 
						|
from nova.compute import power_state
 | 
						|
from nova import context
 | 
						|
from nova import db
 | 
						|
from nova.image import glance
 | 
						|
from nova.openstack.common import cfg
 | 
						|
from nova.tests import fake_network
 | 
						|
from nova.tests.hyperv import basetestcase
 | 
						|
from nova.tests.hyperv import db_fakes
 | 
						|
from nova.tests.hyperv import hypervutils
 | 
						|
from nova.tests.hyperv import mockproxy
 | 
						|
import nova.tests.image.fake as fake_image
 | 
						|
from nova.virt.hyperv import constants
 | 
						|
from nova.virt.hyperv import driver as driver_hyperv
 | 
						|
from nova.virt.hyperv import vmutils
 | 
						|
from nova.virt import images
 | 
						|
 | 
						|
# Module-level alias for the configuration object exposed by
# nova.openstack.common.cfg; tearDown reads ``instances_path`` from it.
CONF = cfg.CONF
 | 
						|
 | 
						|
 | 
						|
class HyperVAPITestCase(basetestcase.BaseTestCase):
 | 
						|
    """Unit tests for Hyper-V driver calls."""
 | 
						|
 | 
						|
    def setUp(self):
        """Reset per-test state, install the stubs and create the driver.

        The request context is created before the stubs are installed, and
        the ``instances_path`` / ``vswitch_name`` flags are overridden before
        the Hyper-V helper and the driver under test are instantiated.
        """
        super(HyperVAPITestCase, self).setUp()

        # Identity used for the request context and the fake records.
        self._user_id = 'fake'
        self._project_id = 'fake'
        self._context = context.RequestContext(self._user_id, self._project_id)

        # Per-test bookkeeping, filled in by the helpers and the stubs.
        self._instance_data = None
        self._dest_server = None
        self._image_metadata = None
        self._fetched_image = None

        # Switches and markers read or flipped by the fake callbacks.
        self._update_image_raise_exception = False
        self._post_method_called = False
        self._recover_method_called = False

        # Volume coordinates shared by the volume related tests.
        self._volume_target_portal = 'testtargetportal:3260'
        self._volume_id = '8957e088-dbee-4216-8056-978353a3e737'

        self._setup_stubs()

        self.flags(
            instances_path=r'C:\Hyper-V\test\instances',
            vswitch_name='external')

        self._hypervutils = hypervutils.HyperVUtils()
        self._conn = driver_hyperv.HyperVDriver(None)
 | 
						|
 | 
						|
    def _setup_stubs(self):
        """Install the fakes and inject the module mocks used by the tests.

        Stubs out the DB instance API, the image service and the network
        info API, replaces ``json.dumps``, ``images.fetch`` and
        ``glance.get_remote_image_service`` with local fakes, and then calls
        ``_inject_mocks_in_modules`` (not defined in the visible part of this
        class) with the list of module names to mock and the modules that
        receive the mocks.
        """
        db_fakes.stub_out_db_instance_api(self.stubs)
        fake_image.stub_out_image_service(self.stubs)
        fake_network.stub_out_nw_api_get_instance_nw_info(self.stubs)

        def fake_dumps(msg, default=None, **kwargs):
            # Whatever is serialised, the result is an empty JSON string.
            return '""'

        def fake_fetch(context, image_id, target, user, project):
            # Record the requested target and make sure a VHD exists there.
            self._fetched_image = target
            if not os.path.exists(target):
                self._hypervutils.create_vhd(target)

        class FakeGlanceImageService(object):
            def update(self_fake, context, image_id, image_metadata, f):
                # Either simulate a failing upload or keep the metadata so
                # that the test can inspect it afterwards.
                if self._update_image_raise_exception:
                    raise vmutils.HyperVException(
                        "Simulated update failure")
                self._image_metadata = image_metadata

        def fake_get_remote_image_service(context, name):
            return (FakeGlanceImageService(), 1)

        self.stubs.Set(json, 'dumps', fake_dumps)
        self.stubs.Set(images, 'fetch', fake_fetch)
        self.stubs.Set(glance, 'get_remote_image_service',
                       fake_get_remote_image_service)

        # Names of the modules that get replaced by mocks.
        mocked_module_names = [
            'wmi',
            'os',
            'shutil',
            'uuid',
            'time',
            'multiprocessing',
            '_winreg',
            'nova.virt.configdrive',
            'nova.utils',
            'ctypes',
        ]

        # Modules in which the mocks are going to be injected.
        from nova.virt.hyperv import baseops
        from nova.virt.hyperv import basevolumeutils
        from nova.virt.hyperv import hostops
        from nova.virt.hyperv import livemigrationops
        from nova.virt.hyperv import snapshotops
        from nova.virt.hyperv import vmops
        from nova.virt.hyperv import volumeops
        from nova.virt.hyperv import volumeutils
        from nova.virt.hyperv import volumeutilsV2

        injection_targets = [
            driver_hyperv,
            basevolumeutils,
            baseops,
            hostops,
            vmops,
            vmutils,
            volumeops,
            volumeutils,
            volumeutilsV2,
            snapshotops,
            livemigrationops,
            hypervutils,
            db_fakes,
            sys.modules[__name__],
        ]

        self._inject_mocks_in_modules(mocked_module_names, injection_targets)

        # The file access stub below is only needed when wmi is a mock.
        if not isinstance(snapshotops.wmi, mockproxy.Mock):
            return

        from nova.virt.hyperv import ioutils
        import StringIO

        def fake_open(name, mode):
            return StringIO.StringIO("fake file content")

        self.stubs.Set(ioutils, 'open', fake_open)
 | 
						|
 | 
						|
    def tearDown(self):
        """Remove what the test created, then run the base class tearDown.

        Cleans up the local VM, the VM on the destination server (when a
        destination was set), the iSCSI sessions of the test volume, the
        instances directory and the fake image service. The base class
        tearDown runs even if one of these steps raises.
        """
        try:
            utils = self._hypervutils

            # Local VM spawned by the test, if it is still there.
            instance = self._instance_data
            if instance and utils.vm_exists(instance["name"]):
                utils.remove_vm(self._instance_data["name"])

            # VM left on the live migration destination.
            destination = self._dest_server
            if destination and utils.remote_vm_exists(
                    destination, self._instance_data["name"]):
                utils.remove_remote_vm(destination,
                                       self._instance_data["name"])

            utils.logout_iscsi_volume_sessions(self._volume_id)

            # Second argument: ignore errors while deleting the tree.
            shutil.rmtree(CONF.instances_path, True)

            fake_image.FakeImageService_reset()
        finally:
            super(HyperVAPITestCase, self).tearDown()
 | 
						|
 | 
						|
    def test_get_available_resource(self):
        """The reported hypervisor hostname equals ``platform.node()``."""
        resources = self._conn.get_available_resource(None)

        self.assertEqual(resources['hypervisor_hostname'], platform.node())
 | 
						|
 | 
						|
    def test_get_host_stats(self):
        """Disk and memory totals equal the sum of their reported parts."""
        stats = self._conn.get_host_stats(True)

        # Disk: total == used + available.
        disk_total = stats['disk_total']
        disk_parts = stats['disk_used'] + stats['disk_available']
        self.assertEqual(disk_total, disk_parts)

        # Memory: total == overhead + free.
        memory_total = stats['host_memory_total']
        memory_parts = (stats['host_memory_overhead'] +
                        stats['host_memory_free'])
        self.assertEqual(memory_total, memory_parts)
 | 
						|
 | 
						|
    def test_list_instances(self):
        """The driver lists as many instances as the helper counts VMs."""
        expected_count = self._hypervutils.get_vm_count()
        listed = self._conn.list_instances()

        self.assertEqual(len(listed), expected_count)
 | 
						|
 | 
						|
    def test_get_info(self):
        """A freshly spawned instance is reported as running."""
        self._spawn_instance(True)
        instance_info = self._conn.get_info(self._instance_data)

        self.assertEqual(instance_info["state"], power_state.RUNNING)
 | 
						|
 | 
						|
    def test_spawn_cow_image(self):
        """Spawn checks with ``use_cow_images`` enabled."""
        self._test_spawn_instance(cow=True)
 | 
						|
 | 
						|
    def test_spawn_no_cow_image(self):
        """Spawn checks with ``use_cow_images`` disabled."""
        self._test_spawn_instance(cow=False)
 | 
						|
 | 
						|
    def test_spawn_config_drive(self):
        """With a forced config drive the VM has two VHDs and no DVD."""
        self.flags(force_config_drive=True)
        self.flags(mkisofs_cmd='mkisofs.exe')

        self._spawn_instance(True)

        vm_name = self._instance_data["name"]
        vhd_paths, _, dvd_paths = self._hypervutils.get_vm_disks(vm_name)

        self.assertEqual(len(dvd_paths), 0)
        self.assertEqual(len(vhd_paths), 2)
 | 
						|
 | 
						|
    def test_spawn_config_drive_cdrom(self):
        """With ``config_drive_cdrom`` the VM has one VHD and one DVD."""
        self.flags(force_config_drive=True)
        self.flags(config_drive_cdrom=True)
        self.flags(mkisofs_cmd='mkisofs.exe')

        self._spawn_instance(True)

        vm_name = self._instance_data["name"]
        vhd_paths, _, dvd_paths = self._hypervutils.get_vm_disks(vm_name)

        self.assertEqual(len(dvd_paths), 1)
        self.assertEqual(len(vhd_paths), 1)

        # The DVD image has to exist on disk as well.
        dvd_image_exists = os.path.exists(dvd_paths[0])
        self.assertTrue(dvd_image_exists)
 | 
						|
 | 
						|
    def test_spawn_no_config_drive(self):
        """Without a forced config drive no DVD is attached to the VM."""
        self.flags(force_config_drive=False)

        self._spawn_instance(True)

        vm_name = self._instance_data["name"]
        _, _, dvd_paths = self._hypervutils.get_vm_disks(vm_name)

        self.assertEqual(len(dvd_paths), 0)
 | 
						|
 | 
						|
    def test_spawn_no_vswitch_exception(self):
        """Spawning against an unknown vswitch raises and leaves no VM."""
        # A random UUID is used as the name of a non existing vswitch.
        missing_vswitch = str(uuid.uuid4())
        self.flags(vswitch_name=missing_vswitch)

        self.assertRaises(vmutils.HyperVException, self._spawn_instance, True)

        vm_left_behind = self._hypervutils.vm_exists(
            self._instance_data["name"])
        self.assertFalse(vm_left_behind)
 | 
						|
 | 
						|
    def _test_vm_state_change(self, action, from_state, to_state):
        """Spawn a VM, apply ``action`` and check the resulting VM state.

        :param action: callable invoked with the instance data.
        :param from_state: state set on the VM before ``action`` runs; the
                           step is skipped when the value is falsy.
        :param to_state: state the VM is expected to report afterwards.
        """
        self._spawn_instance(True)
        vm_name = self._instance_data["name"]

        # Optionally move the VM into the requested starting state.
        if from_state:
            self._hypervutils.set_vm_state(vm_name, from_state)

        action(self._instance_data)

        resulting_state = self._hypervutils.get_vm_state(vm_name)
        self.assertEqual(resulting_state, to_state)
 | 
						|
 | 
						|
    def test_pause(self):
        """Pausing a freshly spawned VM puts it in the paused state."""
        self._test_vm_state_change(
            action=self._conn.pause,
            from_state=None,
            to_state=constants.HYPERV_VM_STATE_PAUSED)
 | 
						|
 | 
						|
    def test_pause_already_paused(self):
        """Pausing an already paused VM leaves it paused."""
        self._test_vm_state_change(
            action=self._conn.pause,
            from_state=constants.HYPERV_VM_STATE_PAUSED,
            to_state=constants.HYPERV_VM_STATE_PAUSED)
 | 
						|
 | 
						|
    def test_unpause(self):
        """Unpausing a paused VM puts it in the enabled state."""
        self._test_vm_state_change(
            action=self._conn.unpause,
            from_state=constants.HYPERV_VM_STATE_PAUSED,
            to_state=constants.HYPERV_VM_STATE_ENABLED)
 | 
						|
 | 
						|
    def test_unpause_already_running(self):
        """Unpausing a freshly spawned VM leaves it enabled."""
        self._test_vm_state_change(
            action=self._conn.unpause,
            from_state=None,
            to_state=constants.HYPERV_VM_STATE_ENABLED)
 | 
						|
 | 
						|
    def test_suspend(self):
        """Suspending a freshly spawned VM puts it in the suspended state."""
        self._test_vm_state_change(
            action=self._conn.suspend,
            from_state=None,
            to_state=constants.HYPERV_VM_STATE_SUSPENDED)
 | 
						|
 | 
						|
    def test_suspend_already_suspended(self):
        """Suspending an already suspended VM leaves it suspended."""
        self._test_vm_state_change(
            action=self._conn.suspend,
            from_state=constants.HYPERV_VM_STATE_SUSPENDED,
            to_state=constants.HYPERV_VM_STATE_SUSPENDED)
 | 
						|
 | 
						|
    def test_resume(self):
        """Resuming a suspended VM puts it in the enabled state."""
        def resume(instance):
            # The driver call takes a second argument, passed as None here.
            return self._conn.resume(instance, None)

        self._test_vm_state_change(
            action=resume,
            from_state=constants.HYPERV_VM_STATE_SUSPENDED,
            to_state=constants.HYPERV_VM_STATE_ENABLED)
 | 
						|
 | 
						|
    def test_resume_already_running(self):
        """Resuming a freshly spawned VM leaves it enabled."""
        def resume(instance):
            # The driver call takes a second argument, passed as None here.
            return self._conn.resume(instance, None)

        self._test_vm_state_change(
            action=resume,
            from_state=None,
            to_state=constants.HYPERV_VM_STATE_ENABLED)
 | 
						|
 | 
						|
    def test_power_off(self):
        """Powering off a freshly spawned VM puts it in the disabled state."""
        self._test_vm_state_change(
            action=self._conn.power_off,
            from_state=None,
            to_state=constants.HYPERV_VM_STATE_DISABLED)
 | 
						|
 | 
						|
    def test_power_off_already_powered_off(self):
        """Powering off a VM that is already disabled leaves it disabled.

        The VM is first put in the disabled state, then the driver's
        ``power_off`` is invoked and the VM is expected to stay disabled.
        """
        # This test previously invoked ``self._conn.suspend``, so the
        # power-off-when-already-off path was never exercised.
        # NOTE(review): if the mock framework replays recorded calls for this
        # test, the recorded data presumably needs regenerating - confirm.
        self._test_vm_state_change(self._conn.power_off,
            constants.HYPERV_VM_STATE_DISABLED,
            constants.HYPERV_VM_STATE_DISABLED)
 | 
						|
 | 
						|
    def test_power_on(self):
        """Powering on a disabled VM puts it in the enabled state."""
        self._test_vm_state_change(
            action=self._conn.power_on,
            from_state=constants.HYPERV_VM_STATE_DISABLED,
            to_state=constants.HYPERV_VM_STATE_ENABLED)
 | 
						|
 | 
						|
    def test_power_on_already_running(self):
        """Powering on a freshly spawned VM leaves it enabled."""
        self._test_vm_state_change(
            action=self._conn.power_on,
            from_state=None,
            to_state=constants.HYPERV_VM_STATE_ENABLED)
 | 
						|
 | 
						|
    def test_reboot(self):
        """After a reboot the VM is in the enabled state."""
        self._spawn_instance(True)

        nw_info = fake_network.fake_get_instance_nw_info(
            self.stubs, spectacular=True)
        self._conn.reboot(self._instance_data, nw_info, None)

        state_after_reboot = self._hypervutils.get_vm_state(
            self._instance_data["name"])
        self.assertEqual(state_after_reboot,
                         constants.HYPERV_VM_STATE_ENABLED)
 | 
						|
 | 
						|
    def test_destroy(self):
        """Destroying an instance removes both the VM and its VHD files."""
        self._spawn_instance(True)

        # Collect the disk paths while the VM still exists.
        vhd_paths, _, _ = self._hypervutils.get_vm_disks(
            self._instance_data["name"])

        self._conn.destroy(self._instance_data)

        vm_still_exists = self._hypervutils.vm_exists(
            self._instance_data["name"])
        self.assertFalse(vm_still_exists)

        # Cleared here, so tearDown does not look for the VM again.
        self._instance_data = None

        for disk_path in vhd_paths:
            self.assertFalse(os.path.exists(disk_path))
 | 
						|
 | 
						|
    def test_live_migration(self):
        """A successful live migration moves the VM to the destination.

        The VM must be gone locally and present on the destination server;
        only the post method callback is expected to have been invoked.
        """
        self.flags(limit_cpu_features=True)
        self._spawn_instance(False)

        # Existing server
        self._dest_server = "HV12OSDEMO2"
        self._live_migration(self._dest_server)

        vm_name = self._instance_data["name"]

        still_local = self._hypervutils.vm_exists(vm_name)
        self.assertFalse(still_local)

        on_destination = self._hypervutils.remote_vm_exists(
            self._dest_server, vm_name)
        self.assertTrue(on_destination)

        self.assertTrue(self._post_method_called)
        self.assertFalse(self._recover_method_called)
 | 
						|
 | 
						|
    def test_live_migration_with_target_failure(self):
        """A failing live migration keeps the VM and triggers recovery.

        Migrating to a non existing server has to raise; the VM must still
        exist locally and only the recover method callback is expected to
        have been invoked.
        """
        self.flags(limit_cpu_features=True)
        self._spawn_instance(False)

        unreachable_server = "nonexistingserver"

        # Cannot use assertRaises with pythoncom.com_error on Linux
        try:
            self._live_migration(unreachable_server)
        except Exception:
            migration_failed = True
        else:
            migration_failed = False
        self.assertTrue(migration_failed)

        vm_name = self._instance_data["name"]
        self.assertTrue(self._hypervutils.vm_exists(vm_name))

        self.assertFalse(self._post_method_called)
        self.assertTrue(self._recover_method_called)
 | 
						|
 | 
						|
    def _live_migration(self, dest_server):
        """Run the driver's live migration towards ``dest_server``.

        The post and recover callbacks handed to the driver only record
        that they were called, by setting ``_post_method_called`` and
        ``_recover_method_called`` respectively.

        :param dest_server: name of the migration destination.
        """
        def fake_post_method(context, instance_ref, dest, block_migration):
            self._post_method_called = True

        def fake_recover_method(context, instance_ref, dest, block_migration):
            self._recover_method_called = True

        self._conn.live_migration(
            self._context,
            self._instance_data,
            dest_server,
            fake_post_method,
            fake_recover_method)
 | 
						|
 | 
						|
    def test_pre_live_migration_cow_image(self):
        """Pre live migration checks with ``use_cow_images`` enabled."""
        self._test_pre_live_migration(cow=True)
 | 
						|
 | 
						|
    def test_pre_live_migration_no_cow_image(self):
        """Pre live migration checks with ``use_cow_images`` disabled."""
        self._test_pre_live_migration(cow=False)
 | 
						|
 | 
						|
    def _test_pre_live_migration(self, cow):
        """Run pre live migration and check whether an image was fetched.

        :param cow: value for the ``use_cow_images`` flag. When true an
                    image is expected to have been fetched, otherwise not.
        """
        self.flags(use_cow_images=cow)

        vm_name = 'openstack_unit_test_vm_' + str(uuid.uuid4())

        nw_info = fake_network.fake_get_instance_nw_info(
            self.stubs, spectacular=True)
        fake_instance = db_fakes.get_fake_instance_data(
            vm_name, self._project_id, self._user_id)

        # No block device information is passed.
        self._conn.pre_live_migration(self._context, fake_instance,
                                      None, nw_info)

        fetched = self._fetched_image
        if cow:
            self.assertTrue(fetched is not None)
        else:
            self.assertTrue(fetched is None)
 | 
						|
 | 
						|
    def test_snapshot_with_update_failure(self):
        """A failing image update raises and leaves no VM snapshot behind."""
        self._spawn_instance(True)

        # Make the fake image service fail on update.
        self._update_image_raise_exception = True

        name = 'test_snapshot_' + str(uuid.uuid4())
        self.assertRaises(vmutils.HyperVException,
                          self._conn.snapshot,
                          self._context, self._instance_data, name)

        # assert VM snapshots have been removed
        remaining_snapshots = self._hypervutils.get_vm_snapshots_count(
            self._instance_data["name"])
        self.assertEqual(remaining_snapshots, 0)
 | 
						|
 | 
						|
    def test_snapshot(self):
        """A snapshot uploads ``vhd`` metadata and leaves no VM snapshot."""
        self._spawn_instance(True)

        name = 'test_snapshot_' + str(uuid.uuid4())
        self._conn.snapshot(self._context, self._instance_data, name)

        # The fake image service must have received vhd image metadata.
        metadata = self._image_metadata
        self.assertTrue(metadata)
        self.assertTrue("disk_format" in metadata)
        self.assertTrue(metadata["disk_format"] == "vhd")

        # assert VM snapshots have been removed
        remaining_snapshots = self._hypervutils.get_vm_snapshots_count(
            self._instance_data["name"])
        self.assertEqual(remaining_snapshots, 0)
 | 
						|
 | 
						|
    def _spawn_instance(self, cow, block_device_info=None):
        """Create a fake instance record and spawn it through the driver.

        The generated instance data is stored in ``self._instance_data``.

        :param cow: value for the ``use_cow_images`` flag.
        :param block_device_info: passed unchanged to the driver's spawn.
        """
        self.flags(use_cow_images=cow)

        vm_name = 'openstack_unit_test_vm_' + str(uuid.uuid4())

        self._instance_data = db_fakes.get_fake_instance_data(
            vm_name, self._project_id, self._user_id)
        instance_ref = db.instance_create(self._context, self._instance_data)

        image_meta = db_fakes.get_fake_image_data(self._project_id,
                                                  self._user_id)

        nw_info = fake_network.fake_get_instance_nw_info(
            self.stubs, spectacular=True)

        self._conn.spawn(
            self._context, instance_ref, image_meta,
            injected_files=[],
            admin_password=None,
            network_info=nw_info,
            block_device_info=block_device_info)
 | 
						|
 | 
						|
    def _test_spawn_instance(self, cow):
        """Spawn an instance and check the VM, its state and its disk.

        :param cow: value for the ``use_cow_images`` flag. When true the
                    single VHD must have the fetched image as its parent;
                    otherwise it must have no parent and be the fetched
                    image itself.
        """
        self._spawn_instance(cow)

        vm_created = self._hypervutils.vm_exists(self._instance_data["name"])
        self.assertTrue(vm_created)

        state = self._hypervutils.get_vm_state(self._instance_data["name"])
        self.assertEqual(state, constants.HYPERV_VM_STATE_ENABLED)

        vhd_paths, _, _ = self._hypervutils.get_vm_disks(
            self._instance_data["name"])
        self.assertEqual(len(vhd_paths), 1)

        root_vhd = vhd_paths[0]
        parent = self._hypervutils.get_vhd_parent_path(root_vhd)
        if cow:
            self.assertTrue(parent is not None)
            self.assertEqual(self._fetched_image, parent)
        else:
            self.assertTrue(parent is None)
            self.assertEqual(self._fetched_image, root_vhd)
 | 
						|
 | 
						|
    def _attach_volume(self):
        """Spawn an instance and attach the test volume as ``/dev/sdc``."""
        self._spawn_instance(True)

        volume_info = db_fakes.get_fake_volume_info_data(
            self._volume_target_portal, self._volume_id)

        vm_name = self._instance_data["name"]
        self._conn.attach_volume(volume_info, vm_name, '/dev/sdc')
 | 
						|
 | 
						|
    def test_attach_volume(self):
        """After attaching, the VM has one volume and iSCSI sessions exist."""
        self._attach_volume()

        _, attached_volumes, _ = self._hypervutils.get_vm_disks(
            self._instance_data["name"])
        self.assertEqual(len(attached_volumes), 1)

        has_sessions = self._hypervutils.iscsi_volume_sessions_exist(
            self._volume_id)
        self.assertTrue(has_sessions)
 | 
						|
 | 
						|
    def test_detach_volume(self):
        """After detaching, the VM has no volume and no iSCSI session."""
        self._attach_volume()

        volume_info = db_fakes.get_fake_volume_info_data(
            self._volume_target_portal, self._volume_id)

        vm_name = self._instance_data["name"]
        self._conn.detach_volume(volume_info, vm_name, '/dev/sdc')

        _, attached_volumes, _ = self._hypervutils.get_vm_disks(
            self._instance_data["name"])
        self.assertEqual(len(attached_volumes), 0)

        has_sessions = self._hypervutils.iscsi_volume_sessions_exist(
            self._volume_id)
        self.assertFalse(has_sessions)
 | 
						|
 | 
						|
    def test_boot_from_volume(self):
        """Spawning with block device info attaches exactly one volume."""
        bdi = db_fakes.get_fake_block_device_info(
            self._volume_target_portal, self._volume_id)

        self._spawn_instance(False, bdi)

        _, attached_volumes, _ = self._hypervutils.get_vm_disks(
            self._instance_data["name"])
        self.assertEqual(len(attached_volumes), 1)

        has_sessions = self._hypervutils.iscsi_volume_sessions_exist(
            self._volume_id)
        self.assertTrue(has_sessions)
 |