add fixes to images

1. extend modify_image_attributes
2. fix create_image
3. add reset_image_attribute
4. add images' tests

Change-Id: I0e52dbe46e0a0f3e6257365014f9b35bb4a21ad9
This commit is contained in:
Andrey Pavlov 2015-04-17 22:33:02 +03:00
parent a98a506eb9
commit b7d53fe5e6
14 changed files with 477 additions and 88 deletions

View File

@ -107,11 +107,9 @@ Availability zone related:
Image related:
- CopyImage
- ResetImageAttribute
- creationDate Image property
- creationDate Image property
- platform Image property
- productCodes Image property
- description Image property
- hypervisor Image property
- imageOwnerAlias Image property
- sriovNetSupport Image property

View File

@ -1017,10 +1017,10 @@ class CloudController(object):
@module_and_param_types(image, 'amiariaki_id', 'str',
'strs', 'str',
'dummy', 'dummy',
'dummy', 'dummy', 'dummy')
def modify_image_attribute(self, context, image_id, attribute,
user_group, operation_type,
'str', 'dummy',
'dummy', 'dummy', 'str')
def modify_image_attribute(self, context, image_id, attribute=None,
user_group=None, operation_type=None,
description=None, launch_permission=None,
product_code=None, user_id=None, value=None):
"""Modifies the specified attribute of the specified AMI.
@ -1029,19 +1029,31 @@ class CloudController(object):
context (RequestContext): The request context.
image_id (str): The ID of the image.
attribute (str): The name of the attribute to modify.
It's optional for AWS but required for legacy Nova EC2 API.
Only 'launchPermission' is supported now.
user_group (list of str): One or more user groups.
It's optional for AWS but required for legacy Nova EC2 API.
Only 'all' group is supported now.
operation_type (str): The operation type.
It's optional for AWS but required for legacy Nova EC2 API.
Only 'add' and 'remove' operation types are supported now.
description: Not supported now.
launch_permission: : Not supported now.
description: A description for the AMI.
launch_permission: : A launch permission modification.
product_code: : Not supported now.
user_id: : Not supported now.
value: : Not supported now.
value: : The value of the attribute being modified.
This is only valid when modifying the description attribute.
Returns:
true if the request succeeds.
"""
@module_and_param_types(image, 'amiariaki_id', 'str')
def reset_image_attribute(self, context, image_id, attribute):
"""Resets an attribute of an AMI to its default value.
Args:
context (RequestContext): The request context.
image_id (str): The ID of the image.
attribute (str): The attribute to reset (currently you can only
reset the launch permission attribute).
Returns:
true if the request succeeds.
"""

View File

