NSX|V3+P: Address pair ip cannot duplicate the port fixed ip

Also exclude some other illegal ips

Change-Id: I81da2adf33d11b753d2f2ad862101d0bf94d53de
This commit is contained in:
asarfaty 2020-05-05 11:23:14 +02:00 committed by Adit Sarfaty
parent 920d93def2
commit 7d4e6ee4cd
3 changed files with 116 additions and 4 deletions

View File

@ -306,15 +306,29 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
context.elevated(), router_db.id, gw_network_id,
interface_subnet['id'], subnet=interface_subnet)
def _validate_address_pairs(self, address_pairs):
def _validate_address_pairs(self, address_pairs, fixed_ips=None):
port_ips = []
if fixed_ips:
# Make sure there are no duplications
for fixed_ip in fixed_ips:
port_ips.append(fixed_ip['ip_address'])
for pair in address_pairs:
ip = pair.get('ip_address')
# Validate ipv4 cidrs (No limitation on ipv6):
if ':' not in ip:
if len(ip.split('/')) > 1 and ip.split('/')[1] != '32':
LOG.error("cidr %s is not supported in allowed address "
LOG.error("Cidr %s is not supported in allowed address "
"pairs", ip)
raise nsx_exc.InvalidIPAddress(ip_address=ip)
if ip in port_ips:
err_msg = (_("Port cannot have duplicate values %s as part of "
"port manual bindings") % ip)
raise n_exc.InvalidInput(error_message=err_msg)
if ip in ['127.0.0.0', '0.0.0.0', '::']:
LOG.error("IP %s is not supported in allowed address "
"pairs", ip)
raise nsx_exc.InvalidIPAddress(ip_address=ip)
def _validate_number_of_address_pairs(self, port):
address_pairs = port.get(addr_apidef.ADDRESS_PAIRS)
@ -344,7 +358,8 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
if not port_security:
raise addr_exc.AddressPairAndPortSecurityRequired()
else:
self._validate_address_pairs(address_pairs)
self._validate_address_pairs(
address_pairs, fixed_ips=port_data.get('fixed_ips'))
self._validate_number_of_address_pairs(port_data)
self._process_create_allowed_address_pairs(context, port_data,
address_pairs)
@ -426,7 +441,9 @@ class NsxPluginV3Base(agentschedulers_db.AZDhcpAgentSchedulerDbMixin,
if delete_addr_pairs or has_addr_pairs:
self._validate_address_pairs(
updated_port[addr_apidef.ADDRESS_PAIRS])
updated_port[addr_apidef.ADDRESS_PAIRS],
fixed_ips=(updated_port.get('fixed_ips') or
port_data.get('fixed_ips')))
# delete address pairs and read them in
self._delete_allowed_address_pairs(context, id)
self._process_create_allowed_address_pairs(

View File

@ -19,6 +19,7 @@ from oslo_config import cfg
from neutron.tests.unit.db import test_allowedaddresspairs_db as ext_pairs
from vmware_nsx.tests.unit.nsx_p import test_plugin as test_p_plugin
from vmware_nsx.tests.unit.nsx_v import test_plugin as test_nsx_v_plugin
from vmware_nsx.tests.unit.nsx_v3 import test_constants as v3_constants
from vmware_nsx.tests.unit.nsx_v3 import test_plugin as test_v3_plugin
@ -39,6 +40,56 @@ class TestAllowedAddressPairsNSXv2(test_v3_plugin.NsxV3PluginTestCaseMixin,
def test_create_port_security_false_allowed_address_pairs(self):
self.skipTest('TBD')
def test_create_overlap_with_fixed_ip(self):
self.skipTest('Not supported')
class TestAllowedAddressPairsNSXp(test_p_plugin.NsxPPluginTestCaseMixin,
ext_pairs.TestAllowedAddressPairs):
def setUp(self, plugin=test_p_plugin.PLUGIN_NAME,
ext_mgr=None,
service_plugins=None):
super(TestAllowedAddressPairsNSXp, self).setUp(
plugin=plugin, ext_mgr=ext_mgr, service_plugins=service_plugins)
def test_create_bad_address_pairs_with_cidr(self):
address_pairs = [{'mac_address': '00:00:00:00:00:01',
'ip_address': '10.0.0.1/24'}]
self._create_port_with_address_pairs(address_pairs, 400)
def test_create_port_allowed_address_pairs_v6(self):
with self.network() as net:
address_pairs = [{'ip_address': '1001::12'}]
res = self._create_port(self.fmt, net['network']['id'],
arg_list=(addr_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
port = self.deserialize(self.fmt, res)
address_pairs[0]['mac_address'] = port['port']['mac_address']
self.assertEqual(port['port'][addr_apidef.ADDRESS_PAIRS],
address_pairs)
self._delete('ports', port['port']['id'])
def test_update_add_bad_address_pairs_with_cidr(self):
with self.network() as net:
res = self._create_port(self.fmt, net['network']['id'])
port = self.deserialize(self.fmt, res)
address_pairs = [{'mac_address': '00:00:00:00:00:01',
'ip_address': '10.0.0.1/24'}]
update_port = {'port': {addr_apidef.ADDRESS_PAIRS:
address_pairs}}
req = self.new_update_request('ports', update_port,
port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(res.status_int, 400)
self._delete('ports', port['port']['id'])
def test_create_port_security_false_allowed_address_pairs(self):
self.skipTest('TBD')
def test_create_overlap_with_fixed_ip(self):
self.skipTest('Not supported')
class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin,
ext_pairs.TestAllowedAddressPairs):
@ -83,6 +134,9 @@ class TestAllowedAddressPairsNSXv3(test_v3_plugin.NsxV3PluginTestCaseMixin,
def test_create_port_security_false_allowed_address_pairs(self):
self.skipTest('TBD')
def test_create_overlap_with_fixed_ip(self):
self.skipTest('Not supported')
class TestAllowedAddressPairsNSXv(test_nsx_v_plugin.NsxVPluginV2TestCase,
ext_pairs.TestAllowedAddressPairs):

View File

@ -54,6 +54,7 @@ from oslo_utils import uuidutils
from webob import exc
from vmware_nsx.api_client import exception as api_exc
from vmware_nsx.common import exceptions as nsx_exc
from vmware_nsx.common import utils
from vmware_nsx.db import db as nsx_db
from vmware_nsx.plugins.nsx_v3 import plugin as nsx_plugin
@ -1129,6 +1130,46 @@ class TestPortsV2(common_v3.NsxV3SubnetMixin,
self.assertRaises(n_exc.InvalidInput,
self.plugin.create_port, self.ctx, data)
def test_fail_create_allowed_address_pairs_dup(self):
with self.network() as network, self.subnet(
network=network, cidr="1.1.1.0/24",
enable_dhcp=True) as s1:
data = {
'port': {
'network_id': network['network']['id'],
'tenant_id': self._tenant_id,
'name': 'pair_port',
'admin_state_up': True,
'device_id': 'fake_device',
'device_owner': 'fake_owner',
'fixed_ips': [{'subnet_id': s1['subnet']['id'],
'ip_address': '1.1.1.30'}]
}
}
data['port']['allowed_address_pairs'] = [
{'ip_address': '1.1.1.30'}]
self.assertRaises(n_exc.InvalidInput,
self.plugin.create_port, self.ctx, data)
def test_fail_create_allowed_address_pairs_illegal_ip(self):
with self.network() as network, self.subnet(
network=network, enable_dhcp=True) as s1:
data = {
'port': {
'network_id': network['network']['id'],
'tenant_id': self._tenant_id,
'name': 'pair_port',
'admin_state_up': True,
'device_id': 'fake_device',
'device_owner': 'fake_owner',
'fixed_ips': [{'subnet_id': s1['subnet']['id']}]
}
}
data['port']['allowed_address_pairs'] = [
{'ip_address': '127.0.0.0'}]
self.assertRaises(nsx_exc.InvalidIPAddress,
self.plugin.create_port, self.ctx, data)
def test_fail_update_lb_port_with_fixed_ip(self):
with self.network() as network:
data = {'port': {