From 6e94491c2c7bf69f9eff3b17e80f596564c5bc65 Mon Sep 17 00:00:00 2001 From: Miro Tomaska Date: Wed, 30 Apr 2025 01:49:39 +0000 Subject: [PATCH] [S-RBAC] Default RBAC policies Update tap_flow and tap_service API to new s-rbac defaults and added unit tests. Added unit tests for the existing tap_mirror API for completeness Change-Id: I9cb1ab098c6a25fc1e1991790a095e9366bb71c5 --- etc/neutron/policy.yaml.sample | 72 ++++- neutron_taas/policies/tap_flow.py | 81 ++++-- neutron_taas/policies/tap_service.py | 81 ++++-- neutron_taas/tests/unit/policies/__init__.py | 0 .../tests/unit/policies/test_tap_flow.py | 261 ++++++++++++++++++ .../tests/unit/policies/test_tap_mirror.py | 260 +++++++++++++++++ .../tests/unit/policies/test_tap_service.py | 261 ++++++++++++++++++ 7 files changed, 960 insertions(+), 56 deletions(-) create mode 100644 neutron_taas/tests/unit/policies/__init__.py create mode 100644 neutron_taas/tests/unit/policies/test_tap_flow.py create mode 100644 neutron_taas/tests/unit/policies/test_tap_mirror.py create mode 100644 neutron_taas/tests/unit/policies/test_tap_service.py diff --git a/etc/neutron/policy.yaml.sample b/etc/neutron/policy.yaml.sample index 00b78898..85274d84 100644 --- a/etc/neutron/policy.yaml.sample +++ b/etc/neutron/policy.yaml.sample @@ -1,34 +1,90 @@ # Create a tap flow # POST /taas/tap_flows -#"create_tap_flow": "rule:admin_or_owner" +# Intended scope(s): project +#"create_tap_flow": "(rule:admin_only) or (role:member and project_id:%(project_id)s)" + +# DEPRECATED +# "create_tap_flow":"rule:admin_or_owner" has been deprecated since +# 2025.2 in favor of "create_tap_flow":"(rule:admin_only) or +# (role:member and project_id:%(project_id)s)". +# The neutron TAAS API now supports Secure RBAC default roles. # Update a tap flow # PUT /taas/tap_flows/{id} -#"update_tap_flow": "rule:admin_or_owner" +# Intended scope(s): project +#"update_tap_flow": "(rule:admin_only) or (role:member and project_id:%(project_id)s)" + +# DEPRECATED +# "update_tap_flow":"rule:admin_or_owner" has been deprecated since +# 2025.2 in favor of "update_tap_flow":"(rule:admin_only) or +# (role:member and project_id:%(project_id)s)". +# The neutron TAAS API now supports Secure RBAC default roles. # Show a tap flow # GET /taas/tap_flows/{id} -#"get_tap_flow": "rule:admin_or_owner" +# Intended scope(s): project +#"get_tap_flow": "(rule:admin_only) or (role:member and project_id:%(project_id)s)" + +# DEPRECATED +# "get_tap_flow":"rule:admin_or_owner" has been deprecated since +# 2025.2 in favor of "get_tap_flow":"(rule:admin_only) or (role:member +# and project_id:%(project_id)s)". +# The neutron TAAS API now supports Secure RBAC default roles. # Delete a tap flow # DELETE /taas/tap_flows/{id} -#"delete_tap_flow": "rule:admin_or_owner" +# Intended scope(s): project +#"delete_tap_flow": "(rule:admin_only) or (role:member and project_id:%(project_id)s)" + +# DEPRECATED +# "delete_tap_flow":"rule:admin_or_owner" has been deprecated since +# 2025.2 in favor of "delete_tap_flow":"(rule:admin_only) or +# (role:member and project_id:%(project_id)s)". +# The neutron TAAS API now supports Secure RBAC default roles. # Create a tap service # POST /taas/tap_services -#"create_tap_service": "rule:admin_or_owner" +# Intended scope(s): project +#"create_tap_service": "(rule:admin_only) or (role:member and project_id:%(project_id)s)" + +# DEPRECATED +# "create_tap_service":"rule:admin_or_owner" has been deprecated since +# 2025.2 in favor of "create_tap_service":"(rule:admin_only) or +# (role:member and project_id:%(project_id)s)". +# The neutron TAAS API now supports Secure RBAC default roles. # Updates a tap service # PUT /taas/tap_services/{id} -#"update_tap_service": "rule:admin_or_owner" +# Intended scope(s): project +#"update_tap_service": "(rule:admin_only) or (role:member and project_id:%(project_id)s)" + +# DEPRECATED +# "update_tap_service":"rule:admin_or_owner" has been deprecated since +# 2025.2 in favor of "update_tap_service":"(rule:admin_only) or +# (role:member and project_id:%(project_id)s)". +# The neutron TAAS API now supports Secure RBAC default roles. # Show a tap service # GET /taas/tap_services/{id} -#"get_tap_service": "rule:admin_or_owner" +# Intended scope(s): project +#"get_tap_service": "(rule:admin_only) or (role:member and project_id:%(project_id)s)" + +# DEPRECATED +# "get_tap_service":"rule:admin_or_owner" has been deprecated since +# 2025.2 in favor of "get_tap_service":"(rule:admin_only) or +# (role:member and project_id:%(project_id)s)". +# The neutron TAAS API now supports Secure RBAC default roles. # Delete a tap service # DELETE /taas/tap_services/{id} -#"delete_tap_service": "rule:admin_or_owner" +# Intended scope(s): project +#"delete_tap_service": "(rule:admin_only) or (role:member and project_id:%(project_id)s)" + +# DEPRECATED +# "delete_tap_service":"rule:admin_or_owner" has been deprecated since +# 2025.2 in favor of "delete_tap_service":"(rule:admin_only) or +# (role:member and project_id:%(project_id)s)". +# The neutron TAAS API now supports Secure RBAC default roles. # Create a Tap Mirror # POST /taas/tap_mirrors diff --git a/neutron_taas/policies/tap_flow.py b/neutron_taas/policies/tap_flow.py index c3f71760..332dd004 100644 --- a/neutron_taas/policies/tap_flow.py +++ b/neutron_taas/policies/tap_flow.py @@ -12,52 +12,85 @@ from oslo_policy import policy +from neutron.conf.policies import base from neutron_lib.policy import RULE_ADMIN_OR_OWNER + +COLLECTION_PATH = '/taas/tap_flows' +RESOURCE_PATH = '/taas/tap_flows/{id}' + +DEPRECATED_REASON = """ +The neutron TAAS API now supports Secure RBAC default roles. +""" + rules = [ policy.DocumentedRuleDefault( - 'create_tap_flow', - RULE_ADMIN_OR_OWNER, - 'Create a tap flow', - [ + name='create_tap_flow', + check_str=base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Create a tap flow', + operations=[ { 'method': 'POST', - 'path': '/taas/tap_flows', + 'path': COLLECTION_PATH, } - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='create_tap_flow', + check_str=RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'update_tap_flow', - RULE_ADMIN_OR_OWNER, - 'Update a tap flow', - [ + name='update_tap_flow', + check_str=base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Update a tap flow', + operations=[ { 'method': 'PUT', - 'path': '/taas/tap_flows/{id}', + 'path': RESOURCE_PATH, } - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='update_tap_flow', + check_str=RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'get_tap_flow', - RULE_ADMIN_OR_OWNER, - 'Show a tap flow', - [ + name='get_tap_flow', + check_str=base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Show a tap flow', + operations=[ { 'method': 'GET', - 'path': '/taas/tap_flows/{id}', + 'path': RESOURCE_PATH, } - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='get_tap_flow', + check_str=RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'delete_tap_flow', - RULE_ADMIN_OR_OWNER, - 'Delete a tap flow', - [ + name='delete_tap_flow', + check_str=base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Delete a tap flow', + operations=[ { 'method': 'DELETE', - 'path': '/taas/tap_flows/{id}', + 'path': RESOURCE_PATH, } - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='delete_tap_flow', + check_str=RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), ] diff --git a/neutron_taas/policies/tap_service.py b/neutron_taas/policies/tap_service.py index 65a306cb..c17c0350 100644 --- a/neutron_taas/policies/tap_service.py +++ b/neutron_taas/policies/tap_service.py @@ -12,52 +12,85 @@ from oslo_policy import policy +from neutron.conf.policies import base from neutron_lib.policy import RULE_ADMIN_OR_OWNER + +COLLECTION_PATH = '/taas/tap_services' +RESOURCE_PATH = '/taas/tap_services/{id}' + +DEPRECATED_REASON = """ +The neutron TAAS API now supports Secure RBAC default roles. +""" + rules = [ policy.DocumentedRuleDefault( - 'create_tap_service', - RULE_ADMIN_OR_OWNER, - 'Create a tap service', - [ + name='create_tap_service', + check_str=base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Create a tap service', + operations=[ { 'method': 'POST', - 'path': '/taas/tap_services', + 'path': COLLECTION_PATH, } - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='create_tap_service', + check_str=RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'update_tap_service', - RULE_ADMIN_OR_OWNER, - 'Updates a tap service', - [ + name='update_tap_service', + check_str=base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Updates a tap service', + operations=[ { 'method': 'PUT', - 'path': '/taas/tap_services/{id}', + 'path': RESOURCE_PATH, } - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='update_tap_service', + check_str=RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'get_tap_service', - RULE_ADMIN_OR_OWNER, - 'Show a tap service', - [ + name='get_tap_service', + check_str=base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Show a tap service', + operations=[ { 'method': 'GET', - 'path': '/taas/tap_services/{id}', + 'path': RESOURCE_PATH, } - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='get_tap_service', + check_str=RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'delete_tap_service', - RULE_ADMIN_OR_OWNER, - 'Delete a tap service', - [ + name='delete_tap_service', + check_str=base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Delete a tap service', + operations=[ { 'method': 'DELETE', - 'path': '/taas/tap_services/{id}', + 'path': RESOURCE_PATH, } - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='delete_tap_service', + check_str=RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), ] diff --git a/neutron_taas/tests/unit/policies/__init__.py b/neutron_taas/tests/unit/policies/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/neutron_taas/tests/unit/policies/test_tap_flow.py b/neutron_taas/tests/unit/policies/test_tap_flow.py new file mode 100644 index 00000000..dc0f1b90 --- /dev/null +++ b/neutron_taas/tests/unit/policies/test_tap_flow.py @@ -0,0 +1,261 @@ +# Copyright (c) 2025 Red Hat Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_policy import policy + +from neutron import policy as neutron_policy +from neutron.tests.unit.conf.policies import test_base + + +class TapFlowGroupAPITestCase(test_base.PolicyBaseTestCase): + + def setUp(self): + super().setUp() + self.target = { + 'project_id': self.project_id, + 'tenant_id': self.project_id + } + self.alt_target = { + 'project_id': self.alt_project_id, + 'tenant_id': self.alt_project_id + } + + +class SystemAdminTests(TapFlowGroupAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.system_admin_ctx + + def test_create_tap_flow(self): + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'create_tap_flow', + self.target) + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'create_tap_flow', + self.alt_target) + + def test_update_tap_flow(self): + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'update_tap_flow', + self.target) + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'update_tap_flow', + self.alt_target) + + def test_get_tap_flow(self): + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'get_tap_flow', + self.target) + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'get_tap_flow', + self.alt_target) + + def test_delete_tap_flow(self): + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'delete_tap_flow', + self.target) + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'delete_tap_flow', + self.alt_target) + + +class SystemMemberTests(SystemAdminTests): + + def setUp(self): + super().setUp() + self.context = self.system_member_ctx + + +class SystemReaderTests(SystemMemberTests): + + def setUp(self): + super().setUp() + self.context = self.system_reader_ctx + + +class AdminTest(TapFlowGroupAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.project_admin_ctx + + def test_create_tap_flow(self): + self.assertTrue( + neutron_policy.enforce( + self.context, 'create_tap_flow', self.target)) + self.assertTrue( + neutron_policy.enforce( + self.context, 'create_tap_flow', self.alt_target)) + + def test_update_tap_flow(self): + self.assertTrue( + neutron_policy.enforce( + self.context, 'update_tap_flow', self.target)) + self.assertTrue( + neutron_policy.enforce( + self.context, 'update_tap_flow', self.alt_target)) + + def test_get_tap_flow(self): + self.assertTrue( + neutron_policy.enforce( + self.context, 'get_tap_flow', self.target)) + self.assertTrue( + neutron_policy.enforce( + self.context, 'get_tap_flow', self.alt_target)) + + def test_delete_tap_flow(self): + self.assertTrue( + neutron_policy.enforce( + self.context, 'delete_tap_flow', self.target)) + self.assertTrue( + neutron_policy.enforce( + self.context, 'delete_tap_flow', self.alt_target)) + + +class ProjectManagerTests(TapFlowGroupAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.project_manager_ctx + + def test_create_tap_flow(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'create_tap_flow', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'create_tap_flow', + self.alt_target) + + def test_update_tap_flow(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'update_tap_flow', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'update_tap_flow', + self.alt_target) + + def test_get_tap_flow(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'get_tap_flow', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'get_tap_flow', + self.alt_target) + + def test_delete_tap_flow(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'delete_tap_flow', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'delete_tap_flow', + self.alt_target) + + +class ProjectMemberTests(ProjectManagerTests): + + def setUp(self): + super().setUp() + self.context = self.project_member_ctx + + +class ProjectReaderTests(ProjectMemberTests): + + def setUp(self): + super().setUp() + self.context = self.project_reader_ctx + + def test_create_tap_flow(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'create_tap_flow', + self.target) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'create_tap_flow', + self.alt_target) + + def test_update_tap_flow(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'update_tap_flow', + self.target) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'update_tap_flow', + self.alt_target) + + def test_get_tap_flow(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'get_tap_flow', + self.target) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'get_tap_flow', + self.alt_target) + + def test_delete_tap_flow(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'delete_tap_flow', + self.target) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'delete_tap_flow', + self.alt_target) + + +class ServiceRoleTests(TapFlowGroupAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.service_ctx + + def test_create_tap_flow(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'create_tap_flow', + self.target) + + def test_update_tap_flow(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'update_tap_flow', + self.target) + + def test_get_tap_flow(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'get_tap_flow', + self.target) + + def test_delete_tap_flow(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'delete_tap_flow', + self.target) diff --git a/neutron_taas/tests/unit/policies/test_tap_mirror.py b/neutron_taas/tests/unit/policies/test_tap_mirror.py new file mode 100644 index 00000000..62075da2 --- /dev/null +++ b/neutron_taas/tests/unit/policies/test_tap_mirror.py @@ -0,0 +1,260 @@ +# Copyright (c) 2025 Red Hat Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_policy import policy + +from neutron import policy as neutron_policy +from neutron.tests.unit.conf.policies import test_base + + +class TapMirrorAPITestCase(test_base.PolicyBaseTestCase): + + def setUp(self): + super().setUp() + self.target = { + 'project_id': self.project_id, + 'tenant_id': self.project_id + } + self.alt_target = { + 'project_id': self.alt_project_id, + 'tenant_id': self.alt_project_id + } + + +class SystemAdminTests(TapMirrorAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.system_admin_ctx + + def test_create_tap_mirror(self): + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'create_tap_mirror', + self.target) + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'create_tap_mirror', + self.alt_target) + + def test_update_tap_mirror(self): + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'update_tap_mirror', + self.target) + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'update_tap_mirror', + self.alt_target) + + def test_get_tap_mirror(self): + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'get_tap_mirror', + self.target) + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'get_tap_mirror', + self.alt_target) + + def test_delete_tap_mirror(self): + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'delete_tap_mirror', + self.target) + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'delete_tap_mirror', + self.alt_target) + + +class SystemMemberTests(SystemAdminTests): + + def setUp(self): + super().setUp() + self.context = self.system_member_ctx + + +class SystemReaderTests(SystemMemberTests): + + def setUp(self): + super().setUp() + self.context = self.system_reader_ctx + + +class AdminTests(TapMirrorAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.project_admin_ctx + + def test_create_tap_mirror(self): + self.assertTrue( + neutron_policy.enforce( + self.context, 'create_tap_mirror', self.target)) + self.assertTrue( + neutron_policy.enforce( + self.context, 'create_tap_mirror', self.alt_target)) + + def test_update_tap_mirror(self): + self.assertTrue( + neutron_policy.enforce( + self.context, 'update_tap_mirror', self.target)) + self.assertTrue( + neutron_policy.enforce( + self.context, 'update_tap_mirror', self.alt_target)) + + def test_get_tap_mirror(self): + self.assertTrue( + neutron_policy.enforce( + self.context, 'get_tap_mirror', self.target)) + self.assertTrue( + neutron_policy.enforce( + self.context, 'get_tap_mirror', self.alt_target)) + + def test_delete_tap_mirror(self): + self.assertTrue( + neutron_policy.enforce( + self.context, 'delete_tap_mirror', self.target)) + self.assertTrue( + neutron_policy.enforce( + self.context, 'delete_tap_mirror', self.alt_target)) + + +class ProjectManagerTests(TapMirrorAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.project_manager_ctx + + def test_create_tap_mirror(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'create_tap_mirror', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'create_tap_mirror', + self.alt_target) + + def test_update_tap_mirror(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'update_tap_mirror', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'update_tap_mirror', + self.alt_target) + + def test_get_tap_mirror(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'get_tap_mirror', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'get_tap_mirror', + self.alt_target) + + def test_delete_tap_mirror(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'delete_tap_mirror', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'delete_tap_mirror', + self.alt_target) + + +class ProjectMemberTests(ProjectManagerTests): + + def setUp(self): + super().setUp() + self.context = self.project_member_ctx + + +class ProjectReaderTests(TapMirrorAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.project_reader_ctx + + def test_create_tap_mirror(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'create_tap_mirror', + self.target) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'create_tap_mirror', + self.alt_target) + + def test_update_tap_mirror(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'update_tap_mirror', + self.target) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'update_tap_mirror', + self.alt_target) + + def test_get_tap_mirror(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'get_tap_mirror', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'get_tap_mirror', + self.alt_target) + + def test_delete_tap_mirror(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'delete_tap_mirror', + self.target) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'delete_tap_mirror', + self.alt_target) + + +class ServiceRoleTests(TapMirrorAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.service_ctx + + def test_create_tap_mirror(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'create_tap_mirror', + self.target) + + def test_update_tap_mirror(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'update_tap_mirror', + self.target) + + def test_get_tap_mirror(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'get_tap_mirror', + self.target) + + def test_delete_tap_mirror(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'delete_tap_mirror', + self.target) diff --git a/neutron_taas/tests/unit/policies/test_tap_service.py b/neutron_taas/tests/unit/policies/test_tap_service.py new file mode 100644 index 00000000..9b124e18 --- /dev/null +++ b/neutron_taas/tests/unit/policies/test_tap_service.py @@ -0,0 +1,261 @@ +# Copyright (c) 2025 Red Hat Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_policy import policy + +from neutron import policy as neutron_policy +from neutron.tests.unit.conf.policies import test_base + + +class TapServiceAPITestCase(test_base.PolicyBaseTestCase): + + def setUp(self): + super().setUp() + self.target = { + 'project_id': self.project_id, + 'tenant_id': self.project_id + } + self.alt_target = { + 'project_id': self.alt_project_id, + 'tenant_id': self.alt_project_id + } + + +class SystemAdminTests(TapServiceAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.system_admin_ctx + + def test_create_tap_service(self): + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'create_tap_service', + self.target) + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'create_tap_service', + self.alt_target) + + def test_update_tap_service(self): + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'update_tap_service', + self.target) + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'update_tap_service', + self.alt_target) + + def test_get_tap_service(self): + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'get_tap_service', + self.target) + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'get_tap_service', + self.alt_target) + + def test_delete_tap_service(self): + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'delete_tap_service', + self.target) + self.assertRaises( + policy.InvalidScope, + neutron_policy.enforce, self.context, 'delete_tap_service', + self.alt_target) + + +class SystemMemberTests(SystemAdminTests): + + def setUp(self): + super().setUp() + self.context = self.system_member_ctx + + +class SystemReaderTests(SystemMemberTests): + + def setUp(self): + super().setUp() + self.context = self.system_reader_ctx + + +class AdminTests(TapServiceAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.project_admin_ctx + + def test_create_tap_service(self): + self.assertTrue( + neutron_policy.enforce( + self.context, 'create_tap_service', self.target)) + self.assertTrue( + neutron_policy.enforce( + self.context, 'create_tap_service', self.alt_target)) + + def test_update_tap_service(self): + self.assertTrue( + neutron_policy.enforce( + self.context, 'update_tap_service', self.target)) + self.assertTrue( + neutron_policy.enforce( + self.context, 'update_tap_service', self.alt_target)) + + def test_get_tap_service(self): + self.assertTrue( + neutron_policy.enforce( + self.context, 'get_tap_service', self.target)) + self.assertTrue( + neutron_policy.enforce( + self.context, 'get_tap_service', self.alt_target)) + + def test_delete_tap_service(self): + self.assertTrue( + neutron_policy.enforce( + self.context, 'delete_tap_service', self.target)) + self.assertTrue( + neutron_policy.enforce( + self.context, 'delete_tap_service', self.alt_target)) + + +class ProjectManagerTests(TapServiceAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.project_manager_ctx + + def test_create_tap_service(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'create_tap_service', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'create_tap_service', + self.alt_target) + + def test_update_tap_service(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'update_tap_service', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'update_tap_service', + self.alt_target) + + def test_get_tap_service(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'get_tap_service', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'get_tap_service', + self.alt_target) + + def test_delete_tap_service(self): + self.assertTrue( + neutron_policy.enforce(self.context, 'delete_tap_service', + self.target)) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'delete_tap_service', + self.alt_target) + + +class ProjectMemberTests(ProjectManagerTests): + + def setUp(self): + super().setUp() + self.context = self.project_member_ctx + + +class ProjectReaderTests(TapServiceAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.project_reader_ctx + + def test_create_tap_service(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'create_tap_service', + self.target) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'create_tap_service', + self.alt_target) + + def test_update_tap_service(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'update_tap_service', + self.target) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'update_tap_service', + self.alt_target) + + def test_get_tap_service(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'get_tap_service', + self.target) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'get_tap_service', + self.alt_target) + + def test_delete_tap_service(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'delete_tap_service', + self.target) + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'delete_tap_service', + self.alt_target) + + +class ServiceRoleTests(TapServiceAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.service_ctx + + def test_create_tap_service(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'create_tap_service', + self.target) + + def test_update_tap_service(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'update_tap_service', + self.target) + + def test_get_tap_service(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'get_tap_service', + self.target) + + def test_delete_tap_service(self): + self.assertRaises( + policy.PolicyNotAuthorized, + neutron_policy.enforce, self.context, 'delete_tap_service', + self.target)