Fix IPv6 Subnet Slaac Check

Currently, "is_slaac_subnet" method only checks "ipv6_address_mode".
This will let neutron allocate a stateful IPv6 address for instance
when ipv6_ra_mode set to "stateless" or "slaac", and "ipv6_address_mode"
unset. But actually, instance get IPv6 address prefix from radvd, and
auto configure IPv6 address by itself. This change will let "is_slaac_
subnet" also check ipv6_ra_mode attribute, which will no longer allow
fixed IPs on auto-address subnets and delete subnet will correctly delete
auto-address ports.

Change-Id: Ic99193a0a2669fb281759d9b47c7aae6a4f6ecce
Closes-Bug: 1406436
This commit is contained in:
WeiHu 2014-12-30 10:45:55 +08:00
parent f83b08abf5
commit 7018eff5ec
5 changed files with 57 additions and 18 deletions

View File

@ -64,7 +64,8 @@ def is_enabled():
return _IS_IPV6_ENABLED
def is_slaac_subnet(subnet):
"""Check if subnet uses SLAAC addressing."""
return (subnet['ipv6_address_mode'] == constants.IPV6_SLAAC
or subnet['ipv6_address_mode'] == constants.DHCPV6_STATELESS)
def is_auto_address_subnet(subnet):
"""Check if subnet is an auto address subnet."""
modes = [constants.IPV6_SLAAC, constants.DHCPV6_STATELESS]
return (subnet['ipv6_address_mode'] in modes
or subnet['ipv6_ra_mode'] in modes)

View File

