364 lines
15 KiB
Python
364 lines
15 KiB
Python
# Copyright (c) 2011 Openstack, LLC.
|
|
# All Rights Reserved.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
"""
|
|
The DistributedScheduler is for creating instances locally or across zones.
|
|
You can customize this scheduler by specifying your own Host Filters and
|
|
Weighing Functions.
|
|
"""
|
|
|
|
import json
|
|
import operator
|
|
import types
|
|
|
|
import M2Crypto
|
|
|
|
from novaclient import v1_1 as novaclient
|
|
from novaclient import exceptions as novaclient_exceptions
|
|
|
|
from nova import crypto
|
|
from nova import db
|
|
from nova import exception
|
|
from nova import flags
|
|
from nova import log as logging
|
|
from nova import rpc
|
|
|
|
from nova.compute import api as compute_api
|
|
from nova.scheduler import api
|
|
from nova.scheduler import driver
|
|
from nova.scheduler import filters
|
|
from nova.scheduler import least_cost
|
|
|
|
|
|
FLAGS = flags.FLAGS
|
|
flags.DEFINE_list('default_host_filters', ['InstanceTypeFilter'],
|
|
'Which filters to use for filtering hosts when not specified '
|
|
'in the request.')
|
|
|
|
LOG = logging.getLogger('nova.scheduler.distributed_scheduler')
|
|
|
|
|
|
class InvalidBlob(exception.NovaException):
|
|
message = _("Ill-formed or incorrectly routed 'blob' data sent "
|
|
"to instance create request.")
|
|
|
|
|
|
class DistributedScheduler(driver.Scheduler):
|
|
"""Scheduler that can work across any nova deployment, from simple
|
|
deployments to multiple nested zones.
|
|
"""
|
|
|
|
def schedule(self, context, topic, method, *args, **kwargs):
|
|
"""The schedule() contract requires we return the one
|
|
best-suited host for this request.
|
|
|
|
NOTE: We're only focused on compute instances right now,
|
|
so this method will always raise NoValidHost()."""
|
|
msg = _("No host selection for %s defined." % topic)
|
|
raise driver.NoValidHost(msg)
|
|
|
|
def schedule_run_instance(self, context, request_spec, *args, **kwargs):
|
|
"""This method is called from nova.compute.api to provision
|
|
an instance. However we need to look at the parameters being
|
|
passed in to see if this is a request to:
|
|
1. Create build plan (a list of WeightedHosts) and then provision, or
|
|
2. Use the WeightedHost information in the request parameters
|
|
to simply create the instance (either in this zone or
|
|
a child zone).
|
|
|
|
returns a list of the instances created.
|
|
"""
|
|
|
|
elevated = context.elevated()
|
|
num_instances = request_spec.get('num_instances', 1)
|
|
LOG.debug(_("Attempting to build %(num_instances)d instance(s)") %
|
|
locals())
|
|
|
|
weighted_hosts = []
|
|
|
|
# Having a 'blob' hint means we've already provided a build plan.
|
|
# We need to turn this back into a WeightedHost object.
|
|
blob = request_spec.get('blob', None)
|
|
if blob:
|
|
weighted_hosts.append(self._make_weighted_host_from_blob(blob))
|
|
else:
|
|
# No plan ... better make one.
|
|
weighted_hosts = self._schedule(elevated, "compute", request_spec,
|
|
*args, **kwargs)
|
|
|
|
if not weighted_hosts:
|
|
raise driver.NoValidHost(_('No hosts were available'))
|
|
|
|
instances = []
|
|
for num in xrange(num_instances):
|
|
if not weighted_hosts:
|
|
break
|
|
weighted_host = weighted_hosts.pop(0)
|
|
|
|
instance = None
|
|
if weighted_host.host:
|
|
instance = self._provision_resource_locally(elevated,
|
|
weighted_host, request_spec, kwargs)
|
|
else:
|
|
instance = self._ask_child_zone_to_create_instance(elevated,
|
|
weighted_host, request_spec, kwargs)
|
|
|
|
if instance:
|
|
instances.append(instance)
|
|
|
|
return instances
|
|
|
|
def select(self, context, request_spec, *args, **kwargs):
|
|
"""Select returns a list of weights and zone/host information
|
|
corresponding to the best hosts to service the request. Any
|
|
internal zone information will be encrypted so as not to reveal
|
|
anything about our inner layout.
|
|
"""
|
|
elevated = context.elevated()
|
|
weighted_hosts = self._schedule(elevated, "compute", request_spec,
|
|
*args, **kwargs)
|
|
return [weighted_host.to_dict() for weighted_host in weighted_hosts]
|
|
|
|
def _call_zone_method(self, context, method, specs, zones):
|
|
"""Call novaclient zone method. Broken out for testing."""
|
|
return api.call_zone_method(context, method, specs=specs, zones=zones)
|
|
|
|
def _provision_resource_locally(self, context, weighted_host, request_spec,
|
|
kwargs):
|
|
"""Create the requested resource in this Zone."""
|
|
instance = self.create_instance_db_entry(context, request_spec)
|
|
driver.cast_to_compute_host(context, weighted_host.host,
|
|
'run_instance', instance_id=instance['id'], **kwargs)
|
|
return driver.encode_instance(instance, local=True)
|
|
|
|
def _make_weighted_host_from_blob(self, blob):
|
|
"""Returns the decrypted blob as a WeightedHost object
|
|
or None if invalid. Broken out for testing.
|
|
"""
|
|
decryptor = crypto.decryptor(FLAGS.build_plan_encryption_key)
|
|
try:
|
|
json_entry = decryptor(blob)
|
|
# Extract our WeightedHost values
|
|
wh_dict = json.loads(json_entry)
|
|
host = wh_dict.get('host', None)
|
|
blob = wh_dict.get('blob', None)
|
|
zone = wh_dict.get('zone', None)
|
|
return least_cost.WeightedHost(wh_dict['weight'],
|
|
host=host, blob=blob, zone=zone)
|
|
|
|
except M2Crypto.EVP.EVPError:
|
|
raise InvalidBlob()
|
|
|
|
def _ask_child_zone_to_create_instance(self, context, weighted_host,
|
|
request_spec, kwargs):
|
|
"""Once we have determined that the request should go to one
|
|
of our children, we need to fabricate a new POST /servers/
|
|
call with the same parameters that were passed into us.
|
|
This request is always for a single instance.
|
|
|
|
Note that we have to reverse engineer from our args to get back the
|
|
image, flavor, ipgroup, etc. since the original call could have
|
|
come in from EC2 (which doesn't use these things).
|
|
"""
|
|
instance_type = request_spec['instance_type']
|
|
instance_properties = request_spec['instance_properties']
|
|
|
|
name = instance_properties['display_name']
|
|
image_ref = instance_properties['image_ref']
|
|
meta = instance_properties['metadata']
|
|
flavor_id = instance_type['flavorid']
|
|
reservation_id = instance_properties['reservation_id']
|
|
files = kwargs['injected_files']
|
|
|
|
zone = db.zone_get(context.elevated(), weighted_host.zone)
|
|
zone_name = zone.name
|
|
url = zone.api_url
|
|
LOG.debug(_("Forwarding instance create call to zone '%(zone_name)s'. "
|
|
"ReservationID=%(reservation_id)s") % locals())
|
|
nova = None
|
|
try:
|
|
# This operation is done as the caller, not the zone admin.
|
|
nova = novaclient.Client(zone.username, zone.password, None, url,
|
|
token=context.auth_token,
|
|
region_name=zone_name)
|
|
nova.authenticate()
|
|
except novaclient_exceptions.BadRequest, e:
|
|
raise exception.NotAuthorized(_("Bad credentials attempting "
|
|
"to talk to zone at %(url)s.") % locals())
|
|
# NOTE(Vek): Novaclient has two different calling conventions
|
|
# for this call, depending on whether you're using
|
|
# 1.0 or 1.1 API: in 1.0, there's an ipgroups
|
|
# argument after flavor_id which isn't present in
|
|
# 1.1. To work around this, all the extra
|
|
# arguments are passed as keyword arguments
|
|
# (there's a reasonable default for ipgroups in the
|
|
# novaclient call).
|
|
instance = nova.servers.create(name, image_ref, flavor_id,
|
|
meta=meta, files=files,
|
|
zone_blob=weighted_host.blob,
|
|
reservation_id=reservation_id)
|
|
return driver.encode_instance(instance._info, local=False)
|
|
|
|
def _adjust_child_weights(self, child_results, zones):
|
|
"""Apply the Scale and Offset values from the Zone definition
|
|
to adjust the weights returned from the child zones. Returns
|
|
a list of WeightedHost objects: [WeightedHost(), ...]
|
|
"""
|
|
weighted_hosts = []
|
|
for zone_id, result in child_results:
|
|
if not result:
|
|
continue
|
|
|
|
for zone_rec in zones:
|
|
if zone_rec['id'] != zone_id:
|
|
continue
|
|
for item in result:
|
|
try:
|
|
offset = zone_rec['weight_offset']
|
|
scale = zone_rec['weight_scale']
|
|
raw_weight = item['weight']
|
|
cooked_weight = offset + scale * raw_weight
|
|
|
|
weighted_hosts.append(least_cost.WeightedHost(
|
|
host=None, weight=cooked_weight,
|
|
zone=zone_id, blob=item['blob']))
|
|
except KeyError:
|
|
LOG.exception(_("Bad child zone scaling values "
|
|
"for Zone: %(zone_id)s") % locals())
|
|
return weighted_hosts
|
|
|
|
def _zone_get_all(self, context):
|
|
"""Broken out for testing."""
|
|
return db.zone_get_all(context)
|
|
|
|
def _schedule(self, elevated, topic, request_spec, *args, **kwargs):
|
|
"""Returns a list of hosts that meet the required specs,
|
|
ordered by their fitness.
|
|
"""
|
|
if topic != "compute":
|
|
msg = _("Scheduler only understands Compute nodes (for now)")
|
|
raise NotImplementedError(msg)
|
|
|
|
instance_type = request_spec.get("instance_type", None)
|
|
if not instance_type:
|
|
msg = _("Scheduler only understands InstanceType-based" \
|
|
"provisioning.")
|
|
raise NotImplementedError(msg)
|
|
|
|
ram_requirement_mb = instance_type['memory_mb']
|
|
disk_requirement_bg = instance_type['local_gb']
|
|
|
|
# Find our local list of acceptable hosts by repeatedly
|
|
# filtering and weighing our options. Each time we choose a
|
|
# host, we virtually consume resources on it so subsequent
|
|
# selections can adjust accordingly.
|
|
|
|
# unfiltered_hosts_dict is {host : ZoneManager.HostInfo()}
|
|
unfiltered_hosts_dict = self.zone_manager.get_all_host_data(elevated)
|
|
unfiltered_hosts = unfiltered_hosts_dict.items()
|
|
|
|
num_instances = request_spec.get('num_instances', 1)
|
|
selected_hosts = []
|
|
for num in xrange(num_instances):
|
|
# Filter local hosts based on requirements ...
|
|
filtered_hosts = self._filter_hosts(topic, request_spec,
|
|
unfiltered_hosts)
|
|
|
|
if not filtered_hosts:
|
|
# Can't get any more locally.
|
|
break
|
|
|
|
LOG.debug(_("Filtered %(filtered_hosts)s") % locals())
|
|
|
|
# weighted_host = WeightedHost() ... the best
|
|
# host for the job.
|
|
weighted_host = least_cost.weigh_hosts(request_spec,
|
|
filtered_hosts)
|
|
LOG.debug(_("Weighted %(weighted_host)s") % locals())
|
|
selected_hosts.append(weighted_host)
|
|
|
|
# Now consume the resources so the filter/weights
|
|
# will change for the next instance.
|
|
weighted_host.hostinfo.consume_resources(disk_requirement_bg,
|
|
ram_requirement_mb)
|
|
|
|
# Next, tack on the host weights from the child zones
|
|
json_spec = json.dumps(request_spec)
|
|
all_zones = self._zone_get_all(elevated)
|
|
child_results = self._call_zone_method(elevated, "select",
|
|
specs=json_spec, zones=all_zones)
|
|
selected_hosts.extend(self._adjust_child_weights(
|
|
child_results, all_zones))
|
|
selected_hosts.sort(key=operator.attrgetter('weight'))
|
|
return selected_hosts[:num_instances]
|
|
|
|
def _get_filter_classes(self):
|
|
# Imported here to avoid circular imports
|
|
from nova.scheduler import filters
|
|
|
|
def get_itm(nm):
|
|
return getattr(filters, nm)
|
|
|
|
return [get_itm(itm) for itm in dir(filters)
|
|
if (type(get_itm(itm)) is types.TypeType)
|
|
and issubclass(get_itm(itm), filters.AbstractHostFilter)
|
|
and get_itm(itm) is not filters.AbstractHostFilter]
|
|
|
|
def _choose_host_filters(self, filters=None):
|
|
"""Since the caller may specify which filters to use we need
|
|
to have an authoritative list of what is permissible. This
|
|
function checks the filter names against a predefined set
|
|
of acceptable filters.
|
|
"""
|
|
if not filters:
|
|
filters = FLAGS.default_host_filters
|
|
if not isinstance(filters, (list, tuple)):
|
|
filters = [filters]
|
|
good_filters = []
|
|
bad_filters = []
|
|
filter_classes = self._get_filter_classes()
|
|
for filter_name in filters:
|
|
found_class = False
|
|
for cls in filter_classes:
|
|
if cls.__name__ == filter_name:
|
|
good_filters.append(cls())
|
|
found_class = True
|
|
break
|
|
if not found_class:
|
|
bad_filters.append(filter_name)
|
|
if bad_filters:
|
|
msg = ", ".join(bad_filters)
|
|
raise exception.SchedulerHostFilterNotFound(filter_name=msg)
|
|
return good_filters
|
|
|
|
def _filter_hosts(self, topic, request_spec, hosts=None):
|
|
"""Filter the full host list. hosts = [(host, HostInfo()), ...].
|
|
This method returns a subset of hosts, in the same format."""
|
|
selected_filters = self._choose_host_filters()
|
|
|
|
# TODO(sandy): We're only using InstanceType-based specs
|
|
# currently. Later we'll need to snoop for more detailed
|
|
# host filter requests.
|
|
instance_type = request_spec.get("instance_type", None)
|
|
if instance_type is None:
|
|
# No way to select; return the specified hosts.
|
|
return hosts
|
|
|
|
for selected_filter in selected_filters:
|
|
query = selected_filter.instance_type_to_filter(instance_type)
|
|
hosts = selected_filter.filter_hosts(hosts, query)
|
|
|
|
return hosts
|