@ -270,8 +270,10 @@ def get_db_item_by_os_id(context, kind, os_id, items_by_os_id=None,
return item
# TODO(andrey-mp): project_id is a temporary workaround which should be
# reworked asap. (c) by ftersin.
def os_id_to_ec2_id(context, kind, os_id, items_by_os_id=None,
ids_by_os_id=None):
ids_by_os_id=None, project_id=None):
if os_id is None:
return None
if ids_by_os_id is not None:
@ -286,7 +288,8 @@ def os_id_to_ec2_id(context, kind, os_id, items_by_os_id=None,
if len(ids):
item_id, _os_id = ids[0]
else:
item_id = db_api.add_item_id(context, kind, os_id)
item_id = db_api.add_item_id(context, kind, os_id,
project_id=project_id)
if ids_by_os_id is not None:
ids_by_os_id[os_id] = item_id
return item_id

View File

@ -36,6 +36,7 @@ from ec2api.api import clients
from ec2api.api import common
from ec2api.api import ec2utils
from ec2api.api import instance as instance_api
from ec2api import context as ec2_context
from ec2api.db import api as db_api
from ec2api import exception
from ec2api.i18n import _, _LE, _LI
@ -104,8 +105,7 @@ def create_image(context, instance_id, name=None, description=None,
instance = ec2utils.get_db_item(context, instance_id)
if not instance_api._is_ebs_instance(context, instance['os_id']):
# TODO(ft): Change the error code and message with the real AWS ones
msg = _('The instance is not an EBS-backed instance.')
msg = _('Instance does not have a volume attached at root (null).')
raise exception.InvalidParameterValue(value=instance_id,
parameter='InstanceId',
reason=msg)
@ -140,12 +140,17 @@ def create_image(context, instance_id, name=None, description=None,
name_map = dict(instance=instance['os_id'], now=timeutils.isotime())
name = name or _('image of %(instance)s at %(now)s') % name_map
glance = clients.glance(context)
with common.OnCrashCleaner() as cleaner:
os_image = os_instance.create_image(name)
cleaner.addCleanup(os_image.delete)
os_image_id = os_instance.create_image(name)
cleaner.addCleanup(glance.images.delete, os_image_id)
# TODO(andrey-mp): snapshot and volume also must be deleted in case
# of error
os_image = glance.images.get(os_image_id)
image = db_api.add_item(context, _get_os_image_kind(os_image),
{'os_id': os_image.id,
'is_public': False})
{'os_id': os_image_id,
'is_public': False,
'description': description})
if restart_instance:
os_instance.start()
@ -208,7 +213,8 @@ def register_image(context, name=None, image_location=None,
cleaner.addCleanup(os_image.delete)
kind = _get_os_image_kind(os_image)
image = db_api.add_item(context, kind, {'os_id': os_image.id,
'is_public': False})
'is_public': False,
'description': description})
return {'imageId': image['id']}
@ -285,6 +291,7 @@ class ImageDescriber(common.TaggableItemsDescriber):
def get_os_items(self):
return clients.glance(self.context).images.list()
# TODO(andrey-mp): project_id will be invalid for new public images
def auto_update_db(self, image, os_image):
if not image:
kind = _get_os_image_kind(os_image)
@ -316,22 +323,34 @@ def describe_images(context, executable_by=None, image_id=None,
def describe_image_attribute(context, image_id, attribute):
def _block_device_mapping_attribute(os_image, result):
image = ec2utils.get_db_item(context, image_id)
try:
glance = clients.glance(ec2_context.get_os_admin_context())
glance.images.get(image['os_id'])
except glance_exception.HTTPNotFound:
raise exception.InvalidAMIIDNotFound(id=image_id)
os_image = _get_owned_os_image(context, image_id, image['os_id'])
_prepare_mappings(os_image)
def _block_device_mapping_attribute(result):
_cloud_format_mappings(context, os_image.properties, result)
def _launch_permission_attribute(os_image, result):
def _description_attribute(result):
result['description'] = {'value': image.get('description')}
def _launch_permission_attribute(result):
result['launchPermission'] = []
if os_image.is_public:
result['launchPermission'].append({'group': 'all'})
def _kernel_attribute(os_image, result):
def _kernel_attribute(result):
kernel_id = os_image.properties.get('kernel_id')
if kernel_id:
result['kernel'] = {
'value': ec2utils.os_id_to_ec2_id(context, 'aki', kernel_id)
}
def _ramdisk_attribute(image, result):
def _ramdisk_attribute(result):
ramdisk_id = os_image.properties.get('ramdisk_id')
if ramdisk_id:
result['ramdisk'] = {
@ -339,12 +358,13 @@ def describe_image_attribute(context, image_id, attribute):
}
# NOTE(ft): Openstack extension, AWS-incompability
def _root_device_name_attribute(os_image, result):
def _root_device_name_attribute(result):
result['rootDeviceName'] = (
_block_device_properties_root_device_name(os_image.properties))
supported_attributes = {
'blockDeviceMapping': _block_device_mapping_attribute,
'description': _description_attribute,
'launchPermission': _launch_permission_attribute,
'kernel': _kernel_attribute,
'ramdisk': _ramdisk_attribute,
@ -352,65 +372,131 @@ def describe_image_attribute(context, image_id, attribute):
'rootDeviceName': _root_device_name_attribute,
}
# TODO(ft): AWS returns AuthFailure for not own public images,
# but we return NotFound for this case because we search for local images
# only
image = ec2utils.get_db_item(context, image_id)
fn = supported_attributes.get(attribute)
if fn is None:
# TODO(ft): Change the error code and message with the real AWS ones
raise exception.InvalidAttribute(attr=attribute)
glance = clients.glance(context)
os_image = glance.images.get(image['os_id'])
_prepare_mappings(os_image)
result = {'imageId': image_id}
fn(os_image, result)
fn(result)
return result
def modify_image_attribute(context, image_id, attribute,
user_group, operation_type,
def modify_image_attribute(context, image_id, attribute=None,
user_group=None, operation_type=None,
description=None, launch_permission=None,
product_code=None, user_id=None, value=None):
if attribute != 'launchPermission':
# TODO(ft): Change the error code and message with the real AWS ones
raise exception.InvalidAttribute(attr=attribute)
if not user_group:
msg = _('user or group not specified')
# TODO(ft): Change the error code and message with the real AWS ones
raise exception.MissingParameter(msg)
if len(user_group) != 1 and user_group[0] != 'all':
msg = _('only group "all" is supported')
raise exception.InvalidParameterValue(parameter='UserGroup',
value=user_group,
reason=msg)
if operation_type not in ['add', 'remove']:
msg = _('operation_type must be add or remove')
raise exception.InvalidParameterValue(parameter='OperationType',
value='operation_type',
reason=msg)
# TODO(ft): AWS returns AuthFailure for public images,
# but we return NotFound due searching for local images only
image = ec2utils.get_db_item(context, image_id)
glance = clients.glance(context)
image = glance.images.get(image['os_id'])
image.update(is_public=(operation_type == 'add'))
try:
glance = clients.glance(ec2_context.get_os_admin_context())
os_image = glance.images.get(image['os_id'])
except glance_exception.HTTPNotFound:
raise exception.InvalidAMIIDNotFound(id=image_id)
attributes = set()
# NOTE(andrey-mp): launchPermission structure is converted here
# to plain parameters: attribute, user_group, operation_type, user_id
if launch_permission is not None:
attributes.add('launchPermission')
user_group = list()
user_id = list()
if len(launch_permission) == 0:
msg = _('No operation specified for launchPermission attribute.')
raise exception.InvalidParameterCombination(msg)
if len(launch_permission) > 1:
msg = _('Only one operation can be specified.')
raise exception.InvalidParameterCombination(msg)
operation_type, permissions = launch_permission.popitem()
for index_key in permissions:
permission = permissions[index_key]
if 'group' in permission:
user_group.append(permission['group'])
if 'user_id' in permission:
user_id.append(permission['user_id'])
if attribute == 'launchPermission':
attributes.add('launchPermission')
if description is not None:
attributes.add('description')
value = description
if attribute == 'description':
attributes.add('description')
# check attributes count
if len(attributes) == 0:
raise exception.InvalidParameterCombination('No attributes specified.')
if len(attributes) > 1:
raise exception.InvalidParameterCombination(
_('Fields for multiple attribute types specified: %s')
% str(attributes))
if 'launchPermission' in attributes:
if not user_group:
msg = _('No operation specified for launchPermission attribute.')
raise exception.InvalidParameterCombination(msg)
if len(user_group) != 1 and user_group[0] != 'all':
msg = _('only group "all" is supported')
raise exception.InvalidParameterValue(parameter='UserGroup',
value=user_group,
reason=msg)
if operation_type not in ['add', 'remove']:
msg = _('operation_type must be add or remove')
raise exception.InvalidParameterValue(parameter='OperationType',
value='operation_type',
reason=msg)
os_image = _get_owned_os_image(context, image_id, image['os_id'])
os_image.update(is_public=(operation_type == 'add'))
return True
if 'description' in attributes:
if not value:
raise exception.MissingParameter(
'The request must contain the parameter description')
# Just check image accessibility
_get_owned_os_image(context, image_id, image['os_id'])
image['description'] = value
db_api.update_item(context, image)
return True
def reset_image_attribute(context, image_id, attribute):
if attribute != 'launchPermission':
raise exception.InvalidRequest()
image = ec2utils.get_db_item(context, image_id)
os_image = _get_owned_os_image(context, image_id, image['os_id'])
os_image.update(is_public=False)
return True
def _get_owned_os_image(context, image_id, os_image_id):
glance = clients.glance(context)
try:
os_image = glance.images.get(os_image_id)
except glance_exception.HTTPNotFound:
os_image = None
if os_image is None or os_image.owner != context.project_id:
raise exception.AuthFailure(_('Not authorized for image:%s')
% image_id)
return os_image
def _format_image(context, image, os_image, images_dict, ids_dict,
snapshot_ids=None):
ec2_image = {'imageId': image['id'],
'imageOwnerId': os_image.owner,
'description': '',
'imageType': IMAGE_TYPES[
ec2utils.get_ec2_id_kind(image['id'])],
'isPublic': image['is_public'],
'architecture': os_image.properties.get('architecture'),
}
if 'description' in image:
ec2_image['description'] = image['description']
state = os_image.status
# NOTE(vish): fallback status if image_state isn't set
if state == 'active':
@ -462,7 +548,7 @@ def _format_image(context, image, os_image, images_dict, ids_dict,
ec2_image['rootDeviceType'] = root_device_type
_cloud_format_mappings(context, properties, ec2_image,
root_device_name, snapshot_ids)
root_device_name, snapshot_ids, os_image.owner)
return ec2_image
@ -498,7 +584,7 @@ _ephemeral = re.compile('^ephemeral(\d|[1-9]\d+)$')
def _cloud_format_mappings(context, properties, result, root_device_name=None,
snapshot_ids=None):
snapshot_ids=None, project_id=None):
"""Format multiple BlockDeviceMappingItemType."""
mappings = [
{'virtualName': m['virtual'],
@ -509,7 +595,7 @@ def _cloud_format_mappings(context, properties, result, root_device_name=None,
for bdm in properties.get('block_device_mapping', []):
formatted_bdm = _cloud_format_block_device_mapping(
context, bdm, root_device_name, snapshot_ids)
context, bdm, root_device_name, snapshot_ids, project_id)
# NOTE(yamahata): overwrite mappings with block_device_mapping
for i in range(len(mappings)):
if (formatted_bdm.get('deviceName')
@ -526,7 +612,7 @@ def _cloud_format_mappings(context, properties, result, root_device_name=None,
def _cloud_format_block_device_mapping(context, bdm, root_device_name=None,
snapshot_ids=None):
snapshot_ids=None, project_id=None):
"""Construct BlockDeviceMappingItemType."""
keys = (('deviceName', 'device_name'),
('virtualName', 'virtual_name'))
@ -541,11 +627,12 @@ def _cloud_format_block_device_mapping(context, bdm, root_device_name=None,
ebs = {name: bdm[k] for name, k in ebs_keys if bdm.get(k) is not None}
if bdm.get('snapshot_id'):
ebs['snapshotId'] = ec2utils.os_id_to_ec2_id(
context, 'snap', bdm['snapshot_id'], ids_by_os_id=snapshot_ids)
context, 'snap', bdm['snapshot_id'], ids_by_os_id=snapshot_ids,
project_id=project_id)
# NOTE(ft): Openstack extension, AWS-incompability
elif bdm.get('volume_id'):
ebs['snapshotId'] = ec2utils.os_id_to_ec2_id(
context, 'vol', bdm['volume_id'])
context, 'vol', bdm['volume_id'], project_id=project_id)
assert 'snapshotId' in ebs
item['ebs'] = ebs
return item

View File

@ -83,8 +83,8 @@ def add_item(context, kind, data):
return IMPL.add_item(context, kind, data)
def add_item_id(context, kind, os_id):
return IMPL.add_item_id(context, kind, os_id)
def add_item_id(context, kind, os_id, project_id=None):
return IMPL.add_item_id(context, kind, os_id, project_id)
def update_item(context, item):

View File

@ -118,12 +118,14 @@ def add_item(context, kind, data):
@require_context
def add_item_id(context, kind, os_id):
def add_item_id(context, kind, os_id, project_id=None):
item_ref = models.Item()
item_ref.update({
"id": _new_id(kind, os_id),
"os_id": os_id,
})
if project_id:
item_ref.project_id = project_id
try:
item_ref.save()
except db_exception.DBDuplicateEntry as ex:

View File

@ -24,6 +24,12 @@ export TEST_CONFIG="functional_tests.conf"
if [[ ! -f $TEST_CONFIG_DIR/$TEST_CONFIG ]]; then
openstack catalog list
if [[ "$?" -ne "0" ]]; then
echo "Looks like credentials are absent."
exit 1
fi
# create separate user/project
tenant_name="tenant-$(cat /dev/urandom | tr -cd 'a-f0-9' | head -c 8)"
eval $(openstack project create -f shell -c id $tenant_name)
@ -32,7 +38,7 @@ if [[ ! -f $TEST_CONFIG_DIR/$TEST_CONFIG ]]; then
eval $(openstack user create "$user_name" --project "$tenant_id" --password "password" --email "$user_name@example.com" -f shell -c id)
user_id=$id
# create network
if [[ -n $(keystone service-list | grep neutron) ]]; then
if [[ -n $(openstack catalog list | grep neutron) ]]; then
net_id=$(neutron net-create --tenant-id $tenant_id "private" | grep ' id ' | awk '{print $4}')
subnet_id=$(neutron subnet-create --tenant-id $tenant_id --ip_version 4 --gateway 10.0.0.1 --name "private_subnet" $net_id 10.0.0.0/24 | grep ' id ' | awk '{print $4}')
router_id=$(neutron router-create --tenant-id $tenant_id "private_router" | grep ' id ' | awk '{print $4}')
@ -90,6 +96,10 @@ if [[ ! -f $TEST_CONFIG_DIR/$TEST_CONFIG ]]; then
done
image_name="image-$(cat /dev/urandom | tr -cd 'a-f0-9' | head -c 8)"
nova image-create $instance_name $image_name
if [[ "$?" -ne "0" ]]; then
echo "Image creation from instance fails"
exit 1
fi
ebs_image_id=$(euca-describe-images --show-empty-fields | grep $image_name | awk '{print $2}')
nova delete $instance_id
@ -101,6 +111,9 @@ aws_secret = $EC2_SECRET_KEY
image_id = $image_id
ebs_image_id = $ebs_image_id
EOF"
# local workaround for LP#1439819. its doesn't work in gating because glance check isatty property.
#glance image-update $image_name --container-format ami --disk-format ami
fi
sudo pip install -r test-requirements.txt

View File

@ -13,6 +13,7 @@
# License for the specific language governing permissions and limitations
# under the License.
from tempest_lib.common.utils import data_utils
import testtools
from ec2api.tests.functional import base
@ -67,9 +68,270 @@ class ImageTest(base.EC2TestCase):
resp, data = self.client.DescribeImages(
# NOTE(ft): limit output to prevent timeout over AWS
Filters=[{'Name': 'image-type', 'Values': ['kernel', 'ramdisk']}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
if len(data['Images']) < 2:
self.skipTest("Insufficient images to check filters")
resp, data = self.client.DescribeImages(
Filters=[{'Name': 'image-id', 'Values': [image_id]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Images']))
self.assertEqual(image_id, data['Images'][0]['ImageId'])
def test_check_image_attributes_negative(self):
# NOTE(andrey-mp): image_id is a public image created by admin
image_id = CONF.aws.image_id
resp, data = self.client.ModifyImageAttribute(
ImageId=CONF.aws.image_id, Attribute='unsupported')
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterCombination', data['Error']['Code'])
resp, data = self.client.ModifyImageAttribute(
ImageId=CONF.aws.image_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterCombination', data['Error']['Code'])
resp, data = self.client.ModifyImageAttribute(
ImageId=image_id, Description={'Value': 'fake'})
self.assertEqual(400, resp.status_code)
self.assertEqual('AuthFailure', data['Error']['Code'])
resp, data = self.client.ModifyImageAttribute(
ImageId=image_id, LaunchPermission={'Add': [{'Group': 'all'}]})
self.assertEqual(400, resp.status_code)
self.assertEqual('AuthFailure', data['Error']['Code'])
resp, data = self.client.ModifyImageAttribute(
ImageId=image_id, Attribute='description')
self.assertEqual(400, resp.status_code)
self.assertEqual('MissingParameter', data['Error']['Code'])
resp, data = self.client.ModifyImageAttribute(
ImageId=image_id, Attribute='launchPermission')
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterCombination', data['Error']['Code'])
resp, data = self.client.ResetImageAttribute(
ImageId=image_id, Attribute='fake')
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidRequest', data['Error']['Code'])
resp, data = self.client.ResetImageAttribute(
ImageId=image_id, Attribute='launchPermission')
self.assertEqual(400, resp.status_code)
self.assertEqual('AuthFailure', data['Error']['Code'])
@testtools.skipUnless(CONF.aws.image_id, 'image id is not defined')
def test_create_image_from_non_ebs_instance(self):
image_id = CONF.aws.image_id
resp, data = self.client.DescribeImages(ImageIds=[image_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
image = data['Images'][0]
if 'RootDeviceType' in image and 'ebs' in image['RootDeviceType']:
raise self.skipException('image_id should not be EBS image.')
resp, data = self.client.RunInstances(
ImageId=image_id, InstanceType=CONF.aws.instance_type,
Placement={'AvailabilityZone': CONF.aws.aws_zone},
MinCount=1, MaxCount=1)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Instances']))
instance_id = data['Instances'][0]['InstanceId']
res_clean = self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
resp, data = self.client.CreateImage(InstanceId=instance_id,
Name='name', Description='desc')
if resp.status_code == 200:
image_id = data['ImageId']
self.addResourceCleanUp(self.client.DeregisterImage,
ImageId=image_id)
self.get_image_waiter().wait_available(image_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
resp, data = self.client.TerminateInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_instance_waiter().wait_delete(instance_id)
def _create_image(self, name, desc):
image_id = CONF.aws.ebs_image_id
resp, data = self.client.DescribeImages(ImageIds=[image_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
image = data['Images'][0]
self.assertTrue('RootDeviceType' in image
and 'ebs' in image['RootDeviceType'])
resp, data = self.client.RunInstances(
ImageId=image_id, InstanceType=CONF.aws.instance_type,
Placement={'AvailabilityZone': CONF.aws.aws_zone},
MinCount=1, MaxCount=1)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Instances']))
instance_id = data['Instances'][0]['InstanceId']
res_clean = self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
resp, data = self.client.CreateImage(InstanceId=instance_id,
Name=name, Description=desc)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
image_id = data['ImageId']
image_clean = self.addResourceCleanUp(self.client.DeregisterImage,
ImageId=image_id)
self.get_image_waiter().wait_available(image_id)
resp, data = self.client.DescribeImages(ImageIds=[image_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
for bdm in data['Images'][0].get('BlockDeviceMappings', []):
if 'Ebs' in bdm and 'SnapshotId' in bdm['Ebs']:
snapshot_id = bdm['Ebs']['SnapshotId']
kwargs = {'SnapshotIds': [snapshot_id]}
resp, data = self.client.DescribeSnapshots(**kwargs)
if resp.status_code == 200:
volume_id = data['Snapshots'][0].get('VolumeId')
if volume_id:
self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id)
self.addResourceCleanUp(self.client.DeleteSnapshot,
SnapshotId=snapshot_id)
resp, data = self.client.TerminateInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_instance_waiter().wait_delete(instance_id)
return image_id, image_clean
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
'skip due to bug #1439819')
@testtools.skipUnless(CONF.aws.ebs_image_id, "EBS image id is not defined")
def test_create_image_from_ebs_instance(self):
name = data_utils.rand_name('image')
desc = data_utils.rand_name('')
image_id, image_clean = self._create_image(name, desc)
resp, data = self.client.DescribeImages(ImageIds=[image_id])
self.assertEqual(200, resp.status_code,
base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Images']))
image = data['Images'][0]
self.assertEqual("ebs", image['RootDeviceType'])
self.assertFalse(image['Public'])
self.assertEqual(name, image['Name'])
self.assertEqual(desc, image['Description'])
self.assertEqual('machine', image['ImageType'])
self.assertNotEmpty(image['BlockDeviceMappings'])
for bdm in image['BlockDeviceMappings']:
self.assertIn('DeviceName', bdm)
resp, data = self.client.DeregisterImage(ImageId=image_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(image_clean)
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
'skip due to bug #1439819')
@testtools.skipUnless(CONF.aws.ebs_image_id, "EBS image id is not defined")
def test_check_simple_image_attributes(self):
name = data_utils.rand_name('image')
desc = data_utils.rand_name('desc for image')
image_id, image_clean = self._create_image(name, desc)
resp, data = self.client.DescribeImageAttribute(
ImageId=image_id, Attribute='kernel')
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertIn('KernelId', data)
resp, data = self.client.DescribeImageAttribute(
ImageId=image_id, Attribute='ramdisk')
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertIn('RamdiskId', data)
# description
resp, data = self.client.DescribeImageAttribute(
ImageId=image_id, Attribute='description')
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertIn('Description', data)
self.assertIn('Value', data['Description'])
self.assertEqual(desc, data['Description']['Value'])
def _modify_description(**kwargs):
resp, data = self.client.ModifyImageAttribute(
ImageId=image_id, **kwargs)
self.assertEqual(200, resp.status_code,
base.EC2ErrorConverter(data))
resp, data = self.client.DescribeImageAttribute(
ImageId=image_id, Attribute='description')
self.assertEqual(200, resp.status_code,
base.EC2ErrorConverter(data))
self.assertEqual(new_desc, data['Description']['Value'])
new_desc = data_utils.rand_name('new desc')
_modify_description(Attribute='description', Value=new_desc)
_modify_description(Description={'Value': new_desc})
resp, data = self.client.DeregisterImage(ImageId=image_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(image_clean)
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
'By default glance is configured as "publicize_image": "role:admin"')
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
'skip due to bug #1439819')
@testtools.skipUnless(CONF.aws.ebs_image_id, "EBS image id is not defined")
def test_check_launch_permission_attribute(self):
name = data_utils.rand_name('image')
desc = data_utils.rand_name('desc for image')
image_id, image_clean = self._create_image(name, desc)
# launch permission
resp, data = self.client.DescribeImageAttribute(
ImageId=image_id, Attribute='launchPermission')
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertIn('LaunchPermissions', data)
self.assertEmpty(data['LaunchPermissions'])
def _modify_launch_permission(**kwargs):
resp, data = self.client.ModifyImageAttribute(
ImageId=image_id, **kwargs)
self.assertEqual(200, resp.status_code,
base.EC2ErrorConverter(data))
resp, data = self.client.DescribeImageAttribute(
ImageId=image_id, Attribute='launchPermission')
self.assertEqual(200, resp.status_code,
base.EC2ErrorConverter(data))
self.assertIn('LaunchPermissions', data)
self.assertNotEmpty(data['LaunchPermissions'])
self.assertIn('Group', data['LaunchPermissions'][0])
self.assertEqual('all', data['LaunchPermissions'][0]['Group'])
resp, data = self.client.DescribeImages(ImageIds=[image_id])
self.assertEqual(200, resp.status_code,
base.EC2ErrorConverter(data))
self.assertTrue(data['Images'][0]['Public'])
resp, data = self.client.ResetImageAttribute(
ImageId=image_id, Attribute='launchPermission')
self.assertEqual(200, resp.status_code,
base.EC2ErrorConverter(data))
resp, data = self.client.DescribeImageAttribute(
ImageId=image_id, Attribute='launchPermission')
self.assertEqual(200, resp.status_code,
base.EC2ErrorConverter(data))
self.assertEmpty(data['LaunchPermissions'])
resp, data = self.client.DescribeImages(ImageIds=[image_id])
self.assertEqual(200, resp.status_code,
base.EC2ErrorConverter(data))
self.assertFalse(data['Images'][0]['Public'])
_modify_launch_permission(Attribute='launchPermission',
OperationType='add', UserGroups=['all'])
_modify_launch_permission(LaunchPermission={'Add': [{'Group': 'all'}]})
resp, data = self.client.DeregisterImage(ImageId=image_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(image_clean)

View File

@ -256,7 +256,7 @@ class InstanceTest(base.EC2TestCase):
self.cancelResourceCleanUp(res_clean)
self.get_instance_waiter().wait_delete(instance_id)
def test_instance_attributes(self):
def test_describe_instance_attributes(self):
instance_type = CONF.aws.instance_type
image_id = CONF.aws.image_id
resp, data = self.client.RunInstances(

View File

@ -525,6 +525,8 @@ class EC2TestCase(base.BaseTestCase):
def _image_get_state(cls, image_id):
resp, data = cls.client.DescribeImages(ImageIds=[image_id])
if resp.status_code == 200:
if not data['Images']:
raise exceptions.NotFound()
return data['Images'][0]['State']
if resp.status_code == 400:

View File

@ -1105,7 +1105,6 @@ EC2_IMAGE_1 = {
'imageState': 'available',
'imageType': 'machine',
'name': 'fake_name',
'description': None,
'imageLocation': LOCATION_IMAGE_1,
'kernelId': ID_EC2_IMAGE_AKI_1,
'ramdiskId': ID_EC2_IMAGE_ARI_1,
@ -1138,7 +1137,7 @@ EC2_IMAGE_2 = {
'imageState': 'available',
'imageType': 'machine',
'name': None,
'description': None,
'description': 'fake desc',
'imageLocation': 'None (None)',
'architecture': None,
'rootDeviceType': 'ebs',
@ -1159,6 +1158,7 @@ DB_IMAGE_2 = {
'id': ID_EC2_IMAGE_2,
'os_id': ID_OS_IMAGE_2,
'is_public': True,
'description': 'fake desc'
}
DB_IMAGE_AKI_1 = {
'id': ID_EC2_IMAGE_AKI_1,

View File

@ -169,7 +169,7 @@ class EC2UtilsTestCase(testtools.TestCase):
item_id = ec2utils.os_id_to_ec2_id(fake_context, 'fake', fake_os_id)
self.assertEqual(fake_id, item_id)
db_api.add_item_id.assert_called_once_with(
fake_context, 'fake', fake_os_id)
fake_context, 'fake', fake_os_id, None)
# no item in cache, item isn't found
db_api.reset_mock()
@ -180,7 +180,7 @@ class EC2UtilsTestCase(testtools.TestCase):
self.assertIn(fake_os_id, ids_cache)
self.assertEqual(fake_id, ids_cache[fake_os_id])
db_api.add_item_id.assert_called_once_with(
fake_context, 'fake', fake_os_id)
fake_context, 'fake', fake_os_id, None)
# no item in cache, item is found
db_api.reset_mock()

View File

@ -92,6 +92,15 @@ FILE_MANIFEST_XML = """<?xml version="1.0" ?>
class ImageTestCase(base.ApiTestCase):
def setUp(self):
super(ImageTestCase, self).setUp()
get_os_admin_context_patcher = (
mock.patch('ec2api.context.get_os_admin_context'))
self.get_os_admin_context = get_os_admin_context_patcher.start()
self.addCleanup(get_os_admin_context_patcher.stop)
self.get_os_admin_context.return_value = (
self._create_context(auth_token='admin_token'))
@mock.patch('ec2api.api.instance._is_ebs_instance')
def _test_create_image(self, instance_status, no_reboot, is_ebs_instance):
self.set_mock_db_items(fakes.DB_INSTANCE_2)
@ -103,17 +112,16 @@ class ImageTestCase(base.ApiTestCase):
os_instance.get.side_effect = lambda: (setattr(os_instance, 'status',
'SHUTOFF')
if next(stop_called) else None)
os_image = mock.MagicMock()
os_image.configure_mock(id=fakes.random_os_id())
os_instance.create_image.return_value = os_image
image_id = fakes.random_ec2_id('ami')
os_instance.create_image.return_value = image_id
self.nova.servers.get.return_value = os_instance
is_ebs_instance.return_value = True
image_id = fakes.random_ec2_id('ami')
self.db_api.add_item.side_effect = tools.get_db_api_add_item(image_id)
resp = self.execute('CreateImage',
{'InstanceId': fakes.ID_EC2_INSTANCE_2,
'Name': 'fake_name',
'Description': 'fake desc',
'NoReboot': str(no_reboot)})
self.assertEqual({'imageId': image_id},
resp)
@ -122,8 +130,9 @@ class ImageTestCase(base.ApiTestCase):
self.nova.servers.get.assert_called_once_with(fakes.ID_OS_INSTANCE_2)
is_ebs_instance.assert_called_once_with(mock.ANY, os_instance.id)
self.db_api.add_item.assert_called_once_with(
mock.ANY, 'ami', {'os_id': os_image.id,
'is_public': False})
mock.ANY, 'ami', {'os_id': image_id,
'is_public': False,
'description': 'fake desc'})
if not no_reboot:
os_instance.stop.assert_called_once_with()
os_instance.get.assert_called_once_with()
@ -198,7 +207,8 @@ class ImageTestCase(base.ApiTestCase):
{'imageId': fakes.ID_EC2_IMAGE_2}))
self.db_api.add_item.assert_called_once_with(
mock.ANY, 'ami', {'os_id': fakes.ID_OS_IMAGE_2,
'is_public': False})
'is_public': False,
'description': None})
self.assertEqual(1, self.glance.images.create.call_count)
self.assertEqual((), self.glance.images.create.call_args[0])
self.assertIn('properties', self.glance.images.create.call_args[1])
@ -274,7 +284,7 @@ class ImageTestCase(base.ApiTestCase):
('block-device-mapping.device-name', '/dev/sdb2'),
('block-device-mapping.snapshot-id', fakes.ID_EC2_SNAPSHOT_1),
('block-device-mapping.volume-size', 22),
('description', ''),
('description', 'fake desc'),
('image-id', fakes.ID_EC2_IMAGE_1),
('image-type', 'machine'),
('is-public', True),

View File

@ -1460,9 +1460,9 @@ class InstancePrivateTestCase(test_base.BaseTestCase):
fake_context, instance, os_instance, [], {},
os_flavors=fake_flavors)
db_api.add_item_id.assert_has_calls(
[mock.call(mock.ANY, 'ami', os_instance.image['id']),
mock.call(mock.ANY, 'aki', kernel_id),
mock.call(mock.ANY, 'ari', ramdisk_id)],
[mock.call(mock.ANY, 'ami', os_instance.image['id'], None),
mock.call(mock.ANY, 'aki', kernel_id, None),
mock.call(mock.ANY, 'ari', ramdisk_id, None)],
any_order=True)
@mock.patch('cinderclient.client.Client')