Merging trunk, fixing conflicts

This commit is contained in:
Rick Harris 2011-06-20 14:58:00 +00:00
commit b84cb654ac
25 changed files with 1165 additions and 157 deletions

View File

@ -21,22 +21,15 @@ APIRequest class
"""
import datetime
import re
# TODO(termie): replace minidom with etree
from xml.dom import minidom
from nova import log as logging
from nova.api.ec2 import ec2utils
LOG = logging.getLogger("nova.api.request")
_c2u = re.compile('(((?<=[a-z])[A-Z])|([A-Z](?![A-Z]|$)))')
def _camelcase_to_underscore(str):
return _c2u.sub(r'_\1', str).lower().strip('_')
def _underscore_to_camelcase(str):
return ''.join([x[:1].upper() + x[1:] for x in str.split('_')])
@ -51,59 +44,6 @@ def _database_to_isoformat(datetimeobj):
return datetimeobj.strftime("%Y-%m-%dT%H:%M:%SZ")
def _try_convert(value):
"""Return a non-string from a string or unicode, if possible.
============= =====================================================
When value is returns
============= =====================================================
zero-length ''
'None' None
'True' True
'False' False
'0', '-0' 0
0xN, -0xN int from hex (postitive) (N is any number)
0bN, -0bN int from binary (positive) (N is any number)
* try conversion to int, float, complex, fallback value
"""
if len(value) == 0:
return ''
if value == 'None':
return None
if value == 'True':
return True
if value == 'False':
return False
valueneg = value[1:] if value[0] == '-' else value
if valueneg == '0':
return 0
if valueneg == '':
return value
if valueneg[0] == '0':
if valueneg[1] in 'xX':
return int(value, 16)
elif valueneg[1] in 'bB':
return int(value, 2)
else:
try:
return int(value, 8)
except ValueError:
pass
try:
return int(value)
except ValueError:
pass
try:
return float(value)
except ValueError:
pass
try:
return complex(value)
except ValueError:
return value
class APIRequest(object):
def __init__(self, controller, action, version, args):
self.controller = controller
@ -114,7 +54,7 @@ class APIRequest(object):
def invoke(self, context):
try:
method = getattr(self.controller,
_camelcase_to_underscore(self.action))
ec2utils.camelcase_to_underscore(self.action))
except AttributeError:
controller = self.controller
action = self.action
@ -125,19 +65,7 @@ class APIRequest(object):
# and reraise as 400 error.
raise Exception(_error)
args = {}
for key, value in self.args.items():
parts = key.split(".")
key = _camelcase_to_underscore(parts[0])
if isinstance(value, str) or isinstance(value, unicode):
# NOTE(vish): Automatically convert strings back
# into their respective values
value = _try_convert(value)
if len(parts) > 1:
d = args.get(key, {})
d[parts[1]] = value
value = d
args[key] = value
args = ec2utils.dict_from_dotted_str(self.args.items())
for key in args.keys():
# NOTE(vish): Turn numeric dict keys into lists

View File

@ -909,6 +909,25 @@ class CloudController(object):
if kwargs.get('ramdisk_id'):
ramdisk = self._get_image(context, kwargs['ramdisk_id'])
kwargs['ramdisk_id'] = ramdisk['id']
for bdm in kwargs.get('block_device_mapping', []):
# NOTE(yamahata)
# BlockDevicedMapping.<N>.DeviceName
# BlockDevicedMapping.<N>.Ebs.SnapshotId
# BlockDevicedMapping.<N>.Ebs.VolumeSize
# BlockDevicedMapping.<N>.Ebs.DeleteOnTermination
# BlockDevicedMapping.<N>.VirtualName
# => remove .Ebs and allow volume id in SnapshotId
ebs = bdm.pop('ebs', None)
if ebs:
ec2_id = ebs.pop('snapshot_id')
id = ec2utils.ec2_id_to_id(ec2_id)
if ec2_id.startswith('snap-'):
bdm['snapshot_id'] = id
elif ec2_id.startswith('vol-'):
bdm['volume_id'] = id
ebs.setdefault('delete_on_termination', True)
bdm.update(ebs)
image = self._get_image(context, kwargs['image_id'])
if image:
@ -933,37 +952,54 @@ class CloudController(object):
user_data=kwargs.get('user_data'),
security_group=kwargs.get('security_group'),
availability_zone=kwargs.get('placement', {}).get(
'AvailabilityZone'))
'AvailabilityZone'),
block_device_mapping=kwargs.get('block_device_mapping', {}))
return self._format_run_instances(context,
instances[0]['reservation_id'])
def _do_instance(self, action, context, ec2_id):
instance_id = ec2utils.ec2_id_to_id(ec2_id)
action(context, instance_id=instance_id)
def _do_instances(self, action, context, instance_id):
for ec2_id in instance_id:
self._do_instance(action, context, ec2_id)
def terminate_instances(self, context, instance_id, **kwargs):
"""Terminate each instance in instance_id, which is a list of ec2 ids.
instance_id is a kwarg so its name cannot be modified."""
LOG.debug(_("Going to start terminating instances"))
for ec2_id in instance_id:
instance_id = ec2utils.ec2_id_to_id(ec2_id)
self.compute_api.delete(context, instance_id=instance_id)
self._do_instances(self.compute_api.delete, context, instance_id)
return True
def reboot_instances(self, context, instance_id, **kwargs):
"""instance_id is a list of instance ids"""
LOG.audit(_("Reboot instance %r"), instance_id, context=context)
for ec2_id in instance_id:
instance_id = ec2utils.ec2_id_to_id(ec2_id)
self.compute_api.reboot(context, instance_id=instance_id)
self._do_instances(self.compute_api.reboot, context, instance_id)
return True
def stop_instances(self, context, instance_id, **kwargs):
"""Stop each instances in instance_id.
Here instance_id is a list of instance ids"""
LOG.debug(_("Going to stop instances"))
self._do_instances(self.compute_api.stop, context, instance_id)
return True
def start_instances(self, context, instance_id, **kwargs):
"""Start each instances in instance_id.
Here instance_id is a list of instance ids"""
LOG.debug(_("Going to start instances"))
self._do_instances(self.compute_api.start, context, instance_id)
return True
def rescue_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
instance_id = ec2utils.ec2_id_to_id(instance_id)
self.compute_api.rescue(context, instance_id=instance_id)
self._do_instance(self.compute_api.rescue, contect, instnace_id)
return True
def unrescue_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
instance_id = ec2utils.ec2_id_to_id(instance_id)
self.compute_api.unrescue(context, instance_id=instance_id)
self._do_instance(self.compute_api.unrescue, context, instance_id)
return True
def update_instance(self, context, instance_id, **kwargs):

View File

@ -16,6 +16,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import re
from nova import exception
@ -30,3 +32,95 @@ def ec2_id_to_id(ec2_id):
def id_to_ec2_id(instance_id, template='i-%08x'):
"""Convert an instance ID (int) to an ec2 ID (i-[base 16 number])"""
return template % instance_id
_c2u = re.compile('(((?<=[a-z])[A-Z])|([A-Z](?![A-Z]|$)))')
def camelcase_to_underscore(str):
return _c2u.sub(r'_\1', str).lower().strip('_')
def _try_convert(value):
"""Return a non-string from a string or unicode, if possible.
============= =====================================================
When value is returns
============= =====================================================
zero-length ''
'None' None
'True' True case insensitive
'False' False case insensitive
'0', '-0' 0
0xN, -0xN int from hex (postitive) (N is any number)
0bN, -0bN int from binary (positive) (N is any number)
* try conversion to int, float, complex, fallback value
"""
if len(value) == 0:
return ''
if value == 'None':
return None
lowered_value = value.lower()
if lowered_value == 'true':
return True
if lowered_value == 'false':
return False
valueneg = value[1:] if value[0] == '-' else value
if valueneg == '0':
return 0
if valueneg == '':
return value
if valueneg[0] == '0':
if valueneg[1] in 'xX':
return int(value, 16)
elif valueneg[1] in 'bB':
return int(value, 2)
else:
try:
return int(value, 8)
except ValueError:
pass
try:
return int(value)
except ValueError:
pass
try:
return float(value)
except ValueError:
pass
try:
return complex(value)
except ValueError:
return value
def dict_from_dotted_str(items):
"""parse multi dot-separated argument into dict.
EBS boot uses multi dot-separeted arguments like
BlockDeviceMapping.1.DeviceName=snap-id
Convert the above into
{'block_device_mapping': {'1': {'device_name': snap-id}}}
"""
args = {}
for key, value in items:
parts = key.split(".")
key = camelcase_to_underscore(parts[0])
if isinstance(value, str) or isinstance(value, unicode):
# NOTE(vish): Automatically convert strings back
# into their respective values
value = _try_convert(value)
if len(parts) > 1:
d = args.get(key, {})
args[key] = d
for k in parts[1:-1]:
k = camelcase_to_underscore(k)
v = d.get(k, {})
d[k] = v
d = v
d[camelcase_to_underscore(parts[-1])] = value
else:
args[key] = value
return args

View File

@ -18,9 +18,10 @@
from webob import exc
from nova import compute
from nova import quota
from nova.api.openstack import faults
from nova.api.openstack import wsgi
from nova import exception
from nova import quota
class Controller(object):
@ -45,7 +46,11 @@ class Controller(object):
def index(self, req, server_id):
""" Returns the list of metadata for a given instance """
context = req.environ['nova.context']
try:
return self._get_metadata(context, server_id)
except exception.InstanceNotFound:
msg = _('Server %(server_id)s does not exist') % locals()
raise exc.HTTPNotFound(explanation=msg)
def create(self, req, server_id, body):
self._check_body(body)
@ -55,8 +60,13 @@ class Controller(object):
self.compute_api.update_or_create_instance_metadata(context,
server_id,
metadata)
except exception.InstanceNotFound:
msg = _('Server %(server_id)s does not exist') % locals()
raise exc.HTTPNotFound(explanation=msg)
except quota.QuotaError as error:
self._handle_quota_error(error)
return body
def update(self, req, server_id, id, body):
@ -72,6 +82,10 @@ class Controller(object):
self.compute_api.update_or_create_instance_metadata(context,
server_id,
body)
except exception.InstanceNotFound:
msg = _('Server %(server_id)s does not exist') % locals()
raise exc.HTTPNotFound(explanation=msg)
except quota.QuotaError as error:
self._handle_quota_error(error)
@ -80,16 +94,26 @@ class Controller(object):
def show(self, req, server_id, id):
""" Return a single metadata item """
context = req.environ['nova.context']
try:
data = self._get_metadata(context, server_id)
if id in data['metadata']:
except exception.InstanceNotFound:
msg = _('Server %(server_id)s does not exist') % locals()
raise exc.HTTPNotFound(explanation=msg)
try:
return {id: data['metadata'][id]}
else:
return faults.Fault(exc.HTTPNotFound())
except KeyError:
msg = _("metadata item %s was not found" % (id))
raise exc.HTTPNotFound(explanation=msg)
def delete(self, req, server_id, id):
""" Deletes an existing metadata """
context = req.environ['nova.context']
try:
self.compute_api.delete_instance_metadata(context, server_id, id)
except exception.InstanceNotFound:
msg = _('Server %(server_id)s does not exist') % locals()
raise exc.HTTPNotFound(explanation=msg)
def _handle_quota_error(self, error):
"""Reraise quota errors as api-specific http exceptions."""

View File

@ -34,6 +34,7 @@ from nova import utils
from nova import volume
from nova.compute import instance_types
from nova.compute import power_state
from nova.compute.utils import terminate_volumes
from nova.scheduler import api as scheduler_api
from nova.db import base
@ -52,6 +53,18 @@ def generate_default_hostname(instance_id):
return str(instance_id)
def _is_able_to_shutdown(instance, instance_id):
states = {'terminating': "Instance %s is already being terminated",
'migrating': "Instance %s is being migrated",
'stopping': "Instance %s is being stopped"}
msg = states.get(instance['state_description'])
if msg:
LOG.warning(_(msg), instance_id)
return False
return True
class API(base.Base):
"""API for interacting with the compute manager."""
@ -235,7 +248,7 @@ class API(base.Base):
return (num_instances, base_options, security_groups)
def create_db_entry_for_new_instance(self, context, base_options,
security_groups, num=1):
security_groups, block_device_mapping, num=1):
"""Create an entry in the DB for this new instance,
including any related table updates (such as security
groups, MAC address, etc). This will called by create()
@ -255,6 +268,23 @@ class API(base.Base):
instance_id,
security_group_id)
# NOTE(yamahata)
# tell vm driver to attach volume at boot time by updating
# BlockDeviceMapping
for bdm in block_device_mapping:
LOG.debug(_('bdm %s'), bdm)
assert 'device_name' in bdm
values = {
'instance_id': instance_id,
'device_name': bdm['device_name'],
'delete_on_termination': bdm.get('delete_on_termination'),
'virtual_name': bdm.get('virtual_name'),
'snapshot_id': bdm.get('snapshot_id'),
'volume_id': bdm.get('volume_id'),
'volume_size': bdm.get('volume_size'),
'no_device': bdm.get('no_device')}
self.db.block_device_mapping_create(elevated, values)
# Set sane defaults if not specified
updates = dict(hostname=self.hostname_factory(instance_id))
if (not hasattr(instance, 'display_name') or
@ -339,7 +369,7 @@ class API(base.Base):
key_name=None, key_data=None, security_group='default',
availability_zone=None, user_data=None, metadata={},
injected_files=None, admin_password=None, zone_blob=None,
reservation_id=None):
reservation_id=None, block_device_mapping=None):
"""
Provision the instances by sending off a series of single
instance requests to the Schedulers. This is fine for trival
@ -360,11 +390,13 @@ class API(base.Base):
injected_files, admin_password, zone_blob,
reservation_id)
block_device_mapping = block_device_mapping or []
instances = []
LOG.debug(_("Going to run %s instances..."), num_instances)
for num in range(num_instances):
instance = self.create_db_entry_for_new_instance(context,
base_options, security_groups, num=num)
base_options, security_groups,
block_device_mapping, num=num)
instances.append(instance)
instance_id = instance['id']
@ -474,24 +506,22 @@ class API(base.Base):
rv = self.db.instance_update(context, instance_id, kwargs)
return dict(rv.iteritems())
def _get_instance(self, context, instance_id, action_str):
try:
return self.get(context, instance_id)
except exception.NotFound:
LOG.warning(_("Instance %(instance_id)s was not found during "
"%(action_str)s") %
{'instance_id': instance_id, 'action_str': action_str})
raise
@scheduler_api.reroute_compute("delete")
def delete(self, context, instance_id):
"""Terminate an instance."""
LOG.debug(_("Going to try to terminate %s"), instance_id)
try:
instance = self.get(context, instance_id)
except exception.NotFound:
LOG.warning(_("Instance %s was not found during terminate"),
instance_id)
raise
instance = self._get_instance(context, instance_id, 'terminating')
if instance['state_description'] == 'terminating':
LOG.warning(_("Instance %s is already being terminated"),
instance_id)
return
if instance['state_description'] == 'migrating':
LOG.warning(_("Instance %s is being migrated"), instance_id)
if not _is_able_to_shutdown(instance, instance_id):
return
self.update(context,
@ -505,8 +535,48 @@ class API(base.Base):
self._cast_compute_message('terminate_instance', context,
instance_id, host)
else:
terminate_volumes(self.db, context, instance_id)
self.db.instance_destroy(context, instance_id)
@scheduler_api.reroute_compute("stop")
def stop(self, context, instance_id):
"""Stop an instance."""
LOG.debug(_("Going to try to stop %s"), instance_id)
instance = self._get_instance(context, instance_id, 'stopping')
if not _is_able_to_shutdown(instance, instance_id):
return
self.update(context,
instance['id'],
state_description='stopping',
state=power_state.NOSTATE,
terminated_at=utils.utcnow())
host = instance['host']
if host:
self._cast_compute_message('stop_instance', context,
instance_id, host)
def start(self, context, instance_id):
"""Start an instance."""
LOG.debug(_("Going to try to start %s"), instance_id)
instance = self._get_instance(context, instance_id, 'starting')
if instance['state_description'] != 'stopped':
_state_description = instance['state_description']
LOG.warning(_("Instance %(instance_id)s is not "
"stopped(%(_state_description)s)") % locals())
return
# TODO(yamahata): injected_files isn't supported right now.
# It is used only for osapi. not for ec2 api.
# availability_zone isn't used by run_instance.
rpc.cast(context,
FLAGS.scheduler_topic,
{"method": "start_instance",
"args": {"topic": FLAGS.compute_topic,
"instance_id": instance_id}})
def get(self, context, instance_id):
"""Get a single instance with the given instance_id."""
# NOTE(sirp): id used to be exclusively integer IDs; now we're

View File

@ -53,6 +53,7 @@ from nova import rpc
from nova import utils
from nova import volume
from nova.compute import power_state
from nova.compute.utils import terminate_volumes
from nova.virt import driver
@ -214,8 +215,63 @@ class ComputeManager(manager.SchedulerDependentManager):
"""
return self.driver.refresh_security_group_members(security_group_id)
@exception.wrap_exception
def run_instance(self, context, instance_id, **kwargs):
def _setup_block_device_mapping(self, context, instance_id):
"""setup volumes for block device mapping"""
self.db.instance_set_state(context,
instance_id,
power_state.NOSTATE,
'block_device_mapping')
volume_api = volume.API()
block_device_mapping = []
for bdm in self.db.block_device_mapping_get_all_by_instance(
context, instance_id):
LOG.debug(_("setting up bdm %s"), bdm)
if ((bdm['snapshot_id'] is not None) and
(bdm['volume_id'] is None)):
# TODO(yamahata): default name and description
vol = volume_api.create(context, bdm['volume_size'],
bdm['snapshot_id'], '', '')
# TODO(yamahata): creating volume simultaneously
# reduces creation time?
volume_api.wait_creation(context, vol['id'])
self.db.block_device_mapping_update(
context, bdm['id'], {'volume_id': vol['id']})
bdm['volume_id'] = vol['id']
if not ((bdm['snapshot_id'] is None) or
(bdm['volume_id'] is not None)):
LOG.error(_('corrupted state of block device mapping '
'id: %(id)s '
'snapshot: %(snapshot_id) volume: %(vollume_id)') %
{'id': bdm['id'],
'snapshot_id': bdm['snapshot'],
'volume_id': bdm['volume_id']})
raise exception.ApiError(_('broken block device mapping %d') %
bdm['id'])
if bdm['volume_id'] is not None:
volume_api.check_attach(context,
volume_id=bdm['volume_id'])
dev_path = self._attach_volume_boot(context, instance_id,
bdm['volume_id'],
bdm['device_name'])
block_device_mapping.append({'device_path': dev_path,
'mount_device':
bdm['device_name']})
elif bdm['virtual_name'] is not None:
# TODO(yamahata): ephemeral/swap device support
LOG.debug(_('block_device_mapping: '
'ephemeral device is not supported yet'))
else:
# TODO(yamahata): NoDevice support
assert bdm['no_device']
LOG.debug(_('block_device_mapping: '
'no device is not supported yet'))
return block_device_mapping
def _run_instance(self, context, instance_id, **kwargs):
"""Launch a new instance with specified options."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
@ -249,11 +305,15 @@ class ComputeManager(manager.SchedulerDependentManager):
self.network_manager.setup_compute_network(context,
instance_id)
block_device_mapping = self._setup_block_device_mapping(context,
instance_id)
# TODO(vish) check to make sure the availability zone matches
self._update_state(context, instance_id, power_state.BUILDING)
try:
self.driver.spawn(instance_ref)
self.driver.spawn(instance_ref,
block_device_mapping=block_device_mapping)
except Exception as ex: # pylint: disable=W0702
msg = _("Instance '%(instance_id)s' failed to spawn. Is "
"virtualization enabled in the BIOS? Details: "
@ -276,13 +336,25 @@ class ComputeManager(manager.SchedulerDependentManager):
self._update_launched_at(context, instance_id)
self._update_state(context, instance_id)
@exception.wrap_exception
def run_instance(self, context, instance_id, **kwargs):
self._run_instance(context, instance_id, **kwargs)
@exception.wrap_exception
@checks_instance_lock
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this host."""
def start_instance(self, context, instance_id):
"""Starting an instance on this host."""
# TODO(yamahata): injected_files isn't supported.
# Anyway OSAPI doesn't support stop/start yet
self._run_instance(context, instance_id)
def _shutdown_instance(self, context, instance_id, action_str):
"""Shutdown an instance on this host."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
LOG.audit(_("Terminating instance %s"), instance_id, context=context)
LOG.audit(_("%(action_str)s instance %(instance_id)s") %
{'action_str': action_str, 'instance_id': instance_id},
context=context)
fixed_ip = instance_ref.get('fixed_ip')
if not FLAGS.stub_network and fixed_ip:
@ -318,16 +390,34 @@ class ComputeManager(manager.SchedulerDependentManager):
volumes = instance_ref.get('volumes') or []
for volume in volumes:
self.detach_volume(context, instance_id, volume['id'])
if instance_ref['state'] == power_state.SHUTOFF:
self._detach_volume(context, instance_id, volume['id'], False)
if (instance_ref['state'] == power_state.SHUTOFF and
instance_ref['state_description'] != 'stopped'):
self.db.instance_destroy(context, instance_id)
raise exception.Error(_('trying to destroy already destroyed'
' instance: %s') % instance_id)
self.driver.destroy(instance_ref)
if action_str == 'Terminating':
terminate_volumes(self.db, context, instance_id)
@exception.wrap_exception
@checks_instance_lock
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this host."""
self._shutdown_instance(context, instance_id, 'Terminating')
# TODO(ja): should we keep it in a terminated state for a bit?
self.db.instance_destroy(context, instance_id)
@exception.wrap_exception
@checks_instance_lock
def stop_instance(self, context, instance_id):
"""Stopping an instance on this host."""
self._shutdown_instance(context, instance_id, 'Stopping')
# instance state will be updated to stopped by _poll_instance_states()
@exception.wrap_exception
@checks_instance_lock
def rebuild_instance(self, context, instance_id, **kwargs):
@ -799,6 +889,22 @@ class ComputeManager(manager.SchedulerDependentManager):
instance_ref = self.db.instance_get(context, instance_id)
return self.driver.get_vnc_console(instance_ref)
def _attach_volume_boot(self, context, instance_id, volume_id, mountpoint):
"""Attach a volume to an instance at boot time. So actual attach
is done by instance creation"""
# TODO(yamahata):
# should move check_attach to volume manager?
volume.API().check_attach(context, volume_id)
context = context.elevated()
LOG.audit(_("instance %(instance_id)s: booting with "
"volume %(volume_id)s at %(mountpoint)s") %
locals(), context=context)
dev_path = self.volume_manager.setup_compute_volume(context, volume_id)
self.db.volume_attached(context, volume_id, instance_id, mountpoint)
return dev_path
@checks_instance_lock
def attach_volume(self, context, instance_id, volume_id, mountpoint):
"""Attach a volume to an instance."""
@ -816,6 +922,16 @@ class ComputeManager(manager.SchedulerDependentManager):
volume_id,
instance_id,
mountpoint)
values = {
'instance_id': instance_id,
'device_name': mountpoint,
'delete_on_termination': False,
'virtual_name': None,
'snapshot_id': None,
'volume_id': volume_id,
'volume_size': None,
'no_device': None}
self.db.block_device_mapping_create(context, values)
except Exception as exc: # pylint: disable=W0702
# NOTE(vish): The inline callback eats the exception info so we
# log the traceback here and reraise the same
@ -830,7 +946,7 @@ class ComputeManager(manager.SchedulerDependentManager):
@exception.wrap_exception
@checks_instance_lock
def detach_volume(self, context, instance_id, volume_id):
def _detach_volume(self, context, instance_id, volume_id, destroy_bdm):
"""Detach a volume from an instance."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
@ -846,8 +962,15 @@ class ComputeManager(manager.SchedulerDependentManager):
volume_ref['mountpoint'])
self.volume_manager.remove_compute_volume(context, volume_id)
self.db.volume_detached(context, volume_id)
if destroy_bdm:
self.db.block_device_mapping_destroy_by_instance_and_volume(
context, instance_id, volume_id)
return True
def detach_volume(self, context, instance_id, volume_id):
"""Detach a volume from an instance."""
return self._detach_volume(context, instance_id, volume_id, True)
def remove_volume(self, context, volume_id):
"""Remove volume on compute host.
@ -1173,11 +1296,14 @@ class ComputeManager(manager.SchedulerDependentManager):
"State=%(db_state)s, so setting state to "
"shutoff.") % locals())
vm_state = power_state.SHUTOFF
if db_instance['state_description'] == 'stopping':
self.db.instance_stop(context, db_instance['id'])
continue
else:
vm_state = vm_instance.state
vms_not_found_in_db.remove(name)
if db_instance['state_description'] == 'migrating':
if (db_instance['state_description'] in ['migrating', 'stopping']):
# A situation which db record exists, but no instance"
# sometimes occurs while live-migration at src compute,
# this case should be ignored.

