Add "can_set_project_id" attribute to the context object

In case when API policies with custom roles has to be defined by the
operator and such custom role should have granted persmission to
send project_id, other than own project_id in the request body, like for
example "network_admin" role who should be able to create networks on
behalf of every project in the cloud, it was not possible to achieve so far.

The problem was that for all non-admin and not service users, function
``neutron_lib.api.attributes._validate_privileges`` had hardcoded that
sending project_id in the request body is only allowed for admin and
service user (advsvc).

This patch introduces new API policy rule called
`context_can_set_project_id` and attribute `can_set_project_id` to the
neutron_lib.context.ContextBase class.
By default `context_can_set_project_id` rule is granted to nobody but it
can be defined in the neutron policy file like e.g.:

    "context_can_set_project_id": "role:network_admin"

This doesn't mean that anyone with such role will be able to create
anything for any project because there is still policy engine with
defined API policies which prevents that.
So to e.g. grant such network_admin user permission to create networks
for every project, additional rule would be needed in policy file and it
can looks like:

"create_network": "(rule:admin_only) or
                   (role:member and project_id:%(project_id)s) or
                   role:network_admin"

Closes-Bug: #2133212

Change-Id: I45fd5d227fb6d6bf31e239e9d36f7b39f9b1257e
Signed-off-by: Slawek Kaplonski <skaplons@redhat.com>
This commit is contained in:
Slawek Kaplonski
2025-12-03 16:21:49 +01:00
parent cdc293be4c
commit dde9ccfee0
5 changed files with 72 additions and 4 deletions

View File

@@ -25,10 +25,10 @@ from neutron_lib import exceptions
def _validate_privileges(context, res_dict):
if ('project_id' in res_dict and
res_dict['project_id'] != context.project_id and
not (context.is_admin or context.is_service_role)):
not (context.can_set_project_id or context.is_service_role)):
msg = _("Specifying 'project_id' or 'tenant_id' other than the "
"authenticated project in request requires admin or advsvc "
"privileges")
"authenticated project in request is not allowed for "
"current user.")
raise exc.HTTPBadRequest(msg)

View File

@@ -36,7 +36,7 @@ class ContextBase(oslo_context.RequestContext):
def __init__(self, user_id=None, project_id=None, is_admin=None,
timestamp=None, project_name=None, user_name=None,
is_advsvc=None, tenant_id=None, tenant_name=None,
has_global_access=False, **kwargs):
has_global_access=False, can_set_project_id=False, **kwargs):
# NOTE(jamielennox): We maintain this argument order for tests that
# pass arguments positionally.
@@ -55,6 +55,7 @@ class ContextBase(oslo_context.RequestContext):
kwargs.setdefault('project_id', project_id)
kwargs.setdefault('project_name', project_name)
self._has_global_access = has_global_access
self._can_set_project_id = can_set_project_id
super().__init__(
is_admin=is_admin, user_id=user_id, **kwargs)
@@ -70,6 +71,9 @@ class ContextBase(oslo_context.RequestContext):
if not self._has_global_access:
self._has_global_access = policy_engine.check_has_global_access(
self)
if not self._can_set_project_id:
self._can_set_project_id = policy_engine.check_can_set_project_id(
self)
@property
def tenant_id(self):
@@ -108,6 +112,14 @@ class ContextBase(oslo_context.RequestContext):
def has_global_access(self):
return self.is_admin or self._has_global_access
@property
def can_set_project_id(self):
# TODO(slaweq): add also self.is_service_role to the condition
# once is_advsvc attribute will be deprecated and is_service_role
# will be possible to set through the argument to the __init__
# method
return self.is_admin or self._can_set_project_id
def to_dict(self):
context = super().to_dict()
context.update({
@@ -119,6 +131,7 @@ class ContextBase(oslo_context.RequestContext):
'project_name': self.project_name,
'user_name': self.user_name,
'has_global_access': self.has_global_access,
'can_set_project_id': self.can_set_project_id,
})
return context
@@ -127,6 +140,7 @@ class ContextBase(oslo_context.RequestContext):
values['tenant_id'] = self.project_id
values['is_admin'] = self.is_admin
values['has_global_access'] = self.has_global_access
values['can_set_project_id'] = self.can_set_project_id
# NOTE(jamielennox): These are almost certainly unused and non-standard
# but kept for backwards compatibility. Remove them in Pike

View File

