Use the RBAC actions field for "network" and "subnet"

Since [1], it is possible to define a set of RBAC actions to filter the
model query. For "network" and "subnet" models, it is needed to add the
RBAC action "access_as_external" to the query. Instead of adding an
additional filter (as is now), this patch replaces the default RBAC
actions used in the model query, adding this extra one.

The neutron-lib library is bumped to version 3.14.0.

[1]https://review.opendev.org/c/openstack/neutron-lib/+/914473

Closes-Bug: #2059236
Change-Id: Ie3e77e2f812bd5cddf1971bc456854866843d4f3
This commit is contained in:
Rodolfo Alonso Hernandez 2024-04-08 22:19:50 +00:00 committed by Rodolfo Alonso
parent 2b6abf0fc0
commit f22f7ae012
3 changed files with 50 additions and 51 deletions

View File

@ -23,12 +23,10 @@ from neutron_lib.callbacks import resources
from neutron_lib import constants
from neutron_lib.db import model_query
from neutron_lib.db import resource_extend
from neutron_lib.db import utils as db_utils
from neutron_lib import exceptions as n_exc
from neutron_lib.exceptions import external_net as extnet_exc
from neutron_lib.plugins import constants as plugin_constants
from neutron_lib.plugins import directory
from sqlalchemy.sql import expression as expr
from neutron._i18n import _
from neutron.db import models_v2
@ -39,23 +37,9 @@ from neutron.objects import ports as port_obj
from neutron.objects import router as l3_obj
def _network_filter_hook(context, original_model, conditions):
if conditions is not None and not hasattr(conditions, '__iter__'):
conditions = (conditions, )
# Apply the external network filter only in non-admin and non-advsvc
# context
if db_utils.model_query_scope_is_project(context, original_model):
# the table will already be joined to the rbac entries for the
# shared check so we don't need to worry about ensuring that
rbac_model = original_model.rbac_entries.property.mapper.class_
tenant_allowed = (
(rbac_model.action == rbac_db_models.ACCESS_EXTERNAL) &
(rbac_model.target_project == context.tenant_id) |
(rbac_model.target_project == '*'))
conditions = expr.or_(tenant_allowed, *conditions)
conditions = expr.or_(original_model.tenant_id == context.tenant_id,
*conditions)
return conditions
EXTERNAL_NETWORK_RBAC_ACTIONS = {constants.ACCESS_SHARED,
constants.ACCESS_READONLY,
constants.ACCESS_EXTERNAL}
def _network_result_filter_hook(query, filters):
@ -77,14 +61,18 @@ class External_net_db_mixin(object):
models_v2.Network,
"external_net",
query_hook=None,
filter_hook=_network_filter_hook,
result_filters=_network_result_filter_hook)
filter_hook=None,
result_filters=_network_result_filter_hook,
rbac_actions=EXTERNAL_NETWORK_RBAC_ACTIONS,
)
model_query.register_hook(
models_v2.Subnet,
"external_subnet",
query_hook=None,
filter_hook=_network_filter_hook,
result_filters=None)
filter_hook=None,
result_filters=None,
rbac_actions=EXTERNAL_NETWORK_RBAC_ACTIONS,
)
return super(External_net_db_mixin, cls).__new__(cls, *args, **kwargs)
def _network_is_external(self, context, net_id):

View File

