Merge "DB, IPAM & RPC changes for IPv6 Prefix Delegation"

This commit is contained in:
Jenkins 2015-08-06 11:55:09 +00:00 committed by Gerrit Code Review
commit 5e985b5c5c
11 changed files with 272 additions and 14 deletions

View File

@ -43,7 +43,8 @@ class L3RpcCallback(object):
# 1.4 Added L3 HA update_router_state. This method was later removed,
# since it was unused. The RPC version was not changed
# 1.5 Added update_ha_routers_states
target = oslo_messaging.Target(version='1.5')
# 1.6 Added process_prefix_update to support IPv6 Prefix Delegation
target = oslo_messaging.Target(version='1.6')
@property
def plugin(self):
@ -261,3 +262,10 @@ class L3RpcCallback(object):
LOG.debug('Updating HA routers states on host %s: %s', host, states)
self.l3plugin.update_routers_states(context, states, host)
def process_prefix_update(self, context, **kwargs):
subnets = kwargs.get('subnets')
for subnet_id, prefix in subnets.items():
self.plugin.update_subnet(context, subnet_id,
{'subnet': {'cidr': prefix}})

View File

@ -367,6 +367,16 @@ def _validate_regex_or_none(data, valid_values=None):
return _validate_regex(data, valid_values)
def _validate_subnetpool_id(data, valid_values=None):
if data != constants.IPV6_PD_POOL_ID:
return _validate_uuid_or_none(data, valid_values)
def _validate_subnetpool_id_or_none(data, valid_values=None):
if data is not None:
return _validate_subnetpool_id(data, valid_values)
def _validate_uuid(data, valid_values=None):
if not uuidutils.is_uuid_like(data):
msg = _("'%s' is not a valid UUID") % data
@ -613,6 +623,8 @@ validators = {'type:dict': _validate_dict,
'type:subnet': _validate_subnet,
'type:subnet_list': _validate_subnet_list,
'type:subnet_or_none': _validate_subnet_or_none,
'type:subnetpool_id': _validate_subnetpool_id,
'type:subnetpool_id_or_none': _validate_subnetpool_id_or_none,
'type:uuid': _validate_uuid,
'type:uuid_or_none': _validate_uuid_or_none,
'type:uuid_list': _validate_uuid_list,
@ -743,7 +755,7 @@ RESOURCE_ATTRIBUTE_MAP = {
'allow_put': False,
'default': ATTR_NOT_SPECIFIED,
'required_by_policy': False,
'validate': {'type:uuid_or_none': None},
'validate': {'type:subnetpool_id_or_none': None},
'is_visible': True},
'prefixlen': {'allow_post': True,
'allow_put': False,

View File

@ -143,6 +143,9 @@ IPV6_LLA_PREFIX = 'fe80::/64'
# indicate that IPv6 Prefix Delegation should be used to allocate subnet CIDRs
IPV6_PD_POOL_ID = 'prefix_delegation'
# Special provisional prefix for IPv6 Prefix Delegation
PROVISIONAL_IPV6_PD_PREFIX = '::/64'
# Linux interface max length
DEVICE_NAME_MAX_LEN = 15

View File

@ -77,3 +77,10 @@ def is_eui64_address(ip_address):
# '0xfffe' addition is used to build EUI-64 from MAC (RFC4291)
# Look for it in the middle of the EUI-64 part of address
return ip.version == 6 and not ((ip & 0xffff000000) ^ 0xfffe000000)
def is_ipv6_pd_enabled(subnet):
"""Returns True if the subnetpool_id of the given subnet is equal to
constants.IPV6_PD_POOL_ID
"""
return subnet.get('subnetpool_id') == constants.IPV6_PD_POOL_ID

View File

@ -233,6 +233,10 @@ def get_hostname():
return socket.gethostname()
def get_first_host_ip(net, ip_version):
return str(netaddr.IPAddress(net.first + 1, ip_version))
def compare_elements(a, b):
"""Compare elements if a and b have same elements.

View File

@ -24,6 +24,7 @@ from oslo_utils import uuidutils
from sqlalchemy import and_
from sqlalchemy import event
from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api
from neutron.api.v2 import attributes
from neutron.callbacks import events
from neutron.callbacks import exceptions
@ -32,6 +33,7 @@ from neutron.callbacks import resources
from neutron.common import constants
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.common import utils
from neutron import context as ctx
from neutron.db import api as db_api
from neutron.db import db_base_plugin_common
@ -394,7 +396,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# NOTE(salv-orlando): There is slight chance of a race, when
# a subnet-update and a router-interface-add operation are
# executed concurrently
if cur_subnet:
if cur_subnet and not ipv6_utils.is_ipv6_pd_enabled(s):
alloc_qry = context.session.query(models_v2.IPAllocation)
allocated = alloc_qry.filter_by(
ip_address=cur_subnet['gateway_ip'],
@ -439,6 +441,29 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if ip_ver == 6:
self._validate_ipv6_attributes(s, cur_subnet)
def _validate_subnet_for_pd(self, subnet):
"""Validates that subnet parameters are correct for IPv6 PD"""
if (subnet.get('ip_version') != constants.IP_VERSION_6):
reason = _("Prefix Delegation can only be used with IPv6 "
"subnets.")
raise n_exc.BadRequest(resource='subnets', msg=reason)
mode_list = [constants.IPV6_SLAAC,
constants.DHCPV6_STATELESS,
attributes.ATTR_NOT_SPECIFIED]
ra_mode = subnet.get('ipv6_ra_mode')
if ra_mode not in mode_list:
reason = _("IPv6 RA Mode must be SLAAC or Stateless for "
"Prefix Delegation.")
raise n_exc.BadRequest(resource='subnets', msg=reason)
address_mode = subnet.get('ipv6_address_mode')
if address_mode not in mode_list:
reason = _("IPv6 Address Mode must be SLAAC or Stateless for "
"Prefix Delegation.")
raise n_exc.BadRequest(resource='subnets', msg=reason)
def _update_router_gw_ports(self, context, network, subnet):
l3plugin = manager.NeutronManager.get_service_plugins().get(
service_constants.L3_ROUTER_NAT)
@ -543,6 +568,17 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
subnetpool_id = self._get_subnetpool_id(s)
if subnetpool_id:
self.ipam.validate_pools_with_subnetpool(s)
if subnetpool_id == constants.IPV6_PD_POOL_ID:
if has_cidr:
# We do not currently support requesting a specific
# cidr with IPv6 prefix delegation. Set the subnetpool_id
# to None and allow the request to continue as normal.
subnetpool_id = None
self._validate_subnet(context, s)
else:
prefix = constants.PROVISIONAL_IPV6_PD_PREFIX
subnet['subnet']['cidr'] = prefix
self._validate_subnet_for_pd(s)
else:
if not has_cidr:
msg = _('A cidr must be specified in the absence of a '
@ -552,6 +588,16 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
return self._create_subnet(context, subnet, subnetpool_id)
def _update_allocation_pools(self, subnet):
"""Gets new allocation pools and formats them correctly"""
allocation_pools = self.ipam.generate_allocation_pools(
subnet['cidr'],
subnet['gateway_ip'])
return [{'start': str(netaddr.IPAddress(p.first,
subnet['ip_version'])),
'end': str(netaddr.IPAddress(p.last, subnet['ip_version']))}
for p in allocation_pools]
def update_subnet(self, context, id, subnet):
"""Update the subnet with new info.
@ -559,6 +605,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
dns lease or we support gratuitous DHCP offers
"""
s = subnet['subnet']
new_cidr = s.get('cidr')
db_subnet = self._get_subnet(context, id)
# Fill 'ip_version' and 'allocation_pools' fields with the current
# value since _validate_subnet() expects subnet spec has 'ip_version'
@ -567,6 +614,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
s['cidr'] = db_subnet.cidr
s['id'] = db_subnet.id
s['tenant_id'] = db_subnet.tenant_id
s['subnetpool_id'] = db_subnet.subnetpool_id
self._validate_subnet(context, s, cur_subnet=db_subnet)
db_pools = [netaddr.IPRange(p['first_ip'], p['last_ip'])
for p in db_subnet.allocation_pools]
@ -577,6 +625,17 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
range_pools = self.ipam.pools_to_ip_range(s['allocation_pools'])
s['allocation_pools'] = range_pools
update_ports_needed = False
if new_cidr and ipv6_utils.is_ipv6_pd_enabled(s):
# This is an ipv6 prefix delegation-enabled subnet being given an
# updated cidr by the process_prefix_update RPC
s['cidr'] = new_cidr
update_ports_needed = True
net = netaddr.IPNetwork(s['cidr'], s['ip_version'])
# Update gateway_ip and allocation pools based on new cidr
s['gateway_ip'] = utils.get_first_host_ip(net, s['ip_version'])
s['allocation_pools'] = self._update_allocation_pools(s)
# If either gateway_ip or allocation_pools were specified
gateway_ip = s.get('gateway_ip')
if gateway_ip is not None or s.get('allocation_pools') is not None:
@ -591,6 +650,31 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
result = self._make_subnet_dict(subnet, context=context)
# Keep up with fields that changed
result.update(changes)
if update_ports_needed:
# Find ports that have not yet been updated
# with an IP address by Prefix Delegation, and update them
ports = self.get_ports(context)
routers = []
for port in ports:
fixed_ips = []
new_port = {'port': port}
for ip in port['fixed_ips']:
if ip['subnet_id'] == s['id']:
fixed_ip = {'subnet_id': s['id']}
if "router_interface" in port['device_owner']:
routers.append(port['device_id'])
fixed_ip['ip_address'] = s['gateway_ip']
fixed_ips.append(fixed_ip)
if fixed_ips:
new_port['port']['fixed_ips'] = fixed_ips
self.update_port(context, port['id'], new_port)
# Send router_update to l3_agent
if routers:
l3_rpc_notifier = l3_rpc_agent_api.L3AgentNotifyAPI()
l3_rpc_notifier.routers_updated(context, routers)
return result
def _subnet_check_ip_allocations(self, context, subnet_id):

View File

@ -168,7 +168,8 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
context.session.add_all(new_pools)
# Call static method with self to redefine in child
# (non-pluggable backend)
self._rebuild_availability_ranges(context, [s])
if not ipv6_utils.is_ipv6_pd_enabled(s):
self._rebuild_availability_ranges(context, [s])
# Gather new pools for result
result_pools = [{'start': p[0], 'end': p[1]} for p in pools]
del s['allocation_pools']
@ -199,7 +200,8 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
Verifies the specified CIDR does not overlap with the ones defined
for the other subnets specified for this network, or with any other
CIDR if overlapping IPs are disabled.
CIDR if overlapping IPs are disabled. Does not apply to subnets with
temporary IPv6 Prefix Delegation CIDRs (::/64).
"""
new_subnet_ipset = netaddr.IPSet([new_subnet_cidr])
# Disallow subnets with prefix length 0 as they will lead to
@ -217,7 +219,8 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
else:
subnet_list = self._get_all_subnets(context)
for subnet in subnet_list:
if (netaddr.IPSet([subnet.cidr]) & new_subnet_ipset):
if ((netaddr.IPSet([subnet.cidr]) & new_subnet_ipset) and
subnet.cidr != constants.PROVISIONAL_IPV6_PD_PREFIX):
# don't give out details of the overlapping subnet
err_msg = (_("Requested subnet with cidr: %(cidr)s for "
"network: %(network_id)s overlaps with another "
@ -330,10 +333,13 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
return subnet
raise n_exc.InvalidIpForNetwork(ip_address=fixed['ip_address'])
def generate_pools(self, cidr, gateway_ip):
return ipam_utils.generate_pools(cidr, gateway_ip)
def _prepare_allocation_pools(self, allocation_pools, cidr, gateway_ip):
"""Returns allocation pools represented as list of IPRanges"""
if not attributes.is_attr_set(allocation_pools):
return ipam_utils.generate_pools(cidr, gateway_ip)
return self.generate_pools(cidr, gateway_ip)
ip_range_pools = self.pools_to_ip_range(allocation_pools)
self._validate_allocation_pools(ip_range_pools, cidr)
@ -355,7 +361,8 @@ class IpamBackendMixin(db_base_plugin_common.DbBasePluginCommon):
return True
subnet = self._get_subnet(context, subnet_id)
return not ipv6_utils.is_auto_address_subnet(subnet)
return not (ipv6_utils.is_auto_address_subnet(subnet) and
not ipv6_utils.is_ipv6_pd_enabled(subnet))
def _get_changed_ips_for_port(self, context, original_ips,
new_ips, device_owner):

View File

@ -243,7 +243,8 @@ class IpamNonPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
subnet = self._get_subnet_for_fixed_ip(context, fixed, network_id)
is_auto_addr_subnet = ipv6_utils.is_auto_address_subnet(subnet)
if 'ip_address' in fixed:
if ('ip_address' in fixed and
subnet['cidr'] != constants.PROVISIONAL_IPV6_PD_PREFIX):
# Ensure that the IP's are unique
if not IpamNonPluggableBackend._check_unique_ip(
context, network_id,
@ -268,6 +269,7 @@ class IpamNonPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
# listed explicitly here by subnet ID) are associated
# with the port.
if (device_owner in constants.ROUTER_INTERFACE_OWNERS_SNAT or
ipv6_utils.is_ipv6_pd_enabled(subnet) or
not is_auto_addr_subnet):
fixed_ip_set.append({'subnet_id': subnet['id']})
@ -433,7 +435,7 @@ class IpamNonPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
def allocate_subnet(self, context, network, subnet, subnetpool_id):
subnetpool = None
if subnetpool_id:
if subnetpool_id and not subnetpool_id == constants.IPV6_PD_POOL_ID:
subnetpool = self._get_subnetpool(context, subnetpool_id)
self._validate_ip_version_with_subnetpool(subnet, subnetpool)
@ -452,7 +454,7 @@ class IpamNonPluggableBackend(ipam_backend_mixin.IpamBackendMixin):
subnet,
subnetpool)
if subnetpool_id:
if subnetpool_id and not subnetpool_id == constants.IPV6_PD_POOL_ID:
driver = subnet_alloc.SubnetAllocator(subnetpool, context)
ipam_subnet = driver.allocate_subnet(subnet_request)
subnet_request = ipam_subnet.get_details()

View File

@ -30,6 +30,7 @@ from neutron.callbacks import registry
from neutron.callbacks import resources
from neutron.common import constants as l3_constants
from neutron.common import exceptions as n_exc
from neutron.common import ipv6_utils
from neutron.common import rpc as n_rpc
from neutron.common import utils
from neutron.db import model_base
@ -470,6 +471,9 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
msg = (_("Router already has a port on subnet %s")
% subnet_id)
raise n_exc.BadRequest(resource='router', msg=msg)
# Ignore temporary Prefix Delegation CIDRs
if subnet_cidr == l3_constants.PROVISIONAL_IPV6_PD_PREFIX:
continue
sub_id = ip['subnet_id']
cidr = self._core_plugin._get_subnet(context.elevated(),
sub_id)['cidr']
@ -579,7 +583,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
fixed_ip = {'ip_address': subnet['gateway_ip'],
'subnet_id': subnet['id']}
if subnet['ip_version'] == 6:
if (subnet['ip_version'] == 6 and not
ipv6_utils.is_ipv6_pd_enabled(subnet)):
# Add new prefix to an existing ipv6 port with the same network id
# if one exists
port = self._find_ipv6_router_port_by_network(router,
@ -1197,7 +1202,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
for p in each_port_having_fixed_ips())
filters = {'network_id': [id for id in network_ids]}
fields = ['id', 'cidr', 'gateway_ip',
'network_id', 'ipv6_ra_mode']
'network_id', 'ipv6_ra_mode', 'subnetpool_id']
subnets_by_network = dict((id, []) for id in network_ids)
for subnet in self._core_plugin.get_subnets(context, filters, fields):
@ -1215,7 +1220,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
subnet_info = {'id': subnet['id'],
'cidr': subnet['cidr'],
'gateway_ip': subnet['gateway_ip'],
'ipv6_ra_mode': subnet['ipv6_ra_mode']}
'ipv6_ra_mode': subnet['ipv6_ra_mode'],
'subnetpool_id': subnet['subnetpool_id']}
for fixed_ip in port['fixed_ips']:
if fixed_ip['subnet_id'] == subnet['id']:
port['subnets'].append(subnet_info)

View File

@ -3402,6 +3402,38 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
ipv6_ra_mode=constants.IPV6_SLAAC,
ipv6_address_mode=constants.IPV6_SLAAC)
def test_create_subnet_ipv6_pd_gw_values(self):
cidr = constants.PROVISIONAL_IPV6_PD_PREFIX
# Gateway is last IP in IPv6 DHCPv6 Stateless subnet
gateway = '::ffff:ffff:ffff:ffff'
allocation_pools = [{'start': '::1',
'end': '::ffff:ffff:ffff:fffe'}]
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
self._test_create_subnet(expected=expected, gateway_ip=gateway,
cidr=cidr, ip_version=6,
ipv6_ra_mode=constants.DHCPV6_STATELESS,
ipv6_address_mode=constants.DHCPV6_STATELESS)
# Gateway is first IP in IPv6 DHCPv6 Stateless subnet
gateway = '::1'
allocation_pools = [{'start': '::2',
'end': '::ffff:ffff:ffff:ffff'}]
expected = {'gateway_ip': gateway,
'cidr': cidr,
'allocation_pools': allocation_pools}
self._test_create_subnet(expected=expected, gateway_ip=gateway,
cidr=cidr, ip_version=6,
ipv6_ra_mode=constants.DHCPV6_STATELESS,
ipv6_address_mode=constants.DHCPV6_STATELESS)
# If gateway_ip is not specified, allocate first IP from the subnet
expected = {'gateway_ip': gateway,
'cidr': cidr}
self._test_create_subnet(expected=expected,
cidr=cidr, ip_version=6,
ipv6_ra_mode=constants.IPV6_SLAAC,
ipv6_address_mode=constants.IPV6_SLAAC)
def test_create_subnet_gw_outside_cidr_returns_400(self):
with self.network() as network:
self._create_subnet(self.fmt,
@ -3496,6 +3528,15 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
cidr=cidr, ip_version=6,
allocation_pools=allocation_pools)
def test_create_subnet_with_v6_pd_allocation_pool(self):
gateway_ip = '::1'
cidr = constants.PROVISIONAL_IPV6_PD_PREFIX
allocation_pools = [{'start': '::2',
'end': '::ffff:ffff:ffff:fffe'}]
self._test_create_subnet(gateway_ip=gateway_ip,
cidr=cidr, ip_version=6,
allocation_pools=allocation_pools)
def test_create_subnet_with_large_allocation_pool(self):
gateway_ip = '10.0.0.1'
cidr = '10.0.0.0/8'
@ -3693,17 +3734,38 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
self.assertRaises(n_exc.InvalidInput, plugin._validate_subnet,
ctx, new_subnet, cur_subnet)
def _test_validate_subnet_ipv6_pd_modes(self, cur_subnet=None,
expect_success=True, **modes):
plugin = manager.NeutronManager.get_plugin()
ctx = context.get_admin_context(load_admin_roles=False)
new_subnet = {'ip_version': 6,
'cidr': constants.PROVISIONAL_IPV6_PD_PREFIX,
'enable_dhcp': True,
'ipv6_address_mode': None,
'ipv6_ra_mode': None}
for mode, value in modes.items():
new_subnet[mode] = value
if expect_success:
plugin._validate_subnet(ctx, new_subnet, cur_subnet)
else:
self.assertRaises(n_exc.InvalidInput, plugin._validate_subnet,
ctx, new_subnet, cur_subnet)
def test_create_subnet_ipv6_ra_modes(self):
# Test all RA modes with no address mode specified
for ra_mode in constants.IPV6_MODES:
self._test_validate_subnet_ipv6_modes(
ipv6_ra_mode=ra_mode)
self._test_validate_subnet_ipv6_pd_modes(
ipv6_ra_mode=ra_mode)
def test_create_subnet_ipv6_addr_modes(self):
# Test all address modes with no RA mode specified
for addr_mode in constants.IPV6_MODES:
self._test_validate_subnet_ipv6_modes(
ipv6_address_mode=addr_mode)
self._test_validate_subnet_ipv6_pd_modes(
ipv6_address_mode=addr_mode)
def test_create_subnet_ipv6_same_ra_and_addr_modes(self):
# Test all ipv6 modes with ra_mode==addr_mode
@ -3711,6 +3773,9 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
self._test_validate_subnet_ipv6_modes(
ipv6_ra_mode=ipv6_mode,
ipv6_address_mode=ipv6_mode)
self._test_validate_subnet_ipv6_pd_modes(
ipv6_ra_mode=ipv6_mode,
ipv6_address_mode=ipv6_mode)
def test_create_subnet_ipv6_different_ra_and_addr_modes(self):
# Test all ipv6 modes with ra_mode!=addr_mode
@ -3720,6 +3785,10 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
expect_success=not (ra_mode and addr_mode),
ipv6_ra_mode=ra_mode,
ipv6_address_mode=addr_mode)
self._test_validate_subnet_ipv6_pd_modes(
expect_success=not (ra_mode and addr_mode),
ipv6_ra_mode=ra_mode,
ipv6_address_mode=addr_mode)
def test_create_subnet_ipv6_out_of_cidr_global(self):
gateway_ip = '2000::1'
@ -5531,6 +5600,39 @@ class TestNeutronDbPluginV2(base.BaseTestCase):
self._test__allocate_ips_for_port(subnets, port, expected)
def test__allocate_ips_for_port_2_slaac_pd_subnets(self):
subnets = [
{
'cidr': constants.PROVISIONAL_IPV6_PD_PREFIX,
'enable_dhcp': True,
'gateway_ip': '::1',
'id': 'd1a28edd-bd83-480a-bd40-93d036c89f13',
'network_id': 'fbb9b578-95eb-4b79-a116-78e5c4927176',
'ip_version': 6,
'ipv6_address_mode': None,
'ipv6_ra_mode': 'slaac'},
{
'cidr': constants.PROVISIONAL_IPV6_PD_PREFIX,
'enable_dhcp': True,
'gateway_ip': '::1',
'id': 'dc813d3d-ed66-4184-8570-7325c8195e28',
'network_id': 'fbb9b578-95eb-4b79-a116-78e5c4927176',
'ip_version': 6,
'ipv6_address_mode': None,
'ipv6_ra_mode': 'slaac'}]
port = {'port': {
'network_id': 'fbb9b578-95eb-4b79-a116-78e5c4927176',
'fixed_ips': attributes.ATTR_NOT_SPECIFIED,
'mac_address': '12:34:56:78:44:ab',
'device_owner': 'compute'}}
expected = []
for subnet in subnets:
addr = str(ipv6_utils.get_ipv6_addr_by_EUI64(
subnet['cidr'], port['port']['mac_address']))
expected.append({'ip_address': addr, 'subnet_id': subnet['id']})
self._test__allocate_ips_for_port(subnets, port, expected)
class NeutronDbPluginV2AsMixinTestCase(NeutronDbPluginV2TestCase,
testlib_api.SqlTestCase):

