Merge "Filter out the floating IPs when removing a shared RBAC" into stable/2024.1

This commit is contained in:
Zuul 2024-11-27 18:19:28 +00:00 committed by Gerrit Code Review
commit e7716f0dfd
2 changed files with 100 additions and 26 deletions

View File

@ -276,6 +276,10 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
allowed_tenants.append(net_tenant_id)
ports = ports.filter(
~models_v2.Port.tenant_id.in_(allowed_tenants))
# Filter any port with project_id=''. These ports are related
# to floating IPs, router ports (gateway, SNAT, FIP agent, HA
# interface).
ports = ports.filter(models_v2.Port.project_id.notin_(['']))
else:
# if there is a wildcard rule, we can return early because it
# allows any ports

View File

@ -20,10 +20,11 @@ from neutron_lib import exceptions as n_exc
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron.db import rbac_db_models
from neutron.db import l3_dvrscheduler_db
from neutron.objects import network as network_obj
from neutron.plugins.ml2 import plugin as ml2_plugin
from neutron import quota
from neutron.services.l3_router import l3_router_plugin
from neutron.tests.unit import testlib_api
@ -36,6 +37,9 @@ class NetworkRBACTestCase(testlib_api.SqlTestCase):
DB_PLUGIN_KLASS = 'neutron.plugins.ml2.plugin.Ml2Plugin'
self.setup_coreplugin(DB_PLUGIN_KLASS)
self.plugin = ml2_plugin.Ml2Plugin()
self.mock_notify_l3_agent = mock.patch.object(
l3_dvrscheduler_db, '_notify_l3_agent_new_port').start()
self.plugin_l3 = l3_router_plugin.L3RouterPlugin()
self.ctx = context.Context(user_id=None,
tenant_id=None,
is_admin=True,
@ -92,14 +96,26 @@ class NetworkRBACTestCase(testlib_api.SqlTestCase):
'fixed_ips': constants.ATTR_NOT_SPECIFIED}
return self.plugin.create_port(self.ctx, {'port': port})
def _create_floating_ip(self, tenant_id, network_id):
fip = {'tenant_id': tenant_id,
'floating_network_id': network_id}
return self.plugin_l3.create_floatingip(self.ctx, {'floatingip': fip})
def _create_rbac(self, project_id, network_id, action, target_project):
rbac = {'project_id': project_id,
'object_id': network_id,
'object_type': 'network',
'target_project': target_project,
'action': action}
return self.plugin.create_rbac_policy(self.ctx, {'rbac_policy': rbac})
def _delete_rbac(self, rbac_id):
return self.plugin.delete_rbac_policy(self.ctx, rbac_id)
def _list_networks(self, ctx):
return self.plugin.get_networks(ctx)
def _check_rbac(self, network_id, is_none, external):
if external:
action = rbac_db_models.ACCESS_EXTERNAL
else:
action = rbac_db_models.ACCESS_SHARED
def _check_rbac(self, network_id, is_none, action):
rbac = network_obj.NetworkRBAC.get_object(
self.ctx, object_id=network_id, action=action, target_project='*')
if is_none:
@ -116,10 +132,12 @@ class NetworkRBACTestCase(testlib_api.SqlTestCase):
'net-shared': (uuidutils.generate_uuid(), True)}
for uuid, shared in tenant_1.values():
self._create_network(self.tenant_1, uuid, shared)
self._check_rbac(uuid, is_none=(not shared), external=False)
self._check_rbac(uuid, is_none=(not shared),
action=constants.ACCESS_SHARED)
for uuid, shared in tenant_2.values():
self._create_network(self.tenant_2, uuid, shared)
self._check_rbac(uuid, is_none=(not shared), external=False)
self._check_rbac(uuid, is_none=(not shared),
action=constants.ACCESS_SHARED)
ctx_1 = context.Context(user_id=None,
tenant_id=self.tenant_1,
@ -144,66 +162,84 @@ class NetworkRBACTestCase(testlib_api.SqlTestCase):
def test_create_network_shared(self):
self._create_network(self.tenant_1, self.network_id, True)
self._check_rbac(self.network_id, is_none=False, external=False)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_SHARED)
def test_create_network_not_shared(self):
self._create_network(self.tenant_1, self.network_id, False)
self._check_rbac(self.network_id, is_none=True, external=False)
self._check_rbac(self.network_id, is_none=True,
action=constants.ACCESS_SHARED)
def test_create_network_not_shared_external(self):
with mock.patch.object(resource_extend, 'apply_funcs'):
self._create_network(self.tenant_1, self.network_id, False,
external=True)
self._check_rbac(self.network_id, is_none=False, external=True)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_EXTERNAL)
def test_update_network_to_shared(self):
self._create_network(self.tenant_1, self.network_id, False)
self._check_rbac(self.network_id, is_none=True, external=False)
self._check_rbac(self.network_id, is_none=True,
action=constants.ACCESS_SHARED)
network_data = {'shared': True}
self._update_network(self.network_id, network_data)
self._check_rbac(self.network_id, is_none=False, external=False)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_SHARED)
def test_update_network_to_no_shared_no_subnets(self):
self._create_network(self.tenant_1, self.network_id, True)
self._check_rbac(self.network_id, is_none=False, external=False)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_SHARED)
network_data = {'shared': False}
self._update_network(self.network_id, network_data)
self._check_rbac(self.network_id, is_none=True, external=False)
self._check_rbac(self.network_id, is_none=True,
action=constants.ACCESS_SHARED)
def test_update_network_shared_to_external(self):
self._create_network(self.tenant_1, self.network_id, True)
self._check_rbac(self.network_id, is_none=False, external=False)
self._check_rbac(self.network_id, is_none=True, external=True)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_SHARED)
self._check_rbac(self.network_id, is_none=True,
action=constants.ACCESS_EXTERNAL)
network_data = {extnet_apidef.EXTERNAL: True}
self._update_network(self.network_id, network_data)
self._check_rbac(self.network_id, is_none=False, external=False)
self._check_rbac(self.network_id, is_none=False, external=True)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_SHARED)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_EXTERNAL)
def test_update_network_shared_to_internal(self):
self._create_network(self.tenant_1, self.network_id, True,
external=True)
self._check_rbac(self.network_id, is_none=False, external=False)
self._check_rbac(self.network_id, is_none=False, external=True)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_SHARED)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_EXTERNAL)
network_data = {extnet_apidef.EXTERNAL: False}
self._update_network(self.network_id, network_data)
self._check_rbac(self.network_id, is_none=False, external=False)
self._check_rbac(self.network_id, is_none=True, external=True)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_SHARED)
self._check_rbac(self.network_id, is_none=True,
action=constants.ACCESS_EXTERNAL)
def test_update_network_to_no_shared_tenant_subnet(self):
self._create_network(self.tenant_1, self.network_id, True)
self._check_rbac(self.network_id, is_none=False, external=False)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_SHARED)
self._create_subnet(self.tenant_1, self.subnet_1_id, True)
network_data = {'shared': False}
self._update_network(self.network_id, network_data)
self._check_rbac(self.network_id, is_none=True, external=False)
self._check_rbac(self.network_id, is_none=True,
action=constants.ACCESS_SHARED)
def test_update_network_to_no_shared_no_tenant_subnet(self):
self._create_network(self.tenant_1, self.network_id, True)
self._check_rbac(self.network_id, is_none=False, external=False)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_SHARED)
self._create_subnet(self.tenant_1, self.subnet_1_id, True)
self._create_subnet(self.tenant_2, self.subnet_2_id, True,
cidr='10.10.20/24')
@ -264,3 +300,37 @@ class NetworkRBACTestCase(testlib_api.SqlTestCase):
self.plugin.ensure_no_tenant_ports_on_network,
self.ctx, self.network_id, self.tenant_1,
self.tenant_2)
def _external_and_shared_network(self, project_id):
self._create_network(self.tenant_1, self.network_id, False,
external=True)
self._create_subnet(self.tenant_1, self.subnet_1_id, False)
self._create_floating_ip(project_id, self.network_id)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_EXTERNAL)
self._check_rbac(self.network_id, is_none=True,
action=constants.ACCESS_SHARED)
# Add a RBAC with action=access_as_shared
rbac_shared = self._create_rbac(
self.tenant_1, self.network_id, action=constants.ACCESS_SHARED,
target_project='*')
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_SHARED)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_EXTERNAL)
# Delete the created RBAC with action=access_as_shared. The FIP does
# not interfere with the RBAC deletion because it can be created due
# to the RBAC action=access_as_external.
self._delete_rbac(rbac_shared['id'])
self._check_rbac(self.network_id, is_none=True,
action=constants.ACCESS_SHARED)
self._check_rbac(self.network_id, is_none=False,
action=constants.ACCESS_EXTERNAL)
def test_external_network_update_shared_flag_own_project_fip(self):
self._external_and_shared_network(self.tenant_1)
def test_external_network_update_shared_flag_other_project_fip(self):
self._external_and_shared_network(self.tenant_2)