Fixes bug 844160.

Makes the servers create API call work with all schedulers, removes
'zone boot', and folds create_instance_helper back into the servers
controller. Notable changes:

1) The compute API's create_at_all_once has been removed. It was only
   used by zone boot.
2) The compute API's create() no longer creates instance DB entries;
   the schedulers now do this. This makes sense, as only the schedulers
   know where the instances will be placed: they could be placed
   locally or in a child zone. However, this comes at a cost:
   compute_api.create() now does a 'call' to the scheduler instead of a
   'cast' in most cases (* see below), so that it can receive the IDs
   of the instances the scheduler created. Ultimately, we probably need
   a way to generate UUIDs before scheduling and return only the
   information we know about an instance before it is actually
   scheduled and created; we could then revert this back to a cast.
   (Or maybe we always return a reservation ID instead of an instance.)
3) scheduler* calls no longer return a host. They return a value that
   is passed back to the caller if it did an rpc.call(); the casts to
   hosts are now done by the scheduler drivers themselves.
4) The OS API has an undocumented feature that allows multiple
   instances to be built; I've kept it.
5) If compute_api.create() is creating multiple instances, only a
   single call is made to the scheduler, versus the old way of sending
   many casts. All schedulers now check how many instances have been
   requested.
6) I've added an undocumented option 'return_reservation_id' when
   building. If set to True, only a reservation ID is returned to the
   API caller, not the instance. This essentially gives you the old
   'nova zone-boot' functionality (see the sketch below).
7) It was requested that I create a stub for a zones extension, so
   you'll see the empty extension in here. We'll move some code to it
   later.
8) Fixes an unrelated bug that recently merged into trunk, where zones
   DB calls were no longer always being done with admin context.
9) Scheduler calls were always being made with admin context when they
   should elevate only when needed.
10) Moved the stub_network flag so individual tests can run again.

* Case #6 above doesn't wait for the scheduler response with instance
  IDs; it does a 'cast' instead.

Change-Id: Ic040780a2e86d7330e225f14056dadbaa9fb3c7e
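For illustration, here is a minimal, runnable sketch of the call/cast
split described in items 2 and 6 above. It is not code from this
change: the helpers _cast and _call and the 'run_instance' method name
are hypothetical stand-ins for the real RPC plumbing.

    import uuid

    def _cast(topic, method, args):
        # Stand-in for rpc.cast(): fire-and-forget, returns nothing.
        print('cast -> %s.%s' % (topic, method))

    def _call(topic, method, args):
        # Stand-in for rpc.call(): blocks and returns the remote result.
        print('call -> %s.%s' % (topic, method))
        return [{'id': 1}, {'id': 2}]  # pretend the scheduler made these

    def create(request_spec, return_reservation_id=False):
        reservation_id = 'r-%s' % uuid.uuid4().hex[:8]
        request_spec['reservation_id'] = reservation_id
        if return_reservation_id:
            # Old 'nova zone-boot' behavior: cast and hand back only the
            # reservation ID; nobody waits for instance IDs (case #6).
            _cast('scheduler', 'run_instance', request_spec)
            return reservation_id
        # Normal path after this change: block on the scheduler so the
        # API can return the instances the scheduler actually created.
        return _call('scheduler', 'run_instance', request_spec)

    print(create({'num_instances': 2}))
    print(create({'num_instances': 2}, return_reservation_id=True))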
# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright (c) 2011 Zadara Storage Inc.
# Copyright (c) 2011 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

"""
VSA Simple Scheduler
"""

from nova import context
from nova import db
from nova import flags
from nova import log as logging
from nova import rpc
from nova.scheduler import driver
from nova.scheduler import simple
from nova.vsa.api import VsaState
from nova.volume import volume_types

LOG = logging.getLogger('nova.scheduler.vsa')

FLAGS = flags.FLAGS
flags.DEFINE_integer('drive_type_approx_capacity_percent', 10,
                     'The percentage range for capacity comparison')
flags.DEFINE_integer('vsa_unique_hosts_per_alloc', 10,
                     'The number of unique hosts per storage allocation')
flags.DEFINE_boolean('vsa_select_unique_drives', True,
                     'If True, place each drive on a different host '
                     'where possible')


def BYTES_TO_GB(bytes):
    return bytes >> 30


def GB_TO_BYTES(gb):
    return gb << 30


class VsaScheduler(simple.SimpleScheduler):
    """Implements Scheduler for volume placement."""

    def __init__(self, *args, **kwargs):
        super(VsaScheduler, self).__init__(*args, **kwargs)
        self._notify_all_volume_hosts("startup")

    def _notify_all_volume_hosts(self, event):
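        """Fanout-cast a notification event to all nova-volume hosts."""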
        rpc.fanout_cast(context.get_admin_context(),
                        FLAGS.volume_topic,
                        {"method": "notification",
                         "args": {"event": event}})

    def _qosgrp_match(self, drive_type, qos_values):
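        """Check whether a reported QoS group matches the drive type.

        Names are compared case-insensitively; capacities must fall
        within +/- drive_type_approx_capacity_percent percent of the
        requested size.
        """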

        def _compare_names(str1, str2):
            return str1.lower() == str2.lower()

        def _compare_sizes_approxim(cap_capacity, size):
            cap_capacity = BYTES_TO_GB(int(cap_capacity))
            size = int(size)
            size_perc = size * \
                FLAGS.drive_type_approx_capacity_percent / 100

            return size - size_perc <= cap_capacity <= size + size_perc

        # Add more entries for additional comparisons
        compare_list = [{'cap1': 'DriveType',
                         'cap2': 'type',
                         'cmp_func': _compare_names},
                        {'cap1': 'DriveCapacity',
                         'cap2': 'size',
                         'cmp_func': _compare_sizes_approxim}]

        for cap in compare_list:
            if not (cap['cap1'] in qos_values and
                    cap['cap2'] in drive_type and
                    cap['cmp_func'] is not None and
                    cap['cmp_func'](qos_values[cap['cap1']],
                                    drive_type[cap['cap2']])):
                return False
        return True

    def _get_service_states(self):
        return self.zone_manager.service_states

    def _filter_hosts(self, topic, request_spec, host_list=None):
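        """Return (host, capabilities) pairs that can serve drive_type.

        Only hosts whose volume service reports a matching QoS group
        with available capacity are kept.
        """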

        LOG.debug(_("_filter_hosts: %(request_spec)s"), locals())

        drive_type = request_spec['drive_type']
        LOG.debug(_("Filter hosts for drive type %s"), drive_type['name'])

        if host_list is None:
            host_list = self._get_service_states().iteritems()

        filtered_hosts = []     # list of (hostname, capability_dict)
        for host, host_dict in host_list:
            for service_name, service_dict in host_dict.iteritems():
                if service_name != topic:
                    continue

                qos_info = service_dict.get('drive_qos_info', {})
                for qosgrp, qos_values in qos_info.iteritems():
                    if self._qosgrp_match(drive_type, qos_values):
                        if qos_values['AvailableCapacity'] > 0:
                            filtered_hosts.append((host, qos_info))
                        else:
                            LOG.debug(_("Host %s has no free capacity. Skip"),
                                      host)
                        break

        host_names = [item[0] for item in filtered_hosts]
        LOG.debug(_("Filter hosts: %s"), host_names)
        return filtered_hosts

    def _allowed_to_use_host(self, host, selected_hosts, unique):
        return (not unique or
                host not in [item[0] for item in selected_hosts])

    def _add_hostcap_to_list(self, selected_hosts, host, cap):
        if host not in [item[0] for item in selected_hosts]:
            selected_hosts.append((host, cap))

    def host_selection_algorithm(self, request_spec, all_hosts,
                                 selected_hosts, unique):
        """Must override this method for VSA scheduler to work."""
        raise NotImplementedError(_("Must implement host selection mechanism"))

    def _select_hosts(self, request_spec, all_hosts, selected_hosts=None):
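        """Select a (host, qos_capability) pair for one drive.

        Once vsa_unique_hosts_per_alloc hosts have been used, reuse of
        an already-selected host is attempted before falling back to the
        full host list. Raises WillNotSchedule if no host is available.
        """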

        if selected_hosts is None:
            selected_hosts = []

        host = None
        if len(selected_hosts) >= FLAGS.vsa_unique_hosts_per_alloc:
            # Try to select from the already-selected hosts only.
            LOG.debug(_("Maximum number of hosts selected (%d)"),
                      len(selected_hosts))
            unique = False
            (host, qos_cap) = self.host_selection_algorithm(request_spec,
                                                            selected_hosts,
                                                            selected_hosts,
                                                            unique)

            LOG.debug(_("Selected excessive host %(host)s"), locals())
        else:
            unique = FLAGS.vsa_select_unique_drives

        if host is None:
            # Either we haven't tried yet (# of selected hosts < max,
            # unique=True), or we failed to reuse a selected host
            # (unique=False): select from all hosts.
            (host, qos_cap) = self.host_selection_algorithm(request_spec,
                                                            all_hosts,
                                                            selected_hosts,
                                                            unique)
        if host is None:
            raise driver.WillNotSchedule(_("No available hosts"))

        return (host, qos_cap)

    def _provision_volume(self, context, vol, vsa_id, availability_zone):
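        """Create the volume DB entry and cast create_volume to its host."""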

        if availability_zone is None:
            availability_zone = FLAGS.storage_availability_zone

        options = {
            'size': vol['size'],
            'user_id': context.user_id,
            'project_id': context.project_id,
            'snapshot_id': None,
            'availability_zone': availability_zone,
            'status': "creating",
            'attach_status': "detached",
            'display_name': vol['name'],
            'display_description': vol['description'],
            'volume_type_id': vol['volume_type_id'],
            'metadata': dict(to_vsa_id=vsa_id),
            }

        size = vol['size']
        host = vol['host']
        name = vol['name']
        LOG.debug(_("Provision volume %(name)s of size %(size)s GB on "
                    "host %(host)s"), locals())

        volume_ref = db.volume_create(context.elevated(), options)
        driver.cast_to_volume_host(context, vol['host'],
                                   'create_volume',
                                   volume_id=volume_ref['id'],
                                   snapshot_id=None)

    def _check_host_enforcement(self, context, availability_zone):
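        """Return the host forced via an admin-only 'zone:host' AZ string.

        Returns None when no host is forced; raises WillNotSchedule when
        the forced host's volume service is not up.
        """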
        if (availability_zone
                and ':' in availability_zone
                and context.is_admin):
            zone, _x, host = availability_zone.partition(':')
            service = db.service_get_by_args(context.elevated(), host,
                                             'nova-volume')
            if not self.service_is_up(service):
                raise driver.WillNotSchedule(_("Host %s not available") % host)

            return host
        else:
            return None

    def _assign_hosts_to_volumes(self, context, volume_params, forced_host):
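        """Assign a host (and its QoS capabilities) to every volume.

        The candidate host list is recomputed only when the volume type
        changes between consecutive volumes, and consumed capacity is
        accounted for as each host is picked.
        """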

        prev_volume_type_id = None
        request_spec = {}
        selected_hosts = []

        LOG.debug(_("volume_params %(volume_params)s") % locals())

        i = 1
        for vol in volume_params:
            name = vol['name']
            LOG.debug(_("%(i)d: Volume %(name)s"), locals())
            i += 1

            if forced_host:
                vol['host'] = forced_host
                vol['capabilities'] = None
                continue

            volume_type_id = vol['volume_type_id']
            request_spec['size'] = vol['size']

            if prev_volume_type_id is None or \
               prev_volume_type_id != volume_type_id:
                # Generate the list of hosts for this drive type.
                volume_type = volume_types.get_volume_type(context,
                                                           volume_type_id)
                drive_type = {
                    'name': volume_type['extra_specs'].get('drive_name'),
                    'type': volume_type['extra_specs'].get('drive_type'),
                    'size': int(volume_type['extra_specs'].get('drive_size')),
                    'rpm': volume_type['extra_specs'].get('drive_rpm'),
                    }
                request_spec['drive_type'] = drive_type

                all_hosts = self._filter_hosts("volume", request_spec)
                prev_volume_type_id = volume_type_id

            (host, qos_cap) = self._select_hosts(request_spec,
                                                 all_hosts, selected_hosts)
            vol['host'] = host
            vol['capabilities'] = qos_cap
            self._consume_resource(qos_cap, vol['size'], -1)

    def schedule_create_volumes(self, context, request_spec,
                                availability_zone=None, *_args, **_kwargs):
        """Picks hosts for hosting multiple volumes."""
        num_volumes = request_spec.get('num_volumes')
        LOG.debug(_("Attempting to spawn %(num_volumes)d volume(s)") %
                  locals())

        vsa_id = request_spec.get('vsa_id')
        volume_params = request_spec.get('volumes')

        host = self._check_host_enforcement(context, availability_zone)

        try:
            self._print_capabilities_info()

            self._assign_hosts_to_volumes(context, volume_params, host)

            for vol in volume_params:
                self._provision_volume(context, vol, vsa_id,
                                       availability_zone)
        except Exception:
            LOG.exception(_("Error creating volumes"))
            if vsa_id:
                db.vsa_update(context, vsa_id, dict(status=VsaState.FAILED))

            for vol in volume_params:
                if 'capabilities' in vol:
                    self._consume_resource(vol['capabilities'],
                                           vol['size'], 1)
            raise

        return None

    def schedule_create_volume(self, context, volume_id, *_args, **_kwargs):
        """Picks the best host based on requested drive type capability."""
        volume_ref = db.volume_get(context, volume_id)

        host = self._check_host_enforcement(context,
                                            volume_ref['availability_zone'])
        if host:
            driver.cast_to_volume_host(context, host, 'create_volume',
                                       volume_id=volume_id, **_kwargs)
            return None

        volume_type_id = volume_ref['volume_type_id']
        if volume_type_id:
            volume_type = volume_types.get_volume_type(context,
                                                       volume_type_id)

        if volume_type_id is None or \
           volume_types.is_vsa_volume(volume_type_id, volume_type):
            LOG.debug(_("Non-VSA volume %d"), volume_ref['id'])
            return super(VsaScheduler, self).schedule_create_volume(
                context, volume_id, *_args, **_kwargs)

        self._print_capabilities_info()

        drive_type = {
            'name': volume_type['extra_specs'].get('drive_name'),
            'type': volume_type['extra_specs'].get('drive_type'),
            'size': int(volume_type['extra_specs'].get('drive_size')),
            'rpm': volume_type['extra_specs'].get('drive_rpm'),
            }

        LOG.debug(_("Spawning volume %(volume_id)s with drive type "
                    "%(drive_type)s"), locals())

        request_spec = {'size': volume_ref['size'],
                        'drive_type': drive_type}
        hosts = self._filter_hosts("volume", request_spec)

        try:
            (host, qos_cap) = self._select_hosts(request_spec,
                                                 all_hosts=hosts)
        except Exception:
            LOG.exception(_("Error creating volume"))
            if volume_ref['to_vsa_id']:
                db.vsa_update(context, volume_ref['to_vsa_id'],
                              dict(status=VsaState.FAILED))
            raise

        if host:
            driver.cast_to_volume_host(context, host, 'create_volume',
                                       volume_id=volume_id, **_kwargs)

    def _consume_full_drive(self, qos_values, direction):
        qos_values['FullDrive']['NumFreeDrives'] += direction
        qos_values['FullDrive']['NumOccupiedDrives'] -= direction

    def _consume_partition(self, qos_values, size, direction):
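        """Account for one partition of `size` bytes being allocated
        (direction=-1) or freed (direction=1).

        Consuming the last free partition carves a full drive into new
        partitions; freeing a whole drive's worth of partitions returns
        the drive.
        """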

        if qos_values['PartitionDrive']['PartitionSize'] != 0:
            partition_size = qos_values['PartitionDrive']['PartitionSize']
        else:
            partition_size = size
        part_per_drive = qos_values['DriveCapacity'] / partition_size

        if direction == -1 and \
           qos_values['PartitionDrive']['NumFreePartitions'] == 0:
            self._consume_full_drive(qos_values, direction)
            qos_values['PartitionDrive']['NumFreePartitions'] += \
                part_per_drive

        qos_values['PartitionDrive']['NumFreePartitions'] += direction
        qos_values['PartitionDrive']['NumOccupiedPartitions'] -= direction

        if direction == 1 and \
           qos_values['PartitionDrive']['NumFreePartitions'] >= \
           part_per_drive:
            self._consume_full_drive(qos_values, direction)
            qos_values['PartitionDrive']['NumFreePartitions'] -= \
                part_per_drive

    def _consume_resource(self, qos_values, size, direction):
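        """Adjust capability counters for a volume of `size` GB.

        direction=-1 consumes capacity, direction=1 returns it. A size
        of 0 means the volume occupies a full drive.
        """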
        if qos_values is None:
            LOG.debug(_("No capability selected for volume of size %(size)s"),
                      locals())
            return

        if size == 0:   # full drive match
            qos_values['AvailableCapacity'] += (direction *
                                                qos_values['DriveCapacity'])
            self._consume_full_drive(qos_values, direction)
        else:
            qos_values['AvailableCapacity'] += direction * GB_TO_BYTES(size)
            self._consume_partition(qos_values, GB_TO_BYTES(size), direction)

    def _print_capabilities_info(self):
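        """Log the drive QoS capabilities reported by each volume host."""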
        host_list = self._get_service_states().iteritems()
        for host, host_dict in host_list:
            for service_name, service_dict in host_dict.iteritems():
                if service_name != "volume":
                    continue

                LOG.info(_("Host %s:"), host)

                qos_info = service_dict.get('drive_qos_info', {})
                for qosgrp, qos_values in qos_info.iteritems():
                    total = qos_values['TotalDrives']
                    used = qos_values['FullDrive']['NumOccupiedDrives']
                    free = qos_values['FullDrive']['NumFreeDrives']
                    avail = BYTES_TO_GB(qos_values['AvailableCapacity'])

                    LOG.info(_("\tDrive %(qosgrp)-25s: total %(total)2s, "
                               "used %(used)2s, free %(free)2s. Available "
                               "capacity %(avail)-5s"), locals())


class VsaSchedulerLeastUsedHost(VsaScheduler):
    """
    Implements a VSA scheduler that selects the host with the least used
    capacity of the requested drive type.
    """

    def __init__(self, *args, **kwargs):
        super(VsaSchedulerLeastUsedHost, self).__init__(*args, **kwargs)

    def host_selection_algorithm(self, request_spec, all_hosts,
                                 selected_hosts, unique):
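        """Return the qualifying (host, qos_values) pair with the least
        used capacity, or (None, None) if no host qualifies."""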
        size = request_spec['size']
        drive_type = request_spec['drive_type']
        best_host = None
        best_qoscap = None
        best_cap = None
        min_used = 0

        for (host, capabilities) in all_hosts:

            has_enough_capacity = False
            used_capacity = 0
            for qosgrp, qos_values in capabilities.iteritems():

                used_capacity = used_capacity + qos_values['TotalCapacity'] \
                    - qos_values['AvailableCapacity']

                if self._qosgrp_match(drive_type, qos_values):
                    # We found the required QoS group.
                    if size == 0:   # full drive match
                        if qos_values['FullDrive']['NumFreeDrives'] > 0:
                            has_enough_capacity = True
                            matched_qos = qos_values
                        else:
                            break
                    else:
                        if qos_values['AvailableCapacity'] >= size and \
                           (qos_values['PartitionDrive']
                            ['NumFreePartitions'] > 0 or
                            qos_values['FullDrive']['NumFreeDrives'] > 0):
                            has_enough_capacity = True
                            matched_qos = qos_values
                        else:
                            break

            if has_enough_capacity and \
               self._allowed_to_use_host(host, selected_hosts, unique) and \
               (best_host is None or used_capacity < min_used):

                min_used = used_capacity
                best_host = host
                best_qoscap = matched_qos
                best_cap = capabilities

        if best_host:
            self._add_hostcap_to_list(selected_hosts, best_host, best_cap)
            min_used = BYTES_TO_GB(min_used)
            LOG.debug(_("\t LeastUsedHost: Best host: %(best_host)s. "
                        "(used capacity %(min_used)s)"), locals())
        return (best_host, best_qoscap)


class VsaSchedulerMostAvailCapacity(VsaScheduler):
    """
    Implements a VSA scheduler that selects the host with the most
    available capacity of the requested drive type.
    """

    def __init__(self, *args, **kwargs):
        super(VsaSchedulerMostAvailCapacity, self).__init__(*args, **kwargs)

    def host_selection_algorithm(self, request_spec, all_hosts,
                                 selected_hosts, unique):
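        """Return the qualifying (host, qos_values) pair with the most
        available capacity: free drives for full-drive requests, free
        bytes otherwise. Returns (None, None) if no host qualifies."""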
        size = request_spec['size']
        drive_type = request_spec['drive_type']
        best_host = None
        best_qoscap = None
        best_cap = None
        max_avail = 0

        for (host, capabilities) in all_hosts:
            for qosgrp, qos_values in capabilities.iteritems():
                if self._qosgrp_match(drive_type, qos_values):
                    # We found the required QoS group.
                    if size == 0:   # full drive match
                        available = qos_values['FullDrive']['NumFreeDrives']
                    else:
                        available = qos_values['AvailableCapacity']

                    if available > max_avail and \
                       self._allowed_to_use_host(host,
                                                 selected_hosts,
                                                 unique):
                        max_avail = available
                        best_host = host
                        best_qoscap = qos_values
                        best_cap = capabilities
                    break   # go to the next host

        if best_host:
            self._add_hostcap_to_list(selected_hosts, best_host, best_cap)
            type_str = "drives" if size == 0 else "bytes"
            LOG.debug(_("\t MostAvailCap: Best host: %(best_host)s. "
                        "(available %(max_avail)s %(type_str)s)"), locals())

        return (best_host, best_qoscap)