From 6a751fe98c4c5471575b12864c513cf6972efa2d Mon Sep 17 00:00:00 2001 From: Joe Gordon Date: Mon, 5 Mar 2012 17:53:57 -0800 Subject: [PATCH] Rename DistributedScheduler as FilterScheduler Change-Id: I1091609d5997c4ba9c26a3f2426496ff7f1e64fa --- nova/scheduler/distributed_scheduler.py | 260 ------------------ nova/scheduler/multi.py | 2 +- .../scheduler/test_distributed_scheduler.py | 177 ------------ 3 files changed, 1 insertion(+), 438 deletions(-) delete mode 100644 nova/scheduler/distributed_scheduler.py delete mode 100644 nova/tests/scheduler/test_distributed_scheduler.py diff --git a/nova/scheduler/distributed_scheduler.py b/nova/scheduler/distributed_scheduler.py deleted file mode 100644 index e841eb5f..00000000 --- a/nova/scheduler/distributed_scheduler.py +++ /dev/null @@ -1,260 +0,0 @@ -# Copyright (c) 2011 Openstack, LLC. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -""" -The DistributedScheduler is for creating instances locally. -You can customize this scheduler by specifying your own Host Filters and -Weighing Functions. -""" - -import operator - -from nova import exception -from nova import flags -from nova import log as logging -from nova.notifier import api as notifier -from nova.scheduler import driver -from nova.scheduler import least_cost -from nova.scheduler import scheduler_options -from nova import utils - - -FLAGS = flags.FLAGS -LOG = logging.getLogger(__name__) - - -class DistributedScheduler(driver.Scheduler): - """Scheduler that can be used for filtering and weighing.""" - def __init__(self, *args, **kwargs): - super(DistributedScheduler, self).__init__(*args, **kwargs) - self.cost_function_cache = {} - self.options = scheduler_options.SchedulerOptions() - - def schedule(self, context, topic, method, *args, **kwargs): - """The schedule() contract requires we return the one - best-suited host for this request. - - NOTE: We're only focused on compute instances right now, - so this method will always raise NoValidHost().""" - msg = _("No host selection for %s defined.") % topic - raise exception.NoValidHost(reason=msg) - - def schedule_run_instance(self, context, request_spec, *args, **kwargs): - """This method is called from nova.compute.api to provision - an instance. We first create a build plan (a list of WeightedHosts) - and then provision. - - Returns a list of the instances created. - """ - - elevated = context.elevated() - num_instances = request_spec.get('num_instances', 1) - LOG.debug(_("Attempting to build %(num_instances)d instance(s)") % - locals()) - - payload = dict(request_spec=request_spec) - notifier.notify(notifier.publisher_id("scheduler"), - 'scheduler.run_instance.start', notifier.INFO, payload) - - weighted_hosts = self._schedule(context, "compute", request_spec, - *args, **kwargs) - - if not weighted_hosts: - raise exception.NoValidHost(reason="") - - # NOTE(comstud): Make sure we do not pass this through. It - # contains an instance of RpcContext that cannot be serialized. - kwargs.pop('filter_properties', None) - - instances = [] - for num in xrange(num_instances): - if not weighted_hosts: - break - weighted_host = weighted_hosts.pop(0) - - request_spec['instance_properties']['launch_index'] = num - instance = self._provision_resource(elevated, weighted_host, - request_spec, kwargs) - - if instance: - instances.append(instance) - - notifier.notify(notifier.publisher_id("scheduler"), - 'scheduler.run_instance.end', notifier.INFO, payload) - - return instances - - def schedule_prep_resize(self, context, request_spec, *args, **kwargs): - """Select a target for resize. - - Selects a target host for the instance, post-resize, and casts - the prep_resize operation to it. - """ - - hosts = self._schedule(context, 'compute', request_spec, - *args, **kwargs) - if not hosts: - raise exception.NoValidHost(reason="") - host = hosts.pop(0) - - # NOTE(comstud): Make sure we do not pass this through. It - # contains an instance of RpcContext that cannot be serialized. - kwargs.pop('filter_properties', None) - - # Forward off to the host - driver.cast_to_compute_host(context, host.host_state.host, - 'prep_resize', **kwargs) - - def _provision_resource(self, context, weighted_host, request_spec, - kwargs): - """Create the requested resource in this Zone.""" - instance = self.create_instance_db_entry(context, request_spec) - - payload = dict(request_spec=request_spec, - weighted_host=weighted_host.to_dict(), - instance_id=instance['uuid']) - notifier.notify(notifier.publisher_id("scheduler"), - 'scheduler.run_instance.scheduled', notifier.INFO, - payload) - - driver.cast_to_compute_host(context, weighted_host.host_state.host, - 'run_instance', instance_uuid=instance['uuid'], **kwargs) - inst = driver.encode_instance(instance, local=True) - # So if another instance is created, create_instance_db_entry will - # actually create a new entry, instead of assume it's been created - # already - del request_spec['instance_properties']['uuid'] - return inst - - def _get_configuration_options(self): - """Fetch options dictionary. Broken out for testing.""" - return self.options.get_configuration() - - def populate_filter_properties(self, request_spec, filter_properties): - """Stuff things into filter_properties. Can be overriden in a - subclass to add more data. - """ - pass - - def _schedule(self, context, topic, request_spec, *args, **kwargs): - """Returns a list of hosts that meet the required specs, - ordered by their fitness. - """ - elevated = context.elevated() - if topic != "compute": - msg = _("Scheduler only understands Compute nodes (for now)") - raise NotImplementedError(msg) - - instance_properties = request_spec['instance_properties'] - instance_type = request_spec.get("instance_type", None) - - cost_functions = self.get_cost_functions() - config_options = self._get_configuration_options() - - filter_properties = kwargs.get('filter_properties', {}) - filter_properties.update({'context': context, - 'request_spec': request_spec, - 'config_options': config_options, - 'instance_type': instance_type}) - - self.populate_filter_properties(request_spec, - filter_properties) - - # Find our local list of acceptable hosts by repeatedly - # filtering and weighing our options. Each time we choose a - # host, we virtually consume resources on it so subsequent - # selections can adjust accordingly. - - # unfiltered_hosts_dict is {host : ZoneManager.HostInfo()} - unfiltered_hosts_dict = self.host_manager.get_all_host_states( - elevated, topic) - - # Note: remember, we are using an iterator here. So only - # traverse this list once. This can bite you if the hosts - # are being scanned in a filter or weighing function. - hosts = unfiltered_hosts_dict.itervalues() - - num_instances = request_spec.get('num_instances', 1) - selected_hosts = [] - for num in xrange(num_instances): - # Filter local hosts based on requirements ... - hosts = self.host_manager.filter_hosts(hosts, - filter_properties) - if not hosts: - # Can't get any more locally. - break - - LOG.debug(_("Filtered %(hosts)s") % locals()) - - # weighted_host = WeightedHost() ... the best - # host for the job. - # TODO(comstud): filter_properties will also be used for - # weighing and I plan fold weighing into the host manager - # in a future patch. I'll address the naming of this - # variable at that time. - weighted_host = least_cost.weighted_sum(cost_functions, - hosts, filter_properties) - LOG.debug(_("Weighted %(weighted_host)s") % locals()) - selected_hosts.append(weighted_host) - - # Now consume the resources so the filter/weights - # will change for the next instance. - weighted_host.host_state.consume_from_instance( - instance_properties) - - selected_hosts.sort(key=operator.attrgetter('weight')) - return selected_hosts[:num_instances] - - def get_cost_functions(self, topic=None): - """Returns a list of tuples containing weights and cost functions to - use for weighing hosts - """ - if topic is None: - # Schedulers only support compute right now. - topic = "compute" - if topic in self.cost_function_cache: - return self.cost_function_cache[topic] - - cost_fns = [] - for cost_fn_str in FLAGS.least_cost_functions: - if '.' in cost_fn_str: - short_name = cost_fn_str.split('.')[-1] - else: - short_name = cost_fn_str - cost_fn_str = "%s.%s.%s" % ( - __name__, self.__class__.__name__, short_name) - if not (short_name.startswith('%s_' % topic) or - short_name.startswith('noop')): - continue - - try: - # NOTE: import_class is somewhat misnamed since - # the weighing function can be any non-class callable - # (i.e., no 'self') - cost_fn = utils.import_class(cost_fn_str) - except exception.ClassNotFound: - raise exception.SchedulerCostFunctionNotFound( - cost_fn_str=cost_fn_str) - - try: - flag_name = "%s_weight" % cost_fn.__name__ - weight = getattr(FLAGS, flag_name) - except AttributeError: - raise exception.SchedulerWeightFlagNotFound( - flag_name=flag_name) - cost_fns.append((weight, cost_fn)) - - self.cost_function_cache[topic] = cost_fns - return cost_fns diff --git a/nova/scheduler/multi.py b/nova/scheduler/multi.py index e5e1d736..3bec060a 100644 --- a/nova/scheduler/multi.py +++ b/nova/scheduler/multi.py @@ -30,7 +30,7 @@ from nova.scheduler import driver multi_scheduler_opts = [ cfg.StrOpt('compute_scheduler_driver', default='nova.scheduler.' - 'distributed_scheduler.DistributedScheduler', + 'filter_scheduler.FilterScheduler', help='Driver to use for scheduling compute calls'), cfg.StrOpt('volume_scheduler_driver', default='nova.scheduler.chance.ChanceScheduler', diff --git a/nova/tests/scheduler/test_distributed_scheduler.py b/nova/tests/scheduler/test_distributed_scheduler.py deleted file mode 100644 index 2aa55de0..00000000 --- a/nova/tests/scheduler/test_distributed_scheduler.py +++ /dev/null @@ -1,177 +0,0 @@ -# Copyright 2011 OpenStack LLC. -# All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. -""" -Tests For Distributed Scheduler. -""" - -import mox - -from nova import context -from nova import exception -from nova.scheduler import least_cost -from nova.scheduler import host_manager -from nova.scheduler import distributed_scheduler -from nova import test -from nova.tests.scheduler import fakes -from nova.tests.scheduler import test_scheduler - - -def fake_filter_hosts(hosts, filter_properties): - return list(hosts) - - -class DistributedSchedulerTestCase(test_scheduler.SchedulerTestCase): - """Test case for Distributed Scheduler.""" - - driver_cls = distributed_scheduler.DistributedScheduler - - def test_run_instance_no_hosts(self): - """ - Ensure empty hosts & child_zones result in NoValidHosts exception. - """ - def _fake_empty_call_zone_method(*args, **kwargs): - return [] - - sched = fakes.FakeDistributedScheduler() - - fake_context = context.RequestContext('user', 'project') - request_spec = {'instance_type': {'memory_mb': 1, 'root_gb': 1, - 'ephemeral_gb': 0}, - 'instance_properties': {'project_id': 1}} - self.assertRaises(exception.NoValidHost, sched.schedule_run_instance, - fake_context, request_spec) - - def test_run_instance_non_admin(self): - """Test creating an instance locally using run_instance, passing - a non-admin context. DB actions should work.""" - self.was_admin = False - - def fake_get(context, *args, **kwargs): - # make sure this is called with admin context, even though - # we're using user context below - self.was_admin = context.is_admin - return {} - - sched = fakes.FakeDistributedScheduler() - self.stubs.Set(sched.host_manager, 'get_all_host_states', fake_get) - - fake_context = context.RequestContext('user', 'project') - - request_spec = {'instance_type': {'memory_mb': 1, 'local_gb': 1}, - 'instance_properties': {'project_id': 1}} - self.assertRaises(exception.NoValidHost, sched.schedule_run_instance, - fake_context, request_spec) - self.assertTrue(self.was_admin) - - def test_schedule_bad_topic(self): - """Parameter checking.""" - sched = fakes.FakeDistributedScheduler() - fake_context = context.RequestContext('user', 'project') - self.assertRaises(NotImplementedError, sched._schedule, fake_context, - "foo", {}) - - def test_scheduler_includes_launch_index(self): - ctxt = "fake-context" - fake_kwargs = {'fake_kwarg1': 'fake_value1', - 'fake_kwarg2': 'fake_value2'} - instance_opts = {'fake_opt1': 'meow'} - request_spec = {'num_instances': 2, - 'instance_properties': instance_opts} - instance1 = {'uuid': 'fake-uuid1'} - instance2 = {'uuid': 'fake-uuid2'} - - def _has_launch_index(expected_index): - """Return a function that verifies the expected index.""" - def _check_launch_index(value): - if 'instance_properties' in value: - if 'launch_index' in value['instance_properties']: - index = value['instance_properties']['launch_index'] - if index == expected_index: - return True - return False - return _check_launch_index - - class ContextFake(object): - def elevated(self): - return ctxt - context_fake = ContextFake() - - self.mox.StubOutWithMock(self.driver, '_schedule') - self.mox.StubOutWithMock(self.driver, '_provision_resource') - - self.driver._schedule(context_fake, 'compute', - request_spec, **fake_kwargs - ).AndReturn(['host1', 'host2']) - # instance 1 - self.driver._provision_resource( - ctxt, 'host1', - mox.Func(_has_launch_index(0)), fake_kwargs).AndReturn(instance1) - # instance 2 - self.driver._provision_resource( - ctxt, 'host2', - mox.Func(_has_launch_index(1)), fake_kwargs).AndReturn(instance2) - self.mox.ReplayAll() - - self.driver.schedule_run_instance(context_fake, request_spec, - **fake_kwargs) - - def test_schedule_happy_day(self): - """Make sure there's nothing glaringly wrong with _schedule() - by doing a happy day pass through.""" - - self.next_weight = 1.0 - - def _fake_weighted_sum(functions, hosts, options): - self.next_weight += 2.0 - host_state = hosts[0] - return least_cost.WeightedHost(self.next_weight, - host_state=host_state) - - sched = fakes.FakeDistributedScheduler() - fake_context = context.RequestContext('user', 'project', - is_admin=True) - - self.stubs.Set(sched.host_manager, 'filter_hosts', - fake_filter_hosts) - self.stubs.Set(least_cost, 'weighted_sum', _fake_weighted_sum) - fakes.mox_host_manager_db_calls(self.mox, fake_context) - - request_spec = {'num_instances': 10, - 'instance_type': {'memory_mb': 512, 'root_gb': 512, - 'ephemeral_gb': 0, - 'vcpus': 1}, - 'instance_properties': {'project_id': 1, - 'root_gb': 512, - 'memory_mb': 512, - 'ephemeral_gb': 0, - 'vcpus': 1}} - self.mox.ReplayAll() - weighted_hosts = sched._schedule(fake_context, 'compute', - request_spec) - self.assertEquals(len(weighted_hosts), 10) - for weighted_host in weighted_hosts: - self.assertTrue(weighted_host.host_state is not None) - - def test_get_cost_functions(self): - self.flags(reserved_host_memory_mb=128) - fixture = fakes.FakeDistributedScheduler() - fns = fixture.get_cost_functions() - self.assertEquals(len(fns), 1) - weight, fn = fns[0] - self.assertEquals(weight, 1.0) - hostinfo = host_manager.HostState('host', 'compute') - hostinfo.update_from_compute_node(dict(memory_mb=1000, - local_gb=0, vcpus=1)) - self.assertEquals(1000 - 128, fn(hostinfo, {}))