block migration feature added

Kei Masumoto
2011-06-11 19:48:48 +09:00
parent a94992f199
commit 09bd503a98
12 changed files with 449 additions and 167 deletions

View File

@@ -649,11 +649,12 @@ class VmCommands(object):
instance['availability_zone'],
instance['launch_index'])
def live_migration(self, ec2_id, dest):
def _migration(self, ec2_id, dest, block_migration=False):
"""Migrates a running instance to a new machine.
:param ec2_id: instance id which comes from euca-describe-instances.
:param dest: destination host name.
:param block_migration: if True, do block_migration.
"""
@@ -676,11 +677,30 @@ class VmCommands(object):
{"method": "live_migration",
"args": {"instance_id": instance_id,
"dest": dest,
"topic": FLAGS.compute_topic}})
"topic": FLAGS.compute_topic,
"block_migration": block_migration}})
print _('Migration of %s initiated. '
'Check its progress using euca-describe-instances.') % ec2_id
def live_migration(self, ec2_id, dest):
"""Migrates a running instance to a new machine.
:param ec2_id: instance id which comes from euca-describe-instances.
:param dest: destination host name.
"""
self._migration(ec2_id, dest)
def block_migration(self, ec2_id, dest):
"""Migrates a running instance to a new machine with storage data.
:param ec2_id: instance id which comes from euca-describe-instances.
:param dest: destination host name.
"""
self._migration(ec2_id, dest, True)
class ServiceCommands(object):
"""Enable and disable running services"""
@@ -749,9 +769,19 @@ class ServiceCommands(object):
mem_u = result['resource']['memory_mb_used']
hdd_u = result['resource']['local_gb_used']
cpu_sum = 0
mem_sum = 0
hdd_sum = 0
print 'HOST\t\t\tPROJECT\t\tcpu\tmem(mb)\tdisk(gb)'
print '%s(total)\t\t\t%s\t%s\t%s' % (host, cpu, mem, hdd)
print '%s(used)\t\t\t%s\t%s\t%s' % (host, cpu_u, mem_u, hdd_u)
print '%s(used_now)\t\t\t%s\t%s\t%s' % (host, cpu_u, mem_u, hdd_u)
for p_id, val in result['usage'].items():
cpu_sum += val['vcpus']
mem_sum += val['memory_mb']
hdd_sum += val['local_gb']
print '%s(used_max)\t\t\t%s\t%s\t%s' % (host, cpu_sum,
mem_sum, hdd_sum)
for p_id, val in result['usage'].items():
print '%s\t\t%s\t\t%s\t%s\t%s' % (host,
p_id,
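
For orientation, the two public nova-manage commands above differ only in the flag they hand to _migration, which then asks the scheduler to drive the migration. Below is a condensed sketch of that dispatch, not the committed lines; the admin-context setup and the ec2-id conversion helper shown here stand in for the surrounding nova-manage code and are assumptions.

    def _migration(self, ec2_id, dest, block_migration=False):
        ctxt = context.get_admin_context()                # assumed setup
        instance_id = ec2utils.ec2_id_to_id(ec2_id)       # assumed helper
        rpc.call(ctxt,
                 FLAGS.scheduler_topic,
                 {"method": "live_migration",
                  "args": {"instance_id": instance_id,
                           "dest": dest,
                           "topic": FLAGS.compute_topic,
                           "block_migration": block_migration}})

    def live_migration(self, ec2_id, dest):
        self._migration(ec2_id, dest)            # shared storage required

    def block_migration(self, ec2_id, dest):
        self._migration(ec2_id, dest, True)      # disks copied to dest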

View File

@@ -921,11 +921,13 @@ class ComputeManager(manager.SchedulerDependentManager):
"""
return self.driver.update_available_resource(context, self.host)
def pre_live_migration(self, context, instance_id, time=None):
def pre_live_migration(self, context, instance_id, time=None,
block_migration=False, **kwargs):
"""Preparations for live migration at dest host.
:param context: security context
:param instance_id: nova.db.sqlalchemy.models.Instance.Id
:param block_migration: if true, prepare for block migration
"""
if not time:
@@ -977,17 +979,24 @@ class ComputeManager(manager.SchedulerDependentManager):
# onto destination host.
self.driver.ensure_filtering_rules_for_instance(instance_ref)
def live_migration(self, context, instance_id, dest):
# Preparation for block migration
if block_migration:
self.driver.pre_block_migration(context,
instance_ref,
kwargs.get('disk'))
def live_migration(self, context, instance_id,
dest, block_migration=False):
"""Executing live migration.
:param context: security context
:param instance_id: nova.db.sqlalchemy.models.Instance.Id
:param dest: destination host
:param block_migration: if true, do block migration
"""
# Get instance for error handling.
instance_ref = self.db.instance_get(context, instance_id)
i_name = instance_ref.name
try:
# Checking volume node is working correctly when any volumes
@@ -998,13 +1007,20 @@ class ComputeManager(manager.SchedulerDependentManager):
{"method": "check_for_export",
"args": {'instance_id': instance_id}})
# Asking dest host to prepare for live migration.
args = {}
args['instance_id'] = instance_id
if block_migration:
args['block_migration'] = block_migration
args['disk'] = \
self.driver.get_instance_disk_info(context, instance_ref)
rpc.call(context,
self.db.queue_get_for(context, FLAGS.compute_topic, dest),
{"method": "pre_live_migration",
"args": {'instance_id': instance_id}})
"args": args})
except Exception:
i_name = instance_ref.name
msg = _("Pre live migration for %(i_name)s failed at %(dest)s")
LOG.error(msg % locals())
self.recover_live_migration(context, instance_ref)
@@ -1015,9 +1031,11 @@ class ComputeManager(manager.SchedulerDependentManager):
# nothing must be recovered in this version.
self.driver.live_migration(context, instance_ref, dest,
self.post_live_migration,
self.recover_live_migration)
self.recover_live_migration,
block_migration)
def post_live_migration(self, ctxt, instance_ref, dest):
def post_live_migration(self, ctxt, instance_ref,
dest, block_migration=False):
"""Post operations for live migration.
This method is called from live_migration
@@ -1068,6 +1086,10 @@ class ComputeManager(manager.SchedulerDependentManager):
# Restore instance/volume state
self.recover_live_migration(ctxt, instance_ref, dest)
# No instance is booting at the source host, but the instance dir
# must be deleted to prepare for the next block migration
if block_migration:
self.driver.destroy(instance_ref)
LOG.info(_('Migrating %(i_name)s to %(dest)s finished successfully.')
% locals())
@@ -1075,14 +1097,20 @@ class ComputeManager(manager.SchedulerDependentManager):
"Domain not found: no domain with matching name.\" "
"This error can be safely ignored."))
def recover_live_migration(self, ctxt, instance_ref, host=None, dest=None):
def recover_live_migration(self, ctxt, instance_ref, host=None,
dest=None, delete=True):
"""Recovers Instance/volume state from migrating -> running.
:param ctxt: security context
:param instance_id: nova.db.sqlalchemy.models.Instance.Id
:param host: DB column value is updated to this hostname.
If None, the host on which the instance is currently running is selected.
:param dest:
This method is called from live migration src host.
This param specifies destination host.
:param delete:
If True, ask the destination host to remove the instance dir,
since an empty disk image was created there for block migration
"""
if not host:
host = instance_ref['host']
@@ -1101,6 +1129,15 @@ class ComputeManager(manager.SchedulerDependentManager):
if dest:
volume_api.remove_from_compute(ctxt, volume_id, dest)
# TODO: Block migration needs an empty image at the destination host
# before migration starts, so if any failure occurs,
# those empty images have to be deleted. It is not clear whether adding
# the call below is appropriate here; for now, the admin has to delete
# them manually.
# rpc.call(ctxt,
#          self.db.queue_get_for(ctxt, FLAGS.compute_topic, dest),
#          {"method": "self.driver.destroy",
#           "args": {'instance': instance_ref}})
def periodic_tasks(self, context=None):
"""Tasks to be run at a periodic interval."""
error_list = super(ComputeManager, self).periodic_tasks(context)
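
Taken together, the source-side flow that the hunks above add to ComputeManager.live_migration is roughly the following. This is a condensed sketch assuming only the manager attributes the diff already uses (self.db, self.driver, rpc, FLAGS), not the literal committed method.

    def live_migration(self, context, instance_id, dest, block_migration=False):
        instance_ref = self.db.instance_get(context, instance_id)
        try:
            # Ask dest to prepare; for block migration, ship the disk layout
            # so empty images and libvirt.xml can be created there first.
            args = {'instance_id': instance_id}
            if block_migration:
                args['block_migration'] = True
                args['disk'] = self.driver.get_instance_disk_info(context,
                                                                  instance_ref)
            rpc.call(context,
                     self.db.queue_get_for(context, FLAGS.compute_topic, dest),
                     {"method": "pre_live_migration", "args": args})
        except Exception:
            self.recover_live_migration(context, instance_ref)
            raise
        # The driver migrates asynchronously and reports back through the
        # post/recover callbacks, which now receive block_migration as well.
        self.driver.live_migration(context, instance_ref, dest,
                                   self.post_live_migration,
                                   self.recover_live_migration,
                                   block_migration)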

View File

@@ -483,27 +483,6 @@ def instance_add_security_group(context, instance_id, security_group_id):
security_group_id)
def instance_get_vcpu_sum_by_host_and_project(context, hostname, proj_id):
"""Get instances.vcpus by host and project."""
return IMPL.instance_get_vcpu_sum_by_host_and_project(context,
hostname,
proj_id)
def instance_get_memory_sum_by_host_and_project(context, hostname, proj_id):
"""Get amount of memory by host and project."""
return IMPL.instance_get_memory_sum_by_host_and_project(context,
hostname,
proj_id)
def instance_get_disk_sum_by_host_and_project(context, hostname, proj_id):
"""Get total amount of disk by host and project."""
return IMPL.instance_get_disk_sum_by_host_and_project(context,
hostname,
proj_id)
def instance_action_create(context, values):
"""Create an instance action from the values dictionary."""
return IMPL.instance_action_create(context, values)

View File

@@ -1040,45 +1040,6 @@ def instance_add_security_group(context, instance_id, security_group_id):
instance_ref.save(session=session)
@require_context
def instance_get_vcpu_sum_by_host_and_project(context, hostname, proj_id):
session = get_session()
result = session.query(models.Instance).\
filter_by(host=hostname).\
filter_by(project_id=proj_id).\
filter_by(deleted=False).\
value(func.sum(models.Instance.vcpus))
if not result:
return 0
return result
@require_context
def instance_get_memory_sum_by_host_and_project(context, hostname, proj_id):
session = get_session()
result = session.query(models.Instance).\
filter_by(host=hostname).\
filter_by(project_id=proj_id).\
filter_by(deleted=False).\
value(func.sum(models.Instance.memory_mb))
if not result:
return 0
return result
@require_context
def instance_get_disk_sum_by_host_and_project(context, hostname, proj_id):
session = get_session()
result = session.query(models.Instance).\
filter_by(host=hostname).\
filter_by(project_id=proj_id).\
filter_by(deleted=False).\
value(func.sum(models.Instance.local_gb))
if not result:
return 0
return result
@require_context
def instance_action_create(context, values):
"""Create an instance action from the values dictionary."""

View File

@@ -125,14 +125,14 @@ class ComputeNode(BASE, NovaBase):
'ComputeNode.service_id == Service.id,'
'ComputeNode.deleted == False)')
vcpus = Column(Integer, nullable=True)
memory_mb = Column(Integer, nullable=True)
local_gb = Column(Integer, nullable=True)
vcpus_used = Column(Integer, nullable=True)
memory_mb_used = Column(Integer, nullable=True)
local_gb_used = Column(Integer, nullable=True)
hypervisor_type = Column(Text, nullable=True)
hypervisor_version = Column(Integer, nullable=True)
vcpus = Column(Integer)
memory_mb = Column(Integer)
local_gb = Column(Integer)
vcpus_used = Column(Integer)
memory_mb_used = Column(Integer)
local_gb_used = Column(Integer)
hypervisor_type = Column(Text)
hypervisor_version = Column(Integer)
# Note(masumotok): Expected Strings example:
#

View File

@@ -211,6 +211,11 @@ class DestinationHypervisorTooOld(Invalid):
"has been provided.")
class DestinationDiskExists(Invalid):
message = _("The supplied disk path (%(path)s) already exists, "
"it is expected not to exist.")
class InvalidDevicePath(Invalid):
message = _("The supplied device path (%(path)s) is invalid.")

View File

@@ -77,7 +77,8 @@ class Scheduler(object):
"""Must override at least this method for scheduler to work."""
raise NotImplementedError(_("Must implement a fallback schedule"))
def schedule_live_migration(self, context, instance_id, dest):
def schedule_live_migration(self, context, instance_id, dest,
block_migration=False):
"""Live migration scheduling method.
:param context:
@@ -88,7 +89,6 @@ class Scheduler(object):
Then the scheduler sends a request to that host.
"""
# Whether instance exists and is running.
instance_ref = db.instance_get(context, instance_id)
@@ -96,10 +96,12 @@ class Scheduler(object):
self._live_migration_src_check(context, instance_ref)
# Checking destination host.
self._live_migration_dest_check(context, instance_ref, dest)
self._live_migration_dest_check(context, instance_ref,
dest, block_migration)
# Common checking.
self._live_migration_common_check(context, instance_ref, dest)
self._live_migration_common_check(context, instance_ref,
dest, block_migration)
# Changing instance_state.
db.instance_set_state(context,
@@ -147,7 +149,8 @@ class Scheduler(object):
if not self.service_is_up(services[0]):
raise exception.ComputeServiceUnavailable(host=src)
def _live_migration_dest_check(self, context, instance_ref, dest):
def _live_migration_dest_check(self, context, instance_ref, dest,
block_migration):
"""Live migration check routine (for destination host).
:param context: security context
@@ -175,9 +178,11 @@ class Scheduler(object):
# Checking dst host still has enough capacities.
self.assert_compute_node_has_enough_resources(context,
instance_ref,
dest)
dest,
block_migration)
def _live_migration_common_check(self, context, instance_ref, dest):
def _live_migration_common_check(self, context, instance_ref, dest,
block_migration):
"""Live migration common check routine.
Below checkings are followed by
@@ -186,11 +191,19 @@ class Scheduler(object):
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
:param block_migration: if True, check for block_migration.
"""
# Checking shared storage connectivity
self.mounted_on_same_shared_storage(context, instance_ref, dest)
# For block migration, instances_path must not be on shared storage.
try:
self.mounted_on_same_shared_storage(context, instance_ref, dest)
if block_migration:
raise
except rpc.RemoteError:
if not block_migration:
raise
# Checking dest exists.
dservice_refs = db.service_get_all_compute_by_host(context, dest)
@@ -229,14 +242,24 @@ class Scheduler(object):
"original host %(src)s.") % locals())
raise
def assert_compute_node_has_enough_resources(self, context,
instance_ref, dest):
def assert_compute_node_has_enough_resources(self, context, instance_ref,
dest, block_migration):
"""Checks if destination host has enough resource for live migration.
Currently, only memory checking has been done.
If storage migration(block migration, meaning live-migration
without any shared storage) will be available, local storage
checking is also necessary.
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
:param block_migration: if True, disk availability is also checked
"""
self.assert_compute_node_has_enough_memory(context, instance_ref, dest)
if not block_migration:
return
self.assert_compute_node_has_enough_disk(context, instance_ref, dest)
def assert_compute_node_has_enough_memory(self, context,
instance_ref, dest):
"""Checks if destination host has enough memory for live migration.
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
@@ -244,22 +267,69 @@ class Scheduler(object):
"""
# Getting instance information
ec2_id = instance_ref['hostname']
# Getting total available memory of host
avail = self._get_compute_info(context, dest, 'memory_mb')
# Getting host information
service_refs = db.service_get_all_compute_by_host(context, dest)
compute_node_ref = service_refs[0]['compute_node'][0]
# Getting total used memory of host
# It should be the sum of memory assigned to each instance as its maximum,
# because overcommitting is risky.
used = 0
instance_refs = db.instance_get_all_by_host(context, dest)
used_list = [i['memory_mb'] for i in instance_refs]
if used_list:
used = reduce(lambda x, y: x + y, used_list)
mem_total = int(compute_node_ref['memory_mb'])
mem_used = int(compute_node_ref['memory_mb_used'])
mem_avail = mem_total - mem_used
mem_inst = instance_ref['memory_mb']
if mem_avail <= mem_inst:
reason = _("Unable to migrate %(ec2_id)s to destination: %(dest)s "
"(host:%(mem_avail)s <= instance:%(mem_inst)s)")
avail = avail - used
if avail <= mem_inst:
ec2_id = instance_ref['hostname']
reason = _("Unable to migrate %(ec2_id)s to %(dest)s: Lack of "
"disk(host:%(avail)s <= instance:%(mem_inst)s)")
raise exception.MigrationError(reason=reason % locals())
def assert_compute_node_has_enough_disk(self, context,
instance_ref, dest):
"""Checks if destination host has enough disk for block migration.
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
"""
# Getting total available disk of host
avail = self._get_compute_info(context, dest, 'local_gb')
# Getting total used disk of host
# It should be the sum of disk space assigned to each instance as its maximum,
# because overcommitting is risky.
used = 0
instance_refs = db.instance_get_all_by_host(context, dest)
used_list = [i['local_gb'] for i in instance_refs]
if used_list:
used = reduce(lambda x, y: x + y, used_list)
disk_inst = instance_ref['local_gb']
avail = avail - used
if avail <= disk_inst:
ec2_id = instance_ref['hostname']
reason = _("Unable to migrate %(ec2_id)s to %(dest)s: Lack of "
"disk(host:%(avail)s <= instance:%(disk_inst)s)")
raise exception.MigrationError(reason=reason % locals())
def _get_compute_info(self, context, host, key):
"""get compute node's infomation specified by key
:param context: security context
:param host: hostname(must be compute node)
:param key: column name of compute_nodes
:return: value specified by key
"""
compute_node_ref = db.service_get_all_compute_by_host(context, host)
compute_node_ref = compute_node_ref[0]['compute_node'][0]
return compute_node_ref[key]
def mounted_on_same_shared_storage(self, context, instance_ref, dest):
"""Check if the src and dest host mount same shared storage.

View File

@@ -97,7 +97,7 @@ class SchedulerManager(manager.Manager):
# NOTE (masumotok) : This method should be moved to nova.api.ec2.admin.
# Based on bexar design summit discussion,
# just put this here for bexar release.
def show_host_resources(self, context, host, *args):
def show_host_resources(self, context, host):
"""Shows the physical/usage resource given by hosts.
:param context: security context
@@ -105,43 +105,45 @@ class SchedulerManager(manager.Manager):
:returns:
example format is below.
{'resource':D, 'usage':{proj_id1:D, proj_id2:D}}
D: {'vcpus':3, 'memory_mb':2048, 'local_gb':2048}
D: {'vcpus': 3, 'memory_mb': 2048, 'local_gb': 2048,
'vcpus_used': 12, 'memory_mb_used': 10240,
'local_gb_used': 64}
"""
# Getting compute node info and related instances info
compute_ref = db.service_get_all_compute_by_host(context, host)
compute_ref = compute_ref[0]
# Getting physical resource information
compute_node_ref = compute_ref['compute_node'][0]
resource = {'vcpus': compute_node_ref['vcpus'],
'memory_mb': compute_node_ref['memory_mb'],
'local_gb': compute_node_ref['local_gb'],
'vcpus_used': compute_node_ref['vcpus_used'],
'memory_mb_used': compute_node_ref['memory_mb_used'],
'local_gb_used': compute_node_ref['local_gb_used']}
# Getting usage resource information
usage = {}
instance_refs = db.instance_get_all_by_host(context,
compute_ref['host'])
# Getting total available/used resource
compute_ref = compute_ref['compute_node'][0]
resource = {'vcpus': compute_ref['vcpus'],
'memory_mb': compute_ref['memory_mb'],
'local_gb': compute_ref['local_gb'],
'vcpus_used': compute_ref['vcpus_used'],
'memory_mb_used': compute_ref['memory_mb_used'],
'local_gb_used': compute_ref['local_gb_used']}
usage = dict()
if not instance_refs:
return {'resource': resource, 'usage': usage}
# Getting usage resource per project
project_ids = [i['project_id'] for i in instance_refs]
project_ids = list(set(project_ids))
for project_id in project_ids:
vcpus = db.instance_get_vcpu_sum_by_host_and_project(context,
host,
project_id)
mem = db.instance_get_memory_sum_by_host_and_project(context,
host,
project_id)
hdd = db.instance_get_disk_sum_by_host_and_project(context,
host,
project_id)
usage[project_id] = {'vcpus': int(vcpus),
'memory_mb': int(mem),
'local_gb': int(hdd)}
vcpus = [i['vcpus'] for i in instance_refs \
if i['project_id'] == project_id]
mem = [i['memory_mb'] for i in instance_refs \
if i['project_id'] == project_id]
disk = [i['local_gb'] for i in instance_refs \
if i['project_id'] == project_id]
usage[project_id] = {'vcpus': reduce(lambda x, y: x + y, vcpus),
'memory_mb': reduce(lambda x, y: x + y, mem),
'local_gb': reduce(lambda x, y: x + y, disk)}
return {'resource': resource, 'usage': usage}
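
With the three per-project DB sum helpers removed (see the nova/db hunks above), show_host_resources now derives per-project usage from a single instance_get_all_by_host call. The grouping it performs is equivalent to this standalone sketch, written as an explicit loop rather than the committed list comprehensions; the function name is illustrative.

    def usage_by_project(instance_refs):
        """Sum vcpus/memory_mb/local_gb of instances, grouped by project_id."""
        usage = {}
        for inst in instance_refs:
            proj = usage.setdefault(inst['project_id'],
                                    {'vcpus': 0, 'memory_mb': 0, 'local_gb': 0})
            proj['vcpus'] += inst['vcpus']
            proj['memory_mb'] += inst['memory_mb']
            proj['local_gb'] += inst['local_gb']
        return usage

    # Two instances in one project, one in another:
    refs = [{'project_id': 'p1', 'vcpus': 1, 'memory_mb': 512, 'local_gb': 10},
            {'project_id': 'p1', 'vcpus': 2, 'memory_mb': 1024, 'local_gb': 20},
            {'project_id': 'p2', 'vcpus': 4, 'memory_mb': 2048, 'local_gb': 40}]
    assert usage_by_project(refs)['p1'] == {'vcpus': 3, 'memory_mb': 1536,
                                            'local_gb': 30}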

View File

@@ -652,10 +652,13 @@ class SimpleDriverTestCase(test.TestCase):
self.mox.StubOutWithMock(driver_i, '_live_migration_dest_check')
self.mox.StubOutWithMock(driver_i, '_live_migration_common_check')
driver_i._live_migration_src_check(nocare, nocare)
driver_i._live_migration_dest_check(nocare, nocare, i_ref['host'])
driver_i._live_migration_common_check(nocare, nocare, i_ref['host'])
driver_i._live_migration_dest_check(nocare, nocare,
i_ref['host'], False)
driver_i._live_migration_common_check(nocare, nocare,
i_ref['host'], False)
self.mox.StubOutWithMock(rpc, 'cast', use_mock_anything=True)
kwargs = {'instance_id': instance_id, 'dest': i_ref['host']}
kwargs = {'instance_id': instance_id, 'dest': i_ref['host'],
'block_migration': False}
rpc.cast(self.context,
db.queue_get_for(nocare, FLAGS.compute_topic, i_ref['host']),
{"method": 'live_migration', "args": kwargs})
@@ -663,7 +666,8 @@ class SimpleDriverTestCase(test.TestCase):
self.mox.ReplayAll()
self.scheduler.live_migration(self.context, FLAGS.compute_topic,
instance_id=instance_id,
dest=i_ref['host'])
dest=i_ref['host'],
block_migration=False)
i_ref = db.instance_get(self.context, instance_id)
self.assertTrue(i_ref['state_description'] == 'migrating')
@@ -744,7 +748,7 @@ class SimpleDriverTestCase(test.TestCase):
self.assertRaises(exception.ComputeServiceUnavailable,
self.scheduler.driver._live_migration_dest_check,
self.context, i_ref, i_ref['host'])
self.context, i_ref, i_ref['host'], False)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
@@ -757,7 +761,7 @@ class SimpleDriverTestCase(test.TestCase):
self.assertRaises(exception.UnableToMigrateToSelf,
self.scheduler.driver._live_migration_dest_check,
self.context, i_ref, i_ref['host'])
self.context, i_ref, i_ref['host'], False)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
@@ -765,15 +769,33 @@ class SimpleDriverTestCase(test.TestCase):
def test_live_migration_dest_check_service_lack_memory(self):
"""Confirms exception raises when dest doesn't have enough memory."""
instance_id = self._create_instance()
instance_id2 = self._create_instance(host='somewhere',
memory_mb=12)
i_ref = db.instance_get(self.context, instance_id)
s_ref = self._create_compute_service(host='somewhere',
memory_mb_used=12)
s_ref = self._create_compute_service(host='somewhere')
self.assertRaises(exception.MigrationError,
self.scheduler.driver._live_migration_dest_check,
self.context, i_ref, 'somewhere')
self.context, i_ref, 'somewhere', False)
db.instance_destroy(self.context, instance_id)
db.instance_destroy(self.context, instance_id2)
db.service_destroy(self.context, s_ref['id'])
def test_block_migration_dest_check_service_lack_disk(self):
"""Confirms exception raises when dest doesn't have enough disk."""
instance_id = self._create_instance()
instance_id2 = self._create_instance(host='somewhere',
local_gb=70)
i_ref = db.instance_get(self.context, instance_id)
s_ref = self._create_compute_service(host='somewhere')
self.assertRaises(exception.MigrationError,
self.scheduler.driver._live_migration_dest_check,
self.context, i_ref, 'somewhere', True)
db.instance_destroy(self.context, instance_id)
db.instance_destroy(self.context, instance_id2)
db.service_destroy(self.context, s_ref['id'])
def test_live_migration_dest_check_service_works_correctly(self):
@@ -785,7 +807,8 @@ class SimpleDriverTestCase(test.TestCase):
ret = self.scheduler.driver._live_migration_dest_check(self.context,
i_ref,
'somewhere')
'somewhere',
False)
self.assertTrue(ret is None)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
@@ -820,7 +843,7 @@ class SimpleDriverTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.SourceHostUnavailable,
self.scheduler.driver._live_migration_common_check,
self.context, i_ref, dest)
self.context, i_ref, dest, False)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
@@ -844,7 +867,7 @@ class SimpleDriverTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.InvalidHypervisorType,
self.scheduler.driver._live_migration_common_check,
self.context, i_ref, dest)
self.context, i_ref, dest, False)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
@@ -870,7 +893,7 @@ class SimpleDriverTestCase(test.TestCase):
self.mox.ReplayAll()
self.assertRaises(exception.DestinationHypervisorTooOld,
self.scheduler.driver._live_migration_common_check,
self.context, i_ref, dest)
self.context, i_ref, dest, False)
db.instance_destroy(self.context, instance_id)
db.service_destroy(self.context, s_ref['id'])
@@ -902,7 +925,8 @@ class SimpleDriverTestCase(test.TestCase):
try:
self.scheduler.driver._live_migration_common_check(self.context,
i_ref,
dest)
dest,
False)
except rpc.RemoteError, e:
c = (e.message.find(_("doesn't have compatibility to")) >= 0)

View File

@@ -545,7 +545,8 @@ class ComputeTestCase(test.TestCase):
self.mox.StubOutWithMock(self.compute.driver, 'live_migration')
self.compute.driver.live_migration(c, i_ref, i_ref['host'],
self.compute.post_live_migration,
self.compute.recover_live_migration)
self.compute.recover_live_migration,
False)
self.compute.db = dbmock
self.mox.ReplayAll()
@@ -622,7 +623,8 @@ class ComputeTestCase(test.TestCase):
self.mox.StubOutWithMock(self.compute.driver, 'live_migration')
self.compute.driver.live_migration(c, i_ref, i_ref['host'],
self.compute.post_live_migration,
self.compute.recover_live_migration)
self.compute.recover_live_migration,
False)
self.compute.db = dbmock
self.mox.ReplayAll()

View File

@@ -20,6 +20,7 @@ import os
import re
import shutil
import sys
import tempfile
from xml.etree.ElementTree import fromstring as xml_to_tree
from xml.dom.minidom import parseString as xml_to_dom
@@ -674,6 +675,95 @@ class LibvirtConnTestCase(test.TestCase):
db.volume_destroy(self.context, volume_ref['id'])
db.instance_destroy(self.context, instance_ref['id'])
def test_pre_block_migration_works_correctly(self):
"""Confirms pre_block_migration works correctly."""
# Skip if non-libvirt environment
if not self.lazy_load_library_exists():
return
# Replace instances_path since this testcase creates tmpfile
tmpdir = tempfile.mkdtemp()
store = FLAGS.instances_path
FLAGS.instances_path = tmpdir
# Test data
instance_ref = db.instance_create(self.context, self.test_instance)
dummyjson = '[{"path": "%s/disk", "local_gb": 10, "type": "raw"}]'
# Preparing mocks
# qemu-img should be mocked since the test environment might not have
# enough disk space.
self.mox.StubOutWithMock(utils, "execute")
utils.execute('sudo', 'qemu-img', 'create', '-f', 'raw',
'%s/%s/disk' % (tmpdir, instance_ref.name), '10G')
self.mox.ReplayAll()
conn = connection.LibvirtConnection(False)
conn.pre_block_migration(self.context, instance_ref,
dummyjson % tmpdir)
self.assertTrue(os.path.exists('%s/%s/libvirt.xml' %
(tmpdir, instance_ref.name)))
shutil.rmtree(tmpdir)
db.instance_destroy(self.context, instance_ref['id'])
# Restore FLAGS.instances_path
FLAGS.instances_path = store
def test_get_instance_disk_info_works_correctly(self):
"""Confirms pre_block_migration works correctly."""
# Skip if non-libvirt environment
if not self.lazy_load_library_exists():
return
# Test data
instance_ref = db.instance_create(self.context, self.test_instance)
dummyxml = ("<domain type='kvm'><name>instance-0000000a</name>"
"<devices>"
"<disk type='file'><driver type='raw'/>"
"<source file='/test/disk'/>"
"<target dev='vda' bus='virtio'/></disk>"
"<disk type='file'><driver type='qcow2'/>"
"<source file='/test/disk.local'/>"
"<target dev='vdb' bus='virtio'/></disk>"
"</devices></domain>")
ret = ("image: /test/disk\nfile format: raw\n"
"virtual size: 20G (21474836480 bytes)\ndisk size: 3.1G\n")
# Preparing mocks
vdmock = self.mox.CreateMock(libvirt.virDomain)
self.mox.StubOutWithMock(vdmock, "XMLDesc")
vdmock.XMLDesc(0).AndReturn(dummyxml)
def fake_lookup(instance_name):
if instance_name == instance_ref.name:
return vdmock
self.create_fake_libvirt_mock(lookupByName=fake_lookup)
self.mox.StubOutWithMock(os.path, "getsize")
# Based on the test data above, one disk is a raw image, so getsize is mocked.
os.path.getsize("/test/disk").AndReturn(10 * 1024 * 1024 * 1024)
# The other is a qcow2 image, so qemu-img is mocked.
self.mox.StubOutWithMock(utils, "execute")
utils.execute('sudo', 'qemu-img', 'info', '/test/disk.local').\
AndReturn((ret, ''))
self.mox.ReplayAll()
conn = connection.LibvirtConnection(False)
info = conn.get_instance_disk_info(self.context, instance_ref)
info = utils.loads(info)
self.assertTrue(info[0]['type'] == 'raw' and
info[1]['type'] == 'qcow2' and
info[0]['path'] == '/test/disk' and
info[1]['path'] == '/test/disk.local' and
info[0]['local_gb'] == 10 and
info[1]['local_gb'] == 20)
db.instance_destroy(self.context, instance_ref['id'])
def test_spawn_with_network_info(self):
# Skip if non-libvirt environment
if not self.lazy_load_library_exists():

View File

@@ -117,6 +117,10 @@ flags.DEFINE_string('live_migration_uri',
flags.DEFINE_string('live_migration_flag',
"VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER",
'Define live migration behavior.')
flags.DEFINE_string('block_migration_flag',
"VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER, "
"VIR_MIGRATE_NON_SHARED_DISK",
'Define block migration behavior.')
flags.DEFINE_integer('live_migration_bandwidth', 0,
'Define live migration behavior')
flags.DEFINE_string('qemu_img', 'qemu-img',
@@ -727,6 +731,7 @@ class LibvirtConnection(driver.ComputeDriver):
If cow is True, it will make a CoW image instead of a copy.
"""
if not os.path.exists(target):
base_dir = os.path.join(FLAGS.instances_path, '_base')
if not os.path.exists(base_dir):
@@ -1458,7 +1463,7 @@ class LibvirtConnection(driver.ComputeDriver):
time.sleep(1)
def live_migration(self, ctxt, instance_ref, dest,
post_method, recover_method):
post_method, recover_method, block_migration=False):
"""Spawning live_migration operation for distributing high-load.
:params ctxt: security context
@@ -1466,20 +1471,22 @@ class LibvirtConnection(driver.ComputeDriver):
nova.db.sqlalchemy.models.Instance object
instance object that is migrated.
:params dest: destination host
:params post_method:
post operation method.
expected nova.compute.manager.post_live_migration.
:params recover_method:
recovery method when any exception occurs.
expected nova.compute.manager.recover_live_migration.
:params block_migration: if true, do block migration.
"""
greenthread.spawn(self._live_migration, ctxt, instance_ref, dest,
post_method, recover_method)
post_method, recover_method, block_migration)
def _live_migration(self, ctxt, instance_ref, dest,
post_method, recover_method):
def _live_migration(self, ctxt, instance_ref, dest, post_method,
recover_method, block_migration=False):
"""Do live migration.
:params ctxt: security context
@@ -1498,24 +1505,18 @@ class LibvirtConnection(driver.ComputeDriver):
# Do live migration.
try:
flaglist = FLAGS.live_migration_flag.split(',')
if block_migration:
flaglist = FLAGS.block_migration_flag.split(',')
else:
flaglist = FLAGS.live_migration_flag.split(',')
flagvals = [getattr(libvirt, x.strip()) for x in flaglist]
logical_sum = reduce(lambda x, y: x | y, flagvals)
if self.read_only:
tmpconn = self._connect(self.libvirt_uri, False)
dom = tmpconn.lookupByName(instance_ref.name)
dom.migrateToURI(FLAGS.live_migration_uri % dest,
logical_sum,
None,
FLAGS.live_migration_bandwidth)
tmpconn.close()
else:
dom = self._conn.lookupByName(instance_ref.name)
dom.migrateToURI(FLAGS.live_migration_uri % dest,
logical_sum,
None,
FLAGS.live_migration_bandwidth)
dom = self._conn.lookupByName(instance_ref.name)
dom.migrateToURI(FLAGS.live_migration_uri % dest,
logical_sum,
None,
FLAGS.live_migration_bandwidth)
except Exception:
recover_method(ctxt, instance_ref, dest=dest)
@@ -1530,11 +1531,92 @@ class LibvirtConnection(driver.ComputeDriver):
self.get_info(instance_ref.name)['state']
except exception.NotFound:
timer.stop()
post_method(ctxt, instance_ref, dest)
post_method(ctxt, instance_ref, dest, block_migration)
timer.f = wait_for_live_migration
timer.start(interval=0.5, now=True)
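
The only difference block migration makes inside _live_migration is which flag string is parsed: each comma-separated name is looked up on the libvirt module and OR-ed into a single value for migrateToURI. A standalone sketch of that step follows; FakeLibvirt and its constant values are illustrative stand-ins for the real libvirt module.

    # Python 2 style to match the surrounding code; reduce is a builtin there.
    def combine_migration_flags(flag_string, libvirt_module):
        names = [name.strip() for name in flag_string.split(',')]
        values = [getattr(libvirt_module, name) for name in names]
        return reduce(lambda x, y: x | y, values)

    class FakeLibvirt(object):
        # Illustrative values only; the real constants come from libvirt.
        VIR_MIGRATE_UNDEFINE_SOURCE = 16
        VIR_MIGRATE_PEER2PEER = 2
        VIR_MIGRATE_NON_SHARED_DISK = 64

    flags = combine_migration_flags(
        "VIR_MIGRATE_UNDEFINE_SOURCE, VIR_MIGRATE_PEER2PEER, "
        "VIR_MIGRATE_NON_SHARED_DISK", FakeLibvirt)
    assert flags == 16 | 2 | 64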
def pre_block_migration(self, ctxt, instance_ref, disk_info_json):
"""Preparation block migration.
:params ctxt: security context
:params instance_ref:
nova.db.sqlalchemy.models.Instance object
instance object that is migrated.
:params disk_info_json:
JSON string returned by get_instance_disk_info
"""
disk_info = utils.loads(disk_info_json)
# make instance directory
instance_dir = os.path.join(FLAGS.instances_path, instance_ref['name'])
if os.path.exists(instance_dir):
raise exception.DestinationDiskExists(path=instance_dir)
os.mkdir(instance_dir)
for disk in disk_info:
base = os.path.basename(disk['path'])
# Get image type and create empty disk image.
instance_disk = os.path.join(instance_dir, base)
utils.execute('sudo', 'qemu-img', 'create', '-f', disk['type'],
instance_disk, str(disk['local_gb'])+'G')
# Block migration does not copy libvirt.xml, so create it now
# to avoid confusing admins.
xml = self.to_xml(instance_ref)
f = open(os.path.join(instance_dir, 'libvirt.xml'), 'w+')
f.write(xml)
f.close()
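
For illustration, here is the per-disk work pre_block_migration performs, reduced to a standalone helper that returns the qemu-img commands it would run. The helper name and the instance path are hypothetical, and json is used in place of nova's utils.loads wrapper.

    import json
    import os

    def empty_disk_commands(instance_dir, disk_info_json):
        """Build one 'qemu-img create' command per disk in disk_info_json."""
        commands = []
        for disk in json.loads(disk_info_json):
            target = os.path.join(instance_dir, os.path.basename(disk['path']))
            commands.append(['sudo', 'qemu-img', 'create', '-f', disk['type'],
                             target, str(disk['local_gb']) + 'G'])
        return commands

    # e.g. one raw 10GB image, as in the test case earlier in this commit:
    cmds = empty_disk_commands(
        '/var/lib/nova/instances/instance-00000001',
        '[{"path": "/old/host/disk", "type": "raw", "local_gb": 10}]')
    assert cmds[0][-1] == '10G'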
def get_instance_disk_info(self, ctxt, instance_ref):
"""Preparation block migration.
:params ctxt: security context
:params instance_ref:
nova.db.sqlalchemy.models.Instance object
instance object that is migrated.
:return:
JSON string in the format below:
"[{'path':'disk', 'type':'raw', 'local_gb':10},...]"
"""
disk_info = []
virt_dom = self._lookup_by_name(instance_ref.name)
xml = virt_dom.XMLDesc(0)
doc = libxml2.parseDoc(xml)
disk_nodes = doc.xpathEval('//devices/disk')
path_nodes = doc.xpathEval('//devices/disk/source')
driver_nodes = doc.xpathEval('//devices/disk/driver')
for cnt, path_node in enumerate(path_nodes):
disk_type = disk_nodes[cnt].get_properties().getContent()
path = path_node.get_properties().getContent()
if disk_type != 'file':
LOG.debug(_('skipping %(path)s since it looks like a volume') %
locals())
continue
# The XML is generated by libvirt/KVM, so its format differs slightly
# from the libvirt.xml that nova generates.
#disk_type = driver_nodes[cnt].get_properties().getContent()
disk_type = \
driver_nodes[cnt].get_properties().get_next().getContent()
if disk_type == 'raw':
size = int(os.path.getsize(path)) / 1024 / 1024 / 1024
else:
out, err = utils.execute('sudo', 'qemu-img', 'info', path)
size = [i.split('(')[1].split()[0] for i in out.split('\n')
if i.strip().find('virtual size') >= 0]
size = int(size[0]) / 1024 / 1024 / 1024
disk_info.append({'type': disk_type, 'path': path,
'local_gb': size})
return utils.dumps(disk_info)
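
As a worked example of the qcow2 branch above, the virtual size is pulled out of qemu-img info output and converted to whole gigabytes. The sample output below is shaped like the string mocked in the libvirt test case earlier in this commit, not real command output.

    out = ("image: disk.local\n"
           "file format: qcow2\n"
           "virtual size: 20G (21474836480 bytes)\n"
           "disk size: 3.1G\n")
    size = [line.split('(')[1].split()[0] for line in out.split('\n')
            if line.strip().find('virtual size') >= 0]
    local_gb = int(size[0]) / 1024 / 1024 / 1024
    assert local_gb == 20   # matches the 'local_gb': 20 assertion in the test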
def unfilter_instance(self, instance_ref):
"""See comments of same method in firewall_driver."""
self.firewall_driver.unfilter_instance(instance_ref)