diff --git a/neutron/conf/policies/address_scope.py b/neutron/conf/policies/address_scope.py
index e30465d8f99..bc55beab820 100644
--- a/neutron/conf/policies/address_scope.py
+++ b/neutron/conf/policies/address_scope.py
@@ -31,7 +31,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_address_scope',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 description='Create an address scope',
 operations=[
 {
@@ -48,7 +50,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_address_scope:shared',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 description='Create a shared address scope',
 operations=[
 {
@@ -65,8 +67,10 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_address_scope',
- check_str=base.policy_or(base.PROJECT_READER,
- 'rule:shared_address_scopes'),
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_READER,
+ 'rule:shared_address_scopes'),
 description='Get an address scope',
 operations=[
 {
@@ -88,7 +92,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_address_scope',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 description='Update an address scope',
 operations=[
 {
@@ -105,7 +111,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_address_scope:shared',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 description='Update ``shared`` attribute of an address scope',
 operations=[
 {
@@ -122,7 +128,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='delete_address_scope',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 description='Delete an address scope',
 operations=[
 {
diff --git a/neutron/conf/policies/floatingip.py b/neutron/conf/policies/floatingip.py
index bd26c0668f6..85bbb6d69df 100644
--- a/neutron/conf/policies/floatingip.py
+++ b/neutron/conf/policies/floatingip.py
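Review bot: The expression `base.policy_or(base.ADMIN, base.PROJECT_MEMBER)` is written out in full at `create_address_scope`, again at `update_address_scope` and `delete_address_scope`, and in the member- and reader-level rules of every other file in this diff. Named constants next to `policy_or` in the base module, for example `ADMIN_OR_PROJECT_MEMBER = policy_or(ADMIN, PROJECT_MEMBER)` and `ADMIN_OR_PROJECT_READER = policy_or(ADMIN, PROJECT_READER)`, would keep these defaults identical by construction and make a rule that lacks the admin branch stand out when the policies are audited. The rule definition continues outside the hunk.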
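Review bot: In `create_address_scope:shared` the default moves from `base.PROJECT_ADMIN` to `base.ADMIN`, and neither constant is defined in this diff; if `ADMIN` carries no project-ownership match (the removed name suggests `PROJECT_ADMIN` did), the admin-only rules no longer check which project owns the target resource. The same substitution is made in `update_address_scope:shared`, the `provider:*`, `segments`, `shared` and `router:external` network rules, the `binding:*` port rules, the `target_tenant` RBAC rules and every QoS write rule. A comment above `rules` such as `# base.ADMIN is not tied to the resource's project; these defaults apply across projects` (wording to be confirmed against the base module), together with a release note, would tell operators who assign the admin role on a single project what the new defaults grant.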
@@ -25,7 +25,9 @@ DEPRECATION_REASON = (
 rules = [
 policy.DocumentedRuleDefault(
 name='create_floatingip',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 description='Create a floating IP',
 operations=[
 {
@@ -42,7 +44,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_floatingip:floating_ip_address',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 description='Create a floating IP with a specific IP address',
 operations=[
 {
@@ -59,7 +61,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_floatingip',
- check_str=base.PROJECT_READER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_READER),
 description='Get a floating IP',
 operations=[
 {
@@ -80,7 +84,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_floatingip',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 description='Update a floating IP',
 operations=[
 {
@@ -97,7 +103,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='delete_floatingip',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 description='Delete a floating IP',
 operations=[
 {
diff --git a/neutron/conf/policies/metering.py b/neutron/conf/policies/metering.py
index fa9e4a033ae..612713ed1c6 100644
--- a/neutron/conf/policies/metering.py
+++ b/neutron/conf/policies/metering.py
@@ -29,7 +29,7 @@ RULE_RESOURCE_PATH = '/metering/metering-label-rules/{id}'
 rules = [
 policy.DocumentedRuleDefault(
 name='create_metering_label',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Create a metering label',
 operations=[
@@ -46,7 +46,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_metering_label',
- check_str=base.PROJECT_READER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_READER),
 scope_types=['project'],
 description='Get a metering label',
 operations=[
@@ -67,7 +69,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='delete_metering_label', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Delete a metering label', operations=[ @@ -84,7 +86,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='create_metering_label_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Create a metering label rule', operations=[ @@ -101,7 +103,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_metering_label_rule', - check_str=base.PROJECT_READER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_READER), scope_types=['project'], description='Get a metering label rule', operations=[ @@ -122,7 +126,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='delete_metering_label_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Delete a metering label rule', operations=[ diff --git a/neutron/conf/policies/network.py b/neutron/conf/policies/network.py index ad05ed85969..65d2c85744b 100644 --- a/neutron/conf/policies/network.py +++ b/neutron/conf/policies/network.py @@ -45,7 +45,9 @@ rules = [ policy.DocumentedRuleDefault( name='create_network', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Create a network', operations=ACTION_POST, @@ -57,7 +59,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='create_network:shared', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Create a shared network', operations=ACTION_POST, @@ -69,7 +71,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='create_network:router:external', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Create an external network', operations=ACTION_POST, 
@@ -81,7 +83,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_network:is_default',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Specify ``is_default`` attribute when creating a network',
 operations=ACTION_POST,
@@ -93,7 +95,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_network:port_security_enabled',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 scope_types=['project'],
 description=(
 'Specify ``port_security_enabled`` '
@@ -108,7 +112,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_network:segments',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Specify ``segments`` attribute when creating a network',
 operations=ACTION_POST,
@@ -120,7 +124,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_network:provider:network_type',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description=(
 'Specify ``provider:network_type`` '
@@ -135,7 +139,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_network:provider:physical_network',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description=(
 'Specify ``provider:physical_network`` '
@@ -150,7 +154,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_network:provider:segmentation_id',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description=(
 'Specify ``provider:segmentation_id`` when creating a network'
@@ -166,6 +170,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='get_network',
 check_str=base.policy_or(
+ base.ADMIN,
 base.PROJECT_READER,
 'rule:shared',
 'rule:external',
@@ -186,7 +191,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_network:router:external', - check_str=base.PROJECT_READER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_READER), scope_types=['project'], description='Get ``router:external`` attribute of a network', operations=ACTION_GET, @@ -198,7 +205,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_network:segments', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Get ``segments`` attribute of a network', operations=ACTION_GET, @@ -210,7 +217,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_network:provider:network_type', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Get ``provider:network_type`` attribute of a network', operations=ACTION_GET, @@ -222,7 +229,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_network:provider:physical_network', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Get ``provider:physical_network`` attribute of a network', operations=ACTION_GET, @@ -234,7 +241,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_network:provider:segmentation_id', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Get ``provider:segmentation_id`` attribute of a network', operations=ACTION_GET, @@ -247,7 +254,9 @@ rules = [ policy.DocumentedRuleDefault( name='update_network', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Update a network', operations=ACTION_PUT, @@ -259,7 +268,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_network:segments', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Update ``segments`` attribute of a network', operations=ACTION_PUT, 
@@ -271,7 +280,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_network:shared',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Update ``shared`` attribute of a network',
 operations=ACTION_PUT,
@@ -283,7 +292,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_network:provider:network_type',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Update ``provider:network_type`` attribute of a network',
 operations=ACTION_PUT,
@@ -295,7 +304,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_network:provider:physical_network',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description=(
 'Update ``provider:physical_network`` '
@@ -310,7 +319,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_network:provider:segmentation_id',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description=(
 'Update ``provider:segmentation_id`` '
@@ -325,7 +334,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_network:router:external',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Update ``router:external`` attribute of a network',
 operations=ACTION_PUT,
@@ -337,7 +346,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_network:is_default',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Update ``is_default`` attribute of a network',
 operations=ACTION_PUT,
@@ -349,7 +358,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_network:port_security_enabled',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 scope_types=['project'],
 description='Update ``port_security_enabled`` attribute of a network',
 operations=ACTION_PUT,
@@ -362,7 +373,9 @@ rules = [ policy.DocumentedRuleDefault( name='delete_network', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Delete a network', operations=ACTION_DELETE, diff --git a/neutron/conf/policies/port.py b/neutron/conf/policies/port.py index 5b9d20c9905..f928180b07e 100644 --- a/neutron/conf/policies/port.py +++ b/neutron/conf/policies/port.py @@ -51,7 +51,9 @@ rules = [ policy.DocumentedRuleDefault( name='create_port', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Create a port', operations=ACTION_POST, @@ -65,7 +67,7 @@ rules = [ name='create_port:device_owner', check_str=base.policy_or( 'not rule:network_device', - base.PROJECT_ADMIN, + base.ADMIN, base.RULE_ADVSVC, base.RULE_NET_OWNER ), @@ -86,7 +88,7 @@ rules = [ check_str=base.policy_or( base.RULE_ADVSVC, base.RULE_NET_OWNER, - base.PROJECT_ADMIN), + base.ADMIN), scope_types=['project'], description='Specify ``mac_address`` attribute when creating a port', operations=ACTION_POST, @@ -103,7 +105,7 @@ rules = [ check_str=base.policy_or( base.RULE_ADVSVC, base.RULE_NET_OWNER, - base.PROJECT_ADMIN, + base.ADMIN, 'rule:shared'), scope_types=['project'], description='Specify ``fixed_ips`` information when creating a port', @@ -122,7 +124,7 @@ rules = [ check_str=base.policy_or( base.RULE_ADVSVC, base.RULE_NET_OWNER, - base.PROJECT_ADMIN), + base.ADMIN), scope_types=['project'], description='Specify IP address in ``fixed_ips`` when creating a port', operations=ACTION_POST, @@ -139,7 +141,7 @@ rules = [ check_str=base.policy_or( base.RULE_ADVSVC, base.RULE_NET_OWNER, - base.PROJECT_ADMIN, + base.ADMIN, 'rule:shared'), scope_types=['project'], description='Specify subnet ID in ``fixed_ips`` when creating a port', 
@@ -158,7 +160,7 @@ rules = [
 check_str=base.policy_or(
 base.RULE_ADVSVC,
 base.RULE_NET_OWNER,
- base.PROJECT_ADMIN),
+ base.ADMIN),
 scope_types=['project'],
 description=(
 'Specify ``port_security_enabled`` '
@@ -175,7 +177,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_port:binding:host_id',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description=(
 'Specify ``binding:host_id`` '
@@ -190,7 +192,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_port:binding:profile',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description=(
 'Specify ``binding:profile`` attribute '
@@ -205,7 +207,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_port:binding:vnic_type',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 scope_types=['project'],
 description=(
 'Specify ``binding:vnic_type`` '
@@ -221,7 +225,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='create_port:allowed_address_pairs',
 check_str=base.policy_or(
- base.PROJECT_ADMIN,
+ base.ADMIN,
 base.RULE_NET_OWNER),
 scope_types=['project'],
 description=(
@@ -238,7 +242,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='create_port:allowed_address_pairs:mac_address',
 check_str=base.policy_or(
- base.PROJECT_ADMIN,
+ base.ADMIN,
 base.RULE_NET_OWNER),
 scope_types=['project'],
 description=(
@@ -255,7 +259,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='create_port:allowed_address_pairs:ip_address',
 check_str=base.policy_or(
- base.PROJECT_ADMIN,
+ base.ADMIN,
 base.RULE_NET_OWNER),
 scope_types=['project'],
 description=(
@@ -273,6 +277,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='get_port',
 check_str=base.policy_or(
+ base.ADMIN,
 base.RULE_ADVSVC,
 base.PROJECT_READER
 ),
@@ -289,7 +294,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_port:binding:vif_type',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Get ``binding:vif_type`` attribute of a port',
 operations=ACTION_GET,
@@ -301,7 +306,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_port:binding:vif_details',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Get ``binding:vif_details`` attribute of a port',
 operations=ACTION_GET,
@@ -313,7 +318,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_port:binding:host_id',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Get ``binding:host_id`` attribute of a port',
 operations=ACTION_GET,
@@ -325,7 +330,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_port:binding:profile',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Get ``binding:profile`` attribute of a port',
 operations=ACTION_GET,
@@ -337,7 +342,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_port:resource_request',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Get ``resource_request`` attribute of a port',
 operations=ACTION_GET,
@@ -353,6 +358,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='update_port',
 check_str=base.policy_or(
+ base.ADMIN,
 base.PROJECT_MEMBER,
 base.RULE_ADVSVC
 ),
@@ -373,7 +379,7 @@ rules = [
 'not rule:network_device',
 base.RULE_ADVSVC,
 base.RULE_NET_OWNER,
- base.PROJECT_ADMIN
+ base.ADMIN
 ),
 scope_types=['project'],
 description='Update ``device_owner`` attribute of a port',
@@ -390,7 +396,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='update_port:mac_address',
 check_str=base.policy_or(
- base.PROJECT_ADMIN,
+ base.ADMIN,
 base.RULE_ADVSVC
 ),
 scope_types=['project'],
@@ -409,7 +415,7 @@ rules = [
 check_str=base.policy_or(
 base.RULE_ADVSVC,
 base.RULE_NET_OWNER,
- base.PROJECT_ADMIN
+ base.ADMIN
 ),
 scope_types=['project'],
 description='Specify ``fixed_ips`` information when updating a port',
@@ -427,7 +433,7 @@ rules = [
 check_str=base.policy_or(
 base.RULE_ADVSVC,
 base.RULE_NET_OWNER,
- base.PROJECT_ADMIN
+ base.ADMIN
 ),
 scope_types=['project'],
 description=(
@@ -448,7 +454,7 @@ rules = [
 check_str=base.policy_or(
 base.RULE_ADVSVC,
 base.RULE_NET_OWNER,
- base.PROJECT_ADMIN,
+ base.ADMIN,
 'rule:shared'
 ),
 scope_types=['project'],
@@ -471,7 +477,7 @@ rules = [
 check_str=base.policy_or(
 base.RULE_ADVSVC,
 base.RULE_NET_OWNER,
- base.PROJECT_ADMIN
+ base.ADMIN
 ),
 scope_types=['project'],
 description='Update ``port_security_enabled`` attribute of a port',
@@ -486,7 +492,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_port:binding:host_id',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Update ``binding:host_id`` attribute of a port',
 operations=ACTION_PUT,
@@ -498,7 +504,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_port:binding:profile',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Update ``binding:profile`` attribute of a port',
 operations=ACTION_PUT,
@@ -511,6 +517,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='update_port:binding:vnic_type',
 check_str=base.policy_or(
+ base.ADMIN,
 base.PROJECT_MEMBER,
 base.RULE_ADVSVC
 ),
@@ -528,7 +535,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='update_port:allowed_address_pairs',
 check_str=base.policy_or(
- base.PROJECT_ADMIN,
+ base.ADMIN,
 base.RULE_NET_OWNER),
 scope_types=['project'],
 description='Update ``allowed_address_pairs`` attribute of a port',
@@ -542,7 +549,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='update_port:allowed_address_pairs:mac_address',
 check_str=base.policy_or(
- base.PROJECT_ADMIN,
+ base.ADMIN,
 base.RULE_NET_OWNER),
 scope_types=['project'],
 description=(
@@ -559,7 +566,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='update_port:allowed_address_pairs:ip_address',
 check_str=base.policy_or(
- base.PROJECT_ADMIN,
+ base.ADMIN,
 base.RULE_NET_OWNER),
 scope_types=['project'],
 description=(
@@ -576,7 +583,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='update_port:data_plane_status',
 check_str=base.policy_or(
- base.PROJECT_ADMIN,
+ base.ADMIN,
 'role:data_plane_integrator'),
 scope_types=['project'],
 description='Update ``data_plane_status`` attribute of a port',
@@ -591,6 +598,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='delete_port',
 check_str=base.policy_or(
+ base.ADMIN,
 base.RULE_ADVSVC,
 base.PROJECT_MEMBER
 ),
diff --git a/neutron/conf/policies/qos.py b/neutron/conf/policies/qos.py
index 0aa0d47d5de..5b3eae8273a 100644
--- a/neutron/conf/policies/qos.py
+++ b/neutron/conf/policies/qos.py
@@ -23,7 +23,9 @@ The QoS API now supports project scope and default roles.
 rules = [
 policy.DocumentedRuleDefault(
 name='get_policy',
- check_str=base.PROJECT_READER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_READER),
 scope_types=['project'],
 description='Get QoS policies',
 operations=[
@@ -44,7 +46,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_policy',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Create a QoS policy',
 operations=[
@@ -61,7 +63,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_policy',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Update a QoS policy',
 operations=[
@@ -78,7 +80,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='delete_policy', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Delete a QoS policy', operations=[ @@ -118,7 +120,9 @@ rules = [ policy.DocumentedRuleDefault( name='get_policy_bandwidth_limit_rule', - check_str=base.PROJECT_READER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_READER), scope_types=['project'], description='Get a QoS bandwidth limit rule', operations=[ @@ -140,7 +144,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='create_policy_bandwidth_limit_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Create a QoS bandwidth limit rule', operations=[ @@ -157,7 +161,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_policy_bandwidth_limit_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Update a QoS bandwidth limit rule', operations=[ @@ -175,7 +179,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='delete_policy_bandwidth_limit_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Delete a QoS bandwidth limit rule', operations=[ @@ -194,7 +198,9 @@ rules = [ policy.DocumentedRuleDefault( name='get_policy_packet_rate_limit_rule', - check_str=base.PROJECT_READER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_READER), scope_types=['project'], description='Get a QoS packet rate limit rule', operations=[ @@ -211,7 +217,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='create_policy_packet_rate_limit_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Create a QoS packet rate limit rule', operations=[ 
@@ -223,7 +229,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_policy_packet_rate_limit_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Update a QoS packet rate limit rule', operations=[ @@ -236,7 +242,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='delete_policy_packet_rate_limit_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Delete a QoS packet rate limit rule', operations=[ @@ -250,7 +256,9 @@ rules = [ policy.DocumentedRuleDefault( name='get_policy_dscp_marking_rule', - check_str=base.PROJECT_READER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_READER), scope_types=['project'], description='Get a QoS DSCP marking rule', operations=[ @@ -272,7 +280,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='create_policy_dscp_marking_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Create a QoS DSCP marking rule', operations=[ @@ -289,7 +297,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_policy_dscp_marking_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Update a QoS DSCP marking rule', operations=[ @@ -307,7 +315,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='delete_policy_dscp_marking_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Delete a QoS DSCP marking rule', operations=[ @@ -326,7 +334,9 @@ rules = [ policy.DocumentedRuleDefault( name='get_policy_minimum_bandwidth_rule', - check_str=base.PROJECT_READER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_READER), scope_types=['project'], description='Get a QoS minimum bandwidth rule', operations=[ 
@@ -348,7 +358,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_policy_minimum_bandwidth_rule',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Create a QoS minimum bandwidth rule',
 operations=[
@@ -365,7 +375,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_policy_minimum_bandwidth_rule',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Update a QoS minimum bandwidth rule',
 operations=[
@@ -383,7 +393,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='delete_policy_minimum_bandwidth_rule',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Delete a QoS minimum bandwidth rule',
 operations=[
@@ -401,7 +411,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_policy_minimum_packet_rate_rule',
- check_str=base.PROJECT_READER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_READER),
 scope_types=['project'],
 description='Get a QoS minimum packet rate rule',
 operations=[
@@ -418,7 +430,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_policy_minimum_packet_rate_rule',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Create a QoS minimum packet rate rule',
 operations=[
@@ -430,7 +442,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_policy_minimum_packet_rate_rule',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Update a QoS minimum packet rate rule',
 operations=[
@@ -443,7 +455,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='delete_policy_minimum_packet_rate_rule',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Delete a QoS minimum packet rate rule',
 operations=[
@@ -456,7 +468,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_alias_bandwidth_limit_rule',
- check_str=base.PROJECT_READER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_READER),
 scope_types=['project'],
 description='Get a QoS bandwidth limit rule through alias',
 operations=[
@@ -473,7 +487,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_alias_bandwidth_limit_rule',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Update a QoS bandwidth limit rule through alias',
 operations=[
@@ -490,7 +504,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='delete_alias_bandwidth_limit_rule',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Delete a QoS bandwidth limit rule through alias',
 operations=[
@@ -507,7 +521,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_alias_dscp_marking_rule',
- check_str=base.PROJECT_READER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_READER),
 scope_types=['project'],
 description='Get a QoS DSCP marking rule through alias',
 operations=[
@@ -524,7 +540,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_alias_dscp_marking_rule',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Update a QoS DSCP marking rule through alias',
 operations=[
@@ -541,7 +557,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='delete_alias_dscp_marking_rule',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Delete a QoS DSCP marking rule through alias',
 operations=[
@@ -558,7 +574,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_alias_minimum_bandwidth_rule',
- check_str=base.PROJECT_READER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_READER),
 scope_types=['project'],
 description='Get a QoS minimum bandwidth rule through alias',
 operations=[
@@ -575,7 +593,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_alias_minimum_bandwidth_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Update a QoS minimum bandwidth rule through alias', operations=[ @@ -592,7 +610,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='delete_alias_minimum_bandwidth_rule', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Delete a QoS minimum bandwidth rule through alias', operations=[ @@ -610,6 +628,7 @@ rules = [ policy.DocumentedRuleDefault( name='get_alias_minimum_packet_rate_rule', check_str='rule:get_policy_minimum_packet_rate_rule', + scope_types=['project'], description='Get a QoS minimum packet rate rule through alias', operations=[ { @@ -621,6 +640,7 @@ rules = [ policy.DocumentedRuleDefault( name='update_alias_minimum_packet_rate_rule', check_str='rule:update_policy_minimum_packet_rate_rule', + scope_types=['project'], description='Update a QoS minimum packet rate rule through alias', operations=[ { @@ -632,6 +652,7 @@ rules = [ policy.DocumentedRuleDefault( name='delete_alias_minimum_packet_rate_rule', check_str='rule:delete_policy_minimum_packet_rate_rule', + scope_types=['project'], description='Delete a QoS minimum packet rate rule through alias', operations=[ { diff --git a/neutron/conf/policies/rbac.py b/neutron/conf/policies/rbac.py index dae163d31b8..d275166892e 100644 --- a/neutron/conf/policies/rbac.py +++ b/neutron/conf/policies/rbac.py @@ -36,7 +36,9 @@ rules = [ policy.DocumentedRuleDefault( name='create_rbac_policy', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Create an RBAC policy', operations=[ 
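Review bot: `scope_types=['project'],` is added to `get_alias_minimum_packet_rate_rule` here (and to the update and delete alias rules in the next two hunks), but several other rules touched in this diff show no `scope_types` between `check_str` and `description`: every rule in address_scope.py and floatingip.py, and `create_rbac_policy:target_tenant` / `update_rbac_policy:target_tenant`. Those definitions run past their hunks, so the argument may be passed after `operations`; if it is not, they need the same line, because a rule that declares no scope types is presumably not checked against the token's scope. The alias rules delegate through `rule:...`, so their effective check also changes with the `base.ADMIN` substitution made in the rules they point to.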
@@ -56,7 +58,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='create_rbac_policy:target_tenant',
 check_str=base.policy_or(
- base.PROJECT_ADMIN,
+ base.ADMIN,
 '(not field:rbac_policy:target_tenant=* and '
 'not field:rbac_policy:target_project=*)'),
 description='Specify ``target_tenant`` when creating an RBAC policy',
@@ -75,7 +77,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='update_rbac_policy',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 scope_types=['project'],
 description='Update an RBAC policy',
 operations=[
@@ -95,7 +99,7 @@ rules = [
 policy.DocumentedRuleDefault(
 name='update_rbac_policy:target_tenant',
 check_str=base.policy_or(
- base.PROJECT_ADMIN,
+ base.ADMIN,
 '(not field:rbac_policy:target_tenant=* and '
 'not field:rbac_policy:target_project=*)'),
 description='Update ``target_tenant`` attribute of an RBAC policy',
@@ -116,7 +120,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='get_rbac_policy',
- check_str=base.PROJECT_READER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_READER),
 scope_types=['project'],
 description='Get an RBAC policy',
 operations=[
@@ -137,7 +143,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='delete_rbac_policy',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 scope_types=['project'],
 description='Delete an RBAC policy',
 operations=[
diff --git a/neutron/conf/policies/router.py b/neutron/conf/policies/router.py
index 94d2fd0719d..2a56a833fd6 100644
--- a/neutron/conf/policies/router.py
+++ b/neutron/conf/policies/router.py
@@ -39,7 +39,9 @@ ACTION_GET = [
 rules = [
 policy.DocumentedRuleDefault(
 name='create_router',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 scope_types=['project'],
 description='Create a router',
 operations=ACTION_POST,
@@ -51,7 +53,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_router:distributed',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Specify ``distributed`` attribute when creating a router',
 operations=ACTION_POST,
@@ -63,7 +65,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_router:ha',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description='Specify ``ha`` attribute when creating a router',
 operations=ACTION_POST,
@@ -75,7 +77,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_router:external_gateway_info',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 scope_types=['project'],
 description=('Specify ``external_gateway_info`` '
 'information when creating a router'),
@@ -88,7 +92,9 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_router:external_gateway_info:network_id',
- check_str=base.PROJECT_MEMBER,
+ check_str=base.policy_or(
+ base.ADMIN,
+ base.PROJECT_MEMBER),
 scope_types=['project'],
 description=('Specify ``network_id`` in ``external_gateway_info`` '
 'information when creating a router'),
@@ -101,7 +107,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_router:external_gateway_info:enable_snat',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description=('Specify ``enable_snat`` in ``external_gateway_info`` '
 'information when creating a router'),
@@ -114,7 +120,7 @@ rules = [
 ),
 policy.DocumentedRuleDefault(
 name='create_router:external_gateway_info:external_fixed_ips',
- check_str=base.PROJECT_ADMIN,
+ check_str=base.ADMIN,
 scope_types=['project'],
 description=('Specify ``external_fixed_ips`` in '
 '``external_gateway_info`` information when creating a '
@@ -129,7 +135,9 @@ rules = [ policy.DocumentedRuleDefault( name='get_router', - check_str=base.PROJECT_READER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_READER), scope_types=['project'], description='Get a router', operations=ACTION_GET, @@ -141,7 +149,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_router:distributed', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Get ``distributed`` attribute of a router', operations=ACTION_GET, @@ -153,7 +161,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_router:ha', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Get ``ha`` attribute of a router', operations=ACTION_GET, @@ -166,7 +174,9 @@ rules = [ policy.DocumentedRuleDefault( name='update_router', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Update a router', operations=ACTION_PUT, @@ -178,7 +188,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_router:distributed', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Update ``distributed`` attribute of a router', operations=ACTION_PUT, @@ -190,7 +200,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_router:ha', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Update ``ha`` attribute of a router', operations=ACTION_PUT, @@ -202,7 +212,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_router:external_gateway_info', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Update ``external_gateway_info`` information of a router', operations=ACTION_PUT, 
@@ -214,7 +226,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_router:external_gateway_info:network_id', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description=('Update ``network_id`` attribute of ' '``external_gateway_info`` information of a router'), @@ -227,7 +241,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_router:external_gateway_info:enable_snat', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description=('Update ``enable_snat`` attribute of ' '``external_gateway_info`` information of a router'), @@ -240,7 +254,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_router:external_gateway_info:external_fixed_ips', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description=('Update ``external_fixed_ips`` attribute of ' '``external_gateway_info`` information of a router'), @@ -254,7 +268,9 @@ rules = [ policy.DocumentedRuleDefault( name='delete_router', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Delete a router', operations=ACTION_DELETE, @@ -267,7 +283,9 @@ rules = [ policy.DocumentedRuleDefault( name='add_router_interface', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Add an interface to a router', operations=[ @@ -284,7 +302,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='remove_router_interface', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Remove an interface from a router', operations=[ 
@@ -301,7 +321,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='add_extraroutes', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Add extra route to a router', operations=[ @@ -318,7 +340,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='remove_extraroutes', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Remove extra route from a router', operations=[ diff --git a/neutron/conf/policies/subnet.py b/neutron/conf/policies/subnet.py index 4a40bf7cff7..f64a3268ccc 100644 --- a/neutron/conf/policies/subnet.py +++ b/neutron/conf/policies/subnet.py @@ -40,6 +40,7 @@ rules = [ policy.DocumentedRuleDefault( name='create_subnet', check_str=base.policy_or( + base.ADMIN, base.PROJECT_MEMBER, base.RULE_NET_OWNER), scope_types=['project'], @@ -53,7 +54,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='create_subnet:segment_id', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description=( 'Specify ``segment_id`` attribute when creating a subnet' @@ -67,7 +68,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='create_subnet:service_types', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description=( 'Specify ``service_types`` attribute when creating a subnet' @@ -82,6 +83,7 @@ rules = [ policy.DocumentedRuleDefault( name='get_subnet', check_str=base.policy_or( + base.ADMIN, base.PROJECT_READER, 'rule:shared'), scope_types=['project'], @@ -97,7 +99,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='get_subnet:segment_id', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Get ``segment_id`` attribute of a subnet', operations=ACTION_GET, 
@@ -110,6 +112,7 @@ rules = [ policy.DocumentedRuleDefault( name='update_subnet', check_str=base.policy_or( + base.ADMIN, base.PROJECT_MEMBER, base.RULE_NET_OWNER), scope_types=['project'], @@ -123,7 +126,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_subnet:segment_id', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Update ``segment_id`` attribute of a subnet', operations=ACTION_PUT, @@ -135,7 +138,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_subnet:service_types', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Update ``service_types`` attribute of a subnet', operations=ACTION_PUT, @@ -148,6 +151,7 @@ rules = [ policy.DocumentedRuleDefault( name='delete_subnet', check_str=base.policy_or( + base.ADMIN, base.PROJECT_MEMBER, base.RULE_NET_OWNER), scope_types=['project'], diff --git a/neutron/conf/policies/subnetpool.py b/neutron/conf/policies/subnetpool.py index 9a22c3dc51a..9be4a09556b 100644 --- a/neutron/conf/policies/subnetpool.py +++ b/neutron/conf/policies/subnetpool.py @@ -33,7 +33,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='create_subnetpool', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Create a subnetpool', operations=[ @@ -50,7 +52,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='create_subnetpool:shared', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Create a shared subnetpool', operations=[ @@ -67,7 +69,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='create_subnetpool:is_default', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description=( 'Specify ``is_default`` attribute when creating a subnetpool' 
@@ -87,6 +89,7 @@ rules = [ policy.DocumentedRuleDefault( name='get_subnetpool', check_str=base.policy_or( + base.ADMIN, base.PROJECT_READER, 'rule:shared_subnetpools' ), @@ -112,7 +115,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_subnetpool', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Update a subnetpool', operations=[ @@ -129,7 +134,7 @@ rules = [ ), policy.DocumentedRuleDefault( name='update_subnetpool:is_default', - check_str=base.PROJECT_ADMIN, + check_str=base.ADMIN, scope_types=['project'], description='Update ``is_default`` attribute of a subnetpool', operations=[ @@ -146,7 +151,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='delete_subnetpool', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Delete a subnetpool', operations=[ @@ -163,7 +170,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='onboard_network_subnets', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Onboard existing subnet into a subnetpool', operations=[ @@ -180,7 +189,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='add_prefixes', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Add prefixes to a subnetpool', operations=[ @@ -197,7 +208,9 @@ rules = [ ), policy.DocumentedRuleDefault( name='remove_prefixes', - check_str=base.PROJECT_MEMBER, + check_str=base.policy_or( + base.ADMIN, + base.PROJECT_MEMBER), scope_types=['project'], description='Remove unallocated prefixes from a subnetpool', operations=[ diff --git a/neutron/tests/unit/conf/policies/test_address_scope.py b/neutron/tests/unit/conf/policies/test_address_scope.py index 009195029fd..f4ee631c6fd 100644 
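Review bot: `base.policy_or(base.ADMIN, base.PROJECT_MEMBER)` is spelled out by hand in five subnetpool rules in these hunks (`update_subnetpool`, `delete_subnetpool`, `onboard_network_subnets`, `add_prefixes`, `remove_prefixes`) and in many more across router.py and subnet.py. Repeating the composition makes it easy for one rule to drift, since a dropped `base.ADMIN` or a reader/member mix-up would not stand out among dozens of near-identical lines. A single constant defined next to `ADMIN` in the base module, for instance `ADMIN_OR_PROJECT_MEMBER = policy_or(ADMIN, PROJECT_MEMBER)`, would let each rule read `check_str=base.ADMIN_OR_PROJECT_MEMBER`. The constant name is only a suggestion, as the base module is not part of this view.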
--- a/neutron/tests/unit/conf/policies/test_address_scope.py +++ b/neutron/tests/unit/conf/policies/test_address_scope.py @@ -114,6 +114,56 @@ class ProjectAdminTests(AddressScopeAPITestCase): super(ProjectAdminTests, self).setUp() self.context = self.project_admin_ctx + def test_create_address_scope(self): + self.assertTrue( + policy.enforce(self.context, 'create_address_scope', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'create_address_scope', self.alt_target)) + + def test_create_address_scope_shared(self): + self.assertTrue( + policy.enforce( + self.context, 'create_address_scope:shared', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'create_address_scope:shared', self.alt_target)) + + def test_get_address_scope(self): + self.assertTrue( + policy.enforce(self.context, 'get_address_scope', self.target)) + self.assertTrue( + policy.enforce(self.context, 'get_address_scope', self.alt_target)) + + def test_update_address_scope(self): + self.assertTrue( + policy.enforce(self.context, 'update_address_scope', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'update_address_scope', self.alt_target)) + + def test_update_address_scope_shared(self): + self.assertTrue( + policy.enforce( + self.context, 'update_address_scope:shared', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'update_address_scope:shared', self.alt_target)) + + def test_delete_address_scope(self): + self.assertTrue( + policy.enforce(self.context, 'delete_address_scope', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'delete_address_scope', self.alt_target)) + + +class ProjectMemberTests(ProjectAdminTests): + + def setUp(self): + super(ProjectMemberTests, self).setUp() + self.context = self.project_member_ctx + def test_create_address_scope(self): self.assertTrue( policy.enforce(self.context, 'create_address_scope', self.target)) 
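Review bot: `ProjectAdminTests` now asserts `True` for both `self.target` and `self.alt_target` on every address-scope rule, but nothing in the class says why access to the alternate target is expected, so a later reader could take it for a mistake and tighten it. A docstring on `ProjectAdminTests` such as `"""Admin is not project-bound: every rule must pass for both the own and the alternate target."""` would pin the intent, and `ProjectMemberTests`, which this hunk moves up, would then read as the set of overrides that restrict it. The same applies to the admin test classes in test_floatingip.py, test_metering.py and test_network.py. The class body starts before the hunk and continues past it.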
@@ -123,9 +173,10 @@ class ProjectAdminTests(AddressScopeAPITestCase): self.context, 'create_address_scope', self.alt_target) def test_create_address_scope_shared(self): - self.assertTrue( - policy.enforce( - self.context, 'create_address_scope:shared', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_address_scope:shared', self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, @@ -148,9 +199,10 @@ class ProjectAdminTests(AddressScopeAPITestCase): self.context, 'update_address_scope', self.alt_target) def test_update_address_scope_shared(self): - self.assertTrue( - policy.enforce( - self.context, 'update_address_scope:shared', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'update_address_scope:shared', self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, @@ -165,33 +217,6 @@ class ProjectAdminTests(AddressScopeAPITestCase): self.context, 'delete_address_scope', self.alt_target) -class ProjectMemberTests(ProjectAdminTests): - - def setUp(self): - super(ProjectMemberTests, self).setUp() - self.context = self.project_member_ctx - - def test_create_address_scope_shared(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_address_scope:shared', self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_address_scope:shared', self.alt_target) - - def test_update_address_scope_shared(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_address_scope:shared', self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_address_scope:shared', self.alt_target) - - class ProjectReaderTests(ProjectMemberTests): def setUp(self): 
diff --git a/neutron/tests/unit/conf/policies/test_floatingip.py b/neutron/tests/unit/conf/policies/test_floatingip.py
index 7c0d18ebfd6..fb8d5de248f 100644
--- a/neutron/tests/unit/conf/policies/test_floatingip.py
+++ b/neutron/tests/unit/conf/policies/test_floatingip.py
@@ -113,16 +113,58 @@ class ProjectAdminTests(FloatingIPAPITestCase): def test_create_floatingip(self): self.assertTrue( policy.enforce(self.context, "create_floatingip", self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, "create_floatingip", self.alt_target) + self.assertTrue( + policy.enforce(self.context, "create_floatingip", self.alt_target)) def test_create_floatingip_with_ip_address(self): self.assertTrue( policy.enforce( self.context, "create_floatingip:floating_ip_address", self.target)) + self.assertTrue( + policy.enforce( + self.context, + "create_floatingip:floating_ip_address", self.alt_target)) + + def test_get_floatingip(self): + self.assertTrue( + policy.enforce(self.context, "get_floatingip", self.target)) + self.assertTrue( + policy.enforce(self.context, "get_floatingip", self.alt_target)) + + def test_update_floatingip(self): + self.assertTrue( + policy.enforce(self.context, "update_floatingip", self.target)) + self.assertTrue( + policy.enforce(self.context, "update_floatingip", self.alt_target)) + + def test_delete_floatingip(self): + self.assertTrue( + policy.enforce(self.context, "delete_floatingip", self.target)) + self.assertTrue( + policy.enforce(self.context, "delete_floatingip", self.alt_target)) + + +class ProjectMemberTests(ProjectAdminTests): + + def setUp(self): + super(ProjectMemberTests, self).setUp() + self.context = self.project_member_ctx + + def test_create_floatingip(self): + self.assertTrue( + policy.enforce(self.context, "create_floatingip", self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, "create_floatingip", self.alt_target) + + def test_create_floatingip_with_ip_address(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, "create_floatingip:floating_ip_address", + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, 
@@ -151,25 +193,6 @@ class ProjectAdminTests(FloatingIPAPITestCase): policy.enforce, self.context, "delete_floatingip", self.alt_target) -class ProjectMemberTests(ProjectAdminTests): - - def setUp(self): - super(ProjectMemberTests, self).setUp() - self.context = self.project_member_ctx - - def test_create_floatingip_with_ip_address(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, "create_floatingip:floating_ip_address", - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, "create_floatingip:floating_ip_address", - self.alt_target) - - class ProjectReaderTests(ProjectMemberTests): def setUp(self): diff --git a/neutron/tests/unit/conf/policies/test_metering.py b/neutron/tests/unit/conf/policies/test_metering.py index df53a3fca9b..9dff9fb0beb 100644 --- a/neutron/tests/unit/conf/policies/test_metering.py +++ b/neutron/tests/unit/conf/policies/test_metering.py 
@@ -117,53 +117,47 @@ class ProjectAdminTests(MeteringAPITestCase): def test_create_metering_label(self): self.assertTrue( policy.enforce(self.context, 'create_metering_label', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_metering_label', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'create_metering_label', self.alt_target)) def test_get_metering_label(self): self.assertTrue( policy.enforce(self.context, 'get_metering_label', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_metering_label', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'get_metering_label', self.alt_target)) def test_delete_metering_label(self): self.assertTrue( policy.enforce(self.context, 'delete_metering_label', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'delete_metering_label', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'delete_metering_label', self.alt_target)) def test_create_metering_label_rule(self): self.assertTrue( policy.enforce( self.context, 'create_metering_label_rule', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_metering_label_rule', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'create_metering_label_rule', self.alt_target)) def test_get_metering_label_rule(self): self.assertTrue( policy.enforce( self.context, 'get_metering_label_rule', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_metering_label_rule', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'get_metering_label_rule', self.alt_target)) def test_delete_metering_label_rule(self): self.assertTrue( policy.enforce( self.context, 'delete_metering_label_rule', self.target)) - self.assertRaises( - 
base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'delete_metering_label_rule', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'delete_metering_label_rule', self.alt_target)) class ProjectMemberTests(ProjectAdminTests): @@ -182,6 +176,14 @@ class ProjectMemberTests(ProjectAdminTests): policy.enforce, self.context, 'create_metering_label', self.alt_target) + def test_get_metering_label(self): + self.assertTrue( + policy.enforce(self.context, 'get_metering_label', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_metering_label', self.alt_target) + def test_delete_metering_label(self): self.assertRaises( base_policy.PolicyNotAuthorized, @@ -202,6 +204,15 @@ class ProjectMemberTests(ProjectAdminTests): policy.enforce, self.context, 'create_metering_label_rule', self.alt_target) + def test_get_metering_label_rule(self): + self.assertTrue( + policy.enforce( + self.context, 'get_metering_label_rule', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_metering_label_rule', self.alt_target) + def test_delete_metering_label_rule(self): self.assertRaises( base_policy.PolicyNotAuthorized, diff --git a/neutron/tests/unit/conf/policies/test_network.py b/neutron/tests/unit/conf/policies/test_network.py index e3557ded8a1..00388dcb7ce 100644 --- a/neutron/tests/unit/conf/policies/test_network.py +++ b/neutron/tests/unit/conf/policies/test_network.py 
@@ -323,242 +323,214 @@ class ProjectAdminTests(NetworkAPITestCase): def test_create_network(self): self.assertTrue( policy.enforce(self.context, 'create_network', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_network', self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'create_network', self.alt_target)) def test_create_network_shared(self): self.assertTrue( policy.enforce(self.context, 'create_network:shared', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_network:shared', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'create_network:shared', self.alt_target)) def test_create_network_external(self): self.assertTrue( policy.enforce(self.context, 'create_network:router:external', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_network:router:external', self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'create_network:router:external', self.alt_target)) def test_create_network_default(self): self.assertTrue( policy.enforce(self.context, 'create_network:is_default', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_network:is_default', self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'create_network:is_default', self.alt_target)) def test_create_network_port_security_enabled(self): self.assertTrue( policy.enforce(self.context, 'create_network:port_security_enabled', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_network:port_security_enabled', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'create_network:port_security_enabled', + self.alt_target)) def test_create_network_segments(self): self.assertTrue( policy.enforce(self.context, 
'create_network:segments', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_network:segments', self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'create_network:segments', self.alt_target)) def test_create_network_provider_network_type(self): self.assertTrue( policy.enforce(self.context, 'create_network:provider:network_type', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_network:provider:network_type', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'create_network:provider:network_type', + self.alt_target)) def test_create_network_provider_physical_network(self): self.assertTrue( policy.enforce(self.context, 'create_network:provider:physical_network', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_network:provider:physical_network', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'create_network:provider:physical_network', + self.alt_target)) def test_create_network_provider_segmentation_id(self): self.assertTrue( policy.enforce(self.context, 'create_network:provider:segmentation_id', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_network:provider:segmentation_id', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'create_network:provider:segmentation_id', + self.alt_target)) def test_get_network(self): self.assertTrue( policy.enforce(self.context, 'get_network', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_network', self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'get_network', self.alt_target)) def test_get_network_external(self): self.assertTrue( policy.enforce(self.context, 'get_network:router:external', self.target)) - self.assertRaises( 
- base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_network:router:external', self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'get_network:router:external', self.alt_target)) def test_get_network_segments(self): self.assertTrue( policy.enforce(self.context, 'get_network:segments', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_network:segments', self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'get_network:segments', self.alt_target)) def test_get_network_provider_network_type(self): self.assertTrue( policy.enforce(self.context, 'get_network:provider:network_type', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_network:provider:network_type', self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'get_network:provider:network_type', + self.alt_target)) def test_get_network_provider_physical_network(self): self.assertTrue( policy.enforce(self.context, 'get_network:provider:physical_network', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_network:provider:physical_network', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'get_network:provider:physical_network', + self.alt_target)) def test_get_network_provider_segmentation_id(self): self.assertTrue( policy.enforce(self.context, 'get_network:provider:segmentation_id', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_network:provider:segmentation_id', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'get_network:provider:segmentation_id', + self.alt_target)) def test_update_network(self): self.assertTrue( policy.enforce(self.context, 'update_network', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 
'update_network', self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'update_network', self.alt_target)) def test_update_network_segments(self): self.assertTrue( policy.enforce(self.context, 'update_network:segments', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_network:segments', self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'update_network:segments', self.alt_target)) def test_update_network_shared(self): self.assertTrue( policy.enforce(self.context, 'update_network:shared', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_network:shared', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'update_network:shared', self.alt_target)) def test_update_network_provider_network_type(self): self.assertTrue( policy.enforce(self.context, 'update_network:provider:network_type', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_network:provider:network_type', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'update_network:provider:network_type', + self.alt_target)) def test_update_network_provider_physical_network(self): self.assertTrue( policy.enforce(self.context, 'update_network:provider:physical_network', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_network:provider:physical_network', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'update_network:provider:physical_network', + self.alt_target)) def test_update_network_provider_segmentation_id(self): self.assertTrue( policy.enforce(self.context, 'update_network:provider:segmentation_id', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_network:provider:segmentation_id', - self.alt_target) + 
self.assertTrue( + policy.enforce(self.context, + 'update_network:provider:segmentation_id', + self.alt_target)) def test_update_network_external(self): self.assertTrue( policy.enforce(self.context, 'update_network:router:external', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_network:router:external', self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'update_network:router:external', self.alt_target)) def test_update_network_default(self): self.assertTrue( policy.enforce(self.context, 'update_network:is_default', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_network:is_default', self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'update_network:is_default', self.alt_target)) def test_update_network_port_security_enabled(self): self.assertTrue( policy.enforce(self.context, 'update_network:port_security_enabled', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_network:port_security_enabled', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'update_network:port_security_enabled', + self.alt_target)) def test_delete_network(self): self.assertTrue( policy.enforce(self.context, 'delete_network', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'delete_network', self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'delete_network', self.alt_target)) class ProjectMemberTests(ProjectAdminTests): 
@@ -567,6 +539,14 @@ class ProjectMemberTests(ProjectAdminTests): super(ProjectMemberTests, self).setUp() self.context = self.project_member_ctx + def test_create_network(self): + self.assertTrue( + policy.enforce(self.context, 'create_network', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_network', self.alt_target) + def test_create_network_shared(self): self.assertRaises( base_policy.PolicyNotAuthorized, @@ -597,6 +577,17 @@ class ProjectMemberTests(ProjectAdminTests): policy.enforce, self.context, 'create_network:is_default', self.alt_target) + def test_create_network_port_security_enabled(self): + self.assertTrue( + policy.enforce(self.context, + 'create_network:port_security_enabled', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_network:port_security_enabled', + self.alt_target) + def test_create_network_segments(self): self.assertRaises( base_policy.PolicyNotAuthorized, @@ -642,6 +633,23 @@ class ProjectMemberTests(ProjectAdminTests): self.context, 'create_network:provider:segmentation_id', self.alt_target) + def test_get_network(self): + self.assertTrue( + policy.enforce(self.context, 'get_network', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_network', self.alt_target) + + def test_get_network_external(self): + self.assertTrue( + policy.enforce(self.context, + 'get_network:router:external', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_network:router:external', self.alt_target) + def test_get_network_segments(self): self.assertRaises( base_policy.PolicyNotAuthorized, 
@@ -686,6 +694,14 @@ class ProjectMemberTests(ProjectAdminTests): self.context, 'get_network:provider:segmentation_id', self.alt_target) + def test_update_network(self): + self.assertTrue( + policy.enforce(self.context, 'update_network', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'update_network', self.alt_target) + def test_update_network_segments(self): self.assertRaises( base_policy.PolicyNotAuthorized, @@ -761,6 +777,25 @@ class ProjectMemberTests(ProjectAdminTests): policy.enforce, self.context, 'update_network:is_default', self.alt_target) + def test_update_network_port_security_enabled(self): + self.assertTrue( + policy.enforce(self.context, + 'update_network:port_security_enabled', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'update_network:port_security_enabled', + self.alt_target) + + def test_delete_network(self): + self.assertTrue( + policy.enforce(self.context, 'delete_network', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'delete_network', self.alt_target) + class ProjectReaderTests(ProjectMemberTests): diff --git a/neutron/tests/unit/conf/policies/test_port.py b/neutron/tests/unit/conf/policies/test_port.py index cc1acc1270f..5bc3d6dd8f7 100644 --- a/neutron/tests/unit/conf/policies/test_port.py +++ b/neutron/tests/unit/conf/policies/test_port.py 
@@ -63,18 +63,14 @@ class SystemAdminTests(PortAPITestCase): policy.enforce, self.context, 'create_port', self.alt_target) def test_create_port_with_device_owner(self): - target = self.target.copy() - target['device_owner'] = 'network:test' - alt_target = self.alt_target.copy() - alt_target['device_owner'] = 'network:test' self.assertRaises( - base_policy.PolicyNotAuthorized, + base_policy.InvalidScope, policy.enforce, self.context, 'create_port:device_owner', - target) + self.target) self.assertRaises( - base_policy.PolicyNotAuthorized, + base_policy.InvalidScope, policy.enforce, self.context, 'create_port:device_owner', - alt_target) + self.alt_target) def test_create_port_with_mac_address(self): self.assertRaises( @@ -259,18 +255,14 @@ class SystemAdminTests(PortAPITestCase): policy.enforce, self.context, 'update_port', self.alt_target) def test_update_port_with_device_owner(self): - target = self.target.copy() - target['device_owner'] = 'network:test' - alt_target = self.alt_target.copy() - alt_target['device_owner'] = 'network:test' self.assertRaises( - base_policy.PolicyNotAuthorized, + base_policy.InvalidScope, policy.enforce, self.context, 'update_port:device_owner', - target) + self.target) self.assertRaises( - base_policy.PolicyNotAuthorized, + base_policy.InvalidScope, policy.enforce, self.context, 'update_port:device_owner', - alt_target) + self.alt_target) def test_update_port_with_mac_address(self): self.assertRaises( @@ -430,9 +422,8 @@ class ProjectAdminTests(PortAPITestCase): def test_create_port(self): self.assertTrue( policy.enforce(self.context, 'create_port', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port', self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'create_port', self.alt_target)) def test_create_port_with_device_owner(self): target = self.target.copy() 
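Review bot: `test_create_port` in ProjectAdminTests now asserts that the project-admin context is authorized against `self.alt_target`, but nothing in the test says that this cross-project grant is intentional. A later reader could take it for a tenant-isolation regression, or miss a real one. Presumably it follows from the rule defaults moving to `base.ADMIN`, as in the policy files earlier in this commit. I would add a comment above the second assertion, such as `# The admin role is not bound to its own project here, so alt_target must be allowed too.` The same applies to the other admin tests rewritten this way in the `-442,69` hunk and in the `-92,30` hunk of test_qos.py.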
@@ -442,69 +433,373 @@ class ProjectAdminTests(PortAPITestCase): self.assertTrue( policy.enforce(self.context, 'create_port:device_owner', target)) + self.assertTrue( + policy.enforce(self.context, + 'create_port:device_owner', alt_target)) + + def test_create_port_with_mac_address(self): + self.assertTrue( + policy.enforce(self.context, + 'create_port:mac_address', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_port:mac_address', self.alt_target)) + + def test_create_port_with_fixed_ips(self): + self.assertTrue( + policy.enforce(self.context, + 'create_port:fixed_ips', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_port:fixed_ips', self.alt_target)) + + def test_create_port_with_fixed_ips_and_ip_address(self): + self.assertTrue( + policy.enforce(self.context, + 'create_port:fixed_ips:ip_address', self.target)) + self.assertTrue( + policy.enforce( + self.context, + 'create_port:fixed_ips:ip_address', self.alt_target)) + + def test_create_port_with_fixed_ips_and_subnet_id(self): + self.assertTrue( + policy.enforce(self.context, + 'create_port:fixed_ips:subent_id', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_port:fixed_ips:subent_id', self.alt_target)) + + def test_create_port_with_port_security_enabled(self): + self.assertTrue( + policy.enforce(self.context, + 'create_port:port_security_enabled', self.target)) + self.assertTrue( + policy.enforce( + self.context, + 'create_port:port_security_enabled', self.alt_target)) + + def test_create_port_with_binding_host_id(self): + self.assertTrue( + policy.enforce(self.context, + 'create_port:binding:host_id', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_port:binding:host_id', self.alt_target)) + + def test_create_port_with_binding_profile(self): + self.assertTrue( + policy.enforce(self.context, + 'create_port:binding:profile', self.target)) + self.assertTrue( + policy.enforce(self.context, + 
'create_port:binding:profile', self.alt_target)) + + def test_create_port_with_binding_vnic_type(self): + self.assertTrue( + policy.enforce(self.context, + 'create_port:binding:vnic_type', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_port:binding:vnic_type', self.alt_target)) + + def test_create_port_with_allowed_address_pairs(self): + self.assertTrue( + policy.enforce(self.context, + 'create_port:allowed_address_pairs', self.target)) + self.assertTrue( + policy.enforce( + self.context, + 'create_port:allowed_address_pairs', self.alt_target)) + + def test_create_port_with_allowed_address_pairs_and_mac_address(self): + self.assertTrue( + policy.enforce(self.context, + 'create_port:allowed_address_pairs:mac_address', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_port:allowed_address_pairs:mac_address', + self.alt_target)) + + def test_create_port_with_allowed_address_pairs_and_ip_address(self): + self.assertTrue( + policy.enforce(self.context, + 'create_port:allowed_address_pairs:ip_address', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_port:allowed_address_pairs:ip_address', + self.alt_target)) + + def test_get_port(self): + self.assertTrue( + policy.enforce(self.context, 'get_port', self.target)) + self.assertTrue( + policy.enforce(self.context, 'get_port', self.alt_target)) + + def test_get_port_binding_vif_type(self): + self.assertTrue( + policy.enforce( + self.context, 'get_port:binding:vif_type', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'get_port:binding:vif_type', self.alt_target)) + + def test_get_port_binding_vif_details(self): + self.assertTrue( + policy.enforce( + self.context, 'get_port:binding:vif_details', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'get_port:binding:vif_details', self.alt_target)) + + def test_get_port_binding_host_id(self): + self.assertTrue( + policy.enforce( + self.context, 
'get_port:binding:host_id', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'get_port:binding:host_id', self.alt_target)) + + def test_get_port_binding_profile(self): + self.assertTrue( + policy.enforce( + self.context, 'get_port:binding:profile', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'get_port:binding:profile', self.alt_target)) + + def test_get_port_resource_request(self): + self.assertTrue( + policy.enforce( + self.context, 'get_port:resource_request', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'get_port:resource_request', self.alt_target)) + + def test_update_port(self): + self.assertTrue( + policy.enforce(self.context, 'update_port', self.target)) + self.assertTrue( + policy.enforce(self.context, 'update_port', self.alt_target)) + + def test_update_port_with_device_owner(self): + target = self.target.copy() + target['device_owner'] = 'network:test' + alt_target = self.alt_target.copy() + alt_target['device_owner'] = 'network:test' + self.assertTrue( + policy.enforce(self.context, + 'update_port:device_owner', target)) + self.assertTrue( + policy.enforce(self.context, + 'update_port:device_owner', alt_target)) + + def test_update_port_with_mac_address(self): + self.assertTrue( + policy.enforce( + self.context, 'update_port:mac_address', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'update_port:mac_address', self.alt_target)) + + def test_update_port_with_fixed_ips(self): + self.assertTrue( + policy.enforce(self.context, + 'update_port:fixed_ips', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_port:fixed_ips', self.alt_target)) + + def test_update_port_with_fixed_ips_and_ip_address(self): + self.assertTrue( + policy.enforce(self.context, + 'update_port:fixed_ips:ip_address', self.target)) + self.assertTrue( + policy.enforce( + self.context, + 'update_port:fixed_ips:ip_address', self.alt_target)) + + def 
test_update_port_with_fixed_ips_and_subnet_id(self): + self.assertTrue( + policy.enforce(self.context, + 'update_port:fixed_ips:subent_id', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_port:fixed_ips:subent_id', self.alt_target)) + + def test_update_port_with_port_security_enabled(self): + self.assertTrue( + policy.enforce(self.context, + 'update_port:port_security_enabled', self.target)) + self.assertTrue( + policy.enforce( + self.context, + 'update_port:port_security_enabled', self.alt_target)) + + def test_update_port_with_binding_host_id(self): + self.assertTrue( + policy.enforce(self.context, + 'update_port:binding:host_id', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_port:binding:host_id', self.alt_target)) + + def test_update_port_with_binding_profile(self): + self.assertTrue( + policy.enforce(self.context, + 'update_port:binding:profile', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_port:binding:profile', self.alt_target)) + + def test_update_port_with_binding_vnic_type(self): + self.assertTrue( + policy.enforce(self.context, + 'update_port:binding:vnic_type', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_port:binding:vnic_type', self.alt_target)) + + def test_update_port_with_allowed_address_pairs(self): + self.assertTrue( + policy.enforce(self.context, + 'update_port:allowed_address_pairs', self.target)) + self.assertTrue( + policy.enforce( + self.context, + 'update_port:allowed_address_pairs', self.alt_target)) + + def test_update_port_with_allowed_address_pairs_and_mac_address(self): + self.assertTrue( + policy.enforce(self.context, + 'update_port:allowed_address_pairs:mac_address', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_port:allowed_address_pairs:mac_address', + self.alt_target)) + + def test_update_port_with_allowed_address_pairs_and_ip_address(self): + self.assertTrue( + policy.enforce(self.context, 
+ 'update_port:allowed_address_pairs:ip_address', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_port:allowed_address_pairs:ip_address', + self.alt_target)) + + def test_update_port_data_plane_status(self): + self.assertTrue( + policy.enforce(self.context, + 'update_port:data_plane_status', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_port:data_plane_status', + self.alt_target)) + + def test_delete_port(self): + self.assertTrue( + policy.enforce(self.context, 'delete_port', self.target)) + self.assertTrue( + policy.enforce(self.context, 'delete_port', self.alt_target)) + + +class ProjectMemberTests(ProjectAdminTests): + + def setUp(self): + super(ProjectMemberTests, self).setUp() + self.context = self.project_member_ctx + + def test_create_port(self): + self.assertTrue( + policy.enforce(self.context, 'create_port', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port', self.alt_target) + + def test_create_port_with_device_owner(self): + target = self.target.copy() + target['device_owner'] = 'network:test' + alt_target = self.alt_target.copy() + alt_target['device_owner'] = 'network:test' + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:device_owner', + target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'create_port:device_owner', alt_target) def test_create_port_with_mac_address(self): - self.assertTrue( - policy.enforce(self.context, - 'create_port:mac_address', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:mac_address', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'create_port:mac_address', self.alt_target) def test_create_port_with_fixed_ips(self): - self.assertTrue( - policy.enforce(self.context, - 'create_port:fixed_ips', 
self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:fixed_ips', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'create_port:fixed_ips', self.alt_target) def test_create_port_with_fixed_ips_and_ip_address(self): - self.assertTrue( - policy.enforce(self.context, - 'create_port:fixed_ips:ip_address', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:fixed_ips:ip_address', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'create_port:fixed_ips:ip_address', self.alt_target) def test_create_port_with_fixed_ips_and_subnet_id(self): - self.assertTrue( - policy.enforce(self.context, - 'create_port:fixed_ips:subent_id', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:fixed_ips:subnet_id', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'create_port:fixed_ips:subnet_id', self.alt_target) def test_create_port_with_port_security_enabled(self): - self.assertTrue( - policy.enforce(self.context, - 'create_port:port_security_enabled', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:port_security_enabled', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'create_port:port_security_enabled', self.alt_target) def test_create_port_with_binding_host_id(self): - self.assertTrue( - policy.enforce(self.context, - 'create_port:binding:host_id', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:binding:host_id', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'create_port:binding:host_id', self.alt_target) def 
test_create_port_with_binding_profile(self): - self.assertTrue( - policy.enforce(self.context, - 'create_port:binding:profile', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:binding:profile', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'create_port:binding:profile', @@ -520,9 +815,11 @@ class ProjectAdminTests(PortAPITestCase): self.alt_target) def test_create_port_with_allowed_address_pairs(self): - self.assertTrue( - policy.enforce(self.context, - 'create_port:allowed_address_pairs', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_port:allowed_address_pairs', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, @@ -530,10 +827,11 @@ class ProjectAdminTests(PortAPITestCase): self.alt_target) def test_create_port_with_allowed_address_pairs_and_mac_address(self): - self.assertTrue( - policy.enforce(self.context, - 'create_port:allowed_address_pairs:mac_address', - self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_port:allowed_address_pairs:mac_address', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, @@ -541,10 +839,11 @@ class ProjectAdminTests(PortAPITestCase): self.alt_target) def test_create_port_with_allowed_address_pairs_and_ip_address(self): - self.assertTrue( - policy.enforce(self.context, - 'create_port:allowed_address_pairs:ip_address', - self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_port:allowed_address_pairs:ip_address', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, 
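Review bot: The rule name in the new ProjectAdminTests `test_create_port_with_fixed_ips_and_subnet_id` is misspelled as `'create_port:fixed_ips:subent_id'`, and `test_update_port_with_fixed_ips_and_subnet_id` has the same typo in `'update_port:fixed_ips:subent_id'`. Both tests therefore name a rule that is probably not registered. They likely pass for the admin context without ever evaluating the subnet_id policy, so a regression in that rule would go unnoticed. The ProjectMemberTests versions added in this same hunk already use the correct spelling. Change the four strings to `'create_port:fixed_ips:subnet_id'` and `'update_port:fixed_ips:subnet_id'`.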
@@ -559,45 +858,50 @@ class ProjectAdminTests(PortAPITestCase): policy.enforce, self.context, 'get_port', self.alt_target) def test_get_port_binding_vif_type(self): - self.assertTrue( - policy.enforce( - self.context, 'get_port:binding:vif_type', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_port:binding:vif_type', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'get_port:binding:vif_type', self.alt_target) def test_get_port_binding_vif_details(self): - self.assertTrue( - policy.enforce( - self.context, 'get_port:binding:vif_details', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_port:binding:vif_details', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'get_port:binding:vif_details', self.alt_target) def test_get_port_binding_host_id(self): - self.assertTrue( - policy.enforce( - self.context, 'get_port:binding:host_id', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_port:binding:host_id', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'get_port:binding:host_id', self.alt_target) def test_get_port_binding_profile(self): - self.assertTrue( - policy.enforce( - self.context, 'get_port:binding:profile', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_port:binding:profile', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'get_port:binding:profile', self.alt_target) def test_get_port_resource_request(self): - self.assertTrue( - policy.enforce( - self.context, 'get_port:resource_request', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_port:resource_request', + self.target) 
self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'get_port:resource_request', 
@@ -615,72 +919,80 @@ class ProjectAdminTests(PortAPITestCase): target['device_owner'] = 'network:test' alt_target = self.alt_target.copy() alt_target['device_owner'] = 'network:test' - self.assertTrue( - policy.enforce(self.context, - 'update_port:device_owner', target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:device_owner', + target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'update_port:device_owner', alt_target) def test_update_port_with_mac_address(self): - self.assertTrue( - policy.enforce( - self.context, 'update_port:mac_address', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:mac_address', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'update_port:mac_address', self.alt_target) def test_update_port_with_fixed_ips(self): - self.assertTrue( - policy.enforce(self.context, - 'update_port:fixed_ips', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:fixed_ips', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'update_port:fixed_ips', self.alt_target) def test_update_port_with_fixed_ips_and_ip_address(self): - self.assertTrue( - policy.enforce(self.context, - 'update_port:fixed_ips:ip_address', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:fixed_ips:ip_address', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'update_port:fixed_ips:ip_address', self.alt_target) def test_update_port_with_fixed_ips_and_subnet_id(self): - self.assertTrue( - policy.enforce(self.context, - 'update_port:fixed_ips:subent_id', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 
'update_port:fixed_ips:subnet_id', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'update_port:fixed_ips:subnet_id', self.alt_target) def test_update_port_with_port_security_enabled(self): - self.assertTrue( - policy.enforce(self.context, - 'update_port:port_security_enabled', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:port_security_enabled', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'update_port:port_security_enabled', self.alt_target) def test_update_port_with_binding_host_id(self): - self.assertTrue( - policy.enforce(self.context, - 'update_port:binding:host_id', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:binding:host_id', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'update_port:binding:host_id', self.alt_target) def test_update_port_with_binding_profile(self): - self.assertTrue( - policy.enforce(self.context, - 'update_port:binding:profile', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:binding:profile', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'update_port:binding:profile', 
@@ -695,316 +1007,6 @@ class ProjectAdminTests(PortAPITestCase): policy.enforce, self.context, 'update_port:binding:vnic_type', self.alt_target) - def test_update_port_with_allowed_address_pairs(self): - self.assertTrue( - policy.enforce(self.context, - 'update_port:allowed_address_pairs', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_port:allowed_address_pairs', - self.alt_target) - - def test_update_port_with_allowed_address_pairs_and_mac_address(self): - self.assertTrue( - policy.enforce(self.context, - 'update_port:allowed_address_pairs:mac_address', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_port:allowed_address_pairs:mac_address', - self.alt_target) - - def test_update_port_with_allowed_address_pairs_and_ip_address(self): - self.assertTrue( - policy.enforce(self.context, - 'update_port:allowed_address_pairs:ip_address', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_port:allowed_address_pairs:ip_address', - self.alt_target) - - def test_update_port_data_plane_status(self): - self.assertTrue( - policy.enforce(self.context, - 'update_port:data_plane_status', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_port:data_plane_status', self.alt_target) - - def test_delete_port(self): - self.assertTrue( - policy.enforce(self.context, 'delete_port', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'delete_port', self.alt_target) - - -class ProjectMemberTests(ProjectAdminTests): - - def setUp(self): - super(ProjectMemberTests, self).setUp() - self.context = self.project_member_ctx - - def test_create_port_with_device_owner(self): - target = self.target.copy() - target['device_owner'] = 'network:test' - alt_target = self.alt_target.copy() - 
alt_target['device_owner'] = 'network:test' - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:device_owner', - target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:device_owner', - alt_target) - - def test_create_port_with_mac_address(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:mac_address', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:mac_address', - self.alt_target) - - def test_create_port_with_fixed_ips(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:fixed_ips', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:fixed_ips', - self.alt_target) - - def test_create_port_with_fixed_ips_and_ip_address(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:fixed_ips:ip_address', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:fixed_ips:ip_address', - self.alt_target) - - def test_create_port_with_fixed_ips_and_subnet_id(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:fixed_ips:subnet_id', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:fixed_ips:subnet_id', - self.alt_target) - - def test_create_port_with_port_security_enabled(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:port_security_enabled', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:port_security_enabled', - self.alt_target) - - def 
test_create_port_with_binding_host_id(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:binding:host_id', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:binding:host_id', - self.alt_target) - - def test_create_port_with_binding_profile(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:binding:profile', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:binding:profile', - self.alt_target) - - def test_create_port_with_allowed_address_pairs(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_port:allowed_address_pairs', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_port:allowed_address_pairs', - self.alt_target) - - def test_create_port_with_allowed_address_pairs_and_mac_address(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_port:allowed_address_pairs:mac_address', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_port:allowed_address_pairs:mac_address', - self.alt_target) - - def test_create_port_with_allowed_address_pairs_and_ip_address(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_port:allowed_address_pairs:ip_address', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_port:allowed_address_pairs:ip_address', - self.alt_target) - - def test_get_port_binding_vif_type(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'get_port:binding:vif_type', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - 
policy.enforce, self.context, 'get_port:binding:vif_type', - self.alt_target) - - def test_get_port_binding_vif_details(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'get_port:binding:vif_details', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'get_port:binding:vif_details', - self.alt_target) - - def test_get_port_binding_host_id(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'get_port:binding:host_id', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'get_port:binding:host_id', - self.alt_target) - - def test_get_port_binding_profile(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'get_port:binding:profile', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'get_port:binding:profile', - self.alt_target) - - def test_get_port_resource_request(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'get_port:resource_request', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'get_port:resource_request', - self.alt_target) - - def test_update_port_with_device_owner(self): - target = self.target.copy() - target['device_owner'] = 'network:test' - alt_target = self.alt_target.copy() - alt_target['device_owner'] = 'network:test' - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:device_owner', - target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:device_owner', - alt_target) - - def test_update_port_with_mac_address(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:mac_address', - self.target) - self.assertRaises( 
- base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:mac_address', - self.alt_target) - - def test_update_port_with_fixed_ips(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:fixed_ips', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:fixed_ips', - self.alt_target) - - def test_update_port_with_fixed_ips_and_ip_address(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:fixed_ips:ip_address', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:fixed_ips:ip_address', - self.alt_target) - - def test_update_port_with_fixed_ips_and_subnet_id(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:fixed_ips:subnet_id', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:fixed_ips:subnet_id', - self.alt_target) - - def test_update_port_with_port_security_enabled(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:port_security_enabled', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:port_security_enabled', - self.alt_target) - - def test_update_port_with_binding_host_id(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:binding:host_id', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:binding:host_id', - self.alt_target) - - def test_update_port_with_binding_profile(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:binding:profile', - self.target) - self.assertRaises( - 
base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:binding:profile', - self.alt_target) - def test_update_port_with_allowed_address_pairs(self): self.assertRaises( base_policy.PolicyNotAuthorized, @@ -1051,6 +1053,13 @@ class ProjectMemberTests(ProjectAdminTests): policy.enforce, self.context, 'update_port:data_plane_status', self.alt_target) + def test_delete_port(self): + self.assertTrue( + policy.enforce(self.context, 'delete_port', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'delete_port', self.alt_target) + class ProjectReaderTests(ProjectMemberTests): diff --git a/neutron/tests/unit/conf/policies/test_qos.py b/neutron/tests/unit/conf/policies/test_qos.py index 6d633ef9d35..fa75158a019 100644 --- a/neutron/tests/unit/conf/policies/test_qos.py +++ b/neutron/tests/unit/conf/policies/test_qos.py 
@@ -92,30 +92,26 @@ class ProjectAdminQosPolicyTests(QosPolicyAPITestCase): def test_get_policy(self): self.assertTrue( policy.enforce(self.context, 'get_policy', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'get_policy', self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'get_policy', self.alt_target)) def test_create_policy(self): self.assertTrue( policy.enforce(self.context, 'create_policy', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_policy', self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'create_policy', self.alt_target)) def test_update_policy(self): self.assertTrue( policy.enforce(self.context, 'update_policy', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_policy', self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'update_policy', self.alt_target)) def test_delete_policy(self): self.assertTrue( policy.enforce(self.context, 'delete_policy', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'delete_policy', self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'delete_policy', self.alt_target)) class ProjectMemberQosPolicyTests(ProjectAdminQosPolicyTests): @@ -124,6 +120,13 @@ class ProjectMemberQosPolicyTests(ProjectAdminQosPolicyTests): super(ProjectMemberQosPolicyTests, self).setUp() self.context = self.project_member_ctx + def test_get_policy(self): + self.assertTrue( + policy.enforce(self.context, 'get_policy', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'get_policy', self.alt_target) + def test_create_policy(self): self.assertRaises( base_policy.PolicyNotAuthorized, 
@@ -357,6 +360,84 @@ class ProjectAdminQosBandwidthLimitRuleTests(QosRulesAPITestCase): super(ProjectAdminQosBandwidthLimitRuleTests, self).setUp() self.context = self.project_admin_ctx + def test_get_policy_bandwidth_limit_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_bandwidth_limit_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'get_policy_bandwidth_limit_rule', + self.alt_target)) + + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'get_alias_bandwidth_limit_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'get_alias_bandwidth_limit_rule', + self.alt_target)) + + def test_create_policy_bandwidth_limit_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'create_policy_bandwidth_limit_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_policy_bandwidth_limit_rule', + self.alt_target)) + + def test_update_policy_bandwidth_limit_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'update_policy_bandwidth_limit_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_policy_bandwidth_limit_rule', + self.alt_target)) + + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'update_alias_bandwidth_limit_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_alias_bandwidth_limit_rule', + self.alt_target)) + + def test_delete_policy_bandwidth_limit_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'delete_policy_bandwidth_limit_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'delete_policy_bandwidth_limit_rule', + self.alt_target)) + + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'delete_alias_bandwidth_limit_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'delete_alias_bandwidth_limit_rule', + self.alt_target)) + + +class 
ProjectMemberQosBandwidthLimitRuleTests( + ProjectAdminQosBandwidthLimitRuleTests): + + def setUp(self): + super(ProjectMemberQosBandwidthLimitRuleTests, self).setUp() + self.context = self.project_member_ctx + def test_get_policy_bandwidth_limit_rule(self): self.assertTrue( policy.enforce(self.context, 
@@ -379,69 +460,6 @@ class ProjectAdminQosBandwidthLimitRuleTests(QosRulesAPITestCase): self.context, 'get_alias_bandwidth_limit_rule', self.alt_target) - def test_create_policy_bandwidth_limit_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'create_policy_bandwidth_limit_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_policy_bandwidth_limit_rule', - self.alt_target) - - def test_update_policy_bandwidth_limit_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'update_policy_bandwidth_limit_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_policy_bandwidth_limit_rule', - self.alt_target) - - # And the same for aliases - self.assertTrue( - policy.enforce(self.context, - 'update_alias_bandwidth_limit_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_alias_bandwidth_limit_rule', - self.alt_target) - - def test_delete_policy_bandwidth_limit_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'delete_policy_bandwidth_limit_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'delete_policy_bandwidth_limit_rule', - self.alt_target) - - # And the same for aliases - self.assertTrue( - policy.enforce(self.context, - 'delete_alias_bandwidth_limit_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'delete_alias_bandwidth_limit_rule', - self.alt_target) - - -class ProjectMemberQosBandwidthLimitRuleTests( - ProjectAdminQosBandwidthLimitRuleTests): - - def setUp(self): - super(ProjectMemberQosBandwidthLimitRuleTests, self).setUp() - self.context = self.project_member_ctx - def test_create_policy_bandwidth_limit_rule(self): self.assertRaises( base_policy.PolicyNotAuthorized, 
@@ -577,44 +595,40 @@ class ProjectAdminQosPacketRateLimitRuleTests(QosRulesAPITestCase): policy.enforce(self.context, 'get_policy_packet_rate_limit_rule', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_policy_packet_rate_limit_rule', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'get_policy_packet_rate_limit_rule', + self.alt_target)) def test_create_policy_packet_rate_limit_rule(self): self.assertTrue( policy.enforce(self.context, 'create_policy_packet_rate_limit_rule', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_policy_packet_rate_limit_rule', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'create_policy_packet_rate_limit_rule', + self.alt_target)) def test_update_policy_packet_rate_limit_rule(self): self.assertTrue( policy.enforce(self.context, 'update_policy_packet_rate_limit_rule', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_policy_packet_rate_limit_rule', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'update_policy_packet_rate_limit_rule', + self.alt_target)) def test_delete_policy_packet_rate_limit_rule(self): self.assertTrue( policy.enforce(self.context, 'delete_policy_packet_rate_limit_rule', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'delete_policy_packet_rate_limit_rule', - self.alt_target) + self.assertTrue( + policy.enforce(self.context, + 'delete_policy_packet_rate_limit_rule', + self.alt_target)) class ProjectMemberQosPacketRateLimitRuleTests( 
@@ -624,6 +638,17 @@ class ProjectMemberQosPacketRateLimitRuleTests( super(ProjectMemberQosPacketRateLimitRuleTests, self).setUp() self.context = self.project_member_ctx + def test_get_policy_packet_rate_limit_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_packet_rate_limit_rule', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_policy_packet_rate_limit_rule', + self.alt_target) + def test_create_policy_packet_rate_limit_rule(self): self.assertRaises( base_policy.PolicyNotAuthorized, 
@@ -780,6 +805,84 @@ class ProjectAdminQosDSCPMarkingRuleTests(QosRulesAPITestCase): super(ProjectAdminQosDSCPMarkingRuleTests, self).setUp() self.context = self.project_admin_ctx + def test_get_policy_dscp_marking_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_dscp_marking_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'get_policy_dscp_marking_rule', + self.alt_target)) + + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'get_alias_dscp_marking_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'get_alias_dscp_marking_rule', + self.alt_target)) + + def test_create_policy_dscp_marking_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'create_policy_dscp_marking_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_policy_dscp_marking_rule', + self.alt_target)) + + def test_update_policy_dscp_marking_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'update_policy_dscp_marking_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_policy_dscp_marking_rule', + self.alt_target)) + + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'update_alias_dscp_marking_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_alias_dscp_marking_rule', + self.alt_target)) + + def test_delete_policy_dscp_marking_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'delete_policy_dscp_marking_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'delete_policy_dscp_marking_rule', + self.alt_target)) + + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'update_alias_dscp_marking_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_alias_dscp_marking_rule', + self.alt_target)) + + +class ProjectMemberQosDSCPMarkingRuleTests( + 
ProjectAdminQosDSCPMarkingRuleTests): + + def setUp(self): + super(ProjectMemberQosDSCPMarkingRuleTests, self).setUp() + self.context = self.project_member_ctx + def test_get_policy_dscp_marking_rule(self): self.assertTrue( policy.enforce(self.context, 
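Review bot: In the added test_delete_policy_dscp_marking_rule, both assertions under `# And the same for aliases` enforce `'update_alias_dscp_marking_rule'`, so `delete_alias_dscp_marking_rule` is never checked for the project admin. The removed version of this test (next hunk) at least used the delete alias for its alt_target case, and that coverage is now gone. Both calls should name `'delete_alias_dscp_marking_rule'`, once with `self.target` and once with `self.alt_target`.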
@@ -802,69 +905,6 @@ class ProjectAdminQosDSCPMarkingRuleTests(QosRulesAPITestCase): self.context, 'get_alias_dscp_marking_rule', self.alt_target) - def test_create_policy_dscp_marking_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'create_policy_dscp_marking_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_policy_dscp_marking_rule', - self.alt_target) - - def test_update_policy_dscp_marking_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'update_policy_dscp_marking_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_policy_dscp_marking_rule', - self.alt_target) - - # And the same for aliases - self.assertTrue( - policy.enforce(self.context, - 'update_alias_dscp_marking_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_alias_dscp_marking_rule', - self.alt_target) - - def test_delete_policy_dscp_marking_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'delete_policy_dscp_marking_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'delete_policy_dscp_marking_rule', - self.alt_target) - - # And the same for aliases - self.assertTrue( - policy.enforce(self.context, - 'update_alias_dscp_marking_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'delete_alias_dscp_marking_rule', - self.alt_target) - - -class ProjectMemberQosDSCPMarkingRuleTests( - ProjectAdminQosDSCPMarkingRuleTests): - - def setUp(self): - super(ProjectMemberQosDSCPMarkingRuleTests, self).setUp() - self.context = self.project_member_ctx - def test_create_policy_dscp_marking_rule(self): self.assertRaises( base_policy.PolicyNotAuthorized, 
@@ -1047,6 +1087,84 @@ class ProjectAdminQosMinimumBandwidthRuleTests(QosRulesAPITestCase): super(ProjectAdminQosMinimumBandwidthRuleTests, self).setUp() self.context = self.project_admin_ctx + def test_get_policy_minimum_bandwidth_rule(self): + self.assertTrue( + policy.enforce( + self.context, 'get_policy_minimum_bandwidth_rule', + self.target)) + self.assertTrue( + policy.enforce( + self.context, 'get_policy_minimum_bandwidth_rule', + self.alt_target)) + + # And the same for aliases + self.assertTrue( + policy.enforce( + self.context, 'get_alias_minimum_bandwidth_rule', + self.target)) + self.assertTrue( + policy.enforce( + self.context, 'get_alias_minimum_bandwidth_rule', + self.alt_target)) + + def test_create_policy_minimum_bandwidth_rule(self): + self.assertTrue( + policy.enforce( + self.context, 'create_policy_minimum_bandwidth_rule', + self.target)) + self.assertTrue( + policy.enforce( + self.context, 'create_policy_minimum_bandwidth_rule', + self.alt_target)) + + def test_update_policy_minimum_bandwidth_rule(self): + self.assertTrue( + policy.enforce( + self.context, 'update_policy_minimum_bandwidth_rule', + self.target)) + self.assertTrue( + policy.enforce( + self.context, 'update_policy_minimum_bandwidth_rule', + self.alt_target)) + + # And the same for aliases + self.assertTrue( + policy.enforce( + self.context, 'update_alias_minimum_bandwidth_rule', + self.target)) + self.assertTrue( + policy.enforce( + self.context, 'update_alias_minimum_bandwidth_rule', + self.alt_target)) + + def test_delete_policy_minimum_bandwidth_rule(self): + self.assertTrue( + policy.enforce( + self.context, 'delete_policy_minimum_bandwidth_rule', + self.target)) + self.assertTrue( + policy.enforce( + self.context, 'delete_policy_minimum_bandwidth_rule', + self.alt_target)) + + # And the same for aliases + self.assertTrue( + policy.enforce( + self.context, 'delete_alias_minimum_bandwidth_rule', + self.target)) + self.assertTrue( + policy.enforce( + self.context, 
'delete_alias_minimum_bandwidth_rule', + self.alt_target)) + + +class ProjectMemberQosMinimumBandwidthRuleTests( + ProjectAdminQosMinimumBandwidthRuleTests): + + def setUp(self): + super(ProjectMemberQosMinimumBandwidthRuleTests, self).setUp() + self.context = self.project_member_ctx + def test_get_policy_minimum_bandwidth_rule(self): self.assertTrue( policy.enforce( 
@@ -1069,69 +1187,6 @@ class ProjectAdminQosMinimumBandwidthRuleTests(QosRulesAPITestCase): self.context, 'get_alias_minimum_bandwidth_rule', self.alt_target) - def test_create_policy_minimum_bandwidth_rule(self): - self.assertTrue( - policy.enforce( - self.context, 'create_policy_minimum_bandwidth_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_policy_minimum_bandwidth_rule', - self.alt_target) - - def test_update_policy_minimum_bandwidth_rule(self): - self.assertTrue( - policy.enforce( - self.context, 'update_policy_minimum_bandwidth_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_policy_minimum_bandwidth_rule', - self.alt_target) - - # And the same for aliases - self.assertTrue( - policy.enforce( - self.context, 'update_alias_minimum_bandwidth_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_alias_minimum_bandwidth_rule', - self.alt_target) - - def test_delete_policy_minimum_bandwidth_rule(self): - self.assertTrue( - policy.enforce( - self.context, 'delete_policy_minimum_bandwidth_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'delete_policy_minimum_bandwidth_rule', - self.alt_target) - - # And the same for aliases - self.assertTrue( - policy.enforce( - self.context, 'delete_alias_minimum_bandwidth_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'delete_alias_minimum_bandwidth_rule', - self.alt_target) - - -class ProjectMemberQosMinimumBandwidthRuleTests( - ProjectAdminQosMinimumBandwidthRuleTests): - - def setUp(self): - super(ProjectMemberQosMinimumBandwidthRuleTests, self).setUp() - self.context = self.project_member_ctx - def test_create_policy_minimum_bandwidth_rule(self): self.assertRaises( 
base_policy.PolicyNotAuthorized, @@ -1221,12 +1276,12 @@ class SystemAdminQosMinimumPacketRateRuleTests(QosRulesAPITestCase): # And the same for aliases self.assertRaises( - base_policy.PolicyNotAuthorized, + base_policy.InvalidScope, policy.enforce, self.context, 'get_alias_minimum_packet_rate_rule', self.target) self.assertRaises( - base_policy.PolicyNotAuthorized, + base_policy.InvalidScope, policy.enforce, self.context, 'get_alias_minimum_packet_rate_rule', self.alt_target) @@ -1257,12 +1312,12 @@ class SystemAdminQosMinimumPacketRateRuleTests(QosRulesAPITestCase): # And the same for aliases self.assertRaises( - base_policy.PolicyNotAuthorized, + base_policy.InvalidScope, policy.enforce, self.context, 'get_alias_minimum_packet_rate_rule', self.target) self.assertRaises( - base_policy.PolicyNotAuthorized, + base_policy.InvalidScope, policy.enforce, self.context, 'get_alias_minimum_packet_rate_rule', self.alt_target) @@ -1281,12 +1336,12 @@ class SystemAdminQosMinimumPacketRateRuleTests(QosRulesAPITestCase): # And the same for aliases self.assertRaises( - base_policy.PolicyNotAuthorized, + base_policy.InvalidScope, policy.enforce, self.context, 'delete_alias_minimum_packet_rate_rule', self.target) self.assertRaises( - base_policy.PolicyNotAuthorized, + base_policy.InvalidScope, policy.enforce, self.context, 'delete_alias_minimum_packet_rate_rule', self.alt_target) 
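Review bot: The second hunk here (around new line 1312) expects InvalidScope for `'get_alias_minimum_packet_rate_rule'`, the same alias the first hunk already covers, while the third hunk uses the delete alias. The enclosing method starts outside the hunk, but its position between the get and delete tests suggests it is the update test, in which case `update_alias_minimum_packet_rate_rule` is never exercised for the system admin. Every rule in this class is expected to raise InvalidScope, so the test passes with the wrong name and hides the gap. If that reading is right, the two calls should use `'update_alias_minimum_packet_rate_rule'`.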
@@ -1314,6 +1369,74 @@ class ProjectAdminQosMinimumPacketRateRuleTests(QosRulesAPITestCase): super(ProjectAdminQosMinimumPacketRateRuleTests, self).setUp() self.context = self.project_admin_ctx + def test_get_policy_minimum_packet_rate_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'get_policy_minimum_packet_rate_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'get_policy_minimum_packet_rate_rule', + self.alt_target)) + + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'get_alias_minimum_packet_rate_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'get_alias_minimum_packet_rate_rule', + self.alt_target)) + + def test_create_policy_minimum_packet_rate_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'create_policy_minimum_packet_rate_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_policy_minimum_packet_rate_rule', + self.alt_target)) + + def test_update_policy_minimum_packet_rate_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'update_policy_minimum_packet_rate_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_policy_minimum_packet_rate_rule', + self.alt_target)) + + # And the same for aliases + self.assertTrue( + policy.enforce(self.context, + 'update_alias_minimum_packet_rate_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_alias_minimum_packet_rate_rule', + self.alt_target)) + + def test_delete_policy_minimum_packet_rate_rule(self): + self.assertTrue( + policy.enforce(self.context, + 'delete_policy_minimum_packet_rate_rule', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'delete_policy_minimum_packet_rate_rule', + self.alt_target)) + + +class ProjectMemberQosMinimumPacketRateRuleTests( + ProjectAdminQosMinimumPacketRateRuleTests): + + def setUp(self): + super(ProjectMemberQosMinimumPacketRateRuleTests, 
self).setUp() + self.context = self.project_member_ctx + def test_get_policy_minimum_packet_rate_rule(self): self.assertTrue( policy.enforce(self.context, @@ -1336,58 +1459,6 @@ class ProjectAdminQosMinimumPacketRateRuleTests(QosRulesAPITestCase): self.context, 'get_alias_minimum_packet_rate_rule', self.alt_target) - def test_create_policy_minimum_packet_rate_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'create_policy_minimum_packet_rate_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_policy_minimum_packet_rate_rule', - self.alt_target) - - def test_update_policy_minimum_packet_rate_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'update_policy_minimum_packet_rate_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_policy_minimum_packet_rate_rule', - self.alt_target) - - # And the same for aliases - self.assertTrue( - policy.enforce(self.context, - 'update_alias_minimum_packet_rate_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_alias_minimum_packet_rate_rule', - self.alt_target) - - def test_delete_policy_minimum_packet_rate_rule(self): - self.assertTrue( - policy.enforce(self.context, - 'delete_policy_minimum_packet_rate_rule', - self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'delete_policy_minimum_packet_rate_rule', - self.alt_target) - - -class ProjectMemberQosMinimumPacketRateRuleTests( - ProjectAdminQosMinimumPacketRateRuleTests): - - def setUp(self): - super(ProjectMemberQosMinimumPacketRateRuleTests, self).setUp() - self.context = self.project_member_ctx - def test_create_policy_minimum_packet_rate_rule(self): self.assertRaises( base_policy.PolicyNotAuthorized, 
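Review bot: The added test_delete_policy_minimum_packet_rate_rule in ProjectAdminQosMinimumPacketRateRuleTests checks only `delete_policy_minimum_packet_rate_rule`, so the delete alias is not covered for the project admin. The get and update tests in the same class do check their alias rules, and the SystemAdmin class earlier in the file asserts on `delete_alias_minimum_packet_rate_rule`. The removed version of the test lacked the alias check too. Adding `self.assertTrue(policy.enforce(self.context, 'delete_alias_minimum_packet_rate_rule', self.target))` and the same call with `self.alt_target` would close that gap.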
diff --git a/neutron/tests/unit/conf/policies/test_rbac.py b/neutron/tests/unit/conf/policies/test_rbac.py
index 45e7ae071c4..f37eb5b6bb5 100644
--- a/neutron/tests/unit/conf/policies/test_rbac.py
+++ b/neutron/tests/unit/conf/policies/test_rbac.py
@@ -124,6 +124,58 @@ class ProjectAdminTests(RbacAPITestCase): super(ProjectAdminTests, self).setUp() self.context = self.project_admin_ctx + def test_create_rbac_policy(self): + self.assertTrue( + policy.enforce(self.context, 'create_rbac_policy', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'create_rbac_policy', self.alt_target)) + + def test_create_rbac_policy_target_tenant(self): + self.assertTrue( + policy.enforce( + self.context, 'create_rbac_policy:target_tenant', self.target)) + self.assertTrue( + policy.enforce( + self.context, + 'create_rbac_policy:alt_target_tenant', self.target)) + + def test_update_rbac_policy(self): + self.assertTrue( + policy.enforce(self.context, 'update_rbac_policy', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'update_rbac_policy', self.alt_target)) + + def test_update_rbac_policy_target_tenant(self): + self.assertTrue( + policy.enforce( + self.context, 'update_rbac_policy:target_tenant', self.target)) + self.assertTrue( + policy.enforce( + self.context, + 'update_rbac_policy:alt_target_tenant', self.target)) + + def test_get_rbac_policy(self): + self.assertTrue( + policy.enforce(self.context, 'get_rbac_policy', self.target)) + self.assertTrue( + policy.enforce(self.context, 'get_rbac_policy', self.alt_target)) + + def test_delete_rbac_policy(self): + self.assertTrue( + policy.enforce(self.context, 'delete_rbac_policy', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'delete_rbac_policy', self.alt_target)) + + +class ProjectMemberTests(ProjectAdminTests): + + def setUp(self): + super(ProjectMemberTests, self).setUp() + self.context = self.project_member_ctx + def test_create_rbac_policy(self): self.assertTrue( policy.enforce(self.context, 'create_rbac_policy', self.target)) 
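Review bot: In test_create_rbac_policy_target_tenant and test_update_rbac_policy_target_tenant the second assertion enforces `'create_rbac_policy:alt_target_tenant'` / `'update_rbac_policy:alt_target_tenant'` against `self.target`. This looks like a search-and-replace slip: the rule name gained `alt_` where the target should have. As written, the `:target_tenant` rule is never evaluated against another project's object, and a rule name that is presumably not registered may be resolved by a fallback and not by the rule under test. The second call should be `policy.enforce(self.context, 'create_rbac_policy:target_tenant', self.alt_target)`, and likewise for update.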
@@ -133,9 +185,11 @@ class ProjectAdminTests(RbacAPITestCase): self.context, 'create_rbac_policy', self.alt_target) def test_create_rbac_policy_target_tenant(self): - self.assertTrue( - policy.enforce( - self.context, 'create_rbac_policy:target_tenant', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_rbac_policy:target_tenant', + self.wildcard_target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, @@ -151,9 +205,11 @@ class ProjectAdminTests(RbacAPITestCase): self.context, 'update_rbac_policy', self.alt_target) def test_update_rbac_policy_target_tenant(self): - self.assertTrue( - policy.enforce( - self.context, 'update_rbac_policy:target_tenant', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'update_rbac_policy:target_tenant', + self.wildcard_target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, 
@@ -177,37 +233,6 @@ class ProjectAdminTests(RbacAPITestCase): self.context, 'delete_rbac_policy', self.alt_target) -class ProjectMemberTests(ProjectAdminTests): - - def setUp(self): - super(ProjectMemberTests, self).setUp() - self.context = self.project_member_ctx - - def test_create_rbac_policy_target_tenant(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_rbac_policy:target_tenant', - self.wildcard_target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_rbac_policy:target_tenant', - self.wildcard_alt_target) - - def test_update_rbac_policy_target_tenant(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_rbac_policy:target_tenant', - self.wildcard_target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_rbac_policy:target_tenant', - self.wildcard_alt_target) - - class ProjectReaderTests(ProjectMemberTests): def setUp(self): diff --git a/neutron/tests/unit/conf/policies/test_router.py b/neutron/tests/unit/conf/policies/test_router.py index d9a42070792..345f234cd37 100644 --- a/neutron/tests/unit/conf/policies/test_router.py +++ b/neutron/tests/unit/conf/policies/test_router.py 
@@ -275,6 +275,179 @@ class ProjectAdminTests(RouterAPITestCase): super(ProjectAdminTests, self).setUp() self.context = self.project_admin_ctx + def test_create_router(self): + self.assertTrue( + policy.enforce(self.context, 'create_router', self.target)) + self.assertTrue( + policy.enforce(self.context, 'create_router', self.alt_target)) + + def test_create_router_distributed(self): + self.assertTrue( + policy.enforce( + self.context, 'create_router:distributed', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'create_router:distributed', self.alt_target)) + + def test_create_router_ha(self): + self.assertTrue( + policy.enforce(self.context, 'create_router:ha', self.target)) + self.assertTrue( + policy.enforce(self.context, 'create_router:ha', self.alt_target)) + + def test_create_router_external_gateway_info(self): + self.assertTrue( + policy.enforce(self.context, + 'create_router:external_gateway_info', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_router:external_gateway_info', + self.alt_target)) + + def test_create_router_external_gateway_info_network_id(self): + self.assertTrue( + policy.enforce(self.context, + 'create_router:external_gateway_info:network_id', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_router:external_gateway_info:network_id', + self.alt_target)) + + def test_create_router_external_gateway_info_enable_snat(self): + self.assertTrue( + policy.enforce(self.context, + 'create_router:external_gateway_info:enable_snat', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'create_router:external_gateway_info:enable_snat', + self.alt_target)) + + def test_create_router_external_gateway_info_external_fixed_ips(self): + self.assertTrue( + policy.enforce( + self.context, + 'create_router:external_gateway_info:external_fixed_ips', + self.target)) + self.assertTrue( + policy.enforce( + self.context, + 'create_router:external_gateway_info:external_fixed_ips', 
+ self.alt_target)) + + def test_get_router(self): + self.assertTrue( + policy.enforce(self.context, 'get_router', self.target)) + self.assertTrue( + policy.enforce(self.context, 'get_router', self.alt_target)) + + def test_get_router_distributed(self): + self.assertTrue( + policy.enforce( + self.context, 'get_router:distributed', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'get_router:distributed', self.alt_target)) + + def test_get_router_ha(self): + self.assertTrue( + policy.enforce(self.context, 'get_router:ha', self.target)) + self.assertTrue( + policy.enforce(self.context, 'get_router:ha', self.alt_target)) + + def test_update_router(self): + self.assertTrue( + policy.enforce(self.context, 'update_router', self.target)) + self.assertTrue( + policy.enforce(self.context, 'update_router', self.alt_target)) + + def test_update_router_distributed(self): + self.assertTrue( + policy.enforce( + self.context, 'update_router:distributed', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'update_router:distributed', self.alt_target)) + + def test_update_router_ha(self): + self.assertTrue( + policy.enforce(self.context, 'update_router:ha', self.target)) + self.assertTrue( + policy.enforce(self.context, 'update_router:ha', self.alt_target)) + + def test_update_router_external_gateway_info(self): + self.assertTrue( + policy.enforce(self.context, + 'update_router:external_gateway_info', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_router:external_gateway_info', + self.alt_target)) + + def test_update_router_external_gateway_info_network_id(self): + self.assertTrue( + policy.enforce(self.context, + 'update_router:external_gateway_info:network_id', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_router:external_gateway_info:network_id', + self.alt_target)) + + def test_update_router_external_gateway_info_enable_snat(self): + self.assertTrue( + policy.enforce(self.context, + 
'update_router:external_gateway_info:enable_snat', + self.target)) + self.assertTrue( + policy.enforce(self.context, + 'update_router:external_gateway_info:enable_snat', + self.alt_target)) + + def test_update_router_external_gateway_info_external_fixed_ips(self): + self.assertTrue( + policy.enforce( + self.context, + 'update_router:external_gateway_info:external_fixed_ips', + self.target)) + self.assertTrue( + policy.enforce( + self.context, + 'update_router:external_gateway_info:external_fixed_ips', + self.alt_target)) + + def test_delete_router(self): + self.assertTrue( + policy.enforce(self.context, 'delete_router', self.target)) + self.assertTrue( + policy.enforce(self.context, 'delete_router', self.alt_target)) + + def test_add_router_interface(self): + self.assertTrue( + policy.enforce(self.context, + 'add_router_interface', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'add_router_interface', self.alt_target)) + + def test_remove_router_interface(self): + self.assertTrue( + policy.enforce(self.context, + 'remove_router_interface', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'remove_router_interface', self.alt_target)) + + +class ProjectMemberTests(ProjectAdminTests): + + def setUp(self): + super(ProjectMemberTests, self).setUp() + self.context = self.project_member_ctx + def test_create_router(self): self.assertTrue( policy.enforce(self.context, 'create_router', self.target)) 
@@ -284,17 +457,20 @@ class ProjectAdminTests(RouterAPITestCase): self.context, 'create_router', self.alt_target) def test_create_router_distributed(self): - self.assertTrue( - policy.enforce( - self.context, 'create_router:distributed', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_router:distributed', self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'create_router:distributed', self.alt_target) def test_create_router_ha(self): - self.assertTrue( - policy.enforce(self.context, 'create_router:ha', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_router:ha', self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, @@ -323,10 +499,11 @@ class ProjectAdminTests(RouterAPITestCase): self.alt_target) def test_create_router_external_gateway_info_enable_snat(self): - self.assertTrue( - policy.enforce(self.context, - 'create_router:external_gateway_info:enable_snat', - self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_router:external_gateway_info:enable_snat', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, @@ -334,11 +511,12 @@ class ProjectAdminTests(RouterAPITestCase): self.alt_target) def test_create_router_external_gateway_info_external_fixed_ips(self): - self.assertTrue( - policy.enforce( - self.context, - 'create_router:external_gateway_info:external_fixed_ips', - self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, + 'create_router:external_gateway_info:external_fixed_ips', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, 
@@ -355,17 +533,20 @@ class ProjectAdminTests(RouterAPITestCase): self.context, 'get_router', self.alt_target) def test_get_router_distributed(self): - self.assertTrue( - policy.enforce( - self.context, 'get_router:distributed', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_router:distributed', self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'get_router:distributed', self.alt_target) def test_get_router_ha(self): - self.assertTrue( - policy.enforce(self.context, 'get_router:ha', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_router:ha', self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, @@ -380,17 +561,20 @@ class ProjectAdminTests(RouterAPITestCase): self.context, 'update_router', self.alt_target) def test_update_router_distributed(self): - self.assertTrue( - policy.enforce( - self.context, 'update_router:distributed', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'update_router:distributed', self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'update_router:distributed', self.alt_target) def test_update_router_ha(self): - self.assertTrue( - policy.enforce(self.context, 'update_router:ha', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'update_router:ha', self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, 
@@ -419,10 +603,11 @@ class ProjectAdminTests(RouterAPITestCase): self.alt_target) def test_update_router_external_gateway_info_enable_snat(self): - self.assertTrue( - policy.enforce(self.context, - 'update_router:external_gateway_info:enable_snat', - self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'update_router:external_gateway_info:enable_snat', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, @@ -430,11 +615,12 @@ class ProjectAdminTests(RouterAPITestCase): self.alt_target) def test_update_router_external_gateway_info_external_fixed_ips(self): - self.assertTrue( - policy.enforce( - self.context, - 'update_router:external_gateway_info:external_fixed_ips', - self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, + 'update_router:external_gateway_info:external_fixed_ips', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, 
@@ -469,125 +655,6 @@ class ProjectAdminTests(RouterAPITestCase): self.context, 'remove_router_interface', self.alt_target) -class ProjectMemberTests(ProjectAdminTests): - - def setUp(self): - super(ProjectMemberTests, self).setUp() - self.context = self.project_member_ctx - - def test_create_router_distributed(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_router:distributed', self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_router:distributed', self.alt_target) - - def test_create_router_ha(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_router:ha', self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_router:ha', self.alt_target) - - def test_create_router_external_gateway_info_enable_snat(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_router:external_gateway_info:enable_snat', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_router:external_gateway_info:enable_snat', - self.alt_target) - - def test_create_router_external_gateway_info_external_fixed_ips(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, - 'create_router:external_gateway_info:external_fixed_ips', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, - 'create_router:external_gateway_info:external_fixed_ips', - self.alt_target) - - def test_get_router_distributed(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_router:distributed', self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_router:distributed', self.alt_target) - - def 
test_get_router_ha(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_router:ha', self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_router:ha', self.alt_target) - - def test_update_router_distributed(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_router:distributed', self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_router:distributed', self.alt_target) - - def test_update_router_ha(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_router:ha', self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_router:ha', self.alt_target) - - def test_update_router_external_gateway_info_enable_snat(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_router:external_gateway_info:enable_snat', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_router:external_gateway_info:enable_snat', - self.alt_target) - - def test_update_router_external_gateway_info_external_fixed_ips(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, - 'update_router:external_gateway_info:external_fixed_ips', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, - 'update_router:external_gateway_info:external_fixed_ips', - self.alt_target) - - class ProjectReaderTests(ProjectMemberTests): def setUp(self): 
@@ -758,6 +825,26 @@ class ProjectAdminExtrarouteTests(ExtrarouteAPITestCase): super(ProjectAdminExtrarouteTests, self).setUp() self.context = self.project_admin_ctx + def test_add_extraroute(self): + self.assertTrue( + policy.enforce(self.context, 'add_extraroutes', self.target)) + self.assertTrue( + policy.enforce(self.context, 'add_extraroutes', self.alt_target)) + + def test_remove_extraroute(self): + self.assertTrue( + policy.enforce(self.context, 'remove_extraroutes', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'remove_extraroutes', self.alt_target)) + + +class ProjectMemberExtrarouteTests(ProjectAdminExtrarouteTests): + + def setUp(self): + super(ProjectMemberExtrarouteTests, self).setUp() + self.context = self.project_member_ctx + def test_add_extraroute(self): self.assertTrue( policy.enforce(self.context, 'add_extraroutes', self.target)) @@ -775,13 +862,6 @@ class ProjectAdminExtrarouteTests(ExtrarouteAPITestCase): self.context, 'remove_extraroutes', self.alt_target) -class ProjectMemberExtrarouteTests(ProjectAdminExtrarouteTests): - - def setUp(self): - super(ProjectMemberExtrarouteTests, self).setUp() - self.context = self.project_member_ctx - - class ProjectReaderExtrarouteTests(ProjectMemberExtrarouteTests): def setUp(self): diff --git a/neutron/tests/unit/conf/policies/test_subnet.py b/neutron/tests/unit/conf/policies/test_subnet.py index 81b3bfc4b1c..adae4280c33 100644 --- a/neutron/tests/unit/conf/policies/test_subnet.py +++ b/neutron/tests/unit/conf/policies/test_subnet.py 
@@ -167,78 +167,65 @@ class ProjectAdminTests(SubnetAPITestCase): def test_create_subnet(self): self.assertTrue( policy.enforce(self.context, 'create_subnet', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_subnet', self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'create_subnet', self.alt_target)) def test_create_subnet_segment_id(self): self.assertTrue( policy.enforce( self.context, 'create_subnet:segment_id', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_subnet:segment_id', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'create_subnet:segment_id', self.alt_target)) def test_create_subnet_service_types(self): self.assertTrue( policy.enforce( self.context, 'create_subnet:service_types', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_subnet:service_types', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'create_subnet:service_types', self.alt_target)) def test_get_subnet(self): self.assertTrue( policy.enforce(self.context, 'get_subnet', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_subnet', self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'get_subnet', self.alt_target)) def test_get_subnet_segment_id(self): self.assertTrue( policy.enforce(self.context, 'get_subnet:segment_id', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'get_subnet:segment_id', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'get_subnet:segment_id', self.alt_target)) def test_update_subnet(self): self.assertTrue( policy.enforce(self.context, 'update_subnet', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_subnet', 
self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'update_subnet', self.alt_target)) def test_update_subnet_segment_id(self): self.assertTrue( policy.enforce( self.context, 'update_subnet:segment_id', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_subnet:segment_id', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'update_subnet:segment_id', self.alt_target)) def test_update_subnet_service_types(self): self.assertTrue( policy.enforce( self.context, 'update_subnet:service_types', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_subnet:service_types', self.alt_target) + self.assertTrue( + policy.enforce( + self.context, 'update_subnet:service_types', self.alt_target)) def test_delete_subnet(self): self.assertTrue( policy.enforce(self.context, 'delete_subnet', self.target)) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'delete_subnet', self.alt_target) + self.assertTrue( + policy.enforce(self.context, 'delete_subnet', self.alt_target)) class ProjectMemberTests(ProjectAdminTests): @@ -247,6 +234,14 @@ class ProjectMemberTests(ProjectAdminTests): super(ProjectMemberTests, self).setUp() self.context = self.project_member_ctx + def test_create_subnet(self): + self.assertTrue( + policy.enforce(self.context, 'create_subnet', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_subnet', self.alt_target) + def test_create_subnet_segment_id(self): self.assertRaises( base_policy.PolicyNotAuthorized, 
@@ -267,6 +262,14 @@ class ProjectMemberTests(ProjectAdminTests): policy.enforce, self.context, 'create_subnet:service_types', self.alt_target) + def test_get_subnet(self): + self.assertTrue( + policy.enforce(self.context, 'get_subnet', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'get_subnet', self.alt_target) + def test_get_subnet_segment_id(self): self.assertRaises( base_policy.PolicyNotAuthorized, @@ -277,6 +280,14 @@ class ProjectMemberTests(ProjectAdminTests): policy.enforce, self.context, 'get_subnet:segment_id', self.alt_target) + def test_update_subnet(self): + self.assertTrue( + policy.enforce(self.context, 'update_subnet', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'update_subnet', self.alt_target) + def test_update_subnet_segment_id(self): self.assertRaises( base_policy.PolicyNotAuthorized, @@ -297,6 +308,14 @@ class ProjectMemberTests(ProjectAdminTests): policy.enforce, self.context, 'update_subnet:service_types', self.alt_target) + def test_delete_subnet(self): + self.assertTrue( + policy.enforce(self.context, 'delete_subnet', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'delete_subnet', self.alt_target) + class ProjectReaderTests(ProjectMemberTests): diff --git a/neutron/tests/unit/conf/policies/test_subnetpool.py b/neutron/tests/unit/conf/policies/test_subnetpool.py index f6fc11ca07a..ecf3e28327e 100644 --- a/neutron/tests/unit/conf/policies/test_subnetpool.py +++ b/neutron/tests/unit/conf/policies/test_subnetpool.py 
@@ -154,6 +154,81 @@ class ProjectAdminTests(SubnetpoolAPITestCase): super(ProjectAdminTests, self).setUp() self.context = self.project_admin_ctx + def test_create_subnetpool(self): + self.assertTrue( + policy.enforce(self.context, 'create_subnetpool', self.target)) + self.assertTrue( + policy.enforce(self.context, 'create_subnetpool', self.alt_target)) + + def test_create_subnetpool_shared(self): + self.assertTrue( + policy.enforce( + self.context, 'create_subnetpool:shared', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'create_subnetpool:shared', self.alt_target)) + + def test_create_subnetpool_default(self): + self.assertTrue( + policy.enforce( + self.context, 'create_subnetpool:default', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'create_subnetpool:default', self.alt_target)) + + def test_get_subnetpool(self): + self.assertTrue( + policy.enforce(self.context, 'get_subnetpool', self.target)) + self.assertTrue( + policy.enforce(self.context, 'get_subnetpool', self.alt_target)) + + def test_update_subnetpool(self): + self.assertTrue( + policy.enforce(self.context, 'update_subnetpool', self.target)) + self.assertTrue( + policy.enforce(self.context, 'update_subnetpool', self.alt_target)) + + def test_update_subnetpool_default(self): + self.assertTrue( + policy.enforce( + self.context, 'update_subnetpool:default', self.target)) + self.assertTrue( + policy.enforce( + self.context, 'update_subnetpool:default', self.alt_target)) + + def test_delete_subnetpool(self): + self.assertTrue( + policy.enforce(self.context, 'delete_subnetpool', self.target)) + self.assertTrue( + policy.enforce(self.context, 'delete_subnetpool', self.alt_target)) + + def test_onboard_network_subnets(self): + self.assertTrue( + policy.enforce(self.context, + 'onboard_network_subnets', self.target)) + self.assertTrue( + policy.enforce(self.context, + 'onboard_network_subnets', self.alt_target)) + + def test_add_prefixes(self): + self.assertTrue( + 
policy.enforce(self.context, 'add_prefixes', self.target)) + self.assertTrue( + policy.enforce(self.context, 'add_prefixes', self.alt_target)) + + def test_remove_prefixes(self): + self.assertTrue( + policy.enforce(self.context, 'remove_prefixes', self.target)) + self.assertTrue( + policy.enforce(self.context, 'remove_prefixes', self.alt_target)) + + +class ProjectMemberTests(ProjectAdminTests): + + def setUp(self): + super(ProjectMemberTests, self).setUp() + self.context = self.project_member_ctx + def test_create_subnetpool(self): self.assertTrue( policy.enforce(self.context, 'create_subnetpool', self.target)) @@ -163,18 +238,20 @@ class ProjectAdminTests(SubnetpoolAPITestCase): self.context, 'create_subnetpool', self.alt_target) def test_create_subnetpool_shared(self): - self.assertTrue( - policy.enforce( - self.context, 'create_subnetpool:shared', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_subnetpool:shared', self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'create_subnetpool:shared', self.alt_target) def test_create_subnetpool_default(self): - self.assertTrue( - policy.enforce( - self.context, 'create_subnetpool:default', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_subnetpool:is_default', self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, @@ -197,9 +274,10 @@ class ProjectAdminTests(SubnetpoolAPITestCase): self.context, 'update_subnetpool', self.alt_target) def test_update_subnetpool_default(self): - self.assertTrue( - policy.enforce( - self.context, 'update_subnetpool:default', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'update_subnetpool:is_default', self.target) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, 
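Review bot: The added `ProjectAdminTests.test_create_subnetpool_default` and `test_update_subnetpool_default` enforce `'create_subnetpool:default'` and `'update_subnetpool:default'`, while the `ProjectMemberTests` versions of the same tests (hunks `-163,18` and `-197,9` on this line) enforce the `:is_default` names. The policy file is not in view, but if the registered rules are the `:is_default` ones, as the member tests suggest, the admin assertions never evaluate the rule they are named after and only reflect whatever an unregistered name resolves to. Use one name in both classes, e.g. `self.context, 'create_subnetpool:is_default', self.target))`, and likewise for the `alt_target` call and for `update_subnetpool:is_default`. In the member tests the second `assertRaises` is cut off by the hunk boundary, so its rule name should be checked against the first one as well.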
@@ -239,43 +317,6 @@ class ProjectAdminTests(SubnetpoolAPITestCase): self.context, 'remove_prefixes', self.alt_target) -class ProjectMemberTests(ProjectAdminTests): - - def setUp(self): - super(ProjectMemberTests, self).setUp() - self.context = self.project_member_ctx - - def test_create_subnetpool_shared(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_subnetpool:shared', self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_subnetpool:shared', self.alt_target) - - def test_create_subnetpool_default(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_subnetpool:is_default', self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'create_subnetpool:is_default', self.alt_target) - - def test_update_subnetpool_default(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_subnetpool:is_default', self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, - self.context, 'update_subnetpool:is_default', self.alt_target) - - class ProjectReaderTests(ProjectMemberTests): def setUp(self):