diff --git a/neutron/db/securitygroups_db.py b/neutron/db/securitygroups_db.py index 882a43d6256..4777a0dc5a5 100644 --- a/neutron/db/securitygroups_db.py +++ b/neutron/db/securitygroups_db.py @@ -312,6 +312,10 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase): if rule[attr] > 255: raise ext_sg.SecurityGroupInvalidIcmpValue( field=field, attr=attr, value=rule[attr]) + if (rule['port_range_min'] is None and + rule['port_range_max']): + raise ext_sg.SecurityGroupMissingIcmpType( + value=rule['port_range_max']) def _validate_security_group_rules(self, context, security_group_rule): """Check that rules being installed. diff --git a/neutron/extensions/securitygroup.py b/neutron/extensions/securitygroup.py index f6f18192570..4351de98a2c 100644 --- a/neutron/extensions/securitygroup.py +++ b/neutron/extensions/securitygroup.py @@ -44,6 +44,11 @@ class SecurityGroupInvalidIcmpValue(qexception.InvalidInput): "%(value)s. It must be 0 to 255.") +class SecurityGroupMissingIcmpType(qexception.InvalidInput): + message = _("ICMP code (port-range-max) %(value)s is provided" + " but ICMP type (port-range-min) is missing.") + + class SecurityGroupInUse(qexception.InUse): message = _("Security Group %(id)s in use.") diff --git a/neutron/tests/unit/test_extension_security_group.py b/neutron/tests/unit/test_extension_security_group.py index 1881d8c8484..43acdf9e683 100644 --- a/neutron/tests/unit/test_extension_security_group.py +++ b/neutron/tests/unit/test_extension_security_group.py @@ -893,6 +893,19 @@ class TestSecurityGroups(SecurityGroupDBTestCase): self.deserialize(self.fmt, res) self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) + def test_create_security_group_rule_icmp_with_code_only(self): + name = 'webservers' + description = 'my webservers' + with self.security_group(name, description) as sg: + security_group_id = sg['security_group']['id'] + with self.security_group_rule(security_group_id): + rule = self._build_security_group_rule( + sg['security_group']['id'], 'ingress', + const.PROTO_NAME_ICMP, None, '2') + res = self._create_security_group_rule(self.fmt, rule) + self.deserialize(self.fmt, res) + self.assertEqual(res.status_int, webob.exc.HTTPBadRequest.code) + def test_list_ports_security_group(self): with self.network() as n: with self.subnet(n):