[S-RBAC] New default API policies for neutron-vpnaas

Change-Id: I9e13c75ae9bc0c63a0fa12782e52777586d93d8c
Signed-off-by: lajoskatona <lajos.katona@est.tech>
This commit is contained in:
elajkat
2025-05-06 18:18:12 +02:00
committed by lajoskatona
parent 93b8fddf45
commit eafa521504
11 changed files with 1126 additions and 132 deletions

View File

@@ -10,50 +10,72 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from neutron.conf.policies import base as neutron_base
from neutron_lib import policy as base
from oslo_policy import policy from oslo_policy import policy
from neutron_lib import policy as base DEPRECATED_REASON = """
The VPaaS API now supports Secure RBAC default roles for endpoint groups.
"""
rules = [ rules = [
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'create_endpoint_group', name='create_endpoint_group',
base.RULE_ANY, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Create a VPN endpoint group', scope_types=['project'],
[ description='Create a VPN endpoint group',
operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/vpn/endpoint-groups', 'path': '/vpn/endpoint-groups',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='create_endpoint_group',
check_str=base.RULE_ANY,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'update_endpoint_group', name='update_endpoint_group',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Update a VPN endpoint group', scope_types=['project'],
[ description='Update a VPN endpoint group',
operations=[
{ {
'method': 'PUT', 'method': 'PUT',
'path': '/vpn/endpoint-groups/{id}', 'path': '/vpn/endpoint-groups/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='update_endpoint_group',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'delete_endpoint_group', name='delete_endpoint_group',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Delete a VPN endpoint group', scope_types=['project'],
[ description='Delete a VPN endpoint group',
operations=[
{ {
'method': 'DELETE', 'method': 'DELETE',
'path': '/vpn/endpoint-groups/{id}', 'path': '/vpn/endpoint-groups/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='delete_endpoint_group',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'get_endpoint_group', name='get_endpoint_group',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Get VPN endpoint groups', scope_types=['project'],
[ description='Get VPN endpoint groups',
operations=[
{ {
'method': 'GET', 'method': 'GET',
'path': '/vpn/endpoint-groups', 'path': '/vpn/endpoint-groups',
@@ -62,7 +84,12 @@ rules = [
'method': 'GET', 'method': 'GET',
'path': '/vpn/endpoint-groups/{id}', 'path': '/vpn/endpoint-groups/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='get_endpoint_group',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
] ]

View File

@@ -10,50 +10,72 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from neutron.conf.policies import base as neutron_base
from neutron_lib import policy as base
from oslo_policy import policy from oslo_policy import policy
from neutron_lib import policy as base DEPRECATED_REASON = """
The VPaaS API now supports Secure RBAC default roles for ike policies.
"""
rules = [ rules = [
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'create_ikepolicy', name='create_ikepolicy',
base.RULE_ANY, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Create an IKE policy', scope_types=['project'],
[ description='Create an IKE policy',
operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/vpn/ikepolicies', 'path': '/vpn/ikepolicies',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='create_ikepolicy',
check_str=base.RULE_ANY,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'update_ikepolicy', name='update_ikepolicy',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Update an IKE policy', scope_types=['project'],
[ description='Update an IKE policy',
operations=[
{ {
'method': 'PUT', 'method': 'PUT',
'path': '/vpn/ikepolicies/{id}', 'path': '/vpn/ikepolicies/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='update_ikepolicy',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'delete_ikepolicy', name='delete_ikepolicy',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Delete an IKE policy', scope_types=['project'],
[ description='Delete an IKE policy',
operations=[
{ {
'method': 'DELETE', 'method': 'DELETE',
'path': '/vpn/ikepolicies/{id}', 'path': '/vpn/ikepolicies/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='delete_ikepolicy',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'get_ikepolicy', name='get_ikepolicy',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Get IKE policyies', scope_types=['project'],
[ description='Get IKE policyies',
operations=[
{ {
'method': 'GET', 'method': 'GET',
'path': '/vpn/ikepolicies', 'path': '/vpn/ikepolicies',
@@ -62,7 +84,12 @@ rules = [
'method': 'GET', 'method': 'GET',
'path': '/vpn/ikepolicies/{id}', 'path': '/vpn/ikepolicies/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='get_ikepolicy',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
] ]

View File

@@ -10,50 +10,72 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from neutron.conf.policies import base as neutron_base
from neutron_lib import policy as base
from oslo_policy import policy from oslo_policy import policy
from neutron_lib import policy as base DEPRECATED_REASON = """
The VPaaS API now supports Secure RBAC default roles for ipsec policies.
"""
rules = [ rules = [
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'create_ipsecpolicy', name='create_ipsecpolicy',
base.RULE_ANY, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Create an IPsec policy', scope_types=['project'],
[ description='Create an IPsec policy',
operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/vpn/ipsecpolicies', 'path': '/vpn/ipsecpolicies',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='create_ipsecpolicy',
check_str=base.RULE_ANY,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'update_ipsecpolicy', name='update_ipsecpolicy',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Update an IPsec policy', scope_types=['project'],
[ description='Update an IPsec policy',
operations=[
{ {
'method': 'PUT', 'method': 'PUT',
'path': '/vpn/ipsecpolicies/{id}', 'path': '/vpn/ipsecpolicies/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='update_ipsecpolicy',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'delete_ipsecpolicy', name='delete_ipsecpolicy',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Delete an IPsec policy', scope_types=['project'],
[ description='Delete an IPsec policy',
operations=[
{ {
'method': 'DELETE', 'method': 'DELETE',
'path': '/vpn/ipsecpolicies/{id}', 'path': '/vpn/ipsecpolicies/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='delete_ipsecpolicy',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'get_ipsecpolicy', name='get_ipsecpolicy',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Get IPsec policies', scope_types=['project'],
[ description='Get IPsec policies',
operations=[
{ {
'method': 'GET', 'method': 'GET',
'path': '/vpn/ipsecpolicies', 'path': '/vpn/ipsecpolicies',
@@ -62,7 +84,12 @@ rules = [
'method': 'GET', 'method': 'GET',
'path': '/vpn/ipsecpolicies/{id}', 'path': '/vpn/ipsecpolicies/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='get_ipsecpolicy',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
] ]

View File

@@ -10,50 +10,73 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from neutron.conf.policies import base as neutron_base
from neutron_lib import policy as base
from oslo_policy import policy from oslo_policy import policy
from neutron_lib import policy as base DEPRECATED_REASON = """
The VPaaS API now supports Secure RBAC default roles for ipsec site
connections.
"""
rules = [ rules = [
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'create_ipsec_site_connection', name='create_ipsec_site_connection',
base.RULE_ANY, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Create an IPsec site connection', scope_types=['project'],
[ description='Create an IPsec site connection',
operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/vpn/ipsec-site-connections', 'path': '/vpn/ipsec-site-connections',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='create_ipsec_site_connection',
check_str=base.RULE_ANY,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'update_ipsec_site_connection', name='update_ipsec_site_connection',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Update an IPsec site connection', scope_types=['project'],
[ description='Update an IPsec site connection',
operations=[
{ {
'method': 'PUT', 'method': 'PUT',
'path': '/vpn/ipsec-site-connections/{id}', 'path': '/vpn/ipsec-site-connections/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='update_ipsec_site_connection',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'delete_ipsec_site_connection', name='delete_ipsec_site_connection',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Delete an IPsec site connection', scope_types=['project'],
[ description='Delete an IPsec site connection',
operations=[
{ {
'method': 'DELETE', 'method': 'DELETE',
'path': '/vpn/ipsec-site-connections/{id}', 'path': '/vpn/ipsec-site-connections/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='delete_ipsec_site_connection',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'get_ipsec_site_connection', name='get_ipsec_site_connection',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Get IPsec site connections', scope_types=['project'],
[ description='Get IPsec site connections',
operations=[
{ {
'method': 'GET', 'method': 'GET',
'path': '/vpn/ipsec-site-connections', 'path': '/vpn/ipsec-site-connections',
@@ -62,7 +85,12 @@ rules = [
'method': 'GET', 'method': 'GET',
'path': '/vpn/ipsec-site-connections/{id}', 'path': '/vpn/ipsec-site-connections/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='get_ipsec_site_connection',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
] ]

View File

@@ -10,50 +10,72 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from neutron.conf.policies import base as neutron_base
from neutron_lib import policy as base
from oslo_policy import policy from oslo_policy import policy
from neutron_lib import policy as base DEPRECATED_REASON = """
The VPaaS API now supports Secure RBAC default roles for VPN services.
"""
rules = [ rules = [
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'create_vpnservice', name='create_vpnservice',
base.RULE_ANY, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Create a VPN service', scope_types=['project'],
[ description='Create a VPN service',
operations=[
{ {
'method': 'POST', 'method': 'POST',
'path': '/vpn/vpnservices', 'path': '/vpn/vpnservices',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='create_vpnservice',
check_str=base.RULE_ANY,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'update_vpnservice', name='update_vpnservice',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Update a VPN service', scope_types=['project'],
[ description='Update a VPN service',
operations=[
{ {
'method': 'PUT', 'method': 'PUT',
'path': '/vpn/vpnservices/{id}', 'path': '/vpn/vpnservices/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='update_vpnservice',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'delete_vpnservice', name='delete_vpnservice',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Delete a VPN service', scope_types=['project'],
[ description='Delete a VPN service',
operations=[
{ {
'method': 'DELETE', 'method': 'DELETE',
'path': '/vpn/vpnservices/{id}', 'path': '/vpn/vpnservices/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='delete_vpnservice',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
policy.DocumentedRuleDefault( policy.DocumentedRuleDefault(
'get_vpnservice', name='get_vpnservice',
base.RULE_ADMIN_OR_OWNER, check_str=neutron_base.ADMIN_OR_PROJECT_MEMBER,
'Get VPN services', scope_types=['project'],
[ description='Get VPN services',
operations=[
{ {
'method': 'GET', 'method': 'GET',
'path': '/vpn/vpnservices', 'path': '/vpn/vpnservices',
@@ -62,7 +84,12 @@ rules = [
'method': 'GET', 'method': 'GET',
'path': '/vpn/vpnservices/{id}', 'path': '/vpn/vpnservices/{id}',
}, },
] ],
deprecated_rule=policy.DeprecatedRule(
name='get_vpnservice',
check_str=base.RULE_ADMIN_OR_OWNER,
deprecated_reason=DEPRECATED_REASON,
deprecated_since='2025.2')
), ),
] ]

View File

@@ -563,7 +563,8 @@ class TestVpnaas(VPNPluginDbTestCase):
with self.ikepolicy(name=name, description=description) as ikepolicy: with self.ikepolicy(name=name, description=description) as ikepolicy:
req = self.new_show_request('ikepolicies', req = self.new_show_request('ikepolicies',
ikepolicy['ikepolicy']['id'], ikepolicy['ikepolicy']['id'],
fmt=self.fmt) fmt=self.fmt,
as_admin=True)
res = self.deserialize(self.fmt, req.get_response(self.ext_api)) res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self._check_policy(res['ikepolicy'], keys, lifetime) self._check_policy(res['ikepolicy'], keys, lifetime)
@@ -582,7 +583,7 @@ class TestVpnaas(VPNPluginDbTestCase):
'value': 3600} 'value': 3600}
with self.ikepolicy(name=name) as ikepolicy: with self.ikepolicy(name=name) as ikepolicy:
keys.append(('id', ikepolicy['ikepolicy']['id'])) keys.append(('id', ikepolicy['ikepolicy']['id']))
req = self.new_list_request('ikepolicies') req = self.new_list_request('ikepolicies', as_admin=True)
res = self.deserialize(self.fmt, req.get_response(self.ext_api)) res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(len(res), 1) self.assertEqual(len(res), 1)
for k, v in keys: for k, v in keys:
@@ -599,7 +600,8 @@ class TestVpnaas(VPNPluginDbTestCase):
ikepolicy2, ikepolicy2,
ikepolicy1), ikepolicy1),
[('name', 'desc')], [('name', 'desc')],
'ikepolicies') 'ikepolicies',
as_admin=True)
def test_list_ikepolicies_with_pagination_emulated(self): def test_list_ikepolicies_with_pagination_emulated(self):
"""Test case to list all ikepolicies with pagination.""" """Test case to list all ikepolicies with pagination."""
@@ -611,7 +613,8 @@ class TestVpnaas(VPNPluginDbTestCase):
ikepolicy2, ikepolicy2,
ikepolicy3), ikepolicy3),
('name', 'asc'), 2, 2, ('name', 'asc'), 2, 2,
'ikepolicies') 'ikepolicies',
as_admin=True)
def test_list_ikepolicies_with_pagination_reverse_emulated(self): def test_list_ikepolicies_with_pagination_reverse_emulated(self):
"""Test case to list all ikepolicies with reverse pagination.""" """Test case to list all ikepolicies with reverse pagination."""
@@ -623,7 +626,8 @@ class TestVpnaas(VPNPluginDbTestCase):
ikepolicy2, ikepolicy2,
ikepolicy3), ikepolicy3),
('name', 'asc'), 2, 2, ('name', 'asc'), 2, 2,
'ikepolicies') 'ikepolicies',
as_admin=True)
def test_update_ikepolicy(self): def test_update_ikepolicy(self):
"""Test case to update an ikepolicy.""" """Test case to update an ikepolicy."""
@@ -781,7 +785,8 @@ class TestVpnaas(VPNPluginDbTestCase):
with self.ipsecpolicy(name=name) as ipsecpolicy: with self.ipsecpolicy(name=name) as ipsecpolicy:
req = self.new_show_request('ipsecpolicies', req = self.new_show_request('ipsecpolicies',
ipsecpolicy['ipsecpolicy']['id'], ipsecpolicy['ipsecpolicy']['id'],
fmt=self.fmt) fmt=self.fmt,
as_admin=True)
res = self.deserialize(self.fmt, req.get_response(self.ext_api)) res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self._check_policy(res['ipsecpolicy'], keys, lifetime) self._check_policy(res['ipsecpolicy'], keys, lifetime)
@@ -800,7 +805,7 @@ class TestVpnaas(VPNPluginDbTestCase):
'value': 3600} 'value': 3600}
with self.ipsecpolicy(name=name) as ipsecpolicy: with self.ipsecpolicy(name=name) as ipsecpolicy:
keys.append(('id', ipsecpolicy['ipsecpolicy']['id'])) keys.append(('id', ipsecpolicy['ipsecpolicy']['id']))
req = self.new_list_request('ipsecpolicies') req = self.new_list_request('ipsecpolicies', as_admin=True)
res = self.deserialize(self.fmt, req.get_response(self.ext_api)) res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(len(res), 1) self.assertEqual(len(res), 1)
self._check_policy(res['ipsecpolicies'][0], keys, lifetime) self._check_policy(res['ipsecpolicies'][0], keys, lifetime)
@@ -814,7 +819,8 @@ class TestVpnaas(VPNPluginDbTestCase):
ipsecpolicy2, ipsecpolicy2,
ipsecpolicy1), ipsecpolicy1),
[('name', 'desc')], [('name', 'desc')],
'ipsecpolicies') 'ipsecpolicies',
as_admin=True)
def test_list_ipsecpolicies_with_pagination_emulated(self): def test_list_ipsecpolicies_with_pagination_emulated(self):
"""Test case to list all ipsecpolicies with pagination.""" """Test case to list all ipsecpolicies with pagination."""
@@ -826,7 +832,8 @@ class TestVpnaas(VPNPluginDbTestCase):
ipsecpolicy2, ipsecpolicy2,
ipsecpolicy3), ipsecpolicy3),
('name', 'asc'), 2, 2, ('name', 'asc'), 2, 2,
'ipsecpolicies') 'ipsecpolicies',
as_admin=True)
def test_list_ipsecpolicies_with_pagination_reverse_emulated(self): def test_list_ipsecpolicies_with_pagination_reverse_emulated(self):
"""Test case to list all ipsecpolicies with reverse pagination.""" """Test case to list all ipsecpolicies with reverse pagination."""
@@ -838,7 +845,8 @@ class TestVpnaas(VPNPluginDbTestCase):
ipsecpolicy2, ipsecpolicy2,
ipsecpolicy3), ipsecpolicy3),
('name', 'asc'), 2, 2, ('name', 'asc'), 2, 2,
'ipsecpolicies') 'ipsecpolicies',
as_admin=True)
def test_update_ipsecpolicy(self): def test_update_ipsecpolicy(self):
"""Test case to update an ipsecpolicy.""" """Test case to update an ipsecpolicy."""
@@ -1099,7 +1107,8 @@ class TestVpnaas(VPNPluginDbTestCase):
('status', 'PENDING_CREATE')] ('status', 'PENDING_CREATE')]
with self.vpnservice(name=name) as vpnservice: with self.vpnservice(name=name) as vpnservice:
req = self.new_show_request('vpnservices', req = self.new_show_request('vpnservices',
vpnservice['vpnservice']['id']) vpnservice['vpnservice']['id'],
as_admin=True)
res = self.deserialize(self.fmt, req.get_response(self.ext_api)) res = self.deserialize(self.fmt, req.get_response(self.ext_api))
for k, v in keys: for k, v in keys:
self.assertEqual(res['vpnservice'][k], v) self.assertEqual(res['vpnservice'][k], v)
@@ -1114,7 +1123,7 @@ class TestVpnaas(VPNPluginDbTestCase):
with self.vpnservice(name=name) as vpnservice: with self.vpnservice(name=name) as vpnservice:
keys.append(('subnet_id', vpnservice['vpnservice']['subnet_id'])) keys.append(('subnet_id', vpnservice['vpnservice']['subnet_id']))
keys.append(('router_id', vpnservice['vpnservice']['router_id'])) keys.append(('router_id', vpnservice['vpnservice']['router_id']))
req = self.new_list_request('vpnservices') req = self.new_list_request('vpnservices', as_admin=True)
res = self.deserialize(self.fmt, req.get_response(self.ext_api)) res = self.deserialize(self.fmt, req.get_response(self.ext_api))
self.assertEqual(len(res), 1) self.assertEqual(len(res), 1)
for k, v in keys: for k, v in keys:
@@ -1146,7 +1155,8 @@ class TestVpnaas(VPNPluginDbTestCase):
self._test_list_with_sort('vpnservice', (vpnservice3, self._test_list_with_sort('vpnservice', (vpnservice3,
vpnservice2, vpnservice2,
vpnservice1), vpnservice1),
[('name', 'desc')]) [('name', 'desc')],
as_admin=True)
def test_list_vpnservice_with_pagination_emulated(self): def test_list_vpnservice_with_pagination_emulated(self):
"""Test case to list all vpnservices with pagination.""" """Test case to list all vpnservices with pagination."""
@@ -1175,7 +1185,8 @@ class TestVpnaas(VPNPluginDbTestCase):
(vpnservice1, (vpnservice1,
vpnservice2, vpnservice2,
vpnservice3), vpnservice3),
('name', 'asc'), 2, 2) ('name', 'asc'), 2, 2,
as_admin=True)
def test_list_vpnservice_with_pagination_reverse_emulated(self): def test_list_vpnservice_with_pagination_reverse_emulated(self):
"""Test case to list all vpnservices with reverse pagination.""" """Test case to list all vpnservices with reverse pagination."""
@@ -1205,7 +1216,8 @@ class TestVpnaas(VPNPluginDbTestCase):
vpnservice2, vpnservice2,
vpnservice3), vpnservice3),
('name', 'asc'), ('name', 'asc'),
2, 2) 2, 2,
as_admin=True)
def test_create_ipsec_site_connection_with_invalid_values(self): def test_create_ipsec_site_connection_with_invalid_values(self):
"""Test case to create an ipsec_site_connection with invalid values.""" """Test case to create an ipsec_site_connection with invalid values."""
@@ -1476,7 +1488,8 @@ class TestVpnaas(VPNPluginDbTestCase):
'ipsec-site-connections', 'ipsec-site-connections',
ipsec_site_connection[ ipsec_site_connection[
'ipsec_site_connection']['id'], 'ipsec_site_connection']['id'],
fmt=self.fmt fmt=self.fmt,
as_admin=True
) )
res = self.deserialize( res = self.deserialize(
self.fmt, self.fmt,
@@ -1506,7 +1519,8 @@ class TestVpnaas(VPNPluginDbTestCase):
) as conn3: ) as conn3:
self._test_list_with_sort('ipsec-site-connection', self._test_list_with_sort('ipsec-site-connection',
(conn3, conn2, conn1), (conn3, conn2, conn1),
[('name', 'desc')]) [('name', 'desc')],
as_admin=True)
def test_list_ipsec_site_connections_with_pagination_emulated(self): def test_list_ipsec_site_connections_with_pagination_emulated(self):
"""Test case to list all ipsec_site_connections with pagination.""" """Test case to list all ipsec_site_connections with pagination."""
@@ -1527,7 +1541,8 @@ class TestVpnaas(VPNPluginDbTestCase):
self._test_list_with_pagination( self._test_list_with_pagination(
'ipsec-site-connection', 'ipsec-site-connection',
(conn1, conn2, conn3), (conn1, conn2, conn3),
('name', 'asc'), 2, 2) ('name', 'asc'), 2, 2,
as_admin=True)
def test_list_ipsec_site_conns_with_pagination_reverse_emulated(self): def test_list_ipsec_site_conns_with_pagination_reverse_emulated(self):
"""Test to list all ipsec_site_connections with reverse pagination.""" """Test to list all ipsec_site_connections with reverse pagination."""
@@ -1548,7 +1563,8 @@ class TestVpnaas(VPNPluginDbTestCase):
self._test_list_with_pagination_reverse( self._test_list_with_pagination_reverse(
'ipsec-site-connection', 'ipsec-site-connection',
(conn1, conn2, conn3), (conn1, conn2, conn3),
('name', 'asc'), 2, 2 ('name', 'asc'), 2, 2,
as_admin=True
) )
def test_create_vpn(self): def test_create_vpn(self):
@@ -1585,7 +1601,8 @@ class TestVpnaas(VPNPluginDbTestCase):
vpnservice_req = self.new_show_request( vpnservice_req = self.new_show_request(
'vpnservices', 'vpnservices',
vpnservice_id, vpnservice_id,
fmt=self.fmt) fmt=self.fmt,
as_admin=True)
vpnservice_updated = self.deserialize( vpnservice_updated = self.deserialize(
self.fmt, self.fmt,
vpnservice_req.get_response(self.ext_api) vpnservice_req.get_response(self.ext_api)
@@ -1596,7 +1613,8 @@ class TestVpnaas(VPNPluginDbTestCase):
) )
ikepolicy_req = self.new_show_request('ikepolicies', ikepolicy_req = self.new_show_request('ikepolicies',
ikepolicy_id, ikepolicy_id,
fmt=self.fmt) fmt=self.fmt,
as_admin=True)
ikepolicy_res = self.deserialize( ikepolicy_res = self.deserialize(
self.fmt, self.fmt,
ikepolicy_req.get_response(self.ext_api) ikepolicy_req.get_response(self.ext_api)
@@ -1607,7 +1625,8 @@ class TestVpnaas(VPNPluginDbTestCase):
ipsecpolicy_req = self.new_show_request( ipsecpolicy_req = self.new_show_request(
'ipsecpolicies', 'ipsecpolicies',
ipsecpolicy_id, ipsecpolicy_id,
fmt=self.fmt) fmt=self.fmt,
as_admin=True)
ipsecpolicy_res = self.deserialize( ipsecpolicy_res = self.deserialize(
self.fmt, self.fmt,
ipsecpolicy_req.get_response(self.ext_api) ipsecpolicy_req.get_response(self.ext_api)

View File

@@ -0,0 +1,211 @@
# Copyright (c) Ericsson Software Technology 2025 Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_policy import policy as base_policy
from neutron import policy
from neutron.tests.unit.conf.policies import test_base as base
class EndpointGroupAPITestCase(base.PolicyBaseTestCase):
def setUp(self):
super().setUp()
self.target = {
'project_id': self.project_id,
'tenant_id': self.project_id}
self.alt_target = {
'project_id': self.alt_project_id,
'tenant_id': self.alt_project_id}
class SystemAdminTests(EndpointGroupAPITestCase):
def setUp(self):
super().setUp()
self.context = self.system_admin_ctx
def test_create_endpoint_group(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'create_endpoint_group',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'create_endpoint_group',
self.alt_target)
def test_update_endpoint_group(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'update_endpoint_group',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'update_endpoint_group',
self.alt_target)
def test_delete_endpoint_group(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'delete_endpoint_group',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'delete_endpoint_group',
self.alt_target)
def test_get_endpoint_group(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'get_endpoint_group',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'get_endpoint_group',
self.alt_target)
class SystemMemberTests(SystemAdminTests):
def setUp(self):
super().setUp()
self.context = self.system_member_ctx
class SystemReaderTests(SystemMemberTests):
def setUp(self):
super().setUp()
self.context = self.system_reader_ctx
class AdminTests(EndpointGroupAPITestCase):
def setUp(self):
super().setUp()
self.context = self.project_admin_ctx
def test_create_endpoint_group(self):
self.assertTrue(
policy.enforce(
self.context, 'create_endpoint_group', self.target))
self.assertTrue(
policy.enforce(
self.context, 'create_endpoint_group', self.alt_target))
def test_update_endpoint_group(self):
self.assertTrue(
policy.enforce(
self.context, 'update_endpoint_group', self.target))
self.assertTrue(
policy.enforce(
self.context, 'update_endpoint_group', self.alt_target))
def test_delete_endpoint_group(self):
self.assertTrue(
policy.enforce(
self.context, 'delete_endpoint_group', self.target))
self.assertTrue(
policy.enforce(
self.context, 'delete_endpoint_group', self.alt_target))
def test_get_endpoint_group(self):
self.assertTrue(
policy.enforce(
self.context, 'get_endpoint_group', self.target))
self.assertTrue(
policy.enforce(
self.context, 'get_endpoint_group', self.alt_target))
class ProjectManagerTests(AdminTests):
def setUp(self):
super().setUp()
self.context = self.project_manager_ctx
def test_create_endpoint_group(self):
self.assertTrue(
policy.enforce(
self.context, 'create_endpoint_group', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_endpoint_group',
self.alt_target)
def test_update_endpoint_group(self):
self.assertTrue(
policy.enforce(
self.context, 'update_endpoint_group', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_endpoint_group',
self.alt_target)
def test_delete_endpoint_group(self):
self.assertTrue(
policy.enforce(
self.context, 'delete_endpoint_group', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_endpoint_group',
self.alt_target)
def test_get_endpoint_group(self):
self.assertTrue(
policy.enforce(
self.context, 'get_endpoint_group', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_endpoint_group',
self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super().setUp()
self.context = self.project_member_ctx
class ServiceRoleTests(EndpointGroupAPITestCase):
def setUp(self):
super().setUp()
self.context = self.service_ctx
def test_create_endpoint_group(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_endpoint_group',
self.target)
def test_update_endpoint_group(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_endpoint_group',
self.target)
def test_delete_endpoint_group(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_endpoint_group',
self.target)
def test_get_endpoint_group(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_endpoint_group', self.target)

View File

@@ -0,0 +1,196 @@
# Copyright (c) Ericsson Software Technology 2025 Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_policy import policy as base_policy
from neutron import policy
from neutron.tests.unit.conf.policies import test_base as base
class IkePolicyAPITestCase(base.PolicyBaseTestCase):
def setUp(self):
super().setUp()
self.target = {
'project_id': self.project_id,
'tenant_id': self.project_id}
self.alt_target = {
'project_id': self.alt_project_id,
'tenant_id': self.alt_project_id}
class SystemAdminTests(IkePolicyAPITestCase):
def setUp(self):
super().setUp()
self.context = self.system_admin_ctx
def test_create_ikepolicy(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'create_ikepolicy', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'create_ikepolicy', self.alt_target)
def test_update_ikepolicy(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'update_ikepolicy', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'update_ikepolicy', self.alt_target)
def test_delete_ikepolicy(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'delete_ikepolicy', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'delete_ikepolicy', self.alt_target)
def test_get_ikepolicy(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'get_ikepolicy', self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'get_ikepolicy', self.alt_target)
class SystemMemberTests(SystemAdminTests):
def setUp(self):
super().setUp()
self.context = self.system_member_ctx
class SystemReaderTests(SystemMemberTests):
def setUp(self):
super().setUp()
self.context = self.system_reader_ctx
class AdminTests(IkePolicyAPITestCase):
def setUp(self):
super().setUp()
self.context = self.project_admin_ctx
def test_create_ikepolicy(self):
self.assertTrue(
policy.enforce(
self.context, 'create_ikepolicy', self.target))
self.assertTrue(
policy.enforce(
self.context, 'create_ikepolicy', self.alt_target))
def test_update_ikepolicy(self):
self.assertTrue(
policy.enforce(
self.context, 'update_ikepolicy', self.target))
self.assertTrue(
policy.enforce(
self.context, 'update_ikepolicy', self.alt_target))
def test_delete_ikepolicy(self):
self.assertTrue(
policy.enforce(
self.context, 'delete_ikepolicy', self.target))
self.assertTrue(
policy.enforce(
self.context, 'delete_ikepolicy', self.alt_target))
def test_get_ikepolicy(self):
self.assertTrue(
policy.enforce(
self.context, 'get_ikepolicy', self.target))
self.assertTrue(
policy.enforce(
self.context, 'get_ikepolicy', self.alt_target))
class ProjectManagerTests(AdminTests):
def setUp(self):
super().setUp()
self.context = self.project_manager_ctx
def test_create_ikepolicy(self):
self.assertTrue(
policy.enforce(
self.context, 'create_ikepolicy', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_ikepolicy', self.alt_target)
def test_update_ikepolicy(self):
self.assertTrue(
policy.enforce(
self.context, 'update_ikepolicy', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_ikepolicy', self.alt_target)
def test_delete_ikepolicy(self):
self.assertTrue(
policy.enforce(
self.context, 'delete_ikepolicy', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_ikepolicy', self.alt_target)
def test_get_ikepolicy(self):
self.assertTrue(
policy.enforce(
self.context, 'get_ikepolicy', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_ikepolicy', self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super().setUp()
self.context = self.project_member_ctx
class ServiceRoleTests(IkePolicyAPITestCase):
def setUp(self):
super().setUp()
self.context = self.service_ctx
def test_create_ikepolicy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_ikepolicy', self.target)
def test_update_ikepolicy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_ikepolicy', self.target)
def test_delete_ikepolicy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_ikepolicy', self.target)
def test_get_ikepolicy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_ikepolicy', self.target)

View File

@@ -0,0 +1,224 @@
# Copyright (c) Ericsson Software Technology 2025 Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_policy import policy as base_policy
from neutron import policy
from neutron.tests.unit.conf.policies import test_base as base
class IpsecSiteConnectionAPITestCase(base.PolicyBaseTestCase):
def setUp(self):
super().setUp()
self.target = {
'project_id': self.project_id,
'tenant_id': self.project_id}
self.alt_target = {
'project_id': self.alt_project_id,
'tenant_id': self.alt_project_id}
class SystemAdminTests(IpsecSiteConnectionAPITestCase):
def setUp(self):
super().setUp()
self.context = self.system_admin_ctx
def test_create_ipsec_site_connection(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'create_ipsec_site_connection',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'create_ipsec_site_connection',
self.alt_target)
def test_update_ipsec_site_connection(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'update_ipsec_site_connection',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'update_ipsec_site_connection',
self.alt_target)
def test_delete_ipsec_site_connection(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'delete_ipsec_site_connection',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'delete_ipsec_site_connection',
self.alt_target)
def test_get_ipsec_site_connection(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'get_ipsec_site_connection',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'get_ipsec_site_connection',
self.alt_target)
class SystemMemberTests(SystemAdminTests):
def setUp(self):
super().setUp()
self.context = self.system_member_ctx
class SystemReaderTests(SystemMemberTests):
def setUp(self):
super().setUp()
self.context = self.system_reader_ctx
class AdminTests(IpsecSiteConnectionAPITestCase):
def setUp(self):
super().setUp()
self.context = self.project_admin_ctx
def test_create_ipsec_site_connection(self):
self.assertTrue(
policy.enforce(
self.context, 'create_ipsec_site_connection',
self.target))
self.assertTrue(
policy.enforce(
self.context, 'create_ipsec_site_connection',
self.alt_target))
def test_update_ipsec_site_connection(self):
self.assertTrue(
policy.enforce(
self.context, 'update_ipsec_site_connection',
self.target))
self.assertTrue(
policy.enforce(
self.context, 'update_ipsec_site_connection',
self.alt_target))
def test_delete_ipsec_site_connection(self):
self.assertTrue(
policy.enforce(
self.context, 'delete_ipsec_site_connection',
self.target))
self.assertTrue(
policy.enforce(
self.context, 'delete_ipsec_site_connection',
self.alt_target))
def test_get_ipsec_site_connection(self):
self.assertTrue(
policy.enforce(
self.context, 'get_ipsec_site_connection',
self.target))
self.assertTrue(
policy.enforce(
self.context, 'get_ipsec_site_connection',
self.alt_target))
class ProjectManagerTests(AdminTests):
def setUp(self):
super().setUp()
self.context = self.project_manager_ctx
def test_create_ipsec_site_connection(self):
self.assertTrue(
policy.enforce(
self.context, 'create_ipsec_site_connection',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_ipsec_site_connection',
self.alt_target)
def test_update_ipsec_site_connection(self):
self.assertTrue(
policy.enforce(
self.context, 'update_ipsec_site_connection',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_ipsec_site_connection',
self.alt_target)
def test_delete_ipsec_site_connection(self):
self.assertTrue(
policy.enforce(
self.context, 'delete_ipsec_site_connection',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_ipsec_site_connection',
self.alt_target)
def test_get_ipsec_site_connection(self):
self.assertTrue(
policy.enforce(
self.context, 'get_ipsec_site_connection',
self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_ipsec_site_connection',
self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super().setUp()
self.context = self.project_member_ctx
class ServiceRoleTests(IpsecSiteConnectionAPITestCase):
def setUp(self):
super().setUp()
self.context = self.service_ctx
def test_create_ipsec_site_connection(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_ipsec_site_connection',
self.target)
def test_update_ipsec_site_connection(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_ipsec_site_connection',
self.target)
def test_delete_ipsec_site_connection(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_ipsec_site_connection',
self.target)
def test_get_ipsec_site_connection(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_ipsec_site_connection',
self.target)

View File

@@ -0,0 +1,208 @@
# Copyright (c) Ericsson Software Technology 2025 Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from oslo_policy import policy as base_policy
from neutron import policy
from neutron.tests.unit.conf.policies import test_base as base
class VpnServiceAPITestCase(base.PolicyBaseTestCase):
def setUp(self):
super().setUp()
self.target = {
'project_id': self.project_id,
'tenant_id': self.project_id}
self.alt_target = {
'project_id': self.alt_project_id,
'tenant_id': self.alt_project_id}
class SystemAdminTests(VpnServiceAPITestCase):
def setUp(self):
super().setUp()
self.context = self.system_admin_ctx
def test_create_vpnservice(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'create_vpnservice',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'create_vpnservice',
self.alt_target)
def test_update_vpnservice(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'update_vpnservice',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'update_vpnservice',
self.alt_target)
def test_delete_vpnservice(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'delete_vpnservice',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'delete_vpnservice',
self.alt_target)
def test_get_vpnservice(self):
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'get_vpnservice',
self.target)
self.assertRaises(
base_policy.InvalidScope,
policy.enforce, self.context, 'get_vpnservice',
self.alt_target)
class SystemMemberTests(SystemAdminTests):
def setUp(self):
super().setUp()
self.context = self.system_member_ctx
class SystemReaderTests(SystemMemberTests):
def setUp(self):
super().setUp()
self.context = self.system_reader_ctx
class AdminTests(VpnServiceAPITestCase):
def setUp(self):
super().setUp()
self.context = self.project_admin_ctx
def test_create_vpnservice(self):
self.assertTrue(
policy.enforce(
self.context, 'create_vpnservice', self.target))
self.assertTrue(
policy.enforce(
self.context, 'create_vpnservice', self.alt_target))
def test_update_vpnservice(self):
self.assertTrue(
policy.enforce(
self.context, 'update_vpnservice', self.target))
self.assertTrue(
policy.enforce(
self.context, 'update_vpnservice', self.alt_target))
def test_delete_vpnservice(self):
self.assertTrue(
policy.enforce(
self.context, 'delete_vpnservice', self.target))
self.assertTrue(
policy.enforce(
self.context, 'delete_vpnservice', self.alt_target))
def test_get_vpnservice(self):
self.assertTrue(
policy.enforce(
self.context, 'get_vpnservice', self.target))
self.assertTrue(
policy.enforce(
self.context, 'get_vpnservice', self.alt_target))
class ProjectManagerTests(AdminTests):
def setUp(self):
super().setUp()
self.context = self.project_manager_ctx
def test_create_vpnservice(self):
self.assertTrue(
policy.enforce(
self.context, 'create_vpnservice', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_vpnservice',
self.alt_target)
def test_update_vpnservice(self):
self.assertTrue(
policy.enforce(
self.context, 'update_vpnservice', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_vpnservice',
self.alt_target)
def test_delete_vpnservice(self):
self.assertTrue(
policy.enforce(
self.context, 'delete_vpnservice', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_vpnservice',
self.alt_target)
def test_get_vpnservice(self):
self.assertTrue(
policy.enforce(
self.context, 'get_vpnservice', self.target))
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_vpnservice',
self.alt_target)
class ProjectMemberTests(ProjectManagerTests):
def setUp(self):
super().setUp()
self.context = self.project_member_ctx
class ServiceRoleTests(VpnServiceAPITestCase):
def setUp(self):
super().setUp()
self.context = self.service_ctx
def test_create_vpnservice(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_vpnservice', self.target)
def test_update_vpnservice(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_vpnservice', self.target)
def test_delete_vpnservice(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_vpnservice', self.target)
def test_get_vpnservice(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_vpnservice', self.target)