29
nova/compute/utils.py Normal file
View File

@ -0,0 +1,29 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2011 VA Linux Systems Japan K.K
# Copyright (c) 2011 Isaku Yamahata
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from nova import volume
def terminate_volumes(db, context, instance_id):
"""delete volumes of delete_on_termination=True in block device mapping"""
volume_api = volume.API()
for bdm in db.block_device_mapping_get_all_by_instance(context,
instance_id):
#LOG.debug(_("terminating bdm %s") % bdm)
if bdm['volume_id'] and bdm['delete_on_termination']:
volume_api.delete(context, bdm['volume_id'])
db.block_device_mapping_destroy(context, bdm['id'])

View File

@ -414,6 +414,11 @@ def instance_destroy(context, instance_id):
return IMPL.instance_destroy(context, instance_id)
def instance_stop(context, instance_id):
"""Stop the instance or raise if it does not exist."""
return IMPL.instance_stop(context, instance_id)
def instance_get_by_uuid(context, uuid):
"""Get an instance or raise if it does not exist."""
return IMPL.instance_get_by_uuid(context, uuid)
@ -925,6 +930,36 @@ def snapshot_update(context, snapshot_id, values):
####################
def block_device_mapping_create(context, values):
"""Create an entry of block device mapping"""
return IMPL.block_device_mapping_create(context, values)
def block_device_mapping_update(context, bdm_id, values):
"""Create an entry of block device mapping"""
return IMPL.block_device_mapping_update(context, bdm_id, values)
def block_device_mapping_get_all_by_instance(context, instance_id):
"""Get all block device mapping belonging to a instance"""
return IMPL.block_device_mapping_get_all_by_instance(context, instance_id)
def block_device_mapping_destroy(context, bdm_id):
"""Destroy the block device mapping."""
return IMPL.block_device_mapping_destroy(context, bdm_id)
def block_device_mapping_destroy_by_instance_and_volume(context, instance_id,
volume_id):
"""Destroy the block device mapping or raise if it does not exist."""
return IMPL.block_device_mapping_destroy_by_instance_and_volume(
context, instance_id, volume_id)
####################
def security_group_get_all(context):
"""Get all security groups."""
return IMPL.security_group_get_all(context)

