Only prevent l3 port deletion if router exists

This patch modifies the prevent_l3_port_deletion method to
actually look up the router_id in the device_owner field to
confirm that the router exists before claiming the port is
in use. This will allow users to delete ports that may have
been orphaned due to race conditions in the cleanup of router
interfaces.

Partial-Bug: #1540271
Change-Id: Ieffe632f3f3098baf202d3795ab5182982e234bd
This commit is contained in:
Kevin Benton 2016-02-01 01:17:22 -08:00
parent 44571236ca
commit 3b41808b86
3 changed files with 104 additions and 13 deletions

View File

@ -1154,6 +1154,20 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
return self._get_collection_count(context, FloatingIP,
filters=filters)
def _router_exists(self, context, router_id):
try:
self.get_router(context.elevated(), router_id)
return True
except l3.RouterNotFound:
return False
def _floating_ip_exists(self, context, floating_ip_id):
try:
self.get_floatingip(context, floating_ip_id)
return True
except l3.FloatingIPNotFound:
return False
def prevent_l3_port_deletion(self, context, port_id):
"""Checks to make sure a port is allowed to be deleted.
@ -1168,19 +1182,38 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase):
except n_exc.PortNotFound:
# non-existent ports don't need to be protected from deletion
return
if port['device_owner'] in self.router_device_owners:
if port['device_owner'] not in self.router_device_owners:
return
# Raise port in use only if the port has IP addresses
# Otherwise it's a stale port that can be removed
fixed_ips = port['fixed_ips']
if fixed_ips:
reason = _('has device owner %s') % port['device_owner']
raise n_exc.ServicePortInUse(port_id=port['id'],
reason=reason)
else:
if not fixed_ips:
LOG.debug("Port %(port_id)s has owner %(port_owner)s, but "
"no IP address, so it can be deleted",
{'port_id': port['id'],
'port_owner': port['device_owner']})
return
# NOTE(kevinbenton): we also check to make sure that the
# router still exists. It's possible for HA router interfaces
# to remain after the router is deleted if they encounter an
# error during deletion.
# Elevated context in case router is owned by another tenant
if port['device_owner'] == DEVICE_OWNER_FLOATINGIP:
if not self._floating_ip_exists(context, port['device_id']):
LOG.debug("Floating IP %(f_id)s corresponding to port "
"%(port_id)s no longer exists, allowing deletion.",
{'f_id': port['device_id'], 'port_id': port['id']})
return
elif not self._router_exists(context, port['device_id']):
LOG.debug("Router %(router_id)s corresponding to port "
"%(port_id)s no longer exists, allowing deletion.",
{'router_id': port['device_id'],
'port_id': port['id']})
return
reason = _('has device owner %s') % port['device_owner']
raise n_exc.ServicePortInUse(port_id=port['id'],
reason=reason)
def disassociate_floatingips(self, context, port_id):
"""Disassociate all floating IPs linked to specific port.

View File

@ -14,8 +14,11 @@
# limitations under the License.
import mock
import testtools
from neutron.common import exceptions as n_exc
from neutron.db import l3_db
from neutron.extensions import l3
from neutron import manager
from neutron.tests import base
@ -130,3 +133,55 @@ class TestL3_NAT_dbonly_mixin(base.BaseTestCase):
self.assertEqual([({'id': 'id1'}, 'scope1'),
({'id': 'id2'}, 'scope2'),
({'id': 'id3'}, 'scope3')], result)
@mock.patch.object(manager.NeutronManager, 'get_plugin')
def test_prevent_l3_port_deletion_port_not_found(self, gp):
# port not found doesn't prevent
gp.return_value.get_port.side_effect = n_exc.PortNotFound(port_id='1')
self.db.prevent_l3_port_deletion(None, None)
@mock.patch.object(manager.NeutronManager, 'get_plugin')
def test_prevent_l3_port_device_owner_not_router(self, gp):
# ignores other device owners
gp.return_value.get_port.return_value = {'device_owner': 'cat'}
self.db.prevent_l3_port_deletion(None, None)
@mock.patch.object(manager.NeutronManager, 'get_plugin')
def test_prevent_l3_port_no_fixed_ips(self, gp):
# without fixed IPs is allowed
gp.return_value.get_port.return_value = {
'device_owner': 'network:router_interface', 'fixed_ips': [],
'id': 'f'
}
self.db.prevent_l3_port_deletion(None, None)
@mock.patch.object(manager.NeutronManager, 'get_plugin')
def test_prevent_l3_port_no_router(self, gp):
# without router is allowed
gp.return_value.get_port.return_value = {
'device_owner': 'network:router_interface',
'device_id': '44', 'id': 'f',
'fixed_ips': [{'ip_address': '1.1.1.1', 'subnet_id': '4'}]}
self.db.get_router = mock.Mock()
self.db.get_router.side_effect = l3.RouterNotFound(router_id='44')
self.db.prevent_l3_port_deletion(mock.Mock(), None)
@mock.patch.object(manager.NeutronManager, 'get_plugin')
def test_prevent_l3_port_existing_router(self, gp):
gp.return_value.get_port.return_value = {
'device_owner': 'network:router_interface',
'device_id': 'some_router', 'id': 'f',
'fixed_ips': [{'ip_address': '1.1.1.1', 'subnet_id': '4'}]}
self.db.get_router = mock.Mock()
with testtools.ExpectedException(n_exc.ServicePortInUse):
self.db.prevent_l3_port_deletion(mock.Mock(), None)
@mock.patch.object(manager.NeutronManager, 'get_plugin')
def test_prevent_l3_port_existing_floating_ip(self, gp):
gp.return_value.get_port.return_value = {
'device_owner': 'network:floatingip',
'device_id': 'some_flip', 'id': 'f',
'fixed_ips': [{'ip_address': '1.1.1.1', 'subnet_id': '4'}]}
self.db.get_floatingip = mock.Mock()
with testtools.ExpectedException(n_exc.ServicePortInUse):
self.db.prevent_l3_port_deletion(mock.Mock(), None)

View File

@ -181,6 +181,7 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
plugin = mock.Mock()
gp.return_value = plugin
plugin.get_port.return_value = port
self.mixin._router_exists = mock.Mock(return_value=True)
self.assertRaises(exceptions.ServicePortInUse,
self.mixin.prevent_l3_port_deletion,
self.ctx,
@ -190,6 +191,7 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
port = {
'id': 'my_port_id',
'fixed_ips': mock.ANY,
'device_id': 'r_id',
'device_owner': l3_const.DEVICE_OWNER_AGENT_GW
}
self._test_prepare_direct_delete_dvr_internal_ports(port)
@ -198,6 +200,7 @@ class L3DvrTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
port = {
'id': 'my_port_id',
'fixed_ips': mock.ANY,
'device_id': 'r_id',
'device_owner': l3_const.DEVICE_OWNER_ROUTER_SNAT
}
self._test_prepare_direct_delete_dvr_internal_ports(port)