Merged to rev 561 and fixed based on the reviewer's comments
@@ -62,6 +62,7 @@ import time

 import IPy

 # If ../nova/__init__.py exists, add ../ to Python search path, so that
 # it will override what happens to be installed in /usr/(local/)lib/python...
 possible_topdir = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
@@ -468,18 +469,20 @@ class InstanceCommands(object):

     def live_migration(self, ec2_id, dest):
         """live_migration"""

-        if FLAGS.connection_type != 'libvirt':
-            raise exception.Error('Only KVM is supported for now. '
-                                  'Sorry.')
-
-        if FLAGS.volume_driver != 'nova.volume.driver.AOEDriver':
-            raise exception.Error('Only AOEDriver is supported for now. '
-                                  'Sorry.')
-
         logging.basicConfig()
         ctxt = context.get_admin_context()
         instance_id = cloud.ec2_id_to_id(ec2_id)

+        if FLAGS.connection_type != 'libvirt':
+            msg = _('Only KVM is supported for now. Sorry!')
+            raise exception.Error(msg)
+
+        if FLAGS.volume_driver != 'nova.volume.driver.AOEDriver':
+            instance_ref = db.instance_get(instance_id)
+            if len(instance_ref['volumes']) != 0:
+                msg = _(("""Volumes attached by ISCSIDriver"""
+                         """ are not supported. Sorry!"""))
+                raise exception.Error(msg)
+
         rpc.call(ctxt,
                  FLAGS.scheduler_topic,
                  {"method": "live_migration",
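For context, the rpc.call above just ships a {"method": ..., "args": {...}} message to FLAGS.scheduler_topic, where the scheduler service looks up a method of that name on its manager and invokes it with the supplied keyword arguments. Below is a minimal, self-contained sketch of that dispatch pattern; FakeSchedulerManager and dispatch are illustrative stand-ins, not Nova's actual RPC plumbing.

    # Toy model of how an rpc payload like the one built above reaches a
    # manager method; all names here are invented for illustration.
    class FakeSchedulerManager(object):
        def live_migration(self, context, instance_id, dest):
            return 'scheduling %s -> %s' % (instance_id, dest)

    def dispatch(manager, context, payload):
        # Look up the handler named in the payload and pass its kwargs.
        handler = getattr(manager, payload['method'])
        return handler(context, **payload.get('args', {}))

    print(dispatch(FakeSchedulerManager(), None,
                   {'method': 'live_migration',
                    'args': {'instance_id': 42, 'dest': 'remote-host'}}))
    # -> scheduling 42 -> remote-host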
@@ -501,16 +504,15 @@ class HostCommands(object):
         # To supress msg: No handlers could be found for logger "amqplib"
         logging.basicConfig()

-        host_refs = db.host_get_all(context.get_admin_context())
-        for host_ref in host_refs:
-            print host_ref['name']
+        service_refs = db.service_get_all(context.get_admin_context())
+        hosts = [ h['host'] for h in service_refs]
+        hosts = list(set(hosts))
+        for host in hosts:
+            print host

     def show(self, host):
         """describe cpu/memory/hdd info for host."""

         # To supress msg: No handlers could be found for logger "amqplib"
         logging.basicConfig()

         result = rpc.call(context.get_admin_context(),
                           FLAGS.scheduler_topic,
                           {"method": "show_host_resource",
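The list command now derives host names from the services table instead of the dropped hosts table; because several services (for example nova-compute and nova-network) can run on the same machine, the result is de-duplicated with set(). A quick, self-contained illustration of that step, with made-up records:

    service_refs = [{'host': 'node1'}, {'host': 'node1'}, {'host': 'node2'}]
    hosts = list(set([h['host'] for h in service_refs]))
    print(sorted(hosts))  # ['node1', 'node2'] -- set() does not preserve order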
@@ -75,75 +75,14 @@ class Scheduler(object):
         instance_ref = db.instance_get(context, instance_id)
         ec2_id = instance_ref['hostname']

-        # Checking instance state.
-        if power_state.RUNNING != instance_ref['state'] or \
-           'running' != instance_ref['state_description']:
-            msg = _('Instance(%s) is not running')
-            raise exception.Invalid(msg % ec2_id)
+        # Checking instance.
+        self._live_migration_src_check(context, instance_ref)

-        # Checking destination host exists
-        dhost_ref = db.host_get_by_name(context, dest)
+        # Checking destination host.
+        self._live_migration_dest_check(context, instance_ref, dest)

-        # Checking whether The host where instance is running
-        # and dest is not same.
-        src = instance_ref['host']
-        if dest == src:
-            msg = _('%s is where %s is running now. choose other host.')
-            raise exception.Invalid(msg % (dest, ec2_id))
-
-        # Checking dest is compute node.
-        services = db.service_get_all_by_topic(context, 'compute')
-        if dest not in [service.host for service in services]:
-            msg = _('%s must be compute node')
-            raise exception.Invalid(msg % dest)
-
-        # Checking dest host is alive.
-        service = [service for service in services if service.host == dest]
-        service = service[0]
-        if not self.service_is_up(service):
-            msg = _('%s is not alive(time synchronize problem?)')
-            raise exception.Invalid(msg % dest)
-
-        # NOTE(masumotok): Below pre-checkings are followed by
-        # http://wiki.libvirt.org/page/TodoPreMigrationChecks
-
-        # Checking hypervisor is same.
-        orighost = instance_ref['launched_on']
-        ohost_ref = db.host_get_by_name(context, orighost)
-
-        otype = ohost_ref['hypervisor_type']
-        dtype = dhost_ref['hypervisor_type']
-        if otype != dtype:
-            msg = _('Different hypervisor type(%s->%s)')
-            raise exception.Invalid(msg % (otype, dtype))
-
-        # Checkng hypervisor version.
-        oversion = ohost_ref['hypervisor_version']
-        dversion = dhost_ref['hypervisor_version']
-        if oversion > dversion:
-            msg = _('Older hypervisor version(%s->%s)')
-            raise exception.Invalid(msg % (oversion, dversion))
-
-        # Checking cpuinfo.
-        cpuinfo = ohost_ref['cpu_info']
-        if str != type(cpuinfo):
-            msg = _('Unexpected err: not found cpu_info for %s on DB.hosts')
-            raise exception.Invalid(msg % orighost)
-
-        try:
-            rpc.call(context,
-                     db.queue_get_for(context, FLAGS.compute_topic, dest),
-                     {"method": 'compare_cpu',
-                      "args": {'xml': cpuinfo}})
-
-        except rpc.RemoteError, e:
-            msg = '%s doesnt have compatibility to %s(where %s launching at)\n'
-            msg += 'result:%s \n'
-            logging.error(_(msg) % (dest, src, ec2_id, ret))
-            raise e
-
-        # Checking dst host still has enough capacities.
-        self.has_enough_resource(context, instance_id, dest)
+        # Common checking.
+        self._live_migration_common_check(context, instance_ref, dest)

         # Changing instance_state.
         db.instance_set_state(context,
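The compare_cpu call issued here (and kept in the new _live_migration_common_check below, with the argument renamed from 'xml' to 'cpu_info') asks the destination compute node whether it can run the source host's CPU definition. On the libvirt side this presumably maps onto virConnect.compareCPU(); the following is a rough, illustrative sketch of such a destination-side check, assuming cpu_info holds the source's <cpu> XML and that the libvirt Python bindings are installed. It is not Nova's actual handler.

    import libvirt  # assumes the libvirt Python bindings are available

    def compare_cpu(cpu_info):
        # Ask the local hypervisor whether it can host the given CPU definition.
        conn = libvirt.open('qemu:///system')
        ret = conn.compareCPU(cpu_info, 0)
        if ret <= libvirt.VIR_CPU_COMPARE_INCOMPATIBLE:
            raise Exception('CPU of this host is incompatible with the source host')
        return ret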
@@ -152,33 +91,143 @@ class Scheduler(object):
                               'migrating')

         # Changing volume state
-        try:
-            for vol in db.volume_get_all_by_instance(context, instance_id):
-                db.volume_update(context,
-                                 vol['id'],
-                                 {'status': 'migrating'})
-        except exception.NotFound:
-            pass
+        for v in instance_ref['volumes']:
+            db.volume_update(context,
+                             v['id'],
+                             {'status': 'migrating'})

         # Return value is necessary to send request to src
         # Check _schedule() in detail.
         src = instance_ref['host']
         return src

-    def has_enough_resource(self, context, instance_id, dest):
+    def _live_migration_src_check(self, context, instance_ref):
+        """Live migration check routine (for src host)"""
+
+        # Checking instance is running.
+        if power_state.RUNNING != instance_ref['state'] or \
+           'running' != instance_ref['state_description']:
+            msg = _('Instance(%s) is not running')
+            ec2_id = instance_ref['hostname']
+            raise exception.Invalid(msg % ec2_id)
+
+        # Checing volume node is running when any volumes are mounted to the instance.
+        if len(instance_ref['volumes']) != 0:
+            services = db.service_get_all_by_topic(context, 'volume')
+            if len(services) < 1 or not self.service_is_up(services[0]):
+                msg = _('volume node is not alive(time synchronize problem?)')
+                raise exception.Invalid(msg)
+
+        # Checking src host is alive.
+        src = instance_ref['host']
+        services = db.service_get_all_by_topic(context, 'compute')
+        services = [service for service in services if service.host == src]
+        if len(services) < 1 or not self.service_is_up(services[0]):
+            msg = _('%s is not alive(time synchronize problem?)')
+            raise exception.Invalid(msg % src)
+
+    def _live_migration_dest_check(self, context, instance_ref, dest):
+        """Live migration check routine (for destination host)"""
+
+        # Checking dest exists and compute node.
+        dservice_refs = db.service_get_all_by_host(context, dest)
+        if len(dservice_refs) <= 0 :
+            msg = _('%s does not exists.')
+            raise exception.Invalid(msg % dest)
+
+        dservice_ref = dservice_refs[0]
+        if dservice_ref['topic'] != 'compute':
+            msg = _('%s must be compute node')
+            raise exception.Invalid(msg % dest)
+
+        # Checking dest host is alive.
+        if not self.service_is_up(dservice_ref):
+            msg = _('%s is not alive(time synchronize problem?)')
+            raise exception.Invalid(msg % dest)
+
+        # Checking whether The host where instance is running
+        # and dest is not same.
+        src = instance_ref['host']
+        if dest == src:
+            ec2_id = instance_ref['hostname']
+            msg = _('%s is where %s is running now. choose other host.')
+            raise exception.Invalid(msg % (dest, ec2_id))
+
+        # Checking dst host still has enough capacities.
+        self.has_enough_resource(context, instance_ref, dest)
+
+    def _live_migration_common_check(self, context, instance_ref, dest):
+        """
+        Live migration check routine.
+        Below pre-checkings are followed by
+        http://wiki.libvirt.org/page/TodoPreMigrationChecks
+
+        """
+
+        # Checking dest exists.
+        dservice_refs = db.service_get_all_by_host(context, dest)
+        if len(dservice_refs) <= 0 :
+            msg = _('%s does not exists.')
+            raise exception.Invalid(msg % dest)
+        dservice_ref = dservice_refs[0]
+
+        # Checking original host( where instance was launched at) exists.
+        orighost = instance_ref['launched_on']
+        oservice_refs = db.service_get_all_by_host(context, orighost)
+        if len(oservice_refs) <= 0 :
+            msg = _('%s(where instance was launched at) does not exists.')
+            raise exception.Invalid(msg % orighost)
+        oservice_ref = oservice_refs[0]
+
+        # Checking hypervisor is same.
+        otype = oservice_ref['hypervisor_type']
+        dtype = dservice_ref['hypervisor_type']
+        if otype != dtype:
+            msg = _('Different hypervisor type(%s->%s)')
+            raise exception.Invalid(msg % (otype, dtype))
+
+        # Checkng hypervisor version.
+        oversion = oservice_ref['hypervisor_version']
+        dversion = dservice_ref['hypervisor_version']
+        if oversion > dversion:
+            msg = _('Older hypervisor version(%s->%s)')
+            raise exception.Invalid(msg % (oversion, dversion))
+
+        # Checking cpuinfo.
+        cpu_info = oservice_ref['cpu_info']
+        try:
+            rpc.call(context,
+                     db.queue_get_for(context, FLAGS.compute_topic, dest),
+                     {"method": 'compare_cpu',
+                      "args": {'cpu_info': cpu_info}})
+
+        except rpc.RemoteError, e:
+            msg = _('%s doesnt have compatibility to %s(where %s launching at)')
+            ec2_id = instance_ref['hostname']
+            src = instance_ref['host']
+            logging.error(msg % (dest, src, ec2_id))
+            raise e
+
+    def has_enough_resource(self, context, instance_ref, dest):
         """ Check if destination host has enough resource for live migration"""

         # Getting instance information
-        instance_ref = db.instance_get(context, instance_id)
         ec2_id = instance_ref['hostname']
         vcpus = instance_ref['vcpus']
         mem = instance_ref['memory_mb']
         hdd = instance_ref['local_gb']

         # Gettin host information
-        host_ref = db.host_get_by_name(context, dest)
-        total_cpu = int(host_ref['vcpus'])
-        total_mem = int(host_ref['memory_mb'])
-        total_hdd = int(host_ref['local_gb'])
+        service_refs = db.service_get_all_by_host(context, dest)
+        if len(service_refs) <= 0 :
+            msg = _('%s does not exists.')
+            raise exception.Invalid(msg % dest)
+        service_ref = service_refs[0]
+
+        total_cpu = int(service_ref['vcpus'])
+        total_mem = int(service_ref['memory_mb'])
+        total_hdd = int(service_ref['local_gb'])

         instances_ref = db.instance_get_all_by_host(context, dest)
         for i_ref in instances_ref:
@@ -196,4 +245,4 @@ class Scheduler(object):
             msg = '%s doesnt have enough resource for %s' % (dest, ec2_id)
             raise exception.NotEmpty(msg)

-        logging.debug(_('%s has enough resource for %s') % (dest, ec2_id))
+        logging.debug(_('%s has_enough_resource() for %s') % (dest, ec2_id))
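has_enough_resource() boils down to simple accounting: sum the vcpus, memory and disk of the instances already on dest, add the migrating instance's requirements, and compare against the totals reported by the destination's service record. A self-contained illustration of that arithmetic with made-up numbers:

    total = {'vcpus': 16, 'memory_mb': 32768, 'local_gb': 500}
    existing = [{'vcpus': 4, 'memory_mb': 8192, 'local_gb': 80},
                {'vcpus': 2, 'memory_mb': 4096, 'local_gb': 40}]
    migrating = {'vcpus': 4, 'memory_mb': 8192, 'local_gb': 80}

    def fits(total, existing, migrating):
        # The destination fits only if every resource stays within its total.
        for key in ('vcpus', 'memory_mb', 'local_gb'):
            used = sum(i[key] for i in existing)
            if used + migrating[key] > total[key]:
                return False
        return True

    # True here: 10/16 vcpus, 20480/32768 MB, 200/500 GB after the move.
    print(fits(total, existing, migrating))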
@@ -76,20 +76,27 @@ class SchedulerManager(manager.Manager):
         """ show the physical/usage resource given by hosts."""

         try:
-            host_ref = db.host_get_by_name(context, host)
+            services = db.service_get_all_by_host(context, host)
         except exception.NotFound:
             return {'ret': False, 'msg': 'No such Host'}
         except:
             raise

+        compute = [ s for s in services if s['topic'] == 'compute']
+        if 0 == len(compute):
+            service_ref = services[0]
+        else:
+            service_ref = compute[0]
+
         # Getting physical resource information
-        h_resource = {'vcpus': host_ref['vcpus'],
-                      'memory_mb': host_ref['memory_mb'],
-                      'local_gb': host_ref['local_gb']}
+        h_resource = {'vcpus': service_ref['vcpus'],
+                      'memory_mb': service_ref['memory_mb'],
+                      'local_gb': service_ref['local_gb']}

         # Getting usage resource information
         u_resource = {}
-        instances_ref = db.instance_get_all_by_host(context, host_ref['name'])
+        instances_ref = db.instance_get_all_by_host(context,
+                                                    service_ref['host'])

         if 0 == len(instances_ref):
             return {'ret': True, 'phy_resource': h_resource, 'usage': {}}
@@ -98,11 +105,11 @@ class SchedulerManager(manager.Manager):
         project_ids = list(set(project_ids))
         for p_id in project_ids:
             vcpus = db.instance_get_vcpu_sum_by_host_and_project(context,
-                                                        host,
-                                                        p_id)
+                                                                 host,
+                                                                 p_id)
             mem = db.instance_get_memory_sum_by_host_and_project(context,
-                                                        host,
-                                                        p_id)
+                                                                 host,
+                                                                 p_id)
             hdd = db.instance_get_disk_sum_by_host_and_project(context,
                                                                host,
                                                                p_id)
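Putting the two show_host_resource hunks together, a successful call returns the physical resources from the service record plus a per-project usage breakdown built from the vcpu/memory/disk sums above. A hypothetical example of the returned structure; the top-level keys follow the code above, while the per-project keys and the numbers are assumed, since the u_resource assignment itself is outside these hunks:

    example = {'ret': True,
               'phy_resource': {'vcpus': 16, 'memory_mb': 32768, 'local_gb': 500},
               'usage': {'project-1': {'vcpus': 6,
                                       'memory_mb': 12288,
                                       'local_gb': 120}}}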
@@ -81,12 +81,6 @@ class Service(object):
         self.model_disconnected = False
         ctxt = context.get_admin_context()

-        try:
-            host_ref = db.host_get_by_name(ctxt, self.host)
-        except exception.NotFound:
-            host_ref = db.host_create(ctxt, {'name': self.host})
-        host_ref = self._update_host_ref(ctxt, host_ref)
-
         try:
             service_ref = db.service_get_by_args(ctxt,
                                                  self.host,
@@ -95,6 +89,9 @@ class Service(object):
         except exception.NotFound:
             self._create_service_ref(ctxt)

+        if 'nova-compute' == self.binary:
+            self.manager.update_service(ctxt, self.host, self.binary)
+
         conn1 = rpc.Connection.instance(new=True)
         conn2 = rpc.Connection.instance(new=True)
         if self.report_interval:
@@ -129,26 +126,6 @@ class Service(object):
                                  'availability_zone': zone})
         self.service_id = service_ref['id']

-    def _update_host_ref(self, context, host_ref):
-
-        if 0 <= self.manager_class_name.find('ComputeManager'):
-            vcpu = self.manager.driver.get_vcpu_number()
-            memory_mb = self.manager.driver.get_memory_mb()
-            local_gb = self.manager.driver.get_local_gb()
-            hypervisor = self.manager.driver.get_hypervisor_type()
-            version = self.manager.driver.get_hypervisor_version()
-            cpu_xml = self.manager.driver.get_cpu_xml()
-
-            db.host_update(context,
-                           host_ref['id'],
-                           {'vcpus': vcpu,
-                            'memory_mb': memory_mb,
-                            'local_gb': local_gb,
-                            'hypervisor_type': hypervisor,
-                            'hypervisor_version': version,
-                            'cpu_info': cpu_xml})
-        return host_ref
-
     def __getattr__(self, key):
         manager = self.__dict__.get('manager', None)
         return getattr(manager, key)