Merge "Separate scheduler host management"

Jenkins 2012-01-18 16:42:39 +00:00 committed by Gerrit Code Review
commit 885b9aa70d
24 changed files with 1608 additions and 1261 deletions

View File

@@ -133,16 +133,12 @@ class Controller(object):
def info(self, req):
"""Return name and capabilities for this zone."""
context = req.environ['nova.context']
items = nova.scheduler.api.get_zone_capabilities(context)
zone = dict(name=FLAGS.zone_name)
caps = FLAGS.zone_capabilities
for cap in caps:
key, value = cap.split('=')
zone[key] = value
for item, (min_value, max_value) in items.iteritems():
zone[item] = "%s,%s" % (min_value, max_value)
return dict(zone=zone)
zone_capabs = nova.scheduler.api.get_zone_capabilities(context)
# NOTE(comstud): This should probably return, instead:
# {'zone': {'name': FLAGS.zone_name,
# 'capabilities': zone_capabs}}
zone_capabs['name'] = FLAGS.zone_name
return dict(zone=zone_capabs)
@wsgi.serializers(xml=ZoneTemplate)
def show(self, req, id):
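To make the response change concrete, here is a sketch of the flat shape the new code returns versus the nested shape proposed in the NOTE(comstud) comment above; the zone name and capability value are invented for illustration:

    # Flat shape actually returned above (capability keys mixed with 'name').
    flat = {'zone': {'name': 'zone1',
                     'compute_free_ram_mb': '512,8192'}}
    # Nested shape suggested in the NOTE(comstud) comment.
    nested = {'zone': {'name': 'zone1',
                       'capabilities': {'compute_free_ram_mb': '512,8192'}}}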

View File

@@ -87,7 +87,20 @@ def zone_update(context, zone_id, data):
def get_zone_capabilities(context):
"""Returns a dict of key, value capabilities for this zone."""
return _call_scheduler('get_zone_capabilities', context=context)
zone_capabs = {}
# First grab the capabilities of combined services.
service_capabs = _call_scheduler('get_service_capabilities', context)
for item, (min_value, max_value) in service_capabs.iteritems():
zone_capabs[item] = "%s,%s" % (min_value, max_value)
# Add the capabilities defined by FLAGS
caps = FLAGS.zone_capabilities
for cap in caps:
key, value = cap.split('=')
zone_capabs[key] = value
return zone_capabs
def select(context, specs=None):
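As a rough, self-contained sketch of the aggregation above, assuming service capabilities arrive as (min, max) tuples and FLAGS.zone_capabilities holds 'key=value' strings (the sample values here are invented):

    def build_zone_capabilities(service_capabs, flag_caps):
        # Collapse each (min, max) tuple into the "min,max" string form.
        zone_capabs = {}
        for item, (min_value, max_value) in service_capabs.items():
            zone_capabs[item] = "%s,%s" % (min_value, max_value)
        # Then layer on the statically configured capabilities.
        for cap in flag_caps:
            key, value = cap.split('=')
            zone_capabs[key] = value
        return zone_capabs

    print(build_zone_capabilities({'compute_free_ram_mb': (512, 8192)},
                                  ['hypervisor=xenserver']))
    # {'compute_free_ram_mb': '512,8192', 'hypervisor': 'xenserver'}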

View File

@@ -67,12 +67,11 @@ class ChanceScheduler(driver.Scheduler):
def schedule_run_instance(self, context, request_spec, *_args, **kwargs):
"""Create and run an instance or instances"""
elevated = context.elevated()
num_instances = request_spec.get('num_instances', 1)
instances = []
for num in xrange(num_instances):
host = self._schedule(context, 'compute', request_spec, **kwargs)
instance = self.create_instance_db_entry(elevated, request_spec)
instance = self.create_instance_db_entry(context, request_spec)
driver.cast_to_compute_host(context, host,
'run_instance', instance_uuid=instance['uuid'], **kwargs)
instances.append(driver.encode_instance(instance))
@@ -85,4 +84,4 @@ class ChanceScheduler(driver.Scheduler):
def schedule_prep_resize(self, context, request_spec, *args, **kwargs):
"""Select a target for resize."""
host = self._schedule(context, 'compute', request_spec, **kwargs)
driver.cast_to_host(context, 'compute', host, 'prep_resize', **kwargs)
driver.cast_to_compute_host(context, host, 'prep_resize', **kwargs)
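The _schedule() helper called above falls outside this hunk; as a hedged approximation, a chance-style scheduler simply picks a random host from the services that are up for the topic, roughly:

    import random

    def chance_pick(hosts_up):
        # hosts_up: hostnames with a live service for the requested topic.
        if not hosts_up:
            raise RuntimeError("Is the appropriate service running?")
        return random.choice(hosts_up)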

View File

@@ -33,16 +33,13 @@ from nova import flags
from nova import log as logging
from nova.scheduler import api
from nova.scheduler import driver
from nova.scheduler import host_manager
from nova.scheduler import least_cost
from nova.scheduler import scheduler_options
from nova import utils
FLAGS = flags.FLAGS
flags.DEFINE_list('default_host_filters', ['InstanceTypeFilter'],
'Which filters to use for filtering hosts when not specified '
'in the request.')
LOG = logging.getLogger('nova.scheduler.distributed_scheduler')
@@ -108,11 +105,11 @@ class DistributedScheduler(driver.Scheduler):
weighted_host = weighted_hosts.pop(0)
instance = None
if weighted_host.host:
instance = self._provision_resource_locally(elevated,
if weighted_host.zone:
instance = self._ask_child_zone_to_create_instance(elevated,
weighted_host, request_spec, kwargs)
else:
instance = self._ask_child_zone_to_create_instance(elevated,
instance = self._provision_resource_locally(elevated,
weighted_host, request_spec, kwargs)
if instance:
@@ -145,8 +142,8 @@ class DistributedScheduler(driver.Scheduler):
host = hosts.pop(0)
# Forward off to the host
driver.cast_to_host(context, 'compute', host.host, 'prep_resize',
**kwargs)
driver.cast_to_compute_host(context, host.host_state.host,
'prep_resize', **kwargs)
def select(self, context, request_spec, *args, **kwargs):
"""Select returns a list of weights and zone/host information
@@ -167,7 +164,7 @@ class DistributedScheduler(driver.Scheduler):
kwargs):
"""Create the requested resource in this Zone."""
instance = self.create_instance_db_entry(context, request_spec)
driver.cast_to_compute_host(context, weighted_host.host,
driver.cast_to_compute_host(context, weighted_host.host_state.host,
'run_instance', instance_uuid=instance['uuid'], **kwargs)
inst = driver.encode_instance(instance, local=True)
# So if another instance is created, create_instance_db_entry will
@@ -189,7 +186,8 @@ class DistributedScheduler(driver.Scheduler):
blob = wh_dict.get('blob', None)
zone = wh_dict.get('zone', None)
return least_cost.WeightedHost(wh_dict['weight'],
host=host, blob=blob, zone=zone)
host_state=host_manager.HostState(host, 'compute'),
blob=blob, zone=zone)
except M2Crypto.EVP.EVPError:
raise InvalidBlob()
@@ -265,8 +263,8 @@ class DistributedScheduler(driver.Scheduler):
cooked_weight = offset + scale * raw_weight
weighted_hosts.append(least_cost.WeightedHost(
host=None, weight=cooked_weight,
zone=zone_id, blob=item['blob']))
cooked_weight, zone=zone_id,
blob=item['blob']))
except KeyError:
LOG.exception(_("Bad child zone scaling values "
"for Zone: %(zone_id)s") % locals())
@@ -280,6 +278,17 @@ class DistributedScheduler(driver.Scheduler):
"""Fetch options dictionary. Broken out for testing."""
return self.options.get_configuration()
def populate_filter_properties(self, request_spec, filter_properties):
"""Stuff things into filter_properties. Can be overriden in a
subclass to add more data.
"""
try:
if request_spec['avoid_original_host']:
original_host = request_spec['instance_properties']['host']
filter_properties['ignore_hosts'].append(original_host)
except (KeyError, TypeError):
pass
def _schedule(self, elevated, topic, request_spec, *args, **kwargs):
"""Returns a list of hosts that meet the required specs,
ordered by their fitness.
@@ -288,6 +297,7 @@ class DistributedScheduler(driver.Scheduler):
msg = _("Scheduler only understands Compute nodes (for now)")
raise NotImplementedError(msg)
instance_properties = request_spec['instance_properties']
instance_type = request_spec.get("instance_type", None)
if not instance_type:
msg = _("Scheduler only understands InstanceType-based" \
@@ -299,7 +309,13 @@ class DistributedScheduler(driver.Scheduler):
ram_requirement_mb = instance_type['memory_mb']
disk_requirement_gb = instance_type['local_gb']
options = self._get_configuration_options()
config_options = self._get_configuration_options()
filter_properties = {'config_options': config_options,
'instance_type': instance_type,
'ignore_hosts': []}
self.populate_filter_properties(request_spec, filter_properties)
# Find our local list of acceptable hosts by repeatedly
# filtering and weighing our options. Each time we choose a
@@ -307,33 +323,37 @@ class DistributedScheduler(driver.Scheduler):
# selections can adjust accordingly.
# unfiltered_hosts_dict is {host : ZoneManager.HostInfo()}
unfiltered_hosts_dict = self.zone_manager.get_all_host_data(elevated)
unfiltered_hosts = unfiltered_hosts_dict.items()
unfiltered_hosts_dict = self.host_manager.get_all_host_states(
elevated, topic)
hosts = unfiltered_hosts_dict.itervalues()
num_instances = request_spec.get('num_instances', 1)
selected_hosts = []
for num in xrange(num_instances):
# Filter local hosts based on requirements ...
filtered_hosts = self._filter_hosts(topic, request_spec,
unfiltered_hosts, options)
if not filtered_hosts:
hosts = self.host_manager.filter_hosts(hosts,
filter_properties)
if not hosts:
# Can't get any more locally.
break
LOG.debug(_("Filtered %(filtered_hosts)s") % locals())
LOG.debug(_("Filtered %(hosts)s") % locals())
# weighted_host = WeightedHost() ... the best
# host for the job.
# TODO(comstud): filter_properties will also be used for
# weighing and I plan to fold weighing into the host manager
# in a future patch. I'll address the naming of this
# variable at that time.
weighted_host = least_cost.weighted_sum(cost_functions,
filtered_hosts, options)
hosts, filter_properties)
LOG.debug(_("Weighted %(weighted_host)s") % locals())
selected_hosts.append(weighted_host)
# Now consume the resources so the filter/weights
# will change for the next instance.
weighted_host.hostinfo.consume_resources(disk_requirement_gb,
ram_requirement_mb)
weighted_host.host_state.consume_from_instance(
instance_properties)
# Next, tack on the host weights from the child zones
if not request_spec.get('local_zone', False):
@@ -346,72 +366,6 @@ class DistributedScheduler(driver.Scheduler):
selected_hosts.sort(key=operator.attrgetter('weight'))
return selected_hosts[:num_instances]
def _get_filter_classes(self):
# Imported here to avoid circular imports
from nova.scheduler import filters
def get_itm(nm):
return getattr(filters, nm)
return [get_itm(itm) for itm in dir(filters)
if isinstance(get_itm(itm), type)
and issubclass(get_itm(itm), filters.AbstractHostFilter)
and get_itm(itm) is not filters.AbstractHostFilter]
def _choose_host_filters(self, filters=None):
"""Since the caller may specify which filters to use we need
to have an authoritative list of what is permissible. This
function checks the filter names against a predefined set
of acceptable filters.
"""
if not filters:
filters = FLAGS.default_host_filters
if not isinstance(filters, (list, tuple)):
filters = [filters]
good_filters = []
bad_filters = []
filter_classes = self._get_filter_classes()
for filter_name in filters:
found_class = False
for cls in filter_classes:
if cls.__name__ == filter_name:
good_filters.append(cls())
found_class = True
break
if not found_class:
bad_filters.append(filter_name)
if bad_filters:
msg = ", ".join(bad_filters)
raise exception.SchedulerHostFilterNotFound(filter_name=msg)
return good_filters
def _filter_hosts(self, topic, request_spec, hosts, options):
"""Filter the full host list. hosts = [(host, HostInfo()), ...].
This method returns a subset of hosts, in the same format."""
selected_filters = self._choose_host_filters()
# Filter out original host
try:
if request_spec['avoid_original_host']:
original_host = request_spec['instance_properties']['host']
hosts = [(h, hi) for h, hi in hosts if h != original_host]
except (KeyError, TypeError):
pass
# TODO(sandy): We're only using InstanceType-based specs
# currently. Later we'll need to snoop for more detailed
# host filter requests.
instance_type = request_spec.get("instance_type", None)
if instance_type is None:
# No way to select; return the specified hosts.
return hosts
for selected_filter in selected_filters:
query = selected_filter.instance_type_to_filter(instance_type)
hosts = selected_filter.filter_hosts(hosts, query, options)
return hosts
def get_cost_functions(self, topic=None):
"""Returns a list of tuples containing weights and cost functions to
use for weighing hosts
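To make the new filter_properties plumbing concrete, a small self-contained sketch of how populate_filter_properties() feeds ignore_hosts; the host name here is invented:

    def populate_filter_properties(request_spec, filter_properties):
        # Same logic as the method above: a resize should avoid the
        # instance's original host.
        try:
            if request_spec['avoid_original_host']:
                original = request_spec['instance_properties']['host']
                filter_properties['ignore_hosts'].append(original)
        except (KeyError, TypeError):
            pass

    filter_properties = {'config_options': {}, 'instance_type': None,
                         'ignore_hosts': []}
    populate_filter_properties({'avoid_original_host': True,
                                'instance_properties': {'host': 'host1'}},
                               filter_properties)
    assert filter_properties['ignore_hosts'] == ['host1']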

View File

@@ -21,22 +21,30 @@
Scheduler base class that all Schedulers should inherit from
"""
from nova.api.ec2 import ec2utils
from nova.compute import api as compute_api
from nova.compute import power_state
from nova.compute import vm_states
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import rpc
from nova.scheduler import host_manager
from nova.scheduler import zone_manager
from nova import utils
from nova.compute import api as compute_api
from nova.compute import power_state
from nova.compute import vm_states
from nova.api.ec2 import ec2utils
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.scheduler.driver')
flags.DEFINE_integer('service_down_time', 60,
'maximum time since last check-in for up service')
flags.DEFINE_string('scheduler_host_manager',
'nova.scheduler.host_manager.HostManager',
'The scheduler host manager class to use')
flags.DEFINE_string('scheduler_zone_manager',
'nova.scheduler.zone_manager.ZoneManager',
'The scheduler zone manager class to use')
flags.DECLARE('instances_path', 'nova.compute.manager')
@@ -113,20 +121,43 @@ def encode_instance(instance, local=True):
if local:
return dict(id=instance['id'], _is_precooked=False)
else:
instance['_is_precooked'] = True
return instance
inst = dict(instance)
inst['_is_precooked'] = True
return inst
class Scheduler(object):
"""The base class that all Scheduler classes should inherit from."""
def __init__(self):
self.zone_manager = None
self.zone_manager = utils.import_object(
FLAGS.scheduler_zone_manager)
self.host_manager = utils.import_object(
FLAGS.scheduler_host_manager)
self.compute_api = compute_api.API()
def set_zone_manager(self, zone_manager):
"""Called by the Scheduler Service to supply a ZoneManager."""
self.zone_manager = zone_manager
def get_host_list(self):
"""Get a list of hosts from the HostManager."""
return self.host_manager.get_host_list()
def get_zone_list(self):
"""Get a list of zones from the ZoneManager."""
return self.zone_manager.get_zone_list()
def get_service_capabilities(self):
"""Get the normalized set of capabilities for the services
in this zone.
"""
return self.host_manager.get_service_capabilities()
def update_service_capabilities(self, service_name, host, capabilities):
"""Process a capability update from a service node."""
self.host_manager.update_service_capabilities(service_name,
host, capabilities)
def poll_child_zones(self, context):
"""Poll child zones periodically to get status."""
return self.zone_manager.update(context)
@staticmethod
def service_is_up(service):
@@ -140,7 +171,7 @@ class Scheduler(object):
"""Return the list of hosts that have a running service for topic."""
services = db.service_get_all_by_topic(context, topic)
return [service.host
return [service['host']
for service in services
if self.service_is_up(service)]
@@ -168,6 +199,10 @@ class Scheduler(object):
"""Must override at least this method for scheduler to work."""
raise NotImplementedError(_("Must implement a fallback schedule"))
def select(self, context, topic, method, *_args, **_kwargs):
"""Must override this for zones to work."""
raise NotImplementedError(_("Must implement 'select' method"))
def schedule_live_migration(self, context, instance_id, dest,
block_migration=False,
disk_over_commit=False):
@@ -232,7 +267,7 @@ class Scheduler(object):
# to the instance.
if len(instance_ref['volumes']) != 0:
services = db.service_get_all_by_topic(context, 'volume')
if len(services) < 1 or not self.service_is_up(services[0]):
raise exception.VolumeServiceUnavailable()
# Checking src host exists and compute node
@@ -302,6 +337,7 @@ class Scheduler(object):
reason = _("Block migration can not be used "
"with shared storage.")
raise exception.InvalidSharedStorage(reason=reason, path=dest)
# FIXME(comstud): See LP891756.
except exception.FileNotFound:
if not block_migration:
src = instance_ref['host']
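The new constructor relies on utils.import_object() to load the manager classes named by the two new flags. A simplified stand-in for that helper, using importlib rather than the Nova utility, to show the dotted-path loading it performs:

    import importlib

    def import_object(dotted_path):
        # e.g. 'nova.scheduler.host_manager.HostManager'
        module_name, _, class_name = dotted_path.rpartition('.')
        module = importlib.import_module(module_name)
        return getattr(module, class_name)()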

View File

@@ -32,5 +32,5 @@ InstanceType filter.
from abstract_filter import AbstractHostFilter
from all_hosts_filter import AllHostsFilter
from instance_type_filter import InstanceTypeFilter
from compute_filter import ComputeFilter
from json_filter import JsonFilter

View File

@@ -16,13 +16,9 @@
class AbstractHostFilter(object):
"""Base class for host filters."""
def instance_type_to_filter(self, instance_type):
"""Convert instance_type into a filter for most common use-case."""
raise NotImplementedError()
def filter_hosts(self, host_list, query, options):
"""Return a list of hosts that fulfill the filter."""
raise NotImplementedError()
def host_passes(self, host_state, filter_properties):
return True
def _full_name(self):
"""module.classname of the filter."""

View File

@@ -1,4 +1,4 @@
# Copyright (c) 2011 Openstack, LLC.
# Copyright (c) 2011-2012 Openstack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
@@ -18,13 +18,7 @@ import abstract_filter
class AllHostsFilter(abstract_filter.AbstractHostFilter):
"""NOP host filter. Returns all hosts in ZoneManager."""
def instance_type_to_filter(self, instance_type):
"""Return anything to prevent base-class from raising
exception.
"""
return instance_type
"""NOP host filter. Returns all hosts."""
def filter_hosts(self, host_list, query, options):
"""Return the entire list of supplied hosts."""
return list(host_list)
def host_passes(self, host_state, filter_properties):
return True

View File

@@ -13,19 +13,15 @@
# License for the specific language governing permissions and limitations
# under the License.
import logging
from nova import log as logging
from nova.scheduler.filters import abstract_filter
LOG = logging.getLogger('nova.scheduler.filter.instance_type_filter')
LOG = logging.getLogger('nova.scheduler.filter.compute_filter')
class InstanceTypeFilter(abstract_filter.AbstractHostFilter):
class ComputeFilter(abstract_filter.AbstractHostFilter):
"""HostFilter hard-coded to work with InstanceType records."""
def instance_type_to_filter(self, instance_type):
"""Use instance_type to filter hosts."""
return instance_type
def _satisfies_extra_specs(self, capabilities, instance_type):
"""Check that the capabilities provided by the compute service
@@ -36,35 +32,28 @@ class InstanceTypeFilter(abstract_filter.AbstractHostFilter):
# NOTE(lorinh): For now, we are just checking exact matching on the
# values. Later on, we want to handle numerical
# values so we can represent things like number of GPU cards
try:
for key, value in instance_type['extra_specs'].iteritems():
if capabilities[key] != value:
return False
except KeyError, e:
return False
for key, value in instance_type['extra_specs'].iteritems():
if capabilities.get(key, None) != value:
return False
return True
def _basic_ram_filter(self, host_name, host_info, instance_type):
def _basic_ram_filter(self, host_state, instance_type):
"""Only return hosts with sufficient available RAM."""
requested_ram = instance_type['memory_mb']
free_ram_mb = host_info.free_ram_mb
free_ram_mb = host_state.free_ram_mb
return free_ram_mb >= requested_ram
def filter_hosts(self, host_list, query, options):
def host_passes(self, host_state, filter_properties):
"""Return a list of hosts that can create instance_type."""
instance_type = query
selected_hosts = []
for hostname, host_info in host_list:
if not self._basic_ram_filter(hostname, host_info,
instance_type):
continue
capabilities = host_info.compute
if capabilities:
if not capabilities.get("enabled", True):
continue
if not self._satisfies_extra_specs(capabilities,
instance_type):
continue
instance_type = filter_properties.get('instance_type')
if host_state.topic != 'compute' or not instance_type:
return True
capabilities = host_state.capabilities or {}
selected_hosts.append((hostname, host_info))
return selected_hosts
if not self._basic_ram_filter(host_state, instance_type):
return False
if not capabilities.get("enabled", True):
return False
if not self._satisfies_extra_specs(capabilities, instance_type):
return False
return True
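A standalone sketch of the extra_specs check above, showing the exact-match semantics the NOTE(lorinh) comment describes; the capability values are invented:

    def satisfies_extra_specs(capabilities, instance_type):
        # Every extra_spec key must match the reported capability exactly.
        for key, value in instance_type.get('extra_specs', {}).items():
            if capabilities.get(key, None) != value:
                return False
        return True

    caps = {'enabled': True, 'gpu': 'true'}
    assert satisfies_extra_specs(caps, {'extra_specs': {'gpu': 'true'}})
    assert not satisfies_extra_specs(caps, {'extra_specs': {'gpu': 'false'}})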

View File

@@ -86,18 +86,11 @@ class JsonFilter(abstract_filter.AbstractHostFilter):
'and': _and,
}
def instance_type_to_filter(self, instance_type):
"""Convert instance_type into JSON filter object."""
required_ram = instance_type['memory_mb']
required_disk = instance_type['local_gb']
query = ['and',
['>=', '$compute.host_memory_free', required_ram],
['>=', '$compute.disk_available', required_disk]]
return json.dumps(query)
def _parse_string(self, string, host, hostinfo):
def _parse_string(self, string, host_state):
"""Strings prefixed with $ are capability lookups in the
form '$service.capability[.subcap*]'.
form '$variable' where 'variable' is an attribute in the
HostState class. If $variable is a dictionary, you may
use: $variable.dictkey
"""
if not string:
return None
@@ -105,18 +98,16 @@ class JsonFilter(abstract_filter.AbstractHostFilter):
return string
path = string[1:].split(".")
services = dict(compute=hostinfo.compute, network=hostinfo.network,
volume=hostinfo.volume)
service = services.get(path[0], None)
if not service:
obj = getattr(host_state, path[0], None)
if obj is None:
return None
for item in path[1:]:
service = service.get(item, None)
if not service:
obj = obj.get(item, None)
if obj is None:
return None
return service
return obj
def _process_filter(self, query, host, hostinfo):
def _process_filter(self, query, host_state):
"""Recursively parse the query structure."""
if not query:
return True
@@ -125,30 +116,31 @@ class JsonFilter(abstract_filter.AbstractHostFilter):
cooked_args = []
for arg in query[1:]:
if isinstance(arg, list):
arg = self._process_filter(arg, host, hostinfo)
arg = self._process_filter(arg, host_state)
elif isinstance(arg, basestring):
arg = self._parse_string(arg, host, hostinfo)
arg = self._parse_string(arg, host_state)
if arg is not None:
cooked_args.append(arg)
result = method(self, cooked_args)
return result
def filter_hosts(self, host_list, query, options):
def host_passes(self, host_state, filter_properties):
"""Return a list of hosts that can fulfill the requirements
specified in the query.
"""
expanded = json.loads(query)
filtered_hosts = []
for host, hostinfo in host_list:
if not hostinfo:
continue
if hostinfo.compute and not hostinfo.compute.get("enabled", True):
# Host is disabled
continue
result = self._process_filter(expanded, host, hostinfo)
if isinstance(result, list):
# If any succeeded, include the host
result = any(result)
if result:
filtered_hosts.append((host, hostinfo))
return filtered_hosts
capabilities = host_state.capabilities or {}
if not capabilities.get("enabled", True):
return False
query = filter_properties.get('query', None)
if not query:
return True
result = self._process_filter(json.loads(query), host_state)
if isinstance(result, list):
# If any succeeded, include the host
result = any(result)
if result:
# Host passes the filter.
return True
return False
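Under the new scheme, a JSON query addresses HostState attributes directly via the '$variable' form. A hypothetical query passed in through filter_properties might look like this (free_ram_mb and free_disk_mb are real HostState attributes, per host_manager.py below):

    import json

    # Require at least 1 GB of free RAM and 200 GB of free disk.
    query = json.dumps(['and',
                        ['>=', '$free_ram_mb', 1024],
                        ['>=', '$free_disk_mb', 200 * 1024]])
    filter_properties = {'query': query}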

View File

@@ -0,0 +1,310 @@
# Copyright (c) 2011 Openstack, LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Manage hosts in the current zone.
"""
import datetime
import types
import UserDict
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import utils
FLAGS = flags.FLAGS
flags.DEFINE_integer('reserved_host_disk_mb', 0,
'Amount of disk in MB to reserve for host/dom0')
flags.DEFINE_integer('reserved_host_memory_mb', 512,
'Amount of memory in MB to reserve for host/dom0')
flags.DEFINE_list('default_host_filters', ['ComputeFilter'],
'Which filters to use for filtering hosts when not specified '
'in the request.')
LOG = logging.getLogger('nova.scheduler.host_manager')
class ReadOnlyDict(UserDict.IterableUserDict):
"""A read-only dict."""
def __init__(self, source=None):
self.data = {}
self.update(source)
def __setitem__(self, key, item):
raise TypeError
def __delitem__(self, key):
raise TypeError
def clear(self):
raise TypeError
def pop(self, key, *args):
raise TypeError
def popitem(self):
raise TypeError
def update(self, source=None):
if source is None:
return
elif isinstance(source, UserDict.UserDict):
self.data = source.data
elif isinstance(source, type({})):
self.data = source
else:
raise TypeError
class HostState(object):
"""Mutable and immutable information tracked for a host.
This is an attempt to remove the ad-hoc data structures
previously used and lock down access.
"""
def __init__(self, host, topic, capabilities=None):
self.host = host
self.topic = topic
# Read-only capability dicts
if capabilities is None:
capabilities = {}
self.capabilities = ReadOnlyDict(capabilities.get(topic, None))
# Mutable available resources.
# These will change as resources are virtually "consumed".
self.free_ram_mb = 0
self.free_disk_mb = 0
def update_from_compute_node(self, compute):
"""Update information about a host from its compute_node info."""
all_disk_mb = compute['local_gb'] * 1024
all_ram_mb = compute['memory_mb']
if FLAGS.reserved_host_disk_mb > 0:
all_disk_mb -= FLAGS.reserved_host_disk_mb
if FLAGS.reserved_host_memory_mb > 0:
all_ram_mb -= FLAGS.reserved_host_memory_mb
self.free_ram_mb = all_ram_mb
self.free_disk_mb = all_disk_mb
def consume_from_instance(self, instance):
"""Update information about a host from instance info."""
disk_mb = instance['local_gb'] * 1024
ram_mb = instance['memory_mb']
self.free_ram_mb -= ram_mb
self.free_disk_mb -= disk_mb
def passes_filters(self, filter_fns, filter_properties):
"""Return whether or not this host passes filters."""
if self.host in filter_properties.get('ignore_hosts', []):
return False
for filter_fn in filter_fns:
if not filter_fn(self, filter_properties):
return False
return True
def __repr__(self):
return "host '%s': free_ram_mb:%s free_disk_mb:%s" % \
(self.host, self.free_ram_mb, self.free_disk_mb)
class HostManager(object):
"""Base HostManager class."""
# Can be overridden in a subclass
host_state_cls = HostState
def __init__(self):
self.service_states = {} # { <host> : { <service> : { cap k : v }}}
self.filter_classes = self._get_filter_classes()
def _get_filter_classes(self):
"""Get the list of possible filter classes"""
# Imported here to avoid circular imports
from nova.scheduler import filters
def get_itm(nm):
return getattr(filters, nm)
return [get_itm(itm) for itm in dir(filters)
if (type(get_itm(itm)) is types.TypeType)
and issubclass(get_itm(itm), filters.AbstractHostFilter)
and get_itm(itm) is not filters.AbstractHostFilter]
def _choose_host_filters(self, filters):
"""Since the caller may specify which filters to use we need
to have an authoritative list of what is permissible. This
function checks the filter names against a predefined set
of acceptable filters.
"""
if filters is None:
filters = FLAGS.default_host_filters
if not isinstance(filters, (list, tuple)):
filters = [filters]
good_filters = []
bad_filters = []
for filter_name in filters:
found_class = False
for cls in self.filter_classes:
if cls.__name__ == filter_name:
found_class = True
filter_instance = cls()
# Get the filter function
filter_func = getattr(filter_instance,
'host_passes', None)
if filter_func:
good_filters.append(filter_func)
break
if not found_class:
bad_filters.append(filter_name)
if bad_filters:
msg = ", ".join(bad_filters)
raise exception.SchedulerHostFilterNotFound(filter_name=msg)
return good_filters
def filter_hosts(self, hosts, filter_properties, filters=None):
"""Filter hosts and return only ones passing all filters"""
filtered_hosts = []
filter_fns = self._choose_host_filters(filters)
for host in hosts:
if host.passes_filters(filter_fns, filter_properties):
filtered_hosts.append(host)
return filtered_hosts
def get_host_list(self):
"""Returns a list of dicts for each host that the Zone Manager
knows about. Each dict contains the host_name and the service
for that host.
"""
all_hosts = self.service_states.keys()
ret = []
for host in self.service_states:
for svc in self.service_states[host]:
ret.append({"service": svc, "host_name": host})
return ret
def get_service_capabilities(self):
"""Roll up all the individual host info to generic 'service'
capabilities. Each capability is aggregated into
<cap>_min and <cap>_max values."""
hosts_dict = self.service_states
# TODO(sandy) - be smarter about fabricating this structure.
# But it's likely to change once we understand what the Best-Match
# code will need better.
combined = {} # { <service>_<cap> : (min, max), ... }
stale_host_services = {} # { host1 : [svc1, svc2], host2 :[svc1]}
for host, host_dict in hosts_dict.iteritems():
for service_name, service_dict in host_dict.iteritems():
if not service_dict.get("enabled", True):
# Service is disabled; do not include it
continue
# Check if the service capabilities became stale
if self.host_service_caps_stale(host, service_name):
if host not in stale_host_services:
stale_host_services[host] = [] # Adding host key once
stale_host_services[host].append(service_name)
continue
for cap, value in service_dict.iteritems():
if cap == "timestamp": # Timestamp is not needed
continue
key = "%s_%s" % (service_name, cap)
min_value, max_value = combined.get(key, (value, value))
min_value = min(min_value, value)
max_value = max(max_value, value)
combined[key] = (min_value, max_value)
# Delete the expired host services
self.delete_expired_host_services(stale_host_services)
return combined
def update_service_capabilities(self, service_name, host, capabilities):
"""Update the per-service capabilities based on this notification."""
logging.debug(_("Received %(service_name)s service update from "
"%(host)s.") % locals())
service_caps = self.service_states.get(host, {})
# Copy the capabilities, so we don't modify the original dict
capab_copy = dict(capabilities)
capab_copy["timestamp"] = utils.utcnow() # Reported time
service_caps[service_name] = capab_copy
self.service_states[host] = service_caps
def host_service_caps_stale(self, host, service):
"""Check if host service capabilites are not recent enough."""
allowed_time_diff = FLAGS.periodic_interval * 3
caps = self.service_states[host][service]
if (utils.utcnow() - caps["timestamp"]) <= \
datetime.timedelta(seconds=allowed_time_diff):
return False
return True
def delete_expired_host_services(self, host_services_dict):
"""Delete all the inactive host services information."""
for host, services in host_services_dict.iteritems():
service_caps = self.service_states[host]
for service in services:
del service_caps[service]
if len(service_caps) == 0: # Delete host if no services
del self.service_states[host]
def get_all_host_states(self, context, topic):
"""Returns a dict of all the hosts the HostManager
knows about. Also, each of the consumable resources in HostState
are pre-populated and adjusted based on data in the db.
For example:
{'192.168.1.100': HostState(), ...}
Note: this can be very slow with a lot of instances.
InstanceType table isn't required since a copy is stored
with the instance (in case the InstanceType changed since the
instance was created)."""
if topic != 'compute':
raise NotImplementedError(_(
"host_manager only implemented for 'compute'"))
host_state_map = {}
# Make a compute node dict with the bare essential metrics.
compute_nodes = db.compute_node_get_all(context)
for compute in compute_nodes:
service = compute['service']
if not service:
logging.warn(_("No service for compute ID %s") % compute['id'])
continue
host = service['host']
capabilities = self.service_states.get(host, None)
host_state = self.host_state_cls(host, topic,
capabilities=capabilities)
host_state.update_from_compute_node(compute)
host_state_map[host] = host_state
# "Consume" resources from the host the instance resides on.
instances = db.instance_get_all(context)
for instance in instances:
host = instance['host']
if not host:
continue
host_state = host_state_map.get(host, None)
if not host_state:
continue
host_state.consume_from_instance(instance)
return host_state_map
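A worked example of the free-resource bookkeeping above, using the default reservations (reserved_host_memory_mb=512, reserved_host_disk_mb=0) and invented sizes:

    # update_from_compute_node: start from node totals minus reservations.
    compute = {'local_gb': 10, 'memory_mb': 4096}
    free_disk_mb = compute['local_gb'] * 1024          # 10240
    free_ram_mb = compute['memory_mb'] - 512           # 3584

    # consume_from_instance: subtract each instance already on the host.
    instance = {'local_gb': 2, 'memory_mb': 1024}
    free_disk_mb -= instance['local_gb'] * 1024        # 8192
    free_ram_mb -= instance['memory_mb']               # 2560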

View File

@@ -47,38 +47,37 @@ class WeightedHost(object):
This is an attempt to remove some of the ad-hoc dict structures
previously used."""
def __init__(self, weight, host=None, blob=None, zone=None, hostinfo=None):
def __init__(self, weight, host_state=None, blob=None, zone=None):
self.weight = weight
self.blob = blob
self.host = host
self.zone = zone
# Local members. These are not returned outside of the Zone.
self.hostinfo = hostinfo
self.host_state = host_state
def to_dict(self):
x = dict(weight=self.weight)
if self.blob:
x['blob'] = self.blob
if self.host:
x['host'] = self.host
if self.host_state:
x['host'] = self.host_state.host
if self.zone:
x['zone'] = self.zone
return x
def noop_cost_fn(host_info, options=None):
def noop_cost_fn(host_state, weighing_properties):
"""Return a pre-weight cost of 1 for each host"""
return 1
def compute_fill_first_cost_fn(host_info, options=None):
def compute_fill_first_cost_fn(host_state, weighing_properties):
"""More free ram = higher weight. So servers will less free
ram will be preferred."""
return host_info.free_ram_mb
return host_state.free_ram_mb
def weighted_sum(weighted_fns, host_list, options):
def weighted_sum(weighted_fns, host_states, weighing_properties):
"""Use the weighted-sum method to compute a score for an array of objects.
Normalize the results of the objective-functions so that the weights are
meaningful regardless of objective-function's range.
@@ -86,7 +85,8 @@ def weighted_sum(weighted_fns, host_list, options):
host_states - list of HostState objects, one per candidate host
weighted_fns - list of weights and functions like:
[(weight, objective-functions), ...]
options is an arbitrary dict of values.
weighing_properties is an arbitrary dict of values that can influence
weights.
Returns a single WeightedHost object which represents the best
candidate.
@@ -96,8 +96,8 @@ def weighted_sum(weighted_fns, host_list, options):
# One row per host. One column per function.
scores = []
for weight, fn in weighted_fns:
scores.append([fn(host_info, options) for hostname, host_info
in host_list])
scores.append([fn(host_state, weighing_properties)
for host_state in host_states])
# Adjust the weights in the grid by the function's weight adjustment
# and sum them up to get a final list of weights.
@@ -106,16 +106,16 @@ def weighted_sum(weighted_fns, host_list, options):
adjusted_scores.append([weight * score for score in row])
# Now, sum down the columns to get the final score. Column per host.
final_scores = [0.0] * len(host_list)
final_scores = [0.0] * len(host_states)
for row in adjusted_scores:
for idx, col in enumerate(row):
final_scores[idx] += col
# Super-impose the hostinfo into the scores so
# Super-impose the host_state into the scores so
# we don't lose it when we sort.
final_scores = [(final_scores[idx], host_tuple)
for idx, host_tuple in enumerate(host_list)]
final_scores = [(final_scores[idx], host_state)
for idx, host_state in enumerate(host_states)]
final_scores = sorted(final_scores)
weight, (host, hostinfo) = final_scores[0] # Lowest score is the winner!
return WeightedHost(weight, host=host, hostinfo=hostinfo)
weight, host_state = final_scores[0] # Lowest score is the winner!
return WeightedHost(weight, host_state=host_state)
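A small worked example of the scoring above, with two invented hosts and two cost functions; the lowest total wins, so a positive-weight fill-first function prefers the host with the least free RAM:

    hosts = [{'host': 'a', 'free_ram_mb': 512},
             {'host': 'b', 'free_ram_mb': 2048}]
    weighted_fns = [(1.0, lambda hs, props: hs['free_ram_mb']),  # fill-first
                    (2.0, lambda hs, props: 1)]                  # noop cost
    scores = [sum(w * fn(hs, {}) for w, fn in weighted_fns) for hs in hosts]
    # scores == [514.0, 2050.0]; lowest score is the winner.
    assert min(zip(scores, [hs['host'] for hs in hosts])) == (514.0, 'a')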

View File

@@ -30,7 +30,6 @@ from nova import flags
from nova import log as logging
from nova import manager
from nova import rpc
from nova.scheduler import zone_manager
from nova import utils
LOG = logging.getLogger('nova.scheduler.manager')
@@ -44,11 +43,9 @@ class SchedulerManager(manager.Manager):
"""Chooses a host to run instances on."""
def __init__(self, scheduler_driver=None, *args, **kwargs):
self.zone_manager = zone_manager.ZoneManager()
if not scheduler_driver:
scheduler_driver = FLAGS.scheduler_driver
self.driver = utils.import_object(scheduler_driver)
self.driver.set_zone_manager(self.zone_manager)
super(SchedulerManager, self).__init__(*args, **kwargs)
def __getattr__(self, key):
@@ -58,29 +55,29 @@ class SchedulerManager(manager.Manager):
@manager.periodic_task
def _poll_child_zones(self, context):
"""Poll child zones periodically to get status."""
self.zone_manager.ping(context)
self.driver.poll_child_zones(context)
def get_host_list(self, context=None):
"""Get a list of hosts from the ZoneManager."""
return self.zone_manager.get_host_list()
def get_host_list(self, context):
"""Get a list of hosts from the HostManager."""
return self.driver.get_host_list()
def get_zone_list(self, context=None):
def get_zone_list(self, context):
"""Get a list of zones from the ZoneManager."""
return self.zone_manager.get_zone_list()
return self.driver.get_zone_list()
def get_zone_capabilities(self, context=None):
def get_service_capabilities(self, context):
"""Get the normalized set of capabilities for this zone."""
return self.zone_manager.get_zone_capabilities(context)
return self.driver.get_service_capabilities()
def update_service_capabilities(self, context=None, service_name=None,
host=None, capabilities=None):
def update_service_capabilities(self, context, service_name=None,
host=None, capabilities=None, **kwargs):
"""Process a capability update from a service node."""
if not capabilities:
if capabilities is None:
capabilities = {}
self.zone_manager.update_service_capabilities(service_name,
host, capabilities)
self.driver.update_service_capabilities(service_name, host,
capabilities)
def select(self, context=None, *args, **kwargs):
def select(self, context, *args, **kwargs):
"""Select a list of hosts best matching the provided specs."""
return self.driver.select(context, *args, **kwargs)
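The __getattr__ hook above (its body falls outside this hunk) is what lets the manager forward unhandled scheduling calls to the driver; a minimal sketch of that delegation pattern, not necessarily the exact Nova body:

    class DelegatingManager(object):
        def __init__(self, driver):
            self.driver = driver

        def __getattr__(self, key):
            # Anything not defined on the manager falls through
            # to the driver.
            return getattr(self.driver, key)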

View File

@@ -141,20 +141,3 @@ class SimpleScheduler(chance.ChanceScheduler):
return None
msg = _("Is the appropriate service running?")
raise exception.NoValidHost(reason=msg)
def schedule_set_network_host(self, context, *_args, **_kwargs):
"""Picks a host that is up and has the fewest networks."""
elevated = context.elevated()
results = db.service_get_all_network_sorted(elevated)
for result in results:
(service, instance_count) = result
if instance_count >= FLAGS.max_networks:
msg = _("Not enough allocatable networks remaining")
raise exception.NoValidHost(reason=msg)
if self.service_is_up(service):
driver.cast_to_network_host(context, service['host'],
'set_network_host', **_kwargs)
return None
msg = _("Is the appropriate service running?")
raise exception.NoValidHost(reason=msg)

View File

@@ -97,7 +97,7 @@ class VsaScheduler(simple.SimpleScheduler):
return True
def _get_service_states(self):
return self.zone_manager.service_states
return self.host_manager.service_states
def _filter_hosts(self, topic, request_spec, host_list=None):

View File

@@ -14,12 +14,11 @@
# under the License.
"""
ZoneManager oversees all communications with child Zones.
Manage communication with child zones and keep state for them.
"""
import datetime
import traceback
import UserDict
from eventlet import greenpool
from novaclient import v1_1 as novaclient
@@ -34,149 +33,83 @@ flags.DEFINE_integer('zone_db_check_interval', 60,
'Seconds between getting fresh zone info from db.')
flags.DEFINE_integer('zone_failures_to_offline', 3,
'Number of consecutive errors before marking zone offline')
flags.DEFINE_integer('reserved_host_disk_mb', 0,
'Amount of disk in MB to reserve for host/dom0')
flags.DEFINE_integer('reserved_host_memory_mb', 512,
'Amount of memory in MB to reserve for host/dom0')
LOG = logging.getLogger('nova.scheduler.zone_manager')
class ZoneState(object):
"""Holds the state of all connected child zones."""
"""Holds state for a particular zone."""
def __init__(self):
self.is_active = True
self.name = None
self.capabilities = None
self.capabilities = {}
self.attempt = 0
self.last_seen = datetime.datetime.min
self.last_exception = None
self.last_exception_time = None
self.zone_info = {}
def update_credentials(self, zone):
def update_zone_info(self, zone):
"""Update zone credentials from db"""
self.zone_id = zone.id
self.name = zone.name
self.api_url = zone.api_url
self.username = zone.username
self.password = zone.password
self.weight_offset = zone.weight_offset
self.weight_scale = zone.weight_scale
self.zone_info = dict(zone.iteritems())
def update_metadata(self, zone_metadata):
"""Update zone metadata after successful communications with
child zone."""
self.last_seen = utils.utcnow()
self.attempt = 0
self.capabilities = ", ".join(["%s=%s" % (k, v)
for k, v in zone_metadata.iteritems() if k != 'name'])
self.capabilities = dict(
[(k, v) for k, v in zone_metadata.iteritems() if k != 'name'])
self.is_active = True
def to_dict(self):
return dict(name=self.name, capabilities=self.capabilities,
is_active=self.is_active, api_url=self.api_url,
id=self.zone_id, weight_scale=self.weight_scale,
weight_offset=self.weight_offset)
def get_zone_info(self):
db_fields_to_return = ['api_url', 'id', 'weight_scale',
'weight_offset']
zone_info = dict(is_active=self.is_active,
capabilities=self.capabilities)
for field in db_fields_to_return:
zone_info[field] = self.zone_info[field]
return zone_info
def log_error(self, exception):
"""Something went wrong. Check to see if zone should be
marked as offline."""
self.last_exception = exception
self.last_exception_time = utils.utcnow()
api_url = self.api_url
logging.warning(_("'%(exception)s' error talking to "
api_url = self.zone_info['api_url']
LOG.warning(_("'%(exception)s' error talking to "
"zone %(api_url)s") % locals())
max_errors = FLAGS.zone_failures_to_offline
self.attempt += 1
if self.attempt >= max_errors:
self.is_active = False
logging.error(_("No answer from zone %(api_url)s "
LOG.error(_("No answer from zone %(api_url)s "
"after %(max_errors)d "
"attempts. Marking inactive.") % locals())
def call_novaclient(self):
"""Call novaclient. Broken out for testing purposes. Note that
we have to use the admin credentials for this since there is no
available context."""
username = self.zone_info['username']
password = self.zone_info['password']
api_url = self.zone_info['api_url']
region_name = self.zone_info['name']
client = novaclient.Client(username, password, None, api_url,
region_name)
return client.zones.info()._info
def _call_novaclient(zone):
"""Call novaclient. Broken out for testing purposes. Note that
we have to use the admin credentials for this since there is no
available context."""
client = novaclient.Client(zone.username, zone.password, None,
zone.api_url, region_name=zone.name)
return client.zones.info()._info
def _poll_zone(zone):
"""Eventlet worker to poll a zone."""
name = zone.name
url = zone.api_url
logging.debug(_("Polling zone: %(name)s @ %(url)s") % locals())
try:
zone.update_metadata(_call_novaclient(zone))
except Exception, e:
zone.log_error(traceback.format_exc())
class ReadOnlyDict(UserDict.IterableUserDict):
"""A read-only dict."""
def __init__(self, source=None):
self.update(source)
def __setitem__(self, key, item):
raise TypeError
def __delitem__(self, key):
raise TypeError
def clear(self):
raise TypeError
def pop(self, key, *args):
raise TypeError
def popitem(self):
raise TypeError
def update(self, source=None):
if source is None:
def poll(self):
"""Eventlet worker to poll a self."""
if 'api_url' not in self.zone_info:
return
elif isinstance(source, UserDict.UserDict):
self.data = source.data
elif isinstance(source, dict):
self.data = source
else:
raise TypeError
class HostInfo(object):
"""Mutable and immutable information on hosts tracked
by the ZoneManager. This is an attempt to remove the
ad-hoc data structures previously used and lock down
access."""
def __init__(self, host, caps=None, free_ram_mb=0, free_disk_gb=0):
self.host = host
# Read-only capability dicts
self.compute = None
self.volume = None
self.network = None
if caps:
self.compute = ReadOnlyDict(caps.get('compute', None))
self.volume = ReadOnlyDict(caps.get('volume', None))
self.network = ReadOnlyDict(caps.get('network', None))
# Mutable available resources.
# These will change as resources are virtually "consumed".
self.free_ram_mb = free_ram_mb
self.free_disk_gb = free_disk_gb
def consume_resources(self, disk_gb, ram_mb):
"""Consume some of the mutable resources."""
self.free_disk_gb -= disk_gb
self.free_ram_mb -= ram_mb
def __repr__(self):
return "%s ram:%s disk:%s" % \
(self.host, self.free_ram_mb, self.free_disk_gb)
name = self.zone_info['name']
api_url = self.zone_info['api_url']
LOG.debug(_("Polling zone: %(name)s @ %(api_url)s") % locals())
try:
self.update_metadata(self.call_novaclient())
except Exception, e:
self.log_error(traceback.format_exc())
class ZoneManager(object):
@@ -184,116 +117,11 @@ class ZoneManager(object):
def __init__(self):
self.last_zone_db_check = datetime.datetime.min
self.zone_states = {} # { <zone_id> : ZoneState }
self.service_states = {} # { <host> : { <service> : { cap k : v }}}
self.green_pool = greenpool.GreenPool()
def get_zone_list(self):
"""Return the list of zones we know about."""
return [zone.to_dict() for zone in self.zone_states.values()]
def get_host_list(self):
"""Returns a list of dicts for each host that the Zone Manager
knows about. Each dict contains the host_name and the service
for that host.
"""
all_hosts = self.service_states.keys()
ret = []
for host in self.service_states:
for svc in self.service_states[host]:
ret.append({"service": svc, "host_name": host})
return ret
def _compute_node_get_all(self, context):
"""Broken out for testing."""
return db.compute_node_get_all(context)
def _instance_get_all(self, context):
"""Broken out for testing."""
return db.instance_get_all(context)
def get_all_host_data(self, context):
"""Returns a dict of all the hosts the ZoneManager
knows about. Also, each of the consumable resources in HostInfo
are pre-populated and adjusted based on data in the db.
For example:
{'192.168.1.100': HostInfo(), ...}
Note: this can be very slow with a lot of instances.
InstanceType table isn't required since a copy is stored
with the instance (in case the InstanceType changed since the
instance was created)."""
# Make a compute node dict with the bare essential metrics.
compute_nodes = self._compute_node_get_all(context)
host_info_map = {}
for compute in compute_nodes:
all_disk = compute['local_gb']
all_ram = compute['memory_mb']
service = compute['service']
if not service:
logging.warn(_("No service for compute ID %s") % compute['id'])
continue
host = service['host']
caps = self.service_states.get(host, None)
host_info = HostInfo(host, caps=caps,
free_disk_gb=all_disk, free_ram_mb=all_ram)
# Reserve resources for host/dom0
host_info.consume_resources(FLAGS.reserved_host_disk_mb * 1024,
FLAGS.reserved_host_memory_mb)
host_info_map[host] = host_info
# "Consume" resources from the host the instance resides on.
instances = self._instance_get_all(context)
for instance in instances:
host = instance['host']
if not host:
continue
host_info = host_info_map.get(host, None)
if not host_info:
continue
disk = instance['local_gb']
ram = instance['memory_mb']
host_info.consume_resources(disk, ram)
return host_info_map
def get_zone_capabilities(self, context):
"""Roll up all the individual host info to generic 'service'
capabilities. Each capability is aggregated into
<cap>_min and <cap>_max values."""
hosts_dict = self.service_states
# TODO(sandy) - be smarter about fabricating this structure.
# But it's likely to change once we understand what the Best-Match
# code will need better.
combined = {} # { <service>_<cap> : (min, max), ... }
stale_host_services = {} # { host1 : [svc1, svc2], host2 :[svc1]}
for host, host_dict in hosts_dict.iteritems():
for service_name, service_dict in host_dict.iteritems():
if not service_dict.get("enabled", True):
# Service is disabled; do not include it
continue
# Check if the service capabilities became stale
if self.host_service_caps_stale(host, service_name):
if host not in stale_host_services:
stale_host_services[host] = [] # Adding host key once
stale_host_services[host].append(service_name)
continue
for cap, value in service_dict.iteritems():
if cap == "timestamp": # Timestamp is not needed
continue
key = "%s_%s" % (service_name, cap)
min_value, max_value = combined.get(key, (value, value))
min_value = min(min_value, value)
max_value = max(max_value, value)
combined[key] = (min_value, max_value)
# Delete the expired host services
self.delete_expired_host_services(stale_host_services)
return combined
return [zone.get_zone_info() for zone in self.zone_states.values()]
def _refresh_from_db(self, context):
"""Make our zone state map match the db."""
@@ -302,10 +130,11 @@ class ZoneManager(object):
existing = self.zone_states.keys()
db_keys = []
for zone in zones:
db_keys.append(zone.id)
if zone.id not in existing:
self.zone_states[zone.id] = ZoneState()
self.zone_states[zone.id].update_credentials(zone)
zone_id = zone['id']
db_keys.append(zone_id)
if zone_id not in existing:
self.zone_states[zone_id] = ZoneState()
self.zone_states[zone_id].update_zone_info(zone)
# Cleanup zones removed from db ...
keys = self.zone_states.keys() # since we're deleting
@@ -313,42 +142,19 @@ class ZoneManager(object):
if zone_id not in db_keys:
del self.zone_states[zone_id]
def _poll_zones(self, context):
def _poll_zones(self):
"""Try to connect to each child zone and get update."""
self.green_pool.imap(_poll_zone, self.zone_states.values())
def _worker(zone_state):
zone_state.poll()
self.green_pool.imap(_worker, self.zone_states.values())
def ping(self, context):
"""Ping should be called periodically to update zone status."""
def update(self, context):
"""Update status for all zones. This should be called
periodically to refresh the zone states.
"""
diff = utils.utcnow() - self.last_zone_db_check
if diff.seconds >= FLAGS.zone_db_check_interval:
logging.debug(_("Updating zone cache from db."))
LOG.debug(_("Updating zone cache from db."))
self.last_zone_db_check = utils.utcnow()
self._refresh_from_db(context)
self._poll_zones(context)
def update_service_capabilities(self, service_name, host, capabilities):
"""Update the per-service capabilities based on this notification."""
logging.debug(_("Received %(service_name)s service update from "
"%(host)s.") % locals())
service_caps = self.service_states.get(host, {})
capabilities["timestamp"] = utils.utcnow() # Reported time
service_caps[service_name] = capabilities
self.service_states[host] = service_caps
def host_service_caps_stale(self, host, service):
"""Check if host service capabilites are not recent enough."""
allowed_time_diff = FLAGS.periodic_interval * 3
caps = self.service_states[host][service]
if (utils.utcnow() - caps["timestamp"]) <= \
datetime.timedelta(seconds=allowed_time_diff):
return False
return True
def delete_expired_host_services(self, host_services_dict):
"""Delete all the inactive host services information."""
for host, services in host_services_dict.iteritems():
service_caps = self.service_states[host]
for service in services:
del service_caps[service]
if len(service_caps) == 0: # Delete host if no services
del self.service_states[host]
self._poll_zones()
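The staleness rule that moved into HostManager treats capabilities older than three reporting periods as expired; a self-contained sketch of that window, assuming a periodic_interval of 60 seconds:

    import datetime

    def caps_stale(reported_at, now, periodic_interval=60):
        # Mirrors host_service_caps_stale: stale once the report is
        # older than three reporting periods.
        allowed = datetime.timedelta(seconds=periodic_interval * 3)
        return (now - reported_at) > allowed

    now = datetime.datetime(2012, 1, 18, 12, 0, 0)
    assert not caps_stale(now - datetime.timedelta(seconds=120), now)
    assert caps_stale(now - datetime.timedelta(seconds=200), now)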

View File

@@ -13,25 +13,52 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
Fakes For Distributed Scheduler tests.
Fakes For Scheduler tests.
"""
from nova import db
from nova.scheduler import distributed_scheduler
from nova.scheduler import host_manager
from nova.scheduler import zone_manager
COMPUTE_NODES = [
dict(id=1, local_gb=1024, memory_mb=1024, service=dict(host='host1')),
dict(id=2, local_gb=2048, memory_mb=2048, service=dict(host='host2')),
dict(id=3, local_gb=4096, memory_mb=4096, service=dict(host='host3')),
dict(id=4, local_gb=8192, memory_mb=8192, service=dict(host='host4')),
# Broken entry
dict(id=5, local_gb=1024, memory_mb=1024, service=None),
]
INSTANCES = [
dict(local_gb=512, memory_mb=512, host='host1'),
dict(local_gb=512, memory_mb=512, host='host2'),
dict(local_gb=512, memory_mb=512, host='host2'),
dict(local_gb=1024, memory_mb=1024, host='host3'),
# Broken host
dict(local_gb=1024, memory_mb=1024, host=None),
# No matching host
dict(local_gb=1024, memory_mb=1024, host='host5'),
]
class FakeDistributedScheduler(distributed_scheduler.DistributedScheduler):
# No need to stub anything at the moment
pass
def __init__(self, *args, **kwargs):
super(FakeDistributedScheduler, self).__init__(*args, **kwargs)
self.zone_manager = zone_manager.ZoneManager()
self.host_manager = host_manager.HostManager()
class FakeZoneManager(zone_manager.ZoneManager):
class FakeHostManager(host_manager.HostManager):
"""host1: free_ram_mb=1024-512-512=0, free_disk_gb=1024-512-512=0
host2: free_ram_mb=2048-512=1536 free_disk_gb=2048-512=1536
host3: free_ram_mb=4096-1024=3072 free_disk_gb=4096-1024=3072
host4: free_ram_mb=8192 free_disk_gb=8192"""
def __init__(self):
super(FakeHostManager, self).__init__()
self.service_states = {
'host1': {
'compute': {'host_memory_free': 1073741824},
@@ -55,18 +82,17 @@ class FakeZoneManager(zone_manager.ZoneManager):
('host4', dict(free_disk_gb=8192, free_ram_mb=8192)),
]
def _compute_node_get_all(self, context):
return [
dict(local_gb=1024, memory_mb=1024, service=dict(host='host1')),
dict(local_gb=2048, memory_mb=2048, service=dict(host='host2')),
dict(local_gb=4096, memory_mb=4096, service=dict(host='host3')),
dict(local_gb=8192, memory_mb=8192, service=dict(host='host4')),
]
def _instance_get_all(self, context):
return [
dict(local_gb=512, memory_mb=512, host='host1'),
dict(local_gb=512, memory_mb=512, host='host1'),
dict(local_gb=512, memory_mb=512, host='host2'),
dict(local_gb=1024, memory_mb=1024, host='host3'),
]
class FakeHostState(host_manager.HostState):
def __init__(self, host, topic, attribute_dict):
super(FakeHostState, self).__init__(host, topic)
for (key, val) in attribute_dict.iteritems():
setattr(self, key, val)
def mox_host_manager_db_calls(mox, context):
mox.StubOutWithMock(db, 'compute_node_get_all')
mox.StubOutWithMock(db, 'instance_get_all')
db.compute_node_get_all(context).AndReturn(COMPUTE_NODES)
db.instance_get_all(context).AndReturn(INSTANCES)

View File

@@ -18,29 +18,15 @@ Tests For Distributed Scheduler.
import json
import nova.db
from nova.compute import api as compute_api
from nova import context
from nova import db
from nova import exception
from nova import test
from nova.scheduler import distributed_scheduler
from nova.scheduler import least_cost
from nova.scheduler import zone_manager
from nova.tests.scheduler import fake_zone_manager as ds_fakes
class FakeEmptyZoneManager(zone_manager.ZoneManager):
def __init__(self):
self.service_states = {}
def get_host_list_from_db(self, context):
return []
def _compute_node_get_all(*args, **kwargs):
return []
def _instance_get_all(*args, **kwargs):
return []
from nova.scheduler import host_manager
from nova import test
from nova.tests.scheduler import fakes
def fake_call_zone_method(context, method, specs, zones):
@@ -80,8 +66,8 @@ def fake_zone_get_all(context):
]
def fake_filter_hosts(topic, request_info, unfiltered_hosts, options):
return unfiltered_hosts
def fake_filter_hosts(hosts, filter_properties):
return list(hosts)
class DistributedSchedulerTestCase(test.TestCase):
@@ -92,7 +78,7 @@ class DistributedSchedulerTestCase(test.TestCase):
properly adjusted based on the scale/offset in the zone
db entries.
"""
sched = ds_fakes.FakeDistributedScheduler()
sched = fakes.FakeDistributedScheduler()
child_results = fake_call_zone_method(None, None, None, None)
zones = fake_zone_get_all(None)
weighted_hosts = sched._adjust_child_weights(child_results, zones)
@@ -113,14 +99,14 @@ class DistributedSchedulerTestCase(test.TestCase):
def _fake_empty_call_zone_method(*args, **kwargs):
return []
sched = ds_fakes.FakeDistributedScheduler()
sched.zone_manager = FakeEmptyZoneManager()
sched = fakes.FakeDistributedScheduler()
self.stubs.Set(sched, '_call_zone_method',
_fake_empty_call_zone_method)
self.stubs.Set(nova.db, 'zone_get_all', fake_zone_get_all)
self.stubs.Set(db, 'zone_get_all', fake_zone_get_all)
fake_context = context.RequestContext('user', 'project')
request_spec = dict(instance_type=dict(memory_mb=1, local_gb=1))
request_spec = {'instance_type': {'memory_mb': 1, 'local_gb': 1},
'instance_properties': {'project_id': 1}}
self.assertRaises(exception.NoValidHost, sched.schedule_run_instance,
fake_context, request_spec)
@@ -150,7 +136,7 @@ class DistributedSchedulerTestCase(test.TestCase):
self.child_zone_called = True
return 2
sched = ds_fakes.FakeDistributedScheduler()
sched = fakes.FakeDistributedScheduler()
self.stubs.Set(sched, '_schedule', _fake_schedule)
self.stubs.Set(sched, '_make_weighted_host_from_blob',
_fake_make_weighted_host_from_blob)
@@ -185,7 +171,7 @@ class DistributedSchedulerTestCase(test.TestCase):
self.was_admin = context.is_admin
return []
sched = ds_fakes.FakeDistributedScheduler()
sched = fakes.FakeDistributedScheduler()
self.stubs.Set(sched, '_schedule', fake_schedule)
fake_context = context.RequestContext('user', 'project')
@@ -196,15 +182,16 @@ class DistributedSchedulerTestCase(test.TestCase):
def test_schedule_bad_topic(self):
"""Parameter checking."""
sched = ds_fakes.FakeDistributedScheduler()
sched = fakes.FakeDistributedScheduler()
self.assertRaises(NotImplementedError, sched._schedule, None, "foo",
{})
def test_schedule_no_instance_type(self):
"""Parameter checking."""
sched = ds_fakes.FakeDistributedScheduler()
sched = fakes.FakeDistributedScheduler()
request_spec = {'instance_properties': {}}
self.assertRaises(NotImplementedError, sched._schedule, None,
"compute", {})
"compute", request_spec=request_spec)
def test_schedule_happy_day(self):
"""Make sure there's nothing glaringly wrong with _schedule()
@@ -218,26 +205,31 @@ class DistributedSchedulerTestCase(test.TestCase):
return least_cost.WeightedHost(self.next_weight, host=host,
hostinfo=hostinfo)
sched = ds_fakes.FakeDistributedScheduler()
fake_context = context.RequestContext('user', 'project')
sched.zone_manager = ds_fakes.FakeZoneManager()
self.stubs.Set(sched, '_filter_hosts', fake_filter_hosts)
sched = fakes.FakeDistributedScheduler()
fake_context = context.RequestContext('user', 'project',
is_admin=True)
self.stubs.Set(sched.host_manager, 'filter_hosts',
fake_filter_hosts)
self.stubs.Set(least_cost, 'weighted_sum', _fake_weighted_sum)
self.stubs.Set(nova.db, 'zone_get_all', fake_zone_get_all)
self.stubs.Set(db, 'zone_get_all', fake_zone_get_all)
self.stubs.Set(sched, '_call_zone_method', fake_call_zone_method)
instance_type = dict(memory_mb=512, local_gb=512)
request_spec = dict(num_instances=10, instance_type=instance_type)
request_spec = {'num_instances': 10,
'instance_type': {'memory_mb': 512, 'local_gb': 512},
'instance_properties': {'project_id': 1}}
self.mox.ReplayAll()
weighted_hosts = sched._schedule(fake_context, 'compute',
request_spec)
request_spec)
self.mox.VerifyAll()
self.assertEquals(len(weighted_hosts), 10)
for weighted_host in weighted_hosts:
# We set this up so remote hosts have even weights ...
if int(weighted_host.weight) % 2 == 0:
self.assertTrue(weighted_host.zone is not None)
self.assertTrue(weighted_host.host is None)
self.assertTrue(weighted_host.host_state is None)
else:
self.assertTrue(weighted_host.host is not None)
self.assertTrue(weighted_host.host_state is not None)
self.assertTrue(weighted_host.zone is None)
def test_schedule_local_zone(self):
@ -248,33 +240,41 @@ class DistributedSchedulerTestCase(test.TestCase):
def _fake_weighted_sum(functions, hosts, options):
self.next_weight += 2.0
host, hostinfo = hosts[0]
return least_cost.WeightedHost(self.next_weight, host=host,
hostinfo=hostinfo)
host = hosts[0]
return least_cost.WeightedHost(self.next_weight, host_state=host)
sched = ds_fakes.FakeDistributedScheduler()
fake_context = context.RequestContext('user', 'project')
sched.zone_manager = ds_fakes.FakeZoneManager()
self.stubs.Set(sched, '_filter_hosts', fake_filter_hosts)
sched = fakes.FakeDistributedScheduler()
fake_context = context.RequestContext('user', 'project',
is_admin=True)
fakes.mox_host_manager_db_calls(self.mox, fake_context)
self.stubs.Set(sched.host_manager, 'filter_hosts',
fake_filter_hosts)
self.stubs.Set(least_cost, 'weighted_sum', _fake_weighted_sum)
self.stubs.Set(nova.db, 'zone_get_all', fake_zone_get_all)
self.stubs.Set(db, 'zone_get_all', fake_zone_get_all)
self.stubs.Set(sched, '_call_zone_method', fake_call_zone_method)
instance_type = dict(memory_mb=512, local_gb=512)
request_spec = dict(num_instances=10, instance_type=instance_type,
local_zone=True)
request_spec = {'num_instances': 10,
'instance_type': {'memory_mb': 512, 'local_gb': 512},
'instance_properties': {'project_id': 1,
'memory_mb': 512,
'local_gb': 512},
'local_zone': True}
self.mox.ReplayAll()
weighted_hosts = sched._schedule(fake_context, 'compute',
request_spec)
self.mox.VerifyAll()
self.assertEquals(len(weighted_hosts), 10)
for weighted_host in weighted_hosts:
# There should be no remote hosts
self.assertTrue(weighted_host.host is not None)
self.assertTrue(weighted_host.host_state is not None)
self.assertTrue(weighted_host.zone is None)
def test_decrypt_blob(self):
"""Test that the decrypt method works."""
fixture = ds_fakes.FakeDistributedScheduler()
fixture = fakes.FakeDistributedScheduler()
test_data = {'weight': 1, 'host': 'x', 'blob': 'y', 'zone': 'z'}
class StubDecryptor(object):
@ -290,49 +290,42 @@ class DistributedSchedulerTestCase(test.TestCase):
blob='y', zone='z'))
def test_get_cost_functions(self):
fixture = ds_fakes.FakeDistributedScheduler()
self.flags(reserved_host_memory_mb=128)
fixture = fakes.FakeDistributedScheduler()
fns = fixture.get_cost_functions()
self.assertEquals(len(fns), 1)
weight, fn = fns[0]
self.assertEquals(weight, 1.0)
hostinfo = zone_manager.HostInfo('host', free_ram_mb=1000)
self.assertEquals(1000, fn(hostinfo))
hostinfo = host_manager.HostState('host', 'compute')
hostinfo.update_from_compute_node(dict(memory_mb=1000,
local_gb=0))
self.assertEquals(1000 - 128, fn(hostinfo, {}))
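# NOTE(editor): the 872 expected here falls out of the flag set above:
# update_from_compute_node() appears to start free_ram_mb at
# memory_mb - reserved_host_memory_mb, i.e. 1000 - 128 = 872, and the
# single cost function simply weights free RAM.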
def test_filter_hosts_avoid(self):
"""Test to make sure _filter_hosts() filters original hosts if
avoid_original_host is True."""
def test_populate_filter_properties(self):
request_spec = {'instance_properties': {}}
fixture = fakes.FakeDistributedScheduler()
filter_properties = {'ignore_hosts': []}
fixture.populate_filter_properties(request_spec, filter_properties)
self.assertEqual(len(filter_properties['ignore_hosts']), 0)
def _fake_choose_host_filters():
return []
# No original host results in not ignoring
request_spec = {'instance_properties': {},
'avoid_original_host': True}
fixture = fakes.FakeDistributedScheduler()
fixture.populate_filter_properties(request_spec, filter_properties)
self.assertEqual(len(filter_properties['ignore_hosts']), 0)
sched = ds_fakes.FakeDistributedScheduler()
fake_context = context.RequestContext('user', 'project')
self.stubs.Set(sched, '_choose_host_filters',
_fake_choose_host_filters)
# Original host but avoid is False should not ignore it
request_spec = {'instance_properties': {'host': 'foo'},
'avoid_original_host': False}
fixture = fakes.FakeDistributedScheduler()
fixture.populate_filter_properties(request_spec, filter_properties)
self.assertEqual(len(filter_properties['ignore_hosts']), 0)
hosts = [('host1', '1info'), ('host2', '2info'), ('host3', '3info')]
request_spec = dict(instance_properties=dict(host='host2'),
avoid_original_host=True)
filtered = sched._filter_hosts('compute', request_spec, hosts, {})
self.assertEqual(filtered,
[('host1', '1info'), ('host3', '3info')])
def test_filter_hosts_no_avoid(self):
"""Test to make sure _filter_hosts() does not filter original
hosts if avoid_original_host is False."""
def _fake_choose_host_filters():
return []
sched = ds_fakes.FakeDistributedScheduler()
fake_context = context.RequestContext('user', 'project')
self.stubs.Set(sched, '_choose_host_filters',
_fake_choose_host_filters)
hosts = [('host1', '1info'), ('host2', '2info'), ('host3', '3info')]
request_spec = dict(instance_properties=dict(host='host2'),
avoid_original_host=False)
filtered = sched._filter_hosts('compute', request_spec, hosts, {})
self.assertEqual(filtered, hosts)
# Original host but avoid is True should ignore it
request_spec = {'instance_properties': {'host': 'foo'},
'avoid_original_host': True}
fixture = fakes.FakeDistributedScheduler()
fixture.populate_filter_properties(request_spec, filter_properties)
self.assertEqual(len(filter_properties['ignore_hosts']), 1)
self.assertEqual(filter_properties['ignore_hosts'][0], 'foo')
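# NOTE(editor): the assertions above pin down the contract; a sketch
# consistent with them (not necessarily the exact implementation):
def _sketch_populate_filter_properties(request_spec, filter_properties):
    # Ignore the instance's original host only when the request
    # explicitly asks to avoid it and the original host is known.
    if request_spec.get('avoid_original_host'):
        original_host = request_spec['instance_properties'].get('host')
        if original_host:
            filter_properties.setdefault('ignore_hosts', [])
            filter_properties['ignore_hosts'].append(original_host)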

View File

@ -1,252 +0,0 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Scheduler Host Filters.
"""
import json
import nova
from nova import exception
from nova import test
from nova.scheduler import distributed_scheduler as dist
from nova.tests.scheduler import fake_zone_manager as ds_fakes
class HostFilterTestCase(test.TestCase):
"""Test case for host filters."""
def _host_caps(self, multiplier):
# Returns host capabilities in the following way:
# host1 = memory:free 10 (100max)
# disk:available 100 (1000max)
# hostN = memory:free 10 + 10N
# disk:available 100 + 100N
# in other words: hostN has more resources than host0
# which means ... don't go above 10 hosts.
return {'host_name-description': 'XenServer %s' % multiplier,
'host_hostname': 'xs-%s' % multiplier,
'host_memory_total': 100,
'host_memory_overhead': 10,
'host_memory_free': 10 + multiplier * 10,
'host_memory_free-computed': 10 + multiplier * 10,
'host_other-config': {},
'host_ip_address': '192.168.1.%d' % (100 + multiplier),
'host_cpu_info': {},
'disk_available': 100 + multiplier * 100,
'disk_total': 1000,
'disk_used': 0,
'host_uuid': 'xxx-%d' % multiplier,
'host_name-label': 'xs-%s' % multiplier,
'enabled': True}
def setUp(self):
super(HostFilterTestCase, self).setUp()
default_host_filters = ['AllHostsFilter']
self.flags(default_host_filters=default_host_filters,
reserved_host_disk_mb=0, reserved_host_memory_mb=0)
self.instance_type = dict(name='tiny',
memory_mb=30,
vcpus=10,
local_gb=300,
flavorid=1,
swap=500,
rxtx_quota=30000,
rxtx_cap=200,
extra_specs={})
self.gpu_instance_type = dict(name='tiny.gpu',
memory_mb=30,
vcpus=10,
local_gb=300,
flavorid=2,
swap=500,
rxtx_quota=30000,
rxtx_cap=200,
extra_specs={'xpu_arch': 'fermi',
'xpu_info': 'Tesla 2050'})
self.zone_manager = ds_fakes.FakeZoneManager()
states = {}
for x in xrange(4):
states['host%d' % (x + 1)] = {'compute': self._host_caps(x)}
self.zone_manager.service_states = states
# Add some extra capabilities to some hosts
host4 = self.zone_manager.service_states['host4']['compute']
host4['xpu_arch'] = 'fermi'
host4['xpu_info'] = 'Tesla 2050'
host2 = self.zone_manager.service_states['host2']['compute']
host2['xpu_arch'] = 'radeon'
host3 = self.zone_manager.service_states['host3']['compute']
host3['xpu_arch'] = 'fermi'
host3['xpu_info'] = 'Tesla 2150'
def _get_all_hosts(self):
return self.zone_manager.get_all_host_data(None).items()
def test_choose_filter(self):
# Test default filter ...
sched = dist.DistributedScheduler()
hfs = sched._choose_host_filters()
hf = hfs[0]
self.assertEquals(hf._full_name().split(".")[-1], 'AllHostsFilter')
# Test valid filter ...
hfs = sched._choose_host_filters('InstanceTypeFilter')
hf = hfs[0]
self.assertEquals(hf._full_name().split(".")[-1], 'InstanceTypeFilter')
# Test invalid filter ...
try:
sched._choose_host_filters('does not exist')
self.fail("Should not find host filter.")
except exception.SchedulerHostFilterNotFound:
pass
def test_all_host_filter(self):
sched = dist.DistributedScheduler()
hfs = sched._choose_host_filters('AllHostsFilter')
hf = hfs[0]
all_hosts = self._get_all_hosts()
cooked = hf.instance_type_to_filter(self.instance_type)
hosts = hf.filter_hosts(all_hosts, cooked, {})
self.assertEquals(4, len(hosts))
for host, capabilities in hosts:
self.assertTrue(host.startswith('host'))
def test_instance_type_filter(self):
hf = nova.scheduler.filters.InstanceTypeFilter()
# filter all hosts that can support 30 ram and 300 disk
cooked = hf.instance_type_to_filter(self.instance_type)
all_hosts = self._get_all_hosts()
hosts = hf.filter_hosts(all_hosts, cooked, {})
self.assertEquals(3, len(hosts))
just_hosts = [host for host, hostinfo in hosts]
just_hosts.sort()
self.assertEquals('host4', just_hosts[2])
self.assertEquals('host3', just_hosts[1])
self.assertEquals('host2', just_hosts[0])
def test_instance_type_filter_reserved_memory(self):
self.flags(reserved_host_memory_mb=2048)
hf = nova.scheduler.filters.InstanceTypeFilter()
# filter all hosts that can support 30 ram and 300 disk after
# reserving 2048 ram
cooked = hf.instance_type_to_filter(self.instance_type)
all_hosts = self._get_all_hosts()
hosts = hf.filter_hosts(all_hosts, cooked, {})
self.assertEquals(2, len(hosts))
just_hosts = [host for host, hostinfo in hosts]
just_hosts.sort()
self.assertEquals('host4', just_hosts[1])
self.assertEquals('host3', just_hosts[0])
def test_instance_type_filter_extra_specs(self):
hf = nova.scheduler.filters.InstanceTypeFilter()
# filter all hosts that can support 30 ram and 300 disk
cooked = hf.instance_type_to_filter(self.gpu_instance_type)
all_hosts = self._get_all_hosts()
hosts = hf.filter_hosts(all_hosts, cooked, {})
self.assertEquals(1, len(hosts))
just_hosts = [host for host, caps in hosts]
self.assertEquals('host4', just_hosts[0])
def test_json_filter(self):
hf = nova.scheduler.filters.JsonFilter()
# filter all hosts that can support 30 ram and 300 disk
cooked = hf.instance_type_to_filter(self.instance_type)
all_hosts = self._get_all_hosts()
hosts = hf.filter_hosts(all_hosts, cooked, {})
self.assertEquals(2, len(hosts))
just_hosts = [host for host, caps in hosts]
just_hosts.sort()
self.assertEquals('host3', just_hosts[0])
self.assertEquals('host4', just_hosts[1])
# Try some custom queries
raw = ['or',
['and',
['<', '$compute.host_memory_free', 30],
['<', '$compute.disk_available', 300],
],
['and',
['>', '$compute.host_memory_free', 30],
['>', '$compute.disk_available', 300],
]
]
cooked = json.dumps(raw)
hosts = hf.filter_hosts(all_hosts, cooked, {})
self.assertEquals(3, len(hosts))
just_hosts = [host for host, caps in hosts]
just_hosts.sort()
for index, host in zip([1, 2, 4], just_hosts):
self.assertEquals('host%d' % index, host)
raw = ['not',
['=', '$compute.host_memory_free', 30],
]
cooked = json.dumps(raw)
hosts = hf.filter_hosts(all_hosts, cooked, {})
self.assertEquals(3, len(hosts))
just_hosts = [host for host, caps in hosts]
just_hosts.sort()
for index, host in zip([1, 2, 4], just_hosts):
self.assertEquals('host%d' % index, host)
raw = ['in', '$compute.host_memory_free', 20, 40, 60, 80, 100]
cooked = json.dumps(raw)
hosts = hf.filter_hosts(all_hosts, cooked, {})
self.assertEquals(2, len(hosts))
just_hosts = [host for host, caps in hosts]
just_hosts.sort()
for index, host in zip([2, 4], just_hosts):
self.assertEquals('host%d' % index, host)
# Try some bogus input ...
raw = ['unknown command', ]
cooked = json.dumps(raw)
try:
hf.filter_hosts(all_hosts, cooked, {})
self.fail("Should give KeyError")
except KeyError, e:
pass
self.assertTrue(hf.filter_hosts(all_hosts, json.dumps([]), {}))
self.assertTrue(hf.filter_hosts(all_hosts, json.dumps({}), {}))
self.assertTrue(hf.filter_hosts(all_hosts, json.dumps(
['not', True, False, True, False],
), {}))
try:
hf.filter_hosts(all_hosts, json.dumps(
'not', True, False, True, False,), {})
self.fail("Should give KeyError")
except KeyError, e:
pass
self.assertFalse(hf.filter_hosts(all_hosts,
json.dumps(['=', '$foo', 100]), {}))
self.assertFalse(hf.filter_hosts(all_hosts,
json.dumps(['=', '$.....', 100]), {}))
self.assertFalse(hf.filter_hosts(all_hosts,
json.dumps(
['>', ['and', ['or', ['not', ['<', ['>=', ['<=', ['in', ]]]]]]]]),
{}))
self.assertFalse(hf.filter_hosts(all_hosts,
json.dumps(['=', {}, ['>', '$missing....foo']]), {}))

View File

@ -0,0 +1,333 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Scheduler Host Filters.
"""
import json
from nova.scheduler import filters
from nova import test
from nova.tests.scheduler import fakes
class HostFiltersTestCase(test.TestCase):
"""Test case for host filters."""
def setUp(self):
super(HostFiltersTestCase, self).setUp()
self.json_query = json.dumps(
['and', ['>=', '$free_ram_mb', 1024],
['>=', '$free_disk_mb', 200 * 1024]])
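# NOTE(editor): the query above is a small prefix-notation DSL: each
# list starts with an operator, the remaining elements are operands,
# and '$name' operands resolve against the HostState under test, so
# self.json_query reads, in infix terms:
#     free_ram_mb >= 1024 and free_disk_mb >= 200 * 1024
# A sketch of evaluating one leaf clause (illustrative names only):
def _sketch_eval_clause(clause, host_state):
    # e.g. ['>=', '$free_ram_mb', 1024] asks whether
    # host_state.free_ram_mb >= 1024.
    op, variable, operand = clause
    value = getattr(host_state, variable[1:], None)
    if op == '>=':
        return value >= operand
    if op == '<':
        return value < operand
    # Unknown operators surface as KeyError, as tested further down.
    raise KeyError(op)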
def test_all_host_filter(self):
filt_cls = filters.AllHostsFilter()
host = fakes.FakeHostState('host1', 'compute', {})
self.assertTrue(filt_cls.host_passes(host, {}))
def test_compute_filter_passes(self):
filt_cls = filters.ComputeFilter()
filter_properties = {'instance_type': {'memory_mb': 1024}}
capabilities = {'enabled': True}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024, 'capabilities': capabilities})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_fails_on_memory(self):
filt_cls = filters.ComputeFilter()
filter_properties = {'instance_type': {'memory_mb': 1024}}
capabilities = {'enabled': True}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1023, 'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_fails_on_disabled(self):
filt_cls = filters.ComputeFilter()
filter_properties = {'instance_type': {'memory_mb': 1024}}
capabilities = {'enabled': False}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024, 'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_passes_on_volume(self):
filt_cls = filters.ComputeFilter()
filter_properties = {'instance_type': {'memory_mb': 1024}}
capabilities = {'enabled': False}
host = fakes.FakeHostState('host1', 'volume',
{'free_ram_mb': 1024, 'capabilities': capabilities})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_passes_on_no_instance_type(self):
filt_cls = filters.ComputeFilter()
filter_properties = {}
capabilities = {'enabled': False}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024, 'capabilities': capabilities})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_passes_extra_specs(self):
filt_cls = filters.ComputeFilter()
extra_specs = {'opt1': 1, 'opt2': 2}
capabilities = {'enabled': True, 'opt1': 1, 'opt2': 2}
filter_properties = {'instance_type': {'memory_mb': 1024,
'extra_specs': extra_specs}}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024, 'capabilities': capabilities})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_compute_filter_fails_extra_specs(self):
filt_cls = filters.ComputeFilter()
extra_specs = {'opt1': 1, 'opt2': 3}
capabilities = {'enabled': True, 'opt1': 1, 'opt2': 2}
filter_properties = {'instance_type': {'memory_mb': 1024,
'extra_specs': extra_specs}}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024, 'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
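# NOTE(editor): taken together, the ComputeFilter tests above imply
# roughly this behaviour; a sketch under those assumptions only:
def _sketch_compute_filter_host_passes(host_state, filter_properties):
    instance_type = filter_properties.get('instance_type')
    if host_state.topic != 'compute' or not instance_type:
        # Non-compute services and requests without an instance type
        # pass through untouched.
        return True
    capabilities = host_state.capabilities or {}
    if not capabilities.get('enabled', True):
        return False
    if host_state.free_ram_mb < instance_type['memory_mb']:
        return False
    # Every extra_spec must match a capability exactly.
    for key, value in instance_type.get('extra_specs', {}).iteritems():
        if capabilities.get(key) != value:
            return False
    return True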
def test_json_filter_passes(self):
filt_cls = filters.JsonFilter()
filter_properties = {'instance_type': {'memory_mb': 1024,
'local_gb': 200},
'query': self.json_query}
capabilities = {'enabled': True}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024,
'free_disk_mb': 200 * 1024,
'capabilities': capabilities})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_json_filter_passes_with_no_query(self):
filt_cls = filters.JsonFilter()
filter_properties = {'instance_type': {'memory_mb': 1024,
'local_gb': 200}}
capabilities = {'enabled': True}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 0,
'free_disk_mb': 0,
'capabilities': capabilities})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_json_filter_fails_on_memory(self):
filt_cls = filters.JsonFilter()
filter_properties = {'instance_type': {'memory_mb': 1024,
'local_gb': 200},
'query': self.json_query}
capabilities = {'enabled': True}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1023,
'free_disk_mb': 200 * 1024,
'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_json_filter_fails_on_disk(self):
filt_cls = filters.JsonFilter()
filter_properties = {'instance_type': {'memory_mb': 1024,
'local_gb': 200},
'query': self.json_query}
capabilities = {'enabled': True}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024,
'free_disk_mb': (200 * 1024) - 1,
'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_json_filter_fails_on_disabled(self):
filt_cls = filters.JsonFilter()
filter_properties = {'instance_type': {'memory_mb': 1024,
'local_gb': 200},
'query': self.json_query}
capabilities = {'enabled': False}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 1024,
'free_disk_mb': 200 * 1024,
'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_json_filter_happy_day(self):
"""Test json filter more thoroughly"""
filt_cls = filters.JsonFilter()
raw = ['and',
['=', '$capabilities.opt1', 'match'],
['or',
['and',
['<', '$free_ram_mb', 30],
['<', '$free_disk_mb', 300]],
['and',
['>', '$free_ram_mb', 30],
['>', '$free_disk_mb', 300]]]]
filter_properties = {'query': json.dumps(raw)}
# Passes
capabilities = {'enabled': True, 'opt1': 'match'}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 10,
'free_disk_mb': 200,
'capabilities': capabilities})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
# Passes
capabilities = {'enabled': True, 'opt1': 'match'}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 40,
'free_disk_mb': 400,
'capabilities': capabilities})
self.assertTrue(filt_cls.host_passes(host, filter_properties))
# Fails due to disabled
capabilities = {'enabled': False, 'opt1': 'match'}
host = fakes.FakeHostState('host1', 'instance_type',
{'free_ram_mb': 40,
'free_disk_mb': 400,
'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
# Fails due to being exact memory/disk we don't want
capabilities = {'enabled': True, 'opt1': 'match'}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 30,
'free_disk_mb': 300,
'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
# Fails due to memory lower but disk higher
capabilities = {'enabled': True, 'opt1': 'match'}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 20,
'free_disk_mb': 400,
'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
# Fails due to capabilities 'opt1' not equal
capabilities = {'enabled': True, 'opt1': 'no-match'}
host = fakes.FakeHostState('host1', 'compute',
{'free_ram_mb': 20,
'free_disk_mb': 400,
'capabilities': capabilities})
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_json_filter_basic_operators(self):
filt_cls = filters.JsonFilter()
host = fakes.FakeHostState('host1', 'compute',
{'capabilities': {'enabled': True}})
# (operator, arguments, expected_result)
ops_to_test = [
['=', [1, 1], True],
['=', [1, 2], False],
['<', [1, 2], True],
['<', [1, 1], False],
['<', [2, 1], False],
['>', [2, 1], True],
['>', [2, 2], False],
['>', [2, 3], False],
['<=', [1, 2], True],
['<=', [1, 1], True],
['<=', [2, 1], False],
['>=', [2, 1], True],
['>=', [2, 2], True],
['>=', [2, 3], False],
['in', [1, 1], True],
['in', [1, 1, 2, 3], True],
['in', [4, 1, 2, 3], False],
['not', [True], False],
['not', [False], True],
['or', [True, False], True],
['or', [False, False], False],
['and', [True, True], True],
['and', [False, False], False],
['and', [True, False], False],
# Nested ((True or False) and (2 > 1)) == Passes
['and', [['or', True, False], ['>', 2, 1]], True]]
for (op, args, expected) in ops_to_test:
raw = [op] + args
filter_properties = {'query': json.dumps(raw)}
self.assertEqual(expected,
filt_cls.host_passes(host, filter_properties))
# This results in [False, True, False, True] and if any are True
# then it passes...
raw = ['not', True, False, True, False]
filter_properties = {'query': json.dumps(raw)}
self.assertTrue(filt_cls.host_passes(host, filter_properties))
# This results in [False, False, False] and if any are True
# then it passes...which this doesn't
raw = ['not', True, True, True]
filter_properties = {'query': json.dumps(raw)}
self.assertFalse(filt_cls.host_passes(host, filter_properties))
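# NOTE(editor): a plausible reading consistent with the two cases
# above: 'not' maps over all of its operands, producing a list, and a
# list result at the top level passes the host if any element is True.
def _sketch_top_level_result(result):
    if isinstance(result, list):
        return any(result)
    return bool(result)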
def test_json_filter_unknown_operator_raises(self):
filt_cls = filters.JsonFilter()
raw = ['!=', 1, 2]
filter_properties = {'query': json.dumps(raw)}
capabilities = {'enabled': True, 'opt1': 'no-match'}
host = fakes.FakeHostState('host1', 'compute',
{'capabilities': {'enabled': True}})
self.assertRaises(KeyError,
filt_cls.host_passes, host, filter_properties)
def test_json_filter_empty_filters_pass(self):
filt_cls = filters.JsonFilter()
capabilities = {'enabled': True, 'opt1': 'no-match'}
host = fakes.FakeHostState('host1', 'compute',
{'capabilities': {'enabled': True}})
raw = []
filter_properties = {'query': json.dumps(raw)}
self.assertTrue(filt_cls.host_passes(host, filter_properties))
raw = {}
filter_properties = {'query': json.dumps(raw)}
self.assertTrue(filt_cls.host_passes(host, filter_properties))
def test_json_filter_invalid_num_arguments_fails(self):
filt_cls = filters.JsonFilter()
capabilities = {'enabled': True, 'opt1': 'no-match'}
host = fakes.FakeHostState('host1', 'compute',
{'capabilities': {'enabled': True}})
raw = ['>', ['and', ['or', ['not', ['<', ['>=', ['<=', ['in', ]]]]]]]]
filter_properties = {'query': json.dumps(raw)}
self.assertFalse(filt_cls.host_passes(host, filter_properties))
raw = ['>', 1]
filter_properties = {'query': json.dumps(raw)}
self.assertFalse(filt_cls.host_passes(host, filter_properties))
def test_json_filter_unknown_variable_ignored(self):
filt_cls = filters.JsonFilter()
capabilities = {'enabled': True, 'opt1': 'no-match'}
host = fakes.FakeHostState('host1', 'compute',
{'capabilities': {'enabled': True}})
raw = ['=', '$........', 1, 1]
filter_properties = {'query': json.dumps(raw)}
self.assertTrue(filt_cls.host_passes(host, filter_properties))
raw = ['=', '$foo', 2, 2]
filter_properties = {'query': json.dumps(raw)}
self.assertTrue(filt_cls.host_passes(host, filter_properties))

View File

@ -0,0 +1,360 @@
# Copyright (c) 2011 Openstack, LLC
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For HostManager
"""
import datetime
import mox
from nova import db
from nova import exception
from nova import log as logging
from nova.scheduler import host_manager
from nova import test
from nova.tests.scheduler import fakes
from nova import utils
class ComputeFilterClass1(object):
def host_passes(self, *args, **kwargs):
pass
class ComputeFilterClass2(object):
def host_passes(self, *args, **kwargs):
pass
class HostManagerTestCase(test.TestCase):
"""Test case for HostManager class"""
def setUp(self):
super(HostManagerTestCase, self).setUp()
self.host_manager = host_manager.HostManager()
def test_choose_host_filters_not_found(self):
self.flags(default_host_filters='ComputeFilterClass3')
self.host_manager.filter_classes = [ComputeFilterClass1,
ComputeFilterClass2]
self.assertRaises(exception.SchedulerHostFilterNotFound,
self.host_manager._choose_host_filters, None)
def test_choose_host_filters(self):
self.flags(default_host_filters=['ComputeFilterClass2'])
self.host_manager.filter_classes = [ComputeFilterClass1,
ComputeFilterClass2]
# Test 'compute' returns 1 correct function
filter_fns = self.host_manager._choose_host_filters(None)
self.assertEqual(len(filter_fns), 1)
self.assertEqual(filter_fns[0].__func__,
ComputeFilterClass2.host_passes.__func__)
def test_filter_hosts(self):
topic = 'fake_topic'
filters = ['fake-filter1', 'fake-filter2']
fake_host1 = host_manager.HostState('host1', topic)
fake_host2 = host_manager.HostState('host2', topic)
hosts = [fake_host1, fake_host2]
filter_properties = 'fake_properties'
self.mox.StubOutWithMock(self.host_manager,
'_choose_host_filters')
self.mox.StubOutWithMock(fake_host1, 'passes_filters')
self.mox.StubOutWithMock(fake_host2, 'passes_filters')
self.host_manager._choose_host_filters(None).AndReturn(filters)
fake_host1.passes_filters(filters, filter_properties).AndReturn(
False)
fake_host2.passes_filters(filters, filter_properties).AndReturn(
True)
self.mox.ReplayAll()
filtered_hosts = self.host_manager.filter_hosts(hosts,
filter_properties, filters=None)
self.mox.VerifyAll()
self.assertEqual(len(filtered_hosts), 1)
self.assertEqual(filtered_hosts[0], fake_host2)
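# NOTE(editor): the expectations above amount to a simple sketch,
# assuming passes_filters() encapsulates the per-host decision:
def _sketch_filter_hosts(hosts, filter_properties, filter_fns):
    return [host for host in hosts
            if host.passes_filters(filter_fns, filter_properties)]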
def test_update_service_capabilities(self):
service_states = self.host_manager.service_states
self.assertDictMatch(service_states, {})
self.mox.StubOutWithMock(utils, 'utcnow')
utils.utcnow().AndReturn(31337)
utils.utcnow().AndReturn(31338)
utils.utcnow().AndReturn(31339)
host1_compute_capabs = dict(free_memory=1234, host_memory=5678,
timestamp=1)
host1_volume_capabs = dict(free_disk=4321, timestamp=1)
host2_compute_capabs = dict(free_memory=8756, timestamp=1)
self.mox.ReplayAll()
self.host_manager.update_service_capabilities('compute', 'host1',
host1_compute_capabs)
self.host_manager.update_service_capabilities('volume', 'host1',
host1_volume_capabs)
self.host_manager.update_service_capabilities('compute', 'host2',
host2_compute_capabs)
self.mox.VerifyAll()
# Make sure dictionary isn't re-assigned
self.assertEqual(self.host_manager.service_states, service_states)
# Make sure original dictionary wasn't copied
self.assertEqual(host1_compute_capabs['timestamp'], 1)
host1_compute_capabs['timestamp'] = 31337
host1_volume_capabs['timestamp'] = 31338
host2_compute_capabs['timestamp'] = 31339
expected = {'host1': {'compute': host1_compute_capabs,
'volume': host1_volume_capabs},
'host2': {'compute': host2_compute_capabs}}
self.assertDictMatch(service_states, expected)
def test_host_service_caps_stale(self):
self.flags(periodic_interval=5)
host1_compute_capabs = dict(free_memory=1234, host_memory=5678,
timestamp=datetime.datetime.fromtimestamp(3000))
host1_volume_capabs = dict(free_disk=4321,
timestamp=datetime.datetime.fromtimestamp(3005))
host2_compute_capabs = dict(free_memory=8756,
timestamp=datetime.datetime.fromtimestamp(3010))
service_states = {'host1': {'compute': host1_compute_capabs,
'volume': host1_volume_capabs},
'host2': {'compute': host2_compute_capabs}}
self.host_manager.service_states = service_states
self.mox.StubOutWithMock(utils, 'utcnow')
utils.utcnow().AndReturn(datetime.datetime.fromtimestamp(3020))
utils.utcnow().AndReturn(datetime.datetime.fromtimestamp(3020))
utils.utcnow().AndReturn(datetime.datetime.fromtimestamp(3020))
self.mox.ReplayAll()
res1 = self.host_manager.host_service_caps_stale('host1', 'compute')
res2 = self.host_manager.host_service_caps_stale('host1', 'volume')
res3 = self.host_manager.host_service_caps_stale('host2', 'compute')
self.mox.VerifyAll()
self.assertEqual(res1, True)
self.assertEqual(res2, False)
self.assertEqual(res3, False)
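# NOTE(editor): with periodic_interval=5 the cutoff implied here is
# three missed reporting periods: host1/compute last reported at t=3000
# and "now" is t=3020, so 20s > 5 * 3 makes it stale, while 15s and 10s
# do not.  A sketch of that check (the factor of 3 is assumed from the
# expiry_time used by the older tests further down):
def _sketch_caps_stale(caps_timestamp, now, periodic_interval):
    allowed_age = datetime.timedelta(seconds=periodic_interval * 3)
    return (now - caps_timestamp) > allowed_age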
def test_delete_expired_host_services(self):
host1_compute_capabs = dict(free_memory=1234, host_memory=5678,
timestamp=datetime.datetime.fromtimestamp(3000))
host1_volume_capabs = dict(free_disk=4321,
timestamp=datetime.datetime.fromtimestamp(3005))
host2_compute_capabs = dict(free_memory=8756,
timestamp=datetime.datetime.fromtimestamp(3010))
service_states = {'host1': {'compute': host1_compute_capabs,
'volume': host1_volume_capabs},
'host2': {'compute': host2_compute_capabs}}
self.host_manager.service_states = service_states
to_delete = {'host1': {'volume': host1_volume_capabs},
'host2': {'compute': host2_compute_capabs}}
self.host_manager.delete_expired_host_services(to_delete)
# Make sure dictionary isn't re-assigned
self.assertEqual(self.host_manager.service_states, service_states)
expected = {'host1': {'compute': host1_compute_capabs}}
self.assertEqual(service_states, expected)
def test_get_service_capabilities(self):
host1_compute_capabs = dict(free_memory=1000, host_memory=5678,
timestamp=datetime.datetime.fromtimestamp(3000))
host1_volume_capabs = dict(free_disk=4321,
timestamp=datetime.datetime.fromtimestamp(3005))
host2_compute_capabs = dict(free_memory=8756,
timestamp=datetime.datetime.fromtimestamp(3010))
host2_volume_capabs = dict(free_disk=8756,
enabled=False,
timestamp=datetime.datetime.fromtimestamp(3010))
host3_compute_capabs = dict(free_memory=1234, host_memory=4000,
timestamp=datetime.datetime.fromtimestamp(3010))
host3_volume_capabs = dict(free_disk=2000,
timestamp=datetime.datetime.fromtimestamp(3010))
service_states = {'host1': {'compute': host1_compute_capabs,
'volume': host1_volume_capabs},
'host2': {'compute': host2_compute_capabs,
'volume': host2_volume_capabs},
'host3': {'compute': host3_compute_capabs,
'volume': host3_volume_capabs}}
self.host_manager.service_states = service_states
info = {'called': 0}
# This tests with 1 volume disabled (host2), and 1 volume node
# as stale (host1)
def _fake_host_service_caps_stale(host, service):
info['called'] += 1
if host == 'host1':
if service == 'compute':
return False
elif service == 'volume':
return True
elif host == 'host2':
# Shouldn't get here for 'volume' because the service
# is disabled
self.assertEqual(service, 'compute')
return False
self.assertEqual(host, 'host3')
return False
self.stubs.Set(self.host_manager, 'host_service_caps_stale',
_fake_host_service_caps_stale)
self.mox.StubOutWithMock(self.host_manager,
'delete_expired_host_services')
self.host_manager.delete_expired_host_services({'host1': ['volume']})
self.mox.ReplayAll()
result = self.host_manager.get_service_capabilities()
self.mox.VerifyAll()
self.assertEqual(info['called'], 5)
# only 1 volume node active == 'host3', so min/max is 2000
expected = {'volume_free_disk': (2000, 2000),
'compute_host_memory': (4000, 5678),
'compute_free_memory': (1000, 8756)}
self.assertDictMatch(result, expected)
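# NOTE(editor): a sketch of the aggregation verified above: for every
# active, non-stale (service, capability) pair the manager keeps a
# running (min, max) tuple keyed '<service>_<capability>':
def _sketch_aggregate_capabilities(active_caps):
    # active_caps: [('volume', {'free_disk': 2000, ...}), ...]
    aggregated = {}
    for service, caps in active_caps:
        for cap, value in caps.iteritems():
            if cap == 'timestamp':
                # Bookkeeping only; not a capability.
                continue
            key = '%s_%s' % (service, cap)
            lo, hi = aggregated.get(key, (value, value))
            aggregated[key] = (min(lo, value), max(hi, value))
    return aggregated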
def test_get_all_host_states(self):
self.flags(reserved_host_memory_mb=512,
reserved_host_disk_mb=1024)
context = 'fake_context'
topic = 'compute'
self.mox.StubOutWithMock(db, 'compute_node_get_all')
self.mox.StubOutWithMock(logging, 'warn')
self.mox.StubOutWithMock(db, 'instance_get_all')
db.compute_node_get_all(context).AndReturn(fakes.COMPUTE_NODES)
# Invalid service
logging.warn("No service for compute ID 5")
db.instance_get_all(context).AndReturn(fakes.INSTANCES)
self.mox.ReplayAll()
host_states = self.host_manager.get_all_host_states(context, topic)
self.mox.VerifyAll()
self.assertEqual(len(host_states), 4)
self.assertEqual(host_states['host1'].free_ram_mb, 0)
# 511GB
self.assertEqual(host_states['host1'].free_disk_mb, 523264)
self.assertEqual(host_states['host2'].free_ram_mb, 512)
# 1023GB
self.assertEqual(host_states['host2'].free_disk_mb, 1047552)
self.assertEqual(host_states['host3'].free_ram_mb, 2560)
# 3071GB
self.assertEqual(host_states['host3'].free_disk_mb, 3144704)
self.assertEqual(host_states['host4'].free_ram_mb, 7680)
# 8191GB
self.assertEqual(host_states['host4'].free_disk_mb, 8387584)
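# NOTE(editor): the disk figures follow directly from the flags set
# above, assuming the fake compute nodes advertise 512/1024/3072/8192 GB
# of disk and no instance in the fixtures consumes any:
#     host1: 512 * 1024 - 1024 (reserved)  = 523264 MB   (511 GB)
#     host2: 1024 * 1024 - 1024 (reserved) = 1047552 MB  (1023 GB)
#     host4: 8192 * 1024 - 1024 (reserved) = 8387584 MB  (8191 GB)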
class HostStateTestCase(test.TestCase):
"""Test case for HostState class"""
def setUp(self):
super(HostStateTestCase, self).setUp()
# update_from_compute_node() and consume_from_instance() are tested
# in HostManagerTestCase.test_get_all_host_states()
def test_host_state_passes_filters_passes(self):
fake_host = host_manager.HostState('host1', 'compute')
filter_properties = {}
cls1 = ComputeFilterClass1()
cls2 = ComputeFilterClass2()
self.mox.StubOutWithMock(cls1, 'host_passes')
self.mox.StubOutWithMock(cls2, 'host_passes')
filter_fns = [cls1.host_passes, cls2.host_passes]
cls1.host_passes(fake_host, filter_properties).AndReturn(True)
cls2.host_passes(fake_host, filter_properties).AndReturn(True)
self.mox.ReplayAll()
result = fake_host.passes_filters(filter_fns, filter_properties)
self.mox.VerifyAll()
self.assertTrue(result)
def test_host_state_passes_filters_passes_with_ignore(self):
fake_host = host_manager.HostState('host1', 'compute')
filter_properties = {'ignore_hosts': ['host2']}
cls1 = ComputeFilterClass1()
cls2 = ComputeFilterClass2()
self.mox.StubOutWithMock(cls1, 'host_passes')
self.mox.StubOutWithMock(cls2, 'host_passes')
filter_fns = [cls1.host_passes, cls2.host_passes]
cls1.host_passes(fake_host, filter_properties).AndReturn(True)
cls2.host_passes(fake_host, filter_properties).AndReturn(True)
self.mox.ReplayAll()
result = fake_host.passes_filters(filter_fns, filter_properties)
self.mox.VerifyAll()
self.assertTrue(result)
def test_host_state_passes_filters_fails(self):
fake_host = host_manager.HostState('host1', 'compute')
filter_properties = {}
cls1 = ComputeFilterClass1()
cls2 = ComputeFilterClass2()
self.mox.StubOutWithMock(cls1, 'host_passes')
self.mox.StubOutWithMock(cls2, 'host_passes')
filter_fns = [cls1.host_passes, cls2.host_passes]
cls1.host_passes(fake_host, filter_properties).AndReturn(False)
# cls2.host_passes() not called because of short circuit
self.mox.ReplayAll()
result = fake_host.passes_filters(filter_fns, filter_properties)
self.mox.VerifyAll()
self.assertFalse(result)
def test_host_state_passes_filters_fails_from_ignore(self):
fake_host = host_manager.HostState('host1', 'compute')
filter_properties = {'ignore_hosts': ['host1']}
cls1 = ComputeFilterClass1()
cls2 = ComputeFilterClass2()
self.mox.StubOutWithMock(cls1, 'host_passes')
self.mox.StubOutWithMock(cls2, 'host_passes')
filter_fns = [cls1.host_passes, cls2.host_passes]
# cls[12].host_passes() not called because of short circuit
# with matching host to ignore
self.mox.ReplayAll()
result = fake_host.passes_filters(filter_fns, filter_properties)
self.mox.VerifyAll()
self.assertFalse(result)

View File

@ -15,9 +15,10 @@
"""
Tests For Least Cost functions.
"""
from nova import context
from nova.scheduler import least_cost
from nova import test
from nova.tests.scheduler import fake_zone_manager
from nova.tests.scheduler import fakes
def offset(hostinfo, options):
@ -32,38 +33,47 @@ class LeastCostTestCase(test.TestCase):
def setUp(self):
super(LeastCostTestCase, self).setUp()
self.flags(reserved_host_disk_mb=0, reserved_host_memory_mb=0)
self.zone_manager = fake_zone_manager.FakeZoneManager()
self.host_manager = fakes.FakeHostManager()
def tearDown(self):
super(LeastCostTestCase, self).tearDown()
def _get_all_hosts(self):
ctxt = context.get_admin_context()
fakes.mox_host_manager_db_calls(self.mox, ctxt)
self.mox.ReplayAll()
host_states = self.host_manager.get_all_host_states(ctxt,
'compute').values()
self.mox.VerifyAll()
self.mox.ResetAll()
return host_states
def test_weighted_sum_happy_day(self):
fn_tuples = [(1.0, offset), (1.0, scale)]
hostinfo_list = self.zone_manager.get_all_host_data(None).items()
hostinfo_list = self._get_all_hosts()
# host1: free_ram_mb=0
# host2: free_ram_mb=1536
# host1: free_ram_mb=512
# host2: free_ram_mb=1024
# host3: free_ram_mb=3072
# host4: free_ram_mb=8192
# [offset, scale]=
# [10000, 11536, 13072, 18192]
# [0, 768, 1536, 4096]
# [10512, 11024, 13072, 18192]
# [1024, 2048, 6144, 16384]
# adjusted [ 1.0 * x + 1.0 * y] =
# [10000, 12304, 14608, 22288]
# [11536, 13072, 19216, 34576]
# so, host1 should win:
options = {}
weighted_host = least_cost.weighted_sum(fn_tuples, hostinfo_list,
options)
self.assertEqual(weighted_host.weight, 10000)
self.assertEqual(weighted_host.host, 'host1')
options)
self.assertEqual(weighted_host.weight, 11536)
self.assertEqual(weighted_host.host_state.host, 'host1')
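# NOTE(editor): from the tables above, offset() is free_ram_mb + 10000
# and scale() is free_ram_mb * 2; weighted_sum then combines
# sum(weight * fn(host)) per host and keeps the cheapest.  A sketch
# that returns the winning host state rather than a WeightedHost:
def _sketch_weighted_sum(weighted_fns, host_states):
    def total_cost(host_state):
        return sum(weight * fn(host_state, {})
                   for weight, fn in weighted_fns)
    return min(host_states, key=total_cost)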
def test_weighted_sum_single_function(self):
fn_tuples = [(1.0, offset), ]
hostinfo_list = self.zone_manager.get_all_host_data(None).items()
hostinfo_list = self._get_all_hosts()
# host1: free_ram_mb=0
# host2: free_ram_mb=1536
@ -71,11 +81,11 @@ class LeastCostTestCase(test.TestCase):
# host4: free_ram_mb=8192
# [offset, ]=
# [10000, 11536, 13072, 18192]
# [10512, 11024, 13072, 18192]
# so, host1 should win:
options = {}
weighted_host = least_cost.weighted_sum(fn_tuples, hostinfo_list,
options)
self.assertEqual(weighted_host.weight, 10000)
self.assertEqual(weighted_host.host, 'host1')
self.assertEqual(weighted_host.weight, 10512)
self.assertEqual(weighted_host.host_state.host, 'host1')

View File

@ -0,0 +1,189 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
# Copyright 2011 OpenStack LLC
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For ZoneManager
"""
import mox
from nova import db
from nova import flags
from nova.scheduler import zone_manager
from nova import test
FLAGS = flags.FLAGS
def _create_zone(zone_id=1, name=None, api_url=None, username=None):
if api_url is None:
api_url = "http://foo.com"
if username is None:
username = "user1"
if name is None:
name = "child1"
return dict(id=zone_id, name=name, api_url=api_url,
username=username, password="pass1", weight_offset=0.0,
weight_scale=1.0)
def exploding_novaclient(zone):
"""Used when we want to simulate a novaclient call failing."""
raise Exception("kaboom")
class ZoneManagerTestCase(test.TestCase):
"""Test case for zone manager"""
zone_manager_cls = zone_manager.ZoneManager
zone_state_cls = zone_manager.ZoneState
def setUp(self):
super(ZoneManagerTestCase, self).setUp()
self.zone_manager = self.zone_manager_cls()
def _create_zone_state(self, zone_id=1, name=None, api_url=None,
username=None):
zone = self.zone_state_cls()
zone.zone_info = _create_zone(zone_id, name, api_url, username)
return zone
def test_update(self):
zm = self.zone_manager
self.mox.StubOutWithMock(zm, '_refresh_from_db')
self.mox.StubOutWithMock(zm, '_poll_zones')
zm._refresh_from_db(mox.IgnoreArg())
zm._poll_zones()
self.mox.ReplayAll()
zm.update(None)
self.mox.VerifyAll()
def test_refresh_from_db_new(self):
zone = _create_zone(zone_id=1, username='user1')
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([zone])
zm = self.zone_manager
self.assertEquals(len(zm.zone_states), 0)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 1)
self.assertIn(1, zm.zone_states)
self.assertEquals(zm.zone_states[1].zone_info['username'], 'user1')
def test_refresh_from_db_replace_existing(self):
zone_state = self._create_zone_state(zone_id=1, username='user1')
zm = self.zone_manager
zm.zone_states[1] = zone_state
zone = _create_zone(zone_id=1, username='user2')
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([zone])
self.assertEquals(len(zm.zone_states), 1)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[1].zone_info['username'], 'user2')
def test_refresh_from_db_missing(self):
zone_state = self._create_zone_state(zone_id=1, username='user1')
zm = self.zone_manager
zm.zone_states[1] = zone_state
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([])
self.assertEquals(len(zm.zone_states), 1)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 0)
def test_refresh_from_db_add(self):
zone_state = self._create_zone_state(zone_id=1, username='user1')
zm = self.zone_manager
zm.zone_states[1] = zone_state
zone1 = _create_zone(zone_id=1, username='user1')
zone2 = _create_zone(zone_id=2, username='user2')
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([zone1, zone2])
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 2)
self.assertIn(1, zm.zone_states)
self.assertIn(2, zm.zone_states)
self.assertEquals(zm.zone_states[1].zone_info['username'], 'user1')
self.assertEquals(zm.zone_states[2].zone_info['username'], 'user2')
def test_refresh_from_db_add_and_delete(self):
zone_state = self._create_zone_state(zone_id=1, username='user1')
zm = self.zone_manager
zm.zone_states[1] = zone_state
zone2 = _create_zone(zone_id=2, username='user2')
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([zone2])
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 1)
self.assertIn(2, zm.zone_states)
self.assertEquals(zm.zone_states[2].zone_info['username'], 'user2')
def test_poll_zone(self):
zone_state = self._create_zone_state(zone_id=1, name='child1')
zone_state.attempt = 1
self.mox.StubOutWithMock(zone_state, 'call_novaclient')
zone_state.call_novaclient().AndReturn(
dict(name=zone_state.zone_info['name'],
hairdresser='dietz'))
self.assertDictMatch(zone_state.capabilities, {})
self.mox.ReplayAll()
zone_state.poll()
self.mox.VerifyAll()
self.assertEquals(zone_state.attempt, 0)
self.assertDictMatch(zone_state.capabilities,
dict(hairdresser='dietz'))
self.assertTrue(zone_state.is_active)
def test_poll_zones_with_failure(self):
zone_state = self._create_zone_state(zone_id=1)
zone_state.attempt = FLAGS.zone_failures_to_offline - 1
self.mox.StubOutWithMock(zone_state, 'call_novaclient')
zone_state.call_novaclient().AndRaise(Exception('foo'))
self.mox.ReplayAll()
zone_state.poll()
self.mox.VerifyAll()
self.assertEquals(zone_state.attempt, 3)
self.assertFalse(zone_state.is_active)
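# NOTE(editor): the arithmetic above implies poll() increments
# 'attempt' on each failure and marks the zone inactive once the count
# reaches FLAGS.zone_failures_to_offline (3 under the default flags
# assumed here).  A sketch of that failure branch:
def _sketch_record_poll_failure(zone_state, failures_to_offline):
    zone_state.attempt += 1
    if zone_state.attempt >= failures_to_offline:
        zone_state.is_active = False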

View File

@ -1,377 +0,0 @@
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For ZoneManager
"""
import datetime
import mox
from nova import db
from nova import flags
from nova import test
from nova import utils
from nova.scheduler import zone_manager
FLAGS = flags.FLAGS
class FakeZone:
"""Represents a fake zone from the db"""
def __init__(self, *args, **kwargs):
for k, v in kwargs.iteritems():
setattr(self, k, v)
def exploding_novaclient(zone):
"""Used when we want to simulate a novaclient call failing."""
raise Exception("kaboom")
class ZoneManagerTestCase(test.TestCase):
"""Test case for zone manager"""
def test_ping(self):
zm = zone_manager.ZoneManager()
self.mox.StubOutWithMock(zm, '_refresh_from_db')
self.mox.StubOutWithMock(zm, '_poll_zones')
zm._refresh_from_db(mox.IgnoreArg())
zm._poll_zones(mox.IgnoreArg())
self.mox.ReplayAll()
zm.ping(None)
self.mox.VerifyAll()
def test_refresh_from_db_new(self):
zm = zone_manager.ZoneManager()
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([
FakeZone(id=1, api_url='http://foo.com', username='user1',
password='pass1', name='child', weight_offset=0.0,
weight_scale=1.0),
])
self.assertEquals(len(zm.zone_states), 0)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[1].username, 'user1')
def test_service_capabilities(self):
zm = zone_manager.ZoneManager()
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, {})
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
zm.update_service_capabilities("svc1", "host1", dict(a=2, b=3))
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(2, 2), svc1_b=(3, 3)))
zm.update_service_capabilities("svc1", "host2", dict(a=20, b=30))
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30)))
zm.update_service_capabilities("svc10", "host1", dict(a=99, b=99))
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
svc10_a=(99, 99), svc10_b=(99, 99)))
zm.update_service_capabilities("svc1", "host3", dict(c=5))
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(2, 20), svc1_b=(3, 30),
svc1_c=(5, 5), svc10_a=(99, 99),
svc10_b=(99, 99)))
def test_refresh_from_db_replace_existing(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=1, api_url='http://foo.com',
username='user1', password='pass1', name='child',
weight_offset=0.0, weight_scale=1.0))
zm.zone_states[1] = zone_state
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([
FakeZone(id=1, api_url='http://foo.com', username='user2',
password='pass2', name='child',
weight_offset=0.0, weight_scale=1.0),
])
self.assertEquals(len(zm.zone_states), 1)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[1].username, 'user2')
def test_refresh_from_db_missing(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=1, api_url='http://foo.com',
username='user1', password='pass1', name='child',
weight_offset=0.0, weight_scale=1.0))
zm.zone_states[1] = zone_state
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([])
self.assertEquals(len(zm.zone_states), 1)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 0)
def test_refresh_from_db_add_and_delete(self):
zm = zone_manager.ZoneManager()
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=1, api_url='http://foo.com',
username='user1', password='pass1', name='child',
weight_offset=2.0, weight_scale=3.0))
zm.zone_states[1] = zone_state
self.mox.StubOutWithMock(db, 'zone_get_all')
db.zone_get_all(mox.IgnoreArg()).AndReturn([
FakeZone(id=2, api_url='http://foo.com', username='user2',
password='pass2', name='child', weight_offset=2.0,
weight_scale=3.0),
])
self.assertEquals(len(zm.zone_states), 1)
self.mox.ReplayAll()
zm._refresh_from_db(None)
self.mox.VerifyAll()
self.assertEquals(len(zm.zone_states), 1)
self.assertEquals(zm.zone_states[2].username, 'user2')
def test_poll_zone(self):
self.mox.StubOutWithMock(zone_manager, '_call_novaclient')
zone_manager._call_novaclient(mox.IgnoreArg()).AndReturn(
dict(name='child', capabilities='hairdresser'))
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=2,
api_url='http://foo.com', username='user2',
password='pass2', name='child',
weight_offset=0.0, weight_scale=1.0))
zone_state.attempt = 1
self.mox.ReplayAll()
zone_manager._poll_zone(zone_state)
self.mox.VerifyAll()
self.assertEquals(zone_state.attempt, 0)
self.assertEquals(zone_state.name, 'child')
def test_poll_zone_fails(self):
self.stubs.Set(zone_manager, "_call_novaclient", exploding_novaclient)
zone_state = zone_manager.ZoneState()
zone_state.update_credentials(FakeZone(id=2,
api_url='http://foo.com', username='user2',
password='pass2', name='child',
weight_offset=0.0, weight_scale=1.0))
zone_state.attempt = FLAGS.zone_failures_to_offline - 1
self.mox.ReplayAll()
zone_manager._poll_zone(zone_state)
self.mox.VerifyAll()
self.assertEquals(zone_state.attempt, 3)
self.assertFalse(zone_state.is_active)
def test_host_service_caps_stale_no_stale_service(self):
zm = zone_manager.ZoneManager()
# services just updated capabilities
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc2", "host1", dict(a=3, b=4))
self.assertFalse(zm.host_service_caps_stale("host1", "svc1"))
self.assertFalse(zm.host_service_caps_stale("host1", "svc2"))
def test_host_service_caps_stale_all_stale_services(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# Both services became stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc2", "host1", dict(a=3, b=4))
time_future = utils.utcnow() + datetime.timedelta(seconds=expiry_time)
utils.set_time_override(time_future)
self.assertTrue(zm.host_service_caps_stale("host1", "svc1"))
self.assertTrue(zm.host_service_caps_stale("host1", "svc2"))
utils.clear_time_override()
def test_host_service_caps_stale_one_stale_service(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# One service became stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc2", "host1", dict(a=3, b=4))
caps = zm.service_states["host1"]["svc1"]
caps["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
self.assertTrue(zm.host_service_caps_stale("host1", "svc1"))
self.assertFalse(zm.host_service_caps_stale("host1", "svc2"))
def test_delete_expired_host_services_del_one_service(self):
zm = zone_manager.ZoneManager()
# Delete one service in a host
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc2", "host1", dict(a=3, b=4))
stale_host_services = {"host1": ["svc1"]}
zm.delete_expired_host_services(stale_host_services)
self.assertFalse("svc1" in zm.service_states["host1"])
self.assertTrue("svc2" in zm.service_states["host1"])
def test_delete_expired_host_services_del_all_hosts(self):
zm = zone_manager.ZoneManager()
# Delete all services in a host
zm.update_service_capabilities("svc2", "host1", dict(a=3, b=4))
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
stale_host_services = {"host1": ["svc1", "svc2"]}
zm.delete_expired_host_services(stale_host_services)
self.assertFalse("host1" in zm.service_states)
def test_delete_expired_host_services_del_one_service_per_host(self):
zm = zone_manager.ZoneManager()
# Delete one service per host
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
stale_host_services = {"host1": ["svc1"], "host2": ["svc1"]}
zm.delete_expired_host_services(stale_host_services)
self.assertFalse("host1" in zm.service_states)
self.assertFalse("host2" in zm.service_states)
def test_get_zone_capabilities_one_host(self):
zm = zone_manager.ZoneManager()
# Service capabilities recent
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
def test_get_zone_capabilities_expired_host(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# Service capabilities stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
time_future = utils.utcnow() + datetime.timedelta(seconds=expiry_time)
utils.set_time_override(time_future)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, {})
utils.clear_time_override()
def test_get_zone_capabilities_multiple_hosts(self):
zm = zone_manager.ZoneManager()
# Both host service capabilities recent
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(1, 3), svc1_b=(2, 4)))
def test_get_zone_capabilities_one_stale_host(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# One host service capabilities become stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
serv_caps = zm.service_states["host1"]["svc1"]
serv_caps["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(3, 3), svc1_b=(4, 4)))
def test_get_zone_capabilities_multiple_service_per_host(self):
zm = zone_manager.ZoneManager()
# Multiple services per host
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
zm.update_service_capabilities("svc2", "host1", dict(a=5, b=6))
zm.update_service_capabilities("svc2", "host2", dict(a=7, b=8))
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(1, 3), svc1_b=(2, 4),
svc2_a=(5, 7), svc2_b=(6, 8)))
def test_get_zone_capabilities_one_stale_service_per_host(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# Two host services among four become stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
zm.update_service_capabilities("svc2", "host1", dict(a=5, b=6))
zm.update_service_capabilities("svc2", "host2", dict(a=7, b=8))
serv_caps_1 = zm.service_states["host1"]["svc2"]
serv_caps_1["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
serv_caps_2 = zm.service_states["host2"]["svc1"]
serv_caps_2["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2),
svc2_a=(7, 7), svc2_b=(8, 8)))
def test_get_zone_capabilities_three_stale_host_services(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# Three host services among four become stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
zm.update_service_capabilities("svc2", "host1", dict(a=5, b=6))
zm.update_service_capabilities("svc2", "host2", dict(a=7, b=8))
serv_caps_1 = zm.service_states["host1"]["svc2"]
serv_caps_1["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
serv_caps_2 = zm.service_states["host2"]["svc1"]
serv_caps_2["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
serv_caps_3 = zm.service_states["host2"]["svc2"]
serv_caps_3["timestamp"] = utils.utcnow() - \
datetime.timedelta(seconds=expiry_time)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, dict(svc1_a=(1, 1), svc1_b=(2, 2)))
def test_get_zone_capabilities_all_stale_host_services(self):
zm = zone_manager.ZoneManager()
expiry_time = (FLAGS.periodic_interval * 3) + 1
# All the host services become stale
zm.update_service_capabilities("svc1", "host1", dict(a=1, b=2))
zm.update_service_capabilities("svc1", "host2", dict(a=3, b=4))
zm.update_service_capabilities("svc2", "host1", dict(a=5, b=6))
zm.update_service_capabilities("svc2", "host2", dict(a=7, b=8))
time_future = utils.utcnow() + datetime.timedelta(seconds=expiry_time)
utils.set_time_override(time_future)
caps = zm.get_zone_capabilities(None)
self.assertEquals(caps, {})