Merged trunk

This commit is contained in:
Ed Leafe
2011-08-15 17:11:58 -05:00
15 changed files with 829 additions and 170 deletions

View File

@@ -30,6 +30,7 @@ from nova import log as logging
from nova import rpc
from nova import utils
from nova.compute import power_state
from nova.api.ec2 import ec2utils
FLAGS = flags.FLAGS
@@ -78,7 +79,8 @@ class Scheduler(object):
"""Must override at least this method for scheduler to work."""
raise NotImplementedError(_("Must implement a fallback schedule"))
def schedule_live_migration(self, context, instance_id, dest):
def schedule_live_migration(self, context, instance_id, dest,
block_migration=False):
"""Live migration scheduling method.
:param context:
@@ -87,9 +89,7 @@ class Scheduler(object):
:return:
The host where instance is running currently.
Then scheduler send request that host.
"""
# Whether instance exists and is running.
instance_ref = db.instance_get(context, instance_id)
@@ -97,10 +97,11 @@ class Scheduler(object):
self._live_migration_src_check(context, instance_ref)
# Checking destination host.
self._live_migration_dest_check(context, instance_ref, dest)
self._live_migration_dest_check(context, instance_ref,
dest, block_migration)
# Common checking.
self._live_migration_common_check(context, instance_ref, dest)
self._live_migration_common_check(context, instance_ref,
dest, block_migration)
# Changing instance_state.
db.instance_set_state(context,
@@ -130,7 +131,8 @@ class Scheduler(object):
# Checking instance is running.
if (power_state.RUNNING != instance_ref['state'] or \
'running' != instance_ref['state_description']):
raise exception.InstanceNotRunning(instance_id=instance_ref['id'])
instance_id = ec2utils.id_to_ec2_id(instance_ref['id'])
raise exception.InstanceNotRunning(instance_id=instance_id)
# Checing volume node is running when any volumes are mounted
# to the instance.
@@ -147,7 +149,8 @@ class Scheduler(object):
if not self.service_is_up(services[0]):
raise exception.ComputeServiceUnavailable(host=src)
def _live_migration_dest_check(self, context, instance_ref, dest):
def _live_migration_dest_check(self, context, instance_ref, dest,
block_migration):
"""Live migration check routine (for destination host).
:param context: security context
@@ -168,16 +171,18 @@ class Scheduler(object):
# and dest is not same.
src = instance_ref['host']
if dest == src:
raise exception.UnableToMigrateToSelf(
instance_id=instance_ref['id'],
host=dest)
instance_id = ec2utils.id_to_ec2_id(instance_ref['id'])
raise exception.UnableToMigrateToSelf(instance_id=instance_id,
host=dest)
# Checking dst host still has enough capacities.
self.assert_compute_node_has_enough_resources(context,
instance_ref,
dest)
dest,
block_migration)
def _live_migration_common_check(self, context, instance_ref, dest):
def _live_migration_common_check(self, context, instance_ref, dest,
block_migration):
"""Live migration common check routine.
Below checkings are followed by
@@ -186,11 +191,26 @@ class Scheduler(object):
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
:param block_migration if True, check for block_migration.
"""
# Checking shared storage connectivity
self.mounted_on_same_shared_storage(context, instance_ref, dest)
# if block migration, instances_paths should not be on shared storage.
try:
self.mounted_on_same_shared_storage(context, instance_ref, dest)
if block_migration:
reason = _("Block migration can not be used "
"with shared storage.")
raise exception.InvalidSharedStorage(reason=reason, path=dest)
except exception.FileNotFound:
if not block_migration:
src = instance_ref['host']
ipath = FLAGS.instances_path
logging.error(_("Cannot confirm tmpfile at %(ipath)s is on "
"same shared storage between %(src)s "
"and %(dest)s.") % locals())
raise
# Checking dest exists.
dservice_refs = db.service_get_all_compute_by_host(context, dest)
@@ -229,14 +249,26 @@ class Scheduler(object):
"original host %(src)s.") % locals())
raise
def assert_compute_node_has_enough_resources(self, context,
instance_ref, dest):
def assert_compute_node_has_enough_resources(self, context, instance_ref,
dest, block_migration):
"""Checks if destination host has enough resource for live migration.
Currently, only memory checking has been done.
If storage migration(block migration, meaning live-migration
without any shared storage) will be available, local storage
checking is also necessary.
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
:param block_migration: if True, disk checking has been done
"""
self.assert_compute_node_has_enough_memory(context, instance_ref, dest)
if not block_migration:
return
self.assert_compute_node_has_enough_disk(context, instance_ref, dest)
def assert_compute_node_has_enough_memory(self, context,
instance_ref, dest):
"""Checks if destination host has enough memory for live migration.
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
@@ -244,23 +276,70 @@ class Scheduler(object):
"""
# Getting instance information
hostname = instance_ref['hostname']
# Getting total available memory and disk of host
avail = self._get_compute_info(context, dest, 'memory_mb')
# Getting host information
service_refs = db.service_get_all_compute_by_host(context, dest)
compute_node_ref = service_refs[0]['compute_node'][0]
# Getting total used memory and disk of host
# It should be sum of memories that are assigned as max value,
# because overcommiting is risky.
used = 0
instance_refs = db.instance_get_all_by_host(context, dest)
used_list = [i['memory_mb'] for i in instance_refs]
if used_list:
used = reduce(lambda x, y: x + y, used_list)
mem_total = int(compute_node_ref['memory_mb'])
mem_used = int(compute_node_ref['memory_mb_used'])
mem_avail = mem_total - mem_used
mem_inst = instance_ref['memory_mb']
if mem_avail <= mem_inst:
reason = _("Unable to migrate %(hostname)s to destination: "
"%(dest)s (host:%(mem_avail)s <= instance:"
"%(mem_inst)s)")
avail = avail - used
if avail <= mem_inst:
instance_id = ec2utils.id_to_ec2_id(instance_ref['id'])
reason = _("Unable to migrate %(instance_id)s to %(dest)s: "
"Lack of disk(host:%(avail)s <= instance:%(mem_inst)s)")
raise exception.MigrationError(reason=reason % locals())
def assert_compute_node_has_enough_disk(self, context,
instance_ref, dest):
"""Checks if destination host has enough disk for block migration.
:param context: security context
:param instance_ref: nova.db.sqlalchemy.models.Instance object
:param dest: destination host
"""
# Getting total available memory and disk of host
avail = self._get_compute_info(context, dest, 'local_gb')
# Getting total used memory and disk of host
# It should be sum of disks that are assigned as max value
# because overcommiting is risky.
used = 0
instance_refs = db.instance_get_all_by_host(context, dest)
used_list = [i['local_gb'] for i in instance_refs]
if used_list:
used = reduce(lambda x, y: x + y, used_list)
disk_inst = instance_ref['local_gb']
avail = avail - used
if avail <= disk_inst:
instance_id = ec2utils.id_to_ec2_id(instance_ref['id'])
reason = _("Unable to migrate %(instance_id)s to %(dest)s: "
"Lack of disk(host:%(avail)s "
"<= instance:%(disk_inst)s)")
raise exception.MigrationError(reason=reason % locals())
def _get_compute_info(self, context, host, key):
"""get compute node's infomation specified by key
:param context: security context
:param host: hostname(must be compute node)
:param key: column name of compute_nodes
:return: value specified by key
"""
compute_node_ref = db.service_get_all_compute_by_host(context, host)
compute_node_ref = compute_node_ref[0]['compute_node'][0]
return compute_node_ref[key]
def mounted_on_same_shared_storage(self, context, instance_ref, dest):
"""Check if the src and dest host mount same shared storage.
@@ -283,15 +362,13 @@ class Scheduler(object):
{"method": 'create_shared_storage_test_file'})
# make sure existence at src host.
rpc.call(context, src_t,
{"method": 'check_shared_storage_test_file',
"args": {'filename': filename}})
ret = rpc.call(context, src_t,
{"method": 'check_shared_storage_test_file',
"args": {'filename': filename}})
if not ret:
raise exception.FileNotFound(file_path=filename)
except rpc.RemoteError:
ipath = FLAGS.instances_path
logging.error(_("Cannot confirm tmpfile at %(ipath)s is on "
"same shared storage between %(src)s "
"and %(dest)s.") % locals())
except exception.FileNotFound:
raise
finally:

