Merge "Use last address in v6 allocation pool generation"

This commit is contained in:
Jenkins 2015-06-30 00:33:33 +00:00 committed by Gerrit Code Review
commit bbefa3136e
7 changed files with 134 additions and 48 deletions

View File

@ -13,8 +13,6 @@
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from oslo_config import cfg
from oslo_log import log as logging
from sqlalchemy.orm import exc
@ -25,7 +23,6 @@ from neutron.common import exceptions as n_exc
from neutron.common import utils
from neutron.db import common_db_mixin
from neutron.db import models_v2
from neutron.ipam import utils as ipam_utils
LOG = logging.getLogger(__name__)
@ -75,14 +72,6 @@ class DbBasePluginCommon(common_db_mixin.CommonDbMixin):
)
context.session.add(allocated)
@classmethod
def _check_gateway_in_subnet(cls, cidr, gateway):
"""Validate that the gateway is on the subnet."""
ip = netaddr.IPAddress(gateway)
if ip.version == 4 or (ip.version == 6 and not ip.is_link_local()):
return ipam_utils.check_subnet_ip(cidr, gateway)
return True
def _make_subnet_dict(self, subnet, fields=None):
res = {'id': subnet['id'],
'name': subnet['name'],

View File

@ -355,7 +355,7 @@ class NeutronDbPluginV2(ipam_non_pluggable_backend.IpamNonPluggableBackend,
if attributes.is_attr_set(s.get('gateway_ip')):
self._validate_ip_version(ip_ver, s['gateway_ip'], 'gateway_ip')
if (cfg.CONF.force_gateway_on_subnet and
not self._check_gateway_in_subnet(
not ipam.utils.check_gateway_in_subnet(
s['cidr'], s['gateway_ip'])):
error_message = _("Gateway is not valid on subnet")
raise n_exc.InvalidInput(error_message=error_message)

View File

@ -27,6 +27,7 @@ from neutron.common import ipv6_utils
from neutron.db import db_base_plugin_common
from neutron.db import models_v2
from neutron.i18n import _LI
from neutron.ipam import utils as ipam_utils
LOG = logging.getLogger(__name__)
@ -174,24 +175,10 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
a list of dict objects with 'start' and 'end' keys for
defining the pool range.
"""
pools = []
# Auto allocate the pool around gateway_ip
net = netaddr.IPNetwork(subnet['cidr'])
first_ip = net.first + 1
last_ip = net.last - 1
gw_ip = int(netaddr.IPAddress(subnet['gateway_ip'] or net.last))
# Use the gw_ip to find a point for splitting allocation pools
# for this subnet
split_ip = min(max(gw_ip, net.first), net.last)
if split_ip > first_ip:
pools.append({'start': str(netaddr.IPAddress(first_ip)),
'end': str(netaddr.IPAddress(split_ip - 1))})
if split_ip < last_ip:
pools.append({'start': str(netaddr.IPAddress(split_ip + 1)),
'end': str(netaddr.IPAddress(last_ip))})
# return auto-generated pools
# no need to check for their validity
return pools
pools = ipam_utils.generate_pools(subnet['cidr'], subnet['gateway_ip'])
return [{'start': str(netaddr.IPAddress(pool.first)),
'end': str(netaddr.IPAddress(pool.last))}
for pool in pools]
def _validate_subnet_cidr(self, context, network, new_subnet_cidr):
"""Validate the CIDR for a subnet.
@ -251,7 +238,8 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
"""
subnet = netaddr.IPNetwork(subnet_cidr)
subnet_first_ip = netaddr.IPAddress(subnet.first + 1)
subnet_last_ip = netaddr.IPAddress(subnet.last - 1)
# last address is broadcast in v4
subnet_last_ip = netaddr.IPAddress(subnet.last - (subnet.version == 4))
LOG.debug("Performing IP validity checks on allocation pools")
ip_sets = []

View File

@ -21,28 +21,38 @@ def check_subnet_ip(cidr, ip_address):
ip = netaddr.IPAddress(ip_address)
net = netaddr.IPNetwork(cidr)
# Check that the IP is valid on subnet. This cannot be the
# network or the broadcast address
return (ip != net.network and ip != net.broadcast
# network or the broadcast address (which exists only in IPv4)
return (ip != net.network
and (net.version == 6 or ip != net.broadcast)
and net.netmask & ip == net.network)
def check_gateway_in_subnet(cidr, gateway):
"""Validate that the gateway is on the subnet."""
ip = netaddr.IPAddress(gateway)
if ip.version == 4 or (ip.version == 6 and not ip.is_link_local()):
return check_subnet_ip(cidr, gateway)
return True
def generate_pools(cidr, gateway_ip):
"""Create IP allocation pools for a specified subnet
The Neutron API defines a subnet's allocation pools as a list of
IPRange objects for defining the pool range.
"""
pools = []
# Auto allocate the pool around gateway_ip
net = netaddr.IPNetwork(cidr)
if net.first == net.last:
# handle single address subnet case
return [netaddr.IPRange(net.first, net.last)]
first_ip = net.first + 1
last_ip = net.last - 1
gw_ip = int(netaddr.IPAddress(gateway_ip or net.last))
# Use the gw_ip to find a point for splitting allocation pools
# for this subnet
split_ip = min(max(gw_ip, net.first), net.last)
if split_ip > first_ip:
pools.append(netaddr.IPRange(first_ip, split_ip - 1))
if split_ip < last_ip:
pools.append(netaddr.IPRange(split_ip + 1, last_ip))
return pools
# last address is broadcast in v4
last_ip = net.last - (net.version == 4)
if first_ip >= last_ip:
# /31 lands here
return []
ipset = netaddr.IPSet(netaddr.IPRange(first_ip, last_ip))
if gateway_ip:
ipset.remove(netaddr.IPAddress(gateway_ip))
return list(ipset.iter_ipranges())

View File

@ -1344,7 +1344,7 @@ fixed_ips=ip_address%%3D%s&fixed_ips=ip_address%%3D%s&fixed_ips=subnet_id%%3D%s
data['port']['fixed_ips'])
def test_no_more_port_exception(self):
with self.subnet(cidr='10.0.0.0/32', enable_dhcp=False) as subnet:
with self.subnet(cidr='10.0.0.0/31', enable_dhcp=False) as subnet:
id = subnet['subnet']['network_id']
res = self._create_port(self.fmt, id)
data = self.deserialize(self.fmt, res)
@ -3366,9 +3366,9 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
def test_create_subnet_ipv6_gw_values(self):
cidr = '2001::/64'
# Gateway is last IP in IPv6 DHCPv6 stateful subnet
gateway = '2001::ffff:ffff:ffff:fffe'
gateway = '2001::ffff:ffff:ffff:ffff'
allocation_pools = [{'start': '2001::1',
'end': '2001::ffff:ffff:ffff:fffd'}]
'end': '2001::ffff:ffff:ffff:fffe'}]
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
@ -3379,7 +3379,7 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
# Gateway is first IP in IPv6 DHCPv6 stateful subnet
gateway = '2001::1'
allocation_pools = [{'start': '2001::2',
'end': '2001::ffff:ffff:ffff:fffe'}]
'end': '2001::ffff:ffff:ffff:ffff'}]
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}

View File

@ -146,6 +146,23 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
self.assertEqual(detail.gateway_ip,
netaddr.IPAddress('10.1.2.254'))
def test_allocate_specific_ipv6_subnet_specific_gateway(self):
# Same scenario as described in bug #1466322
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
['2210::/64'],
64, 6)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'2210::/64',
'2210::ffff:ffff:ffff:ffff')
res = sa.allocate_subnet(req)
detail = res.get_details()
self.assertEqual(detail.gateway_ip,
netaddr.IPAddress('2210::ffff:ffff:ffff:ffff'))
def test__allocation_value_for_tenant_no_allocations(self):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
['10.1.0.0/16', '192.168.1.0/24'],

View File

@ -0,0 +1,82 @@
# Copyright (c) 2015 Mirantis, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutron.ipam import utils
from neutron.tests import base
class TestIpamUtils(base.BaseTestCase):
def test_check_subnet_ip_v4_network(self):
self.assertFalse(utils.check_subnet_ip('1.1.1.0/24', '1.1.1.0'))
def test_check_subnet_ip_v4_broadcast(self):
self.assertFalse(utils.check_subnet_ip('1.1.1.0/24', '1.1.1.255'))
def test_check_subnet_ip_v4_valid(self):
self.assertTrue(utils.check_subnet_ip('1.1.1.0/24', '1.1.1.1'))
self.assertTrue(utils.check_subnet_ip('1.1.1.0/24', '1.1.1.254'))
def test_check_subnet_ip_v6_network(self):
self.assertFalse(utils.check_subnet_ip('F111::0/64', 'F111::0'))
def test_check_subnet_ip_v6_valid(self):
self.assertTrue(utils.check_subnet_ip('F111::0/64', 'F111::1'))
self.assertTrue(utils.check_subnet_ip('F111::0/64',
'F111::FFFF:FFFF:FFFF:FFFF'))
def test_generate_pools_v4_nogateway(self):
cidr = '192.168.0.0/24'
expected = [netaddr.IPRange('192.168.0.1', '192.168.0.254')]
self.assertEqual(expected, utils.generate_pools(cidr, None))
def test_generate_pools_v4_gateway_first(self):
cidr = '192.168.0.0/24'
gateway = '192.168.0.1'
expected = [netaddr.IPRange('192.168.0.2', '192.168.0.254')]
self.assertEqual(expected, utils.generate_pools(cidr, gateway))
def test_generate_pools_v4_gateway_last(self):
cidr = '192.168.0.0/24'
gateway = '192.168.0.254'
expected = [netaddr.IPRange('192.168.0.1', '192.168.0.253')]
self.assertEqual(expected, utils.generate_pools(cidr, gateway))
def test_generate_pools_v4_32(self):
# 32 is special because it should have 1 usable address
cidr = '192.168.0.0/32'
expected = [netaddr.IPRange('192.168.0.0', '192.168.0.0')]
self.assertEqual(expected, utils.generate_pools(cidr, None))
def test_generate_pools_v4_31(self):
cidr = '192.168.0.0/31'
expected = []
self.assertEqual(expected, utils.generate_pools(cidr, None))
def test_generate_pools_v4_gateway_middle(self):
cidr = '192.168.0.0/24'
gateway = '192.168.0.128'
expected = [netaddr.IPRange('192.168.0.1', '192.168.0.127'),
netaddr.IPRange('192.168.0.129', '192.168.0.254')]
self.assertEqual(expected, utils.generate_pools(cidr, gateway))
def test_generate_pools_v6_nogateway(self):
# other than the difference in the last address, the rest of the
# logic is the same as v4 so we only need one test
cidr = 'F111::0/64'
expected = [netaddr.IPRange('F111::1', 'F111::FFFF:FFFF:FFFF:FFFF')]
self.assertEqual(expected, utils.generate_pools(cidr, None))