diff --git a/neutron/db/firewall/firewall_db.py b/neutron/db/firewall/firewall_db.py index b574758772c..5636a8cbfab 100644 --- a/neutron/db/firewall/firewall_db.py +++ b/neutron/db/firewall/firewall_db.py @@ -230,6 +230,13 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin): else: return '%d:%d' % (min_port, max_port) + def _validate_fwr_protocol_parameters(self, fwr): + protocol = fwr['protocol'] + if protocol not in (const.TCP, const.UDP): + if fwr['source_port'] or fwr['destination_port']: + raise firewall.FirewallRuleInvalidICMPParameter( + param="Source, destination port") + def create_firewall(self, context, firewall): LOG.debug(_("create_firewall() called")) fw = firewall['firewall'] @@ -341,6 +348,7 @@ class Firewall_db_mixin(firewall.FirewallPluginBase, base_db.CommonDbMixin): def create_firewall_rule(self, context, firewall_rule): LOG.debug(_("create_firewall_rule() called")) fwr = firewall_rule['firewall_rule'] + self._validate_fwr_protocol_parameters(fwr) tenant_id = self._get_tenant_id_for_create(context, fwr) src_port_min, src_port_max = self._get_min_max_ports_from_range( fwr['source_port']) diff --git a/neutron/extensions/firewall.py b/neutron/extensions/firewall.py index 8ac534ce431..bbb5d163ec7 100644 --- a/neutron/extensions/firewall.py +++ b/neutron/extensions/firewall.py @@ -78,6 +78,11 @@ class FirewallRuleInvalidAction(qexception.InvalidInput): "Only action values %(values)s are supported.") +class FirewallRuleInvalidICMPParameter(qexception.InvalidInput): + message = _("%(param)s are not allowed when protocol " + "is set to ICMP.") + + class FirewallInvalidPortValue(qexception.InvalidInput): message = _("Invalid value for port %(port)s.") diff --git a/neutron/tests/unit/db/firewall/test_db_firewall.py b/neutron/tests/unit/db/firewall/test_db_firewall.py index e3f021f9b75..69cf3648349 100644 --- a/neutron/tests/unit/db/firewall/test_db_firewall.py +++ b/neutron/tests/unit/db/firewall/test_db_firewall.py @@ -580,6 +580,24 @@ class TestFirewallDBPlugin(FirewallPluginDbTestCase): for k, v in attrs.iteritems(): self.assertEqual(firewall_rule['firewall_rule'][k], v) + def test_create_firewall_rule_icmp_with_port(self): + attrs = self._get_test_firewall_rule_attrs() + attrs['protocol'] = 'icmp' + res = self._create_firewall_rule(self.fmt, **attrs) + self.assertEqual(400, res.status_int) + + def test_create_firewall_rule_icmp_without_port(self): + attrs = self._get_test_firewall_rule_attrs() + + attrs['protocol'] = 'icmp' + attrs['source_port'] = None + attrs['destination_port'] = None + with self.firewall_rule(source_port=None, + destination_port=None, + protocol='icmp') as firewall_rule: + for k, v in attrs.iteritems(): + self.assertEqual(firewall_rule['firewall_rule'][k], v) + def test_show_firewall_rule_with_fw_policy_not_associated(self): attrs = self._get_test_firewall_rule_attrs() with self.firewall_rule() as fw_rule: