trunk merge with migration renumbering
@@ -114,7 +114,8 @@ def _process(func, zone):
 
 
 def call_zone_method(context, method_name, errors_to_ignore=None,
-                     novaclient_collection_name='zones', *args, **kwargs):
+                     novaclient_collection_name='zones', zones=None,
+                     *args, **kwargs):
     """Returns a list of (zone, call_result) objects."""
     if not isinstance(errors_to_ignore, (list, tuple)):
         # This will also handle the default None
@@ -122,7 +123,9 @@ def call_zone_method(context, method_name, errors_to_ignore=None,
 
     pool = greenpool.GreenPool()
     results = []
-    for zone in db.zone_get_all(context):
+    if zones is None:
+        zones = db.zone_get_all(context)
+    for zone in zones:
         try:
             nova = novaclient.OpenStack(zone.username, zone.password, None,
                                         zone.api_url)
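Note: the `zones=None` default keeps existing callers working (the zone list is still fetched from the DB when nothing is passed) while letting `select()` hand in a list it already fetched, so `db.zone_get_all()` runs once per request instead of once per call. A minimal standalone sketch of that lazy-default pattern; the stub below is illustrative, not Nova code:

def zone_get_all(context):
    # stand-in for db.zone_get_all
    return ['zone1', 'zone2']

def call_zone_method(context, method_name, zones=None):
    if zones is None:
        # caller didn't supply a list; fetch it ourselves
        zones = zone_get_all(context)
    return [(zone, method_name) for zone in zones]

print call_zone_method(None, 'select')                   # fetches zones itself
print call_zone_method(None, 'select', zones=['zoneX'])  # reuses caller's list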
@@ -162,32 +165,53 @@ def child_zone_helper(zone_list, func):
             _wrap_method(_process, func), zone_list)]
 
 
-def _issue_novaclient_command(nova, zone, collection, method_name, item_id):
+def _issue_novaclient_command(nova, zone, collection,
+                              method_name, *args, **kwargs):
     """Use novaclient to issue command to a single child zone.
-       One of these will be run in parallel for each child zone."""
+       One of these will be run in parallel for each child zone.
+    """
     manager = getattr(nova, collection)
-    result = None
-    try:
+
+    # NOTE(comstud): This is not ideal, but we have to do this based on
+    # how novaclient is implemented right now.
+    # 'find' is special cased as novaclient requires kwargs for it to
+    # filter on a 'get_all'.
+    # Every other method first needs to do a 'get' on the first argument
+    # passed, which should be a UUID.  If it's 'get' itself that we want,
+    # we just return the result.  Otherwise, we next call the real method
+    # that's wanted... passing other arguments that may or may not exist.
+    if method_name in ['find', 'findall']:
         try:
-            result = manager.get(int(item_id))
-        except ValueError, e:
-            result = manager.find(name=item_id)
+            return getattr(manager, method_name)(**kwargs)
+        except novaclient.NotFound:
+            url = zone.api_url
+            LOG.debug(_("%(collection)s.%(method_name)s didn't find "
+                        "anything matching '%(kwargs)s' on '%(url)s'" %
+                        locals()))
+            return None
+
+    args = list(args)
+    # pop off the UUID to look up
+    item = args.pop(0)
+    try:
+        result = manager.get(item)
     except novaclient.NotFound:
         url = zone.api_url
-        LOG.debug(_("%(collection)s '%(item_id)s' not found on '%(url)s'" %
+        LOG.debug(_("%(collection)s '%(item)s' not found on '%(url)s'" %
                     locals()))
         return None
 
-    if method_name.lower() not in ['get', 'find']:
-        result = getattr(result, method_name)()
+    if method_name.lower() != 'get':
+        # if we're doing something other than 'get', call it passing args.
+        result = getattr(result, method_name)(*args, **kwargs)
     return result
 
 
-def wrap_novaclient_function(f, collection, method_name, item_id):
-    """Appends collection, method_name and item_id to the incoming
+def wrap_novaclient_function(f, collection, method_name, *args, **kwargs):
+    """Appends collection, method_name and arguments to the incoming
     (nova, zone) call from child_zone_helper."""
     def inner(nova, zone):
-        return f(nova, zone, collection, method_name, item_id)
+        return f(nova, zone, collection, method_name, *args, **kwargs)
 
     return inner
 
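Note: the new dispatch is easy to misread in diff form. A standalone toy version (fake manager classes, not novaclient) showing the two paths: 'find'/'findall' forward kwargs directly, while anything else pops a UUID off args, does a get(), and then invokes the requested method on the result:

class FakeServer(object):
    def __init__(self, uuid):
        self.uuid = uuid

    def pause(self):
        return 'paused %s' % self.uuid

class FakeManager(object):
    def get(self, item):
        return FakeServer(item)

    def find(self, **kwargs):
        return 'found with %r' % (kwargs,)

def issue_command(manager, method_name, *args, **kwargs):
    if method_name in ['find', 'findall']:
        # 'find' variants filter on kwargs; no lookup step
        return getattr(manager, method_name)(**kwargs)
    args = list(args)
    item = args.pop(0)              # pop off the UUID to look up
    result = manager.get(item)
    if method_name.lower() != 'get':
        result = getattr(result, method_name)(*args, **kwargs)
    return result

print issue_command(FakeManager(), 'pause', 'abc-123')  # paused abc-123
print issue_command(FakeManager(), 'find', name='vm1')  # found with {'name': 'vm1'}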
@@ -220,7 +244,7 @@ class reroute_compute(object):
        the wrapped method. (This ensures that zone-local code can
        continue to use integer IDs).
 
-    4. If the item was not found, we delgate the call to a child zone
+    4. If the item was not found, we delegate the call to a child zone
        using the UUID.
     """
     def __init__(self, method_name):
@@ -328,8 +328,9 @@ class HostFilterScheduler(zone_aware_scheduler.ZoneAwareScheduler):
                          'instance_type': <InstanceType dict>}
     """
 
-    def filter_hosts(self, num, request_spec):
+    def filter_hosts(self, topic, request_spec, hosts=None):
         """Filter the full host list (from the ZoneManager)"""
+
         filter_name = request_spec.get('filter', None)
         host_filter = choose_host_filter(filter_name)
 
@@ -340,8 +341,9 @@ class HostFilterScheduler(zone_aware_scheduler.ZoneAwareScheduler):
         name, query = host_filter.instance_type_to_filter(instance_type)
         return host_filter.filter_hosts(self.zone_manager, query)
 
-    def weigh_hosts(self, num, request_spec, hosts):
+    def weigh_hosts(self, topic, request_spec, hosts):
         """Derived classes must override this method and return
            a lists of hosts in [{weight, hostname}] format.
         """
-        return [dict(weight=1, hostname=host) for host, caps in hosts]
+        return [dict(weight=1, hostname=hostname, capabilities=caps)
+                for hostname, caps in hosts]
@@ -48,25 +48,43 @@ def noop_cost_fn(host):
     return 1
 
 
-flags.DEFINE_integer('fill_first_cost_fn_weight', 1,
+flags.DEFINE_integer('compute_fill_first_cost_fn_weight', 1,
              'How much weight to give the fill-first cost function')
 
 
-def fill_first_cost_fn(host):
+def compute_fill_first_cost_fn(host):
     """Prefer hosts that have less ram available, filter_hosts will exclude
     hosts that don't have enough ram"""
     hostname, caps = host
-    free_mem = caps['compute']['host_memory_free']
+    free_mem = caps['host_memory_free']
     return free_mem
 
 
 class LeastCostScheduler(zone_aware_scheduler.ZoneAwareScheduler):
-    def get_cost_fns(self):
+    def __init__(self, *args, **kwargs):
+        self.cost_fns_cache = {}
+        super(LeastCostScheduler, self).__init__(*args, **kwargs)
+
+    def get_cost_fns(self, topic):
         """Returns a list of tuples containing weights and cost functions to
            use for weighing hosts
         """
 
+        if topic in self.cost_fns_cache:
+            return self.cost_fns_cache[topic]
+
         cost_fns = []
         for cost_fn_str in FLAGS.least_cost_scheduler_cost_functions:
+            if '.' in cost_fn_str:
+                short_name = cost_fn_str.split('.')[-1]
+            else:
+                short_name = cost_fn_str
+                cost_fn_str = "%s.%s.%s" % (
+                        __name__, self.__class__.__name__, short_name)
+
+            if not (short_name.startswith('%s_' % topic) or
+                    short_name.startswith('noop')):
+                continue
+
             try:
                 # NOTE(sirp): import_class is somewhat misnamed since it can
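Note: get_cost_fns() now selects cost functions by naming convention: for a given topic, only functions whose short name starts with '<topic>_' (or 'noop') are applied, and the result is cached per topic. A rough sketch of just the name matching; the volume entry is a made-up name for illustration:

def matching_cost_fn_names(cost_fn_strs, topic):
    matches = []
    for cost_fn_str in cost_fn_strs:
        short_name = cost_fn_str.split('.')[-1]
        if (short_name.startswith('%s_' % topic) or
                short_name.startswith('noop')):
            matches.append(short_name)
    return matches

names = ['nova.scheduler.least_cost.compute_fill_first_cost_fn',
         'nova.scheduler.least_cost.noop_cost_fn',
         'nova.scheduler.least_cost.volume_imaginary_cost_fn']
print matching_cost_fn_names(names, 'compute')
# ['compute_fill_first_cost_fn', 'noop_cost_fn']; the volume fn is skipped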
@@ -84,23 +102,23 @@ class LeastCostScheduler(zone_aware_scheduler.ZoneAwareScheduler):
 
             cost_fns.append((weight, cost_fn))
 
+        self.cost_fns_cache[topic] = cost_fns
         return cost_fns
 
-    def weigh_hosts(self, num, request_spec, hosts):
+    def weigh_hosts(self, topic, request_spec, hosts):
         """Returns a list of dictionaries of form:
-           [ {weight: weight, hostname: hostname} ]"""
+           [ {weight: weight, hostname: hostname, capabilities: capabs} ]
+        """
 
-        # FIXME(sirp): weigh_hosts should handle more than just instances
-        hostnames = [hostname for hostname, caps in hosts]
-
-        cost_fns = self.get_cost_fns()
+        cost_fns = self.get_cost_fns(topic)
         costs = weighted_sum(domain=hosts, weighted_fns=cost_fns)
 
         weighted = []
         weight_log = []
-        for cost, hostname in zip(costs, hostnames):
+        for cost, (hostname, caps) in zip(costs, hosts):
             weight_log.append("%s: %s" % (hostname, "%.2f" % cost))
-            weight_dict = dict(weight=cost, hostname=hostname)
+            weight_dict = dict(weight=cost, hostname=hostname,
+                               capabilities=caps)
             weighted.append(weight_dict)
 
         LOG.debug(_("Weighted Costs => %s") % weight_log)
@@ -127,7 +145,8 @@ def weighted_sum(domain, weighted_fns, normalize=True):
        weighted_fns - list of weights and functions like:
            [(weight, objective-functions)]
 
-    Returns an unsorted of scores. To pair with hosts do: zip(scores, hosts)
+    Returns an unsorted list of scores. To pair with hosts do:
+    zip(scores, hosts)
     """
     # Table of form:
     # { domain1: [score1, score2, ..., scoreM]
@@ -150,7 +169,6 @@ def weighted_sum(domain, weighted_fns, normalize=True):
     domain_scores = []
     for idx in sorted(score_table):
         elem_score = sum(score_table[idx])
-        elem = domain[idx]
         domain_scores.append(elem_score)
 
     return domain_scores
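Note: with the dead `elem = domain[idx]` store removed, weighted_sum() reduces to summing each element's weighted scores in index order. A toy worked example (ignoring the normalize flag) showing why zip(scores, hosts) pairs up correctly:

def weighted_sum(domain, weighted_fns):
    score_table = {}
    for weight, fn in weighted_fns:
        for idx, elem in enumerate(domain):
            score_table.setdefault(idx, []).append(weight * fn(elem))
    # one summed score per element, in domain order
    return [sum(score_table[idx]) for idx in sorted(score_table)]

hosts = [('host1', 1), ('host2', 2), ('host3', 3)]
fns = [(1, lambda host: host[1]),   # pretend cost: the host's second field
       (10, lambda host: 1)]        # noop-style constant cost
scores = weighted_sum(hosts, fns)
print zip(scores, hosts)
# [(11, ('host1', 1)), (12, ('host2', 2)), (13, ('host3', 3))]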
@@ -33,6 +33,7 @@ from nova import flags
 from nova import log as logging
 from nova import rpc
 
+from nova.compute import api as compute_api
 from nova.scheduler import api
 from nova.scheduler import driver
 
@@ -48,14 +49,25 @@ class InvalidBlob(exception.NovaException):
 class ZoneAwareScheduler(driver.Scheduler):
     """Base class for creating Zone Aware Schedulers."""
 
-    def _call_zone_method(self, context, method, specs):
+    def _call_zone_method(self, context, method, specs, zones):
         """Call novaclient zone method. Broken out for testing."""
-        return api.call_zone_method(context, method, specs=specs)
+        return api.call_zone_method(context, method, specs=specs, zones=zones)
 
-    def _provision_resource_locally(self, context, item, instance_id, kwargs):
+    def _provision_resource_locally(self, context, build_plan_item,
+                                    request_spec, kwargs):
         """Create the requested resource in this Zone."""
-        host = item['hostname']
+        host = build_plan_item['hostname']
+        base_options = request_spec['instance_properties']
+
+        # TODO(sandy): I guess someone needs to add block_device_mapping
+        # support at some point? Also, OS API has no concept of security
+        # groups.
+        instance = compute_api.API().create_db_entry_for_new_instance(context,
+                base_options, None, [])
+
+        instance_id = instance['id']
         kwargs['instance_id'] = instance_id
+
         rpc.cast(context,
                  db.queue_get_for(context, "compute", host),
                  {"method": "run_instance",
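Note: the significant change here is ordering. The scheduler now creates the instance's DB record itself and threads the new id into the compute cast, instead of receiving an instance_id from above. A toy stand-in for that flow; the fake create function and its return value are illustrative only:

def create_db_entry_for_new_instance(context, base_options):
    # illustrative fake; the real call is
    # compute_api.API().create_db_entry_for_new_instance(...)
    return {'id': 42}

def provision_locally(context, build_plan_item, request_spec, kwargs):
    host = build_plan_item['hostname']
    base_options = request_spec['instance_properties']
    instance = create_db_entry_for_new_instance(context, base_options)
    kwargs['instance_id'] = instance['id']
    return host, kwargs      # the real code issues rpc.cast(...) here

print provision_locally({}, {'hostname': 'host1'},
                        {'instance_properties': {}}, {})
# ('host1', {'instance_id': 42})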
@@ -115,8 +127,8 @@ class ZoneAwareScheduler(driver.Scheduler):
         nova.servers.create(name, image_ref, flavor_id, ipgroup, meta, files,
                             child_blob, reservation_id=reservation_id)
 
-    def _provision_resource_from_blob(self, context, item, instance_id,
-                                      request_spec, kwargs):
+    def _provision_resource_from_blob(self, context, build_plan_item,
+                                      instance_id, request_spec, kwargs):
         """Create the requested resource locally or in a child zone
            based on what is stored in the zone blob info.
 
@@ -132,12 +144,12 @@ class ZoneAwareScheduler(driver.Scheduler):
            request."""
 
         host_info = None
-        if "blob" in item:
+        if "blob" in build_plan_item:
             # Request was passed in from above. Is it for us?
-            host_info = self._decrypt_blob(item['blob'])
-        elif "child_blob" in item:
+            host_info = self._decrypt_blob(build_plan_item['blob'])
+        elif "child_blob" in build_plan_item:
             # Our immediate child zone provided this info ...
-            host_info = item
+            host_info = build_plan_item
 
         if not host_info:
             raise InvalidBlob()
@@ -147,19 +159,44 @@ class ZoneAwareScheduler(driver.Scheduler):
             self._ask_child_zone_to_create_instance(context, host_info,
                                                     request_spec, kwargs)
         else:
-            self._provision_resource_locally(context, host_info,
-                                             instance_id, kwargs)
+            self._provision_resource_locally(context, host_info, request_spec,
+                                             kwargs)
 
-    def _provision_resource(self, context, item, instance_id, request_spec,
-                            kwargs):
+    def _provision_resource(self, context, build_plan_item, instance_id,
+                            request_spec, kwargs):
         """Create the requested resource in this Zone or a child zone."""
-        if "hostname" in item:
-            self._provision_resource_locally(context, item, instance_id,
-                                             kwargs)
+        if "hostname" in build_plan_item:
+            self._provision_resource_locally(context, build_plan_item,
+                                             request_spec, kwargs)
             return
 
-        self._provision_resource_from_blob(context, item, instance_id,
-                                           request_spec, kwargs)
+        self._provision_resource_from_blob(context, build_plan_item,
+                                           instance_id, request_spec, kwargs)
+
+    def _adjust_child_weights(self, child_results, zones):
+        """Apply the Scale and Offset values from the Zone definition
+        to adjust the weights returned from the child zones. Alters
+        child_results in place.
+        """
+        for zone, result in child_results:
+            if not result:
+                continue
+
+            for zone_rec in zones:
+                if zone_rec['api_url'] != zone:
+                    continue
+
+                for item in result:
+                    try:
+                        offset = zone_rec['weight_offset']
+                        scale = zone_rec['weight_scale']
+                        raw_weight = item['weight']
+                        cooked_weight = offset + scale * raw_weight
+                        item['weight'] = cooked_weight
+                        item['raw_weight'] = raw_weight
+                    except KeyError:
+                        LOG.exception(_("Bad child zone scaling values "
+                                        "for Zone: %(zone)s") % locals())
 
     def schedule_run_instance(self, context, instance_id, request_spec,
                               *args, **kwargs):
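Note: the weight adjustment is just a per-item linear transform, cooked = offset + scale * raw. Plugging in the scale/offset values that the new test fixtures use further down:

def cook(offset, scale, raw_weight):
    return offset + scale * raw_weight

print cook(0.0, 1.0, 1)       # zone1-style record: 1.0 (unchanged)
print cook(1000.0, 1.0, 1)    # zone2-style record: 1001.0 (offset)
print cook(0.0, 1000.0, 3)    # zone3-style record: 3000.0 (scaled)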
@@ -180,18 +217,22 @@ class ZoneAwareScheduler(driver.Scheduler):
                                      request_spec, kwargs)
             return None
 
+        num_instances = request_spec.get('num_instances', 1)
+        LOG.debug(_("Attempting to build %(num_instances)d instance(s)") %
+                locals())
+
         # Create build plan and provision ...
         build_plan = self.select(context, request_spec)
         if not build_plan:
             raise driver.NoValidHost(_('No hosts were available'))
 
-        for num in xrange(request_spec['num_instances']):
+        for num in xrange(num_instances):
             if not build_plan:
                 break
 
-            item = build_plan.pop(0)
-            self._provision_resource(context, item, instance_id, request_spec,
-                                     kwargs)
+            build_plan_item = build_plan.pop(0)
+            self._provision_resource(context, build_plan_item, instance_id,
+                                     request_spec, kwargs)
 
         # Returning None short-circuits the routing to Compute (since
         # we've already done it here)
@@ -224,23 +265,43 @@ class ZoneAwareScheduler(driver.Scheduler):
             raise NotImplemented(_("Zone Aware Scheduler only understands "
                                    "Compute nodes (for now)"))
 
-        #TODO(sandy): how to infer this from OS API params?
-        num_instances = 1
+        num_instances = request_spec.get('num_instances', 1)
+        instance_type = request_spec['instance_type']
 
-        # Filter local hosts based on requirements ...
-        host_list = self.filter_hosts(num_instances, request_spec)
+        weighted = []
+        host_list = None
 
-        # TODO(sirp): weigh_hosts should also be a function of 'topic' or
-        # resources, so that we can apply different objective functions to it
+        for i in xrange(num_instances):
+            # Filter local hosts based on requirements ...
+            #
+            # The first pass through here will pass 'None' as the
+            # host_list.. which tells the filter to build the full
+            # list of hosts.
+            # On a 2nd pass, the filter can modify the host_list with
+            # any updates it needs to make based on resources that
+            # may have been consumed from a previous build..
+            host_list = self.filter_hosts(topic, request_spec, host_list)
+            if not host_list:
+                LOG.warn(_("Filter returned no hosts after processing "
+                        "%(i)d of %(num_instances)d instances") % locals())
+                break
 
-        # then weigh the selected hosts.
-        # weighted = [{weight=weight, name=hostname}, ...]
-        weighted = self.weigh_hosts(num_instances, request_spec, host_list)
+            # then weigh the selected hosts.
+            # weighted = [{weight=weight, hostname=hostname,
+            #              capabilities=capabs}, ...]
+            weights = self.weigh_hosts(topic, request_spec, host_list)
+            weights.sort(key=operator.itemgetter('weight'))
+            best_weight = weights[0]
+            weighted.append(best_weight)
+            self.consume_resources(topic, best_weight['capabilities'],
+                                   instance_type)
 
         # Next, tack on the best weights from the child zones ...
         json_spec = json.dumps(request_spec)
+        all_zones = db.zone_get_all(context)
         child_results = self._call_zone_method(context, "select",
-                specs=json_spec)
+                specs=json_spec, zones=all_zones)
+        self._adjust_child_weights(child_results, all_zones)
         for child_zone, result in child_results:
             for weighting in result:
                 # Remember the child_zone so we can get back to
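Note: select() now loops once per requested instance: filter, weigh, take the lightest host, and consume its resources so the next pass sees the reduced capacity. A compressed, self-contained sketch of that loop with fill-first style weights; all names here are stand-ins, not the real scheduler API:

import operator

GB = 1024 ** 3

def select(num_instances, hosts, mem_per_instance=GB):
    """Toy version of the per-instance filter/weigh/consume loop."""
    weighted = []
    for i in xrange(num_instances):
        # filter: drop hosts that can't fit another instance
        candidates = [(name, caps) for name, caps in hosts
                      if caps['host_memory_free'] >= mem_per_instance]
        if not candidates:
            break
        # weigh: fill-first style, least free memory wins (lowest weight)
        weights = [dict(weight=caps['host_memory_free'], hostname=name,
                        capabilities=caps) for name, caps in candidates]
        weights.sort(key=operator.itemgetter('weight'))
        best = weights[0]
        weighted.append(best)
        # consume: the winner has less memory on the next pass
        best['capabilities']['host_memory_free'] -= mem_per_instance
    return weighted

hosts = [('host1', {'host_memory_free': 2 * GB}),
         ('host2', {'host_memory_free': 1 * GB})]
print [item['hostname'] for item in select(3, hosts)]
# ['host2', 'host1', 'host1']: host2 fills up first, then host1 drains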
@@ -254,18 +315,65 @@ class ZoneAwareScheduler(driver.Scheduler):
         weighted.sort(key=operator.itemgetter('weight'))
         return weighted
 
-    def filter_hosts(self, num, request_spec):
-        """Derived classes must override this method and return
-           a list of hosts in [(hostname, capability_dict)] format.
+    def compute_filter(self, hostname, capabilities, request_spec):
+        """Return whether or not we can schedule to this compute node.
+        Derived classes should override this and return True if the host
+        is acceptable for scheduling.
         """
-        # NOTE(sirp): The default logic is the equivalent to AllHostsFilter
-        service_states = self.zone_manager.service_states
-        return [(host, services)
-                for host, services in service_states.iteritems()]
+        instance_type = request_spec['instance_type']
+        requested_mem = instance_type['memory_mb'] * 1024 * 1024
+        return capabilities['host_memory_free'] >= requested_mem
 
-    def weigh_hosts(self, num, request_spec, hosts):
+    def filter_hosts(self, topic, request_spec, host_list=None):
+        """Return a list of hosts which are acceptable for scheduling.
+        Return value should be a list of (hostname, capability_dict)s.
+        Derived classes may override this, but may find the
+        '<topic>_filter' function more appropriate.
+        """
+
+        def _default_filter(self, hostname, capabilities, request_spec):
+            """Default filter function if there's no <topic>_filter"""
+            # NOTE(sirp): The default logic is the equivalent to
+            # AllHostsFilter
+            return True
+
+        filter_func = getattr(self, '%s_filter' % topic, _default_filter)
+
+        if host_list is None:
+            first_run = True
+            host_list = self.zone_manager.service_states.iteritems()
+        else:
+            first_run = False
+
+        filtered_hosts = []
+        for host, services in host_list:
+            if first_run:
+                if topic not in services:
+                    continue
+                services = services[topic]
+            if filter_func(host, services, request_spec):
+                filtered_hosts.append((host, services))
+        return filtered_hosts
+
+    def weigh_hosts(self, topic, request_spec, hosts):
         """Derived classes may override this to provide more sophisticated
            scheduling objectives
         """
         # NOTE(sirp): The default logic is the same as the NoopCostFunction
-        return [dict(weight=1, hostname=host) for host, caps in hosts]
+        return [dict(weight=1, hostname=hostname, capabilities=capabilities)
+                for hostname, capabilities in hosts]
+
+    def compute_consume(self, capabilities, instance_type):
+        """Consume compute resources for selected host"""
+
+        requested_mem = max(instance_type['memory_mb'], 0) * 1024 * 1024
+        capabilities['host_memory_free'] -= requested_mem
+
+    def consume_resources(self, topic, capabilities, instance_type):
+        """Consume resources for a specific host.  'host' is a tuple
+        of the hostname and the services"""
+
+        consume_func = getattr(self, '%s_consume' % topic, None)
+        if not consume_func:
+            return
+        consume_func(capabilities, instance_type)
@@ -0,0 +1,19 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 Openstack LLC.
+# All Rights Reserved.
+#
+#    Licensed under the Apache License, Version 2.0 (the "License"); you may
+#    not use this file except in compliance with the License. You may obtain
+#    a copy of the License at
+#
+#         http://www.apache.org/licenses/LICENSE-2.0
+#
+#    Unless required by applicable law or agreed to in writing, software
+#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+#    License for the specific language governing permissions and limitations
+#    under the License.
+
+# NOTE(vish): this forces the fixtures from tests/__init.py:setup() to work
+from nova.tests import *
@@ -122,15 +122,16 @@ class LeastCostSchedulerTestCase(test.TestCase):
                     for hostname, caps in hosts]
         self.assertWeights(expected, num, request_spec, hosts)
 
-    def test_fill_first_cost_fn(self):
+    def test_compute_fill_first_cost_fn(self):
         FLAGS.least_cost_scheduler_cost_functions = [
-            'nova.scheduler.least_cost.fill_first_cost_fn',
+            'nova.scheduler.least_cost.compute_fill_first_cost_fn',
         ]
-        FLAGS.fill_first_cost_fn_weight = 1
+        FLAGS.compute_fill_first_cost_fn_weight = 1
 
         num = 1
-        request_spec = {}
-        hosts = self.sched.filter_hosts(num, request_spec)
+        instance_type = {'memory_mb': 1024}
+        request_spec = {'instance_type': instance_type}
+        hosts = self.sched.filter_hosts('compute', request_spec, None)
 
         expected = []
         for idx, (hostname, caps) in enumerate(hosts):
@@ -16,6 +16,8 @@
 Tests For Zone Aware Scheduler.
 """
 
+import nova.db
+
 from nova import exception
 from nova import test
 from nova.scheduler import driver
@@ -55,29 +57,21 @@ def fake_zone_manager_service_states(num_hosts):
 
 
 class FakeZoneAwareScheduler(zone_aware_scheduler.ZoneAwareScheduler):
-    def filter_hosts(self, num, specs):
-        # NOTE(sirp): this is returning [(hostname, services)]
-        return self.zone_manager.service_states.items()
-
-    def weigh_hosts(self, num, specs, hosts):
-        fake_weight = 99
-        weighted = []
-        for hostname, caps in hosts:
-            weighted.append(dict(weight=fake_weight, name=hostname))
-        return weighted
+    # No need to stub anything at the moment
+    pass
 
 
 class FakeZoneManager(zone_manager.ZoneManager):
     def __init__(self):
         self.service_states = {
             'host1': {
-                'compute': {'ram': 1000},
+                'compute': {'host_memory_free': 1073741824},
             },
             'host2': {
-                'compute': {'ram': 2000},
+                'compute': {'host_memory_free': 2147483648},
             },
             'host3': {
-                'compute': {'ram': 3000},
+                'compute': {'host_memory_free': 3221225472},
             },
         }
 
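Note: the fake capabilities move from an abstract 'ram' number to host_memory_free in bytes, matching what compute_filter compares against. The constants are exact gigabytes:

print 1024 ** 3        # 1073741824 -> host1: 1 GB free
print 2 * 1024 ** 3    # 2147483648 -> host2: 2 GB
print 3 * 1024 ** 3    # 3221225472 -> host3: 3 GB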
@@ -87,7 +81,7 @@ class FakeEmptyZoneManager(zone_manager.ZoneManager):
         self.service_states = {}
 
 
-def fake_empty_call_zone_method(context, method, specs):
+def fake_empty_call_zone_method(context, method, specs, zones):
     return []
 
 
@@ -106,7 +100,7 @@ def fake_ask_child_zone_to_create_instance(context, zone_info,
     was_called = True
 
 
-def fake_provision_resource_locally(context, item, instance_id, kwargs):
+def fake_provision_resource_locally(context, build_plan, request_spec, kwargs):
     global was_called
     was_called = True
 
@@ -126,7 +120,7 @@ def fake_decrypt_blob_returns_child_info(blob):
            'child_blob': True}  # values aren't important. Keys are.
 
 
-def fake_call_zone_method(context, method, specs):
+def fake_call_zone_method(context, method, specs, zones):
     return [
         ('zone1', [
             dict(weight=1, blob='AAAAAAA'),
@@ -149,28 +143,67 @@ def fake_call_zone_method(context, method, specs):
     ]
 
 
+def fake_zone_get_all(context):
+    return [
+        dict(id=1, api_url='zone1',
+             username='admin', password='password',
+             weight_offset=0.0, weight_scale=1.0),
+        dict(id=2, api_url='zone2',
+             username='admin', password='password',
+             weight_offset=1000.0, weight_scale=1.0),
+        dict(id=3, api_url='zone3',
+             username='admin', password='password',
+             weight_offset=0.0, weight_scale=1000.0),
+    ]
+
+
 class ZoneAwareSchedulerTestCase(test.TestCase):
     """Test case for Zone Aware Scheduler."""
 
     def test_zone_aware_scheduler(self):
         """
-        Create a nested set of FakeZones, ensure that a select call returns the
-        appropriate build plan.
+        Create a nested set of FakeZones, try to build multiple instances
+        and ensure that a select call returns the appropriate build plan.
         """
         sched = FakeZoneAwareScheduler()
         self.stubs.Set(sched, '_call_zone_method', fake_call_zone_method)
+        self.stubs.Set(nova.db, 'zone_get_all', fake_zone_get_all)
 
         zm = FakeZoneManager()
         sched.set_zone_manager(zm)
 
         fake_context = {}
-        build_plan = sched.select(fake_context, {})
+        build_plan = sched.select(fake_context,
+                                  {'instance_type': {'memory_mb': 512},
+                                   'num_instances': 4})
 
-        self.assertEqual(15, len(build_plan))
+        # 4 from local zones, 12 from remotes
+        self.assertEqual(16, len(build_plan))
 
-        hostnames = [plan_item['name']
-                     for plan_item in build_plan if 'name' in plan_item]
-        self.assertEqual(3, len(hostnames))
+        hostnames = [plan_item['hostname']
+                     for plan_item in build_plan if 'hostname' in plan_item]
+        # 4 local hosts
+        self.assertEqual(4, len(hostnames))
+
+    def test_adjust_child_weights(self):
+        """Make sure the weights returned by child zones are
+        properly adjusted based on the scale/offset in the zone
+        db entries.
+        """
+        sched = FakeZoneAwareScheduler()
+        child_results = fake_call_zone_method(None, None, None, None)
+        zones = fake_zone_get_all(None)
+        sched._adjust_child_weights(child_results, zones)
+        scaled = [130000, 131000, 132000, 3000]
+        for zone, results in child_results:
+            for item in results:
+                w = item['weight']
+                if zone == 'zone1':  # No change
+                    self.assertTrue(w < 1000.0)
+                if zone == 'zone2':  # Offset +1000
+                    self.assertTrue(w >= 1000.0 and w < 2000)
+                if zone == 'zone3':  # Scale x1000
+                    self.assertEqual(scaled.pop(0), w)
 
     def test_empty_zone_aware_scheduler(self):
         """
@@ -178,6 +211,7 @@ class ZoneAwareSchedulerTestCase(test.TestCase):
         """
         sched = FakeZoneAwareScheduler()
         self.stubs.Set(sched, '_call_zone_method', fake_empty_call_zone_method)
+        self.stubs.Set(nova.db, 'zone_get_all', fake_zone_get_all)
 
         zm = FakeEmptyZoneManager()
         sched.set_zone_manager(zm)
@@ -185,8 +219,7 @@ class ZoneAwareSchedulerTestCase(test.TestCase):
         fake_context = {}
         self.assertRaises(driver.NoValidHost, sched.schedule_run_instance,
                           fake_context, 1,
-                          dict(host_filter=None,
-                               request_spec={'instance_type': {}}))
+                          dict(host_filter=None, instance_type={}))
 
     def test_schedule_do_not_schedule_with_hint(self):
         """
@@ -130,7 +130,7 @@ class ComputeTestCase(test.TestCase):
         instance_ref = models.Instance()
         instance_ref['id'] = 1
         instance_ref['volumes'] = [vol1, vol2]
-        instance_ref['hostname'] = 'i-00000001'
+        instance_ref['hostname'] = 'hostname-1'
         instance_ref['host'] = 'dummy'
         return instance_ref
 
@@ -162,6 +162,18 @@ class ComputeTestCase(test.TestCase):
         db.security_group_destroy(self.context, group['id'])
         db.instance_destroy(self.context, ref[0]['id'])
 
+    def test_default_hostname_generator(self):
+        cases = [(None, 'server_1'), ('Hello, Server!', 'hello_server'),
+                 ('<}\x1fh\x10e\x08l\x02l\x05o\x12!{>', 'hello')]
+        for display_name, hostname in cases:
+            ref = self.compute_api.create(self.context,
+                instance_types.get_default_instance_type(), None,
+                display_name=display_name)
+            try:
+                self.assertEqual(ref[0]['hostname'], hostname)
+            finally:
+                db.instance_destroy(self.context, ref[0]['id'])
+
     def test_destroy_instance_disassociates_security_groups(self):
         """Make sure destroying disassociates security groups"""
         group = self._create_group()