TCP/UDP port range checks

This commit is contained in:
Kevin George
2013-07-16 22:54:52 -05:00
parent b063311716
commit b4040fd687
2 changed files with 39 additions and 25 deletions

View File

@@ -159,33 +159,37 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
return (group_ids, security_groups) return (group_ids, security_groups)
def _validate_security_group_rule(self, context, rule): def _validate_security_group_rule(self, context, rule):
PROTOCOLS = {'icmp': 1, 'tcp': 6, 'udp': 17}
ALLOWED_WITH_RANGE = [6, 17]
if (rule.get('remote_ip_prefix', None) and if (rule.get('remote_ip_prefix', None) and
rule.get('remote_group_id', None)): rule.get('remote_group_id', None)):
raise sg_ext.SecurityGroupRemoteGroupAndRemoteIpPrefix() raise sg_ext.SecurityGroupRemoteGroupAndRemoteIpPrefix()
protocol = rule.get('protocol', None) protocol = rule.pop('protocol')
if protocol is not None: port_range_min = rule['port_range_min']
if (protocol in [6, 17] and port_range_max = rule['port_range_max']
(type(rule.get('port_range_min', None)) != if protocol and not isinstance(protocol, int):
type(rule.get('port_range_max', None)))): protocol = PROTOCOLS[protocol]
if protocol in ALLOWED_WITH_RANGE:
if (port_range_min is None) != (port_range_max is None):
raise exceptions.InvalidInput( raise exceptions.InvalidInput(
error_message="For TCP/UDP rules, cannot wildcard only " error_message="For TCP/UDP rules, cannot wildcard "
"one end of port range.") "only one end of port range.")
try: if port_range_min > port_range_max:
protonumber = int(rule['protocol']) raise sg_ext.SecurityGroupInvalidPortRange()
if protonumber < 0 or protonumber > 255:
raise sg_ext.SecurityGroupRuleInvalidProtocol( if protocol:
protocol=protocol, if protocol < 0 or protocol > 255:
values=['udp', 'tcp', 'icmp']) raise sg_ext.SecurityGroupRuleInvalidProtocol()
except (ValueError, TypeError): if port_range_min > 65535:
raise sg_ext.SecurityGroupRuleInvalidProtocol( raise sg_ext.SecurityGroupInvalidPortValue(port=port_range_min)
protocol=protocol, values=['udp', 'tcp', 'icmp']) if port_range_max > 65535:
else: raise sg_ext.SecurityGroupInvalidPortValue(port=port_range_max)
rule.pop('protocol', None) rule['protocol'] = protocol
if (rule.get('port_range_min', None) is not None or elif port_range_min is not None or port_range_max is not None:
rule.get('port_range_max', None)) is not None: raise sg_ext.SecurityGroupProtocolRequiredWithPorts()
raise sg_ext.SecurityGroupProtocolRequiredWithPorts()
return rule return rule
@@ -639,7 +643,7 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
mac_address_string = str(netaddr.EUI(mac['address'], mac_address_string = str(netaddr.EUI(mac['address'],
dialect=netaddr.mac_unix)) dialect=netaddr.mac_unix))
address_pairs = [{'mac_address': mac_address_string, address_pairs = [{'mac_address': mac_address_string,
'ip_address': address.get('address_readable') or ''} 'ip_address': address.get('address_readable', '')}
for address in addresses] for address in addresses]
backend_port = self.net_driver.create_port(context, net["id"], backend_port = self.net_driver.create_port(context, net["id"],
port_id=port_id, port_id=port_id,
@@ -692,7 +696,7 @@ class Plugin(neutron_plugin_base_v2.NeutronPluginBaseV2,
dialect=netaddr.mac_unix)) dialect=netaddr.mac_unix))
address_pairs = [{'mac_address': mac_address_string, address_pairs = [{'mac_address': mac_address_string,
'ip_address': 'ip_address':
address.get('address_readable') or ''} address.get('address_readable', '')}
for address in addresses] for address in addresses]
(group_ids, security_groups) = self._make_security_group_list( (group_ids, security_groups) = self._make_security_group_list(

View File

@@ -1177,7 +1177,7 @@ class TestQuarkCreatePort(TestQuarkPlugin):
network = dict(id=1) network = dict(id=1)
mac = dict(address="aa:bb:cc:dd:ee:ff") mac = dict(address="aa:bb:cc:dd:ee:ff")
ip = mock.MagicMock() ip = mock.MagicMock()
ip.get = lambda x: 1 if x == "subnet_id" else None ip.get = lambda x, *y: 1 if x == "subnet_id" else None
ip.formatted = lambda: "192.168.10.45" ip.formatted = lambda: "192.168.10.45"
fixed_ips = [dict(subnet_id=1, ip_address="192.168.10.45")] fixed_ips = [dict(subnet_id=1, ip_address="192.168.10.45")]
port = dict(port=dict(mac_address=mac["address"], network_id=1, port = dict(port=dict(mac_address=mac["address"], network_id=1,
@@ -1903,7 +1903,9 @@ class TestQuarkCreateSecurityGroupRule(TestQuarkPlugin):
cfg.CONF.set_override('quota_security_group_rule', 1, 'QUOTAS') cfg.CONF.set_override('quota_security_group_rule', 1, 'QUOTAS')
cfg.CONF.set_override('quota_security_rules_per_group', 1, 'QUOTAS') cfg.CONF.set_override('quota_security_rules_per_group', 1, 'QUOTAS')
self.rule = {'id': 1, 'ethertype': 'IPv4', self.rule = {'id': 1, 'ethertype': 'IPv4',
'security_group_id': 1, 'group': {'id': 1}} 'security_group_id': 1, 'group': {'id': 1},
'protocol': None, 'port_range_min': None,
'port_range_max': None}
self.expected = { self.expected = {
'id': 1, 'id': 1,
'remote_group_id': None, 'remote_group_id': None,
@@ -1974,6 +1976,9 @@ class TestQuarkCreateSecurityGroupRule(TestQuarkPlugin):
self._test_create_security_rule(protocol=17, port_range_max=10) self._test_create_security_rule(protocol=17, port_range_max=10)
with self.assertRaises(sg_ext.SecurityGroupProtocolRequiredWithPorts): with self.assertRaises(sg_ext.SecurityGroupProtocolRequiredWithPorts):
self._test_create_security_rule(protocol=None, port_range_min=0) self._test_create_security_rule(protocol=None, port_range_min=0)
with self.assertRaises(Exception):
self._test_create_security_rule(
protocol=6, port_range_min=1, port_range_max=0)
def test_create_security_rule_remote_conflicts(self): def test_create_security_rule_remote_conflicts(self):
with self.assertRaises(Exception): with self.assertRaises(Exception):
@@ -1984,6 +1989,11 @@ class TestQuarkCreateSecurityGroupRule(TestQuarkPlugin):
with self.assertRaises(sg_ext.SecurityGroupRuleInvalidProtocol): with self.assertRaises(sg_ext.SecurityGroupRuleInvalidProtocol):
self._test_create_security_rule(protocol=256) self._test_create_security_rule(protocol=256)
def test_create_security_rule_bad_port(self):
with self.assertRaises(sg_ext.SecurityGroupInvalidPortValue):
self._test_create_security_rule(protocol=6, port_range_min=0,
port_range_max=66000)
def test_create_security_rule_no_group(self): def test_create_security_rule_no_group(self):
with self.assertRaises(sg_ext.SecurityGroupNotFound): with self.assertRaises(sg_ext.SecurityGroupNotFound):
self._test_create_security_rule(group=None) self._test_create_security_rule(group=None)