NSX|V3+P: prevent overlapping address pairs

Change-Id: Ic5c1cad47a5b646a1404b3bd94f11922598268c4
This commit is contained in:
asarfaty 2020-06-21 09:34:32 +02:00 committed by Adit Sarfaty
parent 233ac3988e
commit 5f9936d9b4
2 changed files with 27 additions and 0 deletions

View File

@ -326,6 +326,7 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def _validate_address_pairs(self, address_pairs, fixed_ips=None): def _validate_address_pairs(self, address_pairs, fixed_ips=None):
port_ips = [] port_ips = []
pairs_ips = []
if fixed_ips: if fixed_ips:
# Make sure there are no duplications # Make sure there are no duplications
for fixed_ip in fixed_ips: for fixed_ip in fixed_ips:
@ -345,6 +346,14 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
err_msg = (_("Allowed address pairs Cidr %s cannot " err_msg = (_("Allowed address pairs Cidr %s cannot "
"have host bits set") % ip) "have host bits set") % ip)
raise n_exc.InvalidInput(error_message=err_msg) raise n_exc.InvalidInput(error_message=err_msg)
# verify no overlaps in ipv6 addresses
current_set = netaddr.IPSet(port_ips + pairs_ips)
if netaddr.IPSet([ip]) & current_set:
err_msg = (_("Allowed address pairs %s cannot overlap "
"with port ips or other address pairs") % ip)
raise n_exc.InvalidInput(error_message=err_msg)
pairs_ips.append(ip)
else: else:
# IPv4 address pair # IPv4 address pair
if len(ip.split('/')) > 1 and ip.split('/')[1] != '32': if len(ip.split('/')) > 1 and ip.split('/')[1] != '32':

View File

@ -90,6 +90,24 @@ class TestAllowedAddressPairsNSXp(test_p_plugin.NsxPPluginTestCaseMixin,
port = self.deserialize(self.fmt, res) port = self.deserialize(self.fmt, res)
self.assertIn('NeutronError', port) self.assertIn('NeutronError', port)
# overlapping ips
address_pairs = [{'ip_address': '1001::/64'},
{'ip_address': '1001::/128'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
self.assertIn('NeutronError', port)
# identical ips
address_pairs = [{'ip_address': '1001::'},
{'ip_address': '1001::/128'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
self.assertIn('NeutronError', port)
def test_update_add_bad_address_pairs_with_cidr(self): def test_update_add_bad_address_pairs_with_cidr(self):
with self.network() as net: with self.network() as net:
res = self._create_port(self.fmt, net['network']['id']) res = self._create_port(self.fmt, net['network']['id'])