Reworked initial parameters validation mechanism

Change-Id: Icf9be8a9d1a70365be21c40e684a3184095947c2
This commit is contained in:
alevine 2014-12-29 00:42:11 +04:00
parent 28756f3ad6
commit fa1397fe10
11 changed files with 293 additions and 166 deletions

View File

@ -19,7 +19,6 @@ import functools
import hashlib import hashlib
import sys import sys
import netaddr
from oslo.config import cfg from oslo.config import cfg
import requests import requests
import six import six
@ -330,40 +329,95 @@ class Requestify(wsgi.Middleware):
return self.application return self.application
def validate_ec2_id(val):
if not validator.validate_str()(val):
return False
try:
ec2utils.ec2_id_to_id(val)
except exception.InvalidId:
return False
return True
def is_valid_ipv4(address):
"""Verify that address represents a valid IPv4 address."""
try:
return netaddr.valid_ipv4(address)
except Exception:
return False
class Validator(wsgi.Middleware): class Validator(wsgi.Middleware):
validator.validate_ec2_id = validate_ec2_id
validator.DEFAULT_VALIDATOR = { validator.DEFAULT_VALIDATOR = {
'instance_id': validate_ec2_id, 'AllocationId': validator.validate_ec2_id(['eipalloc']),
'volume_id': validate_ec2_id, 'AllowReassignment': validator.validate_dummy,
'image_id': validate_ec2_id, 'AllowReassociation': validator.validate_dummy,
'attribute': validator.validate_str(), 'Architecture': validator.validate_dummy,
'image_location': validator.validate_image_path, 'AssociationId': validator.validate_ec2_association_id,
'public_ip': is_valid_ipv4, 'AttachmentId': validator.validate_ec2_id(['eni-attach']),
'region_name': validator.validate_str(), 'Attribute': validator.validate_dummy,
'group_name': validator.validate_str(max_length=255), 'AvailabilityZone': validator.validate_dummy,
'group_description': validator.validate_str(max_length=255), 'BlockDeviceMapping': validator.validate_dummy,
'size': validator.validate_int(), 'CidrBlock': validator.validate_cidr_block,
'user_data': validator.validate_user_data 'ClientToken': validator.validate_dummy,
'Description': validator.validate_dummy,
'DestinationCidrBlock': validator.validate_cidr_block,
'Device': validator.validate_dummy,
'DeviceIndex': validator.validate_dummy,
'DhcpConfiguration': validator.validate_dummy,
'Dhcp_optionsId': validator.validate_dummy,
'DisableApiTermination': validator.validate_dummy,
'Domain': validator.validate_dummy,
'Ebs_optimized': validator.validate_dummy,
'Encrypted': validator.validate_dummy,
'ExecutableBy': validator.validate_dummy,
'Filter': validator.validate_dummy,
'Force': validator.validate_dummy,
'GatewayId': validator.validate_dummy,
'GroupDescription': validator.validate_str(max_length=255),
'GroupId': validator.validate_ec2_id(['sg']),
'GroupName': validator.validate_str(max_length=255),
'IamInstanceProfile': validator.validate_dummy,
'ImageId': validator.validate_ec2_id(['ami', 'ari', 'aki']),
'ImageLocation': validator.validate_image_path,
'InstanceId': validator.validate_dummy,
'InstanceInitiatedShutdownBehavior': validator.validate_dummy,
'InstanceTenancy': validator.validate_dummy,
'InstanceType': validator.validate_dummy,
'InternetGatewayId': validator.validate_dummy,
'Iops': validator.validate_dummy,
'IpPermissions': validator.validate_dummy,
'KernelId': validator.validate_dummy,
'KeyName': validator.validate_dummy,
'KmsKeyId': validator.validate_dummy,
'LaunchPermission': validator.validate_dummy,
'MaxCount': validator.validate_dummy,
'MaxResults': validator.validate_dummy,
'Metadata': validator.validate_dummy,
'MinCount': validator.validate_dummy,
'Monitoring': validator.validate_dummy,
'Name': validator.validate_dummy,
'NetworkInterface': validator.validate_dummy,
'NetworkInterfaceId': validator.validate_dummy,
'NextToken': validator.validate_dummy,
'NoReboot': validator.validate_dummy,
'OperationType': validator.validate_dummy,
'Owner': validator.validate_dummy,
'Placement': validator.validate_dummy,
'PrivateIpAddress': validator.validate_dummy,
'PrivateIpAddresses': validator.validate_dummy,
'ProductCode': validator.validate_dummy,
'PublicIp': validator.validate_ipv4,
'PublicKey_material': validator.validate_dummy,
'RamdiskId': validator.validate_dummy,
'RemoteIpPrefix': validator.validate_dummy,
'RegionName': validator.validate_str(),
'ResourceId': validator.validate_dummy,
'RestorableBy': validator.validate_dummy,
'RootDeviceName': validator.validate_dummy,
'RouteTableId': validator.validate_dummy,
'SecondaryPrivateIpAddressCount': validator.validate_dummy,
'SecurityGroup': validator.validate_dummy,
'SecurityGroupId': validator.validate_dummy,
'Size': validator.validate_int(),
'SnapshotId': validator.validate_dummy,
'SourceDestCheck': validator.validate_dummy,
'SriovNetSupport': validator.validate_dummy,
'SubnetId': validator.validate_dummy,
'Tag': validator.validate_dummy,
'UserData': validator.validate_user_data,
'UserGroup': validator.validate_dummy,
'UserId': validator.validate_dummy,
'Value': validator.validate_dummy,
'VirtualizationType': validator.validate_dummy,
'VolumeId': validator.validate_dummy,
'VolumeType': validator.validate_dummy,
'VpcId': validator.validate_dummy,
'VpcPeeringConnectionId': validator.validate_dummy,
'ZoneName': validator.validate_dummy,
} }
def __init__(self, application): def __init__(self, application):
@ -371,11 +425,15 @@ class Validator(wsgi.Middleware):
@webob.dec.wsgify(RequestClass=wsgi.Request) @webob.dec.wsgify(RequestClass=wsgi.Request)
def __call__(self, req): def __call__(self, req):
if validator.validate(req.environ['ec2.request'].args, try:
if validator.validate(req.environ['ec2.request'],
validator.DEFAULT_VALIDATOR): validator.DEFAULT_VALIDATOR):
return self.application return self.application
else: else:
raise webob.exc.HTTPBadRequest() raise webob.exc.HTTPBadRequest()
except Exception as ex:
return ec2_error_ex(
ex, req, unexpected=not isinstance(ex, exception.EC2Exception))
def exception_to_ec2code(ex): def exception_to_ec2code(ex):

