Merged with trunk

This commit is contained in:
Anne Gentle 2011-01-07 10:57:53 -06:00
commit 5d02ee9e5f
38 changed files with 1709 additions and 552 deletions

View File

@ -30,3 +30,4 @@
<rconradharris@gmail.com> <rick.harris@rackspace.com>
<corywright@gmail.com> <cory.wright@rackspace.com>
<ant@openstack.org> <amesserl@rackspace.com>
<chiradeep@cloud.com> <chiradeep@chiradeep-lt2>

View File

@ -3,6 +3,7 @@ Anne Gentle <anne@openstack.org>
Anthony Young <sleepsonthefloor@gmail.com>
Antony Messerli <ant@openstack.org>
Armando Migliaccio <Armando.Migliaccio@eu.citrix.com>
Chiradeep Vittal <chiradeep@cloud.com>
Chris Behrens <cbehrens@codestud.com>
Chmouel Boudjnah <chmouel@chmouel.com>
Cory Wright <corywright@gmail.com>

View File

@ -35,7 +35,8 @@ Contributing Code
To contribute code, sign up for a Launchpad account and sign a contributor license agreement,
available on the `OpenStack Wiki <http://wiki.openstack.org/CLA>`_. Once the CLA is signed you
can contribute code through the Bazaar version control system which is related to your Launchpad account.
can contribute code through the Bazaar version control system which is related to your Launchpad
account. See the :doc:`devref/development.environment` page to get started.
#openstack on Freenode IRC Network
----------------------------------

View File

@ -88,7 +88,12 @@ Here's how to get the latest code::
source .nova_venv/bin/activate
./run_tests.sh
And then you can do cleaning work or hack hack hack with a branched named cleaning::
Then you can do cleaning work or hack hack hack with a branched named cleaning.
Contributing Your Work
----------------------
Once your work is complete you may wish to contribute it to the project. Add your name and email address to the `Authors` file, and also to the `.mailmap` file if you use multiple email addresses. Your contributions can not be merged into trunk unless you are listed in the Authors file. Now, push the branch to Launchpad::
bzr push lp:~launchpaduserid/nova/cleaning

View File

@ -23,12 +23,9 @@ import base64
import boto
import httplib
from nova import flags
from boto.ec2.regioninfo import RegionInfo
FLAGS = flags.FLAGS
DEFAULT_CLC_URL = 'http://127.0.0.1:8773'
DEFAULT_REGION = 'nova'
@ -199,8 +196,8 @@ class NovaAdminClient(object):
self,
clc_url=DEFAULT_CLC_URL,
region=DEFAULT_REGION,
access_key=FLAGS.aws_access_key_id,
secret_key=FLAGS.aws_secret_access_key,
access_key=None,
secret_key=None,
**kwargs):
parts = self.split_clc_url(clc_url)

View File

@ -31,19 +31,19 @@ import os
from nova import context
import IPy
from nova import compute
from nova import crypto
from nova import db
from nova import exception
from nova import flags
from nova import quota
from nova import network
from nova import rpc
from nova import utils
from nova.compute import api as compute_api
from nova import volume
from nova.compute import instance_types
FLAGS = flags.FLAGS
flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
InvalidInputException = exception.InvalidInputException
@ -71,16 +71,16 @@ def _gen_key(context, user_id, key_name):
return {'private_key': private_key, 'fingerprint': fingerprint}
def ec2_id_to_internal_id(ec2_id):
"""Convert an ec2 ID (i-[base 36 number]) to an internal id (int)"""
def ec2_id_to_id(ec2_id):
"""Convert an ec2 ID (i-[base 36 number]) to an instance id (int)"""
return int(ec2_id[2:], 36)
def internal_id_to_ec2_id(internal_id):
"""Convert an internal ID (int) to an ec2 ID (i-[base 36 number])"""
def id_to_ec2_id(instance_id):
"""Convert an instance ID (int) to an ec2 ID (i-[base 36 number])"""
digits = []
while internal_id != 0:
internal_id, remainder = divmod(internal_id, 36)
while instance_id != 0:
instance_id, remainder = divmod(instance_id, 36)
digits.append('0123456789abcdefghijklmnopqrstuvwxyz'[remainder])
return "i-%s" % ''.join(reversed(digits))
@ -91,10 +91,11 @@ class CloudController(object):
sent to the other nodes.
"""
def __init__(self):
self.network_manager = utils.import_object(FLAGS.network_manager)
self.image_service = utils.import_object(FLAGS.image_service)
self.compute_api = compute_api.ComputeAPI(self.network_manager,
self.image_service)
self.network_api = network.API()
self.volume_api = volume.API()
self.compute_api = compute.API(self.image_service, self.network_api,
self.volume_api)
self.setup()
def __str__(self):
@ -118,7 +119,8 @@ class CloudController(object):
def _get_mpi_data(self, context, project_id):
result = {}
for instance in self.compute_api.get_instances(context, project_id):
for instance in self.compute_api.get_all(context,
project_id=project_id):
if instance['fixed_ip']:
line = '%s slots=%d' % (instance['fixed_ip']['address'],
instance['vcpus'])
@ -140,7 +142,7 @@ class CloudController(object):
def get_metadata(self, address):
ctxt = context.get_admin_context()
instance_ref = db.fixed_ip_get_instance(ctxt, address)
instance_ref = self.compute_api.get_all(ctxt, fixed_ip=address)
if instance_ref is None:
return None
mpi = self._get_mpi_data(ctxt, instance_ref['project_id'])
@ -152,7 +154,7 @@ class CloudController(object):
hostname = instance_ref['hostname']
floating_ip = db.instance_get_floating_address(ctxt,
instance_ref['id'])
ec2_id = internal_id_to_ec2_id(instance_ref['internal_id'])
ec2_id = id_to_ec2_id(instance_ref['id'])
data = {
'user-data': base64.b64decode(instance_ref['user_data']),
'meta-data': {
@ -478,8 +480,8 @@ class CloudController(object):
def get_console_output(self, context, instance_id, **kwargs):
# instance_id is passed in as a list of instances
ec2_id = instance_id[0]
internal_id = ec2_id_to_internal_id(ec2_id)
instance_ref = self.compute_api.get_instance(context, internal_id)
instance_id = ec2_id_to_id(ec2_id)
instance_ref = self.compute_api.get(context, instance_id)
output = rpc.call(context,
'%s.%s' % (FLAGS.compute_topic,
instance_ref['host']),
@ -492,27 +494,22 @@ class CloudController(object):
"output": base64.b64encode(output)}
def describe_volumes(self, context, volume_id=None, **kwargs):
if context.user.is_admin():
volumes = db.volume_get_all(context)
else:
volumes = db.volume_get_all_by_project(context, context.project_id)
volumes = self.volume_api.get_all(context)
# NOTE(vish): volume_id is an optional list of volume ids to filter by.
volumes = [self._format_volume(context, v) for v in volumes
if volume_id is None or v['ec2_id'] in volume_id]
if volume_id is None or v['id'] in volume_id]
return {'volumeSet': volumes}
def _format_volume(self, context, volume):
instance_ec2_id = None
instance_data = None
if volume.get('instance', None):
internal_id = volume['instance']['internal_id']
instance_ec2_id = internal_id_to_ec2_id(internal_id)
instance_id = volume['instance']['id']
instance_ec2_id = id_to_ec2_id(instance_id)
instance_data = '%s[%s]' % (instance_ec2_id,
volume['instance']['host'])
v = {}
v['volumeId'] = volume['ec2_id']
v['volumeId'] = volume['id']
v['status'] = volume['status']
v['size'] = volume['size']
v['availabilityZone'] = volume['availability_zone']
@ -539,95 +536,17 @@ class CloudController(object):
return v
def create_volume(self, context, size, **kwargs):
# check quota
if quota.allowed_volumes(context, 1, size) < 1:
logging.warn("Quota exceeeded for %s, tried to create %sG volume",
context.project_id, size)
raise quota.QuotaError("Volume quota exceeded. You cannot "
"create a volume of size %s" % size)
vol = {}
vol['size'] = size
vol['user_id'] = context.user.id
vol['project_id'] = context.project_id
vol['availability_zone'] = FLAGS.storage_availability_zone
vol['status'] = "creating"
vol['attach_status'] = "detached"
vol['display_name'] = kwargs.get('display_name')
vol['display_description'] = kwargs.get('display_description')
volume_ref = db.volume_create(context, vol)
rpc.cast(context,
FLAGS.scheduler_topic,
{"method": "create_volume",
"args": {"topic": FLAGS.volume_topic,
"volume_id": volume_ref['id']}})
volume = self.volume_api.create(context, size,
kwargs.get('display_name'),
kwargs.get('display_description'))
# TODO(vish): Instance should be None at db layer instead of
# trying to lazy load, but for now we turn it into
# a dict to avoid an error.
return {'volumeSet': [self._format_volume(context, dict(volume_ref))]}
def attach_volume(self, context, volume_id, instance_id, device, **kwargs):
volume_ref = db.volume_get_by_ec2_id(context, volume_id)
if not re.match("^/dev/[a-z]d[a-z]+$", device):
raise exception.ApiError(_("Invalid device specified: %s. "
"Example device: /dev/vdb") % device)
# TODO(vish): abstract status checking?
if volume_ref['status'] != "available":
raise exception.ApiError(_("Volume status must be available"))
if volume_ref['attach_status'] == "attached":
raise exception.ApiError(_("Volume is already attached"))
internal_id = ec2_id_to_internal_id(instance_id)
instance_ref = self.compute_api.get_instance(context, internal_id)
host = instance_ref['host']
rpc.cast(context,
db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "attach_volume",
"args": {"volume_id": volume_ref['id'],
"instance_id": instance_ref['id'],
"mountpoint": device}})
return {'attachTime': volume_ref['attach_time'],
'device': volume_ref['mountpoint'],
'instanceId': instance_ref['id'],
'requestId': context.request_id,
'status': volume_ref['attach_status'],
'volumeId': volume_ref['id']}
def detach_volume(self, context, volume_id, **kwargs):
volume_ref = db.volume_get_by_ec2_id(context, volume_id)
instance_ref = db.volume_get_instance(context.elevated(),
volume_ref['id'])
if not instance_ref:
raise exception.ApiError(_("Volume isn't attached to anything!"))
# TODO(vish): abstract status checking?
if volume_ref['status'] == "available":
raise exception.ApiError(_("Volume is already detached"))
try:
host = instance_ref['host']
rpc.cast(context,
db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "detach_volume",
"args": {"instance_id": instance_ref['id'],
"volume_id": volume_ref['id']}})
except exception.NotFound:
# If the instance doesn't exist anymore,
# then we need to call detach blind
db.volume_detached(context)
internal_id = instance_ref['internal_id']
ec2_id = internal_id_to_ec2_id(internal_id)
return {'attachTime': volume_ref['attach_time'],
'device': volume_ref['mountpoint'],
'instanceId': internal_id,
'requestId': context.request_id,
'status': volume_ref['attach_status'],
'volumeId': volume_ref['id']}
def _convert_to_set(self, lst, label):
if lst == None or lst == []:
return None
if not isinstance(lst, list):
lst = [lst]
return [{label: x} for x in lst]
def delete_volume(self, context, volume_id, **kwargs):
self.volume_api.delete(context, volume_id)
return True
def update_volume(self, context, volume_id, **kwargs):
updatable_fields = ['display_name', 'display_description']
@ -636,9 +555,36 @@ class CloudController(object):
if field in kwargs:
changes[field] = kwargs[field]
if changes:
db.volume_update(context, volume_id, kwargs)
self.volume_api.update(context, volume_id, kwargs)
return True
def attach_volume(self, context, volume_id, instance_id, device, **kwargs):
self.compute_api.attach_volume(context, instance_id, volume_id, device)
volume = self.volume_api.get(context, volume_id)
return {'attachTime': volume['attach_time'],
'device': volume['mountpoint'],
'instanceId': instance_id,
'requestId': context.request_id,
'status': volume['attach_status'],
'volumeId': volume_id}
def detach_volume(self, context, volume_id, **kwargs):
volume = self.volume_api.get(context, volume_id)
instance = self.compute_api.detach_volume(context, volume_id)
return {'attachTime': volume['attach_time'],
'device': volume['mountpoint'],
'instanceId': id_to_ec2_id(instance['id']),
'requestId': context.request_id,
'status': volume['attach_status'],
'volumeId': volume_id}
def _convert_to_set(self, lst, label):
if lst == None or lst == []:
return None
if not isinstance(lst, list):
lst = [lst]
return [{label: x} for x in lst]
def describe_instances(self, context, **kwargs):
return self._format_describe_instances(context)
@ -646,24 +592,20 @@ class CloudController(object):
return {'reservationSet': self._format_instances(context)}
def _format_run_instances(self, context, reservation_id):
i = self._format_instances(context, reservation_id)
i = self._format_instances(context, reservation_id=reservation_id)
assert len(i) == 1
return i[0]
def _format_instances(self, context, reservation_id=None):
def _format_instances(self, context, **kwargs):
reservations = {}
if reservation_id:
instances = db.instance_get_all_by_reservation(context,
reservation_id)
else:
instances = self.compute_api.get_instances(context)
instances = self.compute_api.get_all(context, **kwargs)
for instance in instances:
if not context.user.is_admin():
if instance['image_id'] == FLAGS.vpn_image_id:
continue
i = {}
internal_id = instance['internal_id']
ec2_id = internal_id_to_ec2_id(internal_id)
instance_id = instance['id']
ec2_id = id_to_ec2_id(instance_id)
i['instanceId'] = ec2_id
i['imageId'] = instance['image_id']
i['instanceState'] = {
@ -716,8 +658,8 @@ class CloudController(object):
ec2_id = None
if (floating_ip_ref['fixed_ip']
and floating_ip_ref['fixed_ip']['instance']):
internal_id = floating_ip_ref['fixed_ip']['instance']['ec2_id']
ec2_id = internal_id_to_ec2_id(internal_id)
instance_id = floating_ip_ref['fixed_ip']['instance']['ec2_id']
ec2_id = id_to_ec2_id(instance_id)
address_rv = {'public_ip': address,
'instance_id': ec2_id}
if context.user.is_admin():
@ -728,69 +670,25 @@ class CloudController(object):
return {'addressesSet': addresses}
def allocate_address(self, context, **kwargs):
# check quota
if quota.allowed_floating_ips(context, 1) < 1:
logging.warn(_("Quota exceeeded for %s, tried to allocate "
"address"),
context.project_id)
raise quota.QuotaError(_("Address quota exceeded. You cannot "
"allocate any more addresses"))
# NOTE(vish): We don't know which network host should get the ip
# when we allocate, so just send it to any one. This
# will probably need to move into a network supervisor
# at some point.
public_ip = rpc.call(context,
FLAGS.network_topic,
{"method": "allocate_floating_ip",
"args": {"project_id": context.project_id}})
public_ip = self.network_api.allocate_floating_ip(context)
return {'addressSet': [{'publicIp': public_ip}]}
def release_address(self, context, public_ip, **kwargs):
floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
# NOTE(vish): We don't know which network host should get the ip
# when we deallocate, so just send it to any one. This
# will probably need to move into a network supervisor
# at some point.
rpc.cast(context,
FLAGS.network_topic,
{"method": "deallocate_floating_ip",
"args": {"floating_address": floating_ip_ref['address']}})
self.network_api.release_floating_ip(context, public_ip)
return {'releaseResponse': ["Address released."]}
def associate_address(self, context, instance_id, public_ip, **kwargs):
internal_id = ec2_id_to_internal_id(instance_id)
instance_ref = self.compute_api.get_instance(context, internal_id)
fixed_address = db.instance_get_fixed_address(context,
instance_ref['id'])
floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
# NOTE(vish): Perhaps we should just pass this on to compute and
# let compute communicate with network.
network_topic = self.compute_api.get_network_topic(context,
internal_id)
rpc.cast(context,
network_topic,
{"method": "associate_floating_ip",
"args": {"floating_address": floating_ip_ref['address'],
"fixed_address": fixed_address}})
instance_id = ec2_id_to_id(instance_id)
self.compute_api.associate_floating_ip(context, instance_id, public_ip)
return {'associateResponse': ["Address associated."]}
def disassociate_address(self, context, public_ip, **kwargs):
floating_ip_ref = db.floating_ip_get_by_address(context, public_ip)
# NOTE(vish): Get the topic from the host name of the network of
# the associated fixed ip.
if not floating_ip_ref.get('fixed_ip'):
raise exception.ApiError('Address is not associated.')
host = floating_ip_ref['fixed_ip']['network']['host']
topic = db.queue_get_for(context, FLAGS.network_topic, host)
rpc.cast(context,
topic,
{"method": "disassociate_floating_ip",
"args": {"floating_address": floating_ip_ref['address']}})
self.network_api.disassociate_floating_ip(context, public_ip)
return {'disassociateResponse': ["Address disassociated."]}
def run_instances(self, context, **kwargs):
max_count = int(kwargs.get('max_count', 1))
instances = self.compute_api.create_instances(context,
instances = self.compute_api.create(context,
instance_types.get_by_type(kwargs.get('instance_type', None)),
kwargs['image_id'],
min_count=int(kwargs.get('min_count', max_count)),
@ -798,13 +696,13 @@ class CloudController(object):
kernel_id=kwargs.get('kernel_id', None),
ramdisk_id=kwargs.get('ramdisk_id'),
display_name=kwargs.get('display_name'),
description=kwargs.get('display_description'),
display_description=kwargs.get('display_description'),
key_name=kwargs.get('key_name'),
user_data=kwargs.get('user_data'),
security_group=kwargs.get('security_group'),
availability_zone=kwargs.get('placement', {}).get(
'AvailabilityZone'),
generate_hostname=internal_id_to_ec2_id)
generate_hostname=id_to_ec2_id)
return self._format_run_instances(context,
instances[0]['reservation_id'])
@ -813,27 +711,27 @@ class CloudController(object):
instance_id is a kwarg so its name cannot be modified."""
logging.debug("Going to start terminating instances")
for ec2_id in instance_id:
internal_id = ec2_id_to_internal_id(ec2_id)
self.compute_api.delete_instance(context, internal_id)
instance_id = ec2_id_to_id(ec2_id)
self.compute_api.delete(context, instance_id)
return True
def reboot_instances(self, context, instance_id, **kwargs):
"""instance_id is a list of instance ids"""
for ec2_id in instance_id:
internal_id = ec2_id_to_internal_id(ec2_id)
self.compute_api.reboot(context, internal_id)
instance_id = ec2_id_to_id(ec2_id)
self.compute_api.reboot(context, instance_id)
return True
def rescue_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
internal_id = ec2_id_to_internal_id(instance_id)
self.compute_api.rescue(context, internal_id)
instance_id = ec2_id_to_id(instance_id)
self.compute_api.rescue(context, instance_id)
return True
def unrescue_instance(self, context, instance_id, **kwargs):
"""This is an extension to the normal ec2_api"""
internal_id = ec2_id_to_internal_id(instance_id)
self.compute_api.unrescue(context, internal_id)
instance_id = ec2_id_to_id(instance_id)
self.compute_api.unrescue(context, instance_id)
return True
def update_instance(self, context, ec2_id, **kwargs):
@ -843,24 +741,8 @@ class CloudController(object):
if field in kwargs:
changes[field] = kwargs[field]
if changes:
internal_id = ec2_id_to_internal_id(ec2_id)
inst = self.compute_api.get_instance(context, internal_id)
db.instance_update(context, inst['id'], kwargs)
return True
def delete_volume(self, context, volume_id, **kwargs):
# TODO: return error if not authorized
volume_ref = db.volume_get_by_ec2_id(context, volume_id)
if volume_ref['status'] != "available":
raise exception.ApiError(_("Volume status must be available"))
now = datetime.datetime.utcnow()
db.volume_update(context, volume_ref['id'], {'status': 'deleting',
'terminated_at': now})
host = volume_ref['host']
rpc.cast(context,
db.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "delete_volume",
"args": {"volume_id": volume_ref['id']}})
instance_id = ec2_id_to_id(ec2_id)
self.compute_api.update(context, instance_id, **kwargs)
return True
def describe_images(self, context, image_id=None, **kwargs):