View File

@ -18,7 +18,7 @@
"""
Implementation of SQLAlchemy backend.
"""
import traceback
import warnings
from nova import db
@ -841,6 +841,25 @@ def instance_destroy(context, instance_id):
'updated_at': literal_column('updated_at')})
@require_context
def instance_stop(context, instance_id):
session = get_session()
with session.begin():
from nova.compute import power_state
session.query(models.Instance).\
filter_by(id=instance_id).\
update({'host': None,
'state': power_state.SHUTOFF,
'state_description': 'stopped',
'updated_at': literal_column('updated_at')})
session.query(models.SecurityGroupInstanceAssociation).\
filter_by(instance_id=instance_id).\
update({'updated_at': literal_column('updated_at')})
session.query(models.InstanceMetadata).\
filter_by(instance_id=instance_id).\
update({'updated_at': literal_column('updated_at')})
@require_context
def instance_get_by_uuid(context, uuid, session=None):
partial = _build_instance_get(context, session=session)
@ -1893,6 +1912,66 @@ def snapshot_update(context, snapshot_id, values):
###################
@require_context
def block_device_mapping_create(context, values):
bdm_ref = models.BlockDeviceMapping()
bdm_ref.update(values)
session = get_session()
with session.begin():
bdm_ref.save(session=session)
@require_context
def block_device_mapping_update(context, bdm_id, values):
session = get_session()
with session.begin():
session.query(models.BlockDeviceMapping).\
filter_by(id=bdm_id).\
filter_by(deleted=False).\
update(values)
@require_context
def block_device_mapping_get_all_by_instance(context, instance_id):
session = get_session()
result = session.query(models.BlockDeviceMapping).\
filter_by(instance_id=instance_id).\
filter_by(deleted=False).\
all()
if not result:
return []
return result
@require_context
def block_device_mapping_destroy(context, bdm_id):
session = get_session()
with session.begin():
session.query(models.BlockDeviceMapping).\
filter_by(id=bdm_id).\
update({'deleted': True,
'deleted_at': utils.utcnow(),
'updated_at': literal_column('updated_at')})
@require_context
def block_device_mapping_destroy_by_instance_and_volume(context, instance_id,
volume_id):
session = get_session()
with session.begin():
session.query(models.BlockDeviceMapping).\
filter_by(instance_id=instance_id).\
filter_by(volume_id=volume_id).\
filter_by(deleted=False).\
update({'deleted': True,
'deleted_at': utils.utcnow(),
'updated_at': literal_column('updated_at')})
###################
@require_context
def security_group_get_all(context):
session = get_session()
@ -2626,7 +2705,17 @@ def zone_get_all(context):
####################
def require_instance_exists(func):
def new_func(context, instance_id, *args, **kwargs):
db.api.instance_get(context, instance_id)
return func(context, instance_id, *args, **kwargs)
new_func.__name__ = func.__name__
return new_func
@require_context
@require_instance_exists
def instance_metadata_get(context, instance_id):
session = get_session()
@ -2642,6 +2731,7 @@ def instance_metadata_get(context, instance_id):
@require_context
@require_instance_exists
def instance_metadata_delete(context, instance_id, key):
session = get_session()
session.query(models.InstanceMetadata).\
@ -2654,6 +2744,7 @@ def instance_metadata_delete(context, instance_id, key):
@require_context
@require_instance_exists
def instance_metadata_delete_all(context, instance_id):
session = get_session()
session.query(models.InstanceMetadata).\
@ -2665,6 +2756,7 @@ def instance_metadata_delete_all(context, instance_id):
@require_context
@require_instance_exists
def instance_metadata_get_item(context, instance_id, key):
session = get_session()
@ -2681,6 +2773,7 @@ def instance_metadata_get_item(context, instance_id, key):
@require_context
@require_instance_exists
def instance_metadata_update_or_create(context, instance_id, metadata):
session = get_session()

