diff --git a/neutron/db/l3_db.py b/neutron/db/l3_db.py index cf31b831cb8..5d9efccc7c6 100644 --- a/neutron/db/l3_db.py +++ b/neutron/db/l3_db.py @@ -804,6 +804,10 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase): external_network_id=external_network_id, port_id=internal_port['id']) + def _port_ipv4_fixed_ips(self, port): + return [ip for ip in port['fixed_ips'] + if netaddr.IPAddress(ip['ip_address']).version == 4] + def _internal_fip_assoc_data(self, context, fip): """Retrieve internal port data for floating IP. @@ -829,6 +833,18 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase): internal_subnet_id = None if fip.get('fixed_ip_address'): internal_ip_address = fip['fixed_ip_address'] + if netaddr.IPAddress(internal_ip_address).version != 4: + if 'id' in fip: + data = {'floatingip_id': fip['id'], + 'internal_ip': internal_ip_address} + msg = (_('Floating IP %(floatingip_id) is associated ' + 'with non-IPv4 address %s(internal_ip)s and ' + 'therefore cannot be bound.') % data) + else: + msg = (_('Cannot create floating IP and bind it to %s, ' + 'since that is not an IPv4 address.') % + internal_ip_address) + raise n_exc.BadRequest(resource='floatingip', msg=msg) for ip in internal_port['fixed_ips']: if ip['ip_address'] == internal_ip_address: internal_subnet_id = ip['subnet_id'] @@ -838,18 +854,18 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase): 'address': internal_ip_address}) raise n_exc.BadRequest(resource='floatingip', msg=msg) else: - ips = [ip['ip_address'] for ip in internal_port['fixed_ips']] - if not ips: + ipv4_fixed_ips = self._port_ipv4_fixed_ips(internal_port) + if not ipv4_fixed_ips: msg = (_('Cannot add floating IP to port %s that has ' - 'no fixed IP addresses') % internal_port['id']) + 'no fixed IPv4 addresses') % internal_port['id']) raise n_exc.BadRequest(resource='floatingip', msg=msg) - if len(ips) > 1: - msg = (_('Port %s has multiple fixed IPs. Must provide' - ' a specific IP when assigning a floating IP') % - internal_port['id']) + if len(ipv4_fixed_ips) > 1: + msg = (_('Port %s has multiple fixed IPv4 addresses. Must ' + 'provide a specific IPv4 address when assigning a ' + 'floating IP') % internal_port['id']) raise n_exc.BadRequest(resource='floatingip', msg=msg) - internal_ip_address = internal_port['fixed_ips'][0]['ip_address'] - internal_subnet_id = internal_port['fixed_ips'][0]['subnet_id'] + internal_ip_address = ipv4_fixed_ips[0]['ip_address'] + internal_subnet_id = ipv4_fixed_ips[0]['subnet_id'] return internal_port, internal_subnet_id, internal_ip_address def get_assoc_data(self, context, fip, floating_network_id): @@ -905,6 +921,10 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase): 'router_id': router_id, 'last_known_router_id': previous_router_id}) + def _is_ipv4_network(self, context, net_id): + net = self._core_plugin._get_network(context, net_id) + return any(s.ip_version == 4 for s in net.subnets) + def create_floatingip(self, context, floatingip, initial_status=l3_constants.FLOATINGIP_STATUS_ACTIVE): fip = floatingip['floatingip'] @@ -916,6 +936,10 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase): msg = _("Network %s is not a valid external network") % f_net_id raise n_exc.BadRequest(resource='floatingip', msg=msg) + if not self._is_ipv4_network(context, f_net_id): + msg = _("Network %s does not contain any IPv4 subnet") % f_net_id + raise n_exc.BadRequest(resource='floatingip', msg=msg) + with context.session.begin(subtransactions=True): # This external port is never exposed to the tenant. # it is used purely for internal system and admin use when @@ -938,11 +962,12 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase): external_port = self._core_plugin.create_port(context.elevated(), {'port': port}) - # Ensure IP addresses are allocated on external port - if not external_port['fixed_ips']: + # Ensure IPv4 addresses are allocated on external port + external_ipv4_ips = self._port_ipv4_fixed_ips(external_port) + if not external_ipv4_ips: raise n_exc.ExternalIpAddressExhausted(net_id=f_net_id) - floating_fixed_ip = external_port['fixed_ips'][0] + floating_fixed_ip = external_ipv4_ips[0] floating_ip_address = floating_fixed_ip['ip_address'] floatingip_db = FloatingIP( id=fip_id, diff --git a/neutron/tests/unit/extensions/test_l3.py b/neutron/tests/unit/extensions/test_l3.py index 07bac0696b6..0ae0b6d562d 100644 --- a/neutron/tests/unit/extensions/test_l3.py +++ b/neutron/tests/unit/extensions/test_l3.py @@ -2371,6 +2371,51 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): result = plugin.create_router(context.Context('', 'foo'), router_req) self.assertEqual(result['id'], router_req['router']['id']) + def test_create_floatingip_ipv6_only_network_returns_400(self): + with self.subnet(cidr="2001:db8::/48", ip_version=6) as public_sub: + self._set_net_external(public_sub['subnet']['network_id']) + res = self._create_floatingip( + self.fmt, + public_sub['subnet']['network_id']) + self.assertEqual(res.status_int, exc.HTTPBadRequest.code) + + def test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4(self): + with self.network() as n,\ + self.subnet(cidr="2001:db8::/48", ip_version=6, network=n),\ + self.subnet(cidr="192.168.1.0/24", ip_version=4, network=n): + self._set_net_external(n['network']['id']) + fip = self._make_floatingip(self.fmt, n['network']['id']) + self.assertEqual(fip['floatingip']['floating_ip_address'], + '192.168.1.2') + + def test_create_floatingip_with_assoc_to_ipv6_subnet(self): + with self.subnet() as public_sub: + self._set_net_external(public_sub['subnet']['network_id']) + with self.subnet(cidr="2001:db8::/48", + ip_version=6) as private_sub: + with self.port(subnet=private_sub) as private_port: + res = self._create_floatingip( + self.fmt, + public_sub['subnet']['network_id'], + port_id=private_port['port']['id']) + self.assertEqual(res.status_int, exc.HTTPBadRequest.code) + + def test_create_floatingip_with_assoc_to_ipv4_and_ipv6_port(self): + with self.network() as n,\ + self.subnet(cidr='10.0.0.0/24', network=n) as s4,\ + self.subnet(cidr='2001:db8::/64', ip_version=6, network=n),\ + self.port(subnet=s4) as p: + self.assertEqual(len(p['port']['fixed_ips']), 2) + ipv4_address = next(i['ip_address'] for i in + p['port']['fixed_ips'] if + netaddr.IPAddress(i['ip_address']).version == 4) + with self.floatingip_with_assoc(port_id=p['port']['id']) as fip: + self.assertEqual(fip['floatingip']['fixed_ip_address'], + ipv4_address) + floating_ip = netaddr.IPAddress( + fip['floatingip']['floating_ip_address']) + self.assertEqual(floating_ip.version, 4) + class L3AgentDbTestCaseBase(L3NatTestCaseMixin):