@@ -22,6 +22,7 @@ _ADMIN_CTX_POLICY = 'context_is_admin'
_GLOBAL_CTX_POLICY = 'context_with_global_access'
_ADVSVC_CTX_POLICY = 'context_is_advsvc'
_SERVICE_ROLE = 'service_api'
_CAN_SET_PROJECT_ID_CTX_POLICY = 'context_can_set_project_id'
opts.set_defaults(cfg.CONF)
@@ -42,6 +43,20 @@ _BASE_RULES = [
_GLOBAL_CTX_POLICY,
'!',
description='Rule for context with global access to the resources'),
policy.RuleDefault(
# By default, no one except admin can send project id in the request
# body.
# That is special meaning of the "!" in rule, see
# https://docs.openstack.org/oslo.policy/latest/admin/policy-yaml-file.html#examples.
# This policy rule should be overridden by the cloud administrator if
# there is need to have any custom role with privileges to send
# project_id in the request body of the POST requests to create
# resources for other projects.
_CAN_SET_PROJECT_ID_CTX_POLICY,
'!',
description='Rule for context with privileges to send project_id in '
'the request body of the requests to create resources '
'for other projects'),
policy.RuleDefault(
_ADVSVC_CTX_POLICY,
'role:advsvc',
@@ -105,6 +120,16 @@ def check_has_global_access(context):
return _check_rule(context, _GLOBAL_CTX_POLICY)
def check_can_set_project_id(context):
"""Verify context has rights to send project_id in the request body
:param context: The context object.
:returns: True if the context has rights to send project_id in
the request body (as per the global enforcer) and False otherwise.
"""
return _check_rule(context, _CAN_SET_PROJECT_ID_CTX_POLICY)
def check_is_advsvc(context):
"""Verify context has advsvc rights according to global policy settings.

View File

@@ -117,12 +117,14 @@ class TestNeutronContext(_base.BaseTestCase):
self.assertFalse(ctx.is_admin)
self.assertTrue(ctx.is_advsvc)
self.assertFalse(ctx.has_global_access)
self.assertFalse(ctx.can_set_project_id)
def test_neutron_context_create_is_service_role(self):
ctx = context.Context('user_id', 'project_id', roles=['service'])
self.assertFalse(ctx.is_admin)
self.assertTrue(ctx.is_service_role)
self.assertFalse(ctx.has_global_access)
self.assertFalse(ctx.has_global_access)
def test_neutron_context_create_with_auth_token(self):
ctx = context.Context('user_id', 'project_id',
@@ -226,6 +228,7 @@ class TestNeutronContext(_base.BaseTestCase):
self.assertIsNone(ctx_dict['auth_token'])
self.assertTrue(ctx_dict['is_admin'])
self.assertTrue(ctx_dict['has_global_access'])
self.assertTrue(ctx_dict['can_set_project_id'])
self.assertIn('admin', ctx_dict['roles'])
self.assertIsNotNone(ctx.session)
self.assertNotIn('session', ctx_dict)
@@ -274,6 +277,7 @@ class TestNeutronContext(_base.BaseTestCase):
elevated_ctx = ctx.elevated()
self.assertTrue(elevated_ctx.is_admin)
self.assertTrue(elevated_ctx.has_global_access)
self.assertTrue(elevated_ctx.can_set_project_id)
for expected_role in expected_roles:
self.assertIn(expected_role, elevated_ctx.roles)
# make sure we do not set the system scope in context
@@ -285,6 +289,7 @@ class TestNeutronContext(_base.BaseTestCase):
ctx = context.Context('user_id', 'project_id', roles=custom_roles)
self.assertFalse(ctx.is_admin)
self.assertFalse(ctx.has_global_access)
self.assertFalse(ctx.can_set_project_id)
self.assertNotEqual('all', ctx.system_scope)
for expected_admin_role in expected_admin_roles:
self.assertNotIn(expected_admin_role, ctx.roles)
@@ -294,6 +299,7 @@ class TestNeutronContext(_base.BaseTestCase):
elevated_ctx = ctx.elevated()
self.assertTrue(elevated_ctx.is_admin)
self.assertTrue(elevated_ctx.has_global_access)
self.assertTrue(elevated_ctx.can_set_project_id)
for expected_admin_role in expected_admin_roles:
self.assertIn(expected_admin_role, elevated_ctx.roles)
for custom_role in custom_roles:
@@ -336,6 +342,17 @@ class TestNeutronContext(_base.BaseTestCase):
ctx = context.Context('user_id', 'project_id')
self.assertTrue(ctx.has_global_access)
def test_neutron_context_can_set_project_id(self):
with mock.patch('neutron_lib.policy._engine.check_can_set_project_id',
return_value=False):
ctx = context.Context('user_id', 'project_id')
self.assertFalse(ctx.can_set_project_id)
with mock.patch('neutron_lib.policy._engine.check_can_set_project_id',
return_value=True):
ctx = context.Context('user_id', 'project_id')
self.assertTrue(ctx.can_set_project_id)
def test_to_policy_values(self):
values = {
'user_id': 'user_id',

View File

@@ -0,0 +1,12 @@
---
features:
- |
New attribute ``can_set_project_id`` is added to the context object. Value of
this attribute is set based on the API policy rule
``context_can_set_project_id`` and should be used in case when there are
custom roles with granted permission to send project_id in the request body of the
POST requests to create resources for other projects.
For example, ``network_admin`` role which should have persmission to create
networks for all projects in the cloud.
By default ``context.can_set_project_id`` is granted to no one and only admin and
service users can send project_id in the request body.