View File

@ -17,15 +17,14 @@
from webob import exc
from nova import compute
from nova import flags
from nova import utils
from nova import wsgi
import nova.api.openstack
import nova.image.service
from nova.api.openstack import common
from nova.api.openstack import faults
from nova.compute import api as compute_api
import nova.image.service
FLAGS = flags.FLAGS
@ -131,7 +130,7 @@ class Controller(wsgi.Controller):
env = self._deserialize(req.body, req)
instance_id = env["image"]["serverId"]
name = env["image"]["name"]
return compute_api.ComputeAPI().snapshot(context, instance_id, name)
return compute.API().snapshot(context, instance_id, name)
def update(self, req, id):
# Users may not modify public images, and that's all that

View File

@ -20,12 +20,12 @@ import traceback
from webob import exc
from nova import compute
from nova import exception
from nova import wsgi
from nova.api.openstack import common
from nova.api.openstack import faults
from nova.auth import manager as auth_manager
from nova.compute import api as compute_api
from nova.compute import instance_types
from nova.compute import power_state
import nova.api.openstack
@ -51,7 +51,7 @@ def _translate_detail_keys(inst):
inst_dict = {}
mapped_keys = dict(status='state', imageId='image_id',
flavorId='instance_type', name='display_name', id='internal_id')
flavorId='instance_type', name='display_name', id='id')
for k, v in mapped_keys.iteritems():
inst_dict[k] = inst[v]
@ -67,7 +67,7 @@ def _translate_detail_keys(inst):
def _translate_keys(inst):
""" Coerces into dictionary format, excluding all model attributes
save for id and name """
return dict(server=dict(id=inst['internal_id'], name=inst['display_name']))
return dict(server=dict(id=inst['id'], name=inst['display_name']))
class Controller(wsgi.Controller):
@ -80,7 +80,7 @@ class Controller(wsgi.Controller):
"status", "progress"]}}}
def __init__(self):
self.compute_api = compute_api.ComputeAPI()
self.compute_api = compute.API()
super(Controller, self).__init__()
def index(self, req):
@ -96,8 +96,7 @@ class Controller(wsgi.Controller):
entity_maker - either _translate_detail_keys or _translate_keys
"""
instance_list = self.compute_api.get_instances(
req.environ['nova.context'])
instance_list = self.compute_api.get_all(req.environ['nova.context'])
limited_list = common.limited(instance_list, req)
res = [entity_maker(inst)['server'] for inst in limited_list]
return dict(servers=res)
@ -105,8 +104,7 @@ class Controller(wsgi.Controller):
def show(self, req, id):
""" Returns server details by server id """
try:
instance = self.compute_api.get_instance(
req.environ['nova.context'], int(id))
instance = self.compute_api.get(req.environ['nova.context'], id)
return _translate_detail_keys(instance)
except exception.NotFound:
return faults.Fault(exc.HTTPNotFound())
@ -114,8 +112,7 @@ class Controller(wsgi.Controller):
def delete(self, req, id):
""" Destroys a server """
try:
self.compute_api.delete_instance(req.environ['nova.context'],
int(id))
self.compute_api.delete(req.environ['nova.context'], id)
except exception.NotFound:
return faults.Fault(exc.HTTPNotFound())
return exc.HTTPAccepted()
@ -128,12 +125,12 @@ class Controller(wsgi.Controller):
key_pair = auth_manager.AuthManager.get_key_pairs(
req.environ['nova.context'])[0]
instances = self.compute_api.create_instances(
instances = self.compute_api.create(
req.environ['nova.context'],
instance_types.get_by_flavor_id(env['server']['flavorId']),
env['server']['imageId'],
display_name=env['server']['name'],
description=env['server']['name'],
display_description=env['server']['name'],
key_name=key_pair['name'],
key_data=key_pair['public_key'])
return _translate_keys(instances[0])
@ -151,10 +148,8 @@ class Controller(wsgi.Controller):
update_dict['display_name'] = inst_dict['server']['name']
try:
ctxt = req.environ['nova.context']
self.compute_api.update_instance(ctxt,
id,
**update_dict)
self.compute_api.update(req.environ['nova.context'], id,
**update_dict)
except exception.NotFound:
return faults.Fault(exc.HTTPNotFound())
return exc.HTTPNoContent()
@ -175,6 +170,50 @@ class Controller(wsgi.Controller):
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
def lock(self, req, id):
"""
lock the instance with id
admin only operation
"""
context = req.environ['nova.context']
try:
self.compute_api.lock(context, id)
except:
readable = traceback.format_exc()
logging.error(_("Compute.api::lock %s"), readable)
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
def unlock(self, req, id):
"""
unlock the instance with id
admin only operation
"""
context = req.environ['nova.context']
try:
self.compute_api.unlock(context, id)
except:
readable = traceback.format_exc()
logging.error(_("Compute.api::unlock %s"), readable)
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
def get_lock(self, req, id):
"""
return the boolean state of (instance with id)'s lock
"""
context = req.environ['nova.context']
try:
self.compute_api.get_lock(context, id)
except:
readable = traceback.format_exc()
logging.error(_("Compute.api::get_lock %s"), readable)
return faults.Fault(exc.HTTPUnprocessableEntity())
return exc.HTTPAccepted()
def pause(self, req, id):
""" Permit Admins to Pause the server. """
ctxt = req.environ['nova.context']
@ -227,4 +266,13 @@ class Controller(wsgi.Controller):
def actions(self, req, id):
"""Permit Admins to retrieve server actions."""
ctxt = req.environ["nova.context"]
return self.compute_api.get_actions(ctxt, id)
items = self.compute_api.get_actions(ctxt, id)
actions = []
# TODO(jk0): Do not do pre-serialization here once the default
# serializer is updated
for item in items:
actions.append(dict(
created_at=str(item.created_at),
action=item.action,
error=item.error))
return dict(actions=actions)

View File

@ -16,17 +16,4 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
:mod:`nova.compute` -- Compute Nodes using LibVirt
=====================================================
.. automodule:: nova.compute
:platform: Unix
:synopsis: Thin wrapper around libvirt for VM mgmt.
.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com>
.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com>
.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com>
.. moduleauthor:: Joshua McKenty <joshua@cognition.ca>
.. moduleauthor:: Manish Singh <yosh@gimp.org>
.. moduleauthor:: Andy Smith <andy@anarkystic.com>
"""
from nova.compute.api import API

View File

