Add some resource checking for memory available when scheduling
Various changes to d-sched to plan for scheduling on different topics, which cleans up some of the resource checking. Re-compute weights when building more than 1 instance, accounting for resources that would be consumed.
This commit is contained in:
@@ -305,8 +305,11 @@ class HostFilterScheduler(zone_aware_scheduler.ZoneAwareScheduler):
|
|||||||
'instance_type': <InstanceType dict>}
|
'instance_type': <InstanceType dict>}
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def filter_hosts(self, num, request_spec):
|
def filter_hosts(self, topic, request_spec, hosts):
|
||||||
"""Filter the full host list (from the ZoneManager)"""
|
"""Filter the full host list (from the ZoneManager)"""
|
||||||
|
|
||||||
|
if hosts:
|
||||||
|
return hosts
|
||||||
filter_name = request_spec.get('filter', None)
|
filter_name = request_spec.get('filter', None)
|
||||||
host_filter = choose_host_filter(filter_name)
|
host_filter = choose_host_filter(filter_name)
|
||||||
|
|
||||||
@@ -317,8 +320,9 @@ class HostFilterScheduler(zone_aware_scheduler.ZoneAwareScheduler):
|
|||||||
name, query = host_filter.instance_type_to_filter(instance_type)
|
name, query = host_filter.instance_type_to_filter(instance_type)
|
||||||
return host_filter.filter_hosts(self.zone_manager, query)
|
return host_filter.filter_hosts(self.zone_manager, query)
|
||||||
|
|
||||||
def weigh_hosts(self, num, request_spec, hosts):
|
def weigh_hosts(self, topic, request_spec, hosts):
|
||||||
"""Derived classes must override this method and return
|
"""Derived classes must override this method and return
|
||||||
a lists of hosts in [{weight, hostname}] format.
|
a lists of hosts in [{weight, hostname}] format.
|
||||||
"""
|
"""
|
||||||
return [dict(weight=1, hostname=host) for host, caps in hosts]
|
return [dict(weight=1, hostname=hostname, capabilities=caps)
|
||||||
|
for hostname, caps in hosts]
|
||||||
|
|||||||
@@ -48,25 +48,36 @@ def noop_cost_fn(host):
|
|||||||
return 1
|
return 1
|
||||||
|
|
||||||
|
|
||||||
flags.DEFINE_integer('fill_first_cost_fn_weight', 1,
|
flags.DEFINE_integer('compute_fill_first_cost_fn_weight', 1,
|
||||||
'How much weight to give the fill-first cost function')
|
'How much weight to give the fill-first cost function')
|
||||||
|
|
||||||
|
|
||||||
def fill_first_cost_fn(host):
|
def compute_fill_first_cost_fn(host):
|
||||||
"""Prefer hosts that have less ram available, filter_hosts will exclude
|
"""Prefer hosts that have less ram available, filter_hosts will exclude
|
||||||
hosts that don't have enough ram"""
|
hosts that don't have enough ram"""
|
||||||
hostname, caps = host
|
hostname, caps = host
|
||||||
free_mem = caps['compute']['host_memory_free']
|
free_mem = caps['host_memory_free']
|
||||||
return free_mem
|
return free_mem
|
||||||
|
|
||||||
|
|
||||||
class LeastCostScheduler(zone_aware_scheduler.ZoneAwareScheduler):
|
class LeastCostScheduler(zone_aware_scheduler.ZoneAwareScheduler):
|
||||||
def get_cost_fns(self):
|
def __init__(self, *args, **kwargs):
|
||||||
|
self.cost_fns_cache = {}
|
||||||
|
super(LeastCoastScheduler, self).__init__(*args, **kwargs)
|
||||||
|
|
||||||
|
def get_cost_fns(self, topic):
|
||||||
"""Returns a list of tuples containing weights and cost functions to
|
"""Returns a list of tuples containing weights and cost functions to
|
||||||
use for weighing hosts
|
use for weighing hosts
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
if topic in self.cost_fns_cache:
|
||||||
|
return self.cost_fns_cache[topic]
|
||||||
|
|
||||||
cost_fns = []
|
cost_fns = []
|
||||||
for cost_fn_str in FLAGS.least_cost_scheduler_cost_functions:
|
for cost_fn_str in FLAGS.least_cost_scheduler_cost_functions:
|
||||||
|
if not cost_fn_str.startswith('%s_' % topic) and \
|
||||||
|
not cost_fn_str.startswith('noop'):
|
||||||
|
continue
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# NOTE(sirp): import_class is somewhat misnamed since it can
|
# NOTE(sirp): import_class is somewhat misnamed since it can
|
||||||
@@ -84,23 +95,23 @@ class LeastCostScheduler(zone_aware_scheduler.ZoneAwareScheduler):
|
|||||||
|
|
||||||
cost_fns.append((weight, cost_fn))
|
cost_fns.append((weight, cost_fn))
|
||||||
|
|
||||||
|
self.cost_fns_cache[topic] = cost_fns
|
||||||
return cost_fns
|
return cost_fns
|
||||||
|
|
||||||
def weigh_hosts(self, num, request_spec, hosts):
|
def weigh_hosts(self, topic, request_spec, hosts):
|
||||||
"""Returns a list of dictionaries of form:
|
"""Returns a list of dictionaries of form:
|
||||||
[ {weight: weight, hostname: hostname} ]"""
|
[ {weight: weight, hostname: hostname, capabilities: capabs} ]
|
||||||
|
"""
|
||||||
|
|
||||||
# FIXME(sirp): weigh_hosts should handle more than just instances
|
cost_fns = self.get_cost_fns(topic)
|
||||||
hostnames = [hostname for hostname, caps in hosts]
|
|
||||||
|
|
||||||
cost_fns = self.get_cost_fns()
|
|
||||||
costs = weighted_sum(domain=hosts, weighted_fns=cost_fns)
|
costs = weighted_sum(domain=hosts, weighted_fns=cost_fns)
|
||||||
|
|
||||||
weighted = []
|
weighted = []
|
||||||
weight_log = []
|
weight_log = []
|
||||||
for cost, hostname in zip(costs, hostnames):
|
for cost, (hostname, caps) in zip(costs, hosts):
|
||||||
weight_log.append("%s: %s" % (hostname, "%.2f" % cost))
|
weight_log.append("%s: %s" % (hostname, "%.2f" % cost))
|
||||||
weight_dict = dict(weight=cost, hostname=hostname)
|
weight_dict = dict(weight=cost, hostname=hostname,
|
||||||
|
capabilities=caps)
|
||||||
weighted.append(weight_dict)
|
weighted.append(weight_dict)
|
||||||
|
|
||||||
LOG.debug(_("Weighted Costs => %s") % weight_log)
|
LOG.debug(_("Weighted Costs => %s") % weight_log)
|
||||||
@@ -127,7 +138,8 @@ def weighted_sum(domain, weighted_fns, normalize=True):
|
|||||||
weighted_fns - list of weights and functions like:
|
weighted_fns - list of weights and functions like:
|
||||||
[(weight, objective-functions)]
|
[(weight, objective-functions)]
|
||||||
|
|
||||||
Returns an unsorted of scores. To pair with hosts do: zip(scores, hosts)
|
Returns an unsorted list of scores. To pair with hosts do:
|
||||||
|
zip(scores, hosts)
|
||||||
"""
|
"""
|
||||||
# Table of form:
|
# Table of form:
|
||||||
# { domain1: [score1, score2, ..., scoreM]
|
# { domain1: [score1, score2, ..., scoreM]
|
||||||
@@ -150,7 +162,6 @@ def weighted_sum(domain, weighted_fns, normalize=True):
|
|||||||
domain_scores = []
|
domain_scores = []
|
||||||
for idx in sorted(score_table):
|
for idx in sorted(score_table):
|
||||||
elem_score = sum(score_table[idx])
|
elem_score = sum(score_table[idx])
|
||||||
elem = domain[idx]
|
|
||||||
domain_scores.append(elem_score)
|
domain_scores.append(elem_score)
|
||||||
|
|
||||||
return domain_scores
|
return domain_scores
|
||||||
|
|||||||
@@ -224,18 +224,34 @@ class ZoneAwareScheduler(driver.Scheduler):
|
|||||||
raise NotImplemented(_("Zone Aware Scheduler only understands "
|
raise NotImplemented(_("Zone Aware Scheduler only understands "
|
||||||
"Compute nodes (for now)"))
|
"Compute nodes (for now)"))
|
||||||
|
|
||||||
#TODO(sandy): how to infer this from OS API params?
|
num_instances = request_spec['num_instances']
|
||||||
num_instances = 1
|
instance_type = request_spec['instance_type']
|
||||||
|
|
||||||
# Filter local hosts based on requirements ...
|
weighted = []
|
||||||
host_list = self.filter_hosts(num_instances, request_spec)
|
host_list = None
|
||||||
|
|
||||||
# TODO(sirp): weigh_hosts should also be a function of 'topic' or
|
for i in xrange(num_instances):
|
||||||
# resources, so that we can apply different objective functions to it
|
# Filter local hosts based on requirements ...
|
||||||
|
#
|
||||||
|
# The first pass through here will pass 'None' as the
|
||||||
|
# host_list.. which tells the filter to build the full
|
||||||
|
# list of hosts.
|
||||||
|
# On a 2nd pass, the filter can modify the host_list with
|
||||||
|
# any updates it needs to make based on resources that
|
||||||
|
# may have been consumed from a previous build..
|
||||||
|
host_list = self.filter_hosts(topic, request_spec, host_list)
|
||||||
|
if not host_list:
|
||||||
|
break
|
||||||
|
|
||||||
# then weigh the selected hosts.
|
# then weigh the selected hosts.
|
||||||
# weighted = [{weight=weight, name=hostname}, ...]
|
# weighted = [{weight=weight, hostname=hostname,
|
||||||
weighted = self.weigh_hosts(num_instances, request_spec, host_list)
|
# capabilities=capabs}, ...]
|
||||||
|
weights = self.weigh_hosts(topic, request_spec, host_list)
|
||||||
|
weights.sort(key=operator.itemgetter('weight'))
|
||||||
|
best_weight = weights[0]
|
||||||
|
weighted.append(best_weight)
|
||||||
|
self.consume_resources(best_weight['capabilities'],
|
||||||
|
instance_type)
|
||||||
|
|
||||||
# Next, tack on the best weights from the child zones ...
|
# Next, tack on the best weights from the child zones ...
|
||||||
json_spec = json.dumps(request_spec)
|
json_spec = json.dumps(request_spec)
|
||||||
@@ -254,18 +270,58 @@ class ZoneAwareScheduler(driver.Scheduler):
|
|||||||
weighted.sort(key=operator.itemgetter('weight'))
|
weighted.sort(key=operator.itemgetter('weight'))
|
||||||
return weighted
|
return weighted
|
||||||
|
|
||||||
def filter_hosts(self, num, request_spec):
|
def compute_filter(self, hostname, capabilities, request_spec):
|
||||||
"""Derived classes must override this method and return
|
"""Return whether or not we can schedule to this compute node.
|
||||||
a list of hosts in [(hostname, capability_dict)] format.
|
Derived classes should override this and return True if the host
|
||||||
|
is acceptable for scheduling.
|
||||||
"""
|
"""
|
||||||
# NOTE(sirp): The default logic is the equivalent to AllHostsFilter
|
instance_type = request_spec['instance_type']
|
||||||
service_states = self.zone_manager.service_states
|
reqested_mem = instance_type['memory_mb']
|
||||||
return [(host, services)
|
return capabilities['host_memory_free'] >= requested_mem
|
||||||
for host, services in service_states.iteritems()]
|
|
||||||
|
|
||||||
def weigh_hosts(self, num, request_spec, hosts):
|
def filter_hosts(self, topic, request_spec, host_list=None):
|
||||||
|
"""Return a list of hosts which are acceptable for scheduling.
|
||||||
|
Return value should be a list of (hostname, capability_dict)s.
|
||||||
|
Derived classes may override this, but may find the
|
||||||
|
'<topic>_filter' function more appropriate.
|
||||||
|
"""
|
||||||
|
|
||||||
|
def _default_filter(self, hostname, capabilities, request_spec):
|
||||||
|
"""Default filter function if there's no <topic>_filter"""
|
||||||
|
# NOTE(sirp): The default logic is the equivalent to
|
||||||
|
# AllHostsFilter
|
||||||
|
return True
|
||||||
|
|
||||||
|
filter_func = getattr(self, '%s_filter' % topic, _default_filter)
|
||||||
|
|
||||||
|
filtered_hosts = []
|
||||||
|
if host_list is None:
|
||||||
|
host_list = self.zone_manager.service_states.iteritems()
|
||||||
|
for host, services in host_list:
|
||||||
|
if topic not in services:
|
||||||
|
continue
|
||||||
|
if filter_func(host, services['topic'], request_spec):
|
||||||
|
filtered_hosts.append((host, services['topic']))
|
||||||
|
|
||||||
|
def weigh_hosts(self, topic, request_spec, hosts):
|
||||||
"""Derived classes may override this to provide more sophisticated
|
"""Derived classes may override this to provide more sophisticated
|
||||||
scheduling objectives
|
scheduling objectives
|
||||||
"""
|
"""
|
||||||
# NOTE(sirp): The default logic is the same as the NoopCostFunction
|
# NOTE(sirp): The default logic is the same as the NoopCostFunction
|
||||||
return [dict(weight=1, hostname=host) for host, caps in hosts]
|
return [dict(weight=1, hostname=hostname, capabilities=capabilities)
|
||||||
|
for hostname, capabilities in hosts]
|
||||||
|
|
||||||
|
def compute_consume(self, capabilities, instance_type):
|
||||||
|
"""Consume compute resources for selected host"""
|
||||||
|
|
||||||
|
requested_mem = max(instance_type['memory_mb'], 0)
|
||||||
|
capabilities['host_memory_free'] -= requested_mem
|
||||||
|
|
||||||
|
def consume_resources(self, topic, capabilities, instance_type):
|
||||||
|
"""Consume resources for a specific host. 'host' is a tuple
|
||||||
|
of the hostname and the services"""
|
||||||
|
|
||||||
|
consume_func = getattr(self, '%s_consume' % topic, None)
|
||||||
|
if not consume_func:
|
||||||
|
return
|
||||||
|
consume_func(capabilities, instance_type)
|
||||||
|
|||||||
Reference in New Issue
Block a user