View File

@@ -34,12 +34,13 @@ from nova.scheduler import zone_manager
LOG = logging.getLogger('nova.scheduler.manager')
FLAGS = flags.FLAGS
flags.DEFINE_string('scheduler_driver',
'nova.scheduler.chance.ChanceScheduler',
'Driver to use for the scheduler')
'nova.scheduler.multi.MultiScheduler',
'Default driver to use for the scheduler')
class SchedulerManager(manager.Manager):
"""Chooses a host to run instances on."""
def __init__(self, scheduler_driver=None, *args, **kwargs):
self.zone_manager = zone_manager.ZoneManager()
if not scheduler_driver:
@@ -71,8 +72,8 @@ class SchedulerManager(manager.Manager):
def update_service_capabilities(self, context=None, service_name=None,
host=None, capabilities=None):
"""Process a capability update from a service node."""
if not capability:
capability = {}
if not capabilities:
capabilities = {}
self.zone_manager.update_service_capabilities(service_name,
host, capabilities)
@@ -113,7 +114,7 @@ class SchedulerManager(manager.Manager):
# NOTE (masumotok) : This method should be moved to nova.api.ec2.admin.
# Based on bexar design summit discussion,
# just put this here for bexar release.
def show_host_resources(self, context, host, *args):
def show_host_resources(self, context, host):
"""Shows the physical/usage resource given by hosts.
:param context: security context
@@ -121,43 +122,45 @@ class SchedulerManager(manager.Manager):
:returns:
example format is below.
{'resource':D, 'usage':{proj_id1:D, proj_id2:D}}
D: {'vcpus':3, 'memory_mb':2048, 'local_gb':2048}
D: {'vcpus': 3, 'memory_mb': 2048, 'local_gb': 2048,
'vcpus_used': 12, 'memory_mb_used': 10240,
'local_gb_used': 64}
"""
# Getting compute node info and related instances info
compute_ref = db.service_get_all_compute_by_host(context, host)
compute_ref = compute_ref[0]
# Getting physical resource information
compute_node_ref = compute_ref['compute_node'][0]
resource = {'vcpus': compute_node_ref['vcpus'],
'memory_mb': compute_node_ref['memory_mb'],
'local_gb': compute_node_ref['local_gb'],
'vcpus_used': compute_node_ref['vcpus_used'],
'memory_mb_used': compute_node_ref['memory_mb_used'],
'local_gb_used': compute_node_ref['local_gb_used']}
# Getting usage resource information
usage = {}
instance_refs = db.instance_get_all_by_host(context,
compute_ref['host'])
# Getting total available/used resource
compute_ref = compute_ref['compute_node'][0]
resource = {'vcpus': compute_ref['vcpus'],
'memory_mb': compute_ref['memory_mb'],
'local_gb': compute_ref['local_gb'],
'vcpus_used': compute_ref['vcpus_used'],
'memory_mb_used': compute_ref['memory_mb_used'],
'local_gb_used': compute_ref['local_gb_used']}
usage = dict()
if not instance_refs:
return {'resource': resource, 'usage': usage}
# Getting usage resource per project
project_ids = [i['project_id'] for i in instance_refs]
project_ids = list(set(project_ids))
for project_id in project_ids:
vcpus = db.instance_get_vcpu_sum_by_host_and_project(context,
host,
project_id)
mem = db.instance_get_memory_sum_by_host_and_project(context,
host,
project_id)
hdd = db.instance_get_disk_sum_by_host_and_project(context,
host,
project_id)
usage[project_id] = {'vcpus': int(vcpus),
'memory_mb': int(mem),
'local_gb': int(hdd)}
vcpus = [i['vcpus'] for i in instance_refs \
if i['project_id'] == project_id]
mem = [i['memory_mb'] for i in instance_refs \
if i['project_id'] == project_id]
disk = [i['local_gb'] for i in instance_refs \
if i['project_id'] == project_id]
usage[project_id] = {'vcpus': reduce(lambda x, y: x + y, vcpus),
'memory_mb': reduce(lambda x, y: x + y, mem),
'local_gb': reduce(lambda x, y: x + y, disk)}
return {'resource': resource, 'usage': usage}

