Merge "Fix FWaaS plugin to allow one firewall per tenant"

This commit is contained in:
Jenkins 2013-09-25 03:06:27 +00:00 committed by Gerrit Code Review
commit 7539477a97
2 changed files with 36 additions and 10 deletions

View File

@ -19,6 +19,7 @@
from oslo.config import cfg
from neutron.common import exceptions as n_exception
from neutron.common import rpc as q_rpc
from neutron.common import topics
from neutron import context as neutron_context
@ -127,6 +128,17 @@ class FirewallAgentApi(proxy.RpcProxy):
)
class FirewallCountExceeded(n_exception.NeutronException):
"""Reference implementation specific exception for firewall count.
Only one firewall is supported per tenant. When a second
firewall is tried to be created, this exception will be raised.
"""
message = _("Exceeded allowed count of firewalls for tenant "
"%(tenant_id)s. Only one firewall is supported per tenant.")
class FirewallPlugin(firewall_db.Firewall_db_mixin):
"""Implementation of the Neutron Firewall Service Plugin.
@ -209,6 +221,11 @@ class FirewallPlugin(firewall_db.Firewall_db_mixin):
def create_firewall(self, context, firewall):
LOG.debug(_("create_firewall() called"))
tenant_id = self._get_tenant_id_for_create(context,
firewall['firewall'])
fw_count = self.get_firewalls_count(context)
if fw_count:
raise FirewallCountExceeded(tenant_id=tenant_id)
firewall['firewall']['status'] = const.PENDING_CREATE
fw = super(FirewallPlugin, self).create_firewall(context, firewall)
fw_with_rules = (

View File

@ -140,16 +140,10 @@ class TestFirewallCallbacks(test_db_firewall.FirewallPluginDbTestCase):
fwp_id = fwp['firewall_policy']['id']
attrs = self._get_test_firewall_attrs()
attrs['firewall_policy_id'] = fwp_id
with contextlib.nested(self.firewall(
firewall_policy_id=fwp_id,
tenant_id=tenant_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
no_delete=True), self.firewall(
firewall_policy_id=fwp_id,
tenant_id=tenant_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
no_delete=True)) as fws:
fw_list = [fw['firewall'] for fw in fws]
with self.firewall(firewall_policy_id=fwp_id, tenant_id=tenant_id,
admin_state_up=test_db_firewall.ADMIN_STATE_UP,
no_delete=True) as fw:
fw_list = [fw['firewall']]
f = self.callbacks.get_firewalls_for_tenant_without_rules
res = f(ctx, host='dummy')
for fw in res:
@ -202,6 +196,13 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
super(TestFirewallPluginBase, self).setUp(fw_plugin=FW_PLUGIN_KLASS)
self.callbacks = self.plugin.callbacks
def test_create_second_firewall_not_permitted(self):
with self.firewall(no_delete=True):
res = self._create_firewall(
None, 'firewall2', description='test',
firewall_policy_id=None, admin_state_up=True)
self.assertEqual(res.status_int, 500)
def test_update_firewall(self):
ctx = context.get_admin_context()
name = "new_firewall1"
@ -352,3 +353,11 @@ class TestFirewallPluginBase(test_db_firewall.TestFirewallDBPlugin):
fw_id = fw['firewall']['id']
fw_rules = self.plugin._make_firewall_dict_with_rules(ctx, fw_id)
self.assertEqual(fw_rules['firewall_rule_list'], [])
def test_list_firewalls(self):
with self.firewall_policy(no_delete=True) as fwp:
fwp_id = fwp['firewall_policy']['id']
with self.firewall(name='fw1', firewall_policy_id=fwp_id,
description='fw') as fwalls:
self._test_list_resources('firewall', [fwalls],
query_params='description=fw')