View File

@ -80,6 +80,29 @@ class TestIpamBackendMixin(base.BaseTestCase):
self._test_get_changed_ips_for_port(expected_change, original_ips,
new_ips, self.owner_non_router)
def test__get_changed_ips_for_port_autoaddress_ipv6_pd_enabled(self):
owner_not_router = constants.DEVICE_OWNER_DHCP
new_ips = self._prepare_ips(self.default_new_ips)
original = (('id-1', '192.168.1.1'),
('id-5', '2000:1234:5678::12FF:FE34:5678'))
original_ips = self._prepare_ips(original)
# mock to test auto address part
pd_subnet = {'subnetpool_id': constants.IPV6_PD_POOL_ID,
'ipv6_address_mode': constants.IPV6_SLAAC,
'ipv6_ra_mode': constants.IPV6_SLAAC}
self.mixin._get_subnet = mock.Mock(return_value=pd_subnet)
# make a copy of original_ips
# since it is changed by _get_changed_ips_for_port
expected_change = self.mixin.Changes(add=[new_ips[1]],
original=[original_ips[0]],
remove=[original_ips[1]])
self._test_get_changed_ips_for_port(expected_change, original_ips,
new_ips, owner_not_router)
def _test_get_changed_ips_for_port_no_ip_address(self):
# IP address should be added if only subnet_id is provided,
# independently from auto_address status for subnet