diff --git a/nova/api/ec2/apirequest.py b/nova/api/ec2/apirequest.py index 6672e60bbf3b..7d78c5cfada5 100644 --- a/nova/api/ec2/apirequest.py +++ b/nova/api/ec2/apirequest.py @@ -21,22 +21,15 @@ APIRequest class """ import datetime -import re # TODO(termie): replace minidom with etree from xml.dom import minidom from nova import log as logging +from nova.api.ec2 import ec2utils LOG = logging.getLogger("nova.api.request") -_c2u = re.compile('(((?<=[a-z])[A-Z])|([A-Z](?![A-Z]|$)))') - - -def _camelcase_to_underscore(str): - return _c2u.sub(r'_\1', str).lower().strip('_') - - def _underscore_to_camelcase(str): return ''.join([x[:1].upper() + x[1:] for x in str.split('_')]) @@ -51,59 +44,6 @@ def _database_to_isoformat(datetimeobj): return datetimeobj.strftime("%Y-%m-%dT%H:%M:%SZ") -def _try_convert(value): - """Return a non-string from a string or unicode, if possible. - - ============= ===================================================== - When value is returns - ============= ===================================================== - zero-length '' - 'None' None - 'True' True - 'False' False - '0', '-0' 0 - 0xN, -0xN int from hex (postitive) (N is any number) - 0bN, -0bN int from binary (positive) (N is any number) - * try conversion to int, float, complex, fallback value - - """ - if len(value) == 0: - return '' - if value == 'None': - return None - if value == 'True': - return True - if value == 'False': - return False - valueneg = value[1:] if value[0] == '-' else value - if valueneg == '0': - return 0 - if valueneg == '': - return value - if valueneg[0] == '0': - if valueneg[1] in 'xX': - return int(value, 16) - elif valueneg[1] in 'bB': - return int(value, 2) - else: - try: - return int(value, 8) - except ValueError: - pass - try: - return int(value) - except ValueError: - pass - try: - return float(value) - except ValueError: - pass - try: - return complex(value) - except ValueError: - return value - - class APIRequest(object): def __init__(self, controller, action, version, args): self.controller = controller @@ -114,7 +54,7 @@ class APIRequest(object): def invoke(self, context): try: method = getattr(self.controller, - _camelcase_to_underscore(self.action)) + ec2utils.camelcase_to_underscore(self.action)) except AttributeError: controller = self.controller action = self.action @@ -125,19 +65,7 @@ class APIRequest(object): # and reraise as 400 error. raise Exception(_error) - args = {} - for key, value in self.args.items(): - parts = key.split(".") - key = _camelcase_to_underscore(parts[0]) - if isinstance(value, str) or isinstance(value, unicode): - # NOTE(vish): Automatically convert strings back - # into their respective values - value = _try_convert(value) - if len(parts) > 1: - d = args.get(key, {}) - d[parts[1]] = value - value = d - args[key] = value + args = ec2utils.dict_from_dotted_str(self.args.items()) for key in args.keys(): # NOTE(vish): Turn numeric dict keys into lists diff --git a/nova/api/ec2/cloud.py b/nova/api/ec2/cloud.py index c8a7e69be5fc..97875f1f59d6 100644 --- a/nova/api/ec2/cloud.py +++ b/nova/api/ec2/cloud.py @@ -909,6 +909,25 @@ class CloudController(object): if kwargs.get('ramdisk_id'): ramdisk = self._get_image(context, kwargs['ramdisk_id']) kwargs['ramdisk_id'] = ramdisk['id'] + for bdm in kwargs.get('block_device_mapping', []): + # NOTE(yamahata) + # BlockDevicedMapping..DeviceName + # BlockDevicedMapping..Ebs.SnapshotId + # BlockDevicedMapping..Ebs.VolumeSize + # BlockDevicedMapping..Ebs.DeleteOnTermination + # BlockDevicedMapping..VirtualName + # => remove .Ebs and allow volume id in SnapshotId + ebs = bdm.pop('ebs', None) + if ebs: + ec2_id = ebs.pop('snapshot_id') + id = ec2utils.ec2_id_to_id(ec2_id) + if ec2_id.startswith('snap-'): + bdm['snapshot_id'] = id + elif ec2_id.startswith('vol-'): + bdm['volume_id'] = id + ebs.setdefault('delete_on_termination', True) + bdm.update(ebs) + image = self._get_image(context, kwargs['image_id']) if image: @@ -933,37 +952,54 @@ class CloudController(object): user_data=kwargs.get('user_data'), security_group=kwargs.get('security_group'), availability_zone=kwargs.get('placement', {}).get( - 'AvailabilityZone')) + 'AvailabilityZone'), + block_device_mapping=kwargs.get('block_device_mapping', {})) return self._format_run_instances(context, instances[0]['reservation_id']) + def _do_instance(self, action, context, ec2_id): + instance_id = ec2utils.ec2_id_to_id(ec2_id) + action(context, instance_id=instance_id) + + def _do_instances(self, action, context, instance_id): + for ec2_id in instance_id: + self._do_instance(action, context, ec2_id) + def terminate_instances(self, context, instance_id, **kwargs): """Terminate each instance in instance_id, which is a list of ec2 ids. instance_id is a kwarg so its name cannot be modified.""" LOG.debug(_("Going to start terminating instances")) - for ec2_id in instance_id: - instance_id = ec2utils.ec2_id_to_id(ec2_id) - self.compute_api.delete(context, instance_id=instance_id) + self._do_instances(self.compute_api.delete, context, instance_id) return True def reboot_instances(self, context, instance_id, **kwargs): """instance_id is a list of instance ids""" LOG.audit(_("Reboot instance %r"), instance_id, context=context) - for ec2_id in instance_id: - instance_id = ec2utils.ec2_id_to_id(ec2_id) - self.compute_api.reboot(context, instance_id=instance_id) + self._do_instances(self.compute_api.reboot, context, instance_id) + return True + + def stop_instances(self, context, instance_id, **kwargs): + """Stop each instances in instance_id. + Here instance_id is a list of instance ids""" + LOG.debug(_("Going to stop instances")) + self._do_instances(self.compute_api.stop, context, instance_id) + return True + + def start_instances(self, context, instance_id, **kwargs): + """Start each instances in instance_id. + Here instance_id is a list of instance ids""" + LOG.debug(_("Going to start instances")) + self._do_instances(self.compute_api.start, context, instance_id) return True def rescue_instance(self, context, instance_id, **kwargs): """This is an extension to the normal ec2_api""" - instance_id = ec2utils.ec2_id_to_id(instance_id) - self.compute_api.rescue(context, instance_id=instance_id) + self._do_instance(self.compute_api.rescue, contect, instnace_id) return True def unrescue_instance(self, context, instance_id, **kwargs): """This is an extension to the normal ec2_api""" - instance_id = ec2utils.ec2_id_to_id(instance_id) - self.compute_api.unrescue(context, instance_id=instance_id) + self._do_instance(self.compute_api.unrescue, context, instance_id) return True def update_instance(self, context, instance_id, **kwargs): diff --git a/nova/api/ec2/ec2utils.py b/nova/api/ec2/ec2utils.py index 163aa4ed2d64..222e1de1efba 100644 --- a/nova/api/ec2/ec2utils.py +++ b/nova/api/ec2/ec2utils.py @@ -16,6 +16,8 @@ # License for the specific language governing permissions and limitations # under the License. +import re + from nova import exception @@ -30,3 +32,95 @@ def ec2_id_to_id(ec2_id): def id_to_ec2_id(instance_id, template='i-%08x'): """Convert an instance ID (int) to an ec2 ID (i-[base 16 number])""" return template % instance_id + + +_c2u = re.compile('(((?<=[a-z])[A-Z])|([A-Z](?![A-Z]|$)))') + + +def camelcase_to_underscore(str): + return _c2u.sub(r'_\1', str).lower().strip('_') + + +def _try_convert(value): + """Return a non-string from a string or unicode, if possible. + + ============= ===================================================== + When value is returns + ============= ===================================================== + zero-length '' + 'None' None + 'True' True case insensitive + 'False' False case insensitive + '0', '-0' 0 + 0xN, -0xN int from hex (postitive) (N is any number) + 0bN, -0bN int from binary (positive) (N is any number) + * try conversion to int, float, complex, fallback value + + """ + if len(value) == 0: + return '' + if value == 'None': + return None + lowered_value = value.lower() + if lowered_value == 'true': + return True + if lowered_value == 'false': + return False + valueneg = value[1:] if value[0] == '-' else value + if valueneg == '0': + return 0 + if valueneg == '': + return value + if valueneg[0] == '0': + if valueneg[1] in 'xX': + return int(value, 16) + elif valueneg[1] in 'bB': + return int(value, 2) + else: + try: + return int(value, 8) + except ValueError: + pass + try: + return int(value) + except ValueError: + pass + try: + return float(value) + except ValueError: + pass + try: + return complex(value) + except ValueError: + return value + + +def dict_from_dotted_str(items): + """parse multi dot-separated argument into dict. + EBS boot uses multi dot-separeted arguments like + BlockDeviceMapping.1.DeviceName=snap-id + Convert the above into + {'block_device_mapping': {'1': {'device_name': snap-id}}} + """ + args = {} + for key, value in items: + parts = key.split(".") + key = camelcase_to_underscore(parts[0]) + if isinstance(value, str) or isinstance(value, unicode): + # NOTE(vish): Automatically convert strings back + # into their respective values + value = _try_convert(value) + + if len(parts) > 1: + d = args.get(key, {}) + args[key] = d + for k in parts[1:-1]: + k = camelcase_to_underscore(k) + v = d.get(k, {}) + d[k] = v + d = v + d[camelcase_to_underscore(parts[-1])] = value + else: + args[key] = value + + return args diff --git a/nova/compute/api.py b/nova/compute/api.py index f1c31a092621..b81aecb7004e 100644 --- a/nova/compute/api.py +++ b/nova/compute/api.py @@ -34,6 +34,7 @@ from nova import utils from nova import volume from nova.compute import instance_types from nova.compute import power_state +from nova.compute.utils import terminate_volumes from nova.scheduler import api as scheduler_api from nova.db import base @@ -52,6 +53,18 @@ def generate_default_hostname(instance_id): return str(instance_id) +def _is_able_to_shutdown(instance, instance_id): + states = {'terminating': "Instance %s is already being terminated", + 'migrating': "Instance %s is being migrated", + 'stopping': "Instance %s is being stopped"} + msg = states.get(instance['state_description']) + if msg: + LOG.warning(_(msg), instance_id) + return False + + return True + + class API(base.Base): """API for interacting with the compute manager.""" @@ -235,7 +248,7 @@ class API(base.Base): return (num_instances, base_options, security_groups) def create_db_entry_for_new_instance(self, context, base_options, - security_groups, num=1): + security_groups, block_device_mapping, num=1): """Create an entry in the DB for this new instance, including any related table updates (such as security groups, MAC address, etc). This will called by create() @@ -255,6 +268,23 @@ class API(base.Base): instance_id, security_group_id) + # NOTE(yamahata) + # tell vm driver to attach volume at boot time by updating + # BlockDeviceMapping + for bdm in block_device_mapping: + LOG.debug(_('bdm %s'), bdm) + assert 'device_name' in bdm + values = { + 'instance_id': instance_id, + 'device_name': bdm['device_name'], + 'delete_on_termination': bdm.get('delete_on_termination'), + 'virtual_name': bdm.get('virtual_name'), + 'snapshot_id': bdm.get('snapshot_id'), + 'volume_id': bdm.get('volume_id'), + 'volume_size': bdm.get('volume_size'), + 'no_device': bdm.get('no_device')} + self.db.block_device_mapping_create(elevated, values) + # Set sane defaults if not specified updates = dict(hostname=self.hostname_factory(instance_id)) if (not hasattr(instance, 'display_name') or @@ -339,7 +369,7 @@ class API(base.Base): key_name=None, key_data=None, security_group='default', availability_zone=None, user_data=None, metadata={}, injected_files=None, admin_password=None, zone_blob=None, - reservation_id=None): + reservation_id=None, block_device_mapping=None): """ Provision the instances by sending off a series of single instance requests to the Schedulers. This is fine for trival @@ -360,11 +390,13 @@ class API(base.Base): injected_files, admin_password, zone_blob, reservation_id) + block_device_mapping = block_device_mapping or [] instances = [] LOG.debug(_("Going to run %s instances..."), num_instances) for num in range(num_instances): instance = self.create_db_entry_for_new_instance(context, - base_options, security_groups, num=num) + base_options, security_groups, + block_device_mapping, num=num) instances.append(instance) instance_id = instance['id'] @@ -474,24 +506,22 @@ class API(base.Base): rv = self.db.instance_update(context, instance_id, kwargs) return dict(rv.iteritems()) + def _get_instance(self, context, instance_id, action_str): + try: + return self.get(context, instance_id) + except exception.NotFound: + LOG.warning(_("Instance %(instance_id)s was not found during " + "%(action_str)s") % + {'instance_id': instance_id, 'action_str': action_str}) + raise + @scheduler_api.reroute_compute("delete") def delete(self, context, instance_id): """Terminate an instance.""" LOG.debug(_("Going to try to terminate %s"), instance_id) - try: - instance = self.get(context, instance_id) - except exception.NotFound: - LOG.warning(_("Instance %s was not found during terminate"), - instance_id) - raise + instance = self._get_instance(context, instance_id, 'terminating') - if instance['state_description'] == 'terminating': - LOG.warning(_("Instance %s is already being terminated"), - instance_id) - return - - if instance['state_description'] == 'migrating': - LOG.warning(_("Instance %s is being migrated"), instance_id) + if not _is_able_to_shutdown(instance, instance_id): return self.update(context, @@ -505,8 +535,48 @@ class API(base.Base): self._cast_compute_message('terminate_instance', context, instance_id, host) else: + terminate_volumes(self.db, context, instance_id) self.db.instance_destroy(context, instance_id) + @scheduler_api.reroute_compute("stop") + def stop(self, context, instance_id): + """Stop an instance.""" + LOG.debug(_("Going to try to stop %s"), instance_id) + + instance = self._get_instance(context, instance_id, 'stopping') + if not _is_able_to_shutdown(instance, instance_id): + return + + self.update(context, + instance['id'], + state_description='stopping', + state=power_state.NOSTATE, + terminated_at=utils.utcnow()) + + host = instance['host'] + if host: + self._cast_compute_message('stop_instance', context, + instance_id, host) + + def start(self, context, instance_id): + """Start an instance.""" + LOG.debug(_("Going to try to start %s"), instance_id) + instance = self._get_instance(context, instance_id, 'starting') + if instance['state_description'] != 'stopped': + _state_description = instance['state_description'] + LOG.warning(_("Instance %(instance_id)s is not " + "stopped(%(_state_description)s)") % locals()) + return + + # TODO(yamahata): injected_files isn't supported right now. + # It is used only for osapi. not for ec2 api. + # availability_zone isn't used by run_instance. + rpc.cast(context, + FLAGS.scheduler_topic, + {"method": "start_instance", + "args": {"topic": FLAGS.compute_topic, + "instance_id": instance_id}}) + def get(self, context, instance_id): """Get a single instance with the given instance_id.""" rv = self.db.instance_get(context, instance_id) diff --git a/nova/compute/manager.py b/nova/compute/manager.py index 245958de79d9..8ab744855948 100644 --- a/nova/compute/manager.py +++ b/nova/compute/manager.py @@ -53,6 +53,7 @@ from nova import rpc from nova import utils from nova import volume from nova.compute import power_state +from nova.compute.utils import terminate_volumes from nova.virt import driver @@ -214,8 +215,63 @@ class ComputeManager(manager.SchedulerDependentManager): """ return self.driver.refresh_security_group_members(security_group_id) - @exception.wrap_exception - def run_instance(self, context, instance_id, **kwargs): + def _setup_block_device_mapping(self, context, instance_id): + """setup volumes for block device mapping""" + self.db.instance_set_state(context, + instance_id, + power_state.NOSTATE, + 'block_device_mapping') + + volume_api = volume.API() + block_device_mapping = [] + for bdm in self.db.block_device_mapping_get_all_by_instance( + context, instance_id): + LOG.debug(_("setting up bdm %s"), bdm) + if ((bdm['snapshot_id'] is not None) and + (bdm['volume_id'] is None)): + # TODO(yamahata): default name and description + vol = volume_api.create(context, bdm['volume_size'], + bdm['snapshot_id'], '', '') + # TODO(yamahata): creating volume simultaneously + # reduces creation time? + volume_api.wait_creation(context, vol['id']) + self.db.block_device_mapping_update( + context, bdm['id'], {'volume_id': vol['id']}) + bdm['volume_id'] = vol['id'] + + if not ((bdm['snapshot_id'] is None) or + (bdm['volume_id'] is not None)): + LOG.error(_('corrupted state of block device mapping ' + 'id: %(id)s ' + 'snapshot: %(snapshot_id) volume: %(vollume_id)') % + {'id': bdm['id'], + 'snapshot_id': bdm['snapshot'], + 'volume_id': bdm['volume_id']}) + raise exception.ApiError(_('broken block device mapping %d') % + bdm['id']) + + if bdm['volume_id'] is not None: + volume_api.check_attach(context, + volume_id=bdm['volume_id']) + dev_path = self._attach_volume_boot(context, instance_id, + bdm['volume_id'], + bdm['device_name']) + block_device_mapping.append({'device_path': dev_path, + 'mount_device': + bdm['device_name']}) + elif bdm['virtual_name'] is not None: + # TODO(yamahata): ephemeral/swap device support + LOG.debug(_('block_device_mapping: ' + 'ephemeral device is not supported yet')) + else: + # TODO(yamahata): NoDevice support + assert bdm['no_device'] + LOG.debug(_('block_device_mapping: ' + 'no device is not supported yet')) + + return block_device_mapping + + def _run_instance(self, context, instance_id, **kwargs): """Launch a new instance with specified options.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) @@ -249,11 +305,15 @@ class ComputeManager(manager.SchedulerDependentManager): self.network_manager.setup_compute_network(context, instance_id) + block_device_mapping = self._setup_block_device_mapping(context, + instance_id) + # TODO(vish) check to make sure the availability zone matches self._update_state(context, instance_id, power_state.BUILDING) try: - self.driver.spawn(instance_ref) + self.driver.spawn(instance_ref, + block_device_mapping=block_device_mapping) except Exception as ex: # pylint: disable=W0702 msg = _("Instance '%(instance_id)s' failed to spawn. Is " "virtualization enabled in the BIOS? Details: " @@ -276,13 +336,25 @@ class ComputeManager(manager.SchedulerDependentManager): self._update_launched_at(context, instance_id) self._update_state(context, instance_id) + @exception.wrap_exception + def run_instance(self, context, instance_id, **kwargs): + self._run_instance(context, instance_id, **kwargs) + @exception.wrap_exception @checks_instance_lock - def terminate_instance(self, context, instance_id): - """Terminate an instance on this host.""" + def start_instance(self, context, instance_id): + """Starting an instance on this host.""" + # TODO(yamahata): injected_files isn't supported. + # Anyway OSAPI doesn't support stop/start yet + self._run_instance(context, instance_id) + + def _shutdown_instance(self, context, instance_id, action_str): + """Shutdown an instance on this host.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) - LOG.audit(_("Terminating instance %s"), instance_id, context=context) + LOG.audit(_("%(action_str)s instance %(instance_id)s") % + {'action_str': action_str, 'instance_id': instance_id}, + context=context) fixed_ip = instance_ref.get('fixed_ip') if not FLAGS.stub_network and fixed_ip: @@ -318,16 +390,34 @@ class ComputeManager(manager.SchedulerDependentManager): volumes = instance_ref.get('volumes') or [] for volume in volumes: - self.detach_volume(context, instance_id, volume['id']) - if instance_ref['state'] == power_state.SHUTOFF: + self._detach_volume(context, instance_id, volume['id'], False) + + if (instance_ref['state'] == power_state.SHUTOFF and + instance_ref['state_description'] != 'stopped'): self.db.instance_destroy(context, instance_id) raise exception.Error(_('trying to destroy already destroyed' ' instance: %s') % instance_id) self.driver.destroy(instance_ref) + if action_str == 'Terminating': + terminate_volumes(self.db, context, instance_id) + + @exception.wrap_exception + @checks_instance_lock + def terminate_instance(self, context, instance_id): + """Terminate an instance on this host.""" + self._shutdown_instance(context, instance_id, 'Terminating') + # TODO(ja): should we keep it in a terminated state for a bit? self.db.instance_destroy(context, instance_id) + @exception.wrap_exception + @checks_instance_lock + def stop_instance(self, context, instance_id): + """Stopping an instance on this host.""" + self._shutdown_instance(context, instance_id, 'Stopping') + # instance state will be updated to stopped by _poll_instance_states() + @exception.wrap_exception @checks_instance_lock def rebuild_instance(self, context, instance_id, **kwargs): @@ -800,6 +890,22 @@ class ComputeManager(manager.SchedulerDependentManager): instance_ref = self.db.instance_get(context, instance_id) return self.driver.get_vnc_console(instance_ref) + def _attach_volume_boot(self, context, instance_id, volume_id, mountpoint): + """Attach a volume to an instance at boot time. So actual attach + is done by instance creation""" + + # TODO(yamahata): + # should move check_attach to volume manager? + volume.API().check_attach(context, volume_id) + + context = context.elevated() + LOG.audit(_("instance %(instance_id)s: booting with " + "volume %(volume_id)s at %(mountpoint)s") % + locals(), context=context) + dev_path = self.volume_manager.setup_compute_volume(context, volume_id) + self.db.volume_attached(context, volume_id, instance_id, mountpoint) + return dev_path + @checks_instance_lock def attach_volume(self, context, instance_id, volume_id, mountpoint): """Attach a volume to an instance.""" @@ -817,6 +923,16 @@ class ComputeManager(manager.SchedulerDependentManager): volume_id, instance_id, mountpoint) + values = { + 'instance_id': instance_id, + 'device_name': mountpoint, + 'delete_on_termination': False, + 'virtual_name': None, + 'snapshot_id': None, + 'volume_id': volume_id, + 'volume_size': None, + 'no_device': None} + self.db.block_device_mapping_create(context, values) except Exception as exc: # pylint: disable=W0702 # NOTE(vish): The inline callback eats the exception info so we # log the traceback here and reraise the same @@ -831,7 +947,7 @@ class ComputeManager(manager.SchedulerDependentManager): @exception.wrap_exception @checks_instance_lock - def detach_volume(self, context, instance_id, volume_id): + def _detach_volume(self, context, instance_id, volume_id, destroy_bdm): """Detach a volume from an instance.""" context = context.elevated() instance_ref = self.db.instance_get(context, instance_id) @@ -847,8 +963,15 @@ class ComputeManager(manager.SchedulerDependentManager): volume_ref['mountpoint']) self.volume_manager.remove_compute_volume(context, volume_id) self.db.volume_detached(context, volume_id) + if destroy_bdm: + self.db.block_device_mapping_destroy_by_instance_and_volume( + context, instance_id, volume_id) return True + def detach_volume(self, context, instance_id, volume_id): + """Detach a volume from an instance.""" + return self._detach_volume(context, instance_id, volume_id, True) + def remove_volume(self, context, volume_id): """Remove volume on compute host. @@ -1174,11 +1297,14 @@ class ComputeManager(manager.SchedulerDependentManager): "State=%(db_state)s, so setting state to " "shutoff.") % locals()) vm_state = power_state.SHUTOFF + if db_instance['state_description'] == 'stopping': + self.db.instance_stop(context, db_instance['id']) + continue else: vm_state = vm_instance.state vms_not_found_in_db.remove(name) - if db_instance['state_description'] == 'migrating': + if (db_instance['state_description'] in ['migrating', 'stopping']): # A situation which db record exists, but no instance" # sometimes occurs while live-migration at src compute, # this case should be ignored. diff --git a/nova/compute/utils.py b/nova/compute/utils.py new file mode 100644 index 000000000000..c8cb9bab8dbf --- /dev/null +++ b/nova/compute/utils.py @@ -0,0 +1,29 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright (c) 2011 VA Linux Systems Japan K.K +# Copyright (c) 2011 Isaku Yamahata +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from nova import volume + + +def terminate_volumes(db, context, instance_id): + """delete volumes of delete_on_termination=True in block device mapping""" + volume_api = volume.API() + for bdm in db.block_device_mapping_get_all_by_instance(context, + instance_id): + #LOG.debug(_("terminating bdm %s") % bdm) + if bdm['volume_id'] and bdm['delete_on_termination']: + volume_api.delete(context, bdm['volume_id']) + db.block_device_mapping_destroy(context, bdm['id']) diff --git a/nova/db/api.py b/nova/db/api.py index 4e0aa60a2924..117c235eacae 100644 --- a/nova/db/api.py +++ b/nova/db/api.py @@ -414,6 +414,11 @@ def instance_destroy(context, instance_id): return IMPL.instance_destroy(context, instance_id) +def instance_stop(context, instance_id): + """Stop the instance or raise if it does not exist.""" + return IMPL.instance_stop(context, instance_id) + + def instance_get(context, instance_id): """Get an instance or raise if it does not exist.""" return IMPL.instance_get(context, instance_id) @@ -920,6 +925,36 @@ def snapshot_update(context, snapshot_id, values): #################### +def block_device_mapping_create(context, values): + """Create an entry of block device mapping""" + return IMPL.block_device_mapping_create(context, values) + + +def block_device_mapping_update(context, bdm_id, values): + """Create an entry of block device mapping""" + return IMPL.block_device_mapping_update(context, bdm_id, values) + + +def block_device_mapping_get_all_by_instance(context, instance_id): + """Get all block device mapping belonging to a instance""" + return IMPL.block_device_mapping_get_all_by_instance(context, instance_id) + + +def block_device_mapping_destroy(context, bdm_id): + """Destroy the block device mapping.""" + return IMPL.block_device_mapping_destroy(context, bdm_id) + + +def block_device_mapping_destroy_by_instance_and_volume(context, instance_id, + volume_id): + """Destroy the block device mapping or raise if it does not exist.""" + return IMPL.block_device_mapping_destroy_by_instance_and_volume( + context, instance_id, volume_id) + + +#################### + + def security_group_get_all(context): """Get all security groups.""" return IMPL.security_group_get_all(context) diff --git a/nova/db/sqlalchemy/api.py b/nova/db/sqlalchemy/api.py index 7119f43ebb0b..88d8cbd631e4 100644 --- a/nova/db/sqlalchemy/api.py +++ b/nova/db/sqlalchemy/api.py @@ -839,6 +839,25 @@ def instance_destroy(context, instance_id): 'updated_at': literal_column('updated_at')}) +@require_context +def instance_stop(context, instance_id): + session = get_session() + with session.begin(): + from nova.compute import power_state + session.query(models.Instance).\ + filter_by(id=instance_id).\ + update({'host': None, + 'state': power_state.SHUTOFF, + 'state_description': 'stopped', + 'updated_at': literal_column('updated_at')}) + session.query(models.SecurityGroupInstanceAssociation).\ + filter_by(instance_id=instance_id).\ + update({'updated_at': literal_column('updated_at')}) + session.query(models.InstanceMetadata).\ + filter_by(instance_id=instance_id).\ + update({'updated_at': literal_column('updated_at')}) + + @require_context def instance_get(context, instance_id, session=None): if not session: @@ -1882,6 +1901,66 @@ def snapshot_update(context, snapshot_id, values): ################### +@require_context +def block_device_mapping_create(context, values): + bdm_ref = models.BlockDeviceMapping() + bdm_ref.update(values) + + session = get_session() + with session.begin(): + bdm_ref.save(session=session) + + +@require_context +def block_device_mapping_update(context, bdm_id, values): + session = get_session() + with session.begin(): + session.query(models.BlockDeviceMapping).\ + filter_by(id=bdm_id).\ + filter_by(deleted=False).\ + update(values) + + +@require_context +def block_device_mapping_get_all_by_instance(context, instance_id): + session = get_session() + result = session.query(models.BlockDeviceMapping).\ + filter_by(instance_id=instance_id).\ + filter_by(deleted=False).\ + all() + if not result: + return [] + return result + + +@require_context +def block_device_mapping_destroy(context, bdm_id): + session = get_session() + with session.begin(): + session.query(models.BlockDeviceMapping).\ + filter_by(id=bdm_id).\ + update({'deleted': True, + 'deleted_at': utils.utcnow(), + 'updated_at': literal_column('updated_at')}) + + +@require_context +def block_device_mapping_destroy_by_instance_and_volume(context, instance_id, + volume_id): + session = get_session() + with session.begin(): + session.query(models.BlockDeviceMapping).\ + filter_by(instance_id=instance_id).\ + filter_by(volume_id=volume_id).\ + filter_by(deleted=False).\ + update({'deleted': True, + 'deleted_at': utils.utcnow(), + 'updated_at': literal_column('updated_at')}) + + +################### + + @require_context def security_group_get_all(context): session = get_session() diff --git a/nova/db/sqlalchemy/migrate_repo/versions/024_add_block_device_mapping.py b/nova/db/sqlalchemy/migrate_repo/versions/024_add_block_device_mapping.py new file mode 100644 index 000000000000..6e9b806cbdd6 --- /dev/null +++ b/nova/db/sqlalchemy/migrate_repo/versions/024_add_block_device_mapping.py @@ -0,0 +1,87 @@ +# Copyright 2011 OpenStack LLC. +# Copyright 2011 Isaku Yamahata +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from sqlalchemy import MetaData, Table, Column +from sqlalchemy import DateTime, Boolean, Integer, String +from sqlalchemy import ForeignKey +from nova import log as logging + +meta = MetaData() + +# Just for the ForeignKey and column creation to succeed, these are not the +# actual definitions of instances or services. +instances = Table('instances', meta, + Column('id', Integer(), primary_key=True, nullable=False), + ) + +volumes = Table('volumes', meta, + Column('id', Integer(), primary_key=True, nullable=False), + ) + +snapshots = Table('snapshots', meta, + Column('id', Integer(), primary_key=True, nullable=False), + ) + + +block_device_mapping = Table('block_device_mapping', meta, + Column('created_at', DateTime(timezone=False)), + Column('updated_at', DateTime(timezone=False)), + Column('deleted_at', DateTime(timezone=False)), + Column('deleted', Boolean(create_constraint=True, name=None)), + Column('id', Integer(), primary_key=True, autoincrement=True), + Column('instance_id', + Integer(), + ForeignKey('instances.id'), + nullable=False), + Column('device_name', + String(length=255, convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False), + nullable=False), + Column('delete_on_termination', + Boolean(create_constraint=True, name=None), + default=False), + Column('virtual_name', + String(length=255, convert_unicode=False, assert_unicode=None, + unicode_error=None, _warn_on_bytestring=False), + nullable=True), + Column('snapshot_id', + Integer(), + ForeignKey('snapshots.id'), + nullable=True), + Column('volume_id', Integer(), ForeignKey('volumes.id'), + nullable=True), + Column('volume_size', Integer(), nullable=True), + Column('no_device', + Boolean(create_constraint=True, name=None), + nullable=True), + ) + + +def upgrade(migrate_engine): + # Upgrade operations go here. Don't create your own engine; + # bind migrate_engine to your metadata + meta.bind = migrate_engine + try: + block_device_mapping.create() + except Exception: + logging.info(repr(block_device_mapping)) + logging.exception('Exception while creating table') + meta.drop_all(tables=[block_device_mapping]) + raise + + +def downgrade(migrate_engine): + # Operations to reverse the above upgrade go here. + block_device_mapping.drop() diff --git a/nova/db/sqlalchemy/models.py b/nova/db/sqlalchemy/models.py index 612ccc93f824..23e1bd11234d 100644 --- a/nova/db/sqlalchemy/models.py +++ b/nova/db/sqlalchemy/models.py @@ -357,6 +357,45 @@ class Snapshot(BASE, NovaBase): display_description = Column(String(255)) +class BlockDeviceMapping(BASE, NovaBase): + """Represents block device mapping that is defined by EC2""" + __tablename__ = "block_device_mapping" + id = Column(Integer, primary_key=True, autoincrement=True) + + instance_id = Column(Integer, ForeignKey('instances.id'), nullable=False) + instance = relationship(Instance, + backref=backref('balock_device_mapping'), + foreign_keys=instance_id, + primaryjoin='and_(BlockDeviceMapping.instance_id==' + 'Instance.id,' + 'BlockDeviceMapping.deleted==' + 'False)') + device_name = Column(String(255), nullable=False) + + # default=False for compatibility of the existing code. + # With EC2 API, + # default True for ami specified device. + # default False for created with other timing. + delete_on_termination = Column(Boolean, default=False) + + # for ephemeral device + virtual_name = Column(String(255), nullable=True) + + # for snapshot or volume + snapshot_id = Column(Integer, ForeignKey('snapshots.id'), nullable=True) + # outer join + snapshot = relationship(Snapshot, + foreign_keys=snapshot_id) + + volume_id = Column(Integer, ForeignKey('volumes.id'), nullable=True) + volume = relationship(Volume, + foreign_keys=volume_id) + volume_size = Column(Integer, nullable=True) + + # for no device to suppress devices. + no_device = Column(Boolean, nullable=True) + + class ExportDevice(BASE, NovaBase): """Represates a shelf and blade that a volume can be exported on.""" __tablename__ = 'export_devices' diff --git a/nova/scheduler/simple.py b/nova/scheduler/simple.py index 87cdef11d3a7..fc1b3142a958 100644 --- a/nova/scheduler/simple.py +++ b/nova/scheduler/simple.py @@ -39,7 +39,7 @@ flags.DEFINE_integer("max_networks", 1000, class SimpleScheduler(chance.ChanceScheduler): """Implements Naive Scheduler that tries to find least loaded host.""" - def schedule_run_instance(self, context, instance_id, *_args, **_kwargs): + def _schedule_instance(self, context, instance_id, *_args, **_kwargs): """Picks a host that is up and has the fewest running instances.""" instance_ref = db.instance_get(context, instance_id) if (instance_ref['availability_zone'] @@ -75,6 +75,12 @@ class SimpleScheduler(chance.ChanceScheduler): " for this request. Is the appropriate" " service running?")) + def schedule_run_instance(self, context, instance_id, *_args, **_kwargs): + return self._schedule_instance(context, instance_id, *_args, **_kwargs) + + def schedule_start_instance(self, context, instance_id, *_args, **_kwargs): + return self._schedule_instance(context, instance_id, *_args, **_kwargs) + def schedule_create_volume(self, context, volume_id, *_args, **_kwargs): """Picks a host that is up and has the fewest volumes.""" volume_ref = db.volume_get(context, volume_id) diff --git a/nova/tests/test_api.py b/nova/tests/test_api.py index 7c0331eff46f..20b20fcbf48d 100644 --- a/nova/tests/test_api.py +++ b/nova/tests/test_api.py @@ -89,7 +89,7 @@ class FakeHttplibConnection(object): class XmlConversionTestCase(test.TestCase): """Unit test api xml conversion""" def test_number_conversion(self): - conv = apirequest._try_convert + conv = ec2utils._try_convert self.assertEqual(conv('None'), None) self.assertEqual(conv('True'), True) self.assertEqual(conv('False'), False) diff --git a/nova/tests/test_cloud.py b/nova/tests/test_cloud.py index cbb6233acfe8..6327734f5d47 100644 --- a/nova/tests/test_cloud.py +++ b/nova/tests/test_cloud.py @@ -56,6 +56,7 @@ class CloudTestCase(test.TestCase): self.compute = self.start_service('compute') self.scheduter = self.start_service('scheduler') self.network = self.start_service('network') + self.volume = self.start_service('volume') self.image_service = utils.import_object(FLAGS.image_service) self.manager = manager.AuthManager() @@ -373,14 +374,21 @@ class CloudTestCase(test.TestCase): self.assertRaises(exception.ImageNotFound, deregister_image, self.context, 'ami-bad001') - def test_console_output(self): - instance_type = FLAGS.default_instance_type - max_count = 1 - kwargs = {'image_id': 'ami-1', - 'instance_type': instance_type, - 'max_count': max_count} + def _run_instance(self, **kwargs): rv = self.cloud.run_instances(self.context, **kwargs) instance_id = rv['instancesSet'][0]['instanceId'] + return instance_id + + def _run_instance_wait(self, **kwargs): + ec2_instance_id = self._run_instance(**kwargs) + self._wait_for_running(ec2_instance_id) + return ec2_instance_id + + def test_console_output(self): + instance_id = self._run_instance( + image_id='ami-1', + instance_type=FLAGS.default_instance_type, + max_count=1) output = self.cloud.get_console_output(context=self.context, instance_id=[instance_id]) self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE?OUTPUT') @@ -389,9 +397,7 @@ class CloudTestCase(test.TestCase): rv = self.cloud.terminate_instances(self.context, [instance_id]) def test_ajax_console(self): - kwargs = {'image_id': 'ami-1'} - rv = self.cloud.run_instances(self.context, **kwargs) - instance_id = rv['instancesSet'][0]['instanceId'] + instance_id = self._run_instance(image_id='ami-1') output = self.cloud.get_ajax_console(context=self.context, instance_id=[instance_id]) self.assertEquals(output['url'], @@ -569,3 +575,299 @@ class CloudTestCase(test.TestCase): vol = db.volume_get(self.context, vol['id']) self.assertEqual(None, vol['mountpoint']) db.volume_destroy(self.context, vol['id']) + + def _restart_compute_service(self, periodic_interval=None): + """restart compute service. NOTE: fake driver forgets all instances.""" + self.compute.kill() + if periodic_interval: + self.compute = self.start_service( + 'compute', periodic_interval=periodic_interval) + else: + self.compute = self.start_service('compute') + + def _wait_for_state(self, ctxt, instance_id, predicate): + """Wait for an stopping instance to be a given state""" + id = ec2utils.ec2_id_to_id(instance_id) + while True: + info = self.cloud.compute_api.get(context=ctxt, instance_id=id) + LOG.debug(info) + if predicate(info): + break + greenthread.sleep(1) + + def _wait_for_running(self, instance_id): + def is_running(info): + return info['state_description'] == 'running' + self._wait_for_state(self.context, instance_id, is_running) + + def _wait_for_stopped(self, instance_id): + def is_stopped(info): + return info['state_description'] == 'stopped' + self._wait_for_state(self.context, instance_id, is_stopped) + + def _wait_for_terminate(self, instance_id): + def is_deleted(info): + return info['deleted'] + elevated = self.context.elevated(read_deleted=True) + self._wait_for_state(elevated, instance_id, is_deleted) + + def test_stop_start_instance(self): + """Makes sure stop/start instance works""" + # enforce periodic tasks run in short time to avoid wait for 60s. + self._restart_compute_service(periodic_interval=0.3) + + kwargs = {'image_id': 'ami-1', + 'instance_type': FLAGS.default_instance_type, + 'max_count': 1, } + instance_id = self._run_instance_wait(**kwargs) + + # a running instance can't be started. It is just ignored. + result = self.cloud.start_instances(self.context, [instance_id]) + greenthread.sleep(0.3) + self.assertTrue(result) + + result = self.cloud.stop_instances(self.context, [instance_id]) + greenthread.sleep(0.3) + self.assertTrue(result) + self._wait_for_stopped(instance_id) + + result = self.cloud.start_instances(self.context, [instance_id]) + greenthread.sleep(0.3) + self.assertTrue(result) + self._wait_for_running(instance_id) + + result = self.cloud.stop_instances(self.context, [instance_id]) + greenthread.sleep(0.3) + self.assertTrue(result) + self._wait_for_stopped(instance_id) + + result = self.cloud.terminate_instances(self.context, [instance_id]) + greenthread.sleep(0.3) + self.assertTrue(result) + + self._restart_compute_service() + + def _volume_create(self): + kwargs = {'status': 'available', + 'host': self.volume.host, + 'size': 1, + 'attach_status': 'detached', } + return db.volume_create(self.context, kwargs) + + def _assert_volume_attached(self, vol, instance_id, mountpoint): + self.assertEqual(vol['instance_id'], instance_id) + self.assertEqual(vol['mountpoint'], mountpoint) + self.assertEqual(vol['status'], "in-use") + self.assertEqual(vol['attach_status'], "attached") + + def _assert_volume_detached(self, vol): + self.assertEqual(vol['instance_id'], None) + self.assertEqual(vol['mountpoint'], None) + self.assertEqual(vol['status'], "available") + self.assertEqual(vol['attach_status'], "detached") + + def test_stop_start_with_volume(self): + """Make sure run instance with block device mapping works""" + + # enforce periodic tasks run in short time to avoid wait for 60s. + self._restart_compute_service(periodic_interval=0.3) + + vol1 = self._volume_create() + vol2 = self._volume_create() + kwargs = {'image_id': 'ami-1', + 'instance_type': FLAGS.default_instance_type, + 'max_count': 1, + 'block_device_mapping': [{'device_name': '/dev/vdb', + 'volume_id': vol1['id'], + 'delete_on_termination': False, }, + {'device_name': '/dev/vdc', + 'volume_id': vol2['id'], + 'delete_on_termination': True, }, + ]} + ec2_instance_id = self._run_instance_wait(**kwargs) + instance_id = ec2utils.ec2_id_to_id(ec2_instance_id) + + vols = db.volume_get_all_by_instance(self.context, instance_id) + self.assertEqual(len(vols), 2) + for vol in vols: + self.assertTrue(vol['id'] == vol1['id'] or vol['id'] == vol2['id']) + + vol = db.volume_get(self.context, vol1['id']) + self._assert_volume_attached(vol, instance_id, '/dev/vdb') + + vol = db.volume_get(self.context, vol2['id']) + self._assert_volume_attached(vol, instance_id, '/dev/vdc') + + result = self.cloud.stop_instances(self.context, [ec2_instance_id]) + self.assertTrue(result) + self._wait_for_stopped(ec2_instance_id) + + vol = db.volume_get(self.context, vol1['id']) + self._assert_volume_detached(vol) + vol = db.volume_get(self.context, vol2['id']) + self._assert_volume_detached(vol) + + self.cloud.start_instances(self.context, [ec2_instance_id]) + self._wait_for_running(ec2_instance_id) + vols = db.volume_get_all_by_instance(self.context, instance_id) + self.assertEqual(len(vols), 2) + for vol in vols: + self.assertTrue(vol['id'] == vol1['id'] or vol['id'] == vol2['id']) + self.assertTrue(vol['mountpoint'] == '/dev/vdb' or + vol['mountpoint'] == '/dev/vdc') + self.assertEqual(vol['instance_id'], instance_id) + self.assertEqual(vol['status'], "in-use") + self.assertEqual(vol['attach_status'], "attached") + + self.cloud.terminate_instances(self.context, [ec2_instance_id]) + greenthread.sleep(0.3) + + admin_ctxt = context.get_admin_context(read_deleted=False) + vol = db.volume_get(admin_ctxt, vol1['id']) + self.assertFalse(vol['deleted']) + db.volume_destroy(self.context, vol1['id']) + + greenthread.sleep(0.3) + admin_ctxt = context.get_admin_context(read_deleted=True) + vol = db.volume_get(admin_ctxt, vol2['id']) + self.assertTrue(vol['deleted']) + + self._restart_compute_service() + + def test_stop_with_attached_volume(self): + """Make sure attach info is reflected to block device mapping""" + # enforce periodic tasks run in short time to avoid wait for 60s. + self._restart_compute_service(periodic_interval=0.3) + + vol1 = self._volume_create() + vol2 = self._volume_create() + kwargs = {'image_id': 'ami-1', + 'instance_type': FLAGS.default_instance_type, + 'max_count': 1, + 'block_device_mapping': [{'device_name': '/dev/vdb', + 'volume_id': vol1['id'], + 'delete_on_termination': True}]} + ec2_instance_id = self._run_instance_wait(**kwargs) + instance_id = ec2utils.ec2_id_to_id(ec2_instance_id) + + vols = db.volume_get_all_by_instance(self.context, instance_id) + self.assertEqual(len(vols), 1) + for vol in vols: + self.assertEqual(vol['id'], vol1['id']) + self._assert_volume_attached(vol, instance_id, '/dev/vdb') + + vol = db.volume_get(self.context, vol2['id']) + self._assert_volume_detached(vol) + + self.cloud.compute_api.attach_volume(self.context, + instance_id=instance_id, + volume_id=vol2['id'], + device='/dev/vdc') + greenthread.sleep(0.3) + vol = db.volume_get(self.context, vol2['id']) + self._assert_volume_attached(vol, instance_id, '/dev/vdc') + + self.cloud.compute_api.detach_volume(self.context, + volume_id=vol1['id']) + greenthread.sleep(0.3) + vol = db.volume_get(self.context, vol1['id']) + self._assert_volume_detached(vol) + + result = self.cloud.stop_instances(self.context, [ec2_instance_id]) + self.assertTrue(result) + self._wait_for_stopped(ec2_instance_id) + + for vol_id in (vol1['id'], vol2['id']): + vol = db.volume_get(self.context, vol_id) + self._assert_volume_detached(vol) + + self.cloud.start_instances(self.context, [ec2_instance_id]) + self._wait_for_running(ec2_instance_id) + vols = db.volume_get_all_by_instance(self.context, instance_id) + self.assertEqual(len(vols), 1) + for vol in vols: + self.assertEqual(vol['id'], vol2['id']) + self._assert_volume_attached(vol, instance_id, '/dev/vdc') + + vol = db.volume_get(self.context, vol1['id']) + self._assert_volume_detached(vol) + + self.cloud.terminate_instances(self.context, [ec2_instance_id]) + greenthread.sleep(0.3) + + for vol_id in (vol1['id'], vol2['id']): + vol = db.volume_get(self.context, vol_id) + self.assertEqual(vol['id'], vol_id) + self._assert_volume_detached(vol) + db.volume_destroy(self.context, vol_id) + + self._restart_compute_service() + + def _create_snapshot(self, ec2_volume_id): + result = self.cloud.create_snapshot(self.context, + volume_id=ec2_volume_id) + greenthread.sleep(0.3) + return result['snapshotId'] + + def test_run_with_snapshot(self): + """Makes sure run/stop/start instance with snapshot works.""" + vol = self._volume_create() + ec2_volume_id = ec2utils.id_to_ec2_id(vol['id'], 'vol-%08x') + + ec2_snapshot1_id = self._create_snapshot(ec2_volume_id) + snapshot1_id = ec2utils.ec2_id_to_id(ec2_snapshot1_id) + ec2_snapshot2_id = self._create_snapshot(ec2_volume_id) + snapshot2_id = ec2utils.ec2_id_to_id(ec2_snapshot2_id) + + kwargs = {'image_id': 'ami-1', + 'instance_type': FLAGS.default_instance_type, + 'max_count': 1, + 'block_device_mapping': [{'device_name': '/dev/vdb', + 'snapshot_id': snapshot1_id, + 'delete_on_termination': False, }, + {'device_name': '/dev/vdc', + 'snapshot_id': snapshot2_id, + 'delete_on_termination': True}]} + ec2_instance_id = self._run_instance_wait(**kwargs) + instance_id = ec2utils.ec2_id_to_id(ec2_instance_id) + + vols = db.volume_get_all_by_instance(self.context, instance_id) + self.assertEqual(len(vols), 2) + vol1_id = None + vol2_id = None + for vol in vols: + snapshot_id = vol['snapshot_id'] + if snapshot_id == snapshot1_id: + vol1_id = vol['id'] + mountpoint = '/dev/vdb' + elif snapshot_id == snapshot2_id: + vol2_id = vol['id'] + mountpoint = '/dev/vdc' + else: + self.fail() + + self._assert_volume_attached(vol, instance_id, mountpoint) + + self.assertTrue(vol1_id) + self.assertTrue(vol2_id) + + self.cloud.terminate_instances(self.context, [ec2_instance_id]) + greenthread.sleep(0.3) + self._wait_for_terminate(ec2_instance_id) + + greenthread.sleep(0.3) + admin_ctxt = context.get_admin_context(read_deleted=False) + vol = db.volume_get(admin_ctxt, vol1_id) + self._assert_volume_detached(vol) + self.assertFalse(vol['deleted']) + db.volume_destroy(self.context, vol1_id) + + greenthread.sleep(0.3) + admin_ctxt = context.get_admin_context(read_deleted=True) + vol = db.volume_get(admin_ctxt, vol2_id) + self.assertTrue(vol['deleted']) + + for snapshot_id in (ec2_snapshot1_id, ec2_snapshot2_id): + self.cloud.delete_snapshot(self.context, snapshot_id) + greenthread.sleep(0.3) + db.volume_destroy(self.context, vol['id']) diff --git a/nova/tests/test_compute.py b/nova/tests/test_compute.py index 2a68df2fc3d5..2fa4c7278289 100644 --- a/nova/tests/test_compute.py +++ b/nova/tests/test_compute.py @@ -228,6 +228,21 @@ class ComputeTestCase(test.TestCase): self.assert_(instance_ref['launched_at'] < terminate) self.assert_(instance_ref['deleted_at'] > terminate) + def test_stop(self): + """Ensure instance can be stopped""" + instance_id = self._create_instance() + self.compute.run_instance(self.context, instance_id) + self.compute.stop_instance(self.context, instance_id) + self.compute.terminate_instance(self.context, instance_id) + + def test_start(self): + """Ensure instance can be started""" + instance_id = self._create_instance() + self.compute.run_instance(self.context, instance_id) + self.compute.stop_instance(self.context, instance_id) + self.compute.start_instance(self.context, instance_id) + self.compute.terminate_instance(self.context, instance_id) + def test_pause(self): """Ensure instance can be paused""" instance_id = self._create_instance() diff --git a/nova/virt/driver.py b/nova/virt/driver.py index eb9626d0879e..6341e81d2067 100644 --- a/nova/virt/driver.py +++ b/nova/virt/driver.py @@ -61,7 +61,7 @@ class ComputeDriver(object): """Return a list of InstanceInfo for all registered VMs""" raise NotImplementedError() - def spawn(self, instance, network_info=None): + def spawn(self, instance, network_info=None, block_device_mapping=None): """Launch a VM for the specified instance""" raise NotImplementedError() diff --git a/nova/virt/fake.py b/nova/virt/fake.py index 0225797d7dac..3a65fec8b22f 100644 --- a/nova/virt/fake.py +++ b/nova/virt/fake.py @@ -129,7 +129,7 @@ class FakeConnection(driver.ComputeDriver): info_list.append(self._map_to_instance_info(instance)) return info_list - def spawn(self, instance): + def spawn(self, instance, network_info=None, block_device_mapping=None): """ Create a new instance/VM/domain on the virtualization platform. @@ -237,6 +237,10 @@ class FakeConnection(driver.ComputeDriver): """ pass + def poll_rescued_instances(self, timeout): + """Poll for rescued instances""" + pass + def migrate_disk_and_power_off(self, instance, dest): """ Transfers the disk of a running instance in multiple phases, turning diff --git a/nova/virt/hyperv.py b/nova/virt/hyperv.py index 05b4775c124f..772e7eb59d72 100644 --- a/nova/virt/hyperv.py +++ b/nova/virt/hyperv.py @@ -139,7 +139,7 @@ class HyperVConnection(driver.ComputeDriver): return instance_infos - def spawn(self, instance): + def spawn(self, instance, network_info=None, block_device_mapping=None): """ Create a new VM and start it.""" vm = self._lookup(instance.name) if vm is not None: diff --git a/nova/virt/libvirt.xml.template b/nova/virt/libvirt.xml.template index 20986d4d5894..e1a683da8804 100644 --- a/nova/virt/libvirt.xml.template +++ b/nova/virt/libvirt.xml.template @@ -67,11 +67,13 @@ #else + #if not ($getVar('ebs_root', False)) + #end if #if $getVar('local', False) @@ -79,6 +81,13 @@ #end if + #for $vol in $volumes + + + + + + #end for #end if #end if diff --git a/nova/virt/libvirt/connection.py b/nova/virt/libvirt/connection.py index 98cdff3114d1..96ef92825785 100644 --- a/nova/virt/libvirt/connection.py +++ b/nova/virt/libvirt/connection.py @@ -40,6 +40,7 @@ import hashlib import multiprocessing import os import random +import re import shutil import subprocess import sys @@ -148,6 +149,10 @@ def _late_load_cheetah(): Template = t.Template +def _strip_dev(mount_path): + return re.sub(r'^/dev/', '', mount_path) + + class LibvirtConnection(driver.ComputeDriver): def __init__(self, read_only): @@ -575,11 +580,14 @@ class LibvirtConnection(driver.ComputeDriver): # NOTE(ilyaalekseyev): Implementation like in multinics # for xenapi(tr3buchet) @exception.wrap_exception - def spawn(self, instance, network_info=None): - xml = self.to_xml(instance, False, network_info) + def spawn(self, instance, network_info=None, block_device_mapping=None): + xml = self.to_xml(instance, False, network_info=network_info, + block_device_mapping=block_device_mapping) + block_device_mapping = block_device_mapping or [] self.firewall_driver.setup_basic_filtering(instance, network_info) self.firewall_driver.prepare_instance_filter(instance, network_info) - self._create_image(instance, xml, network_info=network_info) + self._create_image(instance, xml, network_info=network_info, + block_device_mapping=block_device_mapping) domain = self._create_new_domain(xml) LOG.debug(_("instance %s: is running"), instance['name']) self.firewall_driver.apply_instance_filter(instance) @@ -761,7 +769,8 @@ class LibvirtConnection(driver.ComputeDriver): # TODO(vish): should we format disk by default? def _create_image(self, inst, libvirt_xml, suffix='', disk_images=None, - network_info=None): + network_info=None, block_device_mapping=None): + block_device_mapping = block_device_mapping or [] if not network_info: network_info = netutils.get_network_info(inst) @@ -824,16 +833,19 @@ class LibvirtConnection(driver.ComputeDriver): size = None root_fname += "_sm" - self._cache_image(fn=self._fetch_image, - target=basepath('disk'), - fname=root_fname, - cow=FLAGS.use_cow_images, - image_id=disk_images['image_id'], - user=user, - project=project, - size=size) + if not self._volume_in_mapping(self.root_mount_device, + block_device_mapping): + self._cache_image(fn=self._fetch_image, + target=basepath('disk'), + fname=root_fname, + cow=FLAGS.use_cow_images, + image_id=disk_images['image_id'], + user=user, + project=project, + size=size) - if inst_type['local_gb']: + if inst_type['local_gb'] and not self._volume_in_mapping( + self.local_mount_device, block_device_mapping): self._cache_image(fn=self._create_local, target=basepath('disk.local'), fname="local_%s" % inst_type['local_gb'], @@ -948,7 +960,20 @@ class LibvirtConnection(driver.ComputeDriver): return result - def _prepare_xml_info(self, instance, rescue=False, network_info=None): + root_mount_device = 'vda' # FIXME for now. it's hard coded. + local_mount_device = 'vdb' # FIXME for now. it's hard coded. + + def _volume_in_mapping(self, mount_device, block_device_mapping): + mount_device_ = _strip_dev(mount_device) + for vol in block_device_mapping: + vol_mount_device = _strip_dev(vol['mount_device']) + if vol_mount_device == mount_device_: + return True + return False + + def _prepare_xml_info(self, instance, rescue=False, network_info=None, + block_device_mapping=None): + block_device_mapping = block_device_mapping or [] # TODO(adiantum) remove network_info creation code # when multinics will be completed if not network_info: @@ -966,6 +991,16 @@ class LibvirtConnection(driver.ComputeDriver): else: driver_type = 'raw' + for vol in block_device_mapping: + vol['mount_device'] = _strip_dev(vol['mount_device']) + ebs_root = self._volume_in_mapping(self.root_mount_device, + block_device_mapping) + if self._volume_in_mapping(self.local_mount_device, + block_device_mapping): + local_gb = False + else: + local_gb = inst_type['local_gb'] + xml_info = {'type': FLAGS.libvirt_type, 'name': instance['name'], 'basepath': os.path.join(FLAGS.instances_path, @@ -973,9 +1008,11 @@ class LibvirtConnection(driver.ComputeDriver): 'memory_kb': inst_type['memory_mb'] * 1024, 'vcpus': inst_type['vcpus'], 'rescue': rescue, - 'local': inst_type['local_gb'], + 'local': local_gb, 'driver_type': driver_type, - 'nics': nics} + 'nics': nics, + 'ebs_root': ebs_root, + 'volumes': block_device_mapping} if FLAGS.vnc_enabled: if FLAGS.libvirt_type != 'lxc': @@ -991,10 +1028,13 @@ class LibvirtConnection(driver.ComputeDriver): xml_info['disk'] = xml_info['basepath'] + "/disk" return xml_info - def to_xml(self, instance, rescue=False, network_info=None): + def to_xml(self, instance, rescue=False, network_info=None, + block_device_mapping=None): + block_device_mapping = block_device_mapping or [] # TODO(termie): cache? LOG.debug(_('instance %s: starting toXML method'), instance['name']) - xml_info = self._prepare_xml_info(instance, rescue, network_info) + xml_info = self._prepare_xml_info(instance, rescue, network_info, + block_device_mapping) xml = str(Template(self.libvirt_xml, searchList=[xml_info])) LOG.debug(_('instance %s: finished toXML method'), instance['name']) return xml diff --git a/nova/virt/vmwareapi_conn.py b/nova/virt/vmwareapi_conn.py index 1c6d2572db9c..3c6345ec8508 100644 --- a/nova/virt/vmwareapi_conn.py +++ b/nova/virt/vmwareapi_conn.py @@ -124,7 +124,7 @@ class VMWareESXConnection(driver.ComputeDriver): """List VM instances.""" return self._vmops.list_instances() - def spawn(self, instance): + def spawn(self, instance, network_info=None, block_device_mapping=None): """Create VM instance.""" self._vmops.spawn(instance) diff --git a/nova/virt/xenapi_conn.py b/nova/virt/xenapi_conn.py index 6d828e10997c..5fcec1715c70 100644 --- a/nova/virt/xenapi_conn.py +++ b/nova/virt/xenapi_conn.py @@ -194,7 +194,7 @@ class XenAPIConnection(driver.ComputeDriver): def list_instances_detail(self): return self._vmops.list_instances_detail() - def spawn(self, instance): + def spawn(self, instance, network_info=None, block_device_mapping=None): """Create VM instance""" self._vmops.spawn(instance) diff --git a/nova/volume/api.py b/nova/volume/api.py index b07f2e94b8bb..7d27abff9919 100644 --- a/nova/volume/api.py +++ b/nova/volume/api.py @@ -21,6 +21,9 @@ Handles all requests relating to volumes. """ +from eventlet import greenthread + +from nova import db from nova import exception from nova import flags from nova import log as logging @@ -44,7 +47,8 @@ class API(base.Base): if snapshot['status'] != "available": raise exception.ApiError( _("Snapshot status must be available")) - size = snapshot['volume_size'] + if not size: + size = snapshot['volume_size'] if quota.allowed_volumes(context, 1, size) < 1: pid = context.project_id @@ -73,6 +77,14 @@ class API(base.Base): "snapshot_id": snapshot_id}}) return volume + # TODO(yamahata): eliminate dumb polling + def wait_creation(self, context, volume_id): + while True: + volume = self.get(context, volume_id) + if volume['status'] != 'creating': + return + greenthread.sleep(1) + def delete(self, context, volume_id): volume = self.get(context, volume_id) if volume['status'] != "available": diff --git a/nova/volume/driver.py b/nova/volume/driver.py index 87e13277fc43..23e845deb324 100644 --- a/nova/volume/driver.py +++ b/nova/volume/driver.py @@ -582,6 +582,14 @@ class FakeISCSIDriver(ISCSIDriver): """No setup necessary in fake mode.""" pass + def discover_volume(self, context, volume): + """Discover volume on a remote host.""" + return "/dev/disk/by-path/volume-id-%d" % volume['id'] + + def undiscover_volume(self, volume): + """Undiscover volume on a remote host.""" + pass + @staticmethod def fake_execute(cmd, *_args, **_kwargs): """Execute that simply logs the command."""