Merge "Ensure unit tests don't assume an IP address allocation strategy"

This commit is contained in:
Jenkins 2016-05-18 19:18:01 +00:00 committed by Gerrit Code Review
commit b934db80c6
10 changed files with 241 additions and 152 deletions

View File

@ -463,6 +463,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
security_groups=[sg2_id])
ports_rest2 = self.deserialize(self.fmt, res2)
port_id2 = ports_rest2['port']['id']
port_fixed_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address']
ctx = context.get_admin_context()
ports_rpc = self.rpc.security_group_rules_for_devices(
ctx, devices=devices)
@ -476,7 +477,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg2_id},
{'direction': u'ingress',
'source_ip_prefix': u'10.0.0.3/32',
'source_ip_prefix': port_fixed_ip2 + '/32',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv4,
'port_range_max': 25, 'port_range_min': 24,
@ -519,6 +520,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
security_groups=[sg2_id])
ports_rest2 = self.deserialize(self.fmt, res2)
port_id2 = ports_rest2['port']['id']
port_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address']
ctx = context.get_admin_context()
ports_rpc = self.rpc.security_group_info_for_devices(
ctx, devices=devices)
@ -533,7 +535,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
'remote_group_id': sg2_id}
]},
'sg_member_ips': {sg2_id: {
'IPv4': set([u'10.0.0.3']),
'IPv4': set([port_ip2]),
'IPv6': set(),
}}
}
@ -1077,6 +1079,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
fixed_ips=[{'subnet_id': subnet_v6['subnet']['id']}],
security_groups=[sg2_id])
port_id2 = ports_rest2['port']['id']
port_ip2 = ports_rest2['port']['fixed_ips'][0]['ip_address']
ctx = context.get_admin_context()
ports_rpc = self.rpc.security_group_rules_for_devices(
@ -1091,7 +1094,7 @@ class SGServerRpcCallBackTestCase(test_sg.SecurityGroupDBTestCase):
{'direction': 'egress', 'ethertype': const.IPv6,
'security_group_id': sg2_id},
{'direction': 'ingress',
'source_ip_prefix': '2001:db8::2/128',
'source_ip_prefix': port_ip2 + '/128',
'protocol': const.PROTO_NAME_TCP,
'ethertype': const.IPv6,
'port_range_max': 25, 'port_range_min': 24,

View File

@ -891,14 +891,18 @@ class TestV2HTTPResponse(NeutronDbPluginV2TestCase):
class TestPortsV2(NeutronDbPluginV2TestCase):
def test_create_port_json(self):
keys = [('admin_state_up', True), ('status', self.port_create_status)]
with self.port(name='myname') as port:
for k, v in keys:
self.assertEqual(port['port'][k], v)
self.assertIn('mac_address', port['port'])
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual('myname', port['port']['name'])
with self.network(shared=True) as network:
with self.subnet(network=network) as subnet:
with self.port(name='myname') as port:
for k, v in keys:
self.assertEqual(port['port'][k], v)
self.assertIn('mac_address', port['port'])
ips = port['port']['fixed_ips']
subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
self.assertEqual(1, len(ips))
self.assertIn(netaddr.IPAddress(ips[0]['ip_address']),
subnet_ip_net)
self.assertEqual('myname', port['port']['name'])
def test_create_port_as_admin(self):
with self.network() as network:
@ -938,11 +942,10 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
def test_create_port_public_network_with_ip(self):
with self.network(shared=True) as network:
with self.subnet(network=network, cidr='10.0.0.0/24') as subnet:
ip_net = netaddr.IPNetwork('10.0.0.0/24')
with self.subnet(network=network, cidr=str(ip_net)):
keys = [('admin_state_up', True),
('status', self.port_create_status),
('fixed_ips', [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.2'}])]
('status', self.port_create_status)]
port_res = self._create_port(self.fmt,
network['network']['id'],
webob.exc.HTTPCreated.code,
@ -951,6 +954,8 @@ class TestPortsV2(NeutronDbPluginV2TestCase):
port = self.deserialize(self.fmt, port_res)
for k, v in keys:
self.assertEqual(port['port'][k], v)
port_ip = port['port']['fixed_ips'][0]['ip_address']
self.assertIn(port_ip, ip_net)
self.assertIn('mac_address', port['port'])
self._delete('ports', port['port']['id'])
@ -1532,7 +1537,9 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10.
"""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
fixed_ip_data = [{'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
@ -1550,21 +1557,24 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_update_port_update_ip_address_only(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ip_address = '10.0.0.2'
fixed_ip_data = [{'ip_address': ip_address,
'subnet_id': subnet['subnet']['id']}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ip_address, ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
'ip_address': "10.0.0.10"},
{'ip_address': "10.0.0.2"}]}}
{'ip_address': ip_address}]}}
req = self.new_update_request('ports', data,
port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
ips = res['port']['fixed_ips']
self.assertEqual(2, len(ips))
self.assertIn({'ip_address': '10.0.0.2',
self.assertIn({'ip_address': ip_address,
'subnet_id': subnet['subnet']['id']}, ips)
self.assertIn({'ip_address': '10.0.0.10',
'subnet_id': subnet['subnet']['id']}, ips)
@ -1607,10 +1617,11 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
res['port']['admin_state_up'])
ips = res['port']['fixed_ips']
self.assertEqual(2, len(ips))
self.assertIn({'ip_address': '10.0.0.3',
'subnet_id': subnet['subnet']['id']}, ips)
self.assertIn({'ip_address': '10.0.0.4',
'subnet_id': subnet['subnet']['id']}, ips)
self.assertNotEqual(ips[0]['ip_address'],
ips[1]['ip_address'])
network_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
self.assertIn(ips[0]['ip_address'], network_ip_net)
self.assertIn(ips[1]['ip_address'], network_ip_net)
def test_update_port_invalid_fixed_ip_address_v6_slaac(self):
with self.subnet(
@ -1692,10 +1703,11 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_requested_duplicate_ip(self):
with self.subnet() as subnet:
subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertIn(ips[0]['ip_address'], subnet_ip_net)
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Check configuring of duplicate IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
@ -1706,10 +1718,12 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_requested_subnet_id(self):
with self.subnet() as subnet:
subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertIn(netaddr.IPAddress(ips[0]['ip_address']),
netaddr.IPSet(subnet_ip_net))
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Request a IP from specific subnet
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id']}]}
@ -1718,7 +1732,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
port2 = self.deserialize(self.fmt, res)
ips = port2['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.3', ips[0]['ip_address'])
self.assertIn(ips[0]['ip_address'], subnet_ip_net)
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
self._delete('ports', port2['port']['id'])
@ -1771,23 +1785,43 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port3 = self.deserialize(self.fmt, res)
ips = port3['port']['fixed_ips']
cidr_v4 = subnet['subnet']['cidr']
cidr_v6 = subnet2['subnet']['cidr']
self.assertEqual(2, len(ips))
self.assertIn({'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}, ips)
self.assertIn({'ip_address': '2607:f0d0:1002:51::2',
'subnet_id': subnet2['subnet']['id']}, ips)
self._test_requested_port_subnet_ids(ips,
[subnet['subnet']['id'],
subnet2['subnet']['id']])
self._test_dual_stack_port_ip_addresses_in_subnets(ips,
cidr_v4,
cidr_v6)
res = self._create_port(self.fmt, net_id=net_id)
port4 = self.deserialize(self.fmt, res)
# Check that a v4 and a v6 address are allocated
ips = port4['port']['fixed_ips']
self.assertEqual(2, len(ips))
self.assertIn({'ip_address': '10.0.0.3',
'subnet_id': subnet['subnet']['id']}, ips)
self.assertIn({'ip_address': '2607:f0d0:1002:51::3',
'subnet_id': subnet2['subnet']['id']}, ips)
self._test_requested_port_subnet_ids(ips,
[subnet['subnet']['id'],
subnet2['subnet']['id']])
self._test_dual_stack_port_ip_addresses_in_subnets(ips,
cidr_v4,
cidr_v6)
self._delete('ports', port3['port']['id'])
self._delete('ports', port4['port']['id'])
def _test_requested_port_subnet_ids(self, ips, expected_subnet_ids):
self.assertEqual(set(x['subnet_id'] for x in ips),
set(expected_subnet_ids))
def _test_dual_stack_port_ip_addresses_in_subnets(self, ips, cidr_v4,
cidr_v6):
ip_net_v4 = netaddr.IPNetwork(cidr_v4)
ip_net_v6 = netaddr.IPNetwork(cidr_v6)
for address in ips:
ip_addr = netaddr.IPAddress(address['ip_address'])
expected_ip_net = ip_net_v4 if ip_addr.version == 4 else ip_net_v6
self.assertIn(ip_addr, expected_ip_net)
def test_create_port_invalid_fixed_ip_address_v6_pd_slaac(self):
with self.network(name='net') as network:
subnet = self._make_v6_subnet(
@ -1923,25 +1957,30 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
{'subnet_id': subnet2['subnet']['id']}]
) as port:
ips = port['port']['fixed_ips']
subnet1_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
subnet2_net = netaddr.IPNetwork(subnet2['subnet']['cidr'])
network_ip_set = netaddr.IPSet(subnet1_net)
network_ip_set.add(subnet2_net)
self.assertEqual(2, len(ips))
self.assertIn({'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}, ips)
port_mac = port['port']['mac_address']
subnet_cidr = subnet2['subnet']['cidr']
eui_addr = str(ipv6_utils.get_ipv6_addr_by_EUI64(
subnet_cidr, port_mac))
self.assertIn(ips[0]['ip_address'], network_ip_set)
self.assertIn(ips[1]['ip_address'], network_ip_set)
self.assertIn({'ip_address': eui_addr,
'subnet_id': subnet2['subnet']['id']}, ips)
def test_create_router_port_ipv4_and_ipv6_slaac_no_fixed_ips(self):
with self.network() as network:
# Create an IPv4 and an IPv6 SLAAC subnet on the network
with self.subnet(network),\
with self.subnet(network) as subnet_v4,\
self.subnet(network,
cidr='2607:f0d0:1002:51::/64',
ip_version=6,
gateway_ip='fe80::1',
ipv6_address_mode=n_const.IPV6_SLAAC):
subnet_ip_net = netaddr.IPNetwork(subnet_v4['subnet']['cidr'])
# Create a router port without specifying fixed_ips
port = self._make_port(
self.fmt, network['network']['id'],
@ -1949,7 +1988,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
# Router port should only have an IPv4 address
fixed_ips = port['port']['fixed_ips']
self.assertEqual(1, len(fixed_ips))
self.assertEqual('10.0.0.2', fixed_ips[0]['ip_address'])
self.assertIn(fixed_ips[0]['ip_address'], subnet_ip_net)
@staticmethod
def _calc_ipv6_addr_by_EUI64(port, subnet):
@ -1974,15 +2013,16 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
subnet = self._make_v6_subnet(network, addr_mode, ipv6_pd)
subnet_id = subnet['subnet']['id']
fixed_ips = [{'subnet_id': subnet_id}]
with self.port(subnet=subnet, fixed_ips=fixed_ips) as port:
if addr_mode == n_const.IPV6_SLAAC:
exp_ip_addr = self._calc_ipv6_addr_by_EUI64(port, subnet)
else:
exp_ip_addr = 'fe80::2'
port_fixed_ips = port['port']['fixed_ips']
self.assertEqual(1, len(port_fixed_ips))
self.assertEqual(exp_ip_addr,
port_fixed_ips[0]['ip_address'])
if addr_mode == n_const.IPV6_SLAAC:
exp_ip_addr = self._calc_ipv6_addr_by_EUI64(port, subnet)
self.assertEqual(exp_ip_addr,
port_fixed_ips[0]['ip_address'])
self.assertIn(port_fixed_ips[0]['ip_address'],
netaddr.IPNetwork(subnet['subnet']['cidr']))
def test_create_port_with_ipv6_slaac_subnet_in_fixed_ips(self):
self._test_create_port_with_ipv6_subnet_in_fixed_ips(
@ -2182,10 +2222,11 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_requested_invalid_fixed_ips(self):
with self.subnet() as subnet:
subnet_ip_net = netaddr.IPNetwork(subnet['subnet']['cidr'])
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertIn(ips[0]['ip_address'], subnet_ip_net)
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Test invalid subnet_id
kwargs = {"fixed_ips":
@ -2240,41 +2281,6 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
self.assertEqual(webob.exc.HTTPClientError.code, res.status_int)
def test_requested_split(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
ports_to_delete = []
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
# Allocate specific IP
kwargs = {"fixed_ips": [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.5'}]}
net_id = port['port']['network_id']
res = self._create_port(self.fmt, net_id=net_id, **kwargs)
port2 = self.deserialize(self.fmt, res)
ports_to_delete.append(port2)
ips = port2['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.5', ips[0]['ip_address'])
self.assertEqual(subnet['subnet']['id'], ips[0]['subnet_id'])
# Allocate specific IP's
allocated = ['10.0.0.3', '10.0.0.4', '10.0.0.6']
for a in allocated:
res = self._create_port(self.fmt, net_id=net_id)
port2 = self.deserialize(self.fmt, res)
ports_to_delete.append(port2)
ips = port2['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(a, ips[0]['ip_address'])
self.assertEqual(subnet['subnet']['id'],
ips[0]['subnet_id'])
for p in ports_to_delete:
self._delete('ports', p['port']['id'])
def test_duplicate_ips(self):
with self.subnet() as subnet:
# Allocate specific IP
@ -2306,7 +2312,9 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
def test_requested_ips_only(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
fixed_ip_data = [{'ip_address': '10.0.0.2',
'subnet_id': subnet['subnet']['id']}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])

View File

@ -559,9 +559,28 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase):
mocks['subnet'].deallocate.assert_called_once_with(auto_ip)
def test_recreate_port_ipam(self):
ip = '10.0.0.2'
with self.subnet() as subnet:
subnet_cidr = subnet['subnet']['cidr']
with self.port(subnet=subnet) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
orig_ip = ips[0]['ip_address']
self.assertIn(netaddr.IPAddress(ips[0]['ip_address']),
netaddr.IPSet(netaddr.IPNetwork(subnet_cidr)))
req = self.new_delete_request('ports', port['port']['id'])
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
with self.port(subnet=subnet, fixed_ips=ips) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(orig_ip, ips[0]['ip_address'])
def test_recreate_port_ipam_specific_ip(self):
with self.subnet() as subnet:
ip = '10.0.0.2'
fixed_ip_data = [{'subnet_id': subnet['subnet']['id'],
'ip_address': ip}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual(ip, ips[0]['ip_address'])

View File

@ -23,6 +23,7 @@ from neutron.common import utils
from neutron import context
from neutron.db import db_base_plugin_v2
from neutron.extensions import dns
from neutron import manager
from neutron.tests.unit.db import test_db_base_plugin_v2
@ -121,10 +122,13 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
self.assertIn('mac_address', port['port'])
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
subnet_db = manager.NeutronManager.get_plugin().get_subnet(
context.get_admin_context(), ips[0]['subnet_id'])
self.assertIn(netaddr.IPAddress(ips[0]['ip_address']),
netaddr.IPSet(netaddr.IPNetwork(subnet_db['cidr'])))
self.assertEqual('myname', port['port']['name'])
self._verify_dns_assigment(port['port'],
ips_list=['10.0.0.2'])
ips_list=[ips[0]['ip_address']])
def test_list_ports(self):
# for this test we need to enable overlapping ips
@ -147,6 +151,7 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
def test_update_port_non_default_dns_domain_with_dns_name(self):
with self.port() as port:
port_ip = port['port']['fixed_ips'][0]['ip_address']
cfg.CONF.set_override('dns_domain', 'example.com')
data = {'port': {'admin_state_up': False, 'dns_name': 'vm1'}}
req = self.new_update_request('ports', data, port['port']['id'])
@ -154,18 +159,19 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
self.assertEqual(data['port']['admin_state_up'],
res['port']['admin_state_up'])
self._verify_dns_assigment(res['port'],
ips_list=['10.0.0.2'],
ips_list=[port_ip],
dns_name='vm1')
def test_update_port_default_dns_domain_with_dns_name(self):
with self.port() as port:
port_ip = port['port']['fixed_ips'][0]['ip_address']
data = {'port': {'admin_state_up': False, 'dns_name': 'vm1'}}
req = self.new_update_request('ports', data, port['port']['id'])
res = self.deserialize(self.fmt, req.get_response(self.api))
self.assertEqual(data['port']['admin_state_up'],
res['port']['admin_state_up'])
self._verify_dns_assigment(res['port'],
ips_list=['10.0.0.2'])
ips_list=[port_ip])
def _verify_dns_assigment(self, port, ips_list=None, exp_ips_ipv4=0,
exp_ips_ipv6=0, ipv4_cidrs=None, ipv6_cidrs=None,
@ -254,10 +260,10 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
Check that a configured IP 10.0.0.2 is replaced by 10.0.0.10.
"""
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
fixed_ip_data = [{'ip_address': '10.0.0.2'}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],
@ -273,10 +279,10 @@ class DnsExtensionTestCase(test_db_base_plugin_v2.TestNetworksV2):
def test_update_port_update_ip_address_only(self):
with self.subnet() as subnet:
with self.port(subnet=subnet) as port:
fixed_ip_data = [{'ip_address': '10.0.0.2'}]
with self.port(subnet=subnet, fixed_ips=fixed_ip_data) as port:
ips = port['port']['fixed_ips']
self.assertEqual(1, len(ips))
self.assertEqual('10.0.0.2', ips[0]['ip_address'])
self.assertEqual(ips[0]['subnet_id'], subnet['subnet']['id'])
data = {'port': {'fixed_ips': [{'subnet_id':
subnet['subnet']['id'],

View File

@ -78,7 +78,8 @@ class ExtraRouteDBTestCaseBase(object):
routes = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.3'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
with self.port(subnet=s) as p:
fixed_ip_data = [{'ip_address': '10.0.1.2'}]
with self.port(subnet=s, fixed_ips=fixed_ip_data) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
@ -88,8 +89,11 @@ class ExtraRouteDBTestCaseBase(object):
def test_route_update_with_external_route(self):
my_tenant = 'tenant1'
routes = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.3'}]
with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as ext_subnet:
with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as ext_subnet,\
self.port(subnet=ext_subnet) as nexthop_port:
nexthop_ip = nexthop_port['port']['fixed_ips'][0]['ip_address']
routes = [{'destination': '135.207.0.0/16',
'nexthop': nexthop_ip}]
self._set_net_external(ext_subnet['subnet']['network_id'])
ext_info = {'network_id': ext_subnet['subnet']['network_id']}
with self.router(
@ -101,8 +105,11 @@ class ExtraRouteDBTestCaseBase(object):
def test_route_update_with_route_via_another_tenant_subnet(self):
my_tenant = 'tenant1'
routes = [{'destination': '135.207.0.0/16', 'nexthop': '10.0.1.3'}]
with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as subnet:
with self.subnet(cidr='10.0.1.0/24', tenant_id='notme') as subnet,\
self.port(subnet=subnet) as nexthop_port:
nexthop_ip = nexthop_port['port']['fixed_ips'][0]['ip_address']
routes = [{'destination': '135.207.0.0/16',
'nexthop': nexthop_ip}]
with self.router(tenant_id=my_tenant) as r:
body = self._routes_update_prepare(
r['router']['id'], subnet['subnet']['id'], None, routes,
@ -118,7 +125,8 @@ class ExtraRouteDBTestCaseBase(object):
'nexthop': '10.0.1.5'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
with self.port(subnet=s) as p:
fixed_ip_data = [{'ip_address': '10.0.1.2'}]
with self.port(subnet=s, fixed_ips=fixed_ip_data) as p:
self._routes_update_prepare(r['router']['id'],
None, p['port']['id'], routes)
body = self._update('routers', r['router']['id'],
@ -132,7 +140,8 @@ class ExtraRouteDBTestCaseBase(object):
'nexthop': '10.0.1.3'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
with self.port(subnet=s) as p:
fixed_ip_data = [{'ip_address': '10.0.1.2'}]
with self.port(subnet=s, fixed_ips=fixed_ip_data) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
@ -156,7 +165,8 @@ class ExtraRouteDBTestCaseBase(object):
'nexthop': '10.0.1.5'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
with self.port(subnet=s) as p:
fixed_ip_data = [{'ip_address': '10.0.1.2'}]
with self.port(subnet=s, fixed_ips=fixed_ip_data) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes)
@ -168,14 +178,17 @@ class ExtraRouteDBTestCaseBase(object):
None, r['router']['id'], [])
def test_routes_update_for_multiple_routers(self):
routes1 = [{'destination': '135.207.0.0/16',
'nexthop': '10.0.0.3'}]
routes2 = [{'destination': '12.0.0.0/8',
'nexthop': '10.0.0.4'}]
with self.router() as r1,\
self.router() as r2,\
self.subnet(cidr='10.0.0.0/24') as s:
with self.port(subnet=s) as p1, self.port(subnet=s) as p2:
with self.port(subnet=s) as p1,\
self.port(subnet=s) as p2:
p1_ip = p1['port']['fixed_ips'][0]['ip_address']
p2_ip = p2['port']['fixed_ips'][0]['ip_address']
routes1 = [{'destination': '135.207.0.0/16',
'nexthop': p2_ip}]
routes2 = [{'destination': '12.0.0.0/8',
'nexthop': p1_ip}]
body = self._routes_update_prepare(r1['router']['id'],
None, p1['port']['id'],
routes1)
@ -204,7 +217,8 @@ class ExtraRouteDBTestCaseBase(object):
'nexthop': '10.0.1.5'}]
with self.router() as r:
with self.subnet(cidr='10.0.1.0/24') as s:
with self.port(subnet=s) as p:
fixed_ip_data = [{'ip_address': '10.0.1.2'}]
with self.port(subnet=s, fixed_ips=fixed_ip_data) as p:
body = self._routes_update_prepare(r['router']['id'],
None, p['port']['id'],
routes_orig)
@ -435,22 +449,23 @@ class ExtraRouteDBTestCaseBase(object):
port_list = self.deserialize('json', port_res)
self.assertEqual(1, len(port_list['ports']))
routes = [{'destination': '135.207.0.0/16',
'nexthop': '10.0.1.3'}]
with self.port(subnet=s) as p:
next_hop = p['port']['fixed_ips'][0]['ip_address']
routes = [{'destination': '135.207.0.0/16',
'nexthop': next_hop}]
body = self._update('routers', r['router']['id'],
{'router': {'routes':
routes}})
body = self._update('routers', r['router']['id'],
{'router': {'routes':
routes}})
body = self._show('routers', r['router']['id'])
self.assertEqual(routes, body['router']['routes'])
body = self._show('routers', r['router']['id'])
self.assertEqual(routes, body['router']['routes'])
self._remove_external_gateway_from_router(
r['router']['id'],
s['subnet']['network_id'])
body = self._show('routers', r['router']['id'])
gw_info = body['router']['external_gateway_info']
self.assertIsNone(gw_info)
self._remove_external_gateway_from_router(
r['router']['id'],
s['subnet']['network_id'])
body = self._show('routers', r['router']['id'])
gw_info = body['router']['external_gateway_info']
self.assertIsNone(gw_info)
def test_router_list_with_sort(self):
with self.router(name='router1') as router1,\

View File

@ -2478,11 +2478,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
) as private_port:
fp1 = self._make_floatingip(self.fmt, network_ex_id1,
private_port['port']['id'],
floating_ip='10.0.0.3')
private_port['port']['id'])
fp2 = self._make_floatingip(self.fmt, network_ex_id2,
private_port['port']['id'],
floating_ip='11.0.0.3')
private_port['port']['id'])
self.assertEqual(fp1['floatingip']['router_id'],
r1['router']['id'])
self.assertEqual(fp2['floatingip']['router_id'],
@ -2519,8 +2517,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
) as private_port:
fp = self._make_floatingip(self.fmt, network_ex_id,
private_port['port']['id'],
floating_ip='10.0.0.8')
private_port['port']['id'])
self.assertEqual(r1['router']['id'],
fp['floatingip']['router_id'])
@ -2681,8 +2678,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
self.subnet(cidr="192.168.1.0/24", ip_version=4, network=n):
self._set_net_external(n['network']['id'])
fip = self._make_floatingip(self.fmt, n['network']['id'])
self.assertEqual('192.168.1.2',
fip['floatingip']['floating_ip_address'])
fip_set = netaddr.IPSet(netaddr.IPNetwork("192.168.1.0/24"))
fip_ip = fip['floatingip']['floating_ip_address']
self.assertTrue(netaddr.IPAddress(fip_ip) in fip_set)
def test_create_floatingip_with_assoc_to_ipv6_subnet(self):
with self.subnet() as public_sub:

View File

@ -356,20 +356,16 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
return ipam_subnet.allocate(address_request)
def test_allocate_any_v4_address_succeeds(self):
ip_address = self._allocate_address(
'10.0.0.0/24', 4, ipam_req.AnyAddressRequest)
# As the DB IPAM driver allocation logic is strictly sequential, we can
# expect this test to allocate the .2 address as .1 is used by default
# as subnet gateway
self.assertEqual('10.0.0.2', ip_address)
self._test_allocate_any_address_succeeds('10.0.0.0/24', 4)
def test_allocate_any_v6_address_succeeds(self):
self._test_allocate_any_address_succeeds('fde3:abcd:4321:1::/64', 6)
def _test_allocate_any_address_succeeds(self, subnet_cidr, ip_version):
ip_address = self._allocate_address(
'fde3:abcd:4321:1::/64', 6, ipam_req.AnyAddressRequest)
# As the DB IPAM driver allocation logic is strictly sequential, we can
# expect this test to allocate the .2 address as .1 is used by default
# as subnet gateway
self.assertEqual('fde3:abcd:4321:1::2', ip_address)
subnet_cidr, ip_version, ipam_req.AnyAddressRequest)
self.assertIn(netaddr.IPAddress(ip_address),
netaddr.IPSet(netaddr.IPNetwork(subnet_cidr)))
def test_allocate_specific_v4_address_succeeds(self):
ip_address = self._allocate_address(

View File

@ -636,7 +636,7 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
arg_list=(portbindings.HOST_ID,),
**host_arg) as port1:
p1 = port1['port']
p1_ip = p1['fixed_ips'][0]['ip_address']
self.mock_fanout.reset_mock()
device = 'tap' + p1['id']
@ -667,7 +667,7 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
'20.0.0.1': [
l2pop_rpc.PortInfo('00:00:00:00:00:00',
'0.0.0.0'),
l2pop_rpc.PortInfo(new_mac, '10.0.0.2')
l2pop_rpc.PortInfo(new_mac, p1_ip)
]
}
}
@ -680,9 +680,12 @@ class TestL2PopulationRpcTestCase(test_plugin.Ml2PluginV2TestCase):
with self.subnet(network=self._network) as subnet:
host_arg = {portbindings.HOST_ID: HOST}
fixed_ips = [{'subnet_id': subnet['subnet']['id'],
'ip_address': '10.0.0.2'}]
with self.port(subnet=subnet, cidr='10.0.0.0/24',
device_owner=DEVICE_OWNER_COMPUTE,
arg_list=(portbindings.HOST_ID,),
fixed_ips=fixed_ips,
**host_arg) as port1:
p1 = port1['port']

View File

@ -20,6 +20,7 @@ from neutron import context
from neutron.db import dns_db
from neutron.extensions import dns
from neutron.extensions import providernet as pnet
from neutron import manager
from neutron.plugins.ml2 import config
from neutron.plugins.ml2.extensions import dns_integration
from neutron.tests.unit.plugins.ml2 import test_plugin
@ -51,6 +52,7 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
super(DNSIntegrationTestCase, self).setUp()
dns_integration.DNS_DRIVER = None
dns_integration.subscribe()
self.plugin = manager.NeutronManager.get_plugin()
def _create_port_for_test(self, provider_net=True, dns_domain=True,
dns_name=True, ipv4=True, ipv6=True):
@ -69,11 +71,13 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
**net_kwargs)
network = self.deserialize(self.fmt, res)
if ipv4:
self._create_subnet(self.fmt, network['network']['id'],
'10.0.0.0/24', ip_version=4)
cidr = '10.0.0.0/24'
self._create_subnet_for_test(network['network']['id'], cidr)
if ipv6:
self._create_subnet(self.fmt, network['network']['id'],
'fd3d:bdd4:da60::/64', ip_version=6)
cidr = 'fd3d:bdd4:da60::/64'
self._create_subnet_for_test(network['network']['id'], cidr)
port_kwargs = {}
if dns_name:
port_kwargs = {
@ -90,6 +94,19 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
port_id=port['id']).one_or_none()
return network['network'], port, dns_data_db
def _create_subnet_for_test(self, network_id, cidr):
ip_net = netaddr.IPNetwork(cidr)
# initialize the allocation_pool to the lower half of the subnet
subnet_size = ip_net.last - ip_net.first
subnet_mid_point = int(ip_net.first + subnet_size / 2)
start, end = (netaddr.IPAddress(ip_net.first + 2),
netaddr.IPAddress(subnet_mid_point))
allocation_pools = [{'start': str(start),
'end': str(end)}]
return self._create_subnet(self.fmt, network_id,
str(ip_net), ip_version=ip_net.ip.version,
allocation_pools=allocation_pools)
def _update_port_for_test(self, port, new_dns_name=NEWDNSNAME,
**kwargs):
mock_client.reset_mock()
@ -100,7 +117,7 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
recordsets = []
if records_v4:
recordsets.append({'id': V4UUID, 'records': records_v4})
if records_v4:
if records_v6:
recordsets.append({'id': V6UUID, 'records': records_v6})
mock_client.recordsets.list.return_value = recordsets
mock_admin_client.reset_mock()
@ -362,10 +379,26 @@ class DNSIntegrationTestCase(test_plugin.Ml2PluginV2TestCase):
config.cfg.CONF.set_override('dns_domain', DNSDOMAIN)
net, port, dns_data_db = self._create_port_for_test()
original_ips = [ip['ip_address'] for ip in port['fixed_ips']]
ctx = context.get_admin_context()
kwargs = {'fixed_ips': []}
for ip in port['fixed_ips']:
# Since this tests using an "any" IP allocation to update the port
# IP address, change the allocation pools so that IPAM won't ever
# give us back the IP address we originally had.
subnet = self.plugin.get_subnet(ctx, ip['subnet_id'])
ip_net = netaddr.IPNetwork(subnet['cidr'])
subnet_size = ip_net.last - ip_net.first
subnet_mid_point = int(ip_net.first + subnet_size / 2)
start, end = (netaddr.IPAddress(subnet_mid_point + 1),
netaddr.IPAddress(ip_net.last - 1))
allocation_pools = [{'start': str(start), 'end': str(end)}]
body = {'allocation_pools': allocation_pools}
req = self.new_update_request('subnets', {'subnet': body},
ip['subnet_id'])
req.get_response(self.api)
kwargs['fixed_ips'].append(
{'subnet_id': ip['subnet_id']})
port, dns_data_db = self._update_port_for_test(port,
new_dns_name=None,
**kwargs)

