Replace objectstore images with S3 image service backending to glance or local
This commit is contained in:
@@ -81,7 +81,7 @@ from nova import log as logging
|
||||
from nova import quota
|
||||
from nova import rpc
|
||||
from nova import utils
|
||||
from nova.api.ec2.cloud import ec2_id_to_id
|
||||
from nova.api.ec2.ec2utils import ec2_id_to_id
|
||||
from nova.auth import manager
|
||||
from nova.cloudpipe import pipelib
|
||||
from nova.compute import instance_types
|
||||
|
||||
@@ -39,7 +39,9 @@ from nova import log as logging
|
||||
from nova import network
|
||||
from nova import utils
|
||||
from nova import volume
|
||||
from nova.api.ec2 import ec2utils
|
||||
from nova.compute import instance_types
|
||||
from nova.image import s3
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
@@ -73,30 +75,19 @@ def _gen_key(context, user_id, key_name):
|
||||
return {'private_key': private_key, 'fingerprint': fingerprint}
|
||||
|
||||
|
||||
def ec2_id_to_id(ec2_id):
|
||||
"""Convert an ec2 ID (i-[base 16 number]) to an instance id (int)"""
|
||||
return int(ec2_id.split('-')[-1], 16)
|
||||
|
||||
|
||||
def id_to_ec2_id(instance_id, template='i-%08x'):
|
||||
"""Convert an instance ID (int) to an ec2 ID (i-[base 16 number])"""
|
||||
return template % instance_id
|
||||
|
||||
|
||||
class CloudController(object):
|
||||
""" CloudController provides the critical dispatch between
|
||||
inbound API calls through the endpoint and messages
|
||||
sent to the other nodes.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.image_service = utils.import_object(FLAGS.image_service)
|
||||
self.image_service = s3.S3ImageService()
|
||||
self.network_api = network.API()
|
||||
self.volume_api = volume.API()
|
||||
self.compute_api = compute.API(
|
||||
network_api=self.network_api,
|
||||
image_service=self.image_service,
|
||||
volume_api=self.volume_api,
|
||||
hostname_factory=id_to_ec2_id)
|
||||
hostname_factory=ec2utils.id_to_ec2_id)
|
||||
self.setup()
|
||||
|
||||
def __str__(self):
|
||||
@@ -154,7 +145,7 @@ class CloudController(object):
|
||||
availability_zone = self._get_availability_zone_by_host(ctxt, host)
|
||||
floating_ip = db.instance_get_floating_address(ctxt,
|
||||
instance_ref['id'])
|
||||
ec2_id = id_to_ec2_id(instance_ref['id'])
|
||||
ec2_id = ec2utils.id_to_ec2_id(instance_ref['id'])
|
||||
data = {
|
||||
'user-data': base64.b64decode(instance_ref['user_data']),
|
||||
'meta-data': {
|
||||
@@ -525,7 +516,7 @@ class CloudController(object):
|
||||
ec2_id = instance_id[0]
|
||||
else:
|
||||
ec2_id = instance_id
|
||||
instance_id = ec2_id_to_id(ec2_id)
|
||||
instance_id = ec2utils.ec2_id_to_id(ec2_id)
|
||||
output = self.compute_api.get_console_output(
|
||||
context, instance_id=instance_id)
|
||||
now = datetime.datetime.utcnow()
|
||||
@@ -535,7 +526,7 @@ class CloudController(object):
|
||||
|
||||
def get_ajax_console(self, context, instance_id, **kwargs):
|
||||
ec2_id = instance_id[0]
|
||||
instance_id = ec2_id_to_id(ec2_id)
|
||||
instance_id = ec2utils.ec2_id_to_id(ec2_id)
|
||||
return self.compute_api.get_ajax_console(context,
|
||||
instance_id=instance_id)
|
||||
|
||||
@@ -543,7 +534,7 @@ class CloudController(object):
|
||||
if volume_id:
|
||||
volumes = []
|
||||
for ec2_id in volume_id:
|
||||
internal_id = ec2_id_to_id(ec2_id)
|
||||
internal_id = ec2utils.ec2_id_to_id(ec2_id)
|
||||
volume = self.volume_api.get(context, internal_id)
|
||||
volumes.append(volume)
|
||||
else:
|
||||
@@ -556,11 +547,11 @@ class CloudController(object):
|
||||
instance_data = None
|
||||
if volume.get('instance', None):
|
||||
instance_id = volume['instance']['id']
|
||||
instance_ec2_id = id_to_ec2_id(instance_id)
|
||||
instance_ec2_id = ec2utils.id_to_ec2_id(instance_id)
|
||||
instance_data = '%s[%s]' % (instance_ec2_id,
|
||||
volume['instance']['host'])
|
||||
v = {}
|
||||
v['volumeId'] = id_to_ec2_id(volume['id'], 'vol-%08x')
|
||||
v['volumeId'] = ec2utils.id_to_ec2_id(volume['id'], 'vol-%08x')
|
||||
v['status'] = volume['status']
|
||||
v['size'] = volume['size']
|
||||
v['availabilityZone'] = volume['availability_zone']
|
||||
@@ -578,8 +569,7 @@ class CloudController(object):
|
||||
'device': volume['mountpoint'],
|
||||
'instanceId': instance_ec2_id,
|
||||
'status': 'attached',
|
||||
'volumeId': id_to_ec2_id(volume['id'],
|
||||
'vol-%08x')}]
|
||||
'volumeId': v['volumeId']}]
|
||||
else:
|
||||
v['attachmentSet'] = [{}]
|
||||
|
||||
@@ -598,12 +588,12 @@ class CloudController(object):
|
||||
return {'volumeSet': [self._format_volume(context, dict(volume))]}
|
||||
|
||||
def delete_volume(self, context, volume_id, **kwargs):
|
||||
volume_id = ec2_id_to_id(volume_id)
|
||||
volume_id = ec2utils.ec2_id_to_id(volume_id)
|
||||
self.volume_api.delete(context, volume_id=volume_id)
|
||||
return True
|
||||
|
||||
def update_volume(self, context, volume_id, **kwargs):
|
||||
volume_id = ec2_id_to_id(volume_id)
|
||||
volume_id = ec2utils.ec2_id_to_id(volume_id)
|
||||
updatable_fields = ['display_name', 'display_description']
|
||||
changes = {}
|
||||
for field in updatable_fields:
|
||||
@@ -614,8 +604,8 @@ class CloudController(object):
|
||||
return True
|
||||
|
||||
def attach_volume(self, context, volume_id, instance_id, device, **kwargs):
|
||||
volume_id = ec2_id_to_id(volume_id)
|
||||
instance_id = ec2_id_to_id(instance_id)
|
||||
volume_id = ec2utils.ec2_id_to_id(volume_id)
|
||||
instance_id = ec2utils.ec2_id_to_id(instance_id)
|
||||
msg = _("Attach volume %(volume_id)s to instance %(instance_id)s"
|
||||
" at %(device)s") % locals()
|
||||
LOG.audit(msg, context=context)
|
||||
@@ -626,22 +616,22 @@ class CloudController(object):
|
||||
volume = self.volume_api.get(context, volume_id)
|
||||
return {'attachTime': volume['attach_time'],
|
||||
'device': volume['mountpoint'],
|
||||
'instanceId': id_to_ec2_id(instance_id),
|
||||
'instanceId': ec2utils.id_to_ec2_id(instance_id),
|
||||
'requestId': context.request_id,
|
||||
'status': volume['attach_status'],
|
||||
'volumeId': id_to_ec2_id(volume_id, 'vol-%08x')}
|
||||
'volumeId': ec2utils.id_to_ec2_id(volume_id, 'vol-%08x')}
|
||||
|
||||
def detach_volume(self, context, volume_id, **kwargs):
|
||||
volume_id = ec2_id_to_id(volume_id)
|
||||
volume_id = ec2utils.ec2_id_to_id(volume_id)
|
||||
LOG.audit(_("Detach volume %s"), volume_id, context=context)
|
||||
volume = self.volume_api.get(context, volume_id)
|
||||
instance = self.compute_api.detach_volume(context, volume_id=volume_id)
|
||||
return {'attachTime': volume['attach_time'],
|
||||
'device': volume['mountpoint'],
|
||||
'instanceId': id_to_ec2_id(instance['id']),
|
||||
'instanceId': ec2utils.id_to_ec2_id(instance['id']),
|
||||
'requestId': context.request_id,
|
||||
'status': volume['attach_status'],
|
||||
'volumeId': id_to_ec2_id(volume_id, 'vol-%08x')}
|
||||
'volumeId': ec2utils.id_to_ec2_id(volume_id, 'vol-%08x')}
|
||||
|
||||
def _convert_to_set(self, lst, label):
|
||||
if lst == None or lst == []:
|
||||
@@ -675,7 +665,7 @@ class CloudController(object):
|
||||
if instance_id:
|
||||
instances = []
|
||||
for ec2_id in instance_id:
|
||||
internal_id = ec2_id_to_id(ec2_id)
|
||||
internal_id = ec2utils.ec2_id_to_id(ec2_id)
|
||||
instance = self.compute_api.get(context,
|
||||
instance_id=internal_id)
|
||||
instances.append(instance)
|
||||
@@ -687,7 +677,7 @@ class CloudController(object):
|
||||
continue
|
||||
i = {}
|
||||
instance_id = instance['id']
|
||||
ec2_id = id_to_ec2_id(instance_id)
|
||||
ec2_id = ec2utils.id_to_ec2_id(instance_id)
|
||||
i['instanceId'] = ec2_id
|
||||
i['imageId'] = instance['image_id']
|
||||
i['instanceState'] = {
|
||||
@@ -755,7 +745,7 @@ class CloudController(object):
|
||||
if (floating_ip_ref['fixed_ip']
|
||||
and floating_ip_ref['fixed_ip']['instance']):
|
||||
instance_id = floating_ip_ref['fixed_ip']['instance']['id']
|
||||
ec2_id = id_to_ec2_id(instance_id)
|
||||
ec2_id = ec2utils.id_to_ec2_id(instance_id)
|
||||
address_rv = {'public_ip': address,
|
||||
'instance_id': ec2_id}
|
||||
if context.is_admin:
|
||||
@@ -778,7 +768,7 @@ class CloudController(object):
|
||||
def associate_address(self, context, instance_id, public_ip, **kwargs):
|
||||
LOG.audit(_("Associate address %(public_ip)s to"
|
||||
" instance %(instance_id)s") % locals(), context=context)
|
||||
instance_id = ec2_id_to_id(instance_id)
|
||||
instance_id = ec2utils.ec2_id_to_id(instance_id)
|
||||
self.compute_api.associate_floating_ip(context,
|
||||
instance_id=instance_id,
|
||||
address=public_ip)
|
||||
@@ -791,13 +781,17 @@ class CloudController(object):
|
||||
|
||||
def run_instances(self, context, **kwargs):
|
||||
max_count = int(kwargs.get('max_count', 1))
|
||||
if kwargs.get('kernel_id'):
|
||||
kwargs['kernel_id'] = ec2utils.ec2_id_to_id(kwargs['kernel_id'])
|
||||
if kwargs.get('ramdisk_id'):
|
||||
kwargs['ramdisk_id'] = ec2utils.ec2_id_to_id(kwargs['ramdisk_id'])
|
||||
instances = self.compute_api.create(context,
|
||||
instance_type=instance_types.get_by_type(
|
||||
kwargs.get('instance_type', None)),
|
||||
image_id=kwargs['image_id'],
|
||||
image_id=ec2utils.ec2_id_to_id(kwargs['image_id']),
|
||||
min_count=int(kwargs.get('min_count', max_count)),
|
||||
max_count=max_count,
|
||||
kernel_id=kwargs.get('kernel_id', None),
|
||||
kernel_id=kwargs.get('kernel_id'),
|
||||
ramdisk_id=kwargs.get('ramdisk_id'),
|
||||
display_name=kwargs.get('display_name'),
|
||||
display_description=kwargs.get('display_description'),
|
||||
@@ -814,7 +808,7 @@ class CloudController(object):
|
||||
instance_id is a kwarg so its name cannot be modified."""
|
||||
LOG.debug(_("Going to start terminating instances"))
|
||||
for ec2_id in instance_id:
|
||||
instance_id = ec2_id_to_id(ec2_id)
|
||||
instance_id = ec2utils.ec2_id_to_id(ec2_id)
|
||||
self.compute_api.delete(context, instance_id=instance_id)
|
||||
return True
|
||||
|
||||
@@ -822,19 +816,19 @@ class CloudController(object):
|
||||
"""instance_id is a list of instance ids"""
|
||||
LOG.audit(_("Reboot instance %r"), instance_id, context=context)
|
||||
for ec2_id in instance_id:
|
||||
instance_id = ec2_id_to_id(ec2_id)
|
||||
instance_id = ec2utils.ec2_id_to_id(ec2_id)
|
||||
self.compute_api.reboot(context, instance_id=instance_id)
|
||||
return True
|
||||
|
||||
def rescue_instance(self, context, instance_id, **kwargs):
|
||||
"""This is an extension to the normal ec2_api"""
|
||||
instance_id = ec2_id_to_id(instance_id)
|
||||
instance_id = ec2utils.ec2_id_to_id(instance_id)
|
||||
self.compute_api.rescue(context, instance_id=instance_id)
|
||||
return True
|
||||
|
||||
def unrescue_instance(self, context, instance_id, **kwargs):
|
||||
"""This is an extension to the normal ec2_api"""
|
||||
instance_id = ec2_id_to_id(instance_id)
|
||||
instance_id = ec2utils.ec2_id_to_id(instance_id)
|
||||
self.compute_api.unrescue(context, instance_id=instance_id)
|
||||
return True
|
||||
|
||||
@@ -845,41 +839,50 @@ class CloudController(object):
|
||||
if field in kwargs:
|
||||
changes[field] = kwargs[field]
|
||||
if changes:
|
||||
instance_id = ec2_id_to_id(ec2_id)
|
||||
instance_id = ec2utils.ec2_id_to_id(ec2_id)
|
||||
self.compute_api.update(context, instance_id=instance_id, **kwargs)
|
||||
return True
|
||||
|
||||
def _format_image(self, context, image):
|
||||
def _format_image(self, image):
|
||||
"""Convert from format defined by BaseImageService to S3 format."""
|
||||
i = {}
|
||||
i['imageId'] = image.get('id')
|
||||
i['kernelId'] = image.get('kernel_id')
|
||||
i['ramdiskId'] = image.get('ramdisk_id')
|
||||
i['imageOwnerId'] = image.get('owner_id')
|
||||
i['imageLocation'] = image.get('location')
|
||||
i['imageState'] = image.get('status')
|
||||
i['kernelId'] = image['properties'].get('kernel_id')
|
||||
i['ramdiskId'] = image['properties'].get('ramdisk_id')
|
||||
i['imageOwnerId'] = image['properties'].get('owner_id')
|
||||
i['imageLocation'] = image['properties'].get('image_location')
|
||||
i['imageState'] = image['properties'].get('image_state')
|
||||
i['type'] = image.get('type')
|
||||
i['isPublic'] = image.get('is_public')
|
||||
i['architecture'] = image.get('architecture')
|
||||
i['isPublic'] = image['properties'].get('is_public') == 'True'
|
||||
i['architecture'] = image['properties'].get('architecture')
|
||||
return i
|
||||
|
||||
def describe_images(self, context, image_id=None, **kwargs):
|
||||
# NOTE: image_id is a list!
|
||||
images = self.image_service.index(context)
|
||||
if image_id:
|
||||
images = filter(lambda x: x['id'] in image_id, images)
|
||||
images = [self._format_image(context, i) for i in images]
|
||||
images = []
|
||||
for ec2_id in image_id:
|
||||
try:
|
||||
image = self.image_service.show(context, ec2_id)
|
||||
except exception.NotFound:
|
||||
raise exception.NotFound(_('Image %s not found') %
|
||||
ec2_id)
|
||||
images.append(image)
|
||||
else:
|
||||
images = self.image_service.detail(context)
|
||||
images = [self._format_image(i) for i in images]
|
||||
return {'imagesSet': images}
|
||||
|
||||
def deregister_image(self, context, image_id, **kwargs):
|
||||
LOG.audit(_("De-registering image %s"), image_id, context=context)
|
||||
self.image_service.deregister(context, image_id)
|
||||
self.image_service.delete(context, image_id)
|
||||
return {'imageId': image_id}
|
||||
|
||||
def register_image(self, context, image_location=None, **kwargs):
|
||||
if image_location is None and 'name' in kwargs:
|
||||
image_location = kwargs['name']
|
||||
image_id = self.image_service.register(context, image_location)
|
||||
image = {"image_location": image_location}
|
||||
image_id = self.image_service.create(context, image)
|
||||
msg = _("Registered image %(image_location)s with"
|
||||
" id %(image_id)s") % locals()
|
||||
LOG.audit(msg, context=context)
|
||||
@@ -890,11 +893,10 @@ class CloudController(object):
|
||||
raise exception.ApiError(_('attribute not supported: %s')
|
||||
% attribute)
|
||||
try:
|
||||
image = self._format_image(context,
|
||||
self.image_service.show(context,
|
||||
image = self._format_image(self.image_service.show(context,
|
||||
image_id))
|
||||
except IndexError:
|
||||
raise exception.ApiError(_('invalid id: %s') % image_id)
|
||||
except (IndexError, exception.NotFound):
|
||||
raise exception.NotFound(_('Image %s not found') % image_id)
|
||||
result = {'image_id': image_id, 'launchPermission': []}
|
||||
if image['isPublic']:
|
||||
result['launchPermission'].append({'group': 'all'})
|
||||
@@ -913,7 +915,14 @@ class CloudController(object):
|
||||
if not operation_type in ['add', 'remove']:
|
||||
raise exception.ApiError(_('operation_type must be add or remove'))
|
||||
LOG.audit(_("Updating image %s publicity"), image_id, context=context)
|
||||
return self.image_service.modify(context, image_id, operation_type)
|
||||
|
||||
try:
|
||||
metadata = self.image_service.show(context, image_id)
|
||||
except exception.NotFound:
|
||||
raise exception.NotFound(_('Image %s not found') % image_id)
|
||||
del(metadata['id'])
|
||||
metadata['properties']['is_public'] = (operation_type == 'add')
|
||||
return self.image_service.update(context, image_id, metadata)
|
||||
|
||||
def update_image(self, context, image_id, **kwargs):
|
||||
result = self.image_service.update(context, image_id, dict(kwargs))
|
||||
|
||||
@@ -346,7 +346,7 @@ DEFINE_string('scheduler_manager', 'nova.scheduler.manager.SchedulerManager',
|
||||
'Manager for scheduler')
|
||||
|
||||
# The service to use for image search and retrieval
|
||||
DEFINE_string('image_service', 'nova.image.s3.S3ImageService',
|
||||
DEFINE_string('image_service', 'nova.image.glance.GlanceImageService',
|
||||
'The service to use for retrieving and searching for images.')
|
||||
|
||||
DEFINE_string('host', socket.gethostname(),
|
||||
|
||||
@@ -21,6 +21,8 @@ import httplib
|
||||
import json
|
||||
import urlparse
|
||||
|
||||
from glance.common import exception as glance_exception
|
||||
|
||||
from nova import exception
|
||||
from nova import flags
|
||||
from nova import log as logging
|
||||
@@ -57,27 +59,32 @@ class GlanceImageService(service.BaseImageService):
|
||||
"""
|
||||
Returns a dict containing image data for the given opaque image id.
|
||||
"""
|
||||
image = self.client.get_image_meta(id)
|
||||
if image:
|
||||
return image
|
||||
raise exception.NotFound
|
||||
try:
|
||||
image = self.client.get_image_meta(id)
|
||||
except glance_exception.NotFound:
|
||||
raise exception.NotFound
|
||||
return image
|
||||
|
||||
def create(self, context, data):
|
||||
def create(self, context, metadata, data=None):
|
||||
"""
|
||||
Store the image data and return the new image id.
|
||||
|
||||
:raises AlreadyExists if the image already exist.
|
||||
|
||||
"""
|
||||
return self.client.add_image(image_meta=data)
|
||||
return self.client.add_image(metadata, data)
|
||||
|
||||
def update(self, context, image_id, data):
|
||||
def update(self, context, image_id, metadata, data=None):
|
||||
"""Replace the contents of the given image with the new data.
|
||||
|
||||
:raises NotFound if the image does not exist.
|
||||
|
||||
"""
|
||||
return self.client.update_image(image_id, data)
|
||||
try:
|
||||
result = self.client.update_image(image_id, metadata, data)
|
||||
except glance_exception.NotFound:
|
||||
raise exception.NotFound
|
||||
return result
|
||||
|
||||
def delete(self, context, image_id):
|
||||
"""
|
||||
@@ -86,7 +93,11 @@ class GlanceImageService(service.BaseImageService):
|
||||
:raises NotFound if the image does not exist.
|
||||
|
||||
"""
|
||||
return self.client.delete_image(image_id)
|
||||
try:
|
||||
result = self.client.delete_image(image_id)
|
||||
except glance_exception.NotFound:
|
||||
raise exception.NotFound
|
||||
return result
|
||||
|
||||
def delete_all(self):
|
||||
"""
|
||||
|
||||
282
nova/image/s3.py
282
nova/image/s3.py
@@ -21,8 +21,12 @@ Proxy AMI-related calls from the cloud controller, to the running
|
||||
objectstore service.
|
||||
"""
|
||||
|
||||
import json
|
||||
import urllib
|
||||
import binascii
|
||||
import os
|
||||
import shutil
|
||||
import tarfile
|
||||
import tempfile
|
||||
from xml.etree import ElementTree
|
||||
|
||||
import boto.s3.connection
|
||||
|
||||
@@ -31,84 +35,89 @@ from nova import flags
|
||||
from nova import utils
|
||||
from nova.auth import manager
|
||||
from nova.image import service
|
||||
from nova.api.ec2 import ec2utils
|
||||
|
||||
|
||||
FLAGS = flags.FLAGS
|
||||
flags.DEFINE_string('image_decryption_dir', '/tmp',
|
||||
'parent dir for tempdir used for image decryption')
|
||||
|
||||
|
||||
def map_s3_to_base(image):
|
||||
"""Convert from S3 format to format defined by BaseImageService."""
|
||||
i = {}
|
||||
i['id'] = image.get('imageId')
|
||||
i['name'] = image.get('imageId')
|
||||
i['kernel_id'] = image.get('kernelId')
|
||||
i['ramdisk_id'] = image.get('ramdiskId')
|
||||
i['location'] = image.get('imageLocation')
|
||||
i['owner_id'] = image.get('imageOwnerId')
|
||||
i['status'] = image.get('imageState')
|
||||
i['type'] = image.get('type')
|
||||
i['is_public'] = image.get('isPublic')
|
||||
i['architecture'] = image.get('architecture')
|
||||
return i
|
||||
_type_prefix_map = {'machine': 'ami',
|
||||
'kernel': 'aki',
|
||||
'ramdisk': 'ari'}
|
||||
|
||||
|
||||
def image_ec2_id(image_id, image_type):
|
||||
prefix = _type_prefix_map[image_type]
|
||||
template = prefix + '-%08x'
|
||||
return ec2utils.id_to_ec2_id(int(image_id), template=template)
|
||||
|
||||
|
||||
class S3ImageService(service.BaseImageService):
|
||||
def __init__(self, service=None, *args, **kwargs):
|
||||
if service == None:
|
||||
service = utils.import_object(FLAGS.image_service)
|
||||
self.service = service
|
||||
self.service.__init__(*args, **kwargs)
|
||||
|
||||
def modify(self, context, image_id, operation):
|
||||
self._conn(context).make_request(
|
||||
method='POST',
|
||||
bucket='_images',
|
||||
query_args=self._qs({'image_id': image_id,
|
||||
'operation': operation}))
|
||||
return True
|
||||
def create(self, context, properties, data=None):
|
||||
"""image should contain image_location"""
|
||||
image_id, metadata = self._s3_create(context, properties)
|
||||
return image_ec2_id(image_id, metadata['type'])
|
||||
|
||||
def update(self, context, image_id, attributes):
|
||||
"""update an image's attributes / info.json"""
|
||||
attributes.update({"image_id": image_id})
|
||||
self._conn(context).make_request(
|
||||
method='POST',
|
||||
bucket='_images',
|
||||
query_args=self._qs(attributes))
|
||||
return True
|
||||
def delete(self, context, image_id):
|
||||
# FIXME(vish): call to show is to check filter
|
||||
self.show(context, image_id)
|
||||
image_id = ec2utils.ec2_id_to_id(image_id)
|
||||
self.service.delete(context, image_id)
|
||||
|
||||
def register(self, context, image_location):
|
||||
""" rpc call to register a new image based from a manifest """
|
||||
image_id = utils.generate_uid('ami')
|
||||
self._conn(context).make_request(
|
||||
method='PUT',
|
||||
bucket='_images',
|
||||
query_args=self._qs({'image_location': image_location,
|
||||
'image_id': image_id}))
|
||||
return image_id
|
||||
|
||||
def index(self, context):
|
||||
"""Return a list of all images that a user can see."""
|
||||
response = self._conn(context).make_request(
|
||||
method='GET',
|
||||
bucket='_images')
|
||||
images = json.loads(response.read())
|
||||
return [map_s3_to_base(i) for i in images]
|
||||
|
||||
def show(self, context, image_id):
|
||||
"""return a image object if the context has permissions"""
|
||||
if FLAGS.connection_type == 'fake':
|
||||
return {'imageId': 'bar'}
|
||||
result = self.index(context)
|
||||
result = [i for i in result if i['id'] == image_id]
|
||||
if not result:
|
||||
raise exception.NotFound(_('Image %s could not be found')
|
||||
% image_id)
|
||||
image = result[0]
|
||||
def update(self, context, image_id, metadata, data=None):
|
||||
# FIXME(vish): call to show is to check filter
|
||||
self.show(context, image_id)
|
||||
image_id = ec2utils.ec2_id_to_id(image_id)
|
||||
image = self.service.update(context, image_id, metadata, data)
|
||||
image['id'] = image_ec2_id(image['id'], image['type'])
|
||||
return image
|
||||
|
||||
def deregister(self, context, image_id):
|
||||
""" unregister an image """
|
||||
self._conn(context).make_request(
|
||||
method='DELETE',
|
||||
bucket='_images',
|
||||
query_args=self._qs({'image_id': image_id}))
|
||||
def index(self, context):
|
||||
images = self.service.index(context)
|
||||
# FIXME(vish): index doesn't filter so we do it manually
|
||||
return self._filter(context, images)
|
||||
|
||||
def _conn(self, context):
|
||||
def detail(self, context):
|
||||
images = self.service.detail(context)
|
||||
# FIXME(vish): detail doesn't filter so we do it manually
|
||||
return self._filter(context, images)
|
||||
|
||||
@staticmethod
|
||||
def _is_visible(context, image):
|
||||
return (context.is_admin
|
||||
or context.project_id == image['properties']['owner_id']
|
||||
or image['properties']['is_public'] == 'True')
|
||||
|
||||
@staticmethod
|
||||
def _filter(context, images):
|
||||
filtered = []
|
||||
for image in images:
|
||||
if not S3ImageService._is_visible(context, image):
|
||||
continue
|
||||
image['id'] = image_ec2_id(image['id'], image['type'])
|
||||
filtered.append(image)
|
||||
return filtered
|
||||
|
||||
def show(self, context, image_id):
|
||||
image_id = ec2utils.ec2_id_to_id(image_id)
|
||||
image = self.service.show(context, image_id)
|
||||
if not self._is_visible(context, image):
|
||||
raise exception.NotFound
|
||||
image['id'] = image_ec2_id(image['id'], image['type'])
|
||||
return image
|
||||
|
||||
@staticmethod
|
||||
def _conn(context):
|
||||
# TODO(vish): is there a better way to get creds to sign
|
||||
# for the user?
|
||||
access = manager.AuthManager().get_access_key(context.user,
|
||||
context.project)
|
||||
secret = str(context.user.secret)
|
||||
@@ -120,8 +129,139 @@ class S3ImageService(service.BaseImageService):
|
||||
port=FLAGS.s3_port,
|
||||
host=FLAGS.s3_host)
|
||||
|
||||
def _qs(self, params):
|
||||
pairs = []
|
||||
for key in params.keys():
|
||||
pairs.append(key + '=' + urllib.quote(params[key]))
|
||||
return '&'.join(pairs)
|
||||
@staticmethod
|
||||
def _download_file(bucket, filename, local_dir):
|
||||
key = bucket.get_key(filename)
|
||||
local_filename = os.path.join(local_dir, filename)
|
||||
key.get_contents_to_filename(local_filename)
|
||||
return local_filename
|
||||
|
||||
def _s3_create(self, context, properties):
|
||||
image_path = tempfile.mkdtemp(dir=FLAGS.image_decryption_dir)
|
||||
|
||||
image_location = properties['image_location']
|
||||
bucket_name = image_location.split("/")[0]
|
||||
manifest_path = image_location[len(bucket_name) + 1:]
|
||||
bucket = self._conn(context).get_bucket(bucket_name)
|
||||
key = bucket.get_key(manifest_path)
|
||||
manifest = key.get_contents_as_string()
|
||||
|
||||
manifest = ElementTree.fromstring(manifest)
|
||||
image_type = 'machine'
|
||||
|
||||
try:
|
||||
kernel_id = manifest.find("machine_configuration/kernel_id").text
|
||||
if kernel_id == 'true':
|
||||
image_type = 'kernel'
|
||||
kernel_id = None
|
||||
except:
|
||||
kernel_id = None
|
||||
|
||||
try:
|
||||
ramdisk_id = manifest.find("machine_configuration/ramdisk_id").text
|
||||
if ramdisk_id == 'true':
|
||||
image_type = 'ramdisk'
|
||||
ramdisk_id = None
|
||||
except:
|
||||
ramdisk_id = None
|
||||
|
||||
try:
|
||||
arch = manifest.find("machine_configuration/architecture").text
|
||||
except:
|
||||
arch = 'x86_64'
|
||||
|
||||
properties.update({'owner_id': context.project_id,
|
||||
'architecture': arch})
|
||||
|
||||
if kernel_id:
|
||||
properties['kernel_id'] = kernel_id
|
||||
|
||||
if ramdisk_id:
|
||||
properties['ramdisk_id'] = ramdisk_id
|
||||
|
||||
properties['is_public'] = False
|
||||
metadata = {'type': image_type,
|
||||
'status': 'queued',
|
||||
'is_public': True,
|
||||
'properties': properties}
|
||||
metadata['properties']['image_state'] = 'pending'
|
||||
image = self.service.create(context, metadata)
|
||||
image_id = image['id']
|
||||
|
||||
parts = []
|
||||
for fn_element in manifest.find("image").getiterator("filename"):
|
||||
part = self._download_file(bucket, fn_element.text, image_path)
|
||||
parts.append(part)
|
||||
|
||||
# NOTE(vish): this may be suboptimal, should we use cat?
|
||||
encrypted_filename = os.path.join(image_path, 'image.encrypted')
|
||||
with open(encrypted_filename, 'w') as combined:
|
||||
for filename in parts:
|
||||
with open(filename) as part:
|
||||
shutil.copyfileobj(part, combined)
|
||||
|
||||
metadata['properties']['image_state'] = 'decrypting'
|
||||
self.service.update(context, image_id, metadata)
|
||||
|
||||
hex_key = manifest.find("image/ec2_encrypted_key").text
|
||||
encrypted_key = binascii.a2b_hex(hex_key)
|
||||
hex_iv = manifest.find("image/ec2_encrypted_iv").text
|
||||
encrypted_iv = binascii.a2b_hex(hex_iv)
|
||||
|
||||
# FIXME(vish): grab key from common service so this can run on
|
||||
# any host.
|
||||
cloud_private_key = os.path.join(FLAGS.ca_path, "private/cakey.pem")
|
||||
|
||||
decrypted_filename = os.path.join(image_path, 'image.tar.gz')
|
||||
self._decrypt_image(encrypted_filename, encrypted_key, encrypted_iv,
|
||||
cloud_private_key, decrypted_filename)
|
||||
|
||||
metadata['properties']['image_state'] = 'untarring'
|
||||
self.service.update(context, image_id, metadata)
|
||||
|
||||
unz_filename = self._untarzip_image(image_path, decrypted_filename)
|
||||
|
||||
metadata['properties']['image_state'] = 'uploading'
|
||||
with open(unz_filename) as image_file:
|
||||
self.service.update(context, image_id, metadata, image_file)
|
||||
metadata['properties']['image_state'] = 'available'
|
||||
self.service.update(context, image_id, metadata)
|
||||
|
||||
shutil.rmtree(image_path)
|
||||
return image_id, metadata
|
||||
|
||||
@staticmethod
|
||||
def _decrypt_image(encrypted_filename, encrypted_key, encrypted_iv,
|
||||
cloud_private_key, decrypted_filename):
|
||||
key, err = utils.execute(
|
||||
'openssl rsautl -decrypt -inkey %s' % cloud_private_key,
|
||||
process_input=encrypted_key,
|
||||
check_exit_code=False)
|
||||
if err:
|
||||
raise exception.Error(_("Failed to decrypt private key: %s")
|
||||
% err)
|
||||
iv, err = utils.execute(
|
||||
'openssl rsautl -decrypt -inkey %s' % cloud_private_key,
|
||||
process_input=encrypted_iv,
|
||||
check_exit_code=False)
|
||||
if err:
|
||||
raise exception.Error(_("Failed to decrypt initialization "
|
||||
"vector: %s") % err)
|
||||
|
||||
_out, err = utils.execute(
|
||||
'openssl enc -d -aes-128-cbc -in %s -K %s -iv %s -out %s'
|
||||
% (encrypted_filename, key, iv, decrypted_filename),
|
||||
check_exit_code=False)
|
||||
if err:
|
||||
raise exception.Error(_("Failed to decrypt image file "
|
||||
"%(image_file)s: %(err)s") %
|
||||
{'image_file': encrypted_filename,
|
||||
'err': err})
|
||||
|
||||
@staticmethod
|
||||
def _untarzip_image(path, filename):
|
||||
tar_file = tarfile.open(filename, "r|gz")
|
||||
tar_file.extractall(path)
|
||||
image_file = tar_file.getnames()[0]
|
||||
tar_file.close()
|
||||
return os.path.join(path, image_file)
|
||||
|
||||
@@ -76,7 +76,7 @@ class BaseImageService(object):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def create(self, context, data):
|
||||
def create(self, context, metadata, data=None):
|
||||
"""
|
||||
Store the image data and return the new image id.
|
||||
|
||||
@@ -85,7 +85,7 @@ class BaseImageService(object):
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def update(self, context, image_id, data):
|
||||
def update(self, context, image_id, metadata, data=None):
|
||||
"""Replace the contents of the given image with the new data.
|
||||
|
||||
:raises NotFound if the image does not exist.
|
||||
|
||||
@@ -25,6 +25,7 @@ import webob.dec
|
||||
from paste import urlmap
|
||||
|
||||
from glance import client as glance_client
|
||||
from glance.common import exception as glance_exc
|
||||
|
||||
from nova import auth
|
||||
from nova import context
|
||||
@@ -149,25 +150,25 @@ def stub_out_glance(stubs, initial_fixtures=None):
|
||||
for f in self.fixtures:
|
||||
if f['id'] == image_id:
|
||||
return f
|
||||
return None
|
||||
raise glance_exc.NotFound
|
||||
|
||||
def fake_add_image(self, image_meta):
|
||||
def fake_add_image(self, image_meta, data=None):
|
||||
id = ''.join(random.choice(string.letters) for _ in range(20))
|
||||
image_meta['id'] = id
|
||||
self.fixtures.append(image_meta)
|
||||
return id
|
||||
|
||||
def fake_update_image(self, image_id, image_meta):
|
||||
def fake_update_image(self, image_id, image_meta, data=None):
|
||||
f = self.fake_get_image_meta(image_id)
|
||||
if not f:
|
||||
raise exc.NotFound
|
||||
raise glance_exc.NotFound
|
||||
|
||||
f.update(image_meta)
|
||||
|
||||
def fake_delete_image(self, image_id):
|
||||
f = self.fake_get_image_meta(image_id)
|
||||
if not f:
|
||||
raise exc.NotFound
|
||||
raise glance_exc.NotFound
|
||||
|
||||
self.fixtures.remove(f)
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ flags.DECLARE('fake_network', 'nova.network.manager')
|
||||
FLAGS.network_size = 8
|
||||
FLAGS.num_networks = 2
|
||||
FLAGS.fake_network = True
|
||||
FLAGS.image_service = 'nova.image.local.LocalImageService'
|
||||
flags.DECLARE('num_shelves', 'nova.volume.driver')
|
||||
flags.DECLARE('blades_per_shelf', 'nova.volume.driver')
|
||||
flags.DECLARE('iscsi_num_targets', 'nova.volume.driver')
|
||||
|
||||
@@ -38,6 +38,8 @@ from nova import test
|
||||
from nova.auth import manager
|
||||
from nova.compute import power_state
|
||||
from nova.api.ec2 import cloud
|
||||
from nova.api.ec2 import ec2utils
|
||||
from nova.image import local
|
||||
from nova.objectstore import image
|
||||
|
||||
|
||||
@@ -76,6 +78,11 @@ class CloudTestCase(test.TestCase):
|
||||
project=self.project)
|
||||
host = self.network.get_network_host(self.context.elevated())
|
||||
|
||||
def fake_image_show(meh, context, id):
|
||||
return dict(kernelId=1, ramdiskId=1)
|
||||
|
||||
self.stubs.Set(local.LocalImageService, 'show', fake_image_show)
|
||||
|
||||
def tearDown(self):
|
||||
network_ref = db.project_get_network(self.context,
|
||||
self.project.id)
|
||||
@@ -122,7 +129,7 @@ class CloudTestCase(test.TestCase):
|
||||
self.cloud.allocate_address(self.context)
|
||||
inst = db.instance_create(self.context, {'host': self.compute.host})
|
||||
fixed = self.network.allocate_fixed_ip(self.context, inst['id'])
|
||||
ec2_id = cloud.id_to_ec2_id(inst['id'])
|
||||
ec2_id = ec2utils.id_to_ec2_id(inst['id'])
|
||||
self.cloud.associate_address(self.context,
|
||||
instance_id=ec2_id,
|
||||
public_ip=address)
|
||||
@@ -158,12 +165,12 @@ class CloudTestCase(test.TestCase):
|
||||
vol2 = db.volume_create(self.context, {})
|
||||
result = self.cloud.describe_volumes(self.context)
|
||||
self.assertEqual(len(result['volumeSet']), 2)
|
||||
volume_id = cloud.id_to_ec2_id(vol2['id'], 'vol-%08x')
|
||||
volume_id = ec2utils.id_to_ec2_id(vol2['id'], 'vol-%08x')
|
||||
result = self.cloud.describe_volumes(self.context,
|
||||
volume_id=[volume_id])
|
||||
self.assertEqual(len(result['volumeSet']), 1)
|
||||
self.assertEqual(
|
||||
cloud.ec2_id_to_id(result['volumeSet'][0]['volumeId']),
|
||||
ec2utils.ec2_id_to_id(result['volumeSet'][0]['volumeId']),
|
||||
vol2['id'])
|
||||
db.volume_destroy(self.context, vol1['id'])
|
||||
db.volume_destroy(self.context, vol2['id'])
|
||||
@@ -200,7 +207,7 @@ class CloudTestCase(test.TestCase):
|
||||
result = self.cloud.describe_instances(self.context)
|
||||
result = result['reservationSet'][0]
|
||||
self.assertEqual(len(result['instancesSet']), 2)
|
||||
instance_id = cloud.id_to_ec2_id(inst2['id'])
|
||||
instance_id = ec2utils.id_to_ec2_id(inst2['id'])
|
||||
result = self.cloud.describe_instances(self.context,
|
||||
instance_id=[instance_id])
|
||||
result = result['reservationSet'][0]
|
||||
@@ -216,6 +223,7 @@ class CloudTestCase(test.TestCase):
|
||||
|
||||
def test_console_output(self):
|
||||
image_id = FLAGS.default_image
|
||||
print image_id
|
||||
instance_type = FLAGS.default_instance_type
|
||||
max_count = 1
|
||||
kwargs = {'image_id': image_id,
|
||||
@@ -347,7 +355,7 @@ class CloudTestCase(test.TestCase):
|
||||
|
||||
def test_update_of_instance_display_fields(self):
|
||||
inst = db.instance_create(self.context, {})
|
||||
ec2_id = cloud.id_to_ec2_id(inst['id'])
|
||||
ec2_id = ec2utils.id_to_ec2_id(inst['id'])
|
||||
self.cloud.update_instance(self.context, ec2_id,
|
||||
display_name='c00l 1m4g3')
|
||||
inst = db.instance_get(self.context, inst['id'])
|
||||
@@ -365,7 +373,7 @@ class CloudTestCase(test.TestCase):
|
||||
def test_update_of_volume_display_fields(self):
|
||||
vol = db.volume_create(self.context, {})
|
||||
self.cloud.update_volume(self.context,
|
||||
cloud.id_to_ec2_id(vol['id'], 'vol-%08x'),
|
||||
ec2utils.id_to_ec2_id(vol['id'], 'vol-%08x'),
|
||||
display_name='c00l v0lum3')
|
||||
vol = db.volume_get(self.context, vol['id'])
|
||||
self.assertEqual('c00l v0lum3', vol['display_name'])
|
||||
@@ -374,7 +382,7 @@ class CloudTestCase(test.TestCase):
|
||||
def test_update_of_volume_wont_update_private_fields(self):
|
||||
vol = db.volume_create(self.context, {})
|
||||
self.cloud.update_volume(self.context,
|
||||
cloud.id_to_ec2_id(vol['id'], 'vol-%08x'),
|
||||
ec2utils.id_to_ec2_id(vol['id'], 'vol-%08x'),
|
||||
mountpoint='/not/here')
|
||||
vol = db.volume_get(self.context, vol['id'])
|
||||
self.assertEqual(None, vol['mountpoint'])
|
||||
|
||||
@@ -31,7 +31,7 @@ from nova import test
|
||||
from nova import utils
|
||||
from nova.auth import manager
|
||||
from nova.compute import instance_types
|
||||
|
||||
from nova.image import local
|
||||
|
||||
LOG = logging.getLogger('nova.tests.compute')
|
||||
FLAGS = flags.FLAGS
|
||||
@@ -47,6 +47,11 @@ class ComputeTestCase(test.TestCase):
|
||||
network_manager='nova.network.manager.FlatManager')
|
||||
self.compute = utils.import_object(FLAGS.compute_manager)
|
||||
self.compute_api = compute.API()
|
||||
|
||||
def fake_image_show(meh, context, id):
|
||||
return dict(kernelId=1, ramdiskId=1)
|
||||
|
||||
self.stubs.Set(local.LocalImageService, 'show', fake_image_show)
|
||||
self.manager = manager.AuthManager()
|
||||
self.user = self.manager.create_user('fake', 'fake', 'fake')
|
||||
self.project = self.manager.create_project('fake', 'fake', 'fake')
|
||||
|
||||
@@ -90,8 +90,7 @@ class DirectTestCase(test.TestCase):
|
||||
class DirectCloudTestCase(test_cloud.CloudTestCase):
|
||||
def setUp(self):
|
||||
super(DirectCloudTestCase, self).setUp()
|
||||
compute_handle = compute.API(image_service=self.cloud.image_service,
|
||||
network_api=self.cloud.network_api,
|
||||
compute_handle = compute.API(network_api=self.cloud.network_api,
|
||||
volume_api=self.cloud.volume_api)
|
||||
direct.register_service('compute', compute_handle)
|
||||
self.router = direct.JsonParamsMiddleware(direct.Router())
|
||||
|
||||
@@ -57,7 +57,7 @@ class QuotaTestCase(test.TestCase):
|
||||
def _create_instance(self, cores=2):
|
||||
"""Create a test instance"""
|
||||
inst = {}
|
||||
inst['image_id'] = 'ami-test'
|
||||
inst['image_id'] = 'ami-1'
|
||||
inst['reservation_id'] = 'r-fakeres'
|
||||
inst['user_id'] = self.user.id
|
||||
inst['project_id'] = self.project.id
|
||||
@@ -123,7 +123,7 @@ class QuotaTestCase(test.TestCase):
|
||||
min_count=1,
|
||||
max_count=1,
|
||||
instance_type='m1.small',
|
||||
image_id='fake')
|
||||
image_id='ami-1')
|
||||
for instance_id in instance_ids:
|
||||
db.instance_destroy(self.context, instance_id)
|
||||
|
||||
@@ -136,7 +136,7 @@ class QuotaTestCase(test.TestCase):
|
||||
min_count=1,
|
||||
max_count=1,
|
||||
instance_type='m1.small',
|
||||
image_id='fake')
|
||||
image_id='ami-1')
|
||||
for instance_id in instance_ids:
|
||||
db.instance_destroy(self.context, instance_id)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user