Verify port_forwarding subnet and IP address both

Free subnet can not remove from router if other router's
subnets have port_forwarding. This patch fixed it by
checking the router interface subnet and IP address.

Co-Authored-By: LIU Yulong <i@liuyulong.me>
Closes-Bug: #1799140
Change-Id: Idace35126bb00139fa1f9f48be3aa3aab265b9d8
This commit is contained in:
lizheng 2018-10-02 14:10:52 +08:00 committed by LIU Yulong
parent 04c772a4cd
commit f5d3a4159b
3 changed files with 130 additions and 16 deletions

View File

@ -53,6 +53,7 @@ from neutron.db import standardattrdescription_db as st_attr
from neutron.extensions import l3
from neutron.extensions import qos_fip
from neutron.objects import base as base_obj
from neutron.objects import port_forwarding
from neutron.objects import ports as port_obj
from neutron.objects import router as l3_obj
from neutron import worker as neutron_worker
@ -947,10 +948,31 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
context, port_id, {'port': {'device_id': router_id,
'device_owner': device_owner}})
def _confirm_router_interface_not_in_use(self, context, router_id,
subnet_id):
def _check_router_interface_not_in_use(self, router_id, subnet_id):
context = n_ctx.get_admin_context()
subnet = self._core_plugin.get_subnet(context, subnet_id)
subnet_cidr = netaddr.IPNetwork(subnet['cidr'])
fip_objs = l3_obj.FloatingIP.get_objects(context, router_id=router_id)
pf_plugin = directory.get_plugin(plugin_constants.PORTFORWARDING)
if pf_plugin:
fip_ids = [fip_obj.id for fip_obj in fip_objs]
pf_objs = port_forwarding.PortForwarding.get_objects(
context, floatingip_id=fip_ids)
for pf_obj in pf_objs:
if (pf_obj.internal_ip_address and
pf_obj.internal_ip_address in subnet_cidr):
raise l3_exc.RouterInterfaceInUseByFloatingIP(
router_id=router_id, subnet_id=subnet_id)
for fip_obj in fip_objs:
if (fip_obj.fixed_ip_address and
fip_obj.fixed_ip_address in subnet_cidr):
raise l3_exc.RouterInterfaceInUseByFloatingIP(
router_id=router_id, subnet_id=subnet_id)
def _confirm_router_interface_not_in_use(self, context, router_id,
subnet_id):
try:
kwargs = {'context': context, 'router_id': router_id,
'subnet_id': subnet_id}
@ -962,11 +984,8 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
if len(e.errors) == 1:
raise e.errors[0].error
raise l3_exc.RouterInUse(router_id=router_id, reason=e)
fip_objs = l3_obj.FloatingIP.get_objects(context, router_id=router_id)
for fip_obj in fip_objs:
if fip_obj.fixed_ip_address in subnet_cidr:
raise l3_exc.RouterInterfaceInUseByFloatingIP(
router_id=router_id, subnet_id=subnet_id)
self._check_router_interface_not_in_use(router_id, subnet_id)
def _remove_interface_by_port(self, context, router_id,
port_id, subnet_id, owner):

View File

@ -18,6 +18,8 @@ from neutron_lib.api.definitions import floating_ip_port_forwarding as apidef
from neutron_lib.callbacks import exceptions as c_exc
from neutron_lib import exceptions as lib_exc
from neutron_lib.exceptions import l3 as lib_l3_exc
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
from oslo_utils import uuidutils
from six.moves import queue
@ -30,6 +32,7 @@ class PortForwardingTestCaseBase(ml2_test_base.ML2TestFramework):
def setUp(self):
super(PortForwardingTestCaseBase, self).setUp()
self.pf_plugin = pf_plugin.PortForwardingPlugin()
directory.add_plugin(plugin_constants.PORTFORWARDING, self.pf_plugin)
def _create_floatingip(self, network_id, port_id=None,
fixed_ip_address=None):
@ -65,6 +68,11 @@ class PortForwardingTestCaseBase(ml2_test_base.ML2TestFramework):
self.l3_plugin.add_router_interface(
self.context, router_id, interface_info=interface_info)
def _remove_router_interface(self, router_id, subnet_id):
interface_info = {"subnet_id": subnet_id}
self.l3_plugin.remove_router_interface(
self.context, router_id, interface_info=interface_info)
def _set_router_gw(self, router_id, ext_net_id):
body = {
'router':
@ -101,7 +109,14 @@ class PortForwardingTestCase(PortForwardingTestCaseBase):
apidef.INTERNAL_IP_ADDRESS:
self.port['fixed_ips'][0]['ip_address']}}
def test_create_floatingip_port_forwarding(self):
def test_create_floatingip_port_forwarding_and_remove_subnets(self):
subnet_2 = self._create_subnet(self.fmt, self.net['id'],
'10.0.2.0/24').json['subnet']
self._add_router_interface(self.router['id'], subnet_2['id'])
subnet_3 = self._create_subnet(self.fmt, self.net['id'],
'10.0.3.0/24').json['subnet']
self._add_router_interface(self.router['id'], subnet_3['id'])
res = self.pf_plugin.create_floatingip_port_forwarding(
self.context, self.fip['id'], self.port_forwarding)
expect = {
@ -116,6 +131,13 @@ class PortForwardingTestCase(PortForwardingTestCaseBase):
'floatingip_id': self.fip['id']}
self.assertEqual(expect, res)
self.assertRaises(lib_l3_exc.RouterInterfaceInUseByFloatingIP,
self._remove_router_interface,
self.router['id'], self.subnet['id'])
self._remove_router_interface(self.router['id'], subnet_2['id'])
self._remove_router_interface(self.router['id'], subnet_3['id'])
def test_negative_create_floatingip_port_forwarding(self):
self.pf_plugin.create_floatingip_port_forwarding(
self.context, self.fip['id'], self.port_forwarding)

