Check port VNIC type when associating a floating IP

When associating a floating IP to a port and the router is distributed,
the VNIC type of this port must be "normal" only. In any other case,
the floating IP can't be assigned. For example, a SR-IOV can have a
floating IP if the router is distributed (the router is in the same
host of the port).

This patch adds also function can_port_be_bound_to_virtual_bridge to
neutron/db/l3_db.py module.
Originally this function was introduced in neutron-lib with [1] but
in stable branch there is older neutron-lib used so this isn't available
from neutron-lib.

[1] https://review.openstack.org/#/c/615126/

Closes-Bug: #1566951

Change-Id: I4944041df81e24683bc612560808bcdcc2db6bf2
(cherry picked from commit 1966ad3945)
(cherry picked from commit 0f14e30fa4)
(cherry picked from commit 1c573bb8b9)
This commit is contained in:
Rodolfo Alonso Hernandez 2018-11-02 11:35:25 +00:00 committed by Slawek Kaplonski
parent 66fa2735c8
commit edd8ad31d7
3 changed files with 75 additions and 0 deletions

View File

@ -18,6 +18,7 @@ import random
from debtcollector import removals
import netaddr
from neutron_lib.api.definitions import portbindings as pb
from neutron_lib.api import validators
from neutron_lib.callbacks import events
from neutron_lib.callbacks import exceptions
@ -72,6 +73,15 @@ API_TO_DB_COLUMN_MAP = {'port_id': 'fixed_port_id'}
CORE_ROUTER_ATTRS = ('id', 'name', 'tenant_id', 'admin_state_up', 'status')
def can_port_be_bound_to_virtual_bridge(port):
"""Returns if port can be bound to a virtual bridge (e.g.: LB, OVS)
:param port: (dict) A port dictionary.
:returns: True if the port VNIC type is 'normal'; False in any other case.
"""
return port[pb.VNIC_TYPE] == pb.VNIC_NORMAL
@registry.has_registry_receivers
class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
base_services.WorkerBase,
@ -1194,6 +1204,12 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
context, internal_port,
internal_subnet_id, floatingip_db['floating_network_id'])
if self.is_router_distributed(context, router_id):
if not can_port_be_bound_to_virtual_bridge(internal_port):
msg = _('Port VNIC type is not valid to associate a FIP in '
'DVR mode')
raise n_exc.BadRequest(resource='floatingip', msg=msg)
return (fip['port_id'], internal_ip_address, router_id)
def _check_and_get_fip_assoc(self, context, fip, floatingip_db):
@ -1776,6 +1792,15 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
self._process_interfaces(routers_dict, interfaces)
return list(routers_dict.values())
def is_router_distributed(self, context, router_id):
"""Returns if a router is distributed or not
If DVR extension is not enabled, no router will be distributed. This
function is overridden in L3_NAT_with_dvr_db_mixin in case the DVR
extension is loaded.
"""
return False
@registry.has_registry_receivers
class L3RpcNotifierMixin(object):

View File

@ -1210,6 +1210,13 @@ class L3_NAT_with_dvr_db_mixin(_DVRAgentInterfaceMixin,
floating_ip = self._delete_floatingip(context, id)
self._notify_floating_ip_change(context, floating_ip)
@db_api.retry_if_session_inactive()
def is_router_distributed(self, context, router_id):
if router_id:
return is_distributed_router(
self.get_router(context.elevated(), router_id))
return False
def is_distributed_router(router):
"""Return True if router to be handled is distributed."""

View File

@ -27,6 +27,7 @@ from oslo_utils import uuidutils
from neutron.db import agents_db
from neutron.db import common_db_mixin
from neutron.db import l3_db
from neutron.db import l3_dvr_db
from neutron.db import l3_dvrscheduler_db
from neutron.db.models import agent as agent_model
@ -1008,3 +1009,45 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
routers = self.mixin._get_sync_routers(
self.ctx, router_ids=[router['id']])
self.assertEqual("fake-host", routers[0]['gw_port_host'])
def test_is_router_distributed(self):
router_id = 'router_id'
with mock.patch.object(self.mixin, 'get_router') as \
mock_get_router:
mock_get_router.return_value = {'distributed': True}
self.assertTrue(
self.mixin.is_router_distributed(self.ctx, router_id))
@mock.patch.object(l3_db, 'can_port_be_bound_to_virtual_bridge',
return_value=True)
def test__get_assoc_data_valid_vnic_type(self, *args):
with mock.patch.object(self.mixin, '_internal_fip_assoc_data') as \
mock_fip_assoc_data, \
mock.patch.object(self.mixin, '_get_router_for_floatingip') \
as mock_router_fip, \
mock.patch.object(self.mixin, 'is_router_distributed',
return_value=True):
port = {portbindings.VNIC_TYPE: portbindings.VNIC_NORMAL}
mock_fip_assoc_data.return_value = (port, 'subnet_id', 'ip_addr')
mock_router_fip.return_value = 'router_id'
fip = {'port_id': 'port_id'}
self.assertEqual(
('port_id', 'ip_addr', 'router_id'),
self.mixin._get_assoc_data(self.ctx, fip, mock.MagicMock()))
@mock.patch.object(l3_db, 'can_port_be_bound_to_virtual_bridge',
return_value=False)
def test__get_assoc_data_invalid_vnic_type(self, *args):
with mock.patch.object(self.mixin, '_internal_fip_assoc_data') as \
mock_fip_assoc_data, \
mock.patch.object(self.mixin, '_get_router_for_floatingip') \
as mock_router_fip, \
mock.patch.object(self.mixin, 'is_router_distributed',
return_value=True):
port = {portbindings.VNIC_TYPE: portbindings.VNIC_NORMAL}
mock_fip_assoc_data.return_value = (port, 'subnet_id', 'ip_addr')
mock_router_fip.return_value = 'router_id'
self.assertRaises(
exceptions.BadRequest,
self.mixin._get_assoc_data,
self.ctx, mock.ANY, mock.MagicMock())