@ -17,7 +17,7 @@
# under the License.
"""
Handles all API requests relating to instances (guest vms).
Handles all requests relating to instances (guest vms).
"""
import datetime
@ -27,36 +27,41 @@ import time
from nova import db
from nova import exception
from nova import flags
from nova import network
from nova import quota
from nova import rpc
from nova import utils
from nova import volume
from nova.compute import instance_types
from nova.db import base
FLAGS = flags.FLAGS
def generate_default_hostname(internal_id):
def generate_default_hostname(instance_id):
"""Default function to generate a hostname given an instance reference."""
return str(internal_id)
return str(instance_id)
class ComputeAPI(base.Base):
class API(base.Base):
"""API for interacting with the compute manager."""
def __init__(self, network_manager=None, image_service=None, **kwargs):
if not network_manager:
network_manager = utils.import_object(FLAGS.network_manager)
self.network_manager = network_manager
def __init__(self, image_service=None, network_api=None, volume_api=None,
**kwargs):
if not image_service:
image_service = utils.import_object(FLAGS.image_service)
self.image_service = image_service
super(ComputeAPI, self).__init__(**kwargs)
if not network_api:
network_api = network.API()
self.network_api = network_api
if not volume_api:
volume_api = volume.API()
self.volume_api = volume_api
super(API, self).__init__(**kwargs)
def get_network_topic(self, context, instance_id):
try:
instance = self.db.instance_get_by_internal_id(context,
instance_id)
instance = self.get(context, instance_id)
except exception.NotFound as e:
logging.warning("Instance %d was not found in get_network_topic",
instance_id)
@ -70,18 +75,18 @@ class ComputeAPI(base.Base):
topic,
{"method": "get_network_topic", "args": {'fake': 1}})
def create_instances(self, context, instance_type, image_id, min_count=1,
max_count=1, kernel_id=None, ramdisk_id=None,
display_name='', description='', key_name=None,
key_data=None, security_group='default',
availability_zone=None,
user_data=None,
generate_hostname=generate_default_hostname):
"""Create the number of instances requested if quote and
def create(self, context, instance_type,
image_id, kernel_id=None, ramdisk_id=None,
min_count=1, max_count=1,
display_name='', display_description='',
key_name=None, key_data=None, security_group='default',
availability_zone=None, user_data=None,
generate_hostname=generate_default_hostname):
"""Create the number of instances requested if quota and
other arguments check out ok."""
num_instances = quota.allowed_instances(context, max_count,
instance_type)
type_data = instance_types.INSTANCE_TYPES[instance_type]
num_instances = quota.allowed_instances(context, max_count, type_data)
if num_instances < min_count:
logging.warn("Quota exceeeded for %s, tried to run %s instances",
context.project_id, min_count)
@ -96,7 +101,7 @@ class ComputeAPI(base.Base):
kernel_id = image.get('kernelId', None)
if ramdisk_id is None:
ramdisk_id = image.get('ramdiskId', None)
#No kernel and ramdisk for raw images
# No kernel and ramdisk for raw images
if kernel_id == str(FLAGS.null_kernel):
kernel_id = None
ramdisk_id = None
@ -124,7 +129,6 @@ class ComputeAPI(base.Base):
key_pair = db.key_pair_get(context, context.user_id, key_name)
key_data = key_pair['public_key']
type_data = instance_types.INSTANCE_TYPES[instance_type]
base_options = {
'reservation_id': utils.generate_uid('r'),
'image_id': image_id,
@ -139,10 +143,11 @@ class ComputeAPI(base.Base):
'vcpus': type_data['vcpus'],
'local_gb': type_data['local_gb'],
'display_name': display_name,
'display_description': description,
'display_description': display_description,
'user_data': user_data or '',
'key_name': key_name,
'key_data': key_data,
'locked': False,
'availability_zone': availability_zone}
elevated = context.elevated()
@ -154,7 +159,6 @@ class ComputeAPI(base.Base):
**base_options)
instance = self.db.instance_create(context, instance)
instance_id = instance['id']
internal_id = instance['internal_id']
elevated = context.elevated()
if not security_groups:
@ -165,11 +169,11 @@ class ComputeAPI(base.Base):
security_group_id)
# Set sane defaults if not specified
updates = dict(hostname=generate_hostname(internal_id))
updates = dict(hostname=generate_hostname(instance_id))
if 'display_name' not in instance:
updates['display_name'] = "Server %s" % internal_id
updates['display_name'] = "Server %s" % instance_id
instance = self.update_instance(context, instance_id, **updates)
instance = self.update(context, instance_id, **updates)
instances.append(instance)
logging.debug(_("Casting to scheduler for %s/%s's instance %s"),
@ -199,7 +203,7 @@ class ComputeAPI(base.Base):
'project_id': context.project_id}
db.security_group_create(context, values)
def update_instance(self, context, instance_id, **kwargs):
def update(self, context, instance_id, **kwargs):
"""Updates the instance in the datastore.
:param context: The security context
@ -213,134 +217,204 @@ class ComputeAPI(base.Base):
"""
return self.db.instance_update(context, instance_id, kwargs)
def delete_instance(self, context, instance_id):
logging.debug("Going to try and terminate %d" % instance_id)
def delete(self, context, instance_id):
logging.debug("Going to try and terminate %s" % instance_id)
try:
instance = self.db.instance_get_by_internal_id(context,
instance_id)
instance = self.get(context, instance_id)
except exception.NotFound as e:
logging.warning(_("Instance %d was not found during terminate"),
logging.warning(_("Instance %s was not found during terminate"),
instance_id)
raise e
if (instance['state_description'] == 'terminating'):
logging.warning(_("Instance %d is already being terminated"),
logging.warning(_("Instance %s is already being terminated"),
instance_id)
return
self.update_instance(context,
instance['id'],
state_description='terminating',
state=0,
terminated_at=datetime.datetime.utcnow())
self.update(context,
instance['id'],
state_description='terminating',
state=0,
terminated_at=datetime.datetime.utcnow())
host = instance['host']
if host:
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "terminate_instance",
"args": {"instance_id": instance['id']}})
"args": {"instance_id": instance_id}})
else:
self.db.instance_destroy(context, instance['id'])
self.db.instance_destroy(context, instance_id)
def get_instances(self, context, project_id=None):
"""Get all instances, possibly filtered by project ID or
user ID. If there is no filter and the context is an admin,
it will retreive all instances in the system."""
def get(self, context, instance_id):
"""Get a single instance with the given ID."""
return self.db.instance_get_by_id(context, instance_id)
def get_all(self, context, project_id=None, reservation_id=None,
fixed_ip=None):
"""Get all instances, possibly filtered by one of the
given parameters. If there is no filter and the context is
an admin, it will retreive all instances in the system."""
if reservation_id is not None:
return self.db.instance_get_all_by_reservation(context,
reservation_id)
if fixed_ip is not None:
return self.db.fixed_ip_get_instance(context, fixed_ip)
if project_id or not context.is_admin:
if not context.project:
return self.db.instance_get_all_by_user(context,
context.user_id)
if project_id is None:
project_id = context.project_id
return self.db.instance_get_all_by_project(context, project_id)
return self.db.instance_get_all_by_project(context,
project_id)
return self.db.instance_get_all(context)
def get_instance(self, context, instance_id):
return self.db.instance_get_by_internal_id(context, instance_id)
def snapshot(self, context, instance_id, name):
"""Snapshot the given instance."""
instance = self.db.instance_get_by_internal_id(context, instance_id)
instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "snapshot_instance",
"args": {"instance_id": instance['id'], "name": name}})
"args": {"instance_id": instance_id, "name": name}})
def reboot(self, context, instance_id):
"""Reboot the given instance."""
instance = self.db.instance_get_by_internal_id(context, instance_id)
instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "reboot_instance",
"args": {"instance_id": instance['id']}})
"args": {"instance_id": instance_id}})
def pause(self, context, instance_id):
"""Pause the given instance."""
instance = self.db.instance_get_by_internal_id(context, instance_id)
instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "pause_instance",
"args": {"instance_id": instance['id']}})
"args": {"instance_id": instance_id}})
def unpause(self, context, instance_id):
"""Unpause the given instance."""
instance = self.db.instance_get_by_internal_id(context, instance_id)
instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "unpause_instance",
"args": {"instance_id": instance['id']}})
"args": {"instance_id": instance_id}})
def get_diagnostics(self, context, instance_id):
"""Retrieve diagnostics for the given instance."""
instance = self.db.instance_get_by_internal_id(context, instance_id)
instance = self.get(context, instance_id)
host = instance["host"]
return rpc.call(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "get_diagnostics",
"args": {"instance_id": instance["id"]}})
"args": {"instance_id": instance_id}})
def get_actions(self, context, instance_id):
"""Retrieve actions for the given instance."""
instance = self.db.instance_get_by_internal_id(context, instance_id)
return self.db.instance_get_actions(context, instance["id"])
return self.db.instance_get_actions(context, instance_id)
def suspend(self, context, instance_id):
"""suspend the instance with instance_id"""
instance = self.db.instance_get_by_internal_id(context, instance_id)
instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "suspend_instance",
"args": {"instance_id": instance['id']}})
"args": {"instance_id": instance_id}})
def resume(self, context, instance_id):
"""resume the instance with instance_id"""
instance = self.db.instance_get_by_internal_id(context, instance_id)
instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "resume_instance",
"args": {"instance_id": instance['id']}})
"args": {"instance_id": instance_id}})
def rescue(self, context, instance_id):
"""Rescue the given instance."""
instance = self.db.instance_get_by_internal_id(context, instance_id)
instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "rescue_instance",
"args": {"instance_id": instance['id']}})
"args": {"instance_id": instance_id}})
def unrescue(self, context, instance_id):
"""Unrescue the given instance."""
instance = self.db.instance_get_by_internal_id(context, instance_id)
instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "unrescue_instance",
"args": {"instance_id": instance_id}})
def lock(self, context, instance_id):
"""
lock the instance with instance_id
"""
instance = self.get_instance(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "lock_instance",
"args": {"instance_id": instance['id']}})
def unlock(self, context, instance_id):
"""
unlock the instance with instance_id
"""
instance = self.get_instance(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "unlock_instance",
"args": {"instance_id": instance['id']}})
def get_lock(self, context, instance_id):
"""
return the boolean state of (instance with instance_id)'s lock
"""
instance = self.get_instance(context, instance_id)
return instance['locked']
def attach_volume(self, context, instance_id, volume_id, device):
if not re.match("^/dev/[a-z]d[a-z]+$", device):
raise exception.ApiError(_("Invalid device specified: %s. "
"Example device: /dev/vdb") % device)
self.volume_api.check_attach(context, volume_id)
instance = self.get(context, instance_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "attach_volume",
"args": {"volume_id": volume_id,
"instance_id": instance_id,
"mountpoint": device}})
def detach_volume(self, context, volume_id):
instance = self.db.volume_get_instance(context.elevated(), volume_id)
if not instance:
raise exception.ApiError(_("Volume isn't attached to anything!"))
self.volume_api.check_detach(context, volume_id)
host = instance['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.compute_topic, host),
{"method": "detach_volume",
"args": {"instance_id": instance['id'],
"volume_id": volume_id}})
return instance
def associate_floating_ip(self, context, instance_id, address):
instance = self.get(context, instance_id)
self.network_api.associate_floating_ip(context, address,
instance['fixed_ip'])

View File

@ -36,6 +36,7 @@ terminating it.
import datetime
import logging
import functools
from nova import exception
from nova import flags
@ -53,6 +54,38 @@ flags.DEFINE_string('stub_network', False,
'Stub network related code')
def checks_instance_lock(function):
"""
decorator used for preventing action against locked instances
unless, of course, you happen to be admin
"""
@functools.wraps(function)
def decorated_function(self, context, instance_id, *args, **kwargs):
logging.info(_("check_instance_lock: decorating: |%s|"), function)
logging.info(_("check_instance_lock: arguments: |%s| |%s| |%s|"),
self,
context,
instance_id)
locked = self.get_lock(context, instance_id)
admin = context.is_admin
logging.info(_("check_instance_lock: locked: |%s|"), locked)
logging.info(_("check_instance_lock: admin: |%s|"), admin)
# if admin or unlocked call function otherwise log error
if admin or not locked:
logging.info(_("check_instance_lock: executing: |%s|"), function)
function(self, context, instance_id, *args, **kwargs)
else:
logging.error(_("check_instance_lock: not executing |%s|"),
function)
return False
return decorated_function
class ComputeManager(manager.Manager):
"""Manages the running instances from creation to destruction."""
@ -158,6 +191,7 @@ class ComputeManager(manager.Manager):
self._update_state(context, instance_id)
@exception.wrap_exception
@checks_instance_lock
def terminate_instance(self, context, instance_id):
"""Terminate an instance on this machine."""
context = context.elevated()
@ -202,6 +236,7 @@ class ComputeManager(manager.Manager):
self.db.instance_destroy(context, instance_id)
@exception.wrap_exception
@checks_instance_lock
def reboot_instance(self, context, instance_id):
"""Reboot an instance on this server."""
context = context.elevated()
@ -211,7 +246,7 @@ class ComputeManager(manager.Manager):
if instance_ref['state'] != power_state.RUNNING:
logging.warn(_('trying to reboot a non-running '
'instance: %s (state: %s excepted: %s)'),
instance_ref['internal_id'],
instance_id,
instance_ref['state'],
power_state.RUNNING)
@ -239,20 +274,20 @@ class ComputeManager(manager.Manager):
if instance_ref['state'] != power_state.RUNNING:
logging.warn(_('trying to snapshot a non-running '
'instance: %s (state: %s excepted: %s)'),
instance_ref['internal_id'],
instance_id,
instance_ref['state'],
power_state.RUNNING)
self.driver.snapshot(instance_ref, name)
@exception.wrap_exception
@checks_instance_lock
def rescue_instance(self, context, instance_id):
"""Rescue an instance on this server."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
logging.debug(_('instance %s: rescuing'),
instance_ref['internal_id'])
logging.debug(_('instance %s: rescuing'), instance_id)
self.db.instance_set_state(context,
instance_id,
power_state.NOSTATE,
@ -262,13 +297,13 @@ class ComputeManager(manager.Manager):
self._update_state(context, instance_id)
@exception.wrap_exception
@checks_instance_lock
def unrescue_instance(self, context, instance_id):
"""Rescue an instance on this server."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
logging.debug(_('instance %s: unrescuing'),
instance_ref['internal_id'])
logging.debug(_('instance %s: unrescuing'), instance_id)
self.db.instance_set_state(context,
instance_id,
power_state.NOSTATE,
@ -282,13 +317,13 @@ class ComputeManager(manager.Manager):
self._update_state(context, instance_id)
@exception.wrap_exception
@checks_instance_lock
def pause_instance(self, context, instance_id):
"""Pause an instance on this server."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
logging.debug('instance %s: pausing',
instance_ref['internal_id'])
logging.debug('instance %s: pausing', instance_id)
self.db.instance_set_state(context,
instance_id,
power_state.NOSTATE,
@ -300,13 +335,13 @@ class ComputeManager(manager.Manager):
result))
@exception.wrap_exception
@checks_instance_lock
def unpause_instance(self, context, instance_id):
"""Unpause a paused instance on this server."""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
logging.debug('instance %s: unpausing',
instance_ref['internal_id'])
logging.debug('instance %s: unpausing', instance_id)
self.db.instance_set_state(context,
instance_id,
power_state.NOSTATE,
@ -324,16 +359,20 @@ class ComputeManager(manager.Manager):
if instance_ref["state"] == power_state.RUNNING:
logging.debug(_("instance %s: retrieving diagnostics"),
instance_ref["internal_id"])
instance_id)
return self.driver.get_diagnostics(instance_ref)
@exception.wrap_exception
@checks_instance_lock
def suspend_instance(self, context, instance_id):
"""suspend the instance with instance_id"""
"""
suspend the instance with instance_id
"""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
logging.debug(_('instance %s: suspending'),
instance_ref['internal_id'])
logging.debug(_('instance %s: suspending'), instance_id)
self.db.instance_set_state(context, instance_id,
power_state.NOSTATE,
'suspending')
@ -344,12 +383,16 @@ class ComputeManager(manager.Manager):
result))
@exception.wrap_exception
@checks_instance_lock
def resume_instance(self, context, instance_id):
"""resume the suspended instance with instance_id"""
"""
resume the suspended instance with instance_id
"""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
logging.debug(_('instance %s: resuming'), instance_ref['internal_id'])
logging.debug(_('instance %s: resuming'), instance_id)
self.db.instance_set_state(context, instance_id,
power_state.NOSTATE,
'resuming')
@ -359,6 +402,41 @@ class ComputeManager(manager.Manager):
instance_id,
result))
@exception.wrap_exception
def lock_instance(self, context, instance_id):
"""
lock the instance with instance_id
"""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
logging.debug(_('instance %s: locking'), instance_id)
self.db.instance_update(context, instance_id, {'locked': True})
@exception.wrap_exception
def unlock_instance(self, context, instance_id):
"""
unlock the instance with instance_id
"""
context = context.elevated()
instance_ref = self.db.instance_get(context, instance_id)
logging.debug(_('instance %s: unlocking'), instance_id)
self.db.instance_update(context, instance_id, {'locked': False})
@exception.wrap_exception
def get_lock(self, context, instance_id):
"""
return the boolean state of (instance with instance_id)'s lock
"""
context = context.elevated()
logging.debug(_('instance %s: getting locked state'), instance_id)
instance_ref = self.db.instance_get(context, instance_id)
return instance_ref['locked']
@exception.wrap_exception
def get_console_output(self, context, instance_id):
"""Send the console output for an instance."""
@ -369,6 +447,7 @@ class ComputeManager(manager.Manager):
return self.driver.get_console_output(instance_ref)
@exception.wrap_exception
@checks_instance_lock
def attach_volume(self, context, instance_id, volume_id, mountpoint):
"""Attach a volume to an instance."""
context = context.elevated()
@ -398,6 +477,7 @@ class ComputeManager(manager.Manager):
return True
@exception.wrap_exception
@checks_instance_lock
def detach_volume(self, context, instance_id, volume_id):
"""Detach a volume from an instance."""
context = context.elevated()

