Validate src_ip_adress, dest_ip_address and ip_version
The FwaaS API should not allow the creation of firewall rules where
the ip_version is set to 4, but the source or destination IPs are IPv6
addresses
APIImpact
DocImpact
Closes-Bug: #1487599
Change-Id: Iad680996a47adcf27f9dc7e0bc0fea924fff4f9f
(cherry picked from commit 29c5187968
)
This commit is contained in:
parent
598913f19d
commit
04fc601723
@ -31,6 +31,8 @@ from sqlalchemy.ext.orderinglist import ordering_list
|
||||
from sqlalchemy import orm
|
||||
from sqlalchemy.orm import exc
|
||||
|
||||
import netaddr
|
||||
|
||||
from neutron_fwaas.extensions import firewall as fw_ext
|
||||
|
||||
|
||||
@ -285,6 +287,18 @@ class Firewall_db_mixin(fw_ext.FirewallPluginBase, base_db.CommonDbMixin):
|
||||
if fw_tenant_id != fwp['tenant_id'] and not fwp['shared']:
|
||||
raise fw_ext.FirewallPolicyConflict(firewall_policy_id=fwp_id)
|
||||
|
||||
def _validate_fwr_src_dst_ip_version(self, fwr):
|
||||
src_version = dst_version = None
|
||||
if fwr['source_ip_address']:
|
||||
src_version = netaddr.IPNetwork(fwr['source_ip_address']).version
|
||||
if fwr['destination_ip_address']:
|
||||
dst_version = netaddr.IPNetwork(
|
||||
fwr['destination_ip_address']).version
|
||||
rule_ip_version = fwr['ip_version']
|
||||
if ((src_version and src_version != rule_ip_version) or
|
||||
(dst_version and dst_version != rule_ip_version)):
|
||||
raise fw_ext.FirewallIpAddressConflict()
|
||||
|
||||
def _validate_fwr_port_range(self, min_port, max_port):
|
||||
if int(min_port) > int(max_port):
|
||||
port_range = '%s:%s' % (min_port, max_port)
|
||||
@ -440,6 +454,7 @@ class Firewall_db_mixin(fw_ext.FirewallPluginBase, base_db.CommonDbMixin):
|
||||
LOG.debug("create_firewall_rule() called")
|
||||
fwr = firewall_rule['firewall_rule']
|
||||
self._validate_fwr_protocol_parameters(fwr)
|
||||
self._validate_fwr_src_dst_ip_version(fwr)
|
||||
tenant_id = self._get_tenant_id_for_create(context, fwr)
|
||||
if not fwr['protocol'] and (fwr['source_port'] or
|
||||
fwr['destination_port']):
|
||||
|
@ -137,6 +137,9 @@ class FirewallRuleInfoMissing(nexception.InvalidInput):
|
||||
"rule operation.")
|
||||
|
||||
|
||||
class FirewallIpAddressConflict(nexception.InvalidInput):
|
||||
message = _("Invalid input - IP addresses do not agree with IP Version")
|
||||
|
||||
# TODO(dougwig) - once this exception is out of neutron, restore this
|
||||
#class FirewallInternalDriverError(nexception.NeutronException):
|
||||
# """Fwaas exception for all driver errors.
|
||||
|
@ -713,6 +713,18 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
|
||||
for k, v in six.iteritems(attrs):
|
||||
self.assertEqual(firewall_rule['firewall_rule'][k], v)
|
||||
|
||||
def test_create_firewall_without_source(self):
|
||||
attrs = self._get_test_firewall_rule_attrs()
|
||||
attrs['source_ip_address'] = None
|
||||
res = self._create_firewall_rule(self.fmt, **attrs)
|
||||
self.assertEqual(201, res.status_int)
|
||||
|
||||
def test_create_firewall_rule_without_destination(self):
|
||||
attrs = self._get_test_firewall_rule_attrs()
|
||||
attrs['destination_ip_address'] = None
|
||||
res = self._create_firewall_rule(self.fmt, **attrs)
|
||||
self.assertEqual(201, res.status_int)
|
||||
|
||||
def test_create_firewall_rule_without_protocol_with_dport(self):
|
||||
attrs = self._get_test_firewall_rule_attrs()
|
||||
attrs['protocol'] = None
|
||||
@ -758,6 +770,28 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase):
|
||||
for k, v in six.iteritems(attrs):
|
||||
self.assertEqual(res['firewall_rule'][k], v)
|
||||
|
||||
def test_create_firewall_rule_with_ipv6_addrs_and_wrong_ip_version(self):
|
||||
attrs = self._get_test_firewall_rule_attrs()
|
||||
attrs['source_ip_address'] = '::/0'
|
||||
attrs['destination_ip_address'] = '2001:db8:3::/64'
|
||||
attrs['ip_version'] = 4
|
||||
res = self._create_firewall_rule(self.fmt, **attrs)
|
||||
self.assertEqual(400, res.status_int)
|
||||
|
||||
attrs = self._get_test_firewall_rule_attrs()
|
||||
attrs['source_ip_address'] = None
|
||||
attrs['destination_ip_address'] = '2001:db8:3::/64'
|
||||
attrs['ip_version'] = 4
|
||||
res = self._create_firewall_rule(self.fmt, **attrs)
|
||||
self.assertEqual(400, res.status_int)
|
||||
|
||||
attrs = self._get_test_firewall_rule_attrs()
|
||||
attrs['source_ip_address'] = '::/0'
|
||||
attrs['destination_ip_address'] = None
|
||||
attrs['ip_version'] = 4
|
||||
res = self._create_firewall_rule(self.fmt, **attrs)
|
||||
self.assertEqual(400, res.status_int)
|
||||
|
||||
def test_list_firewall_rules(self):
|
||||
with self.firewall_rule(name='fwr1') as fwr1, \
|
||||
self.firewall_rule(name='fwr2') as fwr2, \
|
||||
|
Loading…
Reference in New Issue
Block a user