@ -445,14 +445,14 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
msg = _('IP address %s is not a valid IP for the defined '
'subnet') % fixed['ip_address']
raise n_exc.InvalidInput(error_message=msg)
if (ipv6_utils.is_slaac_subnet(subnet) and device_owner not in
if (ipv6_utils.is_auto_address_subnet(subnet) and
device_owner not in
constants.ROUTER_INTERFACE_OWNERS):
msg = (_("IPv6 address %(address)s can not be directly "
"assigned to a port on subnet %(id)s with "
"%(mode)s address mode") %
"assigned to a port on subnet %(id)s since the "
"subnet is configured for automatic addresses") %
{'address': fixed['ip_address'],
'id': subnet_id,
'mode': subnet['ipv6_address_mode']})
'id': subnet_id})
raise n_exc.InvalidInput(error_message=msg)
fixed_ip_set.append({'subnet_id': subnet_id,
'ip_address': fixed['ip_address']})
@ -478,7 +478,7 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
else:
subnet = self._get_subnet(context, fixed['subnet_id'])
if (subnet['ip_version'] == 6 and
ipv6_utils.is_slaac_subnet(subnet)):
ipv6_utils.is_auto_address_subnet(subnet)):
prefix = subnet['cidr']
ip_address = ipv6_utils.get_ipv6_addr_by_EUI64(
prefix, mac_address)
@ -559,7 +559,7 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
else:
v6.append(subnet)
for subnet in v6:
if ipv6_utils.is_slaac_subnet(subnet):
if ipv6_utils.is_auto_address_subnet(subnet):
#(dzyu) If true, calculate an IPv6 address
# by mac address and prefix, then remove this
# subnet from the array of subnets that will be passed
@ -767,7 +767,7 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
if ra_mode_set and address_mode_set:
self._validate_ipv6_combination(subnet['ipv6_ra_mode'],
subnet['ipv6_address_mode'])
if address_mode_set:
if address_mode_set or ra_mode_set:
self._validate_eui64_applicable(subnet)
def _validate_eui64_applicable(self, subnet):
@ -775,7 +775,7 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
# id together should be equal to 128. Currently neutron supports
# EUI64 interface id only, thus limiting the prefix
# length to be 64 only.
if ipv6_utils.is_slaac_subnet(subnet):
if ipv6_utils.is_auto_address_subnet(subnet):
if netaddr.IPNetwork(subnet['cidr']).prefixlen != 64:
msg = _('Invalid CIDR %s for IPv6 address mode. '
'OpenStack uses the EUI-64 address format, '
@ -1269,8 +1269,8 @@ class NeutronDbPluginV2(neutron_plugin_base_v2.NeutronPluginBaseV2,
# Remove network owned ports, and delete IP allocations
# for IPv6 addresses which were automatically generated
# via SLAAC
is_ipv6_slaac_subnet = ipv6_utils.is_slaac_subnet(subnet)
if not is_ipv6_slaac_subnet:
is_auto_addr_subnet = ipv6_utils.is_auto_address_subnet(subnet)
if not is_auto_addr_subnet:
qry_network_ports = (
qry_network_ports.filter(models_v2.Port.device_owner.
in_(AUTO_DELETE_PORT_OWNERS)))

View File

@ -731,11 +731,11 @@ class Ml2Plugin(db_base_plugin_v2.NeutronDbPluginV2,
qry_allocated = (session.query(models_v2.IPAllocation).
filter_by(subnet_id=id).
join(models_v2.Port))
is_ipv6_slaac_subnet = ipv6_utils.is_slaac_subnet(subnet)
is_auto_addr_subnet = ipv6_utils.is_auto_address_subnet(subnet)
# Remove network owned ports, and delete IP allocations
# for IPv6 addresses which were automatically generated
# via SLAAC
if not is_ipv6_slaac_subnet:
if not is_auto_addr_subnet:
qry_allocated = (
qry_allocated.filter(models_v2.Port.device_owner.
in_(db_base_plugin_v2.AUTO_DELETE_PORT_OWNERS)))

View File

@ -3192,7 +3192,9 @@ class TestSubnetsV2(NeutronDbPluginV2TestCase):
ctx = context.get_admin_context(load_admin_roles=False)
new_subnet = {'ip_version': 6,
'cidr': 'fe80::/64',
'enable_dhcp': True}
'enable_dhcp': True,
'ipv6_address_mode': None,
'ipv6_ra_mode': None}
for mode, value in modes.items():
new_subnet[mode] = value
if expect_success:

View File

@ -13,8 +13,10 @@
# License for the specific language governing permissions and limitations
# under the License.
import collections
import mock
from neutron.common import constants
from neutron.common import ipv6_utils
from neutron.tests import base
@ -88,3 +90,37 @@ class TestIsEnabled(base.BaseTestCase):
enabled = ipv6_utils.is_enabled()
self.assertTrue(enabled)
self.mock_read.assert_called_once_with()
class TestIsAutoAddressSubnet(base.BaseTestCase):
def setUp(self):
self.subnet = {
'cidr': '2001:200::/64',
'gateway_ip': '2001:200::1',
'ip_version': 6,
'ipv6_address_mode': None,
'ipv6_ra_mode': None
}
super(TestIsAutoAddressSubnet, self).setUp()
def test_combinations(self):
Mode = collections.namedtuple('Mode', "addr_mode ra_mode "
"is_auto_address")
subnets = [
Mode(None, None, False),
Mode(constants.DHCPV6_STATEFUL, None, False),
Mode(constants.DHCPV6_STATELESS, None, True),
Mode(constants.IPV6_SLAAC, None, True),
Mode(None, constants.DHCPV6_STATEFUL, False),
Mode(None, constants.DHCPV6_STATELESS, True),
Mode(None, constants.IPV6_SLAAC, True),
Mode(constants.DHCPV6_STATEFUL, constants.DHCPV6_STATEFUL, False),
Mode(constants.DHCPV6_STATELESS, constants.DHCPV6_STATELESS, True),
Mode(constants.IPV6_SLAAC, constants.IPV6_SLAAC, True),
]
for subnet in subnets:
self.subnet['ipv6_address_mode'] = subnet.addr_mode
self.subnet['ipv6_ra_mode'] = subnet.ra_mode
self.assertEqual(subnet.is_auto_address,
ipv6_utils.is_auto_address_subnet(self.subnet))