Report node resources to the placement service
This change imports the placement service client, reports node resources to placement, and uses placement as the single node resource provider service for Mogan's scheduler. Co-Authored-By: Zhenguo Niu <Niu.ZGlinux@gmail.com> Change-Id: I1cb91a10cfc8139687ad44167f33f45fd3c08e5c Partially Implements: bp track-resources-using-placement
This commit is contained in:
parent
bddfad0863
commit
ee3c64d77f
@ -84,6 +84,16 @@ function configure_mogan {
|
|||||||
iniset ${MOGAN_CONF_FILE} ironic auth_type "password"
|
iniset ${MOGAN_CONF_FILE} ironic auth_type "password"
|
||||||
iniset ${MOGAN_CONF_FILE} ironic api_endpoint "${KEYSTONE_AUTH_PROTOCOL}://${SERVICE_HOST}:${IRONIC_SERVICE_PORT}"
|
iniset ${MOGAN_CONF_FILE} ironic api_endpoint "${KEYSTONE_AUTH_PROTOCOL}://${SERVICE_HOST}:${IRONIC_SERVICE_PORT}"
|
||||||
|
|
||||||
|
# Setup placement section
|
||||||
|
iniset ${MOGAN_CONF_FILE} placement project_domain_name ${SERVICE_DOMAIN_NAME}
|
||||||
|
iniset ${MOGAN_CONF_FILE} placement project_name ${SERVICE_PROJECT_NAME}
|
||||||
|
iniset ${MOGAN_CONF_FILE} placement user_domain_name ${SERVICE_DOMAIN_NAME}
|
||||||
|
iniset ${MOGAN_CONF_FILE} placement username "placement"
|
||||||
|
iniset ${MOGAN_CONF_FILE} placement password ${SERVICE_PASSWORD}
|
||||||
|
iniset ${MOGAN_CONF_FILE} placement auth_url ${KEYSTONE_AUTH_URI}
|
||||||
|
iniset ${MOGAN_CONF_FILE} placement auth_type "password"
|
||||||
|
iniset ${MOGAN_CONF_FILE} placement api_endpoint "${KEYSTONE_AUTH_PROTOCOL}://${SERVICE_HOST}:${IRONIC_SERVICE_PORT}"
|
||||||
|
|
||||||
# Setup neutron section
|
# Setup neutron section
|
||||||
iniset ${MOGAN_CONF_FILE} neutron url "${NEUTRON_SERVICE_PROTOCOL}://${SERVICE_HOST}:${NEUTRON_SERVICE_PORT}"
|
iniset ${MOGAN_CONF_FILE} neutron url "${NEUTRON_SERVICE_PROTOCOL}://${SERVICE_HOST}:${NEUTRON_SERVICE_PORT}"
|
||||||
|
|
||||||
@ -187,6 +197,9 @@ function update_ironic_node_resource_class {
|
|||||||
|
|
||||||
|
|
||||||
if is_service_enabled mogan; then
|
if is_service_enabled mogan; then
|
||||||
|
if ! is_service_enabled placement; then
|
||||||
|
die "placement service is required for Mogan"
|
||||||
|
fi
|
||||||
if is_service_enabled tempest; then
|
if is_service_enabled tempest; then
|
||||||
iniset $TEMPEST_CONFIG compute fixed_network_name $PRIVATE_NETWORK_NAME
|
iniset $TEMPEST_CONFIG compute fixed_network_name $PRIVATE_NETWORK_NAME
|
||||||
fi
|
fi
|
||||||
|
@ -420,4 +420,19 @@ class KeypairNotFound(NotFound):
|
|||||||
class InvalidKeypair(Invalid):
|
class InvalidKeypair(Invalid):
|
||||||
_msg_fmt = _("Keypair data is invalid: %(reason)s")
|
_msg_fmt = _("Keypair data is invalid: %(reason)s")
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidInventory(Invalid):
    """Error for an invalid inventory record on a resource provider."""

    # Formatted with ``resource_class`` and ``resource_provider``.
    _msg_fmt = _(
        "Inventory for '%(resource_class)s' on resource provider "
        "'%(resource_provider)s' invalid.")
|
||||||
|
|
||||||
|
|
||||||
|
class InvalidResourceClass(Invalid):
    """Error for a resource class that is not valid."""

    # Formatted with ``resource_class``.
    _msg_fmt = _(
        "Resource class '%(resource_class)s' invalid.")
|
||||||
|
|
||||||
|
|
||||||
|
class InventoryInUse(InvalidInventory):
    """Error for inventory that is still in use on a resource provider."""

    # Formatted with ``resource_classes`` (note the plural) and
    # ``resource_provider``.
    _msg_fmt = _(
        "Inventory for '%(resource_classes)s' on resource provider "
        "'%(resource_provider)s' in use.")
|
||||||
|
|
||||||
|
|
||||||
ObjectActionError = obj_exc.ObjectActionError
|
ObjectActionError = obj_exc.ObjectActionError
|
||||||
|
@ -25,6 +25,7 @@ from mogan.conf import glance
|
|||||||
from mogan.conf import ironic
|
from mogan.conf import ironic
|
||||||
from mogan.conf import keystone
|
from mogan.conf import keystone
|
||||||
from mogan.conf import neutron
|
from mogan.conf import neutron
|
||||||
|
from mogan.conf import placement
|
||||||
from mogan.conf import quota
|
from mogan.conf import quota
|
||||||
from mogan.conf import scheduler
|
from mogan.conf import scheduler
|
||||||
from mogan.conf import shellinabox
|
from mogan.conf import shellinabox
|
||||||
@ -44,3 +45,4 @@ quota.register_opts(CONF)
|
|||||||
scheduler.register_opts(CONF)
|
scheduler.register_opts(CONF)
|
||||||
shellinabox.register_opts(CONF)
|
shellinabox.register_opts(CONF)
|
||||||
cache.register_opts(CONF)
|
cache.register_opts(CONF)
|
||||||
|
placement.register_opts(CONF)
|
||||||
|
53
mogan/conf/placement.py
Normal file
53
mogan/conf/placement.py
Normal file
@ -0,0 +1,53 @@
|
|||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
from keystoneauth1 import loading as ks_loading
|
||||||
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
# Option group holding everything needed to talk to the placement API.
placement_group = cfg.OptGroup(
    'placement',
    help="Configuration options for connecting to the placement API service",
    title='Placement Service Options',
)

# Options used to select the placement endpoint from the service catalog.
placement_opts = [
    cfg.StrOpt(
        'os_region_name',
        help="""
Region name of this node. This is used when picking the URL in the service
catalog.

Possible values:

* Any string representing region name
""",
    ),
    cfg.StrOpt(
        'os_interface',
        help="""
Endpoint interface for this node. This is used when picking the URL in the
service catalog.
""",
    ),
]
|
||||||
|
|
||||||
|
|
||||||
|
def register_opts(conf):
    """Register the placement group and its options on ``conf``.

    Besides the options declared in this module, the keystoneauth session
    and auth options are registered under the same group name.

    :param conf: configuration object to register the options on.
    """
    conf.register_group(placement_group)
    conf.register_opts(placement_opts, group=placement_group)

    section = placement_group.name
    for register in (ks_loading.register_session_conf_options,
                     ks_loading.register_auth_conf_options):
        register(conf, section)
|
||||||
|
|
||||||
|
|
||||||
|
def list_opts():
    """Return the placement options keyed by their group name.

    :returns: a single-entry dict mapping the placement group name to the
              options of this module followed by the keystoneauth session,
              common auth and 'v3password' plugin options.
    """
    session_opts = ks_loading.get_session_conf_options()
    auth_common_opts = ks_loading.get_auth_common_conf_options()
    password_opts = ks_loading.get_auth_plugin_conf_options('v3password')

    combined = (
        placement_opts + session_opts + auth_common_opts + password_opts)
    return {placement_group.name: combined}
|
@ -121,6 +121,21 @@ class BaseEngineDriver(object):
|
|||||||
"""
|
"""
|
||||||
raise NotImplementedError()
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
def get_available_nodes(self):
    """Retrieve information about all nodes.

    Abstract hook: concrete engine drivers are expected to override it.

    :returns: the nodes known to the driver
    :raises NotImplementedError: always, in this base implementation
    """
    raise NotImplementedError
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_node_inventory(node):
|
||||||
|
"""Get the inventory of a node.
|
||||||
|
|
||||||
|
:param node: node to get its inventory data.
|
||||||
|
"""
|
||||||
|
raise NotImplementedError()
|
||||||
|
|
||||||
|
|
||||||
def load_engine_driver(engine_driver):
|
def load_engine_driver(engine_driver):
|
||||||
"""Load a engine driver module.
|
"""Load a engine driver module.
|
||||||
|
@ -653,3 +653,51 @@ class IronicDriver(base_driver.BaseEngineDriver):
|
|||||||
else:
|
else:
|
||||||
LOG.debug('Console is disabled for node %s', node_uuid)
|
LOG.debug('Console is disabled for node %s', node_uuid)
|
||||||
raise exception.ConsoleNotAvailable()
|
raise exception.ConsoleNotAvailable()
|
||||||
|
|
||||||
|
def get_available_nodes(self):
    """Return the ironic nodes that may be reported as resources.

    Ironic is asked for the detailed list of nodes that are not in
    maintenance. A node is then dropped when any of these holds:

    * it has no resource class,
    * its power state is ERROR or NOSTATE,
    * its provision state is AVAILABLE/NOSTATE while it still carries an
      instance UUID.

    If the ironic client call fails, the error is logged and an empty
    list is returned.

    :returns: a list of normal nodes from ironic
    """
    params = {
        'detail': True,
        'limit': 0,
        'maintenance': False,
    }
    try:
        node_list = self.ironicclient.call("node.list", **params)
    except client_e.ClientException as e:
        # Format the exception itself rather than ``e.message``: exceptions
        # are not guaranteed to carry a ``message`` attribute (Python 3
        # removed it from BaseException), and an AttributeError raised here
        # would replace the documented "return an empty list" fallback.
        LOG.exception("Could not get nodes from ironic. Reason: "
                      "%(detail)s", {'detail': e})
        return []

    bad_power_states = (ironic_states.ERROR, ironic_states.NOSTATE)
    # keep NOSTATE around for compatibility
    good_provision_states = (ironic_states.AVAILABLE, ironic_states.NOSTATE)

    def _is_unusable(node_obj):
        # True when the node must not be reported (see docstring).
        return (node_obj.resource_class is None or
                node_obj.power_state in bad_power_states or
                (node_obj.provision_state in good_provision_states and
                 node_obj.instance_uuid is not None))

    return [node_obj for node_obj in node_list
            if not _is_unusable(node_obj)]
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def get_node_inventory(node):
|
||||||
|
"""Get the inventory of a node.
|
||||||
|
|
||||||
|
:param node: server to get its inventory data.
|
||||||
|
"""
|
||||||
|
return {'total': 1,
|
||||||
|
'reserved': 0,
|
||||||
|
'min_unit': 1,
|
||||||
|
'max_unit': 1,
|
||||||
|
'step_size': 1,
|
||||||
|
'allocation_ratio': 1.0,
|
||||||
|
}
|
||||||
|
@ -37,6 +37,8 @@ from mogan.notifications import base as notifications
|
|||||||
from mogan import objects
|
from mogan import objects
|
||||||
from mogan.objects import fields
|
from mogan.objects import fields
|
||||||
from mogan.objects import quota
|
from mogan.objects import quota
|
||||||
|
from mogan.scheduler import client
|
||||||
|
from mogan.scheduler import utils as sched_utils
|
||||||
|
|
||||||
LOG = log.getLogger(__name__)
|
LOG = log.getLogger(__name__)
|
||||||
|
|
||||||
@ -77,6 +79,7 @@ class EngineManager(base_manager.BaseEngineManager):
|
|||||||
super(EngineManager, self).__init__(*args, **kwargs)
|
super(EngineManager, self).__init__(*args, **kwargs)
|
||||||
self.quota = quota.Quota()
|
self.quota = quota.Quota()
|
||||||
self.quota.register_resource(objects.quota.ServerResource())
|
self.quota.register_resource(objects.quota.ServerResource())
|
||||||
|
self.scheduler_client = client.SchedulerClient()
|
||||||
|
|
||||||
def _get_compute_port(self, context, port_uuid):
|
def _get_compute_port(self, context, port_uuid):
|
||||||
"""Gets compute port by the uuid."""
|
"""Gets compute port by the uuid."""
|
||||||
@ -160,6 +163,7 @@ class EngineManager(base_manager.BaseEngineManager):
|
|||||||
for uuid, node in nodes.items():
|
for uuid, node in nodes.items():
|
||||||
if node.get('resource_class') is None:
|
if node.get('resource_class') is None:
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# initialize the compute node object, creating it
|
# initialize the compute node object, creating it
|
||||||
# if it does not already exist.
|
# if it does not already exist.
|
||||||
self._init_compute_node(context, node)
|
self._init_compute_node(context, node)
|
||||||
@ -171,6 +175,27 @@ class EngineManager(base_manager.BaseEngineManager):
|
|||||||
{'id': cn.node_uuid})
|
{'id': cn.node_uuid})
|
||||||
cn.destroy()
|
cn.destroy()
|
||||||
|
|
||||||
|
all_nodes = self.driver.get_available_nodes()
|
||||||
|
|
||||||
|
all_rps = self.scheduler_client.reportclient\
|
||||||
|
.get_filtered_resource_providers({})
|
||||||
|
node_uuids = [node.uuid for node in all_nodes]
|
||||||
|
|
||||||
|
# Clean orphan resource providers in placement
|
||||||
|
for rp in all_rps:
|
||||||
|
if rp['uuid'] not in node_uuids:
|
||||||
|
self.scheduler_client.reportclient.delete_resource_provider(
|
||||||
|
rp['uuid'])
|
||||||
|
|
||||||
|
for node in all_nodes:
|
||||||
|
resource_class = sched_utils.ensure_resource_class_name(
|
||||||
|
node.resource_class)
|
||||||
|
inventory = self.driver.get_node_inventory(node)
|
||||||
|
inventory_data = {resource_class: inventory}
|
||||||
|
self.scheduler_client.set_inventory_for_provider(
|
||||||
|
node.uuid, node.name, inventory_data,
|
||||||
|
resource_class)
|
||||||
|
|
||||||
@periodic_task.periodic_task(spacing=CONF.engine.sync_power_state_interval,
|
@periodic_task.periodic_task(spacing=CONF.engine.sync_power_state_interval,
|
||||||
run_immediately=True)
|
run_immediately=True)
|
||||||
def _sync_power_states(self, context):
|
def _sync_power_states(self, context):
|
||||||
|
52
mogan/scheduler/client/__init__.py
Normal file
52
mogan/scheduler/client/__init__.py
Normal file
@ -0,0 +1,52 @@
|
|||||||
|
# Copyright (c) 2014 Red Hat, Inc.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import functools
|
||||||
|
|
||||||
|
from oslo_utils import importutils
|
||||||
|
|
||||||
|
|
||||||
|
class LazyLoader(object):
    """Proxy that builds the wrapped object on its first method call.

    The class and its constructor arguments are only remembered at
    creation time. Any attribute not found on the proxy is returned as a
    callable that instantiates the class if needed and then forwards the
    call to the method of that name on the instance.
    """

    def __init__(self, klass, *args, **kwargs):
        # Nothing is instantiated yet; just remember how to do it.
        self.instance = None
        self.klass, self.args, self.kwargs = klass, args, kwargs

    def __getattr__(self, name):
        # Only reached for names that are not real attributes of the proxy.
        return functools.partial(self.__run_method, name)

    def __run_method(self, __name, *args, **kwargs):
        # ``__name`` is kept name-mangled so it cannot clash with a
        # forwarded keyword argument called ``name``.
        target = self.instance
        if target is None:
            target = self.instance = self.klass(*self.args, **self.kwargs)
        method = getattr(target, __name)
        return method(*args, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
class SchedulerClient(object):
    """Client library for placing calls to the scheduler."""

    def __init__(self):
        # The report client is wrapped in a LazyLoader, so it is only
        # instantiated when one of its methods is first called.
        report_cls = importutils.import_class(
            'mogan.scheduler.client.report.SchedulerReportClient')
        self.reportclient = LazyLoader(report_cls)

    def set_inventory_for_provider(self, rp_uuid, rp_name, inv_data,
                                   res_class):
        """Forward an inventory update to the report client.

        :param rp_uuid: UUID of the resource provider.
        :param rp_name: name of the resource provider.
        :param inv_data: inventory data to set for the provider.
        :param res_class: resource class the inventory belongs to.
        """
        report = self.reportclient
        report.set_inventory_for_provider(
            rp_uuid, rp_name, inv_data, res_class)
|
749
mogan/scheduler/client/report.py
Normal file
749
mogan/scheduler/client/report.py
Normal file
@ -0,0 +1,749 @@
|
|||||||
|
# Copyright (c) 2014 Red Hat, Inc.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
import functools
|
||||||
|
import re
|
||||||
|
import time
|
||||||
|
|
||||||
|
from keystoneauth1 import exceptions as ks_exc
|
||||||
|
from keystoneauth1 import loading as keystone
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log as logging
|
||||||
|
from six.moves.urllib import parse
|
||||||
|
|
||||||
|
from mogan.common import exception
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
_RE_INV_IN_USE = re.compile("Inventory for (.+) on resource provider "
|
||||||
|
"(.+) in use")
|
||||||
|
WARN_EVERY = 10
|
||||||
|
|
||||||
|
|
||||||
|
def warn_limit(self, msg):
    """Log ``msg`` as a warning, but only once per WARN_EVERY + 1 calls.

    The throttle state lives in ``self._warn_count``: while it is non-zero
    the call only decrements it; once it reaches zero the warning is
    emitted and the counter is reset to WARN_EVERY.

    :param self: object carrying the ``_warn_count`` counter.
    :param msg: warning text to log.
    """
    if not self._warn_count:
        self._warn_count = WARN_EVERY
        LOG.warning(msg)
        return
    self._warn_count -= 1
|
||||||
|
|
||||||
|
|
||||||
|
def safe_connect(f):
    """Decorate a client method so placement connection errors are logged.

    The wrapped method's result is returned unchanged when it succeeds.
    When one of the keystoneauth errors handled below is raised, a warning
    is logged (rate-limited through warn_limit, except for connection
    failures which are always logged) and None is returned instead.
    """
    @functools.wraps(f)
    def wrapper(self, *a, **k):
        try:
            return f(self, *a, **k)
        except ks_exc.EndpointNotFound:
            throttled_msg = 'The placement API endpoint not found.'
        except ks_exc.MissingAuthPlugin:
            throttled_msg = (
                'No authentication information found for placement API.')
        except ks_exc.Unauthorized:
            throttled_msg = 'Placement service credentials do not work.'
        except ks_exc.DiscoveryFailure:
            # TODO(_gryf): Looks like DiscoveryFailure is not the only missing
            # exception here. In Pike we should take care about keystoneauth1
            # failures handling globally.
            throttled_msg = (
                'Discovering suitable URL for placement API failed.')
        except ks_exc.ConnectFailure:
            # Connection failures bypass the rate limiter.
            LOG.warning('Placement API service is not responding.')
            return None

        warn_limit(self, throttled_msg)
        return None

    return wrapper
|
||||||
|
|
||||||
|
|
||||||
|
def _extract_inventory_in_use(body):
    """Pull the in-use resource classes out of an HTTP response body.

    The body is searched for the "Inventory for ... on resource provider
    ... in use" error text.

    :param body: text of the HTTP response to inspect.
    :returns: String of resource classes or None if there was no
              InventoryInUse error in the response body.
    """
    found = _RE_INV_IN_USE.search(body)
    return found.group(1) if found else None
|
||||||
|
|
||||||
|
|
||||||
|
def get_placement_request_id(response):
    """Return the request ID carried in a placement response's headers.

    The 'openstack-request-id' header is preferred; when it is absent the
    'x-openstack-request-id' header is used instead.

    :param response: response object exposing ``headers``, or None.
    :returns: the request ID, or None when ``response`` is None or
              neither header is present.
    """
    if response is None:
        return None
    headers = response.headers
    fallback_id = headers.get('x-openstack-request-id')
    return headers.get('openstack-request-id', fallback_id)
|
||||||
|
|
||||||
|
|
||||||
|
class SchedulerReportClient(object):
|
||||||
|
"""Client class for updating the scheduler."""
|
||||||
|
|
||||||
|
def __init__(self):
    """Set up the local caches and the keystone session for placement."""
    # A dict, keyed by the resource provider UUID, of ResourceProvider
    # objects that will have their inventories and allocations tracked by
    # the placement API for the compute host
    self._resource_providers = {}
    # A dict, keyed by resource provider UUID, of sets of aggregate UUIDs
    # the provider is associated with
    self._provider_aggregate_map = {}

    # Both the auth plugin and the session are loaded from the
    # 'placement' section of the configuration.
    auth_plugin = keystone.load_auth_from_conf_options(CONF, 'placement')
    session = keystone.load_session_from_conf_options(
        CONF, 'placement', auth=auth_plugin)
    self._client = session

    # NOTE(danms): Keep track of how naggy we've been
    self._warn_count = 0

    # Endpoint filter passed along with every request made by this client.
    placement_conf = CONF.placement
    self.ks_filter = dict(
        service_type='placement',
        region_name=placement_conf.os_region_name,
        interface=placement_conf.os_interface,
    )
|
||||||
|
|
||||||
|
def get(self, url, version=None):
    """Issue a GET request against the placement API.

    :param url: path of the resource to fetch.
    :param version: optional placement microversion; when given it is sent
                    in the 'OpenStack-API-Version' header.
    :returns: the response object from the session (errors are not raised
              because ``raise_exc=False`` is passed).
    """
    extra = {}
    if version is not None:
        # TODO(mriedem): Perform some version discovery at some point.
        extra['headers'] = {
            'OpenStack-API-Version': 'placement %s' % version,
        }
    return self._client.get(
        url, endpoint_filter=self.ks_filter, raise_exc=False, **extra)
|
||||||
|
|
||||||
|
def post(self, url, data, version=None):
    """Issue a POST request with a JSON body against the placement API.

    :param url: path of the resource to post to.
    :param data: payload, always sent through the ``json=`` argument.
    :param version: optional placement microversion; when given it is sent
                    in the 'OpenStack-API-Version' header.
    :returns: the response object from the session (errors are not raised
              because ``raise_exc=False`` is passed).
    """
    # NOTE(sdague): using json= instead of data= sets the
    # media type to application/json for us. Placement API is
    # more sensitive to this than other APIs in the OpenStack
    # ecosystem.
    request_kwargs = {
        'json': data,
        'endpoint_filter': self.ks_filter,
        'raise_exc': False,
    }
    if version is not None:
        # TODO(mriedem): Perform some version discovery at some point.
        request_kwargs['headers'] = {
            'OpenStack-API-Version': 'placement %s' % version,
        }
    return self._client.post(url, **request_kwargs)
|
||||||
|
|
||||||
|
def put(self, url, data, version=None):
    """Issue a PUT request against the placement API.

    :param url: path of the resource to update.
    :param data: payload; sent through the ``json=`` argument only when it
                 is truthy.
    :param version: optional placement microversion; when given it is sent
                    in the 'OpenStack-API-Version' header.
    :returns: the response object from the session (errors are not raised
              because ``raise_exc=False`` is passed).
    """
    # NOTE(sdague): using json= instead of data= sets the
    # media type to application/json for us. Placement API is
    # more sensitive to this than other APIs in the OpenStack
    # ecosystem.
    optional = {}
    if version is not None:
        # TODO(mriedem): Perform some version discovery at some point.
        optional['headers'] = {
            'OpenStack-API-Version': 'placement %s' % version,
        }
    if data:
        optional['json'] = data
    return self._client.put(
        url, endpoint_filter=self.ks_filter, raise_exc=False, **optional)
|
||||||
|
|
||||||
|
def delete(self, url):
    """Issue a DELETE request against the placement API.

    :param url: path of the resource to delete.
    :returns: the response object from the session (errors are not raised
              because ``raise_exc=False`` is passed).
    """
    session = self._client
    return session.delete(
        url, endpoint_filter=self.ks_filter, raise_exc=False)
|
||||||
|
|
||||||
|
# TODO(sbauza): Change that poor interface into passing a rich versioned
|
||||||
|
# object that would provide the ResourceProvider requirements.
|
||||||
|
@safe_connect
def get_filtered_resource_providers(self, filters):
    """Returns a list of ResourceProviders matching the requirements
    expressed by the filters argument, which can include a dict named
    'resources' where amounts are keyed by resource class names.

    eg. filters = {'resources': {'CUSTOM_BAREMETAL_GOLD': 1}}

    The caller's ``filters`` dict is left untouched; the query is built
    from a shallow copy.

    :param filters: dict of query filters (None is treated as no filters).
    :returns: the list found under 'resource_providers' in the response
              (empty list if the key is missing) on HTTP 200, otherwise
              None after logging the failure.
    """
    # Copy first: the 'resources' entry is rewritten into its query-string
    # form below, and doing that in place would corrupt a dict the caller
    # may want to reuse.
    query = dict(filters or {})
    resources = query.pop("resources", None)
    if resources:
        # Sorted "CLASS:amount" pairs joined by commas.
        query['resources'] = ",".join(sorted(
            "%s:%s" % (rc, amount) for (rc, amount) in resources.items()))

    resp = self.get("/resource_providers?%s" % parse.urlencode(query),
                    version='1.4')
    if resp.status_code == 200:
        data = resp.json()
        return data.get('resource_providers', [])

    msg = ("Failed to retrieve filtered list of resource providers "
           "from placement API for filters %(filters)s. "
           "Got %(status_code)d: %(err_text)s.")
    args = {
        'filters': query,
        'status_code': resp.status_code,
        'err_text': resp.text,
    }
    LOG.error(msg, args)
    return None
|
||||||
|
|
||||||
|
@safe_connect
def _get_provider_aggregates(self, rp_uuid):
    """Queries the placement API for a resource provider's aggregates.
    Returns a set() of aggregate UUIDs or None if no such resource provider
    was found or there was an error communicating with the placement API.

    :param rp_uuid: UUID of the resource provider to grab aggregates for.
    """
    resp = self.get("/resource_providers/%s/aggregates" % rp_uuid,
                    version='1.1')
    if resp.status_code == 200:
        data = resp.json()
        return set(data['aggregates'])

    placement_req_id = get_placement_request_id(resp)
    if resp.status_code == 404:
        # The parentheses are required: without them only the first
        # fragment is assigned to ``msg`` and the remaining string
        # literals are discarded, truncating the logged warning.
        msg = ("[%(placement_req_id)s] Tried to get a provider's "
               "aggregates; however the provider %(uuid)s does not "
               "exist.")
        args = {
            'uuid': rp_uuid,
            'placement_req_id': placement_req_id,
        }
        LOG.warning(msg, args)
    else:
        msg = ("[%(placement_req_id)s] Failed to retrieve aggregates "
               "from placement API for resource provider with UUID "
               "%(uuid)s. Got %(status_code)d: %(err_text)s.")
        args = {
            'placement_req_id': placement_req_id,
            'uuid': rp_uuid,
            'status_code': resp.status_code,
            'err_text': resp.text,
        }
        LOG.error(msg, args)
    # Both failure paths report "no aggregates available" to the caller.
    return None
|
||||||
|
|
||||||
|
@safe_connect
def _get_resource_provider(self, uuid):
    """Look up a resource provider record in the placement API.

    :param uuid: UUID identifier for the resource provider to look up
    :returns: the decoded JSON body on HTTP 200; None when the provider
              does not exist (404) or when any other status is returned,
              in which case the failure is logged as an error.
    """
    resp = self.get("/resource_providers/%s" % uuid)
    status = resp.status_code

    if status == 200:
        return resp.json()
    if status == 404:
        return None

    # Unexpected status: report it together with placement's request ID.
    LOG.error(
        "[%(placement_req_id)s] Failed to retrieve resource "
        "provider record from placement API for UUID %(uuid)s. "
        "Got %(status_code)d: %(err_text)s.",
        {
            'uuid': uuid,
            'status_code': resp.status_code,
            'err_text': resp.text,
            'placement_req_id': get_placement_request_id(resp),
        })
    return None
|
||||||
|
|
||||||
|
@safe_connect
def _create_resource_provider(self, uuid, name):
    """Create a new resource provider record through the placement API.

    :param uuid: UUID of the new resource provider
    :param name: Name of the resource provider
    :returns: on HTTP 201, a dict with the provider's uuid, name and a
              generation of 0; on HTTP 409, whatever
              _get_resource_provider() returns for the UUID; None for any
              other status (the failure is logged as an error).
    """
    resp = self.post("/resource_providers", {'uuid': uuid, 'name': name})
    placement_req_id = get_placement_request_id(resp)
    status = resp.status_code

    if status == 201:
        LOG.info(
            "[%(placement_req_id)s] Created resource provider "
            "record via placement API for resource provider with "
            "UUID %(uuid)s and name %(name)s.",
            {
                'uuid': uuid,
                'name': name,
                'placement_req_id': placement_req_id,
            })
        return {'uuid': uuid, 'name': name, 'generation': 0}

    if status == 409:
        # Another thread concurrently created a resource provider with the
        # same UUID. Log it and then just return the resource provider
        # object from _get_resource_provider()
        LOG.info(
            "[%(placement_req_id)s] Another thread already created "
            "a resource provider with the UUID %(uuid)s. Grabbing "
            "that record from the placement API.",
            {
                'uuid': uuid,
                'placement_req_id': placement_req_id,
            })
        return self._get_resource_provider(uuid)

    LOG.error(
        "[%(placement_req_id)s] Failed to create resource "
        "provider record in placement API for UUID %(uuid)s. "
        "Got %(status_code)d: %(err_text)s.",
        {
            'uuid': uuid,
            'status_code': resp.status_code,
            'err_text': resp.text,
            'placement_req_id': placement_req_id,
        })
    return None
|
||||||
|
|
||||||
|
def _ensure_resource_provider(self, uuid, name=None):
    """Make sure placement knows a resource provider with this UUID.

    A provider already present in the local cache is returned after its
    aggregate associations have been refreshed. Otherwise the provider is
    fetched from the placement API and, if it does not exist there,
    created. The found or created provider is cached together with its
    aggregates and returned. None is returned when the provider could be
    neither found nor created.

    :param uuid: UUID identifier for the resource provider to ensure exists
    :param name: Optional name for the resource provider if the record
                 does not exist. If empty, the name is set to the UUID
                 value
    """
    cache = self._resource_providers
    if uuid in cache:
        # NOTE(jaypipes): This isn't optimal to check if aggregate
        # associations have changed each time we call
        # _ensure_resource_provider() and get a hit on the local cache of
        # provider objects, however the alternative is to force operators
        # to restart all their nova-compute workers every time they add or
        # change an aggregate. We might optionally want to add some sort of
        # cache refresh delay or interval as an optimization?
        LOG.debug(
            "Refreshing aggregate associations for resource provider %s",
            uuid)
        self._provider_aggregate_map[uuid] = (
            self._get_provider_aggregates(uuid))
        return cache[uuid]

    provider = self._get_resource_provider(uuid)
    if provider is None:
        # Unknown to placement: create it, falling back to the UUID as name.
        provider = self._create_resource_provider(uuid, name or uuid)
    if provider is None:
        return None

    LOG.debug(
        "Grabbing aggregate associations for resource provider %s", uuid)
    aggregates = self._get_provider_aggregates(uuid)
    cache[uuid] = provider
    self._provider_aggregate_map[uuid] = aggregates
    return provider
|
||||||
|
|
||||||
|
def _get_inventory(self, rp_uuid):
|
||||||
|
url = '/resource_providers/%s/inventories' % rp_uuid
|
||||||
|
result = self.get(url)
|
||||||
|
if not result:
|
||||||
|
return {'inventories': {}}
|
||||||
|
return result.json()
|
||||||
|
|
||||||
|
def _get_inventory_and_update_provider_generation(self, rp_uuid):
|
||||||
|
"""Helper method that retrieves the current inventory for the supplied
|
||||||
|
resource provider according to the placement API. If the cached
|
||||||
|
generation of the resource provider is not the same as the generation
|
||||||
|
returned from the placement API, we update the cached generation.
|
||||||
|
"""
|
||||||
|
curr = self._get_inventory(rp_uuid)
|
||||||
|
|
||||||
|
# Update our generation immediately, if possible. Even if there
|
||||||
|
# are no inventories we should always have a generation but let's
|
||||||
|
# be careful.
|
||||||
|
server_gen = curr.get('resource_provider_generation')
|
||||||
|
if server_gen:
|
||||||
|
my_rp = self._resource_providers[rp_uuid]
|
||||||
|
if server_gen != my_rp['generation']:
|
||||||
|
LOG.debug('Updating our resource provider generation '
|
||||||
|
'from %(old)i to %(new)i',
|
||||||
|
{'old': my_rp['generation'],
|
||||||
|
'new': server_gen})
|
||||||
|
my_rp['generation'] = server_gen
|
||||||
|
return curr
|
||||||
|
|
||||||
|
def _update_inventory_attempt(self, rp_uuid, inv_data):
    """Make a single attempt at pushing a provider's inventory.

    :param rp_uuid: The resource provider UUID for the operation
    :param inv_data: The new inventory for the resource provider
    :returns: True if the inventory was updated (or did not need to be),
              False otherwise.
    :raises: `exception.InventoryInUse` when the server answers 409 and
             the response text matches ``_RE_INV_IN_USE``.
    """
    current = self._get_inventory_and_update_provider_generation(rp_uuid)

    # Skip the PUT entirely when placement's view already equals ours.
    if inv_data == current.get('inventories', {}):
        return True

    generation = self._resource_providers[rp_uuid]['generation']
    payload = {
        'resource_provider_generation': generation,
        'inventories': inv_data,
    }
    result = self.put('/resource_providers/%s/inventories' % rp_uuid,
                      payload)
    status = result.status_code

    if status == 409:
        LOG.info('[%(placement_req_id)s] Inventory update conflict '
                 'for %(resource_provider_uuid)s with generation ID '
                 '%(generation_id)s',
                 {'placement_req_id': get_placement_request_id(result),
                  'resource_provider_uuid': rp_uuid,
                  'generation_id': generation})
        # NOTE(jaypipes): Setting a provider's inventory can imply
        # deleting an inventory record for a resource class that still
        # has an active allocation. That case must raise instead of
        # returning False, because retrying cannot succeed.
        #
        # One way to get there:
        #
        # 1) A provider is created for each Ironic baremetal node in
        #    Newton.
        # 2) Inventory records for the node are created for VCPU,
        #    MEMORY_MB and DISK_GB.
        # 3) A Nova instance consumes the node and allocation records are
        #    created for VCPU, MEMORY_MB and DISK_GB matching the total
        #    amount of those resources on the node.
        # 4) After upgrading to Ocata the resource tracker wants the
        #    provider's inventory to be a single record of resource class
        #    CUSTOM_IRON_SILVER (or whatever the Ironic node's
        #    "resource_class" attribute is).
        # 5) The report client sends an inventory list holding only the
        #    CUSTOM_IRON_SILVER record, so placement tries to delete the
        #    VCPU, MEMORY_MB and DISK_GB inventory. Allocations exist for
        #    those classes, placement raises, and a 409 Conflict comes
        #    back. The old allocation records need to be deleted, the new
        #    inventory set, and the allocation re-created against
        #    CUSTOM_IRON_SILVER.
        in_use = _RE_INV_IN_USE.search(result.text)
        if in_use:
            raise exception.InventoryInUse(
                resource_classes=in_use.group(1),
                resource_provider=rp_uuid,
            )

        # Drop the cached record and reload it so the next attempt starts
        # from the latest generation. No name is passed because the
        # provider record is known to exist already; this is only a
        # reload.
        del self._resource_providers[rp_uuid]
        self._ensure_resource_provider(rp_uuid)
        return False

    if not result:
        req_id = get_placement_request_id(result)
        LOG.warning(('[%(placement_req_id)s] Failed to update '
                     'inventory for resource provider '
                     '%(uuid)s: %(status)i %(text)s'),
                    {'placement_req_id': req_id,
                     'uuid': rp_uuid,
                     'status': status,
                     'text': result.text})
        # The request body is only logged at debug level.
        LOG.debug('[%(placement_req_id)s] Failed inventory update request '
                  'for resource provider %(uuid)s with body: %(payload)s',
                  {'placement_req_id': req_id,
                   'uuid': rp_uuid,
                   'payload': payload})
        return False

    if status != 200:
        req_id = get_placement_request_id(result)
        LOG.info(
            ('[%(placement_req_id)s] Received unexpected response code '
             '%(code)i while trying to update inventory for resource '
             'provider %(uuid)s: %(text)s'),
            {'placement_req_id': req_id,
             'uuid': rp_uuid,
             'code': status,
             'text': result.text})
        return False

    # Remember the generation the server reports for the next update.
    new_gen = result.json()['resource_provider_generation']
    self._resource_providers[rp_uuid]['generation'] = new_gen
    LOG.debug('Updated inventory for %s at generation %i',
              rp_uuid, new_gen)
    return True
|
||||||
|
|
||||||
|
@safe_connect
def _update_inventory(self, rp_uuid, inv_data):
    """Try up to three times to update a provider's inventory.

    :param rp_uuid: UUID of the resource provider to update.
    :param inv_data: The new inventory for the resource provider.
    :returns: True as soon as one attempt succeeds; False when the
              provider is missing from the local cache or every attempt
              fails.
    """
    for _ in range(3):
        if rp_uuid not in self._resource_providers:
            # NOTE(danms): Either the RP could not be fetched/created on
            # the first attempt, or an earlier attempt invalidated the
            # cache and the refresh failed. Give up; a later call can
            # try again.
            LOG.warning('Unable to refresh my resource provider record')
            return False
        succeeded = self._update_inventory_attempt(rp_uuid, inv_data)
        if succeeded:
            return True
        time.sleep(1)
    return False
|
||||||
|
|
||||||
|
@safe_connect
def _delete_inventory(self, rp_uuid):
    """Remove every inventory record of the resource provider with the
    supplied UUID by PUTting an empty inventory dict.

    :param rp_uuid: UUID of the resource provider to clear.
    :returns: None in all cases; the outcome is only logged.
    """
    current = self._get_inventory_and_update_provider_generation(rp_uuid)

    # Nothing to do when placement holds no inventory for this provider.
    if not current.get('inventories', {}):
        LOG.debug("No inventory to delete from resource provider %s.",
                  rp_uuid)
        return

    LOG.info("Compute node %s reported no inventory but previous "
             "inventory was detected. Deleting existing inventory "
             "records.", rp_uuid)

    url = '/resource_providers/%s/inventories' % rp_uuid
    generation = self._resource_providers[rp_uuid]['generation']
    response = self.put(url, {
        'resource_provider_generation': generation,
        'inventories': {},
    })
    req_id = get_placement_request_id(response)
    status = response.status_code

    if status == 200:
        # Remember the generation the server reports for the next update.
        new_gen = response.json()['resource_provider_generation']
        self._resource_providers[rp_uuid]['generation'] = new_gen
        LOG.info(('[%(placement_req_id)s] Deleted all inventory for '
                  'resource provider %(rp_uuid)s at generation '
                  '%(generation)i'),
                 {'rp_uuid': rp_uuid,
                  'generation': new_gen,
                  'placement_req_id': req_id})
        return

    if status == 409:
        in_use = _extract_inventory_in_use(response.text)
        if in_use is not None:
            LOG.warning("[%(placement_req_id)s] We cannot delete inventory "
                        "%(rc_str)s for resource provider %(rp_uuid)s "
                        "because the inventory is in use.",
                        {'rp_uuid': rp_uuid,
                         'rc_str': in_use,
                         'placement_req_id': req_id})
            return

    # Any other outcome (including a 409 that is not "in use") is an error.
    LOG.error("[%(placement_req_id)s] Failed to delete inventory for "
              "resource provider %(rp_uuid)s. Got error response: %(err)s",
              {'rp_uuid': rp_uuid,
               'err': response.text,
               'placement_req_id': req_id})
|
||||||
|
|
||||||
|
def set_inventory_for_provider(self, rp_uuid, rp_name, inv_data,
                               resource_class):
    """Set a provider's inventory records to the supplied dict of
    resources, ensuring the provider and resource class exist first.

    :param rp_uuid: UUID of the resource provider to set inventory for
    :param rp_name: Name of the resource provider in case we need to create
                    a record for it in the placement API
    :param inv_data: Dict, keyed by resource class name, of inventory data
                     to set against the provider. When it is empty the
                     provider's existing inventory is deleted instead.
    :param resource_class: Name of the resource class that is ensured via
                           ``_ensure_resource_class`` before the inventory
                           is written.

    :raises: exc.InvalidResourceClass if a supplied custom resource class
             name does not meet the placement API's format requirements.
    """
    self._ensure_resource_provider(rp_uuid, rp_name)

    # Make sure the resource class exists before inventory is written.
    self._ensure_resource_class(resource_class)

    if not inv_data:
        self._delete_inventory(rp_uuid)
        return
    self._update_inventory(rp_uuid, inv_data)
|
||||||
|
|
||||||
|
@safe_connect
def _ensure_resource_class(self, name):
    """Ensure that a custom resource class exists in placement.

    A PUT with microversion 1.7 is tried first. If the server answers
    406, fall back to a GET and POST with version 1.2.

    Returns the name of the resource class if it was successfully
    created or already exists. Otherwise None.

    :param name: String name of the resource class to check/create.
    :raises: `exception.InvalidResourceClass` upon error.
    """
    # The PUT request carries no payload.
    response = self.put("/resource_classes/%s" % name, None, version="1.7")
    status = response.status_code

    if 200 <= status < 300:
        return name

    if status == 406:
        # Microversion 1.7 is not available, so use the earlier way.
        # TODO(cdent): When we're happy that all placement
        # servers support microversion 1.7 we can remove this
        # call and the associated code.
        LOG.debug('Falling back to placement API microversion 1.2 '
                  'for resource class management.')
        return self._get_or_create_resource_class(name)

    LOG.error("Failed to ensure resource class record with "
              "placement API for resource class %(rc_name)s. "
              "Got %(status_code)d: %(err_text)s.",
              {'rc_name': name,
               'status_code': status,
               'err_text': response.text})
    raise exception.InvalidResourceClass(resource_class=name)
|
||||||
|
|
||||||
|
def _get_or_create_resource_class(self, name):
|
||||||
|
"""Queries the placement API for a resource class supplied resource
|
||||||
|
class string name. If the resource class does not exist, creates it.
|
||||||
|
|
||||||
|
Returns the resource class name if exists or was created, else None.
|
||||||
|
|
||||||
|
:param name: String name of the resource class to check/create.
|
||||||
|
"""
|
||||||
|
resp = self.get("/resource_classes/%s" % name, version="1.2")
|
||||||
|
if 200 <= resp.status_code < 300:
|
||||||
|
return name
|
||||||
|
elif resp.status_code == 404:
|
||||||
|
self._create_resource_class(name)
|
||||||
|
return name
|
||||||
|
else:
|
||||||
|
msg = ("Failed to retrieve resource class record from "
|
||||||
|
"placement API for resource class %(rc_name)s. "
|
||||||
|
"Got %(status_code)d: %(err_text)s.")
|
||||||
|
args = {
|
||||||
|
'rc_name': name,
|
||||||
|
'status_code': resp.status_code,
|
||||||
|
'err_text': resp.text,
|
||||||
|
}
|
||||||
|
LOG.error(msg, args)
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _create_resource_class(self, name):
    """Ask the placement API (version 1.2) to create a resource class.

    :param name: String name of the resource class to create.

    :returns: None on successful creation, and also when the server
              answers 409.
    :raises: `exception.InvalidResourceClass` upon error.
    """
    resp = self.post("/resource_classes", {'name': name}, version="1.2")
    status = resp.status_code

    if 200 <= status < 300:
        LOG.info("Created resource class record via placement API "
                 "for resource class %s.", name)
        return

    if status == 409:
        # Another thread concurrently created a resource class with the
        # same name. Log it and just return.
        LOG.info("Another thread already created a resource class "
                 "with the name %s. Returning.", name)
        return

    LOG.error("Failed to create resource class %(resource_class)s in "
              "placement API. Got %(status_code)d: %(err_text)s.",
              {'resource_class': name,
               'status_code': status,
               'err_text': resp.text})
    raise exception.InvalidResourceClass(resource_class=name)
|
||||||
|
|
||||||
|
@safe_connect
def put_allocations(self, rp_uuid, consumer_uuid, alloc_data):
    """Create allocation records for the supplied consumer UUID against
    the supplied resource provider.

    :note Currently we only allocate against a single resource provider.
          Once shared storage and things like NUMA allocations are a
          reality, this will change to allocate against multiple providers.

    :param rp_uuid: The UUID of the resource provider to allocate against.
    :param consumer_uuid: The instance's UUID.
    :param alloc_data: Dict, keyed by resource class, of amounts to
                       consume.
    :returns: True if the allocations were created, False otherwise.
    """
    allocation = {
        'resource_provider': {'uuid': rp_uuid},
        'resources': alloc_data,
    }
    payload = {'allocations': [allocation]}
    response = self.put('/allocations/%s' % consumer_uuid, payload)

    # Only a 204 counts as success; anything else is logged.
    created = response.status_code == 204
    if not created:
        LOG.warning(
            'Unable to submit allocation for instance '
            '%(uuid)s (%(code)i %(text)s)',
            {'uuid': consumer_uuid,
             'code': response.status_code,
             'text': response.text})
    return created
|
||||||
|
|
||||||
|
@safe_connect
def get_allocations_for_resource_provider(self, rp_uuid):
    """Return the allocations recorded against a resource provider.

    :param rp_uuid: UUID of the resource provider to query.
    :returns: The ``'allocations'`` value of the response body, or an
              empty dict when the response is falsy.
    """
    response = self.get('/resource_providers/%s/allocations' % rp_uuid)
    if response:
        return response.json()['allocations']
    return {}
|
69
mogan/scheduler/utils.py
Normal file
69
mogan/scheduler/utils.py
Normal file
@ -0,0 +1,69 @@
|
|||||||
|
# Copyright 2017 Huawei Technologies Co.,LTD.
|
||||||
|
# All Rights Reserved.
|
||||||
|
#
|
||||||
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||||
|
# not use this file except in compliance with the License. You may obtain
|
||||||
|
# a copy of the License at
|
||||||
|
#
|
||||||
|
# http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
#
|
||||||
|
# Unless required by applicable law or agreed to in writing, software
|
||||||
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||||
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||||
|
# License for the specific language governing permissions and limitations
|
||||||
|
# under the License.
|
||||||
|
|
||||||
|
"""Utility methods for scheduling."""
|
||||||
|
|
||||||
|
import functools
|
||||||
|
|
||||||
|
from oslo_config import cfg
|
||||||
|
from oslo_log import log as logging
|
||||||
|
import oslo_messaging as messaging
|
||||||
|
|
||||||
|
CONF = cfg.CONF
|
||||||
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
# Total number of tries made by calls wrapped with
# retry_select_destinations: the first call plus ATTEMPTS - 1 retries.
ATTEMPTS = 10
|
||||||
|
|
||||||
|
|
||||||
|
def retry_on_timeout(retries=1):
    """Build a decorator that retries a call on MessagingTimeout.

    Useful for retrying calls when a service dies mid-request.

    :param retries: Number of retries allowed after the first call; once
                    they are used up the MessagingTimeout is re-raised.
    :returns: Decorator
    """

    def outer(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            timeouts = 0
            while True:
                try:
                    result = func(*args, **kwargs)
                except messaging.MessagingTimeout:
                    timeouts += 1
                    if timeouts > retries:
                        # Retry budget is spent: let the timeout escape.
                        raise
                    LOG.warning(
                        "Retrying %(name)s after a MessagingTimeout, "
                        "attempt %(attempt)s of %(retries)s.",
                        {'attempt': timeouts, 'retries': retries,
                         'name': func.__name__})
                else:
                    return result

        return wrapped

    return outer
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_resource_class_name(resource_class):
    """Return ``resource_class`` as an upper-cased ``CUSTOM_`` name.

    The name is upper-cased and prefixed with ``CUSTOM_`` unless the
    upper-cased name already starts with that prefix.

    :param resource_class: String name of the resource class.
    :returns: The upper-cased name, carrying exactly one leading
              ``CUSTOM_`` prefix.
    """
    upper = resource_class.upper()
    # Test the normalized value: checking the raw input would turn a
    # lower-case 'custom_gold' into 'CUSTOM_CUSTOM_GOLD'.
    if upper.startswith('CUSTOM_'):
        return upper
    return 'CUSTOM_' + upper
|
||||||
|
|
||||||
|
# Ready-made decorator: retry on MessagingTimeout up to ATTEMPTS - 1 times.
retry_select_destinations = retry_on_timeout(ATTEMPTS - 1)
|
Loading…
Reference in New Issue
Block a user