Refactor the scheduler to use placement service

This change switches the scheduler to use the placement API to query
and filter nodes by the resource class specified in the flavor, and
removes the filters and weighers from the scheduler.

Change-Id: I89ad443f553510da6daf289b83f3c30d9d546ace
Partially Implements: bp track-resources-using-placement
changes/26/477426/29
liusheng 2017-06-26 11:10:55 +08:00 committed by liusheng
parent adf9828348
commit 9de2e37188
34 changed files with 165 additions and 1929 deletions
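In outline, the new flow asks placement for resource providers that satisfy the flavor's resource classes, then claims the chosen nodes by writing allocations. A hedged sketch assembled from the hunks below (the client call shapes follow this patch; the request_spec layout is simplified):

# Hypothetical end-to-end sketch; reportclient stands in for the
# SchedulerReportClient touched by this change.
def schedule(reportclient, request_spec, servers):
    resources = {rc: int(amount) for rc, amount
                 in request_spec['flavor']['resources'].items()}
    providers = reportclient.get_filtered_resource_providers(
        {'resources': resources})
    if not providers:
        raise RuntimeError('No filtered nodes available')
    node_uuids = [p['uuid'] for p in providers]
    for server, node_uuid in zip(servers, node_uuids):
        # Claim the node's inventory for this server.
        reportclient.put_allocations(
            node_uuid, server.uuid, resources,
            server.project_id, server.user_id)
    return node_uuids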


@@ -15,6 +15,7 @@ from oslo_concurrency import processutils
from oslo_context import context
from oslo_log import log
import oslo_messaging as messaging
from oslo_service import periodic_task
from oslo_service import service
from oslo_service import wsgi
from oslo_utils import importutils
@@ -53,10 +54,11 @@ class RPCService(service.Service):
self.rpcserver.start()
self.manager.init_host()
self.tg.add_dynamic_timer(
self.manager.periodic_tasks,
periodic_interval_max=CONF.periodic_interval,
context=admin_context)
if isinstance(self.manager, periodic_task.PeriodicTasks):
self.tg.add_dynamic_timer(
self.manager.periodic_tasks,
periodic_interval_max=CONF.periodic_interval,
context=admin_context)
LOG.info('Created RPC server for service %(service)s on host '
'%(host)s.',
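The guard added in this hunk registers the dynamic timer only for managers that mix in oslo.service's PeriodicTasks. A minimal standalone sketch of the same pattern (function and variable names here are illustrative):

from oslo_service import periodic_task

def start_periodic_tasks(tg, manager, interval, admin_context):
    # Managers without the PeriodicTasks mixin don't expose
    # periodic_tasks(), so skip timer registration for them.
    if isinstance(manager, periodic_task.PeriodicTasks):
        tg.add_dynamic_timer(manager.periodic_tasks,
                             periodic_interval_max=interval,
                             context=admin_context)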


@@ -286,7 +286,6 @@ class API(object):
servers = self._provision_servers(context, base_options,
min_count, max_count)
request_spec = {
'server_id': servers[0].uuid,
'server_properties': {
'flavor_uuid': servers[0].flavor_uuid,
'networks': requested_networks,


@@ -24,7 +24,6 @@ from mogan.db import api as dbapi
from mogan.engine.baremetal import driver
from mogan.engine import rpcapi
from mogan import network
from mogan.scheduler import rpcapi as scheduler_rpcapi
class BaseEngineManager(periodic_task.PeriodicTasks):
@@ -36,7 +35,6 @@ class BaseEngineManager(periodic_task.PeriodicTasks):
self.host = host
self.topic = topic
self.network_api = network.API()
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
self.driver = driver.load_engine_driver(CONF.engine.engine_driver)
self.engine_rpcapi = rpcapi.EngineAPI()
self._sync_power_pool = greenpool.GreenPool(


@@ -34,7 +34,7 @@ from mogan.common import utils
from mogan.engine import configdrive
from mogan.engine import metadata as server_metadata
from mogan import objects
from mogan.scheduler import client as sched_client
LOG = logging.getLogger(__name__)
@@ -64,6 +64,7 @@ class OnFailureRescheduleTask(flow_utils.MoganTask):
exception.ServerDeployAborted,
exception.NetworkError,
]
self.reportclient = sched_client.SchedulerClient().reportclient
def execute(self, **kwargs):
pass
@@ -137,15 +138,16 @@ class BuildNetworkTask(flow_utils.MoganTask):
"""Build network for the server."""
def __init__(self, manager):
requires = ['server', 'requested_networks', 'ports', 'context']
requires = ['server', 'requested_networks', 'context']
super(BuildNetworkTask, self).__init__(addons=[ACTION],
requires=requires)
self.manager = manager
def _build_networks(self, context, server, requested_networks, ports):
def _build_networks(self, context, server, requested_networks):
# TODO(zhenguo): This seems not needed as our scheduler has already
# guaranteed this.
ports = self.manager.driver.get_ports_from_node(server.node_uuid)
if len(requested_networks) > len(ports):
raise exception.InterfacePlugException(_(
"Ironic node: %(id)s virtual to physical interface count"
@@ -196,12 +198,11 @@ class BuildNetworkTask(flow_utils.MoganTask):
return nics_obj
def execute(self, context, server, requested_networks, ports):
def execute(self, context, server, requested_networks):
server_nics = self._build_networks(
context,
server,
requested_networks,
ports)
requested_networks)
server.nics = server_nics
server.save()
@@ -298,7 +299,7 @@ class CreateServerTask(flow_utils.MoganTask):
def get_flow(context, manager, server, requested_networks, user_data,
injected_files, key_pair, ports, request_spec,
injected_files, key_pair, request_spec,
filter_properties):
"""Constructs and returns the manager entrypoint flow
@@ -326,7 +327,6 @@ def get_flow(context, manager, server, requested_networks, user_data,
'user_data': user_data,
'injected_files': injected_files,
'key_pair': key_pair,
'ports': ports,
'configdrive': {}
}


@@ -195,6 +195,9 @@ class EngineManager(base_manager.BaseEngineManager):
self.scheduler_client.set_inventory_for_provider(
node.uuid, node.name, inventory_data,
resource_class)
if node.provision_state == 'available':
self.scheduler_client.reportclient \
.delete_allocations_for_resource_provider(node.uuid)
@periodic_task.periodic_task(spacing=CONF.engine.sync_power_state_interval,
run_immediately=True)
@@ -392,9 +395,10 @@ class EngineManager(base_manager.BaseEngineManager):
}
filter_properties['retry'] = retry
request_spec['num_servers'] = len(servers)
request_spec['server_ids'] = [s.uuid for s in servers]
try:
nodes = self.scheduler_rpcapi.select_destinations(
nodes = self.scheduler_client.select_destinations(
context, request_spec, filter_properties)
except exception.NoValidNode as e:
# Here should reset the state of building servers to Error
@@ -413,11 +417,11 @@ class EngineManager(base_manager.BaseEngineManager):
{"nodes": nodes})
for (server, node) in six.moves.zip(servers, nodes):
server.node_uuid = node['node_uuid']
server.node_uuid = node
server.save()
# Add a retry entry for the selected node
retry_nodes = retry['nodes']
retry_nodes.append(node['node_uuid'])
retry_nodes.append(node)
for server in servers:
utils.spawn_n(self._create_server,
@@ -444,7 +448,6 @@ class EngineManager(base_manager.BaseEngineManager):
target_state=states.ACTIVE)
try:
node = objects.ComputeNode.get(context, server.node_uuid)
flow_engine = create_server.get_flow(
context,
self,
@@ -453,7 +456,6 @@ class EngineManager(base_manager.BaseEngineManager):
user_data,
injected_files,
key_pair,
node['ports'],
request_spec,
filter_properties,
)


@@ -1,134 +0,0 @@
# Copyright (c) 2011-2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Filter support
"""
from oslo_log import log as logging
import six
from mogan.scheduler import base_handler
LOG = logging.getLogger(__name__)
class BaseFilter(object):
"""Base class for all filter classes."""
def _filter_one(self, obj, filter_properties):
"""Return True if it passes the filter, False otherwise.
Override this in a subclass.
"""
return True
def filter_all(self, filter_obj_list, filter_properties):
"""Yield objects that pass the filter.
Can be overridden in a subclass, if you need to base filtering
decisions on all objects. Otherwise, one can just override
_filter_one() to filter a single object.
"""
for obj in filter_obj_list:
if self._filter_one(obj, filter_properties):
yield obj
# Set to true in a subclass if a filter only needs to be run once
# for each request rather than for each server
run_filter_once_per_request = False
def run_filter_for_index(self, index):
"""Return True if the filter needs to be run for n-th servers.
Only need to override this if a filter needs anything other than
"first only" or "all" behaviour.
"""
return not (self.run_filter_once_per_request and index > 0)
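For reference, a typical subclass of this (now-removed) BaseFilter only overrode _filter_one; a toy example building on the class above (not from Mogan):

class MemoryNodeFilter(BaseFilter):
    # A flavor's memory requirement doesn't change within a request,
    # so this filter only needs one pass.
    run_filter_once_per_request = True

    def _filter_one(self, obj, filter_properties):
        wanted = filter_properties.get('min_memory_mb', 0)
        return getattr(obj, 'memory_mb', 0) >= wanted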
class BaseFilterHandler(base_handler.BaseHandler):
"""Base class to handle loading filter classes.
This class should be subclassed where one needs to use filters.
"""
def _log_filtration(self, full_filter_results,
part_filter_results, filter_properties):
# Log the filtration history
rspec = filter_properties.get("request_spec", {})
msg_dict = {"server_id": rspec.get("server_id", ""),
"str_results": six.text_type(full_filter_results),
}
full_msg = ("Filtering removed all nodes for the request with "
"server ID "
"'%(server_id)s'. Filter results: %(str_results)s"
) % msg_dict
msg_dict["str_results"] = ', '.join(
("%(cls_name)s: (start: %(start)s, end: %(end)s)") %
{"cls_name": value[0], "start": value[1], "end": value[2]}
for value in part_filter_results)
part_msg = ("Filtering removed all nodes for the request with "
"server ID '%(server_id)s'. "
"Filter results: %(str_results)s") % msg_dict
LOG.debug(full_msg)
LOG.info(part_msg)
def get_filtered_objects(self, filter_classes, objs,
filter_properties, index=0):
"""Get objects after filter
:param filter_classes: filters that will be used to filter the
objects
:param objs: objects that will be filtered
:param filter_properties: client filter properties
:param index: This value needs to be increased in the caller
function of get_filtered_objects when handling
each resource.
"""
list_objs = list(objs)
LOG.debug("Starting with %d node(s)", len(list_objs))
# The 'part_filter_results' list just tracks the number of hosts
# before and after the filter, unless the filter returns zero
# hosts, in which case it records the host/nodename for the last batch
# that was removed. Since the full_filter_results can be very large,
# it is only recorded if the LOG level is set to debug.
part_filter_results = []
full_filter_results = []
for filter_cls in filter_classes:
cls_name = filter_cls.__name__
start_count = len(list_objs)
filter_class = filter_cls()
if filter_class.run_filter_for_index(index):
objs = filter_class.filter_all(list_objs, filter_properties)
if objs is None:
LOG.info("Filter %s returned 0 nodes", cls_name)
full_filter_results.append((cls_name, None))
list_objs = None
break
list_objs = list(objs)
end_count = len(list_objs)
part_filter_results.append((cls_name, start_count, end_count))
remaining = [getattr(obj, "node", obj)
for obj in list_objs]
full_filter_results.append((cls_name, remaining))
LOG.debug("Filter %(cls_name)s returned "
"%(obj_len)d node(s)",
{'cls_name': cls_name, 'obj_len': len(list_objs)})
if not list_objs:
self._log_filtration(full_filter_results,
part_filter_results, filter_properties)
return list_objs
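A compact, self-contained trace of the loop above with two toy filter classes (logging elided):

class EvenFilter(object):
    def filter_all(self, objs, props):
        return (o for o in objs if o % 2 == 0)

class BigFilter(object):
    def filter_all(self, objs, props):
        return (o for o in objs if o > props['min'])

list_objs = [1, 2, 3, 4, 5, 6]
for filter_cls in (EvenFilter, BigFilter):
    list_objs = list(filter_cls().filter_all(list_objs, {'min': 3}))
    print(filter_cls.__name__, list_objs)
# EvenFilter [2, 4, 6]
# BigFilter [4, 6]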


@@ -1,47 +0,0 @@
# Copyright (c) 2011-2013 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A common base for handling extension classes.
Used by BaseFilterHandler and BaseWeightHandler
"""
import inspect
from stevedore import extension
class BaseHandler(object):
"""Base class to handle loading filter and weight classes."""
def __init__(self, modifier_class_type, modifier_namespace):
self.namespace = modifier_namespace
self.modifier_class_type = modifier_class_type
self.extension_manager = extension.ExtensionManager(modifier_namespace)
def _is_correct_class(self, cls):
"""Return whether an object is a class of the correct type.
(or is not prefixed with an underscore)
"""
return (inspect.isclass(cls) and
not cls.__name__.startswith('_') and
issubclass(cls, self.modifier_class_type))
def get_all_classes(self):
# We use a set, as some classes may have an entrypoint of their own,
# and also be returned by a function such as 'all_filters' for example
return [ext.plugin for ext in self.extension_manager if
self._is_correct_class(ext.plugin)]


@@ -1,145 +0,0 @@
# Copyright (c) 2011-2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Pluggable Weighing support
"""
import abc
import six
from mogan.scheduler import base_handler
def normalize(weight_list, minval=None, maxval=None):
"""Normalize the values in a list between 0 and 1.0.
The normalization is made regarding the lower and upper values present in
weight_list. If the minval and/or maxval parameters are set, these values
will be used instead of the minimum and maximum from the list.
If all the values are equal, they are normalized to 0.
"""
if not weight_list:
return ()
if maxval is None:
maxval = max(weight_list)
if minval is None:
minval = min(weight_list)
maxval = float(maxval)
minval = float(minval)
if minval == maxval:
return [0] * len(weight_list)
range_ = maxval - minval
return ((i - minval) / range_ for i in weight_list)
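A quick worked example of normalize() for concrete weights (arithmetic inlined so it runs standalone):

weights = [10.0, 20.0, 40.0]
minval, maxval = min(weights), max(weights)
print([(w - minval) / (maxval - minval) for w in weights])
# [0.0, 0.3333333333333333, 1.0]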
class WeighedObject(object):
"""Object with weight information."""
def __init__(self, obj, weight):
self.obj = obj
self.weight = weight
def __repr__(self):
return "<WeighedObject '%s': %s>" % (self.obj, self.weight)
@six.add_metaclass(abc.ABCMeta)
class BaseWeigher(object):
"""Base class for pluggable weighers.
The attributes maxval and minval can be specified to set up the maximum
and minimum values for the weighed objects. These values will then be
taken into account in the normalization step, instead of taking the values
from the calculated weights.
"""
minval = None
maxval = None
def weight_multiplier(self):
"""How weighted this weigher should be.
Override this method in a subclass, so that the returned value is
read from a configuration option to permit operators to specify a
multiplier for the weigher.
"""
return 1.0
@abc.abstractmethod
def _weigh_object(self, obj, weight_properties):
"""Override in a subclass to specify a weight for a specific object."""
def weigh_objects(self, weighed_obj_list, weight_properties):
"""Weigh multiple objects.
Override in a subclass if you need access to all objects in order
to calculate weights. Do not modify the weight of an object here,
just return a list of weights.
"""
# Calculate the weights
weights = []
for obj in weighed_obj_list:
weight = self._weigh_object(obj.obj, weight_properties)
# Record the min and max values if they are None. If they are
# anything but None we assume that the weigher has set them.
if self.minval is None:
self.minval = weight
if self.maxval is None:
self.maxval = weight
if weight < self.minval:
self.minval = weight
elif weight > self.maxval:
self.maxval = weight
weights.append(weight)
return weights
class BaseWeightHandler(base_handler.BaseHandler):
object_class = WeighedObject
def get_weighed_objects(self, weigher_classes, obj_list,
weighing_properties):
"""Return a sorted (descending), normalized list of WeighedObjects."""
if not obj_list:
return []
weighed_objs = [self.object_class(obj, 0.0) for obj in obj_list]
for weigher_cls in weigher_classes:
weigher = weigher_cls()
weights = weigher.weigh_objects(weighed_objs, weighing_properties)
# Normalize the weights
weights = normalize(weights,
minval=weigher.minval,
maxval=weigher.maxval)
for i, weight in enumerate(weights):
obj = weighed_objs[i]
obj.weight += weigher.weight_multiplier() * weight
return sorted(weighed_objs, key=lambda x: x.weight, reverse=True)


@@ -17,6 +17,8 @@ import functools
from oslo_utils import importutils
from mogan.scheduler import utils
class LazyLoader(object):
@@ -39,9 +41,16 @@ class SchedulerClient(object):
"""Client library for placing calls to the scheduler."""
def __init__(self):
self.queryclient = LazyLoader(importutils.import_class(
'mogan.scheduler.client.query.SchedulerQueryClient'))
self.reportclient = LazyLoader(importutils.import_class(
'mogan.scheduler.client.report.SchedulerReportClient'))
@utils.retry_select_destinations
def select_destinations(self, context, spec_obj, filter_properties):
return self.queryclient.select_destinations(context, spec_obj,
filter_properties)
def set_inventory_for_provider(self, rp_uuid, rp_name, inv_data,
res_class):
self.reportclient.set_inventory_for_provider(
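Both clients are wrapped in LazyLoader, so importing SchedulerClient builds no RPC or HTTP machinery until first use. A sketch of what such a LazyLoader typically looks like (a restatement under assumption; Mogan's actual class is defined earlier in this module):

class LazyLoader(object):
    """Defer instantiating klass until the first attribute access."""

    def __init__(self, klass, *args, **kwargs):
        self.klass = klass
        self.args = args
        self.kwargs = kwargs
        self.instance = None

    def __getattr__(self, name):
        # Only called for attributes not found on LazyLoader itself,
        # so the lookups of klass/args/kwargs/instance never recurse.
        if self.instance is None:
            self.instance = self.klass(*self.args, **self.kwargs)
        return getattr(self.instance, name)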


@@ -0,0 +1,33 @@
# Copyright (c) 2014 Red Hat, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mogan.scheduler import rpcapi as scheduler_rpcapi
class SchedulerQueryClient(object):
"""Client class for querying to the scheduler."""
def __init__(self):
self.scheduler_rpcapi = scheduler_rpcapi.SchedulerAPI()
def select_destinations(self, context, spec_obj, filter_properties):
"""Returns destinations(s) best suited for this request_spec and
filter_properties.
The result should be a list of dicts with 'host', 'nodename' and
'limits' as keys.
"""
return self.scheduler_rpcapi.select_destinations(context, spec_obj,
filter_properties)


@@ -98,7 +98,7 @@ class SchedulerReportClient(object):
def __init__(self):
# A dict, keyed by the resource provider UUID, of ResourceProvider
# objects that will have their inventories and allocations tracked by
# the placement API for the compute host
# the placement API for the node
self._resource_providers = {}
# A dict, keyed by resource provider UUID, of sets of aggregate UUIDs
# the provider is associated with
@@ -167,8 +167,6 @@ class SchedulerReportClient(object):
url,
endpoint_filter=self.ks_filter, raise_exc=False)
# TODO(sbauza): Change that poor interface into passing a rich versioned
# object that would provide the ResourceProvider requirements.
@safe_connect
def get_filtered_resource_providers(self, filters):
"""Returns a list of ResourceProviders matching the requirements
@@ -421,34 +419,6 @@ class SchedulerReportClient(object):
{'placement_req_id': get_placement_request_id(result),
'resource_provider_uuid': rp_uuid,
'generation_id': cur_rp_gen})
# NOTE(jaypipes): There may be cases when we try to set a
# provider's inventory that results in attempting to delete an
# inventory record for a resource class that has an active
# allocation. We need to catch this particular case and raise an
# exception here instead of returning False, since we should not
# re-try the operation in this case.
#
# A use case for where this can occur is the following:
#
# 1) Provider created for each Ironic baremetal node in Newton
# 2) Inventory records for baremetal node created for VCPU,
# MEMORY_MB and DISK_GB
# 3) A Nova instance consumes the baremetal node and allocation
# records are created for VCPU, MEMORY_MB and DISK_GB matching
# the total amount of those resources on the baremetal node.
# 4) Upgrade to Ocata and now resource tracker wants to set the
# provider's inventory to a single record of resource class
# CUSTOM_IRON_SILVER (or whatever the Ironic node's
# "resource_class" attribute is)
# 5) Scheduler report client sends the inventory list containing a
# single CUSTOM_IRON_SILVER record and placement service
# attempts to delete the inventory records for VCPU, MEMORY_MB
# and DISK_GB. An exception is raised from the placement service
# because allocation records exist for those resource classes,
# and a 409 Conflict is returned to the compute node. We need to
# trigger a delete of the old allocation records and then set
# the new inventory, and then set the allocation record to the
# new CUSTOM_IRON_SILVER record.
match = _RE_INV_IN_USE.search(result.text)
if match:
rc = match.group(1)
@@ -517,71 +487,6 @@ class SchedulerReportClient(object):
time.sleep(1)
return False
@safe_connect
def _delete_inventory(self, rp_uuid):
"""Deletes all inventory records for a resource provider with the
supplied UUID.
"""
curr = self._get_inventory_and_update_provider_generation(rp_uuid)
# Check to see if we need to update placement's view
if not curr.get('inventories', {}):
msg = "No inventory to delete from resource provider %s."
LOG.debug(msg, rp_uuid)
return
msg = ("Compute node %s reported no inventory but previous "
"inventory was detected. Deleting existing inventory "
"records.")
LOG.info(msg, rp_uuid)
url = '/resource_providers/%s/inventories' % rp_uuid
cur_rp_gen = self._resource_providers[rp_uuid]['generation']
payload = {
'resource_provider_generation': cur_rp_gen,
'inventories': {},
}
r = self.put(url, payload)
placement_req_id = get_placement_request_id(r)
if r.status_code == 200:
# Update our view of the generation for next time
updated_inv = r.json()
new_gen = updated_inv['resource_provider_generation']
self._resource_providers[rp_uuid]['generation'] = new_gen
msg_args = {
'rp_uuid': rp_uuid,
'generation': new_gen,
'placement_req_id': placement_req_id,
}
LOG.info(('[%(placement_req_id)s] Deleted all inventory for '
'resource provider %(rp_uuid)s at generation '
'%(generation)i'),
msg_args)
return
elif r.status_code == 409:
rc_str = _extract_inventory_in_use(r.text)
if rc_str is not None:
msg = ("[%(placement_req_id)s] We cannot delete inventory "
"%(rc_str)s for resource provider %(rp_uuid)s "
"because the inventory is in use.")
msg_args = {
'rp_uuid': rp_uuid,
'rc_str': rc_str,
'placement_req_id': placement_req_id,
}
LOG.warning(msg, msg_args)
return
msg = ("[%(placement_req_id)s] Failed to delete inventory for "
"resource provider %(rp_uuid)s. Got error response: %(err)s")
msg_args = {
'rp_uuid': rp_uuid,
'err': r.text,
'placement_req_id': placement_req_id,
}
LOG.error(msg, msg_args)
def set_inventory_for_provider(self, rp_uuid, rp_name, inv_data,
resource_class):
"""Given the UUID of a provider, set the inventory records for the
@@ -601,10 +506,7 @@ class SchedulerReportClient(object):
# Auto-create custom resource classes coming from a virt driver
self._ensure_resource_class(resource_class)
if inv_data:
self._update_inventory(rp_uuid, inv_data)
else:
self._delete_inventory(rp_uuid)
self._update_inventory(rp_uuid, inv_data)
@safe_connect
def _ensure_resource_class(self, name):
@@ -704,8 +606,26 @@ class SchedulerReportClient(object):
raise exception.InvalidResourceClass(resource_class=name)
@safe_connect
def put_allocations(self, rp_uuid, consumer_uuid, alloc_data):
"""Creates allocation records for the supplied instance UUID against
def delete_allocation_for_server(self, uuid):
url = '/allocations/%s' % uuid
r = self.delete(url)
if r:
LOG.info('Deleted allocation for server %s', uuid)
else:
# Check for 404 since we don't need to log a warning if we tried to
# delete something which doesn't actually exist.
if r.status_code != 404:
LOG.warning(
'Unable to delete allocation for server '
'%(uuid)s: (%(code)i %(text)s)',
{'uuid': uuid,
'code': r.status_code,
'text': r.text})
@safe_connect
def put_allocations(self, rp_uuid, consumer_uuid, alloc_data, project_id,
user_id):
"""Creates allocation records for the supplied server UUID against
the supplied resource provider.
:note Currently we only allocate against a single resource provider.
@@ -713,9 +633,11 @@ class SchedulerReportClient(object):
reality, this will change to allocate against multiple providers.
:param rp_uuid: The UUID of the resource provider to allocate against.
:param consumer_uuid: The instance's UUID.
:param consumer_uuid: The server's UUID.
:param alloc_data: Dict, keyed by resource class, of amounts to
consume.
:param project_id: The project_id associated with the allocations.
:param user_id: The user_id associated with the allocations.
:returns: True if the allocations were created, False otherwise.
"""
payload = {
@@ -727,18 +649,51 @@ class SchedulerReportClient(object):
'resources': alloc_data,
},
],
'project_id': project_id,
'user_id': user_id,
}
url = '/allocations/%s' % consumer_uuid
r = self.put(url, payload)
r = self.put(url, payload, version='1.8')
if r.status_code == 406:
# microversion 1.8 not available so try the earlier way
# TODO(melwitt): Remove this when we can be sure all placement
# servers support version 1.8.
payload.pop('project_id')
payload.pop('user_id')
r = self.put(url, payload)
if r.status_code != 204:
LOG.warning(
'Unable to submit allocation for instance '
'Unable to submit allocation for server '
'%(uuid)s (%(code)i %(text)s)',
{'uuid': consumer_uuid,
'code': r.status_code,
'text': r.text})
return r.status_code == 204
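The 406 branch above is the usual placement microversion fallback: request 1.8 (which accepts project_id and user_id) and, if the server is older, retry without those keys. Reduced to its shape as a hedged sketch (put is injected, standing in for the report client's HTTP helper):

def put_allocations_with_fallback(put, url, payload):
    r = put(url, payload, version='1.8')
    if r.status_code == 406:
        # Older placement service: strip the 1.8-only keys and retry.
        payload = dict(payload)
        payload.pop('project_id', None)
        payload.pop('user_id', None)
        r = put(url, payload)
    return r.status_code == 204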
@safe_connect
def delete_resource_provider(self, rp_uuid):
"""Deletes the ResourceProvider record for the compute_node.
:param rp_uuid: The uuid of resource provider being deleted.
"""
url = "/resource_providers/%s" % rp_uuid
resp = self.delete(url)
if resp:
LOG.info("Deleted resource provider %s", rp_uuid)
# clean the caches
self._resource_providers.pop(rp_uuid, None)
self._provider_aggregate_map.pop(rp_uuid, None)
else:
# Check for 404 since we don't need to log a warning if we tried to
# delete something which doesn't actually exist.
if resp.status_code != 404:
LOG.warning(
"Unable to delete resource provider "
"%(uuid)s: (%(code)i %(text)s)",
{"uuid": rp_uuid,
"code": resp.status_code,
"text": resp.text})
@safe_connect
def get_allocations_for_resource_provider(self, rp_uuid):
url = '/resource_providers/%s/allocations' % rp_uuid
@@ -747,3 +702,10 @@ class SchedulerReportClient(object):
return {}
else:
return resp.json()['allocations']
def delete_allocations_for_resource_provider(self, rp_uuid):
allocations = self.get_allocations_for_resource_provider(rp_uuid)
if allocations:
LOG.info('Deleted allocation for resource provider %s', rp_uuid)
for consumer_id in allocations:
self.delete_allocation_for_server(consumer_id)


@@ -20,7 +20,6 @@ Scheduler base class that all Schedulers should inherit from
"""
from oslo_config import cfg
from oslo_utils import importutils
from mogan.common.i18n import _
@@ -31,10 +30,6 @@ CONF = cfg.CONF
class Scheduler(object):
"""The base class that all Scheduler classes should inherit from."""
def __init__(self):
self.node_manager = importutils.import_object(
CONF.scheduler.scheduler_node_manager)
def schedule(self, context, request_spec, filter_properties):
"""Must override schedule method for scheduler to work."""
raise NotImplementedError(_("Must implement schedule"))


@@ -15,16 +15,16 @@
You can customize this scheduler by specifying your own node Filters and
Weighing Functions.
"""
from oslo_config import cfg
from oslo_log import log as logging
from oslo_serialization import jsonutils
from mogan.common import exception
from mogan.common.i18n import _
from mogan.common import utils
from mogan import objects
from mogan.scheduler import client
from mogan.scheduler import driver
from mogan.scheduler import scheduler_options
from mogan.scheduler import utils as sched_utils
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
@@ -34,21 +34,8 @@ class FilterScheduler(driver.Scheduler):
"""Scheduler that can be used for filtering and weighing."""
def __init__(self, *args, **kwargs):
super(FilterScheduler, self).__init__(*args, **kwargs)
self.options = scheduler_options.SchedulerOptions()
self.max_attempts = self._max_attempts()
def _get_configuration_options(self):
"""Fetch options dictionary. Broken out for testing."""
return self.options.get_configuration()
def populate_filter_properties(self, request_spec, filter_properties):
"""Stuff things into filter_properties.
Can be overridden in a subclass to add more data.
"""
server = request_spec['server_properties']
filter_properties['availability_zone'] = \
server.get('availability_zone')
self.reportclient = client.SchedulerClient().reportclient
def _max_attempts(self):
max_attempts = CONF.scheduler.scheduler_max_attempts
@@ -87,7 +74,7 @@ class FilterScheduler(driver.Scheduler):
# re-scheduling is disabled.
return
server_id = request_spec.get('server_id')
server_id = request_spec.get('server_ids')[0]
self._log_server_error(server_id, retry)
if retry['num_attempts'] > max_attempts:
@@ -97,54 +84,21 @@ class FilterScheduler(driver.Scheduler):
{'max_attempts': max_attempts,
'server_id': server_id})
def _get_weighted_candidates(self, context, request_spec,
filter_properties=None):
"""Return a list of nodes that meet required specs.
@staticmethod
def _get_res_cls_filters(request_spec):
flavor_dict = request_spec['flavor']
resources = dict([(sched_utils.ensure_resource_class_name(res[0]),
int(res[1]))
for res in flavor_dict['resources'].items()])
return resources
Returned list is ordered by their fitness.
"""
# Since Mogan is using mixed filters from Oslo and its own, which
# takes 'resource_XX' and 'server_XX' as input respectively, copying
# 'flavor' to 'resource_type' will make both filters happy.
flavor = resource_type = request_spec.get("flavor")
config_options = self._get_configuration_options()
if filter_properties is None:
filter_properties = {}
self._populate_retry(filter_properties, request_spec)
request_spec_dict = jsonutils.to_primitive(request_spec)
filter_properties.update({'request_spec': request_spec_dict,
'config_options': config_options,
'flavor': flavor,
'resource_type': resource_type})
self.populate_filter_properties(request_spec,
filter_properties)
# Find our local list of acceptable nodes by filtering and
# weighing our options. we virtually consume resources on
# it so subsequent selections can adjust accordingly.
# Note: remember, we are using an iterator here. So only
# traverse this list once.
nodes = self.node_manager.get_all_node_states(context)
# Filter local nodes based on requirements ...
nodes = self.node_manager.get_filtered_nodes(nodes,
filter_properties)
if not nodes:
def _get_filtered_nodes(self, request_spec):
query_filters = {'resources': self._get_res_cls_filters(request_spec)}
filtered_nodes = self.reportclient.get_filtered_resource_providers(
query_filters)
if not filtered_nodes:
return []
LOG.debug("Filtered %(nodes)s", {'nodes': nodes})
# weighted_node = WeightedNode() ... the best
# node for the job.
weighed_nodes = self.node_manager.get_weighed_nodes(nodes,
filter_properties)
LOG.debug("Weighed %(nodes)s", {'nodes': weighed_nodes})
return weighed_nodes
return [node['uuid'] for node in filtered_nodes]
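Concretely, _get_res_cls_filters maps the flavor's resources onto placement resource-class amounts that get_filtered_resource_providers can consume. An illustration, with ensure_resource_class_name restated under an assumption about its behavior (the real helper lives in mogan.scheduler.utils):

def ensure_resource_class_name(name):
    # Assumed behavior: normalize names to placement's CUSTOM_*
    # convention; verify against mogan.scheduler.utils.
    if name.startswith('CUSTOM_'):
        return name
    return 'CUSTOM_' + name.upper().replace('-', '_')

flavor = {'resources': {'gold': 1}}
print({ensure_resource_class_name(rc): int(n)
       for rc, n in flavor['resources'].items()})
# {'CUSTOM_GOLD': 1}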
def schedule(self, context, request_spec, filter_properties=None):
@@ -155,20 +109,21 @@ class FilterScheduler(driver.Scheduler):
# we need to improve this.
@utils.synchronized('schedule')
def _schedule(self, context, request_spec, filter_properties):
weighed_nodes = self._get_weighted_candidates(
context, request_spec, filter_properties)
if not weighed_nodes:
LOG.warning('No weighed nodes found for server '
self._populate_retry(filter_properties, request_spec)
filtered_nodes = self._get_filtered_nodes(request_spec)
if not filtered_nodes:
LOG.warning('No filtered nodes found for server '
'with properties: %s',
request_spec.get('flavor'))
raise exception.NoValidNode(_("No weighed nodes available"))
dest_nodes = []
nodes = self._choose_nodes(weighed_nodes, request_spec)
for node in nodes:
node.obj.consume_from_request(context)
dest_nodes.append(
dict(node_uuid=node.obj.node_uuid, ports=node.obj.ports))
raise exception.NoValidNode(_("No filtered nodes available"))
dest_nodes = self._choose_nodes(filtered_nodes, request_spec)
for server_id, node in zip(request_spec['server_ids'], dest_nodes):
server_obj = objects.Server.get(
context, server_id)
alloc_data = self._get_res_cls_filters(request_spec)
self.reportclient.put_allocations(
node, server_obj.uuid, alloc_data,
server_obj.project_id, server_obj.user_id)
return dest_nodes
return _schedule(self, context, request_spec, filter_properties)


@@ -1,39 +0,0 @@
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Scheduler node filters
"""
from mogan.scheduler import base_filter
class BaseNodeFilter(base_filter.BaseFilter):
"""Base class for node filters."""
def _filter_one(self, obj, filter_properties):
"""Return True if the object passes the filter, otherwise False."""
return self.node_passes(obj, filter_properties)
def node_passes(self, node_state, filter_properties):
"""Return True if the NodeState passes the filter, otherwise False.
Override this in a subclass.
"""
raise NotImplementedError()
class NodeFilterHandler(base_filter.BaseFilterHandler):
def __init__(self, namespace):
super(NodeFilterHandler, self).__init__(BaseNodeFilter, namespace)


@@ -1,31 +0,0 @@
# Copyright (c) 2011-2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mogan.scheduler import filters
class AvailabilityZoneFilter(filters.BaseNodeFilter):
"""Filters Nodes by availability zone."""
# Availability zones do not change within a request
run_filter_once_per_request = True
def node_passes(self, node_state, filter_properties):
spec = filter_properties.get('request_spec', {})
availability_zone = spec.get('availability_zone')
if availability_zone:
return availability_zone == node_state.availability_zone
return True


@@ -1,86 +0,0 @@
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log as logging
from mogan.scheduler import filters
from mogan.scheduler.filters import extra_specs_ops
LOG = logging.getLogger(__name__)
class CapabilitiesFilter(filters.BaseNodeFilter):
"""NodeFilter to work with resource server type records."""
def _satisfies_extra_specs(self, capabilities, resource_type):
"""Check if capabilities satisfy resource type requirements.
Check that the capabilities provided by the services satisfy
the extra specs associated with the resource type.
"""
if not resource_type:
return True
extra_specs = resource_type.get('extra_specs', [])
if not extra_specs:
return True
for key, req in extra_specs.items():
# Either not scoped format, or in capabilities scope
scope = key.split(':')
# Ignore scoped (such as vendor-specific) capabilities
if len(scope) > 1 and scope[0] != "capabilities":
continue
# Strip off prefix if spec started with 'capabilities:'
elif scope[0] == "capabilities":
del scope[0]
cap = capabilities
for index in range(len(scope)):
try:
cap = cap[scope[index]]
except (TypeError, KeyError):
LOG.debug("Node doesn't provide capability '%(cap)s' " %
{'cap': scope[index]})
return False
# Make all capability values a list so we can handle lists
cap_list = [cap] if not isinstance(cap, list) else cap
# Loop through capability values looking for any match
for cap_value in cap_list:
if extra_specs_ops.match(cap_value, req):
break
else:
# Nothing matched, so bail out
LOG.debug('Flavor extra spec requirement '
'"%(key)s=%(req)s" does not match reported '
'capability "%(cap)s"',
{'key': key, 'req': req, 'cap': cap})
return False
return True
def node_passes(self, node_state, filter_properties):
"""Return a list of nodes that can create resource_type."""
resource_type = filter_properties.get('resource_type')
if not self._satisfies_extra_specs(node_state.capabilities,
resource_type):
LOG.debug("%(node_state)s fails resource_type extra_specs "
"requirements", {'node_state': node_state})
return False
return True
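The scope walk above is easiest to see with a concrete key; a standalone trace with toy capability data:

# The key 'capabilities:hw:cpu_arch' is split on ':', the
# 'capabilities' prefix is dropped, and the rest indexes into the
# nested capabilities dict.
capabilities = {'hw': {'cpu_arch': 'x86_64'}}
scope = 'capabilities:hw:cpu_arch'.split(':')[1:]
cap = capabilities
for part in scope:
    cap = cap[part]
print(cap)  # x86_64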


@@ -1,77 +0,0 @@
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import operator
from oslo_utils import strutils
# 1. The following operations are supported:
# =, s==, s!=, s>=, s>, s<=, s<, <in>, <is>, <or>, ==, !=, >=, <=
# 2. Note that <or> is handled in a different way below.
# 3. If the first word in the extra_specs is not one of the operators,
# it is ignored.
_op_methods = {'=': lambda x, y: float(x) >= float(y),
'<in>': lambda x, y: y in x,
'<is>': lambda x, y: (strutils.bool_from_string(x) is
strutils.bool_from_string(y)),
'==': lambda x, y: float(x) == float(y),
'!=': lambda x, y: float(x) != float(y),
'>=': lambda x, y: float(x) >= float(y),
'<=': lambda x, y: float(x) <= float(y),
's==': operator.eq,
's!=': operator.ne,
's<': operator.lt,
's<=': operator.le,
's>': operator.gt,
's>=': operator.ge}
def match(value, req):
if req is None:
if value is None:
return True
else:
return False
words = req.split()
op = method = None
if words:
op = words.pop(0)
method = _op_methods.get(op)
if op != '<or>' and not method:
return value == req
if value is None:
return False
if op == '<or>': # Ex: <or> v1 <or> v2 <or> v3
while True:
if words.pop(0) == value:
return True
if not words:
break
op = words.pop(0) # remove a keyword <or>
if not words:
break
return False
try:
if words and method(value, words[0]):
return True
except ValueError:
pass
return False
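A few examples of the extra-spec operator grammar implemented above, condensed into a standalone restatement covering a handful of operators (illustration only, not the full match()):

def simple_match(value, req):
    ops = {'>=': lambda x, y: float(x) >= float(y),
           '<=': lambda x, y: float(x) <= float(y),
           's==': lambda x, y: x == y,
           '<in>': lambda x, y: y in x}
    words = req.split()
    if words and words[0] in ops:
        return ops[words[0]](value, words[1])
    return value == req

print(simple_match('1024', '>= 512'))      # True
print(simple_match('x86_64', 's== arm'))   # False
print(simple_match('a,b,c', '<in> b'))     # True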


@@ -1,32 +0,0 @@
# Copyright 2016 Huawei Technologies Co.,LTD.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from mogan.scheduler import filters
class FlavorFilter(filters.BaseNodeFilter):
"""Filters Nodes by server type."""
# Flavors do not change within a request
run_filter_once_per_request = True
def node_passes(self, node_state, filter_properties):
spec = filter_properties.get('request_spec', {})
flavor = spec.get('flavor', {})
type_name = flavor.get('name')
if type_name:
return type_name == node_state.flavor
return True


@@ -1,149 +0,0 @@
# Copyright (c) 2011 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import operator
from oslo_serialization import jsonutils
import six
from mogan.scheduler import filters
class JsonFilter(filters.BaseNodeFilter):
"""Node Filter to allow simple JSON-based grammar for selecting nodes."""
def _op_compare(self, args, op):
"""Compare first item of args with the rest using specified operator.
Returns True if the specified operator can successfully
compare the first item in the args with all the rest. Will
return False if only one item is in the list.
"""
if len(args) < 2:
return False
if op is operator.contains:
bad = args[0] not in args[1:]
else:
bad = [arg for arg in args[1:]
if not op(args[0], arg)]
return not bool(bad)
def _equals(self, args):
"""First term is == all the other terms."""
return self._op_compare(args, operator.eq)
def _less_than(self, args):
"""First term is < all the other terms."""
return self._op_compare(args, operator.lt)
def _greater_than(self, args):
"""First term is > all the other terms."""
return self._op_compare(args, operator.gt)
def _in(self, args):
"""First term is in set of remaining terms."""
return self._op_compare(args, operator.contains)
def _less_than_equal(self, args):
"""First term is <= all the other terms."""
return self._op_compare(args, operator.le)
def _greater_than_equal(self, args):
"""First term is >= all the other terms."""
return self._op_compare(args, operator.ge)
def _not(self, args):
"""Flip each of the arguments."""
return [not arg for arg in args]
def _or(self, args):
"""True if any arg is True."""
return any(args)
def _and(self, args):
"""True if all args are True."""
return all(args)
commands = {
'=': _equals,
'<': _less_than,
'>': _greater_than,
'in': _in,
'<=': _less_than_equal,
'>=': _greater_than_equal,
'not': _not,
'or': _or,
'and': _and,
}
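For reference, a query in the JSON grammar this (removed) filter accepted: operators from the commands table above combine as nested lists, and $-prefixed strings look up NodeState attributes. Structure only; evaluating it needs the full class:

# Hypothetical scheduler hint: nodes need >= 1024 MB of memory and
# must sit in region r1 or r2 ($memory_mb and $region stand in for
# illustrative NodeState attributes).
query = ['and',
         ['>=', '$memory_mb', 1024],
         ['or', ['=', '$region', 'r1'], ['=', '$region', 'r2']]]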
def _parse_string(self, string, node_state):
"""Parse capability lookup strings.
Strings prefixed with $ are capability lookups in the
form '$variable' where 'variable' is an attribute in the
NodeState class. If $variable is a dictionary, you may
use: $variable.dictkey
"""
if not string:
return None
if not string.startswith("$"):
return string
path = string[1:].split(".")
obj = getattr(node_state, path[0], None)
if obj is None:
return None
for item in path[1:]:
obj = obj.get(item)
if obj is None:
return None
return obj
def _process_filter(self, query, node_state):
"""Recursively parse the query structure."""
if not query:
return True
cmd = query[0]
method = self.commands[cmd]
cooked_args = []
for arg in query[1:]:
if isinstance(arg, list):
arg = self._process_filter(arg, node_state)
elif isinstance(arg, six.string_types):