Validate ethertype for icmp protocols

This patch will add the validation function to make sure security_group
rule is not created when ethertype is IPv4 and protocol is icmpv6.

Closes-Bug: #1505832
Depends-On: Ia1a5342a1d568cb1a015e1b7acecf38b8d1f46e1

Co-Authored By: Matt Dorn <madorn@gmail.com>

Change-Id: I4a15935c564aaa48555ed08f6da51113787ecb73
This commit is contained in:
Manjeet Singh Bhatia 2015-10-14 16:36:23 +00:00
parent ac7c82fbe5
commit 7335dbdabe
3 changed files with 24 additions and 0 deletions

View File

@ -446,6 +446,13 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
raise ext_sg.SecurityGroupMissingIcmpType(
value=rule['port_range_max'])
def _validate_ethertype_and_protocol(self, rule):
"""Check if given ethertype and protocol are valid or not"""
if rule['protocol'] == constants.PROTO_NAME_ICMP_V6:
if rule['ethertype'] == constants.IPv4:
raise ext_sg.SecurityGroupEthertypeConflictWithProtocol(
ethertype=rule['ethertype'], protocol=rule['protocol'])
def _validate_single_tenant_and_group(self, security_group_rules):
"""Check that all rules belong to the same security group and tenant
"""
@ -466,6 +473,7 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
rule = security_group_rule['security_group_rule']
self._validate_port_range(rule)
self._validate_ip_prefix(rule)
self._validate_ethertype_and_protocol(rule)
if rule['remote_ip_prefix'] and rule['remote_group_id']:
raise ext_sg.SecurityGroupRemoteGroupAndRemoteIpPrefix()

View File

@ -44,6 +44,11 @@ class SecurityGroupInvalidIcmpValue(nexception.InvalidInput):
"%(value)s. It must be 0 to 255.")
class SecurityGroupEthertypeConflictWithProtocol(nexception.InvalidInput):
message = ("Invalid ethertype %(ethertype)s for protocol "
"%(protocol)s .")
class SecurityGroupMissingIcmpType(nexception.InvalidInput):
message = _("ICMP code (port-range-max) %(value)s is provided"
" but ICMP type (port-range-min) is missing.")

View File

@ -420,6 +420,17 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_ethertype_invalid_for_protocol(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
rule = self._build_security_group_rule(
security_group_id, 'ingress', const.PROTO_NAME_ICMP_V6)
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code)
def test_create_security_group_rule_invalid_ip_prefix(self):
name = 'webservers'
description = 'my webservers'