Refactoring required for blueprint xenapi-live-migration

This refactoring of the libvirt live migration code is
required to enable live migration in the xenapi driver.

This change ensures that libvirt-specific checks are performed
only when the libvirt driver is enabled.

The complication is that some of these checks require
information to be passed between the source and destination
hosts, for example when comparing CPU flags.

Change-Id: I7389f0b7f03313d7f04b907f481787dadf0716fd
Author: John Garbutt
Date: 2012-01-31 14:49:04 +00:00
Committed by: Gerrit Code Review
parent a2c05274af
commit cac332c396
13 changed files with 769 additions and 674 deletions
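
In place of the old per-step RPCs, the pre-migration checks now flow through a single pair of driver hooks. The sketch below condenses the new ComputeManager method shown in the first diff hunk; the driver and rpcapi parameters stand in for the manager's self.driver and self.compute_rpcapi attributes, and the database lookup and exception wrapping are elided:

def check_can_live_migrate_destination(driver, rpcapi, ctxt, instance_ref,
                                       block_migration=False,
                                       disk_over_commit=False):
    """Destination-side sketch: run the driver's checks, then ask the
    source host to validate the data they produced."""
    # The driver returns whatever the source needs to finish the check;
    # for libvirt this is a shared-storage test filename plus the
    # block_migration flag (the CPU comparison happens right here on
    # the destination).
    dest_check_data = driver.check_can_live_migrate_destination(
        ctxt, instance_ref, block_migration, disk_over_commit)
    try:
        # Call back to the source host with the driver-specific data.
        rpcapi.check_can_live_migrate_source(ctxt, instance_ref,
                                             dest_check_data)
    finally:
        # The destination cleans up anything it created for the check.
        driver.check_can_live_migrate_destination_cleanup(ctxt,
                                                          dest_check_data)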


@@ -69,6 +69,7 @@ from nova.openstack.common import jsonutils
from nova.openstack.common import log as logging
from nova.openstack.common.notifier import api as notifier
from nova.openstack.common import rpc
from nova.openstack.common.rpc import common as rpc_common
from nova.openstack.common import timeutils
from nova import utils
from nova.virt import driver
@@ -234,7 +235,7 @@ def _get_additional_capabilities():
class ComputeManager(manager.SchedulerDependentManager):
"""Manages the running instances from creation to destruction."""
RPC_API_VERSION = '1.1'
RPC_API_VERSION = '1.2'
def __init__(self, compute_driver=None, *args, **kwargs):
"""Load configuration options and connect to the hypervisor."""
@@ -1892,66 +1893,8 @@ class ComputeManager(manager.SchedulerDependentManager):
except exception.NotFound:
pass
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def compare_cpu(self, context, cpu_info):
"""Checks that the host cpu is compatible with a cpu given by xml.
:param context: security context
:param cpu_info: json string obtained from virConnect.getCapabilities
:returns: See driver.compare_cpu
"""
return self.driver.compare_cpu(cpu_info)
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def create_shared_storage_test_file(self, context):
"""Makes tmpfile under FLAGS.instance_path.
This method creates a temporary file that acts as an indicator to
other compute nodes that utilize the same shared storage as this node.
(create|check|cleanup)_shared_storage_test_file() are a set and should
be run together.
:param context: security context
:returns: tmpfile name(basename)
"""
dirpath = FLAGS.instances_path
fd, tmp_file = tempfile.mkstemp(dir=dirpath)
LOG.debug(_("Creating tmpfile %s to notify to other "
"compute nodes that they should mount "
"the same storage.") % tmp_file)
os.close(fd)
return os.path.basename(tmp_file)
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def check_shared_storage_test_file(self, context, filename):
"""Confirms existence of the tmpfile under FLAGS.instances_path.
Returns False if the tmpfile cannot be confirmed.
:param context: security context
:param filename: the file under FLAGS.instances_path whose existence to confirm
"""
tmp_file = os.path.join(FLAGS.instances_path, filename)
if not os.path.exists(tmp_file):
return False
else:
return True
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def cleanup_shared_storage_test_file(self, context, filename):
"""Removes existence of the tmpfile under FLAGS.instances_path.
:param context: security context
:param filename: the file under FLAGS.instances_path to remove
"""
tmp_file = os.path.join(FLAGS.instances_path, filename)
os.remove(tmp_file)
def get_instance_disk_info(self, context, instance_name):
"""Get information about instance's current disk.
"""Getting infomation of instance's current disk.
Implementation nova.virt.libvirt.connection.
@@ -1961,6 +1904,62 @@ class ComputeManager(manager.SchedulerDependentManager):
"""
return self.driver.get_instance_disk_info(instance_name)
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def compare_cpu(self, context, cpu_info):
raise rpc_common.RPCException(message=_('Deprecated from version 1.2'))
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def create_shared_storage_test_file(self, context):
raise rpc_common.RPCException(message=_('Deprecated from version 1.2'))
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def check_shared_storage_test_file(self, context, filename):
raise rpc_common.RPCException(message=_('Deprecated from version 1.2'))
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def cleanup_shared_storage_test_file(self, context, filename):
raise rpc_common.RPCException(message=_('Deprecated from version 1.2'))
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def check_can_live_migrate_destination(self, ctxt, instance_id,
block_migration=False,
disk_over_commit=False):
"""Check if it is possible to execute live migration.
This runs checks on the destination host, and then calls
back to the source host to check the results.
:param context: security context
:param instance_id: nova.db.sqlalchemy.models.Instance.Id
:param block_migration: if true, prepare for block migration
:param disk_over_commit: if true, allow disk over commit
"""
instance_ref = self.db.instance_get(ctxt, instance_id)
dest_check_data = self.driver.check_can_live_migrate_destination(ctxt,
instance_ref, block_migration, disk_over_commit)
try:
self.compute_rpcapi.check_can_live_migrate_source(ctxt,
instance_ref, dest_check_data)
finally:
self.driver.check_can_live_migrate_destination_cleanup(ctxt,
dest_check_data)
@exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
def check_can_live_migrate_source(self, ctxt, instance_id,
dest_check_data):
"""Check if it is possible to execute live migration.
This checks if the live migration can succeed, based on the
results from check_can_live_migrate_destination.
:param context: security context
:param instance_id: nova.db.sqlalchemy.models.Instance.Id
:param dest_check_data: result of check_can_live_migrate_destination
"""
instance_ref = self.db.instance_get(ctxt, instance_id)
self.driver.check_can_live_migrate_source(ctxt, instance_ref,
dest_check_data)
def pre_live_migration(self, context, instance_id,
block_migration=False, disk=None):
"""Preparations for live migration at dest host.
@@ -1979,40 +1978,21 @@ class ComputeManager(manager.SchedulerDependentManager):
if not block_device_info['block_device_mapping']:
LOG.info(_('Instance has no volume.'), instance=instance_ref)
self.driver.pre_live_migration(block_device_info)
# NOTE(tr3buchet): setup networks on destination host
self.network_api.setup_networks_on_host(context, instance_ref,
self.host)
# Bridge settings.
# Call this method prior to ensure_filtering_rules_for_instance,
# since ensure_filtering_rules_for_instance fails when the bridge
# is not yet set up.
#
# Retries are necessary because requests arrive continuously and
# concurrent requests to iptables make it complain.
network_info = self._get_instance_nw_info(context, instance_ref)
# TODO(tr3buchet): figure out how on the earth this is necessary
fixed_ips = network_info.fixed_ips()
if not fixed_ips:
raise exception.FixedIpNotFoundForInstance(instance_id=instance_id)
raise exception.FixedIpNotFoundForInstance(
instance_id=instance_id)
max_retry = FLAGS.live_migration_retry_count
for cnt in range(max_retry):
try:
self.driver.plug_vifs(instance_ref,
self._legacy_nw_info(network_info))
break
except exception.ProcessExecutionError:
if cnt == max_retry - 1:
raise
else:
LOG.warn(_("plug_vifs() failed %(cnt)d."
"Retry up to %(max_retry)d for %(hostname)s.")
% locals(), instance=instance_ref)
time.sleep(1)
self.driver.pre_live_migration(context, instance_ref,
block_device_info,
self._legacy_nw_info(network_info))
# NOTE(tr3buchet): setup networks on destination host
self.network_api.setup_networks_on_host(context, instance_ref,
self.host)
# Creating filters to hypervisors and firewalls.
# An example is that nova-instance-instance-xxx,
@@ -2083,12 +2063,11 @@ class ComputeManager(manager.SchedulerDependentManager):
and mainly updating database record.
:param ctxt: security context
:param instance_id: nova.db.sqlalchemy.models.Instance.Id
:param instance_ref: nova.db.sqlalchemy.models.Instance
:param dest: destination host
:param block_migration: if true, prepare for block migration
"""
LOG.info(_('post_live_migration() is started..'),
instance=instance_ref)
@@ -2161,8 +2140,8 @@ class ComputeManager(manager.SchedulerDependentManager):
"This error can be safely ignored."),
instance=instance_ref)
def post_live_migration_at_destination(self, context,
instance_id, block_migration=False):
def post_live_migration_at_destination(self, context, instance_id,
block_migration=False):
"""Post operations for live migration .
:param context: security context
@@ -2204,7 +2183,7 @@ class ComputeManager(manager.SchedulerDependentManager):
"""Recovers Instance/volume state from migrating -> running.
:param context: security context
:param instance_id: nova.db.sqlalchemy.models.Instance.Id
:param instance_ref: nova.db.sqlalchemy.models.Instance
:param dest:
This method is called from live migration src host.
This param specifies destination host.


@@ -21,6 +21,7 @@ Client side of the compute RPC API.
from nova import exception
from nova import flags
from nova.openstack.common import rpc
from nova.openstack.common.rpc import common as rpc_common
import nova.openstack.common.rpc.proxy
@@ -55,6 +56,7 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
1.0 - Initial version.
1.1 - Adds get_host_uptime()
1.2 - Adds check_can_live_migrate_[destination|source]
'''
BASE_RPC_API_VERSION = '1.0'
@@ -88,19 +90,33 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
mountpoint=mountpoint),
topic=_compute_topic(self.topic, ctxt, None, instance))
def check_can_live_migrate_destination(self, ctxt, instance, destination,
block_migration, disk_over_commit):
self.call(ctxt, self.make_msg('check_can_live_migrate_destination',
instance_id=instance['id'],
block_migration=block_migration,
disk_over_commit=disk_over_commit),
topic=_compute_topic(self.topic, ctxt, destination, None),
version='1.2')
def check_can_live_migrate_source(self, ctxt, instance, dest_check_data):
self.call(ctxt, self.make_msg('check_can_live_migrate_source',
instance_id=instance['id'],
dest_check_data=dest_check_data),
topic=_compute_topic(self.topic, ctxt, None, instance),
version='1.2')
def check_shared_storage_test_file(self, ctxt, filename, host):
return self.call(ctxt, self.make_msg('check_shared_storage_test_file',
filename=filename),
topic=_compute_topic(self.topic, ctxt, host, None))
raise rpc_common.RPCException(message=_('Deprecated from version 1.2'))
def cleanup_shared_storage_test_file(self, ctxt, filename, host):
self.cast(ctxt, self.make_msg('cleanup_shared_storage_test_file',
filename=filename),
topic=_compute_topic(self.topic, ctxt, host, None))
raise rpc_common.RPCException(message=_('Deprecated from version 1.2'))
def compare_cpu(self, ctxt, cpu_info, host):
return self.call(ctxt, self.make_msg('compare_cpu', cpu_info=cpu_info),
topic=_compute_topic(self.topic, ctxt, host, None))
raise rpc_common.RPCException(message=_('Deprecated from version 1.2'))
def create_shared_storage_test_file(self, ctxt, host):
raise rpc_common.RPCException(message=_('Deprecated from version 1.2'))
def confirm_resize(self, ctxt, instance, migration_id, host,
cast=True):
@@ -109,11 +125,6 @@ class ComputeAPI(nova.openstack.common.rpc.proxy.RpcProxy):
instance_uuid=instance['uuid'], migration_id=migration_id),
topic=_compute_topic(self.topic, ctxt, host, instance))
def create_shared_storage_test_file(self, ctxt, host):
return self.call(ctxt,
self.make_msg('create_shared_storage_test_file'),
topic=_compute_topic(self.topic, ctxt, host, None))
def detach_volume(self, ctxt, instance, volume_id):
self.cast(ctxt, self.make_msg('detach_volume',
instance_uuid=instance['uuid'], volume_id=volume_id),
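
On the client side, the old compare_cpu and shared-storage test-file methods are kept only as stubs that raise, and callers move to the new version-1.2 calls. A small usage sketch (ctxt, instance, and dest are illustrative placeholders, not values from this change):

from nova.compute import rpcapi as compute_rpcapi

rpcapi = compute_rpcapi.ComputeAPI()
# A single versioned call to the destination host replaces the old
# create/check/cleanup shared-storage round trips and compare_cpu.
rpcapi.check_can_live_migrate_destination(ctxt, instance, dest,
                                          block_migration=False,
                                          disk_over_commit=False)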

View File

@@ -212,36 +212,30 @@ class Scheduler(object):
The host where the instance is currently running.
The scheduler then sends the request to that host.
"""
# Whether instance exists and is running.
# Check we can do live migration
instance_ref = db.instance_get(context, instance_id)
# Checking instance.
self._live_migration_src_check(context, instance_ref)
self._live_migration_dest_check(context, instance_ref, dest)
self._live_migration_common_check(context, instance_ref, dest)
self.compute_rpcapi.check_can_live_migrate_destination(context,
instance_ref, dest, block_migration, disk_over_commit)
# Checking destination host.
self._live_migration_dest_check(context, instance_ref,
dest, block_migration,
disk_over_commit)
# Common checking.
self._live_migration_common_check(context, instance_ref,
dest, block_migration,
disk_over_commit)
# Changing instance_state.
# Change instance_state
values = {"task_state": task_states.MIGRATING}
# update instance state and notify
(old_ref, new_instance_ref) = db.instance_update_and_get_original(
context, instance_id, values)
context, instance_ref['uuid'], values)
notifications.send_update(context, old_ref, new_instance_ref,
service="scheduler")
# Perform migration
src = instance_ref['host']
cast_to_compute_host(context, src, 'live_migration',
update_db=False,
instance_id=instance_id,
dest=dest,
block_migration=block_migration)
update_db=False,
instance_id=instance_id,
dest=dest,
block_migration=block_migration)
def _live_migration_src_check(self, context, instance_ref):
"""Live migration check routine (for src host).
@@ -250,7 +244,7 @@ class Scheduler(object):
:param instance_ref: nova.db.sqlalchemy.models.Instance object
"""
# TODO(johngar) why is this not in the API layer?
# Checking instance is running.
if instance_ref['power_state'] != power_state.RUNNING:
raise exception.InstanceNotRunning(
@@ -258,22 +252,21 @@ class Scheduler(object):
# Check that the src host exists and is a compute node
src = instance_ref['host']
services = db.service_get_all_compute_by_host(context, src)
try:
services = db.service_get_all_compute_by_host(context, src)
except exception.NotFound:
raise exception.ComputeServiceUnavailable(host=src)
# Checking src host is alive.
if not utils.service_is_up(services[0]):
raise exception.ComputeServiceUnavailable(host=src)
def _live_migration_dest_check(self, context, instance_ref, dest,
block_migration, disk_over_commit):
def _live_migration_dest_check(self, context, instance_ref, dest):
"""Live migration check routine (for destination host).
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
:param block_migration: if true, block_migration.
:param disk_over_commit: if True, consider real(not virtual)
disk size.
"""
# Check that the dest host exists and is a compute node.
@@ -291,15 +284,11 @@ class Scheduler(object):
raise exception.UnableToMigrateToSelf(
instance_id=instance_ref['uuid'], host=dest)
# Checking dst host still has enough capacities.
self.assert_compute_node_has_enough_resources(context,
instance_ref,
dest,
block_migration,
disk_over_commit)
# Check memory requirements
self._assert_compute_node_has_enough_memory(context,
instance_ref, dest)
def _live_migration_common_check(self, context, instance_ref, dest,
block_migration, disk_over_commit):
def _live_migration_common_check(self, context, instance_ref, dest):
"""Live migration common check routine.
Below checkings are followed by
@@ -308,38 +297,10 @@ class Scheduler(object):
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
:param block_migration: if true, block_migration.
:param disk_over_commit: if True, consider real(not virtual)
disk size.
"""
# Checking shared storage connectivity
# if block migration, instances_paths should not be on shared storage.
shared = self.mounted_on_same_shared_storage(context, instance_ref,
dest)
if block_migration:
if shared:
reason = _("Block migration can not be used "
"with shared storage.")
raise exception.InvalidSharedStorage(reason=reason, path=dest)
elif not shared:
reason = _("Live migration can not be used "
"without shared storage.")
raise exception.InvalidSharedStorage(reason=reason, path=dest)
# Checking destination host exists.
dservice_refs = db.service_get_all_compute_by_host(context, dest)
dservice_ref = dservice_refs[0]['compute_node'][0]
# Check that the original host (where the instance was launched) exists.
try:
oservice_refs = db.service_get_all_compute_by_host(context,
instance_ref['host'])
except exception.NotFound:
raise exception.SourceHostUnavailable()
oservice_ref = oservice_refs[0]['compute_node'][0]
dservice_ref = self._get_compute_info(context, dest)
src = instance_ref['host']
oservice_ref = self._get_compute_info(context, src)
# Check that the hypervisor type is the same.
orig_hypervisor = oservice_ref['hypervisor_type']
@@ -353,40 +314,7 @@ class Scheduler(object):
if orig_hypervisor > dest_hypervisor:
raise exception.DestinationHypervisorTooOld()
# Checking cpuinfo.
try:
self.compute_rpcapi.compare_cpu(context, oservice_ref['cpu_info'],
dest)
except exception.InvalidCPUInfo:
src = instance_ref['host']
LOG.exception(_("host %(dest)s is not compatible with "
"original host %(src)s.") % locals())
raise
def assert_compute_node_has_enough_resources(self, context, instance_ref,
dest, block_migration,
disk_over_commit):
"""Checks if destination host has enough resource for live migration.
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
:param block_migration: if true, block_migration.
:param disk_over_commit: if True, consider real(not virtual)
disk size.
"""
self.assert_compute_node_has_enough_memory(context,
instance_ref, dest)
if not block_migration:
return
self.assert_compute_node_has_enough_disk(context,
instance_ref, dest,
disk_over_commit)
def assert_compute_node_has_enough_memory(self, context,
def _assert_compute_node_has_enough_memory(self, context,
instance_ref, dest):
"""Checks if destination host has enough memory for live migration.
@@ -397,7 +325,7 @@ class Scheduler(object):
"""
# Getting total available memory of host
avail = self._get_compute_info(context, dest, 'memory_mb')
avail = self._get_compute_info(context, dest)['memory_mb']
# Getting total used memory and disk of host
# It should be sum of memories that are assigned as max value,
@@ -414,54 +342,7 @@ class Scheduler(object):
"instance:%(mem_inst)s)")
raise exception.MigrationError(reason=reason % locals())
def assert_compute_node_has_enough_disk(self, context, instance_ref, dest,
disk_over_commit):
"""Checks if destination host has enough disk for block migration.
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
:param disk_over_commit: if True, consider real(not virtual)
disk size.
"""
# Libvirt supports the qcow2 disk format, which is usually compressed
# on compute nodes.
# The real (compressed) disk image may grow up to the "virtual disk
# size", which is specified as the maximum disk size.
# (See qemu-img info <path-to-disk>.)
# The scheduler decides the destination host still has enough disk
# space if real disk size < available disk size when disk_over_commit
# is True, and otherwise if virtual disk size < available disk size.
# Getting total available disk of host
available_gb = self._get_compute_info(context,
dest, 'disk_available_least')
available = available_gb * (1024 ** 3)
# Getting necessary disk size
ret = self.compute_rpcapi.get_instance_disk_info(context, instance_ref)
disk_infos = jsonutils.loads(ret)
necessary = 0
if disk_over_commit:
for info in disk_infos:
necessary += int(info['disk_size'])
else:
for info in disk_infos:
necessary += int(info['virt_disk_size'])
# Check that available disk > necessary disk
if (available - necessary) < 0:
instance_uuid = instance_ref['uuid']
reason = _("Unable to migrate %(instance_uuid)s to %(dest)s: "
"Lack of disk(host:%(available)s "
"<= instance:%(necessary)s)")
raise exception.MigrationError(reason=reason % locals())
def _get_compute_info(self, context, host, key):
def _get_compute_info(self, context, host):
"""get compute node's information specified by key
:param context: security context
@@ -471,33 +352,4 @@ class Scheduler(object):
"""
compute_node_ref = db.service_get_all_compute_by_host(context, host)
compute_node_ref = compute_node_ref[0]['compute_node'][0]
return compute_node_ref[key]
def mounted_on_same_shared_storage(self, context, instance_ref, dest):
"""Check if the src and dest host mount same shared storage.
At first, dest host creates temp file, and src host can see
it if they mounts same shared storage. Then src host erase it.
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
"""
src = instance_ref['host']
filename = self.compute_rpcapi.create_shared_storage_test_file(context,
dest)
try:
# make sure the file exists at the src host.
ret = self.compute_rpcapi.check_shared_storage_test_file(context,
filename, src)
finally:
self.compute_rpcapi.cleanup_shared_storage_test_file(context,
filename, dest)
return ret
return compute_node_ref[0]['compute_node'][0]
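
With the disk and shared-storage checks moved into the virt driver, the only capacity check left in the scheduler is the memory check, which now pulls 'memory_mb' out of the whole compute-node record returned by _get_compute_info. A rough, self-contained sketch of that arithmetic follows (the 1024 MB instance size and the RuntimeError are illustrative assumptions; Nova raises exception.MigrationError, and its exact comparison may differ slightly):

def assert_enough_memory(avail_mb, instance_mbs, required_mb):
    """Illustrative memory check; not the verbatim Nova implementation."""
    # Memory already assigned to instances running on the destination.
    used_mb = sum(instance_mbs)
    if avail_mb - used_mb < required_mb:
        raise RuntimeError(
            "lack of memory (host free: %d MB < instance: %d MB)"
            % (avail_mb - used_mb, required_mb))

# Example values, assuming a 1024 MB instance:
assert_enough_memory(2048, [256, 512], 1024)   # enough memory, no error
assert_enough_memory(2048, [1024, 512], 1024)  # raises RuntimeError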


@@ -1380,19 +1380,114 @@ class ComputeTestCase(BaseTestCase):
self.assertEqual(inst_ref['vm_state'], vm_states.ERROR)
self.compute.terminate_instance(context, inst_ref['uuid'])
def test_check_can_live_migrate_source_works_correctly(self):
"""Confirm check_can_live_migrate_source works on positive path"""
context = self.context.elevated()
inst_ref = self._create_fake_instance({'host': 'fake_host_2'})
inst_id = inst_ref["id"]
dest = "fake_host_1"
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.compute.driver,
'check_can_live_migrate_source')
dest_check_data = {"test": "data"}
db.instance_get(context, inst_id).AndReturn(inst_ref)
self.compute.driver.check_can_live_migrate_source(context,
inst_ref,
dest_check_data)
self.mox.ReplayAll()
self.compute.check_can_live_migrate_source(context, inst_id,
dest_check_data)
def test_check_can_live_migrate_destination_works_correctly(self):
"""Confirm check_can_live_migrate_destination works on positive path"""
context = self.context.elevated()
inst_ref = self._create_fake_instance({'host': 'fake_host_2'})
inst_id = inst_ref["id"]
dest = "fake_host_1"
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.compute.driver,
'check_can_live_migrate_destination')
self.mox.StubOutWithMock(self.compute.compute_rpcapi,
'check_can_live_migrate_source')
self.mox.StubOutWithMock(self.compute.driver,
'check_can_live_migrate_destination_cleanup')
db.instance_get(context, inst_id).AndReturn(inst_ref)
dest_check_data = {"test": "data"}
self.compute.driver.check_can_live_migrate_destination(context,
inst_ref, True, False).AndReturn(dest_check_data)
self.compute.compute_rpcapi.check_can_live_migrate_source(context,
inst_ref, dest_check_data)
self.compute.driver.check_can_live_migrate_destination_cleanup(
context, dest_check_data)
self.mox.ReplayAll()
self.compute.check_can_live_migrate_destination(context, inst_id,
True, False)
def test_check_can_live_migrate_destination_fails_dest_check(self):
"""Confirm check_can_live_migrate_destination works on positive path"""
context = self.context.elevated()
inst_ref = self._create_fake_instance({'host': 'fake_host_2'})
inst_id = inst_ref["id"]
dest = "fake_host_1"
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.compute.driver,
'check_can_live_migrate_destination')
db.instance_get(context, inst_id).AndReturn(inst_ref)
self.compute.driver.check_can_live_migrate_destination(context,
inst_ref, True, False).AndRaise(exception.Invalid())
self.mox.ReplayAll()
self.assertRaises(exception.Invalid,
self.compute.check_can_live_migrate_destination,
context, inst_id, True, False)
def test_check_can_live_migrate_destination_fails_source(self):
"""Confirm check_can_live_migrate_destination works on positive path"""
context = self.context.elevated()
inst_ref = self._create_fake_instance({'host': 'fake_host_2'})
inst_id = inst_ref["id"]
dest = "fake_host_1"
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.compute.driver,
'check_can_live_migrate_destination')
self.mox.StubOutWithMock(self.compute.compute_rpcapi,
'check_can_live_migrate_source')
self.mox.StubOutWithMock(self.compute.driver,
'check_can_live_migrate_destination_cleanup')
db.instance_get(context, inst_id).AndReturn(inst_ref)
dest_check_data = {"test": "data"}
self.compute.driver.check_can_live_migrate_destination(context,
inst_ref, True, False).AndReturn(dest_check_data)
self.compute.compute_rpcapi.check_can_live_migrate_source(context,
inst_ref, dest_check_data).AndRaise(exception.Invalid())
self.compute.driver.check_can_live_migrate_destination_cleanup(
context, dest_check_data)
self.mox.ReplayAll()
self.assertRaises(exception.Invalid,
self.compute.check_can_live_migrate_destination,
context, inst_id, True, False)
def test_pre_live_migration_instance_has_no_fixed_ip(self):
"""Confirm raising exception if instance doesn't have fixed_ip."""
# creating instance testdata
inst_ref = self._create_fake_instance({'host': 'dummy'})
c = context.get_admin_context()
context = self.context.elevated()
inst_ref = self._create_fake_instance()
inst_id = inst_ref["id"]
# start test
self.stubs.Set(time, 'sleep', lambda t: None)
self.mox.ReplayAll()
self.assertRaises(exception.FixedIpNotFoundForInstance,
self.compute.pre_live_migration,
c, inst_ref['id'])
# cleanup
db.instance_destroy(c, inst_ref['uuid'])
self.compute.pre_live_migration, context, inst_id)
def test_pre_live_migration_works_correctly(self):
"""Confirm setup_compute_volume is called when volume is mounted."""
@@ -1401,16 +1496,18 @@ class ComputeTestCase(BaseTestCase):
spectacular=True)
self.stubs.Set(nova.compute.manager.ComputeManager,
'_get_instance_nw_info', stupid)
# creating instance testdata
inst_ref = self._create_fake_instance({'host': 'dummy'})
inst_id = inst_ref['id']
c = context.get_admin_context()
nw_info = fake_network.fake_get_instance_nw_info(self.stubs)
# creating mocks
self.mox.StubOutWithMock(self.compute.driver, 'pre_live_migration')
self.compute.driver.pre_live_migration({'block_device_mapping': []})
nw_info = fake_network.fake_get_instance_nw_info(self.stubs)
self.mox.StubOutWithMock(self.compute.driver, 'plug_vifs')
self.compute.driver.plug_vifs(mox.IsA(inst_ref), nw_info)
self.compute.driver.pre_live_migration(mox.IsA(c), mox.IsA(inst_ref),
{'block_device_mapping': []},
mox.IgnoreArg())
self.mox.StubOutWithMock(self.compute.driver,
'ensure_filtering_rules_for_instance')
self.compute.driver.ensure_filtering_rules_for_instance(
@@ -1418,7 +1515,7 @@ class ComputeTestCase(BaseTestCase):
# start test
self.mox.ReplayAll()
ret = self.compute.pre_live_migration(c, inst_ref['id'])
ret = self.compute.pre_live_migration(c, inst_id)
self.assertEqual(ret, None)
# cleanup
@@ -1428,13 +1525,15 @@ class ComputeTestCase(BaseTestCase):
"""Confirm exception when pre_live_migration fails."""
# creating instance testdata
inst_ref = self._create_fake_instance({'host': 'dummy'})
inst_uuid = inst_ref['uuid']
inst_id = inst_ref['id']
c = context.get_admin_context()
topic = rpc.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
# creating volume testdata
volume_id = db.volume_create(c, {'size': 1})['id']
values = {'instance_uuid': inst_ref['uuid'], 'device_name': '/dev/vdc',
values = {'instance_uuid': inst_uuid, 'device_name': '/dev/vdc',
'delete_on_termination': False, 'volume_id': volume_id}
db.block_device_mapping_create(c, values)
@@ -1442,60 +1541,60 @@ class ComputeTestCase(BaseTestCase):
self.mox.StubOutWithMock(rpc, 'call')
rpc.call(c, FLAGS.volume_topic,
{"method": "check_for_export",
"args": {'instance_id': inst_ref['id']}})
"args": {'instance_id': inst_id}})
self.mox.StubOutWithMock(self.compute.driver, 'get_instance_disk_info')
self.mox.StubOutWithMock(self.compute.driver,
'get_instance_disk_info')
self.compute.driver.get_instance_disk_info(inst_ref.name)
rpc.call(c, topic,
{"method": "pre_live_migration",
"args": {'instance_id': inst_ref['id'],
'block_migration': True,
'disk': None},
"version": compute_rpcapi.ComputeAPI.BASE_RPC_API_VERSION
}, None).AndRaise(rpc.common.RemoteError('', '', ''))
self.mox.StubOutWithMock(self.compute.compute_rpcapi,
'pre_live_migration')
self.compute.compute_rpcapi.pre_live_migration(c, mox.IsA(inst_ref),
True, None, inst_ref['host']).AndRaise(
rpc.common.RemoteError('', '', ''))
# mocks for rollback
rpc.call(c, 'network', {'method': 'setup_networks_on_host',
'args': {'instance_id': inst_ref['id'],
'args': {'instance_id': inst_id,
'host': self.compute.host,
'teardown': False}})
rpc.call(c, topic,
{"method": "remove_volume_connection",
"args": {'instance_id': inst_ref['id'],
"args": {'instance_id': inst_id,
'volume_id': volume_id},
"version": compute_rpcapi.ComputeAPI.BASE_RPC_API_VERSION},
None)
rpc.cast(c, topic, {"method": "rollback_live_migration_at_destination",
"args": {'instance_id': inst_ref['id']}})
"version": "1.0"}, None)
rpc.cast(c, topic,
{"method": "rollback_live_migration_at_destination",
"args": {'instance_id': inst_id},
"version": "1.0"})
# start test
self.mox.ReplayAll()
self.assertRaises(rpc_common.RemoteError,
self.compute.live_migration,
c, inst_ref['id'], inst_ref['host'], True)
c, inst_id, inst_ref['host'], True)
# cleanup
for bdms in db.block_device_mapping_get_all_by_instance(
c, inst_ref['uuid']):
c, inst_uuid):
db.block_device_mapping_destroy(c, bdms['id'])
db.volume_destroy(c, volume_id)
db.instance_destroy(c, inst_ref['uuid'])
db.instance_destroy(c, inst_uuid)
def test_live_migration_works_correctly(self):
"""Confirm live_migration() works as expected correctly."""
# creating instance testdata
instance = self._create_fake_instance({'host': 'dummy'})
instance_id = instance['id']
c = context.get_admin_context()
inst_ref = db.instance_get(c, instance_id)
topic = rpc.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
inst_ref = self._create_fake_instance({'host': 'dummy'})
inst_uuid = inst_ref['uuid']
inst_id = inst_ref['id']
# create
self.mox.StubOutWithMock(rpc, 'call')
topic = rpc.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
rpc.call(c, topic,
{"method": "pre_live_migration",
"args": {'instance_id': instance_id,
"args": {'instance_id': inst_id,
'block_migration': False,
'disk': None},
"version": compute_rpcapi.ComputeAPI.BASE_RPC_API_VERSION},
@@ -1503,11 +1602,11 @@ class ComputeTestCase(BaseTestCase):
# start test
self.mox.ReplayAll()
ret = self.compute.live_migration(c, inst_ref['id'], inst_ref['host'])
ret = self.compute.live_migration(c, inst_id, inst_ref['host'])
self.assertEqual(ret, None)
# cleanup
db.instance_destroy(c, instance['uuid'])
db.instance_destroy(c, inst_uuid)
def test_post_live_migration_working_correctly(self):
"""Confirm post_live_migration() works as expected correctly."""
@@ -1516,40 +1615,40 @@ class ComputeTestCase(BaseTestCase):
# creating testdata
c = context.get_admin_context()
instance = self._create_fake_instance({
'state_description': 'migrating',
'state': power_state.PAUSED})
instance_id = instance['id']
i_ref = db.instance_get(c, instance_id)
db.instance_update(c, i_ref['uuid'],
inst_ref = self._create_fake_instance({
'state_description': 'migrating',
'state': power_state.PAUSED})
inst_uuid = inst_ref['uuid']
inst_id = inst_ref['id']
db.instance_update(c, inst_uuid,
{'task_state': task_states.MIGRATING,
'power_state': power_state.PAUSED})
v_ref = db.volume_create(c, {'size': 1, 'instance_id': instance_id})
v_ref = db.volume_create(c, {'size': 1, 'instance_id': inst_id})
fix_addr = db.fixed_ip_create(c, {'address': '1.1.1.1',
'instance_id': instance_id})
'instance_id': inst_id})
fix_ref = db.fixed_ip_get_by_address(c, fix_addr)
db.floating_ip_create(c, {'address': flo_addr,
'fixed_ip_id': fix_ref['id']})
# creating mocks
self.mox.StubOutWithMock(self.compute.driver, 'unfilter_instance')
self.compute.driver.unfilter_instance(i_ref, [])
self.compute.driver.unfilter_instance(inst_ref, [])
self.mox.StubOutWithMock(rpc, 'call')
rpc.call(c, rpc.queue_get_for(c, FLAGS.compute_topic, dest),
{"method": "post_live_migration_at_destination",
"args": {'instance_id': i_ref['id'], 'block_migration': False},
"version": compute_rpcapi.ComputeAPI.BASE_RPC_API_VERSION},
None)
"args": {'instance_id': inst_id, 'block_migration': False},
"version": "1.0"}, None)
self.mox.StubOutWithMock(self.compute.driver, 'unplug_vifs')
self.compute.driver.unplug_vifs(i_ref, [])
self.compute.driver.unplug_vifs(inst_ref, [])
rpc.call(c, 'network', {'method': 'setup_networks_on_host',
'args': {'instance_id': instance_id,
'args': {'instance_id': inst_id,
'host': self.compute.host,
'teardown': True}})
# start test
self.mox.ReplayAll()
self.compute.post_live_migration(c, i_ref, dest)
self.compute.post_live_migration(c, inst_ref, dest)
# make sure floating ips are rewritten to destination hostname.
flo_refs = db.floating_ip_get_all_by_host(c, dest)
@@ -1557,7 +1656,7 @@ class ComputeTestCase(BaseTestCase):
self.assertEqual(flo_refs[0]['address'], flo_addr)
# cleanup
db.instance_destroy(c, instance['uuid'])
db.instance_destroy(c, inst_uuid)
db.volume_destroy(c, v_ref['id'])
db.floating_ip_destroy(c, flo_addr)


@@ -60,12 +60,16 @@ class ComputeRpcAPITestCase(test.TestCase):
expected_msg['args']['host'] = host_param
elif 'host' in expected_msg['args']:
del expected_msg['args']['host']
if 'destination' in expected_msg['args']:
del expected_msg['args']['destination']
if 'instance' in expected_msg['args']:
instance = expected_msg['args']['instance']
del expected_msg['args']['instance']
if method in ['rollback_live_migration_at_destination',
'pre_live_migration', 'remove_volume_connection',
'post_live_migration_at_destination']:
'post_live_migration_at_destination',
'check_can_live_migrate_destination',
'check_can_live_migrate_source']:
expected_msg['args']['instance_id'] = instance['id']
elif method == 'get_instance_disk_info':
expected_msg['args']['instance_name'] = instance['name']
@@ -78,6 +82,8 @@ class ComputeRpcAPITestCase(test.TestCase):
kwargs['cast'] = False
if 'host' in kwargs:
host = kwargs['host']
elif 'destination' in kwargs:
host = kwargs['destination']
else:
host = kwargs['instance']['host']
expected_topic = '%s.%s' % (FLAGS.compute_topic, host)
@@ -112,17 +118,15 @@ class ComputeRpcAPITestCase(test.TestCase):
self._test_compute_api('attach_volume', 'cast',
instance=self.fake_instance, volume_id='id', mountpoint='mp')
def test_check_shared_storage_test_file(self):
self._test_compute_api('check_shared_storage_test_file', 'call',
filename='fn', host='host')
def test_check_can_live_migrate_destination(self):
self._test_compute_api('check_can_live_migrate_destination', 'call',
version='1.2', instance=self.fake_instance, destination='dest',
block_migration=True, disk_over_commit=True)
def test_cleanup_shared_storage_test_file(self):
self._test_compute_api('cleanup_shared_storage_test_file', 'cast',
filename='fn', host='host')
def test_compare_cpu(self):
self._test_compute_api('compare_cpu', 'call', cpu_info='info',
host='host')
def test_check_can_live_migrate_source(self):
self._test_compute_api('check_can_live_migrate_source', 'call',
version='1.2', instance=self.fake_instance,
dest_check_data={"test": "data"})
def test_confirm_resize_cast(self):
self._test_compute_api('confirm_resize', 'cast',
@@ -132,10 +136,6 @@ class ComputeRpcAPITestCase(test.TestCase):
self._test_compute_api('confirm_resize', 'call',
instance=self.fake_instance, migration_id='id', host='host')
def test_create_shared_storage_test_file(self):
self._test_compute_api('create_shared_storage_test_file', 'call',
host='host')
def test_detach_volume(self):
self._test_compute_api('detach_volume', 'cast',
instance=self.fake_instance, volume_id='id')


@@ -28,6 +28,7 @@ from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import notifications
from nova.openstack.common import jsonutils
from nova.openstack.common import rpc
from nova.openstack.common.rpc import common as rpc_common
@@ -440,23 +441,32 @@ class SchedulerTestCase(test.TestCase):
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_common_check')
self.mox.StubOutWithMock(self.driver.compute_rpcapi,
'check_can_live_migrate_destination')
self.mox.StubOutWithMock(db, 'instance_update_and_get_original')
self.mox.StubOutWithMock(driver, 'cast_to_compute_host')
self.mox.StubOutWithMock(notifications, 'send_update')
dest = 'fake_host2'
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
instance_id = instance['id']
instance_uuid = instance['uuid']
db.instance_get(self.context,
instance_id).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
self.driver._live_migration_dest_check(self.context, instance, dest)
self.driver._live_migration_common_check(self.context, instance,
dest, block_migration, disk_over_commit)
db.instance_update_and_get_original(self.context, instance['id'],
dest)
self.driver.compute_rpcapi.check_can_live_migrate_destination(
self.context, instance, dest, block_migration, disk_over_commit)
db.instance_update_and_get_original(self.context, instance_uuid,
{"task_state": task_states.MIGRATING}).AndReturn(
(instance, instance))
notifications.send_update(self.context, instance, instance,
service="scheduler")
driver.cast_to_compute_host(self.context, instance['host'],
'live_migration', update_db=False,
@@ -469,38 +479,13 @@ class SchedulerTestCase(test.TestCase):
block_migration=block_migration,
disk_over_commit=disk_over_commit)
def _check_shared_storage(self, dest, instance, check_result):
tmp_filename = 'test-filename'
rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
rpc.call(self.context, 'dest_queue',
{'method': 'create_shared_storage_test_file',
'args': {},
'version': compute_rpcapi.ComputeAPI.BASE_RPC_API_VERSION},
None).AndReturn(tmp_filename)
rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
rpc.call(self.context, 'src_queue',
{'method': 'check_shared_storage_test_file',
'args': {'filename': tmp_filename},
'version': compute_rpcapi.ComputeAPI.BASE_RPC_API_VERSION},
None).AndReturn(check_result)
rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
rpc.cast(self.context, 'dest_queue',
{'method': 'cleanup_shared_storage_test_file',
'args': {'filename': tmp_filename},
'version': compute_rpcapi.ComputeAPI.BASE_RPC_API_VERSION})
def test_live_migration_all_checks_pass(self):
"""Test live migration when all checks pass."""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(utils, 'service_is_up')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
self.mox.StubOutWithMock(self.driver, '_get_compute_info')
self.mox.StubOutWithMock(db, 'instance_get_all_by_host')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'instance_update_and_get_original')
@@ -510,8 +495,12 @@ class SchedulerTestCase(test.TestCase):
block_migration = True
disk_over_commit = True
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
instance_id = instance['id']
instance_uuid = instance['uuid']
db.instance_get(self.context,
instance_id).AndReturn(instance)
# Source checks
db.service_get_all_compute_by_host(self.context,
instance['host']).AndReturn(['fake_service2'])
utils.service_is_up('fake_service2').AndReturn(True)
@@ -521,60 +510,42 @@ class SchedulerTestCase(test.TestCase):
dest).AndReturn(['fake_service3'])
utils.service_is_up('fake_service3').AndReturn(True)
# assert_compute_node_has_enough_memory()
self.driver._get_compute_info(self.context, dest,
'memory_mb').AndReturn(2048)
db.service_get_all_compute_by_host(self.context, dest).AndReturn(
[{'compute_node': [{'memory_mb': 2048,
'hypervisor_version': 1}]}])
db.instance_get_all_by_host(self.context, dest).AndReturn(
[dict(memory_mb=256), dict(memory_mb=512)])
# assert_compute_node_has_enough_disk()
self.driver._get_compute_info(self.context, dest,
'disk_available_least').AndReturn(1025)
rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue1')
instance_disk_info_msg = {
'method': 'get_instance_disk_info',
'args': {
'instance_name': instance['name'],
},
'version': compute_rpcapi.ComputeAPI.BASE_RPC_API_VERSION,
}
instance_disk_info = [{'disk_size': 1024 * (1024 ** 3)}]
rpc.call(self.context,
'src_queue1',
instance_disk_info_msg,
None).AndReturn(jsonutils.dumps(instance_disk_info))
# Common checks (shared storage ok, same hypervisor, etc)
self._check_shared_storage(dest, instance, False)
# Common checks (same hypervisor, etc)
db.service_get_all_compute_by_host(self.context, dest).AndReturn(
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1}]}])
# newer hypervisor version for src
db.service_get_all_compute_by_host(self.context,
instance['host']).AndReturn(
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1,
'cpu_info': 'fake_cpu_info'}]}])
rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
rpc.call(self.context, 'dest_queue',
{'method': 'compare_cpu',
'args': {'cpu_info': 'fake_cpu_info'},
'version': compute_rpcapi.ComputeAPI.BASE_RPC_API_VERSION},
None).AndReturn(True)
db.instance_update_and_get_original(self.context, instance['id'],
rpc.call(self.context, "compute.fake_host2",
{"method": 'check_can_live_migrate_destination',
"args": {'instance_id': instance_id,
'block_migration': block_migration,
'disk_over_commit': disk_over_commit},
"version": "1.2"},
None)
db.instance_update_and_get_original(self.context, instance_uuid,
{"task_state": task_states.MIGRATING}).AndReturn(
(instance, instance))
driver.cast_to_compute_host(self.context, instance['host'],
'live_migration', update_db=False,
instance_id=instance['id'], dest=dest,
instance_id=instance_id, dest=dest,
block_migration=block_migration)
self.mox.ReplayAll()
result = self.driver.schedule_live_migration(self.context,
instance_id=instance['id'], dest=dest,
instance_id=instance_id, dest=dest,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
self.assertEqual(result, None)
@@ -587,17 +558,44 @@ class SchedulerTestCase(test.TestCase):
dest = 'fake_host2'
block_migration = False
instance = self._live_migration_instance()
instance_id = instance['id']
instance['power_state'] = power_state.NOSTATE
db.instance_get(self.context, instance['id']).AndReturn(instance)
db.instance_get(self.context,
instance_id).AndReturn(instance)
self.mox.ReplayAll()
self.assertRaises(exception.InstanceNotRunning,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
instance_id=instance_id, dest=dest,
block_migration=block_migration)
def test_live_migration_compute_src_not_exist(self):
"""Raise exception when src compute node is does not exist."""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(utils, 'service_is_up')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
dest = 'fake_host2'
block_migration = False
instance = self._live_migration_instance()
instance_id = instance['id']
db.instance_get(self.context,
instance_id).AndReturn(instance)
# Compute down
db.service_get_all_compute_by_host(self.context,
instance['host']).AndRaise(
exception.NotFound())
self.mox.ReplayAll()
self.assertRaises(exception.ComputeServiceUnavailable,
self.driver.schedule_live_migration, self.context,
instance_id=instance_id, dest=dest,
block_migration=block_migration)
def test_live_migration_compute_src_not_alive(self):
"""Raise exception when src compute node is not alive."""
@@ -608,7 +606,9 @@ class SchedulerTestCase(test.TestCase):
dest = 'fake_host2'
block_migration = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
instance_id = instance['id']
db.instance_get(self.context,
instance_id).AndReturn(instance)
# Compute down
db.service_get_all_compute_by_host(self.context,
@@ -618,7 +618,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.ComputeServiceUnavailable,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
instance_id=instance_id, dest=dest,
block_migration=block_migration)
def test_live_migration_compute_dest_not_alive(self):
@@ -632,7 +632,9 @@ class SchedulerTestCase(test.TestCase):
dest = 'fake_host2'
block_migration = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
instance_id = instance['id']
db.instance_get(self.context,
instance_id).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
db.service_get_all_compute_by_host(self.context,
@@ -643,7 +645,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.ComputeServiceUnavailable,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
instance_id=instance_id, dest=dest,
block_migration=block_migration)
def test_live_migration_dest_check_service_same_host(self):
@@ -657,10 +659,12 @@ class SchedulerTestCase(test.TestCase):
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
instance_id = instance['id']
# make dest same as src
dest = instance['host']
db.instance_get(self.context, instance['id']).AndReturn(instance)
db.instance_get(self.context,
instance_id).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
db.service_get_all_compute_by_host(self.context,
@@ -670,7 +674,7 @@ class SchedulerTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.UnableToMigrateToSelf,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
instance_id=instance_id, dest=dest,
block_migration=block_migration,
disk_over_commit=False)
@@ -688,138 +692,29 @@ class SchedulerTestCase(test.TestCase):
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
instance_id = instance['id']
db.instance_get(self.context,
instance_id).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
db.service_get_all_compute_by_host(self.context,
dest).AndReturn(['fake_service3'])
utils.service_is_up('fake_service3').AndReturn(True)
self.driver._get_compute_info(self.context, dest,
'memory_mb').AndReturn(2048)
self.driver._get_compute_info(self.context, dest).AndReturn(
{'memory_mb': 2048})
db.instance_get_all_by_host(self.context, dest).AndReturn(
[dict(memory_mb=1024), dict(memory_mb=512)])
self.mox.ReplayAll()
self.assertRaises(exception.MigrationError,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
def test_block_migration_dest_check_service_lack_disk(self):
"""Confirms exception raises when dest doesn't have enough disk."""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
self.mox.StubOutWithMock(utils, 'service_is_up')
self.mox.StubOutWithMock(self.driver,
'assert_compute_node_has_enough_memory')
self.mox.StubOutWithMock(self.driver, '_get_compute_info')
self.mox.StubOutWithMock(db, 'instance_get_all_by_host')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
dest = 'fake_host2'
block_migration = True
disk_over_commit = True
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
db.service_get_all_compute_by_host(self.context,
dest).AndReturn(['fake_service3'])
utils.service_is_up('fake_service3').AndReturn(True)
# Enough memory
self.driver.assert_compute_node_has_enough_memory(self.context,
instance, dest)
# Not enough disk
self.driver._get_compute_info(self.context, dest,
'disk_available_least').AndReturn(1023)
rpc.queue_get_for(self.context, FLAGS.compute_topic,
instance['host']).AndReturn('src_queue')
instance_disk_info_msg = {
'method': 'get_instance_disk_info',
'args': {
'instance_name': instance['name'],
},
'version': compute_rpcapi.ComputeAPI.BASE_RPC_API_VERSION,
}
instance_disk_info = [{'disk_size': 1024 * (1024 ** 3)}]
rpc.call(self.context,
'src_queue',
instance_disk_info_msg,
None).AndReturn(jsonutils.dumps(instance_disk_info))
self.mox.ReplayAll()
self.assertRaises(exception.MigrationError,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
def test_live_migration_different_shared_storage_raises(self):
"""Src and dest must have same shared storage for live migration"""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
dest = 'fake_host2'
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
self._check_shared_storage(dest, instance, False)
self.mox.ReplayAll()
self.assertRaises(exception.InvalidSharedStorage,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
def test_live_migration_same_shared_storage_okay(self):
"""live migration works with same src and dest shared storage"""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
dest = 'fake_host2'
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
self._check_shared_storage(dest, instance, False)
self.mox.ReplayAll()
self.assertRaises(exception.InvalidSharedStorage,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
instance_id=instance_id, dest=dest,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
def test_live_migration_different_hypervisor_type_raises(self):
"""Confirm live_migration to hypervisor of different type raises"""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
@@ -832,18 +727,16 @@ class SchedulerTestCase(test.TestCase):
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
instance_id = instance['id']
db.instance_get(self.context,
instance_id).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
self._check_shared_storage(dest, instance, True)
self.driver._live_migration_dest_check(self.context, instance, dest)
db.service_get_all_compute_by_host(self.context, dest).AndReturn(
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1}]}])
# different hypervisor type
db.service_get_all_compute_by_host(self.context,
instance['host']).AndReturn(
[{'compute_node': [{'hypervisor_type': 'not-xen',
@@ -852,11 +745,12 @@ class SchedulerTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.InvalidHypervisorType,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
instance_id=instance_id, dest=dest,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
def test_live_migration_dest_hypervisor_version_older_raises(self):
"""Confirm live migration to older hypervisor raises"""
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
@@ -869,18 +763,16 @@ class SchedulerTestCase(test.TestCase):
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
instance_id = instance['id']
db.instance_get(self.context,
instance_id).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
self._check_shared_storage(dest, instance, True)
self.driver._live_migration_dest_check(self.context, instance, dest)
db.service_get_all_compute_by_host(self.context, dest).AndReturn(
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1}]}])
# newer hypervisor version for src
db.service_get_all_compute_by_host(self.context,
instance['host']).AndReturn(
[{'compute_node': [{'hypervisor_type': 'xen',
@@ -888,53 +780,10 @@ class SchedulerTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.DestinationHypervisorTooOld,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
instance_id=instance_id, dest=dest,
block_migration=block_migration,
disk_over_commit=disk_over_commit)
def test_live_migration_dest_host_incompatable_cpu_raises(self):
self.mox.StubOutWithMock(db, 'instance_get')
self.mox.StubOutWithMock(self.driver, '_live_migration_src_check')
self.mox.StubOutWithMock(self.driver, '_live_migration_dest_check')
self.mox.StubOutWithMock(rpc, 'queue_get_for')
self.mox.StubOutWithMock(rpc, 'call')
self.mox.StubOutWithMock(rpc, 'cast')
self.mox.StubOutWithMock(db, 'service_get_all_compute_by_host')
dest = 'fake_host2'
block_migration = False
disk_over_commit = False
instance = self._live_migration_instance()
db.instance_get(self.context, instance['id']).AndReturn(instance)
self.driver._live_migration_src_check(self.context, instance)
self.driver._live_migration_dest_check(self.context, instance,
dest, block_migration, disk_over_commit)
self._check_shared_storage(dest, instance, True)
db.service_get_all_compute_by_host(self.context, dest).AndReturn(
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1}]}])
db.service_get_all_compute_by_host(self.context,
instance['host']).AndReturn(
[{'compute_node': [{'hypervisor_type': 'xen',
'hypervisor_version': 1,
'cpu_info': 'fake_cpu_info'}]}])
rpc.queue_get_for(self.context, FLAGS.compute_topic,
dest).AndReturn('dest_queue')
rpc.call(self.context, 'dest_queue',
{'method': 'compare_cpu',
'args': {'cpu_info': 'fake_cpu_info'},
'version': compute_rpcapi.ComputeAPI.BASE_RPC_API_VERSION},
None).AndRaise(rpc_common.RemoteError())
self.mox.ReplayAll()
self.assertRaises(rpc_common.RemoteError,
self.driver.schedule_live_migration, self.context,
instance_id=instance['id'], dest=dest,
block_migration=block_migration)
class SchedulerDriverBaseTestCase(SchedulerTestCase):
"""Test cases for base scheduler driver class methods


@@ -30,6 +30,8 @@ from xml.dom import minidom
from nova.api.ec2 import cloud
from nova.compute import instance_types
from nova.compute import power_state
from nova.compute import rpcapi as compute_rpcapi
from nova.compute import utils as compute_utils
from nova.compute import vm_states
from nova import context
from nova import db
@@ -1483,6 +1485,166 @@ class LibvirtConnTestCase(test.TestCase):
db.instance_destroy(self.context, instance_ref['uuid'])
def test_check_can_live_migrate_dest_all_pass_with_block_migration(self):
instance_ref = db.instance_create(self.context, self.test_instance)
dest = "fake_host_2"
src = instance_ref['host']
conn = libvirt_driver.LibvirtDriver(False)
self.mox.StubOutWithMock(conn, '_get_compute_info')
self.mox.StubOutWithMock(conn, 'get_instance_disk_info')
self.mox.StubOutWithMock(conn, '_create_shared_storage_test_file')
self.mox.StubOutWithMock(conn, '_compare_cpu')
conn._get_compute_info(self.context, FLAGS.host).AndReturn(
{'disk_available_least': 400})
conn.get_instance_disk_info(instance_ref["name"]).AndReturn(
'[{"virt_disk_size":2}]')
# _check_cpu_match
conn._get_compute_info(self.context,
src).AndReturn({'cpu_info': "asdf"})
conn._compare_cpu("asdf")
# mounted_on_same_shared_storage
filename = "file"
conn._create_shared_storage_test_file().AndReturn(filename)
self.mox.ReplayAll()
return_value = conn.check_can_live_migrate_destination(self.context,
instance_ref, True, False)
self.assertDictMatch(return_value,
{"filename": "file", "block_migration": True})
def test_check_can_live_migrate_dest_all_pass_no_block_migration(self):
instance_ref = db.instance_create(self.context, self.test_instance)
dest = "fake_host_2"
src = instance_ref['host']
conn = libvirt_driver.LibvirtDriver(False)
self.mox.StubOutWithMock(conn, '_get_compute_info')
self.mox.StubOutWithMock(conn, '_create_shared_storage_test_file')
self.mox.StubOutWithMock(conn, '_compare_cpu')
# _check_cpu_match
conn._get_compute_info(self.context,
src).AndReturn({'cpu_info': "asdf"})
conn._compare_cpu("asdf")
# mounted_on_same_shared_storage
filename = "file"
conn._create_shared_storage_test_file().AndReturn(filename)
self.mox.ReplayAll()
return_value = conn.check_can_live_migrate_destination(self.context,
instance_ref, False, False)
self.assertDictMatch(return_value,
{"filename": "file", "block_migration": False})
def test_check_can_live_migrate_dest_fails_not_enough_disk(self):
instance_ref = db.instance_create(self.context, self.test_instance)
dest = "fake_host_2"
src = instance_ref['host']
conn = libvirt_driver.LibvirtDriver(False)
self.mox.StubOutWithMock(conn, '_get_compute_info')
self.mox.StubOutWithMock(conn, 'get_instance_disk_info')
conn._get_compute_info(self.context, FLAGS.host).AndReturn(
{'disk_available_least': 0})
conn.get_instance_disk_info(instance_ref["name"]).AndReturn(
'[{"virt_disk_size":2}]')
self.mox.ReplayAll()
self.assertRaises(exception.MigrationError,
conn.check_can_live_migrate_destination,
self.context, instance_ref, True, False)
def test_check_can_live_migrate_dest_incompatible_cpu_raises(self):
instance_ref = db.instance_create(self.context, self.test_instance)
dest = "fake_host_2"
src = instance_ref['host']
conn = libvirt_driver.LibvirtDriver(False)
self.mox.StubOutWithMock(conn, '_get_compute_info')
self.mox.StubOutWithMock(conn, '_compare_cpu')
conn._get_compute_info(self.context, src).AndReturn(
{'cpu_info': "asdf"})
conn._compare_cpu("asdf").AndRaise(exception.InvalidCPUInfo)
self.mox.ReplayAll()
self.assertRaises(exception.InvalidCPUInfo,
conn.check_can_live_migrate_destination,
self.context, instance_ref, False, False)
def test_check_can_live_migrate_dest_fail_space_with_block_migration(self):
instance_ref = db.instance_create(self.context, self.test_instance)
dest = "fake_host_2"
src = instance_ref['host']
conn = libvirt_driver.LibvirtDriver(False)
self.mox.StubOutWithMock(conn, '_get_compute_info')
self.mox.StubOutWithMock(conn, 'get_instance_disk_info')
conn._get_compute_info(self.context, FLAGS.host).AndReturn(
{'disk_available_least': 0})
conn.get_instance_disk_info(instance_ref["name"]).AndReturn(
'[{"virt_disk_size":2}]')
self.mox.ReplayAll()
self.assertRaises(exception.MigrationError,
conn.check_can_live_migrate_destination,
self.context, instance_ref, True, False)
def test_check_can_live_migrate_dest_cleanup_works_correctly(self):
dest_check_data = {"filename": "file", "block_migration": True}
conn = libvirt_driver.LibvirtDriver(False)
self.mox.StubOutWithMock(conn, '_cleanup_shared_storage_test_file')
conn._cleanup_shared_storage_test_file("file")
self.mox.ReplayAll()
conn.check_can_live_migrate_destination_cleanup(self.context,
dest_check_data)
def test_check_can_live_migrate_source_works_correctly(self):
instance_ref = db.instance_create(self.context, self.test_instance)
dest_check_data = {"filename": "file", "block_migration": True}
conn = libvirt_driver.LibvirtDriver(False)
self.mox.StubOutWithMock(conn, "_check_shared_storage_test_file")
conn._check_shared_storage_test_file("file").AndReturn(False)
self.mox.ReplayAll()
conn.check_can_live_migrate_source(self.context, instance_ref,
dest_check_data)
def test_check_can_live_migrate_dest_fail_shared_storage_with_blockm(self):
instance_ref = db.instance_create(self.context, self.test_instance)
dest_check_data = {"filename": "file", "block_migration": True}
conn = libvirt_driver.LibvirtDriver(False)
self.mox.StubOutWithMock(conn, "_check_shared_storage_test_file")
conn._check_shared_storage_test_file("file").AndReturn(True)
self.mox.ReplayAll()
self.assertRaises(exception.InvalidSharedStorage,
conn.check_can_live_migrate_source,
self.context, instance_ref, dest_check_data)
def test_check_can_live_migrate_no_shared_storage_no_blck_mig_raises(self):
instance_ref = db.instance_create(self.context, self.test_instance)
dest_check_data = {"filename": "file", "block_migration": False}
conn = libvirt_driver.LibvirtDriver(False)
self.mox.StubOutWithMock(conn, "_check_shared_storage_test_file")
conn._check_shared_storage_test_file("file").AndReturn(False)
self.mox.ReplayAll()
self.assertRaises(exception.InvalidSharedStorage,
conn.check_can_live_migrate_source,
self.context, instance_ref, dest_check_data)
def test_live_migration_raises_exception(self):
"""Confirms recover method is called when exceptions are raised."""
# Preparing data
@@ -1535,7 +1697,7 @@ class LibvirtConnTestCase(test.TestCase):
db.volume_destroy(self.context, volume_ref['id'])
db.instance_destroy(self.context, instance_ref['uuid'])
def test_pre_live_migration_works_correctly(self):
def test_pre_live_migration_works_correctly_mocked(self):
"""Confirms pre_block_migration works correctly."""
# Creating testdata
vol = {'block_device_mapping': [
@@ -1543,6 +1705,14 @@ class LibvirtConnTestCase(test.TestCase):
{'connection_info': 'dummy', 'mount_device': '/dev/sdb'}]}
conn = libvirt_driver.LibvirtDriver(False)
class FakeNetworkInfo():
def fixed_ips(self):
return ["test_ip_addr"]
inst_ref = {'id': 'foo'}
c = context.get_admin_context()
nw_info = FakeNetworkInfo()
# Creating mocks
self.mox.StubOutWithMock(driver, "block_device_info_get_mapping")
driver.block_device_info_get_mapping(vol
@@ -1552,10 +1722,12 @@ class LibvirtConnTestCase(test.TestCase):
conn.volume_driver_method('connect_volume',
v['connection_info'],
v['mount_device'].rpartition("/")[2])
self.mox.StubOutWithMock(conn, 'plug_vifs')
conn.plug_vifs(mox.IsA(inst_ref), nw_info)
# Starting test
self.mox.ReplayAll()
self.assertEqual(conn.pre_live_migration(vol), None)
result = conn.pre_live_migration(c, inst_ref, vol, nw_info)
self.assertEqual(result, None)
def test_pre_block_migration_works_correctly(self):
"""Confirms pre_block_migration works correctly."""

View File

@@ -455,32 +455,6 @@ class _VirtDriverTestCase(_FakeDriverBackendTestCase):
instance_ref, network_info = self._get_running_instance()
self.connection.refresh_provider_fw_rules()
@catch_notimplementederror
def test_compare_cpu(self):
cpu_info = '''{ "topology": {
"sockets": 1,
"cores": 2,
"threads": 1 },
"features": [
"xtpr",
"tm2",
"est",
"vmx",
"ds_cpl",
"monitor",
"pbe",
"tm",
"ht",
"ss",
"acpi",
"ds",
"vme"],
"arch": "x86_64",
"model": "Penryn",
"vendor": "Intel" }'''
self.connection.compare_cpu(cpu_info)
@catch_notimplementederror
def test_ensure_filtering_for_instance(self):
instance_ref = test_utils.get_test_instance()

View File

@@ -716,9 +716,6 @@ class BareMetalDriver(driver.ComputeDriver):
LOG.info(_('Compute_service record updated for %s ') % host)
db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic)
def compare_cpu(self, cpu_info):
raise NotImplementedError()
def ensure_filtering_rules_for_instance(self, instance_ref,
time=None):
raise NotImplementedError()

View File

@@ -251,20 +251,6 @@ class ComputeDriver(object):
"""Detach the disk attached to the instance"""
raise NotImplementedError()
def compare_cpu(self, cpu_info):
"""Compares given cpu info against host
Before attempting to migrate a VM to this host,
compare_cpu is called to ensure that the VM will
actually run here.
:param cpu_info: (str) JSON structure describing the source CPU.
:returns: None if migration is acceptable
:raises: :py:class:`~nova.exception.InvalidCPUInfo` if migration
is not acceptable.
"""
raise NotImplementedError()
def migrate_disk_and_power_off(self, context, instance, dest,
instance_type, network_info):
"""
@@ -357,27 +343,63 @@ class ComputeDriver(object):
:param host: hostname that compute manager is currently running
"""
# TODO(Vek): Need to pass context in for access to auth_token
raise NotImplementedError()
def live_migration(self, ctxt, instance_ref, dest,
post_method, recover_method):
"""Spawning live_migration operation for distributing high-load.
post_method, recover_method, block_migration=False):
"""Live migration of an instance to another host.
:param ctxt: security context
:param instance_ref:
:params ctxt: security context
:params instance_ref:
nova.db.sqlalchemy.models.Instance object
instance object that is migrated.
:param dest: destination host
:param post_method:
:params dest: destination host
:params post_method:
post operation method.
expected nova.compute.manager.post_live_migration.
:param recover_method:
:params recover_method:
recovery method when any exception occurs.
expected nova.compute.manager.recover_live_migration.
:params block_migration: if true, migrate VM disk.
"""
raise NotImplementedError()
def check_can_live_migrate_destination(self, ctxt, instance_ref,
block_migration=False,
disk_over_commit=False):
"""Check if it is possible to execute live migration.
This runs checks on the destination host, and then calls
back to the source host to check the results.
:param ctxt: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance
:param block_migration: if true, prepare for block migration
:param disk_over_commit: if true, allow disk over commit
"""
raise NotImplementedError()
def check_can_live_migrate_destination_cleanup(self, ctxt,
dest_check_data):
"""Do required cleanup on dest host after check_can_live_migrate calls
:param ctxt: security context
:param dest_check_data: result of check_can_live_migrate_destination
"""
raise NotImplementedError()
def check_can_live_migrate_source(self, ctxt, instance_ref,
dest_check_data):
"""Check if it is possible to execute live migration.
This checks if the live migration can succeed, based on the
results from check_can_live_migrate_destination.
:param ctxt: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance
:param dest_check_data: result of check_can_live_migrate_destination
"""
# TODO(Vek): Need to pass context in for access to auth_token
raise NotImplementedError()
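# NOTE(editor): illustrative sketch only, not part of this change. It shows
# one plausible way a caller (for example the scheduler or compute manager)
# might drive the three hooks above in order. The rpc_call() helper and the
# dest_driver name are hypothetical placeholders, not real Nova APIs.
def sketch_check_can_live_migrate(ctxt, instance_ref, dest_driver, rpc_call,
                                  block_migration=False,
                                  disk_over_commit=False):
    # 1. Run the checks that only the destination host can perform.
    dest_check_data = dest_driver.check_can_live_migrate_destination(
        ctxt, instance_ref, block_migration, disk_over_commit)
    try:
        # 2. Hand the destination's findings back to the source host,
        #    which raises if the migration cannot succeed.
        rpc_call(instance_ref['host'], 'check_can_live_migrate_source',
                 instance_ref=instance_ref, dest_check_data=dest_check_data)
    finally:
        # 3. Always clean up whatever the destination check created
        #    (for example the shared storage test file).
        dest_driver.check_can_live_migrate_destination_cleanup(
            ctxt, dest_check_data)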
def refresh_security_group_rules(self, security_group_id):

View File

@@ -259,10 +259,6 @@ class FakeDriver(driver.ComputeDriver):
LOG.info(_('Compute_service record updated for %s ') % host)
db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic)
def compare_cpu(self, xml):
"""This method is supported only by libvirt."""
raise NotImplementedError('This method is supported only by libvirt.')
def ensure_filtering_rules_for_instance(self, instance_ref, network_info):
"""This method is supported only by libvirt."""
raise NotImplementedError('This method is supported only by libvirt.')
@@ -283,7 +279,8 @@ class FakeDriver(driver.ComputeDriver):
def confirm_migration(self, migration, instance, network_info):
return
def pre_live_migration(self, block_device_info):
def pre_live_migration(self, context, instance_ref, block_device_info,
network_info):
"""This method is supported only by libvirt."""
return

View File

@@ -2202,7 +2202,118 @@ class LibvirtDriver(driver.ComputeDriver):
LOG.info(_('Compute_service record updated for %s ') % host)
db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic)
def compare_cpu(self, cpu_info):
def check_can_live_migrate_destination(self, ctxt, instance_ref,
block_migration=False,
disk_over_commit=False):
"""Check if it is possible to execute live migration.
This runs checks on the destination host, and then calls
back to the source host to check the results.
:param ctxt: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance
:param block_migration: if true, prepare for block migration
:param disk_over_commit: if true, allow disk over commit
"""
if block_migration:
self._assert_compute_node_has_enough_disk(ctxt,
instance_ref,
disk_over_commit)
# Compare CPU
src = instance_ref['host']
source_cpu_info = self._get_compute_info(ctxt, src)['cpu_info']
self._compare_cpu(source_cpu_info)
# Create file on storage, to be checked on source host
filename = self._create_shared_storage_test_file()
return {"filename": filename, "block_migration": block_migration}
def check_can_live_migrate_destination_cleanup(self, ctxt,
dest_check_data):
"""Do required cleanup on dest host after check_can_live_migrate calls
:param ctxt: security context
:param dest_check_data: result of check_can_live_migrate_destination
"""
filename = dest_check_data["filename"]
self._cleanup_shared_storage_test_file(filename)
def check_can_live_migrate_source(self, ctxt, instance_ref,
dest_check_data):
"""Check if it is possible to execute live migration.
This checks if the live migration can succeed, based on the
results from check_can_live_migrate_destination.
:param ctxt: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance
:param dest_check_data: result of check_can_live_migrate_destination
"""
# Checking shared storage connectivity
# If block migration is requested, instances_path must not be on shared storage.
dest = FLAGS.host
filename = dest_check_data["filename"]
block_migration = dest_check_data["block_migration"]
shared = self._check_shared_storage_test_file(filename)
if block_migration:
if shared:
reason = _("Block migration can not be used "
"with shared storage.")
raise exception.InvalidSharedStorage(reason=reason, path=dest)
elif not shared:
reason = _("Live migration can not be used "
"without shared storage.")
raise exception.InvalidSharedStorage(reason=reason, path=dest)
def _get_compute_info(self, context, host):
"""Get compute host's information specified by key"""
compute_node_ref = db.service_get_all_compute_by_host(context, host)
return compute_node_ref[0]['compute_node'][0]
def _assert_compute_node_has_enough_disk(self, context, instance_ref,
disk_over_commit):
"""Checks if host has enough disk for block migration."""
# Libvirt supports the qcow2 disk format, which is usually compressed
# on compute nodes.
# The real (compressed) disk image may grow up to the "virtual disk
# size", which is the maximum size the disk can reach.
# (See qemu-img info path-to-disk.)
# The destination host is considered to have enough disk space when
# real disk size < available disk size if disk_over_commit is True,
# otherwise when virtual disk size < available disk size.
# Get the total available disk space on this host.
dest = FLAGS.host
available_gb = self._get_compute_info(context,
dest)['disk_available_least']
available = available_gb * (1024 ** 3)
ret = self.get_instance_disk_info(instance_ref['name'])
disk_infos = jsonutils.loads(ret)
necessary = 0
if disk_over_commit:
for info in disk_infos:
necessary += int(info['disk_size'])
else:
for info in disk_infos:
necessary += int(info['virt_disk_size'])
# Check that available disk > necessary disk
if (available - necessary) < 0:
instance_uuid = instance_ref['uuid']
reason = _("Unable to migrate %(instance_uuid)s to %(dest)s: "
"Lack of disk(host:%(available)s "
"<= instance:%(necessary)s)")
raise exception.MigrationError(reason=reason % locals())
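# NOTE(editor): worked example of the arithmetic above with made-up numbers;
# a standalone sketch, not part of this change.
import json

_available = 400 * (1024 ** 3)        # disk_available_least of 400 GB, in bytes
_disk_infos = json.loads(
    '[{"disk_size": 3221225472, "virt_disk_size": 21474836480}]')
_disk_over_commit = False
if _disk_over_commit:
    # compare against what the qcow2 files occupy right now (~3 GB)
    _necessary = sum(int(i['disk_size']) for i in _disk_infos)
else:
    # compare against the fully allocated virtual size (~20 GB)
    _necessary = sum(int(i['virt_disk_size']) for i in _disk_infos)
assert _available - _necessary >= 0    # else MigrationError would be raised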
def _compare_cpu(self, cpu_info):
"""Checks the host cpu is compatible to a cpu given by xml.
"xml" must be a part of libvirt.openReadonly().getCapabilities().
@@ -2214,9 +2325,7 @@ class LibvirtDriver(driver.ComputeDriver):
:returns:
None if the given cpu info is compatible with this host;
otherwise an exception is raised.
"""
info = jsonutils.loads(cpu_info)
LOG.info(_('Instance launched has CPU info:\n%s') % cpu_info)
cpu = config.LibvirtConfigCPU()
@@ -2240,8 +2349,33 @@ class LibvirtDriver(driver.ComputeDriver):
raise
if ret <= 0:
LOG.error(m % locals())
raise exception.InvalidCPUInfo(reason=m % locals())
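# NOTE(editor): a check like this is typically built on libvirt's
# compareCPU(). The sketch below is a minimal direct call for reference only;
# it assumes a reachable local libvirtd, and the CPU XML values are examples.
import libvirt

_cpu_xml = """<cpu>
  <arch>x86_64</arch>
  <model>Penryn</model>
  <vendor>Intel</vendor>
  <topology sockets='1' cores='2' threads='1'/>
  <feature name='vmx'/>
</cpu>"""

_conn = libvirt.openReadOnly(None)
_ret = _conn.compareCPU(_cpu_xml, 0)
if _ret <= libvirt.VIR_CPU_COMPARE_INCOMPATIBLE:
    print("source CPU is not compatible with this host")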
def _create_shared_storage_test_file(self):
"""Makes tmpfile under FLAGS.instance_path."""
dirpath = FLAGS.instances_path
fd, tmp_file = tempfile.mkstemp(dir=dirpath)
LOG.debug(_("Creating tmpfile %s to notify to other "
"compute nodes that they should mount "
"the same storage.") % tmp_file)
os.close(fd)
return os.path.basename(tmp_file)
def _check_shared_storage_test_file(self, filename):
"""Confirms existence of the tmpfile under FLAGS.instances_path.
Cannot confirm tmpfile return False."""
tmp_file = os.path.join(FLAGS.instances_path, filename)
if not os.path.exists(tmp_file):
return False
else:
return True
def _cleanup_shared_storage_test_file(self, filename):
"""Removes existence of the tmpfile under FLAGS.instances_path."""
tmp_file = os.path.join(FLAGS.instances_path, filename)
os.remove(tmp_file)
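# NOTE(editor): self-contained sketch of the handshake the three helpers above
# implement; not part of this change. _instances_path stands in for
# FLAGS.instances_path, which both hosts would mount if storage is shared.
import os
import tempfile

_instances_path = tempfile.mkdtemp()     # pretend shared instances directory

# destination host: create the marker and pass its basename to the source
_fd, _tmp_file = tempfile.mkstemp(dir=_instances_path)
os.close(_fd)
_filename = os.path.basename(_tmp_file)

# source host: the marker is visible only if both hosts share the storage
_shared = os.path.exists(os.path.join(_instances_path, _filename))

# destination host: always remove the marker afterwards
os.remove(os.path.join(_instances_path, _filename))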
def ensure_filtering_rules_for_instance(self, instance_ref, network_info,
time=None):
"""Setting up filtering rules and waiting for its completion.
@@ -2363,14 +2497,9 @@ class LibvirtDriver(driver.ComputeDriver):
timer.f = wait_for_live_migration
return timer.start(interval=0.5).wait()
def pre_live_migration(self, block_device_info):
"""Preparation live migration.
:params block_device_info:
It must be the result of _get_instance_volume_bdms()
at compute manager.
"""
def pre_live_migration(self, context, instance_ref, block_device_info,
network_info):
"""Preparation live migration."""
# Establishing connection to volume server.
block_device_mapping = driver.block_device_info_get_mapping(
block_device_info)
@@ -2381,6 +2510,24 @@ class LibvirtDriver(driver.ComputeDriver):
connection_info,
mount_device)
# We call plug_vifs before the compute manager calls
# ensure_filtering_rules_for_instance, to ensure the bridge is set up.
# Retrying is necessary because requests arrive continuously; when
# concurrent requests hit iptables, it complains.
max_retry = FLAGS.live_migration_retry_count
for cnt in range(max_retry):
try:
self.plug_vifs(instance_ref, network_info)
break
except exception.ProcessExecutionError:
if cnt == max_retry - 1:
raise
else:
LOG.warn(_("plug_vifs() failed %(cnt)d."
"Retry up to %(max_retry)d for %(hostname)s.")
% locals())
time.sleep(1)
def pre_block_migration(self, ctxt, instance_ref, disk_info_json):
"""Preparation block migration.

View File

@@ -410,10 +410,6 @@ class XenAPIDriver(driver.ComputeDriver):
LOG.info(_('Compute_service record updated for %s ') % host)
db.compute_node_update(ctxt, compute_node_ref[0]['id'], dic)
def compare_cpu(self, xml):
"""This method is supported only by libvirt."""
raise NotImplementedError('This method is supported only by libvirt.')
def ensure_filtering_rules_for_instance(self, instance_ref, network_info):
"""This method is supported only libvirt."""
# NOTE(salvatore-orlando): it enforces security groups on