View File

@ -14,8 +14,6 @@
import re import re
import netaddr
from ec2api import context from ec2api import context
from ec2api.db import api as db_api from ec2api.db import api as db_api
from ec2api import exception from ec2api import exception
@ -329,65 +327,3 @@ def os_id_to_ec2_id(context, kind, os_id, items_by_os_id=None,
if ids_by_os_id is not None: if ids_by_os_id is not None:
ids_by_os_id[os_id] = item_id ids_by_os_id[os_id] = item_id
return item_id return item_id
def _is_valid_cidr(address):
"""Check if address is valid
The provided address can be a IPv6 or a IPv4
CIDR address.
"""
try:
# Validate the correct CIDR Address
netaddr.IPNetwork(address)
except netaddr.core.AddrFormatError:
return False
except UnboundLocalError:
# NOTE(MotoKen): work around bug in netaddr 0.7.5 (see detail in
# https://github.com/drkjam/netaddr/issues/2)
return False
# Prior validation partially verify /xx part
# Verify it here
ip_segment = address.split('/')
if (len(ip_segment) <= 1 or
ip_segment[1] == ''):
return False
return True
def validate_cidr_with_ipv6(cidr, parameter_name):
invalid_format_exception = exception.InvalidParameterValue(
value=cidr,
parameter=parameter_name,
reason='This is not a valid CIDR block.')
if not _is_valid_cidr(cidr):
raise invalid_format_exception
_cidr_re = re.compile("^([0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2}$")
def validate_cidr(cidr, parameter_name):
invalid_format_exception = exception.InvalidParameterValue(
value=cidr,
parameter=parameter_name,
reason='This is not a valid CIDR block.')
if not _cidr_re.match(cidr):
raise invalid_format_exception
address, size = cidr.split("/")
octets = address.split(".")
if any(int(octet) > 255 for octet in octets):
raise invalid_format_exception
size = int(size)
if size > 32:
raise invalid_format_exception
def validate_vpc_cidr(cidr, invalid_cidr_exception_class):
validate_cidr(cidr, 'cidrBlock')
size = int(cidr.split("/")[-1])
if size > 28 or size < 16:
raise invalid_cidr_exception_class(cidr_block=cidr)

View File

@ -278,7 +278,6 @@ def _delete_route_table(context, route_table_id, vpc=None, cleaner=None):
def _set_route(context, route_table_id, destination_cidr_block, def _set_route(context, route_table_id, destination_cidr_block,
gateway_id, instance_id, network_interface_id, gateway_id, instance_id, network_interface_id,
vpc_peering_connection_id, do_replace): vpc_peering_connection_id, do_replace):
ec2utils.validate_cidr(destination_cidr_block, 'destinationCidrBlock')
route_table = ec2utils.get_db_item(context, 'rtb', route_table_id) route_table = ec2utils.get_db_item(context, 'rtb', route_table_id)
vpc = db_api.get_item_by_id(context, 'vpc', route_table['vpc_id']) vpc = db_api.get_item_by_id(context, 'vpc', route_table['vpc_id'])
vpc_ipnet = netaddr.IPNetwork(vpc['cidr_block']) vpc_ipnet = netaddr.IPNetwork(vpc['cidr_block'])

View File

@ -27,6 +27,7 @@ from ec2api.api import clients
from ec2api.api import common from ec2api.api import common
from ec2api.api import ec2utils from ec2api.api import ec2utils
from ec2api.api import utils from ec2api.api import utils
from ec2api.api import validator
from ec2api.db import api as db_api from ec2api.db import api as db_api
from ec2api import exception from ec2api import exception
from ec2api.openstack.common.gettextutils import _ from ec2api.openstack.common.gettextutils import _
@ -248,7 +249,7 @@ def _build_rules(context, group_id, group_name, ip_permissions, direction):
elif rule.get('ip_ranges'): elif rule.get('ip_ranges'):
os_security_group_rule_body['remote_ip_prefix'] = ( os_security_group_rule_body['remote_ip_prefix'] = (
rule['ip_ranges'][0]['cidr_ip']) rule['ip_ranges'][0]['cidr_ip'])
ec2utils.validate_cidr_with_ipv6( validator.validate_cidr_with_ipv6(
os_security_group_rule_body['remote_ip_prefix'], 'cidr_ip') os_security_group_rule_body['remote_ip_prefix'], 'cidr_ip')
else: else:
raise exception.MissingParameter(param='source group or cidr') raise exception.MissingParameter(param='source group or cidr')

View File

@ -39,8 +39,6 @@ LOG = logging.getLogger(__name__)
def create_subnet(context, vpc_id, cidr_block, def create_subnet(context, vpc_id, cidr_block,
availability_zone=None): availability_zone=None):
ec2utils.validate_vpc_cidr(cidr_block, exception.InvalidSubnetRange)
vpc = ec2utils.get_db_item(context, 'vpc', vpc_id) vpc = ec2utils.get_db_item(context, 'vpc', vpc_id)
vpc_ipnet = netaddr.IPNetwork(vpc['cidr_block']) vpc_ipnet = netaddr.IPNetwork(vpc['cidr_block'])
subnet_ipnet = netaddr.IPNetwork(cidr_block) subnet_ipnet = netaddr.IPNetwork(cidr_block)

View File

@ -15,6 +15,9 @@
import base64 import base64
import re import re
import netaddr
from ec2api import exception
from ec2api.openstack.common.gettextutils import _ from ec2api.openstack.common.gettextutils import _
from ec2api.openstack.common import log as logging from ec2api.openstack.common import log as logging
@ -37,21 +40,26 @@ def _get_path_validator_regex():
VALIDATE_PATH_RE = _get_path_validator_regex() VALIDATE_PATH_RE = _get_path_validator_regex()
def validate_dummy(val, **kwargs):
return True
def validate_str(max_length=None): def validate_str(max_length=None):
def _do(val): def _do(val, parameter_name, **kwargs):
if not isinstance(val, basestring): if (isinstance(val, basestring) and
return False (max_length is None or max_length and len(val) <= max_length)):
if max_length and len(val) > max_length:
return False
return True return True
raise exception.ValidationError(
reason=_("%s should not be greater "
"than 255 characters.") % parameter_name)
return _do return _do
def validate_int(max_value=None): def validate_int(max_value=None):
def _do(val): def _do(val, **kwargs):
if not isinstance(val, int): if not isinstance(val, int):
return False return False
if max_value and val > max_value: if max_value and val > max_value:
@ -61,7 +69,7 @@ def validate_int(max_value=None):
return _do return _do
def validate_url_path(val): def validate_url_path(val, **kwargs):
"""True if val is matched by the path component grammar in rfc3986.""" """True if val is matched by the path component grammar in rfc3986."""
if not validate_str()(val): if not validate_str()(val):
@ -70,7 +78,7 @@ def validate_url_path(val):
return VALIDATE_PATH_RE.match(val).end() == len(val) return VALIDATE_PATH_RE.match(val).end() == len(val)
def validate_image_path(val): def validate_image_path(val, **kwargs):
if not validate_str()(val): if not validate_str()(val):
return False return False
@ -90,7 +98,7 @@ def validate_image_path(val):
return True return True
def validate_user_data(user_data): def validate_user_data(user_data, **kwargs):
"""Check if the user_data is encoded properly.""" """Check if the user_data is encoded properly."""
try: try:
user_data = base64.b64decode(user_data) user_data = base64.b64decode(user_data)
@ -99,7 +107,117 @@ def validate_user_data(user_data):
return True return True
def validate(args, validator): def _is_valid_cidr(address):
"""Check if address is valid
The provided address can be a IPv6 or a IPv4
CIDR address.
"""
try:
# Validate the correct CIDR Address
netaddr.IPNetwork(address)
except netaddr.core.AddrFormatError:
return False
except UnboundLocalError:
# NOTE(MotoKen): work around bug in netaddr 0.7.5 (see detail in
# https://github.com/drkjam/netaddr/issues/2)
return False
# Prior validation partially verify /xx part
# Verify it here
ip_segment = address.split('/')
if (len(ip_segment) <= 1 or
ip_segment[1] == ''):
return False
return True
def validate_cidr_with_ipv6(cidr, parameter_name, **kwargs):
invalid_format_exception = exception.InvalidParameterValue(
value=cidr,
parameter=parameter_name,
reason='This is not a valid CIDR block.')
if not _is_valid_cidr(cidr):
raise invalid_format_exception
return True
_cidr_re = re.compile("^([0-9]{1,3}\.){3}[0-9]{1,3}/[0-9]{1,2}$")
def validate_cidr(cidr, parameter_name, **kwargs):
invalid_format_exception = exception.InvalidParameterValue(
value=cidr,
parameter=parameter_name,
reason='This is not a valid CIDR block.')
if not _cidr_re.match(cidr):
raise invalid_format_exception
address, size = cidr.split("/")
octets = address.split(".")
if any(int(octet) > 255 for octet in octets):
raise invalid_format_exception
size = int(size)
if size > 32:
raise invalid_format_exception
return True
def validate_cidr_block(cidr, action, **kwargs):
validate_cidr(cidr, 'cidrBlock')
size = int(cidr.split("/")[-1])
if size > 28 or size < 16:
if action == 'CreateVpc':
raise exception.InvalidVpcRange(cidr_block=cidr)
elif action == 'CreateSubnet':
raise exception.InvalidSubnetRange(cidr_block=cidr)
return True
# NOTE(Alex) Unfortunately Amazon returns various kinds of error for invalid
# IDs (...ID.Malformed, ...Id.Malformed, ...ID.NotFound, InvalidParameterValue)
# So we decided here to commonize invalid IDs to InvalidParameterValue error.
def validate_ec2_id(prefices):
def _do(val, parameter_name, **kwargs):
if not validate_str()(val, parameter_name, **kwargs):
return False
try:
prefix, value = val.rsplit('-', 1)
int(value, 16)
if prefix in prefices:
return True
except Exception:
pass
raise exception.InvalidParameterValue(
value=val, parameter=parameter_name,
reason=_('Expected: %(prefix)s-...') % {'prefix': prefices[0]})
return _do
def validate_ec2_association_id(id, parameter_name, action):
if action == 'DisassociateAddress':
return validate_ec2_id(['eipassoc'])(id, parameter_name)
else:
return validate_ec2_id(['rtbassoc'])(id, parameter_name)
def validate_ipv4(address, parameter_name, **kwargs):
"""Verify that address represents a valid IPv4 address."""
try:
if netaddr.valid_ipv4(address):
return True
except Exception:
pass
raise exception.InvalidParameterValue(
value=address, parameter=parameter_name,
reason=_('Not a valid IP address'))
def validate(request, validator):
"""Validate values of args against validators in validator. """Validate values of args against validators in validator.
:param args: Dict of values to be validated. :param args: Dict of values to be validated.
@ -117,14 +235,15 @@ def validate(args, validator):
""" """
for key in validator: args = request.args
if key not in args: for key in args:
if key not in validator:
continue continue
f = validator[key] f = validator[key]
assert callable(f) assert callable(f)
if not f(args[key]): if not f(args[key], parameter_name=key, action=request.action):
LOG.debug(_("%(key)s with value %(value)s failed" LOG.debug(_("%(key)s with value %(value)s failed"
" validator %(name)s"), " validator %(name)s"),
{'key': key, 'value': args[key], 'name': f.__name__}) {'key': key, 'value': args[key], 'name': f.__name__})

View File

@ -39,7 +39,6 @@ LOG = logging.getLogger(__name__)
def create_vpc(context, cidr_block, instance_tenancy='default'): def create_vpc(context, cidr_block, instance_tenancy='default'):
ec2utils.validate_vpc_cidr(cidr_block, exception.InvalidVpcRange)
neutron = clients.neutron(context) neutron = clients.neutron(context)
# TODO(Alex): Handle errors like overlimit # TODO(Alex): Handle errors like overlimit
# TODO(ft) dhcp_options_id # TODO(ft) dhcp_options_id

View File

@ -245,14 +245,14 @@ class AddressTestCase(base.ApiTestCase):
do_check({}, do_check({},
'MissingParameter') 'MissingParameter')
do_check({'PublicIp': 'fake_ip', do_check({'PublicIp': '0.0.0.0',
'AllocationId': 'fake_allocation_id'}, 'AllocationId': 'eipalloc-0'},
'InvalidParameterCombination') 'InvalidParameterCombination')
do_check({'PublicIp': 'fake_ip'}, do_check({'PublicIp': '0.0.0.0'},
'MissingParameter') 'MissingParameter')
do_check({'AllocationId': 'fake_allocation_id'}, do_check({'AllocationId': 'eipalloc-0'},
'MissingParameter') 'MissingParameter')
def test_associate_address_invalid_ec2_classic_parameters(self): def test_associate_address_invalid_ec2_classic_parameters(self):
@ -261,7 +261,7 @@ class AddressTestCase(base.ApiTestCase):
# NOTE(ft): ec2 classic instance vs allocation_id parameter # NOTE(ft): ec2 classic instance vs allocation_id parameter
self.db_api.get_items.return_value = [] self.db_api.get_items.return_value = []
resp = self.execute('AssociateAddress', resp = self.execute('AssociateAddress',
{'AllocationId': 'fake_allocation_id', {'AllocationId': 'eipalloc-0',
'InstanceId': fakes.ID_EC2_INSTANCE_1}) 'InstanceId': fakes.ID_EC2_INSTANCE_1})
self.assertEqual(400, resp['status']) self.assertEqual(400, resp['status'])
self.assertEqual('InvalidParameterCombination', resp['Error']['Code']) self.assertEqual('InvalidParameterCombination', resp['Error']['Code'])
@ -289,7 +289,7 @@ class AddressTestCase(base.ApiTestCase):
# NOTE(ft): vpc instance vs public ip parmeter # NOTE(ft): vpc instance vs public ip parmeter
self.db_api.get_items.return_value = [fakes.DB_NETWORK_INTERFACE_2] self.db_api.get_items.return_value = [fakes.DB_NETWORK_INTERFACE_2]
do_check({'PublicIp': 'fake_ip', do_check({'PublicIp': '0.0.0.0',
'InstanceId': fakes.ID_EC2_INSTANCE_1}, 'InstanceId': fakes.ID_EC2_INSTANCE_1},
'InvalidParameterCombination') 'InvalidParameterCombination')
@ -474,8 +474,8 @@ class AddressTestCase(base.ApiTestCase):
do_check({}, do_check({},
'MissingParameter') 'MissingParameter')
do_check({'PublicIp': 'fake_ip', do_check({'PublicIp': '0.0.0.0',
'AssociationId': 'fake_allocation_id'}, 'AssociationId': 'eipassoc-0'},
'InvalidParameterCombination') 'InvalidParameterCombination')
# NOTE(ft): vpc address vs public ip parameter # NOTE(ft): vpc address vs public ip parameter
@ -557,8 +557,8 @@ class AddressTestCase(base.ApiTestCase):
do_check({}, do_check({},
'MissingParameter') 'MissingParameter')
do_check({'PublicIp': 'fake_ip', do_check({'PublicIp': '0.0.0.0',
'AllocationId': 'fake_allocation_id'}, 'AllocationId': 'eipalloc-0'},
'InvalidParameterCombination') 'InvalidParameterCombination')
# NOTE(ft): vpc address vs public ip parameter # NOTE(ft): vpc address vs public ip parameter

View File

@ -19,10 +19,46 @@ import datetime
import testtools import testtools
from ec2api.api import ec2utils from ec2api.api import ec2utils
from ec2api.api import validator
from ec2api import exception from ec2api import exception
from ec2api.openstack.common import timeutils from ec2api.openstack.common import timeutils
class EC2ValidationTestCase(testtools.TestCase):
"""Test case for various validations."""
def test_validate_cidr(self):
self.assertEqual(True, validator.validate_cidr('10.10.0.0/24', 'cidr'))
def check_raise_invalid_parameter(cidr):
self.assertRaises(exception.InvalidParameterValue,
validator.validate_cidr, cidr, 'cidr')
check_raise_invalid_parameter('fake')
check_raise_invalid_parameter('10.10/24')
check_raise_invalid_parameter('10.10.0.0.0/24')
check_raise_invalid_parameter('10.10.0.0')
check_raise_invalid_parameter(' 10.10.0.0/24')
check_raise_invalid_parameter('10.10.0.0/24 ')
check_raise_invalid_parameter('.10.10.0.0/24 ')
check_raise_invalid_parameter('-1.10.0.0/24')
check_raise_invalid_parameter('10.256.0.0/24')
check_raise_invalid_parameter('10.10.0.0/33')
check_raise_invalid_parameter('10.10.0.0/-1')
def check_raise_invalid_vpc_range(cidr, ex_class, action):
self.assertRaises(ex_class,
validator.validate_cidr_block, cidr,
action)
check_raise_invalid_vpc_range('10.10.0.0/15',
exception.InvalidSubnetRange,
'CreateSubnet')
check_raise_invalid_vpc_range('10.10.0.0/29',
exception.InvalidVpcRange,
'CreateVpc')
class EC2TimestampValidationTestCase(testtools.TestCase): class EC2TimestampValidationTestCase(testtools.TestCase):
"""Test case for EC2 request timestamp validation.""" """Test case for EC2 request timestamp validation."""

View File

@ -116,32 +116,3 @@ class EC2UtilsTestCase(testtools.TestCase):
check_not_found('sg', exception.InvalidGroupNotFound) check_not_found('sg', exception.InvalidGroupNotFound)
check_not_found('rtb', exception.InvalidRouteTableIDNotFound) check_not_found('rtb', exception.InvalidRouteTableIDNotFound)
check_not_found('i', exception.InvalidInstanceIDNotFound) check_not_found('i', exception.InvalidInstanceIDNotFound)
def test_validate_cidr(self):
self.assertIsNone(ec2utils.validate_cidr('10.10.0.0/24', 'cidr'))
def check_raise_invalid_parameter(cidr):
self.assertRaises(exception.InvalidParameterValue,
ec2utils.validate_cidr, cidr, 'cidr')
check_raise_invalid_parameter('fake')
check_raise_invalid_parameter('10.10/24')
check_raise_invalid_parameter('10.10.0.0.0/24')
check_raise_invalid_parameter('10.10.0.0')
check_raise_invalid_parameter(' 10.10.0.0/24')
check_raise_invalid_parameter('10.10.0.0/24 ')
check_raise_invalid_parameter('.10.10.0.0/24 ')
check_raise_invalid_parameter('-1.10.0.0/24')
check_raise_invalid_parameter('10.256.0.0/24')
check_raise_invalid_parameter('10.10.0.0/33')
check_raise_invalid_parameter('10.10.0.0/-1')
def check_raise_invalid_vpc_range(cidr, ex_class):
self.assertRaises(ex_class,
ec2utils.validate_vpc_cidr, cidr,
ex_class)
check_raise_invalid_vpc_range('10.10.0.0/15',
exception.InvalidSubnetRange)
check_raise_invalid_vpc_range('10.10.0.0/29',
exception.InvalidVpcRange)

View File

@ -191,13 +191,21 @@ class SecurityGroupTestCase(base.ApiTestCase):
self.nova_security_groups.delete.assert_called_once_with( self.nova_security_groups.delete.assert_called_once_with(
fakes.ID_OS_SECURITY_GROUP_1) fakes.ID_OS_SECURITY_GROUP_1)
# NOTE(Alex) This test is disabled because it checks using non-AWS id.
@base.skip_not_implemented
def test_delete_security_group_nova_os_id(self):
security_group.security_group_engine = (
security_group.SecurityGroupEngineNova())
self.nova_security_groups.list.return_value = (
[fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_1),
fakes.NovaSecurityGroup(fakes.OS_SECURITY_GROUP_2)])
resp = self.execute( resp = self.execute(
'DeleteSecurityGroup', 'DeleteSecurityGroup',
{'GroupId': {'GroupId':
fakes.ID_OS_SECURITY_GROUP_2}) fakes.ID_OS_SECURITY_GROUP_2})
self.assertEqual(200, resp['status']) self.assertEqual(200, resp['status'])
self.assertEqual(True, resp['return']) self.assertEqual(True, resp['return'])
self.nova_security_groups.delete.assert_any_call( self.nova_security_groups.delete.assert_called_once_with(
fakes.ID_OS_SECURITY_GROUP_2) fakes.ID_OS_SECURITY_GROUP_2)
def test_delete_security_group_invalid(self): def test_delete_security_group_invalid(self):
@ -293,13 +301,15 @@ class SecurityGroupTestCase(base.ApiTestCase):
def check_response(error_code, protocol, from_port, to_port, cidr, def check_response(error_code, protocol, from_port, to_port, cidr,
group_id=fakes.ID_EC2_SECURITY_GROUP_2): group_id=fakes.ID_EC2_SECURITY_GROUP_2):
resp = self.execute( params = {'IpPermissions.1.FromPort': str(from_port),
'AuthorizeSecurityGroupIngress',
{'GroupId': group_id,
'IpPermissions.1.FromPort': str(from_port),
'IpPermissions.1.ToPort': str(to_port), 'IpPermissions.1.ToPort': str(to_port),
'IpPermissions.1.IpProtocol': protocol, 'IpPermissions.1.IpProtocol': protocol}
'IpPermissions.1.IpRanges.1.CidrIp': cidr}) if group_id is not None:
params['GroupId'] = group_id
if cidr is not None:
params['IpPermissions.1.IpRanges.1.CidrIp'] = cidr
resp = self.execute(
'AuthorizeSecurityGroupIngress', params)
self.assertEqual(400, resp['status']) self.assertEqual(400, resp['status'])
self.assertEqual(error_code, resp['Error']['Code']) self.assertEqual(error_code, resp['Error']['Code'])
self.neutron.reset_mock() self.neutron.reset_mock()