NSX|V3+P: Validate allowed address pairs ipv6 cidr

Change-Id: Ib9085da9ff64c81d45d7e2a2c1a5542ab69bcaa9
This commit is contained in:
asarfaty 2020-05-28 11:59:50 +02:00 committed by Adit Sarfaty
parent 6d89e45016
commit bb6ee137f8
2 changed files with 55 additions and 2 deletions

View File

@ -13,6 +13,8 @@
# License for the specific language governing permissions and limitations
# under the License.
import ipaddress
import decorator
import mock
import netaddr
@ -321,8 +323,19 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
def _validate_address_pairs(self, address_pairs):
for pair in address_pairs:
ip = pair.get('ip_address')
# Validate ipv4 cidrs (No limitation on ipv6):
if ':' not in ip:
if ':' in ip:
# Validate ipv6 cidrs:
ip_split = ip.split('/')
if len(ip_split) > 1 and ip_split[1] != '128':
try:
ipaddress.ip_network(ip)
except ValueError:
# This means the host bits are set
err_msg = (_("Allowed address pairs Cidr %s cannot "
"have host bits set") % ip)
raise n_exc.InvalidInput(error_message=err_msg)
else:
# Validate ipv4 cidrs (No limitation on ipv6):
if len(ip.split('/')) > 1 and ip.split('/')[1] != '32':
LOG.error("cidr %s is not supported in allowed address "
"pairs", ip)

View File

@ -60,6 +60,7 @@ class TestAllowedAddressPairsNSXp(test_p_plugin.NsxPPluginTestCaseMixin,
def test_create_port_allowed_address_pairs_v6(self):
with self.network() as net:
# Single IPv6
address_pairs = [{'ip_address': '1001::12'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
@ -70,6 +71,25 @@ class TestAllowedAddressPairsNSXp(test_p_plugin.NsxPPluginTestCaseMixin,
address_pairs)
self._delete('ports', port['port']['id'])
# IPv6 cidr
address_pairs = [{'ip_address': '1001::/64'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
address_pairs[0]['mac_address'] = port['port']['mac_address']
self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
address_pairs)
self._delete('ports', port['port']['id'])
# Illegal IPv6 cidr
address_pairs = [{'ip_address': '1001::12/64'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
self.assertIn('NeutronError', port)
def test_update_add_bad_address_pairs_with_cidr(self):
with self.network() as net:
res = self._create_port(self.fmt, net['network']['id'])
@ -116,6 +136,7 @@ class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin,
def test_create_port_allowed_address_pairs_v6(self):
with self.network() as net:
# Single IPv6 address
address_pairs = [{'ip_address': '1001::12'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
@ -126,6 +147,25 @@ class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin,
address_pairs)
self._delete('ports', port['port']['id'])
# IPv6 cidr
address_pairs = [{'ip_address': '1001::/64'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
address_pairs[0]['mac_address'] = port['port']['mac_address']
self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
address_pairs)
self._delete('ports', port['port']['id'])
# Illegal IPv6 cidr
address_pairs = [{'ip_address': '1001::12/64'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
self.assertIn('NeutronError', port)
def test_update_add_bad_address_pairs_with_cidr(self):
with self.network() as net:
res = self._create_port(self.fmt, net['network']['id'])