Security Group rule validation for ICMP rules

Currently there is no validation in Security Group rules
when an ICMP rule is added with icmp code alone. A rule
is getting added but there is a mismatch between SG rules
and the corresponding iptables rule that is added.
This patch does the necessary validation on the input.

Closes-Bug: #1301838
Change-Id: I510abac4c426f68ea57c99a5fef3da4058f88797
This commit is contained in:
sridhargaddam 2014-04-03 18:30:07 +05:30
parent 4cfb85f2c8
commit 7607e3da88
3 changed files with 22 additions and 0 deletions

View File

@ -312,6 +312,10 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
if rule[attr] > 255:
raise ext_sg.SecurityGroupInvalidIcmpValue(
field=field, attr=attr, value=rule[attr])
if (rule['port_range_min'] is None and
rule['port_range_max']):
raise ext_sg.SecurityGroupMissingIcmpType(
value=rule['port_range_max'])
def _validate_security_group_rules(self, context, security_group_rule):
"""Check that rules being installed.

View File

@ -44,6 +44,11 @@ class SecurityGroupInvalidIcmpValue(qexception.InvalidInput):
"%(value)s. It must be 0 to 255.")
class SecurityGroupMissingIcmpType(qexception.InvalidInput):
message = _("ICMP code (port-range-max) %(value)s is provided"
" but ICMP type (port-range-min) is missing.")
class SecurityGroupInUse(qexception.InUse):
message = _("Security Group %(id)s in use.")

View File

@ -893,6 +893,19 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_icmp_with_code_only(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
with self.security_group_rule(security_group_id):
rule = self._build_security_group_rule(
sg['security_group']['id'], 'ingress',
const.PROTO_NAME_ICMP, None, '2')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_list_ports_security_group(self):
with self.network() as n:
with self.subnet(n):