View File

@ -353,9 +353,9 @@ def instance_get_project_vpn(context, project_id):
return IMPL.instance_get_project_vpn(context, project_id)
def instance_get_by_internal_id(context, internal_id):
"""Get an instance by internal id."""
return IMPL.instance_get_by_internal_id(context, internal_id)
def instance_get_by_id(context, instance_id):
"""Get an instance by id."""
return IMPL.instance_get_by_id(context, instance_id)
def instance_is_vpn(context, instance_id):
@ -719,7 +719,7 @@ def security_group_get_all(context):
def security_group_get(context, security_group_id):
"""Get security group by its internal id."""
"""Get security group by its id."""
return IMPL.security_group_get(context, security_group_id)

View File

@ -19,6 +19,25 @@
"""
SQLAlchemy database backend
"""
import logging
import time
from sqlalchemy.exc import OperationalError
from nova import flags
from nova.db.sqlalchemy import models
models.register_models()
FLAGS = flags.FLAGS
for i in xrange(FLAGS.sql_max_retries):
if i > 0:
time.sleep(FLAGS.sql_retry_interval)
try:
models.register_models()
break
except OperationalError:
logging.exception(_("Data store is unreachable."
" Trying again in %d seconds.") % FLAGS.sql_retry_interval)

View File

@ -19,7 +19,6 @@
Implementation of SQLAlchemy backend.
"""
import random
import warnings
from nova import db
@ -606,30 +605,18 @@ def fixed_ip_update(context, address, values):
###################
#TODO(gundlach): instance_create and volume_create are nearly identical
#and should be refactored. I expect there are other copy-and-paste
#functions between the two of them as well.
@require_context
def instance_create(context, values):
"""Create a new Instance record in the database.
context - request context object
values - dict containing column values.
'internal_id' is auto-generated and should not be specified.
"""
instance_ref = models.Instance()
instance_ref.update(values)
session = get_session()
with session.begin():
while instance_ref.internal_id == None:
# Instances have integer internal ids.
internal_id = random.randint(0, 2 ** 31 - 1)
if not instance_internal_id_exists(context, internal_id,
session=session):
instance_ref.internal_id = internal_id
instance_ref.save(session=session)
return instance_ref
@ -751,37 +738,28 @@ def instance_get_project_vpn(context, project_id):
@require_context
def instance_get_by_internal_id(context, internal_id):
def instance_get_by_id(context, instance_id):
session = get_session()
if is_admin_context(context):
result = session.query(models.Instance).\
options(joinedload('security_groups')).\
filter_by(internal_id=internal_id).\
filter_by(id=instance_id).\
filter_by(deleted=can_read_deleted(context)).\
first()
elif is_user_context(context):
result = session.query(models.Instance).\
options(joinedload('security_groups')).\
filter_by(project_id=context.project_id).\
filter_by(internal_id=internal_id).\
filter_by(id=instance_id).\
filter_by(deleted=False).\
first()
if not result:
raise exception.NotFound(_('Instance %s not found') % (internal_id))
raise exception.NotFound(_('Instance %s not found') % (instance_id))
return result
@require_context
def instance_internal_id_exists(context, internal_id, session=None):
if not session:
session = get_session()
return session.query(exists().\
where(models.Instance.internal_id == internal_id)).\
one()[0]
@require_context
def instance_get_fixed_address(context, instance_id):
session = get_session()
@ -862,12 +840,9 @@ def instance_action_create(context, values):
def instance_get_actions(context, instance_id):
"""Return the actions associated to the given instance id"""
session = get_session()
actions = {}
for action in session.query(models.InstanceActions).\
return session.query(models.InstanceActions).\
filter_by(instance_id=instance_id).\
all():
actions[action.action] = action.error
return actions
all()
###################
@ -1317,10 +1292,6 @@ def volume_create(context, values):
session = get_session()
with session.begin():
while volume_ref.ec2_id == None:
ec2_id = utils.generate_uid('vol')
if not volume_ec2_id_exists(context, ec2_id, session=session):
volume_ref.ec2_id = ec2_id
volume_ref.save(session=session)
return volume_ref
@ -1418,41 +1389,6 @@ def volume_get_all_by_project(context, project_id):
all()
@require_context
def volume_get_by_ec2_id(context, ec2_id):
session = get_session()
result = None
if is_admin_context(context):
result = session.query(models.Volume).\
filter_by(ec2_id=ec2_id).\
filter_by(deleted=can_read_deleted(context)).\
first()
elif is_user_context(context):
result = session.query(models.Volume).\
filter_by(project_id=context.project_id).\
filter_by(ec2_id=ec2_id).\
filter_by(deleted=False).\
first()
else:
raise exception.NotAuthorized()
if not result:
raise exception.NotFound(_('Volume %s not found') % ec2_id)
return result
@require_context
def volume_ec2_id_exists(context, ec2_id, session=None):
if not session:
session = get_session()
return session.query(exists().\
where(models.Volume.id == ec2_id)).\
one()[0]
@require_admin_context
def volume_get_instance(context, volume_id):
session = get_session()

View File

@ -164,11 +164,13 @@ class Certificate(BASE, NovaBase):
class Instance(BASE, NovaBase):
"""Represents a guest vm."""
__tablename__ = 'instances'
id = Column(Integer, primary_key=True)
internal_id = Column(Integer, unique=True)
id = Column(Integer, primary_key=True, autoincrement=True)
@property
def name(self):
return "instance-%08x" % self.id
admin_pass = Column(String(255))
user_id = Column(String(255))
project_id = Column(String(255))
@ -180,10 +182,6 @@ class Instance(BASE, NovaBase):
def project(self):
return auth.manager.AuthManager().get_project(self.project_id)
@property
def name(self):
return "instance-%d" % self.internal_id
image_id = Column(String(255))
kernel_id = Column(String(255))
ramdisk_id = Column(String(255))
@ -226,6 +224,8 @@ class Instance(BASE, NovaBase):
display_name = Column(String(255))
display_description = Column(String(255))
locked = Column(Boolean)
# TODO(vish): see Ewan's email about state improvements, probably
# should be in a driver base class or some such
# vmstate_state = running, halted, suspended, paused
@ -251,8 +251,11 @@ class InstanceActions(BASE, NovaBase):
class Volume(BASE, NovaBase):
"""Represents a block storage device that can be attached to a vm."""
__tablename__ = 'volumes'
id = Column(Integer, primary_key=True)
ec2_id = Column(String(12), unique=True)
id = Column(Integer, primary_key=True, autoincrement=True)
@property
def name(self):
return "volume-%08x" % self.id
user_id = Column(String(255))
project_id = Column(String(255))
@ -278,10 +281,6 @@ class Volume(BASE, NovaBase):
display_name = Column(String(255))
display_description = Column(String(255))
@property
def name(self):
return self.ec2_id
class Quota(BASE, NovaBase):
"""Represents quota overrides for a project."""
@ -545,7 +544,8 @@ def register_models():
"""Register Models and create metadata.
Called from nova.db.sqlalchemy.__init__ as part of loading the driver,
it will never need to be called explicitly elsewhere.
it will never need to be called explicitly elsewhere unless the
connection is lost and needs to be reestablished.
"""
from sqlalchemy import create_engine
models = (Service, Instance, InstanceActions,

View File

@ -266,6 +266,8 @@ DEFINE_string('sql_connection',
DEFINE_string('sql_idle_timeout',
'3600',
'timeout for idle sql database connections')
DEFINE_integer('sql_max_retries', 12, 'sql connection attempts')
DEFINE_integer('sql_retry_interval', 10, 'sql connection retry interval')
DEFINE_string('compute_manager', 'nova.compute.manager.ComputeManager',
'Manager for compute')

View File

@ -24,7 +24,6 @@ import urlparse
import webob.exc
from nova.compute import api as compute_api
from nova import utils
from nova import flags
from nova import exception

View File

@ -16,17 +16,4 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
:mod:`nova.network` -- Network Nodes
=====================================================
.. automodule:: nova.network
:platform: Unix
:synopsis: Network is responsible for managing networking
.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com>
.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com>
.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com>
.. moduleauthor:: Joshua McKenty <joshua@cognition.ca>
.. moduleauthor:: Manish Singh <yosh@gimp.org>
.. moduleauthor:: Andy Smith <andy@anarkystic.com>
"""
from nova.network.api import API

87
nova/network/api.py Normal file
View File

@ -0,0 +1,87 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all requests relating to instances (guest vms).
"""
import logging
from nova import db
from nova import flags
from nova import quota
from nova import rpc
from nova.db import base
FLAGS = flags.FLAGS
class API(base.Base):
"""API for interacting with the network manager."""
def allocate_floating_ip(self, context):
if quota.allowed_floating_ips(context, 1) < 1:
logging.warn(_("Quota exceeeded for %s, tried to allocate "
"address"),
context.project_id)
raise quota.QuotaError(_("Address quota exceeded. You cannot "
"allocate any more addresses"))
# NOTE(vish): We don't know which network host should get the ip
# when we allocate, so just send it to any one. This
# will probably need to move into a network supervisor
# at some point.
return rpc.call(context,
FLAGS.network_topic,
{"method": "allocate_floating_ip",
"args": {"project_id": context.project_id}})
def release_floating_ip(self, context, address):
floating_ip = self.db.floating_ip_get_by_address(context, address)
# NOTE(vish): We don't know which network host should get the ip
# when we deallocate, so just send it to any one. This
# will probably need to move into a network supervisor
# at some point.
rpc.cast(context,
FLAGS.network_topic,
{"method": "deallocate_floating_ip",
"args": {"floating_address": floating_ip['address']}})
def associate_floating_ip(self, context, floating_ip, fixed_ip):
if isinstance(fixed_ip, str) or isinstance(fixed_ip, unicode):
fixed_ip = self.db.fixed_ip_get_by_address(context, fixed_ip)
floating_ip = self.db.floating_ip_get_by_address(context, floating_ip)
# NOTE(vish): Perhaps we should just pass this on to compute and
# let compute communicate with network.
host = fixed_ip['network']['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.network_topic, host),
{"method": "associate_floating_ip",
"args": {"floating_address": floating_ip['address'],
"fixed_address": fixed_ip['address']}})
def disassociate_floating_ip(self, context, address):
floating_ip = self.db.floating_ip_get_by_address(context, address)
if not floating_ip.get('fixed_ip'):
raise exception.ApiError('Address is not associated.')
# NOTE(vish): Get the topic from the host name of the network of
# the associated fixed ip.
host = floating_ip['fixed_ip']['network']['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.network_topic, host),
{"method": "disassociate_floating_ip",
"args": {"floating_address": floating_ip['address']}})

View File

@ -22,7 +22,6 @@ Quotas for instances, volumes, and floating ips
from nova import db
from nova import exception
from nova import flags
from nova.compute import instance_types
FLAGS = flags.FLAGS
@ -63,10 +62,9 @@ def allowed_instances(context, num_instances, instance_type):
quota = get_quota(context, project_id)
allowed_instances = quota['instances'] - used_instances
allowed_cores = quota['cores'] - used_cores
type_cores = instance_types.INSTANCE_TYPES[instance_type]['vcpus']
num_cores = num_instances * type_cores
num_cores = num_instances * instance_type['vcpus']
allowed_instances = min(allowed_instances,
int(allowed_cores // type_cores))
int(allowed_cores // instance_type['vcpus']))
return min(num_instances, allowed_instances)

View File

@ -24,17 +24,21 @@ import inspect
import logging
import os
import sys
import time
from eventlet import event
from eventlet import greenthread
from eventlet import greenpool
from sqlalchemy.exc import OperationalError
from nova import context
from nova import db
from nova import exception
from nova import flags
from nova import rpc
from nova import utils
from nova.db.sqlalchemy import models
FLAGS = flags.FLAGS
@ -204,6 +208,14 @@ class Service(object):
self.model_disconnected = True
logging.exception(_("model server went away"))
try:
models.register_models()
except OperationalError:
logging.exception(_("Data store is unreachable."
" Trying again in %d seconds.") %
FLAGS.sql_retry_interval)
time.sleep(FLAGS.sql_retry_interval)
def serve(*services):
argv = FLAGS(sys.argv)

View File

@ -113,7 +113,7 @@ def stub_out_networking(stubs):
def stub_out_compute_api_snapshot(stubs):
def snapshot(self, context, instance_id, name):
return 123
stubs.Set(nova.compute.api.ComputeAPI, 'snapshot', snapshot)
stubs.Set(nova.compute.API, 'snapshot', snapshot)
def stub_out_glance(stubs, initial_fixtures=[]):

View File

@ -56,8 +56,8 @@ def instance_address(context, instance_id):
def stub_instance(id, user_id=1):
return Instance(id=int(id) + 123456, state=0, image_id=10, user_id=user_id,
display_name='server%s' % id, internal_id=id)
return Instance(id=id, state=0, image_id=10, user_id=user_id,
display_name='server%s' % id)
def fake_compute_api(cls, req, id):
@ -76,8 +76,7 @@ class ServersTest(unittest.TestCase):
fakes.stub_out_key_pair_funcs(self.stubs)
fakes.stub_out_image_service(self.stubs)
self.stubs.Set(nova.db.api, 'instance_get_all', return_servers)
self.stubs.Set(nova.db.api, 'instance_get_by_internal_id',
return_server)
self.stubs.Set(nova.db.api, 'instance_get_by_id', return_server)
self.stubs.Set(nova.db.api, 'instance_get_all_by_user',
return_servers)
self.stubs.Set(nova.db.api, 'instance_add_security_group',
@ -87,18 +86,12 @@ class ServersTest(unittest.TestCase):
instance_address)
self.stubs.Set(nova.db.api, 'instance_get_floating_address',
instance_address)
self.stubs.Set(nova.compute.api.ComputeAPI, 'pause',
fake_compute_api)
self.stubs.Set(nova.compute.api.ComputeAPI, 'unpause',
fake_compute_api)
self.stubs.Set(nova.compute.api.ComputeAPI, 'suspend',
fake_compute_api)
self.stubs.Set(nova.compute.api.ComputeAPI, 'resume',
fake_compute_api)
self.stubs.Set(nova.compute.api.ComputeAPI, "get_diagnostics",
fake_compute_api)
self.stubs.Set(nova.compute.api.ComputeAPI, "get_actions",
fake_compute_api)
self.stubs.Set(nova.compute.API, 'pause', fake_compute_api)
self.stubs.Set(nova.compute.API, 'unpause', fake_compute_api)
self.stubs.Set(nova.compute.API, 'suspend', fake_compute_api)
self.stubs.Set(nova.compute.API, 'resume', fake_compute_api)
self.stubs.Set(nova.compute.API, "get_diagnostics", fake_compute_api)
self.stubs.Set(nova.compute.API, "get_actions", fake_compute_api)
self.allow_admin = FLAGS.allow_admin_api
def tearDown(self):
@ -109,7 +102,7 @@ class ServersTest(unittest.TestCase):
req = webob.Request.blank('/v1.0/servers/1')
res = req.get_response(nova.api.API('os'))
res_dict = json.loads(res.body)
self.assertEqual(res_dict['server']['id'], 1)
self.assertEqual(res_dict['server']['id'], '1')
self.assertEqual(res_dict['server']['name'], 'server1')
def test_get_server_list(self):
@ -126,7 +119,7 @@ class ServersTest(unittest.TestCase):
def test_create_instance(self):
def instance_create(context, inst):
return {'id': 1, 'internal_id': 1, 'display_name': ''}
return {'id': '1', 'display_name': ''}
def server_update(context, id, params):
return instance_create(context, id)

View File

@ -0,0 +1,71 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
#
# Copyright 2010 Cloud.com, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Tests For Hyper-V driver
"""
import random
from nova import context
from nova import db
from nova import flags
from nova import test
from nova.auth import manager
from nova.virt import hyperv
FLAGS = flags.FLAGS
FLAGS.connection_type = 'hyperv'
class HyperVTestCase(test.TestCase):
"""Test cases for the Hyper-V driver"""
def setUp(self):
super(HyperVTestCase, self).setUp()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake',
admin=True)
self.project = self.manager.create_project('fake', 'fake', 'fake')
self.context = context.RequestContext(self.user, self.project)
def test_create_destroy(self):
"""Create a VM and destroy it"""
instance = {'internal_id': random.randint(1, 1000000),
'memory_mb': '1024',
'mac_address': '02:12:34:46:56:67',
'vcpus': 2,
'project_id': 'fake',
'instance_type': 'm1.small'}
instance_ref = db.instance_create(self.context, instance)
conn = hyperv.get_connection(False)
conn._create_vm(instance_ref) # pylint: disable-msg=W0212
found = [n for n in conn.list_instances()
if n == instance_ref['name']]
self.assertTrue(len(found) == 1)
info = conn.get_info(instance_ref['name'])
#Unfortunately since the vm is not running at this point,
#we cannot obtain memory information from get_info
self.assertEquals(info['num_cpu'], instance_ref['vcpus'])
conn.destroy(instance_ref)
found = [n for n in conn.list_instances()
if n == instance_ref['name']]
self.assertTrue(len(found) == 0)
def tearDown(self):
super(HyperVTestCase, self).tearDown()
self.manager.delete_project(self.project)
self.manager.delete_user(self.user)

