NSX|V3+P: limit number of ipv6 address pairs

Change-Id: Id43d3c53a9b58e919503233f2502358224b37116
This commit is contained in:
asarfaty 2020-08-16 07:41:13 +02:00 committed by Adit Sarfaty
parent 6c3b1f6536
commit b486428800
3 changed files with 110 additions and 9 deletions

View File

@ -360,7 +360,10 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def _validate_number_of_address_pairs(self, port):
address_pairs = port.get(addr_apidef.ADDRESS_PAIRS)
num_allowed_on_backend = nsxlib_consts.NUM_ALLOWED_IP_ADDRESSES
if not address_pairs:
return
num_allowed_on_backend_v4 = nsxlib_consts.NUM_ALLOWED_IP_ADDRESSES_v4
num_allowed_on_backend_v6 = nsxlib_consts.NUM_ALLOWED_IP_ADDRESSES_v6
# Counting existing ports to take into account. If no fixed ips
# are defined - we set it to 3 in order to reserve 2 fixed and another
# for DHCP.
@ -369,13 +372,26 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
existing_fixed_ips = 3
else:
existing_fixed_ips += 1
if address_pairs:
max_addr_pairs = num_allowed_on_backend - existing_fixed_ips
if len(address_pairs) > max_addr_pairs:
err_msg = (_("Maximum of %(max)s address pairs can be defined "
"for this port on the NSX backend") %
{'max': max_addr_pairs})
raise n_exc.InvalidInput(error_message=err_msg)
max_addr_pairs_v4 = num_allowed_on_backend_v4 - existing_fixed_ips
max_addr_pairs_v6 = num_allowed_on_backend_v6 - existing_fixed_ips
count_v4 = count_v6 = 0
for pair in address_pairs:
ip = pair.get('ip_address')
if ':' in ip:
count_v6 = count_v6 + 1
else:
count_v4 = count_v4 + 1
if count_v4 > max_addr_pairs_v4:
err_msg = (_("Maximum of %(max)s IPv4 address pairs can be "
"defined for this port on the NSX backend") %
{'max': max_addr_pairs_v4})
raise n_exc.InvalidInput(error_message=err_msg)
if count_v6 > max_addr_pairs_v6:
err_msg = (_("Maximum of %(max)s IPv6 address pairs can be "
"defined for this port on the NSX backend") %
{'max': max_addr_pairs_v6})
raise n_exc.InvalidInput(error_message=err_msg)
def _create_port_address_pairs(self, context, port_data):
(port_security, has_ip) = self._determine_port_security_and_has_ip(

View File

@ -90,6 +90,91 @@ class TestAllowedAddressPairsNSXp(test_p_plugin.NsxPPluginTestCaseMixin,
port = self.deserialize(self.fmt, res)
self.assertIn('NeutronError', port)
# Too many ipv6 pairs
cfg.CONF.set_default('max_allowed_address_pair', 300)
address_pairs = []
count = 1
while count < 17:
address_pairs.append({'ip_address': '1001::%s' % count})
count += 1
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
self.assertIn('NeutronError', port)
# Legal number of ipv6 pairs
address_pairs = []
count = 1
while count < 13:
address_pairs.append({'ip_address': '1001::%s' % count})
count += 1
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
self.assertNotIn('NeutronError', port)
self._delete('ports', port['port']['id'])
def test_create_port_allowed_address_pairs_v4(self):
with self.network() as net:
# Single IPv4
address_pairs = [{'ip_address': '10.0.0.12'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
address_pairs[0]['mac_address'] = port['port']['mac_address']
self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
address_pairs)
self._delete('ports', port['port']['id'])
# IPv4 cidr
address_pairs = [{'ip_address': '10.0.0.0/24'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
print("DEBUG ADIT port %s" % port)
address_pairs[0]['mac_address'] = port['port']['mac_address']
self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
address_pairs)
self._delete('ports', port['port']['id'])
# Illegal IPv4 cidr
address_pairs = [{'ip_address': '10.0.0.1/24'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
self.assertIn('NeutronError', port)
# Too many ipv4 pairs
cfg.CONF.set_default('max_allowed_address_pair', 300)
address_pairs = []
count = 1
while count < 129:
address_pairs.append({'ip_address': '10.0.0.%s' % count})
count += 1
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
self.assertIn('NeutronError', port)
# Legal number of ipv4 pairs
address_pairs = []
count = 1
while count < 125:
address_pairs.append({'ip_address': '10.0.0.%s' % count})
count += 1
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
self.assertNotIn('NeutronError', port)
self._delete('ports', port['port']['id'])
def test_update_add_bad_address_pairs_with_cidr(self):
with self.network() as net:
res = self._create_port(self.fmt, net['network']['id'])

View File

@ -101,7 +101,7 @@ class NsxPPluginTestCaseMixin(
'display_name': 'test'}]}
mock.patch(
"vmware_nsxlib.v3.policy.NsxPolicyLib.get_version",
return_value=nsx_constants.NSX_VERSION_3_0_0).start()
return_value=nsx_constants.NSX_VERSION_3_1_0).start()
mock.patch(
"vmware_nsxlib.v3.client.RESTClient.get").start()
mock.patch(