no quota for allowed address pair

There is no quota for allowed address pair. User can create unlimited
allowed address pairs. I add quota for allowed address pairs.

Change-Id: I2efb0c0f527f1fb22c4d4b07f6d280863f565648
Closes-Bug: #1336207
This commit is contained in:
Liping Mao 2014-07-15 14:22:46 +08:00
parent 474b3556a5
commit dc6b07d4a3
2 changed files with 45 additions and 1 deletions

View File

@ -16,6 +16,15 @@ import webob.exc
from neutron.api.v2 import attributes as attr from neutron.api.v2 import attributes as attr
from neutron.common import exceptions as nexception from neutron.common import exceptions as nexception
from oslo.config import cfg
allowed_address_pair_opts = [
#TODO(limao): use quota framework when it support quota for attributes
cfg.IntOpt('max_allowed_address_pair', default=10,
help=_("Maximum number of allowed address pairs")),
]
cfg.CONF.register_opts(allowed_address_pair_opts)
class AllowedAddressPairsMissingIP(nexception.InvalidInput): class AllowedAddressPairsMissingIP(nexception.InvalidInput):
@ -32,8 +41,17 @@ class DuplicateAddressPairInRequest(nexception.InvalidInput):
"mac_address %(mac_address)s ip_address %(ip_address)s.") "mac_address %(mac_address)s ip_address %(ip_address)s.")
class AllowedAddressPairExhausted(nexception.BadRequest):
message = _("The number of allowed address pair "
"exceeds the maximum %(quota)s.")
def _validate_allowed_address_pairs(address_pairs, valid_values=None): def _validate_allowed_address_pairs(address_pairs, valid_values=None):
unique_check = {} unique_check = {}
if len(address_pairs) > cfg.CONF.max_allowed_address_pair:
raise AllowedAddressPairExhausted(
quota=cfg.CONF.max_allowed_address_pair)
for address_pair in address_pairs: for address_pair in address_pairs:
# mac_address is optional, if not set we use the mac on the port # mac_address is optional, if not set we use the mac on the port
if 'mac_address' in address_pair: if 'mac_address' in address_pair:

View File

@ -22,6 +22,8 @@ from neutron.extensions import allowedaddresspairs as addr_pair
from neutron.extensions import portsecurity as psec from neutron.extensions import portsecurity as psec
from neutron import manager from neutron import manager
from neutron.tests.unit import test_db_plugin from neutron.tests.unit import test_db_plugin
from oslo.config import cfg
DB_PLUGIN_KLASS = ('neutron.tests.unit.test_extension_allowedaddresspairs.' DB_PLUGIN_KLASS = ('neutron.tests.unit.test_extension_allowedaddresspairs.'
'AllowedAddressPairTestPlugin') 'AllowedAddressPairTestPlugin')
@ -159,6 +161,28 @@ class TestAllowedAddressPairs(AllowedAddressPairDBTestCase):
'ip_address': '10.0.0.1'}] 'ip_address': '10.0.0.1'}]
self._create_port_with_address_pairs(address_pairs, 400) self._create_port_with_address_pairs(address_pairs, 400)
def test_more_than_max_allowed_address_pair(self):
cfg.CONF.set_default('max_allowed_address_pair', 3)
address_pairs = [{'mac_address': '00:00:00:00:00:01',
'ip_address': '10.0.0.1'},
{'mac_address': '00:00:00:00:00:02',
'ip_address': '10.0.0.2'},
{'mac_address': '00:00:00:00:00:03',
'ip_address': '10.0.0.3'},
{'mac_address': '00:00:00:00:00:04',
'ip_address': '10.0.0.4'}]
self._create_port_with_address_pairs(address_pairs, 400)
def test_equal_to_max_allowed_address_pair(self):
cfg.CONF.set_default('max_allowed_address_pair', 3)
address_pairs = [{'mac_address': '00:00:00:00:00:01',
'ip_address': '10.0.0.1'},
{'mac_address': '00:00:00:00:00:02',
'ip_address': '10.0.0.2'},
{'mac_address': '00:00:00:00:00:03',
'ip_address': '10.0.0.3'}]
self._create_port_with_address_pairs(address_pairs, 201)
def test_create_overlap_with_fixed_ip(self): def test_create_overlap_with_fixed_ip(self):
address_pairs = [{'mac_address': '00:00:00:00:00:01', address_pairs = [{'mac_address': '00:00:00:00:00:01',
'ip_address': '10.0.0.2'}] 'ip_address': '10.0.0.2'}]
@ -186,8 +210,10 @@ class TestAllowedAddressPairs(AllowedAddressPairDBTestCase):
res = self._create_port(self.fmt, net['network']['id'], res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_pair.ADDRESS_PAIRS,), arg_list=(addr_pair.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs) allowed_address_pairs=address_pairs)
self.deserialize(self.fmt, res) port = self.deserialize(self.fmt, res)
self.assertEqual(res.status_int, ret_code) self.assertEqual(res.status_int, ret_code)
if ret_code == 201:
self._delete('ports', port['port']['id'])
def test_update_add_address_pairs(self): def test_update_add_address_pairs(self):
with self.network() as net: with self.network() as net: