From 78aa12567d226fe73df483d75e0d1980e6d3625e Mon Sep 17 00:00:00 2001 From: Sumit Naiksatam Date: Fri, 20 Sep 2013 16:40:52 -0700 Subject: [PATCH] Fix FWaaS plugin to allow one firewall per tenant The reference implementation of the FWaaS iptables agent/driver supports only one firewall per tenant in Havana release. However, the FWaaS plugin will let you create more than one firewall. This is being fixed in this patch to not allow creating the second firewall if a firewall already exists for the tenant. Change-Id: I8f1cad9791723ba919b5774a63982c204686ddfe Closes-Bug: #1228442 --- neutron/services/firewall/fwaas_plugin.py | 17 +++++++++++ .../services/firewall/test_fwaas_plugin.py | 29 ++++++++++++------- 2 files changed, 36 insertions(+), 10 deletions(-) diff --git a/neutron/services/firewall/fwaas_plugin.py b/neutron/services/firewall/fwaas_plugin.py index 5afdc0989f8..b7b59bbf1d6 100644 --- a/neutron/services/firewall/fwaas_plugin.py +++ b/neutron/services/firewall/fwaas_plugin.py @@ -19,6 +19,7 @@ from oslo.config import cfg +from neutron.common import exceptions as n_exception from neutron.common import rpc as q_rpc from neutron.common import topics from neutron import context as neutron_context @@ -127,6 +128,17 @@ class FirewallAgentApi(proxy.RpcProxy): ) +class FirewallCountExceeded(n_exception.NeutronException): + + """Reference implementation specific exception for firewall count. + + Only one firewall is supported per tenant. When a second + firewall is tried to be created, this exception will be raised. + """ + message = _("Exceeded allowed count of firewalls for tenant " + "%(tenant_id)s. Only one firewall is supported per tenant.") + + class FirewallPlugin(firewall_db.Firewall_db_mixin): """Implementation of the Neutron Firewall Service Plugin. @@ -209,6 +221,11 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin): def create_firewall(self, context, firewall): LOG.debug(_("create_firewall() called")) + tenant_id = self._get_tenant_id_for_create(context, + firewall['firewall']) + fw_count = self.get_firewalls_count(context) + if fw_count: + raise FirewallCountExceeded(tenant_id=tenant_id) firewall['firewall']['status'] = const.PENDING_CREATE fw = super(FirewallPlugin, self).create_firewall(context, firewall) fw_with_rules = ( diff --git a/neutron/tests/unit/services/firewall/test_fwaas_plugin.py b/neutron/tests/unit/services/firewall/test_fwaas_plugin.py index 4d0f53383a9..4d02639f651 100644 --- a/neutron/tests/unit/services/firewall/test_fwaas_plugin.py +++ b/neutron/tests/unit/services/firewall/test_fwaas_plugin.py @@ -140,16 +140,10 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase): fwp_id = fwp['firewall_policy']['id'] attrs = self._get_test_firewall_attrs() attrs['firewall_policy_id'] = fwp_id - with contextlib.nested(self.firewall( - firewall_policy_id=fwp_id, - tenant_id=tenant_id, - admin_state_up=test_db_firewall.ADMIN_STATE_UP, - no_delete=True), self.firewall( - firewall_policy_id=fwp_id, - tenant_id=tenant_id, - admin_state_up=test_db_firewall.ADMIN_STATE_UP, - no_delete=True)) as fws: - fw_list = [fw['firewall'] for fw in fws] + with self.firewall(firewall_policy_id=fwp_id, tenant_id=tenant_id, + admin_state_up=test_db_firewall.ADMIN_STATE_UP, + no_delete=True) as fw: + fw_list = [fw['firewall']] f = self.callbacks.get_firewalls_for_tenant_without_rules res = f(ctx, host='dummy') for fw in res: @@ -202,6 +196,13 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): super(TestFirewallPluginBase, self).setUp(fw_plugin=FW_PLUGIN_KLASS) self.callbacks = self.plugin.callbacks + def test_create_second_firewall_not_permitted(self): + with self.firewall(no_delete=True): + res = self._create_firewall( + None, 'firewall2', description='test', + firewall_policy_id=None, admin_state_up=True) + self.assertEqual(res.status_int, 500) + def test_update_firewall(self): ctx = context.get_admin_context() name = "new_firewall1" @@ -352,3 +353,11 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin): fw_id = fw['firewall']['id'] fw_rules = self.plugin._make_firewall_dict_with_rules(ctx, fw_id) self.assertEqual(fw_rules['firewall_rule_list'], []) + + def test_list_firewalls(self): + with self.firewall_policy(no_delete=True) as fwp: + fwp_id = fwp['firewall_policy']['id'] + with self.firewall(name='fw1', firewall_policy_id=fwp_id, + description='fw') as fwalls: + self._test_list_resources('firewall', [fwalls], + query_params='description=fw')