Host Filtering for Distributed Scheduler (done before weighing)

This commit is contained in:
Sandy Walsh
2011-05-07 15:36:55 +00:00
committed by Tarmac
5 changed files with 514 additions and 28 deletions

View File

@@ -76,11 +76,9 @@ def zone_update(context, zone_id, data):
return db.zone_update(context, zone_id, data)
def get_zone_capabilities(context, service=None):
"""Returns a dict of key, value capabilities for this zone,
or for a particular class of services running in this zone."""
return _call_scheduler('get_zone_capabilities', context=context,
params=dict(service=service))
def get_zone_capabilities(context):
"""Returns a dict of key, value capabilities for this zone."""
return _call_scheduler('get_zone_capabilities', context=context)
def update_service_capabilities(context, service_name, host, capabilities):

View File

@@ -0,0 +1,288 @@
# Copyright (c) 2011 Openstack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Host Filter is a driver mechanism for requesting instance resources.
Three drivers are included: AllHosts, Flavor & JSON. AllHosts just
returns the full, unfiltered list of hosts. Flavor is a hard coded
matching mechanism based on flavor criteria and JSON is an ad-hoc
filter grammar.
Why JSON? The requests for instances may come in through the
REST interface from a user or a parent Zone.
Currently Flavors and/or InstanceTypes are used for
specifing the type of instance desired. Specific Nova users have
noted a need for a more expressive way of specifying instances.
Since we don't want to get into building full DSL this is a simple
form as an example of how this could be done. In reality, most
consumers will use the more rigid filters such as FlavorFilter.
Note: These are "required" capability filters. These capabilities
used must be present or the host will be excluded. The hosts
returned are then weighed by the Weighted Scheduler. Weights
can take the more esoteric factors into consideration (such as
server affinity and customer separation).
"""
import json
from nova import exception
from nova import flags
from nova import log as logging
from nova import utils
LOG = logging.getLogger('nova.scheduler.host_filter')
FLAGS = flags.FLAGS
flags.DEFINE_string('default_host_filter_driver',
'nova.scheduler.host_filter.AllHostsFilter',
'Which driver to use for filtering hosts.')
class HostFilter(object):
"""Base class for host filter drivers."""
def instance_type_to_filter(self, instance_type):
"""Convert instance_type into a filter for most common use-case."""
raise NotImplementedError()
def filter_hosts(self, zone_manager, query):
"""Return a list of hosts that fulfill the filter."""
raise NotImplementedError()
def _full_name(self):
"""module.classname of the filter driver"""
return "%s.%s" % (self.__module__, self.__class__.__name__)
class AllHostsFilter(HostFilter):
"""NOP host filter driver. Returns all hosts in ZoneManager.
This essentially does what the old Scheduler+Chance used
to give us."""
def instance_type_to_filter(self, instance_type):
"""Return anything to prevent base-class from raising
exception."""
return (self._full_name(), instance_type)
def filter_hosts(self, zone_manager, query):
"""Return a list of hosts from ZoneManager list."""
return [(host, services)
for host, services in zone_manager.service_states.iteritems()]
class FlavorFilter(HostFilter):
"""HostFilter driver hard-coded to work with flavors."""
def instance_type_to_filter(self, instance_type):
"""Use instance_type to filter hosts."""
return (self._full_name(), instance_type)
def filter_hosts(self, zone_manager, query):
"""Return a list of hosts that can create instance_type."""
instance_type = query
selected_hosts = []
for host, services in zone_manager.service_states.iteritems():
capabilities = services.get('compute', {})
host_ram_mb = capabilities['host_memory']['free']
disk_bytes = capabilities['disk']['available']
if host_ram_mb >= instance_type['memory_mb'] and \
disk_bytes >= instance_type['local_gb']:
selected_hosts.append((host, capabilities))
return selected_hosts
#host entries (currently) are like:
# {'host_name-description': 'Default install of XenServer',
# 'host_hostname': 'xs-mini',
# 'host_memory': {'total': 8244539392,
# 'overhead': 184225792,
# 'free': 3868327936,
# 'free-computed': 3840843776},
# 'host_other-config': {},
# 'host_ip_address': '192.168.1.109',
# 'host_cpu_info': {},
# 'disk': {'available': 32954957824,
# 'total': 50394562560,
# 'used': 17439604736},
# 'host_uuid': 'cedb9b39-9388-41df-8891-c5c9a0c0fe5f',
# 'host_name-label': 'xs-mini'}
# instance_type table has:
#name = Column(String(255), unique=True)
#memory_mb = Column(Integer)
#vcpus = Column(Integer)
#local_gb = Column(Integer)
#flavorid = Column(Integer, unique=True)
#swap = Column(Integer, nullable=False, default=0)
#rxtx_quota = Column(Integer, nullable=False, default=0)
#rxtx_cap = Column(Integer, nullable=False, default=0)
class JsonFilter(HostFilter):
"""Host Filter driver to allow simple JSON-based grammar for
selecting hosts."""
def _equals(self, args):
"""First term is == all the other terms."""
if len(args) < 2:
return False
lhs = args[0]
for rhs in args[1:]:
if lhs != rhs:
return False
return True
def _less_than(self, args):
"""First term is < all the other terms."""
if len(args) < 2:
return False
lhs = args[0]
for rhs in args[1:]:
if lhs >= rhs:
return False
return True
def _greater_than(self, args):
"""First term is > all the other terms."""
if len(args) < 2:
return False
lhs = args[0]
for rhs in args[1:]:
if lhs <= rhs:
return False
return True
def _in(self, args):
"""First term is in set of remaining terms"""
if len(args) < 2:
return False
return args[0] in args[1:]
def _less_than_equal(self, args):
"""First term is <= all the other terms."""
if len(args) < 2:
return False
lhs = args[0]
for rhs in args[1:]:
if lhs > rhs:
return False
return True
def _greater_than_equal(self, args):
"""First term is >= all the other terms."""
if len(args) < 2:
return False
lhs = args[0]
for rhs in args[1:]:
if lhs < rhs:
return False
return True
def _not(self, args):
"""Flip each of the arguments."""
if len(args) == 0:
return False
return [not arg for arg in args]
def _or(self, args):
"""True if any arg is True."""
return True in args
def _and(self, args):
"""True if all args are True."""
return False not in args
commands = {
'=': _equals,
'<': _less_than,
'>': _greater_than,
'in': _in,
'<=': _less_than_equal,
'>=': _greater_than_equal,
'not': _not,
'or': _or,
'and': _and,
}
def instance_type_to_filter(self, instance_type):
"""Convert instance_type into JSON filter object."""
required_ram = instance_type['memory_mb']
required_disk = instance_type['local_gb']
query = ['and',
['>=', '$compute.host_memory.free', required_ram],
['>=', '$compute.disk.available', required_disk]
]
return (self._full_name(), json.dumps(query))
def _parse_string(self, string, host, services):
"""Strings prefixed with $ are capability lookups in the
form '$service.capability[.subcap*]'"""
if not string:
return None
if string[0] != '$':
return string
path = string[1:].split('.')
for item in path:
services = services.get(item, None)
if not services:
return None
return services
def _process_filter(self, zone_manager, query, host, services):
"""Recursively parse the query structure."""
if len(query) == 0:
return True
cmd = query[0]
method = self.commands[cmd] # Let exception fly.
cooked_args = []
for arg in query[1:]:
if isinstance(arg, list):
arg = self._process_filter(zone_manager, arg, host, services)
elif isinstance(arg, basestring):
arg = self._parse_string(arg, host, services)
if arg != None:
cooked_args.append(arg)
result = method(self, cooked_args)
return result
def filter_hosts(self, zone_manager, query):
"""Return a list of hosts that can fulfill filter."""
expanded = json.loads(query)
hosts = []
for host, services in zone_manager.service_states.iteritems():
r = self._process_filter(zone_manager, expanded, host, services)
if isinstance(r, list):
r = True in r
if r:
hosts.append((host, services))
return hosts
DRIVERS = [AllHostsFilter, FlavorFilter, JsonFilter]
def choose_driver(driver_name=None):
"""Since the caller may specify which driver to use we need
to have an authoritative list of what is permissible. This
function checks the driver name against a predefined set
of acceptable drivers."""
if not driver_name:
driver_name = FLAGS.default_host_filter_driver
for driver in DRIVERS:
if "%s.%s" % (driver.__module__, driver.__name__) == driver_name:
return driver()
raise exception.SchedulerHostFilterDriverNotFound(driver_name=driver_name)

View File

@@ -106,28 +106,26 @@ class ZoneManager(object):
def __init__(self):
self.last_zone_db_check = datetime.min
self.zone_states = {} # { <zone_id> : ZoneState }
self.service_states = {} # { <service> : { <host> : { cap k : v }}}
self.service_states = {} # { <host> : { <service> : { cap k : v }}}
self.green_pool = greenpool.GreenPool()
def get_zone_list(self):
"""Return the list of zones we know about."""
return [zone.to_dict() for zone in self.zone_states.values()]
def get_zone_capabilities(self, context, service=None):
def get_zone_capabilities(self, context):
"""Roll up all the individual host info to generic 'service'
capabilities. Each capability is aggregated into
<cap>_min and <cap>_max values."""
service_dict = self.service_states
if service:
service_dict = {service: self.service_states.get(service, {})}
hosts_dict = self.service_states
# TODO(sandy) - be smarter about fabricating this structure.
# But it's likely to change once we understand what the Best-Match
# code will need better.
combined = {} # { <service>_<cap> : (min, max), ... }
for service_name, host_dict in service_dict.iteritems():
for host, caps_dict in host_dict.iteritems():
for cap, value in caps_dict.iteritems():
for host, host_dict in hosts_dict.iteritems():
for service_name, service_dict in host_dict.iteritems():
for cap, value in service_dict.iteritems():
key = "%s_%s" % (service_name, cap)
min_value, max_value = combined.get(key, (value, value))
min_value = min(min_value, value)
@@ -171,6 +169,6 @@ class ZoneManager(object):
"""Update the per-service capabilities based on this notification."""
logging.debug(_("Received %(service_name)s service update from "
"%(host)s: %(capabilities)s") % locals())
service_caps = self.service_states.get(service_name, {})
service_caps[host] = capabilities
self.service_states[service_name] = service_caps
service_caps = self.service_states.get(host, {})
service_caps[service_name] = capabilities
self.service_states[host] = service_caps

View File

@@ -0,0 +1,208 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Scheduler Host Filter Drivers.
"""
import json
from nova import exception
from nova import flags
from nova import test
from nova.scheduler import host_filter
FLAGS = flags.FLAGS
class FakeZoneManager:
pass
class HostFilterTestCase(test.TestCase):
"""Test case for host filter drivers."""
def _host_caps(self, multiplier):
# Returns host capabilities in the following way:
# host1 = memory:free 10 (100max)
# disk:available 100 (1000max)
# hostN = memory:free 10 + 10N
# disk:available 100 + 100N
# in other words: hostN has more resources than host0
# which means ... don't go above 10 hosts.
return {'host_name-description': 'XenServer %s' % multiplier,
'host_hostname': 'xs-%s' % multiplier,
'host_memory': {'total': 100,
'overhead': 10,
'free': 10 + multiplier * 10,
'free-computed': 10 + multiplier * 10},
'host_other-config': {},
'host_ip_address': '192.168.1.%d' % (100 + multiplier),
'host_cpu_info': {},
'disk': {'available': 100 + multiplier * 100,
'total': 1000,
'used': 0},
'host_uuid': 'xxx-%d' % multiplier,
'host_name-label': 'xs-%s' % multiplier}
def setUp(self):
self.old_flag = FLAGS.default_host_filter_driver
FLAGS.default_host_filter_driver = \
'nova.scheduler.host_filter.AllHostsFilter'
self.instance_type = dict(name='tiny',
memory_mb=50,
vcpus=10,
local_gb=500,
flavorid=1,
swap=500,
rxtx_quota=30000,
rxtx_cap=200)
self.zone_manager = FakeZoneManager()
states = {}
for x in xrange(10):
states['host%02d' % (x + 1)] = {'compute': self._host_caps(x)}
self.zone_manager.service_states = states
def tearDown(self):
FLAGS.default_host_filter_driver = self.old_flag
def test_choose_driver(self):
# Test default driver ...
driver = host_filter.choose_driver()
self.assertEquals(driver._full_name(),
'nova.scheduler.host_filter.AllHostsFilter')
# Test valid driver ...
driver = host_filter.choose_driver(
'nova.scheduler.host_filter.FlavorFilter')
self.assertEquals(driver._full_name(),
'nova.scheduler.host_filter.FlavorFilter')
# Test invalid driver ...
try:
host_filter.choose_driver('does not exist')
self.fail("Should not find driver")
except exception.SchedulerHostFilterDriverNotFound:
pass
def test_all_host_driver(self):
driver = host_filter.AllHostsFilter()
cooked = driver.instance_type_to_filter(self.instance_type)
hosts = driver.filter_hosts(self.zone_manager, cooked)
self.assertEquals(10, len(hosts))
for host, capabilities in hosts:
self.assertTrue(host.startswith('host'))
def test_flavor_driver(self):
driver = host_filter.FlavorFilter()
# filter all hosts that can support 50 ram and 500 disk
name, cooked = driver.instance_type_to_filter(self.instance_type)
self.assertEquals('nova.scheduler.host_filter.FlavorFilter', name)
hosts = driver.filter_hosts(self.zone_manager, cooked)
self.assertEquals(6, len(hosts))
just_hosts = [host for host, caps in hosts]
just_hosts.sort()
self.assertEquals('host05', just_hosts[0])
self.assertEquals('host10', just_hosts[5])
def test_json_driver(self):
driver = host_filter.JsonFilter()
# filter all hosts that can support 50 ram and 500 disk
name, cooked = driver.instance_type_to_filter(self.instance_type)
self.assertEquals('nova.scheduler.host_filter.JsonFilter', name)
hosts = driver.filter_hosts(self.zone_manager, cooked)
self.assertEquals(6, len(hosts))
just_hosts = [host for host, caps in hosts]
just_hosts.sort()
self.assertEquals('host05', just_hosts[0])
self.assertEquals('host10', just_hosts[5])
# Try some custom queries
raw = ['or',
['and',
['<', '$compute.host_memory.free', 30],
['<', '$compute.disk.available', 300]
],
['and',
['>', '$compute.host_memory.free', 70],
['>', '$compute.disk.available', 700]
]
]
cooked = json.dumps(raw)
hosts = driver.filter_hosts(self.zone_manager, cooked)
self.assertEquals(5, len(hosts))
just_hosts = [host for host, caps in hosts]
just_hosts.sort()
for index, host in zip([1, 2, 8, 9, 10], just_hosts):
self.assertEquals('host%02d' % index, host)
raw = ['not',
['=', '$compute.host_memory.free', 30],
]
cooked = json.dumps(raw)
hosts = driver.filter_hosts(self.zone_manager, cooked)
self.assertEquals(9, len(hosts))
just_hosts = [host for host, caps in hosts]
just_hosts.sort()
for index, host in zip([1, 2, 4, 5, 6, 7, 8, 9, 10], just_hosts):
self.assertEquals('host%02d' % index, host)
raw = ['in', '$compute.host_memory.free', 20, 40, 60, 80, 100]
cooked = json.dumps(raw)
hosts = driver.filter_hosts(self.zone_manager, cooked)
self.assertEquals(5, len(hosts))
just_hosts = [host for host, caps in hosts]
just_hosts.sort()
for index, host in zip([2, 4, 6, 8, 10], just_hosts):
self.assertEquals('host%02d' % index, host)
# Try some bogus input ...
raw = ['unknown command', ]
cooked = json.dumps(raw)
try:
driver.filter_hosts(self.zone_manager, cooked)
self.fail("Should give KeyError")
except KeyError, e:
pass
self.assertTrue(driver.filter_hosts(self.zone_manager, json.dumps([])))
self.assertTrue(driver.filter_hosts(self.zone_manager, json.dumps({})))
self.assertTrue(driver.filter_hosts(self.zone_manager, json.dumps(
['not', True, False, True, False]
)))
try:
driver.filter_hosts(self.zone_manager, json.dumps(
'not', True, False, True, False
))
self.fail("Should give KeyError")
except KeyError, e:
pass
self.assertFalse(driver.filter_hosts(self.zone_manager, json.dumps(
['=', '$foo', 100]
)))
self.assertFalse(driver.filter_hosts(self.zone_manager, json.dumps(
['=', '$.....', 100]
)))
self.assertFalse(driver.filter_hosts(self.zone_manager, json.dumps(
['>', ['and', ['or', ['not', ['<', ['>=', ['<=', ['in', ]]]]]]]]
)))
self.assertFalse(driver.filter_hosts(self.zone_manager, json.dumps(
['=', {}, ['>', '$missing....foo']]
)))

View File

@@ -78,38 +78,32 @@ class ZoneManagerTestCase(test.TestCase):
def test_service_capabilities(self):
zm = zone_manager.ZoneManager()
caps = zm.get_zone_capabilities(self, None)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, {})
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
caps = zm.get_zone_capabilities(self, None)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
zm.update_service_capabilities("svc1", "host1", dict(a=2, b=3))
caps = zm.get_zone_capabilities(self, None)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(2, 2), svc1_b=(3, 3)))
zm.update_service_capabilities("svc1", "host2", dict(a=20, b=30))
caps = zm.get_zone_capabilities(self, None)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30)))
zm.update_service_capabilities("svc10", "host1", dict(a=99, b=99))
caps = zm.get_zone_capabilities(self, None)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
svc10_a=(99, 99), svc10_b=(99, 99)))
zm.update_service_capabilities("svc1", "host3", dict(c=5))
caps = zm.get_zone_capabilities(self, None)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
svc1_c=(5, 5), svc10_a=(99, 99),
svc10_b=(99, 99)))
caps = zm.get_zone_capabilities(self, 'svc1')
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
svc1_c=(5, 5)))
caps = zm.get_zone_capabilities(self, 'svc10')
self.assertEquals(caps, dict(svc10_a=(99, 99), svc10_b=(99, 99)))
def test_refresh_from_db_replace_existing(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()