View File

@ -0,0 +1,87 @@
# Copyright 2011 OpenStack LLC.
# Copyright 2011 Isaku Yamahata
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from sqlalchemy import MetaData, Table, Column
from sqlalchemy import DateTime, Boolean, Integer, String
from sqlalchemy import ForeignKey
from nova import log as logging
meta = MetaData()
# Just for the ForeignKey and column creation to succeed, these are not the
# actual definitions of instances or services.
instances = Table('instances', meta,
Column('id', Integer(), primary_key=True, nullable=False),
)
volumes = Table('volumes', meta,
Column('id', Integer(), primary_key=True, nullable=False),
)
snapshots = Table('snapshots', meta,
Column('id', Integer(), primary_key=True, nullable=False),
)
block_device_mapping = Table('block_device_mapping', meta,
Column('created_at', DateTime(timezone=False)),
Column('updated_at', DateTime(timezone=False)),
Column('deleted_at', DateTime(timezone=False)),
Column('deleted', Boolean(create_constraint=True, name=None)),
Column('id', Integer(), primary_key=True, autoincrement=True),
Column('instance_id',
Integer(),
ForeignKey('instances.id'),
nullable=False),
Column('device_name',
String(length=255, convert_unicode=False, assert_unicode=None,
unicode_error=None, _warn_on_bytestring=False),
nullable=False),
Column('delete_on_termination',
Boolean(create_constraint=True, name=None),
default=False),
Column('virtual_name',
String(length=255, convert_unicode=False, assert_unicode=None,
unicode_error=None, _warn_on_bytestring=False),
nullable=True),
Column('snapshot_id',
Integer(),
ForeignKey('snapshots.id'),
nullable=True),
Column('volume_id', Integer(), ForeignKey('volumes.id'),
nullable=True),
Column('volume_size', Integer(), nullable=True),
Column('no_device',
Boolean(create_constraint=True, name=None),
nullable=True),
)
def upgrade(migrate_engine):
# Upgrade operations go here. Don't create your own engine;
# bind migrate_engine to your metadata
meta.bind = migrate_engine
try:
block_device_mapping.create()
except Exception:
logging.info(repr(block_device_mapping))
logging.exception('Exception while creating table')
meta.drop_all(tables=[block_device_mapping])
raise
def downgrade(migrate_engine):
# Operations to reverse the above upgrade go here.
block_device_mapping.drop()

