Validate CIDR given as ip-prefix in security-group-rule-create

There was no validation for the provided ip prefix. This just adds
a simple parse using netaddr and explodes with appropriate message.
Also makes sure ip prefix _is_ cidr (192.168.1.1-->192.168.1.1/32).

Validation occurs at the attribute level (API model) as well as at
the db level, where the ethertype is validated against the ip_prefix
address type.

Unit test cases added - bad prefix, unmasked prefix and incorrect
ethertype. Also adds attribute test cases for the added
convert_ip_prefix_to_cidr method

Change-Id: I71fb8c887963a122a5bd8cfdda800026c1cd3954
Closes-Bug: 1255338
This commit is contained in:
marios 2013-11-29 18:23:54 +02:00
parent f89d3fec9b
commit 65aa92b034
5 changed files with 128 additions and 2 deletions

View File

@ -319,3 +319,7 @@ class DuplicatedExtension(NeutronException):
class DeviceIDNotOwnedByTenant(Conflict): class DeviceIDNotOwnedByTenant(Conflict):
message = _("The following device_id %(device_id)s is not owned by your " message = _("The following device_id %(device_id)s is not owned by your "
"tenant or matches another tenants router.") "tenant or matches another tenants router.")
class InvalidCIDR(BadRequest):
message = _("Invalid CIDR %(input)s given as IP prefix")

View File

@ -12,6 +12,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import netaddr
import sqlalchemy as sa import sqlalchemy as sa
from sqlalchemy import orm from sqlalchemy import orm
from sqlalchemy.orm import exc from sqlalchemy.orm import exc
@ -331,6 +332,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
new_rules.add(rule['security_group_id']) new_rules.add(rule['security_group_id'])
self._validate_port_range(rule) self._validate_port_range(rule)
self._validate_ip_prefix(rule)
if rule['remote_ip_prefix'] and rule['remote_group_id']: if rule['remote_ip_prefix'] and rule['remote_group_id']:
raise ext_sg.SecurityGroupRemoteGroupAndRemoteIpPrefix() raise ext_sg.SecurityGroupRemoteGroupAndRemoteIpPrefix()
@ -411,6 +413,24 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
if (i['security_group_rule'] == db_rule): if (i['security_group_rule'] == db_rule):
raise ext_sg.SecurityGroupRuleExists(id=id) raise ext_sg.SecurityGroupRuleExists(id=id)
def _validate_ip_prefix(self, rule):
"""Check that a valid cidr was specified as remote_ip_prefix
No need to check that it is in fact an IP address as this is already
validated by attribute validators.
Check that rule ethertype is consistent with remote_ip_prefix ip type.
Add mask to ip_prefix if absent (192.168.1.10 -> 192.168.1.10/32).
"""
input_prefix = rule['remote_ip_prefix']
if input_prefix:
addr = netaddr.IPNetwork(input_prefix)
# set input_prefix to always include the netmask:
rule['remote_ip_prefix'] = str(addr)
# check consistency of ethertype with addr version
if rule['ethertype'] != "IPv%d" % (addr.version):
raise ext_sg.SecurityGroupRuleParameterConflict(
ethertype=rule['ethertype'], cidr=input_prefix)
def get_security_group_rules(self, context, filters=None, fields=None, def get_security_group_rules(self, context, filters=None, fields=None,
sorts=None, limit=None, marker=None, sorts=None, limit=None, marker=None,
page_reverse=False): page_reverse=False):

View File

@ -15,6 +15,7 @@
from abc import ABCMeta from abc import ABCMeta
from abc import abstractmethod from abc import abstractmethod
import netaddr
from oslo.config import cfg from oslo.config import cfg
import six import six
@ -106,6 +107,10 @@ class SecurityGroupRuleExists(qexception.InUse):
message = _("Security group rule already exists. Group id is %(id)s.") message = _("Security group rule already exists. Group id is %(id)s.")
class SecurityGroupRuleParameterConflict(qexception.InvalidInput):
message = _("Conflicting value ethertype %(ethertype)s for CIDR %(cidr)s")
def convert_protocol(value): def convert_protocol(value):
if value is None: if value is None:
return return
@ -156,6 +161,16 @@ def convert_to_uuid_list_or_none(value_list):
return value_list return value_list
def convert_ip_prefix_to_cidr(ip_prefix):
if not ip_prefix:
return
try:
cidr = netaddr.IPNetwork(ip_prefix)
return str(cidr)
except (TypeError, netaddr.AddrFormatError):
raise qexception.InvalidCIDR(input=ip_prefix)
def _validate_name_not_default(data, valid_values=None): def _validate_name_not_default(data, valid_values=None):
if data == "default": if data == "default":
raise SecurityGroupDefaultAlreadyExists() raise SecurityGroupDefaultAlreadyExists()
@ -211,7 +226,8 @@ RESOURCE_ATTRIBUTE_MAP = {
'convert_to': convert_ethertype_to_case_insensitive, 'convert_to': convert_ethertype_to_case_insensitive,
'validate': {'type:values': sg_supported_ethertypes}}, 'validate': {'type:values': sg_supported_ethertypes}},
'remote_ip_prefix': {'allow_post': True, 'allow_put': False, 'remote_ip_prefix': {'allow_post': True, 'allow_put': False,
'default': None, 'is_visible': True}, 'default': None, 'is_visible': True,
'convert_to': convert_ip_prefix_to_cidr},
'tenant_id': {'allow_post': True, 'allow_put': False, 'tenant_id': {'allow_post': True, 'allow_put': False,
'required_by_policy': True, 'required_by_policy': True,
'is_visible': True}, 'is_visible': True},

