Deny delete last slaac subnet with allocation on segment

When a port has only one IP allocation on auto-allocation
subnet which is associated with a segment, do not allow
the delete of the subnet. Raise SubnetInUse exception instead.

Related: rhbz#1803989
Related-Bug: #1864225
Related-Bug: #1864333
Closes-Bug: #1865138
Change-Id: I9fb0f05ede42afa1a349635b1936028edf540a1f
This commit is contained in:
Harald Jensås 2020-02-28 22:55:13 +01:00
parent 3d3dc60408
commit f987486feb
3 changed files with 79 additions and 7 deletions

View File

@ -1040,21 +1040,20 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
raise exc.SubnetInUse(subnet_id=id)
@db_api.retry_if_session_inactive()
def _remove_subnet_ip_allocations_from_ports(self, context, id):
def _remove_subnet_ip_allocations_from_ports(self, context, subnet):
# Do not allow a subnet to be deleted if a router is attached to it
self._subnet_check_ip_allocations_internal_router_ports(
context, id)
subnet = self._get_subnet_object(context, id)
context, subnet.id)
is_auto_addr_subnet = ipv6_utils.is_auto_address_subnet(subnet)
if not is_auto_addr_subnet:
# we only automatically remove IP addresses from user ports if
# the IPs come from auto allocation subnets.
self._ensure_no_user_ports_on_subnet(context, id)
self._ensure_no_user_ports_on_subnet(context, subnet.id)
net_allocs = (context.session.query(models_v2.IPAllocation.port_id).
filter_by(subnet_id=id))
filter_by(subnet_id=subnet.id))
port_ids_on_net = [ipal.port_id for ipal in net_allocs]
for port_id in port_ids_on_net:
self._remove_subnet_from_port(context, id, port_id,
self._remove_subnet_from_port(context, subnet.id, port_id,
auto_subnet=is_auto_addr_subnet)
@db_api.retry_if_session_inactive()
@ -1063,7 +1062,12 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# Make sure the subnet isn't used by other resources
_check_subnet_not_used(context, id)
subnet = self._get_subnet_object(context, id)
self._remove_subnet_ip_allocations_from_ports(context, id)
registry.publish(resources.SUBNET,
events.PRECOMMIT_DELETE_ASSOCIATIONS,
self,
payload=events.DBEventPayload(context,
resource_id=subnet.id))
self._remove_subnet_ip_allocations_from_ports(context, subnet)
self._delete_subnet(context, subnet)
def _delete_subnet(self, context, subnet):

View File

@ -33,6 +33,7 @@ from neutron_lib.callbacks import registry
from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib.db import resource_extend
from neutron_lib import exceptions as n_exc
from neutron_lib.exceptions import placement as placement_exc
from neutron_lib.placement import client as placement_client
from neutron_lib.plugins import directory
@ -43,9 +44,12 @@ from oslo_log import log
from oslo_utils import excutils
from neutron._i18n import _
from neutron.common import ipv6_utils
from neutron.db import models_v2
from neutron.extensions import segment
from neutron.notifiers import batch_notifier
from neutron.objects import network as net_obj
from neutron.objects import ports as ports_obj
from neutron.objects import subnet as subnet_obj
from neutron.services.segments import db
from neutron.services.segments import exceptions
@ -132,6 +136,38 @@ class Plugin(db.SegmentDbMixin, segment.SegmentPluginBase):
raise exceptions.SegmentInUse(segment_id=segment_id,
reason=reason)
@registry.receives(
resources.SUBNET, [events.PRECOMMIT_DELETE_ASSOCIATIONS])
def _validate_auto_address_subnet_delete(self, resource, event, trigger,
payload):
context = payload.context
subnet = subnet_obj.Subnet.get_object(context, id=payload.resource_id)
is_auto_addr_subnet = ipv6_utils.is_auto_address_subnet(subnet)
if not is_auto_addr_subnet or subnet.segment_id is None:
return
net_allocs = (context.session.query(models_v2.IPAllocation.port_id).
filter_by(subnet_id=subnet.id))
port_ids_on_net = [ipalloc.port_id for ipalloc in net_allocs]
for port_id in port_ids_on_net:
try:
port = ports_obj.Port.get_object(context, id=port_id)
fixed_ips = [f for f in port['fixed_ips']
if f['subnet_id'] != subnet.id]
if len(fixed_ips) != 0:
continue
LOG.info("Found port %(port_id)s, with IP auto-allocation "
"only on subnet %(subnet)s which is associated with "
"segment %(segment_id)s, cannot delete",
{'port_id': port_id,
'subnet': subnet.id,
'segment_id': subnet.segment_id})
raise n_exc.SubnetInUse(subnet_id=subnet.id)
except n_exc.PortNotFound:
# port is gone
continue
class Event(object):

View File

@ -1791,6 +1791,38 @@ class TestSegmentAwareIpam(SegmentAwareIpamTestCase):
port_b_snet_ids = [f['subnet_id'] for f in port_b['port']['fixed_ips']]
self.assertNotIn(subnet_b1['subnet']['id'], port_b_snet_ids)
def test_slaac_segment_aware_delete_last_subnet_on_segment_fails(self):
(network, segment_a, segment_b, subnet_a0, subnet_a1, subnet_b0,
subnet_b1) = self._create_net_two_segments_four_slaac_subnets()
# Create two ports, port_a with subnet_a0 in fixed_ips and port_b
# with subnet_b0 in fixed_ips
port_a = self._create_port_and_show(
network, fixed_ips=[{'subnet_id': subnet_a0['subnet']['id']}])
port_b = self._create_port_and_show(
network, fixed_ips=[{'subnet_id': subnet_b0['subnet']['id']}])
self._validate_immediate_ip_allocation(port_a['port']['id'])
self._validate_immediate_ip_allocation(port_b['port']['id'])
self.assertEqual(2, len(port_a['port']['fixed_ips']))
self.assertEqual(2, len(port_b['port']['fixed_ips']))
# Delete subnet_b1 on segment_b
req = self.new_delete_request('subnets', subnet_b1['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
# Delete subnet_b0 on segment_b fails because port_b has no other
# allocation, SubnetInUse
req = self.new_delete_request('subnets', subnet_b0['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPConflict.code, res.status_int)
# Delete port_b
req = self.new_delete_request('ports', port_b['port']['id'])
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
# Try to delete subnet_b0 again, should not fail with no ports
req = self.new_delete_request('subnets', subnet_b0['subnet']['id'])
res = req.get_response(self.api)
self.assertEqual(webob.exc.HTTPNoContent.code, res.status_int)
class TestSegmentAwareIpamML2(TestSegmentAwareIpam):