Merge "[S-RBAC] Add service role in neutron policy"

This commit is contained in:
Zuul 2023-10-06 13:37:01 +00:00 committed by Gerrit Code Review
commit a45263e146
39 changed files with 1812 additions and 71 deletions

View File

@ -13,6 +13,9 @@
from neutron_lib import policy as neutron_policy
from oslo_policy import policy
# This role is used only for communication between services, it shouldn't be
# used by human users
SERVICE = 'rule:service_api'
# For completion of the phase 1
# https://governance.openstack.org/tc/goals/selected/consistent-and-secure-rbac.html#phase-1
@ -33,6 +36,8 @@ PROJECT_READER = 'role:reader and project_id:%(project_id)s'
# protecting APIs designed to operate with multiple scopes (e.g.,
# an administrator should be able to delete any router in the deployment, a
# project member should only be able to delete routers in their project).
ADMIN_OR_SERVICE = (
'(' + ADMIN + ') or (' + SERVICE + ')')
ADMIN_OR_PROJECT_MEMBER = (
'(' + ADMIN + ') or (' + PROJECT_MEMBER + ')')
ADMIN_OR_PROJECT_READER = (
@ -75,6 +80,10 @@ rules = [
'context_is_admin',
'role:admin',
description='Rule for cloud admin access'),
policy.RuleDefault(
"service_api",
"role:service",
description="Default rule for the service-to-service APIs."),
policy.RuleDefault(
'owner',
'tenant_id:%(tenant_id)s',
@ -87,7 +96,10 @@ rules = [
policy.RuleDefault(
'context_is_advsvc',
'role:advsvc',
description='Rule for advsvc role access'),
description='Rule for advsvc role access',
deprecated_reason=('Neutron now supports service role for '
'service to service communication.'),
deprecated_since='2024.1'),
policy.RuleDefault(
'admin_or_network_owner',
neutron_policy.policy_or('rule:context_is_admin',

View File

@ -168,6 +168,7 @@ rules = [
name='get_network',
check_str=neutron_policy.policy_or(
base.ADMIN_OR_PROJECT_READER,
base.SERVICE,
'rule:shared',
'rule:external',
neutron_policy.RULE_ADVSVC

View File

@ -66,8 +66,8 @@ rules = [
name='create_port:device_owner',
check_str=neutron_policy.policy_or(
'not rule:network_device',
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_MEMBER
base.ADMIN_OR_SERVICE,
base.NET_OWNER_MEMBER
),
scope_types=['project'],
description='Specify ``device_owner`` attribute when creating a port',
@ -84,8 +84,8 @@ rules = [
policy.DocumentedRuleDefault(
name='create_port:mac_address',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_MEMBER),
base.ADMIN_OR_SERVICE,
base.NET_OWNER_MEMBER),
scope_types=['project'],
description='Specify ``mac_address`` attribute when creating a port',
operations=ACTION_POST,
@ -100,8 +100,8 @@ rules = [
policy.DocumentedRuleDefault(
name='create_port:fixed_ips',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_MEMBER,
base.ADMIN_OR_SERVICE,
base.NET_OWNER_MEMBER,
'rule:shared'),
scope_types=['project'],
description='Specify ``fixed_ips`` information when creating a port',
@ -118,8 +118,8 @@ rules = [
policy.DocumentedRuleDefault(
name='create_port:fixed_ips:ip_address',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_MEMBER),
base.ADMIN_OR_SERVICE,
base.NET_OWNER_MEMBER),
scope_types=['project'],
description='Specify IP address in ``fixed_ips`` when creating a port',
operations=ACTION_POST,
@ -134,8 +134,8 @@ rules = [
policy.DocumentedRuleDefault(
name='create_port:fixed_ips:subnet_id',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_MEMBER,
base.ADMIN_OR_SERVICE,
base.NET_OWNER_MEMBER,
'rule:shared'),
scope_types=['project'],
description='Specify subnet ID in ``fixed_ips`` when creating a port',
@ -152,8 +152,8 @@ rules = [
policy.DocumentedRuleDefault(
name='create_port:port_security_enabled',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_MEMBER),
base.ADMIN_OR_SERVICE,
base.NET_OWNER_MEMBER),
scope_types=['project'],
description=(
'Specify ``port_security_enabled`` '
@ -271,8 +271,8 @@ rules = [
policy.DocumentedRuleDefault(
name='get_port',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_READER,
base.ADMIN_OR_SERVICE,
base.NET_OWNER_READER,
base.PROJECT_READER
),
scope_types=['project'],
@ -359,9 +359,8 @@ rules = [
policy.DocumentedRuleDefault(
name='update_port',
check_str=neutron_policy.policy_or(
base.ADMIN,
base.ADMIN_OR_SERVICE,
base.PROJECT_MEMBER,
neutron_policy.RULE_ADVSVC
),
scope_types=['project'],
description='Update a port',
@ -378,8 +377,8 @@ rules = [
name='update_port:device_owner',
check_str=neutron_policy.policy_or(
'not rule:network_device',
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_MEMBER,
base.ADMIN_OR_SERVICE,
base.NET_OWNER_MEMBER,
),
scope_types=['project'],
description='Update ``device_owner`` attribute of a port',
@ -396,8 +395,7 @@ rules = [
policy.DocumentedRuleDefault(
name='update_port:mac_address',
check_str=neutron_policy.policy_or(
base.ADMIN,
neutron_policy.RULE_ADVSVC
base.ADMIN_OR_SERVICE,
),
scope_types=['project'],
description='Update ``mac_address`` attribute of a port',
@ -413,8 +411,8 @@ rules = [
policy.DocumentedRuleDefault(
name='update_port:fixed_ips',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_MEMBER,
base.ADMIN_OR_SERVICE,
base.NET_OWNER_MEMBER
),
scope_types=['project'],
description='Specify ``fixed_ips`` information when updating a port',
@ -430,8 +428,8 @@ rules = [
policy.DocumentedRuleDefault(
name='update_port:fixed_ips:ip_address',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_MEMBER,
base.ADMIN_OR_SERVICE,
base.NET_OWNER_MEMBER
),
scope_types=['project'],
description=(
@ -450,8 +448,8 @@ rules = [
policy.DocumentedRuleDefault(
name='update_port:fixed_ips:subnet_id',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_MEMBER,
base.ADMIN_OR_SERVICE,
base.NET_OWNER_MEMBER,
'rule:shared'
),
scope_types=['project'],
@ -472,8 +470,8 @@ rules = [
policy.DocumentedRuleDefault(
name='update_port:port_security_enabled',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_NET_OWNER_MEMBER,
base.ADMIN_OR_SERVICE,
base.NET_OWNER_MEMBER
),
scope_types=['project'],
description='Update ``port_security_enabled`` attribute of a port',
@ -513,9 +511,8 @@ rules = [
policy.DocumentedRuleDefault(
name='update_port:binding:vnic_type',
check_str=neutron_policy.policy_or(
base.ADMIN,
base.ADMIN_OR_SERVICE,
base.PROJECT_MEMBER,
neutron_policy.RULE_ADVSVC
),
scope_types=['project'],
description='Update ``binding:vnic_type`` attribute of a port',
@ -595,9 +592,9 @@ rules = [
policy.DocumentedRuleDefault(
name='delete_port',
check_str=neutron_policy.policy_or(
neutron_policy.RULE_ADVSVC,
base.ADMIN_OR_SERVICE,
base.NET_OWNER_MEMBER,
base.PROJECT_MEMBER,
base.ADMIN_OR_NET_OWNER_MEMBER
),
scope_types=['project'],
description='Delete a port',

View File

@ -22,7 +22,7 @@ ACTIVATE_BINDING_PATH = '/ports/{port_id}/bindings/{host}'
rules = [
policy.DocumentedRuleDefault(
name='get_port_binding',
check_str=base.ADMIN,
check_str=base.ADMIN_OR_SERVICE,
scope_types=['project'],
description='Get port binding information',
operations=[
@ -34,7 +34,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='create_port_binding',
check_str=base.ADMIN,
check_str=base.SERVICE,
scope_types=['project'],
description='Create port binding on the host',
operations=[
@ -46,7 +46,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='delete_port_binding',
check_str=base.ADMIN,
check_str=base.SERVICE,
scope_types=['project'],
description='Delete port binding on the host',
operations=[
@ -58,7 +58,7 @@ rules = [
),
policy.DocumentedRuleDefault(
name='activate',
check_str=base.ADMIN,
check_str=base.SERVICE,
scope_types=['project'],
description='Activate port binding on the host',
operations=[

View File

@ -91,3 +91,16 @@ class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(AddressGroupAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_get_address_group(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_address_group", self.target)

View File

@ -252,3 +252,46 @@ class ProjectReaderTests(ProjectMemberTests):
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_address_scope', self.alt_target)
class ServiceRoleTests(AddressScopeAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_address_scope(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_address_scope', self.target)
def test_create_address_scope_shared(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_address_scope:shared', self.target)
def test_get_address_scope(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_address_scope', self.target)
def test_update_address_scope(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_address_scope', self.target)
def test_update_address_scope_shared(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_address_scope:shared', self.target)
def test_delete_address_scope(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_address_scope', self.target)

View File

@ -254,3 +254,76 @@ class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(AgentAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_get_agent(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_agent", self.target)
def test_update_agent(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "update_agent", self.target)
def test_delete_agent(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "delete_agent", self.target)
def test_add_network_to_dhcp_agent(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "create_dhcp-network", self.target)
def test_networks_on_dhcp_agent(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_dhcp-networks", self.target)
def test_delete_network_from_dhcp_agent(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "delete_dhcp-network", self.target)
def test_add_router_to_l3_agent(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "create_l3-router", self.target)
def test_get_routers_on_l3_agent(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_l3-routers", self.target)
def test_delete_router_from_l3_agent(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "delete_l3-router", self.target)
def test_get_dhcp_agents_hosting_network(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_dhcp-agents", self.target)
def test_get_l3_agents_hosting_router(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_l3-agents", self.target)

View File

@ -153,3 +153,24 @@ class ProjectReaderTests(ProjectMemberTests):
policy.enforce,
self.context, DELETE_POLICY, self.alt_target
)
class ServiceRoleTests(AutoAllocatedTopologyAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_get_topology(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, GET_POLICY, self.target
)
def test_delete_topology(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, DELETE_POLICY, self.target
)

View File

@ -76,3 +76,16 @@ class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(AvailabilityZoneAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_get_availability_zone(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_availability_zone", self.target)

View File

@ -68,6 +68,7 @@ class PolicyBaseTestCase(tests_base.BaseTestCase):
self.user_id = uuidutils.generate_uuid()
self._prepare_system_scope_personas()
self._prepare_project_scope_personas()
self._prepare_service_persona()
self.alt_project_id = uuidutils.generate_uuid()
def _prepare_system_scope_personas(self):
@ -98,6 +99,12 @@ class PolicyBaseTestCase(tests_base.BaseTestCase):
roles=['reader'],
project_id=self.project_id)
def _prepare_service_persona(self):
self.service_ctx = context.Context(
user_id='service',
roles=['service'],
project_id='service')
class RuleScopesTestCase(PolicyBaseTestCase):

View File

@ -220,3 +220,66 @@ class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(FlavorAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_flavor(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_flavor', self.target)
def test_update_flavor(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_flavor', self.target)
def test_delete_flavor(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_flavor', self.target)
def test_create_service_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_service_profile', self.target)
def test_get_service_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_service_profile', self.target)
def test_update_service_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_service_profile', self.target)
def test_delete_service_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_service_profile', self.target)
def test_create_flavor_service_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_flavor_service_profile',
self.target)
def test_delete_flavor_service_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_flavor_service_profile',
self.target)

View File

@ -228,3 +228,41 @@ class ProjectReaderTests(ProjectMemberTests):
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "delete_floatingip", self.alt_target)
class ServiceRoleTests(FloatingIPAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_floatingip(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "create_floatingip", self.target)
def test_create_floatingip_with_ip_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "create_floatingip:floating_ip_address",
self.target)
def test_get_floatingip(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_floatingip", self.target)
def test_update_floatingip(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "update_floatingip", self.target)
def test_delete_floatingip(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "delete_floatingip", self.target)

View File

@ -90,3 +90,16 @@ class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(FloatingipPoolsAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_get_floatingip_pool(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_floatingip_pool', self.target)

View File

@ -317,3 +317,46 @@ class ProjectReaderTests(ProjectMemberTests):
policy.enforce,
self.context, 'delete_floatingip_port_forwarding',
self.alt_target)
class ServiceRoleTests(FloatingipPortForwardingAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_fip_pf(self):
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_floatingip_port_forwarding',
self.target)
def test_get_fip_pf(self):
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_floatingip_port_forwarding',
self.target)
def test_update_fip_pf(self):
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_floatingip_port_forwarding',
self.target)
def test_delete_fip_pf(self):
with mock.patch.object(self.plugin_mock, 'get_floatingip',
return_value=self.fip):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_floatingip_port_forwarding',
self.target)

View File

@ -224,3 +224,34 @@ class ProjectReaderTests(ProjectMemberTests):
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_router_conntrack_helper', self.alt_target)
class ServiceRoleTests(L3ConntrackHelperAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_router_conntrack_helper(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_router_conntrack_helper', self.target)
def test_get_router_conntrack_helper(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_router_conntrack_helper', self.target)
def test_update_router_conntrack_helper(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_router_conntrack_helper', self.target)
def test_delete_router_conntrack_helper(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_router_conntrack_helper', self.target)

View File

@ -182,3 +182,30 @@ class ProjectReaderTests(LocalIPAPITestCase):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, "delete_local_ip", self.alt_target)
class ServiceRoleTests(LocalIPAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_local_ip(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, "create_local_ip", self.target)
def test_get_local_ip(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, "get_local_ip", self.target)
def test_update_local_ip(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, "update_local_ip", self.target)
def test_delete_local_ip(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, "delete_local_ip", self.target)

View File

@ -209,3 +209,31 @@ class ProjectReaderTests(ProjectMemberTests):
policy.enforce,
self.context, 'delete_local_ip_port_association',
self.alt_target)
class ServiceRoleTests(LocalIPAssociationAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_local_ip_port_association(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_local_ip_port_association',
self.target)
def test_get_local_ip_port_association(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_local_ip_port_association',
self.target)
def test_delete_local_ip_port_association(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_local_ip_port_association',
self.target)

View File

@ -136,3 +136,35 @@ class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(LoggingAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_get_loggable_resource(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_loggable_resource', self.target)
def test_create_log(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_log', self.target)
def test_get_log(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_log', self.target)
def test_update_log(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_log', self.target)
def test_delete_log(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_log', self.target)

View File

@ -229,3 +229,46 @@ class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(MeteringAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_metering_label(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_metering_label', self.target)
def test_get_metering_label(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_metering_label', self.target)
def test_delete_metering_label(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_metering_label', self.target)
def test_create_metering_label_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_metering_label_rule', self.target)
def test_get_metering_label_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_metering_label_rule', self.target)
def test_delete_metering_label_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_metering_label_rule', self.target)

View File

@ -190,3 +190,34 @@ class ProjectReaderTests(ProjectMemberTests):
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "delete_ndp_proxy", self.alt_target)
class ServiceRoleTests(NDPProxyAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_ndp_proxy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "create_ndp_proxy", self.target)
def test_get_ndp_proxy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_ndp_proxy", self.target)
def test_update_ndp_proxy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "update_ndp_proxy", self.target)
def test_delete_ndp_proxy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "delete_ndp_proxy", self.target)

View File

@ -813,3 +813,157 @@ class ProjectReaderTests(ProjectMemberTests):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_network', self.alt_target)
class ServiceRoleTests(NetworkAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_network(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_network', self.target)
def test_create_network_shared(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_network:shared', self.target)
def test_create_network_external(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_network:router:external', self.target)
def test_create_network_default(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_network:is_default', self.target)
def test_create_network_port_security_enabled(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_network:port_security_enabled',
self.target)
def test_create_network_segments(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_network:segments', self.target)
def test_create_network_provider_network_type(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_network:provider:network_type', self.target)
def test_create_network_provider_physical_network(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_network:provider:physical_network',
self.target)
def test_create_network_provider_segmentation_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_network:provider:segmentation_id',
self.target)
def test_get_network(self):
self.assertTrue(
policy.enforce(self.context, 'get_network', self.target))
def test_get_network_segments(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_network:segments', self.target)
def test_get_network_provider_network_type(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_network:provider:network_type', self.target)
def test_get_network_provider_physical_network(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_network:provider:physical_network',
self.target)
def test_get_network_provider_segmentation_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_network:provider:segmentation_id',
self.target)
def test_update_network(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_network', self.target)
def test_update_network_segments(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_network:segments', self.target)
def test_update_network_shared(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_network:shared', self.target)
def test_update_network_provider_network_type(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_network:provider:network_type', self.target)
def test_update_network_provider_physical_network(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_network:provider:physical_network',
self.target)
def test_update_network_provider_segmentation_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_network:provider:segmentation_id',
self.target)
def test_update_network_external(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_network:router:external', self.target)
def test_update_network_default(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_network:is_default', self.target)
def test_update_network_port_security_enabled(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_network:port_security_enabled',
self.target)
def test_delete_network(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_network', self.target)

View File

@ -83,3 +83,16 @@ class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(NetworkIPAvailabilityAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_get_network_ip_availability(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_network_ip_availability', self.target)

View File

@ -134,3 +134,34 @@ class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(NetworkSegmentRangeAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_network_segment_range(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_network_segment_range', self.target)
def test_get_network_segment_range(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_network_segment_range', self.target)
def test_update_network_segment_range(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_network_segment_range', self.target)
def test_delete_network_segment_range(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_network_segment_range', self.target)

View File

@ -1170,3 +1170,202 @@ class ProjectReaderTests(ProjectMemberTests):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_port', self.alt_target)
class ServiceRoleTests(PortAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_port(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port', self.target)
def test_create_port_with_device_owner(self):
self.assertTrue(
policy.enforce(
self.context, 'create_port:device_owner', self.target))
def test_create_port_with_mac_address(self):
self.assertTrue(
policy.enforce(
self.context, 'create_port:mac_address', self.target))
def test_create_port_with_fixed_ips(self):
self.assertTrue(
policy.enforce(
self.context, 'create_port:fixed_ips', self.target))
def test_create_port_with_fixed_ips_and_ip_address(self):
self.assertTrue(
policy.enforce(
self.context, 'create_port:fixed_ips:ip_address', self.target))
def test_create_port_with_fixed_ips_and_subnet_id(self):
self.assertTrue(
policy.enforce(
self.context, 'create_port:fixed_ips:subnet_id', self.target))
def test_create_port_with_port_security_enabled(self):
self.assertTrue(
policy.enforce(
self.context, 'create_port:port_security_enabled',
self.target))
def test_create_port_with_binding_host_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:host_id',
self.target)
def test_create_port_with_binding_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:profile',
self.target)
def test_create_port_with_binding_vnic_type(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_port:binding:vnic_type',
self.target)
def test_create_port_with_allowed_address_pairs(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs',
self.target)
def test_create_port_with_allowed_address_pairs_and_mac_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs:mac_address',
self.alt_target)
def test_create_port_with_allowed_address_pairs_and_ip_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_port:allowed_address_pairs:ip_address',
self.target)
def test_get_port(self):
self.assertTrue(
policy.enforce(self.context, 'get_port', self.target))
def test_get_port_binding_vif_type(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:binding:vif_type',
self.target)
def test_get_port_binding_vif_details(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:binding:vif_details',
self.target)
def test_get_port_binding_host_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:binding:host_id',
self.target)
def test_get_port_binding_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:binding:profile',
self.target)
def test_get_port_resource_request(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_port:resource_request',
self.target)
def test_update_port(self):
self.assertTrue(
policy.enforce(self.context, 'update_port', self.target))
def test_update_port_with_device_owner(self):
self.assertTrue(
policy.enforce(
self.context, 'update_port:device_owner', self.target))
def test_update_port_with_mac_address(self):
self.assertTrue(
policy.enforce(
self.context, 'update_port:mac_address', self.target))
def test_update_port_with_fixed_ips(self):
self.assertTrue(
policy.enforce(
self.context, 'update_port:fixed_ips', self.target))
def test_update_port_with_fixed_ips_and_ip_address(self):
self.assertTrue(
policy.enforce(
self.context, 'update_port:fixed_ips:ip_address', self.target))
def test_update_port_with_fixed_ips_and_subnet_id(self):
self.assertTrue(
policy.enforce(
self.context, 'update_port:fixed_ips:subnet_id', self.target))
def test_update_port_with_port_security_enabled(self):
self.assertTrue(
policy.enforce(
self.context, 'update_port:port_security_enabled',
self.target))
def test_update_port_with_binding_host_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:host_id',
self.target)
def test_update_port_with_binding_profile(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_port:binding:profile',
self.target)
def test_update_port_with_binding_vnic_type(self):
self.assertTrue(
policy.enforce(
self.context, 'update_port:binding:vnic_type', self.target))
def test_update_port_with_allowed_address_pairs(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:allowed_address_pairs',
self.target)
def test_update_port_with_allowed_address_pairs_and_mac_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:allowed_address_pairs:mac_address',
self.target)
def test_update_port_with_allowed_address_pairs_and_ip_address(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:allowed_address_pairs:ip_address',
self.target)
def test_update_port_data_plane_status(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_port:data_plane_status', self.target)
def test_delete_port(self):
self.assertTrue(
policy.enforce(self.context, 'delete_port', self.target))

View File

@ -81,31 +81,6 @@ class AdminTests(PortBindingsAPITestCase):
self.assertTrue(
policy.enforce(self.context, "get_port_binding", self.target))
def test_create_port_binding(self):
self.assertTrue(
policy.enforce(self.context, "create_port_binding", self.target))
def test_delete_port_binding(self):
self.assertTrue(
policy.enforce(self.context, "delete_port_binding", self.target))
def test_activate_port_binding(self):
self.assertTrue(
policy.enforce(self.context, "activate", self.target))
class ProjectMemberTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
def test_get_port_binding(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_port_binding", self.target)
def test_create_port_binding(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
@ -125,8 +100,44 @@ class ProjectMemberTests(AdminTests):
self.context, "activate", self.target)
class ProjectMemberTests(AdminTests):
def setUp(self):
super(ProjectMemberTests, self).setUp()
self.context = self.project_member_ctx
def test_get_port_binding(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, "get_port_binding", self.target)
class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(PortBindingsAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_get_port_binding(self):
self.assertTrue(
policy.enforce(self.context, "get_port_binding", self.target))
def test_create_port_binding(self):
self.assertTrue(
policy.enforce(self.context, "create_port_binding", self.target))
def test_delete_port_binding(self):
self.assertTrue(
policy.enforce(self.context, "delete_port_binding", self.target))
def test_activate_port_binding(self):
self.assertTrue(
policy.enforce(self.context, "activate", self.target))

View File

@ -159,6 +159,33 @@ class ProjectReaderQosPolicyTests(ProjectMemberQosPolicyTests):
self.context = self.project_reader_ctx
class ServiceRoleQosPolicyTests(QosPolicyAPITestCase):
def setUp(self):
super(ServiceRoleQosPolicyTests, self).setUp()
self.context = self.service_ctx
def test_get_policy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'get_policy', self.target)
def test_create_policy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'create_policy', self.target)
def test_update_policy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'update_policy', self.target)
def test_delete_policy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce, self.context, 'delete_policy', self.target)
class QosRuleTypeAPITestCase(base.PolicyBaseTestCase):
def setUp(self):
@ -218,6 +245,19 @@ class ProjectReaderQosRuleTypeTests(ProjectMemberQosRuleTypeTests):
self.context = self.project_reader_ctx
class ServiceRoleQosRuleTypeTests(QosRuleTypeAPITestCase):
def setUp(self):
super(ServiceRoleQosRuleTypeTests, self).setUp()
self.context = self.service_ctx
def test_get_rule_type(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_rule_type', self.target)
class QosRulesAPITestCase(base.PolicyBaseTestCase):
def setUp(self):
@ -541,6 +581,63 @@ class ProjectReaderQosBandwidthLimitRuleTests(
self.context = self.project_reader_ctx
class ServiceRoleQosBandwidthLimitRuleTests(QosRulesAPITestCase):
def setUp(self):
super(ServiceRoleQosBandwidthLimitRuleTests, self).setUp()
self.context = self.service_ctx
def test_get_policy_bandwidth_limit_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_policy_bandwidth_limit_rule',
self.target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_alias_bandwidth_limit_rule',
self.target)
def test_create_policy_bandwidth_limit_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_policy_bandwidth_limit_rule',
self.target)
def test_update_policy_bandwidth_limit_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_policy_bandwidth_limit_rule',
self.target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_alias_bandwidth_limit_rule',
self.target)
def test_delete_policy_bandwidth_limit_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_policy_bandwidth_limit_rule',
self.target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_alias_bandwidth_limit_rule',
self.target)
class SystemAdminQosPacketRateLimitRuleTests(QosRulesAPITestCase):
def setUp(self):
@ -716,6 +813,41 @@ class ProjectReaderQosPacketRateLimitRuleTests(
self.context = self.project_reader_ctx
class ServiceRoleQosPacketRateLimitRuleTests(QosRulesAPITestCase):
def setUp(self):
super(ServiceRoleQosPacketRateLimitRuleTests, self).setUp()
self.context = self.service_ctx
def test_get_policy_packet_rate_limit_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_policy_packet_rate_limit_rule',
self.target)
def test_create_policy_packet_rate_limit_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_policy_packet_rate_limit_rule',
self.target)
def test_update_policy_packet_rate_limit_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_policy_packet_rate_limit_rule',
self.target)
def test_delete_policy_packet_rate_limit_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_policy_packet_rate_limit_rule',
self.target)
class SystemAdminQosDSCPMarkingRuleTests(QosRulesAPITestCase):
def setUp(self):
@ -1013,6 +1145,64 @@ class ProjectReaderQosDSCPMarkingRuleTests(
self.context = self.project_reader_ctx
class ServiceRoleQosDSCPMarkingRuleTests(QosRulesAPITestCase):
def setUp(self):
super(ServiceRoleQosDSCPMarkingRuleTests, self).setUp()
self.context = self.service_ctx
def test_get_policy_dscp_marking_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_policy_dscp_marking_rule',
self.target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_alias_dscp_marking_rule',
self.target)
def test_create_policy_dscp_marking_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_policy_dscp_marking_rule',
self.target)
def test_update_policy_dscp_marking_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_policy_dscp_marking_rule',
self.target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_alias_dscp_marking_rule',
self.target)
def test_delete_policy_dscp_marking_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_policy_dscp_marking_rule',
self.target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_alias_dscp_marking_rule',
self.target)
class SystemAdminQosMinimumBandwidthRuleTests(QosRulesAPITestCase):
def setUp(self):
@ -1310,6 +1500,63 @@ class ProjectReaderQosMinimumBandwidthRuleTests(
self.context = self.project_reader_ctx
class ServiceRoleQosMinimumBandwidthRuleTests(QosRulesAPITestCase):
def setUp(self):
super(ServiceRoleQosMinimumBandwidthRuleTests, self).setUp()
self.context = self.service_ctx
def test_get_policy_minimum_bandwidth_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_policy_minimum_bandwidth_rule',
self.target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_alias_minimum_bandwidth_rule',
self.target)
def test_create_policy_minimum_bandwidth_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_policy_minimum_bandwidth_rule',
self.target)
def test_update_policy_minimum_bandwidth_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_policy_minimum_bandwidth_rule',
self.target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_alias_minimum_bandwidth_rule',
self.target)
def test_delete_policy_minimum_bandwidth_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_policy_minimum_bandwidth_rule',
self.target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_alias_minimum_bandwidth_rule',
self.target)
class SystemAdminQosMinimumPacketRateRuleTests(QosRulesAPITestCase):
def setUp(self):
@ -1595,3 +1842,60 @@ class ProjectReaderQosMinimumPacketRateRuleTests(
def setUp(self):
super(ProjectReaderQosMinimumPacketRateRuleTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleQosMinimumPacketRateRuleTests(QosRulesAPITestCase):
def setUp(self):
super(ServiceRoleQosMinimumPacketRateRuleTests, self).setUp()
self.context = self.service_ctx
def test_get_policy_minimum_packet_rate_rule(self):
with mock.patch.object(self.plugin_mock, "get_policy",
return_value=self.qos_policy):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_policy_minimum_packet_rate_rule',
self.target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_alias_minimum_packet_rate_rule',
self.target)
def test_create_policy_minimum_packet_rate_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_policy_minimum_packet_rate_rule',
self.target)
def test_update_policy_minimum_packet_rate_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_policy_minimum_packet_rate_rule',
self.target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_alias_minimum_packet_rate_rule',
self.target)
def test_delete_policy_minimum_packet_rate_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_policy_minimum_packet_rate_rule',
self.target)
# And the same for aliases
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_alias_minimum_packet_rate_rule',
self.target)

View File

@ -145,3 +145,28 @@ class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(QuoatsAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_get_quota(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_quota', self.target)
def test_update_quota(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_quota', self.target)
def test_delete_quota(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_quota', self.target)

View File

@ -268,3 +268,42 @@ class ProjectReaderTests(ProjectMemberTests):
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_rbac_policy', self.alt_target)
class ServiceRoleTests(RbacAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_rbac_policy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_rbac_policy', self.target)
def test_create_rbac_policy_target_tenant(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_rbac_policy:target_tenant',
self.wildcard_target)
def test_update_rbac_policy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_rbac_policy', self.target)
def test_update_rbac_policy_target_tenant(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_rbac_policy:target_tenant',
self.wildcard_target)
def test_get_rbac_policy(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_rbac_policy', self.target)

View File

@ -987,3 +987,140 @@ class ProjectReaderExtrarouteTests(ProjectMemberExtrarouteTests):
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'remove_extraroutes', self.alt_target)
class ServiceRoleTests(RouterAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_router(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_router', self.target)
def test_create_router_distributed(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_router:distributed', self.target)
def test_create_router_ha(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_router:ha', self.target)
def test_create_router_external_gateway_info(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_router:external_gateway_info',
self.target)
def test_create_router_external_gateway_info_network_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_router:external_gateway_info:network_id',
self.target)
def test_create_router_external_gateway_info_enable_snat(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_router:external_gateway_info:enable_snat',
self.target)
def test_create_router_external_gateway_info_external_fixed_ips(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context,
'create_router:external_gateway_info:external_fixed_ips',
self.target)
def test_get_router(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_router', self.target)
def test_get_router_distributed(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_router:distributed', self.target)
def test_get_router_ha(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_router:ha', self.target)
def test_update_router(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_router', self.target)
def test_update_router_distributed(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_router:distributed', self.target)
def test_update_router_ha(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_router:ha', self.target)
def test_update_router_external_gateway_info(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_router:external_gateway_info',
self.target)
def test_update_router_external_gateway_info_network_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_router:external_gateway_info:network_id',
self.target)
def test_update_router_external_gateway_info_enable_snat(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_router:external_gateway_info:enable_snat',
self.target)
def test_update_router_external_gateway_info_external_fixed_ips(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context,
'update_router:external_gateway_info:external_fixed_ips',
self.target)
def test_delete_router(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_router', self.target)
def test_add_router_interface(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'add_router_interface', self.target)
def test_remove_router_interface(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'remove_router_interface', self.target)

View File

@ -202,6 +202,37 @@ class ProjectReaderSecurityGroupTests(ProjectMemberSecurityGroupTests):
self.context, 'delete_security_group', self.alt_target)
class ServiceRoleSecurityGroupTests(SecurityGroupAPITestCase):
def setUp(self):
super(ServiceRoleSecurityGroupTests, self).setUp()
self.context = self.service_ctx
def test_create_security_group(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_security_group', self.target)
def test_get_security_group(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_security_group', self.target)
def test_update_security_group(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_security_group', self.target)
def test_delete_security_group(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_security_group', self.target)
class SecurityGroupRuleAPITestCase(base.PolicyBaseTestCase):
def setUp(self):
@ -378,3 +409,28 @@ class ProjectReaderSecurityGroupRuleTests(ProjectMemberSecurityGroupRuleTests):
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_security_group_rule', self.alt_target)
class ServiceRoleSecurityGroupRuleTests(SecurityGroupRuleAPITestCase):
def setUp(self):
super(ServiceRoleSecurityGroupRuleTests, self).setUp()
self.context = self.service_ctx
def test_create_security_group_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_security_group_rule', self.target)
def test_get_security_group_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_security_group_rule', self.target)
def test_delete_security_group_rule(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_security_group_rule', self.target)

View File

@ -130,3 +130,34 @@ class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(SegmentAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_segment(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_segment', self.target)
def test_get_segment(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_segment', self.target)
def test_update_segment(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_segment', self.target)
def test_delete_segment(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_segment', self.target)

View File

@ -76,3 +76,16 @@ class ProjectReaderTests(ProjectMemberTests):
def setUp(self):
super(ProjectReaderTests, self).setUp()
self.context = self.project_reader_ctx
class ServiceRoleTests(ServiceTypeAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_get_service_provider(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_service_provider', self.target)

View File

@ -366,3 +366,64 @@ class ProjectReaderTests(ProjectMemberTests):
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_subnet', self.alt_target)
class ServiceRoleTests(SubnetAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_subnet(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_subnet', self.target)
def test_create_subnet_segment_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_subnet:segment_id', self.target)
def test_create_subnet_service_types(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_subnet:service_types', self.target)
def test_get_subnet(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_subnet', self.target)
def test_get_subnet_segment_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_subnet:segment_id', self.target)
def test_update_subnet(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnet', self.target)
def test_update_subnet_segment_id(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnet:segment_id', self.target)
def test_update_subnet_service_types(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnet:service_types', self.target)
def test_delete_subnet(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_subnet', self.target)

View File

@ -382,3 +382,70 @@ class ProjectReaderTests(ProjectMemberTests):
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'remove_prefixes', self.alt_target)
class ServiceRoleTests(SubnetpoolAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_subnetpool(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_subnetpool', self.target)
def test_create_subnetpool_shared(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_subnetpool:shared', self.target)
def test_create_subnetpool_default(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_subnetpool:is_default', self.target)
def test_get_subnetpool(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_subnetpool', self.target)
def test_update_subnetpool(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnetpool', self.target)
def test_update_subnetpool_default(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_subnetpool:is_default', self.target)
def test_delete_subnetpool(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_subnetpool', self.target)
def test_onboard_network_subnets(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'onboard_network_subnets', self.target)
def test_add_prefixes(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'add_prefixes', self.target)
def test_remove_prefixes(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'remove_prefixes', self.target)

View File

@ -285,3 +285,52 @@ class ProjectReaderTests(ProjectMemberTests):
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'remove_subports', self.alt_target)
class ServiceRoleTests(TrunkAPITestCase):
def setUp(self):
super(ServiceRoleTests, self).setUp()
self.context = self.service_ctx
def test_create_trunk(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'create_trunk', self.target)
def test_get_trunk(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_trunk', self.target)
def test_update_trunk(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'update_trunk', self.target)
def test_delete_trunk(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'delete_trunk', self.target)
def test_get_subports(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'get_subports', self.target)
def test_add_subports(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'add_subports', self.target)
def test_remove_subports(self):
self.assertRaises(
base_policy.PolicyNotAuthorized,
policy.enforce,
self.context, 'remove_subports', self.target)

View File

@ -260,6 +260,15 @@ class NeutronDbPluginV2TestCase(testlib_api.WebTestCase):
roles=['admin', 'member', 'reader'])
return req
def _service_req(self, method, resource, data=None, fmt=None, id=None,
params=None, action=None, subresource=None, sub_id=None,
ctx=None, headers=None):
req = self._req(method, resource, data, fmt, id, params, action,
subresource, sub_id, ctx, headers)
req.environ['neutron.context'] = context.Context(
'service-user', 'service-project', roles=['service'])
return req
def _member_req(self, method, resource, data=None, fmt=None, id=None,
params=None, action=None, subresource=None, sub_id=None,
ctx=None, headers=None, tenant_id=None):

View File

@ -372,7 +372,9 @@ class ExtendedPortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
data['binding'].update(kwargs)
binding_resource = 'ports/%s/bindings' % port_id
binding_req = self.new_create_request(
binding_resource, data, fmt, as_admin=True)
binding_resource, data, fmt)
binding_req.environ['neutron.context'] = context.Context(
'service', 'service', roles=['service'])
return binding_req.get_response(self.api)
def _make_port_binding(self, fmt, port_id, host, **kwargs):
@ -396,7 +398,8 @@ class ExtendedPortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
return self.deserialize(fmt, res)
def _activate_port_binding(self, port_id, host, raw_response=True):
response = self._req('PUT', 'ports', id=port_id,
response = self._service_req(
'PUT', 'ports', id=port_id,
data={'port_id': port_id},
subresource='bindings', sub_id=host,
action='activate').get_response(self.api)
@ -410,20 +413,20 @@ class ExtendedPortBindingTestCase(test_plugin.NeutronDbPluginV2TestCase):
return self.deserialize(self.fmt, response)
def _list_port_bindings(self, port_id, params=None, raw_response=True):
response = self._req(
response = self._service_req(
'GET', 'ports', fmt=self.fmt, id=port_id, subresource='bindings',
params=params).get_response(self.api)
return self._check_code_and_serialize(response, raw_response)
def _show_port_binding(self, port_id, host, params=None,
raw_response=True):
response = self._req(
response = self._service_req(
'GET', 'ports', fmt=self.fmt, id=port_id, subresource='bindings',
sub_id=host, params=params).get_response(self.api)
return self._check_code_and_serialize(response, raw_response)
def _delete_port_binding(self, port_id, host):
response = self._req(
response = self._service_req(
'DELETE', 'ports', fmt=self.fmt, id=port_id,
subresource='bindings', sub_id=host).get_response(self.api)
return response

View File

@ -0,0 +1,10 @@
---
features:
- |
Support for new ``service`` role is added to the Neutron API policies as
part of the Secure-RBAC initiative. This new role is designed to be used for
the service-to-service communication.
deprecations:
- |
Old role ``advsvc`` used in the Neutron API policies is now deprecated. New
``service`` role should be used for service-to-service communication.

View File

@ -20,7 +20,7 @@ Jinja2>=2.10 # BSD License (3 clause)
keystonemiddleware>=5.1.0 # Apache-2.0
netaddr>=0.7.18 # BSD
netifaces>=0.10.4 # MIT
neutron-lib>=3.7.0 # Apache-2.0
neutron-lib>=3.8.0 # Apache-2.0
python-neutronclient>=7.8.0 # Apache-2.0
tenacity>=6.0.0 # Apache-2.0
SQLAlchemy>=1.4.23 # MIT