@ -15,7 +15,11 @@
from unittest import mock
from neutron_lib.api import attributes
from neutron_lib.api.definitions import external_net as extnet_apidef
from neutron_lib.api.definitions import subnet as subnet_apidef
from neutron_lib.api.definitions import subnet_external_network as \
extsnet_apidef
from neutron_lib import constants
from neutron_lib import context
from neutron_lib.plugins import constants as plugin_constants
@ -24,10 +28,17 @@ from oslo_utils import uuidutils
import testtools
from webob import exc
from neutron.db import external_net_db
from neutron.db import models_v2
from neutron.tests.unit.api.v2 import test_base
from neutron.tests.unit.db import test_db_base_plugin_v2
# Add subnet 'router:external' extension, without loading the extensions.
# This change must be done before the policies are parsed in order to load the
# 'convert_to' method before the ``FieldCheck`` instance for this field is
# created.
rname = subnet_apidef.COLLECTION_NAME
attributes.RESOURCES[rname].update(
extsnet_apidef.RESOURCE_ATTRIBUTE_MAP[rname])
from neutron.tests.unit.db import test_db_base_plugin_v2 # noqa: E402
_uuid = uuidutils.generate_uuid
@ -136,30 +147,6 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
result = plugin.get_networks(ctx)
self.assertFalse(result[0]['shared'])
def test_network_filter_hook_admin_context(self):
ctx = context.Context(None, None, is_admin=True)
model = models_v2.Network
conditions = external_net_db._network_filter_hook(ctx, model, [])
self.assertEqual([], conditions)
def test_network_filter_hook_nonadmin_context(self):
ctx = context.Context('edinson', 'cavani')
model = models_v2.Network
txt = ("networks.project_id = :project_id_1 OR "
"networkrbacs.action = :action_1 AND "
"networkrbacs.target_project = :target_project_1 OR "
"networkrbacs.target_project = :target_project_2")
conditions = external_net_db._network_filter_hook(ctx, model, [])
self.assertEqual(conditions.__str__(), txt)
# Try to concatenate conditions
txt2 = (txt.replace('project_1', 'project_3').
replace('project_2', 'project_4').
replace('action_1', 'action_2').
replace('project_id_1', 'project_id_2'))
conditions = external_net_db._network_filter_hook(ctx, model,
conditions)
self.assertEqual(conditions.__str__(), "%s OR %s" % (txt, txt2))
def test_create_port_external_network_non_admin_fails(self):
with self.network(as_admin=True, router__external=True) as ext_net:
with self.subnet(network=ext_net) as ext_subnet:
@ -199,3 +186,27 @@ class ExtNetDBTestCase(test_db_base_plugin_v2.NeutronDbPluginV2TestCase):
self.assertEqual(exc.HTTPNoContent.code, res.status_int)
(l3_mock.delete_disassociated_floatingips
.assert_called_once_with(mock.ANY, net['network']['id']))
def test_create_shared_networks_and_subnets(self):
with (self.network(as_admin=True, router__external=True) as net_ext,
self.network(as_admin=True, shared=True) as net_shared,
self.network(as_admin=True) as net_admin):
with (self.subnet(as_admin=True, network=net_ext)
as snet_ext, self.subnet(as_admin=True, network=net_shared)
as snet_shared, self.subnet(as_admin=True, network=net_admin)
as snet_admin):
req = self.new_list_request('networks', as_admin=False,
tenant_id='noadmin')
res = self.deserialize(self.fmt, req.get_response(self.api))
net_ids = {net['id'] for net in res['networks']}
self.assertIn(net_ext['network']['id'], net_ids)
self.assertIn(net_shared['network']['id'], net_ids)
self.assertNotIn(net_admin['network']['id'], net_ids)
req = self.new_list_request('subnets', as_admin=False,
tenant_id='noadmin')
res = self.deserialize(self.fmt, req.get_response(self.api))
snet_ids = {snet['id'] for snet in res['subnets']}
self.assertIn(snet_ext['subnet']['id'], snet_ids)
self.assertIn(snet_shared['subnet']['id'], snet_ids)
self.assertNotIn(snet_admin['subnet']['id'], snet_ids)

View File

@ -16,7 +16,7 @@ Jinja2>=2.10 # BSD License (3 clause)
keystonemiddleware>=5.1.0 # Apache-2.0
netaddr>=0.7.18 # BSD
netifaces>=0.10.4 # MIT
neutron-lib>=3.13.0 # Apache-2.0
neutron-lib>=3.14.0 # Apache-2.0
python-neutronclient>=7.8.0 # Apache-2.0
tenacity>=6.0.0 # Apache-2.0
SQLAlchemy>=1.4.23 # MIT