From 5f9936d9b4c181e1f0bbc0562e596d436e04bbd8 Mon Sep 17 00:00:00 2001 From: asarfaty Date: Sun, 21 Jun 2020 09:34:32 +0200 Subject: [PATCH] NSX|V3+P: prevent overlapping address pairs Change-Id: Ic5c1cad47a5b646a1404b3bd94f11922598268c4 --- vmware_nsx/plugins/common_v3/plugin.py | 9 +++++++++ .../tests/unit/extensions/test_addresspairs.py | 18 ++++++++++++++++++ 2 files changed, 27 insertions(+) diff --git a/vmware_nsx/plugins/common_v3/plugin.py b/vmware_nsx/plugins/common_v3/plugin.py index 363f795f25..d586d37463 100644 --- a/vmware_nsx/plugins/common_v3/plugin.py +++ b/vmware_nsx/plugins/common_v3/plugin.py @@ -326,6 +326,7 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, def _validate_address_pairs(self, address_pairs, fixed_ips=None): port_ips = [] + pairs_ips = [] if fixed_ips: # Make sure there are no duplications for fixed_ip in fixed_ips: @@ -345,6 +346,14 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin, err_msg = (_("Allowed address pairs Cidr %s cannot " "have host bits set") % ip) raise n_exc.InvalidInput(error_message=err_msg) + # verify no overlaps in ipv6 addresses + current_set = netaddr.IPSet(port_ips + pairs_ips) + if netaddr.IPSet([ip]) & current_set: + err_msg = (_("Allowed address pairs %s cannot overlap " + "with port ips or other address pairs") % ip) + raise n_exc.InvalidInput(error_message=err_msg) + + pairs_ips.append(ip) else: # IPv4 address pair if len(ip.split('/')) > 1 and ip.split('/')[1] != '32': diff --git a/vmware_nsx/tests/unit/extensions/test_addresspairs.py b/vmware_nsx/tests/unit/extensions/test_addresspairs.py index eb553db1d4..8ed263761f 100644 --- a/vmware_nsx/tests/unit/extensions/test_addresspairs.py +++ b/vmware_nsx/tests/unit/extensions/test_addresspairs.py @@ -90,6 +90,24 @@ class TestAllowedAddressPairsNSXp(test_p_plugin.NsxPPluginTestCaseMixin, port = self.deserialize(self.fmt, res) self.assertIn('NeutronError', port) + # overlapping ips + address_pairs = [{'ip_address': '1001::/64'}, + {'ip_address': '1001::/128'}] + res = self._create_port(self.fmt, net['network']['id'], + arg_list=(addr_apidef.ADDRESS_PAIRS,), + allowed_address_pairs=address_pairs) + port = self.deserialize(self.fmt, res) + self.assertIn('NeutronError', port) + + # identical ips + address_pairs = [{'ip_address': '1001::'}, + {'ip_address': '1001::/128'}] + res = self._create_port(self.fmt, net['network']['id'], + arg_list=(addr_apidef.ADDRESS_PAIRS,), + allowed_address_pairs=address_pairs) + port = self.deserialize(self.fmt, res) + self.assertIn('NeutronError', port) + def test_update_add_bad_address_pairs_with_cidr(self): with self.network() as net: res = self._create_port(self.fmt, net['network']['id'])