View File

@ -358,6 +358,45 @@ class Snapshot(BASE, NovaBase):
display_description = Column(String(255))
class BlockDeviceMapping(BASE, NovaBase):
"""Represents block device mapping that is defined by EC2"""
__tablename__ = "block_device_mapping"
id = Column(Integer, primary_key=True, autoincrement=True)
instance_id = Column(Integer, ForeignKey('instances.id'), nullable=False)
instance = relationship(Instance,
backref=backref('balock_device_mapping'),
foreign_keys=instance_id,
primaryjoin='and_(BlockDeviceMapping.instance_id=='
'Instance.id,'
'BlockDeviceMapping.deleted=='
'False)')
device_name = Column(String(255), nullable=False)
# default=False for compatibility of the existing code.
# With EC2 API,
# default True for ami specified device.
# default False for created with other timing.
delete_on_termination = Column(Boolean, default=False)
# for ephemeral device
virtual_name = Column(String(255), nullable=True)
# for snapshot or volume
snapshot_id = Column(Integer, ForeignKey('snapshots.id'), nullable=True)
# outer join
snapshot = relationship(Snapshot,
foreign_keys=snapshot_id)
volume_id = Column(Integer, ForeignKey('volumes.id'), nullable=True)
volume = relationship(Volume,
foreign_keys=volume_id)
volume_size = Column(Integer, nullable=True)
# for no device to suppress devices.
no_device = Column(Boolean, nullable=True)
class ExportDevice(BASE, NovaBase):
"""Represates a shelf and blade that a volume can be exported on."""
__tablename__ = 'export_devices'

View File

@ -39,7 +39,7 @@ flags.DEFINE_integer("max_networks", 1000,
class SimpleScheduler(chance.ChanceScheduler):
"""Implements Naive Scheduler that tries to find least loaded host."""
def schedule_run_instance(self, context, instance_id, *_args, **_kwargs):
def _schedule_instance(self, context, instance_id, *_args, **_kwargs):
"""Picks a host that is up and has the fewest running instances."""
instance_ref = db.instance_get(context, instance_id)
if (instance_ref['availability_zone']
@ -75,6 +75,12 @@ class SimpleScheduler(chance.ChanceScheduler):
" for this request. Is the appropriate"
" service running?"))
def schedule_run_instance(self, context, instance_id, *_args, **_kwargs):
return self._schedule_instance(context, instance_id, *_args, **_kwargs)
def schedule_start_instance(self, context, instance_id, *_args, **_kwargs):
return self._schedule_instance(context, instance_id, *_args, **_kwargs)
def schedule_create_volume(self, context, volume_id, *_args, **_kwargs):
"""Picks a host that is up and has the fewest volumes."""
volume_ref = db.volume_get(context, volume_id)

View File

