Rename DistributedScheduler to FilterScheduler

Change-Id: I1091609d5997c4ba9c26a3f2426496ff7f1e64fa
Joe Gordon
2012-03-05 17:53:57 -08:00
parent d15fbabb22
commit 6a751fe98c
3 changed files with 1 addition and 438 deletions

nova/scheduler/distributed_scheduler.py (deleted)

@@ -1,260 +0,0 @@
# Copyright (c) 2011 OpenStack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
The DistributedScheduler is for creating instances locally.
You can customize this scheduler by specifying your own Host Filters and
Weighing Functions.
"""
import operator
from nova import exception
from nova import flags
from nova import log as logging
from nova.notifier import api as notifier
from nova.scheduler import driver
from nova.scheduler import least_cost
from nova.scheduler import scheduler_options
from nova import utils
FLAGS = flags.FLAGS
LOG = logging.getLogger(__name__)
class DistributedScheduler(driver.Scheduler):
"""Scheduler that can be used for filtering and weighing."""
def __init__(self, *args, **kwargs):
super(DistributedScheduler, self).__init__(*args, **kwargs)
self.cost_function_cache = {}
self.options = scheduler_options.SchedulerOptions()
def schedule(self, context, topic, method, *args, **kwargs):
"""The schedule() contract requires we return the one
best-suited host for this request.
NOTE: We're only focused on compute instances right now,
so this method will always raise NoValidHost()."""
msg = _("No host selection for %s defined.") % topic
raise exception.NoValidHost(reason=msg)
def schedule_run_instance(self, context, request_spec, *args, **kwargs):
"""This method is called from nova.compute.api to provision
an instance. We first create a build plan (a list of WeightedHosts)
and then provision.
Returns a list of the instances created.
"""
elevated = context.elevated()
num_instances = request_spec.get('num_instances', 1)
LOG.debug(_("Attempting to build %(num_instances)d instance(s)") %
locals())
payload = dict(request_spec=request_spec)
notifier.notify(notifier.publisher_id("scheduler"),
'scheduler.run_instance.start', notifier.INFO, payload)
weighted_hosts = self._schedule(context, "compute", request_spec,
*args, **kwargs)
if not weighted_hosts:
raise exception.NoValidHost(reason="")
# NOTE(comstud): Make sure we do not pass this through. It
# contains an instance of RpcContext that cannot be serialized.
kwargs.pop('filter_properties', None)
instances = []
for num in xrange(num_instances):
if not weighted_hosts:
break
weighted_host = weighted_hosts.pop(0)
request_spec['instance_properties']['launch_index'] = num
instance = self._provision_resource(elevated, weighted_host,
request_spec, kwargs)
if instance:
instances.append(instance)
notifier.notify(notifier.publisher_id("scheduler"),
'scheduler.run_instance.end', notifier.INFO, payload)
return instances
def schedule_prep_resize(self, context, request_spec, *args, **kwargs):
"""Select a target for resize.
Selects a target host for the instance, post-resize, and casts
the prep_resize operation to it.
"""
hosts = self._schedule(context, 'compute', request_spec,
*args, **kwargs)
if not hosts:
raise exception.NoValidHost(reason="")
host = hosts.pop(0)
# NOTE(comstud): Make sure we do not pass this through. It
# contains an instance of RpcContext that cannot be serialized.
kwargs.pop('filter_properties', None)
# Forward off to the host
driver.cast_to_compute_host(context, host.host_state.host,
'prep_resize', **kwargs)
def _provision_resource(self, context, weighted_host, request_spec,
kwargs):
"""Create the requested resource in this Zone."""
instance = self.create_instance_db_entry(context, request_spec)
payload = dict(request_spec=request_spec,
weighted_host=weighted_host.to_dict(),
instance_id=instance['uuid'])
notifier.notify(notifier.publisher_id("scheduler"),
'scheduler.run_instance.scheduled', notifier.INFO,
payload)
driver.cast_to_compute_host(context, weighted_host.host_state.host,
'run_instance', instance_uuid=instance['uuid'], **kwargs)
inst = driver.encode_instance(instance, local=True)
# So if another instance is created, create_instance_db_entry will
# actually create a new entry, instead of assuming it's been created
# already
del request_spec['instance_properties']['uuid']
return inst
def _get_configuration_options(self):
"""Fetch options dictionary. Broken out for testing."""
return self.options.get_configuration()
def populate_filter_properties(self, request_spec, filter_properties):
"""Stuff things into filter_properties. Can be overriden in a
subclass to add more data.
"""
pass
def _schedule(self, context, topic, request_spec, *args, **kwargs):
"""Returns a list of hosts that meet the required specs,
ordered by their fitness.
"""
elevated = context.elevated()
if topic != "compute":
msg = _("Scheduler only understands Compute nodes (for now)")
raise NotImplementedError(msg)
instance_properties = request_spec['instance_properties']
instance_type = request_spec.get("instance_type", None)
cost_functions = self.get_cost_functions()
config_options = self._get_configuration_options()
filter_properties = kwargs.get('filter_properties', {})
filter_properties.update({'context': context,
'request_spec': request_spec,
'config_options': config_options,
'instance_type': instance_type})
self.populate_filter_properties(request_spec,
filter_properties)
# Find our local list of acceptable hosts by repeatedly
# filtering and weighing our options. Each time we choose a
# host, we virtually consume resources on it so subsequent
# selections can adjust accordingly.
# unfiltered_hosts_dict is {host : ZoneManager.HostInfo()}
unfiltered_hosts_dict = self.host_manager.get_all_host_states(
elevated, topic)
# Note: remember, we are using an iterator here. So only
# traverse this list once. This can bite you if the hosts
# are being scanned in a filter or weighing function.
hosts = unfiltered_hosts_dict.itervalues()
num_instances = request_spec.get('num_instances', 1)
selected_hosts = []
for num in xrange(num_instances):
# Filter local hosts based on requirements ...
hosts = self.host_manager.filter_hosts(hosts,
filter_properties)
if not hosts:
# Can't get any more locally.
break
LOG.debug(_("Filtered %(hosts)s") % locals())
# weighted_host = WeightedHost() ... the best
# host for the job.
# TODO(comstud): filter_properties will also be used for
# weighing and I plan to fold weighing into the host manager
# in a future patch. I'll address the naming of this
# variable at that time.
weighted_host = least_cost.weighted_sum(cost_functions,
hosts, filter_properties)
LOG.debug(_("Weighted %(weighted_host)s") % locals())
selected_hosts.append(weighted_host)
# Now consume the resources so the filter/weights
# will change for the next instance.
weighted_host.host_state.consume_from_instance(
instance_properties)
selected_hosts.sort(key=operator.attrgetter('weight'))
return selected_hosts[:num_instances]
def get_cost_functions(self, topic=None):
"""Returns a list of tuples containing weights and cost functions to
use for weighing hosts
"""
if topic is None:
# Schedulers only support compute right now.
topic = "compute"
if topic in self.cost_function_cache:
return self.cost_function_cache[topic]
cost_fns = []
for cost_fn_str in FLAGS.least_cost_functions:
if '.' in cost_fn_str:
short_name = cost_fn_str.split('.')[-1]
else:
short_name = cost_fn_str
cost_fn_str = "%s.%s.%s" % (
__name__, self.__class__.__name__, short_name)
if not (short_name.startswith('%s_' % topic) or
short_name.startswith('noop')):
continue
try:
# NOTE: import_class is somewhat misnamed since
# the weighing function can be any non-class callable
# (i.e., no 'self')
cost_fn = utils.import_class(cost_fn_str)
except exception.ClassNotFound:
raise exception.SchedulerCostFunctionNotFound(
cost_fn_str=cost_fn_str)
try:
flag_name = "%s_weight" % cost_fn.__name__
weight = getattr(FLAGS, flag_name)
except AttributeError:
raise exception.SchedulerWeightFlagNotFound(
flag_name=flag_name)
cost_fns.append((weight, cost_fn))
self.cost_function_cache[topic] = cost_fns
return cost_fns
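
The heart of the deleted driver is the filter-then-weigh loop in _schedule(): each pass filters the host list, weighs the survivors, picks the best host, and virtually consumes resources on it so the next pick sees reduced capacity. Below is a minimal standalone sketch of that pattern; HostState, ram_filter, and the "most free RAM wins" rule are simplified stand-ins, not Nova's real host_manager/least_cost code.

class HostState(object):
    """Simplified stand-in for nova.scheduler.host_manager.HostState."""
    def __init__(self, host, free_ram_mb):
        self.host = host
        self.free_ram_mb = free_ram_mb

    def consume_from_instance(self, instance_properties):
        # Virtually claim RAM so the next selection sees less capacity.
        self.free_ram_mb -= instance_properties['memory_mb']

def ram_filter(hosts, filter_properties):
    """Stand-in filter: keep only hosts with enough free RAM."""
    needed = filter_properties['instance_type']['memory_mb']
    return [h for h in hosts if h.free_ram_mb >= needed]

def schedule(hosts, request_spec):
    instance_properties = request_spec['instance_properties']
    filter_properties = {'instance_type': request_spec['instance_type']}
    selected = []
    for _ in range(request_spec.get('num_instances', 1)):
        candidates = ram_filter(hosts, filter_properties)
        if not candidates:
            break  # no host can take another instance
        # least_cost.weighted_sum() picked the cheapest host; most free
        # RAM serves as the stand-in for "least cost" here.
        best = max(candidates, key=lambda h: h.free_ram_mb)
        selected.append(best)
        best.consume_from_instance(instance_properties)
    return selected

hosts = [HostState('host1', 2048), HostState('host2', 1024)]
spec = {'num_instances': 4,
        'instance_type': {'memory_mb': 512},
        'instance_properties': {'memory_mb': 512}}
print([h.host for h in schedule(hosts, spec)])
# host1 absorbs instances until host2 becomes the roomier choice.

Consuming resources between picks is what lets a single request spread several instances across hosts instead of stacking them all on the first winner.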

nova/scheduler/multi.py

@@ -30,7 +30,7 @@ from nova.scheduler import driver
 multi_scheduler_opts = [
     cfg.StrOpt('compute_scheduler_driver',
                default='nova.scheduler.'
-                       'distributed_scheduler.DistributedScheduler',
+                       'filter_scheduler.FilterScheduler',
                help='Driver to use for scheduling compute calls'),
     cfg.StrOpt('volume_scheduler_driver',
                default='nova.scheduler.chance.ChanceScheduler',
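
This hunk is the commit's only addition: the compute driver is configured as a dotted class path, so the rename surfaces as a new default import string rather than a code change. A rough sketch of how such a path can be resolved to a class (Nova's own helper was utils.import_class, seen above in get_cost_functions(); this stand-in uses only the standard library):

import importlib

def load_driver(path):
    """Resolve a dotted path like the StrOpt default above to a class."""
    module_name, class_name = path.rsplit('.', 1)
    return getattr(importlib.import_module(module_name), class_name)

# After this commit, the default flag value resolves to the renamed class:
#   load_driver('nova.scheduler.filter_scheduler.FilterScheduler')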

nova/tests/scheduler/test_distributed_scheduler.py (deleted)

@@ -1,177 +0,0 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Distributed Scheduler.
"""
import mox
from nova import context
from nova import exception
from nova.scheduler import least_cost
from nova.scheduler import host_manager
from nova.scheduler import distributed_scheduler
from nova import test
from nova.tests.scheduler import fakes
from nova.tests.scheduler import test_scheduler
def fake_filter_hosts(hosts, filter_properties):
return list(hosts)
class DistributedSchedulerTestCase(test_scheduler.SchedulerTestCase):
"""Test case for Distributed Scheduler."""
driver_cls = distributed_scheduler.DistributedScheduler
def test_run_instance_no_hosts(self):
"""
Ensure empty hosts & child_zones result in NoValidHosts exception.
"""
def _fake_empty_call_zone_method(*args, **kwargs):
return []
sched = fakes.FakeDistributedScheduler()
fake_context = context.RequestContext('user', 'project')
request_spec = {'instance_type': {'memory_mb': 1, 'root_gb': 1,
'ephemeral_gb': 0},
'instance_properties': {'project_id': 1}}
self.assertRaises(exception.NoValidHost, sched.schedule_run_instance,
fake_context, request_spec)
def test_run_instance_non_admin(self):
"""Test creating an instance locally using run_instance, passing
a non-admin context. DB actions should work."""
self.was_admin = False
def fake_get(context, *args, **kwargs):
# make sure this is called with admin context, even though
# we're using user context below
self.was_admin = context.is_admin
return {}
sched = fakes.FakeDistributedScheduler()
self.stubs.Set(sched.host_manager, 'get_all_host_states', fake_get)
fake_context = context.RequestContext('user', 'project')
request_spec = {'instance_type': {'memory_mb': 1, 'local_gb': 1},
'instance_properties': {'project_id': 1}}
self.assertRaises(exception.NoValidHost, sched.schedule_run_instance,
fake_context, request_spec)
self.assertTrue(self.was_admin)
def test_schedule_bad_topic(self):
"""Parameter checking."""
sched = fakes.FakeDistributedScheduler()
fake_context = context.RequestContext('user', 'project')
self.assertRaises(NotImplementedError, sched._schedule, fake_context,
"foo", {})
def test_scheduler_includes_launch_index(self):
ctxt = "fake-context"
fake_kwargs = {'fake_kwarg1': 'fake_value1',
'fake_kwarg2': 'fake_value2'}
instance_opts = {'fake_opt1': 'meow'}
request_spec = {'num_instances': 2,
'instance_properties': instance_opts}
instance1 = {'uuid': 'fake-uuid1'}
instance2 = {'uuid': 'fake-uuid2'}
def _has_launch_index(expected_index):
"""Return a function that verifies the expected index."""
def _check_launch_index(value):
if 'instance_properties' in value:
if 'launch_index' in value['instance_properties']:
index = value['instance_properties']['launch_index']
if index == expected_index:
return True
return False
return _check_launch_index
class ContextFake(object):
def elevated(self):
return ctxt
context_fake = ContextFake()
self.mox.StubOutWithMock(self.driver, '_schedule')
self.mox.StubOutWithMock(self.driver, '_provision_resource')
self.driver._schedule(context_fake, 'compute',
request_spec, **fake_kwargs
).AndReturn(['host1', 'host2'])
# instance 1
self.driver._provision_resource(
ctxt, 'host1',
mox.Func(_has_launch_index(0)), fake_kwargs).AndReturn(instance1)
# instance 2
self.driver._provision_resource(
ctxt, 'host2',
mox.Func(_has_launch_index(1)), fake_kwargs).AndReturn(instance2)
self.mox.ReplayAll()
self.driver.schedule_run_instance(context_fake, request_spec,
**fake_kwargs)
def test_schedule_happy_day(self):
"""Make sure there's nothing glaringly wrong with _schedule()
by doing a happy day pass through."""
self.next_weight = 1.0
def _fake_weighted_sum(functions, hosts, options):
self.next_weight += 2.0
host_state = hosts[0]
return least_cost.WeightedHost(self.next_weight,
host_state=host_state)
sched = fakes.FakeDistributedScheduler()
fake_context = context.RequestContext('user', 'project',
is_admin=True)
self.stubs.Set(sched.host_manager, 'filter_hosts',
fake_filter_hosts)
self.stubs.Set(least_cost, 'weighted_sum', _fake_weighted_sum)
fakes.mox_host_manager_db_calls(self.mox, fake_context)
request_spec = {'num_instances': 10,
'instance_type': {'memory_mb': 512, 'root_gb': 512,
'ephemeral_gb': 0,
'vcpus': 1},
'instance_properties': {'project_id': 1,
'root_gb': 512,
'memory_mb': 512,
'ephemeral_gb': 0,
'vcpus': 1}}
self.mox.ReplayAll()
weighted_hosts = sched._schedule(fake_context, 'compute',
request_spec)
self.assertEquals(len(weighted_hosts), 10)
for weighted_host in weighted_hosts:
self.assertTrue(weighted_host.host_state is not None)
def test_get_cost_functions(self):
self.flags(reserved_host_memory_mb=128)
fixture = fakes.FakeDistributedScheduler()
fns = fixture.get_cost_functions()
self.assertEquals(len(fns), 1)
weight, fn = fns[0]
self.assertEquals(weight, 1.0)
hostinfo = host_manager.HostState('host', 'compute')
hostinfo.update_from_compute_node(dict(memory_mb=1000,
local_gb=0, vcpus=1))
self.assertEquals(1000 - 128, fn(hostinfo, {}))
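
The closing assertion pins down the interaction between host state and the reserved-memory flag: with reserved_host_memory_mb=128, a compute node reporting 1000 MB should be scored at 1000 - 128 = 872 free MB by the single default cost function (assumed here to be the fill-first function that scores a host by its free RAM). A toy reconstruction of that arithmetic, not Nova's real classes:

RESERVED_HOST_MEMORY_MB = 128  # mirrors self.flags(reserved_host_memory_mb=128)

class FakeHostState(object):
    """Toy stand-in for host_manager.HostState."""
    def __init__(self):
        self.free_ram_mb = 0

    def update_from_compute_node(self, compute):
        # The host manager holds back the reserved memory when
        # computing schedulable capacity.
        self.free_ram_mb = compute['memory_mb'] - RESERVED_HOST_MEMORY_MB

def fill_first_cost_fn(host_state, options):
    # Assumed default cost function: score a host by its free RAM.
    return host_state.free_ram_mb

host = FakeHostState()
host.update_from_compute_node({'memory_mb': 1000})
assert fill_first_cost_fn(host, {}) == 1000 - 128  # 872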