Ensure unit tests don't assume an IP address allocation strategy

These unit tests initially asserted sequential allocation of IP
addresses, even though they have no need to specifically assert
that a specific IP was allocated. This made it difficult to
change out the IP allocation algorithm in the future and made
these tests fragile and poorly isolated.

This change breaks the dependency these unit tests have on a
specific IP allocation strategy and isolates them from any
changes that may be made to the order in which IP addresses
are allocated on a subnet.

Change-Id: Idc879b7f1e6496aa96b4f7ae6c3eaca6079bdcac
Partial-Bug: #1543094
This commit is contained in:
Ryan Tidwell 2016-04-08 14:12:01 -07:00
parent a4985d16f6
commit cbc15d2e1d
10 changed files with 241 additions and 152 deletions

View File

@ -463,6 +463,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
security_groups=[sg2_id])
ports_rest2 = self.deserialize(self.fmt, res2)
port_id2 = ports_rest2['port']['id']
port_fixed_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address']
ctx = context.get_admin_context()
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
@ -476,7 +477,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg2_id},
{'direction': u'ingress',
'source_ip_prefix': u'10.0.0.3/32',
'source_ip_prefix': port_fixed_ip2 + '/32',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_max': 25, 'port_range_min': 24,
@ -519,6 +520,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
security_groups=[sg2_id])
ports_rest2 = self.deserialize(self.fmt, res2)
port_id2 = ports_rest2['port']['id']
port_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address']
ctx = context.get_admin_context()
ports_rpc = self.rpc.security_group_info_for_devices(
ctx, devices=devices)
@ -533,7 +535,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
'remote_group_id': sg2_id}
]},
'sg_member_ips': {sg2_id: {
'IPv4': set([u'10.0.0.3']),
'IPv4': set([port_ip2]),
'IPv6': set(),
}}
}
@ -1077,6 +1079,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
security_groups=[sg2_id])
port_id2 = ports_rest2['port']['id']
port_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address']
ctx = context.get_admin_context()
ports_rpc = self.rpc.security_group_rules_for_devices(
@ -1091,7 +1094,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg2_id},
{'direction': 'ingress',
'source_ip_prefix': '2001:db8::2/128',
'source_ip_prefix': port_ip2 + '/128',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv6,
'port_range_max': 25, 'port_range_min': 24,

View File

@ -891,14 +891,18 @@ class TestV2HTTPResponse(NeutronDbPluginV2TestCase):
class TestPortsV2(NeutronDbPluginV2TestCase):
def test_create_port_json(self):
keys = [('admin_state_up', True), ('status', self.port_create_status)]
with self.port(name='myname') as port:
for k, v in keys:
self.assertEqual(port['port'][k], v)
self.assertIn('mac_address', port['port'])
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual('myname', port['port']['name'])
with self.network(shared=True) as network:
with self.subnet(network=network) as subnet:
with self.port(name='myname') as port:
for k, v in keys:
self.assertEqual(port['port'][k], v)
self.assertIn('mac_address', port['port'])
ips = port['port']['fixed_ips']
subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
self.assertEqual(1, len(ips))
self.assertIn(netaddr.IPAddress(ips[0]['ip_address']),
subnet_ip_net)
self.assertEqual('myname', port['port']['name'])
def test_create_port_as_admin(self):
with self.network() as network:
@ -938,11 +942,10 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
def test_create_port_public_network_with_ip(self):
with self.network(shared=True) as network:
with self.subnet(network=network, cidr='10.0.0.0/24') as subnet:
ip_net = netaddr.IPNetwork('10.0.0.0/24')
with self.subnet(network=network, cidr=str(ip_net)):
keys = [('admin_state_up', True),
('status', self.port_create_status),
('fixed_ips', [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.2'}])]
('status', self.port_create_status)]
port_res = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
@ -951,6 +954,8 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
port = self.deserialize(self.fmt, port_res)
for k, v in keys:
self.assertEqual(port['port'][k], v)
port_ip = port['port']['fixed_ips'][0]['ip_address']
self.assertIn(port_ip, ip_net)
self.assertIn('mac_address', port['port'])
self._delete('ports', port['port']['id'])
@ -1532,7 +1537,9 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10.
"""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
fixed_ip_data = [{'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
@ -1550,21 +1557,24 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_update_port_update_ip_address_only(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ip_address = '10.0.0.2'
fixed_ip_data = [{'ip_address': ip_address,
'subnet_id': subnet['subnet']['id']}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ip_address, ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': "10.0.0.10"},
{'ip_address': "10.0.0.2"}]}}
{'ip_address': ip_address}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(2, len(ips))
self.assertIn({'ip_address': '10.0.0.2',
self.assertIn({'ip_address': ip_address,
'subnet_id': subnet['subnet']['id']}, ips)
self.assertIn({'ip_address': '10.0.0.10',
'subnet_id': subnet['subnet']['id']}, ips)
@ -1607,10 +1617,11 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
res['port']['admin_state_up'])
ips = res['port']['fixed_ips']
self.assertEqual(2, len(ips))
self.assertIn({'ip_address': '10.0.0.3',
'subnet_id': subnet['subnet']['id']}, ips)
self.assertIn({'ip_address': '10.0.0.4',
'subnet_id': subnet['subnet']['id']}, ips)
self.assertNotEqual(ips[0]['ip_address'],
ips[1]['ip_address'])
network_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
self.assertIn(ips[0]['ip_address'], network_ip_net)
self.assertIn(ips[1]['ip_address'], network_ip_net)
def test_update_port_invalid_fixed_ip_address_v6_slaac(self):
with self.subnet(
@ -1692,10 +1703,11 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_requested_duplicate_ip(self):
with self.subnet() as subnet:
subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertIn(ips[0]['ip_address'], subnet_ip_net)
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Check configuring of duplicate IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
@ -1706,10 +1718,12 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_requested_subnet_id(self):
with self.subnet() as subnet:
subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertIn(netaddr.IPAddress(ips[0]['ip_address']),
netaddr.IPSet(subnet_ip_net))
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Request a IP from specific subnet
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id']}]}
@ -1718,7 +1732,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
port2 = self.deserialize(self.fmt, res)
ips = port2['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.3', ips[0]['ip_address'])
self.assertIn(ips[0]['ip_address'], subnet_ip_net)
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
self._delete('ports', port2['port']['id'])
@ -1771,23 +1785,43 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port3 = self.deserialize(self.fmt, res)
ips = port3['port']['fixed_ips']
cidr_v4 = subnet['subnet']['cidr']
cidr_v6 = subnet2['subnet']['cidr']
self.assertEqual(2, len(ips))
self.assertIn({'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}, ips)
self.assertIn({'ip_address': '2607:f0d0:1002:51::2',
'subnet_id': subnet2['subnet']['id']}, ips)
self._test_requested_port_subnet_ids(ips,
[subnet['subnet']['id'],
subnet2['subnet']['id']])
self._test_dual_stack_port_ip_addresses_in_subnets(ips,
cidr_v4,
cidr_v6)
res = self._create_port(self.fmt, net_id=net_id)
port4 = self.deserialize(self.fmt, res)
# Check that a v4 and a v6 address are allocated
ips = port4['port']['fixed_ips']
self.assertEqual(2, len(ips))
self.assertIn({'ip_address': '10.0.0.3',
'subnet_id': subnet['subnet']['id']}, ips)
self.assertIn({'ip_address': '2607:f0d0:1002:51::3',
'subnet_id': subnet2['subnet']['id']}, ips)
self._test_requested_port_subnet_ids(ips,
[subnet['subnet']['id'],
subnet2['subnet']['id']])
self._test_dual_stack_port_ip_addresses_in_subnets(ips,
cidr_v4,
cidr_v6)
self._delete('ports', port3['port']['id'])
self._delete('ports', port4['port']['id'])
def _test_requested_port_subnet_ids(self, ips, expected_subnet_ids):
self.assertEqual(set(x['subnet_id'] for x in ips),
set(expected_subnet_ids))
def _test_dual_stack_port_ip_addresses_in_subnets(self, ips, cidr_v4,
cidr_v6):
ip_net_v4 = netaddr.IPNetwork(cidr_v4)
ip_net_v6 = netaddr.IPNetwork(cidr_v6)
for address in ips:
ip_addr = netaddr.IPAddress(address['ip_address'])
expected_ip_net = ip_net_v4 if ip_addr.version == 4 else ip_net_v6
self.assertIn(ip_addr, expected_ip_net)
def test_create_port_invalid_fixed_ip_address_v6_pd_slaac(self):
with self.network(name='net') as network:
subnet = self._make_v6_subnet(
@ -1923,25 +1957,30 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
{'subnet_id': subnet2['subnet']['id']}]
) as port:
ips = port['port']['fixed_ips']
subnet1_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
subnet2_net = netaddr.IPNetwork(subnet2['subnet']['cidr'])
network_ip_set = netaddr.IPSet(subnet1_net)
network_ip_set.add(subnet2_net)
self.assertEqual(2, len(ips))
self.assertIn({'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}, ips)
port_mac = port['port']['mac_address']
subnet_cidr = subnet2['subnet']['cidr']
eui_addr = str(ipv6_utils.get_ipv6_addr_by_EUI64(
subnet_cidr, port_mac))
self.assertIn(ips[0]['ip_address'], network_ip_set)
self.assertIn(ips[1]['ip_address'], network_ip_set)
self.assertIn({'ip_address': eui_addr,
'subnet_id': subnet2['subnet']['id']}, ips)
def test_create_router_port_ipv4_and_ipv6_slaac_no_fixed_ips(self):
with self.network() as network:
# Create an IPv4 and an IPv6 SLAAC subnet on the network
with self.subnet(network),\
with self.subnet(network) as subnet_v4,\
self.subnet(network,
cidr='2607:f0d0:1002:51::/64',
ip_version=6,
gateway_ip='fe80::1',
ipv6_address_mode=n_const.IPV6_SLAAC):
subnet_ip_net = netaddr.IPNetwork(subnet_v4['subnet']['cidr'])
# Create a router port without specifying fixed_ips
port = self._make_port(
self.fmt, network['network']['id'],
@ -1949,7 +1988,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
# Router port should only have an IPv4 address
fixed_ips = port['port']['fixed_ips']
self.assertEqual(1, len(fixed_ips))
self.assertEqual('10.0.0.2', fixed_ips[0]['ip_address'])
self.assertIn(fixed_ips[0]['ip_address'], subnet_ip_net)
@staticmethod
def _calc_ipv6_addr_by_EUI64(port, subnet):
@ -1974,15 +2013,16 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
subnet = self._make_v6_subnet(network, addr_mode, ipv6_pd)
subnet_id = subnet['subnet']['id']
fixed_ips = [{'subnet_id': subnet_id}]
with self.port(subnet=subnet, fixed_ips=fixed_ips) as port:
if addr_mode == n_const.IPV6_SLAAC:
exp_ip_addr = self._calc_ipv6_addr_by_EUI64(port, subnet)
else:
exp_ip_addr = 'fe80::2'
port_fixed_ips = port['port']['fixed_ips']
self.assertEqual(1, len(port_fixed_ips))
self.assertEqual(exp_ip_addr,
port_fixed_ips[0]['ip_address'])
if addr_mode == n_const.IPV6_SLAAC:
exp_ip_addr = self._calc_ipv6_addr_by_EUI64(port, subnet)
self.assertEqual(exp_ip_addr,
port_fixed_ips[0]['ip_address'])
self.assertIn(port_fixed_ips[0]['ip_address'],
netaddr.IPNetwork(subnet['subnet']['cidr']))
def test_create_port_with_ipv6_slaac_subnet_in_fixed_ips(self):
self._test_create_port_with_ipv6_subnet_in_fixed_ips(
@ -2182,10 +2222,11 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_requested_invalid_fixed_ips(self):
with self.subnet() as subnet:
subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertIn(ips[0]['ip_address'], subnet_ip_net)
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Test invalid subnet_id
kwargs = {"fixed_ips":
@ -2240,41 +2281,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(webob.exc.HTTPClientError.code, res.status_int)
def test_requested_split(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ports_to_delete = []
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.5'}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
ports_to_delete.append(port2)
ips = port2['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.5', ips[0]['ip_address'])
self.assertEqual(subnet['subnet']['id'], ips[0]['subnet_id'])
# Allocate specific IP's
allocated = ['10.0.0.3', '10.0.0.4', '10.0.0.6']
for a in allocated:
res = self._create_port(self.fmt, net_id=net_id)
port2 = self.deserialize(self.fmt, res)
ports_to_delete.append(port2)
ips = port2['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(a, ips[0]['ip_address'])
self.assertEqual(subnet['subnet']['id'],
ips[0]['subnet_id'])
for p in ports_to_delete:
self._delete('ports', p['port']['id'])
def test_duplicate_ips(self):
with self.subnet() as subnet:
# Allocate specific IP
@ -2306,7 +2312,9 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_requested_ips_only(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
fixed_ip_data = [{'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])

View File

@ -559,9 +559,28 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
mocks['subnet'].deallocate.assert_called_once_with(auto_ip)
def test_recreate_port_ipam(self):
ip = '10.0.0.2'
with self.subnet() as subnet:
subnet_cidr = subnet['subnet']['cidr']
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
orig_ip = ips[0]['ip_address']
self.assertIn(netaddr.IPAddress(ips[0]['ip_address']),
netaddr.IPSet(netaddr.IPNetwork(subnet_cidr)))
req = self.new_delete_request('ports', port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
with self.port(subnet=subnet, fixed_ips=ips) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(orig_ip, ips[0]['ip_address'])
def test_recreate_port_ipam_specific_ip(self):
with self.subnet() as subnet:
ip = '10.0.0.2'
fixed_ip_data = [{'subnet_id': subnet['subnet']['id'],
'ip_address': ip}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(ip, ips[0]['ip_address'])

View File

@ -23,6 +23,7 @@ from neutron.common import utils
from neutron import context
from neutron.db import db_base_plugin_v2
from neutron.extensions import dns
from neutron import manager
from neutron.tests.unit.db import test_db_base_plugin_v2
@ -121,10 +122,13 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
self.assertIn('mac_address', port['port'])
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
subnet_db = manager.NeutronManager.get_plugin().get_subnet(
context.get_admin_context(), ips[0]['subnet_id'])
self.assertIn(netaddr.IPAddress(ips[0]['ip_address']),
netaddr.IPSet(netaddr.IPNetwork(subnet_db['cidr'])))
self.assertEqual('myname', port['port']['name'])
self._verify_dns_assigment(port['port'],
ips_list=['10.0.0.2'])
ips_list=[ips[0]['ip_address']])
def test_list_ports(self):
# for this test we need to enable overlapping ips
@ -147,6 +151,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
def test_update_port_non_default_dns_domain_with_dns_name(self):
with self.port() as port:
port_ip = port['port']['fixed_ips'][0]['ip_address']
cfg.CONF.set_override('dns_domain', 'example.com')
data = {'port': {'admin_state_up': False, 'dns_name': 'vm1'}}
req = self.new_update_request('ports', data, port['port']['id'])
@ -154,18 +159,19 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
self.assertEqual(data['port']['admin_state_up'],
res['port']['admin_state_up'])
self._verify_dns_assigment(res['port'],
ips_list=['10.0.0.2'],
ips_list=[port_ip],
dns_name='vm1')
def test_update_port_default_dns_domain_with_dns_name(self):
with self.port() as port:
port_ip = port['port']['fixed_ips'][0]['ip_address']
data = {'port': {'admin_state_up': False, 'dns_name': 'vm1'}}
req = self.new_update_request('ports', data, port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(data['port']['admin_state_up'],
res['port']['admin_state_up'])
self._verify_dns_assigment(res['port'],
ips_list=['10.0.0.2'])
ips_list=[port_ip])
def _verify_dns_assigment(self, port, ips_list=None, exp_ips_ipv4=0,
exp_ips_ipv6=0, ipv4_cidrs=None, ipv6_cidrs=None,
@ -254,10 +260,10 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10.
"""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
fixed_ip_data = [{'ip_address': '10.0.0.2'}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
@ -273,10 +279,10 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
def test_update_port_update_ip_address_only(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
fixed_ip_data = [{'ip_address': '10.0.0.2'}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],

View File

@ -78,7 +78,8 @@ class ExtraRouteDBTestCaseBase(object):
routes = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.3'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
with self.port(subnet=s) as p:
fixed_ip_data = [{'ip_address': '10.0.1.2'}]
with self.port(subnet=s, fixed_ips=fixed_ip_data) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
@ -88,8 +89,11 @@ class ExtraRouteDBTestCaseBase(object):
def test_route_update_with_external_route(self):
my_tenant = 'tenant1'
routes = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.3'}]
with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as ext_subnet:
with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as ext_subnet,\
self.port(subnet=ext_subnet) as nexthop_port:
nexthop_ip = nexthop_port['port']['fixed_ips'][0]['ip_address']
routes = [{'destination': '135.207.0.0/16',
'nexthop': nexthop_ip}]
self._set_net_external(ext_subnet['subnet']['network_id'])
ext_info = {'network_id': ext_subnet['subnet']['network_id']}
with self.router(
@ -101,8 +105,11 @@ class ExtraRouteDBTestCaseBase(object):
def test_route_update_with_route_via_another_tenant_subnet(self):
my_tenant = 'tenant1'
routes = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.3'}]
with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as subnet:
with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as subnet,\
self.port(subnet=subnet) as nexthop_port:
nexthop_ip = nexthop_port['port']['fixed_ips'][0]['ip_address']
routes = [{'destination': '135.207.0.0/16',
'nexthop': nexthop_ip}]
with self.router(tenant_id=my_tenant) as r:
body = self._routes_update_prepare(
r['router']['id'], subnet['subnet']['id'], None, routes,
@ -118,7 +125,8 @@ class ExtraRouteDBTestCaseBase(object):
'nexthop': '10.0.1.5'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
with self.port(subnet=s) as p:
fixed_ip_data = [{'ip_address': '10.0.1.2'}]
with self.port(subnet=s, fixed_ips=fixed_ip_data) as p:
self._routes_update_prepare(r['router']['id'],
None, p['port']['id'], routes)
body = self._update('routers', r['router']['id'],
@ -132,7 +140,8 @@ class ExtraRouteDBTestCaseBase(object):
'nexthop': '10.0.1.3'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
with self.port(subnet=s) as p:
fixed_ip_data = [{'ip_address': '10.0.1.2'}]
with self.port(subnet=s, fixed_ips=fixed_ip_data) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
@ -156,7 +165,8 @@ class ExtraRouteDBTestCaseBase(object):
'nexthop': '10.0.1.5'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
with self.port(subnet=s) as p:
fixed_ip_data = [{'ip_address': '10.0.1.2'}]
with self.port(subnet=s, fixed_ips=fixed_ip_data) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
@ -168,14 +178,17 @@ class ExtraRouteDBTestCaseBase(object):
None, r['router']['id'], [])
def test_routes_update_for_multiple_routers(self):
routes1 = [{'destination': '135.207.0.0/16',
'nexthop': '10.0.0.3'}]
routes2 = [{'destination': '12.0.0.0/8',
'nexthop': '10.0.0.4'}]
with self.router() as r1,\
self.router() as r2,\
self.subnet(cidr='10.0.0.0/24') as s:
with self.port(subnet=s) as p1, self.port(subnet=s) as p2:
with self.port(subnet=s) as p1,\
self.port(subnet=s) as p2:
p1_ip = p1['port']['fixed_ips'][0]['ip_address']
p2_ip = p2['port']['fixed_ips'][0]['ip_address']
routes1 = [{'destination': '135.207.0.0/16',
'nexthop': p2_ip}]
routes2 = [{'destination': '12.0.0.0/8',
'nexthop': p1_ip}]
body = self._routes_update_prepare(r1['router']['id'],
None, p1['port']['id'],
routes1)
@ -204,7 +217,8 @@ class ExtraRouteDBTestCaseBase(object):
'nexthop': '10.0.1.5'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
with self.port(subnet=s) as p:
fixed_ip_data = [{'ip_address': '10.0.1.2'}]
with self.port(subnet=s, fixed_ips=fixed_ip_data) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes_orig)
@ -435,22 +449,23 @@ class ExtraRouteDBTestCaseBase(object):
port_list = self.deserialize('json', port_res)
self.assertEqual(1, len(port_list['ports']))
routes = [{'destination': '135.207.0.0/16',
'nexthop': '10.0.1.3'}]
with self.port(subnet=s) as p:
next_hop = p['port']['fixed_ips'][0]['ip_address']
routes = [{'destination': '135.207.0.0/16',
'nexthop': next_hop}]
body = self._update('routers', r['router']['id'],
{'router': {'routes':
routes}})
body = self._update('routers', r['router']['id'],
{'router': {'routes':
routes}})
body = self._show('routers', r['router']['id'])
self.assertEqual(routes, body['router']['routes'])
body = self._show('routers', r['router']['id'])
self.assertEqual(routes, body['router']['routes'])
self._remove_external_gateway_from_router(
r['router']['id'],
s['subnet']['network_id'])
body = self._show('routers', r['router']['id'])
gw_info = body['router']['external_gateway_info']
self.assertIsNone(gw_info)
self._remove_external_gateway_from_router(
r['router']['id'],
s['subnet']['network_id'])
body = self._show('routers', r['router']['id'])
gw_info = body['router']['external_gateway_info']
self.assertIsNone(gw_info)
def test_router_list_with_sort(self):
with self.router(name='router1') as router1,\

View File

@ -2478,11 +2478,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
) as private_port:
fp1 = self._make_floatingip(self.fmt, network_ex_id1,
private_port['port']['id'],
floating_ip='10.0.0.3')
private_port['port']['id'])
fp2 = self._make_floatingip(self.fmt, network_ex_id2,
private_port['port']['id'],
floating_ip='11.0.0.3')
private_port['port']['id'])
self.assertEqual(fp1['floatingip']['router_id'],
r1['router']['id'])
self.assertEqual(fp2['floatingip']['router_id'],
@ -2519,8 +2517,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
) as private_port:
fp = self._make_floatingip(self.fmt, network_ex_id,
private_port['port']['id'],
floating_ip='10.0.0.8')
private_port['port']['id'])
self.assertEqual(r1['router']['id'],
fp['floatingip']['router_id'])
@ -2681,8 +2678,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self.subnet(cidr="192.168.1.0/24", ip_version=4, network=n):
self._set_net_external(n['network']['id'])
fip = self._make_floatingip(self.fmt, n['network']['id'])
self.assertEqual('192.168.1.2',
fip['floatingip']['floating_ip_address'])
fip_set = netaddr.IPSet(netaddr.IPNetwork("192.168.1.0/24"))
fip_ip = fip['floatingip']['floating_ip_address']
self.assertTrue(netaddr.IPAddress(fip_ip) in fip_set)
def test_create_floatingip_with_assoc_to_ipv6_subnet(self):
with self.subnet() as public_sub:

View File

@ -355,20 +355,16 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
return ipam_subnet.allocate(address_request)
def test_allocate_any_v4_address_succeeds(self):
ip_address = self._allocate_address(
'10.0.0.0/24', 4, ipam_req.AnyAddressRequest)
# As the DB IPAM driver allocation logic is strictly sequential, we can
# expect this test to allocate the .2 address as .1 is used by default
# as subnet gateway
self.assertEqual('10.0.0.2', ip_address)
self._test_allocate_any_address_succeeds('10.0.0.0/24', 4)
def test_allocate_any_v6_address_succeeds(self):
self._test_allocate_any_address_succeeds('fde3:abcd:4321:1::/64', 6)
def _test_allocate_any_address_succeeds(self, subnet_cidr, ip_version):
ip_address = self._allocate_address(
'fde3:abcd:4321:1::/64', 6, ipam_req.AnyAddressRequest)
# As the DB IPAM driver allocation logic is strictly sequential, we can
# expect this test to allocate the .2 address as .1 is used by default
# as subnet gateway
self.assertEqual('fde3:abcd:4321:1::2', ip_address)
subnet_cidr, ip_version, ipam_req.AnyAddressRequest)
self.assertIn(netaddr.IPAddress(ip_address),
netaddr.IPSet(netaddr.IPNetwork(subnet_cidr)))
def test_allocate_specific_v4_address_succeeds(self):
ip_address = self._allocate_address(

View File

@ -636,7 +636,7 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
arg_list=(portbindings.HOST_ID,),
**host_arg) as port1:
p1 = port1['port']
p1_ip = p1['fixed_ips'][0]['ip_address']
self.mock_fanout.reset_mock()
device = 'tap' + p1['id']
@ -667,7 +667,7 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
'20.0.0.1': [
l2pop_rpc.PortInfo('00:00:00:00:00:00',
'0.0.0.0'),
l2pop_rpc.PortInfo(new_mac, '10.0.0.2')
l2pop_rpc.PortInfo(new_mac, p1_ip)
]
}
}
@ -680,9 +680,12 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
with self.subnet(network=self._network) as subnet:
host_arg = {portbindings.HOST_ID: HOST}
fixed_ips = [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.2'}]
with self.port(subnet=subnet, cidr='10.0.0.0/24',
device_owner=DEVICE_OWNER_COMPUTE,
arg_list=(portbindings.HOST_ID,),
fixed_ips=fixed_ips,
**host_arg) as port1:
p1 = port1['port']

View File

@ -20,6 +20,7 @@ from neutron import context
from neutron.db import dns_db
from neutron.extensions import dns
from neutron.extensions import providernet as pnet
from neutron import manager
from neutron.plugins.ml2 import config
from neutron.plugins.ml2.extensions import dns_integration
from neutron.tests.unit.plugins.ml2 import test_plugin
@ -51,6 +52,7 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
super(DNSIntegrationTestCase, self).setUp()
dns_integration.DNS_DRIVER = None
dns_integration.subscribe()
self.plugin = manager.NeutronManager.get_plugin()
def _create_port_for_test(self, provider_net=True, dns_domain=True,
dns_name=True, ipv4=True, ipv6=True):
@ -69,11 +71,13 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
**net_kwargs)
network = self.deserialize(self.fmt, res)
if ipv4:
self._create_subnet(self.fmt, network['network']['id'],
'10.0.0.0/24', ip_version=4)
cidr = '10.0.0.0/24'
self._create_subnet_for_test(network['network']['id'], cidr)
if ipv6:
self._create_subnet(self.fmt, network['network']['id'],
'fd3d:bdd4:da60::/64', ip_version=6)
cidr = 'fd3d:bdd4:da60::/64'
self._create_subnet_for_test(network['network']['id'], cidr)
port_kwargs = {}
if dns_name:
port_kwargs = {
@ -90,6 +94,19 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
port_id=port['id']).one_or_none()
return network['network'], port, dns_data_db
def _create_subnet_for_test(self, network_id, cidr):
ip_net = netaddr.IPNetwork(cidr)
# initialize the allocation_pool to the lower half of the subnet
subnet_size = ip_net.last - ip_net.first
subnet_mid_point = int(ip_net.first + subnet_size / 2)
start, end = (netaddr.IPAddress(ip_net.first + 2),
netaddr.IPAddress(subnet_mid_point))
allocation_pools = [{'start': str(start),
'end': str(end)}]
return self._create_subnet(self.fmt, network_id,
str(ip_net), ip_version=ip_net.ip.version,
allocation_pools=allocation_pools)
def _update_port_for_test(self, port, new_dns_name=NEWDNSNAME,
**kwargs):
mock_client.reset_mock()
@ -100,7 +117,7 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
recordsets = []
if records_v4:
recordsets.append({'id': V4UUID, 'records': records_v4})
if records_v4:
if records_v6:
recordsets.append({'id': V6UUID, 'records': records_v6})
mock_client.recordsets.list.return_value = recordsets
mock_admin_client.reset_mock()
@ -362,10 +379,26 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
config.cfg.CONF.set_override('dns_domain', DNSDOMAIN)
net, port, dns_data_db = self._create_port_for_test()
original_ips = [ip['ip_address'] for ip in port['fixed_ips']]
ctx = context.get_admin_context()
kwargs = {'fixed_ips': []}
for ip in port['fixed_ips']:
# Since this tests using an "any" IP allocation to update the port
# IP address, change the allocation pools so that IPAM won't ever
# give us back the IP address we originally had.
subnet = self.plugin.get_subnet(ctx, ip['subnet_id'])
ip_net = netaddr.IPNetwork(subnet['cidr'])
subnet_size = ip_net.last - ip_net.first
subnet_mid_point = int(ip_net.first + subnet_size / 2)
start, end = (netaddr.IPAddress(subnet_mid_point + 1),
netaddr.IPAddress(ip_net.last - 1))
allocation_pools = [{'start': str(start), 'end': str(end)}]
body = {'allocation_pools': allocation_pools}
req = self.new_update_request('subnets', {'subnet': body},
ip['subnet_id'])
req.get_response(self.api)
kwargs['fixed_ips'].append(
{'subnet_id': ip['subnet_id']})
port, dns_data_db = self._update_port_for_test(port,
new_dns_name=None,
**kwargs)