View File

@ -28,6 +28,16 @@ L3_PLUGIN = 'neutron.tests.unit.extensions.test_l3.TestL3NatServicePlugin'
CORE_PLUGIN = 'neutron.tests.unit.extensions.test_l3.TestNoL3NatPlugin'
def _get_expected(ref):
want_fields = [apidef.INTERNAL_IP_ADDRESS, apidef.PROTOCOL,
apidef.INTERNAL_PORT, apidef.EXTERNAL_PORT]
expect = {
key: value
for key, value in ref[apidef.RESOURCE_NAME].items()
if key in want_fields}
return expect
class ExtendFipPortForwardingExtensionManager(object):
def get_resources(self):
@ -78,14 +88,70 @@ class TestExtendFipPortForwardingExtension(
self._router_interface_action('add', router['router']['id'],
insub['subnet']['id'], None)
def _get_expected(ref):
want_fields = [apidef.INTERNAL_IP_ADDRESS, apidef.PROTOCOL,
apidef.INTERNAL_PORT, apidef.EXTERNAL_PORT]
expect = {
key: value
for key, value in ref[apidef.RESOURCE_NAME].items()
if key in want_fields}
return expect
with self.port(subnet=insub) as port1,\
self.port(subnet=insub) as port2:
update_dict1 = {
apidef.INTERNAL_PORT_ID: port1['port']['id'],
apidef.INTERNAL_IP_ADDRESS:
port1['port']['fixed_ips'][0]['ip_address']}
port_forwarding[apidef.RESOURCE_NAME].update(update_dict1)
self.pf_plugin.create_floatingip_port_forwarding(
ctx, fip['floatingip']['id'], port_forwarding)
body = self._show('floatingips', fip['floatingip']['id'])
self.assertEqual(
1, len(body['floatingip'][apidef.COLLECTION_NAME]))
expect_result1 = _get_expected(port_forwarding)
self.assertEqual(
expect_result1,
body['floatingip'][apidef.COLLECTION_NAME][0])
update_dict2 = {
apidef.EXTERNAL_PORT: 2226,
apidef.INTERNAL_PORT_ID: port2['port']['id'],
apidef.INTERNAL_IP_ADDRESS:
port2['port']['fixed_ips'][0]['ip_address']}
port_forwarding[apidef.RESOURCE_NAME].update(update_dict2)
self.pf_plugin.create_floatingip_port_forwarding(
ctx, fip['floatingip']['id'], port_forwarding)
body = self._show('floatingips', fip['floatingip']['id'])
self.assertEqual(
2, len(body['floatingip'][apidef.COLLECTION_NAME]))
expect_result2 = _get_expected(port_forwarding)
expect = [expect_result1, expect_result2]
self.assertEqual(
expect, body['floatingip'][apidef.COLLECTION_NAME])
def test_create_port_forwarding_and_remove_subnets(self):
port_forwarding = {
apidef.RESOURCE_NAME:
{apidef.EXTERNAL_PORT: 2225,
apidef.INTERNAL_PORT: 25,
apidef.INTERNAL_PORT_ID: None,
apidef.PROTOCOL: "tcp",
apidef.INTERNAL_IP_ADDRESS: None}}
ctx = context.get_admin_context()
kwargs = {'arg_list': (extnet_apidef.EXTERNAL,),
extnet_apidef.EXTERNAL: True}
with self.network(**kwargs) as extnet, self.network() as innet:
with self.subnet(network=extnet, cidr='200.0.0.0/22'),\
self.subnet(network=innet, cidr='10.0.0.0/24') as insub,\
self.subnet(network=innet, cidr='10.0.8.0/24') as insub2,\
self.subnet(network=innet, cidr='10.0.9.0/24') as insub3,\
self.router() as router:
fip = self._make_floatingip(self.fmt, extnet['network']['id'])
# check the floatingip response contains port_forwarding field
self.assertIn(apidef.COLLECTION_NAME, fip['floatingip'])
self._add_external_gateway_to_router(router['router']['id'],
extnet['network']['id'])
self._router_interface_action('add', router['router']['id'],
insub['subnet']['id'], None)
self._router_interface_action('add', router['router']['id'],
insub2['subnet']['id'], None)
self._router_interface_action('add', router['router']['id'],
insub3['subnet']['id'], None)
with self.port(subnet=insub) as port1,\
self.port(subnet=insub) as port2:
@ -122,3 +188,10 @@ class TestExtendFipPortForwardingExtension(
expect = [expect_result1, expect_result2]
self.assertEqual(
expect, body['floatingip'][apidef.COLLECTION_NAME])
self._router_interface_action(
'remove', router['router']['id'],
insub2['subnet']['id'], None)
self._router_interface_action(
'remove', router['router']['id'],
insub3['subnet']['id'], None)