Make IPAM more pythonic

__init__.py is usually an empty file, or contains platform specific
code to make the package function. One more use is to have __init__.py
contain import statements, to import functions and classes, for convenience.

http://docs.python-guide.org/en/latest/writing/structure/#packages
http://mikegrouchy.com/blog/2012/05/be-pythonic-__init__py.html

Change-Id: I7408ac95f4970fbd7009257dfb698102ffabb6e3
This commit is contained in:
Sean M. Collins 2015-06-26 11:14:41 -06:00 committed by Carl Baldwin
parent b522896c31
commit 310e1e0553
9 changed files with 381 additions and 379 deletions

View File

@ -27,7 +27,7 @@ from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.db import ipam_backend_mixin
from neutron.db import models_v2
from neutron import ipam
from neutron.ipam import requests as ipam_req
from neutron.ipam import subnet_alloc
from neutron.ipam import utils as ipam_utils
@ -486,9 +486,10 @@ class IpamNonPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
subnet['allocation_pools'],
subnet['cidr'],
subnet['gateway_ip'])
subnet_request = ipam.SubnetRequestFactory.get_request(context,
subnet,
subnetpool)
subnet_request = ipam_req.SubnetRequestFactory.get_request(context,
subnet,
subnetpool)
if subnetpool_id:
driver = subnet_alloc.SubnetAllocator(subnetpool, context)

View File

@ -1,289 +0,0 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import netaddr
from oslo_config import cfg
from oslo_utils import uuidutils
import six
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import ipv6_utils
from neutron.common import utils as common_utils
from neutron.ipam import exceptions as ipam_exc
@six.add_metaclass(abc.ABCMeta)
class SubnetPool(object):
"""Represents a pool of IPs available inside an address scope."""
@six.add_metaclass(abc.ABCMeta)
class SubnetRequest(object):
"""Carries the data needed to make a subnet request
The data validated and carried by an instance of this class is the data
that is common to any type of request. This class shouldn't be
instantiated on its own. Rather, a subclass of this class should be used.
"""
def __init__(self, tenant_id, subnet_id,
gateway_ip=None, allocation_pools=None):
"""Initialize and validate
:param tenant_id: The tenant id who will own the subnet
:type tenant_id: str uuid
:param subnet_id: Neutron's subnet ID
:type subnet_id: str uuid
:param gateway_ip: An IP to reserve for the subnet gateway.
:type gateway_ip: None or convertible to netaddr.IPAddress
:param allocation_pools: The pool from which IPAM should allocate
addresses. The allocator *may* allow allocating addresses outside
of this range if specifically requested.
:type allocation_pools: A list of netaddr.IPRange. None if not
specified.
"""
self._tenant_id = tenant_id
self._subnet_id = subnet_id
self._gateway_ip = None
self._allocation_pools = None
if gateway_ip is not None:
self._gateway_ip = netaddr.IPAddress(gateway_ip)
if allocation_pools is not None:
allocation_pools = sorted(allocation_pools)
previous = None
for pool in allocation_pools:
if not isinstance(pool, netaddr.ip.IPRange):
raise TypeError("Ranges must be netaddr.IPRange")
if previous and pool.first <= previous.last:
raise ValueError("Ranges must not overlap")
previous = pool
if 1 < len(allocation_pools):
# Checks that all the ranges are in the same IP version.
# IPRange sorts first by ip version so we can get by with just
# checking the first and the last range having sorted them
# above.
first_version = allocation_pools[0].version
last_version = allocation_pools[-1].version
if first_version != last_version:
raise ValueError("Ranges must be in the same IP version")
self._allocation_pools = allocation_pools
if self.gateway_ip and self.allocation_pools:
if self.gateway_ip.version != self.allocation_pools[0].version:
raise ValueError("Gateway IP version inconsistent with "
"allocation pool version")
@property
def tenant_id(self):
return self._tenant_id
@property
def subnet_id(self):
return self._subnet_id
@property
def gateway_ip(self):
return self._gateway_ip
@property
def allocation_pools(self):
return self._allocation_pools
def _validate_with_subnet(self, subnet_cidr):
if self.gateway_ip and cfg.CONF.force_gateway_on_subnet:
gw_ip = netaddr.IPAddress(self.gateway_ip)
if (gw_ip.version == 4 or (gw_ip.version == 6
and not gw_ip.is_link_local())):
if self.gateway_ip not in subnet_cidr:
raise ValueError("gateway_ip is not in the subnet")
if self.allocation_pools:
if subnet_cidr.version != self.allocation_pools[0].version:
raise ValueError("allocation_pools use the wrong ip version")
for pool in self.allocation_pools:
if pool not in subnet_cidr:
raise ValueError("allocation_pools are not in the subnet")
class AnySubnetRequest(SubnetRequest):
"""A template for allocating an unspecified subnet from IPAM
A driver may not implement this type of request. For example, The initial
reference implementation will not support this. The API has no way of
creating a subnet without a specific address until subnet-allocation is
implemented.
"""
WILDCARDS = {constants.IPv4: '0.0.0.0',
constants.IPv6: '::'}
def __init__(self, tenant_id, subnet_id, version, prefixlen,
gateway_ip=None, allocation_pools=None):
"""
:param version: Either constants.IPv4 or constants.IPv6
:param prefixlen: The prefix len requested. Must be within the min and
max allowed.
:type prefixlen: int
"""
super(AnySubnetRequest, self).__init__(
tenant_id=tenant_id,
subnet_id=subnet_id,
gateway_ip=gateway_ip,
allocation_pools=allocation_pools)
net = netaddr.IPNetwork(self.WILDCARDS[version] + '/' + str(prefixlen))
self._validate_with_subnet(net)
self._prefixlen = prefixlen
@property
def prefixlen(self):
return self._prefixlen
class SpecificSubnetRequest(SubnetRequest):
"""A template for allocating a specified subnet from IPAM
The initial reference implementation will probably just allow any
allocation, even overlapping ones. This can be expanded on by future
blueprints.
"""
def __init__(self, tenant_id, subnet_id, subnet_cidr,
gateway_ip=None, allocation_pools=None):
"""
:param subnet: The subnet requested. Can be IPv4 or IPv6. However,
when IPAM tries to fulfill this request, the IP version must match
the version of the address scope being used.
:type subnet: netaddr.IPNetwork or convertible to one
"""
super(SpecificSubnetRequest, self).__init__(
tenant_id=tenant_id,
subnet_id=subnet_id,
gateway_ip=gateway_ip,
allocation_pools=allocation_pools)
self._subnet_cidr = netaddr.IPNetwork(subnet_cidr)
self._validate_with_subnet(self._subnet_cidr)
@property
def subnet_cidr(self):
return self._subnet_cidr
@property
def prefixlen(self):
return self._subnet_cidr.prefixlen
@six.add_metaclass(abc.ABCMeta)
class AddressRequest(object):
"""Abstract base class for address requests"""
class SpecificAddressRequest(AddressRequest):
"""For requesting a specified address from IPAM"""
def __init__(self, address):
"""
:param address: The address being requested
:type address: A netaddr.IPAddress or convertible to one.
"""
super(SpecificAddressRequest, self).__init__()
self._address = netaddr.IPAddress(address)
@property
def address(self):
return self._address
class AnyAddressRequest(AddressRequest):
"""Used to request any available address from the pool."""
class AutomaticAddressRequest(SpecificAddressRequest):
"""Used to create auto generated addresses, such as EUI64"""
EUI64 = 'eui64'
def _generate_eui64_address(self, **kwargs):
if set(kwargs) != set(['prefix', 'mac']):
raise ipam_exc.AddressCalculationFailure(
address_type='eui-64',
reason='must provide exactly 2 arguments - cidr and MAC')
prefix = kwargs['prefix']
mac_address = kwargs['mac']
return ipv6_utils.get_ipv6_addr_by_EUI64(prefix, mac_address)
_address_generators = {EUI64: _generate_eui64_address}
def __init__(self, address_type=EUI64, **kwargs):
"""
This constructor builds an automatic IP address. Parameter needed for
generating it can be passed as optional keyword arguments.
:param address_type: the type of address to generate.
It could be a eui-64 address, a random IPv6 address, or
a ipv4 link-local address.
For the Kilo release only eui-64 addresses will be supported.
"""
address_generator = self._address_generators.get(address_type)
if not address_generator:
raise ipam_exc.InvalidAddressType(address_type=address_type)
address = address_generator(self, **kwargs)
super(AutomaticAddressRequest, self).__init__(address)
class RouterGatewayAddressRequest(AddressRequest):
"""Used to request allocating the special router gateway address."""
class AddressRequestFactory(object):
"""Builds request using ip info
Additional parameters(port and context) are not used in default
implementation, but planned to be used in sub-classes
provided by specific ipam driver,
"""
@classmethod
def get_request(cls, context, port, ip):
if not ip:
return AnyAddressRequest()
else:
return SpecificAddressRequest(ip)
class SubnetRequestFactory(object):
"""Builds request using subnet info"""
@classmethod
def get_request(cls, context, subnet, subnetpool):
cidr = subnet.get('cidr')
subnet_id = subnet.get('id', uuidutils.generate_uuid())
is_any_subnetpool_request = not attributes.is_attr_set(cidr)
if is_any_subnetpool_request:
prefixlen = subnet['prefixlen']
if not attributes.is_attr_set(prefixlen):
prefixlen = int(subnetpool['default_prefixlen'])
return AnySubnetRequest(
subnet['tenant_id'],
subnet_id,
common_utils.ip_version_from_int(subnetpool['ip_version']),
prefixlen)
else:
return SpecificSubnetRequest(subnet['tenant_id'],
subnet_id,
cidr,
subnet.get('gateway_ip'),
subnet.get('allocation_pools'))

