From dc6b07d4a37ef0db9906187611fec8e8753803cd Mon Sep 17 00:00:00 2001
From: Liping Mao
Date: Tue, 15 Jul 2014 14:22:46 +0800
Subject: [PATCH] no quota for allowed address pair

There is no quota for allowed address pair. User can create unlimited allowed address pairs. I add quota for allowed address pairs.
Change-Id: I2efb0c0f527f1fb22c4d4b07f6d280863f565648
Closes-Bug: #1336207
---
 neutron/extensions/allowedaddresspairs.py | 18 ++++++++++++
 .../test_extension_allowedaddresspairs.py | 28 ++++++++++++++++++-
 2 files changed, 45 insertions(+), 1 deletion(-)

diff --git a/neutron/extensions/allowedaddresspairs.py b/neutron/extensions/allowedaddresspairs.py
index a9328aaa48c..6588d5fa739 100644
--- a/neutron/extensions/allowedaddresspairs.py
+++ b/neutron/extensions/allowedaddresspairs.py
@@ -16,6 +16,15 @@ import webob.exc
 from neutron.api.v2 import attributes as attr
 from neutron.common import exceptions as nexception
+from oslo.config import cfg
+
+allowed_address_pair_opts = [
+ #TODO(limao): use quota framework when it support quota for attributes
+ cfg.IntOpt('max_allowed_address_pair', default=10,
+ help=_("Maximum number of allowed address pairs")),
+]
+
+cfg.CONF.register_opts(allowed_address_pair_opts)
 class AllowedAddressPairsMissingIP(nexception.InvalidInput):
@@ -32,8 +41,17 @@ class DuplicateAddressPairInRequest(nexception.InvalidInput):
 "mac_address %(mac_address)s ip_address %(ip_address)s.")
+class AllowedAddressPairExhausted(nexception.BadRequest):
+ message = _("The number of allowed address pair "
+ "exceeds the maximum %(quota)s.")
+
+
 def _validate_allowed_address_pairs(address_pairs, valid_values=None):
 unique_check = {}
+ if len(address_pairs) > cfg.CONF.max_allowed_address_pair:
+ raise AllowedAddressPairExhausted(
+ quota=cfg.CONF.max_allowed_address_pair)
+
 for address_pair in address_pairs:
 # mac_address is optional, if not set we use the mac on the port
 if 'mac_address' in address_pair:
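Review bot: In the `@@ -16,6 +16,15 @@` hunk, the help string of `max_allowed_address_pair` does not say what the limit is counted against, and nothing rejects a negative value. The check added in the next hunk compares the option with the length of a single request's list, so the text would read better as `help=_("Maximum number of allowed address pairs per port")` (assuming the validator runs once per port request, which should be confirmed). With a negative setting, `len(address_pairs) > cfg.CONF.max_allowed_address_pair` is true even for an empty list, so every request carrying the attribute would be refused. A lower bound on the option, if the oslo.config release in use supports one, or a check where the value is read would catch that misconfiguration.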
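Review bot: In the `@@ -32,8 +41,17 @@` hunk, the new `len(address_pairs)` call at the top of `_validate_allowed_address_pairs` runs before anything visible in the hunk confirms that the API client sent a list. A non-sequence value such as a number would raise `TypeError` there instead of the 400 that `AllowedAddressPairExhausted` produces, and a string would be measured by its character count. If no earlier validator enforces the type, a guard opening with `if not isinstance(address_pairs, list):` that rejects the request, placed ahead of the length check, keeps the limit meaningful for malformed input. The function body continues past the end of the hunk, so later checks are not visible here.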
diff --git a/neutron/tests/unit/test_extension_allowedaddresspairs.py b/neutron/tests/unit/test_extension_allowedaddresspairs.py
index 28dcd91b165..bcaa11b0f3a 100644
--- a/neutron/tests/unit/test_extension_allowedaddresspairs.py
+++ b/neutron/tests/unit/test_extension_allowedaddresspairs.py
@@ -22,6 +22,8 @@ from neutron.extensions import allowedaddresspairs as addr_pair
 from neutron.extensions import portsecurity as psec
 from neutron import manager
 from neutron.tests.unit import test_db_plugin
+from oslo.config import cfg
+
 DB_PLUGIN_KLASS = ('neutron.tests.unit.test_extension_allowedaddresspairs.' 'AllowedAddressPairTestPlugin')
@@ -159,6 +161,28 @@ class TestAllowedAddressPairs(AllowedAddressPairDBTestCase):
 'ip_address': '10.0.0.1'}]
 self._create_port_with_address_pairs(address_pairs, 400)
+ def test_more_than_max_allowed_address_pair(self):
+ cfg.CONF.set_default('max_allowed_address_pair', 3)
+ address_pairs = [{'mac_address': '00:00:00:00:00:01',
+ 'ip_address': '10.0.0.1'},
+ {'mac_address': '00:00:00:00:00:02',
+ 'ip_address': '10.0.0.2'},
+ {'mac_address': '00:00:00:00:00:03',
+ 'ip_address': '10.0.0.3'},
+ {'mac_address': '00:00:00:00:00:04',
+ 'ip_address': '10.0.0.4'}]
+ self._create_port_with_address_pairs(address_pairs, 400)
+
+ def test_equal_to_max_allowed_address_pair(self):
+ cfg.CONF.set_default('max_allowed_address_pair', 3)
+ address_pairs = [{'mac_address': '00:00:00:00:00:01',
+ 'ip_address': '10.0.0.1'},
+ {'mac_address': '00:00:00:00:00:02',
+ 'ip_address': '10.0.0.2'},
+ {'mac_address': '00:00:00:00:00:03',
+ 'ip_address': '10.0.0.3'}]
+ self._create_port_with_address_pairs(address_pairs, 201)
+
 def test_create_overlap_with_fixed_ip(self):
 address_pairs = [{'mac_address': '00:00:00:00:00:01', 'ip_address': '10.0.0.2'}]
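Review bot: In the `@@ -159,6 +161,28 @@` hunk, both new tests exercise the limit only on port creation; nothing here sends an over-limit list through a port update. `test_update_add_address_pairs` further down the file shows that updates are a supported path. A case that creates a port within the limit and then updates it with four pairs while `max_allowed_address_pair` is 3, expecting 400, would show the cap also holds on that path. Separately, `cfg.CONF.set_default('max_allowed_address_pair', 3)` changes process-wide configuration; if the base test case does not reset `cfg.CONF` (not visible here), the value could leak into later tests.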
@@ -186,8 +210,10 @@ class TestAllowedAddressPairs(AllowedAddressPairDBTestCase):
 res = self._create_port(self.fmt, net['network']['id'], arg_list=(addr_pair.ADDRESS_PAIRS,), allowed_address_pairs=address_pairs)
- self.deserialize(self.fmt, res)
+ port = self.deserialize(self.fmt, res)
 self.assertEqual(res.status_int, ret_code)
+ if ret_code == 201:
+ self._delete('ports', port['port']['id'])
 def test_update_add_address_pairs(self):
 with self.network() as net:
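Review bot: The port deletion added at the end of this helper has no comment saying why it is needed, and it is keyed on the expected `ret_code` rather than on the response actually received. A one-line comment such as `# remove the port so the network can be cleaned up` would help the next reader, if that is indeed the reason; the enclosing block is outside the hunk, so this is inferred. The helper's `def` line is also outside the hunk.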