rbacs: clean-up to use defined constants ACCESS_*

Some files are using strings access_as_shared or access_as_external
instead of using defined constants ACCESS_SHARED and ACCESS_EXTERNAL.

This commit is doing the cleaning it does not bring any functional
change.

Signed-off-by: Sahid Orentino Ferdjaoui <sahid.ferdjaoui@industrialdiscipline.com>
Change-Id: Ib75326c762776c5259740cb2f0abc1163842f95d
This commit is contained in:
Sahid Orentino Ferdjaoui 2023-05-05 15:59:13 +02:00
parent 232a67f444
commit 256297fc7f
7 changed files with 44 additions and 31 deletions

View File

@ -34,6 +34,7 @@ from oslo_log import log as logging
from sqlalchemy.orm import exc
from neutron.db import models_v2
from neutron.db import rbac_db_models
from neutron.objects import base as base_obj
from neutron.objects import ports as port_obj
from neutron.objects import subnet as subnet_obj
@ -347,7 +348,7 @@ class DbBasePluginCommon(object):
# is shared to the calling tenant via an RBAC entry.
matches = ('*',) + ((context.tenant_id,) if context else ())
for entry in rbac_entries:
if (entry.action == 'access_as_shared' and
if (entry.action == rbac_db_models.ACCESS_SHARED and
entry.target_project in matches):
return True
return False

View File

@ -57,6 +57,7 @@ from neutron.db import db_base_plugin_common
from neutron.db import ipam_pluggable_backend
from neutron.db import models_v2
from neutron.db import rbac_db_mixin as rbac_mixin
from neutron.db import rbac_db_models
from neutron.db import standardattrdescription_db as stattr_db
from neutron.exceptions import mtu as mtu_exc
from neutron.extensions import subnetpool_prefix_ops
@ -204,7 +205,8 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
policy = (payload.request_body if event == events.BEFORE_CREATE
else payload.latest_state)
if object_type != 'network' or policy['action'] != 'access_as_shared':
if (object_type != 'network' or
policy['action'] != rbac_db_models.ACCESS_SHARED):
# we only care about shared network policies
return
# The object a policy targets cannot be changed so we can look
@ -247,7 +249,8 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# any port with another RBAC entry covering it or one belonging
# to the same tenant as the network owner is ok
other_rbac_objs = network_obj.NetworkRBAC.get_objects(
elevated, object_id=network_id, action='access_as_shared')
elevated, object_id=network_id,
action=rbac_db_models.ACCESS_SHARED)
allowed_tenants = [rbac['target_project'] for rbac
in other_rbac_objs
if rbac.target_project != tenant_id]
@ -259,7 +262,8 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
# allows any ports
if network_obj.NetworkRBAC.get_object(
elevated, object_id=network_id,
action='access_as_shared', target_project='*'):
action=rbac_db_models.ACCESS_SHARED,
target_project='*'):
return
ports = ports.filter(models_v2.Port.project_id == tenant_id)
if ports.count():
@ -305,7 +309,8 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
def _validate_projects_have_access_to_network(self, network, project_ids):
ctx_admin = ctx.get_admin_context()
other_rbac_objs = network_obj.NetworkRBAC.get_objects(
ctx_admin, object_id=network.id, action='access_as_shared')
ctx_admin, object_id=network.id,
action=rbac_db_models.ACCESS_SHARED)
allowed_projects = {rbac['target_project'] for rbac in other_rbac_objs
if rbac.target_project != '*'}
allowed_projects.add(network.project_id)
@ -419,7 +424,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if n['shared']:
np_rbac_args = {'project_id': network.project_id,
'object_id': network.id,
'action': 'access_as_shared',
'action': rbac_db_models.ACCESS_SHARED,
'target_project': '*'}
np_rbac_obj = network_obj.NetworkRBAC(context, **np_rbac_args)
np_rbac_obj.create()
@ -437,7 +442,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if 'shared' in n:
entry = None
for item in network.rbac_entries:
if (item.action == 'access_as_shared' and
if (item.action == rbac_db_models.ACCESS_SHARED and
item.target_project == '*'):
entry = item
break
@ -447,7 +452,7 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
if update_shared and not entry:
np_rbac_args = {'project_id': network.project_id,
'object_id': network.id,
'action': 'access_as_shared',
'action': rbac_db_models.ACCESS_SHARED,
'target_project': '*'}
np_rbac_obj = network_obj.NetworkRBAC(context,
**np_rbac_args)
@ -455,7 +460,8 @@ class NeutronDbPluginV2(db_base_plugin_common.DbBasePluginCommon,
elif not update_shared and entry:
network_obj.NetworkRBAC.delete_objects(
context, object_id=network.id,
action='access_as_shared', target_project='*')
action=rbac_db_models.ACCESS_SHARED,
target_project='*')
# TODO(ihrachys) Below can be removed when we make sqlalchemy
# event listeners in neutron_lib/db/api.py to refresh expired

View File

@ -31,6 +31,7 @@ from sqlalchemy.sql import expression as expr
from neutron._i18n import _
from neutron.db import models_v2
from neutron.db import rbac_db_models
from neutron.extensions import rbac as rbac_ext
from neutron.objects import network as net_obj
from neutron.objects import ports as port_obj
@ -47,7 +48,7 @@ def _network_filter_hook(context, original_model, conditions):
# shared check so we don't need to worry about ensuring that
rbac_model = original_model.rbac_entries.property.mapper.class_
tenant_allowed = (
(rbac_model.action == 'access_as_external') &
(rbac_model.action == rbac_db_models.ACCESS_EXTERNAL) &
(rbac_model.target_project == context.tenant_id) |
(rbac_model.target_project == '*'))
conditions = expr.or_(tenant_allowed, *conditions)
@ -102,7 +103,7 @@ class External_net_db_mixin(object):
context, network_id=net_data['id']).create()
net_rbac_args = {'project_id': net_data['tenant_id'],
'object_id': net_data['id'],
'action': 'access_as_external',
'action': rbac_db_models.ACCESS_EXTERNAL,
'target_project': '*'}
net_obj.NetworkRBAC(context, **net_rbac_args).create()
net_data[extnet_apidef.EXTERNAL] = external
@ -123,7 +124,7 @@ class External_net_db_mixin(object):
if allow_all:
net_rbac_args = {'project_id': net_data['tenant_id'],
'object_id': net_id,
'action': 'access_as_external',
'action': rbac_db_models.ACCESS_EXTERNAL,
'target_project': '*'}
net_obj.NetworkRBAC(context, **net_rbac_args).create()
else:
@ -138,7 +139,8 @@ class External_net_db_mixin(object):
net_obj.ExternalNetwork.delete_objects(
context, network_id=net_id)
net_obj.NetworkRBAC.delete_objects(
context, object_id=net_id, action='access_as_external')
context, object_id=net_id,
action=rbac_db_models.ACCESS_EXTERNAL)
net_data[extnet_apidef.EXTERNAL] = False
def _process_l3_delete(self, context, network_id):
@ -154,7 +156,7 @@ class External_net_db_mixin(object):
context = payload.context
if (object_type != 'network' or
policy['action'] != 'access_as_external'):
policy['action'] != rbac_db_models.ACCESS_EXTERNAL):
return
net = self.get_network(context, policy['object_id'])
if not context.is_admin and net['tenant_id'] != context.tenant_id:
@ -175,12 +177,12 @@ class External_net_db_mixin(object):
context = payload.context
if (object_type != 'network' or
policy['action'] != 'access_as_external'):
policy['action'] != rbac_db_models.ACCESS_EXTERNAL):
return
# If the network still have rbac policies, we should not
# update external attribute.
if net_obj.NetworkRBAC.count(context, object_id=policy['object_id'],
action='access_as_external'):
action=rbac_db_models.ACCESS_EXTERNAL):
return
net = self.get_network(context, policy['object_id'])
self._process_l3_update(context, net,
@ -195,7 +197,7 @@ class External_net_db_mixin(object):
context = payload.context
if (object_type != 'network' or
policy['action'] != 'access_as_external'):
policy['action'] != rbac_db_models.ACCESS_EXTERNAL):
return
new_project = None
if event == events.BEFORE_UPDATE:
@ -215,7 +217,7 @@ class External_net_db_mixin(object):
# router lookup because they will have access either way
if net_obj.NetworkRBAC.count(
context, object_id=policy['object_id'],
action='access_as_external', target_project='*'):
action=rbac_db_models.ACCESS_EXTERNAL, target_project='*'):
return
router_exist = l3_obj.Router.objects_exist(context, **filters)
else:
@ -230,7 +232,7 @@ class External_net_db_mixin(object):
details=msg)
projects = net_obj.NetworkRBAC.get_projects(
context, object_id=policy['object_id'],
action='access_as_external')
action=rbac_db_models.ACCESS_EXTERNAL)
projects_with_entries = [project for project in projects
if project != '*']
if new_project:

