trunk merge

Sandy Walsh
2011-06-30 11:11:03 -07:00
9 changed files with 206 additions and 83 deletions


@@ -617,7 +617,7 @@ class VmCommands(object):
:param host: show all instance on specified host.
:param instance: show specificed instance.
"""
print "%-10s %-15s %-10s %-10s %-19s %-12s %-12s %-12s" \
print "%-10s %-15s %-10s %-10s %-26s %-9s %-9s %-9s" \
" %-10s %-10s %-10s %-5s" % (
_('instance'),
_('node'),
@@ -639,14 +639,14 @@ class VmCommands(object):
context.get_admin_context(), host)
for instance in instances:
print "%-10s %-15s %-10s %-10s %-19s %-12s %-12s %-12s" \
print "%-10s %-15s %-10s %-10s %-26s %-9s %-9s %-9s" \
" %-10s %-10s %-10s %-5d" % (
instance['hostname'],
instance['host'],
instance['instance_type'],
instance['instance_type'].name,
instance['state_description'],
instance['launched_at'],
instance['image_id'],
instance['image_ref'],
instance['kernel_id'],
instance['ramdisk_id'],
instance['project_id'],
@@ -878,7 +878,7 @@ class InstanceTypeCommands(object):
try:
instance_types.create(name, memory, vcpus, local_gb,
flavorid, swap, rxtx_quota, rxtx_cap)
except exception.InvalidInput:
except exception.InvalidInput, e:
print "Must supply valid parameters to create instance_type"
print e
sys.exit(1)
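
The nova-manage change above simply captures the raised InvalidInput so its message can be echoed to the operator. A minimal standalone sketch of the same pattern, using a stand-in exception and create function rather than nova's real instance_types module (the Python 2 comma form "except exception.InvalidInput, e:" is equivalent to the "as e" spelling used here):

import sys


class InvalidInput(Exception):
    """Stand-in for nova.exception.InvalidInput."""


def create_instance_type(name, memory, vcpus):
    # Hypothetical validation; the real instance_types.create() checks more.
    if memory <= 0 or vcpus <= 0:
        raise InvalidInput("memory and vcpus must be positive integers")
    return dict(name=name, memory_mb=memory, vcpus=vcpus)


try:
    create_instance_type("m1.bogus", -1, 0)
except InvalidInput as e:
    print("Must supply valid parameters to create instance_type")
    print(e)
    sys.exit(1)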


@@ -275,6 +275,11 @@ class FanoutAdapterConsumer(AdapterConsumer):
unique = uuid.uuid4().hex
self.queue = '%s_fanout_%s' % (topic, unique)
self.durable = False
# Fanout creates unique queue names, so we should auto-remove
# them when done, so they're not left around on restart.
# Also, we're the only one that should be consuming. exclusive
# implies auto_delete, so we'll just set that..
self.exclusive = True
LOG.info(_('Created "%(exchange)s" fanout exchange '
'with "%(key)s" routing key'),
dict(exchange=self.exchange, key=self.routing_key))
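
The comment above is the gist of this change: every fanout consumer gets its own uniquely named queue, and marking it exclusive means the broker drops it when the consumer disconnects instead of leaving it around across restarts. A small illustrative sketch of that naming/flag scheme (a plain object, not nova's actual consumer class; the '%s_fanout' exchange name is assumed from context):

import uuid


class FanoutQueueConfig(object):
    def __init__(self, topic):
        unique = uuid.uuid4().hex
        self.exchange = '%s_fanout' % topic             # shared fanout exchange
        self.queue = '%s_fanout_%s' % (topic, unique)   # per-consumer queue name
        self.durable = False    # no need to survive a broker restart
        self.exclusive = True   # implies auto_delete: the queue disappears
                                # when this consumer's connection closes


config = FanoutQueueConfig('scheduler')
print(config.queue)   # e.g. scheduler_fanout_9f3b2c...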


@@ -165,32 +165,53 @@ def child_zone_helper(zone_list, func):
_wrap_method(_process, func), zone_list)]
def _issue_novaclient_command(nova, zone, collection, method_name, item_id):
def _issue_novaclient_command(nova, zone, collection,
method_name, *args, **kwargs):
"""Use novaclient to issue command to a single child zone.
One of these will be run in parallel for each child zone."""
One of these will be run in parallel for each child zone.
"""
manager = getattr(nova, collection)
result = None
try:
# NOTE(comstud): This is not ideal, but we have to do this based on
# how novaclient is implemented right now.
# 'find' is special cased as novaclient requires kwargs for it to
# filter on a 'get_all'.
# Every other method first needs to do a 'get' on the first argument
# passed, which should be a UUID. If it's 'get' itself that we want,
# we just return the result. Otherwise, we next call the real method
# that's wanted... passing other arguments that may or may not exist.
if method_name in ['find', 'findall']:
try:
result = manager.get(int(item_id))
except ValueError, e:
result = manager.find(name=item_id)
return getattr(manager, method_name)(**kwargs)
except novaclient.NotFound:
url = zone.api_url
LOG.debug(_("%(collection)s.%(method_name)s didn't find "
"anything matching '%(kwargs)s' on '%(url)s'" %
locals()))
return None
args = list(args)
# pop off the UUID to look up
item = args.pop(0)
try:
result = manager.get(item)
except novaclient.NotFound:
url = zone.api_url
LOG.debug(_("%(collection)s '%(item_id)s' not found on '%(url)s'" %
LOG.debug(_("%(collection)s '%(item)s' not found on '%(url)s'" %
locals()))
return None
if method_name.lower() not in ['get', 'find']:
result = getattr(result, method_name)()
if method_name.lower() != 'get':
# if we're doing something other than 'get', call it passing args.
result = getattr(result, method_name)(*args, **kwargs)
return result
def wrap_novaclient_function(f, collection, method_name, item_id):
"""Appends collection, method_name and item_id to the incoming
def wrap_novaclient_function(f, collection, method_name, *args, **kwargs):
"""Appends collection, method_name and arguments to the incoming
(nova, zone) call from child_zone_helper."""
def inner(nova, zone):
return f(nova, zone, collection, method_name, item_id)
return f(nova, zone, collection, method_name, *args, **kwargs)
return inner
@@ -223,7 +244,7 @@ class reroute_compute(object):
the wrapped method. (This ensures that zone-local code can
continue to use integer IDs).
4. If the item was not found, we delgate the call to a child zone
4. If the item was not found, we delegate the call to a child zone
using the UUID.
"""
def __init__(self, method_name):
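
To make the new dispatch rules concrete: 'find'/'findall' pass keyword arguments straight to the manager, everything else looks the item up by its first positional argument and then (unless the method is 'get' itself) calls the named method on the result with the remaining arguments. A self-contained sketch using fake manager objects instead of novaclient, with the NotFound handling omitted:

class FakeServer(object):
    def __init__(self, uuid, name):
        self.uuid, self.name = uuid, name

    def pause(self):
        return 'paused %s' % self.uuid


class FakeServerManager(object):
    def __init__(self, servers):
        self._servers = servers

    def get(self, item):
        return self._servers[item]

    def findall(self, **kwargs):
        return [s for s in self._servers.values()
                if all(getattr(s, k) == v for k, v in kwargs.items())]


def issue_command(manager, method_name, *args, **kwargs):
    # 'find'/'findall' filter purely on kwargs; everything else first looks
    # the item up by its identifier, then calls the method on the result.
    if method_name in ('find', 'findall'):
        return getattr(manager, method_name)(**kwargs)
    args = list(args)
    result = manager.get(args.pop(0))
    if method_name.lower() != 'get':
        result = getattr(result, method_name)(*args, **kwargs)
    return result


manager = FakeServerManager({'abc-123': FakeServer('abc-123', 'web-1')})
print(issue_command(manager, 'get', 'abc-123').name)         # web-1
print(issue_command(manager, 'pause', 'abc-123'))            # paused abc-123
print(len(issue_command(manager, 'findall', name='web-1')))  # 1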


@@ -329,8 +329,9 @@ class HostFilterScheduler(zone_aware_scheduler.ZoneAwareScheduler):
'instance_type': <InstanceType dict>}
"""
def filter_hosts(self, num, request_spec):
def filter_hosts(self, topic, request_spec, hosts=None):
"""Filter the full host list (from the ZoneManager)"""
filter_name = request_spec.get('filter', None)
host_filter = choose_host_filter(filter_name)
@@ -341,8 +342,9 @@ class HostFilterScheduler(zone_aware_scheduler.ZoneAwareScheduler):
name, query = host_filter.instance_type_to_filter(instance_type)
return host_filter.filter_hosts(self.zone_manager, query)
def weigh_hosts(self, num, request_spec, hosts):
def weigh_hosts(self, topic, request_spec, hosts):
"""Derived classes must override this method and return
a lists of hosts in [{weight, hostname}] format.
"""
return [dict(weight=1, hostname=host) for host, caps in hosts]
return [dict(weight=1, hostname=hostname, capabilities=caps)
for hostname, caps in hosts]
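
For reference, each element the weighing functions now return carries the host's capability dictionary alongside its weight and hostname, roughly (values here are placeholders):

entry = {'weight': 1,
         'hostname': 'host1',
         'capabilities': {'host_memory_free': 1073741824}}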


@@ -48,25 +48,43 @@ def noop_cost_fn(host):
return 1
flags.DEFINE_integer('fill_first_cost_fn_weight', 1,
flags.DEFINE_integer('compute_fill_first_cost_fn_weight', 1,
'How much weight to give the fill-first cost function')
def fill_first_cost_fn(host):
def compute_fill_first_cost_fn(host):
"""Prefer hosts that have less ram available, filter_hosts will exclude
hosts that don't have enough ram"""
hostname, caps = host
free_mem = caps['compute']['host_memory_free']
free_mem = caps['host_memory_free']
return free_mem
class LeastCostScheduler(zone_aware_scheduler.ZoneAwareScheduler):
def get_cost_fns(self):
def __init__(self, *args, **kwargs):
self.cost_fns_cache = {}
super(LeastCostScheduler, self).__init__(*args, **kwargs)
def get_cost_fns(self, topic):
"""Returns a list of tuples containing weights and cost functions to
use for weighing hosts
"""
if topic in self.cost_fns_cache:
return self.cost_fns_cache[topic]
cost_fns = []
for cost_fn_str in FLAGS.least_cost_scheduler_cost_functions:
if '.' in cost_fn_str:
short_name = cost_fn_str.split('.')[-1]
else:
short_name = cost_fn_str
cost_fn_str = "%s.%s.%s" % (
__name__, self.__class__.__name__, short_name)
if not (short_name.startswith('%s_' % topic) or
short_name.startswith('noop')):
continue
try:
# NOTE(sirp): import_class is somewhat misnamed since it can
@@ -84,23 +102,23 @@ class LeastCostScheduler(zone_aware_scheduler.ZoneAwareScheduler):
cost_fns.append((weight, cost_fn))
self.cost_fns_cache[topic] = cost_fns
return cost_fns
def weigh_hosts(self, num, request_spec, hosts):
def weigh_hosts(self, topic, request_spec, hosts):
"""Returns a list of dictionaries of form:
[ {weight: weight, hostname: hostname} ]"""
[ {weight: weight, hostname: hostname, capabilities: capabs} ]
"""
# FIXME(sirp): weigh_hosts should handle more than just instances
hostnames = [hostname for hostname, caps in hosts]
cost_fns = self.get_cost_fns()
cost_fns = self.get_cost_fns(topic)
costs = weighted_sum(domain=hosts, weighted_fns=cost_fns)
weighted = []
weight_log = []
for cost, hostname in zip(costs, hostnames):
for cost, (hostname, caps) in zip(costs, hosts):
weight_log.append("%s: %s" % (hostname, "%.2f" % cost))
weight_dict = dict(weight=cost, hostname=hostname)
weight_dict = dict(weight=cost, hostname=hostname,
capabilities=caps)
weighted.append(weight_dict)
LOG.debug(_("Weighted Costs => %s") % weight_log)
@@ -127,7 +145,8 @@ def weighted_sum(domain, weighted_fns, normalize=True):
weighted_fns - list of weights and functions like:
[(weight, objective-functions)]
Returns an unsorted of scores. To pair with hosts do: zip(scores, hosts)
Returns an unsorted list of scores. To pair with hosts do:
zip(scores, hosts)
"""
# Table of form:
# { domain1: [score1, score2, ..., scoreM]
@@ -150,7 +169,6 @@ def weighted_sum(domain, weighted_fns, normalize=True):
domain_scores = []
for idx in sorted(score_table):
elem_score = sum(score_table[idx])
elem = domain[idx]
domain_scores.append(elem_score)
return domain_scores
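
A hedged, simplified restatement of how weighted_sum combines the cost functions (the real function's normalization is left out): each host's score is the weighted sum of every cost function applied to it, and the returned list lines up index-for-index with the input hosts, so zip(scores, hosts) pairs them back up.

def weighted_sum(domain, weighted_fns):
    # Simplified stand-in for the real function; no normalization here.
    return [sum(weight * fn(elem) for weight, fn in weighted_fns)
            for elem in domain]


def compute_fill_first_cost_fn(host):
    hostname, caps = host
    return caps['host_memory_free']


hosts = [('host1', {'host_memory_free': 1 * 1024 ** 3}),
         ('host2', {'host_memory_free': 2 * 1024 ** 3})]
costs = weighted_sum(hosts, [(1, compute_fill_first_cost_fn)])
print(costs)   # [1073741824, 2147483648] -> host1 (least free RAM) wins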


@@ -217,12 +217,16 @@ class ZoneAwareScheduler(driver.Scheduler):
request_spec, kwargs)
return None
num_instances = request_spec.get('num_instances', 1)
LOG.debug(_("Attempting to build %(num_instances)d instance(s)") %
locals())
# Create build plan and provision ...
build_plan = self.select(context, request_spec)
if not build_plan:
raise driver.NoValidHost(_('No hosts were available'))
for num in xrange(request_spec['num_instances']):
for num in xrange(num_instances):
if not build_plan:
break
@@ -261,18 +265,36 @@ class ZoneAwareScheduler(driver.Scheduler):
raise NotImplemented(_("Zone Aware Scheduler only understands "
"Compute nodes (for now)"))
#TODO(sandy): how to infer this from OS API params?
num_instances = 1
num_instances = request_spec.get('num_instances', 1)
instance_type = request_spec['instance_type']
# Filter local hosts based on requirements ...
host_list = self.filter_hosts(num_instances, request_spec)
weighted = []
host_list = None
# TODO(sirp): weigh_hosts should also be a function of 'topic' or
# resources, so that we can apply different objective functions to it
for i in xrange(num_instances):
# Filter local hosts based on requirements ...
#
# The first pass through here will pass 'None' as the
# host_list.. which tells the filter to build the full
# list of hosts.
# On a 2nd pass, the filter can modify the host_list with
# any updates it needs to make based on resources that
# may have been consumed from a previous build..
host_list = self.filter_hosts(topic, request_spec, host_list)
if not host_list:
LOG.warn(_("Filter returned no hosts after processing "
"%(i)d of %(num_instances)d instances") % locals())
break
# then weigh the selected hosts.
# weighted = [{weight=weight, name=hostname}, ...]
weighted = self.weigh_hosts(num_instances, request_spec, host_list)
# then weigh the selected hosts.
# weighted = [{weight=weight, hostname=hostname,
# capabilities=capabs}, ...]
weights = self.weigh_hosts(topic, request_spec, host_list)
weights.sort(key=operator.itemgetter('weight'))
best_weight = weights[0]
weighted.append(best_weight)
self.consume_resources(topic, best_weight['capabilities'],
instance_type)
# Next, tack on the best weights from the child zones ...
json_spec = json.dumps(request_spec)
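
Condensed, the new per-instance logic above is a greedy filter / weigh / pick-best / consume loop. A self-contained sketch with stand-in filter, weigh and consume functions (not the scheduler's real methods), assuming a lower weight means a better host:

import operator


def filter_hosts(request_spec, host_list):
    # Stand-in filter: keep hosts with enough free memory for the instance.
    needed = request_spec['instance_type']['memory_mb'] * 1024 * 1024
    return [(name, caps) for name, caps in host_list
            if caps['host_memory_free'] >= needed]


def weigh_hosts(request_spec, hosts):
    # Stand-in "fill first" weighing: less free memory, lower weight, better.
    return [dict(weight=caps['host_memory_free'], hostname=name,
                 capabilities=caps)
            for name, caps in hosts]


def consume(capabilities, instance_type):
    capabilities['host_memory_free'] -= instance_type['memory_mb'] * 1024 * 1024


def build_local_plan(request_spec, all_hosts):
    host_list = list(all_hosts)
    weighted = []
    for _ in range(request_spec.get('num_instances', 1)):
        host_list = filter_hosts(request_spec, host_list)
        if not host_list:
            break
        weights = weigh_hosts(request_spec, host_list)
        weights.sort(key=operator.itemgetter('weight'))
        best = weights[0]
        weighted.append(best)
        consume(best['capabilities'], request_spec['instance_type'])
    return weighted


hosts = [('host1', {'host_memory_free': 2 * 1024 ** 3}),
         ('host2', {'host_memory_free': 1 * 1024 ** 3})]
spec = {'instance_type': {'memory_mb': 1024}, 'num_instances': 3}
print([w['hostname'] for w in build_local_plan(spec, hosts)])
# -> ['host2', 'host1', 'host1']: host2 fills up first, then host1 twice.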
@@ -293,18 +315,65 @@ class ZoneAwareScheduler(driver.Scheduler):
weighted.sort(key=operator.itemgetter('weight'))
return weighted
def filter_hosts(self, num, request_spec):
"""Derived classes must override this method and return
a list of hosts in [(hostname, capability_dict)] format.
def compute_filter(self, hostname, capabilities, request_spec):
"""Return whether or not we can schedule to this compute node.
Derived classes should override this and return True if the host
is acceptable for scheduling.
"""
# NOTE(sirp): The default logic is the equivalent to AllHostsFilter
service_states = self.zone_manager.service_states
return [(host, services)
for host, services in service_states.iteritems()]
instance_type = request_spec['instance_type']
requested_mem = instance_type['memory_mb'] * 1024 * 1024
return capabilities['host_memory_free'] >= requested_mem
def weigh_hosts(self, num, request_spec, hosts):
def filter_hosts(self, topic, request_spec, host_list=None):
"""Return a list of hosts which are acceptable for scheduling.
Return value should be a list of (hostname, capability_dict)s.
Derived classes may override this, but may find the
'<topic>_filter' function more appropriate.
"""
def _default_filter(self, hostname, capabilities, request_spec):
"""Default filter function if there's no <topic>_filter"""
# NOTE(sirp): The default logic is the equivalent to
# AllHostsFilter
return True
filter_func = getattr(self, '%s_filter' % topic, _default_filter)
if host_list is None:
first_run = True
host_list = self.zone_manager.service_states.iteritems()
else:
first_run = False
filtered_hosts = []
for host, services in host_list:
if first_run:
if topic not in services:
continue
services = services[topic]
if filter_func(host, services, request_spec):
filtered_hosts.append((host, services))
return filtered_hosts
def weigh_hosts(self, topic, request_spec, hosts):
"""Derived classes may override this to provide more sophisticated
scheduling objectives
"""
# NOTE(sirp): The default logic is the same as the NoopCostFunction
return [dict(weight=1, hostname=host) for host, caps in hosts]
return [dict(weight=1, hostname=hostname, capabilities=capabilities)
for hostname, capabilities in hosts]
def compute_consume(self, capabilities, instance_type):
"""Consume compute resources for selected host"""
requested_mem = max(instance_type['memory_mb'], 0) * 1024 * 1024
capabilities['host_memory_free'] -= requested_mem
def consume_resources(self, topic, capabilities, instance_type):
"""Consume resources for a specific host. 'host' is a tuple
of the hostname and the services"""
consume_func = getattr(self, '%s_consume' % topic, None)
if not consume_func:
return
consume_func(capabilities, instance_type)
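
The '%s_filter' / '%s_consume' lookups mean a scheduler subclass gains per-topic behaviour simply by defining methods with those names; unknown topics fall through to the default filter or to a no-op consume. A toy illustration of the consume side (class and values here are made up):

class ToyScheduler(object):
    def compute_consume(self, capabilities, instance_type):
        requested_mem = max(instance_type['memory_mb'], 0) * 1024 * 1024
        capabilities['host_memory_free'] -= requested_mem

    def consume_resources(self, topic, capabilities, instance_type):
        consume_func = getattr(self, '%s_consume' % topic, None)
        if not consume_func:
            return   # no <topic>_consume defined: nothing to do
        consume_func(capabilities, instance_type)


sched = ToyScheduler()
caps = {'host_memory_free': 4 * 1024 ** 3}
sched.consume_resources('compute', caps, {'memory_mb': 2048})
print(caps['host_memory_free'] == 2 * 1024 ** 3)   # True
sched.consume_resources('volume', caps, {'memory_mb': 2048})   # silent no-op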


@@ -122,15 +122,16 @@ class LeastCostSchedulerTestCase(test.TestCase):
for hostname, caps in hosts]
self.assertWeights(expected, num, request_spec, hosts)
def test_fill_first_cost_fn(self):
def test_compute_fill_first_cost_fn(self):
FLAGS.least_cost_scheduler_cost_functions = [
'nova.scheduler.least_cost.fill_first_cost_fn',
'nova.scheduler.least_cost.compute_fill_first_cost_fn',
]
FLAGS.fill_first_cost_fn_weight = 1
FLAGS.compute_fill_first_cost_fn_weight = 1
num = 1
request_spec = {}
hosts = self.sched.filter_hosts(num, request_spec)
instance_type = {'memory_mb': 1024}
request_spec = {'instance_type': instance_type}
hosts = self.sched.filter_hosts('compute', request_spec, None)
expected = []
for idx, (hostname, caps) in enumerate(hosts):


@@ -57,29 +57,21 @@ def fake_zone_manager_service_states(num_hosts):
class FakeZoneAwareScheduler(zone_aware_scheduler.ZoneAwareScheduler):
def filter_hosts(self, num, specs):
# NOTE(sirp): this is returning [(hostname, services)]
return self.zone_manager.service_states.items()
def weigh_hosts(self, num, specs, hosts):
fake_weight = 99
weighted = []
for hostname, caps in hosts:
weighted.append(dict(weight=fake_weight, name=hostname))
return weighted
# No need to stub anything at the moment
pass
class FakeZoneManager(zone_manager.ZoneManager):
def __init__(self):
self.service_states = {
'host1': {
'compute': {'ram': 1000},
'compute': {'host_memory_free': 1073741824},
},
'host2': {
'compute': {'ram': 2000},
'compute': {'host_memory_free': 2147483648},
},
'host3': {
'compute': {'ram': 3000},
'compute': {'host_memory_free': 3221225472},
},
}
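
The replacement capability values are just 1, 2 and 3 GiB written out in bytes, matching the memory-based compute filter introduced in the scheduler:

assert 1 * 1024 ** 3 == 1073741824
assert 2 * 1024 ** 3 == 2147483648
assert 3 * 1024 ** 3 == 3221225472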
@@ -170,8 +162,8 @@ class ZoneAwareSchedulerTestCase(test.TestCase):
def test_zone_aware_scheduler(self):
"""
Create a nested set of FakeZones, ensure that a select call returns the
appropriate build plan.
Create a nested set of FakeZones, try to build multiple instances
and ensure that a select call returns the appropriate build plan.
"""
sched = FakeZoneAwareScheduler()
self.stubs.Set(sched, '_call_zone_method', fake_call_zone_method)
@@ -181,13 +173,17 @@ class ZoneAwareSchedulerTestCase(test.TestCase):
sched.set_zone_manager(zm)
fake_context = {}
build_plan = sched.select(fake_context, {})
build_plan = sched.select(fake_context,
{'instance_type': {'memory_mb': 512},
'num_instances': 4})
self.assertEqual(15, len(build_plan))
# 4 from local zones, 12 from remotes
self.assertEqual(16, len(build_plan))
hostnames = [plan_item['name']
for plan_item in build_plan if 'name' in plan_item]
self.assertEqual(3, len(hostnames))
hostnames = [plan_item['hostname']
for plan_item in build_plan if 'hostname' in plan_item]
# 4 local hosts
self.assertEqual(4, len(hostnames))
def test_adjust_child_weights(self):
"""Make sure the weights returned by child zones are
@@ -223,8 +219,7 @@ class ZoneAwareSchedulerTestCase(test.TestCase):
fake_context = {}
self.assertRaises(driver.NoValidHost, sched.schedule_run_instance,
fake_context, 1,
dict(host_filter=None,
request_spec={'instance_type': {}}))
dict(host_filter=None, instance_type={}))
def test_schedule_do_not_schedule_with_hint(self):
"""


@@ -131,7 +131,7 @@ class ComputeTestCase(test.TestCase):
instance_ref = models.Instance()
instance_ref['id'] = 1
instance_ref['volumes'] = [vol1, vol2]
instance_ref['hostname'] = 'i-00000001'
instance_ref['hostname'] = 'hostname-1'
instance_ref['host'] = 'dummy'
return instance_ref
@@ -163,6 +163,18 @@ class ComputeTestCase(test.TestCase):
db.security_group_destroy(self.context, group['id'])
db.instance_destroy(self.context, ref[0]['id'])
def test_default_hostname_generator(self):
cases = [(None, 'server_1'), ('Hello, Server!', 'hello_server'),
('<}\x1fh\x10e\x08l\x02l\x05o\x12!{>', 'hello')]
for display_name, hostname in cases:
ref = self.compute_api.create(self.context,
instance_types.get_default_instance_type(), None,
display_name=display_name)
try:
self.assertEqual(ref[0]['hostname'], hostname)
finally:
db.instance_destroy(self.context, ref[0]['id'])
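
One plausible sanitisation routine that satisfies the new test's expectations — not necessarily the implementation the compute API actually uses — lower-cases the display name, turns runs of spaces or underscores into a single underscore, strips anything that is not alphanumeric, '_' or '-', and falls back to a 'server_<id>' name when no display name is given:

import re


def sanitize_hostname(display_name, instance_id=1):
    # Hypothetical helper; nova's real generator may differ in details.
    if display_name is None:
        return 'server_%d' % instance_id
    hostname = display_name.lower()
    hostname = re.sub(r'[ _]+', '_', hostname)        # spaces -> underscores
    hostname = re.sub(r'[^a-z0-9_-]', '', hostname)   # drop everything else
    return hostname


assert sanitize_hostname(None) == 'server_1'
assert sanitize_hostname('Hello, Server!') == 'hello_server'
assert sanitize_hostname('<}\x1fh\x10e\x08l\x02l\x05o\x12!{>') == 'hello'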
def test_destroy_instance_disassociates_security_groups(self):
"""Make sure destroying disassociates security groups"""
group = self._create_group()