diff --git a/nova/scheduler/distributed_scheduler.py b/nova/scheduler/distributed_scheduler.py index db1e1ed4..224b1940 100644 --- a/nova/scheduler/distributed_scheduler.py +++ b/nova/scheduler/distributed_scheduler.py @@ -25,21 +25,21 @@ import types import M2Crypto +from nova.compute import api as compute_api from novaclient import v1_1 as novaclient from novaclient import exceptions as novaclient_exceptions - from nova import crypto from nova import db from nova import exception from nova import flags from nova import log as logging from nova import rpc - -from nova.compute import api as compute_api from nova.scheduler import api from nova.scheduler import driver from nova.scheduler import filters from nova.scheduler import least_cost +from nova.scheduler import scheduler_options +from nova import utils FLAGS = flags.FLAGS @@ -59,6 +59,10 @@ class DistributedScheduler(driver.Scheduler): """Scheduler that can work across any nova deployment, from simple deployments to multiple nested zones. """ + def __init__(self, *args, **kwargs): + super(DistributedScheduler, self).__init__(*args, **kwargs) + self.cost_function_cache = {} + self.options = scheduler_options.SchedulerOptions() def schedule(self, context, topic, method, *args, **kwargs): """The schedule() contract requires we return the one @@ -243,6 +247,10 @@ class DistributedScheduler(driver.Scheduler): """Broken out for testing.""" return db.zone_get_all(context) + def _get_configuration_options(self): + """Fetch options dictionary. Broken out for testing.""" + return self.options.get_configuration() + def _schedule(self, elevated, topic, request_spec, *args, **kwargs): """Returns a list of hosts that meet the required specs, ordered by their fitness. @@ -257,9 +265,13 @@ class DistributedScheduler(driver.Scheduler): "provisioning.") raise NotImplementedError(msg) + cost_functions = self.get_cost_functions() + ram_requirement_mb = instance_type['memory_mb'] disk_requirement_bg = instance_type['local_gb'] + options = self._get_configuration_options() + # Find our local list of acceptable hosts by repeatedly # filtering and weighing our options. Each time we choose a # host, we virtually consume resources on it so subsequent @@ -274,7 +286,7 @@ class DistributedScheduler(driver.Scheduler): for num in xrange(num_instances): # Filter local hosts based on requirements ... filtered_hosts = self._filter_hosts(topic, request_spec, - unfiltered_hosts) + unfiltered_hosts, options) if not filtered_hosts: # Can't get any more locally. @@ -284,8 +296,8 @@ class DistributedScheduler(driver.Scheduler): # weighted_host = WeightedHost() ... the best # host for the job. - weighted_host = least_cost.weigh_hosts(request_spec, - filtered_hosts) + weighted_host = least_cost.weighted_sum(cost_functions, + filtered_hosts, options) LOG.debug(_("Weighted %(weighted_host)s") % locals()) selected_hosts.append(weighted_host) @@ -343,7 +355,7 @@ class DistributedScheduler(driver.Scheduler): raise exception.SchedulerHostFilterNotFound(filter_name=msg) return good_filters - def _filter_hosts(self, topic, request_spec, hosts=None): + def _filter_hosts(self, topic, request_spec, hosts, options): """Filter the full host list. hosts = [(host, HostInfo()), ...]. This method returns a subset of hosts, in the same format.""" selected_filters = self._choose_host_filters() @@ -358,6 +370,48 @@ class DistributedScheduler(driver.Scheduler): for selected_filter in selected_filters: query = selected_filter.instance_type_to_filter(instance_type) - hosts = selected_filter.filter_hosts(hosts, query) + hosts = selected_filter.filter_hosts(hosts, query, options) return hosts + + def get_cost_functions(self, topic=None): + """Returns a list of tuples containing weights and cost functions to + use for weighing hosts + """ + if topic is None: + # Schedulers only support compute right now. + topic = "compute" + if topic in self.cost_function_cache: + return self.cost_function_cache[topic] + + cost_fns = [] + for cost_fn_str in FLAGS.least_cost_functions: + if '.' in cost_fn_str: + short_name = cost_fn_str.split('.')[-1] + else: + short_name = cost_fn_str + cost_fn_str = "%s.%s.%s" % ( + __name__, self.__class__.__name__, short_name) + if not (short_name.startswith('%s_' % topic) or + short_name.startswith('noop')): + continue + + try: + # NOTE: import_class is somewhat misnamed since + # the weighing function can be any non-class callable + # (i.e., no 'self') + cost_fn = utils.import_class(cost_fn_str) + except exception.ClassNotFound: + raise exception.SchedulerCostFunctionNotFound( + cost_fn_str=cost_fn_str) + + try: + flag_name = "%s_weight" % cost_fn.__name__ + weight = getattr(FLAGS, flag_name) + except AttributeError: + raise exception.SchedulerWeightFlagNotFound( + flag_name=flag_name) + cost_fns.append((weight, cost_fn)) + + self.cost_function_cache[topic] = cost_fns + return cost_fns diff --git a/nova/scheduler/least_cost.py b/nova/scheduler/least_cost.py index 83dc087a..7e12ca39 100644 --- a/nova/scheduler/least_cost.py +++ b/nova/scheduler/least_cost.py @@ -23,11 +23,8 @@ is then selected for provisioning. """ -import collections - from nova import flags from nova import log as logging -from nova import utils from nova import exception LOG = logging.getLogger('nova.scheduler.least_cost') @@ -46,9 +43,6 @@ flags.DEFINE_float('compute_fill_first_cost_fn_weight', 1.0, 'How much weight to give the fill-first cost function') -COST_FUNCTION_CACHE = {} - - class WeightedHost(object): """Reduced set of information about a host that has been weighed. This is an attempt to remove some of the ad-hoc dict structures @@ -74,36 +68,18 @@ class WeightedHost(object): return x -def noop_cost_fn(host_info): +def noop_cost_fn(host_info, options=None): """Return a pre-weight cost of 1 for each host""" return 1 -def compute_fill_first_cost_fn(host_info): +def compute_fill_first_cost_fn(host_info, options=None): """More free ram = higher weight. So servers will less free ram will be preferred.""" return host_info.free_ram_mb -def normalize_grid(grid): - """Normalize a grid of numbers by row.""" - if not grid: - return [[]] - - normalized = [] - for row in grid: - if not row: - normalized.append([]) - continue - mx = float(max(row)) - if abs(mx) < 0.001: - normalized = [0.0] * len(row) - continue - normalized.append([float(col) / mx for col in row]) - return normalized - - -def weighted_sum(host_list, weighted_fns): +def weighted_sum(weighted_fns, host_list, options): """Use the weighted-sum method to compute a score for an array of objects. Normalize the results of the objective-functions so that the weights are meaningful regardless of objective-function's range. @@ -111,6 +87,7 @@ def weighted_sum(host_list, weighted_fns): host_list - [(host, HostInfo()), ...] weighted_fns - list of weights and functions like: [(weight, objective-functions), ...] + options is an arbitrary dict of values. Returns a single WeightedHost object which represents the best candidate. @@ -120,8 +97,8 @@ def weighted_sum(host_list, weighted_fns): # One row per host. One column per function. scores = [] for weight, fn in weighted_fns: - scores.append([fn(host_info) for hostname, host_info in host_list]) - scores = normalize_grid(scores) + scores.append([fn(host_info, options) for hostname, host_info + in host_list]) # Adjust the weights in the grid by the functions weight adjustment # and sum them up to get a final list of weights. @@ -143,54 +120,3 @@ def weighted_sum(host_list, weighted_fns): final_scores = sorted(final_scores) weight, (host, hostinfo) = final_scores[0] # Lowest score is the winner! return WeightedHost(weight, host=host, hostinfo=hostinfo) - - -def get_cost_fns(topic=None): - """Returns a list of tuples containing weights and cost functions to - use for weighing hosts - """ - global COST_FUNCTION_CACHE - cost_function_cache = COST_FUNCTION_CACHE - - if topic is None: - # Schedulers only support compute right now. - topic = "compute" - if topic in cost_function_cache: - return cost_function_cache[topic] - - cost_fns = [] - for cost_fn_str in FLAGS.least_cost_functions: - if '.' in cost_fn_str: - short_name = cost_fn_str.split('.')[-1] - else: - short_name = cost_fn_str - cost_fn_str = "%s.%s.%s" % ( - __name__, self.__class__.__name__, short_name) - if not (short_name.startswith('%s_' % topic) or - short_name.startswith('noop')): - continue - - try: - # NOTE(sirp): import_class is somewhat misnamed since it can - # any callable from a module - cost_fn = utils.import_class(cost_fn_str) - except exception.ClassNotFound: - raise exception.SchedulerCostFunctionNotFound( - cost_fn_str=cost_fn_str) - - try: - flag_name = "%s_weight" % cost_fn.__name__ - weight = getattr(FLAGS, flag_name) - except AttributeError: - raise exception.SchedulerWeightFlagNotFound( - flag_name=flag_name) - cost_fns.append((weight, cost_fn)) - - cost_function_cache[topic] = cost_fns - return cost_fns - - -def weigh_hosts(request_spec, host_list): - """Returns the best host as a WeightedHost.""" - cost_fns = get_cost_fns() - return weighted_sum(host_list, cost_fns) diff --git a/nova/tests/scheduler/test_distributed_scheduler.py b/nova/tests/scheduler/test_distributed_scheduler.py index a66fae34..657232f7 100644 --- a/nova/tests/scheduler/test_distributed_scheduler.py +++ b/nova/tests/scheduler/test_distributed_scheduler.py @@ -212,10 +212,11 @@ class DistributedSchedulerTestCase(test.TestCase): self.next_weight = 1.0 - def _fake_filter_hosts(topic, request_info, unfiltered_hosts): + def _fake_filter_hosts(topic, request_info, unfiltered_hosts, + options): return unfiltered_hosts - def _fake_weigh_hosts(request_info, hosts): + def _fake_weighted_sum(functions, hosts, options): self.next_weight += 2.0 host, hostinfo = hosts[0] return least_cost.WeightedHost(self.next_weight, host=host, @@ -225,7 +226,7 @@ class DistributedSchedulerTestCase(test.TestCase): fake_context = context.RequestContext('user', 'project') sched.zone_manager = ds_fakes.FakeZoneManager() self.stubs.Set(sched, '_filter_hosts', _fake_filter_hosts) - self.stubs.Set(least_cost, 'weigh_hosts', _fake_weigh_hosts) + self.stubs.Set(least_cost, 'weighted_sum', _fake_weighted_sum) self.stubs.Set(nova.db, 'zone_get_all', fake_zone_get_all) self.stubs.Set(sched, '_call_zone_method', fake_call_zone_method) @@ -260,3 +261,12 @@ class DistributedSchedulerTestCase(test.TestCase): self.assertTrue(isinstance(weighted_host, least_cost.WeightedHost)) self.assertEqual(weighted_host.to_dict(), dict(weight=1, host='x', blob='y', zone='z')) + + def test_get_cost_functions(self): + fixture = ds_fakes.FakeDistributedScheduler() + fns = fixture.get_cost_functions() + self.assertEquals(len(fns), 1) + weight, fn = fns[0] + self.assertEquals(weight, 1.0) + hostinfo = zone_manager.HostInfo('host', free_ram_mb=1000) + self.assertEquals(1000, fn(hostinfo)) diff --git a/nova/tests/scheduler/test_host_filter.py b/nova/tests/scheduler/test_host_filter.py index b5b5aade..62131a62 100644 --- a/nova/tests/scheduler/test_host_filter.py +++ b/nova/tests/scheduler/test_host_filter.py @@ -122,7 +122,7 @@ class HostFilterTestCase(test.TestCase): hf = hfs[0] all_hosts = self._get_all_hosts() cooked = hf.instance_type_to_filter(self.instance_type) - hosts = hf.filter_hosts(all_hosts, cooked) + hosts = hf.filter_hosts(all_hosts, cooked, {}) self.assertEquals(4, len(hosts)) for host, capabilities in hosts: self.assertTrue(host.startswith('host')) @@ -132,7 +132,7 @@ class HostFilterTestCase(test.TestCase): # filter all hosts that can support 30 ram and 300 disk cooked = hf.instance_type_to_filter(self.instance_type) all_hosts = self._get_all_hosts() - hosts = hf.filter_hosts(all_hosts, cooked) + hosts = hf.filter_hosts(all_hosts, cooked, {}) self.assertEquals(3, len(hosts)) just_hosts = [host for host, hostinfo in hosts] just_hosts.sort() @@ -147,7 +147,7 @@ class HostFilterTestCase(test.TestCase): # reserving 2048 ram cooked = hf.instance_type_to_filter(self.instance_type) all_hosts = self._get_all_hosts() - hosts = hf.filter_hosts(all_hosts, cooked) + hosts = hf.filter_hosts(all_hosts, cooked, {}) self.assertEquals(2, len(hosts)) just_hosts = [host for host, hostinfo in hosts] just_hosts.sort() @@ -159,7 +159,7 @@ class HostFilterTestCase(test.TestCase): # filter all hosts that can support 30 ram and 300 disk cooked = hf.instance_type_to_filter(self.gpu_instance_type) all_hosts = self._get_all_hosts() - hosts = hf.filter_hosts(all_hosts, cooked) + hosts = hf.filter_hosts(all_hosts, cooked, {}) self.assertEquals(1, len(hosts)) just_hosts = [host for host, caps in hosts] self.assertEquals('host4', just_hosts[0]) @@ -169,7 +169,7 @@ class HostFilterTestCase(test.TestCase): # filter all hosts that can support 30 ram and 300 disk cooked = hf.instance_type_to_filter(self.instance_type) all_hosts = self._get_all_hosts() - hosts = hf.filter_hosts(all_hosts, cooked) + hosts = hf.filter_hosts(all_hosts, cooked, {}) self.assertEquals(2, len(hosts)) just_hosts = [host for host, caps in hosts] just_hosts.sort() @@ -189,7 +189,7 @@ class HostFilterTestCase(test.TestCase): ] ] cooked = json.dumps(raw) - hosts = hf.filter_hosts(all_hosts, cooked) + hosts = hf.filter_hosts(all_hosts, cooked, {}) self.assertEquals(3, len(hosts)) just_hosts = [host for host, caps in hosts] @@ -201,7 +201,7 @@ class HostFilterTestCase(test.TestCase): ['=', '$compute.host_memory_free', 30], ] cooked = json.dumps(raw) - hosts = hf.filter_hosts(all_hosts, cooked) + hosts = hf.filter_hosts(all_hosts, cooked, {}) self.assertEquals(3, len(hosts)) just_hosts = [host for host, caps in hosts] @@ -211,7 +211,7 @@ class HostFilterTestCase(test.TestCase): raw = ['in', '$compute.host_memory_free', 20, 40, 60, 80, 100] cooked = json.dumps(raw) - hosts = hf.filter_hosts(all_hosts, cooked) + hosts = hf.filter_hosts(all_hosts, cooked, {}) self.assertEquals(2, len(hosts)) just_hosts = [host for host, caps in hosts] just_hosts.sort() @@ -222,32 +222,32 @@ class HostFilterTestCase(test.TestCase): raw = ['unknown command', ] cooked = json.dumps(raw) try: - hf.filter_hosts(all_hosts, cooked) + hf.filter_hosts(all_hosts, cooked, {}) self.fail("Should give KeyError") except KeyError, e: pass - self.assertTrue(hf.filter_hosts(all_hosts, json.dumps([]))) - self.assertTrue(hf.filter_hosts(all_hosts, json.dumps({}))) + self.assertTrue(hf.filter_hosts(all_hosts, json.dumps([]), {})) + self.assertTrue(hf.filter_hosts(all_hosts, json.dumps({}), {})) self.assertTrue(hf.filter_hosts(all_hosts, json.dumps( ['not', True, False, True, False], - ))) + ), {})) try: hf.filter_hosts(all_hosts, json.dumps( - 'not', True, False, True, False, - )) + 'not', True, False, True, False,), {}) self.fail("Should give KeyError") except KeyError, e: pass self.assertFalse(hf.filter_hosts(all_hosts, - json.dumps(['=', '$foo', 100]))) + json.dumps(['=', '$foo', 100]), {})) self.assertFalse(hf.filter_hosts(all_hosts, - json.dumps(['=', '$.....', 100]))) + json.dumps(['=', '$.....', 100]), {})) self.assertFalse(hf.filter_hosts(all_hosts, json.dumps( - ['>', ['and', ['or', ['not', ['<', ['>=', ['<=', ['in', ]]]]]]]]))) + ['>', ['and', ['or', ['not', ['<', ['>=', ['<=', ['in', ]]]]]]]]), + {})) self.assertFalse(hf.filter_hosts(all_hosts, - json.dumps(['=', {}, ['>', '$missing....foo']]))) + json.dumps(['=', {}, ['>', '$missing....foo']]), {})) diff --git a/nova/tests/scheduler/test_least_cost.py b/nova/tests/scheduler/test_least_cost.py index a45e9518..4a3af2ea 100644 --- a/nova/tests/scheduler/test_least_cost.py +++ b/nova/tests/scheduler/test_least_cost.py @@ -21,11 +21,11 @@ from nova import test from nova.tests.scheduler import fake_zone_manager -def offset(hostinfo): +def offset(hostinfo, options): return hostinfo.free_ram_mb + 10000 -def scale(hostinfo): +def scale(hostinfo, options): return hostinfo.free_ram_mb * 2 @@ -39,23 +39,6 @@ class LeastCostTestCase(test.TestCase): def tearDown(self): super(LeastCostTestCase, self).tearDown() - def test_normalize_grid(self): - raw = [ - [1, 2, 3, 4, 5], - [10, 20, 30, 40, 50], - [100, 200, 300, 400, 500], - ] - expected = [ - [.2, .4, .6, .8, 1.0], - [.2, .4, .6, .8, 1.0], - [.2, .4, .6, .8, 1.0], - ] - - self.assertEquals(expected, least_cost.normalize_grid(raw)) - - self.assertEquals([[]], least_cost.normalize_grid([])) - self.assertEquals([[]], least_cost.normalize_grid([[]])) - def test_weighted_sum_happy_day(self): fn_tuples = [(1.0, offset), (1.0, scale)] hostinfo_list = self.zone_manager.get_all_host_data(None).items() @@ -69,16 +52,14 @@ class LeastCostTestCase(test.TestCase): # [10000, 11536, 13072, 18192] # [0, 768, 1536, 4096] - # normalized = - # [ 0.55, 0.63, 0.72, 1.0] - # [ 0.0, 0.19, 0.38, 1.0] - # adjusted [ 1.0 * x + 1.0 * y] = - # [0.55, 0.82, 1.1, 2.0] + # [10000, 12304, 14608, 22288] # so, host1 should win: - weighted_host = least_cost.weighted_sum(hostinfo_list, fn_tuples) - self.assertTrue(abs(weighted_host.weight - 0.55) < 0.01) + options = {} + weighted_host = least_cost.weighted_sum(fn_tuples, hostinfo_list, + options) + self.assertEqual(weighted_host.weight, 10000) self.assertEqual(weighted_host.host, 'host1') def test_weighted_sum_single_function(self): @@ -93,18 +74,9 @@ class LeastCostTestCase(test.TestCase): # [offset, ]= # [10000, 11536, 13072, 18192] - # normalized = - # [ 0.55, 0.63, 0.72, 1.0] - # so, host1 should win: - weighted_host = least_cost.weighted_sum(hostinfo_list, fn_tuples) - self.assertTrue(abs(weighted_host.weight - 0.55) < 0.01) + options = {} + weighted_host = least_cost.weighted_sum(fn_tuples, hostinfo_list, + options) + self.assertEqual(weighted_host.weight, 10000) self.assertEqual(weighted_host.host, 'host1') - - def test_get_cost_functions(self): - fns = least_cost.get_cost_fns() - self.assertEquals(len(fns), 1) - weight, fn = fns[0] - self.assertEquals(weight, 1.0) - hostinfo = zone_manager.HostInfo('host', free_ram_mb=1000) - self.assertEquals(1000, fn(hostinfo))