From ee3c64d77f4d0061998439da3ec861fa9c5a0205 Mon Sep 17 00:00:00 2001
From: liusheng
Date: Mon, 19 Jun 2017 14:08:26 +0800
Subject: [PATCH] Report node resources to the placement service

This change introduces a placement API client, reports node resources
to the placement service, and uses placement as the single source of
node resource providers for Mogan's scheduler.

Co-Authored-By: Zhenguo Niu
Change-Id: I1cb91a10cfc8139687ad44167f33f45fd3c08e5c
Partially Implements: bp track-resources-using-placement
---
 devstack/plugin.sh                      |  13 +
 mogan/common/exception.py               |  15 +
 mogan/conf/__init__.py                  |   2 +
 mogan/conf/placement.py                 |  53 ++
 mogan/engine/baremetal/driver.py        |  15 +
 mogan/engine/baremetal/ironic/driver.py |  48 ++
 mogan/engine/manager.py                 |  25 +
 mogan/scheduler/client/__init__.py      |  52 ++
 mogan/scheduler/client/report.py        | 749 ++++++++++++++++++++++++
 mogan/scheduler/utils.py                |  69 +++
 10 files changed, 1041 insertions(+)
 create mode 100644 mogan/conf/placement.py
 create mode 100644 mogan/scheduler/client/__init__.py
 create mode 100644 mogan/scheduler/client/report.py
 create mode 100644 mogan/scheduler/utils.py

diff --git a/devstack/plugin.sh b/devstack/plugin.sh
index f233ed2b..0d6a178b 100644
--- a/devstack/plugin.sh
+++ b/devstack/plugin.sh
@@ -84,6 +84,16 @@ function configure_mogan {
     iniset ${MOGAN_CONF_FILE} ironic auth_type "password"
     iniset ${MOGAN_CONF_FILE} ironic api_endpoint "${KEYSTONE_AUTH_PROTOCOL}://${SERVICE_HOST}:${IRONIC_SERVICE_PORT}"

+    # Setup placement section
+    iniset ${MOGAN_CONF_FILE} placement project_domain_name ${SERVICE_DOMAIN_NAME}
+    iniset ${MOGAN_CONF_FILE} placement project_name ${SERVICE_PROJECT_NAME}
+    iniset ${MOGAN_CONF_FILE} placement user_domain_name ${SERVICE_DOMAIN_NAME}
+    iniset ${MOGAN_CONF_FILE} placement username "placement"
+    iniset ${MOGAN_CONF_FILE} placement password ${SERVICE_PASSWORD}
+    iniset ${MOGAN_CONF_FILE} placement auth_url ${KEYSTONE_AUTH_URI}
+    iniset ${MOGAN_CONF_FILE} placement auth_type "password"
+    iniset ${MOGAN_CONF_FILE} placement api_endpoint "${KEYSTONE_AUTH_PROTOCOL}://${SERVICE_HOST}/placement"
+
     # Setup neutron section
     iniset ${MOGAN_CONF_FILE} neutron url "${NEUTRON_SERVICE_PROTOCOL}://${SERVICE_HOST}:${NEUTRON_SERVICE_PORT}"

@@ -187,6 +197,9 @@ function update_ironic_node_resource_class {

 if is_service_enabled mogan; then
+    if ! is_service_enabled placement; then
+        die "The placement service is required for Mogan"
+    fi
     if is_service_enabled tempest; then
         iniset $TEMPEST_CONFIG compute fixed_network_name $PRIVATE_NETWORK_NAME
     fi
diff --git a/mogan/common/exception.py b/mogan/common/exception.py
index 319ad5f7..f35f403d 100644
--- a/mogan/common/exception.py
+++ b/mogan/common/exception.py
@@ -420,4 +420,19 @@ class KeypairNotFound(NotFound):
 class InvalidKeypair(Invalid):
     _msg_fmt = _("Keypair data is invalid: %(reason)s")

+
+class InvalidInventory(Invalid):
+    _msg_fmt = _("Inventory for '%(resource_class)s' on "
+                 "resource provider '%(resource_provider)s' is invalid.")
+
+
+class InvalidResourceClass(Invalid):
+    _msg_fmt = _("Resource class '%(resource_class)s' is invalid.")
+
+
+class InventoryInUse(InvalidInventory):
+    _msg_fmt = _("Inventory for '%(resource_classes)s' on "
+                 "resource provider '%(resource_provider)s' is in use.")
+
+
 ObjectActionError = obj_exc.ObjectActionError
diff --git a/mogan/conf/__init__.py b/mogan/conf/__init__.py
index e88a9e3b..82c7b756 100644
--- a/mogan/conf/__init__.py
+++ b/mogan/conf/__init__.py
@@ -25,6 +25,7 @@ from mogan.conf import glance
 from mogan.conf import ironic
 from mogan.conf import keystone
 from mogan.conf import neutron
+from mogan.conf import placement
 from mogan.conf import quota
 from mogan.conf import scheduler
 from mogan.conf import shellinabox
@@ -44,3 +45,4 @@ quota.register_opts(CONF)
 scheduler.register_opts(CONF)
 shellinabox.register_opts(CONF)
 cache.register_opts(CONF)
+placement.register_opts(CONF)
diff --git a/mogan/conf/placement.py b/mogan/conf/placement.py
new file mode 100644
index 00000000..f68ec6c7
--- /dev/null
+++ b/mogan/conf/placement.py
@@ -0,0 +1,53 @@
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from keystoneauth1 import loading as ks_loading
+from oslo_config import cfg
+
+placement_group = cfg.OptGroup(
+    'placement',
+    title='Placement Service Options',
+    help="Configuration options for connecting to the placement API service")
+
+placement_opts = [
+    cfg.StrOpt('os_region_name',
+               help="""
+Region name of this node. This is used when picking the URL in the service
+catalog.
+
+Possible values:
+
+* Any string representing a region name
+"""),
+    cfg.StrOpt('os_interface',
+               help="""
+Endpoint interface for this node. This is used when picking the URL in the
+service catalog.
+""")
+]
+
+
+def register_opts(conf):
+    conf.register_group(placement_group)
+    conf.register_opts(placement_opts, group=placement_group)
+    ks_loading.register_session_conf_options(conf, placement_group.name)
+    ks_loading.register_auth_conf_options(conf, placement_group.name)
+
+
+def list_opts():
+    return {
+        placement_group.name: (
+            placement_opts +
+            ks_loading.get_session_conf_options() +
+            ks_loading.get_auth_common_conf_options() +
+            ks_loading.get_auth_plugin_conf_options('v3password'))
+    }
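For orientation, here is a minimal sketch (not part of the patch) of how the [placement] options registered above are consumed; it mirrors what SchedulerReportClient.__init__() does later in this change. The config file path is illustrative; note the endpoint itself is resolved from the Keystone service catalog rather than from any api_endpoint option.

    from keystoneauth1 import loading as ks_loading
    from oslo_config import cfg

    from mogan.conf import placement as placement_conf

    CONF = cfg.CONF
    placement_conf.register_opts(CONF)
    CONF(['--config-file', '/etc/mogan/mogan.conf'])  # illustrative path

    # Build an authenticated session from the [placement] section.
    auth = ks_loading.load_auth_from_conf_options(CONF, 'placement')
    session = ks_loading.load_session_from_conf_options(
        CONF, 'placement', auth=auth)

    # Requests pick the placement endpoint out of the service catalog.
    resp = session.get('/resource_providers',
                       endpoint_filter={'service_type': 'placement',
                                        'region_name': CONF.placement.os_region_name,
                                        'interface': CONF.placement.os_interface},
                       raise_exc=False)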
+""") +] + + +def register_opts(conf): + conf.register_group(placement_group) + conf.register_opts(placement_opts, group=placement_group) + ks_loading.register_session_conf_options(conf, placement_group.name) + ks_loading.register_auth_conf_options(conf, placement_group.name) + + +def list_opts(): + return { + placement_group.name: ( + placement_opts + + ks_loading.get_session_conf_options() + + ks_loading.get_auth_common_conf_options() + + ks_loading.get_auth_plugin_conf_options('v3password')) + } diff --git a/mogan/engine/baremetal/driver.py b/mogan/engine/baremetal/driver.py index e960796e..8024cafc 100644 --- a/mogan/engine/baremetal/driver.py +++ b/mogan/engine/baremetal/driver.py @@ -121,6 +121,21 @@ class BaseEngineDriver(object): """ raise NotImplementedError() + def get_available_nodes(self): + """Retrieve all nodes information. + + :returns: Dictionary describing nodes + """ + raise NotImplementedError() + + @staticmethod + def get_node_inventory(node): + """Get the inventory of a node. + + :param node: node to get its inventory data. + """ + raise NotImplementedError() + def load_engine_driver(engine_driver): """Load a engine driver module. diff --git a/mogan/engine/baremetal/ironic/driver.py b/mogan/engine/baremetal/ironic/driver.py index 8bedd18f..58848d13 100644 --- a/mogan/engine/baremetal/ironic/driver.py +++ b/mogan/engine/baremetal/ironic/driver.py @@ -653,3 +653,51 @@ class IronicDriver(base_driver.BaseEngineDriver): else: LOG.debug('Console is disabled for node %s', node_uuid) raise exception.ConsoleNotAvailable() + + def get_available_nodes(self): + """Helper function to return the list of all nodes. + + If unable to connect ironic server, an empty list is returned. + + :returns: a list of normal nodes from ironic + + """ + normal_nodes = [] + params = { + 'detail': True, + 'limit': 0, + 'maintenance': False + } + try: + node_list = self.ironicclient.call("node.list", **params) + except client_e.ClientException as e: + LOG.exception("Could not get nodes from ironic. Reason: " + "%(detail)s", {'detail': e.message}) + return [] + + bad_power_states = [ironic_states.ERROR, ironic_states.NOSTATE] + # keep NOSTATE around for compatibility + good_provision_states = [ + ironic_states.AVAILABLE, ironic_states.NOSTATE] + for node_obj in node_list: + if ((node_obj.resource_class is None) or + node_obj.power_state in bad_power_states or + (node_obj.provision_state in good_provision_states and + node_obj.instance_uuid is not None)): + continue + normal_nodes.append(node_obj) + return normal_nodes + + @staticmethod + def get_node_inventory(node): + """Get the inventory of a node. + + :param node: server to get its inventory data. 
+ """ + return {'total': 1, + 'reserved': 0, + 'min_unit': 1, + 'max_unit': 1, + 'step_size': 1, + 'allocation_ratio': 1.0, + } diff --git a/mogan/engine/manager.py b/mogan/engine/manager.py index 491b1f5b..8b860afc 100644 --- a/mogan/engine/manager.py +++ b/mogan/engine/manager.py @@ -37,6 +37,8 @@ from mogan.notifications import base as notifications from mogan import objects from mogan.objects import fields from mogan.objects import quota +from mogan.scheduler import client +from mogan.scheduler import utils as sched_utils LOG = log.getLogger(__name__) @@ -77,6 +79,7 @@ class EngineManager(base_manager.BaseEngineManager): super(EngineManager, self).__init__(*args, **kwargs) self.quota = quota.Quota() self.quota.register_resource(objects.quota.ServerResource()) + self.scheduler_client = client.SchedulerClient() def _get_compute_port(self, context, port_uuid): """Gets compute port by the uuid.""" @@ -160,6 +163,7 @@ class EngineManager(base_manager.BaseEngineManager): for uuid, node in nodes.items(): if node.get('resource_class') is None: continue + # initialize the compute node object, creating it # if it does not already exist. self._init_compute_node(context, node) @@ -171,6 +175,27 @@ class EngineManager(base_manager.BaseEngineManager): {'id': cn.node_uuid}) cn.destroy() + all_nodes = self.driver.get_available_nodes() + + all_rps = self.scheduler_client.reportclient\ + .get_filtered_resource_providers({}) + node_uuids = [node.uuid for node in all_nodes] + + # Clean orphan resource providers in placement + for rp in all_rps: + if rp['uuid'] not in node_uuids: + self.scheduler_client.reportclient.delete_resource_provider( + rp['uuid']) + + for node in all_nodes: + resource_class = sched_utils.ensure_resource_class_name( + node.resource_class) + inventory = self.driver.get_node_inventory(node) + inventory_data = {resource_class: inventory} + self.scheduler_client.set_inventory_for_provider( + node.uuid, node.name, inventory_data, + resource_class) + @periodic_task.periodic_task(spacing=CONF.engine.sync_power_state_interval, run_immediately=True) def _sync_power_states(self, context): diff --git a/mogan/scheduler/client/__init__.py b/mogan/scheduler/client/__init__.py new file mode 100644 index 00000000..54b7e00d --- /dev/null +++ b/mogan/scheduler/client/__init__.py @@ -0,0 +1,52 @@ +# Copyright (c) 2014 Red Hat, Inc. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. 
diff --git a/mogan/scheduler/client/__init__.py b/mogan/scheduler/client/__init__.py
new file mode 100644
index 00000000..54b7e00d
--- /dev/null
+++ b/mogan/scheduler/client/__init__.py
@@ -0,0 +1,52 @@
+# Copyright (c) 2014 Red Hat, Inc.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import functools
+
+from oslo_utils import importutils
+
+
+class LazyLoader(object):
+    """Defer instantiation of a class until one of its methods is called."""
+
+    def __init__(self, klass, *args, **kwargs):
+        self.klass = klass
+        self.args = args
+        self.kwargs = kwargs
+        self.instance = None
+
+    def __getattr__(self, name):
+        return functools.partial(self.__run_method, name)
+
+    def __run_method(self, __name, *args, **kwargs):
+        if self.instance is None:
+            self.instance = self.klass(*self.args, **self.kwargs)
+        return getattr(self.instance, __name)(*args, **kwargs)
+
+
+class SchedulerClient(object):
+    """Client library for placing calls to the scheduler."""
+
+    def __init__(self):
+        self.reportclient = LazyLoader(importutils.import_class(
+            'mogan.scheduler.client.report.SchedulerReportClient'))
+
+    def set_inventory_for_provider(self, rp_uuid, rp_name, inv_data,
+                                   res_class):
+        self.reportclient.set_inventory_for_provider(
+            rp_uuid,
+            rp_name,
+            inv_data,
+            res_class,
+        )
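The LazyLoader indirection matters because constructing SchedulerReportClient opens a Keystone session: the class object is imported up front, but no instance (and no session) is created until the first proxied call. An illustrative sketch, not part of the patch, with hypothetical values:

    from mogan.scheduler import client as scheduler_client

    inv = {'CUSTOM_BAREMETAL_GOLD': {'total': 1, 'reserved': 0, 'min_unit': 1,
                                     'max_unit': 1, 'step_size': 1,
                                     'allocation_ratio': 1.0}}

    c = scheduler_client.SchedulerClient()  # cheap: no placement session yet
    # The first attribute access returns a partial; invoking it
    # instantiates SchedulerReportClient and proxies the call.
    c.reportclient.set_inventory_for_provider(
        'aaaa-node-uuid', 'node0', inv, 'CUSTOM_BAREMETAL_GOLD')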
+ """ + match = _RE_INV_IN_USE.search(body) + if match: + return match.group(1) + return None + + +def get_placement_request_id(response): + if response is not None: + return response.headers.get( + 'openstack-request-id', + response.headers.get('x-openstack-request-id')) + + +class SchedulerReportClient(object): + """Client class for updating the scheduler.""" + + def __init__(self): + # A dict, keyed by the resource provider UUID, of ResourceProvider + # objects that will have their inventories and allocations tracked by + # the placement API for the compute host + self._resource_providers = {} + # A dict, keyed by resource provider UUID, of sets of aggregate UUIDs + # the provider is associated with + self._provider_aggregate_map = {} + auth_plugin = keystone.load_auth_from_conf_options( + CONF, 'placement') + self._client = keystone.load_session_from_conf_options( + CONF, 'placement', auth=auth_plugin) + # NOTE(danms): Keep track of how naggy we've been + self._warn_count = 0 + self.ks_filter = {'service_type': 'placement', + 'region_name': CONF.placement.os_region_name, + 'interface': CONF.placement.os_interface} + + def get(self, url, version=None): + kwargs = {} + if version is not None: + # TODO(mriedem): Perform some version discovery at some point. + kwargs = { + 'headers': { + 'OpenStack-API-Version': 'placement %s' % version + }, + } + return self._client.get( + url, + endpoint_filter=self.ks_filter, raise_exc=False, **kwargs) + + def post(self, url, data, version=None): + # NOTE(sdague): using json= instead of data= sets the + # media type to application/json for us. Placement API is + # more sensitive to this than other APIs in the OpenStack + # ecosystem. + kwargs = {} + if version is not None: + # TODO(mriedem): Perform some version discovery at some point. + kwargs = { + 'headers': { + 'OpenStack-API-Version': 'placement %s' % version + }, + } + return self._client.post( + url, json=data, + endpoint_filter=self.ks_filter, raise_exc=False, **kwargs) + + def put(self, url, data, version=None): + # NOTE(sdague): using json= instead of data= sets the + # media type to application/json for us. Placement API is + # more sensitive to this than other APIs in the OpenStack + # ecosystem. + kwargs = {} + if version is not None: + # TODO(mriedem): Perform some version discovery at some point. + kwargs = { + 'headers': { + 'OpenStack-API-Version': 'placement %s' % version + }, + } + if data: + kwargs['json'] = data + return self._client.put( + url, endpoint_filter=self.ks_filter, raise_exc=False, + **kwargs) + + def delete(self, url): + return self._client.delete( + url, + endpoint_filter=self.ks_filter, raise_exc=False) + + # TODO(sbauza): Change that poor interface into passing a rich versioned + # object that would provide the ResourceProvider requirements. + @safe_connect + def get_filtered_resource_providers(self, filters): + """Returns a list of ResourceProviders matching the requirements + expressed by the filters argument, which can include a dict named + 'resources' where amounts are keyed by resource class names. + + eg. 
+    @safe_connect
+    def _get_provider_aggregates(self, rp_uuid):
+        """Queries the placement API for a resource provider's aggregates.
+
+        Returns a set() of aggregate UUIDs, or None if no such resource
+        provider was found or there was an error communicating with the
+        placement API.
+
+        :param rp_uuid: UUID of the resource provider to grab aggregates
+                        for.
+        """
+        resp = self.get("/resource_providers/%s/aggregates" % rp_uuid,
+                        version='1.1')
+        if resp.status_code == 200:
+            data = resp.json()
+            return set(data['aggregates'])
+
+        placement_req_id = get_placement_request_id(resp)
+        if resp.status_code == 404:
+            msg = ("[%(placement_req_id)s] Tried to get a provider's "
+                   "aggregates; however the provider %(uuid)s does not "
+                   "exist.")
+            args = {
+                'uuid': rp_uuid,
+                'placement_req_id': placement_req_id,
+            }
+            LOG.warning(msg, args)
+        else:
+            msg = ("[%(placement_req_id)s] Failed to retrieve aggregates "
+                   "from placement API for resource provider with UUID "
+                   "%(uuid)s. Got %(status_code)d: %(err_text)s.")
+            args = {
+                'placement_req_id': placement_req_id,
+                'uuid': rp_uuid,
+                'status_code': resp.status_code,
+                'err_text': resp.text,
+            }
+            LOG.error(msg, args)
+
+    @safe_connect
+    def _get_resource_provider(self, uuid):
+        """Queries the placement API for a resource provider record with
+        the supplied UUID.
+
+        Returns a dict of resource provider information if found, or None
+        if no such resource provider could be found.
+
+        :param uuid: UUID identifier for the resource provider to look up
+        """
+        resp = self.get("/resource_providers/%s" % uuid)
+        if resp.status_code == 200:
+            data = resp.json()
+            return data
+        elif resp.status_code == 404:
+            return None
+        else:
+            placement_req_id = get_placement_request_id(resp)
+            msg = ("[%(placement_req_id)s] Failed to retrieve resource "
+                   "provider record from placement API for UUID %(uuid)s. "
+                   "Got %(status_code)d: %(err_text)s.")
+            args = {
+                'uuid': uuid,
+                'status_code': resp.status_code,
+                'err_text': resp.text,
+                'placement_req_id': placement_req_id,
+            }
+            LOG.error(msg, args)
+
+    @safe_connect
+    def _create_resource_provider(self, uuid, name):
+        """Calls the placement API to create a new resource provider
+        record.
+
+        Returns a dict of resource provider information representing the
+        newly-created resource provider.
+
+        :param uuid: UUID of the new resource provider
+        :param name: Name of the resource provider
+        """
+        url = "/resource_providers"
+        payload = {
+            'uuid': uuid,
+            'name': name,
+        }
+        resp = self.post(url, payload)
+        placement_req_id = get_placement_request_id(resp)
+        if resp.status_code == 201:
+            msg = ("[%(placement_req_id)s] Created resource provider "
+                   "record via placement API for resource provider with "
+                   "UUID %(uuid)s and name %(name)s.")
+            args = {
+                'uuid': uuid,
+                'name': name,
+                'placement_req_id': placement_req_id,
+            }
+            LOG.info(msg, args)
+            return dict(
+                uuid=uuid,
+                name=name,
+                generation=0,
+            )
+        elif resp.status_code == 409:
+            # Another thread concurrently created a resource provider with
+            # the same UUID. Log a warning and then just return the
+            # resource provider object from _get_resource_provider()
+            msg = ("[%(placement_req_id)s] Another thread already created "
+                   "a resource provider with the UUID %(uuid)s. Grabbing "
+                   "that record from the placement API.")
+            args = {
+                'uuid': uuid,
+                'placement_req_id': placement_req_id,
+            }
+            LOG.info(msg, args)
+            return self._get_resource_provider(uuid)
+        else:
+            msg = ("[%(placement_req_id)s] Failed to create resource "
+                   "provider record in placement API for UUID %(uuid)s. "
+                   "Got %(status_code)d: %(err_text)s.")
+            args = {
+                'uuid': uuid,
+                'status_code': resp.status_code,
+                'err_text': resp.text,
+                'placement_req_id': placement_req_id,
+            }
+            LOG.error(msg, args)
+
+    def _ensure_resource_provider(self, uuid, name=None):
+        """Ensures that the placement API has a record of a resource
+        provider with the supplied UUID. If not, creates the resource
+        provider record in the placement API for the supplied UUID,
+        optionally passing in a name for the resource provider.
+
+        The found or created resource provider object is returned from
+        this method. If the resource provider object for the supplied uuid
+        was not found and the resource provider record could not be
+        created in the placement API, we return None.
+
+        :param uuid: UUID identifier for the resource provider to ensure
+                     exists
+        :param name: Optional name for the resource provider if the record
+                     does not exist. If empty, the name is set to the UUID
+                     value
+        """
+        if uuid in self._resource_providers:
+            # NOTE(jaypipes): This isn't optimal to check if aggregate
+            # associations have changed each time we call
+            # _ensure_resource_provider() and get a hit on the local cache
+            # of provider objects, however the alternative is to force
+            # operators to restart all their nova-compute workers every
+            # time they add or change an aggregate. We might optionally
+            # want to add some sort of cache refresh delay or interval as
+            # an optimization?
+ msg = "Refreshing aggregate associations for resource provider %s" + LOG.debug(msg, uuid) + aggs = self._get_provider_aggregates(uuid) + self._provider_aggregate_map[uuid] = aggs + return self._resource_providers[uuid] + + rp = self._get_resource_provider(uuid) + if rp is None: + name = name or uuid + rp = self._create_resource_provider(uuid, name) + if rp is None: + return + msg = "Grabbing aggregate associations for resource provider %s" + LOG.debug(msg, uuid) + aggs = self._get_provider_aggregates(uuid) + self._resource_providers[uuid] = rp + self._provider_aggregate_map[uuid] = aggs + return rp + + def _get_inventory(self, rp_uuid): + url = '/resource_providers/%s/inventories' % rp_uuid + result = self.get(url) + if not result: + return {'inventories': {}} + return result.json() + + def _get_inventory_and_update_provider_generation(self, rp_uuid): + """Helper method that retrieves the current inventory for the supplied + resource provider according to the placement API. If the cached + generation of the resource provider is not the same as the generation + returned from the placement API, we update the cached generation. + """ + curr = self._get_inventory(rp_uuid) + + # Update our generation immediately, if possible. Even if there + # are no inventories we should always have a generation but let's + # be careful. + server_gen = curr.get('resource_provider_generation') + if server_gen: + my_rp = self._resource_providers[rp_uuid] + if server_gen != my_rp['generation']: + LOG.debug('Updating our resource provider generation ' + 'from %(old)i to %(new)i', + {'old': my_rp['generation'], + 'new': server_gen}) + my_rp['generation'] = server_gen + return curr + + def _update_inventory_attempt(self, rp_uuid, inv_data): + """Update the inventory for this resource provider if needed. + + :param rp_uuid: The resource provider UUID for the operation + :param inv_data: The new inventory for the resource provider + :returns: True if the inventory was updated (or did not need to be), + False otherwise. + """ + curr = self._get_inventory_and_update_provider_generation(rp_uuid) + + # Check to see if we need to update placement's view + if inv_data == curr.get('inventories', {}): + return True + + cur_rp_gen = self._resource_providers[rp_uuid]['generation'] + payload = { + 'resource_provider_generation': cur_rp_gen, + 'inventories': inv_data, + } + url = '/resource_providers/%s/inventories' % rp_uuid + result = self.put(url, payload) + if result.status_code == 409: + LOG.info('[%(placement_req_id)s] Inventory update conflict ' + 'for %(resource_provider_uuid)s with generation ID ' + '%(generation_id)s', + {'placement_req_id': get_placement_request_id(result), + 'resource_provider_uuid': rp_uuid, + 'generation_id': cur_rp_gen}) + # NOTE(jaypipes): There may be cases when we try to set a + # provider's inventory that results in attempting to delete an + # inventory record for a resource class that has an active + # allocation. We need to catch this particular case and raise an + # exception here instead of returning False, since we should not + # re-try the operation in this case. + # + # A use case for where this can occur is the following: + # + # 1) Provider created for each Ironic baremetal node in Newton + # 2) Inventory records for baremetal node created for VCPU, + # MEMORY_MB and DISK_GB + # 3) A Nova instance consumes the baremetal node and allocation + # records are created for VCPU, MEMORY_MB and DISK_GB matching + # the total amount of those resource on the baremetal node. 
+            # 4) Upgrade to Ocata and now the resource tracker wants to
+            #    set the provider's inventory to a single record of
+            #    resource class CUSTOM_IRON_SILVER (or whatever the Ironic
+            #    node's "resource_class" attribute is)
+            # 5) Scheduler report client sends the inventory list
+            #    containing a single CUSTOM_IRON_SILVER record and the
+            #    placement service attempts to delete the inventory
+            #    records for VCPU, MEMORY_MB and DISK_GB. An exception is
+            #    raised from the placement service because allocation
+            #    records exist for those resource classes, and a 409
+            #    Conflict is returned to the compute node. We need to
+            #    trigger a delete of the old allocation records and then
+            #    set the new inventory, and then set the allocation record
+            #    to the new CUSTOM_IRON_SILVER record.
+            match = _RE_INV_IN_USE.search(result.text)
+            if match:
+                rc = match.group(1)
+                raise exception.InventoryInUse(
+                    resource_classes=rc,
+                    resource_provider=rp_uuid,
+                )
+
+            # Invalidate our cache and re-fetch the resource provider
+            # to be sure to get the latest generation.
+            del self._resource_providers[rp_uuid]
+            # NOTE(jaypipes): We don't need to pass a name parameter to
+            # _ensure_resource_provider() because we know the resource
+            # provider record already exists. We're just reloading the
+            # record here.
+            self._ensure_resource_provider(rp_uuid)
+            return False
+        elif not result:
+            placement_req_id = get_placement_request_id(result)
+            LOG.warning('[%(placement_req_id)s] Failed to update '
+                        'inventory for resource provider '
+                        '%(uuid)s: %(status)i %(text)s',
+                        {'placement_req_id': placement_req_id,
+                         'uuid': rp_uuid,
+                         'status': result.status_code,
+                         'text': result.text})
+            # log the body at debug level
+            LOG.debug('[%(placement_req_id)s] Failed inventory update '
+                      'request for resource provider %(uuid)s with body: '
+                      '%(payload)s',
+                      {'placement_req_id': placement_req_id,
+                       'uuid': rp_uuid,
+                       'payload': payload})
+            return False
+
+        if result.status_code != 200:
+            placement_req_id = get_placement_request_id(result)
+            LOG.info('[%(placement_req_id)s] Received unexpected response '
+                     'code %(code)i while trying to update inventory for '
+                     'resource provider %(uuid)s: %(text)s',
+                     {'placement_req_id': placement_req_id,
+                      'uuid': rp_uuid,
+                      'code': result.status_code,
+                      'text': result.text})
+            return False
+
+        # Update our view of the generation for next time
+        updated_inventories_result = result.json()
+        new_gen = updated_inventories_result['resource_provider_generation']
+        self._resource_providers[rp_uuid]['generation'] = new_gen
+        LOG.debug('Updated inventory for %s at generation %i',
+                  rp_uuid, new_gen)
+        return True
+
+    @safe_connect
+    def _update_inventory(self, rp_uuid, inv_data):
+        for attempt in (1, 2, 3):
+            if rp_uuid not in self._resource_providers:
+                # NOTE(danms): Either we failed to fetch/create the RP
+                # on our first attempt, or a previous attempt had to
+                # invalidate the cache, and we were unable to refresh
+                # it. Bail and try again next time.
+                LOG.warning('Unable to refresh my resource provider record')
+                return False
+            if self._update_inventory_attempt(rp_uuid, inv_data):
+                return True
+            time.sleep(1)
+        return False
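The generation field is placement's optimistic-concurrency token: every inventory write must carry the generation the writer last saw, and a stale value earns a 409. An illustrative request shape (not part of the patch, values hypothetical):

    payload = {
        'resource_provider_generation': 3,   # our cached generation
        'inventories': {
            'CUSTOM_BAREMETAL_GOLD': {'total': 1, 'reserved': 0,
                                      'min_unit': 1, 'max_unit': 1,
                                      'step_size': 1,
                                      'allocation_ratio': 1.0},
        },
    }
    # PUT /resource_providers/<rp_uuid>/inventories
    #   -> 200 with 'resource_provider_generation': 4 on success
    #   -> 409 Conflict if another writer bumped the generation first,
    #      after which the cache is invalidated and the update retried.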
+    @safe_connect
+    def _delete_inventory(self, rp_uuid):
+        """Deletes all inventory records for a resource provider with the
+        supplied UUID.
+        """
+        curr = self._get_inventory_and_update_provider_generation(rp_uuid)
+
+        # Check to see if we need to update placement's view
+        if not curr.get('inventories', {}):
+            msg = "No inventory to delete from resource provider %s."
+            LOG.debug(msg, rp_uuid)
+            return
+
+        msg = ("Node %s reported no inventory but previous inventory was "
+               "detected. Deleting existing inventory records.")
+        LOG.info(msg, rp_uuid)
+
+        url = '/resource_providers/%s/inventories' % rp_uuid
+        cur_rp_gen = self._resource_providers[rp_uuid]['generation']
+        payload = {
+            'resource_provider_generation': cur_rp_gen,
+            'inventories': {},
+        }
+        r = self.put(url, payload)
+        placement_req_id = get_placement_request_id(r)
+        if r.status_code == 200:
+            # Update our view of the generation for next time
+            updated_inv = r.json()
+            new_gen = updated_inv['resource_provider_generation']
+
+            self._resource_providers[rp_uuid]['generation'] = new_gen
+            msg_args = {
+                'rp_uuid': rp_uuid,
+                'generation': new_gen,
+                'placement_req_id': placement_req_id,
+            }
+            LOG.info('[%(placement_req_id)s] Deleted all inventory for '
+                     'resource provider %(rp_uuid)s at generation '
+                     '%(generation)i',
+                     msg_args)
+            return
+        elif r.status_code == 409:
+            rc_str = _extract_inventory_in_use(r.text)
+            if rc_str is not None:
+                msg = ("[%(placement_req_id)s] We cannot delete inventory "
+                       "%(rc_str)s for resource provider %(rp_uuid)s "
+                       "because the inventory is in use.")
+                msg_args = {
+                    'rp_uuid': rp_uuid,
+                    'rc_str': rc_str,
+                    'placement_req_id': placement_req_id,
+                }
+                LOG.warning(msg, msg_args)
+                return
+
+        msg = ("[%(placement_req_id)s] Failed to delete inventory for "
+               "resource provider %(rp_uuid)s. Got error response: "
+               "%(err)s")
+        msg_args = {
+            'rp_uuid': rp_uuid,
+            'err': r.text,
+            'placement_req_id': placement_req_id,
+        }
+        LOG.error(msg, msg_args)
+
+    def set_inventory_for_provider(self, rp_uuid, rp_name, inv_data,
+                                   resource_class):
+        """Given the UUID of a provider, set the inventory records for the
+        provider to the supplied dict of resources.
+
+        :param rp_uuid: UUID of the resource provider to set inventory for
+        :param rp_name: Name of the resource provider in case we need to
+                        create a record for it in the placement API
+        :param inv_data: Dict, keyed by resource class name, of inventory
+                         data to set against the provider
+        :param resource_class: Custom resource class name to ensure exists
+                               before the inventory is set
+
+        :raises: exception.InvalidResourceClass if a supplied custom
+                 resource class name does not meet the placement API's
+                 format requirements.
+        """
+        self._ensure_resource_provider(rp_uuid, rp_name)
+
+        # Auto-create custom resource classes coming from a virt driver
+        self._ensure_resource_class(resource_class)
+
+        if inv_data:
+            self._update_inventory(rp_uuid, inv_data)
+        else:
+            self._delete_inventory(rp_uuid)
+
+    @safe_connect
+    def _ensure_resource_class(self, name):
+        """Make sure a custom resource class exists.
+
+        First attempt to PUT the resource class using microversion 1.7. If
+        this results in a 406, fail over to a GET and POST with version
+        1.2.
+
+        Returns the name of the resource class if it was successfully
+        created or already exists. Otherwise None.
+
+        :param name: String name of the resource class to check/create.
+        :raises: `exception.InvalidResourceClass` upon error.
+        """
+        # no payload on the put request
+        response = self.put("/resource_classes/%s" % name, None,
+                            version="1.7")
+        if 200 <= response.status_code < 300:
+            return name
+        elif response.status_code == 406:
+            # microversion 1.7 not available so try the earlier way
+            # TODO(cdent): When we're happy that all placement
+            # servers support microversion 1.7 we can remove this
+            # call and the associated code.
+            LOG.debug('Falling back to placement API microversion 1.2 '
+                      'for resource class management.')
+            return self._get_or_create_resource_class(name)
+        else:
+            msg = ("Failed to ensure resource class record with "
+                   "placement API for resource class %(rc_name)s. "
+                   "Got %(status_code)d: %(err_text)s.")
+            args = {
+                'rc_name': name,
+                'status_code': response.status_code,
+                'err_text': response.text,
+            }
+            LOG.error(msg, args)
+            raise exception.InvalidResourceClass(resource_class=name)
+
+    def _get_or_create_resource_class(self, name):
+        """Queries the placement API for a resource class by name. If the
+        resource class does not exist, creates it.
+
+        Returns the resource class name if it exists or was created, else
+        None.
+
+        :param name: String name of the resource class to check/create.
+        """
+        resp = self.get("/resource_classes/%s" % name, version="1.2")
+        if 200 <= resp.status_code < 300:
+            return name
+        elif resp.status_code == 404:
+            self._create_resource_class(name)
+            return name
+        else:
+            msg = ("Failed to retrieve resource class record from "
+                   "placement API for resource class %(rc_name)s. "
+                   "Got %(status_code)d: %(err_text)s.")
+            args = {
+                'rc_name': name,
+                'status_code': resp.status_code,
+                'err_text': resp.text,
+            }
+            LOG.error(msg, args)
+            return None
+
+    def _create_resource_class(self, name):
+        """Calls the placement API to create a new resource class.
+
+        :param name: String name of the resource class to create.
+
+        :returns: None on successful creation.
+        :raises: `exception.InvalidResourceClass` upon error.
+        """
+        url = "/resource_classes"
+        payload = {
+            'name': name,
+        }
+        resp = self.post(url, payload, version="1.2")
+        if 200 <= resp.status_code < 300:
+            msg = ("Created resource class record via placement API "
+                   "for resource class %s.")
+            LOG.info(msg, name)
+        elif resp.status_code == 409:
+            # Another thread concurrently created a resource class with
+            # the same name. Log a warning and then just return
+            msg = ("Another thread already created a resource class "
+                   "with the name %s. Returning.")
+            LOG.info(msg, name)
+        else:
+            msg = ("Failed to create resource class %(resource_class)s in "
+                   "placement API. Got %(status_code)d: %(err_text)s.")
+            args = {
+                'resource_class': name,
+                'status_code': resp.status_code,
+                'err_text': resp.text,
+            }
+            LOG.error(msg, args)
+            raise exception.InvalidResourceClass(resource_class=name)
+
+    @safe_connect
+    def put_allocations(self, rp_uuid, consumer_uuid, alloc_data):
+        """Creates allocation records for the supplied server UUID against
+        the supplied resource provider.
+
+        :note: Currently we only allocate against a single resource
+               provider. Once shared storage and things like NUMA
+               allocations are a reality, this will change to allocate
+               against multiple providers.
+
+        :param rp_uuid: The UUID of the resource provider to allocate
+                        against.
+        :param consumer_uuid: The server's UUID.
+        :param alloc_data: Dict, keyed by resource class, of amounts to
+                           consume.
+        :returns: True if the allocations were created, False otherwise.
+ """ + payload = { + 'allocations': [ + { + 'resource_provider': { + 'uuid': rp_uuid, + }, + 'resources': alloc_data, + }, + ], + } + url = '/allocations/%s' % consumer_uuid + r = self.put(url, payload) + if r.status_code != 204: + LOG.warning( + 'Unable to submit allocation for instance ' + '%(uuid)s (%(code)i %(text)s)', + {'uuid': consumer_uuid, + 'code': r.status_code, + 'text': r.text}) + return r.status_code == 204 + + @safe_connect + def get_allocations_for_resource_provider(self, rp_uuid): + url = '/resource_providers/%s/allocations' % rp_uuid + resp = self.get(url) + if not resp: + return {} + else: + return resp.json()['allocations'] diff --git a/mogan/scheduler/utils.py b/mogan/scheduler/utils.py new file mode 100644 index 00000000..efc82832 --- /dev/null +++ b/mogan/scheduler/utils.py @@ -0,0 +1,69 @@ +# Copyright 2017 Huawei Technologies Co.,LTD. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Utility methods for scheduling.""" + +import functools + +from oslo_config import cfg +from oslo_log import log as logging +import oslo_messaging as messaging + +CONF = cfg.CONF +LOG = logging.getLogger(__name__) + +ATTEMPTS = 10 + + +def retry_on_timeout(retries=1): + """Retry the call in case a MessagingTimeout is raised. + + A decorator for retrying calls when a service dies mid-request. + + :param retries: Number of retries + :returns: Decorator + """ + + def outer(func): + @functools.wraps(func) + def wrapped(*args, **kwargs): + attempt = 0 + while True: + try: + return func(*args, **kwargs) + except messaging.MessagingTimeout: + attempt += 1 + if attempt <= retries: + LOG.warning( + "Retrying %(name)s after a MessagingTimeout, " + "attempt %(attempt)s of %(retries)s.", + {'attempt': attempt, 'retries': retries, + 'name': func.__name__}) + else: + raise + + return wrapped + + return outer + + +def ensure_resource_class_name(resource_class): + upper = resource_class.upper() + if not resource_class.startswith('CUSTOM_'): + return 'CUSTOM_' + upper + else: + return upper + +retry_select_destinations = retry_on_timeout(ATTEMPTS - 1)