VSA: first cut. merged with 1279

This commit is contained in:
vladimir.p
2011-07-15 17:56:27 -07:00
parent 6327b951d9
commit e34f8d88cd
14 changed files with 1439 additions and 4 deletions

View File

@@ -95,6 +95,7 @@ Tushar Patil <tushar.vitthal.patil@gmail.com>
Vasiliy Shlykov <vash@vasiliyshlykov.org>
Vishvananda Ishaya <vishvananda@gmail.com>
Vivek Y S <vivek.ys@gmail.com>
Vladimir Popovski <vladimir@zadarastorage.com>
William Wolf <throughnothing@gmail.com>
Yoshiaki Tamura <yoshi@midokura.jp>
Youcef Laribi <Youcef.Laribi@eu.citrix.com>

0
bin/nova-api Executable file → Normal file
View File

View File

@@ -62,6 +62,10 @@ import sys
import time
import tempfile
import zipfile
import ast
# If ../nova/__init__.py exists, add ../ to Python search path, so that
# it will override what happens to be installed in /usr/(local/)lib/python...
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
@@ -84,6 +88,7 @@ from nova import rpc
from nova import utils
from nova import version
from nova.api.ec2 import ec2utils
from nova.api.ec2 import cloud
from nova.auth import manager
from nova.cloudpipe import pipelib
from nova.compute import instance_types
@@ -870,6 +875,243 @@ class VersionCommands(object):
(version.version_string(), version.version_string_with_vcs())
class VsaCommands(object):
"""Methods for dealing with VSAs"""
def __init__(self, *args, **kwargs):
self.controller = cloud.CloudController()
self.manager = manager.AuthManager()
# VP-TMP Solution for APIs. Project should be provided per API call
#self.context = context.get_admin_context()
try:
project = self.manager.get_projects().pop()
except IndexError:
print (_("No projects defined"))
raise
self.context = context.RequestContext(user=project.project_manager,
project=project)
def _list(self, vsas):
format_str = "%-5s %-15s %-25s %-30s %-5s %-10s %-10s %-10s %10s"
if len(vsas):
print format_str %\
(_('ID'),
_('vsa_id'),
_('displayName'),
_('description'),
_('count'),
_('vc_type'),
_('status'),
_('AZ'),
_('createTime'))
for vsa in vsas:
print format_str %\
(vsa['vsaId'],
vsa['name'],
vsa['displayName'],
vsa['displayDescription'],
vsa['vcCount'],
vsa['vcType'],
vsa['status'],
vsa['availabilityZone'],
str(vsa['createTime']))
def create(self, storage='[]', name=None, description=None, vc_count=1,
instance_type_name=None, image_name=None, shared=None,
az=None):
"""Create a VSA.
args: [storage] [name] [description] [vc_count]
[instance_type] [image_name] [--shared|--full_drives]
[availability_zone]
where <storage> is a string representing list of dictionaries
in the following format:
[{'drive_name': 'type', 'num_drives': N, 'size': M},..]
"""
# Sanity check for storage string
storage_list = []
if storage is not None:
try:
storage_list = ast.literal_eval(storage)
except:
print _("Invalid string format %s") % storage
raise
for node in storage_list:
if ('drive_name' not in node) or ('num_drives' not in node):
print (_("Invalid string format for element %s. " \
"Expecting keys 'drive_name' & 'num_drives'"),
str(node))
raise KeyError
if instance_type_name == '':
instance_type_name = None
if shared is None or shared == "--full_drives":
shared = False
elif shared == "--shared":
shared = True
else:
raise ValueError(_('Shared parameter should be set either to "\
"--shared or --full_drives'))
values = {
'display_name': name,
'display_description': description,
'vc_count': int(vc_count),
'vc_type': instance_type_name,
'image_name': image_name,
'storage': storage_list,
'shared': shared,
'placement': {'AvailabilityZone': az}
}
result = self.controller.create_vsa(self.context, **values)
self._list(result['vsaSet'])
def update(self, vsa_id, name=None, description=None, vc_count=None):
"""Updates name/description of vsa and number of VCs
args: vsa_id [display_name] [display_description] [vc_count]"""
values = {}
if name is not None:
values['display_name'] = name
if description is not None:
values['display_description'] = description
if vc_count is not None:
values['vc_count'] = int(vc_count)
self.controller.update_vsa(self.context, vsa_id, **values)
def delete(self, vsa_id):
"""Delete a vsa
args: vsa_id"""
self.controller.delete_vsa(self.context, vsa_id)
def list(self, vsa_id=None):
"""Describe all available VSAs (or particular one)
args: [vsa_id]"""
if vsa_id is not None:
vsa_id = [vsa_id]
result = self.controller.describe_vsas(self.context, vsa_id)
self._list(result['vsaSet'])
class VsaDriveTypeCommands(object):
"""Methods for dealing with VSA drive types"""
def __init__(self, *args, **kwargs):
super(VsaDriveTypeCommands, self).__init__(*args, **kwargs)
def _list(self, drives):
format_str = "%-5s %-30s %-10s %-10s %-10s %-20s %-10s %s"
if len(drives):
print format_str %\
(_('ID'),
_('name'),
_('type'),
_('size_gb'),
_('rpm'),
_('capabilities'),
_('visible'),
_('createTime'))
for drive in drives:
print format_str %\
(str(drive['id']),
drive['name'],
drive['type'],
str(drive['size_gb']),
drive['rpm'],
drive['capabilities'],
str(drive['visible']),
str(drive['created_at']))
def create(self, type, size_gb, rpm, capabilities='',
visible=None, name=None):
"""Create drive type.
args: type size_gb rpm [capabilities] [--show|--hide] [custom_name]
"""
if visible is None or visible == "--show":
visible = True
elif visible == "--hide":
visible = False
else:
raise ValueError(_('Visible parameter should be set to --show '\
'or --hide'))
values = {
'type': type,
'size_gb': int(size_gb),
'rpm': rpm,
'capabilities': capabilities,
'visible': visible,
'name': name
}
result = self.controller.create_drive_type(context.get_admin_context(),
**values)
self._list(result['driveTypeSet'])
def delete(self, name):
"""Delete drive type
args: name"""
self.controller.delete_drive_type(context.get_admin_context(), name)
def rename(self, name, new_name=None):
"""Rename drive type
args: name [new_name]"""
self.controller.rename_drive_type(context.get_admin_context(),
name, new_name)
def list(self, visible=None, name=None):
"""Describe all available VSA drive types (or particular one)
args: [--all] [drive_name]"""
visible = False if visible == "--all" else True
if name is not None:
name = [name]
result = self.controller.describe_drive_types(
context.get_admin_context(), name, visible)
self._list(result['driveTypeSet'])
def update(self, name, type=None, size_gb=None, rpm=None,
capabilities='', visible=None):
"""Update drive type.
args: name [type] [size_gb] [rpm] [capabilities] [--show|--hide]
"""
if visible is None or visible == "--show":
visible = True
elif visible == "--hide":
visible = False
else:
raise ValueError(_('Visible parameter should be set to --show '\
'or --hide'))
values = {
'type': type,
'size_gb': size_gb,
'rpm': rpm,
'capabilities': capabilities,
'visible': visible
}
self.controller.update_drive_type(context.get_admin_context(),
name, **values)
class VolumeCommands(object):
"""Methods for dealing with a cloud in an odd state"""
@@ -1214,6 +1456,7 @@ CATEGORIES = [
('agent', AgentBuildCommands),
('config', ConfigCommands),
('db', DbCommands),
('drive', VsaDriveTypeCommands),
('fixed', FixedIpCommands),
('flavor', InstanceTypeCommands),
('floating', FloatingIpCommands),
@@ -1229,7 +1472,8 @@ CATEGORIES = [
('version', VersionCommands),
('vm', VmCommands),
('volume', VolumeCommands),
('vpn', VpnCommands)]
('vpn', VpnCommands),
('vsa', VsaCommands)]
def lazy_match(name, key_value_tuples):
@@ -1295,6 +1539,10 @@ def main():
action, fn = matches[0]
# call the action with the remaining arguments
try:
for arg in sys.argv:
if arg == '-h' or arg == '--help':
print "%s %s: %s" % (category, action, fn.__doc__)
sys.exit(0)
fn(*argv)
sys.exit(0)
except TypeError:

