Update model_query_scope_is_project to work properly with new policies

In case when enforce_new_defaults is set to True in config, Neutron
shouldn't check context.is_admin flag to check if query should or
shouldn't be scoped to just one tenant as user with PROJECT_ADMIN role
will have is_admin flag set to True but such user shouldn't get e.g.
networks which belongs to other tenants and aren't shared in any way.

So that patch improves neutron_lib.db.utils.model_query_scope_is_project
function so it works in different way in case when enforce_new_defaults
is set to True or False. In case when new defaults should be enforced
only SYSTEM_SCOPE tokens and advanced_services can make queries which
are not scoped to single tenant.

Closes-Bug: #1919386
Change-Id: I7760413a562e5d1a86709fc5e2d0df405a6aed30
This commit is contained in:
Slawek Kaplonski 2021-03-17 11:37:31 +01:00
parent 90ac6aeb58
commit d73449dd1c
2 changed files with 122 additions and 24 deletions

View File

@ -12,6 +12,7 @@
import functools
from oslo_config import cfg
from oslo_db import exception as db_exc
from oslo_utils import excutils
import sqlalchemy
@ -166,12 +167,25 @@ def model_query_scope_is_project(context, model):
:returns: True if the context is not admin and not advsvc and the model
has a project_id. False otherwise.
"""
# Unless a context is a system_scope token or
# context has 'admin' or 'advanced-service' rights the
# query will be scoped to a single project_id
return (context.system_scope != 'all' and
(not context.is_admin and hasattr(model, 'project_id')) and
(not context.is_advsvc and hasattr(model, 'project_id')))
if not hasattr(model, 'project_id'):
# If model don't have project_id, there is no need to scope query to
# just one project
return False
if context.is_advsvc:
# For context which has 'advanced-service' rights the
# query will not be scoped to a single project_id
return False
# TODO(slaweq): Remove that old is_admin check and always check scopes
# when old, deprecated rules will be removed and only rules with new
# personas will be supported
if cfg.CONF.oslo_policy.enforce_new_defaults:
# Unless a context is a system_scope token, query should be scoped to a
# single project_id
return context.system_scope != 'all'
else:
# Unless context has 'admin' rights the
# query will be scoped to a single project_id
return not context.is_admin
def model_query(context, model):

View File

@ -12,6 +12,7 @@
from unittest import mock
from oslo_config import cfg
from oslo_db.sqlalchemy import models
import sqlalchemy as sa
from sqlalchemy.ext import declarative
@ -89,8 +90,13 @@ class TestUtils(base.BaseTestCase):
utils.resource_fields(r, ['name'])
mock_populate.assert_called_once_with({'name': 'n'})
def test_model_query_scope_is_project_admin(self):
ctx = context.Context(project_id='some project', is_admin=True)
def test_model_query_scope_is_project_admin_old_defaults(self):
cfg.CONF.set_override(
'enforce_new_defaults', False, group='oslo_policy')
ctx = context.Context(
project_id='some project',
is_admin=True,
is_advsvc=False)
model = mock.Mock(project_id='project')
self.assertFalse(
@ -101,8 +107,13 @@ class TestUtils(base.BaseTestCase):
self.assertFalse(
utils.model_query_scope_is_project(ctx, model))
def test_model_query_scope_is_project_advsvc(self):
ctx = context.Context(project_id='some project', is_advsvc=True)
def test_model_query_scope_is_project_advsvc_old_defaults(self):
cfg.CONF.set_override(
'enforce_new_defaults', False, group='oslo_policy')
ctx = context.Context(
project_id='some project',
is_admin=False,
is_advsvc=True)
model = mock.Mock(project_id='project')
self.assertFalse(
@ -113,20 +124,13 @@ class TestUtils(base.BaseTestCase):
self.assertFalse(
utils.model_query_scope_is_project(ctx, model))
def test_model_query_scope_is_project_system_scope(self):
ctx = context.Context(system_scope='all')
model = mock.Mock(project_id='project')
self.assertFalse(
utils.model_query_scope_is_project(ctx, model))
# Ensure that project_id isn't mocked
del model.project_id
self.assertFalse(
utils.model_query_scope_is_project(ctx, model))
def test_model_query_scope_is_project_regular_user(self):
ctx = context.Context(project_id='some project')
def test_model_query_scope_is_project_regular_user_old_defaults(self):
cfg.CONF.set_override(
'enforce_new_defaults', False, group='oslo_policy')
ctx = context.Context(
project_id='some project',
is_admin=False,
is_advsvc=False)
model = mock.Mock(project_id='project')
self.assertTrue(
@ -136,3 +140,83 @@ class TestUtils(base.BaseTestCase):
del model.project_id
self.assertFalse(
utils.model_query_scope_is_project(ctx, model))
def test_model_query_scope_is_project_system_scope_old_defaults(self):
cfg.CONF.set_override(
'enforce_new_defaults', False, group='oslo_policy')
ctx = context.Context(system_scope='all')
model = mock.Mock(project_id='project')
self.assertTrue(
utils.model_query_scope_is_project(ctx, model))
# Ensure that project_id isn't mocked
del model.project_id
self.assertFalse(
utils.model_query_scope_is_project(ctx, model))
def test_model_query_scope_is_project_admin_new_defaults(self):
cfg.CONF.set_override(
'enforce_new_defaults', True, group='oslo_policy')
ctx = context.Context(
project_id='some project',
is_admin=True,
is_advsvc=False)
model = mock.Mock(project_id='project')
self.assertTrue(
utils.model_query_scope_is_project(ctx, model))
# Ensure that project_id isn't mocked
del model.project_id
self.assertFalse(
utils.model_query_scope_is_project(ctx, model))
def test_model_query_scope_is_project_advsvc_new_defaults(self):
cfg.CONF.set_override(
'enforce_new_defaults', True, group='oslo_policy')
ctx = context.Context(
project_id='some project',
is_admin=False,
is_advsvc=True)
model = mock.Mock(project_id='project')
self.assertFalse(
utils.model_query_scope_is_project(ctx, model))
# Ensure that project_id isn't mocked
del model.project_id
self.assertFalse(
utils.model_query_scope_is_project(ctx, model))
def test_model_query_scope_is_project_regular_user_new_defaults(self):
cfg.CONF.set_override(
'enforce_new_defaults', True, group='oslo_policy')
ctx = context.Context(
project_id='some project',
is_admin=False,
is_advsvc=False)
model = mock.Mock(project_id='project')
self.assertTrue(
utils.model_query_scope_is_project(ctx, model))
# Ensure that project_id isn't mocked
del model.project_id
self.assertFalse(
utils.model_query_scope_is_project(ctx, model))
def test_model_query_scope_is_project_system_scope_new_defaults(self):
cfg.CONF.set_override(
'enforce_new_defaults', True, group='oslo_policy')
ctx = context.Context(
system_scope='all')
model = mock.Mock(project_id='project')
self.assertFalse(
utils.model_query_scope_is_project(ctx, model))
# Ensure that project_id isn't mocked
del model.project_id
self.assertFalse(
utils.model_query_scope_is_project(ctx, model))