diff --git a/neutron/tests/unit/conf/policies/test_port.py b/neutron/tests/unit/conf/policies/test_port.py index 827f689ccbf..49863d6298c 100644 --- a/neutron/tests/unit/conf/policies/test_port.py +++ b/neutron/tests/unit/conf/policies/test_port.py @@ -29,20 +29,33 @@ class PortAPITestCase(base.PolicyBaseTestCase): self.network = { 'id': uuidutils.generate_uuid(), + 'tenant_id': self.project_id, 'project_id': self.project_id} - self.target = { - 'project_id': self.project_id, + self.alt_network = { + 'id': uuidutils.generate_uuid(), 'tenant_id': self.alt_project_id, + 'project_id': self.alt_project_id} + self.target = { + 'tenant_id': self.project_id, + 'project_id': self.project_id, 'network_id': self.network['id'], 'ext_parent_network_id': self.network['id']} self.alt_target = { + 'tenant_id': self.project_id, 'project_id': self.alt_project_id, - 'tenant_id': self.alt_project_id, - 'network_id': self.network['id'], - 'ext_parent_network_id': self.network['id']} + 'network_id': self.alt_network['id'], + 'ext_parent_network_id': self.alt_network['id']} + + networks = { + self.network['id']: self.network, + self.alt_network['id']: self.alt_network, + } + + def get_network(context, id, fields=None): + return networks[id] self.plugin_mock = mock.Mock() - self.plugin_mock.get_network.return_value = self.network + self.plugin_mock.get_network.side_effect = get_network mock.patch( 'neutron_lib.plugins.directory.get_plugin', return_value=self.plugin_mock).start() @@ -814,10 +827,8 @@ class ProjectManagerTests(AdminTests): target['device_owner'] = 'network:test' alt_target = self.alt_target.copy() alt_target['device_owner'] = 'network:test' - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:device_owner', - target) + self.assertTrue( + policy.enforce(self.context, 'create_port:device_owner', target)) self.assertRaises( base_policy.PolicyNotAuthorized, policy.enforce, self.context, 'create_port:device_owner', 
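Review bot: In the new `self.alt_target` fixture, `tenant_id` is `self.project_id` while `project_id` is `self.alt_project_id`, so the two ownership keys of the same port disagree. Any rule that matches on `tenant_id` would treat the alt port as owned by the caller, and the negative assertions on `alt_target` would then pass only because the port sits on `self.alt_network`, not because the port belongs to another project. If the mix is deliberate (a caller-owned port on a foreign network), it deserves a comment saying so; otherwise the line should read `'tenant_id': self.alt_project_id,`. The setUp body starts and ends outside this hunk.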
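Review bot: The expectation for `create_port:device_owner` with a `network:`-prefixed owner flips from denied to allowed for the project manager, and nothing in the test says which condition authorises it. Judging from the setUp change in this commit, the allow presumably comes from the mocked network now carrying the caller's `tenant_id`. A comment such as `# allowed: the target port's network is owned by the caller's project` above the `assertTrue` would keep that intent visible; confirm it against the policy rule first. The same applies to `update_port:device_owner` in the hunk at old line 1056. The test method starts before the hunk and its `alt_target` assertion runs past the end of it.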
@@ -1056,10 +1067,8 @@ class ProjectManagerTests(AdminTests):
 target['device_owner'] = 'network:test'
 alt_target = self.alt_target.copy()
 alt_target['device_owner'] = 'network:test'
- self.assertRaises(
- base_policy.PolicyNotAuthorized,
- policy.enforce, self.context, 'update_port:device_owner',
- target)
+ self.assertTrue(
+ policy.enforce(self.context, 'update_port:device_owner', target))
 self.assertRaises(
 base_policy.PolicyNotAuthorized,
 policy.enforce, self.context, 'update_port:device_owner',
@@ -1222,6 +1231,199 @@ class ProjectMemberTests(ProjectManagerTests): super().setUp() self.context = self.project_member_ctx + def test_create_port_with_device_owner(self): + target = self.target.copy() + target['device_owner'] = 'network:test' + alt_target = self.alt_target.copy() + alt_target['device_owner'] = 'network:test' + self.assertTrue( + policy.enforce(self.context, 'create_port:device_owner', target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:device_owner', + alt_target) + + def test_create_port_with_mac_address(self): + self.assertTrue( + policy.enforce(self.context, 'create_port:mac_address', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:mac_address', + self.alt_target) + + def test_create_port_with_fixed_ips(self): + self.assertTrue( + policy.enforce(self.context, 'create_port:fixed_ips', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:fixed_ips', + self.alt_target) + + def test_create_port_with_fixed_ips_and_ip_address(self): + self.assertTrue( + policy.enforce(self.context, 'create_port:fixed_ips:ip_address', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:fixed_ips:ip_address', + self.alt_target) + + def test_create_port_with_fixed_ips_and_subnet_id(self): + self.assertTrue( + policy.enforce(self.context, 'create_port:fixed_ips:subnet_id', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:fixed_ips:subnet_id', + self.alt_target) + + def test_create_port_with_port_security_enabled(self): + self.assertTrue( + policy.enforce(self.context, 'create_port:port_security_enabled', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:port_security_enabled', + 
self.alt_target) + + def test_create_port_with_allowed_address_pairs(self): + self.assertTrue( + policy.enforce( + self.context, 'create_port:allowed_address_pairs', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_port:allowed_address_pairs', + self.alt_target) + + def test_create_port_with_allowed_address_pairs_and_mac_address(self): + self.assertTrue( + policy.enforce( + self.context, 'create_port:allowed_address_pairs:mac_address', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_port:allowed_address_pairs:mac_address', + self.alt_target) + + def test_create_port_with_allowed_address_pairs_and_ip_address(self): + self.assertTrue( + policy.enforce( + self.context, 'create_port:allowed_address_pairs:ip_address', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'create_port:allowed_address_pairs:ip_address', + self.alt_target) + + def test_update_port_with_device_owner(self): + target = self.target.copy() + target['device_owner'] = 'network:test' + alt_target = self.alt_target.copy() + alt_target['device_owner'] = 'network:test' + self.assertTrue( + policy.enforce(self.context, 'update_port:device_owner', target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:device_owner', + alt_target) + + def test_update_port_with_mac_address(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:mac_address', + self.target) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:mac_address', + self.alt_target) + + def test_update_port_with_fixed_ips(self): + self.assertTrue( + policy.enforce(self.context, 'update_port:fixed_ips', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 
'update_port:fixed_ips', + self.alt_target) + + def test_update_port_with_fixed_ips_and_ip_address(self): + self.assertTrue( + policy.enforce(self.context, 'update_port:fixed_ips', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:fixed_ips:ip_address', + self.alt_target) + + def test_update_port_with_fixed_ips_and_subnet_id(self): + self.assertTrue( + policy.enforce(self.context, 'update_port:fixed_ips', self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:fixed_ips:subnet_id', + self.alt_target) + + def test_update_port_with_port_security_enabled(self): + self.assertTrue( + policy.enforce( + self.context, 'update_port:port_security_enabled', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port:port_security_enabled', + self.alt_target) + + def test_update_port_with_allowed_address_pairs(self): + self.assertTrue( + policy.enforce( + self.context, 'update_port:allowed_address_pairs', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'update_port:allowed_address_pairs', + self.alt_target) + + def test_update_port_with_allowed_address_pairs_and_mac_address(self): + self.assertTrue( + policy.enforce( + self.context, 'update_port:allowed_address_pairs:mac_address', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'update_port:allowed_address_pairs:mac_address', + self.alt_target) + + def test_update_port_with_allowed_address_pairs_and_ip_address(self): + self.assertTrue( + policy.enforce( + self.context, 'update_port:allowed_address_pairs:ip_address', + self.target)) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, + self.context, 'update_port:allowed_address_pairs:ip_address', + self.alt_target) + + +class ProjectReaderTests(ProjectMemberTests): 
+ + def setUp(self): + super().setUp() + self.context = self.project_reader_ctx + + def test_create_port(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port', self.target) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port', self.alt_target) + def test_create_port_with_device_owner(self): target = self.target.copy() target['device_owner'] = 'network:test' @@ -1256,16 +1458,6 @@ class ProjectMemberTests(ProjectManagerTests): policy.enforce, self.context, 'create_port:fixed_ips', self.alt_target) - def test_create_port_with_fixed_ips_and_ip_address(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:fixed_ips:ip_address', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:fixed_ips:ip_address', - self.alt_target) - def test_create_port_with_fixed_ips_and_subnet_id(self): self.assertRaises( base_policy.PolicyNotAuthorized, 
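Review bot: In the tests added to ProjectMemberTests, `test_update_port_with_fixed_ips_and_ip_address` and `test_update_port_with_fixed_ips_and_subnet_id` make their positive check against `'update_port:fixed_ips'`, not the sub-attribute rule named in the test. Whether a member is allowed `update_port:fixed_ips:ip_address` or `update_port:fixed_ips:subnet_id` on an own-project target is therefore never asserted, although the create counterparts do enforce the full rule name. If the member is meant to be allowed, the calls should read `policy.enforce(self.context, 'update_port:fixed_ips:ip_address', self.target))` and `policy.enforce(self.context, 'update_port:fixed_ips:subnet_id', self.target))`. If not, they should be `assertRaises` checks, as in `test_update_port_with_mac_address`.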
@@ -1322,6 +1514,42 @@ class ProjectMemberTests(ProjectManagerTests): self.context, 'create_port:allowed_address_pairs:ip_address', self.alt_target) + def test_create_port_with_fixed_ips_and_ip_address(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:fixed_ips:ip_address', + self.target) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:fixed_ips:ip_address', + self.alt_target) + + def test_create_port_with_binding_vnic_type(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:binding:vnic_type', + self.target) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:binding:vnic_type', + self.alt_target) + + def test_create_port_tags(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:tags', self.target) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'create_port:tags', self.alt_target) + + def test_update_port(self): + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port', self.target) + self.assertRaises( + base_policy.PolicyNotAuthorized, + policy.enforce, self.context, 'update_port', self.alt_target) + def test_update_port_with_device_owner(self): target = self.target.copy() target['device_owner'] = 'network:test' 
@@ -1336,46 +1564,6 @@ class ProjectMemberTests(ProjectManagerTests): policy.enforce, self.context, 'update_port:device_owner', alt_target) - def test_update_port_with_mac_address(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:mac_address', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:mac_address', - self.alt_target) - - def test_update_port_with_fixed_ips(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:fixed_ips', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:fixed_ips', - self.alt_target) - - def test_update_port_with_fixed_ips_and_ip_address(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:fixed_ips:ip_address', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:fixed_ips:ip_address', - self.alt_target) - - def test_update_port_with_fixed_ips_and_subnet_id(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:fixed_ips:subnet_id', - self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port:fixed_ips:subnet_id', - self.alt_target) - def test_update_port_with_port_security_enabled(self): self.assertRaises( base_policy.PolicyNotAuthorized, 
@@ -1422,46 +1610,41 @@ class ProjectMemberTests(ProjectManagerTests): self.context, 'update_port:allowed_address_pairs:ip_address', self.alt_target) - -class ProjectReaderTests(ProjectMemberTests): - - def setUp(self): - super().setUp() - self.context = self.project_reader_ctx - - def test_create_port(self): + def test_update_port_with_fixed_ips(self): self.assertRaises( base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port', self.target) - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port', self.alt_target) - - def test_create_port_with_binding_vnic_type(self): - self.assertRaises( - base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:binding:vnic_type', + policy.enforce, + self.context, 'update_port:fixed_ips', self.target) self.assertRaises( base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:binding:vnic_type', + policy.enforce, + self.context, 'update_port:fixed_ips', self.alt_target) - def test_create_port_tags(self): + def test_update_port_with_fixed_ips_and_ip_address(self): self.assertRaises( base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:tags', self.target) + policy.enforce, + self.context, 'update_port:fixed_ips:ip_address', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'create_port:tags', self.alt_target) + policy.enforce, + self.context, 'update_port:fixed_ips:ip_address', + self.alt_target) - def test_update_port(self): + def test_update_port_with_fixed_ips_and_subnet_id(self): self.assertRaises( base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port', self.target) + policy.enforce, + self.context, 'update_port:fixed_ips:subnet_id', + self.target) self.assertRaises( base_policy.PolicyNotAuthorized, - policy.enforce, self.context, 'update_port', self.alt_target) + policy.enforce, + self.context, 
'update_port:fixed_ips:subnet_id', + self.alt_target) def test_update_port_with_binding_vnic_type(self): self.assertRaises(