Merge "Ensure floating IPs only use IPv4 addresses"

This commit is contained in:
Jenkins 2015-07-13 17:29:48 +00:00 committed by Gerrit Code Review
commit fe67163538
2 changed files with 82 additions and 12 deletions

View File

@ -808,6 +808,10 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
external_network_id=external_network_id,
port_id=internal_port['id'])
def _port_ipv4_fixed_ips(self, port):
return [ip for ip in port['fixed_ips']
if netaddr.IPAddress(ip['ip_address']).version == 4]
def _internal_fip_assoc_data(self, context, fip):
"""Retrieve internal port data for floating IP.
@ -833,6 +837,18 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
internal_subnet_id = None
if fip.get('fixed_ip_address'):
internal_ip_address = fip['fixed_ip_address']
if netaddr.IPAddress(internal_ip_address).version != 4:
if 'id' in fip:
data = {'floatingip_id': fip['id'],
'internal_ip': internal_ip_address}
msg = (_('Floating IP %(floatingip_id) is associated '
'with non-IPv4 address %s(internal_ip)s and '
'therefore cannot be bound.') % data)
else:
msg = (_('Cannot create floating IP and bind it to %s, '
'since that is not an IPv4 address.') %
internal_ip_address)
raise n_exc.BadRequest(resource='floatingip', msg=msg)
for ip in internal_port['fixed_ips']:
if ip['ip_address'] == internal_ip_address:
internal_subnet_id = ip['subnet_id']
@ -842,18 +858,18 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
'address': internal_ip_address})
raise n_exc.BadRequest(resource='floatingip', msg=msg)
else:
ips = [ip['ip_address'] for ip in internal_port['fixed_ips']]
if not ips:
ipv4_fixed_ips = self._port_ipv4_fixed_ips(internal_port)
if not ipv4_fixed_ips:
msg = (_('Cannot add floating IP to port %s that has '
'no fixed IP addresses') % internal_port['id'])
'no fixed IPv4 addresses') % internal_port['id'])
raise n_exc.BadRequest(resource='floatingip', msg=msg)
if len(ips) > 1:
msg = (_('Port %s has multiple fixed IPs. Must provide'
' a specific IP when assigning a floating IP') %
internal_port['id'])
if len(ipv4_fixed_ips) > 1:
msg = (_('Port %s has multiple fixed IPv4 addresses. Must '
'provide a specific IPv4 address when assigning a '
'floating IP') % internal_port['id'])
raise n_exc.BadRequest(resource='floatingip', msg=msg)
internal_ip_address = internal_port['fixed_ips'][0]['ip_address']
internal_subnet_id = internal_port['fixed_ips'][0]['subnet_id']
internal_ip_address = ipv4_fixed_ips[0]['ip_address']
internal_subnet_id = ipv4_fixed_ips[0]['subnet_id']
return internal_port, internal_subnet_id, internal_ip_address
def get_assoc_data(self, context, fip, floating_network_id):
@ -909,6 +925,10 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
'router_id': router_id,
'last_known_router_id': previous_router_id})
def _is_ipv4_network(self, context, net_id):
net = self._core_plugin._get_network(context, net_id)
return any(s.ip_version == 4 for s in net.subnets)
def create_floatingip(self, context, floatingip,
initial_status=l3_constants.FLOATINGIP_STATUS_ACTIVE):
fip = floatingip['floatingip']
@ -920,6 +940,10 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
msg = _("Network %s is not a valid external network") % f_net_id
raise n_exc.BadRequest(resource='floatingip', msg=msg)
if not self._is_ipv4_network(context, f_net_id):
msg = _("Network %s does not contain any IPv4 subnet") % f_net_id
raise n_exc.BadRequest(resource='floatingip', msg=msg)
with context.session.begin(subtransactions=True):
# This external port is never exposed to the tenant.
# it is used purely for internal system and admin use when
@ -942,11 +966,12 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
external_port = self._core_plugin.create_port(context.elevated(),
{'port': port})
# Ensure IP addresses are allocated on external port
if not external_port['fixed_ips']:
# Ensure IPv4 addresses are allocated on external port
external_ipv4_ips = self._port_ipv4_fixed_ips(external_port)
if not external_ipv4_ips:
raise n_exc.ExternalIpAddressExhausted(net_id=f_net_id)
floating_fixed_ip = external_port['fixed_ips'][0]
floating_fixed_ip = external_ipv4_ips[0]
floating_ip_address = floating_fixed_ip['ip_address']
floatingip_db = FloatingIP(
id=fip_id,

View File

@ -2383,6 +2383,51 @@ class L3NatTestCaseBase(L3NatTestCaseMixin):
result = plugin.create_router(context.Context('', 'foo'), router_req)
self.assertEqual(result['id'], router_req['router']['id'])
def test_create_floatingip_ipv6_only_network_returns_400(self):
with self.subnet(cidr="2001:db8::/48", ip_version=6) as public_sub:
self._set_net_external(public_sub['subnet']['network_id'])
res = self._create_floatingip(
self.fmt,
public_sub['subnet']['network_id'])
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
def test_create_floatingip_ipv6_and_ipv4_network_creates_ipv4(self):
with self.network() as n,\
self.subnet(cidr="2001:db8::/48", ip_version=6, network=n),\
self.subnet(cidr="192.168.1.0/24", ip_version=4, network=n):
self._set_net_external(n['network']['id'])
fip = self._make_floatingip(self.fmt, n['network']['id'])
self.assertEqual(fip['floatingip']['floating_ip_address'],
'192.168.1.2')
def test_create_floatingip_with_assoc_to_ipv6_subnet(self):
with self.subnet() as public_sub:
self._set_net_external(public_sub['subnet']['network_id'])
with self.subnet(cidr="2001:db8::/48",
ip_version=6) as private_sub:
with self.port(subnet=private_sub) as private_port:
res = self._create_floatingip(
self.fmt,
public_sub['subnet']['network_id'],
port_id=private_port['port']['id'])
self.assertEqual(res.status_int, exc.HTTPBadRequest.code)
def test_create_floatingip_with_assoc_to_ipv4_and_ipv6_port(self):
with self.network() as n,\
self.subnet(cidr='10.0.0.0/24', network=n) as s4,\
self.subnet(cidr='2001:db8::/64', ip_version=6, network=n),\
self.port(subnet=s4) as p:
self.assertEqual(len(p['port']['fixed_ips']), 2)
ipv4_address = next(i['ip_address'] for i in
p['port']['fixed_ips'] if
netaddr.IPAddress(i['ip_address']).version == 4)
with self.floatingip_with_assoc(port_id=p['port']['id']) as fip:
self.assertEqual(fip['floatingip']['fixed_ip_address'],
ipv4_address)
floating_ip = netaddr.IPAddress(
fip['floatingip']['floating_ip_address'])
self.assertEqual(floating_ip.version, 4)
class L3AgentDbTestCaseBase(L3NatTestCaseMixin):