From 90d836bc420ccd309196ece7908b41b9e2c4f766 Mon Sep 17 00:00:00 2001 From: Rodolfo Alonso Hernandez Date: Fri, 15 Nov 2024 11:08:19 +0000 Subject: [PATCH] Filter out the floating IPs when removing a shared RBAC When a RBAC with action=access_as_shared is removed from a network, it is checked first that there are no elements (ports) in this network that could no longer exist due to the RBAC permissions reduction. The floating IP related ports, that have project_id='' by definition, should be removed from this check. These ports can be created due to a RBAC with action=access_as_external. If a floating IP port is present in the network, it should not block the RBAC with action=access_as_shared removal. Closes-Bug: #2075529 Change-Id: I7e31c21c04dc1ef26f5f05537ca0d2cb8f5ca505 --- neutron/db/db_base_plugin_v2.py | 4 + neutron/tests/functional/db/test_network.py | 122 +++++++++++++++----- 2 files changed, 100 insertions(+), 26 deletions(-) diff --git a/neutron/db/db_base_plugin_v2.py b/neutron/db/db_base_plugin_v2.py index 28826ffc773..c2e37622cd1 100644 --- a/neutron/db/db_base_plugin_v2.py +++ b/neutron/db/db_base_plugin_v2.py @@ -276,6 +276,10 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon, allowed_tenants.append(net_tenant_id) ports = ports.filter( ~models_v2.Port.tenant_id.in_(allowed_tenants)) + # Filter any port with project_id=''. These ports are related + # to floating IPs, router ports (gateway, SNAT, FIP agent, HA + # interface). + ports = ports.filter(models_v2.Port.project_id.notin_([''])) else: # if there is a wildcard rule, we can return early because it # allows any ports diff --git a/neutron/tests/functional/db/test_network.py b/neutron/tests/functional/db/test_network.py index bce61b9c9ff..452ade9a1d5 100644 --- a/neutron/tests/functional/db/test_network.py +++ b/neutron/tests/functional/db/test_network.py @@ -20,10 +20,11 @@ from neutron_lib import exceptions as n_exc from oslo_config import cfg from oslo_utils import uuidutils -from neutron.db import rbac_db_models +from neutron.db import l3_dvrscheduler_db from neutron.objects import network as network_obj from neutron.plugins.ml2 import plugin as ml2_plugin from neutron import quota +from neutron.services.l3_router import l3_router_plugin from neutron.tests.unit import testlib_api @@ -36,6 +37,9 @@ class NetworkRBACTestCase(testlib_api.SqlTestCase): DB_PLUGIN_KLASS = 'neutron.plugins.ml2.plugin.Ml2Plugin' self.setup_coreplugin(DB_PLUGIN_KLASS) self.plugin = ml2_plugin.Ml2Plugin() + self.mock_notify_l3_agent = mock.patch.object( + l3_dvrscheduler_db, '_notify_l3_agent_new_port').start() + self.plugin_l3 = l3_router_plugin.L3RouterPlugin() self.ctx = context.Context(user_id=None, tenant_id=None, is_admin=True, @@ -92,14 +96,26 @@ class NetworkRBACTestCase(testlib_api.SqlTestCase): 'fixed_ips': constants.ATTR_NOT_SPECIFIED} return self.plugin.create_port(self.ctx, {'port': port}) + def _create_floating_ip(self, tenant_id, network_id): + fip = {'tenant_id': tenant_id, + 'floating_network_id': network_id} + return self.plugin_l3.create_floatingip(self.ctx, {'floatingip': fip}) + + def _create_rbac(self, project_id, network_id, action, target_project): + rbac = {'project_id': project_id, + 'object_id': network_id, + 'object_type': 'network', + 'target_project': target_project, + 'action': action} + return self.plugin.create_rbac_policy(self.ctx, {'rbac_policy': rbac}) + + def _delete_rbac(self, rbac_id): + return self.plugin.delete_rbac_policy(self.ctx, rbac_id) + def _list_networks(self, ctx): return self.plugin.get_networks(ctx) - def _check_rbac(self, network_id, is_none, external): - if external: - action = rbac_db_models.ACCESS_EXTERNAL - else: - action = rbac_db_models.ACCESS_SHARED + def _check_rbac(self, network_id, is_none, action): rbac = network_obj.NetworkRBAC.get_object( self.ctx, object_id=network_id, action=action, target_project='*') if is_none: @@ -116,10 +132,12 @@ class NetworkRBACTestCase(testlib_api.SqlTestCase): 'net-shared': (uuidutils.generate_uuid(), True)} for uuid, shared in tenant_1.values(): self._create_network(self.tenant_1, uuid, shared) - self._check_rbac(uuid, is_none=(not shared), external=False) + self._check_rbac(uuid, is_none=(not shared), + action=constants.ACCESS_SHARED) for uuid, shared in tenant_2.values(): self._create_network(self.tenant_2, uuid, shared) - self._check_rbac(uuid, is_none=(not shared), external=False) + self._check_rbac(uuid, is_none=(not shared), + action=constants.ACCESS_SHARED) ctx_1 = context.Context(user_id=None, tenant_id=self.tenant_1, @@ -144,66 +162,84 @@ class NetworkRBACTestCase(testlib_api.SqlTestCase): def test_create_network_shared(self): self._create_network(self.tenant_1, self.network_id, True) - self._check_rbac(self.network_id, is_none=False, external=False) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_SHARED) def test_create_network_not_shared(self): self._create_network(self.tenant_1, self.network_id, False) - self._check_rbac(self.network_id, is_none=True, external=False) + self._check_rbac(self.network_id, is_none=True, + action=constants.ACCESS_SHARED) def test_create_network_not_shared_external(self): with mock.patch.object(resource_extend, 'apply_funcs'): self._create_network(self.tenant_1, self.network_id, False, external=True) - self._check_rbac(self.network_id, is_none=False, external=True) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_EXTERNAL) def test_update_network_to_shared(self): self._create_network(self.tenant_1, self.network_id, False) - self._check_rbac(self.network_id, is_none=True, external=False) + self._check_rbac(self.network_id, is_none=True, + action=constants.ACCESS_SHARED) network_data = {'shared': True} self._update_network(self.network_id, network_data) - self._check_rbac(self.network_id, is_none=False, external=False) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_SHARED) def test_update_network_to_no_shared_no_subnets(self): self._create_network(self.tenant_1, self.network_id, True) - self._check_rbac(self.network_id, is_none=False, external=False) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_SHARED) network_data = {'shared': False} self._update_network(self.network_id, network_data) - self._check_rbac(self.network_id, is_none=True, external=False) + self._check_rbac(self.network_id, is_none=True, + action=constants.ACCESS_SHARED) def test_update_network_shared_to_external(self): self._create_network(self.tenant_1, self.network_id, True) - self._check_rbac(self.network_id, is_none=False, external=False) - self._check_rbac(self.network_id, is_none=True, external=True) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_SHARED) + self._check_rbac(self.network_id, is_none=True, + action=constants.ACCESS_EXTERNAL) network_data = {extnet_apidef.EXTERNAL: True} self._update_network(self.network_id, network_data) - self._check_rbac(self.network_id, is_none=False, external=False) - self._check_rbac(self.network_id, is_none=False, external=True) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_SHARED) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_EXTERNAL) def test_update_network_shared_to_internal(self): self._create_network(self.tenant_1, self.network_id, True, external=True) - self._check_rbac(self.network_id, is_none=False, external=False) - self._check_rbac(self.network_id, is_none=False, external=True) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_SHARED) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_EXTERNAL) network_data = {extnet_apidef.EXTERNAL: False} self._update_network(self.network_id, network_data) - self._check_rbac(self.network_id, is_none=False, external=False) - self._check_rbac(self.network_id, is_none=True, external=True) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_SHARED) + self._check_rbac(self.network_id, is_none=True, + action=constants.ACCESS_EXTERNAL) def test_update_network_to_no_shared_tenant_subnet(self): self._create_network(self.tenant_1, self.network_id, True) - self._check_rbac(self.network_id, is_none=False, external=False) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_SHARED) self._create_subnet(self.tenant_1, self.subnet_1_id, True) network_data = {'shared': False} self._update_network(self.network_id, network_data) - self._check_rbac(self.network_id, is_none=True, external=False) + self._check_rbac(self.network_id, is_none=True, + action=constants.ACCESS_SHARED) def test_update_network_to_no_shared_no_tenant_subnet(self): self._create_network(self.tenant_1, self.network_id, True) - self._check_rbac(self.network_id, is_none=False, external=False) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_SHARED) self._create_subnet(self.tenant_1, self.subnet_1_id, True) self._create_subnet(self.tenant_2, self.subnet_2_id, True, cidr='10.10.20.0/24') @@ -264,3 +300,37 @@ class NetworkRBACTestCase(testlib_api.SqlTestCase): self.plugin.ensure_no_tenant_ports_on_network, self.ctx, self.network_id, self.tenant_1, self.tenant_2) + + def _external_and_shared_network(self, project_id): + self._create_network(self.tenant_1, self.network_id, False, + external=True) + self._create_subnet(self.tenant_1, self.subnet_1_id, False) + self._create_floating_ip(project_id, self.network_id) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_EXTERNAL) + self._check_rbac(self.network_id, is_none=True, + action=constants.ACCESS_SHARED) + + # Add a RBAC with action=access_as_shared + rbac_shared = self._create_rbac( + self.tenant_1, self.network_id, action=constants.ACCESS_SHARED, + target_project='*') + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_SHARED) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_EXTERNAL) + + # Delete the created RBAC with action=access_as_shared. The FIP does + # not interfere with the RBAC deletion because it can be created due + # to the RBAC action=access_as_external. + self._delete_rbac(rbac_shared['id']) + self._check_rbac(self.network_id, is_none=True, + action=constants.ACCESS_SHARED) + self._check_rbac(self.network_id, is_none=False, + action=constants.ACCESS_EXTERNAL) + + def test_external_network_update_shared_flag_own_project_fip(self): + self._external_and_shared_network(self.tenant_1) + + def test_external_network_update_shared_flag_other_project_fip(self): + self._external_and_shared_network(self.tenant_2)