From 04fc6017233c9bb7032d8af5f3860df274ef0ed0 Mon Sep 17 00:00:00 2001 From: "Sean M. Collins" Date: Mon, 10 Aug 2015 15:24:52 -0400 Subject: [PATCH] Validate src_ip_adress, dest_ip_address and ip_version The FwaaS API should not allow the creation of firewall rules where the ip_version is set to 4, but the source or destination IPs are IPv6 addresses APIImpact DocImpact Closes-Bug: #1487599 Change-Id: Iad680996a47adcf27f9dc7e0bc0fea924fff4f9f (cherry picked from commit 29c51879683087e8ea486aaa9866bbce1f47a15c) --- neutron_fwaas/db/firewall/firewall_db.py | 15 ++++++++ neutron_fwaas/extensions/firewall.py | 3 ++ .../unit/db/firewall/test_firewall_db.py | 34 +++++++++++++++++++ 3 files changed, 52 insertions(+) diff --git a/neutron_fwaas/db/firewall/firewall_db.py b/neutron_fwaas/db/firewall/firewall_db.py index 486a68197..87dfaac46 100644 --- a/neutron_fwaas/db/firewall/firewall_db.py +++ b/neutron_fwaas/db/firewall/firewall_db.py @@ -31,6 +31,8 @@ from sqlalchemy.ext.orderinglist import ordering_list from sqlalchemy import orm from sqlalchemy.orm import exc +import netaddr + from neutron_fwaas.extensions import firewall as fw_ext @@ -285,6 +287,18 @@ class Firewall_db_mixin(fw_ext.FirewallPluginBase, base_db.CommonDbMixin): if fw_tenant_id != fwp['tenant_id'] and not fwp['shared']: raise fw_ext.FirewallPolicyConflict(firewall_policy_id=fwp_id) + def _validate_fwr_src_dst_ip_version(self, fwr): + src_version = dst_version = None + if fwr['source_ip_address']: + src_version = netaddr.IPNetwork(fwr['source_ip_address']).version + if fwr['destination_ip_address']: + dst_version = netaddr.IPNetwork( + fwr['destination_ip_address']).version + rule_ip_version = fwr['ip_version'] + if ((src_version and src_version != rule_ip_version) or + (dst_version and dst_version != rule_ip_version)): + raise fw_ext.FirewallIpAddressConflict() + def _validate_fwr_port_range(self, min_port, max_port): if int(min_port) > int(max_port): port_range = '%s:%s' % (min_port, max_port) @@ -440,6 +454,7 @@ class Firewall_db_mixin(fw_ext.FirewallPluginBase, base_db.CommonDbMixin): LOG.debug("create_firewall_rule() called") fwr = firewall_rule['firewall_rule'] self._validate_fwr_protocol_parameters(fwr) + self._validate_fwr_src_dst_ip_version(fwr) tenant_id = self._get_tenant_id_for_create(context, fwr) if not fwr['protocol'] and (fwr['source_port'] or fwr['destination_port']): diff --git a/neutron_fwaas/extensions/firewall.py b/neutron_fwaas/extensions/firewall.py index 0c622f027..8b7b4ff62 100644 --- a/neutron_fwaas/extensions/firewall.py +++ b/neutron_fwaas/extensions/firewall.py @@ -137,6 +137,9 @@ class FirewallRuleInfoMissing(nexception.InvalidInput): "rule operation.") +class FirewallIpAddressConflict(nexception.InvalidInput): + message = _("Invalid input - IP addresses do not agree with IP Version") + # TODO(dougwig) - once this exception is out of neutron, restore this #class FirewallInternalDriverError(nexception.NeutronException): # """Fwaas exception for all driver errors. diff --git a/neutron_fwaas/tests/unit/db/firewall/test_firewall_db.py b/neutron_fwaas/tests/unit/db/firewall/test_firewall_db.py index cefb50bc4..96e409d06 100644 --- a/neutron_fwaas/tests/unit/db/firewall/test_firewall_db.py +++ b/neutron_fwaas/tests/unit/db/firewall/test_firewall_db.py @@ -713,6 +713,18 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): for k, v in six.iteritems(attrs): self.assertEqual(firewall_rule['firewall_rule'][k], v) + def test_create_firewall_without_source(self): + attrs = self._get_test_firewall_rule_attrs() + attrs['source_ip_address'] = None + res = self._create_firewall_rule(self.fmt, **attrs) + self.assertEqual(201, res.status_int) + + def test_create_firewall_rule_without_destination(self): + attrs = self._get_test_firewall_rule_attrs() + attrs['destination_ip_address'] = None + res = self._create_firewall_rule(self.fmt, **attrs) + self.assertEqual(201, res.status_int) + def test_create_firewall_rule_without_protocol_with_dport(self): attrs = self._get_test_firewall_rule_attrs() attrs['protocol'] = None @@ -758,6 +770,28 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): for k, v in six.iteritems(attrs): self.assertEqual(res['firewall_rule'][k], v) + def test_create_firewall_rule_with_ipv6_addrs_and_wrong_ip_version(self): + attrs = self._get_test_firewall_rule_attrs() + attrs['source_ip_address'] = '::/0' + attrs['destination_ip_address'] = '2001:db8:3::/64' + attrs['ip_version'] = 4 + res = self._create_firewall_rule(self.fmt, **attrs) + self.assertEqual(400, res.status_int) + + attrs = self._get_test_firewall_rule_attrs() + attrs['source_ip_address'] = None + attrs['destination_ip_address'] = '2001:db8:3::/64' + attrs['ip_version'] = 4 + res = self._create_firewall_rule(self.fmt, **attrs) + self.assertEqual(400, res.status_int) + + attrs = self._get_test_firewall_rule_attrs() + attrs['source_ip_address'] = '::/0' + attrs['destination_ip_address'] = None + attrs['ip_version'] = 4 + res = self._create_firewall_rule(self.fmt, **attrs) + self.assertEqual(400, res.status_int) + def test_list_firewall_rules(self): with self.firewall_rule(name='fwr1') as fwr1, \ self.firewall_rule(name='fwr2') as fwr2, \