diff --git a/neutron_vpnaas/policies/endpoint_group.py b/neutron_vpnaas/policies/endpoint_group.py index 7f96668fc..a3e7c0dc7 100644 --- a/neutron_vpnaas/policies/endpoint_group.py +++ b/neutron_vpnaas/policies/endpoint_group.py @@ -10,50 +10,72 @@ # License for the specific language governing permissions and limitations # under the License. +from neutron.conf.policies import base as neutron_base +from neutron_lib import policy as base from oslo_policy import policy -from neutron_lib import policy as base - +DEPRECATED_REASON = """ +The VPaaS API now supports Secure RBAC default roles for endpoint groups. +""" rules = [ policy.DocumentedRuleDefault( - 'create_endpoint_group', - base.RULE_ANY, - 'Create a VPN endpoint group', - [ + name='create_endpoint_group', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Create a VPN endpoint group', + operations=[ { 'method': 'POST', 'path': '/vpn/endpoint-groups', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='create_endpoint_group', + check_str=base.RULE_ANY, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'update_endpoint_group', - base.RULE_ADMIN_OR_OWNER, - 'Update a VPN endpoint group', - [ + name='update_endpoint_group', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Update a VPN endpoint group', + operations=[ { 'method': 'PUT', 'path': '/vpn/endpoint-groups/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='update_endpoint_group', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'delete_endpoint_group', - base.RULE_ADMIN_OR_OWNER, - 'Delete a VPN endpoint group', - [ + name='delete_endpoint_group', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Delete a VPN endpoint group', + operations=[ { 'method': 'DELETE', 'path': '/vpn/endpoint-groups/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='delete_endpoint_group', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'get_endpoint_group', - base.RULE_ADMIN_OR_OWNER, - 'Get VPN endpoint groups', - [ + name='get_endpoint_group', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Get VPN endpoint groups', + operations=[ { 'method': 'GET', 'path': '/vpn/endpoint-groups', @@ -62,7 +84,12 @@ rules = [ 'method': 'GET', 'path': '/vpn/endpoint-groups/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='get_endpoint_group', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), ] diff --git a/neutron_vpnaas/policies/ike_policy.py b/neutron_vpnaas/policies/ike_policy.py index 54552c781..17ac3285c 100644 --- a/neutron_vpnaas/policies/ike_policy.py +++ b/neutron_vpnaas/policies/ike_policy.py @@ -10,50 +10,72 @@ # License for the specific language governing permissions and limitations # under the License. +from neutron.conf.policies import base as neutron_base +from neutron_lib import policy as base from oslo_policy import policy -from neutron_lib import policy as base - +DEPRECATED_REASON = """ +The VPaaS API now supports Secure RBAC default roles for ike policies. +""" rules = [ policy.DocumentedRuleDefault( - 'create_ikepolicy', - base.RULE_ANY, - 'Create an IKE policy', - [ + name='create_ikepolicy', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Create an IKE policy', + operations=[ { 'method': 'POST', 'path': '/vpn/ikepolicies', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='create_ikepolicy', + check_str=base.RULE_ANY, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'update_ikepolicy', - base.RULE_ADMIN_OR_OWNER, - 'Update an IKE policy', - [ + name='update_ikepolicy', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Update an IKE policy', + operations=[ { 'method': 'PUT', 'path': '/vpn/ikepolicies/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='update_ikepolicy', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'delete_ikepolicy', - base.RULE_ADMIN_OR_OWNER, - 'Delete an IKE policy', - [ + name='delete_ikepolicy', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Delete an IKE policy', + operations=[ { 'method': 'DELETE', 'path': '/vpn/ikepolicies/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='delete_ikepolicy', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'get_ikepolicy', - base.RULE_ADMIN_OR_OWNER, - 'Get IKE policyies', - [ + name='get_ikepolicy', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Get IKE policyies', + operations=[ { 'method': 'GET', 'path': '/vpn/ikepolicies', @@ -62,7 +84,12 @@ rules = [ 'method': 'GET', 'path': '/vpn/ikepolicies/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='get_ikepolicy', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), ] diff --git a/neutron_vpnaas/policies/ipsec_policy.py b/neutron_vpnaas/policies/ipsec_policy.py index 118f28732..02a3fe52f 100644 --- a/neutron_vpnaas/policies/ipsec_policy.py +++ b/neutron_vpnaas/policies/ipsec_policy.py @@ -10,50 +10,72 @@ # License for the specific language governing permissions and limitations # under the License. +from neutron.conf.policies import base as neutron_base +from neutron_lib import policy as base from oslo_policy import policy -from neutron_lib import policy as base - +DEPRECATED_REASON = """ +The VPaaS API now supports Secure RBAC default roles for ipsec policies. +""" rules = [ policy.DocumentedRuleDefault( - 'create_ipsecpolicy', - base.RULE_ANY, - 'Create an IPsec policy', - [ + name='create_ipsecpolicy', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Create an IPsec policy', + operations=[ { 'method': 'POST', 'path': '/vpn/ipsecpolicies', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='create_ipsecpolicy', + check_str=base.RULE_ANY, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'update_ipsecpolicy', - base.RULE_ADMIN_OR_OWNER, - 'Update an IPsec policy', - [ + name='update_ipsecpolicy', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Update an IPsec policy', + operations=[ { 'method': 'PUT', 'path': '/vpn/ipsecpolicies/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='update_ipsecpolicy', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'delete_ipsecpolicy', - base.RULE_ADMIN_OR_OWNER, - 'Delete an IPsec policy', - [ + name='delete_ipsecpolicy', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Delete an IPsec policy', + operations=[ { 'method': 'DELETE', 'path': '/vpn/ipsecpolicies/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='delete_ipsecpolicy', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'get_ipsecpolicy', - base.RULE_ADMIN_OR_OWNER, - 'Get IPsec policies', - [ + name='get_ipsecpolicy', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Get IPsec policies', + operations=[ { 'method': 'GET', 'path': '/vpn/ipsecpolicies', @@ -62,7 +84,12 @@ rules = [ 'method': 'GET', 'path': '/vpn/ipsecpolicies/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='get_ipsecpolicy', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), ] diff --git a/neutron_vpnaas/policies/ipsec_site_connection.py b/neutron_vpnaas/policies/ipsec_site_connection.py index b05d56ed3..64afe28b5 100644 --- a/neutron_vpnaas/policies/ipsec_site_connection.py +++ b/neutron_vpnaas/policies/ipsec_site_connection.py @@ -10,50 +10,73 @@ # License for the specific language governing permissions and limitations # under the License. +from neutron.conf.policies import base as neutron_base +from neutron_lib import policy as base from oslo_policy import policy -from neutron_lib import policy as base - +DEPRECATED_REASON = """ +The VPaaS API now supports Secure RBAC default roles for ipsec site +connections. +""" rules = [ policy.DocumentedRuleDefault( - 'create_ipsec_site_connection', - base.RULE_ANY, - 'Create an IPsec site connection', - [ + name='create_ipsec_site_connection', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Create an IPsec site connection', + operations=[ { 'method': 'POST', 'path': '/vpn/ipsec-site-connections', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='create_ipsec_site_connection', + check_str=base.RULE_ANY, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'update_ipsec_site_connection', - base.RULE_ADMIN_OR_OWNER, - 'Update an IPsec site connection', - [ + name='update_ipsec_site_connection', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Update an IPsec site connection', + operations=[ { 'method': 'PUT', 'path': '/vpn/ipsec-site-connections/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='update_ipsec_site_connection', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'delete_ipsec_site_connection', - base.RULE_ADMIN_OR_OWNER, - 'Delete an IPsec site connection', - [ + name='delete_ipsec_site_connection', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Delete an IPsec site connection', + operations=[ { 'method': 'DELETE', 'path': '/vpn/ipsec-site-connections/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='delete_ipsec_site_connection', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'get_ipsec_site_connection', - base.RULE_ADMIN_OR_OWNER, - 'Get IPsec site connections', - [ + name='get_ipsec_site_connection', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Get IPsec site connections', + operations=[ { 'method': 'GET', 'path': '/vpn/ipsec-site-connections', @@ -62,7 +85,12 @@ rules = [ 'method': 'GET', 'path': '/vpn/ipsec-site-connections/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='get_ipsec_site_connection', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), ] diff --git a/neutron_vpnaas/policies/vpnservice.py b/neutron_vpnaas/policies/vpnservice.py index a9a42f0a7..485f0d2a1 100644 --- a/neutron_vpnaas/policies/vpnservice.py +++ b/neutron_vpnaas/policies/vpnservice.py @@ -10,50 +10,72 @@ # License for the specific language governing permissions and limitations # under the License. +from neutron.conf.policies import base as neutron_base +from neutron_lib import policy as base from oslo_policy import policy -from neutron_lib import policy as base - +DEPRECATED_REASON = """ +The VPaaS API now supports Secure RBAC default roles for VPN services. +""" rules = [ policy.DocumentedRuleDefault( - 'create_vpnservice', - base.RULE_ANY, - 'Create a VPN service', - [ + name='create_vpnservice', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Create a VPN service', + operations=[ { 'method': 'POST', 'path': '/vpn/vpnservices', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='create_vpnservice', + check_str=base.RULE_ANY, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'update_vpnservice', - base.RULE_ADMIN_OR_OWNER, - 'Update a VPN service', - [ + name='update_vpnservice', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Update a VPN service', + operations=[ { 'method': 'PUT', 'path': '/vpn/vpnservices/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='update_vpnservice', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'delete_vpnservice', - base.RULE_ADMIN_OR_OWNER, - 'Delete a VPN service', - [ + name='delete_vpnservice', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Delete a VPN service', + operations=[ { 'method': 'DELETE', 'path': '/vpn/vpnservices/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='delete_vpnservice', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), policy.DocumentedRuleDefault( - 'get_vpnservice', - base.RULE_ADMIN_OR_OWNER, - 'Get VPN services', - [ + name='get_vpnservice', + check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER, + scope_types=['project'], + description='Get VPN services', + operations=[ { 'method': 'GET', 'path': '/vpn/vpnservices', @@ -62,7 +84,12 @@ rules = [ 'method': 'GET', 'path': '/vpn/vpnservices/{id}', }, - ] + ], + deprecated_rule=policy.DeprecatedRule( + name='get_vpnservice', + check_str=base.RULE_ADMIN_OR_OWNER, + deprecated_reason=DEPRECATED_REASON, + deprecated_since='2025.2') ), ] diff --git a/neutron_vpnaas/tests/unit/db/vpn/test_vpn_db.py b/neutron_vpnaas/tests/unit/db/vpn/test_vpn_db.py index cfed2f253..d9f48db43 100644 --- a/neutron_vpnaas/tests/unit/db/vpn/test_vpn_db.py +++ b/neutron_vpnaas/tests/unit/db/vpn/test_vpn_db.py @@ -563,7 +563,8 @@ class TestVpnaas(VPNPluginDbTestCase): with self.ikepolicy(name=name, description=description) as ikepolicy: req = self.new_show_request('ikepolicies', ikepolicy['ikepolicy']['id'], - fmt=self.fmt) + fmt=self.fmt, + as_admin=True) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) self._check_policy(res['ikepolicy'], keys, lifetime) @@ -582,7 +583,7 @@ class TestVpnaas(VPNPluginDbTestCase): 'value': 3600} with self.ikepolicy(name=name) as ikepolicy: keys.append(('id', ikepolicy['ikepolicy']['id'])) - req = self.new_list_request('ikepolicies') + req = self.new_list_request('ikepolicies', as_admin=True) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) self.assertEqual(len(res), 1) for k, v in keys: @@ -599,7 +600,8 @@ class TestVpnaas(VPNPluginDbTestCase): ikepolicy2, ikepolicy1), [('name', 'desc')], - 'ikepolicies') + 'ikepolicies', + as_admin=True) def test_list_ikepolicies_with_pagination_emulated(self): """Test case to list all ikepolicies with pagination.""" @@ -611,7 +613,8 @@ class TestVpnaas(VPNPluginDbTestCase): ikepolicy2, ikepolicy3), ('name', 'asc'), 2, 2, - 'ikepolicies') + 'ikepolicies', + as_admin=True) def test_list_ikepolicies_with_pagination_reverse_emulated(self): """Test case to list all ikepolicies with reverse pagination.""" @@ -623,7 +626,8 @@ class TestVpnaas(VPNPluginDbTestCase): ikepolicy2, ikepolicy3), ('name', 'asc'), 2, 2, - 'ikepolicies') + 'ikepolicies', + as_admin=True) def test_update_ikepolicy(self): """Test case to update an ikepolicy.""" @@ -781,7 +785,8 @@ class TestVpnaas(VPNPluginDbTestCase): with self.ipsecpolicy(name=name) as ipsecpolicy: req = self.new_show_request('ipsecpolicies', ipsecpolicy['ipsecpolicy']['id'], - fmt=self.fmt) + fmt=self.fmt, + as_admin=True) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) self._check_policy(res['ipsecpolicy'], keys, lifetime) @@ -800,7 +805,7 @@ class TestVpnaas(VPNPluginDbTestCase): 'value': 3600} with self.ipsecpolicy(name=name) as ipsecpolicy: keys.append(('id', ipsecpolicy['ipsecpolicy']['id'])) - req = self.new_list_request('ipsecpolicies') + req = self.new_list_request('ipsecpolicies', as_admin=True) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) self.assertEqual(len(res), 1) self._check_policy(res['ipsecpolicies'][0], keys, lifetime) @@ -814,7 +819,8 @@ class TestVpnaas(VPNPluginDbTestCase): ipsecpolicy2, ipsecpolicy1), [('name', 'desc')], - 'ipsecpolicies') + 'ipsecpolicies', + as_admin=True) def test_list_ipsecpolicies_with_pagination_emulated(self): """Test case to list all ipsecpolicies with pagination.""" @@ -826,7 +832,8 @@ class TestVpnaas(VPNPluginDbTestCase): ipsecpolicy2, ipsecpolicy3), ('name', 'asc'), 2, 2, - 'ipsecpolicies') + 'ipsecpolicies', + as_admin=True) def test_list_ipsecpolicies_with_pagination_reverse_emulated(self): """Test case to list all ipsecpolicies with reverse pagination.""" @@ -838,7 +845,8 @@ class TestVpnaas(VPNPluginDbTestCase): ipsecpolicy2, ipsecpolicy3), ('name', 'asc'), 2, 2, - 'ipsecpolicies') + 'ipsecpolicies', + as_admin=True) def test_update_ipsecpolicy(self): """Test case to update an ipsecpolicy.""" @@ -1099,7 +1107,8 @@ class TestVpnaas(VPNPluginDbTestCase): ('status', 'PENDING_CREATE')] with self.vpnservice(name=name) as vpnservice: req = self.new_show_request('vpnservices', - vpnservice['vpnservice']['id']) + vpnservice['vpnservice']['id'], + as_admin=True) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) for k, v in keys: self.assertEqual(res['vpnservice'][k], v) @@ -1114,7 +1123,7 @@ class TestVpnaas(VPNPluginDbTestCase): with self.vpnservice(name=name) as vpnservice: keys.append(('subnet_id', vpnservice['vpnservice']['subnet_id'])) keys.append(('router_id', vpnservice['vpnservice']['router_id'])) - req = self.new_list_request('vpnservices') + req = self.new_list_request('vpnservices', as_admin=True) res = self.deserialize(self.fmt, req.get_response(self.ext_api)) self.assertEqual(len(res), 1) for k, v in keys: @@ -1146,7 +1155,8 @@ class TestVpnaas(VPNPluginDbTestCase): self._test_list_with_sort('vpnservice', (vpnservice3, vpnservice2, vpnservice1), - [('name', 'desc')]) + [('name', 'desc')], + as_admin=True) def test_list_vpnservice_with_pagination_emulated(self): """Test case to list all vpnservices with pagination.""" @@ -1175,7 +1185,8 @@ class TestVpnaas(VPNPluginDbTestCase): (vpnservice1, vpnservice2, vpnservice3), - ('name', 'asc'), 2, 2) + ('name', 'asc'), 2, 2, + as_admin=True) def test_list_vpnservice_with_pagination_reverse_emulated(self): """Test case to list all vpnservices with reverse pagination.""" @@ -1205,7 +1216,8 @@ class TestVpnaas(VPNPluginDbTestCase): vpnservice2, vpnservice3), ('name', 'asc'), - 2, 2) + 2, 2, + as_admin=True) def test_create_ipsec_site_connection_with_invalid_values(self): """Test case to create an ipsec_site_connection with invalid values.""" @@ -1476,7 +1488,8 @@ class TestVpnaas(VPNPluginDbTestCase): 'ipsec-site-connections', ipsec_site_connection[ 'ipsec_site_connection']['id'], - fmt=self.fmt + fmt=self.fmt, + as_admin=True ) res = self.deserialize( self.fmt, @@ -1506,7 +1519,8 @@ class TestVpnaas(VPNPluginDbTestCase): ) as conn3: self._test_list_with_sort('ipsec-site-connection', (conn3, conn2, conn1), - [('name', 'desc')]) + [('name', 'desc')], + as_admin=True) def test_list_ipsec_site_connections_with_pagination_emulated(self): """Test case to list all ipsec_site_connections with pagination.""" @@ -1527,7 +1541,8 @@ class TestVpnaas(VPNPluginDbTestCase): self._test_list_with_pagination( 'ipsec-site-connection', (conn1, conn2, conn3), - ('name', 'asc'), 2, 2) + ('name', 'asc'), 2, 2, + as_admin=True) def test_list_ipsec_site_conns_with_pagination_reverse_emulated(self): """Test to list all ipsec_site_connections with reverse pagination.""" @@ -1548,7 +1563,8 @@ class TestVpnaas(VPNPluginDbTestCase): self._test_list_with_pagination_reverse( 'ipsec-site-connection', (conn1, conn2, conn3), - ('name', 'asc'), 2, 2 + ('name', 'asc'), 2, 2, + as_admin=True ) def test_create_vpn(self): @@ -1585,7 +1601,8 @@ class TestVpnaas(VPNPluginDbTestCase): vpnservice_req = self.new_show_request( 'vpnservices', vpnservice_id, - fmt=self.fmt) + fmt=self.fmt, + as_admin=True) vpnservice_updated = self.deserialize( self.fmt, vpnservice_req.get_response(self.ext_api) @@ -1596,7 +1613,8 @@ class TestVpnaas(VPNPluginDbTestCase): ) ikepolicy_req = self.new_show_request('ikepolicies', ikepolicy_id, - fmt=self.fmt) + fmt=self.fmt, + as_admin=True) ikepolicy_res = self.deserialize( self.fmt, ikepolicy_req.get_response(self.ext_api) @@ -1607,7 +1625,8 @@ class TestVpnaas(VPNPluginDbTestCase): ipsecpolicy_req = self.new_show_request( 'ipsecpolicies', ipsecpolicy_id, - fmt=self.fmt) + fmt=self.fmt, + as_admin=True) ipsecpolicy_res = self.deserialize( self.fmt, ipsecpolicy_req.get_response(self.ext_api) diff --git a/neutron_vpnaas/tests/unit/policies/__init__.py b/neutron_vpnaas/tests/unit/policies/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/neutron_vpnaas/tests/unit/policies/test_endpoint_group.py b/neutron_vpnaas/tests/unit/policies/test_endpoint_group.py new file mode 100644 index 000000000..96ad4cc70 --- /dev/null +++ b/neutron_vpnaas/tests/unit/policies/test_endpoint_group.py @@ -0,0 +1,211 @@ +# Copyright (c) Ericsson Software Technology 2025 Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_policy import policy as base_policy + +from neutron import policy +from neutron.tests.unit.conf.policies import test_base as base + + +class EndpointGroupAPITestCase(base.PolicyBaseTestCase): + + def setUp(self): + super().setUp() + self.target = { + 'project_id': self.project_id, + 'tenant_id': self.project_id} + self.alt_target = { + 'project_id': self.alt_project_id, + 'tenant_id': self.alt_project_id} + + +class SystemAdminTests(EndpointGroupAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.system_admin_ctx + + def test_create_endpoint_group(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'create_endpoint_group', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'create_endpoint_group', + self.alt_target) + + def test_update_endpoint_group(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'update_endpoint_group', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'update_endpoint_group', + self.alt_target) + + def test_delete_endpoint_group(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'delete_endpoint_group', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'delete_endpoint_group', + self.alt_target) + + def test_get_endpoint_group(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'get_endpoint_group', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'get_endpoint_group', + self.alt_target) + + +class SystemMemberTests(SystemAdminTests): + + def setUp(self): + super().setUp() + self.context = self.system_member_ctx + + +class SystemReaderTests(SystemMemberTests): + + def setUp(self): + super().setUp() + self.context = self.system_reader_ctx + + +class AdminTests(EndpointGroupAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.project_admin_ctx + + def test_create_endpoint_group(self): + self.assertTrue( + policy.enforce( + self.context, 'create_endpoint_group', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'create_endpoint_group', self.alt_target)) + + def test_update_endpoint_group(self): + self.assertTrue( + policy.enforce( + self.context, 'update_endpoint_group', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'update_endpoint_group', self.alt_target)) + + def test_delete_endpoint_group(self): + self.assertTrue( + policy.enforce( + self.context, 'delete_endpoint_group', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'delete_endpoint_group', self.alt_target)) + + def test_get_endpoint_group(self): + self.assertTrue( + policy.enforce( + self.context, 'get_endpoint_group', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'get_endpoint_group', self.alt_target)) + + +class ProjectManagerTests(AdminTests): + + def setUp(self): + super().setUp() + self.context = self.project_manager_ctx + + def test_create_endpoint_group(self): + self.assertTrue( + policy.enforce( + self.context, 'create_endpoint_group', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_endpoint_group', + self.alt_target) + + def test_update_endpoint_group(self): + self.assertTrue( + policy.enforce( + self.context, 'update_endpoint_group', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_endpoint_group', + self.alt_target) + + def test_delete_endpoint_group(self): + self.assertTrue( + policy.enforce( + self.context, 'delete_endpoint_group', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'delete_endpoint_group', + self.alt_target) + + def test_get_endpoint_group(self): + self.assertTrue( + policy.enforce( + self.context, 'get_endpoint_group', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_endpoint_group', + self.alt_target) + + +class ProjectMemberTests(ProjectManagerTests): + + def setUp(self): + super().setUp() + self.context = self.project_member_ctx + + +class ServiceRoleTests(EndpointGroupAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.service_ctx + + def test_create_endpoint_group(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_endpoint_group', + self.target) + + def test_update_endpoint_group(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_endpoint_group', + self.target) + + def test_delete_endpoint_group(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'delete_endpoint_group', + self.target) + + def test_get_endpoint_group(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_endpoint_group', self.target) diff --git a/neutron_vpnaas/tests/unit/policies/test_ike_policy.py b/neutron_vpnaas/tests/unit/policies/test_ike_policy.py new file mode 100644 index 000000000..8100ae759 --- /dev/null +++ b/neutron_vpnaas/tests/unit/policies/test_ike_policy.py @@ -0,0 +1,196 @@ +# Copyright (c) Ericsson Software Technology 2025 Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_policy import policy as base_policy + +from neutron import policy +from neutron.tests.unit.conf.policies import test_base as base + + +class IkePolicyAPITestCase(base.PolicyBaseTestCase): + + def setUp(self): + super().setUp() + self.target = { + 'project_id': self.project_id, + 'tenant_id': self.project_id} + self.alt_target = { + 'project_id': self.alt_project_id, + 'tenant_id': self.alt_project_id} + + +class SystemAdminTests(IkePolicyAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.system_admin_ctx + + def test_create_ikepolicy(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'create_ikepolicy', self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'create_ikepolicy', self.alt_target) + + def test_update_ikepolicy(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'update_ikepolicy', self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'update_ikepolicy', self.alt_target) + + def test_delete_ikepolicy(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'delete_ikepolicy', self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'delete_ikepolicy', self.alt_target) + + def test_get_ikepolicy(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'get_ikepolicy', self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'get_ikepolicy', self.alt_target) + + +class SystemMemberTests(SystemAdminTests): + + def setUp(self): + super().setUp() + self.context = self.system_member_ctx + + +class SystemReaderTests(SystemMemberTests): + + def setUp(self): + super().setUp() + self.context = self.system_reader_ctx + + +class AdminTests(IkePolicyAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.project_admin_ctx + + def test_create_ikepolicy(self): + self.assertTrue( + policy.enforce( + self.context, 'create_ikepolicy', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'create_ikepolicy', self.alt_target)) + + def test_update_ikepolicy(self): + self.assertTrue( + policy.enforce( + self.context, 'update_ikepolicy', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'update_ikepolicy', self.alt_target)) + + def test_delete_ikepolicy(self): + self.assertTrue( + policy.enforce( + self.context, 'delete_ikepolicy', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'delete_ikepolicy', self.alt_target)) + + def test_get_ikepolicy(self): + self.assertTrue( + policy.enforce( + self.context, 'get_ikepolicy', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'get_ikepolicy', self.alt_target)) + + +class ProjectManagerTests(AdminTests): + + def setUp(self): + super().setUp() + self.context = self.project_manager_ctx + + def test_create_ikepolicy(self): + self.assertTrue( + policy.enforce( + self.context, 'create_ikepolicy', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_ikepolicy', self.alt_target) + + def test_update_ikepolicy(self): + self.assertTrue( + policy.enforce( + self.context, 'update_ikepolicy', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_ikepolicy', self.alt_target) + + def test_delete_ikepolicy(self): + self.assertTrue( + policy.enforce( + self.context, 'delete_ikepolicy', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'delete_ikepolicy', self.alt_target) + + def test_get_ikepolicy(self): + self.assertTrue( + policy.enforce( + self.context, 'get_ikepolicy', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_ikepolicy', self.alt_target) + + +class ProjectMemberTests(ProjectManagerTests): + + def setUp(self): + super().setUp() + self.context = self.project_member_ctx + + +class ServiceRoleTests(IkePolicyAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.service_ctx + + def test_create_ikepolicy(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_ikepolicy', self.target) + + def test_update_ikepolicy(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_ikepolicy', self.target) + + def test_delete_ikepolicy(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'delete_ikepolicy', self.target) + + def test_get_ikepolicy(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_ikepolicy', self.target) diff --git a/neutron_vpnaas/tests/unit/policies/test_ipsec_site_connection.py b/neutron_vpnaas/tests/unit/policies/test_ipsec_site_connection.py new file mode 100644 index 000000000..afd84974d --- /dev/null +++ b/neutron_vpnaas/tests/unit/policies/test_ipsec_site_connection.py @@ -0,0 +1,224 @@ +# Copyright (c) Ericsson Software Technology 2025 Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_policy import policy as base_policy + +from neutron import policy +from neutron.tests.unit.conf.policies import test_base as base + + +class IpsecSiteConnectionAPITestCase(base.PolicyBaseTestCase): + + def setUp(self): + super().setUp() + self.target = { + 'project_id': self.project_id, + 'tenant_id': self.project_id} + self.alt_target = { + 'project_id': self.alt_project_id, + 'tenant_id': self.alt_project_id} + + +class SystemAdminTests(IpsecSiteConnectionAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.system_admin_ctx + + def test_create_ipsec_site_connection(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'create_ipsec_site_connection', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'create_ipsec_site_connection', + self.alt_target) + + def test_update_ipsec_site_connection(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'update_ipsec_site_connection', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'update_ipsec_site_connection', + self.alt_target) + + def test_delete_ipsec_site_connection(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'delete_ipsec_site_connection', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'delete_ipsec_site_connection', + self.alt_target) + + def test_get_ipsec_site_connection(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'get_ipsec_site_connection', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'get_ipsec_site_connection', + self.alt_target) + + +class SystemMemberTests(SystemAdminTests): + + def setUp(self): + super().setUp() + self.context = self.system_member_ctx + + +class SystemReaderTests(SystemMemberTests): + + def setUp(self): + super().setUp() + self.context = self.system_reader_ctx + + +class AdminTests(IpsecSiteConnectionAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.project_admin_ctx + + def test_create_ipsec_site_connection(self): + self.assertTrue( + policy.enforce( + self.context, 'create_ipsec_site_connection', + self.target)) + self.assertTrue( + policy.enforce( + self.context, 'create_ipsec_site_connection', + self.alt_target)) + + def test_update_ipsec_site_connection(self): + self.assertTrue( + policy.enforce( + self.context, 'update_ipsec_site_connection', + self.target)) + self.assertTrue( + policy.enforce( + self.context, 'update_ipsec_site_connection', + self.alt_target)) + + def test_delete_ipsec_site_connection(self): + self.assertTrue( + policy.enforce( + self.context, 'delete_ipsec_site_connection', + self.target)) + self.assertTrue( + policy.enforce( + self.context, 'delete_ipsec_site_connection', + self.alt_target)) + + def test_get_ipsec_site_connection(self): + self.assertTrue( + policy.enforce( + self.context, 'get_ipsec_site_connection', + self.target)) + self.assertTrue( + policy.enforce( + self.context, 'get_ipsec_site_connection', + self.alt_target)) + + +class ProjectManagerTests(AdminTests): + + def setUp(self): + super().setUp() + self.context = self.project_manager_ctx + + def test_create_ipsec_site_connection(self): + self.assertTrue( + policy.enforce( + self.context, 'create_ipsec_site_connection', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_ipsec_site_connection', + self.alt_target) + + def test_update_ipsec_site_connection(self): + self.assertTrue( + policy.enforce( + self.context, 'update_ipsec_site_connection', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_ipsec_site_connection', + self.alt_target) + + def test_delete_ipsec_site_connection(self): + self.assertTrue( + policy.enforce( + self.context, 'delete_ipsec_site_connection', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'delete_ipsec_site_connection', + self.alt_target) + + def test_get_ipsec_site_connection(self): + self.assertTrue( + policy.enforce( + self.context, 'get_ipsec_site_connection', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_ipsec_site_connection', + self.alt_target) + + +class ProjectMemberTests(ProjectManagerTests): + + def setUp(self): + super().setUp() + self.context = self.project_member_ctx + + +class ServiceRoleTests(IpsecSiteConnectionAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.service_ctx + + def test_create_ipsec_site_connection(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_ipsec_site_connection', + self.target) + + def test_update_ipsec_site_connection(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_ipsec_site_connection', + self.target) + + def test_delete_ipsec_site_connection(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'delete_ipsec_site_connection', + self.target) + + def test_get_ipsec_site_connection(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_ipsec_site_connection', + self.target) diff --git a/neutron_vpnaas/tests/unit/policies/test_vpnservice.py b/neutron_vpnaas/tests/unit/policies/test_vpnservice.py new file mode 100644 index 000000000..7addb80e6 --- /dev/null +++ b/neutron_vpnaas/tests/unit/policies/test_vpnservice.py @@ -0,0 +1,208 @@ +# Copyright (c) Ericsson Software Technology 2025 Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or +# implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from oslo_policy import policy as base_policy + +from neutron import policy +from neutron.tests.unit.conf.policies import test_base as base + + +class VpnServiceAPITestCase(base.PolicyBaseTestCase): + + def setUp(self): + super().setUp() + self.target = { + 'project_id': self.project_id, + 'tenant_id': self.project_id} + self.alt_target = { + 'project_id': self.alt_project_id, + 'tenant_id': self.alt_project_id} + + +class SystemAdminTests(VpnServiceAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.system_admin_ctx + + def test_create_vpnservice(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'create_vpnservice', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'create_vpnservice', + self.alt_target) + + def test_update_vpnservice(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'update_vpnservice', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'update_vpnservice', + self.alt_target) + + def test_delete_vpnservice(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'delete_vpnservice', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'delete_vpnservice', + self.alt_target) + + def test_get_vpnservice(self): + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'get_vpnservice', + self.target) + self.assertRaises( + base_policy.InvalidScope, + policy.enforce, self.context, 'get_vpnservice', + self.alt_target) + + +class SystemMemberTests(SystemAdminTests): + + def setUp(self): + super().setUp() + self.context = self.system_member_ctx + + +class SystemReaderTests(SystemMemberTests): + + def setUp(self): + super().setUp() + self.context = self.system_reader_ctx + + +class AdminTests(VpnServiceAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.project_admin_ctx + + def test_create_vpnservice(self): + self.assertTrue( + policy.enforce( + self.context, 'create_vpnservice', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'create_vpnservice', self.alt_target)) + + def test_update_vpnservice(self): + self.assertTrue( + policy.enforce( + self.context, 'update_vpnservice', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'update_vpnservice', self.alt_target)) + + def test_delete_vpnservice(self): + self.assertTrue( + policy.enforce( + self.context, 'delete_vpnservice', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'delete_vpnservice', self.alt_target)) + + def test_get_vpnservice(self): + self.assertTrue( + policy.enforce( + self.context, 'get_vpnservice', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'get_vpnservice', self.alt_target)) + + +class ProjectManagerTests(AdminTests): + + def setUp(self): + super().setUp() + self.context = self.project_manager_ctx + + def test_create_vpnservice(self): + self.assertTrue( + policy.enforce( + self.context, 'create_vpnservice', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_vpnservice', + self.alt_target) + + def test_update_vpnservice(self): + self.assertTrue( + policy.enforce( + self.context, 'update_vpnservice', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_vpnservice', + self.alt_target) + + def test_delete_vpnservice(self): + self.assertTrue( + policy.enforce( + self.context, 'delete_vpnservice', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'delete_vpnservice', + self.alt_target) + + def test_get_vpnservice(self): + self.assertTrue( + policy.enforce( + self.context, 'get_vpnservice', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_vpnservice', + self.alt_target) + + +class ProjectMemberTests(ProjectManagerTests): + + def setUp(self): + super().setUp() + self.context = self.project_member_ctx + + +class ServiceRoleTests(VpnServiceAPITestCase): + + def setUp(self): + super().setUp() + self.context = self.service_ctx + + def test_create_vpnservice(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_vpnservice', self.target) + + def test_update_vpnservice(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_vpnservice', self.target) + + def test_delete_vpnservice(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'delete_vpnservice', self.target) + + def test_get_vpnservice(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_vpnservice', self.target)