NSX|V: Restrict creating conflicting address_pair in the same network

Restrincting the user of creating an address_pair with an IP address
that might already exist in the network, either as another pair
or fixed_ip

Change-Id: I49924b3d769e87d6e80157b723a4b26fa2c957a3
Signed-off-by: Michal Kelner Mishali <mkelnermishal@vmware.com>
This commit is contained in:
Michal Kelner Mishali 2019-03-18 14:50:00 +02:00
parent 8426b63a4c
commit 1a4fe2dca3
2 changed files with 56 additions and 0 deletions

View File

@ -1959,7 +1959,42 @@ class NsxVPluginV2(addr_pair_db.AllowedAddressPairsMixin,
return net_res
def _validate_unique_address_pair_across_network(self, context,
port, address_pairs):
network_id = port['network_id']
filters = {'network_id': [network_id]}
valid_existing_ports = []
existing_fixed_and_addr_pairs = []
for exist_port in self.get_ports(context, filters=filters):
if exist_port['id'] != port['id']:
valid_existing_ports.append(exist_port)
for valid_port in valid_existing_ports:
for fixed in valid_port.get('fixed_ips', []):
existing_fixed_and_addr_pairs.append(fixed['ip_address'])
for addr_pair in valid_port.get('allowed_address_pairs', []):
existing_fixed_and_addr_pairs.append(addr_pair['ip_address'])
fixed_ips_list = port.get('fixed_ips', [])
# validate ip collision with fixed ips
for fixed_ip in fixed_ips_list:
ip = fixed_ip.get('ip_address')
if ip in existing_fixed_and_addr_pairs:
msg = _('IP address %s entered as fixed ip already '
'exists in the network. Duplicate IP addresses is not '
'supported at backend') % ip
raise n_exc.InvalidInput(error_message=msg)
# validate ip collision with address pair
for pair in address_pairs:
ip = pair.get('ip_address')
if ip in existing_fixed_and_addr_pairs:
msg = _('IP address %s entered as address pair already '
'exists in the network. Duplicate IP addresses is not '
'supported at backend') % ip
raise n_exc.InvalidInput(error_message=msg)
def _validate_address_pairs(self, context, attrs, db_port):
self._validate_unique_address_pair_across_network(
context, db_port, attrs[addr_apidef.ADDRESS_PAIRS])
network_port_security = self._get_network_security_binding(
context, db_port['network_id'])
if (not cfg.CONF.nsxv.allow_multiple_ip_addresses and

View File

@ -4556,6 +4556,27 @@ class TestNSXvAllowedAddressPairs(NsxVPluginV2TestCase,
arg_list=(addrp_apidef.ADDRESS_PAIRS,),
allowed_address_pairs=address_pairs)
def test_create_port_with_address_pair_existing_fixed_ip_fail(self):
address_pairs1 = [{'ip_address': '10.0.0.2'}]
with self.network() as network:
with self.subnet(network=network, cidr='10.0.0.0/24',
enable_dhcp=False) as subnet:
fixed_ips1 = [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.4'}]
fixed_ips2 = [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.5'}]
self._create_port(self.fmt, network['network']['id'],
arg_list=(addrp_apidef.ADDRESS_PAIRS,
'fixed_ips'),
allowed_address_pairs=address_pairs1,
fixed_ips=fixed_ips1)
res = self._create_port(self.fmt, network['network']['id'],
arg_list=(addrp_apidef.ADDRESS_PAIRS,
'fixed_ips'),
allowed_address_pairs=address_pairs1,
fixed_ips=fixed_ips2)
self.assertEqual(res.status_int, 400)
class TestNSXPortSecurity(test_psec.TestPortSecurity,
NsxVPluginV2TestCase):