From 663c727f993106f654f8087f9d5122cee94bf466 Mon Sep 17 00:00:00 2001 From: Nate Johnston Date: Mon, 25 Mar 2019 10:29:23 -0400 Subject: [PATCH] Allow first address in an IPv6 subnet as valid unicast When looking at the RFC [1], there's no mention that this can't be the gateway address. Permit it. [1] https://tools.ietf.org/html/rfc4291#section-2.6.1 Change-Id: I3f2905c2c4fca02406dfa3c801c166c14389ba41 Fixes-Bug: #1682094 (cherry picked from commit 1916bc5c0627ca24e312ca55ea62c8df7eb8f158) --- neutron/db/ipam_backend_mixin.py | 11 ++- neutron/db/ipam_pluggable_backend.py | 1 + neutron/ipam/utils.py | 27 ++++-- .../tests/unit/db/test_db_base_plugin_v2.py | 90 ++++++++++++++----- .../unit/db/test_ipam_pluggable_backend.py | 1 + neutron/tests/unit/ipam/test_utils.py | 2 +- ...rst_ip_address_valid-cd94b47bdcc642cf.yaml | 6 ++ 7 files changed, 106 insertions(+), 32 deletions(-) create mode 100644 releasenotes/notes/ipv6_first_ip_address_valid-cd94b47bdcc642cf.yaml diff --git a/neutron/db/ipam_backend_mixin.py b/neutron/db/ipam_backend_mixin.py index 3b39387491d..c4145e6b957 100644 --- a/neutron/db/ipam_backend_mixin.py +++ b/neutron/db/ipam_backend_mixin.py @@ -58,7 +58,10 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon): @staticmethod def _gateway_ip_str(subnet, cidr_net): if subnet.get('gateway_ip') is const.ATTR_NOT_SPECIFIED: - return str(netaddr.IPNetwork(cidr_net).network + 1) + if subnet.get('version') == const.IP_VERSION_6: + return str(netaddr.IPNetwork(cidr_net).network) + else: + return str(netaddr.IPNetwork(cidr_net).network + 1) return subnet.get('gateway_ip') @staticmethod @@ -375,7 +378,8 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon): # Ensure that the IP is valid on the subnet if ('ip_address' in fixed and not ipam_utils.check_subnet_ip(subnet['cidr'], - fixed['ip_address'])): + fixed['ip_address'], + fixed['device_owner'])): raise exc.InvalidIpForSubnet(ip_address=fixed['ip_address']) return subnet @@ -385,7 +389,8 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon): for subnet in subnets: if ipam_utils.check_subnet_ip(subnet['cidr'], - fixed['ip_address']): + fixed['ip_address'], + fixed['device_owner']): return subnet raise exc.InvalidIpForNetwork(ip_address=fixed['ip_address']) diff --git a/neutron/db/ipam_pluggable_backend.py b/neutron/db/ipam_pluggable_backend.py index 7ad83b0daf2..893e8ee2f7b 100644 --- a/neutron/db/ipam_pluggable_backend.py +++ b/neutron/db/ipam_pluggable_backend.py @@ -262,6 +262,7 @@ class IpamPluggableBackend(ipam_backend_mixin.IpamBackendMixin): """ fixed_ip_list = [] for fixed in fixed_ips: + fixed['device_owner'] = device_owner subnet = self._get_subnet_for_fixed_ip(context, fixed, subnets) is_auto_addr_subnet = ipv6_utils.is_auto_address_subnet(subnet) diff --git a/neutron/ipam/utils.py b/neutron/ipam/utils.py index c3097ab330c..e8d520deae4 100644 --- a/neutron/ipam/utils.py +++ b/neutron/ipam/utils.py @@ -16,16 +16,27 @@ import netaddr from neutron_lib import constants +from neutron.common import constants as n_const -def check_subnet_ip(cidr, ip_address): + +def check_subnet_ip(cidr, ip_address, port_owner=None): """Validate that the IP address is on the subnet.""" ip = netaddr.IPAddress(ip_address) net = netaddr.IPNetwork(cidr) - # Check that the IP is valid on subnet. This cannot be the - # network or the broadcast address (which exists only in IPv4) - return (ip != net.network and - (net.version == 6 or ip != net[-1]) and - net.netmask & ip == net.network) + # Check that the IP is valid on subnet. In IPv4 this cannot be the + # network or the broadcast address + if net.version == constants.IP_VERSION_6: + # NOTE(njohnston): In some cases the code cannot know the owner of the + # port. In these cases port_owner should be None, and we pass it + # through here. + return ((port_owner in n_const.ROUTER_PORT_OWNERS or + port_owner is None or + ip != net.network) and + net.netmask & ip == net.network) + else: + return (ip != net.network and + ip != net[-1] and + net.netmask & ip == net.network) def check_gateway_invalid_in_subnet(cidr, gateway): @@ -38,8 +49,8 @@ def check_gateway_invalid_in_subnet(cidr, gateway): # If gateway is out of subnet, there is no way to # check since we don't have gateway's subnet cidr. return (ip in net and - (ip == net.network or - (net.version == constants.IP_VERSION_4 and ip == net[-1]))) + (net.version == constants.IP_VERSION_4 and + ip in (net.network, net[-1]))) def generate_pools(cidr, gateway_ip): diff --git a/neutron/tests/unit/db/test_db_base_plugin_v2.py b/neutron/tests/unit/db/test_db_base_plugin_v2.py index faa0587e948..6debc6bba07 100644 --- a/neutron/tests/unit/db/test_db_base_plugin_v2.py +++ b/neutron/tests/unit/db/test_db_base_plugin_v2.py @@ -814,6 +814,15 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): self.assertEqual(expected_res[k], observed_res[res_name][k]) def _validate_resource(self, resource, keys, res_name): + ipv6_zero_gateway = False + ipv6_null_gateway = False + if res_name == 'subnet': + attrs = resource[res_name] + if not attrs['gateway_ip']: + ipv6_null_gateway = True + elif (attrs['ip_version'] is constants.IP_VERSION_6 and + attrs['gateway_ip'][-2:] == "::"): + ipv6_zero_gateway = True for k in keys: self.assertIn(k, resource[res_name]) if isinstance(keys[k], list): @@ -821,7 +830,12 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase): sorted(keys[k], key=helpers.safe_sort_key), sorted(resource[res_name][k], key=helpers.safe_sort_key)) else: - self.assertEqual(keys[k], resource[res_name][k]) + if not ipv6_null_gateway: + if (k == 'gateway_ip' and ipv6_zero_gateway and + keys[k][-3:] == "::0"): + self.assertEqual(keys[k][:-1], resource[res_name][k]) + else: + self.assertEqual(keys[k], resource[res_name][k]) class TestBasicGet(NeutronDbPluginV2TestCase): @@ -4267,23 +4281,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): self.assertEqual(cidr, subnet['subnet']['cidr']) - def test_create_subnet_ipv6_gw_is_nw_addr_returns_400(self): - gateway_ip = '2001::0' - cidr = '2001::/64' - - with testlib_api.ExpectedException( - webob.exc.HTTPClientError) as ctx_manager: - self._test_create_subnet( - gateway_ip=gateway_ip, cidr=cidr, - ip_version=constants.IP_VERSION_6, - ipv6_ra_mode=constants.DHCPV6_STATEFUL, - ipv6_address_mode=constants.DHCPV6_STATEFUL) - self.assertEqual(webob.exc.HTTPClientError.code, - ctx_manager.exception.code) - - def test_create_subnet_ipv6_gw_is_nw_end_addr_returns_201(self): - gateway_ip = '2001::ffff' - cidr = '2001::/112' + def _create_subnet_ipv6_gw(self, gateway_ip, cidr): subnet = self._test_create_subnet( gateway_ip=gateway_ip, cidr=cidr, ip_version=constants.IP_VERSION_6, @@ -4291,11 +4289,30 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): ipv6_address_mode=constants.DHCPV6_STATEFUL) self.assertEqual(constants.IP_VERSION_6, subnet['subnet']['ip_version']) - self.assertEqual(gateway_ip, - subnet['subnet']['gateway_ip']) + if gateway_ip and gateway_ip[-3:] == '::0': + self.assertEqual(gateway_ip[:-1], + subnet['subnet']['gateway_ip']) + else: + self.assertEqual(gateway_ip, + subnet['subnet']['gateway_ip']) self.assertEqual(cidr, subnet['subnet']['cidr']) + def test_create_subnet_ipv6_gw_is_nw_start_addr(self): + gateway_ip = '2001::0' + cidr = '2001::/64' + self._create_subnet_ipv6_gw(gateway_ip, cidr) + + def test_create_subnet_ipv6_gw_is_nw_start_addr_canonicalize(self): + gateway_ip = '2001::' + cidr = '2001::/64' + self._create_subnet_ipv6_gw(gateway_ip, cidr) + + def test_create_subnet_ipv6_gw_is_nw_end_addr(self): + gateway_ip = '2001::ffff' + cidr = '2001::/112' + self._create_subnet_ipv6_gw(gateway_ip, cidr) + def test_create_subnet_ipv6_out_of_cidr_lla(self): gateway_ip = 'fe80::1' cidr = '2001::/64' @@ -4305,6 +4322,39 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase): ipv6_ra_mode=constants.IPV6_SLAAC, ipv6_address_mode=constants.IPV6_SLAAC) + def test_create_subnet_ipv6_first_ip_owned_by_router(self): + cidr = '2001::/64' + with self.network() as network: + net_id = network['network']['id'] + with self.subnet(network=network, + ip_version=constants.IP_VERSION_6, + cidr=cidr) as subnet: + fixed_ip = [{'subnet_id': subnet['subnet']['id'], + 'ip_address': '2001::'}] + kwargs = {'fixed_ips': fixed_ip, + 'tenant_id': 'tenant_id', + 'device_id': 'fake_device', + 'device_owner': constants.DEVICE_OWNER_ROUTER_GW} + res = self._create_port(self.fmt, net_id=net_id, **kwargs) + self.assertEqual(webob.exc.HTTPCreated.code, res.status_int) + + def test_create_subnet_ipv6_first_ip_owned_by_non_router(self): + cidr = '2001::/64' + with self.network() as network: + net_id = network['network']['id'] + with self.subnet(network=network, + ip_version=constants.IP_VERSION_6, + cidr=cidr) as subnet: + fixed_ip = [{'subnet_id': subnet['subnet']['id'], + 'ip_address': '2001::'}] + kwargs = {'fixed_ips': fixed_ip, + 'tenant_id': 'tenant_id', + 'device_id': 'fake_device', + 'device_owner': 'fake_owner'} + res = self._create_port(self.fmt, net_id=net_id, **kwargs) + self.assertEqual(webob.exc.HTTPClientError.code, + res.status_int) + def test_create_subnet_ipv6_attributes_no_dhcp_enabled(self): gateway_ip = 'fe80::1' cidr = 'fe80::/64' diff --git a/neutron/tests/unit/db/test_ipam_pluggable_backend.py b/neutron/tests/unit/db/test_ipam_pluggable_backend.py index 80ef3b6a1a1..c17b054da6c 100644 --- a/neutron/tests/unit/db/test_ipam_pluggable_backend.py +++ b/neutron/tests/unit/db/test_ipam_pluggable_backend.py @@ -730,6 +730,7 @@ class TestDbBasePluginIpam(test_db_base.NeutronDbPluginV2TestCase): port_dict_with_id = port_dict['port'].copy() port_dict_with_id['id'] = port_id # Validate port id is added to port dict before address_factory call + ip_dict.pop('device_owner') address_factory.get_request.assert_called_once_with(context, port_dict_with_id, ip_dict) diff --git a/neutron/tests/unit/ipam/test_utils.py b/neutron/tests/unit/ipam/test_utils.py index db2ee9c5135..50dd565ebdd 100644 --- a/neutron/tests/unit/ipam/test_utils.py +++ b/neutron/tests/unit/ipam/test_utils.py @@ -32,7 +32,7 @@ class TestIpamUtils(base.BaseTestCase): self.assertTrue(utils.check_subnet_ip('1.1.1.0/24', '1.1.1.254')) def test_check_subnet_ip_v6_network(self): - self.assertFalse(utils.check_subnet_ip('F111::0/64', 'F111::0')) + self.assertTrue(utils.check_subnet_ip('F111::0/64', 'F111::0')) def test_check_subnet_ip_v6_valid(self): self.assertTrue(utils.check_subnet_ip('F111::0/64', 'F111::1')) diff --git a/releasenotes/notes/ipv6_first_ip_address_valid-cd94b47bdcc642cf.yaml b/releasenotes/notes/ipv6_first_ip_address_valid-cd94b47bdcc642cf.yaml new file mode 100644 index 00000000000..a22ebd75aa0 --- /dev/null +++ b/releasenotes/notes/ipv6_first_ip_address_valid-cd94b47bdcc642cf.yaml @@ -0,0 +1,6 @@ +--- +upgrade: + - | + The first address in an IPv6 network is now a valid, usable IP for routers. + It had previously been reserved, but now can be assigned to a router so + that an IPv6 address ending in "::" could be a valid default route.