Remove AoE, Clean up volume code

* Removes ATA-over-Ethernet (AoE) support
* Adds volume drivers to libvirt
* Adds initialize_connection and terminate_connection to the volume API
* Passes connection info back through the volume API (sketched below)
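
In outline, the compute host no longer asks a local volume_manager to
discover a device path; it asks the volume service for connection info,
hands that to the virt driver, and persists it on the block device
mapping. A condensed sketch of the new attach path, assembled from the
compute manager hunks below (error handling omitted):

    volume_api = volume.API()
    address = FLAGS.my_ip

    # Ask the volume service how this host should reach the volume
    # (e.g. iSCSI target details) instead of discovering a device path.
    connection_info = volume_api.initialize_connection(context,
                                                       volume_id,
                                                       address)

    # The virt driver consumes the connection info directly ...
    self.driver.attach_volume(connection_info,
                              instance_ref['name'],
                              mountpoint)

    # ... and it is serialized onto the block device mapping so it can
    # be replayed on shutdown, detach, and migration.
    self.db.block_device_mapping_update(
            context, bdm['id'],
            {'connection_info': utils.dumps(connection_info)})
    volume_api.attach(context, volume_id, instance_id, mountpoint)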

Change-Id: I1b1626f40bebe8466ab410fb174683293c7c474f
Vishvananda Ishaya
2011-09-23 09:22:32 -07:00
parent e164f3f703
commit eb03d47fec
37 changed files with 1016 additions and 912 deletions

@@ -12,6 +12,7 @@ Anthony Young <sleepsonthefloor@gmail.com>
 Antony Messerli <ant@openstack.org>
 Armando Migliaccio <Armando.Migliaccio@eu.citrix.com>
 Arvind Somya <asomya@cisco.com>
+Ben McGraw <ben@pistoncloud.com>
 Bilal Akhtar <bilalakhtar@ubuntu.com>
 Brad Hall <brad@nicira.com>
 Brad McConnell <bmcconne@rackspace.com>

@@ -962,9 +962,8 @@ class VmCommands(object):
             msg = _('Only KVM and QEmu are supported for now. Sorry!')
             raise exception.Error(msg)
-        if (FLAGS.volume_driver != 'nova.volume.driver.AOEDriver' and \
-            FLAGS.volume_driver != 'nova.volume.driver.ISCSIDriver'):
-            msg = _("Support only AOEDriver and ISCSIDriver. Sorry!")
+        if FLAGS.volume_driver != 'nova.volume.driver.ISCSIDriver':
+            msg = _("Support only ISCSIDriver. Sorry!")
             raise exception.Error(msg)
         rpc.call(ctxt,
bin/nova-spoolsentry Normal file → Executable file

@@ -73,7 +73,6 @@ External unix tools that are required:
 * dnsmasq
 * vlan
 * open-iscsi and iscsitarget (if you use iscsi volumes)
-* aoetools and vblade-persist (if you use aoe-volumes)

 Nova uses cutting-edge versions of many packages. There are ubuntu packages in
 the nova-core trunk ppa. You can use add this ppa to your sources list on an

@@ -37,7 +37,6 @@ from nova.compute import instance_types
 from nova.compute import power_state
 from nova.compute import task_states
 from nova.compute import vm_states
-from nova.compute.utils import terminate_volumes
 from nova.scheduler import api as scheduler_api
 from nova.db import base
@@ -790,7 +789,6 @@ class API(base.Base):
         else:
             LOG.warning(_("No host for instance %s, deleting immediately"),
                         instance_id)
-            terminate_volumes(self.db, context, instance_id)
             self.db.instance_destroy(context, instance_id)

     def _delete(self, context, instance):
@@ -804,7 +802,6 @@ class API(base.Base):
             self._cast_compute_message('terminate_instance', context,
                                        instance['id'], host)
         else:
-            terminate_volumes(self.db, context, instance['id'])
             self.db.instance_destroy(context, instance['id'])

     @scheduler_api.reroute_compute("delete")

@@ -30,8 +30,6 @@ terminating it.
 :instances_path:  Where instances are kept on disk
 :compute_driver:  Name of class that is used to handle virtualization, loaded
                   by :func:`nova.utils.import_object`
-:volume_manager:  Name of class that handles persistent storage, loaded by
-                  :func:`nova.utils.import_object`

 """
@@ -60,7 +58,6 @@ from nova.compute import power_state
 from nova.compute import task_states
 from nova.compute import vm_states
 from nova.notifier import api as notifier
-from nova.compute.utils import terminate_volumes
 from nova.virt import driver
@@ -143,7 +140,6 @@ class ComputeManager(manager.SchedulerDependentManager):
         self.network_api = network.API()
         self.network_manager = utils.import_object(FLAGS.network_manager)
-        self.volume_manager = utils.import_object(FLAGS.volume_manager)
         self._last_host_check = 0
         super(ComputeManager, self).__init__(service_name="compute",
                                              *args, **kwargs)
@@ -281,8 +277,8 @@ class ComputeManager(manager.SchedulerDependentManager):
             if not ((bdm['snapshot_id'] is None) or
                     (bdm['volume_id'] is not None)):
                 LOG.error(_('corrupted state of block device mapping '
-                            'id: %(id)s '
-                            'snapshot: %(snapshot_id) volume: %(vollume_id)') %
+                            'id: %(id)s snapshot: %(snapshot_id)s '
+                            'volume: %(volume_id)s') %
                           {'id': bdm['id'],
                            'snapshot_id': bdm['snapshot'],
                            'volume_id': bdm['volume_id']})
@@ -292,10 +288,13 @@ class ComputeManager(manager.SchedulerDependentManager):
             if bdm['volume_id'] is not None:
                 volume_api.check_attach(context,
                                         volume_id=bdm['volume_id'])
-                dev_path = self._attach_volume_boot(context, instance_id,
-                                                    bdm['volume_id'],
-                                                    bdm['device_name'])
-                block_device_mapping.append({'device_path': dev_path,
+                cinfo = self._attach_volume_boot(context, instance_id,
+                                                 bdm['volume_id'],
+                                                 bdm['device_name'])
+                self.db.block_device_mapping_update(
+                        context, bdm['id'],
+                        {'connection_info': utils.dumps(cinfo)})
+                block_device_mapping.append({'connection_info': cinfo,
                                              'mount_device':
                                              bdm['device_name']})
@@ -476,6 +475,23 @@ class ComputeManager(manager.SchedulerDependentManager):
             #             a per-line disable flag here for W602
             raise type_, value, traceback

+    def _get_instance_volume_bdms(self, context, instance_id):
+        bdms = self.db.block_device_mapping_get_all_by_instance(context,
+                                                                instance_id)
+        return [bdm for bdm in bdms if bdm['volume_id']]
+
+    def _get_instance_volume_block_device_info(self, context, instance_id):
+        bdms = self._get_instance_volume_bdms(context, instance_id)
+        block_device_mapping = []
+        for bdm in bdms:
+            cinfo = utils.loads(bdm['connection_info'])
+            block_device_mapping.append({'connection_info': cinfo,
+                                         'mount_device':
+                                         bdm['device_name']})
+        ## NOTE(vish): The mapping is passed in so the driver can disconnect
+        ##             from remote volumes if necessary
+        return {'block_device_mapping': block_device_mapping}
+
     @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
     def run_instance(self, context, instance_id, **kwargs):
         self._run_instance(context, instance_id, **kwargs)
@@ -486,9 +502,11 @@ class ComputeManager(manager.SchedulerDependentManager):
         """Starting an instance on this host."""
         # TODO(yamahata): injected_files isn't supported.
         #                 Anyway OSAPI doesn't support stop/start yet
+        # FIXME(vish): I've kept the files during stop instance, but
+        #              I think start will fail due to the files still
         self._run_instance(context, instance_id)

-    def _shutdown_instance(self, context, instance_id, action_str):
+    def _shutdown_instance(self, context, instance_id, action_str, cleanup):
         """Shutdown an instance on this host."""
         context = context.elevated()
         instance = self.db.instance_get(context, instance_id)
@@ -500,22 +518,35 @@ class ComputeManager(manager.SchedulerDependentManager):
         if not FLAGS.stub_network:
             self.network_api.deallocate_for_instance(context, instance)

-        volumes = instance.get('volumes') or []
-        for volume in volumes:
-            self._detach_volume(context, instance_id, volume['id'], False)
+        for bdm in self._get_instance_volume_bdms(context, instance_id):
+            volume_id = bdm['volume_id']
+            try:
+                self._detach_volume(context, instance_id, volume_id)
+            except exception.DiskNotFound as exc:
+                LOG.warn(_("Ignoring DiskNotFound: %s") % exc)

         if instance['power_state'] == power_state.SHUTOFF:
             self.db.instance_destroy(context, instance_id)
             raise exception.Error(_('trying to destroy already destroyed'
                                     ' instance: %s') % instance_id)
-        self.driver.destroy(instance, network_info)
+        block_device_info = self._get_instance_volume_block_device_info(
+            context, instance_id)
+        self.driver.destroy(instance, network_info, block_device_info, cleanup)

-        if action_str == 'Terminating':
-            terminate_volumes(self.db, context, instance_id)
+    def _cleanup_volumes(self, context, instance_id):
+        volume_api = volume.API()
+        bdms = self.db.block_device_mapping_get_all_by_instance(context,
+                                                                instance_id)
+        for bdm in bdms:
+            LOG.debug(_("terminating bdm %s") % bdm)
+            if bdm['volume_id'] and bdm['delete_on_termination']:
+                volume_api.delete(context, bdm['volume_id'])
+        # NOTE(vish): bdms will be deleted on instance destroy

     def _delete_instance(self, context, instance_id):
         """Delete an instance on this host."""
-        self._shutdown_instance(context, instance_id, 'Terminating')
+        self._shutdown_instance(context, instance_id, 'Terminating', True)
+        self._cleanup_volumes(context, instance_id)
         instance = self.db.instance_get(context.elevated(), instance_id)
         self._instance_update(context,
                               instance_id,
@@ -540,7 +571,11 @@ class ComputeManager(manager.SchedulerDependentManager):
     @checks_instance_lock
     def stop_instance(self, context, instance_id):
         """Stopping an instance on this host."""
-        self._shutdown_instance(context, instance_id, 'Stopping')
+        # FIXME(vish): I've kept the files during stop instance, but
+        #              I think start will fail due to the files still
+        #              existing.  I don't really know what the purpose of
+        #              stop and start are when compared to pause and unpause
+        self._shutdown_instance(context, instance_id, 'Stopping', False)
         self._instance_update(context,
                               instance_id,
                               vm_state=vm_states.STOPPED,
@@ -612,7 +647,6 @@ class ComputeManager(manager.SchedulerDependentManager):
                               instance_id,
                               vm_state=vm_states.REBUILDING,
                               task_state=task_states.SPAWNING)
-
         # pull in new password here since the original password isn't in the db
         instance_ref.admin_pass = kwargs.get('new_pass',
                 utils.generate_password(FLAGS.password_length))
@@ -1283,17 +1317,17 @@ class ComputeManager(manager.SchedulerDependentManager):
         """Attach a volume to an instance at boot time. So actual attach
            is done by instance creation"""

-        # TODO(yamahata):
-        # should move check_attach to volume manager?
-        volume.API().check_attach(context, volume_id)
-
         context = context.elevated()
         LOG.audit(_("instance %(instance_id)s: booting with "
                     "volume %(volume_id)s at %(mountpoint)s") %
                   locals(), context=context)
-        dev_path = self.volume_manager.setup_compute_volume(context, volume_id)
-        self.db.volume_attached(context, volume_id, instance_id, mountpoint)
-        return dev_path
+        address = FLAGS.my_ip
+        volume_api = volume.API()
+        connection_info = volume_api.initialize_connection(context,
+                                                           volume_id,
+                                                           address)
+        volume_api.attach(context, volume_id, instance_id, mountpoint)
+        return connection_info

     @checks_instance_lock
     def attach_volume(self, context, instance_id, volume_id, mountpoint):
@@ -1302,18 +1336,29 @@ class ComputeManager(manager.SchedulerDependentManager):
         instance_ref = self.db.instance_get(context, instance_id)
         LOG.audit(_("instance %(instance_id)s: attaching volume %(volume_id)s"
                     " to %(mountpoint)s") % locals(), context=context)
-        dev_path = self.volume_manager.setup_compute_volume(context,
-                                                            volume_id)
-        try:
-            self.driver.attach_volume(instance_ref['name'],
-                                      dev_path,
-                                      mountpoint)
-            self.db.volume_attached(context,
-                                    volume_id,
-                                    instance_id,
-                                    mountpoint)
+        volume_api = volume.API()
+        address = FLAGS.my_ip
+        connection_info = volume_api.initialize_connection(context,
+                                                           volume_id,
+                                                           address)
+        try:
+            self.driver.attach_volume(connection_info,
+                                      instance_ref['name'],
+                                      mountpoint)
+        except Exception:  # pylint: disable=W0702
+            exc = sys.exc_info()
+            # NOTE(vish): The inline callback eats the exception info so we
+            #             log the traceback here and reraise the same
+            #             ecxception below.
+            LOG.exception(_("instance %(instance_id)s: attach failed"
+                            " %(mountpoint)s, removing") % locals(), context=context)
+            volume_api.terminate_connection(context, volume_id, address)
+            raise exc
+
+        volume_api.attach(context, volume_id, instance_id, mountpoint)
         values = {
             'instance_id': instance_id,
+            'connection_info': utils.dumps(connection_info),
             'device_name': mountpoint,
             'delete_on_termination': False,
             'virtual_name': None,
@@ -1322,36 +1367,42 @@ class ComputeManager(manager.SchedulerDependentManager):
             'volume_size': None,
             'no_device': None}
         self.db.block_device_mapping_create(context, values)
-        except Exception as exc:  # pylint: disable=W0702
-            # NOTE(vish): The inline callback eats the exception info so we
-            #             log the traceback here and reraise the same
-            #             ecxception below.
-            LOG.exception(_("instance %(instance_id)s: attach failed"
-                            " %(mountpoint)s, removing") % locals(), context=context)
-            self.volume_manager.remove_compute_volume(context,
-                                                      volume_id)
-            raise exc
-
         return True

     @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
     @checks_instance_lock
-    def _detach_volume(self, context, instance_id, volume_id, destroy_bdm):
+    def _detach_volume(self, context, instance_id, volume_id,
+                       destroy_bdm=False, mark_detached=True,
+                       force_detach=False):
         """Detach a volume from an instance."""
         context = context.elevated()
         instance_ref = self.db.instance_get(context, instance_id)
-        volume_ref = self.db.volume_get(context, volume_id)
-        mp = volume_ref['mountpoint']
+        bdms = self.db.block_device_mapping_get_all_by_instance(
+                context, instance_id)
+        for item in bdms:
+            # NOTE(vish): Comparing as strings because the os_api doesn't
+            #             convert to integer and we may wish to support uuids
+            #             in the future.
+            if str(item['volume_id']) == str(volume_id):
+                bdm = item
+                break
+        mp = bdm['device_name']
         LOG.audit(_("Detach volume %(volume_id)s from mountpoint %(mp)s"
                   " on instance %(instance_id)s") % locals(), context=context)
-        if instance_ref['name'] not in self.driver.list_instances():
+        volume_api = volume.API()
+        if (instance_ref['name'] not in self.driver.list_instances() and
+            not force_detach):
             LOG.warn(_("Detaching volume from unknown instance %s"),
                      instance_id, context=context)
         else:
-            self.driver.detach_volume(instance_ref['name'],
-                                      volume_ref['mountpoint'])
-        self.volume_manager.remove_compute_volume(context, volume_id)
-        self.db.volume_detached(context, volume_id)
+            self.driver.detach_volume(utils.loads(bdm['connection_info']),
+                                      instance_ref['name'],
+                                      bdm['device_name'])
+        address = FLAGS.my_ip
+        volume_api.terminate_connection(context, volume_id, address)
+        if mark_detached:
+            volume_api.detach(context, volume_id)
         if destroy_bdm:
             self.db.block_device_mapping_destroy_by_instance_and_volume(
                 context, instance_id, volume_id)
@@ -1361,13 +1412,17 @@ class ComputeManager(manager.SchedulerDependentManager):
         """Detach a volume from an instance."""
         return self._detach_volume(context, instance_id, volume_id, True)

-    def remove_volume(self, context, volume_id):
-        """Remove volume on compute host.
-
-        :param context: security context
-        :param volume_id: volume ID
-        """
-        self.volume_manager.remove_compute_volume(context, volume_id)
+    @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
+    def remove_volume_connection(self, context, instance_id, volume_id):
+        """Detach a volume from an instance.,"""
+        # NOTE(vish): We don't want to actually mark the volume
+        #             detached, or delete the bdm, just remove the
+        #             connection from this host.
+        try:
+            self._detach_volume(context, instance_id, volume_id,
+                                False, False, True)
+        except exception.NotFound:
+            pass

     @exception.wrap_exception(notifier=notifier, publisher_id=publisher_id())
     def compare_cpu(self, context, cpu_info):
@@ -1450,14 +1505,14 @@ class ComputeManager(manager.SchedulerDependentManager):
         # Getting instance info
         instance_ref = self.db.instance_get(context, instance_id)
-        hostname = instance_ref['hostname']

         # If any volume is mounted, prepare here.
-        if not instance_ref['volumes']:
-            LOG.info(_("%s has no volume."), hostname)
-        else:
-            for v in instance_ref['volumes']:
-                self.volume_manager.setup_compute_volume(context, v['id'])
+        block_device_info = \
+            self._get_instance_volume_block_device_info(context, instance_id)
+        if not block_device_info['block_device_mapping']:
+            LOG.info(_("%s has no volume."), instance_ref.name)
+
+        self.driver.pre_live_migration(block_device_info)

         # Bridge settings.
         # Call this method prior to ensure_filtering_rules_for_instance,
@@ -1517,7 +1572,7 @@ class ComputeManager(manager.SchedulerDependentManager):
         try:
             # Checking volume node is working correctly when any volumes
             # are attached to instances.
-            if instance_ref['volumes']:
+            if self._get_instance_volume_bdms(context, instance_id):
                 rpc.call(context,
                          FLAGS.volume_topic,
                          {"method": "check_for_export",
@@ -1537,12 +1592,13 @@ class ComputeManager(manager.SchedulerDependentManager):
                                  'disk': disk}})

         except Exception:
+            exc = sys.exc_info()
             i_name = instance_ref.name
             msg = _("Pre live migration for %(i_name)s failed at %(dest)s")
-            LOG.error(msg % locals())
+            LOG.exception(msg % locals())
             self.rollback_live_migration(context, instance_ref,
                                          dest, block_migration)
-            raise
+            raise exc

         # Executing live migration
         # live_migration might raises exceptions, but
# Executing live migration # Executing live migration
# live_migration might raises exceptions, but # live_migration might raises exceptions, but
@@ -1570,11 +1626,12 @@ class ComputeManager(manager.SchedulerDependentManager):
instance_id = instance_ref['id'] instance_id = instance_ref['id']
# Detaching volumes. # Detaching volumes.
try: for bdm in self._get_instance_volume_bdms(ctxt, instance_id):
for vol in self.db.volume_get_all_by_instance(ctxt, instance_id): # NOTE(vish): We don't want to actually mark the volume
self.volume_manager.remove_compute_volume(ctxt, vol['id']) # detached, or delete the bdm, just remove the
except exception.NotFound: # connection from this host.
pass self.remove_volume_connection(ctxt, instance_id,
bdm['volume_id'])
# Releasing vlan. # Releasing vlan.
# (not necessary in current implementation?) # (not necessary in current implementation?)
@@ -1673,10 +1730,11 @@ class ComputeManager(manager.SchedulerDependentManager):
                               vm_state=vm_states.ACTIVE,
                               task_state=None)

-        for volume_ref in instance_ref['volumes']:
-            volume_id = volume_ref['id']
+        for bdm in self._get_instance_volume_bdms(context, instance_ref['id']):
+            volume_id = bdm['volume_id']
             self.db.volume_update(context, volume_id, {'status': 'in-use'})
-            volume.API().remove_from_compute(context, volume_id, dest)
+            volume.API().remove_from_compute(context, instance_ref['id'],
+                                             volume_id, dest)

         # Block migration needs empty image at destination host
         # before migration starts, so if any failure occurs,
@@ -1693,9 +1751,15 @@ class ComputeManager(manager.SchedulerDependentManager):
         :param context: security context
         :param instance_id: nova.db.sqlalchemy.models.Instance.Id
         """
-        instances_ref = self.db.instance_get(context, instance_id)
-        network_info = self._get_instance_nw_info(context, instances_ref)
-        self.driver.destroy(instances_ref, network_info)
+        instance_ref = self.db.instance_get(context, instance_id)
+        network_info = self._get_instance_nw_info(context, instance_ref)
+
+        # NOTE(vish): The mapping is passed in so the driver can disconnect
+        #             from remote volumes if necessary
+        block_device_info = \
+            self._get_instance_volume_block_device_info(context, instance_id)
+        self.driver.destroy(instance_ref, network_info,
+                            block_device_info, True)

     def periodic_tasks(self, context=None):
         """Tasks to be run at a periodic interval."""

@@ -1,29 +0,0 @@
-# vim: tabstop=4 shiftwidth=4 softtabstop=4
-
-# Copyright (c) 2011 VA Linux Systems Japan K.K
-# Copyright (c) 2011 Isaku Yamahata
-#
-# Licensed under the Apache License, Version 2.0 (the "License"); you may
-# not use this file except in compliance with the License. You may obtain
-# a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
-# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
-# License for the specific language governing permissions and limitations
-# under the License.
-
-from nova import volume
-
-
-def terminate_volumes(db, context, instance_id):
-    """delete volumes of delete_on_termination=True in block device mapping"""
-    volume_api = volume.API()
-    for bdm in db.block_device_mapping_get_all_by_instance(context,
-                                                           instance_id):
-        #LOG.debug(_("terminating bdm %s") % bdm)
-        if bdm['volume_id'] and bdm['delete_on_termination']:
-            volume_api.delete(context, bdm['volume_id'])
-        db.block_device_mapping_destroy(context, bdm['id'])

@@ -56,18 +56,13 @@ IMPL = utils.LazyPluggable(FLAGS['db_backend'],
                            sqlalchemy='nova.db.sqlalchemy.api')


-class NoMoreBlades(exception.Error):
-    """No more available blades."""
-    pass
-
-
 class NoMoreNetworks(exception.Error):
     """No more available networks."""
     pass


 class NoMoreTargets(exception.Error):
-    """No more available blades"""
+    """No more available targets"""
     pass
@@ -814,25 +809,6 @@ def queue_get_for(context, topic, physical_node_id):
 ###################


-def export_device_count(context):
-    """Return count of export devices."""
-    return IMPL.export_device_count(context)
-
-
-def export_device_create_safe(context, values):
-    """Create an export_device from the values dictionary.
-
-    The device is not returned. If the create violates the unique
-    constraints because the shelf_id and blade_id already exist,
-    no exception is raised.
-    """
-    return IMPL.export_device_create_safe(context, values)
-
-
-###################
-
-
 def iscsi_target_count_by_host(context, host):
     """Return count of export devices."""
     return IMPL.iscsi_target_count_by_host(context, host)
@@ -908,11 +884,6 @@ def quota_destroy_all_by_project(context, project_id):
 ###################


-def volume_allocate_shelf_and_blade(context, volume_id):
-    """Atomically allocate a free shelf and blade from the pool."""
-    return IMPL.volume_allocate_shelf_and_blade(context, volume_id)
-
-
 def volume_allocate_iscsi_target(context, volume_id, host):
     """Atomically allocate a free iscsi_target from the pool."""
     return IMPL.volume_allocate_iscsi_target(context, volume_id, host)
@@ -978,11 +949,6 @@ def volume_get_instance(context, volume_id):
     return IMPL.volume_get_instance(context, volume_id)


-def volume_get_shelf_and_blade(context, volume_id):
-    """Get the shelf and blade allocated to the volume."""
-    return IMPL.volume_get_shelf_and_blade(context, volume_id)
-
-
 def volume_get_iscsi_target_num(context, volume_id):
     """Get the target num (tid) allocated to the volume."""
     return IMPL.volume_get_iscsi_target_num(context, volume_id)
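
For context: nova.db.api is a thin pass-through layer, so every AoE
helper has to be deleted twice, once here and once in the sqlalchemy
backend below. Each function simply forwards to the backend selected by
FLAGS.db_backend, in the style of the surviving iSCSI helper:

    IMPL = utils.LazyPluggable(FLAGS['db_backend'],
                               sqlalchemy='nova.db.sqlalchemy.api')

    def volume_allocate_iscsi_target(context, volume_id, host):
        """Atomically allocate a free iscsi_target from the pool."""
        return IMPL.volume_allocate_iscsi_target(context, volume_id, host)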

@@ -1164,6 +1164,11 @@ def instance_destroy(context, instance_id):
                 update({'deleted': True,
                         'deleted_at': utils.utcnow(),
                         'updated_at': literal_column('updated_at')})
+        session.query(models.BlockDeviceMapping).\
+                filter_by(instance_id=instance_id).\
+                update({'deleted': True,
+                        'deleted_at': utils.utcnow(),
+                        'updated_at': literal_column('updated_at')})


 @require_context
@@ -2002,28 +2007,6 @@ def queue_get_for(_context, topic, physical_node_id):
 ###################


-@require_admin_context
-def export_device_count(context):
-    session = get_session()
-    return session.query(models.ExportDevice).\
-                   filter_by(deleted=can_read_deleted(context)).\
-                   count()
-
-
-@require_admin_context
-def export_device_create_safe(context, values):
-    export_device_ref = models.ExportDevice()
-    export_device_ref.update(values)
-    try:
-        export_device_ref.save()
-        return export_device_ref
-    except IntegrityError:
-        return None
-
-
-###################
-
-
 @require_admin_context
 def iscsi_target_count_by_host(context, host):
     session = get_session()
@@ -2159,24 +2142,6 @@ def quota_destroy_all_by_project(context, project_id):
 ###################


-@require_admin_context
-def volume_allocate_shelf_and_blade(context, volume_id):
-    session = get_session()
-    with session.begin():
-        export_device = session.query(models.ExportDevice).\
-                                filter_by(volume=None).\
-                                filter_by(deleted=False).\
-                                with_lockmode('update').\
-                                first()
-        # NOTE(vish): if with_lockmode isn't supported, as in sqlite,
-        #             then this has concurrency issues
-        if not export_device:
-            raise db.NoMoreBlades()
-        export_device.volume_id = volume_id
-        session.add(export_device)
-        return (export_device.shelf_id, export_device.blade_id)
-
-
 @require_admin_context
 def volume_allocate_iscsi_target(context, volume_id, host):
     session = get_session()
@@ -2243,9 +2208,6 @@ def volume_destroy(context, volume_id):
                 update({'deleted': True,
                         'deleted_at': utils.utcnow(),
                         'updated_at': literal_column('updated_at')})
-        session.query(models.ExportDevice).\
-                filter_by(volume_id=volume_id).\
-                update({'volume_id': None})
         session.query(models.IscsiTarget).\
                 filter_by(volume_id=volume_id).\
                 update({'volume_id': None})
@@ -2364,18 +2326,6 @@ def volume_get_instance(context, volume_id):
     return result.instance


-@require_admin_context
-def volume_get_shelf_and_blade(context, volume_id):
-    session = get_session()
-    result = session.query(models.ExportDevice).\
-                     filter_by(volume_id=volume_id).\
-                     first()
-    if not result:
-        raise exception.ExportDeviceNotFoundForVolume(volume_id=volume_id)
-    return (result.shelf_id, result.blade_id)
-
-
 @require_admin_context
 def volume_get_iscsi_target_num(context, volume_id):
     session = get_session()

@@ -0,0 +1,51 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 University of Southern California
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import Boolean, Column, DateTime, ForeignKey, Integer
+from sqlalchemy import MetaData, String, Table
+
+from nova import log as logging
+
+meta = MetaData()
+
+# Table definition
+export_devices = Table('export_devices', meta,
+        Column('created_at', DateTime(timezone=False)),
+        Column('updated_at', DateTime(timezone=False)),
+        Column('deleted_at', DateTime(timezone=False)),
+        Column('deleted', Boolean(create_constraint=True, name=None)),
+        Column('id', Integer(), primary_key=True, nullable=False),
+        Column('shelf_id', Integer()),
+        Column('blade_id', Integer()),
+        Column('volume_id',
+               Integer(),
+               ForeignKey('volumes.id'),
+               nullable=True),
+        )
+
+
+def downgrade(migrate_engine):
+    meta.bind = migrate_engine
+    try:
+        export_devices.create()
+    except Exception:
+        logging.info(repr(export_devices))
+        logging.exception('Exception while creating table')
+        raise
+
+
+def upgrade(migrate_engine):
+    meta.bind = migrate_engine
+    export_devices.drop()

@@ -0,0 +1,35 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2011 OpenStack LLC.
+# All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from sqlalchemy import Column, MetaData, Table, Text
+
+meta = MetaData()
+
+new_column = Column('connection_info', Text())
+
+
+def upgrade(migrate_engine):
+    meta.bind = migrate_engine
+    table = Table('block_device_mapping', meta, autoload=True)
+    table.create_column(new_column)
+
+
+def downgrade(migrate_engine):
+    meta.bind = migrate_engine
+    table = Table('block_device_mapping', meta, autoload=True)
+    table.c.connection_info.drop()
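
The new column stores the connection info as serialized JSON text; the
compute manager round-trips it with utils.dumps/utils.loads (see the
manager hunks above). A minimal sketch of the round trip, with an
illustrative iSCSI-shaped payload (the exact dict returned by
initialize_connection is not shown in this excerpt):

    # Illustrative shape only.
    connection_info = {'driver_volume_type': 'iscsi',
                       'data': {'target_iqn': 'iqn.2010-10.org.openstack:'
                                              'volume-00000001',
                                'target_portal': '10.0.0.1:3260'}}

    # Persisted on the block device mapping as Text ...
    db.block_device_mapping_update(context, bdm['id'],
            {'connection_info': utils.dumps(connection_info)})

    # ... and rehydrated whenever the driver needs to (dis)connect.
    cinfo = utils.loads(bdm['connection_info'])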

@@ -470,21 +470,7 @@ class BlockDeviceMapping(BASE, NovaBase):
     # for no device to suppress devices.
     no_device = Column(Boolean, nullable=True)

-
-class ExportDevice(BASE, NovaBase):
-    """Represates a shelf and blade that a volume can be exported on."""
-    __tablename__ = 'export_devices'
-    __table_args__ = (schema.UniqueConstraint("shelf_id", "blade_id"),
-                      {'mysql_engine': 'InnoDB'})
-    id = Column(Integer, primary_key=True)
-    shelf_id = Column(Integer)
-    blade_id = Column(Integer)
-    volume_id = Column(Integer, ForeignKey('volumes.id'), nullable=True)
-    volume = relationship(Volume,
-                          backref=backref('export_device', uselist=False),
-                          foreign_keys=volume_id,
-                          primaryjoin='and_(ExportDevice.volume_id==Volume.id,'
-                                      'ExportDevice.deleted==False)')
+    connection_info = Column(Text, nullable=True)


 class IscsiTarget(BASE, NovaBase):

@@ -394,10 +394,6 @@ class VolumeIsBusy(Error):
     message = _("deleting volume %(volume_name)s that has snapshot")


-class ExportDeviceNotFoundForVolume(NotFound):
-    message = _("No export device found for volume %(volume_id)s.")
-
-
 class ISCSITargetNotFoundForVolume(NotFound):
     message = _("No target id found for volume %(volume_id)s.")
@@ -406,6 +402,10 @@ class DiskNotFound(NotFound):
     message = _("No disk at %(location)s")


+class VolumeDriverNotFound(NotFound):
+    message = _("Could not find a handler for %(driver_type)s volume.")
+
+
 class InvalidImageRef(Invalid):
     message = _("Invalid image href %(image_href)s.")

@@ -10,7 +10,7 @@ flags.DEFINE_integer('rpc_conn_pool_size', 30,
                      'Size of RPC connection pool')


-class RemoteError(exception.Error):
+class RemoteError(exception.NovaException):
     """Signifies that a remote class has raised an exception.

     Containes a string representation of the type of the original exception,
@@ -19,11 +19,10 @@ class RemoteError(exception.Error):
     contains all of the relevent info.

     """
+    message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.")

-    def __init__(self, exc_type, value, traceback):
+    def __init__(self, exc_type=None, value=None, traceback=None):
         self.exc_type = exc_type
         self.value = value
         self.traceback = traceback
-        super(RemoteError, self).__init__('%s %s\n%s' % (exc_type,
-                                                         value,
-                                                         traceback))
+        super(RemoteError, self).__init__(**self.__dict__)
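
Because RemoteError is now a NovaException carrying structured fields,
callers can match on the original exception type instead of grepping the
formatted message. The scheduler test later in this commit switches to
exactly this pattern; roughly:

    try:
        ...  # the scheduler call that triggers compare_cpu remotely
    except rpc.RemoteError, e:
        # Compare the structured exc_type rather than searching e.message.
        self.assertTrue(e.exc_type == exception.InvalidCPUInfo)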

@@ -1277,7 +1277,7 @@ class CloudTestCase(test.TestCase):
             LOG.debug(info)
             if predicate(info):
                 break
-            greenthread.sleep(1)
+            greenthread.sleep(0.5)

     def _wait_for_running(self, instance_id):
         def is_running(info):
@@ -1296,6 +1296,16 @@ class CloudTestCase(test.TestCase):
     def _wait_for_terminate(self, instance_id):
         def is_deleted(info):
             return info['deleted']
+        id = ec2utils.ec2_id_to_id(instance_id)
+        # NOTE(vish): Wait for InstanceNotFound, then verify that
+        #             the instance is actually deleted.
+        while True:
+            try:
+                self.cloud.compute_api.get(self.context, instance_id=id)
+            except exception.InstanceNotFound:
+                break
+            greenthread.sleep(0.1)
         elevated = self.context.elevated(read_deleted=True)
         self._wait_for_state(elevated, instance_id, is_deleted)
@@ -1311,26 +1321,21 @@ class CloudTestCase(test.TestCase):
         # a running instance can't be started. It is just ignored.
         result = self.cloud.start_instances(self.context, [instance_id])
-        greenthread.sleep(0.3)
         self.assertTrue(result)

         result = self.cloud.stop_instances(self.context, [instance_id])
-        greenthread.sleep(0.3)
         self.assertTrue(result)
         self._wait_for_stopped(instance_id)

         result = self.cloud.start_instances(self.context, [instance_id])
-        greenthread.sleep(0.3)
         self.assertTrue(result)
         self._wait_for_running(instance_id)

         result = self.cloud.stop_instances(self.context, [instance_id])
-        greenthread.sleep(0.3)
         self.assertTrue(result)
         self._wait_for_stopped(instance_id)

         result = self.cloud.terminate_instances(self.context, [instance_id])
-        greenthread.sleep(0.3)
         self.assertTrue(result)

         self._restart_compute_service()
@@ -1542,24 +1547,20 @@ class CloudTestCase(test.TestCase):
         self.assertTrue(vol2_id)

         self.cloud.terminate_instances(self.context, [ec2_instance_id])
-        greenthread.sleep(0.3)
         self._wait_for_terminate(ec2_instance_id)

-        greenthread.sleep(0.3)
         admin_ctxt = context.get_admin_context(read_deleted=False)
         vol = db.volume_get(admin_ctxt, vol1_id)
         self._assert_volume_detached(vol)
         self.assertFalse(vol['deleted'])
         db.volume_destroy(self.context, vol1_id)

-        greenthread.sleep(0.3)
         admin_ctxt = context.get_admin_context(read_deleted=True)
         vol = db.volume_get(admin_ctxt, vol2_id)
         self.assertTrue(vol['deleted'])

         for snapshot_id in (ec2_snapshot1_id, ec2_snapshot2_id):
             self.cloud.delete_snapshot(self.context, snapshot_id)
-            greenthread.sleep(0.3)
         db.volume_destroy(self.context, vol['id'])

     def test_create_image(self):

@@ -33,11 +33,7 @@ FLAGS['network_size'].SetDefault(8)
 FLAGS['num_networks'].SetDefault(2)
 FLAGS['fake_network'].SetDefault(True)
 FLAGS['image_service'].SetDefault('nova.image.fake.FakeImageService')
-flags.DECLARE('num_shelves', 'nova.volume.driver')
-flags.DECLARE('blades_per_shelf', 'nova.volume.driver')
 flags.DECLARE('iscsi_num_targets', 'nova.volume.driver')
-FLAGS['num_shelves'].SetDefault(2)
-FLAGS['blades_per_shelf'].SetDefault(4)
 FLAGS['iscsi_num_targets'].SetDefault(8)
 FLAGS['verbose'].SetDefault(True)
 FLAGS['sqlite_db'].SetDefault("tests.sqlite")

@@ -263,22 +263,23 @@ class VolumesTest(integrated_helpers._IntegratedTestBase):
         LOG.debug("Logs: %s" % driver.LoggingVolumeDriver.all_logs())

-        # Discover_volume and undiscover_volume are called from compute
+        # prepare_attach and prepare_detach are called from compute
         # on attach/detach

         disco_moves = driver.LoggingVolumeDriver.logs_like(
-                            'discover_volume',
+                            'initialize_connection',
                             id=volume_id)
-        LOG.debug("discover_volume actions: %s" % disco_moves)
+        LOG.debug("initialize_connection actions: %s" % disco_moves)

         self.assertEquals(1, len(disco_moves))
         disco_move = disco_moves[0]
         self.assertEquals(disco_move['id'], volume_id)

         last_days_of_disco_moves = driver.LoggingVolumeDriver.logs_like(
-                            'undiscover_volume',
+                            'terminate_connection',
                             id=volume_id)
-        LOG.debug("undiscover_volume actions: %s" % last_days_of_disco_moves)
+        LOG.debug("terminate_connection actions: %s" %
+                  last_days_of_disco_moves)

         self.assertEquals(1, len(last_days_of_disco_moves))
         undisco_move = last_days_of_disco_moves[0]

@@ -1080,7 +1080,8 @@ class SimpleDriverTestCase(test.TestCase):
         rpc.call(mox.IgnoreArg(), mox.IgnoreArg(),
             {"method": 'compare_cpu',
              "args": {'cpu_info': s_ref2['compute_node'][0]['cpu_info']}}).\
-             AndRaise(rpc.RemoteError("doesn't have compatibility to", "", ""))
+             AndRaise(rpc.RemoteError(exception.InvalidCPUInfo,
+                                      exception.InvalidCPUInfo(reason='fake')))

         self.mox.ReplayAll()
         try:
@@ -1089,7 +1090,7 @@ class SimpleDriverTestCase(test.TestCase):
                                               dest,
                                               False)
         except rpc.RemoteError, e:
-            c = (e.message.find(_("doesn't have compatibility to")) >= 0)
+            c = (e.exc_type == exception.InvalidCPUInfo)

         self.assertTrue(c)
         db.instance_destroy(self.context, instance_id)

@@ -21,6 +21,7 @@ Tests For Compute
 """

 from copy import copy
+import mox

 from nova import compute
 from nova import context
@@ -159,21 +160,6 @@ class ComputeTestCase(test.TestCase):
                   'project_id': self.project_id}
         return db.security_group_create(self.context, values)

-    def _get_dummy_instance(self):
-        """Get mock-return-value instance object
-           Use this when any testcase executed later than test_run_terminate
-        """
-        vol1 = models.Volume()
-        vol1['id'] = 1
-        vol2 = models.Volume()
-        vol2['id'] = 2
-        instance_ref = models.Instance()
-        instance_ref['id'] = 1
-        instance_ref['volumes'] = [vol1, vol2]
-        instance_ref['hostname'] = 'hostname-1'
-        instance_ref['host'] = 'dummy'
-        return instance_ref
-
     def test_create_instance_defaults_display_name(self):
         """Verify that an instance cannot be created without a display_name."""
         cases = [dict(), dict(display_name=None)]
@@ -726,235 +712,124 @@ class ComputeTestCase(test.TestCase):
def test_pre_live_migration_instance_has_no_fixed_ip(self): def test_pre_live_migration_instance_has_no_fixed_ip(self):
"""Confirm raising exception if instance doesn't have fixed_ip.""" """Confirm raising exception if instance doesn't have fixed_ip."""
instance_ref = self._get_dummy_instance() # creating instance testdata
instance_id = self._create_instance({'host': 'dummy'})
c = context.get_admin_context() c = context.get_admin_context()
i_id = instance_ref['id'] inst_ref = db.instance_get(c, instance_id)
topic = db.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
dbmock = self.mox.CreateMock(db) # start test
dbmock.instance_get(c, i_id).AndReturn(instance_ref) self.assertRaises(exception.FixedIpNotFoundForInstance,
self.compute.db = dbmock
self.mox.ReplayAll()
self.assertRaises(exception.NotFound,
self.compute.pre_live_migration, self.compute.pre_live_migration,
c, instance_ref['id'], time=FakeTime()) c, inst_ref['id'], time=FakeTime())
# cleanup
db.instance_destroy(c, instance_id)
def test_pre_live_migration_instance_has_volume(self): def test_pre_live_migration_works_correctly(self):
"""Confirm setup_compute_volume is called when volume is mounted.""" """Confirm setup_compute_volume is called when volume is mounted."""
def fake_nw_info(*args, **kwargs): # creating instance testdata
return [(0, {'ips':['dummy']})] instance_id = self._create_instance({'host': 'dummy'})
i_ref = self._get_dummy_instance()
c = context.get_admin_context() c = context.get_admin_context()
inst_ref = db.instance_get(c, instance_id)
topic = db.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
self._setup_other_managers() # creating mocks
dbmock = self.mox.CreateMock(db) self.mox.StubOutWithMock(self.compute.driver, 'pre_live_migration')
volmock = self.mox.CreateMock(self.volume_manager) self.compute.driver.pre_live_migration({'block_device_mapping': []})
drivermock = self.mox.CreateMock(self.compute_driver) dummy_nw_info = [[None, {'ips':'1.1.1.1'}]]
self.mox.StubOutWithMock(self.compute, '_get_instance_nw_info')
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref) self.compute._get_instance_nw_info(c, mox.IsA(inst_ref)
for i in range(len(i_ref['volumes'])): ).AndReturn(dummy_nw_info)
vid = i_ref['volumes'][i]['id'] self.mox.StubOutWithMock(self.compute.driver, 'plug_vifs')
volmock.setup_compute_volume(c, vid).InAnyOrder('g1') self.compute.driver.plug_vifs(mox.IsA(inst_ref), dummy_nw_info)
drivermock.plug_vifs(i_ref, fake_nw_info()) self.mox.StubOutWithMock(self.compute.driver,
drivermock.ensure_filtering_rules_for_instance(i_ref, fake_nw_info()) 'ensure_filtering_rules_for_instance')
self.compute.driver.ensure_filtering_rules_for_instance(
self.stubs.Set(self.compute, '_get_instance_nw_info', fake_nw_info) mox.IsA(inst_ref), dummy_nw_info)
self.compute.db = dbmock
self.compute.volume_manager = volmock
self.compute.driver = drivermock
# start test
self.mox.ReplayAll() self.mox.ReplayAll()
ret = self.compute.pre_live_migration(c, i_ref['id']) ret = self.compute.pre_live_migration(c, inst_ref['id'])
self.assertEqual(ret, None) self.assertEqual(ret, None)
def test_pre_live_migration_instance_has_no_volume(self): # cleanup
"""Confirm log meg when instance doesn't mount any volumes.""" db.instance_destroy(c, instance_id)
def fake_nw_info(*args, **kwargs):
return [(0, {'ips':['dummy']})]
i_ref = self._get_dummy_instance()
i_ref['volumes'] = []
c = context.get_admin_context()
self._setup_other_managers()
dbmock = self.mox.CreateMock(db)
drivermock = self.mox.CreateMock(self.compute_driver)
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
self.mox.StubOutWithMock(compute_manager.LOG, 'info')
compute_manager.LOG.info(_("%s has no volume."), i_ref['hostname'])
drivermock.plug_vifs(i_ref, fake_nw_info())
drivermock.ensure_filtering_rules_for_instance(i_ref, fake_nw_info())
self.stubs.Set(self.compute, '_get_instance_nw_info', fake_nw_info)
self.compute.db = dbmock
self.compute.driver = drivermock
self.mox.ReplayAll()
ret = self.compute.pre_live_migration(c, i_ref['id'], time=FakeTime())
self.assertEqual(ret, None)
def test_pre_live_migration_setup_compute_node_fail(self):
"""Confirm operation setup_compute_network() fails.
It retries and raise exception when timeout exceeded.
"""
def fake_nw_info(*args, **kwargs):
return [(0, {'ips':['dummy']})]
i_ref = self._get_dummy_instance()
c = context.get_admin_context()
self._setup_other_managers()
dbmock = self.mox.CreateMock(db)
netmock = self.mox.CreateMock(self.network_manager)
volmock = self.mox.CreateMock(self.volume_manager)
drivermock = self.mox.CreateMock(self.compute_driver)
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
for i in range(len(i_ref['volumes'])):
volmock.setup_compute_volume(c, i_ref['volumes'][i]['id'])
for i in range(FLAGS.live_migration_retry_count):
drivermock.plug_vifs(i_ref, fake_nw_info()).\
AndRaise(exception.ProcessExecutionError())
self.stubs.Set(self.compute, '_get_instance_nw_info', fake_nw_info)
self.compute.db = dbmock
self.compute.network_manager = netmock
self.compute.volume_manager = volmock
self.compute.driver = drivermock
self.mox.ReplayAll()
self.assertRaises(exception.ProcessExecutionError,
self.compute.pre_live_migration,
c, i_ref['id'], time=FakeTime())
def test_live_migration_works_correctly_with_volume(self):
"""Confirm check_for_export to confirm volume health check."""
i_ref = self._get_dummy_instance()
c = context.get_admin_context()
topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host'])
dbmock = self.mox.CreateMock(db)
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
self.mox.StubOutWithMock(rpc, 'call')
rpc.call(c, FLAGS.volume_topic, {"method": "check_for_export",
"args": {'instance_id': i_ref['id']}})
dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\
AndReturn(topic)
rpc.call(c, topic, {"method": "pre_live_migration",
"args": {'instance_id': i_ref['id'],
'block_migration': False,
'disk': None}})
self.mox.StubOutWithMock(self.compute.driver, 'live_migration')
self.compute.driver.live_migration(c, i_ref, i_ref['host'],
self.compute.post_live_migration,
self.compute.rollback_live_migration,
False)
self.compute.db = dbmock
self.mox.ReplayAll()
ret = self.compute.live_migration(c, i_ref['id'], i_ref['host'])
self.assertEqual(ret, None)
def test_live_migration_dest_raises_exception(self): def test_live_migration_dest_raises_exception(self):
"""Confirm exception when pre_live_migration fails.""" """Confirm exception when pre_live_migration fails."""
i_ref = self._get_dummy_instance() # creating instance testdata
instance_id = self._create_instance({'host': 'dummy'})
c = context.get_admin_context() c = context.get_admin_context()
topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host']) inst_ref = db.instance_get(c, instance_id)
topic = db.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
# creating volume testdata
volume_id = 1
db.volume_create(c, {'id': volume_id})
values = {'instance_id': instance_id, 'device_name': '/dev/vdc',
'delete_on_termination': False, 'volume_id': volume_id}
db.block_device_mapping_create(c, values)
dbmock = self.mox.CreateMock(db) # creating mocks
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'call')
rpc.call(c, FLAGS.volume_topic, {"method": "check_for_export", rpc.call(c, FLAGS.volume_topic, {"method": "check_for_export",
"args": {'instance_id': i_ref['id']}}) "args": {'instance_id': instance_id}})
dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\
AndReturn(topic)
rpc.call(c, topic, {"method": "pre_live_migration", rpc.call(c, topic, {"method": "pre_live_migration",
"args": {'instance_id': i_ref['id'], "args": {'instance_id': instance_id,
'block_migration': False, 'block_migration': True,
'disk': None}}).\ 'disk': None}}).\
AndRaise(rpc.RemoteError('', '', '')) AndRaise(rpc.common.RemoteError('', '', ''))
dbmock.instance_update(c, i_ref['id'], {'vm_state': vm_states.ACTIVE, # mocks for rollback
'task_state': None, rpc.call(c, topic, {"method": "remove_volume_connection",
'host': i_ref['host']}) "args": {'instance_id': instance_id,
for v in i_ref['volumes']: 'volume_id': volume_id}})
dbmock.volume_update(c, v['id'], {'status': 'in-use'}) rpc.cast(c, topic, {"method": "rollback_live_migration_at_destination",
# mock for volume_api.remove_from_compute "args": {'instance_id': inst_ref['id']}})
rpc.call(c, topic, {"method": "remove_volume",
"args": {'volume_id': v['id']}})
self.compute.db = dbmock # start test
self.mox.ReplayAll() self.mox.ReplayAll()
self.assertRaises(rpc.RemoteError, self.assertRaises(rpc.RemoteError,
self.compute.live_migration, self.compute.live_migration,
c, i_ref['id'], i_ref['host']) c, instance_id, inst_ref['host'], True)
def test_live_migration_dest_raises_exception_no_volume(self): # cleanup
"""Same as above test(input pattern is different) """ for bdms in db.block_device_mapping_get_all_by_instance(c,
i_ref = self._get_dummy_instance() instance_id):
i_ref['volumes'] = [] db.block_device_mapping_destroy(c, bdms['id'])
c = context.get_admin_context() db.volume_destroy(c, volume_id)
topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host']) db.instance_destroy(c, instance_id)
dbmock = self.mox.CreateMock(db) def test_live_migration_works_correctly(self):
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\
AndReturn(topic)
self.mox.StubOutWithMock(rpc, 'call')
rpc.call(c, topic, {"method": "pre_live_migration",
"args": {'instance_id': i_ref['id'],
'block_migration': False,
'disk': None}}).\
AndRaise(rpc.RemoteError('', '', ''))
dbmock.instance_update(c, i_ref['id'], {'vm_state': vm_states.ACTIVE,
'task_state': None,
'host': i_ref['host']})
self.compute.db = dbmock
self.mox.ReplayAll()
self.assertRaises(rpc.RemoteError,
self.compute.live_migration,
c, i_ref['id'], i_ref['host'])
def test_live_migration_works_correctly_no_volume(self):
"""Confirm live_migration() works as expected correctly.""" """Confirm live_migration() works as expected correctly."""
i_ref = self._get_dummy_instance() # creating instance testdata
i_ref['volumes'] = [] instance_id = self._create_instance({'host': 'dummy'})
c = context.get_admin_context() c = context.get_admin_context()
topic = db.queue_get_for(c, FLAGS.compute_topic, i_ref['host']) inst_ref = db.instance_get(c, instance_id)
topic = db.queue_get_for(c, FLAGS.compute_topic, inst_ref['host'])
dbmock = self.mox.CreateMock(db) # create
dbmock.instance_get(c, i_ref['id']).AndReturn(i_ref)
self.mox.StubOutWithMock(rpc, 'call') self.mox.StubOutWithMock(rpc, 'call')
dbmock.queue_get_for(c, FLAGS.compute_topic, i_ref['host']).\
AndReturn(topic)
rpc.call(c, topic, {"method": "pre_live_migration", rpc.call(c, topic, {"method": "pre_live_migration",
"args": {'instance_id': i_ref['id'], "args": {'instance_id': instance_id,
'block_migration': False, 'block_migration': False,
'disk': None}}) 'disk': None}})
self.mox.StubOutWithMock(self.compute.driver, 'live_migration')
self.compute.driver.live_migration(c, i_ref, i_ref['host'],
self.compute.post_live_migration,
self.compute.rollback_live_migration,
False)
self.compute.db = dbmock # start test
self.mox.ReplayAll() self.mox.ReplayAll()
ret = self.compute.live_migration(c, i_ref['id'], i_ref['host']) ret = self.compute.live_migration(c, inst_ref['id'], inst_ref['host'])
self.assertEqual(ret, None) self.assertEqual(ret, None)
# cleanup
db.instance_destroy(c, instance_id)
    def test_post_live_migration_working_correctly(self):
        """Confirm post_live_migration() works as expected correctly."""
        dest = 'desthost'
        flo_addr = '1.2.1.2'

        # creating testdata
        c = context.get_admin_context()
        instance_id = self._create_instance({'state_description': 'migrating',
                                             'state': power_state.PAUSED})
        i_ref = db.instance_get(c, instance_id)
        db.instance_update(c, i_ref['id'], {'vm_state': vm_states.MIGRATING,
                                            'power_state': power_state.PAUSED})
@@ -964,14 +839,8 @@ class ComputeTestCase(test.TestCase):
        fix_ref = db.fixed_ip_get_by_address(c, fix_addr)
        flo_ref = db.floating_ip_create(c, {'address': flo_addr,
                                            'fixed_ip_id': fix_ref['id']})

        # creating mocks
        self.mox.StubOutWithMock(self.compute.driver, 'unfilter_instance')
        self.compute.driver.unfilter_instance(i_ref, [])
        self.mox.StubOutWithMock(rpc, 'call')
@@ -979,18 +848,18 @@ class ComputeTestCase(test.TestCase):
{"method": "post_live_migration_at_destination", {"method": "post_live_migration_at_destination",
"args": {'instance_id': i_ref['id'], 'block_migration': False}}) "args": {'instance_id': i_ref['id'], 'block_migration': False}})
# executing # start test
self.mox.ReplayAll() self.mox.ReplayAll()
ret = self.compute.post_live_migration(c, i_ref, dest) ret = self.compute.post_live_migration(c, i_ref, dest)
# make sure every data is rewritten to dest # make sure every data is rewritten to destinatioin hostname.
i_ref = db.instance_get(c, i_ref['id']) i_ref = db.instance_get(c, i_ref['id'])
c1 = (i_ref['host'] == dest) c1 = (i_ref['host'] == dest)
flo_refs = db.floating_ip_get_all_by_host(c, dest) flo_refs = db.floating_ip_get_all_by_host(c, dest)
c2 = (len(flo_refs) != 0 and flo_refs[0]['address'] == flo_addr) c2 = (len(flo_refs) != 0 and flo_refs[0]['address'] == flo_addr)
# post operaton
self.assertTrue(c1 and c2) self.assertTrue(c1 and c2)
# cleanup
db.instance_destroy(c, instance_id) db.instance_destroy(c, instance_id)
db.volume_destroy(c, v_ref['id']) db.volume_destroy(c, v_ref['id'])
db.floating_ip_destroy(c, flo_addr) db.floating_ip_destroy(c, flo_addr)

@@ -30,6 +30,7 @@ from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
from nova import test
from nova import utils
from nova.api.ec2 import cloud
@@ -38,10 +39,13 @@ from nova.compute import vm_states
from nova.virt import driver
from nova.virt.libvirt import connection
from nova.virt.libvirt import firewall
from nova.virt.libvirt import volume
from nova.volume import driver as volume_driver
from nova.tests import fake_network

libvirt = None
FLAGS = flags.FLAGS
LOG = logging.getLogger('nova.tests.test_libvirt')

_fake_network_info = fake_network.fake_get_instance_nw_info
_ipv4_like = fake_network.ipv4_like
@@ -87,6 +91,71 @@ class FakeVirtDomain(object):
        return self._fake_dom_xml
class LibvirtVolumeTestCase(test.TestCase):
@staticmethod
def fake_execute(*cmd, **kwargs):
LOG.debug("FAKE EXECUTE: %s" % ' '.join(cmd))
return None, None
def setUp(self):
super(LibvirtVolumeTestCase, self).setUp()
self.stubs.Set(utils, 'execute', self.fake_execute)
def test_libvirt_iscsi_driver(self):
        # NOTE(vish): exists is stubbed so the driver assumes connecting worked
self.stubs.Set(os.path, 'exists', lambda x: True)
vol_driver = volume_driver.ISCSIDriver()
libvirt_driver = volume.LibvirtISCSIVolumeDriver('fake')
name = 'volume-00000001'
vol = {'id': 1,
'name': name,
'provider_auth': None,
'provider_location': '10.0.2.15:3260,fake '
'iqn.2010-10.org.openstack:volume-00000001'}
address = '127.0.0.1'
connection_info = vol_driver.initialize_connection(vol, address)
mount_device = "vde"
xml = libvirt_driver.connect_volume(connection_info, mount_device)
tree = xml_to_tree(xml)
dev_str = '/dev/disk/by-path/ip-10.0.2.15:3260-iscsi-iqn.' \
'2010-10.org.openstack:%s-lun-0' % name
self.assertEqual(tree.get('type'), 'block')
self.assertEqual(tree.find('./source').get('dev'), dev_str)
libvirt_driver.disconnect_volume(connection_info, mount_device)
def test_libvirt_sheepdog_driver(self):
vol_driver = volume_driver.SheepdogDriver()
libvirt_driver = volume.LibvirtNetVolumeDriver('fake')
name = 'volume-00000001'
vol = {'id': 1, 'name': name}
address = '127.0.0.1'
connection_info = vol_driver.initialize_connection(vol, address)
mount_device = "vde"
xml = libvirt_driver.connect_volume(connection_info, mount_device)
tree = xml_to_tree(xml)
self.assertEqual(tree.get('type'), 'network')
self.assertEqual(tree.find('./source').get('protocol'), 'sheepdog')
self.assertEqual(tree.find('./source').get('name'), name)
libvirt_driver.disconnect_volume(connection_info, mount_device)
def test_libvirt_rbd_driver(self):
vol_driver = volume_driver.RBDDriver()
libvirt_driver = volume.LibvirtNetVolumeDriver('fake')
name = 'volume-00000001'
vol = {'id': 1, 'name': name}
address = '127.0.0.1'
connection_info = vol_driver.initialize_connection(vol, address)
mount_device = "vde"
xml = libvirt_driver.connect_volume(connection_info, mount_device)
tree = xml_to_tree(xml)
self.assertEqual(tree.get('type'), 'network')
self.assertEqual(tree.find('./source').get('protocol'), 'rbd')
rbd_name = '%s/%s' % (FLAGS.rbd_pool, name)
self.assertEqual(tree.find('./source').get('name'), rbd_name)
libvirt_driver.disconnect_volume(connection_info, mount_device)
class CacheConcurrencyTestCase(test.TestCase):
    def setUp(self):
        super(CacheConcurrencyTestCase, self).setUp()
@@ -145,6 +214,20 @@ class CacheConcurrencyTestCase(test.TestCase):
            eventlet.sleep(0)
class FakeVolumeDriver(object):
def __init__(self, *args, **kwargs):
pass
def attach_volume(self, *args):
pass
def detach_volume(self, *args):
pass
def get_xml(self, *args):
return ""
class LibvirtConnTestCase(test.TestCase):
    def setUp(self):
@@ -192,14 +275,14 @@ class LibvirtConnTestCase(test.TestCase):
            return FakeVirtDomain()

        # Creating mocks
volume_driver = 'iscsi=nova.tests.test_libvirt.FakeVolumeDriver'
self.flags(libvirt_volume_drivers=[volume_driver])
        fake = FakeLibvirtConnection()
        # Customizing above fake if necessary
        for key, val in kwargs.items():
            fake.__setattr__(key, val)

        self.flags(image_service='nova.image.fake.FakeImageService')
fw_driver = "nova.tests.fake_network.FakeIptablesFirewallDriver"
self.flags(firewall_driver=fw_driver)
        self.flags(libvirt_vif_driver="nova.tests.fake_network.FakeVIFDriver")
        self.mox.StubOutWithMock(connection.LibvirtConnection, '_conn')
@@ -382,14 +465,16 @@ class LibvirtConnTestCase(test.TestCase):
        self.assertEquals(snapshot['status'], 'active')
        self.assertEquals(snapshot['name'], snapshot_name)
    def test_attach_invalid_volume_type(self):
        self.create_fake_libvirt_mock()
        connection.LibvirtConnection._conn.lookupByName = self.fake_lookup
        self.mox.ReplayAll()
        conn = connection.LibvirtConnection(False)
        self.assertRaises(exception.VolumeDriverNotFound,
                          conn.attach_volume,
                          {"driver_volume_type": "badtype"},
                          "fake",
                          "/dev/fake")
    def test_multi_nic(self):
        instance_data = dict(self.test_instance)
@@ -640,9 +725,15 @@ class LibvirtConnTestCase(test.TestCase):
        self.mox.ReplayAll()
        try:
            conn = connection.LibvirtConnection(False)
            self.stubs.Set(conn.firewall_driver,
                           'setup_basic_filtering',
                           fake_none)
            self.stubs.Set(conn.firewall_driver,
                           'prepare_instance_filter',
                           fake_none)
            self.stubs.Set(conn.firewall_driver,
                           'instance_filter_exists',
                           fake_none)
            conn.ensure_filtering_rules_for_instance(instance_ref,
                                                     network_info,
                                                     time=fake_timer)
@@ -708,6 +799,27 @@ class LibvirtConnTestCase(test.TestCase):
        db.volume_destroy(self.context, volume_ref['id'])
        db.instance_destroy(self.context, instance_ref['id'])
def test_pre_live_migration_works_correctly(self):
"""Confirms pre_block_migration works correctly."""
# Creating testdata
vol = {'block_device_mapping': [
{'connection_info': 'dummy', 'mount_device': '/dev/sda'},
{'connection_info': 'dummy', 'mount_device': '/dev/sdb'}]}
conn = connection.LibvirtConnection(False)
# Creating mocks
self.mox.StubOutWithMock(driver, "block_device_info_get_mapping")
driver.block_device_info_get_mapping(vol
).AndReturn(vol['block_device_mapping'])
self.mox.StubOutWithMock(conn, "volume_driver_method")
for v in vol['block_device_mapping']:
conn.volume_driver_method('connect_volume',
v['connection_info'], v['mount_device'])
# Starting test
self.mox.ReplayAll()
self.assertEqual(conn.pre_live_migration(vol), None)
    def test_pre_block_migration_works_correctly(self):
        """Confirms pre_block_migration works correctly."""
@@ -822,8 +934,12 @@ class LibvirtConnTestCase(test.TestCase):
        # Start test
        self.mox.ReplayAll()
        conn = connection.LibvirtConnection(False)
        self.stubs.Set(conn.firewall_driver,
                       'setup_basic_filtering',
                       fake_none)
        self.stubs.Set(conn.firewall_driver,
                       'prepare_instance_filter',
                       fake_none)
        try:
            conn.spawn(self.context, instance, network_info)

@@ -254,9 +254,11 @@ class _VirtDriverTestCase(test.TestCase):
        network_info = test_utils.get_test_network_info()
        instance_ref = test_utils.get_test_instance()
        self.connection.spawn(self.ctxt, instance_ref, network_info)
        self.connection.attach_volume({'driver_volume_type': 'fake'},
                                      instance_ref['name'],
                                      '/mnt/nova/something')
        self.connection.detach_volume({'driver_volume_type': 'fake'},
                                      instance_ref['name'],
                                      '/mnt/nova/something')
    @catch_notimplementederror

@@ -257,7 +257,7 @@ class VolumeTestCase(test.TestCase):
class DriverTestCase(test.TestCase):
    """Base Test class for Drivers."""
    driver_name = "nova.volume.driver.FakeBaseDriver"

    def setUp(self):
        super(DriverTestCase, self).setUp()
@@ -295,83 +295,6 @@ class DriverTestCase(test.TestCase):
        self.volume.delete_volume(self.context, volume_id)
class AOETestCase(DriverTestCase):
"""Test Case for AOEDriver"""
driver_name = "nova.volume.driver.AOEDriver"
def setUp(self):
super(AOETestCase, self).setUp()
def tearDown(self):
super(AOETestCase, self).tearDown()
def _attach_volume(self):
"""Attach volumes to an instance. This function also sets
a fake log message."""
volume_id_list = []
for index in xrange(3):
vol = {}
vol['size'] = 0
volume_id = db.volume_create(self.context,
vol)['id']
self.volume.create_volume(self.context, volume_id)
# each volume has a different mountpoint
mountpoint = "/dev/sd" + chr((ord('b') + index))
db.volume_attached(self.context, volume_id, self.instance_id,
mountpoint)
(shelf_id, blade_id) = db.volume_get_shelf_and_blade(self.context,
volume_id)
self.output += "%s %s eth0 /dev/nova-volumes/vol-foo auto run\n" \
% (shelf_id, blade_id)
volume_id_list.append(volume_id)
return volume_id_list
def test_check_for_export_with_no_volume(self):
"""No log message when no volume is attached to an instance."""
self.stream.truncate(0)
self.volume.check_for_export(self.context, self.instance_id)
self.assertEqual(self.stream.getvalue(), '')
def test_check_for_export_with_all_vblade_processes(self):
"""No log message when all the vblade processes are running."""
volume_id_list = self._attach_volume()
self.stream.truncate(0)
self.volume.check_for_export(self.context, self.instance_id)
self.assertEqual(self.stream.getvalue(), '')
self._detach_volume(volume_id_list)
def test_check_for_export_with_vblade_process_missing(self):
"""Output a warning message when some vblade processes aren't
running."""
volume_id_list = self._attach_volume()
# the first vblade process isn't running
self.output = self.output.replace("run", "down", 1)
(shelf_id, blade_id) = db.volume_get_shelf_and_blade(self.context,
volume_id_list[0])
msg_is_match = False
self.stream.truncate(0)
try:
self.volume.check_for_export(self.context, self.instance_id)
except exception.ProcessExecutionError, e:
volume_id = volume_id_list[0]
msg = _("Cannot confirm exported volume id:%(volume_id)s. "
"vblade process for e%(shelf_id)s.%(blade_id)s "
"isn't running.") % locals()
msg_is_match = (0 <= e.message.find(msg))
self.assertTrue(msg_is_match)
self._detach_volume(volume_id_list)
class ISCSITestCase(DriverTestCase):
    """Test Case for ISCSIDriver"""
    driver_name = "nova.volume.driver.ISCSIDriver"
@@ -408,7 +331,7 @@ class ISCSITestCase(DriverTestCase):
        self.assertEqual(self.stream.getvalue(), '')
    def test_check_for_export_with_all_volume_exported(self):
        """No log message when all the processes are running."""
        volume_id_list = self._attach_volume()

        self.mox.StubOutWithMock(self.volume.driver, '_execute')
@@ -431,7 +354,6 @@ class ISCSITestCase(DriverTestCase):
           by ietd."""
        volume_id_list = self._attach_volume()

        tid = db.volume_get_iscsi_target_num(self.context, volume_id_list[0])
        self.mox.StubOutWithMock(self.volume.driver, '_execute')
        self.volume.driver._execute("ietadm", "--op", "show",

@@ -99,6 +99,20 @@ class XenAPIVolumeTestCase(test.TestCase):
        vol['attach_status'] = "detached"
        return db.volume_create(self.context, vol)
    @staticmethod
    def _make_info():
        return {
            'driver_volume_type': 'iscsi',
            'data': {
                'volume_id': 1,
                'target_iqn': 'iqn.2010-10.org.openstack:volume-00000001',
                'target_portal': '127.0.0.1:3260,fake',
                'auth_method': 'CHAP',
                'auth_username': 'fake',
                'auth_password': 'fake',
            }
        }
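    # This dict mirrors the connection info that
    # ISCSIDriver.initialize_connection hands back; the values above are
    # fake fixtures for the tests below.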
    def test_create_iscsi_storage(self):
        """This shows how to test helper classes' methods."""
        stubs.stubout_session(self.stubs, stubs.FakeSessionForVolumeTests)
@@ -106,7 +120,7 @@ class XenAPIVolumeTestCase(test.TestCase):
        helper = volume_utils.VolumeHelper
        helper.XenAPI = session.get_imported_xenapi()
        vol = self._create_volume()
        info = helper.parse_volume_info(self._make_info(), '/dev/sdc')
        label = 'SR-%s' % vol['id']
        description = 'Test-SR'
        sr_ref = helper.create_iscsi_storage(session, info, label, description)
@@ -124,8 +138,9 @@ class XenAPIVolumeTestCase(test.TestCase):
        # oops, wrong mount point!
        self.assertRaises(volume_utils.StorageError,
                          helper.parse_volume_info,
                          self._make_info(),
                          'dev/sd')
        db.volume_destroy(context.get_admin_context(), vol['id'])
    def test_attach_volume(self):
@@ -135,7 +150,8 @@ class XenAPIVolumeTestCase(test.TestCase):
        volume = self._create_volume()
        instance = db.instance_create(self.context, self.instance_values)
        vm = xenapi_fake.create_vm(instance.name, 'Running')
        result = conn.attach_volume(self._make_info(),
                                    instance.name, '/dev/sdc')
        def check():
            # check that the VM has a VBD attached to it

@@ -145,11 +145,13 @@ class ComputeDriver(object):
           the creation of the new instance.
        :param network_info:
           :py:meth:`~nova.network.manager.NetworkManager.get_instance_nw_info`
        :param block_device_info: Information about block devices to be
                                  attached to the instance.

        """
        raise NotImplementedError()

    def destroy(self, instance, network_info, block_device_info=None,
                cleanup=True):
"""Destroy (shutdown and delete) the specified instance. """Destroy (shutdown and delete) the specified instance.
If the instance is not found (for example if networking failed), this If the instance is not found (for example if networking failed), this
@@ -159,6 +161,8 @@ class ComputeDriver(object):
        :param instance: Instance object as returned by DB layer.
        :param network_info:
           :py:meth:`~nova.network.manager.NetworkManager.get_instance_nw_info`
        :param block_device_info: Information about block devices that should
                                  be detached from the instance.
        :param cleanup:

        """
@@ -203,12 +207,12 @@ class ComputeDriver(object):
        # TODO(Vek): Need to pass context in for access to auth_token
        raise NotImplementedError()

    def attach_volume(self, connection_info, instance_name, mountpoint):
        """Attach the disk to the instance at mountpoint using info"""
        raise NotImplementedError()

    def detach_volume(self, connection_info, instance_name, mountpoint):
        """Detach the disk attached to the instance"""
        raise NotImplementedError()
    def compare_cpu(self, cpu_info):

@@ -92,6 +92,10 @@ class FakeConnection(driver.ComputeDriver):
            info_list.append(self._map_to_instance_info(instance))
        return info_list
def plug_vifs(self, instance, network_info):
"""Plugin VIFs into networks."""
pass
    def spawn(self, context, instance,
              network_info=None, block_device_info=None):
        name = instance.name
@@ -148,7 +152,8 @@ class FakeConnection(driver.ComputeDriver):
    def resume(self, instance, callback):
        pass
    def destroy(self, instance, network_info, block_device_info=None,
                cleanup=True):
        key = instance['name']
        if key in self.instances:
            del self.instances[key]
@@ -156,13 +161,15 @@ class FakeConnection(driver.ComputeDriver):
LOG.warning("Key '%s' not in instances '%s'" % LOG.warning("Key '%s' not in instances '%s'" %
(key, self.instances)) (key, self.instances))
def attach_volume(self, instance_name, device_path, mountpoint): def attach_volume(self, connection_info, instance_name, mountpoint):
"""Attach the disk to the instance at mountpoint using info"""
if not instance_name in self._mounts: if not instance_name in self._mounts:
self._mounts[instance_name] = {} self._mounts[instance_name] = {}
self._mounts[instance_name][mountpoint] = device_path self._mounts[instance_name][mountpoint] = connection_info
return True return True
def detach_volume(self, instance_name, mountpoint): def detach_volume(self, connection_info, instance_name, mountpoint):
"""Detach the disk attached to the instance"""
try: try:
del self._mounts[instance_name][mountpoint] del self._mounts[instance_name][mountpoint]
except KeyError: except KeyError:
@@ -233,11 +240,19 @@ class FakeConnection(driver.ComputeDriver):
"""This method is supported only by libvirt.""" """This method is supported only by libvirt."""
raise NotImplementedError('This method is supported only by libvirt.') raise NotImplementedError('This method is supported only by libvirt.')
def get_instance_disk_info(self, ctxt, instance_ref):
"""This method is supported only by libvirt."""
return
    def live_migration(self, context, instance_ref, dest,
                       post_method, recover_method, block_migration=False):
        """This method is supported only by libvirt."""
        return
def pre_live_migration(self, block_device_info):
"""This method is supported only by libvirt."""
return
    def unfilter_instance(self, instance_ref, network_info):
        """This method is supported only by libvirt."""
        raise NotImplementedError('This method is supported only by libvirt.')

@@ -374,7 +374,8 @@ class HyperVConnection(driver.ComputeDriver):
            raise exception.InstanceNotFound(instance_id=instance.id)
        self._set_vm_state(instance.name, 'Reboot')

    def destroy(self, instance, network_info, block_device_info=None,
                cleanup=True):
        """Destroy the VM. Also destroy the associated VHD disk files"""
        LOG.debug(_("Got request to destroy vm %s"), instance.name)
        vm = self._lookup(instance.name)
@@ -474,12 +475,12 @@ class HyperVConnection(driver.ComputeDriver):
            LOG.error(msg)
            raise Exception(msg)

    def attach_volume(self, connection_info, instance_name, mountpoint):
        vm = self._lookup(instance_name)
        if vm is None:
            raise exception.InstanceNotFound(instance_id=instance_name)

    def detach_volume(self, connection_info, instance_name, mountpoint):
        vm = self._lookup(instance_name)
        if vm is None:
            raise exception.InstanceNotFound(instance_id=instance_name)

@@ -95,15 +95,7 @@
        </disk>
#end if
#for $vol in $volumes
        ${vol}
#end for
#end if
#if $getVar('config_drive', False)

@@ -134,6 +134,12 @@ flags.DEFINE_string('libvirt_vif_type', 'bridge',
flags.DEFINE_string('libvirt_vif_driver',
                    'nova.virt.libvirt.vif.LibvirtBridgeDriver',
                    'The libvirt VIF driver to configure the VIFs.')
flags.DEFINE_list('libvirt_volume_drivers',
                  ['iscsi=nova.virt.libvirt.volume.LibvirtISCSIVolumeDriver',
                   'local=nova.virt.libvirt.volume.LibvirtVolumeDriver',
                   'rbd=nova.virt.libvirt.volume.LibvirtNetVolumeDriver',
                   'sheepdog=nova.virt.libvirt.volume.LibvirtNetVolumeDriver'],
                  'Libvirt handlers for remote volumes.')
flags.DEFINE_string('default_local_format',
                    None,
                    'The default format a local_volume will be formatted with '
@@ -184,6 +190,11 @@ class LibvirtConnection(driver.ComputeDriver):
        fw_class = utils.import_class(FLAGS.firewall_driver)
        self.firewall_driver = fw_class(get_connection=self._get_connection)
        self.vif_driver = utils.import_object(FLAGS.libvirt_vif_driver)
self.volume_drivers = {}
for driver_str in FLAGS.libvirt_volume_drivers:
driver_type, _sep, driver = driver_str.partition('=')
driver_class = utils.import_class(driver)
self.volume_drivers[driver_type] = driver_class(self)
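        # e.g. 'iscsi=nova.virt.libvirt.volume.LibvirtISCSIVolumeDriver'
        # registers self.volume_drivers['iscsi'] as an instance of that class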
    def init_host(self, host):
        # NOTE(nsokolov): moved instance restarting to ComputeManager
@@ -261,7 +272,8 @@ class LibvirtConnection(driver.ComputeDriver):
        for (network, mapping) in network_info:
            self.vif_driver.plug(instance, network, mapping)
    def destroy(self, instance, network_info, block_device_info=None,
                cleanup=True):
        instance_name = instance['name']

        try:
@@ -325,6 +337,15 @@ class LibvirtConnection(driver.ComputeDriver):
        self.firewall_driver.unfilter_instance(instance,
                                               network_info=network_info)
# NOTE(vish): we disconnect from volumes regardless
block_device_mapping = driver.block_device_info_get_mapping(
block_device_info)
for vol in block_device_mapping:
connection_info = vol['connection_info']
mountpoint = vol['mount_device']
xml = self.volume_driver_method('disconnect_volume',
connection_info,
mountpoint)
        if cleanup:
            self._cleanup(instance)
@@ -340,24 +361,22 @@ class LibvirtConnection(driver.ComputeDriver):
        if os.path.exists(target):
            shutil.rmtree(target)
def volume_driver_method(self, method_name, connection_info,
*args, **kwargs):
driver_type = connection_info.get('driver_volume_type')
if not driver_type in self.volume_drivers:
raise exception.VolumeDriverNotFound(driver_type=driver_type)
driver = self.volume_drivers[driver_type]
method = getattr(driver, method_name)
return method(connection_info, *args, **kwargs)
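    # For illustration: attach_volume() below boils down to
    #     volume_driver_method('connect_volume', connection_info, 'vdc')
    # which routes to LibvirtISCSIVolumeDriver.connect_volume whenever
    # connection_info['driver_volume_type'] == 'iscsi'.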
    @exception.wrap_exception()
    def attach_volume(self, connection_info, instance_name, mountpoint):
        virt_dom = self._lookup_by_name(instance_name)
        mount_device = mountpoint.rpartition("/")[2]
        xml = self.volume_driver_method('connect_volume',
                                        connection_info,
                                        mount_device)
        virt_dom.attachDevice(xml)
    def _get_disk_xml(self, xml, device):
@@ -381,13 +400,21 @@ class LibvirtConnection(driver.ComputeDriver):
        doc.freeDoc()

    @exception.wrap_exception()
    def detach_volume(self, connection_info, instance_name, mountpoint):
        mount_device = mountpoint.rpartition("/")[2]
        try:
            # NOTE(vish): This is called to cleanup volumes after live
            #             migration, so we should still logout even if
            #             the instance doesn't exist here anymore.
            virt_dom = self._lookup_by_name(instance_name)
            xml = self._get_disk_xml(virt_dom.XMLDesc(0), mount_device)
            if not xml:
                raise exception.DiskNotFound(location=mount_device)
            virt_dom.detachDevice(xml)
        finally:
            self.volume_driver_method('disconnect_volume',
                                      connection_info,
                                      mount_device)
    @exception.wrap_exception()
    def snapshot(self, context, instance, image_href):
@@ -1049,15 +1076,6 @@ class LibvirtConnection(driver.ComputeDriver):
LOG.debug(_("block_device_list %s"), block_device_list) LOG.debug(_("block_device_list %s"), block_device_list)
return block_device.strip_dev(mount_device) in block_device_list return block_device.strip_dev(mount_device) in block_device_list
def _get_volume_device_info(self, device_path):
if device_path.startswith('/dev/'):
return ('block', None, None)
elif ':' in device_path:
(protocol, name) = device_path.split(':')
return ('network', protocol, name)
else:
raise exception.InvalidDevicePath(path=device_path)
    def _prepare_xml_info(self, instance, network_info, rescue,
                          block_device_info=None):
        block_device_mapping = driver.block_device_info_get_mapping(
@@ -1075,10 +1093,14 @@ class LibvirtConnection(driver.ComputeDriver):
        else:
            driver_type = 'raw'

        volumes = []
        for vol in block_device_mapping:
            connection_info = vol['connection_info']
            mountpoint = vol['mount_device']
            xml = self.volume_driver_method('connect_volume',
                                            connection_info,
                                            mountpoint)
            volumes.append(xml)

        ebs_root = self._volume_in_mapping(self.default_root_device,
                                           block_device_info)
@@ -1111,7 +1133,7 @@ class LibvirtConnection(driver.ComputeDriver):
            'nics': nics,
            'ebs_root': ebs_root,
            'local_device': local_device,
            'volumes': volumes,
            'use_virtio_for_bridges':
                FLAGS.libvirt_use_virtio_for_bridges,
            'ephemerals': ephemerals}
@@ -1707,6 +1729,24 @@ class LibvirtConnection(driver.ComputeDriver):
        timer.f = wait_for_live_migration
        timer.start(interval=0.5, now=True)
def pre_live_migration(self, block_device_info):
"""Preparation live migration.
:params block_device_info:
It must be the result of _get_instance_volume_bdms()
at compute manager.
"""
# Establishing connection to volume server.
block_device_mapping = driver.block_device_info_get_mapping(
block_device_info)
for vol in block_device_mapping:
connection_info = vol['connection_info']
mountpoint = vol['mount_device']
xml = self.volume_driver_method('connect_volume',
connection_info,
mountpoint)
    def pre_block_migration(self, ctxt, instance_ref, disk_info_json):
        """Prepare for block migration.

nova/virt/libvirt/volume.py (new file)

@@ -0,0 +1,149 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Volume drivers for libvirt."""
import os
import time
from nova import exception
from nova import flags
from nova import log as logging
from nova import utils
LOG = logging.getLogger('nova.virt.libvirt.volume')
FLAGS = flags.FLAGS
flags.DECLARE('num_iscsi_scan_tries', 'nova.volume.driver')
class LibvirtVolumeDriver(object):
"""Base class for volume drivers."""
def __init__(self, connection):
self.connection = connection
def connect_volume(self, connection_info, mount_device):
"""Connect the volume. Returns xml for libvirt."""
device_path = connection_info['data']['device_path']
xml = """<disk type='block'>
<driver name='qemu' type='raw'/>
<source dev='%s'/>
<target dev='%s' bus='virtio'/>
</disk>""" % (device_path, mount_device)
return xml
def disconnect_volume(self, connection_info, mount_device):
"""Disconnect the volume"""
pass
class LibvirtNetVolumeDriver(LibvirtVolumeDriver):
"""Driver to attach Network volumes to libvirt."""
def connect_volume(self, connection_info, mount_device):
protocol = connection_info['driver_volume_type']
name = connection_info['data']['name']
xml = """<disk type='network'>
<driver name='qemu' type='raw'/>
<source protocol='%s' name='%s'/>
<target dev='%s' bus='virtio'/>
</disk>""" % (protocol, name, mount_device)
return xml
class LibvirtISCSIVolumeDriver(LibvirtVolumeDriver):
"""Driver to attach Network volumes to libvirt."""
def _run_iscsiadm(self, iscsi_properties, iscsi_command):
(out, err) = utils.execute('iscsiadm', '-m', 'node', '-T',
iscsi_properties['target_iqn'],
'-p', iscsi_properties['target_portal'],
*iscsi_command, run_as_root=True)
LOG.debug("iscsiadm %s: stdout=%s stderr=%s" %
(iscsi_command, out, err))
return (out, err)
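    # For reference, the command assembled above has the shape (target
    # values hypothetical):
    #     iscsiadm -m node -T iqn.2010-10.org.openstack:volume-00000001 \
    #         -p 10.0.2.15:3260,fake --login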
def _iscsiadm_update(self, iscsi_properties, property_key, property_value):
iscsi_command = ('--op', 'update', '-n', property_key,
'-v', property_value)
return self._run_iscsiadm(iscsi_properties, iscsi_command)
def connect_volume(self, connection_info, mount_device):
"""Attach the volume to instance_name"""
iscsi_properties = connection_info['data']
try:
# NOTE(vish): if we are on the same host as nova volume, the
# discovery makes the target so we don't need to
# run --op new
self._run_iscsiadm(iscsi_properties, ())
except exception.ProcessExecutionError:
self._run_iscsiadm(iscsi_properties, ('--op', 'new'))
if iscsi_properties.get('auth_method'):
self._iscsiadm_update(iscsi_properties,
"node.session.auth.authmethod",
iscsi_properties['auth_method'])
self._iscsiadm_update(iscsi_properties,
"node.session.auth.username",
iscsi_properties['auth_username'])
self._iscsiadm_update(iscsi_properties,
"node.session.auth.password",
iscsi_properties['auth_password'])
self._run_iscsiadm(iscsi_properties, ("--login",))
self._iscsiadm_update(iscsi_properties, "node.startup", "automatic")
host_device = ("/dev/disk/by-path/ip-%s-iscsi-%s-lun-0" %
(iscsi_properties['target_portal'],
iscsi_properties['target_iqn']))
# The /dev/disk/by-path/... node is not always present immediately
# TODO(justinsb): This retry-with-delay is a pattern, move to utils?
tries = 0
while not os.path.exists(host_device):
if tries >= FLAGS.num_iscsi_scan_tries:
raise exception.Error(_("iSCSI device not found at %s") %
(host_device))
LOG.warn(_("ISCSI volume not yet found at: %(mount_device)s. "
"Will rescan & retry. Try number: %(tries)s") %
locals())
# The rescan isn't documented as being necessary(?), but it helps
self._run_iscsiadm(iscsi_properties, ("--rescan",))
tries = tries + 1
if not os.path.exists(host_device):
time.sleep(tries ** 2)
if tries != 0:
LOG.debug(_("Found iSCSI node %(mount_device)s "
"(after %(tries)s rescans)") %
locals())
connection_info['data']['device_path'] = host_device
sup = super(LibvirtISCSIVolumeDriver, self)
return sup.connect_volume(connection_info, mount_device)
def disconnect_volume(self, connection_info, mount_device):
"""Detach the volume from instance_name"""
sup = super(LibvirtISCSIVolumeDriver, self)
sup.disconnect_volume(connection_info, mount_device)
iscsi_properties = connection_info['data']
self._iscsiadm_update(iscsi_properties, "node.startup", "manual")
self._run_iscsiadm(iscsi_properties, ("--logout",))
self._run_iscsiadm(iscsi_properties, ('--op', 'delete'))
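A quick way to see what these drivers emit is to feed the base class a canned
connection_info dict; a minimal standalone sketch (the device path is
hypothetical):

    info = {'driver_volume_type': 'local',
            'data': {'device_path': '/dev/nova-volumes/volume-00000001'}}
    print(LibvirtVolumeDriver(connection=None).connect_volume(info, 'vdc'))

This prints the <disk type='block'> element that LibvirtConnection passes
straight to virt_dom.attachDevice().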

@@ -137,7 +137,8 @@ class VMWareESXConnection(driver.ComputeDriver):
"""Reboot VM instance.""" """Reboot VM instance."""
self._vmops.reboot(instance, network_info) self._vmops.reboot(instance, network_info)
def destroy(self, instance, network_info, cleanup=True): def destroy(self, instance, network_info, block_device_info=None,
cleanup=True):
"""Destroy VM instance.""" """Destroy VM instance."""
self._vmops.destroy(instance, network_info) self._vmops.destroy(instance, network_info)
@@ -173,11 +174,11 @@ class VMWareESXConnection(driver.ComputeDriver):
"""Return link to instance's ajax console.""" """Return link to instance's ajax console."""
return self._vmops.get_ajax_console(instance) return self._vmops.get_ajax_console(instance)
def attach_volume(self, instance_name, device_path, mountpoint): def attach_volume(self, connection_info, instance_name, mountpoint):
"""Attach volume storage to VM instance.""" """Attach volume storage to VM instance."""
pass pass
def detach_volume(self, instance_name, mountpoint): def detach_volume(self, connection_info, instance_name, mountpoint):
"""Detach volume storage to VM instance.""" """Detach volume storage to VM instance."""
pass pass

@@ -147,7 +147,7 @@ class VolumeHelper(HelperBase):
            % sr_ref)

    @classmethod
    def parse_volume_info(cls, connection_info, mountpoint):
        """
        Parse connection_info and mountpoint as they can be used by XenAPI.
        In particular, the mountpoint (e.g. /dev/sdc) must be translated
@@ -161,11 +161,12 @@ class VolumeHelper(HelperBase):
           the iscsi driver to set them.
        """
        device_number = VolumeHelper.mountpoint_to_number(mountpoint)
        data = connection_info['data']
        volume_id = data['volume_id']
        target_portal = data['target_portal']
        target_host = _get_target_host(target_portal)
        target_port = _get_target_port(target_portal)
        target_iqn = data['target_iqn']
        LOG.debug('(vol_id,host,port,iqn): (%s,%s,%s,%s)',
                  volume_id, target_host, target_port, target_iqn)
        if (device_number < 0) or \
@@ -173,7 +174,7 @@ class VolumeHelper(HelperBase):
            (target_host is None) or \
            (target_iqn is None):
            raise StorageError(_('Unable to obtain target information'
                                 ' %(data)s, %(mountpoint)s') % locals())
        volume_info = {}
        volume_info['deviceNumber'] = device_number
        volume_info['volumeId'] = volume_id

@@ -40,18 +40,21 @@ class VolumeOps(object):
        VolumeHelper.XenAPI = self.XenAPI
        VMHelper.XenAPI = self.XenAPI
    def attach_volume(self, connection_info, instance_name, mountpoint):
        """Attach volume storage to VM instance"""
        # Before we start, check that the VM exists
        vm_ref = VMHelper.lookup(self._session, instance_name)
        if vm_ref is None:
            raise exception.InstanceNotFound(instance_id=instance_name)
        # NOTE: No Resource Pool concept so far
        LOG.debug(_("Attach_volume: %(connection_info)s, %(instance_name)s,"
                    " %(mountpoint)s") % locals())
        driver_type = connection_info['driver_volume_type']
        if driver_type != 'iscsi':
            raise exception.VolumeDriverNotFound(driver_type=driver_type)
        # Create the iSCSI SR, and the PBD through which hosts access SRs.
        # But first, retrieve target info, like Host, IQN, LUN and SCSIID
        vol_rec = VolumeHelper.parse_volume_info(connection_info, mountpoint)
        label = 'SR-%s' % vol_rec['volumeId']
        description = 'Disk-for:%s' % instance_name
        # Create SR
@@ -92,7 +95,7 @@ class VolumeOps(object):
        LOG.info(_('Mountpoint %(mountpoint)s attached to'
                   ' instance %(instance_name)s') % locals())

    def detach_volume(self, connection_info, instance_name, mountpoint):
        """Detach volume storage to VM instance"""
        # Before we start, check that the VM exists
        vm_ref = VMHelper.lookup(self._session, instance_name)

@@ -222,7 +222,8 @@ class XenAPIConnection(driver.ComputeDriver):
""" """
self._vmops.inject_file(instance, b64_path, b64_contents) self._vmops.inject_file(instance, b64_path, b64_contents)
def destroy(self, instance, network_info, cleanup=True): def destroy(self, instance, network_info, block_device_info=None,
cleanup=True):
"""Destroy VM instance""" """Destroy VM instance"""
self._vmops.destroy(instance, network_info) self._vmops.destroy(instance, network_info)
@@ -302,15 +303,17 @@ class XenAPIConnection(driver.ComputeDriver):
        xs_url = urlparse.urlparse(FLAGS.xenapi_connection_url)
        return xs_url.netloc

    def attach_volume(self, connection_info, instance_name, mountpoint):
        """Attach volume storage to VM instance"""
        return self._volumeops.attach_volume(connection_info,
                                             instance_name,
                                             mountpoint)

    def detach_volume(self, connection_info, instance_name, mountpoint):
        """Detach volume storage to VM instance"""
        return self._volumeops.detach_volume(connection_info,
                                             instance_name,
                                             mountpoint)
    def get_console_pool_info(self, console_type):
        xs_url = urlparse.urlparse(FLAGS.xenapi_connection_url)

@@ -23,7 +23,6 @@ Handles all requests relating to volumes.
from eventlet import greenthread

from nova import exception
from nova import flags
from nova import log as logging
@@ -180,12 +179,49 @@ class API(base.Base):
if volume['status'] == "available": if volume['status'] == "available":
raise exception.ApiError(_("Volume is already detached")) raise exception.ApiError(_("Volume is already detached"))
    def remove_from_compute(self, context, instance_id, volume_id, host):
        """Remove volume from specified compute host."""
        rpc.call(context,
                 self.db.queue_get_for(context, FLAGS.compute_topic, host),
                 {"method": "remove_volume_connection",
                  "args": {'instance_id': instance_id,
                           'volume_id': volume_id}})
def attach(self, context, volume_id, instance_id, mountpoint):
volume = self.get(context, volume_id)
host = volume['host']
queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "attach_volume",
"args": {"volume_id": volume_id,
"instance_id": instance_id,
"mountpoint": mountpoint}})
def detach(self, context, volume_id):
volume = self.get(context, volume_id)
host = volume['host']
queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "detach_volume",
"args": {"volume_id": volume_id}})
def initialize_connection(self, context, volume_id, address):
volume = self.get(context, volume_id)
host = volume['host']
queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "initialize_connection",
"args": {"volume_id": volume_id,
"address": address}})
def terminate_connection(self, context, volume_id, address):
volume = self.get(context, volume_id)
host = volume['host']
queue = self.db.queue_get_for(context, FLAGS.volume_topic, host)
return rpc.call(context, queue,
{"method": "terminate_connection",
"args": {"volume_id": volume_id,
"address": address}})
    def _create_snapshot(self, context, volume_id, name, description,
                         force=False):

@@ -20,8 +20,8 @@ Drivers for volumes.
""" """
import time
import os import os
import time
from xml.etree import ElementTree from xml.etree import ElementTree
from nova import exception from nova import exception
@@ -35,25 +35,17 @@ LOG = logging.getLogger("nova.volume.driver")
FLAGS = flags.FLAGS
flags.DEFINE_string('volume_group', 'nova-volumes',
                    'Name for the VG that will contain exported volumes')
flags.DEFINE_string('num_shell_tries', 3,
                    'number of times to attempt to run flakey shell commands')
flags.DEFINE_string('num_iscsi_scan_tries', 3,
                    'number of times to rescan iSCSI target to find volume')
flags.DEFINE_integer('iscsi_num_targets',
                     100,
                     'Number of iscsi target ids per host')
flags.DEFINE_string('iscsi_target_prefix', 'iqn.2010-10.org.openstack:',
                    'prefix for iscsi volumes')
flags.DEFINE_string('iscsi_ip_address', '$my_ip',
                    'use this ip for iscsi')
flags.DEFINE_string('rbd_pool', 'rbd',
                    'the rbd pool in which volumes are stored')
@@ -202,146 +194,24 @@ class VolumeDriver(object):
"""Removes an export for a logical volume.""" """Removes an export for a logical volume."""
raise NotImplementedError() raise NotImplementedError()
def discover_volume(self, context, volume):
"""Discover volume on a remote host."""
raise NotImplementedError()
def undiscover_volume(self, volume):
"""Undiscover volume on a remote host."""
raise NotImplementedError()
    def check_for_export(self, context, volume_id):
        """Make sure volume is exported."""
        raise NotImplementedError()
def initialize_connection(self, volume, address):
"""Allow connection to ip and return connection info."""
raise NotImplementedError()
def terminate_connection(self, volume, address):
"""Disallow connection from ip"""
raise NotImplementedError()
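    # NOTE: concrete drivers return a dict of the form
    #     {'driver_volume_type': '<type>', 'data': {...}}
    # -- see ISCSIDriver.initialize_connection below for the iscsi layout.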
    def get_volume_stats(self, refresh=False):
        """Return the current state of the volume service. If 'refresh' is
           True, run the update first."""
        return None
class AOEDriver(VolumeDriver):
"""WARNING! Deprecated. This driver will be removed in Essex. Its use
is not recommended.
Implements AOE specific volume commands."""
def __init__(self, *args, **kwargs):
LOG.warn(_("AOEDriver is deprecated and will be removed in Essex"))
super(AOEDriver, self).__init__(*args, **kwargs)
def ensure_export(self, context, volume):
# NOTE(vish): we depend on vblade-persist for recreating exports
pass
def _ensure_blades(self, context):
"""Ensure that blades have been created in datastore."""
total_blades = FLAGS.num_shelves * FLAGS.blades_per_shelf
if self.db.export_device_count(context) >= total_blades:
return
for shelf_id in xrange(FLAGS.num_shelves):
for blade_id in xrange(FLAGS.blades_per_shelf):
dev = {'shelf_id': shelf_id, 'blade_id': blade_id}
self.db.export_device_create_safe(context, dev)
def create_export(self, context, volume):
"""Creates an export for a logical volume."""
self._ensure_blades(context)
(shelf_id,
blade_id) = self.db.volume_allocate_shelf_and_blade(context,
volume['id'])
self._try_execute(
'vblade-persist', 'setup',
shelf_id,
blade_id,
FLAGS.aoe_eth_dev,
"/dev/%s/%s" %
(FLAGS.volume_group,
volume['name']),
run_as_root=True)
# NOTE(vish): The standard _try_execute does not work here
# because these methods throw errors if other
# volumes on this host are in the process of
# being created. The good news is the command
# still works for the other volumes, so we
# just wait a bit for the current volume to
# be ready and ignore any errors.
time.sleep(2)
self._execute('vblade-persist', 'auto', 'all',
check_exit_code=False, run_as_root=True)
self._execute('vblade-persist', 'start', 'all',
check_exit_code=False, run_as_root=True)
def remove_export(self, context, volume):
"""Removes an export for a logical volume."""
(shelf_id,
blade_id) = self.db.volume_get_shelf_and_blade(context,
volume['id'])
self._try_execute('vblade-persist', 'stop',
shelf_id, blade_id, run_as_root=True)
self._try_execute('vblade-persist', 'destroy',
shelf_id, blade_id, run_as_root=True)
def discover_volume(self, context, _volume):
"""Discover volume on a remote host."""
(shelf_id,
blade_id) = self.db.volume_get_shelf_and_blade(context,
_volume['id'])
self._execute('aoe-discover', run_as_root=True)
out, err = self._execute('aoe-stat', check_exit_code=False,
run_as_root=True)
device_path = 'e%(shelf_id)d.%(blade_id)d' % locals()
if out.find(device_path) >= 0:
return "/dev/etherd/%s" % device_path
else:
return
def undiscover_volume(self, _volume):
"""Undiscover volume on a remote host."""
pass
def check_for_export(self, context, volume_id):
"""Make sure volume is exported."""
(shelf_id,
blade_id) = self.db.volume_get_shelf_and_blade(context,
volume_id)
cmd = ('vblade-persist', 'ls', '--no-header')
out, _err = self._execute(*cmd, run_as_root=True)
exported = False
for line in out.split('\n'):
param = line.split(' ')
if len(param) == 6 and param[0] == str(shelf_id) \
and param[1] == str(blade_id) and param[-1] == "run":
exported = True
break
if not exported:
# Instance will be terminated in this case.
desc = _("Cannot confirm exported volume id:%(volume_id)s. "
"vblade process for e%(shelf_id)s.%(blade_id)s "
"isn't running.") % locals()
raise exception.ProcessExecutionError(out, _err, cmd=cmd,
description=desc)
class FakeAOEDriver(AOEDriver):
"""Logs calls instead of executing."""
def __init__(self, *args, **kwargs):
super(FakeAOEDriver, self).__init__(execute=self.fake_execute,
sync_exec=self.fake_execute,
*args, **kwargs)
def check_for_setup_error(self):
"""No setup necessary in fake mode."""
pass
@staticmethod
def fake_execute(cmd, *_args, **_kwargs):
"""Execute that simply logs the command."""
LOG.debug(_("FAKE AOE: %s"), cmd)
return (None, None)
class ISCSIDriver(VolumeDriver):
    """Executes commands relating to ISCSI volumes.
@@ -445,7 +315,7 @@ class ISCSIDriver(VolumeDriver):
                                   '-t', 'sendtargets', '-p', volume['host'],
                                   run_as_root=True)
        for target in out.splitlines():
            if FLAGS.iscsi_ip_address in target and volume_name in target:
                return target
        return None
@@ -462,6 +332,8 @@ class ISCSIDriver(VolumeDriver):

         :target_portal: the portal of the iSCSI target

+        :volume_id: the id of the volume (currently used by xen)
+
         :auth_method:, :auth_username:, :auth_password:

             the authentication details. Right now, either auth_method is not
@@ -491,6 +363,7 @@ class ISCSIDriver(VolumeDriver):

         iscsi_portal = iscsi_target.split(",")[0]

+        properties['volume_id'] = volume['id']
         properties['target_iqn'] = iscsi_name
         properties['target_portal'] = iscsi_portal
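Note: the targets parsed here come from iscsiadm sendtargets output, one "<ip>:<port>,<tpgt> <iqn>" pair per line. A small standalone sketch of splitting one such line (the sample values are illustrative):

    target = "127.0.0.1:3260,1 iqn.2010-10.org.openstack:volume-00000001"
    location, iscsi_name = target.split(" ")
    iscsi_portal = location.split(",")[0]
    assert iscsi_portal == "127.0.0.1:3260"
    assert iscsi_name == "iqn.2010-10.org.openstack:volume-00000001"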
@@ -519,64 +392,32 @@ class ISCSIDriver(VolumeDriver):
                                   '-v', property_value)
         return self._run_iscsiadm(iscsi_properties, iscsi_command)

-    def discover_volume(self, context, volume):
-        """Discover volume on a remote host."""
+    def initialize_connection(self, volume, address):
+        """Initializes the connection and returns connection info.
+
+        The iscsi driver returns a driver_volume_type of 'iscsi'.
+        The format of the driver data is defined in _get_iscsi_properties.
+        Example return value:
+
+        {
+            'driver_volume_type': 'iscsi',
+            'data': {
+                'target_discovered': True,
+                'target_iqn': 'iqn.2010-10.org.openstack:volume-00000001',
+                'target_portal': '127.0.0.1:3260',
+                'volume_id': 1,
+            }
+        }
+
+        """
         iscsi_properties = self._get_iscsi_properties(volume)
-
-        if not iscsi_properties['target_discovered']:
-            self._run_iscsiadm(iscsi_properties, ('--op', 'new'))
-
-        if iscsi_properties.get('auth_method'):
-            self._iscsiadm_update(iscsi_properties,
-                                  "node.session.auth.authmethod",
-                                  iscsi_properties['auth_method'])
-            self._iscsiadm_update(iscsi_properties,
-                                  "node.session.auth.username",
-                                  iscsi_properties['auth_username'])
-            self._iscsiadm_update(iscsi_properties,
-                                  "node.session.auth.password",
-                                  iscsi_properties['auth_password'])
-
-        self._run_iscsiadm(iscsi_properties, ("--login", ))
-
-        self._iscsiadm_update(iscsi_properties, "node.startup", "automatic")
-
-        mount_device = ("/dev/disk/by-path/ip-%s-iscsi-%s-lun-0" %
-                        (iscsi_properties['target_portal'],
-                         iscsi_properties['target_iqn']))
-
-        # The /dev/disk/by-path/... node is not always present immediately
-        # TODO(justinsb): This retry-with-delay is a pattern, move to utils?
-        tries = 0
-        while not os.path.exists(mount_device):
-            if tries >= FLAGS.num_iscsi_scan_tries:
-                raise exception.Error(_("iSCSI device not found at %s") %
-                                      (mount_device))
-
-            LOG.warn(_("ISCSI volume not yet found at: %(mount_device)s. "
-                       "Will rescan & retry.  Try number: %(tries)s") %
-                     locals())
-
-            # The rescan isn't documented as being necessary(?), but it helps
-            self._run_iscsiadm(iscsi_properties, ("--rescan", ))
-
-            tries = tries + 1
-            if not os.path.exists(mount_device):
-                time.sleep(tries ** 2)
-
-        if tries != 0:
-            LOG.debug(_("Found iSCSI node %(mount_device)s "
-                        "(after %(tries)s rescans)") %
-                      locals())
-
-        return mount_device
-
-    def undiscover_volume(self, volume):
-        """Undiscover volume on a remote host."""
-        iscsi_properties = self._get_iscsi_properties(volume)
-        self._iscsiadm_update(iscsi_properties, "node.startup", "manual")
-        self._run_iscsiadm(iscsi_properties, ("--logout", ))
-        self._run_iscsiadm(iscsi_properties, ('--op', 'delete'))
+        return {
+            'driver_volume_type': 'iscsi',
+            'data': iscsi_properties
+        }
+
+    def terminate_connection(self, volume, address):
+        pass

     def check_for_export(self, context, volume_id):
         """Make sure volume is exported."""
@@ -605,12 +446,13 @@ class FakeISCSIDriver(ISCSIDriver):
         """No setup necessary in fake mode."""
         pass

-    def discover_volume(self, context, volume):
-        """Discover volume on a remote host."""
-        return "/dev/disk/by-path/volume-id-%d" % volume['id']
+    def initialize_connection(self, volume, address):
+        return {
+            'driver_volume_type': 'iscsi',
+            'data': {}
+        }

-    def undiscover_volume(self, volume):
-        """Undiscover volume on a remote host."""
+    def terminate_connection(self, volume, address):
         pass

     @staticmethod
@@ -675,12 +517,15 @@ class RBDDriver(VolumeDriver):
         """Removes an export for a logical volume"""
         pass

-    def discover_volume(self, context, volume):
-        """Discover volume on a remote host"""
-        return "rbd:%s/%s" % (FLAGS.rbd_pool, volume['name'])
+    def initialize_connection(self, volume, address):
+        return {
+            'driver_volume_type': 'rbd',
+            'data': {
+                'name': '%s/%s' % (FLAGS.rbd_pool, volume['name'])
+            }
+        }

-    def undiscover_volume(self, volume):
-        """Undiscover volume on a remote host"""
+    def terminate_connection(self, volume, address):
         pass
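Note: callers used to get a ready-made "rbd:<pool>/<name>" path from discover_volume; with initialize_connection they assemble it from the returned data. A standalone sketch of the equivalent consumer-side step (the helper name is illustrative):

    def rbd_device_path(data):
        # rebuilds the string the removed discover_volume returned
        return "rbd:%s" % data['name']

    assert rbd_device_path({'name': 'nova/volume-00000001'}) == \
        "rbd:nova/volume-00000001"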
@@ -738,12 +583,15 @@ class SheepdogDriver(VolumeDriver):
         """Removes an export for a logical volume"""
         pass

-    def discover_volume(self, context, volume):
-        """Discover volume on a remote host"""
-        return "sheepdog:%s" % volume['name']
+    def initialize_connection(self, volume, address):
+        return {
+            'driver_volume_type': 'sheepdog',
+            'data': {
+                'name': volume['name']
+            }
+        }

-    def undiscover_volume(self, volume):
-        """Undiscover volume on a remote host"""
+    def terminate_connection(self, volume, address):
         pass
@@ -772,11 +620,11 @@ class LoggingVolumeDriver(VolumeDriver):
     def remove_export(self, context, volume):
         self.log_action('remove_export', volume)

-    def discover_volume(self, context, volume):
-        self.log_action('discover_volume', volume)
+    def initialize_connection(self, volume, address):
+        self.log_action('initialize_connection', volume)

-    def undiscover_volume(self, volume):
-        self.log_action('undiscover_volume', volume)
+    def terminate_connection(self, volume, address):
+        self.log_action('terminate_connection', volume)

     def check_for_export(self, context, volume_id):
         self.log_action('check_for_export', volume_id)
@@ -906,6 +754,58 @@ class ZadaraBEDriver(ISCSIDriver):

         LOG.debug(_("VSA BE delete_volume for %s suceeded"), volume['name'])

+    def _discover_volume(self, context, volume):
+        """Discover volume on a remote host."""
+        iscsi_properties = self._get_iscsi_properties(volume)
+
+        if not iscsi_properties['target_discovered']:
+            self._run_iscsiadm(iscsi_properties, ('--op', 'new'))
+
+        if iscsi_properties.get('auth_method'):
+            self._iscsiadm_update(iscsi_properties,
+                                  "node.session.auth.authmethod",
+                                  iscsi_properties['auth_method'])
+            self._iscsiadm_update(iscsi_properties,
+                                  "node.session.auth.username",
+                                  iscsi_properties['auth_username'])
+            self._iscsiadm_update(iscsi_properties,
+                                  "node.session.auth.password",
+                                  iscsi_properties['auth_password'])
+
+        self._run_iscsiadm(iscsi_properties, ("--login", ))
+
+        self._iscsiadm_update(iscsi_properties, "node.startup", "automatic")
+
+        mount_device = ("/dev/disk/by-path/ip-%s-iscsi-%s-lun-0" %
+                        (iscsi_properties['target_portal'],
+                         iscsi_properties['target_iqn']))
+
+        # The /dev/disk/by-path/... node is not always present immediately
+        # TODO(justinsb): This retry-with-delay is a pattern, move to utils?
+        tries = 0
+        while not os.path.exists(mount_device):
+            if tries >= FLAGS.num_iscsi_scan_tries:
+                raise exception.Error(_("iSCSI device not found at %s") %
+                                      (mount_device))
+
+            LOG.warn(_("ISCSI volume not yet found at: %(mount_device)s. "
+                       "Will rescan & retry.  Try number: %(tries)s") %
+                     locals())
+
+            # The rescan isn't documented as being necessary(?), but it helps
+            self._run_iscsiadm(iscsi_properties, ("--rescan", ))
+
+            tries = tries + 1
+            if not os.path.exists(mount_device):
+                time.sleep(tries ** 2)
+
+        if tries != 0:
+            LOG.debug(_("Found iSCSI node %(mount_device)s "
+                        "(after %(tries)s rescans)") %
+                      locals())
+
+        return mount_device
+
     def local_path(self, volume):
         if self._not_vsa_volume_or_drive(volume):
             return super(ZadaraBEDriver, self).local_path(volume)
@@ -913,7 +813,10 @@ class ZadaraBEDriver(ISCSIDriver):
         if self._is_vsa_volume(volume):
             LOG.debug(_("\tFE VSA Volume %s local path call - call discover"),
                       volume['name'])
-            return super(ZadaraBEDriver, self).discover_volume(None, volume)
+            # NOTE(vish): Copied discover from iscsi_driver since it is used
+            #             but this should probably be refactored into a common
+            #             area because it is used in libvirt driver.
+            return self._discover_volume(None, volume)

         raise exception.Error(_("local_path not supported"))
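Note: with every driver now reporting a driver_volume_type, the consumer (the libvirt volume code this change adds) can pick a connection strategy by type. A hedged standalone sketch of that dispatch; connect_volume and the handler table are illustrative, not the names used in the tree:

    def connect_volume(connection_info, handlers):
        """Pick a connection strategy from driver_volume_type."""
        driver_type = connection_info['driver_volume_type']
        if driver_type not in handlers:
            raise ValueError("unknown driver_volume_type: %s" % driver_type)
        return handlers[driver_type](connection_info['data'])

    # e.g. for the RBDDriver return value shown above:
    info = {'driver_volume_type': 'rbd',
            'data': {'name': 'nova/volume-00000001'}}
    print connect_volume(info, {'rbd': lambda data: "rbd:%s" % data['name']})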

nova/volume/manager.py

@@ -28,20 +28,17 @@ intact.

 :volume_topic: What :mod:`rpc` topic to listen to (default: `volume`).
 :volume_manager: The module name of a class derived from
                  :class:`manager.Manager` (default:
-                 :class:`nova.volume.manager.AOEManager`).
+                 :class:`nova.volume.manager.Manager`).
 :storage_availability_zone: Defaults to `nova`.
-:volume_driver: Used by :class:`AOEManager`. Defaults to
-                :class:`nova.volume.driver.AOEDriver`.
-:num_shelves: Number of shelves for AoE (default: 100).
-:num_blades: Number of vblades per shelf to allocate AoE storage from
-             (default: 16).
+:volume_driver: Used by :class:`Manager`. Defaults to
+                :class:`nova.volume.driver.ISCSIDriver`.
 :volume_group: Name of the group that will contain exported volumes (default:
                `nova-volumes`)
-:aoe_eth_dev: Device name the volumes will be exported on (default: `eth0`).
-:num_shell_tries: Number of times to attempt to run AoE commands (default: 3)
+:num_shell_tries: Number of times to attempt to run commands (default: 3)

 """

+import sys

 from nova import context
 from nova import exception
@@ -126,10 +123,11 @@ class VolumeManager(manager.SchedulerDependentManager):
             if model_update:
                 self.db.volume_update(context, volume_ref['id'], model_update)
         except Exception:
+            exc_info = sys.exc_info()
             self.db.volume_update(context,
                                   volume_ref['id'], {'status': 'error'})
             self._notify_vsa(context, volume_ref, 'error')
-            raise
+            raise exc_info

         now = utils.utcnow()
         self.db.volume_update(context,
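Note: exc_info is captured before the status update so a failure inside the cleanup cannot mask the original error (the commit's `raise exc_info` form relies on Python 2's raise unpacking a tuple to its first element, which keeps only the exception type). A standalone Python 2 sketch of the idiom using the three-argument raise, which also preserves the original traceback:

    import sys

    def risky():
        raise ValueError("original failure")

    def cleanup():
        pass  # imagine a db update that could itself raise

    try:
        risky()
    except Exception:
        exc_info = sys.exc_info()  # capture before cleanup can clobber it
        cleanup()
        raise exc_info[0], exc_info[1], exc_info[2]  # 3-arg re-raise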
@@ -181,10 +179,11 @@ class VolumeManager(manager.SchedulerDependentManager):
                                   {'status': 'available'})
             return True
         except Exception:
+            exc_info = sys.exc_info()
             self.db.volume_update(context,
                                   volume_ref['id'],
                                   {'status': 'error_deleting'})
-            raise
+            raise exc_info

         self.db.volume_destroy(context, volume_id)
         LOG.debug(_("volume %s: deleted successfully"), volume_ref['name'])
@@ -233,26 +232,44 @@ class VolumeManager(manager.SchedulerDependentManager):
         LOG.debug(_("snapshot %s: deleted successfully"), snapshot_ref['name'])
         return True

-    def setup_compute_volume(self, context, volume_id):
-        """Setup remote volume on compute host.
-
-        Returns path to device."""
-        context = context.elevated()
-        volume_ref = self.db.volume_get(context, volume_id)
-        if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
-            path = self.driver.local_path(volume_ref)
-        else:
-            path = self.driver.discover_volume(context, volume_ref)
-        return path
+    def attach_volume(self, context, volume_id, instance_id, mountpoint):
+        """Updates db to show volume is attached"""
+        # TODO(vish): refactor this into a more general "reserve"
+        self.db.volume_attached(context,
+                                volume_id,
+                                instance_id,
+                                mountpoint)

-    def remove_compute_volume(self, context, volume_id):
-        """Remove remote volume on compute host."""
-        context = context.elevated()
-        volume_ref = self.db.volume_get(context, volume_id)
-        if volume_ref['host'] == self.host and FLAGS.use_local_volumes:
-            return True
-        else:
-            self.driver.undiscover_volume(volume_ref)
+    def detach_volume(self, context, volume_id):
+        """Updates db to show volume is detached"""
+        # TODO(vish): refactor this into a more general "unreserve"
+        self.db.volume_detached(context, volume_id)
+
+    def initialize_connection(self, context, volume_id, address):
+        """Initialize volume to be connected from address.
+
+        This method calls the driver initialize_connection and returns
+        the result to the caller.  The driver is responsible for doing any
+        necessary security setup and returning a connection_info dictionary
+        in the following format:
+
+        {'driver_volume_type': driver_volume_type,
+         'data': data}
+
+        driver_volume_type: a string to identify the type of volume.  This
+                            can be used by the calling code to determine the
+                            strategy for connecting to the volume.  This could
+                            be 'iscsi', 'rbd', 'sheepdog', etc.
+        data: this is the data that the calling code will use to connect
+              to the volume.  Keep in mind that this will be serialized to
+              json in various places, so it should not contain any non-json
+              data types.
+        """
+        volume_ref = self.db.volume_get(context, volume_id)
+        return self.driver.initialize_connection(volume_ref, address)
+
+    def terminate_connection(self, context, volume_id, address):
+        volume_ref = self.db.volume_get(context, volume_id)
+        self.driver.terminate_connection(volume_ref, address)

     def check_for_export(self, context, instance_id):
         """Make sure whether volume is exported."""

nova/volume/san.py

@@ -61,9 +61,6 @@ class SanISCSIDriver(ISCSIDriver):
     def _build_iscsi_target_name(self, volume):
         return "%s%s" % (FLAGS.iscsi_target_prefix, volume['name'])

-    # discover_volume is still OK
-    # undiscover_volume is still OK
-
     def _connect_to_ssh(self):
         ssh = paramiko.SSHClient()
         #TODO(justinsb): We need a better SSH key policy
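Note: on the TODO above, a hedged sketch of a stricter paramiko setup that rejects unknown host keys instead of silently trusting them; the function name and wiring are illustrative, not part of this change:

    import paramiko

    def connect_to_ssh(host, username, password=None, key_filename=None):
        ssh = paramiko.SSHClient()
        # trust only keys already present in the system known_hosts
        ssh.load_system_host_keys()
        ssh.set_missing_host_key_policy(paramiko.RejectPolicy())
        ssh.connect(host, username=username, password=password,
                    key_filename=key_filename)
        return ssh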