6d8ada0ac9
By default, the ADMIN user in the new Secure RBAC policies should behave in the same way as in the legacy rules, so basically every API operation for any project should be allowed for the ADMIN user. In the new rules there are roles like PROJECT_MEMBER and PROJECT_READER, and those personas don't inherit directly from ADMIN, which means that if something is allowed for e.g. PROJECT_MEMBER, it isn't automatically also allowed for ADMIN, and we need to explicitly allow the ADMIN user to do such requests. It was done like that for many of the API calls already, but not for all of them (probably by mistake). This patch introduces a new composite check, ADMIN_OR_PROJECT_MEMBER, and uses it in the check strings where an ADMIN or PROJECT_MEMBER user is allowed to use the API. It also changes some of the check strings which used "policy_or" to combine ADMIN and PROJECT_MEMBER or PROJECT_READER, so that the composite checks ADMIN_OR_PROJECT_MEMBER and ADMIN_OR_PROJECT_READER are used everywhere. Closes-Bug: #1997089 Change-Id: Iab5cd6c7aa07ca8527c5fa8396c9ed0da65b4fa7
288 lines
9.4 KiB
Python
288 lines
9.4 KiB
Python
# Copyright (c) 2021 Red Hat Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
from oslo_policy import policy as base_policy
|
|
|
|
from neutron import policy
|
|
from neutron.tests.unit.conf.policies import test_base as base
|
|
|
|
|
|
class TrunkAPITestCase(base.PolicyBaseTestCase):
    """Shared fixture for the trunk policy tests.

    Prepares two policy targets: ``self.target`` carries
    ``self.project_id`` and ``self.alt_target`` carries
    ``self.alt_project_id``.
    """

    def setUp(self):
        super().setUp()
        self.target = dict(project_id=self.project_id)
        self.alt_target = dict(project_id=self.alt_project_id)
|
|
|
|
|
|
class SystemAdminTests(TrunkAPITestCase):
    """Trunk policy checks enforced with ``self.system_admin_ctx``.

    Every test expects ``policy.enforce`` to raise
    ``base_policy.InvalidScope`` for both ``self.target`` and
    ``self.alt_target``.
    """

    def setUp(self):
        super().setUp()
        self.context = self.system_admin_ctx

    def _assert_invalid_scope(self, action):
        """Expect ``InvalidScope`` for ``action`` on both targets."""
        # Own-project target first, then the alternative one.
        for target in (self.target, self.alt_target):
            self.assertRaises(
                base_policy.InvalidScope,
                policy.enforce,
                self.context, action, target)

    def test_create_trunk(self):
        self._assert_invalid_scope('create_trunk')

    def test_get_trunk(self):
        self._assert_invalid_scope('get_trunk')

    def test_update_trunk(self):
        self._assert_invalid_scope('update_trunk')

    def test_delete_trunk(self):
        self._assert_invalid_scope('delete_trunk')

    def test_get_subports(self):
        self._assert_invalid_scope('get_subports')

    def test_add_subports(self):
        self._assert_invalid_scope('add_subports')

    def test_remove_subports(self):
        self._assert_invalid_scope('remove_subports')
|
|
|
|
|
|
class SystemMemberTests(SystemAdminTests):
    """Re-run the inherited tests with ``self.system_member_ctx``."""

    def setUp(self):
        super().setUp()
        # Replace the context chosen by the parent fixture.
        self.context = self.system_member_ctx
|
|
|
|
|
|
class SystemReaderTests(SystemMemberTests):
    """Re-run the inherited tests with ``self.system_reader_ctx``."""

    def setUp(self):
        super().setUp()
        # Replace the context chosen by the parent fixture.
        self.context = self.system_reader_ctx
|
|
|
|
|
|
class AdminTests(TrunkAPITestCase):
    """Trunk policy checks enforced with ``self.project_admin_ctx``.

    Every test expects ``policy.enforce`` to return a truthy value for
    both ``self.target`` and ``self.alt_target``.
    """

    def setUp(self):
        super().setUp()
        self.context = self.project_admin_ctx

    def _assert_allowed_for_any_project(self, action):
        """Expect ``action`` to be permitted on both targets."""
        # Own-project target first, then the alternative one.
        for target in (self.target, self.alt_target):
            self.assertTrue(
                policy.enforce(self.context, action, target))

    def test_create_trunk(self):
        self._assert_allowed_for_any_project('create_trunk')

    def test_get_trunk(self):
        self._assert_allowed_for_any_project('get_trunk')

    def test_update_trunk(self):
        self._assert_allowed_for_any_project('update_trunk')

    def test_delete_trunk(self):
        self._assert_allowed_for_any_project('delete_trunk')

    def test_get_subports(self):
        self._assert_allowed_for_any_project('get_subports')

    def test_add_subports(self):
        self._assert_allowed_for_any_project('add_subports')

    def test_remove_subports(self):
        self._assert_allowed_for_any_project('remove_subports')
|
|
|
|
|
|
class ProjectMemberTests(AdminTests):
    """Trunk policy checks enforced with ``self.project_member_ctx``.

    Every test expects ``policy.enforce`` to succeed for ``self.target``
    and to raise ``base_policy.PolicyNotAuthorized`` for
    ``self.alt_target``.
    """

    def setUp(self):
        super().setUp()
        self.context = self.project_member_ctx

    def _assert_allowed_only_in_own_project(self, action):
        """Expect ``action`` allowed on target, denied on alt_target."""
        allowed = policy.enforce(self.context, action, self.target)
        self.assertTrue(allowed)
        self.assertRaises(
            base_policy.PolicyNotAuthorized,
            policy.enforce,
            self.context, action, self.alt_target)

    def test_create_trunk(self):
        self._assert_allowed_only_in_own_project('create_trunk')

    def test_get_trunk(self):
        self._assert_allowed_only_in_own_project('get_trunk')

    def test_update_trunk(self):
        self._assert_allowed_only_in_own_project('update_trunk')

    def test_delete_trunk(self):
        self._assert_allowed_only_in_own_project('delete_trunk')

    def test_get_subports(self):
        self._assert_allowed_only_in_own_project('get_subports')

    def test_add_subports(self):
        self._assert_allowed_only_in_own_project('add_subports')

    def test_remove_subports(self):
        self._assert_allowed_only_in_own_project('remove_subports')
|
|
|
|
|
|
class ProjectReaderTests(ProjectMemberTests):
    """Trunk policy checks enforced with ``self.project_reader_ctx``.

    The tests overridden here expect ``policy.enforce`` to raise
    ``base_policy.PolicyNotAuthorized`` for both ``self.target`` and
    ``self.alt_target``. ``test_get_trunk`` and ``test_get_subports``
    are not overridden, so the parent class versions run unchanged.
    """

    def setUp(self):
        super().setUp()
        self.context = self.project_reader_ctx

    def _assert_denied_for_any_project(self, action):
        """Expect ``PolicyNotAuthorized`` for ``action`` on both targets."""
        # Own-project target first, then the alternative one.
        for target in (self.target, self.alt_target):
            self.assertRaises(
                base_policy.PolicyNotAuthorized,
                policy.enforce,
                self.context, action, target)

    def test_create_trunk(self):
        self._assert_denied_for_any_project('create_trunk')

    def test_update_trunk(self):
        self._assert_denied_for_any_project('update_trunk')

    def test_delete_trunk(self):
        self._assert_denied_for_any_project('delete_trunk')

    def test_add_subports(self):
        self._assert_denied_for_any_project('add_subports')

    def test_remove_subports(self):
        self._assert_denied_for_any_project('remove_subports')
|