View File

@ -599,7 +599,9 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
def test_update_port_fixed_ip_changed(self):
ctx = context.get_admin_context()
plugin = manager.NeutronManager.get_plugin()
with self.port() as port, mock.patch.object(
fixed_ip_data = [{'ip_address': '10.0.0.4'}]
with self.port(fixed_ips=fixed_ip_data) as port,\
mock.patch.object(
plugin.notifier,
'security_groups_member_updated') as sg_member_update:
port['port']['fixed_ips'][0]['ip_address'] = '10.0.0.3'
@ -869,14 +871,20 @@ class TestMl2PortsV2(test_plugin.TestPortsV2, Ml2PluginV2TestCase):
self._test_operation_resillient_to_ipallocation_failure(make_port)
def test_port_update_resillient_to_duplicate_records(self):
with self.port() as p:
data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.9'}]}}
req = self.new_update_request('ports', data, p['port']['id'])
cidr = '10.0.0.0/24'
allocation_pools = [{'start': '10.0.0.2', 'end': '10.0.0.8'}]
with self.subnet(cidr=cidr,
allocation_pools=allocation_pools) as subnet:
with self.port(subnet=subnet) as p:
data = {'port': {'fixed_ips': [{'ip_address': '10.0.0.9'}]}}
req = self.new_update_request('ports', data, p['port']['id'])
def do_request():
self.assertEqual(200, req.get_response(self.api).status_int)
def do_request():
self.assertEqual(200,
req.get_response(self.api).status_int)
self._test_operation_resillient_to_ipallocation_failure(do_request)
self._test_operation_resillient_to_ipallocation_failure(
do_request)
def _test_operation_resillient_to_ipallocation_failure(self, func):
from sqlalchemy import event