View File

@ -16,7 +16,7 @@ from oslo_config import cfg
from oslo_log import log
import six
from neutron import ipam
from neutron.ipam import requests as ipam_req
from neutron import manager
LOG = log.getLogger(__name__)
@ -101,14 +101,14 @@ class Pool(object):
Can be overridden on driver level to return custom factory
"""
return ipam.SubnetRequestFactory
return ipam_req.SubnetRequestFactory
def get_address_request_factory(self):
"""Returns default AddressRequestFactory
Can be overridden on driver level to return custom factory
"""
return ipam.AddressRequestFactory
return ipam_req.AddressRequestFactory
@six.add_metaclass(abc.ABCMeta)

View File

@ -21,10 +21,10 @@ from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.db import api as db_api
from neutron.i18n import _LE
from neutron import ipam
from neutron.ipam import driver as ipam_base
from neutron.ipam.drivers.neutrondb_ipam import db_api as ipam_db_api
from neutron.ipam import exceptions as ipam_exc
from neutron.ipam import requests as ipam_req
from neutron.ipam import subnet_alloc
from neutron.ipam import utils as ipam_utils
from neutron import manager
@ -319,7 +319,7 @@ class NeutronDbSubnet(ipam_base.Subnet):
# NOTE(salv-orlando): It would probably better to have a simpler
# model for address requests and just check whether there is a
# specific IP address specified in address_request
if isinstance(address_request, ipam.SpecificAddressRequest):
if isinstance(address_request, ipam_req.SpecificAddressRequest):
# This handles both specific and automatic address requests
# Check availability of requested IP
ip_address = str(address_request.address)
@ -359,7 +359,7 @@ class NeutronDbSubnet(ipam_base.Subnet):
def get_details(self):
"""Return subnet data as a SpecificSubnetRequest"""
return ipam.SpecificSubnetRequest(
return ipam_req.SpecificSubnetRequest(
self._tenant_id, self.subnet_manager.neutron_id,
self._cidr, self._gateway_ip, self._pools)
@ -404,7 +404,7 @@ class NeutronDbPool(subnet_alloc.SubnetAllocator):
subnet_request = subnet.get_details()
# SubnetRequest must be an instance of SpecificSubnet
if not isinstance(subnet_request, ipam.SpecificSubnetRequest):
if not isinstance(subnet_request, ipam_req.SpecificSubnetRequest):
raise ipam_exc.InvalidSubnetRequestType(
subnet_type=type(subnet_request))
return NeutronDbSubnet.create_from_subnet_request(subnet_request,

289
neutron/ipam/requests.py Normal file
View File

@ -0,0 +1,289 @@
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import abc
import netaddr
from oslo_config import cfg
from oslo_utils import uuidutils
import six
from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import ipv6_utils
from neutron.common import utils as common_utils
from neutron.ipam import exceptions as ipam_exc
@six.add_metaclass(abc.ABCMeta)
class SubnetPool(object):
"""Represents a pool of IPs available inside an address scope."""
@six.add_metaclass(abc.ABCMeta)
class SubnetRequest(object):
"""Carries the data needed to make a subnet request
The data validated and carried by an instance of this class is the data
that is common to any type of request. This class shouldn't be
instantiated on its own. Rather, a subclass of this class should be used.
"""
def __init__(self, tenant_id, subnet_id,
gateway_ip=None, allocation_pools=None):
"""Initialize and validate
:param tenant_id: The tenant id who will own the subnet
:type tenant_id: str uuid
:param subnet_id: Neutron's subnet ID
:type subnet_id: str uuid
:param gateway_ip: An IP to reserve for the subnet gateway.
:type gateway_ip: None or convertible to netaddr.IPAddress
:param allocation_pools: The pool from which IPAM should allocate
addresses. The allocator *may* allow allocating addresses outside
of this range if specifically requested.
:type allocation_pools: A list of netaddr.IPRange. None if not
specified.
"""
self._tenant_id = tenant_id
self._subnet_id = subnet_id
self._gateway_ip = None
self._allocation_pools = None
if gateway_ip is not None:
self._gateway_ip = netaddr.IPAddress(gateway_ip)
if allocation_pools is not None:
allocation_pools = sorted(allocation_pools)
previous = None
for pool in allocation_pools:
if not isinstance(pool, netaddr.ip.IPRange):
raise TypeError("Ranges must be netaddr.IPRange")
if previous and pool.first <= previous.last:
raise ValueError("Ranges must not overlap")
previous = pool
if 1 < len(allocation_pools):
# Checks that all the ranges are in the same IP version.
# IPRange sorts first by ip version so we can get by with just
# checking the first and the last range having sorted them
# above.
first_version = allocation_pools[0].version
last_version = allocation_pools[-1].version
if first_version != last_version:
raise ValueError("Ranges must be in the same IP version")
self._allocation_pools = allocation_pools
if self.gateway_ip and self.allocation_pools:
if self.gateway_ip.version != self.allocation_pools[0].version:
raise ValueError("Gateway IP version inconsistent with "
"allocation pool version")
@property
def tenant_id(self):
return self._tenant_id
@property
def subnet_id(self):
return self._subnet_id
@property
def gateway_ip(self):
return self._gateway_ip
@property
def allocation_pools(self):
return self._allocation_pools
def _validate_with_subnet(self, subnet_cidr):
if self.gateway_ip and cfg.CONF.force_gateway_on_subnet:
gw_ip = netaddr.IPAddress(self.gateway_ip)
if (gw_ip.version == 4 or (gw_ip.version == 6
and not gw_ip.is_link_local())):
if self.gateway_ip not in subnet_cidr:
raise ValueError("gateway_ip is not in the subnet")
if self.allocation_pools:
if subnet_cidr.version != self.allocation_pools[0].version:
raise ValueError("allocation_pools use the wrong ip version")
for pool in self.allocation_pools:
if pool not in subnet_cidr:
raise ValueError("allocation_pools are not in the subnet")
class AnySubnetRequest(SubnetRequest):
"""A template for allocating an unspecified subnet from IPAM
A driver may not implement this type of request. For example, The initial
reference implementation will not support this. The API has no way of
creating a subnet without a specific address until subnet-allocation is
implemented.
"""
WILDCARDS = {constants.IPv4: '0.0.0.0',
constants.IPv6: '::'}
def __init__(self, tenant_id, subnet_id, version, prefixlen,
gateway_ip=None, allocation_pools=None):
"""
:param version: Either constants.IPv4 or constants.IPv6
:param prefixlen: The prefix len requested. Must be within the min and
max allowed.
:type prefixlen: int
"""
super(AnySubnetRequest, self).__init__(
tenant_id=tenant_id,
subnet_id=subnet_id,
gateway_ip=gateway_ip,
allocation_pools=allocation_pools)
net = netaddr.IPNetwork(self.WILDCARDS[version] + '/' + str(prefixlen))
self._validate_with_subnet(net)
self._prefixlen = prefixlen
@property
def prefixlen(self):
return self._prefixlen
class SpecificSubnetRequest(SubnetRequest):
"""A template for allocating a specified subnet from IPAM
The initial reference implementation will probably just allow any
allocation, even overlapping ones. This can be expanded on by future
blueprints.
"""
def __init__(self, tenant_id, subnet_id, subnet_cidr,
gateway_ip=None, allocation_pools=None):
"""
:param subnet: The subnet requested. Can be IPv4 or IPv6. However,
when IPAM tries to fulfill this request, the IP version must match
the version of the address scope being used.
:type subnet: netaddr.IPNetwork or convertible to one
"""
super(SpecificSubnetRequest, self).__init__(
tenant_id=tenant_id,
subnet_id=subnet_id,
gateway_ip=gateway_ip,
allocation_pools=allocation_pools)
self._subnet_cidr = netaddr.IPNetwork(subnet_cidr)
self._validate_with_subnet(self._subnet_cidr)
@property
def subnet_cidr(self):
return self._subnet_cidr
@property
def prefixlen(self):
return self._subnet_cidr.prefixlen
@six.add_metaclass(abc.ABCMeta)
class AddressRequest(object):
"""Abstract base class for address requests"""
class SpecificAddressRequest(AddressRequest):
"""For requesting a specified address from IPAM"""
def __init__(self, address):
"""
:param address: The address being requested
:type address: A netaddr.IPAddress or convertible to one.
"""
super(SpecificAddressRequest, self).__init__()
self._address = netaddr.IPAddress(address)
@property
def address(self):
return self._address
class AnyAddressRequest(AddressRequest):
"""Used to request any available address from the pool."""
class AutomaticAddressRequest(SpecificAddressRequest):
"""Used to create auto generated addresses, such as EUI64"""
EUI64 = 'eui64'
def _generate_eui64_address(self, **kwargs):
if set(kwargs) != set(['prefix', 'mac']):
raise ipam_exc.AddressCalculationFailure(
address_type='eui-64',
reason='must provide exactly 2 arguments - cidr and MAC')
prefix = kwargs['prefix']
mac_address = kwargs['mac']
return ipv6_utils.get_ipv6_addr_by_EUI64(prefix, mac_address)
_address_generators = {EUI64: _generate_eui64_address}
def __init__(self, address_type=EUI64, **kwargs):
"""
This constructor builds an automatic IP address. Parameter needed for
generating it can be passed as optional keyword arguments.
:param address_type: the type of address to generate.
It could be a eui-64 address, a random IPv6 address, or
a ipv4 link-local address.
For the Kilo release only eui-64 addresses will be supported.
"""
address_generator = self._address_generators.get(address_type)
if not address_generator:
raise ipam_exc.InvalidAddressType(address_type=address_type)
address = address_generator(self, **kwargs)
super(AutomaticAddressRequest, self).__init__(address)
class RouterGatewayAddressRequest(AddressRequest):
"""Used to request allocating the special router gateway address."""
class AddressRequestFactory(object):
"""Builds request using ip info
Additional parameters(port and context) are not used in default
implementation, but planned to be used in sub-classes
provided by specific ipam driver,
"""
@classmethod
def get_request(cls, context, port, ip):
if not ip:
return AnyAddressRequest()
else:
return SpecificAddressRequest(ip)
class SubnetRequestFactory(object):
"""Builds request using subnet info"""
@classmethod
def get_request(cls, context, subnet, subnetpool):
cidr = subnet.get('cidr')
subnet_id = subnet.get('id', uuidutils.generate_uuid())
is_any_subnetpool_request = not attributes.is_attr_set(cidr)
if is_any_subnetpool_request:
prefixlen = subnet['prefixlen']
if not attributes.is_attr_set(prefixlen):
prefixlen = int(subnetpool['default_prefixlen'])
return AnySubnetRequest(
subnet['tenant_id'],
subnet_id,
common_utils.ip_version_from_int(subnetpool['ip_version']),
prefixlen)
else:
return SpecificSubnetRequest(subnet['tenant_id'],
subnet_id,
cidr,
subnet.get('gateway_ip'),
subnet.get('allocation_pools'))

View File

@ -23,8 +23,8 @@ from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.db import models_v2
import neutron.ipam as ipam
from neutron.ipam import driver
from neutron.ipam import requests as ipam_req
from neutron.ipam import utils as ipam_utils
@ -151,9 +151,9 @@ class SubnetAllocator(driver.Pool):
prefixlen=request.prefixlen,
min_prefixlen=min_prefixlen)
if isinstance(request, ipam.AnySubnetRequest):
if isinstance(request, ipam_req.AnySubnetRequest):
return self._allocate_any_subnet(request)
elif isinstance(request, ipam.SpecificSubnetRequest):
elif isinstance(request, ipam_req.SpecificSubnetRequest):
return self._allocate_specific_subnet(request)
else:
msg = _("Unsupported request type")
@ -177,7 +177,7 @@ class IpamSubnet(driver.Subnet):
cidr,
gateway_ip=None,
allocation_pools=None):
self._req = ipam.SpecificSubnetRequest(
self._req = ipam_req.SpecificSubnetRequest(
tenant_id,
subnet_id,
cidr,

View File

@ -19,9 +19,9 @@ from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron import context
from neutron import ipam
from neutron.ipam.drivers.neutrondb_ipam import driver
from neutron.ipam import exceptions as ipam_exc
from neutron.ipam import requests as ipam_req
from neutron import manager
from neutron.tests.unit.db import test_db_base_plugin_v2 as test_db_plugin
@ -102,7 +102,7 @@ class TestNeutronDbIpamPool(testlib_api.SqlTestCase,
cidr = '10.0.0.0/24'
allocation_pools = [netaddr.IPRange('10.0.0.100', '10.0.0.150'),
netaddr.IPRange('10.0.0.200', '10.0.0.250')]
subnet_req = ipam.SpecificSubnetRequest(
subnet_req = ipam_req.SpecificSubnetRequest(
self._tenant_id,
None,
cidr,
@ -118,7 +118,7 @@ class TestNeutronDbIpamPool(testlib_api.SqlTestCase,
def _prepare_specific_subnet_request(self, cidr):
subnet = self._create_subnet(
self.plugin, self.ctx, self.net_id, cidr)
subnet_req = ipam.SpecificSubnetRequest(
subnet_req = ipam_req.SpecificSubnetRequest(
self._tenant_id,
subnet['id'],
cidr,
@ -138,7 +138,8 @@ class TestNeutronDbIpamPool(testlib_api.SqlTestCase,
self.assertRaises(
ipam_exc.InvalidSubnetRequestType,
self.ipam_pool.allocate_subnet,
ipam.AnySubnetRequest(self._tenant_id, 'meh', constants.IPv4, 24))
ipam_req.AnySubnetRequest(self._tenant_id, 'meh',
constants.IPv4, 24))
def test_update_subnet_pools(self):
cidr = '10.0.0.0/24'
@ -147,7 +148,7 @@ class TestNeutronDbIpamPool(testlib_api.SqlTestCase,
ipam_subnet.associate_neutron_subnet(subnet['id'])
allocation_pools = [netaddr.IPRange('10.0.0.100', '10.0.0.150'),
netaddr.IPRange('10.0.0.200', '10.0.0.250')]
update_subnet_req = ipam.SpecificSubnetRequest(
update_subnet_req = ipam_req.SpecificSubnetRequest(
self._tenant_id,
subnet['id'],
cidr,
@ -206,7 +207,7 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
allocation_pool_ranges = [netaddr.IPRange(
pool['start'], pool['end']) for pool in
subnet['allocation_pools']]
subnet_req = ipam.SpecificSubnetRequest(
subnet_req = ipam_req.SpecificSubnetRequest(
tenant_id,
subnet['id'],
cidr,
@ -312,7 +313,7 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
cidr = '10.0.0.0/24'
subnet = self._create_subnet(
self.plugin, self.ctx, self.net_id, cidr)
subnet_req = ipam.SpecificSubnetRequest(
subnet_req = ipam_req.SpecificSubnetRequest(
'tenant_id', subnet, cidr, gateway_ip=subnet['gateway_ip'])
ipam_subnet = self.ipam_pool.allocate_subnet(subnet_req)
with self.ctx.session.begin():
@ -333,7 +334,7 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
def test_allocate_any_v4_address_succeeds(self):
ip_address = self._allocate_address(
'10.0.0.0/24', 4, ipam.AnyAddressRequest)
'10.0.0.0/24', 4, ipam_req.AnyAddressRequest)
# As the DB IPAM driver allocation logic is strictly sequential, we can
# expect this test to allocate the .2 address as .1 is used by default
# as subnet gateway
@ -341,7 +342,7 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
def test_allocate_any_v6_address_succeeds(self):
ip_address = self._allocate_address(
'fde3:abcd:4321:1::/64', 6, ipam.AnyAddressRequest)
'fde3:abcd:4321:1::/64', 6, ipam_req.AnyAddressRequest)
# As the DB IPAM driver allocation logic is strictly sequential, we can
# expect this test to allocate the .2 address as .1 is used by default
# as subnet gateway
@ -349,32 +350,32 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
def test_allocate_specific_v4_address_succeeds(self):
ip_address = self._allocate_address(
'10.0.0.0/24', 4, ipam.SpecificAddressRequest('10.0.0.33'))
'10.0.0.0/24', 4, ipam_req.SpecificAddressRequest('10.0.0.33'))
self.assertEqual('10.0.0.33', ip_address)
def test_allocate_specific_v6_address_succeeds(self):
ip_address = self._allocate_address(
'fde3:abcd:4321:1::/64', 6,
ipam.SpecificAddressRequest('fde3:abcd:4321:1::33'))
ipam_req.SpecificAddressRequest('fde3:abcd:4321:1::33'))
self.assertEqual('fde3:abcd:4321:1::33', ip_address)
def test_allocate_specific_v4_address_out_of_range_fails(self):
self.assertRaises(ipam_exc.InvalidIpForSubnet,
self._allocate_address,
'10.0.0.0/24', 4,
ipam.SpecificAddressRequest('192.168.0.1'))
ipam_req.SpecificAddressRequest('192.168.0.1'))
def test_allocate_specific_v6_address_out_of_range_fails(self):
self.assertRaises(ipam_exc.InvalidIpForSubnet,
self._allocate_address,
'fde3:abcd:4321:1::/64', 6,
ipam.SpecificAddressRequest(
ipam_req.SpecificAddressRequest(
'fde3:abcd:eeee:1::33'))
def test_allocate_specific_address_in_use_fails(self):
ipam_subnet = self._create_and_allocate_ipam_subnet(
'fde3:abcd:4321:1::/64', ip_version=6)[0]
addr_req = ipam.SpecificAddressRequest('fde3:abcd:4321:1::33')
addr_req = ipam_req.SpecificAddressRequest('fde3:abcd:4321:1::33')
ipam_subnet.allocate(addr_req)
self.assertRaises(ipam_exc.IpAddressAlreadyAllocated,
ipam_subnet.allocate,
@ -384,16 +385,16 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
# Same as above, the ranges will be recalculated always
ipam_subnet = self._create_and_allocate_ipam_subnet(
'192.168.0.0/30', ip_version=4)[0]
ipam_subnet.allocate(ipam.AnyAddressRequest)
ipam_subnet.allocate(ipam_req.AnyAddressRequest)
# The second address generation request on a /30 for v4 net must fail
self.assertRaises(ipam_exc.IpAddressGenerationFailure,
ipam_subnet.allocate,
ipam.AnyAddressRequest)
ipam_req.AnyAddressRequest)
def _test_deallocate_address(self, cidr, ip_version):
ipam_subnet = self._create_and_allocate_ipam_subnet(
cidr, ip_version=ip_version)[0]
ip_address = ipam_subnet.allocate(ipam.AnyAddressRequest)
ip_address = ipam_subnet.allocate(ipam_req.AnyAddressRequest)
ipam_subnet.deallocate(ip_address)
def test_deallocate_v4_address(self):
@ -416,14 +417,14 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
pass
def _test_allocate_subnet(self, subnet_id):
subnet_req = ipam.SpecificSubnetRequest(
subnet_req = ipam_req.SpecificSubnetRequest(
'tenant_id', subnet_id, '192.168.0.0/24')
return self.ipam_pool.allocate_subnet(subnet_req)
def test_allocate_subnet_for_non_existent_subnet_pass(self):
# This test should pass because neutron subnet is not checked
# until associate neutron subnet step
subnet_req = ipam.SpecificSubnetRequest(
subnet_req = ipam_req.SpecificSubnetRequest(
'tenant_id', 'meh', '192.168.0.0/24')
self.ipam_pool.allocate_subnet(subnet_req)
@ -434,7 +435,7 @@ class TestNeutronDbIpamSubnet(testlib_api.SqlTestCase,
self.assertEqual(subnet['id'], details.subnet_id)
def test_associate_non_existing_neutron_subnet_fails(self):
subnet_req = ipam.SpecificSubnetRequest(
subnet_req = ipam_req.SpecificSubnetRequest(
'tenant_id', 'meh', '192.168.0.0/24')
ipam_subnet = self.ipam_pool.allocate_subnet(subnet_req)
self.assertRaises(n_exc.SubnetNotFound,

View File

@ -20,9 +20,9 @@ from oslo_utils import uuidutils
from neutron.common import constants
from neutron.common import ipv6_utils
from neutron import context
from neutron import ipam
from neutron.ipam import driver
from neutron.ipam import exceptions as ipam_exc
from neutron.ipam import requests as ipam_req
from neutron import manager
from neutron.tests import base
from neutron.tests.unit.ipam import fake_driver
@ -41,7 +41,7 @@ class IpamSubnetRequestTestCase(base.BaseTestCase):
class TestIpamSubnetRequests(IpamSubnetRequestTestCase):
def test_subnet_request(self):
pool = ipam.SubnetRequest(self.tenant_id,
pool = ipam_req.SubnetRequest(self.tenant_id,
self.subnet_id)
self.assertEqual(self.tenant_id, pool.tenant_id)
self.assertEqual(self.subnet_id, pool.subnet_id)
@ -49,14 +49,14 @@ class TestIpamSubnetRequests(IpamSubnetRequestTestCase):
self.assertEqual(None, pool.allocation_pools)
def test_subnet_request_gateway(self):
request = ipam.SubnetRequest(self.tenant_id,
request = ipam_req.SubnetRequest(self.tenant_id,
self.subnet_id,
gateway_ip='1.2.3.1')
self.assertEqual('1.2.3.1', str(request.gateway_ip))
def test_subnet_request_bad_gateway(self):
self.assertRaises(netaddr.core.AddrFormatError,
ipam.SubnetRequest,
ipam_req.SubnetRequest,
self.tenant_id,
self.subnet_id,
gateway_ip='1.2.3.')
@ -64,21 +64,21 @@ class TestIpamSubnetRequests(IpamSubnetRequestTestCase):
def test_subnet_request_with_range(self):
allocation_pools = [netaddr.IPRange('1.2.3.4', '1.2.3.5'),
netaddr.IPRange('1.2.3.7', '1.2.3.9')]
request = ipam.SubnetRequest(self.tenant_id,
request = ipam_req.SubnetRequest(self.tenant_id,
self.subnet_id,
allocation_pools=allocation_pools)
self.assertEqual(allocation_pools, request.allocation_pools)
def test_subnet_request_range_not_list(self):
self.assertRaises(TypeError,
ipam.SubnetRequest,
ipam_req.SubnetRequest,
self.tenant_id,
self.subnet_id,
allocation_pools=1)
def test_subnet_request_bad_range(self):
self.assertRaises(TypeError,
ipam.SubnetRequest,
ipam_req.SubnetRequest,
self.tenant_id,
self.subnet_id,
allocation_pools=['1.2.3.4'])
@ -87,7 +87,7 @@ class TestIpamSubnetRequests(IpamSubnetRequestTestCase):
pools = [netaddr.IPRange('0.0.0.1', '0.0.0.2'),
netaddr.IPRange('::1', '::2')]
self.assertRaises(ValueError,
ipam.SubnetRequest,
ipam_req.SubnetRequest,
self.tenant_id,
self.subnet_id,
allocation_pools=pools)
@ -96,7 +96,7 @@ class TestIpamSubnetRequests(IpamSubnetRequestTestCase):
pools = [netaddr.IPRange('0.0.0.10', '0.0.0.20'),
netaddr.IPRange('0.0.0.8', '0.0.0.10')]
self.assertRaises(ValueError,
ipam.SubnetRequest,
ipam_req.SubnetRequest,
self.tenant_id,
self.subnet_id,
allocation_pools=pools)
@ -105,7 +105,7 @@ class TestIpamSubnetRequests(IpamSubnetRequestTestCase):
class TestIpamAnySubnetRequest(IpamSubnetRequestTestCase):
def test_subnet_request(self):
request = ipam.AnySubnetRequest(self.tenant_id,
request = ipam_req.AnySubnetRequest(self.tenant_id,
self.subnet_id,
constants.IPv4,
24,
@ -114,7 +114,7 @@ class TestIpamAnySubnetRequest(IpamSubnetRequestTestCase):
def test_subnet_request_bad_prefix_type(self):
self.assertRaises(netaddr.core.AddrFormatError,
ipam.AnySubnetRequest,
ipam_req.AnySubnetRequest,
self.tenant_id,
self.subnet_id,
constants.IPv4,
@ -122,13 +122,13 @@ class TestIpamAnySubnetRequest(IpamSubnetRequestTestCase):
def test_subnet_request_bad_prefix(self):
self.assertRaises(netaddr.core.AddrFormatError,
ipam.AnySubnetRequest,
ipam_req.AnySubnetRequest,
self.tenant_id,
self.subnet_id,
constants.IPv4,
33)
self.assertRaises(netaddr.core.AddrFormatError,
ipam.AnySubnetRequest,
ipam_req.AnySubnetRequest,
self.tenant_id,
self.subnet_id,
constants.IPv6,
@ -136,7 +136,7 @@ class TestIpamAnySubnetRequest(IpamSubnetRequestTestCase):
def test_subnet_request_bad_gateway(self):
self.assertRaises(ValueError,
ipam.AnySubnetRequest,
ipam_req.AnySubnetRequest,
self.tenant_id,
self.subnet_id,
constants.IPv6,
@ -146,7 +146,7 @@ class TestIpamAnySubnetRequest(IpamSubnetRequestTestCase):
def test_subnet_request_allocation_pool_wrong_version(self):
pools = [netaddr.IPRange('0.0.0.4', '0.0.0.5')]
self.assertRaises(ValueError,
ipam.AnySubnetRequest,
ipam_req.AnySubnetRequest,
self.tenant_id,
self.subnet_id,
constants.IPv6,
@ -156,7 +156,7 @@ class TestIpamAnySubnetRequest(IpamSubnetRequestTestCase):
def test_subnet_request_allocation_pool_not_in_net(self):
pools = [netaddr.IPRange('0.0.0.64', '0.0.0.128')]
self.assertRaises(ValueError,
ipam.AnySubnetRequest,
ipam_req.AnySubnetRequest,
self.tenant_id,
self.subnet_id,
constants.IPv4,
@ -167,7 +167,7 @@ class TestIpamAnySubnetRequest(IpamSubnetRequestTestCase):
class TestIpamSpecificSubnetRequest(IpamSubnetRequestTestCase):
def test_subnet_request(self):
request = ipam.SpecificSubnetRequest(self.tenant_id,
request = ipam_req.SpecificSubnetRequest(self.tenant_id,
self.subnet_id,
'1.2.3.0/24',
gateway_ip='1.2.3.1')
@ -177,7 +177,7 @@ class TestIpamSpecificSubnetRequest(IpamSubnetRequestTestCase):
def test_subnet_request_bad_gateway(self):
self.assertRaises(ValueError,
ipam.SpecificSubnetRequest,
ipam_req.SpecificSubnetRequest,
self.tenant_id,
self.subnet_id,
'2001::1',
@ -189,28 +189,28 @@ class TestAddressRequest(base.BaseTestCase):
# This class doesn't test much. At least running through all of the
# constructors may shake out some trivial bugs.
EUI64 = ipam.AutomaticAddressRequest.EUI64
EUI64 = ipam_req.AutomaticAddressRequest.EUI64
def setUp(self):
super(TestAddressRequest, self).setUp()
def test_specific_address_ipv6(self):
request = ipam.SpecificAddressRequest('2000::45')
request = ipam_req.SpecificAddressRequest('2000::45')
self.assertEqual(netaddr.IPAddress('2000::45'), request.address)
def test_specific_address_ipv4(self):
request = ipam.SpecificAddressRequest('1.2.3.32')
request = ipam_req.SpecificAddressRequest('1.2.3.32')
self.assertEqual(netaddr.IPAddress('1.2.3.32'), request.address)
def test_any_address(self):
ipam.AnyAddressRequest()
ipam_req.AnyAddressRequest()
def test_automatic_address_request_eui64(self):
subnet_cidr = '2607:f0d0:1002:51::/64'
port_mac = 'aa:bb:cc:dd:ee:ff'
eui_addr = str(ipv6_utils.get_ipv6_addr_by_EUI64(subnet_cidr,
port_mac))
request = ipam.AutomaticAddressRequest(
request = ipam_req.AutomaticAddressRequest(
address_type=self.EUI64,
prefix=subnet_cidr,
mac=port_mac)
@ -218,18 +218,18 @@ class TestAddressRequest(base.BaseTestCase):
def test_automatic_address_request_invalid_address_type_raises(self):
self.assertRaises(ipam_exc.InvalidAddressType,
ipam.AutomaticAddressRequest,
ipam_req.AutomaticAddressRequest,
address_type='kaboom')
def test_automatic_address_request_eui64_no_mac_raises(self):
self.assertRaises(ipam_exc.AddressCalculationFailure,
ipam.AutomaticAddressRequest,
ipam_req.AutomaticAddressRequest,
address_type=self.EUI64,
prefix='meh')
def test_automatic_address_request_eui64_alien_param_raises(self):
self.assertRaises(ipam_exc.AddressCalculationFailure,
ipam.AutomaticAddressRequest,
ipam_req.AutomaticAddressRequest,
address_type=self.EUI64,
mac='meh',
alien='et',
@ -265,7 +265,7 @@ class TestIpamDriverLoader(base.BaseTestCase):
def test_ipam_driver_raises_import_error(self):
self._verify_import_error_is_generated(
'neutron.tests.unit.ipam.SomeNonExistentClass')
'neutron.tests.unit.ipam_req.SomeNonExistentClass')
def test_ipam_driver_raises_import_error_for_none(self):
self._verify_import_error_is_generated(None)
@ -292,18 +292,18 @@ class TestAddressRequestFactory(base.BaseTestCase):
def test_specific_address_request_is_loaded(self):
for address in ('10.12.0.15', 'fffe::1'):
self.assertIsInstance(
ipam.AddressRequestFactory.get_request(None,
None,
address),
ipam.SpecificAddressRequest)
ipam_req.AddressRequestFactory.get_request(None,
None,
address),
ipam_req.SpecificAddressRequest)
def test_any_address_request_is_loaded(self):
for addr in [None, '']:
self.assertIsInstance(
ipam.AddressRequestFactory.get_request(None,
ipam_req.AddressRequestFactory.get_request(None,
None,
addr),
ipam.AnyAddressRequest)
ipam_req.AnyAddressRequest)
class TestSubnetRequestFactory(IpamSubnetRequestTestCase):
@ -330,34 +330,34 @@ class TestSubnetRequestFactory(IpamSubnetRequestTestCase):
for address in addresses:
subnet, subnetpool = self._build_subnet_dict(cidr=address)
self.assertIsInstance(
ipam.SubnetRequestFactory.get_request(None,
ipam_req.SubnetRequestFactory.get_request(None,
subnet,
subnetpool),
ipam.SpecificSubnetRequest)
ipam_req.SpecificSubnetRequest)
def test_any_address_request_is_loaded_for_ipv4(self):
subnet, subnetpool = self._build_subnet_dict(cidr=None, ip_version=4)
self.assertIsInstance(
ipam.SubnetRequestFactory.get_request(None,
ipam_req.SubnetRequestFactory.get_request(None,
subnet,
subnetpool),
ipam.AnySubnetRequest)
ipam_req.AnySubnetRequest)
def test_any_address_request_is_loaded_for_ipv6(self):
subnet, subnetpool = self._build_subnet_dict(cidr=None, ip_version=6)
self.assertIsInstance(
ipam.SubnetRequestFactory.get_request(None,
ipam_req.SubnetRequestFactory.get_request(None,
subnet,
subnetpool),
ipam.AnySubnetRequest)
ipam_req.AnySubnetRequest)
def test_args_are_passed_to_specific_request(self):
subnet, subnetpool = self._build_subnet_dict()
request = ipam.SubnetRequestFactory.get_request(None,
request = ipam_req.SubnetRequestFactory.get_request(None,
subnet,
subnetpool)
self.assertIsInstance(request,
ipam.SpecificSubnetRequest)
ipam_req.SpecificSubnetRequest)
self.assertEqual(self.tenant_id, request.tenant_id)
self.assertEqual(self.subnet_id, request.subnet_id)
self.assertEqual(None, request.gateway_ip)
@ -374,9 +374,9 @@ class TestGetRequestFactory(base.BaseTestCase):
def test_get_subnet_request_factory(self):
self.assertEqual(
self.driver.get_subnet_request_factory(),
ipam.SubnetRequestFactory)
ipam_req.SubnetRequestFactory)
def test_get_address_request_factory(self):
self.assertEqual(
self.driver.get_address_request_factory(),
ipam.AddressRequestFactory)
ipam_req.AddressRequestFactory)

View File

@ -21,7 +21,7 @@ from neutron.api.v2 import attributes
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron import context
import neutron.ipam as ipam
from neutron.ipam import requests as ipam_req
from neutron.ipam import subnet_alloc
from neutron import manager
from neutron.tests.unit.db import test_db_base_plugin_v2
@ -64,7 +64,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.AnySubnetRequest(self._tenant_id,
req = ipam_req.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
constants.IPv4, 21)
res = sa.allocate_subnet(req)
@ -81,7 +81,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
with self.ctx.session.begin(subtransactions=True):
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.SpecificSubnetRequest(self._tenant_id,
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'10.1.2.0/24')
res = sa.allocate_subnet(req)
@ -96,7 +96,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.AnySubnetRequest(self._tenant_id,
req = ipam_req.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
constants.IPv4,
21)
@ -109,7 +109,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
21, 4)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.SpecificSubnetRequest(self._tenant_id,
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'10.1.0.0/21')
self.assertRaises(n_exc.SubnetAllocationError,
@ -122,7 +122,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.AnySubnetRequest(self._tenant_id,
req = ipam_req.AnySubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
constants.IPv4, 21)
res = sa.allocate_subnet(req)
@ -137,7 +137,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.SpecificSubnetRequest(self._tenant_id,
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'10.1.2.0/24',
gateway_ip='10.1.2.254')
@ -154,10 +154,10 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
with self.ctx.session.begin(subtransactions=True):
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'2210::/64',
'2210::ffff:ffff:ffff:ffff')
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'2210::/64',
'2210::ffff:ffff:ffff:ffff')
res = sa.allocate_subnet(req)
detail = res.get_details()
self.assertEqual(detail.gateway_ip,
@ -177,7 +177,7 @@ class TestSubnetAllocation(testlib_api.SqlTestCase):
48, 6, default_quota=1)
sp = self.plugin._get_subnetpool(self.ctx, sp['id'])
sa = subnet_alloc.SubnetAllocator(sp, self.ctx)
req = ipam.SpecificSubnetRequest(self._tenant_id,
req = ipam_req.SpecificSubnetRequest(self._tenant_id,
uuidutils.generate_uuid(),
'fe80::/63')
self.assertRaises(n_exc.SubnetPoolQuotaExceeded,