# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""Utility methods for scheduling."""
|
|
|
|
import collections
|
|
import functools
|
|
import sys
|
|
|
|
from oslo_log import log as logging
|
|
import oslo_messaging as messaging
|
|
from oslo_serialization import jsonutils
|
|
|
|
from nova.compute import flavors
|
|
from nova.compute import utils as compute_utils
|
|
import nova.conf
|
|
from nova import exception
|
|
from nova.i18n import _, _LE, _LW
|
|
from nova import objects
|
|
from nova.objects import base as obj_base
|
|
from nova.objects import instance as obj_instance
|
|
from nova import rpc
|
|
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
CONF = nova.conf.CONF
|
|
|
|
GroupDetails = collections.namedtuple('GroupDetails', ['hosts', 'policies'])
|
|
|
|
|
def build_request_spec(ctxt, image, instances, instance_type=None):
    """Build a request_spec for the scheduler.

    The request_spec assumes that all instances to be scheduled are the same
    type.
    """
    instance = instances[0]
    if instance_type is None:
        if isinstance(instance, obj_instance.Instance):
            instance_type = instance.get_flavor()
        else:
            instance_type = flavors.extract_flavor(instance)

    if isinstance(instance, obj_instance.Instance):
        instance = obj_base.obj_to_primitive(instance)
        # obj_to_primitive doesn't copy this enough, so be sure
        # to detach our metadata blob because we modify it below.
        instance['system_metadata'] = dict(
            instance.get('system_metadata', {}))

    if isinstance(instance_type, objects.Flavor):
        instance_type = obj_base.obj_to_primitive(instance_type)
        # NOTE(danms): Replicate this old behavior because the
        # scheduler RPC interface technically expects it to be
        # there. Remove this when we bump the scheduler RPC API to
        # v5.0
        try:
            flavors.save_flavor_info(instance.get('system_metadata', {}),
                                     instance_type)
        except KeyError:
            # If the flavor isn't complete (which is legit with a
            # flavor object), just don't put it in the request spec
            pass

    request_spec = {
            'image': image or {},
            'instance_properties': instance,
            'instance_type': instance_type,
            'num_instances': len(instances)}
    return jsonutils.to_primitive(request_spec)


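# Usage sketch for build_request_spec (illustrative only; ``ctxt``,
# ``image_meta`` and ``inst`` are hypothetical caller variables):
#
#     spec = build_request_spec(ctxt, image_meta, [inst])
#     # spec is a JSON-primitive dict of the form:
#     # {'image': {...}, 'instance_properties': {...},
#     #  'instance_type': {...}, 'num_instances': 1}

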
def set_vm_state_and_notify(context, instance_uuid, service, method, updates,
                            ex, request_spec, db):
    """Change the instance's VM state and send an error notification."""
    LOG.warning(_LW("Failed to %(service)s_%(method)s: %(ex)s"),
                {'service': service, 'method': method, 'ex': ex})

    vm_state = updates['vm_state']
    properties = request_spec.get('instance_properties', {})
    # NOTE(vish): We shouldn't get here unless we have a catastrophic
    # failure, so just set the instance to its internal state
    notifier = rpc.get_notifier(service)
    state = vm_state.upper()
    LOG.warning(_LW('Setting instance to %s state.'), state,
                instance_uuid=instance_uuid)

    instance = objects.Instance(context=context, uuid=instance_uuid,
                                **updates)
    instance.obj_reset_changes(['uuid'])
    instance.save()
    compute_utils.add_instance_fault_from_exc(
        context, instance, ex, sys.exc_info())

    payload = dict(request_spec=request_spec,
                   instance_properties=properties,
                   instance_id=instance_uuid,
                   state=vm_state,
                   method=method,
                   reason=ex)

    event_type = '%s.%s' % (service, method)
    notifier.error(context, event_type, payload)


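# For example (values are illustrative; callers pass their own service and
# method names), a failure reported with service='scheduler' and
# method='run_instance' is emitted as an error notification with the
# event_type 'scheduler.run_instance'.

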
def populate_filter_properties(filter_properties, host_state):
    """Add additional information to the filter properties after a node has
    been selected by the scheduling process.
    """
    if isinstance(host_state, dict):
        host = host_state['host']
        nodename = host_state['nodename']
        limits = host_state['limits']
    else:
        host = host_state.host
        nodename = host_state.nodename
        limits = host_state.limits

    # Adds a retry entry for the selected compute host and node:
    _add_retry_host(filter_properties, host, nodename)

    # Adds oversubscription policy
    if not filter_properties.get('force_hosts'):
        filter_properties['limits'] = limits


def populate_retry(filter_properties, instance_uuid):
    """Populate the retry entry in filter_properties and enforce the
    maximum number of scheduling attempts.
    """
    max_attempts = _max_attempts()
    force_hosts = filter_properties.get('force_hosts', [])
    force_nodes = filter_properties.get('force_nodes', [])

    # In the case of multiple force hosts/nodes, scheduler should not
    # disable retry filter but traverse all force hosts/nodes one by
    # one till scheduler gets a valid target host.
    if (max_attempts == 1 or len(force_hosts) == 1
            or len(force_nodes) == 1):
        # re-scheduling is disabled.
        return

    # retry is enabled, update attempt count:
    retry = filter_properties.setdefault(
        'retry', {
            'num_attempts': 0,
            'hosts': []  # list of compute hosts tried
        })
    retry['num_attempts'] += 1

    _log_compute_error(instance_uuid, retry)
    exc_reason = retry.pop('exc_reason', None)

    if retry['num_attempts'] > max_attempts:
        msg = (_('Exceeded max scheduling attempts %(max_attempts)d '
                 'for instance %(instance_uuid)s. '
                 'Last exception: %(exc_reason)s')
               % {'max_attempts': max_attempts,
                  'instance_uuid': instance_uuid,
                  'exc_reason': exc_reason})
        raise exception.MaxRetriesExceeded(reason=msg)


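# Sketch of how the retry entry evolves across scheduling attempts
# (illustrative only; ``filter_properties`` and ``instance_uuid`` are
# hypothetical caller variables):
#
#     filter_properties = {}
#     populate_retry(filter_properties, instance_uuid)
#     # filter_properties['retry'] == {'num_attempts': 1, 'hosts': []}
#     # On a re-schedule the same dict comes back with the previously
#     # tried hosts appended, num_attempts is bumped again, and
#     # MaxRetriesExceeded is raised once it exceeds _max_attempts().

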
def _log_compute_error(instance_uuid, retry):
    """If the request contained an exception from a previous compute
    build/resize operation, log it to aid debugging.
    """
    exc = retry.get('exc')  # string-ified exception from compute
    if not exc:
        return  # no exception info from a previous attempt, skip

    hosts = retry.get('hosts', None)
    if not hosts:
        return  # no previously attempted hosts, skip

    last_host, last_node = hosts[-1]
    LOG.error(_LE('Error from last host: %(last_host)s (node %(last_node)s):'
                  ' %(exc)s'),
              {'last_host': last_host,
               'last_node': last_node,
               'exc': exc},
              instance_uuid=instance_uuid)


def _max_attempts():
    max_attempts = CONF.scheduler_max_attempts
    if max_attempts < 1:
        raise exception.NovaException(_("Invalid value for "
            "'scheduler_max_attempts', must be >= 1"))
    return max_attempts


def _add_retry_host(filter_properties, host, node):
    """Add a retry entry for the selected compute node. In the event that
    the request gets re-scheduled, this entry will signal that the given
    node has already been tried.
    """
    retry = filter_properties.get('retry', None)
    if not retry:
        return
    hosts = retry['hosts']
    hosts.append([host, node])


def parse_options(opts, sep='=', converter=str, name=""):
    """Parse a list of options, each in the format of <key><sep><value>. Also
    use the converter to convert the value into desired type.

    :param opts: list of options, e.g. from oslo_config.cfg.ListOpt
    :param sep: the separator
    :param converter: callable object to convert the value, should raise
                      ValueError for conversion failure
    :param name: name of the option

    :returns: a list of (key, converted_value) tuples
    """
    good = []
    bad = []
    for opt in opts:
        try:
            key, seen_sep, value = opt.partition(sep)
            value = converter(value)
        except ValueError:
            key = None
            value = None
        if key and seen_sep and value is not None:
            good.append((key, value))
        else:
            bad.append(opt)
    if bad:
        LOG.warning(_LW("Ignoring the invalid elements of the option "
                        "%(name)s: %(options)s"),
                    {'name': name,
                     'options': ", ".join(bad)})
    return good


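# Example behaviour of parse_options (illustrative; the option values and
# the option name are made up):
#
#     parse_options(['cpu=1.5', 'ram=2.0', 'bogus'],
#                   converter=float, name='weight_ratios')
#     # -> [('cpu', 1.5), ('ram', 2.0)]; 'bogus' is logged and ignored.

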
def validate_filter(filter):
    """Return whether the filter is configured in the default filters."""
    return filter in CONF.scheduler_default_filters


_SUPPORTS_AFFINITY = None
_SUPPORTS_ANTI_AFFINITY = None


def _get_group_details(context, instance_uuid, user_group_hosts=None):
    """Provide group_hosts and group_policies sets related to instances if
    those instances belong to a group and if the corresponding filters are
    enabled.

    :param instance_uuid: UUID of the instance to check
    :param user_group_hosts: Hosts from the group or empty set

    :returns: None or namedtuple GroupDetails
    """
    global _SUPPORTS_AFFINITY
    if _SUPPORTS_AFFINITY is None:
        _SUPPORTS_AFFINITY = validate_filter(
            'ServerGroupAffinityFilter')
    global _SUPPORTS_ANTI_AFFINITY
    if _SUPPORTS_ANTI_AFFINITY is None:
        _SUPPORTS_ANTI_AFFINITY = validate_filter(
            'ServerGroupAntiAffinityFilter')
    _supports_server_groups = any((_SUPPORTS_AFFINITY,
                                   _SUPPORTS_ANTI_AFFINITY))
    if not _supports_server_groups or not instance_uuid:
        return

    try:
        group = objects.InstanceGroup.get_by_instance_uuid(context,
                                                           instance_uuid)
    except exception.InstanceGroupNotFound:
        return

    policies = set(('anti-affinity', 'affinity'))
    if any((policy in policies) for policy in group.policies):
        if not _SUPPORTS_AFFINITY and 'affinity' in group.policies:
            msg = _("ServerGroupAffinityFilter not configured")
            LOG.error(msg)
            raise exception.UnsupportedPolicyException(reason=msg)
        if not _SUPPORTS_ANTI_AFFINITY and 'anti-affinity' in group.policies:
            msg = _("ServerGroupAntiAffinityFilter not configured")
            LOG.error(msg)
            raise exception.UnsupportedPolicyException(reason=msg)
        group_hosts = set(group.get_hosts())
        user_hosts = set(user_group_hosts) if user_group_hosts else set()
        return GroupDetails(hosts=user_hosts | group_hosts,
                            policies=group.policies)


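# Sketch of the result for a grouped instance (illustrative only; the
# policy and host names are made up):
#
#     details = _get_group_details(context, instance_uuid)
#     # For an instance in a group with the 'anti-affinity' policy whose
#     # members run on host1 and host2, and with the corresponding filter
#     # enabled, this returns:
#     # GroupDetails(hosts={'host1', 'host2'}, policies=['anti-affinity'])

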
def setup_instance_group(context, request_spec, filter_properties):
    """Add group_hosts and group_policies fields to filter_properties dict
    based on instance uuids provided in request_spec, if those instances
    belong to a group.

    :param request_spec: Request spec
    :param filter_properties: Filter properties
    """
    group_hosts = filter_properties.get('group_hosts')
    # NOTE(sbauza) If there are multiple instance UUIDs, it's a boot
    # request and they will all be in the same group, so it's safe to
    # only check the first one.
    instance_uuid = request_spec.get('instance_properties', {}).get('uuid')
    group_info = _get_group_details(context, instance_uuid, group_hosts)
    if group_info is not None:
        filter_properties['group_updated'] = True
        filter_properties['group_hosts'] = group_info.hosts
        filter_properties['group_policies'] = group_info.policies


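# Sketch of the effect on filter_properties (illustrative only):
#
#     filter_properties = {}
#     setup_instance_group(context, request_spec, filter_properties)
#     # If the instance belongs to a group, filter_properties now holds
#     # 'group_updated', 'group_hosts' and 'group_policies'; otherwise it
#     # is left untouched.

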
def retry_on_timeout(retries=1):
    """Retry the call in case a MessagingTimeout is raised.

    A decorator for retrying calls when a service dies mid-request.

    :param retries: Number of retries
    :returns: Decorator
    """
    def outer(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            attempt = 0
            while True:
                try:
                    return func(*args, **kwargs)
                except messaging.MessagingTimeout:
                    attempt += 1
                    if attempt <= retries:
                        LOG.warning(_LW(
                            "Retrying %(name)s after a MessagingTimeout, "
                            "attempt %(attempt)s of %(retries)s."),
                            {'attempt': attempt, 'retries': retries,
                             'name': func.__name__})
                    else:
                        raise
        return wrapped
    return outer


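# Usage sketch for the decorator (the decorated function below is
# hypothetical):
#
#     @retry_on_timeout(retries=2)
#     def call_scheduler(*args, **kwargs):
#         ...  # an RPC call that may raise messaging.MessagingTimeout
#
# ``retry_select_destinations`` below is a pre-built variant sized to the
# configured maximum number of scheduling attempts.
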
retry_select_destinations = retry_on_timeout(_max_attempts() - 1)