View File

@ -559,7 +559,9 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
def test_update_port_fixed_ip_changed(self):
ctx = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()
with self.port() as port, mock.patch.object(
fixed_ip_data = [{'ip_address': '10.0.0.4'}]
with self.port(fixed_ips=fixed_ip_data) as port,\
mock.patch.object(
plugin.notifier,
'security_groups_member_updated') as sg_member_update:
port['port']['fixed_ips'][0]['ip_address'] = '10.0.0.3'
@ -829,14 +831,20 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
self._test_operation_resillient_to_ipallocation_failure(make_port)
def test_port_update_resillient_to_duplicate_records(self):
with self.port() as p:
data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.9'}]}}
req = self.new_update_request('ports', data, p['port']['id'])
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2', 'end': '10.0.0.8'}]
with self.subnet(cidr=cidr,
allocation_pools=allocation_pools) as subnet:
with self.port(subnet=subnet) as p:
data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.9'}]}}
req = self.new_update_request('ports', data, p['port']['id'])
def do_request():
self.assertEqual(200, req.get_response(self.api).status_int)
def do_request():
self.assertEqual(200,
req.get_response(self.api).status_int)
self._test_operation_resillient_to_ipallocation_failure(do_request)
self._test_operation_resillient_to_ipallocation_failure(
do_request)
def _test_operation_resillient_to_ipallocation_failure(self, func):
from sqlalchemy import event