Introduce Scheduler base driver

Change-Id: I8e37b6233e5b81dd775d4c578259477de40b8fc1
This commit is contained in:
Zhenguo Niu 2016-09-13 23:51:14 +08:00
parent 0bee14006b
commit 63c34183b5
19 changed files with 1283 additions and 0 deletions

View File

@ -22,6 +22,7 @@ from nimble.conf import engine
from nimble.conf import ironic
from nimble.conf import keystone
from nimble.conf import neutron
from nimble.conf import scheduler
CONF = cfg.CONF
@ -32,3 +33,4 @@ engine.register_opts(CONF)
ironic.register_opts(CONF)
keystone.register_opts(CONF)
neutron.register_opts(CONF)
scheduler.register_opts(CONF)

View File

@ -34,6 +34,10 @@ opts = [
default=60,
help=_('Interval between syncing the node resources from '
'ironic, in seconds.')),
cfg.StrOpt('scheduler_driver',
default='nimble.engine.scheduler.filter_scheduler.'
'FilterScheduler',
help='Default scheduler driver to use')
]

View File

@ -19,6 +19,7 @@ import nimble.conf.engine
import nimble.conf.ironic
import nimble.conf.keystone
import nimble.conf.neutron
import nimble.conf.scheduler
_default_opt_lists = [
nimble.conf.default.api_opts,
@ -35,6 +36,7 @@ _opts = [
('ironic', nimble.conf.ironic.opts),
('keystone', nimble.conf.keystone.opts),
('neutron', nimble.conf.neutron.opts),
('scheduler', nimble.conf.scheduler.opts),
]

54
nimble/conf/scheduler.py Normal file
View File

@ -0,0 +1,54 @@
# Copyright 2016 Huawei Technologies Co.,LTD.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_config import cfg
from nimble.common.i18n import _
opts = [
cfg.StrOpt('scheduler_driver',
default='nimble.engine.scheduler.filter_scheduler.'
'FilterScheduler',
help=_('Default scheduler driver to use')),
cfg.StrOpt('scheduler_node_manager',
default='nimble.engine.scheduler.node_manager.NodeManager',
help=_('The scheduler node manager class to use')),
cfg.IntOpt('scheduler_max_attempts',
default=3,
help=_('Maximum number of attempts to schedule a node')),
cfg.StrOpt('scheduler_json_config_location',
default='',
help=_('Absolute path to scheduler configuration JSON file.')),
cfg.ListOpt('scheduler_default_filters',
default=[
'AvailabilityZoneFilter',
'CapabilitiesFilter'
],
help=_('Which filter class names to use for filtering nodes '
'when not specified in the request.')),
cfg.ListOpt('scheduler_default_weighers',
default=[],
help=_('Which weigher class names to use for weighing '
'nodes.')),
cfg.StrOpt('scheduler_weight_handler',
default='nimble.engine.scheduler.weights.'
'OrderedNodeWeightHandler',
help=_('Which handler to use for selecting the node after '
'weighing')),
]
def register_opts(conf):
conf.register_opts(opts, group='scheduler')

View File

@ -17,6 +17,7 @@
from eventlet import greenpool
from oslo_service import periodic_task
from oslo_utils import importutils
from nimble.common.i18n import _
from nimble.common import rpc
@ -34,6 +35,8 @@ class BaseEngineManager(periodic_task.PeriodicTasks):
self.topic = topic
self.node_cache = {}
self.node_cache_time = 0
scheduler_driver = CONF.scheduler.scheduler_driver
self.scheduler_driver = importutils.import_object(scheduler_driver)
self.notifier = rpc.get_notifier()
self._started = False
@ -52,8 +55,11 @@ class BaseEngineManager(periodic_task.PeriodicTasks):
self._worker_pool = greenpool.GreenPool(
size=CONF.engine.workers_pool_size)
self._started = True
def del_host(self):
self._worker_pool.waitall()
self._started = False
def periodic_tasks(self, context, raise_on_error=False):
return self.run_periodic_tasks(context, raise_on_error=raise_on_error)

View File

View File

@ -0,0 +1,136 @@
# Copyright (c) 2011-2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Filter support
"""
from oslo_log import log as logging
import six
from nimble.common.i18n import _LI
from nimble.engine.scheduler import base_handler
LOG = logging.getLogger(__name__)
class BaseFilter(object):
"""Base class for all filter classes."""
def _filter_one(self, obj, filter_properties):
"""Return True if it passes the filter, False otherwise.
Override this in a subclass.
"""
return True
def filter_all(self, filter_obj_list, filter_properties):
"""Yield objects that pass the filter.
Can be overridden in a subclass, if you need to base filtering
decisions on all objects. Otherwise, one can just override
_filter_one() to filter a single object.
"""
for obj in filter_obj_list:
if self._filter_one(obj, filter_properties):
yield obj
# Set to true in a subclass if a filter only needs to be run once
# for each request rather than for each instance
run_filter_once_per_request = False
def run_filter_for_index(self, index):
"""Return True if the filter needs to be run for n-th instances.
Only need to override this if a filter needs anything other than
"first only" or "all" behaviour.
"""
return not (self.run_filter_once_per_request and index > 0)
class BaseFilterHandler(base_handler.BaseHandler):
"""Base class to handle loading filter classes.
This class should be subclassed where one needs to use filters.
"""
def _log_filtration(self, full_filter_results,
part_filter_results, filter_properties):
# Log the filtration history
rspec = filter_properties.get("request_spec", {})
msg_dict = {"inst_id": rspec.get("instance_id", ""),
"str_results": six.text_type(full_filter_results),
}
full_msg = ("Filtering removed all nodes for the request with "
"instance ID "
"'%(inst_id)s'. Filter results: %(str_results)s"
) % msg_dict
msg_dict["str_results"] = ', '.join(
_LI("%(cls_name)s: (start: %(start)s, end: %(end)s)") % {
"cls_name": value[0], "start": value[1], "end": value[2]}
for value in part_filter_results)
part_msg = _LI("Filtering removed all nodes for the request with "
"instance ID "
"'%(inst_id)s'. Filter results: %(str_results)s"
) % msg_dict
LOG.debug(full_msg)
LOG.info(part_msg)
def get_filtered_objects(self, filter_classes, objs,
filter_properties, index=0):
"""Get objects after filter
:param filter_classes: filters that will be used to filter the
objects
:param objs: objects that will be filtered
:param filter_properties: client filter properties
:param index: This value needs to be increased in the caller
function of get_filtered_objects when handling
each resource.
"""
list_objs = list(objs)
LOG.debug("Starting with %d node(s)", len(list_objs))
# The 'part_filter_results' list just tracks the number of hosts
# before and after the filter, unless the filter returns zero
# hosts, in which it records the host/nodename for the last batch
# that was removed. Since the full_filter_results can be very large,
# it is only recorded if the LOG level is set to debug.
part_filter_results = []
full_filter_results = []
for filter_cls in filter_classes:
cls_name = filter_cls.__name__
start_count = len(list_objs)
filter_class = filter_cls()
if filter_class.run_filter_for_index(index):
objs = filter_class.filter_all(list_objs, filter_properties)
if objs is None:
LOG.info(_LI("Filter %s returned 0 nodes"), cls_name)
full_filter_results.append((cls_name, None))
list_objs = None
break
list_objs = list(objs)
end_count = len(list_objs)
part_filter_results.append((cls_name, start_count, end_count))
remaining = [getattr(obj, "node", obj)
for obj in list_objs]
full_filter_results.append((cls_name, remaining))
LOG.debug("Filter %(cls_name)s returned "
"%(obj_len)d node(s)",
{'cls_name': cls_name, 'obj_len': len(list_objs)})
if not list_objs:
self._log_filtration(full_filter_results,
part_filter_results, filter_properties)
return list_objs

View File

@ -0,0 +1,47 @@
# Copyright (c) 2011-2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A common base for handling extension classes.
Used by BaseFilterHandler and BaseWeightHandler
"""
import inspect
from stevedore import extension
class BaseHandler(object):
"""Base class to handle loading filter and weight classes."""
def __init__(self, modifier_class_type, modifier_namespace):
self.namespace = modifier_namespace
self.modifier_class_type = modifier_class_type
self.extension_manager = extension.ExtensionManager(modifier_namespace)
def _is_correct_class(self, cls):
"""Return whether an object is a class of the correct type.
(or is not prefixed with an underscore)
"""
return (inspect.isclass(cls) and
not cls.__name__.startswith('_') and
issubclass(cls, self.modifier_class_type))
def get_all_classes(self):
# We use a set, as some classes may have an entrypoint of their own,
# and also be returned by a function such as 'all_filters' for example
return [ext.plugin for ext in self.extension_manager if
self._is_correct_class(ext.plugin)]

View File

@ -0,0 +1,145 @@
# Copyright (c) 2011-2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Pluggable Weighing support
"""
import abc
import six
from nimble.engine.scheduler import base_handler
def normalize(weight_list, minval=None, maxval=None):
"""Normalize the values in a list between 0 and 1.0.
The normalization is made regarding the lower and upper values present in
weight_list. If the minval and/or maxval parameters are set, these values
will be used instead of the minimum and maximum from the list.
If all the values are equal, they are normalized to 0.
"""
if not weight_list:
return ()
if maxval is None:
maxval = max(weight_list)
if minval is None:
minval = min(weight_list)
maxval = float(maxval)
minval = float(minval)
if minval == maxval:
return [0] * len(weight_list)
range_ = maxval - minval
return ((i - minval) / range_ for i in weight_list)
class WeighedObject(object):
"""Object with weight information."""
def __init__(self, obj, weight):
self.obj = obj
self.weight = weight
def __repr__(self):
return "<WeighedObject '%s': %s>" % (self.obj, self.weight)
@six.add_metaclass(abc.ABCMeta)
class BaseWeigher(object):
"""Base class for pluggable weighers.
The attributes maxval and minval can be specified to set up the maximum
and minimum values for the weighed objects. These values will then be
taken into account in the normalization step, instead of taking the values
from the calculated weights.
"""
minval = None
maxval = None
def weight_multiplier(self):
"""How weighted this weigher should be.
Override this method in a subclass, so that the returned value is
read from a configuration option to permit operators specify a
multiplier for the weigher.
"""
return 1.0
@abc.abstractmethod
def _weigh_object(self, obj, weight_properties):
"""Override in a subclass to specify a weight for a specific object."""
def weigh_objects(self, weighed_obj_list, weight_properties):
"""Weigh multiple objects.
Override in a subclass if you need access to all objects in order
to calculate weights. Do not modify the weight of an object here,
just return a list of weights.
"""
# Calculate the weights
weights = []
for obj in weighed_obj_list:
weight = self._weigh_object(obj.obj, weight_properties)
# Record the min and max values if they are None. If they anything
# but none we assume that the weigher has set them
if self.minval is None:
self.minval = weight
if self.maxval is None:
self.maxval = weight
if weight < self.minval:
self.minval = weight
elif weight > self.maxval:
self.maxval = weight
weights.append(weight)
return weights
class BaseWeightHandler(base_handler.BaseHandler):
object_class = WeighedObject
def get_weighed_objects(self, weigher_classes, obj_list,
weighing_properties):
"""Return a sorted (descending), normalized list of WeighedObjects."""
if not obj_list:
return []
weighed_objs = [self.object_class(obj, 0.0) for obj in obj_list]
for weigher_cls in weigher_classes:
weigher = weigher_cls()
weights = weigher.weigh_objects(weighed_objs, weighing_properties)
# Normalize the weights
weights = normalize(weights,
minval=weigher.minval,
maxval=weigher.maxval)
for i, weight in enumerate(weights):
obj = weighed_objs[i]
obj.weight += weigher.weight_multiplier() * weight
return sorted(weighed_objs, key=lambda x: x.weight, reverse=True)

View File

@ -0,0 +1,40 @@
# Copyright (c) 2010 OpenStack Foundation
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Scheduler base class that all Schedulers should inherit from
"""
from oslo_config import cfg
from oslo_utils import importutils
from nimble.common.i18n import _
CONF = cfg.CONF
class Scheduler(object):
"""The base class that all Scheduler classes should inherit from."""
def __init__(self):
self.node_manager = importutils.import_object(
CONF.scheduler.scheduler_node_manager)
def schedule(self, context, request_spec, filter_properties):
"""Must override schedule method for scheduler to work."""
raise NotImplementedError(_("Must implement schedule"))

View File

@ -0,0 +1,187 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""The FilterScheduler is for creating instances.
You can customize this scheduler by specifying your own node Filters and
Weighing Functions.
"""
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from nimble.common import exception
from nimble.common.i18n import _, _LE, _LW
from nimble.engine.scheduler import driver
from nimble.engine.scheduler import scheduler_options
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class FilterScheduler(driver.Scheduler):
"""Scheduler that can be used for filtering and weighing."""
def __init__(self, *args, **kwargs):
super(FilterScheduler, self).__init__(*args, **kwargs)
self.options = scheduler_options.SchedulerOptions()
self.max_attempts = self._max_attempts()
def _get_configuration_options(self):
"""Fetch options dictionary. Broken out for testing."""
return self.options.get_configuration()
def populate_filter_properties(self, request_spec, filter_properties):
"""Stuff things into filter_properties.
Can be overridden in a subclass to add more data.
"""
instance = request_spec['instance_properties']
filter_properties['availability_zone'] = \
instance.get('availability_zone')
def _add_retry_node(self, filter_properties, node):
"""Add a retry entry for the selected Ironic node.
In the event that the request gets re-scheduled, this entry will signal
that the given node has already been tried.
"""
retry = filter_properties.get('retry', None)
if not retry:
return
nodes = retry['nodes']
nodes.append(node)
def _max_attempts(self):
max_attempts = CONF.scheduler.scheduler_max_attempts
if max_attempts < 1:
raise exception.InvalidParameterValue(
err=_("Invalid value for 'scheduler_max_attempts', "
"must be >=1"))
return max_attempts
def _log_instance_error(self, instance_id, retry):
"""Log requests with exceptions from previous instance operations."""
exc = retry.pop('exc', None) # string-ified exception from instance
if not exc:
return # no exception info from a previous attempt, skip
nodes = retry.get('nodes', None)
if not nodes:
return # no previously attempted nodes, skip
last_node = nodes[-1]
LOG.error(_LE("Error scheduling %(instance_id)s from last node: "
"%(last_node)s : %(exc)s"),
{'instance_id': instance_id,
'last_node': last_node,
'exc': exc})
def _populate_retry(self, filter_properties, properties):
"""Populate filter properties with history of retries for request.
If maximum retries is exceeded, raise NoValidNode.
"""
max_attempts = self.max_attempts
retry = filter_properties.pop('retry', {})
if max_attempts == 1:
# re-scheduling is disabled.
return
# retry is enabled, update attempt count:
if retry:
retry['num_attempts'] += 1
else:
retry = {
'num_attempts': 1,
'nodes': [] # list of Ironic nodes tried
}
filter_properties['retry'] = retry
instance_id = properties.get('instance_id')
self._log_instance_error(instance_id, retry)
if retry['num_attempts'] > max_attempts:
raise exception.NoValidNode(
reason=_("Exceeded max scheduling attempts %(max_attempts)d "
"for instance %(instance_id)s") %
{'max_attempts': max_attempts,
'instance_id': instance_id})
def _get_weighted_candidates(self, context, request_spec, node_cache,
filter_properties=None):
"""Return a list of nodes that meet required specs.
Returned list is ordered by their fitness.
"""
# Since Nimble is using mixed filters from Oslo and it's own, which
# takes 'resource_XX' and 'instance_XX' as input respectively, copying
# 'instance_type' to 'resource_type' will make both filters happy.
instance_type = resource_type = request_spec.get("instance_type")
config_options = self._get_configuration_options()
if filter_properties is None:
filter_properties = {}
self._populate_retry(filter_properties,
request_spec['instance_properties'])
request_spec_dict = jsonutils.to_primitive(request_spec)
filter_properties.update({'context': context,
'request_spec': request_spec_dict,
'config_options': config_options,
'instance_type': instance_type,
'resource_type': resource_type})
self.populate_filter_properties(request_spec,
filter_properties)
# Find our local list of acceptable nodes by filtering and
# weighing our options. we virtually consume resources on
# it so subsequent selections can adjust accordingly.
# Note: remember, we are using an iterator here. So only
# traverse this list once.
nodes = self.node_manager.get_all_node_states(node_cache)
# Filter local nodes based on requirements ...
nodes = self.node_manager.get_filtered_nodes(nodes,
filter_properties)
if not nodes:
return []
LOG.debug("Filtered %s", nodes)
# weighted_node = WeightedNode() ... the best
# node for the job.
weighed_nodes = self.node_manager.get_weighed_nodes(nodes,
filter_properties)
return weighed_nodes
def schedule(self, context, request_spec, node_cache,
filter_properties=None):
weighed_nodes = self._get_weighted_candidates(context, request_spec,
node_cache,
filter_properties)
if not weighed_nodes:
LOG.warning(_LW('No weighed nodes found for instance '
'with properties: %s'),
filter_properties['request_spec'].get('instance_type'))
return None
return self._choose_top_node(weighed_nodes, request_spec)
def _choose_top_node(self, weighed_nodes, request_spec):
top_node = weighed_nodes[0]
node_state = top_node.obj
LOG.debug("Choosing %s", node_state.node)
return top_node

View File

@ -0,0 +1,39 @@
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Scheduler node filters
"""
from nimble.engine.scheduler import base_filter
class BaseNodeFilter(base_filter.BaseFilter):
"""Base class for node filters."""
def _filter_one(self, obj, filter_properties):
"""Return True if the object passes the filter, otherwise False."""
return self.node_passes(obj, filter_properties)
def node_passes(self, node_state, filter_properties):
"""Return True if the NodeState passes the filter, otherwise False.
Override this in a subclass.
"""
raise NotImplementedError()
class NodeFilterHandler(base_filter.BaseFilterHandler):
def __init__(self, namespace):
super(NodeFilterHandler, self).__init__(BaseNodeFilter, namespace)

View File

@ -0,0 +1,32 @@
# Copyright (c) 2011-2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nimble.engine.scheduler import filters
class AvailabilityZoneFilter(filters.BaseNodeFilter):
"""Filters Nodes by availability zone."""
# Availability zones do not change within a request
run_filter_once_per_request = True
def node_passes(self, node_state, filter_properties):
spec = filter_properties.get('request_spec', {})
props = spec.get('resource_properties', {})
availability_zone = props.get('availability_zone')
if availability_zone:
return availability_zone == node_state.availability_zone
return True

View File

@ -0,0 +1,86 @@
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from nimble.engine.scheduler import filters
from nimble.engine.scheduler.filters import extra_specs_ops
LOG = logging.getLogger(__name__)
class CapabilitiesFilter(filters.BaseNodeFilter):
"""NodeFilter to work with resource instance type records."""
def _satisfies_extra_specs(self, capabilities, resource_type):
"""Check if capabilities satisfy resource type requirements.
Check that the capabilities provided by the services satisfy
the extra specs associated with the resource type.
"""
if not resource_type:
return True
extra_specs = resource_type.get('extra_specs', [])
if not extra_specs:
return True
for key, req in extra_specs.items():
# Either not scoped format, or in capabilities scope
scope = key.split(':')
# Ignore scoped (such as vendor-specific) capabilities
if len(scope) > 1 and scope[0] != "capabilities":
continue
# Strip off prefix if spec started with 'capabilities:'
elif scope[0] == "capabilities":
del scope[0]
cap = capabilities
for index in range(len(scope)):
try:
cap = cap[scope[index]]
except (TypeError, KeyError):
LOG.debug("Node doesn't provide capability '%(cap)s' " %
{'cap': scope[index]})
return False
# Make all capability values a list so we can handle lists
cap_list = [cap] if not isinstance(cap, list) else cap
# Loop through capability values looking for any match
for cap_value in cap_list:
if extra_specs_ops.match(cap_value, req):
break
else:
# Nothing matched, so bail out
LOG.debug('Instance type extra spec requirement '
'"%(key)s=%(req)s" does not match reported '
'capability "%(cap)s"',
{'key': key, 'req': req, 'cap': cap})
return False
return True
def node_passes(self, node_state, filter_properties):
"""Return a list of nodes that can create resource_type."""
resource_type = filter_properties.get('resource_type')
if not self._satisfies_extra_specs(node_state.capabilities,
resource_type):
LOG.debug("%(node_state)s fails resource_type extra_specs "
"requirements", {'node_state': node_state})
return False
return True

View File

@ -0,0 +1,77 @@
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import operator
from oslo_utils import strutils
# 1. The following operations are supported:
# =, s==, s!=, s>=, s>, s<=, s<, <in>, <is>, <or>, ==, !=, >=, <=
# 2. Note that <or> is handled in a different way below.
# 3. If the first word in the extra_specs is not one of the operators,
# it is ignored.
_op_methods = {'=': lambda x, y: float(x) >= float(y),
'<in>': lambda x, y: y in x,
'<is>': lambda x, y: (strutils.bool_from_string(x) is
strutils.bool_from_string(y)),
'==': lambda x, y: float(x) == float(y),
'!=': lambda x, y: float(x) != float(y),
'>=': lambda x, y: float(x) >= float(y),
'<=': lambda x, y: float(x) <= float(y),
's==': operator.eq,
's!=': operator.ne,
's<': operator.lt,
's<=': operator.le,
's>': operator.gt,
's>=': operator.ge}
def match(value, req):
if req is None:
if value is None:
return True
else:
return False
words = req.split()
op = method = None
if words:
op = words.pop(0)
method = _op_methods.get(op)
if op != '<or>' and not method:
return value == req
if value is None:
return False
if op == '<or>': # Ex: <or> v1 <or> v2 <or> v3
while True:
if words.pop(0) == value:
return True
if not words:
break
op = words.pop(0) # remove a keyword <or>
if not words:
break
return False
try:
if words and method(value, words[0]):
return True
except ValueError:
pass
return False

View File

@ -0,0 +1,149 @@
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import operator
from oslo_serialization import jsonutils
import six
from nimble.engine.scheduler import filters
class JsonFilter(filters.BaseNodeFilter):
"""Node Filter to allow simple JSON-based grammar for selecting nodes."""
def _op_compare(self, args, op):
"""Compare first item of args with the rest using specified operator.
Returns True if the specified operator can successfully
compare the first item in the args with all the rest. Will
return False if only one item is in the list.
"""
if len(args) < 2:
return False
if op is operator.contains:
bad = args[0] not in args[1:]
else:
bad = [arg for arg in args[1:]
if not op(args[0], arg)]
return not bool(bad)
def _equals(self, args):
"""First term is == all the other terms."""
return self._op_compare(args, operator.eq)
def _less_than(self, args):
"""First term is < all the other terms."""
return self._op_compare(args, operator.lt)
def _greater_than(self, args):
"""First term is > all the other terms."""
return self._op_compare(args, operator.gt)
def _in(self, args):
"""First term is in set of remaining terms."""
return self._op_compare(args, operator.contains)
def _less_than_equal(self, args):
"""First term is <= all the other terms."""
return self._op_compare(args, operator.le)
def _greater_than_equal(self, args):
"""First term is >= all the other terms."""
return self._op_compare(args, operator.ge)
def _not(self, args):
"""Flip each of the arguments."""
return [not arg for arg in args]
def _or(self, args):
"""True if any arg is True."""
return any(args)
def _and(self, args):
"""True if all args are True."""
return all(args)
commands = {
'=': _equals,
'<': _less_than,
'>': _greater_than,
'in': _in,
'<=': _less_than_equal,
'>=': _greater_than_equal,
'not': _not,
'or': _or,
'and': _and,
}
def _parse_string(self, string, node_state):
"""Parse capability lookup strings.
Strings prefixed with $ are capability lookups in the
form '$variable' where 'variable' is an attribute in the
NodeState class. If $variable is a dictionary, you may
use: $variable.dictkey
"""
if not string:
return None
if not string.startswith("$"):
return string
path = string[1:].split(".")
obj = getattr(node_state, path[0], None)
if obj is None:
return None
for item in path[1:]:
obj = obj.get(item)
if obj is None:
return None
return obj
def _process_filter(self, query, node_state):
"""Recursively parse the query structure."""
if not query:
return True
cmd = query[0]
method = self.commands[cmd]
cooked_args = []
for arg in query[1:]:
if isinstance(arg, list):
arg = self._process_filter(arg, node_state)
elif isinstance(arg, six.string_types):
arg = self._parse_string(arg, node_state)
if arg is not None:
cooked_args.append(arg)
result = method(self, cooked_args)
return result
def node_passes(self, node_state, filter_properties):
"""Return a list of nodes that can fulfill query requirements."""
try:
query = filter_properties['scheduler_hints']['query']
except KeyError:
query = None
if not query:
return True
# NOTE(comstud): Not checking capabilities or service for
# enabled/disabled so that a provided json filter can decide
result = self._process_filter(jsonutils.loads(query), node_state)
if isinstance(result, list):
# If any succeeded, include the node
result = any(result)
if result:
# Filter it out.
return True
return False

View File

@ -0,0 +1,135 @@
# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Manage nodes.
"""
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import importutils
from nimble.common import exception
from nimble.engine.scheduler import filters
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class NodeState(object):
"""Mutable and immutable information tracked for a Ironic node."""
def __init__(self, node):
self.node = node.uuid
self.capabilities = node.capabilities
self.availability_zone = node.extra.get('availability_zone', None)
self.instance_type = node.extra.get('instance_type', None)
class NodeManager(object):
"""Base NodeManager class."""
node_state_cls = NodeState
def __init__(self):
self.filter_handler = filters.NodeFilterHandler('nimble.engine.'
'scheduler.filters')
self.filter_classes = self.filter_handler.get_all_classes()
self.weight_handler = importutils.import_object(
CONF.scheduler.scheduler_weight_handler,
'nimble.engine.scheduler.weights')
self.weight_classes = self.weight_handler.get_all_classes()
def _choose_node_filters(self, filter_cls_names):
"""Return a list of available filter names.
This function checks input filter names against a predefined set
of acceptable filterss (all loaded filters). If input is None,
it uses CONF.scheduler_default_filters instead.
"""
if filter_cls_names is None:
filter_cls_names = CONF.scheduler.scheduler_default_filters
if not isinstance(filter_cls_names, (list, tuple)):
filter_cls_names = [filter_cls_names]
good_filters = []
bad_filters = []
for filter_name in filter_cls_names:
found_class = False
for cls in self.filter_classes:
if cls.__name__ == filter_name:
found_class = True
good_filters.append(cls)
break
if not found_class:
bad_filters.append(filter_name)
if bad_filters:
raise exception.SchedulerNodeFilterNotFound(
filter_name=", ".join(bad_filters))
return good_filters
def _choose_node_weighers(self, weight_cls_names):
"""Return a list of available weigher names.
This function checks input weigher names against a predefined set
of acceptable weighers (all loaded weighers). If input is None,
it uses CONF.scheduler_default_weighers instead.
"""
if weight_cls_names is None:
weight_cls_names = CONF.scheduler.scheduler_default_weighers
if not isinstance(weight_cls_names, (list, tuple)):
weight_cls_names = [weight_cls_names]
good_weighers = []
bad_weighers = []
for weigher_name in weight_cls_names:
found_class = False
for cls in self.weight_classes:
if cls.__name__ == weigher_name:
good_weighers.append(cls)
found_class = True
break
if not found_class:
bad_weighers.append(weigher_name)
if bad_weighers:
raise exception.SchedulerNodeWeigherNotFound(
weigher_name=", ".join(bad_weighers))
return good_weighers
def get_filtered_nodes(self, nodes, filter_properties,
filter_class_names=None):
"""Filter nodes and return only ones passing all filters."""
filter_classes = self._choose_node_filters(filter_class_names)
return self.filter_handler.get_filtered_objects(filter_classes,
nodes,
filter_properties)
def get_weighed_nodes(self, nodes, weight_properties,
weigher_class_names=None):
"""Weigh the nodes."""
weigher_classes = self._choose_node_weighers(weigher_class_names)
return self.weight_handler.get_weighed_objects(weigher_classes,
nodes,
weight_properties)
def get_all_node_states(self, node_cache):
"""Returns a list of all the nodes the NodeManager knows about."""
node_states = []
for node in node_cache:
node_state = self.node_state_cls(node)
node_states.append(node_state)
return node_states

View File

@ -0,0 +1,97 @@
# Copyright (c) 2011 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
SchedulerOptions monitors a local .json file for changes and loads
it if needed. This file is converted to a data structure and passed
into the filtering and weighing functions which can use it for
dynamic configuration.
"""
import datetime
import json
import os
from oslo_config import cfg
from oslo_log import log as logging
from oslo_utils import timeutils
from nimble.common.i18n import _LE
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class SchedulerOptions(object):
"""SchedulerOptions monitors a local .json file for changes.
The file is reloaded if needed and converted to a data structure and
passed into the filtering and weighing functions which can use it
for dynamic configuration.
"""
def __init__(self):
super(SchedulerOptions, self).__init__()
self.data = {}
self.last_modified = None
self.last_checked = None
def _get_file_handle(self, filename):
"""Get file handle. Broken out for testing."""
return open(filename)
def _get_file_timestamp(self, filename):
"""Get the last modified datetime. Broken out for testing."""
try:
return os.path.getmtime(filename)
except os.error:
LOG.exception(_LE("Could not stat scheduler options file "
"%(filename)s."),
{'filename': filename})
raise
def _load_file(self, handle):
"""Decode the JSON file. Broken out for testing."""
try:
return json.load(handle)
except ValueError:
LOG.exception(_LE("Could not decode scheduler options."))
return {}
def _get_time_now(self):
"""Get current UTC. Broken out for testing."""
return timeutils.utcnow()
def get_configuration(self, filename=None):
"""Check the json file for changes and load it if needed."""
if not filename:
filename = CONF.scheduler.scheduler_json_config_location
if not filename:
return self.data
if self.last_checked:
now = self._get_time_now()
if now - self.last_checked < datetime.timedelta(minutes=5):
return self.data
last_modified = self._get_file_timestamp(filename)
if (not last_modified or not self.last_modified or
last_modified > self.last_modified):
self.data = self._load_file(self._get_file_handle(filename))
self.last_modified = last_modified
if not self.data:
self.data = {}
return self.data

View File

@ -0,0 +1,45 @@
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Scheduler node weights
"""
from nimble.engine.scheduler import base_weight
class WeighedNode(base_weight.WeighedObject):
def to_dict(self):
return {
'weight': self.weight,
'node': self.obj.node,
}
def __repr__(self):
return ("WeighedNode [node: %s, weight: %s]" %
(self.obj.node, self.weight))
class BaseNodeWeigher(base_weight.BaseWeigher):
"""Base class for node weights."""
pass
class OrderedNodeWeightHandler(base_weight.BaseWeightHandler):
object_class = WeighedNode
def __init__(self, namespace):
super(OrderedNodeWeightHandler, self).__init__(BaseNodeWeigher,
namespace)