View File

@ -53,6 +53,7 @@ from neutron.db import l3_attrs_db
from neutron.db.models import l3 as l3_models
from neutron.db.models import l3_attrs as l3_attrs_models
from neutron.db import models_v2
from neutron.db import rbac_db_models
from neutron.db import standardattrdescription_db as st_attr
from neutron.extensions import l3
from neutron.extensions import segment as segment_ext
@ -884,7 +885,7 @@ class L3_NAT_dbonly_mixin(l3.RouterPluginBase,
with db_api.CONTEXT_READER.using(elevated):
rbac_allowed_projects = network_obj.NetworkRBAC.get_projects(
elevated, object_id=subnet['network_id'],
action='access_as_shared',
action=rbac_db_models.ACCESS_SHARED,
target_project=context.project_id)
# Fail if the current project_id is NOT in the allowed

View File

@ -20,6 +20,7 @@ from neutron_lib import exceptions as n_exc
from oslo_config import cfg
from oslo_utils import uuidutils
from neutron.db import rbac_db_models
from neutron.objects import network as network_obj
from neutron.plugins.ml2 import plugin as ml2_plugin
from neutron import quota
@ -96,9 +97,9 @@ class NetworkRBACTestCase(testlib_api.SqlTestCase):
def _check_rbac(self, network_id, is_none, external):
if external:
action = 'access_as_external'
action = rbac_db_models.ACCESS_EXTERNAL
else:
action = 'access_as_shared'
action = rbac_db_models.ACCESS_SHARED
rbac = network_obj.NetworkRBAC.get_object(
self.ctx, object_id=network_id, action=action, target_project='*')
if is_none:

View File

@ -61,6 +61,7 @@ from neutron.db import ipam_backend_mixin
from neutron.db.models import l3 as l3_models
from neutron.db.models import securitygroup as sg_models
from neutron.db import models_v2
from neutron.db import rbac_db_models
from neutron.exceptions import mtu as mtu_exc
from neutron.ipam.drivers.neutrondb_ipam import driver as ipam_driver
from neutron.ipam import exceptions as ipam_exc
@ -2898,12 +2899,12 @@ class TestNetworksV2(NeutronDbPluginV2TestCase):
with db_api.CONTEXT_WRITER.using(ctx):
network_obj.NetworkRBAC(
ctx, object_id=network['network']['id'],
action='access_as_shared',
action=rbac_db_models.ACCESS_SHARED,
project_id=network['network']['tenant_id'],
target_project='somebody_else').create()
network_obj.NetworkRBAC(
ctx, object_id=network['network']['id'],
action='access_as_shared',
action=rbac_db_models.ACCESS_SHARED,
project_id=network['network']['tenant_id'],
target_project='one_more_somebody_else').create()
res1 = self._create_port(self.fmt,
@ -6700,7 +6701,7 @@ class DbModelMixin(object):
network_obj.NetworkRBAC(
ctx, object_id=network.id,
action='access_as_shared',
action=rbac_db_models.ACCESS_SHARED,
project_id=network.project_id,
target_project='*').create()
net2 = models_v2.Network(name="net_net2", status="OK",

View File

@ -34,7 +34,8 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase):
self.context = context.get_admin_context()
super(NetworkRbacTestcase, self).setUp(plugin='ml2')
def _make_networkrbac(self, network, target, action='access_as_shared'):
def _make_networkrbac(self, network, target,
action=rbac_db_models.ACCESS_SHARED):
policy = {
'rbac_policy': {'project_id': network['network']['project_id'],
'object_id': network['network']['id'],
@ -71,7 +72,7 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase):
self._assert_external_net_state(net_id, is_external=False)
policy = self._make_networkrbac(ext_net,
'*',
'access_as_external')
rbac_db_models.ACCESS_EXTERNAL)
self.plugin.create_rbac_policy(self.context, policy)
self._assert_external_net_state(net_id, is_external=True)
@ -96,7 +97,7 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase):
with self.network() as ext_net:
policy = self._make_networkrbac(ext_net,
orig_target,
'access_as_external')
rbac_db_models.ACCESS_EXTERNAL)
netrbac = self.plugin.create_rbac_policy(self.context, policy)
update_policy = {'rbac_policy': {'target_project': new_target}}
@ -114,7 +115,7 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase):
self._assert_external_net_state(net_id, is_external=False)
policy = self._make_networkrbac(ext_net,
'*',
'access_as_external')
rbac_db_models.ACCESS_EXTERNAL)
net_rbac = self.plugin.create_rbac_policy(self.context, policy)
self._assert_external_net_state(net_id, is_external=True)
self.plugin.delete_rbac_policy(self.context, net_rbac['id'])
@ -126,12 +127,12 @@ class NetworkRbacTestcase(test_plugin.NeutronDbPluginV2TestCase):
self._assert_external_net_state(net_id, is_external=False)
policy1 = self._make_networkrbac(ext_net,
'test-tenant-1',
'access_as_external')
rbac_db_models.ACCESS_EXTERNAL)
net_rbac1 = self.plugin.create_rbac_policy(self.context, policy1)
self._assert_external_net_state(net_id, is_external=True)
policy2 = self._make_networkrbac(ext_net,
'test-tenant-2',
'access_as_external')
rbac_db_models.ACCESS_EXTERNAL)
self.plugin.create_rbac_policy(self.context, policy2)
self._assert_external_net_state(net_id, is_external=True)
self.plugin.delete_rbac_policy(self.context, net_rbac1['id'])