View File

@ -21,10 +21,12 @@ import webob.exc
from neutron.api.v2 import attributes as attr from neutron.api.v2 import attributes as attr
from neutron.common import constants as const from neutron.common import constants as const
from neutron.common import exceptions as n_exc
from neutron import context from neutron import context
from neutron.db import db_base_plugin_v2 from neutron.db import db_base_plugin_v2
from neutron.db import securitygroups_db from neutron.db import securitygroups_db
from neutron.extensions import securitygroup as ext_sg from neutron.extensions import securitygroup as ext_sg
from neutron.tests import base
from neutron.tests.unit import test_db_plugin from neutron.tests.unit import test_db_plugin
DB_PLUGIN_KLASS = ('neutron.tests.unit.test_extension_security_group.' DB_PLUGIN_KLASS = ('neutron.tests.unit.test_extension_security_group.'
@ -404,6 +406,70 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.deserialize(self.fmt, res) self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_invalid_ip_prefix(self):
name = 'webservers'
description = 'my webservers'
for bad_prefix in ['bad_ip', 256, "2001:db8:a::123/129", '172.30./24']:
with self.security_group(name, description) as sg:
sg_id = sg['security_group']['id']
remote_ip_prefix = bad_prefix
rule = self._build_security_group_rule(
sg_id,
'ingress',
const.PROTO_NAME_TCP,
'22', '22',
remote_ip_prefix)
res = self._create_security_group_rule(self.fmt, rule)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_invalid_ethertype_for_prefix(self):
name = 'webservers'
description = 'my webservers'
test_addr = {'192.168.1.1/24': 'ipv4', '192.168.1.1/24': 'IPv6',
'2001:db8:1234::/48': 'ipv6',
'2001:db8:1234::/48': 'IPv4'}
for prefix, ether in test_addr.iteritems():
with self.security_group(name, description) as sg:
sg_id = sg['security_group']['id']
ethertype = ether
remote_ip_prefix = prefix
rule = self._build_security_group_rule(
sg_id,
'ingress',
const.PROTO_NAME_TCP,
'22', '22',
remote_ip_prefix,
None,
None,
ethertype)
res = self._create_security_group_rule(self.fmt, rule)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_with_unmasked_prefix(self):
name = 'webservers'
description = 'my webservers'
addr = {'10.1.2.3': {'mask': '32', 'ethertype': 'IPv4'},
'fe80::2677:3ff:fe7d:4c': {'mask': '128', 'ethertype': 'IPv6'}}
for ip in addr:
with self.security_group(name, description) as sg:
sg_id = sg['security_group']['id']
ethertype = addr[ip]['ethertype']
remote_ip_prefix = ip
rule = self._build_security_group_rule(
sg_id,
'ingress',
const.PROTO_NAME_TCP,
'22', '22',
remote_ip_prefix,
None,
None,
ethertype)
res = self._create_security_group_rule(self.fmt, rule)
self.assertEqual(res.status_int, 201)
res_sg = self.deserialize(self.fmt, res)
prefix = res_sg['security_group_rule']['remote_ip_prefix']
self.assertEqual(prefix, '%s/%s' % (ip, addr[ip]['mask']))
def test_create_security_group_rule_tcp_protocol_as_number(self): def test_create_security_group_rule_tcp_protocol_as_number(self):
name = 'webservers' name = 'webservers'
description = 'my webservers' description = 'my webservers'
@ -1348,5 +1414,25 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
class TestConvertIPPrefixToCIDR(base.BaseTestCase):
def test_convert_bad_ip_prefix_to_cidr(self):
for val in ['bad_ip', 256, "2001:db8:a::123/129"]:
self.assertRaises(n_exc.InvalidCIDR,
ext_sg.convert_ip_prefix_to_cidr, val)
self.assertIsNone(ext_sg.convert_ip_prefix_to_cidr(None))
def test_convert_ip_prefix_no_netmask_to_cidr(self):
addr = {'10.1.2.3': '32', 'fe80::2677:3ff:fe7d:4c': '128'}
for k, v in addr.iteritems():
self.assertEqual(ext_sg.convert_ip_prefix_to_cidr(k),
'%s/%s' % (k, v))
def test_convert_ip_prefix_with_netmask_to_cidr(self):
addresses = ['10.1.0.0/16', '10.1.2.3/32', '2001:db8:1234::/48']
for addr in addresses:
self.assertEqual(ext_sg.convert_ip_prefix_to_cidr(addr), addr)
class TestSecurityGroupsXML(TestSecurityGroups): class TestSecurityGroupsXML(TestSecurityGroups):
fmt = 'xml' fmt = 'xml'

View File

@ -42,7 +42,7 @@ from neutron.tests.unit import test_extension_security_group as test_sg
FAKE_PREFIX = {const.IPv4: '10.0.0.0/24', FAKE_PREFIX = {const.IPv4: '10.0.0.0/24',
const.IPv6: '2001:0db8::/64'} const.IPv6: '2001:db8::/64'}
FAKE_IP = {const.IPv4: '10.0.0.1', FAKE_IP = {const.IPv4: '10.0.0.1',
const.IPv6: 'fe80::1', const.IPv6: 'fe80::1',
'IPv6_GLOBAL': '2001:0db8::1', 'IPv6_GLOBAL': '2001:0db8::1',