IPAM reference driver

An alternate pluggable IPAM implementation from the built-in one
in db_base_plugin_v2.
Modifies IPAM interface to allow passing context to driver and
introduces new interface method 'associate_neutron_subnet'.

Implements blueprint reference-ipam-driver

Change-Id: I2e1e9fc7994bf1157bcd34b7ea500eb30c61d9ab
This commit is contained in:
Salvatore Orlando 2015-01-16 10:00:42 -08:00 committed by Hosung Hwang
parent 41e289af36
commit b892df1126
23 changed files with 1781 additions and 100 deletions

View File

@ -69,3 +69,11 @@ def is_auto_address_subnet(subnet):
modes = [constants.IPV6_SLAAC, constants.DHCPV6_STATELESS]
return (subnet['ipv6_address_mode'] in modes
or subnet['ipv6_ra_mode'] in modes)
def is_eui64_address(ip_address):
"""Check if ip address is EUI64."""
ip = netaddr.IPAddress(ip_address)
# '0xfffe' addition is used to build EUI-64 from MAC (RFC4291)
# Look for it in the middle of the EUI-64 part of address
return ip.version == 6 and not ((ip & 0xffff000000) ^ 0xfffe000000)

View File

@ -38,6 +38,7 @@ from neutron.extensions import l3
from neutron.i18n import _LE, _LI
from neutron import ipam
from neutron.ipam import subnet_alloc
from neutron.ipam import utils as ipam_utils
from neutron import manager
from neutron import neutron_plugin_base_v2
from neutron.openstack.common import uuidutils
@ -332,22 +333,9 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
"""Validate that the gateway is on the subnet."""
ip = netaddr.IPAddress(gateway)
if ip.version == 4 or (ip.version == 6 and not ip.is_link_local()):
return cls._check_subnet_ip(cidr, gateway)
return ipam_utils.check_subnet_ip(cidr, gateway)
return True
@classmethod
def _check_subnet_ip(cls, cidr, ip_address):
"""Validate that the IP address is on the subnet."""
ip = netaddr.IPAddress(ip_address)
net = netaddr.IPNetwork(cidr)
# Check that the IP is valid on subnet. This cannot be the
# network or the broadcast address
if (ip != net.network and
ip != net.broadcast and
net.netmask & ip == net.network):
return True
return False
@staticmethod
def _check_ip_in_allocation_pool(context, subnet_id, gateway_ip,
ip_address):
@ -395,8 +383,8 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
filter = {'network_id': [network_id]}
subnets = self.get_subnets(context, filters=filter)
for subnet in subnets:
if self._check_subnet_ip(subnet['cidr'],
fixed['ip_address']):
if ipam_utils.check_subnet_ip(subnet['cidr'],
fixed['ip_address']):
found = True
subnet_id = subnet['id']
break
@ -425,8 +413,8 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
# Ensure that the IP is valid on the subnet
if (not found and
not self._check_subnet_ip(subnet['cidr'],
fixed['ip_address'])):
not ipam_utils.check_subnet_ip(subnet['cidr'],
fixed['ip_address'])):
raise n_exc.InvalidIpForSubnet(
ip_address=fixed['ip_address'])
if (is_auto_addr_subnet and
@ -1228,10 +1216,10 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
'name': subnet['name'],
'network_id': subnet['network_id'],
'ip_version': subnet['ip_version'],
'cidr': str(detail.subnet.cidr),
'cidr': str(detail.subnet_cidr),
'subnetpool_id': subnetpool_id,
'enable_dhcp': subnet['enable_dhcp'],
'gateway_ip': self._gateway_ip_str(subnet, detail.subnet),
'gateway_ip': self._gateway_ip_str(subnet, detail.subnet_cidr),
'shared': shared}
if subnet['ip_version'] == 6 and subnet['enable_dhcp']:
if attributes.is_attr_set(subnet['ipv6_ra_mode']):
@ -1290,10 +1278,10 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
raise n_exc.BadRequest(resource='subnets', msg=reason)
network = self._get_network(context, s["network_id"])
allocator = subnet_alloc.SubnetAllocator(subnetpool)
allocator = subnet_alloc.SubnetAllocator(subnetpool, context)
req = self._make_subnet_request(tenant_id, s, subnetpool)
ipam_subnet = allocator.allocate_subnet(context.session, req)
ipam_subnet = allocator.allocate_subnet(req)
detail = ipam_subnet.get_details()
subnet = self._save_subnet(context,
network,

View File

@ -0,0 +1,72 @@
# Copyright 2015 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""neutrodb_ipam
Revision ID: 599c6a226151
Revises: 354db87e3225
Create Date: 2015-03-08 18:12:08.962378
"""
# revision identifiers, used by Alembic.
revision = '599c6a226151'
down_revision = '354db87e3225'
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
'ipamsubnets',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('neutron_subnet_id', sa.String(length=36), nullable=True),
sa.PrimaryKeyConstraint('id'))
op.create_table(
'ipamallocations',
sa.Column('ip_address', sa.String(length=64), nullable=False),
sa.Column('status', sa.String(length=36), nullable=True),
sa.Column('ipam_subnet_id', sa.String(length=36), nullable=False),
sa.ForeignKeyConstraint(['ipam_subnet_id'],
['ipamsubnets.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('ip_address', 'ipam_subnet_id'))
op.create_table(
'ipamallocationpools',
sa.Column('id', sa.String(length=36), nullable=False),
sa.Column('ipam_subnet_id', sa.String(length=36), nullable=False),
sa.Column('first_ip', sa.String(length=64), nullable=False),
sa.Column('last_ip', sa.String(length=64), nullable=False),
sa.ForeignKeyConstraint(['ipam_subnet_id'],
['ipamsubnets.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('id'))
op.create_table(
'ipamavailabilityranges',
sa.Column('allocation_pool_id', sa.String(length=36), nullable=False),
sa.Column('first_ip', sa.String(length=64), nullable=False),
sa.Column('last_ip', sa.String(length=64), nullable=False),
sa.ForeignKeyConstraint(['allocation_pool_id'],
['ipamallocationpools.id'],
ondelete='CASCADE'),
sa.PrimaryKeyConstraint('allocation_pool_id', 'first_ip', 'last_ip'),
sa.Index('ix_ipamavailabilityranges_first_ip_allocation_pool_id',
'first_ip', 'allocation_pool_id'),
sa.Index('ix_ipamavailabilityranges_last_ip_allocation_pool_id',
'last_ip', 'allocation_pool_id'))

View File

@ -1 +1 @@
354db87e3225
599c6a226151

View File

@ -42,6 +42,7 @@ from neutron.db import portsecurity_db # noqa
from neutron.db import quota_db # noqa
from neutron.db import securitygroups_db # noqa
from neutron.db import servicetype_db # noqa
from neutron.ipam.drivers.neutrondb_ipam import db_models # noqa
from neutron.plugins.bigswitch.db import consistency_db # noqa
from neutron.plugins.bigswitch import routerrule_db # noqa
from neutron.plugins.brocade.db import models as brocade_models # noqa

View File

@ -13,9 +13,12 @@
import abc
import netaddr
from oslo_config import cfg
import six
from neutron.common import constants
from neutron.common import ipv6_utils
from neutron.ipam import exceptions as ipam_exc
@six.add_metaclass(abc.ABCMeta)
@ -37,8 +40,8 @@ class SubnetRequest(object):
:param tenant_id: The tenant id who will own the subnet
:type tenant_id: str uuid
:param subnet_id: Neutron's subnet id
:type subnet_id: str uuid
:param subnet_id: Neutron's subnet ID
:type subnet_id: srt uuid
:param gateway_ip: An IP to reserve for the subnet gateway.
:type gateway_ip: None or convertible to netaddr.IPAddress
:param allocation_pools: The pool from which IPAM should allocate
@ -96,16 +99,19 @@ class SubnetRequest(object):
def allocation_pools(self):
return self._allocation_pools
def _validate_with_subnet(self, subnet):
if self.gateway_ip:
if self.gateway_ip not in subnet:
raise ValueError("gateway_ip is not in the subnet")
def _validate_with_subnet(self, subnet_cidr):
if self.gateway_ip and cfg.CONF.force_gateway_on_subnet:
gw_ip = netaddr.IPAddress(self.gateway_ip)
if (gw_ip.version == 4 or (gw_ip.version == 6
and not gw_ip.is_link_local())):
if self.gateway_ip not in subnet_cidr:
raise ValueError("gateway_ip is not in the subnet")
if self.allocation_pools:
if subnet.version != self.allocation_pools[0].version:
if subnet_cidr.version != self.allocation_pools[0].version:
raise ValueError("allocation_pools use the wrong ip version")
for pool in self.allocation_pools:
if pool not in subnet:
if pool not in subnet_cidr:
raise ValueError("allocation_pools are not in the subnet")
@ -151,7 +157,7 @@ class SpecificSubnetRequest(SubnetRequest):
allocation, even overlapping ones. This can be expanded on by future
blueprints.
"""
def __init__(self, tenant_id, subnet_id, subnet,
def __init__(self, tenant_id, subnet_id, subnet_cidr,
gateway_ip=None, allocation_pools=None):
"""
:param subnet: The subnet requested. Can be IPv4 or IPv6. However,
@ -165,16 +171,16 @@ class SpecificSubnetRequest(SubnetRequest):
gateway_ip=gateway_ip,
allocation_pools=allocation_pools)
self._subnet = netaddr.IPNetwork(subnet)
self._validate_with_subnet(self._subnet)
self._subnet_cidr = netaddr.IPNetwork(subnet_cidr)
self._validate_with_subnet(self._subnet_cidr)
@property
def subnet(self):
return self._subnet
def subnet_cidr(self):
return self._subnet_cidr
@property
def prefixlen(self):
return self._subnet.prefixlen
return self._subnet_cidr.prefixlen
@six.add_metaclass(abc.ABCMeta)
@ -201,5 +207,37 @@ class AnyAddressRequest(AddressRequest):
"""Used to request any available address from the pool."""
class AutomaticAddressRequest(SpecificAddressRequest):
"""Used to create auto generated addresses, such as EUI64"""
EUI64 = 'eui64'
def _generate_eui64_address(self, **kwargs):
if set(kwargs) != set(['prefix', 'mac']):
raise ipam_exc.AddressCalculationFailure(
address_type='eui-64',
reason='must provide exactly 2 arguments - cidr and MAC')
prefix = kwargs['prefix']
mac_address = kwargs['mac']
return ipv6_utils.get_ipv6_addr_by_EUI64(prefix, mac_address)
_address_generators = {EUI64: _generate_eui64_address}
def __init__(self, address_type=EUI64, **kwargs):
"""
This constructor builds an automatic IP address. Parameter needed for
generating it can be passed as optional keyword arguments.
:param address_type: the type of address to generate.
It could be a eui-64 address, a random IPv6 address, or
a ipv4 link-local address.
For the Kilo release only eui-64 addresses will be supported.
"""
address_generator = self._address_generators.get(address_type)
if not address_generator:
raise ipam_exc.InvalidAddressType(address_type=address_type)
address = address_generator(self, **kwargs)
super(AutomaticAddressRequest, self).__init__(address)
class RouterGatewayAddressRequest(AddressRequest):
"""Used to request allocating the special router gateway address."""

View File

@ -14,6 +14,10 @@ import abc
import six
from oslo_log import log
LOG = log.getLogger(__name__)
@six.add_metaclass(abc.ABCMeta)
class Pool(object):
@ -22,20 +26,21 @@ class Pool(object):
There should be an instance of the driver for every subnet pool.
"""
def __init__(self, subnet_pool_id):
def __init__(self, subnetpool, context):
"""Initialize pool
:param subnet_pool_id: SubnetPool ID of the address space to use.
:type subnet_pool_id: str uuid
:param subnetpool: SubnetPool of the address space to use.
:type subnetpool: dict
"""
self._subnet_pool_id = subnet_pool_id
self._subnetpool = subnetpool
self._context = context
@classmethod
def get_instance(cls, subnet_pool_id):
def get_instance(cls, subnet_pool, context):
"""Returns an instance of the configured IPAM driver
:param subnet_pool_id: Subnet pool ID of the address space to use.
:type subnet_pool_id: str uuid
:param subnet_pool: Subnet pool of the address space to use.
:type subnet_pool: dict
:returns: An instance of Driver for the given subnet pool
"""
raise NotImplementedError
@ -121,3 +126,14 @@ class Subnet(object):
:returns: An instance of SpecificSubnetRequest with the subnet detail.
"""
@abc.abstractmethod
def associate_neutron_subnet(self, subnet_id):
"""Associate the IPAM subnet with a neutron subnet.
This operation should be performed to attach a neutron subnet to the
current subnet instance. In some cases IPAM subnets may be created
independently of neutron subnets and associated at a later stage.
:param subnet_id: neutron subnet identifier.
"""

View File

View File

@ -0,0 +1,218 @@
# Copyright 2015 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from oslo_log import log
from neutron.ipam.drivers.neutrondb_ipam import db_models
from neutron.openstack.common import uuidutils
LOG = log.getLogger(__name__)
# Database operations for Neutron's DB-backed IPAM driver
class IpamSubnetManager(object):
@classmethod
def load_by_neutron_subnet_id(cls, session, neutron_subnet_id):
return session.query(db_models.IpamSubnet).filter_by(
neutron_subnet_id=neutron_subnet_id).first()
def __init__(self, ipam_subnet_id, neutron_subnet_id):
self._ipam_subnet_id = ipam_subnet_id
self._neutron_subnet_id = neutron_subnet_id
@property
def neutron_id(self):
return self._neutron_subnet_id
def create(self, session):
"""Create database models for an IPAM subnet.
This method creates a subnet resource for the IPAM driver and
associates it with its neutron identifier, if specified.
:param session: database sesssion.
:returns: the idenfier of created IPAM subnet
"""
if not self._ipam_subnet_id:
self._ipam_subnet_id = uuidutils.generate_uuid()
ipam_subnet = db_models.IpamSubnet(
id=self._ipam_subnet_id,
neutron_subnet_id=self._neutron_subnet_id)
session.add(ipam_subnet)
return self._ipam_subnet_id
def associate_neutron_id(self, session, neutron_subnet_id):
session.query(db_models.IpamSubnet).filter_by(
id=self._ipam_subnet_id).update(
{'neutron_subnet_id': neutron_subnet_id})
self._neutron_subnet_id = neutron_subnet_id
def create_pool(self, session, pool_start, pool_end):
"""Create an allocation pool and availability ranges for the subnet.
This method does not perform any validation on parameters; it simply
persist data on the database.
:param pool_start: string expressing the start of the pool
:param pool_end: string expressing the end of the pool
:return: the newly created pool object.
"""
ip_pool = db_models.IpamAllocationPool(
ipam_subnet_id=self._ipam_subnet_id,
first_ip=pool_start,
last_ip=pool_end)
session.add(ip_pool)
ip_range = db_models.IpamAvailabilityRange(
allocation_pool=ip_pool,
first_ip=pool_start,
last_ip=pool_end)
session.add(ip_range)
return ip_pool
def delete_allocation_pools(self, session):
"""Remove all allocation pools for the current subnet.
:param session: database session
"""
session.query(db_models.IpamAllocationPool).filter_by(
ipam_subnet_id=self._ipam_subnet_id).delete()
def list_pools(self, session):
"""Return pools for the current subnet."""
return session.query(
db_models.IpamAllocationPool).filter_by(
ipam_subnet_id=self._ipam_subnet_id)
def _range_query(self, session, locking):
range_qry = session.query(
db_models.IpamAvailabilityRange).join(
db_models.IpamAllocationPool).filter_by(
ipam_subnet_id=self._ipam_subnet_id)
if locking:
range_qry = range_qry.with_lockmode('update')
return range_qry
def get_first_range(self, session, locking=False):
"""Return the first availability range for the subnet
:param session: database session
:param locking: specifies whether a write-intent lock should be
performed on the database operation
:return: first available range as instance of
neutron.ipam.drivers.neutrondb_ipam.db_models.IpamAvailabilityRange
"""
return self._range_query(session, locking).first()
def list_ranges_by_subnet_id(self, session, locking=False):
"""Return availability ranges for a given ipam subnet
:param session: database session
:param locking: specifies whether a write-intent lock should be
acquired with this database operation.
:return: list of availability ranges as instances of
neutron.ipam.drivers.neutrondb_ipam.db_models.IpamAvailabilityRange
"""
return self._range_query(session, locking)
def list_ranges_by_allocation_pool(self, session, allocation_pool_id,
locking=False):
"""Return availability ranges for a given pool.
:param session: database session
:param allocation_pool_id: allocation pool identifier
:param locking: specifies whether a write-intent lock should be
acquired with this database operation.
:return: list of availability ranges as instances of
neutron.ipam.drivers.neutrondb_ipam.db_models.IpamAvailabilityRange
"""
return session.query(
db_models.IpamAvailabilityRange).join(
db_models.IpamAllocationPool).filter_by(
id=allocation_pool_id)
def create_range(self, session, allocation_pool_id,
range_start, range_end):
"""Create an availabilty range for a given pool.
This method does not perform any validation on parameters; it simply
persist data on the database.
:param session: database session
:param allocation_pool_id: allocation pool identifier
:param range_start: first ip address in the range
:param range_end: last ip address in the range
:return: the newly created availability range as an instance of
neutron.ipam.drivers.neutrondb_ipam.db_models.IpamAvailabilityRange
"""
new_ip_range = db_models.IpamAvailabilityRange(
allocation_pool_id=allocation_pool_id,
first_ip=range_start,
last_ip=range_end)
session.add(new_ip_range)
return new_ip_range
def check_unique_allocation(self, session, ip_address):
"""Validate that the IP address on the subnet is not in use."""
iprequest = session.query(db_models.IpamAllocation).filter_by(
ipam_subnet_id=self._ipam_subnet_id, status='ALLOCATED',
ip_address=ip_address).first()
if iprequest:
return False
return True
def list_allocations(self, session, status='ALLOCATED', locking=False):
"""Return current allocations for the subnet.
:param session: database session
:param status: IP allocation status
:param locking: specifies whether a write-intent lock should be
performed on the database operation
:returns: a list of IP allocation as instance of
neutron.ipam.drivers.neutrondb_ipam.db_models.IpamAllocation
"""
ip_qry = session.query(
db_models.IpamAllocation).filter_by(
ipam_subnet_id=self._ipam_subnet_id,
status=status)
if locking:
ip_qry = ip_qry.with_lockmode('update')
return ip_qry
def create_allocation(self, session, ip_address,
status='ALLOCATED'):
"""Create an IP allocation entry.
:param session: database session
:param ip_address: the IP address to allocate
:param status: IP allocation status
"""
ip_request = db_models.IpamAllocation(
ip_address=ip_address,
status=status,
ipam_subnet_id=self._ipam_subnet_id)
session.add(ip_request)
def delete_allocation(self, session, ip_address):
"""Remove an IP allocation for this subnet.
:param session: database session
:param ip_address: IP address for which the allocation entry should
be removed.
"""
return session.query(db_models.IpamAllocation).filter_by(
ip_address=ip_address,
ipam_subnet_id=self._ipam_subnet_id).delete(
synchronize_session=False)

View File

@ -0,0 +1,111 @@
# Copyright 2015 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import sqlalchemy as sa
from sqlalchemy import orm as sa_orm
from neutron.db import model_base
from neutron.db import models_v2
# Database models used by the neutron DB IPAM driver
# NOTE(salv-orlando): This is meant to replace the class
# neutron.db.models_v2.IPAvailabilityRange.
class IpamAvailabilityRange(model_base.BASEV2):
"""Internal representation of available IPs for Neutron subnets.
Allocation - first entry from the range will be allocated.
If the first entry is equal to the last entry then this row
will be deleted.
Recycling ips involves reading the IPAllocationPool and IPAllocation tables
and inserting ranges representing available ips. This happens after the
final allocation is pulled from this table and a new ip allocation is
requested. Any contiguous ranges of available ips will be inserted as a
single range.
"""
allocation_pool_id = sa.Column(sa.String(36),
sa.ForeignKey('ipamallocationpools.id',
ondelete="CASCADE"),
nullable=False,
primary_key=True)
first_ip = sa.Column(sa.String(64), nullable=False, primary_key=True)
last_ip = sa.Column(sa.String(64), nullable=False, primary_key=True)
__table_args__ = (
sa.Index('ix_ipamavailabilityranges_first_ip_allocation_pool_id',
'first_ip', 'allocation_pool_id'),
sa.Index('ix_ipamavailabilityranges_last_ip_allocation_pool_id',
'last_ip', 'allocation_pool_id'),
model_base.BASEV2.__table_args__
)
def __repr__(self):
return "%s - %s" % (self.first_ip, self.last_ip)
# NOTE(salv-orlando): The following data model creates redundancy with
# models_v2.IPAllocationPool. This level of data redundancy could be tolerated
# considering that the following model is specific to the IPAM driver logic.
# It therefore represents an internal representation of a subnet allocation
# pool and can therefore change in the future, where as
# models_v2.IPAllocationPool is the representation of IP allocation pools in
# the management layer and therefore its evolution is subject to APIs backward
# compatibility policies
class IpamAllocationPool(model_base.BASEV2, models_v2.HasId):
"""Representation of an allocation pool in a Neutron subnet."""
ipam_subnet_id = sa.Column(sa.String(36),
sa.ForeignKey('ipamsubnets.id',
ondelete="CASCADE"),
nullable=False)
first_ip = sa.Column(sa.String(64), nullable=False)
last_ip = sa.Column(sa.String(64), nullable=False)
available_ranges = sa_orm.relationship(IpamAvailabilityRange,
backref='allocation_pool',
lazy="joined",
cascade='all, delete-orphan')
def __repr__(self):
return "%s - %s" % (self.first_ip, self.last_ip)
class IpamSubnet(model_base.BASEV2, models_v2.HasId):
"""Association between IPAM entities and neutron subnets.
For subnet data persistency - such as cidr and gateway IP, the IPAM
driver relies on Neutron's subnet model as source of truth to limit
data redundancy.
"""
neutron_subnet_id = sa.Column(sa.String(36),
nullable=True)
allocation_pools = sa_orm.relationship(IpamAllocationPool,
backref='subnet',
lazy="joined",
cascade='delete')
class IpamAllocation(model_base.BASEV2):
"""Model class for IP Allocation requests. """
ip_address = sa.Column(sa.String(64), nullable=False, primary_key=True)
status = sa.Column(sa.String(36))
# The subnet identifier is redundant but come handy for looking up
# IP addresses to remove.
ipam_subnet_id = sa.Column(sa.String(36),
sa.ForeignKey('ipamsubnets.id',
ondelete="CASCADE"),
primary_key=True,
nullable=False)

View File

@ -0,0 +1,438 @@
# Copyright 2015 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from oslo_log import log
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.db import api as db_api
from neutron.i18n import _LE
from neutron import ipam
from neutron.ipam import driver as ipam_base
from neutron.ipam.drivers.neutrondb_ipam import db_api as ipam_db_api
from neutron.ipam import exceptions as ipam_exc
from neutron.ipam import subnet_alloc
from neutron.ipam import utils as ipam_utils
from neutron import manager
from neutron.openstack.common import uuidutils
LOG = log.getLogger(__name__)
class NeutronDbSubnet(ipam_base.Subnet):
"""Manage IP addresses for Neutron DB IPAM driver.
This class implements the strategy for IP address allocation and
deallocation for the Neutron DB IPAM driver.
Allocation for IP addresses is based on the concept of availability
ranges, which were already used in Neutron's DB base class for handling
IPAM operations.
"""
@classmethod
def create_allocation_pools(cls, subnet_manager, session, pools):
for pool in pools:
subnet_manager.create_pool(
session,
netaddr.IPAddress(pool.first).format(),
netaddr.IPAddress(pool.last).format())
@classmethod
def create_from_subnet_request(cls, subnet_request, ctx):
ipam_subnet_id = uuidutils.generate_uuid()
subnet_manager = ipam_db_api.IpamSubnetManager(
ipam_subnet_id,
None)
# Create subnet resource
session = ctx.session
subnet_manager.create(session)
# If allocation pools are not specified, define them around
# the subnet's gateway IP
if not subnet_request.allocation_pools:
pools = ipam_utils.generate_pools(subnet_request.subnet_cidr,
subnet_request.gateway_ip)
else:
pools = subnet_request.allocation_pools
# Create IPAM allocation pools and availability ranges
cls.create_allocation_pools(subnet_manager, session, pools)
return cls(ipam_subnet_id,
ctx,
cidr=subnet_request.subnet_cidr,
allocation_pools=pools,
gateway_ip=subnet_request.gateway_ip,
tenant_id=subnet_request.tenant_id,
subnet_id=subnet_request.subnet_id,
subnet_id_not_set=True)
@classmethod
def load(cls, neutron_subnet_id, ctx):
"""Load an IPAM subnet from the database given its neutron ID.
:param neutron_subnet_id: neutron subnet identifier.
"""
ipam_subnet = ipam_db_api.IpamSubnetManager.load_by_neutron_subnet_id(
ctx.session, neutron_subnet_id)
if not ipam_subnet:
LOG.error(_LE("Unable to retrieve IPAM subnet as the referenced "
"Neutron subnet %s does not exist"),
neutron_subnet_id)
raise n_exc.SubnetNotFound(subnet_id=neutron_subnet_id)
pools = []
for pool in ipam_subnet.allocation_pools:
pools.append(netaddr.IPRange(pool['first_ip'], pool['last_ip']))
neutron_subnet = cls._fetch_subnet(ctx, neutron_subnet_id)
return cls(ipam_subnet['id'],
ctx,
cidr=neutron_subnet['cidr'],
allocation_pools=pools,
gateway_ip=neutron_subnet['gateway_ip'],
tenant_id=neutron_subnet['tenant_id'],
subnet_id=neutron_subnet_id)
@classmethod
def _fetch_subnet(cls, context, id):
plugin = manager.NeutronManager.get_plugin()
return plugin._get_subnet(context, id)
def __init__(self, internal_id, ctx, cidr=None,
allocation_pools=None, gateway_ip=None, tenant_id=None,
subnet_id=None, subnet_id_not_set=False):
# NOTE: In theory it could have been possible to grant the IPAM
# driver direct access to the database. While this is possible,
# it would have led to duplicate code and/or non-trivial
# refactorings in neutron.db.db_base_plugin_v2.
# This is because in the Neutron V2 plugin logic DB management is
# encapsulated within the plugin.
self._cidr = cidr
self._pools = allocation_pools
self._gateway_ip = gateway_ip
self._tenant_id = tenant_id
self._subnet_id = None if subnet_id_not_set else subnet_id
self.subnet_manager = ipam_db_api.IpamSubnetManager(internal_id,
self._subnet_id)
self._context = ctx
def _verify_ip(self, session, ip_address):
"""Verify whether IP address can be allocated on subnet.
:param session: database session
:param ip_address: String representing the IP address to verify
:raises: InvalidInput, IpAddressAlreadyAllocated
"""
# Ensure that the IP's are unique
if not self.subnet_manager.check_unique_allocation(session,
ip_address):
raise ipam_exc.IpAddressAlreadyAllocated(
subnet_id=self.subnet_manager.neutron_id,
ip=ip_address)
# Ensure that the IP is valid on the subnet
if not ipam_utils.check_subnet_ip(self._cidr, ip_address):
raise ipam_exc.InvalidIpForSubnet(
subnet_id=self.subnet_manager.neutron_id,
ip=ip_address)
def _allocate_specific_ip(self, session, ip_address,
allocation_pool_id=None):
"""Remove an IP address from subnet's availability ranges.
This method is supposed to be called from within a database
transaction, otherwise atomicity and integrity might not be
enforced and the operation might result in incosistent availability
ranges for the subnet.
:param session: database session
:param ip_address: ip address to mark as allocated
:param allocation_pool_id: identifier of the allocation pool from
which the ip address has been extracted. If not specified this
routine will scan all allocation pools.
:returns: list of IP ranges as instances of IPAvailabilityRange
"""
# Return immediately for EUI-64 addresses. For this
# class of subnets availability ranges do not apply
if ipv6_utils.is_eui64_address(ip_address):
return
LOG.debug("Removing %(ip_address)s from availability ranges for "
"subnet id:%(subnet_id)s",
{'ip_address': ip_address,
'subnet_id': self.subnet_manager.neutron_id})
# Netaddr's IPRange and IPSet objects work very well even with very
# large subnets, including IPv6 ones.
final_ranges = []
if allocation_pool_id:
av_ranges = self.subnet_manager.list_ranges_by_allocation_pool(
session, allocation_pool_id, locking=True)
else:
av_ranges = self.subnet_manager.list_ranges_by_subnet_id(
session, locking=True)
for db_range in av_ranges:
initial_ip_set = netaddr.IPSet(netaddr.IPRange(
db_range['first_ip'], db_range['last_ip']))
final_ip_set = initial_ip_set - netaddr.IPSet([ip_address])
if not final_ip_set:
# Range exhausted - bye bye
session.delete(db_range)
continue
if initial_ip_set == final_ip_set:
# IP address does not fall within the current range, move
# to the next one
final_ranges.append(db_range)
continue
for new_range in final_ip_set.iter_ipranges():
# store new range in database
# use netaddr.IPAddress format() method which is equivalent
# to str(...) but also enables us to use different
# representation formats (if needed) for IPv6.
first_ip = netaddr.IPAddress(new_range.first)
last_ip = netaddr.IPAddress(new_range.last)
if (db_range['first_ip'] == first_ip.format() or
db_range['last_ip'] == last_ip.format()):
db_range['first_ip'] = first_ip.format()
db_range['last_ip'] = last_ip.format()
LOG.debug("Adjusted availability range for pool %s",
db_range['allocation_pool_id'])
final_ranges.append(db_range)
else:
new_ip_range = self.subnet_manager.create_range(
session,
db_range['allocation_pool_id'],
first_ip.format(),
last_ip.format())
LOG.debug("Created availability range for pool %s",
new_ip_range['allocation_pool_id'])
final_ranges.append(new_ip_range)
# Most callers might ignore this return value, which is however
# useful for testing purposes
LOG.debug("Availability ranges for subnet id %(subnet_id)s "
"modified: %(new_ranges)s",
{'subnet_id': self.subnet_manager.neutron_id,
'new_ranges': ", ".join(["[%s; %s]" %
(r['first_ip'], r['last_ip']) for
r in final_ranges])})
return final_ranges
def _rebuild_availability_ranges(self, session):
"""Rebuild availability ranges.
This method should be called only when the availability ranges are
exhausted or when the subnet's allocation pools are updated,
which may trigger a deletion of the availability ranges.
For this operation to complete successfully, this method uses a
locking query to ensure that no IP is allocated while the regeneration
of availability ranges is in progress.
:param session: database session
"""
# List all currently allocated addresses, and prevent further
# allocations with a write-intent lock.
# NOTE: because of this driver's logic the write intent lock is
# probably unnecessary as this routine is called when the availability
# ranges for a subnet are exhausted and no further address can be
# allocated.
# TODO(salv-orlando): devise, if possible, a more efficient solution
# for building the IPSet to ensure decent performances even with very
# large subnets.
allocations = netaddr.IPSet(
[netaddr.IPAddress(allocation['ip_address']) for
allocation in self.subnet_manager.list_allocations(
session, locking=True)])
# MEH MEH
# There should be no need to set a write intent lock on the allocation
# pool table. Indeed it is not important for the correctness of this
# operation if the allocation pools are updated by another operation,
# which will result in the generation of new availability ranges.
# NOTE: it might be argued that an allocation pool update should in
# theory preempt rebuilding the availability range. This is an option
# to consider for future developments.
LOG.debug("Rebuilding availability ranges for subnet %s",
self.subnet_manager.neutron_id)
for pool in self.subnet_manager.list_pools(session):
# Create a set of all addresses in the pool
poolset = netaddr.IPSet(netaddr.IPRange(pool['first_ip'],
pool['last_ip']))
# Use set difference to find free addresses in the pool
available = poolset - allocations
# Write the ranges to the db
for ip_range in available.iter_ipranges():
av_range = self.subnet_manager.create_range(
session,
pool['id'],
netaddr.IPAddress(ip_range.first).format(),
netaddr.IPAddress(ip_range.last).format())
session.add(av_range)
def _generate_ip(self, session):
try:
return self._try_generate_ip(session)
except ipam_exc.IpAddressGenerationFailure:
self._rebuild_availability_ranges(session)
return self._try_generate_ip(session)
def _try_generate_ip(self, session):
"""Generate an IP address from availability ranges."""
ip_range = self.subnet_manager.get_first_range(session, locking=True)
if not ip_range:
LOG.debug("All IPs from subnet %(subnet_id)s allocated",
{'subnet_id': self.subnet_manager.neutron_id})
raise ipam_exc.IpAddressGenerationFailure(
subnet_id=self.subnet_manager.neutron_id)
# A suitable range was found. Return IP address.
ip_address = ip_range['first_ip']
LOG.debug("Allocated IP - %(ip_address)s from range "
"[%(first_ip)s; %(last_ip)s]",
{'ip_address': ip_address,
'first_ip': ip_address,
'last_ip': ip_range['last_ip']})
return ip_address, ip_range['allocation_pool_id']
def allocate(self, address_request):
# NOTE(salv-orlando): Creating a new db session might be a rather
# dangerous thing to do, if executed from within another database
# transaction. Therefore the IPAM driver should never be
# called from within a database transaction, which is also good
# practice since in the general case these drivers may interact
# with remote backends
session = self._context.session
all_pool_id = None
# NOTE(salv-orlando): It would probably better to have a simpler
# model for address requests and just check whether there is a
# specific IP address specified in address_request
if isinstance(address_request, ipam.SpecificAddressRequest):
# This handles both specific and automatic address requests
# Check availability of requested IP
ip_address = str(address_request.address)
self._verify_ip(session, ip_address)
else:
ip_address, all_pool_id = self._generate_ip(session)
self._allocate_specific_ip(session, ip_address, all_pool_id)
# Create IP allocation request object
# The only defined status at this stage is 'ALLOCATED'.
# More states will be available in the future - e.g.: RECYCLABLE
self.subnet_manager.create_allocation(session, ip_address)
return ip_address
def deallocate(self, address):
# This is almost a no-op because the Neutron DB IPAM driver does not
# delete IPAllocation objects, neither rebuilds availability ranges
# at every deallocation. The only operation it performs is to delete
# an IPRequest entry.
session = self._context.session
count = self.subnet_manager.delete_allocation(
session, address)
# count can hardly be greater than 1, but it can be 0...
if not count:
raise ipam_exc.IpAddressAllocationNotFound(
subnet_id=self.subnet_manager.neutron_id,
ip_address=address)
def update_allocation_pools(self, pools):
# Pools have already been validated in the subnet request object which
# was sent to the subnet pool driver. Further validation should not be
# required.
session = db_api.get_session()
self.subnet_manager.delete_allocation_pools(session)
self.create_allocation_pools(self.subnet_manager, session, pools)
self._pools = pools
def get_details(self):
"""Return subnet data as a SpecificSubnetRequest"""
return ipam.SpecificSubnetRequest(
self._tenant_id, self.subnet_manager.neutron_id,
self._cidr, self._gateway_ip, self._pools)
def associate_neutron_subnet(self, subnet_id):
"""Set neutron identifier for this subnet"""
session = self._context.session
if self._subnet_id:
raise
# IPAMSubnet does not have foreign key to Subnet,
# so need verify subnet existence.
NeutronDbSubnet._fetch_subnet(self._context, subnet_id)
self.subnet_manager.associate_neutron_id(session, subnet_id)
self._subnet_id = subnet_id
class NeutronDbPool(subnet_alloc.SubnetAllocator):
"""Subnet pools backed by Neutron Database.
As this driver does not implement yet the subnet pool concept, most
operations are either trivial or no-ops.
"""
def get_subnet(self, subnet_id):
"""Retrieve an IPAM subnet.
:param subnet_id: Neutron subnet identifier
:returns: a NeutronDbSubnet instance
"""
return NeutronDbSubnet.load(subnet_id, self._context)
def allocate_subnet(self, subnet_request):
"""Create an IPAMSubnet object for the provided cidr.
This method does not actually do any operation in the driver, given
its simplified nature.
:param cidr: subnet's CIDR
:returns: a NeutronDbSubnet instance
"""
if self._subnetpool:
subnet = super(NeutronDbPool, self).allocate_subnet(subnet_request)
subnet_request = subnet.get_details()
# SubnetRequest must be an instance of SpecificSubnet
if not isinstance(subnet_request, ipam.SpecificSubnetRequest):
raise ipam_exc.InvalidSubnetRequestType(
subnet_type=type(subnet_request))
return NeutronDbSubnet.create_from_subnet_request(subnet_request,
self._context)
def update_subnet(self, subnet_request):
"""Update subnet info the in the IPAM driver.
The only update subnet information the driver needs to be aware of
are allocation pools.
"""
if not subnet_request.subnet_id:
raise ipam_exc.InvalidSubnetRequest(
reason=("An identifier must be specified when updating "
"a subnet"))
if not subnet_request.allocation_pools:
LOG.debug("Update subnet request for subnet %s did not specify "
"new allocation pools, there is nothing to do",
subnet_request.subnet_id)
return
subnet = NeutronDbSubnet.load(subnet_request.subnet_id, self._context)
subnet.update_allocation_pools(subnet_request.allocation_pools)
return subnet
def remove_subnet(self, subnet):
"""Remove data structures for a given subnet.
All the IPAM-related data are cleared when a subnet is deleted thanks
to cascaded foreign key relationships.
"""
pass

View File

@ -0,0 +1,62 @@
# Copyright 2015 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron.common import exceptions
class InvalidSubnetRequestType(exceptions.BadRequest):
message = _("Cannot handle subnet of type %(subnet_type)s")
class AddressCalculationFailure(exceptions.NeutronException):
message = _("Unable to calculate %(address_type)s address because of:"
"%(reason)s")
class InvalidAddressType(exceptions.NeutronException):
message = _("Unknown address type %(address_type)s")
class IpAddressAllocationNotFound(exceptions.NeutronException):
message = _("Unable to find IP address %(ip_address)s on subnet "
"%(subnet_id)s")
class IpAddressAlreadyAllocated(exceptions.Conflict):
message = _("IP address %(ip)s already allocated in subnet %(subnet_id)s")
class InvalidIpForSubnet(exceptions.BadRequest):
message = _("IP address %(ip)s does not belong to subnet %(subnet_id)s")
class InvalidAddressRequest(exceptions.BadRequest):
message = _("The address allocation request could not be satisfied "
"because: %(reason)s")
class InvalidSubnetRequest(exceptions.BadRequest):
message = _("The subnet request could not be satisfied because: "
"%(reason)s")
class AllocationOnAutoAddressSubnet(exceptions.NeutronException):
message = (_("IPv6 address %(ip)s cannot be directly "
"assigned to a port on subnet %(subnet_id)s as the "
"subnet is configured for automatic addresses"))
class IpAddressGenerationFailure(exceptions.Conflict):
message = _("No more IP addresses available for subnet %(subnet_id)s.")

View File

@ -23,6 +23,7 @@ from neutron.common import exceptions as n_exc
from neutron.db import models_v2
import neutron.ipam as ipam
from neutron.ipam import driver
from neutron.ipam import utils as ipam_utils
from neutron.openstack.common import uuidutils
@ -33,19 +34,19 @@ class SubnetAllocator(driver.Pool):
make merging into IPAM framework easier in future cycles.
"""
def __init__(self, subnetpool):
self._subnetpool = subnetpool
def __init__(self, subnetpool, context):
super(SubnetAllocator, self).__init__(subnetpool, context)
self._sp_helper = SubnetPoolHelper()
def _get_allocated_cidrs(self, session):
query = session.query(
def _get_allocated_cidrs(self):
query = self._context.session.query(
models_v2.Subnet).with_lockmode('update')
subnets = query.filter_by(subnetpool_id=self._subnetpool['id'])
return (x.cidr for x in subnets)
def _get_available_prefix_list(self, session):
def _get_available_prefix_list(self):
prefixes = (x.cidr for x in self._subnetpool.prefixes)
allocations = self._get_allocated_cidrs(session)
allocations = self._get_allocated_cidrs()
prefix_set = netaddr.IPSet(iterable=prefixes)
allocation_set = netaddr.IPSet(iterable=allocations)
available_set = prefix_set.difference(allocation_set)
@ -57,11 +58,11 @@ class SubnetAllocator(driver.Pool):
def _num_quota_units_in_prefixlen(self, prefixlen, quota_unit):
return math.pow(2, quota_unit - prefixlen)
def _allocations_used_by_tenant(self, session, quota_unit):
def _allocations_used_by_tenant(self, quota_unit):
subnetpool_id = self._subnetpool['id']
tenant_id = self._subnetpool['tenant_id']
with session.begin(subtransactions=True):
qry = session.query(
with self._context.session.begin(subtransactions=True):
qry = self._context.session.query(
models_v2.Subnet).with_lockmode('update')
allocations = qry.filter_by(subnetpool_id=subnetpool_id,
tenant_id=tenant_id)
@ -72,60 +73,60 @@ class SubnetAllocator(driver.Pool):
quota_unit)
return value
def _check_subnetpool_tenant_quota(self, session, tenant_id, prefixlen):
def _check_subnetpool_tenant_quota(self, tenant_id, prefixlen):
quota_unit = self._sp_helper.ip_version_subnetpool_quota_unit(
self._subnetpool['ip_version'])
quota = self._subnetpool.get('default_quota')
if quota:
used = self._allocations_used_by_tenant(session, quota_unit)
used = self._allocations_used_by_tenant(quota_unit)
requested_units = self._num_quota_units_in_prefixlen(prefixlen,
quota_unit)
if used + requested_units > quota:
raise n_exc.SubnetPoolQuotaExceeded()
def _allocate_any_subnet(self, session, request):
with session.begin(subtransactions=True):
self._check_subnetpool_tenant_quota(session,
request.tenant_id,
def _allocate_any_subnet(self, request):
with self._context.session.begin(subtransactions=True):
self._check_subnetpool_tenant_quota(request.tenant_id,
request.prefixlen)
prefix_pool = self._get_available_prefix_list(session)
prefix_pool = self._get_available_prefix_list()
for prefix in prefix_pool:
if request.prefixlen >= prefix.prefixlen:
subnet = prefix.subnet(request.prefixlen).next()
gateway_ip = request.gateway_ip
if not gateway_ip:
gateway_ip = subnet.network + 1
pools = ipam_utils.generate_pools(subnet.cidr,
gateway_ip)
return IpamSubnet(request.tenant_id,
request.subnet_id,
subnet.cidr,
gateway_ip=gateway_ip,
allocation_pools=None)
allocation_pools=pools)
msg = _("Insufficient prefix space to allocate subnet size /%s")
raise n_exc.SubnetAllocationError(reason=msg %
str(request.prefixlen))
def _allocate_specific_subnet(self, session, request):
with session.begin(subtransactions=True):
self._check_subnetpool_tenant_quota(session,
request.tenant_id,
def _allocate_specific_subnet(self, request):
with self._context.session.begin(subtransactions=True):
self._check_subnetpool_tenant_quota(request.tenant_id,
request.prefixlen)
subnet = request.subnet
available = self._get_available_prefix_list(session)
matched = netaddr.all_matching_cidrs(subnet, available)
if len(matched) is 1 and matched[0].prefixlen <= subnet.prefixlen:
cidr = request.subnet_cidr
available = self._get_available_prefix_list()
matched = netaddr.all_matching_cidrs(cidr, available)
if len(matched) is 1 and matched[0].prefixlen <= cidr.prefixlen:
return IpamSubnet(request.tenant_id,
request.subnet_id,
subnet.cidr,
cidr,
gateway_ip=request.gateway_ip,
allocation_pools=request.allocation_pools)
msg = _("Cannot allocate requested subnet from the available "
"set of prefixes")
raise n_exc.SubnetAllocationError(reason=msg)
def allocate_subnet(self, session, request):
def allocate_subnet(self, request):
max_prefixlen = int(self._subnetpool['max_prefixlen'])
min_prefixlen = int(self._subnetpool['min_prefixlen'])
if request.prefixlen > max_prefixlen:
@ -138,20 +139,20 @@ class SubnetAllocator(driver.Pool):
min_prefixlen=min_prefixlen)
if isinstance(request, ipam.AnySubnetRequest):
return self._allocate_any_subnet(session, request)
return self._allocate_any_subnet(request)
elif isinstance(request, ipam.SpecificSubnetRequest):
return self._allocate_specific_subnet(session, request)
return self._allocate_specific_subnet(request)
else:
msg = _("Unsupported request type")
raise n_exc.SubnetAllocationError(reason=msg)
def get_subnet(self, subnet, subnet_id):
def get_subnet(self, subnet_id):
raise NotImplementedError()
def update_subnet(self, request):
raise NotImplementedError()
def remove_subnet(self, subnet, subnet_id):
def remove_subnet(self, subnet_id):
raise NotImplementedError()
@ -163,11 +164,12 @@ class IpamSubnet(driver.Subnet):
cidr,
gateway_ip=None,
allocation_pools=None):
self._req = ipam.SpecificSubnetRequest(tenant_id,
subnet_id,
cidr,
gateway_ip=gateway_ip,
allocation_pools=None)
self._req = ipam.SpecificSubnetRequest(
tenant_id,
subnet_id,
cidr,
gateway_ip=gateway_ip,
allocation_pools=allocation_pools)
def allocate(self, address_request):
raise NotImplementedError()
@ -178,6 +180,9 @@ class IpamSubnet(driver.Subnet):
def get_details(self):
return self._req
def associate_neutron_subnet(self, subnet_id):
pass
class SubnetPoolReader(object):
'''Class to assist with reading a subnetpool, loading defaults, and

48
neutron/ipam/utils.py Normal file
View File

@ -0,0 +1,48 @@
# Copyright 2015 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
def check_subnet_ip(cidr, ip_address):
"""Validate that the IP address is on the subnet."""
ip = netaddr.IPAddress(ip_address)
net = netaddr.IPNetwork(cidr)
# Check that the IP is valid on subnet. This cannot be the
# network or the broadcast address
return (ip != net.network and ip != net.broadcast
and net.netmask & ip == net.network)
def generate_pools(cidr, gateway_ip):
"""Create IP allocation pools for a specified subnet
The Neutron API defines a subnet's allocation pools as a list of
IPRange objects for defining the pool range.
"""
pools = []
# Auto allocate the pool around gateway_ip
net = netaddr.IPNetwork(cidr)
first_ip = net.first + 1
last_ip = net.last - 1
gw_ip = int(netaddr.IPAddress(gateway_ip or net.last))
# Use the gw_ip to find a point for splitting allocation pools
# for this subnet
split_ip = min(max(gw_ip, net.first), net.last)
if split_ip > first_ip:
pools.append(netaddr.IPRange(first_ip, split_ip - 1))
if split_ip < last_ip:
pools.append(netaddr.IPRange(split_ip + 1, last_ip))
return pools

View File

@ -124,3 +124,29 @@ class TestIsAutoAddressSubnet(base.BaseTestCase):
self.subnet['ipv6_ra_mode'] = subnet.ra_mode
self.assertEqual(subnet.is_auto_address,
ipv6_utils.is_auto_address_subnet(self.subnet))
class TestIsEui64Address(base.BaseTestCase):
def _test_eui_64(self, ips, expected):
for ip in ips:
self.assertEqual(expected, ipv6_utils.is_eui64_address(ip),
"Error on %s" % ip)
def test_valid_eui64_addresses(self):
ips = ('fffe::0cad:12ff:fe44:5566',
ipv6_utils.get_ipv6_addr_by_EUI64('2001:db8::',
'00:16:3e:33:44:55'))
self._test_eui_64(ips, True)
def test_invalid_eui64_addresses(self):
ips = ('192.168.1.1',
'192.168.1.0',
'255.255.255.255',
'0.0.0.0',
'fffe::',
'ff80::1',
'fffe::0cad:12ff:ff44:5566',
'fffe::0cad:12fe:fe44:5566',
'ffff:ffff:ffff:ffff:ffff:ffff:ffff:ffff')
self._test_eui_64(ips, False)

View File

View File

@ -0,0 +1,170 @@
# Copyright 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from neutron import context
from neutron.ipam.drivers.neutrondb_ipam import db_api
from neutron.ipam.drivers.neutrondb_ipam import db_models
from neutron.openstack.common import uuidutils
from neutron.tests.unit import testlib_api
class TestIpamSubnetManager(testlib_api.SqlTestCase):
"""Test case for SubnetManager DB helper class"""
def setUp(self):
super(TestIpamSubnetManager, self).setUp()
self.ctx = context.get_admin_context()
self.neutron_subnet_id = uuidutils.generate_uuid()
self.ipam_subnet_id = uuidutils.generate_uuid()
self.subnet_ip = '1.2.3.4'
self.single_pool = ('1.2.3.4', '1.2.3.10')
self.multi_pool = (('1.2.3.2', '1.2.3.12'), ('1.2.3.15', '1.2.3.24'))
self.subnet_manager = db_api.IpamSubnetManager(self.ipam_subnet_id,
self.neutron_subnet_id)
self.subnet_manager_id = self.subnet_manager.create(self.ctx.session)
self.ctx.session.flush()
def test_create(self):
self.assertEqual(self.ipam_subnet_id, self.subnet_manager_id)
subnets = self.ctx.session.query(db_models.IpamSubnet).filter_by(
id=self.ipam_subnet_id).all()
self.assertEqual(1, len(subnets))
def test_associate_neutron_id(self):
self.subnet_manager.associate_neutron_id(self.ctx.session,
'test-id')
subnet = self.ctx.session.query(db_models.IpamSubnet).filter_by(
id=self.ipam_subnet_id).first()
self.assertEqual('test-id', subnet['neutron_subnet_id'])
def _create_pools(self, pools):
db_pools = []
for pool in pools:
db_pool = self.subnet_manager.create_pool(self.ctx.session,
pool[0],
pool[1])
db_pools.append(db_pool)
return db_pools
def _validate_ips(self, pool, db_pool):
self.assertEqual(pool[0], db_pool.first_ip)
self.assertEqual(pool[1], db_pool.last_ip)
def test_create_pool(self):
db_pools = self._create_pools([self.single_pool])
ipam_pool = self.ctx.session.query(db_models.IpamAllocationPool).\
filter_by(ipam_subnet_id=self.ipam_subnet_id).first()
self._validate_ips(self.single_pool, ipam_pool)
range = self.ctx.session.query(db_models.IpamAvailabilityRange).\
filter_by(allocation_pool_id=db_pools[0].id).first()
self._validate_ips(self.single_pool, range)
def _test_get_first_range(self, locking):
self._create_pools(self.multi_pool)
range = self.subnet_manager.get_first_range(self.ctx.session,
locking=locking)
self._validate_ips(self.multi_pool[0], range)
def test_get_first_range(self):
self._test_get_first_range(False)
def test_get_first_range_locking(self):
self._test_get_first_range(True)
def test_list_ranges_by_subnet_id(self):
self._create_pools(self.multi_pool)
db_ranges = self.subnet_manager.list_ranges_by_subnet_id(
self.ctx.session,
self.ipam_subnet_id).all()
self.assertEqual(2, len(db_ranges))
self.assertEqual(db_models.IpamAvailabilityRange, type(db_ranges[0]))
def test_list_ranges_by_allocation_pool(self):
db_pools = self._create_pools([self.single_pool])
# generate ids for allocation pools on flush
self.ctx.session.flush()
db_ranges = self.subnet_manager.list_ranges_by_allocation_pool(
self.ctx.session,
db_pools[0].id).all()
self.assertEqual(1, len(db_ranges))
self.assertEqual(db_models.IpamAvailabilityRange, type(db_ranges[0]))
self._validate_ips(self.single_pool, db_ranges[0])
def test_create_range(self):
self._create_pools([self.single_pool])
pool = self.ctx.session.query(db_models.IpamAllocationPool).\
filter_by(ipam_subnet_id=self.ipam_subnet_id).first()
self._validate_ips(self.single_pool, pool)
allocation_pool_id = pool.id
# delete the range
db_range = self.subnet_manager.list_ranges_by_allocation_pool(
self.ctx.session,
pool.id).first()
self._validate_ips(self.single_pool, db_range)
self.ctx.session.delete(db_range)
# create a new range
range_start = '1.2.3.5'
range_end = '1.2.3.9'
new_range = self.subnet_manager.create_range(self.ctx.session,
allocation_pool_id,
range_start,
range_end)
self.assertEqual(range_start, new_range.first_ip)
self.assertEqual(range_end, new_range.last_ip)
def test_check_unique_allocation(self):
self.assertTrue(self.subnet_manager.check_unique_allocation(
self.ctx.session, self.subnet_ip))
def test_check_unique_allocation_negative(self):
self.subnet_manager.create_allocation(self.ctx.session,
self.subnet_ip)
self.assertFalse(self.subnet_manager.check_unique_allocation(
self.ctx.session, self.subnet_ip))
def test_list_allocations(self):
ips = ['1.2.3.4', '1.2.3.6', '1.2.3.7']
for ip in ips:
self.subnet_manager.create_allocation(self.ctx.session, ip)
allocs = self.subnet_manager.list_allocations(self.ctx.session).all()
self.assertEqual(len(ips), len(allocs))
for allocation in allocs:
self.assertIn(allocation.ip_address, ips)
def _test_create_allocation(self):
self.subnet_manager.create_allocation(self.ctx.session,
self.subnet_ip)
alloc = self.ctx.session.query(db_models.IpamAllocation).filter_by(
ipam_subnet_id=self.ipam_subnet_id).all()
self.assertEqual(1, len(alloc))
self.assertEqual(self.subnet_ip, alloc[0].ip_address)
return alloc
def test_create_allocation(self):
self._test_create_allocation()
def test_delete_allocation(self):
allocs = self._test_create_allocation()
self.subnet_manager.delete_allocation(self.ctx.session,
allocs[0].ip_address)
allocs = self.ctx.session.query(db_models.IpamAllocation).filter_by(
ipam_subnet_id=self.ipam_subnet_id).all()
self.assertEqual(0, len(allocs))

View File

@ -0,0 +1,442 @@
# Copyright 2015 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import netaddr
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron import context
from neutron import ipam
from neutron.ipam.drivers.neutrondb_ipam import driver
from neutron.ipam import exceptions as ipam_exc
from neutron import manager
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_db_plugin
from neutron.tests.unit import testlib_api
def convert_firstip_to_ipaddress(range_item):
return netaddr.IPAddress(range_item['first_ip'])
class TestNeutronDbIpamMixin(object):
def _create_network(self, plugin, ctx, shared=False):
network = {'network': {'name': 'net',
'shared': shared,
'admin_state_up': True,
'tenant_id': self._tenant_id}}
created_network = plugin.create_network(ctx, network)
return (created_network, created_network['id'])
def _create_subnet(self, plugin, ctx, network_id, cidr, ip_version=4,
v6_address_mode=attributes.ATTR_NOT_SPECIFIED,
allocation_pools=attributes.ATTR_NOT_SPECIFIED):
subnet = {'subnet': {'name': 'sub',
'cidr': cidr,
'ip_version': ip_version,
'gateway_ip': attributes.ATTR_NOT_SPECIFIED,
'allocation_pools': allocation_pools,
'enable_dhcp': True,
'dns_nameservers': attributes.ATTR_NOT_SPECIFIED,
'host_routes': attributes.ATTR_NOT_SPECIFIED,
'ipv6_address_mode': v6_address_mode,
'ipv6_ra_mode': attributes.ATTR_NOT_SPECIFIED,
'network_id': network_id,
'tenant_id': self._tenant_id}}
return plugin.create_subnet(ctx, subnet)
class TestNeutronDbIpamPool(testlib_api.SqlTestCase,
TestNeutronDbIpamMixin):
"""Test case for the Neutron's DB IPAM driver subnet pool interface."""
def setUp(self):
super(TestNeutronDbIpamPool, self).setUp()
self._tenant_id = 'test-tenant'
# Configure plugin for tests
self.setup_coreplugin(test_db_plugin.DB_PLUGIN_KLASS)
# Prepare environment for tests
self.plugin = manager.NeutronManager.get_plugin()
self.ctx = context.get_admin_context()
self.network, self.net_id = self._create_network(self.plugin,
self.ctx)
# Allocate IPAM driver
self.ipam_pool = driver.NeutronDbPool(None, self.ctx)
def _verify_ipam_subnet_details(self, ipam_subnet,
cidr=None,
tenant_id=None,
gateway_ip=None,
allocation_pools=None):
ipam_subnet_details = ipam_subnet.get_details()
gateway_ip_address = None
cidr_ip_network = None
if gateway_ip:
gateway_ip_address = netaddr.IPAddress(gateway_ip)
if cidr:
cidr_ip_network = netaddr.IPNetwork(cidr)
self.assertEqual(tenant_id, ipam_subnet_details.tenant_id)
self.assertEqual(gateway_ip_address, ipam_subnet_details.gateway_ip)
self.assertEqual(cidr_ip_network, ipam_subnet_details.subnet_cidr)
self.assertEqual(allocation_pools,
ipam_subnet_details.allocation_pools)
def test_allocate_ipam_subnet_no_neutron_subnet_id(self):
cidr = '10.0.0.0/24'
allocation_pools = [netaddr.IPRange('10.0.0.100', '10.0.0.150'),
netaddr.IPRange('10.0.0.200', '10.0.0.250')]
subnet_req = ipam.SpecificSubnetRequest(
self._tenant_id,
None,
cidr,
allocation_pools=allocation_pools,
gateway_ip='10.0.0.101')
ipam_subnet = self.ipam_pool.allocate_subnet(subnet_req)
self._verify_ipam_subnet_details(ipam_subnet,
cidr,
self._tenant_id,
'10.0.0.101',
allocation_pools)
def _prepare_specific_subnet_request(self, cidr):
subnet = self._create_subnet(
self.plugin, self.ctx, self.net_id, cidr)
subnet_req = ipam.SpecificSubnetRequest(
self._tenant_id,
subnet['id'],
cidr,
gateway_ip=subnet['gateway_ip'])
return subnet, subnet_req
def test_allocate_ipam_subnet_with_neutron_subnet_id(self):
cidr = '10.0.0.0/24'
subnet, subnet_req = self._prepare_specific_subnet_request(cidr)
ipam_subnet = self.ipam_pool.allocate_subnet(subnet_req)
self._verify_ipam_subnet_details(
ipam_subnet,
cidr, self._tenant_id, subnet['gateway_ip'],
[netaddr.IPRange('10.0.0.2', '10.0.0.254')])
def test_allocate_any_subnet_fails(self):
self.assertRaises(
ipam_exc.InvalidSubnetRequestType,
self.ipam_pool.allocate_subnet,
ipam.AnySubnetRequest(self._tenant_id, 'meh', constants.IPv4, 24))
def test_update_subnet_pools(self):
cidr = '10.0.0.0/24'
subnet, subnet_req = self._prepare_specific_subnet_request(cidr)
ipam_subnet = self.ipam_pool.allocate_subnet(subnet_req)
ipam_subnet.associate_neutron_subnet(subnet['id'])
allocation_pools = [netaddr.IPRange('10.0.0.100', '10.0.0.150'),
netaddr.IPRange('10.0.0.200', '10.0.0.250')]
update_subnet_req = ipam.SpecificSubnetRequest(
self._tenant_id,
subnet['id'],
cidr,
gateway_ip=subnet['gateway_ip'],
allocation_pools=allocation_pools)
ipam_subnet = self.ipam_pool.update_subnet(update_subnet_req)
self._verify_ipam_subnet_details(
ipam_subnet,
cidr, self._tenant_id, subnet['gateway_ip'], allocation_pools)
def test_get_subnet(self):
cidr = '10.0.0.0/24'
subnet, subnet_req = self._prepare_specific_subnet_request(cidr)
ipam_subnet = self.ipam_pool.allocate_subnet(subnet_req)
ipam_subnet.associate_neutron_subnet(subnet['id'])
# Retrieve the subnet
ipam_subnet = self.ipam_pool.get_subnet(subnet['id'])
self._verify_ipam_subnet_details(
ipam_subnet,
cidr, self._tenant_id, subnet['gateway_ip'],
[netaddr.IPRange('10.0.0.2', '10.0.0.254')])
def test_get_non_existing_subnet_fails(self):
self.assertRaises(n_exc.SubnetNotFound,
self.ipam_pool.get_subnet,
'boo')
class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
TestNeutronDbIpamMixin):
"""Test case for Subnet interface for Nuetron's DB IPAM driver.
This test case exercises the reference IPAM driver.
Even if it loads a plugin, the unit tests in this class do not exercise
it at all; they simply perform white box testing on the IPAM driver.
The plugin is exclusively used to create the neutron objects on which
the IPAM driver will operate.
"""
def _create_and_allocate_ipam_subnet(
self, cidr, allocation_pools=attributes.ATTR_NOT_SPECIFIED,
ip_version=4, v6_auto_address=False, tenant_id=None):
v6_address_mode = attributes.ATTR_NOT_SPECIFIED
if v6_auto_address:
# set ip version to 6 regardless of what's been passed to the
# method
ip_version = 6
v6_address_mode = constants.IPV6_SLAAC
subnet = self._create_subnet(
self.plugin, self.ctx, self.net_id, cidr,
ip_version=ip_version,
allocation_pools=allocation_pools,
v6_address_mode=v6_address_mode)
# Build netaddr.IPRanges from allocation pools since IPAM SubnetRequest
# objects are strongly typed
allocation_pool_ranges = [netaddr.IPRange(
pool['start'], pool['end']) for pool in
subnet['allocation_pools']]
subnet_req = ipam.SpecificSubnetRequest(
tenant_id,
subnet['id'],
cidr,
gateway_ip=subnet['gateway_ip'],
allocation_pools=allocation_pool_ranges)
ipam_subnet = self.ipam_pool.allocate_subnet(subnet_req)
ipam_subnet.associate_neutron_subnet(subnet['id'])
return ipam_subnet, subnet
def setUp(self):
super(TestNeutronDbIpamSubnet, self).setUp()
self._tenant_id = 'test-tenant'
# Configure plugin for tests
self.setup_coreplugin(test_db_plugin.DB_PLUGIN_KLASS)
# Prepare environment for tests
self.plugin = manager.NeutronManager.get_plugin()
self.ctx = context.get_admin_context()
self.network, self.net_id = self._create_network(self.plugin,
self.ctx)
# Allocate IPAM driver
self.ipam_pool = driver.NeutronDbPool(None, self.ctx)
def test__verify_ip_succeeds(self):
cidr = '10.0.0.0/24'
ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0]
ipam_subnet._verify_ip(self.ctx.session, '10.0.0.2')
def test__verify_ip_not_in_subnet_fails(self):
cidr = '10.0.0.0/24'
ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0]
self.assertRaises(ipam_exc.InvalidIpForSubnet,
ipam_subnet._verify_ip,
self.ctx.session,
'192.168.0.2')
def test__verify_ip_bcast_and_network_fail(self):
cidr = '10.0.0.0/24'
ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0]
self.assertRaises(ipam_exc.InvalidIpForSubnet,
ipam_subnet._verify_ip,
self.ctx.session,
'10.0.0.255')
self.assertRaises(ipam_exc.InvalidIpForSubnet,
ipam_subnet._verify_ip,
self.ctx.session,
'10.0.0.0')
def test__allocate_specific_ip(self):
cidr = '10.0.0.0/24'
ipam_subnet = self._create_and_allocate_ipam_subnet(cidr)[0]
with self.ctx.session.begin():
ranges = ipam_subnet._allocate_specific_ip(
self.ctx.session, '10.0.0.33')
self.assertEqual(2, len(ranges))
# 10.0.0.1 should be allocated for gateway ip
ranges.sort(key=convert_firstip_to_ipaddress)
self.assertEqual('10.0.0.2', ranges[0]['first_ip'])
self.assertEqual('10.0.0.32', ranges[0]['last_ip'])
self.assertEqual('10.0.0.34', ranges[1]['first_ip'])
self.assertEqual('10.0.0.254', ranges[1]['last_ip'])
# Limit test - first address in range
ranges = ipam_subnet._allocate_specific_ip(
self.ctx.session, '10.0.0.2')
self.assertEqual(2, len(ranges))
ranges.sort(key=convert_firstip_to_ipaddress)
self.assertEqual('10.0.0.3', ranges[0]['first_ip'])
self.assertEqual('10.0.0.32', ranges[0]['last_ip'])
self.assertEqual('10.0.0.34', ranges[1]['first_ip'])
self.assertEqual('10.0.0.254', ranges[1]['last_ip'])
# Limit test - last address in range
ranges = ipam_subnet._allocate_specific_ip(
self.ctx.session, '10.0.0.254')
self.assertEqual(2, len(ranges))
ranges.sort(key=convert_firstip_to_ipaddress)
self.assertEqual('10.0.0.3', ranges[0]['first_ip'])
self.assertEqual('10.0.0.32', ranges[0]['last_ip'])
self.assertEqual('10.0.0.34', ranges[1]['first_ip'])
self.assertEqual('10.0.0.253', ranges[1]['last_ip'])
def test__allocate_specific_ips_multiple_ranges(self):
cidr = '10.0.0.0/24'
ipam_subnet = self._create_and_allocate_ipam_subnet(
cidr,
allocation_pools=[{'start': '10.0.0.10', 'end': '10.0.0.19'},
{'start': '10.0.0.30', 'end': '10.0.0.39'}])[0]
with self.ctx.session.begin():
ranges = ipam_subnet._allocate_specific_ip(
self.ctx.session, '10.0.0.33')
self.assertEqual(3, len(ranges))
# 10.0.0.1 should be allocated for gateway ip
ranges.sort(key=convert_firstip_to_ipaddress)
self.assertEqual('10.0.0.10', ranges[0]['first_ip'])
self.assertEqual('10.0.0.19', ranges[0]['last_ip'])
self.assertEqual('10.0.0.30', ranges[1]['first_ip'])
self.assertEqual('10.0.0.32', ranges[1]['last_ip'])
self.assertEqual('10.0.0.34', ranges[2]['first_ip'])
self.assertEqual('10.0.0.39', ranges[2]['last_ip'])
def test__allocate_specific_ip_out_of_range(self):
cidr = '10.0.0.0/24'
subnet = self._create_subnet(
self.plugin, self.ctx, self.net_id, cidr)
subnet_req = ipam.SpecificSubnetRequest(
'tenant_id', subnet, cidr, gateway_ip=subnet['gateway_ip'])
ipam_subnet = self.ipam_pool.allocate_subnet(subnet_req)
with self.ctx.session.begin():
ranges = ipam_subnet._allocate_specific_ip(
self.ctx.session, '192.168.0.1')
# In this case _allocate_specific_ips does not fail, but
# simply does not update availability ranges at all
self.assertEqual(1, len(ranges))
# 10.0.0.1 should be allocated for gateway ip
ranges.sort(key=convert_firstip_to_ipaddress)
self.assertEqual('10.0.0.2', ranges[0]['first_ip'])
self.assertEqual('10.0.0.254', ranges[0]['last_ip'])
def _allocate_address(self, cidr, ip_version, address_request):
ipam_subnet = self._create_and_allocate_ipam_subnet(
cidr, ip_version=ip_version)[0]
return ipam_subnet.allocate(address_request)
def test_allocate_any_v4_address_succeeds(self):
ip_address = self._allocate_address(
'10.0.0.0/24', 4, ipam.AnyAddressRequest)
# As the DB IPAM driver allocation logic is strictly sequential, we can
# expect this test to allocate the .2 address as .1 is used by default
# as subnet gateway
self.assertEqual('10.0.0.2', ip_address)
def test_allocate_any_v6_address_succeeds(self):
ip_address = self._allocate_address(
'fde3:abcd:4321:1::/64', 6, ipam.AnyAddressRequest)
# As the DB IPAM driver allocation logic is strictly sequential, we can
# expect this test to allocate the .2 address as .1 is used by default
# as subnet gateway
self.assertEqual('fde3:abcd:4321:1::2', ip_address)
def test_allocate_specific_v4_address_succeeds(self):
ip_address = self._allocate_address(
'10.0.0.0/24', 4, ipam.SpecificAddressRequest('10.0.0.33'))
self.assertEqual('10.0.0.33', ip_address)
def test_allocate_specific_v6_address_succeeds(self):
ip_address = self._allocate_address(
'fde3:abcd:4321:1::/64', 6,
ipam.SpecificAddressRequest('fde3:abcd:4321:1::33'))
self.assertEqual('fde3:abcd:4321:1::33', ip_address)
def test_allocate_specific_v4_address_out_of_range_fails(self):
self.assertRaises(ipam_exc.InvalidIpForSubnet,
self._allocate_address,
'10.0.0.0/24', 4,
ipam.SpecificAddressRequest('192.168.0.1'))
def test_allocate_specific_v6_address_out_of_range_fails(self):
self.assertRaises(ipam_exc.InvalidIpForSubnet,
self._allocate_address,
'fde3:abcd:4321:1::/64', 6,
ipam.SpecificAddressRequest(
'fde3:abcd:eeee:1::33'))
def test_allocate_specific_address_in_use_fails(self):
ipam_subnet = self._create_and_allocate_ipam_subnet(
'fde3:abcd:4321:1::/64', ip_version=6)[0]
addr_req = ipam.SpecificAddressRequest('fde3:abcd:4321:1::33')
ipam_subnet.allocate(addr_req)
self.assertRaises(ipam_exc.IpAddressAlreadyAllocated,
ipam_subnet.allocate,
addr_req)
def test_allocate_any_address_exhausted_pools_fails(self):
# Same as above, the ranges will be recalculated always
ipam_subnet = self._create_and_allocate_ipam_subnet(
'192.168.0.0/30', ip_version=4)[0]
ipam_subnet.allocate(ipam.AnyAddressRequest)
# The second address generation request on a /30 for v4 net must fail
self.assertRaises(ipam_exc.IpAddressGenerationFailure,
ipam_subnet.allocate,
ipam.AnyAddressRequest)
def _test_deallocate_address(self, cidr, ip_version):
ipam_subnet = self._create_and_allocate_ipam_subnet(
cidr, ip_version=ip_version)[0]
ip_address = ipam_subnet.allocate(ipam.AnyAddressRequest)
ipam_subnet.deallocate(ip_address)
def test_deallocate_v4_address(self):
self._test_deallocate_address('10.0.0.0/24', 4)
def test_deallocate_v6_address(self):
# This test does not really exercise any different code path wrt
# test_deallocate_v4_address. It is provided for completeness and for
# future proofing in case v6-specific logic will be added.
self._test_deallocate_address('fde3:abcd:4321:1::/64', 6)
def test_allocate_unallocated_address_fails(self):
ipam_subnet = self._create_and_allocate_ipam_subnet(
'10.0.0.0/24', ip_version=4)[0]
self.assertRaises(ipam_exc.IpAddressAllocationNotFound,
ipam_subnet.deallocate, '10.0.0.2')
def test_allocate_all_pool_addresses_triggers_range_recalculation(self):
# This test instead might be made to pass, but for the wrong reasons!
pass
def _test_allocate_subnet(self, subnet_id):
subnet_req = ipam.SpecificSubnetRequest(
'tenant_id', subnet_id, '192.168.0.0/24')
return self.ipam_pool.allocate_subnet(subnet_req)
def test_allocate_subnet_for_non_existent_subnet_pass(self):
# This test should pass because neutron subnet is not checked
# until associate neutron subnet step
subnet_req = ipam.SpecificSubnetRequest(
'tenant_id', 'meh', '192.168.0.0/24')
self.ipam_pool.allocate_subnet(subnet_req)
def test_associate_neutron_subnet(self):
ipam_subnet, subnet = self._create_and_allocate_ipam_subnet(
'192.168.0.0/24', ip_version=4)
details = ipam_subnet.get_details()
self.assertEqual(subnet['id'], details.subnet_id)
def test_associate_non_existing_neutron_subnet_fails(self):
subnet_req = ipam.SpecificSubnetRequest(
'tenant_id', 'meh', '192.168.0.0/24')
ipam_subnet = self.ipam_pool.allocate_subnet(subnet_req)
self.assertRaises(n_exc.SubnetNotFound,
ipam_subnet.associate_neutron_subnet,
'meh')

View File

@ -63,14 +63,14 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
prefix_list, 21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
sa = subnet_alloc.SubnetAllocator(sp)
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
constants.IPv4, 21)
res = sa.allocate_subnet(self.ctx.session, req)
res = sa.allocate_subnet(req)
detail = res.get_details()
prefix_set = netaddr.IPSet(iterable=prefix_list)
allocated_set = netaddr.IPSet(iterable=[detail.subnet.cidr])
allocated_set = netaddr.IPSet(iterable=[detail.subnet_cidr])
self.assertTrue(allocated_set.issubset(prefix_set))
self.assertEqual(detail.prefixlen, 21)
@ -80,14 +80,14 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
21, 4)
with self.ctx.session.begin(subtransactions=True):
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp)
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'10.1.2.0/24')
res = sa.allocate_subnet(self.ctx.session, req)
res = sa.allocate_subnet(req)
detail = res.get_details()
sp = self._get_subnetpool(self.ctx, self.plugin, sp['id'])
self.assertEqual(str(detail.subnet.cidr), '10.1.2.0/24')
self.assertEqual(str(detail.subnet_cidr), '10.1.2.0/24')
self.assertEqual(detail.prefixlen, 24)
def test_insufficient_prefix_space_for_any_allocation(self):
@ -95,25 +95,25 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
['10.1.1.0/24', '192.168.1.0/24'],
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp)
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
constants.IPv4,
21)
self.assertRaises(n_exc.SubnetAllocationError,
sa.allocate_subnet, self.ctx.session, req)
sa.allocate_subnet, req)
def test_insufficient_prefix_space_for_specific_allocation(self):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
['10.1.0.0/24'],
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp)
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'10.1.0.0/21')
self.assertRaises(n_exc.SubnetAllocationError,
sa.allocate_subnet, self.ctx.session, req)
sa.allocate_subnet, req)
def test_allocate_any_subnet_gateway(self):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
@ -121,13 +121,14 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
sa = subnet_alloc.SubnetAllocator(sp)
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
constants.IPv4, 21)
res = sa.allocate_subnet(self.ctx.session, req)
res = sa.allocate_subnet(req)
detail = res.get_details()
self.assertEqual(detail.gateway_ip, detail.subnet.network + 1)
self.assertEqual(detail.gateway_ip,
detail.subnet_cidr.network + 1)
def test_allocate_specific_subnet_specific_gateway(self):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
@ -135,12 +136,12 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
sa = subnet_alloc.SubnetAllocator(sp)
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'10.1.2.0/24',
gateway_ip='10.1.2.254')
res = sa.allocate_subnet(self.ctx.session, req)
res = sa.allocate_subnet(req)
detail = res.get_details()
self.assertEqual(detail.gateway_ip,
netaddr.IPAddress('10.1.2.254'))
@ -149,8 +150,8 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
sp = self._create_subnet_pool(self.plugin, self.ctx, 'test-sp',
['10.1.0.0/16', '192.168.1.0/24'],
21, 4)
sa = subnet_alloc.SubnetAllocator(sp)
value = sa._allocations_used_by_tenant(self.ctx.session, 32)
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
value = sa._allocations_used_by_tenant(32)
self.assertEqual(value, 0)
def test_subnetpool_default_quota_exceeded(self):
@ -158,11 +159,10 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
['fe80::/48'],
48, 6, default_quota=1)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp)
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'fe80::/63')
self.assertRaises(n_exc.SubnetPoolQuotaExceeded,
sa.allocate_subnet,
self.ctx.session,
req)

View File

@ -13,7 +13,9 @@
import netaddr
from neutron.common import constants
from neutron.common import ipv6_utils
from neutron import ipam
from neutron.ipam import exceptions as ipam_exc
from neutron.openstack.common import uuidutils
from neutron.tests import base
@ -161,7 +163,7 @@ class TestIpamSpecificSubnetRequest(IpamSubnetRequestTestCase):
gateway_ip='1.2.3.1')
self.assertEqual(24, request.prefixlen)
self.assertEqual(netaddr.IPAddress('1.2.3.1'), request.gateway_ip)
self.assertEqual(netaddr.IPNetwork('1.2.3.0/24'), request.subnet)
self.assertEqual(netaddr.IPNetwork('1.2.3.0/24'), request.subnet_cidr)
def test_subnet_request_bad_gateway(self):
self.assertRaises(ValueError,
@ -176,6 +178,12 @@ class TestAddressRequest(base.BaseTestCase):
# This class doesn't test much. At least running through all of the
# constructors may shake out some trivial bugs.
EUI64 = ipam.AutomaticAddressRequest.EUI64
def setUp(self):
super(TestAddressRequest, self).setUp()
def test_specific_address_ipv6(self):
request = ipam.SpecificAddressRequest('2000::45')
self.assertEqual(netaddr.IPAddress('2000::45'), request.address)
@ -186,3 +194,33 @@ class TestAddressRequest(base.BaseTestCase):
def test_any_address(self):
ipam.AnyAddressRequest()
def test_automatic_address_request_eui64(self):
subnet_cidr = '2607:f0d0:1002:51::/64'
port_mac = 'aa:bb:cc:dd:ee:ff'
eui_addr = str(ipv6_utils.get_ipv6_addr_by_EUI64(subnet_cidr,
port_mac))
request = ipam.AutomaticAddressRequest(
address_type=self.EUI64,
prefix=subnet_cidr,
mac=port_mac)
self.assertEqual(request.address, netaddr.IPAddress(eui_addr))
def test_automatic_address_request_invalid_address_type_raises(self):
self.assertRaises(ipam_exc.InvalidAddressType,
ipam.AutomaticAddressRequest,
address_type='kaboom')
def test_automatic_address_request_eui64_no_mac_raises(self):
self.assertRaises(ipam_exc.AddressCalculationFailure,
ipam.AutomaticAddressRequest,
address_type=self.EUI64,
prefix='meh')
def test_automatic_address_request_eui64_alien_param_raises(self):
self.assertRaises(ipam_exc.AddressCalculationFailure,
ipam.AutomaticAddressRequest,
address_type=self.EUI64,
mac='meh',
alien='et',
prefix='meh')