73
nova/scheduler/multi.py Normal file
View File

@@ -0,0 +1,73 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Openstack, LLC.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Scheduler that allows routing some calls to one driver and others to another.
"""
from nova import flags
from nova import utils
from nova.scheduler import driver
FLAGS = flags.FLAGS
flags.DEFINE_string('compute_scheduler_driver',
'nova.scheduler.chance.ChanceScheduler',
'Driver to use for scheduling compute calls')
flags.DEFINE_string('volume_scheduler_driver',
'nova.scheduler.chance.ChanceScheduler',
'Driver to use for scheduling volume calls')
# A mapping of methods to topics so we can figure out which driver to use.
_METHOD_MAP = {'run_instance': 'compute',
'start_instance': 'compute',
'create_volume': 'volume'}
class MultiScheduler(driver.Scheduler):
"""A scheduler that holds multiple sub-schedulers.
This exists to allow flag-driven composibility of schedulers, allowing
third parties to integrate custom schedulers more easily.
"""
def __init__(self):
super(MultiScheduler, self).__init__()
compute_driver = utils.import_object(FLAGS.compute_scheduler_driver)
volume_driver = utils.import_object(FLAGS.volume_scheduler_driver)
self.drivers = {'compute': compute_driver,
'volume': volume_driver}
def __getattr__(self, key):
if not key.startswith('schedule_'):
raise AttributeError(key)
method = key[len('schedule_'):]
if method not in _METHOD_MAP:
raise AttributeError(key)
return getattr(self.drivers[_METHOD_MAP[method]], key)
def set_zone_manager(self, zone_manager):
for k, v in self.drivers.iteritems():
v.set_zone_manager(zone_manager)
def schedule(self, context, topic, *_args, **_kwargs):
return self.drivers[topic].schedule(context, topic, *_args, **_kwargs)