Only allow SG port ranges for whitelisted protocols

Iptables only supports port-ranges for certain protocols,
others will generate failures, possibly leaving the agent
looping trying to apply rules.  Change to not allow port
ranges outside of the list of known good protocols.

This backport is based on commit
b564871bb759a38cf96527f94e7c7d4cc760b1c9, excluding validation
and tests for protocols where support for port ranges was
added later (in Pike, only TCP and UDP are supported).

Conflicts:
    neutron/tests/unit/db/test_securitygroups_db.py

Change-Id: I5867f77fc5aedc169b42f50def0424ff209c164c
Closes-bug: #1749667
(cherry picked from commit b564871bb759a38cf96527f94e7c7d4cc760b1c9)
This commit is contained in:
Brian Haley 2018-02-15 13:57:32 -05:00 committed by Florian Haas
parent 1d3568da77
commit d8f9c94470
4 changed files with 65 additions and 0 deletions

View File

@ -465,6 +465,14 @@ class SecurityGroupDbMixin(ext_sg.SecurityGroupPluginBase):
rule['port_range_max'] is not None):
raise ext_sg.SecurityGroupMissingIcmpType(
value=rule['port_range_max'])
else:
# Only the protocols above support port ranges, raise otherwise.
# When min/max are the same it is just a single port.
if (rule['port_range_min'] is not None and
rule['port_range_max'] is not None and
rule['port_range_min'] != rule['port_range_max']):
raise ext_sg.SecurityGroupInvalidProtocolForPortRange(
protocol=ip_proto)
def _validate_ethertype_and_protocol(self, rule):
"""Check if given ethertype and protocol are valid or not"""

View File

@ -40,6 +40,11 @@ class SecurityGroupInvalidPortRange(nexception.InvalidInput):
"<= port_range_max")
class SecurityGroupInvalidProtocolForPortRange(nexception.InvalidInput):
message = _("Invalid protocol %(protocol)s for port range, only "
"supported for TCP and UDP.")
class SecurityGroupInvalidPortValue(nexception.InvalidInput):
message = _("Invalid value for port %(port)s")

View File

@ -440,3 +440,16 @@ class SecurityGroupDbMixinTestCase(testlib_api.SqlTestCase):
{'port_range_min': pmin,
'port_range_max': pmax,
'protocol': protocol})
def test__validate_port_range_exception(self):
self.assertRaises(securitygroup.SecurityGroupInvalidPortValue,
self.mixin._validate_port_range,
{'port_range_min': 0,
'port_range_max': None,
'protocol': constants.PROTO_NAME_TCP})
self.assertRaises(
securitygroup.SecurityGroupInvalidProtocolForPortRange,
self.mixin._validate_port_range,
{'port_range_min': 100,
'port_range_max': 200,
'protocol': '111'})

View File

@ -600,6 +600,45 @@ class TestSecurityGroups(SecurityGroupDBTestCase):
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_protocol_as_number_with_port(self):
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
protocol = 111
rule = self._build_security_group_rule(
security_group_id, 'ingress', protocol, '70')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_protocol_as_number_range(self):
# This is a SG rule with a port range, but treated as a single
# port since min/max are the same.
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
protocol = 111
rule = self._build_security_group_rule(
security_group_id, 'ingress', protocol, '70', '70')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPCreated.code, res.status_int)
def test_create_security_group_rule_protocol_as_number_range_bad(self):
# Only certain protocols support a SG rule with a port range
name = 'webservers'
description = 'my webservers'
with self.security_group(name, description) as sg:
security_group_id = sg['security_group']['id']
protocol = 111
rule = self._build_security_group_rule(
security_group_id, 'ingress', protocol, '70', '71')
res = self._create_security_group_rule(self.fmt, rule)
self.deserialize(self.fmt, res)
self.assertEqual(webob.exc.HTTPBadRequest.code, res.status_int)
def test_create_security_group_rule_case_insensitive(self):
name = 'webservers'
description = 'my webservers'