@ -21,6 +21,7 @@ import unittest
import webob
from nova import exception
from nova import flags
from nova.api import openstack
from nova.tests.api.openstack import fakes
@ -67,6 +68,14 @@ def stub_max_server_metadata():
return metadata
def return_server(context, server_id):
return {'id': server_id}
def return_server_nonexistant(context, server_id):
raise exception.InstanceNotFound()
class ServerMetaDataTest(unittest.TestCase):
def setUp(self):
@ -76,6 +85,7 @@ class ServerMetaDataTest(unittest.TestCase):
fakes.FakeAuthDatabase.data = {}
fakes.stub_out_auth(self.stubs)
fakes.stub_out_key_pair_funcs(self.stubs)
self.stubs.Set(nova.db.api, 'instance_get', return_server)
def tearDown(self):
self.stubs.UnsetAll()
@ -92,6 +102,13 @@ class ServerMetaDataTest(unittest.TestCase):
self.assertEqual('application/json', res.headers['Content-Type'])
self.assertEqual('value1', res_dict['metadata']['key1'])
def test_index_nonexistant_server(self):
self.stubs.Set(nova.db.api, 'instance_get', return_server_nonexistant)
req = webob.Request.blank('/v1.1/servers/1/meta')
req.environ['api.version'] = '1.1'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(404, res.status_int)
def test_index_no_data(self):
self.stubs.Set(nova.db.api, 'instance_metadata_get',
return_empty_server_metadata)
@ -114,13 +131,19 @@ class ServerMetaDataTest(unittest.TestCase):
self.assertEqual('application/json', res.headers['Content-Type'])
self.assertEqual('value5', res_dict['key5'])
def test_show_nonexistant_server(self):
self.stubs.Set(nova.db.api, 'instance_get', return_server_nonexistant)
req = webob.Request.blank('/v1.1/servers/1/meta/key5')
req.environ['api.version'] = '1.1'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(404, res.status_int)
def test_show_meta_not_found(self):
self.stubs.Set(nova.db.api, 'instance_metadata_get',
return_empty_server_metadata)
req = webob.Request.blank('/v1.1/servers/1/meta/key6')
req.environ['api.version'] = '1.1'
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(404, res.status_int)
def test_delete(self):
@ -132,6 +155,14 @@ class ServerMetaDataTest(unittest.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(200, res.status_int)
def test_delete_nonexistant_server(self):
self.stubs.Set(nova.db.api, 'instance_get', return_server_nonexistant)
req = webob.Request.blank('/v1.1/servers/1/meta/key5')
req.environ['api.version'] = '1.1'
req.method = 'DELETE'
res = req.get_response(fakes.wsgi_app())
self.assertEqual(404, res.status_int)
def test_create(self):
self.stubs.Set(nova.db.api, 'instance_metadata_update_or_create',
return_create_instance_metadata)
@ -141,8 +172,8 @@ class ServerMetaDataTest(unittest.TestCase):
req.body = '{"metadata": {"key1": "value1"}}'
req.headers["content-type"] = "application/json"
res = req.get_response(fakes.wsgi_app())
res_dict = json.loads(res.body)
self.assertEqual(200, res.status_int)
res_dict = json.loads(res.body)
self.assertEqual('application/json', res.headers['Content-Type'])
self.assertEqual('value1', res_dict['metadata']['key1'])
@ -156,6 +187,16 @@ class ServerMetaDataTest(unittest.TestCase):
res = req.get_response(fakes.wsgi_app())
self.assertEqual(400, res.status_int)
def test_create_nonexistant_server(self):
self.stubs.Set(nova.db.api, 'instance_get', return_server_nonexistant)
req = webob.Request.blank('/v1.1/servers/100/meta')
req.environ['api.version'] = '1.1'
req.method = 'POST'
req.body = '{"metadata": {"key1": "value1"}}'
req.headers["content-type"] = "application/json"
res = req.get_response(fakes.wsgi_app())
self.assertEqual(404, res.status_int)
def test_update_item(self):
self.stubs.Set(nova.db.api, 'instance_metadata_update_or_create',
return_create_instance_metadata)
@ -170,6 +211,16 @@ class ServerMetaDataTest(unittest.TestCase):
res_dict = json.loads(res.body)
self.assertEqual('value1', res_dict['key1'])
def test_update_item_nonexistant_server(self):
self.stubs.Set(nova.db.api, 'instance_get', return_server_nonexistant)
req = webob.Request.blank('/v1.1/servers/asdf/100/key1')
req.environ['api.version'] = '1.1'
req.method = 'PUT'
req.body = '{"key1": "value1"}'
req.headers["content-type"] = "application/json"
res = req.get_response(fakes.wsgi_app())
self.assertEqual(404, res.status_int)
def test_update_item_empty_body(self):
self.stubs.Set(nova.db.api, 'instance_metadata_update_or_create',
return_create_instance_metadata)

View File

@ -89,7 +89,7 @@ class FakeHttplibConnection(object):
class XmlConversionTestCase(test.TestCase):
"""Unit test api xml conversion"""
def test_number_conversion(self):
conv = apirequest._try_convert
conv = ec2utils._try_convert
self.assertEqual(conv('None'), None)
self.assertEqual(conv('True'), True)
self.assertEqual(conv('False'), False)

View File

@ -56,6 +56,7 @@ class CloudTestCase(test.TestCase):
self.compute = self.start_service('compute')
self.scheduter = self.start_service('scheduler')
self.network = self.start_service('network')
self.volume = self.start_service('volume')
self.image_service = utils.import_object(FLAGS.image_service)
self.manager = manager.AuthManager()
@ -373,14 +374,21 @@ class CloudTestCase(test.TestCase):
self.assertRaises(exception.ImageNotFound, deregister_image,
self.context, 'ami-bad001')
def test_console_output(self):
instance_type = FLAGS.default_instance_type
max_count = 1
kwargs = {'image_id': 'ami-1',
'instance_type': instance_type,
'max_count': max_count}
def _run_instance(self, **kwargs):
rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
return instance_id
def _run_instance_wait(self, **kwargs):
ec2_instance_id = self._run_instance(**kwargs)
self._wait_for_running(ec2_instance_id)
return ec2_instance_id
def test_console_output(self):
instance_id = self._run_instance(
image_id='ami-1',
instance_type=FLAGS.default_instance_type,
max_count=1)
output = self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT')
@ -389,9 +397,7 @@ class CloudTestCase(test.TestCase):
rv = self.cloud.terminate_instances(self.context, [instance_id])
def test_ajax_console(self):
kwargs = {'image_id': 'ami-1'}
rv = self.cloud.run_instances(self.context, **kwargs)
instance_id = rv['instancesSet'][0]['instanceId']
instance_id = self._run_instance(image_id='ami-1')
output = self.cloud.get_ajax_console(context=self.context,
instance_id=[instance_id])
self.assertEquals(output['url'],
@ -569,3 +575,299 @@ class CloudTestCase(test.TestCase):
vol = db.volume_get(self.context, vol['id'])
self.assertEqual(None, vol['mountpoint'])
db.volume_destroy(self.context, vol['id'])
def _restart_compute_service(self, periodic_interval=None):
"""restart compute service. NOTE: fake driver forgets all instances."""
self.compute.kill()
if periodic_interval:
self.compute = self.start_service(
'compute', periodic_interval=periodic_interval)
else:
self.compute = self.start_service('compute')
def _wait_for_state(self, ctxt, instance_id, predicate):
"""Wait for an stopping instance to be a given state"""
id = ec2utils.ec2_id_to_id(instance_id)
while True:
info = self.cloud.compute_api.get(context=ctxt, instance_id=id)
LOG.debug(info)
if predicate(info):
break
greenthread.sleep(1)
def _wait_for_running(self, instance_id):
def is_running(info):
return info['state_description'] == 'running'
self._wait_for_state(self.context, instance_id, is_running)
def _wait_for_stopped(self, instance_id):
def is_stopped(info):
return info['state_description'] == 'stopped'
self._wait_for_state(self.context, instance_id, is_stopped)
def _wait_for_terminate(self, instance_id):
def is_deleted(info):
return info['deleted']
elevated = self.context.elevated(read_deleted=True)
self._wait_for_state(elevated, instance_id, is_deleted)
def test_stop_start_instance(self):
"""Makes sure stop/start instance works"""
# enforce periodic tasks run in short time to avoid wait for 60s.
self._restart_compute_service(periodic_interval=0.3)
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1, }
instance_id = self._run_instance_wait(**kwargs)
# a running instance can't be started. It is just ignored.
result = self.cloud.start_instances(self.context, [instance_id])
greenthread.sleep(0.3)
self.assertTrue(result)
result = self.cloud.stop_instances(self.context, [instance_id])
greenthread.sleep(0.3)
self.assertTrue(result)
self._wait_for_stopped(instance_id)
result = self.cloud.start_instances(self.context, [instance_id])
greenthread.sleep(0.3)
self.assertTrue(result)
self._wait_for_running(instance_id)
result = self.cloud.stop_instances(self.context, [instance_id])
greenthread.sleep(0.3)
self.assertTrue(result)
self._wait_for_stopped(instance_id)
result = self.cloud.terminate_instances(self.context, [instance_id])
greenthread.sleep(0.3)
self.assertTrue(result)
self._restart_compute_service()
def _volume_create(self):
kwargs = {'status': 'available',
'host': self.volume.host,
'size': 1,
'attach_status': 'detached', }
return db.volume_create(self.context, kwargs)
def _assert_volume_attached(self, vol, instance_id, mountpoint):
self.assertEqual(vol['instance_id'], instance_id)
self.assertEqual(vol['mountpoint'], mountpoint)
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
def _assert_volume_detached(self, vol):
self.assertEqual(vol['instance_id'], None)
self.assertEqual(vol['mountpoint'], None)
self.assertEqual(vol['status'], "available")
self.assertEqual(vol['attach_status'], "detached")
def test_stop_start_with_volume(self):
"""Make sure run instance with block device mapping works"""
# enforce periodic tasks run in short time to avoid wait for 60s.
self._restart_compute_service(periodic_interval=0.3)
vol1 = self._volume_create()
vol2 = self._volume_create()
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1,
'block_device_mapping': [{'device_name': '/dev/vdb',
'volume_id': vol1['id'],
'delete_on_termination': False, },
{'device_name': '/dev/vdc',
'volume_id': vol2['id'],
'delete_on_termination': True, },
]}
ec2_instance_id = self._run_instance_wait(**kwargs)
instance_id = ec2utils.ec2_id_to_id(ec2_instance_id)
vols = db.volume_get_all_by_instance(self.context, instance_id)
self.assertEqual(len(vols), 2)
for vol in vols:
self.assertTrue(vol['id'] == vol1['id'] or vol['id'] == vol2['id'])
vol = db.volume_get(self.context, vol1['id'])
self._assert_volume_attached(vol, instance_id, '/dev/vdb')
vol = db.volume_get(self.context, vol2['id'])
self._assert_volume_attached(vol, instance_id, '/dev/vdc')
result = self.cloud.stop_instances(self.context, [ec2_instance_id])
self.assertTrue(result)
self._wait_for_stopped(ec2_instance_id)
vol = db.volume_get(self.context, vol1['id'])
self._assert_volume_detached(vol)
vol = db.volume_get(self.context, vol2['id'])
self._assert_volume_detached(vol)
self.cloud.start_instances(self.context, [ec2_instance_id])
self._wait_for_running(ec2_instance_id)
vols = db.volume_get_all_by_instance(self.context, instance_id)
self.assertEqual(len(vols), 2)
for vol in vols:
self.assertTrue(vol['id'] == vol1['id'] or vol['id'] == vol2['id'])
self.assertTrue(vol['mountpoint'] == '/dev/vdb' or
vol['mountpoint'] == '/dev/vdc')
self.assertEqual(vol['instance_id'], instance_id)
self.assertEqual(vol['status'], "in-use")
self.assertEqual(vol['attach_status'], "attached")
self.cloud.terminate_instances(self.context, [ec2_instance_id])
greenthread.sleep(0.3)
admin_ctxt = context.get_admin_context(read_deleted=False)
vol = db.volume_get(admin_ctxt, vol1['id'])
self.assertFalse(vol['deleted'])
db.volume_destroy(self.context, vol1['id'])
greenthread.sleep(0.3)
admin_ctxt = context.get_admin_context(read_deleted=True)
vol = db.volume_get(admin_ctxt, vol2['id'])
self.assertTrue(vol['deleted'])
self._restart_compute_service()
def test_stop_with_attached_volume(self):
"""Make sure attach info is reflected to block device mapping"""
# enforce periodic tasks run in short time to avoid wait for 60s.
self._restart_compute_service(periodic_interval=0.3)
vol1 = self._volume_create()
vol2 = self._volume_create()
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1,
'block_device_mapping': [{'device_name': '/dev/vdb',
'volume_id': vol1['id'],
'delete_on_termination': True}]}
ec2_instance_id = self._run_instance_wait(**kwargs)
instance_id = ec2utils.ec2_id_to_id(ec2_instance_id)
vols = db.volume_get_all_by_instance(self.context, instance_id)
self.assertEqual(len(vols), 1)
for vol in vols:
self.assertEqual(vol['id'], vol1['id'])
self._assert_volume_attached(vol, instance_id, '/dev/vdb')
vol = db.volume_get(self.context, vol2['id'])
self._assert_volume_detached(vol)
self.cloud.compute_api.attach_volume(self.context,
instance_id=instance_id,
volume_id=vol2['id'],
device='/dev/vdc')
greenthread.sleep(0.3)
vol = db.volume_get(self.context, vol2['id'])
self._assert_volume_attached(vol, instance_id, '/dev/vdc')
self.cloud.compute_api.detach_volume(self.context,
volume_id=vol1['id'])
greenthread.sleep(0.3)
vol = db.volume_get(self.context, vol1['id'])
self._assert_volume_detached(vol)
result = self.cloud.stop_instances(self.context, [ec2_instance_id])
self.assertTrue(result)
self._wait_for_stopped(ec2_instance_id)
for vol_id in (vol1['id'], vol2['id']):
vol = db.volume_get(self.context, vol_id)
self._assert_volume_detached(vol)
self.cloud.start_instances(self.context, [ec2_instance_id])
self._wait_for_running(ec2_instance_id)
vols = db.volume_get_all_by_instance(self.context, instance_id)
self.assertEqual(len(vols), 1)
for vol in vols:
self.assertEqual(vol['id'], vol2['id'])
self._assert_volume_attached(vol, instance_id, '/dev/vdc')
vol = db.volume_get(self.context, vol1['id'])
self._assert_volume_detached(vol)
self.cloud.terminate_instances(self.context, [ec2_instance_id])
greenthread.sleep(0.3)
for vol_id in (vol1['id'], vol2['id']):
vol = db.volume_get(self.context, vol_id)
self.assertEqual(vol['id'], vol_id)
self._assert_volume_detached(vol)
db.volume_destroy(self.context, vol_id)
self._restart_compute_service()
def _create_snapshot(self, ec2_volume_id):
result = self.cloud.create_snapshot(self.context,
volume_id=ec2_volume_id)
greenthread.sleep(0.3)
return result['snapshotId']
def test_run_with_snapshot(self):
"""Makes sure run/stop/start instance with snapshot works."""
vol = self._volume_create()
ec2_volume_id = ec2utils.id_to_ec2_id(vol['id'], 'vol-%08x')
ec2_snapshot1_id = self._create_snapshot(ec2_volume_id)
snapshot1_id = ec2utils.ec2_id_to_id(ec2_snapshot1_id)
ec2_snapshot2_id = self._create_snapshot(ec2_volume_id)
snapshot2_id = ec2utils.ec2_id_to_id(ec2_snapshot2_id)
kwargs = {'image_id': 'ami-1',
'instance_type': FLAGS.default_instance_type,
'max_count': 1,
'block_device_mapping': [{'device_name': '/dev/vdb',
'snapshot_id': snapshot1_id,
'delete_on_termination': False, },
{'device_name': '/dev/vdc',
'snapshot_id': snapshot2_id,
'delete_on_termination': True}]}
ec2_instance_id = self._run_instance_wait(**kwargs)
instance_id = ec2utils.ec2_id_to_id(ec2_instance_id)
vols = db.volume_get_all_by_instance(self.context, instance_id)
self.assertEqual(len(vols), 2)
vol1_id = None
vol2_id = None
for vol in vols:
snapshot_id = vol['snapshot_id']
if snapshot_id == snapshot1_id:
vol1_id = vol['id']
mountpoint = '/dev/vdb'
elif snapshot_id == snapshot2_id:
vol2_id = vol['id']
mountpoint = '/dev/vdc'
else:
self.fail()
self._assert_volume_attached(vol, instance_id, mountpoint)
self.assertTrue(vol1_id)
self.assertTrue(vol2_id)
self.cloud.terminate_instances(self.context, [ec2_instance_id])
greenthread.sleep(0.3)
self._wait_for_terminate(ec2_instance_id)
greenthread.sleep(0.3)
admin_ctxt = context.get_admin_context(read_deleted=False)
vol = db.volume_get(admin_ctxt, vol1_id)
self._assert_volume_detached(vol)
self.assertFalse(vol['deleted'])
db.volume_destroy(self.context, vol1_id)
greenthread.sleep(0.3)
admin_ctxt = context.get_admin_context(read_deleted=True)
vol = db.volume_get(admin_ctxt, vol2_id)
self.assertTrue(vol['deleted'])
for snapshot_id in (ec2_snapshot1_id, ec2_snapshot2_id):
self.cloud.delete_snapshot(self.context, snapshot_id)
greenthread.sleep(0.3)
db.volume_destroy(self.context, vol['id'])