View File

@ -106,7 +106,7 @@ class CloudTestCase(test.TestCase):
self.cloud.allocate_address(self.context)
inst = db.instance_create(self.context, {'host': FLAGS.host})
fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
ec2_id = cloud.id_to_ec2_id(inst['id'])
self.cloud.associate_address(self.context,
instance_id=ec2_id,
public_ip=address)
@ -127,9 +127,9 @@ class CloudTestCase(test.TestCase):
result = self.cloud.describe_volumes(self.context)
self.assertEqual(len(result['volumeSet']), 2)
result = self.cloud.describe_volumes(self.context,
volume_id=[vol2['ec2_id']])
volume_id=[vol2['id']])
self.assertEqual(len(result['volumeSet']), 1)
self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['ec2_id'])
self.assertEqual(result['volumeSet'][0]['volumeId'], vol2['id'])
db.volume_destroy(self.context, vol1['id'])
db.volume_destroy(self.context, vol2['id'])
@ -140,15 +140,16 @@ class CloudTestCase(test.TestCase):
kwargs = {'image_id': image_id,
'instance_type': instance_type,
'max_count': max_count}
rv = yield self.cloud.run_instances(self.context, **kwargs)
rv = self.cloud.run_instances(self.context, **kwargs)
print rv
instance_id = rv['instancesSet'][0]['instanceId']
output = yield self.cloud.get_console_output(context=self.context,
output = self.cloud.get_console_output(context=self.context,
instance_id=[instance_id])
self.assertEquals(b64decode(output['output']), 'FAKE CONSOLE OUTPUT')
# TODO(soren): We need this until we can stop polling in the rpc code
# for unit tests.
greenthread.sleep(0.3)
rv = yield self.cloud.terminate_instances(self.context, [instance_id])
rv = self.cloud.terminate_instances(self.context, [instance_id])
def test_key_generation(self):
result = self._create_key('test')
@ -186,7 +187,7 @@ class CloudTestCase(test.TestCase):
kwargs = {'image_id': image_id,
'instance_type': instance_type,
'max_count': max_count}
rv = yield self.cloud.run_instances(self.context, **kwargs)
rv = self.cloud.run_instances(self.context, **kwargs)
# TODO: check for proper response
instance_id = rv['reservationSet'][0].keys()[0]
instance = rv['reservationSet'][0][instance_id][0]
@ -209,7 +210,7 @@ class CloudTestCase(test.TestCase):
for instance in reservations[reservations.keys()[0]]:
instance_id = instance['instance_id']
logging.debug("Terminating instance %s" % instance_id)
rv = yield self.compute.terminate_instance(instance_id)
rv = self.compute.terminate_instance(instance_id)
def test_instance_update_state(self):
def instance(num):
@ -296,7 +297,7 @@ class CloudTestCase(test.TestCase):
def test_update_of_instance_display_fields(self):
inst = db.instance_create(self.context, {})
ec2_id = cloud.internal_id_to_ec2_id(inst['internal_id'])
ec2_id = cloud.id_to_ec2_id(inst['id'])
self.cloud.update_instance(self.context, ec2_id,
display_name='c00l 1m4g3')
inst = db.instance_get(self.context, inst['id'])

View File

@ -22,6 +22,7 @@ Tests For Compute
import datetime
import logging
from nova import compute
from nova import context
from nova import db
from nova import exception
@ -29,7 +30,6 @@ from nova import flags
from nova import test
from nova import utils
from nova.auth import manager
from nova.compute import api as compute_api
FLAGS = flags.FLAGS
@ -44,7 +44,7 @@ class ComputeTestCase(test.TestCase):
stub_network=True,
network_manager='nova.network.manager.FlatManager')
self.compute = utils.import_object(FLAGS.compute_manager)
self.compute_api = compute_api.ComputeAPI()
self.compute_api = compute.API()
self.manager = manager.AuthManager()
self.user = self.manager.create_user('fake', 'fake', 'fake')
self.project = self.manager.create_project('fake', 'fake', 'fake')
@ -72,7 +72,7 @@ class ComputeTestCase(test.TestCase):
"""Verify that an instance cannot be created without a display_name."""
cases = [dict(), dict(display_name=None)]
for instance in cases:
ref = self.compute_api.create_instances(self.context,
ref = self.compute_api.create(self.context,
FLAGS.default_instance_type, None, **instance)
try:
self.assertNotEqual(ref[0].display_name, None)
@ -80,13 +80,13 @@ class ComputeTestCase(test.TestCase):
db.instance_destroy(self.context, ref[0]['id'])
def test_create_instance_associates_security_groups(self):
"""Make sure create_instances associates security groups"""
"""Make sure create associates security groups"""
values = {'name': 'default',
'description': 'default',
'user_id': self.user.id,
'project_id': self.project.id}
group = db.security_group_create(self.context, values)
ref = self.compute_api.create_instances(self.context,
ref = self.compute_api.create(self.context,
FLAGS.default_instance_type, None, security_group=['default'])
try:
self.assertEqual(len(ref[0]['security_groups']), 1)
@ -178,3 +178,22 @@ class ComputeTestCase(test.TestCase):
self.context,
instance_id)
self.compute.terminate_instance(self.context, instance_id)
def test_lock(self):
"""ensure locked instance cannot be changed"""
instance_id = self._create_instance()
self.compute.run_instance(self.context, instance_id)
non_admin_context = context.RequestContext(None, None, False, False)
# decorator should return False (fail) with locked nonadmin context
self.compute.lock_instance(self.context, instance_id)
ret_val = self.compute.reboot_instance(non_admin_context, instance_id)
self.assertEqual(ret_val, False)
# decorator should return None (success) with unlocked nonadmin context
self.compute.unlock_instance(self.context, instance_id)
ret_val = self.compute.reboot_instance(non_admin_context, instance_id)
self.assertEqual(ret_val, None)
self.compute.terminate_instance(self.context, instance_id)

View File

@ -27,6 +27,7 @@ from nova import test
from nova import utils
from nova.auth import manager
from nova.api.ec2 import cloud
from nova.compute import instance_types
FLAGS = flags.FLAGS
@ -78,14 +79,17 @@ class QuotaTestCase(test.TestCase):
def test_quota_overrides(self):
"""Make sure overriding a projects quotas works"""
num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
num_instances = quota.allowed_instances(self.context, 100,
instance_types.INSTANCE_TYPES['m1.small'])
self.assertEqual(num_instances, 2)
db.quota_create(self.context, {'project_id': self.project.id,
'instances': 10})
num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
num_instances = quota.allowed_instances(self.context, 100,
instance_types.INSTANCE_TYPES['m1.small'])
self.assertEqual(num_instances, 4)
db.quota_update(self.context, self.project.id, {'cores': 100})
num_instances = quota.allowed_instances(self.context, 100, 'm1.small')
num_instances = quota.allowed_instances(self.context, 100,
instance_types.INSTANCE_TYPES['m1.small'])
self.assertEqual(num_instances, 10)
db.quota_destroy(self.context, self.project.id)

View File

@ -79,8 +79,8 @@ class XenAPIVolumeTestCase(test.TestCase):
helper = volume_utils.VolumeHelper
helper.XenAPI = session.get_imported_xenapi()
vol = self._create_volume()
info = helper.parse_volume_info(vol['ec2_id'], '/dev/sdc')
label = 'SR-%s' % vol['ec2_id']
info = helper.parse_volume_info(vol['id'], '/dev/sdc')
label = 'SR-%s' % vol['id']
description = 'Test-SR'
sr_ref = helper.create_iscsi_storage(session, info, label, description)
srs = xenapi_fake.get_all('SR')
@ -97,7 +97,7 @@ class XenAPIVolumeTestCase(test.TestCase):
# oops, wrong mount point!
self.assertRaises(volume_utils.StorageError,
helper.parse_volume_info,
vol['ec2_id'],
vol['id'],
'/dev/sd')
db.volume_destroy(context.get_admin_context(), vol['id'])
@ -108,8 +108,7 @@ class XenAPIVolumeTestCase(test.TestCase):
volume = self._create_volume()
instance = db.instance_create(self.values)
xenapi_fake.create_vm(instance.name, 'Running')
result = conn.attach_volume(instance.name, volume['ec2_id'],
'/dev/sdc')
result = conn.attach_volume(instance.name, volume['id'], '/dev/sdc')
def check():
# check that the VM has a VBD attached to it
@ -134,7 +133,7 @@ class XenAPIVolumeTestCase(test.TestCase):
self.assertRaises(Exception,
conn.attach_volume,
instance.name,
volume['ec2_id'],
volume['id'],
'/dev/sdc')
def tearDown(self):
@ -250,15 +249,16 @@ class XenAPIVMTestCase(test.TestCase):
def _create_instance(self):
"""Creates and spawns a test instance"""
values = {'name': 1, 'id': 1,
'project_id': self.project.id,
'user_id': self.user.id,
'image_id': 1,
'kernel_id': 2,
'ramdisk_id': 3,
'instance_type': 'm1.large',
'mac_address': 'aa:bb:cc:dd:ee:ff'
}
values = {
'name': 1,
'id': 1,
'project_id': self.project.id,
'user_id': self.user.id,
'image_id': 1,
'kernel_id': 2,
'ramdisk_id': 3,
'instance_type': 'm1.large',
'mac_address': 'aa:bb:cc:dd:ee:ff'}
instance = db.instance_create(values)
self.conn.spawn(instance)
return instance

View File

@ -26,6 +26,7 @@ from nova import flags
from nova.virt import fake
from nova.virt import libvirt_conn
from nova.virt import xenapi_conn
from nova.virt import hyperv
FLAGS = flags.FLAGS
@ -62,6 +63,8 @@ def get_connection(read_only=False):
conn = libvirt_conn.get_connection(read_only)
elif t == 'xenapi':
conn = xenapi_conn.get_connection(read_only)
elif t == 'hyperv':
conn = hyperv.get_connection(read_only)
else:
raise Exception('Unknown connection type "%s"' % t)

459
nova/virt/hyperv.py Normal file
View File

@ -0,0 +1,459 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Cloud.com, Inc
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
A connection to Hyper-V .
Uses Windows Management Instrumentation (WMI) calls to interact with Hyper-V
Hyper-V WMI usage:
http://msdn.microsoft.com/en-us/library/cc723875%28v=VS.85%29.aspx
The Hyper-V object model briefly:
The physical computer and its hosted virtual machines are each represented
by the Msvm_ComputerSystem class.
Each virtual machine is associated with a
Msvm_VirtualSystemGlobalSettingData (vs_gs_data) instance and one or more
Msvm_VirtualSystemSettingData (vmsetting) instances. For each vmsetting
there is a series of Msvm_ResourceAllocationSettingData (rasd) objects.
The rasd objects describe the settings for each device in a VM.
Together, the vs_gs_data, vmsettings and rasds describe the configuration
of the virtual machine.
Creating new resources such as disks and nics involves cloning a default
rasd object and appropriately modifying the clone and calling the
AddVirtualSystemResources WMI method
Changing resources such as memory uses the ModifyVirtualSystemResources
WMI method
Using the Python WMI library:
Tutorial:
http://timgolden.me.uk/python/wmi/tutorial.html
Hyper-V WMI objects can be retrieved simply by using the class name
of the WMI object and optionally specifying a column to filter the
result set. More complex filters can be formed using WQL (sql-like)
queries.
The parameters and return tuples of WMI method calls can gleaned by
examining the doc string. For example:
>>> vs_man_svc.ModifyVirtualSystemResources.__doc__
ModifyVirtualSystemResources (ComputerSystem, ResourceSettingData[])
=> (Job, ReturnValue)'
When passing setting data (ResourceSettingData) to the WMI method,
an XML representation of the data is passed in using GetText_(1).
Available methods on a service can be determined using method.keys():
>>> vs_man_svc.methods.keys()
vmsettings and rasds for a vm can be retrieved using the 'associators'
method with the appropriate return class.
Long running WMI commands generally return a Job (an instance of
Msvm_ConcreteJob) whose state can be polled to determine when it finishes
"""
import os
import logging
import time
from nova import exception
from nova import flags
from nova.auth import manager
from nova.compute import power_state
from nova.virt import images
wmi = None
FLAGS = flags.FLAGS
HYPERV_POWER_STATE = {
3: power_state.SHUTDOWN,
2: power_state.RUNNING,
32768: power_state.PAUSED,
}
REQ_POWER_STATE = {
'Enabled': 2,
'Disabled': 3,
'Reboot': 10,
'Reset': 11,
'Paused': 32768,
'Suspended': 32769
}
WMI_JOB_STATUS_STARTED = 4096
WMI_JOB_STATE_RUNNING = 4
WMI_JOB_STATE_COMPLETED = 7
def get_connection(_):
global wmi
if wmi is None:
wmi = __import__('wmi')
return HyperVConnection()
class HyperVConnection(object):
def __init__(self):
self._conn = wmi.WMI(moniker='//./root/virtualization')
self._cim_conn = wmi.WMI(moniker='//./root/cimv2')
def init_host(self):
#FIXME(chiradeep): implement this
logging.debug(_('In init host'))
pass
def list_instances(self):
""" Return the names of all the instances known to Hyper-V. """
vms = [v.ElementName \
for v in self._conn.Msvm_ComputerSystem(['ElementName'])]
return vms
def spawn(self, instance):
""" Create a new VM and start it."""
vm = self._lookup(instance.name)
if vm is not None:
raise exception.Duplicate(_('Attempt to create duplicate vm %s') %
instance.name)
user = manager.AuthManager().get_user(instance['user_id'])
project = manager.AuthManager().get_project(instance['project_id'])
#Fetch the file, assume it is a VHD file.
base_vhd_filename = os.path.join(FLAGS.instances_path,
instance.name)
vhdfile = "%s.vhd" % (base_vhd_filename)
images.fetch(instance['image_id'], vhdfile, user, project)
try:
self._create_vm(instance)
self._create_disk(instance['name'], vhdfile)
self._create_nic(instance['name'], instance['mac_address'])
logging.debug(_('Starting VM %s '), instance.name)
self._set_vm_state(instance['name'], 'Enabled')
logging.info(_('Started VM %s '), instance.name)
except Exception as exn:
logging.error(_('spawn vm failed: %s'), exn)
self.destroy(instance)
def _create_vm(self, instance):
"""Create a VM but don't start it. """
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
vs_gs_data = self._conn.Msvm_VirtualSystemGlobalSettingData.new()
vs_gs_data.ElementName = instance['name']
(job, ret_val) = vs_man_svc.DefineVirtualSystem(
[], None, vs_gs_data.GetText_(1))[1:]
if ret_val == WMI_JOB_STATUS_STARTED:
success = self._check_job_status(job)
else:
success = (ret_val == 0)
if not success:
raise Exception(_('Failed to create VM %s'), instance.name)
logging.debug(_('Created VM %s...'), instance.name)
vm = self._conn.Msvm_ComputerSystem(ElementName=instance.name)[0]
vmsettings = vm.associators(
wmi_result_class='Msvm_VirtualSystemSettingData')
vmsetting = [s for s in vmsettings
if s.SettingType == 3][0] # avoid snapshots
memsetting = vmsetting.associators(
wmi_result_class='Msvm_MemorySettingData')[0]
#No Dynamic Memory, so reservation, limit and quantity are identical.
mem = long(str(instance['memory_mb']))
memsetting.VirtualQuantity = mem
memsetting.Reservation = mem
memsetting.Limit = mem
(job, ret_val) = vs_man_svc.ModifyVirtualSystemResources(
vm.path_(), [memsetting.GetText_(1)])
logging.debug(_('Set memory for vm %s...'), instance.name)
procsetting = vmsetting.associators(
wmi_result_class='Msvm_ProcessorSettingData')[0]
vcpus = long(instance['vcpus'])
procsetting.VirtualQuantity = vcpus
procsetting.Reservation = vcpus
procsetting.Limit = vcpus
(job, ret_val) = vs_man_svc.ModifyVirtualSystemResources(
vm.path_(), [procsetting.GetText_(1)])
logging.debug(_('Set vcpus for vm %s...'), instance.name)
def _create_disk(self, vm_name, vhdfile):
"""Create a disk and attach it to the vm"""
logging.debug(_('Creating disk for %s by attaching disk file %s'),
vm_name, vhdfile)
#Find the IDE controller for the vm.
vms = self._conn.MSVM_ComputerSystem(ElementName=vm_name)
vm = vms[0]
vmsettings = vm.associators(
wmi_result_class='Msvm_VirtualSystemSettingData')
rasds = vmsettings[0].associators(
wmi_result_class='MSVM_ResourceAllocationSettingData')
ctrller = [r for r in rasds
if r.ResourceSubType == 'Microsoft Emulated IDE Controller'\
and r.Address == "0"]
#Find the default disk drive object for the vm and clone it.
diskdflt = self._conn.query(
"SELECT * FROM Msvm_ResourceAllocationSettingData \
WHERE ResourceSubType LIKE 'Microsoft Synthetic Disk Drive'\
AND InstanceID LIKE '%Default%'")[0]
diskdrive = self._clone_wmi_obj(
'Msvm_ResourceAllocationSettingData', diskdflt)
#Set the IDE ctrller as parent.
diskdrive.Parent = ctrller[0].path_()
diskdrive.Address = 0
#Add the cloned disk drive object to the vm.
new_resources = self._add_virt_resource(diskdrive, vm)
if new_resources is None:
raise Exception(_('Failed to add diskdrive to VM %s'),
vm_name)
diskdrive_path = new_resources[0]
logging.debug(_('New disk drive path is %s'), diskdrive_path)
#Find the default VHD disk object.
vhddefault = self._conn.query(
"SELECT * FROM Msvm_ResourceAllocationSettingData \
WHERE ResourceSubType LIKE 'Microsoft Virtual Hard Disk' AND \
InstanceID LIKE '%Default%' ")[0]
#Clone the default and point it to the image file.
vhddisk = self._clone_wmi_obj(
'Msvm_ResourceAllocationSettingData', vhddefault)
#Set the new drive as the parent.
vhddisk.Parent = diskdrive_path
vhddisk.Connection = [vhdfile]
#Add the new vhd object as a virtual hard disk to the vm.
new_resources = self._add_virt_resource(vhddisk, vm)
if new_resources is None:
raise Exception(_('Failed to add vhd file to VM %s'),
vm_name)
logging.info(_('Created disk for %s'), vm_name)
def _create_nic(self, vm_name, mac):
"""Create a (emulated) nic and attach it to the vm"""
logging.debug(_('Creating nic for %s '), vm_name)
#Find the vswitch that is connected to the physical nic.
vms = self._conn.Msvm_ComputerSystem(ElementName=vm_name)
extswitch = self._find_external_network()
vm = vms[0]
switch_svc = self._conn.Msvm_VirtualSwitchManagementService()[0]
#Find the default nic and clone it to create a new nic for the vm.
#Use Msvm_SyntheticEthernetPortSettingData for Windows or Linux with
#Linux Integration Components installed.
emulatednics_data = self._conn.Msvm_EmulatedEthernetPortSettingData()
default_nic_data = [n for n in emulatednics_data
if n.InstanceID.rfind('Default') > 0]
new_nic_data = self._clone_wmi_obj(
'Msvm_EmulatedEthernetPortSettingData',
default_nic_data[0])
#Create a port on the vswitch.
(new_port, ret_val) = switch_svc.CreateSwitchPort(vm_name, vm_name,
"", extswitch.path_())
if ret_val != 0:
logging.error(_('Failed creating a port on the external vswitch'))
raise Exception(_('Failed creating port for %s'),
vm_name)
logging.debug(_("Created switch port %s on switch %s"),
vm_name, extswitch.path_())
#Connect the new nic to the new port.
new_nic_data.Connection = [new_port]
new_nic_data.ElementName = vm_name + ' nic'
new_nic_data.Address = ''.join(mac.split(':'))
new_nic_data.StaticMacAddress = 'TRUE'
#Add the new nic to the vm.
new_resources = self._add_virt_resource(new_nic_data, vm)
if new_resources is None:
raise Exception(_('Failed to add nic to VM %s'),
vm_name)
logging.info(_("Created nic for %s "), vm_name)
def _add_virt_resource(self, res_setting_data, target_vm):
"""Add a new resource (disk/nic) to the VM"""
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
(job, new_resources, ret_val) = vs_man_svc.\
AddVirtualSystemResources([res_setting_data.GetText_(1)],
target_vm.path_())
success = True
if ret_val == WMI_JOB_STATUS_STARTED:
success = self._check_job_status(job)
else:
success = (ret_val == 0)
if success:
return new_resources
else:
return None
#TODO: use the reactor to poll instead of sleep
def _check_job_status(self, jobpath):
"""Poll WMI job state for completion"""
#Jobs have a path of the form:
#\\WIN-P5IG7367DAG\root\virtualization:Msvm_ConcreteJob.InstanceID=
#"8A496B9C-AF4D-4E98-BD3C-1128CD85320D"
inst_id = jobpath.split('=')[1].strip('"')
jobs = self._conn.Msvm_ConcreteJob(InstanceID=inst_id)
if len(jobs) == 0:
return False
job = jobs[0]
while job.JobState == WMI_JOB_STATE_RUNNING:
time.sleep(0.1)
job = self._conn.Msvm_ConcreteJob(InstanceID=inst_id)[0]
if job.JobState != WMI_JOB_STATE_COMPLETED:
logging.debug(_("WMI job failed: %s"), job.ErrorSummaryDescription)
return False
logging.debug(_("WMI job succeeded: %s, Elapsed=%s "), job.Description,
job.ElapsedTime)
return True
def _find_external_network(self):
"""Find the vswitch that is connected to the physical nic.
Assumes only one physical nic on the host
"""
#If there are no physical nics connected to networks, return.
bound = self._conn.Msvm_ExternalEthernetPort(IsBound='TRUE')
if len(bound) == 0:
return None
return self._conn.Msvm_ExternalEthernetPort(IsBound='TRUE')[0]\
.associators(wmi_result_class='Msvm_SwitchLANEndpoint')[0]\
.associators(wmi_result_class='Msvm_SwitchPort')[0]\
.associators(wmi_result_class='Msvm_VirtualSwitch')[0]
def _clone_wmi_obj(self, wmi_class, wmi_obj):
"""Clone a WMI object"""
cl = self._conn.__getattr__(wmi_class) # get the class
newinst = cl.new()
#Copy the properties from the original.
for prop in wmi_obj._properties:
newinst.Properties_.Item(prop).Value =\
wmi_obj.Properties_.Item(prop).Value
return newinst
def reboot(self, instance):
"""Reboot the specified instance."""
vm = self._lookup(instance.name)
if vm is None:
raise exception.NotFound('instance not present %s' % instance.name)
self._set_vm_state(instance.name, 'Reboot')
def destroy(self, instance):
"""Destroy the VM. Also destroy the associated VHD disk files"""
logging.debug(_("Got request to destroy vm %s"), instance.name)
vm = self._lookup(instance.name)
if vm is None:
return
vm = self._conn.Msvm_ComputerSystem(ElementName=instance.name)[0]
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
#Stop the VM first.
self._set_vm_state(instance.name, 'Disabled')
vmsettings = vm.associators(
wmi_result_class='Msvm_VirtualSystemSettingData')
rasds = vmsettings[0].associators(
wmi_result_class='MSVM_ResourceAllocationSettingData')
disks = [r for r in rasds \
if r.ResourceSubType == 'Microsoft Virtual Hard Disk']
diskfiles = []
#Collect disk file information before destroying the VM.
for disk in disks:
diskfiles.extend([c for c in disk.Connection])
#Nuke the VM. Does not destroy disks.
(job, ret_val) = vs_man_svc.DestroyVirtualSystem(vm.path_())
if ret_val == WMI_JOB_STATUS_STARTED:
success = self._check_job_status(job)
elif ret_val == 0:
success = True
if not success:
raise Exception(_('Failed to destroy vm %s') % instance.name)
#Delete associated vhd disk files.
for disk in diskfiles:
vhdfile = self._cim_conn.CIM_DataFile(Name=disk)
for vf in vhdfile:
vf.Delete()
logging.debug(_("Del: disk %s vm %s"), vhdfile, instance.name)
def get_info(self, instance_id):
"""Get information about the VM"""
vm = self._lookup(instance_id)
if vm is None:
raise exception.NotFound('instance not present %s' % instance_id)
vm = self._conn.Msvm_ComputerSystem(ElementName=instance_id)[0]
vs_man_svc = self._conn.Msvm_VirtualSystemManagementService()[0]
vmsettings = vm.associators(
wmi_result_class='Msvm_VirtualSystemSettingData')
settings_paths = [v.path_() for v in vmsettings]
#See http://msdn.microsoft.com/en-us/library/cc160706%28VS.85%29.aspx
summary_info = vs_man_svc.GetSummaryInformation(
[4, 100, 103, 105], settings_paths)[1]
info = summary_info[0]
logging.debug(_("Got Info for vm %s: state=%s, mem=%s, num_cpu=%s, \
cpu_time=%s"), instance_id,
str(HYPERV_POWER_STATE[info.EnabledState]),
str(info.MemoryUsage),
str(info.NumberOfProcessors),
str(info.UpTime))
return {'state': HYPERV_POWER_STATE[info.EnabledState],
'max_mem': info.MemoryUsage,
'mem': info.MemoryUsage,
'num_cpu': info.NumberOfProcessors,
'cpu_time': info.UpTime}
def _lookup(self, i):
vms = self._conn.Msvm_ComputerSystem(ElementName=i)
n = len(vms)
if n == 0:
return None
elif n > 1:
raise Exception(_('duplicate name found: %s') % i)
else:
return vms[0].ElementName
def _set_vm_state(self, vm_name, req_state):
"""Set the desired state of the VM"""
vms = self._conn.Msvm_ComputerSystem(ElementName=vm_name)
if len(vms) == 0:
return False
(job, ret_val) = vms[0].RequestStateChange(REQ_POWER_STATE[req_state])
success = False
if ret_val == WMI_JOB_STATUS_STARTED:
success = self._check_job_status(job)
elif ret_val == 0:
success = True
elif ret_val == 32775:
#Invalid state for current operation. Typically means it is
#already in the state requested
success = True
if success:
logging.info(_("Successfully changed vm state of %s to %s"),
vm_name, req_state)
else:
logging.error(_("Failed to change vm state of %s to %s"),
vm_name, req_state)
raise Exception(_("Failed to change vm state of %s to %s"),
vm_name, req_state)
def attach_volume(self, instance_name, device_path, mountpoint):
vm = self._lookup(instance_name)
if vm is None:
raise exception.NotFound('Cannot attach volume to missing %s vm' %
instance_name)
def detach_volume(self, instance_name, mountpoint):
vm = self._lookup(instance_name)
if vm is None:
raise exception.NotFound('Cannot detach volume from missing %s ' %
instance_name)

View File

@ -21,8 +21,12 @@
Handling of VM disk images.
"""
import logging
import os.path
import shutil
import sys
import time
import urllib2
import urlparse
from nova import flags
@ -45,6 +49,25 @@ def fetch(image, path, user, project):
return f(image, path, user, project)
def _fetch_image_no_curl(url, path, headers):
request = urllib2.Request(url)
for (k, v) in headers.iteritems():
request.add_header(k, v)
def urlretrieve(urlfile, fpath):
chunk = 1 * 1024 * 1024
f = open(fpath, "wb")
while 1:
data = urlfile.read(chunk)
if not data:
break
f.write(data)
urlopened = urllib2.urlopen(request)
urlretrieve(urlopened, path)
logging.debug(_("Finished retreving %s -- placed in %s"), url, path)
def _fetch_s3_image(image, path, user, project):
url = image_url(image)
@ -61,18 +84,24 @@ def _fetch_s3_image(image, path, user, project):
url_path)
headers['Authorization'] = 'AWS %s:%s' % (access, signature)
cmd = ['/usr/bin/curl', '--fail', '--silent', url]
for (k, v) in headers.iteritems():
cmd += ['-H', '"%s: %s"' % (k, v)]
if sys.platform.startswith('win'):
return _fetch_image_no_curl(url, path, headers)
else:
cmd = ['/usr/bin/curl', '--fail', '--silent', url]
for (k, v) in headers.iteritems():
cmd += ['-H', '%s: %s' % (k, v)]
cmd += ['-o', path]
cmd_out = ' '.join(cmd)
return utils.execute(cmd_out)
cmd += ['-o', path]
cmd_out = ' '.join(cmd)
return utils.execute(cmd_out)
def _fetch_local_image(image, path, user, project):
source = _image_path('%s/image' % image)
return utils.execute('cp %s %s' % (source, path))
source = _image_path(os.path.join(image, 'image'))
if sys.platform.startswith('win'):
return shutil.copy(source, path)
else:
return utils.execute('cp %s %s' % (source, path))
def _image_path(path):

View File

@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
# Copyright 2010 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -18,6 +19,7 @@
Management class for VM-related functions (spawn, reboot, etc).
"""
import json
import logging
from nova import db
@ -36,7 +38,6 @@ class VMOps(object):
"""
Management class for VM-related tasks
"""
def __init__(self, session):
self.XenAPI = session.get_imported_xenapi()
self._session = session
@ -120,6 +121,20 @@ class VMOps(object):
timer.f = _wait_for_boot
return timer.start(interval=0.5, now=True)
def _get_vm_opaque_ref(self, instance_or_vm):
"""Refactored out the common code of many methods that receive either
a vm name or a vm instance, and want a vm instance in return.
"""
try:
instance_name = instance_or_vm.name
vm = VMHelper.lookup(self._session, instance_name)
except AttributeError:
# A vm opaque ref was passed
vm = instance_or_vm
if vm is None:
raise Exception(_('Instance not present %s') % instance_name)
return vm
def snapshot(self, instance, name):
""" Create snapshot from a running VM instance
@ -168,11 +183,7 @@ class VMOps(object):
def reboot(self, instance):
"""Reboot VM instance"""
instance_name = instance.name
vm = VMHelper.lookup(self._session, instance_name)
if vm is None:
raise exception.NotFound(_('instance not'
' found %s') % instance_name)
vm = self._get_vm_opaque_ref(instance)
task = self._session.call_xenapi('Async.VM.clean_reboot', vm)
self._session.wait_for_task(instance.id, task)
@ -215,27 +226,19 @@ class VMOps(object):
ret = None
try:
ret = self._session.wait_for_task(instance_id, task)
except XenAPI.Failure, exc:
except self.XenAPI.Failure, exc:
logging.warn(exc)
callback(ret)
def pause(self, instance, callback):
"""Pause VM instance"""
instance_name = instance.name
vm = VMHelper.lookup(self._session, instance_name)
if vm is None:
raise exception.NotFound(_('Instance not'
' found %s') % instance_name)
vm = self._get_vm_opaque_ref(instance)
task = self._session.call_xenapi('Async.VM.pause', vm)
self._wait_with_callback(instance.id, task, callback)
def unpause(self, instance, callback):
"""Unpause VM instance"""
instance_name = instance.name
vm = VMHelper.lookup(self._session, instance_name)
if vm is None:
raise exception.NotFound(_('Instance not'
' found %s') % instance_name)
vm = self._get_vm_opaque_ref(instance)
task = self._session.call_xenapi('Async.VM.unpause', vm)
self._wait_with_callback(instance.id, task, callback)
@ -270,10 +273,7 @@ class VMOps(object):
def get_diagnostics(self, instance):
"""Return data about VM diagnostics"""
vm = VMHelper.lookup(self._session, instance.name)
if vm is None:
raise exception.NotFound(_("Instance not found %s") %
instance.name)
vm = self._get_vm_opaque_ref(instance)
rec = self._session.get_xenapi().VM.get_record(vm)
return VMHelper.compile_diagnostics(self._session, rec)
@ -281,3 +281,175 @@ class VMOps(object):
"""Return snapshot of console"""
# TODO: implement this to fix pylint!
return 'FAKE CONSOLE OUTPUT of instance'
def list_from_xenstore(self, vm, path):
"""Runs the xenstore-ls command to get a listing of all records
from 'path' downward. Returns a dict with the sub-paths as keys,
and the value stored in those paths as values. If nothing is
found at that path, returns None.
"""
ret = self._make_xenstore_call('list_records', vm, path)
return json.loads(ret)
def read_from_xenstore(self, vm, path):
"""Returns the value stored in the xenstore record for the given VM
at the specified location. A XenAPIPlugin.PluginError will be raised
if any error is encountered in the read process.
"""
try:
ret = self._make_xenstore_call('read_record', vm, path,
{'ignore_missing_path': 'True'})
except self.XenAPI.Failure, e:
return None
ret = json.loads(ret)
if ret == "None":
# Can't marshall None over RPC calls.
return None
return ret
def write_to_xenstore(self, vm, path, value):
"""Writes the passed value to the xenstore record for the given VM
at the specified location. A XenAPIPlugin.PluginError will be raised
if any error is encountered in the write process.
"""
return self._make_xenstore_call('write_record', vm, path,
{'value': json.dumps(value)})
def clear_xenstore(self, vm, path):
"""Deletes the VM's xenstore record for the specified path.
If there is no such record, the request is ignored.
"""
self._make_xenstore_call('delete_record', vm, path)
def _make_xenstore_call(self, method, vm, path, addl_args={}):
"""Handles calls to the xenstore xenapi plugin."""
return self._make_plugin_call('xenstore.py', method=method, vm=vm,
path=path, addl_args=addl_args)
def _make_plugin_call(self, plugin, method, vm, path, addl_args={}):
"""Abstracts out the process of calling a method of a xenapi plugin.
Any errors raised by the plugin will in turn raise a RuntimeError here.
"""
vm = self._get_vm_opaque_ref(vm)
rec = self._session.get_xenapi().VM.get_record(vm)
args = {'dom_id': rec['domid'], 'path': path}
args.update(addl_args)
# If the 'testing_mode' attribute is set, add that to the args.
if getattr(self, 'testing_mode', False):
args['testing_mode'] = 'true'
try:
task = self._session.async_call_plugin(plugin, method, args)
ret = self._session.wait_for_task(0, task)
except self.XenAPI.Failure, e:
raise RuntimeError("%s" % e.details[-1])
return ret
def add_to_xenstore(self, vm, path, key, value):
"""Adds the passed key/value pair to the xenstore record for
the given VM at the specified location. A XenAPIPlugin.PluginError
will be raised if any error is encountered in the write process.
"""
current = self.read_from_xenstore(vm, path)
if not current:
# Nothing at that location
current = {key: value}
else:
current[key] = value
self.write_to_xenstore(vm, path, current)
def remove_from_xenstore(self, vm, path, key_or_keys):
"""Takes either a single key or a list of keys and removes
them from the xenstoreirecord data for the given VM.
If the key doesn't exist, the request is ignored.
"""
current = self.list_from_xenstore(vm, path)
if not current:
return
if isinstance(key_or_keys, basestring):
keys = [key_or_keys]
else:
keys = key_or_keys
keys.sort(lambda x, y: cmp(y.count('/'), x.count('/')))
for key in keys:
if path:
keypath = "%s/%s" % (path, key)
else:
keypath = key
self._make_xenstore_call('delete_record', vm, keypath)
########################################################################
###### The following methods interact with the xenstore parameter
###### record, not the live xenstore. They were created before I
###### knew the difference, and are left in here in case they prove
###### to be useful. They all have '_param' added to their method
###### names to distinguish them. (dabo)
########################################################################
def read_partial_from_param_xenstore(self, instance_or_vm, key_prefix):
"""Returns a dict of all the keys in the xenstore parameter record
for the given instance that begin with the key_prefix.
"""
data = self.read_from_param_xenstore(instance_or_vm)
badkeys = [k for k in data.keys()
if not k.startswith(key_prefix)]
for badkey in badkeys:
del data[badkey]
return data
def read_from_param_xenstore(self, instance_or_vm, keys=None):
"""Returns the xenstore parameter record data for the specified VM
instance as a dict. Accepts an optional key or list of keys; if a
value for 'keys' is passed, the returned dict is filtered to only
return the values for those keys.
"""
vm = self._get_vm_opaque_ref(instance_or_vm)
data = self._session.call_xenapi_request('VM.get_xenstore_data',
(vm, ))
ret = {}
if keys is None:
keys = data.keys()
elif isinstance(keys, basestring):
keys = [keys]
for key in keys:
raw = data.get(key)
if raw:
ret[key] = json.loads(raw)
else:
ret[key] = raw
return ret
def add_to_param_xenstore(self, instance_or_vm, key, val):
"""Takes a key/value pair and adds it to the xenstore parameter
record for the given vm instance. If the key exists in xenstore,
it is overwritten"""
vm = self._get_vm_opaque_ref(instance_or_vm)
self.remove_from_param_xenstore(instance_or_vm, key)
jsonval = json.dumps(val)
self._session.call_xenapi_request('VM.add_to_xenstore_data',
(vm, key, jsonval))
def write_to_param_xenstore(self, instance_or_vm, mapping):
"""Takes a dict and writes each key/value pair to the xenstore
parameter record for the given vm instance. Any existing data for
those keys is overwritten.
"""
for k, v in mapping.iteritems():
self.add_to_param_xenstore(instance_or_vm, k, v)
def remove_from_param_xenstore(self, instance_or_vm, key_or_keys):
"""Takes either a single key or a list of keys and removes
them from the xenstore parameter record data for the given VM.
If the key doesn't exist, the request is ignored.
"""
vm = self._get_vm_opaque_ref(instance_or_vm)
if isinstance(key_or_keys, basestring):
keys = [key_or_keys]
else:
keys = key_or_keys
for key in keys:
self._session.call_xenapi_request('VM.remove_from_xenstore_data',
(vm, key))
def clear_param_xenstore(self, instance_or_vm):
"""Removes all data from the xenstore parameter record for this VM."""
self.write_to_param_xenstore(instance_or_vm, {})
########################################################################

View File

@ -200,15 +200,19 @@ class VolumeHelper(HelperBase):
return -1
def _get_volume_id(path):
def _get_volume_id(path_or_id):
"""Retrieve the volume id from device_path"""
# If we have the ID and not a path, just return it.
if isinstance(path_or_id, int):
return path_or_id
# n must contain at least the volume_id
# /vol- is for remote volumes
# -vol- is for local volumes
# see compute/manager->setup_compute_volume
volume_id = path[path.find('/vol-') + 1:]
if volume_id == path:
volume_id = path[path.find('-vol-') + 1:].replace('--', '-')
volume_id = path_or_id[path_or_id.find('/vol-') + 1:]
if volume_id == path_or_id:
volume_id = path_or_id[path_or_id.find('-vol-') + 1:]
volume_id = volume_id.replace('--', '-')
return volume_id

View File

@ -1,6 +1,7 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright (c) 2010 Citrix Systems, Inc.
# Copyright 2010 OpenStack LLC.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
@ -19,15 +20,15 @@ A connection to XenServer or Xen Cloud Platform.
The concurrency model for this class is as follows:
All XenAPI calls are on a thread (using t.i.t.deferToThread, via the decorator
deferredToThread). They are remote calls, and so may hang for the usual
reasons. They should not be allowed to block the reactor thread.
All XenAPI calls are on a green thread (using eventlet's "tpool"
thread pool). They are remote calls, and so may hang for the usual
reasons.
All long-running XenAPI calls (VM.start, VM.reboot, etc) are called async
(using XenAPI.VM.async_start etc). These return a task, which can then be
polled for completion. Polling is handled using reactor.callLater.
(using XenAPI.VM.async_start etc). These return a task, which can then be
polled for completion.
This combination of techniques means that we don't block the reactor thread at
This combination of techniques means that we don't block the main thread at
all, and at the same time we don't hold lots of threads waiting for
long-running operations.
@ -81,7 +82,7 @@ flags.DEFINE_string('xenapi_connection_password',
flags.DEFINE_float('xenapi_task_poll_interval',
0.5,
'The interval used for polling of remote tasks '
'(Async.VM.start, etc). Used only if '
'(Async.VM.start, etc). Used only if '
'connection_type=xenapi.')
flags.DEFINE_float('xenapi_vhd_coalesce_poll_interval',
5.0,
@ -213,6 +214,14 @@ class XenAPISession(object):
f = f.__getattr__(m)
return tpool.execute(f, *args)
def call_xenapi_request(self, method, *args):
"""Some interactions with dom0, such as interacting with xenstore's
param record, require using the xenapi_request method of the session
object. This wraps that call on a background thread.
"""
f = self._session.xenapi_request
return tpool.execute(f, method, *args)
def async_call_plugin(self, plugin, fn, args):
"""Call Async.host.call_plugin on a background thread."""
return tpool.execute(self._unwrap_plugin_exceptions,
@ -222,7 +231,6 @@ class XenAPISession(object):
def wait_for_task(self, id, task):
"""Return the result of the given task. The task is polled
until it completes."""
done = event.Event()
loop = utils.LoopingCall(self._poll_task, id, task, done)
loop.start(FLAGS.xenapi_task_poll_interval, now=True)
@ -235,7 +243,7 @@ class XenAPISession(object):
return self.XenAPI.Session(url)
def _poll_task(self, id, task, done):
"""Poll the given XenAPI task, and fire the given Deferred if we
"""Poll the given XenAPI task, and fire the given action if we
get a result."""
try:
name = self._session.xenapi.task.get_name_label(task)
@ -290,7 +298,7 @@ class XenAPISession(object):
def _parse_xmlrpc_value(val):
"""Parse the given value as if it were an XML-RPC value. This is
"""Parse the given value as if it were an XML-RPC value. This is
sometimes used as the format for the task.result field."""
if not val:
return val

View File

@ -16,16 +16,4 @@
# License for the specific language governing permissions and limitations
# under the License.
"""
:mod:`nova.volume` -- Nova Block Storage
=====================================================
.. automodule:: nova.volume
:platform: Unix
.. moduleauthor:: Jesse Andrews <jesse@ansolabs.com>
.. moduleauthor:: Devin Carlen <devin.carlen@gmail.com>
.. moduleauthor:: Vishvananda Ishaya <vishvananda@yahoo.com>
.. moduleauthor:: Joshua McKenty <joshua@cognition.ca>
.. moduleauthor:: Manish Singh <yosh@gimp.org>
.. moduleauthor:: Andy Smith <andy@anarkystic.com>
"""
from nova.volume.api import API

101
nova/volume/api.py Normal file
View File

@ -0,0 +1,101 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Handles all requests relating to volumes.
"""
import datetime
import logging
from nova import db
from nova import exception
from nova import flags
from nova import quota
from nova import rpc
from nova.db import base
FLAGS = flags.FLAGS
flags.DECLARE('storage_availability_zone', 'nova.volume.manager')
class API(base.Base):
"""API for interacting with the volume manager."""
def create(self, context, size, name, description):
if quota.allowed_volumes(context, 1, size) < 1:
logging.warn("Quota exceeeded for %s, tried to create %sG volume",
context.project_id, size)
raise quota.QuotaError("Volume quota exceeded. You cannot "
"create a volume of size %s" % size)
options = {
'size': size,
'user_id': context.user.id,
'project_id': context.project_id,
'availability_zone': FLAGS.storage_availability_zone,
'status': "creating",
'attach_status': "detached",
'display_name': name,
'display_description': description}
volume = self.db.volume_create(context, options)
rpc.cast(context,
FLAGS.scheduler_topic,
{"method": "create_volume",
"args": {"topic": FLAGS.volume_topic,
"volume_id": volume['id']}})
return volume
def delete(self, context, volume_id):
volume = self.get(context, volume_id)
if volume['status'] != "available":
raise exception.ApiError(_("Volume status must be available"))
now = datetime.datetime.utcnow()
self.db.volume_update(context, volume_id, {'status': 'deleting',
'terminated_at': now})
host = volume['host']
rpc.cast(context,
self.db.queue_get_for(context, FLAGS.volume_topic, host),
{"method": "delete_volume",
"args": {"volume_id": volume_id}})
def update(self, context, volume_id, fields):
self.db.volume_update(context, volume_id, fields)
def get(self, context, volume_id):
return self.db.volume_get(context, volume_id)
def get_all(self, context):
if context.user.is_admin():
return self.db.volume_get_all(context)
return self.db.volume_get_all_by_project(context, context.project_id)
def check_attach(self, context, volume_id):
volume = self.get(context, volume_id)
# TODO(vish): abstract status checking?
if volume['status'] != "available":
raise exception.ApiError(_("Volume status must be available"))
if volume['attach_status'] == "attached":
raise exception.ApiError(_("Volume is already attached"))
def check_detach(self, context, volume_id):
volume = self.get(context, volume_id)
# TODO(vish): abstract status checking?
if volume['status'] == "available":
raise exception.ApiError(_("Volume is already detached"))

View File

@ -45,6 +45,7 @@ class PluginError(Exception):
def __init__(self, *args):
Exception.__init__(self, *args)
class ArgumentError(PluginError):
"""Raised when required arguments are missing, argument values are invalid,
or incompatible arguments are given.
@ -67,6 +68,7 @@ def ignore_failure(func, *args, **kwargs):
ARGUMENT_PATTERN = re.compile(r'^[a-zA-Z0-9_:\.\-,]+$')
def validate_exists(args, key, default=None):
"""Validates that a string argument to a RPC method call is given, and
matches the shell-safe regex, with an optional default value in case it
@ -76,20 +78,24 @@ def validate_exists(args, key, default=None):
"""
if key in args:
if len(args[key]) == 0:
raise ArgumentError('Argument %r value %r is too short.' % (key, args[key]))
raise ArgumentError('Argument %r value %r is too short.' %
(key, args[key]))
if not ARGUMENT_PATTERN.match(args[key]):
raise ArgumentError('Argument %r value %r contains invalid characters.' % (key, args[key]))
raise ArgumentError('Argument %r value %r contains invalid '
'characters.' % (key, args[key]))
if args[key][0] == '-':
raise ArgumentError('Argument %r value %r starts with a hyphen.' % (key, args[key]))
raise ArgumentError('Argument %r value %r starts with a hyphen.'
% (key, args[key]))
return args[key]
elif default is not None:
return default
else:
raise ArgumentError('Argument %s is required.' % key)
def validate_bool(args, key, default=None):
"""Validates that a string argument to a RPC method call is a boolean string,
with an optional default value in case it does not exist.
"""Validates that a string argument to a RPC method call is a boolean
string, with an optional default value in case it does not exist.
Returns the python boolean value.
"""
@ -99,7 +105,9 @@ def validate_bool(args, key, default=None):
elif value.lower() == 'false':
return False
else:
raise ArgumentError("Argument %s may not take value %r. Valid values are ['true', 'false']." % (key, value))
raise ArgumentError("Argument %s may not take value %r. "
"Valid values are ['true', 'false']." % (key, value))
def exists(args, key):
"""Validates that a freeform string argument to a RPC method call is given.
@ -110,6 +118,7 @@ def exists(args, key):
else:
raise ArgumentError('Argument %s is required.' % key)
def optional(args, key):
"""If the given key is in args, return the corresponding value, otherwise
return None"""
@ -122,13 +131,14 @@ def get_this_host(session):
def get_domain_0(session):
this_host_ref = get_this_host(session)
expr = 'field "is_control_domain" = "true" and field "resident_on" = "%s"' % this_host_ref
expr = 'field "is_control_domain" = "true" and field "resident_on" = "%s"'
expr = expr % this_host_ref
return session.xenapi.VM.get_all_records_where(expr).keys()[0]
def create_vdi(session, sr_ref, name_label, virtual_size, read_only):
vdi_ref = session.xenapi.VDI.create(
{ 'name_label': name_label,
{'name_label': name_label,
'name_description': '',
'SR': sr_ref,
'virtual_size': str(virtual_size),
@ -138,7 +148,7 @@ def create_vdi(session, sr_ref, name_label, virtual_size, read_only):
'xenstore_data': {},
'other_config': {},
'sm_config': {},
'tags': [] })
'tags': []})
logging.debug('Created VDI %s (%s, %s, %s) on %s.', vdi_ref, name_label,
virtual_size, read_only, sr_ref)
return vdi_ref

View File

@ -0,0 +1,180 @@
#!/usr/bin/env python
# Copyright (c) 2010 Citrix Systems, Inc.
# Copyright 2010 OpenStack LLC.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
# XenAPI plugin for reading/writing information to xenstore
#
try:
import json
except ImportError:
import simplejson as json
import subprocess
import XenAPIPlugin
import pluginlib_nova as pluginlib
pluginlib.configure_logging("xenstore")
def jsonify(fnc):
def wrapper(*args, **kwargs):
return json.dumps(fnc(*args, **kwargs))
return wrapper
@jsonify
def read_record(self, arg_dict):
"""Returns the value stored at the given path for the given dom_id.
These must be encoded as key/value pairs in arg_dict. You can
optinally include a key 'ignore_missing_path'; if this is present
and boolean True, attempting to read a non-existent path will return
the string 'None' instead of raising an exception.
"""
cmd = "xenstore-read /local/domain/%(dom_id)s/%(path)s" % arg_dict
try:
return _run_command(cmd).rstrip("\n")
except pluginlib.PluginError, e:
if arg_dict.get("ignore_missing_path", False):
cmd = "xenstore-exists /local/domain/%(dom_id)s/%(path)s; echo $?"
cmd = cmd % arg_dict
ret = _run_command(cmd).strip()
# If the path exists, the cmd should return "0"
if ret != "0":
# No such path, so ignore the error and return the
# string 'None', since None can't be marshalled
# over RPC.
return "None"
# Either we shouldn't ignore path errors, or another
# error was hit. Re-raise.
raise
@jsonify
def write_record(self, arg_dict):
"""Writes to xenstore at the specified path. If there is information
already stored in that location, it is overwritten. As in read_record,
the dom_id and path must be specified in the arg_dict; additionally,
you must specify a 'value' key, whose value must be a string. Typically,
you can json-ify more complex values and store the json output.
"""
cmd = "xenstore-write /local/domain/%(dom_id)s/%(path)s '%(value)s'"
cmd = cmd % arg_dict
_run_command(cmd)
return arg_dict["value"]
@jsonify
def list_records(self, arg_dict):
"""Returns all the stored data at or below the given path for the
given dom_id. The data is returned as a json-ified dict, with the
path as the key and the stored value as the value. If the path
doesn't exist, an empty dict is returned.
"""
cmd = "xenstore-ls /local/domain/%(dom_id)s/%(path)s" % arg_dict
cmd = cmd.rstrip("/")
try:
recs = _run_command(cmd)
except pluginlib.PluginError, e:
if "No such file or directory" in "%s" % e:
# Path doesn't exist.
return {}
return str(e)
raise
base_path = arg_dict["path"]
paths = _paths_from_ls(recs)
ret = {}
for path in paths:
if base_path:
arg_dict["path"] = "%s/%s" % (base_path, path)
else:
arg_dict["path"] = path
rec = read_record(self, arg_dict)
try:
val = json.loads(rec)
except ValueError:
val = rec
ret[path] = val
return ret
@jsonify
def delete_record(self, arg_dict):
"""Just like it sounds: it removes the record for the specified
VM and the specified path from xenstore.
"""
cmd = "xenstore-rm /local/domain/%(dom_id)s/%(path)s" % arg_dict
return _run_command(cmd)
def _paths_from_ls(recs):
"""The xenstore-ls command returns a listing that isn't terribly
useful. This method cleans that up into a dict with each path
as the key, and the associated string as the value.
"""
ret = {}
last_nm = ""
level = 0
path = []
ret = []
for ln in recs.splitlines():
nm, val = ln.rstrip().split(" = ")
barename = nm.lstrip()
this_level = len(nm) - len(barename)
if this_level == 0:
ret.append(barename)
level = 0
path = []
elif this_level == level:
# child of same parent
ret.append("%s/%s" % ("/".join(path), barename))
elif this_level > level:
path.append(last_nm)
ret.append("%s/%s" % ("/".join(path), barename))
level = this_level
elif this_level < level:
path = path[:this_level]
ret.append("%s/%s" % ("/".join(path), barename))
level = this_level
last_nm = barename
return ret
def _run_command(cmd):
"""Abstracts out the basics of issuing system commands. If the command
returns anything in stderr, a PluginError is raised with that information.
Otherwise, the output from stdout is returned.
"""
pipe = subprocess.PIPE
proc = subprocess.Popen([cmd], shell=True, stdin=pipe, stdout=pipe,
stderr=pipe, close_fds=True)
proc.wait()
err = proc.stderr.read()
if err:
raise pluginlib.PluginError(err)
return proc.stdout.read()
if __name__ == "__main__":
XenAPIPlugin.dispatch(
{"read_record": read_record,
"write_record": write_record,
"list_records": list_records,
"delete_record": delete_record})