0
bin/nova-vncproxy Executable file → Normal file
View File

View File

@@ -311,6 +311,10 @@ class VolumeNotFoundForInstance(VolumeNotFound):
message = _("Volume not found for instance %(instance_id)s.")
class VolumeNotFoundForVsa(VolumeNotFound):
message = _("Volume not found for vsa %(vsa_id)s.")
class SnapshotNotFound(NotFound):
message = _("Snapshot %(snapshot_id)s could not be found.")
@@ -682,3 +686,19 @@ class PasteConfigNotFound(NotFound):
class PasteAppNotFound(NotFound):
message = _("Could not load paste app '%(name)s' from %(path)s")
class VirtualStorageArrayNotFound(NotFound):
message = _("Virtual Storage Array %(id)d could not be found.")
class VirtualStorageArrayNotFoundByName(NotFound):
message = _("Virtual Storage Array %(name)s could not be found.")
class VirtualDiskTypeNotFound(NotFound):
message = _("Drive Type %(id)d could not be found.")
class VirtualDiskTypeNotFoundByName(NotFound):
message = _("Drive Type %(name)s could not be found.")

View File

@@ -292,6 +292,7 @@ DEFINE_string('ajax_console_proxy_url',
in the form "http://127.0.0.1:8000"')
DEFINE_string('ajax_console_proxy_port',
8000, 'port that ajax_console_proxy binds')
DEFINE_string('vsa_topic', 'vsa', 'the topic that nova-vsa service listens on')
DEFINE_bool('verbose', False, 'show debug output')
DEFINE_boolean('fake_rabbit', False, 'use a fake rabbit')
DEFINE_bool('fake_network', False,
@@ -364,6 +365,32 @@ DEFINE_string('volume_manager', 'nova.volume.manager.VolumeManager',
'Manager for volume')
DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager',
'Manager for scheduler')
DEFINE_string('vsa_manager', 'nova.vsa.manager.VsaManager',
'Manager for vsa')
DEFINE_string('vc_image_name', 'vc_image',
'the VC image ID (for a VC image that exists in DB Glance)')
#---------------------------------------------------------------------
# VSA constants and enums
DEFINE_string('default_vsa_instance_type', 'm1.small',
'default instance type for VSA instances')
DEFINE_integer('max_vcs_in_vsa', 32,
'maxinum VCs in a VSA')
DEFINE_integer('vsa_part_size_gb', 100,
'default partition size for shared capacity')
DEFINE_string('vsa_status_creating', 'creating',
'VSA creating (not ready yet)')
DEFINE_string('vsa_status_launching', 'launching',
'Launching VCs (all BE volumes were created)')
DEFINE_string('vsa_status_created', 'created',
'VSA fully created and ready for use')
DEFINE_string('vsa_status_partial', 'partial',
'Some BE storage allocations failed')
DEFINE_string('vsa_status_failed', 'failed',
'Some BE storage allocations failed')
DEFINE_string('vsa_status_deleting', 'deleting',
'VSA started the deletion procedure')
# The service to use for image search and retrieval
DEFINE_string('image_service', 'nova.image.glance.GlanceImageService',

View File

@@ -24,13 +24,13 @@ from nova import flags
FLAGS = flags.FLAGS
flags.DEFINE_integer('quota_instances', 10,
flags.DEFINE_integer('quota_instances', 100, # 10
'number of instances allowed per project')
flags.DEFINE_integer('quota_cores', 20,
'number of instance cores allowed per project')
flags.DEFINE_integer('quota_ram', 50 * 1024,
'megabytes of instance ram allowed per project')
flags.DEFINE_integer('quota_volumes', 10,
flags.DEFINE_integer('quota_volumes', 100, # 10
'number of volumes allowed per project')
flags.DEFINE_integer('quota_gigabytes', 1000,
'number of volume gigabytes allowed per project')

495
nova/scheduler/vsa.py Normal file
View File

@@ -0,0 +1,495 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 Zadara Storage Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
VSA Simple Scheduler
"""
from nova import context
from nova import rpc
from nova import db
from nova import flags
from nova import utils
from nova.volume import api as volume_api
from nova.scheduler import driver
from nova.scheduler import simple
from nova import log as logging
LOG = logging.getLogger('nova.scheduler.vsa')
FLAGS = flags.FLAGS
flags.DEFINE_integer('gb_to_bytes_shift', 30,
'Conversion shift between GB and bytes')
flags.DEFINE_integer('drive_type_approx_capacity_percent', 10,
'The percentage range for capacity comparison')
flags.DEFINE_integer('vsa_unique_hosts_per_alloc', 10,
'The number of unique hosts per storage allocation')
flags.DEFINE_boolean('vsa_select_unique_drives', True,
'Allow selection of same host for multiple drives')
class VsaScheduler(simple.SimpleScheduler):
"""Implements Naive Scheduler that tries to find least loaded host."""
def __init__(self, *args, **kwargs):
super(VsaScheduler, self).__init__(*args, **kwargs)
self._notify_all_volume_hosts("startup")
def _notify_all_volume_hosts(self, event):
rpc.cast(context.get_admin_context(),
FLAGS.volume_topic,
{"method": "notification",
"args": {"event": event}})
def _compare_names(self, str1, str2):
result = str1.lower() == str2.lower()
# LOG.debug(_("Comparing %(str1)s and %(str2)s. "\
# "Result %(result)s"), locals())
return result
def _compare_sizes_exact_match(self, cap_capacity, size_gb):
cap_capacity = int(cap_capacity) >> FLAGS.gb_to_bytes_shift
size_gb = int(size_gb)
result = cap_capacity == size_gb
# LOG.debug(_("Comparing %(cap_capacity)d and %(size_gb)d. "\
# "Result %(result)s"), locals())
return result
def _compare_sizes_approxim(self, cap_capacity, size_gb):
cap_capacity = int(cap_capacity) >> FLAGS.gb_to_bytes_shift
size_gb = int(size_gb)
size_perc = size_gb * FLAGS.drive_type_approx_capacity_percent / 100
result = cap_capacity >= size_gb - size_perc and \
cap_capacity <= size_gb + size_perc
# LOG.debug(_("Comparing %(cap_capacity)d and %(size_gb)d. "\
# "Result %(result)s"), locals())
return result
def _qosgrp_match(self, drive_type, qos_values):
# Add more entries for additional comparisons
compare_list = [{'cap1': 'DriveType',
'cap2': 'type',
'cmp_func': self._compare_names},
{'cap1': 'DriveCapacity',
'cap2': 'size_gb',
'cmp_func': self._compare_sizes_approxim}]
for cap in compare_list:
if cap['cap1'] in qos_values.keys() and \
cap['cap2'] in drive_type.keys() and \
cap['cmp_func'] is not None and \
cap['cmp_func'](qos_values[cap['cap1']],
drive_type[cap['cap2']]):
# LOG.debug(_("One of required capabilities found: %s:%s"),
# cap['cap1'], drive_type[cap['cap2']])
pass
else:
return False
return True
def _filter_hosts(self, topic, request_spec, host_list=None):
drive_type = request_spec['drive_type']
LOG.debug(_("Filter hosts for drive type %(drive_type)s") % locals())
if host_list is None:
host_list = self.zone_manager.service_states.iteritems()
filtered_hosts = [] # returns list of (hostname, capability_dict)
for host, host_dict in host_list:
for service_name, service_dict in host_dict.iteritems():
if service_name != topic:
continue
gos_info = service_dict.get('drive_qos_info', {})
for qosgrp, qos_values in gos_info.iteritems():
if self._qosgrp_match(drive_type, qos_values):
if qos_values['AvailableCapacity'] > 0:
LOG.debug(_("Adding host %s to the list"), host)
filtered_hosts.append((host, gos_info))
else:
LOG.debug(_("Host %s has no free capacity. Skip"),
host)
break
LOG.debug(_("Found hosts %(filtered_hosts)s") % locals())
return filtered_hosts
def _allowed_to_use_host(self, host, selected_hosts, unique):
if unique == False or \
host not in [item[0] for item in selected_hosts]:
return True
else:
return False
def _add_hostcap_to_list(self, selected_hosts, host, cap):
if host not in [item[0] for item in selected_hosts]:
selected_hosts.append((host, cap))
def _alg_least_used_host(self, request_spec, all_hosts, selected_hosts):
size = request_spec['size']
drive_type = request_spec['drive_type']
best_host = None
best_qoscap = None
best_cap = None
min_used = 0
LOG.debug(_("Selecting best host for %(size)sGB volume of type "\
"%(drive_type)s from %(all_hosts)s"), locals())
for (host, capabilities) in all_hosts:
has_enough_capacity = False
used_capacity = 0
for qosgrp, qos_values in capabilities.iteritems():
used_capacity = used_capacity + qos_values['TotalCapacity'] \
- qos_values['AvailableCapacity']
if self._qosgrp_match(drive_type, qos_values):
# we found required qosgroup
if size == 0: # full drive match
if qos_values['FullDrive']['NumFreeDrives'] > 0:
has_enough_capacity = True
matched_qos = qos_values
else:
break
else:
if qos_values['AvailableCapacity'] >= size and \
(qos_values['PartitionDrive'][
'NumFreePartitions'] > 0 or \
qos_values['FullDrive']['NumFreeDrives'] > 0):
has_enough_capacity = True
matched_qos = qos_values
else:
break
if has_enough_capacity and \
self._allowed_to_use_host(host,
selected_hosts,
unique) and \
(best_host is None or used_capacity < min_used):
min_used = used_capacity
best_host = host
best_qoscap = matched_qos
best_cap = capabilities
if best_host:
self._add_hostcap_to_list(selected_hosts, host, best_cap)
LOG.debug(_("Best host found: %(best_host)s. "\
"(used capacity %(min_used)s)"), locals())
return (best_host, best_qoscap)
def _alg_most_avail_capacity(self, request_spec, all_hosts,
selected_hosts, unique):
size = request_spec['size']
drive_type = request_spec['drive_type']
best_host = None
best_qoscap = None
best_cap = None
max_avail = 0
LOG.debug(_("Selecting best host for %(size)sGB volume of type "\
"%(drive_type)s from %(all_hosts)s"), locals())
for (host, capabilities) in all_hosts:
for qosgrp, qos_values in capabilities.iteritems():
if self._qosgrp_match(drive_type, qos_values):
# we found required qosgroup
if size == 0: # full drive match
available = qos_values['FullDrive']['NumFreeDrives']
else:
available = qos_values['AvailableCapacity']
if available > max_avail and \
self._allowed_to_use_host(host,
selected_hosts,
unique):
max_avail = available
best_host = host
best_qoscap = qos_values
best_cap = capabilities
break # go to the next host
if best_host:
self._add_hostcap_to_list(selected_hosts, host, best_cap)
LOG.debug(_("Best host found: %(best_host)s. "\
"(available capacity %(max_avail)s)"), locals())
return (best_host, best_qoscap)
def _select_hosts(self, request_spec, all_hosts, selected_hosts=None):
#self._alg_most_avail_capacity(request_spec, all_hosts, selected_hosts)
if selected_hosts is None:
selected_hosts = []
host = None
if len(selected_hosts) >= FLAGS.vsa_unique_hosts_per_alloc:
# try to select from already selected hosts only
LOG.debug(_("Maximum number of hosts selected (%d)"),
len(selected_hosts))
unique = False
(host, qos_cap) = self._alg_most_avail_capacity(request_spec,
selected_hosts,
selected_hosts,
unique)
LOG.debug(_("Selected excessive host %(host)s"), locals())
else:
unique = FLAGS.vsa_select_unique_drives
if host is None:
# if we've not tried yet (# of sel hosts < max) - unique=True
# or failed to select from selected_hosts - unique=False
# select from all hosts
(host, qos_cap) = self._alg_most_avail_capacity(request_spec,
all_hosts,
selected_hosts,
unique)
LOG.debug(_("Selected host %(host)s"), locals())
if host is None:
raise driver.WillNotSchedule(_("No available hosts"))
return (host, qos_cap)
def _provision_volume(self, context, vol, vsa_id, availability_zone):
if availability_zone is None:
availability_zone = FLAGS.storage_availability_zone
now = utils.utcnow()
options = {
'size': vol['size'],
'user_id': context.user_id,
'project_id': context.project_id,
'snapshot_id': None,
'availability_zone': availability_zone,
'status': "creating",
'attach_status': "detached",
'display_name': vol['name'],
'display_description': vol['description'],
'to_vsa_id': vsa_id,
'drive_type_id': vol['drive_ref']['id'],
'host': vol['host'],
'scheduled_at': now
}
size = vol['size']
host = vol['host']
name = vol['name']
LOG.debug(_("Provision volume %(name)s of size %(size)s GB on "\
"host %(host)s"), locals())
volume_ref = db.volume_create(context, options)
rpc.cast(context,
db.queue_get_for(context, "volume", vol['host']),
{"method": "create_volume",
"args": {"volume_id": volume_ref['id'],
"snapshot_id": None}})
def _check_host_enforcement(self, availability_zone):
if (availability_zone
and ':' in availability_zone
and context.is_admin):
zone, _x, host = availability_zone.partition(':')
service = db.service_get_by_args(context.elevated(), host,
'nova-volume')
if not self.service_is_up(service):
raise driver.WillNotSchedule(_("Host %s not available") % host)
return host
else:
return None
def _assign_hosts_to_volumes(self, context, volume_params, forced_host):
prev_drive_type_id = None
selected_hosts = []
LOG.debug(_("volume_params %(volume_params)s") % locals())
for vol in volume_params:
LOG.debug(_("Assigning host to volume %s") % vol['name'])
if forced_host:
vol['host'] = forced_host
vol['capabilities'] = None
continue
drive_type = vol['drive_ref']
request_spec = {'size': vol['size'],
'drive_type': dict(drive_type)}
if prev_drive_type_id != drive_type['id']:
# generate list of hosts for this drive type
all_hosts = self._filter_hosts("volume", request_spec)
prev_drive_type_id = drive_type['id']
(host, qos_cap) = self._select_hosts(request_spec,
all_hosts, selected_hosts)
vol['host'] = host
vol['capabilities'] = qos_cap
self._consume_resource(qos_cap, vol['size'], -1)
LOG.debug(_("Assigned host %(host)s, capabilities %(qos_cap)s"),
locals())
LOG.debug(_("END: volume_params %(volume_params)s") % locals())
def schedule_create_volumes(self, context, request_spec,
availability_zone, *_args, **_kwargs):
"""Picks hosts for hosting multiple volumes."""
num_volumes = request_spec.get('num_volumes')
LOG.debug(_("Attempting to spawn %(num_volumes)d volume(s)") %
locals())
LOG.debug(_("Service states BEFORE %s"),
self.zone_manager.service_states)
vsa_id = request_spec.get('vsa_id')
volume_params = request_spec.get('volumes')
host = self._check_host_enforcement(availability_zone)
try:
self._assign_hosts_to_volumes(context, volume_params, host)
for vol in volume_params:
self._provision_volume(context, vol, vsa_id, availability_zone)
LOG.debug(_("Service states AFTER %s"),
self.zone_manager.service_states)
except:
if vsa_id:
db.vsa_update(context, vsa_id,
dict(status=FLAGS.vsa_status_failed))
for vol in volume_params:
if 'capabilities' in vol:
self._consume_resource(vol['capabilities'],
vol['size'], 1)
LOG.debug(_("Service states AFTER %s"),
self.zone_manager.service_states)
raise
return None
def schedule_create_volume(self, context, volume_id, *_args, **_kwargs):
"""Picks the best host based on requested drive type capability."""
volume_ref = db.volume_get(context, volume_id)
host = self._check_host_enforcement(volume_ref['availability_zone'])
if host:
now = utils.utcnow()
db.volume_update(context, volume_id, {'host': host,
'scheduled_at': now})
return host
drive_type = volume_ref['drive_type']
if drive_type is None:
LOG.debug(_("Non-VSA volume %d"), volume_ref['id'])
return super(VsaScheduler, self).schedule_create_volume(context,
volume_id, *_args, **_kwargs)
drive_type = dict(drive_type)
# otherwise - drive type is loaded
LOG.debug(_("Spawning volume %d with drive type %s"),
volume_ref['id'], drive_type)
LOG.debug(_("Service states BEFORE %s"),
self.zone_manager.service_states)
request_spec = {'size': volume_ref['size'],
'drive_type': drive_type}
hosts = self._filter_hosts("volume", request_spec)
try:
(host, qos_cap) = self._select_hosts(request_spec, all_hosts=hosts)
except:
if volume_ref['to_vsa_id']:
db.vsa_update(context, volume_ref['to_vsa_id'],
dict(status=FLAGS.vsa_status_failed))
raise
#return super(VsaScheduler, self).schedule_create_volume(context,
# volume_id, *_args, **_kwargs)
if host:
now = utils.utcnow()
db.volume_update(context, volume_id, {'host': host,
'scheduled_at': now})
self._consume_resource(qos_cap, volume_ref['size'], -1)
LOG.debug(_("Service states AFTER %s"),
self.zone_manager.service_states)
return host
def _consume_full_drive(self, qos_values, direction):
qos_values['FullDrive']['NumFreeDrives'] += direction
qos_values['FullDrive']['NumOccupiedDrives'] -= direction
def _consume_partition(self, qos_values, size, direction):
if qos_values['PartitionDrive']['PartitionSize'] != 0:
partition_size = qos_values['PartitionDrive']['PartitionSize']
else:
partition_size = size
part_per_drive = qos_values['DriveCapacity'] / partition_size
if direction == -1 and \
qos_values['PartitionDrive']['NumFreePartitions'] == 0:
self._consume_full_drive(qos_values, direction)
qos_values['PartitionDrive']['NumFreePartitions'] += \
part_per_drive
qos_values['PartitionDrive']['NumFreePartitions'] += direction
qos_values['PartitionDrive']['NumOccupiedPartitions'] -= direction
if direction == 1 and \
qos_values['PartitionDrive']['NumFreePartitions'] >= \
part_per_drive:
self._consume_full_drive(qos_values, direction)
qos_values['PartitionDrive']['NumFreePartitions'] -= \
part_per_drive
def _consume_resource(self, qos_values, size, direction):
if qos_values is None:
LOG.debug(_("No capability selected for volume of size %(size)s"),
locals())
return
if size == 0: # full drive match
qos_values['AvailableCapacity'] += direction * \
qos_values['DriveCapacity']
self._consume_full_drive(qos_values, direction)
else:
qos_values['AvailableCapacity'] += direction * \
(size << FLAGS.gb_to_bytes_shift)
self._consume_partition(qos_values,
size << FLAGS.gb_to_bytes_shift,
direction)
return

View File

@@ -242,7 +242,7 @@ class LibvirtConnTestCase(test.TestCase):
return """
<domain type='kvm'>
<devices>
<disk type='file'>
<drive type='file'>
<source file='filename'/>
</disk>
</devices>

18
nova/vsa/__init__.py Normal file
View File

@@ -0,0 +1,18 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 Zadara Storage Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova.vsa.api import API

407
nova/vsa/api.py Normal file
View File

@@ -0,0 +1,407 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 Zadara Storage Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all requests relating to Virtual Storage Arrays (VSAs).
"""
#import datetime
import sys
import base64
from xml.etree import ElementTree
from xml.etree.ElementTree import Element, SubElement
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import quota
from nova import rpc
from nova.db import base
from nova import compute
from nova import volume
from nova.compute import instance_types
from nova.vsa import drive_types
FLAGS = flags.FLAGS
flags.DEFINE_boolean('vsa_multi_vol_creation', True,
'Ask scheduler to create multiple volumes in one call')
LOG = logging.getLogger('nova.vsa')
class API(base.Base):
"""API for interacting with the VSA manager."""
def __init__(self, compute_api=None, volume_api=None, **kwargs):
self.compute_api = compute_api or compute.API()
self.volume_api = volume_api or volume.API()
super(API, self).__init__(**kwargs)
def _get_default_vsa_instance_type(self):
return instance_types.get_instance_type_by_name(
FLAGS.default_vsa_instance_type)
def _check_storage_parameters(self, context, vsa_name, storage, shared):
"""
Translates storage array of disks to the list of volumes
:param storage: List of dictionaries with following keys:
disk_name, num_disks, size
:param shared: Specifies if storage is dedicated or shared.
For shared storage disks split into partitions
"""
volume_params = []
for node in storage:
name = node.get('drive_name', None)
num_disks = node.get('num_drives', 1)
if name is None:
raise exception.ApiError(_("No drive_name param found in %s"),
node)
# find DB record for this disk
try:
drive_ref = drive_types.drive_type_get_by_name(context, name)
except exception.NotFound:
raise exception.ApiError(_("Invalid drive type name %s"),
name)
# if size field present - override disk size specified in DB
size = node.get('size', drive_ref['size_gb'])
if shared:
part_size = FLAGS.vsa_part_size_gb
total_capacity = num_disks * size
num_volumes = total_capacity / part_size
size = part_size
else:
num_volumes = num_disks
size = 0 # special handling for full drives
for i in range(num_volumes):
# VP-TODO: potentialy may conflict with previous volumes
volume_name = vsa_name + ("_%s_vol-%d" % (name, i))
volume = {
'size': size,
'snapshot_id': None,
'name': volume_name,
'description': 'BE volume for ' + volume_name,
'drive_ref': drive_ref
}
volume_params.append(volume)
return volume_params
def create(self, context, display_name='', display_description='',
vc_count=1, instance_type=None, image_name=None,
availability_zone=None, storage=[], shared=None):
"""
Provision VSA instance with corresponding compute instances
and associated volumes
:param storage: List of dictionaries with following keys:
disk_name, num_disks, size
:param shared: Specifies if storage is dedicated or shared.
For shared storage disks split into partitions
"""
if vc_count > FLAGS.max_vcs_in_vsa:
LOG.warning(_("Requested number of VCs (%d) is too high."\
" Setting to default"), vc_count)
vc_count = FLAGS.max_vcs_in_vsa
if instance_type is None:
instance_type = self._get_default_vsa_instance_type()
if availability_zone is None:
availability_zone = FLAGS.storage_availability_zone
if storage is None:
storage = []
if shared is None or shared == 'False' or shared == False:
shared = False
else:
shared = True
# check if image is ready before starting any work
if image_name is None or image_name == '':
image_name = FLAGS.vc_image_name
try:
image_service = self.compute_api.image_service
vc_image = image_service.show_by_name(context, image_name)
vc_image_href = vc_image['id']
except exception.ImageNotFound:
raise exception.ApiError(_("Failed to find configured image %s"),
image_name)
options = {
'display_name': display_name,
'display_description': display_description,
'project_id': context.project_id,
'availability_zone': availability_zone,
'instance_type_id': instance_type['id'],
'image_ref': vc_image_href,
'vc_count': vc_count,
'status': FLAGS.vsa_status_creating,
}
LOG.info(_("Creating VSA: %s") % options)
# create DB entry for VSA instance
try:
vsa_ref = self.db.vsa_create(context, options)
except exception.Error:
raise exception.ApiError(_(sys.exc_info()[1]))
vsa_id = vsa_ref['id']
vsa_name = vsa_ref['name']
# check storage parameters
try:
volume_params = self._check_storage_parameters(context, vsa_name,
storage, shared)
except exception.ApiError:
self.update_vsa_status(context, vsa_id,
status=FLAGS.vsa_status_failed)
raise
# after creating DB entry, re-check and set some defaults
updates = {}
if (not hasattr(vsa_ref, 'display_name') or
vsa_ref.display_name is None or
vsa_ref.display_name == ''):
updates['display_name'] = display_name = vsa_name
updates['vol_count'] = len(volume_params)
vsa_ref = self.update(context, vsa_id, **updates)
# create volumes
if FLAGS.vsa_multi_vol_creation:
if len(volume_params) > 0:
#filter_class = 'nova.scheduler.vsa.InstanceTypeFilter'
request_spec = {
'num_volumes': len(volume_params),
'vsa_id': vsa_id,
'volumes': volume_params,
#'filter': filter_class,
}
rpc.cast(context,
FLAGS.scheduler_topic,
{"method": "create_volumes",
"args": {"topic": FLAGS.volume_topic,
"request_spec": request_spec,
"availability_zone": availability_zone}})
else:
# create BE volumes one-by-one
for vol in volume_params:
try:
vol_name = vol['name']
vol_size = vol['size']
LOG.debug(_("VSA ID %(vsa_id)d %(vsa_name)s: Create "\
"volume %(vol_name)s, %(vol_size)d GB"),
locals())
vol_ref = self.volume_api.create(context,
vol_size,
vol['snapshot_id'],
vol_name,
vol['description'],
to_vsa_id=vsa_id,
drive_type_id=vol['drive_ref'].get('id'),
availability_zone=availability_zone)
except:
self.update_vsa_status(context, vsa_id,
status=FLAGS.vsa_status_partial)
raise
if len(volume_params) == 0:
# No BE volumes - ask VSA manager to start VCs
rpc.cast(context,
FLAGS.vsa_topic,
{"method": "create_vsa",
"args": {"vsa_id": vsa_id}})
return vsa_ref
def update_vsa_status(self, context, vsa_id, status):
updates = dict(status=status)
LOG.info(_("VSA ID %(vsa_id)d: Update VSA status to %(status)s"),
locals())
return self.update(context, vsa_id, **updates)
def update(self, context, vsa_id, **kwargs):
"""Updates the VSA instance in the datastore.
:param context: The security context
:param vsa_id: ID of the VSA instance to update
:param kwargs: All additional keyword args are treated
as data fields of the instance to be
updated
:returns: None
"""
LOG.info(_("VSA ID %(vsa_id)d: Update VSA call"), locals())
vc_count = kwargs.get('vc_count', None)
if vc_count is not None:
# VP-TODO: This request may want to update number of VCs
# Get number of current VCs and add/delete VCs appropriately
vsa = self.get(context, vsa_id)
vc_count = int(vc_count)
if vsa['vc_count'] != vc_count:
self.update_num_vcs(context, vsa, vc_count)
return self.db.vsa_update(context, vsa_id, kwargs)
def update_num_vcs(self, context, vsa, vc_count):
if vc_count > FLAGS.max_vcs_in_vsa:
LOG.warning(_("Requested number of VCs (%d) is too high."\
" Setting to default"), vc_count)
vc_count = FLAGS.max_vcs_in_vsa
old_vc_count = vsa['vc_count']
if vc_count > old_vc_count:
LOG.debug(_("Adding %d VCs to VSA %s."),
(vc_count - old_vc_count, vsa['name']))
# VP-TODO: actual code for adding new VCs
elif vc_count < old_vc_count:
LOG.debug(_("Deleting %d VCs from VSA %s."),
(old_vc_count - vc_count, vsa['name']))
# VP-TODO: actual code for deleting extra VCs
def _force_volume_delete(self, ctxt, volume):
"""Delete a volume, bypassing the check that it must be available."""
host = volume['host']
if not host:
# Volume not yet assigned to host
# Deleting volume from database and skipping rpc.
self.db.volume_destroy(ctxt, volume['id'])
return
rpc.cast(ctxt,
self.db.queue_get_for(ctxt, FLAGS.volume_topic, host),
{"method": "delete_volume",
"args": {"volume_id": volume['id']}})
def delete_be_volumes(self, context, vsa_id, force_delete=True):
be_volumes = self.db.volume_get_all_assigned_to_vsa(context, vsa_id)
for volume in be_volumes:
try:
vol_name = volume['name']
LOG.info(_("VSA ID %(vsa_id)s: Deleting BE volume "\
"%(vol_name)s"), locals())
self.volume_api.delete(context, volume['id'])
except exception.ApiError:
LOG.info(_("Unable to delete volume %s"), volume['name'])
if force_delete:
LOG.info(_("VSA ID %(vsa_id)s: Forced delete. BE volume "\
"%(vol_name)s"), locals())
self._force_volume_delete(context, volume)
def delete(self, context, vsa_id):
"""Terminate a VSA instance."""
LOG.info(_("Going to try to terminate VSA ID %s"), vsa_id)
# allow deletion of volumes in "abnormal" state
# Delete all FE volumes
fe_volumes = self.db.volume_get_all_assigned_from_vsa(context, vsa_id)
for volume in fe_volumes:
try:
vol_name = volume['name']
LOG.info(_("VSA ID %(vsa_id)s: Deleting FE volume "\
"%(vol_name)s"), locals())
self.volume_api.delete(context, volume['id'])
except exception.ApiError:
LOG.info(_("Unable to delete volume %s"), volume['name'])
# Delete all BE volumes
self.delete_be_volumes(context, vsa_id, force_delete=True)
# Delete all VC instances
instances = self.db.instance_get_all_by_vsa(context, vsa_id)
for instance in instances:
name = instance['name']
LOG.debug(_("VSA ID %(vsa_id)s: Delete instance %(name)s"),
locals())
self.compute_api.delete(context, instance['id'])
# Delete VSA instance
self.db.vsa_destroy(context, vsa_id)
def get(self, context, vsa_id):
rv = self.db.vsa_get(context, vsa_id)
return rv
def get_all(self, context):
if context.is_admin:
return self.db.vsa_get_all(context)
return self.db.vsa_get_all_by_project(context, context.project_id)
def generate_user_data(self, context, vsa, volumes):
e_vsa = Element("vsa")
e_vsa_detail = SubElement(e_vsa, "id")
e_vsa_detail.text = str(vsa['id'])
e_vsa_detail = SubElement(e_vsa, "name")
e_vsa_detail.text = vsa['display_name']
e_vsa_detail = SubElement(e_vsa, "description")
e_vsa_detail.text = vsa['display_description']
e_vsa_detail = SubElement(e_vsa, "vc_count")
e_vsa_detail.text = str(vsa['vc_count'])
e_volumes = SubElement(e_vsa, "volumes")
for volume in volumes:
loc = volume['provider_location']
if loc is None:
ip = ''
iscsi_iqn = ''
iscsi_portal = ''
else:
(iscsi_target, _sep, iscsi_iqn) = loc.partition(" ")
(ip, iscsi_portal) = iscsi_target.split(":", 1)
e_vol = SubElement(e_volumes, "volume")
e_vol_detail = SubElement(e_vol, "id")
e_vol_detail.text = str(volume['id'])
e_vol_detail = SubElement(e_vol, "name")
e_vol_detail.text = volume['name']
e_vol_detail = SubElement(e_vol, "display_name")
e_vol_detail.text = volume['display_name']
e_vol_detail = SubElement(e_vol, "size_gb")
e_vol_detail.text = str(volume['size'])
e_vol_detail = SubElement(e_vol, "status")
e_vol_detail.text = volume['status']
e_vol_detail = SubElement(e_vol, "ip")
e_vol_detail.text = ip
e_vol_detail = SubElement(e_vol, "iscsi_iqn")
e_vol_detail.text = iscsi_iqn
e_vol_detail = SubElement(e_vol, "iscsi_portal")
e_vol_detail.text = iscsi_portal
e_vol_detail = SubElement(e_vol, "lun")
e_vol_detail.text = '0'
e_vol_detail = SubElement(e_vol, "sn_host")
e_vol_detail.text = volume['host']
_xml = ElementTree.tostring(e_vsa)
return base64.b64encode(_xml)

25
nova/vsa/connection.py Normal file
View File

@@ -0,0 +1,25 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 Zadara Storage Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Abstraction of the underlying connection to VC."""
from nova.vsa import fake
def get_connection():
# Return an object that is able to talk to VCs
return fake.FakeVcConnection()

22
nova/vsa/fake.py Normal file
View File

@@ -0,0 +1,22 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 Zadara Storage Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
class FakeVcConnection:
def init_host(self, host):
pass

172
nova/vsa/manager.py Normal file
View File

@@ -0,0 +1,172 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 Zadara Storage Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all processes relating to Virtual Storage Arrays (VSA).
**Related Flags**
"""
from nova import log as logging
from nova import manager
from nova import flags
from nova import utils
from nova import exception
from nova import compute
from nova import volume
from nova import vsa
from nova.compute import instance_types
FLAGS = flags.FLAGS
flags.DEFINE_string('vsa_driver', 'nova.vsa.connection.get_connection',
'Driver to use for controlling VSAs')
LOG = logging.getLogger('nova.vsa.manager')
class VsaManager(manager.SchedulerDependentManager):
"""Manages Virtual Storage Arrays (VSAs)."""
def __init__(self, vsa_driver=None, *args, **kwargs):
if not vsa_driver:
vsa_driver = FLAGS.vsa_driver
self.driver = utils.import_object(vsa_driver)
self.compute_manager = utils.import_object(FLAGS.compute_manager)
self.compute_api = compute.API()
self.volume_api = volume.API()
self.vsa_api = vsa.API()
super(VsaManager, self).__init__(*args, **kwargs)
def init_host(self):
self.driver.init_host(host=self.host)
super(VsaManager, self).init_host()
@exception.wrap_exception()
def create_vsa(self, context, vsa_id):
"""Called by API if there were no BE volumes assigned"""
LOG.debug(_("Create call received for VSA %s"), vsa_id)
vsa_id = int(vsa_id) # just in case
try:
vsa = self.vsa_api.get(context, vsa_id)
except Exception as ex:
msg = _("Failed to find VSA %(vsa_id)d") % locals()
LOG.exception(msg)
return
return self._start_vcs(context, vsa)
@exception.wrap_exception()
def vsa_volume_created(self, context, vol_id, vsa_id, status):
"""Callback for volume creations"""
LOG.debug(_("VSA ID %(vsa_id)s: Volume %(vol_id)s created. "\
"Status %(status)s"), locals())
vsa_id = int(vsa_id) # just in case
# Get all volumes for this VSA
# check if any of them still in creating phase
volumes = self.db.volume_get_all_assigned_to_vsa(context, vsa_id)
for volume in volumes:
if volume['status'] == 'creating':
vol_name = volume['name']
vol_disp_name = volume['display_name']
LOG.debug(_("Volume %(vol_name)s (%(vol_disp_name)s) still "\
"in creating phase - wait"), locals())
return
try:
vsa = self.vsa_api.get(context, vsa_id)
except Exception as ex:
msg = _("Failed to find VSA %(vsa_id)d") % locals()
LOG.exception(msg)
return
if len(volumes) != vsa['vol_count']:
LOG.debug(_("VSA ID %d: Not all volumes are created (%d of %d)"),
vsa_id, len(volumes), vsa['vol_count'])
return
# all volumes created (successfully or not)
return self._start_vcs(context, vsa, volumes)
def _start_vcs(self, context, vsa, volumes=[]):
"""Start VCs for VSA """
vsa_id = vsa['id']
if vsa['status'] == FLAGS.vsa_status_creating:
self.vsa_api.update_vsa_status(context, vsa_id,
FLAGS.vsa_status_launching)
else:
return
# in _separate_ loop go over all volumes and mark as "attached"
has_failed_volumes = False
for volume in volumes:
vol_name = volume['name']
vol_disp_name = volume['display_name']
status = volume['status']
LOG.info(_("VSA ID %(vsa_id)d: Volume %(vol_name)s "\
"(%(vol_disp_name)s) is in %(status)s state"),
locals())
if status == 'available':
try:
# self.volume_api.update(context, volume['id'],
# dict(attach_status="attached"))
pass
except Exception as ex:
msg = _("Failed to update attach status for volume "
"%(vol_name)s. %(ex)s") % locals()
LOG.exception(msg)
else:
has_failed_volumes = True
if has_failed_volumes:
LOG.info(_("VSA ID %(vsa_id)d: Delete all BE volumes"), locals())
self.vsa_api.delete_be_volumes(context, vsa_id, force_delete=True)
self.vsa_api.update_vsa_status(context, vsa_id,
FLAGS.vsa_status_failed)
return
# create user-data record for VC
storage_data = self.vsa_api.generate_user_data(context, vsa, volumes)
instance_type = instance_types.get_instance_type(
vsa['instance_type_id'])
# now start the VC instance
vc_count = vsa['vc_count']
LOG.info(_("VSA ID %(vsa_id)d: Start %(vc_count)d instances"),
locals())
vc_instances = self.compute_api.create(context,
instance_type, # vsa['vsa_instance_type'],
vsa['image_ref'],
min_count=1,
max_count=vc_count,
display_name='vc-' + vsa['display_name'],
display_description='VC for VSA ' + vsa['display_name'],
availability_zone=vsa['availability_zone'],
user_data=storage_data,
vsa_id=vsa_id)
self.vsa_api.update_vsa_status(context, vsa_id,
FLAGS.vsa_status_created)