Add rest of api tests.

Change-Id: I639876622bfb5a034717463256fe0dbd5b9a8399
This commit is contained in:
Andrey Pavlov 2015-02-10 21:27:39 +03:00
parent 5584ef9ad6
commit 462de41470
15 changed files with 1490 additions and 10 deletions

View File

@ -23,6 +23,10 @@ export TEST_CONFIG="functional_tests.conf"
if [[ ! -f $TEST_CONFIG_DIR/$TEST_CONFIG ]]; then
IMAGE_ID=$(euca-describe-images | grep "cirros" | grep "ami-" | head -n 1 | awk '{print $2}')
VPC_ENABLED="False"
if [[ $DEVSTACK_GATE_NEUTRON -eq "1" ]]; then
VPC_ENABLED="True"
fi
sudo bash -c "cat > $TEST_CONFIG_DIR/$TEST_CONFIG <<EOF
[aws]
@ -30,6 +34,7 @@ ec2_url = $EC2_URL
aws_access = $EC2_ACCESS_KEY
aws_secret = $EC2_SECRET_KEY
image_id = $IMAGE_ID
vpc_enabled = $VPC_ENABLED
EOF"
fi

View File

@ -0,0 +1,357 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import time
from tempest_lib.openstack.common import log
import testtools
from ec2api.tests.functional import base
from ec2api.tests.functional import config
CONF = config.CONF
LOG = log.getLogger(__name__)
class AddressTest(base.EC2TestCase):
@testtools.skipUnless(CONF.aws.run_incompatible_tests, "VPC is disabled")
def test_create_delete_vpc_address(self):
kwargs = {
'Domain': 'vpc',
}
resp, data = self.client.AllocateAddress(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
id = data['AllocationId']
res_clean = self.addResourceCleanUp(self.client.ReleaseAddress,
AllocationId=id)
self.assertEqual('vpc', data['Domain'])
resp, data = self.client.ReleaseAddress(AllocationId=id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
def test_create_delete_standard_address(self):
resp, data = self.client.AllocateAddress()
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ip = data['PublicIp']
res_clean = self.addResourceCleanUp(self.client.ReleaseAddress,
PublicIp=ip)
resp, data = self.client.ReleaseAddress(PublicIp=ip)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
@testtools.skipUnless(CONF.aws.run_incompatible_tests, "VPC is disabled")
def test_invalid_delete_vpc_address(self):
kwargs = {
'Domain': 'vpc',
}
resp, data = self.client.AllocateAddress(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ip = data['PublicIp']
id = data['AllocationId']
res_clean = self.addResourceCleanUp(self.client.ReleaseAddress,
AllocationId=id)
self.assertEqual('vpc', data['Domain'])
resp, data = self.client.ReleaseAddress(PublicIp=ip, AllocationId=id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterCombination', data['Error']['Code'])
resp, data = self.client.ReleaseAddress(PublicIp=ip)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
resp, data = self.client.ReleaseAddress(AllocationId=id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
if CONF.aws.run_incompatible_tests:
resp, data = self.client.ReleaseAddress(PublicIp=ip)
self.assertEqual(400, resp.status_code)
self.assertEqual('AuthFailure', data['Error']['Code'])
resp, data = self.client.ReleaseAddress(AllocationId=id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidAllocationID.NotFound', data['Error']['Code'])
kwargs = {
"AllocationId": 'eipalloc-00000000',
}
resp, data = self.client.ReleaseAddress(*[], **kwargs)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidAllocationID.NotFound', data['Error']['Code'])
if CONF.aws.run_incompatible_tests:
resp, data = self.client.ReleaseAddress(PublicIp='ip')
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
def test_invalid_create_address(self):
kwargs = {
'Domain': 'invalid',
}
resp, data = self.client.AllocateAddress(*[], **kwargs)
if resp.status_code == 200:
self.addResourceCleanUp(self.client.ReleaseAddress,
AllocationId=data['AllocationId'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
@testtools.skipUnless(CONF.aws.run_incompatible_tests, "VPC is disabled")
def test_describe_vpc_addresses(self):
resp, data = self.client.DescribeAddresses(*[], **{})
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
start_count = len(data['Addresses'])
kwargs = {
'Domain': 'vpc',
}
resp, data = self.client.AllocateAddress(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ip = data['PublicIp']
id = data['AllocationId']
res_clean = self.addResourceCleanUp(self.client.ReleaseAddress,
AllocationId=id)
resp, data = self.client.DescribeAddresses(*[], **{})
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(start_count + 1, len(data['Addresses']))
for address in data['Addresses']:
if address['AllocationId'] == id:
self.assertEqual('vpc', address['Domain'])
self.assertEqual(ip, address['PublicIp'])
break
else:
self.fail('Created address could not be found')
kwargs = {
'PublicIps': [ip],
}
resp, data = self.client.DescribeAddresses(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Addresses']))
self.assertEqual(id, data['Addresses'][0]['AllocationId'])
kwargs = {
'AllocationIds': [id],
}
resp, data = self.client.DescribeAddresses(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Addresses']))
self.assertEqual(ip, data['Addresses'][0]['PublicIp'])
kwargs = {
'PublicIps': ['invalidIp'],
}
resp, data = self.client.DescribeAddresses(*[], **kwargs)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
kwargs = {
'AllocationIds': ['eipalloc-00000000'],
}
resp, data = self.client.DescribeAddresses(*[], **kwargs)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidAllocationID.NotFound', data['Error']['Code'])
kwargs = {
'Domain': 'vpc',
}
resp, data = self.client.AllocateAddress(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
id2 = data['AllocationId']
res_clean2 = self.addResourceCleanUp(self.client.ReleaseAddress,
AllocationId=id2)
kwargs = {
'PublicIps': [ip],
'AllocationIds': [id2],
}
resp, data = self.client.DescribeAddresses(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(2, len(data['Addresses']))
# NOTE(andrey-mp): wait abit before releasing
time.sleep(3)
resp, data = self.client.ReleaseAddress(AllocationId=id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
resp, data = self.client.ReleaseAddress(AllocationId=id2)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean2)
def test_describe_standard_addresses(self):
resp, data = self.client.DescribeAddresses(*[], **{})
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
start_count = len(data['Addresses'])
resp, data = self.client.AllocateAddress(*[], **{})
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ip = data['PublicIp']
res_clean = self.addResourceCleanUp(self.client.ReleaseAddress,
PublicIp=ip)
resp, data = self.client.DescribeAddresses(*[], **{})
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(start_count + 1, len(data['Addresses']))
for address in data['Addresses']:
if address['PublicIp'] == ip:
self.assertEqual('standard', address['Domain'])
break
else:
self.fail('Created address could not be found')
kwargs = {
'PublicIps': [ip],
}
resp, data = self.client.DescribeAddresses(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Addresses']))
self.assertEqual(ip, data['Addresses'][0]['PublicIp'])
kwargs = {
'PublicIps': ['invalidIp'],
}
resp, data = self.client.DescribeAddresses(*[], **kwargs)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
# NOTE(andrey-mp): wait abit before releasing
time.sleep(3)
resp, data = self.client.ReleaseAddress(PublicIp=ip)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
@testtools.skipUnless(CONF.aws.run_incompatible_tests, "VPC is disabled")
def test_associate_disassociate_vpc_addresses(self):
instance_type = CONF.aws.instance_type
image_id = CONF.aws.image_id
aws_zone = CONF.aws.aws_zone
if not image_id:
raise self.skipException('aws image_id does not provided')
resp, data = self.client.CreateVpc(CidrBlock='10.2.0.0/20')
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
vpc_id = data['Vpc']['VpcId']
self.addResourceCleanUp(self.client.DeleteVpc, VpcId=vpc_id)
self.get_vpc_waiter().wait_available(vpc_id)
cidr = '10.2.0.0/24'
resp, data = self.client.CreateSubnet(VpcId=vpc_id, CidrBlock=cidr,
AvailabilityZone=aws_zone)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
subnet_id = data['Subnet']['SubnetId']
self.addResourceCleanUp(self.client.DeleteSubnet,
SubnetId=subnet_id)
resp, data = self.client.RunInstances(
ImageId=image_id, InstanceType=instance_type, MinCount=1,
MaxCount=1, SubnetId=subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
instance_id = data['Instances'][0]['InstanceId']
self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
kwargs = {
'Domain': 'vpc',
}
resp, data = self.client.AllocateAddress(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
alloc_id = data['AllocationId']
self.addResourceCleanUp(self.client.ReleaseAddress,
AllocationId=alloc_id)
resp, data = self.client.AssociateAddress(InstanceId=instance_id,
AllocationId=alloc_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('Gateway.NotAttached', data['Error']['Code'])
# Create internet gateway and try to associate again
resp, data = self.client.CreateInternetGateway()
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
gw_id = data['InternetGateway']['InternetGatewayId']
self.addResourceCleanUp(self.client.DeleteInternetGateway,
InternetGatewayId=gw_id)
resp, data = self.client.AttachInternetGateway(VpcId=vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.addResourceCleanUp(self.client.DetachInternetGateway,
VpcId=vpc_id,
InternetGatewayId=gw_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.AssociateAddress(InstanceId=instance_id,
AllocationId=alloc_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
association_id = data['AssociationId']
resp, data = self.client.DescribeAddresses(*[], **{})
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(instance_id, data['Addresses'][0]['InstanceId'])
kwargs = {
"AssociationId": association_id,
}
resp, data = self.client.DisassociateAddress(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DescribeAddresses(*[], **{})
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertIsNone(data['Addresses'][0].get('InstanceId'))
def test_associate_disassociate_standard_addresses(self):
instance_type = CONF.aws.instance_type
image_id = CONF.aws.image_id
if not image_id:
raise self.skipException('aws image_id does not provided')
resp, data = self.client.RunInstances(ImageId=image_id,
InstanceType=instance_type,
MinCount=1,
MaxCount=1)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
instance_id = data['Instances'][0]['InstanceId']
self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
resp, data = self.client.AllocateAddress(*[], **{})
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ip = data['PublicIp']
self.addResourceCleanUp(self.client.ReleaseAddress,
PublicIp=ip)
resp, data = self.client.AssociateAddress(InstanceId=instance_id,
PublicIp=ip)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DescribeAddresses(*[], **{})
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(instance_id, data['Addresses'][0]['InstanceId'])
resp, data = self.client.DisassociateAddress(PublicIp=ip)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.DescribeAddresses(*[], **{})
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertIsNone(data['Addresses'][0].get('InstanceId'))

View File

@ -18,14 +18,20 @@ import time
from tempest_lib.openstack.common import log
from ec2api.tests.functional import base
from ec2api.tests.functional import config
CONF = config.CONF
LOG = log.getLogger(__name__)
class DhcpOptionsTest(base.EC2TestCase):
VPC_CIDR = '10.12.0.0/24'
vpc_id = None
@classmethod
@base.safe_setup
def setUpClass(cls):
super(DhcpOptionsTest, cls).setUpClass()
if not CONF.aws.vpc_enabled:
raise cls.skipException('VPC is disabled')
def test_create_delete_dhcp_options(self):
kwargs = {

View File

@ -0,0 +1,383 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import base64
from tempest_lib.openstack.common import log
import testtools
from ec2api.tests.functional import base
from ec2api.tests.functional import config
CONF = config.CONF
LOG = log.getLogger(__name__)
class InstanceTest(base.EC2TestCase):
VPC_CIDR = '10.16.0.0/20'
vpc_id = None
SUBNET_CIDR = '10.16.0.0/24'
subnet_id = None
@classmethod
@base.safe_setup
def setUpClass(cls):
super(InstanceTest, cls).setUpClass()
if not CONF.aws.vpc_enabled:
raise cls.skipException('VPC is disabled')
if not CONF.aws.image_id:
raise cls.skipException('aws image_id does not provided')
resp, data = cls.client.CreateVpc(CidrBlock=cls.VPC_CIDR)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))
assert 200 == resp.status_code
cls.vpc_id = data['Vpc']['VpcId']
cls.addResourceCleanUpStatic(cls.client.DeleteVpc, VpcId=cls.vpc_id)
cls.get_vpc_waiter().wait_available(cls.vpc_id)
aws_zone = CONF.aws.aws_zone
resp, data = cls.client.CreateSubnet(VpcId=cls.vpc_id,
CidrBlock=cls.SUBNET_CIDR,
AvailabilityZone=aws_zone)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))
assert 200 == resp.status_code
cls.subnet_id = data['Subnet']['SubnetId']
cls.addResourceCleanUpStatic(cls.client.DeleteSubnet,
SubnetId=cls.subnet_id)
cls.get_subnet_waiter().wait_available(cls.subnet_id)
def test_create_delete_instance(self):
instance_type = CONF.aws.instance_type
image_id = CONF.aws.image_id
resp, data = self.client.RunInstances(
ImageId=image_id, InstanceType=instance_type, MinCount=1,
MaxCount=1, SubnetId=self.subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
instance_id = data['Instances'][0]['InstanceId']
res_clean = self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
resp, data = self.client.DescribeInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
reservations = data.get('Reservations', [])
self.assertNotEmpty(reservations)
instances = reservations[0].get('Instances', [])
self.assertNotEmpty(instances)
instance = instances[0]
self.assertEqual(self.vpc_id, instance['VpcId'])
self.assertEqual(self.subnet_id, instance['SubnetId'])
if CONF.aws.run_incompatible_tests:
self.assertTrue(instance['SourceDestCheck'])
self.assertEqual(1, len(instance['NetworkInterfaces']))
ni = instance['NetworkInterfaces'][0]
self.assertIsNone(ni['Description'])
self.assertEqual(1, len(ni['Groups']))
self.assertIsNotNone(ni['MacAddress'])
self.assertIsNotNone(ni['PrivateIpAddress'])
self.assertTrue(ni['SourceDestCheck'])
self.assertEqual('in-use', ni['Status'])
self.assertEqual(self.vpc_id, ni['VpcId'])
self.assertEqual(self.subnet_id, ni['SubnetId'])
resp, data = self.client.TerminateInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_instance_waiter().wait_delete(instance_id)
# NOTE(andrey-mp): There is difference between Openstack and Amazon.
# Amazon returns instance in 'terminated' state some time after
# instance deletion. But Openstack doesn't return such instance.
def test_describe_instances_filter(self):
instance_type = CONF.aws.instance_type
image_id = CONF.aws.image_id
resp, data = self.client.RunInstances(
ImageId=image_id, InstanceType=instance_type, MinCount=1,
MaxCount=1, SubnetId=self.subnet_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
instance_id = data['Instances'][0]['InstanceId']
res_clean = self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
# NOTE(andrey-mp): by real id
resp, data = self.client.DescribeInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assert_instance(data, instance_id)
instances = data['Reservations'][0]['Instances']
private_dns = instances[0]['PrivateDnsName']
private_ip = instances[0]['PrivateIpAddress']
# NOTE(andrey-mp): by fake id
resp, data = self.client.DescribeInstances(InstanceIds=['i-0'])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidInstanceID.NotFound', data['Error']['Code'])
# NOTE(andrey-mp): by private ip
resp, data = self.client.DescribeInstances(
Filters=[{'Name': 'private-ip-address', 'Values': ['1.2.3.4']}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(0, len(data['Reservations']))
resp, data = self.client.DescribeInstances(
Filters=[{'Name': 'private-ip-address', 'Values': [private_ip]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assert_instance(data, instance_id)
# NOTE(andrey-mp): by private dns
resp, data = self.client.DescribeInstances(
Filters=[{'Name': 'private-dns-name', 'Values': ['fake.com']}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(0, len(data['Reservations']))
resp, data = self.client.DescribeInstances(
Filters=[{'Name': 'private-dns-name', 'Values': [private_dns]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assert_instance(data, instance_id)
# NOTE(andrey-mp): by subnet id
resp, data = self.client.DescribeInstances(
Filters=[{'Name': 'subnet-id', 'Values': ['subnet-0']}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(0, len(data['Reservations']))
resp, data = self.client.DescribeInstances(
Filters=[{'Name': 'subnet-id', 'Values': [self.subnet_id]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assert_instance(data, instance_id)
# NOTE(andrey-mp): by vpc id
resp, data = self.client.DescribeInstances(
Filters=[{'Name': 'vpc-id', 'Values': ['vpc-0']}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(0, len(data['Reservations']))
resp, data = self.client.DescribeInstances(
Filters=[{'Name': 'vpc-id', 'Values': [self.vpc_id]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assert_instance(data, instance_id)
resp, data = self.client.TerminateInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_instance_waiter().wait_delete(instance_id)
def assert_instance(self, data, instance_id):
reservations = data.get('Reservations', [])
self.assertNotEmpty(reservations)
instances = reservations[0].get('Instances', [])
self.assertNotEmpty(instances)
self.assertEqual(instance_id, instances[0]['InstanceId'])
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
"Amazon can create instance with several network interfaces in"
"one subnet. Openstack can't do it without additional configuration."
"Worked only from Juno with parameter in config - "
"nova.conf/neutron/allow_duplicate_networks = True")
def test_create_instance_with_interfaces(self):
kwargs = {
'SubnetId': self.subnet_id,
}
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ni_id1 = data['NetworkInterface']['NetworkInterfaceId']
self.addResourceCleanUp(self.client.DeleteNetworkInterface,
NetworkInterfaceId=ni_id1)
self.get_network_interface_waiter().wait_available(ni_id1)
kwargs = {
'SubnetId': self.subnet_id,
}
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ni_id2 = data['NetworkInterface']['NetworkInterfaceId']
self.addResourceCleanUp(self.client.DeleteNetworkInterface,
NetworkInterfaceId=ni_id2)
self.get_network_interface_waiter().wait_available(ni_id2)
kwargs = {
'ImageId': CONF.aws.image_id,
'InstanceType': CONF.aws.instance_type,
'MinCount': 1,
'MaxCount': 1,
'NetworkInterfaces': [{'NetworkInterfaceId': ni_id1,
'DeviceIndex': 0},
{'NetworkInterfaceId': ni_id2,
'DeviceIndex': 2}]
}
resp, data = self.client.RunInstances(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
instance_id = data['Instances'][0]['InstanceId']
res_clean = self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
resp, data = self.client.DescribeInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
reservations = data.get('Reservations', [])
self.assertNotEmpty(reservations)
instances = reservations[0].get('Instances', [])
self.assertNotEmpty(instances)
instance = instances[0]
nis = instance.get('NetworkInterfaces', [])
self.assertEqual(2, len(nis))
resp, data = self.client.TerminateInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_instance_waiter().wait_delete(instance_id)
def test_create_instance_with_private_ip(self):
ip = '10.16.0.12'
kwargs = {
'ImageId': CONF.aws.image_id,
'InstanceType': CONF.aws.instance_type,
'MinCount': 1,
'MaxCount': 1,
'SubnetId': self.subnet_id,
'PrivateIpAddress': ip
}
resp, data = self.client.RunInstances(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
instance_id = data['Instances'][0]['InstanceId']
res_clean = self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
resp, data = self.client.DescribeInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
reservations = data.get('Reservations', [])
self.assertNotEmpty(reservations)
instances = reservations[0].get('Instances', [])
self.assertNotEmpty(instances)
instance = instances[0]
self.assertEqual(ip, instance['PrivateIpAddress'])
resp, data = self.client.TerminateInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_instance_waiter().wait_delete(instance_id)
def test_create_instance_with_invalid_params(self):
kwargs = {
'ImageId': CONF.aws.image_id,
'InstanceType': CONF.aws.instance_type,
'MinCount': 1,
'MaxCount': 1,
'PrivateIpAddress': '10.16.1.2'
}
resp, data = self.client.RunInstances(*[], **kwargs)
if resp.status_code == 200:
self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[data['Instances'][0]['InstanceId']])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterCombination', data['Error']['Code'])
kwargs = {
'ImageId': CONF.aws.image_id,
'InstanceType': CONF.aws.instance_type,
'MinCount': 1,
'MaxCount': 1,
'SubnetId': self.subnet_id,
'PrivateIpAddress': '10.16.1.12'
}
resp, data = self.client.RunInstances(*[], **kwargs)
if resp.status_code == 200:
self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[data['Instances'][0]['InstanceId']])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
kwargs = {
'SubnetId': self.subnet_id,
}
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ni_id1 = data['NetworkInterface']['NetworkInterfaceId']
self.addResourceCleanUp(self.client.DeleteNetworkInterface,
NetworkInterfaceId=ni_id1)
self.get_network_interface_waiter().wait_available(ni_id1)
kwargs = {
'SubnetId': self.subnet_id,
}
resp, data = self.client.CreateNetworkInterface(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
ni_id2 = data['NetworkInterface']['NetworkInterfaceId']
self.addResourceCleanUp(self.client.DeleteNetworkInterface,
NetworkInterfaceId=ni_id2)
self.get_network_interface_waiter().wait_available(ni_id2)
if CONF.aws.run_incompatible_tests:
kwargs = {
'ImageId': CONF.aws.image_id,
'InstanceType': CONF.aws.instance_type,
'MinCount': 1,
'MaxCount': 1,
'NetworkInterfaces': [{'NetworkInterfaceId': ni_id1},
{'NetworkInterfaceId': ni_id2}]
}
resp, data = self.client.RunInstances(*[], **kwargs)
if resp.status_code == 200:
self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[data['Instances'][0]['InstanceId']])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidParameterValue', data['Error']['Code'])
def test_get_password_data_and_console_output(self):
instance_type = CONF.aws.instance_type
image_id = CONF.aws.image_id
user_data = base64.b64encode('password_test=password')
resp, data = self.client.RunInstances(
ImageId=image_id, InstanceType=instance_type, MinCount=1,
MaxCount=1, SubnetId=self.subnet_id, UserData=user_data)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
instance_id = data['Instances'][0]['InstanceId']
res_clean = self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
resp, data = self.client.GetPasswordData(InstanceId=instance_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(instance_id, data['InstanceId'])
self.assertIsNotNone(data['Timestamp'])
self.assertIn('PasswordData', data)
waiter = base.EC2Waiter(self.client.GetConsoleOutput)
waiter.wait_no_exception(InstanceId=instance_id)
resp, data = self.client.GetConsoleOutput(InstanceId=instance_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(instance_id, data['InstanceId'])
self.assertIsNotNone(data['Timestamp'])
self.assertIn('Output', data)
resp, data = self.client.TerminateInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_instance_waiter().wait_delete(instance_id)

View File

@ -36,6 +36,9 @@ class InternetGatewayTest(base.EC2TestCase):
@base.safe_setup
def setUpClass(cls):
super(InternetGatewayTest, cls).setUpClass()
if not CONF.aws.vpc_enabled:
raise cls.skipException('VPC is disabled')
resp, data = cls.client.CreateVpc(CidrBlock=cls.VPC_CIDR)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))

View File

@ -36,6 +36,9 @@ class NetworkInterfaceTest(base.EC2TestCase):
@base.safe_setup
def setUpClass(cls):
super(NetworkInterfaceTest, cls).setUpClass()
if not CONF.aws.vpc_enabled:
raise cls.skipException('VPC is disabled')
resp, data = cls.client.CreateVpc(CidrBlock=cls.VPC_CIDR)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))

View File

@ -0,0 +1,45 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from ec2api.tests.functional import base
from ec2api.tests.functional import config
CONF = config.CONF
class RegionTest(base.EC2TestCase):
def test_describe_regions(self):
resp, data = self.client.DescribeRegions()
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertNotEmpty(data['Regions'])
region = CONF.aws.aws_region
if not region:
return
regions = [r['RegionName'] for r in data['Regions']]
self.assertIn(region, regions)
def test_describe_zones(self):
resp, data = self.client.DescribeAvailabilityZones()
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertNotEmpty(data['AvailabilityZones'])
region = CONF.aws.aws_region
if not region:
return
# TODO(andrey-mp): add checking of other fields of returned data

View File

@ -16,7 +16,9 @@
from tempest_lib.openstack.common import log
from ec2api.tests.functional import base
from ec2api.tests.functional import config
CONF = config.CONF
LOG = log.getLogger(__name__)
@ -30,6 +32,9 @@ class RouteTest(base.EC2TestCase):
@base.safe_setup
def setUpClass(cls):
super(RouteTest, cls).setUpClass()
if not CONF.aws.vpc_enabled:
raise cls.skipException('VPC is disabled')
resp, data = cls.client.CreateVpc(CidrBlock=cls.VPC_CIDR)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))

View File

@ -35,6 +35,9 @@ class SecurityGroupTest(base.EC2TestCase):
@base.safe_setup
def setUpClass(cls):
super(SecurityGroupTest, cls).setUpClass()
if not CONF.aws.vpc_enabled:
raise cls.skipException('VPC is disabled')
resp, data = cls.client.CreateVpc(CidrBlock=cls.VPC_CIDR)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))

View File

@ -0,0 +1,233 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from ec2api.tests.functional import base
from ec2api.tests.functional import config
CONF = config.CONF
class SnapshotTest(base.EC2TestCase):
def test_create_get_delete_snapshot(self):
kwargs = {
'Size': 1,
'AvailabilityZone': CONF.aws.aws_zone
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id = data['VolumeId']
clean_vol = self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id)
self.get_volume_waiter().wait_available(volume_id)
desc = 'test snapshot'
kwargs = {
'VolumeId': volume_id,
'Description': desc
}
resp, data = self.client.CreateSnapshot(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
snapshot_id = data['SnapshotId']
res_clean = self.addResourceCleanUp(self.client.DeleteSnapshot,
SnapshotId=snapshot_id)
self.get_snapshot_waiter().wait_available(snapshot_id,
final_set=('completed'))
self.assertEqual(desc, data['Description'])
self.assertEqual(volume_id, data['VolumeId'])
self.assertEqual(1, data['VolumeSize'])
self.assertNotEmpty(data.get('State', ''))
if 'Encrypted' in data:
self.assertFalse(data['Encrypted'])
self.assertIsNotNone(data['StartTime'])
resp, data = self.client.DeleteSnapshot(SnapshotId=snapshot_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_snapshot_waiter().wait_delete(snapshot_id)
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_vol)
self.get_volume_waiter().wait_delete(volume_id)
def test_describe_snapshots(self):
kwargs = {
'Size': 1,
'AvailabilityZone': CONF.aws.aws_zone
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id = data['VolumeId']
clean_vol = self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id)
self.get_volume_waiter().wait_available(volume_id)
desc = 'test snapshot'
kwargs = {
'VolumeId': volume_id,
'Description': desc
}
resp, data = self.client.CreateSnapshot(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
snapshot_id = data['SnapshotId']
ownerId = data['OwnerId']
res_clean = self.addResourceCleanUp(self.client.DeleteSnapshot,
SnapshotId=snapshot_id)
self.get_snapshot_waiter().wait_available(snapshot_id,
final_set=('completed'))
self.assertEqual(desc, data['Description'])
self.assertEqual(volume_id, data['VolumeId'])
self.assertEqual(1, data['VolumeSize'])
self.assertNotEmpty(data.get('State', ''))
if 'Encrypted' in data:
self.assertFalse(data['Encrypted'])
self.assertIsNotNone(data['StartTime'])
resp, data = self.client.DescribeSnapshots(SnapshotIds=[snapshot_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Snapshots']))
data = data['Snapshots'][0]
self.assertEqual(snapshot_id, data['SnapshotId'])
self.assertEqual(desc, data['Description'])
self.assertEqual(volume_id, data['VolumeId'])
self.assertEqual(1, data['VolumeSize'])
self.assertNotEmpty(data.get('State', ''))
if 'Encrypted' in data:
self.assertFalse(data['Encrypted'])
self.assertIsNotNone(data['StartTime'])
resp, data = self.client.DescribeSnapshots(OwnerIds=[ownerId])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Snapshots']))
data = data['Snapshots'][0]
self.assertEqual(snapshot_id, data['SnapshotId'])
resp, data = self.client.DeleteSnapshot(SnapshotId=snapshot_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_snapshot_waiter().wait_delete(snapshot_id)
resp, data = self.client.DescribeSnapshots(SnapshotIds=[snapshot_id])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidSnapshot.NotFound', data['Error']['Code'])
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_vol)
self.get_volume_waiter().wait_delete(volume_id)
def test_create_volume_from_snapshot(self):
kwargs = {
'Size': 1,
'AvailabilityZone': CONF.aws.aws_zone
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id = data['VolumeId']
clean_vol = self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id)
self.get_volume_waiter().wait_available(volume_id)
vol1 = data
desc = 'test snapshot'
kwargs = {
'VolumeId': volume_id,
'Description': desc
}
resp, data = self.client.CreateSnapshot(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
snapshot_id = data['SnapshotId']
res_clean = self.addResourceCleanUp(self.client.DeleteSnapshot,
SnapshotId=snapshot_id)
self.get_snapshot_waiter().wait_available(snapshot_id,
final_set=('completed'))
kwargs = {
'SnapshotId': snapshot_id,
'AvailabilityZone': CONF.aws.aws_zone
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id2 = data['VolumeId']
clean_vol2 = self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id2)
self.get_volume_waiter().wait_available(volume_id2)
self.assertNotEqual(volume_id, volume_id2)
self.assertEqual(vol1['Size'], data['Size'])
self.assertEqual(snapshot_id, data['SnapshotId'])
resp, data = self.client.DescribeVolumes(
Filters=[{'Name': 'snapshot-id', 'Values': [snapshot_id]}])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual(1, len(data['Volumes']))
self.assertEqual(volume_id2, data['Volumes'][0]['VolumeId'])
resp, data = self.client.DeleteSnapshot(SnapshotId=snapshot_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_snapshot_waiter().wait_delete(snapshot_id)
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_vol)
self.get_volume_waiter().wait_delete(volume_id)
resp, data = self.client.DeleteVolume(VolumeId=volume_id2)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_vol2)
self.get_volume_waiter().wait_delete(volume_id2)
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
"Openstack can't delete volume with snapshots")
def test_delete_volume_with_snapshots(self):
kwargs = {
'Size': 1,
'AvailabilityZone': CONF.aws.aws_zone
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id = data['VolumeId']
clean_vol = self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id)
self.get_volume_waiter().wait_available(volume_id)
desc = 'test snapshot'
kwargs = {
'VolumeId': volume_id,
'Description': desc
}
resp, data = self.client.CreateSnapshot(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
snapshot_id = data['SnapshotId']
res_clean = self.addResourceCleanUp(self.client.DeleteSnapshot,
SnapshotId=snapshot_id)
self.get_snapshot_waiter().wait_available(snapshot_id,
final_set=('completed'))
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_vol)
self.get_volume_waiter().wait_delete(volume_id)
resp, data = self.client.DeleteSnapshot(SnapshotId=snapshot_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_snapshot_waiter().wait_delete(snapshot_id)

View File

@ -32,6 +32,9 @@ class SubnetTest(base.EC2TestCase):
@base.safe_setup
def setUpClass(cls):
super(SubnetTest, cls).setUpClass()
if not CONF.aws.vpc_enabled:
raise cls.skipException('VPC is disabled')
resp, data = cls.client.CreateVpc(CidrBlock=cls.VPC_CIDR)
if resp.status_code != 200:
LOG.error(base.EC2ErrorConverter(data))

View File

@ -0,0 +1,424 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import testtools
from ec2api.tests.functional import base
from ec2api.tests.functional import config
CONF = config.CONF
class VolumeTest(base.EC2TestCase):
def test_create_delete_volume(self):
kwargs = {
'Size': 1,
'AvailabilityZone': CONF.aws.aws_zone
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id = data['VolumeId']
res_clean = self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id)
self.get_volume_waiter().wait_available(volume_id)
if CONF.aws.run_incompatible_tests:
self.assertEqual('standard', data['VolumeType'])
self.assertEqual(1, data['Size'])
if 'Encrypted' in data:
self.assertFalse(data['Encrypted'])
if 'SnapshotId' in data:
self.assertIsNone(data['SnapshotId'])
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_volume_waiter().wait_delete(volume_id)
resp, data = self.client.DescribeVolumes(VolumeIds=[volume_id])
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidVolume.NotFound', data['Error']['Code'])
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(400, resp.status_code)
self.assertEqual('InvalidVolume.NotFound', data['Error']['Code'])
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
"Encryption is not implemented")
def test_create_encrypted_volume(self):
kwargs = {
'Size': 1,
'AvailabilityZone': CONF.aws.aws_zone,
'Encrypted': True,
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id = data['VolumeId']
res_clean = self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id)
self.get_volume_waiter().wait_available(volume_id)
self.assertTrue(data['Encrypted'])
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_volume_waiter().wait_delete(volume_id)
def test_describe_volumes(self):
kwargs = {
'Size': 1,
'AvailabilityZone': CONF.aws.aws_zone
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id = data['VolumeId']
res_clean = self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id)
self.get_volume_waiter().wait_available(volume_id)
resp, data = self.client.DescribeVolumes(VolumeIds=[volume_id])
self.assertEqual(200, resp.status_code)
self.assertEqual(1, len(data['Volumes']))
volume = data['Volumes'][0]
if CONF.aws.run_incompatible_tests:
self.assertEqual('standard', data['VolumeType'])
self.assertEqual(1, volume['Size'])
if 'Encrypted' in volume:
self.assertFalse(volume['Encrypted'])
if 'SnapshotId' in volume:
self.assertIsNone(volume['SnapshotId'])
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_volume_waiter().wait_delete(volume_id)
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
"Volume statuses are not implemented")
def test_describe_volume_status(self):
kwargs = {
'Size': 1,
'AvailabilityZone': CONF.aws.aws_zone
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id = data['VolumeId']
res_clean = self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id)
self.get_volume_waiter().wait_available(volume_id)
resp, data = self.client.DescribeVolumeStatus(VolumeIds=[volume_id])
self.assertEqual(200, resp.status_code)
self.assertEqual(1, len(data['VolumeStatuses']))
volume_status = data['VolumeStatuses'][0]
self.assertIn('Actions', volume_status)
self.assertIn('Events', volume_status)
self.assertIn('VolumeStatus', volume_status)
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(res_clean)
self.get_volume_waiter().wait_delete(volume_id)
def test_attach_detach_volume(self):
instance_type = CONF.aws.instance_type
image_id = CONF.aws.image_id
if not image_id:
raise self.skipException('aws image_id does not provided')
kwargs = {
'ImageId': image_id,
'InstanceType': instance_type,
'MinCount': 1,
'MaxCount': 1,
'Placement': {'AvailabilityZone': CONF.aws.aws_zone}
}
resp, data = self.client.RunInstances(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
instance_id = data['Instances'][0]['InstanceId']
clean_i = self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
kwargs = {
'Size': 1,
'AvailabilityZone': CONF.aws.aws_zone
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id = data['VolumeId']
clean_v = self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id)
self.get_volume_waiter().wait_available(volume_id)
kwargs = {
'Device': '/dev/sdh',
'InstanceId': instance_id,
'VolumeId': volume_id,
}
resp, data = self.client.AttachVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
clean_vi = self.addResourceCleanUp(self.client.DetachVolume,
VolumeId=volume_id)
self.get_volume_attachment_waiter().wait_available(
volume_id, final_set=('attached'))
resp, data = self.client.DescribeVolumes(VolumeIds=[volume_id])
self.assertEqual(200, resp.status_code)
self.assertEqual(1, len(data['Volumes']))
volume = data['Volumes'][0]
self.assertEqual('in-use', volume['State'])
self.assertEqual(1, len(volume['Attachments']))
attachment = volume['Attachments'][0]
self.assertFalse(attachment['DeleteOnTermination'])
self.assertIsNotNone(attachment['Device'])
self.assertEqual(instance_id, attachment['InstanceId'])
self.assertEqual(volume_id, attachment['VolumeId'])
resp, data = self.client.DetachVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_vi)
self.get_volume_attachment_waiter().wait_delete(volume_id)
resp, data = self.client.DescribeVolumes(VolumeIds=[volume_id])
self.assertEqual(200, resp.status_code)
self.assertEqual(1, len(data['Volumes']))
volume = data['Volumes'][0]
self.assertEqual('available', volume['State'])
self.assertEqual(0, len(volume['Attachments']))
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_v)
self.get_volume_waiter().wait_delete(volume_id)
resp, data = self.client.TerminateInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_i)
self.get_instance_waiter().wait_delete(instance_id)
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
"Volume statuses are not implemented")
def test_delete_detach_attached_volume(self):
instance_type = CONF.aws.instance_type
image_id = CONF.aws.image_id
if not image_id:
raise self.skipException('aws image_id does not provided')
kwargs = {
'ImageId': image_id,
'InstanceType': instance_type,
'MinCount': 1,
'MaxCount': 1,
'Placement': {'AvailabilityZone': CONF.aws.aws_zone}
}
resp, data = self.client.RunInstances(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
instance_id = data['Instances'][0]['InstanceId']
clean_i = self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
kwargs = {
'Size': 1,
'AvailabilityZone': CONF.aws.aws_zone
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id = data['VolumeId']
clean_v = self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id)
self.get_volume_waiter().wait_available(volume_id)
kwargs = {
'Device': '/dev/sdh',
'InstanceId': instance_id,
'VolumeId': volume_id,
}
resp, data = self.client.AttachVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
clean_vi = self.addResourceCleanUp(self.client.DetachVolume,
VolumeId=volume_id)
self.get_volume_attachment_waiter().wait_available(
volume_id, final_set=('attached'))
resp, data = self.client.AttachVolume(*[], **kwargs)
self.assertEqual(400, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual('VolumeInUse', data['Error']['Code'])
kwargs['Device'] = '/dev/sdi'
resp, data = self.client.AttachVolume(*[], **kwargs)
self.assertEqual(400, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual('VolumeInUse', data['Error']['Code'])
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(400, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual('VolumeInUse', data['Error']['Code'])
resp, data = self.client.DetachVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_vi)
self.get_volume_attachment_waiter().wait_delete(volume_id)
resp, data = self.client.DetachVolume(VolumeId=volume_id)
self.assertEqual(400, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual('IncorrectState', data['Error']['Code'])
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_v)
self.get_volume_waiter().wait_delete(volume_id)
resp, data = self.client.DetachVolume(VolumeId=volume_id)
self.assertEqual(400, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual('InvalidVolume.NotFound', data['Error']['Code'])
resp, data = self.client.TerminateInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_i)
self.get_instance_waiter().wait_delete(instance_id)
def test_volume_auto_termination_swithed_off(self):
instance_type = CONF.aws.instance_type
image_id = CONF.aws.image_id
if not image_id:
raise self.skipException('aws image_id does not provided')
kwargs = {
'ImageId': image_id,
'InstanceType': instance_type,
'MinCount': 1,
'MaxCount': 1,
'Placement': {'AvailabilityZone': CONF.aws.aws_zone}
}
resp, data = self.client.RunInstances(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
instance_id = data['Instances'][0]['InstanceId']
clean_i = self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
kwargs = {
'Size': 1,
'AvailabilityZone': CONF.aws.aws_zone
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id = data['VolumeId']
clean_v = self.addResourceCleanUp(self.client.DeleteVolume,
VolumeId=volume_id)
self.get_volume_waiter().wait_available(volume_id)
kwargs = {
'Device': '/dev/sdh',
'InstanceId': instance_id,
'VolumeId': volume_id,
}
resp, data = self.client.AttachVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.addResourceCleanUp(self.client.DetachVolume, VolumeId=volume_id)
self.get_volume_attachment_waiter().wait_available(
volume_id, final_set=('attached'))
resp, data = self.client.TerminateInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_i)
self.get_instance_waiter().wait_delete(instance_id)
resp, data = self.client.DescribeVolumes(VolumeIds=[volume_id])
self.assertEqual(200, resp.status_code)
self.assertEqual(1, len(data['Volumes']))
volume = data['Volumes'][0]
self.assertEqual('available', volume['State'])
if 'Attachments' in volume:
self.assertEqual(0, len(volume['Attachments']))
resp, data = self.client.DeleteVolume(VolumeId=volume_id)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_v)
self.get_volume_waiter().wait_delete(volume_id)
@testtools.skipUnless(CONF.aws.run_incompatible_tests,
"ModifyInstanceAttribute is not implemented")
def test_volume_auto_termination_swithed_on(self):
instance_type = CONF.aws.instance_type
image_id = CONF.aws.image_id
if not image_id:
raise self.skipException('aws image_id does not provided')
kwargs = {
'ImageId': image_id,
'InstanceType': instance_type,
'MinCount': 1,
'MaxCount': 1,
'Placement': {'AvailabilityZone': CONF.aws.aws_zone}
}
resp, data = self.client.RunInstances(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
instance_id = data['Instances'][0]['InstanceId']
clean_i = self.addResourceCleanUp(self.client.TerminateInstances,
InstanceIds=[instance_id])
self.get_instance_waiter().wait_available(instance_id,
final_set=('running'))
kwargs = {
'Size': 1,
'AvailabilityZone': CONF.aws.aws_zone
}
resp, data = self.client.CreateVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
volume_id = data['VolumeId']
self.addResourceCleanUp(self.client.DeleteVolume, VolumeId=volume_id)
self.get_volume_waiter().wait_available(volume_id)
kwargs = {
'Device': '/dev/sdh',
'InstanceId': instance_id,
'VolumeId': volume_id,
}
resp, data = self.client.AttachVolume(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.addResourceCleanUp(self.client.DetachVolume, VolumeId=volume_id)
self.get_volume_attachment_waiter().wait_available(
volume_id, final_set=('attached'))
kwargs = {
'InstanceId': instance_id,
'BlockDeviceMappings': [{'DeviceName': '/dev/sdh',
'Ebs': {'VolumeId': volume_id,
'DeleteOnTermination': True}}],
}
resp, data = self.client.ModifyInstanceAttribute(*[], **kwargs)
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
resp, data = self.client.TerminateInstances(InstanceIds=[instance_id])
self.assertEqual(200, resp.status_code, base.EC2ErrorConverter(data))
self.cancelResourceCleanUp(clean_i)
self.get_instance_waiter().wait_delete(instance_id)
resp, data = self.client.DescribeVolumes(VolumeIds=[volume_id])
self.assertEqual(400, resp.status_code, base.EC2ErrorConverter(data))
self.assertEqual('InvalidVolume.NotFound', data['Error']['Code'])

View File

@ -23,6 +23,13 @@ CONF = config.CONF
class VPCTest(base.EC2TestCase):
@classmethod
@base.safe_setup
def setUpClass(cls):
super(VPCTest, cls).setUpClass()
if not CONF.aws.vpc_enabled:
raise cls.skipException('VPC is disabled')
def test_create_delete_vpc(self):
cidr = '10.1.0.0/16'
resp, data = self.client.CreateVpc(CidrBlock=cidr)

View File

@ -59,6 +59,9 @@ AWSGroup = [
cfg.StrOpt('image_id',
default=None,
help="Image ID for instance running"),
cfg.BoolOpt('vpc_enabled',
default=False,
help='Will run all tests that works with VPC.'),
cfg.BoolOpt('run_incompatible_tests',
default=False,
help='Will run all tests plus incompatible with Amazon.'),

View File

@ -169,9 +169,9 @@ def get_elapsed_time_color(elapsed_time):
return 'green'
class NovaTestResult(testtools.TestResult):
class EC2ApiTestResult(testtools.TestResult):
def __init__(self, stream, descriptions, verbosity):
super(NovaTestResult, self).__init__()
super(EC2ApiTestResult, self).__init__()
self.stream = stream
self.showAll = verbosity > 1
self.num_slow_tests = 10
@ -225,26 +225,26 @@ class NovaTestResult(testtools.TestResult):
self.colorizer.write(short_result, color)
def addSuccess(self, test):
super(NovaTestResult, self).addSuccess(test)
super(EC2ApiTestResult, self).addSuccess(test)
self._addResult(test, 'OK', 'green', '.', True)
def addFailure(self, test, err):
if test.id() == 'process-returncode':
return
super(NovaTestResult, self).addFailure(test, err)
super(EC2ApiTestResult, self).addFailure(test, err)
self._addResult(test, 'FAIL', 'red', 'F', False)
def addError(self, test, err):
super(NovaTestResult, self).addFailure(test, err)
super(EC2ApiTestResult, self).addFailure(test, err)
self._addResult(test, 'ERROR', 'red', 'E', False)
def addSkip(self, test, reason=None, details=None):
super(NovaTestResult, self).addSkip(test, reason, details)
super(EC2ApiTestResult, self).addSkip(test, reason, details)
self._addResult(test, 'SKIP', 'blue', 'S', True)
def startTest(self, test):
self.start_time = self._now()
super(NovaTestResult, self).startTest(test)
super(EC2ApiTestResult, self).startTest(test)
def writeTestCase(self, cls):
if not self.results.get(cls):
@ -323,7 +323,7 @@ test = subunit.ProtocolTestCase(sys.stdin, passthrough=None)
if sys.version_info[0:2] <= (2, 6):
runner = unittest.TextTestRunner(verbosity=2)
else:
runner = unittest.TextTestRunner(verbosity=2, resultclass=NovaTestResult)
runner = unittest.TextTestRunner(verbosity=2, resultclass=EC2ApiTestResult)
if runner.run(test).wasSuccessful():
exit_code = 0