View File

@ -228,6 +228,21 @@ class ComputeTestCase(test.TestCase):
self.assert_(instance_ref['launched_at'] < terminate)
self.assert_(instance_ref['deleted_at'] > terminate)
def test_stop(self):
"""Ensure instance can be stopped"""
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
self.compute.stop_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
def test_start(self):
"""Ensure instance can be started"""
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
self.compute.stop_instance(self.context, instance_id)
self.compute.start_instance(self.context, instance_id)
self.compute.terminate_instance(self.context, instance_id)
def test_pause(self):
"""Ensure instance can be paused"""
instance_id = self._create_instance()

View File

@ -61,7 +61,7 @@ class ComputeDriver(object):
"""Return a list of InstanceInfo for all registered VMs"""
raise NotImplementedError()
def spawn(self, instance, network_info=None):
def spawn(self, instance, network_info=None, block_device_mapping=None):
"""Launch a VM for the specified instance"""
raise NotImplementedError()

View File

@ -129,7 +129,7 @@ class FakeConnection(driver.ComputeDriver):
info_list.append(self._map_to_instance_info(instance))
return info_list
def spawn(self, instance):
def spawn(self, instance, network_info=None, block_device_mapping=None):
"""
Create a new instance/VM/domain on the virtualization platform.
@ -237,6 +237,10 @@ class FakeConnection(driver.ComputeDriver):
"""
pass
def poll_rescued_instances(self, timeout):
"""Poll for rescued instances"""
pass
def migrate_disk_and_power_off(self, instance, dest):
"""
Transfers the disk of a running instance in multiple phases, turning

View File

@ -139,7 +139,7 @@ class HyperVConnection(driver.ComputeDriver):
return instance_infos
def spawn(self, instance):
def spawn(self, instance, network_info=None, block_device_mapping=None):
""" Create a new VM and start it."""
vm = self._lookup(instance.name)
if vm is not None:

View File

@ -67,11 +67,13 @@
<target dev='${disk_prefix}b' bus='${disk_bus}'/>
</disk>
#else
#if not ($getVar('ebs_root', False))
<disk type='file'>
<driver type='${driver_type}'/>
<source file='${basepath}/disk'/>
<target dev='${disk_prefix}a' bus='${disk_bus}'/>
</disk>
#end if
#if $getVar('local', False)
<disk type='file'>
<driver type='${driver_type}'/>
@ -79,6 +81,13 @@
<target dev='${disk_prefix}b' bus='${disk_bus}'/>
</disk>
#end if
#for $vol in $volumes
<disk type='block'>
<driver type='raw'/>
<source dev='${vol.device_path}'/>
<target dev='${vol.mount_device}' bus='${disk_bus}'/>
</disk>
#end for
#end if
#end if

View File

@ -40,6 +40,7 @@ import hashlib
import multiprocessing
import os
import random
import re
import shutil
import subprocess
import sys
@ -148,6 +149,10 @@ def _late_load_cheetah():
Template = t.Template
def _strip_dev(mount_path):
return re.sub(r'^/dev/', '', mount_path)
class LibvirtConnection(driver.ComputeDriver):
def __init__(self, read_only):
@ -575,11 +580,14 @@ class LibvirtConnection(driver.ComputeDriver):
# NOTE(ilyaalekseyev): Implementation like in multinics
# for xenapi(tr3buchet)
@exception.wrap_exception
def spawn(self, instance, network_info=None):
xml = self.to_xml(instance, False, network_info)
def spawn(self, instance, network_info=None, block_device_mapping=None):
xml = self.to_xml(instance, False, network_info=network_info,
block_device_mapping=block_device_mapping)
block_device_mapping = block_device_mapping or []
self.firewall_driver.setup_basic_filtering(instance, network_info)
self.firewall_driver.prepare_instance_filter(instance, network_info)
self._create_image(instance, xml, network_info=network_info)
self._create_image(instance, xml, network_info=network_info,
block_device_mapping=block_device_mapping)
domain = self._create_new_domain(xml)
LOG.debug(_("instance %s: is running"), instance['name'])
self.firewall_driver.apply_instance_filter(instance)
@ -761,7 +769,8 @@ class LibvirtConnection(driver.ComputeDriver):
# TODO(vish): should we format disk by default?
def _create_image(self, inst, libvirt_xml, suffix='', disk_images=None,
network_info=None):
network_info=None, block_device_mapping=None):
block_device_mapping = block_device_mapping or []
if not network_info:
network_info = netutils.get_network_info(inst)
@ -824,6 +833,8 @@ class LibvirtConnection(driver.ComputeDriver):
size = None
root_fname += "_sm"
if not self._volume_in_mapping(self.root_mount_device,
block_device_mapping):
self._cache_image(fn=self._fetch_image,
target=basepath('disk'),
fname=root_fname,
@ -833,7 +844,8 @@ class LibvirtConnection(driver.ComputeDriver):
project=project,
size=size)
if inst_type['local_gb']:
if inst_type['local_gb'] and not self._volume_in_mapping(
self.local_mount_device, block_device_mapping):
self._cache_image(fn=self._create_local,
target=basepath('disk.local'),
fname="local_%s" % inst_type['local_gb'],
@ -948,7 +960,20 @@ class LibvirtConnection(driver.ComputeDriver):
return result
def _prepare_xml_info(self, instance, rescue=False, network_info=None):
root_mount_device = 'vda' # FIXME for now. it's hard coded.
local_mount_device = 'vdb' # FIXME for now. it's hard coded.
def _volume_in_mapping(self, mount_device, block_device_mapping):
mount_device_ = _strip_dev(mount_device)
for vol in block_device_mapping:
vol_mount_device = _strip_dev(vol['mount_device'])
if vol_mount_device == mount_device_:
return True
return False
def _prepare_xml_info(self, instance, rescue=False, network_info=None,
block_device_mapping=None):
block_device_mapping = block_device_mapping or []
# TODO(adiantum) remove network_info creation code
# when multinics will be completed
if not network_info:
@ -966,6 +991,16 @@ class LibvirtConnection(driver.ComputeDriver):
else:
driver_type = 'raw'
for vol in block_device_mapping:
vol['mount_device'] = _strip_dev(vol['mount_device'])
ebs_root = self._volume_in_mapping(self.root_mount_device,
block_device_mapping)
if self._volume_in_mapping(self.local_mount_device,
block_device_mapping):
local_gb = False
else:
local_gb = inst_type['local_gb']
xml_info = {'type': FLAGS.libvirt_type,
'name': instance['name'],
'basepath': os.path.join(FLAGS.instances_path,
@ -973,9 +1008,11 @@ class LibvirtConnection(driver.ComputeDriver):
'memory_kb': inst_type['memory_mb'] * 1024,
'vcpus': inst_type['vcpus'],
'rescue': rescue,
'local': inst_type['local_gb'],
'local': local_gb,
'driver_type': driver_type,
'nics': nics}
'nics': nics,
'ebs_root': ebs_root,
'volumes': block_device_mapping}
if FLAGS.vnc_enabled:
if FLAGS.libvirt_type != 'lxc':
@ -991,10 +1028,13 @@ class LibvirtConnection(driver.ComputeDriver):
xml_info['disk'] = xml_info['basepath'] + "/disk"
return xml_info
def to_xml(self, instance, rescue=False, network_info=None):
def to_xml(self, instance, rescue=False, network_info=None,
block_device_mapping=None):
block_device_mapping = block_device_mapping or []
# TODO(termie): cache?
LOG.debug(_('instance %s: starting toXML method'), instance['name'])
xml_info = self._prepare_xml_info(instance, rescue, network_info)
xml_info = self._prepare_xml_info(instance, rescue, network_info,
block_device_mapping)
xml = str(Template(self.libvirt_xml, searchList=[xml_info]))
LOG.debug(_('instance %s: finished toXML method'), instance['name'])
return xml

View File

@ -124,7 +124,7 @@ class VMWareESXConnection(driver.ComputeDriver):
"""List VM instances."""
return self._vmops.list_instances()
def spawn(self, instance):
def spawn(self, instance, network_info=None, block_device_mapping=None):
"""Create VM instance."""
self._vmops.spawn(instance)

View File

@ -194,7 +194,7 @@ class XenAPIConnection(driver.ComputeDriver):
def list_instances_detail(self):
return self._vmops.list_instances_detail()
def spawn(self, instance):
def spawn(self, instance, network_info=None, block_device_mapping=None):
"""Create VM instance"""
self._vmops.spawn(instance)

View File

@ -21,6 +21,9 @@ Handles all requests relating to volumes.
"""
from eventlet import greenthread
from nova import db
from nova import exception
from nova import flags
from nova import log as logging
@ -44,6 +47,7 @@ class API(base.Base):
if snapshot['status'] != "available":
raise exception.ApiError(
_("Snapshot status must be available"))
if not size:
size = snapshot['volume_size']
if quota.allowed_volumes(context, 1, size) < 1:
@ -73,6 +77,14 @@ class API(base.Base):
"snapshot_id": snapshot_id}})
return volume
# TODO(yamahata): eliminate dumb polling
def wait_creation(self, context, volume_id):
while True:
volume = self.get(context, volume_id)
if volume['status'] != 'creating':
return
greenthread.sleep(1)
def delete(self, context, volume_id):
volume = self.get(context, volume_id)
if volume['status'] != "available":

View File

@ -582,6 +582,14 @@ class FakeISCSIDriver(ISCSIDriver):
"""No setup necessary in fake mode."""
pass
def discover_volume(self, context, volume):
"""Discover volume on a remote host."""
return "/dev/disk/by-path/volume-id-%d" % volume['id']
def undiscover_volume(self, volume):
"""Undiscover volume on a remote host."""
pass
@staticmethod
def fake_execute(cmd, *_args, **_kwargs):
"""Execute that simply logs the command."""